From ae50bb95a8e5bf1cdbf9483da54b84094b82e260 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:46:35 +0300 Subject: [PATCH] fix(core): Make execution and its data creation atomic (#10276) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- packages/cli/src/ActiveExecutions.ts | 5 ++-- .../repositories/execution.repository.ts | 28 ++++++++++++------ .../repositories/executionData.repository.ts | 19 ++++++++++++ .../repositories/execution.repository.test.ts | 29 +++++++++++++++++++ 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 97313d5cb2..c1a6e8ffd6 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -12,6 +12,7 @@ import { ExecutionCancelledError, sleep, } from 'n8n-workflow'; +import { strict as assert } from 'node:assert'; import type { ExecutionPayload, @@ -74,9 +75,7 @@ export class ActiveExecutions { } executionId = await this.executionRepository.createNewExecution(fullExecutionData); - if (executionId === undefined) { - throw new ApplicationError('There was an issue assigning an execution id to the execution'); - } + assert(executionId); await this.concurrencyControl.throttle({ mode, executionId }); executionStatus = 'running'; diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 1ebb22d8eb..a8605147aa 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -270,17 +270,27 @@ export class ExecutionRepository extends Repository { return rest; } + /** + * Insert a new execution and its execution data using a transaction. + */ async createNewExecution(execution: ExecutionPayload): Promise { - const { data, workflowData, ...rest } = execution; - const { identifiers: inserted } = await this.insert(rest); - const { id: executionId } = inserted[0] as { id: string }; - const { connections, nodes, name, settings } = workflowData ?? {}; - await this.executionDataRepository.insert({ - executionId, - workflowData: { connections, nodes, name, settings, id: workflowData.id }, - data: stringify(data), + return await this.manager.transaction(async (transactionManager) => { + const { data, workflowData, ...rest } = execution; + const insertResult = await transactionManager.insert(ExecutionEntity, rest); + const { id: executionId } = insertResult.identifiers[0] as { id: string }; + + const { connections, nodes, name, settings } = workflowData ?? {}; + await this.executionDataRepository.createExecutionDataForExecution( + { + executionId, + workflowData: { connections, nodes, name, settings, id: workflowData.id }, + data: stringify(data), + }, + transactionManager, + ); + + return String(executionId); }); - return String(executionId); } async markAsCrashed(executionIds: string | string[]) { diff --git a/packages/cli/src/databases/repositories/executionData.repository.ts b/packages/cli/src/databases/repositories/executionData.repository.ts index 5872f9888c..013453d998 100644 --- a/packages/cli/src/databases/repositories/executionData.repository.ts +++ b/packages/cli/src/databases/repositories/executionData.repository.ts @@ -1,13 +1,32 @@ import { Service } from 'typedi'; +import type { EntityManager } from '@n8n/typeorm'; +import type { IWorkflowBase } from 'n8n-workflow'; import { DataSource, In, Repository } from '@n8n/typeorm'; import { ExecutionData } from '../entities/ExecutionData'; +export interface CreateExecutionDataOpts extends Pick { + workflowData: Pick; +} + @Service() export class ExecutionDataRepository extends Repository { constructor(dataSource: DataSource) { super(ExecutionData, dataSource.manager); } + async createExecutionDataForExecution( + executionData: CreateExecutionDataOpts, + transactionManager: EntityManager, + ) { + const { data, executionId, workflowData } = executionData; + + return await transactionManager.insert(ExecutionData, { + executionId, + data, + workflowData, + }); + } + async findByExecutionIds(executionIds: string[]) { return await this.find({ select: ['workflowData'], diff --git a/packages/cli/test/integration/database/repositories/execution.repository.test.ts b/packages/cli/test/integration/database/repositories/execution.repository.test.ts index cfb897d627..e777f62429 100644 --- a/packages/cli/test/integration/database/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/database/repositories/execution.repository.test.ts @@ -52,5 +52,34 @@ describe('ExecutionRepository', () => { }); expect(executionData?.data).toEqual('[{"resultData":"1"},{}]'); }); + + it('should not create execution if execution data insert fails', async () => { + const executionRepo = Container.get(ExecutionRepository); + const executionDataRepo = Container.get(ExecutionDataRepository); + + const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } }); + jest + .spyOn(executionDataRepo, 'createExecutionDataForExecution') + .mockRejectedValueOnce(new Error()); + + await expect( + async () => + await executionRepo.createNewExecution({ + workflowId: workflow.id, + data: { + //@ts-expect-error This is not needed for tests + resultData: {}, + }, + workflowData: workflow, + mode: 'manual', + startedAt: new Date(), + status: 'new', + finished: false, + }), + ).rejects.toThrow(); + + const executionEntities = await executionRepo.find(); + expect(executionEntities).toBeEmptyArray(); + }); }); });