From f2939568cf399e67214e89bc7f859323aaeda8dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 5 Jan 2024 13:06:42 +0100 Subject: [PATCH] perf(core): Optimize workflow activation errors (#8242) At https://github.com/n8n-io/n8n/pull/8213 we introduced Redis hashes for workflow ownership and manual webhooks... - to remove clutter from multiple related keys at the top level, - to improve performance by preventing serializing-deserializing, and - to guarantee atomicity during concurrent updates in multi-main setup. Workflow activation errors can also benefit from this. Added test coverage as well. To test manually, create a workflow with a trigger with an invalid credential, edit the workflow's `active` column to `true`, and restart. The activation error should show as a red triangle on canvas and in the workflow list. --- packages/cli/src/ActivationErrors.service.ts | 35 +++----- packages/cli/src/ActiveWorkflowRunner.ts | 10 +-- .../activation-errors.service.test.ts | 87 +++++++++++++++++++ 3 files changed, 102 insertions(+), 30 deletions(-) create mode 100644 packages/cli/test/integration/activation-errors.service.test.ts diff --git a/packages/cli/src/ActivationErrors.service.ts b/packages/cli/src/ActivationErrors.service.ts index 06dc002c0..e06dcf6de 100644 --- a/packages/cli/src/ActivationErrors.service.ts +++ b/packages/cli/src/ActivationErrors.service.ts @@ -1,10 +1,5 @@ import { Service } from 'typedi'; import { CacheService } from '@/services/cache/cache.service'; -import { jsonParse } from 'n8n-workflow'; - -type ActivationErrors = { - [workflowId: string]: string; // error message -}; @Service() export class ActivationErrorsService { @@ -12,38 +7,28 @@ export class ActivationErrorsService { constructor(private readonly cacheService: CacheService) {} - async set(workflowId: string, errorMessage: string) { - const errors = await this.getAll(); - - errors[workflowId] = errorMessage; - - await this.cacheService.set(this.cacheKey, JSON.stringify(errors)); + async register(workflowId: string, errorMessage: string) { + await this.cacheService.setHash(this.cacheKey, { [workflowId]: errorMessage }); } - async unset(workflowId: string) { - const errors = await this.getAll(); - - if (Object.keys(errors).length === 0) return; - - delete errors[workflowId]; - - await this.cacheService.set(this.cacheKey, JSON.stringify(errors)); + async deregister(workflowId: string) { + await this.cacheService.deleteFromHash(this.cacheKey, workflowId); } async get(workflowId: string) { - const errors = await this.getAll(); + const activationError = await this.cacheService.getHashValue(this.cacheKey, workflowId); - if (Object.keys(errors).length === 0) return null; + if (!activationError) return null; - return errors[workflowId]; + return activationError; } async getAll() { - const errors = await this.cacheService.get(this.cacheKey); + const activationErrors = await this.cacheService.getHash(this.cacheKey); - if (!errors) return {}; + if (!activationErrors) return {}; - return jsonParse(errors); + return activationErrors; } async clearAll() { diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index d00056bb4..e0e03ad11 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -425,7 +425,7 @@ export class ActiveWorkflowRunner { void this.activeWorkflows.remove(workflowData.id); - void this.activationErrorsService.set(workflowData.id, error.message); + void this.activationErrorsService.register(workflowData.id, error.message); // Run Error Workflow if defined const activationError = new WorkflowActivationError( @@ -630,13 +630,13 @@ export class ActiveWorkflowRunner { // Workflow got now successfully activated so make sure nothing is left in the queue this.removeQueuedWorkflowActivation(workflowId); - await this.activationErrorsService.unset(workflowId); + await this.activationErrorsService.deregister(workflowId); const triggerCount = this.countTriggers(workflow, additionalData); await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount); } catch (e) { const error = e instanceof Error ? e : new Error(`${e}`); - await this.activationErrorsService.set(workflowId, error.message); + await this.activationErrorsService.register(workflowId, error.message); throw e; } @@ -757,7 +757,7 @@ export class ActiveWorkflowRunner { ); } - await this.activationErrorsService.unset(workflowId); + await this.activationErrorsService.deregister(workflowId); if (this.queuedActivations[workflowId] !== undefined) { this.removeQueuedWorkflowActivation(workflowId); @@ -824,6 +824,6 @@ export class ActiveWorkflowRunner { } async removeActivationError(workflowId: string) { - await this.activationErrorsService.unset(workflowId); + await this.activationErrorsService.deregister(workflowId); } } diff --git a/packages/cli/test/integration/activation-errors.service.test.ts b/packages/cli/test/integration/activation-errors.service.test.ts new file mode 100644 index 000000000..7635660db --- /dev/null +++ b/packages/cli/test/integration/activation-errors.service.test.ts @@ -0,0 +1,87 @@ +import { ActivationErrorsService } from '@/ActivationErrors.service'; +import { CacheService } from '@/services/cache/cache.service'; + +describe('ActivationErrorsService', () => { + const cacheService = new CacheService(); + const activationErrorsService = new ActivationErrorsService(cacheService); + + const firstWorkflowId = 'GSG0etbfTA2CNPDX'; + const secondWorkflowId = 'k2ORscMPO66K0Jk3'; + + const firstErrorMsg = 'Failed to activate'; + const secondErrorMsg = 'Also failed to activate'; + + afterEach(async () => { + await activationErrorsService.clearAll(); + }); + + describe('register', () => { + test('should register an activation error for a workflow', async () => { + await activationErrorsService.register(firstWorkflowId, firstErrorMsg); + + const activationError = await activationErrorsService.get(firstWorkflowId); + + expect(activationError).toBe(firstErrorMsg); + }); + }); + + describe('deregister', () => { + test('should deregister an activation error for a workflow', async () => { + await activationErrorsService.register(firstWorkflowId, firstErrorMsg); + + await activationErrorsService.deregister(firstWorkflowId); + + const activationError = await activationErrorsService.get(firstWorkflowId); + + expect(activationError).toBeNull(); + }); + }); + + describe('get', () => { + test('should retrieve an activation error for a workflow', async () => { + await activationErrorsService.register(firstWorkflowId, firstErrorMsg); + + const activationError = await activationErrorsService.get(firstWorkflowId); + + expect(activationError).toBe(firstErrorMsg); + }); + + test('should return `null` if no activation error found for a workflow', async () => { + const activationError = await activationErrorsService.get(firstWorkflowId); + + expect(activationError).toBeNull(); + }); + }); + + describe('getAll', () => { + test('should retrieve all activation errors', async () => { + await activationErrorsService.register(firstWorkflowId, firstErrorMsg); + await activationErrorsService.register(secondWorkflowId, secondErrorMsg); + + const allActivationErrors = await activationErrorsService.getAll(); + + expect(allActivationErrors).toEqual({ + [firstWorkflowId]: firstErrorMsg, + [secondWorkflowId]: secondErrorMsg, + }); + }); + + test('should return an empty object if no activation errors', async () => { + const allActivationErrors = await activationErrorsService.getAll(); + + expect(allActivationErrors).toEqual({}); + }); + }); + + describe('clearAll()', () => { + test('should clear activation errors', async () => { + await activationErrorsService.register(firstWorkflowId, firstErrorMsg); + await activationErrorsService.register(secondWorkflowId, secondErrorMsg); + + await activationErrorsService.clearAll(); + + const allActivationErrors = await activationErrorsService.getAll(); + expect(allActivationErrors).toEqual({}); + }); + }); +});