mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
test(core): Add tests for scaling service (no-changelog) (#10320)
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
This commit is contained in:
parent
a0b021bbe7
commit
aa95059cf0
259
packages/cli/src/scaling/__tests__/scaling.service.test.ts
Normal file
259
packages/cli/src/scaling/__tests__/scaling.service.test.ts
Normal file
|
@ -0,0 +1,259 @@
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import { ScalingService } from '../scaling.service';
|
||||||
|
import { JOB_TYPE_NAME, QUEUE_NAME } from '../constants';
|
||||||
|
import config from '@/config';
|
||||||
|
import * as BullModule from 'bull';
|
||||||
|
import type { Job, JobData, JobOptions, JobQueue } from '../types';
|
||||||
|
import { ApplicationError } from 'n8n-workflow';
|
||||||
|
|
||||||
|
const queue = mock<JobQueue>({
|
||||||
|
client: { ping: jest.fn() },
|
||||||
|
});
|
||||||
|
|
||||||
|
jest.mock('bull', () => ({
|
||||||
|
__esModule: true,
|
||||||
|
default: jest.fn(() => queue),
|
||||||
|
}));
|
||||||
|
|
||||||
|
describe('ScalingService', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
config.set('generic.instanceType', 'main');
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('setupQueue', () => {
|
||||||
|
it('should set up the queue', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
const { prefix, settings } = config.get('queue.bull');
|
||||||
|
const Bull = jest.mocked(BullModule.default);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(Bull).toHaveBeenCalledWith(QUEUE_NAME, {
|
||||||
|
prefix,
|
||||||
|
settings,
|
||||||
|
createClient: expect.any(Function),
|
||||||
|
});
|
||||||
|
expect(queue.on).toHaveBeenCalledWith('global:progress', expect.any(Function));
|
||||||
|
expect(queue.on).toHaveBeenCalledWith('error', expect.any(Function));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('setupWorker', () => {
|
||||||
|
it('should set up a worker with concurrency', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('generic.instanceType', 'worker');
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
const concurrency = 5;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
scalingService.setupWorker(concurrency);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(queue.process).toHaveBeenCalledWith(JOB_TYPE_NAME, concurrency, expect.any(Function));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw if called on a non-worker instance', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act and Assert
|
||||||
|
*/
|
||||||
|
expect(() => scalingService.setupWorker(5)).toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('pauseQueue', () => {
|
||||||
|
it('should pause the queue', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
await scalingService.pauseQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(queue.pause).toHaveBeenCalledWith(true, true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('pingQueue', () => {
|
||||||
|
it('should ping the queue', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
await scalingService.pingQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(queue.client.ping).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('addJob', () => {
|
||||||
|
it('should add a job', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
queue.add.mockResolvedValue(mock<Job>({ id: '456' }));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const jobData = mock<JobData>({ executionId: '123' });
|
||||||
|
const jobOptions = mock<JobOptions>();
|
||||||
|
await scalingService.addJob(jobData, jobOptions);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(queue.add).toHaveBeenCalledWith(JOB_TYPE_NAME, jobData, jobOptions);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getJob', () => {
|
||||||
|
it('should get a job', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
const jobId = '123';
|
||||||
|
queue.getJob.mockResolvedValue(mock<Job>({ id: jobId }));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const job = await scalingService.getJob(jobId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(queue.getJob).toHaveBeenCalledWith(jobId);
|
||||||
|
expect(job?.id).toBe(jobId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('findJobsByStatus', () => {
|
||||||
|
it('should find jobs by status', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
queue.getJobs.mockResolvedValue([mock<Job>({ id: '123' })]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const jobs = await scalingService.findJobsByStatus(['active']);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(queue.getJobs).toHaveBeenCalledWith(['active']);
|
||||||
|
expect(jobs).toHaveLength(1);
|
||||||
|
expect(jobs.at(0)?.id).toBe('123');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('stopJob', () => {
|
||||||
|
it('should stop an active job', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(true) });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const result = await scalingService.stopJob(job);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' });
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should stop an inactive job', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(false) });
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const result = await scalingService.stopJob(job);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(job.remove).toHaveBeenCalled();
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should report failure to stop a job', async () => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
const scalingService = new ScalingService(mock(), mock(), mock());
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
const job = mock<Job>({
|
||||||
|
isActive: jest.fn().mockImplementation(() => {
|
||||||
|
throw new ApplicationError('Something went wrong');
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
const result = await scalingService.stopJob(job);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -16,7 +16,7 @@ import type PCancelable from 'p-cancelable';
|
||||||
*/
|
*/
|
||||||
@Service()
|
@Service()
|
||||||
export class JobProcessor {
|
export class JobProcessor {
|
||||||
private readonly runningJobs: { [jobId: JobId]: RunningJob } = {};
|
private readonly runningJobs: Record<JobId, RunningJob> = {};
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
|
|
|
@ -94,12 +94,12 @@ export class ScalingService {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (await job.isActive()) {
|
if (await job.isActive()) {
|
||||||
await job.progress({ kind: 'abort-job' });
|
await job.progress({ kind: 'abort-job' }); // being processed by worker
|
||||||
this.logger.debug('[ScalingService] Stopped active job', props);
|
this.logger.debug('[ScalingService] Stopped active job', props);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
await job.remove();
|
await job.remove(); // not yet picked up, or waiting for next pickup (stalled)
|
||||||
this.logger.debug('[ScalingService] Stopped inactive job', props);
|
this.logger.debug('[ScalingService] Stopped inactive job', props);
|
||||||
return true;
|
return true;
|
||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
|
|
Loading…
Reference in a new issue