diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts index e12770f770..ef1d65a737 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts @@ -1,32 +1,44 @@ import { WebSocket } from 'ws'; -import { TaskRunner } from '@/task-runner'; +import { TaskRunner, type TaskRunnerOpts } from '@/task-runner'; class TestRunner extends TaskRunner {} jest.mock('ws'); describe('TestRunner', () => { + let runner: TestRunner; + + const newTestRunner = (opts: Partial = {}) => + new TestRunner({ + taskType: 'test-task', + maxConcurrency: 5, + idleTimeout: 60, + grantToken: 'test-token', + maxPayloadSize: 1024, + taskBrokerUri: 'http://localhost:8080', + timezone: 'America/New_York', + taskTimeout: 60, + healthcheckServer: { + enabled: false, + host: 'localhost', + port: 8081, + }, + ...opts, + }); + + afterEach(() => { + runner?.clearIdleTimer(); + }); + describe('constructor', () => { afterEach(() => { jest.clearAllMocks(); }); it('should correctly construct WebSocket URI with provided taskBrokerUri', () => { - const runner = new TestRunner({ - taskType: 'test-task', - maxConcurrency: 5, - idleTimeout: 60, - grantToken: 'test-token', - maxPayloadSize: 1024, + runner = newTestRunner({ taskBrokerUri: 'http://localhost:8080', - timezone: 'America/New_York', - taskTimeout: 60, - healthcheckServer: { - enabled: false, - host: 'localhost', - port: 8081, - }, }); expect(WebSocket).toHaveBeenCalledWith( @@ -38,25 +50,11 @@ describe('TestRunner', () => { maxPayload: 1024, }), ); - - runner.clearIdleTimer(); }); it('should handle different taskBrokerUri formats correctly', () => { - const runner = new TestRunner({ - taskType: 'test-task', - maxConcurrency: 5, - idleTimeout: 60, - grantToken: 'test-token', - maxPayloadSize: 1024, + runner = newTestRunner({ taskBrokerUri: 'https://example.com:3000/path', - timezone: 'America/New_York', - taskTimeout: 60, - healthcheckServer: { - enabled: false, - host: 'localhost', - port: 8081, - }, }); expect(WebSocket).toHaveBeenCalledWith( @@ -68,49 +66,140 @@ describe('TestRunner', () => { maxPayload: 1024, }), ); - - runner.clearIdleTimer(); }); it('should throw an error if taskBrokerUri is invalid', () => { - expect( - () => - new TestRunner({ - taskType: 'test-task', - maxConcurrency: 5, - idleTimeout: 60, - grantToken: 'test-token', - maxPayloadSize: 1024, - taskBrokerUri: 'not-a-valid-uri', - timezone: 'America/New_York', - taskTimeout: 60, - healthcheckServer: { - enabled: false, - host: 'localhost', - port: 8081, - }, - }), + expect(() => + newTestRunner({ + taskBrokerUri: 'not-a-valid-uri', + }), ).toThrowError(/Invalid URL/); }); }); + describe('sendOffers', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.clearAllTimers(); + }); + + it('should not send offers if canSendOffers is false', () => { + runner = newTestRunner({ + taskType: 'test-task', + maxConcurrency: 2, + }); + const sendSpy = jest.spyOn(runner, 'send'); + expect(runner.canSendOffers).toBe(false); + + runner.sendOffers(); + + expect(sendSpy).toHaveBeenCalledTimes(0); + }); + + it('should enable sending of offer on runnerregistered message', () => { + runner = newTestRunner({ + taskType: 'test-task', + maxConcurrency: 2, + }); + runner.onMessage({ + type: 'broker:runnerregistered', + }); + + expect(runner.canSendOffers).toBe(true); + }); + + it('should send maxConcurrency offers when there are no offers', () => { + runner = newTestRunner({ + taskType: 'test-task', + maxConcurrency: 2, + }); + runner.onMessage({ + type: 'broker:runnerregistered', + }); + + const sendSpy = jest.spyOn(runner, 'send'); + + runner.sendOffers(); + runner.sendOffers(); + + expect(sendSpy).toHaveBeenCalledTimes(2); + expect(sendSpy.mock.calls).toEqual([ + [ + { + type: 'runner:taskoffer', + taskType: 'test-task', + offerId: expect.any(String), + validFor: expect.any(Number), + }, + ], + [ + { + type: 'runner:taskoffer', + taskType: 'test-task', + offerId: expect.any(String), + validFor: expect.any(Number), + }, + ], + ]); + }); + + it('should send up to maxConcurrency offers when there is a running task', () => { + runner = newTestRunner({ + taskType: 'test-task', + maxConcurrency: 2, + }); + runner.onMessage({ + type: 'broker:runnerregistered', + }); + runner.runningTasks.set('test-task', { + taskId: 'test-task', + active: true, + cancelled: false, + }); + const sendSpy = jest.spyOn(runner, 'send'); + + runner.sendOffers(); + + expect(sendSpy).toHaveBeenCalledTimes(1); + expect(sendSpy.mock.calls).toEqual([ + [ + { + type: 'runner:taskoffer', + taskType: 'test-task', + offerId: expect.any(String), + validFor: expect.any(Number), + }, + ], + ]); + }); + + it('should delete stale offers and send new ones', () => { + runner = newTestRunner({ + taskType: 'test-task', + maxConcurrency: 2, + }); + runner.onMessage({ + type: 'broker:runnerregistered', + }); + + const sendSpy = jest.spyOn(runner, 'send'); + + runner.sendOffers(); + expect(sendSpy).toHaveBeenCalledTimes(2); + sendSpy.mockClear(); + + jest.advanceTimersByTime(6000); + runner.sendOffers(); + expect(sendSpy).toHaveBeenCalledTimes(2); + }); + }); + describe('taskCancelled', () => { it('should reject pending requests when task is cancelled', () => { - const runner = new TestRunner({ - taskType: 'test-task', - maxConcurrency: 5, - idleTimeout: 60, - grantToken: 'test-token', - maxPayloadSize: 1024, - taskBrokerUri: 'http://localhost:8080', - timezone: 'America/New_York', - taskTimeout: 60, - healthcheckServer: { - enabled: false, - host: 'localhost', - port: 8081, - }, - }); + const runner = newTestRunner(); const taskId = 'test-task'; runner.runningTasks.set(taskId, { diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index d9e288eaab..78ec83961c 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -174,9 +174,11 @@ export abstract class TaskRunner extends EventEmitter { sendOffers() { this.deleteStaleOffers(); - const offersToSend = - this.maxConcurrency - - (Object.values(this.openOffers).length + Object.values(this.runningTasks).length); + if (!this.canSendOffers) { + return; + } + + const offersToSend = this.maxConcurrency - (this.openOffers.size + this.runningTasks.size); for (let i = 0; i < offersToSend; i++) { // Add a bit of randomness so that not all offers expire at the same time @@ -255,11 +257,12 @@ export abstract class TaskRunner extends EventEmitter { } hasOpenTasks() { - return Object.values(this.runningTasks).length < this.maxConcurrency; + return this.runningTasks.size < this.maxConcurrency; } offerAccepted(offerId: string, taskId: string) { if (!this.hasOpenTasks()) { + this.openOffers.delete(offerId); this.send({ type: 'runner:taskrejected', taskId, @@ -267,6 +270,7 @@ export abstract class TaskRunner extends EventEmitter { }); return; } + const offer = this.openOffers.get(offerId); if (!offer) { this.send({