import { Service } from 'typedi'; import type { IExecuteData, INode, IPinData, IRunExecutionData } from 'n8n-workflow'; import { SubworkflowOperationError, Workflow, ErrorReporterProxy as ErrorReporter, } from 'n8n-workflow'; import config from '@/config'; import type { User } from '@db/entities/User'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import type { WorkflowRequest } from '@/workflows/workflow.request'; import type { ExecutionPayload, IWorkflowDb, IWorkflowErrorData, IWorkflowExecutionDataProcess, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import { WorkflowRunner } from '@/WorkflowRunner'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { TestWebhooks } from '@/TestWebhooks'; import { Logger } from '@/Logger'; import { PermissionChecker } from '@/UserManagement/PermissionChecker'; @Service() export class WorkflowExecutionService { constructor( private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, private readonly testWebhooks: TestWebhooks, private readonly permissionChecker: PermissionChecker, ) {} async executeManually( { workflowData, runData, pinData, startNodes, destinationNode, }: WorkflowRequest.ManualRunPayload, user: User, sessionId?: string, ) { const pinnedTrigger = this.findPinnedTrigger(workflowData, startNodes, pinData); // If webhooks nodes exist and are active we have to wait for till we receive a call if ( pinnedTrigger === null && (runData === undefined || startNodes === undefined || startNodes.length === 0 || destinationNode === undefined) ) { const additionalData = await WorkflowExecuteAdditionalData.getBase(user.id); const needsWebhook = await this.testWebhooks.needsWebhook( user.id, workflowData, additionalData, runData, sessionId, destinationNode, ); if (needsWebhook) return { waitingForWebhook: true }; } // For manual testing always set to not active workflowData.active = false; // Start the workflow const data: IWorkflowExecutionDataProcess = { destinationNode, executionMode: 'manual', runData, pinData, sessionId, startNodes, workflowData, userId: user.id, }; const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; if (pinnedTrigger && !hasRunData(pinnedTrigger)) { data.startNodes = [pinnedTrigger.name]; } const workflowRunner = new WorkflowRunner(); const executionId = await workflowRunner.run(data); return { executionId, }; } /** Executes an error workflow */ async executeErrorWorkflow( workflowId: string, workflowErrorData: IWorkflowErrorData, runningUser: User, ): Promise { // Wrap everything in try/catch to make sure that no errors bubble up and all get caught here try { const workflowData = await this.workflowRepository.findOneBy({ id: workflowId }); if (workflowData === null) { // The error workflow could not be found this.logger.error( `Calling Error Workflow for "${workflowErrorData.workflow.id}". Could not find error workflow "${workflowId}"`, { workflowId }, ); return; } const executionMode = 'error'; const workflowInstance = new Workflow({ id: workflowId, name: workflowData.name, nodeTypes: this.nodeTypes, nodes: workflowData.nodes, connections: workflowData.connections, active: workflowData.active, staticData: workflowData.staticData, settings: workflowData.settings, }); try { const failedNode = workflowErrorData.execution?.lastNodeExecuted ? workflowInstance.getNode(workflowErrorData.execution?.lastNodeExecuted) : undefined; await this.permissionChecker.checkSubworkflowExecutePolicy( workflowInstance, workflowErrorData.workflow.id!, failedNode ?? undefined, ); } catch (error) { const initialNode = workflowInstance.getStartNode(); if (initialNode) { const errorWorkflowPermissionError = new SubworkflowOperationError( `Another workflow: (ID ${workflowErrorData.workflow.id}) tried to invoke this workflow to handle errors.`, "Unfortunately current permissions do not allow this. Please check that this workflow's settings allow it to be called by others", ); // Create a fake execution and save it to DB. const fakeExecution = WorkflowHelpers.generateFailedExecutionFromError( 'error', errorWorkflowPermissionError, initialNode, ); const fullExecutionData: ExecutionPayload = { data: fakeExecution.data, mode: fakeExecution.mode, finished: false, startedAt: new Date(), stoppedAt: new Date(), workflowData, waitTill: null, status: fakeExecution.status, workflowId: workflowData.id, }; await this.executionRepository.createNewExecution(fullExecutionData); } this.logger.info('Error workflow execution blocked due to subworkflow settings', { erroredWorkflowId: workflowErrorData.workflow.id, errorWorkflowId: workflowId, }); return; } let node: INode; let workflowStartNode: INode | undefined; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); for (const nodeName of Object.keys(workflowInstance.nodes)) { node = workflowInstance.nodes[nodeName]; if (node.type === ERROR_TRIGGER_TYPE) { workflowStartNode = node; } } if (workflowStartNode === undefined) { this.logger.error( `Calling Error Workflow for "${workflowErrorData.workflow.id}". Could not find "${ERROR_TRIGGER_TYPE}" in workflow "${workflowId}"`, ); return; } // Can execute without webhook so go on // Initialize the data of the webhook node const nodeExecutionStack: IExecuteData[] = []; nodeExecutionStack.push({ node: workflowStartNode, data: { main: [ [ { json: workflowErrorData, }, ], ], }, source: null, }); const runExecutionData: IRunExecutionData = { startData: {}, resultData: { runData: {}, }, executionData: { contextData: {}, metadata: {}, nodeExecutionStack, waitingExecution: {}, waitingExecutionSource: {}, }, }; const runData: IWorkflowExecutionDataProcess = { executionMode, executionData: runExecutionData, workflowData, userId: runningUser.id, }; const workflowRunner = new WorkflowRunner(); await workflowRunner.run(runData); } catch (error) { ErrorReporter.error(error); this.logger.error( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access `Calling Error Workflow for "${workflowErrorData.workflow.id}": "${error.message}"`, { workflowId: workflowErrorData.workflow.id }, ); } } /** * Find the pinned trigger to execute the workflow from, if any. * * - In a full execution, select the _first_ pinned trigger. * - In a partial execution, * - select the _first_ pinned trigger that leads to the executed node, * - else select the executed pinned trigger. */ private findPinnedTrigger(workflow: IWorkflowDb, startNodes?: string[], pinData?: IPinData) { if (!pinData || !startNodes) return null; const isTrigger = (nodeTypeName: string) => ['trigger', 'webhook'].some((suffix) => nodeTypeName.toLowerCase().includes(suffix)); const pinnedTriggers = workflow.nodes.filter( (node) => !node.disabled && pinData[node.name] && isTrigger(node.type), ); if (pinnedTriggers.length === 0) return null; if (startNodes?.length === 0) return pinnedTriggers[0]; // full execution const [startNodeName] = startNodes; const parentNames = new Workflow({ nodes: workflow.nodes, connections: workflow.connections, active: workflow.active, nodeTypes: this.nodeTypes, }).getParentNodes(startNodeName); let checkNodeName = ''; if (parentNames.length === 0) { checkNodeName = startNodeName; } else { checkNodeName = parentNames.find((pn) => pn === pinnedTriggers[0].name) as string; } return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution } }