mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-25 20:54:07 -08:00
fix(core): Fix pruning of non-finished executions (#7333)
This fixes a bug in the pruning (soft-delete). The pruning was a bit too aggressive, as it also pruned executions that weren't in an end state yet. This only becomes an issue if there are long-running executions (e.g. workflow with Wait node) or the prune parameters are set to keep only a tiny number of executions.
This commit is contained in:
parent
942d0b91fc
commit
1b4848afcb
|
@ -310,7 +310,7 @@ export const schema = {
|
|||
env: 'EXECUTIONS_DATA_PRUNE',
|
||||
},
|
||||
pruneDataMaxAge: {
|
||||
doc: 'How old (hours) the execution data has to be to get deleted',
|
||||
doc: 'How old (hours) the finished execution data has to be to get deleted',
|
||||
format: Number,
|
||||
default: 336,
|
||||
env: 'EXECUTIONS_DATA_MAX_AGE',
|
||||
|
@ -320,7 +320,7 @@ export const schema = {
|
|||
// Deletes the oldest entries first
|
||||
// Set to 0 for No limit
|
||||
pruneDataMaxCount: {
|
||||
doc: 'Maximum number of executions to keep in DB. 0 = no limit',
|
||||
doc: "Maximum number of finished executions to keep in DB. Doesn't necessarily prune exactly to max number. 0 = no limit",
|
||||
format: Number,
|
||||
default: 10000,
|
||||
env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT',
|
||||
|
|
|
@ -103,4 +103,5 @@ export const TIME = {
|
|||
SECOND: 1000,
|
||||
MINUTE: 60 * 1000,
|
||||
HOUR: 60 * 60 * 1000,
|
||||
DAY: 24 * 60 * 60 * 1000,
|
||||
};
|
||||
|
|
|
@ -1,5 +1,14 @@
|
|||
import { Service } from 'typedi';
|
||||
import { Brackets, DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
|
||||
import {
|
||||
Brackets,
|
||||
DataSource,
|
||||
Not,
|
||||
In,
|
||||
IsNull,
|
||||
LessThanOrEqual,
|
||||
MoreThanOrEqual,
|
||||
Repository,
|
||||
} from 'typeorm';
|
||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||
import type {
|
||||
FindManyOptions,
|
||||
|
@ -110,13 +119,21 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
|||
}
|
||||
|
||||
setSoftDeletionInterval() {
|
||||
this.logger.debug('Setting soft-deletion interval (pruning) for executions');
|
||||
this.logger.debug(
|
||||
`Setting soft-deletion interval (pruning) for executions every ${
|
||||
this.rates.softDeletion / TIME.MINUTE
|
||||
} min`,
|
||||
);
|
||||
|
||||
this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.hardDeletion);
|
||||
this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.softDeletion);
|
||||
}
|
||||
|
||||
setHardDeletionInterval() {
|
||||
this.logger.debug('Setting hard-deletion interval for executions');
|
||||
this.logger.debug(
|
||||
`Setting hard-deletion interval for executions every ${
|
||||
this.rates.hardDeletion / TIME.MINUTE
|
||||
} min`,
|
||||
);
|
||||
|
||||
this.intervals.hardDeletion = setInterval(
|
||||
async () => this.hardDelete(),
|
||||
|
@ -487,7 +504,12 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
|||
await this.createQueryBuilder()
|
||||
.update(ExecutionEntity)
|
||||
.set({ deletedAt: new Date() })
|
||||
.where(
|
||||
.where({
|
||||
deletedAt: IsNull(),
|
||||
// Only mark executions as deleted if they are in an end state
|
||||
status: Not(In(['new', 'running', 'waiting'])),
|
||||
})
|
||||
.andWhere(
|
||||
new Brackets((qb) =>
|
||||
countBasedWhere
|
||||
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
import config from '@/config';
|
||||
import * as Db from '@/Db';
|
||||
|
||||
import * as testDb from '../shared/testDb';
|
||||
import type { ExecutionStatus } from 'n8n-workflow';
|
||||
import { LoggerProxy } from 'n8n-workflow';
|
||||
import { getLogger } from '@/Logger';
|
||||
import type { ExecutionRepository } from '../../../src/databases/repositories';
|
||||
import type { ExecutionEntity } from '../../../src/databases/entities/ExecutionEntity';
|
||||
import { TIME } from '../../../src/constants';
|
||||
|
||||
describe('ExecutionRepository.prune()', () => {
|
||||
const now = new Date();
|
||||
const yesterday = new Date(Date.now() - TIME.DAY);
|
||||
let executionRepository: ExecutionRepository;
|
||||
let workflow: Awaited<ReturnType<typeof testDb.createWorkflow>>;
|
||||
|
||||
beforeAll(async () => {
|
||||
LoggerProxy.init(getLogger());
|
||||
await testDb.init();
|
||||
|
||||
const { Execution } = Db.collections;
|
||||
|
||||
executionRepository = Execution;
|
||||
workflow = await testDb.createWorkflow();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await testDb.truncate(['Execution']);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await testDb.terminate();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
config.load(config.default);
|
||||
});
|
||||
|
||||
async function findAllExecutions() {
|
||||
return Db.collections.Execution.find({
|
||||
order: { id: 'asc' },
|
||||
withDeleted: true,
|
||||
});
|
||||
}
|
||||
|
||||
describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => {
|
||||
beforeEach(() => {
|
||||
config.set('executions.pruneDataMaxCount', 1);
|
||||
config.set('executions.pruneDataMaxAge', 336);
|
||||
});
|
||||
|
||||
test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
|
||||
const executions = [
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: expect.any(Date) }),
|
||||
expect.objectContaining({ id: executions[2].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
|
||||
test('should not re-mark already marked executions', async () => {
|
||||
const executions = [
|
||||
await testDb.createExecution(
|
||||
{ status: 'success', finished: true, startedAt: now, stoppedAt: now, deletedAt: now },
|
||||
workflow,
|
||||
),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: now }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
|
||||
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
|
||||
['warning', { startedAt: now, stoppedAt: now }],
|
||||
['unknown', { startedAt: now, stoppedAt: now }],
|
||||
['canceled', { startedAt: now, stoppedAt: now }],
|
||||
['crashed', { startedAt: now, stoppedAt: now }],
|
||||
['failed', { startedAt: now, stoppedAt: now }],
|
||||
['success', { finished: true, startedAt: now, stoppedAt: now }],
|
||||
])('should prune %s executions', async (status, attributes) => {
|
||||
const executions = [
|
||||
await testDb.createExecution({ status, ...attributes }, workflow),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
|
||||
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
|
||||
['new', {}],
|
||||
['running', { startedAt: now }],
|
||||
['waiting', { startedAt: now, stoppedAt: now, waitTill: now }],
|
||||
])('should not prune %s executions', async (status, attributes) => {
|
||||
const executions = [
|
||||
await testDb.createExecution({ status, ...attributes }, workflow),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: null }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('when EXECUTIONS_DATA_MAX_AGE is set', () => {
|
||||
beforeEach(() => {
|
||||
config.set('executions.pruneDataMaxAge', 1); // 1h
|
||||
config.set('executions.pruneDataMaxCount', 0);
|
||||
});
|
||||
|
||||
test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => {
|
||||
const executions = [
|
||||
await testDb.createExecution(
|
||||
{ finished: true, startedAt: yesterday, stoppedAt: yesterday, status: 'success' },
|
||||
workflow,
|
||||
),
|
||||
await testDb.createExecution(
|
||||
{ finished: true, startedAt: now, stoppedAt: now, status: 'success' },
|
||||
workflow,
|
||||
),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
|
||||
test('should not re-mark already marked executions', async () => {
|
||||
const executions = [
|
||||
await testDb.createExecution(
|
||||
{
|
||||
status: 'success',
|
||||
finished: true,
|
||||
startedAt: yesterday,
|
||||
stoppedAt: yesterday,
|
||||
deletedAt: yesterday,
|
||||
},
|
||||
workflow,
|
||||
),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: yesterday }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
|
||||
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
|
||||
['warning', { startedAt: yesterday, stoppedAt: yesterday }],
|
||||
['unknown', { startedAt: yesterday, stoppedAt: yesterday }],
|
||||
['canceled', { startedAt: yesterday, stoppedAt: yesterday }],
|
||||
['crashed', { startedAt: yesterday, stoppedAt: yesterday }],
|
||||
['failed', { startedAt: yesterday, stoppedAt: yesterday }],
|
||||
['success', { finished: true, startedAt: yesterday, stoppedAt: yesterday }],
|
||||
])('should prune %s executions', async (status, attributes) => {
|
||||
const execution = await testDb.createExecution({ status, ...attributes }, workflow);
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: execution.id, deletedAt: expect.any(Date) }),
|
||||
]);
|
||||
});
|
||||
|
||||
test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
|
||||
['new', {}],
|
||||
['running', { startedAt: yesterday }],
|
||||
['waiting', { startedAt: yesterday, stoppedAt: yesterday, waitTill: yesterday }],
|
||||
])('should not prune %s executions', async (status, attributes) => {
|
||||
const executions = [
|
||||
await testDb.createExecution({ status, ...attributes }, workflow),
|
||||
await testDb.createSuccessfulExecution(workflow),
|
||||
];
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const result = await findAllExecutions();
|
||||
expect(result).toEqual([
|
||||
expect.objectContaining({ id: executions[0].id, deletedAt: null }),
|
||||
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
|
||||
]);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -360,11 +360,11 @@ export async function createManyExecutions(
|
|||
/**
|
||||
* Store a execution in the DB and assign it to a workflow.
|
||||
*/
|
||||
async function createExecution(
|
||||
export async function createExecution(
|
||||
attributes: Partial<ExecutionEntity & ExecutionData>,
|
||||
workflow: WorkflowEntity,
|
||||
) {
|
||||
const { data, finished, mode, startedAt, stoppedAt, waitTill, status } = attributes;
|
||||
const { data, finished, mode, startedAt, stoppedAt, waitTill, status, deletedAt } = attributes;
|
||||
|
||||
const execution = await Db.collections.Execution.save({
|
||||
finished: finished ?? true,
|
||||
|
@ -374,6 +374,7 @@ async function createExecution(
|
|||
stoppedAt: stoppedAt ?? new Date(),
|
||||
waitTill: waitTill ?? null,
|
||||
status,
|
||||
deletedAt,
|
||||
});
|
||||
|
||||
await Db.collections.ExecutionData.save({
|
||||
|
|
|
@ -1,83 +0,0 @@
|
|||
import { Container } from 'typedi';
|
||||
import { DataSource, EntityManager } from 'typeorm';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import { mockInstance } from '../../integration/shared/utils/';
|
||||
import { ExecutionRepository } from '@/databases/repositories';
|
||||
import config from '@/config';
|
||||
import { LoggerProxy } from 'n8n-workflow';
|
||||
import { getLogger } from '@/Logger';
|
||||
import { TIME } from '@/constants';
|
||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||
|
||||
jest.mock('typeorm/util/DateUtils');
|
||||
|
||||
LoggerProxy.init(getLogger());
|
||||
|
||||
const { objectContaining } = expect;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const qb: any = {
|
||||
update: jest.fn().mockReturnThis(),
|
||||
set: jest.fn().mockReturnThis(),
|
||||
where: jest.fn().mockReturnThis(),
|
||||
execute: jest.fn().mockReturnThis(),
|
||||
};
|
||||
|
||||
describe('ExecutionRepository', () => {
|
||||
const entityManager = mockInstance(EntityManager);
|
||||
const dataSource = mockInstance(DataSource, { manager: entityManager });
|
||||
dataSource.getMetadata.mockReturnValue(mock());
|
||||
Object.assign(entityManager, { connection: dataSource });
|
||||
|
||||
const executionRepository = Container.get(ExecutionRepository);
|
||||
|
||||
beforeAll(() => {
|
||||
Container.set(ExecutionRepository, executionRepository);
|
||||
LoggerProxy.init(getLogger());
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
config.load(config.default);
|
||||
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('pruneBySoftDeleting()', () => {
|
||||
test('should limit pruning based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
|
||||
const maxCount = 1;
|
||||
|
||||
config.set('executions.pruneDataMaxCount', maxCount);
|
||||
|
||||
const find = jest.spyOn(ExecutionRepository.prototype, 'find');
|
||||
entityManager.find.mockResolvedValue([]);
|
||||
|
||||
jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb);
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
expect(find.mock.calls[0][0]).toEqual(objectContaining({ skip: maxCount }));
|
||||
});
|
||||
|
||||
test('should limit pruning based on EXECUTIONS_DATA_MAX_AGE', async () => {
|
||||
const maxAge = 5; // hours
|
||||
|
||||
config.set('executions.pruneDataMaxCount', 0); // disable prune-by-count path
|
||||
config.set('executions.pruneDataMaxAge', 5);
|
||||
|
||||
entityManager.find.mockResolvedValue([]);
|
||||
|
||||
jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb);
|
||||
|
||||
const dateFormat = jest.spyOn(DateUtils, 'mixedDateToUtcDatetimeString');
|
||||
|
||||
const now = Date.now();
|
||||
|
||||
await executionRepository.prune();
|
||||
|
||||
const argDate = dateFormat.mock.calls[0][0];
|
||||
const difference = now - argDate.valueOf();
|
||||
|
||||
expect(Math.round(difference / TIME.HOUR)).toBe(maxAge);
|
||||
});
|
||||
});
|
||||
});
|
Loading…
Reference in a new issue