feat: Add global event bus (#4860)
* fix branch * fix deserialize, add filewriter * add catchAll eventGroup/Name * adding simple Redis sender and receiver to eventbus * remove native node threads * improve eventbus * refactor and simplify * more refactoring and syslog client * more refactor, improved endpoints and eventbus * remove local broker and receivers from mvp * destination de/serialization * create MessageEventBusDestinationEntity * db migrations, load destinations at startup * add delete destination endpoint * pnpm merge and circular import fix * delete destination fix * trigger log file shuffle after size reached * add environment variables for eventbus * reworking event messages * serialize to thread fix * some refactor and lint fixing * add emit to eventbus * cleanup and fix sending unsent * quicksave frontend trial * initial EventTree vue component * basic log streaming settings in vue * http request code merge * create destination settings modals * fix eventmessage options types * credentials are loaded * fix and clean up frontend code * move request code to axios * update lock file * merge fix * fix redis build * move destination interfaces into workflow pkg * revive sentry as destination * migration fixes and frontend cleanup * N8N-5777 / N8N-5789 N8N-5788 * N8N-5784 * N8N-5782 removed event levels * N8N-5790 sentry destination cleanup * N8N-5786 and refactoring * N8N-5809 and refactor/cleanup * UI fixes and anonymize renaming * N8N-5837 * N8N-5834 * fix no-items UI issues * remove card / settings label in modal * N8N-5842 fix * disable webhook auth for now and update ui * change sidebar to tabs * remove payload option * extend audit events with more user data * N8N-5853 and UI revert to sidebar * remove redis destination * N8N-5864 / N8N-5868 / N8N-5867 / N8N-5865 * ui and licensing fixes * add node events and info bubbles to frontend * ui wording changes * frontend tests * N8N-5896 and ee rename * improves backend tests * merge fix * fix backend test * make linter happy * remove unnecessary cfg / limit actions to owners * fix multiple sentry DSN and anon bug * eslint fix * more tests and fixes * merge fix * fix workflow audit events * remove 'n8n.workflow.execution.error' event * merge fix * lint fix * lint fix * review fixes * fix merge * prettier fixes * merge * review changes * use loggerproxy * remove catch from internal hook promises * fix tests * lint fix * include review PR changes * review changes * delete duplicate lines from a bad merge * decouple log-streaming UI options from public API * logstreaming -> log-streaming for consistency * do not make unnecessary api calls when log streaming is disabled * prevent sentryClient.close() from being called if init failed * fix the e2e test for log-streaming * review changes * cleanup * use `private` for one last private property * do not use node prefix package names.. just yet * remove unused import * fix the tests because there is a folder called `events`, tsc-alias is messing up all imports for native events module. https://github.com/justkey007/tsc-alias/issues/152 Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
committed by
GitHub
parent
0795cdb74c
commit
b67f803cbe
@@ -0,0 +1,221 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
import { isEventMessageOptions } from '../EventMessageClasses/AbstractEventMessage';
|
||||
import { UserSettings } from 'n8n-core';
|
||||
import path, { parse } from 'path';
|
||||
import { ModuleThread, spawn, Thread, Worker } from 'threads';
|
||||
import { MessageEventBusLogWriterWorker } from './MessageEventBusLogWriterWorker';
|
||||
import { createReadStream, existsSync } from 'fs';
|
||||
import readline from 'readline';
|
||||
import { jsonParse, LoggerProxy } from 'n8n-workflow';
|
||||
import remove from 'lodash.remove';
|
||||
import config from '@/config';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import type { EventMessageReturnMode } from '../MessageEventBus/MessageEventBus';
|
||||
import type { EventMessageTypes } from '../EventMessageClasses';
|
||||
import {
|
||||
EventMessageConfirm,
|
||||
EventMessageConfirmSource,
|
||||
isEventMessageConfirm,
|
||||
} from '../EventMessageClasses/EventMessageConfirm';
|
||||
import { once as eventOnce } from 'events';
|
||||
|
||||
interface MessageEventBusLogWriterOptions {
|
||||
syncFileAccess?: boolean;
|
||||
logBaseName?: string;
|
||||
logBasePath?: string;
|
||||
keepLogCount?: number;
|
||||
maxFileSizeInKB?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* MessageEventBusWriter for Files
|
||||
*/
|
||||
export class MessageEventBusLogWriter {
|
||||
private static instance: MessageEventBusLogWriter;
|
||||
|
||||
static options: Required<MessageEventBusLogWriterOptions>;
|
||||
|
||||
private worker: ModuleThread<MessageEventBusLogWriterWorker> | null;
|
||||
|
||||
/**
|
||||
* Instantiates the Writer and the corresponding worker thread.
|
||||
* To actually start logging, call startLogging() function on the instance.
|
||||
*
|
||||
* **Note** that starting to log will archive existing logs, so handle unsent events first before calling startLogging()
|
||||
*/
|
||||
static async getInstance(
|
||||
options?: MessageEventBusLogWriterOptions,
|
||||
): Promise<MessageEventBusLogWriter> {
|
||||
if (!MessageEventBusLogWriter.instance) {
|
||||
MessageEventBusLogWriter.instance = new MessageEventBusLogWriter();
|
||||
MessageEventBusLogWriter.options = {
|
||||
logBaseName: options?.logBaseName ?? config.getEnv('eventBus.logWriter.logBaseName'),
|
||||
logBasePath: options?.logBasePath ?? UserSettings.getUserN8nFolderPath(),
|
||||
syncFileAccess:
|
||||
options?.syncFileAccess ?? config.getEnv('eventBus.logWriter.syncFileAccess'),
|
||||
keepLogCount: options?.keepLogCount ?? config.getEnv('eventBus.logWriter.keepLogCount'),
|
||||
maxFileSizeInKB:
|
||||
options?.maxFileSizeInKB ?? config.getEnv('eventBus.logWriter.maxFileSizeInKB'),
|
||||
};
|
||||
await MessageEventBusLogWriter.instance.startThread();
|
||||
}
|
||||
return MessageEventBusLogWriter.instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* First archives existing log files one history level upwards,
|
||||
* then starts logging events into a fresh event log
|
||||
*/
|
||||
async startLogging() {
|
||||
await MessageEventBusLogWriter.instance.getThread()?.startLogging();
|
||||
}
|
||||
|
||||
/**
|
||||
* Pauses all logging. Events are still received by the worker, they just are not logged any more
|
||||
*/
|
||||
async pauseLogging() {
|
||||
await MessageEventBusLogWriter.instance.getThread()?.pauseLogging();
|
||||
}
|
||||
|
||||
private async startThread() {
|
||||
if (this.worker) {
|
||||
await this.close();
|
||||
}
|
||||
await MessageEventBusLogWriter.instance.spawnThread();
|
||||
await MessageEventBusLogWriter.instance
|
||||
.getThread()
|
||||
?.initialize(
|
||||
path.join(
|
||||
MessageEventBusLogWriter.options.logBasePath,
|
||||
MessageEventBusLogWriter.options.logBaseName,
|
||||
),
|
||||
MessageEventBusLogWriter.options.syncFileAccess,
|
||||
MessageEventBusLogWriter.options.keepLogCount,
|
||||
MessageEventBusLogWriter.options.maxFileSizeInKB,
|
||||
);
|
||||
}
|
||||
|
||||
private async spawnThread(): Promise<boolean> {
|
||||
this.worker = await spawn<MessageEventBusLogWriterWorker>(
|
||||
new Worker(`${parse(__filename).name}Worker`),
|
||||
);
|
||||
if (this.worker) {
|
||||
Thread.errors(this.worker).subscribe(async (error) => {
|
||||
LoggerProxy.error('Event Bus Log Writer thread error', error);
|
||||
await MessageEventBusLogWriter.instance.startThread();
|
||||
});
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
getThread(): ModuleThread<MessageEventBusLogWriterWorker> | undefined {
|
||||
if (this.worker) {
|
||||
return this.worker;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
if (this.worker) {
|
||||
await Thread.terminate(this.worker);
|
||||
this.worker = null;
|
||||
}
|
||||
}
|
||||
|
||||
async putMessage(msg: EventMessageTypes): Promise<void> {
|
||||
if (this.worker) {
|
||||
await this.worker.appendMessageToLog(msg.serialize());
|
||||
}
|
||||
}
|
||||
|
||||
async confirmMessageSent(msgId: string, source?: EventMessageConfirmSource): Promise<void> {
|
||||
if (this.worker) {
|
||||
await this.worker.confirmMessageSent(new EventMessageConfirm(msgId, source).serialize());
|
||||
}
|
||||
}
|
||||
|
||||
async getMessages(
|
||||
mode: EventMessageReturnMode = 'all',
|
||||
includePreviousLog = true,
|
||||
): Promise<EventMessageTypes[]> {
|
||||
const logFileName0 = await MessageEventBusLogWriter.instance.getThread()?.getLogFileName();
|
||||
const logFileName1 = includePreviousLog
|
||||
? await MessageEventBusLogWriter.instance.getThread()?.getLogFileName(1)
|
||||
: undefined;
|
||||
const results: {
|
||||
loggedMessages: EventMessageTypes[];
|
||||
sentMessages: EventMessageTypes[];
|
||||
} = {
|
||||
loggedMessages: [],
|
||||
sentMessages: [],
|
||||
};
|
||||
if (logFileName0) {
|
||||
await this.readLoggedMessagesFromFile(results, mode, logFileName0);
|
||||
}
|
||||
if (logFileName1) {
|
||||
await this.readLoggedMessagesFromFile(results, mode, logFileName1);
|
||||
}
|
||||
switch (mode) {
|
||||
case 'all':
|
||||
case 'unsent':
|
||||
return results.loggedMessages;
|
||||
case 'sent':
|
||||
return results.sentMessages;
|
||||
}
|
||||
}
|
||||
|
||||
async readLoggedMessagesFromFile(
|
||||
results: {
|
||||
loggedMessages: EventMessageTypes[];
|
||||
sentMessages: EventMessageTypes[];
|
||||
},
|
||||
mode: EventMessageReturnMode,
|
||||
logFileName: string,
|
||||
): Promise<{
|
||||
loggedMessages: EventMessageTypes[];
|
||||
sentMessages: EventMessageTypes[];
|
||||
}> {
|
||||
if (logFileName && existsSync(logFileName)) {
|
||||
try {
|
||||
const rl = readline.createInterface({
|
||||
input: createReadStream(logFileName),
|
||||
crlfDelay: Infinity,
|
||||
});
|
||||
rl.on('line', (line) => {
|
||||
try {
|
||||
const json = jsonParse(line);
|
||||
if (isEventMessageOptions(json) && json.__type !== undefined) {
|
||||
const msg = getEventMessageObjectByType(json);
|
||||
if (msg !== null) results.loggedMessages.push(msg);
|
||||
}
|
||||
if (isEventMessageConfirm(json) && mode !== 'all') {
|
||||
const removedMessage = remove(results.loggedMessages, (e) => e.id === json.confirm);
|
||||
if (mode === 'sent') {
|
||||
results.sentMessages.push(...removedMessage);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
LoggerProxy.error(
|
||||
`Error reading line messages from file: ${logFileName}, line: ${line}`,
|
||||
);
|
||||
}
|
||||
});
|
||||
// wait for stream to finish before continue
|
||||
await eventOnce(rl, 'close');
|
||||
} catch {
|
||||
LoggerProxy.error(`Error reading logged messages from file: ${logFileName}`);
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
async getMessagesSent(): Promise<EventMessageTypes[]> {
|
||||
return this.getMessages('sent');
|
||||
}
|
||||
|
||||
async getMessagesUnsent(): Promise<EventMessageTypes[]> {
|
||||
return this.getMessages('unsent');
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,145 @@
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
import { appendFileSync, existsSync, rmSync, renameSync, openSync, closeSync } from 'fs';
|
||||
import { appendFile, stat } from 'fs/promises';
|
||||
import { expose, isWorkerRuntime } from 'threads/worker';
|
||||
|
||||
// -----------------------------------------
|
||||
// * This part runs in the Worker Thread ! *
|
||||
// -----------------------------------------
|
||||
|
||||
// all references to and imports from classes have been remove to keep memory usage low
|
||||
|
||||
let logFileBasePath = '';
|
||||
let loggingPaused = true;
|
||||
let syncFileAccess = false;
|
||||
let keepFiles = 10;
|
||||
let fileStatTimer: NodeJS.Timer;
|
||||
let maxLogFileSizeInKB = 102400;
|
||||
|
||||
function setLogFileBasePath(basePath: string) {
|
||||
logFileBasePath = basePath;
|
||||
}
|
||||
|
||||
function setUseSyncFileAccess(useSync: boolean) {
|
||||
syncFileAccess = useSync;
|
||||
}
|
||||
|
||||
function setMaxLogFileSizeInKB(maxSizeInKB: number) {
|
||||
maxLogFileSizeInKB = maxSizeInKB;
|
||||
}
|
||||
|
||||
function setKeepFiles(keepNumberOfFiles: number) {
|
||||
if (keepNumberOfFiles < 1) {
|
||||
keepNumberOfFiles = 1;
|
||||
}
|
||||
keepFiles = keepNumberOfFiles;
|
||||
}
|
||||
|
||||
function buildLogFileNameWithCounter(counter?: number): string {
|
||||
if (counter) {
|
||||
return `${logFileBasePath}-${counter}.log`;
|
||||
} else {
|
||||
return `${logFileBasePath}.log`;
|
||||
}
|
||||
}
|
||||
|
||||
function cleanAllLogs() {
|
||||
for (let i = 0; i <= keepFiles; i++) {
|
||||
if (existsSync(buildLogFileNameWithCounter(i))) {
|
||||
rmSync(buildLogFileNameWithCounter(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs synchronously and cycles through log files up to the max amount kept
|
||||
*/
|
||||
function renameAndCreateLogs() {
|
||||
if (existsSync(buildLogFileNameWithCounter(keepFiles))) {
|
||||
rmSync(buildLogFileNameWithCounter(keepFiles));
|
||||
}
|
||||
for (let i = keepFiles - 1; i >= 0; i--) {
|
||||
if (existsSync(buildLogFileNameWithCounter(i))) {
|
||||
renameSync(buildLogFileNameWithCounter(i), buildLogFileNameWithCounter(i + 1));
|
||||
}
|
||||
}
|
||||
const f = openSync(buildLogFileNameWithCounter(), 'a');
|
||||
closeSync(f);
|
||||
}
|
||||
|
||||
async function checkFileSize(path: string) {
|
||||
const fileStat = await stat(path);
|
||||
if (fileStat.size / 1024 > maxLogFileSizeInKB) {
|
||||
renameAndCreateLogs();
|
||||
}
|
||||
}
|
||||
|
||||
function appendMessageSync(msg: any) {
|
||||
if (loggingPaused) {
|
||||
return;
|
||||
}
|
||||
appendFileSync(buildLogFileNameWithCounter(), JSON.stringify(msg) + '\n');
|
||||
}
|
||||
|
||||
async function appendMessage(msg: any) {
|
||||
if (loggingPaused) {
|
||||
return;
|
||||
}
|
||||
await appendFile(buildLogFileNameWithCounter(), JSON.stringify(msg) + '\n');
|
||||
}
|
||||
|
||||
const messageEventBusLogWriterWorker = {
|
||||
async appendMessageToLog(msg: any) {
|
||||
if (syncFileAccess) {
|
||||
appendMessageSync(msg);
|
||||
} else {
|
||||
await appendMessage(msg);
|
||||
}
|
||||
},
|
||||
async confirmMessageSent(confirm: unknown) {
|
||||
if (syncFileAccess) {
|
||||
appendMessageSync(confirm);
|
||||
} else {
|
||||
await appendMessage(confirm);
|
||||
}
|
||||
},
|
||||
pauseLogging() {
|
||||
loggingPaused = true;
|
||||
clearInterval(fileStatTimer);
|
||||
},
|
||||
initialize(
|
||||
basePath: string,
|
||||
useSyncFileAccess = false,
|
||||
keepNumberOfFiles = 10,
|
||||
maxSizeInKB = 102400,
|
||||
) {
|
||||
setLogFileBasePath(basePath);
|
||||
setUseSyncFileAccess(useSyncFileAccess);
|
||||
setKeepFiles(keepNumberOfFiles);
|
||||
setMaxLogFileSizeInKB(maxSizeInKB);
|
||||
},
|
||||
startLogging() {
|
||||
if (logFileBasePath) {
|
||||
renameAndCreateLogs();
|
||||
loggingPaused = false;
|
||||
fileStatTimer = setInterval(async () => {
|
||||
await checkFileSize(buildLogFileNameWithCounter());
|
||||
}, 5000);
|
||||
}
|
||||
},
|
||||
getLogFileName(counter?: number) {
|
||||
if (logFileBasePath) {
|
||||
return buildLogFileNameWithCounter(counter);
|
||||
} else {
|
||||
return undefined;
|
||||
}
|
||||
},
|
||||
cleanLogs() {
|
||||
cleanAllLogs();
|
||||
},
|
||||
};
|
||||
if (isWorkerRuntime()) {
|
||||
// Register the serializer on the worker thread
|
||||
expose(messageEventBusLogWriterWorker);
|
||||
}
|
||||
export type MessageEventBusLogWriterWorker = typeof messageEventBusLogWriterWorker;
|
||||
Reference in New Issue
Block a user