diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 8fd435100..11cfed839 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -35,6 +35,24 @@ export interface WebhookResponse { export class Queue { private jobQueue: JobQueue; + /** + * The number of jobs a single server can process concurrently + * Any worker that wants to process executions must first set this to a non-zero value + */ + private concurrency = 0; + + setConcurrency(concurrency: number) { + this.concurrency = concurrency; + // This sets the max event listeners on the jobQueue EventEmitter to prevent the logs getting flooded with MaxListenersExceededWarning + // see: https://github.com/OptimalBits/bull/blob/develop/lib/job.js#L497-L521 + this.jobQueue.setMaxListeners( + 4 + // `close` + 2 + // `error` + 2 + // `global:progress` + concurrency * 2, // 2 global events for every call to `job.finished()` + ); + } + constructor(private activeExecutions: ActiveExecutions) {} async init() { @@ -102,8 +120,8 @@ export class Queue { return new Set(inProgressJobs.map((job) => job.data.executionId)); } - async process(concurrency: number, fn: Bull.ProcessCallbackFunction): Promise { - return await this.jobQueue.process(concurrency, fn); + async process(fn: Bull.ProcessCallbackFunction): Promise { + return await this.jobQueue.process(this.concurrency, fn); } async ping(): Promise { diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 18db29659..4f582904c 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -320,11 +320,9 @@ export class Worker extends BaseCommand { const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; + Worker.jobQueue.setConcurrency(concurrency); - void Worker.jobQueue.process( - concurrency, - async (job) => await this.runJob(job, this.nodeTypes), - ); + void Worker.jobQueue.process(async (job) => await this.runJob(job, this.nodeTypes)); Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => { // Progress of a job got updated which does get used