2022-10-26 04:30:35 -07:00
|
|
|
import { validate as jsonSchemaValidate } from 'jsonschema';
|
2023-06-20 10:13:18 -07:00
|
|
|
import type { IWorkflowBase, JsonObject, ExecutionStatus } from 'n8n-workflow';
|
2023-10-25 07:35:22 -07:00
|
|
|
import { jsonParse, Workflow } from 'n8n-workflow';
|
2023-06-20 10:13:18 -07:00
|
|
|
import type { FindOperator } from 'typeorm';
|
|
|
|
import { In } from 'typeorm';
|
2023-02-21 10:21:56 -08:00
|
|
|
import { ActiveExecutions } from '@/ActiveExecutions';
|
2022-11-30 05:00:28 -08:00
|
|
|
import config from '@/config';
|
2023-01-27 02:19:47 -08:00
|
|
|
import type { User } from '@db/entities/User';
|
2023-01-27 05:56:56 -08:00
|
|
|
import type {
|
2022-08-26 08:31:28 -07:00
|
|
|
IExecutionFlattedResponse,
|
|
|
|
IExecutionResponse,
|
|
|
|
IExecutionsListResponse,
|
|
|
|
IWorkflowExecutionDataProcess,
|
2022-11-09 06:25:00 -08:00
|
|
|
} from '@/Interfaces';
|
|
|
|
import { NodeTypes } from '@/NodeTypes';
|
2023-03-16 07:34:13 -07:00
|
|
|
import { Queue } from '@/Queue';
|
2022-11-09 06:25:00 -08:00
|
|
|
import type { ExecutionRequest } from '@/requests';
|
2022-11-30 05:00:28 -08:00
|
|
|
import * as ResponseHelper from '@/ResponseHelper';
|
2022-11-09 06:25:00 -08:00
|
|
|
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
|
2022-11-30 05:00:28 -08:00
|
|
|
import { WorkflowRunner } from '@/WorkflowRunner';
|
2022-12-21 01:46:26 -08:00
|
|
|
import * as Db from '@/Db';
|
|
|
|
import * as GenericHelpers from '@/GenericHelpers';
|
2023-02-21 10:21:56 -08:00
|
|
|
import { Container } from 'typedi';
|
2023-06-20 10:13:18 -07:00
|
|
|
import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';
|
2023-07-13 01:14:48 -07:00
|
|
|
import { ExecutionRepository } from '@db/repositories';
|
2023-10-25 07:35:22 -07:00
|
|
|
import { Logger } from '@/Logger';
|
2023-06-20 10:13:18 -07:00
|
|
|
|
|
|
|
export interface IGetExecutionsQueryFilter {
|
2023-03-23 10:07:46 -07:00
|
|
|
id?: FindOperator<string> | string;
|
2022-11-30 05:00:28 -08:00
|
|
|
finished?: boolean;
|
|
|
|
mode?: string;
|
|
|
|
retryOf?: string;
|
|
|
|
retrySuccessId?: string;
|
2023-02-17 01:54:07 -08:00
|
|
|
status?: ExecutionStatus[];
|
2023-01-02 08:42:32 -08:00
|
|
|
workflowId?: string;
|
2022-11-30 05:00:28 -08:00
|
|
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
|
|
waitTill?: FindOperator<any> | boolean;
|
2023-03-23 10:07:46 -07:00
|
|
|
metadata?: Array<{ key: string; value: string }>;
|
|
|
|
startedAfter?: string;
|
|
|
|
startedBefore?: string;
|
2022-11-30 05:00:28 -08:00
|
|
|
}
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2022-10-26 04:30:35 -07:00
|
|
|
const schemaGetExecutionsQueryFilter = {
|
|
|
|
$id: '/IGetExecutionsQueryFilter',
|
|
|
|
type: 'object',
|
|
|
|
properties: {
|
2023-03-23 10:07:46 -07:00
|
|
|
id: { type: 'string' },
|
2022-10-26 04:30:35 -07:00
|
|
|
finished: { type: 'boolean' },
|
|
|
|
mode: { type: 'string' },
|
|
|
|
retryOf: { type: 'string' },
|
|
|
|
retrySuccessId: { type: 'string' },
|
2023-02-17 01:54:07 -08:00
|
|
|
status: {
|
|
|
|
type: 'array',
|
|
|
|
items: { type: 'string' },
|
|
|
|
},
|
2022-10-26 04:30:35 -07:00
|
|
|
waitTill: { type: 'boolean' },
|
|
|
|
workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] },
|
2023-03-23 10:07:46 -07:00
|
|
|
metadata: { type: 'array', items: { $ref: '#/$defs/metadata' } },
|
|
|
|
startedAfter: { type: 'date-time' },
|
|
|
|
startedBefore: { type: 'date-time' },
|
|
|
|
},
|
|
|
|
$defs: {
|
|
|
|
metadata: {
|
|
|
|
type: 'object',
|
|
|
|
required: ['key', 'value'],
|
|
|
|
properties: {
|
|
|
|
key: {
|
|
|
|
type: 'string',
|
|
|
|
},
|
|
|
|
value: { type: 'string' },
|
|
|
|
},
|
|
|
|
},
|
2022-10-26 04:30:35 -07:00
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties);
|
|
|
|
|
2022-11-30 05:00:28 -08:00
|
|
|
export class ExecutionsService {
|
|
|
|
/**
|
|
|
|
* Function to get the workflow Ids for a User
|
|
|
|
* Overridden in EE version to ignore roles
|
|
|
|
*/
|
2022-12-19 08:53:36 -08:00
|
|
|
static async getWorkflowIdsForUser(user: User): Promise<string[]> {
|
2022-11-30 05:00:28 -08:00
|
|
|
// Get all workflows using owner role
|
|
|
|
return getSharedWorkflowIds(user, ['owner']);
|
2022-08-26 08:31:28 -07:00
|
|
|
}
|
2022-11-30 05:00:28 -08:00
|
|
|
|
|
|
|
static async getExecutionsList(req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> {
|
|
|
|
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
|
2022-10-26 04:30:35 -07:00
|
|
|
if (sharedWorkflowIds.length === 0) {
|
|
|
|
// return early since without shared workflows there can be no hits
|
|
|
|
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
|
|
|
|
return {
|
|
|
|
count: 0,
|
|
|
|
estimated: false,
|
|
|
|
results: [],
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
// parse incoming filter object and remove non-valid fields
|
|
|
|
let filter: IGetExecutionsQueryFilter | undefined = undefined;
|
|
|
|
if (req.query.filter) {
|
|
|
|
try {
|
|
|
|
const filterJson: JsonObject = jsonParse(req.query.filter);
|
|
|
|
if (filterJson) {
|
|
|
|
Object.keys(filterJson).map((key) => {
|
|
|
|
if (!allowedExecutionsQueryFilterFields.includes(key)) delete filterJson[key];
|
|
|
|
});
|
|
|
|
if (jsonSchemaValidate(filterJson, schemaGetExecutionsQueryFilter).valid) {
|
|
|
|
filter = filterJson as IGetExecutionsQueryFilter;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (error) {
|
2023-10-25 07:35:22 -07:00
|
|
|
Container.get(Logger).error('Failed to parse filter', {
|
2022-10-26 04:30:35 -07:00
|
|
|
userId: req.user.id,
|
|
|
|
filter: req.query.filter,
|
|
|
|
});
|
2022-11-22 05:00:36 -08:00
|
|
|
throw new ResponseHelper.InternalServerError(
|
2022-12-29 03:20:43 -08:00
|
|
|
'Parameter "filter" contained invalid JSON string.',
|
2022-10-26 04:30:35 -07:00
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// safeguard against querying workflowIds not shared with the user
|
2022-12-19 08:53:36 -08:00
|
|
|
const workflowId = filter?.workflowId?.toString();
|
|
|
|
if (workflowId !== undefined && !sharedWorkflowIds.includes(workflowId)) {
|
2023-10-25 07:35:22 -07:00
|
|
|
Container.get(Logger).verbose(
|
2022-12-19 08:53:36 -08:00
|
|
|
`User ${req.user.id} attempted to query non-shared workflow ${workflowId}`,
|
|
|
|
);
|
|
|
|
return {
|
|
|
|
count: 0,
|
|
|
|
estimated: false,
|
|
|
|
results: [],
|
|
|
|
};
|
2022-10-26 04:30:35 -07:00
|
|
|
}
|
2022-08-26 08:31:28 -07:00
|
|
|
|
|
|
|
const limit = req.query.limit
|
|
|
|
? parseInt(req.query.limit, 10)
|
2022-12-21 01:46:26 -08:00
|
|
|
: GenericHelpers.DEFAULT_EXECUTIONS_GET_ALL_LIMIT;
|
2022-08-26 08:31:28 -07:00
|
|
|
|
|
|
|
const executingWorkflowIds: string[] = [];
|
|
|
|
|
|
|
|
if (config.getEnv('executions.mode') === 'queue') {
|
2023-03-16 07:34:13 -07:00
|
|
|
const queue = Container.get(Queue);
|
2023-01-02 03:14:39 -08:00
|
|
|
const currentJobs = await queue.getJobs(['active', 'waiting']);
|
2022-08-26 08:31:28 -07:00
|
|
|
executingWorkflowIds.push(...currentJobs.map(({ data }) => data.executionId));
|
|
|
|
}
|
|
|
|
|
|
|
|
// We may have manual executions even with queue so we must account for these.
|
|
|
|
executingWorkflowIds.push(
|
2023-02-21 10:21:56 -08:00
|
|
|
...Container.get(ActiveExecutions)
|
2022-08-26 08:31:28 -07:00
|
|
|
.getActiveExecutions()
|
|
|
|
.map(({ id }) => id),
|
|
|
|
);
|
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
const { count, estimated } = await Container.get(ExecutionRepository).countExecutions(
|
|
|
|
filter,
|
|
|
|
sharedWorkflowIds,
|
|
|
|
executingWorkflowIds,
|
|
|
|
req.user.globalRole.name === 'owner',
|
2022-11-30 05:00:28 -08:00
|
|
|
);
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
const formattedExecutions = await Container.get(ExecutionRepository).searchExecutions(
|
|
|
|
filter,
|
|
|
|
limit,
|
|
|
|
executingWorkflowIds,
|
|
|
|
sharedWorkflowIds,
|
|
|
|
{
|
|
|
|
lastId: req.query.lastId,
|
|
|
|
firstId: req.query.firstId,
|
|
|
|
},
|
|
|
|
);
|
2022-08-26 08:31:28 -07:00
|
|
|
return {
|
|
|
|
count,
|
|
|
|
results: formattedExecutions,
|
|
|
|
estimated,
|
|
|
|
};
|
2022-11-30 05:00:28 -08:00
|
|
|
}
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2022-11-30 05:00:28 -08:00
|
|
|
static async getExecution(
|
|
|
|
req: ExecutionRequest.Get,
|
|
|
|
): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> {
|
|
|
|
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
|
|
|
|
if (!sharedWorkflowIds.length) return undefined;
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2022-11-30 05:00:28 -08:00
|
|
|
const { id: executionId } = req.params;
|
2023-06-20 10:13:18 -07:00
|
|
|
const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, {
|
2022-11-30 05:00:28 -08:00
|
|
|
where: {
|
|
|
|
id: executionId,
|
|
|
|
workflowId: In(sharedWorkflowIds),
|
|
|
|
},
|
2023-06-20 10:13:18 -07:00
|
|
|
includeData: true,
|
|
|
|
unflattenData: false,
|
2022-11-30 05:00:28 -08:00
|
|
|
});
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2022-11-30 05:00:28 -08:00
|
|
|
if (!execution) {
|
2023-10-25 07:35:22 -07:00
|
|
|
Container.get(Logger).info(
|
|
|
|
'Attempt to read execution was blocked due to insufficient permissions',
|
|
|
|
{
|
|
|
|
userId: req.user.id,
|
|
|
|
executionId,
|
|
|
|
},
|
|
|
|
);
|
2022-11-30 05:00:28 -08:00
|
|
|
return undefined;
|
|
|
|
}
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2023-02-21 12:44:46 -08:00
|
|
|
if (!execution.status) {
|
|
|
|
execution.status = getStatusUsingPreviousExecutionStatusMethod(execution);
|
|
|
|
}
|
|
|
|
|
2023-01-02 08:42:32 -08:00
|
|
|
return execution;
|
2022-11-30 05:00:28 -08:00
|
|
|
}
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2022-11-30 05:00:28 -08:00
|
|
|
static async retryExecution(req: ExecutionRequest.Retry): Promise<boolean> {
|
|
|
|
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
|
2022-08-26 08:31:28 -07:00
|
|
|
if (!sharedWorkflowIds.length) return false;
|
|
|
|
|
2022-11-30 05:00:28 -08:00
|
|
|
const { id: executionId } = req.params;
|
2023-06-20 10:13:18 -07:00
|
|
|
const execution = await Container.get(ExecutionRepository).findSingleExecution(executionId, {
|
2022-08-26 08:31:28 -07:00
|
|
|
where: {
|
|
|
|
workflowId: In(sharedWorkflowIds),
|
|
|
|
},
|
2023-06-20 10:13:18 -07:00
|
|
|
includeData: true,
|
|
|
|
unflattenData: true,
|
2022-08-26 08:31:28 -07:00
|
|
|
});
|
|
|
|
|
|
|
|
if (!execution) {
|
2023-10-25 07:35:22 -07:00
|
|
|
Container.get(Logger).info(
|
2022-08-26 08:31:28 -07:00
|
|
|
'Attempt to retry an execution was blocked due to insufficient permissions',
|
|
|
|
{
|
|
|
|
userId: req.user.id,
|
|
|
|
executionId,
|
|
|
|
},
|
|
|
|
);
|
2022-11-22 05:00:36 -08:00
|
|
|
throw new ResponseHelper.NotFoundError(
|
2022-08-26 08:31:28 -07:00
|
|
|
`The execution with the ID "${executionId}" does not exist.`,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
if (execution.finished) {
|
2022-08-26 08:31:28 -07:00
|
|
|
throw new Error('The execution succeeded, so it cannot be retried.');
|
|
|
|
}
|
|
|
|
|
|
|
|
const executionMode = 'retry';
|
|
|
|
|
2023-06-20 10:13:18 -07:00
|
|
|
execution.workflowData.active = false;
|
2022-08-26 08:31:28 -07:00
|
|
|
|
|
|
|
// Start the workflow
|
|
|
|
const data: IWorkflowExecutionDataProcess = {
|
|
|
|
executionMode,
|
2023-06-20 10:13:18 -07:00
|
|
|
executionData: execution.data,
|
2022-08-26 08:31:28 -07:00
|
|
|
retryOf: req.params.id,
|
2023-06-20 10:13:18 -07:00
|
|
|
workflowData: execution.workflowData,
|
2022-08-26 08:31:28 -07:00
|
|
|
userId: req.user.id,
|
|
|
|
};
|
|
|
|
|
|
|
|
const { lastNodeExecuted } = data.executionData!.resultData;
|
|
|
|
|
|
|
|
if (lastNodeExecuted) {
|
|
|
|
// Remove the old error and the data of the last run of the node that it can be replaced
|
|
|
|
delete data.executionData!.resultData.error;
|
|
|
|
const { length } = data.executionData!.resultData.runData[lastNodeExecuted];
|
|
|
|
if (
|
|
|
|
length > 0 &&
|
|
|
|
data.executionData!.resultData.runData[lastNodeExecuted][length - 1].error !== undefined
|
|
|
|
) {
|
|
|
|
// Remove results only if it is an error.
|
|
|
|
// If we are retrying due to a crash, the information is simply success info from last node
|
|
|
|
data.executionData!.resultData.runData[lastNodeExecuted].pop();
|
|
|
|
// Stack will determine what to run next
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (req.body.loadWorkflow) {
|
|
|
|
// Loads the currently saved workflow to execute instead of the
|
|
|
|
// one saved at the time of the execution.
|
2023-06-20 10:13:18 -07:00
|
|
|
const workflowId = execution.workflowData.id as string;
|
2023-01-13 09:12:22 -08:00
|
|
|
const workflowData = (await Db.collections.Workflow.findOneBy({
|
|
|
|
id: workflowId,
|
|
|
|
})) as IWorkflowBase;
|
2022-08-26 08:31:28 -07:00
|
|
|
|
|
|
|
if (workflowData === undefined) {
|
|
|
|
throw new Error(
|
|
|
|
`The workflow with the ID "${workflowId}" could not be found and so the data not be loaded for the retry.`,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
data.workflowData = workflowData;
|
2023-02-21 10:21:56 -08:00
|
|
|
const nodeTypes = Container.get(NodeTypes);
|
2022-08-26 08:31:28 -07:00
|
|
|
const workflowInstance = new Workflow({
|
|
|
|
id: workflowData.id as string,
|
|
|
|
name: workflowData.name,
|
|
|
|
nodes: workflowData.nodes,
|
|
|
|
connections: workflowData.connections,
|
|
|
|
active: false,
|
|
|
|
nodeTypes,
|
|
|
|
staticData: undefined,
|
|
|
|
settings: workflowData.settings,
|
|
|
|
});
|
|
|
|
|
|
|
|
// Replace all of the nodes in the execution stack with the ones of the new workflow
|
|
|
|
for (const stack of data.executionData!.executionData!.nodeExecutionStack) {
|
|
|
|
// Find the data of the last executed node in the new workflow
|
|
|
|
const node = workflowInstance.getNode(stack.node.name);
|
|
|
|
if (node === null) {
|
2023-10-25 07:35:22 -07:00
|
|
|
Container.get(Logger).error(
|
|
|
|
'Failed to retry an execution because a node could not be found',
|
|
|
|
{
|
|
|
|
userId: req.user.id,
|
|
|
|
executionId,
|
|
|
|
nodeName: stack.node.name,
|
|
|
|
},
|
|
|
|
);
|
2022-08-26 08:31:28 -07:00
|
|
|
throw new Error(
|
|
|
|
`Could not find the node "${stack.node.name}" in workflow. It probably got deleted or renamed. Without it the workflow can sadly not be retried.`,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Replace the node data in the stack that it really uses the current data
|
|
|
|
stack.node = node;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const workflowRunner = new WorkflowRunner();
|
|
|
|
const retriedExecutionId = await workflowRunner.run(data);
|
|
|
|
|
2023-09-01 04:29:31 -07:00
|
|
|
const executionData =
|
|
|
|
await Container.get(ActiveExecutions).getPostExecutePromise(retriedExecutionId);
|
2022-08-26 08:31:28 -07:00
|
|
|
|
|
|
|
if (!executionData) {
|
|
|
|
throw new Error('The retry did not start for an unknown reason.');
|
|
|
|
}
|
|
|
|
|
|
|
|
return !!executionData.finished;
|
2022-11-30 05:00:28 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
static async deleteExecutions(req: ExecutionRequest.Delete): Promise<void> {
|
|
|
|
const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
|
|
|
|
if (sharedWorkflowIds.length === 0) {
|
|
|
|
// return early since without shared workflows there can be no hits
|
|
|
|
// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
|
|
|
|
return;
|
|
|
|
}
|
2022-10-26 04:30:35 -07:00
|
|
|
const { deleteBefore, ids, filters: requestFiltersRaw } = req.body;
|
|
|
|
let requestFilters;
|
|
|
|
if (requestFiltersRaw) {
|
|
|
|
try {
|
|
|
|
Object.keys(requestFiltersRaw).map((key) => {
|
|
|
|
if (!allowedExecutionsQueryFilterFields.includes(key)) delete requestFiltersRaw[key];
|
|
|
|
});
|
|
|
|
if (jsonSchemaValidate(requestFiltersRaw, schemaGetExecutionsQueryFilter).valid) {
|
|
|
|
requestFilters = requestFiltersRaw as IGetExecutionsQueryFilter;
|
|
|
|
}
|
|
|
|
} catch (error) {
|
2022-11-22 05:00:36 -08:00
|
|
|
throw new ResponseHelper.InternalServerError(
|
2022-12-29 03:20:43 -08:00
|
|
|
'Parameter "filter" contained invalid JSON string.',
|
2022-10-26 04:30:35 -07:00
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
2022-08-26 08:31:28 -07:00
|
|
|
|
2023-09-20 06:21:42 -07:00
|
|
|
return Container.get(ExecutionRepository).deleteExecutionsByFilter(
|
|
|
|
requestFilters,
|
|
|
|
sharedWorkflowIds,
|
|
|
|
{
|
|
|
|
deleteBefore,
|
|
|
|
ids,
|
|
|
|
},
|
|
|
|
);
|
2022-11-30 05:00:28 -08:00
|
|
|
}
|
|
|
|
}
|