From 3bbeae47f38769b1cedbe9385a74fa476a948933 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 16 Jul 2024 18:20:55 +0200 Subject: [PATCH] refactor(core): Suppress `MaxListenersExceededWarning` in the logs (#10077) --- packages/cli/src/Queue.ts | 22 ++++++++++++++++++++-- packages/cli/src/commands/worker.ts | 6 ++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 8fd435100..11cfed839 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -35,6 +35,24 @@ export interface WebhookResponse { export class Queue { private jobQueue: JobQueue; + /** + * The number of jobs a single server can process concurrently + * Any worker that wants to process executions must first set this to a non-zero value + */ + private concurrency = 0; + + setConcurrency(concurrency: number) { + this.concurrency = concurrency; + // This sets the max event listeners on the jobQueue EventEmitter to prevent the logs getting flooded with MaxListenersExceededWarning + // see: https://github.com/OptimalBits/bull/blob/develop/lib/job.js#L497-L521 + this.jobQueue.setMaxListeners( + 4 + // `close` + 2 + // `error` + 2 + // `global:progress` + concurrency * 2, // 2 global events for every call to `job.finished()` + ); + } + constructor(private activeExecutions: ActiveExecutions) {} async init() { @@ -102,8 +120,8 @@ export class Queue { return new Set(inProgressJobs.map((job) => job.data.executionId)); } - async process(concurrency: number, fn: Bull.ProcessCallbackFunction): Promise { - return await this.jobQueue.process(concurrency, fn); + async process(fn: Bull.ProcessCallbackFunction): Promise { + return await this.jobQueue.process(this.concurrency, fn); } async ping(): Promise { diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 18db29659..4f582904c 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -320,11 +320,9 @@ export class Worker extends BaseCommand { const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; + Worker.jobQueue.setConcurrency(concurrency); - void Worker.jobQueue.process( - concurrency, - async (job) => await this.runJob(job, this.nodeTypes), - ); + void Worker.jobQueue.process(async (job) => await this.runJob(job, this.nodeTypes)); Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => { // Progress of a job got updated which does get used