mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 21:37:32 -08:00
fix(core): Use AbortController to notify nodes to abort execution (#6141)
and add support for cancelling ongoing operations inside a node. --------- Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
parent
0ec67dabf7
commit
d2c18c5727
|
@ -51,10 +51,6 @@ describe('Execution', () => {
|
|||
.canvasNodeByName('Manual')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Wait')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Set')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
|
@ -112,10 +108,6 @@ describe('Execution', () => {
|
|||
.canvasNodeByName('Manual')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Wait')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Wait')
|
||||
.within(() => cy.get('.fa-sync-alt').should('not.visible'));
|
||||
|
@ -128,8 +120,8 @@ describe('Execution', () => {
|
|||
workflowPage.getters.clearExecutionDataButton().click();
|
||||
workflowPage.getters.clearExecutionDataButton().should('not.exist');
|
||||
|
||||
// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished)
|
||||
workflowPage.getters.successToast().should('be.visible');
|
||||
// Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
|
||||
workflowPage.getters.warningToast().should('be.visible');
|
||||
});
|
||||
|
||||
it('should test webhook workflow', () => {
|
||||
|
@ -191,10 +183,6 @@ describe('Execution', () => {
|
|||
.canvasNodeByName('Webhook')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Wait')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Set')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
|
@ -267,10 +255,6 @@ describe('Execution', () => {
|
|||
.canvasNodeByName('Webhook')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Wait')
|
||||
.within(() => cy.get('.fa-check'))
|
||||
.should('exist');
|
||||
workflowPage.getters
|
||||
.canvasNodeByName('Wait')
|
||||
.within(() => cy.get('.fa-sync-alt').should('not.visible'));
|
||||
|
@ -283,7 +267,7 @@ describe('Execution', () => {
|
|||
workflowPage.getters.clearExecutionDataButton().click();
|
||||
workflowPage.getters.clearExecutionDataButton().should('not.exist');
|
||||
|
||||
// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished)
|
||||
workflowPage.getters.successToast().should('be.visible');
|
||||
// Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
|
||||
workflowPage.getters.warningToast().should('be.visible');
|
||||
});
|
||||
});
|
||||
|
|
|
@ -48,6 +48,7 @@ export class WorkflowPage extends BasePage {
|
|||
return cy.get(this.getters.getEndpointSelector('plus', nodeName, index));
|
||||
},
|
||||
successToast: () => cy.get('.el-notification:has(.el-notification--success)'),
|
||||
warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'),
|
||||
errorToast: () => cy.get('.el-notification:has(.el-notification--error)'),
|
||||
activatorSwitch: () => cy.getByTestId('workflow-activate-switch'),
|
||||
workflowMenu: () => cy.getByTestId('workflow-menu'),
|
||||
|
|
|
@ -2508,6 +2508,19 @@ const getCommonWorkflowFunctions = (
|
|||
prepareOutputData: async (outputData) => [outputData],
|
||||
});
|
||||
|
||||
const executionCancellationFunctions = (
|
||||
abortSignal?: AbortSignal,
|
||||
): Pick<IExecuteFunctions, 'onExecutionCancellation' | 'getExecutionCancelSignal'> => ({
|
||||
getExecutionCancelSignal: () => abortSignal,
|
||||
onExecutionCancellation: (handler) => {
|
||||
const fn = () => {
|
||||
abortSignal?.removeEventListener('abort', fn);
|
||||
handler();
|
||||
};
|
||||
abortSignal?.addEventListener('abort', fn);
|
||||
},
|
||||
});
|
||||
|
||||
const getRequestHelperFunctions = (
|
||||
workflow: Workflow,
|
||||
node: INode,
|
||||
|
@ -3087,10 +3100,12 @@ export function getExecuteFunctions(
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
executeData: IExecuteData,
|
||||
mode: WorkflowExecuteMode,
|
||||
abortSignal?: AbortSignal,
|
||||
): IExecuteFunctions {
|
||||
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
|
||||
return {
|
||||
...getCommonWorkflowFunctions(workflow, node, additionalData),
|
||||
...executionCancellationFunctions(abortSignal),
|
||||
getMode: () => mode,
|
||||
getCredentials: async (type, itemIndex) =>
|
||||
getCredentials(
|
||||
|
@ -3512,10 +3527,12 @@ export function getExecuteSingleFunctions(
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
executeData: IExecuteData,
|
||||
mode: WorkflowExecuteMode,
|
||||
abortSignal?: AbortSignal,
|
||||
): IExecuteSingleFunctions {
|
||||
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
|
||||
return {
|
||||
...getCommonWorkflowFunctions(workflow, node, additionalData),
|
||||
...executionCancellationFunctions(abortSignal),
|
||||
continueOnFail: () => continueOnFail(node),
|
||||
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
|
||||
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
/* eslint-disable @typescript-eslint/prefer-optional-chain */
|
||||
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
|
||||
/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
|
||||
import { setMaxListeners } from 'events';
|
||||
import PCancelable from 'p-cancelable';
|
||||
|
||||
import type {
|
||||
|
@ -44,23 +43,14 @@ import get from 'lodash/get';
|
|||
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
|
||||
|
||||
export class WorkflowExecute {
|
||||
runExecutionData: IRunExecutionData;
|
||||
private status: ExecutionStatus = 'new';
|
||||
|
||||
private additionalData: IWorkflowExecuteAdditionalData;
|
||||
|
||||
private mode: WorkflowExecuteMode;
|
||||
|
||||
private status: ExecutionStatus;
|
||||
private readonly abortController = new AbortController();
|
||||
|
||||
constructor(
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
mode: WorkflowExecuteMode,
|
||||
runExecutionData?: IRunExecutionData,
|
||||
) {
|
||||
this.additionalData = additionalData;
|
||||
this.mode = mode;
|
||||
this.status = 'new';
|
||||
this.runExecutionData = runExecutionData || {
|
||||
private readonly additionalData: IWorkflowExecuteAdditionalData,
|
||||
private readonly mode: WorkflowExecuteMode,
|
||||
private runExecutionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
|
@ -73,8 +63,8 @@ export class WorkflowExecute {
|
|||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
}
|
||||
},
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Executes the given workflow.
|
||||
|
@ -830,11 +820,16 @@ export class WorkflowExecute {
|
|||
let closeFunction: Promise<void> | undefined;
|
||||
|
||||
return new PCancelable(async (resolve, reject, onCancel) => {
|
||||
let gotCancel = false;
|
||||
// Let as many nodes listen to the abort signal, without getting the MaxListenersExceededWarning
|
||||
setMaxListeners(Infinity, this.abortController.signal);
|
||||
|
||||
onCancel.shouldReject = false;
|
||||
onCancel(() => {
|
||||
gotCancel = true;
|
||||
this.status = 'canceled';
|
||||
this.abortController.abort();
|
||||
const fullRunData = this.getFullRunData(startedAt);
|
||||
void this.executeHook('workflowExecuteAfter', [fullRunData]);
|
||||
setTimeout(() => resolve(fullRunData), 10);
|
||||
});
|
||||
|
||||
const returnPromise = (async () => {
|
||||
|
@ -881,10 +876,10 @@ export class WorkflowExecute {
|
|||
this.additionalData.executionTimeoutTimestamp !== undefined &&
|
||||
Date.now() >= this.additionalData.executionTimeoutTimestamp
|
||||
) {
|
||||
gotCancel = true;
|
||||
this.status = 'canceled';
|
||||
}
|
||||
|
||||
if (gotCancel) {
|
||||
if (this.status === 'canceled') {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1014,9 +1009,6 @@ export class WorkflowExecute {
|
|||
}
|
||||
|
||||
for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
|
||||
if (gotCancel) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (tryIndex !== 0) {
|
||||
// Reset executionError from previous error try
|
||||
|
@ -1052,6 +1044,7 @@ export class WorkflowExecute {
|
|||
this.additionalData,
|
||||
NodeExecuteFunctions,
|
||||
this.mode,
|
||||
this.abortController.signal,
|
||||
);
|
||||
nodeSuccessData = runNodeData.data;
|
||||
|
||||
|
@ -1089,6 +1082,7 @@ export class WorkflowExecute {
|
|||
this.additionalData,
|
||||
executionData,
|
||||
this.mode,
|
||||
this.abortController.signal,
|
||||
);
|
||||
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
|
||||
|
||||
|
@ -1644,7 +1638,7 @@ export class WorkflowExecute {
|
|||
return;
|
||||
})()
|
||||
.then(async () => {
|
||||
if (gotCancel && executionError === undefined) {
|
||||
if (this.status === 'canceled' && executionError === undefined) {
|
||||
return this.processSuccessExecution(
|
||||
startedAt,
|
||||
workflow,
|
||||
|
|
|
@ -272,7 +272,8 @@ export const pushConnection = defineComponent({
|
|||
return false;
|
||||
}
|
||||
|
||||
if (this.workflowsStore.activeExecutionId !== pushData.executionId) {
|
||||
const { activeExecutionId } = this.workflowsStore;
|
||||
if (activeExecutionId !== pushData.executionId) {
|
||||
// The workflow which did finish execution did either not get started
|
||||
// by this session or we do not have the execution id yet.
|
||||
if (isRetry !== true) {
|
||||
|
@ -285,10 +286,17 @@ export const pushConnection = defineComponent({
|
|||
|
||||
let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data);
|
||||
|
||||
if (pushData.data.status === 'crashed') {
|
||||
if (runDataExecuted.status === 'crashed') {
|
||||
runDataExecutedErrorMessage = this.$locale.baseText(
|
||||
'pushConnection.executionFailed.message',
|
||||
);
|
||||
} else if (runDataExecuted.status === 'canceled') {
|
||||
runDataExecutedErrorMessage = this.$locale.baseText(
|
||||
'executionsList.showMessage.stopExecution.message',
|
||||
{
|
||||
interpolate: { activeExecutionId },
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
|
||||
|
@ -389,7 +397,11 @@ export const pushConnection = defineComponent({
|
|||
});
|
||||
} else {
|
||||
let title: string;
|
||||
if (runDataExecuted.data.resultData.lastNodeExecuted) {
|
||||
let type = 'error';
|
||||
if (runDataExecuted.status === 'canceled') {
|
||||
title = this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title');
|
||||
type = 'warning';
|
||||
} else if (runDataExecuted.data.resultData.lastNodeExecuted) {
|
||||
title = `Problem in node ‘${runDataExecuted.data.resultData.lastNodeExecuted}‘`;
|
||||
} else {
|
||||
title = 'Problem executing workflow';
|
||||
|
@ -398,7 +410,7 @@ export const pushConnection = defineComponent({
|
|||
this.showMessage({
|
||||
title,
|
||||
message: runDataExecutedErrorMessage,
|
||||
type: 'error',
|
||||
type,
|
||||
duration: 0,
|
||||
dangerouslyUseHTMLString: true,
|
||||
});
|
||||
|
|
|
@ -1562,10 +1562,6 @@ export default defineComponent({
|
|||
try {
|
||||
this.stopExecutionInProgress = true;
|
||||
await this.workflowsStore.stopCurrentExecution(executionId);
|
||||
this.showMessage({
|
||||
title: this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title'),
|
||||
type: 'success',
|
||||
});
|
||||
} catch (error) {
|
||||
// Execution stop might fail when the execution has already finished. Let's treat this here.
|
||||
const execution = await this.workflowsStore.getExecution(executionId);
|
||||
|
|
|
@ -347,10 +347,9 @@ export class Wait extends Webhook {
|
|||
if (waitValue < 65000) {
|
||||
// If wait time is shorter than 65 seconds leave execution active because
|
||||
// we just check the database every 60 seconds.
|
||||
return new Promise((resolve, _reject) => {
|
||||
setTimeout(() => {
|
||||
resolve([context.getInputData()]);
|
||||
}, waitValue);
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
|
||||
context.onExecutionCancellation(() => clearTimeout(timer));
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -422,6 +422,7 @@ export interface IGetExecuteFunctions {
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
executeData: IExecuteData,
|
||||
mode: WorkflowExecuteMode,
|
||||
abortSignal?: AbortSignal,
|
||||
): IExecuteFunctions;
|
||||
}
|
||||
|
||||
|
@ -437,6 +438,7 @@ export interface IGetExecuteSingleFunctions {
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
executeData: IExecuteData,
|
||||
mode: WorkflowExecuteMode,
|
||||
abortSignal?: AbortSignal,
|
||||
): IExecuteSingleFunctions;
|
||||
}
|
||||
|
||||
|
@ -776,6 +778,8 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
|
|||
getExecuteData(): IExecuteData;
|
||||
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
|
||||
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
|
||||
getExecutionCancelSignal(): AbortSignal | undefined;
|
||||
onExecutionCancellation(handler: () => unknown): void;
|
||||
};
|
||||
|
||||
// TODO: Create later own type only for Config-Nodes
|
||||
|
|
|
@ -79,6 +79,7 @@ export class RoutingNode {
|
|||
executeData: IExecuteData,
|
||||
nodeExecuteFunctions: INodeExecuteFunctions,
|
||||
credentialsDecrypted?: ICredentialsDecrypted,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<INodeExecutionData[][] | null | undefined> {
|
||||
const items = inputData.main[0] as INodeExecutionData[];
|
||||
const returnData: INodeExecutionData[] = [];
|
||||
|
@ -99,6 +100,7 @@ export class RoutingNode {
|
|||
this.additionalData,
|
||||
executeData,
|
||||
this.mode,
|
||||
abortSignal,
|
||||
);
|
||||
|
||||
let credentials: ICredentialDataDecryptedObject | undefined;
|
||||
|
@ -136,6 +138,7 @@ export class RoutingNode {
|
|||
this.additionalData,
|
||||
executeData,
|
||||
this.mode,
|
||||
abortSignal,
|
||||
);
|
||||
const requestData: DeclarativeRestApiSettings.ResultOptions = {
|
||||
options: {
|
||||
|
|
|
@ -1216,6 +1216,7 @@ export class Workflow {
|
|||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
nodeExecuteFunctions: INodeExecuteFunctions,
|
||||
mode: WorkflowExecuteMode,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<IRunNodeResponse> {
|
||||
const { node } = executionData;
|
||||
let inputData = executionData.data;
|
||||
|
@ -1303,6 +1304,7 @@ export class Workflow {
|
|||
additionalData,
|
||||
executionData,
|
||||
mode,
|
||||
abortSignal,
|
||||
);
|
||||
const data =
|
||||
nodeType instanceof Node
|
||||
|
@ -1385,6 +1387,8 @@ export class Workflow {
|
|||
nodeType,
|
||||
executionData,
|
||||
nodeExecuteFunctions,
|
||||
undefined,
|
||||
abortSignal,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue