refactor(core): Couple of refactors on WorkflowRunner and ActiveExecutions (no-changelog) (#8487)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-02-06 18:09:50 +01:00 committed by GitHub
parent dc068ce2e6
commit c04f92f7fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 66 additions and 143 deletions

View file

@ -6,7 +6,7 @@ import type {
IRun, IRun,
ExecutionStatus, ExecutionStatus,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { ApplicationError, WorkflowOperationError, createDeferredPromise } from 'n8n-workflow'; import { ApplicationError, createDeferredPromise, sleep } from 'n8n-workflow';
import type { import type {
ExecutionPayload, ExecutionPayload,
@ -22,7 +22,7 @@ import { Logger } from '@/Logger';
@Service() @Service()
export class ActiveExecutions { export class ActiveExecutions {
private activeExecutions: { private activeExecutions: {
[index: string]: IExecutingWorkflowData; [executionId: string]: IExecutingWorkflowData;
} = {}; } = {};
constructor( constructor(
@ -87,32 +87,17 @@ export class ActiveExecutions {
/** /**
* Attaches an execution * Attaches an execution
*
*/ */
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) { attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
const execution = this.activeExecutions[executionId]; this.getExecution(executionId).workflowExecution = workflowExecution;
if (execution === undefined) {
throw new ApplicationError('No active execution found to attach to workflow execution to', {
extra: { executionId },
});
}
execution.workflowExecution = workflowExecution;
} }
attachResponsePromise( attachResponsePromise(
executionId: string, executionId: string,
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>, responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
): void { ): void {
const execution = this.activeExecutions[executionId]; this.getExecution(executionId).responsePromise = responsePromise;
if (execution === undefined) {
throw new ApplicationError('No active execution found to attach to workflow execution to', {
extra: { executionId },
});
}
execution.responsePromise = responsePromise;
} }
resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
@ -126,7 +111,6 @@ export class ActiveExecutions {
/** /**
* Remove an active execution * Remove an active execution
*
*/ */
remove(executionId: string, fullRunData?: IRun): void { remove(executionId: string, fullRunData?: IRun): void {
const execution = this.activeExecutions[executionId]; const execution = this.activeExecutions[executionId];
@ -135,7 +119,6 @@ export class ActiveExecutions {
} }
// Resolve all the waiting promises // Resolve all the waiting promises
for (const promise of execution.postExecutePromises) { for (const promise of execution.postExecutePromises) {
promise.resolve(fullRunData); promise.resolve(fullRunData);
} }
@ -160,28 +143,17 @@ export class ActiveExecutions {
} }
/** /**
* Returns a promise which will resolve with the data of the execution * Returns a promise which will resolve with the data of the execution with the given id
* with the given id
*
* @param {string} executionId The id of the execution to wait for
*/ */
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> { async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
throw new WorkflowOperationError(`There is no active execution with id "${executionId}".`);
}
// Create the promise which will be resolved when the execution finished // Create the promise which will be resolved when the execution finished
const waitPromise = await createDeferredPromise<IRun | undefined>(); const waitPromise = await createDeferredPromise<IRun | undefined>();
this.getExecution(executionId).postExecutePromises.push(waitPromise);
execution.postExecutePromises.push(waitPromise);
return await waitPromise.promise(); return await waitPromise.promise();
} }
/** /**
* Returns all the currently active executions * Returns all the currently active executions
*
*/ */
getActiveExecutions(): IExecutionsCurrentSummary[] { getActiveExecutions(): IExecutionsCurrentSummary[] {
const returnData: IExecutionsCurrentSummary[] = []; const returnData: IExecutionsCurrentSummary[] = [];
@ -203,20 +175,42 @@ export class ActiveExecutions {
return returnData; return returnData;
} }
async setStatus(executionId: string, status: ExecutionStatus): Promise<void> { setStatus(executionId: string, status: ExecutionStatus) {
const execution = this.activeExecutions[executionId]; this.getExecution(executionId).status = status;
if (execution === undefined) {
this.logger.debug(
`There is no active execution with id "${executionId}", can't update status to ${status}.`,
);
return;
}
execution.status = status;
} }
getStatus(executionId: string): ExecutionStatus { getStatus(executionId: string): ExecutionStatus {
return this.getExecution(executionId).status;
}
/** Wait for all active executions to finish */
async shutdown(cancelAll = false) {
let executionIds = Object.keys(this.activeExecutions);
if (cancelAll) {
const stopPromises = executionIds.map(
async (executionId) => await this.stopExecution(executionId),
);
await Promise.allSettled(stopPromises);
}
let count = 0;
while (executionIds.length !== 0) {
if (count++ % 4 === 0) {
this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`);
}
await sleep(500);
executionIds = Object.keys(this.activeExecutions);
}
}
private getExecution(executionId: string): IExecutingWorkflowData {
const execution = this.activeExecutions[executionId]; const execution = this.activeExecutions[executionId];
return execution?.status ?? 'unknown'; if (!execution) {
throw new ApplicationError('No active execution found', { extra: { executionId } });
}
return execution;
} }
} }

View file

@ -930,11 +930,7 @@ export function setExecutionStatus(status: ExecutionStatus) {
return; return;
} }
logger.debug(`Setting execution status for ${this.executionId} to "${status}"`); logger.debug(`Setting execution status for ${this.executionId} to "${status}"`);
Container.get(ActiveExecutions) Container.get(ActiveExecutions).setStatus(this.executionId, status);
.setStatus(this.executionId, status)
.catch((error) => {
logger.debug(`Setting execution status "${status}" failed: ${error.message}`);
});
} }
export function sendDataToUI(type: string, data: IDataObject | IDataObject[]) { export function sendDataToUI(type: string, data: IDataObject | IDataObject[]) {

View file

@ -35,7 +35,6 @@ import { Queue } from '@/Queue';
import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
@ -55,7 +54,11 @@ export class WorkflowRunner {
private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly nodeTypes: NodeTypes, private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker, private readonly permissionChecker: PermissionChecker,
) {} ) {
if (this.executionsMode === 'queue') {
this.jobQueue = Container.get(Queue);
}
}
/** The process did error */ /** The process did error */
async processError( async processError(
@ -150,27 +153,21 @@ export class WorkflowRunner {
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean, loadStaticData?: boolean,
realtime?: boolean, realtime?: boolean,
executionId?: string, restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> { ): Promise<string> {
await initErrorHandling(); // Register a new execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (this.executionsMode === 'queue') { if (responsePromise) {
this.jobQueue = Container.get(Queue); this.activeExecutions.attachResponsePromise(executionId, responsePromise);
} }
if (this.executionsMode === 'queue' && data.executionMode !== 'manual') { if (this.executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the // Do not run "manual" executions in bull because sending events to the
// frontend would not be possible // frontend would not be possible
executionId = await this.enqueueExecution( await this.enqueueExecution(executionId, data, loadStaticData, realtime);
data,
loadStaticData,
realtime,
executionId,
responsePromise,
);
} else { } else {
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); await this.runMainProcess(executionId, data, loadStaticData, executionId);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data); void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
} }
@ -185,7 +182,7 @@ export class WorkflowRunner {
postExecutePromise postExecutePromise
.then(async (executionData) => { .then(async (executionData) => {
void Container.get(InternalHooks).onWorkflowPostExecute( void Container.get(InternalHooks).onWorkflowPostExecute(
executionId!, executionId,
data.workflowData, data.workflowData,
executionData, executionData,
data.userId, data.userId,
@ -214,11 +211,11 @@ export class WorkflowRunner {
/** Run the workflow in current process */ /** Run the workflow in current process */
private async runMainProcess( private async runMainProcess(
executionId: string,
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean, loadStaticData?: boolean,
restartExecutionId?: string, restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>, ): Promise<void> {
): Promise<string> {
const workflowId = data.workflowData.id; const workflowId = data.workflowData.id;
if (loadStaticData === true && workflowId) { if (loadStaticData === true && workflowId) {
data.workflowData.staticData = data.workflowData.staticData =
@ -257,10 +254,9 @@ export class WorkflowRunner {
undefined, undefined,
workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000,
); );
// TODO: set this in queue mode as well
additionalData.restartExecutionId = restartExecutionId; additionalData.restartExecutionId = restartExecutionId;
// Register the active execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
additionalData.executionId = executionId; additionalData.executionId = executionId;
this.logger.verbose( this.logger.verbose(
@ -290,14 +286,12 @@ export class WorkflowRunner {
); );
await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]); await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]);
this.activeExecutions.remove(executionId, failedExecution); this.activeExecutions.remove(executionId, failedExecution);
return executionId; return;
} }
additionalData.hooks.hookFunctions.sendResponse = [ additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => { async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) { this.activeExecutions.resolveResponsePromise(executionId, response);
responsePromise.resolve(response);
}
}, },
]; ];
@ -391,25 +385,14 @@ export class WorkflowRunner {
throw error; throw error;
} }
return executionId;
} }
private async enqueueExecution( private async enqueueExecution(
executionId: string,
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean, loadStaticData?: boolean,
realtime?: boolean, realtime?: boolean,
restartExecutionId?: string, ): Promise<void> {
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
// TODO: If "loadStaticData" is set to true it has to load data new on worker
// Register the active execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
const jobData: JobData = { const jobData: JobData = {
executionId, executionId,
loadStaticData: !!loadStaticData, loadStaticData: !!loadStaticData,
@ -601,6 +584,5 @@ export class WorkflowRunner {
}); });
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
} }
} }

View file

@ -4,7 +4,7 @@ import { Flags } from '@oclif/core';
import fs from 'fs'; import fs from 'fs';
import os from 'os'; import os from 'os';
import type { IRun, ITaskData } from 'n8n-workflow'; import type { IRun, ITaskData } from 'n8n-workflow';
import { ApplicationError, jsonParse, sleep } from 'n8n-workflow'; import { ApplicationError, jsonParse } from 'n8n-workflow';
import { sep } from 'path'; import { sep } from 'path';
import { diff } from 'json-diff'; import { diff } from 'json-diff';
import pick from 'lodash/pick'; import pick from 'lodash/pick';
@ -118,28 +118,9 @@ export class ExecuteBatch extends BaseCommand {
} }
ExecuteBatch.cancelled = true; ExecuteBatch.cancelled = true;
const activeExecutionsInstance = Container.get(ActiveExecutions);
const stopPromises = activeExecutionsInstance
.getActiveExecutions()
.map(async (execution) => await activeExecutionsInstance.stopExecution(execution.id));
await Promise.allSettled(stopPromises); await Container.get(ActiveExecutions).shutdown(true);
setTimeout(() => process.exit(0), 30000);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
executingWorkflows.map((execution) => {
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
});
}
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
// We may receive true but when called from `process.on` // We may receive true but when called from `process.on`
// we get the signal (SIGINT, etc.) // we get the signal (SIGINT, etc.)
if (skipExit !== true) { if (skipExit !== true) {

View file

@ -8,7 +8,7 @@ import { createReadStream, createWriteStream, existsSync } from 'fs';
import { pipeline } from 'stream/promises'; import { pipeline } from 'stream/promises';
import replaceStream from 'replacestream'; import replaceStream from 'replacestream';
import glob from 'fast-glob'; import glob from 'fast-glob';
import { sleep, jsonParse } from 'n8n-workflow'; import { jsonParse } from 'n8n-workflow';
import config from '@/config'; import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
@ -106,23 +106,7 @@ export class Start extends BaseCommand {
await Container.get(InternalHooks).onN8nStop(); await Container.get(InternalHooks).onN8nStop();
// Wait for active workflow executions to finish await Container.get(ActiveExecutions).shutdown();
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
executingWorkflows.map((execution) => {
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
});
}
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
// Finally shut down Event Bus // Finally shut down Event Bus
await Container.get(MessageEventBus).close(); await Container.get(MessageEventBus).close();

View file

@ -1,6 +1,6 @@
import { Container } from 'typedi'; import { Container } from 'typedi';
import { Flags, type Config } from '@oclif/core'; import { Flags, type Config } from '@oclif/core';
import { ApplicationError, sleep } from 'n8n-workflow'; import { ApplicationError } from 'n8n-workflow';
import config from '@/config'; import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions'; import { ActiveExecutions } from '@/ActiveExecutions';
@ -42,21 +42,7 @@ export class Webhook extends BaseCommand {
try { try {
await this.externalHooks?.run('n8n.stop', []); await this.externalHooks?.run('n8n.stop', []);
// Wait for active workflow executions to finish await Container.get(ActiveExecutions).shutdown();
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
this.logger.info(
`Waiting for ${executingWorkflows.length} active executions to finish...`,
);
}
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
} catch (error) { } catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error); await this.exitWithCrash('There was an error shutting down n8n.', error);
} }