mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-09 22:24:05 -08:00
feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario (#7643)
Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926 ### Manual workflow activation and deactivation In a multi-main scenario, if the user manually activates or deactivates a workflow, the process (whether leader or follower) that handles the PATCH request and updates its internal state should send a message into the command channel, so that all other main processes update their internal state accordingly: - Add to `ActiveWorkflows` if activating - Remove from `ActiveWorkflows` if deactivating - Remove and re-add to `ActiveWorkflows` if the update did not change activation status. After updating their internal state, if activating or deactivating, the recipient main processes should push a message to all connected frontends so that these can update their stores and so reflect the value in the UI. ### Workflow activation errors On failure to activate a workflow, the main instance should record the error in Redis - main instances should always pull activation errors from Redis in a multi-main scenario. ### Leadership change On leadership change... - The old leader should stop pruning and the new leader should start pruning. - The old leader should remove trigger- and poller-based workflows and the new leader should add them.
This commit is contained in:
parent
b3a3f16bc2
commit
4c4082503c
52
packages/cli/src/ActivationErrors.service.ts
Normal file
52
packages/cli/src/ActivationErrors.service.ts
Normal file
|
@ -0,0 +1,52 @@
|
|||
import { Service } from 'typedi';
|
||||
import { CacheService } from './services/cache.service';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
|
||||
type ActivationErrors = {
|
||||
[workflowId: string]: string; // error message
|
||||
};
|
||||
|
||||
@Service()
|
||||
export class ActivationErrorsService {
|
||||
private readonly cacheKey = 'workflow-activation-errors';
|
||||
|
||||
constructor(private readonly cacheService: CacheService) {}
|
||||
|
||||
async set(workflowId: string, errorMessage: string) {
|
||||
const errors = await this.getAll();
|
||||
|
||||
errors[workflowId] = errorMessage;
|
||||
|
||||
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
|
||||
}
|
||||
|
||||
async unset(workflowId: string) {
|
||||
const errors = await this.getAll();
|
||||
|
||||
if (Object.keys(errors).length === 0) return;
|
||||
|
||||
delete errors[workflowId];
|
||||
|
||||
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
|
||||
}
|
||||
|
||||
async get(workflowId: string) {
|
||||
const errors = await this.getAll();
|
||||
|
||||
if (Object.keys(errors).length === 0) return null;
|
||||
|
||||
return errors[workflowId];
|
||||
}
|
||||
|
||||
async getAll() {
|
||||
const errors = await this.cacheService.get<string>(this.cacheKey);
|
||||
|
||||
if (!errors) return {};
|
||||
|
||||
return jsonParse<ActivationErrors>(errors);
|
||||
}
|
||||
|
||||
async clearAll() {
|
||||
await this.cacheService.delete(this.cacheKey);
|
||||
}
|
||||
}
|
|
@ -2,8 +2,9 @@
|
|||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
|
||||
import Container, { Service } from 'typedi';
|
||||
import { Service } from 'typedi';
|
||||
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
||||
import config from '@/config';
|
||||
|
||||
import type {
|
||||
ExecutionError,
|
||||
|
@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service';
|
|||
import { Logger } from './Logger';
|
||||
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||
import config from '@/config';
|
||||
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
import { ActivationErrorsService } from '@/ActivationErrors.service';
|
||||
|
||||
const WEBHOOK_PROD_UNREGISTERED_HINT =
|
||||
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
|
||||
|
@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
|
|||
export class ActiveWorkflowRunner implements IWebhookManager {
|
||||
activeWorkflows = new ActiveWorkflows();
|
||||
|
||||
private activationErrors: {
|
||||
[workflowId: string]: {
|
||||
time: number; // ms
|
||||
error: {
|
||||
message: string;
|
||||
};
|
||||
};
|
||||
} = {};
|
||||
|
||||
private queuedActivations: {
|
||||
[workflowId: string]: {
|
||||
activationMode: WorkflowActivateMode;
|
||||
|
@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
};
|
||||
} = {};
|
||||
|
||||
isMultiMainScenario =
|
||||
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
|
||||
|
||||
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly activeExecutions: ActiveExecutions,
|
||||
|
@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
private readonly webhookService: WebhookService,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
|
||||
private readonly multiMainSetup: MultiMainSetup,
|
||||
private readonly activationErrorsService: ActivationErrorsService,
|
||||
) {}
|
||||
|
||||
async init() {
|
||||
if (this.isMultiMainScenario) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||
|
||||
await this.multiMainInstancePublisher.init();
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||
await this.multiMainSetup.init();
|
||||
}
|
||||
|
||||
await this.addActiveWorkflows('init');
|
||||
|
@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
async allActiveInStorage(user?: User) {
|
||||
const isFullAccess = !user || user.globalRole.name === 'owner';
|
||||
|
||||
const activationErrors = await this.activationErrorsService.getAll();
|
||||
|
||||
if (isFullAccess) {
|
||||
const activeWorkflows = await this.workflowRepository.find({
|
||||
select: ['id'],
|
||||
|
@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
|
||||
return activeWorkflows
|
||||
.map((workflow) => workflow.id)
|
||||
.filter((workflowId) => !this.activationErrors[workflowId]);
|
||||
.filter((workflowId) => !activationErrors[workflowId]);
|
||||
}
|
||||
|
||||
const where = whereClause({
|
||||
|
@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
|
||||
return sharings
|
||||
.map((sharing) => sharing.workflowId)
|
||||
.filter((workflowId) => !this.activationErrors[workflowId]);
|
||||
.filter((workflowId) => !activationErrors[workflowId]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
/**
|
||||
* Return error if there was a problem activating the workflow
|
||||
*/
|
||||
getActivationError(workflowId: string) {
|
||||
return this.activationErrors[workflowId];
|
||||
async getActivationError(workflowId: string) {
|
||||
return this.activationErrorsService.get(workflowId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
// Remove the workflow as "active"
|
||||
|
||||
void this.activeWorkflows.remove(workflowData.id);
|
||||
this.activationErrors[workflowData.id] = {
|
||||
time: new Date().getTime(),
|
||||
error: {
|
||||
message: error.message,
|
||||
},
|
||||
};
|
||||
|
||||
void this.activationErrorsService.set(workflowData.id, error.message);
|
||||
|
||||
// Run Error Workflow if defined
|
||||
const activationError = new WorkflowActivationError(
|
||||
|
@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
this.logger.verbose('Finished activating workflows (startup)');
|
||||
}
|
||||
|
||||
async addAllTriggerAndPollerBasedWorkflows() {
|
||||
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
|
||||
async clearAllActivationErrors() {
|
||||
await this.activationErrorsService.clearAll();
|
||||
}
|
||||
|
||||
async addAllTriggerAndPollerBasedWorkflows() {
|
||||
await this.addActiveWorkflows('leadershipChange');
|
||||
}
|
||||
|
||||
async removeAllTriggerAndPollerBasedWorkflows() {
|
||||
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
|
||||
|
||||
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
|
||||
}
|
||||
|
||||
|
@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
let shouldAddWebhooks = true;
|
||||
let shouldAddTriggersAndPollers = true;
|
||||
|
||||
if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
|
||||
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
|
||||
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
|
||||
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
|
||||
shouldAddWebhooks = this.multiMainSetup.isLeader;
|
||||
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
|
||||
}
|
||||
|
||||
if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
|
||||
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
|
||||
shouldAddWebhooks = false;
|
||||
shouldAddTriggersAndPollers = true;
|
||||
}
|
||||
|
@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
|
||||
|
||||
if (shouldAddWebhooks) {
|
||||
this.logger.debug('============');
|
||||
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
|
||||
this.logger.debug('============');
|
||||
this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);
|
||||
|
||||
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
|
||||
}
|
||||
|
||||
if (shouldAddTriggersAndPollers) {
|
||||
this.logger.debug('============');
|
||||
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
|
||||
this.logger.debug('============');
|
||||
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
|
||||
|
||||
await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||
activationMode,
|
||||
|
@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
// Workflow got now successfully activated so make sure nothing is left in the queue
|
||||
this.removeQueuedWorkflowActivation(workflowId);
|
||||
|
||||
if (this.activationErrors[workflowId]) {
|
||||
delete this.activationErrors[workflowId];
|
||||
}
|
||||
await this.activationErrorsService.unset(workflowId);
|
||||
|
||||
const triggerCount = this.countTriggers(workflow, additionalData);
|
||||
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
||||
} catch (error) {
|
||||
this.activationErrors[workflowId] = {
|
||||
time: new Date().getTime(),
|
||||
error: {
|
||||
message: error.message,
|
||||
},
|
||||
};
|
||||
} catch (e) {
|
||||
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||
await this.activationErrorsService.set(workflowId, error.message);
|
||||
|
||||
throw error;
|
||||
throw e;
|
||||
}
|
||||
|
||||
// If for example webhooks get created it sometimes has to save the
|
||||
|
@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
);
|
||||
}
|
||||
|
||||
if (this.activationErrors[workflowId] !== undefined) {
|
||||
// If there were any activation errors delete them
|
||||
delete this.activationErrors[workflowId];
|
||||
}
|
||||
await this.activationErrorsService.unset(workflowId);
|
||||
|
||||
if (this.queuedActivations[workflowId] !== undefined) {
|
||||
this.removeQueuedWorkflowActivation(workflowId);
|
||||
|
@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
async removeActivationError(workflowId: string) {
|
||||
await this.activationErrorsService.unset(workflowId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ import {
|
|||
import { License } from '@/License';
|
||||
import { InternalHooks } from '@/InternalHooks';
|
||||
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
|
||||
@Service()
|
||||
export class ExternalSecretsManager {
|
||||
|
@ -82,7 +82,7 @@ export class ExternalSecretsManager {
|
|||
}
|
||||
|
||||
async broadcastReloadExternalSecretsProviders() {
|
||||
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
|
||||
await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
|
||||
}
|
||||
|
||||
private decryptSecretsSettings(value: string): ExternalSecretsSettings {
|
||||
|
|
|
@ -298,6 +298,7 @@ export interface IDiagnosticInfo {
|
|||
ldap_allowed: boolean;
|
||||
saml_enabled: boolean;
|
||||
binary_data_s3: boolean;
|
||||
multi_main_setup_enabled: boolean;
|
||||
licensePlanName?: string;
|
||||
licenseTenantId?: number;
|
||||
}
|
||||
|
@ -469,7 +470,25 @@ export type IPushData =
|
|||
| PushDataNodeDescriptionUpdated
|
||||
| PushDataExecutionRecovered
|
||||
| PushDataActiveWorkflowUsersChanged
|
||||
| PushDataWorkerStatusMessage;
|
||||
| PushDataWorkerStatusMessage
|
||||
| PushDataWorkflowActivated
|
||||
| PushDataWorkflowDeactivated
|
||||
| PushDataWorkflowFailedToActivate;
|
||||
|
||||
type PushDataWorkflowFailedToActivate = {
|
||||
data: IWorkflowFailedToActivate;
|
||||
type: 'workflowFailedToActivate';
|
||||
};
|
||||
|
||||
type PushDataWorkflowActivated = {
|
||||
data: IActiveWorkflowChanged;
|
||||
type: 'workflowActivated';
|
||||
};
|
||||
|
||||
type PushDataWorkflowDeactivated = {
|
||||
data: IActiveWorkflowChanged;
|
||||
type: 'workflowDeactivated';
|
||||
};
|
||||
|
||||
type PushDataActiveWorkflowUsersChanged = {
|
||||
data: IActiveWorkflowUsersChanged;
|
||||
|
@ -536,11 +555,24 @@ export interface IActiveWorkflowUser {
|
|||
lastSeen: Date;
|
||||
}
|
||||
|
||||
export interface IActiveWorkflowAdded {
|
||||
workflowId: Workflow['id'];
|
||||
}
|
||||
|
||||
export interface IActiveWorkflowUsersChanged {
|
||||
workflowId: Workflow['id'];
|
||||
activeUsers: IActiveWorkflowUser[];
|
||||
}
|
||||
|
||||
interface IActiveWorkflowChanged {
|
||||
workflowId: Workflow['id'];
|
||||
}
|
||||
|
||||
interface IWorkflowFailedToActivate {
|
||||
workflowId: Workflow['id'];
|
||||
errorMessage: string;
|
||||
}
|
||||
|
||||
export interface IPushDataExecutionRecovered {
|
||||
executionId: string;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
|||
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
|
||||
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
|
||||
import { RedisService } from './services/redis.service';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
|
||||
type FeatureReturnType = Partial<
|
||||
{
|
||||
|
@ -40,6 +41,7 @@ export class License {
|
|||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly multiMainSetup: MultiMainSetup,
|
||||
private readonly settingsRepository: SettingsRepository,
|
||||
private readonly workflowRepository: WorkflowRepository,
|
||||
) {}
|
||||
|
@ -49,6 +51,10 @@ export class License {
|
|||
return;
|
||||
}
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||
await this.multiMainSetup.init();
|
||||
}
|
||||
|
||||
const isMainInstance = instanceType === 'main';
|
||||
const server = config.getEnv('license.serverUrl');
|
||||
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
|
||||
|
@ -114,22 +120,28 @@ export class License {
|
|||
}
|
||||
|
||||
async onFeatureChange(_features: TFeatures): Promise<void> {
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
if (config.getEnv('leaderSelection.enabled')) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||
const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as
|
||||
| boolean
|
||||
| undefined;
|
||||
|
||||
this.multiMainSetup.setLicensed(isMultiMainLicensed ?? false);
|
||||
|
||||
if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
|
||||
this.logger.debug(
|
||||
'[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...',
|
||||
);
|
||||
|
||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||
|
||||
await multiMainInstancePublisher.init();
|
||||
|
||||
if (multiMainInstancePublisher.isFollower) {
|
||||
this.logger.debug('Instance is follower, skipping sending of reloadLicense command...');
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.multiMainSetup.isEnabled && !isMultiMainLicensed) {
|
||||
this.logger.debug(
|
||||
'[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supporst this feature.',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue') {
|
||||
if (!this.redisPublisher) {
|
||||
this.logger.debug('Initializing Redis publisher for License Service');
|
||||
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
|
||||
|
|
|
@ -215,6 +215,7 @@ export class Server extends AbstractServer {
|
|||
ldap_allowed: isLdapCurrentAuthenticationMethod(),
|
||||
saml_enabled: isSamlCurrentAuthenticationMethod(),
|
||||
binary_data_s3: isS3Available && isS3Selected && isS3Licensed,
|
||||
multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'),
|
||||
licensePlanName: Container.get(License).getPlanName(),
|
||||
licenseTenantId: config.getEnv('license.tenantId'),
|
||||
};
|
||||
|
@ -448,7 +449,7 @@ export class Server extends AbstractServer {
|
|||
// Returns if the workflow with the given id had any activation errors
|
||||
this.app.get(
|
||||
`/${this.restEndpoint}/active/error/:id`,
|
||||
ResponseHelper.send(async (req: WorkflowRequest.GetAllActivationErrors) => {
|
||||
ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => {
|
||||
const { id: workflowId } = req.params;
|
||||
|
||||
const shared = await Container.get(SharedWorkflowRepository).findOne({
|
||||
|
|
|
@ -243,21 +243,6 @@ export abstract class BaseCommand extends Command {
|
|||
}
|
||||
|
||||
async initLicense(): Promise<void> {
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled')) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||
|
||||
await multiMainInstancePublisher.init();
|
||||
|
||||
if (multiMainInstancePublisher.isFollower) {
|
||||
this.logger.debug('Instance is follower, skipping license initialization...');
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const license = Container.get(License);
|
||||
await license.init(this.instanceType ?? 'main');
|
||||
|
||||
|
|
|
@ -25,9 +25,10 @@ import { BaseCommand } from './BaseCommand';
|
|||
import { InternalHooks } from '@/InternalHooks';
|
||||
import { License, FeatureNotLicensedError } from '@/License';
|
||||
import { IConfig } from '@oclif/config';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
||||
import { PruningService } from '@/services/pruning.service';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
import { SettingsRepository } from '@db/repositories/settings.repository';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
|
||||
|
@ -112,18 +113,14 @@ export class Start extends BaseCommand {
|
|||
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
|
||||
await Container.get(License).shutdown();
|
||||
|
||||
if (await this.pruningService.isPruningEnabled()) {
|
||||
await this.pruningService.stopPruning();
|
||||
if (this.pruningService.isPruningEnabled()) {
|
||||
this.pruningService.stopPruning();
|
||||
}
|
||||
|
||||
if (config.getEnv('leaderSelection.enabled')) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
||||
|
||||
await Container.get(MultiMainInstancePublisher).destroy();
|
||||
await Container.get(MultiMainSetup).shutdown();
|
||||
}
|
||||
|
||||
await Container.get(InternalHooks).onN8nStop();
|
||||
|
@ -230,38 +227,42 @@ export class Start extends BaseCommand {
|
|||
}
|
||||
|
||||
async initOrchestration() {
|
||||
if (config.get('executions.mode') !== 'queue') return;
|
||||
if (config.getEnv('executions.mode') !== 'queue') return;
|
||||
|
||||
if (!config.get('leaderSelection.enabled')) {
|
||||
await Container.get(SingleMainInstancePublisher).init();
|
||||
// queue mode in single-main scenario
|
||||
|
||||
if (!config.getEnv('multiMainSetup.enabled')) {
|
||||
await Container.get(SingleMainSetup).init();
|
||||
await Container.get(OrchestrationHandlerMainService).init();
|
||||
return;
|
||||
}
|
||||
|
||||
// multi-main scenario
|
||||
// queue mode in multi-main scenario
|
||||
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||
|
||||
await multiMainInstancePublisher.init();
|
||||
|
||||
if (
|
||||
multiMainInstancePublisher.isLeader &&
|
||||
!Container.get(License).isMultipleMainInstancesLicensed()
|
||||
) {
|
||||
if (!Container.get(License).isMultipleMainInstancesLicensed()) {
|
||||
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
|
||||
}
|
||||
|
||||
await Container.get(OrchestrationHandlerMainService).init();
|
||||
|
||||
multiMainInstancePublisher.on('leadershipChange', async () => {
|
||||
if (multiMainInstancePublisher.isLeader) {
|
||||
const multiMainSetup = Container.get(MultiMainSetup);
|
||||
|
||||
await multiMainSetup.init();
|
||||
|
||||
multiMainSetup.on('leadershipChange', async () => {
|
||||
if (multiMainSetup.isLeader) {
|
||||
this.logger.debug('[Leadership change] Clearing all activation errors...');
|
||||
|
||||
await this.activeWorkflowRunner.clearAllActivationErrors();
|
||||
|
||||
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
|
||||
|
||||
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
|
||||
} else {
|
||||
// only in case of leadership change without shutdown
|
||||
this.logger.debug(
|
||||
'[Leadership change] Removing all trigger- and poller-based workflows...',
|
||||
);
|
||||
|
||||
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
||||
}
|
||||
});
|
||||
|
@ -333,10 +334,7 @@ export class Start extends BaseCommand {
|
|||
|
||||
await this.server.start();
|
||||
|
||||
this.pruningService = Container.get(PruningService);
|
||||
if (await this.pruningService.isPruningEnabled()) {
|
||||
this.pruningService.startPruning();
|
||||
}
|
||||
await this.initPruning();
|
||||
|
||||
// Start to get active workflows and run their triggers
|
||||
await this.activeWorkflowRunner.init();
|
||||
|
@ -375,6 +373,32 @@ export class Start extends BaseCommand {
|
|||
}
|
||||
}
|
||||
|
||||
async initPruning() {
|
||||
this.pruningService = Container.get(PruningService);
|
||||
|
||||
if (this.pruningService.isPruningEnabled()) {
|
||||
this.pruningService.startPruning();
|
||||
}
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||
const multiMainSetup = Container.get(MultiMainSetup);
|
||||
|
||||
await multiMainSetup.init();
|
||||
|
||||
multiMainSetup.on('leadershipChange', async () => {
|
||||
if (multiMainSetup.isLeader) {
|
||||
if (this.pruningService.isPruningEnabled()) {
|
||||
this.pruningService.startPruning();
|
||||
}
|
||||
} else {
|
||||
if (this.pruningService.isPruningEnabled()) {
|
||||
this.pruningService.stopPruning();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async catch(error: Error) {
|
||||
console.log(error.stack);
|
||||
await this.exitWithCrash('Exiting due to an error.', error);
|
||||
|
|
|
@ -1324,24 +1324,29 @@ export const schema = {
|
|||
},
|
||||
},
|
||||
|
||||
leaderSelection: {
|
||||
multiMainSetup: {
|
||||
instanceType: {
|
||||
doc: 'Type of instance in multi-main setup',
|
||||
format: ['unset', 'leader', 'follower'] as const,
|
||||
default: 'unset', // only until first leader key check
|
||||
},
|
||||
enabled: {
|
||||
doc: 'Whether to enable leader selection for multiple main instances (license required)',
|
||||
doc: 'Whether to enable multi-main setup for queue mode (license required)',
|
||||
format: Boolean,
|
||||
default: false,
|
||||
env: 'N8N_LEADER_SELECTION_ENABLED',
|
||||
env: 'N8N_MULTI_MAIN_SETUP_ENABLED',
|
||||
},
|
||||
ttl: {
|
||||
doc: 'Time to live in Redis for leader selection key, in seconds',
|
||||
doc: 'Time to live (in seconds) for leader key in multi-main setup',
|
||||
format: Number,
|
||||
default: 10,
|
||||
env: 'N8N_LEADER_SELECTION_KEY_TTL',
|
||||
env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL',
|
||||
},
|
||||
interval: {
|
||||
doc: 'Interval in Redis for leader selection check, in seconds',
|
||||
doc: 'Interval (in seconds) for leader check in multi-main setup',
|
||||
format: Number,
|
||||
default: 3,
|
||||
env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL',
|
||||
env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL',
|
||||
},
|
||||
},
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { Authorized, Post, RestController } from '@/decorators';
|
||||
import { OrchestrationRequest } from '@/requests';
|
||||
import { Service } from 'typedi';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
import { License } from '../License';
|
||||
|
||||
@Authorized('any')
|
||||
|
@ -9,7 +9,7 @@ import { License } from '../License';
|
|||
@Service()
|
||||
export class OrchestrationController {
|
||||
constructor(
|
||||
private readonly orchestrationService: SingleMainInstancePublisher,
|
||||
private readonly singleMainSetup: SingleMainSetup,
|
||||
private readonly licenseService: License,
|
||||
) {}
|
||||
|
||||
|
@ -21,18 +21,18 @@ export class OrchestrationController {
|
|||
async getWorkersStatus(req: OrchestrationRequest.Get) {
|
||||
if (!this.licenseService.isWorkerViewLicensed()) return;
|
||||
const id = req.params.id;
|
||||
return this.orchestrationService.getWorkerStatus(id);
|
||||
return this.singleMainSetup.getWorkerStatus(id);
|
||||
}
|
||||
|
||||
@Post('/worker/status')
|
||||
async getWorkersStatusAll() {
|
||||
if (!this.licenseService.isWorkerViewLicensed()) return;
|
||||
return this.orchestrationService.getWorkerStatus();
|
||||
return this.singleMainSetup.getWorkerStatus();
|
||||
}
|
||||
|
||||
@Post('/worker/ids')
|
||||
async getWorkerIdsAll() {
|
||||
if (!this.licenseService.isWorkerViewLicensed()) return;
|
||||
return this.orchestrationService.getWorkerIds();
|
||||
return this.singleMainSetup.getWorkerIds();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
|
|||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
import { Logger } from '@/Logger';
|
||||
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
|
||||
|
||||
|
@ -207,9 +207,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
this.destinations[destination.getId()] = destination;
|
||||
this.destinations[destination.getId()].startListening();
|
||||
if (notifyWorkers) {
|
||||
await Container.get(
|
||||
SingleMainInstancePublisher,
|
||||
).broadcastRestartEventbusAfterDestinationUpdate();
|
||||
await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
|
@ -235,9 +233,7 @@ export class MessageEventBus extends EventEmitter {
|
|||
delete this.destinations[id];
|
||||
}
|
||||
if (notifyWorkers) {
|
||||
await Container.get(
|
||||
SingleMainInstancePublisher,
|
||||
).broadcastRestartEventbusAfterDestinationUpdate();
|
||||
await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ export declare namespace WorkflowRequest {
|
|||
|
||||
type GetAllActive = AuthenticatedRequest;
|
||||
|
||||
type GetAllActivationErrors = Get;
|
||||
type GetActivationError = Get;
|
||||
|
||||
type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>;
|
||||
|
||||
|
|
|
@ -80,21 +80,21 @@ export class CacheService extends EventEmitter {
|
|||
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
|
||||
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
|
||||
*/
|
||||
async get(
|
||||
async get<T = unknown>(
|
||||
key: string,
|
||||
options: {
|
||||
fallbackValue?: unknown;
|
||||
refreshFunction?: (key: string) => Promise<unknown>;
|
||||
fallbackValue?: T;
|
||||
refreshFunction?: (key: string) => Promise<T>;
|
||||
refreshTtl?: number;
|
||||
} = {},
|
||||
): Promise<unknown> {
|
||||
): Promise<T | undefined> {
|
||||
if (!key || key.length === 0) {
|
||||
return;
|
||||
}
|
||||
const value = await this.cache?.store.get(key);
|
||||
if (value !== undefined) {
|
||||
this.emit(this.metricsCounterEvents.cacheHit);
|
||||
return value;
|
||||
return value as T;
|
||||
}
|
||||
this.emit(this.metricsCounterEvents.cacheMiss);
|
||||
if (options.refreshFunction) {
|
||||
|
|
|
@ -5,7 +5,7 @@ import config from '@/config';
|
|||
import { EventEmitter } from 'node:events';
|
||||
|
||||
export abstract class OrchestrationService extends EventEmitter {
|
||||
protected initialized = false;
|
||||
protected isInitialized = false;
|
||||
|
||||
protected queueModeId: string;
|
||||
|
||||
|
@ -36,17 +36,17 @@ export abstract class OrchestrationService extends EventEmitter {
|
|||
}
|
||||
|
||||
sanityCheck(): boolean {
|
||||
return this.initialized && this.isQueueMode;
|
||||
return this.isInitialized && this.isQueueMode;
|
||||
}
|
||||
|
||||
async init() {
|
||||
await this.initPublisher();
|
||||
this.initialized = true;
|
||||
this.isInitialized = true;
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
await this.redisPublisher?.destroy();
|
||||
this.initialized = false;
|
||||
this.isInitialized = false;
|
||||
}
|
||||
|
||||
protected async initPublisher() {
|
||||
|
|
|
@ -1,50 +1,61 @@
|
|||
import config from '@/config';
|
||||
import { Service } from 'typedi';
|
||||
import { TIME } from '@/constants';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
|
||||
|
||||
/**
|
||||
* For use in main instance, in multiple main instances cluster.
|
||||
*/
|
||||
@Service()
|
||||
export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||
export class MultiMainSetup extends SingleMainSetup {
|
||||
private id = this.queueModeId;
|
||||
|
||||
private leaderId: string | undefined;
|
||||
private isLicensed = false;
|
||||
|
||||
get isEnabled() {
|
||||
return (
|
||||
config.getEnv('executions.mode') === 'queue' &&
|
||||
config.getEnv('multiMainSetup.enabled') &&
|
||||
this.isLicensed
|
||||
);
|
||||
}
|
||||
|
||||
get isLeader() {
|
||||
return this.id === this.leaderId;
|
||||
return config.getEnv('multiMainSetup.instanceType') === 'leader';
|
||||
}
|
||||
|
||||
get isFollower() {
|
||||
return !this.isLeader;
|
||||
}
|
||||
|
||||
setLicensed(newState: boolean) {
|
||||
this.isLicensed = newState;
|
||||
}
|
||||
|
||||
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
||||
|
||||
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
|
||||
private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl');
|
||||
|
||||
private leaderCheckInterval: NodeJS.Timer | undefined;
|
||||
|
||||
async init() {
|
||||
if (this.initialized) return;
|
||||
if (this.isInitialized) return;
|
||||
|
||||
await this.initPublisher();
|
||||
|
||||
this.initialized = true;
|
||||
this.isInitialized = true;
|
||||
|
||||
await this.tryBecomeLeader();
|
||||
await this.tryBecomeLeader(); // prevent initial wait
|
||||
|
||||
this.leaderCheckInterval = setInterval(
|
||||
async () => {
|
||||
await this.checkLeader();
|
||||
},
|
||||
config.getEnv('leaderSelection.interval') * TIME.SECOND,
|
||||
config.getEnv('multiMainSetup.interval') * TIME.SECOND,
|
||||
);
|
||||
}
|
||||
|
||||
async destroy() {
|
||||
async shutdown() {
|
||||
if (!this.isInitialized) return;
|
||||
|
||||
clearInterval(this.leaderCheckInterval);
|
||||
|
||||
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
|
||||
|
@ -69,12 +80,17 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
|||
} else {
|
||||
this.logger.debug(`Leader is other instance "${leaderId}"`);
|
||||
|
||||
this.leaderId = leaderId;
|
||||
config.set('multiMainSetup.instanceType', 'follower');
|
||||
}
|
||||
}
|
||||
|
||||
private async tryBecomeLeader() {
|
||||
if (this.isLeader || !this.redisPublisher.redisClient) return;
|
||||
if (
|
||||
config.getEnv('multiMainSetup.instanceType') === 'leader' ||
|
||||
!this.redisPublisher.redisClient
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
// this can only succeed if leadership is currently vacant
|
||||
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
|
||||
|
@ -82,11 +98,36 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
|||
if (keySetSuccessfully) {
|
||||
this.logger.debug(`Leader is now this instance "${this.id}"`);
|
||||
|
||||
this.leaderId = this.id;
|
||||
|
||||
this.emit('leadershipChange', this.id);
|
||||
config.set('multiMainSetup.instanceType', 'leader');
|
||||
|
||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||
|
||||
this.emit('leadershipChange', this.id);
|
||||
} else {
|
||||
config.set('multiMainSetup.instanceType', 'follower');
|
||||
}
|
||||
}
|
||||
|
||||
async broadcastWorkflowActiveStateChanged(payload: {
|
||||
workflowId: string;
|
||||
oldState: boolean;
|
||||
newState: boolean;
|
||||
versionId: string;
|
||||
}) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'workflowActiveStateChanged',
|
||||
payload,
|
||||
});
|
||||
}
|
||||
|
||||
async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) {
|
||||
if (!this.sanityCheck()) return;
|
||||
|
||||
await this.redisPublisher.publishToCommandChannel({
|
||||
command: 'workflowFailedToActivate',
|
||||
payload,
|
||||
});
|
||||
}
|
||||
}
|
|
@ -6,13 +6,13 @@ import { OrchestrationService } from '@/services/orchestration.base.service';
|
|||
* For use in main instance, in single main instance scenario.
|
||||
*/
|
||||
@Service()
|
||||
export class SingleMainInstancePublisher extends OrchestrationService {
|
||||
export class SingleMainSetup extends OrchestrationService {
|
||||
constructor(protected readonly logger: Logger) {
|
||||
super();
|
||||
}
|
||||
|
||||
sanityCheck() {
|
||||
return this.initialized && this.isQueueMode && this.isMainInstance;
|
||||
return this.isInitialized && this.isQueueMode && this.isMainInstance;
|
||||
}
|
||||
|
||||
async getWorkerStatus(id?: string) {
|
|
@ -5,12 +5,17 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
|||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||
import { License } from '@/License';
|
||||
import { Logger } from '@/Logger';
|
||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||
import { Push } from '@/push';
|
||||
import { MultiMainSetup } from './MultiMainSetup.ee';
|
||||
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||
|
||||
export async function handleCommandMessageMain(messageString: string) {
|
||||
const queueModeId = config.get('redis.queueModeId');
|
||||
const isMainInstance = config.get('generic.instanceType') === 'main';
|
||||
const queueModeId = config.getEnv('redis.queueModeId');
|
||||
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
|
||||
const message = messageToRedisServiceCommandObject(messageString);
|
||||
const logger = Container.get(Logger);
|
||||
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
|
||||
|
||||
if (message) {
|
||||
logger.debug(
|
||||
|
@ -35,7 +40,7 @@ export async function handleCommandMessageMain(messageString: string) {
|
|||
return message;
|
||||
}
|
||||
|
||||
if (isMainInstance && !config.getEnv('leaderSelection.enabled')) {
|
||||
if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) {
|
||||
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
|
||||
logger.error(
|
||||
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
|
||||
|
@ -60,6 +65,68 @@ export async function handleCommandMessageMain(messageString: string) {
|
|||
return message;
|
||||
}
|
||||
await Container.get(ExternalSecretsManager).reloadAllProviders();
|
||||
break;
|
||||
|
||||
case 'workflowActiveStateChanged': {
|
||||
if (!debounceMessageReceiver(message, 100)) {
|
||||
message.payload = { result: 'debounced' };
|
||||
return message;
|
||||
}
|
||||
|
||||
const { workflowId, oldState, newState, versionId } = message.payload ?? {};
|
||||
|
||||
if (
|
||||
typeof workflowId !== 'string' ||
|
||||
typeof oldState !== 'boolean' ||
|
||||
typeof newState !== 'boolean' ||
|
||||
typeof versionId !== 'string'
|
||||
) {
|
||||
break;
|
||||
}
|
||||
|
||||
const push = Container.get(Push);
|
||||
|
||||
if (!oldState && newState) {
|
||||
try {
|
||||
await activeWorkflowRunner.add(workflowId, 'activate');
|
||||
push.broadcast('workflowActivated', { workflowId });
|
||||
} catch (e) {
|
||||
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||
|
||||
await Container.get(WorkflowRepository).update(workflowId, {
|
||||
active: false,
|
||||
versionId,
|
||||
});
|
||||
|
||||
await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({
|
||||
workflowId,
|
||||
errorMessage: error.message,
|
||||
});
|
||||
}
|
||||
} else if (oldState && !newState) {
|
||||
await activeWorkflowRunner.remove(workflowId);
|
||||
push.broadcast('workflowDeactivated', { workflowId });
|
||||
} else {
|
||||
await activeWorkflowRunner.remove(workflowId);
|
||||
await activeWorkflowRunner.add(workflowId, 'update');
|
||||
}
|
||||
|
||||
await activeWorkflowRunner.removeActivationError(workflowId);
|
||||
}
|
||||
|
||||
case 'workflowFailedToActivate': {
|
||||
if (!debounceMessageReceiver(message, 100)) {
|
||||
message.payload = { result: 'debounced' };
|
||||
return message;
|
||||
}
|
||||
|
||||
const { workflowId, errorMessage } = message.payload ?? {};
|
||||
|
||||
if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break;
|
||||
|
||||
Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage });
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -4,6 +4,6 @@ import { OrchestrationService } from '../../orchestration.base.service';
|
|||
@Service()
|
||||
export class OrchestrationWebhookService extends OrchestrationService {
|
||||
sanityCheck(): boolean {
|
||||
return this.initialized && this.isQueueMode && this.isWebhookInstance;
|
||||
return this.isInitialized && this.isQueueMode && this.isWebhookInstance;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import { OrchestrationService } from '../../orchestration.base.service';
|
|||
@Service()
|
||||
export class OrchestrationWorkerService extends OrchestrationService {
|
||||
sanityCheck(): boolean {
|
||||
return this.initialized && this.isQueueMode && this.isWorkerInstance;
|
||||
return this.isInitialized && this.isQueueMode && this.isWorkerInstance;
|
||||
}
|
||||
|
||||
async publishToEventLog(message: AbstractEventMessage) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import Container, { Service } from 'typedi';
|
||||
import { Service } from 'typedi';
|
||||
import { BinaryDataService } from 'n8n-core';
|
||||
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
|
||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||
|
@ -23,16 +23,13 @@ export class PruningService {
|
|||
|
||||
public hardDeletionTimeout: NodeJS.Timeout | undefined;
|
||||
|
||||
private isMultiMainScenario =
|
||||
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly binaryDataService: BinaryDataService,
|
||||
) {}
|
||||
|
||||
async isPruningEnabled() {
|
||||
isPruningEnabled() {
|
||||
if (
|
||||
!config.getEnv('executions.pruneData') ||
|
||||
inTest ||
|
||||
|
@ -41,75 +38,60 @@ export class PruningService {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (this.isMultiMainScenario) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||
|
||||
await multiMainInstancePublisher.init();
|
||||
|
||||
return multiMainInstancePublisher.isLeader;
|
||||
if (
|
||||
config.getEnv('multiMainSetup.enabled') &&
|
||||
config.getEnv('multiMainSetup.instanceType') === 'follower'
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @important Call only after DB connection is established and migrations have completed.
|
||||
* @important Call this method only after DB migrations have completed.
|
||||
*/
|
||||
startPruning() {
|
||||
this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');
|
||||
|
||||
this.setSoftDeletionInterval();
|
||||
this.scheduleHardDeletion();
|
||||
}
|
||||
|
||||
async stopPruning() {
|
||||
if (this.isMultiMainScenario) {
|
||||
const { MultiMainInstancePublisher } = await import(
|
||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||
);
|
||||
|
||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||
|
||||
await multiMainInstancePublisher.init();
|
||||
|
||||
if (multiMainInstancePublisher.isFollower) return;
|
||||
}
|
||||
|
||||
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
|
||||
stopPruning() {
|
||||
this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');
|
||||
|
||||
clearInterval(this.softDeletionInterval);
|
||||
clearTimeout(this.hardDeletionTimeout);
|
||||
}
|
||||
|
||||
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
|
||||
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
||||
|
||||
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
|
||||
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
|
||||
|
||||
this.softDeletionInterval = setInterval(
|
||||
async () => this.softDeleteOnPruningCycle(),
|
||||
this.rates.softDeletion,
|
||||
);
|
||||
|
||||
this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`);
|
||||
}
|
||||
|
||||
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
|
||||
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
||||
|
||||
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
|
||||
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
|
||||
|
||||
this.hardDeletionTimeout = setTimeout(
|
||||
async () => this.hardDeleteOnPruningCycle(),
|
||||
this.rates.hardDeletion,
|
||||
);
|
||||
|
||||
this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark executions as deleted based on age and count, in a pruning cycle.
|
||||
*/
|
||||
async softDeleteOnPruningCycle() {
|
||||
this.logger.debug('Starting soft-deletion of executions (pruning cycle)');
|
||||
this.logger.debug('[Pruning] Starting soft-deletion of executions');
|
||||
|
||||
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
||||
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
||||
|
@ -157,8 +139,11 @@ export class PruningService {
|
|||
.execute();
|
||||
|
||||
if (result.affected === 0) {
|
||||
this.logger.debug('Found no executions to soft-delete (pruning cycle)');
|
||||
this.logger.debug('[Pruning] Found no executions to soft-delete');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -187,21 +172,23 @@ export class PruningService {
|
|||
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
||||
|
||||
if (executionIds.length === 0) {
|
||||
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
|
||||
this.logger.debug('[Pruning] Found no executions to hard-delete');
|
||||
this.scheduleHardDeletion();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
|
||||
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
|
||||
executionIds,
|
||||
});
|
||||
|
||||
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
||||
|
||||
await this.executionRepository.delete({ id: In(executionIds) });
|
||||
|
||||
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
|
||||
this.logger.error('[Pruning] Failed to hard-delete executions', {
|
||||
executionIds,
|
||||
error: error instanceof Error ? error.message : `${error}`,
|
||||
});
|
||||
|
|
|
@ -6,7 +6,9 @@ export type RedisServiceCommand =
|
|||
| 'restartEventBus'
|
||||
| 'stopWorker'
|
||||
| 'reloadLicense'
|
||||
| 'reloadExternalSecretsProviders';
|
||||
| 'reloadExternalSecretsProviders'
|
||||
| 'workflowActiveStateChanged' // multi-main only
|
||||
| 'workflowFailedToActivate'; // multi-main only
|
||||
|
||||
/**
|
||||
* An object to be sent via Redis pub/sub from the main process to the workers.
|
||||
|
@ -50,6 +52,14 @@ export type RedisServiceWorkerResponseObject = {
|
|||
| {
|
||||
command: 'stopWorker';
|
||||
}
|
||||
| {
|
||||
command: 'workflowActiveStateChanged';
|
||||
payload: {
|
||||
oldState: boolean;
|
||||
newState: boolean;
|
||||
workflowId: string;
|
||||
};
|
||||
}
|
||||
);
|
||||
|
||||
export type RedisServiceCommandObject = {
|
||||
|
|
|
@ -30,6 +30,7 @@ import { isStringArray, isWorkflowIdValid } from '@/utils';
|
|||
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
|
||||
import { BinaryDataService } from 'n8n-core';
|
||||
import { Logger } from '@/Logger';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
||||
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
|
||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||
|
@ -212,6 +213,8 @@ export class WorkflowsService {
|
|||
);
|
||||
}
|
||||
|
||||
const oldState = shared.workflow.active;
|
||||
|
||||
if (
|
||||
!forceSave &&
|
||||
workflow.versionId !== '' &&
|
||||
|
@ -255,9 +258,14 @@ export class WorkflowsService {
|
|||
|
||||
await Container.get(ExternalHooks).run('workflow.update', [workflow]);
|
||||
|
||||
/**
|
||||
* If the workflow being updated is stored as `active`, remove it from
|
||||
* active workflows in memory, and re-add it after the update.
|
||||
*
|
||||
* If a trigger or poller in the workflow was updated, the new value
|
||||
* will take effect only on removing and re-adding.
|
||||
*/
|
||||
if (shared.workflow.active) {
|
||||
// When workflow gets saved always remove it as the triggers could have been
|
||||
// changed and so the changes would not take effect
|
||||
await Container.get(ActiveWorkflowRunner).remove(workflowId);
|
||||
}
|
||||
|
||||
|
@ -364,6 +372,21 @@ export class WorkflowsService {
|
|||
}
|
||||
}
|
||||
|
||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||
const multiMainSetup = Container.get(MultiMainSetup);
|
||||
|
||||
await multiMainSetup.init();
|
||||
|
||||
if (multiMainSetup.isEnabled) {
|
||||
await Container.get(MultiMainSetup).broadcastWorkflowActiveStateChanged({
|
||||
workflowId,
|
||||
oldState,
|
||||
newState: updatedWorkflow.active,
|
||||
versionId: shared.workflow.versionId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return updatedWorkflow;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,10 +17,9 @@ import { WorkflowRunner } from '@/WorkflowRunner';
|
|||
import type { User } from '@db/entities/User';
|
||||
import type { WebhookEntity } from '@db/entities/WebhookEntity';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
|
||||
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
import { chooseRandomly } from './shared/random';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
import { setSchedulerAsLoadedNode } from './shared/utils';
|
||||
import * as testDb from './shared/testDb';
|
||||
import { createOwner } from './shared/db/users';
|
||||
|
@ -30,9 +29,13 @@ mockInstance(ActiveExecutions);
|
|||
mockInstance(ActiveWorkflows);
|
||||
mockInstance(Push);
|
||||
mockInstance(SecretsHelper);
|
||||
mockInstance(MultiMainInstancePublisher);
|
||||
|
||||
const webhookService = mockInstance(WebhookService);
|
||||
const multiMainSetup = mockInstance(MultiMainSetup, {
|
||||
isEnabled: false,
|
||||
isLeader: false,
|
||||
isFollower: false,
|
||||
});
|
||||
|
||||
setSchedulerAsLoadedNode();
|
||||
|
||||
|
@ -230,7 +233,7 @@ describe('executeErrorWorkflow()', () => {
|
|||
|
||||
describe('add()', () => {
|
||||
describe('in single-main scenario', () => {
|
||||
test('leader should add webhooks, triggers and pollers', async () => {
|
||||
test('should add webhooks, triggers and pollers', async () => {
|
||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
|
@ -252,72 +255,84 @@ describe('add()', () => {
|
|||
|
||||
describe('in multi-main scenario', () => {
|
||||
describe('leader', () => {
|
||||
test('on regular activation mode, leader should add webhooks only', async () => {
|
||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||
describe('on non-leadership-change activation mode', () => {
|
||||
test('should add webhooks only', async () => {
|
||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||
|
||||
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
|
||||
mockInstance(MultiMainInstancePublisher, { isLeader: true });
|
||||
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
|
||||
jest.replaceProperty(multiMainSetup, 'isLeader', true);
|
||||
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||
const addTriggersAndPollersSpy = jest.spyOn(
|
||||
activeWorkflowRunner,
|
||||
'addTriggersAndPollers',
|
||||
);
|
||||
|
||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||
await activeWorkflowRunner.init();
|
||||
addWebhooksSpy.mockReset();
|
||||
addTriggersAndPollersSpy.mockReset();
|
||||
|
||||
await activeWorkflowRunner.init();
|
||||
addWebhooksSpy.mockReset();
|
||||
addTriggersAndPollersSpy.mockReset();
|
||||
await activeWorkflowRunner.add(workflow.id, mode);
|
||||
|
||||
await activeWorkflowRunner.add(workflow.id, mode);
|
||||
|
||||
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
|
||||
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
|
||||
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
test('on activation via leadership change, leader should add triggers and pollers only', async () => {
|
||||
const mode = 'leadershipChange';
|
||||
describe('on leadership change activation mode', () => {
|
||||
test('should add triggers and pollers only', async () => {
|
||||
const mode = 'leadershipChange';
|
||||
|
||||
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
||||
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
|
||||
jest.replaceProperty(multiMainSetup, 'isLeader', true);
|
||||
|
||||
mockInstance(MultiMainInstancePublisher, { isLeader: true });
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||
const addTriggersAndPollersSpy = jest.spyOn(
|
||||
activeWorkflowRunner,
|
||||
'addTriggersAndPollers',
|
||||
);
|
||||
|
||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||
await activeWorkflowRunner.init();
|
||||
addWebhooksSpy.mockReset();
|
||||
addTriggersAndPollersSpy.mockReset();
|
||||
|
||||
await activeWorkflowRunner.init();
|
||||
addWebhooksSpy.mockReset();
|
||||
addTriggersAndPollersSpy.mockReset();
|
||||
await activeWorkflowRunner.add(workflow.id, mode);
|
||||
|
||||
await activeWorkflowRunner.add(workflow.id, mode);
|
||||
|
||||
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('follower', () => {
|
||||
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => {
|
||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||
describe('on any activation mode', () => {
|
||||
test('should not add webhooks, triggers or pollers', async () => {
|
||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||
|
||||
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
||||
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
|
||||
jest.replaceProperty(multiMainSetup, 'isLeader', false);
|
||||
|
||||
mockInstance(MultiMainInstancePublisher, { isLeader: false });
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||
const addTriggersAndPollersSpy = jest.spyOn(
|
||||
activeWorkflowRunner,
|
||||
'addTriggersAndPollers',
|
||||
);
|
||||
|
||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||
await activeWorkflowRunner.init();
|
||||
addWebhooksSpy.mockReset();
|
||||
addTriggersAndPollersSpy.mockReset();
|
||||
|
||||
await activeWorkflowRunner.init();
|
||||
addWebhooksSpy.mockReset();
|
||||
addTriggersAndPollersSpy.mockReset();
|
||||
await activeWorkflowRunner.add(workflow.id, mode);
|
||||
|
||||
await activeWorkflowRunner.add(workflow.id, mode);
|
||||
|
||||
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
|
||||
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
import * as Config from '@oclif/config';
|
||||
import { DataSource } from 'typeorm';
|
||||
|
||||
import { Start } from '@/commands/start';
|
||||
import { BaseCommand } from '@/commands/BaseCommand';
|
||||
import config from '@/config';
|
||||
import { License } from '@/License';
|
||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
|
||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
|
||||
import { RedisService } from '@/services/redis.service';
|
||||
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
||||
import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
|
||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
||||
|
||||
import { mockInstance } from '../../shared/mocking';
|
||||
|
||||
const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
|
||||
|
||||
beforeAll(() => {
|
||||
mockInstance(DataSource);
|
||||
mockInstance(ExternalSecretsManager);
|
||||
mockInstance(ActiveWorkflowRunner);
|
||||
mockInstance(WorkflowHistoryManager);
|
||||
mockInstance(RedisService);
|
||||
mockInstance(RedisServicePubSubPublisher);
|
||||
mockInstance(RedisServicePubSubSubscriber);
|
||||
mockInstance(MultiMainInstancePublisher);
|
||||
mockInstance(OrchestrationHandlerMainService);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
config.load(config.default);
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
test('should not init license if instance is follower in multi-main scenario', async () => {
|
||||
config.set('executions.mode', 'queue');
|
||||
config.set('endpoints.disableUi', true);
|
||||
config.set('leaderSelection.enabled', true);
|
||||
|
||||
jest.spyOn(MultiMainInstancePublisher.prototype, 'isFollower', 'get').mockReturnValue(true);
|
||||
jest.spyOn(BaseCommand.prototype, 'init').mockImplementation(async () => {});
|
||||
|
||||
const licenseMock = mockInstance(License, {
|
||||
isMultipleMainInstancesLicensed: jest.fn().mockReturnValue(true),
|
||||
});
|
||||
|
||||
const startCmd = new Start([], oclifConfig);
|
||||
|
||||
await startCmd.init();
|
||||
|
||||
expect(licenseMock.init).not.toHaveBeenCalled();
|
||||
});
|
|
@ -16,6 +16,7 @@ import { PostHogClient } from '@/posthog';
|
|||
import { RedisService } from '@/services/redis.service';
|
||||
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
|
||||
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
|
||||
import { mockInstance } from '../../shared/mocking';
|
||||
|
||||
|
@ -37,6 +38,7 @@ beforeAll(async () => {
|
|||
mockInstance(RedisService);
|
||||
mockInstance(RedisServicePubSubPublisher);
|
||||
mockInstance(RedisServicePubSubSubscriber);
|
||||
mockInstance(MultiMainSetup);
|
||||
});
|
||||
|
||||
test('worker initializes all its components', async () => {
|
||||
|
|
|
@ -16,6 +16,7 @@ import { AUTH_COOKIE_NAME } from '@/constants';
|
|||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||
import { SettingsRepository } from '@db/repositories/settings.repository';
|
||||
import { mockNodeTypesData } from '../../../unit/Helpers';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
import { mockInstance } from '../../../shared/mocking';
|
||||
|
||||
export { setupTestServer } from './testServer';
|
||||
|
@ -28,6 +29,8 @@ export { setupTestServer } from './testServer';
|
|||
* Initialize node types.
|
||||
*/
|
||||
export async function initActiveWorkflowRunner() {
|
||||
mockInstance(MultiMainSetup);
|
||||
|
||||
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
|
||||
const workflowRunner = Container.get(ActiveWorkflowRunner);
|
||||
await workflowRunner.init();
|
||||
|
|
62
packages/cli/test/integration/workflow.service.test.ts
Normal file
62
packages/cli/test/integration/workflow.service.test.ts
Normal file
|
@ -0,0 +1,62 @@
|
|||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||
import * as testDb from './shared/testDb';
|
||||
import { WorkflowsService } from '@/workflows/workflows.services';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
import { Telemetry } from '@/telemetry';
|
||||
import { createOwner } from './shared/db/users';
|
||||
import { createWorkflow } from './shared/db/workflows';
|
||||
|
||||
mockInstance(Telemetry);
|
||||
|
||||
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
|
||||
|
||||
beforeAll(async () => {
|
||||
await testDb.init();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await testDb.truncate(['Workflow']);
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await testDb.terminate();
|
||||
});
|
||||
|
||||
describe('update()', () => {
|
||||
test('should remove and re-add to active workflows on `active: true` payload', async () => {
|
||||
const owner = await createOwner();
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
|
||||
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
|
||||
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
|
||||
|
||||
await WorkflowsService.update(owner, workflow, workflow.id);
|
||||
|
||||
expect(removeSpy).toHaveBeenCalledTimes(1);
|
||||
const [removedWorkflowId] = removeSpy.mock.calls[0];
|
||||
expect(removedWorkflowId).toBe(workflow.id);
|
||||
|
||||
expect(addSpy).toHaveBeenCalledTimes(1);
|
||||
const [addedWorkflowId, activationMode] = addSpy.mock.calls[0];
|
||||
expect(addedWorkflowId).toBe(workflow.id);
|
||||
expect(activationMode).toBe('update');
|
||||
});
|
||||
|
||||
test('should remove from active workflows on `active: false` payload', async () => {
|
||||
const owner = await createOwner();
|
||||
const workflow = await createWorkflow({ active: true }, owner);
|
||||
|
||||
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
|
||||
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
|
||||
|
||||
workflow.active = false;
|
||||
await WorkflowsService.update(owner, workflow, workflow.id);
|
||||
|
||||
expect(removeSpy).toHaveBeenCalledTimes(1);
|
||||
const [removedWorkflowId] = removeSpy.mock.calls[0];
|
||||
expect(removedWorkflowId).toBe(workflow.id);
|
||||
|
||||
expect(addSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
|
@ -6,6 +6,7 @@ import { License } from '@/License';
|
|||
import { Logger } from '@/Logger';
|
||||
import { N8N_VERSION } from '@/constants';
|
||||
import { mockInstance } from '../shared/mocking';
|
||||
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||
|
||||
jest.mock('@n8n_io/license-sdk');
|
||||
|
||||
|
@ -27,9 +28,10 @@ describe('License', () => {
|
|||
let license: License;
|
||||
const logger = mockInstance(Logger);
|
||||
const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID });
|
||||
const multiMainSetup = mockInstance(MultiMainSetup);
|
||||
|
||||
beforeEach(async () => {
|
||||
license = new License(logger, instanceSettings, mock(), mock());
|
||||
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
||||
await license.init();
|
||||
});
|
||||
|
||||
|
@ -52,7 +54,7 @@ describe('License', () => {
|
|||
});
|
||||
|
||||
test('initializes license manager for worker', async () => {
|
||||
license = new License(logger, instanceSettings, mock(), mock());
|
||||
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
||||
await license.init('worker');
|
||||
expect(LicenseManager).toHaveBeenCalledWith({
|
||||
autoRenewEnabled: false,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import Container from 'typedi';
|
||||
import config from '@/config';
|
||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
|
||||
import { eventBus } from '@/eventbus';
|
||||
import { RedisService } from '@/services/redis.service';
|
||||
|
@ -10,10 +10,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o
|
|||
import * as helpers from '@/services/orchestration/helpers';
|
||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||
import { Logger } from '@/Logger';
|
||||
import { Push } from '@/push';
|
||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||
import { mockInstance } from '../../shared/mocking';
|
||||
|
||||
const os = Container.get(SingleMainInstancePublisher);
|
||||
const os = Container.get(SingleMainSetup);
|
||||
const handler = Container.get(OrchestrationHandlerMainService);
|
||||
mockInstance(ActiveWorkflowRunner);
|
||||
|
||||
let queueModeId: string;
|
||||
|
||||
|
@ -33,6 +36,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
|
|||
|
||||
describe('Orchestration Service', () => {
|
||||
const logger = mockInstance(Logger);
|
||||
mockInstance(Push);
|
||||
beforeAll(async () => {
|
||||
mockInstance(RedisService);
|
||||
mockInstance(ExternalSecretsManager);
|
||||
|
|
|
@ -421,7 +421,25 @@ export type IPushData =
|
|||
| PushDataRemoveNodeType
|
||||
| PushDataTestWebhook
|
||||
| PushDataExecutionRecovered
|
||||
| PushDataWorkerStatusMessage;
|
||||
| PushDataWorkerStatusMessage
|
||||
| PushDataActiveWorkflowAdded
|
||||
| PushDataActiveWorkflowRemoved
|
||||
| PushDataWorkflowFailedToActivate;
|
||||
|
||||
type PushDataActiveWorkflowAdded = {
|
||||
data: IActiveWorkflowAdded;
|
||||
type: 'workflowActivated';
|
||||
};
|
||||
|
||||
type PushDataActiveWorkflowRemoved = {
|
||||
data: IActiveWorkflowRemoved;
|
||||
type: 'workflowDeactivated';
|
||||
};
|
||||
|
||||
type PushDataWorkflowFailedToActivate = {
|
||||
data: IWorkflowFailedToActivate;
|
||||
type: 'workflowFailedToActivate';
|
||||
};
|
||||
|
||||
type PushDataExecutionRecovered = {
|
||||
data: IPushDataExecutionRecovered;
|
||||
|
@ -491,6 +509,19 @@ export interface IPushDataExecutionFinished {
|
|||
retryOf?: string;
|
||||
}
|
||||
|
||||
export interface IActiveWorkflowAdded {
|
||||
workflowId: string;
|
||||
}
|
||||
|
||||
export interface IActiveWorkflowRemoved {
|
||||
workflowId: string;
|
||||
}
|
||||
|
||||
export interface IWorkflowFailedToActivate {
|
||||
workflowId: string;
|
||||
errorMessage: string;
|
||||
}
|
||||
|
||||
export interface IPushDataUnsavedExecutionFinished {
|
||||
executionId: string;
|
||||
data: { finished: true; stoppedAt: Date };
|
||||
|
|
|
@ -119,7 +119,7 @@ export default defineComponent({
|
|||
} else {
|
||||
errorMessage = this.$locale.baseText(
|
||||
'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined',
|
||||
{ interpolate: { message: errorData.error.message } },
|
||||
{ interpolate: { message: errorData } },
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
|
|
|
@ -291,6 +291,33 @@ export const pushConnection = defineComponent({
|
|||
}
|
||||
}
|
||||
|
||||
if (
|
||||
receivedData.type === 'workflowFailedToActivate' &&
|
||||
this.workflowsStore.workflowId === receivedData.data.workflowId
|
||||
) {
|
||||
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
|
||||
this.workflowsStore.setActive(false);
|
||||
|
||||
this.showError(
|
||||
new Error(receivedData.data.errorMessage),
|
||||
this.$locale.baseText('workflowActivator.showError.title', {
|
||||
interpolate: { newStateName: 'activated' },
|
||||
}) + ':',
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (receivedData.type === 'workflowActivated') {
|
||||
this.workflowsStore.setWorkflowActive(receivedData.data.workflowId);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (receivedData.type === 'workflowDeactivated') {
|
||||
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
|
||||
// The workflow finished executing
|
||||
let pushData: IPushDataExecutionFinished;
|
||||
|
|
|
@ -10,7 +10,6 @@ import {
|
|||
} from '@/constants';
|
||||
import type {
|
||||
ExecutionsQueryFilter,
|
||||
IActivationError,
|
||||
IExecutionDeleteFilter,
|
||||
IExecutionPushResponse,
|
||||
IExecutionResponse,
|
||||
|
@ -365,7 +364,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
|||
});
|
||||
},
|
||||
|
||||
async getActivationError(id: string): Promise<IActivationError | undefined> {
|
||||
async getActivationError(id: string): Promise<string | undefined> {
|
||||
const rootStore = useRootStore();
|
||||
return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`);
|
||||
},
|
||||
|
@ -552,6 +551,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
|||
if (this.workflowsById[workflowId]) {
|
||||
this.workflowsById[workflowId].active = true;
|
||||
}
|
||||
if (workflowId === this.workflow.id) {
|
||||
this.setActive(true);
|
||||
}
|
||||
},
|
||||
|
||||
setWorkflowInactive(workflowId: string): void {
|
||||
|
@ -562,6 +564,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
|||
if (this.workflowsById[workflowId]) {
|
||||
this.workflowsById[workflowId].active = false;
|
||||
}
|
||||
if (workflowId === this.workflow.id) {
|
||||
this.setActive(false);
|
||||
}
|
||||
},
|
||||
|
||||
async fetchActiveWorkflows(): Promise<string[]> {
|
||||
|
|
Loading…
Reference in a new issue