mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-13 05:47:31 -08:00
feat(core): Coordinate manual workflow activation and deactivation in multi-main scenario (#7643)
Followup to #7566 | Story: https://linear.app/n8n/issue/PAY-926 ### Manual workflow activation and deactivation In a multi-main scenario, if the user manually activates or deactivates a workflow, the process (whether leader or follower) that handles the PATCH request and updates its internal state should send a message into the command channel, so that all other main processes update their internal state accordingly: - Add to `ActiveWorkflows` if activating - Remove from `ActiveWorkflows` if deactivating - Remove and re-add to `ActiveWorkflows` if the update did not change activation status. After updating their internal state, if activating or deactivating, the recipient main processes should push a message to all connected frontends so that these can update their stores and so reflect the value in the UI. ### Workflow activation errors On failure to activate a workflow, the main instance should record the error in Redis - main instances should always pull activation errors from Redis in a multi-main scenario. ### Leadership change On leadership change... - The old leader should stop pruning and the new leader should start pruning. - The old leader should remove trigger- and poller-based workflows and the new leader should add them.
This commit is contained in:
parent
b3a3f16bc2
commit
4c4082503c
52
packages/cli/src/ActivationErrors.service.ts
Normal file
52
packages/cli/src/ActivationErrors.service.ts
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
import { Service } from 'typedi';
|
||||||
|
import { CacheService } from './services/cache.service';
|
||||||
|
import { jsonParse } from 'n8n-workflow';
|
||||||
|
|
||||||
|
type ActivationErrors = {
|
||||||
|
[workflowId: string]: string; // error message
|
||||||
|
};
|
||||||
|
|
||||||
|
@Service()
|
||||||
|
export class ActivationErrorsService {
|
||||||
|
private readonly cacheKey = 'workflow-activation-errors';
|
||||||
|
|
||||||
|
constructor(private readonly cacheService: CacheService) {}
|
||||||
|
|
||||||
|
async set(workflowId: string, errorMessage: string) {
|
||||||
|
const errors = await this.getAll();
|
||||||
|
|
||||||
|
errors[workflowId] = errorMessage;
|
||||||
|
|
||||||
|
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
|
||||||
|
}
|
||||||
|
|
||||||
|
async unset(workflowId: string) {
|
||||||
|
const errors = await this.getAll();
|
||||||
|
|
||||||
|
if (Object.keys(errors).length === 0) return;
|
||||||
|
|
||||||
|
delete errors[workflowId];
|
||||||
|
|
||||||
|
await this.cacheService.set(this.cacheKey, JSON.stringify(errors));
|
||||||
|
}
|
||||||
|
|
||||||
|
async get(workflowId: string) {
|
||||||
|
const errors = await this.getAll();
|
||||||
|
|
||||||
|
if (Object.keys(errors).length === 0) return null;
|
||||||
|
|
||||||
|
return errors[workflowId];
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAll() {
|
||||||
|
const errors = await this.cacheService.get<string>(this.cacheKey);
|
||||||
|
|
||||||
|
if (!errors) return {};
|
||||||
|
|
||||||
|
return jsonParse<ActivationErrors>(errors);
|
||||||
|
}
|
||||||
|
|
||||||
|
async clearAll() {
|
||||||
|
await this.cacheService.delete(this.cacheKey);
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,8 +2,9 @@
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||||
|
|
||||||
import Container, { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
||||||
|
import config from '@/config';
|
||||||
|
|
||||||
import type {
|
import type {
|
||||||
ExecutionError,
|
ExecutionError,
|
||||||
|
@ -64,8 +65,8 @@ import { WebhookService } from './services/webhook.service';
|
||||||
import { Logger } from './Logger';
|
import { Logger } from './Logger';
|
||||||
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
||||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||||
import config from '@/config';
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
|
import { ActivationErrorsService } from '@/ActivationErrors.service';
|
||||||
|
|
||||||
const WEBHOOK_PROD_UNREGISTERED_HINT =
|
const WEBHOOK_PROD_UNREGISTERED_HINT =
|
||||||
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
|
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
|
||||||
|
@ -74,15 +75,6 @@ const WEBHOOK_PROD_UNREGISTERED_HINT =
|
||||||
export class ActiveWorkflowRunner implements IWebhookManager {
|
export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
activeWorkflows = new ActiveWorkflows();
|
activeWorkflows = new ActiveWorkflows();
|
||||||
|
|
||||||
private activationErrors: {
|
|
||||||
[workflowId: string]: {
|
|
||||||
time: number; // ms
|
|
||||||
error: {
|
|
||||||
message: string;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
} = {};
|
|
||||||
|
|
||||||
private queuedActivations: {
|
private queuedActivations: {
|
||||||
[workflowId: string]: {
|
[workflowId: string]: {
|
||||||
activationMode: WorkflowActivateMode;
|
activationMode: WorkflowActivateMode;
|
||||||
|
@ -92,11 +84,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
};
|
};
|
||||||
} = {};
|
} = {};
|
||||||
|
|
||||||
isMultiMainScenario =
|
|
||||||
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
|
|
||||||
|
|
||||||
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly activeExecutions: ActiveExecutions,
|
private readonly activeExecutions: ActiveExecutions,
|
||||||
|
@ -105,17 +92,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
private readonly webhookService: WebhookService,
|
private readonly webhookService: WebhookService,
|
||||||
private readonly workflowRepository: WorkflowRepository,
|
private readonly workflowRepository: WorkflowRepository,
|
||||||
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
|
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
|
||||||
|
private readonly multiMainSetup: MultiMainSetup,
|
||||||
|
private readonly activationErrorsService: ActivationErrorsService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
if (this.isMultiMainScenario) {
|
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||||
const { MultiMainInstancePublisher } = await import(
|
await this.multiMainSetup.init();
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
|
||||||
);
|
|
||||||
|
|
||||||
this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
await this.multiMainInstancePublisher.init();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.addActiveWorkflows('init');
|
await this.addActiveWorkflows('init');
|
||||||
|
@ -272,6 +255,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
async allActiveInStorage(user?: User) {
|
async allActiveInStorage(user?: User) {
|
||||||
const isFullAccess = !user || user.globalRole.name === 'owner';
|
const isFullAccess = !user || user.globalRole.name === 'owner';
|
||||||
|
|
||||||
|
const activationErrors = await this.activationErrorsService.getAll();
|
||||||
|
|
||||||
if (isFullAccess) {
|
if (isFullAccess) {
|
||||||
const activeWorkflows = await this.workflowRepository.find({
|
const activeWorkflows = await this.workflowRepository.find({
|
||||||
select: ['id'],
|
select: ['id'],
|
||||||
|
@ -280,7 +265,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
|
|
||||||
return activeWorkflows
|
return activeWorkflows
|
||||||
.map((workflow) => workflow.id)
|
.map((workflow) => workflow.id)
|
||||||
.filter((workflowId) => !this.activationErrors[workflowId]);
|
.filter((workflowId) => !activationErrors[workflowId]);
|
||||||
}
|
}
|
||||||
|
|
||||||
const where = whereClause({
|
const where = whereClause({
|
||||||
|
@ -304,7 +289,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
|
|
||||||
return sharings
|
return sharings
|
||||||
.map((sharing) => sharing.workflowId)
|
.map((sharing) => sharing.workflowId)
|
||||||
.filter((workflowId) => !this.activationErrors[workflowId]);
|
.filter((workflowId) => !activationErrors[workflowId]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -325,8 +310,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
/**
|
/**
|
||||||
* Return error if there was a problem activating the workflow
|
* Return error if there was a problem activating the workflow
|
||||||
*/
|
*/
|
||||||
getActivationError(workflowId: string) {
|
async getActivationError(workflowId: string) {
|
||||||
return this.activationErrors[workflowId];
|
return this.activationErrorsService.get(workflowId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -612,12 +597,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
// Remove the workflow as "active"
|
// Remove the workflow as "active"
|
||||||
|
|
||||||
void this.activeWorkflows.remove(workflowData.id);
|
void this.activeWorkflows.remove(workflowData.id);
|
||||||
this.activationErrors[workflowData.id] = {
|
|
||||||
time: new Date().getTime(),
|
void this.activationErrorsService.set(workflowData.id, error.message);
|
||||||
error: {
|
|
||||||
message: error.message,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Run Error Workflow if defined
|
// Run Error Workflow if defined
|
||||||
const activationError = new WorkflowActivationError(
|
const activationError = new WorkflowActivationError(
|
||||||
|
@ -709,15 +690,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
this.logger.verbose('Finished activating workflows (startup)');
|
this.logger.verbose('Finished activating workflows (startup)');
|
||||||
}
|
}
|
||||||
|
|
||||||
async addAllTriggerAndPollerBasedWorkflows() {
|
async clearAllActivationErrors() {
|
||||||
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
|
await this.activationErrorsService.clearAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
async addAllTriggerAndPollerBasedWorkflows() {
|
||||||
await this.addActiveWorkflows('leadershipChange');
|
await this.addActiveWorkflows('leadershipChange');
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeAllTriggerAndPollerBasedWorkflows() {
|
async removeAllTriggerAndPollerBasedWorkflows() {
|
||||||
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
|
|
||||||
|
|
||||||
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
|
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -750,12 +731,12 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
let shouldAddWebhooks = true;
|
let shouldAddWebhooks = true;
|
||||||
let shouldAddTriggersAndPollers = true;
|
let shouldAddTriggersAndPollers = true;
|
||||||
|
|
||||||
if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
|
if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') {
|
||||||
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
|
shouldAddWebhooks = this.multiMainSetup.isLeader;
|
||||||
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
|
shouldAddTriggersAndPollers = this.multiMainSetup.isLeader;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
|
if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') {
|
||||||
shouldAddWebhooks = false;
|
shouldAddWebhooks = false;
|
||||||
shouldAddTriggersAndPollers = true;
|
shouldAddTriggersAndPollers = true;
|
||||||
}
|
}
|
||||||
|
@ -795,17 +776,13 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
|
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
|
||||||
|
|
||||||
if (shouldAddWebhooks) {
|
if (shouldAddWebhooks) {
|
||||||
this.logger.debug('============');
|
this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`);
|
||||||
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
|
|
||||||
this.logger.debug('============');
|
|
||||||
|
|
||||||
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
|
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldAddTriggersAndPollers) {
|
if (shouldAddTriggersAndPollers) {
|
||||||
this.logger.debug('============');
|
this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`);
|
||||||
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
|
|
||||||
this.logger.debug('============');
|
|
||||||
|
|
||||||
await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||||
activationMode,
|
activationMode,
|
||||||
|
@ -817,21 +794,15 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
// Workflow got now successfully activated so make sure nothing is left in the queue
|
// Workflow got now successfully activated so make sure nothing is left in the queue
|
||||||
this.removeQueuedWorkflowActivation(workflowId);
|
this.removeQueuedWorkflowActivation(workflowId);
|
||||||
|
|
||||||
if (this.activationErrors[workflowId]) {
|
await this.activationErrorsService.unset(workflowId);
|
||||||
delete this.activationErrors[workflowId];
|
|
||||||
}
|
|
||||||
|
|
||||||
const triggerCount = this.countTriggers(workflow, additionalData);
|
const triggerCount = this.countTriggers(workflow, additionalData);
|
||||||
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
||||||
} catch (error) {
|
} catch (e) {
|
||||||
this.activationErrors[workflowId] = {
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
time: new Date().getTime(),
|
await this.activationErrorsService.set(workflowId, error.message);
|
||||||
error: {
|
|
||||||
message: error.message,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
throw error;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If for example webhooks get created it sometimes has to save the
|
// If for example webhooks get created it sometimes has to save the
|
||||||
|
@ -950,10 +921,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.activationErrors[workflowId] !== undefined) {
|
await this.activationErrorsService.unset(workflowId);
|
||||||
// If there were any activation errors delete them
|
|
||||||
delete this.activationErrors[workflowId];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.queuedActivations[workflowId] !== undefined) {
|
if (this.queuedActivations[workflowId] !== undefined) {
|
||||||
this.removeQueuedWorkflowActivation(workflowId);
|
this.removeQueuedWorkflowActivation(workflowId);
|
||||||
|
@ -1016,4 +984,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeActivationError(workflowId: string) {
|
||||||
|
await this.activationErrorsService.unset(workflowId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ import {
|
||||||
import { License } from '@/License';
|
import { License } from '@/License';
|
||||||
import { InternalHooks } from '@/InternalHooks';
|
import { InternalHooks } from '@/InternalHooks';
|
||||||
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
|
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ExternalSecretsManager {
|
export class ExternalSecretsManager {
|
||||||
|
@ -82,7 +82,7 @@ export class ExternalSecretsManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async broadcastReloadExternalSecretsProviders() {
|
async broadcastReloadExternalSecretsProviders() {
|
||||||
await Container.get(SingleMainInstancePublisher).broadcastReloadExternalSecretsProviders();
|
await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders();
|
||||||
}
|
}
|
||||||
|
|
||||||
private decryptSecretsSettings(value: string): ExternalSecretsSettings {
|
private decryptSecretsSettings(value: string): ExternalSecretsSettings {
|
||||||
|
|
|
@ -298,6 +298,7 @@ export interface IDiagnosticInfo {
|
||||||
ldap_allowed: boolean;
|
ldap_allowed: boolean;
|
||||||
saml_enabled: boolean;
|
saml_enabled: boolean;
|
||||||
binary_data_s3: boolean;
|
binary_data_s3: boolean;
|
||||||
|
multi_main_setup_enabled: boolean;
|
||||||
licensePlanName?: string;
|
licensePlanName?: string;
|
||||||
licenseTenantId?: number;
|
licenseTenantId?: number;
|
||||||
}
|
}
|
||||||
|
@ -469,7 +470,25 @@ export type IPushData =
|
||||||
| PushDataNodeDescriptionUpdated
|
| PushDataNodeDescriptionUpdated
|
||||||
| PushDataExecutionRecovered
|
| PushDataExecutionRecovered
|
||||||
| PushDataActiveWorkflowUsersChanged
|
| PushDataActiveWorkflowUsersChanged
|
||||||
| PushDataWorkerStatusMessage;
|
| PushDataWorkerStatusMessage
|
||||||
|
| PushDataWorkflowActivated
|
||||||
|
| PushDataWorkflowDeactivated
|
||||||
|
| PushDataWorkflowFailedToActivate;
|
||||||
|
|
||||||
|
type PushDataWorkflowFailedToActivate = {
|
||||||
|
data: IWorkflowFailedToActivate;
|
||||||
|
type: 'workflowFailedToActivate';
|
||||||
|
};
|
||||||
|
|
||||||
|
type PushDataWorkflowActivated = {
|
||||||
|
data: IActiveWorkflowChanged;
|
||||||
|
type: 'workflowActivated';
|
||||||
|
};
|
||||||
|
|
||||||
|
type PushDataWorkflowDeactivated = {
|
||||||
|
data: IActiveWorkflowChanged;
|
||||||
|
type: 'workflowDeactivated';
|
||||||
|
};
|
||||||
|
|
||||||
type PushDataActiveWorkflowUsersChanged = {
|
type PushDataActiveWorkflowUsersChanged = {
|
||||||
data: IActiveWorkflowUsersChanged;
|
data: IActiveWorkflowUsersChanged;
|
||||||
|
@ -536,11 +555,24 @@ export interface IActiveWorkflowUser {
|
||||||
lastSeen: Date;
|
lastSeen: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface IActiveWorkflowAdded {
|
||||||
|
workflowId: Workflow['id'];
|
||||||
|
}
|
||||||
|
|
||||||
export interface IActiveWorkflowUsersChanged {
|
export interface IActiveWorkflowUsersChanged {
|
||||||
workflowId: Workflow['id'];
|
workflowId: Workflow['id'];
|
||||||
activeUsers: IActiveWorkflowUser[];
|
activeUsers: IActiveWorkflowUser[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface IActiveWorkflowChanged {
|
||||||
|
workflowId: Workflow['id'];
|
||||||
|
}
|
||||||
|
|
||||||
|
interface IWorkflowFailedToActivate {
|
||||||
|
workflowId: Workflow['id'];
|
||||||
|
errorMessage: string;
|
||||||
|
}
|
||||||
|
|
||||||
export interface IPushDataExecutionRecovered {
|
export interface IPushDataExecutionRecovered {
|
||||||
executionId: string;
|
executionId: string;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||||
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
|
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
|
||||||
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
|
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
|
||||||
import { RedisService } from './services/redis.service';
|
import { RedisService } from './services/redis.service';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
|
|
||||||
type FeatureReturnType = Partial<
|
type FeatureReturnType = Partial<
|
||||||
{
|
{
|
||||||
|
@ -40,6 +41,7 @@ export class License {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly instanceSettings: InstanceSettings,
|
private readonly instanceSettings: InstanceSettings,
|
||||||
|
private readonly multiMainSetup: MultiMainSetup,
|
||||||
private readonly settingsRepository: SettingsRepository,
|
private readonly settingsRepository: SettingsRepository,
|
||||||
private readonly workflowRepository: WorkflowRepository,
|
private readonly workflowRepository: WorkflowRepository,
|
||||||
) {}
|
) {}
|
||||||
|
@ -49,6 +51,10 @@ export class License {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||||
|
await this.multiMainSetup.init();
|
||||||
|
}
|
||||||
|
|
||||||
const isMainInstance = instanceType === 'main';
|
const isMainInstance = instanceType === 'main';
|
||||||
const server = config.getEnv('license.serverUrl');
|
const server = config.getEnv('license.serverUrl');
|
||||||
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
|
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
|
||||||
|
@ -114,22 +120,28 @@ export class License {
|
||||||
}
|
}
|
||||||
|
|
||||||
async onFeatureChange(_features: TFeatures): Promise<void> {
|
async onFeatureChange(_features: TFeatures): Promise<void> {
|
||||||
if (config.getEnv('executions.mode') === 'queue') {
|
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||||
if (config.getEnv('leaderSelection.enabled')) {
|
const isMultiMainLicensed = _features[LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES] as
|
||||||
const { MultiMainInstancePublisher } = await import(
|
| boolean
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
| undefined;
|
||||||
|
|
||||||
|
this.multiMainSetup.setLicensed(isMultiMainLicensed ?? false);
|
||||||
|
|
||||||
|
if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) {
|
||||||
|
this.logger.debug(
|
||||||
|
'[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...',
|
||||||
);
|
);
|
||||||
|
|
||||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
await multiMainInstancePublisher.init();
|
|
||||||
|
|
||||||
if (multiMainInstancePublisher.isFollower) {
|
|
||||||
this.logger.debug('Instance is follower, skipping sending of reloadLicense command...');
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.multiMainSetup.isEnabled && !isMultiMainLicensed) {
|
||||||
|
this.logger.debug(
|
||||||
|
'[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supporst this feature.',
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.getEnv('executions.mode') === 'queue') {
|
||||||
if (!this.redisPublisher) {
|
if (!this.redisPublisher) {
|
||||||
this.logger.debug('Initializing Redis publisher for License Service');
|
this.logger.debug('Initializing Redis publisher for License Service');
|
||||||
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
|
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
|
||||||
|
|
|
@ -215,6 +215,7 @@ export class Server extends AbstractServer {
|
||||||
ldap_allowed: isLdapCurrentAuthenticationMethod(),
|
ldap_allowed: isLdapCurrentAuthenticationMethod(),
|
||||||
saml_enabled: isSamlCurrentAuthenticationMethod(),
|
saml_enabled: isSamlCurrentAuthenticationMethod(),
|
||||||
binary_data_s3: isS3Available && isS3Selected && isS3Licensed,
|
binary_data_s3: isS3Available && isS3Selected && isS3Licensed,
|
||||||
|
multi_main_setup_enabled: config.getEnv('multiMainSetup.enabled'),
|
||||||
licensePlanName: Container.get(License).getPlanName(),
|
licensePlanName: Container.get(License).getPlanName(),
|
||||||
licenseTenantId: config.getEnv('license.tenantId'),
|
licenseTenantId: config.getEnv('license.tenantId'),
|
||||||
};
|
};
|
||||||
|
@ -448,7 +449,7 @@ export class Server extends AbstractServer {
|
||||||
// Returns if the workflow with the given id had any activation errors
|
// Returns if the workflow with the given id had any activation errors
|
||||||
this.app.get(
|
this.app.get(
|
||||||
`/${this.restEndpoint}/active/error/:id`,
|
`/${this.restEndpoint}/active/error/:id`,
|
||||||
ResponseHelper.send(async (req: WorkflowRequest.GetAllActivationErrors) => {
|
ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => {
|
||||||
const { id: workflowId } = req.params;
|
const { id: workflowId } = req.params;
|
||||||
|
|
||||||
const shared = await Container.get(SharedWorkflowRepository).findOne({
|
const shared = await Container.get(SharedWorkflowRepository).findOne({
|
||||||
|
|
|
@ -243,21 +243,6 @@ export abstract class BaseCommand extends Command {
|
||||||
}
|
}
|
||||||
|
|
||||||
async initLicense(): Promise<void> {
|
async initLicense(): Promise<void> {
|
||||||
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled')) {
|
|
||||||
const { MultiMainInstancePublisher } = await import(
|
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
|
||||||
);
|
|
||||||
|
|
||||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
await multiMainInstancePublisher.init();
|
|
||||||
|
|
||||||
if (multiMainInstancePublisher.isFollower) {
|
|
||||||
this.logger.debug('Instance is follower, skipping license initialization...');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const license = Container.get(License);
|
const license = Container.get(License);
|
||||||
await license.init(this.instanceType ?? 'main');
|
await license.init(this.instanceType ?? 'main');
|
||||||
|
|
||||||
|
|
|
@ -25,9 +25,10 @@ import { BaseCommand } from './BaseCommand';
|
||||||
import { InternalHooks } from '@/InternalHooks';
|
import { InternalHooks } from '@/InternalHooks';
|
||||||
import { License, FeatureNotLicensedError } from '@/License';
|
import { License, FeatureNotLicensedError } from '@/License';
|
||||||
import { IConfig } from '@oclif/config';
|
import { IConfig } from '@oclif/config';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
||||||
import { PruningService } from '@/services/pruning.service';
|
import { PruningService } from '@/services/pruning.service';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
import { SettingsRepository } from '@db/repositories/settings.repository';
|
import { SettingsRepository } from '@db/repositories/settings.repository';
|
||||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||||
|
|
||||||
|
@ -112,18 +113,14 @@ export class Start extends BaseCommand {
|
||||||
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
|
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
|
||||||
await Container.get(License).shutdown();
|
await Container.get(License).shutdown();
|
||||||
|
|
||||||
if (await this.pruningService.isPruningEnabled()) {
|
if (this.pruningService.isPruningEnabled()) {
|
||||||
await this.pruningService.stopPruning();
|
this.pruningService.stopPruning();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.getEnv('leaderSelection.enabled')) {
|
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||||
const { MultiMainInstancePublisher } = await import(
|
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
|
|
||||||
await Container.get(MultiMainInstancePublisher).destroy();
|
await Container.get(MultiMainSetup).shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
await Container.get(InternalHooks).onN8nStop();
|
await Container.get(InternalHooks).onN8nStop();
|
||||||
|
@ -230,38 +227,42 @@ export class Start extends BaseCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
async initOrchestration() {
|
async initOrchestration() {
|
||||||
if (config.get('executions.mode') !== 'queue') return;
|
if (config.getEnv('executions.mode') !== 'queue') return;
|
||||||
|
|
||||||
if (!config.get('leaderSelection.enabled')) {
|
// queue mode in single-main scenario
|
||||||
await Container.get(SingleMainInstancePublisher).init();
|
|
||||||
|
if (!config.getEnv('multiMainSetup.enabled')) {
|
||||||
|
await Container.get(SingleMainSetup).init();
|
||||||
await Container.get(OrchestrationHandlerMainService).init();
|
await Container.get(OrchestrationHandlerMainService).init();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// multi-main scenario
|
// queue mode in multi-main scenario
|
||||||
|
|
||||||
const { MultiMainInstancePublisher } = await import(
|
if (!Container.get(License).isMultipleMainInstancesLicensed()) {
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
|
||||||
);
|
|
||||||
|
|
||||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
await multiMainInstancePublisher.init();
|
|
||||||
|
|
||||||
if (
|
|
||||||
multiMainInstancePublisher.isLeader &&
|
|
||||||
!Container.get(License).isMultipleMainInstancesLicensed()
|
|
||||||
) {
|
|
||||||
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
|
throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Container.get(OrchestrationHandlerMainService).init();
|
await Container.get(OrchestrationHandlerMainService).init();
|
||||||
|
|
||||||
multiMainInstancePublisher.on('leadershipChange', async () => {
|
const multiMainSetup = Container.get(MultiMainSetup);
|
||||||
if (multiMainInstancePublisher.isLeader) {
|
|
||||||
|
await multiMainSetup.init();
|
||||||
|
|
||||||
|
multiMainSetup.on('leadershipChange', async () => {
|
||||||
|
if (multiMainSetup.isLeader) {
|
||||||
|
this.logger.debug('[Leadership change] Clearing all activation errors...');
|
||||||
|
|
||||||
|
await this.activeWorkflowRunner.clearAllActivationErrors();
|
||||||
|
|
||||||
|
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
|
||||||
|
|
||||||
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
|
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
|
||||||
} else {
|
} else {
|
||||||
// only in case of leadership change without shutdown
|
this.logger.debug(
|
||||||
|
'[Leadership change] Removing all trigger- and poller-based workflows...',
|
||||||
|
);
|
||||||
|
|
||||||
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -333,10 +334,7 @@ export class Start extends BaseCommand {
|
||||||
|
|
||||||
await this.server.start();
|
await this.server.start();
|
||||||
|
|
||||||
this.pruningService = Container.get(PruningService);
|
await this.initPruning();
|
||||||
if (await this.pruningService.isPruningEnabled()) {
|
|
||||||
this.pruningService.startPruning();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start to get active workflows and run their triggers
|
// Start to get active workflows and run their triggers
|
||||||
await this.activeWorkflowRunner.init();
|
await this.activeWorkflowRunner.init();
|
||||||
|
@ -375,6 +373,32 @@ export class Start extends BaseCommand {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async initPruning() {
|
||||||
|
this.pruningService = Container.get(PruningService);
|
||||||
|
|
||||||
|
if (this.pruningService.isPruningEnabled()) {
|
||||||
|
this.pruningService.startPruning();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||||
|
const multiMainSetup = Container.get(MultiMainSetup);
|
||||||
|
|
||||||
|
await multiMainSetup.init();
|
||||||
|
|
||||||
|
multiMainSetup.on('leadershipChange', async () => {
|
||||||
|
if (multiMainSetup.isLeader) {
|
||||||
|
if (this.pruningService.isPruningEnabled()) {
|
||||||
|
this.pruningService.startPruning();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (this.pruningService.isPruningEnabled()) {
|
||||||
|
this.pruningService.stopPruning();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async catch(error: Error) {
|
async catch(error: Error) {
|
||||||
console.log(error.stack);
|
console.log(error.stack);
|
||||||
await this.exitWithCrash('Exiting due to an error.', error);
|
await this.exitWithCrash('Exiting due to an error.', error);
|
||||||
|
|
|
@ -1324,24 +1324,29 @@ export const schema = {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
leaderSelection: {
|
multiMainSetup: {
|
||||||
|
instanceType: {
|
||||||
|
doc: 'Type of instance in multi-main setup',
|
||||||
|
format: ['unset', 'leader', 'follower'] as const,
|
||||||
|
default: 'unset', // only until first leader key check
|
||||||
|
},
|
||||||
enabled: {
|
enabled: {
|
||||||
doc: 'Whether to enable leader selection for multiple main instances (license required)',
|
doc: 'Whether to enable multi-main setup for queue mode (license required)',
|
||||||
format: Boolean,
|
format: Boolean,
|
||||||
default: false,
|
default: false,
|
||||||
env: 'N8N_LEADER_SELECTION_ENABLED',
|
env: 'N8N_MULTI_MAIN_SETUP_ENABLED',
|
||||||
},
|
},
|
||||||
ttl: {
|
ttl: {
|
||||||
doc: 'Time to live in Redis for leader selection key, in seconds',
|
doc: 'Time to live (in seconds) for leader key in multi-main setup',
|
||||||
format: Number,
|
format: Number,
|
||||||
default: 10,
|
default: 10,
|
||||||
env: 'N8N_LEADER_SELECTION_KEY_TTL',
|
env: 'N8N_MULTI_MAIN_SETUP_KEY_TTL',
|
||||||
},
|
},
|
||||||
interval: {
|
interval: {
|
||||||
doc: 'Interval in Redis for leader selection check, in seconds',
|
doc: 'Interval (in seconds) for leader check in multi-main setup',
|
||||||
format: Number,
|
format: Number,
|
||||||
default: 3,
|
default: 3,
|
||||||
env: 'N8N_LEADER_SELECTION_CHECK_INTERVAL',
|
env: 'N8N_MULTI_MAIN_SETUP_CHECK_INTERVAL',
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { Authorized, Post, RestController } from '@/decorators';
|
import { Authorized, Post, RestController } from '@/decorators';
|
||||||
import { OrchestrationRequest } from '@/requests';
|
import { OrchestrationRequest } from '@/requests';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||||
import { License } from '../License';
|
import { License } from '../License';
|
||||||
|
|
||||||
@Authorized('any')
|
@Authorized('any')
|
||||||
|
@ -9,7 +9,7 @@ import { License } from '../License';
|
||||||
@Service()
|
@Service()
|
||||||
export class OrchestrationController {
|
export class OrchestrationController {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly orchestrationService: SingleMainInstancePublisher,
|
private readonly singleMainSetup: SingleMainSetup,
|
||||||
private readonly licenseService: License,
|
private readonly licenseService: License,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
@ -21,18 +21,18 @@ export class OrchestrationController {
|
||||||
async getWorkersStatus(req: OrchestrationRequest.Get) {
|
async getWorkersStatus(req: OrchestrationRequest.Get) {
|
||||||
if (!this.licenseService.isWorkerViewLicensed()) return;
|
if (!this.licenseService.isWorkerViewLicensed()) return;
|
||||||
const id = req.params.id;
|
const id = req.params.id;
|
||||||
return this.orchestrationService.getWorkerStatus(id);
|
return this.singleMainSetup.getWorkerStatus(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Post('/worker/status')
|
@Post('/worker/status')
|
||||||
async getWorkersStatusAll() {
|
async getWorkersStatusAll() {
|
||||||
if (!this.licenseService.isWorkerViewLicensed()) return;
|
if (!this.licenseService.isWorkerViewLicensed()) return;
|
||||||
return this.orchestrationService.getWorkerStatus();
|
return this.singleMainSetup.getWorkerStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Post('/worker/ids')
|
@Post('/worker/ids')
|
||||||
async getWorkerIdsAll() {
|
async getWorkerIdsAll() {
|
||||||
if (!this.licenseService.isWorkerViewLicensed()) return;
|
if (!this.licenseService.isWorkerViewLicensed()) return;
|
||||||
return this.orchestrationService.getWorkerIds();
|
return this.singleMainSetup.getWorkerIds();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||||
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
import { WorkflowRepository } from '@db/repositories/workflow.repository';
|
||||||
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
|
||||||
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
|
import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository';
|
||||||
|
|
||||||
|
@ -207,9 +207,7 @@ export class MessageEventBus extends EventEmitter {
|
||||||
this.destinations[destination.getId()] = destination;
|
this.destinations[destination.getId()] = destination;
|
||||||
this.destinations[destination.getId()].startListening();
|
this.destinations[destination.getId()].startListening();
|
||||||
if (notifyWorkers) {
|
if (notifyWorkers) {
|
||||||
await Container.get(
|
await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
|
||||||
SingleMainInstancePublisher,
|
|
||||||
).broadcastRestartEventbusAfterDestinationUpdate();
|
|
||||||
}
|
}
|
||||||
return destination;
|
return destination;
|
||||||
}
|
}
|
||||||
|
@ -235,9 +233,7 @@ export class MessageEventBus extends EventEmitter {
|
||||||
delete this.destinations[id];
|
delete this.destinations[id];
|
||||||
}
|
}
|
||||||
if (notifyWorkers) {
|
if (notifyWorkers) {
|
||||||
await Container.get(
|
await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate();
|
||||||
SingleMainInstancePublisher,
|
|
||||||
).broadcastRestartEventbusAfterDestinationUpdate();
|
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,7 +117,7 @@ export declare namespace WorkflowRequest {
|
||||||
|
|
||||||
type GetAllActive = AuthenticatedRequest;
|
type GetAllActive = AuthenticatedRequest;
|
||||||
|
|
||||||
type GetAllActivationErrors = Get;
|
type GetActivationError = Get;
|
||||||
|
|
||||||
type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>;
|
type ManualRun = AuthenticatedRequest<{}, {}, ManualRunPayload>;
|
||||||
|
|
||||||
|
|
|
@ -80,21 +80,21 @@ export class CacheService extends EventEmitter {
|
||||||
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
|
* @param options.refreshTtl Optional ttl for the refreshFunction's set call
|
||||||
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
|
* @param options.fallbackValue Optional value returned is cache is not hit and refreshFunction is not provided
|
||||||
*/
|
*/
|
||||||
async get(
|
async get<T = unknown>(
|
||||||
key: string,
|
key: string,
|
||||||
options: {
|
options: {
|
||||||
fallbackValue?: unknown;
|
fallbackValue?: T;
|
||||||
refreshFunction?: (key: string) => Promise<unknown>;
|
refreshFunction?: (key: string) => Promise<T>;
|
||||||
refreshTtl?: number;
|
refreshTtl?: number;
|
||||||
} = {},
|
} = {},
|
||||||
): Promise<unknown> {
|
): Promise<T | undefined> {
|
||||||
if (!key || key.length === 0) {
|
if (!key || key.length === 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const value = await this.cache?.store.get(key);
|
const value = await this.cache?.store.get(key);
|
||||||
if (value !== undefined) {
|
if (value !== undefined) {
|
||||||
this.emit(this.metricsCounterEvents.cacheHit);
|
this.emit(this.metricsCounterEvents.cacheHit);
|
||||||
return value;
|
return value as T;
|
||||||
}
|
}
|
||||||
this.emit(this.metricsCounterEvents.cacheMiss);
|
this.emit(this.metricsCounterEvents.cacheMiss);
|
||||||
if (options.refreshFunction) {
|
if (options.refreshFunction) {
|
||||||
|
|
|
@ -5,7 +5,7 @@ import config from '@/config';
|
||||||
import { EventEmitter } from 'node:events';
|
import { EventEmitter } from 'node:events';
|
||||||
|
|
||||||
export abstract class OrchestrationService extends EventEmitter {
|
export abstract class OrchestrationService extends EventEmitter {
|
||||||
protected initialized = false;
|
protected isInitialized = false;
|
||||||
|
|
||||||
protected queueModeId: string;
|
protected queueModeId: string;
|
||||||
|
|
||||||
|
@ -36,17 +36,17 @@ export abstract class OrchestrationService extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
sanityCheck(): boolean {
|
sanityCheck(): boolean {
|
||||||
return this.initialized && this.isQueueMode;
|
return this.isInitialized && this.isQueueMode;
|
||||||
}
|
}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
await this.initPublisher();
|
await this.initPublisher();
|
||||||
this.initialized = true;
|
this.isInitialized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async shutdown() {
|
async shutdown() {
|
||||||
await this.redisPublisher?.destroy();
|
await this.redisPublisher?.destroy();
|
||||||
this.initialized = false;
|
this.isInitialized = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected async initPublisher() {
|
protected async initPublisher() {
|
||||||
|
|
|
@ -1,50 +1,61 @@
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { TIME } from '@/constants';
|
import { TIME } from '@/constants';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||||
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
|
import { getRedisPrefix } from '@/services/redis/RedisServiceHelper';
|
||||||
|
|
||||||
/**
|
|
||||||
* For use in main instance, in multiple main instances cluster.
|
|
||||||
*/
|
|
||||||
@Service()
|
@Service()
|
||||||
export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
export class MultiMainSetup extends SingleMainSetup {
|
||||||
private id = this.queueModeId;
|
private id = this.queueModeId;
|
||||||
|
|
||||||
private leaderId: string | undefined;
|
private isLicensed = false;
|
||||||
|
|
||||||
|
get isEnabled() {
|
||||||
|
return (
|
||||||
|
config.getEnv('executions.mode') === 'queue' &&
|
||||||
|
config.getEnv('multiMainSetup.enabled') &&
|
||||||
|
this.isLicensed
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
get isLeader() {
|
get isLeader() {
|
||||||
return this.id === this.leaderId;
|
return config.getEnv('multiMainSetup.instanceType') === 'leader';
|
||||||
}
|
}
|
||||||
|
|
||||||
get isFollower() {
|
get isFollower() {
|
||||||
return !this.isLeader;
|
return !this.isLeader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setLicensed(newState: boolean) {
|
||||||
|
this.isLicensed = newState;
|
||||||
|
}
|
||||||
|
|
||||||
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
||||||
|
|
||||||
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
|
private readonly leaderKeyTtl = config.getEnv('multiMainSetup.ttl');
|
||||||
|
|
||||||
private leaderCheckInterval: NodeJS.Timer | undefined;
|
private leaderCheckInterval: NodeJS.Timer | undefined;
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
if (this.initialized) return;
|
if (this.isInitialized) return;
|
||||||
|
|
||||||
await this.initPublisher();
|
await this.initPublisher();
|
||||||
|
|
||||||
this.initialized = true;
|
this.isInitialized = true;
|
||||||
|
|
||||||
await this.tryBecomeLeader();
|
await this.tryBecomeLeader(); // prevent initial wait
|
||||||
|
|
||||||
this.leaderCheckInterval = setInterval(
|
this.leaderCheckInterval = setInterval(
|
||||||
async () => {
|
async () => {
|
||||||
await this.checkLeader();
|
await this.checkLeader();
|
||||||
},
|
},
|
||||||
config.getEnv('leaderSelection.interval') * TIME.SECOND,
|
config.getEnv('multiMainSetup.interval') * TIME.SECOND,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async destroy() {
|
async shutdown() {
|
||||||
|
if (!this.isInitialized) return;
|
||||||
|
|
||||||
clearInterval(this.leaderCheckInterval);
|
clearInterval(this.leaderCheckInterval);
|
||||||
|
|
||||||
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
|
if (this.isLeader) await this.redisPublisher.clear(this.leaderKey);
|
||||||
|
@ -69,12 +80,17 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||||
} else {
|
} else {
|
||||||
this.logger.debug(`Leader is other instance "${leaderId}"`);
|
this.logger.debug(`Leader is other instance "${leaderId}"`);
|
||||||
|
|
||||||
this.leaderId = leaderId;
|
config.set('multiMainSetup.instanceType', 'follower');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async tryBecomeLeader() {
|
private async tryBecomeLeader() {
|
||||||
if (this.isLeader || !this.redisPublisher.redisClient) return;
|
if (
|
||||||
|
config.getEnv('multiMainSetup.instanceType') === 'leader' ||
|
||||||
|
!this.redisPublisher.redisClient
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// this can only succeed if leadership is currently vacant
|
// this can only succeed if leadership is currently vacant
|
||||||
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
|
const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id);
|
||||||
|
@ -82,11 +98,36 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||||
if (keySetSuccessfully) {
|
if (keySetSuccessfully) {
|
||||||
this.logger.debug(`Leader is now this instance "${this.id}"`);
|
this.logger.debug(`Leader is now this instance "${this.id}"`);
|
||||||
|
|
||||||
this.leaderId = this.id;
|
config.set('multiMainSetup.instanceType', 'leader');
|
||||||
|
|
||||||
this.emit('leadershipChange', this.id);
|
|
||||||
|
|
||||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||||
|
|
||||||
|
this.emit('leadershipChange', this.id);
|
||||||
|
} else {
|
||||||
|
config.set('multiMainSetup.instanceType', 'follower');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async broadcastWorkflowActiveStateChanged(payload: {
|
||||||
|
workflowId: string;
|
||||||
|
oldState: boolean;
|
||||||
|
newState: boolean;
|
||||||
|
versionId: string;
|
||||||
|
}) {
|
||||||
|
if (!this.sanityCheck()) return;
|
||||||
|
|
||||||
|
await this.redisPublisher.publishToCommandChannel({
|
||||||
|
command: 'workflowActiveStateChanged',
|
||||||
|
payload,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) {
|
||||||
|
if (!this.sanityCheck()) return;
|
||||||
|
|
||||||
|
await this.redisPublisher.publishToCommandChannel({
|
||||||
|
command: 'workflowFailedToActivate',
|
||||||
|
payload,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -6,13 +6,13 @@ import { OrchestrationService } from '@/services/orchestration.base.service';
|
||||||
* For use in main instance, in single main instance scenario.
|
* For use in main instance, in single main instance scenario.
|
||||||
*/
|
*/
|
||||||
@Service()
|
@Service()
|
||||||
export class SingleMainInstancePublisher extends OrchestrationService {
|
export class SingleMainSetup extends OrchestrationService {
|
||||||
constructor(protected readonly logger: Logger) {
|
constructor(protected readonly logger: Logger) {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
sanityCheck() {
|
sanityCheck() {
|
||||||
return this.initialized && this.isQueueMode && this.isMainInstance;
|
return this.isInitialized && this.isQueueMode && this.isMainInstance;
|
||||||
}
|
}
|
||||||
|
|
||||||
async getWorkerStatus(id?: string) {
|
async getWorkerStatus(id?: string) {
|
|
@ -5,12 +5,17 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
|
||||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||||
import { License } from '@/License';
|
import { License } from '@/License';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||||
|
import { Push } from '@/push';
|
||||||
|
import { MultiMainSetup } from './MultiMainSetup.ee';
|
||||||
|
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
|
||||||
|
|
||||||
export async function handleCommandMessageMain(messageString: string) {
|
export async function handleCommandMessageMain(messageString: string) {
|
||||||
const queueModeId = config.get('redis.queueModeId');
|
const queueModeId = config.getEnv('redis.queueModeId');
|
||||||
const isMainInstance = config.get('generic.instanceType') === 'main';
|
const isMainInstance = config.getEnv('generic.instanceType') === 'main';
|
||||||
const message = messageToRedisServiceCommandObject(messageString);
|
const message = messageToRedisServiceCommandObject(messageString);
|
||||||
const logger = Container.get(Logger);
|
const logger = Container.get(Logger);
|
||||||
|
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
|
||||||
|
|
||||||
if (message) {
|
if (message) {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
@ -35,7 +40,7 @@ export async function handleCommandMessageMain(messageString: string) {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isMainInstance && !config.getEnv('leaderSelection.enabled')) {
|
if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) {
|
||||||
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
|
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
|
||||||
logger.error(
|
logger.error(
|
||||||
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
|
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
|
||||||
|
@ -60,6 +65,68 @@ export async function handleCommandMessageMain(messageString: string) {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
await Container.get(ExternalSecretsManager).reloadAllProviders();
|
await Container.get(ExternalSecretsManager).reloadAllProviders();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 'workflowActiveStateChanged': {
|
||||||
|
if (!debounceMessageReceiver(message, 100)) {
|
||||||
|
message.payload = { result: 'debounced' };
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { workflowId, oldState, newState, versionId } = message.payload ?? {};
|
||||||
|
|
||||||
|
if (
|
||||||
|
typeof workflowId !== 'string' ||
|
||||||
|
typeof oldState !== 'boolean' ||
|
||||||
|
typeof newState !== 'boolean' ||
|
||||||
|
typeof versionId !== 'string'
|
||||||
|
) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const push = Container.get(Push);
|
||||||
|
|
||||||
|
if (!oldState && newState) {
|
||||||
|
try {
|
||||||
|
await activeWorkflowRunner.add(workflowId, 'activate');
|
||||||
|
push.broadcast('workflowActivated', { workflowId });
|
||||||
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
|
||||||
|
await Container.get(WorkflowRepository).update(workflowId, {
|
||||||
|
active: false,
|
||||||
|
versionId,
|
||||||
|
});
|
||||||
|
|
||||||
|
await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({
|
||||||
|
workflowId,
|
||||||
|
errorMessage: error.message,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else if (oldState && !newState) {
|
||||||
|
await activeWorkflowRunner.remove(workflowId);
|
||||||
|
push.broadcast('workflowDeactivated', { workflowId });
|
||||||
|
} else {
|
||||||
|
await activeWorkflowRunner.remove(workflowId);
|
||||||
|
await activeWorkflowRunner.add(workflowId, 'update');
|
||||||
|
}
|
||||||
|
|
||||||
|
await activeWorkflowRunner.removeActivationError(workflowId);
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'workflowFailedToActivate': {
|
||||||
|
if (!debounceMessageReceiver(message, 100)) {
|
||||||
|
message.payload = { result: 'debounced' };
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { workflowId, errorMessage } = message.payload ?? {};
|
||||||
|
|
||||||
|
if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break;
|
||||||
|
|
||||||
|
Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage });
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,6 @@ import { OrchestrationService } from '../../orchestration.base.service';
|
||||||
@Service()
|
@Service()
|
||||||
export class OrchestrationWebhookService extends OrchestrationService {
|
export class OrchestrationWebhookService extends OrchestrationService {
|
||||||
sanityCheck(): boolean {
|
sanityCheck(): boolean {
|
||||||
return this.initialized && this.isQueueMode && this.isWebhookInstance;
|
return this.isInitialized && this.isQueueMode && this.isWebhookInstance;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import { OrchestrationService } from '../../orchestration.base.service';
|
||||||
@Service()
|
@Service()
|
||||||
export class OrchestrationWorkerService extends OrchestrationService {
|
export class OrchestrationWorkerService extends OrchestrationService {
|
||||||
sanityCheck(): boolean {
|
sanityCheck(): boolean {
|
||||||
return this.initialized && this.isQueueMode && this.isWorkerInstance;
|
return this.isInitialized && this.isQueueMode && this.isWorkerInstance;
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishToEventLog(message: AbstractEventMessage) {
|
async publishToEventLog(message: AbstractEventMessage) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import Container, { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
|
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
|
||||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||||
|
@ -23,16 +23,13 @@ export class PruningService {
|
||||||
|
|
||||||
public hardDeletionTimeout: NodeJS.Timeout | undefined;
|
public hardDeletionTimeout: NodeJS.Timeout | undefined;
|
||||||
|
|
||||||
private isMultiMainScenario =
|
|
||||||
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly executionRepository: ExecutionRepository,
|
private readonly executionRepository: ExecutionRepository,
|
||||||
private readonly binaryDataService: BinaryDataService,
|
private readonly binaryDataService: BinaryDataService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async isPruningEnabled() {
|
isPruningEnabled() {
|
||||||
if (
|
if (
|
||||||
!config.getEnv('executions.pruneData') ||
|
!config.getEnv('executions.pruneData') ||
|
||||||
inTest ||
|
inTest ||
|
||||||
|
@ -41,75 +38,60 @@ export class PruningService {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.isMultiMainScenario) {
|
if (
|
||||||
const { MultiMainInstancePublisher } = await import(
|
config.getEnv('multiMainSetup.enabled') &&
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
config.getEnv('multiMainSetup.instanceType') === 'follower'
|
||||||
);
|
) {
|
||||||
|
return false;
|
||||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
await multiMainInstancePublisher.init();
|
|
||||||
|
|
||||||
return multiMainInstancePublisher.isLeader;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @important Call only after DB connection is established and migrations have completed.
|
* @important Call this method only after DB migrations have completed.
|
||||||
*/
|
*/
|
||||||
startPruning() {
|
startPruning() {
|
||||||
|
this.logger.debug('[Pruning] Starting soft-deletion and hard-deletion timers');
|
||||||
|
|
||||||
this.setSoftDeletionInterval();
|
this.setSoftDeletionInterval();
|
||||||
this.scheduleHardDeletion();
|
this.scheduleHardDeletion();
|
||||||
}
|
}
|
||||||
|
|
||||||
async stopPruning() {
|
stopPruning() {
|
||||||
if (this.isMultiMainScenario) {
|
this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers');
|
||||||
const { MultiMainInstancePublisher } = await import(
|
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
|
||||||
);
|
|
||||||
|
|
||||||
const multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
await multiMainInstancePublisher.init();
|
|
||||||
|
|
||||||
if (multiMainInstancePublisher.isFollower) return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
|
|
||||||
|
|
||||||
clearInterval(this.softDeletionInterval);
|
clearInterval(this.softDeletionInterval);
|
||||||
clearTimeout(this.hardDeletionTimeout);
|
clearTimeout(this.hardDeletionTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
|
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
|
||||||
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
|
||||||
|
|
||||||
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
|
|
||||||
|
|
||||||
this.softDeletionInterval = setInterval(
|
this.softDeletionInterval = setInterval(
|
||||||
async () => this.softDeleteOnPruningCycle(),
|
async () => this.softDeleteOnPruningCycle(),
|
||||||
this.rates.softDeletion,
|
this.rates.softDeletion,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.logger.debug(`[Pruning] Soft-deletion scheduled every ${when}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
|
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
|
||||||
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
|
||||||
|
|
||||||
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
|
|
||||||
|
|
||||||
this.hardDeletionTimeout = setTimeout(
|
this.hardDeletionTimeout = setTimeout(
|
||||||
async () => this.hardDeleteOnPruningCycle(),
|
async () => this.hardDeleteOnPruningCycle(),
|
||||||
this.rates.hardDeletion,
|
this.rates.hardDeletion,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.logger.debug(`[Pruning] Hard-deletion scheduled for next ${when}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark executions as deleted based on age and count, in a pruning cycle.
|
* Mark executions as deleted based on age and count, in a pruning cycle.
|
||||||
*/
|
*/
|
||||||
async softDeleteOnPruningCycle() {
|
async softDeleteOnPruningCycle() {
|
||||||
this.logger.debug('Starting soft-deletion of executions (pruning cycle)');
|
this.logger.debug('[Pruning] Starting soft-deletion of executions');
|
||||||
|
|
||||||
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
||||||
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
||||||
|
@ -157,8 +139,11 @@ export class PruningService {
|
||||||
.execute();
|
.execute();
|
||||||
|
|
||||||
if (result.affected === 0) {
|
if (result.affected === 0) {
|
||||||
this.logger.debug('Found no executions to soft-delete (pruning cycle)');
|
this.logger.debug('[Pruning] Found no executions to soft-delete');
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.logger.debug('[Pruning] Soft-deleted executions', { count: result.affected });
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -187,21 +172,23 @@ export class PruningService {
|
||||||
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
||||||
|
|
||||||
if (executionIds.length === 0) {
|
if (executionIds.length === 0) {
|
||||||
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
|
this.logger.debug('[Pruning] Found no executions to hard-delete');
|
||||||
this.scheduleHardDeletion();
|
this.scheduleHardDeletion();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
|
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
|
||||||
executionIds,
|
executionIds,
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
||||||
|
|
||||||
await this.executionRepository.delete({ id: In(executionIds) });
|
await this.executionRepository.delete({ id: In(executionIds) });
|
||||||
|
|
||||||
|
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
|
this.logger.error('[Pruning] Failed to hard-delete executions', {
|
||||||
executionIds,
|
executionIds,
|
||||||
error: error instanceof Error ? error.message : `${error}`,
|
error: error instanceof Error ? error.message : `${error}`,
|
||||||
});
|
});
|
||||||
|
|
|
@ -6,7 +6,9 @@ export type RedisServiceCommand =
|
||||||
| 'restartEventBus'
|
| 'restartEventBus'
|
||||||
| 'stopWorker'
|
| 'stopWorker'
|
||||||
| 'reloadLicense'
|
| 'reloadLicense'
|
||||||
| 'reloadExternalSecretsProviders';
|
| 'reloadExternalSecretsProviders'
|
||||||
|
| 'workflowActiveStateChanged' // multi-main only
|
||||||
|
| 'workflowFailedToActivate'; // multi-main only
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An object to be sent via Redis pub/sub from the main process to the workers.
|
* An object to be sent via Redis pub/sub from the main process to the workers.
|
||||||
|
@ -50,6 +52,14 @@ export type RedisServiceWorkerResponseObject = {
|
||||||
| {
|
| {
|
||||||
command: 'stopWorker';
|
command: 'stopWorker';
|
||||||
}
|
}
|
||||||
|
| {
|
||||||
|
command: 'workflowActiveStateChanged';
|
||||||
|
payload: {
|
||||||
|
oldState: boolean;
|
||||||
|
newState: boolean;
|
||||||
|
workflowId: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
export type RedisServiceCommandObject = {
|
export type RedisServiceCommandObject = {
|
||||||
|
|
|
@ -30,6 +30,7 @@ import { isStringArray, isWorkflowIdValid } from '@/utils';
|
||||||
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
|
import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee';
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
|
||||||
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
|
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
|
||||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||||
|
@ -212,6 +213,8 @@ export class WorkflowsService {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const oldState = shared.workflow.active;
|
||||||
|
|
||||||
if (
|
if (
|
||||||
!forceSave &&
|
!forceSave &&
|
||||||
workflow.versionId !== '' &&
|
workflow.versionId !== '' &&
|
||||||
|
@ -255,9 +258,14 @@ export class WorkflowsService {
|
||||||
|
|
||||||
await Container.get(ExternalHooks).run('workflow.update', [workflow]);
|
await Container.get(ExternalHooks).run('workflow.update', [workflow]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the workflow being updated is stored as `active`, remove it from
|
||||||
|
* active workflows in memory, and re-add it after the update.
|
||||||
|
*
|
||||||
|
* If a trigger or poller in the workflow was updated, the new value
|
||||||
|
* will take effect only on removing and re-adding.
|
||||||
|
*/
|
||||||
if (shared.workflow.active) {
|
if (shared.workflow.active) {
|
||||||
// When workflow gets saved always remove it as the triggers could have been
|
|
||||||
// changed and so the changes would not take effect
|
|
||||||
await Container.get(ActiveWorkflowRunner).remove(workflowId);
|
await Container.get(ActiveWorkflowRunner).remove(workflowId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -364,6 +372,21 @@ export class WorkflowsService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) {
|
||||||
|
const multiMainSetup = Container.get(MultiMainSetup);
|
||||||
|
|
||||||
|
await multiMainSetup.init();
|
||||||
|
|
||||||
|
if (multiMainSetup.isEnabled) {
|
||||||
|
await Container.get(MultiMainSetup).broadcastWorkflowActiveStateChanged({
|
||||||
|
workflowId,
|
||||||
|
oldState,
|
||||||
|
newState: updatedWorkflow.active,
|
||||||
|
versionId: shared.workflow.versionId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return updatedWorkflow;
|
return updatedWorkflow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,10 +17,9 @@ import { WorkflowRunner } from '@/WorkflowRunner';
|
||||||
import type { User } from '@db/entities/User';
|
import type { User } from '@db/entities/User';
|
||||||
import type { WebhookEntity } from '@db/entities/WebhookEntity';
|
import type { WebhookEntity } from '@db/entities/WebhookEntity';
|
||||||
import { NodeTypes } from '@/NodeTypes';
|
import { NodeTypes } from '@/NodeTypes';
|
||||||
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
|
|
||||||
|
|
||||||
import { mockInstance } from '../shared/mocking';
|
|
||||||
import { chooseRandomly } from './shared/random';
|
import { chooseRandomly } from './shared/random';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
|
import { mockInstance } from '../shared/mocking';
|
||||||
import { setSchedulerAsLoadedNode } from './shared/utils';
|
import { setSchedulerAsLoadedNode } from './shared/utils';
|
||||||
import * as testDb from './shared/testDb';
|
import * as testDb from './shared/testDb';
|
||||||
import { createOwner } from './shared/db/users';
|
import { createOwner } from './shared/db/users';
|
||||||
|
@ -30,9 +29,13 @@ mockInstance(ActiveExecutions);
|
||||||
mockInstance(ActiveWorkflows);
|
mockInstance(ActiveWorkflows);
|
||||||
mockInstance(Push);
|
mockInstance(Push);
|
||||||
mockInstance(SecretsHelper);
|
mockInstance(SecretsHelper);
|
||||||
mockInstance(MultiMainInstancePublisher);
|
|
||||||
|
|
||||||
const webhookService = mockInstance(WebhookService);
|
const webhookService = mockInstance(WebhookService);
|
||||||
|
const multiMainSetup = mockInstance(MultiMainSetup, {
|
||||||
|
isEnabled: false,
|
||||||
|
isLeader: false,
|
||||||
|
isFollower: false,
|
||||||
|
});
|
||||||
|
|
||||||
setSchedulerAsLoadedNode();
|
setSchedulerAsLoadedNode();
|
||||||
|
|
||||||
|
@ -230,7 +233,7 @@ describe('executeErrorWorkflow()', () => {
|
||||||
|
|
||||||
describe('add()', () => {
|
describe('add()', () => {
|
||||||
describe('in single-main scenario', () => {
|
describe('in single-main scenario', () => {
|
||||||
test('leader should add webhooks, triggers and pollers', async () => {
|
test('should add webhooks, triggers and pollers', async () => {
|
||||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||||
|
|
||||||
const workflow = await createWorkflow({ active: true }, owner);
|
const workflow = await createWorkflow({ active: true }, owner);
|
||||||
|
@ -252,17 +255,20 @@ describe('add()', () => {
|
||||||
|
|
||||||
describe('in multi-main scenario', () => {
|
describe('in multi-main scenario', () => {
|
||||||
describe('leader', () => {
|
describe('leader', () => {
|
||||||
test('on regular activation mode, leader should add webhooks only', async () => {
|
describe('on non-leadership-change activation mode', () => {
|
||||||
|
test('should add webhooks only', async () => {
|
||||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||||
|
|
||||||
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
|
||||||
|
|
||||||
mockInstance(MultiMainInstancePublisher, { isLeader: true });
|
|
||||||
|
|
||||||
const workflow = await createWorkflow({ active: true }, owner);
|
const workflow = await createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
|
||||||
|
jest.replaceProperty(multiMainSetup, 'isLeader', true);
|
||||||
|
|
||||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
const addTriggersAndPollersSpy = jest.spyOn(
|
||||||
|
activeWorkflowRunner,
|
||||||
|
'addTriggersAndPollers',
|
||||||
|
);
|
||||||
|
|
||||||
await activeWorkflowRunner.init();
|
await activeWorkflowRunner.init();
|
||||||
addWebhooksSpy.mockReset();
|
addWebhooksSpy.mockReset();
|
||||||
|
@ -273,18 +279,22 @@ describe('add()', () => {
|
||||||
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
|
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
|
||||||
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
test('on activation via leadership change, leader should add triggers and pollers only', async () => {
|
describe('on leadership change activation mode', () => {
|
||||||
|
test('should add triggers and pollers only', async () => {
|
||||||
const mode = 'leadershipChange';
|
const mode = 'leadershipChange';
|
||||||
|
|
||||||
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
|
||||||
|
jest.replaceProperty(multiMainSetup, 'isLeader', true);
|
||||||
mockInstance(MultiMainInstancePublisher, { isLeader: true });
|
|
||||||
|
|
||||||
const workflow = await createWorkflow({ active: true }, owner);
|
const workflow = await createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
const addTriggersAndPollersSpy = jest.spyOn(
|
||||||
|
activeWorkflowRunner,
|
||||||
|
'addTriggersAndPollers',
|
||||||
|
);
|
||||||
|
|
||||||
await activeWorkflowRunner.init();
|
await activeWorkflowRunner.init();
|
||||||
addWebhooksSpy.mockReset();
|
addWebhooksSpy.mockReset();
|
||||||
|
@ -296,19 +306,23 @@ describe('add()', () => {
|
||||||
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('follower', () => {
|
describe('follower', () => {
|
||||||
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => {
|
describe('on any activation mode', () => {
|
||||||
|
test('should not add webhooks, triggers or pollers', async () => {
|
||||||
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||||
|
|
||||||
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
jest.replaceProperty(multiMainSetup, 'isEnabled', true);
|
||||||
|
jest.replaceProperty(multiMainSetup, 'isLeader', false);
|
||||||
mockInstance(MultiMainInstancePublisher, { isLeader: false });
|
|
||||||
|
|
||||||
const workflow = await createWorkflow({ active: true }, owner);
|
const workflow = await createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
const addTriggersAndPollersSpy = jest.spyOn(
|
||||||
|
activeWorkflowRunner,
|
||||||
|
'addTriggersAndPollers',
|
||||||
|
);
|
||||||
|
|
||||||
await activeWorkflowRunner.init();
|
await activeWorkflowRunner.init();
|
||||||
addWebhooksSpy.mockReset();
|
addWebhooksSpy.mockReset();
|
||||||
|
@ -321,6 +335,7 @@ describe('add()', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('addWebhooks()', () => {
|
describe('addWebhooks()', () => {
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
import * as Config from '@oclif/config';
|
|
||||||
import { DataSource } from 'typeorm';
|
|
||||||
|
|
||||||
import { Start } from '@/commands/start';
|
|
||||||
import { BaseCommand } from '@/commands/BaseCommand';
|
|
||||||
import config from '@/config';
|
|
||||||
import { License } from '@/License';
|
|
||||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
|
||||||
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
|
|
||||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
|
||||||
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
|
|
||||||
import { RedisService } from '@/services/redis.service';
|
|
||||||
import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
|
|
||||||
import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
|
|
||||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
|
||||||
|
|
||||||
import { mockInstance } from '../../shared/mocking';
|
|
||||||
|
|
||||||
const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname });
|
|
||||||
|
|
||||||
beforeAll(() => {
|
|
||||||
mockInstance(DataSource);
|
|
||||||
mockInstance(ExternalSecretsManager);
|
|
||||||
mockInstance(ActiveWorkflowRunner);
|
|
||||||
mockInstance(WorkflowHistoryManager);
|
|
||||||
mockInstance(RedisService);
|
|
||||||
mockInstance(RedisServicePubSubPublisher);
|
|
||||||
mockInstance(RedisServicePubSubSubscriber);
|
|
||||||
mockInstance(MultiMainInstancePublisher);
|
|
||||||
mockInstance(OrchestrationHandlerMainService);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
config.load(config.default);
|
|
||||||
jest.restoreAllMocks();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('should not init license if instance is follower in multi-main scenario', async () => {
|
|
||||||
config.set('executions.mode', 'queue');
|
|
||||||
config.set('endpoints.disableUi', true);
|
|
||||||
config.set('leaderSelection.enabled', true);
|
|
||||||
|
|
||||||
jest.spyOn(MultiMainInstancePublisher.prototype, 'isFollower', 'get').mockReturnValue(true);
|
|
||||||
jest.spyOn(BaseCommand.prototype, 'init').mockImplementation(async () => {});
|
|
||||||
|
|
||||||
const licenseMock = mockInstance(License, {
|
|
||||||
isMultipleMainInstancesLicensed: jest.fn().mockReturnValue(true),
|
|
||||||
});
|
|
||||||
|
|
||||||
const startCmd = new Start([], oclifConfig);
|
|
||||||
|
|
||||||
await startCmd.init();
|
|
||||||
|
|
||||||
expect(licenseMock.init).not.toHaveBeenCalled();
|
|
||||||
});
|
|
|
@ -16,6 +16,7 @@ import { PostHogClient } from '@/posthog';
|
||||||
import { RedisService } from '@/services/redis.service';
|
import { RedisService } from '@/services/redis.service';
|
||||||
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
|
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
|
||||||
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
|
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
|
|
||||||
import { mockInstance } from '../../shared/mocking';
|
import { mockInstance } from '../../shared/mocking';
|
||||||
|
|
||||||
|
@ -37,6 +38,7 @@ beforeAll(async () => {
|
||||||
mockInstance(RedisService);
|
mockInstance(RedisService);
|
||||||
mockInstance(RedisServicePubSubPublisher);
|
mockInstance(RedisServicePubSubPublisher);
|
||||||
mockInstance(RedisServicePubSubSubscriber);
|
mockInstance(RedisServicePubSubSubscriber);
|
||||||
|
mockInstance(MultiMainSetup);
|
||||||
});
|
});
|
||||||
|
|
||||||
test('worker initializes all its components', async () => {
|
test('worker initializes all its components', async () => {
|
||||||
|
|
|
@ -16,6 +16,7 @@ import { AUTH_COOKIE_NAME } from '@/constants';
|
||||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||||
import { SettingsRepository } from '@db/repositories/settings.repository';
|
import { SettingsRepository } from '@db/repositories/settings.repository';
|
||||||
import { mockNodeTypesData } from '../../../unit/Helpers';
|
import { mockNodeTypesData } from '../../../unit/Helpers';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
import { mockInstance } from '../../../shared/mocking';
|
import { mockInstance } from '../../../shared/mocking';
|
||||||
|
|
||||||
export { setupTestServer } from './testServer';
|
export { setupTestServer } from './testServer';
|
||||||
|
@ -28,6 +29,8 @@ export { setupTestServer } from './testServer';
|
||||||
* Initialize node types.
|
* Initialize node types.
|
||||||
*/
|
*/
|
||||||
export async function initActiveWorkflowRunner() {
|
export async function initActiveWorkflowRunner() {
|
||||||
|
mockInstance(MultiMainSetup);
|
||||||
|
|
||||||
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
|
const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner');
|
||||||
const workflowRunner = Container.get(ActiveWorkflowRunner);
|
const workflowRunner = Container.get(ActiveWorkflowRunner);
|
||||||
await workflowRunner.init();
|
await workflowRunner.init();
|
||||||
|
|
62
packages/cli/test/integration/workflow.service.test.ts
Normal file
62
packages/cli/test/integration/workflow.service.test.ts
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||||
|
import * as testDb from './shared/testDb';
|
||||||
|
import { WorkflowsService } from '@/workflows/workflows.services';
|
||||||
|
import { mockInstance } from '../shared/mocking';
|
||||||
|
import { Telemetry } from '@/telemetry';
|
||||||
|
import { createOwner } from './shared/db/users';
|
||||||
|
import { createWorkflow } from './shared/db/workflows';
|
||||||
|
|
||||||
|
mockInstance(Telemetry);
|
||||||
|
|
||||||
|
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
await testDb.init();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await testDb.truncate(['Workflow']);
|
||||||
|
jest.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await testDb.terminate();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('update()', () => {
|
||||||
|
test('should remove and re-add to active workflows on `active: true` payload', async () => {
|
||||||
|
const owner = await createOwner();
|
||||||
|
const workflow = await createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
|
||||||
|
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
|
||||||
|
|
||||||
|
await WorkflowsService.update(owner, workflow, workflow.id);
|
||||||
|
|
||||||
|
expect(removeSpy).toHaveBeenCalledTimes(1);
|
||||||
|
const [removedWorkflowId] = removeSpy.mock.calls[0];
|
||||||
|
expect(removedWorkflowId).toBe(workflow.id);
|
||||||
|
|
||||||
|
expect(addSpy).toHaveBeenCalledTimes(1);
|
||||||
|
const [addedWorkflowId, activationMode] = addSpy.mock.calls[0];
|
||||||
|
expect(addedWorkflowId).toBe(workflow.id);
|
||||||
|
expect(activationMode).toBe('update');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should remove from active workflows on `active: false` payload', async () => {
|
||||||
|
const owner = await createOwner();
|
||||||
|
const workflow = await createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const removeSpy = jest.spyOn(activeWorkflowRunner, 'remove');
|
||||||
|
const addSpy = jest.spyOn(activeWorkflowRunner, 'add');
|
||||||
|
|
||||||
|
workflow.active = false;
|
||||||
|
await WorkflowsService.update(owner, workflow, workflow.id);
|
||||||
|
|
||||||
|
expect(removeSpy).toHaveBeenCalledTimes(1);
|
||||||
|
const [removedWorkflowId] = removeSpy.mock.calls[0];
|
||||||
|
expect(removedWorkflowId).toBe(workflow.id);
|
||||||
|
|
||||||
|
expect(addSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
|
@ -6,6 +6,7 @@ import { License } from '@/License';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import { N8N_VERSION } from '@/constants';
|
import { N8N_VERSION } from '@/constants';
|
||||||
import { mockInstance } from '../shared/mocking';
|
import { mockInstance } from '../shared/mocking';
|
||||||
|
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
|
||||||
|
|
||||||
jest.mock('@n8n_io/license-sdk');
|
jest.mock('@n8n_io/license-sdk');
|
||||||
|
|
||||||
|
@ -27,9 +28,10 @@ describe('License', () => {
|
||||||
let license: License;
|
let license: License;
|
||||||
const logger = mockInstance(Logger);
|
const logger = mockInstance(Logger);
|
||||||
const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID });
|
const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID });
|
||||||
|
const multiMainSetup = mockInstance(MultiMainSetup);
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
license = new License(logger, instanceSettings, mock(), mock());
|
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
||||||
await license.init();
|
await license.init();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -52,7 +54,7 @@ describe('License', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
test('initializes license manager for worker', async () => {
|
test('initializes license manager for worker', async () => {
|
||||||
license = new License(logger, instanceSettings, mock(), mock());
|
license = new License(logger, instanceSettings, mock(), mock(), mock());
|
||||||
await license.init('worker');
|
await license.init('worker');
|
||||||
expect(LicenseManager).toHaveBeenCalledWith({
|
expect(LicenseManager).toHaveBeenCalledWith({
|
||||||
autoRenewEnabled: false,
|
autoRenewEnabled: false,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup';
|
||||||
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
|
import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands';
|
||||||
import { eventBus } from '@/eventbus';
|
import { eventBus } from '@/eventbus';
|
||||||
import { RedisService } from '@/services/redis.service';
|
import { RedisService } from '@/services/redis.service';
|
||||||
|
@ -10,10 +10,13 @@ import { OrchestrationHandlerMainService } from '@/services/orchestration/main/o
|
||||||
import * as helpers from '@/services/orchestration/helpers';
|
import * as helpers from '@/services/orchestration/helpers';
|
||||||
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
import { Push } from '@/push';
|
||||||
|
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||||
import { mockInstance } from '../../shared/mocking';
|
import { mockInstance } from '../../shared/mocking';
|
||||||
|
|
||||||
const os = Container.get(SingleMainInstancePublisher);
|
const os = Container.get(SingleMainSetup);
|
||||||
const handler = Container.get(OrchestrationHandlerMainService);
|
const handler = Container.get(OrchestrationHandlerMainService);
|
||||||
|
mockInstance(ActiveWorkflowRunner);
|
||||||
|
|
||||||
let queueModeId: string;
|
let queueModeId: string;
|
||||||
|
|
||||||
|
@ -33,6 +36,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = {
|
||||||
|
|
||||||
describe('Orchestration Service', () => {
|
describe('Orchestration Service', () => {
|
||||||
const logger = mockInstance(Logger);
|
const logger = mockInstance(Logger);
|
||||||
|
mockInstance(Push);
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
mockInstance(RedisService);
|
mockInstance(RedisService);
|
||||||
mockInstance(ExternalSecretsManager);
|
mockInstance(ExternalSecretsManager);
|
||||||
|
|
|
@ -421,7 +421,25 @@ export type IPushData =
|
||||||
| PushDataRemoveNodeType
|
| PushDataRemoveNodeType
|
||||||
| PushDataTestWebhook
|
| PushDataTestWebhook
|
||||||
| PushDataExecutionRecovered
|
| PushDataExecutionRecovered
|
||||||
| PushDataWorkerStatusMessage;
|
| PushDataWorkerStatusMessage
|
||||||
|
| PushDataActiveWorkflowAdded
|
||||||
|
| PushDataActiveWorkflowRemoved
|
||||||
|
| PushDataWorkflowFailedToActivate;
|
||||||
|
|
||||||
|
type PushDataActiveWorkflowAdded = {
|
||||||
|
data: IActiveWorkflowAdded;
|
||||||
|
type: 'workflowActivated';
|
||||||
|
};
|
||||||
|
|
||||||
|
type PushDataActiveWorkflowRemoved = {
|
||||||
|
data: IActiveWorkflowRemoved;
|
||||||
|
type: 'workflowDeactivated';
|
||||||
|
};
|
||||||
|
|
||||||
|
type PushDataWorkflowFailedToActivate = {
|
||||||
|
data: IWorkflowFailedToActivate;
|
||||||
|
type: 'workflowFailedToActivate';
|
||||||
|
};
|
||||||
|
|
||||||
type PushDataExecutionRecovered = {
|
type PushDataExecutionRecovered = {
|
||||||
data: IPushDataExecutionRecovered;
|
data: IPushDataExecutionRecovered;
|
||||||
|
@ -491,6 +509,19 @@ export interface IPushDataExecutionFinished {
|
||||||
retryOf?: string;
|
retryOf?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface IActiveWorkflowAdded {
|
||||||
|
workflowId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IActiveWorkflowRemoved {
|
||||||
|
workflowId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IWorkflowFailedToActivate {
|
||||||
|
workflowId: string;
|
||||||
|
errorMessage: string;
|
||||||
|
}
|
||||||
|
|
||||||
export interface IPushDataUnsavedExecutionFinished {
|
export interface IPushDataUnsavedExecutionFinished {
|
||||||
executionId: string;
|
executionId: string;
|
||||||
data: { finished: true; stoppedAt: Date };
|
data: { finished: true; stoppedAt: Date };
|
||||||
|
|
|
@ -119,7 +119,7 @@ export default defineComponent({
|
||||||
} else {
|
} else {
|
||||||
errorMessage = this.$locale.baseText(
|
errorMessage = this.$locale.baseText(
|
||||||
'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined',
|
'workflowActivator.showMessage.displayActivationError.message.errorDataNotUndefined',
|
||||||
{ interpolate: { message: errorData.error.message } },
|
{ interpolate: { message: errorData } },
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|
|
@ -291,6 +291,33 @@ export const pushConnection = defineComponent({
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
receivedData.type === 'workflowFailedToActivate' &&
|
||||||
|
this.workflowsStore.workflowId === receivedData.data.workflowId
|
||||||
|
) {
|
||||||
|
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
|
||||||
|
this.workflowsStore.setActive(false);
|
||||||
|
|
||||||
|
this.showError(
|
||||||
|
new Error(receivedData.data.errorMessage),
|
||||||
|
this.$locale.baseText('workflowActivator.showError.title', {
|
||||||
|
interpolate: { newStateName: 'activated' },
|
||||||
|
}) + ':',
|
||||||
|
);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (receivedData.type === 'workflowActivated') {
|
||||||
|
this.workflowsStore.setWorkflowActive(receivedData.data.workflowId);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (receivedData.type === 'workflowDeactivated') {
|
||||||
|
this.workflowsStore.setWorkflowInactive(receivedData.data.workflowId);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
|
if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') {
|
||||||
// The workflow finished executing
|
// The workflow finished executing
|
||||||
let pushData: IPushDataExecutionFinished;
|
let pushData: IPushDataExecutionFinished;
|
||||||
|
|
|
@ -10,7 +10,6 @@ import {
|
||||||
} from '@/constants';
|
} from '@/constants';
|
||||||
import type {
|
import type {
|
||||||
ExecutionsQueryFilter,
|
ExecutionsQueryFilter,
|
||||||
IActivationError,
|
|
||||||
IExecutionDeleteFilter,
|
IExecutionDeleteFilter,
|
||||||
IExecutionPushResponse,
|
IExecutionPushResponse,
|
||||||
IExecutionResponse,
|
IExecutionResponse,
|
||||||
|
@ -365,7 +364,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
async getActivationError(id: string): Promise<IActivationError | undefined> {
|
async getActivationError(id: string): Promise<string | undefined> {
|
||||||
const rootStore = useRootStore();
|
const rootStore = useRootStore();
|
||||||
return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`);
|
return makeRestApiRequest(rootStore.getRestApiContext, 'GET', `/active/error/${id}`);
|
||||||
},
|
},
|
||||||
|
@ -552,6 +551,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
||||||
if (this.workflowsById[workflowId]) {
|
if (this.workflowsById[workflowId]) {
|
||||||
this.workflowsById[workflowId].active = true;
|
this.workflowsById[workflowId].active = true;
|
||||||
}
|
}
|
||||||
|
if (workflowId === this.workflow.id) {
|
||||||
|
this.setActive(true);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
setWorkflowInactive(workflowId: string): void {
|
setWorkflowInactive(workflowId: string): void {
|
||||||
|
@ -562,6 +564,9 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, {
|
||||||
if (this.workflowsById[workflowId]) {
|
if (this.workflowsById[workflowId]) {
|
||||||
this.workflowsById[workflowId].active = false;
|
this.workflowsById[workflowId].active = false;
|
||||||
}
|
}
|
||||||
|
if (workflowId === this.workflow.id) {
|
||||||
|
this.setActive(false);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
async fetchActiveWorkflows(): Promise<string[]> {
|
async fetchActiveWorkflows(): Promise<string[]> {
|
||||||
|
|
Loading…
Reference in a new issue