mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-15 17:14:05 -08:00
56c4c6991f
* ⬆️ Upgrade TS to 4.3.5
* 👕 Add ESLint configs
* 🎨 Add Prettier config
* 📦 Add deps and commands
* ⚡ Adjust global .editorconfig to new ruleset
* 🔥 Remove unneeded local .editorconfig
* 📦 Update deps in editor-ui
* 🔨 Limit Prettier to only TS files
* ⚡ Add recommended VSCode extensions
* 👕 Fix build
* 🔥 Remove Vue setting from global config
* ⚡ Disable prefer-default-export per feedback
* ✏️ Add forgotten divider
* 👕 Disable no-plusplus
* 👕 Disable class-methods-use-this
* ✏️ Alphabetize overrides
* 👕 Add one-var consecutive override
* ⏪ Revert one-var consecutive override
This reverts commit b9252cf935
.
* 🎨 👕 Lint and format workflow package (#2121)
* 🎨 Format /workflow package
* 👕 Lint /workflow package
* 🎨 Re-format /workflow package
* 👕 Re-lint /workflow package
* ✏️ Fix typo
* ⚡ Consolidate if-checks
* 🔥 Remove prefer-default-export exceptions
* 🔥 Remove no-plusplus exceptions
* 🔥 Remove class-methods-use-this exceptions
* 🎨 👕 Lint and format node-dev package (#2122)
* 🎨 Format /node-dev package
* ⚡ Exclude templates from ESLint config
This keeps the templates consistent with the codebase while preventing lint exceptions from being made part of the templates.
* 👕 Lint /node-dev package
* 🔥 Remove prefer-default-export exceptions
* 🔥 Remove no-plusplus exceptions
* 🎨 👕 Lint and format core package (#2123)
* 🎨 Format /core package
* 👕 Lint /core package
* 🎨 Re-format /core package
* 👕 Re-lint /core package
* 🔥 Remove prefer-default-export exceptions
* 🔥 Remove no-plusplus exceptions
* 🔥 Remove class-methods-use-this exceptions
* 🎨 👕 Lint and format cli package (#2124)
* 🎨 Format /cli package
* 👕 Exclude migrations from linting
* 👕 Lint /cli package
* 🎨 Re-format /cli package
* 👕 Re-lint /cli package
* 👕 Fix build
* 🔥 Remove prefer-default-export exceptions
* ⚡ Update exceptions in ActiveExecutions
* 🔥 Remove no-plusplus exceptions
* 🔥 Remove class-methods-use-this exceptions
* 👕 fix lint issues
* 🔧 use package specific linter, remove tslint command
* 🔨 resolve build issue, sync dependencies
* 🔧 change lint command
Co-authored-by: Ben Hesseldieck <b.hesseldieck@gmail.com>
187 lines
5.8 KiB
TypeScript
187 lines
5.8 KiB
TypeScript
/* eslint-disable import/no-cycle */
|
|
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
|
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
|
|
/* eslint-disable @typescript-eslint/naming-convention */
|
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
|
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
|
/* eslint-disable @typescript-eslint/no-floating-promises */
|
|
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
|
import { IRun, LoggerProxy as Logger, WorkflowOperationError } from 'n8n-workflow';
|
|
|
|
import { FindManyOptions, LessThanOrEqual, ObjectLiteral } from 'typeorm';
|
|
|
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
|
import {
|
|
ActiveExecutions,
|
|
DatabaseType,
|
|
Db,
|
|
GenericHelpers,
|
|
IExecutionFlattedDb,
|
|
IExecutionsStopData,
|
|
IWorkflowExecutionDataProcess,
|
|
ResponseHelper,
|
|
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
|
WorkflowCredentials,
|
|
WorkflowRunner,
|
|
} from '.';
|
|
|
|
export class WaitTrackerClass {
|
|
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
|
|
|
|
private waitingExecutions: {
|
|
[key: string]: {
|
|
executionId: string;
|
|
timer: NodeJS.Timeout;
|
|
};
|
|
} = {};
|
|
|
|
mainTimer: NodeJS.Timeout;
|
|
|
|
constructor() {
|
|
this.activeExecutionsInstance = ActiveExecutions.getInstance();
|
|
|
|
// Poll every 60 seconds a list of upcoming executions
|
|
this.mainTimer = setInterval(() => {
|
|
this.getwaitingExecutions();
|
|
}, 60000);
|
|
|
|
this.getwaitingExecutions();
|
|
}
|
|
|
|
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
|
|
async getwaitingExecutions() {
|
|
Logger.debug('Wait tracker querying database for waiting executions');
|
|
// Find all the executions which should be triggered in the next 70 seconds
|
|
const findQuery: FindManyOptions<IExecutionFlattedDb> = {
|
|
select: ['id', 'waitTill'],
|
|
where: {
|
|
waitTill: LessThanOrEqual(new Date(Date.now() + 70000)),
|
|
},
|
|
order: {
|
|
waitTill: 'ASC',
|
|
},
|
|
};
|
|
const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType;
|
|
if (dbType === 'sqlite') {
|
|
// This is needed because of issue in TypeORM <> SQLite:
|
|
// https://github.com/typeorm/typeorm/issues/2286
|
|
(findQuery.where! as ObjectLiteral).waitTill = LessThanOrEqual(
|
|
DateUtils.mixedDateToUtcDatetimeString(new Date(Date.now() + 70000)),
|
|
);
|
|
}
|
|
|
|
const executions = await Db.collections.Execution!.find(findQuery);
|
|
|
|
if (executions.length === 0) {
|
|
return;
|
|
}
|
|
|
|
const executionIds = executions.map((execution) => execution.id.toString()).join(', ');
|
|
Logger.debug(
|
|
`Wait tracker found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
|
|
);
|
|
|
|
// Add timers for each waiting execution that they get started at the correct time
|
|
// eslint-disable-next-line no-restricted-syntax
|
|
for (const execution of executions) {
|
|
const executionId = execution.id.toString();
|
|
if (this.waitingExecutions[executionId] === undefined) {
|
|
const triggerTime = execution.waitTill!.getTime() - new Date().getTime();
|
|
this.waitingExecutions[executionId] = {
|
|
executionId,
|
|
timer: setTimeout(() => {
|
|
this.startExecution(executionId);
|
|
}, triggerTime),
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
async stopExecution(executionId: string): Promise<IExecutionsStopData> {
|
|
if (this.waitingExecutions[executionId] !== undefined) {
|
|
// The waiting execution was already sheduled to execute.
|
|
// So stop timer and remove.
|
|
clearTimeout(this.waitingExecutions[executionId].timer);
|
|
delete this.waitingExecutions[executionId];
|
|
}
|
|
|
|
// Also check in database
|
|
const execution = await Db.collections.Execution!.findOne(executionId);
|
|
|
|
if (execution === undefined || !execution.waitTill) {
|
|
throw new Error(`The execution ID "${executionId}" could not be found.`);
|
|
}
|
|
|
|
const fullExecutionData = ResponseHelper.unflattenExecutionData(execution);
|
|
|
|
// Set in execution in DB as failed and remove waitTill time
|
|
const error = new WorkflowOperationError('Workflow-Execution has been canceled!');
|
|
|
|
fullExecutionData.data.resultData.error = {
|
|
...error,
|
|
message: error.message,
|
|
stack: error.stack,
|
|
};
|
|
|
|
fullExecutionData.stoppedAt = new Date();
|
|
fullExecutionData.waitTill = undefined;
|
|
|
|
await Db.collections.Execution!.update(
|
|
executionId,
|
|
ResponseHelper.flattenExecutionData(fullExecutionData),
|
|
);
|
|
|
|
return {
|
|
mode: fullExecutionData.mode,
|
|
startedAt: new Date(fullExecutionData.startedAt),
|
|
stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
|
|
finished: fullExecutionData.finished,
|
|
};
|
|
}
|
|
|
|
startExecution(executionId: string) {
|
|
Logger.debug(`Wait tracker resuming execution ${executionId}`, { executionId });
|
|
delete this.waitingExecutions[executionId];
|
|
|
|
(async () => {
|
|
// Get the data to execute
|
|
const fullExecutionDataFlatted = await Db.collections.Execution!.findOne(executionId);
|
|
|
|
if (fullExecutionDataFlatted === undefined) {
|
|
throw new Error(`The execution with the id "${executionId}" does not exist.`);
|
|
}
|
|
|
|
const fullExecutionData = ResponseHelper.unflattenExecutionData(fullExecutionDataFlatted);
|
|
|
|
if (fullExecutionData.finished) {
|
|
throw new Error('The execution did succeed and can so not be started again.');
|
|
}
|
|
|
|
const data: IWorkflowExecutionDataProcess = {
|
|
executionMode: fullExecutionData.mode,
|
|
executionData: fullExecutionData.data,
|
|
workflowData: fullExecutionData.workflowData,
|
|
};
|
|
|
|
// Start the execution again
|
|
const workflowRunner = new WorkflowRunner();
|
|
await workflowRunner.run(data, false, false, executionId);
|
|
})().catch((error) => {
|
|
Logger.error(
|
|
`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`,
|
|
{ executionId },
|
|
);
|
|
});
|
|
}
|
|
}
|
|
|
|
let waitTrackerInstance: WaitTrackerClass | undefined;
|
|
|
|
export function WaitTracker(): WaitTrackerClass {
|
|
if (waitTrackerInstance === undefined) {
|
|
waitTrackerInstance = new WaitTrackerClass();
|
|
}
|
|
|
|
return waitTrackerInstance;
|
|
}
|