n8n/packages/cli/src/executions/executions.service.ts
Michael Auerswald c3ba0123ad
feat: Migrate integer primary keys to nanoids (#6345)
* first commit for postgres migration

* (not working)

* sqlite migration

* quicksave

* fix tests

* fix pg test

* fix postgres

* fix variables import

* fix execution saving

* add user settings fix

* change migration to single lines

* patch preferences endpoint

* cleanup

* improve variable import

* cleanup unusued code

* Update packages/cli/src/PublicApi/v1/handlers/workflows/workflows.handler.ts

Co-authored-by: Omar Ajoue <krynble@gmail.com>

* address review notes

* fix var update/import

* refactor: Separate execution data to its own table (#6323)

* wip: Temporary migration process

* refactor: Create boilerplate repository methods for executions

* fix: Lint issues

* refactor: Added search endpoint to repository

* refactor: Make the execution list work again

* wip: Updating how we create and update executions everywhere

* fix: Lint issues and remove most of the direct access to execution model

* refactor: Remove includeWorkflowData flag and fix more tests

* fix: Lint issues

* fix: Fixed ordering of executions for FE, removed transaction when saving execution and removed unnecessary update

* refactor: Add comment about missing feature

* refactor: Refactor counting executions

* refactor: Add migration for other dbms and fix issues found

* refactor: Fix lint issues

* refactor: Remove unnecessary comment and auto inject repo to internal hooks

* refactor: remove type assertion

* fix: Fix broken tests

* fix: Remove unnecessary import

* Remove unnecessary toString() call

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* fix: Address comments after review

* refactor: Remove unused import

* fix: Lint issues

* fix: Add correct migration files

---------

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* remove null values from credential export

* fix: Fix an issue with queue mode where all running execution would be returned

* fix: Update n8n node to allow for workflow ids with letters

* set upstream on set branch

* remove typo

* add nodeAccess to credentials

* fix unsaved run check for undefined id

* fix(core): Rename version control feature to source control (#6480)

* rename versionControl to sourceControl

* fix source control tooltip wording

---------

Co-authored-by: Romain Minaud <romain.minaud@gmail.com>

* fix(editor): Pay 548 hide the set up version control button (#6485)

* feat(DebugHelper Node): Fix and include in main app (#6406)

* improve node a bit

* fixing continueOnFail() ton contain error in json

* improve pairedItem

* fix random data returning object results

* fix nanoId length typo

* update pnpm-lock file

---------

Co-authored-by: Marcus <marcus@n8n.io>

* fix(editor): Remove setup source control CTA button

* fix(editor): Remove setup source control CTA button

---------

Co-authored-by: Michael Auerswald <michael.auerswald@gmail.com>
Co-authored-by: Marcus <marcus@n8n.io>

* fix(editor): Update source control docs links (#6488)

* feat(DebugHelper Node): Fix and include in main app (#6406)

* improve node a bit

* fixing continueOnFail() ton contain error in json

* improve pairedItem

* fix random data returning object results

* fix nanoId length typo

* update pnpm-lock file

---------

Co-authored-by: Marcus <marcus@n8n.io>

* feat(editor): Replace root events with event bus events (no-changelog) (#6454)

* feat: replace root events with event bus events

* fix: prevent cypress from replacing global with globalThis in import path

* feat: remove emitter mixin

* fix: replace component events with event bus

* fix: fix linting issue

* fix: fix breaking expression switch

* chore: prettify ndv e2e suite code

* fix(editor): Update source control docs links

---------

Co-authored-by: Michael Auerswald <michael.auerswald@gmail.com>
Co-authored-by: Marcus <marcus@n8n.io>
Co-authored-by: Alex Grozav <alex@grozav.com>

* fix tag endpoint regex

---------

Co-authored-by: Omar Ajoue <krynble@gmail.com>
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
Co-authored-by: Romain Minaud <romain.minaud@gmail.com>
Co-authored-by: Csaba Tuncsik <csaba@n8n.io>
Co-authored-by: Marcus <marcus@n8n.io>
Co-authored-by: Alex Grozav <alex@grozav.com>
2023-06-20 19:13:18 +02:00

364 lines
11 KiB
TypeScript

/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { validate as jsonSchemaValidate } from 'jsonschema';
import type { IWorkflowBase, JsonObject, ExecutionStatus } from 'n8n-workflow';
import { LoggerProxy, jsonParse, Workflow } from 'n8n-workflow';
import type { FindOperator } from 'typeorm';
import { In } from 'typeorm';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import type { User } from '@db/entities/User';
import type {
IExecutionFlattedResponse,
IExecutionResponse,
IExecutionsListResponse,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { Queue } from '@/Queue';
import type { ExecutionRequest } from '@/requests';
import * as ResponseHelper from '@/ResponseHelper';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import { Container } from 'typedi';
import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';
import { ExecutionRepository } from '@/databases/repositories';
export interface IGetExecutionsQueryFilter {
id?: FindOperator<string> | string;
finished?: boolean;
mode?: string;
retryOf?: string;
retrySuccessId?: string;
status?: ExecutionStatus[];
workflowId?: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
waitTill?: FindOperator<any> | boolean;
metadata?: Array<{ key: string; value: string }>;
startedAfter?: string;
startedBefore?: string;
}
const schemaGetExecutionsQueryFilter = {
$id: '/IGetExecutionsQueryFilter',
type: 'object',
properties: {
id: { type: 'string' },
finished: { type: 'boolean' },
mode: { type: 'string' },
retryOf: { type: 'string' },
retrySuccessId: { type: 'string' },
status: {
type: 'array',
items: { type: 'string' },
},
waitTill: { type: 'boolean' },
workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] },
metadata: { type: 'array', items: { $ref: '#/$defs/metadata' } },
startedAfter: { type: 'date-time' },
startedBefore: { type: 'date-time' },
},
$defs: {
metadata: {
type: 'object',
required: ['key', 'value'],
properties: {
key: {
type: 'string',
},
value: { type: 'string' },
},
},
},
};
const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties);
export class ExecutionsService {
/**
* Function to get the workflow Ids for a User
* Overridden in EE version to ignore roles
*/
static async getWorkflowIdsForUser(user: User): Promise<string[]> {
// Get all workflows using owner role
return getSharedWorkflowIds(user, ['owner']);
}
static async getExecutionsList(req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> {
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
if (sharedWorkflowIds.length === 0) {
// return early since without shared workflows there can be no hits
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
return {
count: 0,
estimated: false,
results: [],
};
}
// parse incoming filter object and remove non-valid fields
let filter: IGetExecutionsQueryFilter | undefined = undefined;
if (req.query.filter) {
try {
const filterJson: JsonObject = jsonParse(req.query.filter);
if (filterJson) {
Object.keys(filterJson).map((key) => {
if (!allowedExecutionsQueryFilterFields.includes(key)) delete filterJson[key];
});
if (jsonSchemaValidate(filterJson, schemaGetExecutionsQueryFilter).valid) {
filter = filterJson as IGetExecutionsQueryFilter;
}
}
} catch (error) {
LoggerProxy.error('Failed to parse filter', {
userId: req.user.id,
filter: req.query.filter,
});
throw new ResponseHelper.InternalServerError(
'Parameter "filter" contained invalid JSON string.',
);
}
}
// safeguard against querying workflowIds not shared with the user
const workflowId = filter?.workflowId?.toString();
if (workflowId !== undefined && !sharedWorkflowIds.includes(workflowId)) {
LoggerProxy.verbose(
`User ${req.user.id} attempted to query non-shared workflow ${workflowId}`,
);
return {
count: 0,
estimated: false,
results: [],
};
}
const limit = req.query.limit
? parseInt(req.query.limit, 10)
: GenericHelpers.DEFAULT_EXECUTIONS_GET_ALL_LIMIT;
const executingWorkflowIds: string[] = [];
if (config.getEnv('executions.mode') === 'queue') {
const queue = Container.get(Queue);
const currentJobs = await queue.getJobs(['active', 'waiting']);
executingWorkflowIds.push(...currentJobs.map(({ data }) => data.executionId));
}
// We may have manual executions even with queue so we must account for these.
executingWorkflowIds.push(
...Container.get(ActiveExecutions)
.getActiveExecutions()
.map(({ id }) => id),
);
const { count, estimated } = await Container.get(ExecutionRepository).countExecutions(
filter,
sharedWorkflowIds,
executingWorkflowIds,
req.user.globalRole.name === 'owner',
);
const formattedExecutions = await Container.get(ExecutionRepository).searchExecutions(
filter,
limit,
executingWorkflowIds,
sharedWorkflowIds,
{
lastId: req.query.lastId,
firstId: req.query.firstId,
},
);
return {
count,
results: formattedExecutions,
estimated,
};
}
static async getExecution(
req: ExecutionRequest.Get,
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> {
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
if (!sharedWorkflowIds.length) return undefined;
const { id: executionId } = req.params;
const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, {
where: {
id: executionId,
workflowId: In(sharedWorkflowIds),
},
includeData: true,
unflattenData: false,
});
if (!execution) {
LoggerProxy.info('Attempt to read execution was blocked due to insufficient permissions', {
userId: req.user.id,
executionId,
});
return undefined;
}
if (!execution.status) {
execution.status = getStatusUsingPreviousExecutionStatusMethod(execution);
}
return execution;
}
static async retryExecution(req: ExecutionRequest.Retry): Promise<boolean> {
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
if (!sharedWorkflowIds.length) return false;
const { id: executionId } = req.params;
const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, {
where: {
workflowId: In(sharedWorkflowIds),
},
includeData: true,
unflattenData: true,
});
if (!execution) {
LoggerProxy.info(
'Attempt to retry an execution was blocked due to insufficient permissions',
{
userId: req.user.id,
executionId,
},
);
throw new ResponseHelper.NotFoundError(
`The execution with the ID "${executionId}" does not exist.`,
);
}
if (execution.finished) {
throw new Error('The execution succeeded, so it cannot be retried.');
}
const executionMode = 'retry';
execution.workflowData.active = false;
// Start the workflow
const data: IWorkflowExecutionDataProcess = {
executionMode,
executionData: execution.data,
retryOf: req.params.id,
workflowData: execution.workflowData,
userId: req.user.id,
};
const { lastNodeExecuted } = data.executionData!.resultData;
if (lastNodeExecuted) {
// Remove the old error and the data of the last run of the node that it can be replaced
delete data.executionData!.resultData.error;
const { length } = data.executionData!.resultData.runData[lastNodeExecuted];
if (
length > 0 &&
data.executionData!.resultData.runData[lastNodeExecuted][length - 1].error !== undefined
) {
// Remove results only if it is an error.
// If we are retrying due to a crash, the information is simply success info from last node
data.executionData!.resultData.runData[lastNodeExecuted].pop();
// Stack will determine what to run next
}
}
if (req.body.loadWorkflow) {
// Loads the currently saved workflow to execute instead of the
// one saved at the time of the execution.
const workflowId = execution.workflowData.id as string;
const workflowData = (await Db.collections.Workflow.findOneBy({
id: workflowId,
})) as IWorkflowBase;
if (workflowData === undefined) {
throw new Error(
`The workflow with the ID "${workflowId}" could not be found and so the data not be loaded for the retry.`,
);
}
data.workflowData = workflowData;
const nodeTypes = Container.get(NodeTypes);
const workflowInstance = new Workflow({
id: workflowData.id as string,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: false,
nodeTypes,
staticData: undefined,
settings: workflowData.settings,
});
// Replace all of the nodes in the execution stack with the ones of the new workflow
for (const stack of data.executionData!.executionData!.nodeExecutionStack) {
// Find the data of the last executed node in the new workflow
const node = workflowInstance.getNode(stack.node.name);
if (node === null) {
LoggerProxy.error('Failed to retry an execution because a node could not be found', {
userId: req.user.id,
executionId,
nodeName: stack.node.name,
});
throw new Error(
`Could not find the node "${stack.node.name}" in workflow. It probably got deleted or renamed. Without it the workflow can sadly not be retried.`,
);
}
// Replace the node data in the stack that it really uses the current data
stack.node = node;
}
}
const workflowRunner = new WorkflowRunner();
const retriedExecutionId = await workflowRunner.run(data);
const executionData = await Container.get(ActiveExecutions).getPostExecutePromise(
retriedExecutionId,
);
if (!executionData) {
throw new Error('The retry did not start for an unknown reason.');
}
return !!executionData.finished;
}
static async deleteExecutions(req: ExecutionRequest.Delete): Promise<void> {
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
if (sharedWorkflowIds.length === 0) {
// return early since without shared workflows there can be no hits
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
return;
}
const { deleteBefore, ids, filters: requestFiltersRaw } = req.body;
let requestFilters;
if (requestFiltersRaw) {
try {
Object.keys(requestFiltersRaw).map((key) => {
if (!allowedExecutionsQueryFilterFields.includes(key)) delete requestFiltersRaw[key];
});
if (jsonSchemaValidate(requestFiltersRaw, schemaGetExecutionsQueryFilter).valid) {
requestFilters = requestFiltersRaw as IGetExecutionsQueryFilter;
}
} catch (error) {
throw new ResponseHelper.InternalServerError(
'Parameter "filter" contained invalid JSON string.',
);
}
}
return Container.get(ExecutionRepository).deleteExecutions(requestFilters, sharedWorkflowIds, {
deleteBefore,
ids,
});
}
}