mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-13 16:14:07 -08:00
c3ba0123ad
* first commit for postgres migration * (not working) * sqlite migration * quicksave * fix tests * fix pg test * fix postgres * fix variables import * fix execution saving * add user settings fix * change migration to single lines * patch preferences endpoint * cleanup * improve variable import * cleanup unusued code * Update packages/cli/src/PublicApi/v1/handlers/workflows/workflows.handler.ts Co-authored-by: Omar Ajoue <krynble@gmail.com> * address review notes * fix var update/import * refactor: Separate execution data to its own table (#6323) * wip: Temporary migration process * refactor: Create boilerplate repository methods for executions * fix: Lint issues * refactor: Added search endpoint to repository * refactor: Make the execution list work again * wip: Updating how we create and update executions everywhere * fix: Lint issues and remove most of the direct access to execution model * refactor: Remove includeWorkflowData flag and fix more tests * fix: Lint issues * fix: Fixed ordering of executions for FE, removed transaction when saving execution and removed unnecessary update * refactor: Add comment about missing feature * refactor: Refactor counting executions * refactor: Add migration for other dbms and fix issues found * refactor: Fix lint issues * refactor: Remove unnecessary comment and auto inject repo to internal hooks * refactor: remove type assertion * fix: Fix broken tests * fix: Remove unnecessary import * Remove unnecessary toString() call Co-authored-by: Iván Ovejero <ivov.src@gmail.com> * fix: Address comments after review * refactor: Remove unused import * fix: Lint issues * fix: Add correct migration files --------- Co-authored-by: Iván Ovejero <ivov.src@gmail.com> * remove null values from credential export * fix: Fix an issue with queue mode where all running execution would be returned * fix: Update n8n node to allow for workflow ids with letters * set upstream on set branch * remove typo * add nodeAccess to credentials * fix unsaved 
run check for undefined id * fix(core): Rename version control feature to source control (#6480) * rename versionControl to sourceControl * fix source control tooltip wording --------- Co-authored-by: Romain Minaud <romain.minaud@gmail.com> * fix(editor): Pay 548 hide the set up version control button (#6485) * feat(DebugHelper Node): Fix and include in main app (#6406) * improve node a bit * fixing continueOnFail() ton contain error in json * improve pairedItem * fix random data returning object results * fix nanoId length typo * update pnpm-lock file --------- Co-authored-by: Marcus <marcus@n8n.io> * fix(editor): Remove setup source control CTA button * fix(editor): Remove setup source control CTA button --------- Co-authored-by: Michael Auerswald <michael.auerswald@gmail.com> Co-authored-by: Marcus <marcus@n8n.io> * fix(editor): Update source control docs links (#6488) * feat(DebugHelper Node): Fix and include in main app (#6406) * improve node a bit * fixing continueOnFail() ton contain error in json * improve pairedItem * fix random data returning object results * fix nanoId length typo * update pnpm-lock file --------- Co-authored-by: Marcus <marcus@n8n.io> * feat(editor): Replace root events with event bus events (no-changelog) (#6454) * feat: replace root events with event bus events * fix: prevent cypress from replacing global with globalThis in import path * feat: remove emitter mixin * fix: replace component events with event bus * fix: fix linting issue * fix: fix breaking expression switch * chore: prettify ndv e2e suite code * fix(editor): Update source control docs links --------- Co-authored-by: Michael Auerswald <michael.auerswald@gmail.com> Co-authored-by: Marcus <marcus@n8n.io> Co-authored-by: Alex Grozav <alex@grozav.com> * fix tag endpoint regex --------- Co-authored-by: Omar Ajoue <krynble@gmail.com> Co-authored-by: Iván Ovejero <ivov.src@gmail.com> Co-authored-by: Romain Minaud <romain.minaud@gmail.com> Co-authored-by: Csaba Tuncsik 
<csaba@n8n.io> Co-authored-by: Marcus <marcus@n8n.io> Co-authored-by: Alex Grozav <alex@grozav.com>
210 lines
6.8 KiB
TypeScript
210 lines
6.8 KiB
TypeScript
/* eslint-disable @typescript-eslint/no-non-null-assertion */
|
|
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
|
|
/* eslint-disable @typescript-eslint/naming-convention */
|
|
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
|
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
|
import {
|
|
ErrorReporterProxy as ErrorReporter,
|
|
LoggerProxy as Logger,
|
|
WorkflowOperationError,
|
|
} from 'n8n-workflow';
|
|
import Container, { Service } from 'typedi';
|
|
import type { FindManyOptions, ObjectLiteral } from 'typeorm';
|
|
import { Not, LessThanOrEqual } from 'typeorm';
|
|
import { DateUtils } from 'typeorm/util/DateUtils';
|
|
|
|
import config from '@/config';
|
|
import * as ResponseHelper from '@/ResponseHelper';
|
|
import type {
|
|
IExecutionResponse,
|
|
IExecutionsStopData,
|
|
IWorkflowExecutionDataProcess,
|
|
} from '@/Interfaces';
|
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
|
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
|
|
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
|
|
import { ExecutionRepository } from './databases/repositories';
|
|
import type { ExecutionEntity } from './databases/entities/ExecutionEntity';
|
|
|
|
/**
 * Tracks executions that are in a waiting state and resumes them once their
 * `waitTill` time is reached.
 *
 * On construction it polls the database immediately and then every 60 seconds
 * for executions whose `waitTill` falls within the next 70 seconds, and arms an
 * in-process timer for each one that is not already scheduled.
 */
@Service()
export class WaitTracker {
	/** Timers armed by this instance, keyed by execution ID. */
	private waitingExecutions: {
		[key: string]: {
			executionId: string;
			timer: NodeJS.Timeout;
		};
	} = {};

	/** Handle of the 60-second polling interval started in the constructor. */
	mainTimer: NodeJS.Timeout;

	constructor(private executionRepository: ExecutionRepository) {
		// Poll every 60 seconds a list of upcoming executions
		this.mainTimer = setInterval(() => {
			this.pollWaitingExecutions();
		}, 60000);

		this.pollWaitingExecutions();
	}

	/**
	 * Runs one polling pass in fire-and-forget mode.
	 *
	 * A failing query must not surface as an unhandled promise rejection from
	 * the interval callback, so the error is reported and logged here and the
	 * next tick simply tries again.
	 */
	private pollWaitingExecutions(): void {
		this.getWaitingExecutions().catch((error: Error) => {
			ErrorReporter.error(error);
			Logger.error(
				`There was a problem querying the database for waiting executions: "${error.message}"`,
			);
		});
	}

	/**
	 * Queries the executions due within the next 70 seconds (excluding those
	 * with status `crashed`) and arms a timer for each one that does not
	 * already have one.
	 *
	 * Rejects if the repository query fails; callers are responsible for
	 * handling that rejection.
	 */
	async getWaitingExecutions(): Promise<void> {
		Logger.debug('Wait tracker querying database for waiting executions');

		// Find all the executions which should be triggered in the next 70 seconds.
		// The look-ahead is longer than the 60 second poll interval so that
		// consecutive polls overlap.
		const waitTillLimit = new Date(Date.now() + 70000);

		const findQuery: FindManyOptions<ExecutionEntity> = {
			select: ['id', 'waitTill'],
			where: {
				waitTill: LessThanOrEqual(waitTillLimit),
				status: Not('crashed'),
			},
			order: {
				waitTill: 'ASC',
			},
		};

		const dbType = config.getEnv('database.type');
		if (dbType === 'sqlite') {
			// This is needed because of issue in TypeORM <> SQLite:
			// https://github.com/typeorm/typeorm/issues/2286
			(findQuery.where! as ObjectLiteral).waitTill = LessThanOrEqual(
				DateUtils.mixedDateToUtcDatetimeString(waitTillLimit),
			);
		}

		const executions = await this.executionRepository.findMultipleExecutions(findQuery);

		if (executions.length === 0) {
			return;
		}

		const executionIds = executions.map((execution) => execution.id).join(', ');
		Logger.debug(
			`Wait tracker found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
		);

		// Add timers for each waiting execution so that they get started at the correct time
		// eslint-disable-next-line no-restricted-syntax
		for (const execution of executions) {
			const executionId = execution.id;

			// Already scheduled by an earlier poll: keep the existing timer.
			if (this.waitingExecutions[executionId] !== undefined) {
				continue;
			}

			// The query only matches rows with a `waitTill` value, hence the assertion.
			const triggerTime = execution.waitTill!.getTime() - Date.now();
			this.waitingExecutions[executionId] = {
				executionId,
				timer: setTimeout(() => {
					this.startExecution(executionId);
				}, triggerTime),
			};
		}
	}

	/**
	 * Cancels a waiting execution: clears its timer (if one is armed), marks the
	 * execution as `canceled` in the database and removes its `waitTill` time.
	 *
	 * @param executionId ID of the execution to stop.
	 * @returns Summary of the stopped execution (mode, timestamps, finished flag, status).
	 * @throws Error if the execution does not exist, is not in one of the
	 * states `new`, `unknown`, `waiting` or `running`, or cannot be recovered.
	 */
	async stopExecution(executionId: string): Promise<IExecutionsStopData> {
		const scheduled = this.waitingExecutions[executionId];
		if (scheduled !== undefined) {
			// The waiting execution was already scheduled to execute.
			// So stop timer and remove.
			clearTimeout(scheduled.timer);
			delete this.waitingExecutions[executionId];
		}

		// Also check in database
		const execution = await this.executionRepository.findSingleExecution(executionId, {
			includeData: true,
		});

		if (!execution) {
			throw new Error(`The execution ID "${executionId}" could not be found.`);
		}

		if (!['new', 'unknown', 'waiting', 'running'].includes(execution.status)) {
			throw new Error(
				`Only running or waiting executions can be stopped and ${executionId} is currently ${execution.status}.`,
			);
		}

		let fullExecutionData: IExecutionResponse;
		try {
			fullExecutionData = ResponseHelper.unflattenExecutionData(execution);
		} catch (error) {
			// If the execution ended in an unforeseen, non-cancelable state, try to recover it
			await recoverExecutionDataFromEventLogMessages(executionId, [], true);

			// Find recovered data.
			// NOTE(review): this lookup (and the update below) resolves the repository
			// through the container instead of using the injected instance — presumably
			// the same singleton; confirm before unifying.
			const restoredExecution = await Container.get(ExecutionRepository).findSingleExecution(
				executionId,
				{
					includeData: true,
					unflattenData: true,
				},
			);
			if (!restoredExecution) {
				throw new Error(`Execution ${executionId} could not be recovered or canceled.`);
			}
			fullExecutionData = restoredExecution;
		}

		// Set the execution in DB as canceled and remove waitTill time
		const error = new WorkflowOperationError('Workflow-Execution has been canceled!');

		// `message` and `stack` are copied explicitly in addition to the spread
		// so that they are always present on the stored plain object.
		fullExecutionData.data.resultData.error = {
			...error,
			message: error.message,
			stack: error.stack,
		};

		fullExecutionData.stoppedAt = new Date();
		fullExecutionData.waitTill = null;
		fullExecutionData.status = 'canceled';

		await Container.get(ExecutionRepository).updateExistingExecution(
			executionId,
			fullExecutionData,
		);

		return {
			mode: fullExecutionData.mode,
			startedAt: new Date(fullExecutionData.startedAt),
			stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined,
			finished: fullExecutionData.finished,
			status: fullExecutionData.status,
		};
	}

	/**
	 * Resumes a waiting execution. Called when its timer fires.
	 *
	 * The work runs asynchronously; any failure (execution missing, already
	 * finished, workflow not saved, or an error from the runner) is reported
	 * and logged rather than thrown to the caller.
	 *
	 * @param executionId ID of the execution to resume.
	 */
	startExecution(executionId: string): void {
		Logger.debug(`Wait tracker resuming execution ${executionId}`, { executionId });

		// Drop the bookkeeping entry first so a later poll can schedule the
		// execution again if it is still waiting.
		delete this.waitingExecutions[executionId];

		(async () => {
			// Get the data to execute
			const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
				includeData: true,
				unflattenData: true,
			});

			if (!fullExecutionData) {
				throw new Error(`The execution with the id "${executionId}" does not exist.`);
			}
			if (fullExecutionData.finished) {
				throw new Error('The execution did succeed and can so not be started again.');
			}

			if (!fullExecutionData.workflowData.id) {
				throw new Error('Only saved workflows can be resumed.');
			}
			const user = await getWorkflowOwner(fullExecutionData.workflowData.id);

			const data: IWorkflowExecutionDataProcess = {
				executionMode: fullExecutionData.mode,
				executionData: fullExecutionData.data,
				workflowData: fullExecutionData.workflowData,
				userId: user.id,
			};

			// Start the execution again
			const workflowRunner = new WorkflowRunner();
			await workflowRunner.run(data, false, false, executionId);
		})().catch((error: Error) => {
			ErrorReporter.error(error);
			Logger.error(
				`There was a problem starting the waiting execution with id "${executionId}": "${error.message}"`,
				{ executionId },
			);
		});
	}
}
|