mirror of https://github.com/n8n-io/n8n.git (synced 2025-03-05 20:50:17 -08:00)
feat(core): Expand crash recovery to cover queue mode (#9676)
This commit is contained in:
parent 7e44cd7f16
commit c58621ab79
@@ -101,6 +101,15 @@ export class Queue {
     return await this.jobQueue.getJobs(jobTypes);
   }
 
+  /**
+   * Get IDs of executions that are currently in progress in the queue.
+   */
+  async getInProgressExecutionIds() {
+    const inProgressJobs = await this.getJobs(['active', 'waiting']);
+
+    return new Set(inProgressJobs.map((job) => job.data.executionId));
+  }
+
   async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
     return await this.jobQueue.process(concurrency, fn);
   }

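The new `getInProgressExecutionIds()` collapses Bull's `active` and `waiting` jobs into a `Set` of n8n execution IDs, so membership checks against IDs read from the database are cheap. A minimal sketch of how that set is meant to be consumed; the real consumer is `ExecutionRecoveryService.recoverFromQueue()` further down in this diff, and the wrapper function below is illustrative only:

import Container from 'typedi';
import { Queue } from '@/Queue';

// Illustrative only (not part of this commit): check which of the given
// executions are still known to the Bull queue as active or waiting.
async function stillQueued(executionIds: string[]): Promise<string[]> {
  const queuedIds = await Container.get(Queue).getInProgressExecutionIds(); // Set<string>

  return executionIds.filter((id) => queuedIds.has(id));
}
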
@@ -31,6 +31,7 @@ import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
 import { ExecutionService } from '@/executions/execution.service';
 import { OwnershipService } from '@/services/ownership.service';
 import { WorkflowRunner } from '@/WorkflowRunner';
+import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
 
 // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
 const open = require('open');

@@ -65,8 +66,6 @@ export class Start extends BaseCommand {
 
   protected server = Container.get(Server);
 
-  private pruningService: PruningService;
-
   constructor(argv: string[], cmdConfig: Config) {
     super(argv, cmdConfig);
     this.setInstanceType('main');

@@ -294,6 +293,7 @@ export class Start extends BaseCommand {
     await this.server.start();
 
     Container.get(PruningService).init();
+    Container.get(ExecutionRecoveryService).init();
 
     if (config.getEnv('executions.mode') === 'regular') {
       await this.runEnqueuedExecutions();

@@ -371,6 +371,21 @@ export const schema = {
       default: 10000,
       env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT',
     },
+
+    queueRecovery: {
+      interval: {
+        doc: 'How often (minutes) to check for queue recovery',
+        format: Number,
+        default: 180,
+        env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL',
+      },
+      batchSize: {
+        doc: 'Size of batch of executions to check for queue recovery',
+        format: Number,
+        default: 100,
+        env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH',
+      },
+    },
   },
 
   queue: {

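The two new settings default to a recovery check every 180 minutes over batches of 100 executions, and can be overridden via the `N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL` and `N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH` environment variables. A minimal sketch of reading them, matching how `ExecutionRecoveryService` consumes them later in this diff:

import config from '@/config';

// Interval is configured in minutes, so convert to milliseconds for setTimeout.
const waitMs = config.getEnv('executions.queueRecovery.interval') * 60 * 1000; // default: 180 min
const batchSize = config.getEnv('executions.queueRecovery.batchSize'); // default: 100
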
@@ -275,9 +275,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
       },
     );
 
-    this.logger.info('[Execution Recovery] Marked executions as `crashed`', {
-      executionIds,
-    });
+    this.logger.info('Marked executions as `crashed`', { executionIds });
   }
 
   /**

@@ -773,4 +771,18 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
 
     return executions.map(({ id }) => id);
   }
+
+  /**
+   * Retrieve a batch of execution IDs with `new` or `running` status, in most recent order.
+   */
+  async getInProgressExecutionIds(batchSize: number) {
+    const executions = await this.find({
+      select: ['id'],
+      where: { status: In(['new', 'running']) },
+      order: { startedAt: 'DESC' },
+      take: batchSize,
+    });
+
+    return executions.map(({ id }) => id);
+  }
 }

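The repository method above is a plain TypeORM `find`; roughly speaking (the exact SQL depends on the driver and naming strategy) it selects the newest `new`/`running` execution IDs up to the batch size. An illustrative call, with hypothetical IDs in the comment:

import Container from 'typedi';
import { ExecutionRepository } from '@db/repositories/execution.repository';

// Approximately: SELECT id FROM execution_entity WHERE status IN ('new', 'running')
//                ORDER BY "startedAt" DESC LIMIT 100
async function example() {
  const storedIds = await Container.get(ExecutionRepository).getInProgressExecutionIds(100);
  console.log(storedIds); // e.g. ['1042', '1041', '1037'] (hypothetical IDs)
}
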
@@ -7,9 +7,12 @@ import { createWorkflow } from '@test-integration/db/workflows';
 import { createExecution } from '@test-integration/db/executions';
 import * as testDb from '@test-integration/testDb';
 
+import { NodeConnectionType } from 'n8n-workflow';
+import { mock } from 'jest-mock-extended';
+import { OrchestrationService } from '@/services/orchestration.service';
+import config from '@/config';
 import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
 import { ExecutionRepository } from '@/databases/repositories/execution.repository';
-
 import { InternalHooks } from '@/InternalHooks';
 import { Push } from '@/push';
 import { ARTIFICIAL_TASK_DATA } from '@/constants';

@@ -17,11 +20,10 @@ import { NodeCrashedError } from '@/errors/node-crashed.error';
 import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
 import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode';
 import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
 
 import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
 import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
-import { NodeConnectionType } from 'n8n-workflow';
-import { OrchestrationService } from '@/services/orchestration.service';
-import config from '@/config';
+import type { Logger } from '@/Logger';
 
 /**
  * Workflow producing an execution whose data will be truncated by an instance crash.

@@ -174,20 +176,20 @@ export const setupMessages = (executionId: string, workflowName: string): EventM
 };
 
 describe('ExecutionRecoveryService', () => {
-  let executionRecoveryService: ExecutionRecoveryService;
   let push: Push;
-  let executionRepository: ExecutionRepository;
+  let executionRecoveryService: ExecutionRecoveryService;
   let orchestrationService: OrchestrationService;
+  let executionRepository: ExecutionRepository;
 
   beforeAll(async () => {
     await testDb.init();
 
-    mockInstance(InternalHooks);
     push = mockInstance(Push);
     executionRepository = Container.get(ExecutionRepository);
     orchestrationService = Container.get(OrchestrationService);
 
+    mockInstance(InternalHooks);
     executionRecoveryService = new ExecutionRecoveryService(
+      mock<Logger>(),
       push,
       executionRepository,
       orchestrationService,

@@ -199,13 +201,78 @@ describe('ExecutionRecoveryService', () => {
   });
 
   afterEach(async () => {
+    config.load(config.default);
+    jest.restoreAllMocks();
     await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
+    executionRecoveryService.shutdown();
   });
 
   afterAll(async () => {
     await testDb.terminate();
   });
 
+  describe('scheduleQueueRecovery', () => {
+    describe('queue mode', () => {
+      it('if leader, should schedule queue recovery', () => {
+        /**
+         * Arrange
+         */
+        config.set('executions.mode', 'queue');
+        jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
+        const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
+
+        /**
+         * Act
+         */
+        executionRecoveryService.init();
+
+        /**
+         * Assert
+         */
+        expect(scheduleSpy).toHaveBeenCalled();
+      });
+
+      it('if follower, should do nothing', () => {
+        /**
+         * Arrange
+         */
+        config.set('executions.mode', 'queue');
+        jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
+        const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
+
+        /**
+         * Act
+         */
+        executionRecoveryService.init();
+
+        /**
+         * Assert
+         */
+        expect(scheduleSpy).not.toHaveBeenCalled();
+      });
+    });
+
+    describe('regular mode', () => {
+      it('should do nothing', () => {
+        /**
+         * Arrange
+         */
+        config.set('executions.mode', 'regular');
+        const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
+
+        /**
+         * Act
+         */
+        executionRecoveryService.init();
+
+        /**
+         * Assert
+         */
+        expect(scheduleSpy).not.toHaveBeenCalled();
+      });
+    });
+  });
+
   describe('recoverFromLogs', () => {
     describe('if follower', () => {
       test('should do nothing', async () => {

@@ -1,6 +1,6 @@
 import Container, { Service } from 'typedi';
 import { Push } from '@/push';
-import { sleep } from 'n8n-workflow';
+import { jsonStringify, sleep } from 'n8n-workflow';
 import { ExecutionRepository } from '@db/repositories/execution.repository';
 import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
 import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected

@@ -11,6 +11,10 @@ import type { IExecutionResponse } from '@/Interfaces';
 import { NodeCrashedError } from '@/errors/node-crashed.error';
 import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
 import { ARTIFICIAL_TASK_DATA } from '@/constants';
+import { Logger } from '@/Logger';
+import config from '@/config';
+import { OnShutdown } from '@/decorators/OnShutdown';
+import type { QueueRecoverySettings } from './execution.types';
 import { OrchestrationService } from '@/services/orchestration.service';
 
 /**

@@ -19,11 +23,36 @@ import { OrchestrationService } from '@/services/orchestration.service';
 @Service()
 export class ExecutionRecoveryService {
   constructor(
+    private readonly logger: Logger,
     private readonly push: Push,
     private readonly executionRepository: ExecutionRepository,
     private readonly orchestrationService: OrchestrationService,
   ) {}
 
+  /**
+   * @important Requires `OrchestrationService` to be initialized on queue mode.
+   */
+  init() {
+    if (config.getEnv('executions.mode') === 'regular') return;
+
+    const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService;
+
+    if (isLeader) this.scheduleQueueRecovery();
+
+    if (isMultiMainSetupEnabled) {
+      this.orchestrationService.multiMainSetup
+        .on('leader-takeover', () => this.scheduleQueueRecovery())
+        .on('leader-stepdown', () => this.stopQueueRecovery());
+    }
+  }
+
+  private readonly queueRecoverySettings: QueueRecoverySettings = {
+    batchSize: config.getEnv('executions.queueRecovery.batchSize'),
+    waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
+  };
+
+  private isShuttingDown = false;
+
   /**
    * Recover key properties of a truncated execution using event logs.
    */

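Per the `@important` note, `init()` assumes `OrchestrationService` is already set up when running in queue mode, which is why `commands/start.ts` (above) initializes recovery only after the server start sequence. A rough sketch of the resulting order, assuming (not confirmed by this diff) that `OrchestrationService` exposes an async `init()`; the wrapper function is illustrative only:

import Container from 'typedi';
import { OrchestrationService } from '@/services/orchestration.service';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';

// Simplified startup sketch: orchestration first, so `isLeader` and
// `isMultiMainSetupEnabled` are meaningful when recovery init() reads them.
async function initQueueRecovery() {
  await Container.get(OrchestrationService).init(); // assumption: async init() exists on this service

  // Leader: schedules the recovery cycle immediately.
  // Follower: subscribes to 'leader-takeover' / 'leader-stepdown' to start or stop it later.
  Container.get(ExecutionRecoveryService).init();
}
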
@@ -34,6 +63,10 @@ export class ExecutionRecoveryService {
 
     if (!amendedExecution) return null;
 
+    this.logger.info('[Recovery] Logs available, amended execution', {
+      executionId: amendedExecution.id,
+    });
+
     await this.executionRepository.updateExistingExecution(executionId, amendedExecution);
 
     await this.runHooks(amendedExecution);

@@ -46,12 +79,89 @@ export class ExecutionRecoveryService {
     return amendedExecution;
   }
 
+  /**
+   * Schedule a cycle to mark dangling executions as crashed in queue mode.
+   */
+  scheduleQueueRecovery(waitMs = this.queueRecoverySettings.waitMs) {
+    if (!this.shouldScheduleQueueRecovery()) return;
+
+    this.queueRecoverySettings.timeout = setTimeout(async () => {
+      try {
+        const nextWaitMs = await this.recoverFromQueue();
+        this.scheduleQueueRecovery(nextWaitMs);
+      } catch (error) {
+        const msg = this.toErrorMsg(error);
+
+        this.logger.error('[Recovery] Failed to recover dangling executions from queue', { msg });
+        this.logger.error('[Recovery] Retrying...');
+
+        this.scheduleQueueRecovery();
+      }
+    }, waitMs);
+
+    const wait = [this.queueRecoverySettings.waitMs / (60 * 1000), 'min'].join(' ');
+
+    this.logger.debug(`[Recovery] Scheduled queue recovery check for next ${wait}`);
+  }
+
+  stopQueueRecovery() {
+    clearTimeout(this.queueRecoverySettings.timeout);
+  }
+
+  @OnShutdown()
+  shutdown() {
+    this.isShuttingDown = true;
+    this.stopQueueRecovery();
+  }
+
   // ----------------------------------
   //             private
   // ----------------------------------
 
   /**
-   * Amend `status`, `stoppedAt`, and (if possible) `data` properties of an execution.
+   * Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
+   * but absent from the queue. Return time until next recovery cycle.
+   */
+  private async recoverFromQueue() {
+    const { waitMs, batchSize } = this.queueRecoverySettings;
+
+    const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
+
+    if (storedIds.length === 0) {
+      this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
+      return waitMs;
+    }
+
+    const { Queue } = await import('@/Queue');
+
+    const queuedIds = await Container.get(Queue).getInProgressExecutionIds();
+
+    if (queuedIds.size === 0) {
+      this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
+      return waitMs;
+    }
+
+    const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
+
+    if (danglingIds.length === 0) {
+      this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
+      return waitMs;
+    }
+
+    await this.executionRepository.markAsCrashed(danglingIds);
+
+    this.logger.info('[Recovery] Completed queue recovery check, recovered dangling executions', {
+      danglingIds,
+    });
+
+    // if this cycle used up the whole batch size, it is possible for there to be
+    // dangling executions outside this check, so speed up next cycle
+
+    return storedIds.length >= this.queueRecoverySettings.batchSize ? waitMs / 2 : waitMs;
+  }
+
+  /**
+   * Amend `status`, `stoppedAt`, and (if possible) `data` of an execution using event logs.
+   */
   private async amend(executionId: string, messages: EventMessageTypes[]) {
     if (messages.length === 0) return await this.amendWithoutLogs(executionId);

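One detail worth noting: the value returned by `recoverFromQueue()` feeds the next `scheduleQueueRecovery()` call, and a full batch halves the wait so a backlog of dangling executions is drained faster. A small worked example using the schema defaults (180-minute interval, batch size 100):

// Worked example of the rescheduling rule in recoverFromQueue(), with schema defaults.
const waitMs = 180 * 60 * 1000; // 10_800_000 ms
const batchSize = 100;

function nextWaitMs(storedIdsCount: number): number {
  // A full batch may hide further dangling executions beyond this check, so check again sooner.
  return storedIdsCount >= batchSize ? waitMs / 2 : waitMs;
}

nextWaitMs(100); // 5_400_000 ms -> next check in 90 min
nextWaitMs(17); // 10_800_000 ms -> next check in 180 min
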
@@ -198,4 +308,18 @@ export class ExecutionRecoveryService {
 
     await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]);
   }
+
+  private toErrorMsg(error: unknown) {
+    return error instanceof Error
+      ? error.message
+      : jsonStringify(error, { replaceCircularRefs: true });
+  }
+
+  private shouldScheduleQueueRecovery() {
+    return (
+      config.getEnv('executions.mode') === 'queue' &&
+      config.getEnv('multiMainSetup.instanceType') === 'leader' &&
+      !this.isShuttingDown
+    );
+  }
 }

@@ -84,3 +84,20 @@ export namespace ExecutionSummaries {
     };
   };
 }
+
+export type QueueRecoverySettings = {
+  /**
+   * ID of timeout for next scheduled recovery cycle.
+   */
+  timeout?: NodeJS.Timeout;
+
+  /**
+   * Number of in-progress executions to check per cycle.
+   */
+  batchSize: number;
+
+  /**
+   * Time (in milliseconds) to wait before the next cycle.
+   */
+  waitMs: number;
+};

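For reference, this is roughly how `ExecutionRecoveryService` populates the type above (values shown are the schema defaults); `timeout` stays unset until `scheduleQueueRecovery()` stores its `setTimeout` handle in it:

import type { QueueRecoverySettings } from './execution.types';

// Sketch using the default config values rather than config.getEnv() calls.
const queueRecoverySettings: QueueRecoverySettings = {
  batchSize: 100, // executions.queueRecovery.batchSize
  waitMs: 180 * 60 * 1000, // executions.queueRecovery.interval, converted to ms
};
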
@@ -62,7 +62,7 @@ export class MultiMainSetup extends EventEmitter {
     if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
       config.set('multiMainSetup.instanceType', 'follower');
 
-      this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking
+      this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery
 
       EventReporter.info('[Multi-main setup] Leader failed to renew leader key');
     }

@@ -78,7 +78,7 @@ export class MultiMainSetup extends EventEmitter {
       config.set('multiMainSetup.instanceType', 'follower');
 
       /**
-       * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal
+       * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery
        */
       this.emit('leader-stepdown');
 

@@ -101,7 +101,7 @@ export class MultiMainSetup extends EventEmitter {
       await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
 
       /**
-       * Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal
+       * Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal, queue recovery
        */
       this.emit('leader-takeover');
     } else {