From f924f2a6d736e33ab5fc12cbac6cba27340839db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Fri, 20 Dec 2024 15:25:33 +0100 Subject: [PATCH] fix(core): Register workflows as active only after all of the triggers and pollers setup successfully (#12244) --- packages/core/src/ActiveWorkflows.ts | 21 +- .../src/__tests__/ActiveWorkflows.test.ts | 290 ++++++++++++++++++ packages/core/test/TriggersAndPollers.test.ts | 138 +++++---- 3 files changed, 373 insertions(+), 76 deletions(-) create mode 100644 packages/core/src/__tests__/ActiveWorkflows.test.ts diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index b7604f9778..e3ca8614c2 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -71,16 +71,13 @@ export class ActiveWorkflows { getTriggerFunctions: IGetExecuteTriggerFunctions, getPollFunctions: IGetExecutePollFunctions, ) { - this.activeWorkflows[workflowId] = {}; const triggerNodes = workflow.getTriggerNodes(); - let triggerResponse: ITriggerResponse | undefined; - - this.activeWorkflows[workflowId].triggerResponses = []; + const triggerResponses: ITriggerResponse[] = []; for (const triggerNode of triggerNodes) { try { - triggerResponse = await this.triggersAndPollers.runTrigger( + const triggerResponse = await this.triggersAndPollers.runTrigger( workflow, triggerNode, getTriggerFunctions, @@ -89,10 +86,7 @@ export class ActiveWorkflows { activation, ); if (triggerResponse !== undefined) { - // If a response was given save it - - // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion - this.activeWorkflows[workflowId].triggerResponses!.push(triggerResponse); + triggerResponses.push(triggerResponse); } } catch (e) { const error = e instanceof Error ? e : new Error(`${e}`); @@ -104,6 +98,8 @@ export class ActiveWorkflows { } } + this.activeWorkflows[workflowId] = { triggerResponses }; + const pollingNodes = workflow.getPollNodes(); if (pollingNodes.length === 0) return; @@ -119,6 +115,11 @@ export class ActiveWorkflows { activation, ); } catch (e) { + // Do not mark this workflow as active if there are no triggerResponses, and any polling activation failed + if (triggerResponses.length === 0) { + delete this.activeWorkflows[workflowId]; + } + const error = e instanceof Error ? e : new Error(`${e}`); throw new WorkflowActivationError( @@ -132,7 +133,7 @@ export class ActiveWorkflows { /** * Activates polling for the given node */ - async activatePolling( + private async activatePolling( node: INode, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, diff --git a/packages/core/src/__tests__/ActiveWorkflows.test.ts b/packages/core/src/__tests__/ActiveWorkflows.test.ts new file mode 100644 index 0000000000..85487a0cec --- /dev/null +++ b/packages/core/src/__tests__/ActiveWorkflows.test.ts @@ -0,0 +1,290 @@ +import { mock } from 'jest-mock-extended'; +import type { + IGetExecuteTriggerFunctions, + INode, + ITriggerResponse, + IWorkflowExecuteAdditionalData, + Workflow, + WorkflowActivateMode, + WorkflowExecuteMode, + TriggerTime, + CronExpression, +} from 'n8n-workflow'; +import { LoggerProxy, TriggerCloseError, WorkflowActivationError } from 'n8n-workflow'; + +import { ActiveWorkflows } from '@/ActiveWorkflows'; +import type { ErrorReporter } from '@/error-reporter'; +import type { PollContext } from '@/node-execution-context'; +import type { ScheduledTaskManager } from '@/ScheduledTaskManager'; +import type { TriggersAndPollers } from '@/TriggersAndPollers'; + +describe('ActiveWorkflows', () => { + const workflowId = 'test-workflow-id'; + const workflow = mock(); + const additionalData = mock(); + const mode: WorkflowExecuteMode = 'trigger'; + const activation: WorkflowActivateMode = 'init'; + + const getTriggerFunctions = jest.fn() as IGetExecuteTriggerFunctions; + const triggerResponse = mock(); + + const pollFunctions = mock(); + const getPollFunctions = jest.fn(); + + LoggerProxy.init(mock()); + const scheduledTaskManager = mock(); + const triggersAndPollers = mock(); + const errorReporter = mock(); + const triggerNode = mock(); + const pollNode = mock(); + + let activeWorkflows: ActiveWorkflows; + + beforeEach(() => { + jest.clearAllMocks(); + activeWorkflows = new ActiveWorkflows(scheduledTaskManager, triggersAndPollers, errorReporter); + }); + + type PollTimes = { item: TriggerTime[] }; + type TestOptions = { + triggerNodes?: INode[]; + pollNodes?: INode[]; + triggerError?: Error; + pollError?: Error; + pollTimes?: PollTimes; + }; + + const addWorkflow = async ({ + triggerNodes = [], + pollNodes = [], + triggerError, + pollError, + pollTimes = { item: [{ mode: 'everyMinute' }] }, + }: TestOptions) => { + workflow.getTriggerNodes.mockReturnValue(triggerNodes); + workflow.getPollNodes.mockReturnValue(pollNodes); + pollFunctions.getNodeParameter.calledWith('pollTimes').mockReturnValue(pollTimes); + + if (triggerError) { + triggersAndPollers.runTrigger.mockRejectedValueOnce(triggerError); + } else { + triggersAndPollers.runTrigger.mockResolvedValue(triggerResponse); + } + + if (pollError) { + triggersAndPollers.runPoll.mockRejectedValueOnce(pollError); + } else { + getPollFunctions.mockReturnValue(pollFunctions); + } + + return await activeWorkflows.add( + workflowId, + workflow, + additionalData, + mode, + activation, + getTriggerFunctions, + getPollFunctions, + ); + }; + + describe('add()', () => { + describe('should activate workflow', () => { + it('with trigger nodes', async () => { + await addWorkflow({ triggerNodes: [triggerNode] }); + + expect(activeWorkflows.isActive(workflowId)).toBe(true); + expect(workflow.getTriggerNodes).toHaveBeenCalled(); + expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith( + workflow, + triggerNode, + getTriggerFunctions, + additionalData, + mode, + activation, + ); + }); + + it('with polling nodes', async () => { + await addWorkflow({ pollNodes: [pollNode] }); + + expect(activeWorkflows.isActive(workflowId)).toBe(true); + expect(workflow.getPollNodes).toHaveBeenCalled(); + expect(scheduledTaskManager.registerCron).toHaveBeenCalled(); + }); + + it('with both trigger and polling nodes', async () => { + await addWorkflow({ triggerNodes: [triggerNode], pollNodes: [pollNode] }); + + expect(activeWorkflows.isActive(workflowId)).toBe(true); + expect(workflow.getTriggerNodes).toHaveBeenCalled(); + expect(workflow.getPollNodes).toHaveBeenCalled(); + expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith( + workflow, + triggerNode, + getTriggerFunctions, + additionalData, + mode, + activation, + ); + expect(scheduledTaskManager.registerCron).toHaveBeenCalled(); + expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions); + }); + }); + + describe('should throw error', () => { + it('if trigger activation fails', async () => { + const error = new Error('Trigger activation failed'); + await expect( + addWorkflow({ triggerNodes: [triggerNode], triggerError: error }), + ).rejects.toThrow(WorkflowActivationError); + expect(activeWorkflows.isActive(workflowId)).toBe(false); + }); + + it('if polling activation fails', async () => { + const error = new Error('Failed to activate polling'); + await expect(addWorkflow({ pollNodes: [pollNode], pollError: error })).rejects.toThrow( + WorkflowActivationError, + ); + expect(activeWorkflows.isActive(workflowId)).toBe(false); + }); + + it('if the polling interval is too short', async () => { + const pollTimes: PollTimes = { + item: [ + { + mode: 'custom', + cronExpression: '* * * * *' as CronExpression, + }, + ], + }; + + await expect(addWorkflow({ pollNodes: [pollNode], pollTimes })).rejects.toThrow( + 'The polling interval is too short. It has to be at least a minute.', + ); + + expect(scheduledTaskManager.registerCron).not.toHaveBeenCalled(); + }); + }); + + describe('should handle polling errors', () => { + it('should throw error when poll fails during initial testing', async () => { + const error = new Error('Poll function failed'); + + await expect(addWorkflow({ pollNodes: [pollNode], pollError: error })).rejects.toThrow( + WorkflowActivationError, + ); + + expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions); + expect(pollFunctions.__emit).not.toHaveBeenCalled(); + expect(pollFunctions.__emitError).not.toHaveBeenCalled(); + }); + + it('should emit error when poll fails during regular polling', async () => { + const error = new Error('Poll function failed'); + triggersAndPollers.runPoll + .mockResolvedValueOnce(null) // Succeed on first call (testing) + .mockRejectedValueOnce(error); // Fail on second call (regular polling) + + await addWorkflow({ pollNodes: [pollNode] }); + + // Get the executeTrigger function that was registered + const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0]; + const executeTrigger = registerCronCall[2] as () => Promise; + + // Execute the trigger function to simulate a regular poll + await executeTrigger(); + + expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(2); + expect(pollFunctions.__emit).not.toHaveBeenCalled(); + expect(pollFunctions.__emitError).toHaveBeenCalledWith(error); + }); + }); + }); + + describe('remove()', () => { + const setupForRemoval = async () => { + await addWorkflow({ triggerNodes: [triggerNode] }); + return await activeWorkflows.remove(workflowId); + }; + + it('should remove an active workflow', async () => { + const result = await setupForRemoval(); + + expect(result).toBe(true); + expect(activeWorkflows.isActive(workflowId)).toBe(false); + expect(scheduledTaskManager.deregisterCrons).toHaveBeenCalledWith(workflowId); + expect(triggerResponse.closeFunction).toHaveBeenCalled(); + }); + + it('should return false when removing non-existent workflow', async () => { + const result = await activeWorkflows.remove('non-existent'); + + expect(result).toBe(false); + expect(scheduledTaskManager.deregisterCrons).not.toHaveBeenCalled(); + }); + + it('should handle TriggerCloseError when closing trigger', async () => { + const triggerCloseError = new TriggerCloseError(triggerNode, { level: 'warning' }); + (triggerResponse.closeFunction as jest.Mock).mockRejectedValueOnce(triggerCloseError); + + const result = await setupForRemoval(); + + expect(result).toBe(true); + expect(activeWorkflows.isActive(workflowId)).toBe(false); + expect(triggerResponse.closeFunction).toHaveBeenCalled(); + expect(errorReporter.error).toHaveBeenCalledWith(triggerCloseError, { + extra: { workflowId }, + }); + }); + + it('should throw WorkflowDeactivationError when closeFunction throws regular error', async () => { + const error = new Error('Close function failed'); + (triggerResponse.closeFunction as jest.Mock).mockRejectedValueOnce(error); + + await addWorkflow({ triggerNodes: [triggerNode] }); + + await expect(activeWorkflows.remove(workflowId)).rejects.toThrow( + `Failed to deactivate trigger of workflow ID "${workflowId}": "Close function failed"`, + ); + + expect(triggerResponse.closeFunction).toHaveBeenCalled(); + expect(errorReporter.error).not.toHaveBeenCalled(); + }); + }); + + describe('get() and isActive()', () => { + it('should return workflow data for active workflow', async () => { + await addWorkflow({ triggerNodes: [triggerNode] }); + + expect(activeWorkflows.isActive(workflowId)).toBe(true); + expect(activeWorkflows.get(workflowId)).toBeDefined(); + }); + + it('should return undefined for non-active workflow', () => { + expect(activeWorkflows.isActive('non-existent')).toBe(false); + expect(activeWorkflows.get('non-existent')).toBeUndefined(); + }); + }); + + describe('allActiveWorkflows()', () => { + it('should return all active workflow IDs', async () => { + await addWorkflow({ triggerNodes: [triggerNode] }); + + const activeIds = activeWorkflows.allActiveWorkflows(); + + expect(activeIds).toEqual([workflowId]); + }); + }); + + describe('removeAllTriggerAndPollerBasedWorkflows()', () => { + it('should remove all active workflows', async () => { + await addWorkflow({ triggerNodes: [triggerNode] }); + + await activeWorkflows.removeAllTriggerAndPollerBasedWorkflows(); + + expect(activeWorkflows.allActiveWorkflows()).toEqual([]); + expect(scheduledTaskManager.deregisterCrons).toHaveBeenCalledWith(workflowId); + }); + }); +}); diff --git a/packages/core/test/TriggersAndPollers.test.ts b/packages/core/test/TriggersAndPollers.test.ts index c30a0693a6..27cc8b47d9 100644 --- a/packages/core/test/TriggersAndPollers.test.ts +++ b/packages/core/test/TriggersAndPollers.test.ts @@ -9,6 +9,8 @@ import type { INodeType, INodeTypes, ITriggerFunctions, + WorkflowHooks, + IRun, } from 'n8n-workflow'; import { TriggersAndPollers } from '@/TriggersAndPollers'; @@ -21,11 +23,13 @@ describe('TriggersAndPollers', () => { }); const nodeTypes = mock(); const workflow = mock({ nodeTypes }); + const hookFunctions = mock({ + sendResponse: [], + workflowExecuteAfter: [], + }); const additionalData = mock({ hooks: { - hookFunctions: { - sendResponse: [], - }, + hookFunctions, }, }); const triggersAndPollers = new TriggersAndPollers(); @@ -39,87 +43,80 @@ describe('TriggersAndPollers', () => { const triggerFunctions = mock(); const getTriggerFunctions = jest.fn().mockReturnValue(triggerFunctions); const triggerFn = jest.fn(); + const mockEmitData: INodeExecutionData[][] = [[{ json: { data: 'test' } }]]; + + const runTriggerHelper = async (mode: 'manual' | 'trigger' = 'trigger') => + await triggersAndPollers.runTrigger( + workflow, + node, + getTriggerFunctions, + additionalData, + mode, + 'init', + ); it('should throw error if node type does not have trigger function', async () => { - await expect( - triggersAndPollers.runTrigger( - workflow, - node, - getTriggerFunctions, - additionalData, - 'trigger', - 'init', - ), - ).rejects.toThrow(ApplicationError); + await expect(runTriggerHelper()).rejects.toThrow(ApplicationError); }); it('should call trigger function in regular mode', async () => { nodeType.trigger = triggerFn; triggerFn.mockResolvedValue({ test: true }); - const result = await triggersAndPollers.runTrigger( - workflow, - node, - getTriggerFunctions, - additionalData, - 'trigger', - 'init', - ); + const result = await runTriggerHelper(); expect(triggerFn).toHaveBeenCalled(); expect(result).toEqual({ test: true }); }); - it('should handle manual mode with promise resolution', async () => { - const mockEmitData: INodeExecutionData[][] = [[{ json: { data: 'test' } }]]; - const mockTriggerResponse = { workflowId: '123' }; + describe('manual mode', () => { + const getMockTriggerFunctions = () => getTriggerFunctions.mock.results[0]?.value; - nodeType.trigger = triggerFn; - triggerFn.mockResolvedValue(mockTriggerResponse); + beforeEach(() => { + nodeType.trigger = triggerFn; + triggerFn.mockResolvedValue({ workflowId: '123' }); + }); - const result = await triggersAndPollers.runTrigger( - workflow, - node, - getTriggerFunctions, - additionalData, - 'manual', - 'init', - ); + it('should handle promise resolution', async () => { + const result = await runTriggerHelper('manual'); - expect(result).toBeDefined(); - expect(result?.manualTriggerResponse).toBeInstanceOf(Promise); + expect(result?.manualTriggerResponse).toBeInstanceOf(Promise); + getMockTriggerFunctions()?.emit?.(mockEmitData); + }); - // Simulate emit - const mockTriggerFunctions = getTriggerFunctions.mock.results[0]?.value; - if (mockTriggerFunctions?.emit) { - mockTriggerFunctions.emit(mockEmitData); - } - }); + it('should handle error emission', async () => { + const testError = new Error('Test error'); + const result = await runTriggerHelper('manual'); - it('should handle error emission in manual mode', async () => { - const testError = new Error('Test error'); + getMockTriggerFunctions()?.emitError?.(testError); + await expect(result?.manualTriggerResponse).rejects.toThrow(testError); + }); - nodeType.trigger = triggerFn; - triggerFn.mockResolvedValue({}); + it('should handle response promise', async () => { + const responsePromise = { resolve: jest.fn(), reject: jest.fn() }; + await runTriggerHelper('manual'); - const result = await triggersAndPollers.runTrigger( - workflow, - node, - getTriggerFunctions, - additionalData, - 'manual', - 'init', - ); + getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise); - expect(result?.manualTriggerResponse).toBeInstanceOf(Promise); + expect(hookFunctions.sendResponse?.length).toBe(1); + await hookFunctions.sendResponse![0]?.({ testResponse: true }); + expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); + }); - // Simulate error - const mockTriggerFunctions = getTriggerFunctions.mock.results[0]?.value; - if (mockTriggerFunctions?.emitError) { - mockTriggerFunctions.emitError(testError); - } + it('should handle both response and done promises', async () => { + const responsePromise = { resolve: jest.fn(), reject: jest.fn() }; + const donePromise = { resolve: jest.fn(), reject: jest.fn() }; + const mockRunData = mock({ data: { resultData: { runData: {} } } }); - await expect(result?.manualTriggerResponse).rejects.toThrow(testError); + await runTriggerHelper('manual'); + getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise, donePromise); + + await hookFunctions.sendResponse![0]?.({ testResponse: true }); + expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true }); + + await hookFunctions.workflowExecuteAfter?.[0]?.(mockRunData, {}); + expect(donePromise.resolve).toHaveBeenCalledWith(mockRunData); + }); }); }); @@ -127,10 +124,11 @@ describe('TriggersAndPollers', () => { const pollFunctions = mock(); const pollFn = jest.fn(); + const runPollHelper = async () => + await triggersAndPollers.runPoll(workflow, node, pollFunctions); + it('should throw error if node type does not have poll function', async () => { - await expect(triggersAndPollers.runPoll(workflow, node, pollFunctions)).rejects.toThrow( - ApplicationError, - ); + await expect(runPollHelper()).rejects.toThrow(ApplicationError); }); it('should call poll function and return result', async () => { @@ -138,7 +136,7 @@ describe('TriggersAndPollers', () => { nodeType.poll = pollFn; pollFn.mockResolvedValue(mockPollResult); - const result = await triggersAndPollers.runPoll(workflow, node, pollFunctions); + const result = await runPollHelper(); expect(pollFn).toHaveBeenCalled(); expect(result).toBe(mockPollResult); @@ -148,10 +146,18 @@ describe('TriggersAndPollers', () => { nodeType.poll = pollFn; pollFn.mockResolvedValue(null); - const result = await triggersAndPollers.runPoll(workflow, node, pollFunctions); + const result = await runPollHelper(); expect(pollFn).toHaveBeenCalled(); expect(result).toBeNull(); }); + + it('should propagate errors from poll function', async () => { + nodeType.poll = pollFn; + pollFn.mockRejectedValue(new Error('Poll function failed')); + + await expect(runPollHelper()).rejects.toThrow('Poll function failed'); + expect(pollFn).toHaveBeenCalled(); + }); }); });