refactor(core): Simplify postExecutePromises

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-09-13 12:57:37 +02:00
parent cef64329a9
commit 2b06cb91c7
No known key found for this signature in database
GPG key ID: 9300FF7CDEA1FBAA
5 changed files with 35 additions and 88 deletions

View file

@ -96,7 +96,7 @@ describe('ActiveExecutions', () => {
test('Should remove an existing execution', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
activeExecutions.remove(executionId);
activeExecutions.finishExecution(executionId);
expect(activeExecutions.getActiveExecutions().length).toBe(0);
});
@ -110,7 +110,7 @@ describe('ActiveExecutions', () => {
setTimeout(res, 100);
});
const fakeOutput = mockFullRunData();
activeExecutions.remove(executionId, fakeOutput);
activeExecutions.finishExecution(executionId, fakeOutput);
await expect(postExecutePromise).resolves.toEqual(fakeOutput);
});
@ -126,7 +126,7 @@ describe('ActiveExecutions', () => {
const cancellablePromise = mockCancelablePromise();
cancellablePromise.cancel = cancelExecution;
activeExecutions.attachWorkflowExecution(executionId, cancellablePromise);
void activeExecutions.stopExecution(executionId);
activeExecutions.stopExecution(executionId);
expect(cancelExecution).toHaveBeenCalledTimes(1);
});

View file

@ -95,13 +95,23 @@ export class ActiveExecutions {
await this.executionRepository.updateExistingExecution(executionId, execution);
}
const postExecutePromise = createDeferredPromise<IRun | undefined>();
this.activeExecutions[executionId] = {
executionData,
startedAt: new Date(),
postExecutePromises: [],
postExecutePromise,
status: executionStatus,
};
// Automatically remove execution once the postExecutePromise settles
void postExecutePromise.promise.finally(() => {
this.concurrencyControl.release({ mode: executionData.executionMode });
setImmediate(() => {
delete this.activeExecutions[executionId];
});
});
return executionId;
}
@ -125,68 +135,24 @@ export class ActiveExecutions {
execution?.responsePromise?.resolve(response);
}
getPostExecutePromiseCount(executionId: string): number {
return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0;
}
/**
* Remove an active execution
*/
remove(executionId: string, fullRunData?: IRun): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
return;
}
// Resolve all the waiting promises
for (const promise of execution.postExecutePromises) {
promise.resolve(fullRunData);
}
this.postExecuteCleanup(executionId);
}
/**
* Forces an execution to stop
*/
/** Forces an execution to stop */
stopExecution(executionId: string): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
// There is no execution running with that id
return;
}
execution.workflowExecution!.cancel();
// Reject all the waiting promises
const reason = new ExecutionCancelledError(executionId);
for (const promise of execution.postExecutePromises) {
promise.reject(reason);
}
this.postExecuteCleanup(executionId);
const execution = this.getExecution(executionId);
execution.workflowExecution?.cancel();
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
}
private postExecuteCleanup(executionId: string) {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
return;
}
// Remove from the list of active executions
delete this.activeExecutions[executionId];
this.concurrencyControl.release({ mode: execution.executionData.executionMode });
/** Mark an execution as completed */
finishExecution(executionId: string, fullRunData?: IRun) {
const execution = this.getExecution(executionId);
execution.postExecutePromise.resolve(fullRunData);
}
/**
* Returns a promise which will resolve with the data of the execution with the given id
*/
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
// Create the promise which will be resolved when the execution finished
const waitPromise = createDeferredPromise<IRun | undefined>();
this.getExecution(executionId).postExecutePromises.push(waitPromise);
return await waitPromise.promise;
return await this.getExecution(executionId).postExecutePromise.promise;
}
/**

View file

@ -193,7 +193,7 @@ export interface IExecutionsCurrentSummary {
export interface IExecutingWorkflowData {
executionData: IWorkflowExecutionDataProcess;
startedAt: Date;
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
postExecutePromise: IDeferredPromise<IRun | undefined>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
workflowExecution?: PCancelable<IRun>;
status: ExecutionStatus;

View file

@ -880,7 +880,7 @@ async function executeWorkflow(
}
// remove execution from active executions
activeExecutions.remove(executionId, fullRunData);
activeExecutions.finishExecution(executionId, fullRunData);
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError(
@ -906,11 +906,11 @@ async function executeWorkflow(
if (data.finished === true || data.status === 'waiting') {
// Workflow did finish successfully
activeExecutions.remove(executionId, data);
activeExecutions.finishExecution(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
}
activeExecutions.remove(executionId, data);
activeExecutions.finishExecution(executionId, data);
// Workflow did fail
const { error } = data.data.resultData;

View file

@ -26,7 +26,6 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type { IExecutionResponse } from '@/interfaces';
import { Logger } from '@/logger';
import { NodeTypes } from '@/node-types';
import type { ScalingService } from '@/scaling/scaling.service';
@ -101,7 +100,7 @@ export class WorkflowRunner {
// Remove from active execution with empty data. That will
// set the execution to failed.
this.activeExecutions.remove(executionId, fullRunData);
this.activeExecutions.finishExecution(executionId, fullRunData);
if (hooks) {
await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]);
@ -129,7 +128,7 @@ export class WorkflowRunner {
await workflowHooks.executeHookFunctions('workflowExecuteBefore', []);
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
responsePromise?.reject(error);
this.activeExecutions.remove(executionId);
this.activeExecutions.finishExecution(executionId);
return executionId;
}
@ -321,7 +320,7 @@ export class WorkflowRunner {
fullRunData.finished = false;
}
fullRunData.status = this.activeExecutions.getStatus(executionId);
this.activeExecutions.remove(executionId, fullRunData);
this.activeExecutions.finishExecution(executionId, fullRunData);
})
.catch(
async (error) =>
@ -490,45 +489,27 @@ export class WorkflowRunner {
reject(error);
}
// optimization: only pull and unflatten execution data from the Db when it is needed
const executionHasPostExecutionPromises =
this.activeExecutions.getPostExecutePromiseCount(executionId) > 0;
if (executionHasPostExecutionPromises) {
this.logger.debug(
`Reading execution data for execution ${executionId} from db for PostExecutionPromise.`,
);
} else {
this.logger.debug(
`Skipping execution data for execution ${executionId} since there are no PostExecutionPromise.`,
);
}
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: executionHasPostExecutionPromises,
unflattenData: executionHasPostExecutionPromises,
includeData: true,
unflattenData: true,
});
if (!fullExecutionData) {
return reject(new Error(`Could not find execution with id "${executionId}"`));
}
const runData: IRun = {
data: {},
finished: fullExecutionData.finished,
mode: fullExecutionData.mode,
startedAt: fullExecutionData.startedAt,
stoppedAt: fullExecutionData.stoppedAt,
status: fullExecutionData.status,
} as IRun;
if (executionHasPostExecutionPromises) {
runData.data = (fullExecutionData as IExecutionResponse).data;
}
data: fullExecutionData.data,
};
// NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present,
// the execution data in runData.data MAY not be available here.
// This means that any function expecting with runData has to check if the runData.data defined from this point
this.activeExecutions.remove(executionId, runData);
this.activeExecutions.finishExecution(executionId, runData);
// Normally also static data should be supplied here but as it only used for sending
// data to editor-UI is not needed.