feat(core): Implement inter-main communication for test webhooks in multi-main setup (#8267)

This commit is contained in:
Iván Ovejero
2024-01-12 11:48:58 +01:00
committed by GitHub
parent 135553bd6b
commit 1a0e285553
17 changed files with 231 additions and 96 deletions

View File

@@ -15,6 +15,7 @@ import type {
MaybeHash,
Hash,
} from '@/services/cache/cache.types';
import { TIME } from '@/constants';
@Service()
export class CacheService extends EventEmitter {
@@ -130,6 +131,21 @@ export class CacheService extends EventEmitter {
await this.set(key, hashObject);
}
async expire(key: string, ttlMs: number) {
if (!this.cache) await this.init();
if (!key?.length) return;
if (this.cache.kind === 'memory') {
setTimeout(async () => {
await this.cache.store.del(key);
}, ttlMs);
return;
}
await this.cache.store.expire(key, ttlMs / TIME.SECOND);
}
// ----------------------------------
// retrieving
// ----------------------------------

View File

@@ -39,6 +39,7 @@ export interface RedisStore extends Store {
hvals<T>(key: string): Promise<T[]>;
hexists(key: string, field: string): Promise<boolean>;
hdel(key: string, field: string): Promise<number>;
expire(key: string, ttlSeconds: number): Promise<void>;
}
function builder(
@@ -56,6 +57,9 @@ function builder(
if (val === undefined || val === null) return undefined;
else return jsonParse<T>(val);
},
async expire(key: string, ttlSeconds: number) {
await redisCache.expire(key, ttlSeconds);
},
async set(key, value, ttl) {
// eslint-disable-next-line @typescript-eslint/no-throw-literal, @typescript-eslint/restrict-template-expressions
if (!isCacheable(value)) throw new NoCacheableError(`"${value}" is not a cacheable value`);

View File

@@ -4,6 +4,10 @@ import { TIME } from '@/constants';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
import { ErrorReporterProxy as EventReporter } from 'n8n-workflow';
import type {
RedisServiceBaseCommand,
RedisServiceCommand,
} from '@/services/redis/RedisServiceCommands';
@Service()
export class MultiMainSetup extends SingleMainSetup {
@@ -122,27 +126,14 @@ export class MultiMainSetup extends SingleMainSetup {
}
}
async broadcastWorkflowActiveStateChanged(payload: {
workflowId: string;
oldState: boolean;
newState: boolean;
versionId: string;
}) {
async publish(command: RedisServiceCommand, data: unknown) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowActiveStateChanged',
payload,
});
}
const payload = data as RedisServiceBaseCommand['payload'];
async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) {
if (!this.sanityCheck()) return;
this.logger.debug(`[Instance ID ${this.id}] Publishing command "${command}"`, payload);
await this.redisPublisher.publishToCommandChannel({
command: 'workflowFailedToActivate',
payload,
});
await this.redisPublisher.publishToCommandChannel({ command, payload });
}
async fetchLeaderKey() {

View File

@@ -9,6 +9,7 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { Push } from '@/push';
import { MultiMainSetup } from './MultiMainSetup.ee';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { TestWebhooks } from '@/TestWebhooks';
export async function handleCommandMessageMain(messageString: string) {
const queueModeId = config.getEnv('redis.queueModeId');
@@ -31,6 +32,9 @@ export async function handleCommandMessageMain(messageString: string) {
);
return message;
}
const push = Container.get(Push);
switch (message.command) {
case 'reloadLicense':
if (!debounceMessageReceiver(message, 500)) {
@@ -84,8 +88,6 @@ export async function handleCommandMessageMain(messageString: string) {
break;
}
const push = Container.get(Push);
if (!oldState && newState) {
try {
await activeWorkflowRunner.add(workflowId, 'activate');
@@ -98,7 +100,7 @@ export async function handleCommandMessageMain(messageString: string) {
versionId,
});
await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({
await Container.get(MultiMainSetup).publish('workflowFailedToActivate', {
workflowId,
errorMessage: error.message,
});
@@ -125,6 +127,44 @@ export async function handleCommandMessageMain(messageString: string) {
if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break;
Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage });
break;
}
case 'relay-execution-lifecycle-event': {
/**
* Do not debounce this - all events share the same message name.
*/
const { type, args, sessionId } = message.payload;
if (!push.getBackend().hasSessionId(sessionId)) break;
push.send(type, args, sessionId);
break;
}
case 'clear-test-webhooks': {
if (!debounceMessageReceiver(message, 100)) {
// @ts-expect-error Legacy typing
message.payload = { result: 'debounced' };
return message;
}
const { webhookKey, workflowEntity, sessionId } = message.payload;
if (!push.getBackend().hasSessionId(sessionId)) break;
const testWebhooks = Container.get(TestWebhooks);
testWebhooks.clearTimeout(webhookKey);
const workflow = testWebhooks.toWorkflow(workflowEntity);
await testWebhooks.deactivateWebhooks(workflow);
break;
}
default:

View File

@@ -122,6 +122,13 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
// await this.stopProcess();
break;
default:
if (
message.command === 'relay-execution-lifecycle-event' ||
message.command === 'clear-test-webhooks'
) {
break; // meant only for main
}
logger.debug(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,

View File

@@ -1,4 +1,4 @@
import type { IPushDataWorkerStatusPayload } from '@/Interfaces';
import type { IPushDataType, IPushDataWorkerStatusPayload, IWorkflowDb } from '@/Interfaces';
export type RedisServiceCommand =
| 'getStatus'
@@ -8,7 +8,9 @@ export type RedisServiceCommand =
| 'reloadLicense'
| 'reloadExternalSecretsProviders'
| 'workflowActiveStateChanged' // multi-main only
| 'workflowFailedToActivate'; // multi-main only
| 'workflowFailedToActivate' // multi-main only
| 'relay-execution-lifecycle-event' // multi-main only
| 'clear-test-webhooks'; // multi-main only
/**
* An object to be sent via Redis pub/sub from the main process to the workers.
@@ -16,13 +18,27 @@ export type RedisServiceCommand =
* @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids.
* @field payload: Optional arguments to be sent with the command.
*/
type RedisServiceBaseCommand = {
senderId: string;
command: RedisServiceCommand;
payload?: {
[key: string]: string | number | boolean | string[] | number[] | boolean[];
};
};
export type RedisServiceBaseCommand =
| {
senderId: string;
command: Exclude<
RedisServiceCommand,
'relay-execution-lifecycle-event' | 'clear-test-webhooks'
>;
payload?: {
[key: string]: string | number | boolean | string[] | number[] | boolean[];
};
}
| {
senderId: string;
command: 'relay-execution-lifecycle-event';
payload: { type: IPushDataType; args: Record<string, unknown>; sessionId: string };
}
| {
senderId: string;
command: 'clear-test-webhooks';
payload: { webhookKey: string; workflowEntity: IWorkflowDb; sessionId: string };
};
export type RedisServiceWorkerResponseObject = {
workerId: string;

View File

@@ -2,6 +2,7 @@ import { Service } from 'typedi';
import { CacheService } from '@/services/cache/cache.service';
import { type IWebhookData } from 'n8n-workflow';
import type { IWorkflowDb } from '@/Interfaces';
import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants';
export type TestWebhookRegistration = {
sessionId?: string;
@@ -20,6 +21,19 @@ export class TestWebhookRegistrationsService {
const hashKey = this.toKey(registration.webhook);
await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration });
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, after the test webhook has been successfully executed,
* the handler process commands the creator process to clear its test webhooks.
* We set a TTL on the key so that it is cleared even on creator process crash,
* with an additional buffer to ensure this safeguard expiration will not delete
* the key before the regular test webhook timeout fetches the key to delete it.
*/
const ttl = TEST_WEBHOOK_TIMEOUT + TEST_WEBHOOK_TIMEOUT_BUFFER;
await this.cacheService.expire(this.cacheKey, ttl);
}
async deregister(arg: IWebhookData | string) {