feat(core): Workflow Execution Statistics (#4200)

Add recording and reporting of workflow execution statistics
This commit is contained in:
freya 2022-12-06 14:55:40 +00:00 committed by GitHub
parent b71295e4de
commit 1722c6b0c5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 908 additions and 66 deletions

View file

@ -165,7 +165,7 @@
"passport-jwt": "^4.0.0",
"pg": "^8.3.0",
"picocolors": "^1.0.0",
"posthog-node": "^1.3.0",
"posthog-node": "^2.2.2",
"prom-client": "^13.1.0",
"psl": "^1.8.0",
"replacestream": "^4.0.3",

View file

@ -178,6 +178,7 @@ export async function init(
collections.Settings = linkRepository(entities.Settings);
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
isInitialized = true;

View file

@ -39,6 +39,7 @@ import type { SharedWorkflow } from '@db/entities/SharedWorkflow';
import type { TagEntity } from '@db/entities/TagEntity';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
export interface IActivationError {
time: number;
@ -79,6 +80,7 @@ export interface IDatabaseCollections {
Settings: Repository<Settings>;
InstalledPackages: Repository<InstalledPackages>;
InstalledNodes: Repository<InstalledNodes>;
WorkflowStatistics: Repository<WorkflowStatistics>;
}
export interface IWebhookDb {
@ -691,6 +693,24 @@ export interface IWorkflowExecuteProcess {
workflowExecute: WorkflowExecute;
}
export interface IWorkflowStatisticsCounts {
productionSuccess: number;
productionError: number;
manualSuccess: number;
manualError: number;
}
export interface IWorkflowStatisticsDataLoaded {
dataLoaded: boolean;
}
export interface IWorkflowStatisticsTimestamps {
productionSuccess: Date | null;
productionError: Date | null;
manualSuccess: Date | null;
manualError: Date | null;
}
export type WhereClause = Record<string, { [key: string]: string | FindOperator<unknown> }>;
// ----------------------------------

View file

@ -477,4 +477,25 @@ export class InternalHooksClass implements IInternalHooksClass {
}): Promise<void> {
return this.telemetry.track('cnr package deleted', updateData);
}
/**
* Execution Statistics
*/
async onFirstProductionWorkflowSuccess(data: {
user_id: string;
workflow_id: string | number;
}): Promise<void> {
return this.telemetry.track('Workflow first prod success', data, { withPostHog: true });
}
async onFirstWorkflowDataLoad(data: {
user_id: string;
workflow_id: string | number;
node_type: string;
node_id: string;
credential_type?: string;
credential_id?: string;
}): Promise<void> {
return this.telemetry.track('Workflow first data fetched', data, { withPostHog: true });
}
}

View file

@ -112,6 +112,7 @@ import { resolveJwt } from '@/UserManagement/auth/jwt';
import { executionsController } from '@/executions/executions.controller';
import { nodeTypesController } from '@/api/nodeTypes.api';
import { tagsController } from '@/api/tags.api';
import { workflowStatsController } from '@/api/workflowStats.api';
import { loadPublicApiVersions } from '@/PublicApi';
import {
getInstanceBaseUrl,
@ -806,6 +807,11 @@ class App {
// ----------------------------------------
this.app.use(`/${this.restEndpoint}/workflows`, workflowsController);
// ----------------------------------------
// Workflow Statistics
// ----------------------------------------
this.app.use(`/${this.restEndpoint}/workflow-stats`, workflowStatsController);
// ----------------------------------------
// Tags
// ----------------------------------------

View file

@ -16,7 +16,7 @@
import express from 'express';
import get from 'lodash.get';
import { BINARY_ENCODING, BinaryDataManager, NodeExecuteFunctions } from 'n8n-core';
import { BINARY_ENCODING, BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core';
import {
createDeferredPromise,
@ -233,6 +233,7 @@ export async function executeWebhook(
NodeExecuteFunctions,
executionMode,
);
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflow.id, workflowStartNode);
} catch (err) {
// Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';

View file

@ -15,12 +15,13 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable func-names */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { BinaryDataManager, eventEmitter, UserSettings, WorkflowExecute } from 'n8n-core';
import {
IDataObject,
IExecuteData,
IExecuteWorkflowInfo,
INode,
INodeExecutionData,
INodeParameters,
IRun,
@ -648,9 +649,20 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.retryOf,
);
}
} finally {
eventEmitter.emit(
eventEmitter.types.workflowExecutionCompleted,
this.workflowData,
fullRunData,
);
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node);
},
],
};
}
@ -742,9 +754,20 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
this.executionId,
this.retryOf,
);
} finally {
eventEmitter.emit(
eventEmitter.types.workflowExecutionCompleted,
this.workflowData,
fullRunData,
);
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node);
},
],
};
}

View file

@ -16,6 +16,7 @@ import {
IExecuteResponsePromiseData,
IExecuteWorkflowInfo,
ILogger,
INode,
INodeExecutionData,
IRun,
ITaskData,
@ -396,6 +397,11 @@ class WorkflowRunnerProcess {
await this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]);
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
await this.sendHookToParentProcess('nodeFetchedData', [workflowId, node]);
},
],
};
const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();

View file

@ -0,0 +1,185 @@
import { User } from '@/databases/entities/User';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import express from 'express';
import { LoggerProxy } from 'n8n-workflow';
import {
Db,
IWorkflowStatisticsCounts,
IWorkflowStatisticsDataLoaded,
IWorkflowStatisticsTimestamps,
ResponseHelper,
} from '..';
import { StatisticsNames } from '../databases/entities/WorkflowStatistics';
import { getLogger } from '../Logger';
import { ExecutionRequest } from '../requests';
export const workflowStatsController = express.Router();
// Helper function that validates the ID, return a flag stating whether the request is allowed
async function checkWorkflowId(workflowId: string, user: User): Promise<boolean> {
// Check permissions
const shared = await Db.collections.SharedWorkflow.findOne({
relations: ['workflow'],
where: whereClause({
user,
entityType: 'workflow',
entityId: workflowId,
}),
});
if (!shared) {
LoggerProxy.info('User attempted to read a workflow without permissions', {
workflowId,
userId: user.id,
});
return false;
}
return true;
}
/**
* Initialise Logger if needed
*/
workflowStatsController.use((req, res, next) => {
try {
LoggerProxy.getInstance();
} catch (error) {
LoggerProxy.init(getLogger());
}
next();
});
/**
* Check that the workflow ID is valid and allowed to be read by the user
*/
workflowStatsController.use(async (req: ExecutionRequest.Get, res, next) => {
const allowed = await checkWorkflowId(req.params.id, req.user);
if (allowed) {
next();
} else {
// Otherwise, make and return an error
const response = new ResponseHelper.NotFoundError(`Workflow ${req.params.id} does not exist.`);
next(response);
}
});
/**
* GET /workflow-stats/:id/counts/
*/
workflowStatsController.get(
'/:id/counts/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsCounts> => {
// Get counts from DB
const workflowId = req.params.id;
// Find the stats for this workflow
const stats = await Db.collections.WorkflowStatistics.find({
select: ['count', 'name'],
where: {
workflowId,
},
});
const data: IWorkflowStatisticsCounts = {
productionSuccess: 0,
productionError: 0,
manualSuccess: 0,
manualError: 0,
};
// There will be a maximum of 4 stats (currently)
stats.forEach(({ count, name }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = count;
break;
case StatisticsNames.manualSuccess:
data.manualSuccess = count;
break;
case StatisticsNames.productionError:
data.productionError = count;
break;
case StatisticsNames.productionSuccess:
data.productionSuccess = count;
}
});
return data;
}),
);
/**
* GET /workflow-stats/:id/times/
*/
workflowStatsController.get(
'/:id/times/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsTimestamps> => {
// Get times from DB
const workflowId = req.params.id;
// Find the stats for this workflow
const stats = await Db.collections.WorkflowStatistics.find({
select: ['latestEvent', 'name'],
where: {
workflowId,
},
});
const data: IWorkflowStatisticsTimestamps = {
productionSuccess: null,
productionError: null,
manualSuccess: null,
manualError: null,
};
// There will be a maximum of 4 stats (currently)
stats.forEach(({ latestEvent, name }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = latestEvent;
break;
case StatisticsNames.manualSuccess:
data.manualSuccess = latestEvent;
break;
case StatisticsNames.productionError:
data.productionError = latestEvent;
break;
case StatisticsNames.productionSuccess:
data.productionSuccess = latestEvent;
}
});
return data;
}),
);
/**
* GET /workflow-stats/:id/data-loaded/
*/
workflowStatsController.get(
'/:id/data-loaded/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsDataLoaded> => {
// Get flag
const workflowId = req.params.id;
// Get the corresponding workflow
const workflow = await Db.collections.Workflow.findOne(workflowId);
// It will be valid if we reach this point, this is just for TS
if (!workflow) {
return { dataLoaded: false };
}
const data: IWorkflowStatisticsDataLoaded = {
dataLoaded: workflow.dataLoaded,
};
return data;
}),
);

View file

@ -13,6 +13,7 @@ import {
Column,
Entity,
Index,
JoinColumn,
JoinTable,
ManyToMany,
OneToMany,
@ -24,6 +25,7 @@ import { TagEntity } from './TagEntity';
import { SharedWorkflow } from './SharedWorkflow';
import { objectRetriever, sqlite } from '../utils/transformers';
import { AbstractEntity, jsonColumnType } from './AbstractEntity';
import { WorkflowStatistics } from './WorkflowStatistics';
import type { IWorkflowDb } from '@/Interfaces';
@Entity()
@ -78,6 +80,16 @@ export class WorkflowEntity extends AbstractEntity implements IWorkflowDb {
@OneToMany(() => SharedWorkflow, (sharedWorkflow) => sharedWorkflow.workflow)
shared: SharedWorkflow[];
@OneToMany(
() => WorkflowStatistics,
(workflowStatistics: WorkflowStatistics) => workflowStatistics.workflow,
)
@JoinColumn({ referencedColumnName: 'workflow' })
statistics: WorkflowStatistics[];
@Column({ default: false })
dataLoaded: boolean;
@Column({
type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json',
nullable: true,

View file

@ -0,0 +1,32 @@
import { Column, Entity, RelationId, ManyToOne, PrimaryColumn } from 'typeorm';
import { datetimeColumnType } from './AbstractEntity';
import { WorkflowEntity } from './WorkflowEntity';
export enum StatisticsNames {
productionSuccess = 'production_success',
productionError = 'production_error',
manualSuccess = 'manual_success',
manualError = 'manual_error',
}
@Entity()
export class WorkflowStatistics {
@Column()
count: number;
@Column(datetimeColumnType)
latestEvent: Date;
@PrimaryColumn({ length: 128 })
name: StatisticsNames;
@ManyToOne(() => WorkflowEntity, (workflow) => workflow.shared, {
primary: true,
onDelete: 'CASCADE',
})
workflow: WorkflowEntity;
@RelationId((workflowStatistics: WorkflowStatistics) => workflowStatistics.workflow)
@PrimaryColumn()
workflowId: number;
}

View file

@ -11,6 +11,7 @@ import { SharedWorkflow } from './SharedWorkflow';
import { SharedCredentials } from './SharedCredentials';
import { InstalledPackages } from './InstalledPackages';
import { InstalledNodes } from './InstalledNodes';
import { WorkflowStatistics } from './WorkflowStatistics';
export const entities = {
CredentialsEntity,
@ -25,4 +26,5 @@ export const entities = {
SharedCredentials,
InstalledPackages,
InstalledNodes,
WorkflowStatistics,
};

View file

@ -0,0 +1,38 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers';
import config from '@/config';
export class WorkflowStatistics1664196174002 implements MigrationInterface {
name = 'WorkflowStatistics1664196174002';
async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');
await queryRunner.query(
`CREATE TABLE ${tablePrefix}workflow_statistics (
count INTEGER DEFAULT 0,
latestEvent DATETIME,
name VARCHAR(128) NOT NULL,
workflowId INTEGER,
PRIMARY KEY(workflowId, name),
FOREIGN KEY(workflowId) REFERENCES ${tablePrefix}workflow_entity(id) ON DELETE CASCADE
)`,
);
// Add dataLoaded column to workflow table
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN dataLoaded BOOLEAN DEFAULT false`,
);
logMigrationEnd(this.name);
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.getEnv('database.tablePrefix');
await queryRunner.query(`DROP TABLE "${tablePrefix}workflow_statistics"`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN dataLoaded`);
}
}

View file

@ -20,6 +20,7 @@ import { IntroducePinData1654090101303 } from './1654090101303-IntroducePinData'
import { AddNodeIds1658932910559 } from './1658932910559-AddNodeIds';
import { AddJsonKeyPinData1659895550980 } from './1659895550980-AddJsonKeyPinData';
import { CreateCredentialsUserRole1660062385367 } from './1660062385367-CreateCredentialsUserRole';
import { WorkflowStatistics1664196174002 } from './1664196174002-WorkflowStatistics';
import { CreateWorkflowsEditorRole1663755770894 } from './1663755770894-CreateWorkflowsEditorRole';
import { CreateCredentialUsageTable1665484192213 } from './1665484192213-CreateCredentialUsageTable';
import { RemoveCredentialUsageTable1665754637026 } from './1665754637026-RemoveCredentialUsageTable';
@ -52,4 +53,5 @@ export const mysqlMigrations = [
CreateCredentialUsageTable1665484192213,
RemoveCredentialUsageTable1665754637026,
AddWorkflowVersionIdColumn1669739707125,
WorkflowStatistics1664196174002,
];

View file

@ -0,0 +1,42 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers';
import config from '@/config';
export class WorkflowStatistics1664196174001 implements MigrationInterface {
name = 'WorkflowStatistics1664196174001';
async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();
// Create statistics table
await queryRunner.query(
`CREATE TABLE ${tablePrefix}workflow_statistics (
"count" INTEGER DEFAULT 0,
"latestEvent" TIMESTAMP,
"name" VARCHAR(128) NOT NULL,
"workflowId" INTEGER,
PRIMARY KEY("workflowId", "name"),
FOREIGN KEY("workflowId") REFERENCES ${tablePrefix}workflow_entity("id") ON DELETE CASCADE
)`,
);
// Add dataLoaded column to workflow table
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false;`,
);
logMigrationEnd(this.name);
}
async down(queryRunner: QueryRunner): Promise<void> {
let tablePrefix = config.getEnv('database.tablePrefix');
const schema = config.getEnv('database.postgresdb.schema');
if (schema) {
tablePrefix = schema + '.' + tablePrefix;
}
await queryRunner.query(`DROP TABLE ${tablePrefix}workflow_statistics`);
await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN dataLoaded`);
}
}

View file

@ -18,6 +18,7 @@ import { IntroducePinData1654090467022 } from './1654090467022-IntroducePinData'
import { AddNodeIds1658932090381 } from './1658932090381-AddNodeIds';
import { AddJsonKeyPinData1659902242948 } from './1659902242948-AddJsonKeyPinData';
import { CreateCredentialsUserRole1660062385367 } from './1660062385367-CreateCredentialsUserRole';
import { WorkflowStatistics1664196174001 } from './1664196174001-WorkflowStatistics';
import { CreateWorkflowsEditorRole1663755770893 } from './1663755770893-CreateWorkflowsEditorRole';
import { CreateCredentialUsageTable1665484192212 } from './1665484192212-CreateCredentialUsageTable';
import { RemoveCredentialUsageTable1665754637025 } from './1665754637025-RemoveCredentialUsageTable';
@ -48,4 +49,5 @@ export const postgresMigrations = [
CreateCredentialUsageTable1665484192212,
RemoveCredentialUsageTable1665754637025,
AddWorkflowVersionIdColumn1669739707126,
WorkflowStatistics1664196174001,
];

View file

@ -0,0 +1,40 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers';
import config from '@/config';
export class WorkflowStatistics1664196174000 implements MigrationInterface {
name = 'WorkflowStatistics1664196174000';
async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');
await queryRunner.query(
`CREATE TABLE \`${tablePrefix}workflow_statistics\` (
"count" INTEGER DEFAULT 0,
"latestEvent" DATETIME,
"name" VARCHAR(128) NOT NULL,
"workflowId" INTEGER,
PRIMARY KEY("workflowId", "name"),
FOREIGN KEY("workflowId") REFERENCES \`${tablePrefix}workflow_entity\`("id") ON DELETE CASCADE
)`,
);
// Add dataLoaded column to workflow table
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);
logMigrationEnd(this.name);
}
async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.getEnv('database.tablePrefix');
await queryRunner.query(`DROP TABLE "${tablePrefix}workflow_statistics"`);
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` DROP COLUMN "dataLoaded"`,
);
}
}

View file

@ -17,6 +17,7 @@ import { IntroducePinData1654089251344 } from './1654089251344-IntroducePinData'
import { AddNodeIds1658930531669 } from './1658930531669-AddNodeIds';
import { AddJsonKeyPinData1659888469333 } from './1659888469333-AddJsonKeyPinData';
import { CreateCredentialsUserRole1660062385367 } from './1660062385367-CreateCredentialsUserRole';
import { WorkflowStatistics1664196174000 } from './1664196174000-WorkflowStatistics';
import { CreateWorkflowsEditorRole1663755770892 } from './1663755770892-CreateWorkflowsUserRole';
import { CreateCredentialUsageTable1665484192211 } from './1665484192211-CreateCredentialUsageTable';
import { RemoveCredentialUsageTable1665754637024 } from './1665754637024-RemoveCredentialUsageTable';
@ -46,6 +47,7 @@ const sqliteMigrations = [
CreateCredentialUsageTable1665484192211,
RemoveCredentialUsageTable1665754637024,
AddWorkflowVersionIdColumn1669739707124,
WorkflowStatistics1664196174000,
];
export { sqliteMigrations };

View file

@ -0,0 +1,105 @@
import { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { Db, InternalHooksManager } from '..';
import { StatisticsNames } from '../databases/entities/WorkflowStatistics';
import { getWorkflowOwner } from '../UserManagement/UserManagementHelper';
export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
runData: IRun,
): Promise<void> {
// Determine the name of the statistic
const finished = runData.finished ? runData.finished : false;
const manual = runData.mode === 'manual';
let name: StatisticsNames;
if (finished) {
if (manual) name = StatisticsNames.manualSuccess;
else name = StatisticsNames.productionSuccess;
} else {
if (manual) name = StatisticsNames.manualError;
else name = StatisticsNames.productionError;
}
// Get the workflow id
let workflowId: number;
try {
workflowId = parseInt(workflowData.id as string, 10);
if (isNaN(workflowId)) throw new Error('not a number');
} catch (error) {
console.error(`Error "${error as string}" when casting workflow ID to a number`);
return;
}
// Try insertion and if it fails due to key conflicts then update the existing entry instead
try {
await Db.collections.WorkflowStatistics.insert({
count: 1,
name,
workflowId,
latestEvent: new Date(),
});
// If we're here we can check if we're sending the first production success metric
if (name !== StatisticsNames.productionSuccess) return;
// Get the owner of the workflow so we can send the metric
const owner = await getWorkflowOwner(workflowId);
const metrics = {
user_id: owner.id,
workflow_id: workflowId,
};
// Send the metrics
await InternalHooksManager.getInstance().onFirstProductionWorkflowSuccess(metrics);
} catch (error) {
console.error(error);
// Do we just assume it's a conflict error? If there is any other sort of error in the DB it should trigger here too
await Db.collections.WorkflowStatistics.update(
{ workflowId, name },
{ count: () => 'count + 1', latestEvent: new Date() },
);
}
}
export async function nodeFetchedData(workflowId: string, node: INode): Promise<void> {
// Get the workflow id
let id: number;
try {
id = parseInt(workflowId, 10);
if (isNaN(id)) throw new Error('not a number');
} catch (error) {
console.error(`Error ${error as string} when casting workflow ID to a number`);
return;
}
// Update only if necessary
const response = await Db.collections.Workflow.update(
{ id, dataLoaded: false },
{ dataLoaded: true },
);
// If response.affected is 1 then we know this was the first time data was loaded into the workflow; do posthog event here
if (!response.affected) return;
// Compile the metrics
const owner = await getWorkflowOwner(workflowId);
let metrics = {
user_id: owner.id,
workflow_id: id,
node_type: node.type,
node_id: node.id,
};
// This is probably naive but I can't see a way for a node to have multiple credentials attached so..
if (node.credentials) {
Object.entries(node.credentials).forEach(([credName, credDetails]) => {
metrics = Object.assign(metrics, {
credential_type: credName,
credential_id: credDetails.id,
});
});
}
// Send metrics to posthog
await InternalHooksManager.getInstance().onFirstWorkflowDataLoad(metrics);
}

View file

@ -0,0 +1,7 @@
import { eventEmitter } from 'n8n-core';
import { nodeFetchedData, workflowExecutionCompleted } from './WorkflowStatistics';
// Check for undefined as during testing these functions end up undefined for some reason
if (nodeFetchedData) eventEmitter.on(eventEmitter.types.nodeFetchedData, nodeFetchedData);
if (workflowExecutionCompleted)
eventEmitter.on(eventEmitter.types.workflowExecutionCompleted, workflowExecutionCompleted);

View file

@ -25,6 +25,8 @@ import * as WebhookServer from './WebhookServer';
import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData';
import * as WorkflowHelpers from './WorkflowHelpers';
import './events';
export {
ActiveExecutions,
ActiveWorkflowRunner,

View file

@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import RudderStack from '@rudderstack/rudder-sdk-node';
import PostHog from 'posthog-node';
import { PostHog } from 'posthog-node';
import { ITelemetryTrackProperties, LoggerProxy } from 'n8n-workflow';
import config from '@/config';
import { IExecutionTrackProperties } from '@/Interfaces';
@ -20,6 +20,7 @@ interface IExecutionsBuffer {
manual_success?: IExecutionTrackData;
prod_error?: IExecutionTrackData;
prod_success?: IExecutionTrackData;
user_id: string | undefined;
};
}
@ -80,11 +81,15 @@ export class Telemetry {
}
const allPromises = Object.keys(this.executionCountsBuffer).map(async (workflowId) => {
const promise = this.track('Workflow execution count', {
event_version: '2',
workflow_id: workflowId,
...this.executionCountsBuffer[workflowId],
});
const promise = this.track(
'Workflow execution count',
{
event_version: '2',
workflow_id: workflowId,
...this.executionCountsBuffer[workflowId],
},
{ withPostHog: true },
);
return promise;
});
@ -99,7 +104,9 @@ export class Telemetry {
const execTime = new Date();
const workflowId = properties.workflow_id;
this.executionCountsBuffer[workflowId] = this.executionCountsBuffer[workflowId] ?? {};
this.executionCountsBuffer[workflowId] = this.executionCountsBuffer[workflowId] ?? {
user_id: properties.user_id,
};
const key: ExecutionTrackDataKey = `${properties.is_manual ? 'manual' : 'prod'}_${
properties.success ? 'success' : 'error'
@ -184,6 +191,7 @@ export class Telemetry {
return Promise.all([
this.postHog.capture({
distinctId: payload.userId,
sendFeatureFlags: true,
...payload,
}),
this.rudderStack.track(payload),
@ -200,7 +208,7 @@ export class Telemetry {
async isFeatureFlagEnabled(
featureFlagName: string,
{ user_id: userId }: ITelemetryTrackProperties = {},
): Promise<boolean> {
): Promise<boolean | undefined> {
if (!this.postHog) return Promise.resolve(false);
const fullId = [this.instanceId, userId].join('#');

View file

@ -0,0 +1,238 @@
import config from '@/config';
import { InternalHooksManager } from '../../src';
import { nodeFetchedData, workflowExecutionCompleted } from '../../src/events/WorkflowStatistics';
import { WorkflowExecuteMode } from 'n8n-workflow';
const FAKE_USER_ID = 'abcde-fghij';
const mockedFirstProductionWorkflowSuccess = jest.fn((...args) => {});
const mockedFirstWorkflowDataLoad = jest.fn((...args) => {});
const mockedError = jest.spyOn(console, 'error');
jest.spyOn(InternalHooksManager, 'getInstance').mockImplementation((...args) => {
const actual = jest.requireActual('../../src/InternalHooks');
return {
...actual,
onFirstProductionWorkflowSuccess: mockedFirstProductionWorkflowSuccess,
onFirstWorkflowDataLoad: mockedFirstWorkflowDataLoad,
};
});
jest.mock('../../src/Db', () => {
return {
collections: {
Workflow: {
update: jest.fn(({ id, dataLoaded }, updateArgs) => {
if (id === 1) return { affected: 1 };
return { affected: 0 };
}),
},
WorkflowStatistics: {
insert: jest.fn(({ count, name, workflowId }) => {
if (workflowId === -1) throw new Error('test error');
return null;
}),
update: jest.fn((...args) => {}),
},
},
};
});
jest.mock('../../src/UserManagement/UserManagementHelper', () => {
return {
getWorkflowOwner: jest.fn((workflowId) => {
return { id: FAKE_USER_ID };
}),
};
});
describe('Events', () => {
beforeAll(() => {
config.set('diagnostics.enabled', true);
config.set('deployment.type', 'n8n-testing');
});
afterAll(() => {
jest.clearAllTimers();
jest.useRealTimers();
});
beforeEach(() => {
mockedFirstProductionWorkflowSuccess.mockClear();
mockedFirstWorkflowDataLoad.mockClear();
mockedError.mockClear();
});
afterEach(() => {});
describe('workflowExecutionCompleted', () => {
test('should fail with an invalid workflowId', async () => {
const workflow = {
id: 'abcde',
name: '',
active: false,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [],
connections: {},
};
const runData = {
finished: true,
data: { resultData: { runData: {} } },
mode: 'internal' as WorkflowExecuteMode,
startedAt: new Date(),
};
await workflowExecutionCompleted(workflow, runData);
expect(mockedError).toBeCalledTimes(1);
});
test('should create metrics for production successes', async () => {
// Call the function with a production success result, ensure metrics hook gets called
const workflow = {
id: '1',
name: '',
active: false,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [],
connections: {},
};
const runData = {
finished: true,
data: { resultData: { runData: {} } },
mode: 'internal' as WorkflowExecuteMode,
startedAt: new Date(),
};
await workflowExecutionCompleted(workflow, runData);
expect(mockedFirstProductionWorkflowSuccess).toBeCalledTimes(1);
expect(mockedFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, {
user_id: FAKE_USER_ID,
workflow_id: parseInt(workflow.id, 10),
});
});
test('should only create metrics for production successes', async () => {
// Call the function with a non production success result, ensure metrics hook is never called
const workflow = {
id: '1',
name: '',
active: false,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [],
connections: {},
};
const runData = {
finished: false,
data: { resultData: { runData: {} } },
mode: 'internal' as WorkflowExecuteMode,
startedAt: new Date(),
};
await workflowExecutionCompleted(workflow, runData);
expect(mockedFirstProductionWorkflowSuccess).toBeCalledTimes(0);
});
test('should not send metrics for updated entries', async () => {
// Call the function with the id that causes insert to fail, ensure update is called *and* metrics aren't sent
const mockedError = jest.spyOn(console, 'error');
const workflow = {
id: '-1',
name: '',
active: false,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [],
connections: {},
};
const runData = {
finished: true,
data: { resultData: { runData: {} } },
mode: 'internal' as WorkflowExecuteMode,
startedAt: new Date(),
};
mockedError.mockClear();
await workflowExecutionCompleted(workflow, runData);
expect(mockedError).toBeCalled();
expect(mockedFirstProductionWorkflowSuccess).toBeCalledTimes(0);
});
});
describe('nodeFetchedData', () => {
test('should fail with an invalid workflowId', async () => {
const workflowId = 'abcde';
const node = {
id: 'abcde',
name: 'test node',
typeVersion: 1,
type: '',
position: [0, 0] as [number, number],
parameters: {},
};
await nodeFetchedData(workflowId, node);
expect(mockedError).toBeCalledTimes(1);
});
test('should create metrics when the db is updated', async () => {
// Call the function with a production success result, ensure metrics hook gets called
const workflowId = '1';
const node = {
id: 'abcde',
name: 'test node',
typeVersion: 1,
type: '',
position: [0, 0] as [number, number],
parameters: {},
};
await nodeFetchedData(workflowId, node);
expect(mockedFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(mockedFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: FAKE_USER_ID,
workflow_id: parseInt(workflowId, 10),
node_type: node.type,
node_id: node.id,
});
});
test('should create metrics with credentials when the db is updated', async () => {
// Call the function with a production success result, ensure metrics hook gets called
const workflowId = '1';
const node = {
id: 'abcde',
name: 'test node',
typeVersion: 1,
type: '',
position: [0, 0] as [number, number],
parameters: {},
credentials: {
testCredentials: {
id: '1',
name: 'Test Credentials',
},
},
};
await nodeFetchedData(workflowId, node);
expect(mockedFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(mockedFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: FAKE_USER_ID,
workflow_id: parseInt(workflowId, 10),
node_type: node.type,
node_id: node.id,
credential_type: 'testCredentials',
credential_id: node.credentials.testCredentials.id,
});
});
test('should not send metrics for entries that already have the flag set', async () => {
// Fetch data for workflow 2 which is set up to not be altered in the mocks
const workflowId = '2';
const node = {
id: 'abcde',
name: 'test node',
typeVersion: 1,
type: '',
position: [0, 0] as [number, number],
parameters: {},
};
await nodeFetchedData(workflowId, node);
expect(mockedFirstWorkflowDataLoad).toBeCalledTimes(0);
});
});
});

View file

@ -335,37 +335,51 @@ describe('Telemetry', () => {
expect(pulseSpy).toBeCalledTimes(1);
expect(spyTrack).toHaveBeenCalledTimes(3);
expect(spyTrack).toHaveBeenNthCalledWith(1, 'Workflow execution count', {
event_version: '2',
workflow_id: '1',
manual_error: {
count: 2,
first: testDateTime,
expect(spyTrack).toHaveBeenNthCalledWith(
1,
'Workflow execution count',
{
event_version: '2',
workflow_id: '1',
user_id: undefined,
manual_error: {
count: 2,
first: testDateTime,
},
manual_success: {
count: 2,
first: testDateTime,
},
prod_error: {
count: 2,
first: testDateTime,
},
prod_success: {
count: 2,
first: testDateTime,
},
},
manual_success: {
count: 2,
first: testDateTime,
{ withPostHog: true },
);
expect(spyTrack).toHaveBeenNthCalledWith(
2,
'Workflow execution count',
{
event_version: '2',
workflow_id: '2',
user_id: undefined,
prod_error: {
count: 2,
first: testDateTime,
},
},
prod_error: {
count: 2,
first: testDateTime,
},
prod_success: {
count: 2,
first: testDateTime,
},
});
expect(spyTrack).toHaveBeenNthCalledWith(2, 'Workflow execution count', {
event_version: '2',
workflow_id: '2',
prod_error: {
count: 2,
first: testDateTime,
},
});
{ withPostHog: true },
);
expect(spyTrack).toHaveBeenNthCalledWith(3, 'pulse');
expect(Object.keys(execBuffer).length).toBe(0);
// Adding a second step here because we believe PostHog may use timers for sending data
// and adding posthog to the above metric was causing the pulseSpy timer to not be ran
jest.advanceTimersToNextTimer();
execBuffer = telemetry.getCountsBuffer();

View file

@ -0,0 +1,15 @@
import EventEmitter from 'events';
interface EventTypes {
nodeFetchedData: string;
workflowExecutionCompleted: string;
}
class N8NEventEmitter extends EventEmitter {
types: EventTypes = {
nodeFetchedData: 'nodeFetchedData',
workflowExecutionCompleted: 'workflowExecutionCompleted',
};
}
export const eventEmitter = new N8NEventEmitter();

View file

@ -555,6 +555,9 @@ function digestAuthAxiosConfig(
}
async function proxyRequestToAxios(
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
node: INode,
uriOrObject: string | IDataObject,
options?: IDataObject,
): Promise<any> {
@ -624,7 +627,7 @@ async function proxyRequestToAxios(
return new Promise((resolve, reject) => {
axiosPromise
.then((response) => {
.then(async (response) => {
if (configObject.resolveWithFullResponse === true) {
let body = response.data;
if (response.data === '') {
@ -634,6 +637,7 @@ async function proxyRequestToAxios(
body = undefined;
}
}
await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]);
resolve({
body,
headers: response.headers,
@ -650,6 +654,7 @@ async function proxyRequestToAxios(
body = undefined;
}
}
await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]);
resolve(body);
}
})
@ -1517,7 +1522,7 @@ export async function requestWithAuthentication(
node,
additionalData.timezone,
);
return await proxyRequestToAxios(requestOptions as IDataObject);
return await proxyRequestToAxios(workflow, additionalData, node, requestOptions as IDataObject);
} catch (error) {
try {
if (credentialsDecrypted !== undefined) {
@ -1543,7 +1548,12 @@ export async function requestWithAuthentication(
additionalData.timezone,
);
// retry the request
return await proxyRequestToAxios(requestOptions as IDataObject);
return await proxyRequestToAxios(
workflow,
additionalData,
node,
requestOptions as IDataObject,
);
}
}
throw error;
@ -2004,7 +2014,9 @@ export function getExecutePollFunctions(
mimeType,
);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestWithAuthentication(
this: IAllExecuteFunctions,
credentialsType: string,
@ -2169,7 +2181,9 @@ export function getExecuteTriggerFunctions(
mimeType,
);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
@ -2437,7 +2451,9 @@ export function getExecuteFunctions(
): Promise<Buffer> {
return getBinaryDataBuffer.call(this, inputData, itemIndex, propertyName, inputIndex);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
@ -2662,7 +2678,9 @@ export function getExecuteSingleFunctions(
mimeType,
);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
@ -2816,7 +2834,9 @@ export function getLoadOptionsFunctions(
additionalCredentialOptions,
);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
@ -2962,7 +2982,9 @@ export function getExecuteHookFunctions(
additionalCredentialOptions,
);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,
@ -3162,7 +3184,9 @@ export function getExecuteWebhookFunctions(
mimeType,
);
},
request: proxyRequestToAxios,
request: async (uriOrObject: string | IDataObject, options?: IDataObject | undefined) => {
return proxyRequestToAxios(workflow, additionalData, node, uriOrObject, options);
},
async requestOAuth2(
this: IAllExecuteFunctions,
credentialsType: string,

View file

@ -1,3 +1,4 @@
import { eventEmitter } from './EventEmitter';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import * as UserSettings from './UserSettings';
@ -13,7 +14,7 @@ export * from './LoadNodeParameterOptions';
export * from './LoadNodeListSearch';
export * from './NodeExecuteFunctions';
export * from './WorkflowExecute';
export { NodeExecuteFunctions, UserSettings };
export { eventEmitter, NodeExecuteFunctions, UserSettings };
declare module 'http' {
export interface IncomingMessage {

View file

@ -186,7 +186,7 @@ importers:
passport-jwt: ^4.0.0
pg: ^8.3.0
picocolors: ^1.0.0
posthog-node: ^1.3.0
posthog-node: ^2.2.2
prom-client: ^13.1.0
psl: ^1.8.0
replacestream: ^4.0.3
@ -270,7 +270,7 @@ importers:
passport-jwt: 4.0.0
pg: 8.8.0
picocolors: 1.0.0
posthog-node: 1.3.0
posthog-node: 2.2.2
prom-client: 13.2.0
psl: 1.9.0
replacestream: 4.0.3
@ -7587,10 +7587,11 @@ packages:
- debug
dev: false
/axios/0.24.0:
resolution: {integrity: sha512-Q6cWsys88HoPgAaFAVUb0WpPk0O8iTeisR9IMqy9G8AbO4NlpVknrnQS03zzF9PGAWgO3cgletO3VjV/P7VztA==}
/axios/0.27.2:
resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==}
dependencies:
follow-redirects: 1.15.2_debug@3.2.7
follow-redirects: 1.15.2
form-data: 4.0.0
transitivePeerDependencies:
- debug
dev: false
@ -7607,7 +7608,7 @@ packages:
/axios/1.1.3:
resolution: {integrity: sha512-00tXVRwKx/FZr/IDVFt4C+f9FYairX517WoGCL6dpOntqLkZofjhu43F/Xl44UOpqa+9sLFDrG/XAnFsUYgkDA==}
dependencies:
follow-redirects: 1.15.2_debug@3.2.7
follow-redirects: 1.15.2
form-data: 4.0.0
proxy-from-env: 1.1.0
transitivePeerDependencies:
@ -17301,19 +17302,11 @@ packages:
xtend: 4.0.2
dev: false
/posthog-node/1.3.0:
resolution: {integrity: sha512-2+VhqiY/rKIqKIXyvemBFHbeijHE25sP7eKltnqcFqAssUE6+sX6vusN9A4luzToOqHQkUZexiCKxvuGagh7JA==}
engines: {node: '>=4'}
hasBin: true
/posthog-node/2.2.2:
resolution: {integrity: sha512-aXYe/D+28kF63W8Cz53t09ypEORz+ULeDCahdAqhVrRm2scbOXFbtnn0GGhvMpYe45grepLKuwui9KxrZ2ZuMw==}
engines: {node: '>=14.17.0'}
dependencies:
axios: 0.24.0
axios-retry: 3.3.1
component-type: 1.2.1
join-component: 1.1.0
md5: 2.3.0
ms: 2.1.3
remove-trailing-slash: 0.1.1
uuid: 8.3.2
axios: 0.27.2
transitivePeerDependencies:
- debug
dev: false