From 921d213ae5507869bd2627acfa7e8de53bd28b10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 2 Nov 2023 12:24:25 +0100 Subject: [PATCH] refactor(core): Create pruning service (no-changelog) (#7564) https://linear.app/n8n/issue/PAY-954/ensure-only-main-instance-leader-handles-pruning --- packages/cli/src/Db.ts | 5 + packages/cli/src/commands/start.ts | 6 +- .../repositories/execution.repository.ts | 173 +-------------- .../main/MultiMainInstance.publisher.ee.ts | 6 +- packages/cli/src/services/pruning.service.ts | 208 ++++++++++++++++++ ...sitory.test.ts => pruning.service.test.ts} | 33 +-- 6 files changed, 243 insertions(+), 188 deletions(-) create mode 100644 packages/cli/src/services/pruning.service.ts rename packages/cli/test/integration/{repositories/execution.repository.test.ts => pruning.service.test.ts} (88%) diff --git a/packages/cli/src/Db.ts b/packages/cli/src/Db.ts index daebfa031d..2b5432a23f 100644 --- a/packages/cli/src/Db.ts +++ b/packages/cli/src/Db.ts @@ -41,6 +41,7 @@ import { WorkflowStatisticsRepository, WorkflowTagMappingRepository, } from '@db/repositories'; +import { PruningService } from '@/services/pruning.service'; export const collections = {} as IDatabaseCollections; @@ -191,6 +192,10 @@ export async function init(testConnectionOptions?: ConnectionOptions): Promise { private hardDeletionBatchSize = 100; - private rates: Record = { - softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE, - hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE, - }; - - private softDeletionInterval: NodeJS.Timer | undefined; - - private hardDeletionTimeout: NodeJS.Timeout | undefined; - - private isMainInstance = config.get('generic.instanceType') === 'main'; - - private isPruningEnabled = config.getEnv('executions.pruneData'); - constructor( dataSource: DataSource, private readonly logger: Logger, @@ -99,43 +76,6 @@ export class ExecutionRepository extends Repository { private readonly binaryDataService: BinaryDataService, ) { super(ExecutionEntity, dataSource.manager); - - if (!this.isMainInstance || inTest) return; - - if (this.isPruningEnabled) this.setSoftDeletionInterval(); - - this.scheduleHardDeletion(); - } - - clearTimers() { - if (!this.isMainInstance) return; - - this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)'); - - clearInterval(this.softDeletionInterval); - clearTimeout(this.hardDeletionTimeout); - } - - setSoftDeletionInterval(rateMs = this.rates.softDeletion) { - const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); - - this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`); - - this.softDeletionInterval = setInterval( - async () => this.softDeleteOnPruningCycle(), - this.rates.softDeletion, - ); - } - - scheduleHardDeletion(rateMs = this.rates.hardDeletion) { - const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); - - this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`); - - this.hardDeletionTimeout = setTimeout( - async () => this.hardDeleteOnPruningCycle(), - this.rates.hardDeletion, - ); } async findMultipleExecutions( @@ -478,115 +418,4 @@ export class ExecutionRepository extends Repository { await this.delete(batch); } while (executionIds.length > 0); } - - /** - * Mark executions as deleted based on age and count, in a pruning cycle. - */ - async softDeleteOnPruningCycle() { - this.logger.debug('Starting soft-deletion of executions (pruning cycle)'); - - const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h - const maxCount = config.getEnv('executions.pruneDataMaxCount'); - - // Find ids of all executions that were stopped longer that pruneDataMaxAge ago - const date = new Date(); - date.setHours(date.getHours() - maxAge); - - const toPrune: Array> = [ - // date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286 - { stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) }, - ]; - - if (maxCount > 0) { - const executions = await this.find({ - select: ['id'], - skip: maxCount, - take: 1, - order: { id: 'DESC' }, - }); - - if (executions[0]) { - toPrune.push({ id: LessThanOrEqual(executions[0].id) }); - } - } - - const [timeBasedWhere, countBasedWhere] = toPrune; - - const result = await this.createQueryBuilder() - .update(ExecutionEntity) - .set({ deletedAt: new Date() }) - .where({ - deletedAt: IsNull(), - // Only mark executions as deleted if they are in an end state - status: Not(In(['new', 'running', 'waiting'])), - }) - .andWhere( - new Brackets((qb) => - countBasedWhere - ? qb.where(timeBasedWhere).orWhere(countBasedWhere) - : qb.where(timeBasedWhere), - ), - ) - .execute(); - - if (result.affected === 0) { - this.logger.debug('Found no executions to soft-delete (pruning cycle)'); - } - } - - /** - * Permanently remove all soft-deleted executions and their binary data, in a pruning cycle. - */ - private async hardDeleteOnPruningCycle() { - const date = new Date(); - date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer')); - - const workflowIdsAndExecutionIds = ( - await this.find({ - select: ['workflowId', 'id'], - where: { - deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)), - }, - take: this.hardDeletionBatchSize, - - /** - * @important This ensures soft-deleted executions are included, - * else `@DeleteDateColumn()` at `deletedAt` will exclude them. - */ - withDeleted: true, - }) - ).map(({ id: executionId, workflowId }) => ({ workflowId, executionId })); - - const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId); - - if (executionIds.length === 0) { - this.logger.debug('Found no executions to hard-delete (pruning cycle)'); - this.scheduleHardDeletion(); - return; - } - - try { - this.logger.debug('Starting hard-deletion of executions (pruning cycle)', { - executionIds, - }); - - await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds); - - await this.delete({ id: In(executionIds) }); - } catch (error) { - this.logger.error('Failed to hard-delete executions (pruning cycle)', { - executionIds, - error: error instanceof Error ? error.message : `${error}`, - }); - } - - /** - * For next batch, speed up hard-deletion cycle in high-volume case - * to prevent high concurrency from causing duplicate deletions. - */ - const isHighVolume = executionIds.length >= this.hardDeletionBatchSize; - const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion; - - this.scheduleHardDeletion(rate); - } } diff --git a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts index 2c28fa61c8..1c7af70d4c 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainInstance.publisher.ee.ts @@ -13,10 +13,14 @@ export class MultiMainInstancePublisher extends SingleMainInstancePublisher { private leaderId: string | undefined; - private get isLeader() { + public get isLeader() { return this.id === this.leaderId; } + public get isFollower() { + return !this.isLeader; + } + private readonly leaderKey = getRedisPrefix() + ':main_instance_leader'; private readonly leaderKeyTtl = config.getEnv('leaderSelection.ttl'); diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts new file mode 100644 index 0000000000..7a5a36062c --- /dev/null +++ b/packages/cli/src/services/pruning.service.ts @@ -0,0 +1,208 @@ +import Container, { Service } from 'typedi'; +import { BinaryDataService } from 'n8n-core'; +import { LessThanOrEqual, IsNull, Not, In, Brackets } from 'typeorm'; +import { DateUtils } from 'typeorm/util/DateUtils'; +import type { FindOptionsWhere } from 'typeorm'; + +import { TIME, inTest } from '@/constants'; +import config from '@/config'; +import { ExecutionRepository } from '@/databases/repositories'; +import { Logger } from '@/Logger'; +import { ExecutionEntity } from '@/databases/entities/ExecutionEntity'; + +@Service() +export class PruningService { + private hardDeletionBatchSize = 100; + + private rates: Record = { + softDeletion: config.getEnv('executions.pruneDataIntervals.softDelete') * TIME.MINUTE, + hardDeletion: config.getEnv('executions.pruneDataIntervals.hardDelete') * TIME.MINUTE, + }; + + public softDeletionInterval: NodeJS.Timer | undefined; + + public hardDeletionTimeout: NodeJS.Timeout | undefined; + + private isMultiMainScenario = + config.getEnv('executions.mode') === 'queue' && config.getEnv('leaderSelection.enabled'); + + constructor( + private readonly logger: Logger, + private readonly executionRepository: ExecutionRepository, + private readonly binaryDataService: BinaryDataService, + ) {} + + async isPruningEnabled() { + if ( + !config.getEnv('executions.pruneData') || + inTest || + config.get('generic.instanceType') !== 'main' + ) { + return false; + } + + if (this.isMultiMainScenario) { + const { MultiMainInstancePublisher } = await import( + '@/services/orchestration/main/MultiMainInstance.publisher.ee' + ); + + return Container.get(MultiMainInstancePublisher).isLeader; + } + + return true; + } + + startPruning() { + this.setSoftDeletionInterval(); + this.scheduleHardDeletion(); + } + + async stopPruning() { + if (this.isMultiMainScenario) { + const { MultiMainInstancePublisher } = await import( + '@/services/orchestration/main/MultiMainInstance.publisher.ee' + ); + + if (Container.get(MultiMainInstancePublisher).isFollower) return; + } + + this.logger.debug('Clearing soft-deletion interval and hard-deletion timeout (pruning cycle)'); + + clearInterval(this.softDeletionInterval); + clearTimeout(this.hardDeletionTimeout); + } + + private setSoftDeletionInterval(rateMs = this.rates.softDeletion) { + const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); + + this.logger.debug(`Setting soft-deletion interval at every ${when} (pruning cycle)`); + + this.softDeletionInterval = setInterval( + async () => this.softDeleteOnPruningCycle(), + this.rates.softDeletion, + ); + } + + private scheduleHardDeletion(rateMs = this.rates.hardDeletion) { + const when = [(rateMs / TIME.MINUTE).toFixed(2), 'min'].join(' '); + + this.logger.debug(`Scheduling hard-deletion for next ${when} (pruning cycle)`); + + this.hardDeletionTimeout = setTimeout( + async () => this.hardDeleteOnPruningCycle(), + this.rates.hardDeletion, + ); + } + + /** + * Mark executions as deleted based on age and count, in a pruning cycle. + */ + async softDeleteOnPruningCycle() { + this.logger.debug('Starting soft-deletion of executions (pruning cycle)'); + + const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h + const maxCount = config.getEnv('executions.pruneDataMaxCount'); + + // Find ids of all executions that were stopped longer that pruneDataMaxAge ago + const date = new Date(); + date.setHours(date.getHours() - maxAge); + + const toPrune: Array> = [ + // date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286 + { stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) }, + ]; + + if (maxCount > 0) { + const executions = await this.executionRepository.find({ + select: ['id'], + skip: maxCount, + take: 1, + order: { id: 'DESC' }, + }); + + if (executions[0]) { + toPrune.push({ id: LessThanOrEqual(executions[0].id) }); + } + } + + const [timeBasedWhere, countBasedWhere] = toPrune; + + const result = await this.executionRepository + .createQueryBuilder() + .update(ExecutionEntity) + .set({ deletedAt: new Date() }) + .where({ + deletedAt: IsNull(), + // Only mark executions as deleted if they are in an end state + status: Not(In(['new', 'running', 'waiting'])), + }) + .andWhere( + new Brackets((qb) => + countBasedWhere + ? qb.where(timeBasedWhere).orWhere(countBasedWhere) + : qb.where(timeBasedWhere), + ), + ) + .execute(); + + if (result.affected === 0) { + this.logger.debug('Found no executions to soft-delete (pruning cycle)'); + } + } + + /** + * Permanently remove all soft-deleted executions and their binary data, in a pruning cycle. + */ + private async hardDeleteOnPruningCycle() { + const date = new Date(); + date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer')); + + const workflowIdsAndExecutionIds = ( + await this.executionRepository.find({ + select: ['workflowId', 'id'], + where: { + deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)), + }, + take: this.hardDeletionBatchSize, + + /** + * @important This ensures soft-deleted executions are included, + * else `@DeleteDateColumn()` at `deletedAt` will exclude them. + */ + withDeleted: true, + }) + ).map(({ id: executionId, workflowId }) => ({ workflowId, executionId })); + + const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId); + + if (executionIds.length === 0) { + this.logger.debug('Found no executions to hard-delete (pruning cycle)'); + this.scheduleHardDeletion(); + return; + } + + try { + this.logger.debug('Starting hard-deletion of executions (pruning cycle)', { + executionIds, + }); + + await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds); + + await this.executionRepository.delete({ id: In(executionIds) }); + } catch (error) { + this.logger.error('Failed to hard-delete executions (pruning cycle)', { + executionIds, + error: error instanceof Error ? error.message : `${error}`, + }); + } + + /** + * For next batch, speed up hard-deletion cycle in high-volume case + * to prevent high concurrency from causing duplicate deletions. + */ + const isHighVolume = executionIds.length >= this.hardDeletionBatchSize; + const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion; + + this.scheduleHardDeletion(rate); + } +} diff --git a/packages/cli/test/integration/repositories/execution.repository.test.ts b/packages/cli/test/integration/pruning.service.test.ts similarity index 88% rename from packages/cli/test/integration/repositories/execution.repository.test.ts rename to packages/cli/test/integration/pruning.service.test.ts index 27aec5faa2..630f3b4777 100644 --- a/packages/cli/test/integration/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/pruning.service.test.ts @@ -1,24 +1,31 @@ import config from '@/config'; import * as Db from '@/Db'; -import * as testDb from '../shared/testDb'; +import * as testDb from './shared/testDb'; import type { ExecutionStatus } from 'n8n-workflow'; -import type { ExecutionRepository } from '@/databases/repositories'; import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity'; import { TIME } from '@/constants'; +import { PruningService } from '@/services/pruning.service'; +import { BinaryDataService } from 'n8n-core'; +import { Logger } from '@/Logger'; +import { mockInstance } from './shared/utils'; describe('softDeleteOnPruningCycle()', () => { + let pruningService: PruningService; + const now = new Date(); const yesterday = new Date(Date.now() - TIME.DAY); - let executionRepository: ExecutionRepository; let workflow: Awaited>; beforeAll(async () => { await testDb.init(); - const { Execution } = Db.collections; + pruningService = new PruningService( + mockInstance(Logger), + Db.collections.Execution, + mockInstance(BinaryDataService), + ); - executionRepository = Execution; workflow = await testDb.createWorkflow(); }); @@ -54,7 +61,7 @@ describe('softDeleteOnPruningCycle()', () => { await testDb.createSuccessfulExecution(workflow), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -73,7 +80,7 @@ describe('softDeleteOnPruningCycle()', () => { await testDb.createSuccessfulExecution(workflow), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -95,7 +102,7 @@ describe('softDeleteOnPruningCycle()', () => { await testDb.createSuccessfulExecution(workflow), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -114,7 +121,7 @@ describe('softDeleteOnPruningCycle()', () => { await testDb.createSuccessfulExecution(workflow), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -142,7 +149,7 @@ describe('softDeleteOnPruningCycle()', () => { ), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -166,7 +173,7 @@ describe('softDeleteOnPruningCycle()', () => { await testDb.createSuccessfulExecution(workflow), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -185,7 +192,7 @@ describe('softDeleteOnPruningCycle()', () => { ])('should prune %s executions', async (status, attributes) => { const execution = await testDb.createExecution({ status, ...attributes }, workflow); - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -203,7 +210,7 @@ describe('softDeleteOnPruningCycle()', () => { await testDb.createSuccessfulExecution(workflow), ]; - await executionRepository.softDeleteOnPruningCycle(); + await pruningService.softDeleteOnPruningCycle(); const result = await findAllExecutions(); expect(result).toEqual([