Add ExecuteWorkflow-Node

This commit is contained in:
Jan Oberhauser 2019-12-19 16:07:55 -06:00
parent 403c52ddf8
commit 8acc3c5931
15 changed files with 525 additions and 197 deletions

View file

@ -301,7 +301,7 @@ export class ActiveWorkflowRunner {
const mode = 'trigger';
const credentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(mode, credentials);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode);
// Add the workflows which have webhooks defined

View file

@ -1,16 +1,14 @@
import {
IConnections,
ICredentialsDecrypted,
ICredentialsEncrypted,
IDataObject,
IExecutionError,
INode,
IRun,
IRunData,
IRunExecutionData,
ITaskData,
IWorkflowBase as IWorkflowBaseWorkflow,
IWorkflowCredentials,
IWorkflowSettings,
WorkflowExecuteMode,
} from 'n8n-workflow';
@ -44,16 +42,9 @@ export interface IDatabaseCollections {
}
export interface IWorkflowBase {
export interface IWorkflowBase extends IWorkflowBaseWorkflow {
id?: number | string | ObjectID;
name: string;
active: boolean;
createdAt: Date;
updatedAt: Date;
nodes: INode[];
connections: IConnections;
settings?: IWorkflowSettings;
staticData?: IDataObject;
}

View file

@ -497,7 +497,7 @@ class App {
if (WorkflowHelpers.isWorkflowIdValid(workflowData.id) === true && (runData === undefined || startNodes === undefined || startNodes.length === 0 || destinationNode === undefined)) {
// Webhooks can only be tested with saved workflows
const credentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, credentials);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
const nodeTypes = NodeTypes();
const workflowInstance = new Workflow(workflowData.id, workflowData.nodes, workflowData.connections, false, nodeTypes, undefined, workflowData.settings);
const needsWebhook = await this.testWebhooks.needsWebhookData(workflowData, workflowInstance, additionalData, executionMode, sessionId, destinationNode);
@ -544,13 +544,12 @@ class App {
const methodName = req.query.methodName;
const nodeTypes = NodeTypes();
const executionMode = 'manual';
const loadDataInstance = new LoadNodeParameterOptions(nodeType, nodeTypes, credentials);
const workflowData = loadDataInstance.getWorkflowData() as IWorkflowBase;
const workflowCredentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, workflowCredentials, currentNodeParameters);
const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowCredentials, currentNodeParameters);
return loadDataInstance.getOptions(methodName, additionalData);
}));

View file

@ -9,6 +9,7 @@ import {
IWorkflowDb,
IWorkflowExecutionDataProcess,
ResponseHelper,
WorkflowHelpers,
WorkflowRunner,
WorkflowCredentials,
WorkflowExecuteAdditionalData,
@ -24,9 +25,7 @@ import {
IDataObject,
IExecuteData,
INode,
IRun,
IRunExecutionData,
ITaskData,
IWebhookData,
IWebhookResponseData,
IWorkflowExecuteAdditionalData,
@ -38,29 +37,6 @@ import {
const activeExecutions = ActiveExecutions.getInstance();
/**
* Returns the data of the last executed node
*
* @export
* @param {IRun} inputData
* @returns {(ITaskData | undefined)}
*/
export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefined {
const runData = inputData.data.resultData.runData;
const lastNodeExecuted = inputData.data.resultData.lastNodeExecuted;
if (lastNodeExecuted === undefined) {
return undefined;
}
if (runData[lastNodeExecuted] === undefined) {
return undefined;
}
return runData[lastNodeExecuted][runData[lastNodeExecuted].length - 1];
}
/**
* Returns all the webhooks which should be created for the give workflow
*
@ -132,7 +108,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
// Prepare everything that is needed to run the workflow
const credentials = await WorkflowCredentials(workflowData.nodes);
const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, credentials);
const additionalData = await WorkflowExecuteAdditionalData.getBase(credentials);
// Add the Response and Request so that this data can be accessed in the node
additionalData.httpRequest = req;
@ -286,7 +262,7 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo
return undefined;
}
const returnData = getDataLastExecutedNodeData(data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
if (returnData === undefined) {
if (didSendResponse === false) {
responseCallback(null, {

View file

@ -5,6 +5,7 @@ import {
IPushDataExecutionFinished,
IWorkflowBase,
IWorkflowExecutionDataProcess,
NodeTypes,
Push,
ResponseHelper,
WebhookHelpers,
@ -13,17 +14,25 @@ import {
import {
UserSettings,
WorkflowExecute,
} from 'n8n-core';
import {
IDataObject,
IExecuteData,
INode,
INodeParameters,
INodeExecutionData,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowCredentials,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
Workflow,
WorkflowExecuteMode,
WorkflowHooks,
} from 'n8n-workflow';
import * as config from '../config';
@ -35,15 +44,10 @@ import * as config from '../config';
*
* @param {IWorkflowBase} workflowData The workflow which got executed
* @param {IRun} fullRunData The run which produced the error
* @param {WorkflowExecuteMode} mode The mode in which the workflow which did error got started in
* @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
* @param {string} [executionId] The id the execution got saved as
*/
function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mode: WorkflowExecuteMode, executionId?: string, retryOf?: string): void {
if (mode === 'manual') {
// Do not call error workflow when executed manually
return;
}
// Check if there was an error and if so if an errorWorkflow is set
if (fullRunData.data.resultData.error !== undefined && workflowData.settings !== undefined && workflowData.settings.errorWorkflow) {
const workflowErrorData = {
@ -68,11 +72,12 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo
/**
* Pushes the execution out to all connected clients
*
* @param {WorkflowExecuteMode} mode The mode in which the workflow got started in
* @param {IRun} fullRunData The RunData of the finished execution
* @param {string} executionIdActive The id of the finished execution
* @param {string} [executionIdDb] The database id of finished execution
*/
export function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string, retryOf?: string) {
export function pushExecutionFinished(mode: WorkflowExecuteMode, fullRunData: IRun, executionIdActive: string, executionIdDb?: string, retryOf?: string) {
// Clone the object except the runData. That one is not supposed
// to be send. Because that data got send piece by piece after
// each node which finished executing
@ -101,100 +106,116 @@ export function pushExecutionFinished(fullRunData: IRun, executionIdActive: stri
/**
* Returns the workflow execution hooks
* Returns hook functions to push data to Editor-UI
*
* @param {WorkflowExecuteMode} mode
* @param {IWorkflowBase} workflowData
* @param {string} executionId
* @param {string} [sessionId]
* @param {string} [retryOf]
* @returns {IWorkflowExecuteHooks}
*/
const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, executionId: string, sessionId?: string, retryOf?: string): IWorkflowExecuteHooks => {
function hookFunctionsPush(): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [
async (nodeName: string): Promise<void> => {
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
// Push data to session which started workflow before each
// node which starts rendering
if (sessionId === undefined) {
if (this.sessionId === undefined) {
return;
}
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteBefore', {
executionId,
executionId: this.executionId,
nodeName,
}, sessionId);
}, this.sessionId);
},
],
nodeExecuteAfter: [
async (nodeName: string, data: ITaskData): Promise<void> => {
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
// Push data to session which started workflow after each rendered node
if (sessionId === undefined) {
if (this.sessionId === undefined) {
return;
}
const pushInstance = Push.getInstance();
pushInstance.send('nodeExecuteAfter', {
executionId,
executionId: this.executionId,
nodeName,
data,
}, sessionId);
}, this.sessionId);
},
],
workflowExecuteBefore: [
async (): Promise<void> => {
async function (this: WorkflowHooks): Promise<void> {
// Push data to editor-ui once workflow finished
const pushInstance = Push.getInstance();
pushInstance.send('executionStarted', {
executionId,
mode,
executionId: this.executionId,
mode: this.mode,
startedAt: new Date(),
retryOf,
workflowId: workflowData.id as string,
workflowName: workflowData.name,
retryOf: this.retryOf,
workflowId: this.workflowData.id as string,
workflowName: this.workflowData.name,
});
}
],
workflowExecuteAfter: [
async (fullRunData: IRun, newStaticData: IDataObject): Promise<void> => {
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf);
},
]
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*
* @returns {IWorkflowExecuteHooks}
*/
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise<void> {
const isManualMode = [this.mode, parentProcessMode].includes('manual');
try {
if (mode !== 'manual' && WorkflowHelpers.isWorkflowIdValid(workflowData.id as string) === true && newStaticData) {
if (!isManualMode && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id as string) === true && newStaticData) {
// Workflow is saved so update in database
try {
await WorkflowHelpers.saveStaticDataById(workflowData.id as string, newStaticData);
await WorkflowHelpers.saveStaticDataById(this.workflowData.id as string, newStaticData);
} catch (e) {
// TODO: Add proper logging!
console.error(`There was a problem saving the workflow with id "${workflowData.id}" to save changed staticData: ${e.message}`);
console.error(`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: ${e.message}`);
}
}
let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean;
if (workflowData.settings !== undefined && workflowData.settings.saveManualExecutions !== undefined) {
if (this.workflowData.settings !== undefined && this.workflowData.settings.saveManualExecutions !== undefined) {
// Apply to workflow override
saveManualExecutions = workflowData.settings.saveManualExecutions as boolean;
saveManualExecutions = this.workflowData.settings.saveManualExecutions as boolean;
}
if (mode === 'manual' && saveManualExecutions === false) {
pushExecutionFinished(fullRunData, executionId, undefined, retryOf);
executeErrorWorkflow(workflowData, fullRunData, mode, undefined, retryOf);
if (isManualMode && saveManualExecutions === false) {
return;
}
// Check config to know if execution should be saved or not
let saveDataErrorExecution = config.get('executions.saveDataOnError') as string;
let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string;
if (workflowData.settings !== undefined) {
saveDataErrorExecution = (workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
saveDataSuccessExecution = (workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution;
if (this.workflowData.settings !== undefined) {
saveDataErrorExecution = (this.workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution;
saveDataSuccessExecution = (this.workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution;
}
const workflowDidSucceed = !fullRunData.data.resultData.error;
if (workflowDidSucceed === true && saveDataSuccessExecution === 'none' ||
workflowDidSucceed === false && saveDataErrorExecution === 'none'
) {
pushExecutionFinished(fullRunData, executionId, undefined, retryOf);
executeErrorWorkflow(workflowData, fullRunData, mode, undefined, retryOf);
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
return;
}
@ -204,15 +225,15 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution
finished: fullRunData.finished ? fullRunData.finished : false,
startedAt: fullRunData.startedAt,
stoppedAt: fullRunData.stoppedAt,
workflowData,
workflowData: this.workflowData,
};
if (retryOf !== undefined) {
fullExecutionData.retryOf = retryOf.toString();
if (this.retryOf !== undefined) {
fullExecutionData.retryOf = this.retryOf.toString();
}
if (workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(workflowData.id.toString()) === true) {
fullExecutionData.workflowId = workflowData.id.toString();
if (this.workflowData.id !== undefined && WorkflowHelpers.isWorkflowIdValid(this.workflowData.id.toString()) === true) {
fullExecutionData.workflowId = this.workflowData.id.toString();
}
const executionData = ResponseHelper.flattenExecutionData(fullExecutionData);
@ -220,33 +241,133 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, execution
// Save the Execution in DB
const executionResult = await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
if (fullRunData.finished === true && retryOf !== undefined) {
if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful save the reference it on the original execution
// await Db.collections.Execution!.save(executionData as IExecutionFlattedDb);
await Db.collections.Execution!.update(retryOf, { retrySuccessId: executionResult.id });
await Db.collections.Execution!.update(this.retryOf, { retrySuccessId: executionResult.id });
}
pushExecutionFinished(fullRunData, executionId, executionResult.id as string, retryOf);
executeErrorWorkflow(workflowData, fullRunData, mode, executionResult ? executionResult.id as string : undefined, retryOf);
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, executionResult ? executionResult.id as string : undefined, this.retryOf);
}
} catch (error) {
pushExecutionFinished(fullRunData, executionId, undefined, retryOf);
executeErrorWorkflow(workflowData, fullRunData, mode, undefined, retryOf);
if (!isManualMode) {
executeErrorWorkflow(this.workflowData, fullRunData, this.mode, undefined, this.retryOf);
}
}
},
]
};
}
/**
* Executes the workflow with the given ID
*
* @export
* @param {string} workflowId The id of the workflow to execute
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {INodeExecutionData[]} [inputData]
* @returns {(Promise<Array<INodeExecutionData[] | null>>)}
*/
export async function executeWorkflow(workflowId: string, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]): Promise<Array<INodeExecutionData[] | null>> {
const mode = 'integrated';
if (Db.collections!.Workflow === null) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
const workflowData = await Db.collections!.Workflow!.findOne(workflowId);
if (workflowData === undefined) {
throw new Error(`The workflow with the id "${workflowId}" does not exist.`);
}
const nodeTypes = NodeTypes();
const workflow = new Workflow(workflowId as string | undefined, workflowData!.nodes, workflowData!.connections, workflowData!.active, nodeTypes, workflowData!.staticData);
// Does not get used so set it simply to empty string
const executionId = '';
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(additionalData.credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData, { parentProcessMode: additionalData.hooks!.mode });
// Find Start-Node
const requiredNodeTypes = ['n8n-nodes-base.start'];
let startNode: INode | undefined;
for (const node of workflowData!.nodes) {
if (requiredNodeTypes.includes(node.type)) {
startNode = node;
break;
}
}
if (startNode === undefined) {
// If the workflow does not contain a start-node we can not know what
// should be executed and with what data to start.
throw new Error(`The workflow does not contain a "Start" node and can so not be executed.`);
}
// Always start with empty data if no inputData got supplied
inputData = inputData || [
{
json: {}
}
];
// Initialize the incoming data
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push(
{
node: startNode,
data: {
main: [inputData],
},
},
);
const runExecutionData: IRunExecutionData = {
startData: {
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
};
// Execute the workflow
const workflowExecute = new WorkflowExecute(additionalDataIntegrated, mode, runExecutionData);
const data = await workflowExecute.processRunExecutionData(workflow);
if (data.finished === true) {
// Workflow did finish successfully
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
} else {
// Workflow did fail
const error = new Error(data.data.resultData.error!.message);
error.stack = data.data.resultData.error!.stack;
throw error;
}
}
/**
* Returns the base additional data without webhooks
*
* @export
* @param {WorkflowExecuteMode} mode
* @param {IWorkflowCredentials} credentials
* @param {INodeParameters[]} [currentNodeParameters=[]]
* @returns {Promise<IWorkflowExecuteAdditionalData>}
*/
export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowCredentials, currentNodeParameters: INodeParameters[] = []): Promise<IWorkflowExecuteAdditionalData> {
export async function getBase(credentials: IWorkflowCredentials, currentNodeParameters: INodeParameters[] = []): Promise<IWorkflowExecuteAdditionalData> {
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const timezone = config.get('generic.timezone') as string;
@ -261,6 +382,8 @@ export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowC
return {
credentials,
encryptionKey,
executeWorkflow,
restApiUrl: urlBaseWebhook + config.get('endpoints.rest') as string,
timezone,
webhookBaseUrl,
webhookTestBaseUrl,
@ -270,13 +393,30 @@ export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowC
/**
* Returns the workflow hooks
* Returns WorkflowHooks instance for running integrated workflows
* (Workflows which get started inside of another workflow)
*/
export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
optionalParameters = optionalParameters || {};
const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}
/**
* Returns WorkflowHooks instance for running the main workflow
*
* @export
* @param {IWorkflowExecutionDataProcess} data
* @param {string} executionId
* @returns {IWorkflowExecuteHooks}
* @returns {WorkflowHooks}
*/
export function getHookMethods(data: IWorkflowExecutionDataProcess, executionId: string): IWorkflowExecuteHooks {
return hooks(data.executionMode, data.workflowData, executionId, data.sessionId, data.retryOf as string | undefined);
export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, executionId: string): WorkflowHooks {
const hookFunctions = hookFunctionsSave();
const pushFunctions = hookFunctionsPush();
for (const key of Object.keys(pushFunctions)) {
hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]);
}
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string});
}

View file

@ -1,5 +1,6 @@
import {
Db,
ITransferNodeTypes,
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
NodeTypes,
@ -11,7 +12,9 @@ import {
IDataObject,
IExecuteData,
INode,
IRun,
IRunExecutionData,
ITaskData,
Workflow,
} from 'n8n-workflow';
@ -19,6 +22,31 @@ import * as config from '../config';
const ERROR_TRIGGER_TYPE = config.get('nodes.errorTriggerType') as string;
/**
* Returns the data of the last executed node
*
* @export
* @param {IRun} inputData
* @returns {(ITaskData | undefined)}
*/
export function getDataLastExecutedNodeData(inputData: IRun): ITaskData | undefined {
const runData = inputData.data.resultData.runData;
const lastNodeExecuted = inputData.data.resultData.lastNodeExecuted;
if (lastNodeExecuted === undefined) {
return undefined;
}
if (runData[lastNodeExecuted] === undefined) {
return undefined;
}
return runData[lastNodeExecuted][runData[lastNodeExecuted].length - 1];
}
/**
* Returns if the given id is a valid workflow id
*
@ -129,6 +157,89 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData
/**
* Returns all the defined NodeTypes
*
* @export
* @returns {ITransferNodeTypes}
*/
export function getAllNodeTypeData(): ITransferNodeTypes {
const nodeTypes = NodeTypes();
// Get the data of all thenode types that they
// can be loaded again in the process
const returnData: ITransferNodeTypes = {};
for (const nodeTypeName of Object.keys(nodeTypes.nodeTypes)) {
if (nodeTypes.nodeTypes[nodeTypeName] === undefined) {
throw new Error(`The NodeType "${nodeTypeName}" could not be found!`);
}
returnData[nodeTypeName] = {
className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name,
sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath,
};
}
return returnData;
}
/**
* Returns the data of the node types that are needed
* to execute the given nodes
*
* @export
* @param {INode[]} nodes
* @returns {ITransferNodeTypes}
*/
export function getNodeTypeData(nodes: INode[]): ITransferNodeTypes {
const nodeTypes = NodeTypes();
// Check which node-types have to be loaded
const neededNodeTypes = getNeededNodeTypes(nodes);
// Get all the data of the needed node types that they
// can be loaded again in the process
const returnData: ITransferNodeTypes = {};
for (const nodeTypeName of neededNodeTypes) {
if (nodeTypes.nodeTypes[nodeTypeName] === undefined) {
throw new Error(`The NodeType "${nodeTypeName}" could not be found!`);
}
returnData[nodeTypeName] = {
className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name,
sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath,
};
}
return returnData;
}
/**
* Returns the names of the NodeTypes which are are needed
* to execute the gives nodes
*
* @export
* @param {INode[]} nodes
* @returns {string[]}
*/
export function getNeededNodeTypes(nodes: INode[]): string[] {
// Check which node-types have to be loaded
const neededNodeTypes: string[] = [];
for (const node of nodes) {
if (!neededNodeTypes.includes(node.type)) {
neededNodeTypes.push(node.type);
}
}
return neededNodeTypes;
}
/**
* Saves the static data if it changed
*

View file

@ -1,11 +1,9 @@
import {
ActiveExecutions,
IProcessMessageDataHook,
ITransferNodeTypes,
IWorkflowExecutionDataProcess,
IWorkflowExecutionDataProcessWithExecution,
NodeTypes,
Push,
WorkflowExecuteAdditionalData,
WorkflowHelpers,
@ -17,9 +15,8 @@ import {
import {
IExecutionError,
INode,
IRun,
IWorkflowExecuteHooks,
WorkflowHooks,
WorkflowExecuteMode,
} from 'n8n-workflow';
@ -38,70 +35,15 @@ export class WorkflowRunner {
}
/**
* Returns the data of the node types that are needed
* to execute the given nodes
*
* @param {INode[]} nodes
* @returns {ITransferNodeTypes}
* @memberof WorkflowRunner
*/
getNodeTypeData(nodes: INode[]): ITransferNodeTypes {
const nodeTypes = NodeTypes();
// Check which node-types have to be loaded
const neededNodeTypes: string[] = [];
for (const node of nodes) {
if (!neededNodeTypes.includes(node.type)) {
neededNodeTypes.push(node.type);
}
}
// Get all the data of the needed node types that they
// can be loaded again in the process
const returnData: ITransferNodeTypes = {};
for (const nodeTypeName of neededNodeTypes) {
if (nodeTypes.nodeTypes[nodeTypeName] === undefined) {
throw new Error(`The NodeType "${nodeTypeName}" could not be found!`);
}
returnData[nodeTypeName] = {
className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name,
sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath,
};
}
return returnData;
}
/**
* The process did send a hook message so execute the appropiate hook
*
* @param {IWorkflowExecuteHooks} hookFunctions
* @param {WorkflowHooks} workflowHooks
* @param {IProcessMessageDataHook} hookData
* @memberof WorkflowRunner
*/
processHookMessage(hookFunctions: IWorkflowExecuteHooks, hookData: IProcessMessageDataHook) {
if (hookFunctions[hookData.hook] !== undefined && Array.isArray(hookFunctions[hookData.hook])) {
for (const hookFunction of hookFunctions[hookData.hook]!) {
// TODO: Not sure if that is 100% correct or something is still missing like to wait
hookFunction.apply(this, hookData.parameters)
.catch((error: Error) => {
// Catch all errors here because when "executeHook" gets called
// we have the most time no "await" and so the errors would so
// not be uncaught by anything.
// TODO: Add proper logging
console.error(`There was a problem executing hook: "${hookData.hook}"`);
console.error('Parameters:');
console.error(hookData.parameters);
console.error('Error:');
console.error(error);
});
}
}
processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) {
workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
}
@ -133,7 +75,7 @@ export class WorkflowRunner {
this.activeExecutions.remove(executionId, fullRunData);
// Also send to Editor UI
WorkflowExecuteAdditionalData.pushExecutionFinished(fullRunData, executionId);
WorkflowExecuteAdditionalData.pushExecutionFinished(executionMode, fullRunData, executionId);
}
@ -157,12 +99,30 @@ export class WorkflowRunner {
// Register the active execution
const executionId = this.activeExecutions.add(subprocess, data);
const nodeTypeData = this.getNodeTypeData(data.workflowData.nodes);
// Check if workflow contains a "executeWorkflow" Node as in this
// case we can not know which nodeTypes will be needed and so have
// to load all of them in the workflowRunnerProcess
let loadAllNodeTypes = false;
for (const node of data.workflowData.nodes) {
if (node.type === 'n8n-nodes-base.executeWorkflow') {
loadAllNodeTypes = true;
break;
}
}
let nodeTypeData: ITransferNodeTypes;
if (loadAllNodeTypes === true) {
// Supply all nodeTypes
nodeTypeData = WorkflowHelpers.getAllNodeTypeData();
} else {
// Supply only nodeTypes which the workflow needs
nodeTypeData = WorkflowHelpers.getNodeTypeData(data.workflowData.nodes);
}
(data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId;
(data as unknown as IWorkflowExecutionDataProcessWithExecution).nodeTypeData = nodeTypeData;
const hookFunctions = WorkflowExecuteAdditionalData.getHookMethods(data, executionId);
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
@ -178,7 +138,7 @@ export class WorkflowRunner {
this.processError(executionError, startedAt, data.executionMode, executionId);
} else if (message.type === 'processHook') {
this.processHookMessage(hookFunctions, message.data as IProcessMessageDataHook);
this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
}
});

View file

@ -1,6 +1,5 @@
import {
IProcessMessageDataHook,
IWorkflowExecutionDataProcessWithExecution,
NodeTypes,
WorkflowExecuteAdditionalData,
@ -18,10 +17,9 @@ import {
INodeTypeData,
IRun,
ITaskData,
IWorkflowExecuteHooks,
Workflow,
WorkflowHooks,
} from 'n8n-workflow';
import { ChildProcess } from 'child_process';
export class WorkflowRunnerProcess {
data: IWorkflowExecutionDataProcessWithExecution | undefined;
@ -61,10 +59,9 @@ export class WorkflowRunnerProcess {
await nodeTypes.init(nodeTypesData);
this.workflow = new Workflow(this.data.workflowData.id as string | undefined, this.data.workflowData!.nodes, this.data.workflowData!.connections, this.data.workflowData!.active, nodeTypes, this.data.workflowData!.staticData);
const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.executionMode, this.data.credentials);
const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials);
additionalData.hooks = this.getProcessForwardHooks();
if (this.data.executionData !== undefined) {
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode, this.data.executionData);
return this.workflowExecute.processRunExecutionData(this.workflow);
@ -111,11 +108,10 @@ export class WorkflowRunnerProcess {
* the parent process where they then can be executed with access
* to database and to PushService
*
* @param {ChildProcess} process
* @returns
*/
getProcessForwardHooks(): IWorkflowExecuteHooks {
return {
getProcessForwardHooks(): WorkflowHooks {
const hookFunctions = {
nodeExecuteBefore: [
async (nodeName: string): Promise<void> => {
this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]);
@ -137,6 +133,8 @@ export class WorkflowRunnerProcess {
},
]
};
return new WorkflowHooks(hookFunctions, this.data!.executionMode, this.data!.executionId, this.data!.workflowData, { sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string });
}
}

View file

@ -341,6 +341,9 @@ export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, addi
return getNodeParameter(workflow, runExecutionData, runIndex, connectionInputData, node, parameterName, itemIndex, fallbackValue);
},
getRestApiUrl: (): string => {
return additionalData.restApiUrl;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
@ -375,6 +378,10 @@ export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, addi
export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
async executeWorkflow(workflowId: string, inputData?: INodeExecutionData[]): Promise<any> { // tslint:disable-line:no-any
// return additionalData.executeWorkflow(workflowId, additionalData, inputData);
return additionalData.executeWorkflow(workflowId, additionalData, inputData);
},
getContext(type: string): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
@ -408,6 +415,9 @@ export function getExecuteFunctions(workflow: Workflow, runExecutionData: IRunEx
getMode: (): WorkflowExecuteMode => {
return mode;
},
getRestApiUrl: (): string => {
return additionalData.restApiUrl;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
@ -482,6 +492,9 @@ export function getExecuteSingleFunctions(workflow: Workflow, runExecutionData:
getMode: (): WorkflowExecuteMode => {
return mode;
},
getRestApiUrl: (): string => {
return additionalData.restApiUrl;
},
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
@ -540,6 +553,9 @@ export function getLoadOptionsFunctions(workflow: Workflow, node: INode, additio
getTimezone: (): string => {
return getTimezone(workflow, additionalData);
},
getRestApiUrl: (): string => {
return additionalData.restApiUrl;
},
helpers: {
request: requestPromise,
},

View file

@ -226,25 +226,8 @@ export class WorkflowExecute {
if (this.additionalData.hooks === undefined) {
return;
}
if (this.additionalData.hooks[hookName] === undefined || this.additionalData.hooks[hookName]!.length === 0) {
return;
}
for (const hookFunction of this.additionalData.hooks[hookName]!) {
await hookFunction.apply(this, parameters as [IRun, IWaitingForExecution])
.catch((error) => {
// Catch all errors here because when "executeHook" gets called
// we have the most time no "await" and so the errors would so
// not be caught by anything.
// TODO: Add proper logging
console.error(`There was a problem executing hook: "${hookName}"`);
console.error('Parameters:');
console.error(parameters);
console.error('Error:');
console.error(error);
});
}
return this.additionalData.hooks.executeHookFunctions(hookName, parameters);
}

View file

@ -0,0 +1,75 @@
import { OptionsWithUri } from 'request';
import { IExecuteFunctions } from 'n8n-core';
import {
ILoadOptionsFunctions,
INodeExecutionData,
INodePropertyOptions,
INodeType,
INodeTypeDescription,
} from 'n8n-workflow';
export class ExecuteWorkflow implements INodeType {
description: INodeTypeDescription = {
displayName: 'Execute Workflow',
name: 'executeWorkflow',
icon: 'fa:network-wired',
group: ['transform'],
version: 1,
subtitle: '={{"Workflow: " + $parameter["workflowId"]}}',
description: 'Execute another workflow',
defaults: {
name: 'Execute Workflow',
color: '#ff6d5a',
},
inputs: ['main'],
outputs: ['main'],
properties: [
{
displayName: 'Workflow',
name: 'workflowId',
type: 'options',
typeOptions: {
loadOptionsMethod: 'getWorkflows',
},
default: '',
required: true,
description: 'The workflow to execute.',
},
]
};
methods = {
loadOptions: {
async getWorkflows(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const options: OptionsWithUri = {
method: 'GET',
uri: this.getRestApiUrl() + '/workflows',
json: true
};
const returnData: INodePropertyOptions[] = [];
const responseData = await this.helpers.request!(options);
for (const workflowData of responseData.data) {
returnData.push({
name: workflowData.name,
value: workflowData.id,
});
}
return returnData;
}
},
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const workflowId = this.getNodeParameter('workflowId', 0) as string;
const receivedData = await this.executeWorkflow(workflowId, items);
return receivedData;
}
}

View file

@ -94,6 +94,7 @@
"dist/nodes/EmailSend.node.js",
"dist/nodes/ErrorTrigger.node.js",
"dist/nodes/ExecuteCommand.node.js",
"dist/nodes/ExecuteWorkflow.node.js",
"dist/nodes/FileMaker/FileMaker.node.js",
"dist/nodes/Freshdesk/Freshdesk.node.js",
"dist/nodes/Flow/Flow.node.js",

View file

@ -1,4 +1,5 @@
import { Workflow } from './Workflow';
import { WorkflowHooks } from './WorkflowHooks';
import * as express from 'express';
export interface IBinaryData {
@ -149,6 +150,7 @@ export interface IExecuteContextData {
export interface IExecuteFunctions {
executeWorkflow(workflowId: string, inputData?: INodeExecutionData[]): Promise<any>; // tslint:disable-line:no-any
getContext(type: string): IContextObject;
getCredentials(type: string): ICredentialDataDecryptedObject | undefined;
getInputData(inputIndex?: number, inputName?: string): INodeExecutionData[];
@ -156,6 +158,7 @@ export interface IExecuteFunctions {
getNodeParameter(parameterName: string, itemIndex: number, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getWorkflowStaticData(type: string): IDataObject;
getRestApiUrl(): string;
getTimezone(): string;
prepareOutputData(outputData: INodeExecutionData[], outputIndex?: number): Promise<INodeExecutionData[][]>;
helpers: {
@ -170,6 +173,7 @@ export interface IExecuteSingleFunctions {
getInputData(inputIndex?: number, inputName?: string): INodeExecutionData;
getMode(): WorkflowExecuteMode;
getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any
getRestApiUrl(): string;
getTimezone(): string;
getWorkflowDataProxy(): IWorkflowDataProxyData;
getWorkflowStaticData(type: string): IDataObject;
@ -184,6 +188,7 @@ export interface ILoadOptionsFunctions {
getCurrentNodeParameter(parameterName: string): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object | undefined;
getCurrentNodeParameters(): INodeParameters | undefined;
getTimezone(): string;
getRestApiUrl(): string;
helpers: {
[key: string]: ((...args: any[]) => any) | undefined; //tslint:disable-line:no-any
};
@ -208,6 +213,7 @@ export interface ITriggerFunctions {
getCredentials(type: string): ICredentialDataDecryptedObject | undefined;
getMode(): WorkflowExecuteMode;
getNodeParameter(parameterName: string, fallbackValue?: any): NodeParameterValue | INodeParameters | NodeParameterValue[] | INodeParameters[] | object; //tslint:disable-line:no-any
getRestApiUrl(): string;
getTimezone(): string;
getWorkflowStaticData(type: string): IDataObject;
helpers: {
@ -574,6 +580,18 @@ export interface IWaitingForExecution {
}
export interface IWorkflowBase {
id?: number | string | any; // tslint:disable-line:no-any
name: string;
active: boolean;
createdAt: Date;
updatedAt: Date;
nodes: INode[];
connections: IConnections;
settings?: IWorkflowSettings;
staticData?: IDataObject;
}
export interface IWorkflowCredentials {
// Credential type
[key: string]: {
@ -582,6 +600,7 @@ export interface IWorkflowCredentials {
};
}
export interface IWorkflowExecuteHooks {
[key: string]: Array<((...args: any[]) => Promise<void>)> | undefined; // tslint:disable-line:no-any
nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData) => Promise<void>)>;
@ -593,16 +612,26 @@ export interface IWorkflowExecuteHooks {
export interface IWorkflowExecuteAdditionalData {
credentials: IWorkflowCredentials;
encryptionKey: string;
hooks?: IWorkflowExecuteHooks;
executeWorkflow: (workflowId: string, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]) => Promise<any>; // tslint:disable-line:no-any
// hooks?: IWorkflowExecuteHooks;
hooks?: WorkflowHooks;
httpResponse?: express.Response;
httpRequest?: express.Request;
restApiUrl: string;
timezone: string;
webhookBaseUrl: string;
webhookTestBaseUrl: string;
currentNodeParameters? : INodeParameters[];
}
export type WorkflowExecuteMode = 'cli' | 'error' | 'internal' | 'manual' | 'retry' | 'trigger' | 'webhook';
export type WorkflowExecuteMode = 'cli' | 'error' | 'integrated' | 'internal' | 'manual' | 'retry' | 'trigger' | 'webhook';
export interface IWorkflowHooksOptionalParameters {
parentProcessMode?: string;
retryOf?: string;
sessionId?: string;
}
export interface IWorkflowSettings {
[key: string]: IDataObject | string | number | boolean | undefined;

View file

@ -0,0 +1,48 @@
import {
IWorkflowBase,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
WorkflowExecuteMode,
} from './Interfaces';
export class WorkflowHooks {
mode: WorkflowExecuteMode;
workflowData: IWorkflowBase;
executionId: string;
sessionId?: string;
retryOf?: string;
hookFunctions: IWorkflowExecuteHooks;
constructor(hookFunctions: IWorkflowExecuteHooks, mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters) {
optionalParameters = optionalParameters || {};
this.hookFunctions = hookFunctions;
this.mode = mode;
this.executionId = executionId;
this.workflowData = workflowData;
this.sessionId = optionalParameters.sessionId;
this.retryOf = optionalParameters.retryOf;
}
async executeHookFunctions(hookName: string, parameters: any[]) { // tslint:disable-line:no-any
if (this.hookFunctions[hookName] !== undefined && Array.isArray(this.hookFunctions[hookName])) {
for (const hookFunction of this.hookFunctions[hookName]!) {
await hookFunction.apply(this, parameters)
.catch((error: Error) => {
// Catch all errors here because when "executeHook" gets called
// we have the most time no "await" and so the errors would so
// not be uncaught by anything.
// TODO: Add proper logging
console.error(`There was a problem executing hook: "${hookName}"`);
console.error('Parameters:');
console.error(parameters);
console.error('Error:');
console.error(error);
});
}
}
}
}

View file

@ -1,6 +1,7 @@
export * from './Interfaces';
export * from './Workflow';
export * from './WorkflowDataProxy';
export * from './WorkflowHooks';
import * as NodeHelpers from './NodeHelpers';
import * as ObservableObject from './ObservableObject';