mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 21:37:32 -08:00
fix(core): Start WaitTracker only in the main container (#9600)
This commit is contained in:
parent
d6db8cbf23
commit
08d9c9a787
|
@ -27,9 +27,11 @@ export class WaitTracker {
|
||||||
private readonly executionRepository: ExecutionRepository,
|
private readonly executionRepository: ExecutionRepository,
|
||||||
private readonly ownershipService: OwnershipService,
|
private readonly ownershipService: OwnershipService,
|
||||||
private readonly workflowRunner: WorkflowRunner,
|
private readonly workflowRunner: WorkflowRunner,
|
||||||
readonly orchestrationService: OrchestrationService,
|
private readonly orchestrationService: OrchestrationService,
|
||||||
) {
|
) {}
|
||||||
const { isSingleMainSetup, isLeader, multiMainSetup } = orchestrationService;
|
|
||||||
|
init() {
|
||||||
|
const { isSingleMainSetup, isLeader, multiMainSetup } = this.orchestrationService;
|
||||||
|
|
||||||
if (isSingleMainSetup) {
|
if (isSingleMainSetup) {
|
||||||
this.startTracking();
|
this.startTracking();
|
||||||
|
@ -43,7 +45,7 @@ export class WaitTracker {
|
||||||
.on('leader-stepdown', () => this.stopTracking());
|
.on('leader-stepdown', () => this.stopTracking());
|
||||||
}
|
}
|
||||||
|
|
||||||
startTracking() {
|
private startTracking() {
|
||||||
this.logger.debug('Wait tracker started tracking waiting executions');
|
this.logger.debug('Wait tracker started tracking waiting executions');
|
||||||
|
|
||||||
// Poll every 60 seconds a list of upcoming executions
|
// Poll every 60 seconds a list of upcoming executions
|
||||||
|
|
|
@ -178,6 +178,8 @@ export class Start extends BaseCommand {
|
||||||
|
|
||||||
await this.initOrchestration();
|
await this.initOrchestration();
|
||||||
this.logger.debug('Orchestration init complete');
|
this.logger.debug('Orchestration init complete');
|
||||||
|
Container.get(WaitTracker).init();
|
||||||
|
this.logger.debug('Wait tracker init complete');
|
||||||
await this.initBinaryDataService();
|
await this.initBinaryDataService();
|
||||||
this.logger.debug('Binary data service init complete');
|
this.logger.debug('Binary data service init complete');
|
||||||
await this.initExternalHooks();
|
await this.initExternalHooks();
|
||||||
|
|
|
@ -2,31 +2,42 @@ import { WaitTracker } from '@/WaitTracker';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import type { IExecutionResponse } from '@/Interfaces';
|
import type { IExecutionResponse } from '@/Interfaces';
|
||||||
import type { OrchestrationService } from '@/services/orchestration.service';
|
import { OrchestrationService } from '@/services/orchestration.service';
|
||||||
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
|
|
||||||
jest.useFakeTimers();
|
jest.useFakeTimers();
|
||||||
|
|
||||||
describe('WaitTracker', () => {
|
describe('WaitTracker', () => {
|
||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
const orchestrationService = mock<OrchestrationService>({
|
const multiMainSetup = mock<MultiMainSetup>();
|
||||||
isSingleMainSetup: true,
|
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);
|
||||||
});
|
|
||||||
|
|
||||||
const execution = mock<IExecutionResponse>({
|
const execution = mock<IExecutionResponse>({
|
||||||
id: '123',
|
id: '123',
|
||||||
waitTill: new Date(Date.now() + 1000),
|
waitTill: new Date(Date.now() + 1000),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let waitTracker: WaitTracker;
|
||||||
|
beforeEach(() => {
|
||||||
|
waitTracker = new WaitTracker(
|
||||||
|
mock(),
|
||||||
|
executionRepository,
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
orchestrationService,
|
||||||
|
);
|
||||||
|
multiMainSetup.on.mockReturnThis();
|
||||||
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('constructor()', () => {
|
describe('init()', () => {
|
||||||
it('should query DB for waiting executions', async () => {
|
it('should query DB for waiting executions', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
waitTracker.init();
|
||||||
|
|
||||||
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
@ -34,7 +45,7 @@ describe('WaitTracker', () => {
|
||||||
it('if no executions to start, should do nothing', () => {
|
it('if no executions to start, should do nothing', () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
waitTracker.init();
|
||||||
|
|
||||||
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
|
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
@ -42,13 +53,7 @@ describe('WaitTracker', () => {
|
||||||
describe('if execution to start', () => {
|
describe('if execution to start', () => {
|
||||||
it('if not enough time passed, should not start execution', async () => {
|
it('if not enough time passed, should not start execution', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
const waitTracker = new WaitTracker(
|
waitTracker.init();
|
||||||
mock(),
|
|
||||||
executionRepository,
|
|
||||||
mock(),
|
|
||||||
mock(),
|
|
||||||
orchestrationService,
|
|
||||||
);
|
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
await waitTracker.getWaitingExecutions();
|
await waitTracker.getWaitingExecutions();
|
||||||
|
@ -62,13 +67,7 @@ describe('WaitTracker', () => {
|
||||||
|
|
||||||
it('if enough time passed, should start execution', async () => {
|
it('if enough time passed, should start execution', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
const waitTracker = new WaitTracker(
|
waitTracker.init();
|
||||||
mock(),
|
|
||||||
executionRepository,
|
|
||||||
mock(),
|
|
||||||
mock(),
|
|
||||||
orchestrationService,
|
|
||||||
);
|
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
await waitTracker.getWaitingExecutions();
|
await waitTracker.getWaitingExecutions();
|
||||||
|
@ -85,13 +84,7 @@ describe('WaitTracker', () => {
|
||||||
describe('startExecution()', () => {
|
describe('startExecution()', () => {
|
||||||
it('should query for execution to start', async () => {
|
it('should query for execution to start', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
const waitTracker = new WaitTracker(
|
waitTracker.init();
|
||||||
mock(),
|
|
||||||
executionRepository,
|
|
||||||
mock(),
|
|
||||||
mock(),
|
|
||||||
orchestrationService,
|
|
||||||
);
|
|
||||||
|
|
||||||
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
||||||
waitTracker.startExecution(execution.id);
|
waitTracker.startExecution(execution.id);
|
||||||
|
@ -108,7 +101,7 @@ describe('WaitTracker', () => {
|
||||||
it('should start tracking', () => {
|
it('should start tracking', () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
waitTracker.init();
|
||||||
|
|
||||||
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
@ -116,29 +109,23 @@ describe('WaitTracker', () => {
|
||||||
|
|
||||||
describe('multi-main setup', () => {
|
describe('multi-main setup', () => {
|
||||||
it('should start tracking if leader', () => {
|
it('should start tracking if leader', () => {
|
||||||
const orchestrationService = mock<OrchestrationService>({
|
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
|
||||||
isLeader: true,
|
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
||||||
isSingleMainSetup: false,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
|
|
||||||
});
|
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
waitTracker.init();
|
||||||
|
|
||||||
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not start tracking if follower', () => {
|
it('should not start tracking if follower', () => {
|
||||||
const orchestrationService = mock<OrchestrationService>({
|
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
|
||||||
isLeader: false,
|
jest.spyOn(orchestrationService, 'isSingleMainSetup', 'get').mockReturnValue(false);
|
||||||
isSingleMainSetup: false,
|
|
||||||
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
|
|
||||||
});
|
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
waitTracker.init();
|
||||||
|
|
||||||
expect(executionRepository.getWaitingExecutions).not.toHaveBeenCalled();
|
expect(executionRepository.getWaitingExecutions).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue