refactor(core): Minor improvements to pruning service (#11578)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Iván Ovejero 2024-11-06 13:16:23 +01:00 committed by GitHub
parent 9468eea405
commit befa26f89a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 111 additions and 142 deletions

View file

@ -127,6 +127,9 @@ export const TIME = {
* Eventually this will superseed `TIME` above
*/
export const Time = {
milliseconds: {
toMinutes: 1 / (60 * 1000),
},
seconds: {
toMilliseconds: 1000,
},

View file

@ -20,7 +20,7 @@ export class OrchestrationService {
private subscriber: Subscriber;
protected isInitialized = false;
isInitialized = false;
private isMultiMainSetupLicensed = false;

View file

@ -1,4 +1,4 @@
import type { GlobalConfig } from '@n8n/config';
import type { PruningConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
@ -8,9 +8,13 @@ import { mockLogger } from '@test/mocking';
import { PruningService } from '../pruning.service';
jest.mock('@/db', () => ({
connectionState: { migrated: true },
}));
describe('PruningService', () => {
describe('init', () => {
it('should start pruning if leader', () => {
it('should start pruning on main instance that is the leader', () => {
const pruningService = new PruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true }),
@ -29,7 +33,7 @@ describe('PruningService', () => {
expect(startPruningSpy).toHaveBeenCalled();
});
it('should not start pruning if follower', () => {
it('should not start pruning on main instance that is a follower', () => {
const pruningService = new PruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: false }),
@ -48,7 +52,7 @@ describe('PruningService', () => {
expect(startPruningSpy).not.toHaveBeenCalled();
});
it('should register leadership events if multi-main setup is enabled', () => {
it('should register leadership events if main on multi-main setup', () => {
const pruningService = new PruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true }),
@ -88,13 +92,10 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(),
}),
mock<GlobalConfig>({ pruning: { isEnabled: true } }),
mock<PruningConfig>({ isEnabled: true }),
);
// @ts-expect-error Private method
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(true);
expect(pruningService.isEnabled).toBe(true);
});
it('should return `false` based on config if leader main', () => {
@ -107,16 +108,13 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(),
}),
mock<GlobalConfig>({ pruning: { isEnabled: false } }),
mock<PruningConfig>({ isEnabled: false }),
);
// @ts-expect-error Private method
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(false);
expect(pruningService.isEnabled).toBe(false);
});
it('should return `false` if non-main even if enabled', () => {
it('should return `false` if non-main even if config is enabled', () => {
const pruningService = new PruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker' }),
@ -126,16 +124,13 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(),
}),
mock<GlobalConfig>({ pruning: { isEnabled: true } }),
mock<PruningConfig>({ isEnabled: true }),
);
// @ts-expect-error Private method
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(false);
expect(pruningService.isEnabled).toBe(false);
});
it('should return `false` if follower main even if enabled', () => {
it('should return `false` if follower main even if config is enabled', () => {
const pruningService = new PruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: false, isFollower: true, instanceType: 'main' }),
@ -145,13 +140,10 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(),
}),
mock<GlobalConfig>({ pruning: { isEnabled: true }, multiMainSetup: { enabled: true } }),
mock<PruningConfig>({ isEnabled: true }),
);
// @ts-expect-error Private method
const isEnabled = pruningService.isEnabled();
expect(isEnabled).toBe(false);
expect(pruningService.isEnabled).toBe(false);
});
});
@ -166,22 +158,25 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(),
}),
mock<GlobalConfig>({ pruning: { isEnabled: false } }),
mock<PruningConfig>({ isEnabled: false }),
);
const scheduleRollingSoftDeletionsSpy = jest.spyOn(
pruningService,
// @ts-expect-error Private method
'scheduleRollingSoftDeletions',
);
// @ts-expect-error Private method
const setSoftDeletionInterval = jest.spyOn(pruningService, 'setSoftDeletionInterval');
// @ts-expect-error Private method
const scheduleHardDeletion = jest.spyOn(pruningService, 'scheduleHardDeletion');
const scheduleNextHardDeletionSpy = jest.spyOn(pruningService, 'scheduleNextHardDeletion');
pruningService.startPruning();
expect(setSoftDeletionInterval).not.toHaveBeenCalled();
expect(scheduleHardDeletion).not.toHaveBeenCalled();
expect(scheduleRollingSoftDeletionsSpy).not.toHaveBeenCalled();
expect(scheduleNextHardDeletionSpy).not.toHaveBeenCalled();
});
it('should start pruning if service is enabled', () => {
it('should start pruning if service is enabled and DB is migrated', () => {
const pruningService = new PruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true, instanceType: 'main' }),
@ -191,23 +186,23 @@ describe('PruningService', () => {
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>(),
}),
mock<GlobalConfig>({ pruning: { isEnabled: true } }),
mock<PruningConfig>({ isEnabled: true }),
);
const setSoftDeletionInterval = jest
const scheduleRollingSoftDeletionsSpy = jest
// @ts-expect-error Private method
.spyOn(pruningService, 'setSoftDeletionInterval')
.spyOn(pruningService, 'scheduleRollingSoftDeletions')
.mockImplementation();
const scheduleHardDeletion = jest
const scheduleNextHardDeletionSpy = jest
// @ts-expect-error Private method
.spyOn(pruningService, 'scheduleHardDeletion')
.spyOn(pruningService, 'scheduleNextHardDeletion')
.mockImplementation();
pruningService.startPruning();
expect(setSoftDeletionInterval).toHaveBeenCalled();
expect(scheduleHardDeletion).toHaveBeenCalled();
expect(scheduleRollingSoftDeletionsSpy).toHaveBeenCalled();
expect(scheduleNextHardDeletionSpy).toHaveBeenCalled();
});
});
});

View file

@ -1,27 +1,37 @@
import { GlobalConfig } from '@n8n/config';
import { PruningConfig } from '@n8n/config';
import { BinaryDataService, InstanceSettings } from 'n8n-core';
import { jsonStringify } from 'n8n-workflow';
import { ensureError } from 'n8n-workflow';
import { strict } from 'node:assert';
import { Service } from 'typedi';
import { TIME } from '@/constants';
import { Time } from '@/constants';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { connectionState as dbConnectionState } from '@/db';
import { OnShutdown } from '@/decorators/on-shutdown';
import { Logger } from '@/logging/logger.service';
import { OrchestrationService } from '../orchestration.service';
/**
* Responsible for pruning executions from the database and their associated binary data
* from the filesystem, on a rolling basis. By default we soft-delete execution rows
* every cycle and hard-delete them and their binary data every 4th cycle.
*/
@Service()
export class PruningService {
private hardDeletionBatchSize = 100;
/** Timer for soft-deleting executions on a rolling basis. */
private softDeletionInterval: NodeJS.Timer | undefined;
private rates: Record<string, number> = {
softDeletion: this.globalConfig.pruning.softDeleteInterval * TIME.MINUTE,
hardDeletion: this.globalConfig.pruning.hardDeleteInterval * TIME.MINUTE,
/** Timeout for next hard-deletion of soft-deleted executions. */
private hardDeletionTimeout: NodeJS.Timeout | undefined;
private readonly rates = {
softDeletion: this.pruningConfig.softDeleteInterval * Time.minutes.toMilliseconds,
hardDeletion: this.pruningConfig.hardDeleteInterval * Time.minutes.toMilliseconds,
};
public softDeletionInterval: NodeJS.Timer | undefined;
public hardDeletionTimeout: NodeJS.Timeout | undefined;
/** Max number of executions to hard-delete in a cycle. */
private readonly batchSize = 100;
private isShuttingDown = false;
@ -31,103 +41,68 @@ export class PruningService {
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
private readonly orchestrationService: OrchestrationService,
private readonly globalConfig: GlobalConfig,
private readonly pruningConfig: PruningConfig,
) {
this.logger = this.logger.scoped('pruning');
}
/**
* @important Requires `OrchestrationService` to be initialized.
*/
init() {
const { isLeader } = this.instanceSettings;
const { isMultiMainSetupEnabled } = this.orchestrationService;
strict(this.instanceSettings.instanceRole !== 'unset', 'Instance role is not set');
if (isLeader) this.startPruning();
if (this.instanceSettings.isLeader) this.startPruning();
if (isMultiMainSetupEnabled) {
if (this.orchestrationService.isMultiMainSetupEnabled) {
this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning());
this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning());
}
}
private isEnabled() {
const { instanceType, isFollower } = this.instanceSettings;
if (!this.globalConfig.pruning.isEnabled || instanceType !== 'main') {
return false;
get isEnabled() {
return (
this.pruningConfig.isEnabled &&
this.instanceSettings.instanceType === 'main' &&
this.instanceSettings.isLeader
);
}
if (this.globalConfig.multiMainSetup.enabled && instanceType === 'main' && isFollower) {
return false;
}
return true;
}
/**
* @important Call this method only after DB migrations have completed.
*/
startPruning() {
if (!this.isEnabled()) return;
if (!this.isEnabled || !dbConnectionState.migrated || this.isShuttingDown) return;
if (this.isShuttingDown) {
this.logger.warn('Cannot start pruning while shutting down');
return;
}
this.logger.debug('Starting soft-deletion and hard-deletion timers');
this.setSoftDeletionInterval();
this.scheduleHardDeletion();
this.scheduleRollingSoftDeletions();
this.scheduleNextHardDeletion();
}
stopPruning() {
if (!this.isEnabled()) return;
this.logger.debug('Removing soft-deletion and hard-deletion timers');
if (!this.isEnabled) return;
clearInterval(this.softDeletionInterval);
clearTimeout(this.hardDeletionTimeout);
}
private setSoftDeletionInterval(rateMs = this.rates.softDeletion) {
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
private scheduleRollingSoftDeletions(rateMs = this.rates.softDeletion) {
this.softDeletionInterval = setInterval(
async () => await this.softDeleteOnPruningCycle(),
async () => await this.softDelete(),
this.rates.softDeletion,
);
this.logger.debug(`Soft-deletion scheduled every ${when}`);
this.logger.debug(`Soft-deletion every ${rateMs * Time.milliseconds.toMinutes} minutes`);
}
private scheduleHardDeletion(rateMs = this.rates.hardDeletion) {
const when = [rateMs / TIME.MINUTE, 'min'].join(' ');
private scheduleNextHardDeletion(rateMs = this.rates.hardDeletion) {
this.hardDeletionTimeout = setTimeout(() => {
this.hardDeleteOnPruningCycle()
.then((rate) => this.scheduleHardDeletion(rate))
this.hardDelete()
.then((rate) => this.scheduleNextHardDeletion(rate))
.catch((error) => {
this.scheduleHardDeletion(1 * TIME.SECOND);
const errorMessage =
error instanceof Error
? error.message
: jsonStringify(error, { replaceCircularRefs: true });
this.logger.error('Failed to hard-delete executions', { errorMessage });
this.scheduleNextHardDeletion(1_000);
this.logger.error('Failed to hard-delete executions', { error: ensureError(error) });
});
}, rateMs);
this.logger.debug(`Hard-deletion scheduled for next ${when}`);
this.logger.debug(`Hard-deletion in next ${rateMs * Time.milliseconds.toMinutes} minutes`);
}
/**
* Mark executions as deleted based on age and count, in a pruning cycle.
*/
async softDeleteOnPruningCycle() {
this.logger.debug('Starting soft-deletion of executions');
/** Soft-delete executions based on max age and/or max count. */
async softDelete() {
const result = await this.executionRepository.softDeletePrunableExecutions();
if (result.affected === 0) {
@ -145,10 +120,11 @@ export class PruningService {
}
/**
* Permanently remove all soft-deleted executions and their binary data, in a pruning cycle.
* @return Delay in ms after which the next cycle should be started
* Delete all soft-deleted executions and their binary data.
*
* @returns Delay in milliseconds until next hard-deletion
*/
private async hardDeleteOnPruningCycle() {
private async hardDelete(): Promise<number> {
const ids = await this.executionRepository.findSoftDeletedExecutions();
const executionIds = ids.map((o) => o.executionId);
@ -160,8 +136,6 @@ export class PruningService {
}
try {
this.logger.debug('Starting hard-deletion of executions', { executionIds });
await this.binaryDataService.deleteMany(ids);
await this.executionRepository.deleteByIds(executionIds);
@ -170,16 +144,13 @@ export class PruningService {
} catch (error) {
this.logger.error('Failed to hard-delete executions', {
executionIds,
error: error instanceof Error ? error.message : `${error}`,
error: ensureError(error),
});
}
/**
* For next batch, speed up hard-deletion cycle in high-volume case
* to prevent high concurrency from causing duplicate deletions.
*/
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
// if high volume, speed up next hard-deletion
if (executionIds.length >= this.batchSize) return 1 * Time.seconds.toMilliseconds;
return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
return this.rates.hardDeletion;
}
}

View file

@ -1,4 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import { PruningConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import { BinaryDataService, InstanceSettings } from 'n8n-core';
import type { ExecutionStatus } from 'n8n-workflow';
@ -27,19 +27,19 @@ describe('softDeleteOnPruningCycle()', () => {
const now = new Date();
const yesterday = new Date(Date.now() - TIME.DAY);
let workflow: WorkflowEntity;
let globalConfig: GlobalConfig;
let pruningConfig: PruningConfig;
beforeAll(async () => {
await testDb.init();
globalConfig = Container.get(GlobalConfig);
pruningConfig = Container.get(PruningConfig);
pruningService = new PruningService(
mockLogger(),
instanceSettings,
Container.get(ExecutionRepository),
mockInstance(BinaryDataService),
mock(),
globalConfig,
pruningConfig,
);
workflow = await createWorkflow();
@ -62,8 +62,8 @@ describe('softDeleteOnPruningCycle()', () => {
describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => {
beforeAll(() => {
globalConfig.pruning.maxAge = 336;
globalConfig.pruning.maxCount = 1;
pruningConfig.maxAge = 336;
pruningConfig.maxCount = 1;
});
test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
@ -73,7 +73,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -92,7 +92,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -113,7 +113,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -132,7 +132,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -150,7 +150,7 @@ describe('softDeleteOnPruningCycle()', () => {
await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]);
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -163,8 +163,8 @@ describe('softDeleteOnPruningCycle()', () => {
describe('when EXECUTIONS_DATA_MAX_AGE is set', () => {
beforeAll(() => {
globalConfig.pruning.maxAge = 1;
globalConfig.pruning.maxCount = 0;
pruningConfig.maxAge = 1;
pruningConfig.maxCount = 0;
});
test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => {
@ -179,7 +179,7 @@ describe('softDeleteOnPruningCycle()', () => {
),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -203,7 +203,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -221,7 +221,7 @@ describe('softDeleteOnPruningCycle()', () => {
])('should prune %s executions', async (status, attributes) => {
const execution = await createExecution({ status, ...attributes }, workflow);
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -239,7 +239,7 @@ describe('softDeleteOnPruningCycle()', () => {
await createSuccessfulExecution(workflow),
];
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([
@ -266,7 +266,7 @@ describe('softDeleteOnPruningCycle()', () => {
await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]);
await pruningService.softDeleteOnPruningCycle();
await pruningService.softDelete();
const result = await findAllExecutions();
expect(result).toEqual([