diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index d2cf986af2..af3958c25a 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -333,7 +333,9 @@ export abstract class BaseCommand extends Command { this.logger.info(`Received ${signal}. Shutting down...`); this.shutdownService.shutdown(); - await Promise.all([this.stopProcess(), this.shutdownService.waitForShutdown()]); + await this.shutdownService.waitForShutdown(); + + await this.stopProcess(); clearTimeout(forceShutdownTimer); }; diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 1473ed13ff..5e75e4792d 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -2,7 +2,7 @@ import { Container } from 'typedi'; import { Flags, type Config } from '@oclif/core'; import express from 'express'; import http from 'http'; -import { sleep, ApplicationError } from 'n8n-workflow'; +import { ApplicationError } from 'n8n-workflow'; import * as Db from '@/Db'; import * as ResponseHelper from '@/ResponseHelper'; @@ -61,23 +61,6 @@ export class Worker extends BaseCommand { try { await this.externalHooks?.run('n8n.stop', []); - - const hardStopTimeMs = Date.now() + this.gracefulShutdownTimeoutInS * 1000; - - // Wait for active workflow executions to finish - let count = 0; - while (this.jobProcessor.getRunningJobIds().length !== 0) { - if (count++ % 4 === 0) { - const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000); - this.logger.info( - `Waiting for ${ - Object.keys(this.jobProcessor.getRunningJobIds()).length - } active executions to finish... (max wait ${waitLeft} more seconds)`, - ); - } - - await sleep(500); - } } catch (error) { await this.exitWithCrash('There was an error shutting down n8n.', error); } diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index 6883a96ffd..0603309e20 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -7,6 +7,7 @@ import type { Job, JobData, JobOptions, JobQueue } from '../types'; import { ApplicationError } from 'n8n-workflow'; import { mockInstance } from '@test/mocking'; import { GlobalConfig } from '@n8n/config'; +import type { JobProcessor } from '../job-processor'; const queue = mock({ client: { ping: jest.fn() }, @@ -100,23 +101,27 @@ describe('ScalingService', () => { }); }); - describe('pauseQueue', () => { - it('should pause the queue', async () => { + describe('stop', () => { + it('should pause the queue and check for running jobs', async () => { /** * Arrange */ - const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig); + const jobProcessor = mock(); + const scalingService = new ScalingService(mock(), mock(), jobProcessor, globalConfig); await scalingService.setupQueue(); + jobProcessor.getRunningJobIds.mockReturnValue([]); + const getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount'); /** * Act */ - await scalingService.pauseQueue(); + await scalingService.stop(); /** * Assert */ expect(queue.pause).toHaveBeenCalledWith(true, true); + expect(getRunningJobsCountSpy).toHaveBeenCalled(); }); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index f26feebc58..a30e797ccb 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -1,5 +1,5 @@ import Container, { Service } from 'typedi'; -import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow'; +import { ApplicationError, BINARY_ENCODING, sleep } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; import { Logger } from '@/Logger'; @@ -59,10 +59,22 @@ export class ScalingService { } @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) - async pauseQueue() { + async stop() { await this.queue.pause(true, true); this.logger.debug('[ScalingService] Queue paused'); + + let count = 0; + + while (this.getRunningJobsCount() !== 0) { + if (count++ % 4 === 0) { + this.logger.info( + `Waiting for ${this.getRunningJobsCount()} active executions to finish...`, + ); + } + + await sleep(500); + } } async pingQueue() { @@ -113,6 +125,10 @@ export class ScalingService { } } + getRunningJobsCount() { + return this.jobProcessor.getRunningJobIds().length; + } + // #endregion // #region Listeners