additional tests for saveExecutionProgress

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2025-01-30 15:12:26 +01:00
parent e833b1b1a3
commit 877511c0e1
No known key found for this signature in database
2 changed files with 147 additions and 83 deletions

View file

@ -8,63 +8,142 @@ import { mockInstance } from '@test/mocking';
import { saveExecutionProgress } from '../save-execution-progress';
mockInstance(Logger);
const errorReporter = mockInstance(ErrorReporter);
const executionRepository = mockInstance(ExecutionRepository);
describe('saveExecutionProgress', () => {
mockInstance(Logger);
const errorReporter = mockInstance(ErrorReporter);
const executionRepository = mockInstance(ExecutionRepository);
afterEach(() => {
jest.clearAllMocks();
});
afterEach(() => {
jest.resetAllMocks();
});
const commonArgs: [string, string, string, ITaskData, IRunExecutionData] = [
'some-workflow-id',
'some-execution-id',
'My Node',
{} as ITaskData,
{} as IRunExecutionData,
];
const commonArgs: [string, string, string, ITaskData, IRunExecutionData] = [
'some-workflow-id',
'some-execution-id',
'My Node',
{} as ITaskData,
{} as IRunExecutionData,
];
test('should ignore on leftover async call', async () => {
executionRepository.findSingleExecution.mockResolvedValue({
finished: true,
} as IExecutionResponse);
test('should not try to update non-existent executions', async () => {
executionRepository.findSingleExecution.mockResolvedValue(undefined);
await saveExecutionProgress(...commonArgs);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
test('should handle DB errors on execution lookup', async () => {
const error = new Error('Something went wrong');
executionRepository.findSingleExecution.mockImplementation(() => {
throw error;
});
test('should update execution when saving progress is enabled', async () => {
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
await saveExecutionProgress(...commonArgs);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith('some-execution-id', {
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}],
test('should handle DB errors when updating the execution', async () => {
const error = new Error('Something went wrong');
executionRepository.findSingleExecution.mockResolvedValue({} as IExecutionResponse);
executionRepository.updateExistingExecution.mockImplementation(() => {
throw error;
});
await saveExecutionProgress(...commonArgs);
expect(executionRepository.findSingleExecution).toHaveBeenCalled();
expect(executionRepository.updateExistingExecution).toHaveBeenCalled();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
test('should not try to update finished executions', async () => {
executionRepository.findSingleExecution.mockResolvedValue({
finished: true,
} as IExecutionResponse);
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
});
test('should populate `.data` when it is missing', async () => {
const fullExecutionData = {} as IExecutionResponse;
executionRepository.findSingleExecution.mockResolvedValue(fullExecutionData);
await saveExecutionProgress(...commonArgs);
expect(fullExecutionData).toEqual({
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}],
},
},
startData: {},
},
status: 'running',
});
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
'some-execution-id',
fullExecutionData,
);
expect(errorReporter.error).not.toHaveBeenCalled();
});
test('should augment `.data` if it already exists', async () => {
const fullExecutionData = {
data: {
startData: {},
resultData: {
runData: {
'My Node': [{}],
},
},
},
startData: {},
},
status: 'running',
} as unknown as IExecutionResponse;
executionRepository.findSingleExecution.mockResolvedValue(fullExecutionData);
await saveExecutionProgress(...commonArgs);
expect(fullExecutionData).toEqual({
data: {
executionData: undefined,
resultData: {
lastNodeExecuted: 'My Node',
runData: {
'My Node': [{}, {}],
},
},
startData: {},
},
status: 'running',
});
expect(executionRepository.updateExistingExecution).toHaveBeenCalledWith(
'some-execution-id',
fullExecutionData,
);
});
expect(errorReporter.error).not.toHaveBeenCalled();
});
test('should set last executed node correctly', async () => {
const fullExecutionData = {
data: {
resultData: {
lastNodeExecuted: 'Another Node',
runData: {},
},
},
} as unknown as IExecutionResponse;
executionRepository.findSingleExecution.mockResolvedValue(fullExecutionData);
test('should report error on failure', async () => {
const error = new Error('Something went wrong');
await saveExecutionProgress(...commonArgs);
executionRepository.findSingleExecution.mockImplementation(() => {
throw error;
expect(fullExecutionData.data.resultData.lastNodeExecuted).toEqual('My Node');
});
await saveExecutionProgress(...commonArgs);
expect(executionRepository.updateExistingExecution).not.toHaveBeenCalled();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});

View file

@ -12,6 +12,8 @@ export async function saveExecutionProgress(
executionData: IRunExecutionData,
) {
const logger = Container.get(Logger);
const executionRepository = Container.get(ExecutionRepository);
const errorReporter = Container.get(ErrorReporter);
try {
logger.debug(`Save execution progress to database for execution ID ${executionId} `, {
@ -19,13 +21,10 @@ export async function saveExecutionProgress(
nodeName,
});
const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: true,
unflattenData: true,
},
);
const fullExecutionData = await executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!fullExecutionData) {
// Something went badly wrong if this happens.
@ -40,29 +39,22 @@ export async function saveExecutionProgress(
return;
}
if (fullExecutionData.data === undefined) {
fullExecutionData.data = {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack: [],
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
fullExecutionData.data ??= {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack: [],
waitingExecution: {},
waitingExecutionSource: {},
},
};
if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) {
// Append data if array exists
fullExecutionData.data.resultData.runData[nodeName].push(data);
} else {
// Initialize array and save data
fullExecutionData.data.resultData.runData[nodeName] = [data];
}
const { runData } = fullExecutionData.data.resultData;
(runData[nodeName] ??= []).push(data);
fullExecutionData.data.executionData = executionData.executionData;
@ -71,14 +63,11 @@ export async function saveExecutionProgress(
fullExecutionData.status = 'running';
await Container.get(ExecutionRepository).updateExistingExecution(
executionId,
fullExecutionData,
);
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
Container.get(ErrorReporter).error(error);
errorReporter.error(error);
// TODO: Improve in the future!
// Errors here might happen because of database access
// For busy machines, we may get "Database is locked" errors.
@ -86,11 +75,7 @@ export async function saveExecutionProgress(
// We do this to prevent crashes and executions ending in `unknown` state.
logger.error(
`Failed saving execution progress to database for execution ID ${executionId} (hookFunctionsPreExecute, nodeExecuteAfter)`,
{
...error,
executionId,
workflowId,
},
{ error, executionId, workflowId },
);
}
}