From 1f4318136011bffaad04527790a9eba79effce35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 23 Jan 2025 10:16:17 +0100 Subject: [PATCH] fix(core): Update execution entity and execution data in transaction (#12756) --- .../config/src/configs/database.config.ts | 2 +- .../__tests__/execution.repository.test.ts | 38 ++++++++++++++++ .../repositories/execution.repository.ts | 45 ++++++++++++++----- 3 files changed, 72 insertions(+), 13 deletions(-) diff --git a/packages/@n8n/config/src/configs/database.config.ts b/packages/@n8n/config/src/configs/database.config.ts index cebbf2191a..dc8bdde98d 100644 --- a/packages/@n8n/config/src/configs/database.config.ts +++ b/packages/@n8n/config/src/configs/database.config.ts @@ -107,7 +107,7 @@ class MysqlConfig { } @Config -class SqliteConfig { +export class SqliteConfig { /** SQLite database file name */ @Env('DB_SQLITE_DATABASE') database: string = 'database.sqlite'; diff --git a/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts b/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts index 8e36f0189b..a6195b4f67 100644 --- a/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts +++ b/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts @@ -1,13 +1,16 @@ import { GlobalConfig } from '@n8n/config'; +import type { SqliteConfig } from '@n8n/config/src/configs/database.config'; import { Container } from '@n8n/di'; import type { SelectQueryBuilder } from '@n8n/typeorm'; import { Not, LessThanOrEqual } from '@n8n/typeorm'; import { mock } from 'jest-mock-extended'; import { BinaryDataService } from 'n8n-core'; +import type { IRunExecutionData, IWorkflowBase } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { ExecutionEntity } from '@/databases/entities/execution-entity'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { IExecutionResponse } from '@/interfaces'; import { mockInstance, mockEntityManager } from '@test/mocking'; describe('ExecutionRepository', () => { @@ -68,4 +71,39 @@ describe('ExecutionRepository', () => { expect(binaryDataService.deleteMany).toHaveBeenCalledWith([{ executionId: '1', workflowId }]); }); }); + + describe('updateExistingExecution', () => { + test.each(['sqlite', 'postgresdb', 'mysqldb'] as const)( + 'should update execution and data in transaction on %s', + async (dbType) => { + globalConfig.database.type = dbType; + globalConfig.database.sqlite = mock({ poolSize: 1 }); + + const executionId = '1'; + const execution = mock({ + id: executionId, + data: mock(), + workflowData: mock(), + status: 'success', + }); + + const txCallback = jest.fn(); + entityManager.transaction.mockImplementation(async (cb) => { + // @ts-expect-error Mock + await cb(entityManager); + txCallback(); + }); + + await executionRepository.updateExistingExecution(executionId, execution); + + expect(entityManager.transaction).toHaveBeenCalled(); + expect(entityManager.update).toHaveBeenCalledWith( + ExecutionEntity, + { id: executionId }, + expect.objectContaining({ status: 'success' }), + ); + expect(txCallback).toHaveBeenCalledTimes(1); + }, + ); + }); }); diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 975ab601b1..9c24cea5c2 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -45,7 +45,7 @@ import type { import { separate } from '@/utils'; import { ExecutionDataRepository } from './execution-data.repository'; -import type { ExecutionData } from '../entities/execution-data'; +import { ExecutionData } from '../entities/execution-data'; import { ExecutionEntity } from '../entities/execution-entity'; import { ExecutionMetadata } from '../entities/execution-metadata'; import { SharedWorkflow } from '../entities/shared-workflow'; @@ -387,21 +387,42 @@ export class ExecutionRepository extends Repository { customData, ...executionInformation } = execution; - if (Object.keys(executionInformation).length > 0) { - await this.update({ id: executionId }, executionInformation); + + const executionData: Partial = {}; + + if (workflowData) executionData.workflowData = workflowData; + if (data) executionData.data = stringify(data); + + const { type: dbType, sqlite: sqliteConfig } = this.globalConfig.database; + + if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) { + // TODO: Delete this block of code once the sqlite legacy (non-pooling) driver is dropped. + // In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database + + if (Object.keys(executionInformation).length > 0) { + await this.update({ id: executionId }, executionInformation); + } + + if (Object.keys(executionData).length > 0) { + // @ts-expect-error Fix typing + await this.executionDataRepository.update({ executionId }, executionData); + } + + return; } - if (data || workflowData) { - const executionData: Partial = {}; - if (workflowData) { - executionData.workflowData = workflowData; + // All other database drivers should update executions and execution-data atomically + + await this.manager.transaction(async (tx) => { + if (Object.keys(executionInformation).length > 0) { + await tx.update(ExecutionEntity, { id: executionId }, executionInformation); } - if (data) { - executionData.data = stringify(data); + + if (Object.keys(executionData).length > 0) { + // @ts-expect-error Fix typing + await tx.update(ExecutionData, { executionId }, executionData); } - // @ts-ignore - await this.executionDataRepository.update({ executionId }, executionData); - } + }); } async deleteExecutionsByFilter(