refactor: Telemetry updates (#3529)
* Init unit tests for telemetry * Update telemetry tests * Test Workflow execution errored event * Add new tracking logic in pulse * cleanup * interfaces * Add event_version for Workflow execution count event * add version_cli in all events * add user saved credentials event * update manual wf exec finished, fixes * improve typings, lint * add node_graph_string in User clicked execute workflow button event * add User set node operation or mode event * Add instance started event in FE * Add User clicked retry execution button event * add expression editor event * add input node type to add node event * add User stopped workflow execution wvent * add error message in saved credential event * update stop execution event * add execution preflight event * Remove instance started even tfrom FE, add session started to FE,BE * improve typing * remove node_graph as property from all events * move back from default export * move psl npm package to cli package * cr * update webhook node domain logic * fix is_valid for User saved credentials event * fix Expression Editor variable selector event * add caused_by_credential in preflight event * undo webhook_domain * change node_type to full type * add webhook_domain property in manual execution event (#3680) * add webhook_domain property in manual execution event * lint fix
This commit is contained in:
@@ -13,6 +13,7 @@ import {
|
||||
IRunExecutionData,
|
||||
ITaskData,
|
||||
ITelemetrySettings,
|
||||
ITelemetryTrackProperties,
|
||||
IWorkflowBase as IWorkflowBaseWorkflow,
|
||||
Workflow,
|
||||
WorkflowExecuteMode,
|
||||
@@ -667,3 +668,14 @@ export interface IWorkflowExecuteProcess {
|
||||
}
|
||||
|
||||
export type WhereClause = Record<string, { id: string }>;
|
||||
|
||||
// ----------------------------------
|
||||
// telemetry
|
||||
// ----------------------------------
|
||||
|
||||
export interface IExecutionTrackProperties extends ITelemetryTrackProperties {
|
||||
workflow_id: string;
|
||||
success: boolean;
|
||||
error_node_type?: string;
|
||||
is_manual: boolean;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
/* eslint-disable import/no-cycle */
|
||||
import { get as pslGet } from 'psl';
|
||||
import { BinaryDataManager } from 'n8n-core';
|
||||
import { IDataObject, INodeTypes, IRun, TelemetryHelpers } from 'n8n-workflow';
|
||||
import {
|
||||
INodesGraphResult,
|
||||
INodeTypes,
|
||||
IRun,
|
||||
ITelemetryTrackProperties,
|
||||
TelemetryHelpers,
|
||||
} from 'n8n-workflow';
|
||||
import { snakeCase } from 'change-case';
|
||||
import {
|
||||
IDiagnosticInfo,
|
||||
@@ -10,6 +17,7 @@ import {
|
||||
IWorkflowDb,
|
||||
} from '.';
|
||||
import { Telemetry } from './telemetry';
|
||||
import { IExecutionTrackProperties } from './Interfaces';
|
||||
|
||||
export class InternalHooksClass implements IInternalHooksClass {
|
||||
private versionCli: string;
|
||||
@@ -48,6 +56,10 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
]);
|
||||
}
|
||||
|
||||
async onFrontendSettingsAPI(sessionId?: string): Promise<void> {
|
||||
return this.telemetry.track('Session started', { session_id: sessionId });
|
||||
}
|
||||
|
||||
async onPersonalizationSurveySubmitted(
|
||||
userId: string,
|
||||
answers: Record<string, string>,
|
||||
@@ -73,7 +85,6 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
return this.telemetry.track('User created workflow', {
|
||||
user_id: userId,
|
||||
workflow_id: workflow.id,
|
||||
node_graph: nodeGraph,
|
||||
node_graph_string: JSON.stringify(nodeGraph),
|
||||
public_api: publicApi,
|
||||
});
|
||||
@@ -98,7 +109,6 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
return this.telemetry.track('User saved workflow', {
|
||||
user_id: userId,
|
||||
workflow_id: workflow.id,
|
||||
node_graph: nodeGraph,
|
||||
node_graph_string: JSON.stringify(nodeGraph),
|
||||
notes_count_overlapping: overlappingCount,
|
||||
notes_count_non_overlapping: notesCount - overlappingCount,
|
||||
@@ -115,10 +125,16 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
userId?: string,
|
||||
): Promise<void> {
|
||||
const promises = [Promise.resolve()];
|
||||
const properties: IDataObject = {
|
||||
workflow_id: workflow.id,
|
||||
|
||||
if (!workflow.id) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
const properties: IExecutionTrackProperties = {
|
||||
workflow_id: workflow.id.toString(),
|
||||
is_manual: false,
|
||||
version_cli: this.versionCli,
|
||||
success: false,
|
||||
};
|
||||
|
||||
if (userId) {
|
||||
@@ -130,7 +146,7 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
properties.success = !!runData.finished;
|
||||
properties.is_manual = runData.mode === 'manual';
|
||||
|
||||
let nodeGraphResult;
|
||||
let nodeGraphResult: INodesGraphResult | null = null;
|
||||
|
||||
if (!properties.success && runData?.data.resultData.error) {
|
||||
properties.error_message = runData?.data.resultData.error.message;
|
||||
@@ -165,22 +181,19 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
|
||||
}
|
||||
|
||||
const manualExecEventProperties = {
|
||||
workflow_id: workflow.id,
|
||||
const manualExecEventProperties: ITelemetryTrackProperties = {
|
||||
workflow_id: workflow.id.toString(),
|
||||
status: properties.success ? 'success' : 'failed',
|
||||
error_message: properties.error_message,
|
||||
error_message: properties.error_message as string,
|
||||
error_node_type: properties.error_node_type,
|
||||
node_graph: properties.node_graph,
|
||||
node_graph_string: properties.node_graph_string,
|
||||
error_node_id: properties.error_node_id,
|
||||
node_graph_string: properties.node_graph_string as string,
|
||||
error_node_id: properties.error_node_id as string,
|
||||
webhook_domain: null,
|
||||
};
|
||||
|
||||
if (!manualExecEventProperties.node_graph) {
|
||||
if (!manualExecEventProperties.node_graph_string) {
|
||||
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
|
||||
manualExecEventProperties.node_graph = nodeGraphResult.nodeGraph;
|
||||
manualExecEventProperties.node_graph_string = JSON.stringify(
|
||||
manualExecEventProperties.node_graph,
|
||||
);
|
||||
manualExecEventProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
|
||||
}
|
||||
|
||||
if (runData.data.startData?.destinationNode) {
|
||||
@@ -195,6 +208,16 @@ export class InternalHooksClass implements IInternalHooksClass {
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
nodeGraphResult.webhookNodeNames.forEach((name: string) => {
|
||||
const execJson = runData.data.resultData.runData[name]?.[0]?.data?.main?.[0]?.[0]
|
||||
?.json as { headers?: { origin?: string } };
|
||||
if (execJson?.headers?.origin && execJson.headers.origin !== '') {
|
||||
manualExecEventProperties.webhook_domain = pslGet(
|
||||
execJson.headers.origin.replace(/^https?:\/\//, ''),
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
promises.push(
|
||||
this.telemetry.track('Manual workflow exec finished', manualExecEventProperties),
|
||||
);
|
||||
|
||||
@@ -2856,6 +2856,10 @@ class App {
|
||||
`/${this.restEndpoint}/settings`,
|
||||
ResponseHelper.send(
|
||||
async (req: express.Request, res: express.Response): Promise<IN8nUISettings> => {
|
||||
void InternalHooksManager.getInstance().onFrontendSettingsAPI(
|
||||
req.headers.sessionid as string,
|
||||
);
|
||||
|
||||
return this.getSettingsForFrontend();
|
||||
},
|
||||
),
|
||||
|
||||
@@ -2,37 +2,25 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
import TelemetryClient from '@rudderstack/rudder-sdk-node';
|
||||
import { IDataObject, LoggerProxy } from 'n8n-workflow';
|
||||
import { ITelemetryTrackProperties, LoggerProxy } from 'n8n-workflow';
|
||||
import * as config from '../../config';
|
||||
import { IExecutionTrackProperties } from '../Interfaces';
|
||||
import { getLogger } from '../Logger';
|
||||
|
||||
type CountBufferItemKey =
|
||||
| 'manual_success_count'
|
||||
| 'manual_error_count'
|
||||
| 'prod_success_count'
|
||||
| 'prod_error_count';
|
||||
type ExecutionTrackDataKey = 'manual_error' | 'manual_success' | 'prod_error' | 'prod_success';
|
||||
|
||||
type FirstExecutionItemKey =
|
||||
| 'first_manual_success'
|
||||
| 'first_manual_error'
|
||||
| 'first_prod_success'
|
||||
| 'first_prod_error';
|
||||
|
||||
type IExecutionCountsBufferItem = {
|
||||
[key in CountBufferItemKey]: number;
|
||||
};
|
||||
|
||||
interface IExecutionCountsBuffer {
|
||||
[workflowId: string]: IExecutionCountsBufferItem;
|
||||
interface IExecutionTrackData {
|
||||
count: number;
|
||||
first: Date;
|
||||
}
|
||||
|
||||
type IFirstExecutions = {
|
||||
[key in FirstExecutionItemKey]: Date | undefined;
|
||||
};
|
||||
|
||||
interface IExecutionsBuffer {
|
||||
counts: IExecutionCountsBuffer;
|
||||
firstExecutions: IFirstExecutions;
|
||||
[workflowId: string]: {
|
||||
manual_error?: IExecutionTrackData;
|
||||
manual_success?: IExecutionTrackData;
|
||||
prod_error?: IExecutionTrackData;
|
||||
prod_success?: IExecutionTrackData;
|
||||
};
|
||||
}
|
||||
|
||||
export class Telemetry {
|
||||
@@ -44,15 +32,7 @@ export class Telemetry {
|
||||
|
||||
private pulseIntervalReference: NodeJS.Timeout;
|
||||
|
||||
private executionCountsBuffer: IExecutionsBuffer = {
|
||||
counts: {},
|
||||
firstExecutions: {
|
||||
first_manual_error: undefined,
|
||||
first_manual_success: undefined,
|
||||
first_prod_error: undefined,
|
||||
first_prod_success: undefined,
|
||||
},
|
||||
};
|
||||
private executionCountsBuffer: IExecutionsBuffer = {};
|
||||
|
||||
constructor(instanceId: string, versionCli: string) {
|
||||
this.instanceId = instanceId;
|
||||
@@ -71,85 +51,70 @@ export class Telemetry {
|
||||
return;
|
||||
}
|
||||
|
||||
this.client = new TelemetryClient(key, url, { logLevel });
|
||||
this.client = this.createTelemetryClient(key, url, logLevel);
|
||||
|
||||
this.pulseIntervalReference = setInterval(async () => {
|
||||
void this.pulse();
|
||||
}, 6 * 60 * 60 * 1000); // every 6 hours
|
||||
this.startPulse();
|
||||
}
|
||||
}
|
||||
|
||||
private createTelemetryClient(
|
||||
key: string,
|
||||
url: string,
|
||||
logLevel: string,
|
||||
): TelemetryClient | undefined {
|
||||
return new TelemetryClient(key, url, { logLevel });
|
||||
}
|
||||
|
||||
private startPulse() {
|
||||
this.pulseIntervalReference = setInterval(async () => {
|
||||
void this.pulse();
|
||||
}, 6 * 60 * 60 * 1000); // every 6 hours
|
||||
}
|
||||
|
||||
private async pulse(): Promise<unknown> {
|
||||
if (!this.client) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
const allPromises = Object.keys(this.executionCountsBuffer.counts).map(async (workflowId) => {
|
||||
const allPromises = Object.keys(this.executionCountsBuffer).map(async (workflowId) => {
|
||||
const promise = this.track('Workflow execution count', {
|
||||
version_cli: this.versionCli,
|
||||
event_version: '2',
|
||||
workflow_id: workflowId,
|
||||
...this.executionCountsBuffer.counts[workflowId],
|
||||
...this.executionCountsBuffer.firstExecutions,
|
||||
...this.executionCountsBuffer[workflowId],
|
||||
});
|
||||
|
||||
this.executionCountsBuffer.counts[workflowId].manual_error_count = 0;
|
||||
this.executionCountsBuffer.counts[workflowId].manual_success_count = 0;
|
||||
this.executionCountsBuffer.counts[workflowId].prod_error_count = 0;
|
||||
this.executionCountsBuffer.counts[workflowId].prod_success_count = 0;
|
||||
|
||||
return promise;
|
||||
});
|
||||
|
||||
allPromises.push(this.track('pulse', { version_cli: this.versionCli }));
|
||||
this.executionCountsBuffer = {};
|
||||
allPromises.push(this.track('pulse'));
|
||||
return Promise.all(allPromises);
|
||||
}
|
||||
|
||||
async trackWorkflowExecution(properties: IDataObject): Promise<void> {
|
||||
async trackWorkflowExecution(properties: IExecutionTrackProperties): Promise<void> {
|
||||
if (this.client) {
|
||||
const workflowId = properties.workflow_id as string;
|
||||
this.executionCountsBuffer.counts[workflowId] = this.executionCountsBuffer.counts[
|
||||
workflowId
|
||||
] ?? {
|
||||
manual_error_count: 0,
|
||||
manual_success_count: 0,
|
||||
prod_error_count: 0,
|
||||
prod_success_count: 0,
|
||||
};
|
||||
const execTime = new Date();
|
||||
const workflowId = properties.workflow_id;
|
||||
|
||||
let countKey: CountBufferItemKey;
|
||||
let firstExecKey: FirstExecutionItemKey;
|
||||
this.executionCountsBuffer[workflowId] = this.executionCountsBuffer[workflowId] ?? {};
|
||||
|
||||
if (
|
||||
properties.success === false &&
|
||||
properties.error_node_type &&
|
||||
(properties.error_node_type as string).startsWith('n8n-nodes-base')
|
||||
) {
|
||||
// errored exec
|
||||
void this.track('Workflow execution errored', properties);
|
||||
const key: ExecutionTrackDataKey = `${properties.is_manual ? 'manual' : 'prod'}_${
|
||||
properties.success ? 'success' : 'error'
|
||||
}`;
|
||||
|
||||
if (properties.is_manual) {
|
||||
firstExecKey = 'first_manual_error';
|
||||
countKey = 'manual_error_count';
|
||||
} else {
|
||||
firstExecKey = 'first_prod_error';
|
||||
countKey = 'prod_error_count';
|
||||
}
|
||||
} else if (properties.is_manual) {
|
||||
countKey = 'manual_success_count';
|
||||
firstExecKey = 'first_manual_success';
|
||||
if (!this.executionCountsBuffer[workflowId][key]) {
|
||||
this.executionCountsBuffer[workflowId][key] = {
|
||||
count: 1,
|
||||
first: execTime,
|
||||
};
|
||||
} else {
|
||||
countKey = 'prod_success_count';
|
||||
firstExecKey = 'first_prod_success';
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
this.executionCountsBuffer[workflowId][key]!.count++;
|
||||
}
|
||||
|
||||
if (
|
||||
!this.executionCountsBuffer.firstExecutions[firstExecKey] &&
|
||||
this.executionCountsBuffer.counts[workflowId][countKey] === 0
|
||||
) {
|
||||
this.executionCountsBuffer.firstExecutions[firstExecKey] = new Date();
|
||||
if (!properties.success && properties.error_node_type?.startsWith('n8n-nodes-base')) {
|
||||
void this.track('Workflow execution errored', properties);
|
||||
}
|
||||
|
||||
this.executionCountsBuffer.counts[workflowId][countKey]++;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,7 +130,9 @@ export class Telemetry {
|
||||
});
|
||||
}
|
||||
|
||||
async identify(traits?: IDataObject): Promise<void> {
|
||||
async identify(traits?: {
|
||||
[key: string]: string | number | boolean | object | undefined | null;
|
||||
}): Promise<void> {
|
||||
return new Promise<void>((resolve) => {
|
||||
if (this.client) {
|
||||
this.client.identify(
|
||||
@@ -185,20 +152,22 @@ export class Telemetry {
|
||||
});
|
||||
}
|
||||
|
||||
async track(
|
||||
eventName: string,
|
||||
properties: { [key: string]: unknown; user_id?: string } = {},
|
||||
): Promise<void> {
|
||||
async track(eventName: string, properties: ITelemetryTrackProperties = {}): Promise<void> {
|
||||
return new Promise<void>((resolve) => {
|
||||
if (this.client) {
|
||||
const { user_id } = properties;
|
||||
Object.assign(properties, { instance_id: this.instanceId });
|
||||
const updatedProperties: ITelemetryTrackProperties = {
|
||||
...properties,
|
||||
instance_id: this.instanceId,
|
||||
version_cli: this.versionCli,
|
||||
};
|
||||
|
||||
this.client.track(
|
||||
{
|
||||
userId: `${this.instanceId}${user_id ? `#${user_id}` : ''}`,
|
||||
anonymousId: '000000000000',
|
||||
event: eventName,
|
||||
properties,
|
||||
properties: updatedProperties,
|
||||
},
|
||||
resolve,
|
||||
);
|
||||
@@ -207,4 +176,10 @@ export class Telemetry {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// test helpers
|
||||
|
||||
getCountsBuffer(): IExecutionsBuffer {
|
||||
return this.executionCountsBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user