refactor(core): Streamline flows in multi-main mode (no-changelog) (#8446)

This commit is contained in:
Iván Ovejero 2024-02-05 09:26:55 +01:00 committed by GitHub
parent da1fe44d52
commit dc5ec8f946
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 205 additions and 283 deletions

View file

@ -223,7 +223,8 @@ export class ActiveWorkflowRunner {
}
/**
* Clear workflow-defined webhooks from the `webhook_entity` table.
* Remove all webhooks of a workflow from the database, and
* deregister those webhooks from external services.
*/
async clearWebhooks(workflowId: string) {
const workflowData = await this.workflowRepository.findOne({
@ -418,9 +419,10 @@ export class ActiveWorkflowRunner {
}
/**
* Register as active in memory all workflows stored as `active`.
* Register as active in memory all workflows stored as `active`,
* only on instance init or (in multi-main setup) on leadership change.
*/
async addActiveWorkflows(activationMode: WorkflowActivateMode) {
async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') {
const dbWorkflows = await this.workflowRepository.getAllActive();
if (dbWorkflows.length === 0) return;
@ -433,7 +435,9 @@ export class ActiveWorkflowRunner {
for (const dbWorkflow of dbWorkflows) {
try {
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow);
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, {
shouldPublish: false,
});
if (wasActivated) {
this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, {
@ -471,15 +475,21 @@ export class ActiveWorkflowRunner {
}
async clearAllActivationErrors() {
this.logger.debug('Clearing all activation errors');
await this.activationErrorsService.clearAll();
}
async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('Adding all trigger- and poller-based workflows');
await this.addActiveWorkflows('leadershipChange');
}
@OnShutdown()
async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('Removing all trigger- and poller-based workflows');
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}
@ -506,35 +516,24 @@ export class ActiveWorkflowRunner {
workflowId: string,
activationMode: WorkflowActivateMode,
existingWorkflow?: WorkflowEntity,
{ shouldPublish } = { shouldPublish: true },
) {
if (this.orchestrationService.isMultiMainSetupEnabled && shouldPublish) {
await this.orchestrationService.publish('add-webhooks-triggers-and-pollers', {
workflowId,
});
return;
}
let workflow: Workflow;
let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;
const shouldAddWebhooks = this.orchestrationService.shouldAddWebhooks(activationMode);
const shouldAddTriggersAndPollers = this.orchestrationService.shouldAddTriggersAndPollers();
/**
* In a multi-main scenario, webhooks are stored in the database, while triggers
* and pollers are run only by the leader main instance.
*
* - During a regular workflow activation (i.e. not leadership change), only the
* leader should add webhooks to prevent duplicate insertions, and only the leader
* should handle triggers and pollers to prevent duplicate work.
*
* - During a leadership change, webhooks remain in storage and so need not be added
* again, and the new leader should take over the triggers and pollers that stopped
* running when the former leader became unresponsive.
*/
if (this.orchestrationService.isMultiMainSetupEnabled) {
if (activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.orchestrationService.isLeader;
shouldAddTriggersAndPollers = this.orchestrationService.isLeader;
} else {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = this.orchestrationService.isLeader;
}
}
const shouldActivate = shouldAddWebhooks || shouldAddTriggersAndPollers;
const shouldDisplayActivationMessage =
(shouldAddWebhooks || shouldAddTriggersAndPollers) &&
['init', 'leadershipChange'].includes(activationMode);
try {
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
@ -543,7 +542,7 @@ export class ActiveWorkflowRunner {
throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`);
}
if (shouldActivate) {
if (shouldDisplayActivationMessage) {
this.logger.info(` - ${dbWorkflow.display()}`);
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
workflowName: dbWorkflow.name,
@ -608,7 +607,7 @@ export class ActiveWorkflowRunner {
// id of them in the static data. So make sure that data gets persisted.
await this.workflowStaticDataService.saveStaticData(workflow);
return shouldActivate;
return shouldDisplayActivationMessage;
}
/**
@ -709,7 +708,21 @@ export class ActiveWorkflowRunner {
*/
// TODO: this should happen in a transaction
async remove(workflowId: string) {
// Remove all the webhooks of the workflow
if (this.orchestrationService.isMultiMainSetupEnabled) {
try {
await this.clearWebhooks(workflowId);
} catch (error) {
ErrorReporter.error(error);
this.logger.error(
`Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`,
);
}
await this.orchestrationService.publish('remove-triggers-and-pollers', { workflowId });
return;
}
try {
await this.clearWebhooks(workflowId);
} catch (error) {
@ -727,11 +740,21 @@ export class ActiveWorkflowRunner {
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if (this.activeWorkflows.isActive(workflowId)) {
const removalSuccess = await this.activeWorkflows.remove(workflowId);
if (removalSuccess) {
this.logger.verbose(`Successfully deactivated workflow "${workflowId}"`, { workflowId });
await this.removeWorkflowTriggersAndPollers(workflowId);
}
/**
* Stop running active triggers and pollers for a workflow.
*/
async removeWorkflowTriggersAndPollers(workflowId: string) {
if (!this.activeWorkflows.isActive(workflowId)) return;
const wasRemoved = await this.activeWorkflows.remove(workflowId);
if (wasRemoved) {
this.logger.warn(`Removed triggers and pollers for workflow "${workflowId}"`, {
workflowId,
});
}
}

View file

@ -226,31 +226,11 @@ export class Start extends BaseCommand {
if (!orchestrationService.isMultiMainSetupEnabled) return;
orchestrationService.multiMainSetup
.addListener('leadershipChange', async () => {
if (orchestrationService.isLeader) {
this.logger.debug('[Leadership change] Clearing all activation errors...');
await this.activeWorkflowRunner.clearAllActivationErrors();
this.logger.debug(
'[Leadership change] Adding all trigger- and poller-based workflows...',
);
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else {
this.logger.debug(
'[Leadership change] Removing all trigger- and poller-based workflows...',
);
.on('leader-stepdown', async () => {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
}
})
.addListener('leadershipVacant', async () => {
this.logger.debug(
'[Leadership vacant] Removing all trigger- and poller-based workflows...',
);
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
.on('leader-takeover', async () => {
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
});
}
@ -370,16 +350,8 @@ export class Start extends BaseCommand {
if (!orchestrationService.isMultiMainSetupEnabled) return;
orchestrationService.multiMainSetup
.addListener('leadershipChange', async () => {
if (orchestrationService.isLeader) {
this.pruningService.startPruning();
} else {
this.pruningService.stopPruning();
}
})
.addListener('leadershipVacant', () => {
this.pruningService.stopPruning();
});
.on('leader-stepdown', () => this.pruningService.stopPruning())
.on('leader-takeover', () => this.pruningService.startPruning());
}
async catch(error: Error) {

View file

@ -6,6 +6,7 @@ import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/Redis
import { RedisService } from './redis.service';
import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee';
import type { WorkflowActivateMode } from 'n8n-workflow';
@Service()
export class OrchestrationService {
@ -118,4 +119,29 @@ export class OrchestrationService {
await this.redisPublisher.publishToCommandChannel({ command });
}
// ----------------------------------
// activations
// ----------------------------------
/**
* Whether this instance may add webhooks to the `webhook_entity` table.
*/
shouldAddWebhooks(activationMode: WorkflowActivateMode) {
if (activationMode === 'init') return false;
if (activationMode === 'leadershipChange') return false;
return this.isLeader; // 'update' or 'activate'
}
/**
* Whether this instance may add triggers and pollers to memory.
*
* In both single- and multi-main setup, only the leader is allowed to manage
* triggers and pollers in memory, to ensure they are not duplicated.
*/
shouldAddTriggersAndPollers() {
return this.isLeader;
}
}

View file

@ -62,11 +62,9 @@ export class MultiMainSetup extends EventEmitter {
if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
config.set('multiMainSetup.instanceType', 'follower');
this.emit('leadershipChange'); // stop triggers, pollers, pruning
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning
EventReporter.report('[Multi-main setup] Leader failed to renew leader key', {
level: 'info',
});
EventReporter.info('[Multi-main setup] Leader failed to renew leader key');
}
return;
@ -79,7 +77,7 @@ export class MultiMainSetup extends EventEmitter {
config.set('multiMainSetup.instanceType', 'follower');
this.emit('leadershipVacant'); // stop triggers, pollers, pruning
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning
await this.tryBecomeLeader();
}
@ -99,7 +97,7 @@ export class MultiMainSetup extends EventEmitter {
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
this.emit('leadershipChange'); // start triggers, pollers, pruning
this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning
} else {
config.set('multiMainSetup.instanceType', 'follower');
}

View file

@ -7,24 +7,30 @@ import { License } from '@/License';
import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { Push } from '@/push';
import { TestWebhooks } from '@/TestWebhooks';
import { OrchestrationService } from '@/services/orchestration.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { TestWebhooks } from '@/TestWebhooks';
export async function handleCommandMessageMain(messageString: string) {
const queueModeId = config.getEnv('redis.queueModeId');
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
const message = messageToRedisServiceCommandObject(messageString);
const logger = Container.get(Logger);
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
if (message) {
logger.debug(
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
);
const selfSendingAllowed = [
'add-webhooks-triggers-and-pollers',
'remove-triggers-and-pollers',
].includes(message.command);
if (
message.senderId === queueModeId ||
(message.targets && !message.targets.includes(queueModeId))
!selfSendingAllowed &&
(message.senderId === queueModeId ||
(message.targets && !message.targets.includes(queueModeId)))
) {
// Skipping command message because it's not for this instance
logger.debug(
@ -71,52 +77,106 @@ export async function handleCommandMessageMain(messageString: string) {
await Container.get(ExternalSecretsManager).reloadAllProviders();
break;
case 'workflowActiveStateChanged': {
case 'add-webhooks-triggers-and-pollers': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, oldState, newState, versionId } = message.payload ?? {};
const orchestrationService = Container.get(OrchestrationService);
if (
typeof workflowId !== 'string' ||
typeof oldState !== 'boolean' ||
typeof newState !== 'boolean' ||
typeof versionId !== 'string'
) {
break;
}
if (orchestrationService.isFollower) break;
if (typeof message.payload?.workflowId !== 'string') break;
const { workflowId } = message.payload;
if (!oldState && newState) {
try {
await activeWorkflowRunner.add(workflowId, 'activate');
push.broadcast('workflowActivated', { workflowId });
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await Container.get(WorkflowRepository).update(workflowId, {
active: false,
versionId,
await Container.get(ActiveWorkflowRunner).add(workflowId, 'activate', undefined, {
shouldPublish: false, // prevent leader re-publishing message
});
await Container.get(OrchestrationService).publish('workflowFailedToActivate', {
push.broadcast('workflowActivated', { workflowId });
// instruct followers to show activation in UI
await orchestrationService.publish('display-workflow-activation', { workflowId });
} catch (error) {
if (error instanceof Error) {
await Container.get(WorkflowRepository).update(workflowId, { active: false });
Container.get(Push).broadcast('workflowFailedToActivate', {
workflowId,
errorMessage: error.message,
});
await Container.get(OrchestrationService).publish('workflow-failed-to-activate', {
workflowId,
errorMessage: error.message,
});
}
} else if (oldState && !newState) {
await activeWorkflowRunner.remove(workflowId);
push.broadcast('workflowDeactivated', { workflowId });
} else {
await activeWorkflowRunner.remove(workflowId);
await activeWorkflowRunner.add(workflowId, 'update');
}
break;
}
case 'remove-triggers-and-pollers': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const orchestrationService = Container.get(OrchestrationService);
if (orchestrationService.isFollower) break;
if (typeof message.payload?.workflowId !== 'string') break;
const { workflowId } = message.payload;
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
await activeWorkflowRunner.removeActivationError(workflowId);
await activeWorkflowRunner.removeWorkflowTriggersAndPollers(workflowId);
push.broadcast('workflowDeactivated', { workflowId });
// instruct followers to show workflow deactivation in UI
await orchestrationService.publish('display-workflow-deactivation', { workflowId });
break;
}
case 'workflowFailedToActivate': {
case 'display-workflow-activation': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId } = message.payload ?? {};
if (typeof workflowId !== 'string') break;
push.broadcast('workflowActivated', { workflowId });
break;
}
case 'display-workflow-deactivation': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId } = message.payload ?? {};
if (typeof workflowId !== 'string') break;
push.broadcast('workflowDeactivated', { workflowId });
break;
}
case 'workflow-failed-to-activate': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;

View file

@ -7,8 +7,11 @@ export type RedisServiceCommand =
| 'stopWorker'
| 'reloadLicense'
| 'reloadExternalSecretsProviders'
| 'workflowActiveStateChanged' // multi-main only
| 'workflowFailedToActivate' // multi-main only
| 'display-workflow-activation' // multi-main only
| 'display-workflow-deactivation' // multi-main only
| 'add-webhooks-triggers-and-pollers' // multi-main only
| 'remove-triggers-and-pollers' // multi-main only
| 'workflow-failed-to-activate' // multi-main only
| 'relay-execution-lifecycle-event' // multi-main only
| 'clear-test-webhooks'; // multi-main only

View file

@ -80,8 +80,6 @@ export class WorkflowService {
);
}
const oldState = shared.workflow.active;
if (
!forceSave &&
workflow.versionId !== '' &&
@ -227,17 +225,6 @@ export class WorkflowService {
await this.orchestrationService.init();
const newState = updatedWorkflow.active;
if (this.orchestrationService.isMultiMainSetupEnabled && oldState !== newState) {
await this.orchestrationService.publish('workflowActiveStateChanged', {
workflowId,
oldState,
newState,
versionId: shared.workflow.versionId,
});
}
return updatedWorkflow;
}

View file

@ -1,6 +1,6 @@
import { Container } from 'typedi';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
import type { IWebhookData } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
@ -14,13 +14,11 @@ import * as AdditionalData from '@/WorkflowExecuteAdditionalData';
import type { User } from '@db/entities/User';
import type { WebhookEntity } from '@db/entities/WebhookEntity';
import { NodeTypes } from '@/NodeTypes';
import { OrchestrationService } from '@/services/orchestration.service';
import { ExecutionService } from '@/executions/execution.service';
import { WorkflowService } from '@/workflows/workflow.service';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { mockInstance } from '../shared/mocking';
import { chooseRandomly } from './shared/random';
import { setSchedulerAsLoadedNode } from './shared/utils';
import * as testDb from './shared/testDb';
import { createOwner } from './shared/db/users';
@ -33,11 +31,6 @@ mockInstance(ExecutionService);
mockInstance(WorkflowService);
const webhookService = mockInstance(WebhookService);
const orchestrationService = mockInstance(OrchestrationService, {
isMultiMainSetupEnabled: false,
isLeader: false,
isFollower: false,
});
setSchedulerAsLoadedNode();
@ -47,14 +40,6 @@ let activeWorkflowsService: ActiveWorkflowsService;
let activeWorkflowRunner: ActiveWorkflowRunner;
let owner: User;
const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [
'init',
'create',
'update',
'activate',
'manual',
];
beforeAll(async () => {
await testDb.init();
@ -215,113 +200,6 @@ describe('executeErrorWorkflow()', () => {
});
});
describe('add()', () => {
describe('in single-main scenario', () => {
test('should add webhooks, triggers and pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
describe('in multi-main scenario', () => {
describe('leader', () => {
describe('on non-leadership-change activation mode', () => {
test('should add webhooks only', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
const workflow = await createWorkflow({ active: true }, owner);
jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true);
jest.replaceProperty(orchestrationService, 'isLeader', true);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
describe('on leadership change activation mode', () => {
test('should add triggers and pollers only', async () => {
const mode = 'leadershipChange';
jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true);
jest.replaceProperty(orchestrationService, 'isLeader', true);
const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
});
describe('follower', () => {
describe('on any activation mode', () => {
test('should not add webhooks, triggers or pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true);
jest.replaceProperty(orchestrationService, 'isLeader', false);
const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
});
});
});
});
});
describe('addWebhooks()', () => {
test('should call `WebhookService.storeWebhook()`', async () => {
const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData;

View file

@ -31,9 +31,12 @@ export { setupTestServer } from './testServer';
* Initialize node types.
*/
export async function initActiveWorkflowRunner() {
mockInstance(Push);
mockInstance(OrchestrationService);
mockInstance(OrchestrationService, {
isMultiMainSetupEnabled: false,
shouldAddWebhooks: jest.fn().mockReturnValue(true),
});
mockInstance(Push);
mockInstance(ExecutionService);
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
const workflowRunner = Container.get(ActiveWorkflowRunner);

View file

@ -83,35 +83,4 @@ describe('update()', () => {
expect(addSpy).not.toHaveBeenCalled();
});
test('should broadcast active workflow state change if state changed', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const publishSpy = jest.spyOn(orchestrationService, 'publish');
workflow.active = false;
await workflowService.update(owner, workflow, workflow.id);
expect(publishSpy).toHaveBeenCalledTimes(1);
expect(publishSpy).toHaveBeenCalledWith(
'workflowActiveStateChanged',
expect.objectContaining({
newState: false,
oldState: true,
workflowId: workflow.id,
}),
);
});
test('should not broadcast active workflow state change if state did not change', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const publishSpy = jest.spyOn(orchestrationService, 'publish');
await workflowService.update(owner, workflow, workflow.id);
expect(publishSpy).not.toHaveBeenCalled();
});
});

View file

@ -33,7 +33,10 @@ export const error = (e: unknown, options?: ReportingOptions) => {
if (toReport) instance.report(toReport, options);
};
export const report = error;
export const info = (msg: string, options?: ReportingOptions) => {
Logger.info(msg);
instance.report(msg, options);
};
export const warn = (warning: Error | string, options?: ReportingOptions) =>
error(warning, { level: 'warning', ...options });

View file

@ -1986,10 +1986,10 @@ export type WorkflowExecuteMode =
export type WorkflowActivateMode =
| 'init'
| 'create'
| 'create' // unused
| 'update'
| 'activate'
| 'manual'
| 'manual' // unused
| 'leadershipChange';
export interface IWorkflowHooksOptionalParameters {