From cd08c8e4c68c58397d234b6b462cc0b3bb415c0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 20 Sep 2023 15:21:42 +0200 Subject: [PATCH] refactor(core): Implement soft-deletions for executions (#7092) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Based on #7065 | Story: https://linear.app/n8n/issue/PAY-771 n8n on filesystem mode marks binary data to delete on manual execution deletion, on unsaved execution completion, and on every execution pruning cycle. We later prune binary data in a separate cycle via these marker files, based on the configured TTL. In the context of introducing an S3 client to manage binary data, the filesystem mode's mark-and-prune setup is too tightly coupled to the general binary data management client interface. This PR... - Ensures the deletion of an execution causes the deletion of any binary data associated to it. This does away with the need for binary data TTL and simplifies the filesystem mode's mark-and-prune setup. - Refactors all execution deletions (including pruning) to cause soft deletions, hard-deletes soft-deleted executions based on the existing pruning config, and adjusts execution endpoints to filter out soft-deleted executions. This reduces DB load, and keeps binary data around long enough for users to access it when building workflows with unsaved executions. - Moves all execution pruning work from an execution lifecycle hook to `execution.repository.ts`. This keeps related logic in a single place. - Removes all marking logic from the binary data manager. This simplifies the interface that the S3 client will meet. - Adds basic sanity-check tests to pruning logic and execution deletion. Out of scope: - Improving existing pruning logic. - Improving existing execution repository logic. - Adjusting dir structure for filesystem mode. --------- Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- packages/cli/src/ActiveExecutions.ts | 3 +- packages/cli/src/GenericHelpers.ts | 4 +- packages/cli/src/Interfaces.ts | 7 +- .../handlers/executions/executions.handler.ts | 16 +- .../handlers/executions/executions.service.ts | 6 +- packages/cli/src/Server.ts | 1 - .../cli/src/WorkflowExecuteAdditionalData.ts | 98 +-------- packages/cli/src/WorkflowHelpers.ts | 8 +- packages/cli/src/WorkflowRunner.ts | 2 +- packages/cli/src/commands/BaseCommand.ts | 2 + packages/cli/src/commands/start.ts | 3 + packages/cli/src/config/schema.ts | 18 +- packages/cli/src/constants.ts | 9 + .../src/databases/entities/ExecutionEntity.ts | 3 +- .../repositories/execution.repository.ts | 192 +++++++++++++++--- .../shared/sharedHookFunctions.ts | 6 +- .../cli/src/executions/executions.service.ts | 12 +- .../audit/credentials.risk.test.ts | 5 + .../integration/audit/database.risk.test.ts | 5 + .../integration/audit/filesystem.risk.test.ts | 5 + .../integration/audit/instance.risk.test.ts | 5 + .../test/integration/audit/nodes.risk.test.ts | 5 + .../integration/commands/import.cmd.test.ts | 5 + .../integration/executions.controller.test.ts | 39 ++++ .../test/integration/ldap/ldap.api.test.ts | 5 + .../integration/publicApi/executions.test.ts | 6 + .../cli/test/integration/shared/testDb.ts | 4 + packages/cli/test/integration/shared/types.ts | 3 +- .../integration/shared/utils/testServer.ts | 18 +- .../cli/test/unit/PermissionChecker.test.ts | 5 + .../repositories/execution.repository.test.ts | 83 ++++++++ .../core/src/BinaryDataManager/FileSystem.ts | 59 +----- packages/core/src/BinaryDataManager/index.ts | 16 -- packages/core/src/Interfaces.ts | 3 - .../core/test/NodeExecuteFunctions.test.ts | 2 - packages/nodes-base/test/nodes/Helpers.ts | 1 - 36 files changed, 411 insertions(+), 253 deletions(-) create mode 100644 packages/cli/test/integration/executions.controller.test.ts create mode 100644 packages/cli/test/unit/repositories/execution.repository.test.ts diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 26feeee58..f79433520 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -12,6 +12,7 @@ import { createDeferredPromise, LoggerProxy } from 'n8n-workflow'; import type { ChildProcess } from 'child_process'; import type PCancelable from 'p-cancelable'; import type { + ExecutionPayload, IExecutingWorkflowData, IExecutionDb, IExecutionsCurrentSummary, @@ -38,7 +39,7 @@ export class ActiveExecutions { if (executionId === undefined) { // Is a new execution so save in DB - const fullExecutionData: IExecutionDb = { + const fullExecutionData: ExecutionPayload = { data: executionData.executionData!, mode: executionData.executionMode, finished: false, diff --git a/packages/cli/src/GenericHelpers.ts b/packages/cli/src/GenericHelpers.ts index 0034ebea9..ef9e82a9f 100644 --- a/packages/cli/src/GenericHelpers.ts +++ b/packages/cli/src/GenericHelpers.ts @@ -11,7 +11,7 @@ import { Container } from 'typedi'; import { Like } from 'typeorm'; import config from '@/config'; import * as Db from '@/Db'; -import type { ICredentialsDb, IExecutionDb, IWorkflowDb } from '@/Interfaces'; +import type { ExecutionPayload, ICredentialsDb, IWorkflowDb } from '@/Interfaces'; import * as ResponseHelper from '@/ResponseHelper'; import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; import type { CredentialsEntity } from '@db/entities/CredentialsEntity'; @@ -178,7 +178,7 @@ export async function createErrorExecution( }, }; - const fullExecutionData: IExecutionDb = { + const fullExecutionData: ExecutionPayload = { data: executionData, mode, finished: false, diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index d7718efa9..3f373e9d9 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -171,7 +171,7 @@ export type ICredentialsDecryptedResponse = ICredentialsDecryptedDb; export type SaveExecutionDataType = 'all' | 'none'; export interface IExecutionBase { - id?: string; + id: string; mode: WorkflowExecuteMode; startedAt: Date; stoppedAt?: Date; // empty value means execution is still running @@ -189,6 +189,11 @@ export interface IExecutionDb extends IExecutionBase { workflowData?: IWorkflowBase; } +/** + * Payload for creating or updating an execution. + */ +export type ExecutionPayload = Omit; + export interface IExecutionPushResponse { executionId?: string; waitingForWebhook?: boolean; diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts index 03b70556e..a6b42c96e 100644 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts +++ b/packages/cli/src/PublicApi/v1/handlers/executions/executions.handler.ts @@ -1,13 +1,6 @@ import type express from 'express'; -import { BinaryDataManager } from 'n8n-core'; - -import { - getExecutions, - getExecutionInWorkflows, - deleteExecution, - getExecutionsCount, -} from './executions.service'; +import { getExecutions, getExecutionInWorkflows, getExecutionsCount } from './executions.service'; import { ActiveExecutions } from '@/ActiveExecutions'; import { authorize, validCursor } from '../../shared/middlewares/global.middleware'; import type { ExecutionRequest } from '../../../types'; @@ -15,6 +8,7 @@ import { getSharedWorkflowIds } from '../workflows/workflows.service'; import { encodeNextCursor } from '../../shared/services/pagination.service'; import { Container } from 'typedi'; import { InternalHooks } from '@/InternalHooks'; +import { ExecutionRepository } from '@/databases/repositories'; export = { deleteExecution: [ @@ -37,9 +31,7 @@ export = { return res.status(404).json({ message: 'Not Found' }); } - await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([execution.id!]); - - await deleteExecution(execution); + await Container.get(ExecutionRepository).softDelete(execution.id); execution.id = id; @@ -111,7 +103,7 @@ export = { const executions = await getExecutions(filters); - const newLastId = !executions.length ? '0' : (executions.slice(-1)[0].id as string); + const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id; filters.lastId = newLastId; diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts index 6061df800..de08c9095 100644 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts +++ b/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts @@ -1,4 +1,4 @@ -import type { DeleteResult, FindOptionsWhere } from 'typeorm'; +import type { FindOptionsWhere } from 'typeorm'; import { In, Not, Raw, LessThan } from 'typeorm'; import { Container } from 'typedi'; import type { ExecutionStatus } from 'n8n-workflow'; @@ -109,7 +109,3 @@ export async function getExecutionInWorkflows( unflattenData: true, }); } - -export async function deleteExecution(execution: IExecutionBase): Promise { - return Container.get(ExecutionRepository).deleteExecution(execution.id as string); -} diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index a93882f26..842cf7976 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -396,7 +396,6 @@ export class Server extends AbstractServer { ), executions_data_prune: config.getEnv('executions.pruneData'), executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'), - executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'), }, deploymentType: config.getEnv('deployment.type'), binaryDataMode: binaryDataConfig.mode, diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 2dad79507..08bfd7103 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -9,7 +9,7 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; +import { UserSettings, WorkflowExecute } from 'n8n-core'; import type { IDataObject, @@ -37,21 +37,16 @@ import { } from 'n8n-workflow'; import { Container } from 'typedi'; -import type { FindOptionsWhere } from 'typeorm'; -import { LessThanOrEqual, In } from 'typeorm'; -import { DateUtils } from 'typeorm/util/DateUtils'; import config from '@/config'; -import * as Db from '@/Db'; import { ActiveExecutions } from '@/ActiveExecutions'; import { CredentialsHelper } from '@/CredentialsHelper'; import { ExternalHooks } from '@/ExternalHooks'; import type { - IExecutionDb, - IExecutionFlattedDb, IPushDataExecutionFinished, IWorkflowExecuteProcess, IWorkflowExecutionDataProcess, IWorkflowErrorData, + ExecutionPayload, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import { Push } from '@/push'; @@ -184,77 +179,6 @@ export function executeErrorWorkflow( } } -/** - * Prunes Saved Execution which are older than configured. - * Throttled to be executed just once in configured timeframe. - * TODO: Consider moving this whole function to the repository or at least the queries - */ -let throttling = false; -async function pruneExecutionData(this: WorkflowHooks): Promise { - if (!throttling) { - Logger.verbose('Pruning execution data from database'); - - throttling = true; - const timeout = config.getEnv('executions.pruneDataTimeout'); // in seconds - const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h - const maxCount = config.getEnv('executions.pruneDataMaxCount'); - const date = new Date(); // today - date.setHours(date.getHours() - maxAge); - - // date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286 - - const utcDate = DateUtils.mixedDateToUtcDatetimeString(date); - - const toPrune: Array> = [ - { stoppedAt: LessThanOrEqual(utcDate) }, - ]; - - if (maxCount > 0) { - const executions = await Db.collections.Execution.find({ - select: ['id'], - skip: maxCount, - take: 1, - order: { id: 'DESC' }, - }); - - if (executions[0]) { - toPrune.push({ id: LessThanOrEqual(executions[0].id) }); - } - } - - try { - setTimeout(() => { - throttling = false; - }, timeout * 1000); - let executionIds: Array; - do { - executionIds = ( - await Db.collections.Execution.find({ - select: ['id'], - where: toPrune, - take: 100, - }) - ).map(({ id }) => id); - await Db.collections.Execution.delete({ id: In(executionIds) }); - // Mark binary data for deletion for all executions - await BinaryDataManager.getInstance().markDataForDeletionByExecutionIds(executionIds); - } while (executionIds.length > 0); - } catch (error) { - ErrorReporter.error(error); - throttling = false; - Logger.error( - `Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`, - { - ...error, - executionId: this.executionId, - sessionId: this.sessionId, - workflowId: this.workflowData.id, - }, - ); - } - } -} - /** * Returns hook functions to push data to Editor-UI * @@ -522,11 +446,6 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { workflowId: this.workflowData.id, }); - // Prune old execution data - if (config.getEnv('executions.pruneData')) { - await pruneExecutionData.call(this); - } - const isManualMode = [this.mode, parentProcessMode].includes('manual'); try { @@ -554,8 +473,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { } if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) { - // Data is always saved, so we remove from database - await Container.get(ExecutionRepository).deleteExecution(this.executionId, true); + await Container.get(ExecutionRepository).softDelete(this.executionId); return; } @@ -586,8 +504,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { this.executionId, this.retryOf, ); - // Data is always saved, so we remove from database - await Container.get(ExecutionRepository).deleteExecution(this.executionId); + await Container.get(ExecutionRepository).softDelete(this.executionId); return; } @@ -682,11 +599,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { workflowId: this.workflowData.id, }); try { - // Prune old execution data - if (config.getEnv('executions.pruneData')) { - await pruneExecutionData.call(this); - } - if (isWorkflowIdValid(this.workflowData.id) && newStaticData) { // Workflow is saved so update in database try { @@ -973,7 +885,7 @@ async function executeWorkflow( // Therefore, database might not contain finished errors. // Force an update to db as there should be no harm doing this - const fullExecutionData: IExecutionDb = { + const fullExecutionData: ExecutionPayload = { data: fullRunData.data, mode: fullRunData.mode, finished: fullRunData.finished ? fullRunData.finished : false, diff --git a/packages/cli/src/WorkflowHelpers.ts b/packages/cli/src/WorkflowHelpers.ts index e1a6e7057..093f713e4 100644 --- a/packages/cli/src/WorkflowHelpers.ts +++ b/packages/cli/src/WorkflowHelpers.ts @@ -23,7 +23,11 @@ import { } from 'n8n-workflow'; import { v4 as uuid } from 'uuid'; import * as Db from '@/Db'; -import type { IExecutionDb, IWorkflowErrorData, IWorkflowExecutionDataProcess } from '@/Interfaces'; +import type { + ExecutionPayload, + IWorkflowErrorData, + IWorkflowExecutionDataProcess, +} from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; // eslint-disable-next-line import/no-cycle import { WorkflowRunner } from '@/WorkflowRunner'; @@ -186,7 +190,7 @@ export async function executeErrorWorkflow( initialNode, ); - const fullExecutionData: IExecutionDb = { + const fullExecutionData: ExecutionPayload = { data: fakeExecution.data, mode: fakeExecution.mode, finished: false, diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 6c7a5b77b..cea0f9e98 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -629,7 +629,7 @@ export class WorkflowRunner { (workflowDidSucceed && saveDataSuccessExecution === 'none') || (!workflowDidSucceed && saveDataErrorExecution === 'none') ) { - await Container.get(ExecutionRepository).deleteExecution(executionId); + await Container.get(ExecutionRepository).softDelete(executionId); } // eslint-disable-next-line id-denylist } catch (err) { diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 9b2658ac7..6aa1e1217 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -114,6 +114,8 @@ export abstract class BaseCommand extends Command { } async initLicense(instanceType: N8nInstanceType = 'main'): Promise { + config.set('generic.instanceType', instanceType); + const license = Container.get(License); await license.init(this.instanceId, instanceType); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index b0a9b0b73..0cebdb633 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -29,6 +29,7 @@ import { eventBus } from '@/eventbus'; import { BaseCommand } from './BaseCommand'; import { InternalHooks } from '@/InternalHooks'; import { License } from '@/License'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -103,6 +104,8 @@ export class Start extends BaseCommand { // Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete await Container.get(License).shutdown(); + Container.get(ExecutionRepository).clearTimers(); + await Container.get(InternalHooks).onN8nStop(); const skipWebhookDeregistration = config.getEnv( diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 6a67adfd5..1704be42b 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -322,12 +322,6 @@ export const schema = { default: 336, env: 'EXECUTIONS_DATA_MAX_AGE', }, - pruneDataTimeout: { - doc: 'Timeout (seconds) after execution data has been pruned', - format: Number, - default: 3600, - env: 'EXECUTIONS_DATA_PRUNE_TIMEOUT', - }, // Additional pruning option to delete executions if total count exceeds the configured max. // Deletes the oldest entries first @@ -438,6 +432,12 @@ export const schema = { default: 'America/New_York', env: 'GENERIC_TIMEZONE', }, + + instanceType: { + doc: 'Type of n8n instance', + format: ['main', 'webhook', 'worker'] as const, + default: 'main', + }, }, // How n8n can be reached (Editor & REST-API) @@ -919,12 +919,6 @@ export const schema = { env: 'N8N_BINARY_DATA_STORAGE_PATH', doc: 'Path for binary data storage in "filesystem" mode', }, - binaryDataTTL: { - format: Number, - default: 60, - env: 'N8N_BINARY_DATA_TTL', - doc: 'TTL for binary data of unsaved executions in minutes', - }, }, deployment: { diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 8d9bbb9e1..c94af1f5c 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -94,3 +94,12 @@ export const CREDENTIAL_BLANKING_VALUE = '__n8n_BLANK_VALUE_e5362baf-c777-4d57-a export const UM_FIX_INSTRUCTION = 'Please fix the database by running ./packages/cli/bin/n8n user-management:reset'; + +/** + * Units of time in milliseconds + */ +export const TIME = { + SECOND: 1000, + MINUTE: 60 * 1000, + HOUR: 60 * 60 * 1000, +}; diff --git a/packages/cli/src/databases/entities/ExecutionEntity.ts b/packages/cli/src/databases/entities/ExecutionEntity.ts index f71bf0ba5..d73267ee5 100644 --- a/packages/cli/src/databases/entities/ExecutionEntity.ts +++ b/packages/cli/src/databases/entities/ExecutionEntity.ts @@ -9,6 +9,7 @@ import { OneToOne, PrimaryColumn, Relation, + DeleteDateColumn, } from 'typeorm'; import { datetimeColumnType } from './AbstractEntity'; import { idStringifier } from '../utils/transformers'; @@ -49,7 +50,7 @@ export class ExecutionEntity { @Column({ type: datetimeColumnType, nullable: true }) stoppedAt: Date; - @Column(datetimeColumnType) + @DeleteDateColumn({ type: datetimeColumnType, nullable: true }) deletedAt: Date; @Column({ nullable: true }) diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index aee597c2c..651e6d7bf 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -1,29 +1,31 @@ import { Service } from 'typedi'; -import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm'; +import { Brackets, DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm'; +import { DateUtils } from 'typeorm/util/DateUtils'; import type { FindManyOptions, FindOneOptions, FindOptionsWhere, SelectQueryBuilder, } from 'typeorm'; -import { ExecutionEntity } from '../entities/ExecutionEntity'; import { parse, stringify } from 'flatted'; +import { LoggerProxy as Logger } from 'n8n-workflow'; +import type { IExecutionsSummary, IRunExecutionData } from 'n8n-workflow'; +import { BinaryDataManager } from 'n8n-core'; import type { + ExecutionPayload, IExecutionBase, - IExecutionDb, IExecutionFlattedDb, IExecutionResponse, } from '@/Interfaces'; -import { LoggerProxy } from 'n8n-workflow'; -import type { IExecutionsSummary, IRunExecutionData } from 'n8n-workflow'; -import { ExecutionDataRepository } from './executionData.repository'; -import type { ExecutionData } from '../entities/ExecutionData'; + +import config from '@/config'; import type { IGetExecutionsQueryFilter } from '@/executions/executions.service'; import { isAdvancedExecutionFiltersEnabled } from '@/executions/executionHelpers'; +import type { ExecutionData } from '../entities/ExecutionData'; +import { ExecutionEntity } from '../entities/ExecutionEntity'; import { ExecutionMetadata } from '../entities/ExecutionMetadata'; -import { DateUtils } from 'typeorm/util/DateUtils'; -import { BinaryDataManager } from 'n8n-core'; -import config from '@/config'; +import { ExecutionDataRepository } from './executionData.repository'; +import { TIME } from '@/constants'; function parseFiltersToQueryBuilder( qb: SelectQueryBuilder, @@ -66,11 +68,59 @@ function parseFiltersToQueryBuilder( @Service() export class ExecutionRepository extends Repository { + private logger = Logger; + + deletionBatchSize = 100; + + private intervals: Record = { + softDeletion: undefined, + hardDeletion: undefined, + }; + + private rates: Record = { + softDeletion: 1 * TIME.HOUR, + hardDeletion: 15 * TIME.MINUTE, + }; + + private isMainInstance = config.get('generic.instanceType') === 'main'; + + private isPruningEnabled = config.getEnv('executions.pruneData'); + constructor( dataSource: DataSource, private readonly executionDataRepository: ExecutionDataRepository, ) { super(ExecutionEntity, dataSource.manager); + + if (!this.isMainInstance) return; + + if (this.isPruningEnabled) this.setSoftDeletionInterval(); + + this.setHardDeletionInterval(); + } + + clearTimers() { + if (!this.isMainInstance) return; + + this.logger.debug('Clearing soft-deletion and hard-deletion intervals for executions'); + + clearInterval(this.intervals.softDeletion); + clearInterval(this.intervals.hardDeletion); + } + + setSoftDeletionInterval() { + this.logger.debug('Setting soft-deletion interval (pruning) for executions'); + + this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.hardDeletion); + } + + setHardDeletionInterval() { + this.logger.debug('Setting hard-deletion interval for executions'); + + this.intervals.hardDeletion = setInterval( + async () => this.hardDelete(), + this.rates.hardDeletion, + ); } async findMultipleExecutions( @@ -203,7 +253,7 @@ export class ExecutionRepository extends Repository { return rest; } - async createNewExecution(execution: IExecutionDb) { + async createNewExecution(execution: ExecutionPayload) { const { data, workflowData, ...rest } = execution; const newExecution = await this.save(rest); @@ -248,16 +298,6 @@ export class ExecutionRepository extends Repository { } } - async deleteExecution(executionId: string, deferBinaryDataDeletion = false) { - const binaryDataManager = BinaryDataManager.getInstance(); - if (deferBinaryDataDeletion) { - await binaryDataManager.markDataForDeletionByExecutionId(executionId); - } else { - await binaryDataManager.deleteBinaryDataByExecutionIds([executionId]); - } - return this.delete({ id: executionId }); - } - async countExecutions( filters: IGetExecutionsQueryFilter | undefined, accessibleWorkflowIds: string[], @@ -297,7 +337,7 @@ export class ExecutionRepository extends Repository { } } catch (error) { if (error instanceof Error) { - LoggerProxy.warn(`Failed to get executions count from Postgres: ${error.message}`, { + Logger.warn(`Failed to get executions count from Postgres: ${error.message}`, { error, }); } @@ -341,7 +381,6 @@ export class ExecutionRepository extends Repository { // eslint-disable-next-line @typescript-eslint/naming-convention .orderBy({ 'execution.id': 'DESC' }) .andWhere('execution.workflowId IN (:...accessibleWorkflowIds)', { accessibleWorkflowIds }); - if (excludedExecutionIds.length > 0) { query.andWhere('execution.id NOT IN (:...excludedExecutionIds)', { excludedExecutionIds }); } @@ -367,7 +406,7 @@ export class ExecutionRepository extends Repository { }); } - async deleteExecutions( + async deleteExecutionsByFilter( filters: IGetExecutionsQueryFilter | undefined, accessibleWorkflowIds: string[], deleteConditions: { @@ -399,7 +438,7 @@ export class ExecutionRepository extends Repository { if (!executions.length) { if (deleteConditions.ids) { - LoggerProxy.error('Failed to delete an execution due to insufficient permissions', { + Logger.error('Failed to delete an execution due to insufficient permissions', { executionIds: deleteConditions.ids, }); } @@ -407,13 +446,106 @@ export class ExecutionRepository extends Repository { } const executionIds = executions.map(({ id }) => id); + do { + // Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error + const batch = executionIds.splice(0, this.deletionBatchSize); + await this.softDelete(batch); + } while (executionIds.length > 0); + } + + async prune() { + Logger.verbose('Soft-deleting (pruning) execution data from database'); + + const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h + const maxCount = config.getEnv('executions.pruneDataMaxCount'); + + // Find ids of all executions that were stopped longer that pruneDataMaxAge ago + const date = new Date(); + date.setHours(date.getHours() - maxAge); + + const toPrune: Array> = [ + // date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286 + { stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) }, + ]; + + if (maxCount > 0) { + const executions = await this.find({ + select: ['id'], + skip: maxCount, + take: 1, + order: { id: 'DESC' }, + }); + + if (executions[0]) { + toPrune.push({ id: LessThanOrEqual(executions[0].id) }); + } + } + + const [timeBasedWhere, countBasedWhere] = toPrune; + + await this.createQueryBuilder() + .update(ExecutionEntity) + .set({ deletedAt: new Date() }) + .where( + new Brackets((qb) => + countBasedWhere + ? qb.where(timeBasedWhere).orWhere(countBasedWhere) + : qb.where(timeBasedWhere), + ), + ) + .execute(); + } + + /** + * Permanently delete all soft-deleted executions and their binary data, in batches. + */ + private async hardDelete() { + // Find ids of all executions that were deleted over an hour ago + const date = new Date(); + date.setHours(date.getHours() - 1); + + const executionIds = ( + await this.find({ + select: ['id'], + where: { + deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)), + }, + take: this.deletionBatchSize, + + /** + * @important This ensures soft-deleted executions are included, + * else `@DeleteDateColumn()` at `deletedAt` will exclude them. + */ + withDeleted: true, + }) + ).map(({ id }) => id); + const binaryDataManager = BinaryDataManager.getInstance(); await binaryDataManager.deleteBinaryDataByExecutionIds(executionIds); - do { - // Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error - const batch = executionIds.splice(0, 500); - await this.delete(batch); - } while (executionIds.length > 0); + this.logger.debug(`Hard-deleting ${executionIds.length} executions from database`, { + executionIds, + }); + + // Actually delete these executions + await this.delete({ id: In(executionIds) }); + + /** + * If the volume of executions to prune is as high as the batch size, there is a risk + * that the pruning process is unable to catch up to the creation of new executions, + * with high concurrency possibly leading to errors from duplicate deletions. + * + * Therefore, in this high-volume case we speed up the hard deletion cycle, until + * the number of executions to prune is low enough to fit in a single batch. + */ + if (executionIds.length === this.deletionBatchSize) { + clearInterval(this.intervals.hardDeletion); + + setTimeout(async () => this.hardDelete(), 1 * TIME.SECOND); + } else { + if (this.intervals.hardDeletion) return; + + this.setHardDeletionInterval(); + } } } diff --git a/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts b/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts index cf68a1930..113bb2980 100644 --- a/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts +++ b/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts @@ -1,5 +1,5 @@ import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow'; -import type { IExecutionDb } from '@/Interfaces'; +import type { ExecutionPayload, IExecutionDb } from '@/Interfaces'; import pick from 'lodash/pick'; import { isWorkflowIdValid } from '@/utils'; import { LoggerProxy } from 'n8n-workflow'; @@ -24,7 +24,7 @@ export function prepareExecutionDataForDbUpdate(parameters: { workflowData: IWorkflowBase; workflowStatusFinal: ExecutionStatus; retryOf?: string; -}): IExecutionDb { +}) { const { runData, workflowData, workflowStatusFinal, retryOf } = parameters; // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive // As a result, we should create an IWorkflowBase object with only the data we want to save in it. @@ -41,7 +41,7 @@ export function prepareExecutionDataForDbUpdate(parameters: { 'pinData', ]); - const fullExecutionData: IExecutionDb = { + const fullExecutionData: ExecutionPayload = { data: runData.data, mode: runData.mode, finished: runData.finished ? runData.finished : false, diff --git a/packages/cli/src/executions/executions.service.ts b/packages/cli/src/executions/executions.service.ts index aab68b449..d13776c5d 100644 --- a/packages/cli/src/executions/executions.service.ts +++ b/packages/cli/src/executions/executions.service.ts @@ -351,9 +351,13 @@ export class ExecutionsService { } } - return Container.get(ExecutionRepository).deleteExecutions(requestFilters, sharedWorkflowIds, { - deleteBefore, - ids, - }); + return Container.get(ExecutionRepository).deleteExecutionsByFilter( + requestFilters, + sharedWorkflowIds, + { + deleteBefore, + ids, + }, + ); } } diff --git a/packages/cli/test/integration/audit/credentials.risk.test.ts b/packages/cli/test/integration/audit/credentials.risk.test.ts index 10d1d9ecb..49a399841 100644 --- a/packages/cli/test/integration/audit/credentials.risk.test.ts +++ b/packages/cli/test/integration/audit/credentials.risk.test.ts @@ -7,6 +7,11 @@ import { getRiskSection } from './utils'; import * as testDb from '../shared/testDb'; import { generateNanoId } from '@db/utils/generators'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + beforeAll(async () => { await testDb.init(); }); diff --git a/packages/cli/test/integration/audit/database.risk.test.ts b/packages/cli/test/integration/audit/database.risk.test.ts index a4068f5d7..b2e485663 100644 --- a/packages/cli/test/integration/audit/database.risk.test.ts +++ b/packages/cli/test/integration/audit/database.risk.test.ts @@ -10,6 +10,11 @@ import { getRiskSection, saveManualTriggerWorkflow } from './utils'; import * as testDb from '../shared/testDb'; import { generateNanoId } from '@db/utils/generators'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + beforeAll(async () => { await testDb.init(); }); diff --git a/packages/cli/test/integration/audit/filesystem.risk.test.ts b/packages/cli/test/integration/audit/filesystem.risk.test.ts index d8f3e711e..33d0ef8fa 100644 --- a/packages/cli/test/integration/audit/filesystem.risk.test.ts +++ b/packages/cli/test/integration/audit/filesystem.risk.test.ts @@ -5,6 +5,11 @@ import { FILESYSTEM_INTERACTION_NODE_TYPES, FILESYSTEM_REPORT } from '@/audit/co import { getRiskSection, saveManualTriggerWorkflow } from './utils'; import * as testDb from '../shared/testDb'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + beforeAll(async () => { await testDb.init(); }); diff --git a/packages/cli/test/integration/audit/instance.risk.test.ts b/packages/cli/test/integration/audit/instance.risk.test.ts index aa186c4d7..349f5de38 100644 --- a/packages/cli/test/integration/audit/instance.risk.test.ts +++ b/packages/cli/test/integration/audit/instance.risk.test.ts @@ -14,6 +14,11 @@ import { toReportTitle } from '@/audit/utils'; import config from '@/config'; import { generateNanoId } from '@db/utils/generators'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + beforeAll(async () => { await testDb.init(); diff --git a/packages/cli/test/integration/audit/nodes.risk.test.ts b/packages/cli/test/integration/audit/nodes.risk.test.ts index fb966d236..dbe414a95 100644 --- a/packages/cli/test/integration/audit/nodes.risk.test.ts +++ b/packages/cli/test/integration/audit/nodes.risk.test.ts @@ -11,6 +11,11 @@ import { NodeTypes } from '@/NodeTypes'; import { CommunityPackageService } from '@/services/communityPackage.service'; import Container from 'typedi'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + const nodesAndCredentials = mockInstance(LoadNodesAndCredentials); nodesAndCredentials.getCustomDirectories.mockReturnValue([]); mockInstance(NodeTypes); diff --git a/packages/cli/test/integration/commands/import.cmd.test.ts b/packages/cli/test/integration/commands/import.cmd.test.ts index 750ed86b4..a191267f6 100644 --- a/packages/cli/test/integration/commands/import.cmd.test.ts +++ b/packages/cli/test/integration/commands/import.cmd.test.ts @@ -4,6 +4,11 @@ import { InternalHooks } from '@/InternalHooks'; import { ImportWorkflowsCommand } from '@/commands/import/workflow'; import * as Config from '@oclif/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + beforeAll(async () => { mockInstance(InternalHooks); await testDb.init(); diff --git a/packages/cli/test/integration/executions.controller.test.ts b/packages/cli/test/integration/executions.controller.test.ts new file mode 100644 index 000000000..2af9ec5b8 --- /dev/null +++ b/packages/cli/test/integration/executions.controller.test.ts @@ -0,0 +1,39 @@ +import * as testDb from './shared/testDb'; +import { setupTestServer } from './shared/utils'; +import type { User } from '@/databases/entities/User'; + +let testServer = setupTestServer({ endpointGroups: ['executions'] }); + +let owner: User; + +const saveExecution = async ({ belongingTo }: { belongingTo: User }) => { + const workflow = await testDb.createWorkflow({}, belongingTo); + return testDb.createSuccessfulExecution(workflow); +}; + +beforeEach(async () => { + await testDb.truncate(['Execution', 'Workflow', 'SharedWorkflow']); + owner = await testDb.createOwner(); +}); + +describe('POST /executions/delete', () => { + test('should hard-delete an execution', async () => { + await saveExecution({ belongingTo: owner }); + + const response = await testServer.authAgentFor(owner).get('/executions').expect(200); + + expect(response.body.data.count).toBe(1); + + const [execution] = response.body.data.results; + + await testServer + .authAgentFor(owner) + .post('/executions/delete') + .send({ ids: [execution.id] }) + .expect(200); + + const executions = await testDb.getAllExecutions(); + + expect(executions).toHaveLength(0); + }); +}); diff --git a/packages/cli/test/integration/ldap/ldap.api.test.ts b/packages/cli/test/integration/ldap/ldap.api.test.ts index 8b00566d1..1a3e23cd8 100644 --- a/packages/cli/test/integration/ldap/ldap.api.test.ts +++ b/packages/cli/test/integration/ldap/ldap.api.test.ts @@ -17,6 +17,9 @@ import { randomEmail, randomName, uniqueId } from './../shared/random'; import * as testDb from './../shared/testDb'; import * as utils from '../shared/utils/'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + jest.mock('@/telemetry'); jest.mock('@/UserManagement/email/NodeMailer'); @@ -24,6 +27,8 @@ let globalMemberRole: Role; let owner: User; let authOwnerAgent: SuperAgentTest; +LoggerProxy.init(getLogger()); + const defaultLdapConfig = { ...LDAP_DEFAULT_CONFIGURATION, loginEnabled: true, diff --git a/packages/cli/test/integration/publicApi/executions.test.ts b/packages/cli/test/integration/publicApi/executions.test.ts index 2f5216a8b..9b208548a 100644 --- a/packages/cli/test/integration/publicApi/executions.test.ts +++ b/packages/cli/test/integration/publicApi/executions.test.ts @@ -5,6 +5,8 @@ import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { randomApiKey } from '../shared/random'; import * as utils from '../shared/utils/'; import * as testDb from '../shared/testDb'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; let owner: User; let user1: User; @@ -14,6 +16,8 @@ let authUser1Agent: SuperAgentTest; let authUser2Agent: SuperAgentTest; let workflowRunner: ActiveWorkflowRunner; +LoggerProxy.init(getLogger()); + const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] }); beforeAll(async () => { @@ -168,6 +172,8 @@ describe('DELETE /executions/:id', () => { expect(stoppedAt).not.toBeNull(); expect(workflowId).toBe(execution.workflowId); expect(waitTill).toBeNull(); + + await authOwnerAgent.get(`/executions/${execution.id}`).expect(404); }); }); diff --git a/packages/cli/test/integration/shared/testDb.ts b/packages/cli/test/integration/shared/testDb.ts index 4a9bd306b..91fa1723c 100644 --- a/packages/cli/test/integration/shared/testDb.ts +++ b/packages/cli/test/integration/shared/testDb.ts @@ -528,6 +528,10 @@ export async function getAllWorkflows() { return Db.collections.Workflow.find(); } +export async function getAllExecutions() { + return Db.collections.Execution.find(); +} + // ---------------------------------- // workflow sharing // ---------------------------------- diff --git a/packages/cli/test/integration/shared/types.ts b/packages/cli/test/integration/shared/types.ts index 3fd972a6b..f552c7e1f 100644 --- a/packages/cli/test/integration/shared/types.ts +++ b/packages/cli/test/integration/shared/types.ts @@ -28,7 +28,8 @@ export type EndpointGroup = | 'tags' | 'externalSecrets' | 'mfa' - | 'metrics'; + | 'metrics' + | 'executions'; export interface SetupProps { applyAuth?: boolean; diff --git a/packages/cli/test/integration/shared/utils/testServer.ts b/packages/cli/test/integration/shared/utils/testServer.ts index 2ea593305..21e381ff8 100644 --- a/packages/cli/test/integration/shared/utils/testServer.ts +++ b/packages/cli/test/integration/shared/utils/testServer.ts @@ -64,6 +64,7 @@ import { import { JwtService } from '@/services/jwt.service'; import { RoleService } from '@/services/role.service'; import { UserService } from '@/services/user.service'; +import { executionsController } from '@/executions/executions.controller'; /** * Plugin to prefix a path segment into a request URL pathname. @@ -93,7 +94,14 @@ const classifyEndpointGroups = (endpointGroups: EndpointGroup[]) => { const routerEndpoints: EndpointGroup[] = []; const functionEndpoints: EndpointGroup[] = []; - const ROUTER_GROUP = ['credentials', 'workflows', 'publicApi', 'license', 'variables']; + const ROUTER_GROUP = [ + 'credentials', + 'workflows', + 'publicApi', + 'license', + 'variables', + 'executions', + ]; endpointGroups.forEach((group) => (ROUTER_GROUP.includes(group) ? routerEndpoints : functionEndpoints).push(group), @@ -133,6 +141,9 @@ export const setupTestServer = ({ app.use(rawBodyReader); app.use(cookieParser()); + const logger = getLogger(); + LoggerProxy.init(logger); + const testServer: TestServer = { app, httpServer: app.listen(0), @@ -144,15 +155,13 @@ export const setupTestServer = ({ beforeAll(async () => { await testDb.init(); - const logger = getLogger(); - LoggerProxy.init(logger); - // Mock all telemetry. mockInstance(InternalHooks); mockInstance(PostHogClient); config.set('userManagement.jwtSecret', 'My JWT secret'); config.set('userManagement.isInstanceOwnerSetUp', true); + config.set('executions.pruneData', false); if (enabledFeatures) { Container.get(License).isFeatureEnabled = (feature) => enabledFeatures.includes(feature); @@ -175,6 +184,7 @@ export const setupTestServer = ({ workflows: { controller: workflowsController, path: 'workflows' }, license: { controller: licenseController, path: 'license' }, variables: { controller: variablesController, path: 'variables' }, + executions: { controller: executionsController, path: 'executions' }, }; if (enablePublicAPI) { diff --git a/packages/cli/test/unit/PermissionChecker.test.ts b/packages/cli/test/unit/PermissionChecker.test.ts index ab1443c0c..139cf0571 100644 --- a/packages/cli/test/unit/PermissionChecker.test.ts +++ b/packages/cli/test/unit/PermissionChecker.test.ts @@ -25,6 +25,11 @@ import type { SaveCredentialFunction } from '../integration/shared/types'; import { mockInstance } from '../integration/shared/utils/'; import { OwnershipService } from '@/services/ownership.service'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +LoggerProxy.init(getLogger()); + let mockNodeTypes: INodeTypes; let credentialOwnerRole: Role; let workflowOwnerRole: Role; diff --git a/packages/cli/test/unit/repositories/execution.repository.test.ts b/packages/cli/test/unit/repositories/execution.repository.test.ts new file mode 100644 index 000000000..7d2d0350d --- /dev/null +++ b/packages/cli/test/unit/repositories/execution.repository.test.ts @@ -0,0 +1,83 @@ +import { Container } from 'typedi'; +import { DataSource, EntityManager } from 'typeorm'; +import { mock } from 'jest-mock-extended'; +import { mockInstance } from '../../integration/shared/utils/'; +import { ExecutionRepository } from '@/databases/repositories'; +import config from '@/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; +import { TIME } from '@/constants'; +import { DateUtils } from 'typeorm/util/DateUtils'; + +jest.mock('typeorm/util/DateUtils'); + +LoggerProxy.init(getLogger()); + +const { objectContaining } = expect; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const qb: any = { + update: jest.fn().mockReturnThis(), + set: jest.fn().mockReturnThis(), + where: jest.fn().mockReturnThis(), + execute: jest.fn().mockReturnThis(), +}; + +describe('ExecutionRepository', () => { + const entityManager = mockInstance(EntityManager); + const dataSource = mockInstance(DataSource, { manager: entityManager }); + dataSource.getMetadata.mockReturnValue(mock()); + Object.assign(entityManager, { connection: dataSource }); + + const executionRepository = Container.get(ExecutionRepository); + + beforeAll(() => { + Container.set(ExecutionRepository, executionRepository); + LoggerProxy.init(getLogger()); + }); + + beforeEach(() => { + config.load(config.default); + + jest.clearAllMocks(); + }); + + describe('pruneBySoftDeleting()', () => { + test('should limit pruning based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => { + const maxCount = 1; + + config.set('executions.pruneDataMaxCount', maxCount); + + const find = jest.spyOn(ExecutionRepository.prototype, 'find'); + entityManager.find.mockResolvedValue([]); + + jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb); + + await executionRepository.prune(); + + expect(find.mock.calls[0][0]).toEqual(objectContaining({ skip: maxCount })); + }); + + test('should limit pruning based on EXECUTIONS_DATA_MAX_AGE', async () => { + const maxAge = 5; // hours + + config.set('executions.pruneDataMaxCount', 0); // disable prune-by-count path + config.set('executions.pruneDataMaxAge', 5); + + entityManager.find.mockResolvedValue([]); + + jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb); + + const dateFormat = jest.spyOn(DateUtils, 'mixedDateToUtcDatetimeString'); + + const now = Date.now(); + + await executionRepository.prune(); + + const argDate = dateFormat.mock.calls[0][0]; + const difference = now - argDate.valueOf(); + + expect(Math.round(difference / TIME.HOUR)).toBe(maxAge); + }); + }); +}); diff --git a/packages/core/src/BinaryDataManager/FileSystem.ts b/packages/core/src/BinaryDataManager/FileSystem.ts index cf9f1d94d..5c4bdd32a 100644 --- a/packages/core/src/BinaryDataManager/FileSystem.ts +++ b/packages/core/src/BinaryDataManager/FileSystem.ts @@ -1,4 +1,3 @@ -import glob from 'fast-glob'; import { createReadStream } from 'fs'; import fs from 'fs/promises'; import path from 'path'; @@ -10,32 +9,18 @@ import { jsonParse } from 'n8n-workflow'; import type { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import { FileNotFoundError } from '../errors'; -const PREFIX_METAFILE = 'binarymeta'; - const executionExtractionRegexp = /^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/; export class BinaryDataFileSystem implements IBinaryDataManager { private storagePath: string; - private binaryDataTTL: number; - constructor(config: IBinaryDataConfig) { this.storagePath = config.localStoragePath; - this.binaryDataTTL = config.binaryDataTTL; } - async init(startPurger = false): Promise { - if (startPurger) { - setInterval(async () => { - await this.deleteMarkedFiles(); - }, this.binaryDataTTL * 30000); - } - + async init() { await this.assertFolder(this.storagePath); - await this.assertFolder(this.getBinaryDataMetaPath()); - - await this.deleteMarkedFiles(); } async getFileSize(identifier: string): Promise { @@ -81,44 +66,6 @@ export class BinaryDataFileSystem implements IBinaryDataManager { return this.resolveStoragePath(`${identifier}.metadata`); } - async markDataForDeletionByExecutionId(executionId: string): Promise { - const tt = new Date(new Date().getTime() + this.binaryDataTTL * 60000); - return fs.writeFile( - this.resolveStoragePath('meta', `${PREFIX_METAFILE}_${executionId}_${tt.valueOf()}`), - '', - ); - } - - async deleteMarkedFiles(): Promise { - return this.deleteMarkedFilesByMeta(this.getBinaryDataMetaPath(), PREFIX_METAFILE); - } - - private async deleteMarkedFilesByMeta(metaPath: string, filePrefix: string): Promise { - const currentTimeValue = new Date().valueOf(); - const metaFileNames = await glob(`${filePrefix}_*`, { cwd: metaPath }); - - const executionIds = metaFileNames - .map((f) => f.split('_') as [string, string, string]) - .filter(([prefix, , ts]) => { - if (prefix !== filePrefix) return false; - const execTimestamp = parseInt(ts, 10); - return execTimestamp < currentTimeValue; - }) - .map((e) => e[1]); - - const filesToDelete = []; - const deletedIds = await this.deleteBinaryDataByExecutionIds(executionIds); - for (const executionId of deletedIds) { - filesToDelete.push( - ...(await glob(`${filePrefix}_${executionId}_`, { - absolute: true, - cwd: metaPath, - })), - ); - } - await Promise.all(filesToDelete.map(async (file) => fs.rm(file))); - } - async duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise { const newBinaryDataId = this.generateFileName(prefix); @@ -160,10 +107,6 @@ export class BinaryDataFileSystem implements IBinaryDataManager { return [prefix, uuid()].join(''); } - private getBinaryDataMetaPath() { - return path.join(this.storagePath, 'meta'); - } - private async deleteFromLocalStorage(identifier: string) { return fs.rm(this.getBinaryPath(identifier)); } diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts index 7bfbf614c..eca32f184 100644 --- a/packages/core/src/BinaryDataManager/index.ts +++ b/packages/core/src/BinaryDataManager/index.ts @@ -156,22 +156,6 @@ export class BinaryDataManager { throw new Error('Storage mode used to store binary data not available'); } - async markDataForDeletionByExecutionId(executionId: string): Promise { - if (this.managers[this.binaryDataMode]) { - await this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(executionId); - } - } - - async markDataForDeletionByExecutionIds(executionIds: string[]): Promise { - if (this.managers[this.binaryDataMode]) { - await Promise.all( - executionIds.map(async (id) => - this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(id), - ), - ); - } - } - async deleteBinaryDataByExecutionIds(executionIds: string[]): Promise { if (this.managers[this.binaryDataMode]) { await this.managers[this.binaryDataMode].deleteBinaryDataByExecutionIds(executionIds); diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index d01b655bf..f1f3f4ff7 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -38,7 +38,6 @@ export interface IBinaryDataConfig { mode: 'default' | 'filesystem'; availableModes: string; localStoragePath: string; - binaryDataTTL: number; } export interface IBinaryDataManager { @@ -51,8 +50,6 @@ export interface IBinaryDataManager { retrieveBinaryDataByIdentifier(identifier: string): Promise; getBinaryPath(identifier: string): string; getBinaryStream(identifier: string, chunkSize?: number): Readable; - markDataForDeletionByExecutionId(executionId: string): Promise; - deleteMarkedFiles(): Promise; deleteBinaryDataByIdentifier(identifier: string): Promise; duplicateBinaryDataByIdentifier(binaryDataId: string, prefix: string): Promise; deleteBinaryDataByExecutionIds(executionIds: string[]): Promise; diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts index 77261c2b6..16a77c847 100644 --- a/packages/core/test/NodeExecuteFunctions.test.ts +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -36,7 +36,6 @@ describe('NodeExecuteFunctions', () => { mode: 'default', availableModes: 'default', localStoragePath: temporaryDir, - binaryDataTTL: 1, }); // Set our binary data buffer @@ -86,7 +85,6 @@ describe('NodeExecuteFunctions', () => { mode: 'filesystem', availableModes: 'filesystem', localStoragePath: temporaryDir, - binaryDataTTL: 1, }); // Set our binary data buffer diff --git a/packages/nodes-base/test/nodes/Helpers.ts b/packages/nodes-base/test/nodes/Helpers.ts index ecc87dbb5..d14ef9957 100644 --- a/packages/nodes-base/test/nodes/Helpers.ts +++ b/packages/nodes-base/test/nodes/Helpers.ts @@ -222,7 +222,6 @@ export async function initBinaryDataManager(mode: 'default' | 'filesystem' = 'de mode, availableModes: mode, localStoragePath: temporaryDir, - binaryDataTTL: 1, }); return temporaryDir; }