From 7d3ad669e2d179894d3ad32b9ae7bea32c107d51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 20 Nov 2024 12:38:29 +0100 Subject: [PATCH] feat(core): Defer task on launcher handshake (no-changelog) (#11786) --- .../@n8n/task-runner/src/message-types.ts | 7 ++ .../src/runners/__tests__/task-broker.test.ts | 89 +++++++++++++++++++ packages/cli/src/runners/errors.ts | 6 ++ .../cli/src/runners/task-broker.service.ts | 31 ++++++- 4 files changed, 129 insertions(+), 4 deletions(-) diff --git a/packages/@n8n/task-runner/src/message-types.ts b/packages/@n8n/task-runner/src/message-types.ts index b5f17f965e..71f236b52a 100644 --- a/packages/@n8n/task-runner/src/message-types.ts +++ b/packages/@n8n/task-runner/src/message-types.ts @@ -184,6 +184,12 @@ export namespace RunnerMessage { reason: string; } + /** Message where launcher (impersonating runner) requests broker to hold task until runner is ready. */ + export interface TaskDeferred { + type: 'runner:taskdeferred'; + taskId: string; + } + export interface TaskDone { type: 'runner:taskdone'; taskId: string; @@ -243,6 +249,7 @@ export namespace RunnerMessage { | TaskError | TaskAccepted | TaskRejected + | TaskDeferred | TaskOffer | RPC | TaskDataRequest diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 4cbc4ebfc0..2f9a6a26e7 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -55,6 +55,35 @@ describe('TaskBroker', () => { expect(offers).toHaveLength(1); expect(offers[0]).toEqual(validOffer); }); + + it('should not expire non-expiring task offers', () => { + const nonExpiringOffer: TaskOffer = { + offerId: 'nonExpiring', + runnerId: 'runner1', + taskType: 'taskType1', + validFor: -1, + validUntil: 0n, // sentinel value for non-expiring offer + }; + + const expiredOffer: TaskOffer = { + offerId: 'expired', + runnerId: 'runner2', + taskType: 'taskType1', + validFor: 1000, + validUntil: createValidUntil(-1000), // 1 second in the past + }; + + taskBroker.setPendingTaskOffers([ + nonExpiringOffer, // will not be removed + expiredOffer, // will be removed + ]); + + taskBroker.expireTasks(); + + const offers = taskBroker.getPendingTaskOffers(); + expect(offers).toHaveLength(1); + expect(offers[0]).toEqual(nonExpiringOffer); + }); }); describe('registerRunner', () => { @@ -595,6 +624,66 @@ describe('TaskBroker', () => { requestParams, }); }); + + it('should handle `runner:taskoffer` message with expiring offer', async () => { + const runnerId = 'runner1'; + const validFor = 1000; // 1 second + const message: RunnerMessage.ToBroker.TaskOffer = { + type: 'runner:taskoffer', + offerId: 'offer1', + taskType: 'taskType1', + validFor, + }; + + const beforeTime = process.hrtime.bigint(); + taskBroker.registerRunner(mock({ id: runnerId }), jest.fn()); + + await taskBroker.onRunnerMessage(runnerId, message); + + const afterTime = process.hrtime.bigint(); + + const offers = taskBroker.getPendingTaskOffers(); + expect(offers).toHaveLength(1); + + const expectedMinValidUntil = beforeTime + BigInt(validFor * 1_000_000); + const expectedMaxValidUntil = afterTime + BigInt(validFor * 1_000_000); + + expect(offers[0].validUntil).toBeGreaterThanOrEqual(expectedMinValidUntil); + expect(offers[0].validUntil).toBeLessThanOrEqual(expectedMaxValidUntil); + expect(offers[0]).toEqual( + expect.objectContaining({ + runnerId, + taskType: message.taskType, + offerId: message.offerId, + validFor, + }), + ); + }); + + it('should handle `runner:taskoffer` message with non-expiring offer', async () => { + const runnerId = 'runner1'; + const message: RunnerMessage.ToBroker.TaskOffer = { + type: 'runner:taskoffer', + offerId: 'offer1', + taskType: 'taskType1', + validFor: -1, + }; + + taskBroker.registerRunner(mock({ id: runnerId }), jest.fn()); + + await taskBroker.onRunnerMessage(runnerId, message); + + const offers = taskBroker.getPendingTaskOffers(); + + expect(offers).toHaveLength(1); + expect(offers[0]).toEqual({ + runnerId, + taskType: message.taskType, + offerId: message.offerId, + validFor: -1, + validUntil: 0n, + }); + }); }); describe('onRequesterMessage', () => { diff --git a/packages/cli/src/runners/errors.ts b/packages/cli/src/runners/errors.ts index cc53e18fd4..c530e5a95d 100644 --- a/packages/cli/src/runners/errors.ts +++ b/packages/cli/src/runners/errors.ts @@ -6,4 +6,10 @@ export class TaskRejectError extends ApplicationError { } } +export class TaskDeferredError extends ApplicationError { + constructor() { + super('Task deferred until runner is ready', { level: 'info' }); + } +} + export class TaskError extends ApplicationError {} diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index 9af7b19f55..af76fb6cac 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -13,7 +13,7 @@ import config from '@/config'; import { Time } from '@/constants'; import { Logger } from '@/logging/logger.service'; -import { TaskRejectError } from './errors'; +import { TaskDeferredError, TaskRejectError } from './errors'; import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error'; import { RunnerLifecycleEvents } from './runner-lifecycle-events'; @@ -36,6 +36,8 @@ export interface TaskOffer { offerId: string; runnerId: TaskRunner['id']; taskType: string; + + /** How long (in milliseconds) the task offer is valid for. `-1` for non-expiring offer from launcher. */ validFor: number; validUntil: bigint; } @@ -57,7 +59,7 @@ type RunnerAcceptCallback = () => void; type RequesterAcceptCallback = ( settings: RequesterMessage.ToBroker.TaskSettings['settings'], ) => void; -type TaskRejectCallback = (reason: TaskRejectError) => void; +type TaskRejectCallback = (reason: TaskRejectError | TaskDeferredError) => void; @Service() export class TaskBroker { @@ -97,7 +99,9 @@ export class TaskBroker { expireTasks() { const now = process.hrtime.bigint(); for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) { - if (this.pendingTaskOffers[i].validUntil < now) { + const offer = this.pendingTaskOffers[i]; + if (offer.validFor === -1) continue; // non-expiring offer + if (offer.validUntil < now) { this.pendingTaskOffers.splice(i, 1); } } @@ -158,13 +162,19 @@ export class TaskBroker { case 'runner:taskrejected': this.handleRunnerReject(message.taskId, message.reason); break; + case 'runner:taskdeferred': + this.handleRunnerDeferred(message.taskId); + break; case 'runner:taskoffer': this.taskOffered({ runnerId, taskType: message.taskType, offerId: message.offerId, validFor: message.validFor, - validUntil: process.hrtime.bigint() + BigInt(message.validFor * 1_000_000), + validUntil: + message.validFor === -1 + ? 0n // sentinel value for non-expiring offer + : process.hrtime.bigint() + BigInt(message.validFor * 1_000_000), }); break; case 'runner:taskdone': @@ -223,6 +233,14 @@ export class TaskBroker { } } + handleRunnerDeferred(taskId: Task['id']) { + const acceptReject = this.runnerAcceptRejects.get(taskId); + if (acceptReject) { + acceptReject.reject(new TaskDeferredError()); + this.runnerAcceptRejects.delete(taskId); + } + } + async handleDataRequest( taskId: Task['id'], requestId: RunnerMessage.ToBroker.TaskDataRequest['requestId'], @@ -506,6 +524,11 @@ export class TaskBroker { this.logger.info(`Task (${taskId}) rejected by Runner with reason "${e.reason}"`); return; } + if (e instanceof TaskDeferredError) { + this.logger.info(`Task (${taskId}) deferred until runner is ready`); + this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner + return; + } throw e; }