diff --git a/packages/cli/src/events/WorkflowStatistics.ts b/packages/cli/src/events/WorkflowStatistics.ts index 006dc506d..21799fae6 100644 --- a/packages/cli/src/events/WorkflowStatistics.ts +++ b/packages/cli/src/events/WorkflowStatistics.ts @@ -19,21 +19,21 @@ async function upsertWorkflowStatistics( workflowId: string, ): Promise { const dbType = config.getEnv('database.type'); - const tablePrefix = config.getEnv('database.tablePrefix'); + const { tableName } = Db.collections.WorkflowStatistics.metadata; try { if (dbType === 'sqlite') { - await Db.collections.WorkflowStatistics - .query(`INSERT INTO "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent") - VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP) - ON CONFLICT (workflowId, name) DO UPDATE SET - count = count + 1, - latestEvent = CURRENT_TIMESTAMP returning count - `); + await Db.collections.WorkflowStatistics.query( + `INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent") + VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP) + ON CONFLICT (workflowId, name) + DO UPDATE SET count = count + 1, latestEvent = CURRENT_TIMESTAMP`, + ); // SQLite does not offer a reliable way to know whether or not an insert or update happened. // We'll use a naive approach in this case. Query again after and it might cause us to miss the // first production execution sometimes due to concurrency, but it's the only way. const counter = await Db.collections.WorkflowStatistics.findOne({ + select: ['count'], where: { name: eventName, workflowId, @@ -45,10 +45,13 @@ async function upsertWorkflowStatistics( } return StatisticsUpsertResult.update; } else if (dbType === 'postgresdb') { - const queryResult = (await Db.collections.WorkflowStatistics - .query(`insert into "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent") - values (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) on conflict ("name", "workflowId") - do update set "count" = "${tablePrefix}workflow_statistics"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP returning *;`)) as Array<{ + const queryResult = (await Db.collections.WorkflowStatistics.query( + `INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent") + VALUES (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) + ON CONFLICT ("name", "workflowId") + DO UPDATE SET "count" = "${tableName}"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP + RETURNING *;`, + )) as Array<{ count: number; }>; if (queryResult[0].count === 1) { @@ -56,12 +59,12 @@ async function upsertWorkflowStatistics( } return StatisticsUpsertResult.update; } else { - const queryResult = (await Db.collections.WorkflowStatistics - .query(`insert into \`${tablePrefix}workflow_statistics\` (count, - latestEvent, - name, - workflowId) - values (1, NOW(), "${eventName}", "${workflowId}") ON DUPLICATE KEY UPDATE count = count + 1, latestEvent = NOW();`)) as { + const queryResult = (await Db.collections.WorkflowStatistics.query( + `INSERT INTO \`${tableName}\` (count, name, workflowId, latestEvent) + VALUES (1, "${eventName}", "${workflowId}", NOW()) + ON DUPLICATE KEY + UPDATE count = count + 1, latestEvent = NOW();`, + )) as { affectedRows: number; }; if (queryResult.affectedRows === 1) { @@ -71,7 +74,8 @@ async function upsertWorkflowStatistics( return StatisticsUpsertResult.update; } } catch (error) { - return StatisticsUpsertResult.failed; + if (error instanceof QueryFailedError) return StatisticsUpsertResult.failed; + throw error; } } diff --git a/packages/cli/test/unit/Events.test.ts b/packages/cli/test/unit/Events.test.ts index 80433466f..57792e96a 100644 --- a/packages/cli/test/unit/Events.test.ts +++ b/packages/cli/test/unit/Events.test.ts @@ -18,15 +18,14 @@ jest.mock('@/Db', () => { return { collections: { WorkflowStatistics: mock({ - findOne: jest.fn(() => ({ - count: 1, - })), + metadata: { tableName: 'workflow_statistics' }, }), }, }; }); describe('Events', () => { + const dbType = config.getEnv('database.type'); const fakeUser = Object.assign(new User(), { id: 'abcde-fghij' }); const internalHooks = mockInstance(InternalHooks); @@ -48,11 +47,28 @@ describe('Events', () => { }); beforeEach(() => { + if (dbType === 'sqlite') { + workflowStatisticsRepository.findOne.mockClear(); + } else { + workflowStatisticsRepository.query.mockClear(); + } + internalHooks.onFirstProductionWorkflowSuccess.mockClear(); internalHooks.onFirstWorkflowDataLoad.mockClear(); }); - afterEach(() => {}); + const mockDBCall = (count = 1) => { + if (dbType === 'sqlite') { + workflowStatisticsRepository.findOne.mockResolvedValueOnce( + mock({ count }), + ); + } else { + const result = dbType === 'postgresdb' ? [{ count }] : { affectedRows: count }; + workflowStatisticsRepository.query.mockImplementationOnce(async (query) => + query.startsWith('INSERT INTO') ? result : null, + ); + } + }; describe('workflowExecutionCompleted', () => { test('should create metrics for production successes', async () => { @@ -73,6 +89,8 @@ describe('Events', () => { mode: 'internal' as WorkflowExecuteMode, startedAt: new Date(), }; + mockDBCall(); + await workflowExecutionCompleted(workflow, runData); expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(1); expect(internalHooks.onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, { @@ -105,9 +123,6 @@ describe('Events', () => { test('should not send metrics for updated entries', async () => { // Call the function with a fail insert, ensure update is called *and* metrics aren't sent - workflowStatisticsRepository.findOne.mockImplementationOnce(() => ({ - count: 2, - })); const workflow = { id: '1', name: '', @@ -124,6 +139,7 @@ describe('Events', () => { mode: 'internal' as WorkflowExecuteMode, startedAt: new Date(), }; + mockDBCall(2); await workflowExecutionCompleted(workflow, runData); expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0); });