mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
fix(core): Ensure only leader handles waiting executions (#9014)
This commit is contained in:
parent
db4f8d49a3
commit
217b07d735
|
@ -7,8 +7,9 @@ import { Container, Service } from 'typedi';
|
||||||
import type { IExecutionsStopData, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
import type { IExecutionsStopData, IWorkflowExecutionDataProcess } from '@/Interfaces';
|
||||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||||
import { OwnershipService } from './services/ownership.service';
|
import { OwnershipService } from '@/services/ownership.service';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
import { OrchestrationService } from '@/services/orchestration.service';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class WaitTracker {
|
export class WaitTracker {
|
||||||
|
@ -26,7 +27,22 @@ export class WaitTracker {
|
||||||
private readonly executionRepository: ExecutionRepository,
|
private readonly executionRepository: ExecutionRepository,
|
||||||
private readonly ownershipService: OwnershipService,
|
private readonly ownershipService: OwnershipService,
|
||||||
private readonly workflowRunner: WorkflowRunner,
|
private readonly workflowRunner: WorkflowRunner,
|
||||||
|
readonly orchestrationService: OrchestrationService,
|
||||||
) {
|
) {
|
||||||
|
const { isLeader, isMultiMainSetupEnabled, multiMainSetup } = orchestrationService;
|
||||||
|
|
||||||
|
if (isLeader) this.startTracking();
|
||||||
|
|
||||||
|
if (isMultiMainSetupEnabled) {
|
||||||
|
multiMainSetup
|
||||||
|
.on('leader-takeover', () => this.startTracking())
|
||||||
|
.on('leader-stepdown', () => this.stopTracking());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
startTracking() {
|
||||||
|
this.logger.debug('Wait tracker started tracking waiting executions');
|
||||||
|
|
||||||
// Poll every 60 seconds a list of upcoming executions
|
// Poll every 60 seconds a list of upcoming executions
|
||||||
this.mainTimer = setInterval(() => {
|
this.mainTimer = setInterval(() => {
|
||||||
void this.getWaitingExecutions();
|
void this.getWaitingExecutions();
|
||||||
|
@ -174,7 +190,9 @@ export class WaitTracker {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
shutdown() {
|
stopTracking() {
|
||||||
|
this.logger.debug('Wait tracker shutting down');
|
||||||
|
|
||||||
clearInterval(this.mainTimer);
|
clearInterval(this.mainTimer);
|
||||||
Object.keys(this.waitingExecutions).forEach((executionId) => {
|
Object.keys(this.waitingExecutions).forEach((executionId) => {
|
||||||
clearTimeout(this.waitingExecutions[executionId].timer);
|
clearTimeout(this.waitingExecutions[executionId].timer);
|
||||||
|
|
|
@ -94,7 +94,7 @@ export class Start extends BaseCommand {
|
||||||
// Stop with trying to activate workflows that could not be activated
|
// Stop with trying to activate workflows that could not be activated
|
||||||
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
|
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
|
||||||
|
|
||||||
Container.get(WaitTracker).shutdown();
|
Container.get(WaitTracker).stopTracking();
|
||||||
|
|
||||||
await this.externalHooks?.run('n8n.stop', []);
|
await this.externalHooks?.run('n8n.stop', []);
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,9 @@ export class OrchestrationService {
|
||||||
return config.getEnv('redis.queueModeId');
|
return config.getEnv('redis.queueModeId');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether this instance is the leader in a multi-main setup. Always `true` in single-main setup.
|
||||||
|
*/
|
||||||
get isLeader() {
|
get isLeader() {
|
||||||
return config.getEnv('multiMainSetup.instanceType') === 'leader';
|
return config.getEnv('multiMainSetup.instanceType') === 'leader';
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ export class MultiMainSetup extends EventEmitter {
|
||||||
if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
|
if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
|
||||||
config.set('multiMainSetup.instanceType', 'follower');
|
config.set('multiMainSetup.instanceType', 'follower');
|
||||||
|
|
||||||
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning
|
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking
|
||||||
|
|
||||||
EventReporter.info('[Multi-main setup] Leader failed to renew leader key');
|
EventReporter.info('[Multi-main setup] Leader failed to renew leader key');
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ export class MultiMainSetup extends EventEmitter {
|
||||||
|
|
||||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||||
|
|
||||||
this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning
|
this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning, wait-tracking
|
||||||
} else {
|
} else {
|
||||||
config.set('multiMainSetup.instanceType', 'follower');
|
config.set('multiMainSetup.instanceType', 'follower');
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,11 +2,17 @@ import { WaitTracker } from '@/WaitTracker';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
import type { IExecutionResponse } from '@/Interfaces';
|
import type { IExecutionResponse } from '@/Interfaces';
|
||||||
|
import type { OrchestrationService } from '@/services/orchestration.service';
|
||||||
|
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
|
|
||||||
jest.useFakeTimers();
|
jest.useFakeTimers();
|
||||||
|
|
||||||
describe('WaitTracker', () => {
|
describe('WaitTracker', () => {
|
||||||
const executionRepository = mock<ExecutionRepository>();
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
|
const orchestrationService = mock<OrchestrationService>({
|
||||||
|
isLeader: true,
|
||||||
|
isMultiMainSetupEnabled: false,
|
||||||
|
});
|
||||||
|
|
||||||
const execution = mock<IExecutionResponse>({
|
const execution = mock<IExecutionResponse>({
|
||||||
id: '123',
|
id: '123',
|
||||||
|
@ -21,7 +27,7 @@ describe('WaitTracker', () => {
|
||||||
it('should query DB for waiting executions', async () => {
|
it('should query DB for waiting executions', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock());
|
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
||||||
|
|
||||||
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
@ -29,7 +35,7 @@ describe('WaitTracker', () => {
|
||||||
it('if no executions to start, should do nothing', () => {
|
it('if no executions to start, should do nothing', () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
new WaitTracker(mock(), executionRepository, mock(), mock());
|
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
||||||
|
|
||||||
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
|
expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
@ -37,7 +43,13 @@ describe('WaitTracker', () => {
|
||||||
describe('if execution to start', () => {
|
describe('if execution to start', () => {
|
||||||
it('if not enough time passed, should not start execution', async () => {
|
it('if not enough time passed, should not start execution', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock());
|
const waitTracker = new WaitTracker(
|
||||||
|
mock(),
|
||||||
|
executionRepository,
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
orchestrationService,
|
||||||
|
);
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
await waitTracker.getWaitingExecutions();
|
await waitTracker.getWaitingExecutions();
|
||||||
|
@ -51,7 +63,13 @@ describe('WaitTracker', () => {
|
||||||
|
|
||||||
it('if enough time passed, should start execution', async () => {
|
it('if enough time passed, should start execution', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock());
|
const waitTracker = new WaitTracker(
|
||||||
|
mock(),
|
||||||
|
executionRepository,
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
orchestrationService,
|
||||||
|
);
|
||||||
|
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
|
||||||
await waitTracker.getWaitingExecutions();
|
await waitTracker.getWaitingExecutions();
|
||||||
|
@ -68,7 +86,13 @@ describe('WaitTracker', () => {
|
||||||
describe('startExecution()', () => {
|
describe('startExecution()', () => {
|
||||||
it('should query for execution to start', async () => {
|
it('should query for execution to start', async () => {
|
||||||
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock());
|
const waitTracker = new WaitTracker(
|
||||||
|
mock(),
|
||||||
|
executionRepository,
|
||||||
|
mock(),
|
||||||
|
mock(),
|
||||||
|
orchestrationService,
|
||||||
|
);
|
||||||
|
|
||||||
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
executionRepository.findSingleExecution.mockResolvedValue(execution);
|
||||||
waitTracker.startExecution(execution.id);
|
waitTracker.startExecution(execution.id);
|
||||||
|
@ -80,4 +104,34 @@ describe('WaitTracker', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('multi-main setup', () => {
|
||||||
|
it('should start tracking if leader', () => {
|
||||||
|
const orchestrationService = mock<OrchestrationService>({
|
||||||
|
isLeader: true,
|
||||||
|
isMultiMainSetupEnabled: true,
|
||||||
|
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
|
||||||
|
});
|
||||||
|
|
||||||
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
|
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
||||||
|
|
||||||
|
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not start tracking if follower', () => {
|
||||||
|
const orchestrationService = mock<OrchestrationService>({
|
||||||
|
isLeader: false,
|
||||||
|
isMultiMainSetupEnabled: true,
|
||||||
|
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
|
||||||
|
});
|
||||||
|
|
||||||
|
executionRepository.getWaitingExecutions.mockResolvedValue([]);
|
||||||
|
|
||||||
|
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);
|
||||||
|
|
||||||
|
expect(executionRepository.getWaitingExecutions).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue