From 64c5b6e0604ce9da6b19dd5f04e61e38209b3153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 13 Feb 2025 16:10:43 +0100 Subject: [PATCH] fix(core): Reduce risk of race condition during workflow activation loop (#13186) Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> --- .../__tests__/active-workflow-manager.test.ts | 29 ++++++++- packages/cli/src/active-workflow-manager.ts | 59 ++++++++++++------- .../repositories/workflow.repository.ts | 7 ++- 3 files changed, 69 insertions(+), 26 deletions(-) diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index dd2d3d8cb5..2e579a3df1 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -10,12 +10,15 @@ import type { import { Workflow } from 'n8n-workflow'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; +import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; +import type { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import type { NodeTypes } from '@/node-types'; describe('ActiveWorkflowManager', () => { let activeWorkflowManager: ActiveWorkflowManager; - const instanceSettings = mock(); + const instanceSettings = mock({ isMultiMain: false }); const nodeTypes = mock(); + const workflowRepository = mock(); beforeEach(() => { jest.clearAllMocks(); @@ -27,7 +30,7 @@ describe('ActiveWorkflowManager', () => { mock(), nodeTypes, mock(), - mock(), + workflowRepository, mock(), mock(), mock(), @@ -122,5 +125,27 @@ describe('ActiveWorkflowManager', () => { } }); }); + + describe('add', () => { + test.each<[WorkflowActivateMode]>([['init'], ['leadershipChange']])( + 'should skip inactive workflow in `%s` activation mode', + async (mode) => { + const checkSpy = jest.spyOn(activeWorkflowManager, 'checkIfWorkflowCanBeActivated'); + const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn( + activeWorkflowManager, + 'addTriggersAndPollers', + ); + workflowRepository.findById.mockResolvedValue(mock({ active: false })); + + const result = await activeWorkflowManager.add('some-id', mode); + + expect(checkSpy).not.toHaveBeenCalled(); + expect(addWebhooksSpy).not.toHaveBeenCalled(); + expect(addTriggersAndPollersSpy).not.toHaveBeenCalled(); + expect(result).toBe(false); + }, + ); + }); }); }); diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 536f9bf268..16500e6355 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -24,6 +24,7 @@ import type { WorkflowActivateMode, WorkflowExecuteMode, INodeType, + WorkflowId, } from 'n8n-workflow'; import { Workflow, @@ -39,11 +40,13 @@ import { WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, WORKFLOW_REACTIVATE_MAX_TIMEOUT, } from '@/constants'; +import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { OnShutdown } from '@/decorators/on-shutdown'; import { executeErrorWorkflow } from '@/execution-lifecycle/execute-error-workflow'; import { ExecutionService } from '@/executions/execution.service'; import { ExternalHooks } from '@/external-hooks'; +import type { IWorkflowDb } from '@/interfaces'; import { NodeTypes } from '@/node-types'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { ActiveWorkflowsService } from '@/services/active-workflows.service'; @@ -59,12 +62,12 @@ interface QueuedActivation { activationMode: WorkflowActivateMode; lastTimeout: number; timeout: NodeJS.Timeout; - workflowData: IWorkflowBase; + workflowData: IWorkflowDb; } @Service() export class ActiveWorkflowManager { - private queuedActivations: { [workflowId: string]: QueuedActivation } = {}; + private queuedActivations: Record = {}; constructor( private readonly logger: Logger, @@ -92,7 +95,6 @@ export class ActiveWorkflowManager { await this.addActiveWorkflows('init'); await this.externalHooks.run('activeWorkflows.initialized'); - await this.webhookService.populateCache(); } async getAllWorkflowActivationErrors() { @@ -134,7 +136,7 @@ export class ActiveWorkflowManager { * @important Do not confuse with `ActiveWorkflows.isActive()`, * which checks if the workflow is active in memory. */ - async isActive(workflowId: string) { + async isActive(workflowId: WorkflowId) { const workflow = await this.workflowRepository.findOne({ select: ['active'], where: { id: workflowId }, @@ -230,7 +232,7 @@ export class ActiveWorkflowManager { * Remove all webhooks of a workflow from the database, and * deregister those webhooks from external services. */ - async clearWebhooks(workflowId: string) { + async clearWebhooks(workflowId: WorkflowId) { const workflowData = await this.workflowRepository.findOne({ where: { id: workflowId }, }); @@ -270,7 +272,7 @@ export class ActiveWorkflowManager { * and overwrites the emit to be able to start it in subprocess */ getExecutePollFunctions( - workflowData: IWorkflowBase, + workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, @@ -321,7 +323,7 @@ export class ActiveWorkflowManager { * and overwrites the emit to be able to start it in subprocess */ getExecuteTriggerFunctions( - workflowData: IWorkflowBase, + workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, @@ -378,7 +380,7 @@ export class ActiveWorkflowManager { ); this.executeErrorWorkflow(activationError, workflowData, mode); - this.addQueuedWorkflowActivation(activation, workflowData); + this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity); }; return new TriggerContext(workflow, node, additionalData, mode, activation, emit, emitError); }; @@ -411,9 +413,9 @@ export class ActiveWorkflowManager { * only on instance init or (in multi-main setup) on leadership change. */ async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') { - const dbWorkflows = await this.workflowRepository.getAllActive(); + const dbWorkflowIds = await this.workflowRepository.getAllActiveIds(); - if (dbWorkflows.length === 0) return; + if (dbWorkflowIds.length === 0) return; if (this.instanceSettings.isLeader) { this.logger.info(' ================================'); @@ -421,11 +423,11 @@ export class ActiveWorkflowManager { this.logger.info(' ================================'); } - const batches = chunk(dbWorkflows, this.workflowsConfig.activationBatchSize); + const batches = chunk(dbWorkflowIds, this.workflowsConfig.activationBatchSize); for (const batch of batches) { - const activationPromises = batch.map(async (dbWorkflow) => { - await this.activateWorkflow(dbWorkflow, activationMode); + const activationPromises = batch.map(async (dbWorkflowId) => { + await this.activateWorkflow(dbWorkflowId, activationMode); }); await Promise.all(activationPromises); @@ -435,9 +437,12 @@ export class ActiveWorkflowManager { } private async activateWorkflow( - dbWorkflow: IWorkflowBase, + workflowId: WorkflowId, activationMode: 'init' | 'leadershipChange', ) { + const dbWorkflow = await this.workflowRepository.findById(workflowId); + if (!dbWorkflow) return; + try { const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, { shouldPublish: false, @@ -515,9 +520,9 @@ export class ActiveWorkflowManager { * since webhooks do not require continuous execution. */ async add( - workflowId: string, + workflowId: WorkflowId, activationMode: WorkflowActivateMode, - existingWorkflow?: IWorkflowBase, + existingWorkflow?: WorkflowEntity, { shouldPublish } = { shouldPublish: true }, ) { if (this.instanceSettings.isMultiMain && shouldPublish) { @@ -547,6 +552,16 @@ export class ActiveWorkflowManager { }); } + if (['init', 'leadershipChange'].includes(activationMode) && !dbWorkflow.active) { + this.logger.debug( + `Skipping workflow ${formatWorkflow(dbWorkflow)} as it is no longer active`, + { + workflowId: dbWorkflow.id, + }, + ); + return false; + } + if (shouldDisplayActivationMessage) { this.logger.debug(`Initializing active workflow ${formatWorkflow(dbWorkflow)} (startup)`, { workflowName: dbWorkflow.name, @@ -672,7 +687,7 @@ export class ActiveWorkflowManager { */ private addQueuedWorkflowActivation( activationMode: WorkflowActivateMode, - workflowData: IWorkflowBase, + workflowData: WorkflowEntity, ) { const workflowId = workflowData.id; const workflowName = workflowData.name; @@ -729,7 +744,7 @@ export class ActiveWorkflowManager { /** * Remove a workflow from the activation queue */ - private removeQueuedWorkflowActivation(workflowId: string) { + private removeQueuedWorkflowActivation(workflowId: WorkflowId) { if (this.queuedActivations[workflowId]) { clearTimeout(this.queuedActivations[workflowId].timeout); delete this.queuedActivations[workflowId]; @@ -752,7 +767,7 @@ export class ActiveWorkflowManager { */ // TODO: this should happen in a transaction // maybe, see: https://github.com/n8n-io/n8n/pull/8904#discussion_r1530150510 - async remove(workflowId: string) { + async remove(workflowId: WorkflowId) { if (this.instanceSettings.isMultiMain) { try { await this.clearWebhooks(workflowId); @@ -794,7 +809,7 @@ export class ActiveWorkflowManager { /** * Stop running active triggers and pollers for a workflow. */ - async removeWorkflowTriggersAndPollers(workflowId: string) { + async removeWorkflowTriggersAndPollers(workflowId: WorkflowId) { if (!this.activeWorkflows.isActive(workflowId)) return; const wasRemoved = await this.activeWorkflows.remove(workflowId); @@ -810,7 +825,7 @@ export class ActiveWorkflowManager { * Register as active in memory a trigger- or poller-based workflow. */ async addTriggersAndPollers( - dbWorkflow: IWorkflowBase, + dbWorkflow: WorkflowEntity, workflow: Workflow, { activationMode, @@ -856,7 +871,7 @@ export class ActiveWorkflowManager { } } - async removeActivationError(workflowId: string) { + async removeActivationError(workflowId: WorkflowId) { await this.activationErrorsService.deregister(workflowId); } diff --git a/packages/cli/src/databases/repositories/workflow.repository.ts b/packages/cli/src/databases/repositories/workflow.repository.ts index 5099a0b526..1fd66266c6 100644 --- a/packages/cli/src/databases/repositories/workflow.repository.ts +++ b/packages/cli/src/databases/repositories/workflow.repository.ts @@ -37,11 +37,14 @@ export class WorkflowRepository extends Repository { }); } - async getAllActive() { - return await this.find({ + async getAllActiveIds() { + const result = await this.find({ + select: { id: true }, where: { active: true }, relations: { shared: { project: { projectRelations: true } } }, }); + + return result.map(({ id }) => id); } async getActiveIds({ maxResults }: { maxResults?: number } = {}) {