mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
fix(core): Make execution and its data creation atomic (#11392)
Some checks failed
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Has been cancelled
Some checks failed
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Benchmark Docker Image CI / build (push) Has been cancelled
This commit is contained in:
parent
0d0bd2932e
commit
ed30d43236
|
@ -1,4 +1,6 @@
|
||||||
import { DataSource, In, Repository } from '@n8n/typeorm';
|
import { DataSource, In, Repository } from '@n8n/typeorm';
|
||||||
|
import type { EntityManager } from '@n8n/typeorm';
|
||||||
|
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
|
|
||||||
import { ExecutionData } from '../entities/execution-data';
|
import { ExecutionData } from '../entities/execution-data';
|
||||||
|
@ -9,6 +11,13 @@ export class ExecutionDataRepository extends Repository<ExecutionData> {
|
||||||
super(ExecutionData, dataSource.manager);
|
super(ExecutionData, dataSource.manager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async createExecutionDataForExecution(
|
||||||
|
data: QueryDeepPartialEntity<ExecutionData>,
|
||||||
|
transactionManager: EntityManager,
|
||||||
|
) {
|
||||||
|
return await transactionManager.insert(ExecutionData, data);
|
||||||
|
}
|
||||||
|
|
||||||
async findByExecutionIds(executionIds: string[]) {
|
async findByExecutionIds(executionIds: string[]) {
|
||||||
return await this.find({
|
return await this.find({
|
||||||
select: ['workflowData'],
|
select: ['workflowData'],
|
||||||
|
|
|
@ -304,16 +304,34 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
* Insert a new execution and its execution data using a transaction.
|
* Insert a new execution and its execution data using a transaction.
|
||||||
*/
|
*/
|
||||||
async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
|
async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
|
||||||
const { data, workflowData, ...rest } = execution;
|
const { data: dataObj, workflowData: currentWorkflow, ...rest } = execution;
|
||||||
|
const { connections, nodes, name, settings } = currentWorkflow ?? {};
|
||||||
|
const workflowData = { connections, nodes, name, settings, id: currentWorkflow.id };
|
||||||
|
const data = stringify(dataObj);
|
||||||
|
|
||||||
|
const { type: dbType, sqlite: sqliteConfig } = this.globalConfig.database;
|
||||||
|
if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) {
|
||||||
|
// TODO: Delete this block of code once the sqlite legacy (non-pooling) driver is dropped.
|
||||||
|
// In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database
|
||||||
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
|
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
|
||||||
const { id: executionId } = inserted[0] as { id: string };
|
const { id: executionId } = inserted[0] as { id: string };
|
||||||
const { connections, nodes, name, settings } = workflowData ?? {};
|
await this.executionDataRepository.insert({ executionId, workflowData, data });
|
||||||
await this.executionDataRepository.insert({
|
|
||||||
executionId,
|
|
||||||
workflowData: { connections, nodes, name, settings, id: workflowData.id },
|
|
||||||
data: stringify(data),
|
|
||||||
});
|
|
||||||
return String(executionId);
|
return String(executionId);
|
||||||
|
} else {
|
||||||
|
// All other database drivers should create executions and execution-data atomically
|
||||||
|
return await this.manager.transaction(async (transactionManager) => {
|
||||||
|
const { identifiers: inserted } = await transactionManager.insert(ExecutionEntity, {
|
||||||
|
...rest,
|
||||||
|
createdAt: new Date(),
|
||||||
|
});
|
||||||
|
const { id: executionId } = inserted[0] as { id: string };
|
||||||
|
await this.executionDataRepository.createExecutionDataForExecution(
|
||||||
|
{ executionId, workflowData, data },
|
||||||
|
transactionManager,
|
||||||
|
);
|
||||||
|
return String(executionId);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async markAsCrashed(executionIds: string | string[]) {
|
async markAsCrashed(executionIds: string | string[]) {
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { GlobalConfig } from '@n8n/config';
|
||||||
import Container from 'typedi';
|
import Container from 'typedi';
|
||||||
|
|
||||||
import { ExecutionDataRepository } from '@/databases/repositories/execution-data.repository';
|
import { ExecutionDataRepository } from '@/databases/repositories/execution-data.repository';
|
||||||
|
@ -54,5 +55,38 @@ describe('ExecutionRepository', () => {
|
||||||
});
|
});
|
||||||
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
|
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should not create execution if execution data insert fails', async () => {
|
||||||
|
const { type: dbType, sqlite: sqliteConfig } = Container.get(GlobalConfig).database;
|
||||||
|
// Do not run this test for the legacy sqlite driver
|
||||||
|
if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) return;
|
||||||
|
|
||||||
|
const executionRepo = Container.get(ExecutionRepository);
|
||||||
|
const executionDataRepo = Container.get(ExecutionDataRepository);
|
||||||
|
|
||||||
|
const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
|
||||||
|
jest
|
||||||
|
.spyOn(executionDataRepo, 'createExecutionDataForExecution')
|
||||||
|
.mockRejectedValueOnce(new Error());
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
async () =>
|
||||||
|
await executionRepo.createNewExecution({
|
||||||
|
workflowId: workflow.id,
|
||||||
|
data: {
|
||||||
|
//@ts-expect-error This is not needed for tests
|
||||||
|
resultData: {},
|
||||||
|
},
|
||||||
|
workflowData: workflow,
|
||||||
|
mode: 'manual',
|
||||||
|
startedAt: new Date(),
|
||||||
|
status: 'new',
|
||||||
|
finished: false,
|
||||||
|
}),
|
||||||
|
).rejects.toThrow();
|
||||||
|
|
||||||
|
const executionEntities = await executionRepo.find();
|
||||||
|
expect(executionEntities).toBeEmptyArray();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue