refactor: Setup typescript project references across workflow, core, and cli (#4519)
* refactor: use consistent folder structure across workflow, core, and cli * setup typescript project references across workflow, core, and cli
This commit is contained in:
committed by
GitHub
parent
de96def372
commit
698d96a617
57
packages/cli/src/commands/BaseCommand.ts
Normal file
57
packages/cli/src/commands/BaseCommand.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import { Command } from '@oclif/core';
|
||||
import { LoggerProxy } from 'n8n-workflow';
|
||||
import { getLogger, Logger } from '@/Logger';
|
||||
import { User } from '@db/entities/User';
|
||||
import * as Db from '@/Db';
|
||||
|
||||
export abstract class BaseCommand extends Command {
|
||||
logger: Logger;
|
||||
|
||||
/**
|
||||
* Lifecycle methods
|
||||
*/
|
||||
|
||||
async init(): Promise<void> {
|
||||
this.logger = getLogger();
|
||||
LoggerProxy.init(this.logger);
|
||||
|
||||
await Db.init();
|
||||
}
|
||||
|
||||
async finally(): Promise<void> {
|
||||
if (process.env.NODE_ENV === 'test') return;
|
||||
|
||||
this.exit();
|
||||
}
|
||||
|
||||
/**
|
||||
* User Management utils
|
||||
*/
|
||||
|
||||
defaultUserProps = {
|
||||
firstName: null,
|
||||
lastName: null,
|
||||
email: null,
|
||||
password: null,
|
||||
resetPasswordToken: null,
|
||||
};
|
||||
|
||||
async getInstanceOwner(): Promise<User> {
|
||||
const globalRole = await Db.collections.Role.findOneOrFail({
|
||||
name: 'owner',
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
const owner = await Db.collections.User.findOne({ globalRole });
|
||||
|
||||
if (owner) return owner;
|
||||
|
||||
const user = new User();
|
||||
|
||||
Object.assign(user, { ...this.defaultUserProps, globalRole });
|
||||
|
||||
await Db.collections.User.save(user);
|
||||
|
||||
return Db.collections.User.findOneOrFail({ globalRole });
|
||||
}
|
||||
}
|
||||
66
packages/cli/src/commands/Interfaces.d.ts
vendored
Normal file
66
packages/cli/src/commands/Interfaces.d.ts
vendored
Normal file
@@ -0,0 +1,66 @@
|
||||
interface IResult {
|
||||
totalWorkflows: number;
|
||||
summary: {
|
||||
failedExecutions: number;
|
||||
successfulExecutions: number;
|
||||
warningExecutions: number;
|
||||
errors: IExecutionError[];
|
||||
warnings: IExecutionError[];
|
||||
};
|
||||
coveredNodes: {
|
||||
[nodeType: string]: number;
|
||||
};
|
||||
executions: IExecutionResult[];
|
||||
}
|
||||
interface IExecutionResult {
|
||||
workflowId: string | number;
|
||||
workflowName: string;
|
||||
executionTime: number; // Given in seconds with decimals for milliseconds
|
||||
finished: boolean;
|
||||
executionStatus: ExecutionStatus;
|
||||
error?: string;
|
||||
changes?: string;
|
||||
coveredNodes: {
|
||||
[nodeType: string]: number;
|
||||
};
|
||||
}
|
||||
|
||||
interface IExecutionError {
|
||||
workflowId: string | number;
|
||||
error: string;
|
||||
}
|
||||
|
||||
interface IWorkflowExecutionProgress {
|
||||
workflowId: string | number;
|
||||
status: ExecutionStatus;
|
||||
}
|
||||
|
||||
interface INodeSpecialCases {
|
||||
[nodeName: string]: INodeSpecialCase;
|
||||
}
|
||||
|
||||
interface INodeSpecialCase {
|
||||
ignoredProperties?: string[];
|
||||
capResults?: number;
|
||||
keepOnlyProperties?: string[];
|
||||
}
|
||||
|
||||
type ExecutionStatus = 'success' | 'error' | 'warning' | 'running';
|
||||
|
||||
declare module 'json-diff' {
|
||||
interface IDiffOptions {
|
||||
keysOnly?: boolean;
|
||||
}
|
||||
export function diff(obj1: unknown, obj2: unknown, diffOptions: IDiffOptions): string;
|
||||
}
|
||||
|
||||
type SmtpConfig = {
|
||||
host: string;
|
||||
port: number;
|
||||
secure: boolean;
|
||||
auth: {
|
||||
user: string;
|
||||
pass: string;
|
||||
};
|
||||
sender: string;
|
||||
};
|
||||
62
packages/cli/src/commands/db/revert.ts
Normal file
62
packages/cli/src/commands/db/revert.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
|
||||
import { LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import { getLogger } from '@/Logger';
|
||||
|
||||
import * as Db from '@/Db';
|
||||
|
||||
export class DbRevertMigrationCommand extends Command {
|
||||
static description = 'Revert last database migration';
|
||||
|
||||
static examples = ['$ n8n db:revert'];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow, @typescript-eslint/no-unused-vars
|
||||
const { flags } = this.parse(DbRevertMigrationCommand);
|
||||
|
||||
let connection: Connection | undefined;
|
||||
try {
|
||||
await Db.init();
|
||||
connection = Db.collections.Credentials.manager.connection;
|
||||
|
||||
if (!connection) {
|
||||
throw new Error(`No database connection available.`);
|
||||
}
|
||||
|
||||
const connectionOptions: ConnectionOptions = Object.assign(connection.options, {
|
||||
subscribers: [],
|
||||
synchronize: false,
|
||||
migrationsRun: false,
|
||||
dropSchema: false,
|
||||
logging: ['query', 'error', 'schema'],
|
||||
});
|
||||
|
||||
// close connection in order to reconnect with updated options
|
||||
await connection.close();
|
||||
connection = await createConnection(connectionOptions);
|
||||
|
||||
await connection.undoLastMigration();
|
||||
await connection.close();
|
||||
} catch (error) {
|
||||
if (connection) await connection.close();
|
||||
|
||||
console.error('Error reverting last migration. See log messages for details.');
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||
logger.error(error.message);
|
||||
this.exit(1);
|
||||
}
|
||||
|
||||
this.exit();
|
||||
}
|
||||
}
|
||||
199
packages/cli/src/commands/execute.ts
Normal file
199
packages/cli/src/commands/execute.ts
Normal file
@@ -0,0 +1,199 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { promises as fs } from 'fs';
|
||||
import { Command, flags } from '@oclif/command';
|
||||
import { BinaryDataManager, UserSettings, PLACEHOLDER_EMPTY_WORKFLOW_ID } from 'n8n-core';
|
||||
import { LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import * as ActiveExecutions from '@/ActiveExecutions';
|
||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||
import { CredentialTypes } from '@/CredentialTypes';
|
||||
import * as Db from '@/Db';
|
||||
import { ExternalHooks } from '@/ExternalHooks';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { InternalHooksManager } from '@/InternalHooksManager';
|
||||
import * as WorkflowHelpers from '@/WorkflowHelpers';
|
||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||
import { IWorkflowBase, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
||||
import { getLogger } from '@/Logger';
|
||||
import config from '@/config';
|
||||
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
|
||||
import { findCliWorkflowStart } from '@/utils';
|
||||
|
||||
export class Execute extends Command {
|
||||
static description = '\nExecutes a given workflow';
|
||||
|
||||
static examples = [`$ n8n execute --id=5`, `$ n8n execute --file=workflow.json`];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
file: flags.string({
|
||||
description: 'path to a workflow file to execute',
|
||||
}),
|
||||
id: flags.string({
|
||||
description: 'id of the workflow to execute',
|
||||
}),
|
||||
rawOutput: flags.boolean({
|
||||
description: 'Outputs only JSON data, with no other text',
|
||||
}),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
const binaryDataConfig = config.getEnv('binaryDataManager');
|
||||
await BinaryDataManager.init(binaryDataConfig, true);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(Execute);
|
||||
|
||||
// Start directly with the init of the database to improve startup time
|
||||
const startDbInitPromise = Db.init();
|
||||
|
||||
// Load all node and credential types
|
||||
const loadNodesAndCredentials = LoadNodesAndCredentials();
|
||||
const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init();
|
||||
|
||||
if (!flags.id && !flags.file) {
|
||||
console.info(`Either option "--id" or "--file" have to be set!`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.id && flags.file) {
|
||||
console.info(`Either "id" or "file" can be set never both!`);
|
||||
return;
|
||||
}
|
||||
|
||||
let workflowId: string | undefined;
|
||||
let workflowData: IWorkflowBase | undefined;
|
||||
if (flags.file) {
|
||||
// Path to workflow is given
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
workflowData = JSON.parse(await fs.readFile(flags.file, 'utf8'));
|
||||
} catch (error) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
if (error.code === 'ENOENT') {
|
||||
console.info(`The file "${flags.file}" could not be found.`);
|
||||
return;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Do a basic check if the data in the file looks right
|
||||
// TODO: Later check with the help of TypeScript data if it is valid or not
|
||||
if (
|
||||
workflowData === undefined ||
|
||||
workflowData.nodes === undefined ||
|
||||
workflowData.connections === undefined
|
||||
) {
|
||||
console.info(`The file "${flags.file}" does not contain valid workflow data.`);
|
||||
return;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
workflowId = workflowData.id ? workflowData.id.toString() : PLACEHOLDER_EMPTY_WORKFLOW_ID;
|
||||
}
|
||||
|
||||
// Wait till the database is ready
|
||||
await startDbInitPromise;
|
||||
|
||||
if (flags.id) {
|
||||
// Id of workflow is given
|
||||
workflowId = flags.id;
|
||||
workflowData = await Db.collections.Workflow.findOne(workflowId);
|
||||
if (workflowData === undefined) {
|
||||
console.info(`The workflow with the id "${workflowId}" does not exist.`);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
if (!workflowData) {
|
||||
throw new Error('Failed to retrieve workflow data for requested workflow');
|
||||
}
|
||||
|
||||
// Make sure the settings exist
|
||||
await UserSettings.prepareUserSettings();
|
||||
|
||||
// Wait till the n8n-packages have been read
|
||||
await loadNodesAndCredentialsPromise;
|
||||
|
||||
// Load the credentials overwrites if any exist
|
||||
const credentialsOverwrites = CredentialsOverwrites();
|
||||
await credentialsOverwrites.init();
|
||||
|
||||
// Load all external hooks
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
|
||||
// Add the found types to an instance other parts of the application can use
|
||||
const nodeTypes = NodeTypes();
|
||||
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
|
||||
const credentialTypes = CredentialTypes();
|
||||
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
|
||||
|
||||
const instanceId = await UserSettings.getInstanceId();
|
||||
const { cli } = await GenericHelpers.getVersions();
|
||||
InternalHooksManager.init(instanceId, cli, nodeTypes);
|
||||
|
||||
if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) {
|
||||
workflowId = undefined;
|
||||
}
|
||||
|
||||
try {
|
||||
const startingNode = findCliWorkflowStart(workflowData.nodes);
|
||||
|
||||
const user = await getInstanceOwner();
|
||||
const runData: IWorkflowExecutionDataProcess = {
|
||||
executionMode: 'cli',
|
||||
startNodes: [startingNode.name],
|
||||
workflowData,
|
||||
userId: user.id,
|
||||
};
|
||||
|
||||
const workflowRunner = new WorkflowRunner();
|
||||
const executionId = await workflowRunner.run(runData);
|
||||
|
||||
const activeExecutions = ActiveExecutions.getInstance();
|
||||
const data = await activeExecutions.getPostExecutePromise(executionId);
|
||||
|
||||
if (data === undefined) {
|
||||
throw new Error('Workflow did not return any data!');
|
||||
}
|
||||
|
||||
if (data.data.resultData.error) {
|
||||
console.info('Execution was NOT successful. See log message for details.');
|
||||
logger.info('Execution error:');
|
||||
logger.info('====================================');
|
||||
logger.info(JSON.stringify(data, null, 2));
|
||||
|
||||
const { error } = data.data.resultData;
|
||||
// eslint-disable-next-line @typescript-eslint/no-throw-literal
|
||||
throw {
|
||||
...error,
|
||||
stack: error.stack,
|
||||
};
|
||||
}
|
||||
if (flags.rawOutput === undefined) {
|
||||
this.log('Execution was successful:');
|
||||
this.log('====================================');
|
||||
}
|
||||
this.log(JSON.stringify(data, null, 2));
|
||||
} catch (e) {
|
||||
console.error('Error executing workflow. See log messages for details.');
|
||||
logger.error('\nExecution error:');
|
||||
logger.info('====================================');
|
||||
logger.error(e.message);
|
||||
if (e.description) logger.error(e.description);
|
||||
logger.error(e.stack);
|
||||
this.exit(1);
|
||||
}
|
||||
|
||||
this.exit();
|
||||
}
|
||||
}
|
||||
866
packages/cli/src/commands/executeBatch.ts
Normal file
866
packages/cli/src/commands/executeBatch.ts
Normal file
@@ -0,0 +1,866 @@
|
||||
/* eslint-disable @typescript-eslint/prefer-optional-chain */
|
||||
/* eslint-disable array-callback-return */
|
||||
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
||||
/* eslint-disable no-await-in-loop */
|
||||
/* eslint-disable no-async-promise-executor */
|
||||
/* eslint-disable no-param-reassign */
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
/* eslint-disable no-console */
|
||||
import fs from 'fs';
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { BinaryDataManager, UserSettings } from 'n8n-core';
|
||||
|
||||
import { ITaskData, LoggerProxy, sleep } from 'n8n-workflow';
|
||||
|
||||
import { sep } from 'path';
|
||||
|
||||
import { diff } from 'json-diff';
|
||||
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies
|
||||
import { pick } from 'lodash';
|
||||
import { getLogger } from '@/Logger';
|
||||
|
||||
import * as ActiveExecutions from '@/ActiveExecutions';
|
||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||
import { CredentialTypes } from '@/CredentialTypes';
|
||||
import * as Db from '@/Db';
|
||||
import { ExternalHooks } from '@/ExternalHooks';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { InternalHooksManager } from '@/InternalHooksManager';
|
||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||
import { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
||||
import config from '@/config';
|
||||
import { User } from '@db/entities/User';
|
||||
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
|
||||
import { findCliWorkflowStart } from '@/utils';
|
||||
|
||||
export class ExecuteBatch extends Command {
|
||||
static description = '\nExecutes multiple workflows once';
|
||||
|
||||
static cancelled = false;
|
||||
|
||||
static workflowExecutionsProgress: IWorkflowExecutionProgress[][];
|
||||
|
||||
static shallow = false;
|
||||
|
||||
static compare: string;
|
||||
|
||||
static snapshot: string;
|
||||
|
||||
static concurrency = 1;
|
||||
|
||||
static debug = false;
|
||||
|
||||
static executionTimeout = 3 * 60 * 1000;
|
||||
|
||||
static instanceOwner: User;
|
||||
|
||||
static examples = [
|
||||
`$ n8n executeBatch`,
|
||||
`$ n8n executeBatch --concurrency=10 --skipList=/data/skipList.txt`,
|
||||
`$ n8n executeBatch --debug --output=/data/output.json`,
|
||||
`$ n8n executeBatch --ids=10,13,15 --shortOutput`,
|
||||
`$ n8n executeBatch --snapshot=/data/snapshots --shallow`,
|
||||
`$ n8n executeBatch --compare=/data/previousExecutionData --retries=2`,
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
debug: flags.boolean({
|
||||
description: 'Toggles on displaying all errors and debug messages.',
|
||||
}),
|
||||
ids: flags.string({
|
||||
description:
|
||||
'Specifies workflow IDs to get executed, separated by a comma or a file containing the ids',
|
||||
}),
|
||||
concurrency: flags.integer({
|
||||
default: 1,
|
||||
description:
|
||||
'How many workflows can run in parallel. Defaults to 1 which means no concurrency.',
|
||||
}),
|
||||
output: flags.string({
|
||||
description:
|
||||
'Enable execution saving, You must inform an existing folder to save execution via this param',
|
||||
}),
|
||||
snapshot: flags.string({
|
||||
description:
|
||||
'Enables snapshot saving. You must inform an existing folder to save snapshots via this param.',
|
||||
}),
|
||||
compare: flags.string({
|
||||
description:
|
||||
'Compares current execution with an existing snapshot. You must inform an existing folder where the snapshots are saved.',
|
||||
}),
|
||||
shallow: flags.boolean({
|
||||
description:
|
||||
'Compares only if attributes output from node are the same, with no regards to nested JSON objects.',
|
||||
}),
|
||||
skipList: flags.string({
|
||||
description: 'File containing a comma separated list of workflow IDs to skip.',
|
||||
}),
|
||||
retries: flags.integer({
|
||||
description: 'Retries failed workflows up to N tries. Default is 1. Set 0 to disable.',
|
||||
default: 1,
|
||||
}),
|
||||
shortOutput: flags.boolean({
|
||||
description: 'Omits the full execution information from output, displaying only summary.',
|
||||
}),
|
||||
};
|
||||
|
||||
/**
|
||||
* Gracefully handles exit.
|
||||
* @param {boolean} skipExit Whether to skip exit or number according to received signal
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
static async stopProcess(skipExit: boolean | number = false) {
|
||||
if (ExecuteBatch.cancelled) {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
ExecuteBatch.cancelled = true;
|
||||
const activeExecutionsInstance = ActiveExecutions.getInstance();
|
||||
const stopPromises = activeExecutionsInstance.getActiveExecutions().map(async (execution) => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
activeExecutionsInstance.stopExecution(execution.id);
|
||||
});
|
||||
|
||||
await Promise.allSettled(stopPromises);
|
||||
|
||||
setTimeout(() => {
|
||||
process.exit(0);
|
||||
}, 30000);
|
||||
|
||||
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
|
||||
|
||||
let count = 0;
|
||||
while (executingWorkflows.length !== 0) {
|
||||
if (count++ % 4 === 0) {
|
||||
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
|
||||
executingWorkflows.map((execution) => {
|
||||
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
|
||||
});
|
||||
}
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await sleep(500);
|
||||
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
|
||||
}
|
||||
// We may receive true but when called from `process.on`
|
||||
// we get the signal (SIGINT, etc.)
|
||||
if (skipExit !== true) {
|
||||
process.exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
formatJsonOutput(data: object) {
|
||||
return JSON.stringify(data, null, 2);
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
shouldBeConsideredAsWarning(errorMessage: string) {
|
||||
const warningStrings = [
|
||||
'refresh token is invalid',
|
||||
'unable to connect to',
|
||||
'econnreset',
|
||||
'429',
|
||||
'econnrefused',
|
||||
'missing a required parameter',
|
||||
'insufficient credit balance',
|
||||
'request timed out',
|
||||
'status code 401',
|
||||
];
|
||||
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
errorMessage = errorMessage.toLowerCase();
|
||||
|
||||
for (let i = 0; i < warningStrings.length; i++) {
|
||||
if (errorMessage.includes(warningStrings[i])) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
process.once('SIGTERM', ExecuteBatch.stopProcess);
|
||||
process.once('SIGINT', ExecuteBatch.stopProcess);
|
||||
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
const binaryDataConfig = config.getEnv('binaryDataManager');
|
||||
await BinaryDataManager.init(binaryDataConfig, true);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(ExecuteBatch);
|
||||
|
||||
ExecuteBatch.debug = flags.debug;
|
||||
ExecuteBatch.concurrency = flags.concurrency || 1;
|
||||
|
||||
const ids: number[] = [];
|
||||
const skipIds: number[] = [];
|
||||
|
||||
if (flags.snapshot !== undefined) {
|
||||
if (fs.existsSync(flags.snapshot)) {
|
||||
if (!fs.lstatSync(flags.snapshot).isDirectory()) {
|
||||
console.log(`The parameter --snapshot must be an existing directory`);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
console.log(`The parameter --snapshot must be an existing directory`);
|
||||
return;
|
||||
}
|
||||
|
||||
ExecuteBatch.snapshot = flags.snapshot;
|
||||
}
|
||||
if (flags.compare !== undefined) {
|
||||
if (fs.existsSync(flags.compare)) {
|
||||
if (!fs.lstatSync(flags.compare).isDirectory()) {
|
||||
console.log(`The parameter --compare must be an existing directory`);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
console.log(`The parameter --compare must be an existing directory`);
|
||||
return;
|
||||
}
|
||||
|
||||
ExecuteBatch.compare = flags.compare;
|
||||
}
|
||||
|
||||
if (flags.output !== undefined) {
|
||||
if (fs.existsSync(flags.output)) {
|
||||
if (fs.lstatSync(flags.output).isDirectory()) {
|
||||
console.log(`The parameter --output must be a writable file`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (flags.ids !== undefined) {
|
||||
if (fs.existsSync(flags.ids)) {
|
||||
const contents = fs.readFileSync(flags.ids, { encoding: 'utf-8' });
|
||||
ids.push(...contents.split(',').map((id) => parseInt(id.trim(), 10)));
|
||||
} else {
|
||||
const paramIds = flags.ids.split(',');
|
||||
const re = /\d+/;
|
||||
const matchedIds = paramIds
|
||||
.filter((id) => re.exec(id))
|
||||
.map((id) => parseInt(id.trim(), 10));
|
||||
|
||||
if (matchedIds.length === 0) {
|
||||
console.log(
|
||||
`The parameter --ids must be a list of numeric IDs separated by a comma or a file with this content.`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
ids.push(...matchedIds);
|
||||
}
|
||||
}
|
||||
|
||||
if (flags.skipList !== undefined) {
|
||||
if (fs.existsSync(flags.skipList)) {
|
||||
const contents = fs.readFileSync(flags.skipList, { encoding: 'utf-8' });
|
||||
skipIds.push(...contents.split(',').map((id) => parseInt(id.trim(), 10)));
|
||||
} else {
|
||||
console.log('Skip list file not found. Exiting.');
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (flags.shallow) {
|
||||
ExecuteBatch.shallow = true;
|
||||
}
|
||||
|
||||
// Start directly with the init of the database to improve startup time
|
||||
const startDbInitPromise = Db.init();
|
||||
|
||||
// Load all node and credential types
|
||||
const loadNodesAndCredentials = LoadNodesAndCredentials();
|
||||
const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init();
|
||||
|
||||
// Make sure the settings exist
|
||||
await UserSettings.prepareUserSettings();
|
||||
|
||||
// Wait till the database is ready
|
||||
await startDbInitPromise;
|
||||
|
||||
ExecuteBatch.instanceOwner = await getInstanceOwner();
|
||||
|
||||
let allWorkflows;
|
||||
|
||||
const query = Db.collections.Workflow.createQueryBuilder('workflows');
|
||||
|
||||
if (ids.length > 0) {
|
||||
query.andWhere(`workflows.id in (:...ids)`, { ids });
|
||||
}
|
||||
|
||||
if (skipIds.length > 0) {
|
||||
query.andWhere(`workflows.id not in (:...skipIds)`, { skipIds });
|
||||
}
|
||||
|
||||
// eslint-disable-next-line prefer-const
|
||||
allWorkflows = (await query.getMany()) as IWorkflowDb[];
|
||||
|
||||
if (ExecuteBatch.debug) {
|
||||
process.stdout.write(`Found ${allWorkflows.length} workflows to execute.\n`);
|
||||
}
|
||||
|
||||
// Wait till the n8n-packages have been read
|
||||
await loadNodesAndCredentialsPromise;
|
||||
|
||||
// Load the credentials overwrites if any exist
|
||||
await CredentialsOverwrites().init();
|
||||
|
||||
// Load all external hooks
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
|
||||
// Add the found types to an instance other parts of the application can use
|
||||
const nodeTypes = NodeTypes();
|
||||
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
|
||||
const credentialTypes = CredentialTypes();
|
||||
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
|
||||
|
||||
const instanceId = await UserSettings.getInstanceId();
|
||||
const { cli } = await GenericHelpers.getVersions();
|
||||
InternalHooksManager.init(instanceId, cli, nodeTypes);
|
||||
|
||||
// Send a shallow copy of allWorkflows so we still have all workflow data.
|
||||
const results = await this.runTests([...allWorkflows]);
|
||||
|
||||
let { retries } = flags;
|
||||
|
||||
while (
|
||||
retries > 0 &&
|
||||
results.summary.warningExecutions + results.summary.failedExecutions > 0 &&
|
||||
!ExecuteBatch.cancelled
|
||||
) {
|
||||
const failedWorkflowIds = results.summary.errors.map((execution) => execution.workflowId);
|
||||
failedWorkflowIds.push(...results.summary.warnings.map((execution) => execution.workflowId));
|
||||
|
||||
const newWorkflowList = allWorkflows.filter((workflow) =>
|
||||
failedWorkflowIds.includes(workflow.id),
|
||||
);
|
||||
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const retryResults = await this.runTests(newWorkflowList);
|
||||
|
||||
this.mergeResults(results, retryResults);
|
||||
// By now, `results` has been updated with the new successful executions.
|
||||
retries--;
|
||||
}
|
||||
|
||||
if (flags.output !== undefined) {
|
||||
fs.writeFileSync(flags.output, this.formatJsonOutput(results));
|
||||
console.log('\nExecution finished.');
|
||||
console.log('Summary:');
|
||||
console.log(`\tSuccess: ${results.summary.successfulExecutions}`);
|
||||
console.log(`\tFailures: ${results.summary.failedExecutions}`);
|
||||
console.log(`\tWarnings: ${results.summary.warningExecutions}`);
|
||||
console.log('\nNodes successfully tested:');
|
||||
Object.entries(results.coveredNodes).forEach(([nodeName, nodeCount]) => {
|
||||
console.log(`\t${nodeName}: ${nodeCount}`);
|
||||
});
|
||||
console.log('\nCheck the JSON file for more details.');
|
||||
} else if (flags.shortOutput) {
|
||||
console.log(
|
||||
this.formatJsonOutput({
|
||||
...results,
|
||||
executions: results.executions.filter(
|
||||
(execution) => execution.executionStatus !== 'success',
|
||||
),
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
console.log(this.formatJsonOutput(results));
|
||||
}
|
||||
|
||||
await ExecuteBatch.stopProcess(true);
|
||||
|
||||
if (results.summary.failedExecutions > 0) {
|
||||
this.exit(1);
|
||||
}
|
||||
this.exit(0);
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
mergeResults(results: IResult, retryResults: IResult) {
|
||||
if (retryResults.summary.successfulExecutions === 0) {
|
||||
// Nothing to replace.
|
||||
return;
|
||||
}
|
||||
|
||||
// Find successful executions and replace them on previous result.
|
||||
retryResults.executions.forEach((newExecution) => {
|
||||
if (newExecution.executionStatus === 'success') {
|
||||
// Remove previous execution from list.
|
||||
results.executions = results.executions.filter(
|
||||
(previousExecutions) => previousExecutions.workflowId !== newExecution.workflowId,
|
||||
);
|
||||
|
||||
const errorIndex = results.summary.errors.findIndex(
|
||||
(summaryInformation) => summaryInformation.workflowId === newExecution.workflowId,
|
||||
);
|
||||
if (errorIndex !== -1) {
|
||||
// This workflow errored previously. Decrement error count.
|
||||
results.summary.failedExecutions--;
|
||||
// Remove from the list of errors.
|
||||
results.summary.errors.splice(errorIndex, 1);
|
||||
}
|
||||
|
||||
const warningIndex = results.summary.warnings.findIndex(
|
||||
(summaryInformation) => summaryInformation.workflowId === newExecution.workflowId,
|
||||
);
|
||||
if (warningIndex !== -1) {
|
||||
// This workflow errored previously. Decrement error count.
|
||||
results.summary.warningExecutions--;
|
||||
// Remove from the list of errors.
|
||||
results.summary.warnings.splice(warningIndex, 1);
|
||||
}
|
||||
// Increment successful executions count and push it to all executions array.
|
||||
results.summary.successfulExecutions++;
|
||||
results.executions.push(newExecution);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async runTests(allWorkflows: IWorkflowDb[]): Promise<IResult> {
|
||||
const result: IResult = {
|
||||
totalWorkflows: allWorkflows.length,
|
||||
summary: {
|
||||
failedExecutions: 0,
|
||||
warningExecutions: 0,
|
||||
successfulExecutions: 0,
|
||||
errors: [],
|
||||
warnings: [],
|
||||
},
|
||||
coveredNodes: {},
|
||||
executions: [],
|
||||
};
|
||||
|
||||
if (ExecuteBatch.debug) {
|
||||
this.initializeLogs();
|
||||
}
|
||||
|
||||
return new Promise(async (res) => {
|
||||
const promisesArray = [];
|
||||
for (let i = 0; i < ExecuteBatch.concurrency; i++) {
|
||||
const promise = new Promise(async (resolve) => {
|
||||
let workflow: IWorkflowDb | undefined;
|
||||
while (allWorkflows.length > 0) {
|
||||
workflow = allWorkflows.shift();
|
||||
if (ExecuteBatch.cancelled) {
|
||||
process.stdout.write(`Thread ${i + 1} resolving and quitting.`);
|
||||
resolve(true);
|
||||
break;
|
||||
}
|
||||
// This if shouldn't be really needed
|
||||
// but it's a concurrency precaution.
|
||||
if (workflow === undefined) {
|
||||
resolve(true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ExecuteBatch.debug) {
|
||||
ExecuteBatch.workflowExecutionsProgress[i].push({
|
||||
workflowId: workflow.id,
|
||||
status: 'running',
|
||||
});
|
||||
this.updateStatus();
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-loop-func
|
||||
await this.startThread(workflow).then((executionResult) => {
|
||||
if (ExecuteBatch.debug) {
|
||||
ExecuteBatch.workflowExecutionsProgress[i].pop();
|
||||
}
|
||||
result.executions.push(executionResult);
|
||||
if (executionResult.executionStatus === 'success') {
|
||||
if (ExecuteBatch.debug) {
|
||||
ExecuteBatch.workflowExecutionsProgress[i].push({
|
||||
workflowId: workflow!.id,
|
||||
status: 'success',
|
||||
});
|
||||
this.updateStatus();
|
||||
}
|
||||
result.summary.successfulExecutions++;
|
||||
const nodeNames = Object.keys(executionResult.coveredNodes);
|
||||
|
||||
nodeNames.map((nodeName) => {
|
||||
if (result.coveredNodes[nodeName] === undefined) {
|
||||
result.coveredNodes[nodeName] = 0;
|
||||
}
|
||||
result.coveredNodes[nodeName] += executionResult.coveredNodes[nodeName];
|
||||
});
|
||||
} else if (executionResult.executionStatus === 'warning') {
|
||||
result.summary.warningExecutions++;
|
||||
result.summary.warnings.push({
|
||||
workflowId: executionResult.workflowId,
|
||||
error: executionResult.error!,
|
||||
});
|
||||
if (ExecuteBatch.debug) {
|
||||
ExecuteBatch.workflowExecutionsProgress[i].push({
|
||||
workflowId: workflow!.id,
|
||||
status: 'warning',
|
||||
});
|
||||
this.updateStatus();
|
||||
}
|
||||
} else if (executionResult.executionStatus === 'error') {
|
||||
result.summary.failedExecutions++;
|
||||
result.summary.errors.push({
|
||||
workflowId: executionResult.workflowId,
|
||||
error: executionResult.error!,
|
||||
});
|
||||
if (ExecuteBatch.debug) {
|
||||
ExecuteBatch.workflowExecutionsProgress[i].push({
|
||||
workflowId: workflow!.id,
|
||||
status: 'error',
|
||||
});
|
||||
this.updateStatus();
|
||||
}
|
||||
} else {
|
||||
throw new Error('Wrong execution status - cannot proceed');
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
resolve(true);
|
||||
});
|
||||
|
||||
promisesArray.push(promise);
|
||||
}
|
||||
|
||||
await Promise.allSettled(promisesArray);
|
||||
|
||||
res(result);
|
||||
});
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
updateStatus() {
|
||||
if (ExecuteBatch.cancelled) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (process.stdout.isTTY) {
|
||||
process.stdout.moveCursor(0, -ExecuteBatch.concurrency);
|
||||
process.stdout.cursorTo(0);
|
||||
process.stdout.clearLine(0);
|
||||
}
|
||||
|
||||
ExecuteBatch.workflowExecutionsProgress.map((concurrentThread, index) => {
|
||||
let message = `${index + 1}: `;
|
||||
concurrentThread.map((executionItem, workflowIndex) => {
|
||||
let openColor = '\x1b[0m';
|
||||
const closeColor = '\x1b[0m';
|
||||
switch (executionItem.status) {
|
||||
case 'success':
|
||||
openColor = '\x1b[32m';
|
||||
break;
|
||||
case 'error':
|
||||
openColor = '\x1b[31m';
|
||||
break;
|
||||
case 'warning':
|
||||
openColor = '\x1b[33m';
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
message += `${workflowIndex > 0 ? ', ' : ''}${openColor}${
|
||||
executionItem.workflowId
|
||||
}${closeColor}`;
|
||||
});
|
||||
if (process.stdout.isTTY) {
|
||||
process.stdout.cursorTo(0);
|
||||
process.stdout.clearLine(0);
|
||||
}
|
||||
process.stdout.write(`${message}\n`);
|
||||
});
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
initializeLogs() {
|
||||
process.stdout.write('**********************************************\n');
|
||||
process.stdout.write(' n8n test workflows\n');
|
||||
process.stdout.write('**********************************************\n');
|
||||
process.stdout.write('\n');
|
||||
process.stdout.write('Batch number:\n');
|
||||
ExecuteBatch.workflowExecutionsProgress = [];
|
||||
for (let i = 0; i < ExecuteBatch.concurrency; i++) {
|
||||
ExecuteBatch.workflowExecutionsProgress.push([]);
|
||||
process.stdout.write(`${i + 1}: \n`);
|
||||
}
|
||||
}
|
||||
|
||||
async startThread(workflowData: IWorkflowDb): Promise<IExecutionResult> {
|
||||
// This will be the object returned by the promise.
|
||||
// It will be updated according to execution progress below.
|
||||
const executionResult: IExecutionResult = {
|
||||
workflowId: workflowData.id,
|
||||
workflowName: workflowData.name,
|
||||
executionTime: 0,
|
||||
finished: false,
|
||||
executionStatus: 'running',
|
||||
coveredNodes: {},
|
||||
};
|
||||
|
||||
// We have a cool feature here.
|
||||
// On each node, on the Settings tab in the node editor you can change
|
||||
// the `Notes` field to add special cases for comparison and snapshots.
|
||||
// You need to set one configuration per line with the following possible keys:
|
||||
// CAP_RESULTS_LENGTH=x where x is a number. Cap the number of rows from this node to x.
|
||||
// This means if you set CAP_RESULTS_LENGTH=1 we will have only 1 row in the output
|
||||
// IGNORED_PROPERTIES=x,y,z where x, y and z are JSON property names. Removes these
|
||||
// properties from the JSON object (useful for optional properties that can
|
||||
// cause the comparison to detect changes when not true).
|
||||
const nodeEdgeCases = {} as INodeSpecialCases;
|
||||
workflowData.nodes.forEach((node) => {
|
||||
executionResult.coveredNodes[node.type] = (executionResult.coveredNodes[node.type] || 0) + 1;
|
||||
if (node.notes !== undefined && node.notes !== '') {
|
||||
node.notes.split('\n').forEach((note) => {
|
||||
const parts = note.split('=');
|
||||
if (parts.length === 2) {
|
||||
if (nodeEdgeCases[node.name] === undefined) {
|
||||
nodeEdgeCases[node.name] = {} as INodeSpecialCase;
|
||||
}
|
||||
if (parts[0] === 'CAP_RESULTS_LENGTH') {
|
||||
nodeEdgeCases[node.name].capResults = parseInt(parts[1], 10);
|
||||
} else if (parts[0] === 'IGNORED_PROPERTIES') {
|
||||
nodeEdgeCases[node.name].ignoredProperties = parts[1]
|
||||
.split(',')
|
||||
.map((property) => property.trim());
|
||||
} else if (parts[0] === 'KEEP_ONLY_PROPERTIES') {
|
||||
nodeEdgeCases[node.name].keepOnlyProperties = parts[1]
|
||||
.split(',')
|
||||
.map((property) => property.trim());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return new Promise(async (resolve) => {
|
||||
let gotCancel = false;
|
||||
|
||||
// Timeouts execution after 5 minutes.
|
||||
const timeoutTimer = setTimeout(() => {
|
||||
gotCancel = true;
|
||||
executionResult.error = 'Workflow execution timed out.';
|
||||
executionResult.executionStatus = 'warning';
|
||||
resolve(executionResult);
|
||||
}, ExecuteBatch.executionTimeout);
|
||||
|
||||
try {
|
||||
const startingNode = findCliWorkflowStart(workflowData.nodes);
|
||||
|
||||
const runData: IWorkflowExecutionDataProcess = {
|
||||
executionMode: 'cli',
|
||||
startNodes: [startingNode.name],
|
||||
workflowData,
|
||||
userId: ExecuteBatch.instanceOwner.id,
|
||||
};
|
||||
|
||||
const workflowRunner = new WorkflowRunner();
|
||||
const executionId = await workflowRunner.run(runData);
|
||||
|
||||
const activeExecutions = ActiveExecutions.getInstance();
|
||||
const data = await activeExecutions.getPostExecutePromise(executionId);
|
||||
if (gotCancel || ExecuteBatch.cancelled) {
|
||||
clearTimeout(timeoutTimer);
|
||||
// The promise was settled already so we simply ignore.
|
||||
return;
|
||||
}
|
||||
|
||||
if (data === undefined) {
|
||||
executionResult.error = 'Workflow did not return any data.';
|
||||
executionResult.executionStatus = 'error';
|
||||
} else {
|
||||
executionResult.executionTime =
|
||||
(Date.parse(data.stoppedAt as unknown as string) -
|
||||
Date.parse(data.startedAt as unknown as string)) /
|
||||
1000;
|
||||
executionResult.finished = data?.finished !== undefined;
|
||||
|
||||
if (data.data.resultData.error) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, no-prototype-builtins
|
||||
executionResult.error = data.data.resultData.error.hasOwnProperty('description')
|
||||
? // @ts-ignore
|
||||
data.data.resultData.error.description
|
||||
: data.data.resultData.error.message;
|
||||
if (data.data.resultData.lastNodeExecuted !== undefined) {
|
||||
executionResult.error += ` on node ${data.data.resultData.lastNodeExecuted}`;
|
||||
}
|
||||
executionResult.executionStatus = 'error';
|
||||
|
||||
if (this.shouldBeConsideredAsWarning(executionResult.error || '')) {
|
||||
executionResult.executionStatus = 'warning';
|
||||
}
|
||||
} else {
|
||||
if (ExecuteBatch.shallow) {
|
||||
// What this does is guarantee that top-level attributes
|
||||
// from the JSON are kept and the are the same type.
|
||||
|
||||
// We convert nested JSON objects to a simple {object:true}
|
||||
// and we convert nested arrays to ['json array']
|
||||
|
||||
// This reduces the chance of false positives but may
|
||||
// result in not detecting deeper changes.
|
||||
Object.keys(data.data.resultData.runData).map((nodeName: string) => {
|
||||
data.data.resultData.runData[nodeName].map((taskData: ITaskData) => {
|
||||
if (taskData.data === undefined) {
|
||||
return;
|
||||
}
|
||||
Object.keys(taskData.data).map((connectionName) => {
|
||||
const connection = taskData.data![connectionName];
|
||||
connection.map((executionDataArray) => {
|
||||
if (executionDataArray === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
nodeEdgeCases[nodeName] !== undefined &&
|
||||
nodeEdgeCases[nodeName].capResults !== undefined
|
||||
) {
|
||||
executionDataArray.splice(nodeEdgeCases[nodeName].capResults!);
|
||||
}
|
||||
|
||||
executionDataArray.map((executionData) => {
|
||||
if (executionData.json === undefined) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
nodeEdgeCases[nodeName] !== undefined &&
|
||||
nodeEdgeCases[nodeName].ignoredProperties !== undefined
|
||||
) {
|
||||
nodeEdgeCases[nodeName].ignoredProperties!.forEach(
|
||||
(ignoredProperty) => delete executionData.json[ignoredProperty],
|
||||
);
|
||||
}
|
||||
|
||||
let keepOnlyFields = [] as string[];
|
||||
if (
|
||||
nodeEdgeCases[nodeName] !== undefined &&
|
||||
nodeEdgeCases[nodeName].keepOnlyProperties !== undefined
|
||||
) {
|
||||
keepOnlyFields = nodeEdgeCases[nodeName].keepOnlyProperties!;
|
||||
}
|
||||
executionData.json =
|
||||
keepOnlyFields.length > 0
|
||||
? pick(executionData.json, keepOnlyFields)
|
||||
: executionData.json;
|
||||
const jsonProperties = executionData.json;
|
||||
|
||||
const nodeOutputAttributes = Object.keys(jsonProperties);
|
||||
nodeOutputAttributes.map((attributeName) => {
|
||||
if (Array.isArray(jsonProperties[attributeName])) {
|
||||
jsonProperties[attributeName] = ['json array'];
|
||||
} else if (typeof jsonProperties[attributeName] === 'object') {
|
||||
jsonProperties[attributeName] = { object: true };
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
} else {
|
||||
// If not using shallow comparison then we only treat nodeEdgeCases.
|
||||
const specialCases = Object.keys(nodeEdgeCases);
|
||||
|
||||
specialCases.forEach((nodeName) => {
|
||||
data.data.resultData.runData[nodeName].map((taskData: ITaskData) => {
|
||||
if (taskData.data === undefined) {
|
||||
return;
|
||||
}
|
||||
Object.keys(taskData.data).map((connectionName) => {
|
||||
const connection = taskData.data![connectionName];
|
||||
connection.map((executionDataArray) => {
|
||||
if (executionDataArray === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (nodeEdgeCases[nodeName].capResults !== undefined) {
|
||||
executionDataArray.splice(nodeEdgeCases[nodeName].capResults!);
|
||||
}
|
||||
|
||||
if (nodeEdgeCases[nodeName].ignoredProperties !== undefined) {
|
||||
executionDataArray.map((executionData) => {
|
||||
if (executionData.json === undefined) {
|
||||
return;
|
||||
}
|
||||
nodeEdgeCases[nodeName].ignoredProperties!.forEach(
|
||||
(ignoredProperty) => delete executionData.json[ignoredProperty],
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const serializedData = this.formatJsonOutput(data);
|
||||
if (ExecuteBatch.compare === undefined) {
|
||||
executionResult.executionStatus = 'success';
|
||||
} else {
|
||||
const fileName = `${
|
||||
ExecuteBatch.compare.endsWith(sep)
|
||||
? ExecuteBatch.compare
|
||||
: ExecuteBatch.compare + sep
|
||||
}${workflowData.id}-snapshot.json`;
|
||||
if (fs.existsSync(fileName)) {
|
||||
const contents = fs.readFileSync(fileName, { encoding: 'utf-8' });
|
||||
|
||||
const changes = diff(JSON.parse(contents), data, { keysOnly: true });
|
||||
|
||||
if (changes !== undefined) {
|
||||
// If we had only additions with no removals
|
||||
// Then we treat as a warning and not an error.
|
||||
// To find this, we convert the object to JSON
|
||||
// and search for the `__deleted` string
|
||||
const changesJson = JSON.stringify(changes);
|
||||
if (changesJson.includes('__deleted')) {
|
||||
// we have structural changes. Report them.
|
||||
executionResult.error = 'Workflow may contain breaking changes';
|
||||
executionResult.changes = changes;
|
||||
executionResult.executionStatus = 'error';
|
||||
} else {
|
||||
executionResult.error =
|
||||
'Workflow contains new data that previously did not exist.';
|
||||
executionResult.changes = changes;
|
||||
executionResult.executionStatus = 'warning';
|
||||
}
|
||||
} else {
|
||||
executionResult.executionStatus = 'success';
|
||||
}
|
||||
} else {
|
||||
executionResult.error = 'Snapshot for not found.';
|
||||
executionResult.executionStatus = 'warning';
|
||||
}
|
||||
}
|
||||
// Save snapshots only after comparing - this is to make sure we're updating
|
||||
// After comparing to existing version.
|
||||
if (ExecuteBatch.snapshot !== undefined) {
|
||||
const fileName = `${
|
||||
ExecuteBatch.snapshot.endsWith(sep)
|
||||
? ExecuteBatch.snapshot
|
||||
: ExecuteBatch.snapshot + sep
|
||||
}${workflowData.id}-snapshot.json`;
|
||||
fs.writeFileSync(fileName, serializedData);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access
|
||||
executionResult.error = `Workflow failed to execute: ${e.message}`;
|
||||
executionResult.executionStatus = 'error';
|
||||
}
|
||||
clearTimeout(timeoutTimer);
|
||||
resolve(executionResult);
|
||||
});
|
||||
}
|
||||
}
|
||||
172
packages/cli/src/commands/export/credentials.ts
Normal file
172
packages/cli/src/commands/export/credentials.ts
Normal file
@@ -0,0 +1,172 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable @typescript-eslint/restrict-plus-operands */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { Credentials, UserSettings } from 'n8n-core';
|
||||
|
||||
import { IDataObject, LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { getLogger } from '@/Logger';
|
||||
import * as Db from '@/Db';
|
||||
import type { ICredentialsDecryptedDb } from '@/Interfaces';
|
||||
|
||||
export class ExportCredentialsCommand extends Command {
|
||||
static description = 'Export credentials';
|
||||
|
||||
static examples = [
|
||||
`$ n8n export:credentials --all`,
|
||||
`$ n8n export:credentials --id=5 --output=file.json`,
|
||||
`$ n8n export:credentials --all --output=backups/latest.json`,
|
||||
`$ n8n export:credentials --backup --output=backups/latest/`,
|
||||
`$ n8n export:credentials --all --decrypted --output=backups/decrypted.json`,
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
all: flags.boolean({
|
||||
description: 'Export all credentials',
|
||||
}),
|
||||
backup: flags.boolean({
|
||||
description:
|
||||
'Sets --all --pretty --separate for simple backups. Only --output has to be set additionally.',
|
||||
}),
|
||||
id: flags.string({
|
||||
description: 'The ID of the credential to export',
|
||||
}),
|
||||
output: flags.string({
|
||||
char: 'o',
|
||||
description: 'Output file name or directory if using separate files',
|
||||
}),
|
||||
pretty: flags.boolean({
|
||||
description: 'Format the output in an easier to read fashion',
|
||||
}),
|
||||
separate: flags.boolean({
|
||||
description:
|
||||
'Exports one file per credential (useful for versioning). Must inform a directory via --output.',
|
||||
}),
|
||||
decrypted: flags.boolean({
|
||||
description:
|
||||
'Exports data decrypted / in plain text. ALL SENSITIVE INFORMATION WILL BE VISIBLE IN THE FILES. Use to migrate from a installation to another that have a different secret key (in the config file).',
|
||||
}),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(ExportCredentialsCommand);
|
||||
|
||||
if (flags.backup) {
|
||||
flags.all = true;
|
||||
flags.pretty = true;
|
||||
flags.separate = true;
|
||||
}
|
||||
|
||||
if (!flags.all && !flags.id) {
|
||||
console.info(`Either option "--all" or "--id" have to be set!`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.all && flags.id) {
|
||||
console.info(`You should either use "--all" or "--id" but never both!`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.separate) {
|
||||
try {
|
||||
if (!flags.output) {
|
||||
console.info(`You must inform an output directory via --output when using --separate`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (fs.existsSync(flags.output)) {
|
||||
if (!fs.lstatSync(flags.output).isDirectory()) {
|
||||
console.info(`The parameter --output must be a directory`);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
fs.mkdirSync(flags.output, { recursive: true });
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(
|
||||
'Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.',
|
||||
);
|
||||
logger.error('\nFILESYSTEM ERROR');
|
||||
logger.info('====================================');
|
||||
logger.error(e.message);
|
||||
logger.error(e.stack);
|
||||
this.exit(1);
|
||||
}
|
||||
} else if (flags.output) {
|
||||
if (fs.existsSync(flags.output)) {
|
||||
if (fs.lstatSync(flags.output).isDirectory()) {
|
||||
console.info(`The parameter --output must be a writeable file`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await Db.init();
|
||||
|
||||
const findQuery: IDataObject = {};
|
||||
if (flags.id) {
|
||||
findQuery.id = flags.id;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
const credentials = await Db.collections.Credentials.find(findQuery);
|
||||
|
||||
if (flags.decrypted) {
|
||||
const encryptionKey = await UserSettings.getEncryptionKey();
|
||||
|
||||
for (let i = 0; i < credentials.length; i++) {
|
||||
const { name, type, nodesAccess, data } = credentials[i];
|
||||
const id = credentials[i].id as string;
|
||||
const credential = new Credentials({ id, name }, type, nodesAccess, data);
|
||||
const plainData = credential.getData(encryptionKey);
|
||||
(credentials[i] as ICredentialsDecryptedDb).data = plainData;
|
||||
}
|
||||
}
|
||||
|
||||
if (credentials.length === 0) {
|
||||
throw new Error('No credentials found with specified filters.');
|
||||
}
|
||||
|
||||
if (flags.separate) {
|
||||
let fileContents: string;
|
||||
let i: number;
|
||||
for (i = 0; i < credentials.length; i++) {
|
||||
fileContents = JSON.stringify(credentials[i], null, flags.pretty ? 2 : undefined);
|
||||
const filename = `${
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
(flags.output!.endsWith(path.sep) ? flags.output! : flags.output + path.sep) +
|
||||
credentials[i].id
|
||||
}.json`;
|
||||
fs.writeFileSync(filename, fileContents);
|
||||
}
|
||||
console.info(`Successfully exported ${i} credentials.`);
|
||||
} else {
|
||||
const fileContents = JSON.stringify(credentials, null, flags.pretty ? 2 : undefined);
|
||||
if (flags.output) {
|
||||
fs.writeFileSync(flags.output, fileContents);
|
||||
console.info(`Successfully exported ${credentials.length} credentials.`);
|
||||
} else {
|
||||
console.info(fileContents);
|
||||
}
|
||||
}
|
||||
// Force exit as process won't exit using MySQL or Postgres.
|
||||
process.exit(0);
|
||||
} catch (error) {
|
||||
console.error('Error exporting credentials. See log messages for details.');
|
||||
logger.error(error.message);
|
||||
this.exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
157
packages/cli/src/commands/export/workflow.ts
Normal file
157
packages/cli/src/commands/export/workflow.ts
Normal file
@@ -0,0 +1,157 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { IDataObject, LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { getLogger } from '@/Logger';
|
||||
import * as Db from '@/Db';
|
||||
|
||||
export class ExportWorkflowsCommand extends Command {
|
||||
static description = 'Export workflows';
|
||||
|
||||
static examples = [
|
||||
`$ n8n export:workflow --all`,
|
||||
`$ n8n export:workflow --id=5 --output=file.json`,
|
||||
`$ n8n export:workflow --all --output=backups/latest/`,
|
||||
`$ n8n export:workflow --backup --output=backups/latest/`,
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
all: flags.boolean({
|
||||
description: 'Export all workflows',
|
||||
}),
|
||||
backup: flags.boolean({
|
||||
description:
|
||||
'Sets --all --pretty --separate for simple backups. Only --output has to be set additionally.',
|
||||
}),
|
||||
id: flags.string({
|
||||
description: 'The ID of the workflow to export',
|
||||
}),
|
||||
output: flags.string({
|
||||
char: 'o',
|
||||
description: 'Output file name or directory if using separate files',
|
||||
}),
|
||||
pretty: flags.boolean({
|
||||
description: 'Format the output in an easier to read fashion',
|
||||
}),
|
||||
separate: flags.boolean({
|
||||
description:
|
||||
'Exports one file per workflow (useful for versioning). Must inform a directory via --output.',
|
||||
}),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(ExportWorkflowsCommand);
|
||||
|
||||
if (flags.backup) {
|
||||
flags.all = true;
|
||||
flags.pretty = true;
|
||||
flags.separate = true;
|
||||
}
|
||||
|
||||
if (!flags.all && !flags.id) {
|
||||
console.info(`Either option "--all" or "--id" have to be set!`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.all && flags.id) {
|
||||
console.info(`You should either use "--all" or "--id" but never both!`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.separate) {
|
||||
try {
|
||||
if (!flags.output) {
|
||||
console.info(`You must inform an output directory via --output when using --separate`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (fs.existsSync(flags.output)) {
|
||||
if (!fs.lstatSync(flags.output).isDirectory()) {
|
||||
console.info(`The parameter --output must be a directory`);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
fs.mkdirSync(flags.output, { recursive: true });
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(
|
||||
'Aborting execution as a filesystem error has been encountered while creating the output directory. See log messages for details.',
|
||||
);
|
||||
logger.error('\nFILESYSTEM ERROR');
|
||||
logger.info('====================================');
|
||||
logger.error(e.message);
|
||||
logger.error(e.stack);
|
||||
this.exit(1);
|
||||
}
|
||||
} else if (flags.output) {
|
||||
if (fs.existsSync(flags.output)) {
|
||||
if (fs.lstatSync(flags.output).isDirectory()) {
|
||||
console.info(`The parameter --output must be a writeable file`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await Db.init();
|
||||
|
||||
const findQuery: IDataObject = {};
|
||||
if (flags.id) {
|
||||
findQuery.id = flags.id;
|
||||
}
|
||||
|
||||
const workflows = await Db.collections.Workflow.find({
|
||||
where: findQuery,
|
||||
relations: ['tags'],
|
||||
});
|
||||
|
||||
if (workflows.length === 0) {
|
||||
throw new Error('No workflows found with specified filters.');
|
||||
}
|
||||
|
||||
if (flags.separate) {
|
||||
let fileContents: string;
|
||||
let i: number;
|
||||
for (i = 0; i < workflows.length; i++) {
|
||||
fileContents = JSON.stringify(workflows[i], null, flags.pretty ? 2 : undefined);
|
||||
const filename = `${
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-plus-operands, @typescript-eslint/no-non-null-assertion
|
||||
(flags.output!.endsWith(path.sep) ? flags.output! : flags.output + path.sep) +
|
||||
workflows[i].id
|
||||
}.json`;
|
||||
fs.writeFileSync(filename, fileContents);
|
||||
}
|
||||
console.info(`Successfully exported ${i} workflows.`);
|
||||
} else {
|
||||
const fileContents = JSON.stringify(workflows, null, flags.pretty ? 2 : undefined);
|
||||
if (flags.output) {
|
||||
fs.writeFileSync(flags.output, fileContents);
|
||||
console.info(
|
||||
`Successfully exported ${workflows.length} ${
|
||||
workflows.length === 1 ? 'workflow.' : 'workflows.'
|
||||
}`,
|
||||
);
|
||||
} else {
|
||||
console.info(fileContents);
|
||||
}
|
||||
}
|
||||
// Force exit as process won't exit using MySQL or Postgres.
|
||||
process.exit(0);
|
||||
} catch (error) {
|
||||
console.error('Error exporting workflows. See log messages for details.');
|
||||
logger.error(error.message);
|
||||
this.exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
208
packages/cli/src/commands/import/credentials.ts
Normal file
208
packages/cli/src/commands/import/credentials.ts
Normal file
@@ -0,0 +1,208 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable no-restricted-syntax */
|
||||
/* eslint-disable @typescript-eslint/no-shadow */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
/* eslint-disable no-await-in-loop */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { Credentials, UserSettings } from 'n8n-core';
|
||||
|
||||
import { LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import fs from 'fs';
|
||||
import glob from 'fast-glob';
|
||||
import { EntityManager, getConnection } from 'typeorm';
|
||||
import { getLogger } from '@/Logger';
|
||||
import * as Db from '@/Db';
|
||||
import { User } from '@db/entities/User';
|
||||
import { SharedCredentials } from '@db/entities/SharedCredentials';
|
||||
import { Role } from '@db/entities/Role';
|
||||
import { CredentialsEntity } from '@db/entities/CredentialsEntity';
|
||||
|
||||
const FIX_INSTRUCTION =
|
||||
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';
|
||||
|
||||
export class ImportCredentialsCommand extends Command {
|
||||
static description = 'Import credentials';
|
||||
|
||||
static examples = [
|
||||
'$ n8n import:credentials --input=file.json',
|
||||
'$ n8n import:credentials --separate --input=backups/latest/',
|
||||
'$ n8n import:credentials --input=file.json --userId=1d64c3d2-85fe-4a83-a649-e446b07b3aae',
|
||||
'$ n8n import:credentials --separate --input=backups/latest/ --userId=1d64c3d2-85fe-4a83-a649-e446b07b3aae',
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
input: flags.string({
|
||||
char: 'i',
|
||||
description: 'Input file name or directory if --separate is used',
|
||||
}),
|
||||
separate: flags.boolean({
|
||||
description: 'Imports *.json files from directory provided by --input',
|
||||
}),
|
||||
userId: flags.string({
|
||||
description: 'The ID of the user to assign the imported credentials to',
|
||||
}),
|
||||
};
|
||||
|
||||
ownerCredentialRole: Role;
|
||||
|
||||
transactionManager: EntityManager;
|
||||
|
||||
async run(): Promise<void> {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
const { flags } = this.parse(ImportCredentialsCommand);
|
||||
|
||||
if (!flags.input) {
|
||||
console.info('An input file or directory with --input must be provided');
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.separate) {
|
||||
if (fs.existsSync(flags.input)) {
|
||||
if (!fs.lstatSync(flags.input).isDirectory()) {
|
||||
console.info('The argument to --input must be a directory');
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let totalImported = 0;
|
||||
|
||||
try {
|
||||
await Db.init();
|
||||
|
||||
await this.initOwnerCredentialRole();
|
||||
const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner();
|
||||
|
||||
// Make sure the settings exist
|
||||
await UserSettings.prepareUserSettings();
|
||||
|
||||
const encryptionKey = await UserSettings.getEncryptionKey();
|
||||
|
||||
if (flags.separate) {
|
||||
let { input: inputPath } = flags;
|
||||
|
||||
if (process.platform === 'win32') {
|
||||
inputPath = inputPath.replace(/\\/g, '/');
|
||||
}
|
||||
|
||||
const files = await glob('*.json', {
|
||||
cwd: inputPath,
|
||||
absolute: true,
|
||||
});
|
||||
|
||||
totalImported = files.length;
|
||||
|
||||
await getConnection().transaction(async (transactionManager) => {
|
||||
this.transactionManager = transactionManager;
|
||||
for (const file of files) {
|
||||
const credential = JSON.parse(fs.readFileSync(file, { encoding: 'utf8' }));
|
||||
|
||||
if (typeof credential.data === 'object') {
|
||||
// plain data / decrypted input. Should be encrypted first.
|
||||
Credentials.prototype.setData.call(credential, credential.data, encryptionKey);
|
||||
}
|
||||
|
||||
await this.storeCredential(credential, user);
|
||||
}
|
||||
});
|
||||
|
||||
this.reportSuccess(totalImported);
|
||||
process.exit();
|
||||
}
|
||||
|
||||
const credentials = JSON.parse(fs.readFileSync(flags.input, { encoding: 'utf8' }));
|
||||
|
||||
totalImported = credentials.length;
|
||||
|
||||
if (!Array.isArray(credentials)) {
|
||||
throw new Error(
|
||||
'File does not seem to contain credentials. Make sure the credentials are contained in an array.',
|
||||
);
|
||||
}
|
||||
|
||||
await getConnection().transaction(async (transactionManager) => {
|
||||
this.transactionManager = transactionManager;
|
||||
for (const credential of credentials) {
|
||||
if (typeof credential.data === 'object') {
|
||||
// plain data / decrypted input. Should be encrypted first.
|
||||
Credentials.prototype.setData.call(credential, credential.data, encryptionKey);
|
||||
}
|
||||
await this.storeCredential(credential, user);
|
||||
}
|
||||
});
|
||||
|
||||
this.reportSuccess(totalImported);
|
||||
process.exit();
|
||||
} catch (error) {
|
||||
console.error('An error occurred while importing credentials. See log messages for details.');
|
||||
if (error instanceof Error) logger.error(error.message);
|
||||
this.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
private reportSuccess(total: number) {
|
||||
console.info(`Successfully imported ${total} ${total === 1 ? 'credential.' : 'credentials.'}`);
|
||||
}
|
||||
|
||||
private async initOwnerCredentialRole() {
|
||||
const ownerCredentialRole = await Db.collections.Role.findOne({
|
||||
where: { name: 'owner', scope: 'credential' },
|
||||
});
|
||||
|
||||
if (!ownerCredentialRole) {
|
||||
throw new Error(`Failed to find owner credential role. ${FIX_INSTRUCTION}`);
|
||||
}
|
||||
|
||||
this.ownerCredentialRole = ownerCredentialRole;
|
||||
}
|
||||
|
||||
private async storeCredential(credential: object, user: User) {
|
||||
const newCredential = new CredentialsEntity();
|
||||
|
||||
Object.assign(newCredential, credential);
|
||||
|
||||
const savedCredential = await this.transactionManager.save<CredentialsEntity>(newCredential);
|
||||
|
||||
const newSharedCredential = new SharedCredentials();
|
||||
|
||||
Object.assign(newSharedCredential, {
|
||||
credentials: savedCredential,
|
||||
user,
|
||||
role: this.ownerCredentialRole,
|
||||
});
|
||||
|
||||
await this.transactionManager.save<SharedCredentials>(newSharedCredential);
|
||||
}
|
||||
|
||||
private async getOwner() {
|
||||
const ownerGlobalRole = await Db.collections.Role.findOne({
|
||||
where: { name: 'owner', scope: 'global' },
|
||||
});
|
||||
|
||||
const owner = await Db.collections.User.findOne({ globalRole: ownerGlobalRole });
|
||||
|
||||
if (!owner) {
|
||||
throw new Error(`Failed to find owner. ${FIX_INSTRUCTION}`);
|
||||
}
|
||||
|
||||
return owner;
|
||||
}
|
||||
|
||||
private async getAssignee(userId: string) {
|
||||
const user = await Db.collections.User.findOne(userId);
|
||||
|
||||
if (!user) {
|
||||
throw new Error(`Failed to find user with ID ${userId}`);
|
||||
}
|
||||
|
||||
return user;
|
||||
}
|
||||
}
|
||||
275
packages/cli/src/commands/import/workflow.ts
Normal file
275
packages/cli/src/commands/import/workflow.ts
Normal file
@@ -0,0 +1,275 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable no-restricted-syntax */
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
/* eslint-disable @typescript-eslint/no-shadow */
|
||||
/* eslint-disable @typescript-eslint/no-loop-func */
|
||||
/* eslint-disable no-await-in-loop */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
||||
/* eslint-disable no-console */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { INode, INodeCredentialsDetails, LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import fs from 'fs';
|
||||
import glob from 'fast-glob';
|
||||
import { UserSettings } from 'n8n-core';
|
||||
import { EntityManager, getConnection } from 'typeorm';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { getLogger } from '@/Logger';
|
||||
import * as Db from '@/Db';
|
||||
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
|
||||
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
||||
import { Role } from '@db/entities/Role';
|
||||
import { User } from '@db/entities/User';
|
||||
import { setTagsForImport } from '@/TagHelpers';
|
||||
import type { ICredentialsDb, IWorkflowToImport } from '@/Interfaces';
|
||||
|
||||
const FIX_INSTRUCTION =
|
||||
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';
|
||||
|
||||
function assertHasWorkflowsToImport(workflows: unknown): asserts workflows is IWorkflowToImport[] {
|
||||
if (!Array.isArray(workflows)) {
|
||||
throw new Error(
|
||||
'File does not seem to contain workflows. Make sure the workflows are contained in an array.',
|
||||
);
|
||||
}
|
||||
|
||||
for (const workflow of workflows) {
|
||||
if (
|
||||
typeof workflow !== 'object' ||
|
||||
!Object.prototype.hasOwnProperty.call(workflow, 'nodes') ||
|
||||
!Object.prototype.hasOwnProperty.call(workflow, 'connections')
|
||||
) {
|
||||
throw new Error('File does not seem to contain valid workflows.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class ImportWorkflowsCommand extends Command {
|
||||
static description = 'Import workflows';
|
||||
|
||||
static examples = [
|
||||
'$ n8n import:workflow --input=file.json',
|
||||
'$ n8n import:workflow --separate --input=backups/latest/',
|
||||
'$ n8n import:workflow --input=file.json --userId=1d64c3d2-85fe-4a83-a649-e446b07b3aae',
|
||||
'$ n8n import:workflow --separate --input=backups/latest/ --userId=1d64c3d2-85fe-4a83-a649-e446b07b3aae',
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
input: flags.string({
|
||||
char: 'i',
|
||||
description: 'Input file name or directory if --separate is used',
|
||||
}),
|
||||
separate: flags.boolean({
|
||||
description: 'Imports *.json files from directory provided by --input',
|
||||
}),
|
||||
userId: flags.string({
|
||||
description: 'The ID of the user to assign the imported workflows to',
|
||||
}),
|
||||
};
|
||||
|
||||
ownerWorkflowRole: Role;
|
||||
|
||||
transactionManager: EntityManager;
|
||||
|
||||
async run(): Promise<void> {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
const { flags } = this.parse(ImportWorkflowsCommand);
|
||||
|
||||
if (!flags.input) {
|
||||
console.info('An input file or directory with --input must be provided');
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.separate) {
|
||||
if (fs.existsSync(flags.input)) {
|
||||
if (!fs.lstatSync(flags.input).isDirectory()) {
|
||||
console.info('The argument to --input must be a directory');
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await Db.init();
|
||||
|
||||
await this.initOwnerWorkflowRole();
|
||||
const user = flags.userId ? await this.getAssignee(flags.userId) : await this.getOwner();
|
||||
|
||||
// Make sure the settings exist
|
||||
await UserSettings.prepareUserSettings();
|
||||
const credentials = await Db.collections.Credentials.find();
|
||||
const tags = await Db.collections.Tag.find();
|
||||
|
||||
let totalImported = 0;
|
||||
|
||||
if (flags.separate) {
|
||||
let { input: inputPath } = flags;
|
||||
|
||||
if (process.platform === 'win32') {
|
||||
inputPath = inputPath.replace(/\\/g, '/');
|
||||
}
|
||||
|
||||
const files = await glob('*.json', {
|
||||
cwd: inputPath,
|
||||
absolute: true,
|
||||
});
|
||||
|
||||
totalImported = files.length;
|
||||
|
||||
await getConnection().transaction(async (transactionManager) => {
|
||||
this.transactionManager = transactionManager;
|
||||
|
||||
for (const file of files) {
|
||||
const workflow = JSON.parse(fs.readFileSync(file, { encoding: 'utf8' }));
|
||||
|
||||
if (credentials.length > 0) {
|
||||
workflow.nodes.forEach((node: INode) => {
|
||||
this.transformCredentials(node, credentials);
|
||||
|
||||
if (!node.id) {
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
node.id = uuid();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
|
||||
await setTagsForImport(transactionManager, workflow, tags);
|
||||
}
|
||||
|
||||
await this.storeWorkflow(workflow, user);
|
||||
}
|
||||
});
|
||||
|
||||
this.reportSuccess(totalImported);
|
||||
process.exit();
|
||||
}
|
||||
|
||||
const workflows = JSON.parse(fs.readFileSync(flags.input, { encoding: 'utf8' }));
|
||||
|
||||
assertHasWorkflowsToImport(workflows);
|
||||
|
||||
totalImported = workflows.length;
|
||||
|
||||
await getConnection().transaction(async (transactionManager) => {
|
||||
this.transactionManager = transactionManager;
|
||||
|
||||
for (const workflow of workflows) {
|
||||
if (credentials.length > 0) {
|
||||
workflow.nodes.forEach((node: INode) => {
|
||||
this.transformCredentials(node, credentials);
|
||||
|
||||
if (!node.id) {
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
node.id = uuid();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (Object.prototype.hasOwnProperty.call(workflow, 'tags')) {
|
||||
await setTagsForImport(transactionManager, workflow, tags);
|
||||
}
|
||||
|
||||
await this.storeWorkflow(workflow, user);
|
||||
}
|
||||
});
|
||||
|
||||
this.reportSuccess(totalImported);
|
||||
process.exit();
|
||||
} catch (error) {
|
||||
console.error('An error occurred while importing workflows. See log messages for details.');
|
||||
if (error instanceof Error) logger.error(error.message);
|
||||
this.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
private reportSuccess(total: number) {
|
||||
console.info(`Successfully imported ${total} ${total === 1 ? 'workflow.' : 'workflows.'}`);
|
||||
}
|
||||
|
||||
private async initOwnerWorkflowRole() {
|
||||
const ownerWorkflowRole = await Db.collections.Role.findOne({
|
||||
where: { name: 'owner', scope: 'workflow' },
|
||||
});
|
||||
|
||||
if (!ownerWorkflowRole) {
|
||||
throw new Error(`Failed to find owner workflow role. ${FIX_INSTRUCTION}`);
|
||||
}
|
||||
|
||||
this.ownerWorkflowRole = ownerWorkflowRole;
|
||||
}
|
||||
|
||||
private async storeWorkflow(workflow: object, user: User) {
|
||||
const newWorkflow = new WorkflowEntity();
|
||||
|
||||
Object.assign(newWorkflow, workflow);
|
||||
|
||||
const savedWorkflow = await this.transactionManager.save<WorkflowEntity>(newWorkflow);
|
||||
|
||||
const newSharedWorkflow = new SharedWorkflow();
|
||||
|
||||
Object.assign(newSharedWorkflow, {
|
||||
workflow: savedWorkflow,
|
||||
user,
|
||||
role: this.ownerWorkflowRole,
|
||||
});
|
||||
|
||||
await this.transactionManager.save<SharedWorkflow>(newSharedWorkflow);
|
||||
}
|
||||
|
||||
private async getOwner() {
|
||||
const ownerGlobalRole = await Db.collections.Role.findOne({
|
||||
where: { name: 'owner', scope: 'global' },
|
||||
});
|
||||
|
||||
const owner = await Db.collections.User.findOne({ globalRole: ownerGlobalRole });
|
||||
|
||||
if (!owner) {
|
||||
throw new Error(`Failed to find owner. ${FIX_INSTRUCTION}`);
|
||||
}
|
||||
|
||||
return owner;
|
||||
}
|
||||
|
||||
private async getAssignee(userId: string) {
|
||||
const user = await Db.collections.User.findOne(userId);
|
||||
|
||||
if (!user) {
|
||||
throw new Error(`Failed to find user with ID ${userId}`);
|
||||
}
|
||||
|
||||
return user;
|
||||
}
|
||||
|
||||
private transformCredentials(node: INode, credentialsEntities: ICredentialsDb[]) {
|
||||
if (node.credentials) {
|
||||
const allNodeCredentials = Object.entries(node.credentials);
|
||||
for (const [type, name] of allNodeCredentials) {
|
||||
if (typeof name === 'string') {
|
||||
const nodeCredentials: INodeCredentialsDetails = {
|
||||
id: null,
|
||||
name,
|
||||
};
|
||||
|
||||
const matchingCredentials = credentialsEntities.filter(
|
||||
(credentials) => credentials.name === name && credentials.type === type,
|
||||
);
|
||||
|
||||
if (matchingCredentials.length === 1) {
|
||||
nodeCredentials.id = matchingCredentials[0].id.toString();
|
||||
}
|
||||
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
node.credentials[type] = nodeCredentials;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
61
packages/cli/src/commands/list/workflow.ts
Normal file
61
packages/cli/src/commands/list/workflow.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { IDataObject } from 'n8n-workflow';
|
||||
|
||||
import * as Db from '@/Db';
|
||||
|
||||
export class ListWorkflowCommand extends Command {
|
||||
static description = '\nList workflows';
|
||||
|
||||
static examples = [
|
||||
'$ n8n list:workflow',
|
||||
'$ n8n list:workflow --active=true --onlyId',
|
||||
'$ n8n list:workflow --active=false',
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
active: flags.string({
|
||||
description: 'Filters workflows by active status. Can be true or false',
|
||||
}),
|
||||
onlyId: flags.boolean({
|
||||
description: 'Outputs workflow IDs only, one per line.',
|
||||
}),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(ListWorkflowCommand);
|
||||
|
||||
if (flags.active !== undefined && !['true', 'false'].includes(flags.active)) {
|
||||
this.error('The --active flag has to be passed using true or false');
|
||||
}
|
||||
|
||||
try {
|
||||
await Db.init();
|
||||
|
||||
const findQuery: IDataObject = {};
|
||||
if (flags.active !== undefined) {
|
||||
findQuery.active = flags.active === 'true';
|
||||
}
|
||||
|
||||
const workflows = await Db.collections.Workflow.find(findQuery);
|
||||
if (flags.onlyId) {
|
||||
workflows.forEach((workflow) => console.log(workflow.id));
|
||||
} else {
|
||||
workflows.forEach((workflow) => console.log(`${workflow.id}|${workflow.name}`));
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('\nGOT ERROR');
|
||||
console.log('====================================');
|
||||
console.error(e.message);
|
||||
console.error(e.stack);
|
||||
this.exit(1);
|
||||
}
|
||||
|
||||
this.exit();
|
||||
}
|
||||
}
|
||||
471
packages/cli/src/commands/start.ts
Normal file
471
packages/cli/src/commands/start.ts
Normal file
@@ -0,0 +1,471 @@
|
||||
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
||||
/* eslint-disable @typescript-eslint/await-thenable */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
/* eslint-disable no-console */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
import localtunnel from 'localtunnel';
|
||||
import { BinaryDataManager, TUNNEL_SUBDOMAIN_ENV, UserSettings } from 'n8n-core';
|
||||
import { Command, flags } from '@oclif/command';
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies
|
||||
import Redis from 'ioredis';
|
||||
|
||||
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
|
||||
import { createHash } from 'crypto';
|
||||
import config from '@/config';
|
||||
|
||||
import * as ActiveExecutions from '@/ActiveExecutions';
|
||||
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
|
||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||
import { CredentialTypes } from '@/CredentialTypes';
|
||||
import * as Db from '@/Db';
|
||||
import { ExternalHooks } from '@/ExternalHooks';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { InternalHooksManager } from '@/InternalHooksManager';
|
||||
import * as Server from '@/Server';
|
||||
import { DatabaseType } from '@/Interfaces';
|
||||
import * as TestWebhooks from '@/TestWebhooks';
|
||||
import { WaitTracker } from '@/WaitTracker';
|
||||
|
||||
import { getLogger } from '@/Logger';
|
||||
import { getAllInstalledPackages } from '@/CommunityNodes/packageModel';
|
||||
import { initErrorHandling } from '@/ErrorReporting';
|
||||
import * as CrashJournal from '@/CrashJournal';
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
|
||||
const open = require('open');
|
||||
|
||||
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
|
||||
let processExitCode = 0;
|
||||
|
||||
export class Start extends Command {
|
||||
static description = 'Starts n8n. Makes Web-UI available and starts active workflows';
|
||||
|
||||
static examples = [
|
||||
`$ n8n start`,
|
||||
`$ n8n start --tunnel`,
|
||||
`$ n8n start -o`,
|
||||
`$ n8n start --tunnel -o`,
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
open: flags.boolean({
|
||||
char: 'o',
|
||||
description: 'opens the UI automatically in browser',
|
||||
}),
|
||||
tunnel: flags.boolean({
|
||||
description:
|
||||
'runs the webhooks via a hooks.n8n.cloud tunnel server. Use only for testing and development!',
|
||||
}),
|
||||
reinstallMissingPackages: flags.boolean({
|
||||
description:
|
||||
'Attempts to self heal n8n if packages with nodes are missing. Might drastically increase startup times.',
|
||||
}),
|
||||
};
|
||||
|
||||
/**
|
||||
* Opens the UI in browser
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
static openBrowser() {
|
||||
const editorUrl = GenericHelpers.getBaseUrl();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
open(editorUrl, { wait: true }).catch((error: Error) => {
|
||||
console.log(
|
||||
`\nWas not able to open URL in browser. Please open manually by visiting:\n${editorUrl}\n`,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop n8n in a graceful way.
|
||||
* Make for example sure that all the webhooks from third party services
|
||||
* get removed.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
static async stopProcess() {
|
||||
getLogger().info('\nStopping n8n...');
|
||||
|
||||
const exit = () => {
|
||||
CrashJournal.cleanup().finally(() => {
|
||||
process.exit(processExitCode);
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
// Stop with trying to activate workflows that could not be activated
|
||||
activeWorkflowRunner?.removeAllQueuedWorkflowActivations();
|
||||
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.run('n8n.stop', []);
|
||||
|
||||
setTimeout(() => {
|
||||
// In case that something goes wrong with shutdown we
|
||||
// kill after max. 30 seconds no matter what
|
||||
console.log(`process exited after 30s`);
|
||||
exit();
|
||||
}, 30000);
|
||||
|
||||
await InternalHooksManager.getInstance().onN8nStop();
|
||||
|
||||
const skipWebhookDeregistration = config.getEnv(
|
||||
'endpoints.skipWebhoooksDeregistrationOnShutdown',
|
||||
);
|
||||
|
||||
const removePromises = [];
|
||||
if (activeWorkflowRunner !== undefined && !skipWebhookDeregistration) {
|
||||
removePromises.push(activeWorkflowRunner.removeAll());
|
||||
}
|
||||
|
||||
// Remove all test webhooks
|
||||
const testWebhooks = TestWebhooks.getInstance();
|
||||
removePromises.push(testWebhooks.removeAll());
|
||||
|
||||
await Promise.all(removePromises);
|
||||
|
||||
// Wait for active workflow executions to finish
|
||||
const activeExecutionsInstance = ActiveExecutions.getInstance();
|
||||
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
|
||||
|
||||
let count = 0;
|
||||
while (executingWorkflows.length !== 0) {
|
||||
if (count++ % 4 === 0) {
|
||||
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
|
||||
// eslint-disable-next-line array-callback-return
|
||||
executingWorkflows.map((execution) => {
|
||||
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
|
||||
});
|
||||
}
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await sleep(500);
|
||||
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('There was an error shutting down n8n.', error);
|
||||
}
|
||||
|
||||
exit();
|
||||
}
|
||||
|
||||
async run() {
|
||||
// Make sure that n8n shuts down gracefully if possible
|
||||
process.once('SIGTERM', Start.stopProcess);
|
||||
process.once('SIGINT', Start.stopProcess);
|
||||
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
logger.info('Initializing n8n process');
|
||||
|
||||
initErrorHandling();
|
||||
await CrashJournal.init();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(Start);
|
||||
|
||||
// Wrap that the process does not close but we can still use async
|
||||
await (async () => {
|
||||
try {
|
||||
// Start directly with the init of the database to improve startup time
|
||||
const startDbInitPromise = Db.init().catch((error: Error) => {
|
||||
logger.error(`There was an error initializing DB: "${error.message}"`);
|
||||
|
||||
processExitCode = 1;
|
||||
// @ts-ignore
|
||||
process.emit('SIGINT');
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Make sure the settings exist
|
||||
const userSettings = await UserSettings.prepareUserSettings();
|
||||
|
||||
if (!config.getEnv('userManagement.jwtSecret')) {
|
||||
// If we don't have a JWT secret set, generate
|
||||
// one based and save to config.
|
||||
const encryptionKey = await UserSettings.getEncryptionKey();
|
||||
|
||||
// For a key off every other letter from encryption key
|
||||
// CAREFUL: do not change this or it breaks all existing tokens.
|
||||
let baseKey = '';
|
||||
for (let i = 0; i < encryptionKey.length; i += 2) {
|
||||
baseKey += encryptionKey[i];
|
||||
}
|
||||
config.set(
|
||||
'userManagement.jwtSecret',
|
||||
createHash('sha256').update(baseKey).digest('hex'),
|
||||
);
|
||||
}
|
||||
|
||||
// Load all node and credential types
|
||||
const loadNodesAndCredentials = LoadNodesAndCredentials();
|
||||
await loadNodesAndCredentials.init();
|
||||
|
||||
// Load all external hooks
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
|
||||
// Add the found types to an instance other parts of the application can use
|
||||
const nodeTypes = NodeTypes();
|
||||
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
|
||||
const credentialTypes = CredentialTypes();
|
||||
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
|
||||
|
||||
// Load the credentials overwrites if any exist
|
||||
const credentialsOverwrites = CredentialsOverwrites();
|
||||
await credentialsOverwrites.init();
|
||||
|
||||
// Wait till the database is ready
|
||||
await startDbInitPromise;
|
||||
|
||||
const installedPackages = await getAllInstalledPackages();
|
||||
const missingPackages = new Set<{
|
||||
packageName: string;
|
||||
version: string;
|
||||
}>();
|
||||
installedPackages.forEach((installedpackage) => {
|
||||
installedpackage.installedNodes.forEach((installedNode) => {
|
||||
if (!loadNodesAndCredentials.nodeTypes[installedNode.type]) {
|
||||
// Leave the list ready for installing in case we need.
|
||||
missingPackages.add({
|
||||
packageName: installedpackage.packageName,
|
||||
version: installedpackage.installedVersion,
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
await UserSettings.getEncryptionKey();
|
||||
|
||||
// Load settings from database and set them to config.
|
||||
const databaseSettings = await Db.collections.Settings.find({ loadOnStartup: true });
|
||||
databaseSettings.forEach((setting) => {
|
||||
config.set(setting.key, JSON.parse(setting.value));
|
||||
});
|
||||
|
||||
config.set('nodes.packagesMissing', '');
|
||||
if (missingPackages.size) {
|
||||
LoggerProxy.error(
|
||||
'n8n detected that some packages are missing. For more information, visit https://docs.n8n.io/integrations/community-nodes/troubleshooting/',
|
||||
);
|
||||
|
||||
if (flags.reinstallMissingPackages || process.env.N8N_REINSTALL_MISSING_PACKAGES) {
|
||||
LoggerProxy.info('Attempting to reinstall missing packages', { missingPackages });
|
||||
try {
|
||||
// Optimistic approach - stop if any installation fails
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
for (const missingPackage of missingPackages) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
void (await loadNodesAndCredentials.loadNpmModule(
|
||||
missingPackage.packageName,
|
||||
missingPackage.version,
|
||||
));
|
||||
missingPackages.delete(missingPackage);
|
||||
}
|
||||
LoggerProxy.info(
|
||||
'Packages reinstalled successfully. Resuming regular initialization.',
|
||||
);
|
||||
} catch (error) {
|
||||
LoggerProxy.error('n8n was unable to install the missing packages.');
|
||||
}
|
||||
}
|
||||
}
|
||||
if (missingPackages.size) {
|
||||
config.set(
|
||||
'nodes.packagesMissing',
|
||||
Array.from(missingPackages)
|
||||
.map((missingPackage) => `${missingPackage.packageName}@${missingPackage.version}`)
|
||||
.join(' '),
|
||||
);
|
||||
}
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
const redisHost = config.getEnv('queue.bull.redis.host');
|
||||
const redisPassword = config.getEnv('queue.bull.redis.password');
|
||||
const redisPort = config.getEnv('queue.bull.redis.port');
|
||||
const redisDB = config.getEnv('queue.bull.redis.db');
|
||||
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
|
||||
let lastTimer = 0;
|
||||
let cumulativeTimeout = 0;
|
||||
|
||||
const settings = {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
retryStrategy: (times: number): number | null => {
|
||||
const now = Date.now();
|
||||
if (now - lastTimer > 30000) {
|
||||
// Means we had no timeout at all or last timeout was temporary and we recovered
|
||||
lastTimer = now;
|
||||
cumulativeTimeout = 0;
|
||||
} else {
|
||||
cumulativeTimeout += now - lastTimer;
|
||||
lastTimer = now;
|
||||
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
|
||||
logger.error(
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
return 500;
|
||||
},
|
||||
} as IDataObject;
|
||||
|
||||
if (redisHost) {
|
||||
settings.host = redisHost;
|
||||
}
|
||||
if (redisPassword) {
|
||||
settings.password = redisPassword;
|
||||
}
|
||||
if (redisPort) {
|
||||
settings.port = redisPort;
|
||||
}
|
||||
if (redisDB) {
|
||||
settings.db = redisDB;
|
||||
}
|
||||
|
||||
// This connection is going to be our heartbeat
|
||||
// IORedis automatically pings redis and tries to reconnect
|
||||
// We will be using the retryStrategy above
|
||||
// to control how and when to exit.
|
||||
const redis = new Redis(settings);
|
||||
|
||||
redis.on('error', (error) => {
|
||||
if (error.toString().includes('ECONNREFUSED') === true) {
|
||||
logger.warn('Redis unavailable - trying to reconnect...');
|
||||
} else {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||
logger.warn('Error with Redis: ', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
|
||||
|
||||
if (dbType === 'sqlite') {
|
||||
const shouldRunVacuum = config.getEnv('database.sqlite.executeVacuumOnStartup');
|
||||
if (shouldRunVacuum) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
await Db.collections.Execution.query('VACUUM;');
|
||||
}
|
||||
}
|
||||
|
||||
if (flags.tunnel) {
|
||||
this.log('\nWaiting for tunnel ...');
|
||||
|
||||
let tunnelSubdomain;
|
||||
if (
|
||||
process.env[TUNNEL_SUBDOMAIN_ENV] !== undefined &&
|
||||
process.env[TUNNEL_SUBDOMAIN_ENV] !== ''
|
||||
) {
|
||||
tunnelSubdomain = process.env[TUNNEL_SUBDOMAIN_ENV];
|
||||
} else if (userSettings.tunnelSubdomain !== undefined) {
|
||||
tunnelSubdomain = userSettings.tunnelSubdomain;
|
||||
}
|
||||
|
||||
if (tunnelSubdomain === undefined) {
|
||||
// When no tunnel subdomain did exist yet create a new random one
|
||||
const availableCharacters = 'abcdefghijklmnopqrstuvwxyz0123456789';
|
||||
userSettings.tunnelSubdomain = Array.from({ length: 24 })
|
||||
.map(() => {
|
||||
return availableCharacters.charAt(
|
||||
Math.floor(Math.random() * availableCharacters.length),
|
||||
);
|
||||
})
|
||||
.join('');
|
||||
|
||||
await UserSettings.writeUserSettings(userSettings);
|
||||
}
|
||||
|
||||
const tunnelSettings: localtunnel.TunnelConfig = {
|
||||
host: 'https://hooks.n8n.cloud',
|
||||
subdomain: tunnelSubdomain,
|
||||
};
|
||||
|
||||
const port = config.getEnv('port');
|
||||
|
||||
// @ts-ignore
|
||||
const webhookTunnel = await localtunnel(port, tunnelSettings);
|
||||
|
||||
process.env.WEBHOOK_URL = `${webhookTunnel.url}/`;
|
||||
this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`);
|
||||
this.log(
|
||||
'IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!',
|
||||
);
|
||||
}
|
||||
|
||||
const instanceId = await UserSettings.getInstanceId();
|
||||
const { cli } = await GenericHelpers.getVersions();
|
||||
InternalHooksManager.init(instanceId, cli, nodeTypes);
|
||||
|
||||
const binaryDataConfig = config.getEnv('binaryDataManager');
|
||||
await BinaryDataManager.init(binaryDataConfig, true);
|
||||
|
||||
await Server.start();
|
||||
|
||||
// Start to get active workflows and run their triggers
|
||||
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
|
||||
await activeWorkflowRunner.init();
|
||||
|
||||
WaitTracker();
|
||||
|
||||
const editorUrl = GenericHelpers.getBaseUrl();
|
||||
this.log(`\nEditor is now accessible via:\n${editorUrl}`);
|
||||
|
||||
const saveManualExecutions = config.getEnv('executions.saveDataManualExecutions');
|
||||
|
||||
if (saveManualExecutions) {
|
||||
this.log('\nManual executions will be visible only for the owner');
|
||||
}
|
||||
|
||||
// Allow to open n8n editor by pressing "o"
|
||||
if (Boolean(process.stdout.isTTY) && process.stdin.setRawMode) {
|
||||
process.stdin.setRawMode(true);
|
||||
process.stdin.resume();
|
||||
process.stdin.setEncoding('utf8');
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
let inputText = '';
|
||||
|
||||
if (flags.open) {
|
||||
Start.openBrowser();
|
||||
}
|
||||
this.log(`\nPress "o" to open in Browser.`);
|
||||
process.stdin.on('data', (key: string) => {
|
||||
if (key === 'o') {
|
||||
Start.openBrowser();
|
||||
inputText = '';
|
||||
} else if (key.charCodeAt(0) === 3) {
|
||||
// Ctrl + c got pressed
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
Start.stopProcess();
|
||||
} else {
|
||||
// When anything else got pressed, record it and send it on enter into the child process
|
||||
// eslint-disable-next-line no-lonely-if
|
||||
if (key.charCodeAt(0) === 13) {
|
||||
// send to child process and print in terminal
|
||||
process.stdout.write('\n');
|
||||
inputText = '';
|
||||
} else {
|
||||
// record it and write into terminal
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
inputText += key;
|
||||
process.stdout.write(key);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
this.error(`There was an error: ${error.message}`);
|
||||
|
||||
processExitCode = 1;
|
||||
// @ts-ignore
|
||||
process.emit('SIGINT');
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
89
packages/cli/src/commands/update/workflow.ts
Normal file
89
packages/cli/src/commands/update/workflow.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable no-console */
|
||||
import { Command, flags } from '@oclif/command';
|
||||
|
||||
import { IDataObject, LoggerProxy } from 'n8n-workflow';
|
||||
|
||||
import * as Db from '@/Db';
|
||||
|
||||
import { getLogger } from '@/Logger';
|
||||
|
||||
export class UpdateWorkflowCommand extends Command {
|
||||
static description = 'Update workflows';
|
||||
|
||||
static examples = [
|
||||
`$ n8n update:workflow --all --active=false`,
|
||||
`$ n8n update:workflow --id=5 --active=true`,
|
||||
];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
active: flags.string({
|
||||
description: 'Active state the workflow/s should be set to',
|
||||
}),
|
||||
all: flags.boolean({
|
||||
description: 'Operate on all workflows',
|
||||
}),
|
||||
id: flags.string({
|
||||
description: 'The ID of the workflow to operate on',
|
||||
}),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(UpdateWorkflowCommand);
|
||||
|
||||
if (!flags.all && !flags.id) {
|
||||
console.info(`Either option "--all" or "--id" have to be set!`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flags.all && flags.id) {
|
||||
console.info(
|
||||
`Either something else on top should be "--all" or "--id" can be set never both!`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const updateQuery: IDataObject = {};
|
||||
if (flags.active === undefined) {
|
||||
console.info(`No update flag like "--active=true" has been set!`);
|
||||
return;
|
||||
}
|
||||
if (!['false', 'true'].includes(flags.active)) {
|
||||
console.info(`Valid values for flag "--active" are only "false" or "true"!`);
|
||||
return;
|
||||
}
|
||||
updateQuery.active = flags.active === 'true';
|
||||
|
||||
try {
|
||||
await Db.init();
|
||||
|
||||
const findQuery: IDataObject = {};
|
||||
if (flags.id) {
|
||||
console.info(`Deactivating workflow with ID: ${flags.id}`);
|
||||
findQuery.id = flags.id;
|
||||
} else {
|
||||
console.info('Deactivating all workflows');
|
||||
findQuery.active = true;
|
||||
}
|
||||
|
||||
await Db.collections.Workflow.update(findQuery, updateQuery);
|
||||
console.info('Done');
|
||||
} catch (e) {
|
||||
console.error('Error updating database. See log messages for details.');
|
||||
logger.error('\nGOT ERROR');
|
||||
logger.info('====================================');
|
||||
logger.error(e.message);
|
||||
logger.error(e.stack);
|
||||
this.exit(1);
|
||||
}
|
||||
|
||||
this.exit();
|
||||
}
|
||||
}
|
||||
66
packages/cli/src/commands/user-management/reset.ts
Normal file
66
packages/cli/src/commands/user-management/reset.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { Not } from 'typeorm';
|
||||
import * as Db from '@/Db';
|
||||
import { CredentialsEntity } from '@db/entities/CredentialsEntity';
|
||||
import { BaseCommand } from '../BaseCommand';
|
||||
|
||||
export class Reset extends BaseCommand {
|
||||
static description = '\nResets the database to the default user state';
|
||||
|
||||
async run(): Promise<void> {
|
||||
const owner = await this.getInstanceOwner();
|
||||
|
||||
const ownerWorkflowRole = await Db.collections.Role.findOneOrFail({
|
||||
name: 'owner',
|
||||
scope: 'workflow',
|
||||
});
|
||||
|
||||
const ownerCredentialRole = await Db.collections.Role.findOneOrFail({
|
||||
name: 'owner',
|
||||
scope: 'credential',
|
||||
});
|
||||
|
||||
await Db.collections.SharedWorkflow.update(
|
||||
{ user: { id: Not(owner.id) }, role: ownerWorkflowRole },
|
||||
{ user: owner },
|
||||
);
|
||||
|
||||
await Db.collections.SharedCredentials.update(
|
||||
{ user: { id: Not(owner.id) }, role: ownerCredentialRole },
|
||||
{ user: owner },
|
||||
);
|
||||
|
||||
await Db.collections.User.delete({ id: Not(owner.id) });
|
||||
await Db.collections.User.save(Object.assign(owner, this.defaultUserProps));
|
||||
|
||||
const danglingCredentials: CredentialsEntity[] =
|
||||
(await Db.collections.Credentials.createQueryBuilder('credentials')
|
||||
.leftJoinAndSelect('credentials.shared', 'shared')
|
||||
.where('shared.credentialsId is null')
|
||||
.getMany()) as CredentialsEntity[];
|
||||
const newSharedCredentials = danglingCredentials.map((credentials) =>
|
||||
Db.collections.SharedCredentials.create({
|
||||
credentials,
|
||||
user: owner,
|
||||
role: ownerCredentialRole,
|
||||
}),
|
||||
);
|
||||
await Db.collections.SharedCredentials.save(newSharedCredentials);
|
||||
|
||||
await Db.collections.Settings.update(
|
||||
{ key: 'userManagement.isInstanceOwnerSetUp' },
|
||||
{ value: 'false' },
|
||||
);
|
||||
await Db.collections.Settings.update(
|
||||
{ key: 'userManagement.skipInstanceOwnerSetup' },
|
||||
{ value: 'false' },
|
||||
);
|
||||
|
||||
this.logger.info('Successfully reset the database to default user state.');
|
||||
}
|
||||
|
||||
async catch(error: Error): Promise<void> {
|
||||
this.logger.error('Error resetting database. See log messages for details.');
|
||||
this.logger.error(error.message);
|
||||
this.exit(1);
|
||||
}
|
||||
}
|
||||
246
packages/cli/src/commands/webhook.ts
Normal file
246
packages/cli/src/commands/webhook.ts
Normal file
@@ -0,0 +1,246 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable no-console */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
import { BinaryDataManager, UserSettings } from 'n8n-core';
|
||||
import { Command, flags } from '@oclif/command';
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies
|
||||
import Redis from 'ioredis';
|
||||
|
||||
import { IDataObject, LoggerProxy, sleep } from 'n8n-workflow';
|
||||
import config from '@/config';
|
||||
import * as ActiveExecutions from '@/ActiveExecutions';
|
||||
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
|
||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||
import { CredentialTypes } from '@/CredentialTypes';
|
||||
import * as Db from '@/Db';
|
||||
import { ExternalHooks } from '@/ExternalHooks';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { InternalHooksManager } from '@/InternalHooksManager';
|
||||
import * as WebhookServer from '@/WebhookServer';
|
||||
import { getLogger } from '@/Logger';
|
||||
import { initErrorHandling } from '@/ErrorReporting';
|
||||
import * as CrashJournal from '@/CrashJournal';
|
||||
|
||||
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
|
||||
let processExitCode = 0;
|
||||
|
||||
export class Webhook extends Command {
|
||||
static description = 'Starts n8n webhook process. Intercepts only production URLs.';
|
||||
|
||||
static examples = [`$ n8n webhook`];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
};
|
||||
|
||||
/**
|
||||
* Stops n8n in a graceful way.
|
||||
* Make for example sure that all the webhooks from third party services
|
||||
* get removed.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
static async stopProcess() {
|
||||
LoggerProxy.info(`\nStopping n8n...`);
|
||||
|
||||
const exit = () => {
|
||||
CrashJournal.cleanup().finally(() => {
|
||||
process.exit(processExitCode);
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.run('n8n.stop', []);
|
||||
|
||||
setTimeout(() => {
|
||||
// In case that something goes wrong with shutdown we
|
||||
// kill after max. 30 seconds no matter what
|
||||
exit();
|
||||
}, 30000);
|
||||
|
||||
// Wait for active workflow executions to finish
|
||||
const activeExecutionsInstance = ActiveExecutions.getInstance();
|
||||
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
|
||||
|
||||
let count = 0;
|
||||
while (executingWorkflows.length !== 0) {
|
||||
if (count++ % 4 === 0) {
|
||||
LoggerProxy.info(
|
||||
`Waiting for ${executingWorkflows.length} active executions to finish...`,
|
||||
);
|
||||
}
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await sleep(500);
|
||||
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
|
||||
}
|
||||
} catch (error) {
|
||||
LoggerProxy.error('There was an error shutting down n8n.', error);
|
||||
}
|
||||
|
||||
exit();
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
// Make sure that n8n shuts down gracefully if possible
|
||||
process.once('SIGTERM', Webhook.stopProcess);
|
||||
process.once('SIGINT', Webhook.stopProcess);
|
||||
|
||||
initErrorHandling();
|
||||
await CrashJournal.init();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-shadow
|
||||
const { flags } = this.parse(Webhook);
|
||||
|
||||
// Wrap that the process does not close but we can still use async
|
||||
await (async () => {
|
||||
if (config.getEnv('executions.mode') !== 'queue') {
|
||||
/**
|
||||
* It is technically possible to run without queues but
|
||||
* there are 2 known bugs when running in this mode:
|
||||
* - Executions list will be problematic as the main process
|
||||
* is not aware of current executions in the webhook processes
|
||||
* and therefore will display all current executions as error
|
||||
* as it is unable to determine if it is still running or crashed
|
||||
* - You cannot stop currently executing jobs from webhook processes
|
||||
* when running without queues as the main process cannot talk to
|
||||
* the webhook processes to communicate workflow execution interruption.
|
||||
*/
|
||||
|
||||
this.error('Webhook processes can only run with execution mode as queue.');
|
||||
}
|
||||
|
||||
try {
|
||||
// Start directly with the init of the database to improve startup time
|
||||
const startDbInitPromise = Db.init().catch((error) => {
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions, @typescript-eslint/no-unsafe-member-access
|
||||
logger.error(`There was an error initializing DB: "${error.message}"`);
|
||||
|
||||
processExitCode = 1;
|
||||
// @ts-ignore
|
||||
process.emit('SIGINT');
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Make sure the settings exist
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
const userSettings = await UserSettings.prepareUserSettings();
|
||||
|
||||
// Load all node and credential types
|
||||
const loadNodesAndCredentials = LoadNodesAndCredentials();
|
||||
await loadNodesAndCredentials.init();
|
||||
|
||||
// Load the credentials overwrites if any exist
|
||||
const credentialsOverwrites = CredentialsOverwrites();
|
||||
await credentialsOverwrites.init();
|
||||
|
||||
// Load all external hooks
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
|
||||
// Add the found types to an instance other parts of the application can use
|
||||
const nodeTypes = NodeTypes();
|
||||
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
|
||||
const credentialTypes = CredentialTypes();
|
||||
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
|
||||
|
||||
// Wait till the database is ready
|
||||
await startDbInitPromise;
|
||||
|
||||
const instanceId = await UserSettings.getInstanceId();
|
||||
const { cli } = await GenericHelpers.getVersions();
|
||||
InternalHooksManager.init(instanceId, cli, nodeTypes);
|
||||
|
||||
const binaryDataConfig = config.getEnv('binaryDataManager');
|
||||
await BinaryDataManager.init(binaryDataConfig);
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
const redisHost = config.getEnv('queue.bull.redis.host');
|
||||
const redisPassword = config.getEnv('queue.bull.redis.password');
|
||||
const redisPort = config.getEnv('queue.bull.redis.port');
|
||||
const redisDB = config.getEnv('queue.bull.redis.db');
|
||||
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
|
||||
let lastTimer = 0;
|
||||
let cumulativeTimeout = 0;
|
||||
|
||||
const settings = {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
retryStrategy: (times: number): number | null => {
|
||||
const now = Date.now();
|
||||
if (now - lastTimer > 30000) {
|
||||
// Means we had no timeout at all or last timeout was temporary and we recovered
|
||||
lastTimer = now;
|
||||
cumulativeTimeout = 0;
|
||||
} else {
|
||||
cumulativeTimeout += now - lastTimer;
|
||||
lastTimer = now;
|
||||
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
|
||||
logger.error(
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
return 500;
|
||||
},
|
||||
} as IDataObject;
|
||||
|
||||
if (redisHost) {
|
||||
settings.host = redisHost;
|
||||
}
|
||||
if (redisPassword) {
|
||||
settings.password = redisPassword;
|
||||
}
|
||||
if (redisPort) {
|
||||
settings.port = redisPort;
|
||||
}
|
||||
if (redisDB) {
|
||||
settings.db = redisDB;
|
||||
}
|
||||
|
||||
// This connection is going to be our heartbeat
|
||||
// IORedis automatically pings redis and tries to reconnect
|
||||
// We will be using the retryStrategy above
|
||||
// to control how and when to exit.
|
||||
const redis = new Redis(settings);
|
||||
|
||||
redis.on('error', (error) => {
|
||||
if (error.toString().includes('ECONNREFUSED') === true) {
|
||||
logger.warn('Redis unavailable - trying to reconnect...');
|
||||
} else {
|
||||
logger.warn('Error with Redis: ', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
await WebhookServer.start();
|
||||
|
||||
// Start to get active workflows and run their triggers
|
||||
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
|
||||
await activeWorkflowRunner.initWebhooks();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
const editorUrl = GenericHelpers.getBaseUrl();
|
||||
console.info('Webhook listener waiting for requests.');
|
||||
} catch (error) {
|
||||
console.error('Exiting due to error. See log message for details.');
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
logger.error(`Webhook process cannot continue. "${error.message}"`);
|
||||
|
||||
processExitCode = 1;
|
||||
// @ts-ignore
|
||||
process.emit('SIGINT');
|
||||
process.exit(1);
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
453
packages/cli/src/commands/worker.ts
Normal file
453
packages/cli/src/commands/worker.ts
Normal file
@@ -0,0 +1,453 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable no-console */
|
||||
/* eslint-disable @typescript-eslint/no-shadow */
|
||||
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
|
||||
/* eslint-disable @typescript-eslint/unbound-method */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||
// eslint-disable-next-line import/no-extraneous-dependencies
|
||||
import express from 'express';
|
||||
import http from 'http';
|
||||
import PCancelable from 'p-cancelable';
|
||||
|
||||
import { Command, flags } from '@oclif/command';
|
||||
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
|
||||
|
||||
import {
|
||||
IExecuteResponsePromiseData,
|
||||
INodeTypes,
|
||||
IRun,
|
||||
Workflow,
|
||||
LoggerProxy,
|
||||
sleep,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { FindOneOptions, getConnectionManager } from 'typeorm';
|
||||
|
||||
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
|
||||
import { CredentialTypes } from '@/CredentialTypes';
|
||||
import * as Db from '@/Db';
|
||||
import { ExternalHooks } from '@/ExternalHooks';
|
||||
import * as GenericHelpers from '@/GenericHelpers';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import * as ResponseHelper from '@/ResponseHelper';
|
||||
import * as WebhookHelpers from '@/WebhookHelpers';
|
||||
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
|
||||
import { InternalHooksManager } from '@/InternalHooksManager';
|
||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import { getLogger } from '@/Logger';
|
||||
|
||||
import config from '@/config';
|
||||
import * as Queue from '@/Queue';
|
||||
import {
|
||||
checkPermissionsForExecution,
|
||||
getWorkflowOwner,
|
||||
} from '@/UserManagement/UserManagementHelper';
|
||||
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
|
||||
|
||||
export class Worker extends Command {
|
||||
static description = '\nStarts a n8n worker';
|
||||
|
||||
static examples = [`$ n8n worker --concurrency=5`];
|
||||
|
||||
static flags = {
|
||||
help: flags.help({ char: 'h' }),
|
||||
concurrency: flags.integer({
|
||||
default: 10,
|
||||
description: 'How many jobs can run in parallel.',
|
||||
}),
|
||||
};
|
||||
|
||||
static runningJobs: {
|
||||
[key: string]: PCancelable<IRun>;
|
||||
} = {};
|
||||
|
||||
static jobQueue: Queue.JobQueue;
|
||||
|
||||
static processExitCode = 0;
|
||||
// static activeExecutions = ActiveExecutions.getInstance();
|
||||
|
||||
/**
|
||||
* Stop n8n in a graceful way.
|
||||
* Make for example sure that all the webhooks from third party services
|
||||
* get removed.
|
||||
*/
|
||||
static async stopProcess() {
|
||||
LoggerProxy.info(`Stopping n8n...`);
|
||||
|
||||
// Stop accepting new jobs
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
Worker.jobQueue.pause(true);
|
||||
|
||||
try {
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.run('n8n.stop', []);
|
||||
|
||||
const maxStopTime = config.getEnv('queue.bull.gracefulShutdownTimeout') * 1000;
|
||||
|
||||
const stopTime = new Date().getTime() + maxStopTime;
|
||||
|
||||
setTimeout(() => {
|
||||
// In case that something goes wrong with shutdown we
|
||||
// kill after max. 30 seconds no matter what
|
||||
process.exit(Worker.processExitCode);
|
||||
}, maxStopTime);
|
||||
|
||||
// Wait for active workflow executions to finish
|
||||
let count = 0;
|
||||
while (Object.keys(Worker.runningJobs).length !== 0) {
|
||||
if (count++ % 4 === 0) {
|
||||
const waitLeft = Math.ceil((stopTime - new Date().getTime()) / 1000);
|
||||
LoggerProxy.info(
|
||||
`Waiting for ${
|
||||
Object.keys(Worker.runningJobs).length
|
||||
} active executions to finish... (wait ${waitLeft} more seconds)`,
|
||||
);
|
||||
}
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await sleep(500);
|
||||
}
|
||||
} catch (error) {
|
||||
LoggerProxy.error('There was an error shutting down n8n.', error);
|
||||
}
|
||||
|
||||
process.exit(Worker.processExitCode);
|
||||
}
|
||||
|
||||
async runJob(job: Queue.Job, nodeTypes: INodeTypes): Promise<Queue.JobResponse> {
|
||||
const { executionId, loadStaticData } = job.data;
|
||||
const executionDb = await Db.collections.Execution.findOne(executionId);
|
||||
|
||||
if (!executionDb) {
|
||||
LoggerProxy.error(
|
||||
`Worker failed to find data of execution "${executionId}" in database. Cannot continue.`,
|
||||
{ executionId },
|
||||
);
|
||||
throw new Error(
|
||||
`Unable to find data of execution "${executionId}" in database. Aborting execution.`,
|
||||
);
|
||||
}
|
||||
const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb);
|
||||
LoggerProxy.info(
|
||||
`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${executionId})`,
|
||||
);
|
||||
|
||||
const workflowOwner = await getWorkflowOwner(currentExecutionDb.workflowData.id!.toString());
|
||||
|
||||
let { staticData } = currentExecutionDb.workflowData;
|
||||
if (loadStaticData) {
|
||||
const findOptions = {
|
||||
select: ['id', 'staticData'],
|
||||
} as FindOneOptions;
|
||||
const workflowData = await Db.collections.Workflow.findOne(
|
||||
currentExecutionDb.workflowData.id,
|
||||
findOptions,
|
||||
);
|
||||
if (workflowData === undefined) {
|
||||
LoggerProxy.error(
|
||||
'Worker execution failed because workflow could not be found in database.',
|
||||
{
|
||||
workflowId: currentExecutionDb.workflowData.id,
|
||||
executionId,
|
||||
},
|
||||
);
|
||||
throw new Error(
|
||||
`The workflow with the ID "${currentExecutionDb.workflowData.id}" could not be found`,
|
||||
);
|
||||
}
|
||||
staticData = workflowData.staticData;
|
||||
}
|
||||
|
||||
let workflowTimeout = config.getEnv('executions.timeout'); // initialize with default
|
||||
if (
|
||||
// eslint-disable-next-line @typescript-eslint/prefer-optional-chain
|
||||
currentExecutionDb.workflowData.settings &&
|
||||
currentExecutionDb.workflowData.settings.executionTimeout
|
||||
) {
|
||||
workflowTimeout = currentExecutionDb.workflowData.settings.executionTimeout as number; // preference on workflow setting
|
||||
}
|
||||
|
||||
let executionTimeoutTimestamp: number | undefined;
|
||||
if (workflowTimeout > 0) {
|
||||
workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout'));
|
||||
executionTimeoutTimestamp = Date.now() + workflowTimeout * 1000;
|
||||
}
|
||||
|
||||
const workflow = new Workflow({
|
||||
id: currentExecutionDb.workflowData.id as string,
|
||||
name: currentExecutionDb.workflowData.name,
|
||||
nodes: currentExecutionDb.workflowData.nodes,
|
||||
connections: currentExecutionDb.workflowData.connections,
|
||||
active: currentExecutionDb.workflowData.active,
|
||||
nodeTypes,
|
||||
staticData,
|
||||
settings: currentExecutionDb.workflowData.settings,
|
||||
});
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(
|
||||
workflowOwner.id,
|
||||
undefined,
|
||||
executionTimeoutTimestamp,
|
||||
);
|
||||
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
|
||||
currentExecutionDb.mode,
|
||||
job.data.executionId,
|
||||
currentExecutionDb.workflowData,
|
||||
{ retryOf: currentExecutionDb.retryOf as string },
|
||||
);
|
||||
|
||||
try {
|
||||
await checkPermissionsForExecution(workflow, workflowOwner.id);
|
||||
} catch (error) {
|
||||
const failedExecution = generateFailedExecutionFromError(
|
||||
currentExecutionDb.mode,
|
||||
error,
|
||||
error.node,
|
||||
);
|
||||
await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]);
|
||||
return {
|
||||
success: true,
|
||||
};
|
||||
}
|
||||
|
||||
additionalData.hooks.hookFunctions.sendResponse = [
|
||||
async (response: IExecuteResponsePromiseData): Promise<void> => {
|
||||
const progress: Queue.WebhookResponse = {
|
||||
executionId,
|
||||
response: WebhookHelpers.encodeWebhookResponse(response),
|
||||
};
|
||||
await job.progress(progress);
|
||||
},
|
||||
];
|
||||
|
||||
additionalData.executionId = executionId;
|
||||
|
||||
let workflowExecute: WorkflowExecute;
|
||||
let workflowRun: PCancelable<IRun>;
|
||||
if (currentExecutionDb.data !== undefined) {
|
||||
workflowExecute = new WorkflowExecute(
|
||||
additionalData,
|
||||
currentExecutionDb.mode,
|
||||
currentExecutionDb.data,
|
||||
);
|
||||
workflowRun = workflowExecute.processRunExecutionData(workflow);
|
||||
} else {
|
||||
// Execute all nodes
|
||||
// Can execute without webhook so go on
|
||||
workflowExecute = new WorkflowExecute(additionalData, currentExecutionDb.mode);
|
||||
workflowRun = workflowExecute.run(workflow);
|
||||
}
|
||||
|
||||
Worker.runningJobs[job.id] = workflowRun;
|
||||
|
||||
// Wait till the execution is finished
|
||||
await workflowRun;
|
||||
|
||||
delete Worker.runningJobs[job.id];
|
||||
|
||||
return {
|
||||
success: true,
|
||||
};
|
||||
}
|
||||
|
||||
async run() {
|
||||
const logger = getLogger();
|
||||
LoggerProxy.init(logger);
|
||||
|
||||
// eslint-disable-next-line no-console
|
||||
console.info('Starting n8n worker...');
|
||||
|
||||
// Make sure that n8n shuts down gracefully if possible
|
||||
process.once('SIGTERM', Worker.stopProcess);
|
||||
process.once('SIGINT', Worker.stopProcess);
|
||||
|
||||
// Wrap that the process does not close but we can still use async
|
||||
await (async () => {
|
||||
try {
|
||||
const { flags } = this.parse(Worker);
|
||||
|
||||
// Start directly with the init of the database to improve startup time
|
||||
const startDbInitPromise = Db.init().catch((error) => {
|
||||
logger.error(`There was an error initializing DB: "${error.message}"`);
|
||||
|
||||
Worker.processExitCode = 1;
|
||||
// @ts-ignore
|
||||
process.emit('SIGINT');
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Make sure the settings exist
|
||||
await UserSettings.prepareUserSettings();
|
||||
|
||||
// Load all node and credential types
|
||||
const loadNodesAndCredentials = LoadNodesAndCredentials();
|
||||
await loadNodesAndCredentials.init();
|
||||
|
||||
// Load the credentials overwrites if any exist
|
||||
const credentialsOverwrites = CredentialsOverwrites();
|
||||
await credentialsOverwrites.init();
|
||||
|
||||
// Load all external hooks
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
|
||||
// Add the found types to an instance other parts of the application can use
|
||||
const nodeTypes = NodeTypes();
|
||||
await nodeTypes.init(loadNodesAndCredentials.nodeTypes);
|
||||
const credentialTypes = CredentialTypes();
|
||||
await credentialTypes.init(loadNodesAndCredentials.credentialTypes);
|
||||
|
||||
// Wait till the database is ready
|
||||
await startDbInitPromise;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold');
|
||||
|
||||
Worker.jobQueue = Queue.getInstance().getBullObjectInstance();
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
Worker.jobQueue.process(flags.concurrency, async (job) => this.runJob(job, nodeTypes));
|
||||
|
||||
const versions = await GenericHelpers.getVersions();
|
||||
const instanceId = await UserSettings.getInstanceId();
|
||||
|
||||
InternalHooksManager.init(instanceId, versions.cli, nodeTypes);
|
||||
|
||||
const binaryDataConfig = config.getEnv('binaryDataManager');
|
||||
await BinaryDataManager.init(binaryDataConfig);
|
||||
|
||||
console.info('\nn8n worker is now ready');
|
||||
console.info(` * Version: ${versions.cli}`);
|
||||
console.info(` * Concurrency: ${flags.concurrency}`);
|
||||
console.info('');
|
||||
|
||||
Worker.jobQueue.on('global:progress', (jobId, progress) => {
|
||||
// Progress of a job got updated which does get used
|
||||
// to communicate that a job got canceled.
|
||||
|
||||
if (progress === -1) {
|
||||
// Job has to get canceled
|
||||
if (Worker.runningJobs[jobId] !== undefined) {
|
||||
// Job is processed by current worker so cancel
|
||||
Worker.runningJobs[jobId].cancel();
|
||||
delete Worker.runningJobs[jobId];
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let lastTimer = 0;
|
||||
let cumulativeTimeout = 0;
|
||||
Worker.jobQueue.on('error', (error: Error) => {
|
||||
if (error.toString().includes('ECONNREFUSED')) {
|
||||
const now = Date.now();
|
||||
if (now - lastTimer > 30000) {
|
||||
// Means we had no timeout at all or last timeout was temporary and we recovered
|
||||
lastTimer = now;
|
||||
cumulativeTimeout = 0;
|
||||
} else {
|
||||
cumulativeTimeout += now - lastTimer;
|
||||
lastTimer = now;
|
||||
if (cumulativeTimeout > redisConnectionTimeoutLimit) {
|
||||
logger.error(
|
||||
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
logger.warn('Redis unavailable - trying to reconnect...');
|
||||
} else if (error.toString().includes('Error initializing Lua scripts')) {
|
||||
// This is a non-recoverable error
|
||||
// Happens when worker starts and Redis is unavailable
|
||||
// Even if Redis comes back online, worker will be zombie
|
||||
logger.error('Error initializing worker.');
|
||||
process.exit(2);
|
||||
} else {
|
||||
logger.error('Error from queue: ', error);
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
if (config.getEnv('queue.health.active')) {
|
||||
const port = config.getEnv('queue.health.port');
|
||||
|
||||
const app = express();
|
||||
app.disable('x-powered-by');
|
||||
|
||||
const server = http.createServer(app);
|
||||
|
||||
app.get(
|
||||
'/healthz',
|
||||
// eslint-disable-next-line consistent-return
|
||||
async (req: express.Request, res: express.Response) => {
|
||||
LoggerProxy.debug('Health check started!');
|
||||
|
||||
const connection = getConnectionManager().get();
|
||||
|
||||
try {
|
||||
if (!connection.isConnected) {
|
||||
// Connection is not active
|
||||
throw new Error('No active database connection!');
|
||||
}
|
||||
// DB ping
|
||||
await connection.query('SELECT 1');
|
||||
} catch (e) {
|
||||
LoggerProxy.error('No Database connection!', e);
|
||||
const error = new ResponseHelper.ResponseError(
|
||||
'No Database connection!',
|
||||
undefined,
|
||||
503,
|
||||
);
|
||||
return ResponseHelper.sendErrorResponse(res, error);
|
||||
}
|
||||
|
||||
// Just to be complete, generally will the worker stop automatically
|
||||
// if it loses the connection to redis
|
||||
try {
|
||||
// Redis ping
|
||||
await Worker.jobQueue.client.ping();
|
||||
} catch (e) {
|
||||
LoggerProxy.error('No Redis connection!', e);
|
||||
const error = new ResponseHelper.ResponseError(
|
||||
'No Redis connection!',
|
||||
undefined,
|
||||
503,
|
||||
);
|
||||
return ResponseHelper.sendErrorResponse(res, error);
|
||||
}
|
||||
|
||||
// Everything fine
|
||||
const responseData = {
|
||||
status: 'ok',
|
||||
};
|
||||
|
||||
LoggerProxy.debug('Health check completed successfully!');
|
||||
|
||||
ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
|
||||
},
|
||||
);
|
||||
|
||||
server.listen(port, () => {
|
||||
console.info(`\nn8n worker health check via, port ${port}`);
|
||||
});
|
||||
|
||||
server.on('error', (error: Error & { code: string }) => {
|
||||
if (error.code === 'EADDRINUSE') {
|
||||
console.log(
|
||||
`n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Worker process cannot continue. "${error.message}"`);
|
||||
|
||||
Worker.processExitCode = 1;
|
||||
// @ts-ignore
|
||||
process.emit('SIGINT');
|
||||
process.exit(1);
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user