ci: @n8n/task-runner package setup (no-changelog) (#11067)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-10-02 16:14:57 +02:00
committed by GitHub
parent 4546649c61
commit 49c71469f4
13 changed files with 256 additions and 434 deletions

View File

@@ -0,0 +1,47 @@
import { ApplicationError } from 'n8n-workflow';
import * as a from 'node:assert/strict';
export type AuthOpts = {
n8nUri: string;
authToken: string;
};
/**
* Requests a one-time token that can be used to establish a task runner connection
*/
export async function authenticate(opts: AuthOpts) {
try {
const authEndpoint = `http://${opts.n8nUri}/rest/runners/auth`;
const response = await fetch(authEndpoint, {
method: 'POST',
headers: {
// eslint-disable-next-line @typescript-eslint/naming-convention
'Content-Type': 'application/json',
},
body: JSON.stringify({
token: opts.authToken,
}),
});
if (!response.ok) {
throw new ApplicationError(
`Invalid response status ${response.status}: ${await response.text()}`,
);
}
const { data } = (await response.json()) as { data: { token: string } };
const grantToken = data.token;
a.ok(grantToken);
return grantToken;
} catch (e) {
console.error(e);
const error = e as Error;
throw new ApplicationError(
`Could not connect to n8n message broker ${opts.n8nUri}: ${error.message}`,
{
cause: error,
},
);
}
}

View File

@@ -0,0 +1,147 @@
import { getAdditionalKeys } from 'n8n-core';
import {
type INode,
type INodeType,
type ITaskDataConnections,
type IWorkflowExecuteAdditionalData,
WorkflowDataProxy,
type WorkflowParameters,
type IDataObject,
type IExecuteData,
type INodeExecutionData,
type INodeParameters,
type IRunExecutionData,
// type IWorkflowDataProxyAdditionalKeys,
Workflow,
type WorkflowExecuteMode,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from './runner-types';
import { type Task, TaskRunner } from './task-runner';
interface JSExecSettings {
code: string;
// For workflow data proxy
mode: WorkflowExecuteMode;
}
export interface PartialAdditionalData {
executionId?: string;
restartExecutionId?: string;
restApiUrl: string;
instanceBaseUrl: string;
formWaitingBaseUrl: string;
webhookBaseUrl: string;
webhookWaitingBaseUrl: string;
webhookTestBaseUrl: string;
currentNodeParameters?: INodeParameters;
executionTimeoutTimestamp?: number;
userId?: string;
variables: IDataObject;
}
export interface AllCodeTaskData {
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections;
node: INode;
runExecutionData: IRunExecutionData;
runIndex: number;
itemIndex: number;
activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
executeData?: IExecuteData;
defaultReturnRunIndex: number;
selfData: IDataObject;
contextNodeName: string;
additionalData: PartialAdditionalData;
}
export class JsTaskRunner extends TaskRunner {
constructor(
taskType: string,
wsUrl: string,
grantToken: string,
maxConcurrency: number,
name?: string,
) {
super(taskType, wsUrl, grantToken, maxConcurrency, name ?? 'JS Task Runner');
}
async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
const allData = await this.requestData<AllCodeTaskData>(task.taskId, 'all');
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');
const workflowParams = allData.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: {
getByNameAndVersion() {
return undefined as unknown as INodeType;
},
getByName() {
return undefined as unknown as INodeType;
},
getKnownTypes() {
return {};
},
},
});
const dataProxy = new WorkflowDataProxy(
workflow,
allData.runExecutionData,
allData.runIndex,
allData.itemIndex,
allData.activeNodeName,
allData.connectionInputData,
allData.siblingParameters,
settings.mode,
getAdditionalKeys(
allData.additionalData as IWorkflowExecuteAdditionalData,
allData.mode,
allData.runExecutionData,
),
allData.executeData,
allData.defaultReturnRunIndex,
allData.selfData,
allData.contextNodeName,
);
const customConsole = {
log: (...args: unknown[]) => {
const logOutput = args
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
.join(' ');
console.log('[JS Code]', logOutput);
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
},
};
const context: Context = {
require,
module: {},
console: customConsole,
...dataProxy.getDataProxy(),
...this.buildRpcCallObject(task.taskId),
};
const result = (await runInNewContext(
`module.exports = async function() {${settings.code}\n}()`,
context,
)) as TaskResultData['result'];
return {
result,
customData: allData.runExecutionData.resultData.metadata,
};
}
}

View File

@@ -0,0 +1,2 @@
export * from './task-runner';
export * from './runner-types';

View File

@@ -0,0 +1,231 @@
import type { INodeExecutionData } from 'n8n-workflow';
export type DataRequestType = 'input' | 'node' | 'all';
export interface TaskResultData {
result: INodeExecutionData[];
customData?: Record<string, string>;
}
export namespace N8nMessage {
export namespace ToRunner {
export interface InfoRequest {
type: 'broker:inforequest';
}
export interface RunnerRegistered {
type: 'broker:runnerregistered';
}
export interface TaskOfferAccept {
type: 'broker:taskofferaccept';
taskId: string;
offerId: string;
}
export interface TaskCancel {
type: 'broker:taskcancel';
taskId: string;
reason: string;
}
export interface TaskSettings {
type: 'broker:tasksettings';
taskId: string;
settings: unknown;
}
export interface RPCResponse {
type: 'broker:rpcresponse';
callId: string;
taskId: string;
status: 'success' | 'error';
data: unknown;
}
export interface TaskDataResponse {
type: 'broker:taskdataresponse';
taskId: string;
requestId: string;
data: unknown;
}
export type All =
| InfoRequest
| TaskOfferAccept
| TaskCancel
| TaskSettings
| RunnerRegistered
| RPCResponse
| TaskDataResponse;
}
export namespace ToRequester {
export interface TaskReady {
type: 'broker:taskready';
requestId: string;
taskId: string;
}
export interface TaskDone {
type: 'broker:taskdone';
taskId: string;
data: TaskResultData;
}
export interface TaskError {
type: 'broker:taskerror';
taskId: string;
error: unknown;
}
export interface TaskDataRequest {
type: 'broker:taskdatarequest';
taskId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
}
export interface RPC {
type: 'broker:rpc';
callId: string;
taskId: string;
name: (typeof RPC_ALLOW_LIST)[number];
params: unknown[];
}
export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC;
}
}
export namespace RequesterMessage {
export namespace ToN8n {
export interface TaskSettings {
type: 'requester:tasksettings';
taskId: string;
settings: unknown;
}
export interface TaskCancel {
type: 'requester:taskcancel';
taskId: string;
reason: string;
}
export interface TaskDataResponse {
type: 'requester:taskdataresponse';
taskId: string;
requestId: string;
data: unknown;
}
export interface RPCResponse {
type: 'requester:rpcresponse';
taskId: string;
callId: string;
status: 'success' | 'error';
data: unknown;
}
export interface TaskRequest {
type: 'requester:taskrequest';
requestId: string;
taskType: string;
}
export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest;
}
}
export namespace RunnerMessage {
export namespace ToN8n {
export interface Info {
type: 'runner:info';
name: string;
types: string[];
}
export interface TaskAccepted {
type: 'runner:taskaccepted';
taskId: string;
}
export interface TaskRejected {
type: 'runner:taskrejected';
taskId: string;
reason: string;
}
export interface TaskDone {
type: 'runner:taskdone';
taskId: string;
data: TaskResultData;
}
export interface TaskError {
type: 'runner:taskerror';
taskId: string;
error: unknown;
}
export interface TaskOffer {
type: 'runner:taskoffer';
offerId: string;
taskType: string;
validFor: number;
}
export interface TaskDataRequest {
type: 'runner:taskdatarequest';
taskId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
}
export interface RPC {
type: 'runner:rpc';
callId: string;
taskId: string;
name: (typeof RPC_ALLOW_LIST)[number];
params: unknown[];
}
export type All =
| Info
| TaskDone
| TaskError
| TaskAccepted
| TaskRejected
| TaskOffer
| RPC
| TaskDataRequest;
}
}
export const RPC_ALLOW_LIST = [
'helpers.httpRequestWithAuthentication',
'helpers.requestWithAuthenticationPaginated',
// "helpers.normalizeItems"
// "helpers.constructExecutionMetaData"
// "helpers.assertBinaryData"
'helpers.getBinaryDataBuffer',
// "helpers.copyInputItems"
// "helpers.returnJsonArray"
'helpers.getSSHClient',
'helpers.createReadStream',
// "helpers.getStoragePath"
'helpers.writeContentToFile',
'helpers.prepareBinaryData',
'helpers.setBinaryDataBuffer',
'helpers.copyBinaryFile',
'helpers.binaryToBuffer',
// "helpers.binaryToString"
// "helpers.getBinaryPath"
'helpers.getBinaryStream',
'helpers.getBinaryMetadata',
'helpers.createDeferredPromise',
'helpers.httpRequest',
'logNodeOutput',
] as const;

View File

@@ -0,0 +1,48 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import * as a from 'node:assert/strict';
import { authenticate } from './authenticator';
import { JsTaskRunner } from './code';
type Config = {
n8nUri: string;
authToken?: string;
grantToken?: string;
};
function readAndParseConfig(): Config {
const authToken = process.env.N8N_RUNNERS_AUTH_TOKEN;
const grantToken = process.env.N8N_RUNNERS_GRANT_TOKEN;
if (!authToken && !grantToken) {
throw new ApplicationError(
'Missing task runner authentication. Use either N8N_RUNNERS_AUTH_TOKEN or N8N_RUNNERS_GRANT_TOKEN to configure it',
);
}
return {
n8nUri: process.env.N8N_RUNNERS_N8N_URI ?? 'localhost:5678',
authToken,
grantToken,
};
}
void (async function start() {
const config = readAndParseConfig();
let grantToken = config.grantToken;
if (!grantToken) {
a.ok(config.authToken);
grantToken = await authenticate({
authToken: config.authToken,
n8nUri: config.n8nUri,
});
}
const wsUrl = `ws://${config.n8nUri}/runners/_ws`;
new JsTaskRunner('javascript', wsUrl, grantToken, 5);
})().catch((e) => {
const error = ensureError(e);
console.error('Task runner failed to start', { error });
process.exit(1);
});

View File

@@ -0,0 +1,362 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { URL } from 'node:url';
import { type MessageEvent, WebSocket } from 'ws';
import {
RPC_ALLOW_LIST,
type RunnerMessage,
type N8nMessage,
type TaskResultData,
} from './runner-types';
export interface Task<T = unknown> {
taskId: string;
settings?: T;
active: boolean;
cancelled: boolean;
}
export interface TaskOffer {
offerId: string;
validUntil: bigint;
}
interface DataRequest {
requestId: string;
resolve: (data: unknown) => void;
reject: (error: unknown) => void;
}
interface RPCCall {
callId: string;
resolve: (data: unknown) => void;
reject: (error: unknown) => void;
}
export interface RPCCallObject {
[name: string]: ((...args: unknown[]) => Promise<unknown>) | RPCCallObject;
}
const VALID_TIME_MS = 1000;
const VALID_EXTRA_MS = 100;
export abstract class TaskRunner {
id: string = nanoid();
ws: WebSocket;
canSendOffers = false;
runningTasks: Map<Task['taskId'], Task> = new Map();
offerInterval: NodeJS.Timeout | undefined;
openOffers: Map<TaskOffer['offerId'], TaskOffer> = new Map();
dataRequests: Map<DataRequest['requestId'], DataRequest> = new Map();
rpcCalls: Map<RPCCall['callId'], RPCCall> = new Map();
constructor(
public taskType: string,
wsUrl: string,
grantToken: string,
private maxConcurrency: number,
public name?: string,
) {
const url = new URL(wsUrl);
url.searchParams.append('id', this.id);
this.ws = new WebSocket(url.toString(), {
headers: {
authorization: `Bearer ${grantToken}`,
},
});
this.ws.addEventListener('message', this.receiveMessage);
this.ws.addEventListener('close', this.stopTaskOffers);
}
private receiveMessage = (message: MessageEvent) => {
// eslint-disable-next-line n8n-local-rules/no-uncaught-json-parse
const data = JSON.parse(message.data as string) as N8nMessage.ToRunner.All;
void this.onMessage(data);
};
private stopTaskOffers = () => {
this.canSendOffers = false;
if (this.offerInterval) {
clearInterval(this.offerInterval);
this.offerInterval = undefined;
}
};
private startTaskOffers() {
this.canSendOffers = true;
if (this.offerInterval) {
clearInterval(this.offerInterval);
}
this.offerInterval = setInterval(() => this.sendOffers(), 250);
}
deleteStaleOffers() {
this.openOffers.forEach((offer, key) => {
if (offer.validUntil < process.hrtime.bigint()) {
this.openOffers.delete(key);
}
});
}
sendOffers() {
this.deleteStaleOffers();
const offersToSend =
this.maxConcurrency -
(Object.values(this.openOffers).length + Object.values(this.runningTasks).length);
for (let i = 0; i < offersToSend; i++) {
const offer: TaskOffer = {
offerId: nanoid(),
validUntil: process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency
};
this.openOffers.set(offer.offerId, offer);
this.send({
type: 'runner:taskoffer',
taskType: this.taskType,
offerId: offer.offerId,
validFor: VALID_TIME_MS,
});
}
}
send(message: RunnerMessage.ToN8n.All) {
this.ws.send(JSON.stringify(message));
}
onMessage(message: N8nMessage.ToRunner.All) {
switch (message.type) {
case 'broker:inforequest':
this.send({
type: 'runner:info',
name: this.name ?? 'Node.js Task Runner SDK',
types: [this.taskType],
});
break;
case 'broker:runnerregistered':
this.startTaskOffers();
break;
case 'broker:taskofferaccept':
this.offerAccepted(message.offerId, message.taskId);
break;
case 'broker:taskcancel':
this.taskCancelled(message.taskId);
break;
case 'broker:tasksettings':
void this.receivedSettings(message.taskId, message.settings);
break;
case 'broker:taskdataresponse':
this.processDataResponse(message.requestId, message.data);
break;
case 'broker:rpcresponse':
this.handleRpcResponse(message.callId, message.status, message.data);
}
}
processDataResponse(requestId: string, data: unknown) {
const request = this.dataRequests.get(requestId);
if (!request) {
return;
}
// Deleting of the request is handled in `requestData`, using a
// `finally` wrapped around the return
request.resolve(data);
}
hasOpenTasks() {
return Object.values(this.runningTasks).length < this.maxConcurrency;
}
offerAccepted(offerId: string, taskId: string) {
if (!this.hasOpenTasks()) {
this.send({
type: 'runner:taskrejected',
taskId,
reason: 'No open task slots',
});
return;
}
const offer = this.openOffers.get(offerId);
if (!offer) {
this.send({
type: 'runner:taskrejected',
taskId,
reason: 'Offer expired and no open task slots',
});
return;
} else {
this.openOffers.delete(offerId);
}
this.runningTasks.set(taskId, {
taskId,
active: false,
cancelled: false,
});
this.send({
type: 'runner:taskaccepted',
taskId,
});
}
taskCancelled(taskId: string) {
const task = this.runningTasks.get(taskId);
if (!task) {
return;
}
task.cancelled = true;
if (task.active) {
// TODO
} else {
this.runningTasks.delete(taskId);
}
this.sendOffers();
}
taskErrored(taskId: string, error: unknown) {
this.send({
type: 'runner:taskerror',
taskId,
error,
});
this.runningTasks.delete(taskId);
this.sendOffers();
}
taskDone(taskId: string, data: RunnerMessage.ToN8n.TaskDone['data']) {
this.send({
type: 'runner:taskdone',
taskId,
data,
});
this.runningTasks.delete(taskId);
this.sendOffers();
}
async receivedSettings(taskId: string, settings: unknown) {
const task = this.runningTasks.get(taskId);
if (!task) {
return;
}
if (task.cancelled) {
this.runningTasks.delete(taskId);
return;
}
task.settings = settings;
task.active = true;
try {
const data = await this.executeTask(task);
this.taskDone(taskId, data);
} catch (e) {
if (ensureError(e)) {
this.taskErrored(taskId, (e as Error).message);
} else {
this.taskErrored(taskId, e);
}
}
}
// eslint-disable-next-line @typescript-eslint/naming-convention
async executeTask(_task: Task): Promise<TaskResultData> {
throw new ApplicationError('Unimplemented');
}
async requestData<T = unknown>(
taskId: Task['taskId'],
type: RunnerMessage.ToN8n.TaskDataRequest['requestType'],
param?: string,
): Promise<T> {
const requestId = nanoid();
const p = new Promise<T>((resolve, reject) => {
this.dataRequests.set(requestId, {
requestId,
resolve: resolve as (data: unknown) => void,
reject,
});
});
this.send({
type: 'runner:taskdatarequest',
taskId,
requestId,
requestType: type,
param,
});
try {
return await p;
} finally {
this.dataRequests.delete(requestId);
}
}
async makeRpcCall(taskId: string, name: RunnerMessage.ToN8n.RPC['name'], params: unknown[]) {
const callId = nanoid();
const dataPromise = new Promise((resolve, reject) => {
this.rpcCalls.set(callId, {
callId,
resolve,
reject,
});
});
this.send({
type: 'runner:rpc',
callId,
taskId,
name,
params,
});
try {
return await dataPromise;
} finally {
this.rpcCalls.delete(callId);
}
}
handleRpcResponse(
callId: string,
status: N8nMessage.ToRunner.RPCResponse['status'],
data: unknown,
) {
const call = this.rpcCalls.get(callId);
if (!call) {
return;
}
if (status === 'success') {
call.resolve(data);
} else {
call.reject(typeof data === 'string' ? new Error(data) : data);
}
}
buildRpcCallObject(taskId: string) {
const rpcObject: RPCCallObject = {};
for (const r of RPC_ALLOW_LIST) {
const splitPath = r.split('.');
let obj = rpcObject;
splitPath.forEach((s, index) => {
if (index !== splitPath.length - 1) {
obj[s] = {};
obj = obj[s];
return;
}
obj[s] = async (...args: unknown[]) => await this.makeRpcCall(taskId, r, args);
});
}
return rpcObject;
}
}