Based on #7065 | Story: https://linear.app/n8n/issue/PAY-771 n8n on filesystem mode marks binary data to delete on manual execution deletion, on unsaved execution completion, and on every execution pruning cycle. We later prune binary data in a separate cycle via these marker files, based on the configured TTL. In the context of introducing an S3 client to manage binary data, the filesystem mode's mark-and-prune setup is too tightly coupled to the general binary data management client interface. This PR... - Ensures the deletion of an execution causes the deletion of any binary data associated to it. This does away with the need for binary data TTL and simplifies the filesystem mode's mark-and-prune setup. - Refactors all execution deletions (including pruning) to cause soft deletions, hard-deletes soft-deleted executions based on the existing pruning config, and adjusts execution endpoints to filter out soft-deleted executions. This reduces DB load, and keeps binary data around long enough for users to access it when building workflows with unsaved executions. - Moves all execution pruning work from an execution lifecycle hook to `execution.repository.ts`. This keeps related logic in a single place. - Removes all marking logic from the binary data manager. This simplifies the interface that the S3 client will meet. - Adds basic sanity-check tests to pruning logic and execution deletion. Out of scope: - Improving existing pruning logic. - Improving existing execution repository logic. - Adjusting dir structure for filesystem mode. --------- Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
245 lines
6.8 KiB
TypeScript
245 lines
6.8 KiB
TypeScript
/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */
|
|
|
|
import { Container, Service } from 'typedi';
|
|
import type {
|
|
IDeferredPromise,
|
|
IExecuteResponsePromiseData,
|
|
IRun,
|
|
ExecutionStatus,
|
|
} from 'n8n-workflow';
|
|
import { createDeferredPromise, LoggerProxy } from 'n8n-workflow';
|
|
|
|
import type { ChildProcess } from 'child_process';
|
|
import type PCancelable from 'p-cancelable';
|
|
import type {
|
|
ExecutionPayload,
|
|
IExecutingWorkflowData,
|
|
IExecutionDb,
|
|
IExecutionsCurrentSummary,
|
|
IWorkflowExecutionDataProcess,
|
|
} from '@/Interfaces';
|
|
import { isWorkflowIdValid } from '@/utils';
|
|
import { ExecutionRepository } from '@db/repositories';
|
|
|
|
@Service()
|
|
export class ActiveExecutions {
|
|
private activeExecutions: {
|
|
[index: string]: IExecutingWorkflowData;
|
|
} = {};
|
|
|
|
/**
|
|
* Add a new active execution
|
|
*/
|
|
async add(
|
|
executionData: IWorkflowExecutionDataProcess,
|
|
process?: ChildProcess,
|
|
executionId?: string,
|
|
): Promise<string> {
|
|
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
|
|
if (executionId === undefined) {
|
|
// Is a new execution so save in DB
|
|
|
|
const fullExecutionData: ExecutionPayload = {
|
|
data: executionData.executionData!,
|
|
mode: executionData.executionMode,
|
|
finished: false,
|
|
startedAt: new Date(),
|
|
workflowData: executionData.workflowData,
|
|
status: executionStatus,
|
|
};
|
|
|
|
if (executionData.retryOf !== undefined) {
|
|
fullExecutionData.retryOf = executionData.retryOf.toString();
|
|
}
|
|
|
|
const workflowId = executionData.workflowData.id;
|
|
if (workflowId !== undefined && isWorkflowIdValid(workflowId)) {
|
|
fullExecutionData.workflowId = workflowId;
|
|
}
|
|
|
|
const executionResult =
|
|
await Container.get(ExecutionRepository).createNewExecution(fullExecutionData);
|
|
executionId = executionResult.id;
|
|
if (executionId === undefined) {
|
|
throw new Error('There was an issue assigning an execution id to the execution');
|
|
}
|
|
executionStatus = 'running';
|
|
} else {
|
|
// Is an existing execution we want to finish so update in DB
|
|
|
|
const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = {
|
|
id: executionId,
|
|
data: executionData.executionData!,
|
|
waitTill: null,
|
|
status: executionStatus,
|
|
};
|
|
|
|
await Container.get(ExecutionRepository).updateExistingExecution(executionId, execution);
|
|
}
|
|
|
|
this.activeExecutions[executionId] = {
|
|
executionData,
|
|
process,
|
|
startedAt: new Date(),
|
|
postExecutePromises: [],
|
|
status: executionStatus,
|
|
};
|
|
|
|
return executionId;
|
|
}
|
|
|
|
/**
|
|
* Attaches an execution
|
|
*
|
|
*/
|
|
|
|
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
throw new Error(
|
|
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
|
|
);
|
|
}
|
|
|
|
this.activeExecutions[executionId].workflowExecution = workflowExecution;
|
|
}
|
|
|
|
attachResponsePromise(
|
|
executionId: string,
|
|
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
|
|
): void {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
throw new Error(
|
|
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
|
|
);
|
|
}
|
|
|
|
this.activeExecutions[executionId].responsePromise = responsePromise;
|
|
}
|
|
|
|
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
return;
|
|
}
|
|
|
|
this.activeExecutions[executionId].responsePromise?.resolve(response);
|
|
}
|
|
|
|
getPostExecutePromiseCount(executionId: string): number {
|
|
return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0;
|
|
}
|
|
|
|
/**
|
|
* Remove an active execution
|
|
*
|
|
*/
|
|
remove(executionId: string, fullRunData?: IRun): void {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
return;
|
|
}
|
|
|
|
// Resolve all the waiting promises
|
|
|
|
for (const promise of this.activeExecutions[executionId].postExecutePromises) {
|
|
promise.resolve(fullRunData);
|
|
}
|
|
|
|
// Remove from the list of active executions
|
|
delete this.activeExecutions[executionId];
|
|
}
|
|
|
|
/**
|
|
* Forces an execution to stop
|
|
*
|
|
* @param {string} executionId The id of the execution to stop
|
|
* @param {string} timeout String 'timeout' given if stop due to timeout
|
|
*/
|
|
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
// There is no execution running with that id
|
|
return;
|
|
}
|
|
|
|
// In case something goes wrong make sure that promise gets first
|
|
// returned that it gets then also resolved correctly.
|
|
if (this.activeExecutions[executionId].process !== undefined) {
|
|
// Workflow is running in subprocess
|
|
if (this.activeExecutions[executionId].process!.connected) {
|
|
setTimeout(() => {
|
|
// execute on next event loop tick;
|
|
this.activeExecutions[executionId].process!.send({
|
|
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
|
|
type: timeout || 'stopExecution',
|
|
});
|
|
}, 1);
|
|
}
|
|
} else {
|
|
// Workflow is running in current process
|
|
this.activeExecutions[executionId].workflowExecution!.cancel();
|
|
}
|
|
|
|
return this.getPostExecutePromise(executionId);
|
|
}
|
|
|
|
/**
|
|
* Returns a promise which will resolve with the data of the execution
|
|
* with the given id
|
|
*
|
|
* @param {string} executionId The id of the execution to wait for
|
|
*/
|
|
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
|
|
// Create the promise which will be resolved when the execution finished
|
|
const waitPromise = await createDeferredPromise<IRun | undefined>();
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
throw new Error(`There is no active execution with id "${executionId}".`);
|
|
}
|
|
|
|
this.activeExecutions[executionId].postExecutePromises.push(waitPromise);
|
|
|
|
return waitPromise.promise();
|
|
}
|
|
|
|
/**
|
|
* Returns all the currently active executions
|
|
*
|
|
*/
|
|
getActiveExecutions(): IExecutionsCurrentSummary[] {
|
|
const returnData: IExecutionsCurrentSummary[] = [];
|
|
|
|
let data;
|
|
|
|
for (const id of Object.keys(this.activeExecutions)) {
|
|
data = this.activeExecutions[id];
|
|
returnData.push({
|
|
id,
|
|
retryOf: data.executionData.retryOf as string | undefined,
|
|
startedAt: data.startedAt,
|
|
mode: data.executionData.executionMode,
|
|
workflowId: data.executionData.workflowData.id! as string,
|
|
status: data.status,
|
|
});
|
|
}
|
|
|
|
return returnData;
|
|
}
|
|
|
|
async setStatus(executionId: string, status: ExecutionStatus): Promise<void> {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
LoggerProxy.debug(
|
|
`There is no active execution with id "${executionId}", can't update status to ${status}.`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
this.activeExecutions[executionId].status = status;
|
|
}
|
|
|
|
getStatus(executionId: string): ExecutionStatus {
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
return 'unknown';
|
|
}
|
|
|
|
return this.activeExecutions[executionId].status;
|
|
}
|
|
}
|