fix(core): Make execution and its data creation atomic (#11392)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-10-24 16:07:26 +02:00 committed by Iván Ovejero
parent 343eb53116
commit 480987e4ab
No known key found for this signature in database
3 changed files with 71 additions and 10 deletions

View file

@ -1,4 +1,6 @@
import { DataSource, In, Repository } from '@n8n/typeorm'; import { DataSource, In, Repository } from '@n8n/typeorm';
import type { EntityManager } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import { Service } from 'typedi'; import { Service } from 'typedi';
import { ExecutionData } from '../entities/execution-data'; import { ExecutionData } from '../entities/execution-data';
@ -9,6 +11,13 @@ export class ExecutionDataRepository extends Repository<ExecutionData> {
super(ExecutionData, dataSource.manager); super(ExecutionData, dataSource.manager);
} }
async createExecutionDataForExecution(
data: QueryDeepPartialEntity<ExecutionData>,
transactionManager: EntityManager,
) {
return await transactionManager.insert(ExecutionData, data);
}
async findByExecutionIds(executionIds: string[]) { async findByExecutionIds(executionIds: string[]) {
return await this.find({ return await this.find({
select: ['workflowData'], select: ['workflowData'],

View file

@ -304,16 +304,34 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
* Insert a new execution and its execution data using a transaction. * Insert a new execution and its execution data using a transaction.
*/ */
async createNewExecution(execution: CreateExecutionPayload): Promise<string> { async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
const { data, workflowData, ...rest } = execution; const { data: dataObj, workflowData: currentWorkflow, ...rest } = execution;
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() }); const { connections, nodes, name, settings } = currentWorkflow ?? {};
const { id: executionId } = inserted[0] as { id: string }; const workflowData = { connections, nodes, name, settings, id: currentWorkflow.id };
const { connections, nodes, name, settings } = workflowData ?? {}; const data = stringify(dataObj);
await this.executionDataRepository.insert({
executionId, const { type: dbType, sqlite: sqliteConfig } = this.globalConfig.database;
workflowData: { connections, nodes, name, settings, id: workflowData.id }, if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) {
data: stringify(data), // TODO: Delete this block of code once the sqlite legacy (non-pooling) driver is dropped.
}); // In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database
return String(executionId); const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
await this.executionDataRepository.insert({ executionId, workflowData, data });
return String(executionId);
} else {
// All other database drivers should create executions and execution-data atomically
return await this.manager.transaction(async (transactionManager) => {
const { identifiers: inserted } = await transactionManager.insert(ExecutionEntity, {
...rest,
createdAt: new Date(),
});
const { id: executionId } = inserted[0] as { id: string };
await this.executionDataRepository.createExecutionDataForExecution(
{ executionId, workflowData, data },
transactionManager,
);
return String(executionId);
});
}
} }
async markAsCrashed(executionIds: string | string[]) { async markAsCrashed(executionIds: string | string[]) {

View file

@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import Container from 'typedi'; import Container from 'typedi';
import { ExecutionDataRepository } from '@/databases/repositories/execution-data.repository'; import { ExecutionDataRepository } from '@/databases/repositories/execution-data.repository';
@ -54,5 +55,38 @@ describe('ExecutionRepository', () => {
}); });
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]'); expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
}); });
it('should not create execution if execution data insert fails', async () => {
const { type: dbType, sqlite: sqliteConfig } = Container.get(GlobalConfig).database;
// Do not run this test for the legacy sqlite driver
if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) return;
const executionRepo = Container.get(ExecutionRepository);
const executionDataRepo = Container.get(ExecutionDataRepository);
const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
jest
.spyOn(executionDataRepo, 'createExecutionDataForExecution')
.mockRejectedValueOnce(new Error());
await expect(
async () =>
await executionRepo.createNewExecution({
workflowId: workflow.id,
data: {
//@ts-expect-error This is not needed for tests
resultData: {},
},
workflowData: workflow,
mode: 'manual',
startedAt: new Date(),
status: 'new',
finished: false,
}),
).rejects.toThrow();
const executionEntities = await executionRepo.find();
expect(executionEntities).toBeEmptyArray();
});
}); });
}); });