feat(core): Coordinate workflow activation in multiple main scenario in internal API (#7566)

Story: https://linear.app/n8n/issue/PAY-926

This PR coordinates workflow activation on instance startup and on
leadership change in multiple main scenario in the internal API. Part 3
on manual workflow activation and deactivation will be a separate PR.

### Part 1: Instance startup

In multi-main scenario, on starting an instance...
- [x] If the instance is the leader, it should add webhooks, triggers
and pollers.
- [x] If the instance is the follower, it should not add webhooks,
triggers or pollers.
- [x] Unit tests.

### Part 2: Leadership change 

In multi-main scenario, if the main instance leader dies…

- [x] The new main instance leader must activate all trigger- and
poller-based workflows, excluding webhook-based workflows.
- [x] The old main instance leader must deactivate all trigger- and
poller-based workflows, excluding webhook-based workflows.
- [x] Unit tests.

To test, start two instances and check behavior on startup and
leadership change:

```
EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug npm run start

EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug N8N_PORT=5679 npm run start
```
This commit is contained in:
Iván Ovejero 2023-11-07 13:48:48 +01:00 committed by GitHub
parent 151e60f829
commit c857e42677
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 839 additions and 618 deletions

View file

@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Service } from 'typedi';
import Container, { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import type {
@ -35,8 +35,6 @@ import type express from 'express';
import * as Db from '@/Db';
import type {
IActivationError,
IQueuedWorkflowActivations,
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
@ -65,99 +63,76 @@ import { webhookNotFoundErrorMessage } from './utils';
import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { WorkflowRepository } from '@/databases/repositories';
import config from '@/config';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@Service()
export class ActiveWorkflowRunner implements IWebhookManager {
private activeWorkflows = new ActiveWorkflows();
activeWorkflows = new ActiveWorkflows();
private activationErrors: {
[key: string]: IActivationError;
[workflowId: string]: {
time: number; // ms
error: {
message: string;
};
};
} = {};
private queuedWorkflowActivations: {
[key: string]: IQueuedWorkflowActivations;
private queuedActivations: {
[workflowId: string]: {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
};
} = {};
isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
private readonly externalHooks: ExternalHooks,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
) {}
async init() {
// Get the active workflows from database
if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
// NOTE
// Here I guess we can have a flag on the workflow table like hasTrigger
// so instead of pulling all the active webhooks just pull the actives that have a trigger
const workflowsData: IWorkflowDb[] = (await Db.collections.Workflow.find({
where: { active: true },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
})) as IWorkflowDb[];
this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
if (workflowsData.length !== 0) {
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
this.logger.info(' ================================');
for (const workflowData of workflowsData) {
this.logger.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
this.logger.debug(`Initializing active workflow "${workflowData.name}" (startup)`, {
workflowName: workflowData.name,
workflowId: workflowData.id,
});
try {
await this.add(workflowData.id, 'init', workflowData);
this.logger.verbose(`Successfully started workflow "${workflowData.name}"`, {
workflowName: workflowData.name,
workflowId: workflowData.id,
});
this.logger.info(' => Started');
} catch (error) {
ErrorReporter.error(error);
this.logger.info(
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
);
this.logger.info(` ${error.message}`);
this.logger.error(
`Issue on initial workflow activation try "${workflowData.name}" (startup)`,
{
workflowName: workflowData.name,
workflowId: workflowData.id,
},
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.executeErrorWorkflow(error, workflowData, 'internal');
if (!error.message.includes('Authorization')) {
// Keep on trying to activate the workflow if not an auth issue
this.addQueuedWorkflowActivation('init', workflowData);
}
}
}
this.logger.verbose('Finished initializing active workflows (startup)');
await this.multiMainInstancePublisher.init();
}
await this.addActiveWorkflows('init');
await this.externalHooks.run('activeWorkflows.initialized', []);
await this.webhookService.populateCache();
}
/**
* Removes all the currently active workflows
* Removes all the currently active workflows from memory.
*/
async removeAll(): Promise<void> {
async removeAll() {
let activeWorkflowIds: string[] = [];
this.logger.verbose('Call to remove all active workflows received (removeAll)');
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
const activeWorkflows = await this.getActiveWorkflows();
const activeWorkflows = await this.allActiveInStorage();
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
// Make sure IDs are unique
activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
@ -284,76 +259,86 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}
/**
* Returns the ids of the currently active workflows
* Returns the ids of the currently active workflows from memory.
*/
async getActiveWorkflows(user?: User): Promise<string[]> {
let activeWorkflows: WorkflowEntity[] = [];
if (!user || user.globalRole.name === 'owner') {
activeWorkflows = await Db.collections.Workflow.find({
select: ['id'],
where: { active: true },
});
return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]);
} else {
const active = await Db.collections.Workflow.find({
select: ['id'],
where: { active: true },
});
const activeIds = active.map((workflow) => workflow.id);
const where = whereClause({
user,
entityType: 'workflow',
});
Object.assign(where, { workflowId: In(activeIds) });
const shared = await Db.collections.SharedWorkflow.find({
select: ['workflowId'],
where,
});
return shared
.map((id) => id.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]);
}
allActiveInMemory() {
return this.activeWorkflows.allActiveWorkflows();
}
/**
* Returns if the workflow is active
*
* @param {string} id The id of the workflow to check
* Get the IDs of active workflows from storage.
*/
async isActive(id: string): Promise<boolean> {
const workflow = await Db.collections.Workflow.findOne({
select: ['active'],
where: { id },
async allActiveInStorage(user?: User) {
const isFullAccess = !user || user.globalRole.name === 'owner';
if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});
return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]);
}
const where = whereClause({
user,
entityType: 'workflow',
});
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});
const activeIds = activeWorkflows.map((workflow) => workflow.id);
Object.assign(where, { workflowId: In(activeIds) });
const sharings = await Db.collections.SharedWorkflow.find({
select: ['workflowId'],
where,
});
return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]);
}
/**
* Returns if the workflow is stored as `active`.
*
* @important Do not confuse with `ActiveWorkflows.isActive()`,
* which checks if the workflow is active in memory.
*/
async isActive(workflowId: string) {
const workflow = await this.workflowRepository.findOne({
select: ['active'],
where: { id: workflowId },
});
return !!workflow?.active;
}
/**
* Return error if there was a problem activating the workflow
*
* @param {string} id The id of the workflow to return the error of
*/
getActivationError(id: string): IActivationError | undefined {
if (this.activationErrors[id] === undefined) {
return undefined;
}
return this.activationErrors[id];
getActivationError(workflowId: string) {
return this.activationErrors[workflowId];
}
/**
* Adds all the webhooks of the workflow
* Register workflow-defined webhooks in the `workflow_entity` table.
*/
async addWorkflowWebhooks(
async addWebhooks(
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
): Promise<void> {
) {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
let path = '' as string | undefined;
let path = '';
for (const webhookData of webhooks) {
const node = workflow.getNode(webhookData.node) as INode;
@ -401,7 +386,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}
try {
await this.removeWorkflowWebhooks(workflow.id);
await this.clearWebhooks(workflow.id);
} catch (error1) {
ErrorReporter.error(error1);
this.logger.error(
@ -428,14 +413,14 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}
/**
* Remove all the webhooks of the workflow
*
* Clear workflow-defined webhooks from the `webhook_entity` table.
*/
async removeWorkflowWebhooks(workflowId: string): Promise<void> {
async clearWebhooks(workflowId: string) {
const workflowData = await Db.collections.Workflow.findOne({
where: { id: workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
});
if (workflowData === null) {
throw new Error(`Could not find workflow with id "${workflowId}"`);
}
@ -468,11 +453,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.webhookService.deleteWorkflowWebhooks(workflowId);
}
/**
* Runs the given workflow
*
*/
async runWorkflow(
workflowData: IWorkflowDb,
node: INode,
@ -520,7 +500,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Return poll function which gets the global functions from n8n-core
* and overwrites the emit to be able to start it in subprocess
*
*/
getExecutePollFunctions(
workflowData: IWorkflowDb,
@ -576,7 +555,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Return trigger function which gets the global functions from n8n-core
* and overwrites the emit to be able to start it in subprocess
*
*/
getExecuteTriggerFunctions(
workflowData: IWorkflowDb,
@ -647,7 +625,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
);
this.executeErrorWorkflow(activationError, workflowData, mode);
this.addQueuedWorkflowActivation(activation, workflowData);
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
};
return returnFunctions;
};
@ -676,113 +654,175 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}
/**
* Makes a workflow active
* Register as active in memory all workflows stored as `active`.
*/
async addActiveWorkflows(activationMode: WorkflowActivateMode) {
const dbWorkflows = await this.workflowRepository.getAllActive();
if (dbWorkflows.length === 0) return;
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
this.logger.info(' ================================');
for (const dbWorkflow of dbWorkflows) {
this.logger.info(` - ${dbWorkflow.display()}`);
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
});
try {
await this.add(dbWorkflow.id, activationMode, dbWorkflow);
this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, {
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
});
this.logger.info(' => Started');
} catch (error) {
ErrorReporter.error(error);
this.logger.info(
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
);
this.logger.info(` ${error.message}`);
this.logger.error(
`Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`,
{
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
},
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.executeErrorWorkflow(error, dbWorkflow, 'internal');
// do not keep trying to activate on authorization error
if (error.message.includes('Authorization')) continue;
this.addQueuedWorkflowActivation('init', dbWorkflow);
}
}
this.logger.verbose('Finished activating workflows (startup)');
}
async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
await this.addActiveWorkflows('leadershipChange');
}
async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}
/**
* Register a workflow as active.
*
* @param {string} workflowId The id of the workflow to activate
* @param {IWorkflowDb} [workflowData] If workflowData is given it saves the DB query
* An activatable workflow may be webhook-, trigger-, or poller-based:
*
* - A `webhook` is an HTTP-based node that can start a workflow when called
* by a third-party service.
* - A `poller` is an HTTP-based node that can start a workflow when detecting
* a change while regularly checking a third-party service.
* - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a
* time-based node like Schedule Trigger or a message-queue-based node.
*
* Note that despite the name, most "trigger" nodes are actually webhook-based
* and so qualify as `webhook`, e.g. Stripe Trigger.
*
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
* but webhooks are registered by being entered in the `webhook_entity` table,
* since webhooks do not require continuous execution.
*/
async add(
workflowId: string,
activation: WorkflowActivateMode,
workflowData?: IWorkflowDb,
): Promise<void> {
let workflowInstance: Workflow;
activationMode: WorkflowActivateMode,
existingWorkflow?: WorkflowEntity,
) {
let workflow: Workflow;
let shouldAddWebhooks = true;
let shouldAddTriggersAndPollers = true;
if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
}
if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
}
try {
if (workflowData === undefined) {
workflowData = (await Db.collections.Workflow.findOne({
where: { id: workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
})) as IWorkflowDb;
const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
if (!dbWorkflow) {
throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`);
}
if (!workflowData) {
throw new Error(`Could not find workflow with id "${workflowId}".`);
}
workflowInstance = new Workflow({
id: workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
workflow = new Workflow({
id: dbWorkflow.id,
name: dbWorkflow.name,
nodes: dbWorkflow.nodes,
connections: dbWorkflow.connections,
active: dbWorkflow.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
staticData: dbWorkflow.staticData,
settings: dbWorkflow.settings,
});
const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(STARTING_NODES);
const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES);
if (!canBeActivated) {
this.logger.error(`Unable to activate workflow "${workflowData.name}"`);
throw new Error(
'The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated.',
throw new WorkflowActivationError(
`Workflow ${dbWorkflow.display()} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
);
}
const mode = 'trigger';
const workflowOwner = (workflowData as WorkflowEntity).shared.find(
(shared) => shared.role.name === 'owner',
);
if (!workflowOwner) {
throw new Error('Workflow cannot be activated because it has no owner');
const sharing = dbWorkflow.shared.find((shared) => shared.role.name === 'owner');
if (!sharing) {
throw new WorkflowActivationError(`Workflow ${dbWorkflow.display()} has no owner`);
}
const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowOwner.user.id);
const getTriggerFunctions = this.getExecuteTriggerFunctions(
workflowData,
additionalData,
mode,
activation,
);
const getPollFunctions = this.getExecutePollFunctions(
workflowData,
additionalData,
mode,
activation,
);
// Add the workflows which have webhooks defined
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode, activation);
const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
if (
workflowInstance.getTriggerNodes().length !== 0 ||
workflowInstance.getPollNodes().length !== 0
) {
await this.activeWorkflows.add(
workflowId,
workflowInstance,
if (shouldAddWebhooks) {
this.logger.debug('============');
this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
}
if (shouldAddTriggersAndPollers) {
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode,
executionMode: 'trigger',
additionalData,
mode,
activation,
getTriggerFunctions,
getPollFunctions,
);
this.logger.verbose(`Successfully activated workflow "${workflowData.name}"`, {
workflowId,
workflowName: workflowData.name,
});
}
// Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId);
if (this.activationErrors[workflowId] !== undefined) {
// If there were activation errors delete them
if (this.activationErrors[workflowId]) {
delete this.activationErrors[workflowId];
}
if (workflowInstance.id) {
// Sum all triggers in the workflow, EXCLUDING the manual trigger
const triggerFilter = (nodeType: INodeType) =>
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
const triggerCount =
workflowInstance.queryNodes(triggerFilter).length +
workflowInstance.getPollNodes().length +
WebhookHelpers.getWorkflowWebhooks(workflowInstance, additionalData, undefined, true)
.length;
await WorkflowsService.updateWorkflowTriggerCount(workflowInstance.id, triggerCount);
}
const triggerCount = this.countTriggers(workflow, additionalData);
await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
} catch (error) {
// There was a problem activating the workflow
// Save the error
this.activationErrors[workflowId] = {
time: new Date().getTime(),
error: {
@ -795,7 +835,24 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// If for example webhooks get created it sometimes has to save the
// id of them in the static data. So make sure that data gets persisted.
await WorkflowsService.saveStaticData(workflowInstance!);
await WorkflowsService.saveStaticData(workflow);
}
/**
* Count all triggers in the workflow, excluding Manual Trigger.
*/
private countTriggers(
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
) {
const triggerFilter = (nodeType: INodeType) =>
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
return (
workflow.queryNodes(triggerFilter).length +
workflow.getPollNodes().length +
WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true).length
);
}
/**
@ -803,10 +860,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
* Meaning it will keep on trying to activate it in regular
* amounts indefinitely.
*/
addQueuedWorkflowActivation(
activationMode: WorkflowActivateMode,
workflowData: IWorkflowDb,
): void {
addQueuedWorkflowActivation(activationMode: WorkflowActivateMode, workflowData: WorkflowEntity) {
const workflowId = workflowData.id;
const workflowName = workflowData.name;
@ -819,7 +873,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.add(workflowId, activationMode, workflowData);
} catch (error) {
ErrorReporter.error(error);
let lastTimeout = this.queuedWorkflowActivations[workflowId].lastTimeout;
let lastTimeout = this.queuedActivations[workflowId].lastTimeout;
if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) {
lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT);
}
@ -834,8 +888,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
},
);
this.queuedWorkflowActivations[workflowId].lastTimeout = lastTimeout;
this.queuedWorkflowActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout);
this.queuedActivations[workflowId].lastTimeout = lastTimeout;
this.queuedActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout);
return;
}
this.logger.info(
@ -851,7 +905,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// multiple run in parallel
this.removeQueuedWorkflowActivation(workflowId);
this.queuedWorkflowActivations[workflowId] = {
this.queuedActivations[workflowId] = {
activationMode,
lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT),
@ -862,18 +916,18 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/**
* Remove a workflow from the activation queue
*/
removeQueuedWorkflowActivation(workflowId: string): void {
if (this.queuedWorkflowActivations[workflowId]) {
clearTimeout(this.queuedWorkflowActivations[workflowId].timeout);
delete this.queuedWorkflowActivations[workflowId];
removeQueuedWorkflowActivation(workflowId: string) {
if (this.queuedActivations[workflowId]) {
clearTimeout(this.queuedActivations[workflowId].timeout);
delete this.queuedActivations[workflowId];
}
}
/**
* Remove all workflows from the activation queue
*/
removeAllQueuedWorkflowActivations(): void {
for (const workflowId in this.queuedWorkflowActivations) {
removeAllQueuedWorkflowActivations() {
for (const workflowId in this.queuedActivations) {
this.removeQueuedWorkflowActivation(workflowId);
}
}
@ -884,10 +938,10 @@ export class ActiveWorkflowRunner implements IWebhookManager {
* @param {string} workflowId The id of the workflow to deactivate
*/
// TODO: this should happen in a transaction
async remove(workflowId: string): Promise<void> {
async remove(workflowId: string) {
// Remove all the webhooks of the workflow
try {
await this.removeWorkflowWebhooks(workflowId);
await this.clearWebhooks(workflowId);
} catch (error) {
ErrorReporter.error(error);
this.logger.error(
@ -900,7 +954,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
delete this.activationErrors[workflowId];
}
if (this.queuedWorkflowActivations[workflowId] !== undefined) {
if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId);
}
@ -913,4 +967,52 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}
}
}
/**
* Register as active in memory a trigger- or poller-based workflow.
*/
async addTriggersAndPollers(
dbWorkflow: WorkflowEntity,
workflow: Workflow,
{
activationMode,
executionMode,
additionalData,
}: {
activationMode: WorkflowActivateMode;
executionMode: WorkflowExecuteMode;
additionalData: IWorkflowExecuteAdditionalDataWorkflow;
},
) {
const getTriggerFunctions = this.getExecuteTriggerFunctions(
dbWorkflow,
additionalData,
executionMode,
activationMode,
);
const getPollFunctions = this.getExecutePollFunctions(
dbWorkflow,
additionalData,
executionMode,
activationMode,
);
if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
await this.activeWorkflows.add(
workflow.id,
workflow,
additionalData,
executionMode,
activationMode,
getTriggerFunctions,
getPollFunctions,
);
this.logger.verbose(`Workflow ${dbWorkflow.display()} activated`, {
workflowId: dbWorkflow.id,
workflowName: dbWorkflow.name,
});
}
}
}

View file

@ -16,7 +16,6 @@ import type {
IWorkflowBase,
CredentialLoadingDetails,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
ExecutionStatus,
IExecutionsSummary,
@ -64,20 +63,6 @@ import type {
import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants';
import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types';
export interface IActivationError {
time: number;
error: {
message: string;
};
}
export interface IQueuedWorkflowActivations {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
}
export interface ICredentialsTypeData {
[key: string]: CredentialLoadingDetails;
}

View file

@ -620,7 +620,7 @@ export class Server extends AbstractServer {
this.app.get(
`/${this.restEndpoint}/active`,
ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => {
return this.activeWorkflowRunner.getActiveWorkflows(req.user);
return this.activeWorkflowRunner.allActiveInStorage(req.user);
}),
);

View file

@ -119,6 +119,8 @@ export class Start extends BaseCommand {
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
);
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await Container.get(MultiMainInstancePublisher).destroy();
}
@ -251,6 +253,15 @@ export class Start extends BaseCommand {
}
await Container.get(OrchestrationHandlerMainService).init();
multiMainInstancePublisher.on('leadershipChange', async () => {
if (multiMainInstancePublisher.isLeader) {
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else {
// only in case of leadership change without shutdown
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
}
});
}
async run() {

View file

@ -89,6 +89,10 @@ export class WorkflowEntity extends WithTimestampsAndStringId implements IWorkfl
@Column({ default: 0 })
triggerCount: number;
display() {
return `"${this.name}" (ID: ${this.id})`;
}
}
/**

View file

@ -7,4 +7,18 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
constructor(dataSource: DataSource) {
super(WorkflowEntity, dataSource.manager);
}
async getAllActive() {
return this.find({
where: { active: true },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
});
}
async findById(workflowId: string) {
return this.findOne({
where: { id: workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
});
}
}

View file

@ -2,8 +2,9 @@ import Container from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import config from '@/config';
import { EventEmitter } from 'node:events';
export abstract class OrchestrationService {
export abstract class OrchestrationService extends EventEmitter {
protected initialized = false;
protected queueModeId: string;
@ -29,6 +30,7 @@ export abstract class OrchestrationService {
}
constructor() {
super();
this.redisService = Container.get(RedisService);
this.queueModeId = config.getEnv('redis.queueModeId');
}

View file

@ -13,11 +13,11 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
private leaderId: string | undefined;
public get isLeader() {
get isLeader() {
return this.id === this.leaderId;
}
public get isFollower() {
get isFollower() {
return !this.isLeader;
}
@ -84,6 +84,8 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
this.leaderId = this.id;
this.emit('leadershipChange', this.id);
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
}
}

View file

@ -0,0 +1,356 @@
import { Container } from 'typedi';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
import { ActiveWorkflows } from 'n8n-core';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import config from '@/config';
import { ExternalHooks } from '@/ExternalHooks';
import { Push } from '@/push';
import { SecretsHelper } from '@/SecretsHelpers';
import { WebhookService } from '@/services/webhook.service';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as AdditionalData from '@/WorkflowExecuteAdditionalData';
import { WorkflowRunner } from '@/WorkflowRunner';
import { mockInstance, setSchedulerAsLoadedNode } from './shared/utils';
import * as testDb from './shared/testDb';
import type { User } from '@/databases/entities/User';
import type { WebhookEntity } from '@/databases/entities/WebhookEntity';
import { NodeTypes } from '@/NodeTypes';
import { chooseRandomly } from './shared/random';
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
mockInstance(ActiveExecutions);
mockInstance(ActiveWorkflows);
mockInstance(Push);
mockInstance(SecretsHelper);
mockInstance(MultiMainInstancePublisher);
const webhookService = mockInstance(WebhookService);
setSchedulerAsLoadedNode();
const externalHooks = mockInstance(ExternalHooks);
let activeWorkflowRunner: ActiveWorkflowRunner;
let owner: User;
const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [
'init',
'create',
'update',
'activate',
'manual',
];
beforeAll(async () => {
await testDb.init();
activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
owner = await testDb.createOwner();
});
afterEach(async () => {
await activeWorkflowRunner.removeAll();
activeWorkflowRunner.removeAllQueuedWorkflowActivations();
await testDb.truncate(['Workflow']);
config.load(config.default);
jest.restoreAllMocks();
});
afterAll(async () => {
await testDb.terminate();
});
describe('init()', () => {
test('should call `ExternalHooks.run()`', async () => {
const runSpy = jest.spyOn(externalHooks, 'run');
await activeWorkflowRunner.init();
expect(runSpy).toHaveBeenCalledTimes(1);
const [hook, arg] = runSpy.mock.calls[0];
expect(hook).toBe('activeWorkflows.initialized');
expect(arg).toBeEmptyArray();
});
test('should start with no active workflows', async () => {
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(0);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(0);
});
test('should start with one active workflow', async () => {
await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(1);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(1);
});
test('should start with multiple active workflows', async () => {
await testDb.createWorkflow({ active: true }, owner);
await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(2);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(2);
});
test('should pre-check that every workflow can be activated', async () => {
await testDb.createWorkflow({ active: true }, owner);
await testDb.createWorkflow({ active: true }, owner);
const precheckSpy = jest
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
.mockReturnValue(true);
await activeWorkflowRunner.init();
expect(precheckSpy).toHaveBeenCalledTimes(2);
});
});
describe('removeAll()', () => {
test('should remove all active workflows from memory', async () => {
await testDb.createWorkflow({ active: true }, owner);
await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
await activeWorkflowRunner.removeAll();
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(0);
});
});
describe('remove()', () => {
test('should call `ActiveWorkflowRunner.clearWebhooks()`', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
const clearWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'clearWebhooks');
await activeWorkflowRunner.init();
await activeWorkflowRunner.remove(workflow.id);
expect(clearWebhooksSpy).toHaveBeenCalledTimes(1);
});
});
describe('isActive()', () => {
test('should return `true` for active workflow in storage', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id);
await expect(isActiveInStorage).resolves.toBe(true);
});
test('should return `false` for inactive workflow in storage', async () => {
const workflow = await testDb.createWorkflow({ active: false }, owner);
await activeWorkflowRunner.init();
const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id);
await expect(isActiveInStorage).resolves.toBe(false);
});
});
describe('runWorkflow()', () => {
test('should call `WorkflowRunner.run()`', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const additionalData = await AdditionalData.getBase('fake-user-id');
const runSpy = jest
.spyOn(WorkflowRunner.prototype, 'run')
.mockResolvedValue('fake-execution-id');
const [node] = workflow.nodes;
await activeWorkflowRunner.runWorkflow(workflow, node, [[]], additionalData, 'trigger');
expect(runSpy).toHaveBeenCalledTimes(1);
});
});
describe('executeErrorWorkflow()', () => {
test('should call `WorkflowExecuteAdditionalData.executeErrorWorkflow()`', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
const [node] = workflow.nodes;
const error = new NodeOperationError(node, 'Fake error message');
const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow');
await activeWorkflowRunner.init();
activeWorkflowRunner.executeErrorWorkflow(error, workflow, 'trigger');
expect(executeSpy).toHaveBeenCalledTimes(1);
});
test('should be called on failure to activate due to 401', async () => {
const storedWorkflow = await testDb.createWorkflow({ active: true }, owner);
const [node] = storedWorkflow.nodes;
const executeSpy = jest.spyOn(activeWorkflowRunner, 'executeErrorWorkflow');
jest.spyOn(activeWorkflowRunner, 'add').mockImplementation(() => {
throw new NodeApiError(node, {
httpCode: '401',
message: 'Authorization failed - please check your credentials',
});
});
await activeWorkflowRunner.init();
expect(executeSpy).toHaveBeenCalledTimes(1);
const [error, workflow] = executeSpy.mock.calls[0];
expect(error.message).toContain('Authorization');
expect(workflow.id).toBe(storedWorkflow.id);
});
});
describe('add()', () => {
describe('in single-main scenario', () => {
test('leader should add webhooks, triggers and pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
describe('in multi-main scenario', () => {
describe('leader', () => {
test('on regular activation mode, leader should add webhooks only', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
test('on activation via leadership change, leader should add triggers and pollers only', async () => {
const mode = 'leadershipChange';
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
describe('follower', () => {
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: false });
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
});
});
});
});
describe('addWebhooks()', () => {
test('should call `WebhookService.storeWebhook()`', async () => {
const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData;
const mockWebhookEntity = { webhookPath: 'fake-path' } as unknown as WebhookEntity;
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([mockWebhook]);
webhookService.createWebhook.mockReturnValue(mockWebhookEntity);
const additionalData = await AdditionalData.getBase('fake-user-id');
const dbWorkflow = await testDb.createWorkflow({ active: true }, owner);
const workflow = new Workflow({
id: dbWorkflow.id,
name: dbWorkflow.name,
nodes: dbWorkflow.nodes,
connections: dbWorkflow.connections,
active: dbWorkflow.active,
nodeTypes: Container.get(NodeTypes),
staticData: dbWorkflow.staticData,
settings: dbWorkflow.settings,
});
const [node] = dbWorkflow.nodes;
jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node);
jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true);
jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined);
await activeWorkflowRunner.addWebhooks(workflow, additionalData, 'trigger', 'init');
expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1);
});
});

View file

@ -459,10 +459,10 @@ export async function createWorkflow(attributes: Partial<WorkflowEntity> = {}, u
nodes: nodes ?? [
{
id: 'uuid-1234',
name: 'Start',
name: 'Schedule Trigger',
parameters: {},
position: [-20, 260],
type: 'n8n-nodes-base.start',
type: 'n8n-nodes-base.scheduleTrigger',
typeVersion: 1,
},
],

View file

@ -1,6 +1,6 @@
import { Container } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import type { INode } from 'n8n-workflow';
import { type INode } from 'n8n-workflow';
import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials';
import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials';
import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node';
@ -16,6 +16,8 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { AUTH_COOKIE_NAME } from '@/constants';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { mockInstance } from './mocking';
import { mockNodeTypesData } from '../../../unit/Helpers';
export { mockInstance } from './mocking';
export { setupTestServer } from './testServer';
@ -166,3 +168,15 @@ export function makeWorkflow(options?: {
}
export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] };
export function setSchedulerAsLoadedNode() {
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
Object.assign(nodesAndCredentials, {
loadedNodes: mockNodeTypesData(['scheduleTrigger'], {
addTrigger: true,
}),
known: { nodes: {}, credentials: {} },
types: { nodes: [], credentials: [] },
});
}

View file

@ -1,278 +0,0 @@
import { v4 as uuid } from 'uuid';
import { mocked } from 'jest-mock';
import { Container } from 'typedi';
import type { INode } from 'n8n-workflow';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { Role } from '@db/entities/Role';
import { User } from '@db/entities/User';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { Push } from '@/push';
import { ActiveExecutions } from '@/ActiveExecutions';
import { SecretsHelper } from '@/SecretsHelpers';
import { WebhookService } from '@/services/webhook.service';
import { VariablesService } from '@/environments/variables/variables.service';
import { mockInstance } from '../integration/shared/utils/';
import { randomEmail, randomName } from '../integration/shared/random';
import * as Helpers from './Helpers';
/**
* TODO:
* - test workflow webhooks activation (that trigger `executeWebhook`and other webhook methods)
* - test activation error catching and getters such as `getActivationError` (requires building a workflow that fails to activate)
* - test queued workflow activation functions (might need to create a non-working workflow to test this)
*/
let databaseActiveWorkflowsCount = 0;
let databaseActiveWorkflowsList: WorkflowEntity[] = [];
const generateWorkflows = (count: number): WorkflowEntity[] => {
const workflows: WorkflowEntity[] = [];
const ownerRole = new Role();
ownerRole.scope = 'workflow';
ownerRole.name = 'owner';
ownerRole.id = '1';
const owner = new User();
owner.id = uuid();
owner.firstName = randomName();
owner.lastName = randomName();
owner.email = randomEmail();
for (let i = 0; i < count; i++) {
const workflow = new WorkflowEntity();
Object.assign(workflow, {
id: (i + 1).toString(),
name: randomName(),
active: true,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [
{
parameters: {
rule: {
interval: [{}],
},
},
id: uuid(),
name: 'Schedule Trigger',
type: 'n8n-nodes-base.scheduleTrigger',
typeVersion: 1,
position: [900, 460],
},
],
connections: {},
tags: [],
});
const sharedWorkflow = new SharedWorkflow();
sharedWorkflow.workflowId = workflow.id;
sharedWorkflow.role = ownerRole;
sharedWorkflow.user = owner;
workflow.shared = [sharedWorkflow];
workflows.push(workflow);
}
databaseActiveWorkflowsList = workflows;
return workflows;
};
const MOCK_NODE_TYPES_DATA = Helpers.mockNodeTypesData(['scheduleTrigger'], {
addTrigger: true,
});
jest.mock('@/Db', () => {
return {
collections: {
Workflow: {
find: jest.fn(async () => generateWorkflows(databaseActiveWorkflowsCount)),
findOne: jest.fn(async (searchParams) => {
return databaseActiveWorkflowsList.find(
(workflow) => workflow.id === searchParams.where.id.toString(),
);
}),
update: jest.fn(),
createQueryBuilder: jest.fn(() => {
const fakeQueryBuilder = {
update: () => fakeQueryBuilder,
set: () => fakeQueryBuilder,
where: () => fakeQueryBuilder,
execute: async () => {},
};
return fakeQueryBuilder;
}),
},
},
};
});
const workflowCheckIfCanBeActivated = jest.fn(() => true);
jest
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
.mockImplementation(workflowCheckIfCanBeActivated);
const removeFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'remove');
const removeWebhooksFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'removeWorkflowWebhooks');
const workflowRunnerRun = jest.spyOn(WorkflowRunner.prototype, 'run');
const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn(
WorkflowExecuteAdditionalData,
'executeErrorWorkflow',
);
describe('ActiveWorkflowRunner', () => {
mockInstance(ActiveExecutions);
const externalHooks = mockInstance(ExternalHooks);
const webhookService = mockInstance(WebhookService);
mockInstance(Push);
mockInstance(SecretsHelper);
const variablesService = mockInstance(VariablesService);
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
Object.assign(nodesAndCredentials, {
loadedNodes: MOCK_NODE_TYPES_DATA,
known: { nodes: {}, credentials: {} },
types: { nodes: [], credentials: [] },
});
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
beforeAll(async () => {
variablesService.getAllCached.mockResolvedValue([]);
});
afterEach(async () => {
await activeWorkflowRunner.removeAll();
databaseActiveWorkflowsCount = 0;
databaseActiveWorkflowsList = [];
jest.clearAllMocks();
});
test('Should initialize activeWorkflowRunner with empty list of active workflows and call External Hooks', async () => {
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalledTimes(1);
});
test('Should initialize activeWorkflowRunner with one active workflow', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalled();
});
test('Should make sure function checkIfWorkflowCanBeActivated was called for every workflow', async () => {
databaseActiveWorkflowsCount = 2;
await activeWorkflowRunner.init();
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
});
test('Call to removeAll should remove every workflow', async () => {
databaseActiveWorkflowsCount = 2;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
await activeWorkflowRunner.removeAll();
expect(removeFunction).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
});
test('Call to remove should also call removeWorkflowWebhooks', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
await activeWorkflowRunner.remove('1');
expect(removeWebhooksFunction).toHaveBeenCalledTimes(1);
});
test('Call to isActive should return true for valid workflow', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.isActive('1')).toBe(true);
});
test('Call to isActive should return false for invalid workflow', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.isActive('2')).toBe(false);
});
test('Calling add should call checkIfWorkflowCanBeActivated', async () => {
// Initialize with default (0) workflows
await activeWorkflowRunner.init();
generateWorkflows(1);
await activeWorkflowRunner.add('1', 'activate');
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(1);
});
test('runWorkflow should call run method in WorkflowRunner', async () => {
await activeWorkflowRunner.init();
const workflow = generateWorkflows(1);
const additionalData = await WorkflowExecuteAdditionalData.getBase('fake-user-id');
workflowRunnerRun.mockResolvedValueOnce('invalid-execution-id');
await activeWorkflowRunner.runWorkflow(
workflow[0],
workflow[0].nodes[0],
[[]],
additionalData,
'trigger',
);
expect(workflowRunnerRun).toHaveBeenCalledTimes(1);
});
test('executeErrorWorkflow should call function with same name in WorkflowExecuteAdditionalData', async () => {
const workflowData = generateWorkflows(1)[0];
const error = new NodeOperationError(workflowData.nodes[0], 'Fake error message');
await activeWorkflowRunner.init();
activeWorkflowRunner.executeErrorWorkflow(error, workflowData, 'trigger');
expect(workflowExecuteAdditionalDataExecuteErrorWorkflowSpy).toHaveBeenCalledTimes(1);
});
describe('init()', () => {
it('should execute error workflow on failure to activate due to 401', async () => {
databaseActiveWorkflowsCount = 1;
jest.spyOn(ActiveWorkflowRunner.prototype, 'add').mockImplementation(() => {
throw new NodeApiError(
{
id: 'a75dcd1b-9fed-4643-90bd-75933d67936c',
name: 'Github Trigger',
type: 'n8n-nodes-base.githubTrigger',
typeVersion: 1,
position: [0, 0],
} as INode,
{
httpCode: '401',
message: 'Authorization failed - please check your credentials',
},
);
});
const executeSpy = jest.spyOn(ActiveWorkflowRunner.prototype, 'executeErrorWorkflow');
await activeWorkflowRunner.init();
const [error, workflow] = executeSpy.mock.calls[0];
expect(error.message).toContain('Authorization');
expect(workflow.id).toBe('1');
});
});
});

View file

@ -1,5 +1,3 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
import { CronJob } from 'cron';
import type {
@ -14,62 +12,64 @@ import type {
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { LoggerProxy as Logger, toCronExpression, WorkflowActivationError } from 'n8n-workflow';
import {
LoggerProxy as Logger,
toCronExpression,
WorkflowActivationError,
WorkflowDeactivationError,
} from 'n8n-workflow';
import type { IWorkflowData } from './Interfaces';
export class ActiveWorkflows {
private workflowData: {
[key: string]: IWorkflowData;
private activeWorkflows: {
[workflowId: string]: IWorkflowData;
} = {};
/**
* Returns if the workflow is active
*
* @param {string} id The id of the workflow to check
* Returns if the workflow is active in memory.
*/
isActive(id: string): boolean {
return this.workflowData.hasOwnProperty(id);
isActive(workflowId: string) {
return this.activeWorkflows.hasOwnProperty(workflowId);
}
/**
* Returns the ids of the currently active workflows
*
* Returns the IDs of the currently active workflows in memory.
*/
allActiveWorkflows(): string[] {
return Object.keys(this.workflowData);
allActiveWorkflows() {
return Object.keys(this.activeWorkflows);
}
/**
* Returns the Workflow data for the workflow with
* the given id if it is currently active
*
* Returns the workflow data for the given ID if currently active in memory.
*/
get(id: string): IWorkflowData | undefined {
return this.workflowData[id];
get(workflowId: string) {
return this.activeWorkflows[workflowId];
}
/**
* Makes a workflow active
*
* @param {string} id The id of the workflow to activate
* @param {string} workflowId The id of the workflow to activate
* @param {Workflow} workflow The workflow to activate
* @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
*/
async add(
id: string,
workflowId: string,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
getTriggerFunctions: IGetExecuteTriggerFunctions,
getPollFunctions: IGetExecutePollFunctions,
): Promise<void> {
this.workflowData[id] = {};
) {
this.activeWorkflows[workflowId] = {};
const triggerNodes = workflow.getTriggerNodes();
let triggerResponse: ITriggerResponse | undefined;
this.workflowData[id].triggerResponses = [];
this.activeWorkflows[workflowId].triggerResponses = [];
for (const triggerNode of triggerNodes) {
try {
triggerResponse = await workflow.runTrigger(
@ -82,46 +82,49 @@ export class ActiveWorkflows {
if (triggerResponse !== undefined) {
// If a response was given save it
this.workflowData[id].triggerResponses!.push(triggerResponse);
this.activeWorkflows[workflowId].triggerResponses!.push(triggerResponse);
}
} catch (error) {
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowActivationError(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem activating the workflow: "${error.message}"`,
{ cause: error as Error, node: triggerNode },
{ cause: error, node: triggerNode },
);
}
}
const pollNodes = workflow.getPollNodes();
if (pollNodes.length) {
this.workflowData[id].pollResponses = [];
for (const pollNode of pollNodes) {
try {
this.workflowData[id].pollResponses!.push(
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
),
);
} catch (error) {
throw new WorkflowActivationError(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem activating the workflow: "${error.message}"`,
{ cause: error as Error, node: pollNode },
);
}
const pollingNodes = workflow.getPollNodes();
if (pollingNodes.length === 0) return;
this.activeWorkflows[workflowId].pollResponses = [];
for (const pollNode of pollingNodes) {
try {
this.activeWorkflows[workflowId].pollResponses!.push(
await this.activatePolling(
pollNode,
workflow,
additionalData,
getPollFunctions,
mode,
activation,
),
);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowActivationError(
`There was a problem activating the workflow: "${error.message}"`,
{ cause: error, node: pollNode },
);
}
}
}
/**
* Activates polling for the given node
*
*/
async activatePolling(
node: INode,
@ -159,7 +162,7 @@ export class ActiveWorkflows {
if (testingTrigger) {
throw error;
}
pollFunctions.__emitError(error);
pollFunctions.__emitError(error as Error);
}
};
@ -193,59 +196,49 @@ export class ActiveWorkflows {
}
/**
* Makes a workflow inactive
*
* @param {string} id The id of the workflow to deactivate
* Makes a workflow inactive in memory.
*/
async remove(id: string): Promise<boolean> {
if (!this.isActive(id)) {
// Workflow is currently not registered
Logger.warn(
`The workflow with the id "${id}" is currently not active and can so not be removed`,
);
async remove(workflowId: string) {
if (!this.isActive(workflowId)) {
Logger.warn(`Cannot deactivate already inactive workflow ID "${workflowId}"`);
return false;
}
const workflowData = this.workflowData[id];
const w = this.activeWorkflows[workflowId];
if (workflowData.triggerResponses) {
for (const triggerResponse of workflowData.triggerResponses) {
if (triggerResponse.closeFunction) {
try {
await triggerResponse.closeFunction();
} catch (error) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem deactivating trigger of workflow "${id}": "${error.message}"`,
{
workflowId: id,
},
);
}
}
}
}
w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger'));
w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller'));
if (workflowData.pollResponses) {
for (const pollResponse of workflowData.pollResponses) {
if (pollResponse.closeFunction) {
try {
await pollResponse.closeFunction();
} catch (error) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem deactivating polling trigger of workflow "${id}": "${error.message}"`,
{
workflowId: id,
},
);
}
}
}
}
delete this.workflowData[id];
delete this.activeWorkflows[workflowId];
return true;
}
async removeAllTriggerAndPollerBasedWorkflows() {
for (const workflowId of Object.keys(this.activeWorkflows)) {
const w = this.activeWorkflows[workflowId];
w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger'));
w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller'));
}
}
private async close(
response: ITriggerResponse | IPollResponse,
workflowId: string,
target: 'trigger' | 'poller',
) {
if (!response.closeFunction) return;
try {
await response.closeFunction();
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowDeactivationError(
`Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`,
{ cause: error, workflowId },
);
}
}
}

View file

@ -1905,7 +1905,14 @@ export type WorkflowExecuteMode =
| 'retry'
| 'trigger'
| 'webhook';
export type WorkflowActivateMode = 'init' | 'create' | 'update' | 'activate' | 'manual';
export type WorkflowActivateMode =
| 'init'
| 'create'
| 'update'
| 'activate'
| 'manual'
| 'leadershipChange';
export interface IWorkflowHooksOptionalParameters {
parentProcessMode?: string;

View file

@ -5,6 +5,7 @@ interface WorkflowActivationErrorOptions {
cause?: Error;
node?: INode;
severity?: Severity;
workflowId?: string;
}
/**
@ -13,7 +14,12 @@ interface WorkflowActivationErrorOptions {
export class WorkflowActivationError extends ExecutionBaseError {
node: INode | undefined;
constructor(message: string, { cause, node, severity }: WorkflowActivationErrorOptions) {
workflowId: string | undefined;
constructor(
message: string,
{ cause, node, severity, workflowId }: WorkflowActivationErrorOptions = {},
) {
let error = cause as Error;
if (cause instanceof ExecutionBaseError) {
error = new Error(cause.message);
@ -23,11 +29,14 @@ export class WorkflowActivationError extends ExecutionBaseError {
}
super(message, { cause: error });
this.node = node;
this.workflowId = workflowId;
this.message = message;
if (severity) this.severity = severity;
}
}
export class WorkflowDeactivationError extends WorkflowActivationError {}
export class WebhookPathAlreadyTakenError extends WorkflowActivationError {
constructor(nodeName: string, cause?: Error) {
super(