feat(core): Coordinate workflow activation in multiple main scenario in internal API (#7566)

Story: https://linear.app/n8n/issue/PAY-926

This PR coordinates workflow activation on instance startup and on
leadership change in multiple main scenario in the internal API. Part 3
on manual workflow activation and deactivation will be a separate PR.

### Part 1: Instance startup

In multi-main scenario, on starting an instance...
- [x] If the instance is the leader, it should add webhooks, triggers
and pollers.
- [x] If the instance is the follower, it should not add webhooks,
triggers or pollers.
- [x] Unit tests.

### Part 2: Leadership change 

In multi-main scenario, if the main instance leader dies…

- [x] The new main instance leader must activate all trigger- and
poller-based workflows, excluding webhook-based workflows.
- [x] The old main instance leader must deactivate all trigger- and
poller-based workflows, excluding webhook-based workflows.
- [x] Unit tests.

To test, start two instances and check behavior on startup and
leadership change:

```
EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug npm run start

EXECUTIONS_MODE=queue N8N_LEADER_SELECTION_ENABLED=true N8N_LICENSE_TENANT_ID=... N8N_LICENSE_ACTIVATION_KEY=... N8N_LOG_LEVEL=debug N8N_PORT=5679 npm run start
```
This commit is contained in:
Iván Ovejero 2023-11-07 13:48:48 +01:00 committed by GitHub
parent 151e60f829
commit c857e42677
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 839 additions and 618 deletions

View file

@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Service } from 'typedi'; import Container, { Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import type { import type {
@ -35,8 +35,6 @@ import type express from 'express';
import * as Db from '@/Db'; import * as Db from '@/Db';
import type { import type {
IActivationError,
IQueuedWorkflowActivations,
IResponseCallbackData, IResponseCallbackData,
IWebhookManager, IWebhookManager,
IWorkflowDb, IWorkflowDb,
@ -65,99 +63,76 @@ import { webhookNotFoundErrorMessage } from './utils';
import { In } from 'typeorm'; import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service'; import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger'; import { Logger } from './Logger';
import { WorkflowRepository } from '@/databases/repositories';
import config from '@/config';
import type { MultiMainInstancePublisher } from './services/orchestration/main/MultiMainInstance.publisher.ee';
const WEBHOOK_PROD_UNREGISTERED_HINT = const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@Service() @Service()
export class ActiveWorkflowRunner implements IWebhookManager { export class ActiveWorkflowRunner implements IWebhookManager {
private activeWorkflows = new ActiveWorkflows(); activeWorkflows = new ActiveWorkflows();
private activationErrors: { private activationErrors: {
[key: string]: IActivationError; [workflowId: string]: {
time: number; // ms
error: {
message: string;
};
};
} = {}; } = {};
private queuedWorkflowActivations: { private queuedActivations: {
[key: string]: IQueuedWorkflowActivations; [workflowId: string]: {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
};
} = {}; } = {};
isMultiMainScenario =
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
multiMainInstancePublisher: MultiMainInstancePublisher | undefined;
constructor( constructor(
private readonly logger: Logger, private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions, private readonly activeExecutions: ActiveExecutions,
private readonly externalHooks: ExternalHooks, private readonly externalHooks: ExternalHooks,
private readonly nodeTypes: NodeTypes, private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService, private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
) {} ) {}
async init() { async init() {
// Get the active workflows from database if (this.isMultiMainScenario) {
const { MultiMainInstancePublisher } = await import(
// NOTE '@/services/orchestration/main/MultiMainInstance.publisher.ee'
// Here I guess we can have a flag on the workflow table like hasTrigger
// so instead of pulling all the active webhooks just pull the actives that have a trigger
const workflowsData: IWorkflowDb[] = (await Db.collections.Workflow.find({
where: { active: true },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
})) as IWorkflowDb[];
if (workflowsData.length !== 0) {
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
this.logger.info(' ================================');
for (const workflowData of workflowsData) {
this.logger.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
this.logger.debug(`Initializing active workflow "${workflowData.name}" (startup)`, {
workflowName: workflowData.name,
workflowId: workflowData.id,
});
try {
await this.add(workflowData.id, 'init', workflowData);
this.logger.verbose(`Successfully started workflow "${workflowData.name}"`, {
workflowName: workflowData.name,
workflowId: workflowData.id,
});
this.logger.info(' => Started');
} catch (error) {
ErrorReporter.error(error);
this.logger.info(
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
); );
this.logger.info(` ${error.message}`); this.multiMainInstancePublisher = Container.get(MultiMainInstancePublisher);
this.logger.error(
`Issue on initial workflow activation try "${workflowData.name}" (startup)`,
{
workflowName: workflowData.name,
workflowId: workflowData.id,
},
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.executeErrorWorkflow(error, workflowData, 'internal');
if (!error.message.includes('Authorization')) { await this.multiMainInstancePublisher.init();
// Keep on trying to activate the workflow if not an auth issue
this.addQueuedWorkflowActivation('init', workflowData);
}
}
}
this.logger.verbose('Finished initializing active workflows (startup)');
} }
await this.addActiveWorkflows('init');
await this.externalHooks.run('activeWorkflows.initialized', []); await this.externalHooks.run('activeWorkflows.initialized', []);
await this.webhookService.populateCache(); await this.webhookService.populateCache();
} }
/** /**
* Removes all the currently active workflows * Removes all the currently active workflows from memory.
*/ */
async removeAll(): Promise<void> { async removeAll() {
let activeWorkflowIds: string[] = []; let activeWorkflowIds: string[] = [];
this.logger.verbose('Call to remove all active workflows received (removeAll)'); this.logger.verbose('Call to remove all active workflows received (removeAll)');
activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows()); activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());
const activeWorkflows = await this.getActiveWorkflows(); const activeWorkflows = await this.allActiveInStorage();
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows]; activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
// Make sure IDs are unique // Make sure IDs are unique
activeWorkflowIds = Array.from(new Set(activeWorkflowIds)); activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
@ -284,76 +259,86 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
/** /**
* Returns the ids of the currently active workflows * Returns the ids of the currently active workflows from memory.
*/ */
async getActiveWorkflows(user?: User): Promise<string[]> { allActiveInMemory() {
let activeWorkflows: WorkflowEntity[] = []; return this.activeWorkflows.allActiveWorkflows();
if (!user || user.globalRole.name === 'owner') { }
activeWorkflows = await Db.collections.Workflow.find({
/**
* Get the IDs of active workflows from storage.
*/
async allActiveInStorage(user?: User) {
const isFullAccess = !user || user.globalRole.name === 'owner';
if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'], select: ['id'],
where: { active: true }, where: { active: true },
}); });
return activeWorkflows return activeWorkflows
.map((workflow) => workflow.id) .map((workflow) => workflow.id)
.filter((workflowId) => !this.activationErrors[workflowId]); .filter((workflowId) => !this.activationErrors[workflowId]);
} else { }
const active = await Db.collections.Workflow.find({
select: ['id'],
where: { active: true },
});
const activeIds = active.map((workflow) => workflow.id);
const where = whereClause({ const where = whereClause({
user, user,
entityType: 'workflow', entityType: 'workflow',
}); });
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});
const activeIds = activeWorkflows.map((workflow) => workflow.id);
Object.assign(where, { workflowId: In(activeIds) }); Object.assign(where, { workflowId: In(activeIds) });
const shared = await Db.collections.SharedWorkflow.find({
const sharings = await Db.collections.SharedWorkflow.find({
select: ['workflowId'], select: ['workflowId'],
where, where,
}); });
return shared
.map((id) => id.workflowId) return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !this.activationErrors[workflowId]); .filter((workflowId) => !this.activationErrors[workflowId]);
} }
}
/** /**
* Returns if the workflow is active * Returns if the workflow is stored as `active`.
* *
* @param {string} id The id of the workflow to check * @important Do not confuse with `ActiveWorkflows.isActive()`,
* which checks if the workflow is active in memory.
*/ */
async isActive(id: string): Promise<boolean> { async isActive(workflowId: string) {
const workflow = await Db.collections.Workflow.findOne({ const workflow = await this.workflowRepository.findOne({
select: ['active'], select: ['active'],
where: { id }, where: { id: workflowId },
}); });
return !!workflow?.active; return !!workflow?.active;
} }
/** /**
* Return error if there was a problem activating the workflow * Return error if there was a problem activating the workflow
*
* @param {string} id The id of the workflow to return the error of
*/ */
getActivationError(id: string): IActivationError | undefined { getActivationError(workflowId: string) {
if (this.activationErrors[id] === undefined) { return this.activationErrors[workflowId];
return undefined;
}
return this.activationErrors[id];
} }
/** /**
* Adds all the webhooks of the workflow * Register workflow-defined webhooks in the `workflow_entity` table.
*/ */
async addWorkflowWebhooks( async addWebhooks(
workflow: Workflow, workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalDataWorkflow, additionalData: IWorkflowExecuteAdditionalDataWorkflow,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
): Promise<void> { ) {
const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true);
let path = '' as string | undefined; let path = '';
for (const webhookData of webhooks) { for (const webhookData of webhooks) {
const node = workflow.getNode(webhookData.node) as INode; const node = workflow.getNode(webhookData.node) as INode;
@ -401,7 +386,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
try { try {
await this.removeWorkflowWebhooks(workflow.id); await this.clearWebhooks(workflow.id);
} catch (error1) { } catch (error1) {
ErrorReporter.error(error1); ErrorReporter.error(error1);
this.logger.error( this.logger.error(
@ -428,14 +413,14 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
/** /**
* Remove all the webhooks of the workflow * Clear workflow-defined webhooks from the `webhook_entity` table.
*
*/ */
async removeWorkflowWebhooks(workflowId: string): Promise<void> { async clearWebhooks(workflowId: string) {
const workflowData = await Db.collections.Workflow.findOne({ const workflowData = await Db.collections.Workflow.findOne({
where: { id: workflowId }, where: { id: workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole'], relations: ['shared', 'shared.user', 'shared.user.globalRole'],
}); });
if (workflowData === null) { if (workflowData === null) {
throw new Error(`Could not find workflow with id "${workflowId}"`); throw new Error(`Could not find workflow with id "${workflowId}"`);
} }
@ -468,11 +453,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.webhookService.deleteWorkflowWebhooks(workflowId); await this.webhookService.deleteWorkflowWebhooks(workflowId);
} }
/**
* Runs the given workflow
*
*/
async runWorkflow( async runWorkflow(
workflowData: IWorkflowDb, workflowData: IWorkflowDb,
node: INode, node: INode,
@ -520,7 +500,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/** /**
* Return poll function which gets the global functions from n8n-core * Return poll function which gets the global functions from n8n-core
* and overwrites the emit to be able to start it in subprocess * and overwrites the emit to be able to start it in subprocess
*
*/ */
getExecutePollFunctions( getExecutePollFunctions(
workflowData: IWorkflowDb, workflowData: IWorkflowDb,
@ -576,7 +555,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/** /**
* Return trigger function which gets the global functions from n8n-core * Return trigger function which gets the global functions from n8n-core
* and overwrites the emit to be able to start it in subprocess * and overwrites the emit to be able to start it in subprocess
*
*/ */
getExecuteTriggerFunctions( getExecuteTriggerFunctions(
workflowData: IWorkflowDb, workflowData: IWorkflowDb,
@ -647,7 +625,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
); );
this.executeErrorWorkflow(activationError, workflowData, mode); this.executeErrorWorkflow(activationError, workflowData, mode);
this.addQueuedWorkflowActivation(activation, workflowData); this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
}; };
return returnFunctions; return returnFunctions;
}; };
@ -676,113 +654,175 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
/** /**
* Makes a workflow active * Register as active in memory all workflows stored as `active`.
*/
async addActiveWorkflows(activationMode: WorkflowActivateMode) {
const dbWorkflows = await this.workflowRepository.getAllActive();
if (dbWorkflows.length === 0) return;
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
this.logger.info(' ================================');
for (const dbWorkflow of dbWorkflows) {
this.logger.info(` - ${dbWorkflow.display()}`);
this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, {
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
});
try {
await this.add(dbWorkflow.id, activationMode, dbWorkflow);
this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, {
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
});
this.logger.info(' => Started');
} catch (error) {
ErrorReporter.error(error);
this.logger.info(
' => ERROR: Workflow could not be activated on first try, keep on trying if not an auth issue',
);
this.logger.info(` ${error.message}`);
this.logger.error(
`Issue on initial workflow activation try of ${dbWorkflow.display()} (startup)`,
{
workflowName: dbWorkflow.name,
workflowId: dbWorkflow.id,
},
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this.executeErrorWorkflow(error, dbWorkflow, 'internal');
// do not keep trying to activate on authorization error
if (error.message.includes('Authorization')) continue;
this.addQueuedWorkflowActivation('init', dbWorkflow);
}
}
this.logger.verbose('Finished activating workflows (startup)');
}
async addAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...');
await this.addActiveWorkflows('leadershipChange');
}
async removeAllTriggerAndPollerBasedWorkflows() {
this.logger.debug('[Leadership change] Removing all trigger- and poller-based workflows...');
await this.activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();
}
/**
* Register a workflow as active.
* *
* @param {string} workflowId The id of the workflow to activate * An activatable workflow may be webhook-, trigger-, or poller-based:
* @param {IWorkflowDb} [workflowData] If workflowData is given it saves the DB query *
* - A `webhook` is an HTTP-based node that can start a workflow when called
* by a third-party service.
* - A `poller` is an HTTP-based node that can start a workflow when detecting
* a change while regularly checking a third-party service.
* - A `trigger` is any non-HTTP-based node that can start a workflow, e.g. a
* time-based node like Schedule Trigger or a message-queue-based node.
*
* Note that despite the name, most "trigger" nodes are actually webhook-based
* and so qualify as `webhook`, e.g. Stripe Trigger.
*
* Triggers and pollers are registered as active in memory at `ActiveWorkflows`,
* but webhooks are registered by being entered in the `webhook_entity` table,
* since webhooks do not require continuous execution.
*/ */
async add( async add(
workflowId: string, workflowId: string,
activation: WorkflowActivateMode, activationMode: WorkflowActivateMode,
workflowData?: IWorkflowDb, existingWorkflow?: WorkflowEntity,
): Promise<void> { ) {
let workflowInstance: Workflow; let workflow: Workflow;
try {
if (workflowData === undefined) { let shouldAddWebhooks = true;
workflowData = (await Db.collections.Workflow.findOne({ let shouldAddTriggersAndPollers = true;
where: { id: workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'], if (this.isMultiMainScenario && activationMode !== 'leadershipChange') {
})) as IWorkflowDb; shouldAddWebhooks = this.multiMainInstancePublisher?.isLeader ?? false;
shouldAddTriggersAndPollers = this.multiMainInstancePublisher?.isLeader ?? false;
} }
if (!workflowData) { if (this.isMultiMainScenario && activationMode === 'leadershipChange') {
throw new Error(`Could not find workflow with id "${workflowId}".`); shouldAddWebhooks = false;
shouldAddTriggersAndPollers = true;
} }
workflowInstance = new Workflow({
id: workflowId, try {
name: workflowData.name, const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId));
nodes: workflowData.nodes,
connections: workflowData.connections, if (!dbWorkflow) {
active: workflowData.active, throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`);
}
workflow = new Workflow({
id: dbWorkflow.id,
name: dbWorkflow.name,
nodes: dbWorkflow.nodes,
connections: dbWorkflow.connections,
active: dbWorkflow.active,
nodeTypes: this.nodeTypes, nodeTypes: this.nodeTypes,
staticData: workflowData.staticData, staticData: dbWorkflow.staticData,
settings: workflowData.settings, settings: dbWorkflow.settings,
}); });
const canBeActivated = workflowInstance.checkIfWorkflowCanBeActivated(STARTING_NODES); const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES);
if (!canBeActivated) { if (!canBeActivated) {
this.logger.error(`Unable to activate workflow "${workflowData.name}"`); throw new WorkflowActivationError(
throw new Error( `Workflow ${dbWorkflow.display()} has no node to start the workflow - at least one trigger, poller or webhook node is required`,
'The workflow can not be activated because it does not contain any nodes which could start the workflow. Only workflows which have trigger or webhook nodes can be activated.',
); );
} }
const mode = 'trigger'; const sharing = dbWorkflow.shared.find((shared) => shared.role.name === 'owner');
const workflowOwner = (workflowData as WorkflowEntity).shared.find(
(shared) => shared.role.name === 'owner', if (!sharing) {
); throw new WorkflowActivationError(`Workflow ${dbWorkflow.display()} has no owner`);
if (!workflowOwner) {
throw new Error('Workflow cannot be activated because it has no owner');
} }
const additionalData = await WorkflowExecuteAdditionalData.getBase(workflowOwner.user.id);
const getTriggerFunctions = this.getExecuteTriggerFunctions(
workflowData,
additionalData,
mode,
activation,
);
const getPollFunctions = this.getExecutePollFunctions(
workflowData,
additionalData,
mode,
activation,
);
// Add the workflows which have webhooks defined const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id);
await this.addWorkflowWebhooks(workflowInstance, additionalData, mode, activation);
if ( if (shouldAddWebhooks) {
workflowInstance.getTriggerNodes().length !== 0 || this.logger.debug('============');
workflowInstance.getPollNodes().length !== 0 this.logger.debug(`Adding webhooks for workflow "${dbWorkflow.display()}"`);
) { this.logger.debug('============');
await this.activeWorkflows.add(
workflowId, await this.addWebhooks(workflow, additionalData, 'trigger', activationMode);
workflowInstance, }
if (shouldAddTriggersAndPollers) {
this.logger.debug('============');
this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`);
this.logger.debug('============');
await this.addTriggersAndPollers(dbWorkflow, workflow, {
activationMode,
executionMode: 'trigger',
additionalData, additionalData,
mode,
activation,
getTriggerFunctions,
getPollFunctions,
);
this.logger.verbose(`Successfully activated workflow "${workflowData.name}"`, {
workflowId,
workflowName: workflowData.name,
}); });
} }
// Workflow got now successfully activated so make sure nothing is left in the queue // Workflow got now successfully activated so make sure nothing is left in the queue
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
if (this.activationErrors[workflowId] !== undefined) { if (this.activationErrors[workflowId]) {
// If there were activation errors delete them
delete this.activationErrors[workflowId]; delete this.activationErrors[workflowId];
} }
if (workflowInstance.id) { const triggerCount = this.countTriggers(workflow, additionalData);
// Sum all triggers in the workflow, EXCLUDING the manual trigger await WorkflowsService.updateWorkflowTriggerCount(workflow.id, triggerCount);
const triggerFilter = (nodeType: INodeType) =>
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
const triggerCount =
workflowInstance.queryNodes(triggerFilter).length +
workflowInstance.getPollNodes().length +
WebhookHelpers.getWorkflowWebhooks(workflowInstance, additionalData, undefined, true)
.length;
await WorkflowsService.updateWorkflowTriggerCount(workflowInstance.id, triggerCount);
}
} catch (error) { } catch (error) {
// There was a problem activating the workflow
// Save the error
this.activationErrors[workflowId] = { this.activationErrors[workflowId] = {
time: new Date().getTime(), time: new Date().getTime(),
error: { error: {
@ -795,7 +835,24 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// If for example webhooks get created it sometimes has to save the // If for example webhooks get created it sometimes has to save the
// id of them in the static data. So make sure that data gets persisted. // id of them in the static data. So make sure that data gets persisted.
await WorkflowsService.saveStaticData(workflowInstance!); await WorkflowsService.saveStaticData(workflow);
}
/**
* Count all triggers in the workflow, excluding Manual Trigger.
*/
private countTriggers(
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalDataWorkflow,
) {
const triggerFilter = (nodeType: INodeType) =>
!!nodeType.trigger && !nodeType.description.name.includes('manualTrigger');
return (
workflow.queryNodes(triggerFilter).length +
workflow.getPollNodes().length +
WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true).length
);
} }
/** /**
@ -803,10 +860,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
* Meaning it will keep on trying to activate it in regular * Meaning it will keep on trying to activate it in regular
* amounts indefinitely. * amounts indefinitely.
*/ */
addQueuedWorkflowActivation( addQueuedWorkflowActivation(activationMode: WorkflowActivateMode, workflowData: WorkflowEntity) {
activationMode: WorkflowActivateMode,
workflowData: IWorkflowDb,
): void {
const workflowId = workflowData.id; const workflowId = workflowData.id;
const workflowName = workflowData.name; const workflowName = workflowData.name;
@ -819,7 +873,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await this.add(workflowId, activationMode, workflowData); await this.add(workflowId, activationMode, workflowData);
} catch (error) { } catch (error) {
ErrorReporter.error(error); ErrorReporter.error(error);
let lastTimeout = this.queuedWorkflowActivations[workflowId].lastTimeout; let lastTimeout = this.queuedActivations[workflowId].lastTimeout;
if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) { if (lastTimeout < WORKFLOW_REACTIVATE_MAX_TIMEOUT) {
lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT); lastTimeout = Math.min(lastTimeout * 2, WORKFLOW_REACTIVATE_MAX_TIMEOUT);
} }
@ -834,8 +888,8 @@ export class ActiveWorkflowRunner implements IWebhookManager {
}, },
); );
this.queuedWorkflowActivations[workflowId].lastTimeout = lastTimeout; this.queuedActivations[workflowId].lastTimeout = lastTimeout;
this.queuedWorkflowActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout); this.queuedActivations[workflowId].timeout = setTimeout(retryFunction, lastTimeout);
return; return;
} }
this.logger.info( this.logger.info(
@ -851,7 +905,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
// multiple run in parallel // multiple run in parallel
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
this.queuedWorkflowActivations[workflowId] = { this.queuedActivations[workflowId] = {
activationMode, activationMode,
lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, lastTimeout: WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT), timeout: setTimeout(retryFunction, WORKFLOW_REACTIVATE_INITIAL_TIMEOUT),
@ -862,18 +916,18 @@ export class ActiveWorkflowRunner implements IWebhookManager {
/** /**
* Remove a workflow from the activation queue * Remove a workflow from the activation queue
*/ */
removeQueuedWorkflowActivation(workflowId: string): void { removeQueuedWorkflowActivation(workflowId: string) {
if (this.queuedWorkflowActivations[workflowId]) { if (this.queuedActivations[workflowId]) {
clearTimeout(this.queuedWorkflowActivations[workflowId].timeout); clearTimeout(this.queuedActivations[workflowId].timeout);
delete this.queuedWorkflowActivations[workflowId]; delete this.queuedActivations[workflowId];
} }
} }
/** /**
* Remove all workflows from the activation queue * Remove all workflows from the activation queue
*/ */
removeAllQueuedWorkflowActivations(): void { removeAllQueuedWorkflowActivations() {
for (const workflowId in this.queuedWorkflowActivations) { for (const workflowId in this.queuedActivations) {
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
} }
} }
@ -884,10 +938,10 @@ export class ActiveWorkflowRunner implements IWebhookManager {
* @param {string} workflowId The id of the workflow to deactivate * @param {string} workflowId The id of the workflow to deactivate
*/ */
// TODO: this should happen in a transaction // TODO: this should happen in a transaction
async remove(workflowId: string): Promise<void> { async remove(workflowId: string) {
// Remove all the webhooks of the workflow // Remove all the webhooks of the workflow
try { try {
await this.removeWorkflowWebhooks(workflowId); await this.clearWebhooks(workflowId);
} catch (error) { } catch (error) {
ErrorReporter.error(error); ErrorReporter.error(error);
this.logger.error( this.logger.error(
@ -900,7 +954,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {
delete this.activationErrors[workflowId]; delete this.activationErrors[workflowId];
} }
if (this.queuedWorkflowActivations[workflowId] !== undefined) { if (this.queuedActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId); this.removeQueuedWorkflowActivation(workflowId);
} }
@ -913,4 +967,52 @@ export class ActiveWorkflowRunner implements IWebhookManager {
} }
} }
} }
/**
* Register as active in memory a trigger- or poller-based workflow.
*/
async addTriggersAndPollers(
dbWorkflow: WorkflowEntity,
workflow: Workflow,
{
activationMode,
executionMode,
additionalData,
}: {
activationMode: WorkflowActivateMode;
executionMode: WorkflowExecuteMode;
additionalData: IWorkflowExecuteAdditionalDataWorkflow;
},
) {
const getTriggerFunctions = this.getExecuteTriggerFunctions(
dbWorkflow,
additionalData,
executionMode,
activationMode,
);
const getPollFunctions = this.getExecutePollFunctions(
dbWorkflow,
additionalData,
executionMode,
activationMode,
);
if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) {
await this.activeWorkflows.add(
workflow.id,
workflow,
additionalData,
executionMode,
activationMode,
getTriggerFunctions,
getPollFunctions,
);
this.logger.verbose(`Workflow ${dbWorkflow.display()} activated`, {
workflowId: dbWorkflow.id,
workflowName: dbWorkflow.name,
});
}
}
} }

View file

@ -16,7 +16,6 @@ import type {
IWorkflowBase, IWorkflowBase,
CredentialLoadingDetails, CredentialLoadingDetails,
Workflow, Workflow,
WorkflowActivateMode,
WorkflowExecuteMode, WorkflowExecuteMode,
ExecutionStatus, ExecutionStatus,
IExecutionsSummary, IExecutionsSummary,
@ -64,20 +63,6 @@ import type {
import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants'; import type { LICENSE_FEATURES, LICENSE_QUOTAS } from './constants';
import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types'; import type { WorkflowWithSharingsAndCredentials } from './workflows/workflows.types';
export interface IActivationError {
time: number;
error: {
message: string;
};
}
export interface IQueuedWorkflowActivations {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
}
export interface ICredentialsTypeData { export interface ICredentialsTypeData {
[key: string]: CredentialLoadingDetails; [key: string]: CredentialLoadingDetails;
} }

View file

@ -620,7 +620,7 @@ export class Server extends AbstractServer {
this.app.get( this.app.get(
`/${this.restEndpoint}/active`, `/${this.restEndpoint}/active`,
ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => { ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => {
return this.activeWorkflowRunner.getActiveWorkflows(req.user); return this.activeWorkflowRunner.allActiveInStorage(req.user);
}), }),
); );

View file

@ -119,6 +119,8 @@ export class Start extends BaseCommand {
'@/services/orchestration/main/MultiMainInstance.publisher.ee' '@/services/orchestration/main/MultiMainInstance.publisher.ee'
); );
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await Container.get(MultiMainInstancePublisher).destroy(); await Container.get(MultiMainInstancePublisher).destroy();
} }
@ -251,6 +253,15 @@ export class Start extends BaseCommand {
} }
await Container.get(OrchestrationHandlerMainService).init(); await Container.get(OrchestrationHandlerMainService).init();
multiMainInstancePublisher.on('leadershipChange', async () => {
if (multiMainInstancePublisher.isLeader) {
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
} else {
// only in case of leadership change without shutdown
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
}
});
} }
async run() { async run() {

View file

@ -89,6 +89,10 @@ export class WorkflowEntity extends WithTimestampsAndStringId implements IWorkfl
@Column({ default: 0 }) @Column({ default: 0 })
triggerCount: number; triggerCount: number;
display() {
return `"${this.name}" (ID: ${this.id})`;
}
} }
/** /**

View file

@ -7,4 +7,18 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
constructor(dataSource: DataSource) { constructor(dataSource: DataSource) {
super(WorkflowEntity, dataSource.manager); super(WorkflowEntity, dataSource.manager);
} }
async getAllActive() {
return this.find({
where: { active: true },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
});
}
async findById(workflowId: string) {
return this.findOne({
where: { id: workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole', 'shared.role'],
});
}
} }

View file

@ -2,8 +2,9 @@ import Container from 'typedi';
import { RedisService } from './redis.service'; import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import config from '@/config'; import config from '@/config';
import { EventEmitter } from 'node:events';
export abstract class OrchestrationService { export abstract class OrchestrationService extends EventEmitter {
protected initialized = false; protected initialized = false;
protected queueModeId: string; protected queueModeId: string;
@ -29,6 +30,7 @@ export abstract class OrchestrationService {
} }
constructor() { constructor() {
super();
this.redisService = Container.get(RedisService); this.redisService = Container.get(RedisService);
this.queueModeId = config.getEnv('redis.queueModeId'); this.queueModeId = config.getEnv('redis.queueModeId');
} }

View file

@ -13,11 +13,11 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
private leaderId: string | undefined; private leaderId: string | undefined;
public get isLeader() { get isLeader() {
return this.id === this.leaderId; return this.id === this.leaderId;
} }
public get isFollower() { get isFollower() {
return !this.isLeader; return !this.isLeader;
} }
@ -84,6 +84,8 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
this.leaderId = this.id; this.leaderId = this.id;
this.emit('leadershipChange', this.id);
await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);
} }
} }

View file

@ -0,0 +1,356 @@
import { Container } from 'typedi';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';
import { ActiveWorkflows } from 'n8n-core';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import config from '@/config';
import { ExternalHooks } from '@/ExternalHooks';
import { Push } from '@/push';
import { SecretsHelper } from '@/SecretsHelpers';
import { WebhookService } from '@/services/webhook.service';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as AdditionalData from '@/WorkflowExecuteAdditionalData';
import { WorkflowRunner } from '@/WorkflowRunner';
import { mockInstance, setSchedulerAsLoadedNode } from './shared/utils';
import * as testDb from './shared/testDb';
import type { User } from '@/databases/entities/User';
import type { WebhookEntity } from '@/databases/entities/WebhookEntity';
import { NodeTypes } from '@/NodeTypes';
import { chooseRandomly } from './shared/random';
import { MultiMainInstancePublisher } from '@/services/orchestration/main/MultiMainInstance.publisher.ee';
mockInstance(ActiveExecutions);
mockInstance(ActiveWorkflows);
mockInstance(Push);
mockInstance(SecretsHelper);
mockInstance(MultiMainInstancePublisher);
const webhookService = mockInstance(WebhookService);
setSchedulerAsLoadedNode();
const externalHooks = mockInstance(ExternalHooks);
let activeWorkflowRunner: ActiveWorkflowRunner;
let owner: User;
const NON_LEADERSHIP_CHANGE_MODES: WorkflowActivateMode[] = [
'init',
'create',
'update',
'activate',
'manual',
];
beforeAll(async () => {
await testDb.init();
activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
owner = await testDb.createOwner();
});
afterEach(async () => {
await activeWorkflowRunner.removeAll();
activeWorkflowRunner.removeAllQueuedWorkflowActivations();
await testDb.truncate(['Workflow']);
config.load(config.default);
jest.restoreAllMocks();
});
afterAll(async () => {
await testDb.terminate();
});
describe('init()', () => {
test('should call `ExternalHooks.run()`', async () => {
const runSpy = jest.spyOn(externalHooks, 'run');
await activeWorkflowRunner.init();
expect(runSpy).toHaveBeenCalledTimes(1);
const [hook, arg] = runSpy.mock.calls[0];
expect(hook).toBe('activeWorkflows.initialized');
expect(arg).toBeEmptyArray();
});
test('should start with no active workflows', async () => {
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(0);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(0);
});
test('should start with one active workflow', async () => {
await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(1);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(1);
});
test('should start with multiple active workflows', async () => {
await testDb.createWorkflow({ active: true }, owner);
await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const inStorage = activeWorkflowRunner.allActiveInStorage();
await expect(inStorage).resolves.toHaveLength(2);
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(2);
});
test('should pre-check that every workflow can be activated', async () => {
await testDb.createWorkflow({ active: true }, owner);
await testDb.createWorkflow({ active: true }, owner);
const precheckSpy = jest
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
.mockReturnValue(true);
await activeWorkflowRunner.init();
expect(precheckSpy).toHaveBeenCalledTimes(2);
});
});
describe('removeAll()', () => {
test('should remove all active workflows from memory', async () => {
await testDb.createWorkflow({ active: true }, owner);
await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
await activeWorkflowRunner.removeAll();
const inMemory = activeWorkflowRunner.allActiveInMemory();
expect(inMemory).toHaveLength(0);
});
});
describe('remove()', () => {
test('should call `ActiveWorkflowRunner.clearWebhooks()`', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
const clearWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'clearWebhooks');
await activeWorkflowRunner.init();
await activeWorkflowRunner.remove(workflow.id);
expect(clearWebhooksSpy).toHaveBeenCalledTimes(1);
});
});
describe('isActive()', () => {
test('should return `true` for active workflow in storage', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id);
await expect(isActiveInStorage).resolves.toBe(true);
});
test('should return `false` for inactive workflow in storage', async () => {
const workflow = await testDb.createWorkflow({ active: false }, owner);
await activeWorkflowRunner.init();
const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id);
await expect(isActiveInStorage).resolves.toBe(false);
});
});
describe('runWorkflow()', () => {
test('should call `WorkflowRunner.run()`', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
await activeWorkflowRunner.init();
const additionalData = await AdditionalData.getBase('fake-user-id');
const runSpy = jest
.spyOn(WorkflowRunner.prototype, 'run')
.mockResolvedValue('fake-execution-id');
const [node] = workflow.nodes;
await activeWorkflowRunner.runWorkflow(workflow, node, [[]], additionalData, 'trigger');
expect(runSpy).toHaveBeenCalledTimes(1);
});
});
describe('executeErrorWorkflow()', () => {
test('should call `WorkflowExecuteAdditionalData.executeErrorWorkflow()`', async () => {
const workflow = await testDb.createWorkflow({ active: true }, owner);
const [node] = workflow.nodes;
const error = new NodeOperationError(node, 'Fake error message');
const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow');
await activeWorkflowRunner.init();
activeWorkflowRunner.executeErrorWorkflow(error, workflow, 'trigger');
expect(executeSpy).toHaveBeenCalledTimes(1);
});
test('should be called on failure to activate due to 401', async () => {
const storedWorkflow = await testDb.createWorkflow({ active: true }, owner);
const [node] = storedWorkflow.nodes;
const executeSpy = jest.spyOn(activeWorkflowRunner, 'executeErrorWorkflow');
jest.spyOn(activeWorkflowRunner, 'add').mockImplementation(() => {
throw new NodeApiError(node, {
httpCode: '401',
message: 'Authorization failed - please check your credentials',
});
});
await activeWorkflowRunner.init();
expect(executeSpy).toHaveBeenCalledTimes(1);
const [error, workflow] = executeSpy.mock.calls[0];
expect(error.message).toContain('Authorization');
expect(workflow.id).toBe(storedWorkflow.id);
});
});
describe('add()', () => {
describe('in single-main scenario', () => {
test('leader should add webhooks, triggers and pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
describe('in multi-main scenario', () => {
describe('leader', () => {
test('on regular activation mode, leader should add webhooks only', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).toHaveBeenCalledTimes(1);
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
test('on activation via leadership change, leader should add triggers and pollers only', async () => {
const mode = 'leadershipChange';
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: true });
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1);
});
});
describe('follower', () => {
test('on regular activation mode, follower should not add webhooks, triggers or pollers', async () => {
const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES);
jest.replaceProperty(activeWorkflowRunner, 'isMultiMainScenario', true);
mockInstance(MultiMainInstancePublisher, { isLeader: false });
const workflow = await testDb.createWorkflow({ active: true }, owner);
const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(activeWorkflowRunner, 'addTriggersAndPollers');
await activeWorkflowRunner.init();
addWebhooksSpy.mockReset();
addTriggersAndPollersSpy.mockReset();
await activeWorkflowRunner.add(workflow.id, mode);
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
});
});
});
});
describe('addWebhooks()', () => {
test('should call `WebhookService.storeWebhook()`', async () => {
const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData;
const mockWebhookEntity = { webhookPath: 'fake-path' } as unknown as WebhookEntity;
jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([mockWebhook]);
webhookService.createWebhook.mockReturnValue(mockWebhookEntity);
const additionalData = await AdditionalData.getBase('fake-user-id');
const dbWorkflow = await testDb.createWorkflow({ active: true }, owner);
const workflow = new Workflow({
id: dbWorkflow.id,
name: dbWorkflow.name,
nodes: dbWorkflow.nodes,
connections: dbWorkflow.connections,
active: dbWorkflow.active,
nodeTypes: Container.get(NodeTypes),
staticData: dbWorkflow.staticData,
settings: dbWorkflow.settings,
});
const [node] = dbWorkflow.nodes;
jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node);
jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true);
jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined);
await activeWorkflowRunner.addWebhooks(workflow, additionalData, 'trigger', 'init');
expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1);
});
});

View file

@ -459,10 +459,10 @@ export async function createWorkflow(attributes: Partial<WorkflowEntity> = {}, u
nodes: nodes ?? [ nodes: nodes ?? [
{ {
id: 'uuid-1234', id: 'uuid-1234',
name: 'Start', name: 'Schedule Trigger',
parameters: {}, parameters: {},
position: [-20, 260], position: [-20, 260],
type: 'n8n-nodes-base.start', type: 'n8n-nodes-base.scheduleTrigger',
typeVersion: 1, typeVersion: 1,
}, },
], ],

View file

@ -1,6 +1,6 @@
import { Container } from 'typedi'; import { Container } from 'typedi';
import { BinaryDataService } from 'n8n-core'; import { BinaryDataService } from 'n8n-core';
import type { INode } from 'n8n-workflow'; import { type INode } from 'n8n-workflow';
import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials'; import { GithubApi } from 'n8n-nodes-base/credentials/GithubApi.credentials';
import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials'; import { Ftp } from 'n8n-nodes-base/credentials/Ftp.credentials';
import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node'; import { Cron } from 'n8n-nodes-base/nodes/Cron/Cron.node';
@ -16,6 +16,8 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { AUTH_COOKIE_NAME } from '@/constants'; import { AUTH_COOKIE_NAME } from '@/constants';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { mockInstance } from './mocking';
import { mockNodeTypesData } from '../../../unit/Helpers';
export { mockInstance } from './mocking'; export { mockInstance } from './mocking';
export { setupTestServer } from './testServer'; export { setupTestServer } from './testServer';
@ -166,3 +168,15 @@ export function makeWorkflow(options?: {
} }
export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] }; export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] };
export function setSchedulerAsLoadedNode() {
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
Object.assign(nodesAndCredentials, {
loadedNodes: mockNodeTypesData(['scheduleTrigger'], {
addTrigger: true,
}),
known: { nodes: {}, credentials: {} },
types: { nodes: [], credentials: [] },
});
}

View file

@ -1,278 +0,0 @@
import { v4 as uuid } from 'uuid';
import { mocked } from 'jest-mock';
import { Container } from 'typedi';
import type { INode } from 'n8n-workflow';
import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { Role } from '@db/entities/Role';
import { User } from '@db/entities/User';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { Push } from '@/push';
import { ActiveExecutions } from '@/ActiveExecutions';
import { SecretsHelper } from '@/SecretsHelpers';
import { WebhookService } from '@/services/webhook.service';
import { VariablesService } from '@/environments/variables/variables.service';
import { mockInstance } from '../integration/shared/utils/';
import { randomEmail, randomName } from '../integration/shared/random';
import * as Helpers from './Helpers';
/**
* TODO:
* - test workflow webhooks activation (that trigger `executeWebhook`and other webhook methods)
* - test activation error catching and getters such as `getActivationError` (requires building a workflow that fails to activate)
* - test queued workflow activation functions (might need to create a non-working workflow to test this)
*/
let databaseActiveWorkflowsCount = 0;
let databaseActiveWorkflowsList: WorkflowEntity[] = [];
const generateWorkflows = (count: number): WorkflowEntity[] => {
const workflows: WorkflowEntity[] = [];
const ownerRole = new Role();
ownerRole.scope = 'workflow';
ownerRole.name = 'owner';
ownerRole.id = '1';
const owner = new User();
owner.id = uuid();
owner.firstName = randomName();
owner.lastName = randomName();
owner.email = randomEmail();
for (let i = 0; i < count; i++) {
const workflow = new WorkflowEntity();
Object.assign(workflow, {
id: (i + 1).toString(),
name: randomName(),
active: true,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [
{
parameters: {
rule: {
interval: [{}],
},
},
id: uuid(),
name: 'Schedule Trigger',
type: 'n8n-nodes-base.scheduleTrigger',
typeVersion: 1,
position: [900, 460],
},
],
connections: {},
tags: [],
});
const sharedWorkflow = new SharedWorkflow();
sharedWorkflow.workflowId = workflow.id;
sharedWorkflow.role = ownerRole;
sharedWorkflow.user = owner;
workflow.shared = [sharedWorkflow];
workflows.push(workflow);
}
databaseActiveWorkflowsList = workflows;
return workflows;
};
const MOCK_NODE_TYPES_DATA = Helpers.mockNodeTypesData(['scheduleTrigger'], {
addTrigger: true,
});
jest.mock('@/Db', () => {
return {
collections: {
Workflow: {
find: jest.fn(async () => generateWorkflows(databaseActiveWorkflowsCount)),
findOne: jest.fn(async (searchParams) => {
return databaseActiveWorkflowsList.find(
(workflow) => workflow.id === searchParams.where.id.toString(),
);
}),
update: jest.fn(),
createQueryBuilder: jest.fn(() => {
const fakeQueryBuilder = {
update: () => fakeQueryBuilder,
set: () => fakeQueryBuilder,
where: () => fakeQueryBuilder,
execute: async () => {},
};
return fakeQueryBuilder;
}),
},
},
};
});
const workflowCheckIfCanBeActivated = jest.fn(() => true);
jest
.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated')
.mockImplementation(workflowCheckIfCanBeActivated);
const removeFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'remove');
const removeWebhooksFunction = jest.spyOn(ActiveWorkflowRunner.prototype, 'removeWorkflowWebhooks');
const workflowRunnerRun = jest.spyOn(WorkflowRunner.prototype, 'run');
const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn(
WorkflowExecuteAdditionalData,
'executeErrorWorkflow',
);
describe('ActiveWorkflowRunner', () => {
mockInstance(ActiveExecutions);
const externalHooks = mockInstance(ExternalHooks);
const webhookService = mockInstance(WebhookService);
mockInstance(Push);
mockInstance(SecretsHelper);
const variablesService = mockInstance(VariablesService);
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
Object.assign(nodesAndCredentials, {
loadedNodes: MOCK_NODE_TYPES_DATA,
known: { nodes: {}, credentials: {} },
types: { nodes: [], credentials: [] },
});
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
beforeAll(async () => {
variablesService.getAllCached.mockResolvedValue([]);
});
afterEach(async () => {
await activeWorkflowRunner.removeAll();
databaseActiveWorkflowsCount = 0;
databaseActiveWorkflowsList = [];
jest.clearAllMocks();
});
test('Should initialize activeWorkflowRunner with empty list of active workflows and call External Hooks', async () => {
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalledTimes(1);
});
test('Should initialize activeWorkflowRunner with one active workflow', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalled();
});
test('Should make sure function checkIfWorkflowCanBeActivated was called for every workflow', async () => {
databaseActiveWorkflowsCount = 2;
await activeWorkflowRunner.init();
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
});
test('Call to removeAll should remove every workflow', async () => {
databaseActiveWorkflowsCount = 2;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
await activeWorkflowRunner.removeAll();
expect(removeFunction).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
});
test('Call to remove should also call removeWorkflowWebhooks', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
await activeWorkflowRunner.remove('1');
expect(removeWebhooksFunction).toHaveBeenCalledTimes(1);
});
test('Call to isActive should return true for valid workflow', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.isActive('1')).toBe(true);
});
test('Call to isActive should return false for invalid workflow', async () => {
databaseActiveWorkflowsCount = 1;
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.isActive('2')).toBe(false);
});
test('Calling add should call checkIfWorkflowCanBeActivated', async () => {
// Initialize with default (0) workflows
await activeWorkflowRunner.init();
generateWorkflows(1);
await activeWorkflowRunner.add('1', 'activate');
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(1);
});
test('runWorkflow should call run method in WorkflowRunner', async () => {
await activeWorkflowRunner.init();
const workflow = generateWorkflows(1);
const additionalData = await WorkflowExecuteAdditionalData.getBase('fake-user-id');
workflowRunnerRun.mockResolvedValueOnce('invalid-execution-id');
await activeWorkflowRunner.runWorkflow(
workflow[0],
workflow[0].nodes[0],
[[]],
additionalData,
'trigger',
);
expect(workflowRunnerRun).toHaveBeenCalledTimes(1);
});
test('executeErrorWorkflow should call function with same name in WorkflowExecuteAdditionalData', async () => {
const workflowData = generateWorkflows(1)[0];
const error = new NodeOperationError(workflowData.nodes[0], 'Fake error message');
await activeWorkflowRunner.init();
activeWorkflowRunner.executeErrorWorkflow(error, workflowData, 'trigger');
expect(workflowExecuteAdditionalDataExecuteErrorWorkflowSpy).toHaveBeenCalledTimes(1);
});
describe('init()', () => {
it('should execute error workflow on failure to activate due to 401', async () => {
databaseActiveWorkflowsCount = 1;
jest.spyOn(ActiveWorkflowRunner.prototype, 'add').mockImplementation(() => {
throw new NodeApiError(
{
id: 'a75dcd1b-9fed-4643-90bd-75933d67936c',
name: 'Github Trigger',
type: 'n8n-nodes-base.githubTrigger',
typeVersion: 1,
position: [0, 0],
} as INode,
{
httpCode: '401',
message: 'Authorization failed - please check your credentials',
},
);
});
const executeSpy = jest.spyOn(ActiveWorkflowRunner.prototype, 'executeErrorWorkflow');
await activeWorkflowRunner.init();
const [error, workflow] = executeSpy.mock.calls[0];
expect(error.message).toContain('Authorization');
expect(workflow.id).toBe('1');
});
});
});

View file

@ -1,5 +1,3 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
import { CronJob } from 'cron'; import { CronJob } from 'cron';
import type { import type {
@ -14,62 +12,64 @@ import type {
WorkflowActivateMode, WorkflowActivateMode,
WorkflowExecuteMode, WorkflowExecuteMode,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { LoggerProxy as Logger, toCronExpression, WorkflowActivationError } from 'n8n-workflow'; import {
LoggerProxy as Logger,
toCronExpression,
WorkflowActivationError,
WorkflowDeactivationError,
} from 'n8n-workflow';
import type { IWorkflowData } from './Interfaces'; import type { IWorkflowData } from './Interfaces';
export class ActiveWorkflows { export class ActiveWorkflows {
private workflowData: { private activeWorkflows: {
[key: string]: IWorkflowData; [workflowId: string]: IWorkflowData;
} = {}; } = {};
/** /**
* Returns if the workflow is active * Returns if the workflow is active in memory.
*
* @param {string} id The id of the workflow to check
*/ */
isActive(id: string): boolean { isActive(workflowId: string) {
return this.workflowData.hasOwnProperty(id); return this.activeWorkflows.hasOwnProperty(workflowId);
} }
/** /**
* Returns the ids of the currently active workflows * Returns the IDs of the currently active workflows in memory.
*
*/ */
allActiveWorkflows(): string[] { allActiveWorkflows() {
return Object.keys(this.workflowData); return Object.keys(this.activeWorkflows);
} }
/** /**
* Returns the Workflow data for the workflow with * Returns the workflow data for the given ID if currently active in memory.
* the given id if it is currently active
*
*/ */
get(id: string): IWorkflowData | undefined { get(workflowId: string) {
return this.workflowData[id]; return this.activeWorkflows[workflowId];
} }
/** /**
* Makes a workflow active * Makes a workflow active
* *
* @param {string} id The id of the workflow to activate * @param {string} workflowId The id of the workflow to activate
* @param {Workflow} workflow The workflow to activate * @param {Workflow} workflow The workflow to activate
* @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows * @param {IWorkflowExecuteAdditionalData} additionalData The additional data which is needed to run workflows
*/ */
async add( async add(
id: string, workflowId: string,
workflow: Workflow, workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData, additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
activation: WorkflowActivateMode, activation: WorkflowActivateMode,
getTriggerFunctions: IGetExecuteTriggerFunctions, getTriggerFunctions: IGetExecuteTriggerFunctions,
getPollFunctions: IGetExecutePollFunctions, getPollFunctions: IGetExecutePollFunctions,
): Promise<void> { ) {
this.workflowData[id] = {}; this.activeWorkflows[workflowId] = {};
const triggerNodes = workflow.getTriggerNodes(); const triggerNodes = workflow.getTriggerNodes();
let triggerResponse: ITriggerResponse | undefined; let triggerResponse: ITriggerResponse | undefined;
this.workflowData[id].triggerResponses = [];
this.activeWorkflows[workflowId].triggerResponses = [];
for (const triggerNode of triggerNodes) { for (const triggerNode of triggerNodes) {
try { try {
triggerResponse = await workflow.runTrigger( triggerResponse = await workflow.runTrigger(
@ -82,23 +82,27 @@ export class ActiveWorkflows {
if (triggerResponse !== undefined) { if (triggerResponse !== undefined) {
// If a response was given save it // If a response was given save it
this.workflowData[id].triggerResponses!.push(triggerResponse); this.activeWorkflows[workflowId].triggerResponses!.push(triggerResponse);
} }
} catch (error) { } catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowActivationError( throw new WorkflowActivationError(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem activating the workflow: "${error.message}"`, `There was a problem activating the workflow: "${error.message}"`,
{ cause: error as Error, node: triggerNode }, { cause: error, node: triggerNode },
); );
} }
} }
const pollNodes = workflow.getPollNodes(); const pollingNodes = workflow.getPollNodes();
if (pollNodes.length) {
this.workflowData[id].pollResponses = []; if (pollingNodes.length === 0) return;
for (const pollNode of pollNodes) {
this.activeWorkflows[workflowId].pollResponses = [];
for (const pollNode of pollingNodes) {
try { try {
this.workflowData[id].pollResponses!.push( this.activeWorkflows[workflowId].pollResponses!.push(
await this.activatePolling( await this.activatePolling(
pollNode, pollNode,
workflow, workflow,
@ -108,20 +112,19 @@ export class ActiveWorkflows {
activation, activation,
), ),
); );
} catch (error) { } catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowActivationError( throw new WorkflowActivationError(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem activating the workflow: "${error.message}"`, `There was a problem activating the workflow: "${error.message}"`,
{ cause: error as Error, node: pollNode }, { cause: error, node: pollNode },
); );
} }
} }
} }
}
/** /**
* Activates polling for the given node * Activates polling for the given node
*
*/ */
async activatePolling( async activatePolling(
node: INode, node: INode,
@ -159,7 +162,7 @@ export class ActiveWorkflows {
if (testingTrigger) { if (testingTrigger) {
throw error; throw error;
} }
pollFunctions.__emitError(error); pollFunctions.__emitError(error as Error);
} }
}; };
@ -193,59 +196,49 @@ export class ActiveWorkflows {
} }
/** /**
* Makes a workflow inactive * Makes a workflow inactive in memory.
*
* @param {string} id The id of the workflow to deactivate
*/ */
async remove(id: string): Promise<boolean> { async remove(workflowId: string) {
if (!this.isActive(id)) { if (!this.isActive(workflowId)) {
// Workflow is currently not registered Logger.warn(`Cannot deactivate already inactive workflow ID "${workflowId}"`);
Logger.warn(
`The workflow with the id "${id}" is currently not active and can so not be removed`,
);
return false; return false;
} }
const workflowData = this.workflowData[id]; const w = this.activeWorkflows[workflowId];
if (workflowData.triggerResponses) { w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger'));
for (const triggerResponse of workflowData.triggerResponses) { w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller'));
if (triggerResponse.closeFunction) {
try {
await triggerResponse.closeFunction();
} catch (error) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem deactivating trigger of workflow "${id}": "${error.message}"`,
{
workflowId: id,
},
);
}
}
}
}
if (workflowData.pollResponses) { delete this.activeWorkflows[workflowId];
for (const pollResponse of workflowData.pollResponses) {
if (pollResponse.closeFunction) {
try {
await pollResponse.closeFunction();
} catch (error) {
Logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem deactivating polling trigger of workflow "${id}": "${error.message}"`,
{
workflowId: id,
},
);
}
}
}
}
delete this.workflowData[id];
return true; return true;
} }
async removeAllTriggerAndPollerBasedWorkflows() {
for (const workflowId of Object.keys(this.activeWorkflows)) {
const w = this.activeWorkflows[workflowId];
w.triggerResponses?.forEach(async (r) => this.close(r, workflowId, 'trigger'));
w.pollResponses?.forEach(async (r) => this.close(r, workflowId, 'poller'));
}
}
private async close(
response: ITriggerResponse | IPollResponse,
workflowId: string,
target: 'trigger' | 'poller',
) {
if (!response.closeFunction) return;
try {
await response.closeFunction();
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
throw new WorkflowDeactivationError(
`Failed to deactivate ${target} of workflow ID "${workflowId}": "${error.message}"`,
{ cause: error, workflowId },
);
}
}
} }

View file

@ -1905,7 +1905,14 @@ export type WorkflowExecuteMode =
| 'retry' | 'retry'
| 'trigger' | 'trigger'
| 'webhook'; | 'webhook';
export type WorkflowActivateMode = 'init' | 'create' | 'update' | 'activate' | 'manual';
export type WorkflowActivateMode =
| 'init'
| 'create'
| 'update'
| 'activate'
| 'manual'
| 'leadershipChange';
export interface IWorkflowHooksOptionalParameters { export interface IWorkflowHooksOptionalParameters {
parentProcessMode?: string; parentProcessMode?: string;

View file

@ -5,6 +5,7 @@ interface WorkflowActivationErrorOptions {
cause?: Error; cause?: Error;
node?: INode; node?: INode;
severity?: Severity; severity?: Severity;
workflowId?: string;
} }
/** /**
@ -13,7 +14,12 @@ interface WorkflowActivationErrorOptions {
export class WorkflowActivationError extends ExecutionBaseError { export class WorkflowActivationError extends ExecutionBaseError {
node: INode | undefined; node: INode | undefined;
constructor(message: string, { cause, node, severity }: WorkflowActivationErrorOptions) { workflowId: string | undefined;
constructor(
message: string,
{ cause, node, severity, workflowId }: WorkflowActivationErrorOptions = {},
) {
let error = cause as Error; let error = cause as Error;
if (cause instanceof ExecutionBaseError) { if (cause instanceof ExecutionBaseError) {
error = new Error(cause.message); error = new Error(cause.message);
@ -23,11 +29,14 @@ export class WorkflowActivationError extends ExecutionBaseError {
} }
super(message, { cause: error }); super(message, { cause: error });
this.node = node; this.node = node;
this.workflowId = workflowId;
this.message = message; this.message = message;
if (severity) this.severity = severity; if (severity) this.severity = severity;
} }
} }
export class WorkflowDeactivationError extends WorkflowActivationError {}
export class WebhookPathAlreadyTakenError extends WorkflowActivationError { export class WebhookPathAlreadyTakenError extends WorkflowActivationError {
constructor(nodeName: string, cause?: Error) { constructor(nodeName: string, cause?: Error) {
super( super(