fix(core): Fixes event msg confirmations if no subscribers present (#5118)

* adds ExecutionEvents view modal to ExecutionList

* fix time rendering and remove wf column

* checks for unfinished executions and fails them

* prevent re-setting stoppedAt for execution

* removing UI changes but keeping eventbus fixes

* remove comment
This commit is contained in:
Michael Auerswald
2023-01-11 14:09:09 +01:00
committed by GitHub
parent 044b153275
commit 62d06b1e6e
10 changed files with 272 additions and 70 deletions

View File

@@ -24,7 +24,7 @@ import {
eventMessageGenericDestinationTestEvent,
} from '../EventMessageClasses/EventMessageGeneric';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus;
@@ -89,14 +89,20 @@ class MessageEventBus extends EventEmitter {
// - cycle event logs and start the logging to a fresh file
// - retry sending events
LoggerProxy.debug('Checking for unsent event messages');
const unsentMessages = await this.getEventsUnsent();
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug(
`Start logging into ${
(await this.logWriter?.getThread()?.getLogFileName()) ?? 'unknown filename'
} `,
);
await this.logWriter?.startLogging();
await this.send(unsentMessages);
await this.send(unsentAndUnfinished.unsentMessages);
if (unsentAndUnfinished.unfinishedExecutions.size > 0) {
for (const executionId of unsentAndUnfinished.unfinishedExecutions) {
LoggerProxy.debug(`Found unfinished execution ${executionId} in event log(s)`);
}
}
// if configured, run this test every n ms
if (config.getEnv('eventBus.checkUnsentInterval') > 0) {
@@ -190,6 +196,15 @@ class MessageEventBus extends EventEmitter {
await this.logWriter?.confirmMessageSent(msg.id, source);
}
private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean {
for (const destinationName of Object.keys(this.destinations)) {
if (this.destinations[destinationName].hasSubscribedToEvent(msg)) {
return true;
}
}
return false;
}
private async emitMessage(msg: EventMessageTypes) {
// generic emit for external modules to capture events
// this is for internal use ONLY and not for use with custom destinations!
@@ -198,7 +213,11 @@ class MessageEventBus extends EventEmitter {
LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`);
// if there are no set up destinations, immediately mark the event as sent
if (!isLogStreamingEnabled() || Object.keys(this.destinations).length === 0) {
if (
!isLogStreamingEnabled() ||
Object.keys(this.destinations).length === 0 ||
!this.hasAnyDestinationSubscribedToEvent(msg)
) {
await this.confirmSent(msg, { id: '0', name: 'eventBus' });
} else {
for (const destinationName of Object.keys(this.destinations)) {
@@ -207,32 +226,50 @@ class MessageEventBus extends EventEmitter {
}
}
async getEvents(mode: EventMessageReturnMode = 'all'): Promise<EventMessageTypes[]> {
let queryResult: EventMessageTypes[];
switch (mode) {
case 'all':
queryResult = await this.logWriter?.getMessages();
break;
case 'sent':
queryResult = await this.logWriter?.getMessagesSent();
break;
case 'unsent':
queryResult = await this.logWriter?.getMessagesUnsent();
}
async getEventsAll(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesAll();
const filtered = uniqby(queryResult, 'id');
return filtered;
}
async getEventsSent(): Promise<EventMessageTypes[]> {
const sentMessages = await this.getEvents('sent');
return sentMessages;
const queryResult = await this.logWriter?.getMessagesSent();
const filtered = uniqby(queryResult, 'id');
return filtered;
}
async getEventsUnsent(): Promise<EventMessageTypes[]> {
const unSentMessages = await this.getEvents('unsent');
return unSentMessages;
const queryResult = await this.logWriter?.getMessagesUnsent();
const filtered = uniqby(queryResult, 'id');
return filtered;
}
async getUnfinishedExecutions(): Promise<Set<string>> {
const queryResult = await this.logWriter?.getUnfinishedExecutions();
return queryResult;
}
async getUnsentAndUnfinishedExecutions(): Promise<{
unsentMessages: EventMessageTypes[];
unfinishedExecutions: Set<string>;
}> {
const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions();
return queryResult;
}
/**
* This will pull all events for a given execution id from the event log files. Note that this can be a very expensive operation, depending on the number of events and the size of the log files.
* @param executionId id to look for
* @param logHistory defaults to 1, which means it will look at the current log file AND the previous one.
* @returns Array of EventMessageTypes
*/
async getEventsByExecutionId(
executionId: string,
logHistory?: number,
): Promise<EventMessageTypes[]> {
const result = await this.logWriter?.getMessagesByExecutionId(executionId, logHistory);
return result;
}
/**
* Convenience Methods
*/