Merge 'master' into 'Bitly-OAuth2-support'

This commit is contained in:
ricardo
2020-08-09 19:12:38 -04:00
294 changed files with 17585 additions and 2533 deletions

View File

@@ -1,10 +1,10 @@
# n8n - Workflow Automation Tool
![n8n.io - Workflow Automation](https://raw.githubusercontent.com/n8n-io/n8n/master/docs/images/n8n-logo.png)
![n8n.io - Workflow Automation](https://raw.githubusercontent.com/n8n-io/n8n/master/assets/n8n-logo.png)
n8n is a free and open [fair-code](http://faircode.io) licensed node based Workflow Automation Tool. It can be self-hosted, easily extended, and so also used with internal tools.
<a href="https://raw.githubusercontent.com/n8n-io/n8n/master/docs/images/n8n-screenshot.png"><img src="https://raw.githubusercontent.com/n8n-io/n8n/master/docs/images/n8n-screenshot.png" width="550" alt="n8n.io - Screenshot"></a>
<a href="https://raw.githubusercontent.com/n8n-io/n8n/master/assets/n8n-screenshot.png"><img src="https://raw.githubusercontent.com/n8n-io/n8n/master/assets/n8n-screenshot.png" width="550" alt="n8n.io - Screenshot"></a>
## Contents
@@ -89,8 +89,7 @@ If you have problems or questions go to our forum, we will then try to help you
If you are interested in working for n8n and so shape the future of the project
check out our job posts:
[https://jobs.n8n.io](https://jobs.n8n.io)
[https://n8n.join.com](https://n8n.join.com)
## Upgrading

View File

@@ -128,15 +128,23 @@ const config = convict({
credentials: {
overwrite: {
// Allows to set default values for credentials which
// get automatically prefilled and the user does not get
// displayed and can not change.
// Format: { CREDENTIAL_NAME: { PARAMTER: VALUE }}
doc: 'Overwrites for credentials',
format: '*',
default: '{}',
env: 'CREDENTIALS_OVERWRITE'
}
data: {
// Allows to set default values for credentials which
// get automatically prefilled and the user does not get
// displayed and can not change.
// Format: { CREDENTIAL_NAME: { PARAMTER: VALUE }}
doc: 'Overwrites for credentials',
format: '*',
default: '{}',
env: 'CREDENTIALS_OVERWRITE_DATA'
},
endpoint: {
doc: 'Fetch credentials from API',
format: String,
default: '',
env: 'CREDENTIALS_OVERWRITE_ENDPOINT',
},
},
},
executions: {
@@ -151,10 +159,34 @@ const config = convict({
env: 'EXECUTIONS_PROCESS'
},
// A Workflow times out and gets canceled after this time (seconds).
// If the workflow is executed in the main process a soft timeout
// is executed (takes effect after the current node finishes).
// If a workflow is running in its own process is a soft timeout
// tried first, before killing the process after waiting for an
// additional fifth of the given timeout duration.
//
// To deactivate timeout set it to -1
//
// Timeout is currently not activated by default which will change
// in a future version.
timeout: {
doc: 'Max run time (seconds) before stopping the workflow execution',
format: Number,
default: -1,
env: 'EXECUTIONS_TIMEOUT'
},
maxTimeout: {
doc: 'Max execution time (seconds) that can be set for a workflow individually',
format: Number,
default: 3600,
env: 'EXECUTIONS_TIMEOUT_MAX'
},
// If a workflow executes all the data gets saved by default. This
// could be a problem when a workflow gets executed a lot and processes
// a lot of data. To not write the database full it is possible to
// not save the execution at all.
// a lot of data. To not exceed the database's capacity it is possible to
// prune the database regularly or to not save the execution at all.
// Depending on if the execution did succeed or error a different
// save behaviour can be set.
saveDataOnError: {
@@ -177,9 +209,34 @@ const config = convict({
// in the editor.
saveDataManualExecutions: {
doc: 'Save data of executions when started manually via editor',
format: 'Boolean',
default: false,
env: 'EXECUTIONS_DATA_SAVE_MANUAL_EXECUTIONS'
},
// To not exceed the database's capacity and keep its size moderate
// the execution data gets pruned regularly (default: 1 hour interval).
// All saved execution data older than the max age will be deleted.
// Pruning is currently not activated by default, which will change in
// a future version.
pruneData: {
doc: 'Delete data of past executions on a rolling basis',
format: 'Boolean',
default: false,
env: 'EXECUTIONS_DATA_PRUNE'
},
pruneDataMaxAge: {
doc: 'How old (hours) the execution data has to be to get deleted',
format: Number,
default: 336,
env: 'EXECUTIONS_DATA_MAX_AGE'
},
pruneDataTimeout: {
doc: 'Timeout (seconds) after execution data has been pruned',
format: Number,
default: 3600,
env: 'EXECUTIONS_DATA_PRUNE_TIMEOUT'
},
},
generic: {
@@ -196,6 +253,13 @@ const config = convict({
},
// How n8n can be reached (Editor & REST-API)
path: {
format: String,
default: '/',
arg: 'path',
env: 'N8N_PATH',
doc: 'Path n8n is deployed to'
},
host: {
format: String,
default: 'localhost',
@@ -269,13 +333,43 @@ const config = convict({
env: 'N8N_JWT_AUTH_HEADER',
doc: 'The request header containing a signed JWT'
},
jwtHeaderValuePrefix: {
format: String,
default: '',
env: 'N8N_JWT_AUTH_HEADER_VALUE_PREFIX',
doc: 'The request header value prefix to strip (optional)'
},
jwksUri: {
format: String,
default: '',
env: 'N8N_JWKS_URI',
doc: 'The URI to fetch JWK Set for JWT auh'
doc: 'The URI to fetch JWK Set for JWT authentication'
},
}
jwtIssuer: {
format: String,
default: '',
env: 'N8N_JWT_ISSUER',
doc: 'JWT issuer to expect (optional)'
},
jwtNamespace: {
format: String,
default: '',
env: 'N8N_JWT_NAMESPACE',
doc: 'JWT namespace to expect (optional)'
},
jwtAllowedTenantKey: {
format: String,
default: '',
env: 'N8N_JWT_ALLOWED_TENANT_KEY',
doc: 'JWT tenant key name to inspect within JWT namespace (optional)'
},
jwtAllowedTenant: {
format: String,
default: '',
env: 'N8N_JWT_ALLOWED_TENANT',
doc: 'JWT tenant to allow (optional)'
},
},
},
endpoints: {

View File

@@ -44,9 +44,9 @@ module.exports = [
"logging": false,
"host": "localhost",
"username": "postgres",
"password": "docker",
"password": "",
"port": 5432,
"database": "postgres",
"database": "n8n",
"schema": "public",
"entities": Object.values(PostgresDb),
"migrations": [
@@ -68,7 +68,7 @@ module.exports = [
"username": "root",
"password": "password",
"host": "localhost",
"port": "3308",
"port": "3306",
"logging": false,
"entities": Object.values(MySQLDb),
"migrations": [
@@ -90,7 +90,7 @@ module.exports = [
"username": "root",
"password": "password",
"host": "localhost",
"port": "3308",
"port": "3306",
"logging": false,
"entities": Object.values(MySQLDb),
"migrations": [
@@ -105,4 +105,4 @@ module.exports = [
"subscribersDir": "./src/databases/mysqldb/Subscribers"
}
},
];
];

View File

@@ -1,6 +1,6 @@
{
"name": "n8n",
"version": "0.71.0",
"version": "0.76.0",
"description": "n8n Workflow Automation Tool",
"license": "SEE LICENSE IN LICENSE.md",
"homepage": "https://n8n.io",
@@ -100,13 +100,13 @@
"lodash.get": "^4.4.2",
"mongodb": "^3.5.5",
"mysql2": "^2.0.1",
"n8n-core": "~0.36.0",
"n8n-editor-ui": "~0.48.0",
"n8n-nodes-base": "~0.66.0",
"n8n-workflow": "~0.33.0",
"n8n-core": "~0.41.0",
"n8n-editor-ui": "~0.52.0",
"n8n-nodes-base": "~0.71.0",
"n8n-workflow": "~0.37.0",
"oauth-1.0a": "^2.2.6",
"open": "^7.0.0",
"pg": "^7.11.0",
"pg": "^8.3.0",
"request-promise-native": "^1.0.7",
"sqlite3": "^4.2.0",
"sse-channel": "^3.1.1",

View File

@@ -88,10 +88,11 @@ export class ActiveExecutions {
* Forces an execution to stop
*
* @param {string} executionId The id of the execution to stop
* @param {string} timeout String 'timeout' given if stop due to timeout
* @returns {(Promise<IRun | undefined>)}
* @memberof ActiveExecutions
*/
async stopExecution(executionId: string): Promise<IRun | undefined> {
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
if (this.activeExecutions[executionId] === undefined) {
// There is no execution running with that id
return;
@@ -101,17 +102,17 @@ export class ActiveExecutions {
// returned that it gets then also resolved correctly.
if (this.activeExecutions[executionId].process !== undefined) {
// Workflow is running in subprocess
setTimeout(() => {
if (this.activeExecutions[executionId].process!.connected) {
if (this.activeExecutions[executionId].process!.connected) {
setTimeout(() => {
// execute on next event loop tick;
this.activeExecutions[executionId].process!.send({
type: 'stopExecution'
type: timeout ? timeout : 'stopExecution',
});
}
}, 1);
}, 1);
}
} else {
// Workflow is running in current process
this.activeExecutions[executionId].workflowExecution!.cancel('Canceled by user');
this.activeExecutions[executionId].workflowExecution!.cancel();
}
return this.getPostExecutePromise(executionId);

View File

@@ -11,11 +11,11 @@ import {
WorkflowHelpers,
WorkflowRunner,
WorkflowExecuteAdditionalData,
IWebhookDb,
} from './';
import {
ActiveWorkflows,
ActiveWebhooks,
NodeExecuteFunctions,
} from 'n8n-core';
@@ -26,7 +26,7 @@ import {
INode,
INodeExecutionData,
IRunExecutionData,
IWebhookData,
NodeHelpers,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
WebhookHttpMethod,
Workflow,
@@ -35,22 +35,23 @@ import {
import * as express from 'express';
export class ActiveWorkflowRunner {
private activeWorkflows: ActiveWorkflows | null = null;
private activeWebhooks: ActiveWebhooks | null = null;
private activationErrors: {
[key: string]: IActivationError;
} = {};
async init() {
// Get the active workflows from database
// NOTE
// Here I guess we can have a flag on the workflow table like hasTrigger
// so intead of pulling all the active wehhooks just pull the actives that have a trigger
const workflowsData: IWorkflowDb[] = await Db.collections.Workflow!.find({ active: true }) as IWorkflowDb[];
this.activeWebhooks = new ActiveWebhooks();
// Add them as active workflows
this.activeWorkflows = new ActiveWorkflows();
if (workflowsData.length !== 0) {
@@ -58,20 +59,27 @@ export class ActiveWorkflowRunner {
console.log(' Start Active Workflows:');
console.log(' ================================');
const nodeTypes = NodeTypes();
for (const workflowData of workflowsData) {
console.log(` - ${workflowData.name}`);
try {
await this.add(workflowData.id.toString(), workflowData);
console.log(` => Started`);
} catch (error) {
console.log(` => ERROR: Workflow could not be activated:`);
console.log(` ${error.message}`);
const workflow = new Workflow({ id: workflowData.id.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
if (workflow.getTriggerNodes().length !== 0
|| workflow.getPollNodes().length !== 0) {
console.log(` - ${workflowData.name}`);
try {
await this.add(workflowData.id.toString(), workflowData);
console.log(` => Started`);
} catch (error) {
console.log(` => ERROR: Workflow could not be activated:`);
console.log(` ${error.message}`);
}
}
}
}
}
/**
* Removes all the currently active workflows
*
@@ -94,7 +102,6 @@ export class ActiveWorkflowRunner {
return;
}
/**
* Checks if a webhook for the given method and path exists and executes the workflow.
*
@@ -110,30 +117,41 @@ export class ActiveWorkflowRunner {
throw new ResponseHelper.ResponseError('The "activeWorkflows" instance did not get initialized yet.', 404, 404);
}
const webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path);
const webhook = await Db.collections.Webhook?.findOne({ webhookPath: path, method: httpMethod }) as IWebhookDb;
if (webhookData === undefined) {
// check if something exist
if (webhook === undefined) {
// The requested webhook is not registered
throw new ResponseHelper.ResponseError(`The requested webhook "${httpMethod} ${path}" is not registered.`, 404, 404);
}
const workflowData = await Db.collections.Workflow!.findOne(webhookData.workflowId);
const workflowData = await Db.collections.Workflow!.findOne(webhook.workflowId);
if (workflowData === undefined) {
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhookData.workflowId}"`, 404, 404);
throw new ResponseHelper.ResponseError(`Could not find workflow with id "${webhook.workflowId}"`, 404, 404);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow({ id: webhookData.workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
const workflow = new Workflow({ id: webhook.workflowId.toString(), name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings});
const credentials = await WorkflowCredentials([workflow.getNode(webhook.node as string) as INode]);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const webhookData = NodeHelpers.getNodeWebhooks(workflow, workflow.getNode(webhook.node as string) as INode, additionalData).filter((webhook) => {
return (webhook.httpMethod === httpMethod && webhook.path === path);
})[0];
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new ResponseHelper.ResponseError('Could not find node to process webhook.', 404, 404);
}
return new Promise((resolve, reject) => {
const executionMode = 'webhook';
//@ts-ignore
WebhookHelpers.executeWebhook(workflow, webhookData, workflowData, workflowStartNode, executionMode, undefined, req, res, (error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
@@ -143,6 +161,20 @@ export class ActiveWorkflowRunner {
});
}
/**
* Gets all request methods associated with a single webhook
*
* @param {string} path webhook path
* @returns {Promise<string[]>}
* @memberof ActiveWorkflowRunner
*/
async getWebhookMethods(path: string) : Promise<string[]> {
const webhooks = await Db.collections.Webhook?.find({ webhookPath: path}) as IWebhookDb[];
// Gather all request methods in string array
const webhookMethods: string[] = webhooks.map(webhook => webhook.method);
return webhookMethods;
}
/**
* Returns the ids of the currently active workflows
@@ -150,12 +182,8 @@ export class ActiveWorkflowRunner {
* @returns {string[]}
* @memberof ActiveWorkflowRunner
*/
getActiveWorkflows(): string[] {
if (this.activeWorkflows === null) {
return [];
}
return this.activeWorkflows.allActiveWorkflows();
getActiveWorkflows(): Promise<IWorkflowDb[]> {
return Db.collections.Workflow?.find({ select: ['id'] }) as Promise<IWorkflowDb[]>;
}
@@ -166,15 +194,11 @@ export class ActiveWorkflowRunner {
* @returns {boolean}
* @memberof ActiveWorkflowRunner
*/
isActive(id: string): boolean {
if (this.activeWorkflows !== null) {
return this.activeWorkflows.isActive(id);
}
return false;
async isActive(id: string): Promise<boolean> {
const workflow = await Db.collections.Workflow?.findOne({ id }) as IWorkflowDb;
return workflow?.active as boolean;
}
/**
* Return error if there was a problem activating the workflow
*
@@ -190,7 +214,6 @@ export class ActiveWorkflowRunner {
return this.activationErrors[id];
}
/**
* Adds all the webhooks of the workflow
*
@@ -202,12 +225,69 @@ export class ActiveWorkflowRunner {
*/
async addWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): Promise<void> {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
let path = '' as string | undefined;
for (const webhookData of webhooks) {
await this.activeWebhooks!.add(workflow, webhookData, mode);
// Save static data!
await WorkflowHelpers.saveStaticData(workflow);
const node = workflow.getNode(webhookData.node) as INode;
node.name = webhookData.node;
path = node.parameters.path as string;
if (node.parameters.path === undefined) {
path = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['path']) as string | undefined;
if (path === undefined) {
// TODO: Use a proper logger
console.error(`No webhook path could be found for node "${node.name}" in workflow "${workflow.id}".`);
continue;
}
}
const isFullPath: boolean = workflow.getSimpleParameterValue(node, webhookData.webhookDescription['isFullPath'], false) as boolean;
const webhook = {
workflowId: webhookData.workflowId,
webhookPath: NodeHelpers.getNodeWebhookPath(workflow.id as string, node, path, isFullPath),
node: node.name,
method: webhookData.httpMethod,
} as IWebhookDb;
try {
await Db.collections.Webhook?.insert(webhook);
const webhookExists = await workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, false);
if (webhookExists === false) {
// If webhook does not exist yet create it
await workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, false);
}
} catch (error) {
let errorMessage = '';
await Db.collections.Webhook?.delete({ workflowId: workflow.id });
// if it's a workflow from the the insert
// TODO check if there is standard error code for deplicate key violation that works
// with all databases
if (error.name === 'MongoError' || error.name === 'QueryFailedError') {
errorMessage = `The webhook path [${webhook.webhookPath}] and method [${webhook.method}] already exist.`;
} else if (error.detail) {
// it's a error runnig the webhook methods (checkExists, create)
errorMessage = error.detail;
} else {
errorMessage = error.message;
}
throw new Error(errorMessage);
}
}
// Save static data!
await WorkflowHelpers.saveStaticData(workflow);
}
@@ -227,13 +307,29 @@ export class ActiveWorkflowRunner {
const nodeTypes = NodeTypes();
const workflow = new Workflow({ id: workflowId, name: workflowData.name, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, nodeTypes, staticData: workflowData.staticData, settings: workflowData.settings });
await this.activeWebhooks!.removeWorkflow(workflow);
const mode = 'internal';
// Save the static workflow data if needed
await WorkflowHelpers.saveStaticData(workflow);
const credentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData);
for (const webhookData of webhooks) {
await workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, false);
}
// if it's a mongo objectId convert it to string
if (typeof workflowData.id === 'object') {
workflowData.id = workflowData.id.toString();
}
const webhook = {
workflowId: workflowData.id,
} as IWebhookDb;
await Db.collections.Webhook?.delete(webhook);
}
/**
* Runs the given workflow
*
@@ -322,7 +418,6 @@ export class ActiveWorkflowRunner {
});
}
/**
* Makes a workflow active
*
@@ -361,7 +456,11 @@ export class ActiveWorkflowRunner {
// Add the workflows which have webhooks defined
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode);
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
if (workflowInstance.getTriggerNodes().length !== 0
|| workflowInstance.getPollNodes().length !== 0) {
await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions, getPollFunctions);
}
if (this.activationErrors[workflowId] !== undefined) {
// If there were activation errors delete them
@@ -386,7 +485,6 @@ export class ActiveWorkflowRunner {
await WorkflowHelpers.saveStaticData(workflowInstance!);
}
/**
* Makes a workflow inactive
*
@@ -395,6 +493,7 @@ export class ActiveWorkflowRunner {
* @memberof ActiveWorkflowRunner
*/
async remove(workflowId: string): Promise<void> {
if (this.activeWorkflows !== null) {
// Remove all the webhooks of the workflow
await this.removeWorkflowWebhooks(workflowId);
@@ -404,8 +503,13 @@ export class ActiveWorkflowRunner {
delete this.activationErrors[workflowId];
}
// Remove the workflow from the "list" of active workflows
return this.activeWorkflows.remove(workflowId);
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if (this.activeWorkflows.isActive(workflowId)) {
this.activeWorkflows.remove(workflowId);
}
return;
}
throw new Error(`The "activeWorkflows" instance did not get initialized yet.`);

View File

@@ -20,7 +20,7 @@ class CredentialsOverwritesClass {
return;
}
const data = await GenericHelpers.getConfigValue('credentials.overwrite') as string;
const data = await GenericHelpers.getConfigValue('credentials.overwrite.data') as string;
try {
this.overwriteData = JSON.parse(data);
@@ -30,6 +30,7 @@ class CredentialsOverwritesClass {
}
applyOverwrite(type: string, data: ICredentialDataDecryptedObject) {
const overwrites = this.get(type);
if (overwrites === undefined) {

View File

@@ -29,22 +29,31 @@ export let collections: IDatabaseCollections = {
Credentials: null,
Execution: null,
Workflow: null,
Webhook: null,
};
import {
InitialMigration1587669153312
InitialMigration1587669153312,
WebhookModel1589476000887,
CreateIndexStoppedAt1594828256133,
} from './databases/postgresdb/migrations';
import {
InitialMigration1587563438936
InitialMigration1587563438936,
WebhookModel1592679094242,
CreateIndexStoppedAt1594910478695,
} from './databases/mongodb/migrations';
import {
InitialMigration1588157391238
InitialMigration1588157391238,
WebhookModel1592447867632,
CreateIndexStoppedAt1594902918301,
} from './databases/mysqldb/migrations';
import {
InitialMigration1588102412422
InitialMigration1588102412422,
WebhookModel1592445003908,
CreateIndexStoppedAt1594825041918,
} from './databases/sqlite/migrations';
import * as path from 'path';
@@ -66,7 +75,11 @@ export async function init(): Promise<IDatabaseCollections> {
entityPrefix,
url: await GenericHelpers.getConfigValue('database.mongodb.connectionUrl') as string,
useNewUrlParser: true,
migrations: [InitialMigration1587563438936],
migrations: [
InitialMigration1587563438936,
WebhookModel1592679094242,
CreateIndexStoppedAt1594910478695,
],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
};
@@ -99,7 +112,11 @@ export async function init(): Promise<IDatabaseCollections> {
port: await GenericHelpers.getConfigValue('database.postgresdb.port') as number,
username: await GenericHelpers.getConfigValue('database.postgresdb.user') as string,
schema: config.get('database.postgresdb.schema'),
migrations: [InitialMigration1587669153312],
migrations: [
InitialMigration1587669153312,
WebhookModel1589476000887,
CreateIndexStoppedAt1594828256133,
],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
ssl,
@@ -118,7 +135,11 @@ export async function init(): Promise<IDatabaseCollections> {
password: await GenericHelpers.getConfigValue('database.mysqldb.password') as string,
port: await GenericHelpers.getConfigValue('database.mysqldb.port') as number,
username: await GenericHelpers.getConfigValue('database.mysqldb.user') as string,
migrations: [InitialMigration1588157391238],
migrations: [
InitialMigration1588157391238,
WebhookModel1592447867632,
CreateIndexStoppedAt1594902918301,
],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
};
@@ -130,7 +151,11 @@ export async function init(): Promise<IDatabaseCollections> {
type: 'sqlite',
database: path.join(n8nFolder, 'database.sqlite'),
entityPrefix,
migrations: [InitialMigration1588102412422],
migrations: [
InitialMigration1588102412422,
WebhookModel1592445003908,
CreateIndexStoppedAt1594825041918
],
migrationsRun: true,
migrationsTableName: `${entityPrefix}migrations`,
};
@@ -155,6 +180,7 @@ export async function init(): Promise<IDatabaseCollections> {
collections.Credentials = getRepository(entities.CredentialsEntity);
collections.Execution = getRepository(entities.ExecutionEntity);
collections.Workflow = getRepository(entities.WorkflowEntity);
collections.Webhook = getRepository(entities.WebhookEntity);
return collections;
}

View File

@@ -40,11 +40,12 @@ export function getBaseUrl(): string {
const protocol = config.get('protocol') as string;
const host = config.get('host') as string;
const port = config.get('port') as number;
const path = config.get('path') as string;
if (protocol === 'http' && port === 80 || protocol === 'https' && port === 443) {
return `${protocol}://${host}/`;
return `${protocol}://${host}${path}`;
}
return `${protocol}://${host}:${port}/`;
return `${protocol}://${host}:${port}${path}`;
}

View File

@@ -49,8 +49,15 @@ export interface IDatabaseCollections {
Credentials: Repository<ICredentialsDb> | null;
Execution: Repository<IExecutionFlattedDb> | null;
Workflow: Repository<IWorkflowDb> | null;
Webhook: Repository<IWebhookDb> | null;
}
export interface IWebhookDb {
workflowId: number | string | ObjectID;
webhookPath: string;
method: string;
node: string;
}
export interface IWorkflowBase extends IWorkflowBaseWorkflow {
id?: number | string | ObjectID;
@@ -279,17 +286,17 @@ export interface IN8nUISettings {
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
urlBaseWebhook: string;
versionCli: string;
}
export interface IPackageVersions {
cli: string;
}
export interface IPushData {
data: IPushDataExecutionFinished | IPushDataNodeExecuteAfter | IPushDataNodeExecuteBefore | IPushDataTestWebhook;
type: IPushDataType;
@@ -297,7 +304,6 @@ export interface IPushData {
export type IPushDataType = 'executionFinished' | 'executionStarted' | 'nodeExecuteAfter' | 'nodeExecuteBefore' | 'testWebhookDeleted' | 'testWebhookReceived';
export interface IPushDataExecutionFinished {
data: IRun;
executionIdActive: string;

View File

@@ -29,6 +29,9 @@ class NodeTypesClass implements INodeTypes {
}
getByName(nodeType: string): INodeType | undefined {
if (this.nodeTypes[nodeType] === undefined) {
throw new Error(`The node-type "${nodeType}" is not known!`);
}
return this.nodeTypes[nodeType].type;
}
}

View File

@@ -18,7 +18,7 @@ import * as clientOAuth2 from 'client-oauth2';
import * as clientOAuth1 from 'oauth-1.0a';
import { RequestOptions } from 'oauth-1.0a';
import * as csrf from 'csrf';
import * as requestPromise from 'request-promise-native';
import * as requestPromise from 'request-promise-native';
import { createHmac } from 'crypto';
import {
@@ -58,6 +58,9 @@ import {
WorkflowExecuteAdditionalData,
WorkflowRunner,
GenericHelpers,
CredentialsOverwrites,
ICredentialsOverwrite,
LoadNodesAndCredentials,
} from './';
import {
@@ -105,10 +108,13 @@ class App {
testWebhooks: TestWebhooks.TestWebhooks;
endpointWebhook: string;
endpointWebhookTest: string;
endpointPresetCredentials: string;
externalHooks: IExternalHooksClass;
saveDataErrorExecution: string;
saveDataSuccessExecution: string;
saveManualExecutions: boolean;
executionTimeout: number;
maxExecutionTimeout: number;
timezone: string;
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
push: Push.Push;
@@ -116,9 +122,11 @@ class App {
restEndpoint: string;
protocol: string;
sslKey: string;
sslKey: string;
sslCert: string;
presetCredentialsLoaded: boolean;
constructor() {
this.app = express();
@@ -127,6 +135,8 @@ class App {
this.saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
this.saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
this.saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
this.executionTimeout = config.get('executions.timeout') as number;
this.maxExecutionTimeout = config.get('executions.maxTimeout') as number;
this.timezone = config.get('generic.timezone') as string;
this.restEndpoint = config.get('endpoints.rest') as string;
@@ -137,10 +147,13 @@ class App {
this.activeExecutionsInstance = ActiveExecutions.getInstance();
this.protocol = config.get('protocol');
this.sslKey = config.get('ssl_key');
this.sslKey = config.get('ssl_key');
this.sslCert = config.get('ssl_cert');
this.externalHooks = ExternalHooks();
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.get('credentials.overwrite.endpoint') as string;
}
@@ -195,42 +208,69 @@ class App {
}
// Check for and validate JWT if configured
const jwtAuthActive = config.get('security.jwtAuth.active') as boolean;
const jwtAuthActive = config.get('security.jwtAuth.active') as boolean;
if (jwtAuthActive === true) {
const jwtAuthHeader = await GenericHelpers.getConfigValue('security.jwtAuth.jwtHeader') as string;
if (jwtAuthHeader === '') {
throw new Error('JWT auth is activated but no request header was defined. Please set one!');
}
}
const jwksUri = await GenericHelpers.getConfigValue('security.jwtAuth.jwksUri') as string;
if (jwksUri === '') {
throw new Error('JWT auth is activated but no JWK Set URI was defined. Please set one!');
}
}
const jwtHeaderValuePrefix = await GenericHelpers.getConfigValue('security.jwtAuth.jwtHeaderValuePrefix') as string;
const jwtIssuer = await GenericHelpers.getConfigValue('security.jwtAuth.jwtIssuer') as string;
const jwtNamespace = await GenericHelpers.getConfigValue('security.jwtAuth.jwtNamespace') as string;
const jwtAllowedTenantKey = await GenericHelpers.getConfigValue('security.jwtAuth.jwtAllowedTenantKey') as string;
const jwtAllowedTenant = await GenericHelpers.getConfigValue('security.jwtAuth.jwtAllowedTenant') as string;
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (req.url.match(authIgnoreRegex)) {
return next();
}
function isTenantAllowed(decodedToken: object): Boolean {
if (jwtNamespace === '' || jwtAllowedTenantKey === '' || jwtAllowedTenant === '') return true;
else {
for (let [k, v] of Object.entries(decodedToken)) {
if (k === jwtNamespace) {
for (let [kn, kv] of Object.entries(v)) {
if (kn === jwtAllowedTenantKey && kv === jwtAllowedTenant) {
return true;
}
}
}
}
}
return false;
}
const token = req.header(jwtAuthHeader) as string;
if (token === '') {
return ResponseHelper.jwtAuthAuthorizationError(res, "Missing token");
}
this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => {
if (req.url.match(authIgnoreRegex)) {
return next();
}
const jwkClient = jwks({ cache: true, jwksUri });
function getKey(header: any, callback: Function) { // tslint:disable-line:no-any
jwkClient.getSigningKey(header.kid, (err: Error, key: any) => { // tslint:disable-line:no-any
if (err) throw ResponseHelper.jwtAuthAuthorizationError(res, err.message);
var token = req.header(jwtAuthHeader) as string;
if (token === undefined || token === '') {
return ResponseHelper.jwtAuthAuthorizationError(res, "Missing token");
}
if (jwtHeaderValuePrefix != '' && token.startsWith(jwtHeaderValuePrefix)) {
token = token.replace(jwtHeaderValuePrefix + ' ', '').trimLeft();
}
const signingKey = key.publicKey || key.rsaPublicKey;
callback(null, signingKey);
});
}
const jwkClient = jwks({ cache: true, jwksUri });
function getKey(header: any, callback: Function) { // tslint:disable-line:no-any
jwkClient.getSigningKey(header.kid, (err: Error, key: any) => { // tslint:disable-line:no-any
if (err) throw ResponseHelper.jwtAuthAuthorizationError(res, err.message);
jwt.verify(token, getKey, {}, (err: jwt.VerifyErrors, decoded: object) => {
if (err) return ResponseHelper.jwtAuthAuthorizationError(res, 'Invalid token');
const signingKey = key.publicKey || key.rsaPublicKey;
callback(null, signingKey);
});
}
next();
var jwtVerifyOptions: jwt.VerifyOptions = {
issuer: jwtIssuer != '' ? jwtIssuer : undefined,
ignoreExpiration: false
}
jwt.verify(token, getKey, jwtVerifyOptions, (err: jwt.VerifyErrors, decoded: object) => {
if (err) ResponseHelper.jwtAuthAuthorizationError(res, 'Invalid token');
else if (!isTenantAllowed(decoded)) ResponseHelper.jwtAuthAuthorizationError(res, 'Tenant not allowed');
else next();
});
});
}
@@ -273,7 +313,7 @@ class App {
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
} }));
} }));
this.app.use(bodyParser.text({
limit: '16mb', verify: (req, res, buf) => {
@@ -448,7 +488,9 @@ class App {
await this.externalHooks.run('workflow.update', [newWorkflowData]);
if (this.activeWorkflowRunner.isActive(id)) {
const isActive = await this.activeWorkflowRunner.isActive(id);
if (isActive) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await this.activeWorkflowRunner.remove(id);
@@ -471,9 +513,12 @@ class App {
// Do not save when default got set
delete newWorkflowData.settings.saveManualExecutions;
}
if (parseInt(newWorkflowData.settings.executionTimeout as string, 10) === this.executionTimeout) {
// Do not save when default got set
delete newWorkflowData.settings.executionTimeout;
}
}
newWorkflowData.updatedAt = this.getCurrentDate();
await Db.collections.Workflow!.update(id, newWorkflowData);
@@ -517,7 +562,9 @@ class App {
await this.externalHooks.run('workflow.delete', [id]);
if (this.activeWorkflowRunner.isActive(id)) {
const isActive = await this.activeWorkflowRunner.isActive(id);
if (isActive) {
// Before deleting a workflow deactivate it
await this.activeWorkflowRunner.remove(id);
}
@@ -657,7 +704,8 @@ class App {
// Returns the active workflow ids
this.app.get(`/${this.restEndpoint}/active`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string[]> => {
return this.activeWorkflowRunner.getActiveWorkflows();
const activeWorkflows = await this.activeWorkflowRunner.getActiveWorkflows();
return activeWorkflows.map(workflow => workflow.id.toString()) as string[];
}));
@@ -922,7 +970,8 @@ class App {
// Authorize OAuth Data
this.app.get(`/${this.restEndpoint}/oauth1-credential/auth`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string> => {
if (req.query.id === undefined) {
throw new Error('Required credential id is missing!');
res.status(500).send('Required credential id is missing!');
return '';
}
const result = await Db.collections.Credentials!.findOne(req.query.id as string);
@@ -934,7 +983,8 @@ class App {
let encryptionKey = undefined;
encryptionKey = await UserSettings.getEncryptionKey();
if (encryptionKey === undefined) {
throw new Error('No encryption key got found to decrypt the credentials!');
res.status(500).send('No encryption key got found to decrypt the credentials!');
return '';
}
// Decrypt the currently saved credentials
@@ -965,7 +1015,7 @@ class App {
const callback = `${WebhookHelpers.getWebhookBaseUrl()}${this.restEndpoint}/oauth1-credential/callback?cid=${req.query.id}`;
const options: RequestOptions = {
const options: RequestOptions = {
method: 'POST',
url: (_.get(oauthCredentials, 'requestTokenUrl') as string),
data: {
@@ -1006,7 +1056,8 @@ class App {
const { oauth_verifier, oauth_token, cid } = req.query;
if (oauth_verifier === undefined || oauth_token === undefined) {
throw new Error('Insufficient parameters for OAuth1 callback');
const errorResponse = new ResponseHelper.ResponseError('Insufficient parameters for OAuth1 callback. Received following query parameters: ' + JSON.stringify(req.query), undefined, 503);
return ResponseHelper.sendErrorResponse(res, errorResponse);
}
const result = await Db.collections.Credentials!.findOne(cid as any); // tslint:disable-line:no-any
@@ -1032,7 +1083,7 @@ class App {
const decryptedDataOriginal = credentialsHelper.getDecrypted(result.name, result.type, true);
const oauthCredentials = credentialsHelper.applyDefaultsAndOverwrites(decryptedDataOriginal, result.type);
const options: OptionsWithUrl = {
const options: OptionsWithUrl = {
method: 'POST',
url: _.get(oauthCredentials, 'accessTokenUrl') as string,
qs: {
@@ -1076,7 +1127,8 @@ class App {
// Authorize OAuth Data
this.app.get(`/${this.restEndpoint}/oauth2-credential/auth`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string> => {
if (req.query.id === undefined) {
throw new Error('Required credential id is missing!');
res.status(500).send('Required credential id is missing.');
return '';
}
const result = await Db.collections.Credentials!.findOne(req.query.id as string);
@@ -1088,7 +1140,8 @@ class App {
let encryptionKey = undefined;
encryptionKey = await UserSettings.getEncryptionKey();
if (encryptionKey === undefined) {
throw new Error('No encryption key got found to decrypt the credentials!');
res.status(500).send('No encryption key got found to decrypt the credentials!');
return '';
}
// Decrypt the currently saved credentials
@@ -1136,6 +1189,13 @@ class App {
const authQueryParameters = _.get(oauthCredentials, 'authQueryParameters', '') as string;
let returnUri = oAuthObj.code.getUri();
// if scope uses comma, change it as the library always return then with spaces
if ((_.get(oauthCredentials, 'scope') as string).includes(',')) {
const data = querystring.parse(returnUri.split('?')[1] as string);
data.scope = _.get(oauthCredentials, 'scope') as string;
returnUri = `${_.get(oauthCredentials, 'authUrl', '')}?${querystring.stringify(data)}`;
}
if (authQueryParameters) {
returnUri += '&' + authQueryParameters;
}
@@ -1152,7 +1212,8 @@ class App {
const {code, state: stateEncoded } = req.query;
if (code === undefined || stateEncoded === undefined) {
throw new Error('Insufficient parameters for OAuth2 callback');
const errorResponse = new ResponseHelper.ResponseError('Insufficient parameters for OAuth2 callback. Received following query parameters: ' + JSON.stringify(req.query), undefined, 503);
return ResponseHelper.sendErrorResponse(res, errorResponse);
}
let state;
@@ -1212,10 +1273,13 @@ class App {
};
delete oAuth2Parameters.clientSecret;
}
const redirectUri = `${WebhookHelpers.getWebhookBaseUrl()}${this.restEndpoint}/oauth2-credential/callback`;
const oAuthObj = new clientOAuth2(oAuth2Parameters);
const oauthToken = await oAuthObj.code.getToken(req.originalUrl, options);
const queryParameters = req.originalUrl.split('?').splice(1, 1).join('');
const oauthToken = await oAuthObj.code.getToken(`${redirectUri}?${queryParameters}`, options);
if (oauthToken === undefined) {
const errorResponse = new ResponseHelper.ResponseError('Unable to get access tokens!', undefined, 404);
@@ -1303,7 +1367,7 @@ class App {
retrySuccessId: result.retrySuccessId ? result.retrySuccessId.toString() : undefined,
startedAt: result.startedAt,
stoppedAt: result.stoppedAt,
workflowId: result.workflowData!.id!.toString(),
workflowId: result.workflowData!.id ? result.workflowData!.id!.toString() : '',
workflowName: result.workflowData!.name,
});
}
@@ -1514,6 +1578,8 @@ class App {
saveDataErrorExecution: this.saveDataErrorExecution,
saveDataSuccessExecution: this.saveDataSuccessExecution,
saveManualExecutions: this.saveManualExecutions,
executionTimeout: this.executionTimeout,
maxExecutionTimeout: this.maxExecutionTimeout,
timezone: this.timezone,
urlBaseWebhook: WebhookHelpers.getWebhookBaseUrl(),
versionCli: this.versions!.cli,
@@ -1547,6 +1613,26 @@ class App {
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// OPTIONS webhook requests
this.app.options(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2);
let allowedMethods: string[];
try {
allowedMethods = await this.activeWorkflowRunner.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
ResponseHelper.sendSuccessResponse(res, {}, true, 204);
});
// GET webhook requests
this.app.get(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook/" to get the registred part of the url
@@ -1610,6 +1696,26 @@ class App {
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
});
// HEAD webhook requests (test for UI)
this.app.options(`/${this.endpointWebhookTest}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook-test/" to get the registred part of the url
const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhookTest.length + 2);
let allowedMethods: string[];
try {
allowedMethods = await this.testWebhooks.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
ResponseHelper.sendErrorResponse(res, error);
return;
}
ResponseHelper.sendSuccessResponse(res, {}, true, 204);
});
// GET webhook requests (test for UI)
this.app.get(`/${this.endpointWebhookTest}/*`, async (req: express.Request, res: express.Response) => {
// Cut away the "/webhook-test/" to get the registred part of the url
@@ -1653,9 +1759,57 @@ class App {
});
if (this.endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
this.app.post(`/${this.endpointPresetCredentials}`, async (req: express.Request, res: express.Response) => {
if (this.presetCredentialsLoaded === false) {
const body = req.body as ICredentialsOverwrite;
if (req.headers['content-type'] !== 'application/json') {
ResponseHelper.sendErrorResponse(res, new Error('Body must be a valid JSON, make sure the content-type is application/json'));
return;
}
const loadNodesAndCredentials = LoadNodesAndCredentials();
const credentialsOverwrites = CredentialsOverwrites();
await credentialsOverwrites.init(body);
const credentialTypes = CredentialTypes();
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
this.presetCredentialsLoaded = true;
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200);
} else {
ResponseHelper.sendErrorResponse(res, new Error('Preset credentials can be set once'));
}
});
}
// Read the index file and replace the path placeholder
const editorUiPath = require.resolve('n8n-editor-ui');
const filePath = pathJoin(pathDirname(editorUiPath), 'dist', 'index.html');
const n8nPath = config.get('path');
let readIndexFile = readFileSync(filePath, 'utf8');
readIndexFile = readIndexFile.replace(/\/%BASE_PATH%\//g, n8nPath);
readIndexFile = readIndexFile.replace(/\/favicon.ico/g, `${n8nPath}/favicon.ico`);
// Serve the altered index.html file separately
this.app.get(`/index.html`, async (req: express.Request, res: express.Response) => {
res.send(readIndexFile);
});
// Serve the website
const startTime = (new Date()).toUTCString();
const editorUiPath = require.resolve('n8n-editor-ui');
this.app.use('/', express.static(pathJoin(pathDirname(editorUiPath), 'dist'), {
index: 'index.html',
setHeaders: (res, path) => {

View File

@@ -110,6 +110,21 @@ export class TestWebhooks {
});
}
/**
* Gets all request methods associated with a single test webhook
* @param path webhook path
*/
async getWebhookMethods(path : string) : Promise<string[]> {
const webhookMethods: string[] = this.activeWebhooks!.getWebhookMethods(path);
if (webhookMethods === undefined) {
// The requested webhook is not registered
throw new ResponseHelper.ResponseError(`The requested webhook "${path}" is not registered.`, 404, 404);
}
return webhookMethods;
}
/**
* Checks if it has to wait for webhook data to execute the workflow. If yes it waits
@@ -141,12 +156,14 @@ export class TestWebhooks {
let key: string;
for (const webhookData of webhooks) {
key = this.activeWebhooks!.getWebhookKey(webhookData.httpMethod, webhookData.path);
await this.activeWebhooks!.add(workflow, webhookData, mode);
this.testWebhookData[key] = {
sessionId,
timeout,
workflowData,
};
await this.activeWebhooks!.add(workflow, webhookData, mode);
// Save static data!
this.testWebhookData[key].workflowData.staticData = workflow.staticData;

View File

@@ -69,6 +69,26 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
return returnData;
}
/**
* Returns all the webhooks which should be created for the give workflow
*
* @export
* @param {string} workflowId
* @param {Workflow} workflow
* @returns {IWebhookData[]}
*/
export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
// Check all the nodes in the workflow if they have webhooks
const returnData: IWebhookData[] = [];
for (const node of Object.values(workflow.nodes)) {
returnData.push.apply(returnData, NodeHelpers.getNodeWebhooksBasic(workflow, node));
}
return returnData;
}
/**
* Executes a webhook

View File

@@ -41,6 +41,8 @@ import {
import * as config from '../config';
import { LessThanOrEqual } from "typeorm";
/**
* Checks if there was an error and if errorWorkflow is defined. If so it collects
@@ -79,6 +81,30 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo
}
}
/**
* Prunes Saved Execution which are older than configured.
* Throttled to be executed just once in configured timeframe.
*
*/
let throttling = false;
function pruneExecutionData(): void {
if (!throttling) {
throttling = true;
const timeout = config.get('executions.pruneDataTimeout') as number; // in seconds
const maxAge = config.get('executions.pruneDataMaxAge') as number; // in h
const date = new Date(); // today
date.setHours(date.getHours() - maxAge);
// throttle just on success to allow for self healing on failure
Db.collections.Execution!.delete({ stoppedAt: LessThanOrEqual(date.toISOString()) })
.then(data =>
setTimeout(() => {
throttling = false;
}, timeout * 1000)
).catch(err => throttling = false);
}
}
/**
* Pushes the execution out to all connected clients
@@ -189,6 +215,11 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
// Prune old execution data
if (config.get('executions.pruneData')) {
pruneExecutionData();
}
const isManualMode = [this.mode, parentProcessMode].includes('manual');
try {
@@ -316,14 +347,14 @@ export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additi
// Does not get used so set it simply to empty string
const executionId = '';
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(additionalData.credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
// Get the needed credentials for the current workflow as they will differ to the ones of the
// calling workflow.
additionalDataIntegrated.credentials = await WorkflowCredentials(workflowData!.nodes);
const credentials = await WorkflowCredentials(workflowData!.nodes);
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
// Find Start-Node
const requiredNodeTypes = ['n8n-nodes-base.start'];

View File

@@ -90,7 +90,6 @@ export class WorkflowRunner {
WorkflowExecuteAdditionalData.pushExecutionFinished(executionMode, fullRunData, executionId);
}
/**
* Run the workflow
*
@@ -155,7 +154,25 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
// Soft timeout to stop workflow execution after current running node
let executionTimeout: NodeJS.Timeout;
let workflowTimeout = config.get('executions.timeout') as number > 0 && config.get('executions.timeout') as number; // initialize with default
if (data.workflowData.settings && data.workflowData.settings.executionTimeout) {
workflowTimeout = data.workflowData.settings!.executionTimeout as number > 0 && data.workflowData.settings!.executionTimeout as number; // preference on workflow setting
}
if (workflowTimeout) {
const timeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds
executionTimeout = setTimeout(() => {
this.activeExecutions.stopExecution(executionId, 'timeout');
}, timeout);
}
workflowExecution.then((fullRunData) => {
clearTimeout(executionTimeout);
if (workflowExecution.isCanceled) {
fullRunData.finished = false;
}
this.activeExecutions.remove(executionId, fullRunData);
});
@@ -218,24 +235,54 @@ export class WorkflowRunner {
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
// Start timeout for the execution
let executionTimeout: NodeJS.Timeout;
let workflowTimeout = config.get('executions.timeout') as number > 0 && config.get('executions.timeout') as number; // initialize with default
if (data.workflowData.settings && data.workflowData.settings.executionTimeout) {
workflowTimeout = data.workflowData.settings!.executionTimeout as number > 0 && data.workflowData.settings!.executionTimeout as number; // preference on workflow setting
}
if (workflowTimeout) {
const timeout = Math.min(workflowTimeout, config.get('executions.maxTimeout') as number) * 1000; // as seconds
executionTimeout = setTimeout(() => {
this.activeExecutions.stopExecution(executionId, 'timeout');
executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds
}, timeout);
}
// Listen to data from the subprocess
subprocess.on('message', (message: IProcessMessage) => {
if (message.type === 'end') {
clearTimeout(executionTimeout);
this.activeExecutions.remove(executionId!, message.data.runData);
} else if (message.type === 'processError') {
clearTimeout(executionTimeout);
const executionError = message.data.executionError as IExecutionError;
this.processError(executionError, startedAt, data.executionMode, executionId);
} else if (message.type === 'processHook') {
this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
} else if (message.type === 'timeout') {
// Execution timed out and its process has been terminated
const timeoutError = { message: 'Workflow execution timed out!' } as IExecutionError;
this.processError(timeoutError, startedAt, data.executionMode, executionId);
}
});
// Also get informed when the processes does exit especially when it did crash
// Also get informed when the processes does exit especially when it did crash or timed out
subprocess.on('exit', (code, signal) => {
if (code !== 0) {
if (signal === 'SIGTERM'){
// Execution timed out and its process has been terminated
const timeoutError = {
message: 'Workflow execution timed out!',
} as IExecutionError;
this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (code !== 0) {
// Process did exit with error code, so something went wrong.
const executionError = {
message: 'Workflow execution process did crash for an unknown reason!',
@@ -243,6 +290,7 @@ export class WorkflowRunner {
this.processError(executionError, startedAt, data.executionMode, executionId);
}
clearTimeout(executionTimeout);
});
return executionId;

View File

@@ -190,17 +190,18 @@ process.on('message', async (message: IProcessMessage) => {
// Once the workflow got executed make sure the process gets killed again
process.exit();
} else if (message.type === 'stopExecution') {
} else if (message.type === 'stopExecution' || message.type === 'timeout') {
// The workflow execution should be stopped
let runData: IRun;
if (workflowRunner.workflowExecute !== undefined) {
// Workflow started already executing
runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt);
// If there is any data send it to parent process
await workflowRunner.workflowExecute.processSuccessExecution(workflowRunner.startedAt, workflowRunner.workflow!);
const timeOutError = message.type === 'timeout' ? { message: 'Workflow execution timed out!' } as IExecutionError : undefined;
// If there is any data send it to parent process, if execution timedout add the error
await workflowRunner.workflowExecute.processSuccessExecution(workflowRunner.startedAt, workflowRunner.workflow!, timeOutError);
} else {
// Workflow did not get started yet
runData = {
@@ -209,7 +210,7 @@ process.on('message', async (message: IProcessMessage) => {
runData: {},
},
},
finished: true,
finished: message.type !== 'timeout',
mode: workflowRunner.data!.executionMode,
startedAt: workflowRunner.startedAt,
stoppedAt: new Date(),
@@ -218,7 +219,7 @@ process.on('message', async (message: IProcessMessage) => {
workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [runData]);
}
await sendToParentProcess('end', {
await sendToParentProcess(message.type === 'timeout' ? message.type : 'end', {
runData,
});

View File

@@ -39,6 +39,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
@Column('Date')
startedAt: Date;
@Index()
@Column('Date')
stoppedAt: Date;

View File

@@ -0,0 +1,30 @@
import {
Column,
Entity,
Index,
ObjectID,
ObjectIdColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../Interfaces';
@Entity()
export class WebhookEntity implements IWebhookDb {
@ObjectIdColumn()
id: ObjectID;
@Column()
workflowId: number;
@Column()
webhookPath: string;
@Column()
method: string;
@Column()
node: string;
}

View File

@@ -1,3 +1,5 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View File

@@ -0,0 +1,22 @@
import { MigrationInterface } from "typeorm";
import {
MongoQueryRunner,
} from 'typeorm/driver/mongodb/MongoQueryRunner';
import * as config from '../../../../config';
export class CreateIndexStoppedAt1594910478695 implements MigrationInterface {
name = 'CreateIndexStoppedAt1594910478695';
async up(queryRunner: MongoQueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.manager.createCollectionIndex(`${tablePrefix}execution_entity`, 'stoppedAt', { name: `IDX_${tablePrefix}execution_entity_stoppedAt` });
}
async down(queryRunner: MongoQueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.manager.dropCollectionIndex
(`${tablePrefix}execution_entity`, `IDX_${tablePrefix}execution_entity_stoppedAt`);
}
}

View File

@@ -0,0 +1,57 @@
import {
MigrationInterface,
} from 'typeorm';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow/dist/src/Workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
import * as config from '../../../../config';
import {
MongoQueryRunner,
} from 'typeorm/driver/mongodb/MongoQueryRunner';
export class WebhookModel1592679094242 implements MigrationInterface {
name = 'WebhookModel1592679094242';
async up(queryRunner: MongoQueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
const workflows = await queryRunner.cursor( `${tablePrefix}workflow_entity`, { active: true }).toArray() as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.insertMany(`${tablePrefix}webhook_entity`, data);
}
await queryRunner.manager.createCollectionIndex(`${tablePrefix}webhook_entity`, ['webhookPath', 'method'], { unique: true, background: false });
}
async down(queryRunner: MongoQueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.dropTable(`${tablePrefix}webhook_entity`);
}
}

View File

@@ -1 +1,3 @@
export * from './1587563438936-InitialMigration';
export * from './1592679094242-WebhookModel';
export * from './151594910478695-CreateIndexStoppedAt';

View File

@@ -39,6 +39,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
@Column('datetime')
startedAt: Date;
@Index()
@Column('datetime')
stoppedAt: Date;

View File

@@ -0,0 +1,25 @@
import {
Column,
Entity,
PrimaryColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../Interfaces';
@Entity()
export class WebhookEntity implements IWebhookDb {
@Column()
workflowId: number;
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@Column()
node: string;
}

View File

@@ -1,3 +1,4 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View File

@@ -0,0 +1,59 @@
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import * as config from '../../../../config';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
export class WebhookModel1592447867632 implements MigrationInterface {
name = 'WebhookModel1592447867632';
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity (workflowId int NOT NULL, webhookPath varchar(255) NOT NULL, method varchar(255) NOT NULL, node varchar(255) NOT NULL, PRIMARY KEY (webhookPath, method)) ENGINE=InnoDB`);
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.createQueryBuilder()
.insert()
.into(`${tablePrefix}webhook_entity`)
.values(data)
.execute();
}
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`);
}
}

View File

@@ -0,0 +1,20 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import * as config from '../../../../config';
export class CreateIndexStoppedAt1594902918301 implements MigrationInterface {
name = 'CreateIndexStoppedAt1594902918301';
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query('CREATE INDEX `IDX_' + tablePrefix + 'cefb067df2402f6aed0638a6c1` ON `' + tablePrefix + 'execution_entity` (`stoppedAt`)');
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query('DROP INDEX `IDX_' + tablePrefix + 'cefb067df2402f6aed0638a6c1` ON `' + tablePrefix + 'execution_entity`');
}
}

View File

@@ -1 +1,3 @@
export * from './1588157391238-InitialMigration';
export * from './1588157391238-InitialMigration';
export * from './1592447867632-WebhookModel';
export * from './1594902918301-CreateIndexStoppedAt';

View File

@@ -39,6 +39,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
@Column('timestamp')
startedAt: Date;
@Index()
@Column('timestamp')
stoppedAt: Date;

View File

@@ -0,0 +1,25 @@
import {
Column,
Entity,
PrimaryColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../';
@Entity()
export class WebhookEntity implements IWebhookDb {
@Column()
workflowId: number;
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@Column()
node: string;
}

View File

@@ -1,3 +1,5 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View File

@@ -1,4 +1,5 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import {
MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';

View File

@@ -0,0 +1,69 @@
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
import * as config from '../../../../config';
export class WebhookModel1589476000887 implements MigrationInterface {
name = 'WebhookModel1589476000887';
async up(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.get('database.tablePrefix');
const tablePrefixIndex = tablePrefix;
const schema = config.get('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query(`CREATE TABLE ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" character varying NOT NULL, "method" character varying NOT NULL, "node" character varying NOT NULL, CONSTRAINT "PK_${tablePrefixIndex}b21ace2e13596ccd87dc9bf4ea6" PRIMARY KEY ("webhookPath", "method"))`, undefined);
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.createQueryBuilder()
.insert()
.into(`${tablePrefix}webhook_entity`)
.values(data)
.execute();
}
}
async down(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.get('database.tablePrefix');
const schema = config.get('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`, undefined);
}
}

View File

@@ -0,0 +1,25 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import * as config from '../../../../config';
export class CreateIndexStoppedAt1594828256133 implements MigrationInterface {
name = 'CreateIndexStoppedAt1594828256133';
async up(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.get('database.tablePrefix');
const tablePrefixPure = tablePrefix;
const schema = config.get('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query(`CREATE INDEX IF NOT EXISTS IDX_${tablePrefixPure}33228da131bb1112247cf52a42 ON ${tablePrefix}execution_entity ("stoppedAt") `);
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`DROP INDEX IDX_${tablePrefix}33228da131bb1112247cf52a42`);
}
}

View File

@@ -1 +1,4 @@
export * from './1587669153312-InitialMigration';
export * from './1589476000887-WebhookModel';
export * from './1594828256133-CreateIndexStoppedAt';

View File

@@ -39,6 +39,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {
@Column()
startedAt: Date;
@Index()
@Column()
stoppedAt: Date;

View File

@@ -0,0 +1,25 @@
import {
Column,
Entity,
PrimaryColumn,
} from 'typeorm';
import {
IWebhookDb,
} from '../../Interfaces';
@Entity()
export class WebhookEntity implements IWebhookDb {
@Column()
workflowId: number;
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@Column()
node: string;
}

View File

@@ -1,4 +1,4 @@
export * from './CredentialsEntity';
export * from './ExecutionEntity';
export * from './WorkflowEntity';
export * from './WebhookEntity';

View File

@@ -1,4 +1,7 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import * as config from '../../../../config';

View File

@@ -0,0 +1,63 @@
import {
MigrationInterface,
QueryRunner,
} from 'typeorm';
import * as config from '../../../../config';
import {
IWorkflowDb,
NodeTypes,
WebhookHelpers,
} from '../../..';
import {
Workflow,
} from 'n8n-workflow';
import {
IWebhookDb,
} from '../../../Interfaces';
export class WebhookModel1592445003908 implements MigrationInterface {
name = 'WebhookModel1592445003908';
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`CREATE TABLE IF NOT EXISTS ${tablePrefix}webhook_entity ("workflowId" integer NOT NULL, "webhookPath" varchar NOT NULL, "method" varchar NOT NULL, "node" varchar NOT NULL, PRIMARY KEY ("webhookPath", "method"))`);
const workflows = await queryRunner.query(`SELECT * FROM ${tablePrefix}workflow_entity WHERE active=true`) as IWorkflowDb[];
const data: IWebhookDb[] = [];
const nodeTypes = NodeTypes();
for (const workflow of workflows) {
workflow.nodes = JSON.parse(workflow.nodes as unknown as string);
workflow.connections = JSON.parse(workflow.connections as unknown as string);
workflow.staticData = JSON.parse(workflow.staticData as unknown as string);
workflow.settings = JSON.parse(workflow.settings as unknown as string);
const workflowInstance = new Workflow({ id: workflow.id as string, name: workflow.name, nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes, staticData: workflow.staticData, settings: workflow.settings });
const webhooks = WebhookHelpers.getWorkflowWebhooksBasic(workflowInstance);
for (const webhook of webhooks) {
data.push({
workflowId: workflowInstance.id as string,
webhookPath: webhook.path,
method: webhook.httpMethod,
node: webhook.node,
});
}
}
if (data.length !== 0) {
await queryRunner.manager.createQueryBuilder()
.insert()
.into(`${tablePrefix}webhook_entity`)
.values(data)
.execute();
}
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`DROP TABLE ${tablePrefix}webhook_entity`);
}
}

View File

@@ -0,0 +1,20 @@
import { MigrationInterface, QueryRunner } from "typeorm";
import * as config from '../../../../config';
export class CreateIndexStoppedAt1594825041918 implements MigrationInterface {
name = 'CreateIndexStoppedAt1594825041918';
async up(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "execution_entity" ("stoppedAt") `);
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix');
await queryRunner.query(`DROP INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1"`);
}
}

View File

@@ -1 +1,3 @@
export * from './1588102412422-InitialMigration';
export * from './1588102412422-InitialMigration';
export * from './1592445003908-WebhookModel';
export * from './1594825041918-CreateIndexStoppedAt';