refactor(core): Reorganize webhook related components under src/webhooks (no-changelog) (#10296)

This commit is contained in:
Tomi Turtiainen
2024-08-07 11:23:44 +03:00
committed by GitHub
parent 2a8f1753e8
commit c8d322a9ba
23 changed files with 118 additions and 104 deletions

View File

@@ -0,0 +1,162 @@
import { Service } from 'typedi';
import type { Response } from 'express';
import { Workflow, NodeHelpers } from 'n8n-workflow';
import type { INode, IWebhookData, IHttpRequestMethods } from 'n8n-workflow';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type {
IWebhookResponseCallbackData,
IWebhookManager,
WebhookAccessControlOptions,
WebhookRequest,
} from './webhook.types';
import { Logger } from '@/Logger';
import { NodeTypes } from '@/NodeTypes';
import { WebhookService } from '@/webhooks/webhook.service';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import * as WebhookHelpers from '@/webhooks/WebhookHelpers';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
/**
* Service for handling the execution of production webhooks, i.e. webhooks
* that belong to activated workflows and use the production URL
* (https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.webhook/#webhook-urls)
*/
@Service()
export class ActiveWebhooks implements IWebhookManager {
constructor(
private readonly logger: Logger,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly workflowStaticDataService: WorkflowStaticDataService,
) {}
async getWebhookMethods(path: string) {
return await this.webhookService.getWebhookMethods(path);
}
async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) {
const webhook = await this.findWebhook(path, httpMethod);
const workflowData = await this.workflowRepository.findOne({
where: { id: webhook.workflowId },
select: ['nodes'],
});
const nodes = workflowData?.nodes;
const webhookNode = nodes?.find(
({ type, parameters, typeVersion }) =>
parameters?.path === path &&
(parameters?.httpMethod ?? 'GET') === httpMethod &&
'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion),
);
return webhookNode?.parameters?.options as WebhookAccessControlOptions;
}
/**
* Checks if a webhook for the given method and path exists and executes the workflow.
*/
async executeWebhook(
request: WebhookRequest,
response: Response,
): Promise<IWebhookResponseCallbackData> {
const httpMethod = request.method;
const path = request.params.path;
this.logger.debug(`Received webhook "${httpMethod}" for path "${path}"`);
// Reset request parameters
request.params = {} as WebhookRequest['params'];
const webhook = await this.findWebhook(path, httpMethod);
if (webhook.isDynamic) {
const pathElements = path.split('/').slice(1);
// extracting params from path
webhook.webhookPath.split('/').forEach((ele, index) => {
if (ele.startsWith(':')) {
// write params to req.params
request.params[ele.slice(1)] = pathElements[index];
}
});
}
const workflowData = await this.workflowRepository.findOne({
where: { id: webhook.workflowId },
relations: { shared: { project: { projectRelations: true } } },
});
if (workflowData === null) {
throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`);
}
const workflow = new Workflow({
id: webhook.workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
const additionalData = await WorkflowExecuteAdditionalData.getBase();
const webhookData = NodeHelpers.getNodeWebhooks(
workflow,
workflow.getNode(webhook.node) as INode,
additionalData,
).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new NotFoundError('Could not find node to process webhook.');
}
return await new Promise((resolve, reject) => {
const executionMode = 'webhook';
void WebhookHelpers.executeWebhook(
workflow,
webhookData,
workflowData,
workflowStartNode,
executionMode,
undefined,
undefined,
undefined,
request,
response,
async (error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
}
// Save static data if it changed
await this.workflowStaticDataService.saveStaticData(workflow);
resolve(data);
},
);
});
}
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
// Remove trailing slash
if (path.endsWith('/')) {
path = path.slice(0, -1);
}
const webhook = await this.webhookService.findWebhook(httpMethod, path);
if (webhook === null) {
throw new WebhookNotFoundError({ path, httpMethod }, { hint: 'production' });
}
return webhook;
}
}

View File

@@ -0,0 +1,425 @@
import type express from 'express';
import { Service } from 'typedi';
import { WebhookPathTakenError, Workflow } from 'n8n-workflow';
import type {
IWebhookData,
IWorkflowExecuteAdditionalData,
IHttpRequestMethods,
IRunData,
} from 'n8n-workflow';
import type {
IWebhookResponseCallbackData,
IWebhookManager,
WebhookAccessControlOptions,
WebhookRequest,
} from './webhook.types';
import { Push } from '@/push';
import { NodeTypes } from '@/NodeTypes';
import * as WebhookHelpers from '@/webhooks/WebhookHelpers';
import { TEST_WEBHOOK_TIMEOUT } from '@/constants';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import * as NodeExecuteFunctions from 'n8n-core';
import { removeTrailingSlash } from '@/utils';
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
import { OrchestrationService } from '@/services/orchestration.service';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import type { IWorkflowDb } from '@/Interfaces';
/**
* Service for handling the execution of webhooks of manual executions
* that use the [Test URL](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.webhook/#webhook-urls).
*/
@Service()
export class TestWebhooks implements IWebhookManager {
constructor(
private readonly push: Push,
private readonly nodeTypes: NodeTypes,
private readonly registrations: TestWebhookRegistrationsService,
private readonly orchestrationService: OrchestrationService,
) {}
private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {};
/**
* Return a promise that resolves when the test webhook is called.
* Also inform the FE of the result and remove the test webhook.
*/
async executeWebhook(
request: WebhookRequest,
response: express.Response,
): Promise<IWebhookResponseCallbackData> {
const httpMethod = request.method;
let path = removeTrailingSlash(request.params.path);
request.params = {} as WebhookRequest['params'];
let webhook = await this.getActiveWebhook(httpMethod, path);
if (!webhook) {
// no static webhook, so check if dynamic
// e.g. `/webhook-test/<uuid>/user/:id/create`
const [webhookId, ...segments] = path.split('/');
webhook = await this.getActiveWebhook(httpMethod, segments.join('/'), webhookId);
if (!webhook)
throw new WebhookNotFoundError({
path,
httpMethod,
webhookMethods: await this.getWebhookMethods(path),
});
path = webhook.path;
path.split('/').forEach((segment, index) => {
if (segment.startsWith(':')) {
request.params[segment.slice(1)] = segments[index];
}
});
}
const key = this.registrations.toKey(webhook);
const registration = await this.registrations.get(key);
if (!registration) {
throw new WebhookNotFoundError({
path,
httpMethod,
webhookMethods: await this.getWebhookMethods(path),
});
}
const { destinationNode, pushRef, workflowEntity, webhook: testWebhook } = registration;
const workflow = this.toWorkflow(workflowEntity);
if (testWebhook.staticData) workflow.setTestStaticData(testWebhook.staticData);
const workflowStartNode = workflow.getNode(webhook.node);
if (workflowStartNode === null) {
throw new NotFoundError('Could not find node to process webhook.');
}
return await new Promise(async (resolve, reject) => {
try {
const executionMode = 'manual';
const executionId = await WebhookHelpers.executeWebhook(
workflow,
webhook,
workflowEntity,
workflowStartNode,
executionMode,
pushRef,
undefined, // IRunExecutionData
undefined, // executionId
request,
response,
(error: Error | null, data: IWebhookResponseCallbackData) => {
if (error !== null) reject(error);
else resolve(data);
},
destinationNode,
);
// The workflow did not run as the request was probably setup related
// or a ping so do not resolve the promise and wait for the real webhook
// request instead.
if (executionId === undefined) return;
// Inform editor-ui that webhook got received
if (pushRef !== undefined) {
this.push.send(
'testWebhookReceived',
{ workflowId: webhook?.workflowId, executionId },
pushRef,
);
}
} catch {}
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, after the test webhook has been successfully executed,
* the handler process commands the creator process to clear its test webhooks.
*/
if (
this.orchestrationService.isMultiMainSetupEnabled &&
pushRef &&
!this.push.getBackend().hasPushRef(pushRef)
) {
const payload = { webhookKey: key, workflowEntity, pushRef };
void this.orchestrationService.publish('clear-test-webhooks', payload);
return;
}
this.clearTimeout(key);
await this.deactivateWebhooks(workflow);
});
}
clearTimeout(key: string) {
const timeout = this.timeouts[key];
if (timeout) clearTimeout(timeout);
}
async getWebhookMethods(path: string) {
const allKeys = await this.registrations.getAllKeys();
const webhookMethods = allKeys
.filter((key) => key.includes(path))
.map((key) => key.split('|')[0] as IHttpRequestMethods);
if (!webhookMethods.length) throw new WebhookNotFoundError({ path });
return webhookMethods;
}
async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) {
const allKeys = await this.registrations.getAllKeys();
const webhookKey = allKeys.find((key) => key.includes(path) && key.startsWith(httpMethod));
if (!webhookKey) return;
const registration = await this.registrations.get(webhookKey);
if (!registration) return;
const { workflowEntity } = registration;
const workflow = this.toWorkflow(workflowEntity);
const webhookNode = Object.values(workflow.nodes).find(
({ type, parameters, typeVersion }) =>
parameters?.path === path &&
(parameters?.httpMethod ?? 'GET') === httpMethod &&
'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion),
);
return webhookNode?.parameters?.options as WebhookAccessControlOptions;
}
/**
* Return whether activating a workflow requires listening for webhook calls.
* For every webhook call to listen for, also activate the webhook.
*/
async needsWebhook(
userId: string,
workflowEntity: IWorkflowDb,
additionalData: IWorkflowExecuteAdditionalData,
runData?: IRunData,
pushRef?: string,
destinationNode?: string,
) {
if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity);
const workflow = this.toWorkflow(workflowEntity);
const webhooks = WebhookHelpers.getWorkflowWebhooks(
workflow,
additionalData,
destinationNode,
true,
);
if (!webhooks.some((w) => w.webhookDescription.restartWebhook !== true)) {
return false; // no webhooks found to start a workflow
}
const timeout = setTimeout(
async () => await this.cancelWebhook(workflow.id),
TEST_WEBHOOK_TIMEOUT,
);
for (const webhook of webhooks) {
const key = this.registrations.toKey(webhook);
const registrationByKey = await this.registrations.get(key);
if (runData && webhook.node in runData) {
return false;
}
// if registration already exists and is not a test webhook created by this user in this workflow throw an error
if (
registrationByKey &&
!webhook.webhookId &&
!registrationByKey.webhook.isTest &&
registrationByKey.webhook.userId !== userId &&
registrationByKey.webhook.workflowId !== workflow.id
) {
throw new WebhookPathTakenError(webhook.node);
}
webhook.path = removeTrailingSlash(webhook.path);
webhook.isTest = true;
/**
* Additional data cannot be cached because of circular refs.
* Hence store the `userId` and recreate additional data when needed.
*/
const { workflowExecuteAdditionalData: _, ...cacheableWebhook } = webhook;
cacheableWebhook.userId = userId;
const registration: TestWebhookRegistration = {
pushRef,
workflowEntity,
destinationNode,
webhook: cacheableWebhook as IWebhookData,
};
try {
/**
* Register the test webhook _before_ creation at third-party service
* in case service sends a confirmation request immediately on creation.
*/
await this.registrations.register(registration);
await workflow.createWebhookIfNotExists(webhook, NodeExecuteFunctions, 'manual', 'manual');
cacheableWebhook.staticData = workflow.staticData;
await this.registrations.register(registration);
this.timeouts[key] = timeout;
} catch (error) {
await this.deactivateWebhooks(workflow);
delete this.timeouts[key];
throw error;
}
}
return true;
}
async cancelWebhook(workflowId: string) {
let foundWebhook = false;
const allWebhookKeys = await this.registrations.getAllKeys();
for (const key of allWebhookKeys) {
const registration = await this.registrations.get(key);
if (!registration) continue;
const { pushRef, workflowEntity } = registration;
const workflow = this.toWorkflow(workflowEntity);
if (workflowEntity.id !== workflowId) continue;
this.clearTimeout(key);
if (pushRef !== undefined) {
try {
this.push.send('testWebhookDeleted', { workflowId }, pushRef);
} catch {
// Could not inform editor, probably is not connected anymore. So simply go on.
}
}
if (!foundWebhook) {
// As it removes all webhooks of the workflow execute only once
void this.deactivateWebhooks(workflow);
}
foundWebhook = true;
}
return foundWebhook;
}
async getActiveWebhook(httpMethod: IHttpRequestMethods, path: string, webhookId?: string) {
const key = this.registrations.toKey({ httpMethod, path, webhookId });
let webhook: IWebhookData | undefined;
let maxMatches = 0;
const pathElementsSet = new Set(path.split('/'));
// check if static elements match in path
// if more results have been returned choose the one with the most static-route matches
const registration = await this.registrations.get(key);
if (!registration) return;
const { webhook: dynamicWebhook } = registration;
const staticElements = dynamicWebhook.path.split('/').filter((ele) => !ele.startsWith(':'));
const allStaticExist = staticElements.every((staticEle) => pathElementsSet.has(staticEle));
if (allStaticExist && staticElements.length > maxMatches) {
maxMatches = staticElements.length;
webhook = dynamicWebhook;
}
// handle routes with no static elements
else if (staticElements.length === 0 && !webhook) {
webhook = dynamicWebhook;
}
return webhook;
}
/**
* Deactivate all registered test webhooks of a workflow.
*/
async deactivateWebhooks(workflow: Workflow) {
const allRegistrations = await this.registrations.getAllRegistrations();
if (!allRegistrations.length) return; // nothing to deactivate
type WebhooksByWorkflow = { [workflowId: string]: IWebhookData[] };
const webhooksByWorkflow = allRegistrations.reduce<WebhooksByWorkflow>((acc, cur) => {
const { workflowId } = cur.webhook;
acc[workflowId] ||= [];
acc[workflowId].push(cur.webhook);
return acc;
}, {});
const webhooks = webhooksByWorkflow[workflow.id];
if (!webhooks) return; // nothing to deactivate
for (const webhook of webhooks) {
const { userId, staticData } = webhook;
if (userId) {
webhook.workflowExecuteAdditionalData = await WorkflowExecuteAdditionalData.getBase(userId);
}
if (staticData) workflow.staticData = staticData;
await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update');
}
await this.registrations.deregisterAll();
}
/**
* Convert a `WorkflowEntity` from `typeorm` to a temporary `Workflow` from `n8n-workflow`.
*/
toWorkflow(workflowEntity: IWorkflowDb) {
return new Workflow({
id: workflowEntity.id,
name: workflowEntity.name,
nodes: workflowEntity.nodes,
connections: workflowEntity.connections,
active: false,
nodeTypes: this.nodeTypes,
staticData: {},
settings: workflowEntity.settings,
});
}
}

View File

@@ -0,0 +1,147 @@
import { NodeHelpers, Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import type express from 'express';
import * as WebhookHelpers from '@/webhooks/WebhookHelpers';
import { NodeTypes } from '@/NodeTypes';
import type {
IWebhookResponseCallbackData,
IWebhookManager,
WaitingWebhookRequest,
} from './webhook.types';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ConflictError } from '@/errors/response-errors/conflict.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type { IExecutionResponse, IWorkflowDb } from '@/Interfaces';
/**
* Service for handling the execution of webhooks of Wait nodes that use the
* [Resume On Webhook Call](https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.wait/#on-webhook-call)
* feature.
*/
@Service()
export class WaitingWebhooks implements IWebhookManager {
protected includeForms = false;
constructor(
protected readonly logger: Logger,
private readonly nodeTypes: NodeTypes,
private readonly executionRepository: ExecutionRepository,
) {}
// TODO: implement `getWebhookMethods` for CORS support
protected logReceivedWebhook(method: string, executionId: string) {
this.logger.debug(`Received waiting-webhook "${method}" for execution "${executionId}"`);
}
protected disableNode(execution: IExecutionResponse, _method?: string) {
execution.data.executionData!.nodeExecutionStack[0].node.disabled = true;
}
async executeWebhook(
req: WaitingWebhookRequest,
res: express.Response,
): Promise<IWebhookResponseCallbackData> {
const { path: executionId, suffix } = req.params;
this.logReceivedWebhook(req.method, executionId);
// Reset request parameters
req.params = {} as WaitingWebhookRequest['params'];
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!execution) {
throw new NotFoundError(`The execution "${executionId} does not exist.`);
}
if (execution.status === 'running') {
throw new ConflictError(`The execution "${executionId} is running already.`);
}
if (execution.finished || execution.data.resultData.error) {
throw new ConflictError(`The execution "${executionId} has finished already.`);
}
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;
// Set the node as disabled so that the data does not get executed again as it would result
// in starting the wait all over again
this.disableNode(execution, req.method);
// Remove waitTill information else the execution would stop
execution.data.waitTill = undefined;
// Remove the data of the node execution again else it will display the node as executed twice
execution.data.resultData.runData[lastNodeExecuted].pop();
const { workflowData } = execution;
const workflow = new Workflow({
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
const workflowStartNode = workflow.getNode(lastNodeExecuted);
if (workflowStartNode === null) {
throw new NotFoundError('Could not find node to process webhook.');
}
const additionalData = await WorkflowExecuteAdditionalData.getBase();
const webhookData = NodeHelpers.getNodeWebhooks(
workflow,
workflowStartNode,
additionalData,
).find(
(webhook) =>
webhook.httpMethod === req.method &&
webhook.path === (suffix ?? '') &&
webhook.webhookDescription.restartWebhook === true &&
(webhook.webhookDescription.isForm || false) === this.includeForms,
);
if (webhookData === undefined) {
// If no data got found it means that the execution can not be started via a webhook.
// Return 404 because we do not want to give any data if the execution exists or not.
const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
throw new NotFoundError(errorMessage);
}
const runExecutionData = execution.data;
return await new Promise((resolve, reject) => {
const executionMode = 'webhook';
void WebhookHelpers.executeWebhook(
workflow,
webhookData,
workflowData as IWorkflowDb,
workflowStartNode,
executionMode,
undefined,
runExecutionData,
execution.id,
req,
res,
(error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
}
resolve(data);
},
);
});
}
}

View File

@@ -0,0 +1,850 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable id-denylist */
/* eslint-disable prefer-spread */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import type express from 'express';
import { Container } from 'typedi';
import get from 'lodash/get';
import { finished } from 'stream/promises';
import formidable from 'formidable';
import { BinaryDataService, NodeExecuteFunctions } from 'n8n-core';
import type {
IBinaryData,
IBinaryKeyData,
IDataObject,
IDeferredPromise,
IExecuteData,
IExecuteResponsePromiseData,
IHttpRequestMethods,
IN8nHttpFullResponse,
INode,
IPinData,
IRunExecutionData,
IWebhookData,
IWebhookResponseData,
IWorkflowDataProxyAdditionalKeys,
IWorkflowExecuteAdditionalData,
WebhookResponseMode,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
ApplicationError,
BINARY_ENCODING,
createDeferredPromise,
ErrorReporterProxy as ErrorReporter,
NodeHelpers,
} from 'n8n-workflow';
import type {
IWebhookResponseCallbackData,
IWebhookManager,
WebhookCORSRequest,
WebhookRequest,
} from './webhook.types';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { ActiveExecutions } from '@/ActiveExecutions';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { OwnershipService } from '@/services/ownership.service';
import { parseBody } from '@/middlewares';
import { Logger } from '@/Logger';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error';
import type { Project } from '@/databases/entities/Project';
import type { IExecutionDb, IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
export const WEBHOOK_METHODS: IHttpRequestMethods[] = [
'DELETE',
'GET',
'HEAD',
'PATCH',
'POST',
'PUT',
];
export const webhookRequestHandler =
(webhookManager: IWebhookManager) =>
async (req: WebhookRequest | WebhookCORSRequest, res: express.Response) => {
const { path } = req.params;
const method = req.method;
if (method !== 'OPTIONS' && !WEBHOOK_METHODS.includes(method)) {
return ResponseHelper.sendErrorResponse(
res,
new Error(`The method ${method} is not supported.`),
);
}
// Setup CORS headers only if the incoming request has an `origin` header
if ('origin' in req.headers) {
if (webhookManager.getWebhookMethods) {
try {
const allowedMethods = await webhookManager.getWebhookMethods(path);
res.header('Access-Control-Allow-Methods', ['OPTIONS', ...allowedMethods].join(', '));
} catch (error) {
return ResponseHelper.sendErrorResponse(res, error as Error);
}
}
const requestedMethod =
method === 'OPTIONS'
? (req.headers['access-control-request-method'] as IHttpRequestMethods)
: method;
if (webhookManager.findAccessControlOptions && requestedMethod) {
const options = await webhookManager.findAccessControlOptions(path, requestedMethod);
const { allowedOrigins } = options ?? {};
if (allowedOrigins && allowedOrigins !== '*' && allowedOrigins !== req.headers.origin) {
const originsList = allowedOrigins.split(',');
const defaultOrigin = originsList[0];
if (originsList.length === 1) {
res.header('Access-Control-Allow-Origin', defaultOrigin);
}
if (originsList.includes(req.headers.origin as string)) {
res.header('Access-Control-Allow-Origin', req.headers.origin);
} else {
res.header('Access-Control-Allow-Origin', defaultOrigin);
}
} else {
res.header('Access-Control-Allow-Origin', req.headers.origin);
}
if (method === 'OPTIONS') {
res.header('Access-Control-Max-Age', '300');
const requestedHeaders = req.headers['access-control-request-headers'];
if (requestedHeaders?.length) {
res.header('Access-Control-Allow-Headers', requestedHeaders);
}
}
}
}
if (method === 'OPTIONS') {
return ResponseHelper.sendSuccessResponse(res, {}, true, 204);
}
let response: IWebhookResponseCallbackData;
try {
response = await webhookManager.executeWebhook(req, res);
} catch (error) {
return ResponseHelper.sendErrorResponse(res, error as Error);
}
// Don't respond, if already responded
if (response.noWebhookResponse !== true) {
ResponseHelper.sendSuccessResponse(
res,
response.data,
true,
response.responseCode,
response.headers,
);
}
};
/**
* Returns all the webhooks which should be created for the given workflow
*/
export function getWorkflowWebhooks(
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
destinationNode?: string,
ignoreRestartWebhooks = false,
): IWebhookData[] {
// Check all the nodes in the workflow if they have webhooks
const returnData: IWebhookData[] = [];
let parentNodes: string[] | undefined;
if (destinationNode !== undefined) {
parentNodes = workflow.getParentNodes(destinationNode);
// Also add the destination node in case it itself is a webhook node
parentNodes.push(destinationNode);
}
for (const node of Object.values(workflow.nodes)) {
if (parentNodes !== undefined && !parentNodes.includes(node.name)) {
// If parentNodes are given check only them if they have webhooks
// and no other ones
continue;
}
returnData.push.apply(
returnData,
NodeHelpers.getNodeWebhooks(workflow, node, additionalData, ignoreRestartWebhooks),
);
}
return returnData;
}
export function encodeWebhookResponse(
response: IExecuteResponsePromiseData,
): IExecuteResponsePromiseData {
if (typeof response === 'object' && Buffer.isBuffer(response.body)) {
response.body = {
'__@N8nEncodedBuffer@__': response.body.toString(BINARY_ENCODING),
};
}
return response;
}
const normalizeFormData = <T>(values: Record<string, T | T[]>) => {
for (const key in values) {
const value = values[key];
if (Array.isArray(value) && value.length === 1) {
values[key] = value[0];
}
}
};
/**
* Executes a webhook
*/
// eslint-disable-next-line complexity
export async function executeWebhook(
workflow: Workflow,
webhookData: IWebhookData,
workflowData: IWorkflowDb,
workflowStartNode: INode,
executionMode: WorkflowExecuteMode,
pushRef: string | undefined,
runExecutionData: IRunExecutionData | undefined,
executionId: string | undefined,
req: WebhookRequest,
res: express.Response,
responseCallback: (error: Error | null, data: IWebhookResponseCallbackData) => void,
destinationNode?: string,
): Promise<string | undefined> {
// Get the nodeType to know which responseMode is set
const nodeType = workflow.nodeTypes.getByNameAndVersion(
workflowStartNode.type,
workflowStartNode.typeVersion,
);
if (nodeType === undefined) {
const errorMessage = `The type of the webhook node "${workflowStartNode.name}" is not known`;
responseCallback(new Error(errorMessage), {});
throw new InternalServerError(errorMessage);
}
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId,
};
let project: Project | undefined = undefined;
try {
project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowData.id);
} catch (error) {
throw new NotFoundError('Cannot find workflow');
}
// Prepare everything that is needed to run the workflow
const additionalData = await WorkflowExecuteAdditionalData.getBase();
if (executionId) {
additionalData.executionId = executionId;
}
// Get the responseMode
const responseMode = workflow.expression.getSimpleParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseMode,
executionMode,
additionalKeys,
undefined,
'onReceived',
) as WebhookResponseMode;
const responseCode = workflow.expression.getSimpleParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseCode as string,
executionMode,
additionalKeys,
undefined,
200,
) as number;
const responseData = workflow.expression.getComplexParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseData,
executionMode,
additionalKeys,
undefined,
'firstEntryJson',
);
if (!['onReceived', 'lastNode', 'responseNode'].includes(responseMode)) {
// If the mode is not known we error. Is probably best like that instead of using
// the default that people know as early as possible (probably already testing phase)
// that something does not resolve properly.
const errorMessage = `The response mode '${responseMode}' is not valid!`;
responseCallback(new Error(errorMessage), {});
throw new InternalServerError(errorMessage);
}
// Add the Response and Request so that this data can be accessed in the node
additionalData.httpRequest = req;
additionalData.httpResponse = res;
let binaryData;
const nodeVersion = workflowStartNode.typeVersion;
if (nodeVersion === 1) {
// binaryData option is removed in versions higher than 1
binaryData = workflow.expression.getSimpleParameterValue(
workflowStartNode,
'={{$parameter["options"]["binaryData"]}}',
executionMode,
additionalKeys,
undefined,
false,
);
}
let didSendResponse = false;
let runExecutionDataMerge = {};
try {
// Run the webhook function to see what should be returned and if
// the workflow should be executed or not
let webhookResultData: IWebhookResponseData;
// if `Webhook` or `Wait` node, and binaryData is enabled, skip pre-parse the request-body
// always falsy for versions higher than 1
if (!binaryData) {
const { contentType, encoding } = req;
if (contentType === 'multipart/form-data') {
const form = formidable({
multiples: true,
encoding: encoding as formidable.BufferEncoding,
// TODO: pass a custom `fileWriteStreamHandler` to create binary data files directly
});
req.body = await new Promise((resolve) => {
form.parse(req, async (_err, data, files) => {
normalizeFormData(data);
normalizeFormData(files);
resolve({ data, files });
});
});
} else {
if (nodeVersion > 1) {
if (
contentType?.startsWith('application/json') ||
contentType?.startsWith('text/plain') ||
contentType?.startsWith('application/x-www-form-urlencoded') ||
contentType?.endsWith('/xml') ||
contentType?.endsWith('+xml')
) {
await parseBody(req);
}
} else {
await parseBody(req);
}
}
}
try {
webhookResultData = await workflow.runWebhook(
webhookData,
workflowStartNode,
additionalData,
NodeExecuteFunctions,
executionMode,
runExecutionData ?? null,
);
Container.get(WorkflowStatisticsService).emit('nodeFetchedData', {
workflowId: workflow.id,
node: workflowStartNode,
});
} catch (err) {
// Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';
responseCallback(new Error(errorMessage), {});
didSendResponse = true;
// Add error to execution data that it can be logged and send to Editor-UI
runExecutionDataMerge = {
resultData: {
runData: {},
lastNodeExecuted: workflowStartNode.name,
error: {
...err,
message: err.message,
stack: err.stack,
},
},
};
webhookResultData = {
noWebhookResponse: true,
// Add empty data that it at least tries to "execute" the webhook
// which then so gets the chance to throw the error.
workflowData: [[{ json: {} }]],
};
}
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId,
};
if (webhookData.webhookDescription.responseHeaders !== undefined) {
const responseHeaders = workflow.expression.getComplexParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseHeaders,
executionMode,
additionalKeys,
undefined,
undefined,
) as {
entries?:
| Array<{
name: string;
value: string;
}>
| undefined;
};
if (responseHeaders !== undefined && responseHeaders.entries !== undefined) {
for (const item of responseHeaders.entries) {
res.setHeader(item.name, item.value);
}
}
}
if (webhookResultData.noWebhookResponse === true && !didSendResponse) {
// The response got already send
responseCallback(null, {
noWebhookResponse: true,
});
didSendResponse = true;
}
if (webhookResultData.workflowData === undefined) {
// Workflow should not run
if (webhookResultData.webhookResponse !== undefined) {
// Data to respond with is given
if (!didSendResponse) {
responseCallback(null, {
data: webhookResultData.webhookResponse,
responseCode,
});
didSendResponse = true;
}
} else {
// Send default response
if (!didSendResponse) {
responseCallback(null, {
data: {
message: 'Webhook call received',
},
responseCode,
});
didSendResponse = true;
}
}
return;
}
// Now that we know that the workflow should run we can return the default response
// directly if responseMode it set to "onReceived" and a response should be sent
if (responseMode === 'onReceived' && !didSendResponse) {
// Return response directly and do not wait for the workflow to finish
if (responseData === 'noData') {
// Return without data
responseCallback(null, {
responseCode,
});
} else if (responseData) {
// Return the data specified in the response data option
responseCallback(null, {
data: responseData as IDataObject,
responseCode,
});
} else if (webhookResultData.webhookResponse !== undefined) {
// Data to respond with is given
responseCallback(null, {
data: webhookResultData.webhookResponse,
responseCode,
});
} else {
responseCallback(null, {
data: {
message: 'Workflow was started',
},
responseCode,
});
}
didSendResponse = true;
}
// Initialize the data of the webhook node
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push({
node: workflowStartNode,
data: {
main: webhookResultData.workflowData,
},
source: null,
});
runExecutionData =
runExecutionData ||
({
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
} as IRunExecutionData);
if (destinationNode && runExecutionData.startData) {
runExecutionData.startData.destinationNode = destinationNode;
}
if (executionId !== undefined) {
// Set the data the webhook node did return on the waiting node if executionId
// already exists as it means that we are restarting an existing execution.
runExecutionData.executionData!.nodeExecutionStack[0].data.main =
webhookResultData.workflowData;
}
if (Object.keys(runExecutionDataMerge).length !== 0) {
// If data to merge got defined add it to the execution data
Object.assign(runExecutionData, runExecutionDataMerge);
}
let pinData: IPinData | undefined;
const usePinData = executionMode === 'manual';
if (usePinData) {
pinData = workflowData.pinData;
runExecutionData.resultData.pinData = pinData;
}
const runData: IWorkflowExecutionDataProcess = {
executionMode,
executionData: runExecutionData,
pushRef,
workflowData,
pinData,
projectId: project?.id,
};
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
if (responseMode === 'responseNode') {
responsePromise = await createDeferredPromise<IN8nHttpFullResponse>();
responsePromise
.promise()
.then(async (response: IN8nHttpFullResponse) => {
if (didSendResponse) {
return;
}
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) {
res.header(response.headers);
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
stream.pipe(res, { end: false });
await finished(stream);
responseCallback(null, { noWebhookResponse: true });
} else if (Buffer.isBuffer(response.body)) {
res.header(response.headers);
res.end(response.body);
responseCallback(null, { noWebhookResponse: true });
} else {
// TODO: This probably needs some more changes depending on the options on the
// Webhook Response node
const headers = response.headers;
let responseCode = response.statusCode;
let data = response.body as IDataObject;
// for formTrigger node redirection has to be handled by sending redirectURL in response body
if (
nodeType.description.name === 'formTrigger' &&
headers.location &&
String(responseCode).startsWith('3')
) {
responseCode = 200;
data = {
redirectURL: headers.location,
};
headers.location = undefined;
}
responseCallback(null, {
data,
headers,
responseCode,
});
}
process.nextTick(() => res.end());
didSendResponse = true;
})
.catch(async (error) => {
ErrorReporter.error(error);
Container.get(Logger).error(
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
{ executionId, workflowId: workflow.id },
);
});
}
// Start now to run the workflow
executionId = await Container.get(WorkflowRunner).run(
runData,
true,
!didSendResponse,
executionId,
responsePromise,
);
Container.get(Logger).verbose(
`Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`,
{ executionId },
);
if (!didSendResponse) {
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = Container.get(ActiveExecutions).getPostExecutePromise(
executionId,
) as Promise<IExecutionDb | undefined>;
executePromise
// eslint-disable-next-line complexity
.then(async (data) => {
if (data === undefined) {
if (!didSendResponse) {
responseCallback(null, {
data: {
message: 'Workflow executed successfully but no data was returned',
},
responseCode,
});
didSendResponse = true;
}
return undefined;
}
if (usePinData) {
data.data.resultData.pinData = pinData;
}
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
if (data.data.resultData.error || returnData?.error !== undefined) {
if (!didSendResponse) {
responseCallback(null, {
data: {
message: 'Error in workflow',
},
responseCode: 500,
});
}
didSendResponse = true;
return data;
}
// in `responseNode` mode `responseCallback` is called by `responsePromise`
if (responseMode === 'responseNode' && responsePromise) {
await Promise.allSettled([responsePromise.promise()]);
return undefined;
}
if (returnData === undefined) {
if (!didSendResponse) {
responseCallback(null, {
data: {
message:
'Workflow executed successfully but the last node did not return any data',
},
responseCode,
});
}
didSendResponse = true;
return data;
}
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId,
};
if (!didSendResponse) {
let data: IDataObject | IDataObject[] | undefined;
if (responseData === 'firstEntryJson') {
// Return the JSON data of the first entry
if (returnData.data!.main[0]![0] === undefined) {
responseCallback(new Error('No item to return got found'), {});
didSendResponse = true;
return undefined;
}
data = returnData.data!.main[0]![0].json;
const responsePropertyName = workflow.expression.getSimpleParameterValue(
workflowStartNode,
webhookData.webhookDescription.responsePropertyName,
executionMode,
additionalKeys,
undefined,
undefined,
);
if (responsePropertyName !== undefined) {
data = get(data, responsePropertyName as string) as IDataObject;
}
const responseContentType = workflow.expression.getSimpleParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseContentType,
executionMode,
additionalKeys,
undefined,
undefined,
);
if (responseContentType !== undefined) {
// Send the webhook response manually to be able to set the content-type
res.setHeader('Content-Type', responseContentType as string);
// Returning an object, boolean, number, ... causes problems so make sure to stringify if needed
if (
data !== null &&
data !== undefined &&
['Buffer', 'String'].includes(data.constructor.name)
) {
res.end(data);
} else {
res.end(JSON.stringify(data));
}
responseCallback(null, {
noWebhookResponse: true,
});
didSendResponse = true;
}
} else if (responseData === 'firstEntryBinary') {
// Return the binary data of the first entry
data = returnData.data!.main[0]![0];
if (data === undefined) {
responseCallback(new Error('No item was found to return'), {});
didSendResponse = true;
return undefined;
}
if (data.binary === undefined) {
responseCallback(new Error('No binary data was found to return'), {});
didSendResponse = true;
return undefined;
}
const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseBinaryPropertyName,
executionMode,
additionalKeys,
undefined,
'data',
);
if (responseBinaryPropertyName === undefined && !didSendResponse) {
responseCallback(new Error("No 'responseBinaryPropertyName' is set"), {});
didSendResponse = true;
}
const binaryData = (data.binary as IBinaryKeyData)[
responseBinaryPropertyName as string
];
if (binaryData === undefined && !didSendResponse) {
responseCallback(
new Error(
`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
),
{},
);
didSendResponse = true;
}
if (!didSendResponse) {
// Send the webhook response manually
res.setHeader('Content-Type', binaryData.mimeType);
if (binaryData.id) {
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
stream.pipe(res, { end: false });
await finished(stream);
} else {
res.write(Buffer.from(binaryData.data, BINARY_ENCODING));
}
responseCallback(null, {
noWebhookResponse: true,
});
process.nextTick(() => res.end());
}
} else if (responseData === 'noData') {
// Return without data
data = undefined;
} else {
// Return the JSON data of all the entries
data = [];
for (const entry of returnData.data!.main[0]!) {
data.push(entry.json);
}
}
if (!didSendResponse) {
responseCallback(null, {
data,
responseCode,
});
}
}
didSendResponse = true;
return data;
})
.catch((e) => {
if (!didSendResponse) {
responseCallback(
new ApplicationError('There was a problem executing the workflow', {
level: 'warning',
cause: e,
}),
{},
);
}
throw new InternalServerError(e.message);
});
}
return executionId;
} catch (e) {
const error =
e instanceof UnprocessableRequestError
? e
: new ApplicationError('There was a problem executing the workflow', {
level: 'warning',
cause: e,
});
if (didSendResponse) throw error;
responseCallback(error, {});
return;
}
}

View File

@@ -0,0 +1,9 @@
import { Service } from 'typedi';
import { AbstractServer } from '@/AbstractServer';
@Service()
export class WebhookServer extends AbstractServer {
constructor() {
super('webhook');
}
}

View File

@@ -0,0 +1,135 @@
import { mock } from 'jest-mock-extended';
import { TestWebhooks } from '@/webhooks/TestWebhooks';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import { v4 as uuid } from 'uuid';
import { generateNanoId } from '@/databases/utils/generators';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import * as WebhookHelpers from '@/webhooks/WebhookHelpers';
import type * as express from 'express';
import type { IWorkflowDb } from '@/Interfaces';
import type { IWebhookData, IWorkflowExecuteAdditionalData, Workflow } from 'n8n-workflow';
import type {
TestWebhookRegistrationsService,
TestWebhookRegistration,
} from '@/webhooks/test-webhook-registrations.service';
import * as AdditionalData from '@/WorkflowExecuteAdditionalData';
import type { WebhookRequest } from '@/webhooks/webhook.types';
jest.mock('@/WorkflowExecuteAdditionalData');
const mockedAdditionalData = AdditionalData as jest.Mocked<typeof AdditionalData>;
const workflowEntity = mock<IWorkflowDb>({ id: generateNanoId(), nodes: [] });
const httpMethod = 'GET';
const path = uuid();
const userId = '04ab4baf-85df-478f-917b-d303934a97de';
const webhook = mock<IWebhookData>({
httpMethod,
path,
workflowId: workflowEntity.id,
userId,
});
const registrations = mock<TestWebhookRegistrationsService>();
let testWebhooks: TestWebhooks;
describe('TestWebhooks', () => {
beforeAll(() => {
testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock());
jest.useFakeTimers();
});
describe('needsWebhook()', () => {
const args: Parameters<typeof testWebhooks.needsWebhook> = [
userId,
workflowEntity,
mock<IWorkflowExecuteAdditionalData>(),
];
test('if webhook is needed, should register then create webhook and return true', async () => {
const workflow = mock<Workflow>();
jest.spyOn(testWebhooks, 'toWorkflow').mockReturnValueOnce(workflow);
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
const needsWebhook = await testWebhooks.needsWebhook(...args);
const [registerOrder] = registrations.register.mock.invocationCallOrder;
const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder;
expect(registerOrder).toBeLessThan(createOrder);
expect(needsWebhook).toBe(true);
});
test('if webhook activation fails, should deactivate workflow webhooks', async () => {
const msg = 'Failed to add webhook to active webhooks';
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
jest.spyOn(registrations, 'register').mockRejectedValueOnce(new Error(msg));
registrations.getAllRegistrations.mockResolvedValue([]);
const needsWebhook = testWebhooks.needsWebhook(...args);
await expect(needsWebhook).rejects.toThrowError(msg);
});
test('if no webhook is found to start workflow, should return false', async () => {
webhook.webhookDescription.restartWebhook = true;
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]);
const result = await testWebhooks.needsWebhook(...args);
expect(result).toBe(false);
});
});
describe('executeWebhook()', () => {
test('if webhook is not registered, should throw', async () => {
jest.spyOn(testWebhooks, 'getActiveWebhook').mockResolvedValue(webhook);
jest.spyOn(testWebhooks, 'getWebhookMethods').mockResolvedValue([]);
const promise = testWebhooks.executeWebhook(
mock<WebhookRequest>({ params: { path } }),
mock(),
);
await expect(promise).rejects.toThrowError(WebhookNotFoundError);
});
test('if webhook is registered but missing from workflow, should throw', async () => {
jest.spyOn(testWebhooks, 'getActiveWebhook').mockResolvedValue(webhook);
jest.spyOn(testWebhooks, 'getWebhookMethods').mockResolvedValue([]);
const registration = mock<TestWebhookRegistration>({
pushRef: 'some-session-id',
workflowEntity,
});
await registrations.register(registration);
const promise = testWebhooks.executeWebhook(
mock<WebhookRequest>({ params: { path } }),
mock<express.Response>(),
);
await expect(promise).rejects.toThrowError(NotFoundError);
});
});
describe('deactivateWebhooks()', () => {
test('should add additional data to workflow', async () => {
registrations.getAllRegistrations.mockResolvedValue([{ workflowEntity, webhook }]);
const workflow = testWebhooks.toWorkflow(workflowEntity);
await testWebhooks.deactivateWebhooks(workflow);
expect(mockedAdditionalData.getBase).toHaveBeenCalledWith(userId);
});
});
});

View File

@@ -0,0 +1,142 @@
import { type Response } from 'express';
import { mock } from 'jest-mock-extended';
import { randomString } from 'n8n-workflow';
import type { IHttpRequestMethods } from 'n8n-workflow';
import type { IWebhookManager, WebhookCORSRequest, WebhookRequest } from '@/webhooks/webhook.types';
import { webhookRequestHandler } from '@/webhooks/WebhookHelpers';
describe('WebhookHelpers', () => {
describe('webhookRequestHandler', () => {
const webhookManager = mock<Required<IWebhookManager>>();
const handler = webhookRequestHandler(webhookManager);
beforeEach(() => {
jest.resetAllMocks();
});
it('should throw for unsupported methods', async () => {
const req = mock<WebhookRequest | WebhookCORSRequest>({
method: 'CONNECT' as IHttpRequestMethods,
});
const res = mock<Response>();
res.status.mockReturnValue(res);
await handler(req, res);
expect(res.status).toHaveBeenCalledWith(500);
expect(res.json).toHaveBeenCalledWith({
code: 0,
message: 'The method CONNECT is not supported.',
});
});
describe('preflight requests', () => {
it('should handle missing header for requested method', async () => {
const req = mock<WebhookRequest | WebhookCORSRequest>({
method: 'OPTIONS',
headers: {
origin: 'https://example.com',
'access-control-request-method': undefined,
},
params: { path: 'test' },
});
const res = mock<Response>();
res.status.mockReturnValue(res);
webhookManager.getWebhookMethods.mockResolvedValue(['GET', 'PATCH']);
await handler(req, res);
expect(res.status).toHaveBeenCalledWith(204);
expect(res.header).toHaveBeenCalledWith(
'Access-Control-Allow-Methods',
'OPTIONS, GET, PATCH',
);
});
it('should handle default origin and max-age', async () => {
const req = mock<WebhookRequest | WebhookCORSRequest>({
method: 'OPTIONS',
headers: {
origin: 'https://example.com',
'access-control-request-method': 'GET',
},
params: { path: 'test' },
});
const res = mock<Response>();
res.status.mockReturnValue(res);
webhookManager.getWebhookMethods.mockResolvedValue(['GET', 'PATCH']);
await handler(req, res);
expect(res.status).toHaveBeenCalledWith(204);
expect(res.header).toHaveBeenCalledWith(
'Access-Control-Allow-Methods',
'OPTIONS, GET, PATCH',
);
expect(res.header).toHaveBeenCalledWith(
'Access-Control-Allow-Origin',
'https://example.com',
);
expect(res.header).toHaveBeenCalledWith('Access-Control-Max-Age', '300');
});
it('should handle wildcard origin', async () => {
const randomOrigin = randomString(10);
const req = mock<WebhookRequest | WebhookCORSRequest>({
method: 'OPTIONS',
headers: {
origin: randomOrigin,
'access-control-request-method': 'GET',
},
params: { path: 'test' },
});
const res = mock<Response>();
res.status.mockReturnValue(res);
webhookManager.getWebhookMethods.mockResolvedValue(['GET', 'PATCH']);
webhookManager.findAccessControlOptions.mockResolvedValue({
allowedOrigins: '*',
});
await handler(req, res);
expect(res.status).toHaveBeenCalledWith(204);
expect(res.header).toHaveBeenCalledWith(
'Access-Control-Allow-Methods',
'OPTIONS, GET, PATCH',
);
expect(res.header).toHaveBeenCalledWith('Access-Control-Allow-Origin', randomOrigin);
});
it('should handle custom origin', async () => {
const req = mock<WebhookRequest | WebhookCORSRequest>({
method: 'OPTIONS',
headers: {
origin: 'https://example.com',
'access-control-request-method': 'GET',
},
params: { path: 'test' },
});
const res = mock<Response>();
res.status.mockReturnValue(res);
webhookManager.getWebhookMethods.mockResolvedValue(['GET', 'PATCH']);
webhookManager.findAccessControlOptions.mockResolvedValue({
allowedOrigins: 'https://test.com',
});
await handler(req, res);
expect(res.status).toHaveBeenCalledWith(204);
expect(res.header).toHaveBeenCalledWith(
'Access-Control-Allow-Methods',
'OPTIONS, GET, PATCH',
);
expect(res.header).toHaveBeenCalledWith('Access-Control-Allow-Origin', 'https://test.com');
});
});
});
});

View File

@@ -0,0 +1,98 @@
import type { CacheService } from '@/services/cache/cache.service';
import type { OrchestrationService } from '@/services/orchestration.service';
import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service';
import { TestWebhookRegistrationsService } from '@/webhooks/test-webhook-registrations.service';
import { mock } from 'jest-mock-extended';
describe('TestWebhookRegistrationsService', () => {
const cacheService = mock<CacheService>();
const registrations = new TestWebhookRegistrationsService(
cacheService,
mock<OrchestrationService>({ isMultiMainSetupEnabled: false }),
);
const registration = mock<TestWebhookRegistration>({
webhook: { httpMethod: 'GET', path: 'hello', webhookId: undefined },
});
const webhookKey = 'GET|hello';
const cacheKey = 'test-webhooks';
describe('register()', () => {
test('should register a test webhook registration', async () => {
await registrations.register(registration);
expect(cacheService.setHash).toHaveBeenCalledWith(cacheKey, { [webhookKey]: registration });
});
test('should skip setting TTL in single-main setup', async () => {
await registrations.register(registration);
expect(cacheService.expire).not.toHaveBeenCalled();
});
});
describe('deregister()', () => {
test('should deregister a test webhook registration', async () => {
await registrations.register(registration);
await registrations.deregister(webhookKey);
expect(cacheService.deleteFromHash).toHaveBeenCalledWith(cacheKey, webhookKey);
});
});
describe('get()', () => {
test('should retrieve a test webhook registration', async () => {
cacheService.getHashValue.mockResolvedValueOnce(registration);
const promise = registrations.get(webhookKey);
await expect(promise).resolves.toBe(registration);
});
test('should return undefined if no such test webhook registration was found', async () => {
cacheService.getHashValue.mockResolvedValueOnce(undefined);
const promise = registrations.get(webhookKey);
await expect(promise).resolves.toBeUndefined();
});
});
describe('getAllKeys()', () => {
test('should retrieve all test webhook registration keys', async () => {
cacheService.getHash.mockResolvedValueOnce({ [webhookKey]: registration });
const result = await registrations.getAllKeys();
expect(result).toEqual([webhookKey]);
});
});
describe('getAllRegistrations()', () => {
test('should retrieve all test webhook registrations', async () => {
cacheService.getHash.mockResolvedValueOnce({ [webhookKey]: registration });
const result = await registrations.getAllRegistrations();
expect(result).toEqual([registration]);
});
});
describe('deregisterAll()', () => {
test('should deregister all test webhook registrations', async () => {
await registrations.deregisterAll();
expect(cacheService.delete).toHaveBeenCalledWith(cacheKey);
});
});
describe('toKey()', () => {
test('should convert a test webhook registration to a key', () => {
const result = registrations.toKey(registration.webhook);
expect(result).toBe(webhookKey);
});
});
});

View File

@@ -0,0 +1,81 @@
import { mock } from 'jest-mock-extended';
import { WaitingWebhooks } from '@/webhooks/WaitingWebhooks';
import { ConflictError } from '@/errors/response-errors/conflict.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type { IExecutionResponse } from '@/Interfaces';
import type express from 'express';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { WaitingWebhookRequest } from '@/webhooks/webhook.types';
describe('WaitingWebhooks', () => {
const executionRepository = mock<ExecutionRepository>();
const waitingWebhooks = new WaitingWebhooks(mock(), mock(), executionRepository);
beforeEach(() => {
jest.restoreAllMocks();
});
it('should throw NotFoundError if there is no execution to resume', async () => {
/**
* Arrange
*/
executionRepository.findSingleExecution.mockResolvedValue(undefined);
/**
* Act
*/
const promise = waitingWebhooks.executeWebhook(
mock<WaitingWebhookRequest>(),
mock<express.Response>(),
);
/**
* Assert
*/
await expect(promise).rejects.toThrowError(NotFoundError);
});
it('should throw ConflictError if the execution to resume is already running', async () => {
/**
* Arrange
*/
executionRepository.findSingleExecution.mockResolvedValue(
mock<IExecutionResponse>({ status: 'running' }),
);
/**
* Act
*/
const promise = waitingWebhooks.executeWebhook(
mock<WaitingWebhookRequest>(),
mock<express.Response>(),
);
/**
* Assert
*/
await expect(promise).rejects.toThrowError(ConflictError);
});
it('should throw ConflictError if the execution to resume already finished', async () => {
/**
* Arrange
*/
executionRepository.findSingleExecution.mockResolvedValue(
mock<IExecutionResponse>({ finished: true }),
);
/**
* Act
*/
const promise = waitingWebhooks.executeWebhook(
mock<WaitingWebhookRequest>(),
mock<express.Response>(),
);
/**
* Assert
*/
await expect(promise).rejects.toThrowError(ConflictError);
});
});

View File

@@ -0,0 +1,190 @@
import { v4 as uuid } from 'uuid';
import config from '@/config';
import { WebhookRepository } from '@db/repositories/webhook.repository';
import { CacheService } from '@/services/cache/cache.service';
import { WebhookService } from '@/webhooks/webhook.service';
import { WebhookEntity } from '@db/entities/WebhookEntity';
import { mockInstance } from '@test/mocking';
const createWebhook = (method: string, path: string, webhookId?: string, pathSegments?: number) =>
Object.assign(new WebhookEntity(), {
method,
webhookPath: path,
webhookId,
pathSegments,
}) as WebhookEntity;
describe('WebhookService', () => {
const webhookRepository = mockInstance(WebhookRepository);
const cacheService = mockInstance(CacheService);
const webhookService = new WebhookService(webhookRepository, cacheService);
beforeEach(() => {
config.load(config.default);
jest.clearAllMocks();
});
[true, false].forEach((isCacheEnabled) => {
const tag = '[' + ['cache', isCacheEnabled ? 'enabled' : 'disabled'].join(' ') + ']';
describe(`findWebhook() - static case ${tag}`, () => {
test('should return the webhook if found', async () => {
const method = 'GET';
const path = 'user/profile';
const mockWebhook = createWebhook(method, path);
webhookRepository.findOneBy.mockResolvedValue(mockWebhook);
const returnedWebhook = await webhookService.findWebhook(method, path);
expect(returnedWebhook).toBe(mockWebhook);
});
test('should return null if not found', async () => {
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([]);
const returnValue = await webhookService.findWebhook('GET', 'user/profile');
expect(returnValue).toBeNull();
});
});
describe(`findWebhook() - dynamic case ${tag}`, () => {
test('should return the webhook if found', async () => {
const method = 'GET';
const webhookId = uuid();
const path = 'user/:id/posts';
const mockWebhook = createWebhook(method, path, webhookId, 3);
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([mockWebhook]); // dynamic
const returnedWebhook = await webhookService.findWebhook(
method,
[webhookId, 'user/123/posts'].join('/'),
);
expect(returnedWebhook).toBe(mockWebhook);
});
test('should handle subset dynamic path case', async () => {
const method1 = 'GET';
const webhookId1 = uuid();
const path1 = 'user/:id/posts';
const mockWebhook1 = createWebhook(method1, path1, webhookId1, 3);
const method2 = 'GET';
const webhookId2 = uuid();
const path2 = 'user/:id/posts/:postId/comments';
const mockWebhook2 = createWebhook(method2, path2, webhookId2, 3);
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([mockWebhook1, mockWebhook2]); // dynamic
const fullPath1 = [webhookId1, 'user/123/posts'].join('/');
const returnedWebhook1 = await webhookService.findWebhook(method1, fullPath1);
const fullPath2 = [webhookId1, 'user/123/posts/456/comments'].join('/');
const returnedWebhook2 = await webhookService.findWebhook(method2, fullPath2);
expect(returnedWebhook1).toBe(mockWebhook1);
expect(returnedWebhook2).toBe(mockWebhook2);
});
test('should handle single-segment dynamic path case', async () => {
const method1 = 'GET';
const webhookId1 = uuid();
const path1 = ':var';
const mockWebhook1 = createWebhook(method1, path1, webhookId1, 3);
const method2 = 'GET';
const webhookId2 = uuid();
const path2 = 'user/:id/posts/:postId/comments';
const mockWebhook2 = createWebhook(method2, path2, webhookId2, 3);
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([mockWebhook1, mockWebhook2]); // dynamic
const fullPath = [webhookId1, 'user/123/posts/456'].join('/');
const returnedWebhook = await webhookService.findWebhook(method1, fullPath);
expect(returnedWebhook).toBe(mockWebhook1);
});
test('should return null if not found', async () => {
const fullPath = [uuid(), 'user/:id/posts'].join('/');
webhookRepository.findOneBy.mockResolvedValue(null); // static
webhookRepository.findBy.mockResolvedValue([]); // dynamic
const returnValue = await webhookService.findWebhook('GET', fullPath);
expect(returnValue).toBeNull();
});
});
});
describe('getWebhookMethods()', () => {
test('should return all methods for webhook', async () => {
const path = 'user/profile';
webhookRepository.find.mockResolvedValue([
createWebhook('GET', path),
createWebhook('POST', path),
createWebhook('PUT', path),
createWebhook('PATCH', path),
]);
const returnedMethods = await webhookService.getWebhookMethods(path);
expect(returnedMethods).toEqual(['GET', 'POST', 'PUT', 'PATCH']);
});
test('should return empty array if no webhooks found', async () => {
webhookRepository.find.mockResolvedValue([]);
const returnedMethods = await webhookService.getWebhookMethods('user/profile');
expect(returnedMethods).toEqual([]);
});
});
describe('deleteWorkflowWebhooks()', () => {
test('should delete all webhooks of the workflow', async () => {
const mockWorkflowWebhooks = [
createWebhook('PUT', 'users'),
createWebhook('GET', 'user/:id'),
createWebhook('POST', ':var'),
];
webhookRepository.findBy.mockResolvedValue(mockWorkflowWebhooks);
const workflowId = uuid();
await webhookService.deleteWorkflowWebhooks(workflowId);
expect(webhookRepository.remove).toHaveBeenCalledWith(mockWorkflowWebhooks);
});
test('should not delete any webhooks if none found', async () => {
webhookRepository.findBy.mockResolvedValue([]);
const workflowId = uuid();
await webhookService.deleteWorkflowWebhooks(workflowId);
expect(webhookRepository.remove).toHaveBeenCalledWith([]);
});
});
describe('createWebhook()', () => {
test('should create the webhook', async () => {
const mockWebhook = createWebhook('GET', 'user/:id');
await webhookService.storeWebhook(mockWebhook);
expect(webhookRepository.insert).toHaveBeenCalledWith(mockWebhook);
});
});
});

View File

@@ -0,0 +1,93 @@
import { Service } from 'typedi';
import { CacheService } from '@/services/cache/cache.service';
import type { IWebhookData } from 'n8n-workflow';
import type { IWorkflowDb } from '@/Interfaces';
import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants';
import { OrchestrationService } from '@/services/orchestration.service';
export type TestWebhookRegistration = {
pushRef?: string;
workflowEntity: IWorkflowDb;
destinationNode?: string;
webhook: IWebhookData;
};
@Service()
export class TestWebhookRegistrationsService {
constructor(
private readonly cacheService: CacheService,
private readonly orchestrationService: OrchestrationService,
) {}
private readonly cacheKey = 'test-webhooks';
async register(registration: TestWebhookRegistration) {
const hashKey = this.toKey(registration.webhook);
await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration });
if (!this.orchestrationService.isMultiMainSetupEnabled) return;
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, after the test webhook has been successfully executed,
* the handler process commands the creator process to clear its test webhooks.
* We set a TTL on the key so that it is cleared even on creator process crash,
* with an additional buffer to ensure this safeguard expiration will not delete
* the key before the regular test webhook timeout fetches the key to delete it.
*/
const ttl = TEST_WEBHOOK_TIMEOUT + TEST_WEBHOOK_TIMEOUT_BUFFER;
await this.cacheService.expire(this.cacheKey, ttl);
}
async deregister(arg: IWebhookData | string) {
if (typeof arg === 'string') {
await this.cacheService.deleteFromHash(this.cacheKey, arg);
} else {
const hashKey = this.toKey(arg);
await this.cacheService.deleteFromHash(this.cacheKey, hashKey);
}
}
async get(key: string) {
return await this.cacheService.getHashValue<TestWebhookRegistration>(this.cacheKey, key);
}
async getAllKeys() {
const hash = await this.cacheService.getHash<TestWebhookRegistration>(this.cacheKey);
if (!hash) return [];
return Object.keys(hash);
}
async getAllRegistrations() {
const hash = await this.cacheService.getHash<TestWebhookRegistration>(this.cacheKey);
if (!hash) return [];
return Object.values(hash);
}
async deregisterAll() {
await this.cacheService.delete(this.cacheKey);
}
toKey(webhook: Pick<IWebhookData, 'webhookId' | 'httpMethod' | 'path'>) {
const { webhookId, httpMethod, path: webhookPath } = webhook;
if (!webhookId) return [httpMethod, webhookPath].join('|');
let path = webhookPath;
if (path.startsWith(webhookId)) {
const cutFromIndex = path.indexOf('/') + 1;
path = path.slice(cutFromIndex);
}
return [httpMethod, webhookId, path.split('/').length].join('|');
}
}

View File

@@ -0,0 +1,120 @@
import { WebhookRepository } from '@db/repositories/webhook.repository';
import { Service } from 'typedi';
import { CacheService } from '@/services/cache/cache.service';
import type { WebhookEntity } from '@db/entities/WebhookEntity';
import type { IHttpRequestMethods } from 'n8n-workflow';
type Method = NonNullable<IHttpRequestMethods>;
@Service()
export class WebhookService {
constructor(
private webhookRepository: WebhookRepository,
private cacheService: CacheService,
) {}
async populateCache() {
const allWebhooks = await this.webhookRepository.find();
if (!allWebhooks) return;
void this.cacheService.setMany(allWebhooks.map((w) => [w.cacheKey, w]));
}
private async findCached(method: Method, path: string) {
const cacheKey = `webhook:${method}-${path}`;
const cachedWebhook = await this.cacheService.get(cacheKey);
if (cachedWebhook) return this.webhookRepository.create(cachedWebhook);
let dbWebhook = await this.findStaticWebhook(method, path);
if (dbWebhook === null) {
dbWebhook = await this.findDynamicWebhook(method, path);
}
void this.cacheService.set(cacheKey, dbWebhook);
return dbWebhook;
}
/**
* Find a matching webhook with zero dynamic path segments, e.g. `<uuid>` or `user/profile`.
*/
private async findStaticWebhook(method: Method, path: string) {
return await this.webhookRepository.findOneBy({ webhookPath: path, method });
}
/**
* Find a matching webhook with one or more dynamic path segments, e.g. `<uuid>/user/:id/posts`.
* It is mandatory for dynamic webhooks to have `<uuid>/` at the base.
*/
private async findDynamicWebhook(method: Method, path: string) {
const [uuidSegment, ...otherSegments] = path.split('/');
const dynamicWebhooks = await this.webhookRepository.findBy({
webhookId: uuidSegment,
method,
pathLength: otherSegments.length,
});
if (dynamicWebhooks.length === 0) return null;
const requestSegments = new Set(otherSegments);
const { webhook } = dynamicWebhooks.reduce<{
webhook: WebhookEntity | null;
maxMatches: number;
}>(
(acc, dw) => {
const allStaticSegmentsMatch = dw.staticSegments.every((s) => requestSegments.has(s));
if (allStaticSegmentsMatch && dw.staticSegments.length > acc.maxMatches) {
acc.maxMatches = dw.staticSegments.length;
acc.webhook = dw;
return acc;
} else if (dw.staticSegments.length === 0 && !acc.webhook) {
acc.webhook = dw; // edge case: if path is `:var`, match on anything
}
return acc;
},
{ webhook: null, maxMatches: 0 },
);
return webhook;
}
async findWebhook(method: Method, path: string) {
return await this.findCached(method, path);
}
async storeWebhook(webhook: WebhookEntity) {
void this.cacheService.set(webhook.cacheKey, webhook);
return await this.webhookRepository.insert(webhook);
}
createWebhook(data: Partial<WebhookEntity>) {
return this.webhookRepository.create(data);
}
async deleteWorkflowWebhooks(workflowId: string) {
const webhooks = await this.webhookRepository.findBy({ workflowId });
return await this.deleteWebhooks(webhooks);
}
private async deleteWebhooks(webhooks: WebhookEntity[]) {
void this.cacheService.deleteMany(webhooks.map((w) => w.cacheKey));
return await this.webhookRepository.remove(webhooks);
}
async getWebhookMethods(path: string) {
return await this.webhookRepository
.find({ select: ['method'], where: { webhookPath: path } })
.then((rows) => rows.map((r) => r.method));
}
}

View File

@@ -0,0 +1,37 @@
import type { Request, Response } from 'express';
import type { IDataObject, IHttpRequestMethods } from 'n8n-workflow';
export type WebhookCORSRequest = Request & { method: 'OPTIONS' };
export type WebhookRequest = Request<{ path: string }> & {
method: IHttpRequestMethods;
params: Record<string, string>;
};
export type WaitingWebhookRequest = WebhookRequest & {
params: WebhookRequest['path'] & { suffix?: string };
};
export interface WebhookAccessControlOptions {
allowedOrigins?: string;
}
export interface IWebhookManager {
/** Gets all request methods associated with a webhook path*/
getWebhookMethods?: (path: string) => Promise<IHttpRequestMethods[]>;
/** Find the CORS options matching a path and method */
findAccessControlOptions?: (
path: string,
httpMethod: IHttpRequestMethods,
) => Promise<WebhookAccessControlOptions | undefined>;
executeWebhook(req: WebhookRequest, res: Response): Promise<IWebhookResponseCallbackData>;
}
export interface IWebhookResponseCallbackData {
data?: IDataObject | IDataObject[];
headers?: object;
noWebhookResponse?: boolean;
responseCode?: number;
}