NestJS 隊列

2023-09-08 17:41 更新

隊列是一種有用的設(shè)計模式,可以幫助你處理一般應(yīng)用規(guī)模和性能的挑戰(zhàn)。一些隊列可以幫助你處理的問題示例包括:

  • 平滑輸出峰值。例如,如果用戶可以在任何時間創(chuàng)建資源敏感型任務(wù),你可以將其添加到一個消息隊列中而不是同步執(zhí)行。然后你可以通過工作者進程從隊列中以一個可控的方式取出進程。在應(yīng)用規(guī)模增大時,你可以輕松添加新的隊列消費者來提高后端任務(wù)處理能力。
  • 將可能阻塞Node.js事件循環(huán)的整體任務(wù)打碎。例如,如果一個用戶請求是 CPU 敏感型工作,例如音頻轉(zhuǎn)碼,你可以將其委托給其他進程,從而保證用戶接口進程保持響應(yīng)。
  • 在不同的服務(wù)間提供一個可信的通訊通道。例如,你可以將任務(wù)(工作)加入一個進程或服務(wù),并由另一個進程或服務(wù)來消費他們。你可以在由其他任何進程或服務(wù)執(zhí)行的工作完成、錯誤或者其他狀態(tài)變化時得到通知(通過監(jiān)聽狀態(tài)事件)。當(dāng)隊列生產(chǎn)者或者消費者失敗時,他們的狀態(tài)會被保留,任務(wù)將在 node 重啟后自動重啟。

Nest 提供了@nestjs/bull包,這是Bull包的一個包裝器,Bull 是一個流行的、支持良好的、高性能的基于 Nodejs 的消息隊列系統(tǒng)應(yīng)用。該包將 Bull 隊列以 Nest 友好的方式添加到你的應(yīng)用中。

Bull 使用Redis持久化工作數(shù)據(jù),因此你需要在你的系統(tǒng)中安裝 Redis。因為他是基于 Redis 的,你的隊列結(jié)構(gòu)可以是完全分布式的并且和平臺無關(guān)。例如,你可以有一些隊列生產(chǎn)者消費者監(jiān)聽者,他們運行在 Nest 的一個或多個節(jié)點上,同時,其他生產(chǎn)者、消費者和監(jiān)聽者在其他 Node.js 平臺或者其他網(wǎng)絡(luò)節(jié)點上。

本章使用@nestjs/bull包,我們同時推薦閱讀BUll 文檔來獲取更多背景和應(yīng)用細(xì)節(jié)。

安裝

要開始使用,我們首先安裝需要的依賴:

$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull

一旦安裝過程完成,我們可以在根AppModule中導(dǎo)入BullModule。

app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}

registerQueue()方法用于實例化并/或注冊隊列。隊列在不同的模塊和進程之間共享,在底層則通過同樣的憑據(jù)連接到同樣的 Redis 數(shù)據(jù)庫。每個隊列由其name屬性區(qū)分(如下),當(dāng)共享隊列(跨模塊/進程)時,第一個registerQueue()方法同時實例化該隊列并向模塊注冊它。其他模塊(在相同或者不同進程下)則簡單地注冊隊列。隊列注冊創(chuàng)建一個injection token,它可以被用在給定 Nest 模塊中獲取隊列。

針對每個隊列,傳遞一個包含下列屬性的配置對象:

-name:string- 一個隊列名稱,它可以被用作injection token(用于將隊列注冊到控制器/提供者),也可以作為裝飾器參數(shù)來將消費者類和監(jiān)聽者與隊列聯(lián)系起來。是必須的。 -limiter:RateLimiter-該選項用于確定消息隊列處理速率,查看RateLimiter獲取更多信息??蛇x的。 -redis:RedisOpts-該選項用于配置 Redis 連接,查看RedisOpts獲取更多信息??蛇x的。 -prefix: string-隊列所有鍵的前綴。可選的。 -defaultJobOptions: JobOpts-選項用以控制新任務(wù)的默認(rèn)屬性。查看JobOpts獲取更多信息??蛇x的。 -settings: AdvancedSettings-高級隊列配置設(shè)置。這些通常不需要改變。查看AdvancedSettings獲取更多信息??蛇x的。

注意,name屬性是必須的。其他選項是可選的,為隊列行為提供更細(xì)節(jié)的控制。這些會直接傳遞給 Bull 的Queue構(gòu)造器。在這里閱讀更多選項。當(dāng)在第二個或者子模塊中注冊一個隊列時,最佳時間是省略配置對象中除name屬性之外的所有選項。這些選項僅應(yīng)該在實例化隊列的模塊中確定。

在registerQueue()方法中傳遞多個逗號分隔的選項對象來創(chuàng)建多個隊列。

由于任務(wù)在 Redis 中是持久化的,每次當(dāng)一個特定名稱的隊列被實例化時(例如,當(dāng)一個 app 啟動/重啟時),它嘗試處理任何可能在前一個舊的任務(wù)遺留未完成的session。

每個隊里可能有一個或很多生產(chǎn)者、消費者以及監(jiān)聽者。消費者從一個特定命令隊列中獲取任務(wù):FIFO(默認(rèn),先進先出),LIFO(后進先出)或者依據(jù)優(yōu)先級。

控制隊列處理命令在這里討論。

生產(chǎn)者

任務(wù)生產(chǎn)者添加任務(wù)到隊列中。生產(chǎn)者是典型的應(yīng)用服務(wù)(Nest 提供者)。要添加工作到一個隊列,首先注冊隊列到服務(wù)中:

import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}

@InjectQueue()裝飾器由其名稱指定隊列,像它在registerQueue()方法中提供的那樣(例如,audio)。

現(xiàn)在,通過調(diào)用隊列的add()方法添加一個任務(wù),傳遞一個用戶定義的任務(wù)對象。任務(wù)表現(xiàn)為序列化的JavaScript對象(因為它們被存儲在 Redis 數(shù)據(jù)庫中)。你傳遞的任務(wù)形式是可選的;用它來在語義上表示你任務(wù)對象:

const job = await this.audioQueue.add({
  foo: 'bar',
});

命名的任務(wù)

任務(wù)需要獨一無二的名字。這允許你創(chuàng)建專用的消費者,這將僅處理給定名稱的處理任務(wù)。

const job = await this.audioQueue.add('transcode', {
  foo: 'bar',
});

當(dāng)使用命名任務(wù)時,你必須為每個添加到隊列中的特有名稱創(chuàng)建處理者,否則隊列會反饋缺失了給定任務(wù)的處理器。查看這里閱讀更多關(guān)于消費命名任務(wù)的信息。

任務(wù)選項

任務(wù)可以包括附加選項。在Quene.add()方法的job參數(shù)之后傳遞選項對象。任務(wù)選項屬性有:

  • priority: number-選項優(yōu)先級值。范圍從 1(最高優(yōu)先)到 MAX_INT(最低優(yōu)先)。注意使用屬性對性能有輕微影響,因此要小心使用。
  • delay: number- 任務(wù)執(zhí)行前等待的時間(毫秒)。注意,為了精確延時,服務(wù)端和客戶端時鐘應(yīng)該同步。
  • attempts: number-任務(wù)結(jié)束前總的嘗試次數(shù)。
  • repeat: RepeatOpts-按照定時設(shè)置重復(fù)任務(wù)記錄,查看RepeatOpts。
  • backoff: number | BackoffOpts- 如果任務(wù)失敗,自動重試閃避設(shè)置,查看BackoffOpts。
  • lifo: boolean-如果為true,從隊列右端添加任務(wù)以替代從左邊添加(默認(rèn)為 false)。
  • timeout: number-任務(wù)超時失敗的毫秒數(shù)。
  • jobId: number | string- 覆蓋任務(wù) ID-默認(rèn)地,任務(wù) ID 是唯一的整數(shù),但你可以使用該參數(shù)覆蓋它。如果你使用這個選項,你需要保證jobId是唯一的。如果你嘗試添加一個包含已有 id 的任務(wù),它不會被添加。
  • removeOnComplete: boolean | number-如果為true,當(dāng)任務(wù)完成時移除任務(wù)。一個數(shù)字用來指定要保存的任務(wù)數(shù)。默認(rèn)行為是將完成的工作保存在已完成的設(shè)置中。
  • removeOnFail: boolean | number-如果為true,當(dāng)所有嘗試失敗時移除任務(wù)。一個數(shù)字用來指定要保存的任務(wù)數(shù)。默認(rèn)行為是將失敗的任務(wù)保存在已失敗的設(shè)置中。
  • stackTraceLimit: number-限制在stacktrace中保存的堆棧跟蹤線。

這里是一些帶有任務(wù)選項的自定義任務(wù)示例。

要延遲任務(wù)的開始,使用delay配置屬性:

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { delay: 3000 } // 3 seconds delayed
);

要從右端添加任務(wù)到隊列(以 LIFO(后進先出)處理任務(wù)),設(shè)置配置對象的lifo屬性為true。

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { lifo: true }
);

要優(yōu)先一個任務(wù),使用priority屬性。

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { priority: 2 }
);

消費者

消費者是一個類,定義的方法要么處理添加到隊列中的任務(wù),要么監(jiān)聽隊列的事件,或者兩者皆有。使用@Processor()裝飾器來定義消費者類,如下:

import { Processor } from '@nestjs/bull';

@Processor('audio')
export class AudioConsumer {}

裝飾器的字符串參數(shù)(例如,audio)是和類方法關(guān)聯(lián)的隊列名稱。

在消費者類中,使用@Process()裝飾器來裝飾任務(wù)處理者。

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {
  @Process()
  async transcode(job: Job<unknown>) {
    let progress = 0;
    for (i = 0; i < 100; i++) {
      await doSomething(job.data);
      progress += 10;
      job.progress(progress);
    }
    return {};
  }
}

裝飾器方法(例如transcode()) 在工作空閑或者隊列中有消息要處理的時候被調(diào)用。該處理器方法接受job對象作為其僅有的參數(shù)。處理器方法的返回值被保存在任務(wù)對象中,可以在之后被訪問,例如,在用于完成事件的監(jiān)聽者中。

Job對象有多個方法,允許你和他們的狀態(tài)交互。例如,上述代碼使用progress()方法來更新工作進程。查看這里以了解完整的Job對象 API 參照。

你可以指定一個任務(wù)處理方法,僅處理指定類型(包含特定name的任務(wù))的任務(wù),這可以通過如下所述的將name傳遞給@Process()裝飾器完成。你在一個給定消費者類中可以有多個@Process()處理器,以反應(yīng)每個任務(wù)類型(name),確保每個name有相應(yīng)的處理者。

@Process('transcode')
async transcode(job: Job<unknown>) { ... }

事件監(jiān)聽者

當(dāng)隊列和/或任務(wù)狀態(tài)改變時,Bull生成一個有用的事件集合。Nest 提供了一個裝飾器集合,允許訂閱一系列標(biāo)準(zhǔn)核心事件集合。他們從@nestjs/bull包中導(dǎo)出。

事件監(jiān)聽者必須在一個消費者類中聲明(通過@Processor()裝飾器)。要監(jiān)聽一個事件,使用如下表格之一的裝飾器來聲明一個事件處理器。例如,當(dāng)一個任務(wù)進入audio隊列活躍狀態(tài)時,要監(jiān)聽其發(fā)射的事件,使用下列結(jié)構(gòu):

import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('audio')
export class AudioConsumer {

  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }

鑒于 BUll 運行于分布式(多 node)環(huán)境,它定義了本地事件概念。該概念可以辨識出一個由完整的單一進程觸發(fā)的事件,或者由不同進程共享的隊列。一個本地事件是指在本地進程中觸發(fā)的一個隊列行為或者狀態(tài)變更。換句話說,當(dāng)你的事件生產(chǎn)者和消費者是本地單進程時,隊列中所有事件都是本地的。

當(dāng)一個隊列在多個進程中共享時,我們可能要遇到全局事件。對一個由其他進程觸發(fā)的事件通知器進程的監(jiān)聽者來說,它必須注冊為全局事件。

當(dāng)相應(yīng)事件發(fā)射時事件處理器被喚醒。該處理器被下表所示的簽名調(diào)用,提供訪問事件相關(guān)的信息。我們討論下面簽名中本地和全局事件處理器。

本地事件監(jiān)聽者全局事件監(jiān)聽者處理器方法簽名/當(dāng)觸發(fā)時
@OnQueueError()@OnGlobalQueueError()handler(error: Error) - 當(dāng)錯誤發(fā)生時,error包括觸發(fā)錯誤
@OnQueueWaiting()@OnGlobalQueueWaiting()handler(jobId: number | string) - 一旦工作者空閑就等待執(zhí)行的任務(wù),jobId包括進入此狀態(tài)的 id
@OnQueueActive()@OnGlobalQueueActive()handler(job: Job)-job任務(wù)已啟動
@OnQueueStalled()@OnGlobalQueueStalled()handler(job: Job)-job任務(wù)被標(biāo)記為延遲。這在時間循環(huán)崩潰或暫停時進行調(diào)試工作時是很有效的
@OnQueueProgress()@OnGlobalQueueProgress()handler(job: Job, progress: number)-job任務(wù)進程被更新為progress
@OnQueueCompleted()@OnGlobalQueueCompleted()handler(job: Job, result: any) job任務(wù)進程成功以result結(jié)束
@OnQueueFailed()@OnGlobalQueueFailed()handler(job: Job, err: Error)job任務(wù)以err原因失敗
@OnQueuePaused()@OnGlobalQueuePaused()handler()隊列被暫停
@OnQueueResumed()@OnGlobalQueueResumed()handler(job: Job)隊列被恢復(fù)
@OnQueueCleaned()@OnGlobalQueueCleaned()handler(jobs: Job[], type: string) 舊任務(wù)從隊列中被清理,job是一個清理任務(wù)數(shù)組,type是要清理的任務(wù)類型
@OnQueueDrained()@OnGlobalQueueDrained()handler()在隊列處理完所有等待的任務(wù)(除非有些尚未處理的任務(wù)被延遲)時發(fā)射出
@OnQueueRemoved()@OnGlobalQueueRemoved()handler(job: Job)job任務(wù)被成功移除

當(dāng)監(jiān)聽全局事件時,簽名方法可能和本地有一點不同。特別地,本地版本的任何方法簽名接受job對象的方法簽名而不是全局版本的jobId(number)。要在這種情況下獲取實際的job對象的引用,使用Queue#getJob方法。這種調(diào)用可能需要等待,因此處理者應(yīng)該被聲明為async,例如:

@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
  const job = await this.immediateQueue.getJob(jobId);
  console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}

要獲取一個Queue對象(使用getJob()調(diào)用),你當(dāng)然必須注入它。同時,隊列必須注冊到你要注入的模塊中。

在特定事件監(jiān)聽器裝飾器之外,你可以使用通用的@OnQueueEvent()裝飾器與BullQueueEvents或者BullQueueGlobalEvents枚舉相結(jié)合。在這里閱讀更多有關(guān)事件的內(nèi)容。

隊列管理

隊列有一個 API 來實現(xiàn)管理功能比如暫停、恢復(fù)、檢索不同狀態(tài)的任務(wù)數(shù)量等。你可以在這里找到完整的隊列 API。直接在Queue對象上調(diào)用這些方法,如下所示的暫停/恢復(fù)示例。

使用pause()方法調(diào)用來暫停隊列。一個暫停的隊列在恢復(fù)前將不會處理新的任務(wù),但會繼續(xù)處理完當(dāng)前執(zhí)行的任務(wù)。

await audioQueue.pause();

要恢復(fù)一個暫停的隊列,使用resume()方法,如下:

await audioQueue.resume();

異步配置

你可能需要異步而不是靜態(tài)傳遞隊列選項。在這種情況下,使用registerQueueAsync()方法,可以提供不同的異步配置方法。

一個方法是使用工廠函數(shù):

BullModule.registerQueueAsync({
  name: 'audio',
  useFactory: () => ({
    redis: {
      host: 'localhost',
      port: 6379,
    },
  }),
});

我們的工廠函數(shù)方法和其他異步提供者(它可以是async的并可以使用inject來注入)方法相同。

BullModule.registerQueueAsync({
  name: 'audio',
  imports: [ConfigModule],
  useFactory: async (configService: ConfigService) => ({
    redis: {
      host: configService.get('QUEUE_HOST'),
      port: +configService.get('QUEUE_PORT'),
    },
  }),
  inject: [ConfigService],
});

可選的,你可以使用useClass語法。

BullModule.registerQueueAsync({
  name: 'audio',
  useClass: BullConfigService,
});

上述結(jié)構(gòu)在BullModule中實例化BullConfigService,并通過調(diào)用createBullOptions()來用它提供一個選項對象。注意這意味著BullConfigService要實現(xiàn)BullOptionsFactory工廠接口,如下:

@Injectable()
class BullConfigService implements BullOptionsFactory {
  createBullOptions(): BullModuleOptions {
    return {
      redis: {
        host: 'localhost',
        port: 6379,
      },
    };
  }
}

要阻止在BullModule中創(chuàng)建BullConfigService并使用一個從其他模塊導(dǎo)入的提供者,可以使用useExisting語法。

BullModule.registerQueueAsync({
  name: 'audio',
  imports: [ConfigModule],
  useExisting: ConfigService,
});

這個結(jié)構(gòu)和useClass有一個根本區(qū)別——BullModule將查找導(dǎo)入的模塊來重用現(xiàn)有的ConfigServie而不是實例化一個新的。

示例

一個可用的示例見這里


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號