feat(core): Defer task on launcher handshake (no-changelog) (#11786)

This commit is contained in:
Iván Ovejero 2024-11-20 12:38:29 +01:00 committed by GitHub
parent b05d435199
commit 7d3ad669e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 129 additions and 4 deletions

View file

@ -184,6 +184,12 @@ export namespace RunnerMessage {
reason: string;
}
/** Message where launcher (impersonating runner) requests broker to hold task until runner is ready. */
export interface TaskDeferred {
type: 'runner:taskdeferred';
taskId: string;
}
export interface TaskDone {
type: 'runner:taskdone';
taskId: string;
@ -243,6 +249,7 @@ export namespace RunnerMessage {
| TaskError
| TaskAccepted
| TaskRejected
| TaskDeferred
| TaskOffer
| RPC
| TaskDataRequest

View file

@ -55,6 +55,35 @@ describe('TaskBroker', () => {
expect(offers).toHaveLength(1);
expect(offers[0]).toEqual(validOffer);
});
it('should not expire non-expiring task offers', () => {
const nonExpiringOffer: TaskOffer = {
offerId: 'nonExpiring',
runnerId: 'runner1',
taskType: 'taskType1',
validFor: -1,
validUntil: 0n, // sentinel value for non-expiring offer
};
const expiredOffer: TaskOffer = {
offerId: 'expired',
runnerId: 'runner2',
taskType: 'taskType1',
validFor: 1000,
validUntil: createValidUntil(-1000), // 1 second in the past
};
taskBroker.setPendingTaskOffers([
nonExpiringOffer, // will not be removed
expiredOffer, // will be removed
]);
taskBroker.expireTasks();
const offers = taskBroker.getPendingTaskOffers();
expect(offers).toHaveLength(1);
expect(offers[0]).toEqual(nonExpiringOffer);
});
});
describe('registerRunner', () => {
@ -595,6 +624,66 @@ describe('TaskBroker', () => {
requestParams,
});
});
it('should handle `runner:taskoffer` message with expiring offer', async () => {
const runnerId = 'runner1';
const validFor = 1000; // 1 second
const message: RunnerMessage.ToBroker.TaskOffer = {
type: 'runner:taskoffer',
offerId: 'offer1',
taskType: 'taskType1',
validFor,
};
const beforeTime = process.hrtime.bigint();
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
await taskBroker.onRunnerMessage(runnerId, message);
const afterTime = process.hrtime.bigint();
const offers = taskBroker.getPendingTaskOffers();
expect(offers).toHaveLength(1);
const expectedMinValidUntil = beforeTime + BigInt(validFor * 1_000_000);
const expectedMaxValidUntil = afterTime + BigInt(validFor * 1_000_000);
expect(offers[0].validUntil).toBeGreaterThanOrEqual(expectedMinValidUntil);
expect(offers[0].validUntil).toBeLessThanOrEqual(expectedMaxValidUntil);
expect(offers[0]).toEqual(
expect.objectContaining({
runnerId,
taskType: message.taskType,
offerId: message.offerId,
validFor,
}),
);
});
it('should handle `runner:taskoffer` message with non-expiring offer', async () => {
const runnerId = 'runner1';
const message: RunnerMessage.ToBroker.TaskOffer = {
type: 'runner:taskoffer',
offerId: 'offer1',
taskType: 'taskType1',
validFor: -1,
};
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
await taskBroker.onRunnerMessage(runnerId, message);
const offers = taskBroker.getPendingTaskOffers();
expect(offers).toHaveLength(1);
expect(offers[0]).toEqual({
runnerId,
taskType: message.taskType,
offerId: message.offerId,
validFor: -1,
validUntil: 0n,
});
});
});
describe('onRequesterMessage', () => {

View file

@ -6,4 +6,10 @@ export class TaskRejectError extends ApplicationError {
}
}
export class TaskDeferredError extends ApplicationError {
constructor() {
super('Task deferred until runner is ready', { level: 'info' });
}
}
export class TaskError extends ApplicationError {}

View file

@ -13,7 +13,7 @@ import config from '@/config';
import { Time } from '@/constants';
import { Logger } from '@/logging/logger.service';
import { TaskRejectError } from './errors';
import { TaskDeferredError, TaskRejectError } from './errors';
import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error';
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
@ -36,6 +36,8 @@ export interface TaskOffer {
offerId: string;
runnerId: TaskRunner['id'];
taskType: string;
/** How long (in milliseconds) the task offer is valid for. `-1` for non-expiring offer from launcher. */
validFor: number;
validUntil: bigint;
}
@ -57,7 +59,7 @@ type RunnerAcceptCallback = () => void;
type RequesterAcceptCallback = (
settings: RequesterMessage.ToBroker.TaskSettings['settings'],
) => void;
type TaskRejectCallback = (reason: TaskRejectError) => void;
type TaskRejectCallback = (reason: TaskRejectError | TaskDeferredError) => void;
@Service()
export class TaskBroker {
@ -97,7 +99,9 @@ export class TaskBroker {
expireTasks() {
const now = process.hrtime.bigint();
for (let i = this.pendingTaskOffers.length - 1; i >= 0; i--) {
if (this.pendingTaskOffers[i].validUntil < now) {
const offer = this.pendingTaskOffers[i];
if (offer.validFor === -1) continue; // non-expiring offer
if (offer.validUntil < now) {
this.pendingTaskOffers.splice(i, 1);
}
}
@ -158,13 +162,19 @@ export class TaskBroker {
case 'runner:taskrejected':
this.handleRunnerReject(message.taskId, message.reason);
break;
case 'runner:taskdeferred':
this.handleRunnerDeferred(message.taskId);
break;
case 'runner:taskoffer':
this.taskOffered({
runnerId,
taskType: message.taskType,
offerId: message.offerId,
validFor: message.validFor,
validUntil: process.hrtime.bigint() + BigInt(message.validFor * 1_000_000),
validUntil:
message.validFor === -1
? 0n // sentinel value for non-expiring offer
: process.hrtime.bigint() + BigInt(message.validFor * 1_000_000),
});
break;
case 'runner:taskdone':
@ -223,6 +233,14 @@ export class TaskBroker {
}
}
handleRunnerDeferred(taskId: Task['id']) {
const acceptReject = this.runnerAcceptRejects.get(taskId);
if (acceptReject) {
acceptReject.reject(new TaskDeferredError());
this.runnerAcceptRejects.delete(taskId);
}
}
async handleDataRequest(
taskId: Task['id'],
requestId: RunnerMessage.ToBroker.TaskDataRequest['requestId'],
@ -506,6 +524,11 @@ export class TaskBroker {
this.logger.info(`Task (${taskId}) rejected by Runner with reason "${e.reason}"`);
return;
}
if (e instanceof TaskDeferredError) {
this.logger.info(`Task (${taskId}) deferred until runner is ready`);
this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner
return;
}
throw e;
}