fix(core): Use AbortController to notify nodes to abort execution (#6141)

and add support for cancelling ongoing operations inside a node.

---------
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-11-24 18:17:06 +01:00 committed by GitHub
parent 0ec67dabf7
commit d2c18c5727
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 72 additions and 58 deletions

View file

@ -51,10 +51,6 @@ describe('Execution', () => {
.canvasNodeByName('Manual')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Set')
.within(() => cy.get('.fa-check'))
@ -112,10 +108,6 @@ describe('Execution', () => {
.canvasNodeByName('Manual')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-sync-alt').should('not.visible'));
@ -128,8 +120,8 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().click();
workflowPage.getters.clearExecutionDataButton().should('not.exist');
// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.successToast().should('be.visible');
// Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.warningToast().should('be.visible');
});
it('should test webhook workflow', () => {
@ -191,10 +183,6 @@ describe('Execution', () => {
.canvasNodeByName('Webhook')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Set')
.within(() => cy.get('.fa-check'))
@ -267,10 +255,6 @@ describe('Execution', () => {
.canvasNodeByName('Webhook')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-sync-alt').should('not.visible'));
@ -283,7 +267,7 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().click();
workflowPage.getters.clearExecutionDataButton().should('not.exist');
// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.successToast().should('be.visible');
// Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.warningToast().should('be.visible');
});
});

View file

@ -48,6 +48,7 @@ export class WorkflowPage extends BasePage {
return cy.get(this.getters.getEndpointSelector('plus', nodeName, index));
},
successToast: () => cy.get('.el-notification:has(.el-notification--success)'),
warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'),
errorToast: () => cy.get('.el-notification:has(.el-notification--error)'),
activatorSwitch: () => cy.getByTestId('workflow-activate-switch'),
workflowMenu: () => cy.getByTestId('workflow-menu'),

View file

@ -2508,6 +2508,19 @@ const getCommonWorkflowFunctions = (
prepareOutputData: async (outputData) => [outputData],
});
const executionCancellationFunctions = (
abortSignal?: AbortSignal,
): Pick<IExecuteFunctions, 'onExecutionCancellation' | 'getExecutionCancelSignal'> => ({
getExecutionCancelSignal: () => abortSignal,
onExecutionCancellation: (handler) => {
const fn = () => {
abortSignal?.removeEventListener('abort', fn);
handler();
};
abortSignal?.addEventListener('abort', fn);
},
});
const getRequestHelperFunctions = (
workflow: Workflow,
node: INode,
@ -3087,10 +3100,12 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
getCredentials(
@ -3512,10 +3527,12 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;

View file

@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
import { setMaxListeners } from 'events';
import PCancelable from 'p-cancelable';
import type {
@ -44,23 +43,14 @@ import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
export class WorkflowExecute {
runExecutionData: IRunExecutionData;
private status: ExecutionStatus = 'new';
private additionalData: IWorkflowExecuteAdditionalData;
private mode: WorkflowExecuteMode;
private status: ExecutionStatus;
private readonly abortController = new AbortController();
constructor(
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData?: IRunExecutionData,
) {
this.additionalData = additionalData;
this.mode = mode;
this.status = 'new';
this.runExecutionData = runExecutionData || {
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly mode: WorkflowExecuteMode,
private runExecutionData: IRunExecutionData = {
startData: {},
resultData: {
runData: {},
@ -73,8 +63,8 @@ export class WorkflowExecute {
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
},
) {}
/**
* Executes the given workflow.
@ -830,11 +820,16 @@ export class WorkflowExecute {
let closeFunction: Promise<void> | undefined;
return new PCancelable(async (resolve, reject, onCancel) => {
let gotCancel = false;
// Let as many nodes listen to the abort signal, without getting the MaxListenersExceededWarning
setMaxListeners(Infinity, this.abortController.signal);
onCancel.shouldReject = false;
onCancel(() => {
gotCancel = true;
this.status = 'canceled';
this.abortController.abort();
const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
setTimeout(() => resolve(fullRunData), 10);
});
const returnPromise = (async () => {
@ -881,10 +876,10 @@ export class WorkflowExecute {
this.additionalData.executionTimeoutTimestamp !== undefined &&
Date.now() >= this.additionalData.executionTimeoutTimestamp
) {
gotCancel = true;
this.status = 'canceled';
}
if (gotCancel) {
if (this.status === 'canceled') {
return;
}
@ -1014,9 +1009,6 @@ export class WorkflowExecute {
}
for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
if (gotCancel) {
return;
}
try {
if (tryIndex !== 0) {
// Reset executionError from previous error try
@ -1052,6 +1044,7 @@ export class WorkflowExecute {
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController.signal,
);
nodeSuccessData = runNodeData.data;
@ -1089,6 +1082,7 @@ export class WorkflowExecute {
this.additionalData,
executionData,
this.mode,
this.abortController.signal,
);
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
@ -1644,7 +1638,7 @@ export class WorkflowExecute {
return;
})()
.then(async () => {
if (gotCancel && executionError === undefined) {
if (this.status === 'canceled' && executionError === undefined) {
return this.processSuccessExecution(
startedAt,
workflow,

View file

@ -272,7 +272,8 @@ export const pushConnection = defineComponent({
return false;
}
if (this.workflowsStore.activeExecutionId !== pushData.executionId) {
const { activeExecutionId } = this.workflowsStore;
if (activeExecutionId !== pushData.executionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
@ -285,10 +286,17 @@ export const pushConnection = defineComponent({
let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data);
if (pushData.data.status === 'crashed') {
if (runDataExecuted.status === 'crashed') {
runDataExecutedErrorMessage = this.$locale.baseText(
'pushConnection.executionFailed.message',
);
} else if (runDataExecuted.status === 'canceled') {
runDataExecutedErrorMessage = this.$locale.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId },
},
);
}
const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
@ -389,7 +397,11 @@ export const pushConnection = defineComponent({
});
} else {
let title: string;
if (runDataExecuted.data.resultData.lastNodeExecuted) {
let type = 'error';
if (runDataExecuted.status === 'canceled') {
title = this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title');
type = 'warning';
} else if (runDataExecuted.data.resultData.lastNodeExecuted) {
title = `Problem in node ${runDataExecuted.data.resultData.lastNodeExecuted}`;
} else {
title = 'Problem executing workflow';
@ -398,7 +410,7 @@ export const pushConnection = defineComponent({
this.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
type,
duration: 0,
dangerouslyUseHTMLString: true,
});

View file

@ -1562,10 +1562,6 @@ export default defineComponent({
try {
this.stopExecutionInProgress = true;
await this.workflowsStore.stopCurrentExecution(executionId);
this.showMessage({
title: this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} catch (error) {
// Execution stop might fail when the execution has already finished. Let's treat this here.
const execution = await this.workflowsStore.getExecution(executionId);

View file

@ -347,10 +347,9 @@ export class Wait extends Webhook {
if (waitValue < 65000) {
// If wait time is shorter than 65 seconds leave execution active because
// we just check the database every 60 seconds.
return new Promise((resolve, _reject) => {
setTimeout(() => {
resolve([context.getInputData()]);
}, waitValue);
return new Promise((resolve) => {
const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
context.onExecutionCancellation(() => clearTimeout(timer));
});
}

View file

@ -422,6 +422,7 @@ export interface IGetExecuteFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions;
}
@ -437,6 +438,7 @@ export interface IGetExecuteSingleFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions;
}
@ -776,6 +778,8 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getExecuteData(): IExecuteData;
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
};
// TODO: Create later own type only for Config-Nodes

View file

@ -79,6 +79,7 @@ export class RoutingNode {
executeData: IExecuteData,
nodeExecuteFunctions: INodeExecuteFunctions,
credentialsDecrypted?: ICredentialsDecrypted,
abortSignal?: AbortSignal,
): Promise<INodeExecutionData[][] | null | undefined> {
const items = inputData.main[0] as INodeExecutionData[];
const returnData: INodeExecutionData[] = [];
@ -99,6 +100,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortSignal,
);
let credentials: ICredentialDataDecryptedObject | undefined;
@ -136,6 +138,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortSignal,
);
const requestData: DeclarativeRestApiSettings.ResultOptions = {
options: {

View file

@ -1216,6 +1216,7 @@ export class Workflow {
additionalData: IWorkflowExecuteAdditionalData,
nodeExecuteFunctions: INodeExecuteFunctions,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
@ -1303,6 +1304,7 @@ export class Workflow {
additionalData,
executionData,
mode,
abortSignal,
);
const data =
nodeType instanceof Node
@ -1385,6 +1387,8 @@ export class Workflow {
nodeType,
executionData,
nodeExecuteFunctions,
undefined,
abortSignal,
),
};
}