feat(core): Run evaluation workflow per each test case (no-changelog) (#11757)

This commit is contained in:
Eugene 2024-11-27 11:19:22 +01:00 committed by GitHub
parent 05f8692399
commit 80c5242e16
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 199 additions and 15 deletions

View file

@ -21,7 +21,7 @@
"conditions": [
{
"id": "9d3abc8d-3270-4bec-9a59-82622d5dbb5a",
"leftValue": "={{ $json.actual.Code[0].data.main[0].length }}",
"leftValue": "={{ $json.newExecution.Code[0].data.main[0].length }}",
"rightValue": 3,
"operator": {
"type": "number",
@ -30,7 +30,7 @@
},
{
"id": "894ce84b-13a4-4415-99c0-0c25182903bb",
"leftValue": "={{ $json.actual.Code[0].data.main[0][0].json.random }}",
"leftValue": "={{ $json.newExecution.Code[0].data.main[0][0].json.random }}",
"rightValue": 0.7,
"operator": {
"type": "number",

View file

@ -2,6 +2,7 @@ import type { SelectQueryBuilder } from '@n8n/typeorm';
import { stringify } from 'flatted';
import { readFileSync } from 'fs';
import { mock, mockDeep } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import path from 'path';
import type { ActiveExecutions } from '@/active-executions';
@ -18,6 +19,10 @@ const wfUnderTestJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/workflow.under-test.json'), { encoding: 'utf-8' }),
);
const wfEvaluationJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/workflow.evaluation.json'), { encoding: 'utf-8' }),
);
const executionDataJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/execution-data.json'), { encoding: 'utf-8' }),
);
@ -41,6 +46,16 @@ const executionMocks = [
}),
];
// Produces a mocked IRun whose result data carries an empty runData map —
// the minimal shape the service reads after an execution finishes.
function mockExecutionData() {
	const emptyResultData = {
		resultData: {
			runData: {},
		},
	};

	return mock<IRun>({ data: emptyResultData });
}
describe('TestRunnerService', () => {
const executionRepository = mock<ExecutionRepository>();
const workflowRepository = mock<WorkflowRepository>();
@ -62,6 +77,11 @@ describe('TestRunnerService', () => {
.mockResolvedValueOnce(executionMocks[1]);
});
afterEach(() => {
activeExecutions.getPostExecutePromise.mockClear();
workflowRunner.run.mockClear();
});
test('should create an instance of TestRunnerService', async () => {
const testRunnerService = new TestRunnerService(
workflowRepository,
@ -86,12 +106,18 @@ describe('TestRunnerService', () => {
...wfUnderTestJson,
});
workflowRepository.findById.calledWith('evaluation-workflow-id').mockResolvedValueOnce({
id: 'evaluation-workflow-id',
...wfEvaluationJson,
});
workflowRunner.run.mockResolvedValue('test-execution-id');
await testRunnerService.runTest(
mock<User>(),
mock<TestDefinition>({
workflowId: 'workflow-under-test-id',
evaluationWorkflowId: 'evaluation-workflow-id',
}),
);
@ -99,4 +125,87 @@ describe('TestRunnerService', () => {
expect(executionRepository.findOne).toHaveBeenCalledTimes(2);
expect(workflowRunner.run).toHaveBeenCalledTimes(2);
});
// Verifies the per-test-case flow: with two past executions mocked (see
// executionMocks above), the service should trigger the workflow under test
// once per case and the evaluation workflow once per case — 4 runs in total.
test('should run both workflow under test and evaluation workflow', async () => {
const testRunnerService = new TestRunnerService(
workflowRepository,
workflowRunner,
executionRepository,
activeExecutions,
);
// Resolve both workflow definitions the service looks up by ID
workflowRepository.findById.calledWith('workflow-under-test-id').mockResolvedValueOnce({
id: 'workflow-under-test-id',
...wfUnderTestJson,
});
workflowRepository.findById.calledWith('evaluation-workflow-id').mockResolvedValueOnce({
id: 'evaluation-workflow-id',
...wfEvaluationJson,
});
// Queue one execution ID per expected run (2 test-case runs + 2 evaluation runs).
// Order matters: mockResolvedValueOnce is consumed in call order.
workflowRunner.run.mockResolvedValueOnce('some-execution-id');
workflowRunner.run.mockResolvedValueOnce('some-execution-id-2');
workflowRunner.run.mockResolvedValueOnce('some-execution-id-3');
workflowRunner.run.mockResolvedValueOnce('some-execution-id-4');
// Mock executions of workflow under test
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id')
.mockResolvedValue(mockExecutionData());
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id-2')
.mockResolvedValue(mockExecutionData());
// Mock executions of evaluation workflow
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id-3')
.mockResolvedValue(mockExecutionData());
activeExecutions.getPostExecutePromise
.calledWith('some-execution-id-4')
.mockResolvedValue(mockExecutionData());
await testRunnerService.runTest(
mock<User>(),
mock<TestDefinition>({
workflowId: 'workflow-under-test-id',
evaluationWorkflowId: 'evaluation-workflow-id',
}),
);
// 2 test cases x (workflow under test + evaluation workflow)
expect(workflowRunner.run).toHaveBeenCalledTimes(4);
// Check workflow under test was executed
// (in 'evaluation' mode, with the trigger node pinned to the past execution's output)
expect(workflowRunner.run).toHaveBeenCalledWith(
expect.objectContaining({
executionMode: 'evaluation',
pinData: {
'When clicking Test workflow':
executionDataJson.resultData.runData['When clicking Test workflow'][0].data.main[0],
},
workflowData: expect.objectContaining({
id: 'workflow-under-test-id',
}),
}),
);
// Check evaluation workflow was executed
// (its node execution stack must be seeded with input data)
expect(workflowRunner.run).toHaveBeenCalledWith(
expect.objectContaining({
executionMode: 'evaluation',
executionData: expect.objectContaining({
executionData: expect.objectContaining({
nodeExecutionStack: expect.arrayContaining([
expect.objectContaining({ data: expect.anything() }),
]),
}),
}),
workflowData: expect.objectContaining({
id: 'evaluation-workflow-id',
}),
}),
);
});
});

View file

@ -1,5 +1,11 @@
import { parse } from 'flatted';
import type { IPinData, IRun, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import type {
IDataObject,
IPinData,
IRun,
IRunData,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import assert from 'node:assert';
import { Service } from 'typedi';
@ -11,6 +17,7 @@ import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { IExecutionResponse } from '@/interfaces';
import { getRunData } from '@/workflow-execute-additional-data';
import { WorkflowRunner } from '@/workflow-runner';
/**
@ -18,7 +25,8 @@ import { WorkflowRunner } from '@/workflow-runner';
* It uses the test definitions to find
* past executions, creates pin data from them,
* and runs the workflow-under-test with the pin data.
* TODO: Evaluation workflows
* After the workflow-under-test finishes, it runs the evaluation workflow
* with the original and new run data.
* TODO: Node pinning
* TODO: Collect metrics
*/
@ -32,14 +40,12 @@ export class TestRunnerService {
) {}
/**
* Extracts the execution data from the past execution.
* Creates a pin data object from the past execution data
* for the given workflow.
* For now, it only pins trigger nodes.
*/
private createPinDataFromExecution(
workflow: WorkflowEntity,
execution: ExecutionEntity,
): IPinData {
private createTestDataFromExecution(workflow: WorkflowEntity, execution: ExecutionEntity) {
const executionData = parse(execution.executionData.data) as IExecutionResponse['data'];
const triggerNodes = workflow.nodes.filter((node) => /trigger$/i.test(node.type));
@ -53,7 +59,7 @@ export class TestRunnerService {
}
}
return pinData;
return { pinData, executionData };
}
/**
@ -65,6 +71,7 @@ export class TestRunnerService {
testCasePinData: IPinData,
userId: string,
): Promise<IRun | undefined> {
// Prepare the data to run the workflow
const data: IWorkflowExecutionDataProcess = {
executionMode: 'evaluation',
runData: {},
@ -78,12 +85,55 @@ export class TestRunnerService {
const executionId = await this.workflowRunner.run(data);
assert(executionId);
// Wait for the workflow to finish execution
// Wait for the execution to finish
const executePromise = this.activeExecutions.getPostExecutePromise(executionId);
return await executePromise;
}
/**
 * Executes the evaluation workflow, feeding it both the original (expected)
 * run data and the newly produced (actual) run data, and resolves with the
 * finished evaluation execution.
 */
private async runTestCaseEvaluation(
	evaluationWorkflow: WorkflowEntity,
	expectedData: IRunData,
	actualData: IRunData,
) {
	// A single input item carrying both executions side by side,
	// so the evaluation workflow can compare them.
	const inputItem = {
		json: {
			originalExecution: expectedData,
			newExecution: actualData,
		},
	};

	// Build the execution payload and force the dedicated evaluation mode
	const runPayload = await getRunData(evaluationWorkflow, [inputItem]);
	runPayload.executionMode = 'evaluation';

	// Kick off the evaluation workflow
	const startedExecutionId = await this.workflowRunner.run(runPayload);
	assert(startedExecutionId);

	// Resolve once the evaluation execution has finished
	return await this.activeExecutions.getPostExecutePromise(startedExecutionId);
}
/**
 * Pulls the evaluation verdict out of a finished evaluation execution:
 * the JSON of the first item on the first main output of the last node
 * that ran. Falls back to an empty object when no such item exists.
 */
private extractEvaluationResult(execution: IRun): IDataObject {
	const { resultData } = execution.data;

	const finalNodeName = resultData.lastNodeExecuted;
	assert(finalNodeName, 'Could not find the last node executed in evaluation workflow');

	// Only the first item of the first main output is treated as the result
	const firstRunOfFinalNode = resultData.runData[finalNodeName]?.[0];
	return firstRunOfFinalNode?.data?.main?.[0]?.[0]?.json ?? {};
}
/**
* Creates a new test run for the given test definition.
*/
@ -91,6 +141,9 @@ export class TestRunnerService {
const workflow = await this.workflowRepository.findById(test.workflowId);
assert(workflow, 'Workflow not found');
const evaluationWorkflow = await this.workflowRepository.findById(test.evaluationWorkflowId);
assert(evaluationWorkflow, 'Evaluation workflow not found');
// 1. Make test cases from previous executions
// Select executions with the annotation tag and workflow ID of the test.
@ -105,7 +158,7 @@ export class TestRunnerService {
.andWhere('execution.workflowId = :workflowId', { workflowId: test.workflowId })
.getMany();
// 2. Run the test cases
// 2. Run over all the test cases
for (const { id: pastExecutionId } of pastExecutions) {
const pastExecution = await this.executionRepository.findOne({
@ -114,16 +167,38 @@ export class TestRunnerService {
});
assert(pastExecution, 'Execution not found');
const pinData = this.createPinDataFromExecution(workflow, pastExecution);
const testData = this.createTestDataFromExecution(workflow, pastExecution);
const { pinData, executionData } = testData;
// Run the test case and wait for it to finish
const execution = await this.runTestCase(workflow, pinData, user.id);
const testCaseExecution = await this.runTestCase(workflow, pinData, user.id);
if (!execution) {
// In case of a permission check issue, the test case execution will be undefined.
// Skip them and continue with the next test case
if (!testCaseExecution) {
continue;
}
// TODO: 2.3 Collect the run data
// Collect the results of the test case execution
const testCaseRunData = testCaseExecution.data.resultData.runData;
// Get the original runData from the test case execution data
const originalRunData = executionData.resultData.runData;
// Run the evaluation workflow with the original and new run data
const evalExecution = await this.runTestCaseEvaluation(
evaluationWorkflow,
originalRunData,
testCaseRunData,
);
assert(evalExecution);
// Extract the output of the last node executed in the evaluation workflow
this.extractEvaluationResult(evalExecution);
// TODO: collect metrics
}
// TODO: 3. Aggregate the results
}
}