mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
fix(core): Prevent multiple values in the execution metadata for the same key and executionId (#9953)
This commit is contained in:
parent
3a179439c7
commit
2e6b03b2cb
|
@ -13,6 +13,8 @@ export class Column {
|
|||
|
||||
private defaultValue: unknown;
|
||||
|
||||
private primaryKeyConstraintName: string | undefined;
|
||||
|
||||
constructor(private name: string) {}
|
||||
|
||||
get bool() {
|
||||
|
@ -57,6 +59,12 @@ export class Column {
|
|||
return this;
|
||||
}
|
||||
|
||||
primaryWithName(name?: string) {
|
||||
this.isPrimary = true;
|
||||
this.primaryKeyConstraintName = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
get notNull() {
|
||||
this.isNullable = false;
|
||||
return this;
|
||||
|
@ -74,12 +82,14 @@ export class Column {
|
|||
|
||||
// eslint-disable-next-line complexity
|
||||
toOptions(driver: Driver): TableColumnOptions {
|
||||
const { name, type, isNullable, isPrimary, isGenerated, length } = this;
|
||||
const { name, type, isNullable, isPrimary, isGenerated, length, primaryKeyConstraintName } =
|
||||
this;
|
||||
const isMysql = 'mysql' in driver;
|
||||
const isPostgres = 'postgres' in driver;
|
||||
const isSqlite = 'sqlite' in driver;
|
||||
|
||||
const options: TableColumnOptions = {
|
||||
primaryKeyConstraintName,
|
||||
name,
|
||||
isNullable,
|
||||
isPrimary,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn, RelationId } from '@n8n/typeorm';
|
||||
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn } from '@n8n/typeorm';
|
||||
import { ExecutionEntity } from './ExecutionEntity';
|
||||
|
||||
@Entity()
|
||||
|
@ -11,8 +11,8 @@ export class ExecutionMetadata {
|
|||
})
|
||||
execution: ExecutionEntity;
|
||||
|
||||
@RelationId((executionMetadata: ExecutionMetadata) => executionMetadata.execution)
|
||||
executionId: number;
|
||||
@Column()
|
||||
executionId: string;
|
||||
|
||||
@Column('text')
|
||||
key: string;
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
import type { MigrationContext, ReversibleMigration } from '@db/types';
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
export class AddConstraintToExecutionMetadata1720101653148 implements ReversibleMigration {
|
||||
async up(context: MigrationContext) {
|
||||
const { createTable, dropTable, column } = context.schemaBuilder;
|
||||
const { escape } = context;
|
||||
|
||||
const executionMetadataTableRaw = 'execution_metadata';
|
||||
const executionMetadataTable = escape.tableName(executionMetadataTableRaw);
|
||||
const executionMetadataTableTempRaw = 'execution_metadata_temp';
|
||||
const executionMetadataTableTemp = escape.tableName(executionMetadataTableTempRaw);
|
||||
const id = escape.columnName('id');
|
||||
const executionId = escape.columnName('executionId');
|
||||
const key = escape.columnName('key');
|
||||
const value = escape.columnName('value');
|
||||
|
||||
await createTable(executionMetadataTableTempRaw)
|
||||
.withColumns(
|
||||
column('id').int.notNull.primary.autoGenerate,
|
||||
column('executionId').int.notNull,
|
||||
// NOTE: This is a varchar(255) instead of text, because a unique index
|
||||
// on text is not supported on mysql, also why should we support
|
||||
// arbitrary length keys?
|
||||
column('key').varchar(255).notNull,
|
||||
column('value').text.notNull,
|
||||
)
|
||||
.withForeignKey('executionId', {
|
||||
tableName: 'execution_entity',
|
||||
columnName: 'id',
|
||||
onDelete: 'CASCADE',
|
||||
// In MySQL foreignKey names must be unique across all tables and
|
||||
// TypeORM creates predictable names based on the columnName.
|
||||
// So the temp table's foreignKey clashes with the current table's.
|
||||
name: context.isMysql ? nanoid() : undefined,
|
||||
})
|
||||
.withIndexOn(['executionId', 'key'], true);
|
||||
|
||||
if (context.isMysql) {
|
||||
await context.runQuery(`
|
||||
INSERT INTO ${executionMetadataTableTemp} (${id}, ${executionId}, ${key}, ${value})
|
||||
SELECT MAX(${id}) as ${id}, ${executionId}, ${key}, MAX(${value})
|
||||
FROM ${executionMetadataTable}
|
||||
GROUP BY ${executionId}, ${key}
|
||||
ON DUPLICATE KEY UPDATE
|
||||
id = IF(VALUES(${id}) > ${executionMetadataTableTemp}.${id}, VALUES(${id}), ${executionMetadataTableTemp}.${id}),
|
||||
value = IF(VALUES(${id}) > ${executionMetadataTableTemp}.${id}, VALUES(${value}), ${executionMetadataTableTemp}.${value});
|
||||
`);
|
||||
} else {
|
||||
await context.runQuery(`
|
||||
INSERT INTO ${executionMetadataTableTemp} (${id}, ${executionId}, ${key}, ${value})
|
||||
SELECT MAX(${id}) as ${id}, ${executionId}, ${key}, MAX(${value})
|
||||
FROM ${executionMetadataTable}
|
||||
GROUP BY ${executionId}, ${key}
|
||||
ON CONFLICT (${executionId}, ${key}) DO UPDATE SET
|
||||
id = EXCLUDED.id,
|
||||
value = EXCLUDED.value
|
||||
WHERE EXCLUDED.id > ${executionMetadataTableTemp}.id;
|
||||
`);
|
||||
}
|
||||
|
||||
await dropTable(executionMetadataTableRaw);
|
||||
await context.runQuery(
|
||||
`ALTER TABLE ${executionMetadataTableTemp} RENAME TO ${executionMetadataTable};`,
|
||||
);
|
||||
}
|
||||
|
||||
async down(context: MigrationContext) {
|
||||
const { createTable, dropTable, column } = context.schemaBuilder;
|
||||
const { escape } = context;
|
||||
|
||||
const executionMetadataTableRaw = 'execution_metadata';
|
||||
const executionMetadataTable = escape.tableName(executionMetadataTableRaw);
|
||||
const executionMetadataTableTempRaw = 'execution_metadata_temp';
|
||||
const executionMetadataTableTemp = escape.tableName(executionMetadataTableTempRaw);
|
||||
const id = escape.columnName('id');
|
||||
const executionId = escape.columnName('executionId');
|
||||
const key = escape.columnName('key');
|
||||
const value = escape.columnName('value');
|
||||
|
||||
await createTable(executionMetadataTableTempRaw)
|
||||
.withColumns(
|
||||
// INFO: The PK names that TypeORM creates are predictable and thus it
|
||||
// will create a PK name which already exists in the current
|
||||
// execution_metadata table. That's why we have to randomize the PK name
|
||||
// here.
|
||||
column('id').int.notNull.primaryWithName(nanoid()).autoGenerate,
|
||||
column('executionId').int.notNull,
|
||||
column('key').text.notNull,
|
||||
column('value').text.notNull,
|
||||
)
|
||||
.withForeignKey('executionId', {
|
||||
tableName: 'execution_entity',
|
||||
columnName: 'id',
|
||||
onDelete: 'CASCADE',
|
||||
// In MySQL foreignKey names must be unique across all tables and
|
||||
// TypeORM creates predictable names based on the columnName.
|
||||
// So the temp table's foreignKey clashes with the current table's.
|
||||
name: context.isMysql ? nanoid() : undefined,
|
||||
});
|
||||
|
||||
await context.runQuery(`
|
||||
INSERT INTO ${executionMetadataTableTemp} (${id}, ${executionId}, ${key}, ${value})
|
||||
SELECT ${id}, ${executionId}, ${key}, ${value} FROM ${executionMetadataTable};
|
||||
`);
|
||||
|
||||
await dropTable(executionMetadataTableRaw);
|
||||
await context.runQuery(
|
||||
`ALTER TABLE ${executionMetadataTableTemp} RENAME TO ${executionMetadataTable};`,
|
||||
);
|
||||
}
|
||||
}
|
|
@ -58,6 +58,7 @@ import { MoveSshKeysToDatabase1711390882123 } from '../common/1711390882123-Move
|
|||
import { RemoveNodesAccess1712044305787 } from '../common/1712044305787-RemoveNodesAccess';
|
||||
import { MakeExecutionStatusNonNullable1714133768521 } from '../common/1714133768521-MakeExecutionStatusNonNullable';
|
||||
import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActivatedAtUserSetting';
|
||||
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
|
||||
|
||||
export const mysqlMigrations: Migration[] = [
|
||||
InitialMigration1588157391238,
|
||||
|
@ -119,4 +120,5 @@ export const mysqlMigrations: Migration[] = [
|
|||
CreateProject1714133768519,
|
||||
MakeExecutionStatusNonNullable1714133768521,
|
||||
AddActivatedAtUserSetting1717498465931,
|
||||
AddConstraintToExecutionMetadata1720101653148,
|
||||
];
|
||||
|
|
|
@ -57,6 +57,7 @@ import { MoveSshKeysToDatabase1711390882123 } from '../common/1711390882123-Move
|
|||
import { RemoveNodesAccess1712044305787 } from '../common/1712044305787-RemoveNodesAccess';
|
||||
import { MakeExecutionStatusNonNullable1714133768521 } from '../common/1714133768521-MakeExecutionStatusNonNullable';
|
||||
import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActivatedAtUserSetting';
|
||||
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
|
||||
|
||||
export const postgresMigrations: Migration[] = [
|
||||
InitialMigration1587669153312,
|
||||
|
@ -117,4 +118,5 @@ export const postgresMigrations: Migration[] = [
|
|||
CreateProject1714133768519,
|
||||
MakeExecutionStatusNonNullable1714133768521,
|
||||
AddActivatedAtUserSetting1717498465931,
|
||||
AddConstraintToExecutionMetadata1720101653148,
|
||||
];
|
||||
|
|
|
@ -55,6 +55,7 @@ import { MoveSshKeysToDatabase1711390882123 } from '../common/1711390882123-Move
|
|||
import { RemoveNodesAccess1712044305787 } from '../common/1712044305787-RemoveNodesAccess';
|
||||
import { MakeExecutionStatusNonNullable1714133768521 } from '../common/1714133768521-MakeExecutionStatusNonNullable';
|
||||
import { AddActivatedAtUserSetting1717498465931 } from './1717498465931-AddActivatedAtUserSetting';
|
||||
import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101653148-AddConstraintToExecutionMetadata';
|
||||
|
||||
const sqliteMigrations: Migration[] = [
|
||||
InitialMigration1588102412422,
|
||||
|
@ -113,6 +114,7 @@ const sqliteMigrations: Migration[] = [
|
|||
CreateProject1714133768519,
|
||||
MakeExecutionStatusNonNullable1714133768521,
|
||||
AddActivatedAtUserSetting1717498465931,
|
||||
AddConstraintToExecutionMetadata1720101653148,
|
||||
];
|
||||
|
||||
export { sqliteMigrations };
|
||||
|
|
|
@ -6,19 +6,18 @@ import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata';
|
|||
export class ExecutionMetadataService {
|
||||
constructor(private readonly executionMetadataRepository: ExecutionMetadataRepository) {}
|
||||
|
||||
async save(
|
||||
executionId: string,
|
||||
executionMetadata: Record<string, string>,
|
||||
): Promise<ExecutionMetadata[]> {
|
||||
const metadataRows = [];
|
||||
async save(executionId: string, executionMetadata: Record<string, string>): Promise<void> {
|
||||
const metadataRows: Array<Pick<ExecutionMetadata, 'executionId' | 'key' | 'value'>> = [];
|
||||
for (const [key, value] of Object.entries(executionMetadata)) {
|
||||
metadataRows.push({
|
||||
execution: { id: executionId },
|
||||
executionId,
|
||||
key,
|
||||
value,
|
||||
});
|
||||
}
|
||||
|
||||
return await this.executionMetadataRepository.save(metadataRows);
|
||||
await this.executionMetadataRepository.upsert(metadataRows, {
|
||||
conflictPaths: { executionId: true, key: true },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
import * as testDb from '../shared/testDb';
|
||||
import Container from 'typedi';
|
||||
import { ExecutionMetadataRepository } from '@/databases/repositories/executionMetadata.repository';
|
||||
import { ExecutionMetadataService } from '@/services/executionMetadata.service';
|
||||
import { createExecution } from '@test-integration/db/executions';
|
||||
import { createWorkflow } from '@test-integration/db/workflows';
|
||||
|
||||
let executionMetadataRepository: ExecutionMetadataRepository;
|
||||
let executionMetadataService: ExecutionMetadataService;
|
||||
|
||||
beforeAll(async () => {
|
||||
await testDb.init();
|
||||
|
||||
executionMetadataRepository = Container.get(ExecutionMetadataRepository);
|
||||
executionMetadataService = Container.get(ExecutionMetadataService);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await testDb.terminate();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await testDb.truncate(['User']);
|
||||
});
|
||||
|
||||
describe('ProjectService', () => {
|
||||
describe('save', () => {
|
||||
it('should deduplicate entries by exeuctionId and key, keeping the latest one', async () => {
|
||||
//
|
||||
// ARRANGE
|
||||
//
|
||||
const workflow = await createWorkflow();
|
||||
const execution = await createExecution({}, workflow);
|
||||
const key = 'key';
|
||||
const value1 = 'value1';
|
||||
const value2 = 'value2';
|
||||
|
||||
//
|
||||
// ACT
|
||||
//
|
||||
await executionMetadataService.save(execution.id, { [key]: value1 });
|
||||
await executionMetadataService.save(execution.id, { [key]: value2 });
|
||||
|
||||
//
|
||||
// ASSERT
|
||||
//
|
||||
const rows = await executionMetadataRepository.find({
|
||||
where: { executionId: execution.id, key },
|
||||
});
|
||||
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0]).toHaveProperty('value', value2);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -15,20 +15,26 @@ describe('ExecutionMetadataService', () => {
|
|||
|
||||
await Container.get(ExecutionMetadataService).save(executionId, toSave);
|
||||
|
||||
expect(repository.save).toHaveBeenCalledTimes(1);
|
||||
expect(repository.save.mock.calls[0]).toEqual([
|
||||
expect(repository.upsert).toHaveBeenCalledTimes(1);
|
||||
expect(repository.upsert.mock.calls[0]).toEqual([
|
||||
[
|
||||
{
|
||||
execution: { id: executionId },
|
||||
executionId,
|
||||
key: 'test1',
|
||||
value: 'value1',
|
||||
},
|
||||
{
|
||||
execution: { id: executionId },
|
||||
executionId,
|
||||
key: 'test2',
|
||||
value: 'value2',
|
||||
},
|
||||
],
|
||||
{
|
||||
conflictPaths: {
|
||||
executionId: true,
|
||||
key: true,
|
||||
},
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue