refactor: Upsert workflow statistics to suppress unnecessary error messages (#5863)

Commit 2b06673b2e (parent bdcfcb2ef7)
Omar Ajoue, 2023-04-04 18:11:21 +02:00, committed by GitHub
2 changed files with 91 additions and 31 deletions

@@ -1,10 +1,79 @@
 import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
+import { LoggerProxy } from 'n8n-workflow';
 import * as Db from '@/Db';
 import { StatisticsNames } from '@db/entities/WorkflowStatistics';
 import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
 import { QueryFailedError } from 'typeorm';
 import { Container } from 'typedi';
 import { InternalHooks } from '@/InternalHooks';
+import config from '@/config';
+
+enum StatisticsUpsertResult {
+  insert = 'insert',
+  update = 'update',
+  failed = 'failed',
+}
+
+async function upsertWorkflowStatistics(
+  eventName: StatisticsNames,
+  workflowId: string,
+): Promise<StatisticsUpsertResult> {
+  const dbType = config.getEnv('database.type');
+  const tablePrefix = config.getEnv('database.tablePrefix');
+  try {
+    if (dbType === 'sqlite') {
+      await Db.collections.WorkflowStatistics
+        .query(`INSERT INTO "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent")
+          VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP)
+          ON CONFLICT (workflowId, name) DO UPDATE SET
+            count = count + 1,
+            latestEvent = CURRENT_TIMESTAMP returning count`);
+      // SQLite does not offer a reliable way to tell whether the statement
+      // inserted or updated, so we take a naive approach and query the row
+      // again afterwards. Under concurrency this can occasionally miss the
+      // first production execution, but it is the only option here.
+      const counter = await Db.collections.WorkflowStatistics.findOne({
+        where: {
+          name: eventName,
+          workflowId,
+        },
+      });
+      if (counter?.count === 1) {
+        return StatisticsUpsertResult.insert;
+      }
+      return StatisticsUpsertResult.update;
+    } else if (dbType === 'postgresdb') {
+      const queryResult = (await Db.collections.WorkflowStatistics
+        .query(`insert into "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent")
+          values (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP)
+          on conflict ("name", "workflowId")
+          do update set "count" = "${tablePrefix}workflow_statistics"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP
+          returning *;`)) as Array<{
+        count: number;
+      }>;
+      if (queryResult[0].count === 1) {
+        return StatisticsUpsertResult.insert;
+      }
+      return StatisticsUpsertResult.update;
+    } else {
+      const queryResult = (await Db.collections.WorkflowStatistics
+        .query(`insert into \`${tablePrefix}workflow_statistics\` (count, latestEvent, name, workflowId)
+          values (1, NOW(), "${eventName}", "${workflowId}")
+          ON DUPLICATE KEY UPDATE count = count + 1, latestEvent = NOW();`)) as {
+        affectedRows: number;
+      };
+      if (queryResult.affectedRows === 1) {
+        return StatisticsUpsertResult.insert;
+      }
+      // MySQL returns 2 affected rows on update
+      return StatisticsUpsertResult.update;
+    }
+  } catch (error) {
+    return StatisticsUpsertResult.failed;
+  }
+}
 
 export async function workflowExecutionCompleted(
   workflowData: IWorkflowBase,
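The Postgres and MySQL statements above interpolate eventName and workflowId directly into the SQL string. Both values originate inside n8n rather than from user input, but the same upsert can also be written with TypeORM's positional query parameters (Repository.query(sql, parameters) passes them through to the driver). A minimal sketch of the Postgres variant, not part of the commit, assuming the same tablePrefix and repository as above (the table name itself cannot be parameterized, so the prefix interpolation stays):

// Sketch only: parameterized form of the Postgres upsert above.
const rows = (await Db.collections.WorkflowStatistics.query(
  `INSERT INTO "${tablePrefix}workflow_statistics" ("count", "name", "workflowId", "latestEvent")
   VALUES (1, $1, $2, CURRENT_TIMESTAMP)
   ON CONFLICT ("name", "workflowId")
   DO UPDATE SET "count" = "${tablePrefix}workflow_statistics"."count" + 1,
                 "latestEvent" = CURRENT_TIMESTAMP
   RETURNING *;`,
  [eventName, workflowId],
)) as Array<{ count: number }>;
// rows[0].count === 1 exactly when this call inserted a new row.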
@@ -27,36 +96,23 @@ export async function workflowExecutionCompleted(
 	const workflowId = workflowData.id;
 	if (!workflowId) return;
 
-	// Try insertion and if it fails due to key conflicts then update the existing entry instead
 	try {
-		await Db.collections.WorkflowStatistics.insert({
-			count: 1,
-			name,
-			workflowId,
-			latestEvent: new Date(),
-		});
+		const upsertResult = await upsertWorkflowStatistics(name, workflowId);
 
-		// If we're here we can check if we're sending the first production success metric
-		if (name !== StatisticsNames.productionSuccess) return;
-
-		// Get the owner of the workflow so we can send the metric
-		const owner = await getWorkflowOwner(workflowId);
-		const metrics = {
-			user_id: owner.id,
-			workflow_id: workflowId,
-		};
+		if (
+			name === StatisticsNames.productionSuccess &&
+			upsertResult === StatisticsUpsertResult.insert
+		) {
+			const owner = await getWorkflowOwner(workflowId);
+			const metrics = {
+				user_id: owner.id,
+				workflow_id: workflowId,
+			};
 
-		// Send the metrics
-		await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics);
+			// Send the metrics
+			await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics);
+		}
 	} catch (error) {
-		if (!(error instanceof QueryFailedError)) {
-			throw error;
-		}
-
-		await Db.collections.WorkflowStatistics.update(
-			{ workflowId, name },
-			{ count: () => 'count + 1', latestEvent: new Date() },
-		);
+		LoggerProxy.verbose('Unable to fire first workflow success telemetry event');
 	}
 }
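After this change, the telemetry hook fires only when a production execution succeeds and the upsert created a brand-new row; every failure path collapses into a single verbose log line, which is what the commit title means by suppressing unnecessary error messages. The gate can be read as a small pure function. This sketch is illustrative, not part of the commit:

// Sketch only: the telemetry gate above, made explicit.
function shouldFireFirstProductionSuccess(
  name: StatisticsNames,
  upsertResult: StatisticsUpsertResult,
): boolean {
  // Fire once per workflow: only the very first successful production run
  // produces an insert; every later success is an update.
  return (
    name === StatisticsNames.productionSuccess &&
    upsertResult === StatisticsUpsertResult.insert
  );
}

Note that StatisticsUpsertResult.failed also falls through this gate silently, so a database error can at worst drop one telemetry event rather than surface as an error to the user.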

@@ -17,7 +17,11 @@ type WorkflowStatisticsRepository = Repository<WorkflowStatistics>;
 jest.mock('@/Db', () => {
 	return {
 		collections: {
-			WorkflowStatistics: mock<WorkflowStatisticsRepository>(),
+			WorkflowStatistics: mock<WorkflowStatisticsRepository>({
+				findOne: jest.fn(() => ({
+					count: 1,
+				})),
+			}),
 		},
 	};
 });
@@ -101,9 +105,9 @@ describe('Events', () => {
 	test('should not send metrics for updated entries', async () => {
 		// Call the function with a fail insert, ensure update is called *and* metrics aren't sent
-		workflowStatisticsRepository.insert.mockImplementationOnce(() => {
-			throw new QueryFailedError('invalid insert', [], '');
-		});
+		workflowStatisticsRepository.findOne.mockImplementationOnce(() => ({
+			count: 2,
+		}));
 		const workflow = {
 			id: '1',
 			name: '',
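With findOne mocked to return { count: 1 } by default, every test now exercises the insert path unless it overrides the mock, as this test does with count: 2 to simulate an already-existing row. A companion happy-path test might look like the sketch below; the runData fixture and the onFirstProductionWorkflowSuccess spy are assumed to follow the surrounding suite's conventions and are not taken from the diff:

// Sketch only: happy-path counterpart to the test above.
test('should send metrics for new entries', async () => {
  // The default findOne mock returns { count: 1 }, so the upsert is treated
  // as a first insert and the telemetry hook should fire exactly once.
  const workflow = {
    id: '1',
    name: '',
    active: false,
    createdAt: new Date(),
    updatedAt: new Date(),
    nodes: [],
    connections: {},
  };
  await workflowExecutionCompleted(workflow, runData); // hypothetical fixture
  expect(onFirstProductionWorkflowSuccess).toHaveBeenCalledTimes(1); // hypothetical spy
});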