refactor(core): Include AI events in log streaming relay (#10768)

This commit is contained in:
Iván Ovejero
2024-09-12 12:02:47 +02:00
committed by GitHub
parent 8240b2a142
commit c133a6ef89
15 changed files with 496 additions and 92 deletions

View File

@@ -1,18 +1,17 @@
import { VariablesService } from '@/environments/variables/variables.service.ee';
import { mockInstance } from '@test/mocking';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { getBase } from '@/workflow-execute-additional-data';
import Container from 'typedi';
import { CredentialsHelper } from '@/credentials-helper';
import { SecretsHelper } from '@/secrets-helpers';
import { EventService } from '@/events/event.service';
describe('WorkflowExecuteAdditionalData', () => {
const messageEventBus = mockInstance(MessageEventBus);
const variablesService = mockInstance(VariablesService);
variablesService.getAllCached.mockResolvedValue([]);
const credentialsHelper = mockInstance(CredentialsHelper);
const secretsHelper = mockInstance(SecretsHelper);
Container.set(MessageEventBus, messageEventBus);
const eventService = mockInstance(EventService);
Container.set(VariablesService, variablesService);
Container.set(CredentialsHelper, credentialsHelper);
Container.set(SecretsHelper, secretsHelper);
@@ -20,7 +19,7 @@ describe('WorkflowExecuteAdditionalData', () => {
test('logAiEvent should call MessageEventBus', async () => {
const additionalData = await getBase('user-id');
const eventName = 'n8n.ai.memory.get.messages';
const eventName = 'ai-messages-retrieved-from-memory';
const payload = {
msg: 'test message',
executionId: '123',
@@ -30,12 +29,9 @@ describe('WorkflowExecuteAdditionalData', () => {
nodeType: 'n8n-memory',
};
await additionalData.logAiEvent(eventName, payload);
additionalData.logAiEvent(eventName, payload);
expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledTimes(1);
expect(messageEventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName,
payload,
});
expect(eventService.emit).toHaveBeenCalledTimes(1);
expect(eventService.emit).toHaveBeenCalledWith(eventName, payload);
});
});

View File

@@ -1,5 +1,6 @@
import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message';
import type { EventNamesAiNodesType, JsonObject } from 'n8n-workflow';
import type { JsonObject } from 'n8n-workflow';
import type { EventNamesAiNodesType } from '.';
import { EventMessageTypeNames } from 'n8n-workflow';
import type { AbstractEventMessageOptions } from './abstract-event-message-options';
import type { AbstractEventPayload } from './abstract-event-payload';

View File

@@ -4,7 +4,25 @@ import type { EventMessageExecution } from './event-message-execution';
import type { EventMessageGeneric } from './event-message-generic';
import type { EventMessageNode } from './event-message-node';
import type { EventMessageWorkflow } from './event-message-workflow';
import { eventNamesAiNodes, type EventNamesAiNodesType } from 'n8n-workflow';
// Canonical log-streaming event names for AI node activity.
// These are the `eventName` values written to MessageEventBus destinations
// (see the relay methods mapping `ai-*` service events onto them).
// NOTE(review): array order is preserved as-is — consumers may iterate it
// for subscription lists, so do not reorder.
export const eventNamesAiNodes = [
'n8n.ai.memory.get.messages',
'n8n.ai.memory.added.message',
'n8n.ai.output.parser.parsed',
'n8n.ai.retriever.get.relevant.documents',
'n8n.ai.embeddings.embedded.document',
'n8n.ai.embeddings.embedded.query',
'n8n.ai.document.processed',
'n8n.ai.text.splitter.split',
'n8n.ai.tool.called',
'n8n.ai.vector.store.searched',
'n8n.ai.llm.generated',
'n8n.ai.llm.error',
'n8n.ai.vector.store.populated',
'n8n.ai.vector.store.updated',
] as const;
// Union of the literal names above, e.g. 'n8n.ai.tool.called'.
export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number];
export const eventNamesWorkflow = [
'n8n.workflow.started',

View File

@@ -945,4 +945,258 @@ describe('LogStreamingEventRelay', () => {
});
});
});
describe('AI events', () => {
it('should log on `ai-messages-retrieved-from-memory` event', () => {
const payload: RelayEventMap['ai-messages-retrieved-from-memory'] = {
msg: 'Hello, world!',
executionId: 'exec789',
nodeName: 'Memory',
workflowId: 'wf123',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.memory',
};
eventService.emit('ai-messages-retrieved-from-memory', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.memory.get.messages',
payload,
});
});
it('should log on `ai-message-added-to-memory` event', () => {
const payload: RelayEventMap['ai-message-added-to-memory'] = {
msg: 'Test',
executionId: 'exec456',
nodeName: 'Memory',
workflowId: 'wf789',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.memory',
};
eventService.emit('ai-message-added-to-memory', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.memory.added.message',
payload,
});
});
it('should log on `ai-output-parsed` event', () => {
const payload: RelayEventMap['ai-output-parsed'] = {
msg: 'Test',
executionId: 'exec123',
nodeName: 'Output Parser',
workflowId: 'wf456',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.outputParser',
};
eventService.emit('ai-output-parsed', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.output.parser.parsed',
payload,
});
});
it('should log on `ai-documents-retrieved` event', () => {
const payload: RelayEventMap['ai-documents-retrieved'] = {
msg: 'Test',
executionId: 'exec789',
nodeName: 'Retriever',
workflowId: 'wf123',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.retriever',
};
eventService.emit('ai-documents-retrieved', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.retriever.get.relevant.documents',
payload,
});
});
it('should log on `ai-document-embedded` event', () => {
const payload: RelayEventMap['ai-document-embedded'] = {
msg: 'Test',
executionId: 'exec456',
nodeName: 'Embeddings',
workflowId: 'wf789',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.embeddings',
};
eventService.emit('ai-document-embedded', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.embeddings.embedded.document',
payload,
});
});
it('should log on `ai-query-embedded` event', () => {
const payload: RelayEventMap['ai-query-embedded'] = {
msg: 'Test',
executionId: 'exec123',
nodeName: 'Embeddings',
workflowId: 'wf456',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.embeddings',
};
eventService.emit('ai-query-embedded', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.embeddings.embedded.query',
payload,
});
});
it('should log on `ai-document-processed` event', () => {
const payload: RelayEventMap['ai-document-processed'] = {
msg: 'Test',
executionId: 'exec789',
nodeName: 'Embeddings',
workflowId: 'wf789',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.embeddings',
};
eventService.emit('ai-document-processed', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.document.processed',
payload,
});
});
it('should log on `ai-text-split` event', () => {
const payload: RelayEventMap['ai-text-split'] = {
msg: 'Test',
executionId: 'exec456',
nodeName: 'Text Splitter',
workflowId: 'wf789',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.textSplitter',
};
eventService.emit('ai-text-split', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.text.splitter.split',
payload,
});
});
it('should log on `ai-tool-called` event', () => {
const payload: RelayEventMap['ai-tool-called'] = {
msg: 'Test',
executionId: 'exec123',
nodeName: 'Tool',
workflowId: 'wf456',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.tool',
};
eventService.emit('ai-tool-called', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.tool.called',
payload,
});
});
it('should log on `ai-vector-store-searched` event', () => {
const payload: RelayEventMap['ai-vector-store-searched'] = {
msg: 'Test',
executionId: 'exec789',
nodeName: 'Vector Store',
workflowId: 'wf123',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.vectorStore',
};
eventService.emit('ai-vector-store-searched', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.vector.store.searched',
payload,
});
});
it('should log on `ai-llm-generated-output` event', () => {
const payload: RelayEventMap['ai-llm-generated-output'] = {
msg: 'Test',
executionId: 'exec456',
nodeName: 'OpenAI',
workflowId: 'wf789',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.openai',
};
eventService.emit('ai-llm-generated-output', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.llm.generated',
payload,
});
});
it('should log on `ai-llm-errored` event', () => {
const payload: RelayEventMap['ai-llm-errored'] = {
msg: 'Test',
executionId: 'exec789',
nodeName: 'OpenAI',
workflowId: 'wf123',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.openai',
};
eventService.emit('ai-llm-errored', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.llm.error',
payload,
});
});
it('should log on `ai-vector-store-populated` event', () => {
const payload: RelayEventMap['ai-vector-store-populated'] = {
msg: 'Test',
executionId: 'exec456',
nodeName: 'Vector Store',
workflowId: 'wf789',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.vectorStore',
};
eventService.emit('ai-vector-store-populated', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.vector.store.populated',
payload,
});
});
it('should log on `ai-vector-store-updated` event', () => {
const payload: RelayEventMap['ai-vector-store-updated'] = {
msg: 'Test',
executionId: 'exec789',
nodeName: 'Vector Store',
workflowId: 'wf123',
workflowName: 'My Workflow',
nodeType: 'n8n-nodes-base.vectorStore',
};
eventService.emit('ai-vector-store-updated', payload);
expect(eventBus.sendAiNodeEvent).toHaveBeenCalledWith({
eventName: 'n8n.ai.vector.store.updated',
payload,
});
});
});
});

View File

@@ -0,0 +1,38 @@
/**
 * Shared payload carried by every `ai-*` event (see {@link AiEventMap}).
 */
export type AiEventPayload = {
// Human-readable message describing the AI event.
msg: string;
workflowName: string;
// Execution the event originated from.
executionId: string;
// Node that produced the event.
nodeName: string;
workflowId?: string;
// Fully-qualified node type, e.g. 'n8n-nodes-base.memory' — optional.
nodeType?: string;
};
/**
 * AI node events relayed through the event service.
 * Every event name maps to the same {@link AiEventPayload} shape.
 */
export type AiEventMap = Record<
	| 'ai-messages-retrieved-from-memory'
	| 'ai-message-added-to-memory'
	| 'ai-output-parsed'
	| 'ai-documents-retrieved'
	| 'ai-document-embedded'
	| 'ai-query-embedded'
	| 'ai-document-processed'
	| 'ai-text-split'
	| 'ai-tool-called'
	| 'ai-vector-store-searched'
	| 'ai-llm-generated-output'
	| 'ai-llm-errored'
	| 'ai-vector-store-populated'
	| 'ai-vector-store-updated',
	AiEventPayload
>;

View File

@@ -2,8 +2,9 @@ import { Service } from 'typedi';
import { TypedEmitter } from '@/typed-emitter';
import type { RelayEventMap } from './relay-event-map';
import type { QueueMetricsEventMap } from './queue-metrics-event-map';
import type { AiEventMap } from './ai-event-map';
// Fix: two conflicting `type EventMap` declarations were left side by side
// (a stale pre-change line next to the updated one) — a compile error.
// Keep the updated declaration: AiEventMap is intersected in so AI node
// events are part of the service's typed event surface (it is imported above).
type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap;

/** Central typed event emitter for the application. */
@Service()
export class EventService extends TypedEmitter<EventMap> {}

View File

@@ -46,6 +46,20 @@ export class LogStreamingEventRelay extends EventRelay {
'community-package-deleted': (event) => this.communityPackageDeleted(event),
'execution-throttled': (event) => this.executionThrottled(event),
'execution-started-during-bootup': (event) => this.executionStartedDuringBootup(event),
'ai-messages-retrieved-from-memory': (event) => this.aiMessagesRetrievedFromMemory(event),
'ai-message-added-to-memory': (event) => this.aiMessageAddedToMemory(event),
'ai-output-parsed': (event) => this.aiOutputParsed(event),
'ai-documents-retrieved': (event) => this.aiDocumentsRetrieved(event),
'ai-document-embedded': (event) => this.aiDocumentEmbedded(event),
'ai-query-embedded': (event) => this.aiQueryEmbedded(event),
'ai-document-processed': (event) => this.aiDocumentProcessed(event),
'ai-text-split': (event) => this.aiTextSplitIntoChunks(event),
'ai-tool-called': (event) => this.aiToolCalled(event),
'ai-vector-store-searched': (event) => this.aiVectorStoreSearched(event),
'ai-llm-generated-output': (event) => this.aiLlmGeneratedOutput(event),
'ai-llm-errored': (event) => this.aiLlmErrored(event),
'ai-vector-store-populated': (event) => this.aiVectorStorePopulated(event),
'ai-vector-store-updated': (event) => this.aiVectorStoreUpdated(event),
});
}
@@ -387,4 +401,108 @@ export class LogStreamingEventRelay extends EventRelay {
}
// #endregion
// #region AI
/** Log-streams `ai-messages-retrieved-from-memory` as `n8n.ai.memory.get.messages`. */
private aiMessagesRetrievedFromMemory(
	payload: RelayEventMap['ai-messages-retrieved-from-memory'],
) {
	const eventName = 'n8n.ai.memory.get.messages' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-message-added-to-memory` as `n8n.ai.memory.added.message`. */
private aiMessageAddedToMemory(payload: RelayEventMap['ai-message-added-to-memory']) {
	const eventName = 'n8n.ai.memory.added.message' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-output-parsed` as `n8n.ai.output.parser.parsed`. */
private aiOutputParsed(payload: RelayEventMap['ai-output-parsed']) {
	const eventName = 'n8n.ai.output.parser.parsed' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-documents-retrieved` as `n8n.ai.retriever.get.relevant.documents`. */
private aiDocumentsRetrieved(payload: RelayEventMap['ai-documents-retrieved']) {
	const eventName = 'n8n.ai.retriever.get.relevant.documents' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-document-embedded` as `n8n.ai.embeddings.embedded.document`. */
private aiDocumentEmbedded(payload: RelayEventMap['ai-document-embedded']) {
	const eventName = 'n8n.ai.embeddings.embedded.document' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-query-embedded` as `n8n.ai.embeddings.embedded.query`. */
private aiQueryEmbedded(payload: RelayEventMap['ai-query-embedded']) {
	const eventName = 'n8n.ai.embeddings.embedded.query' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-document-processed` as `n8n.ai.document.processed`. */
private aiDocumentProcessed(payload: RelayEventMap['ai-document-processed']) {
	const eventName = 'n8n.ai.document.processed' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-text-split` as `n8n.ai.text.splitter.split`. */
private aiTextSplitIntoChunks(payload: RelayEventMap['ai-text-split']) {
	const eventName = 'n8n.ai.text.splitter.split' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-tool-called` as `n8n.ai.tool.called`. */
private aiToolCalled(payload: RelayEventMap['ai-tool-called']) {
	const eventName = 'n8n.ai.tool.called' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-vector-store-searched` as `n8n.ai.vector.store.searched`. */
private aiVectorStoreSearched(payload: RelayEventMap['ai-vector-store-searched']) {
	const eventName = 'n8n.ai.vector.store.searched' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-llm-generated-output` as `n8n.ai.llm.generated`. */
private aiLlmGeneratedOutput(payload: RelayEventMap['ai-llm-generated-output']) {
	const eventName = 'n8n.ai.llm.generated' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-llm-errored` as `n8n.ai.llm.error`. */
private aiLlmErrored(payload: RelayEventMap['ai-llm-errored']) {
	const eventName = 'n8n.ai.llm.error' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-vector-store-populated` as `n8n.ai.vector.store.populated`. */
private aiVectorStorePopulated(payload: RelayEventMap['ai-vector-store-populated']) {
	const eventName = 'n8n.ai.vector.store.populated' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
/** Log-streams `ai-vector-store-updated` as `n8n.ai.vector.store.updated`. */
private aiVectorStoreUpdated(payload: RelayEventMap['ai-vector-store-updated']) {
	const eventName = 'n8n.ai.vector.store.updated' as const;
	void this.eventBus.sendAiNodeEvent({ eventName, payload });
}
// #endregion
}

View File

@@ -9,6 +9,7 @@ import type { IWorkflowDb } from '@/interfaces';
import type { ProjectRole } from '@/databases/entities/project-relation';
import type { GlobalRole } from '@/databases/entities/user';
import type { AuthProviderType } from '@/databases/entities/auth-identity';
import type { AiEventMap } from './ai-event-map';
export type UserLike = {
id: string;
@@ -470,4 +471,4 @@ export type RelayEventMap = {
};
// #endregion
};
} & AiEventMap;

View File

@@ -23,7 +23,6 @@ import type {
WorkflowExecuteMode,
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
ExecuteWorkflowOptions,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
@@ -69,7 +68,7 @@ import { WorkflowStaticDataService } from './workflows/workflow-static-data.serv
import { WorkflowRepository } from './databases/repositories/workflow.repository';
import { UrlService } from './services/url.service';
import { WorkflowExecutionService } from './workflows/workflow-execution.service';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import type { AiEventMap, AiEventPayload } from './events/ai-event-map';
import { EventService } from './events/event.service';
import { GlobalConfig } from '@n8n/config';
import { SubworkflowPolicyChecker } from './subworkflows/subworkflow-policy-checker.service';
@@ -969,6 +968,8 @@ export async function getBase(
const variables = await WorkflowHelpers.getVariables();
const eventService = Container.get(EventService);
return {
credentialsHelper: Container.get(CredentialsHelper),
executeWorkflow,
@@ -984,22 +985,8 @@ export async function getBase(
setExecutionStatus,
variables,
secretsHelpers: Container.get(SecretsHelper),
logAiEvent: async (
eventName: EventNamesAiNodesType,
payload: {
msg?: string | undefined;
executionId: string;
nodeName: string;
workflowId?: string | undefined;
workflowName: string;
nodeType?: string | undefined;
},
) => {
return await Container.get(MessageEventBus).sendAiNodeEvent({
eventName,
payload,
});
},
logAiEvent: (eventName: keyof AiEventMap, payload: AiEventPayload) =>
eventService.emit(eventName, payload),
};
}