n8n/packages/cli/src/WaitTracker.ts
Iván Ovejero 56c4c6991f
🎨 Set up linting and formatting (#2120)
* ⬆️ Upgrade TS to 4.3.5

* 👕 Add ESLint configs

* 🎨 Add Prettier config

* 📦 Add deps and commands

*  Adjust global .editorconfig to new ruleset

* 🔥 Remove unneeded local .editorconfig

* 📦 Update deps in editor-ui

* 🔨 Limit Prettier to only TS files

*  Add recommended VSCode extensions

* 👕 Fix build

* 🔥 Remove Vue setting from global config

*  Disable prefer-default-export per feedback

* ✏️ Add forgotten divider

* 👕 Disable no-plusplus

* 👕 Disable class-methods-use-this

* ✏️ Alphabetize overrides

* 👕 Add one-var consecutive override

*  Revert one-var consecutive override

This reverts commit b9252cf935.

* 🎨 👕 Lint and format workflow package (#2121)

* 🎨 Format /workflow package

* 👕 Lint /workflow package

* 🎨 Re-format /workflow package

* 👕 Re-lint /workflow package

* ✏️ Fix typo

*  Consolidate if-checks

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 🎨 👕 Lint and format node-dev package (#2122)

* 🎨 Format /node-dev package

*  Exclude templates from ESLint config

This keeps the templates consistent with the codebase while preventing lint exceptions from being made part of the templates.

* 👕 Lint /node-dev package

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🎨 👕 Lint and format core package (#2123)

* 🎨 Format /core package

* 👕 Lint /core package

* 🎨 Re-format /core package

* 👕 Re-lint /core package

* 🔥 Remove prefer-default-export exceptions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 🎨 👕 Lint and format cli package (#2124)

* 🎨 Format /cli package

* 👕 Exclude migrations from linting

* 👕 Lint /cli package

* 🎨 Re-format /cli package

* 👕 Re-lint /cli package

* 👕 Fix build

* 🔥 Remove prefer-default-export exceptions

*  Update exceptions in ActiveExecutions

* 🔥 Remove no-plusplus exceptions

* 🔥 Remove class-methods-use-this exceptions

* 👕 fix lint issues

* 🔧 use package specific linter, remove tslint command

* 🔨 resolve build issue, sync dependencies

* 🔧 change lint command

Co-authored-by: Ben Hesseldieck <b.hesseldieck@gmail.com>
2021-08-29 20:58:11 +02:00

187 lines
5.8 KiB
TypeScript

/* eslint-disable import/no-cycle */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
/* eslint-disable @typescript-eslint/naming-convention */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-floating-promises */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import { IRun, LoggerProxy as Logger, WorkflowOperationError } from 'n8n-workflow';
import { FindManyOptions, LessThanOrEqual, ObjectLiteral } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import {
ActiveExecutions,
DatabaseType,
Db,
GenericHelpers,
IExecutionFlattedDb,
IExecutionsStopData,
IWorkflowExecutionDataProcess,
ResponseHelper,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
WorkflowCredentials,
WorkflowRunner,
} from '.';
/**
 * Keeps track of executions that have a `waitTill` date set and resumes
 * them once that date is reached.
 *
 * The database is polled every 60 seconds for executions whose `waitTill`
 * falls within the next 70 seconds; for each of them an in-memory timer is
 * armed that hands the execution back to a `WorkflowRunner`.
 */
export class WaitTrackerClass {
	// Assigned in the constructor; not read by any method of this class.
	activeExecutionsInstance: ActiveExecutions.ActiveExecutions;

	// Timers that are currently armed, keyed by execution ID.
	private waitingExecutions: {
		[key: string]: {
			executionId: string;
			timer: NodeJS.Timeout;
		};
	} = {};

	// Handle of the 60-second polling interval.
	mainTimer: NodeJS.Timeout;

	constructor() {
		this.activeExecutionsInstance = ActiveExecutions.getInstance();

		// Poll every 60 seconds a list of upcoming executions
		this.mainTimer = setInterval(() => {
			this.pollWaitingExecutions();
		}, 60000);

		this.pollWaitingExecutions();
	}

	/**
	 * Runs one polling pass in fire-and-forget fashion.
	 *
	 * A rejection (e.g. a failing database query) is logged instead of being
	 * left as an unhandled promise rejection; the next interval tick retries.
	 */
	private pollWaitingExecutions(): void {
		this.getwaitingExecutions().catch((error) => {
			const reason = error instanceof Error ? error.message : String(error);
			Logger.error(`Wait tracker failed to query the waiting executions: "${reason}"`);
		});
	}

	/**
	 * Queries the database for executions that are due within the next
	 * 70 seconds and arms a timer for each one that is not tracked yet.
	 *
	 * @returns A promise that resolves once all timers are armed. It rejects
	 * if reading the configuration or querying the database fails.
	 */
	async getwaitingExecutions(): Promise<void> {
		Logger.debug('Wait tracker querying database for waiting executions');

		// Find all the executions which should be triggered in the next 70 seconds.
		// The cutoff is computed once so that every database type uses the same value.
		const waitTillCutoff = new Date(Date.now() + 70000);

		const findQuery: FindManyOptions<IExecutionFlattedDb> = {
			select: ['id', 'waitTill'],
			where: {
				waitTill: LessThanOrEqual(waitTillCutoff),
			},
			order: {
				waitTill: 'ASC',
			},
		};

		const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
		if (dbType === 'sqlite') {
			// This is needed because of issue in TypeORM <> SQLite:
			// https://github.com/typeorm/typeorm/issues/2286
			(findQuery.where! as ObjectLiteral).waitTill = LessThanOrEqual(
				DateUtils.mixedDateToUtcDatetimeString(waitTillCutoff),
			);
		}

		const executions = await Db.collections.Execution!.find(findQuery);

		if (executions.length === 0) {
			return;
		}

		const executionIds = executions.map((execution) => execution.id.toString()).join(', ');
		Logger.debug(
			`Wait tracker found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
		);

		// Add timers for each waiting execution that they get started at the correct time
		// eslint-disable-next-line no-restricted-syntax
		for (const execution of executions) {
			const executionId = execution.id.toString();

			// Executions that already have a timer are skipped so they are not started twice.
			if (this.waitingExecutions[executionId] === undefined) {
				// A `waitTill` in the past yields a negative delay, which setTimeout
				// treats as "fire as soon as possible".
				const triggerTime = execution.waitTill!.getTime() - new Date().getTime();

				this.waitingExecutions[executionId] = {
					executionId,
					timer: setTimeout(() => {
						this.startExecution(executionId);
					}, triggerTime),
				};
			}
		}
	}

	/**
	 * Cancels a waiting execution: clears its timer (if one is armed), stores
	 * a cancellation error on the execution and removes its `waitTill` date.
	 *
	 * @param executionId ID of the execution to cancel
	 * @returns Mode, start/stop dates and finished flag of the stopped execution
	 * @throws Error if the execution does not exist or has no `waitTill` set
	 */
	async stopExecution(executionId: string): Promise<IExecutionsStopData> {
		if (this.waitingExecutions[executionId] !== undefined) {
			// The waiting execution was already scheduled to execute.
			// So stop timer and remove.
			clearTimeout(this.waitingExecutions[executionId].timer);
			delete this.waitingExecutions[executionId];
		}

		// Also check in database
		const execution = await Db.collections.Execution!.findOne(executionId);

		if (execution === undefined || !execution.waitTill) {
			throw new Error(`The execution ID "${executionId}" could not be found.`);
		}

		const fullExecutionData = ResponseHelper.unflattenExecutionData(execution);

		// Set in execution in DB as failed and remove waitTill time
		const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

		// `message` and `stack` are copied explicitly in addition to the spread.
		fullExecutionData.data.resultData.error = {
			...error,
			message: error.message,
			stack: error.stack,
		};

		fullExecutionData.stoppedAt = new Date();
		fullExecutionData.waitTill = undefined;

		await Db.collections.Execution!.update(
			executionId,
			ResponseHelper.flattenExecutionData(fullExecutionData),
		);

		return {
			mode: fullExecutionData.mode,
			startedAt: new Date(fullExecutionData.startedAt),
			stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
			finished: fullExecutionData.finished,
		};
	}

	/**
	 * Resumes a waiting execution by loading it from the database and passing
	 * it to a new `WorkflowRunner`.
	 *
	 * The work happens asynchronously; failures are logged and never thrown
	 * to the caller.
	 *
	 * @param executionId ID of the execution to resume
	 */
	startExecution(executionId: string): void {
		Logger.debug(`Wait tracker resuming execution ${executionId}`, { executionId });

		// The timer has done its job (or is being bypassed), so stop tracking the execution.
		delete this.waitingExecutions[executionId];

		(async () => {
			// Get the data to execute
			const fullExecutionDataFlatted = await Db.collections.Execution!.findOne(executionId);

			if (fullExecutionDataFlatted === undefined) {
				throw new Error(`The execution with the id "${executionId}" does not exist.`);
			}

			const fullExecutionData = ResponseHelper.unflattenExecutionData(fullExecutionDataFlatted);

			if (fullExecutionData.finished) {
				throw new Error('The execution did succeed and can so not be started again.');
			}

			const data: IWorkflowExecutionDataProcess = {
				executionMode: fullExecutionData.mode,
				executionData: fullExecutionData.data,
				workflowData: fullExecutionData.workflowData,
			};

			// Start the execution again
			const workflowRunner = new WorkflowRunner();
			await workflowRunner.run(data, false, false, executionId);
		})().catch((error) => {
			Logger.error(
				`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`,
				{ executionId },
			);
		});
	}
}
// Module-wide tracker instance; stays undefined until WaitTracker() is first called.
let waitTrackerInstance: WaitTrackerClass | undefined;

/**
 * Returns the shared `WaitTrackerClass` instance, constructing it on the
 * first call and handing back the same object on every later call.
 */
export function WaitTracker(): WaitTrackerClass {
	if (waitTrackerInstance !== undefined) {
		return waitTrackerInstance;
	}

	waitTrackerInstance = new WaitTrackerClass();
	return waitTrackerInstance;
}