diff --git a/packages/cli/src/databases/repositories/execution-data.repository.ts b/packages/cli/src/databases/repositories/execution-data.repository.ts index 7f54a6f214..f7de742941 100644 --- a/packages/cli/src/databases/repositories/execution-data.repository.ts +++ b/packages/cli/src/databases/repositories/execution-data.repository.ts @@ -1,4 +1,6 @@ import { DataSource, In, Repository } from '@n8n/typeorm'; +import type { EntityManager } from '@n8n/typeorm'; +import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity'; import { Service } from 'typedi'; import { ExecutionData } from '../entities/execution-data'; @@ -9,6 +11,13 @@ export class ExecutionDataRepository extends Repository { super(ExecutionData, dataSource.manager); } + async createExecutionDataForExecution( + data: QueryDeepPartialEntity, + transactionManager: EntityManager, + ) { + return await transactionManager.insert(ExecutionData, data); + } + async findByExecutionIds(executionIds: string[]) { return await this.find({ select: ['workflowData'], diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 7b26463969..ce4bedac2a 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -304,16 +304,34 @@ export class ExecutionRepository extends Repository { * Insert a new execution and its execution data using a transaction. */ async createNewExecution(execution: CreateExecutionPayload): Promise { - const { data, workflowData, ...rest } = execution; - const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() }); - const { id: executionId } = inserted[0] as { id: string }; - const { connections, nodes, name, settings } = workflowData ?? {}; - await this.executionDataRepository.insert({ - executionId, - workflowData: { connections, nodes, name, settings, id: workflowData.id }, - data: stringify(data), - }); - return String(executionId); + const { data: dataObj, workflowData: currentWorkflow, ...rest } = execution; + const { connections, nodes, name, settings } = currentWorkflow ?? {}; + const workflowData = { connections, nodes, name, settings, id: currentWorkflow.id }; + const data = stringify(dataObj); + + const { type: dbType, sqlite: sqliteConfig } = this.globalConfig.database; + if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) { + // TODO: Delete this block of code once the sqlite legacy (non-pooling) driver is dropped. + // In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database + const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() }); + const { id: executionId } = inserted[0] as { id: string }; + await this.executionDataRepository.insert({ executionId, workflowData, data }); + return String(executionId); + } else { + // All other database drivers should create executions and execution-data atomically + return await this.manager.transaction(async (transactionManager) => { + const { identifiers: inserted } = await transactionManager.insert(ExecutionEntity, { + ...rest, + createdAt: new Date(), + }); + const { id: executionId } = inserted[0] as { id: string }; + await this.executionDataRepository.createExecutionDataForExecution( + { executionId, workflowData, data }, + transactionManager, + ); + return String(executionId); + }); + } } async markAsCrashed(executionIds: string | string[]) { diff --git a/packages/cli/test/integration/database/repositories/execution.repository.test.ts b/packages/cli/test/integration/database/repositories/execution.repository.test.ts index 52884bd3e6..1b50415686 100644 --- a/packages/cli/test/integration/database/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/database/repositories/execution.repository.test.ts @@ -1,3 +1,4 @@ +import { GlobalConfig } from '@n8n/config'; import Container from 'typedi'; import { ExecutionDataRepository } from '@/databases/repositories/execution-data.repository'; @@ -54,5 +55,38 @@ describe('ExecutionRepository', () => { }); expect(executionData?.data).toEqual('[{"resultData":"1"},{}]'); }); + + it('should not create execution if execution data insert fails', async () => { + const { type: dbType, sqlite: sqliteConfig } = Container.get(GlobalConfig).database; + // Do not run this test for the legacy sqlite driver + if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) return; + + const executionRepo = Container.get(ExecutionRepository); + const executionDataRepo = Container.get(ExecutionDataRepository); + + const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } }); + jest + .spyOn(executionDataRepo, 'createExecutionDataForExecution') + .mockRejectedValueOnce(new Error()); + + await expect( + async () => + await executionRepo.createNewExecution({ + workflowId: workflow.id, + data: { + //@ts-expect-error This is not needed for tests + resultData: {}, + }, + workflowData: workflow, + mode: 'manual', + startedAt: new Date(), + status: 'new', + finished: false, + }), + ).rejects.toThrow(); + + const executionEntities = await executionRepo.find(); + expect(executionEntities).toBeEmptyArray(); + }); }); });