diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index abcc729927..fdb97b6867 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -6,7 +6,7 @@ import type { IRun, ExecutionStatus, } from 'n8n-workflow'; -import { ApplicationError, WorkflowOperationError, createDeferredPromise } from 'n8n-workflow'; +import { ApplicationError, createDeferredPromise, sleep } from 'n8n-workflow'; import type { ExecutionPayload, @@ -22,7 +22,7 @@ import { Logger } from '@/Logger'; @Service() export class ActiveExecutions { private activeExecutions: { - [index: string]: IExecutingWorkflowData; + [executionId: string]: IExecutingWorkflowData; } = {}; constructor( @@ -87,32 +87,17 @@ export class ActiveExecutions { /** * Attaches an execution - * */ attachWorkflowExecution(executionId: string, workflowExecution: PCancelable) { - const execution = this.activeExecutions[executionId]; - if (execution === undefined) { - throw new ApplicationError('No active execution found to attach to workflow execution to', { - extra: { executionId }, - }); - } - - execution.workflowExecution = workflowExecution; + this.getExecution(executionId).workflowExecution = workflowExecution; } attachResponsePromise( executionId: string, responsePromise: IDeferredPromise, ): void { - const execution = this.activeExecutions[executionId]; - if (execution === undefined) { - throw new ApplicationError('No active execution found to attach to workflow execution to', { - extra: { executionId }, - }); - } - - execution.responsePromise = responsePromise; + this.getExecution(executionId).responsePromise = responsePromise; } resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { @@ -126,7 +111,6 @@ export class ActiveExecutions { /** * Remove an active execution - * */ remove(executionId: string, fullRunData?: IRun): void { const execution = this.activeExecutions[executionId]; @@ -135,7 +119,6 @@ export class ActiveExecutions { } // Resolve all the waiting promises - for (const promise of execution.postExecutePromises) { promise.resolve(fullRunData); } @@ -160,28 +143,17 @@ export class ActiveExecutions { } /** - * Returns a promise which will resolve with the data of the execution - * with the given id - * - * @param {string} executionId The id of the execution to wait for + * Returns a promise which will resolve with the data of the execution with the given id */ async getPostExecutePromise(executionId: string): Promise { - const execution = this.activeExecutions[executionId]; - if (execution === undefined) { - throw new WorkflowOperationError(`There is no active execution with id "${executionId}".`); - } - // Create the promise which will be resolved when the execution finished const waitPromise = await createDeferredPromise(); - - execution.postExecutePromises.push(waitPromise); - + this.getExecution(executionId).postExecutePromises.push(waitPromise); return await waitPromise.promise(); } /** * Returns all the currently active executions - * */ getActiveExecutions(): IExecutionsCurrentSummary[] { const returnData: IExecutionsCurrentSummary[] = []; @@ -203,20 +175,42 @@ export class ActiveExecutions { return returnData; } - async setStatus(executionId: string, status: ExecutionStatus): Promise { - const execution = this.activeExecutions[executionId]; - if (execution === undefined) { - this.logger.debug( - `There is no active execution with id "${executionId}", can't update status to ${status}.`, - ); - return; - } - - execution.status = status; + setStatus(executionId: string, status: ExecutionStatus) { + this.getExecution(executionId).status = status; } getStatus(executionId: string): ExecutionStatus { + return this.getExecution(executionId).status; + } + + /** Wait for all active executions to finish */ + async shutdown(cancelAll = false) { + let executionIds = Object.keys(this.activeExecutions); + + if (cancelAll) { + const stopPromises = executionIds.map( + async (executionId) => await this.stopExecution(executionId), + ); + + await Promise.allSettled(stopPromises); + } + + let count = 0; + while (executionIds.length !== 0) { + if (count++ % 4 === 0) { + this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`); + } + + await sleep(500); + executionIds = Object.keys(this.activeExecutions); + } + } + + private getExecution(executionId: string): IExecutingWorkflowData { const execution = this.activeExecutions[executionId]; - return execution?.status ?? 'unknown'; + if (!execution) { + throw new ApplicationError('No active execution found', { extra: { executionId } }); + } + return execution; } } diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index f9bbb83c8b..cc146008f4 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -930,11 +930,7 @@ export function setExecutionStatus(status: ExecutionStatus) { return; } logger.debug(`Setting execution status for ${this.executionId} to "${status}"`); - Container.get(ActiveExecutions) - .setStatus(this.executionId, status) - .catch((error) => { - logger.debug(`Setting execution status "${status}" failed: ${error.message}`); - }); + Container.get(ActiveExecutions).setStatus(this.executionId, status); } export function sendDataToUI(type: string, data: IDataObject | IDataObject[]) { diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index c26c631a2a..6f4bc6eee6 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -35,7 +35,6 @@ import { Queue } from '@/Queue'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; -import { initErrorHandling } from '@/ErrorReporting'; import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { InternalHooks } from '@/InternalHooks'; import { Logger } from '@/Logger'; @@ -55,7 +54,11 @@ export class WorkflowRunner { private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly nodeTypes: NodeTypes, private readonly permissionChecker: PermissionChecker, - ) {} + ) { + if (this.executionsMode === 'queue') { + this.jobQueue = Container.get(Queue); + } + } /** The process did error */ async processError( @@ -150,27 +153,21 @@ export class WorkflowRunner { data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean, - executionId?: string, + restartExecutionId?: string, responsePromise?: IDeferredPromise, ): Promise { - await initErrorHandling(); - - if (this.executionsMode === 'queue') { - this.jobQueue = Container.get(Queue); + // Register a new execution + const executionId = await this.activeExecutions.add(data, restartExecutionId); + if (responsePromise) { + this.activeExecutions.attachResponsePromise(executionId, responsePromise); } if (this.executionsMode === 'queue' && data.executionMode !== 'manual') { // Do not run "manual" executions in bull because sending events to the // frontend would not be possible - executionId = await this.enqueueExecution( - data, - loadStaticData, - realtime, - executionId, - responsePromise, - ); + await this.enqueueExecution(executionId, data, loadStaticData, realtime); } else { - executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); + await this.runMainProcess(executionId, data, loadStaticData, executionId); void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data); } @@ -185,7 +182,7 @@ export class WorkflowRunner { postExecutePromise .then(async (executionData) => { void Container.get(InternalHooks).onWorkflowPostExecute( - executionId!, + executionId, data.workflowData, executionData, data.userId, @@ -214,11 +211,11 @@ export class WorkflowRunner { /** Run the workflow in current process */ private async runMainProcess( + executionId: string, data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string, - responsePromise?: IDeferredPromise, - ): Promise { + ): Promise { const workflowId = data.workflowData.id; if (loadStaticData === true && workflowId) { data.workflowData.staticData = @@ -257,10 +254,9 @@ export class WorkflowRunner { undefined, workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000, ); + // TODO: set this in queue mode as well additionalData.restartExecutionId = restartExecutionId; - // Register the active execution - const executionId = await this.activeExecutions.add(data, restartExecutionId); additionalData.executionId = executionId; this.logger.verbose( @@ -290,14 +286,12 @@ export class WorkflowRunner { ); await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]); this.activeExecutions.remove(executionId, failedExecution); - return executionId; + return; } additionalData.hooks.hookFunctions.sendResponse = [ async (response: IExecuteResponsePromiseData): Promise => { - if (responsePromise) { - responsePromise.resolve(response); - } + this.activeExecutions.resolveResponsePromise(executionId, response); }, ]; @@ -391,25 +385,14 @@ export class WorkflowRunner { throw error; } - - return executionId; } private async enqueueExecution( + executionId: string, data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean, - restartExecutionId?: string, - responsePromise?: IDeferredPromise, - ): Promise { - // TODO: If "loadStaticData" is set to true it has to load data new on worker - - // Register the active execution - const executionId = await this.activeExecutions.add(data, restartExecutionId); - if (responsePromise) { - this.activeExecutions.attachResponsePromise(executionId, responsePromise); - } - + ): Promise { const jobData: JobData = { executionId, loadStaticData: !!loadStaticData, @@ -601,6 +584,5 @@ export class WorkflowRunner { }); this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); - return executionId; } } diff --git a/packages/cli/src/commands/executeBatch.ts b/packages/cli/src/commands/executeBatch.ts index cc50809622..483b4a3dc4 100644 --- a/packages/cli/src/commands/executeBatch.ts +++ b/packages/cli/src/commands/executeBatch.ts @@ -4,7 +4,7 @@ import { Flags } from '@oclif/core'; import fs from 'fs'; import os from 'os'; import type { IRun, ITaskData } from 'n8n-workflow'; -import { ApplicationError, jsonParse, sleep } from 'n8n-workflow'; +import { ApplicationError, jsonParse } from 'n8n-workflow'; import { sep } from 'path'; import { diff } from 'json-diff'; import pick from 'lodash/pick'; @@ -118,28 +118,9 @@ export class ExecuteBatch extends BaseCommand { } ExecuteBatch.cancelled = true; - const activeExecutionsInstance = Container.get(ActiveExecutions); - const stopPromises = activeExecutionsInstance - .getActiveExecutions() - .map(async (execution) => await activeExecutionsInstance.stopExecution(execution.id)); - await Promise.allSettled(stopPromises); + await Container.get(ActiveExecutions).shutdown(true); - setTimeout(() => process.exit(0), 30000); - - let executingWorkflows = activeExecutionsInstance.getActiveExecutions(); - - let count = 0; - while (executingWorkflows.length !== 0) { - if (count++ % 4 === 0) { - console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`); - executingWorkflows.map((execution) => { - console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`); - }); - } - await sleep(500); - executingWorkflows = activeExecutionsInstance.getActiveExecutions(); - } // We may receive true but when called from `process.on` // we get the signal (SIGINT, etc.) if (skipExit !== true) { diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index da23ba0031..ad09299089 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -8,7 +8,7 @@ import { createReadStream, createWriteStream, existsSync } from 'fs'; import { pipeline } from 'stream/promises'; import replaceStream from 'replacestream'; import glob from 'fast-glob'; -import { sleep, jsonParse } from 'n8n-workflow'; +import { jsonParse } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; @@ -106,23 +106,7 @@ export class Start extends BaseCommand { await Container.get(InternalHooks).onN8nStop(); - // Wait for active workflow executions to finish - const activeExecutionsInstance = Container.get(ActiveExecutions); - let executingWorkflows = activeExecutionsInstance.getActiveExecutions(); - - let count = 0; - while (executingWorkflows.length !== 0) { - if (count++ % 4 === 0) { - console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`); - - executingWorkflows.map((execution) => { - console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`); - }); - } - - await sleep(500); - executingWorkflows = activeExecutionsInstance.getActiveExecutions(); - } + await Container.get(ActiveExecutions).shutdown(); // Finally shut down Event Bus await Container.get(MessageEventBus).close(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 2be3031ad4..5b72c1eb86 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -1,6 +1,6 @@ import { Container } from 'typedi'; import { Flags, type Config } from '@oclif/core'; -import { ApplicationError, sleep } from 'n8n-workflow'; +import { ApplicationError } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; @@ -42,21 +42,7 @@ export class Webhook extends BaseCommand { try { await this.externalHooks?.run('n8n.stop', []); - // Wait for active workflow executions to finish - const activeExecutionsInstance = Container.get(ActiveExecutions); - let executingWorkflows = activeExecutionsInstance.getActiveExecutions(); - - let count = 0; - while (executingWorkflows.length !== 0) { - if (count++ % 4 === 0) { - this.logger.info( - `Waiting for ${executingWorkflows.length} active executions to finish...`, - ); - } - - await sleep(500); - executingWorkflows = activeExecutionsInstance.getActiveExecutions(); - } + await Container.get(ActiveExecutions).shutdown(); } catch (error) { await this.exitWithCrash('There was an error shutting down n8n.', error); }