fix(core): Account for waiting jobs during shutdown (#11338)

This commit is contained in:
Iván Ovejero 2024-10-22 17:11:53 +02:00 committed by GitHub
parent 094ec68d4c
commit c863abd083
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 47 additions and 11 deletions

View file

@ -56,6 +56,7 @@ describe('ScalingService', () => {
let registerWorkerListenersSpy: jest.SpyInstance; let registerWorkerListenersSpy: jest.SpyInstance;
let scheduleQueueRecoverySpy: jest.SpyInstance; let scheduleQueueRecoverySpy: jest.SpyInstance;
let stopQueueRecoverySpy: jest.SpyInstance; let stopQueueRecoverySpy: jest.SpyInstance;
let stopQueueMetricsSpy: jest.SpyInstance;
let getRunningJobsCountSpy: jest.SpyInstance; let getRunningJobsCountSpy: jest.SpyInstance;
const bullConstructorArgs = [ const bullConstructorArgs = [
@ -99,6 +100,9 @@ describe('ScalingService', () => {
scheduleQueueRecoverySpy = jest.spyOn(scalingService, 'scheduleQueueRecovery'); scheduleQueueRecoverySpy = jest.spyOn(scalingService, 'scheduleQueueRecovery');
// @ts-expect-error Private method // @ts-expect-error Private method
stopQueueRecoverySpy = jest.spyOn(scalingService, 'stopQueueRecovery'); stopQueueRecoverySpy = jest.spyOn(scalingService, 'stopQueueRecovery');
// @ts-expect-error Private method
stopQueueMetricsSpy = jest.spyOn(scalingService, 'stopQueueMetrics');
}); });
describe('setupQueue', () => { describe('setupQueue', () => {
@ -180,15 +184,37 @@ describe('ScalingService', () => {
}); });
describe('stop', () => { describe('stop', () => {
it('should pause queue, wait for running jobs, stop queue recovery', async () => { describe('if main', () => {
await scalingService.setupQueue(); it('should pause queue, stop queue recovery and queue metrics', async () => {
jobProcessor.getRunningJobIds.mockReturnValue([]); // @ts-expect-error readonly property
instanceSettings.instanceType = 'main';
await scalingService.setupQueue();
// @ts-expect-error readonly property
scalingService.queueRecoveryContext.timeout = 1;
jest.spyOn(scalingService, 'isQueueMetricsEnabled', 'get').mockReturnValue(true);
await scalingService.stop(); await scalingService.stop();
expect(queue.pause).toHaveBeenCalledWith(true, true); expect(getRunningJobsCountSpy).not.toHaveBeenCalled();
expect(stopQueueRecoverySpy).toHaveBeenCalled(); expect(queue.pause).toHaveBeenCalledWith(true, true);
expect(getRunningJobsCountSpy).toHaveBeenCalled(); expect(stopQueueRecoverySpy).toHaveBeenCalled();
expect(stopQueueMetricsSpy).toHaveBeenCalled();
});
});
describe('if worker', () => {
it('should wait for running jobs to finish', async () => {
// @ts-expect-error readonly property
instanceSettings.instanceType = 'worker';
await scalingService.setupQueue();
jobProcessor.getRunningJobIds.mockReturnValue([]);
await scalingService.stop();
expect(getRunningJobsCountSpy).toHaveBeenCalled();
expect(queue.pause).not.toHaveBeenCalled();
expect(stopQueueRecoverySpy).not.toHaveBeenCalled();
});
}); });
}); });

View file

@ -126,13 +126,23 @@ export class ScalingService {
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY) @OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
async stop() { async stop() {
await this.queue.pause(true, true); // no more jobs will be picked up const { instanceType } = this.instanceSettings;
this.logger.debug('Queue paused'); if (instanceType === 'main') await this.stopMain();
else if (instanceType === 'worker') await this.stopWorker();
}
this.stopQueueRecovery(); private async stopMain() {
this.stopQueueMetrics(); if (this.orchestrationService.isSingleMainSetup) {
await this.queue.pause(true, true); // no more jobs will be picked up
this.logger.debug('Queue paused');
}
if (this.queueRecoveryContext.timeout) this.stopQueueRecovery();
if (this.isQueueMetricsEnabled) this.stopQueueMetrics();
}
private async stopWorker() {
let count = 0; let count = 0;
while (this.getRunningJobsCount() !== 0) { while (this.getRunningJobsCount() !== 0) {