feat(editor): Adds a EE view to show worker details and job status (#7600)

This change expands on the command channel communication introduced
lately between the main instance(s) and the workers. The frontend gets a
new menu entry "Workers" which will, when opened, trigger a regular call
to getStatus from the workers. The workers then respond via their
response channel to the backend, which then pushes the status to the
frontend.
This introduces the use of ChartJS for metrics.
This feature is still in MVP state and thus disabled by default for the
moment.
This commit is contained in:
Michael Auerswald
2023-11-10 23:48:31 +01:00
committed by GitHub
parent 0ddafd2b82
commit cbc690907f
39 changed files with 1125 additions and 50 deletions

View File

@@ -46,6 +46,7 @@ import type { UserRepository } from '@db/repositories/user.repository';
import type { WorkflowRepository } from '@db/repositories/workflow.repository';
import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants';
import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types';
import type { WorkerJobStatusSummary } from './services/orchestration/worker/types';
export interface ICredentialsTypeData {
[key: string]: CredentialLoadingDetails;
@@ -466,7 +467,8 @@ export type IPushData =
| PushDataTestWebhook
| PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged;
| PushDataActiveWorkflowUsersChanged
| PushDataWorkerStatusMessage;
type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
@@ -503,7 +505,12 @@ export type PushDataConsoleMessage = {
type: 'sendConsoleMessage';
};
export type PushDataReloadNodeType = {
type PushDataWorkerStatusMessage = {
data: IPushDataWorkerStatusMessage;
type: 'sendWorkerStatusMessage';
};
type PushDataReloadNodeType = {
data: IPushDataReloadNodeType;
type: 'reloadNodeType';
};
@@ -583,6 +590,30 @@ export interface IPushDataConsoleMessage {
message: string;
}
export interface IPushDataWorkerStatusMessage {
workerId: string;
status: IPushDataWorkerStatusPayload;
}
export interface IPushDataWorkerStatusPayload {
workerId: string;
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
interfaces: Array<{
family: 'IPv4' | 'IPv6';
address: string;
internal: boolean;
}>;
version: string;
}
export interface IResponseCallbackData {
data?: IDataObject | IDataObject[];
headers?: object;

View File

@@ -253,6 +253,10 @@ export class License {
return this.isFeatureEnabled(LICENSE_FEATURES.API_DISABLED);
}
isWorkerViewLicensed() {
return this.isFeatureEnabled(LICENSE_FEATURES.WORKER_VIEW);
}
getCurrentEntitlements() {
return this.manager?.getCurrentEntitlements() ?? [];
}

View File

@@ -80,6 +80,7 @@ export const LICENSE_FEATURES = {
DEBUG_IN_EDITOR: 'feat:debugInEditor',
BINARY_DATA_S3: 'feat:binaryDataS3',
MULTIPLE_MAIN_INSTANCES: 'feat:multipleMainInstances',
WORKER_VIEW: 'feat:workerView',
} as const;
export const LICENSE_QUOTAS = {

View File

@@ -70,6 +70,7 @@ export class E2EController {
[LICENSE_FEATURES.DEBUG_IN_EDITOR]: false,
[LICENSE_FEATURES.BINARY_DATA_S3]: false,
[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES]: false,
[LICENSE_FEATURES.WORKER_VIEW]: false,
};
constructor(
@@ -99,6 +100,13 @@ export class E2EController {
this.enabledFeatures[feature] = enabled;
}
@Patch('/queue-mode')
async setQueueMode(req: Request<{}, {}, { enabled: boolean }>) {
const { enabled } = req.body;
config.set('executions.mode', enabled ? 'queue' : 'regular');
return { success: true, message: `Queue mode set to ${config.getEnv('executions.mode')}` };
}
private resetFeatures() {
for (const feature of Object.keys(this.enabledFeatures)) {
this.enabledFeatures[feature as BooleanLicenseFeature] = false;

View File

@@ -1,32 +1,38 @@
import { Authorized, Get, RestController } from '@/decorators';
import { Authorized, Post, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { License } from '../License';
@Authorized(['global', 'owner'])
@RestController('/orchestration')
@Service()
export class OrchestrationController {
constructor(private readonly orchestrationService: SingleMainInstancePublisher) {}
constructor(
private readonly orchestrationService: SingleMainInstancePublisher,
private readonly licenseService: License,
) {}
/**
* These endpoint currently do not return anything, they just trigger the messsage to
* These endpoints do not return anything, they just trigger the messsage to
* the workers to respond on Redis with their status.
* TODO: these responses need to be forwarded to and handled by the frontend
*/
@Get('/worker/status/:id')
@Post('/worker/status/:id')
async getWorkersStatus(req: OrchestrationRequest.Get) {
if (!this.licenseService.isWorkerViewLicensed()) return;
const id = req.params.id;
return this.orchestrationService.getWorkerStatus(id);
}
@Get('/worker/status')
@Post('/worker/status')
async getWorkersStatusAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return this.orchestrationService.getWorkerStatus();
}
@Get('/worker/ids')
@Post('/worker/ids')
async getWorkerIdsAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return this.orchestrationService.getWorkerIds();
}
}

View File

@@ -175,6 +175,7 @@ export class FrontendService {
debugInEditor: false,
binaryDataS3: false,
workflowHistory: false,
workerView: false,
},
mfa: {
enabled: false,
@@ -263,6 +264,7 @@ export class FrontendService {
binaryDataS3: isS3Available && isS3Selected && isS3Licensed,
workflowHistory:
this.license.isWorkflowHistoryLicensed() && config.getEnv('workflowHistory.enabled'),
workerView: this.license.isWorkerViewLicensed(),
});
if (this.license.isLdapEnabled()) {
@@ -296,6 +298,8 @@ export class FrontendService {
this.settings.mfa.enabled = config.get('mfa.enabled');
this.settings.executionMode = config.getEnv('executions.mode');
return this.settings;
}

View File

@@ -1,15 +1,27 @@
import { jsonParse } from 'n8n-workflow';
import Container from 'typedi';
import { Logger } from '@/Logger';
import { Push } from '../../../push';
import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands';
export async function handleWorkerResponseMessageMain(messageString: string) {
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(messageString);
if (workerResponse) {
// TODO: Handle worker response
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
switch (workerResponse.command) {
case 'getStatus':
const push = Container.get(Push);
push.broadcast('sendWorkerStatusMessage', {
workerId: workerResponse.workerId,
status: workerResponse.payload,
});
break;
case 'getId':
break;
default:
Container.get(Logger).debug(
`Received worker response ${workerResponse.command} from ${workerResponse.workerId}`,
);
}
}
return workerResponse;
}

View File

@@ -9,6 +9,7 @@ import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager
import { debounceMessageReceiver, getOsCpuString } from '../helpers';
import type { WorkerCommandReceivedHandlerOptions } from './types';
import { Logger } from '@/Logger';
import { N8N_VERSION } from '@/constants';
export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) {
return async (channel: string, messageString: string) => {
@@ -33,13 +34,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
}
switch (message.command) {
case 'getStatus':
if (!debounceMessageReceiver(message, 200)) return;
if (!debounceMessageReceiver(message, 500)) return;
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
command: 'getStatus',
payload: {
workerId: options.queueModeId,
runningJobs: options.getRunningJobIds(),
runningJobsSummary: options.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
@@ -49,27 +49,32 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
net: Object.values(os.networkInterfaces()).flatMap(
interfaces: Object.values(os.networkInterfaces()).flatMap(
(interfaces) =>
interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '',
(interfaces ?? [])?.map((net) => ({
family: net.family,
address: net.address,
internal: net.internal,
})),
),
version: N8N_VERSION,
},
});
break;
case 'getId':
if (!debounceMessageReceiver(message, 200)) return;
if (!debounceMessageReceiver(message, 500)) return;
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
command: 'getId',
});
break;
case 'restartEventBus':
if (!debounceMessageReceiver(message, 100)) return;
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(MessageEventBus).restart();
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
command: 'restartEventBus',
payload: {
result: 'success',
},
@@ -77,7 +82,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
} catch (error) {
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
command: 'restartEventBus',
payload: {
result: 'error',
error: (error as Error).message,
@@ -86,12 +91,12 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
}
break;
case 'reloadExternalSecretsProviders':
if (!debounceMessageReceiver(message, 200)) return;
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'success',
},
@@ -99,7 +104,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
} catch (error) {
await options.redisPublisher.publishToWorkerChannel({
workerId: options.queueModeId,
command: message.command,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'error',
error: (error as Error).message,

View File

@@ -1,4 +1,4 @@
import type { WorkerJobStatusSummary } from '../orchestration/worker/types';
import type { IPushDataWorkerStatusPayload } from '@/Interfaces';
export type RedisServiceCommand =
| 'getStatus'
@@ -28,20 +28,7 @@ export type RedisServiceWorkerResponseObject = {
| RedisServiceBaseCommand
| {
command: 'getStatus';
payload: {
workerId: string;
runningJobs: string[];
runningJobsSummary: WorkerJobStatusSummary[];
freeMem: number;
totalMem: number;
uptime: number;
loadAvg: number[];
cpus: string;
arch: string;
platform: NodeJS.Platform;
hostname: string;
net: string[];
};
payload: IPushDataWorkerStatusPayload;
}
| {
command: 'getId';