mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-13 16:14:07 -08:00
feat(core): Coordinate workflow activation in multiple main scenario in internal API (#7566)
Story: https://linear.app/n8n/issue/PAY-926 This PR coordinates workflow activation on instance startup and on leadership change in multiple main scenario in the internal API. Part 3 on manual workflow activation and deactivation will be a separate PR. ### Part 1: Instance startup In multi-main scenario, on starting an instance... - [x] If the instance is the leader, it should add webhooks, triggers and pollers. - [x] If the instance is the follower, it should not add webhooks, triggers or pollers. - [x] Unit tests. ### Part 2: Leadership change In multi-main scenario, if the main instance leader dies… - [x] The new main instance leader must activate all trigger- and poller-based workflows, excluding webhook-based workflows. - [x] The old main instance leader must deactivate all trigger- and poller-based workflows, excluding webhook-based workflows. - [x] Unit tests. To test, start two instances and check behavior on startup and leadership change: ``` EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug npm run start EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug N8N_PORT=5679 npm run start ```
This commit is contained in:
parent
151e60f829
commit
c857e42677
|
@ -2,7 +2,7 @@
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||||
|
|
||||||
import { Service } from 'typedi';
|
import Container, { Service } from 'typedi';
|
||||||
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
|
||||||
|
|
||||||
import type {
|
import type {
|
||||||
|
@ -35,8 +35,6 @@ import type express from 'express';
|
||||||
|
|
||||||
import * as Db from '@/Db';
|
import * as Db from '@/Db';
|
||||||
import type {
|
import type {
|
||||||
IActivationError,
|
|
||||||
IQueuedWorkflowActivations,
|
|
||||||
IResponseCallbackData,
|
IResponseCallbackData,
|
||||||
IWebhookManager,
|
IWebhookManager,
|
||||||
IWorkflowDb,
|
IWorkflowDb,
|
||||||
|
@ -65,99 +63,76 @@ import { webhookNotFoundErrorMessage } from './utils';
|
||||||
import { In } from 'typeorm';
|
import { In } from 'typeorm';
|
||||||
import { WebhookService } from './services/webhook.service';
|
import { WebhookService } from './services/webhook.service';
|
||||||
import { Logger } from './Logger';
|
import { Logger } from './Logger';
|
||||||
|
import { WorkflowRepository } from '@/databases/repositories';
|
||||||
|
import config from '@/config';
|
||||||
|
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
|
||||||
|
|
||||||
const WEBHOOK_PROD_UNREGISTERED_HINT =
|
const WEBHOOK_PROD_UNREGISTERED_HINT =
|
||||||
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
|
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ActiveWorkflowRunner implements IWebhookManager {
|
export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
private activeWorkflows = new ActiveWorkflows();
|
activeWorkflows = new ActiveWorkflows();
|
||||||
|
|
||||||
private activationErrors: {
|
private activationErrors: {
|
||||||
[key: string]: IActivationError;
|
[workflowId: string]: {
|
||||||
|
time: number; // ms
|
||||||
|
error: {
|
||||||
|
message: string;
|
||||||
|
};
|
||||||
|
};
|
||||||
} = {};
|
} = {};
|
||||||
|
|
||||||
private queuedWorkflowActivations: {
|
private queuedActivations: {
|
||||||
[key: string]: IQueuedWorkflowActivations;
|
[workflowId: string]: {
|
||||||
|
activationMode: WorkflowActivateMode;
|
||||||
|
lastTimeout: number;
|
||||||
|
timeout: NodeJS.Timeout;
|
||||||
|
workflowData: IWorkflowDb;
|
||||||
|
};
|
||||||
} = {};
|
} = {};
|
||||||
|
|
||||||
|
isMultiMainScenario =
|
||||||
|
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
|
||||||
|
|
||||||
|
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly activeExecutions: ActiveExecutions,
|
private readonly activeExecutions: ActiveExecutions,
|
||||||
private readonly externalHooks: ExternalHooks,
|
private readonly externalHooks: ExternalHooks,
|
||||||
private readonly nodeTypes: NodeTypes,
|
private readonly nodeTypes: NodeTypes,
|
||||||
private readonly webhookService: WebhookService,
|
private readonly webhookService: WebhookService,
|
||||||
|
private readonly workflowRepository: WorkflowRepository,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
// Get the active workflows from database
|
if (this.isMultiMainScenario) {
|
||||||
|
const { MultiMainInstancePublisher } = await import(
|
||||||
// NOTE
|
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||||
// Here I guess we can have a flag on the workflow table like hasTrigger
|
|
||||||
// so instead of pulling all the active webhooks just pull the actives that have a trigger
|
|
||||||
const workflowsData: IWorkflowDb[] = (await Db.collections.Workflow.find({
|
|
||||||
where: { active: true },
|
|
||||||
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
|
|
||||||
})) as IWorkflowDb[];
|
|
||||||
|
|
||||||
if (workflowsData.length !== 0) {
|
|
||||||
this.logger.info(' ================================');
|
|
||||||
this.logger.info(' Start Active Workflows:');
|
|
||||||
this.logger.info(' ================================');
|
|
||||||
|
|
||||||
for (const workflowData of workflowsData) {
|
|
||||||
this.logger.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
|
|
||||||
this.logger.debug(`Initializing active workflow "${workflowData.name}" (startup)`, {
|
|
||||||
workflowName: workflowData.name,
|
|
||||||
workflowId: workflowData.id,
|
|
||||||
});
|
|
||||||
try {
|
|
||||||
await this.add(workflowData.id, 'init', workflowData);
|
|
||||||
this.logger.verbose(`Successfully started workflow "${workflowData.name}"`, {
|
|
||||||
workflowName: workflowData.name,
|
|
||||||
workflowId: workflowData.id,
|
|
||||||
});
|
|
||||||
this.logger.info(' => Started');
|
|
||||||
} catch (error) {
|
|
||||||
ErrorReporter.error(error);
|
|
||||||
this.logger.info(
|
|
||||||
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
|
|
||||||
);
|
);
|
||||||
|
|
||||||
this.logger.info(` ${error.message}`);
|
this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
|
||||||
this.logger.error(
|
|
||||||
`Issue on initial workflow activation try "${workflowData.name}" (startup)`,
|
|
||||||
{
|
|
||||||
workflowName: workflowData.name,
|
|
||||||
workflowId: workflowData.id,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
|
||||||
this.executeErrorWorkflow(error, workflowData, 'internal');
|
|
||||||
|
|
||||||
if (!error.message.includes('Authorization')) {
|
await this.multiMainInstancePublisher.init();
|
||||||
// Keep on trying to activate the workflow if not an auth issue
|
|
||||||
this.addQueuedWorkflowActivation('init', workflowData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.logger.verbose('Finished initializing active workflows (startup)');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.addActiveWorkflows('init');
|
||||||
|
|
||||||
await this.externalHooks.run('activeWorkflows.initialized', []);
|
await this.externalHooks.run('activeWorkflows.initialized', []);
|
||||||
await this.webhookService.populateCache();
|
await this.webhookService.populateCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes all the currently active workflows
|
* Removes all the currently active workflows from memory.
|
||||||
*/
|
*/
|
||||||
async removeAll(): Promise<void> {
|
async removeAll() {
|
||||||
let activeWorkflowIds: string[] = [];
|
let activeWorkflowIds: string[] = [];
|
||||||
this.logger.verbose('Call to remove all active workflows received (removeAll)');
|
this.logger.verbose('Call to remove all active workflows received (removeAll)');
|
||||||
|
|
||||||
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
|
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
|
||||||
|
|
||||||
const activeWorkflows = await this.getActiveWorkflows();
|
const activeWorkflows = await this.allActiveInStorage();
|
||||||
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
|
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
|
||||||
// Make sure IDs are unique
|
// Make sure IDs are unique
|
||||||
activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
|
activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
|
||||||
|
@ -284,76 +259,86 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the ids of the currently active workflows
|
* Returns the ids of the currently active workflows from memory.
|
||||||
*/
|
*/
|
||||||
async getActiveWorkflows(user?: User): Promise<string[]> {
|
allActiveInMemory() {
|
||||||
let activeWorkflows: WorkflowEntity[] = [];
|
return this.activeWorkflows.allActiveWorkflows();
|
||||||
if (!user || user.globalRole.name === 'owner') {
|
}
|
||||||
activeWorkflows = await Db.collections.Workflow.find({
|
|
||||||
|
/**
|
||||||
|
* Get the IDs of active workflows from storage.
|
||||||
|
*/
|
||||||
|
async allActiveInStorage(user?: User) {
|
||||||
|
const isFullAccess = !user || user.globalRole.name === 'owner';
|
||||||
|
|
||||||
|
if (isFullAccess) {
|
||||||
|
const activeWorkflows = await this.workflowRepository.find({
|
||||||
select: ['id'],
|
select: ['id'],
|
||||||
where: { active: true },
|
where: { active: true },
|
||||||
});
|
});
|
||||||
|
|
||||||
return activeWorkflows
|
return activeWorkflows
|
||||||
.map((workflow) => workflow.id)
|
.map((workflow) => workflow.id)
|
||||||
.filter((workflowId) => !this.activationErrors[workflowId]);
|
.filter((workflowId) => !this.activationErrors[workflowId]);
|
||||||
} else {
|
}
|
||||||
const active = await Db.collections.Workflow.find({
|
|
||||||
select: ['id'],
|
|
||||||
where: { active: true },
|
|
||||||
});
|
|
||||||
const activeIds = active.map((workflow) => workflow.id);
|
|
||||||
const where = whereClause({
|
const where = whereClause({
|
||||||
user,
|
user,
|
||||||
entityType: 'workflow',
|
entityType: 'workflow',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const activeWorkflows = await this.workflowRepository.find({
|
||||||
|
select: ['id'],
|
||||||
|
where: { active: true },
|
||||||
|
});
|
||||||
|
|
||||||
|
const activeIds = activeWorkflows.map((workflow) => workflow.id);
|
||||||
|
|
||||||
Object.assign(where, { workflowId: In(activeIds) });
|
Object.assign(where, { workflowId: In(activeIds) });
|
||||||
const shared = await Db.collections.SharedWorkflow.find({
|
|
||||||
|
const sharings = await Db.collections.SharedWorkflow.find({
|
||||||
select: ['workflowId'],
|
select: ['workflowId'],
|
||||||
where,
|
where,
|
||||||
});
|
});
|
||||||
return shared
|
|
||||||
.map((id) => id.workflowId)
|
return sharings
|
||||||
|
.map((sharing) => sharing.workflowId)
|
||||||
.filter((workflowId) => !this.activationErrors[workflowId]);
|
.filter((workflowId) => !this.activationErrors[workflowId]);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns if the workflow is active
|
* Returns if the workflow is stored as `active`.
|
||||||
*
|
*
|
||||||
* @param {string} id The id of the workflow to check
|
* @important Do not confuse with `ActiveWorkflows.isActive()`,
|
||||||
|
* which checks if the workflow is active in memory.
|
||||||
*/
|
*/
|
||||||
async isActive(id: string): Promise<boolean> {
|
async isActive(workflowId: string) {
|
||||||
const workflow = await Db.collections.Workflow.findOne({
|
const workflow = await this.workflowRepository.findOne({
|
||||||
select: ['active'],
|
select: ['active'],
|
||||||
where: { id },
|
where: { id: workflowId },
|
||||||
});
|
});
|
||||||
|
|
||||||
return !!workflow?.active;
|
return !!workflow?.active;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return error if there was a problem activating the workflow
|
* Return error if there was a problem activating the workflow
|
||||||
*
|
|
||||||
* @param {string} id The id of the workflow to return the error of
|
|
||||||
*/
|
*/
|
||||||
getActivationError(id: string): IActivationError | undefined {
|
getActivationError(workflowId: string) {
|
||||||
if (this.activationErrors[id] === undefined) {
|
return this.activationErrors[workflowId];
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
return this.activationErrors[id];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds all the webhooks of the workflow
|
* Register workflow-defined webhooks in the `workflow_entity` table.
|
||||||
*/
|
*/
|
||||||
async addWorkflowWebhooks(
|
async addWebhooks(
|
||||||
workflow: Workflow,
|
workflow: Workflow,
|
||||||
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
activation: WorkflowActivateMode,
|
activation: WorkflowActivateMode,
|
||||||
): Promise<void> {
|
) {
|
||||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
|
||||||
let path = '' as string | undefined;
|
let path = '';
|
||||||
|
|
||||||
for (const webhookData of webhooks) {
|
for (const webhookData of webhooks) {
|
||||||
const node = workflow.getNode(webhookData.node) as INode;
|
const node = workflow.getNode(webhookData.node) as INode;
|
||||||
|
@ -401,7 +386,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.removeWorkflowWebhooks(workflow.id);
|
await this.clearWebhooks(workflow.id);
|
||||||
} catch (error1) {
|
} catch (error1) {
|
||||||
ErrorReporter.error(error1);
|
ErrorReporter.error(error1);
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
|
@ -428,14 +413,14 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove all the webhooks of the workflow
|
* Clear workflow-defined webhooks from the `webhook_entity` table.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
async removeWorkflowWebhooks(workflowId: string): Promise<void> {
|
async clearWebhooks(workflowId: string) {
|
||||||
const workflowData = await Db.collections.Workflow.findOne({
|
const workflowData = await Db.collections.Workflow.findOne({
|
||||||
where: { id: workflowId },
|
where: { id: workflowId },
|
||||||
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
|
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
|
||||||
});
|
});
|
||||||
|
|
||||||
if (workflowData === null) {
|
if (workflowData === null) {
|
||||||
throw new Error(`Could not find workflow with id "${workflowId}"`);
|
throw new Error(`Could not find workflow with id "${workflowId}"`);
|
||||||
}
|
}
|
||||||
|
@ -468,11 +453,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
await this.webhookService.deleteWorkflowWebhooks(workflowId);
|
await this.webhookService.deleteWorkflowWebhooks(workflowId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Runs the given workflow
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
async runWorkflow(
|
async runWorkflow(
|
||||||
workflowData: IWorkflowDb,
|
workflowData: IWorkflowDb,
|
||||||
node: INode,
|
node: INode,
|
||||||
|
@ -520,7 +500,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
/**
|
/**
|
||||||
* Return poll function which gets the global functions from n8n-core
|
* Return poll function which gets the global functions from n8n-core
|
||||||
* and overwrites the emit to be able to start it in subprocess
|
* and overwrites the emit to be able to start it in subprocess
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
getExecutePollFunctions(
|
getExecutePollFunctions(
|
||||||
workflowData: IWorkflowDb,
|
workflowData: IWorkflowDb,
|
||||||
|
@ -576,7 +555,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
/**
|
/**
|
||||||
* Return trigger function which gets the global functions from n8n-core
|
* Return trigger function which gets the global functions from n8n-core
|
||||||
* and overwrites the emit to be able to start it in subprocess
|
* and overwrites the emit to be able to start it in subprocess
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
getExecuteTriggerFunctions(
|
getExecuteTriggerFunctions(
|
||||||
workflowData: IWorkflowDb,
|
workflowData: IWorkflowDb,
|
||||||
|
@ -647,7 +625,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
);
|
);
|
||||||
this.executeErrorWorkflow(activationError, workflowData, mode);
|
this.executeErrorWorkflow(activationError, workflowData, mode);
|
||||||
|
|
||||||
this.addQueuedWorkflowActivation(activation, workflowData);
|
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
|
||||||
};
|
};
|
||||||
return returnFunctions;
|
return returnFunctions;
|
||||||
};
|
};
|
||||||
|
@ -676,113 +654,175 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Makes a workflow active
|
* Register as active in memory all workflows stored as `active`.
|
||||||
|
*/
|
||||||
|
async addActiveWorkflows(activationMode: WorkflowActivateMode) {
|
||||||
|
const dbWorkflows = await this.workflowRepository.getAllActive();
|
||||||
|
|
||||||
|
if (dbWorkflows.length === 0) return;
|
||||||
|
|
||||||
|
this.logger.info(' ================================');
|
||||||
|
this.logger.info(' Start Active Workflows:');
|
||||||
|
this.logger.info(' ================================');
|
||||||
|
|
||||||
|
for (const dbWorkflow of dbWorkflows) {
|
||||||
|
this.logger.info(` - ${dbWorkflow.display()}`);
|
||||||
|
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
|
||||||
|
workflowName: dbWorkflow.name,
|
||||||
|
workflowId: dbWorkflow.id,
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.add(dbWorkflow.id, activationMode, dbWorkflow);
|
||||||
|
|
||||||
|
this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, {
|
||||||
|
workflowName: dbWorkflow.name,
|
||||||
|
workflowId: dbWorkflow.id,
|
||||||
|
});
|
||||||
|
this.logger.info(' => Started');
|
||||||
|
} catch (error) {
|
||||||
|
ErrorReporter.error(error);
|
||||||
|
this.logger.info(
|
||||||
|
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
|
||||||
|
);
|
||||||
|
|
||||||
|
this.logger.info(` ${error.message}`);
|
||||||
|
this.logger.error(
|
||||||
|
`Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`,
|
||||||
|
{
|
||||||
|
workflowName: dbWorkflow.name,
|
||||||
|
workflowId: dbWorkflow.id,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
|
||||||
|
this.executeErrorWorkflow(error, dbWorkflow, 'internal');
|
||||||
|
|
||||||
|
// do not keep trying to activate on authorization error
|
||||||
|
if (error.message.includes('Authorization')) continue;
|
||||||
|
|
||||||
|
this.addQueuedWorkflowActivation('init', dbWorkflow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.verbose('Finished activating workflows (startup)');
|
||||||
|
}
|
||||||
|
|
||||||
|
async addAllTriggerAndPollerBasedWorkflows() {
|
||||||
|
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
|
||||||
|
|
||||||
|
await this.addActiveWorkflows('leadershipChange');
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeAllTriggerAndPollerBasedWorkflows() {
|
||||||
|
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
|
||||||
|
|
||||||
|
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a workflow as active.
|
||||||
*
|
*
|
||||||
* @param {string} workflowId The id of the workflow to activate
|
* An activatable workflow may be webhook-, trigger-, or poller-based:
|
||||||
* @param {IWorkflowDb} [workflowData] If workflowData is given it saves the DB query
|
*
|
||||||
|
* - A `webhook` is an HTTP-based node that can start a workflow when called
|
||||||
|
* by a third-party service.
|
||||||
|
* - A `poller` is an HTTP-based node that can start a workflow when detecting
|
||||||
|
* a change while regularly checking a third-party service.
|
||||||
|
* - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a
|
||||||
|
* time-based node like Schedule Trigger or a message-queue-based node.
|
||||||
|
*
|
||||||
|
* Note that despite the name, most "trigger" nodes are actually webhook-based
|
||||||
|
* and so qualify as `webhook`, e.g. Stripe Trigger.
|
||||||
|
*
|
||||||
|
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
|
||||||
|
* but webhooks are registered by being entered in the `webhook_entity` table,
|
||||||
|
* since webhooks do not require continuous execution.
|
||||||
*/
|
*/
|
||||||
async add(
|
async add(
|
||||||
workflowId: string,
|
workflowId: string,
|
||||||
activation: WorkflowActivateMode,
|
activationMode: WorkflowActivateMode,
|
||||||
workflowData?: IWorkflowDb,
|
existingWorkflow?: WorkflowEntity,
|
||||||
): Promise<void> {
|
) {
|
||||||
let workflowInstance: Workflow;
|
let workflow: Workflow;
|
||||||
try {
|
|
||||||
if (workflowData === undefined) {
|
let shouldAddWebhooks = true;
|
||||||
workflowData = (await Db.collections.Workflow.findOne({
|
let shouldAddTriggersAndPollers = true;
|
||||||
where: { id: workflowId },
|
|
||||||
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
|
if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
|
||||||
})) as IWorkflowDb;
|
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
|
||||||
|
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!workflowData) {
|
if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
|
||||||
throw new Error(`Could not find workflow with id "${workflowId}".`);
|
shouldAddWebhooks = false;
|
||||||
|
shouldAddTriggersAndPollers = true;
|
||||||
}
|
}
|
||||||
workflowInstance = new Workflow({
|
|
||||||
id: workflowId,
|
try {
|
||||||
name: workflowData.name,
|
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
|
||||||
nodes: workflowData.nodes,
|
|
||||||
connections: workflowData.connections,
|
if (!dbWorkflow) {
|
||||||
active: workflowData.active,
|
throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`);
|
||||||
|
}
|
||||||
|
|
||||||
|
workflow = new Workflow({
|
||||||
|
id: dbWorkflow.id,
|
||||||
|
name: dbWorkflow.name,
|
||||||
|
nodes: dbWorkflow.nodes,
|
||||||
|
connections: dbWorkflow.connections,
|
||||||
|
active: dbWorkflow.active,
|
||||||
nodeTypes: this.nodeTypes,
|
nodeTypes: this.nodeTypes,
|
||||||
staticData: workflowData.staticData,
|
staticData: dbWorkflow.staticData,
|
||||||
settings: workflowData.settings,
|
settings: dbWorkflow.settings,
|
||||||
});
|
});
|
||||||
|
|
||||||
const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(STARTING_NODES);
|
const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES);
|
||||||
|
|
||||||
if (!canBeActivated) {
|
if (!canBeActivated) {
|
||||||
this.logger.error(`Unable to activate workflow "${workflowData.name}"`);
|
throw new WorkflowActivationError(
|
||||||
throw new Error(
|
`Workflow ${dbWorkflow.display()} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
|
||||||
'The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated.',
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const mode = 'trigger';
|
const sharing = dbWorkflow.shared.find((shared) => shared.role.name === 'owner');
|
||||||
const workflowOwner = (workflowData as WorkflowEntity).shared.find(
|
|
||||||
(shared) => shared.role.name === 'owner',
|
if (!sharing) {
|
||||||
);
|
throw new WorkflowActivationError(`Workflow ${dbWorkflow.display()} has no owner`);
|
||||||
if (!workflowOwner) {
|
|
||||||
throw new Error('Workflow cannot be activated because it has no owner');
|
|
||||||
}
|
}
|
||||||
const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowOwner.user.id);
|
|
||||||
const getTriggerFunctions = this.getExecuteTriggerFunctions(
|
|
||||||
workflowData,
|
|
||||||
additionalData,
|
|
||||||
mode,
|
|
||||||
activation,
|
|
||||||
);
|
|
||||||
const getPollFunctions = this.getExecutePollFunctions(
|
|
||||||
workflowData,
|
|
||||||
additionalData,
|
|
||||||
mode,
|
|
||||||
activation,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Add the workflows which have webhooks defined
|
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
|
||||||
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode, activation);
|
|
||||||
|
|
||||||
if (
|
if (shouldAddWebhooks) {
|
||||||
workflowInstance.getTriggerNodes().length !== 0 ||
|
this.logger.debug('============');
|
||||||
workflowInstance.getPollNodes().length !== 0
|
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
|
||||||
) {
|
this.logger.debug('============');
|
||||||
await this.activeWorkflows.add(
|
|
||||||
workflowId,
|
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
|
||||||
workflowInstance,
|
}
|
||||||
|
|
||||||
|
if (shouldAddTriggersAndPollers) {
|
||||||
|
this.logger.debug('============');
|
||||||
|
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
|
||||||
|
this.logger.debug('============');
|
||||||
|
|
||||||
|
await this.addTriggersAndPollers(dbWorkflow, workflow, {
|
||||||
|
activationMode,
|
||||||
|
executionMode: 'trigger',
|
||||||
additionalData,
|
additionalData,
|
||||||
mode,
|
|
||||||
activation,
|
|
||||||
getTriggerFunctions,
|
|
||||||
getPollFunctions,
|
|
||||||
);
|
|
||||||
this.logger.verbose(`Successfully activated workflow "${workflowData.name}"`, {
|
|
||||||
workflowId,
|
|
||||||
workflowName: workflowData.name,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Workflow got now successfully activated so make sure nothing is left in the queue
|
// Workflow got now successfully activated so make sure nothing is left in the queue
|
||||||
this.removeQueuedWorkflowActivation(workflowId);
|
this.removeQueuedWorkflowActivation(workflowId);
|
||||||
|
|
||||||
if (this.activationErrors[workflowId] !== undefined) {
|
if (this.activationErrors[workflowId]) {
|
||||||
// If there were activation errors delete them
|
|
||||||
delete this.activationErrors[workflowId];
|
delete this.activationErrors[workflowId];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (workflowInstance.id) {
|
const triggerCount = this.countTriggers(workflow, additionalData);
|
||||||
// Sum all triggers in the workflow, EXCLUDING the manual trigger
|
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
|
||||||
const triggerFilter = (nodeType: INodeType) =>
|
|
||||||
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
|
|
||||||
const triggerCount =
|
|
||||||
workflowInstance.queryNodes(triggerFilter).length +
|
|
||||||
workflowInstance.getPollNodes().length +
|
|
||||||
WebhookHelpers.getWorkflowWebhooks(workflowInstance, additionalData, undefined, true)
|
|
||||||
.length;
|
|
||||||
await WorkflowsService.updateWorkflowTriggerCount(workflowInstance.id, triggerCount);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// There was a problem activating the workflow
|
|
||||||
|
|
||||||
// Save the error
|
|
||||||
this.activationErrors[workflowId] = {
|
this.activationErrors[workflowId] = {
|
||||||
time: new Date().getTime(),
|
time: new Date().getTime(),
|
||||||
error: {
|
error: {
|
||||||
|
@ -795,7 +835,24 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
|
|
||||||
// If for example webhooks get created it sometimes has to save the
|
// If for example webhooks get created it sometimes has to save the
|
||||||
// id of them in the static data. So make sure that data gets persisted.
|
// id of them in the static data. So make sure that data gets persisted.
|
||||||
await WorkflowsService.saveStaticData(workflowInstance!);
|
await WorkflowsService.saveStaticData(workflow);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count all triggers in the workflow, excluding Manual Trigger.
|
||||||
|
*/
|
||||||
|
private countTriggers(
|
||||||
|
workflow: Workflow,
|
||||||
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
|
||||||
|
) {
|
||||||
|
const triggerFilter = (nodeType: INodeType) =>
|
||||||
|
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
|
||||||
|
|
||||||
|
return (
|
||||||
|
workflow.queryNodes(triggerFilter).length +
|
||||||
|
workflow.getPollNodes().length +
|
||||||
|
WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true).length
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -803,10 +860,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
* Meaning it will keep on trying to activate it in regular
|
* Meaning it will keep on trying to activate it in regular
|
||||||
* amounts indefinitely.
|
* amounts indefinitely.
|
||||||
*/
|
*/
|
||||||
addQueuedWorkflowActivation(
|
addQueuedWorkflowActivation(activationMode: WorkflowActivateMode, workflowData: WorkflowEntity) {
|
||||||
activationMode: WorkflowActivateMode,
|
|
||||||
workflowData: IWorkflowDb,
|
|
||||||
): void {
|
|
||||||
const workflowId = workflowData.id;
|
const workflowId = workflowData.id;
|
||||||
const workflowName = workflowData.name;
|
const workflowName = workflowData.name;
|
||||||
|
|
||||||
|
@ -819,7 +873,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
await this.add(workflowId, activationMode, workflowData);
|
await this.add(workflowId, activationMode, workflowData);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
ErrorReporter.error(error);
|
ErrorReporter.error(error);
|
||||||
let lastTimeout = this.queuedWorkflowActivations[workflowId].lastTimeout;
|
let lastTimeout = this.queuedActivations[workflowId].lastTimeout;
|
||||||
if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) {
|
if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) {
|
||||||
lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT);
|
lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
@ -834,8 +888,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
this.queuedWorkflowActivations[workflowId].lastTimeout = lastTimeout;
|
this.queuedActivations[workflowId].lastTimeout = lastTimeout;
|
||||||
this.queuedWorkflowActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout);
|
this.queuedActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.logger.info(
|
this.logger.info(
|
||||||
|
@ -851,7 +905,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
// multiple run in parallel
|
// multiple run in parallel
|
||||||
this.removeQueuedWorkflowActivation(workflowId);
|
this.removeQueuedWorkflowActivation(workflowId);
|
||||||
|
|
||||||
this.queuedWorkflowActivations[workflowId] = {
|
this.queuedActivations[workflowId] = {
|
||||||
activationMode,
|
activationMode,
|
||||||
lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
|
lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
|
||||||
timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT),
|
timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT),
|
||||||
|
@ -862,18 +916,18 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
/**
|
/**
|
||||||
* Remove a workflow from the activation queue
|
* Remove a workflow from the activation queue
|
||||||
*/
|
*/
|
||||||
removeQueuedWorkflowActivation(workflowId: string): void {
|
removeQueuedWorkflowActivation(workflowId: string) {
|
||||||
if (this.queuedWorkflowActivations[workflowId]) {
|
if (this.queuedActivations[workflowId]) {
|
||||||
clearTimeout(this.queuedWorkflowActivations[workflowId].timeout);
|
clearTimeout(this.queuedActivations[workflowId].timeout);
|
||||||
delete this.queuedWorkflowActivations[workflowId];
|
delete this.queuedActivations[workflowId];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove all workflows from the activation queue
|
* Remove all workflows from the activation queue
|
||||||
*/
|
*/
|
||||||
removeAllQueuedWorkflowActivations(): void {
|
removeAllQueuedWorkflowActivations() {
|
||||||
for (const workflowId in this.queuedWorkflowActivations) {
|
for (const workflowId in this.queuedActivations) {
|
||||||
this.removeQueuedWorkflowActivation(workflowId);
|
this.removeQueuedWorkflowActivation(workflowId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -884,10 +938,10 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
* @param {string} workflowId The id of the workflow to deactivate
|
* @param {string} workflowId The id of the workflow to deactivate
|
||||||
*/
|
*/
|
||||||
// TODO: this should happen in a transaction
|
// TODO: this should happen in a transaction
|
||||||
async remove(workflowId: string): Promise<void> {
|
async remove(workflowId: string) {
|
||||||
// Remove all the webhooks of the workflow
|
// Remove all the webhooks of the workflow
|
||||||
try {
|
try {
|
||||||
await this.removeWorkflowWebhooks(workflowId);
|
await this.clearWebhooks(workflowId);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
ErrorReporter.error(error);
|
ErrorReporter.error(error);
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
|
@ -900,7 +954,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
delete this.activationErrors[workflowId];
|
delete this.activationErrors[workflowId];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.queuedWorkflowActivations[workflowId] !== undefined) {
|
if (this.queuedActivations[workflowId] !== undefined) {
|
||||||
this.removeQueuedWorkflowActivation(workflowId);
|
this.removeQueuedWorkflowActivation(workflowId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -913,4 +967,52 @@ export class ActiveWorkflowRunner implements IWebhookManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register as active in memory a trigger- or poller-based workflow.
|
||||||
|
*/
|
||||||
|
async addTriggersAndPollers(
|
||||||
|
dbWorkflow: WorkflowEntity,
|
||||||
|
workflow: Workflow,
|
||||||
|
{
|
||||||
|
activationMode,
|
||||||
|
executionMode,
|
||||||
|
additionalData,
|
||||||
|
}: {
|
||||||
|
activationMode: WorkflowActivateMode;
|
||||||
|
executionMode: WorkflowExecuteMode;
|
||||||
|
additionalData: IWorkflowExecuteAdditionalDataWorkflow;
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
const getTriggerFunctions = this.getExecuteTriggerFunctions(
|
||||||
|
dbWorkflow,
|
||||||
|
additionalData,
|
||||||
|
executionMode,
|
||||||
|
activationMode,
|
||||||
|
);
|
||||||
|
|
||||||
|
const getPollFunctions = this.getExecutePollFunctions(
|
||||||
|
dbWorkflow,
|
||||||
|
additionalData,
|
||||||
|
executionMode,
|
||||||
|
activationMode,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
|
||||||
|
await this.activeWorkflows.add(
|
||||||
|
workflow.id,
|
||||||
|
workflow,
|
||||||
|
additionalData,
|
||||||
|
executionMode,
|
||||||
|
activationMode,
|
||||||
|
getTriggerFunctions,
|
||||||
|
getPollFunctions,
|
||||||
|
);
|
||||||
|
|
||||||
|
this.logger.verbose(`Workflow ${dbWorkflow.display()} activated`, {
|
||||||
|
workflowId: dbWorkflow.id,
|
||||||
|
workflowName: dbWorkflow.name,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ import type {
|
||||||
IWorkflowBase,
|
IWorkflowBase,
|
||||||
CredentialLoadingDetails,
|
CredentialLoadingDetails,
|
||||||
Workflow,
|
Workflow,
|
||||||
WorkflowActivateMode,
|
|
||||||
WorkflowExecuteMode,
|
WorkflowExecuteMode,
|
||||||
ExecutionStatus,
|
ExecutionStatus,
|
||||||
IExecutionsSummary,
|
IExecutionsSummary,
|
||||||
|
@ -64,20 +63,6 @@ import type {
|
||||||
import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants';
|
import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants';
|
||||||
import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types';
|
import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types';
|
||||||
|
|
||||||
export interface IActivationError {
|
|
||||||
time: number;
|
|
||||||
error: {
|
|
||||||
message: string;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface IQueuedWorkflowActivations {
|
|
||||||
activationMode: WorkflowActivateMode;
|
|
||||||
lastTimeout: number;
|
|
||||||
timeout: NodeJS.Timeout;
|
|
||||||
workflowData: IWorkflowDb;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ICredentialsTypeData {
|
export interface ICredentialsTypeData {
|
||||||
[key: string]: CredentialLoadingDetails;
|
[key: string]: CredentialLoadingDetails;
|
||||||
}
|
}
|
||||||
|
|
|
@ -620,7 +620,7 @@ export class Server extends AbstractServer {
|
||||||
this.app.get(
|
this.app.get(
|
||||||
`/${this.restEndpoint}/active`,
|
`/${this.restEndpoint}/active`,
|
||||||
ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => {
|
ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => {
|
||||||
return this.activeWorkflowRunner.getActiveWorkflows(req.user);
|
return this.activeWorkflowRunner.allActiveInStorage(req.user);
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -119,6 +119,8 @@ export class Start extends BaseCommand {
|
||||||
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
|
|
||||||
await Container.get(MultiMainInstancePublisher).destroy();
|
await Container.get(MultiMainInstancePublisher).destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,6 +253,15 @@ export class Start extends BaseCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
await Container.get(OrchestrationHandlerMainService).init();
|
await Container.get(OrchestrationHandlerMainService).init();
|
||||||
|
|
||||||
|
multiMainInstancePublisher.on('leadershipChange', async () => {
|
||||||
|
if (multiMainInstancePublisher.isLeader) {
|
||||||
|
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
|
||||||
|
} else {
|
||||||
|
// only in case of leadership change without shutdown
|
||||||
|
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async run() {
|
async run() {
|
||||||
|
|
|
@ -89,6 +89,10 @@ export class WorkflowEntity extends WithTimestampsAndStringId implements IWorkfl
|
||||||
|
|
||||||
@Column({ default: 0 })
|
@Column({ default: 0 })
|
||||||
triggerCount: number;
|
triggerCount: number;
|
||||||
|
|
||||||
|
display() {
|
||||||
|
return `"${this.name}" (ID: ${this.id})`;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -7,4 +7,18 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
|
||||||
constructor(dataSource: DataSource) {
|
constructor(dataSource: DataSource) {
|
||||||
super(WorkflowEntity, dataSource.manager);
|
super(WorkflowEntity, dataSource.manager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getAllActive() {
|
||||||
|
return this.find({
|
||||||
|
where: { active: true },
|
||||||
|
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async findById(workflowId: string) {
|
||||||
|
return this.findOne({
|
||||||
|
where: { id: workflowId },
|
||||||
|
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,9 @@ import Container from 'typedi';
|
||||||
import { RedisService } from './redis.service';
|
import { RedisService } from './redis.service';
|
||||||
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
|
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
|
import { EventEmitter } from 'node:events';
|
||||||
|
|
||||||
export abstract class OrchestrationService {
|
export abstract class OrchestrationService extends EventEmitter {
|
||||||
protected initialized = false;
|
protected initialized = false;
|
||||||
|
|
||||||
protected queueModeId: string;
|
protected queueModeId: string;
|
||||||
|
@ -29,6 +30,7 @@ export abstract class OrchestrationService {
|
||||||
}
|
}
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
|
super();
|
||||||
this.redisService = Container.get(RedisService);
|
this.redisService = Container.get(RedisService);
|
||||||
this.queueModeId = config.getEnv('redis.queueModeId');
|
this.queueModeId = config.getEnv('redis.queueModeId');
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,11 +13,11 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||||
|
|
||||||
private leaderId: string | undefined;
|
private leaderId: string | undefined;
|
||||||
|
|
||||||
public get isLeader() {
|
get isLeader() {
|
||||||
return this.id === this.leaderId;
|
return this.id === this.leaderId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public get isFollower() {
|
get isFollower() {
|
||||||
return !this.isLeader;
|
return !this.isLeader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,6 +84,8 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||||
|
|
||||||
this.leaderId = this.id;
|
this.leaderId = this.id;
|
||||||
|
|
||||||
|
this.emit('leadershipChange', this.id);
|
||||||
|
|
||||||
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
356
packages/cli/test/integration/ActiveWorkflowRunner.test.ts
Normal file
356
packages/cli/test/integration/ActiveWorkflowRunner.test.ts
Normal file
|
@ -0,0 +1,356 @@
|
||||||
|
import { Container } from 'typedi';
|
||||||
|
|
||||||
|
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
|
||||||
|
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
|
||||||
|
import { ActiveWorkflows } from 'n8n-core';
|
||||||
|
|
||||||
|
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||||
|
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||||
|
import config from '@/config';
|
||||||
|
import { ExternalHooks } from '@/ExternalHooks';
|
||||||
|
import { Push } from '@/push';
|
||||||
|
import { SecretsHelper } from '@/SecretsHelpers';
|
||||||
|
import { WebhookService } from '@/services/webhook.service';
|
||||||
|
import * as WebhookHelpers from '@/WebhookHelpers';
|
||||||
|
import * as AdditionalData from '@/WorkflowExecuteAdditionalData';
|
||||||
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
||||||
|
|
||||||
|
import { mockInstance, setSchedulerAsLoadedNode } from './shared/utils';
|
||||||
|
import * as testDb from './shared/testDb';
|
||||||
|
import type { User } from '@/databases/entities/User';
|
||||||
|
import type { WebhookEntity } from '@/databases/entities/WebhookEntity';
|
||||||
|
import { NodeTypes } from '@/NodeTypes';
|
||||||
|
import { chooseRandomly } from './shared/random';
|
||||||
|
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
|
||||||
|
|
||||||
|
mockInstance(ActiveExecutions);
|
||||||
|
mockInstance(ActiveWorkflows);
|
||||||
|
mockInstance(Push);
|
||||||
|
mockInstance(SecretsHelper);
|
||||||
|
mockInstance(MultiMainInstancePublisher);
|
||||||
|
|
||||||
|
const webhookService = mockInstance(WebhookService);
|
||||||
|
|
||||||
|
setSchedulerAsLoadedNode();
|
||||||
|
|
||||||
|
const externalHooks = mockInstance(ExternalHooks);
|
||||||
|
|
||||||
|
let activeWorkflowRunner: ActiveWorkflowRunner;
|
||||||
|
let owner: User;
|
||||||
|
|
||||||
|
const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [
|
||||||
|
'init',
|
||||||
|
'create',
|
||||||
|
'update',
|
||||||
|
'activate',
|
||||||
|
'manual',
|
||||||
|
];
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
await testDb.init();
|
||||||
|
|
||||||
|
activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
|
||||||
|
owner = await testDb.createOwner();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await activeWorkflowRunner.removeAll();
|
||||||
|
activeWorkflowRunner.removeAllQueuedWorkflowActivations();
|
||||||
|
await testDb.truncate(['Workflow']);
|
||||||
|
config.load(config.default);
|
||||||
|
jest.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await testDb.terminate();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('init()', () => {
|
||||||
|
test('should call `ExternalHooks.run()`', async () => {
|
||||||
|
const runSpy = jest.spyOn(externalHooks, 'run');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||||
|
const [hook, arg] = runSpy.mock.calls[0];
|
||||||
|
expect(hook).toBe('activeWorkflows.initialized');
|
||||||
|
expect(arg).toBeEmptyArray();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should start with no active workflows', async () => {
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
const inStorage = activeWorkflowRunner.allActiveInStorage();
|
||||||
|
await expect(inStorage).resolves.toHaveLength(0);
|
||||||
|
|
||||||
|
const inMemory = activeWorkflowRunner.allActiveInMemory();
|
||||||
|
expect(inMemory).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should start with one active workflow', async () => {
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
const inStorage = activeWorkflowRunner.allActiveInStorage();
|
||||||
|
await expect(inStorage).resolves.toHaveLength(1);
|
||||||
|
|
||||||
|
const inMemory = activeWorkflowRunner.allActiveInMemory();
|
||||||
|
expect(inMemory).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should start with multiple active workflows', async () => {
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
const inStorage = activeWorkflowRunner.allActiveInStorage();
|
||||||
|
await expect(inStorage).resolves.toHaveLength(2);
|
||||||
|
|
||||||
|
const inMemory = activeWorkflowRunner.allActiveInMemory();
|
||||||
|
expect(inMemory).toHaveLength(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should pre-check that every workflow can be activated', async () => {
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const precheckSpy = jest
|
||||||
|
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
|
||||||
|
.mockReturnValue(true);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
expect(precheckSpy).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('removeAll()', () => {
|
||||||
|
test('should remove all active workflows from memory', async () => {
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
await activeWorkflowRunner.removeAll();
|
||||||
|
|
||||||
|
const inMemory = activeWorkflowRunner.allActiveInMemory();
|
||||||
|
expect(inMemory).toHaveLength(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('remove()', () => {
|
||||||
|
test('should call `ActiveWorkflowRunner.clearWebhooks()`', async () => {
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
const clearWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'clearWebhooks');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
await activeWorkflowRunner.remove(workflow.id);
|
||||||
|
|
||||||
|
expect(clearWebhooksSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('isActive()', () => {
|
||||||
|
test('should return `true` for active workflow in storage', async () => {
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id);
|
||||||
|
await expect(isActiveInStorage).resolves.toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should return `false` for inactive workflow in storage', async () => {
|
||||||
|
const workflow = await testDb.createWorkflow({ active: false }, owner);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id);
|
||||||
|
await expect(isActiveInStorage).resolves.toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('runWorkflow()', () => {
|
||||||
|
test('should call `WorkflowRunner.run()`', async () => {
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
const additionalData = await AdditionalData.getBase('fake-user-id');
|
||||||
|
|
||||||
|
const runSpy = jest
|
||||||
|
.spyOn(WorkflowRunner.prototype, 'run')
|
||||||
|
.mockResolvedValue('fake-execution-id');
|
||||||
|
|
||||||
|
const [node] = workflow.nodes;
|
||||||
|
|
||||||
|
await activeWorkflowRunner.runWorkflow(workflow, node, [[]], additionalData, 'trigger');
|
||||||
|
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('executeErrorWorkflow()', () => {
|
||||||
|
test('should call `WorkflowExecuteAdditionalData.executeErrorWorkflow()`', async () => {
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
const [node] = workflow.nodes;
|
||||||
|
const error = new NodeOperationError(node, 'Fake error message');
|
||||||
|
const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
activeWorkflowRunner.executeErrorWorkflow(error, workflow, 'trigger');
|
||||||
|
|
||||||
|
expect(executeSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should be called on failure to activate due to 401', async () => {
|
||||||
|
const storedWorkflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
const [node] = storedWorkflow.nodes;
|
||||||
|
const executeSpy = jest.spyOn(activeWorkflowRunner, 'executeErrorWorkflow');
|
||||||
|
|
||||||
|
jest.spyOn(activeWorkflowRunner, 'add').mockImplementation(() => {
|
||||||
|
throw new NodeApiError(node, {
|
||||||
|
httpCode: '401',
|
||||||
|
message: 'Authorization failed - please check your credentials',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
expect(executeSpy).toHaveBeenCalledTimes(1);
|
||||||
|
const [error, workflow] = executeSpy.mock.calls[0];
|
||||||
|
expect(error.message).toContain('Authorization');
|
||||||
|
expect(workflow.id).toBe(storedWorkflow.id);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('add()', () => {
|
||||||
|
describe('in single-main scenario', () => {
|
||||||
|
test('leader should add webhooks, triggers and pollers', async () => {
|
||||||
|
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||||
|
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
|
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
|
||||||
|
addWebhooksSpy.mockReset();
|
||||||
|
addTriggersAndPollersSpy.mockReset();
|
||||||
|
|
||||||
|
await activeWorkflowRunner.add(workflow.id, mode);
|
||||||
|
|
||||||
|
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('in multi-main scenario', () => {
|
||||||
|
describe('leader', () => {
|
||||||
|
test('on regular activation mode, leader should add webhooks only', async () => {
|
||||||
|
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||||
|
|
||||||
|
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
||||||
|
|
||||||
|
mockInstance(MultiMainInstancePublisher, { isLeader: true });
|
||||||
|
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
|
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
addWebhooksSpy.mockReset();
|
||||||
|
addTriggersAndPollersSpy.mockReset();
|
||||||
|
|
||||||
|
await activeWorkflowRunner.add(workflow.id, mode);
|
||||||
|
|
||||||
|
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('on activation via leadership change, leader should add triggers and pollers only', async () => {
|
||||||
|
const mode = 'leadershipChange';
|
||||||
|
|
||||||
|
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
||||||
|
|
||||||
|
mockInstance(MultiMainInstancePublisher, { isLeader: true });
|
||||||
|
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
|
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
addWebhooksSpy.mockReset();
|
||||||
|
addTriggersAndPollersSpy.mockReset();
|
||||||
|
|
||||||
|
await activeWorkflowRunner.add(workflow.id, mode);
|
||||||
|
|
||||||
|
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||||
|
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('follower', () => {
|
||||||
|
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => {
|
||||||
|
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
|
||||||
|
|
||||||
|
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
|
||||||
|
|
||||||
|
mockInstance(MultiMainInstancePublisher, { isLeader: false });
|
||||||
|
|
||||||
|
const workflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
|
||||||
|
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
|
||||||
|
|
||||||
|
await activeWorkflowRunner.init();
|
||||||
|
addWebhooksSpy.mockReset();
|
||||||
|
addTriggersAndPollersSpy.mockReset();
|
||||||
|
|
||||||
|
await activeWorkflowRunner.add(workflow.id, mode);
|
||||||
|
|
||||||
|
expect(addWebhooksSpy).not.toHaveBeenCalled();
|
||||||
|
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('addWebhooks()', () => {
|
||||||
|
test('should call `WebhookService.storeWebhook()`', async () => {
|
||||||
|
const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData;
|
||||||
|
const mockWebhookEntity = { webhookPath: 'fake-path' } as unknown as WebhookEntity;
|
||||||
|
|
||||||
|
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([mockWebhook]);
|
||||||
|
webhookService.createWebhook.mockReturnValue(mockWebhookEntity);
|
||||||
|
|
||||||
|
const additionalData = await AdditionalData.getBase('fake-user-id');
|
||||||
|
|
||||||
|
const dbWorkflow = await testDb.createWorkflow({ active: true }, owner);
|
||||||
|
|
||||||
|
const workflow = new Workflow({
|
||||||
|
id: dbWorkflow.id,
|
||||||
|
name: dbWorkflow.name,
|
||||||
|
nodes: dbWorkflow.nodes,
|
||||||
|
connections: dbWorkflow.connections,
|
||||||
|
active: dbWorkflow.active,
|
||||||
|
nodeTypes: Container.get(NodeTypes),
|
||||||
|
staticData: dbWorkflow.staticData,
|
||||||
|
settings: dbWorkflow.settings,
|
||||||
|
});
|
||||||
|
|
||||||
|
const [node] = dbWorkflow.nodes;
|
||||||
|
|
||||||
|
jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node);
|
||||||
|
jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true);
|
||||||
|
jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined);
|
||||||
|
|
||||||
|
await activeWorkflowRunner.addWebhooks(workflow, additionalData, 'trigger', 'init');
|
||||||
|
|
||||||
|
expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
|
@ -459,10 +459,10 @@ export async function createWorkflow(attributes: Partial<WorkflowEntity> = {}, u
|
||||||
nodes: nodes ?? [
|
nodes: nodes ?? [
|
||||||
{
|
{
|
||||||
id: 'uuid-1234',
|
id: 'uuid-1234',
|
||||||
name: 'Start',
|
name: 'Schedule Trigger',
|
||||||
parameters: {},
|
parameters: {},
|
||||||
position: [-20, 260],
|
position: [-20, 260],
|
||||||
type: 'n8n-nodes-base.start',
|
type: 'n8n-nodes-base.scheduleTrigger',
|
||||||
typeVersion: 1,
|
typeVersion: 1,
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import { Container } from 'typedi';
|
import { Container } from 'typedi';
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
import type { INode } from 'n8n-workflow';
|
import { type INode } from 'n8n-workflow';
|
||||||
import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials';
|
import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials';
|
||||||
import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials';
|
import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials';
|
||||||
import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node';
|
import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node';
|
||||||
|
@ -16,6 +16,8 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
||||||
import { AUTH_COOKIE_NAME } from '@/constants';
|
import { AUTH_COOKIE_NAME } from '@/constants';
|
||||||
|
|
||||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
||||||
|
import { mockInstance } from './mocking';
|
||||||
|
import { mockNodeTypesData } from '../../../unit/Helpers';
|
||||||
|
|
||||||
export { mockInstance } from './mocking';
|
export { mockInstance } from './mocking';
|
||||||
export { setupTestServer } from './testServer';
|
export { setupTestServer } from './testServer';
|
||||||
|
@ -166,3 +168,15 @@ export function makeWorkflow(options?: {
|
||||||
}
|
}
|
||||||
|
|
||||||
export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] };
|
export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] };
|
||||||
|
|
||||||
|
export function setSchedulerAsLoadedNode() {
|
||||||
|
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
|
||||||
|
|
||||||
|
Object.assign(nodesAndCredentials, {
|
||||||
|
loadedNodes: mockNodeTypesData(['scheduleTrigger'], {
|
||||||
|
addTrigger: true,
|
||||||
|
}),
|
||||||
|
known: { nodes: {}, credentials: {} },
|
||||||
|
types: { nodes: [], credentials: [] },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
|
@ -1,278 +0,0 @@
|
||||||
import { v4 as uuid } from 'uuid';
|
|
||||||
import { mocked } from 'jest-mock';
|
|
||||||
import { Container } from 'typedi';
|
|
||||||
|
|
||||||
import type { INode } from 'n8n-workflow';
|
|
||||||
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
|
|
||||||
|
|
||||||
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
|
|
||||||
import * as Db from '@/Db';
|
|
||||||
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
|
|
||||||
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
|
|
||||||
import { Role } from '@db/entities/Role';
|
|
||||||
import { User } from '@db/entities/User';
|
|
||||||
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
|
|
||||||
import { WorkflowRunner } from '@/WorkflowRunner';
|
|
||||||
import { ExternalHooks } from '@/ExternalHooks';
|
|
||||||
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
|
|
||||||
import { Push } from '@/push';
|
|
||||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
|
||||||
import { SecretsHelper } from '@/SecretsHelpers';
|
|
||||||
import { WebhookService } from '@/services/webhook.service';
|
|
||||||
import { VariablesService } from '@/environments/variables/variables.service';
|
|
||||||
|
|
||||||
import { mockInstance } from '../integration/shared/utils/';
|
|
||||||
import { randomEmail, randomName } from '../integration/shared/random';
|
|
||||||
import * as Helpers from './Helpers';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* TODO:
|
|
||||||
* - test workflow webhooks activation (that trigger `executeWebhook`and other webhook methods)
|
|
||||||
* - test activation error catching and getters such as `getActivationError` (requires building a workflow that fails to activate)
|
|
||||||
* - test queued workflow activation functions (might need to create a non-working workflow to test this)
|
|
||||||
*/
|
|
||||||
|
|
||||||
let databaseActiveWorkflowsCount = 0;
|
|
||||||
let databaseActiveWorkflowsList: WorkflowEntity[] = [];
|
|
||||||
|
|
||||||
const generateWorkflows = (count: number): WorkflowEntity[] => {
|
|
||||||
const workflows: WorkflowEntity[] = [];
|
|
||||||
const ownerRole = new Role();
|
|
||||||
ownerRole.scope = 'workflow';
|
|
||||||
ownerRole.name = 'owner';
|
|
||||||
ownerRole.id = '1';
|
|
||||||
|
|
||||||
const owner = new User();
|
|
||||||
owner.id = uuid();
|
|
||||||
owner.firstName = randomName();
|
|
||||||
owner.lastName = randomName();
|
|
||||||
owner.email = randomEmail();
|
|
||||||
|
|
||||||
for (let i = 0; i < count; i++) {
|
|
||||||
const workflow = new WorkflowEntity();
|
|
||||||
Object.assign(workflow, {
|
|
||||||
id: (i + 1).toString(),
|
|
||||||
name: randomName(),
|
|
||||||
active: true,
|
|
||||||
createdAt: new Date(),
|
|
||||||
updatedAt: new Date(),
|
|
||||||
nodes: [
|
|
||||||
{
|
|
||||||
parameters: {
|
|
||||||
rule: {
|
|
||||||
interval: [{}],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: uuid(),
|
|
||||||
name: 'Schedule Trigger',
|
|
||||||
type: 'n8n-nodes-base.scheduleTrigger',
|
|
||||||
typeVersion: 1,
|
|
||||||
position: [900, 460],
|
|
||||||
},
|
|
||||||
],
|
|
||||||
connections: {},
|
|
||||||
tags: [],
|
|
||||||
});
|
|
||||||
const sharedWorkflow = new SharedWorkflow();
|
|
||||||
sharedWorkflow.workflowId = workflow.id;
|
|
||||||
sharedWorkflow.role = ownerRole;
|
|
||||||
sharedWorkflow.user = owner;
|
|
||||||
|
|
||||||
workflow.shared = [sharedWorkflow];
|
|
||||||
|
|
||||||
workflows.push(workflow);
|
|
||||||
}
|
|
||||||
databaseActiveWorkflowsList = workflows;
|
|
||||||
return workflows;
|
|
||||||
};
|
|
||||||
|
|
||||||
const MOCK_NODE_TYPES_DATA = Helpers.mockNodeTypesData(['scheduleTrigger'], {
|
|
||||||
addTrigger: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
jest.mock('@/Db', () => {
|
|
||||||
return {
|
|
||||||
collections: {
|
|
||||||
Workflow: {
|
|
||||||
find: jest.fn(async () => generateWorkflows(databaseActiveWorkflowsCount)),
|
|
||||||
findOne: jest.fn(async (searchParams) => {
|
|
||||||
return databaseActiveWorkflowsList.find(
|
|
||||||
(workflow) => workflow.id === searchParams.where.id.toString(),
|
|
||||||
);
|
|
||||||
}),
|
|
||||||
update: jest.fn(),
|
|
||||||
createQueryBuilder: jest.fn(() => {
|
|
||||||
const fakeQueryBuilder = {
|
|
||||||
update: () => fakeQueryBuilder,
|
|
||||||
set: () => fakeQueryBuilder,
|
|
||||||
where: () => fakeQueryBuilder,
|
|
||||||
execute: async () => {},
|
|
||||||
};
|
|
||||||
return fakeQueryBuilder;
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
const workflowCheckIfCanBeActivated = jest.fn(() => true);
|
|
||||||
|
|
||||||
jest
|
|
||||||
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
|
|
||||||
.mockImplementation(workflowCheckIfCanBeActivated);
|
|
||||||
|
|
||||||
const removeFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'remove');
|
|
||||||
const removeWebhooksFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'removeWorkflowWebhooks');
|
|
||||||
const workflowRunnerRun = jest.spyOn(WorkflowRunner.prototype, 'run');
|
|
||||||
const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn(
|
|
||||||
WorkflowExecuteAdditionalData,
|
|
||||||
'executeErrorWorkflow',
|
|
||||||
);
|
|
||||||
|
|
||||||
describe('ActiveWorkflowRunner', () => {
|
|
||||||
mockInstance(ActiveExecutions);
|
|
||||||
const externalHooks = mockInstance(ExternalHooks);
|
|
||||||
const webhookService = mockInstance(WebhookService);
|
|
||||||
mockInstance(Push);
|
|
||||||
mockInstance(SecretsHelper);
|
|
||||||
const variablesService = mockInstance(VariablesService);
|
|
||||||
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
|
|
||||||
Object.assign(nodesAndCredentials, {
|
|
||||||
loadedNodes: MOCK_NODE_TYPES_DATA,
|
|
||||||
known: { nodes: {}, credentials: {} },
|
|
||||||
types: { nodes: [], credentials: [] },
|
|
||||||
});
|
|
||||||
|
|
||||||
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
|
|
||||||
|
|
||||||
beforeAll(async () => {
|
|
||||||
variablesService.getAllCached.mockResolvedValue([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(async () => {
|
|
||||||
await activeWorkflowRunner.removeAll();
|
|
||||||
databaseActiveWorkflowsCount = 0;
|
|
||||||
databaseActiveWorkflowsList = [];
|
|
||||||
jest.clearAllMocks();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should initialize activeWorkflowRunner with empty list of active workflows and call External Hooks', async () => {
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
|
|
||||||
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
|
|
||||||
expect(externalHooks.run).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should initialize activeWorkflowRunner with one active workflow', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 1;
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
|
|
||||||
databaseActiveWorkflowsCount,
|
|
||||||
);
|
|
||||||
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
|
|
||||||
expect(externalHooks.run).toHaveBeenCalled();
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Should make sure function checkIfWorkflowCanBeActivated was called for every workflow', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 2;
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Call to removeAll should remove every workflow', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 2;
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
|
|
||||||
databaseActiveWorkflowsCount,
|
|
||||||
);
|
|
||||||
await activeWorkflowRunner.removeAll();
|
|
||||||
expect(removeFunction).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Call to remove should also call removeWorkflowWebhooks', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 1;
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
|
|
||||||
databaseActiveWorkflowsCount,
|
|
||||||
);
|
|
||||||
await activeWorkflowRunner.remove('1');
|
|
||||||
expect(removeWebhooksFunction).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Call to isActive should return true for valid workflow', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 1;
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(await activeWorkflowRunner.isActive('1')).toBe(true);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Call to isActive should return false for invalid workflow', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 1;
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
expect(await activeWorkflowRunner.isActive('2')).toBe(false);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('Calling add should call checkIfWorkflowCanBeActivated', async () => {
|
|
||||||
// Initialize with default (0) workflows
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
generateWorkflows(1);
|
|
||||||
await activeWorkflowRunner.add('1', 'activate');
|
|
||||||
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('runWorkflow should call run method in WorkflowRunner', async () => {
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
const workflow = generateWorkflows(1);
|
|
||||||
const additionalData = await WorkflowExecuteAdditionalData.getBase('fake-user-id');
|
|
||||||
|
|
||||||
workflowRunnerRun.mockResolvedValueOnce('invalid-execution-id');
|
|
||||||
|
|
||||||
await activeWorkflowRunner.runWorkflow(
|
|
||||||
workflow[0],
|
|
||||||
workflow[0].nodes[0],
|
|
||||||
[[]],
|
|
||||||
additionalData,
|
|
||||||
'trigger',
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(workflowRunnerRun).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
test('executeErrorWorkflow should call function with same name in WorkflowExecuteAdditionalData', async () => {
|
|
||||||
const workflowData = generateWorkflows(1)[0];
|
|
||||||
const error = new NodeOperationError(workflowData.nodes[0], 'Fake error message');
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
activeWorkflowRunner.executeErrorWorkflow(error, workflowData, 'trigger');
|
|
||||||
expect(workflowExecuteAdditionalDataExecuteErrorWorkflowSpy).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('init()', () => {
|
|
||||||
it('should execute error workflow on failure to activate due to 401', async () => {
|
|
||||||
databaseActiveWorkflowsCount = 1;
|
|
||||||
|
|
||||||
jest.spyOn(ActiveWorkflowRunner.prototype, 'add').mockImplementation(() => {
|
|
||||||
throw new NodeApiError(
|
|
||||||
{
|
|
||||||
id: 'a75dcd1b-9fed-4643-90bd-75933d67936c',
|
|
||||||
name: 'Github Trigger',
|
|
||||||
type: 'n8n-nodes-base.githubTrigger',
|
|
||||||
typeVersion: 1,
|
|
||||||
position: [0, 0],
|
|
||||||
} as INode,
|
|
||||||
{
|
|
||||||
httpCode: '401',
|
|
||||||
message: 'Authorization failed - please check your credentials',
|
|
||||||
},
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
const executeSpy = jest.spyOn(ActiveWorkflowRunner.prototype, 'executeErrorWorkflow');
|
|
||||||
|
|
||||||
await activeWorkflowRunner.init();
|
|
||||||
|
|
||||||
const [error, workflow] = executeSpy.mock.calls[0];
|
|
||||||
|
|
||||||
expect(error.message).toContain('Authorization');
|
|
||||||
expect(workflow.id).toBe('1');
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
|
@ -1,5 +1,3 @@
|
||||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
|
||||||
|
|
||||||
import { CronJob } from 'cron';
|
import { CronJob } from 'cron';
|
||||||
|
|
||||||
import type {
|
import type {
|
||||||
|
@ -14,62 +12,64 @@ import type {
|
||||||
WorkflowActivateMode,
|
WorkflowActivateMode,
|
||||||
WorkflowExecuteMode,
|
WorkflowExecuteMode,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import { LoggerProxy as Logger, toCronExpression, WorkflowActivationError } from 'n8n-workflow';
|
import {
|
||||||
|
LoggerProxy as Logger,
|
||||||
|
toCronExpression,
|
||||||
|
WorkflowActivationError,
|
||||||
|
WorkflowDeactivationError,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
import type { IWorkflowData } from './Interfaces';
|
import type { IWorkflowData } from './Interfaces';
|
||||||
|
|
||||||
export class ActiveWorkflows {
|
export class ActiveWorkflows {
|
||||||
private workflowData: {
|
private activeWorkflows: {
|
||||||
[key: string]: IWorkflowData;
|
[workflowId: string]: IWorkflowData;
|
||||||
} = {};
|
} = {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns if the workflow is active
|
* Returns if the workflow is active in memory.
|
||||||
*
|
|
||||||
* @param {string} id The id of the workflow to check
|
|
||||||
*/
|
*/
|
||||||
isActive(id: string): boolean {
|
isActive(workflowId: string) {
|
||||||
return this.workflowData.hasOwnProperty(id);
|
return this.activeWorkflows.hasOwnProperty(workflowId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the ids of the currently active workflows
|
* Returns the IDs of the currently active workflows in memory.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
allActiveWorkflows(): string[] {
|
allActiveWorkflows() {
|
||||||
return Object.keys(this.workflowData);
|
return Object.keys(this.activeWorkflows);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the Workflow data for the workflow with
|
* Returns the workflow data for the given ID if currently active in memory.
|
||||||
* the given id if it is currently active
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
get(id: string): IWorkflowData | undefined {
|
get(workflowId: string) {
|
||||||
return this.workflowData[id];
|
return this.activeWorkflows[workflowId];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Makes a workflow active
|
* Makes a workflow active
|
||||||
*
|
*
|
||||||
* @param {string} id The id of the workflow to activate
|
* @param {string} workflowId The id of the workflow to activate
|
||||||
* @param {Workflow} workflow The workflow to activate
|
* @param {Workflow} workflow The workflow to activate
|
||||||
* @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
|
* @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
|
||||||
*/
|
*/
|
||||||
async add(
|
async add(
|
||||||
id: string,
|
workflowId: string,
|
||||||
workflow: Workflow,
|
workflow: Workflow,
|
||||||
additionalData: IWorkflowExecuteAdditionalData,
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
mode: WorkflowExecuteMode,
|
mode: WorkflowExecuteMode,
|
||||||
activation: WorkflowActivateMode,
|
activation: WorkflowActivateMode,
|
||||||
getTriggerFunctions: IGetExecuteTriggerFunctions,
|
getTriggerFunctions: IGetExecuteTriggerFunctions,
|
||||||
getPollFunctions: IGetExecutePollFunctions,
|
getPollFunctions: IGetExecutePollFunctions,
|
||||||
): Promise<void> {
|
) {
|
||||||
this.workflowData[id] = {};
|
this.activeWorkflows[workflowId] = {};
|
||||||
const triggerNodes = workflow.getTriggerNodes();
|
const triggerNodes = workflow.getTriggerNodes();
|
||||||
|
|
||||||
let triggerResponse: ITriggerResponse | undefined;
|
let triggerResponse: ITriggerResponse | undefined;
|
||||||
this.workflowData[id].triggerResponses = [];
|
|
||||||
|
this.activeWorkflows[workflowId].triggerResponses = [];
|
||||||
|
|
||||||
for (const triggerNode of triggerNodes) {
|
for (const triggerNode of triggerNodes) {
|
||||||
try {
|
try {
|
||||||
triggerResponse = await workflow.runTrigger(
|
triggerResponse = await workflow.runTrigger(
|
||||||
|
@ -82,23 +82,27 @@ export class ActiveWorkflows {
|
||||||
if (triggerResponse !== undefined) {
|
if (triggerResponse !== undefined) {
|
||||||
// If a response was given save it
|
// If a response was given save it
|
||||||
|
|
||||||
this.workflowData[id].triggerResponses!.push(triggerResponse);
|
this.activeWorkflows[workflowId].triggerResponses!.push(triggerResponse);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
|
||||||
throw new WorkflowActivationError(
|
throw new WorkflowActivationError(
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
|
||||||
`There was a problem activating the workflow: "${error.message}"`,
|
`There was a problem activating the workflow: "${error.message}"`,
|
||||||
{ cause: error as Error, node: triggerNode },
|
{ cause: error, node: triggerNode },
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const pollNodes = workflow.getPollNodes();
|
const pollingNodes = workflow.getPollNodes();
|
||||||
if (pollNodes.length) {
|
|
||||||
this.workflowData[id].pollResponses = [];
|
if (pollingNodes.length === 0) return;
|
||||||
for (const pollNode of pollNodes) {
|
|
||||||
|
this.activeWorkflows[workflowId].pollResponses = [];
|
||||||
|
|
||||||
|
for (const pollNode of pollingNodes) {
|
||||||
try {
|
try {
|
||||||
this.workflowData[id].pollResponses!.push(
|
this.activeWorkflows[workflowId].pollResponses!.push(
|
||||||
await this.activatePolling(
|
await this.activatePolling(
|
||||||
pollNode,
|
pollNode,
|
||||||
workflow,
|
workflow,
|
||||||
|
@ -108,20 +112,19 @@ export class ActiveWorkflows {
|
||||||
activation,
|
activation,
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
} catch (error) {
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
|
||||||
throw new WorkflowActivationError(
|
throw new WorkflowActivationError(
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
|
||||||
`There was a problem activating the workflow: "${error.message}"`,
|
`There was a problem activating the workflow: "${error.message}"`,
|
||||||
{ cause: error as Error, node: pollNode },
|
{ cause: error, node: pollNode },
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Activates polling for the given node
|
* Activates polling for the given node
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
async activatePolling(
|
async activatePolling(
|
||||||
node: INode,
|
node: INode,
|
||||||
|
@ -159,7 +162,7 @@ export class ActiveWorkflows {
|
||||||
if (testingTrigger) {
|
if (testingTrigger) {
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
pollFunctions.__emitError(error);
|
pollFunctions.__emitError(error as Error);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -193,59 +196,49 @@ export class ActiveWorkflows {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Makes a workflow inactive
|
* Makes a workflow inactive in memory.
|
||||||
*
|
|
||||||
* @param {string} id The id of the workflow to deactivate
|
|
||||||
*/
|
*/
|
||||||
async remove(id: string): Promise<boolean> {
|
async remove(workflowId: string) {
|
||||||
if (!this.isActive(id)) {
|
if (!this.isActive(workflowId)) {
|
||||||
// Workflow is currently not registered
|
Logger.warn(`Cannot deactivate already inactive workflow ID "${workflowId}"`);
|
||||||
Logger.warn(
|
|
||||||
`The workflow with the id "${id}" is currently not active and can so not be removed`,
|
|
||||||
);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
const workflowData = this.workflowData[id];
|
const w = this.activeWorkflows[workflowId];
|
||||||
|
|
||||||
if (workflowData.triggerResponses) {
|
w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger'));
|
||||||
for (const triggerResponse of workflowData.triggerResponses) {
|
w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller'));
|
||||||
if (triggerResponse.closeFunction) {
|
|
||||||
try {
|
|
||||||
await triggerResponse.closeFunction();
|
|
||||||
} catch (error) {
|
|
||||||
Logger.error(
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
|
||||||
`There was a problem deactivating trigger of workflow "${id}": "${error.message}"`,
|
|
||||||
{
|
|
||||||
workflowId: id,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workflowData.pollResponses) {
|
delete this.activeWorkflows[workflowId];
|
||||||
for (const pollResponse of workflowData.pollResponses) {
|
|
||||||
if (pollResponse.closeFunction) {
|
|
||||||
try {
|
|
||||||
await pollResponse.closeFunction();
|
|
||||||
} catch (error) {
|
|
||||||
Logger.error(
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
|
||||||
`There was a problem deactivating polling trigger of workflow "${id}": "${error.message}"`,
|
|
||||||
{
|
|
||||||
workflowId: id,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
delete this.workflowData[id];
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeAllTriggerAndPollerBasedWorkflows() {
|
||||||
|
for (const workflowId of Object.keys(this.activeWorkflows)) {
|
||||||
|
const w = this.activeWorkflows[workflowId];
|
||||||
|
|
||||||
|
w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger'));
|
||||||
|
w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller'));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async close(
|
||||||
|
response: ITriggerResponse | IPollResponse,
|
||||||
|
workflowId: string,
|
||||||
|
target: 'trigger' | 'poller',
|
||||||
|
) {
|
||||||
|
if (!response.closeFunction) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await response.closeFunction();
|
||||||
|
} catch (e) {
|
||||||
|
const error = e instanceof Error ? e : new Error(`${e}`);
|
||||||
|
|
||||||
|
throw new WorkflowDeactivationError(
|
||||||
|
`Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`,
|
||||||
|
{ cause: error, workflowId },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1905,7 +1905,14 @@ export type WorkflowExecuteMode =
|
||||||
| 'retry'
|
| 'retry'
|
||||||
| 'trigger'
|
| 'trigger'
|
||||||
| 'webhook';
|
| 'webhook';
|
||||||
export type WorkflowActivateMode = 'init' | 'create' | 'update' | 'activate' | 'manual';
|
|
||||||
|
export type WorkflowActivateMode =
|
||||||
|
| 'init'
|
||||||
|
| 'create'
|
||||||
|
| 'update'
|
||||||
|
| 'activate'
|
||||||
|
| 'manual'
|
||||||
|
| 'leadershipChange';
|
||||||
|
|
||||||
export interface IWorkflowHooksOptionalParameters {
|
export interface IWorkflowHooksOptionalParameters {
|
||||||
parentProcessMode?: string;
|
parentProcessMode?: string;
|
||||||
|
|
|
@ -5,6 +5,7 @@ interface WorkflowActivationErrorOptions {
|
||||||
cause?: Error;
|
cause?: Error;
|
||||||
node?: INode;
|
node?: INode;
|
||||||
severity?: Severity;
|
severity?: Severity;
|
||||||
|
workflowId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -13,7 +14,12 @@ interface WorkflowActivationErrorOptions {
|
||||||
export class WorkflowActivationError extends ExecutionBaseError {
|
export class WorkflowActivationError extends ExecutionBaseError {
|
||||||
node: INode | undefined;
|
node: INode | undefined;
|
||||||
|
|
||||||
constructor(message: string, { cause, node, severity }: WorkflowActivationErrorOptions) {
|
workflowId: string | undefined;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
message: string,
|
||||||
|
{ cause, node, severity, workflowId }: WorkflowActivationErrorOptions = {},
|
||||||
|
) {
|
||||||
let error = cause as Error;
|
let error = cause as Error;
|
||||||
if (cause instanceof ExecutionBaseError) {
|
if (cause instanceof ExecutionBaseError) {
|
||||||
error = new Error(cause.message);
|
error = new Error(cause.message);
|
||||||
|
@ -23,11 +29,14 @@ export class WorkflowActivationError extends ExecutionBaseError {
|
||||||
}
|
}
|
||||||
super(message, { cause: error });
|
super(message, { cause: error });
|
||||||
this.node = node;
|
this.node = node;
|
||||||
|
this.workflowId = workflowId;
|
||||||
this.message = message;
|
this.message = message;
|
||||||
if (severity) this.severity = severity;
|
if (severity) this.severity = severity;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class WorkflowDeactivationError extends WorkflowActivationError {}
|
||||||
|
|
||||||
export class WebhookPathAlreadyTakenError extends WorkflowActivationError {
|
export class WebhookPathAlreadyTakenError extends WorkflowActivationError {
|
||||||
constructor(nodeName: string, cause?: Error) {
|
constructor(nodeName: string, cause?: Error) {
|
||||||
super(
|
super(
|
||||||
|
|
Loading…
Reference in a new issue