mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
fix(core): Update execution entity and execution data in transaction (#12756)
This commit is contained in:
parent
3d9d5bf9d5
commit
1f43181360
|
@ -107,7 +107,7 @@ class MysqlConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Config
|
@Config
|
||||||
class SqliteConfig {
|
export class SqliteConfig {
|
||||||
/** SQLite database file name */
|
/** SQLite database file name */
|
||||||
@Env('DB_SQLITE_DATABASE')
|
@Env('DB_SQLITE_DATABASE')
|
||||||
database: string = 'database.sqlite';
|
database: string = 'database.sqlite';
|
||||||
|
|
|
@ -1,13 +1,16 @@
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
|
import type { SqliteConfig } from '@n8n/config/src/configs/database.config';
|
||||||
import { Container } from '@n8n/di';
|
import { Container } from '@n8n/di';
|
||||||
import type { SelectQueryBuilder } from '@n8n/typeorm';
|
import type { SelectQueryBuilder } from '@n8n/typeorm';
|
||||||
import { Not, LessThanOrEqual } from '@n8n/typeorm';
|
import { Not, LessThanOrEqual } from '@n8n/typeorm';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
|
import type { IRunExecutionData, IWorkflowBase } from 'n8n-workflow';
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
|
|
||||||
import { ExecutionEntity } from '@/databases/entities/execution-entity';
|
import { ExecutionEntity } from '@/databases/entities/execution-entity';
|
||||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
|
import type { IExecutionResponse } from '@/interfaces';
|
||||||
import { mockInstance, mockEntityManager } from '@test/mocking';
|
import { mockInstance, mockEntityManager } from '@test/mocking';
|
||||||
|
|
||||||
describe('ExecutionRepository', () => {
|
describe('ExecutionRepository', () => {
|
||||||
|
@ -68,4 +71,39 @@ describe('ExecutionRepository', () => {
|
||||||
expect(binaryDataService.deleteMany).toHaveBeenCalledWith([{ executionId: '1', workflowId }]);
|
expect(binaryDataService.deleteMany).toHaveBeenCalledWith([{ executionId: '1', workflowId }]);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('updateExistingExecution', () => {
|
||||||
|
test.each(['sqlite', 'postgresdb', 'mysqldb'] as const)(
|
||||||
|
'should update execution and data in transaction on %s',
|
||||||
|
async (dbType) => {
|
||||||
|
globalConfig.database.type = dbType;
|
||||||
|
globalConfig.database.sqlite = mock<SqliteConfig>({ poolSize: 1 });
|
||||||
|
|
||||||
|
const executionId = '1';
|
||||||
|
const execution = mock<IExecutionResponse>({
|
||||||
|
id: executionId,
|
||||||
|
data: mock<IRunExecutionData>(),
|
||||||
|
workflowData: mock<IWorkflowBase>(),
|
||||||
|
status: 'success',
|
||||||
|
});
|
||||||
|
|
||||||
|
const txCallback = jest.fn();
|
||||||
|
entityManager.transaction.mockImplementation(async (cb) => {
|
||||||
|
// @ts-expect-error Mock
|
||||||
|
await cb(entityManager);
|
||||||
|
txCallback();
|
||||||
|
});
|
||||||
|
|
||||||
|
await executionRepository.updateExistingExecution(executionId, execution);
|
||||||
|
|
||||||
|
expect(entityManager.transaction).toHaveBeenCalled();
|
||||||
|
expect(entityManager.update).toHaveBeenCalledWith(
|
||||||
|
ExecutionEntity,
|
||||||
|
{ id: executionId },
|
||||||
|
expect.objectContaining({ status: 'success' }),
|
||||||
|
);
|
||||||
|
expect(txCallback).toHaveBeenCalledTimes(1);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -45,7 +45,7 @@ import type {
|
||||||
import { separate } from '@/utils';
|
import { separate } from '@/utils';
|
||||||
|
|
||||||
import { ExecutionDataRepository } from './execution-data.repository';
|
import { ExecutionDataRepository } from './execution-data.repository';
|
||||||
import type { ExecutionData } from '../entities/execution-data';
|
import { ExecutionData } from '../entities/execution-data';
|
||||||
import { ExecutionEntity } from '../entities/execution-entity';
|
import { ExecutionEntity } from '../entities/execution-entity';
|
||||||
import { ExecutionMetadata } from '../entities/execution-metadata';
|
import { ExecutionMetadata } from '../entities/execution-metadata';
|
||||||
import { SharedWorkflow } from '../entities/shared-workflow';
|
import { SharedWorkflow } from '../entities/shared-workflow';
|
||||||
|
@ -387,21 +387,42 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
customData,
|
customData,
|
||||||
...executionInformation
|
...executionInformation
|
||||||
} = execution;
|
} = execution;
|
||||||
if (Object.keys(executionInformation).length > 0) {
|
|
||||||
await this.update({ id: executionId }, executionInformation);
|
const executionData: Partial<ExecutionData> = {};
|
||||||
|
|
||||||
|
if (workflowData) executionData.workflowData = workflowData;
|
||||||
|
if (data) executionData.data = stringify(data);
|
||||||
|
|
||||||
|
const { type: dbType, sqlite: sqliteConfig } = this.globalConfig.database;
|
||||||
|
|
||||||
|
if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) {
|
||||||
|
// TODO: Delete this block of code once the sqlite legacy (non-pooling) driver is dropped.
|
||||||
|
// In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database
|
||||||
|
|
||||||
|
if (Object.keys(executionInformation).length > 0) {
|
||||||
|
await this.update({ id: executionId }, executionInformation);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Object.keys(executionData).length > 0) {
|
||||||
|
// @ts-expect-error Fix typing
|
||||||
|
await this.executionDataRepository.update({ executionId }, executionData);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data || workflowData) {
|
// All other database drivers should update executions and execution-data atomically
|
||||||
const executionData: Partial<ExecutionData> = {};
|
|
||||||
if (workflowData) {
|
await this.manager.transaction(async (tx) => {
|
||||||
executionData.workflowData = workflowData;
|
if (Object.keys(executionInformation).length > 0) {
|
||||||
|
await tx.update(ExecutionEntity, { id: executionId }, executionInformation);
|
||||||
}
|
}
|
||||||
if (data) {
|
|
||||||
executionData.data = stringify(data);
|
if (Object.keys(executionData).length > 0) {
|
||||||
|
// @ts-expect-error Fix typing
|
||||||
|
await tx.update(ExecutionData, { executionId }, executionData);
|
||||||
}
|
}
|
||||||
// @ts-ignore
|
});
|
||||||
await this.executionDataRepository.update({ executionId }, executionData);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async deleteExecutionsByFilter(
|
async deleteExecutionsByFilter(
|
||||||
|
|
Loading…
Reference in a new issue