2021-08-29 11:58:11 -07:00
|
|
|
/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */
|
|
|
|
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
|
|
|
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
|
|
|
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
2023-02-17 01:54:07 -08:00
|
|
|
import type {
|
|
|
|
IDeferredPromise,
|
|
|
|
IExecuteResponsePromiseData,
|
|
|
|
IRun,
|
|
|
|
ExecutionStatus,
|
|
|
|
} from 'n8n-workflow';
|
|
|
|
import { createDeferredPromise, LoggerProxy } from 'n8n-workflow';
|
2019-08-08 11:38:25 -07:00
|
|
|
|
2022-12-06 03:16:49 -08:00
|
|
|
import type { ChildProcess } from 'child_process';
|
2023-01-27 05:56:56 -08:00
|
|
|
import type PCancelable from 'p-cancelable';
|
|
|
|
import type {
|
2019-08-08 11:38:25 -07:00
|
|
|
IExecutingWorkflowData,
|
2021-02-08 23:59:32 -08:00
|
|
|
IExecutionDb,
|
2020-10-22 06:46:03 -07:00
|
|
|
IExecutionsCurrentSummary,
|
2019-08-08 11:38:25 -07:00
|
|
|
IWorkflowExecutionDataProcess,
|
2022-11-09 06:25:00 -08:00
|
|
|
} from '@/Interfaces';
|
2023-03-16 07:34:13 -07:00
|
|
|
import { isWorkflowIdValid } from '@/utils';
|
2023-06-20 10:13:18 -07:00
|
|
|
import Container, { Service } from 'typedi';
|
|
|
|
import { ExecutionRepository } from './databases/repositories';
|
2019-06-23 03:35:23 -07:00
|
|
|
|
2023-02-21 10:21:56 -08:00
|
|
|
@Service()
|
2019-06-23 03:35:23 -07:00
|
|
|
export class ActiveExecutions {
|
|
|
|
private activeExecutions: {
|
|
|
|
[index: string]: IExecutingWorkflowData;
|
|
|
|
} = {};
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Add a new active execution
|
|
|
|
*/
|
2021-08-29 11:58:11 -07:00
|
|
|
async add(
|
|
|
|
executionData: IWorkflowExecutionDataProcess,
|
|
|
|
process?: ChildProcess,
|
|
|
|
executionId?: string,
|
|
|
|
): Promise<string> {
|
2023-02-17 01:54:07 -08:00
|
|
|
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
|
2021-08-21 05:11:32 -07:00
|
|
|
if (executionId === undefined) {
|
|
|
|
// Is a new execution so save in DB
|
2021-02-08 23:59:32 -08:00
|
|
|
|
2021-08-21 05:11:32 -07:00
|
|
|
const fullExecutionData: IExecutionDb = {
|
|
|
|
data: executionData.executionData!,
|
|
|
|
mode: executionData.executionMode,
|
|
|
|
finished: false,
|
|
|
|
startedAt: new Date(),
|
|
|
|
workflowData: executionData.workflowData,
|
2023-02-17 01:54:07 -08:00
|
|
|
status: executionStatus,
|
2021-08-21 05:11:32 -07:00
|
|
|
};
|
2021-02-08 23:59:32 -08:00
|
|
|
|
2021-08-21 05:11:32 -07:00
|
|
|
if (executionData.retryOf !== undefined) {
|
|
|
|
fullExecutionData.retryOf = executionData.retryOf.toString();
|
|
|
|
}
|
2021-05-29 11:31:21 -07:00
|
|
|
|
2023-01-02 08:42:32 -08:00
|
|
|
const workflowId = executionData.workflowData.id;
|
2023-03-16 07:34:13 -07:00
|
|
|
if (workflowId !== undefined && isWorkflowIdValid(workflowId)) {
|
2023-01-02 08:42:32 -08:00
|
|
|
fullExecutionData.workflowId = workflowId;
|
2021-08-21 05:11:32 -07:00
|
|
|
}
|
2021-02-08 23:59:32 -08:00
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
const executionResult = await Container.get(ExecutionRepository).createNewExecution(
|
|
|
|
fullExecutionData,
|
|
|
|
);
|
|
|
|
executionId = executionResult.id;
|
2023-02-17 01:54:07 -08:00
|
|
|
if (executionId === undefined) {
|
|
|
|
throw new Error('There was an issue assigning an execution id to the execution');
|
|
|
|
}
|
|
|
|
executionStatus = 'running';
|
2021-08-21 05:11:32 -07:00
|
|
|
} else {
|
|
|
|
// Is an existing execution we want to finish so update in DB
|
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = {
|
2021-08-21 05:11:32 -07:00
|
|
|
id: executionId,
|
2023-06-20 10:13:18 -07:00
|
|
|
data: executionData.executionData!,
|
2021-08-21 05:11:32 -07:00
|
|
|
waitTill: null,
|
2023-02-17 01:54:07 -08:00
|
|
|
status: executionStatus,
|
2021-08-21 05:11:32 -07:00
|
|
|
};
|
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
await Container.get(ExecutionRepository).updateExistingExecution(executionId, execution);
|
2021-08-21 05:11:32 -07:00
|
|
|
}
|
2019-06-23 03:35:23 -07:00
|
|
|
|
|
|
|
this.activeExecutions[executionId] = {
|
2019-08-08 11:38:25 -07:00
|
|
|
executionData,
|
|
|
|
process,
|
2019-07-22 11:29:06 -07:00
|
|
|
startedAt: new Date(),
|
2019-06-23 03:35:23 -07:00
|
|
|
postExecutePromises: [],
|
2023-02-17 01:54:07 -08:00
|
|
|
status: executionStatus,
|
2019-06-23 03:35:23 -07:00
|
|
|
};
|
|
|
|
|
2021-02-08 23:59:32 -08:00
|
|
|
return executionId;
|
2019-06-23 03:35:23 -07:00
|
|
|
}
|
|
|
|
|
2020-01-17 17:34:31 -08:00
|
|
|
/**
|
|
|
|
* Attaches an execution
|
|
|
|
*
|
|
|
|
*/
|
2021-08-29 11:58:11 -07:00
|
|
|
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
2020-01-17 17:34:31 -08:00
|
|
|
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
2021-08-29 11:58:11 -07:00
|
|
|
throw new Error(
|
|
|
|
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
|
|
|
|
);
|
2020-01-17 17:34:31 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
this.activeExecutions[executionId].workflowExecution = workflowExecution;
|
|
|
|
}
|
|
|
|
|
2021-11-05 09:45:51 -07:00
|
|
|
attachResponsePromise(
|
|
|
|
executionId: string,
|
|
|
|
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
|
|
|
|
): void {
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
throw new Error(
|
|
|
|
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
this.activeExecutions[executionId].responsePromise = responsePromise;
|
|
|
|
}
|
|
|
|
|
|
|
|
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
|
|
|
this.activeExecutions[executionId].responsePromise?.resolve(response);
|
|
|
|
}
|
|
|
|
|
2019-06-23 03:35:23 -07:00
|
|
|
/**
|
|
|
|
* Remove an active execution
|
|
|
|
*
|
|
|
|
*/
|
2019-08-08 11:38:25 -07:00
|
|
|
remove(executionId: string, fullRunData?: IRun): void {
|
2019-06-23 03:35:23 -07:00
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Resolve all the waiting promises
|
2021-08-29 11:58:11 -07:00
|
|
|
// eslint-disable-next-line no-restricted-syntax
|
2019-06-23 03:35:23 -07:00
|
|
|
for (const promise of this.activeExecutions[executionId].postExecutePromises) {
|
|
|
|
promise.resolve(fullRunData);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remove from the list of active executions
|
|
|
|
delete this.activeExecutions[executionId];
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Forces an execution to stop
|
|
|
|
*
|
|
|
|
* @param {string} executionId The id of the execution to stop
|
2020-07-29 05:12:54 -07:00
|
|
|
* @param {string} timeout String 'timeout' given if stop due to timeout
|
2019-06-23 03:35:23 -07:00
|
|
|
*/
|
2020-07-29 05:12:54 -07:00
|
|
|
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
|
2019-06-23 03:35:23 -07:00
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
// There is no execution running with that id
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2019-08-08 11:38:25 -07:00
|
|
|
// In case something goes wrong make sure that promise gets first
|
|
|
|
// returned that it gets then also resolved correctly.
|
2020-01-17 17:34:31 -08:00
|
|
|
if (this.activeExecutions[executionId].process !== undefined) {
|
|
|
|
// Workflow is running in subprocess
|
2020-07-29 05:12:54 -07:00
|
|
|
if (this.activeExecutions[executionId].process!.connected) {
|
|
|
|
setTimeout(() => {
|
2021-08-29 11:58:11 -07:00
|
|
|
// execute on next event loop tick;
|
2020-01-17 17:34:31 -08:00
|
|
|
this.activeExecutions[executionId].process!.send({
|
2021-08-29 11:58:11 -07:00
|
|
|
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
|
|
|
|
type: timeout || 'stopExecution',
|
2020-01-17 17:34:31 -08:00
|
|
|
});
|
2020-07-29 05:19:35 -07:00
|
|
|
}, 1);
|
2020-07-29 05:12:54 -07:00
|
|
|
}
|
2020-01-17 17:34:31 -08:00
|
|
|
} else {
|
|
|
|
// Workflow is running in current process
|
2020-07-29 05:12:54 -07:00
|
|
|
this.activeExecutions[executionId].workflowExecution!.cancel();
|
2020-01-17 17:34:31 -08:00
|
|
|
}
|
2019-06-23 03:35:23 -07:00
|
|
|
|
2021-08-29 11:58:11 -07:00
|
|
|
// eslint-disable-next-line consistent-return
|
2019-06-23 03:35:23 -07:00
|
|
|
return this.getPostExecutePromise(executionId);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Returns a promise which will resolve with the data of the execution
|
|
|
|
* with the given id
|
|
|
|
*
|
|
|
|
* @param {string} executionId The id of the execution to wait for
|
|
|
|
*/
|
2019-08-08 11:38:25 -07:00
|
|
|
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
|
2019-06-23 03:35:23 -07:00
|
|
|
// Create the promise which will be resolved when the execution finished
|
2019-08-08 11:38:25 -07:00
|
|
|
const waitPromise = await createDeferredPromise<IRun | undefined>();
|
2019-06-23 03:35:23 -07:00
|
|
|
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
throw new Error(`There is no active execution with id "${executionId}".`);
|
|
|
|
}
|
|
|
|
|
|
|
|
this.activeExecutions[executionId].postExecutePromises.push(waitPromise);
|
|
|
|
|
2021-11-05 09:45:51 -07:00
|
|
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-member-access
|
2019-06-23 03:35:23 -07:00
|
|
|
return waitPromise.promise();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Returns all the currently active executions
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
getActiveExecutions(): IExecutionsCurrentSummary[] {
|
|
|
|
const returnData: IExecutionsCurrentSummary[] = [];
|
|
|
|
|
2019-08-08 11:38:25 -07:00
|
|
|
let data;
|
2021-08-29 11:58:11 -07:00
|
|
|
// eslint-disable-next-line no-restricted-syntax
|
2019-06-23 03:35:23 -07:00
|
|
|
for (const id of Object.keys(this.activeExecutions)) {
|
2019-08-08 11:38:25 -07:00
|
|
|
data = this.activeExecutions[id];
|
2021-08-29 11:58:11 -07:00
|
|
|
returnData.push({
|
|
|
|
id,
|
|
|
|
retryOf: data.executionData.retryOf as string | undefined,
|
|
|
|
startedAt: data.startedAt,
|
|
|
|
mode: data.executionData.executionMode,
|
|
|
|
workflowId: data.executionData.workflowData.id! as string,
|
2023-02-17 01:54:07 -08:00
|
|
|
status: data.status,
|
2021-08-29 11:58:11 -07:00
|
|
|
});
|
2019-06-23 03:35:23 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
return returnData;
|
|
|
|
}
|
2023-02-17 01:54:07 -08:00
|
|
|
|
|
|
|
async setStatus(executionId: string, status: ExecutionStatus): Promise<void> {
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
LoggerProxy.debug(
|
|
|
|
`There is no active execution with id "${executionId}", can't update status to ${status}.`,
|
|
|
|
);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.activeExecutions[executionId].status = status;
|
|
|
|
}
|
|
|
|
|
|
|
|
getStatus(executionId: string): ExecutionStatus {
|
|
|
|
if (this.activeExecutions[executionId] === undefined) {
|
|
|
|
return 'unknown';
|
|
|
|
}
|
|
|
|
|
|
|
|
return this.activeExecutions[executionId].status;
|
|
|
|
}
|
2019-06-23 03:35:23 -07:00
|
|
|
}
|