fix(core): Fixes issue with workflow lastUpdated field (#5015)

Fixed issue causing workflow updated field to be affected by statistics data
This commit is contained in:
freya 2023-01-05 12:16:40 +00:00 committed by GitHub
parent 7954025eb2
commit 59004fe7bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 263 additions and 28 deletions

View file

@ -152,6 +152,7 @@ import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBu
import { getLicense } from '@/License';
import { licenseController } from './license/license.controller';
import { corsMiddleware } from './middlewares/cors';
import { initEvents } from './events';
import { AbstractServer } from './AbstractServer';
const exec = promisify(callbackExec);
@ -1448,6 +1449,9 @@ export async function start(): Promise<void> {
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
};
// Set up event handling
initEvents();
const workflow = await Db.collections.Workflow!.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },

View file

@ -169,15 +169,17 @@ workflowStatsController.get(
// Get flag
const workflowId = req.params.id;
// Get the corresponding workflow
const workflow = await Db.collections.Workflow.findOne(workflowId);
// It will be valid if we reach this point, this is just for TS
if (!workflow) {
return { dataLoaded: false };
}
// Get the flag
const stats = await Db.collections.WorkflowStatistics.findOne({
select: ['latestEvent'],
where: {
workflowId,
name: StatisticsNames.dataLoaded,
},
});
const data: IWorkflowStatisticsDataLoaded = {
dataLoaded: workflow.dataLoaded,
dataLoaded: stats ? true : false,
};
return data;

View file

@ -21,6 +21,7 @@ import { getLogger } from '@/Logger';
import config from '@/config';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';
export class Execute extends Command {
static description = '\nExecutes a given workflow';
@ -47,6 +48,9 @@ export class Execute extends Command {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
// Add event handlers
initEvents();
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Execute);

View file

@ -34,6 +34,7 @@ import config from '@/config';
import { User } from '@db/entities/User';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';
const re = /\d+/;
@ -197,6 +198,9 @@ export class ExecuteBatch extends Command {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExecuteBatch);
// Add event handlers
initEvents();
ExecuteBatch.debug = flags.debug;
ExecuteBatch.concurrency = flags.concurrency || 1;

View file

@ -86,9 +86,6 @@ export class WorkflowEntity extends AbstractEntity implements IWorkflowDb {
@JoinColumn({ referencedColumnName: 'workflow' })
statistics: WorkflowStatistics[];
@Column({ default: false })
dataLoaded: boolean;
@Column({
type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json',
nullable: true,

View file

@ -8,6 +8,7 @@ export enum StatisticsNames {
productionError = 'production_error',
manualSuccess = 'manual_success',
manualError = 'manual_error',
dataLoaded = 'data_loaded',
}
@Entity()

View file

@ -0,0 +1,66 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
export class RemoveWorkflowDataLoadedFlag1671726148420 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148420';
async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');
// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, dataLoaded
FROM ${tablePrefix}workflow_entity
`);
workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO ${tablePrefix}workflow_statistics (workflowId, name, count, latestEvent) VALUES
(:id, :name, 1, CURRENT_TIMESTAMP(3))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);
return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN dataLoaded`,
);
logMigrationEnd(this.name);
}
async down(queryRunner: QueryRunner) {
const tablePrefix = config.getEnv('database.tablePrefix');
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN dataLoaded BOOLEAN DEFAULT false`,
);
// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT workflowId
FROM ${tablePrefix}workflow_statistics
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE ${tablePrefix}workflow_entity
SET dataLoaded = true
WHERE id = '${workflowId}'`);
});
await queryRunner.query(
`DELETE FROM ${tablePrefix}workflow_statistics WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}

View file

@ -26,6 +26,7 @@ import { CreateCredentialUsageTable1665484192213 } from './1665484192213-CreateC
import { RemoveCredentialUsageTable1665754637026 } from './1665754637026-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707125 } from './1669739707125-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906994 } from './1669823906994-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148420 } from './1671726148420-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';
export const mysqlMigrations = [
@ -57,5 +58,6 @@ export const mysqlMigrations = [
AddWorkflowVersionIdColumn1669739707125,
WorkflowStatistics1664196174002,
AddTriggerCountColumn1669823906994,
RemoveWorkflowDataLoadedFlag1671726148420,
MessageEventBusDestinations1671535397530,
];

View file

@ -0,0 +1,64 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
export class RemoveWorkflowDataLoadedFlag1671726148421 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148421';
async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();
// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, "dataLoaded"
FROM ${tablePrefix}workflow_entity
`);
workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO ${tablePrefix}workflow_statistics ("workflowId", name, count, "latestEvent") VALUES
(:id, :name, 1, CURRENT_TIMESTAMP(3))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);
return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN "dataLoaded"`);
logMigrationEnd(this.name);
}
async down(queryRunner: QueryRunner) {
const tablePrefix = getTablePrefix();
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);
// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT "workflowId"
FROM ${tablePrefix}workflow_statistics
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE ${tablePrefix}workflow_entity
SET "dataLoaded" = true
WHERE id = '${workflowId}'`);
});
await queryRunner.query(
`DELETE FROM ${tablePrefix}workflow_statistics WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}

View file

@ -24,6 +24,7 @@ import { CreateCredentialUsageTable1665484192212 } from './1665484192212-CreateC
import { RemoveCredentialUsageTable1665754637025 } from './1665754637025-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707126 } from './1669739707126-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906995 } from './1669823906995-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148421 } from './1671726148421-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';
export const postgresMigrations = [
@ -53,5 +54,6 @@ export const postgresMigrations = [
AddWorkflowVersionIdColumn1669739707126,
WorkflowStatistics1664196174001,
AddTriggerCountColumn1669823906995,
RemoveWorkflowDataLoadedFlag1671726148421,
MessageEventBusDestinations1671535397530,
];

View file

@ -0,0 +1,67 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { v4 as uuidv4 } from 'uuid';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
export class RemoveWorkflowDataLoadedFlag1671726148419 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148419';
async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');
// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, dataLoaded
FROM "${tablePrefix}workflow_entity"
`);
workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO "${tablePrefix}workflow_statistics" (workflowId, name, count, latestEvent) VALUES
(:id, :name, 1, STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);
return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` DROP COLUMN "dataLoaded"`,
);
logMigrationEnd(this.name);
}
async down(queryRunner: QueryRunner) {
const tablePrefix = config.getEnv('database.tablePrefix');
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);
// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT workflowId
FROM "${tablePrefix}workflow_statistics"
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE "${tablePrefix}workflow_entity"
SET dataLoaded = true
WHERE id = '${workflowId}'`);
});
await queryRunner.query(
`DELETE FROM "${tablePrefix}workflow_statistics" WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}

View file

@ -23,6 +23,7 @@ import { CreateCredentialUsageTable1665484192211 } from './1665484192211-CreateC
import { RemoveCredentialUsageTable1665754637024 } from './1665754637024-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707124 } from './1669739707124-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906993 } from './1669823906993-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148419 } from './1671726148419-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';
const sqliteMigrations = [
@ -51,6 +52,7 @@ const sqliteMigrations = [
AddWorkflowVersionIdColumn1669739707124,
AddTriggerCountColumn1669823906993,
WorkflowStatistics1664196174000,
RemoveWorkflowDataLoadedFlag1671726148419,
MessageEventBusDestinations1671535397530,
];

View file

@ -1,8 +1,9 @@
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { QueryFailedError } from 'typeorm';
export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
@ -47,7 +48,10 @@ export async function workflowExecutionCompleted(
// Send the metrics
await InternalHooksManager.getInstance().onFirstProductionWorkflowSuccess(metrics);
} catch (error) {
// Do we just assume it's a conflict error? If there is any other sort of error in the DB it should trigger here too
if (!(error instanceof QueryFailedError)) {
throw error;
}
await Db.collections.WorkflowStatistics.update(
{ workflowId, name },
{ count: () => 'count + 1', latestEvent: new Date() },
@ -56,16 +60,24 @@ export async function workflowExecutionCompleted(
}
export async function nodeFetchedData(workflowId: string, node: INode): Promise<void> {
// Update only if necessary
const response = await Db.collections.Workflow.update(
{ id: workflowId, dataLoaded: false },
{ dataLoaded: true },
);
// Try to insert the data loaded statistic
try {
await Db.collections.WorkflowStatistics.insert({
workflowId,
name: StatisticsNames.dataLoaded,
count: 1,
latestEvent: new Date(),
});
} catch (error) {
// if it's a duplicate key error then that's fine, otherwise throw the error
if (!(error instanceof QueryFailedError)) {
throw error;
}
// If it is a query failed error, we return
return;
}
// If response.affected is 1 then we know this was the first time data was loaded into the workflow; do posthog event here
if (!response.affected) return;
// Compile the metrics
// Compile the metrics since this was a new data loaded event
const owner = await getWorkflowOwner(workflowId);
let metrics = {
user_id: owner.id,

View file

@ -1,7 +1,12 @@
import { eventEmitter } from 'n8n-core';
import { nodeFetchedData, workflowExecutionCompleted } from './WorkflowStatistics';
// Check for undefined as during testing these functions end up undefined for some reason
if (nodeFetchedData) eventEmitter.on(eventEmitter.types.nodeFetchedData, nodeFetchedData);
if (workflowExecutionCompleted)
eventEmitter.on(eventEmitter.types.workflowExecutionCompleted, workflowExecutionCompleted);
export function initEvents() {
// Check for undefined as during testing these functions end up undefined for some reason
if (nodeFetchedData) {
eventEmitter.on(eventEmitter.types.nodeFetchedData, nodeFetchedData);
}
if (workflowExecutionCompleted) {
eventEmitter.on(eventEmitter.types.workflowExecutionCompleted, workflowExecutionCompleted);
}
}

View file

@ -25,8 +25,6 @@ import * as WebhookServer from './WebhookServer';
import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData';
import * as WorkflowHelpers from './WorkflowHelpers';
import './events';
export {
ActiveExecutions,
ActiveWorkflowRunner,

View file

@ -3,6 +3,8 @@ import { InternalHooksManager } from '@/InternalHooksManager';
import { nodeFetchedData, workflowExecutionCompleted } from '@/events/WorkflowStatistics';
import { LoggerProxy, WorkflowExecuteMode } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
import { QueryFailedError } from 'typeorm';
const FAKE_USER_ID = 'abcde-fghij';
@ -27,8 +29,11 @@ jest.mock('@/Db', () => {
}),
},
WorkflowStatistics: {
// Have made a tech debt ticket to refactor this test suite for later
insert: jest.fn(({ count, name, workflowId }) => {
if (workflowId === '-1') throw new Error('test error');
if (workflowId === '-1') throw new QueryFailedError('test error', [], '');
else if (name === StatisticsNames.dataLoaded && workflowId === '2')
throw new QueryFailedError('test error 2', [], '');
return null;
}),
update: jest.fn((...args) => {}),