mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
refactor(core): Make executions pruning more resilient (#7480)
This PR converts the hard-deletion interval to a timeout: - to prevent the interval from not being restored when hard deletion throws, and - to prevent a long-running hard deletion from leading to duplicate deletions.
This commit is contained in:
parent
05e6f2a6ac
commit
671c95760b
|
@ -79,18 +79,17 @@ function parseFiltersToQueryBuilder(
|
||||||
export class ExecutionRepository extends Repository<ExecutionEntity> {
|
export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
private logger = Logger;
|
private logger = Logger;
|
||||||
|
|
||||||
deletionBatchSize = 100;
|
private hardDeletionBatchSize = 100;
|
||||||
|
|
||||||
private intervals: Record<string, NodeJS.Timer | undefined> = {
|
|
||||||
softDeletion: undefined,
|
|
||||||
hardDeletion: undefined,
|
|
||||||
};
|
|
||||||
|
|
||||||
private rates: Record<string, number> = {
|
private rates: Record<string, number> = {
|
||||||
softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE,
|
softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE,
|
||||||
hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE,
|
hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private softDeletionInterval: NodeJS.Timer | undefined;
|
||||||
|
|
||||||
|
private hardDeletionTimeout: NodeJS.Timeout | undefined;
|
||||||
|
|
||||||
private isMainInstance = config.get('generic.instanceType') === 'main';
|
private isMainInstance = config.get('generic.instanceType') === 'main';
|
||||||
|
|
||||||
private isPruningEnabled = config.getEnv('executions.pruneData');
|
private isPruningEnabled = config.getEnv('executions.pruneData');
|
||||||
|
@ -106,39 +105,35 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
|
|
||||||
if (this.isPruningEnabled) this.setSoftDeletionInterval();
|
if (this.isPruningEnabled) this.setSoftDeletionInterval();
|
||||||
|
|
||||||
this.setHardDeletionInterval();
|
this.scheduleHardDeletion();
|
||||||
}
|
}
|
||||||
|
|
||||||
clearTimers() {
|
clearTimers() {
|
||||||
if (!this.isMainInstance) return;
|
if (!this.isMainInstance) return;
|
||||||
|
|
||||||
this.logger.debug('Clearing soft-deletion and hard-deletion intervals for executions');
|
this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)');
|
||||||
|
|
||||||
clearInterval(this.intervals.softDeletion);
|
clearInterval(this.softDeletionInterval);
|
||||||
clearInterval(this.intervals.hardDeletion);
|
clearTimeout(this.hardDeletionTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
setSoftDeletionInterval() {
|
setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
|
||||||
this.logger.debug(
|
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
||||||
`Setting soft-deletion interval (pruning) for executions every ${
|
|
||||||
this.rates.softDeletion / TIME.MINUTE
|
|
||||||
} min`,
|
|
||||||
);
|
|
||||||
|
|
||||||
this.intervals.softDeletion = setInterval(
|
this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`);
|
||||||
|
|
||||||
|
this.softDeletionInterval = setInterval(
|
||||||
async () => this.softDeleteOnPruningCycle(),
|
async () => this.softDeleteOnPruningCycle(),
|
||||||
this.rates.softDeletion,
|
this.rates.softDeletion,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
setHardDeletionInterval() {
|
scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
|
||||||
this.logger.debug(
|
const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' ');
|
||||||
`Setting hard-deletion interval for executions every ${
|
|
||||||
this.rates.hardDeletion / TIME.MINUTE
|
|
||||||
} min`,
|
|
||||||
);
|
|
||||||
|
|
||||||
this.intervals.hardDeletion = setInterval(
|
this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`);
|
||||||
|
|
||||||
|
this.hardDeletionTimeout = setTimeout(
|
||||||
async () => this.hardDeleteOnPruningCycle(),
|
async () => this.hardDeleteOnPruningCycle(),
|
||||||
this.rates.hardDeletion,
|
this.rates.hardDeletion,
|
||||||
);
|
);
|
||||||
|
@ -476,7 +471,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
const executionIds = executions.map(({ id }) => id);
|
const executionIds = executions.map(({ id }) => id);
|
||||||
do {
|
do {
|
||||||
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
|
// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
|
||||||
const batch = executionIds.splice(0, this.deletionBatchSize);
|
const batch = executionIds.splice(0, this.hardDeletionBatchSize);
|
||||||
await this.delete(batch);
|
await this.delete(batch);
|
||||||
} while (executionIds.length > 0);
|
} while (executionIds.length > 0);
|
||||||
}
|
}
|
||||||
|
@ -485,7 +480,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
* Mark executions as deleted based on age and count, in a pruning cycle.
|
* Mark executions as deleted based on age and count, in a pruning cycle.
|
||||||
*/
|
*/
|
||||||
async softDeleteOnPruningCycle() {
|
async softDeleteOnPruningCycle() {
|
||||||
Logger.verbose('Soft-deleting execution data from database (pruning cycle)');
|
Logger.debug('Starting soft-deletion of executions (pruning cycle)');
|
||||||
|
|
||||||
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
||||||
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
||||||
|
@ -514,7 +509,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
|
|
||||||
const [timeBasedWhere, countBasedWhere] = toPrune;
|
const [timeBasedWhere, countBasedWhere] = toPrune;
|
||||||
|
|
||||||
await this.createQueryBuilder()
|
const result = await this.createQueryBuilder()
|
||||||
.update(ExecutionEntity)
|
.update(ExecutionEntity)
|
||||||
.set({ deletedAt: new Date() })
|
.set({ deletedAt: new Date() })
|
||||||
.where({
|
.where({
|
||||||
|
@ -530,6 +525,10 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.execute();
|
.execute();
|
||||||
|
|
||||||
|
if (result.affected === 0) {
|
||||||
|
Logger.debug('Found no executions to soft-delete (pruning cycle)');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -545,7 +544,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
where: {
|
where: {
|
||||||
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
||||||
},
|
},
|
||||||
take: this.deletionBatchSize,
|
take: this.hardDeletionBatchSize,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @important This ensures soft-deleted executions are included,
|
* @important This ensures soft-deleted executions are included,
|
||||||
|
@ -558,38 +557,33 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
||||||
|
|
||||||
if (executionIds.length === 0) {
|
if (executionIds.length === 0) {
|
||||||
this.logger.debug('Found no executions to hard-delete from database');
|
this.logger.debug('Found no executions to hard-delete (pruning cycle)');
|
||||||
|
this.scheduleHardDeletion();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds); // only in FS mode
|
try {
|
||||||
|
this.logger.debug('Starting hard-deletion of executions (pruning cycle)', {
|
||||||
this.logger.debug(
|
|
||||||
`Hard-deleting ${executionIds.length} executions from database (pruning cycle)`,
|
|
||||||
{
|
|
||||||
executionIds,
|
executionIds,
|
||||||
},
|
});
|
||||||
);
|
|
||||||
|
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
||||||
|
|
||||||
// Actually delete these executions
|
|
||||||
await this.delete({ id: In(executionIds) });
|
await this.delete({ id: In(executionIds) });
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error('Failed to hard-delete executions (pruning cycle)', {
|
||||||
|
executionIds,
|
||||||
|
error: error instanceof Error ? error.message : `${error}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the volume of executions to prune is as high as the batch size, there is a risk
|
* For next batch, speed up hard-deletion cycle in high-volume case
|
||||||
* that the pruning process is unable to catch up to the creation of new executions,
|
* to prevent high concurrency from causing duplicate deletions.
|
||||||
* with high concurrency possibly leading to errors from duplicate deletions.
|
|
||||||
*
|
|
||||||
* Therefore, in this high-volume case we speed up the hard deletion cycle, until
|
|
||||||
* the number of executions to prune is low enough to fit in a single batch.
|
|
||||||
*/
|
*/
|
||||||
if (executionIds.length === this.deletionBatchSize) {
|
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
|
||||||
clearInterval(this.intervals.hardDeletion);
|
const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
|
||||||
|
|
||||||
setTimeout(async () => this.hardDeleteOnPruningCycle(), 1 * TIME.SECOND);
|
this.scheduleHardDeletion(rate);
|
||||||
} else {
|
|
||||||
if (this.intervals.hardDeletion) return;
|
|
||||||
|
|
||||||
this.setHardDeletionInterval();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ import type { ExecutionRepository } from '../../../src/databases/repositories';
|
||||||
import type { ExecutionEntity } from '../../../src/databases/entities/ExecutionEntity';
|
import type { ExecutionEntity } from '../../../src/databases/entities/ExecutionEntity';
|
||||||
import { TIME } from '../../../src/constants';
|
import { TIME } from '../../../src/constants';
|
||||||
|
|
||||||
describe('ExecutionRepository.prune()', () => {
|
describe('softDeleteOnPruningCycle()', () => {
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
const yesterday = new Date(Date.now() - TIME.DAY);
|
const yesterday = new Date(Date.now() - TIME.DAY);
|
||||||
let executionRepository: ExecutionRepository;
|
let executionRepository: ExecutionRepository;
|
||||||
|
|
Loading…
Reference in a new issue