diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index f5b91a3f2c..5d627ba341 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -5,6 +5,8 @@ import type { RunnerMessage, TaskResultData } from '../runner-types'; import { TaskBroker } from '../task-broker.service'; import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service'; +const createValidUntil = (ms: number) => process.hrtime.bigint() + BigInt(ms * 1_000_000); + describe('TaskBroker', () => { let taskBroker: TaskBroker; @@ -15,14 +17,12 @@ describe('TaskBroker', () => { describe('expireTasks', () => { it('should remove expired task offers and keep valid task offers', () => { - const now = process.hrtime.bigint(); - const validOffer: TaskOffer = { offerId: 'valid', runnerId: 'runner1', taskType: 'taskType1', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future + validUntil: createValidUntil(1000), // 1 second in the future }; const expiredOffer1: TaskOffer = { @@ -30,7 +30,7 @@ describe('TaskBroker', () => { runnerId: 'runner2', taskType: 'taskType1', validFor: 1000, - validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past + validUntil: createValidUntil(-1000), // 1 second in the past }; const expiredOffer2: TaskOffer = { @@ -38,7 +38,7 @@ describe('TaskBroker', () => { runnerId: 'runner3', taskType: 'taskType1', validFor: 2000, - validUntil: now - BigInt(2000 * 1_000_000), // 2 seconds in the past + validUntil: createValidUntil(-2000), // 2 seconds in the past }; taskBroker.setPendingTaskOffers([validOffer, expiredOffer1, expiredOffer2]); @@ -102,6 +102,55 @@ describe('TaskBroker', () => { expect(runnerIds).toHaveLength(0); }); + + it('should remove any pending offers for that runner', () => { + const runnerId = 'runner1'; + const runner = mock({ id: runnerId }); + const messageCallback = jest.fn(); + + taskBroker.registerRunner(runner, messageCallback); + taskBroker.taskOffered({ + offerId: 'offer1', + runnerId, + taskType: 'mock', + validFor: 1000, + validUntil: createValidUntil(1000), + }); + taskBroker.taskOffered({ + offerId: 'offer2', + runnerId: 'runner2', + taskType: 'mock', + validFor: 1000, + validUntil: createValidUntil(1000), + }); + taskBroker.deregisterRunner(runnerId); + + const offers = taskBroker.getPendingTaskOffers(); + expect(offers).toHaveLength(1); + expect(offers[0].runnerId).toBe('runner2'); + }); + + it('should fail any running tasks for that runner', () => { + const runnerId = 'runner1'; + const runner = mock({ id: runnerId }); + const messageCallback = jest.fn(); + + const taskId = 'task1'; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const failSpy = jest.spyOn(taskBroker as any, 'failTask'); + const rejectSpy = jest.spyOn(taskBroker, 'handleRunnerReject'); + + taskBroker.registerRunner(runner, messageCallback); + taskBroker.setTasks({ + [taskId]: { id: taskId, requesterId: 'requester1', runnerId, taskType: 'mock' }, + task2: { id: 'task2', requesterId: 'requester1', runnerId: 'runner2', taskType: 'mock' }, + }); + taskBroker.deregisterRunner(runnerId); + + expect(failSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`); + expect(rejectSpy).toBeCalledWith(taskId, `The Task Runner (${runnerId}) has disconnected`); + }); }); describe('deregisterRequester', () => { @@ -121,14 +170,12 @@ describe('TaskBroker', () => { describe('taskRequested', () => { it('should match a pending offer to an incoming request', async () => { - const now = process.hrtime.bigint(); - const offer: TaskOffer = { offerId: 'offer1', runnerId: 'runner1', taskType: 'taskType1', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), + validUntil: createValidUntil(1000), }; taskBroker.setPendingTaskOffers([offer]); @@ -150,8 +197,6 @@ describe('TaskBroker', () => { describe('taskOffered', () => { it('should match a pending request to an incoming offer', () => { - const now = process.hrtime.bigint(); - const request: TaskRequest = { requestId: 'request1', requesterId: 'requester1', @@ -166,7 +211,7 @@ describe('TaskBroker', () => { runnerId: 'runner1', taskType: 'taskType1', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), + validUntil: createValidUntil(1000), }; jest.spyOn(taskBroker, 'acceptOffer').mockResolvedValue(); // allow Jest to exit cleanly @@ -180,14 +225,12 @@ describe('TaskBroker', () => { describe('settleTasks', () => { it('should match task offers with task requests by task type', () => { - const now = process.hrtime.bigint(); - const offer1: TaskOffer = { offerId: 'offer1', runnerId: 'runner1', taskType: 'taskType1', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), + validUntil: createValidUntil(1000), }; const offer2: TaskOffer = { @@ -195,7 +238,7 @@ describe('TaskBroker', () => { runnerId: 'runner2', taskType: 'taskType2', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), + validUntil: createValidUntil(1000), }; const request1: TaskRequest = { @@ -235,14 +278,12 @@ describe('TaskBroker', () => { }); it('should not match a request whose acceptance is in progress', () => { - const now = process.hrtime.bigint(); - const offer: TaskOffer = { offerId: 'offer1', runnerId: 'runner1', taskType: 'taskType1', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), + validUntil: createValidUntil(1000), }; const request: TaskRequest = { @@ -271,14 +312,12 @@ describe('TaskBroker', () => { }); it('should expire tasks before settling', () => { - const now = process.hrtime.bigint(); - const validOffer: TaskOffer = { offerId: 'valid', runnerId: 'runner1', taskType: 'taskType1', validFor: 1000, - validUntil: now + BigInt(1000 * 1_000_000), // 1 second in the future + validUntil: createValidUntil(1000), // 1 second in the future }; const expiredOffer: TaskOffer = { @@ -286,7 +325,7 @@ describe('TaskBroker', () => { runnerId: 'runner2', taskType: 'taskType2', // will be removed before matching validFor: 1000, - validUntil: now - BigInt(1000 * 1_000_000), // 1 second in the past + validUntil: createValidUntil(-1000), // 1 second in the past }; const request1: TaskRequest = { diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index 829910b468..a63cbbda21 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -75,15 +75,11 @@ export class TaskBroker { expireTasks() { const now = process.hrtime.bigint(); - const invalidOffers: number[] = []; - for (let i = 0; i < this.pendingTaskOffers.length; i++) { + for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) { if (this.pendingTaskOffers[i].validUntil < now) { - invalidOffers.push(i); + this.pendingTaskOffers.splice(i, 1); } } - - // We reverse the list so the later indexes are valid after deleting earlier ones - invalidOffers.reverse().forEach((i) => this.pendingTaskOffers.splice(i, 1)); } registerRunner(runner: TaskRunner, messageCallback: MessageCallback) { @@ -92,6 +88,21 @@ export class TaskBroker { deregisterRunner(runnerId: string) { this.knownRunners.delete(runnerId); + + // Remove any pending offers + for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) { + if (this.pendingTaskOffers[i].runnerId === runnerId) { + this.pendingTaskOffers.splice(i, 1); + } + } + + // Fail any tasks + for (const task of this.tasks.values()) { + if (task.runnerId === runnerId) { + void this.failTask(task.id, `The Task Runner (${runnerId}) has disconnected`); + this.handleRunnerReject(task.id, `The Task Runner (${runnerId}) has disconnected`); + } + } } registerRequester(requesterId: string, messageCallback: RequesterMessageCallback) {