mirror of
https://github.com/n8n-io/n8n.git
synced 2024-09-19 22:37:31 -07:00
Merge e9fd2be0b3
into 48294e7ec1
This commit is contained in:
commit
a8eccda849
|
@ -94,10 +94,17 @@ describe('ActiveExecutions', () => {
|
|||
});
|
||||
|
||||
test('Should remove an existing execution', async () => {
|
||||
// ARRANGE
|
||||
const newExecution = mockExecutionData();
|
||||
const executionId = await activeExecutions.add(newExecution);
|
||||
activeExecutions.remove(executionId);
|
||||
|
||||
// ACT
|
||||
activeExecutions.finalizeExecution(executionId);
|
||||
|
||||
// Wait until the next tick to ensure that the post-execution promise has settled
|
||||
await new Promise(setImmediate);
|
||||
|
||||
// ASSERT
|
||||
expect(activeExecutions.getActiveExecutions().length).toBe(0);
|
||||
});
|
||||
|
||||
|
@ -110,7 +117,7 @@ describe('ActiveExecutions', () => {
|
|||
setTimeout(res, 100);
|
||||
});
|
||||
const fakeOutput = mockFullRunData();
|
||||
activeExecutions.remove(executionId, fakeOutput);
|
||||
activeExecutions.finalizeExecution(executionId, fakeOutput);
|
||||
|
||||
await expect(postExecutePromise).resolves.toEqual(fakeOutput);
|
||||
});
|
||||
|
@ -126,7 +133,7 @@ describe('ActiveExecutions', () => {
|
|||
const cancellablePromise = mockCancelablePromise();
|
||||
cancellablePromise.cancel = cancelExecution;
|
||||
activeExecutions.attachWorkflowExecution(executionId, cancellablePromise);
|
||||
void activeExecutions.stopExecution(executionId);
|
||||
activeExecutions.stopExecution(executionId);
|
||||
|
||||
expect(cancelExecution).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
|
||||
import Container from 'typedi';
|
||||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
import config from '@/config';
|
||||
import type { User } from '@/databases/entities/user';
|
||||
import { Telemetry } from '@/telemetry';
|
||||
|
@ -72,6 +73,10 @@ test('processError should process error', async () => {
|
|||
},
|
||||
workflow,
|
||||
);
|
||||
await Container.get(ActiveExecutions).add(
|
||||
{ executionMode: 'webhook', workflowData: workflow },
|
||||
execution.id,
|
||||
);
|
||||
config.set('executions.mode', 'regular');
|
||||
await runner.processError(
|
||||
new Error('test') as ExecutionError,
|
||||
|
|
|
@ -5,17 +5,13 @@ import type {
|
|||
ExecutionStatus,
|
||||
IWorkflowExecutionDataProcess,
|
||||
} from 'n8n-workflow';
|
||||
import {
|
||||
ApplicationError,
|
||||
createDeferredPromise,
|
||||
ExecutionCancelledError,
|
||||
sleep,
|
||||
} from 'n8n-workflow';
|
||||
import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow';
|
||||
import { strict as assert } from 'node:assert';
|
||||
import type PCancelable from 'p-cancelable';
|
||||
import { Service } from 'typedi';
|
||||
|
||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
|
||||
import type {
|
||||
ExecutionPayload,
|
||||
IExecutingWorkflowData,
|
||||
|
@ -95,13 +91,29 @@ export class ActiveExecutions {
|
|||
await this.executionRepository.updateExistingExecution(executionId, execution);
|
||||
}
|
||||
|
||||
const postExecutePromise = createDeferredPromise<IRun | undefined>();
|
||||
|
||||
this.activeExecutions[executionId] = {
|
||||
executionData,
|
||||
startedAt: new Date(),
|
||||
postExecutePromises: [],
|
||||
postExecutePromise,
|
||||
status: executionStatus,
|
||||
};
|
||||
|
||||
// Automatically remove execution once the postExecutePromise settles
|
||||
void postExecutePromise.promise
|
||||
.catch((error) => {
|
||||
if (error instanceof ExecutionCancelledError) return;
|
||||
throw error;
|
||||
})
|
||||
.finally(() => {
|
||||
this.concurrencyControl.release({ mode: executionData.executionMode });
|
||||
delete this.activeExecutions[executionId];
|
||||
this.logger.debug('Execution removed', { executionId });
|
||||
});
|
||||
|
||||
this.logger.debug('Execution added', { executionId });
|
||||
|
||||
return executionId;
|
||||
}
|
||||
|
||||
|
@ -125,68 +137,26 @@ export class ActiveExecutions {
|
|||
execution?.responsePromise?.resolve(response);
|
||||
}
|
||||
|
||||
getPostExecutePromiseCount(executionId: string): number {
|
||||
return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an active execution
|
||||
*/
|
||||
remove(executionId: string, fullRunData?: IRun): void {
|
||||
const execution = this.activeExecutions[executionId];
|
||||
if (execution === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Resolve all the waiting promises
|
||||
for (const promise of execution.postExecutePromises) {
|
||||
promise.resolve(fullRunData);
|
||||
}
|
||||
|
||||
this.postExecuteCleanup(executionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Forces an execution to stop
|
||||
*/
|
||||
/** Cancel the execution promise and reject its post-execution promise. */
|
||||
stopExecution(executionId: string): void {
|
||||
const execution = this.activeExecutions[executionId];
|
||||
if (execution === undefined) {
|
||||
// There is no execution running with that id
|
||||
return;
|
||||
}
|
||||
|
||||
execution.workflowExecution!.cancel();
|
||||
|
||||
// Reject all the waiting promises
|
||||
const reason = new ExecutionCancelledError(executionId);
|
||||
for (const promise of execution.postExecutePromises) {
|
||||
promise.reject(reason);
|
||||
}
|
||||
|
||||
this.postExecuteCleanup(executionId);
|
||||
const execution = this.getExecution(executionId);
|
||||
execution.workflowExecution?.cancel();
|
||||
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
|
||||
this.logger.debug('Execution cancelled', { executionId });
|
||||
}
|
||||
|
||||
private postExecuteCleanup(executionId: string) {
|
||||
const execution = this.activeExecutions[executionId];
|
||||
if (execution === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove from the list of active executions
|
||||
delete this.activeExecutions[executionId];
|
||||
|
||||
this.concurrencyControl.release({ mode: execution.executionData.executionMode });
|
||||
/** Resolve the post-execution promise in an execution. */
|
||||
finalizeExecution(executionId: string, fullRunData?: IRun) {
|
||||
const execution = this.getExecution(executionId);
|
||||
execution.postExecutePromise.resolve(fullRunData);
|
||||
this.logger.debug('Execution finalized', { executionId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a promise which will resolve with the data of the execution with the given id
|
||||
*/
|
||||
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
|
||||
// Create the promise which will be resolved when the execution finished
|
||||
const waitPromise = createDeferredPromise<IRun | undefined>();
|
||||
this.getExecution(executionId).postExecutePromises.push(waitPromise);
|
||||
return await waitPromise.promise;
|
||||
return await this.getExecution(executionId).postExecutePromise.promise;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -252,7 +222,7 @@ export class ActiveExecutions {
|
|||
private getExecution(executionId: string): IExecutingWorkflowData {
|
||||
const execution = this.activeExecutions[executionId];
|
||||
if (!execution) {
|
||||
throw new ApplicationError('No active execution found', { extra: { executionId } });
|
||||
throw new ExecutionNotFoundError(executionId);
|
||||
}
|
||||
return execution;
|
||||
}
|
||||
|
|
7
packages/cli/src/errors/execution-not-found-error.ts
Normal file
7
packages/cli/src/errors/execution-not-found-error.ts
Normal file
|
@ -0,0 +1,7 @@
|
|||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
export class ExecutionNotFoundError extends ApplicationError {
|
||||
constructor(executionId: string) {
|
||||
super('No active execution found', { extra: { executionId } });
|
||||
}
|
||||
}
|
|
@ -193,7 +193,8 @@ export interface IExecutionsCurrentSummary {
|
|||
export interface IExecutingWorkflowData {
|
||||
executionData: IWorkflowExecutionDataProcess;
|
||||
startedAt: Date;
|
||||
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
|
||||
/** This promise rejects when the execution is stopped. When the execution finishes (successfully or not), the promise resolves. */
|
||||
postExecutePromise: IDeferredPromise<IRun | undefined>;
|
||||
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
|
||||
workflowExecution?: PCancelable<IRun>;
|
||||
status: ExecutionStatus;
|
||||
|
|
|
@ -879,8 +879,7 @@ async function executeWorkflow(
|
|||
fullExecutionData.workflowId = workflowData.id;
|
||||
}
|
||||
|
||||
// remove execution from active executions
|
||||
activeExecutions.remove(executionId, fullRunData);
|
||||
activeExecutions.finalizeExecution(executionId, fullRunData);
|
||||
|
||||
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
|
||||
throw objectToError(
|
||||
|
@ -906,11 +905,11 @@ async function executeWorkflow(
|
|||
if (data.finished === true || data.status === 'waiting') {
|
||||
// Workflow did finish successfully
|
||||
|
||||
activeExecutions.remove(executionId, data);
|
||||
activeExecutions.finalizeExecution(executionId, data);
|
||||
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
|
||||
return returnData!.data!.main;
|
||||
}
|
||||
activeExecutions.remove(executionId, data);
|
||||
activeExecutions.finalizeExecution(executionId, data);
|
||||
|
||||
// Workflow did fail
|
||||
const { error } = data.data.resultData;
|
||||
|
|
|
@ -26,7 +26,6 @@ import { ActiveExecutions } from '@/active-executions';
|
|||
import config from '@/config';
|
||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||
import { ExternalHooks } from '@/external-hooks';
|
||||
import type { IExecutionResponse } from '@/interfaces';
|
||||
import { Logger } from '@/logger';
|
||||
import { NodeTypes } from '@/node-types';
|
||||
import type { ScalingService } from '@/scaling/scaling.service';
|
||||
|
@ -102,7 +101,7 @@ export class WorkflowRunner {
|
|||
|
||||
// Remove from active execution with empty data. That will
|
||||
// set the execution to failed.
|
||||
this.activeExecutions.remove(executionId, fullRunData);
|
||||
this.activeExecutions.finalizeExecution(executionId, fullRunData);
|
||||
|
||||
if (hooks) {
|
||||
await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]);
|
||||
|
@ -132,7 +131,7 @@ export class WorkflowRunner {
|
|||
await workflowHooks.executeHookFunctions('workflowExecuteBefore', []);
|
||||
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]);
|
||||
responsePromise?.reject(error);
|
||||
this.activeExecutions.remove(executionId);
|
||||
this.activeExecutions.finalizeExecution(executionId);
|
||||
return executionId;
|
||||
}
|
||||
|
||||
|
@ -336,7 +335,7 @@ export class WorkflowRunner {
|
|||
fullRunData.finished = false;
|
||||
}
|
||||
fullRunData.status = this.activeExecutions.getStatus(executionId);
|
||||
this.activeExecutions.remove(executionId, fullRunData);
|
||||
this.activeExecutions.finalizeExecution(executionId, fullRunData);
|
||||
})
|
||||
.catch(
|
||||
async (error) =>
|
||||
|
@ -505,45 +504,24 @@ export class WorkflowRunner {
|
|||
reject(error);
|
||||
}
|
||||
|
||||
// optimization: only pull and unflatten execution data from the Db when it is needed
|
||||
const executionHasPostExecutionPromises =
|
||||
this.activeExecutions.getPostExecutePromiseCount(executionId) > 0;
|
||||
|
||||
if (executionHasPostExecutionPromises) {
|
||||
this.logger.debug(
|
||||
`Reading execution data for execution ${executionId} from db for PostExecutionPromise.`,
|
||||
);
|
||||
} else {
|
||||
this.logger.debug(
|
||||
`Skipping execution data for execution ${executionId} since there are no PostExecutionPromise.`,
|
||||
);
|
||||
}
|
||||
|
||||
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
|
||||
includeData: executionHasPostExecutionPromises,
|
||||
unflattenData: executionHasPostExecutionPromises,
|
||||
includeData: true,
|
||||
unflattenData: true,
|
||||
});
|
||||
if (!fullExecutionData) {
|
||||
return reject(new Error(`Could not find execution with id "${executionId}"`));
|
||||
}
|
||||
|
||||
const runData: IRun = {
|
||||
data: {},
|
||||
finished: fullExecutionData.finished,
|
||||
mode: fullExecutionData.mode,
|
||||
startedAt: fullExecutionData.startedAt,
|
||||
stoppedAt: fullExecutionData.stoppedAt,
|
||||
status: fullExecutionData.status,
|
||||
} as IRun;
|
||||
data: fullExecutionData.data,
|
||||
};
|
||||
|
||||
if (executionHasPostExecutionPromises) {
|
||||
runData.data = (fullExecutionData as IExecutionResponse).data;
|
||||
}
|
||||
|
||||
// NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present,
|
||||
// the execution data in runData.data MAY not be available here.
|
||||
// This means that any function expecting with runData has to check if the runData.data defined from this point
|
||||
this.activeExecutions.remove(executionId, runData);
|
||||
this.activeExecutions.finalizeExecution(executionId, runData);
|
||||
|
||||
// Normally also static data should be supplied here but as it only used for sending
|
||||
// data to editor-UI is not needed.
|
||||
|
|
Loading…
Reference in a new issue