mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
refactor(core): Enforce range for shutdown priority (no-changelog) (#9944)
This commit is contained in:
parent
c82579bf76
commit
757feaf585
|
@ -9,7 +9,8 @@ import {
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { HIGHEST_PRIORITY, OnShutdown } from './decorators/OnShutdown';
|
import { OnShutdown } from './decorators/OnShutdown';
|
||||||
|
import { HIGHEST_SHUTDOWN_PRIORITY } from './constants';
|
||||||
|
|
||||||
export type JobId = Bull.JobId;
|
export type JobId = Bull.JobId;
|
||||||
export type Job = Bull.Job<JobData>;
|
export type Job = Bull.Job<JobData>;
|
||||||
|
@ -109,7 +110,7 @@ export class Queue {
|
||||||
return await this.jobQueue.client.ping();
|
return await this.jobQueue.client.ping();
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnShutdown(HIGHEST_PRIORITY)
|
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
|
||||||
// Stop accepting new jobs, `doNotWaitActive` allows reporting progress
|
// Stop accepting new jobs, `doNotWaitActive` allows reporting progress
|
||||||
async pause(): Promise<void> {
|
async pause(): Promise<void> {
|
||||||
return await this.jobQueue?.pause(true, true);
|
return await this.jobQueue?.pause(true, true);
|
||||||
|
|
|
@ -165,3 +165,7 @@ export const ARTIFICIAL_TASK_DATA = {
|
||||||
],
|
],
|
||||||
],
|
],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const LOWEST_SHUTDOWN_PRIORITY = 0;
|
||||||
|
export const DEFAULT_SHUTDOWN_PRIORITY = 100;
|
||||||
|
export const HIGHEST_SHUTDOWN_PRIORITY = 200;
|
||||||
|
|
|
@ -1,10 +1,7 @@
|
||||||
import { Container } from 'typedi';
|
import { Container } from 'typedi';
|
||||||
import { ApplicationError } from 'n8n-workflow';
|
import { ApplicationError } from 'n8n-workflow';
|
||||||
import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service';
|
import { type ServiceClass, ShutdownService } from '@/shutdown/Shutdown.service';
|
||||||
|
import { DEFAULT_SHUTDOWN_PRIORITY } from '@/constants';
|
||||||
export const LOWEST_PRIORITY = 0;
|
|
||||||
export const DEFAULT_PRIORITY = 100;
|
|
||||||
export const HIGHEST_PRIORITY = 200;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decorator that registers a method as a shutdown hook. The method will
|
* Decorator that registers a method as a shutdown hook. The method will
|
||||||
|
@ -26,7 +23,7 @@ export const HIGHEST_PRIORITY = 200;
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
export const OnShutdown =
|
export const OnShutdown =
|
||||||
(priority = DEFAULT_PRIORITY): MethodDecorator =>
|
(priority = DEFAULT_SHUTDOWN_PRIORITY): MethodDecorator =>
|
||||||
(prototype, propertyKey, descriptor) => {
|
(prototype, propertyKey, descriptor) => {
|
||||||
const serviceClass = prototype.constructor as ServiceClass;
|
const serviceClass = prototype.constructor as ServiceClass;
|
||||||
const methodName = String(propertyKey);
|
const methodName = String(propertyKey);
|
||||||
|
|
|
@ -4,7 +4,8 @@ import { Logger } from '@/Logger';
|
||||||
import ioRedis from 'ioredis';
|
import ioRedis from 'ioredis';
|
||||||
import type { Cluster, RedisOptions } from 'ioredis';
|
import type { Cluster, RedisOptions } from 'ioredis';
|
||||||
import type { RedisClientType } from './RedisServiceBaseClasses';
|
import type { RedisClientType } from './RedisServiceBaseClasses';
|
||||||
import { LOWEST_PRIORITY, OnShutdown } from '@/decorators/OnShutdown';
|
import { OnShutdown } from '@/decorators/OnShutdown';
|
||||||
|
import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class RedisClientService {
|
export class RedisClientService {
|
||||||
|
@ -23,7 +24,7 @@ export class RedisClientService {
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnShutdown(LOWEST_PRIORITY)
|
@OnShutdown(LOWEST_SHUTDOWN_PRIORITY)
|
||||||
disconnectClients() {
|
disconnectClients() {
|
||||||
for (const client of this.clients) {
|
for (const client of this.clients) {
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
|
|
|
@ -2,6 +2,7 @@ import { Container, Service } from 'typedi';
|
||||||
import { ApplicationError, ErrorReporterProxy, assert } from 'n8n-workflow';
|
import { ApplicationError, ErrorReporterProxy, assert } from 'n8n-workflow';
|
||||||
import type { Class } from 'n8n-core';
|
import type { Class } from 'n8n-core';
|
||||||
import { Logger } from '@/Logger';
|
import { Logger } from '@/Logger';
|
||||||
|
import { LOWEST_SHUTDOWN_PRIORITY, HIGHEST_SHUTDOWN_PRIORITY } from '@/constants';
|
||||||
|
|
||||||
type HandlerFn = () => Promise<void> | void;
|
type HandlerFn = () => Promise<void> | void;
|
||||||
export type ServiceClass = Class<Record<string, HandlerFn>>;
|
export type ServiceClass = Class<Record<string, HandlerFn>>;
|
||||||
|
@ -33,6 +34,13 @@ export class ShutdownService {
|
||||||
|
|
||||||
/** Registers given listener to be notified when the application is shutting down */
|
/** Registers given listener to be notified when the application is shutting down */
|
||||||
register(priority: number, handler: ShutdownHandler) {
|
register(priority: number, handler: ShutdownHandler) {
|
||||||
|
if (priority < LOWEST_SHUTDOWN_PRIORITY || priority > HIGHEST_SHUTDOWN_PRIORITY) {
|
||||||
|
throw new ApplicationError(
|
||||||
|
`Invalid shutdown priority. Please set it between ${LOWEST_SHUTDOWN_PRIORITY} and ${HIGHEST_SHUTDOWN_PRIORITY}.`,
|
||||||
|
{ extra: { priority } },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (!this.handlersByPriority[priority]) {
|
if (!this.handlersByPriority[priority]) {
|
||||||
this.handlersByPriority[priority] = [];
|
this.handlersByPriority[priority] = [];
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,4 +73,26 @@ describe('OnShutdown', () => {
|
||||||
new TestClass();
|
new TestClass();
|
||||||
}).toThrow('TestClass.onShutdown() must be a method on TestClass to use "OnShutdown"');
|
}).toThrow('TestClass.onShutdown() must be a method on TestClass to use "OnShutdown"');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should throw if the priority is invalid', () => {
|
||||||
|
expect(() => {
|
||||||
|
@Service()
|
||||||
|
class TestClass {
|
||||||
|
@OnShutdown(201)
|
||||||
|
async onShutdown() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
new TestClass();
|
||||||
|
}).toThrow('Invalid shutdown priority. Please set it between 0 and 200.');
|
||||||
|
|
||||||
|
expect(() => {
|
||||||
|
@Service()
|
||||||
|
class TestClass {
|
||||||
|
@OnShutdown(-1)
|
||||||
|
async onShutdown() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
new TestClass();
|
||||||
|
}).toThrow('Invalid shutdown priority. Please set it between 0 and 200.');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue