mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
fix(core): Scheduler tasks should not trigger on follower instances (#10507)
This commit is contained in:
parent
c8ab9b1f84
commit
3428f28a73
|
@ -1,13 +1,24 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { CronJob } from 'cron';
|
import { CronJob } from 'cron';
|
||||||
import type { CronExpression, Workflow } from 'n8n-workflow';
|
import type { CronExpression, Workflow } from 'n8n-workflow';
|
||||||
|
import { InstanceSettings } from './InstanceSettings';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ScheduledTaskManager {
|
export class ScheduledTaskManager {
|
||||||
|
constructor(private readonly instanceSettings: InstanceSettings) {}
|
||||||
|
|
||||||
readonly cronJobs = new Map<string, CronJob[]>();
|
readonly cronJobs = new Map<string, CronJob[]>();
|
||||||
|
|
||||||
registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
|
registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
|
||||||
const cronJob = new CronJob(cronExpression, onTick, undefined, true, workflow.timezone);
|
const cronJob = new CronJob(
|
||||||
|
cronExpression,
|
||||||
|
() => {
|
||||||
|
if (this.instanceSettings.isLeader) onTick();
|
||||||
|
},
|
||||||
|
undefined,
|
||||||
|
true,
|
||||||
|
workflow.timezone,
|
||||||
|
);
|
||||||
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
|
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
|
||||||
if (cronJobsForWorkflow) {
|
if (cronJobsForWorkflow) {
|
||||||
cronJobsForWorkflow.push(cronJob);
|
cronJobsForWorkflow.push(cronJob);
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
import type { Workflow } from 'n8n-workflow';
|
import type { Workflow } from 'n8n-workflow';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
|
|
||||||
|
import type { InstanceSettings } from '@/InstanceSettings';
|
||||||
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
|
import { ScheduledTaskManager } from '@/ScheduledTaskManager';
|
||||||
|
|
||||||
describe('ScheduledTaskManager', () => {
|
describe('ScheduledTaskManager', () => {
|
||||||
|
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
|
||||||
const workflow = mock<Workflow>({ timezone: 'GMT' });
|
const workflow = mock<Workflow>({ timezone: 'GMT' });
|
||||||
const everyMinute = '0 * * * * *';
|
const everyMinute = '0 * * * * *';
|
||||||
const onTick = jest.fn();
|
const onTick = jest.fn();
|
||||||
|
@ -13,7 +15,7 @@ describe('ScheduledTaskManager', () => {
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
jest.clearAllMocks();
|
jest.clearAllMocks();
|
||||||
jest.useFakeTimers();
|
jest.useFakeTimers();
|
||||||
scheduledTaskManager = new ScheduledTaskManager();
|
scheduledTaskManager = new ScheduledTaskManager(instanceSettings);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw when workflow timezone is invalid', () => {
|
it('should throw when workflow timezone is invalid', () => {
|
||||||
|
@ -41,6 +43,15 @@ describe('ScheduledTaskManager', () => {
|
||||||
expect(onTick).toHaveBeenCalledTimes(10);
|
expect(onTick).toHaveBeenCalledTimes(10);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should should not invoke on follower instances', async () => {
|
||||||
|
scheduledTaskManager = new ScheduledTaskManager(mock<InstanceSettings>({ isLeader: false }));
|
||||||
|
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
|
||||||
|
|
||||||
|
expect(onTick).not.toHaveBeenCalled();
|
||||||
|
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
|
||||||
|
expect(onTick).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
it('should deregister CronJobs for a workflow', async () => {
|
it('should deregister CronJobs for a workflow', async () => {
|
||||||
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
|
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
|
||||||
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
|
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import * as n8nWorkflow from 'n8n-workflow';
|
import * as n8nWorkflow from 'n8n-workflow';
|
||||||
import type { INode, ITriggerFunctions, Workflow } from 'n8n-workflow';
|
import type { INode, ITriggerFunctions, Workflow } from 'n8n-workflow';
|
||||||
import { returnJsonArray } from 'n8n-core';
|
import { type InstanceSettings, returnJsonArray } from 'n8n-core';
|
||||||
import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager';
|
import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { ScheduleTrigger } from '../ScheduleTrigger.node';
|
import { ScheduleTrigger } from '../ScheduleTrigger.node';
|
||||||
|
@ -18,7 +18,8 @@ describe('ScheduleTrigger', () => {
|
||||||
|
|
||||||
const node = mock<INode>({ typeVersion: 1 });
|
const node = mock<INode>({ typeVersion: 1 });
|
||||||
const workflow = mock<Workflow>({ timezone });
|
const workflow = mock<Workflow>({ timezone });
|
||||||
const scheduledTaskManager = new ScheduledTaskManager();
|
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
|
||||||
|
const scheduledTaskManager = new ScheduledTaskManager(instanceSettings);
|
||||||
const helpers = mock<ITriggerFunctions['helpers']>({
|
const helpers = mock<ITriggerFunctions['helpers']>({
|
||||||
returnJsonArray,
|
returnJsonArray,
|
||||||
registerCron: (cronExpression, onTick) =>
|
registerCron: (cronExpression, onTick) =>
|
||||||
|
|
Loading…
Reference in a new issue