refactor(core): Decouple post workflow execute event from internal hooks (no-changelog) (#10280)

This commit is contained in:
Iván Ovejero 2024-08-02 12:05:06 +02:00 committed by GitHub
parent 07d7b247f0
commit a533916628
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 179 additions and 219 deletions

View file

@ -1,23 +1,10 @@
import { Service } from 'typedi';
import { snakeCase } from 'change-case';
import { get as pslGet } from 'psl';
import type {
ExecutionStatus,
INodesGraphResult,
IRun,
ITelemetryTrackProperties,
IWorkflowBase,
} from 'n8n-workflow';
import { TelemetryHelpers } from 'n8n-workflow';
import { N8N_VERSION } from '@/constants';
import type { ITelemetryTrackProperties } from 'n8n-workflow';
import type { AuthProviderType } from '@db/entities/AuthIdentity';
import type { User } from '@db/entities/User';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { determineFinalExecutionStatus } from '@/executionLifecycleHooks/shared/sharedHookFunctions';
import type { ITelemetryUserDeletionData, IExecutionTrackProperties } from '@/Interfaces';
import type { ITelemetryUserDeletionData } from '@/Interfaces';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { NodeTypes } from '@/NodeTypes';
import { Telemetry } from '@/telemetry';
import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';
@ -30,8 +17,6 @@ import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';
export class InternalHooks {
constructor(
private readonly telemetry: Telemetry,
private readonly nodeTypes: NodeTypes,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
workflowStatisticsService: WorkflowStatisticsService,
// Can't use @ts-expect-error because only dev time tsconfig considers this as an error, but not build time
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
@ -64,145 +49,6 @@ export class InternalHooks {
this.telemetry.track('User responded to personalization questions', personalizationSurveyData);
}
// eslint-disable-next-line complexity
async onWorkflowPostExecute(
_executionId: string,
workflow: IWorkflowBase,
runData?: IRun,
userId?: string,
) {
if (!workflow.id) {
return;
}
if (runData?.status === 'waiting') {
// No need to send telemetry or logs when the workflow hasn't finished yet.
return;
}
const telemetryProperties: IExecutionTrackProperties = {
workflow_id: workflow.id,
is_manual: false,
version_cli: N8N_VERSION,
success: false,
};
if (userId) {
telemetryProperties.user_id = userId;
}
if (runData?.data.resultData.error?.message?.includes('canceled')) {
runData.status = 'canceled';
}
telemetryProperties.success = !!runData?.finished;
// const executionStatus: ExecutionStatus = runData?.status ?? 'unknown';
const executionStatus: ExecutionStatus = runData
? determineFinalExecutionStatus(runData)
: 'unknown';
if (runData !== undefined) {
telemetryProperties.execution_mode = runData.mode;
telemetryProperties.is_manual = runData.mode === 'manual';
let nodeGraphResult: INodesGraphResult | null = null;
if (!telemetryProperties.success && runData?.data.resultData.error) {
telemetryProperties.error_message = runData?.data.resultData.error.message;
let errorNodeName =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.name
: undefined;
telemetryProperties.error_node_type =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.type
: undefined;
if (runData.data.resultData.lastNodeExecuted) {
const lastNode = TelemetryHelpers.getNodeTypeForName(
workflow,
runData.data.resultData.lastNodeExecuted,
);
if (lastNode !== undefined) {
telemetryProperties.error_node_type = lastNode.type;
errorNodeName = lastNode.name;
}
}
if (telemetryProperties.is_manual) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
telemetryProperties.node_graph = nodeGraphResult.nodeGraph;
telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
if (errorNodeName) {
telemetryProperties.error_node_id = nodeGraphResult.nameIndices[errorNodeName];
}
}
}
if (telemetryProperties.is_manual) {
if (!nodeGraphResult) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
}
let userRole: 'owner' | 'sharee' | undefined = undefined;
if (userId) {
const role = await this.sharedWorkflowRepository.findSharingRole(userId, workflow.id);
if (role) {
userRole = role === 'workflow:owner' ? 'owner' : 'sharee';
}
}
const manualExecEventProperties: ITelemetryTrackProperties = {
user_id: userId,
workflow_id: workflow.id,
status: executionStatus,
executionStatus: runData?.status ?? 'unknown',
error_message: telemetryProperties.error_message as string,
error_node_type: telemetryProperties.error_node_type,
node_graph_string: telemetryProperties.node_graph_string as string,
error_node_id: telemetryProperties.error_node_id as string,
webhook_domain: null,
sharing_role: userRole,
};
if (!manualExecEventProperties.node_graph_string) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
manualExecEventProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
}
if (runData.data.startData?.destinationNode) {
const telemetryPayload = {
...manualExecEventProperties,
node_type: TelemetryHelpers.getNodeTypeForName(
workflow,
runData.data.startData?.destinationNode,
)?.type,
node_id: nodeGraphResult.nameIndices[runData.data.startData?.destinationNode],
};
this.telemetry.track('Manual node exec finished', telemetryPayload);
} else {
nodeGraphResult.webhookNodeNames.forEach((name: string) => {
const execJson = runData.data.resultData.runData[name]?.[0]?.data?.main?.[0]?.[0]
?.json as { headers?: { origin?: string } };
if (execJson?.headers?.origin && execJson.headers.origin !== '') {
manualExecEventProperties.webhook_domain = pslGet(
execJson.headers.origin.replace(/^https?:\/\//, ''),
);
}
});
this.telemetry.track('Manual workflow exec finished', manualExecEventProperties);
}
}
}
this.telemetry.trackWorkflowExecution(telemetryProperties);
}
onWorkflowSharingUpdate(workflowId: string, userId: string, userList: string[]) {
const properties: ITelemetryTrackProperties = {
workflow_id: workflowId,

View file

@ -52,7 +52,6 @@ import { Push } from '@/push';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { SecretsHelper } from './SecretsHelpers';
@ -548,7 +547,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const internalHooks = Container.get(InternalHooks);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return {
@ -644,13 +642,9 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;
void internalHooks.onWorkflowPostExecute(executionId, workflow, runData);
eventService.emit('workflow-post-execute', {
workflowId: workflow.id,
workflowName: workflow.name,
workflow,
executionId,
success: runData.status === 'success',
isManual: runData.mode === 'manual',
runData,
});
},
@ -787,7 +781,6 @@ async function executeWorkflow(
parentCallbackManager?: CallbackManager;
},
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const internalHooks = Container.get(InternalHooks);
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
@ -933,13 +926,9 @@ async function executeWorkflow(
await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);
void internalHooks.onWorkflowPostExecute(executionId, workflowData, data, additionalData.userId);
eventService.emit('workflow-post-execute', {
workflowId: workflowData.id,
workflowName: workflowData.name,
workflow: workflowData,
executionId,
success: data.status === 'success',
isManual: data.mode === 'manual',
userId: additionalData.userId,
runData: data,
});

View file

@ -34,7 +34,6 @@ import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { InternalHooks } from '@/InternalHooks';
import { Logger } from '@/Logger';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { EventService } from './eventbus/event.service';
@ -160,18 +159,9 @@ export class WorkflowRunner {
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
postExecutePromise
.then(async (executionData) => {
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
data.workflowData,
executionData,
data.userId,
);
this.eventService.emit('workflow-post-execute', {
workflowId: data.workflowData.id,
workflowName: data.workflowData.name,
workflow: data.workflowData,
executionId,
success: executionData?.status === 'success',
isManual: data.executionMode === 'manual',
userId: data.userId,
runData: executionData,
});

View file

@ -141,27 +141,31 @@ describe('AuditEventRelay', () => {
it('should log on `workflow-post-execute` for successful execution', () => {
const payload = mock<Event['workflow-post-execute']>({
executionId: 'some-id',
success: true,
userId: 'some-id',
workflowId: 'some-id',
isManual: true,
workflowName: 'some-name',
metadata: {},
runData: mock<IRun>({ data: { resultData: {} } }),
workflow: mock<IWorkflowBase>({ id: 'some-id', name: 'some-name' }),
runData: mock<IRun>({ status: 'success', mode: 'manual', data: { resultData: {} } }),
});
eventService.emit('workflow-post-execute', payload);
const { runData: _, ...rest } = payload;
const { runData: _, workflow: __, ...rest } = payload;
expect(eventBus.sendWorkflowEvent).toHaveBeenCalledWith({
eventName: 'n8n.workflow.success',
payload: rest,
payload: {
...rest,
success: true,
isManual: true,
workflowName: 'some-name',
workflowId: 'some-id',
},
});
});
it('should handle `workflow-post-execute` event for unsuccessful execution', () => {
it('should log on `workflow-post-execute` event for unsuccessful execution', () => {
const runData = mock<IRun>({
status: 'error',
mode: 'manual',
data: {
resultData: {
lastNodeExecuted: 'some-node',
@ -177,23 +181,23 @@ describe('AuditEventRelay', () => {
const event = {
executionId: 'some-id',
success: false,
userId: 'some-id',
workflowId: 'some-id',
isManual: true,
workflowName: 'some-name',
metadata: {},
workflow: mock<IWorkflowBase>({ id: 'some-id', name: 'some-name' }),
runData,
};
eventService.emit('workflow-post-execute', event);
const { runData: _, ...rest } = event;
const { runData: _, workflow: __, ...rest } = event;
expect(eventBus.sendWorkflowEvent).toHaveBeenCalledWith({
eventName: 'n8n.workflow.failed',
payload: {
...rest,
success: false,
isManual: true,
workflowName: 'some-name',
workflowId: 'some-id',
lastNodeExecuted: 'some-node',
errorNodeType: 'some-type',
errorMessage: 'some-message',

View file

@ -122,12 +122,20 @@ export class AuditEventRelay {
}
private workflowPostExecute(event: Event['workflow-post-execute']) {
const { runData, ...rest } = event;
const { runData, workflow, ...rest } = event;
if (event.success) {
const payload = {
...rest,
success: runData?.status === 'success',
isManual: runData?.mode === 'manual',
workflowId: workflow.id,
workflowName: workflow.name,
};
if (payload.success) {
void this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.success',
payload: rest,
payload,
});
return;
@ -136,7 +144,7 @@ export class AuditEventRelay {
void this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.failed',
payload: {
...rest,
...payload,
lastNodeExecuted: runData?.data.resultData.lastNodeExecuted,
errorNodeType:
runData?.data.resultData.error && 'node' in runData?.data.resultData.error

View file

@ -44,12 +44,8 @@ export type Event = {
'workflow-post-execute': {
executionId: string;
success: boolean;
userId?: string;
workflowId: string;
isManual: boolean;
workflowName: string;
metadata?: Record<string, string>;
workflow: IWorkflowBase;
runData?: IRun;
};

View file

@ -3,7 +3,6 @@ import { Push } from '@/push';
import { jsonStringify, sleep } from 'n8n-workflow';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected
import type { DateTime } from 'luxon';
import type { IRun, ITaskData } from 'n8n-workflow';
import type { EventMessageTypes } from '../eventbus/EventMessageClasses';
@ -280,22 +279,9 @@ export class ExecutionRecoveryService {
private async runHooks(execution: IExecutionResponse) {
execution.data ??= { resultData: { runData: {} } };
await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, {
data: execution.data,
finished: false,
mode: execution.mode,
waitTill: execution.waitTill,
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
status: execution.status,
});
this.eventService.emit('workflow-post-execute', {
workflowId: execution.workflowData.id,
workflowName: execution.workflowData.name,
workflow: execution.workflowData,
executionId: execution.id,
success: execution.status === 'success',
isManual: execution.mode === 'manual',
runData: execution,
});

View file

@ -8,10 +8,14 @@ import { License } from '@/License';
import { GlobalConfig } from '@n8n/config';
import { N8N_VERSION } from '@/constants';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { ExecutionStatus, INodesGraphResult, ITelemetryTrackProperties } from 'n8n-workflow';
import { get as pslGet } from 'psl';
import { TelemetryHelpers } from 'n8n-workflow';
import { NodeTypes } from '@/NodeTypes';
import { SharedWorkflowRepository } from '@/databases/repositories/sharedWorkflow.repository';
import { ProjectRelationRepository } from '@/databases/repositories/projectRelation.repository';
import type { IExecutionTrackProperties } from '@/Interfaces';
import { determineFinalExecutionStatus } from '@/executionLifecycleHooks/shared/sharedHookFunctions';
@Service()
export class TelemetryEventRelay {
@ -118,6 +122,9 @@ export class TelemetryEventRelay {
this.eventService.on('workflow-saved', async (event) => {
await this.workflowSaved(event);
});
this.eventService.on('workflow-post-execute', async (event) => {
await this.workflowPostExecute(event);
});
}
private teamProjectUpdated({ userId, role, members, projectId }: Event['team-project-updated']) {
@ -584,4 +591,138 @@ export class TelemetryEventRelay {
earliest_workflow_created: firstWorkflow?.createdAt,
});
}
// eslint-disable-next-line complexity
private async workflowPostExecute({ workflow, runData, userId }: Event['workflow-post-execute']) {
if (!workflow.id) {
return;
}
if (runData?.status === 'waiting') {
// No need to send telemetry or logs when the workflow hasn't finished yet.
return;
}
const telemetryProperties: IExecutionTrackProperties = {
workflow_id: workflow.id,
is_manual: false,
version_cli: N8N_VERSION,
success: false,
};
if (userId) {
telemetryProperties.user_id = userId;
}
if (runData?.data.resultData.error?.message?.includes('canceled')) {
runData.status = 'canceled';
}
telemetryProperties.success = !!runData?.finished;
// const executionStatus: ExecutionStatus = runData?.status ?? 'unknown';
const executionStatus: ExecutionStatus = runData
? determineFinalExecutionStatus(runData)
: 'unknown';
if (runData !== undefined) {
telemetryProperties.execution_mode = runData.mode;
telemetryProperties.is_manual = runData.mode === 'manual';
let nodeGraphResult: INodesGraphResult | null = null;
if (!telemetryProperties.success && runData?.data.resultData.error) {
telemetryProperties.error_message = runData?.data.resultData.error.message;
let errorNodeName =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.name
: undefined;
telemetryProperties.error_node_type =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.type
: undefined;
if (runData.data.resultData.lastNodeExecuted) {
const lastNode = TelemetryHelpers.getNodeTypeForName(
workflow,
runData.data.resultData.lastNodeExecuted,
);
if (lastNode !== undefined) {
telemetryProperties.error_node_type = lastNode.type;
errorNodeName = lastNode.name;
}
}
if (telemetryProperties.is_manual) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
telemetryProperties.node_graph = nodeGraphResult.nodeGraph;
telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
if (errorNodeName) {
telemetryProperties.error_node_id = nodeGraphResult.nameIndices[errorNodeName];
}
}
}
if (telemetryProperties.is_manual) {
if (!nodeGraphResult) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
}
let userRole: 'owner' | 'sharee' | undefined = undefined;
if (userId) {
const role = await this.sharedWorkflowRepository.findSharingRole(userId, workflow.id);
if (role) {
userRole = role === 'workflow:owner' ? 'owner' : 'sharee';
}
}
const manualExecEventProperties: ITelemetryTrackProperties = {
user_id: userId,
workflow_id: workflow.id,
status: executionStatus,
executionStatus: runData?.status ?? 'unknown',
error_message: telemetryProperties.error_message as string,
error_node_type: telemetryProperties.error_node_type,
node_graph_string: telemetryProperties.node_graph_string as string,
error_node_id: telemetryProperties.error_node_id as string,
webhook_domain: null,
sharing_role: userRole,
};
if (!manualExecEventProperties.node_graph_string) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
manualExecEventProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
}
if (runData.data.startData?.destinationNode) {
const telemetryPayload = {
...manualExecEventProperties,
node_type: TelemetryHelpers.getNodeTypeForName(
workflow,
runData.data.startData?.destinationNode,
)?.type,
node_id: nodeGraphResult.nameIndices[runData.data.startData?.destinationNode],
};
this.telemetry.track('Manual node exec finished', telemetryPayload);
} else {
nodeGraphResult.webhookNodeNames.forEach((name: string) => {
const execJson = runData.data.resultData.runData[name]?.[0]?.data?.main?.[0]?.[0]
?.json as { headers?: { origin?: string } };
if (execJson?.headers?.origin && execJson.headers.origin !== '') {
manualExecEventProperties.webhook_domain = pslGet(
execJson.headers.origin.replace(/^https?:\/\//, ''),
);
}
});
this.telemetry.track('Manual workflow exec finished', manualExecEventProperties);
}
}
}
this.telemetry.trackWorkflowExecution(telemetryProperties);
}
}