diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index ab27a710c..02f02beca 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -10,7 +10,7 @@ import parseUrl from 'parseurl'; import type { RedisOptions } from 'ioredis'; import type { WebhookHttpMethod } from 'n8n-workflow'; -import { LoggerProxy as Logger } from 'n8n-workflow'; +import { LoggerProxy } from 'n8n-workflow'; import config from '@/config'; import { N8N_VERSION, inDevelopment } from '@/constants'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; @@ -27,6 +27,7 @@ import { corsMiddleware } from '@/middlewares'; import { TestWebhooks } from '@/TestWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks'; import { WEBHOOK_METHODS } from '@/WebhookHelpers'; +import { getRedisClusterNodes } from './GenericHelpers'; const emptyBuffer = Buffer.alloc(0); @@ -187,42 +188,64 @@ export abstract class AbstractServer { let lastTimer = 0; let cumulativeTimeout = 0; const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); + const clusterNodes = getRedisClusterNodes(); const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); - - const redis = new Redis({ - host, - port, - db, + const usesRedisCluster = clusterNodes.length > 0; + LoggerProxy.debug( + usesRedisCluster + ? `Initialising Redis cluster connection with nodes: ${clusterNodes + .map((e) => `${e.host}:${e.port}`) + .join(',')}` + : `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${ + port ?? '6379' + }`, + ); + const sharedRedisOptions: RedisOptions = { username, password, - retryStrategy: (): number | null => { - const now = Date.now(); - if (now - lastTimer > 30000) { - // Means we had no timeout at all or last timeout was temporary and we recovered - lastTimer = now; - cumulativeTimeout = 0; - } else { - cumulativeTimeout += now - lastTimer; - lastTimer = now; - if (cumulativeTimeout > redisConnectionTimeoutLimit) { - Logger.error( - `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, - ); - process.exit(1); - } - } - return 500; - }, - }); + db, + enableReadyCheck: false, + maxRetriesPerRequest: null, + }; + const redis = usesRedisCluster + ? new Redis.Cluster( + clusterNodes.map((node) => ({ host: node.host, port: node.port })), + { + redisOptions: sharedRedisOptions, + }, + ) + : new Redis({ + host, + port, + ...sharedRedisOptions, + retryStrategy: (): number | null => { + const now = Date.now(); + if (now - lastTimer > 30000) { + // Means we had no timeout at all or last timeout was temporary and we recovered + lastTimer = now; + cumulativeTimeout = 0; + } else { + cumulativeTimeout += now - lastTimer; + lastTimer = now; + if (cumulativeTimeout > redisConnectionTimeoutLimit) { + LoggerProxy.error( + `Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, + ); + process.exit(1); + } + } + return 500; + }, + }); redis.on('close', () => { - Logger.warn('Redis unavailable - trying to reconnect...'); + LoggerProxy.warn('Redis unavailable - trying to reconnect...'); }); redis.on('error', (error) => { if (!String(error).includes('ECONNREFUSED')) { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - Logger.warn('Error with Redis: ', error); + LoggerProxy.warn('Error with Redis: ', error); } }); } diff --git a/packages/cli/src/GenericHelpers.ts b/packages/cli/src/GenericHelpers.ts index e012accd5..56d6c9ce9 100644 --- a/packages/cli/src/GenericHelpers.ts +++ b/packages/cli/src/GenericHelpers.ts @@ -199,4 +199,28 @@ export async function createErrorExecution( await Container.get(ExecutionRepository).createNewExecution(fullExecutionData); } +export function getRedisClusterNodes(): Array<{ host: string; port: number }> { + const clusterNodePairs = config + .getEnv('queue.bull.redis.clusterNodes') + .split(',') + .filter((e) => e); + return clusterNodePairs.map((pair) => { + const [host, port] = pair.split(':'); + return { host, port: parseInt(port) }; + }); +} + +export function getRedisPrefix(): string { + let prefix = config.getEnv('queue.bull.prefix'); + if (prefix && getRedisClusterNodes().length > 0) { + if (!prefix.startsWith('{')) { + prefix = '{' + prefix; + } + if (!prefix.endsWith('}')) { + prefix += '}'; + } + } + return prefix; +} + export const DEFAULT_EXECUTIONS_GET_ALL_LIMIT = 20; diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 1bd7fb5fc..c1af79b7e 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -1,10 +1,11 @@ import type Bull from 'bull'; -import type { RedisOptions } from 'ioredis'; +import { type RedisOptions } from 'ioredis'; import { Service } from 'typedi'; -import type { IExecuteResponsePromiseData } from 'n8n-workflow'; +import { LoggerProxy, type IExecuteResponsePromiseData } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; import * as WebhookHelpers from '@/WebhookHelpers'; +import { getRedisClusterNodes, getRedisPrefix } from './GenericHelpers'; export type JobId = Bull.JobId; export type Job = Bull.Job; @@ -31,19 +32,54 @@ export class Queue { constructor(private activeExecutions: ActiveExecutions) {} async init() { - const prefix = config.getEnv('queue.bull.prefix'); - const redisOptions: RedisOptions = config.getEnv('queue.bull.redis'); - + const prefix = getRedisPrefix(); + const clusterNodes = getRedisClusterNodes(); + const usesRedisCluster = clusterNodes.length > 0; + const { host, port, username, password, db }: RedisOptions = config.getEnv('queue.bull.redis'); // eslint-disable-next-line @typescript-eslint/naming-convention const { default: Bull } = await import('bull'); - + // eslint-disable-next-line @typescript-eslint/naming-convention + const { default: Redis } = await import('ioredis'); // Disabling ready check is necessary as it allows worker to // quickly reconnect to Redis if Redis crashes or is unreachable // for some time. With it enabled, worker might take minutes to realize // redis is back up and resume working. // More here: https://github.com/OptimalBits/bull/issues/890 - // @ts-ignore - this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); + + LoggerProxy.debug( + usesRedisCluster + ? `Initialising Redis cluster connection with nodes: ${clusterNodes + .map((e) => `${e.host}:${e.port}`) + .join(',')}` + : `Initialising Redis client connection with host: ${host ?? 'localhost'} and port: ${ + port ?? '6379' + }`, + ); + const sharedRedisOptions: RedisOptions = { + username, + password, + db, + enableReadyCheck: false, + maxRetriesPerRequest: null, + }; + this.jobQueue = new Bull('jobs', { + prefix, + createClient: (type, clientConfig) => + usesRedisCluster + ? new Redis.Cluster( + clusterNodes.map((node) => ({ host: node.host, port: node.port })), + { + ...clientConfig, + redisOptions: sharedRedisOptions, + }, + ) + : new Redis({ + ...clientConfig, + host, + port, + ...sharedRedisOptions, + }), + }); this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => { this.activeExecutions.resolveResponsePromise( diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index e6aedf836..cc2eb24e8 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -353,9 +353,9 @@ export const schema = { }, bull: { prefix: { - doc: 'Prefix for all queue keys', + doc: 'Prefix for all queue keys (wrap in {} for cluster mode)', format: String, - default: '', + default: 'bull', env: 'QUEUE_BULL_PREFIX', }, redis: { @@ -395,6 +395,12 @@ export const schema = { default: '', env: 'QUEUE_BULL_REDIS_USERNAME', }, + clusterNodes: { + doc: 'Redis Cluster startup nodes (comma separated list of host:port pairs)', + format: String, + default: '', + env: 'QUEUE_BULL_REDIS_CLUSTER_NODES', + }, }, queueRecoveryInterval: { doc: 'If > 0 enables an active polling to the queue that can recover for Redis crashes. Given in seconds; 0 is disabled. May increase Redis traffic significantly.',