fix(core): Fix test runner in queue mode (no-changelog) (#13479)

Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
Eugene 2025-02-27 15:13:07 +03:00 committed by GitHub
parent afba8f9ff8
commit a1fee65713
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 200 additions and 3 deletions

View file

@ -8,6 +8,7 @@ import type { ITaskData } from 'n8n-workflow';
import path from 'path';
import type { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { TestDefinition } from '@/databases/entities/test-definition.ee';
import type { TestMetric } from '@/databases/entities/test-metric.ee';
@ -753,6 +754,120 @@ describe('TestRunnerService', () => {
});
});
test('should create proper execution data for queue mode in runTestCase', async () => {
config.set('executions.mode', 'queue');
const testRunnerService = new TestRunnerService(
logger,
telemetry,
workflowRepository,
workflowRunner,
executionRepository,
activeExecutions,
testRunRepository,
testCaseExecutionRepository,
testMetricRepository,
mockNodeTypes,
errorReporter,
);
// Spy on workflowRunner.run to capture the data passed to it
jest.spyOn(workflowRunner, 'run').mockImplementation(async (data) => {
// Verify the data structure is correct for queue mode
expect(data.executionMode).toBe('evaluation');
// Check that executionData field is properly defined
expect(data.executionData).toBeDefined();
expect(data.executionData!.startData).toBeDefined();
expect(data.executionData!.startData!.startNodes).toBeDefined();
expect(data.executionData!.resultData.pinData).toBeDefined();
expect(data.executionData!.resultData.runData).toBeDefined();
expect(data.executionData!.manualData!.userId).toBeDefined();
expect(data.executionData!.manualData!.partialExecutionVersion).toBe(2);
expect(data.executionData!.manualData!.triggerToStartFrom).toBeDefined();
return 'mock-execution-id';
});
// Mock activeExecutions.getPostExecutePromise to return a successful execution
activeExecutions.getPostExecutePromise.mockResolvedValue(mockExecutionData());
// Create an AbortController for the test
const abortController = new AbortController();
// Setup test metadata
const metadata: any = {
testRunId: 'test-run-id',
userId: 'user-id',
pastExecutionId: 'past-execution-id',
};
// Call runTestCase directly to test the executionData construction
await (testRunnerService as any).runTestCase(
wfUnderTestJson,
executionDataJson,
wfUnderTestJson,
[{ id: '72256d90-3a67-4e29-b032-47df4e5768af' }],
metadata,
abortController.signal,
);
expect(workflowRunner.run).toHaveBeenCalledTimes(1);
});
test('should create proper execution data for regular mode in runTestCase', async () => {
config.set('executions.mode', 'regular');
const testRunnerService = new TestRunnerService(
logger,
telemetry,
workflowRepository,
workflowRunner,
executionRepository,
activeExecutions,
testRunRepository,
testCaseExecutionRepository,
testMetricRepository,
mockNodeTypes,
errorReporter,
);
// Spy on workflowRunner.run to capture the data passed to it
jest.spyOn(workflowRunner, 'run').mockImplementation(async (data) => {
expect(data.executionMode).toBe('evaluation');
// Check that executionData field is NOT defined
expect(data.executionData).not.toBeDefined();
return 'mock-execution-id';
});
// Mock activeExecutions.getPostExecutePromise to return a successful execution
activeExecutions.getPostExecutePromise.mockResolvedValue(mockExecutionData());
// Create an AbortController for the test
const abortController = new AbortController();
// Setup test metadata
const metadata: any = {
testRunId: 'test-run-id',
userId: 'user-id',
pastExecutionId: 'past-execution-id',
};
// Call runTestCase directly to test the executionData construction
await (testRunnerService as any).runTestCase(
wfUnderTestJson,
executionDataJson,
wfUnderTestJson,
[{ id: '72256d90-3a67-4e29-b032-47df4e5768af' }],
metadata,
abortController.signal,
);
expect(workflowRunner.run).toHaveBeenCalledTimes(1);
});
describe('Test Run cancellation', () => {
beforeAll(() => {
jest.useFakeTimers();

View file

@ -12,6 +12,7 @@ import type {
import assert from 'node:assert';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import type { ExecutionEntity } from '@/databases/entities/execution-entity';
import type { MockedNodeItem, TestDefinition } from '@/databases/entities/test-definition.ee';
import type { TestRun } from '@/databases/entities/test-run.ee';
@ -164,9 +165,17 @@ export class TestRunnerService {
pastExecutionWorkflowData,
);
const startNodesData = this.getStartNodesData(
workflow,
pastExecutionData,
pastExecutionWorkflowData,
);
// Prepare the data to run the workflow
// Evaluation executions should run the same way as manual,
// because they need pinned data and partial execution logic
const data: IWorkflowExecutionDataProcess = {
...this.getStartNodesData(workflow, pastExecutionData, pastExecutionWorkflowData),
...startNodesData,
executionMode: 'evaluation',
runData: {},
pinData,
@ -175,6 +184,25 @@ export class TestRunnerService {
partialExecutionVersion: 2,
};
// When in queue mode, we need to pass additional data to the execution
// the same way as it would be passed in manual mode
if (config.getEnv('executions.mode') === 'queue') {
data.executionData = {
startData: {
startNodes: startNodesData.startNodes,
},
resultData: {
pinData,
runData: {},
},
manualData: {
userId: metadata.userId,
partialExecutionVersion: 2,
triggerToStartFrom: startNodesData.triggerToStartFrom,
},
};
}
// Trigger the workflow under test with mocked data
const executionId = await this.workflowRunner.run(data);
assert(executionId);

View file

@ -1,11 +1,34 @@
import { mock } from 'jest-mock-extended';
import type { Logger } from 'n8n-core';
import { mockInstance } from 'n8n-core/test/utils';
import type { IRunExecutionData, WorkflowExecuteMode } from 'n8n-workflow/src';
import { CredentialsHelper } from '@/credentials-helper';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { VariablesService } from '@/environments.ee/variables/variables.service.ee';
import { ExternalHooks } from '@/external-hooks';
import type { IExecutionResponse } from '@/interfaces';
import type { ManualExecutionService } from '@/manual-execution.service';
import { SecretsHelper } from '@/secrets-helpers.ee';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { JobProcessor } from '../job-processor';
import type { Job } from '../scaling.types';
mockInstance(VariablesService, {
getAllCached: jest.fn().mockResolvedValue([]),
});
mockInstance(CredentialsHelper);
mockInstance(SecretsHelper);
mockInstance(WorkflowStaticDataService);
mockInstance(WorkflowStatisticsService);
mockInstance(ExternalHooks);
const logger = mock<Logger>({
scoped: jest.fn().mockImplementation(() => logger),
});
describe('JobProcessor', () => {
it('should refrain from processing a crashed execution', async () => {
const executionRepository = mock<ExecutionRepository>();
@ -13,7 +36,7 @@ describe('JobProcessor', () => {
mock<IExecutionResponse>({ status: 'crashed' }),
);
const jobProcessor = new JobProcessor(
mock(),
logger,
mock(),
executionRepository,
mock(),
@ -26,4 +49,35 @@ describe('JobProcessor', () => {
expect(result).toEqual({ success: false });
});
it.each(['manual', 'evaluation'] satisfies WorkflowExecuteMode[])(
'should use manualExecutionService to process a job in %p mode',
async (mode) => {
const executionRepository = mock<ExecutionRepository>();
executionRepository.findSingleExecution.mockResolvedValue(
mock<IExecutionResponse>({
mode,
workflowData: { nodes: [] },
data: mock<IRunExecutionData>({
isTestWebhook: false,
}),
}),
);
const manualExecutionService = mock<ManualExecutionService>();
const jobProcessor = new JobProcessor(
logger,
mock(),
executionRepository,
mock(),
mock(),
mock(),
manualExecutionService,
);
await jobProcessor.processJob(mock<Job>());
expect(manualExecutionService.runManually).toHaveBeenCalledTimes(1);
},
);
});

View file

@ -170,7 +170,7 @@ export class JobProcessor {
const { startData, resultData, manualData, isTestWebhook } = execution.data;
if (execution.mode === 'manual' && !isTestWebhook) {
if (['manual', 'evaluation'].includes(execution.mode) && !isTestWebhook) {
const data: IWorkflowExecutionDataProcess = {
executionMode: execution.mode,
workflowData: execution.workflowData,