隊列是一種有用的設(shè)計模式,可以幫助你處理一般應(yīng)用規(guī)模和性能的挑戰(zhàn)。一些隊列可以幫助你處理的問題示例包括:
Nest 提供了@nestjs/bull包,這是Bull包的一個包裝器,Bull 是一個流行的、支持良好的、高性能的基于 Nodejs 的消息隊列系統(tǒng)應(yīng)用。該包將 Bull 隊列以 Nest 友好的方式添加到你的應(yīng)用中。
Bull 使用Redis持久化工作數(shù)據(jù),因此你需要在你的系統(tǒng)中安裝 Redis。因為他是基于 Redis 的,你的隊列結(jié)構(gòu)可以是完全分布式的并且和平臺無關(guān)。例如,你可以有一些隊列生產(chǎn)者、消費者和監(jiān)聽者,他們運行在 Nest 的一個或多個節(jié)點上,同時,其他生產(chǎn)者、消費者和監(jiān)聽者在其他 Node.js 平臺或者其他網(wǎng)絡(luò)節(jié)點上。
本章使用@nestjs/bull包,我們同時推薦閱讀BUll 文檔來獲取更多背景和應(yīng)用細(xì)節(jié)。
要開始使用,我們首先安裝需要的依賴:
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
一旦安裝過程完成,我們可以在根AppModule中導(dǎo)入BullModule。
app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
registerQueue()方法用于實例化并/或注冊隊列。隊列在不同的模塊和進程之間共享,在底層則通過同樣的憑據(jù)連接到同樣的 Redis 數(shù)據(jù)庫。每個隊列由其name屬性區(qū)分(如下),當(dāng)共享隊列(跨模塊/進程)時,第一個registerQueue()方法同時實例化該隊列并向模塊注冊它。其他模塊(在相同或者不同進程下)則簡單地注冊隊列。隊列注冊創(chuàng)建一個injection token,它可以被用在給定 Nest 模塊中獲取隊列。
針對每個隊列,傳遞一個包含下列屬性的配置對象:
-name:string- 一個隊列名稱,它可以被用作injection token(用于將隊列注冊到控制器/提供者),也可以作為裝飾器參數(shù)來將消費者類和監(jiān)聽者與隊列聯(lián)系起來。是必須的。 -limiter:RateLimiter-該選項用于確定消息隊列處理速率,查看RateLimiter獲取更多信息??蛇x的。 -redis:RedisOpts-該選項用于配置 Redis 連接,查看RedisOpts獲取更多信息??蛇x的。 -prefix: string-隊列所有鍵的前綴。可選的。 -defaultJobOptions: JobOpts-選項用以控制新任務(wù)的默認(rèn)屬性。查看JobOpts獲取更多信息??蛇x的。 -settings: AdvancedSettings-高級隊列配置設(shè)置。這些通常不需要改變。查看AdvancedSettings獲取更多信息??蛇x的。
注意,name屬性是必須的。其他選項是可選的,為隊列行為提供更細(xì)節(jié)的控制。這些會直接傳遞給 Bull 的Queue構(gòu)造器。在這里閱讀更多選項。當(dāng)在第二個或者子模塊中注冊一個隊列時,最佳時間是省略配置對象中除name屬性之外的所有選項。這些選項僅應(yīng)該在實例化隊列的模塊中確定。
在registerQueue()方法中傳遞多個逗號分隔的選項對象來創(chuàng)建多個隊列。
由于任務(wù)在 Redis 中是持久化的,每次當(dāng)一個特定名稱的隊列被實例化時(例如,當(dāng)一個 app 啟動/重啟時),它嘗試處理任何可能在前一個舊的任務(wù)遺留未完成的session。
每個隊里可能有一個或很多生產(chǎn)者、消費者以及監(jiān)聽者。消費者從一個特定命令隊列中獲取任務(wù):FIFO(默認(rèn),先進先出),LIFO(后進先出)或者依據(jù)優(yōu)先級。
控制隊列處理命令在這里討論。
任務(wù)生產(chǎn)者添加任務(wù)到隊列中。生產(chǎn)者是典型的應(yīng)用服務(wù)(Nest 提供者)。要添加工作到一個隊列,首先注冊隊列到服務(wù)中:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
@InjectQueue()裝飾器由其名稱指定隊列,像它在registerQueue()方法中提供的那樣(例如,audio)。
現(xiàn)在,通過調(diào)用隊列的add()方法添加一個任務(wù),傳遞一個用戶定義的任務(wù)對象。任務(wù)表現(xiàn)為序列化的JavaScript對象(因為它們被存儲在 Redis 數(shù)據(jù)庫中)。你傳遞的任務(wù)形式是可選的;用它來在語義上表示你任務(wù)對象:
const job = await this.audioQueue.add({
foo: 'bar',
});
任務(wù)需要獨一無二的名字。這允許你創(chuàng)建專用的消費者,這將僅處理給定名稱的處理任務(wù)。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
當(dāng)使用命名任務(wù)時,你必須為每個添加到隊列中的特有名稱創(chuàng)建處理者,否則隊列會反饋缺失了給定任務(wù)的處理器。查看這里閱讀更多關(guān)于消費命名任務(wù)的信息。
任務(wù)可以包括附加選項。在Quene.add()方法的job參數(shù)之后傳遞選項對象。任務(wù)選項屬性有:
這里是一些帶有任務(wù)選項的自定義任務(wù)示例。
要延遲任務(wù)的開始,使用delay配置屬性:
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 } // 3 seconds delayed
);
要從右端添加任務(wù)到隊列(以 LIFO(后進先出)處理任務(wù)),設(shè)置配置對象的lifo屬性為true。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true }
);
要優(yōu)先一個任務(wù),使用priority屬性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 }
);
消費者是一個類,定義的方法要么處理添加到隊列中的任務(wù),要么監(jiān)聽隊列的事件,或者兩者皆有。使用@Processor()裝飾器來定義消費者類,如下:
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}
裝飾器的字符串參數(shù)(例如,audio)是和類方法關(guān)聯(lián)的隊列名稱。
在消費者類中,使用@Process()裝飾器來裝飾任務(wù)處理者。
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress);
}
return {};
}
}
裝飾器方法(例如transcode()) 在工作空閑或者隊列中有消息要處理的時候被調(diào)用。該處理器方法接受job對象作為其僅有的參數(shù)。處理器方法的返回值被保存在任務(wù)對象中,可以在之后被訪問,例如,在用于完成事件的監(jiān)聽者中。
Job對象有多個方法,允許你和他們的狀態(tài)交互。例如,上述代碼使用progress()方法來更新工作進程。查看這里以了解完整的Job對象 API 參照。
你可以指定一個任務(wù)處理方法,僅處理指定類型(包含特定name的任務(wù))的任務(wù),這可以通過如下所述的將name傳遞給@Process()裝飾器完成。你在一個給定消費者類中可以有多個@Process()處理器,以反應(yīng)每個任務(wù)類型(name),確保每個name有相應(yīng)的處理者。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
當(dāng)隊列和/或任務(wù)狀態(tài)改變時,Bull生成一個有用的事件集合。Nest 提供了一個裝飾器集合,允許訂閱一系列標(biāo)準(zhǔn)核心事件集合。他們從@nestjs/bull包中導(dǎo)出。
事件監(jiān)聽者必須在一個消費者類中聲明(通過@Processor()裝飾器)。要監(jiān)聽一個事件,使用如下表格之一的裝飾器來聲明一個事件處理器。例如,當(dāng)一個任務(wù)進入audio隊列活躍狀態(tài)時,要監(jiān)聽其發(fā)射的事件,使用下列結(jié)構(gòu):
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
鑒于 BUll 運行于分布式(多 node)環(huán)境,它定義了本地事件概念。該概念可以辨識出一個由完整的單一進程觸發(fā)的事件,或者由不同進程共享的隊列。一個本地事件是指在本地進程中觸發(fā)的一個隊列行為或者狀態(tài)變更。換句話說,當(dāng)你的事件生產(chǎn)者和消費者是本地單進程時,隊列中所有事件都是本地的。
當(dāng)一個隊列在多個進程中共享時,我們可能要遇到全局事件。對一個由其他進程觸發(fā)的事件通知器進程的監(jiān)聽者來說,它必須注冊為全局事件。
當(dāng)相應(yīng)事件發(fā)射時事件處理器被喚醒。該處理器被下表所示的簽名調(diào)用,提供訪問事件相關(guān)的信息。我們討論下面簽名中本地和全局事件處理器。
本地事件監(jiān)聽者 | 全局事件監(jiān)聽者 | 處理器方法簽名/當(dāng)觸發(fā)時 |
---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 當(dāng)錯誤發(fā)生時,error 包括觸發(fā)錯誤 |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - 一旦工作者空閑就等待執(zhí)行的任務(wù),jobId 包括進入此狀態(tài)的 id |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-job 任務(wù)已啟動 |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-job 任務(wù)被標(biāo)記為延遲。這在時間循環(huán)崩潰或暫停時進行調(diào)試工作時是很有效的 |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-job 任務(wù)進程被更新為progress 值 |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) job 任務(wù)進程成功以result 結(jié)束 |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)job 任務(wù)以err 原因失敗 |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler()隊列被暫停 |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)隊列被恢復(fù) |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 舊任務(wù)從隊列中被清理,job 是一個清理任務(wù)數(shù)組,type 是要清理的任務(wù)類型 |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler()在隊列處理完所有等待的任務(wù)(除非有些尚未處理的任務(wù)被延遲)時發(fā)射出 |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)job 任務(wù)被成功移除 |
當(dāng)監(jiān)聽全局事件時,簽名方法可能和本地有一點不同。特別地,本地版本的任何方法簽名接受job對象的方法簽名而不是全局版本的jobId(number)。要在這種情況下獲取實際的job對象的引用,使用Queue#getJob方法。這種調(diào)用可能需要等待,因此處理者應(yīng)該被聲明為async,例如:
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
要獲取一個Queue對象(使用getJob()調(diào)用),你當(dāng)然必須注入它。同時,隊列必須注冊到你要注入的模塊中。
在特定事件監(jiān)聽器裝飾器之外,你可以使用通用的@OnQueueEvent()裝飾器與BullQueueEvents或者BullQueueGlobalEvents枚舉相結(jié)合。在這里閱讀更多有關(guān)事件的內(nèi)容。
隊列有一個 API 來實現(xiàn)管理功能比如暫停、恢復(fù)、檢索不同狀態(tài)的任務(wù)數(shù)量等。你可以在這里找到完整的隊列 API。直接在Queue對象上調(diào)用這些方法,如下所示的暫停/恢復(fù)示例。
使用pause()方法調(diào)用來暫停隊列。一個暫停的隊列在恢復(fù)前將不會處理新的任務(wù),但會繼續(xù)處理完當(dāng)前執(zhí)行的任務(wù)。
await audioQueue.pause();
要恢復(fù)一個暫停的隊列,使用resume()方法,如下:
await audioQueue.resume();
你可能需要異步而不是靜態(tài)傳遞隊列選項。在這種情況下,使用registerQueueAsync()方法,可以提供不同的異步配置方法。
一個方法是使用工廠函數(shù):
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});
我們的工廠函數(shù)方法和其他異步提供者(它可以是async的并可以使用inject來注入)方法相同。
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
可選的,你可以使用useClass語法。
BullModule.registerQueueAsync({
name: 'audio',
useClass: BullConfigService,
});
上述結(jié)構(gòu)在BullModule中實例化BullConfigService,并通過調(diào)用createBullOptions()來用它提供一個選項對象。注意這意味著BullConfigService要實現(xiàn)BullOptionsFactory工廠接口,如下:
@Injectable()
class BullConfigService implements BullOptionsFactory {
createBullOptions(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}
要阻止在BullModule中創(chuàng)建BullConfigService并使用一個從其他模塊導(dǎo)入的提供者,可以使用useExisting語法。
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useExisting: ConfigService,
});
這個結(jié)構(gòu)和useClass有一個根本區(qū)別——BullModule將查找導(dǎo)入的模塊來重用現(xiàn)有的ConfigServie而不是實例化一個新的。
一個可用的示例見這里。
更多建議: