fix(core): Fix pruning of non-finished executions (#7333)

This fixes a bug in the pruning (soft-delete). The pruning was a bit too
aggressive, as it also pruned executions that weren't in an end state
yet. This only becomes an issue if there are long-running executions
(e.g. workflow with Wait node) or the prune parameters are set to keep
only a tiny number of executions.
This commit is contained in:
Tomi Turtiainen 2023-10-04 16:32:05 +03:00 committed by GitHub
parent 942d0b91fc
commit 1b4848afcb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 251 additions and 92 deletions

View file

@ -310,7 +310,7 @@ export const schema = {
env: 'EXECUTIONS_DATA_PRUNE',
},
pruneDataMaxAge: {
doc: 'How old (hours) the execution data has to be to get deleted',
doc: 'How old (hours) the finished execution data has to be to get deleted',
format: Number,
default: 336,
env: 'EXECUTIONS_DATA_MAX_AGE',
@ -320,7 +320,7 @@ export const schema = {
// Deletes the oldest entries first
// Set to 0 for No limit
pruneDataMaxCount: {
doc: 'Maximum number of executions to keep in DB. 0 = no limit',
doc: "Maximum number of finished executions to keep in DB. Doesn't necessarily prune exactly to max number. 0 = no limit",
format: Number,
default: 10000,
env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT',

View file

@ -103,4 +103,5 @@ export const TIME = {
SECOND: 1000,
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
DAY: 24 * 60 * 60 * 1000,
};

View file

@ -1,5 +1,14 @@
import { Service } from 'typedi';
import { Brackets, DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
import {
Brackets,
DataSource,
Not,
In,
IsNull,
LessThanOrEqual,
MoreThanOrEqual,
Repository,
} from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import type {
FindManyOptions,
@ -110,13 +119,21 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}
setSoftDeletionInterval() {
this.logger.debug('Setting soft-deletion interval (pruning) for executions');
this.logger.debug(
`Setting soft-deletion interval (pruning) for executions every ${
this.rates.softDeletion / TIME.MINUTE
} min`,
);
this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.hardDeletion);
this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.softDeletion);
}
setHardDeletionInterval() {
this.logger.debug('Setting hard-deletion interval for executions');
this.logger.debug(
`Setting hard-deletion interval for executions every ${
this.rates.hardDeletion / TIME.MINUTE
} min`,
);
this.intervals.hardDeletion = setInterval(
async () => this.hardDelete(),
@ -487,7 +504,12 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await this.createQueryBuilder()
.update(ExecutionEntity)
.set({ deletedAt: new Date() })
.where(
.where({
deletedAt: IsNull(),
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
.andWhere(
new Brackets((qb) =>
countBasedWhere
? qb.where(timeBasedWhere).orWhere(countBasedWhere)

View file

@ -0,0 +1,218 @@
import config from '@/config';
import * as Db from '@/Db';
import * as testDb from '../shared/testDb';
import type { ExecutionStatus } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import type { ExecutionRepository } from '../../../src/databases/repositories';
import type { ExecutionEntity } from '../../../src/databases/entities/ExecutionEntity';
import { TIME } from '../../../src/constants';
describe('ExecutionRepository.prune()', () => {
const now = new Date();
const yesterday = new Date(Date.now() - TIME.DAY);
let executionRepository: ExecutionRepository;
let workflow: Awaited<ReturnType<typeof testDb.createWorkflow>>;
beforeAll(async () => {
LoggerProxy.init(getLogger());
await testDb.init();
const { Execution } = Db.collections;
executionRepository = Execution;
workflow = await testDb.createWorkflow();
});
beforeEach(async () => {
await testDb.truncate(['Execution']);
});
afterAll(async () => {
await testDb.terminate();
});
afterEach(() => {
config.load(config.default);
});
async function findAllExecutions() {
return Db.collections.Execution.find({
order: { id: 'asc' },
withDeleted: true,
});
}
describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => {
beforeEach(() => {
config.set('executions.pruneDataMaxCount', 1);
config.set('executions.pruneDataMaxAge', 336);
});
test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
const executions = [
await testDb.createSuccessfulExecution(workflow),
await testDb.createSuccessfulExecution(workflow),
await testDb.createSuccessfulExecution(workflow),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[1].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[2].id, deletedAt: null }),
]);
});
test('should not re-mark already marked executions', async () => {
const executions = [
await testDb.createExecution(
{ status: 'success', finished: true, startedAt: now, stoppedAt: now, deletedAt: now },
workflow,
),
await testDb.createSuccessfulExecution(workflow),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: now }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['warning', { startedAt: now, stoppedAt: now }],
['unknown', { startedAt: now, stoppedAt: now }],
['canceled', { startedAt: now, stoppedAt: now }],
['crashed', { startedAt: now, stoppedAt: now }],
['failed', { startedAt: now, stoppedAt: now }],
['success', { finished: true, startedAt: now, stoppedAt: now }],
])('should prune %s executions', async (status, attributes) => {
const executions = [
await testDb.createExecution({ status, ...attributes }, workflow),
await testDb.createSuccessfulExecution(workflow),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['new', {}],
['running', { startedAt: now }],
['waiting', { startedAt: now, stoppedAt: now, waitTill: now }],
])('should not prune %s executions', async (status, attributes) => {
const executions = [
await testDb.createExecution({ status, ...attributes }, workflow),
await testDb.createSuccessfulExecution(workflow),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: null }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
});
describe('when EXECUTIONS_DATA_MAX_AGE is set', () => {
beforeEach(() => {
config.set('executions.pruneDataMaxAge', 1); // 1h
config.set('executions.pruneDataMaxCount', 0);
});
test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => {
const executions = [
await testDb.createExecution(
{ finished: true, startedAt: yesterday, stoppedAt: yesterday, status: 'success' },
workflow,
),
await testDb.createExecution(
{ finished: true, startedAt: now, stoppedAt: now, status: 'success' },
workflow,
),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
test('should not re-mark already marked executions', async () => {
const executions = [
await testDb.createExecution(
{
status: 'success',
finished: true,
startedAt: yesterday,
stoppedAt: yesterday,
deletedAt: yesterday,
},
workflow,
),
await testDb.createSuccessfulExecution(workflow),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: yesterday }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['warning', { startedAt: yesterday, stoppedAt: yesterday }],
['unknown', { startedAt: yesterday, stoppedAt: yesterday }],
['canceled', { startedAt: yesterday, stoppedAt: yesterday }],
['crashed', { startedAt: yesterday, stoppedAt: yesterday }],
['failed', { startedAt: yesterday, stoppedAt: yesterday }],
['success', { finished: true, startedAt: yesterday, stoppedAt: yesterday }],
])('should prune %s executions', async (status, attributes) => {
const execution = await testDb.createExecution({ status, ...attributes }, workflow);
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: execution.id, deletedAt: expect.any(Date) }),
]);
});
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['new', {}],
['running', { startedAt: yesterday }],
['waiting', { startedAt: yesterday, stoppedAt: yesterday, waitTill: yesterday }],
])('should not prune %s executions', async (status, attributes) => {
const executions = [
await testDb.createExecution({ status, ...attributes }, workflow),
await testDb.createSuccessfulExecution(workflow),
];
await executionRepository.prune();
const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: null }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
});
});

View file

@ -360,11 +360,11 @@ export async function createManyExecutions(
/**
* Store a execution in the DB and assign it to a workflow.
*/
async function createExecution(
export async function createExecution(
attributes: Partial<ExecutionEntity & ExecutionData>,
workflow: WorkflowEntity,
) {
const { data, finished, mode, startedAt, stoppedAt, waitTill, status } = attributes;
const { data, finished, mode, startedAt, stoppedAt, waitTill, status, deletedAt } = attributes;
const execution = await Db.collections.Execution.save({
finished: finished ?? true,
@ -374,6 +374,7 @@ async function createExecution(
stoppedAt: stoppedAt ?? new Date(),
waitTill: waitTill ?? null,
status,
deletedAt,
});
await Db.collections.ExecutionData.save({

View file

@ -1,83 +0,0 @@
import { Container } from 'typedi';
import { DataSource, EntityManager } from 'typeorm';
import { mock } from 'jest-mock-extended';
import { mockInstance } from '../../integration/shared/utils/';
import { ExecutionRepository } from '@/databases/repositories';
import config from '@/config';
import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import { TIME } from '@/constants';
import { DateUtils } from 'typeorm/util/DateUtils';
jest.mock('typeorm/util/DateUtils');
LoggerProxy.init(getLogger());
const { objectContaining } = expect;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const qb: any = {
update: jest.fn().mockReturnThis(),
set: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
execute: jest.fn().mockReturnThis(),
};
describe('ExecutionRepository', () => {
const entityManager = mockInstance(EntityManager);
const dataSource = mockInstance(DataSource, { manager: entityManager });
dataSource.getMetadata.mockReturnValue(mock());
Object.assign(entityManager, { connection: dataSource });
const executionRepository = Container.get(ExecutionRepository);
beforeAll(() => {
Container.set(ExecutionRepository, executionRepository);
LoggerProxy.init(getLogger());
});
beforeEach(() => {
config.load(config.default);
jest.clearAllMocks();
});
describe('pruneBySoftDeleting()', () => {
test('should limit pruning based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
const maxCount = 1;
config.set('executions.pruneDataMaxCount', maxCount);
const find = jest.spyOn(ExecutionRepository.prototype, 'find');
entityManager.find.mockResolvedValue([]);
jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb);
await executionRepository.prune();
expect(find.mock.calls[0][0]).toEqual(objectContaining({ skip: maxCount }));
});
test('should limit pruning based on EXECUTIONS_DATA_MAX_AGE', async () => {
const maxAge = 5; // hours
config.set('executions.pruneDataMaxCount', 0); // disable prune-by-count path
config.set('executions.pruneDataMaxAge', 5);
entityManager.find.mockResolvedValue([]);
jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb);
const dateFormat = jest.spyOn(DateUtils, 'mixedDateToUtcDatetimeString');
const now = Date.now();
await executionRepository.prune();
const argDate = dateFormat.mock.calls[0][0];
const difference = now - argDate.valueOf();
expect(Math.round(difference / TIME.HOUR)).toBe(maxAge);
});
});
});