mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
⚡ Add preExecuteHooks (#1151)
* ⚡ Save initital data on hook error * 🚧 update function interface * 🚧 response webhook with error, 🐛 fix adding preExecutionHooks to hooks * 🔥 remove execute hook * ⚡ execute preExecute hooks on integrated workflows Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
parent
8f8603320a
commit
f2666e92ff
|
@ -15,7 +15,6 @@ import {
|
|||
Db,
|
||||
ExternalHooks,
|
||||
GenericHelpers,
|
||||
IExecutionsCurrentSummary,
|
||||
LoadNodesAndCredentials,
|
||||
NodeTypes,
|
||||
Server,
|
||||
|
|
|
@ -222,7 +222,7 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
|
|||
return;
|
||||
}
|
||||
|
||||
// Now that we know that the workflow should run we can return the default respons
|
||||
// Now that we know that the workflow should run we can return the default response
|
||||
// directly if responseMode it set to "onReceived" and a respone should be sent
|
||||
if (responseMode === 'onReceived' && didSendResponse === false) {
|
||||
// Return response directly and do not wait for the workflow to finish
|
||||
|
@ -302,6 +302,19 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
|
|||
}
|
||||
|
||||
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
|
||||
if(data.data.resultData.error || returnData?.error !== undefined) {
|
||||
if (didSendResponse === false) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message: 'Workflow did error.',
|
||||
},
|
||||
responseCode: 500,
|
||||
});
|
||||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
}
|
||||
|
||||
if (returnData === undefined) {
|
||||
if (didSendResponse === false) {
|
||||
responseCallback(null, {
|
||||
|
@ -313,17 +326,6 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] {
|
|||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
} else if (returnData.error !== undefined) {
|
||||
if (didSendResponse === false) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
message: 'Workflow did error.',
|
||||
},
|
||||
responseCode: 500,
|
||||
});
|
||||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
}
|
||||
|
||||
const responseData = workflow.expression.getSimpleParameterValue(workflowStartNode, webhookData.webhookDescription['responseData'], 'firstEntryJson');
|
||||
|
|
|
@ -202,6 +202,18 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
|
|||
}
|
||||
|
||||
|
||||
export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks {
|
||||
const externalHooks = ExternalHooks();
|
||||
|
||||
return {
|
||||
workflowExecuteBefore: [
|
||||
async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
|
||||
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns hook functions to save workflow execution and call error workflow
|
||||
*
|
||||
|
@ -337,7 +349,6 @@ export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additi
|
|||
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
await externalHooks.run('workflow.execute', [workflowData, mode]);
|
||||
|
||||
const nodeTypes = NodeTypes();
|
||||
|
||||
|
@ -462,6 +473,10 @@ export async function getBase(credentials: IWorkflowCredentials, currentNodePara
|
|||
export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, optionalParameters?: IWorkflowHooksOptionalParameters): WorkflowHooks {
|
||||
optionalParameters = optionalParameters || {};
|
||||
const hookFunctions = hookFunctionsSave(optionalParameters.parentProcessMode);
|
||||
const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode);
|
||||
for (const key of Object.keys(preExecuteFunctions)) {
|
||||
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
|
||||
}
|
||||
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
|
||||
}
|
||||
|
||||
|
@ -474,12 +489,19 @@ export function getWorkflowHooksIntegrated(mode: WorkflowExecuteMode, executionI
|
|||
* @param {string} executionId
|
||||
* @returns {WorkflowHooks}
|
||||
*/
|
||||
export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, executionId: string): WorkflowHooks {
|
||||
export function getWorkflowHooksMain(data: IWorkflowExecutionDataProcess, executionId: string, isMainProcess = false): WorkflowHooks {
|
||||
const hookFunctions = hookFunctionsSave();
|
||||
const pushFunctions = hookFunctionsPush();
|
||||
for (const key of Object.keys(pushFunctions)) {
|
||||
hookFunctions[key]!.push.apply(hookFunctions[key], pushFunctions[key]);
|
||||
}
|
||||
|
||||
if (isMainProcess) {
|
||||
const preExecuteFunctions = hookFunctionsPreExecute();
|
||||
for (const key of Object.keys(preExecuteFunctions)) {
|
||||
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
|
||||
}
|
||||
}
|
||||
|
||||
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { sessionId: data.sessionId, retryOf: data.retryOf as string});
|
||||
}
|
||||
|
|
|
@ -100,9 +100,6 @@ export class WorkflowRunner {
|
|||
* @memberof WorkflowRunner
|
||||
*/
|
||||
async run(data: IWorkflowExecutionDataProcess, loadStaticData?: boolean): Promise<string> {
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.run('workflow.execute', [data.workflowData, data.executionMode]);
|
||||
|
||||
const executionsProcess = config.get('executions.process') as string;
|
||||
|
||||
let executionId: string;
|
||||
|
@ -112,6 +109,7 @@ export class WorkflowRunner {
|
|||
executionId = await this.runSubprocess(data, loadStaticData);
|
||||
}
|
||||
|
||||
const externalHooks = ExternalHooks();
|
||||
if (externalHooks.exists('workflow.postExecute')) {
|
||||
this.activeExecutions.getPostExecutePromise(executionId)
|
||||
.then(async (executionData) => {
|
||||
|
@ -148,7 +146,7 @@ export class WorkflowRunner {
|
|||
// Register the active execution
|
||||
const executionId = this.activeExecutions.add(data, undefined);
|
||||
|
||||
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
|
||||
additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId, true);
|
||||
|
||||
let workflowExecution: PCancelable<IRun>;
|
||||
if (data.executionData !== undefined) {
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import {
|
||||
CredentialsOverwrites,
|
||||
CredentialTypes,
|
||||
ExternalHooks,
|
||||
IWorkflowExecutionDataProcessWithExecution,
|
||||
NodeTypes,
|
||||
WorkflowExecuteAdditionalData,
|
||||
|
@ -19,6 +20,7 @@ import {
|
|||
INodeTypeData,
|
||||
IRun,
|
||||
ITaskData,
|
||||
IWorkflowExecuteHooks,
|
||||
Workflow,
|
||||
WorkflowHooks,
|
||||
} from 'n8n-workflow';
|
||||
|
@ -68,6 +70,10 @@ export class WorkflowRunnerProcess {
|
|||
const credentialsOverwrites = CredentialsOverwrites();
|
||||
await credentialsOverwrites.init(inputData.credentialsOverwrite);
|
||||
|
||||
// Load all external hooks
|
||||
const externalHooks = ExternalHooks();
|
||||
await externalHooks.init();
|
||||
|
||||
this.workflow = new Workflow({ id: this.data.workflowData.id as string | undefined, name: this.data.workflowData.name, nodes: this.data.workflowData!.nodes, connections: this.data.workflowData!.connections, active: this.data.workflowData!.active, nodeTypes, staticData: this.data.workflowData!.staticData, settings: this.data.workflowData!.settings});
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials);
|
||||
additionalData.hooks = this.getProcessForwardHooks();
|
||||
|
@ -121,7 +127,7 @@ export class WorkflowRunnerProcess {
|
|||
* @returns
|
||||
*/
|
||||
getProcessForwardHooks(): WorkflowHooks {
|
||||
const hookFunctions = {
|
||||
const hookFunctions: IWorkflowExecuteHooks = {
|
||||
nodeExecuteBefore: [
|
||||
async (nodeName: string): Promise<void> => {
|
||||
this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]);
|
||||
|
@ -144,6 +150,11 @@ export class WorkflowRunnerProcess {
|
|||
],
|
||||
};
|
||||
|
||||
const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
|
||||
for (const key of Object.keys(preExecuteFunctions)) {
|
||||
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
|
||||
}
|
||||
|
||||
return new WorkflowHooks(hookFunctions, this.data!.executionMode, this.data!.executionId, this.data!.workflowData, { sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string });
|
||||
}
|
||||
|
||||
|
|
|
@ -468,7 +468,6 @@ export class WorkflowExecute {
|
|||
this.runExecutionData.startData = {};
|
||||
}
|
||||
|
||||
this.executeHook('workflowExecuteBefore', []);
|
||||
|
||||
let currentExecutionTry = '';
|
||||
let lastExecutionTry = '';
|
||||
|
@ -482,6 +481,35 @@ export class WorkflowExecute {
|
|||
});
|
||||
|
||||
const returnPromise = (async () => {
|
||||
try {
|
||||
await this.executeHook('workflowExecuteBefore', [workflow]);
|
||||
} catch (error) {
|
||||
// Set the error that it can be saved correctly
|
||||
executionError = {
|
||||
message: error.message,
|
||||
stack: error.stack,
|
||||
};
|
||||
|
||||
// Set the incoming data of the node that it can be saved correctly
|
||||
executionData = this.runExecutionData.executionData!.nodeExecutionStack[0] as IExecuteData;
|
||||
this.runExecutionData.resultData = {
|
||||
runData: {
|
||||
[executionData.node.name]: [
|
||||
{
|
||||
startTime,
|
||||
executionTime: (new Date().getTime()) - startTime,
|
||||
data: ({
|
||||
'main': executionData.data.main,
|
||||
} as ITaskDataConnections),
|
||||
},
|
||||
],
|
||||
},
|
||||
lastNodeExecuted: executionData.node.name,
|
||||
error: executionError,
|
||||
};
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
executionLoop:
|
||||
while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) {
|
||||
|
|
|
@ -716,7 +716,7 @@ export interface IWorkflowExecuteHooks {
|
|||
nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData) => Promise<void>)>;
|
||||
nodeExecuteBefore?: Array<((nodeName: string) => Promise<void>)>;
|
||||
workflowExecuteAfter?: Array<((data: IRun, newStaticData: IDataObject) => Promise<void>)>;
|
||||
workflowExecuteBefore?: Array<(() => Promise<void>)>;
|
||||
workflowExecuteBefore?: Array<((workflow: Workflow, data: IRunExecutionData) => Promise<void>)>;
|
||||
}
|
||||
|
||||
export interface IWorkflowExecuteAdditionalData {
|
||||
|
|
|
@ -28,19 +28,9 @@ export class WorkflowHooks {
|
|||
async executeHookFunctions(hookName: string, parameters: any[]) { // tslint:disable-line:no-any
|
||||
if (this.hookFunctions[hookName] !== undefined && Array.isArray(this.hookFunctions[hookName])) {
|
||||
for (const hookFunction of this.hookFunctions[hookName]!) {
|
||||
await hookFunction.apply(this, parameters)
|
||||
.catch((error: Error) => {
|
||||
// Catch all errors here because when "executeHook" gets called
|
||||
// we have the most time no "await" and so the errors would so
|
||||
// not be uncaught by anything.
|
||||
|
||||
// TODO: Add proper logging
|
||||
console.error(`There was a problem executing hook: "${hookName}"`);
|
||||
console.error('Parameters:');
|
||||
console.error(parameters);
|
||||
console.error('Error:');
|
||||
console.error(error);
|
||||
});
|
||||
// TODO: As catch got removed we should make sure that we catch errors
|
||||
// where hooks get called
|
||||
await hookFunction.apply(this, parameters);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue