fix(core): Remove threads pkg, rewrite log writer worker (#5134)

This commit is contained in:
Michael Auerswald
2023-01-13 15:39:25 +01:00
committed by GitHub
parent b7faf4a0df
commit e845eb33f9
15 changed files with 287 additions and 286 deletions

View File

@@ -3,9 +3,8 @@
import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage';
import { UserSettings } from 'n8n-core';
import path, { parse } from 'path';
import { ModuleThread, spawn, Thread, Worker } from 'threads';
import { MessageEventBusLogWriterWorker } from './MessageEventBusLogWriterWorker';
import { createReadStream, existsSync } from 'fs';
import { Worker } from 'worker_threads';
import { createReadStream, existsSync, rmSync } from 'fs';
import readline from 'readline';
import { jsonParse, LoggerProxy } from 'n8n-workflow';
import remove from 'lodash.remove';
@@ -19,15 +18,21 @@ import {
isEventMessageConfirm,
} from '../EventMessageClasses/EventMessageConfirm';
import { once as eventOnce } from 'events';
import { inTest } from '../../constants';
interface MessageEventBusLogWriterOptions {
syncFileAccess?: boolean;
interface MessageEventBusLogWriterConstructorOptions {
logBaseName?: string;
logBasePath?: string;
keepLogCount?: number;
keepNumberOfFiles?: number;
maxFileSizeInKB?: number;
}
export interface MessageEventBusLogWriterOptions {
logFullBasePath: string;
keepNumberOfFiles: number;
maxFileSizeInKB: number;
}
interface ReadMessagesFromLogFileResult {
loggedMessages: EventMessageTypes[];
sentMessages: EventMessageTypes[];
@@ -42,7 +47,11 @@ export class MessageEventBusLogWriter {
static options: Required<MessageEventBusLogWriterOptions>;
private worker: ModuleThread<MessageEventBusLogWriterWorker> | null;
private _worker: Worker | undefined;
public get worker(): Worker | undefined {
return this._worker;
}
/**
* Instantiates the Writer and the corresponding worker thread.
@@ -51,16 +60,17 @@ export class MessageEventBusLogWriter {
* **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging()
*/
static async getInstance(
options?: MessageEventBusLogWriterOptions,
options?: MessageEventBusLogWriterConstructorOptions,
): Promise<MessageEventBusLogWriter> {
if (!MessageEventBusLogWriter.instance) {
MessageEventBusLogWriter.instance = new MessageEventBusLogWriter();
MessageEventBusLogWriter.options = {
logBaseName: options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'),
logBasePath: options?.logBasePath ?? UserSettings.getUserN8nFolderPath(),
syncFileAccess:
options?.syncFileAccess ?? config.getEnv('eventBus.logWriter.syncFileAccess'),
keepLogCount: options?.keepLogCount ?? config.getEnv('eventBus.logWriter.keepLogCount'),
logFullBasePath: path.join(
options?.logBasePath ?? UserSettings.getUserN8nFolderPath(),
options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'),
),
keepNumberOfFiles:
options?.keepNumberOfFiles ?? config.getEnv('eventBus.logWriter.keepLogCount'),
maxFileSizeInKB:
options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'),
};
@@ -73,15 +83,19 @@ export class MessageEventBusLogWriter {
* First archives existing log files one history level upwards,
* then starts logging events into a fresh event log
*/
async startLogging() {
await MessageEventBusLogWriter.instance.getThread()?.startLogging();
startLogging() {
if (this.worker) {
this.worker.postMessage({ command: 'startLogging', data: {} });
}
}
/**
* Pauses all logging. Events are still received by the worker, they just are not logged any more
*/
async pauseLogging() {
await MessageEventBusLogWriter.instance.getThread()?.pauseLogging();
if (this.worker) {
this.worker.postMessage({ command: 'pauseLogging', data: {} });
}
}
private async startThread() {
@@ -89,26 +103,23 @@ export class MessageEventBusLogWriter {
await this.close();
}
await MessageEventBusLogWriter.instance.spawnThread();
await MessageEventBusLogWriter.instance
.getThread()
?.initialize(
path.join(
MessageEventBusLogWriter.options.logBasePath,
MessageEventBusLogWriter.options.logBaseName,
),
MessageEventBusLogWriter.options.syncFileAccess,
MessageEventBusLogWriter.options.keepLogCount,
MessageEventBusLogWriter.options.maxFileSizeInKB,
);
if (this.worker) {
this.worker.postMessage({ command: 'initialize', data: MessageEventBusLogWriter.options });
}
}
private async spawnThread(): Promise<boolean> {
this.worker = await spawn<MessageEventBusLogWriterWorker>(
new Worker(`${parse(__filename).name}Worker`),
);
const parsedName = parse(__filename);
let workerFileName;
if (inTest) {
workerFileName = './dist/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.js';
} else {
workerFileName = path.join(parsedName.dir, `${parsedName.name}Worker${parsedName.ext}`);
}
this._worker = new Worker(workerFileName);
if (this.worker) {
Thread.errors(this.worker).subscribe(async (error) => {
LoggerProxy.error('Event Bus Log Writer thread error', error);
this.worker.on('messageerror', async (error) => {
LoggerProxy.error('Event Bus Log Writer thread error, attempting to restart...', error);
await MessageEventBusLogWriter.instance.startThread();
});
return true;
@@ -116,29 +127,25 @@ export class MessageEventBusLogWriter {
return false;
}
getThread(): ModuleThread<MessageEventBusLogWriterWorker> | undefined {
if (this.worker) {
return this.worker;
}
return;
}
async close(): Promise<void> {
if (this.worker) {
await Thread.terminate(this.worker);
this.worker = null;
await this.worker.terminate();
this._worker = undefined;
}
}
async putMessage(msg: EventMessageTypes): Promise<void> {
putMessage(msg: EventMessageTypes): void {
if (this.worker) {
await this.worker.appendMessageToLog(msg.serialize());
this.worker.postMessage({ command: 'appendMessageToLog', data: msg.serialize() });
}
}
async confirmMessageSent(msgId: string, source?: EventMessageConfirmSource): Promise<void> {
confirmMessageSent(msgId: string, source?: EventMessageConfirmSource): void {
if (this.worker) {
await this.worker.confirmMessageSent(new EventMessageConfirm(msgId, source).serialize());
this.worker.postMessage({
command: 'confirmMessageSent',
data: new EventMessageConfirm(msgId, source).serialize(),
});
}
}
@@ -155,7 +162,7 @@ export class MessageEventBusLogWriter {
? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory)
: (config.get('eventBus.logWriter.keepLogCount') as number);
for (let i = logCount; i >= 0; i--) {
const logFileName = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(i);
const logFileName = this.getLogFileName(i);
if (logFileName) {
await this.readLoggedMessagesFromFile(results, mode, logFileName);
}
@@ -212,6 +219,22 @@ export class MessageEventBusLogWriter {
return results;
}
getLogFileName(counter?: number): string {
if (counter) {
return `${MessageEventBusLogWriter.options.logFullBasePath}-${counter}.log`;
} else {
return `${MessageEventBusLogWriter.options.logFullBasePath}.log`;
}
}
cleanAllLogs() {
for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) {
if (existsSync(this.getLogFileName(i))) {
rmSync(this.getLogFileName(i));
}
}
}
async getMessagesByExecutionId(
executionId: string,
logHistory?: number,
@@ -221,7 +244,7 @@ export class MessageEventBusLogWriter {
? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory)
: (config.get('eventBus.logWriter.keepLogCount') as number);
for (let i = 0; i < logCount; i++) {
const logFileName = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(i);
const logFileName = this.getLogFileName(i);
if (logFileName) {
result.push(...(await this.readFromFileByExecutionId(executionId, logFileName)));
}