diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index b45d0602d4..d15dd9026d 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -292,6 +292,10 @@ export class ActiveWorkflowRunner implements IWebhookManager { const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); let path = ''; + if (webhooks.length === 0) return; + + this.logger.debug(`Adding webhooks for workflow "${workflow.name}" (ID ${workflow.id})`); + for (const webhookData of webhooks) { const node = workflow.getNode(webhookData.node) as INode; node.name = webhookData.node; @@ -699,14 +703,26 @@ export class ActiveWorkflowRunner implements IWebhookManager { let shouldAddWebhooks = true; let shouldAddTriggersAndPollers = true; - if (this.multiMainSetup.isEnabled && activationMode !== 'leadershipChange') { - shouldAddWebhooks = this.multiMainSetup.isLeader; - shouldAddTriggersAndPollers = this.multiMainSetup.isLeader; - } - - if (this.multiMainSetup.isEnabled && activationMode === 'leadershipChange') { - shouldAddWebhooks = false; - shouldAddTriggersAndPollers = true; + /** + * In a multi-main scenario, webhooks are stored in the database, while triggers + * and pollers are run only by the leader main instance. + * + * - During a regular workflow activation (i.e. not leadership change), only the + * leader should add webhooks to prevent duplicate insertions, and only the leader + * should handle triggers and pollers to prevent duplicate work. + * + * - During a leadership change, webhooks remain in storage and so need not be added + * again, and the new leader should take over the triggers and pollers that stopped + * running when the former leader became unresponsive. + */ + if (this.multiMainSetup.isEnabled) { + if (activationMode !== 'leadershipChange') { + shouldAddWebhooks = this.multiMainSetup.isLeader; + shouldAddTriggersAndPollers = this.multiMainSetup.isLeader; + } else { + shouldAddWebhooks = false; + shouldAddTriggersAndPollers = this.multiMainSetup.isLeader; + } } try { @@ -744,14 +760,10 @@ export class ActiveWorkflowRunner implements IWebhookManager { const additionalData = await WorkflowExecuteAdditionalData.getBase(sharing.user.id); if (shouldAddWebhooks) { - this.logger.debug(`Adding webhooks for workflow ${dbWorkflow.display()}`); - await this.addWebhooks(workflow, additionalData, 'trigger', activationMode); } if (shouldAddTriggersAndPollers) { - this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`); - await this.addTriggersAndPollers(dbWorkflow, workflow, { activationMode, executionMode: 'trigger', @@ -936,6 +948,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { ); if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) { + this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`); + await this.activeWorkflows.add( workflow.id, workflow, diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 50333f2078..708012d1a1 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -248,7 +248,7 @@ export class Server extends AbstractServer { ActiveWorkflowsController, ]; - if (Container.get(MultiMainSetup).isEnabled) { + if (process.env.NODE_ENV !== 'production' && Container.get(MultiMainSetup).isEnabled) { const { DebugController } = await import('./controllers/debug.controller'); controllers.push(DebugController); } diff --git a/packages/cli/src/controllers/debug.controller.ts b/packages/cli/src/controllers/debug.controller.ts index 5580eff476..78173ba599 100644 --- a/packages/cli/src/controllers/debug.controller.ts +++ b/packages/cli/src/controllers/debug.controller.ts @@ -3,6 +3,7 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { In } from 'typeorm'; +import { WebhookEntity } from '@/databases/entities/WebhookEntity'; @RestController('/debug') export class DebugController { @@ -16,17 +17,27 @@ export class DebugController { async getMultiMainSetupDetails() { const leaderKey = await this.multiMainSetup.fetchLeaderKey(); - const activeWorkflows = await this.workflowRepository.find({ + const triggersAndPollers = await this.workflowRepository.find({ select: ['id', 'name'], where: { id: In(this.activeWorkflowRunner.allActiveInMemory()) }, }); + const webhooks = (await this.workflowRepository + .createQueryBuilder('workflow') + .select('DISTINCT workflow.id, workflow.name') + .innerJoin(WebhookEntity, 'webhook_entity', 'workflow.id = webhook_entity.workflowId') + .execute()) as Array<{ id: string; name: string }>; + const activationErrors = await this.activeWorkflowRunner.getAllWorkflowActivationErrors(); return { instanceId: this.multiMainSetup.instanceId, leaderKey, - activeWorkflows, + isLeader: this.multiMainSetup.isLeader, + activeWorkflows: { + webhooks, // webhook-based active workflows + triggersAndPollers, // poller- and trigger-based active workflows + }, activationErrors, }; } diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index 5d98775d57..96cd9715d8 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -307,11 +307,13 @@ export class WorkflowService { await this.multiMainSetup.init(); - if (this.multiMainSetup.isEnabled) { + const newState = updatedWorkflow.active; + + if (this.multiMainSetup.isEnabled && oldState !== newState) { await this.multiMainSetup.broadcastWorkflowActiveStateChanged({ workflowId, oldState, - newState: updatedWorkflow.active, + newState, versionId: shared.workflow.versionId, }); } diff --git a/packages/cli/test/integration/debug.controller.test.ts b/packages/cli/test/integration/debug.controller.test.ts index 954d17da9c..8336702c68 100644 --- a/packages/cli/test/integration/debug.controller.test.ts +++ b/packages/cli/test/integration/debug.controller.test.ts @@ -25,22 +25,39 @@ describe('DebugController', () => { describe('GET /debug/multi-main-setup', () => { test('should return multi-main setup details', async () => { const workflowId = generateNanoId(); - const activeWorkflows = [{ id: workflowId, name: randomName() }] as WorkflowEntity[]; + const webhooks = [{ id: workflowId, name: randomName() }] as WorkflowEntity[]; + const triggersAndPollers = [{ id: workflowId, name: randomName() }] as WorkflowEntity[]; const activationErrors = { [workflowId]: 'Failed to activate' }; const instanceId = 'main-71JdWtq306epIFki'; + const leaderKey = 'some-leader-key'; - workflowRepository.find.mockResolvedValue(activeWorkflows); + const createQueryBuilder = { + select: () => createQueryBuilder, + innerJoin: () => createQueryBuilder, + execute: () => webhooks, + }; + + workflowRepository.find.mockResolvedValue(triggersAndPollers); activeWorkflowRunner.allActiveInMemory.mockReturnValue([workflowId]); activeWorkflowRunner.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors); + + jest + .spyOn(workflowRepository, 'createQueryBuilder') + .mockImplementation(() => createQueryBuilder); jest.spyOn(MultiMainSetup.prototype, 'instanceId', 'get').mockReturnValue(instanceId); - jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue('some-leader-key'); + jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue(leaderKey); + jest.spyOn(MultiMainSetup.prototype, 'isLeader', 'get').mockReturnValue(true); const response = await ownerAgent.get('/debug/multi-main-setup').expect(200); expect(response.body.data).toMatchObject({ instanceId, - leaderKey: 'some-leader-key', - activeWorkflows, + leaderKey, + isLeader: true, + activeWorkflows: { + webhooks, + triggersAndPollers, + }, activationErrors, }); }); diff --git a/packages/cli/test/integration/workflow.service.test.ts b/packages/cli/test/integration/workflow.service.test.ts index 9afd767cd4..923fc71756 100644 --- a/packages/cli/test/integration/workflow.service.test.ts +++ b/packages/cli/test/integration/workflow.service.test.ts @@ -9,14 +9,17 @@ import { SharedWorkflowRepository } from '@/databases/repositories/sharedWorkflo import { mock } from 'jest-mock-extended'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { Telemetry } from '@/telemetry'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; let workflowService: WorkflowService; let activeWorkflowRunner: ActiveWorkflowRunner; +let multiMainSetup: MultiMainSetup; beforeAll(async () => { await testDb.init(); activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); + multiMainSetup = mockInstance(MultiMainSetup); mockInstance(Telemetry); workflowService = new WorkflowService( @@ -29,7 +32,7 @@ beforeAll(async () => { mock(), mock(), mock(), - mock(), + multiMainSetup, mock(), mock(), mock(), @@ -82,4 +85,27 @@ describe('update()', () => { expect(addSpy).not.toHaveBeenCalled(); }); + + test('should broadcast active workflow state change if state changed', async () => { + const owner = await createOwner(); + const workflow = await createWorkflow({ active: true }, owner); + + const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged'); + + workflow.active = false; + await workflowService.update(owner, workflow, workflow.id); + + expect(broadcastSpy).toHaveBeenCalledTimes(1); + }); + + test('should not broadcast active workflow state change if state did not change', async () => { + const owner = await createOwner(); + const workflow = await createWorkflow({ active: true }, owner); + + const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged'); + + await workflowService.update(owner, workflow, workflow.id); + + expect(broadcastSpy).not.toHaveBeenCalled(); + }); });