可觀察對象對在應用的各個部分之間傳遞消息提供了支持。 它們在 Angular 中頻繁使用,并且推薦把它們用于事件處理、異步編程以及處理多個值等場景。
觀察者(Observer
)模式是一個軟件設計模式,它有一個對象,稱之為主體 Subject
,負責維護一個依賴項(稱之為觀察者 Observer
)的列表,并且在狀態(tài)變化時自動通知它們。 該模式和發(fā)布/訂閱模式非常相似(但不完全一樣)。
可觀察對象是聲明式的 —— 也就是說,雖然你定義了一個用于發(fā)布值的函數(shù),但是在有消費者訂閱它之前,這個函數(shù)并不會實際執(zhí)行。 訂閱之后,當這個函數(shù)執(zhí)行完或取消訂閱時,訂閱者就會收到通知。
可觀察對象可以發(fā)送多個任意類型的值 —— 字面量、消息、事件。無論這些值是同步發(fā)送的還是異步發(fā)送的,接收這些值的 API 都是一樣的。 由于準備(setup
)和清場(teardown
)的邏輯都是由可觀察對象自己處理的,因此你的應用代碼只管訂閱并消費這些值就可以了,做完之后,取消訂閱。無論這個流是擊鍵流、HTTP 響應流還是定時器,對這些值進行監(jiān)聽和停止監(jiān)聽的接口都是一樣的。
由于這些優(yōu)點,可觀察對象在 Angular 中得到廣泛使用,也同樣建議應用開發(fā)者好好使用它。
作為發(fā)布者,你創(chuàng)建一個 Observable 的實例,其中定義了一個訂閱者(subscriber
)函數(shù)。 當有消費者調(diào)用 subscribe()
方法時,這個函數(shù)就會執(zhí)行。 訂閱者函數(shù)用于定義“如何獲取或生成那些要發(fā)布的值或消息”。
要執(zhí)行所創(chuàng)建的可觀察對象,并開始從中接收通知,你就要調(diào)用它的 subscribe()
方法,并傳入一個觀察者(observer
)。 這是一個 JavaScript 對象,它定義了你收到的這些消息的處理器(handler
)。 subscribe()
調(diào)用會返回一個 Subscription
對象,該對象具有一個 unsubscribe()
方法。 當調(diào)用該方法時,你就會停止接收通知。
下面這個例子中示范了這種基本用法,它展示了如何使用可觀察對象來對當前地理位置進行更新。
//Observe geolocation updates
// Create an Observable that will start listening to geolocation updates
// when a consumer subscribes.
const locations = new Observable((observer) => {
let watchId: number;
// Simple geolocation API check provides values to publish
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition((position: Position) => {
observer.next(position);
}, (error: PositionError) => {
observer.error(error);
});
} else {
observer.error('Geolocation not available');
}
// When the consumer unsubscribes, clean up data ready for next subscription.
return {
unsubscribe() {
navigator.geolocation.clearWatch(watchId);
}
};
});
// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
next(position) {
console.log('Current Position: ', position);
},
error(msg) {
console.log('Error Getting Location: ', msg);
}
});
// Stop listening for location after 10 seconds
setTimeout(() => {
locationsSubscription.unsubscribe();
}, 10000);
用于接收可觀察對象通知的處理器要實現(xiàn) Observer
接口。這個對象定義了一些回調(diào)函數(shù)來處理可觀察對象可能會發(fā)來的三種通知:
通知類型 | 說明 |
---|---|
next |
必要。用來處理每個送達值。在開始執(zhí)行后可能執(zhí)行零次或多次。 |
error |
可選。用來處理錯誤通知。錯誤會中斷這個可觀察對象實例的執(zhí)行過程。 |
complete |
可選。用來處理執(zhí)行完畢(complete)通知。當執(zhí)行完畢后,這些值就會繼續(xù)傳給下一個處理器。 |
觀察者對象可以定義這三種處理器的任意組合。如果你不為某種通知類型提供處理器,這個觀察者就會忽略相應類型的通知。
只有當有人訂閱 Observable
的實例時,它才會開始發(fā)布值。 訂閱時要先調(diào)用該實例的 subscribe()
方法,并把一個觀察者對象傳給它,用來接收通知。
為了展示訂閱的原理,我們需要創(chuàng)建新的可觀察對象。它有一個構造函數(shù)可以用來創(chuàng)建新實例,但是為了更簡明,也可以使用
Observable
上定義的一些靜態(tài)方法來創(chuàng)建一些常用的簡單可觀察對象:
- `of(...items)` —— 返回一個 `Observable` 實例,它用同步的方式把參數(shù)中提供的這些值發(fā)送出來。
- `from(iterable)` —— 把它的參數(shù)轉換成一個 `Observable` 實例。 該方法通常用于把一個數(shù)組轉換成一個(發(fā)送多個值的)可觀察對象。
下面的例子會創(chuàng)建并訂閱一個簡單的可觀察對象,它的觀察者會把接收到的消息記錄到控制臺中:
//Subscribe using observer
// Create simple observable that emits three values
const myObservable = of(1, 2, 3);
// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// Execute with the observer object
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification
另外,subscribe()
方法還可以接收定義在同一行中的回調(diào)函數(shù),無論 next
、error
還是 complete
處理器。比如,下面的 subscribe()
調(diào)用和前面指定預定義觀察者的例子是等價的。
//Subscribe with positional arguments
myObservable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
無論哪種情況,next
處理器都是必要的,而 error
和 complete
處理器是可選的。
注意,next()
函數(shù)可以接受消息字符串、事件對象、數(shù)字值或各種結構,具體類型取決于上下文。 為了更通用一點,我們把由可觀察對象發(fā)布出來的數(shù)據(jù)統(tǒng)稱為流。任何類型的值都可以表示為可觀察對象,而這些值會被發(fā)布為一個流。
使用 Observable
構造函數(shù)可以創(chuàng)建任何類型的可觀察流。 當執(zhí)行可觀察對象的 subscribe()
方法時,這個構造函數(shù)就會把它接收到的參數(shù)作為訂閱函數(shù)來運行。 訂閱函數(shù)會接收一個 Observer
對象,并把值發(fā)布給觀察者的 next()
方法。
比如,要創(chuàng)建一個與前面的 of(1, 2, 3) 等價的可觀察對象,你可以這樣做:
//Create observable with constructor
// This function runs when subscribe() is called
function sequenceSubscriber(observer) {
// synchronously deliver 1, 2, and 3, then complete
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
// unsubscribe function doesn't need to do anything in this
// because values are delivered synchronously
return {unsubscribe() {}};
}
// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);
// execute the Observable and print the result of each notification
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// Logs:
// 1
// 2
// 3
// Finished sequence
如果要略微加強這個例子,我們可以創(chuàng)建一個用來發(fā)布事件的可觀察對象。在這個例子中,訂閱函數(shù)是用內(nèi)聯(lián)方式定義的。
//Create with custom fromEvent function
function fromEvent(target, eventName) {
return new Observable((observer) => {
const handler = (e) => observer.next(e);
// Add the event handler to the target
target.addEventListener(eventName, handler);
return () => {
// Detach the event handler from the target
target.removeEventListener(eventName, handler);
};
});
}
現(xiàn)在,你就可以使用這個函數(shù)來創(chuàng)建可發(fā)布 keydown
事件的可觀察對象了:
//Use custom fromEvent function
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown')
.subscribe((e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
});
典型的可觀察對象會為每一個觀察者創(chuàng)建一次新的、獨立的執(zhí)行。 當觀察者進行訂閱時,該可觀察對象會連上一個事件處理器,并且向那個觀察者發(fā)送一些值。當?shù)诙€觀察者訂閱時,這個可觀察對象就會連上一個新的事件處理器,并獨立執(zhí)行一次,把這些值發(fā)送給第二個可觀察對象。
有時候,不應該對每一個訂閱者都獨立執(zhí)行一次,你可能會希望每次訂閱都得到同一批值 —— 即使是那些你已經(jīng)發(fā)送過的。這在某些情況下有用,比如用來發(fā)送 document 上的點擊事件的可觀察對象。
多播用來讓可觀察對象在一次執(zhí)行中同時廣播給多個訂閱者。借助支持多播的可觀察對象,你不必注冊多個監(jiān)聽器,而是復用第一個(next
)監(jiān)聽器,并且把值發(fā)送給各個訂閱者。
當創(chuàng)建可觀察對象時,你要決定你希望別人怎么用這個對象以及是否對它的值進行多播。
來看一個從 1 到 3 進行計數(shù)的例子,它每發(fā)出一個數(shù)字就會等待 1 秒。
//Create a delayed sequence
function sequenceSubscriber(observer) {
const seq = [1, 2, 3];
let timeoutId;
// Will run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(arr, idx) {
timeoutId = setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(arr, ++idx);
}
}, 1000);
}
doSequence(seq, 0);
// Unsubscribe should clear the timeout to stop execution
return {unsubscribe() {
clearTimeout(timeoutId);
}};
}
// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// Logs:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence
注意,如果你訂閱了兩次,就會有兩個獨立的流,每個流都會每秒發(fā)出一個數(shù)字。代碼如下:
//Two subscriptions
// Subscribe starts the clock, and will emit after 1 second
sequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1/2 second, subscribe again.
setTimeout(() => {
sequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 500);
// Logs:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished
修改這個可觀察對象以支持多播,代碼如下:
//Create a multicast subscriber
function multicastSequenceSubscriber() {
const seq = [1, 2, 3];
// Keep track of each observer (one for every active subscription)
const observers = [];
// Still a single timeoutId because there will only ever be one
// set of values being generated, multicasted to each subscriber
let timeoutId;
// Return the subscriber function (runs when subscribe()
// function is invoked)
return (observer) => {
observers.push(observer);
// When this is the first subscription, start the sequence
if (observers.length === 1) {
timeoutId = doSequence({
next(val) {
// Iterate through observers and notify all subscriptions
observers.forEach(obs => obs.next(val));
},
complete() {
// Notify all complete callbacks
observers.slice(0).forEach(obs => obs.complete());
}
}, seq, 0);
}
return {
unsubscribe() {
// Remove from the observers array so it's no longer notified
observers.splice(observers.indexOf(observer), 1);
// If there's no more listeners, do cleanup
if (observers.length === 0) {
clearTimeout(timeoutId);
}
}
};
};
}
// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer, arr, idx) {
return setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(observer, arr, ++idx);
}
}, 1000);
}
// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber());
// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
multicastSequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 1500);
// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished
雖然支持多播的可觀察對象需要做更多的準備工作,但對某些應用來說,這非常有用。稍后我們會介紹一些簡化多播的工具,它們讓你能接收任何可觀察對象,并把它變成支持多播的。
由于可觀察對象會異步生成值,所以用 try/catch
是無法捕獲錯誤的。你應該在觀察者中指定一個 error
回調(diào)來處理錯誤。發(fā)生錯誤時還會導致可觀察對象清理現(xiàn)有的訂閱,并且停止生成值??捎^察對象可以生成值(調(diào)用 next
回調(diào)),也可以調(diào)用 complete
或 error
回調(diào)來主動結束。
myObservable.subscribe({
next(num) { console.log('Next num: ' + num)},
error(err) { console.log('Received an errror: ' + err)}
});
更多建議: