feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario (#7643)

Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926

### Manual workflow activation and deactivation

In a multi-main scenario, if the user manually activates or deactivates
a workflow, the process (whether leader or follower) that handles the
PATCH request and updates its internal state should send a message into
the command channel, so that all other main processes update their
internal state accordingly:

- Add to `ActiveWorkflows` if activating
- Remove from `ActiveWorkflows` if deactivating
- Remove and re-add to `ActiveWorkflows` if the update did not change
activation status.

After updating their internal state, if activating or deactivating, the
recipient main processes should push a message to all connected
frontends so that these can update their stores and so reflect the value
in the UI.

### Workflow activation errors

On failure to activate a workflow, the main instance should record the
error in Redis - main instances should always pull activation errors
from Redis in a multi-main scenario.

### Leadership change

On leadership change...

- The old leader should stop pruning and the new leader should start
pruning.
- The old leader should remove trigger- and poller-based workflows and
the new leader should add them.
This commit is contained in:
Iván Ovejero 2023-11-17 15:58:50 +01:00 committed by GitHub
parent b3a3f16bc2
commit 4c4082503c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 639 additions and 336 deletions

View file

@ -0,0 +1,52 @@
import { Service } from 'typedi';
import { CacheService } from './services/cache.service';
import { jsonParse } from 'n8n-workflow';
type ActivationErrors = {
[workflowId: string]: string; // error message
};
@Service()
export class ActivationErrorsService {
private readonly cacheKey = 'workflow-activation-errors';
constructor(private readonly cacheService: CacheService) {}
async set(workflowId: string, errorMessage: string) {
const errors = await this.getAll();
errors[workflowId] = errorMessage;
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}
async unset(workflowId: string) {
const errors = await this.getAll();
if (Object.keys(errors).length === 0) return;
delete errors[workflowId];
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
}
async get(workflowId: string) {
const errors = await this.getAll();
if (Object.keys(errors).length === 0) return null;
return errors[workflowId];
}
async getAll() {
const errors = await this.cacheService.get<string>(this.cacheKey);
if (!errors) return {};
return jsonParse<ActivationErrors>(errors);
}
async clearAll() {
await this.cacheService.delete(this.cacheKey);
}
}

View file

@ -2,8 +2,9 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import Container, { Service } from 'typedi'; import { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import config from '@/config';
import type { import type {
ExecutionError, ExecutionError,
@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger'; import { Logger } from './Logger';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository';
import config from '@/config'; import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee'; import { ActivationErrorsService } from '@/ActivationErrors.service';
const WEBHOOK_PROD_UNREGISTERED_HINT = const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
export class ActiveWorkflowRunner implements IWebhookManager { export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflows = new ActiveWorkflows(); activeWorkflows = new ActiveWorkflows();
private activationErrors: {
[workflowId: string]: {
time: number; // ms
error: {
message: string;
};
};
} = {};
private queuedActivations: { private queuedActivations: {
[workflowId: string]: { [workflowId: string]: {
activationMode: WorkflowActivateMode; activationMode: WorkflowActivateMode;
@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}; };
} = {}; } = {};
isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions, private readonly activeExecutions: ActiveExecutions,
@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
private readonly webhookService: WebhookService, private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository, private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository, private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService,
) {} ) {}
async init() { async init() {
if (this.isMultiMainScenario) { if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const { MultiMainInstancePublisher } = await import( await this.multiMainSetup.init();
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await this.multiMainInstancePublisher.init();
} }
await this.addActiveWorkflows('init'); await this.addActiveWorkflows('init');
@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
async allActiveInStorage(user?: User) { async allActiveInStorage(user?: User) {
const isFullAccess = !user || user.globalRole.name === 'owner'; const isFullAccess = !user || user.globalRole.name === 'owner';
const activationErrors = await this.activationErrorsService.getAll();
if (isFullAccess) { if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({ const activeWorkflows = await this.workflowRepository.find({
select: ['id'], select: ['id'],
@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return activeWorkflows return activeWorkflows
.map((workflow) => workflow.id) .map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]); .filter((workflowId) => !activationErrors[workflowId]);
} }
const where = whereClause({ const where = whereClause({
@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return sharings return sharings
.map((sharing) => sharing.workflowId) .map((sharing) => sharing.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]); .filter((workflowId) => !activationErrors[workflowId]);
} }
/** /**
@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/** /**
* Return error if there was a problem activating the workflow * Return error if there was a problem activating the workflow
*/ */
getActivationError(workflowId: string) { async getActivationError(workflowId: string) {
return this.activationErrors[workflowId]; return this.activationErrorsService.get(workflowId);
} }
/** /**
@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Remove the workflow as "active" // Remove the workflow as "active"
void this.activeWorkflows.remove(workflowData.id); void this.activeWorkflows.remove(workflowData.id);
this.activationErrors[workflowData.id] = {
time: new Date().getTime(), void this.activationErrorsService.set(workflowData.id, error.message);
error: {
message: error.message,
},
};
// Run Error Workflow if defined // Run Error Workflow if defined
const activationError = new WorkflowActivationError( const activationError = new WorkflowActivationError(
@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
this.logger.verbose('Finished activating workflows (startup)'); this.logger.verbose('Finished activating workflows (startup)');
} }
async addAllTriggerAndPollerBasedWorkflows() { async clearAllActivationErrors() {
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...'); await this.activationErrorsService.clearAll();
}
async addAllTriggerAndPollerBasedWorkflows() {
await this.addActiveWorkflows('leadershipChange'); await this.addActiveWorkflows('leadershipChange');
} }
async removeAllTriggerAndPollerBasedWorkflows() { async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
} }
@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
let shouldAddWebhooks = true; let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true; let shouldAddTriggersAndPollers = true;
if (this.isMultiMainScenario && activationMode !== 'leadershipChange') { if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false; shouldAddWebhooks = this.multiMainSetup.isLeader;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false; shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
} }
if (this.isMultiMainScenario && activationMode === 'leadershipChange') { if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
shouldAddWebhooks = false; shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true; shouldAddTriggersAndPollers = true;
} }
@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id); const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
if (shouldAddWebhooks) { if (shouldAddWebhooks) {
this.logger.debug('============'); this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode); await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
} }
if (shouldAddTriggersAndPollers) { if (shouldAddTriggersAndPollers) {
this.logger.debug('============'); this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
await this.addTriggersAndPollers(dbWorkflow, workflow, { await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode, activationMode,
@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// Workflow got now successfully activated so make sure nothing is left in the queue // Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
if (this.activationErrors[workflowId]) { await this.activationErrorsService.unset(workflowId);
delete this.activationErrors[workflowId];
}
const triggerCount = this.countTriggers(workflow, additionalData); const triggerCount = this.countTriggers(workflow, additionalData);
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount); await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (error) { } catch (e) {
this.activationErrors[workflowId] = { const error = e instanceof Error ? e : new Error(`${e}`);
time: new Date().getTime(), await this.activationErrorsService.set(workflowId, error.message);
error: {
message: error.message,
},
};
throw error; throw e;
} }
// If for example webhooks get created it sometimes has to save the // If for example webhooks get created it sometimes has to save the
@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
); );
} }
if (this.activationErrors[workflowId] !== undefined) { await this.activationErrorsService.unset(workflowId);
// If there were any activation errors delete them
delete this.activationErrors[workflowId];
}
if (this.queuedActivations[workflowId] !== undefined) { if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}); });
} }
} }
async removeActivationError(workflowId: string) {
await this.activationErrorsService.unset(workflowId);
}
} }

View file

@ -19,7 +19,7 @@ import {
import { License } from '@/License'; import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
@Service() @Service()
export class ExternalSecretsManager { export class ExternalSecretsManager {
@ -82,7 +82,7 @@ export class ExternalSecretsManager {
} }
async broadcastReloadExternalSecretsProviders() { async broadcastReloadExternalSecretsProviders() {
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders(); await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
} }
private decryptSecretsSettings(value: string): ExternalSecretsSettings { private decryptSecretsSettings(value: string): ExternalSecretsSettings {

View file

@ -298,6 +298,7 @@ export interface IDiagnosticInfo {
ldap_allowed: boolean; ldap_allowed: boolean;
saml_enabled: boolean; saml_enabled: boolean;
binary_data_s3: boolean; binary_data_s3: boolean;
multi_main_setup_enabled: boolean;
licensePlanName?: string; licensePlanName?: string;
licenseTenantId?: number; licenseTenantId?: number;
} }
@ -469,7 +470,25 @@ export type IPushData =
| PushDataNodeDescriptionUpdated | PushDataNodeDescriptionUpdated
| PushDataExecutionRecovered | PushDataExecutionRecovered
| PushDataActiveWorkflowUsersChanged | PushDataActiveWorkflowUsersChanged
| PushDataWorkerStatusMessage; | PushDataWorkerStatusMessage
| PushDataWorkflowActivated
| PushDataWorkflowDeactivated
| PushDataWorkflowFailedToActivate;
type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
type PushDataWorkflowActivated = {
data: IActiveWorkflowChanged;
type: 'workflowActivated';
};
type PushDataWorkflowDeactivated = {
data: IActiveWorkflowChanged;
type: 'workflowDeactivated';
};
type PushDataActiveWorkflowUsersChanged = { type PushDataActiveWorkflowUsersChanged = {
data: IActiveWorkflowUsersChanged; data: IActiveWorkflowUsersChanged;
@ -536,11 +555,24 @@ export interface IActiveWorkflowUser {
lastSeen: Date; lastSeen: Date;
} }
export interface IActiveWorkflowAdded {
workflowId: Workflow['id'];
}
export interface IActiveWorkflowUsersChanged { export interface IActiveWorkflowUsersChanged {
workflowId: Workflow['id']; workflowId: Workflow['id'];
activeUsers: IActiveWorkflowUser[]; activeUsers: IActiveWorkflowUser[];
} }
interface IActiveWorkflowChanged {
workflowId: Workflow['id'];
}
interface IWorkflowFailedToActivate {
workflowId: Workflow['id'];
errorMessage: string;
}
export interface IPushDataExecutionRecovered { export interface IPushDataExecutionRecovered {
executionId: string; executionId: string;
} }

View file

@ -16,6 +16,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces'; import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service'; import { RedisService } from './services/redis.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
type FeatureReturnType = Partial< type FeatureReturnType = Partial<
{ {
@ -40,6 +41,7 @@ export class License {
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
private readonly multiMainSetup: MultiMainSetup,
private readonly settingsRepository: SettingsRepository, private readonly settingsRepository: SettingsRepository,
private readonly workflowRepository: WorkflowRepository, private readonly workflowRepository: WorkflowRepository,
) {} ) {}
@ -49,6 +51,10 @@ export class License {
return; return;
} }
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
await this.multiMainSetup.init();
}
const isMainInstance = instanceType === 'main'; const isMainInstance = instanceType === 'main';
const server = config.getEnv('license.serverUrl'); const server = config.getEnv('license.serverUrl');
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled'); const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
@ -114,22 +120,28 @@ export class License {
} }
async onFeatureChange(_features: TFeatures): Promise<void> { async onFeatureChange(_features: TFeatures): Promise<void> {
if (config.getEnv('executions.mode') === 'queue') { if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
if (config.getEnv('leaderSelection.enabled')) { const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as
const { MultiMainInstancePublisher } = await import( | boolean
'@/services/orchestration/main/MultiMainInstance.publisher.ee' | undefined;
this.multiMainSetup.setLicensed(isMultiMainLicensed ?? false);
if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
this.logger.debug(
'[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...',
); );
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) {
this.logger.debug('Instance is follower, skipping sending of reloadLicense command...');
return; return;
} }
if (this.multiMainSetup.isEnabled && !isMultiMainLicensed) {
this.logger.debug(
'[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supporst this feature.',
);
}
} }
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) { if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service'); this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();

View file

@ -215,6 +215,7 @@ export class Server extends AbstractServer {
ldap_allowed: isLdapCurrentAuthenticationMethod(), ldap_allowed: isLdapCurrentAuthenticationMethod(),
saml_enabled: isSamlCurrentAuthenticationMethod(), saml_enabled: isSamlCurrentAuthenticationMethod(),
binary_data_s3: isS3Available && isS3Selected && isS3Licensed, binary_data_s3: isS3Available && isS3Selected && isS3Licensed,
multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'),
licensePlanName: Container.get(License).getPlanName(), licensePlanName: Container.get(License).getPlanName(),
licenseTenantId: config.getEnv('license.tenantId'), licenseTenantId: config.getEnv('license.tenantId'),
}; };
@ -448,7 +449,7 @@ export class Server extends AbstractServer {
// Returns if the workflow with the given id had any activation errors // Returns if the workflow with the given id had any activation errors
this.app.get( this.app.get(
`/${this.restEndpoint}/active/error/:id`, `/${this.restEndpoint}/active/error/:id`,
ResponseHelper.send(async (req: WorkflowRequest.GetAllActivationErrors) => { ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => {
const { id: workflowId } = req.params; const { id: workflowId } = req.params;
const shared = await Container.get(SharedWorkflowRepository).findOne({ const shared = await Container.get(SharedWorkflowRepository).findOne({

View file

@ -243,21 +243,6 @@ export abstract class BaseCommand extends Command {
} }
async initLicense(): Promise<void> { async initLicense(): Promise<void> {
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) {
this.logger.debug('Instance is follower, skipping license initialization...');
return;
}
}
const license = Container.get(License); const license = Container.get(License);
await license.init(this.instanceType ?? 'main'); await license.init(this.instanceType ?? 'main');

View file

@ -25,9 +25,10 @@ import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { License, FeatureNotLicensedError } from '@/License'; import { License, FeatureNotLicensedError } from '@/License';
import { IConfig } from '@oclif/config'; import { IConfig } from '@oclif/config';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { PruningService } from '@/services/pruning.service'; import { PruningService } from '@/services/pruning.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { SettingsRepository } from '@db/repositories/settings.repository'; import { SettingsRepository } from '@db/repositories/settings.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository';
@ -112,18 +113,14 @@ export class Start extends BaseCommand {
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete // Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
await Container.get(License).shutdown(); await Container.get(License).shutdown();
if (await this.pruningService.isPruningEnabled()) { if (this.pruningService.isPruningEnabled()) {
await this.pruningService.stopPruning(); this.pruningService.stopPruning();
} }
if (config.getEnv('leaderSelection.enabled')) { if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await Container.get(MultiMainInstancePublisher).destroy(); await Container.get(MultiMainSetup).shutdown();
} }
await Container.get(InternalHooks).onN8nStop(); await Container.get(InternalHooks).onN8nStop();
@ -230,38 +227,42 @@ export class Start extends BaseCommand {
} }
async initOrchestration() { async initOrchestration() {
if (config.get('executions.mode') !== 'queue') return; if (config.getEnv('executions.mode') !== 'queue') return;
if (!config.get('leaderSelection.enabled')) { // queue mode in single-main scenario
await Container.get(SingleMainInstancePublisher).init();
if (!config.getEnv('multiMainSetup.enabled')) {
await Container.get(SingleMainSetup).init();
await Container.get(OrchestrationHandlerMainService).init(); await Container.get(OrchestrationHandlerMainService).init();
return; return;
} }
// multi-main scenario // queue mode in multi-main scenario
const { MultiMainInstancePublisher } = await import( if (!Container.get(License).isMultipleMainInstancesLicensed()) {
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (
multiMainInstancePublisher.isLeader &&
!Container.get(License).isMultipleMainInstancesLicensed()
) {
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
} }
await Container.get(OrchestrationHandlerMainService).init(); await Container.get(OrchestrationHandlerMainService).init();
multiMainInstancePublisher.on('leadershipChange', async () => { const multiMainSetup = Container.get(MultiMainSetup);
if (multiMainInstancePublisher.isLeader) {
await multiMainSetup.init();
multiMainSetup.on('leadershipChange', async () => {
if (multiMainSetup.isLeader) {
this.logger.debug('[Leadership change] Clearing all activation errors...');
await this.activeWorkflowRunner.clearAllActivationErrors();
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else { } else {
// only in case of leadership change without shutdown this.logger.debug(
'[Leadership change] Removing all trigger- and poller-based workflows...',
);
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
} }
}); });
@ -333,10 +334,7 @@ export class Start extends BaseCommand {
await this.server.start(); await this.server.start();
this.pruningService = Container.get(PruningService); await this.initPruning();
if (await this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
}
// Start to get active workflows and run their triggers // Start to get active workflows and run their triggers
await this.activeWorkflowRunner.init(); await this.activeWorkflowRunner.init();
@ -375,6 +373,32 @@ export class Start extends BaseCommand {
} }
} }
async initPruning() {
this.pruningService = Container.get(PruningService);
if (this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
}
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const multiMainSetup = Container.get(MultiMainSetup);
await multiMainSetup.init();
multiMainSetup.on('leadershipChange', async () => {
if (multiMainSetup.isLeader) {
if (this.pruningService.isPruningEnabled()) {
this.pruningService.startPruning();
}
} else {
if (this.pruningService.isPruningEnabled()) {
this.pruningService.stopPruning();
}
}
});
}
}
async catch(error: Error) { async catch(error: Error) {
console.log(error.stack); console.log(error.stack);
await this.exitWithCrash('Exiting due to an error.', error); await this.exitWithCrash('Exiting due to an error.', error);

View file

@ -1324,24 +1324,29 @@ export const schema = {
}, },
}, },
leaderSelection: { multiMainSetup: {
instanceType: {
doc: 'Type of instance in multi-main setup',
format: ['unset', 'leader', 'follower'] as const,
default: 'unset', // only until first leader key check
},
enabled: { enabled: {
doc: 'Whether to enable leader selection for multiple main instances (license required)', doc: 'Whether to enable multi-main setup for queue mode (license required)',
format: Boolean, format: Boolean,
default: false, default: false,
env: 'N8N_LEADER_SELECTION_ENABLED', env: 'N8N_MULTI_MAIN_SETUP_ENABLED',
}, },
ttl: { ttl: {
doc: 'Time to live in Redis for leader selection key, in seconds', doc: 'Time to live (in seconds) for leader key in multi-main setup',
format: Number, format: Number,
default: 10, default: 10,
env: 'N8N_LEADER_SELECTION_KEY_TTL', env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL',
}, },
interval: { interval: {
doc: 'Interval in Redis for leader selection check, in seconds', doc: 'Interval (in seconds) for leader check in multi-main setup',
format: Number, format: Number,
default: 3, default: 3,
env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL', env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL',
}, },
}, },

View file

@ -1,7 +1,7 @@
import { Authorized, Post, RestController } from '@/decorators'; import { Authorized, Post, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests'; import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { License } from '../License'; import { License } from '../License';
@Authorized('any') @Authorized('any')
@ -9,7 +9,7 @@ import { License } from '../License';
@Service() @Service()
export class OrchestrationController { export class OrchestrationController {
constructor( constructor(
private readonly orchestrationService: SingleMainInstancePublisher, private readonly singleMainSetup: SingleMainSetup,
private readonly licenseService: License, private readonly licenseService: License,
) {} ) {}
@ -21,18 +21,18 @@ export class OrchestrationController {
async getWorkersStatus(req: OrchestrationRequest.Get) { async getWorkersStatus(req: OrchestrationRequest.Get) {
if (!this.licenseService.isWorkerViewLicensed()) return; if (!this.licenseService.isWorkerViewLicensed()) return;
const id = req.params.id; const id = req.params.id;
return this.orchestrationService.getWorkerStatus(id); return this.singleMainSetup.getWorkerStatus(id);
} }
@Post('/worker/status') @Post('/worker/status')
async getWorkersStatusAll() { async getWorkersStatusAll() {
if (!this.licenseService.isWorkerViewLicensed()) return; if (!this.licenseService.isWorkerViewLicensed()) return;
return this.orchestrationService.getWorkerStatus(); return this.singleMainSetup.getWorkerStatus();
} }
@Post('/worker/ids') @Post('/worker/ids')
async getWorkerIdsAll() { async getWorkerIdsAll() {
if (!this.licenseService.isWorkerViewLicensed()) return; if (!this.licenseService.isWorkerViewLicensed()) return;
return this.orchestrationService.getWorkerIds(); return this.singleMainSetup.getWorkerIds();
} }
} }

View file

@ -32,7 +32,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
@ -207,9 +207,7 @@ export class MessageEventBus extends EventEmitter {
this.destinations[destination.getId()] = destination; this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening(); this.destinations[destination.getId()].startListening();
if (notifyWorkers) { if (notifyWorkers) {
await Container.get( await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
SingleMainInstancePublisher,
).broadcastRestartEventbusAfterDestinationUpdate();
} }
return destination; return destination;
} }
@ -235,9 +233,7 @@ export class MessageEventBus extends EventEmitter {
delete this.destinations[id]; delete this.destinations[id];
} }
if (notifyWorkers) { if (notifyWorkers) {
await Container.get( await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
SingleMainInstancePublisher,
).broadcastRestartEventbusAfterDestinationUpdate();
} }
return result; return result;
} }

View file

@ -117,7 +117,7 @@ export declare namespace WorkflowRequest {
type GetAllActive = AuthenticatedRequest; type GetAllActive = AuthenticatedRequest;
type GetAllActivationErrors = Get; type GetActivationError = Get;
type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>; type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>;

View file

@ -80,21 +80,21 @@ export class CacheService extends EventEmitter {
* @param options.refreshTtl Optional ttl for the refreshFunction's set call * @param options.refreshTtl Optional ttl for the refreshFunction's set call
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided * @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
*/ */
async get( async get<T = unknown>(
key: string, key: string,
options: { options: {
fallbackValue?: unknown; fallbackValue?: T;
refreshFunction?: (key: string) => Promise<unknown>; refreshFunction?: (key: string) => Promise<T>;
refreshTtl?: number; refreshTtl?: number;
} = {}, } = {},
): Promise<unknown> { ): Promise<T | undefined> {
if (!key || key.length === 0) { if (!key || key.length === 0) {
return; return;
} }
const value = await this.cache?.store.get(key); const value = await this.cache?.store.get(key);
if (value !== undefined) { if (value !== undefined) {
this.emit(this.metricsCounterEvents.cacheHit); this.emit(this.metricsCounterEvents.cacheHit);
return value; return value as T;
} }
this.emit(this.metricsCounterEvents.cacheMiss); this.emit(this.metricsCounterEvents.cacheMiss);
if (options.refreshFunction) { if (options.refreshFunction) {

View file

@ -5,7 +5,7 @@ import config from '@/config';
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
export abstract class OrchestrationService extends EventEmitter { export abstract class OrchestrationService extends EventEmitter {
protected initialized = false; protected isInitialized = false;
protected queueModeId: string; protected queueModeId: string;
@ -36,17 +36,17 @@ export abstract class OrchestrationService extends EventEmitter {
} }
sanityCheck(): boolean { sanityCheck(): boolean {
return this.initialized && this.isQueueMode; return this.isInitialized && this.isQueueMode;
} }
async init() { async init() {
await this.initPublisher(); await this.initPublisher();
this.initialized = true; this.isInitialized = true;
} }
async shutdown() { async shutdown() {
await this.redisPublisher?.destroy(); await this.redisPublisher?.destroy();
this.initialized = false; this.isInitialized = false;
} }
protected async initPublisher() { protected async initPublisher() {

View file

@ -1,50 +1,61 @@
import config from '@/config'; import config from '@/config';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { TIME } from '@/constants'; import { TIME } from '@/constants';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
/**
* For use in main instance, in multiple main instances cluster.
*/
@Service() @Service()
export class MultiMainInstancePublisher extends SingleMainInstancePublisher { export class MultiMainSetup extends SingleMainSetup {
private id = this.queueModeId; private id = this.queueModeId;
private leaderId: string | undefined; private isLicensed = false;
get isEnabled() {
return (
config.getEnv('executions.mode') === 'queue' &&
config.getEnv('multiMainSetup.enabled') &&
this.isLicensed
);
}
get isLeader() { get isLeader() {
return this.id === this.leaderId; return config.getEnv('multiMainSetup.instanceType') === 'leader';
} }
get isFollower() { get isFollower() {
return !this.isLeader; return !this.isLeader;
} }
setLicensed(newState: boolean) {
this.isLicensed = newState;
}
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader'; private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl'); private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl');
private leaderCheckInterval: NodeJS.Timer | undefined; private leaderCheckInterval: NodeJS.Timer | undefined;
async init() { async init() {
if (this.initialized) return; if (this.isInitialized) return;
await this.initPublisher(); await this.initPublisher();
this.initialized = true; this.isInitialized = true;
await this.tryBecomeLeader(); await this.tryBecomeLeader(); // prevent initial wait
this.leaderCheckInterval = setInterval( this.leaderCheckInterval = setInterval(
async () => { async () => {
await this.checkLeader(); await this.checkLeader();
}, },
config.getEnv('leaderSelection.interval') * TIME.SECOND, config.getEnv('multiMainSetup.interval') * TIME.SECOND,
); );
} }
async destroy() { async shutdown() {
if (!this.isInitialized) return;
clearInterval(this.leaderCheckInterval); clearInterval(this.leaderCheckInterval);
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey); if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
@ -69,12 +80,17 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
} else { } else {
this.logger.debug(`Leader is other instance "${leaderId}"`); this.logger.debug(`Leader is other instance "${leaderId}"`);
this.leaderId = leaderId; config.set('multiMainSetup.instanceType', 'follower');
} }
} }
private async tryBecomeLeader() { private async tryBecomeLeader() {
if (this.isLeader || !this.redisPublisher.redisClient) return; if (
config.getEnv('multiMainSetup.instanceType') === 'leader' ||
!this.redisPublisher.redisClient
) {
return;
}
// this can only succeed if leadership is currently vacant // this can only succeed if leadership is currently vacant
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id); const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
@ -82,11 +98,36 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
if (keySetSuccessfully) { if (keySetSuccessfully) {
this.logger.debug(`Leader is now this instance "${this.id}"`); this.logger.debug(`Leader is now this instance "${this.id}"`);
this.leaderId = this.id; config.set('multiMainSetup.instanceType', 'leader');
this.emit('leadershipChange', this.id);
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
this.emit('leadershipChange', this.id);
} else {
config.set('multiMainSetup.instanceType', 'follower');
} }
} }
async broadcastWorkflowActiveStateChanged(payload: {
workflowId: string;
oldState: boolean;
newState: boolean;
versionId: string;
}) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowActiveStateChanged',
payload,
});
}
async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) {
if (!this.sanityCheck()) return;
await this.redisPublisher.publishToCommandChannel({
command: 'workflowFailedToActivate',
payload,
});
}
} }

View file

@ -6,13 +6,13 @@ import { OrchestrationService } from '@/services/orchestration.base.service';
* For use in main instance, in single main instance scenario. * For use in main instance, in single main instance scenario.
*/ */
@Service() @Service()
export class SingleMainInstancePublisher extends OrchestrationService { export class SingleMainSetup extends OrchestrationService {
constructor(protected readonly logger: Logger) { constructor(protected readonly logger: Logger) {
super(); super();
} }
sanityCheck() { sanityCheck() {
return this.initialized && this.isQueueMode && this.isMainInstance; return this.isInitialized && this.isQueueMode && this.isMainInstance;
} }
async getWorkerStatus(id?: string) { async getWorkerStatus(id?: string) {

View file

@ -5,12 +5,17 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { License } from '@/License'; import { License } from '@/License';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { Push } from '@/push';
import { MultiMainSetup } from './MultiMainSetup.ee';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
export async function handleCommandMessageMain(messageString: string) { export async function handleCommandMessageMain(messageString: string) {
const queueModeId = config.get('redis.queueModeId'); const queueModeId = config.getEnv('redis.queueModeId');
const isMainInstance = config.get('generic.instanceType') === 'main'; const isMainInstance = config.getEnv('generic.instanceType') === 'main';
const message = messageToRedisServiceCommandObject(messageString); const message = messageToRedisServiceCommandObject(messageString);
const logger = Container.get(Logger); const logger = Container.get(Logger);
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
if (message) { if (message) {
logger.debug( logger.debug(
@ -35,7 +40,7 @@ export async function handleCommandMessageMain(messageString: string) {
return message; return message;
} }
if (isMainInstance && !config.getEnv('leaderSelection.enabled')) { if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
logger.error( logger.error(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
@ -60,6 +65,68 @@ export async function handleCommandMessageMain(messageString: string) {
return message; return message;
} }
await Container.get(ExternalSecretsManager).reloadAllProviders(); await Container.get(ExternalSecretsManager).reloadAllProviders();
break;
case 'workflowActiveStateChanged': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, oldState, newState, versionId } = message.payload ?? {};
if (
typeof workflowId !== 'string' ||
typeof oldState !== 'boolean' ||
typeof newState !== 'boolean' ||
typeof versionId !== 'string'
) {
break;
}
const push = Container.get(Push);
if (!oldState && newState) {
try {
await activeWorkflowRunner.add(workflowId, 'activate');
push.broadcast('workflowActivated', { workflowId });
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
await Container.get(WorkflowRepository).update(workflowId, {
active: false,
versionId,
});
await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({
workflowId,
errorMessage: error.message,
});
}
} else if (oldState && !newState) {
await activeWorkflowRunner.remove(workflowId);
push.broadcast('workflowDeactivated', { workflowId });
} else {
await activeWorkflowRunner.remove(workflowId);
await activeWorkflowRunner.add(workflowId, 'update');
}
await activeWorkflowRunner.removeActivationError(workflowId);
}
case 'workflowFailedToActivate': {
if (!debounceMessageReceiver(message, 100)) {
message.payload = { result: 'debounced' };
return message;
}
const { workflowId, errorMessage } = message.payload ?? {};
if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break;
Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage });
}
default: default:
break; break;
} }

View file

@ -4,6 +4,6 @@ import { OrchestrationService } from '../../orchestration.base.service';
@Service() @Service()
export class OrchestrationWebhookService extends OrchestrationService { export class OrchestrationWebhookService extends OrchestrationService {
sanityCheck(): boolean { sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWebhookInstance; return this.isInitialized && this.isQueueMode && this.isWebhookInstance;
} }
} }

View file

@ -5,7 +5,7 @@ import { OrchestrationService } from '../../orchestration.base.service';
@Service() @Service()
export class OrchestrationWorkerService extends OrchestrationService { export class OrchestrationWorkerService extends OrchestrationService {
sanityCheck(): boolean { sanityCheck(): boolean {
return this.initialized && this.isQueueMode && this.isWorkerInstance; return this.isInitialized && this.isQueueMode && this.isWorkerInstance;
} }
async publishToEventLog(message: AbstractEventMessage) { async publishToEventLog(message: AbstractEventMessage) {

View file

@ -1,4 +1,4 @@
import Container, { Service } from 'typedi'; import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core'; import { BinaryDataService } from 'n8n-core';
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm'; import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils'; import { DateUtils } from 'typeorm/util/DateUtils';
@ -23,16 +23,13 @@ export class PruningService {
public hardDeletionTimeout: NodeJS.Timeout | undefined; public hardDeletionTimeout: NodeJS.Timeout | undefined;
private isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository, private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService, private readonly binaryDataService: BinaryDataService,
) {} ) {}
async isPruningEnabled() { isPruningEnabled() {
if ( if (
!config.getEnv('executions.pruneData') || !config.getEnv('executions.pruneData') ||
inTest || inTest ||
@ -41,75 +38,60 @@ export class PruningService {
return false; return false;
} }
if (this.isMultiMainScenario) { if (
const { MultiMainInstancePublisher } = await import( config.getEnv('multiMainSetup.enabled') &&
'@/services/orchestration/main/MultiMainInstance.publisher.ee' config.getEnv('multiMainSetup.instanceType') === 'follower'
); ) {
return false;
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
return multiMainInstancePublisher.isLeader;
} }
return true; return true;
} }
/** /**
* @important Call only after DB connection is established and migrations have completed. * @important Call this method only after DB migrations have completed.
*/ */
startPruning() { startPruning() {
this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');
this.setSoftDeletionInterval(); this.setSoftDeletionInterval();
this.scheduleHardDeletion(); this.scheduleHardDeletion();
} }
async stopPruning() { stopPruning() {
if (this.isMultiMainScenario) { this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
await multiMainInstancePublisher.init();
if (multiMainInstancePublisher.isFollower) return;
}
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
clearInterval(this.softDeletionInterval); clearInterval(this.softDeletionInterval);
clearTimeout(this.hardDeletionTimeout); clearTimeout(this.hardDeletionTimeout);
} }
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) { private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
this.softDeletionInterval = setInterval( this.softDeletionInterval = setInterval(
async () => this.softDeleteOnPruningCycle(), async () => this.softDeleteOnPruningCycle(),
this.rates.softDeletion, this.rates.softDeletion,
); );
this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`);
} }
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) { private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
this.hardDeletionTimeout = setTimeout( this.hardDeletionTimeout = setTimeout(
async () => this.hardDeleteOnPruningCycle(), async () => this.hardDeleteOnPruningCycle(),
this.rates.hardDeletion, this.rates.hardDeletion,
); );
this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`);
} }
/** /**
* Mark executions as deleted based on age and count, in a pruning cycle. * Mark executions as deleted based on age and count, in a pruning cycle.
*/ */
async softDeleteOnPruningCycle() { async softDeleteOnPruningCycle() {
this.logger.debug('Starting soft-deletion of executions (pruning cycle)'); this.logger.debug('[Pruning] Starting soft-deletion of executions');
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount'); const maxCount = config.getEnv('executions.pruneDataMaxCount');
@ -157,8 +139,11 @@ export class PruningService {
.execute(); .execute();
if (result.affected === 0) { if (result.affected === 0) {
this.logger.debug('Found no executions to soft-delete (pruning cycle)'); this.logger.debug('[Pruning] Found no executions to soft-delete');
return;
} }
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
} }
/** /**
@ -187,21 +172,23 @@ export class PruningService {
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId); const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
if (executionIds.length === 0) { if (executionIds.length === 0) {
this.logger.debug('Found no executions to hard-delete (pruning cycle)'); this.logger.debug('[Pruning] Found no executions to hard-delete');
this.scheduleHardDeletion(); this.scheduleHardDeletion();
return; return;
} }
try { try {
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', { this.logger.debug('[Pruning] Starting hard-deletion of executions', {
executionIds, executionIds,
}); });
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds); await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
await this.executionRepository.delete({ id: In(executionIds) }); await this.executionRepository.delete({ id: In(executionIds) });
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
} catch (error) { } catch (error) {
this.logger.error('Failed to hard-delete executions (pruning cycle)', { this.logger.error('[Pruning] Failed to hard-delete executions', {
executionIds, executionIds,
error: error instanceof Error ? error.message : `${error}`, error: error instanceof Error ? error.message : `${error}`,
}); });

View file

@ -6,7 +6,9 @@ export type RedisServiceCommand =
| 'restartEventBus' | 'restartEventBus'
| 'stopWorker' | 'stopWorker'
| 'reloadLicense' | 'reloadLicense'
| 'reloadExternalSecretsProviders'; | 'reloadExternalSecretsProviders'
| 'workflowActiveStateChanged' // multi-main only
| 'workflowFailedToActivate'; // multi-main only
/** /**
* An object to be sent via Redis pub/sub from the main process to the workers. * An object to be sent via Redis pub/sub from the main process to the workers.
@ -50,6 +52,14 @@ export type RedisServiceWorkerResponseObject = {
| { | {
command: 'stopWorker'; command: 'stopWorker';
} }
| {
command: 'workflowActiveStateChanged';
payload: {
oldState: boolean;
newState: boolean;
workflowId: string;
};
}
); );
export type RedisServiceCommandObject = { export type RedisServiceCommandObject = {

View file

@ -30,6 +30,7 @@ import { isStringArray, isWorkflowIdValid } from '@/utils';
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee'; import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
import { BinaryDataService } from 'n8n-core'; import { BinaryDataService } from 'n8n-core';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository'; import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository';
@ -212,6 +213,8 @@ export class WorkflowsService {
); );
} }
const oldState = shared.workflow.active;
if ( if (
!forceSave && !forceSave &&
workflow.versionId !== '' && workflow.versionId !== '' &&
@ -255,9 +258,14 @@ export class WorkflowsService {
await Container.get(ExternalHooks).run('workflow.update', [workflow]); await Container.get(ExternalHooks).run('workflow.update', [workflow]);
/**
* If the workflow being updated is stored as `active`, remove it from
* active workflows in memory, and re-add it after the update.
*
* If a trigger or poller in the workflow was updated, the new value
* will take effect only on removing and re-adding.
*/
if (shared.workflow.active) { if (shared.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await Container.get(ActiveWorkflowRunner).remove(workflowId); await Container.get(ActiveWorkflowRunner).remove(workflowId);
} }
@ -364,6 +372,21 @@ export class WorkflowsService {
} }
} }
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
const multiMainSetup = Container.get(MultiMainSetup);
await multiMainSetup.init();
if (multiMainSetup.isEnabled) {
await Container.get(MultiMainSetup).broadcastWorkflowActiveStateChanged({
workflowId,
oldState,
newState: updatedWorkflow.active,
versionId: shared.workflow.versionId,
});
}
}
return updatedWorkflow; return updatedWorkflow;
} }

View file

@ -17,10 +17,9 @@ import { WorkflowRunner } from '@/WorkflowRunner';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import type { WebhookEntity } from '@db/entities/WebhookEntity'; import type { WebhookEntity } from '@db/entities/WebhookEntity';
import { NodeTypes } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes';
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
import { mockInstance } from '../shared/mocking';
import { chooseRandomly } from './shared/random'; import { chooseRandomly } from './shared/random';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { mockInstance } from '../shared/mocking';
import { setSchedulerAsLoadedNode } from './shared/utils'; import { setSchedulerAsLoadedNode } from './shared/utils';
import * as testDb from './shared/testDb'; import * as testDb from './shared/testDb';
import { createOwner } from './shared/db/users'; import { createOwner } from './shared/db/users';
@ -30,9 +29,13 @@ mockInstance(ActiveExecutions);
mockInstance(ActiveWorkflows); mockInstance(ActiveWorkflows);
mockInstance(Push); mockInstance(Push);
mockInstance(SecretsHelper); mockInstance(SecretsHelper);
mockInstance(MultiMainInstancePublisher);
const webhookService = mockInstance(WebhookService); const webhookService = mockInstance(WebhookService);
const multiMainSetup = mockInstance(MultiMainSetup, {
isEnabled: false,
isLeader: false,
isFollower: false,
});
setSchedulerAsLoadedNode(); setSchedulerAsLoadedNode();
@ -230,7 +233,7 @@ describe('executeErrorWorkflow()', () => {
describe('add()', () => { describe('add()', () => {
describe('in single-main scenario', () => { describe('in single-main scenario', () => {
test('leader should add webhooks, triggers and pollers', async () => { test('should add webhooks, triggers and pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
const workflow = await createWorkflow({ active: true }, owner); const workflow = await createWorkflow({ active: true }, owner);
@ -252,17 +255,20 @@ describe('add()', () => {
describe('in multi-main scenario', () => { describe('in multi-main scenario', () => {
describe('leader', () => { describe('leader', () => {
test('on regular activation mode, leader should add webhooks only', async () => { describe('on non-leadership-change activation mode', () => {
test('should add webhooks only', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await createWorkflow({ active: true }, owner); const workflow = await createWorkflow({ active: true }, owner);
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
jest.replaceProperty(multiMainSetup, 'isLeader', true);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init(); await activeWorkflowRunner.init();
addWebhooksSpy.mockReset(); addWebhooksSpy.mockReset();
@ -273,18 +279,22 @@ describe('add()', () => {
expect(addWebhooksSpy).toHaveBeenCalledTimes(1); expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
}); });
});
test('on activation via leadership change, leader should add triggers and pollers only', async () => { describe('on leadership change activation mode', () => {
test('should add triggers and pollers only', async () => {
const mode = 'leadershipChange'; const mode = 'leadershipChange';
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); jest.replaceProperty(multiMainSetup, 'isEnabled', true);
jest.replaceProperty(multiMainSetup, 'isLeader', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await createWorkflow({ active: true }, owner); const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init(); await activeWorkflowRunner.init();
addWebhooksSpy.mockReset(); addWebhooksSpy.mockReset();
@ -296,19 +306,23 @@ describe('add()', () => {
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
}); });
}); });
});
describe('follower', () => { describe('follower', () => {
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => { describe('on any activation mode', () => {
test('should not add webhooks, triggers or pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true); jest.replaceProperty(multiMainSetup, 'isEnabled', true);
jest.replaceProperty(multiMainSetup, 'isLeader', false);
mockInstance(MultiMainInstancePublisher, { isLeader: false });
const workflow = await createWorkflow({ active: true }, owner); const workflow = await createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers'); const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowRunner,
'addTriggersAndPollers',
);
await activeWorkflowRunner.init(); await activeWorkflowRunner.init();
addWebhooksSpy.mockReset(); addWebhooksSpy.mockReset();
@ -321,6 +335,7 @@ describe('add()', () => {
}); });
}); });
}); });
});
}); });
describe('addWebhooks()', () => { describe('addWebhooks()', () => {

View file

@ -1,55 +0,0 @@
import * as Config from '@oclif/config';
import { DataSource } from 'typeorm';
import { Start } from '@/commands/start';
import { BaseCommand } from '@/commands/BaseCommand';
import config from '@/config';
import { License } from '@/License';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
import { RedisService } from '@/services/redis.service';
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { mockInstance } from '../../shared/mocking';
const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
beforeAll(() => {
mockInstance(DataSource);
mockInstance(ExternalSecretsManager);
mockInstance(ActiveWorkflowRunner);
mockInstance(WorkflowHistoryManager);
mockInstance(RedisService);
mockInstance(RedisServicePubSubPublisher);
mockInstance(RedisServicePubSubSubscriber);
mockInstance(MultiMainInstancePublisher);
mockInstance(OrchestrationHandlerMainService);
});
afterEach(() => {
config.load(config.default);
jest.restoreAllMocks();
});
test('should not init license if instance is follower in multi-main scenario', async () => {
config.set('executions.mode', 'queue');
config.set('endpoints.disableUi', true);
config.set('leaderSelection.enabled', true);
jest.spyOn(MultiMainInstancePublisher.prototype, 'isFollower', 'get').mockReturnValue(true);
jest.spyOn(BaseCommand.prototype, 'init').mockImplementation(async () => {});
const licenseMock = mockInstance(License, {
isMultipleMainInstancesLicensed: jest.fn().mockReturnValue(true),
});
const startCmd = new Start([], oclifConfig);
await startCmd.init();
expect(licenseMock.init).not.toHaveBeenCalled();
});

View file

@ -16,6 +16,7 @@ import { PostHogClient } from '@/posthog';
import { RedisService } from '@/services/redis.service'; import { RedisService } from '@/services/redis.service';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { mockInstance } from '../../shared/mocking'; import { mockInstance } from '../../shared/mocking';
@ -37,6 +38,7 @@ beforeAll(async () => {
mockInstance(RedisService); mockInstance(RedisService);
mockInstance(RedisServicePubSubPublisher); mockInstance(RedisServicePubSubPublisher);
mockInstance(RedisServicePubSubSubscriber); mockInstance(RedisServicePubSubSubscriber);
mockInstance(MultiMainSetup);
}); });
test('worker initializes all its components', async () => { test('worker initializes all its components', async () => {

View file

@ -16,6 +16,7 @@ import { AUTH_COOKIE_NAME } from '@/constants';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { SettingsRepository } from '@db/repositories/settings.repository'; import { SettingsRepository } from '@db/repositories/settings.repository';
import { mockNodeTypesData } from '../../../unit/Helpers'; import { mockNodeTypesData } from '../../../unit/Helpers';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { mockInstance } from '../../../shared/mocking'; import { mockInstance } from '../../../shared/mocking';
export { setupTestServer } from './testServer'; export { setupTestServer } from './testServer';
@ -28,6 +29,8 @@ export { setupTestServer } from './testServer';
* Initialize node types. * Initialize node types.
*/ */
export async function initActiveWorkflowRunner() { export async function initActiveWorkflowRunner() {
mockInstance(MultiMainSetup);
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner'); const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
const workflowRunner = Container.get(ActiveWorkflowRunner); const workflowRunner = Container.get(ActiveWorkflowRunner);
await workflowRunner.init(); await workflowRunner.init();

View file

@ -0,0 +1,62 @@
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as testDb from './shared/testDb';
import { WorkflowsService } from '@/workflows/workflows.services';
import { mockInstance } from '../shared/mocking';
import { Telemetry } from '@/telemetry';
import { createOwner } from './shared/db/users';
import { createWorkflow } from './shared/db/workflows';
mockInstance(Telemetry);
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
beforeAll(async () => {
await testDb.init();
});
afterEach(async () => {
await testDb.truncate(['Workflow']);
jest.restoreAllMocks();
});
afterAll(async () => {
await testDb.terminate();
});
describe('update()', () => {
test('should remove and re-add to active workflows on `active: true` payload', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
await WorkflowsService.update(owner, workflow, workflow.id);
expect(removeSpy).toHaveBeenCalledTimes(1);
const [removedWorkflowId] = removeSpy.mock.calls[0];
expect(removedWorkflowId).toBe(workflow.id);
expect(addSpy).toHaveBeenCalledTimes(1);
const [addedWorkflowId, activationMode] = addSpy.mock.calls[0];
expect(addedWorkflowId).toBe(workflow.id);
expect(activationMode).toBe('update');
});
test('should remove from active workflows on `active: false` payload', async () => {
const owner = await createOwner();
const workflow = await createWorkflow({ active: true }, owner);
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
workflow.active = false;
await WorkflowsService.update(owner, workflow, workflow.id);
expect(removeSpy).toHaveBeenCalledTimes(1);
const [removedWorkflowId] = removeSpy.mock.calls[0];
expect(removedWorkflowId).toBe(workflow.id);
expect(addSpy).not.toHaveBeenCalled();
});
});

View file

@ -6,6 +6,7 @@ import { License } from '@/License';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { N8N_VERSION } from '@/constants'; import { N8N_VERSION } from '@/constants';
import { mockInstance } from '../shared/mocking'; import { mockInstance } from '../shared/mocking';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
jest.mock('@n8n_io/license-sdk'); jest.mock('@n8n_io/license-sdk');
@ -27,9 +28,10 @@ describe('License', () => {
let license: License; let license: License;
const logger = mockInstance(Logger); const logger = mockInstance(Logger);
const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID }); const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID });
const multiMainSetup = mockInstance(MultiMainSetup);
beforeEach(async () => { beforeEach(async () => {
license = new License(logger, instanceSettings, mock(), mock()); license = new License(logger, instanceSettings, mock(), mock(), mock());
await license.init(); await license.init();
}); });
@ -52,7 +54,7 @@ describe('License', () => {
}); });
test('initializes license manager for worker', async () => { test('initializes license manager for worker', async () => {
license = new License(logger, instanceSettings, mock(), mock()); license = new License(logger, instanceSettings, mock(), mock(), mock());
await license.init('worker'); await license.init('worker');
expect(LicenseManager).toHaveBeenCalledWith({ expect(LicenseManager).toHaveBeenCalledWith({
autoRenewEnabled: false, autoRenewEnabled: false,

View file

@ -1,6 +1,6 @@
import Container from 'typedi'; import Container from 'typedi';
import config from '@/config'; import config from '@/config';
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
import { eventBus } from '@/eventbus'; import { eventBus } from '@/eventbus';
import { RedisService } from '@/services/redis.service'; import { RedisService } from '@/services/redis.service';
@ -10,10 +10,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o
import * as helpers from '@/services/orchestration/helpers'; import * as helpers from '@/services/orchestration/helpers';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { Logger } from '@/Logger'; import { Logger } from '@/Logger';
import { Push } from '@/push';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { mockInstance } from '../../shared/mocking'; import { mockInstance } from '../../shared/mocking';
const os = Container.get(SingleMainInstancePublisher); const os = Container.get(SingleMainSetup);
const handler = Container.get(OrchestrationHandlerMainService); const handler = Container.get(OrchestrationHandlerMainService);
mockInstance(ActiveWorkflowRunner);
let queueModeId: string; let queueModeId: string;
@ -33,6 +36,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
describe('Orchestration Service', () => { describe('Orchestration Service', () => {
const logger = mockInstance(Logger); const logger = mockInstance(Logger);
mockInstance(Push);
beforeAll(async () => { beforeAll(async () => {
mockInstance(RedisService); mockInstance(RedisService);
mockInstance(ExternalSecretsManager); mockInstance(ExternalSecretsManager);

View file

@ -421,7 +421,25 @@ export type IPushData =
| PushDataRemoveNodeType | PushDataRemoveNodeType
| PushDataTestWebhook | PushDataTestWebhook
| PushDataExecutionRecovered | PushDataExecutionRecovered
| PushDataWorkerStatusMessage; | PushDataWorkerStatusMessage
| PushDataActiveWorkflowAdded
| PushDataActiveWorkflowRemoved
| PushDataWorkflowFailedToActivate;
type PushDataActiveWorkflowAdded = {
data: IActiveWorkflowAdded;
type: 'workflowActivated';
};
type PushDataActiveWorkflowRemoved = {
data: IActiveWorkflowRemoved;
type: 'workflowDeactivated';
};
type PushDataWorkflowFailedToActivate = {
data: IWorkflowFailedToActivate;
type: 'workflowFailedToActivate';
};
type PushDataExecutionRecovered = { type PushDataExecutionRecovered = {
data: IPushDataExecutionRecovered; data: IPushDataExecutionRecovered;
@ -491,6 +509,19 @@ export interface IPushDataExecutionFinished {
retryOf?: string; retryOf?: string;
} }
export interface IActiveWorkflowAdded {
workflowId: string;
}
export interface IActiveWorkflowRemoved {
workflowId: string;
}
export interface IWorkflowFailedToActivate {
workflowId: string;
errorMessage: string;
}
export interface IPushDataUnsavedExecutionFinished { export interface IPushDataUnsavedExecutionFinished {
executionId: string; executionId: string;
data: { finished: true; stoppedAt: Date }; data: { finished: true; stoppedAt: Date };

View file

@ -119,7 +119,7 @@ export default defineComponent({
} else { } else {
errorMessage = this.$locale.baseText( errorMessage = this.$locale.baseText(
'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined', 'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined',
{ interpolate: { message: errorData.error.message } }, { interpolate: { message: errorData } },
); );
} }
} catch (error) { } catch (error) {

View file

@ -291,6 +291,33 @@ export const pushConnection = defineComponent({
} }
} }
if (
receivedData.type === 'workflowFailedToActivate' &&
this.workflowsStore.workflowId === receivedData.data.workflowId
) {
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
this.workflowsStore.setActive(false);
this.showError(
new Error(receivedData.data.errorMessage),
this.$locale.baseText('workflowActivator.showError.title', {
interpolate: { newStateName: 'activated' },
}) + ':',
);
return true;
}
if (receivedData.type === 'workflowActivated') {
this.workflowsStore.setWorkflowActive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'workflowDeactivated') {
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
return true;
}
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') { if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
// The workflow finished executing // The workflow finished executing
let pushData: IPushDataExecutionFinished; let pushData: IPushDataExecutionFinished;

View file

@ -10,7 +10,6 @@ import {
} from '@/constants'; } from '@/constants';
import type { import type {
ExecutionsQueryFilter, ExecutionsQueryFilter,
IActivationError,
IExecutionDeleteFilter, IExecutionDeleteFilter,
IExecutionPushResponse, IExecutionPushResponse,
IExecutionResponse, IExecutionResponse,
@ -365,7 +364,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
}); });
}, },
async getActivationError(id: string): Promise<IActivationError | undefined> { async getActivationError(id: string): Promise<string | undefined> {
const rootStore = useRootStore(); const rootStore = useRootStore();
return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`); return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`);
}, },
@ -552,6 +551,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
if (this.workflowsById[workflowId]) { if (this.workflowsById[workflowId]) {
this.workflowsById[workflowId].active = true; this.workflowsById[workflowId].active = true;
} }
if (workflowId === this.workflow.id) {
this.setActive(true);
}
}, },
setWorkflowInactive(workflowId: string): void { setWorkflowInactive(workflowId: string): void {
@ -562,6 +564,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
if (this.workflowsById[workflowId]) { if (this.workflowsById[workflowId]) {
this.workflowsById[workflowId].active = false; this.workflowsById[workflowId].active = false;
} }
if (workflowId === this.workflow.id) {
this.setActive(false);
}
}, },
async fetchActiveWorkflows(): Promise<string[]> { async fetchActiveWorkflows(): Promise<string[]> {