mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-24 20:24:05 -08:00
refactor(core): Create pruning service (no-changelog) (#7564)
https://linear.app/n8n/issue/PAY-954/ensure-only-main-instance-leader-handles-pruning
This commit is contained in:
parent
be49778388
commit
921d213ae5
|
@ -41,6 +41,7 @@ import {
|
||||||
WorkflowStatisticsRepository,
|
WorkflowStatisticsRepository,
|
||||||
WorkflowTagMappingRepository,
|
WorkflowTagMappingRepository,
|
||||||
} from '@db/repositories';
|
} from '@db/repositories';
|
||||||
|
import { PruningService } from '@/services/pruning.service';
|
||||||
|
|
||||||
export const collections = {} as IDatabaseCollections;
|
export const collections = {} as IDatabaseCollections;
|
||||||
|
|
||||||
|
@ -191,6 +192,10 @@ export async function init(testConnectionOptions?: ConnectionOptions): Promise<v
|
||||||
collections.Settings = Container.get(SettingsRepository);
|
collections.Settings = Container.get(SettingsRepository);
|
||||||
collections.Credentials = Container.get(CredentialsRepository);
|
collections.Credentials = Container.get(CredentialsRepository);
|
||||||
collections.Workflow = Container.get(WorkflowRepository);
|
collections.Workflow = Container.get(WorkflowRepository);
|
||||||
|
|
||||||
|
const pruningService = Container.get(PruningService);
|
||||||
|
|
||||||
|
if (await pruningService.isPruningEnabled()) pruningService.startPruning();
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function migrate() {
|
export async function migrate() {
|
||||||
|
|
|
@ -26,10 +26,10 @@ import { eventBus } from '@/eventbus';
|
||||||
import { BaseCommand } from './BaseCommand';
|
import { BaseCommand } from './BaseCommand';
|
||||||
import { InternalHooks } from '@/InternalHooks';
|
import { InternalHooks } from '@/InternalHooks';
|
||||||
import { License, FeatureNotLicensedError } from '@/License';
|
import { License, FeatureNotLicensedError } from '@/License';
|
||||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
|
||||||
import { IConfig } from '@oclif/config';
|
import { IConfig } from '@oclif/config';
|
||||||
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
import { SingleMainInstancePublisher } from '@/services/orchestration/main/SingleMainInstance.publisher';
|
||||||
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
|
||||||
|
import { PruningService } from '@/services/pruning.service';
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
|
||||||
const open = require('open');
|
const open = require('open');
|
||||||
|
@ -110,7 +110,9 @@ export class Start extends BaseCommand {
|
||||||
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
|
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
|
||||||
await Container.get(License).shutdown();
|
await Container.get(License).shutdown();
|
||||||
|
|
||||||
Container.get(ExecutionRepository).clearTimers();
|
const pruningService = Container.get(PruningService);
|
||||||
|
|
||||||
|
if (await pruningService.isPruningEnabled()) await pruningService.stopPruning();
|
||||||
|
|
||||||
if (config.getEnv('leaderSelection.enabled')) {
|
if (config.getEnv('leaderSelection.enabled')) {
|
||||||
const { MultiMainInstancePublisher } = await import(
|
const { MultiMainInstancePublisher } = await import(
|
||||||
|
|
|
@ -1,14 +1,5 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import {
|
import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
|
||||||
Brackets,
|
|
||||||
DataSource,
|
|
||||||
Not,
|
|
||||||
In,
|
|
||||||
IsNull,
|
|
||||||
LessThanOrEqual,
|
|
||||||
MoreThanOrEqual,
|
|
||||||
Repository,
|
|
||||||
} from 'typeorm';
|
|
||||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||||
import type {
|
import type {
|
||||||
FindManyOptions,
|
FindManyOptions,
|
||||||
|
@ -33,7 +24,6 @@ import type { ExecutionData } from '../entities/ExecutionData';
|
||||||
import { ExecutionEntity } from '../entities/ExecutionEntity';
|
import { ExecutionEntity } from '../entities/ExecutionEntity';
|
||||||
import { ExecutionMetadata } from '../entities/ExecutionMetadata';
|
import { ExecutionMetadata } from '../entities/ExecutionMetadata';
|
||||||
import { ExecutionDataRepository } from './executionData.repository';
|
import { ExecutionDataRepository } from './executionData.repository';
|
||||||
import { TIME, inTest } from '@/constants';
|
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
|
||||||
function parseFiltersToQueryBuilder(
|
function parseFiltersToQueryBuilder(
|
||||||
|
@ -79,19 +69,6 @@ function parseFiltersToQueryBuilder(
|
||||||
export class ExecutionRepository extends Repository<ExecutionEntity> {
|
export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
private hardDeletionBatchSize = 100;
|
private hardDeletionBatchSize = 100;
|
||||||
|
|
||||||
private rates: Record<string, number> = {
|
|
||||||
softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE,
|
|
||||||
hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE,
|
|
||||||
};
|
|
||||||
|
|
||||||
private softDeletionInterval: NodeJS.Timer | undefined;
|
|
||||||
|
|
||||||
private hardDeletionTimeout: NodeJS.Timeout | undefined;
|
|
||||||
|
|
||||||
private isMainInstance = config.get('generic.instanceType') === 'main';
|
|
||||||
|
|
||||||
private isPruningEnabled = config.getEnv('executions.pruneData');
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
dataSource: DataSource,
|
dataSource: DataSource,
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
|
@ -99,43 +76,6 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
private readonly binaryDataService: BinaryDataService,
|
private readonly binaryDataService: BinaryDataService,
|
||||||
) {
|
) {
|
||||||
super(ExecutionEntity, dataSource.manager);
|
super(ExecutionEntity, dataSource.manager);
|
||||||
|
|
||||||
if (!this.isMainInstance || inTest) return;
|
|
||||||
|
|
||||||
if (this.isPruningEnabled) this.setSoftDeletionInterval();
|
|
||||||
|
|
||||||
this.scheduleHardDeletion();
|
|
||||||
}
|
|
||||||
|
|
||||||
clearTimers() {
|
|
||||||
if (!this.isMainInstance) return;
|
|
||||||
|
|
||||||
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
|
|
||||||
|
|
||||||
clearInterval(this.softDeletionInterval);
|
|
||||||
clearTimeout(this.hardDeletionTimeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
|
|
||||||
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
|
||||||
|
|
||||||
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
|
|
||||||
|
|
||||||
this.softDeletionInterval = setInterval(
|
|
||||||
async () => this.softDeleteOnPruningCycle(),
|
|
||||||
this.rates.softDeletion,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
|
|
||||||
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
|
||||||
|
|
||||||
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
|
|
||||||
|
|
||||||
this.hardDeletionTimeout = setTimeout(
|
|
||||||
async () => this.hardDeleteOnPruningCycle(),
|
|
||||||
this.rates.hardDeletion,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async findMultipleExecutions(
|
async findMultipleExecutions(
|
||||||
|
@ -478,115 +418,4 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
await this.delete(batch);
|
await this.delete(batch);
|
||||||
} while (executionIds.length > 0);
|
} while (executionIds.length > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Mark executions as deleted based on age and count, in a pruning cycle.
|
|
||||||
*/
|
|
||||||
async softDeleteOnPruningCycle() {
|
|
||||||
this.logger.debug('Starting soft-deletion of executions (pruning cycle)');
|
|
||||||
|
|
||||||
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
|
||||||
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
|
||||||
|
|
||||||
// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
|
|
||||||
const date = new Date();
|
|
||||||
date.setHours(date.getHours() - maxAge);
|
|
||||||
|
|
||||||
const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
|
|
||||||
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
|
|
||||||
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
|
|
||||||
];
|
|
||||||
|
|
||||||
if (maxCount > 0) {
|
|
||||||
const executions = await this.find({
|
|
||||||
select: ['id'],
|
|
||||||
skip: maxCount,
|
|
||||||
take: 1,
|
|
||||||
order: { id: 'DESC' },
|
|
||||||
});
|
|
||||||
|
|
||||||
if (executions[0]) {
|
|
||||||
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const [timeBasedWhere, countBasedWhere] = toPrune;
|
|
||||||
|
|
||||||
const result = await this.createQueryBuilder()
|
|
||||||
.update(ExecutionEntity)
|
|
||||||
.set({ deletedAt: new Date() })
|
|
||||||
.where({
|
|
||||||
deletedAt: IsNull(),
|
|
||||||
// Only mark executions as deleted if they are in an end state
|
|
||||||
status: Not(In(['new', 'running', 'waiting'])),
|
|
||||||
})
|
|
||||||
.andWhere(
|
|
||||||
new Brackets((qb) =>
|
|
||||||
countBasedWhere
|
|
||||||
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
|
|
||||||
: qb.where(timeBasedWhere),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
if (result.affected === 0) {
|
|
||||||
this.logger.debug('Found no executions to soft-delete (pruning cycle)');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
|
|
||||||
*/
|
|
||||||
private async hardDeleteOnPruningCycle() {
|
|
||||||
const date = new Date();
|
|
||||||
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));
|
|
||||||
|
|
||||||
const workflowIdsAndExecutionIds = (
|
|
||||||
await this.find({
|
|
||||||
select: ['workflowId', 'id'],
|
|
||||||
where: {
|
|
||||||
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
|
||||||
},
|
|
||||||
take: this.hardDeletionBatchSize,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @important This ensures soft-deleted executions are included,
|
|
||||||
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
|
|
||||||
*/
|
|
||||||
withDeleted: true,
|
|
||||||
})
|
|
||||||
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));
|
|
||||||
|
|
||||||
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
|
||||||
|
|
||||||
if (executionIds.length === 0) {
|
|
||||||
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
|
|
||||||
this.scheduleHardDeletion();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
|
|
||||||
executionIds,
|
|
||||||
});
|
|
||||||
|
|
||||||
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
|
||||||
|
|
||||||
await this.delete({ id: In(executionIds) });
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
|
|
||||||
executionIds,
|
|
||||||
error: error instanceof Error ? error.message : `${error}`,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* For next batch, speed up hard-deletion cycle in high-volume case
|
|
||||||
* to prevent high concurrency from causing duplicate deletions.
|
|
||||||
*/
|
|
||||||
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
|
|
||||||
const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
|
|
||||||
|
|
||||||
this.scheduleHardDeletion(rate);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,10 +13,14 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher {
|
||||||
|
|
||||||
private leaderId: string | undefined;
|
private leaderId: string | undefined;
|
||||||
|
|
||||||
private get isLeader() {
|
public get isLeader() {
|
||||||
return this.id === this.leaderId;
|
return this.id === this.leaderId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public get isFollower() {
|
||||||
|
return !this.isLeader;
|
||||||
|
}
|
||||||
|
|
||||||
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
private readonly leaderKey = getRedisPrefix() + ':main_instance_leader';
|
||||||
|
|
||||||
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
|
private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl');
|
||||||
|
|
208
packages/cli/src/services/pruning.service.ts
Normal file
208
packages/cli/src/services/pruning.service.ts
Normal file
|
@ -0,0 +1,208 @@
|
||||||
|
import Container, { Service } from 'typedi';
|
||||||
|
import { BinaryDataService } from 'n8n-core';
|
||||||
|
import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm';
|
||||||
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||||
|
import type { FindOptionsWhere } from 'typeorm';
|
||||||
|
|
||||||
|
import { TIME, inTest } from '@/constants';
|
||||||
|
import config from '@/config';
|
||||||
|
import { ExecutionRepository } from '@/databases/repositories';
|
||||||
|
import { Logger } from '@/Logger';
|
||||||
|
import { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
|
||||||
|
|
||||||
|
@Service()
|
||||||
|
export class PruningService {
|
||||||
|
private hardDeletionBatchSize = 100;
|
||||||
|
|
||||||
|
private rates: Record<string, number> = {
|
||||||
|
softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE,
|
||||||
|
hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE,
|
||||||
|
};
|
||||||
|
|
||||||
|
public softDeletionInterval: NodeJS.Timer | undefined;
|
||||||
|
|
||||||
|
public hardDeletionTimeout: NodeJS.Timeout | undefined;
|
||||||
|
|
||||||
|
private isMultiMainScenario =
|
||||||
|
config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled');
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly logger: Logger,
|
||||||
|
private readonly executionRepository: ExecutionRepository,
|
||||||
|
private readonly binaryDataService: BinaryDataService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
async isPruningEnabled() {
|
||||||
|
if (
|
||||||
|
!config.getEnv('executions.pruneData') ||
|
||||||
|
inTest ||
|
||||||
|
config.get('generic.instanceType') !== 'main'
|
||||||
|
) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.isMultiMainScenario) {
|
||||||
|
const { MultiMainInstancePublisher } = await import(
|
||||||
|
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||||
|
);
|
||||||
|
|
||||||
|
return Container.get(MultiMainInstancePublisher).isLeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
startPruning() {
|
||||||
|
this.setSoftDeletionInterval();
|
||||||
|
this.scheduleHardDeletion();
|
||||||
|
}
|
||||||
|
|
||||||
|
async stopPruning() {
|
||||||
|
if (this.isMultiMainScenario) {
|
||||||
|
const { MultiMainInstancePublisher } = await import(
|
||||||
|
'@/services/orchestration/main/MultiMainInstance.publisher.ee'
|
||||||
|
);
|
||||||
|
|
||||||
|
if (Container.get(MultiMainInstancePublisher).isFollower) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
|
||||||
|
|
||||||
|
clearInterval(this.softDeletionInterval);
|
||||||
|
clearTimeout(this.hardDeletionTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
|
||||||
|
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
||||||
|
|
||||||
|
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
|
||||||
|
|
||||||
|
this.softDeletionInterval = setInterval(
|
||||||
|
async () => this.softDeleteOnPruningCycle(),
|
||||||
|
this.rates.softDeletion,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
|
||||||
|
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
||||||
|
|
||||||
|
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
|
||||||
|
|
||||||
|
this.hardDeletionTimeout = setTimeout(
|
||||||
|
async () => this.hardDeleteOnPruningCycle(),
|
||||||
|
this.rates.hardDeletion,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark executions as deleted based on age and count, in a pruning cycle.
|
||||||
|
*/
|
||||||
|
async softDeleteOnPruningCycle() {
|
||||||
|
this.logger.debug('Starting soft-deletion of executions (pruning cycle)');
|
||||||
|
|
||||||
|
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
||||||
|
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
||||||
|
|
||||||
|
// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
|
||||||
|
const date = new Date();
|
||||||
|
date.setHours(date.getHours() - maxAge);
|
||||||
|
|
||||||
|
const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
|
||||||
|
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
|
||||||
|
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
|
||||||
|
];
|
||||||
|
|
||||||
|
if (maxCount > 0) {
|
||||||
|
const executions = await this.executionRepository.find({
|
||||||
|
select: ['id'],
|
||||||
|
skip: maxCount,
|
||||||
|
take: 1,
|
||||||
|
order: { id: 'DESC' },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (executions[0]) {
|
||||||
|
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const [timeBasedWhere, countBasedWhere] = toPrune;
|
||||||
|
|
||||||
|
const result = await this.executionRepository
|
||||||
|
.createQueryBuilder()
|
||||||
|
.update(ExecutionEntity)
|
||||||
|
.set({ deletedAt: new Date() })
|
||||||
|
.where({
|
||||||
|
deletedAt: IsNull(),
|
||||||
|
// Only mark executions as deleted if they are in an end state
|
||||||
|
status: Not(In(['new', 'running', 'waiting'])),
|
||||||
|
})
|
||||||
|
.andWhere(
|
||||||
|
new Brackets((qb) =>
|
||||||
|
countBasedWhere
|
||||||
|
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
|
||||||
|
: qb.where(timeBasedWhere),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
if (result.affected === 0) {
|
||||||
|
this.logger.debug('Found no executions to soft-delete (pruning cycle)');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
|
||||||
|
*/
|
||||||
|
private async hardDeleteOnPruningCycle() {
|
||||||
|
const date = new Date();
|
||||||
|
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));
|
||||||
|
|
||||||
|
const workflowIdsAndExecutionIds = (
|
||||||
|
await this.executionRepository.find({
|
||||||
|
select: ['workflowId', 'id'],
|
||||||
|
where: {
|
||||||
|
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
||||||
|
},
|
||||||
|
take: this.hardDeletionBatchSize,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @important This ensures soft-deleted executions are included,
|
||||||
|
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
|
||||||
|
*/
|
||||||
|
withDeleted: true,
|
||||||
|
})
|
||||||
|
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));
|
||||||
|
|
||||||
|
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
||||||
|
|
||||||
|
if (executionIds.length === 0) {
|
||||||
|
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
|
||||||
|
this.scheduleHardDeletion();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
|
||||||
|
executionIds,
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
||||||
|
|
||||||
|
await this.executionRepository.delete({ id: In(executionIds) });
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
|
||||||
|
executionIds,
|
||||||
|
error: error instanceof Error ? error.message : `${error}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For next batch, speed up hard-deletion cycle in high-volume case
|
||||||
|
* to prevent high concurrency from causing duplicate deletions.
|
||||||
|
*/
|
||||||
|
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
|
||||||
|
const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
|
||||||
|
|
||||||
|
this.scheduleHardDeletion(rate);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,24 +1,31 @@
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import * as Db from '@/Db';
|
import * as Db from '@/Db';
|
||||||
|
|
||||||
import * as testDb from '../shared/testDb';
|
import * as testDb from './shared/testDb';
|
||||||
import type { ExecutionStatus } from 'n8n-workflow';
|
import type { ExecutionStatus } from 'n8n-workflow';
|
||||||
import type { ExecutionRepository } from '@/databases/repositories';
|
|
||||||
import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
|
import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity';
|
||||||
import { TIME } from '@/constants';
|
import { TIME } from '@/constants';
|
||||||
|
import { PruningService } from '@/services/pruning.service';
|
||||||
|
import { BinaryDataService } from 'n8n-core';
|
||||||
|
import { Logger } from '@/Logger';
|
||||||
|
import { mockInstance } from './shared/utils';
|
||||||
|
|
||||||
describe('softDeleteOnPruningCycle()', () => {
|
describe('softDeleteOnPruningCycle()', () => {
|
||||||
|
let pruningService: PruningService;
|
||||||
|
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
const yesterday = new Date(Date.now() - TIME.DAY);
|
const yesterday = new Date(Date.now() - TIME.DAY);
|
||||||
let executionRepository: ExecutionRepository;
|
|
||||||
let workflow: Awaited<ReturnType<typeof testDb.createWorkflow>>;
|
let workflow: Awaited<ReturnType<typeof testDb.createWorkflow>>;
|
||||||
|
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
await testDb.init();
|
await testDb.init();
|
||||||
|
|
||||||
const { Execution } = Db.collections;
|
pruningService = new PruningService(
|
||||||
|
mockInstance(Logger),
|
||||||
|
Db.collections.Execution,
|
||||||
|
mockInstance(BinaryDataService),
|
||||||
|
);
|
||||||
|
|
||||||
executionRepository = Execution;
|
|
||||||
workflow = await testDb.createWorkflow();
|
workflow = await testDb.createWorkflow();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -54,7 +61,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
await testDb.createSuccessfulExecution(workflow),
|
await testDb.createSuccessfulExecution(workflow),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -73,7 +80,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
await testDb.createSuccessfulExecution(workflow),
|
await testDb.createSuccessfulExecution(workflow),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -95,7 +102,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
await testDb.createSuccessfulExecution(workflow),
|
await testDb.createSuccessfulExecution(workflow),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -114,7 +121,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
await testDb.createSuccessfulExecution(workflow),
|
await testDb.createSuccessfulExecution(workflow),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -142,7 +149,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -166,7 +173,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
await testDb.createSuccessfulExecution(workflow),
|
await testDb.createSuccessfulExecution(workflow),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -185,7 +192,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
])('should prune %s executions', async (status, attributes) => {
|
])('should prune %s executions', async (status, attributes) => {
|
||||||
const execution = await testDb.createExecution({ status, ...attributes }, workflow);
|
const execution = await testDb.createExecution({ status, ...attributes }, workflow);
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
||||||
|
@ -203,7 +210,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
||||||
await testDb.createSuccessfulExecution(workflow),
|
await testDb.createSuccessfulExecution(workflow),
|
||||||
];
|
];
|
||||||
|
|
||||||
await executionRepository.softDeleteOnPruningCycle();
|
await pruningService.softDeleteOnPruningCycle();
|
||||||
|
|
||||||
const result = await findAllExecutions();
|
const result = await findAllExecutions();
|
||||||
expect(result).toEqual([
|
expect(result).toEqual([
|
Loading…
Reference in a new issue