n8n/packages/cli/src/WaitTracker.ts
Iván Ovejero 4bad43dd66
refactor(core): Move typeorm operators from WaitTracker to ExecutionRepository (no-changelog) (#8163)
Follow-up to: #8145

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
2023-12-28 19:22:09 +01:00

188 lines
5.8 KiB
TypeScript

import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
WorkflowOperationError,
} from 'n8n-workflow';
import { Container, Service } from 'typedi';
import * as ResponseHelper from '@/ResponseHelper';
import type {
IExecutionResponse,
IExecutionsStopData,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { OwnershipService } from './services/ownership.service';
import { Logger } from '@/Logger';
/**
 * Tracks executions that are waiting to be resumed.
 *
 * On construction the tracker queries the execution repository for waiting
 * executions and repeats that query every 60 seconds. For every execution
 * returned, a timer is set that resumes the execution once its `waitTill`
 * time has been reached.
 */
@Service()
export class WaitTracker {
	/**
	 * Largest delay `setTimeout` accepts (2^31 - 1 ms). Node.js coerces larger
	 * delays to 1 ms, which would resume an execution almost immediately.
	 */
	private static readonly MAX_TIMEOUT_MS = 2 ** 31 - 1;

	/** Pending resume timers, keyed by execution ID. */
	private waitingExecutions: {
		[key: string]: {
			executionId: string;
			timer: NodeJS.Timeout;
		};
	} = {};

	/** Handle of the periodic polling interval started in the constructor. */
	mainTimer: NodeJS.Timeout;

	constructor(
		private readonly logger: Logger,
		private readonly executionRepository: ExecutionRepository,
		private readonly ownershipService: OwnershipService,
	) {
		// Poll every 60 seconds a list of upcoming executions
		this.mainTimer = setInterval(() => {
			this.pollWaitingExecutions();
		}, 60000);

		this.pollWaitingExecutions();
	}

	/**
	 * Runs `getWaitingExecutions()` as a background task. A failure (e.g. the
	 * repository query rejecting) is reported and logged instead of surfacing
	 * as an unhandled promise rejection.
	 */
	private pollWaitingExecutions(): void {
		this.getWaitingExecutions().catch((error: Error) => {
			ErrorReporter.error(error);
			this.logger.error(
				`Wait tracker failed to query the database for waiting executions: "${error.message}"`,
			);
		});
	}

	/**
	 * Stops the periodic polling and cancels every pending resume timer.
	 * Executions themselves are not modified; only the in-memory timers are
	 * released.
	 */
	shutdown(): void {
		clearInterval(this.mainTimer);
		for (const { timer } of Object.values(this.waitingExecutions)) {
			clearTimeout(timer);
		}
		this.waitingExecutions = {};
	}

	/**
	 * Fetches the waiting executions from the repository and sets a resume
	 * timer for each one that does not have a timer yet.
	 *
	 * Rejects if the repository query rejects.
	 */
	async getWaitingExecutions() {
		this.logger.debug('Wait tracker querying database for waiting executions');

		const executions = await this.executionRepository.getWaitingExecutions();

		if (executions.length === 0) {
			return;
		}

		const executionIds = executions.map((execution) => execution.id).join(', ');
		this.logger.debug(
			`Wait tracker found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
		);

		// Add timers for each waiting execution that they get started at the correct time
		for (const execution of executions) {
			const executionId = execution.id;

			// Already scheduled by an earlier poll
			if (this.waitingExecutions[executionId] !== undefined) continue;

			// NOTE(review): `waitTill` is asserted non-null here; presumably the
			// repository only returns executions that have it set - confirm.
			const triggerTime = execution.waitTill!.getTime() - Date.now();

			// A delay beyond the timer limit would fire (almost) immediately.
			// Leave the execution unscheduled; since it is not recorded in
			// `waitingExecutions`, a later poll will evaluate it again.
			if (triggerTime > WaitTracker.MAX_TIMEOUT_MS) continue;

			this.waitingExecutions[executionId] = {
				executionId,
				timer: setTimeout(() => {
					this.startExecution(executionId);
				}, triggerTime),
			};
		}
	}

	/**
	 * Cancels an execution: clears its pending resume timer (if any), marks it
	 * as `canceled` in the database and removes its `waitTill` time.
	 *
	 * @param executionId ID of the execution to stop.
	 * @returns Mode, start/stop times, finished flag and status of the stopped execution.
	 * @throws ApplicationError if the execution is not found, or if its data
	 * can neither be unflattened nor recovered.
	 * @throws WorkflowOperationError if the execution status is not one of
	 * `new`, `unknown`, `waiting` or `running`.
	 */
	async stopExecution(executionId: string): Promise<IExecutionsStopData> {
		if (this.waitingExecutions[executionId] !== undefined) {
			// The waiting execution was already scheduled to execute.
			// So stop timer and remove.
			clearTimeout(this.waitingExecutions[executionId].timer);
			delete this.waitingExecutions[executionId];
		}

		// Also check in database
		const execution = await this.executionRepository.findSingleExecution(executionId, {
			includeData: true,
		});

		if (!execution) {
			throw new ApplicationError('Execution not found.', {
				extra: { executionId },
			});
		}

		if (!['new', 'unknown', 'waiting', 'running'].includes(execution.status)) {
			throw new WorkflowOperationError(
				`Only running or waiting executions can be stopped and ${executionId} is currently ${execution.status}.`,
			);
		}

		let fullExecutionData: IExecutionResponse;
		try {
			fullExecutionData = ResponseHelper.unflattenExecutionData(execution);
		} catch (error) {
			// if the execution ended in an unforeseen, non-cancelable state, try to recover it
			await recoverExecutionDataFromEventLogMessages(executionId, [], true);

			// find recovered data
			// NOTE(review): this lookup (and the update below) goes through the DI
			// container instead of the injected `executionRepository`; presumably
			// both resolve to the same instance - confirm and consider unifying.
			const restoredExecution = await Container.get(ExecutionRepository).findSingleExecution(
				executionId,
				{
					includeData: true,
					unflattenData: true,
				},
			);

			if (!restoredExecution) {
				throw new ApplicationError('Execution could not be recovered or canceled.', {
					extra: { executionId },
				});
			}

			fullExecutionData = restoredExecution;
		}

		// Mark the execution in the DB as canceled and remove the waitTill time
		const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

		fullExecutionData.data.resultData.error = {
			...error,
			// `message` and `stack` are copied explicitly in addition to the spread
			message: error.message,
			stack: error.stack,
		};

		fullExecutionData.stoppedAt = new Date();
		fullExecutionData.waitTill = null;
		fullExecutionData.status = 'canceled';

		await Container.get(ExecutionRepository).updateExistingExecution(
			executionId,
			fullExecutionData,
		);

		return {
			mode: fullExecutionData.mode,
			startedAt: new Date(fullExecutionData.startedAt),
			stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
			finished: fullExecutionData.finished,
			status: fullExecutionData.status,
		};
	}

	/**
	 * Resumes a waiting execution.
	 *
	 * The execution is removed from the pending timers, loaded from the
	 * repository and handed to a new `WorkflowRunner`. The work runs in the
	 * background: this method returns immediately, and any failure is
	 * reported and logged rather than thrown to the caller.
	 *
	 * @param executionId ID of the execution to resume.
	 */
	startExecution(executionId: string) {
		this.logger.debug(`Wait tracker resuming execution ${executionId}`, { executionId });
		delete this.waitingExecutions[executionId];

		(async () => {
			// Get the data to execute
			const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
				includeData: true,
				unflattenData: true,
			});

			if (!fullExecutionData) {
				throw new ApplicationError('Execution does not exist.', { extra: { executionId } });
			}
			if (fullExecutionData.finished) {
				throw new ApplicationError('The execution did succeed and can so not be started again.');
			}

			if (!fullExecutionData.workflowData.id) {
				throw new ApplicationError('Only saved workflows can be resumed.');
			}
			const workflowId = fullExecutionData.workflowData.id;
			const user = await this.ownershipService.getWorkflowOwnerCached(workflowId);

			const data: IWorkflowExecutionDataProcess = {
				executionMode: fullExecutionData.mode,
				executionData: fullExecutionData.data,
				workflowData: fullExecutionData.workflowData,
				userId: user.id,
			};

			// Start the execution again
			const workflowRunner = new WorkflowRunner();
			await workflowRunner.run(data, false, false, executionId);
		})().catch((error: Error) => {
			ErrorReporter.error(error);
			this.logger.error(
				`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`,
				{ executionId },
			);
		});
	}
}