diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index 66c26f00c6..49e8517272 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -1,33 +1,46 @@ import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; +import type { IWorkflowBase } from 'n8n-workflow'; +import type { Project } from '@/databases/entities/project'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutionResponse } from '@/interfaces'; import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import { OrchestrationService } from '@/services/orchestration.service'; +import type { OwnershipService } from '@/services/ownership.service'; import { WaitTracker } from '@/wait-tracker'; +import type { WorkflowRunner } from '@/workflow-runner'; import { mockLogger } from '@test/mocking'; jest.useFakeTimers(); describe('WaitTracker', () => { + const ownershipService = mock(); + const workflowRunner = mock(); const executionRepository = mock(); const multiMainSetup = mock(); const orchestrationService = new OrchestrationService(mock(), multiMainSetup, mock()); const instanceSettings = mock({ isLeader: true }); + const project = mock({ id: 'projectId' }); const execution = mock({ id: '123', + finished: false, waitTill: new Date(Date.now() + 1000), + mode: 'manual', + data: mock({ + pushRef: 'push_ref', + }), }); + execution.workflowData = mock({ id: 'abcd' }); let waitTracker: WaitTracker; beforeEach(() => { waitTracker = new WaitTracker( mockLogger(), executionRepository, - mock(), - mock(), + ownershipService, + workflowRunner, orchestrationService, instanceSettings, ); @@ -64,29 +77,31 @@ describe('WaitTracker', () => { }); describe('if execution to start', () => { - it('if not enough time passed, should not start execution', async () => { + let startExecutionSpy: jest.SpyInstance, [executionId: string]>; + + beforeEach(() => { + executionRepository.findSingleExecution.mockResolvedValue(execution); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); + ownershipService.getWorkflowProjectCached.mockResolvedValue(project); + + startExecutionSpy = jest + .spyOn(waitTracker, 'startExecution') + .mockImplementation(async () => {}); + waitTracker.init(); + }); - executionRepository.getWaitingExecutions.mockResolvedValue([execution]); + it('if not enough time passed, should not start execution', async () => { await waitTracker.getWaitingExecutions(); - const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution'); - jest.advanceTimersByTime(100); expect(startExecutionSpy).not.toHaveBeenCalled(); }); it('if enough time passed, should start execution', async () => { - executionRepository.getWaitingExecutions.mockResolvedValue([]); - waitTracker.init(); - - executionRepository.getWaitingExecutions.mockResolvedValue([execution]); await waitTracker.getWaitingExecutions(); - const startExecutionSpy = jest.spyOn(waitTracker, 'startExecution'); - jest.advanceTimersByTime(2_000); expect(startExecutionSpy).toHaveBeenCalledWith(execution.id); @@ -100,13 +115,27 @@ describe('WaitTracker', () => { waitTracker.init(); executionRepository.findSingleExecution.mockResolvedValue(execution); - waitTracker.startExecution(execution.id); - jest.advanceTimersByTime(5); + ownershipService.getWorkflowProjectCached.mockResolvedValue(project); + + await waitTracker.startExecution(execution.id); expect(executionRepository.findSingleExecution).toHaveBeenCalledWith(execution.id, { includeData: true, unflattenData: true, }); + + expect(workflowRunner.run).toHaveBeenCalledWith( + { + executionMode: execution.mode, + executionData: execution.data, + workflowData: execution.workflowData, + projectId: project.id, + pushRef: execution.data.pushRef, + }, + false, + false, + execution.id, + ); }); }); @@ -135,8 +164,8 @@ describe('WaitTracker', () => { const waitTracker = new WaitTracker( mockLogger(), executionRepository, - mock(), - mock(), + ownershipService, + workflowRunner, orchestrationService, mock({ isLeader: false }), ); diff --git a/packages/cli/src/wait-tracker.ts b/packages/cli/src/wait-tracker.ts index 868fafa526..7035db3cbe 100644 --- a/packages/cli/src/wait-tracker.ts +++ b/packages/cli/src/wait-tracker.ts @@ -1,9 +1,5 @@ import { InstanceSettings } from 'n8n-core'; -import { - ApplicationError, - ErrorReporterProxy as ErrorReporter, - type IWorkflowExecutionDataProcess, -} from 'n8n-workflow'; +import { ApplicationError, type IWorkflowExecutionDataProcess } from 'n8n-workflow'; import { Service } from 'typedi'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; @@ -88,7 +84,7 @@ export class WaitTracker { this.waitingExecutions[executionId] = { executionId, timer: setTimeout(() => { - this.startExecution(executionId); + void this.startExecution(executionId); }, triggerTime), }; } @@ -103,46 +99,40 @@ export class WaitTracker { delete this.waitingExecutions[executionId]; } - startExecution(executionId: string) { + async startExecution(executionId: string) { this.logger.debug(`Resuming execution ${executionId}`, { executionId }); delete this.waitingExecutions[executionId]; - (async () => { - // Get the data to execute - const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); - - if (!fullExecutionData) { - throw new ApplicationError('Execution does not exist.', { extra: { executionId } }); - } - if (fullExecutionData.finished) { - throw new ApplicationError('The execution did succeed and can so not be started again.'); - } - - if (!fullExecutionData.workflowData.id) { - throw new ApplicationError('Only saved workflows can be resumed.'); - } - const workflowId = fullExecutionData.workflowData.id; - const project = await this.ownershipService.getWorkflowProjectCached(workflowId); - - const data: IWorkflowExecutionDataProcess = { - executionMode: fullExecutionData.mode, - executionData: fullExecutionData.data, - workflowData: fullExecutionData.workflowData, - projectId: project.id, - }; - - // Start the execution again - await this.workflowRunner.run(data, false, false, executionId); - })().catch((error: Error) => { - ErrorReporter.error(error); - this.logger.error( - `There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`, - { executionId }, - ); + // Get the data to execute + const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, }); + + if (!fullExecutionData) { + throw new ApplicationError('Execution does not exist.', { extra: { executionId } }); + } + if (fullExecutionData.finished) { + throw new ApplicationError('The execution did succeed and can so not be started again.'); + } + + if (!fullExecutionData.workflowData.id) { + throw new ApplicationError('Only saved workflows can be resumed.'); + } + + const workflowId = fullExecutionData.workflowData.id; + const project = await this.ownershipService.getWorkflowProjectCached(workflowId); + + const data: IWorkflowExecutionDataProcess = { + executionMode: fullExecutionData.mode, + executionData: fullExecutionData.data, + workflowData: fullExecutionData.workflowData, + projectId: project.id, + pushRef: fullExecutionData.data.pushRef, + }; + + // Start the execution again + await this.workflowRunner.run(data, false, false, executionId); } stopTracking() {