mirror of
https://github.com/n8n-io/n8n.git
synced 2025-02-21 02:56:40 -08:00
refactor(core): Move typeorm
operators from PruningService
to ExecutionRepository
(no-changelog) (#8145)
Follow-up to https://github.com/n8n-io/n8n/pull/8143
This commit is contained in:
parent
a59d78de18
commit
7b26a7a621
|
@ -1,5 +1,14 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
|
import {
|
||||||
|
Brackets,
|
||||||
|
DataSource,
|
||||||
|
In,
|
||||||
|
IsNull,
|
||||||
|
LessThanOrEqual,
|
||||||
|
MoreThanOrEqual,
|
||||||
|
Not,
|
||||||
|
Repository,
|
||||||
|
} from 'typeorm';
|
||||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
||||||
import type {
|
import type {
|
||||||
FindManyOptions,
|
FindManyOptions,
|
||||||
|
@ -434,4 +443,77 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
},
|
},
|
||||||
}).then((executions) => executions.map(({ id }) => id));
|
}).then((executions) => executions.map(({ id }) => id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async softDeletePrunableExecutions() {
|
||||||
|
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
||||||
|
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
||||||
|
|
||||||
|
// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
|
||||||
|
const date = new Date();
|
||||||
|
date.setHours(date.getHours() - maxAge);
|
||||||
|
|
||||||
|
const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
|
||||||
|
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
|
||||||
|
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
|
||||||
|
];
|
||||||
|
|
||||||
|
if (maxCount > 0) {
|
||||||
|
const executions = await this.find({
|
||||||
|
select: ['id'],
|
||||||
|
skip: maxCount,
|
||||||
|
take: 1,
|
||||||
|
order: { id: 'DESC' },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (executions[0]) {
|
||||||
|
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const [timeBasedWhere, countBasedWhere] = toPrune;
|
||||||
|
|
||||||
|
return this.createQueryBuilder()
|
||||||
|
.update(ExecutionEntity)
|
||||||
|
.set({ deletedAt: new Date() })
|
||||||
|
.where({
|
||||||
|
deletedAt: IsNull(),
|
||||||
|
// Only mark executions as deleted if they are in an end state
|
||||||
|
status: Not(In(['new', 'running', 'waiting'])),
|
||||||
|
})
|
||||||
|
.andWhere(
|
||||||
|
new Brackets((qb) =>
|
||||||
|
countBasedWhere
|
||||||
|
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
|
||||||
|
: qb.where(timeBasedWhere),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
async hardDeleteSoftDeletedExecutions() {
|
||||||
|
const date = new Date();
|
||||||
|
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));
|
||||||
|
|
||||||
|
const workflowIdsAndExecutionIds = (
|
||||||
|
await this.find({
|
||||||
|
select: ['workflowId', 'id'],
|
||||||
|
where: {
|
||||||
|
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
||||||
|
},
|
||||||
|
take: this.hardDeletionBatchSize,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @important This ensures soft-deleted executions are included,
|
||||||
|
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
|
||||||
|
*/
|
||||||
|
withDeleted: true,
|
||||||
|
})
|
||||||
|
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));
|
||||||
|
|
||||||
|
return workflowIdsAndExecutionIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
async deleteByIds(executionIds: string[]) {
|
||||||
|
return this.delete({ id: In(executionIds) });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,9 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { BinaryDataService } from 'n8n-core';
|
import { BinaryDataService } from 'n8n-core';
|
||||||
import type { FindOptionsWhere } from 'typeorm';
|
|
||||||
import { Brackets, In, IsNull, LessThanOrEqual, Not } from 'typeorm';
|
|
||||||
import { DateUtils } from 'typeorm/util/DateUtils';
|
|
||||||
|
|
||||||
import { inTest, TIME } from '@/constants';
|
import { inTest, TIME } from '@/constants';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
import { ExecutionRepository } from '@db/repositories/execution.repository';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
import { ExecutionEntity } from '@db/entities/ExecutionEntity';
|
|
||||||
import { jsonStringify } from 'n8n-workflow';
|
import { jsonStringify } from 'n8n-workflow';
|
||||||
import { OnShutdown } from '@/decorators/OnShutdown';
|
import { OnShutdown } from '@/decorators/OnShutdown';
|
||||||
|
|
||||||
|
@ -113,50 +108,7 @@ export class PruningService {
|
||||||
async softDeleteOnPruningCycle() {
|
async softDeleteOnPruningCycle() {
|
||||||
this.logger.debug('[Pruning] Starting soft-deletion of executions');
|
this.logger.debug('[Pruning] Starting soft-deletion of executions');
|
||||||
|
|
||||||
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
|
const result = await this.executionRepository.softDeletePrunableExecutions();
|
||||||
const maxCount = config.getEnv('executions.pruneDataMaxCount');
|
|
||||||
|
|
||||||
// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
|
|
||||||
const date = new Date();
|
|
||||||
date.setHours(date.getHours() - maxAge);
|
|
||||||
|
|
||||||
const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
|
|
||||||
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
|
|
||||||
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
|
|
||||||
];
|
|
||||||
|
|
||||||
if (maxCount > 0) {
|
|
||||||
const executions = await this.executionRepository.find({
|
|
||||||
select: ['id'],
|
|
||||||
skip: maxCount,
|
|
||||||
take: 1,
|
|
||||||
order: { id: 'DESC' },
|
|
||||||
});
|
|
||||||
|
|
||||||
if (executions[0]) {
|
|
||||||
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const [timeBasedWhere, countBasedWhere] = toPrune;
|
|
||||||
|
|
||||||
const result = await this.executionRepository
|
|
||||||
.createQueryBuilder()
|
|
||||||
.update(ExecutionEntity)
|
|
||||||
.set({ deletedAt: new Date() })
|
|
||||||
.where({
|
|
||||||
deletedAt: IsNull(),
|
|
||||||
// Only mark executions as deleted if they are in an end state
|
|
||||||
status: Not(In(['new', 'running', 'waiting'])),
|
|
||||||
})
|
|
||||||
.andWhere(
|
|
||||||
new Brackets((qb) =>
|
|
||||||
countBasedWhere
|
|
||||||
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
|
|
||||||
: qb.where(timeBasedWhere),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
if (result.affected === 0) {
|
if (result.affected === 0) {
|
||||||
this.logger.debug('[Pruning] Found no executions to soft-delete');
|
this.logger.debug('[Pruning] Found no executions to soft-delete');
|
||||||
|
@ -177,40 +129,22 @@ export class PruningService {
|
||||||
* @return Delay in ms after which the next cycle should be started
|
* @return Delay in ms after which the next cycle should be started
|
||||||
*/
|
*/
|
||||||
private async hardDeleteOnPruningCycle() {
|
private async hardDeleteOnPruningCycle() {
|
||||||
const date = new Date();
|
const ids = await this.executionRepository.hardDeleteSoftDeletedExecutions();
|
||||||
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));
|
|
||||||
|
|
||||||
const workflowIdsAndExecutionIds = (
|
const executionIds = ids.map((o) => o.executionId);
|
||||||
await this.executionRepository.find({
|
|
||||||
select: ['workflowId', 'id'],
|
|
||||||
where: {
|
|
||||||
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
|
|
||||||
},
|
|
||||||
take: this.hardDeletionBatchSize,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @important This ensures soft-deleted executions are included,
|
|
||||||
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
|
|
||||||
*/
|
|
||||||
withDeleted: true,
|
|
||||||
})
|
|
||||||
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));
|
|
||||||
|
|
||||||
const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
|
|
||||||
|
|
||||||
if (executionIds.length === 0) {
|
if (executionIds.length === 0) {
|
||||||
this.logger.debug('[Pruning] Found no executions to hard-delete');
|
this.logger.debug('[Pruning] Found no executions to hard-delete');
|
||||||
|
|
||||||
return this.rates.hardDeletion;
|
return this.rates.hardDeletion;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
|
this.logger.debug('[Pruning] Starting hard-deletion of executions', { executionIds });
|
||||||
executionIds,
|
|
||||||
});
|
|
||||||
|
|
||||||
await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
|
await this.binaryDataService.deleteMany(ids);
|
||||||
|
|
||||||
await this.executionRepository.delete({ id: In(executionIds) });
|
await this.executionRepository.deleteByIds(executionIds);
|
||||||
|
|
||||||
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
|
this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
@ -225,7 +159,7 @@ export class PruningService {
|
||||||
* to prevent high concurrency from causing duplicate deletions.
|
* to prevent high concurrency from causing duplicate deletions.
|
||||||
*/
|
*/
|
||||||
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
|
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
|
||||||
const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
|
|
||||||
return rate;
|
return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue