This commit is contained in:
Danny Martini 2024-09-19 17:34:03 +02:00 committed by GitHub
commit 662d2d2a03
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 49 additions and 4 deletions

View file

@ -146,6 +146,18 @@ export class ActiveExecutions {
this.postExecuteCleanup(executionId);
}
cancelExecution(executionId: string): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
// There is no execution running with that id
return;
}
console.log(new Date().toISOString(), 'cancel execution');
execution.workflowExecution!.cancel();
console.log(new Date().toISOString(), 'cancelled execution');
}
/**
* Forces an execution to stop
*/
@ -164,6 +176,12 @@ export class ActiveExecutions {
promise.reject(reason);
}
// FIXME: Option 2: Don't clean up the execution. Because we cancelled it
// above, which means the time a node finishes the execution will stop and
// the WorkflowRunner will remove it from the active executions.
// Given that this function is called from many places this is probably not
// a good fix.
//
this.postExecuteCleanup(executionId);
}
@ -235,7 +253,7 @@ export class ActiveExecutions {
await this.concurrencyControl.removeAll(this.activeExecutions);
}
executionIds.forEach((executionId) => this.stopExecution(executionId));
executionIds.forEach((executionId) => this.cancelExecution(executionId));
}
let count = 0;

View file

@ -452,8 +452,19 @@ export class ExecutionService {
return await this.executionRepository.stopBeforeRun(execution);
}
// NOTE: old code
// NOTE: Should this actually stop the execution?
// Maybe this should just get the execution from the active executions,
// then cancel it and then let the workflow execute and workflow runner
// take care of the rest?
//if (this.activeExecutions.has(execution.id)) {
// this.activeExecutions.stopExecution(execution.id);
//}
// NOTE: new code
// FIXME: Option 3
if (this.activeExecutions.has(execution.id)) {
this.activeExecutions.stopExecution(execution.id);
this.activeExecutions.cancelExecution(execution.id);
}
if (this.waitTracker.has(execution.id)) {

View file

@ -325,18 +325,30 @@ export class WorkflowRunner {
if (workflowTimeout > 0) {
const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds
executionTimeout = setTimeout(() => {
void this.activeExecutions.stopExecution(executionId);
void this.activeExecutions.cancelExecution(executionId);
}, timeout);
}
workflowExecution
.then((fullRunData) => {
clearTimeout(executionTimeout);
// NOTE: old code
if (workflowExecution.isCanceled) {
fullRunData.finished = false;
}
fullRunData.status = this.activeExecutions.getStatus(executionId);
this.activeExecutions.remove(executionId, fullRunData);
// NOTE: new code
// FIXME: Option 1: Don't call `getStatus` if the execution was
// already cancelled.
//if (workflowExecution.isCanceled) {
// fullRunData.finished = false;
// this.activeExecutions.remove(executionId, fullRunData);
//} else {
// // if the execution was canceled it was already removed from the active executions
// fullRunData.status = this.activeExecutions.getStatus(executionId);
//}
})
.catch(
async (error) =>

View file

@ -515,7 +515,11 @@ export class Wait extends Webhook {
// we just check the database every 60 seconds.
return await new Promise((resolve) => {
const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
context.onExecutionCancellation(() => clearTimeout(timer));
context.onExecutionCancellation(() => {
clearTimeout(timer);
// FIXME: Don't let the promise dangle
resolve([context.getInputData()]);
});
});
}