This commit is contained in:
Iván Ovejero 2024-11-19 18:00:20 +01:00
parent 0715ec2512
commit 403b76bdfc
No known key found for this signature in database
3 changed files with 21 additions and 24 deletions

View file

@ -21,6 +21,7 @@ export namespace BrokerMessage {
type: 'broker:taskofferaccept';
taskId: string;
offerId: string;
requestId: string;
}
export interface TaskCancel {
@ -188,6 +189,7 @@ export namespace RunnerMessage {
export interface TaskDeferred {
type: 'runner:taskdeferred';
taskId: string;
requestId: string;
}
export interface TaskDone {

View file

@ -6,10 +6,4 @@ export class TaskRejectError extends ApplicationError {
}
}
export class TaskDeferredError extends ApplicationError {
constructor() {
super('Task deferred until runner is ready', { level: 'info' });
}
}
export class TaskError extends ApplicationError {}

View file

@ -13,7 +13,7 @@ import config from '@/config';
import { Time } from '@/constants';
import { Logger } from '@/logging/logger.service';
import { TaskDeferredError, TaskRejectError } from './errors';
import { TaskRejectError } from './errors';
import { TaskRunnerTimeoutError } from './errors/task-runner-timeout.error';
import { RunnerLifecycleEvents } from './runner-lifecycle-events';
@ -59,7 +59,7 @@ type RunnerAcceptCallback = () => void;
type RequesterAcceptCallback = (
settings: RequesterMessage.ToBroker.TaskSettings['settings'],
) => void;
type TaskRejectCallback = (reason: TaskRejectError | TaskDeferredError) => void;
type TaskRejectCallback = (reason: TaskRejectError) => void;
@Service()
export class TaskBroker {
@ -163,7 +163,7 @@ export class TaskBroker {
this.handleRunnerReject(message.taskId, message.reason);
break;
case 'runner:taskdeferred':
this.handleRunnerDeferred(message.taskId);
this.handleRunnerDeferred(message.taskId, message.requestId);
break;
case 'runner:taskoffer':
this.taskOffered({
@ -233,12 +233,15 @@ export class TaskBroker {
}
}
handleRunnerDeferred(taskId: Task['id']) {
const acceptReject = this.runnerAcceptRejects.get(taskId);
if (acceptReject) {
acceptReject.reject(new TaskDeferredError());
this.runnerAcceptRejects.delete(taskId);
}
handleRunnerDeferred(taskId: Task['id'], requestId: TaskRequest['requestId']) {
const request = this.pendingTaskRequests.find((ptr) => ptr.requestId === requestId);
if (!request) throw new ApplicationError('Failed to find request to defer');
this.logger.info(`Request for task (${taskId}) deferred until runner is ready`);
request.acceptInProgress = false;
this.pendingTaskRequests.push(request);
}
async handleDataRequest(
@ -505,16 +508,19 @@ export class TaskBroker {
const acceptPromise = new Promise((resolve, reject) => {
this.runnerAcceptRejects.set(taskId, { accept: resolve as () => void, reject });
if (offer.validFor !== -1) {
// TODO: customisable timeout
setTimeout(() => {
reject('Runner timed out');
}, 2000);
}
});
await this.messageRunner(offer.runnerId, {
type: 'broker:taskofferaccept',
offerId: offer.offerId,
taskId,
requestId: request.requestId,
});
await acceptPromise;
@ -524,11 +530,6 @@ export class TaskBroker {
this.logger.info(`Task (${taskId}) rejected by Runner with reason "${e.reason}"`);
return;
}
if (e instanceof TaskDeferredError) {
this.logger.info(`Task (${taskId}) deferred until runner is ready`);
this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner
return;
}
throw e;
}