Initial commit to release

This commit is contained in:
Jan Oberhauser
2019-06-23 12:35:23 +02:00
commit 9cb9804eee
257 changed files with 42436 additions and 0 deletions

View File

@@ -0,0 +1,172 @@
import {
IRun,
IRunExecutionData,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
createDeferredPromise,
IExecutingWorkflowData,
IExecutionsCurrentSummary,
} from '.';
export class ActiveExecutions {
private nextId = 1;
private activeExecutions: {
[index: string]: IExecutingWorkflowData;
} = {};
private stopExecutions: string[] = [];
/**
* Add a new active execution
*
* @param {Workflow} workflow
* @param {IRunExecutionData} runExecutionData
* @param {WorkflowExecuteMode} mode
* @returns {string}
* @memberof ActiveExecutions
*/
add(workflow: Workflow, runExecutionData: IRunExecutionData, mode: WorkflowExecuteMode): string {
const executionId = this.nextId++;
this.activeExecutions[executionId] = {
runExecutionData,
startedAt: new Date().getTime(),
mode,
workflow,
postExecutePromises: [],
};
return executionId.toString();
}
/**
* Remove an active execution
*
* @param {string} executionId
* @param {IRun} fullRunData
* @returns {void}
* @memberof ActiveExecutions
*/
remove(executionId: string, fullRunData: IRun): void {
if (this.activeExecutions[executionId] === undefined) {
return;
}
// Resolve all the waiting promises
for (const promise of this.activeExecutions[executionId].postExecutePromises) {
promise.resolve(fullRunData);
}
// Remove from the list of active executions
delete this.activeExecutions[executionId];
const stopExecutionIndex = this.stopExecutions.indexOf(executionId);
if (stopExecutionIndex !== -1) {
// If it was on the stop-execution list remove it
this.stopExecutions.splice(stopExecutionIndex, 1);
}
}
/**
* Forces an execution to stop
*
* @param {string} executionId The id of the execution to stop
* @returns {(Promise<IRun | undefined>)}
* @memberof ActiveExecutions
*/
async stopExecution(executionId: string): Promise<IRun | undefined> {
if (this.activeExecutions[executionId] === undefined) {
// There is no execution running with that id
return;
}
if (!this.stopExecutions.includes(executionId)) {
// Add the execution to the stop list if it is not already on it
this.stopExecutions.push(executionId);
}
return this.getPostExecutePromise(executionId);
}
/**
* Returns a promise which will resolve with the data of the execution
* with the given id
*
* @param {string} executionId The id of the execution to wait for
* @returns {Promise<IRun>}
* @memberof ActiveExecutions
*/
async getPostExecutePromise(executionId: string): Promise<IRun> {
// Create the promise which will be resolved when the execution finished
const waitPromise = await createDeferredPromise<IRun>();
if (this.activeExecutions[executionId] === undefined) {
throw new Error(`There is no active execution with id "${executionId}".`);
}
this.activeExecutions[executionId].postExecutePromises.push(waitPromise);
return waitPromise.promise();
}
/**
* Returns if the execution should be stopped
*
* @param {string} executionId The execution id to check
* @returns {boolean}
* @memberof ActiveExecutions
*/
shouldBeStopped(executionId: string): boolean {
return this.stopExecutions.includes(executionId);
}
/**
* Returns all the currently active executions
*
* @returns {IExecutionsCurrentSummary[]}
* @memberof ActiveExecutions
*/
getActiveExecutions(): IExecutionsCurrentSummary[] {
const returnData: IExecutionsCurrentSummary[] = [];
let executionData;
for (const id of Object.keys(this.activeExecutions)) {
executionData = this.activeExecutions[id];
returnData.push(
{
id,
startedAt: executionData.startedAt,
mode: executionData.mode,
workflowId: executionData.workflow.id!,
}
);
}
return returnData;
}
}
let activeExecutionsInstance: ActiveExecutions | undefined;
export function getInstance(): ActiveExecutions {
if (activeExecutionsInstance === undefined) {
activeExecutionsInstance = new ActiveExecutions();
}
return activeExecutionsInstance;
}

View File

@@ -0,0 +1,175 @@
import {
IWebhookData,
WebhookHttpMethod,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
NodeExecuteFunctions,
} from './';
export class ActiveWebhooks {
private workflowWebhooks: {
[key: string]: IWebhookData[];
} = {};
private webhookUrls: {
[key: string]: IWebhookData;
} = {};
testWebhooks = false;
/**
* Adds a new webhook
*
* @param {IWebhookData} webhookData
* @param {WorkflowExecuteMode} mode
* @returns {Promise<void>}
* @memberof ActiveWebhooks
*/
async add(webhookData: IWebhookData, mode: WorkflowExecuteMode): Promise<void> {
if (webhookData.workflow.id === undefined) {
throw new Error('Webhooks can only be added for saved workflows as an id is needed!');
}
if (this.workflowWebhooks[webhookData.workflow.id] === undefined) {
this.workflowWebhooks[webhookData.workflow.id] = [];
}
// Make the webhook available directly because sometimes to create it successfully
// it gets called
this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)] = webhookData;
const webhookExists = await webhookData.workflow.runWebhookMethod('checkExists', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
if (webhookExists === false) {
// If webhook does not exist yet create it
await webhookData.workflow.runWebhookMethod('create', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
}
// Run the "activate" hooks on the nodes
await webhookData.workflow.runNodeHooks('activate', webhookData, NodeExecuteFunctions, mode);
this.workflowWebhooks[webhookData.workflow.id].push(webhookData);
}
/**
* Returns webhookData if a webhook with matches is currently registered
*
* @param {WebhookHttpMethod} httpMethod
* @param {string} path
* @returns {(IWebhookData | undefined)}
* @memberof ActiveWebhooks
*/
get(httpMethod: WebhookHttpMethod, path: string): IWebhookData | undefined {
const webhookKey = this.getWebhookKey(httpMethod, path);
if (this.webhookUrls[webhookKey] === undefined) {
return undefined;
}
return this.webhookUrls[webhookKey];
}
/**
* Returns key to uniquely identify a webhook
*
* @param {WebhookHttpMethod} httpMethod
* @param {string} path
* @returns {string}
* @memberof ActiveWebhooks
*/
getWebhookKey(httpMethod: WebhookHttpMethod, path: string): string {
return `${httpMethod}|${path}`;
}
/**
* Removes all webhooks of a workflow
*
* @param {string} workflowId
* @returns {boolean}
* @memberof ActiveWebhooks
*/
async removeByWorkflowId(workflowId: string): Promise<boolean> {
if (this.workflowWebhooks[workflowId] === undefined) {
// If it did not exist then there is nothing to remove
return false;
}
const webhooks = this.workflowWebhooks[workflowId];
const mode = 'internal';
// Go through all the registered webhooks of the workflow and remove them
for (const webhookData of webhooks) {
await webhookData.workflow.runWebhookMethod('delete', webhookData, NodeExecuteFunctions, mode, this.testWebhooks);
// Run the "deactivate" hooks on the nodes
await webhookData.workflow.runNodeHooks('deactivate', webhookData, NodeExecuteFunctions, mode);
delete this.webhookUrls[this.getWebhookKey(webhookData.httpMethod, webhookData.path)];
}
// Remove also the workflow-webhook entry
delete this.workflowWebhooks[workflowId];
return true;
}
/**
* Removes all the currently active webhooks
*/
async removeAll(): Promise<void> {
const workflowIds = Object.keys(this.workflowWebhooks);
const removePromises = [];
for (const workflowId of workflowIds) {
removePromises.push(this.removeByWorkflowId(workflowId));
}
await Promise.all(removePromises);
return;
}
// /**
// * Removes a single webhook by its key.
// * Currently not used, runNodeHooks for "deactivate" is missing
// *
// * @param {string} webhookKey
// * @returns {boolean}
// * @memberof ActiveWebhooks
// */
// removeByWebhookKey(webhookKey: string): boolean {
// if (this.webhookUrls[webhookKey] === undefined) {
// // If it did not exist then there is nothing to remove
// return false;
// }
// const webhookData = this.webhookUrls[webhookKey];
// // Remove from workflow-webhooks
// const workflowWebhooks = this.workflowWebhooks[webhookData.workflowId];
// for (let index = 0; index < workflowWebhooks.length; index++) {
// if (workflowWebhooks[index].path === webhookData.path) {
// workflowWebhooks.splice(index, 1);
// break;
// }
// }
// if (workflowWebhooks.length === 0) {
// // When there are no webhooks left for any workflow remove it totally
// delete this.workflowWebhooks[webhookData.workflowId];
// }
// // Remove from webhook urls
// delete this.webhookUrls[webhookKey];
// return true;
// }
}

View File

@@ -0,0 +1,112 @@
import {
ITriggerResponse,
IWorkflowExecuteAdditionalData,
Workflow,
} from 'n8n-workflow';
import {
NodeExecuteFunctions,
} from './';
export interface WorkflowData {
workflow: Workflow;
triggerResponse?: ITriggerResponse;
}
export class ActiveWorkflows {
private workflowData: {
[key: string]: WorkflowData;
} = {};
/**
* Returns if the workflow is active
*
* @param {string} id The id of the workflow to check
* @returns {boolean}
* @memberof ActiveWorkflows
*/
isActive(id: string): boolean {
return this.workflowData.hasOwnProperty(id);
}
/**
* Returns the ids of the currently active workflows
*
* @returns {string[]}
* @memberof ActiveWorkflows
*/
allActiveWorkflows(): string[] {
return Object.keys(this.workflowData);
}
/**
* Returns the Workflow data for the workflow with
* the given id if it is currently active
*
* @param {string} id
* @returns {(WorkflowData | undefined)}
* @memberof ActiveWorkflows
*/
get(id: string): WorkflowData | undefined {
return this.workflowData[id];
}
/**
* Makes a workflow active
*
* @param {string} id The id of the workflow to activate
* @param {Workflow} workflow The workflow to activate
* @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
* @returns {Promise<void>}
* @memberof ActiveWorkflows
*/
async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData): Promise<void> {
console.log('ADD ID (active): ' + id);
this.workflowData[id] = {
workflow
};
const triggerNodes = workflow.getTriggerNodes();
let triggerResponse: ITriggerResponse | undefined;
for (const triggerNode of triggerNodes) {
triggerResponse = await workflow.runTrigger(triggerNode, NodeExecuteFunctions, additionalData, 'trigger');
if (triggerResponse !== undefined) {
// If a response was given save it
this.workflowData[id].triggerResponse = triggerResponse;
}
}
}
/**
* Makes a workflow inactive
*
* @param {string} id The id of the workflow to deactivate
* @returns {Promise<void>}
* @memberof ActiveWorkflows
*/
async remove(id: string): Promise<void> {
console.log('REMOVE ID (active): ' + id);
if (!this.isActive(id)) {
// Workflow is currently not registered
throw new Error(`The workflow with the id "${id}" is currently not active and can so not be removed`);
}
const workflowData = this.workflowData[id];
if (workflowData.triggerResponse && workflowData.triggerResponse.closeFunction) {
await workflowData.triggerResponse.closeFunction();
}
delete this.workflowData[id];
}
}

View File

@@ -0,0 +1,7 @@
export const BINARY_ENCODING = 'base64';
export const CUSTOM_EXTENSION_ENV = 'N8N_CUSTOM_EXTENSIONS';
export const ENCRYPTION_KEY_ENV_OVERWRITE = 'N8N_ENCRYPTION_KEY';
export const EXTENSIONS_SUBDIRECTORY = 'custom';
export const USER_FOLDER_ENV_OVERWRITE = 'N8N_USER_FOLDER';
export const USER_SETTINGS_FILE_NAME = 'config';
export const USER_SETTINGS_SUBFOLDER = '.n8n';

View File

@@ -0,0 +1,120 @@
import {
ICredentialDataDecryptedObject,
CredentialInformation,
ICredentialsEncrypted,
ICredentialNodeAccess,
} from 'n8n-workflow';
import { enc, AES } from 'crypto-js';
export class Credentials implements ICredentialsEncrypted {
name: string;
type: string;
data: string | undefined;
nodesAccess: ICredentialNodeAccess[];
constructor(name: string, type: string, nodesAccess: ICredentialNodeAccess[], data?: string) {
this.name = name;
this.type = type;
this.nodesAccess = nodesAccess;
this.data = data;
}
/**
* Returns if the given nodeType has access to data
*/
hasNodeAccess(nodeType: string): boolean {
for (const accessData of this.nodesAccess) {
if (accessData.nodeType === nodeType) {
return true;
}
}
return false;
}
/**
* Sets new credential object
*/
setData(data: ICredentialDataDecryptedObject, encryptionKey: string): void {
this.data = AES.encrypt(JSON.stringify(data), encryptionKey).toString();
}
/**
* Sets new credentials for given key
*/
setDataKey(key: string, data: CredentialInformation, encryptionKey: string): void {
let fullData;
try {
fullData = this.getData(encryptionKey);
} catch (e) {
fullData = {};
}
fullData[key] = data;
return this.setData(fullData, encryptionKey);
}
/**
* Returns the decrypted credential object
*/
getData(encryptionKey: string, nodeType?: string): ICredentialDataDecryptedObject {
if (nodeType && !this.hasNodeAccess(nodeType)) {
throw new Error(`The node of type "${nodeType}" does not have access to credentials "${this.name}" of type "${this.type}".`);
}
if (this.data === undefined) {
throw new Error('No data is set so nothing can be returned.');
}
const decryptedData = AES.decrypt(this.data, encryptionKey);
try {
return JSON.parse(decryptedData.toString(enc.Utf8));
} catch (e) {
throw new Error('Credentials could not be decrypted. The reason is that probably a different "encryptionKey" got used to encrypt the data than now to decrypt it.');
}
}
/**
* Returns the decrypted credentials for given key
*/
getDataKey(key: string, encryptionKey: string, nodeType?: string): CredentialInformation {
const fullData = this.getData(encryptionKey, nodeType);
if (fullData === null) {
throw new Error(`No data got set.`);
}
if (!fullData.hasOwnProperty(key)) {
throw new Error(`No data for key "${key}" exists.`);
}
return fullData[key];
}
/**
* Returns the encrypted credentials to be saved
*/
getDataToSave(): ICredentialsEncrypted {
if (this.data === undefined) {
throw new Error(`No credentials got set to save.`);
}
return {
name: this.name,
type: this.type,
data: this.data,
nodesAccess: this.nodesAccess,
};
}
}

View File

@@ -0,0 +1,14 @@
// From: https://gist.github.com/compulim/8b49b0a744a3eeb2205e2b9506201e50
export interface IDeferredPromise<T> {
promise: () => Promise<T>;
reject: (error: Error) => void;
resolve: (result: T) => void;
}
export function createDeferredPromise<T>(): Promise<IDeferredPromise<T>> {
return new Promise<IDeferredPromise<T>>(resolveCreate => {
const promise = new Promise<T>((resolve, reject) => {
resolveCreate({ promise: () => promise, resolve, reject });
});
});
}

View File

@@ -0,0 +1,115 @@
import {
IBinaryData,
ICredentialType,
IDataObject,
IExecuteFunctions as IExecuteFunctionsBase,
IExecuteSingleFunctions as IExecuteSingleFunctionsBase,
IHookFunctions as IHookFunctionsBase,
ILoadOptionsFunctions as ILoadOptionsFunctionsBase,
INodeExecutionData,
INodeType,
IRun,
IRunExecutionData,
ITriggerFunctions as ITriggerFunctionsBase,
IWebhookFunctions as IWebhookFunctionsBase,
IWorkflowSettings as IWorkflowSettingsWorkflow,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
IDeferredPromise
} from '.';
import * as request from 'request';
import * as requestPromise from 'request-promise-native';
interface Constructable<T> {
new(): T;
}
export interface IExecuteFunctions extends IExecuteFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI < requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl >,
};
}
export interface IExecutingWorkflowData {
runExecutionData: IRunExecutionData;
startedAt: number;
mode: WorkflowExecuteMode;
workflow: Workflow;
postExecutePromises: Array<IDeferredPromise<IRun>>;
}
export interface IExecutionsCurrentSummary {
id: string;
startedAt: number;
mode: WorkflowExecuteMode;
workflowId: string;
}
export interface ITriggerFunctions extends ITriggerFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
export interface IUserSettings {
encryptionKey?: string;
tunnelSubdomain?: string;
}
export interface ILoadOptionsFunctions extends ILoadOptionsFunctionsBase {
helpers: {
request?: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
};
}
export interface IHookFunctions extends IHookFunctionsBase {
helpers: {
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
};
}
export interface IWebhookFunctions extends IWebhookFunctionsBase {
helpers: {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>;
request: request.RequestAPI<requestPromise.RequestPromise, requestPromise.RequestPromiseOptions, request.RequiredUriUrl>,
returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[];
};
}
export interface IWorkflowSettings extends IWorkflowSettingsWorkflow {
errorWorkflow?: string;
timezone?: string;
saveManualRuns?: boolean;
}
// New node definition in file
export interface INodeDefinitionFile {
[key: string]: Constructable<INodeType | ICredentialType>;
}
// Is identical to TaskDataConnections but does not allow null value to be used as input for nodes
export interface INodeInputDataConnections {
[key: string]: INodeExecutionData[][];
}

View File

@@ -0,0 +1,97 @@
import {
INode,
INodeCredentials,
INodePropertyOptions,
INodeTypes,
IWorkflowExecuteAdditionalData,
Workflow,
} from 'n8n-workflow';
import {
NodeExecuteFunctions,
} from './';
const TEMP_NODE_NAME = 'Temp-Node';
const TEMP_WORKFLOW_NAME = 'Temp-Workflow';
export class LoadNodeParameterOptions {
workflow: Workflow;
constructor(nodeTypeName: string, nodeTypes: INodeTypes, credentials?: INodeCredentials) {
const nodeType = nodeTypes.getByName(nodeTypeName);
if (nodeType === undefined) {
throw new Error(`The node-type "${nodeTypeName}" is not known!`);
}
const nodeData: INode = {
parameters: {
},
name: TEMP_NODE_NAME,
type: nodeTypeName,
typeVersion: 1,
position: [
0,
0,
]
};
if (credentials) {
nodeData.credentials = credentials;
}
const workflowData = {
nodes: [
nodeData,
],
connections: {},
};
this.workflow = new Workflow(undefined, workflowData.nodes, workflowData.connections, false, nodeTypes, undefined);
}
/**
* Returns data of a fake workflow
*
* @returns
* @memberof LoadNodeParameterOptions
*/
getWorkflowData() {
return {
name: TEMP_WORKFLOW_NAME,
active: false,
connections: {},
nodes: Object.values(this.workflow.nodes),
createdAt: 0,
updatedAt: 0,
};
}
/**
* Returns the available options
*
* @param {string} methodName The name of the method of which to get the data from
* @param {IWorkflowExecuteAdditionalData} additionalData
* @returns {Promise<INodePropertyOptions[]>}
* @memberof LoadNodeParameterOptions
*/
getOptions(methodName: string, additionalData: IWorkflowExecuteAdditionalData): Promise<INodePropertyOptions[]> {
const node = this.workflow.getNode(TEMP_NODE_NAME);
const nodeType = this.workflow.nodeTypes.getByName(node!.type);
if (nodeType!.methods === undefined || nodeType!.methods.loadOptions === undefined || nodeType!.methods.loadOptions[methodName] === undefined) {
throw new Error(`The node-type "${node!.type}" does not have the method "${methodName}" defined!`);
}
const thisArgs = NodeExecuteFunctions.getLoadOptionsFunctions(this.workflow, node!, additionalData);
return nodeType!.methods.loadOptions[methodName].call(thisArgs);
}
}

View File

@@ -0,0 +1,639 @@
import {
Credentials,
IHookFunctions,
ILoadOptionsFunctions,
IWorkflowSettings,
WorkflowExecute,
BINARY_ENCODING,
} from './';
import {
IBinaryData,
IContextObject,
ICredentialDataDecryptedObject,
IDataObject,
IExecuteData,
IExecuteFunctions,
IExecuteSingleFunctions,
INode,
INodeExecutionData,
INodeParameters,
INodeType,
IRunExecutionData,
ITaskDataConnections,
ITriggerFunctions,
IWebhookDescription,
IWebhookFunctions,
IWorkflowExecuteAdditionalData,
NodeHelpers,
NodeParameterValue,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { get } from 'lodash';
import * as express from "express";
import * as path from 'path';
import * as requestPromise from 'request-promise-native';
import { Magic, MAGIC_MIME_TYPE } from 'mmmagic';
const magic = new Magic(MAGIC_MIME_TYPE);
/**
* Takes a buffer and converts it into the format n8n uses. It encodes the binary data as
* base64 and adds metadata.
*
* @export
* @param {Buffer} binaryData
* @param {string} [filePath]
* @param {string} [mimeType]
* @returns {Promise<IBinaryData>}
*/
export async function prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData> {
if (!mimeType) {
// If not mime type is given figure it out
mimeType = await new Promise<string>(
(resolve, reject) => {
magic.detect(binaryData, (err: Error, mimeType: string) => {
if (err) {
return reject(err);
}
return resolve(mimeType);
});
}
);
}
const returnData: IBinaryData = {
mimeType,
// TODO: Should program it in a way that it does not have to converted to base64
// It should only convert to and from base64 when saved in database because
// of for example an error or when there is a wait node.
data: binaryData.toString(BINARY_ENCODING)
};
if (filePath) {
if (filePath.includes('?')) {
// Remove maybe present query parameters
filePath = filePath.split('?').shift();
}
const filePathParts = path.parse(filePath as string);
returnData.fileName = filePathParts.base;
// Remove the dot
const fileExtension = filePathParts.ext.slice(1);
if (fileExtension) {
returnData.fileExtension = fileExtension;
}
}
return returnData;
}
/**
* Takes generic input data and brings it into the json format n8n uses.
*
* @export
* @param {(IDataObject | IDataObject[])} jsonData
* @returns {INodeExecutionData[]}
*/
export function returnJsonArray(jsonData: IDataObject | IDataObject[]): INodeExecutionData[] {
const returnData: INodeExecutionData[] = [];
if (!Array.isArray(jsonData)) {
jsonData = [jsonData];
}
jsonData.forEach((data) => {
returnData.push({ json: data });
});
return returnData;
}
/**
* Returns the requested decrypted credentials if the node has access to them.
*
* @export
* @param {Workflow} workflow Workflow which requests the data
* @param {INode} node Node which request the data
* @param {string} type The credential type to return
* @param {IWorkflowExecuteAdditionalData} additionalData
* @returns {(ICredentialDataDecryptedObject | undefined)}
*/
export function getCredentials(workflow: Workflow, node: INode, type: string, additionalData: IWorkflowExecuteAdditionalData): ICredentialDataDecryptedObject | undefined {
// Get the NodeType as it has the information if the credentials are required
const nodeType = workflow.nodeTypes.getByName(node.type);
if (nodeType === undefined) {
throw new Error(`Node type "${node.type}" is not known so can not get credentials!`);
}
if (nodeType.description.credentials === undefined) {
throw new Error(`Node type "${node.type}" does not have any credentials defined!`);
}
const nodeCredentialDescription = nodeType.description.credentials.find((credentialTypeDescription) => credentialTypeDescription.name === type);
if (nodeCredentialDescription === undefined) {
throw new Error(`Node type "${node.type}" does not have any credentials of type "${type}" defined!`);
}
if (NodeHelpers.displayParameter(node.parameters, nodeCredentialDescription, node.parameters) === false) {
// Credentials should not be displayed so return undefined even if they would be defined
return undefined;
}
// Check if node has any credentials defined
if (!node.credentials || !node.credentials[type]) {
// If none are defined check if the credentials are required or not
if (nodeCredentialDescription.required === true) {
// Credentials are required so error
if (!node.credentials) {
throw new Error('Node does not have any credentials set!');
}
if (!node.credentials[type]) {
throw new Error(`Node does not have any credentials set for "${type}"!`);
}
} else {
// Credentials are not required so resolve with undefined
return undefined;
}
}
const name = node.credentials[type];
if (!additionalData.credentials[type]) {
throw new Error(`No credentials of type "${type}" exist.`);
}
if (!additionalData.credentials[type][name]) {
throw new Error(`No credentials with name "${name}" exist for type "${type}".`);
}
const credentialData = additionalData.credentials[type][name];
const credentials = new Credentials(name, type, credentialData.nodesAccess, credentialData.data);
const decryptedDataObject = credentials.getData(additionalData.encryptionKey, node.type);
if (decryptedDataObject === null) {
throw new Error('Could not get the credentials');
}
return decryptedDataObject;
}
/**
* Returns the requested resolved (all expressions replaced) node parameters.
*
* @export
* @param {Workflow} workflow
* @param {(IRunExecutionData | null)} runExecutionData
* @param {number} runIndex
* @param {INodeExecutionData[]} connectionInputData
* @param {INode} node
* @param {string} parameterName
* @param {number} itemIndex
* @param {*} [fallbackValue]
* @returns {(NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object)}
*/
export function getNodeParameter(workflow: Workflow, runExecutionData: IRunExecutionData | null, runIndex: number, connectionInputData: INodeExecutionData[], node: INode, parameterName: string, itemIndex: number, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object { //tslint:disable-line:no-any
const nodeType = workflow.nodeTypes.getByName(node.type);
if (nodeType === undefined) {
throw new Error(`Node type "${node.type}" is not known so can not return paramter value!`);
}
const value = get(node.parameters, parameterName, fallbackValue);
if (value === undefined) {
throw new Error(`Could not get parameter "${parameterName}"!`);
}
const returnData = workflow.getParameterValue(value, runExecutionData, runIndex, itemIndex, node.name, connectionInputData);
return returnData;
}
/**
* Returns the timezone for the workflow
*
* @export
* @param {Workflow} workflow
* @param {IWorkflowExecuteAdditionalData} additionalData
* @returns {string}
*/
export function getTimezone(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData):string {
if (workflow.settings !== undefined && workflow.settings.timezone !== undefined) {
return (workflow.settings as IWorkflowSettings).timezone as string;
}
return additionalData.timezone;
}
/**
* Returns the execute functions the trigger nodes have access to.
*
* @export
* @param {Workflow} workflow
* @param {INode} node
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {WorkflowExecuteMode} mode
* @returns {ITriggerFunctions}
*/
export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): ITriggerFunctions {
return ((workflow: Workflow, node: INode) => {
return {
emit: (data: INodeExecutionData[][]): void => {
const workflowExecute = new WorkflowExecute(additionalData, mode);
const nodeExecutionStack: IExecuteData[] = [
{
node,
data: {
main: data,
}
}
];
const runExecutionData: IRunExecutionData = {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
};
workflowExecute.runExecutionData(workflow, runExecutionData);
},
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getMode: (): WorkflowExecuteMode => {
return mode;
},
getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getWorkflowStaticData(type: string): IDataObject {
return workflow.getStaticData(type, node);
},
helpers: {
prepareBinaryData,
request: requestPromise,
returnJsonArray,
},
};
}) (workflow, node);
}
/**
* Returns the execute functions regular nodes have access to.
*
* @export
* @param {Workflow} workflow
* @param {IRunExecutionData} runExecutionData
* @param {number} runIndex
* @param {INodeExecutionData[]} connectionInputData
* @param {ITaskDataConnections} inputData
* @param {INode} node
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {WorkflowExecuteMode} mode
* @returns {IExecuteFunctions}
*/
export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
getContext(type: string): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getInputData: (inputIndex = 0, inputName = 'main') => {
if (!inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return [];
}
// TODO: Check if nodeType has input with that index defined
if (inputData[inputName].length < inputIndex) {
throw new Error(`Could not get input index "${inputIndex}" of input "${inputName}"!`);
}
if (inputData[inputName][inputIndex] === null) {
// return [];
throw new Error(`Value "${inputIndex}" of input "${inputName}" did not get set!`);
}
// TODO: Maybe do clone of data only here so it only clones the data that is really needed
return inputData[inputName][inputIndex] as INodeExecutionData[];
},
getNodeParameter: (parameterName: string, itemIndex: number, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getMode: (): WorkflowExecuteMode => {
return mode;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getWorkflowStaticData(type: string): IDataObject {
return workflow.getStaticData(type, node);
},
prepareOutputData: NodeHelpers.prepareOutputData,
helpers: {
prepareBinaryData,
request: requestPromise,
returnJsonArray,
},
};
})(workflow, runExecutionData, connectionInputData, inputData, node);
}
/**
* Returns the execute functions regular nodes have access to when single-function is defined.
*
* @export
* @param {Workflow} workflow
* @param {IRunExecutionData} runExecutionData
* @param {number} runIndex
* @param {INodeExecutionData[]} connectionInputData
* @param {ITaskDataConnections} inputData
* @param {INode} node
* @param {number} itemIndex
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {WorkflowExecuteMode} mode
* @returns {IExecuteSingleFunctions}
*/
export function getExecuteSingleFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, itemIndex: number, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
getContext(type: string): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getInputData: (inputIndex = 0, inputName = 'main') => {
if (!inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return {json: {}};
}
// TODO: Check if nodeType has input with that index defined
if (inputData[inputName].length < inputIndex) {
throw new Error(`Could not get input index "${inputIndex}" of input "${inputName}"!`);
}
const allItems = inputData[inputName][inputIndex];
if (allItems === null) {
// return [];
throw new Error(`Value "${inputIndex}" of input "${inputName}" did not get set!`);
}
if (allItems[itemIndex] === null) {
// return [];
throw new Error(`Value "${inputIndex}" of input "${inputName}" with itemIndex "${itemIndex}" did not get set!`);
}
return allItems[itemIndex] as INodeExecutionData;
},
getMode: (): WorkflowExecuteMode => {
return mode;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getWorkflowStaticData(type: string): IDataObject {
return workflow.getStaticData(type, node);
},
helpers: {
prepareBinaryData,
request: requestPromise,
},
};
})(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex);
}
/**
* Returns the execute functions regular nodes have access to in load-options-function.
*
* @export
* @param {Workflow} workflow
* @param {INode} node
* @param {IWorkflowExecuteAdditionalData} additionalData
* @returns {ILoadOptionsFunctions}
*/
export function getLoadOptionsFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData): ILoadOptionsFunctions {
return ((workflow: Workflow, node: INode) => {
const that = {
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
helpers: {
request: requestPromise,
},
};
return that;
})(workflow, node);
}
/**
* Returns the execute functions regular nodes have access to in hook-function.
*
* @export
* @param {Workflow} workflow
* @param {INode} node
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {WorkflowExecuteMode} mode
* @returns {IHookFunctions}
*/
export function getExecuteHookFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, isTest?: boolean): IHookFunctions {
return ((workflow: Workflow, node: INode) => {
const that = {
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getMode: (): WorkflowExecuteMode => {
return mode;
},
getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getNodeWebhookUrl: (name: string): string | undefined => {
let baseUrl = additionalData.webhookBaseUrl;
if (isTest === true) {
baseUrl = additionalData.webhookTestBaseUrl;
}
const webhookDescription = that.getWebhookDescription(name);
if (webhookDescription === undefined) {
return undefined;
}
const path = workflow.getWebhookParameterValue(node, webhookDescription, 'path');
if (path === undefined) {
return undefined;
}
return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path);
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getWebhookDescription(name: string): IWebhookDescription | undefined {
const nodeType = workflow.nodeTypes.getByName(node.type) as INodeType;
if (nodeType.description.webhooks === undefined) {
// Node does not have any webhooks so return
return undefined;
}
for (const webhookDescription of nodeType.description.webhooks) {
if (webhookDescription.name === name) {
return webhookDescription;
}
}
return undefined;
},
getWorkflowStaticData(type: string): IDataObject {
return workflow.getStaticData(type, node);
},
helpers: {
request: requestPromise,
},
};
return that;
})(workflow, node);
}
/**
* Returns the execute functions regular nodes have access to when webhook-function is defined.
*
* @export
* @param {Workflow} workflow
* @param {IRunExecutionData} runExecutionData
* @param {INode} node
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {WorkflowExecuteMode} mode
* @returns {IWebhookFunctions}
*/
export function getExecuteWebhookFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IWebhookFunctions {
return ((workflow: Workflow, node: INode) => {
return {
getBodyData(): IDataObject {
if (additionalData.httpRequest === undefined) {
throw new Error('Request is missing!');
}
return additionalData.httpRequest.body;
},
getCredentials(type: string): ICredentialDataDecryptedObject | undefined {
return getCredentials(workflow, node, type, additionalData);
},
getHeaderData(): object {
if (additionalData.httpRequest === undefined) {
throw new Error('Request is missing!');
}
return additionalData.httpRequest.headers;
},
getMode: (): WorkflowExecuteMode => {
return mode;
},
getNodeParameter: (parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object => { //tslint:disable-line:no-any
const runExecutionData: IRunExecutionData | null = null;
const itemIndex = 0;
const runIndex = 0;
const connectionInputData: INodeExecutionData[] = [];
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getQueryData(): object {
if (additionalData.httpRequest === undefined) {
throw new Error('Request is missing!');
}
return additionalData.httpRequest.query;
},
getRequestObject(): express.Request {
if (additionalData.httpRequest === undefined) {
throw new Error('Request is missing!');
}
return additionalData.httpRequest;
},
getResponseObject(): express.Response {
if (additionalData.httpResponse === undefined) {
throw new Error('Response is missing!');
}
return additionalData.httpResponse;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getWorkflowStaticData(type: string): IDataObject {
return workflow.getStaticData(type, node);
},
prepareOutputData: NodeHelpers.prepareOutputData,
helpers: {
prepareBinaryData,
request: requestPromise,
returnJsonArray,
},
};
})(workflow, node);
}

View File

@@ -0,0 +1,234 @@
import {
ENCRYPTION_KEY_ENV_OVERWRITE,
EXTENSIONS_SUBDIRECTORY,
USER_FOLDER_ENV_OVERWRITE,
USER_SETTINGS_FILE_NAME,
USER_SETTINGS_SUBFOLDER,
IUserSettings,
} from '.';
import * as fs from 'fs';
import * as path from 'path';
import { randomBytes } from 'crypto';
const { promisify } = require('util');
const fsAccess = promisify(fs.access);
const fsReadFile = promisify(fs.readFile);
const fsMkdir = promisify(fs.mkdir);
const fsWriteFile = promisify(fs.writeFile);
let settingsCache: IUserSettings | undefined = undefined;
/**
* Creates the user settings if they do not exist yet
*
* @export
*/
export async function prepareUserSettings(): Promise<IUserSettings> {
const settingsPath = getUserSettingsPath();
let userSettings = await getUserSettings(settingsPath);
if (userSettings !== undefined) {
// Settings already exist, check if they contain the encryptionKey
if (userSettings.encryptionKey !== undefined) {
// Key already exists so return
return userSettings;
}
} else {
userSettings = {};
}
// Settings and/or key do not exist. So generate a new encryption key
userSettings.encryptionKey = randomBytes(24).toString('base64');
console.log(`UserSettings got generated and saved to: ${settingsPath}`);
return writeUserSettings(userSettings, settingsPath);
}
/**
* Returns the encryption key which is used to encrypt
* the credentials.
*
* @export
* @returns
*/
export async function getEncryptionKey() {
if (process.env[ENCRYPTION_KEY_ENV_OVERWRITE] !== undefined) {
return process.env[ENCRYPTION_KEY_ENV_OVERWRITE];
}
const userSettings = await getUserSettings();
if (userSettings === undefined) {
return undefined;
}
if (userSettings.encryptionKey === undefined) {
return undefined;
}
return userSettings.encryptionKey;
}
/**
* Adds/Overwrite the given settings in the currently
* saved user settings
*
* @export
* @param {IUserSettings} addSettings The settings to add/overwrite
* @param {string} [settingsPath] Optional settings file path
* @returns {Promise<IUserSettings>}
*/
export async function addToUserSettings(addSettings: IUserSettings, settingsPath?: string): Promise<IUserSettings> {
if (settingsPath === undefined) {
settingsPath = getUserSettingsPath();
}
let userSettings = await getUserSettings(settingsPath);
if (userSettings === undefined) {
userSettings = {};
}
// Add the settings
Object.assign(userSettings, addSettings);
return writeUserSettings(userSettings, settingsPath);
}
/**
* Writes a user settings file
*
* @export
* @param {IUserSettings} userSettings The settings to write
* @param {string} [settingsPath] Optional settings file path
* @returns {Promise<IUserSettings>}
*/
export async function writeUserSettings(userSettings: IUserSettings, settingsPath?: string): Promise<IUserSettings> {
if (settingsPath === undefined) {
settingsPath = getUserSettingsPath();
}
if (userSettings === undefined) {
userSettings = {};
}
// Check if parent folder exists if not create it.
try {
await fsAccess(path.dirname(settingsPath));
} catch (error) {
// Parent folder does not exist so create
await fsMkdir(path.dirname(settingsPath));
}
await fsWriteFile(settingsPath, JSON.stringify(userSettings, null, '\t'));
settingsCache = JSON.parse(JSON.stringify(userSettings));
return userSettings;
}
/**
* Returns the content of the user settings
*
* @export
* @returns {UserSettings}
*/
export async function getUserSettings(settingsPath?: string, ignoreCache?: boolean): Promise<IUserSettings | undefined> {
if (settingsCache !== undefined && ignoreCache !== true) {
return settingsCache;
}
if (settingsPath === undefined) {
settingsPath = getUserSettingsPath();
}
try {
await fsAccess(settingsPath);
} catch (error) {
// The file does not exist
return undefined;
}
const settingsFile = await fsReadFile(settingsPath, 'utf8');
settingsCache = JSON.parse(settingsFile);
return JSON.parse(settingsFile) as IUserSettings;
}
/**
* Returns the path to the user settings
*
* @export
* @returns {string}
*/
export function getUserSettingsPath(): string {
const n8nFolder = getUserN8nFolderPath();
return path.join(n8nFolder, USER_SETTINGS_FILE_NAME);
}
/**
* Retruns the path to the n8n folder in which all n8n
* related data gets saved
*
* @export
* @returns {string}
*/
export function getUserN8nFolderPath(): string {
let userFolder;
if (process.env[USER_FOLDER_ENV_OVERWRITE] !== undefined) {
userFolder = process.env[USER_FOLDER_ENV_OVERWRITE] as string;
} else {
userFolder = getUserHome();
}
return path.join(userFolder, USER_SETTINGS_SUBFOLDER);
}
/**
* Returns the path to the n8n user folder with the custom
* extensions like nodes and credentials
*
* @export
* @returns {string}
*/
export function getUserN8nFolderCustomExtensionPath(): string {
return path.join(getUserN8nFolderPath(), EXTENSIONS_SUBDIRECTORY);
}
/**
* Returns the home folder path of the user if
* none can be found it falls back to the current
* working directory
*
* @export
* @returns {string}
*/
export function getUserHome(): string {
let variableName = 'HOME';
if (process.platform === 'win32') {
variableName = 'USERPROFILE';
}
if (process.env[variableName] === undefined) {
// If for some reason the variable does not exist
// fall back to current folder
return process.cwd();
}
return process.env[variableName] as string;
}

View File

@@ -0,0 +1,579 @@
import {
IConnection,
IExecuteData,
IExecutionError,
INode,
INodeConnections,
INodeExecutionData,
IRun,
IRunData,
IRunExecutionData,
ITaskData,
ITaskDataConnections,
IWaitingForExecution,
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
Workflow,
} from 'n8n-workflow';
import {
ActiveExecutions,
NodeExecuteFunctions,
} from './';
export class WorkflowExecute {
private additionalData: IWorkflowExecuteAdditionalData;
private mode: WorkflowExecuteMode;
private activeExecutions: ActiveExecutions.ActiveExecutions;
private executionId: string | null = null;
constructor(additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode) {
this.additionalData = additionalData;
this.activeExecutions = ActiveExecutions.getInstance();
this.mode = mode;
}
/**
* Executes the given workflow.
*
* @param {Workflow} workflow The workflow to execute
* @param {INode[]} [startNodes] Node to start execution from
* @param {string} [destinationNode] Node to stop execution at
* @returns {(Promise<string>)}
* @memberof WorkflowExecute
*/
async run(workflow: Workflow, startNodes?: INode[], destinationNode?: string): Promise<string> {
// Get the nodes to start workflow execution from
startNodes = startNodes || workflow.getStartNodes(destinationNode);
// If a destination node is given we only run the direct parent nodes and no others
let runNodeFilter: string[] | undefined = undefined;
if (destinationNode) {
// TODO: Combine that later with getStartNodes which does more or less the same tree iteration
runNodeFilter = workflow.getParentNodes(destinationNode);
runNodeFilter.push(destinationNode);
}
// Initialize the data of the start nodes
const nodeExecutionStack: IExecuteData[] = [];
startNodes.forEach((node) => {
nodeExecutionStack.push(
{
node,
data: {
main: [
[
{
json: {},
},
],
],
},
},
);
});
const runExecutionData: IRunExecutionData = {
startData: {
destinationNode,
runNodeFilter,
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
};
return this.runExecutionData(workflow, runExecutionData);
}
/**
* Executes the given workflow but only
*
* @param {Workflow} workflow The workflow to execute
* @param {IRunData} runData
* @param {string[]} startNodes Nodes to start execution from
* @param {string} destinationNode Node to stop execution at
* @returns {(Promise<string>)}
* @memberof WorkflowExecute
*/
async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise<string> {
let incomingNodeConnections: INodeConnections | undefined;
let connection: IConnection;
const runIndex = 0;
// Initialize the nodeExecutionStack and waitingExecution with
// the data from runData
const nodeExecutionStack: IExecuteData[] = [];
const waitingExecution: IWaitingForExecution = {};
for (const startNode of startNodes) {
incomingNodeConnections = workflow.connectionsByDestinationNode[startNode];
const incomingData: INodeExecutionData[][] = [];
if (incomingNodeConnections === undefined) {
// If it has no incoming data add the default empty data
incomingData.push([
{
json: {}
}
]);
} else {
// Get the data of the incoming connections
for (const connections of incomingNodeConnections.main) {
for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
connection = connections[inputIndex];
incomingData.push(
runData[connection.node!][runIndex].data![connection.type][connection.index]!,
);
}
}
}
const executeData: IExecuteData = {
node: workflow.getNode(startNode) as INode,
data: {
main: incomingData,
}
};
nodeExecutionStack.push(executeData);
// Check if the destinationNode has to be added as waiting
// because some input data is already fully available
incomingNodeConnections = workflow.connectionsByDestinationNode[destinationNode];
if (incomingNodeConnections !== undefined) {
for (const connections of incomingNodeConnections.main) {
for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
connection = connections[inputIndex];
if (waitingExecution[destinationNode] === undefined) {
waitingExecution[destinationNode] = {};
}
if (waitingExecution[destinationNode][runIndex] === undefined) {
waitingExecution[destinationNode][runIndex] = {};
}
if (waitingExecution[destinationNode][runIndex][connection.type] === undefined) {
waitingExecution[destinationNode][runIndex][connection.type] = [];
}
if (runData[connection.node!] !== undefined) {
// Input data exists so add as waiting
// incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
waitingExecution[destinationNode][runIndex][connection.type].push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
} else {
waitingExecution[destinationNode][runIndex][connection.type].push(null);
}
}
}
}
}
// Only run the parent nodes and no others
let runNodeFilter: string[] | undefined = undefined;
runNodeFilter = workflow.getParentNodes(destinationNode);
runNodeFilter.push(destinationNode);
const runExecutionData: IRunExecutionData = {
startData: {
destinationNode,
runNodeFilter,
},
resultData: {
runData,
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution,
},
};
return await this.runExecutionData(workflow, runExecutionData);
}
/**
* Executes the hook with the given name
*
* @param {string} hookName
* @param {any[]} parameters
* @returns {Promise<IRun>}
* @memberof WorkflowExecute
*/
async executeHook(hookName: string, parameters: any[]): Promise<void> { // tslint:disable-line:no-any
if (this.additionalData.hooks === undefined) {
return parameters[0];
}
if (this.additionalData.hooks[hookName] === undefined || this.additionalData.hooks[hookName]!.length === 0) {
return parameters[0];
}
for (const hookFunction of this.additionalData.hooks[hookName]!) {
await hookFunction.apply(this, parameters as [IRun, IWaitingForExecution])
.catch((error) => {
// Catch all errors here because when "executeHook" gets called
// we have the most time no "await" and so the errors would so
// not be uncaught by anything.
// TODO: Add proper logging
console.error(`There was a problem executing hook: "${hookName}"`);
console.error('Parameters:');
console.error(parameters);
console.error('Error:');
console.error(error);
});
}
}
/**
* Runs the given execution data.
*
* @param {Workflow} workflow
* @param {IRunExecutionData} runExecutionData
* @returns {Promise<string>}
* @memberof WorkflowExecute
*/
async runExecutionData(workflow: Workflow, runExecutionData: IRunExecutionData): Promise<string> {
const startedAt = new Date().getTime();
const workflowIssues = workflow.checkReadyForExecution();
if (workflowIssues !== null) {
throw new Error('The workflow has issues and can for that reason not be executed. Please fix them first.');
}
// Variables which hold temporary data for each node-execution
let executionData: IExecuteData;
let executionError: IExecutionError | undefined;
let executionNode: INode;
let nodeSuccessData: INodeExecutionData[][] | null;
let runIndex: number;
let startTime: number;
let taskData: ITaskData;
if (runExecutionData.startData === undefined) {
runExecutionData.startData = {};
}
this.executionId = this.activeExecutions.add(workflow, runExecutionData, this.mode);
this.executeHook('workflowExecuteBefore', [this.executionId]);
let currentExecutionTry = '';
let lastExecutionTry = '';
// Wait for the next tick so that the executionId gets already returned.
// So it can directly be send to the editor-ui and is so aware of the
// executionId when the first push messages arrive.
process.nextTick(() => (async () => {
executionLoop:
while (runExecutionData.executionData!.nodeExecutionStack.length !== 0) {
if (this.activeExecutions.shouldBeStopped(this.executionId!) === true) {
// The execution should be stopped
break;
}
nodeSuccessData = null;
executionError = undefined;
executionData = runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
executionNode = executionData.node;
this.executeHook('nodeExecuteBefore', [this.executionId, executionNode.name]);
// Get the index of the current run
runIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
runIndex = runExecutionData.resultData.runData[executionNode.name].length;
}
currentExecutionTry = `${executionNode.name}:${runIndex}`;
if (currentExecutionTry === lastExecutionTry) {
throw new Error('Did stop execution because execution seems to be in endless loop.');
}
if (runExecutionData.startData!.runNodeFilter !== undefined && runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) {
// If filter is set and node is not on filter skip it, that avoids the problem that it executes
// leafs that are parallel to a selected destinationNode. Normally it would execute them because
// they have the same parent and it executes all child nodes.
continue;
}
// Check if all the data which is needed to run the node is available
if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) {
// Check if the node has incoming connections
if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) {
let inputConnections: IConnection[][];
let connectionIndex: number;
inputConnections = workflow.connectionsByDestinationNode[executionNode.name]['main'];
for (connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) {
if (workflow.getHighestNode(executionNode.name, 'main', connectionIndex).length === 0) {
// If there is no valid incoming node (if all are disabled)
// then ignore that it has inputs and simply execute it as it is without
// any data
continue;
}
if (!executionData.data!.hasOwnProperty('main')) {
// ExecutionData does not even have the connection set up so can
// not have that data, so add it again to be executed later
runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
// Check if it has the data for all the inputs
// The most nodes just have one but merge node for example has two and data
// of both inputs has to be available to be able to process the node.
if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) {
// Does not have the data of the connections so add back to stack
runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
}
}
}
// TODO Has to check if node is disabled
// Clone input data that nodes can not mess up data of parallel nodes which receive the same data
// TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned
// is very slow so only do if needed
startTime = new Date().getTime();
try {
runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
nodeSuccessData = await workflow.runNode(executionData.node, JSON.parse(JSON.stringify(executionData.data)), runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode);
if (nodeSuccessData === null) {
// If null gets returned it means that the node did succeed
// but did not have any data. So the branch should end
// (meaning the nodes afterwards should not be processed)
continue;
}
} catch (error) {
executionError = {
message: error.message,
stack: error.stack,
};
}
// Add the data to return to the user
// (currently does not get cloned as data does not get changed, maybe later we should do that?!?!)
if (!runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
runExecutionData.resultData.runData[executionNode.name] = [];
}
taskData = {
startTime,
executionTime: (new Date().getTime()) - startTime
};
if (executionError !== undefined) {
taskData.error = executionError;
if (executionData.node.continueOnFail === true) {
// Workflow should continue running even if node errors
if (executionData.data.hasOwnProperty('main') && executionData.data.main.length > 0) {
// Simply get the input data of the node if it has any and pass it through
// to the next node
if (executionData.data.main[0] !== null) {
nodeSuccessData = [(JSON.parse(JSON.stringify(executionData.data.main[0])) as INodeExecutionData[])];
}
}
} else {
// Node execution did fail so add error and stop execution
runExecutionData.resultData.runData[executionNode.name].push(taskData);
// Add the execution data again so that it can get restarted
runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
this.executeHook('nodeExecuteAfter', [this.executionId, executionNode.name, taskData]);
break;
}
}
// Node executed successfully. So add data and go on.
taskData.data = ({
'main': nodeSuccessData
} as ITaskDataConnections);
this.executeHook('nodeExecuteAfter', [this.executionId, executionNode.name, taskData]);
runExecutionData.resultData.runData[executionNode.name].push(taskData);
if (runExecutionData.startData && runExecutionData.startData.destinationNode && runExecutionData.startData.destinationNode === executionNode.name) {
// If destination node is defined and got executed stop execution
continue;
}
// Add the nodes to which the current node has an output connection to that they can
// be executed next
if (workflow.connectionsBySourceNode.hasOwnProperty(executionNode.name)) {
if (workflow.connectionsBySourceNode[executionNode.name].hasOwnProperty('main')) {
let outputIndex: string, connectionData: IConnection;
// Go over all the different
for (outputIndex in workflow.connectionsBySourceNode[executionNode.name]['main']) {
if (!workflow.connectionsBySourceNode[executionNode.name]['main'].hasOwnProperty(outputIndex)) {
continue;
}
// Go through all the different outputs of this connection
for (connectionData of workflow.connectionsBySourceNode[executionNode.name]['main'][outputIndex]) {
if (!workflow.nodes.hasOwnProperty(connectionData.node)) {
return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`));
}
let stillDataMissing = false;
// Check if node has multiple inputs as then we have to wait for all input data
// to be present before we can add it to the node-execution-stack
if (workflow.connectionsByDestinationNode[connectionData.node]['main'].length > 1) {
// Node has multiple inputs
// Check if there is already data for the node
if (runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node) && runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] !== undefined) {
// There is already data for the node and the current run so
// add the new data
if (nodeSuccessData === null) {
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null;
} else {
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex];
}
// Check if all data exists now
let thisExecutionData: INodeExecutionData[] | null;
let allDataFound = true;
for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) {
thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i];
if (thisExecutionData === null) {
allDataFound = false;
break;
}
}
if (allDataFound === true) {
// All data exists for node to be executed
// So add it to the execution stack
runExecutionData.executionData!.nodeExecutionStack.push({
node: workflow.nodes[connectionData.node],
data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]
});
// Remove the data from waiting
delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) {
// No more data left for the node so also delete that one
delete runExecutionData.executionData!.waitingExecution[connectionData.node];
}
continue;
} else {
stillDataMissing = true;
}
} else {
stillDataMissing = true;
}
}
// Make sure the array has all the values
const connectionDataArray: Array<INodeExecutionData[] | null> = [];
for (let i: number = connectionData.index; i >= 0; i--) {
connectionDataArray[i] = null;
}
// Add the data of the current execution
if (nodeSuccessData === null) {
connectionDataArray[connectionData.index] = null;
} else {
connectionDataArray[connectionData.index] = nodeSuccessData[outputIndex];
}
if (stillDataMissing === true) {
// Additional data is needed to run node so add it to waiting
if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) {
runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
}
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
main: connectionDataArray
};
} else {
// All data is there so add it directly to stack
runExecutionData.executionData!.nodeExecutionStack.push({
node: workflow.nodes[connectionData.node],
data: {
main: connectionDataArray
}
});
}
}
}
}
}
}
return Promise.resolve();
})()
.then(async () => {
const fullRunData: IRun = {
data: runExecutionData,
mode: this.mode,
startedAt,
stoppedAt: new Date().getTime(),
};
if (executionError !== undefined) {
fullRunData.data.resultData.error = executionError;
} else {
fullRunData.finished = true;
}
this.activeExecutions.remove(this.executionId!, fullRunData);
await this.executeHook('workflowExecuteAfter', [fullRunData, this.executionId!]);
return fullRunData;
})
.catch(async (error) => {
const fullRunData: IRun = {
data: runExecutionData,
mode: this.mode,
startedAt,
stoppedAt: new Date().getTime(),
};
fullRunData.data.resultData.error = {
message: error.message,
stack: error.stack,
};
this.activeExecutions.remove(this.executionId!, fullRunData);
await this.executeHook('workflowExecuteAfter', [fullRunData, this.executionId!]);
return fullRunData;
}));
return this.executionId;
}
}

View File

@@ -0,0 +1,24 @@
try {
require('source-map-support').install();
} catch (error) {
}
export * from './ActiveWorkflows';
export * from './ActiveWebhooks';
export * from './Constants';
export * from './Credentials';
export * from './DeferredPromise';
export * from './Interfaces';
export * from './LoadNodeParameterOptions';
export * from './NodeExecuteFunctions';
export * from './WorkflowExecute';
import * as ActiveExecutions from './ActiveExecutions';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import * as UserSettings from './UserSettings';
export {
ActiveExecutions,
NodeExecuteFunctions,
UserSettings,
};