From 8acc3c5931805c686ea47e82e1fccafbde25378a Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Thu, 19 Dec 2019 16:07:55 -0600 Subject: [PATCH] :sparkles: Add ExecuteWorkflow-Node --- packages/cli/src/ActiveWorkflowRunner.ts | 2 +- packages/cli/src/Interfaces.ts | 15 +- packages/cli/src/Server.ts | 5 +- packages/cli/src/WebhookHelpers.ts | 30 +- .../cli/src/WorkflowExecuteAdditionalData.ts | 260 ++++++++++++++---- packages/cli/src/WorkflowHelpers.ts | 111 ++++++++ packages/cli/src/WorkflowRunner.ts | 92 ++----- packages/cli/src/WorkflowRunnerProcess.ts | 14 +- packages/core/src/NodeExecuteFunctions.ts | 16 ++ packages/core/src/WorkflowExecute.ts | 19 +- .../nodes-base/nodes/ExecuteWorkflow.node.ts | 75 +++++ packages/nodes-base/package.json | 1 + packages/workflow/src/Interfaces.ts | 33 ++- packages/workflow/src/WorkflowHooks.ts | 48 ++++ packages/workflow/src/index.ts | 1 + 15 files changed, 525 insertions(+), 197 deletions(-) create mode 100644 packages/nodes-base/nodes/ExecuteWorkflow.node.ts create mode 100644 packages/workflow/src/WorkflowHooks.ts diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index cde03c20d3..6a0e5cb02b 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -301,7 +301,7 @@ export class ActiveWorkflowRunner { const mode = 'trigger'; const credentials = await WorkflowCredentials(workflowData.nodes); - const additionalData = await WorkflowExecuteAdditionalData.getBase(mode, credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode); // Add the workflows which have webhooks defined diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 9d615d36a8..4000c30785 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -1,16 +1,14 @@ import { - IConnections, ICredentialsDecrypted, ICredentialsEncrypted, IDataObject, IExecutionError, - INode, IRun, IRunData, IRunExecutionData, ITaskData, + IWorkflowBase as IWorkflowBaseWorkflow, IWorkflowCredentials, - IWorkflowSettings, WorkflowExecuteMode, } from 'n8n-workflow'; @@ -44,16 +42,9 @@ export interface IDatabaseCollections { } -export interface IWorkflowBase { +export interface IWorkflowBase extends IWorkflowBaseWorkflow { id?: number | string | ObjectID; - name: string; - active: boolean; - createdAt: Date; - updatedAt: Date; - nodes: INode[]; - connections: IConnections; - settings?: IWorkflowSettings; - staticData?: IDataObject; + } diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index d40374915f..41fda80372 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -497,7 +497,7 @@ class App { if (WorkflowHelpers.isWorkflowIdValid(workflowData.id) === true && (runData === undefined || startNodes === undefined || startNodes.length === 0 || destinationNode === undefined)) { // Webhooks can only be tested with saved workflows const credentials = await WorkflowCredentials(workflowData.nodes); - const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); const nodeTypes = NodeTypes(); const workflowInstance = new Workflow(workflowData.id, workflowData.nodes, workflowData.connections, false, nodeTypes, undefined, workflowData.settings); const needsWebhook = await this.testWebhooks.needsWebhookData(workflowData, workflowInstance, additionalData, executionMode, sessionId, destinationNode); @@ -544,13 +544,12 @@ class App { const methodName = req.query.methodName; const nodeTypes = NodeTypes(); - const executionMode = 'manual'; const loadDataInstance = new LoadNodeParameterOptions(nodeType, nodeTypes, credentials); const workflowData = loadDataInstance.getWorkflowData() as IWorkflowBase; const workflowCredentials = await WorkflowCredentials(workflowData.nodes); - const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, workflowCredentials, currentNodeParameters); + const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowCredentials, currentNodeParameters); return loadDataInstance.getOptions(methodName, additionalData); })); diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 77ef8087be..b13cc3b0ea 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -9,6 +9,7 @@ import { IWorkflowDb, IWorkflowExecutionDataProcess, ResponseHelper, + WorkflowHelpers, WorkflowRunner, WorkflowCredentials, WorkflowExecuteAdditionalData, @@ -24,9 +25,7 @@ import { IDataObject, IExecuteData, INode, - IRun, IRunExecutionData, - ITaskData, IWebhookData, IWebhookResponseData, IWorkflowExecuteAdditionalData, @@ -38,29 +37,6 @@ import { const activeExecutions = ActiveExecutions.getInstance(); -/** - * Returns the data of the last executed node - * - * @export - * @param {IRun} inputData - * @returns {(ITaskData | undefined)} - */ -export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefined { - const runData = inputData.data.resultData.runData; - const lastNodeExecuted = inputData.data.resultData.lastNodeExecuted; - - if (lastNodeExecuted === undefined) { - return undefined; - } - - if (runData[lastNodeExecuted] === undefined) { - return undefined; - } - - return runData[lastNodeExecuted][runData[lastNodeExecuted].length - 1]; -} - - /** * Returns all the webhooks which should be created for the give workflow * @@ -132,7 +108,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo // Prepare everything that is needed to run the workflow const credentials = await WorkflowCredentials(workflowData.nodes); - const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials); // Add the Response and Request so that this data can be accessed in the node additionalData.httpRequest = req; @@ -286,7 +262,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo return undefined; } - const returnData = getDataLastExecutedNodeData(data); + const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); if (returnData === undefined) { if (didSendResponse === false) { responseCallback(null, { diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 91821e607b..befa2c6680 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -5,6 +5,7 @@ import { IPushDataExecutionFinished, IWorkflowBase, IWorkflowExecutionDataProcess, + NodeTypes, Push, ResponseHelper, WebhookHelpers, @@ -13,17 +14,25 @@ import { import { UserSettings, + WorkflowExecute, } from 'n8n-core'; import { IDataObject, + IExecuteData, + INode, INodeParameters, + INodeExecutionData, IRun, + IRunExecutionData, ITaskData, IWorkflowCredentials, IWorkflowExecuteAdditionalData, IWorkflowExecuteHooks, + IWorkflowHooksOptionalParameters, + Workflow, WorkflowExecuteMode, + WorkflowHooks, } from 'n8n-workflow'; import * as config from '../config'; @@ -35,15 +44,10 @@ import * as config from '../config'; * * @param {IWorkflowBase} workflowData The workflow which got executed * @param {IRun} fullRunData The run which produced the error - * @param {WorkflowExecuteMode} mode The mode in which the workflow which did error got started in + * @param {WorkflowExecuteMode} mode The mode in which the workflow got started in * @param {string} [executionId] The id the execution got saved as */ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mode: WorkflowExecuteMode, executionId?: string, retryOf?: string): void { - if (mode === 'manual') { - // Do not call error workflow when executed manually - return; - } - // Check if there was an error and if so if an errorWorkflow is set if (fullRunData.data.resultData.error !== undefined && workflowData.settings !== undefined && workflowData.settings.errorWorkflow) { const workflowErrorData = { @@ -68,11 +72,12 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo /** * Pushes the execution out to all connected clients * + * @param {WorkflowExecuteMode} mode The mode in which the workflow got started in * @param {IRun} fullRunData The RunData of the finished execution * @param {string} executionIdActive The id of the finished execution * @param {string} [executionIdDb] The database id of finished execution */ -export function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string, retryOf?: string) { +export function pushExecutionFinished(mode: WorkflowExecuteMode, fullRunData: IRun, executionIdActive: string, executionIdDb?: string, retryOf?: string) { // Clone the object except the runData. That one is not supposed // to be send. Because that data got send piece by piece after // each node which finished executing @@ -101,100 +106,116 @@ export function pushExecutionFinished(fullRunData: IRun, executionIdActive: stri /** - * Returns the workflow execution hooks + * Returns hook functions to push data to Editor-UI * - * @param {WorkflowExecuteMode} mode - * @param {IWorkflowBase} workflowData - * @param {string} executionId - * @param {string} [sessionId] - * @param {string} [retryOf] * @returns {IWorkflowExecuteHooks} */ -const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, executionId: string, sessionId?: string, retryOf?: string): IWorkflowExecuteHooks => { +function hookFunctionsPush(): IWorkflowExecuteHooks { return { nodeExecuteBefore: [ - async (nodeName: string): Promise => { + async function (this: WorkflowHooks, nodeName: string): Promise { // Push data to session which started workflow before each // node which starts rendering - if (sessionId === undefined) { + if (this.sessionId === undefined) { return; } const pushInstance = Push.getInstance(); pushInstance.send('nodeExecuteBefore', { - executionId, + executionId: this.executionId, nodeName, - }, sessionId); + }, this.sessionId); }, ], nodeExecuteAfter: [ - async (nodeName: string, data: ITaskData): Promise => { + async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { // Push data to session which started workflow after each rendered node - if (sessionId === undefined) { + if (this.sessionId === undefined) { return; } const pushInstance = Push.getInstance(); pushInstance.send('nodeExecuteAfter', { - executionId, + executionId: this.executionId, nodeName, data, - }, sessionId); + }, this.sessionId); }, ], workflowExecuteBefore: [ - async (): Promise => { + async function (this: WorkflowHooks): Promise { // Push data to editor-ui once workflow finished const pushInstance = Push.getInstance(); pushInstance.send('executionStarted', { - executionId, - mode, + executionId: this.executionId, + mode: this.mode, startedAt: new Date(), - retryOf, - workflowId: workflowData.id as string, - workflowName: workflowData.name, + retryOf: this.retryOf, + workflowId: this.workflowData.id as string, + workflowName: this.workflowData.name, }); } ], workflowExecuteAfter: [ - async (fullRunData: IRun, newStaticData: IDataObject): Promise => { + async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise { + pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf); + }, + ] + }; +} + + +/** + * Returns hook functions to save workflow execution and call error workflow + * + * @returns {IWorkflowExecuteHooks} + */ +function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { + return { + nodeExecuteBefore: [], + nodeExecuteAfter: [], + workflowExecuteBefore: [], + workflowExecuteAfter: [ + async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise { + + const isManualMode = [this.mode, parentProcessMode].includes('manual'); + try { - if (mode !== 'manual' && WorkflowHelpers.isWorkflowIdValid(workflowData.id as string) === true && newStaticData) { + if (!isManualMode && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) { // Workflow is saved so update in database try { - await WorkflowHelpers.saveStaticDataById(workflowData.id as string, newStaticData); + await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData); } catch (e) { // TODO: Add proper logging! - console.error(`There was a problem saving the workflow with id "${workflowData.id}" to save changed staticData: ${e.message}`); + console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`); } } let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean; - if (workflowData.settings !== undefined && workflowData.settings.saveManualExecutions !== undefined) { + if (this.workflowData.settings !== undefined && this.workflowData.settings.saveManualExecutions !== undefined) { // Apply to workflow override - saveManualExecutions = workflowData.settings.saveManualExecutions as boolean; + saveManualExecutions = this.workflowData.settings.saveManualExecutions as boolean; } - if (mode === 'manual' && saveManualExecutions === false) { - pushExecutionFinished(fullRunData, executionId, undefined, retryOf); - executeErrorWorkflow(workflowData, fullRunData, mode, undefined, retryOf); + if (isManualMode && saveManualExecutions === false) { return; } // Check config to know if execution should be saved or not let saveDataErrorExecution = config.get('executions.saveDataOnError') as string; let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string; - if (workflowData.settings !== undefined) { - saveDataErrorExecution = (workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution; - saveDataSuccessExecution = (workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution; + if (this.workflowData.settings !== undefined) { + saveDataErrorExecution = (this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution; + saveDataSuccessExecution = (this.workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution; } const workflowDidSucceed = !fullRunData.data.resultData.error; if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' || workflowDidSucceed === false && saveDataErrorExecution === 'none' ) { - pushExecutionFinished(fullRunData, executionId, undefined, retryOf); - executeErrorWorkflow(workflowData, fullRunData, mode, undefined, retryOf); + if (!isManualMode) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); + } return; } @@ -204,15 +225,15 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution finished: fullRunData.finished ? fullRunData.finished : false, startedAt: fullRunData.startedAt, stoppedAt: fullRunData.stoppedAt, - workflowData, + workflowData: this.workflowData, }; - if (retryOf !== undefined) { - fullExecutionData.retryOf = retryOf.toString(); + if (this.retryOf !== undefined) { + fullExecutionData.retryOf = this.retryOf.toString(); } - if (workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(workflowData.id.toString()) === true) { - fullExecutionData.workflowId = workflowData.id.toString(); + if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) { + fullExecutionData.workflowId = this.workflowData.id.toString(); } const executionData = ResponseHelper.flattenExecutionData(fullExecutionData); @@ -220,33 +241,133 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution // Save the Execution in DB const executionResult = await Db.collections.Execution!.save(executionData as IExecutionFlattedDb); - if (fullRunData.finished === true && retryOf !== undefined) { + if (fullRunData.finished === true && this.retryOf !== undefined) { // If the retry was successful save the reference it on the original execution // await Db.collections.Execution!.save(executionData as IExecutionFlattedDb); - await Db.collections.Execution!.update(retryOf, { retrySuccessId: executionResult.id }); + await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: executionResult.id }); } - pushExecutionFinished(fullRunData, executionId, executionResult.id as string, retryOf); - executeErrorWorkflow(workflowData, fullRunData, mode, executionResult ? executionResult.id as string : undefined, retryOf); + if (!isManualMode) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, executionResult ? executionResult.id as string : undefined, this.retryOf); + } } catch (error) { - pushExecutionFinished(fullRunData, executionId, undefined, retryOf); - executeErrorWorkflow(workflowData, fullRunData, mode, undefined, retryOf); + if (!isManualMode) { + executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf); + } } }, ] }; -}; +} + + +/** + * Executes the workflow with the given ID + * + * @export + * @param {string} workflowId The id of the workflow to execute + * @param {IWorkflowExecuteAdditionalData} additionalData + * @param {INodeExecutionData[]} [inputData] + * @returns {(Promise>)} + */ +export async function executeWorkflow(workflowId: string, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]): Promise> { + const mode = 'integrated'; + + if (Db.collections!.Workflow === null) { + // The first time executeWorkflow gets called the Database has + // to get initialized first + await Db.init(); + } + + const workflowData = await Db.collections!.Workflow!.findOne(workflowId); + if (workflowData === undefined) { + throw new Error(`The workflow with the id "${workflowId}" does not exist.`); + } + + const nodeTypes = NodeTypes(); + + const workflow = new Workflow(workflowId as string | undefined, workflowData!.nodes, workflowData!.connections, workflowData!.active, nodeTypes, workflowData!.staticData); + + // Does not get used so set it simply to empty string + const executionId = ''; + + // Create new additionalData to have different workflow loaded and to call + // different webooks + const additionalDataIntegrated = await getBase(additionalData.credentials); + additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData, { parentProcessMode: additionalData.hooks!.mode }); + + // Find Start-Node + const requiredNodeTypes = ['n8n-nodes-base.start']; + let startNode: INode | undefined; + for (const node of workflowData!.nodes) { + if (requiredNodeTypes.includes(node.type)) { + startNode = node; + break; + } + } + if (startNode === undefined) { + // If the workflow does not contain a start-node we can not know what + // should be executed and with what data to start. + throw new Error(`The workflow does not contain a "Start" node and can so not be executed.`); + } + + // Always start with empty data if no inputData got supplied + inputData = inputData || [ + { + json: {} + } + ]; + + // Initialize the incoming data + const nodeExecutionStack: IExecuteData[] = []; + nodeExecutionStack.push( + { + node: startNode, + data: { + main: [inputData], + }, + }, + ); + + const runExecutionData: IRunExecutionData = { + startData: { + }, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack, + waitingExecution: {}, + }, + }; + + // Execute the workflow + const workflowExecute = new WorkflowExecute(additionalDataIntegrated, mode, runExecutionData); + const data = await workflowExecute.processRunExecutionData(workflow); + + if (data.finished === true) { + // Workflow did finish successfully + const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); + return returnData!.data!.main; + } else { + // Workflow did fail + const error = new Error(data.data.resultData.error!.message); + error.stack = data.data.resultData.error!.stack; + throw error; + } +} /** * Returns the base additional data without webhooks * * @export - * @param {WorkflowExecuteMode} mode * @param {IWorkflowCredentials} credentials + * @param {INodeParameters[]} [currentNodeParameters=[]] * @returns {Promise} */ -export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowCredentials, currentNodeParameters: INodeParameters[] = []): Promise { +export async function getBase(credentials: IWorkflowCredentials, currentNodeParameters: INodeParameters[] = []): Promise { const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const timezone = config.get('generic.timezone') as string; @@ -261,6 +382,8 @@ export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowC return { credentials, encryptionKey, + executeWorkflow, + restApiUrl: urlBaseWebhook + config.get('endpoints.rest') as string, timezone, webhookBaseUrl, webhookTestBaseUrl, @@ -270,13 +393,30 @@ export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowC /** - * Returns the workflow hooks + * Returns WorkflowHooks instance for running integrated workflows + * (Workflows which get started inside of another workflow) + */ +export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks { + optionalParameters = optionalParameters || {}; + const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode); + return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); +} + + +/** + * Returns WorkflowHooks instance for running the main workflow * * @export * @param {IWorkflowExecutionDataProcess} data * @param {string} executionId - * @returns {IWorkflowExecuteHooks} + * @returns {WorkflowHooks} */ -export function getHookMethods(data: IWorkflowExecutionDataProcess, executionId: string): IWorkflowExecuteHooks { - return hooks(data.executionMode, data.workflowData, executionId, data.sessionId, data.retryOf as string | undefined); +export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, executionId: string): WorkflowHooks { + const hookFunctions = hookFunctionsSave(); + const pushFunctions = hookFunctionsPush(); + for (const key of Object.keys(pushFunctions)) { + hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]); + } + + return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string}); } diff --git a/packages/cli/src/WorkflowHelpers.ts b/packages/cli/src/WorkflowHelpers.ts index b3e1156fa9..ab723a95e5 100644 --- a/packages/cli/src/WorkflowHelpers.ts +++ b/packages/cli/src/WorkflowHelpers.ts @@ -1,5 +1,6 @@ import { Db, + ITransferNodeTypes, IWorkflowExecutionDataProcess, IWorkflowErrorData, NodeTypes, @@ -11,7 +12,9 @@ import { IDataObject, IExecuteData, INode, + IRun, IRunExecutionData, + ITaskData, Workflow, } from 'n8n-workflow'; @@ -19,6 +22,31 @@ import * as config from '../config'; const ERROR_TRIGGER_TYPE = config.get('nodes.errorTriggerType') as string; + +/** + * Returns the data of the last executed node + * + * @export + * @param {IRun} inputData + * @returns {(ITaskData | undefined)} + */ +export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefined { + const runData = inputData.data.resultData.runData; + const lastNodeExecuted = inputData.data.resultData.lastNodeExecuted; + + if (lastNodeExecuted === undefined) { + return undefined; + } + + if (runData[lastNodeExecuted] === undefined) { + return undefined; + } + + return runData[lastNodeExecuted][runData[lastNodeExecuted].length - 1]; +} + + + /** * Returns if the given id is a valid workflow id * @@ -129,6 +157,89 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData +/** + * Returns all the defined NodeTypes + * + * @export + * @returns {ITransferNodeTypes} + */ +export function getAllNodeTypeData(): ITransferNodeTypes { + const nodeTypes = NodeTypes(); + + // Get the data of all thenode types that they + // can be loaded again in the process + const returnData: ITransferNodeTypes = {}; + for (const nodeTypeName of Object.keys(nodeTypes.nodeTypes)) { + if (nodeTypes.nodeTypes[nodeTypeName] === undefined) { + throw new Error(`The NodeType "${nodeTypeName}" could not be found!`); + } + + returnData[nodeTypeName] = { + className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name, + sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath, + }; + } + + return returnData; +} + + + +/** + * Returns the data of the node types that are needed + * to execute the given nodes + * + * @export + * @param {INode[]} nodes + * @returns {ITransferNodeTypes} + */ +export function getNodeTypeData(nodes: INode[]): ITransferNodeTypes { + const nodeTypes = NodeTypes(); + + // Check which node-types have to be loaded + const neededNodeTypes = getNeededNodeTypes(nodes); + + // Get all the data of the needed node types that they + // can be loaded again in the process + const returnData: ITransferNodeTypes = {}; + for (const nodeTypeName of neededNodeTypes) { + if (nodeTypes.nodeTypes[nodeTypeName] === undefined) { + throw new Error(`The NodeType "${nodeTypeName}" could not be found!`); + } + + returnData[nodeTypeName] = { + className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name, + sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath, + }; + } + + return returnData; +} + + + +/** + * Returns the names of the NodeTypes which are are needed + * to execute the gives nodes + * + * @export + * @param {INode[]} nodes + * @returns {string[]} + */ +export function getNeededNodeTypes(nodes: INode[]): string[] { + // Check which node-types have to be loaded + const neededNodeTypes: string[] = []; + for (const node of nodes) { + if (!neededNodeTypes.includes(node.type)) { + neededNodeTypes.push(node.type); + } + } + + return neededNodeTypes; +} + + + /** * Saves the static data if it changed * diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 5496ace2a4..8840d1cfb0 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -1,11 +1,9 @@ - import { ActiveExecutions, IProcessMessageDataHook, ITransferNodeTypes, IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcessWithExecution, - NodeTypes, Push, WorkflowExecuteAdditionalData, WorkflowHelpers, @@ -17,9 +15,8 @@ import { import { IExecutionError, - INode, IRun, - IWorkflowExecuteHooks, + WorkflowHooks, WorkflowExecuteMode, } from 'n8n-workflow'; @@ -38,70 +35,15 @@ export class WorkflowRunner { } - /** - * Returns the data of the node types that are needed - * to execute the given nodes - * - * @param {INode[]} nodes - * @returns {ITransferNodeTypes} - * @memberof WorkflowRunner - */ - getNodeTypeData(nodes: INode[]): ITransferNodeTypes { - const nodeTypes = NodeTypes(); - - // Check which node-types have to be loaded - const neededNodeTypes: string[] = []; - for (const node of nodes) { - if (!neededNodeTypes.includes(node.type)) { - neededNodeTypes.push(node.type); - } - } - - // Get all the data of the needed node types that they - // can be loaded again in the process - const returnData: ITransferNodeTypes = {}; - for (const nodeTypeName of neededNodeTypes) { - if (nodeTypes.nodeTypes[nodeTypeName] === undefined) { - throw new Error(`The NodeType "${nodeTypeName}" could not be found!`); - } - - returnData[nodeTypeName] = { - className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name, - sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath, - }; - } - - return returnData; - } - - /** * The process did send a hook message so execute the appropiate hook * - * @param {IWorkflowExecuteHooks} hookFunctions + * @param {WorkflowHooks} workflowHooks * @param {IProcessMessageDataHook} hookData * @memberof WorkflowRunner */ - processHookMessage(hookFunctions: IWorkflowExecuteHooks, hookData: IProcessMessageDataHook) { - if (hookFunctions[hookData.hook] !== undefined && Array.isArray(hookFunctions[hookData.hook])) { - - for (const hookFunction of hookFunctions[hookData.hook]!) { - // TODO: Not sure if that is 100% correct or something is still missing like to wait - hookFunction.apply(this, hookData.parameters) - .catch((error: Error) => { - // Catch all errors here because when "executeHook" gets called - // we have the most time no "await" and so the errors would so - // not be uncaught by anything. - - // TODO: Add proper logging - console.error(`There was a problem executing hook: "${hookData.hook}"`); - console.error('Parameters:'); - console.error(hookData.parameters); - console.error('Error:'); - console.error(error); - }); - } - } + processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) { + workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters); } @@ -133,7 +75,7 @@ export class WorkflowRunner { this.activeExecutions.remove(executionId, fullRunData); // Also send to Editor UI - WorkflowExecuteAdditionalData.pushExecutionFinished(fullRunData, executionId); + WorkflowExecuteAdditionalData.pushExecutionFinished(executionMode, fullRunData, executionId); } @@ -157,12 +99,30 @@ export class WorkflowRunner { // Register the active execution const executionId = this.activeExecutions.add(subprocess, data); - const nodeTypeData = this.getNodeTypeData(data.workflowData.nodes); + // Check if workflow contains a "executeWorkflow" Node as in this + // case we can not know which nodeTypes will be needed and so have + // to load all of them in the workflowRunnerProcess + let loadAllNodeTypes = false; + for (const node of data.workflowData.nodes) { + if (node.type === 'n8n-nodes-base.executeWorkflow') { + loadAllNodeTypes = true; + break; + } + } + + let nodeTypeData: ITransferNodeTypes; + if (loadAllNodeTypes === true) { + // Supply all nodeTypes + nodeTypeData = WorkflowHelpers.getAllNodeTypeData(); + } else { + // Supply only nodeTypes which the workflow needs + nodeTypeData = WorkflowHelpers.getNodeTypeData(data.workflowData.nodes); + } (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; (data as unknown as IWorkflowExecutionDataProcessWithExecution).nodeTypeData = nodeTypeData; - const hookFunctions = WorkflowExecuteAdditionalData.getHookMethods(data, executionId); + const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); // Send all data to subprocess it needs to run the workflow subprocess.send({ type: 'startWorkflow', data } as IProcessMessage); @@ -178,7 +138,7 @@ export class WorkflowRunner { this.processError(executionError, startedAt, data.executionMode, executionId); } else if (message.type === 'processHook') { - this.processHookMessage(hookFunctions, message.data as IProcessMessageDataHook); + this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook); } }); diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 528c92f471..4d8ef4d000 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -1,6 +1,5 @@ import { - IProcessMessageDataHook, IWorkflowExecutionDataProcessWithExecution, NodeTypes, WorkflowExecuteAdditionalData, @@ -18,10 +17,9 @@ import { INodeTypeData, IRun, ITaskData, - IWorkflowExecuteHooks, Workflow, + WorkflowHooks, } from 'n8n-workflow'; -import { ChildProcess } from 'child_process'; export class WorkflowRunnerProcess { data: IWorkflowExecutionDataProcessWithExecution | undefined; @@ -61,10 +59,9 @@ export class WorkflowRunnerProcess { await nodeTypes.init(nodeTypesData); this.workflow = new Workflow(this.data.workflowData.id as string | undefined, this.data.workflowData!.nodes, this.data.workflowData!.connections, this.data.workflowData!.active, nodeTypes, this.data.workflowData!.staticData); - const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.executionMode, this.data.credentials); + const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials); additionalData.hooks = this.getProcessForwardHooks(); - if (this.data.executionData !== undefined) { this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode, this.data.executionData); return this.workflowExecute.processRunExecutionData(this.workflow); @@ -111,11 +108,10 @@ export class WorkflowRunnerProcess { * the parent process where they then can be executed with access * to database and to PushService * - * @param {ChildProcess} process * @returns */ - getProcessForwardHooks(): IWorkflowExecuteHooks { - return { + getProcessForwardHooks(): WorkflowHooks { + const hookFunctions = { nodeExecuteBefore: [ async (nodeName: string): Promise => { this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]); @@ -137,6 +133,8 @@ export class WorkflowRunnerProcess { }, ] }; + + return new WorkflowHooks(hookFunctions, this.data!.executionMode, this.data!.executionId, this.data!.workflowData, { sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string }); } } diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 344abf9f83..775c2fd754 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -341,6 +341,9 @@ export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, addi return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue); }, + getRestApiUrl: (): string => { + return additionalData.restApiUrl; + }, getTimezone: (): string => { return getTimezone(workflow, additionalData); }, @@ -375,6 +378,10 @@ export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, addi export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteFunctions { return ((workflow, runExecutionData, connectionInputData, inputData, node) => { return { + async executeWorkflow(workflowId: string, inputData?: INodeExecutionData[]): Promise { // tslint:disable-line:no-any + // return additionalData.executeWorkflow(workflowId, additionalData, inputData); + return additionalData.executeWorkflow(workflowId, additionalData, inputData); + }, getContext(type: string): IContextObject { return NodeHelpers.getContext(runExecutionData, type, node); }, @@ -408,6 +415,9 @@ export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunEx getMode: (): WorkflowExecuteMode => { return mode; }, + getRestApiUrl: (): string => { + return additionalData.restApiUrl; + }, getTimezone: (): string => { return getTimezone(workflow, additionalData); }, @@ -482,6 +492,9 @@ export function getExecuteSingleFunctions(workflow: Workflow, runExecutionData: getMode: (): WorkflowExecuteMode => { return mode; }, + getRestApiUrl: (): string => { + return additionalData.restApiUrl; + }, getTimezone: (): string => { return getTimezone(workflow, additionalData); }, @@ -540,6 +553,9 @@ export function getLoadOptionsFunctions(workflow: Workflow, node: INode, additio getTimezone: (): string => { return getTimezone(workflow, additionalData); }, + getRestApiUrl: (): string => { + return additionalData.restApiUrl; + }, helpers: { request: requestPromise, }, diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 83fd2f4296..d8ca8cd50c 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -226,25 +226,8 @@ export class WorkflowExecute { if (this.additionalData.hooks === undefined) { return; } - if (this.additionalData.hooks[hookName] === undefined || this.additionalData.hooks[hookName]!.length === 0) { - return; - } - for (const hookFunction of this.additionalData.hooks[hookName]!) { - await hookFunction.apply(this, parameters as [IRun, IWaitingForExecution]) - .catch((error) => { - // Catch all errors here because when "executeHook" gets called - // we have the most time no "await" and so the errors would so - // not be caught by anything. - - // TODO: Add proper logging - console.error(`There was a problem executing hook: "${hookName}"`); - console.error('Parameters:'); - console.error(parameters); - console.error('Error:'); - console.error(error); - }); - } + return this.additionalData.hooks.executeHookFunctions(hookName, parameters); } diff --git a/packages/nodes-base/nodes/ExecuteWorkflow.node.ts b/packages/nodes-base/nodes/ExecuteWorkflow.node.ts new file mode 100644 index 0000000000..bf5090e4c4 --- /dev/null +++ b/packages/nodes-base/nodes/ExecuteWorkflow.node.ts @@ -0,0 +1,75 @@ +import { OptionsWithUri } from 'request'; + +import { IExecuteFunctions } from 'n8n-core'; +import { + ILoadOptionsFunctions, + INodeExecutionData, + INodePropertyOptions, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + + +export class ExecuteWorkflow implements INodeType { + description: INodeTypeDescription = { + displayName: 'Execute Workflow', + name: 'executeWorkflow', + icon: 'fa:network-wired', + group: ['transform'], + version: 1, + subtitle: '={{"Workflow: " + $parameter["workflowId"]}}', + description: 'Execute another workflow', + defaults: { + name: 'Execute Workflow', + color: '#ff6d5a', + }, + inputs: ['main'], + outputs: ['main'], + properties: [ + { + displayName: 'Workflow', + name: 'workflowId', + type: 'options', + typeOptions: { + loadOptionsMethod: 'getWorkflows', + }, + default: '', + required: true, + description: 'The workflow to execute.', + }, + ] + }; + + methods = { + loadOptions: { + async getWorkflows(this: ILoadOptionsFunctions): Promise { + const options: OptionsWithUri = { + method: 'GET', + uri: this.getRestApiUrl() + '/workflows', + json: true + }; + + const returnData: INodePropertyOptions[] = []; + + const responseData = await this.helpers.request!(options); + for (const workflowData of responseData.data) { + returnData.push({ + name: workflowData.name, + value: workflowData.id, + }); + } + + return returnData; + } + }, + }; + + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + const workflowId = this.getNodeParameter('workflowId', 0) as string; + const receivedData = await this.executeWorkflow(workflowId, items); + + return receivedData; + } +} diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 5afa6b431c..a5c3e02ce6 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -94,6 +94,7 @@ "dist/nodes/EmailSend.node.js", "dist/nodes/ErrorTrigger.node.js", "dist/nodes/ExecuteCommand.node.js", + "dist/nodes/ExecuteWorkflow.node.js", "dist/nodes/FileMaker/FileMaker.node.js", "dist/nodes/Freshdesk/Freshdesk.node.js", "dist/nodes/Flow/Flow.node.js", diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 660096d7c1..7824ee52cb 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1,4 +1,5 @@ import { Workflow } from './Workflow'; +import { WorkflowHooks } from './WorkflowHooks'; import * as express from 'express'; export interface IBinaryData { @@ -149,6 +150,7 @@ export interface IExecuteContextData { export interface IExecuteFunctions { + executeWorkflow(workflowId: string, inputData?: INodeExecutionData[]): Promise; // tslint:disable-line:no-any getContext(type: string): IContextObject; getCredentials(type: string): ICredentialDataDecryptedObject | undefined; getInputData(inputIndex?: number, inputName?: string): INodeExecutionData[]; @@ -156,6 +158,7 @@ export interface IExecuteFunctions { getNodeParameter(parameterName: string, itemIndex: number, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData; getWorkflowStaticData(type: string): IDataObject; + getRestApiUrl(): string; getTimezone(): string; prepareOutputData(outputData: INodeExecutionData[], outputIndex?: number): Promise; helpers: { @@ -170,6 +173,7 @@ export interface IExecuteSingleFunctions { getInputData(inputIndex?: number, inputName?: string): INodeExecutionData; getMode(): WorkflowExecuteMode; getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any + getRestApiUrl(): string; getTimezone(): string; getWorkflowDataProxy(): IWorkflowDataProxyData; getWorkflowStaticData(type: string): IDataObject; @@ -184,6 +188,7 @@ export interface ILoadOptionsFunctions { getCurrentNodeParameter(parameterName: string): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object | undefined; getCurrentNodeParameters(): INodeParameters | undefined; getTimezone(): string; + getRestApiUrl(): string; helpers: { [key: string]: ((...args: any[]) => any) | undefined; //tslint:disable-line:no-any }; @@ -208,6 +213,7 @@ export interface ITriggerFunctions { getCredentials(type: string): ICredentialDataDecryptedObject | undefined; getMode(): WorkflowExecuteMode; getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any + getRestApiUrl(): string; getTimezone(): string; getWorkflowStaticData(type: string): IDataObject; helpers: { @@ -574,6 +580,18 @@ export interface IWaitingForExecution { } +export interface IWorkflowBase { + id?: number | string | any; // tslint:disable-line:no-any + name: string; + active: boolean; + createdAt: Date; + updatedAt: Date; + nodes: INode[]; + connections: IConnections; + settings?: IWorkflowSettings; + staticData?: IDataObject; +} + export interface IWorkflowCredentials { // Credential type [key: string]: { @@ -582,6 +600,7 @@ export interface IWorkflowCredentials { }; } + export interface IWorkflowExecuteHooks { [key: string]: Array<((...args: any[]) => Promise)> | undefined; // tslint:disable-line:no-any nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData) => Promise)>; @@ -593,16 +612,26 @@ export interface IWorkflowExecuteHooks { export interface IWorkflowExecuteAdditionalData { credentials: IWorkflowCredentials; encryptionKey: string; - hooks?: IWorkflowExecuteHooks; + executeWorkflow: (workflowId: string, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]) => Promise; // tslint:disable-line:no-any + // hooks?: IWorkflowExecuteHooks; + hooks?: WorkflowHooks; httpResponse?: express.Response; httpRequest?: express.Request; + restApiUrl: string; timezone: string; webhookBaseUrl: string; webhookTestBaseUrl: string; currentNodeParameters? : INodeParameters[]; } -export type WorkflowExecuteMode = 'cli' | 'error' | 'internal' | 'manual' | 'retry' | 'trigger' | 'webhook'; +export type WorkflowExecuteMode = 'cli' | 'error' | 'integrated' | 'internal' | 'manual' | 'retry' | 'trigger' | 'webhook'; + +export interface IWorkflowHooksOptionalParameters { + parentProcessMode?: string; + retryOf?: string; + sessionId?: string; +} + export interface IWorkflowSettings { [key: string]: IDataObject | string | number | boolean | undefined; diff --git a/packages/workflow/src/WorkflowHooks.ts b/packages/workflow/src/WorkflowHooks.ts new file mode 100644 index 0000000000..94a02abddc --- /dev/null +++ b/packages/workflow/src/WorkflowHooks.ts @@ -0,0 +1,48 @@ +import { + IWorkflowBase, + IWorkflowExecuteHooks, + IWorkflowHooksOptionalParameters, + WorkflowExecuteMode, +} from './Interfaces'; + + +export class WorkflowHooks { + mode: WorkflowExecuteMode; + workflowData: IWorkflowBase; + executionId: string; + sessionId?: string; + retryOf?: string; + hookFunctions: IWorkflowExecuteHooks; + + constructor(hookFunctions: IWorkflowExecuteHooks, mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters) { + optionalParameters = optionalParameters || {}; + + this.hookFunctions = hookFunctions; + this.mode = mode; + this.executionId = executionId; + this.workflowData = workflowData; + this.sessionId = optionalParameters.sessionId; + this.retryOf = optionalParameters.retryOf; + } + + async executeHookFunctions(hookName: string, parameters: any[]) { // tslint:disable-line:no-any + if (this.hookFunctions[hookName] !== undefined && Array.isArray(this.hookFunctions[hookName])) { + for (const hookFunction of this.hookFunctions[hookName]!) { + await hookFunction.apply(this, parameters) + .catch((error: Error) => { + // Catch all errors here because when "executeHook" gets called + // we have the most time no "await" and so the errors would so + // not be uncaught by anything. + + // TODO: Add proper logging + console.error(`There was a problem executing hook: "${hookName}"`); + console.error('Parameters:'); + console.error(parameters); + console.error('Error:'); + console.error(error); + }); + } + } + } + +} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 42a5cf77c1..89d01313d6 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -1,6 +1,7 @@ export * from './Interfaces'; export * from './Workflow'; export * from './WorkflowDataProxy'; +export * from './WorkflowHooks'; import * as NodeHelpers from './NodeHelpers'; import * as ObservableObject from './ObservableObject';