feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario (#7643)

Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926

### Manual workflow activation and deactivation

In a multi-main scenario, if the user manually activates or deactivates
a workflow, the process (whether leader or follower) that handles the
PATCH request and updates its internal state should send a message into
the command channel, so that all other main processes update their
internal state accordingly:

- Add to `ActiveWorkflows` if activating
- Remove from `ActiveWorkflows` if deactivating
- Remove and re-add to `ActiveWorkflows` if the update did not change
activation status.

After updating their internal state, if activating or deactivating, the
recipient main processes should push a message to all connected
frontends so that these can update their stores and so reflect the value
in the UI.

### Workflow activation errors

On failure to activate a workflow, the main instance should record the
error in Redis - main instances should always pull activation errors
from Redis in a multi-main scenario.

### Leadership change

On leadership change...

- The old leader should stop pruning and the new leader should start
pruning.
- The old leader should remove trigger- and poller-based workflows and
the new leader should add them.
This commit is contained in:
Iván Ovejero 2023-11-17 15:58:50 +01:00 committed by GitHub
parent b3a3f16bc2
commit 4c4082503c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 639 additions and 336 deletions

View file

@ -0,0 +1,52 @@
import { Service } from 'typedi';
import { CacheService } from './services/cache.service';
import { jsonParse } from 'n8n-workflow';
type ActivationErrors = {
[workflowId: string]: string; // error message
};
@Service()
export class ActivationErrorsService {
private readonly cacheKey = 'workflow-activation-errors';
constructor(private readonly cacheService: CacheService) {}
async set(workflowId: string, errorMessage: string) {
const errors = await this.getAll();
errors[workflowId] = errorMessage;
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}
async unset(workflowId: string) {
const errors = await this.getAll();
if (Object.keys(errors).length === 0) return;
delete errors[workflowId];
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}
async get(workflowId: string) {
const errors = await this.getAll();
if (Object.keys(errors).length === 0) return null;
return errors[workflowId];
}
async getAll() {
const errors = await this.cacheService.get<string>(this.cacheKey);
if (!errors) return {};
return jsonParse<ActivationErrors>(errors);
}
async clearAll() {
await this.cacheService.delete(this.cacheKey);
}
}

View file

@ -2,8 +2,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import config from '@/config';
import type {
ExecutionError,
@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import config from '@/config';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { ActivationErrorsService } from '@/ActivationErrors.service';
const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflows = new ActiveWorkflows();
private activationErrors: {
[workflowId: string]: {
time: number; // ms
error: {
message: string;
};
};
} = {};
private queuedActivations: {
[workflowId: string]: {
activationMode: WorkflowActivateMode;
@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
};
} = {};
isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService,
) {}
async init() {
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await this.multiMainInstancePublisher.init();
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
await this.multiMainSetup.init();
}
await this.addActiveWorkflows('init');
@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
async allActiveInStorage(user?: User) {
const isFullAccess = !user || user.globalRole.name === 'owner';
const activationErrors = await this.activationErrorsService.getAll();
if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]);
.filter((workflowId) => !activationErrors[workflowId]);
}
const where = whereClause({
@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]);
.filter((workflowId) => !activationErrors[workflowId]);
}
/**
@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Return error if there was a problem activating the workflow
*/
getActivationError(workflowId: string) {
return this.activationErrors[workflowId];
async getActivationError(workflowId: string) {
return this.activationErrorsService.get(workflowId);
}
/**
@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Remove the workflow as "active"
void this.activeWorkflows.remove(workflowData.id);
this.activationErrors[workflowData.id] = {
time: new Date().getTime(),
error: {
message: error.message,
},
};
void this.activationErrorsService.set(workflowData.id, error.message);
// Run Error Workflow if defined
const activationError = new WorkflowActivationError(
@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
this.logger.verbose('Finished activating workflows (startup)');
}
async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
async clearAllActivationErrors() {
await this.activationErrorsService.clearAll();
}
async addAllTriggerAndPollerBasedWorkflows() {
await this.addActiveWorkflows('leadershipChange');
}
async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}
@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;
if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
}
if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
}
@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
if (shouldAddWebhooks) {
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
}
if (shouldAddTriggersAndPollers) {
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode,
@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId);
if (this.activationErrors[workflowId]) {
delete this.activationErrors[workflowId];
}
await this.activationErrorsService.unset(workflowId);
const triggerCount = this.countTriggers(workflow, additionalData);
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (error) {
this.activationErrors[workflowId] = {
time: new Date().getTime(),
error: {
message: error.message,
},
};
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await this.activationErrorsService.set(workflowId, error.message);
throw error;
throw e;
}
// If for example webhooks get created it sometimes has to save the
@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
);
}
if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
delete this.activationErrors[workflowId];
}
await this.activationErrorsService.unset(workflowId);
if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId);
@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
});
}
}
async removeActivationError(workflowId: string) {
await this.activationErrorsService.unset(workflowId);
}
}

View file

@ -19,7 +19,7 @@ import {
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
@Service()
export class ExternalSecretsManager {
@ -82,7 +82,7 @@ export class ExternalSecretsManager {
}
async broadcastReloadExternalSecretsProviders() {
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
}
private decryptSecretsSettings(value: string): ExternalSecretsSettings {

View file

@ -298,6 +298,7 @@ export interface IDiagnosticInfo {
ldap_allowed: boolean;
saml_enabled: boolean;
binary_data_s3: boolean;
multi_main_setup_enabled: boolean;
licensePlanName?: string;
licenseTenantId?: number;
}
@ -469,7 +470,25 @@ export type IPushData =
| PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged
| PushDataWorkerStatusMessage;
| PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate;
type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
type PushDataWorkflowActivated = {
data: IActiveWorkflowChanged;
type: 'workflowActivated';
};
type PushDataWorkflowDeactivated = {
data: IActiveWorkflowChanged;
type: 'workflowDeactivated';
};
type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged;
@ -536,11 +555,24 @@ export interface IActiveWorkflowUser {
lastSeen: Date;
}
export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}
export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[];
}
interface IActiveWorkflowChanged {
workflowId: Workflow['id'];
}
interface IWorkflowFailedToActivate {
workflowId: Workflow['id'];
errorMessage: string;
}
export interface IPushDataExecutionRecovered {
executionId: string;
}

View file

@ -16,6 +16,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
type FeatureReturnType = Partial<
{
@ -40,6 +41,7 @@ export class License {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly multiMainSetup: MultiMainSetup,
private readonly settingsRepository: SettingsRepository,
private readonly workflowRepository: WorkflowRepository,
) {}
@ -49,6 +51,10 @@ export class License {
return;
}
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
await this.multiMainSetup.init();
}
const isMainInstance = instanceType === 'main';
const server = config.getEnv('license.serverUrl');
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
@ -114,22 +120,28 @@ export class License {
}
async onFeatureChange(_features: TFeatures): Promise<void> {
if (config.getEnv('executions.mode') === 'queue') {
if (config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as
| boolean
| undefined;
this.multiMainSetup.setLicensed(isMultiMainLicensed ?? false);
if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
this.logger.debug(
'[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...',
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) {
this.logger.debug('Instance is follower, skipping sending of reloadLicense command...');
return;
}
if (this.multiMainSetup.isEnabled && !isMultiMainLicensed) {
this.logger.debug(
'[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supporst this feature.',
);
}
}
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();

View file

@ -215,6 +215,7 @@ export class Server extends AbstractServer {
ldap_allowed: isLdapCurrentAuthenticationMethod(),
saml_enabled: isSamlCurrentAuthenticationMethod(),
binary_data_s3: isS3Available && isS3Selected && isS3Licensed,
multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'),
licensePlanName: Container.get(License).getPlanName(),
licenseTenantId: config.getEnv('license.tenantId'),
};
@ -448,7 +449,7 @@ export class Server extends AbstractServer {
// Returns if the workflow with the given id had any activation errors
this.app.get(
`/${this.restEndpoint}/active/error/:id`,
ResponseHelper.send(async (req: WorkflowRequest.GetAllActivationErrors) => {
ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => {
const { id: workflowId } = req.params;
const shared = await Container.get(SharedWorkflowRepository).findOne({

View file

@ -243,21 +243,6 @@ export abstract class BaseCommand extends Command {
}
async initLicense(): Promise<void> {
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) {
this.logger.debug('Instance is follower, skipping license initialization...');
return;
}
}
const license = Container.get(License);
await license.init(this.instanceType ?? 'main');

View file

@ -25,9 +25,10 @@ import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
import { License, FeatureNotLicensedError } from '@/License';
import { IConfig } from '@oclif/config';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { PruningService } from '@/services/pruning.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { SettingsRepository } from '@db/repositories/settings.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository';
@ -112,18 +113,14 @@ export class Start extends BaseCommand {
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
await Container.get(License).shutdown();
if (await this.pruningService.isPruningEnabled()) {
await this.pruningService.stopPruning();
if (this.pruningService.isPruningEnabled()) {
this.pruningService.stopPruning();
}
if (config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await Container.get(MultiMainInstancePublisher).destroy();
await Container.get(MultiMainSetup).shutdown();
}
await Container.get(InternalHooks).onN8nStop();
@ -230,38 +227,42 @@ export class Start extends BaseCommand {
}
async initOrchestration() {
if (config.get('executions.mode') !== 'queue') return;
if (config.getEnv('executions.mode') !== 'queue') return;
if (!config.get('leaderSelection.enabled')) {
await Container.get(SingleMainInstancePublisher).init();
// queue mode in single-main scenario
if (!config.getEnv('multiMainSetup.enabled')) {
await Container.get(SingleMainSetup).init();
await Container.get(OrchestrationHandlerMainService).init();
return;
}
// multi-main scenario
// queue mode in multi-main scenario
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (
multiMainInstancePublisher.isLeader &&
!Container.get(License).isMultipleMainInstancesLicensed()
) {
if (!Container.get(License).isMultipleMainInstancesLicensed()) {
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
}
await Container.get(OrchestrationHandlerMainService).init();
multiMainInstancePublisher.on('leadershipChange', async () => {
if (multiMainInstancePublisher.isLeader) {
const multiMainSetup = Container.get(MultiMainSetup);
await multiMainSetup.init();
multiMainSetup.on('leadershipChange', async () => {
if (multiMainSetup.isLeader) {
this.logger.debug('[Leadership change] Clearing all activation errors...');
await this.activeWorkflowRunner.clearAllActivationErrors();
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else {
// only in case of leadership change without shutdown
this.logger.debug(
'[Leadership change] Removing all trigger- and poller-based workflows...',
);
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
}
});
@ -333,10 +334,7 @@ export class Start extends BaseCommand {
await this.server.start();
this.pruningService = Container.get(PruningService);
if (await this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
}
await this.initPruning();
// Start to get active workflows and run their triggers
await this.activeWorkflowRunner.init();
@ -375,6 +373,32 @@ export class Start extends BaseCommand {
}
}
async initPruning() {
this.pruningService = Container.get(PruningService);
if (this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
}
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const multiMainSetup = Container.get(MultiMainSetup);
await multiMainSetup.init();
multiMainSetup.on('leadershipChange', async () => {
if (multiMainSetup.isLeader) {
if (this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
}
} else {
if (this.pruningService.isPruningEnabled()) {
this.pruningService.stopPruning();
}
}
});
}
}
async catch(error: Error) {
console.log(error.stack);
await this.exitWithCrash('Exiting due to an error.', error);

View file

@ -1324,24 +1324,29 @@ export const schema = {
},
},
leaderSelection: {
multiMainSetup: {
instanceType: {
doc: 'Type of instance in multi-main setup',
format: ['unset', 'leader', 'follower'] as const,
default: 'unset', // only until first leader key check
},
enabled: {
doc: 'Whether to enable leader selection for multiple main instances (license required)',
doc: 'Whether to enable multi-main setup for queue mode (license required)',
format: Boolean,
default: false,
env: 'N8N_LEADER_SELECTION_ENABLED',
env: 'N8N_MULTI_MAIN_SETUP_ENABLED',
},
ttl: {
doc: 'Time to live in Redis for leader selection key, in seconds',
doc: 'Time to live (in seconds) for leader key in multi-main setup',
format: Number,
default: 10,
env: 'N8N_LEADER_SELECTION_KEY_TTL',
env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL',
},
interval: {
doc: 'Interval in Redis for leader selection check, in seconds',
doc: 'Interval (in seconds) for leader check in multi-main setup',
format: Number,
default: 3,
env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL',
env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL',
},
},

View file

@ -1,7 +1,7 @@
import { Authorized, Post, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { License } from '../License';
@Authorized('any')
@ -9,7 +9,7 @@ import { License } from '../License';
@Service()
export class OrchestrationController {
constructor(
private readonly orchestrationService: SingleMainInstancePublisher,
private readonly singleMainSetup: SingleMainSetup,
private readonly licenseService: License,
) {}
@ -21,18 +21,18 @@ export class OrchestrationController {
async getWorkersStatus(req: OrchestrationRequest.Get) {
if (!this.licenseService.isWorkerViewLicensed()) return;
const id = req.params.id;
return this.orchestrationService.getWorkerStatus(id);
return this.singleMainSetup.getWorkerStatus(id);
}
@Post('/worker/status')
async getWorkersStatusAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return this.orchestrationService.getWorkerStatus();
return this.singleMainSetup.getWorkerStatus();
}
@Post('/worker/ids')
async getWorkerIdsAll() {
if (!this.licenseService.isWorkerViewLicensed()) return;
return this.orchestrationService.getWorkerIds();
return this.singleMainSetup.getWorkerIds();
}
}

View file

@ -32,7 +32,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { Logger } from '@/Logger';
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
@ -207,9 +207,7 @@ export class MessageEventBus extends EventEmitter {
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await Container.get(
SingleMainInstancePublisher,
).broadcastRestartEventbusAfterDestinationUpdate();
await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
}
return destination;
}
@ -235,9 +233,7 @@ export class MessageEventBus extends EventEmitter {
delete this.destinations[id];
}
if (notifyWorkers) {
await Container.get(
SingleMainInstancePublisher,
).broadcastRestartEventbusAfterDestinationUpdate();
await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
}
return result;
}

View file

@ -117,7 +117,7 @@ export declare namespace WorkflowRequest {
type GetAllActive = AuthenticatedRequest;
type GetAllActivationErrors = Get;
type GetActivationError = Get;
type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>;

View file

@ -80,21 +80,21 @@ export class CacheService extends EventEmitter {
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
*/
async get(
async get<T = unknown>(
key: string,
options: {
fallbackValue?: unknown;
refreshFunction?: (key: string) => Promise<unknown>;
fallbackValue?: T;
refreshFunction?: (key: string) => Promise<T>;
refreshTtl?: number;
} = {},
): Promise<unknown> {
): Promise<T | undefined> {
if (!key || key.length === 0) {
return;
}
const value = await this.cache?.store.get(key);
if (value !== undefined) {
this.emit(this.metricsCounterEvents.cacheHit);
return value;
return value as T;
}
this.emit(this.metricsCounterEvents.cacheMiss);
if (options.refreshFunction) {

View file

@ -5,7 +5,7 @@ import config from '@/config';
import { EventEmitter } from 'node:events';
export abstract class OrchestrationService extends EventEmitter {
protected initialized = false;
protected isInitialized = false;
protected queueModeId: string;
@ -36,17 +36,17 @@ export abstract class OrchestrationService extends EventEmitter {
}
sanityCheck(): boolean {
return this.initialized && this.isQueueMode;
return this.isInitialized && this.isQueueMode;
}
async init() {
await this.initPublisher();
this.initialized = true;
this.isInitialized = true;
}
async shutdown() {
await this.redisPublisher?.destroy();
this.initialized = false;
this.isInitialized = false;
}
protected async initPublisher() {

View file

@ -1,50 +1,61 @@
import config from '@/config';
import { Service } from 'typedi';
import { TIME } from '@/constants';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
/**
* For use in main instance, in multiple main instances cluster.
*/
@Service()
export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
export class MultiMainSetup extends SingleMainSetup {
private id = this.queueModeId;
private leaderId: string | undefined;
private isLicensed = false;
get isEnabled() {
return (
config.getEnv('executions.mode') === 'queue' &&
config.getEnv('multiMainSetup.enabled') &&
this.isLicensed
);
}
get isLeader() {
return this.id === this.leaderId;
return config.getEnv('multiMainSetup.instanceType') === 'leader';
}
get isFollower() {
return !this.isLeader;
}
setLicensed(newState: boolean) {
this.isLicensed = newState;
}
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl');
private leaderCheckInterval: NodeJS.Timer | undefined;
async init() {
if (this.initialized) return;
if (this.isInitialized) return;
await this.initPublisher();
this.initialized = true;
this.isInitialized = true;
await this.tryBecomeLeader();
await this.tryBecomeLeader(); // prevent initial wait
this.leaderCheckInterval = setInterval(
async () => {
await this.checkLeader();
},
config.getEnv('leaderSelection.interval') * TIME.SECOND,
config.getEnv('multiMainSetup.interval') * TIME.SECOND,
);
}
async destroy() {
async shutdown() {
if (!this.isInitialized) return;
clearInterval(this.leaderCheckInterval);
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
@ -69,12 +80,17 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
} else {
this.logger.debug(`Leader is other instance "${leaderId}"`);
this.leaderId = leaderId;
config.set('multiMainSetup.instanceType', 'follower');
}
}
private async tryBecomeLeader() {
if (this.isLeader || !this.redisPublisher.redisClient) return;
if (
config.getEnv('multiMainSetup.instanceType') === 'leader' ||
!this.redisPublisher.redisClient
) {
return;
}
// this can only succeed if leadership is currently vacant
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
@ -82,11 +98,36 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
if (keySetSuccessfully) {
this.logger.debug(`Leader is now this instance "${this.id}"`);
this.leaderId = this.id;
this.emit('leadershipChange', this.id);
config.set('multiMainSetup.instanceType', 'leader');
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
this.emit('leadershipChange', this.id);
} else {
config.set('multiMainSetup.instanceType', 'follower');
}
}
async broadcastWorkflowActiveStateChanged(payload: {
workflowId: string;
oldState: boolean;
newState: boolean;
versionId: string;
}) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowActiveStateChanged',
payload,
});
}
async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowFailedToActivate',
payload,
});
}
}

View file

@ -6,13 +6,13 @@ import { OrchestrationService } from '@/services/orchestration.base.service';
* For use in main instance, in single main instance scenario.
*/
@Service()
export class SingleMainInstancePublisher extends OrchestrationService {
export class SingleMainSetup extends OrchestrationService {
constructor(protected readonly logger: Logger) {
super();
}
sanityCheck() {
return this.initialized && this.isQueueMode && this.isMainInstance;
return this.isInitialized && this.isQueueMode && this.isMainInstance;
}
async getWorkerStatus(id?: string) {

View file

@ -5,12 +5,17 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { License } from '@/License';
import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { Push } from '@/push';
import { MultiMainSetup } from './MultiMainSetup.ee';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
export async function handleCommandMessageMain(messageString: string) {
const queueModeId = config.get('redis.queueModeId');
const isMainInstance = config.get('generic.instanceType') === 'main';
const queueModeId = config.getEnv('redis.queueModeId');
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
const message = messageToRedisServiceCommandObject(messageString);
const logger = Container.get(Logger);
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
if (message) {
logger.debug(
@ -35,7 +40,7 @@ export async function handleCommandMessageMain(messageString: string) {
return message;
}
if (isMainInstance && !config.getEnv('leaderSelection.enabled')) {
if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
logger.error(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
@ -60,6 +65,68 @@ export async function handleCommandMessageMain(messageString: string) {
return message;
}
await Container.get(ExternalSecretsManager).reloadAllProviders();
break;
case 'workflowActiveStateChanged': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, oldState, newState, versionId } = message.payload ?? {};
if (
typeof workflowId !== 'string' ||
typeof oldState !== 'boolean' ||
typeof newState !== 'boolean' ||
typeof versionId !== 'string'
) {
break;
}
const push = Container.get(Push);
if (!oldState && newState) {
try {
await activeWorkflowRunner.add(workflowId, 'activate');
push.broadcast('workflowActivated', { workflowId });
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await Container.get(WorkflowRepository).update(workflowId, {
active: false,
versionId,
});
await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({
workflowId,
errorMessage: error.message,
});
}
} else if (oldState && !newState) {
await activeWorkflowRunner.remove(workflowId);
push.broadcast('workflowDeactivated', { workflowId });
} else {
await activeWorkflowRunner.remove(workflowId);
await activeWorkflowRunner.add(workflowId, 'update');
}
await activeWorkflowRunner.removeActivationError(workflowId);
}
case 'workflowFailedToActivate': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, errorMessage } = message.payload ?? {};
if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break;
Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage });
}
default:
break;
}

View file

@ -4,6 +4,6 @@ import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationWebhookService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWebhookInstance;
return this.isInitialized && this.isQueueMode && this.isWebhookInstance;
}
}

View file

@ -5,7 +5,7 @@ import { OrchestrationService } from '../../orchestration.base.service';
@Service()
export class OrchestrationWorkerService extends OrchestrationService {
sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWorkerInstance;
return this.isInitialized && this.isQueueMode && this.isWorkerInstance;
}
async publishToEventLog(message: AbstractEventMessage) {

View file

@ -1,4 +1,4 @@
import Container, { Service } from 'typedi';
import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
@ -23,16 +23,13 @@ export class PruningService {
public hardDeletionTimeout: NodeJS.Timeout | undefined;
private isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
) {}
async isPruningEnabled() {
isPruningEnabled() {
if (
!config.getEnv('executions.pruneData') ||
inTest ||
@ -41,75 +38,60 @@ export class PruningService {
return false;
}
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
return multiMainInstancePublisher.isLeader;
if (
config.getEnv('multiMainSetup.enabled') &&
config.getEnv('multiMainSetup.instanceType') === 'follower'
) {
return false;
}
return true;
}
/**
* @important Call only after DB connection is established and migrations have completed.
* @important Call this method only after DB migrations have completed.
*/
startPruning() {
this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');
this.setSoftDeletionInterval();
this.scheduleHardDeletion();
}
async stopPruning() {
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) return;
}
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
stopPruning() {
this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');
clearInterval(this.softDeletionInterval);
clearTimeout(this.hardDeletionTimeout);
}
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.softDeletionInterval = setInterval(
async () => this.softDeleteOnPruningCycle(),
this.rates.softDeletion,
);
this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`);
}
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.hardDeletionTimeout = setTimeout(
async () => this.hardDeleteOnPruningCycle(),
this.rates.hardDeletion,
);
this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`);
}
/**
* Mark executions as deleted based on age and count, in a pruning cycle.
*/
async softDeleteOnPruningCycle() {
this.logger.debug('Starting soft-deletion of executions (pruning cycle)');
this.logger.debug('[Pruning] Starting soft-deletion of executions');
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
@ -157,8 +139,11 @@ export class PruningService {
.execute();
if (result.affected === 0) {
this.logger.debug('Found no executions to soft-delete (pruning cycle)');
this.logger.debug('[Pruning] Found no executions to soft-delete');
return;
}
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
}
/**
@ -187,21 +172,23 @@ export class PruningService {
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
if (executionIds.length === 0) {
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
this.logger.debug('[Pruning] Found no executions to hard-delete');
this.scheduleHardDeletion();
return;
}
try {
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
executionIds,
});
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
await this.executionRepository.delete({ id: In(executionIds) });
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
} catch (error) {
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
this.logger.error('[Pruning] Failed to hard-delete executions', {
executionIds,
error: error instanceof Error ? error.message : `${error}`,
});

View file

@ -6,7 +6,9 @@ export type RedisServiceCommand =
| 'restartEventBus'
| 'stopWorker'
| 'reloadLicense'
| 'reloadExternalSecretsProviders';
| 'reloadExternalSecretsProviders'
| 'workflowActiveStateChanged' // multi-main only
| 'workflowFailedToActivate'; // multi-main only
/**
* An object to be sent via Redis pub/sub from the main process to the workers.
@ -50,6 +52,14 @@ export type RedisServiceWorkerResponseObject = {
| {
command: 'stopWorker';
}
| {
command: 'workflowActiveStateChanged';
payload: {
oldState: boolean;
newState: boolean;
workflowId: string;
};
}
);
export type RedisServiceCommandObject = {

View file

@ -30,6 +30,7 @@ import { isStringArray, isWorkflowIdValid } from '@/utils';
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
import { BinaryDataService } from 'n8n-core';
import { Logger } from '@/Logger';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository';
@ -212,6 +213,8 @@ export class WorkflowsService {
);
}
const oldState = shared.workflow.active;
if (
!forceSave &&
workflow.versionId !== '' &&
@ -255,9 +258,14 @@ export class WorkflowsService {
await Container.get(ExternalHooks).run('workflow.update', [workflow]);
/**
* If the workflow being updated is stored as `active`, remove it from
* active workflows in memory, and re-add it after the update.
*
* If a trigger or poller in the workflow was updated, the new value
* will take effect only on removing and re-adding.
*/
if (shared.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await Container.get(ActiveWorkflowRunner).remove(workflowId);
}
@ -364,6 +372,21 @@ export class WorkflowsService {
}
}
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const multiMainSetup = Container.get(MultiMainSetup);
await multiMainSetup.init();
if (multiMainSetup.isEnabled) {
await Container.get(MultiMainSetup).broadcastWorkflowActiveStateChanged({
workflowId,
oldState,
newState: updatedWorkflow.active,
versionId: shared.workflow.versionId,
});
}
}
return updatedWorkflow;
}

View file

@ -17,10 +17,9 @@ import { WorkflowRunner } from '@/WorkflowRunner';
import type { User } from '@db/entities/User';
import type { WebhookEntity } from '@db/entities/WebhookEntity';
import { NodeTypes } from '@/NodeTypes';
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
import { mockInstance } from '../shared/mocking';
import { chooseRandomly } from './shared/random';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { mockInstance } from '../shared/mocking';
import { setSchedulerAsLoadedNode } from './shared/utils';
import * as testDb from './shared/testDb';
import { createOwner } from './shared/db/users';
@ -30,9 +29,13 @@ mockInstance(ActiveExecutions);
mockInstance(ActiveWorkflows);
mockInstance(Push);
mockInstance(SecretsHelper);
mockInstance(MultiMainInstancePublisher);
const webhookService = mockInstance(WebhookService);
const multiMainSetup = mockInstance(MultiMainSetup, {
isEnabled: false,
isLeader: false,
isFollower: false,
});
setSchedulerAsLoadedNode();
@ -230,7 +233,7 @@ describe('executeErrorWorkflow()', () => {
describe('add()', () => {
describe('in single-main scenario', () => {
test('leader should add webhooks, triggers and pollers', async () => {
test('should add webhooks, triggers and pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
const workflow = await createWorkflow({ active: true }, owner);
@ -252,17 +255,20 @@ describe('add()', () => {
describe('in multi-main scenario', () => {
describe('leader', () => {
test('on regular activation mode, leader should add webhooks only', async () => {
describe('on non-leadership-change activation mode', () => {
test('should add webhooks only', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await createWorkflow({ active: true }, owner);
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
jest.replaceProperty(multiMainSetup, 'isLeader', true);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
@ -273,18 +279,22 @@ describe('add()', () => {
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
test('on activation via leadership change, leader should add triggers and pollers only', async () => {
describe('on leadership change activation mode', () => {
test('should add triggers and pollers only', async () => {
const mode = 'leadershipChange';
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
jest.replaceProperty(multiMainSetup, 'isLeader', true);
const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
@ -296,19 +306,23 @@ describe('add()', () => {
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
});
describe('follower', () => {
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => {
describe('on any activation mode', () => {
test('should not add webhooks, triggers or pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: false });
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
jest.replaceProperty(multiMainSetup, 'isLeader', false);
const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
@ -322,6 +336,7 @@ describe('add()', () => {
});
});
});
});
describe('addWebhooks()', () => {
test('should call `WebhookService.storeWebhook()`', async () => {

View file

@ -1,55 +0,0 @@
import * as Config from '@oclif/config';
import { DataSource } from 'typeorm';
import { Start } from '@/commands/start';
import { BaseCommand } from '@/commands/BaseCommand';
import config from '@/config';
import { License } from '@/License';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
import { RedisService } from '@/services/redis.service';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { mockInstance } from '../../shared/mocking';
const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
beforeAll(() => {
mockInstance(DataSource);
mockInstance(ExternalSecretsManager);
mockInstance(ActiveWorkflowRunner);
mockInstance(WorkflowHistoryManager);
mockInstance(RedisService);
mockInstance(RedisServicePubSubPublisher);
mockInstance(RedisServicePubSubSubscriber);
mockInstance(MultiMainInstancePublisher);
mockInstance(OrchestrationHandlerMainService);
});
afterEach(() => {
config.load(config.default);
jest.restoreAllMocks();
});
test('should not init license if instance is follower in multi-main scenario', async () => {
config.set('executions.mode', 'queue');
config.set('endpoints.disableUi', true);
config.set('leaderSelection.enabled', true);
jest.spyOn(MultiMainInstancePublisher.prototype, 'isFollower', 'get').mockReturnValue(true);
jest.spyOn(BaseCommand.prototype, 'init').mockImplementation(async () => {});
const licenseMock = mockInstance(License, {
isMultipleMainInstancesLicensed: jest.fn().mockReturnValue(true),
});
const startCmd = new Start([], oclifConfig);
await startCmd.init();
expect(licenseMock.init).not.toHaveBeenCalled();
});

View file

@ -16,6 +16,7 @@ import { PostHogClient } from '@/posthog';
import { RedisService } from '@/services/redis.service';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { mockInstance } from '../../shared/mocking';
@ -37,6 +38,7 @@ beforeAll(async () => {
mockInstance(RedisService);
mockInstance(RedisServicePubSubPublisher);
mockInstance(RedisServicePubSubSubscriber);
mockInstance(MultiMainSetup);
});
test('worker initializes all its components', async () => {

View file

@ -16,6 +16,7 @@ import { AUTH_COOKIE_NAME } from '@/constants';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { SettingsRepository } from '@db/repositories/settings.repository';
import { mockNodeTypesData } from '../../../unit/Helpers';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { mockInstance } from '../../../shared/mocking';
export { setupTestServer } from './testServer';
@ -28,6 +29,8 @@ export { setupTestServer } from './testServer';
* Initialize node types.
*/
export async function initActiveWorkflowRunner() {
mockInstance(MultiMainSetup);
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
const workflowRunner = Container.get(ActiveWorkflowRunner);
await workflowRunner.init();

View file

@ -0,0 +1,62 @@
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as testDb from './shared/testDb';
import { WorkflowsService } from '@/workflows/workflows.services';
import { mockInstance } from '../shared/mocking';
import { Telemetry } from '@/telemetry';
import { createOwner } from './shared/db/users';
import { createWorkflow } from './shared/db/workflows';
mockInstance(Telemetry);
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
beforeAll(async () => {
await testDb.init();
});
afterEach(async () => {
await testDb.truncate(['Workflow']);
jest.restoreAllMocks();
});
afterAll(async () => {
await testDb.terminate();
});
describe('update()', () => {
test('should remove and re-add to active workflows on `active: true` payload', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
await WorkflowsService.update(owner, workflow, workflow.id);
expect(removeSpy).toHaveBeenCalledTimes(1);
const [removedWorkflowId] = removeSpy.mock.calls[0];
expect(removedWorkflowId).toBe(workflow.id);
expect(addSpy).toHaveBeenCalledTimes(1);
const [addedWorkflowId, activationMode] = addSpy.mock.calls[0];
expect(addedWorkflowId).toBe(workflow.id);
expect(activationMode).toBe('update');
});
test('should remove from active workflows on `active: false` payload', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
workflow.active = false;
await WorkflowsService.update(owner, workflow, workflow.id);
expect(removeSpy).toHaveBeenCalledTimes(1);
const [removedWorkflowId] = removeSpy.mock.calls[0];
expect(removedWorkflowId).toBe(workflow.id);
expect(addSpy).not.toHaveBeenCalled();
});
});

View file

@ -6,6 +6,7 @@ import { License } from '@/License';
import { Logger } from '@/Logger';
import { N8N_VERSION } from '@/constants';
import { mockInstance } from '../shared/mocking';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
jest.mock('@n8n_io/license-sdk');
@ -27,9 +28,10 @@ describe('License', () => {
let license: License;
const logger = mockInstance(Logger);
const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID });
const multiMainSetup = mockInstance(MultiMainSetup);
beforeEach(async () => {
license = new License(logger, instanceSettings, mock(), mock());
license = new License(logger, instanceSettings, mock(), mock(), mock());
await license.init();
});
@ -52,7 +54,7 @@ describe('License', () => {
});
test('initializes license manager for worker', async () => {
license = new License(logger, instanceSettings, mock(), mock());
license = new License(logger, instanceSettings, mock(), mock(), mock());
await license.init('worker');
expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: false,

View file

@ -1,6 +1,6 @@
import Container from 'typedi';
import config from '@/config';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
import { eventBus } from '@/eventbus';
import { RedisService } from '@/services/redis.service';
@ -10,10 +10,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o
import * as helpers from '@/services/orchestration/helpers';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { Logger } from '@/Logger';
import { Push } from '@/push';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { mockInstance } from '../../shared/mocking';
const os = Container.get(SingleMainInstancePublisher);
const os = Container.get(SingleMainSetup);
const handler = Container.get(OrchestrationHandlerMainService);
mockInstance(ActiveWorkflowRunner);
let queueModeId: string;
@ -33,6 +36,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
describe('Orchestration Service', () => {
const logger = mockInstance(Logger);
mockInstance(Push);
beforeAll(async () => {
mockInstance(RedisService);
mockInstance(ExternalSecretsManager);

View file

@ -421,7 +421,25 @@ export type IPushData =
| PushDataRemoveNodeType
| PushDataTestWebhook
| PushDataExecutionRecovered
| PushDataWorkerStatusMessage;
| PushDataWorkerStatusMessage
| PushDataActiveWorkflowAdded
| PushDataActiveWorkflowRemoved
| PushDataWorkflowFailedToActivate;
type PushDataActiveWorkflowAdded = {
data: IActiveWorkflowAdded;
type: 'workflowActivated';
};
type PushDataActiveWorkflowRemoved = {
data: IActiveWorkflowRemoved;
type: 'workflowDeactivated';
};
type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
type PushDataExecutionRecovered = {
data: IPushDataExecutionRecovered;
@ -491,6 +509,19 @@ export interface IPushDataExecutionFinished {
retryOf?: string;
}
export interface IActiveWorkflowAdded {
workflowId: string;
}
export interface IActiveWorkflowRemoved {
workflowId: string;
}
export interface IWorkflowFailedToActivate {
workflowId: string;
errorMessage: string;
}
export interface IPushDataUnsavedExecutionFinished {
executionId: string;
data: { finished: true; stoppedAt: Date };

View file

@ -119,7 +119,7 @@ export default defineComponent({
} else {
errorMessage = this.$locale.baseText(
'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined',
{ interpolate: { message: errorData.error.message } },
{ interpolate: { message: errorData } },
);
}
} catch (error) {

View file

@ -291,6 +291,33 @@ export const pushConnection = defineComponent({
}
}
if (
receivedData.type === 'workflowFailedToActivate' &&
this.workflowsStore.workflowId === receivedData.data.workflowId
) {
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
this.workflowsStore.setActive(false);
this.showError(
new Error(receivedData.data.errorMessage),
this.$locale.baseText('workflowActivator.showError.title', {
interpolate: { newStateName: 'activated' },
}) + ':',
);
return true;
}
if (receivedData.type === 'workflowActivated') {
this.workflowsStore.setWorkflowActive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'workflowDeactivated') {
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
// The workflow finished executing
let pushData: IPushDataExecutionFinished;

View file

@ -10,7 +10,6 @@ import {
} from '@/constants';
import type {
ExecutionsQueryFilter,
IActivationError,
IExecutionDeleteFilter,
IExecutionPushResponse,
IExecutionResponse,
@ -365,7 +364,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
});
},
async getActivationError(id: string): Promise<IActivationError | undefined> {
async getActivationError(id: string): Promise<string | undefined> {
const rootStore = useRootStore();
return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`);
},
@ -552,6 +551,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
if (this.workflowsById[workflowId]) {
this.workflowsById[workflowId].active = true;
}
if (workflowId === this.workflow.id) {
this.setActive(true);
}
},
setWorkflowInactive(workflowId: string): void {
@ -562,6 +564,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
if (this.workflowsById[workflowId]) {
this.workflowsById[workflowId].active = false;
}
if (workflowId === this.workflow.id) {
this.setActive(false);
}
},
async fetchActiveWorkflows(): Promise<string[]> {