From c863abd08300b53ea898fc4d06aae97dec7afa9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 22 Oct 2024 17:11:53 +0200 Subject: [PATCH] fix(core): Account for waiting jobs during shutdown (#11338) --- .../scaling/__tests__/scaling.service.test.ts | 40 +++++++++++++++---- packages/cli/src/scaling/scaling.service.ts | 18 +++++++-- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index a6c14ab964..0b5f80da48 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -56,6 +56,7 @@ describe('ScalingService', () => { let registerWorkerListenersSpy: jest.SpyInstance; let scheduleQueueRecoverySpy: jest.SpyInstance; let stopQueueRecoverySpy: jest.SpyInstance; + let stopQueueMetricsSpy: jest.SpyInstance; let getRunningJobsCountSpy: jest.SpyInstance; const bullConstructorArgs = [ @@ -99,6 +100,9 @@ describe('ScalingService', () => { scheduleQueueRecoverySpy = jest.spyOn(scalingService, 'scheduleQueueRecovery'); // @ts-expect-error Private method stopQueueRecoverySpy = jest.spyOn(scalingService, 'stopQueueRecovery'); + + // @ts-expect-error Private method + stopQueueMetricsSpy = jest.spyOn(scalingService, 'stopQueueMetrics'); }); describe('setupQueue', () => { @@ -180,15 +184,37 @@ describe('ScalingService', () => { }); describe('stop', () => { - it('should pause queue, wait for running jobs, stop queue recovery', async () => { - await scalingService.setupQueue(); - jobProcessor.getRunningJobIds.mockReturnValue([]); + describe('if main', () => { + it('should pause queue, stop queue recovery and queue metrics', async () => { + // @ts-expect-error readonly property + instanceSettings.instanceType = 'main'; + await scalingService.setupQueue(); + // @ts-expect-error readonly property + scalingService.queueRecoveryContext.timeout = 1; + jest.spyOn(scalingService, 'isQueueMetricsEnabled', 'get').mockReturnValue(true); - await scalingService.stop(); + await scalingService.stop(); - expect(queue.pause).toHaveBeenCalledWith(true, true); - expect(stopQueueRecoverySpy).toHaveBeenCalled(); - expect(getRunningJobsCountSpy).toHaveBeenCalled(); + expect(getRunningJobsCountSpy).not.toHaveBeenCalled(); + expect(queue.pause).toHaveBeenCalledWith(true, true); + expect(stopQueueRecoverySpy).toHaveBeenCalled(); + expect(stopQueueMetricsSpy).toHaveBeenCalled(); + }); + }); + + describe('if worker', () => { + it('should wait for running jobs to finish', async () => { + // @ts-expect-error readonly property + instanceSettings.instanceType = 'worker'; + await scalingService.setupQueue(); + jobProcessor.getRunningJobIds.mockReturnValue([]); + + await scalingService.stop(); + + expect(getRunningJobsCountSpy).toHaveBeenCalled(); + expect(queue.pause).not.toHaveBeenCalled(); + expect(stopQueueRecoverySpy).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 0645642ddb..5ce80f26a8 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -126,13 +126,23 @@ export class ScalingService { @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) async stop() { - await this.queue.pause(true, true); // no more jobs will be picked up + const { instanceType } = this.instanceSettings; - this.logger.debug('Queue paused'); + if (instanceType === 'main') await this.stopMain(); + else if (instanceType === 'worker') await this.stopWorker(); + } - this.stopQueueRecovery(); - this.stopQueueMetrics(); + private async stopMain() { + if (this.orchestrationService.isSingleMainSetup) { + await this.queue.pause(true, true); // no more jobs will be picked up + this.logger.debug('Queue paused'); + } + if (this.queueRecoveryContext.timeout) this.stopQueueRecovery(); + if (this.isQueueMetricsEnabled) this.stopQueueMetrics(); + } + + private async stopWorker() { let count = 0; while (this.getRunningJobsCount() !== 0) {