Angular9 Observable 概覽

2020-07-02 14:49 更新

使用可觀察對象(Observable)來傳遞值

可觀察對象對在應用的各個部分之間傳遞消息提供了支持。 它們在 Angular 中頻繁使用,并且推薦把它們用于事件處理、異步編程以及處理多個值等場景。

觀察者(Observer)模式是一個軟件設計模式,它有一個對象,稱之為主體 Subject,負責維護一個依賴項(稱之為觀察者 Observer)的列表,并且在狀態(tài)變化時自動通知它們。 該模式和發(fā)布/訂閱模式非常相似(但不完全一樣)。

可觀察對象是聲明式的 —— 也就是說,雖然你定義了一個用于發(fā)布值的函數(shù),但是在有消費者訂閱它之前,這個函數(shù)并不會實際執(zhí)行。 訂閱之后,當這個函數(shù)執(zhí)行完或取消訂閱時,訂閱者就會收到通知。

可觀察對象可以發(fā)送多個任意類型的值 —— 字面量、消息、事件。無論這些值是同步發(fā)送的還是異步發(fā)送的,接收這些值的 API 都是一樣的。 由于準備(setup)和清場(teardown)的邏輯都是由可觀察對象自己處理的,因此你的應用代碼只管訂閱并消費這些值就可以了,做完之后,取消訂閱。無論這個流是擊鍵流、HTTP 響應流還是定時器,對這些值進行監(jiān)聽和停止監(jiān)聽的接口都是一樣的。

由于這些優(yōu)點,可觀察對象在 Angular 中得到廣泛使用,也同樣建議應用開發(fā)者好好使用它。

基本用法和詞匯

作為發(fā)布者,你創(chuàng)建一個 Observable 的實例,其中定義了一個訂閱者(subscriber)函數(shù)。 當有消費者調(diào)用 subscribe() 方法時,這個函數(shù)就會執(zhí)行。 訂閱者函數(shù)用于定義“如何獲取或生成那些要發(fā)布的值或消息”。

要執(zhí)行所創(chuàng)建的可觀察對象,并開始從中接收通知,你就要調(diào)用它的 subscribe() 方法,并傳入一個觀察者(observer)。 這是一個 JavaScript 對象,它定義了你收到的這些消息的處理器(handler)。 subscribe() 調(diào)用會返回一個 Subscription 對象,該對象具有一個 unsubscribe() 方法。 當調(diào)用該方法時,你就會停止接收通知。

下面這個例子中示范了這種基本用法,它展示了如何使用可觀察對象來對當前地理位置進行更新。

//Observe geolocation updates


// Create an Observable that will start listening to geolocation updates
// when a consumer subscribes.
const locations = new Observable((observer) => {
  let watchId: number;


  // Simple geolocation API check provides values to publish
  if ('geolocation' in navigator) {
    watchId = navigator.geolocation.watchPosition((position: Position) => {
      observer.next(position);
    }, (error: PositionError) => {
      observer.error(error);
    });
  } else {
    observer.error('Geolocation not available');
  }


  // When the consumer unsubscribes, clean up data ready for next subscription.
  return {
    unsubscribe() {
      navigator.geolocation.clearWatch(watchId);
    }
  };
});


// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
  next(position) {
    console.log('Current Position: ', position);
  },
  error(msg) {
    console.log('Error Getting Location: ', msg);
  }
});


// Stop listening for location after 10 seconds
setTimeout(() => {
  locationsSubscription.unsubscribe();
}, 10000);

定義觀察者

用于接收可觀察對象通知的處理器要實現(xiàn) Observer 接口。這個對象定義了一些回調(diào)函數(shù)來處理可觀察對象可能會發(fā)來的三種通知:

通知類型 說明
next 必要。用來處理每個送達值。在開始執(zhí)行后可能執(zhí)行零次或多次。
error 可選。用來處理錯誤通知。錯誤會中斷這個可觀察對象實例的執(zhí)行過程。
complete 可選。用來處理執(zhí)行完畢(complete)通知。當執(zhí)行完畢后,這些值就會繼續(xù)傳給下一個處理器。

觀察者對象可以定義這三種處理器的任意組合。如果你不為某種通知類型提供處理器,這個觀察者就會忽略相應類型的通知。

訂閱

只有當有人訂閱 Observable 的實例時,它才會開始發(fā)布值。 訂閱時要先調(diào)用該實例的 subscribe() 方法,并把一個觀察者對象傳給它,用來接收通知。

為了展示訂閱的原理,我們需要創(chuàng)建新的可觀察對象。它有一個構造函數(shù)可以用來創(chuàng)建新實例,但是為了更簡明,也可以使用 Observable 上定義的一些靜態(tài)方法來創(chuàng)建一些常用的簡單可觀察對象:

- `of(...items)` —— 返回一個 `Observable` 實例,它用同步的方式把參數(shù)中提供的這些值發(fā)送出來。

- `from(iterable)` —— 把它的參數(shù)轉換成一個 `Observable` 實例。 該方法通常用于把一個數(shù)組轉換成一個(發(fā)送多個值的)可觀察對象。

下面的例子會創(chuàng)建并訂閱一個簡單的可觀察對象,它的觀察者會把接收到的消息記錄到控制臺中:

//Subscribe using observer


// Create simple observable that emits three values
const myObservable = of(1, 2, 3);


// Create observer object
const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};


// Execute with the observer object
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

另外,subscribe() 方法還可以接收定義在同一行中的回調(diào)函數(shù),無論 next、error 還是 complete 處理器。比如,下面的 subscribe() 調(diào)用和前面指定預定義觀察者的例子是等價的。

//Subscribe with positional arguments


myObservable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

無論哪種情況,next 處理器都是必要的,而 errorcomplete 處理器是可選的。

注意,next() 函數(shù)可以接受消息字符串、事件對象、數(shù)字值或各種結構,具體類型取決于上下文。 為了更通用一點,我們把由可觀察對象發(fā)布出來的數(shù)據(jù)統(tǒng)稱為流。任何類型的值都可以表示為可觀察對象,而這些值會被發(fā)布為一個流。

創(chuàng)建可觀察對象

使用 Observable 構造函數(shù)可以創(chuàng)建任何類型的可觀察流。 當執(zhí)行可觀察對象的 subscribe() 方法時,這個構造函數(shù)就會把它接收到的參數(shù)作為訂閱函數(shù)來運行。 訂閱函數(shù)會接收一個 Observer 對象,并把值發(fā)布給觀察者的 next() 方法。

比如,要創(chuàng)建一個與前面的 of(1, 2, 3) 等價的可觀察對象,你可以這樣做:

//Create observable with constructor


// This function runs when subscribe() is called
function sequenceSubscriber(observer) {
  // synchronously deliver 1, 2, and 3, then complete
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();


  // unsubscribe function doesn't need to do anything in this
  // because values are delivered synchronously
  return {unsubscribe() {}};
}


// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);


// execute the Observable and print the result of each notification
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});


// Logs:
// 1
// 2
// 3
// Finished sequence

如果要略微加強這個例子,我們可以創(chuàng)建一個用來發(fā)布事件的可觀察對象。在這個例子中,訂閱函數(shù)是用內(nèi)聯(lián)方式定義的。

//Create with custom fromEvent function


function fromEvent(target, eventName) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);


    // Add the event handler to the target
    target.addEventListener(eventName, handler);


    return () => {
      // Detach the event handler from the target
      target.removeEventListener(eventName, handler);
    };
  });
}

現(xiàn)在,你就可以使用這個函數(shù)來創(chuàng)建可發(fā)布 keydown 事件的可觀察對象了:

//Use custom fromEvent function


const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;


const subscription = fromEvent(nameInput, 'keydown')
  .subscribe((e: KeyboardEvent) => {
    if (e.keyCode === ESC_KEY) {
      nameInput.value = '';
    }
  });

多播

典型的可觀察對象會為每一個觀察者創(chuàng)建一次新的、獨立的執(zhí)行。 當觀察者進行訂閱時,該可觀察對象會連上一個事件處理器,并且向那個觀察者發(fā)送一些值。當?shù)诙€觀察者訂閱時,這個可觀察對象就會連上一個新的事件處理器,并獨立執(zhí)行一次,把這些值發(fā)送給第二個可觀察對象。

有時候,不應該對每一個訂閱者都獨立執(zhí)行一次,你可能會希望每次訂閱都得到同一批值 —— 即使是那些你已經(jīng)發(fā)送過的。這在某些情況下有用,比如用來發(fā)送 document 上的點擊事件的可觀察對象。

多播用來讓可觀察對象在一次執(zhí)行中同時廣播給多個訂閱者。借助支持多播的可觀察對象,你不必注冊多個監(jiān)聽器,而是復用第一個(next)監(jiān)聽器,并且把值發(fā)送給各個訂閱者。

當創(chuàng)建可觀察對象時,你要決定你希望別人怎么用這個對象以及是否對它的值進行多播。

來看一個從 1 到 3 進行計數(shù)的例子,它每發(fā)出一個數(shù)字就會等待 1 秒。

//Create a delayed sequence


function sequenceSubscriber(observer) {
  const seq = [1, 2, 3];
  let timeoutId;


  // Will run through an array of numbers, emitting one value
  // per second until it gets to the end of the array.
  function doSequence(arr, idx) {
    timeoutId = setTimeout(() => {
      observer.next(arr[idx]);
      if (idx === arr.length - 1) {
        observer.complete();
      } else {
        doSequence(arr, ++idx);
      }
    }, 1000);
  }


  doSequence(seq, 0);


  // Unsubscribe should clear the timeout to stop execution
  return {unsubscribe() {
    clearTimeout(timeoutId);
  }};
}


// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);


sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});


// Logs:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence

注意,如果你訂閱了兩次,就會有兩個獨立的流,每個流都會每秒發(fā)出一個數(shù)字。代碼如下:

//Two subscriptions


// Subscribe starts the clock, and will emit after 1 second
sequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});


// After 1/2 second, subscribe again.
setTimeout(() => {
  sequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 500);


// Logs:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished

修改這個可觀察對象以支持多播,代碼如下:

//Create a multicast subscriber


function multicastSequenceSubscriber() {
  const seq = [1, 2, 3];
  // Keep track of each observer (one for every active subscription)
  const observers = [];
  // Still a single timeoutId because there will only ever be one
  // set of values being generated, multicasted to each subscriber
  let timeoutId;


  // Return the subscriber function (runs when subscribe()
  // function is invoked)
  return (observer) => {
    observers.push(observer);
    // When this is the first subscription, start the sequence
    if (observers.length === 1) {
      timeoutId = doSequence({
        next(val) {
          // Iterate through observers and notify all subscriptions
          observers.forEach(obs => obs.next(val));
        },
        complete() {
          // Notify all complete callbacks
          observers.slice(0).forEach(obs => obs.complete());
        }
      }, seq, 0);
    }


    return {
      unsubscribe() {
        // Remove from the observers array so it's no longer notified
        observers.splice(observers.indexOf(observer), 1);
        // If there's no more listeners, do cleanup
        if (observers.length === 0) {
          clearTimeout(timeoutId);
        }
      }
    };
  };
}


// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer, arr, idx) {
  return setTimeout(() => {
    observer.next(arr[idx]);
    if (idx === arr.length - 1) {
      observer.complete();
    } else {
      doSequence(observer, arr, ++idx);
    }
  }, 1000);
}


// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber());


// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});


// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
  multicastSequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 1500);


// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished

雖然支持多播的可觀察對象需要做更多的準備工作,但對某些應用來說,這非常有用。稍后我們會介紹一些簡化多播的工具,它們讓你能接收任何可觀察對象,并把它變成支持多播的。

錯誤處理

由于可觀察對象會異步生成值,所以用 try/catch 是無法捕獲錯誤的。你應該在觀察者中指定一個 error 回調(diào)來處理錯誤。發(fā)生錯誤時還會導致可觀察對象清理現(xiàn)有的訂閱,并且停止生成值??捎^察對象可以生成值(調(diào)用 next 回調(diào)),也可以調(diào)用 completeerror 回調(diào)來主動結束。

myObservable.subscribe({
  next(num) { console.log('Next num: ' + num)},
  error(err) { console.log('Received an errror: ' + err)}
});
以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號