diff --git a/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts b/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts index caf176d01f..d91a2d0ee7 100644 --- a/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts +++ b/packages/cli/src/evaluation.ee/test-runner/__tests__/test-runner.service.ee.test.ts @@ -8,6 +8,7 @@ import type { ITaskData } from 'n8n-workflow'; import path from 'path'; import type { ActiveExecutions } from '@/active-executions'; +import config from '@/config'; import type { ExecutionEntity } from '@/databases/entities/execution-entity'; import type { TestDefinition } from '@/databases/entities/test-definition.ee'; import type { TestMetric } from '@/databases/entities/test-metric.ee'; @@ -753,6 +754,120 @@ describe('TestRunnerService', () => { }); }); + test('should create proper execution data for queue mode in runTestCase', async () => { + config.set('executions.mode', 'queue'); + + const testRunnerService = new TestRunnerService( + logger, + telemetry, + workflowRepository, + workflowRunner, + executionRepository, + activeExecutions, + testRunRepository, + testCaseExecutionRepository, + testMetricRepository, + mockNodeTypes, + errorReporter, + ); + + // Spy on workflowRunner.run to capture the data passed to it + jest.spyOn(workflowRunner, 'run').mockImplementation(async (data) => { + // Verify the data structure is correct for queue mode + expect(data.executionMode).toBe('evaluation'); + + // Check that executionData field is properly defined + expect(data.executionData).toBeDefined(); + expect(data.executionData!.startData).toBeDefined(); + expect(data.executionData!.startData!.startNodes).toBeDefined(); + expect(data.executionData!.resultData.pinData).toBeDefined(); + expect(data.executionData!.resultData.runData).toBeDefined(); + expect(data.executionData!.manualData!.userId).toBeDefined(); + 
expect(data.executionData!.manualData!.partialExecutionVersion).toBe(2); + expect(data.executionData!.manualData!.triggerToStartFrom).toBeDefined(); + + return 'mock-execution-id'; + }); + + // Mock activeExecutions.getPostExecutePromise to return a successful execution + activeExecutions.getPostExecutePromise.mockResolvedValue(mockExecutionData()); + + // Create an AbortController for the test + const abortController = new AbortController(); + + // Setup test metadata + const metadata: any = { + testRunId: 'test-run-id', + userId: 'user-id', + pastExecutionId: 'past-execution-id', + }; + + // Call runTestCase directly to test the executionData construction + await (testRunnerService as any).runTestCase( + wfUnderTestJson, + executionDataJson, + wfUnderTestJson, + [{ id: '72256d90-3a67-4e29-b032-47df4e5768af' }], + metadata, + abortController.signal, + ); + + expect(workflowRunner.run).toHaveBeenCalledTimes(1); + }); + + test('should create proper execution data for regular mode in runTestCase', async () => { + config.set('executions.mode', 'regular'); + + const testRunnerService = new TestRunnerService( + logger, + telemetry, + workflowRepository, + workflowRunner, + executionRepository, + activeExecutions, + testRunRepository, + testCaseExecutionRepository, + testMetricRepository, + mockNodeTypes, + errorReporter, + ); + + // Spy on workflowRunner.run to capture the data passed to it + jest.spyOn(workflowRunner, 'run').mockImplementation(async (data) => { + expect(data.executionMode).toBe('evaluation'); + + // Check that executionData field is NOT defined + expect(data.executionData).not.toBeDefined(); + + return 'mock-execution-id'; + }); + + // Mock activeExecutions.getPostExecutePromise to return a successful execution + activeExecutions.getPostExecutePromise.mockResolvedValue(mockExecutionData()); + + // Create an AbortController for the test + const abortController = new AbortController(); + + // Setup test metadata + const metadata: any = { + testRunId: 
'test-run-id', + userId: 'user-id', + pastExecutionId: 'past-execution-id', + }; + + // Call runTestCase directly to test the executionData construction + await (testRunnerService as any).runTestCase( + wfUnderTestJson, + executionDataJson, + wfUnderTestJson, + [{ id: '72256d90-3a67-4e29-b032-47df4e5768af' }], + metadata, + abortController.signal, + ); + + expect(workflowRunner.run).toHaveBeenCalledTimes(1); + }); + describe('Test Run cancellation', () => { beforeAll(() => { jest.useFakeTimers(); diff --git a/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts b/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts index 433c86cbcf..38e38c9803 100644 --- a/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts +++ b/packages/cli/src/evaluation.ee/test-runner/test-runner.service.ee.ts @@ -12,6 +12,7 @@ import type { import assert from 'node:assert'; import { ActiveExecutions } from '@/active-executions'; +import config from '@/config'; import type { ExecutionEntity } from '@/databases/entities/execution-entity'; import type { MockedNodeItem, TestDefinition } from '@/databases/entities/test-definition.ee'; import type { TestRun } from '@/databases/entities/test-run.ee'; @@ -164,9 +165,17 @@ export class TestRunnerService { pastExecutionWorkflowData, ); + const startNodesData = this.getStartNodesData( + workflow, + pastExecutionData, + pastExecutionWorkflowData, + ); + // Prepare the data to run the workflow + // Evaluation executions should run the same way as manual, + // because they need pinned data and partial execution logic const data: IWorkflowExecutionDataProcess = { - ...this.getStartNodesData(workflow, pastExecutionData, pastExecutionWorkflowData), + ...startNodesData, executionMode: 'evaluation', runData: {}, pinData, @@ -175,6 +184,25 @@ export class TestRunnerService { partialExecutionVersion: 2, }; + // When in queue mode, we need to pass additional data to the execution + // the same way as it would be 
passed in manual mode + if (config.getEnv('executions.mode') === 'queue') { + data.executionData = { + startData: { + startNodes: startNodesData.startNodes, + }, + resultData: { + pinData, + runData: {}, + }, + manualData: { + userId: metadata.userId, + partialExecutionVersion: 2, + triggerToStartFrom: startNodesData.triggerToStartFrom, + }, + }; + } + // Trigger the workflow under test with mocked data const executionId = await this.workflowRunner.run(data); assert(executionId); diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts index 897986c915..b3236268b5 100644 --- a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -1,11 +1,34 @@ import { mock } from 'jest-mock-extended'; +import type { Logger } from 'n8n-core'; +import { mockInstance } from 'n8n-core/test/utils'; +import type { IRunExecutionData, WorkflowExecuteMode } from 'n8n-workflow'; +import { CredentialsHelper } from '@/credentials-helper'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { VariablesService } from '@/environments.ee/variables/variables.service.ee'; +import { ExternalHooks } from '@/external-hooks'; import type { IExecutionResponse } from '@/interfaces'; +import type { ManualExecutionService } from '@/manual-execution.service'; +import { SecretsHelper } from '@/secrets-helpers.ee'; +import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; +import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { JobProcessor } from '../job-processor'; import type { Job } from '../scaling.types'; +mockInstance(VariablesService, { + getAllCached: jest.fn().mockResolvedValue([]), +}); +mockInstance(CredentialsHelper); +mockInstance(SecretsHelper); +mockInstance(WorkflowStaticDataService); 
+mockInstance(WorkflowStatisticsService); +mockInstance(ExternalHooks); + +const logger = mock({ + scoped: jest.fn().mockImplementation(() => logger), +}); + describe('JobProcessor', () => { it('should refrain from processing a crashed execution', async () => { const executionRepository = mock(); @@ -13,7 +36,7 @@ describe('JobProcessor', () => { mock({ status: 'crashed' }), ); const jobProcessor = new JobProcessor( - mock(), + logger, mock(), executionRepository, mock(), @@ -26,4 +49,35 @@ describe('JobProcessor', () => { expect(result).toEqual({ success: false }); }); + + it.each(['manual', 'evaluation'] satisfies WorkflowExecuteMode[])( + 'should use manualExecutionService to process a job in %p mode', + async (mode) => { + const executionRepository = mock(); + executionRepository.findSingleExecution.mockResolvedValue( + mock({ + mode, + workflowData: { nodes: [] }, + data: mock({ + isTestWebhook: false, + }), + }), + ); + + const manualExecutionService = mock(); + const jobProcessor = new JobProcessor( + logger, + mock(), + executionRepository, + mock(), + mock(), + mock(), + manualExecutionService, + ); + + await jobProcessor.processJob(mock()); + + expect(manualExecutionService.runManually).toHaveBeenCalledTimes(1); + }, + ); }); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 2fa9288a79..1c54f0dbc4 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -170,7 +170,7 @@ export class JobProcessor { const { startData, resultData, manualData, isTestWebhook } = execution.data; - if (execution.mode === 'manual' && !isTestWebhook) { + if (['manual', 'evaluation'].includes(execution.mode) && !isTestWebhook) { const data: IWorkflowExecutionDataProcess = { executionMode: execution.mode, workflowData: execution.workflowData,