Unify execution id + Queue system (#1340)

* Unify execution ID across executions

* Fix indentation and improved comments

* WIP: saving data after each node execution

* Added on/off to save data after each step, saving initial data and retries working

* Fixing lint issues

* Fixing more lint issues

*  Add bull to execute workflows

* 👕 Fix lint issue

*  Add graceful shutdown to worker

*  Add loading staticData to worker

* 👕 Fix lint issue

*  Fix import

* Changed tables metadata to add nullable to stoppedAt

* Reload database on migration run

* Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration

* Added checks to Redis and exiting process if connection is unavailable

* Fixing error with new installations

* Fix issue with data not being sent back to browser on manual executions with defined destination

* Merging bull and unify execution id branch fixes

* Main process will now get execution success from database instead of redis

* Omit execution duration if execution did not stop

* Fix issue with execution list displaying inconsistant information information while a workflow is running

* Remove unused hooks to clarify for developers that these wont run in queue mode

* Added active pooling to help recover from Redis crashes

* Lint issues

* Changing default polling interval to 60 seconds

* Removed unnecessary attributes from bull job

*  Improved output on worker job start

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue
2021-02-09 08:59:32 +01:00
committed by GitHub
parent 9c67c893e7
commit 7a3aaf8a24
26 changed files with 952 additions and 60 deletions

View File

@@ -7,17 +7,22 @@ import {
} from 'n8n-core';
import {
Db,
IExecutingWorkflowData,
IExecutionDb,
IExecutionFlattedDb,
IExecutionsCurrentSummary,
IWorkflowExecutionDataProcess,
ResponseHelper,
WorkflowHelpers,
} from '.';
import { ChildProcess } from 'child_process';
import * as PCancelable from 'p-cancelable';
import { ObjectID } from 'typeorm';
export class ActiveExecutions {
private nextId = 1;
private activeExecutions: {
[index: string]: IExecutingWorkflowData;
} = {};
@@ -31,8 +36,30 @@ export class ActiveExecutions {
* @returns {string}
* @memberof ActiveExecutions
*/
add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): string {
const executionId = this.nextId++;
async add(executionData: IWorkflowExecutionDataProcess, process?: ChildProcess): Promise<string> {
const fullExecutionData: IExecutionDb = {
data: executionData.executionData!,
mode: executionData.executionMode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
};
if (executionData.retryOf !== undefined) {
fullExecutionData.retryOf = executionData.retryOf.toString();
}
if (executionData.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(executionData.workflowData.id.toString()) === true) {
fullExecutionData.workflowId = executionData.workflowData.id.toString();
}
const execution = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
const executionResult = await Db.collections.Execution!.save(execution as IExecutionFlattedDb);
const executionId = typeof executionResult.id === "object" ? executionResult.id.toString() : executionResult.id + "";
this.activeExecutions[executionId] = {
executionData,
@@ -41,7 +68,7 @@ export class ActiveExecutions {
postExecutePromises: [],
};
return executionId.toString();
return executionId;
}

View File

@@ -106,7 +106,7 @@ export async function init(): Promise<IDatabaseCollections> {
database: path.join(n8nFolder, 'database.sqlite'),
entityPrefix,
migrations: sqliteMigrations,
migrationsRun: true,
migrationsRun: false, // migrations for sqlite will be ran manually for now; see below
migrationsTableName: `${entityPrefix}migrations`,
};
break;
@@ -121,11 +121,30 @@ export async function init(): Promise<IDatabaseCollections> {
logging: false,
});
const connection = await createConnection(connectionOptions);
let connection = await createConnection(connectionOptions);
await connection.runMigrations({
transaction: 'none',
});
if (dbType === 'sqlite') {
// This specific migration changes database metadata.
// A field is now nullable. We need to reconnect so that
// n8n knows it has changed. Happens only on sqlite.
let migrations = [];
try {
migrations = await connection.query(`SELECT id FROM ${entityPrefix}migrations where name = "MakeStoppedAtNullable1607431743769"`);
} catch(error) {
// Migration table does not exist yet - it will be created after migrations run for the first time.
}
// If you remove this call, remember to turn back on the
// setting to run migrations automatically above.
await connection.runMigrations({
transaction: 'none',
});
if (migrations.length === 0) {
await connection.close();
connection = await createConnection(connectionOptions);
}
}
collections.Credentials = getRepository(entities.CredentialsEntity);
collections.Execution = getRepository(entities.ExecutionEntity);

View File

@@ -33,6 +33,15 @@ export interface IActivationError {
};
}
export interface IBullJobData {
executionId: string;
loadStaticData: boolean;
}
export interface IBullJobResponse {
success: boolean;
}
export interface ICustomRequest extends Request {
parsedUrl: Url | undefined;
}
@@ -112,7 +121,7 @@ export interface IExecutionBase {
id?: number | string | ObjectID;
mode: WorkflowExecuteMode;
startedAt: Date;
stoppedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
workflowId?: string; // To be able to filter executions easily //
finished: boolean;
retryOf?: number | string | ObjectID; // If it is a retry, the id of the execution it is a retry of.
@@ -166,7 +175,7 @@ export interface IExecutionsStopData {
finished?: boolean;
mode: WorkflowExecuteMode;
startedAt: Date;
stoppedAt: Date;
stoppedAt?: Date;
}
export interface IExecutionsSummary {

View File

@@ -8,7 +8,9 @@ import {
resolve as pathResolve,
} from 'path';
import {
getConnection,
getConnectionManager,
In,
} from 'typeorm';
import * as bodyParser from 'body-parser';
require('body-parser-xml')(bodyParser);
@@ -1426,32 +1428,42 @@ class App {
limit = parseInt(req.query.limit as string, 10);
}
const executingWorkflowIds = this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[];
const countFilter = JSON.parse(JSON.stringify(filter));
if (req.query.lastId) {
filter.id = LessThan(req.query.lastId);
} else if (req.query.firstId) {
filter.id = MoreThanOrEqual(req.query.firstId);
}
countFilter.select = ['id'];
countFilter.where = {id: Not(In(executingWorkflowIds))};
const resultsPromise = Db.collections.Execution!.find({
select: [
'id',
'finished',
'mode',
'retryOf',
'retrySuccessId',
'startedAt',
'stoppedAt',
'workflowData',
],
where: filter,
order: {
id: 'DESC',
},
take: limit,
const resultsQuery = await Db.collections.Execution!
.createQueryBuilder("execution")
.select([
'execution.id',
'execution.finished',
'execution.mode',
'execution.retryOf',
'execution.retrySuccessId',
'execution.startedAt',
'execution.stoppedAt',
'execution.workflowData',
])
.orderBy('execution.id', 'DESC')
.take(limit);
Object.keys(filter).forEach((filterField) => {
resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]});
});
if (req.query.lastId) {
resultsQuery.andWhere(`execution.id <= :lastId`, {lastId: req.query.lastId});
}
if (req.query.firstId) {
resultsQuery.andWhere(`execution.id >= :firstId`, {firstId: req.query.firstId});
}
if (executingWorkflowIds.length > 0) {
resultsQuery.andWhere(`execution.id NOT IN (:...ids)`, {ids: executingWorkflowIds});
}
const resultsPromise = resultsQuery.getMany();
const countPromise = Db.collections.Execution!.count(countFilter);
const results: IExecutionFlattedDb[] = await resultsPromise;
@@ -1529,11 +1541,19 @@ class App {
workflowData: fullExecutionData.workflowData,
};
const lastNodeExecuted = data!.executionData!.resultData.lastNodeExecuted as string;
const lastNodeExecuted = data!.executionData!.resultData.lastNodeExecuted as string | undefined;
// Remove the old error and the data of the last run of the node that it can be replaced
delete data!.executionData!.resultData.error;
data!.executionData!.resultData.runData[lastNodeExecuted].pop();
if (lastNodeExecuted) {
// Remove the old error and the data of the last run of the node that it can be replaced
delete data!.executionData!.resultData.error;
const length = data!.executionData!.resultData.runData[lastNodeExecuted].length;
if (length > 0 && data!.executionData!.resultData.runData[lastNodeExecuted][length - 1].error !== undefined) {
// Remove results only if it is an error.
// If we are retrying due to a crash, the information is simply success info from last node
data!.executionData!.resultData.runData[lastNodeExecuted].pop();
// Stack will determine what to run next
}
}
if (req.body.loadWorkflow === true) {
// Loads the currently saved workflow to execute instead of the
@@ -1647,7 +1667,7 @@ class App {
const returnData: IExecutionsStopData = {
mode: result.mode,
startedAt: new Date(result.startedAt),
stoppedAt: new Date(result.stoppedAt),
stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined,
finished: result.finished,
};

View File

@@ -283,7 +283,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
// Start now to run the workflow
const workflowRunner = new WorkflowRunner();
const executionId = await workflowRunner.run(runData, true);
const executionId = await workflowRunner.run(runData, true, !didSendResponse);
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = activeExecutions.getPostExecutePromise(executionId) as Promise<IExecutionDb | undefined>;

View File

@@ -4,6 +4,7 @@ import {
ExternalHooks,
IExecutionDb,
IExecutionFlattedDb,
IExecutionResponse,
IPushDataExecutionFinished,
IWorkflowBase,
IWorkflowExecutionDataProcess,
@@ -221,6 +222,68 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
},
],
nodeExecuteAfter: [
async function (nodeName: string, data: ITaskData, executionData: IRunExecutionData): Promise<void> {
if (this.workflowData.settings !== undefined) {
if (this.workflowData.settings.saveExecutionProgress === false) {
return;
} else if (this.workflowData.settings.saveExecutionProgress !== true && !config.get('executions.saveExecutionProgress') as boolean) {
return;
}
} else if (!config.get('executions.saveExecutionProgress') as boolean) {
return;
}
const execution = await Db.collections.Execution!.findOne(this.executionId);
if (execution === undefined) {
// Something went badly wrong if this happens.
// This check is here mostly to make typescript happy.
return undefined;
}
const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution);
if (fullExecutionData.finished) {
// We already received ´workflowExecuteAfter´ webhook, so this is just an async call
// that was left behind. We skip saving because the other call should have saved everything
// so this one is safe to ignore
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
waitingExecution: {},
},
};
}
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
fullExecutionData.data.executionData = executionData.executionData;
// Set last executed node so that it may resume on failure
fullExecutionData.data.resultData.lastNodeExecuted = nodeName;
const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData);
await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb);
},
],
};
}
@@ -262,6 +325,8 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}
if (isManualMode && saveManualExecutions === false) {
// Data is always saved, so we remove from database
Db.collections.Execution!.delete(this.executionId);
return;
}
@@ -280,6 +345,8 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
// Data is always saved, so we remove from database
Db.collections.Execution!.delete(this.executionId);
return;
}
@@ -303,16 +370,16 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
// Save the Execution in DB
const executionResult = await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
await Db.collections.Execution!.update(this.executionId, executionData as IExecutionFlattedDb);
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
// await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: executionResult.id });
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: this.executionId });
}
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, executionResult ? executionResult.id as string : undefined, this.retryOf);
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, this.executionId, this.retryOf);
}
} catch (error) {
if (!isManualMode) {
@@ -485,12 +552,38 @@ export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionI
const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode);
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for main process if workflow runs via worker
*/
export function getWorkflowHooksWorkerMain(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsPush();
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
// When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks.
hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running the main workflow
*
@@ -503,15 +596,22 @@ export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, execut
const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]);
}
if (isMainProcess) {
const preExecuteFunctions = hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
}
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string});
}

View File

@@ -2,15 +2,21 @@ import {
ActiveExecutions,
CredentialsOverwrites,
CredentialTypes,
Db,
ExternalHooks,
IBullJobData,
IBullJobResponse,
ICredentialsOverwrite,
ICredentialsTypeData,
IExecutionFlattedDb,
IExecutionResponse,
IProcessMessageDataHook,
ITransferNodeTypes,
IWorkflowExecutionDataProcess,
IWorkflowExecutionDataProcessWithExecution,
NodeTypes,
Push,
ResponseHelper,
WorkflowExecuteAdditionalData,
WorkflowHelpers,
} from './';
@@ -21,6 +27,7 @@ import {
} from 'n8n-core';
import {
IDataObject,
IExecutionError,
IRun,
Workflow,
@@ -33,17 +40,29 @@ import * as PCancelable from 'p-cancelable';
import { join as pathJoin } from 'path';
import { fork } from 'child_process';
import * as Bull from 'bull';
export class WorkflowRunner {
activeExecutions: ActiveExecutions.ActiveExecutions;
credentialsOverwrites: ICredentialsOverwrite;
push: Push.Push;
jobQueue: Bull.Queue;
constructor() {
this.push = Push.getInstance();
this.activeExecutions = ActiveExecutions.getInstance();
this.credentialsOverwrites = CredentialsOverwrites().getAll();
const executionsMode = config.get('executions.mode') as string;
if (executionsMode === 'queue') {
// Connect to bull-queue
const prefix = config.get('queue.bull.prefix') as string;
const redisOptions = config.get('queue.bull.redis') as object;
// @ts-ignore
this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false });
}
}
@@ -99,11 +118,16 @@ export class WorkflowRunner {
* @returns {Promise<string>}
* @memberof WorkflowRunner
*/
async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise<string> {
const executionsProcess = config.get('executions.process') as string;
const executionsMode = config.get('executions.mode') as string;
let executionId: string;
if (executionsProcess === 'main') {
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
executionId = await this.runBull(data, loadStaticData, realtime);
} else if (executionsProcess === 'main') {
executionId = await this.runMainProcess(data, loadStaticData);
} else {
executionId = await this.runSubprocess(data, loadStaticData);
@@ -144,7 +168,7 @@ export class WorkflowRunner {
const additionalData = await WorkflowExecuteAdditionalData.getBase(data.credentials);
// Register the active execution
const executionId = this.activeExecutions.add(data, undefined);
const executionId = await this.activeExecutions.add(data, undefined);
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId, true);
@@ -154,7 +178,7 @@ export class WorkflowRunner {
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.runData === undefined || data.startNodes === undefined || data.startNodes.length === 0 || data.destinationNode === undefined) {
// Execute all nodes
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(workflow, undefined, data.destinationNode);
@@ -191,6 +215,159 @@ export class WorkflowRunner {
return executionId;
}
async runBull(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean): Promise<string> {
// TODO: If "loadStaticData" is set to true it has to load data new on worker
// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined);
const jobData: IBullJobData = {
executionId,
loadStaticData: !!loadStaticData,
};
let priority = 100;
if (realtime === true) {
// Jobs which require a direct response get a higher priority
priority = 50;
}
// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.
// Check if they get retried by default and how often.
const jobOptions = {
priority,
removeOnComplete: true,
removeOnFail: true,
};
const job = await this.jobQueue.add(jobData, jobOptions);
console.log('Started with ID: ' + job.id.toString());
const hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(data.executionMode, executionId, data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined });
// Normally also workflow should be supplied here but as it only used for sending
// data to editor-UI is not needed.
hooks.executeHookFunctions('workflowExecuteBefore', []);
const workflowExecution: PCancelable<IRun> = new PCancelable(async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
if (await job.isActive()) {
// Job is already running so tell it to stop
await job.progress(-1);
} else {
// Job did not get started yet so remove from queue
await job.remove();
const fullRunData: IRun = {
data: {
resultData: {
error: {
message: 'Workflow has been canceled!',
} as IExecutionError,
runData: {},
},
},
mode: data.executionMode,
startedAt: new Date(),
stoppedAt: new Date(),
};
this.activeExecutions.remove(executionId, fullRunData);
resolve(fullRunData);
}
});
const jobData: Promise<IBullJobResponse> = job.finished();
const queueRecoveryInterval = config.get('queue.bull.queueRecoveryInterval') as number;
if (queueRecoveryInterval > 0) {
/*************************************************
* Long explanation about what this solves: *
* This only happens in a very specific scenario *
* when Redis crashes and recovers shortly *
* but during this time, some execution(s) *
* finished. The end result is that the main *
* process will wait indefinitively and never *
* get a response. This adds an active polling to*
* the queue that allows us to identify that the *
* execution finished and get information from *
* the database. *
*************************************************/
let watchDogInterval: NodeJS.Timeout | undefined;
let resolved = false;
const watchDog = new Promise((res) => {
watchDogInterval = setInterval(async () => {
const currentJob = await this.jobQueue.getJob(job.id);
// When null means job is finished (not found in queue)
if (currentJob === null) {
// Mimic worker's success message
res({success: true});
}
}, queueRecoveryInterval * 1000);
});
const clearWatchdogInterval = () => {
if (watchDogInterval) {
clearInterval(watchDogInterval);
watchDogInterval = undefined;
}
};
await new Promise((res, rej) => {
jobData.then((data) => {
if (!resolved) {
resolved = true;
clearWatchdogInterval();
res(data);
}
}).catch((e) => {
if(!resolved) {
resolved = true;
clearWatchdogInterval();
rej(e);
}
});
watchDog.then((data) => {
if (!resolved) {
resolved = true;
clearWatchdogInterval();
res(data);
}
});
});
} else {
await jobData;
}
const executionDb = await Db.collections.Execution!.findOne(executionId) as IExecutionFlattedDb;
const fullExecutionData = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse;
const runData = {
data: fullExecutionData.data,
finished: fullExecutionData.finished,
mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt,
} as IRun;
this.activeExecutions.remove(executionId, runData);
// Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed.
hooks.executeHookFunctions('workflowExecuteAfter', [runData]);
resolve(runData);
});
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
/**
* Run the workflow
*
@@ -209,7 +386,7 @@ export class WorkflowRunner {
}
// Register the active execution
const executionId = this.activeExecutions.add(data, subprocess);
const executionId = await this.activeExecutions.add(data, subprocess);
// Check if workflow contains a "executeWorkflow" Node as in this
// case we can not know which nodeTypes and credentialTypes will

View File

@@ -2,6 +2,7 @@
import {
CredentialsOverwrites,
CredentialTypes,
Db,
ExternalHooks,
IWorkflowExecutionDataProcessWithExecution,
NodeTypes,
@@ -15,16 +16,20 @@ import {
import {
IDataObject,
IExecuteData,
IExecutionError,
INodeType,
INodeTypeData,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowExecuteHooks,
Workflow,
WorkflowHooks,
} from 'n8n-workflow';
import * as config from '../config';
export class WorkflowRunnerProcess {
data: IWorkflowExecutionDataProcessWithExecution | undefined;
startedAt = new Date();
@@ -74,6 +79,19 @@ export class WorkflowRunnerProcess {
const externalHooks = ExternalHooks();
await externalHooks.init();
// This code has been split into 3 ifs just to make it easier to understand
// Can be made smaller but in the end it will make it impossible to read.
if (inputData.workflowData.settings !== undefined && inputData.workflowData.settings.saveExecutionProgress === true) {
// Workflow settings specifying it should save
await Db.init();
} else if (inputData.workflowData.settings !== undefined && inputData.workflowData.settings.saveExecutionProgress !== false && config.get('executions.saveExecutionProgress') as boolean) {
// Workflow settings not saying anything about saving but default settings says so
await Db.init();
} else if (inputData.workflowData.settings === undefined && config.get('executions.saveExecutionProgress') as boolean) {
// Workflow settings not saying anything about saving but default settings says so
await Db.init();
}
this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings});
const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials);
additionalData.hooks = this.getProcessForwardHooks();
@@ -83,7 +101,7 @@ export class WorkflowRunnerProcess {
return this.workflowExecute.processRunExecutionData(this.workflow);
} else if (this.data.runData === undefined || this.data.startNodes === undefined || this.data.startNodes.length === 0 || this.data.destinationNode === undefined) {
// Execute all nodes
// Can execute without webhook so go on
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode);
@@ -134,8 +152,8 @@ export class WorkflowRunnerProcess {
},
],
nodeExecuteAfter: [
async (nodeName: string, data: ITaskData): Promise<void> => {
this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]);
async (nodeName: string, data: ITaskData, executionData: IRunExecutionData): Promise<void> => {
this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data, executionData]);
},
],
workflowExecuteBefore: [
@@ -152,6 +170,9 @@ export class WorkflowRunnerProcess {
const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}

View File

@@ -40,7 +40,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
startedAt: Date;
@Index()
@Column('datetime')
@Column('datetime', { nullable: true })
stoppedAt: Date;
@Column('json')

View File

@@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import * as config from '../../../../config';
export class MakeStoppedAtNullable1607431743767 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query('ALTER TABLE `' + tablePrefix + 'execution_entity` MODIFY `stoppedAt` datetime', undefined);
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query('ALTER TABLE `' + tablePrefix + 'execution_entity` MODIFY `stoppedAt` datetime NOT NULL', undefined);
}
}

View File

@@ -2,10 +2,12 @@ import { InitialMigration1588157391238 } from './1588157391238-InitialMigration'
import { WebhookModel1592447867632 } from './1592447867632-WebhookModel';
import { CreateIndexStoppedAt1594902918301 } from './1594902918301-CreateIndexStoppedAt';
import { AddWebhookId1611149998770 } from './1611149998770-AddWebhookId';
import { MakeStoppedAtNullable1607431743767 } from './1607431743767-MakeStoppedAtNullable';
export const mysqlMigrations = [
InitialMigration1588157391238,
WebhookModel1592447867632,
CreateIndexStoppedAt1594902918301,
AddWebhookId1611149998770,
MakeStoppedAtNullable1607431743767,
];

View File

@@ -40,7 +40,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
startedAt: Date;
@Index()
@Column('timestamp')
@Column('timestamp', { nullable: true })
stoppedAt: Date;
@Column('json')

View File

@@ -0,0 +1,21 @@
import {MigrationInterface, QueryRunner} from "typeorm";
import * as config from '../../../../config';
export class MakeStoppedAtNullable1607431743768 implements MigrationInterface {
name = 'MakeStoppedAtNullable1607431743768';
async up(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.get('database.tablePrefix');
const schema = config.get('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query('ALTER TABLE ' + tablePrefix + 'execution_entity ALTER COLUMN "stoppedAt" DROP NOT NULL', undefined);
}
async down(queryRunner: QueryRunner): Promise<void> {
// Cannot be undone as column might already have null values
}
}

View File

@@ -2,10 +2,12 @@ import { InitialMigration1587669153312 } from './1587669153312-InitialMigration'
import { WebhookModel1589476000887 } from './1589476000887-WebhookModel';
import { CreateIndexStoppedAt1594828256133 } from './1594828256133-CreateIndexStoppedAt';
import { AddWebhookId1611144599516 } from './1611144599516-AddWebhookId';
import { MakeStoppedAtNullable1607431743768 } from './1607431743768-MakeStoppedAtNullable';
export const postgresMigrations = [
InitialMigration1587669153312,
WebhookModel1589476000887,
CreateIndexStoppedAt1594828256133,
AddWebhookId1611144599516,
MakeStoppedAtNullable1607431743768,
];

View File

@@ -40,7 +40,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
startedAt: Date;
@Index()
@Column()
@Column({ nullable: true })
stoppedAt: Date;
@Column('simple-json')

View File

@@ -0,0 +1,23 @@
import {MigrationInterface, QueryRunner} from "typeorm";
import * as config from '../../../../config';
export class MakeStoppedAtNullable1607431743769 implements MigrationInterface {
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
// SQLite does not allow us to simply "alter column"
// We're hacking the way sqlite identifies tables
// Allowing a column to become nullable
// This is a very strict case when this can be done safely
// As no collateral effects exist.
await queryRunner.query(`PRAGMA writable_schema = 1; `, undefined);
await queryRunner.query(`UPDATE SQLITE_MASTER SET SQL = 'CREATE TABLE IF NOT EXISTS "${tablePrefix}execution_entity" ("id" integer PRIMARY KEY AUTOINCREMENT NOT NULL, "data" text NOT NULL, "finished" boolean NOT NULL, "mode" varchar NOT NULL, "retryOf" varchar, "retrySuccessId" varchar, "startedAt" datetime NOT NULL, "stoppedAt" datetime, "workflowData" text NOT NULL, "workflowId" varchar)' WHERE NAME = "${tablePrefix}execution_entity";`, undefined);
await queryRunner.query(`PRAGMA writable_schema = 0;`, undefined);
}
async down(queryRunner: QueryRunner): Promise<void> {
// This cannot be undone as the table might already have nullable values
}
}

View File

@@ -2,10 +2,12 @@ import { InitialMigration1588102412422 } from './1588102412422-InitialMigration'
import { WebhookModel1592445003908 } from './1592445003908-WebhookModel';
import { CreateIndexStoppedAt1594825041918 } from './1594825041918-CreateIndexStoppedAt';
import { AddWebhookId1611071044839 } from './1611071044839-AddWebhookId';
import { MakeStoppedAtNullable1607431743769 } from './1607431743769-MakeStoppedAtNullable';
export const sqliteMigrations = [
InitialMigration1588102412422,
WebhookModel1592445003908,
CreateIndexStoppedAt1594825041918,
AddWebhookId1611071044839,
MakeStoppedAtNullable1607431743769,
];