fix(core): Use AbortController to notify nodes to abort execution (#6141)

and add support for cancelling ongoing operations inside a node.

---------
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-11-24 18:17:06 +01:00 committed by GitHub
parent 0ec67dabf7
commit d2c18c5727
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 72 additions and 58 deletions

View file

@ -51,10 +51,6 @@ describe('Execution', () => {
.canvasNodeByName('Manual') .canvasNodeByName('Manual')
.within(() => cy.get('.fa-check')) .within(() => cy.get('.fa-check'))
.should('exist'); .should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters workflowPage.getters
.canvasNodeByName('Set') .canvasNodeByName('Set')
.within(() => cy.get('.fa-check')) .within(() => cy.get('.fa-check'))
@ -112,10 +108,6 @@ describe('Execution', () => {
.canvasNodeByName('Manual') .canvasNodeByName('Manual')
.within(() => cy.get('.fa-check')) .within(() => cy.get('.fa-check'))
.should('exist'); .should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters workflowPage.getters
.canvasNodeByName('Wait') .canvasNodeByName('Wait')
.within(() => cy.get('.fa-sync-alt').should('not.visible')); .within(() => cy.get('.fa-sync-alt').should('not.visible'));
@ -128,8 +120,8 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().click(); workflowPage.getters.clearExecutionDataButton().click();
workflowPage.getters.clearExecutionDataButton().should('not.exist'); workflowPage.getters.clearExecutionDataButton().should('not.exist');
// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished) // Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.successToast().should('be.visible'); workflowPage.getters.warningToast().should('be.visible');
}); });
it('should test webhook workflow', () => { it('should test webhook workflow', () => {
@ -191,10 +183,6 @@ describe('Execution', () => {
.canvasNodeByName('Webhook') .canvasNodeByName('Webhook')
.within(() => cy.get('.fa-check')) .within(() => cy.get('.fa-check'))
.should('exist'); .should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters workflowPage.getters
.canvasNodeByName('Set') .canvasNodeByName('Set')
.within(() => cy.get('.fa-check')) .within(() => cy.get('.fa-check'))
@ -267,10 +255,6 @@ describe('Execution', () => {
.canvasNodeByName('Webhook') .canvasNodeByName('Webhook')
.within(() => cy.get('.fa-check')) .within(() => cy.get('.fa-check'))
.should('exist'); .should('exist');
workflowPage.getters
.canvasNodeByName('Wait')
.within(() => cy.get('.fa-check'))
.should('exist');
workflowPage.getters workflowPage.getters
.canvasNodeByName('Wait') .canvasNodeByName('Wait')
.within(() => cy.get('.fa-sync-alt').should('not.visible')); .within(() => cy.get('.fa-sync-alt').should('not.visible'));
@ -283,7 +267,7 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().click(); workflowPage.getters.clearExecutionDataButton().click();
workflowPage.getters.clearExecutionDataButton().should('not.exist'); workflowPage.getters.clearExecutionDataButton().should('not.exist');
// Check success toast (works because Cypress waits enough for the element to show after the http request node has finished) // Check warning toast (works because Cypress waits enough for the element to show after the http request node has finished)
workflowPage.getters.successToast().should('be.visible'); workflowPage.getters.warningToast().should('be.visible');
}); });
}); });

View file

@ -48,6 +48,7 @@ export class WorkflowPage extends BasePage {
return cy.get(this.getters.getEndpointSelector('plus', nodeName, index)); return cy.get(this.getters.getEndpointSelector('plus', nodeName, index));
}, },
successToast: () => cy.get('.el-notification:has(.el-notification--success)'), successToast: () => cy.get('.el-notification:has(.el-notification--success)'),
warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'),
errorToast: () => cy.get('.el-notification:has(.el-notification--error)'), errorToast: () => cy.get('.el-notification:has(.el-notification--error)'),
activatorSwitch: () => cy.getByTestId('workflow-activate-switch'), activatorSwitch: () => cy.getByTestId('workflow-activate-switch'),
workflowMenu: () => cy.getByTestId('workflow-menu'), workflowMenu: () => cy.getByTestId('workflow-menu'),

View file

@ -2508,6 +2508,19 @@ const getCommonWorkflowFunctions = (
prepareOutputData: async (outputData) => [outputData], prepareOutputData: async (outputData) => [outputData],
}); });
const executionCancellationFunctions = (
abortSignal?: AbortSignal,
): Pick<IExecuteFunctions, 'onExecutionCancellation' | 'getExecutionCancelSignal'> => ({
getExecutionCancelSignal: () => abortSignal,
onExecutionCancellation: (handler) => {
const fn = () => {
abortSignal?.removeEventListener('abort', fn);
handler();
};
abortSignal?.addEventListener('abort', fn);
},
});
const getRequestHelperFunctions = ( const getRequestHelperFunctions = (
workflow: Workflow, workflow: Workflow,
node: INode, node: INode,
@ -3087,10 +3100,12 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData, executeData: IExecuteData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions { ): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => { return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return { return {
...getCommonWorkflowFunctions(workflow, node, additionalData), ...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
getMode: () => mode, getMode: () => mode,
getCredentials: async (type, itemIndex) => getCredentials: async (type, itemIndex) =>
getCredentials( getCredentials(
@ -3512,10 +3527,12 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData, executeData: IExecuteData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions { ): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => { return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return { return {
...getCommonWorkflowFunctions(workflow, node, additionalData), ...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node), continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => { evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex; evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;

View file

@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/prefer-optional-chain */ /* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */ /* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
import { setMaxListeners } from 'events';
import PCancelable from 'p-cancelable'; import PCancelable from 'p-cancelable';
import type { import type {
@ -44,23 +43,14 @@ import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import * as NodeExecuteFunctions from './NodeExecuteFunctions';
export class WorkflowExecute { export class WorkflowExecute {
runExecutionData: IRunExecutionData; private status: ExecutionStatus = 'new';
private additionalData: IWorkflowExecuteAdditionalData; private readonly abortController = new AbortController();
private mode: WorkflowExecuteMode;
private status: ExecutionStatus;
constructor( constructor(
additionalData: IWorkflowExecuteAdditionalData, private readonly additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode, private readonly mode: WorkflowExecuteMode,
runExecutionData?: IRunExecutionData, private runExecutionData: IRunExecutionData = {
) {
this.additionalData = additionalData;
this.mode = mode;
this.status = 'new';
this.runExecutionData = runExecutionData || {
startData: {}, startData: {},
resultData: { resultData: {
runData: {}, runData: {},
@ -73,8 +63,8 @@ export class WorkflowExecute {
waitingExecution: {}, waitingExecution: {},
waitingExecutionSource: {}, waitingExecutionSource: {},
}, },
}; },
} ) {}
/** /**
* Executes the given workflow. * Executes the given workflow.
@ -830,11 +820,16 @@ export class WorkflowExecute {
let closeFunction: Promise<void> | undefined; let closeFunction: Promise<void> | undefined;
return new PCancelable(async (resolve, reject, onCancel) => { return new PCancelable(async (resolve, reject, onCancel) => {
let gotCancel = false; // Let as many nodes listen to the abort signal, without getting the MaxListenersExceededWarning
setMaxListeners(Infinity, this.abortController.signal);
onCancel.shouldReject = false; onCancel.shouldReject = false;
onCancel(() => { onCancel(() => {
gotCancel = true; this.status = 'canceled';
this.abortController.abort();
const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
setTimeout(() => resolve(fullRunData), 10);
}); });
const returnPromise = (async () => { const returnPromise = (async () => {
@ -881,10 +876,10 @@ export class WorkflowExecute {
this.additionalData.executionTimeoutTimestamp !== undefined && this.additionalData.executionTimeoutTimestamp !== undefined &&
Date.now() >= this.additionalData.executionTimeoutTimestamp Date.now() >= this.additionalData.executionTimeoutTimestamp
) { ) {
gotCancel = true; this.status = 'canceled';
} }
if (gotCancel) { if (this.status === 'canceled') {
return; return;
} }
@ -1014,9 +1009,6 @@ export class WorkflowExecute {
} }
for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) { for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
if (gotCancel) {
return;
}
try { try {
if (tryIndex !== 0) { if (tryIndex !== 0) {
// Reset executionError from previous error try // Reset executionError from previous error try
@ -1052,6 +1044,7 @@ export class WorkflowExecute {
this.additionalData, this.additionalData,
NodeExecuteFunctions, NodeExecuteFunctions,
this.mode, this.mode,
this.abortController.signal,
); );
nodeSuccessData = runNodeData.data; nodeSuccessData = runNodeData.data;
@ -1089,6 +1082,7 @@ export class WorkflowExecute {
this.additionalData, this.additionalData,
executionData, executionData,
this.mode, this.mode,
this.abortController.signal,
); );
const dataProxy = executeFunctions.getWorkflowDataProxy(0); const dataProxy = executeFunctions.getWorkflowDataProxy(0);
@ -1644,7 +1638,7 @@ export class WorkflowExecute {
return; return;
})() })()
.then(async () => { .then(async () => {
if (gotCancel && executionError === undefined) { if (this.status === 'canceled' && executionError === undefined) {
return this.processSuccessExecution( return this.processSuccessExecution(
startedAt, startedAt,
workflow, workflow,

View file

@ -272,7 +272,8 @@ export const pushConnection = defineComponent({
return false; return false;
} }
if (this.workflowsStore.activeExecutionId !== pushData.executionId) { const { activeExecutionId } = this.workflowsStore;
if (activeExecutionId !== pushData.executionId) {
// The workflow which did finish execution did either not get started // The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet. // by this session or we do not have the execution id yet.
if (isRetry !== true) { if (isRetry !== true) {
@ -285,10 +286,17 @@ export const pushConnection = defineComponent({
let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data); let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data);
if (pushData.data.status === 'crashed') { if (runDataExecuted.status === 'crashed') {
runDataExecutedErrorMessage = this.$locale.baseText( runDataExecutedErrorMessage = this.$locale.baseText(
'pushConnection.executionFailed.message', 'pushConnection.executionFailed.message',
); );
} else if (runDataExecuted.status === 'canceled') {
runDataExecutedErrorMessage = this.$locale.baseText(
'executionsList.showMessage.stopExecution.message',
{
interpolate: { activeExecutionId },
},
);
} }
const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber; const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
@ -389,7 +397,11 @@ export const pushConnection = defineComponent({
}); });
} else { } else {
let title: string; let title: string;
if (runDataExecuted.data.resultData.lastNodeExecuted) { let type = 'error';
if (runDataExecuted.status === 'canceled') {
title = this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title');
type = 'warning';
} else if (runDataExecuted.data.resultData.lastNodeExecuted) {
title = `Problem in node ${runDataExecuted.data.resultData.lastNodeExecuted}`; title = `Problem in node ${runDataExecuted.data.resultData.lastNodeExecuted}`;
} else { } else {
title = 'Problem executing workflow'; title = 'Problem executing workflow';
@ -398,7 +410,7 @@ export const pushConnection = defineComponent({
this.showMessage({ this.showMessage({
title, title,
message: runDataExecutedErrorMessage, message: runDataExecutedErrorMessage,
type: 'error', type,
duration: 0, duration: 0,
dangerouslyUseHTMLString: true, dangerouslyUseHTMLString: true,
}); });

View file

@ -1562,10 +1562,6 @@ export default defineComponent({
try { try {
this.stopExecutionInProgress = true; this.stopExecutionInProgress = true;
await this.workflowsStore.stopCurrentExecution(executionId); await this.workflowsStore.stopCurrentExecution(executionId);
this.showMessage({
title: this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} catch (error) { } catch (error) {
// Execution stop might fail when the execution has already finished. Let's treat this here. // Execution stop might fail when the execution has already finished. Let's treat this here.
const execution = await this.workflowsStore.getExecution(executionId); const execution = await this.workflowsStore.getExecution(executionId);

View file

@ -347,10 +347,9 @@ export class Wait extends Webhook {
if (waitValue < 65000) { if (waitValue < 65000) {
// If wait time is shorter than 65 seconds leave execution active because // If wait time is shorter than 65 seconds leave execution active because
// we just check the database every 60 seconds. // we just check the database every 60 seconds.
return new Promise((resolve, _reject) => { return new Promise((resolve) => {
setTimeout(() => { const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
resolve([context.getInputData()]); context.onExecutionCancellation(() => clearTimeout(timer));
}, waitValue);
}); });
} }

View file

@ -422,6 +422,7 @@ export interface IGetExecuteFunctions {
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData, executeData: IExecuteData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions; ): IExecuteFunctions;
} }
@ -437,6 +438,7 @@ export interface IGetExecuteSingleFunctions {
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData, executeData: IExecuteData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions; ): IExecuteSingleFunctions;
} }
@ -776,6 +778,8 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getExecuteData(): IExecuteData; getExecuteData(): IExecuteData;
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData; getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
}; };
// TODO: Create later own type only for Config-Nodes // TODO: Create later own type only for Config-Nodes

View file

@ -79,6 +79,7 @@ export class RoutingNode {
executeData: IExecuteData, executeData: IExecuteData,
nodeExecuteFunctions: INodeExecuteFunctions, nodeExecuteFunctions: INodeExecuteFunctions,
credentialsDecrypted?: ICredentialsDecrypted, credentialsDecrypted?: ICredentialsDecrypted,
abortSignal?: AbortSignal,
): Promise<INodeExecutionData[][] | null | undefined> { ): Promise<INodeExecutionData[][] | null | undefined> {
const items = inputData.main[0] as INodeExecutionData[]; const items = inputData.main[0] as INodeExecutionData[];
const returnData: INodeExecutionData[] = []; const returnData: INodeExecutionData[] = [];
@ -99,6 +100,7 @@ export class RoutingNode {
this.additionalData, this.additionalData,
executeData, executeData,
this.mode, this.mode,
abortSignal,
); );
let credentials: ICredentialDataDecryptedObject | undefined; let credentials: ICredentialDataDecryptedObject | undefined;
@ -136,6 +138,7 @@ export class RoutingNode {
this.additionalData, this.additionalData,
executeData, executeData,
this.mode, this.mode,
abortSignal,
); );
const requestData: DeclarativeRestApiSettings.ResultOptions = { const requestData: DeclarativeRestApiSettings.ResultOptions = {
options: { options: {

View file

@ -1216,6 +1216,7 @@ export class Workflow {
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
nodeExecuteFunctions: INodeExecuteFunctions, nodeExecuteFunctions: INodeExecuteFunctions,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> { ): Promise<IRunNodeResponse> {
const { node } = executionData; const { node } = executionData;
let inputData = executionData.data; let inputData = executionData.data;
@ -1303,6 +1304,7 @@ export class Workflow {
additionalData, additionalData,
executionData, executionData,
mode, mode,
abortSignal,
); );
const data = const data =
nodeType instanceof Node nodeType instanceof Node
@ -1385,6 +1387,8 @@ export class Workflow {
nodeType, nodeType,
executionData, executionData,
nodeExecuteFunctions, nodeExecuteFunctions,
undefined,
abortSignal,
), ),
}; };
} }