Mirror of https://github.com/n8n-io/n8n.git
fix(core): Fix pruning of non-finished executions (#7333)
This fixes a bug in the pruning (soft-deletion) of executions. Pruning was too aggressive: it also soft-deleted executions that were not yet in an end state. This only becomes an issue with long-running executions (e.g. a workflow with a Wait node) or when the prune parameters are set to keep only a tiny number of executions.
This commit is contained in:
parent 942d0b91fc
commit 1b4848afcb
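In short, an execution is only a pruning candidate once it has reached an end state and has not been soft-deleted already. A minimal TypeScript sketch of that rule (the helper and types are illustrative, not n8n code):

	type SketchStatus =
		| 'new' | 'running' | 'waiting'
		| 'success' | 'failed' | 'crashed' | 'canceled' | 'warning' | 'unknown';

	// Statuses that mean an execution is still in flight and must never be soft-deleted.
	const UNFINISHED: ReadonlyArray<SketchStatus> = ['new', 'running', 'waiting'];

	// Hypothetical helper expressing the rule the fix enforces in the soft-deletion query below.
	function isPruneCandidate(execution: { status: SketchStatus; deletedAt: Date | null }): boolean {
		return execution.deletedAt === null && !UNFINISHED.includes(execution.status);
	}

	// Example: a waiting execution is kept, a finished one may be soft-deleted.
	console.log(isPruneCandidate({ status: 'waiting', deletedAt: null })); // false
	console.log(isPruneCandidate({ status: 'success', deletedAt: null })); // true

The diff below makes this explicit in the config documentation, the pruning schedule, and the soft-deletion query, and replaces the old unit test with an integration test.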
@@ -310,7 +310,7 @@ export const schema = {
 			env: 'EXECUTIONS_DATA_PRUNE',
 		},
 		pruneDataMaxAge: {
-			doc: 'How old (hours) the execution data has to be to get deleted',
+			doc: 'How old (hours) the finished execution data has to be to get deleted',
 			format: Number,
 			default: 336,
 			env: 'EXECUTIONS_DATA_MAX_AGE',
@@ -320,7 +320,7 @@ export const schema = {
 		// Deletes the oldest entries first
 		// Set to 0 for No limit
 		pruneDataMaxCount: {
-			doc: 'Maximum number of executions to keep in DB. 0 = no limit',
+			doc: "Maximum number of finished executions to keep in DB. Doesn't necessarily prune exactly to max number. 0 = no limit",
 			format: Number,
 			default: 10000,
 			env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT',
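Per the query further down, a finished execution is soft-deleted if it is older than the max age or falls outside the newest max-count executions (when the count cap is non-zero). A rough sketch of that interplay using the documented env variables and their defaults (reading process.env directly is only for illustration, not n8n's actual config loader):

	const pruneDataMaxAge = Number(process.env.EXECUTIONS_DATA_MAX_AGE ?? 336); // hours
	const pruneDataMaxCount = Number(process.env.EXECUTIONS_DATA_PRUNE_MAX_COUNT ?? 10000); // 0 = no limit

	// Finished executions stopped before this cutoff are age-based prune candidates.
	const cutoff = new Date(Date.now() - pruneDataMaxAge * 60 * 60 * 1000);
	console.log(`Age-based cutoff: ${cutoff.toISOString()}`);
	if (pruneDataMaxCount > 0) {
		console.log(`Count-based cap: keep roughly the newest ${pruneDataMaxCount} finished executions`);
	}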
@@ -103,4 +103,5 @@ export const TIME = {
 	SECOND: 1000,
 	MINUTE: 60 * 1000,
 	HOUR: 60 * 60 * 1000,
+	DAY: 24 * 60 * 60 * 1000,
 };
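The new DAY constant is used by the integration tests added below to build timestamps one day in the past, e.g.:

	// Mirrors the constant added above; defined locally so the snippet runs on its own.
	const TIME = { DAY: 24 * 60 * 60 * 1000 };
	const yesterday = new Date(Date.now() - TIME.DAY);
	console.log(yesterday.toISOString());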
@@ -1,5 +1,14 @@
 import { Service } from 'typedi';
-import { Brackets, DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
+import {
+	Brackets,
+	DataSource,
+	Not,
+	In,
+	IsNull,
+	LessThanOrEqual,
+	MoreThanOrEqual,
+	Repository,
+} from 'typeorm';
 import { DateUtils } from 'typeorm/util/DateUtils';
 import type {
 	FindManyOptions,
@@ -110,13 +119,21 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
 	}
 
 	setSoftDeletionInterval() {
-		this.logger.debug('Setting soft-deletion interval (pruning) for executions');
+		this.logger.debug(
+			`Setting soft-deletion interval (pruning) for executions every ${
+				this.rates.softDeletion / TIME.MINUTE
+			} min`,
+		);
 
-		this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.hardDeletion);
+		this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.softDeletion);
 	}
 
 	setHardDeletionInterval() {
-		this.logger.debug('Setting hard-deletion interval for executions');
+		this.logger.debug(
+			`Setting hard-deletion interval for executions every ${
+				this.rates.hardDeletion / TIME.MINUTE
+			} min`,
+		);
 
 		this.intervals.hardDeletion = setInterval(
 			async () => this.hardDelete(),
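Besides the more detailed log messages, the key change in this hunk is that the soft-deletion timer now uses its own rate instead of the hard-deletion rate. A sketch of the corrected scheduling (the rate values and logger are illustrative placeholders, not the real repository fields):

	const TIME = { MINUTE: 60 * 1000 };
	// Example values only; the real rates come from the repository's configuration.
	const rates = { softDeletion: 60 * TIME.MINUTE, hardDeletion: 15 * TIME.MINUTE };

	function setSoftDeletionInterval(prune: () => Promise<void>) {
		console.debug(
			`Setting soft-deletion interval (pruning) for executions every ${rates.softDeletion / TIME.MINUTE} min`,
		);
		// Before the fix this passed rates.hardDeletion, so pruning ran on the wrong schedule.
		return setInterval(() => void prune(), rates.softDeletion);
	}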
@@ -487,7 +504,12 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
 		await this.createQueryBuilder()
 			.update(ExecutionEntity)
 			.set({ deletedAt: new Date() })
-			.where(
+			.where({
+				deletedAt: IsNull(),
+				// Only mark executions as deleted if they are in an end state
+				status: Not(In(['new', 'running', 'waiting'])),
+			})
+			.andWhere(
 				new Brackets((qb) =>
 					countBasedWhere
 						? qb.where(timeBasedWhere).orWhere(countBasedWhere)
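The composed WHERE clause now reads as: (deletedAt IS NULL AND status NOT IN ('new', 'running', 'waiting')) AND (time-based condition OR count-based condition). A simplified, self-contained sketch of that composition with TypeORM, assuming only the age-based condition (the entity shape and helper are assumptions, not the real ExecutionRepository):

	import { Brackets, In, IsNull, LessThanOrEqual, Not } from 'typeorm';
	import type { Repository } from 'typeorm';

	type ExecutionRow = { id: string; status: string; stoppedAt: Date; deletedAt: Date | null };

	// Soft-deletes end-state executions stopped more than `maxAgeHours` ago.
	async function softDeleteFinished(repo: Repository<ExecutionRow>, maxAgeHours: number) {
		const timeBasedWhere = {
			stoppedAt: LessThanOrEqual(new Date(Date.now() - maxAgeHours * 60 * 60 * 1000)),
		};
		await repo
			.createQueryBuilder()
			.update()
			.set({ deletedAt: new Date() })
			.where({
				deletedAt: IsNull(),
				// Only executions in an end state may be marked as deleted.
				status: Not(In(['new', 'running', 'waiting'])),
			})
			.andWhere(new Brackets((qb) => qb.where(timeBasedWhere)))
			.execute();
	}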
@@ -0,0 +1,218 @@
+import config from '@/config';
+import * as Db from '@/Db';
+
+import * as testDb from '../shared/testDb';
+import type { ExecutionStatus } from 'n8n-workflow';
+import { LoggerProxy } from 'n8n-workflow';
+import { getLogger } from '@/Logger';
+import type { ExecutionRepository } from '../../../src/databases/repositories';
+import type { ExecutionEntity } from '../../../src/databases/entities/ExecutionEntity';
+import { TIME } from '../../../src/constants';
+
+describe('ExecutionRepository.prune()', () => {
+	const now = new Date();
+	const yesterday = new Date(Date.now() - TIME.DAY);
+	let executionRepository: ExecutionRepository;
+	let workflow: Awaited<ReturnType<typeof testDb.createWorkflow>>;
+
+	beforeAll(async () => {
+		LoggerProxy.init(getLogger());
+		await testDb.init();
+
+		const { Execution } = Db.collections;
+
+		executionRepository = Execution;
+		workflow = await testDb.createWorkflow();
+	});
+
+	beforeEach(async () => {
+		await testDb.truncate(['Execution']);
+	});
+
+	afterAll(async () => {
+		await testDb.terminate();
+	});
+
+	afterEach(() => {
+		config.load(config.default);
+	});
+
+	async function findAllExecutions() {
+		return Db.collections.Execution.find({
+			order: { id: 'asc' },
+			withDeleted: true,
+		});
+	}
+
+	describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => {
+		beforeEach(() => {
+			config.set('executions.pruneDataMaxCount', 1);
+			config.set('executions.pruneDataMaxAge', 336);
+		});
+
+		test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
+			const executions = [
+				await testDb.createSuccessfulExecution(workflow),
+				await testDb.createSuccessfulExecution(workflow),
+				await testDb.createSuccessfulExecution(workflow),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: expect.any(Date) }),
+				expect.objectContaining({ id: executions[2].id, deletedAt: null }),
+			]);
+		});
+
+		test('should not re-mark already marked executions', async () => {
+			const executions = [
+				await testDb.createExecution(
+					{ status: 'success', finished: true, startedAt: now, stoppedAt: now, deletedAt: now },
+					workflow,
+				),
+				await testDb.createSuccessfulExecution(workflow),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: now }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: null }),
+			]);
+		});
+
+		test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
+			['warning', { startedAt: now, stoppedAt: now }],
+			['unknown', { startedAt: now, stoppedAt: now }],
+			['canceled', { startedAt: now, stoppedAt: now }],
+			['crashed', { startedAt: now, stoppedAt: now }],
+			['failed', { startedAt: now, stoppedAt: now }],
+			['success', { finished: true, startedAt: now, stoppedAt: now }],
+		])('should prune %s executions', async (status, attributes) => {
+			const executions = [
+				await testDb.createExecution({ status, ...attributes }, workflow),
+				await testDb.createSuccessfulExecution(workflow),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: null }),
+			]);
+		});
+
+		test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
+			['new', {}],
+			['running', { startedAt: now }],
+			['waiting', { startedAt: now, stoppedAt: now, waitTill: now }],
+		])('should not prune %s executions', async (status, attributes) => {
+			const executions = [
+				await testDb.createExecution({ status, ...attributes }, workflow),
+				await testDb.createSuccessfulExecution(workflow),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: null }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: null }),
+			]);
+		});
+	});
+
+	describe('when EXECUTIONS_DATA_MAX_AGE is set', () => {
+		beforeEach(() => {
+			config.set('executions.pruneDataMaxAge', 1); // 1h
+			config.set('executions.pruneDataMaxCount', 0);
+		});
+
+		test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => {
+			const executions = [
+				await testDb.createExecution(
+					{ finished: true, startedAt: yesterday, stoppedAt: yesterday, status: 'success' },
+					workflow,
+				),
+				await testDb.createExecution(
+					{ finished: true, startedAt: now, stoppedAt: now, status: 'success' },
+					workflow,
+				),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: null }),
+			]);
+		});
+
+		test('should not re-mark already marked executions', async () => {
+			const executions = [
+				await testDb.createExecution(
+					{
+						status: 'success',
+						finished: true,
+						startedAt: yesterday,
+						stoppedAt: yesterday,
+						deletedAt: yesterday,
+					},
+					workflow,
+				),
+				await testDb.createSuccessfulExecution(workflow),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: yesterday }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: null }),
+			]);
+		});
+
+		test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
+			['warning', { startedAt: yesterday, stoppedAt: yesterday }],
+			['unknown', { startedAt: yesterday, stoppedAt: yesterday }],
+			['canceled', { startedAt: yesterday, stoppedAt: yesterday }],
+			['crashed', { startedAt: yesterday, stoppedAt: yesterday }],
+			['failed', { startedAt: yesterday, stoppedAt: yesterday }],
+			['success', { finished: true, startedAt: yesterday, stoppedAt: yesterday }],
+		])('should prune %s executions', async (status, attributes) => {
+			const execution = await testDb.createExecution({ status, ...attributes }, workflow);
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: execution.id, deletedAt: expect.any(Date) }),
+			]);
+		});
+
+		test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
+			['new', {}],
+			['running', { startedAt: yesterday }],
+			['waiting', { startedAt: yesterday, stoppedAt: yesterday, waitTill: yesterday }],
+		])('should not prune %s executions', async (status, attributes) => {
+			const executions = [
+				await testDb.createExecution({ status, ...attributes }, workflow),
+				await testDb.createSuccessfulExecution(workflow),
+			];
+
+			await executionRepository.prune();
+
+			const result = await findAllExecutions();
+			expect(result).toEqual([
+				expect.objectContaining({ id: executions[0].id, deletedAt: null }),
+				expect.objectContaining({ id: executions[1].id, deletedAt: null }),
+			]);
+		});
+	});
+});
@@ -360,11 +360,11 @@ export async function createManyExecutions(
 /**
  * Store a execution in the DB and assign it to a workflow.
  */
-async function createExecution(
+export async function createExecution(
 	attributes: Partial<ExecutionEntity & ExecutionData>,
 	workflow: WorkflowEntity,
 ) {
-	const { data, finished, mode, startedAt, stoppedAt, waitTill, status } = attributes;
+	const { data, finished, mode, startedAt, stoppedAt, waitTill, status, deletedAt } = attributes;
 
 	const execution = await Db.collections.Execution.save({
 		finished: finished ?? true,
@@ -374,6 +374,7 @@ async function createExecution(
 		stoppedAt: stoppedAt ?? new Date(),
 		waitTill: waitTill ?? null,
 		status,
+		deletedAt,
 	});
 
 	await Db.collections.ExecutionData.save({
@@ -1,83 +0,0 @@
-import { Container } from 'typedi';
-import { DataSource, EntityManager } from 'typeorm';
-import { mock } from 'jest-mock-extended';
-import { mockInstance } from '../../integration/shared/utils/';
-import { ExecutionRepository } from '@/databases/repositories';
-import config from '@/config';
-import { LoggerProxy } from 'n8n-workflow';
-import { getLogger } from '@/Logger';
-import { TIME } from '@/constants';
-import { DateUtils } from 'typeorm/util/DateUtils';
-
-jest.mock('typeorm/util/DateUtils');
-
-LoggerProxy.init(getLogger());
-
-const { objectContaining } = expect;
-
-// eslint-disable-next-line @typescript-eslint/no-explicit-any
-const qb: any = {
-	update: jest.fn().mockReturnThis(),
-	set: jest.fn().mockReturnThis(),
-	where: jest.fn().mockReturnThis(),
-	execute: jest.fn().mockReturnThis(),
-};
-
-describe('ExecutionRepository', () => {
-	const entityManager = mockInstance(EntityManager);
-	const dataSource = mockInstance(DataSource, { manager: entityManager });
-	dataSource.getMetadata.mockReturnValue(mock());
-	Object.assign(entityManager, { connection: dataSource });
-
-	const executionRepository = Container.get(ExecutionRepository);
-
-	beforeAll(() => {
-		Container.set(ExecutionRepository, executionRepository);
-		LoggerProxy.init(getLogger());
-	});
-
-	beforeEach(() => {
-		config.load(config.default);
-
-		jest.clearAllMocks();
-	});
-
-	describe('pruneBySoftDeleting()', () => {
-		test('should limit pruning based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
-			const maxCount = 1;
-
-			config.set('executions.pruneDataMaxCount', maxCount);
-
-			const find = jest.spyOn(ExecutionRepository.prototype, 'find');
-			entityManager.find.mockResolvedValue([]);
-
-			jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb);
-
-			await executionRepository.prune();
-
-			expect(find.mock.calls[0][0]).toEqual(objectContaining({ skip: maxCount }));
-		});
-
-		test('should limit pruning based on EXECUTIONS_DATA_MAX_AGE', async () => {
-			const maxAge = 5; // hours
-
-			config.set('executions.pruneDataMaxCount', 0); // disable prune-by-count path
-			config.set('executions.pruneDataMaxAge', 5);
-
-			entityManager.find.mockResolvedValue([]);
-
-			jest.spyOn(ExecutionRepository.prototype, 'createQueryBuilder').mockReturnValueOnce(qb);
-
-			const dateFormat = jest.spyOn(DateUtils, 'mixedDateToUtcDatetimeString');
-
-			const now = Date.now();
-
-			await executionRepository.prune();
-
-			const argDate = dateFormat.mock.calls[0][0];
-			const difference = now - argDate.valueOf();
-
-			expect(Math.round(difference / TIME.HOUR)).toBe(maxAge);
-		});
-	});
-});