refactor(core): Make PruningService.init and WaitTracker.init consistent (no-changelog) (#9761)

This commit is contained in:
Iván Ovejero 2024-06-17 12:49:40 +02:00 committed by GitHub
parent 7c70b782a1
commit f7352b6a8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 43 additions and 30 deletions

View file

@ -30,20 +30,20 @@ export class WaitTracker {
private readonly orchestrationService: OrchestrationService, private readonly orchestrationService: OrchestrationService,
) {} ) {}
/**
* @important Requires `OrchestrationService` to be initialized.
*/
init() { init() {
const { isSingleMainSetup, isLeader, multiMainSetup } = this.orchestrationService; const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService;
if (isSingleMainSetup) {
this.startTracking();
return;
}
if (isLeader) this.startTracking(); if (isLeader) this.startTracking();
multiMainSetup if (isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup
.on('leader-takeover', () => this.startTracking()) .on('leader-takeover', () => this.startTracking())
.on('leader-stepdown', () => this.stopTracking()); .on('leader-stepdown', () => this.stopTracking());
} }
}
private startTracking() { private startTracking() {
this.logger.debug('Wait tracker started tracking waiting executions'); this.logger.debug('Wait tracker started tracking waiting executions');

View file

@ -199,7 +199,10 @@ export class Start extends BaseCommand {
} }
async initOrchestration() { async initOrchestration() {
if (config.getEnv('executions.mode') !== 'queue') return; if (config.getEnv('executions.mode') === 'regular') {
config.set('multiMainSetup.instanceType', 'leader');
return;
}
if ( if (
config.getEnv('multiMainSetup.enabled') && config.getEnv('multiMainSetup.enabled') &&
@ -290,7 +293,7 @@ export class Start extends BaseCommand {
await this.server.start(); await this.server.start();
await this.initPruning(); Container.get(PruningService).init();
if (config.getEnv('executions.mode') === 'regular') { if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions(); await this.runEnqueuedExecutions();
@ -333,24 +336,6 @@ export class Start extends BaseCommand {
} }
} }
async initPruning() {
this.pruningService = Container.get(PruningService);
this.pruningService.startPruning();
if (config.getEnv('executions.mode') !== 'queue') return;
const orchestrationService = Container.get(OrchestrationService);
await orchestrationService.init();
if (!orchestrationService.isMultiMainSetupEnabled) return;
orchestrationService.multiMainSetup
.on('leader-stepdown', () => this.pruningService.stopPruning())
.on('leader-takeover', () => this.pruningService.startPruning());
}
async catch(error: Error) { async catch(error: Error) {
if (error.stack) this.logger.error(error.stack); if (error.stack) this.logger.error(error.stack);
await this.exitWithCrash('Exiting due to an error.', error); await this.exitWithCrash('Exiting due to an error.', error);

View file

@ -6,6 +6,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { jsonStringify } from 'n8n-workflow'; import { jsonStringify } from 'n8n-workflow';
import { OnShutdown } from '@/decorators/OnShutdown'; import { OnShutdown } from '@/decorators/OnShutdown';
import { OrchestrationService } from './orchestration.service';
@Service() @Service()
export class PruningService { export class PruningService {
@ -26,8 +27,23 @@ export class PruningService {
private readonly logger: Logger, private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService, private readonly binaryDataService: BinaryDataService,
private readonly orchestrationService: OrchestrationService,
) {} ) {}
/**
* @important Requires `OrchestrationService` to be initialized.
*/
init() {
const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService;
if (isLeader) this.startPruning();
if (isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning());
this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning());
}
}
private isPruningEnabled() { private isPruningEnabled() {
if ( if (
!config.getEnv('executions.pruneData') || !config.getEnv('executions.pruneData') ||

View file

@ -14,6 +14,8 @@ import { Logger } from '@/Logger';
import { mockInstance } from '../shared/mocking'; import { mockInstance } from '../shared/mocking';
import { createWorkflow } from './shared/db/workflows'; import { createWorkflow } from './shared/db/workflows';
import { createExecution, createSuccessfulExecution } from './shared/db/executions'; import { createExecution, createSuccessfulExecution } from './shared/db/executions';
import { mock } from 'jest-mock-extended';
import type { OrchestrationService } from '@/services/orchestration.service';
describe('softDeleteOnPruningCycle()', () => { describe('softDeleteOnPruningCycle()', () => {
let pruningService: PruningService; let pruningService: PruningService;
@ -29,6 +31,7 @@ describe('softDeleteOnPruningCycle()', () => {
mockInstance(Logger), mockInstance(Logger),
Container.get(ExecutionRepository), Container.get(ExecutionRepository),
mockInstance(BinaryDataService), mockInstance(BinaryDataService),
mock<OrchestrationService>(),
); );
workflow = await createWorkflow(); workflow = await createWorkflow();

View file

@ -34,7 +34,8 @@ describe('WaitTracker', () => {
}); });
describe('init()', () => { describe('init()', () => {
it('should query DB for waiting executions', async () => { it('should query DB for waiting executions if leader', async () => {
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]); executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
waitTracker.init(); waitTracker.init();
@ -42,6 +43,14 @@ describe('WaitTracker', () => {
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
}); });
it('if follower, should do nothing', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
waitTracker.init();
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
});
it('if no executions to start, should do nothing', () => { it('if no executions to start, should do nothing', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]); executionRepository.getWaitingExecutions.mockResolvedValue([]);