mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
fix(core): Make execution and its data creation atomic (#10276)
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
parent
c3e2e84065
commit
ae50bb95a8
|
@ -12,6 +12,7 @@ import {
|
|||
ExecutionCancelledError,
|
||||
sleep,
|
||||
} from 'n8n-workflow';
|
||||
import { strict as assert } from 'node:assert';
|
||||
|
||||
import type {
|
||||
ExecutionPayload,
|
||||
|
@ -74,9 +75,7 @@ export class ActiveExecutions {
|
|||
}
|
||||
|
||||
executionId = await this.executionRepository.createNewExecution(fullExecutionData);
|
||||
if (executionId === undefined) {
|
||||
throw new ApplicationError('There was an issue assigning an execution id to the execution');
|
||||
}
|
||||
assert(executionId);
|
||||
|
||||
await this.concurrencyControl.throttle({ mode, executionId });
|
||||
executionStatus = 'running';
|
||||
|
|
|
@ -270,17 +270,27 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
|||
return rest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a new execution and its execution data using a transaction.
|
||||
*/
|
||||
async createNewExecution(execution: ExecutionPayload): Promise<string> {
|
||||
const { data, workflowData, ...rest } = execution;
|
||||
const { identifiers: inserted } = await this.insert(rest);
|
||||
const { id: executionId } = inserted[0] as { id: string };
|
||||
const { connections, nodes, name, settings } = workflowData ?? {};
|
||||
await this.executionDataRepository.insert({
|
||||
executionId,
|
||||
workflowData: { connections, nodes, name, settings, id: workflowData.id },
|
||||
data: stringify(data),
|
||||
return await this.manager.transaction(async (transactionManager) => {
|
||||
const { data, workflowData, ...rest } = execution;
|
||||
const insertResult = await transactionManager.insert(ExecutionEntity, rest);
|
||||
const { id: executionId } = insertResult.identifiers[0] as { id: string };
|
||||
|
||||
const { connections, nodes, name, settings } = workflowData ?? {};
|
||||
await this.executionDataRepository.createExecutionDataForExecution(
|
||||
{
|
||||
executionId,
|
||||
workflowData: { connections, nodes, name, settings, id: workflowData.id },
|
||||
data: stringify(data),
|
||||
},
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
return String(executionId);
|
||||
});
|
||||
return String(executionId);
|
||||
}
|
||||
|
||||
async markAsCrashed(executionIds: string | string[]) {
|
||||
|
|
|
@ -1,13 +1,32 @@
|
|||
import { Service } from 'typedi';
|
||||
import type { EntityManager } from '@n8n/typeorm';
|
||||
import type { IWorkflowBase } from 'n8n-workflow';
|
||||
import { DataSource, In, Repository } from '@n8n/typeorm';
|
||||
import { ExecutionData } from '../entities/ExecutionData';
|
||||
|
||||
export interface CreateExecutionDataOpts extends Pick<ExecutionData, 'data' | 'executionId'> {
|
||||
workflowData: Pick<IWorkflowBase, 'connections' | 'nodes' | 'name' | 'settings' | 'id'>;
|
||||
}
|
||||
|
||||
@Service()
|
||||
export class ExecutionDataRepository extends Repository<ExecutionData> {
|
||||
constructor(dataSource: DataSource) {
|
||||
super(ExecutionData, dataSource.manager);
|
||||
}
|
||||
|
||||
async createExecutionDataForExecution(
|
||||
executionData: CreateExecutionDataOpts,
|
||||
transactionManager: EntityManager,
|
||||
) {
|
||||
const { data, executionId, workflowData } = executionData;
|
||||
|
||||
return await transactionManager.insert(ExecutionData, {
|
||||
executionId,
|
||||
data,
|
||||
workflowData,
|
||||
});
|
||||
}
|
||||
|
||||
async findByExecutionIds(executionIds: string[]) {
|
||||
return await this.find({
|
||||
select: ['workflowData'],
|
||||
|
|
|
@ -52,5 +52,34 @@ describe('ExecutionRepository', () => {
|
|||
});
|
||||
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
|
||||
});
|
||||
|
||||
it('should not create execution if execution data insert fails', async () => {
|
||||
const executionRepo = Container.get(ExecutionRepository);
|
||||
const executionDataRepo = Container.get(ExecutionDataRepository);
|
||||
|
||||
const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
|
||||
jest
|
||||
.spyOn(executionDataRepo, 'createExecutionDataForExecution')
|
||||
.mockRejectedValueOnce(new Error());
|
||||
|
||||
await expect(
|
||||
async () =>
|
||||
await executionRepo.createNewExecution({
|
||||
workflowId: workflow.id,
|
||||
data: {
|
||||
//@ts-expect-error This is not needed for tests
|
||||
resultData: {},
|
||||
},
|
||||
workflowData: workflow,
|
||||
mode: 'manual',
|
||||
startedAt: new Date(),
|
||||
status: 'new',
|
||||
finished: false,
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
|
||||
const executionEntities = await executionRepo.find();
|
||||
expect(executionEntities).toBeEmptyArray();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue