refactor(core): Parse Webhook request bodies on-demand (#6394)

Also,
1. Consistent CORS support ~on all three webhook types~ waiting webhooks never supported CORS. I'll fix that in another PR
2. [Fixes binary-data handling when request body is text, json, or xml](https://linear.app/n8n/issue/NODE-505/webhook-binary-data-handling-fails-for-textplain-files).
3. Reduced number of middleware that each request has to go through.
4. Removed the need to maintain webhook endpoints in the auth-exception list.
5. Skip all middlewares (apart from `compression`) on Webhook routes. 
6. move `multipart/form-data` support out of individual nodes
7. upgrade `formidable`
8. fix the filenames on binary-data in webhooks nodes
9. add unit tests and integration tests for webhook request handling, and increase test coverage
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-08-01 17:32:30 +02:00
committed by GitHub
parent 369a2e9796
commit 31d8f478ee
29 changed files with 905 additions and 604 deletions

View File

@@ -1,40 +1,28 @@
import { Container } from 'typedi';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import type { Url } from 'url';
import express from 'express';
import bodyParser from 'body-parser';
import bodyParserXml from 'body-parser-xml';
import compression from 'compression';
import parseUrl from 'parseurl';
import type { RedisOptions } from 'ioredis';
import type { WebhookHttpMethod } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import config from '@/config';
import { N8N_VERSION, inDevelopment } from '@/constants';
import { N8N_VERSION, inDevelopment, inTest } from '@/constants';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import type { IExternalHooksClass } from '@/Interfaces';
import { ExternalHooks } from '@/ExternalHooks';
import {
send,
sendErrorResponse,
sendSuccessResponse,
ServiceUnavailableError,
} from '@/ResponseHelper';
import { corsMiddleware } from '@/middlewares';
import { send, sendErrorResponse, ServiceUnavailableError } from '@/ResponseHelper';
import { rawBody, jsonParser, corsMiddleware } from '@/middlewares';
import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
import { getRedisClusterNodes } from './GenericHelpers';
const emptyBuffer = Buffer.alloc(0);
import { webhookRequestHandler } from '@/WebhookHelpers';
export abstract class AbstractServer {
protected server: Server;
protected app: express.Application;
readonly app: express.Application;
protected externalHooks: IExternalHooksClass;
@@ -58,7 +46,9 @@ export abstract class AbstractServer {
protected instanceId = '';
abstract configure(): Promise<void>;
protected webhooksEnabled = true;
protected testWebhooksEnabled = false;
constructor() {
this.app = express();
@@ -76,6 +66,10 @@ export abstract class AbstractServer {
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
}
async configure(): Promise<void> {
// Additional configuration in derived classes
}
private async setupErrorHandlers() {
const { app } = this;
@@ -87,66 +81,12 @@ export abstract class AbstractServer {
app.use(errorHandler());
}
private async setupCommonMiddlewares() {
const { app } = this;
private setupCommonMiddlewares() {
// Compress the response data
app.use(compression());
this.app.use(compression());
// Make sure that each request has the "parsedUrl" parameter
app.use((req, res, next) => {
req.parsedUrl = parseUrl(req)!;
req.rawBody = emptyBuffer;
next();
});
const payloadSizeMax = config.getEnv('endpoints.payloadSizeMax');
// Support application/json type post data
app.use(
bodyParser.json({
limit: `${payloadSizeMax}mb`,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// Support application/xml type post data
bodyParserXml(bodyParser);
app.use(
bodyParser.xml({
limit: `${payloadSizeMax}mb`,
xmlParseOptions: {
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
},
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
app.use(
bodyParser.text({
limit: `${payloadSizeMax}mb`,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// support application/x-www-form-urlencoded post data
app.use(
bodyParser.urlencoded({
limit: `${payloadSizeMax}mb`,
extended: false,
verify: (req, res, buf) => {
req.rawBody = buf;
},
}),
);
// Read incoming data into `rawBody`
this.app.use(rawBody);
}
private setupDevMiddlewares() {
@@ -246,163 +186,6 @@ export abstract class AbstractServer {
});
}
// ----------------------------------------
// Regular Webhooks
// ----------------------------------------
protected setupWebhookEndpoint() {
const endpoint = this.endpointWebhook;
const activeWorkflowRunner = this.activeWorkflowRunner;
// Register all webhook requests
this.app.all(`/${endpoint}/*`, async (req, res) => {
// Cut away the "/webhook/" to get the registered part of the url
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (method === 'OPTIONS') {
let allowedMethods: string[];
try {
allowedMethods = await activeWorkflowRunner.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
res.header('Access-Control-Allow-Origin', '*');
sendSuccessResponse(res, {}, true, 204);
return;
}
if (!WEBHOOK_METHODS.includes(method)) {
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await activeWorkflowRunner.executeWebhook(method, requestUrl, req, res);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
});
}
// ----------------------------------------
// Waiting Webhooks
// ----------------------------------------
protected setupWaitingWebhookEndpoint() {
const endpoint = this.endpointWebhookWaiting;
const waitingWebhooks = Container.get(WaitingWebhooks);
// Register all webhook-waiting requests
this.app.all(`/${endpoint}/*`, async (req, res) => {
// Cut away the "/webhook-waiting/" to get the registered part of the url
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (!WEBHOOK_METHODS.includes(method)) {
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await waitingWebhooks.executeWebhook(method, requestUrl, req, res);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
});
}
// ----------------------------------------
// Testing Webhooks
// ----------------------------------------
protected setupTestWebhookEndpoint() {
const endpoint = this.endpointWebhookTest;
const testWebhooks = Container.get(TestWebhooks);
// Register all test webhook requests (for testing via the UI)
this.app.all(`/${endpoint}/*`, async (req, res) => {
// Cut away the "/webhook-test/" to get the registered part of the url
const requestUrl = req.parsedUrl.pathname!.slice(endpoint.length + 2);
const method = req.method.toUpperCase() as WebhookHttpMethod;
if (method === 'OPTIONS') {
let allowedMethods: string[];
try {
allowedMethods = await testWebhooks.getWebhookMethods(requestUrl);
allowedMethods.push('OPTIONS');
// Add custom "Allow" header to satisfy OPTIONS response.
res.append('Allow', allowedMethods);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
res.header('Access-Control-Allow-Origin', '*');
sendSuccessResponse(res, {}, true, 204);
return;
}
if (!WEBHOOK_METHODS.includes(method)) {
sendErrorResponse(res, new Error(`The method ${method} is not supported.`));
return;
}
let response;
try {
response = await testWebhooks.callTestWebhook(method, requestUrl, req, res);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
sendErrorResponse(res, error);
return;
}
if (response.noWebhookResponse === true) {
// Nothing else to do as the response got already sent
return;
}
sendSuccessResponse(res, response.data, true, response.responseCode, response.headers);
});
// Removes a test webhook
// TODO UM: check if this needs validation with user management.
this.app.delete(
`/${this.restEndpoint}/test-webhook/:id`,
send(async (req) => testWebhooks.cancelTestWebhook(req.params.id)),
);
}
async init(): Promise<void> {
const { app, protocol, sslKey, sslCert } = this;
@@ -443,27 +226,60 @@ export abstract class AbstractServer {
}
async start(): Promise<void> {
await this.setupErrorHandlers();
this.setupPushServer();
await this.setupCommonMiddlewares();
if (!inTest) {
await this.setupErrorHandlers();
this.setupPushServer();
}
this.setupCommonMiddlewares();
// Setup webhook handlers before bodyParser, to let the Webhook node handle binary data in requests
if (this.webhooksEnabled) {
// Register a handler for active webhooks
this.app.all(
`/${this.endpointWebhook}/:path(*)`,
webhookRequestHandler(Container.get(ActiveWorkflowRunner)),
);
// Register a handler for waiting webhooks
this.app.all(
`/${this.endpointWebhookWaiting}/:path/:suffix?`,
webhookRequestHandler(Container.get(WaitingWebhooks)),
);
}
if (this.testWebhooksEnabled) {
const testWebhooks = Container.get(TestWebhooks);
// Register a handler for test webhooks
this.app.all(`/${this.endpointWebhookTest}/:path(*)`, webhookRequestHandler(testWebhooks));
// Removes a test webhook
// TODO UM: check if this needs validation with user management.
this.app.delete(
`/${this.restEndpoint}/test-webhook/:id`,
send(async (req) => testWebhooks.cancelTestWebhook(req.params.id)),
);
}
if (inDevelopment) {
this.setupDevMiddlewares();
}
// Setup JSON parsing middleware after the webhook handlers are setup
this.app.use(jsonParser);
await this.configure();
console.log(`Version: ${N8N_VERSION}`);
const defaultLocale = config.getEnv('defaultLocale');
if (defaultLocale !== 'en') {
console.log(`Locale: ${defaultLocale}`);
if (!inTest) {
console.log(`Version: ${N8N_VERSION}`);
const defaultLocale = config.getEnv('defaultLocale');
if (defaultLocale !== 'en') {
console.log(`Locale: ${defaultLocale}`);
}
await this.externalHooks.run('n8n.ready', [this, config]);
}
await this.externalHooks.run('n8n.ready', [this, config]);
}
}
declare module 'http' {
export interface IncomingMessage {
parsedUrl: Url;
}
}

View File

@@ -1,7 +1,7 @@
import { Service } from 'typedi';
import type {
IWebhookData,
WebhookHttpMethod,
IHttpRequestMethods,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
@@ -102,7 +102,7 @@ export class ActiveWebhooks {
*
* @param {(string | undefined)} webhookId
*/
get(httpMethod: WebhookHttpMethod, path: string, webhookId?: string): IWebhookData | undefined {
get(httpMethod: IHttpRequestMethods, path: string, webhookId?: string): IWebhookData | undefined {
const webhookKey = this.getWebhookKey(httpMethod, path, webhookId);
if (this.webhookUrls[webhookKey] === undefined) {
return undefined;
@@ -133,17 +133,10 @@ export class ActiveWebhooks {
/**
* Gets all request methods associated with a single webhook
*/
getWebhookMethods(path: string): string[] {
const methods: string[] = [];
Object.keys(this.webhookUrls)
getWebhookMethods(path: string): IHttpRequestMethods[] {
return Object.keys(this.webhookUrls)
.filter((key) => key.includes(path))
.map((key) => {
methods.push(key.split('|')[0]);
});
return methods;
.map((key) => key.split('|')[0] as IHttpRequestMethods);
}
/**
@@ -159,7 +152,7 @@ export class ActiveWebhooks {
*
* @param {(string | undefined)} webhookId
*/
getWebhookKey(httpMethod: WebhookHttpMethod, path: string, webhookId?: string): string {
getWebhookKey(httpMethod: IHttpRequestMethods, path: string, webhookId?: string): string {
if (webhookId) {
if (path.startsWith(webhookId)) {
const cutFromIndex = path.indexOf('/') + 1;

View File

@@ -1,6 +1,3 @@
/* eslint-disable prefer-spread */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
@@ -21,10 +18,11 @@ import type {
IRunExecutionData,
IWorkflowBase,
IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow,
WebhookHttpMethod,
IHttpRequestMethods,
WorkflowActivateMode,
WorkflowExecuteMode,
INodeType,
IWebhookData,
} from 'n8n-workflow';
import {
NodeHelpers,
@@ -41,8 +39,10 @@ import type {
IActivationError,
IQueuedWorkflowActivations,
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
IWorkflowExecutionDataProcess,
WebhookRequest,
} from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
@@ -73,7 +73,7 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@Service()
export class ActiveWorkflowRunner {
export class ActiveWorkflowRunner implements IWebhookManager {
private activeWorkflows = new ActiveWorkflows();
private activationErrors: {
@@ -168,7 +168,7 @@ export class ActiveWorkflowRunner {
let activeWorkflowIds: string[] = [];
Logger.verbose('Call to remove all active workflows received (removeAll)');
activeWorkflowIds.push.apply(activeWorkflowIds, this.activeWorkflows.allActiveWorkflows());
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
const activeWorkflows = await this.getActiveWorkflows();
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
@@ -187,15 +187,13 @@ export class ActiveWorkflowRunner {
* Checks if a webhook for the given method and path exists and executes the workflow.
*/
async executeWebhook(
httpMethod: WebhookHttpMethod,
path: string,
req: express.Request,
res: express.Response,
request: WebhookRequest,
response: express.Response,
): Promise<IResponseCallbackData> {
Logger.debug(`Received webhook "${httpMethod}" for path "${path}"`);
const httpMethod = request.method;
let path = request.params.path;
// Reset request parameters
req.params = {};
Logger.debug(`Received webhook "${httpMethod}" for path "${path}"`);
// Remove trailing slash
if (path.endsWith('/')) {
@@ -245,6 +243,7 @@ export class ActiveWorkflowRunner {
webhook = dynamicWebhook;
}
});
if (webhook === null) {
throw new ResponseHelper.NotFoundError(
webhookNotFoundErrorMessage(path, httpMethod),
@@ -253,14 +252,14 @@ export class ActiveWorkflowRunner {
}
// @ts-ignore
path = webhook.webhookPath;
// extracting params from path
// @ts-ignore
webhook.webhookPath.split('/').forEach((ele, index) => {
if (ele.startsWith(':')) {
// write params to req.params
req.params[ele.slice(1)] = pathElements[index];
// @ts-ignore
request.params[ele.slice(1)] = pathElements[index];
}
});
}
@@ -294,9 +293,7 @@ export class ActiveWorkflowRunner {
workflow,
workflow.getNode(webhook.node) as INode,
additionalData,
).filter((webhook) => {
return webhook.httpMethod === httpMethod && webhook.path === path;
})[0];
).find((w) => w.httpMethod === httpMethod && w.path === path) as IWebhookData;
// Get the node which has the webhook defined to know where to start from and to
// get additional data
@@ -317,9 +314,8 @@ export class ActiveWorkflowRunner {
undefined,
undefined,
undefined,
req,
res,
request,
response,
(error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
@@ -333,7 +329,7 @@ export class ActiveWorkflowRunner {
/**
* Gets all request methods associated with a single webhook
*/
async getWebhookMethods(path: string): Promise<string[]> {
async getWebhookMethods(path: string): Promise<IHttpRequestMethods[]> {
const webhooks = await this.webhookRepository.find({
select: ['method'],
where: { webhookPath: path },
@@ -479,10 +475,10 @@ export class ActiveWorkflowRunner {
try {
await this.removeWorkflowWebhooks(workflow.id as string);
} catch (error) {
ErrorReporter.error(error);
} catch (error1) {
ErrorReporter.error(error1);
Logger.error(
`Could not remove webhooks of workflow "${workflow.id}" because of error: "${error.message}"`,
`Could not remove webhooks of workflow "${workflow.id}" because of error: "${error1.message}"`,
);
}

View File

@@ -1,4 +1,4 @@
import type { Application } from 'express';
import type { Application, Request, Response } from 'express';
import type {
ExecutionError,
ICredentialDataDecryptedObject,
@@ -22,6 +22,7 @@ import type {
IExecutionsSummary,
FeatureFlags,
IUserSettings,
IHttpRequestMethods,
} from 'n8n-workflow';
import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
@@ -300,6 +301,19 @@ export interface IExternalHooksClass {
run(hookName: string, hookParameters?: any[]): Promise<void>;
}
export type WebhookCORSRequest = Request & { method: 'OPTIONS' };
export type WebhookRequest = Request<{ path: string }> & { method: IHttpRequestMethods };
export type WaitingWebhookRequest = WebhookRequest & {
params: WebhookRequest['path'] & { suffix?: string };
};
export interface IWebhookManager {
getWebhookMethods?: (path: string) => Promise<IHttpRequestMethods[]>;
executeWebhook(req: WebhookRequest, res: Response): Promise<IResponseCallbackData>;
}
export interface IDiagnosticInfo {
versionCli: string;
databaseType: DatabaseType;

View File

@@ -69,8 +69,8 @@ export class ConflictError extends ResponseError {
}
export class UnprocessableRequestError extends ResponseError {
constructor(message: string) {
super(message, 422);
constructor(message: string, hint: string | undefined = undefined) {
super(message, 422, 422, hint);
}
}

View File

@@ -201,6 +201,9 @@ export class Server extends AbstractServer {
this.app.set('view engine', 'handlebars');
this.app.set('views', TEMPLATES_DIR);
this.testWebhooksEnabled = true;
this.webhooksEnabled = !config.getEnv('endpoints.disableProductionWebhooksOnMainProcess');
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const telemetrySettings: ITelemetrySettings = {
enabled: config.getEnv('diagnostics.enabled'),
@@ -544,8 +547,6 @@ export class Server extends AbstractServer {
'healthz',
'metrics',
'e2e',
this.endpointWebhook,
this.endpointWebhookTest,
this.endpointPresetCredentials,
isApiEnabled() ? '' : publicApiEndpoint,
...excludeEndpoints.split(':'),
@@ -1387,17 +1388,6 @@ export class Server extends AbstractServer {
await eventBus.initialize();
}
// ----------------------------------------
// Webhooks
// ----------------------------------------
if (!config.getEnv('endpoints.disableProductionWebhooksOnMainProcess')) {
this.setupWebhookEndpoint();
this.setupWaitingWebhookEndpoint();
}
this.setupTestWebhookEndpoint();
if (this.endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
this.app.post(
@@ -1406,7 +1396,7 @@ export class Server extends AbstractServer {
if (!this.presetCredentialsLoaded) {
const body = req.body as ICredentialsOverwrite;
if (req.headers['content-type'] !== 'application/json') {
if (req.contentType !== 'application/json') {
ResponseHelper.sendErrorResponse(
res,
new Error(

View File

@@ -4,14 +4,19 @@ import { Service } from 'typedi';
import type {
IWebhookData,
IWorkflowExecuteAdditionalData,
WebhookHttpMethod,
IHttpRequestMethods,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ActiveWebhooks } from '@/ActiveWebhooks';
import type { IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import type {
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
WebhookRequest,
} from '@/Interfaces';
import { Push } from '@/push';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
@@ -21,7 +26,7 @@ const WEBHOOK_TEST_UNREGISTERED_HINT =
"Click the 'Execute workflow' button on the canvas, then try again. (In test mode, the webhook only works for one call after you click this button)";
@Service()
export class TestWebhooks {
export class TestWebhooks implements IWebhookManager {
private testWebhookData: {
[key: string]: {
sessionId?: string;
@@ -44,14 +49,12 @@ export class TestWebhooks {
* data gets additionally send to the UI. After the request got handled it
* automatically remove the test-webhook.
*/
async callTestWebhook(
httpMethod: WebhookHttpMethod,
path: string,
request: express.Request,
async executeWebhook(
request: WebhookRequest,
response: express.Response,
): Promise<IResponseCallbackData> {
// Reset request parameters
request.params = {};
const httpMethod = request.method;
let path = request.params.path;
// Remove trailing slash
if (path.endsWith('/')) {
@@ -82,6 +85,7 @@ export class TestWebhooks {
path.split('/').forEach((ele, index) => {
if (ele.startsWith(':')) {
// write params to req.params
// @ts-ignore
request.params[ele.slice(1)] = pathElements[index];
}
});
@@ -157,7 +161,7 @@ export class TestWebhooks {
/**
* Gets all request methods associated with a single test webhook
*/
async getWebhookMethods(path: string): Promise<string[]> {
async getWebhookMethods(path: string): Promise<IHttpRequestMethods[]> {
const webhookMethods = this.activeWebhooks.getWebhookMethods(path);
if (!webhookMethods.length) {
// The requested webhook is not registered

View File

@@ -1,4 +1,3 @@
import type { INode, WebhookHttpMethod } from 'n8n-workflow';
import { NodeHelpers, Workflow, LoggerProxy as Logger } from 'n8n-workflow';
import { Service } from 'typedi';
import type express from 'express';
@@ -6,41 +5,34 @@ import type express from 'express';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import { NodeTypes } from '@/NodeTypes';
import type { IExecutionResponse, IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import type {
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
WaitingWebhookRequest,
} from '@/Interfaces';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { ExecutionRepository } from '@db/repositories';
import { OwnershipService } from './services/ownership.service';
@Service()
export class WaitingWebhooks {
export class WaitingWebhooks implements IWebhookManager {
constructor(
private nodeTypes: NodeTypes,
private executionRepository: ExecutionRepository,
private ownershipService: OwnershipService,
) {}
// TODO: implement `getWebhookMethods` for CORS support
async executeWebhook(
httpMethod: WebhookHttpMethod,
fullPath: string,
req: express.Request,
req: WaitingWebhookRequest,
res: express.Response,
): Promise<IResponseCallbackData> {
Logger.debug(`Received waiting-webhook "${httpMethod}" for path "${fullPath}"`);
const { path: executionId, suffix } = req.params;
Logger.debug(`Received waiting-webhook "${req.method}" for execution "${executionId}"`);
// Reset request parameters
req.params = {};
// Remove trailing slash
if (fullPath.endsWith('/')) {
fullPath = fullPath.slice(0, -1);
}
const pathParts = fullPath.split('/');
const executionId = pathParts.shift();
const path = pathParts.join('/');
const execution = await this.executionRepository.findSingleExecution(executionId as string, {
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
@@ -53,35 +45,19 @@ export class WaitingWebhooks {
throw new ResponseHelper.ConflictError(`The execution "${executionId} has finished already.`);
}
return this.startExecution(httpMethod, path, execution, req, res);
}
async startExecution(
httpMethod: WebhookHttpMethod,
path: string,
fullExecutionData: IExecutionResponse,
req: express.Request,
res: express.Response,
): Promise<IResponseCallbackData> {
const executionId = fullExecutionData.id;
if (fullExecutionData.finished) {
throw new Error('The execution did succeed and can so not be started again.');
}
const lastNodeExecuted = fullExecutionData.data.resultData.lastNodeExecuted as string;
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;
// Set the node as disabled so that the data does not get executed again as it would result
// in starting the wait all over again
fullExecutionData.data.executionData!.nodeExecutionStack[0].node.disabled = true;
execution.data.executionData!.nodeExecutionStack[0].node.disabled = true;
// Remove waitTill information else the execution would stop
fullExecutionData.data.waitTill = undefined;
execution.data.waitTill = undefined;
// Remove the data of the node execution again else it will display the node as executed twice
fullExecutionData.data.resultData.runData[lastNodeExecuted].pop();
execution.data.resultData.runData[lastNodeExecuted].pop();
const { workflowData } = fullExecutionData;
const { workflowData } = execution;
const workflow = new Workflow({
id: workflowData.id!,
@@ -101,34 +77,31 @@ export class WaitingWebhooks {
throw new ResponseHelper.NotFoundError('Could not find workflow');
}
const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowOwner.id);
const webhookData = NodeHelpers.getNodeWebhooks(
workflow,
workflow.getNode(lastNodeExecuted) as INode,
additionalData,
).find((webhook) => {
return (
webhook.httpMethod === httpMethod &&
webhook.path === path &&
webhook.webhookDescription.restartWebhook === true
);
});
if (webhookData === undefined) {
// If no data got found it means that the execution can not be started via a webhook.
// Return 404 because we do not want to give any data if the execution exists or not.
const errorMessage = `The execution "${executionId}" with webhook suffix path "${path}" is not known.`;
throw new ResponseHelper.NotFoundError(errorMessage);
}
const workflowStartNode = workflow.getNode(lastNodeExecuted);
if (workflowStartNode === null) {
throw new ResponseHelper.NotFoundError('Could not find node to process webhook.');
}
const runExecutionData = fullExecutionData.data;
const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowOwner.id);
const webhookData = NodeHelpers.getNodeWebhooks(
workflow,
workflowStartNode,
additionalData,
).find(
(webhook) =>
webhook.httpMethod === req.method &&
webhook.path === (suffix ?? '') &&
webhook.webhookDescription.restartWebhook === true,
);
if (webhookData === undefined) {
// If no data got found it means that the execution can not be started via a webhook.
// Return 404 because we do not want to give any data if the execution exists or not.
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
throw new ResponseHelper.NotFoundError(errorMessage);
}
const runExecutionData = execution.data;
return new Promise((resolve, reject) => {
const executionMode = 'webhook';
@@ -140,7 +113,7 @@ export class WaitingWebhooks {
executionMode,
undefined,
runExecutionData,
fullExecutionData.id,
execution.id,
req,
res,

View File

@@ -1,21 +1,20 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable id-denylist */
/* eslint-disable prefer-spread */
/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import type express from 'express';
import { Container } from 'typedi';
import get from 'lodash/get';
import stream from 'stream';
import { promisify } from 'util';
import { Container } from 'typedi';
import { parse as parseQueryString } from 'querystring';
import { Parser as XmlParser } from 'xml2js';
import formidable from 'formidable';
import { BinaryDataManager, NodeExecuteFunctions } from 'n8n-core';
@@ -26,6 +25,7 @@ import type {
IDeferredPromise,
IExecuteData,
IExecuteResponsePromiseData,
IHttpRequestMethods,
IN8nHttpFullResponse,
INode,
IRunExecutionData,
@@ -40,6 +40,7 @@ import {
BINARY_ENCODING,
createDeferredPromise,
ErrorReporterProxy as ErrorReporter,
jsonParse,
LoggerProxy as Logger,
NodeHelpers,
} from 'n8n-workflow';
@@ -47,8 +48,11 @@ import {
import type {
IExecutionDb,
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
IWorkflowExecutionDataProcess,
WebhookCORSRequest,
WebhookRequest,
} from '@/Interfaces';
import * as GenericHelpers from '@/GenericHelpers';
import * as ResponseHelper from '@/ResponseHelper';
@@ -63,11 +67,67 @@ import { OwnershipService } from './services/ownership.service';
const pipeline = promisify(stream.pipeline);
export const WEBHOOK_METHODS = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT'];
export const WEBHOOK_METHODS: IHttpRequestMethods[] = [
'DELETE',
'GET',
'HEAD',
'PATCH',
'POST',
'PUT',
];
const xmlParser = new XmlParser({
async: true,
normalize: true, // Trim whitespace inside text nodes
normalizeTags: true, // Transform tags to lowercase
explicitArray: false, // Only put properties in array if length > 1
});
export const webhookRequestHandler =
(webhookManager: IWebhookManager) =>
async (req: WebhookRequest | WebhookCORSRequest, res: express.Response) => {
const { path } = req.params;
const method = req.method;
if (method !== 'OPTIONS' && !WEBHOOK_METHODS.includes(method)) {
return ResponseHelper.sendErrorResponse(
res,
new Error(`The method ${method} is not supported.`),
);
}
// Setup CORS headers only if the incoming request has an `origin` header
if ('origin' in req.headers) {
if (webhookManager.getWebhookMethods) {
try {
const allowedMethods = await webhookManager.getWebhookMethods(path);
res.header('Access-Control-Allow-Methods', ['OPTIONS', ...allowedMethods].join(', '));
} catch (error) {
return ResponseHelper.sendErrorResponse(res, error as Error);
}
}
res.header('Access-Control-Allow-Origin', req.headers.origin);
}
if (method === 'OPTIONS') {
return ResponseHelper.sendSuccessResponse(res, {}, true, 204);
}
let response;
try {
response = await webhookManager.executeWebhook(req, res);
} catch (error) {
return ResponseHelper.sendErrorResponse(res, error as Error);
}
// Don't respond, if already responded
if (response.noWebhookResponse !== true) {
ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode);
}
};
/**
* Returns all the webhooks which should be created for the given workflow
*
*/
export function getWorkflowWebhooks(
workflow: Workflow,
@@ -134,9 +194,6 @@ export function encodeWebhookResponse(
/**
* Executes a webhook
*
* @param {(string | undefined)} sessionId
* @param {((error: Error | null, data: IResponseCallbackData) => void)} responseCallback
*/
export async function executeWebhook(
workflow: Workflow,
@@ -147,7 +204,7 @@ export async function executeWebhook(
sessionId: string | undefined,
runExecutionData: IRunExecutionData | undefined,
executionId: string | undefined,
req: express.Request,
req: WebhookRequest,
res: express.Response,
responseCallback: (error: Error | null, data: IResponseCallbackData) => void,
destinationNode?: string,
@@ -227,6 +284,16 @@ export async function executeWebhook(
additionalData.httpRequest = req;
additionalData.httpResponse = res;
const binaryData = workflow.expression.getSimpleParameterValue(
workflowStartNode,
'={{$parameter["options"]["binaryData"]}}',
executionMode,
additionalData.timezone,
additionalKeys,
undefined,
false,
);
let didSendResponse = false;
let runExecutionDataMerge = {};
try {
@@ -234,6 +301,46 @@ export async function executeWebhook(
// the workflow should be executed or not
let webhookResultData: IWebhookResponseData;
// if `Webhook` or `Wait` node, and binaryData is enabled, skip pre-parse the request-body
if (!binaryData) {
const { contentType, encoding } = req;
if (contentType === 'multipart/form-data') {
const form = formidable({
multiples: true,
encoding: encoding as formidable.BufferEncoding,
// TODO: pass a custom `fileWriteStreamHandler` to create binary data files directly
});
req.body = await new Promise((resolve) => {
form.parse(req, async (err, data, files) => {
resolve({ data, files });
});
});
} else {
await req.readRawBody();
const { rawBody } = req;
if (rawBody?.length) {
try {
if (contentType === 'application/json') {
req.body = jsonParse(rawBody.toString(encoding));
} else if (contentType?.endsWith('/xml') || contentType?.endsWith('+xml')) {
req.body = await xmlParser.parseStringPromise(rawBody.toString(encoding));
} else if (contentType === 'application/x-www-form-urlencoded') {
req.body = parseQueryString(rawBody.toString(encoding), undefined, undefined, {
maxKeys: 1000,
});
} else if (contentType === 'text/plain') {
req.body = rawBody.toString(encoding);
}
} catch (error) {
throw new ResponseHelper.UnprocessableRequestError(
'Failed to parse request body',
error.message,
);
}
}
}
}
try {
webhookResultData = await workflow.runWebhook(
webhookData,
@@ -685,11 +792,13 @@ export async function executeWebhook(
return executionId;
} catch (e) {
if (!didSendResponse) {
responseCallback(new Error('There was a problem executing the workflow'), {});
}
throw new ResponseHelper.InternalServerError(e.message);
const error =
e instanceof ResponseHelper.UnprocessableRequestError
? e
: new Error('There was a problem executing the workflow', { cause: e });
if (didSendResponse) throw error;
responseCallback(error, {});
return;
}
}

View File

@@ -1,8 +1,3 @@
import { AbstractServer } from '@/AbstractServer';
export class WebhookServer extends AbstractServer {
async configure() {
this.setupWebhookEndpoint();
this.setupWaitingWebhookEndpoint();
}
}
export class WebhookServer extends AbstractServer {}

View File

@@ -26,6 +26,7 @@ if (inE2ETests) {
process.env.N8N_ENCRYPTION_KEY = 'test-encryption-key';
process.env.N8N_PUBLIC_API_DISABLED = 'true';
process.env.N8N_USER_FOLDER = mkdtempSync(testsDir);
process.env.SKIP_STATISTICS_EVENTS = 'true';
} else {
dotenv.config();
}

View File

@@ -1,3 +1,4 @@
import { IHttpRequestMethods } from 'n8n-workflow';
import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
@Entity()
@@ -9,8 +10,8 @@ export class WebhookEntity {
@PrimaryColumn()
webhookPath: string;
@PrimaryColumn()
method: string;
@PrimaryColumn({ type: 'text' })
method: IHttpRequestMethods;
@Column()
node: string;

View File

@@ -0,0 +1,61 @@
import { parse as parseContentDisposition } from 'content-disposition';
import { parse as parseContentType } from 'content-type';
import getRawBody from 'raw-body';
import { type RequestHandler } from 'express';
import { jsonParse } from 'n8n-workflow';
import config from '@/config';
const payloadSizeMax = config.getEnv('endpoints.payloadSizeMax');
export const rawBody: RequestHandler = async (req, res, next) => {
if ('content-type' in req.headers) {
const { type: contentType, parameters } = (() => {
try {
return parseContentType(req);
} catch {
return { type: undefined, parameters: undefined };
}
})();
req.contentType = contentType;
req.encoding = (parameters?.charset ?? 'utf-8').toLowerCase() as BufferEncoding;
const contentDispositionHeader = req.headers['content-disposition'];
if (contentDispositionHeader?.length) {
const {
type,
parameters: { filename },
} = parseContentDisposition(contentDispositionHeader);
req.contentDisposition = { type, filename };
}
}
req.readRawBody = async () => {
if (!req.rawBody) {
req.rawBody = await getRawBody(req, {
length: req.headers['content-length'],
limit: `${String(payloadSizeMax)}mb`,
});
req._body = true;
}
};
next();
};
export const jsonParser: RequestHandler = async (req, res, next) => {
await req.readRawBody();
if (Buffer.isBuffer(req.rawBody)) {
if (req.contentType === 'application/json') {
try {
req.body = jsonParse<unknown>(req.rawBody.toString(req.encoding));
} catch (error) {
res.status(400).send({ error: 'Failed to parse request body' });
return;
}
} else {
req.body = {};
}
}
next();
};

View File

@@ -1,2 +1,3 @@
export * from './auth';
export * from './bodyParser';
export * from './cors';