mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-10 12:27:31 -08:00
4c4082503c
Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926

### Manual workflow activation and deactivation

In a multi-main scenario, if the user manually activates or deactivates a workflow, the process (whether leader or follower) that handles the PATCH request and updates its internal state should send a message into the command channel, so that all other main processes update their internal state accordingly:

- Add to `ActiveWorkflows` if activating
- Remove from `ActiveWorkflows` if deactivating
- Remove and re-add to `ActiveWorkflows` if the update did not change activation status.

After updating their internal state, if activating or deactivating, the recipient main processes should push a message to all connected frontends so that these can update their stores and so reflect the value in the UI.

### Workflow activation errors

On failure to activate a workflow, the main instance should record the error in Redis - main instances should always pull activation errors from Redis in a multi-main scenario.

### Leadership change

On leadership change...

- The old leader should stop pruning and the new leader should start pruning.
- The old leader should remove trigger- and poller-based workflows and the new leader should add them.
153 lines
5.1 KiB
TypeScript
153 lines
5.1 KiB
TypeScript
import Container from 'typedi';
|
|
import config from '@/config';
|
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
|
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
|
|
import { eventBus } from '@/eventbus';
|
|
import { RedisService } from '@/services/redis.service';
|
|
import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handleWorkerResponseMessageMain';
|
|
import { handleCommandMessageMain } from '@/services/orchestration/main/handleCommandMessageMain';
|
|
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
|
import * as helpers from '@/services/orchestration/helpers';
|
|
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
|
import { Logger } from '@/Logger';
|
|
import { Push } from '@/push';
|
|
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
|
import { mockInstance } from '../../shared/mocking';
|
|
|
|
// Services under test, resolved from the typedi container.
const os = Container.get(SingleMainSetup);
const handler = Container.get(OrchestrationHandlerMainService);

// Mock ActiveWorkflowRunner through the shared `mockInstance` helper.
// NOTE(review): this runs after the two services above are resolved — confirm
// that ordering is intentional if either of them injects ActiveWorkflowRunner.
mockInstance(ActiveWorkflowRunner);

// Populated in `beforeAll` from the `redis.queueModeId` config value.
let queueModeId: string;
|
|
|
|
/**
 * Applies the config values these tests run under:
 * `executions.mode` is set to `queue` and `generic.instanceType` to `main`.
 */
function setDefaultConfig(): void {
  config.set('executions.mode', 'queue');
  config.set('generic.instanceType', 'main');
}
|
|
|
|
/**
 * Fixture: a worker response for the `restartEventBus` command carrying a
 * `success` result. Sender and worker ids are both the placeholder `test`.
 */
const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
  senderId: 'test',
  workerId: 'test',
  command: 'restartEventBus',
  payload: { result: 'success' },
};
|
|
|
|
describe('Orchestration Service', () => {
|
|
// Mocked Logger; the reference is kept so tests can assert on `logger.error`.
const logger = mockInstance(Logger);

// Push is mocked as well; its mock is not inspected by the tests below.
mockInstance(Push);
|
|
// One-time suite setup: mocks the Redis-related services and modules, applies
// the default queue-mode config, and captures this instance's queue mode id.
beforeAll(async () => {
  mockInstance(RedisService);
  mockInstance(ExternalSecretsManager);

  jest.mock('ioredis', () => {
    const Redis = require('ioredis-mock');
    if (typeof Redis === 'object') {
      // the first mock is an ioredis shim because ioredis-mock depends on it
      // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111
      return {
        Command: { _transformer: { argument: {}, reply: {} } },
      };
    }
    // second mock for our code
    return function (...args: unknown[]) {
      // Forward the constructor arguments one by one; handing over the
      // rest-args array itself would make the mock see a single array argument.
      return new Redis(...args);
    };
  });

  // Publisher stub exposing only the no-op methods listed here.
  jest.mock('@/services/redis/RedisServicePubSubPublisher', () => {
    return jest.fn().mockImplementation(() => {
      return {
        init: jest.fn(),
        publishToEventLog: jest.fn(),
        publishToWorkerChannel: jest.fn(),
        destroy: jest.fn(),
      };
    });
  });

  // Subscriber stub exposing only the no-op methods listed here.
  jest.mock('@/services/redis/RedisServicePubSubSubscriber', () => {
    return jest.fn().mockImplementation(() => {
      return {
        subscribeToCommandChannel: jest.fn(),
        destroy: jest.fn(),
      };
    });
  });

  setDefaultConfig();
  queueModeId = config.get('redis.queueModeId');
});
|
|
|
|
// Suite teardown: restore every spied-on method, then shut the orchestration
// service down.
afterAll(async () => {
  // Call `restoreAllMocks` on `jest` directly. `jest.mock(path)` returns the
  // `jest` object but also registers a module mock, which is not wanted here.
  jest.restoreAllMocks();
  await os.shutdown();
});
|
|
|
|
// Initializes the service and its handler, then checks that the publisher,
// the subscriber and the queue mode id are all available.
test('should initialize', async () => {
  await os.init();
  await handler.init();

  const { redisPublisher } = os;
  const { redisSubscriber } = handler;

  expect(redisPublisher).toBeDefined();
  expect(redisSubscriber).toBeDefined();
  expect(queueModeId).toBeDefined();
});
|
|
|
|
// Feeds the serialized worker-response fixture to the handler and checks that
// the returned object carries the same command.
test('should handle worker responses', async () => {
  const serializedResponse = JSON.stringify(workerRestartEventbusResponse);

  const handled = await handleWorkerResponseMessageMain(serializedResponse);

  expect(handled.command).toEqual('restartEventBus');
});
|
|
|
|
// Sends a `reloadLicense` command with a foreign sender id and checks that the
// handler returns the parsed message.
test('should handle command messages from others', async () => {
  const incomingMessage = JSON.stringify({
    senderId: 'test',
    command: 'reloadLicense',
  });

  const handled = await handleCommandMessageMain(incomingMessage);

  expect(handled).toBeDefined();
  expect(handled!.command).toEqual('reloadLicense');
  expect(handled!.senderId).toEqual('test');
  // NOTE(review): presumably the license reload fails in this test setup and
  // the failure is logged — confirm against handleCommandMessageMain.
  expect(logger.error).toHaveBeenCalled();
});
|
|
|
|
// A command whose sender id equals this instance's own queue mode id must be
// returned as parsed but must not trigger an event bus restart.
test('should reject command messages from itself', async () => {
  // Keep the spy reference so it can be restored even if an assertion throws.
  const restartSpy = jest.spyOn(eventBus, 'restart');
  try {
    const response = await handleCommandMessageMain(
      JSON.stringify({ ...workerRestartEventbusResponse, senderId: queueModeId }),
    );

    expect(response).toBeDefined();
    expect(response!.command).toEqual('restartEventBus');
    expect(response!.senderId).toEqual(queueModeId);
    expect(restartSpy).not.toHaveBeenCalled();
  } finally {
    restartSpy.mockRestore();
  }
});
|
|
|
|
// Requesting worker ids must publish a message on the command channel.
test('should send command messages', async () => {
  setDefaultConfig();

  // Stub the publish call with a no-op and keep the spy reference so it can be
  // restored even if the assertion below throws.
  const publishSpy = jest
    .spyOn(os.redisPublisher, 'publishToCommandChannel')
    .mockImplementation(async () => {});
  try {
    await os.getWorkerIds();

    expect(publishSpy).toHaveBeenCalled();
  } finally {
    publishSpy.mockRestore();
  }
});
|
|
|
|
// Sends the same command twice in a row: the first call yields no payload,
// the second one is answered with a `debounced` result.
test('should prevent receiving commands too often', async () => {
  setDefaultConfig();
  jest.spyOn(helpers, 'debounceMessageReceiver');

  // Both calls receive an identical serialized message.
  const serializedCommand = JSON.stringify({
    senderId: 'test',
    command: 'reloadExternalSecretsProviders',
  });

  const firstResult = await handleCommandMessageMain(serializedCommand);
  const secondResult = await handleCommandMessageMain(serializedCommand);

  expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2);
  expect(firstResult!.payload).toBeUndefined();
  expect(secondResult!.payload!.result).toEqual('debounced');
});
|
|
});
|