/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { validate as jsonSchemaValidate } from 'jsonschema';
import { BinaryDataManager } from 'n8n-core';
import type {
	IDataObject,
	IWorkflowBase,
	JsonObject,
	ExecutionStatus,
	IRunExecutionData,
	NodeOperationError,
	IExecutionsSummary,
} from 'n8n-workflow';
import { deepCopy, LoggerProxy, jsonParse, Workflow } from 'n8n-workflow';
import type { FindOperator, FindOptionsWhere } from 'typeorm';
import { In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import type { User } from '@db/entities/User';
import type { ExecutionEntity } from '@db/entities/ExecutionEntity';
import type {
	IExecutionFlattedResponse,
	IExecutionResponse,
	IExecutionsListResponse,
	IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { Queue } from '@/Queue';
import type { ExecutionRequest } from '@/requests';
import * as ResponseHelper from '@/ResponseHelper';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import { parse } from 'flatted';
import { Container } from 'typedi';
import { getStatusUsingPreviousExecutionStatusMethod } from './executionHelpers';

/**
 * Shape of the (already validated) filter accepted by the executions list
 * and delete endpoints. `status` and `waitTill` start out as plain values and
 * are later replaced by TypeORM find operators before querying.
 */
interface IGetExecutionsQueryFilter {
	id?: FindOperator<string>;
	finished?: boolean;
	mode?: string;
	retryOf?: string;
	retrySuccessId?: string;
	status?: ExecutionStatus[];
	workflowId?: string;
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	waitTill?: FindOperator<any> | boolean;
}

/** JSON schema used to validate user supplied execution filters. */
const schemaGetExecutionsQueryFilter = {
	$id: '/IGetExecutionsQueryFilter',
	type: 'object',
	properties: {
		finished: { type: 'boolean' },
		mode: { type: 'string' },
		retryOf: { type: 'string' },
		retrySuccessId: { type: 'string' },
		status: {
			type: 'array',
			items: { type: 'string' },
		},
		waitTill: { type: 'boolean' },
		workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] },
	},
};

/** Filter keys that survive sanitising; every other key is deleted from the incoming filter. */
const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties);

export class ExecutionsService {
	/**
	 * Function to get the workflow Ids for a User
	 * Overridden in EE version to ignore roles
	 */
	static async getWorkflowIdsForUser(user: User): Promise<string[]> {
		// Get all workflows using owner role
		return getSharedWorkflowIds(user, ['owner']);
	}

	/**
	 * Helper function to retrieve count of Executions
	 *
	 * @param countFilter where-conditions to apply on top of the workflow restriction
	 * @param user user the count is computed for
	 * @returns the count and whether it is an estimate (Postgres statistics) or exact
	 */
	static async getExecutionsCount(
		countFilter: IDataObject,
		user: User,
	): Promise<{ count: number; estimated: boolean }> {
		const dbType = config.getEnv('database.type');
		const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id');

		// For databases other than Postgres, do a regular count
		// when filtering based on `workflowId` or `finished` fields.
		if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') {
			const sharedWorkflowIds = await this.getWorkflowIdsForUser(user);

			const countParams = { where: { workflowId: In(sharedWorkflowIds), ...countFilter } };
			const count = await Db.collections.Execution.count(countParams);
			return { count, estimated: false };
		}

		try {
			// Get an estimate of rows count.
			const estimateRowsNumberSql =
				"SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';";
			const rows: Array<{ n_live_tup: string }> = await Db.collections.Execution.query(
				estimateRowsNumberSql,
			);

			const estimate = parseInt(rows[0].n_live_tup, 10);
			// If over 100k, return just an estimate.
			if (estimate > 100_000) {
				return { count: estimate, estimated: true };
			}
			// if less than 100k, we get the real count as even a full
			// table scan should not take so long.
		} catch (error) {
			LoggerProxy.warn(`Failed to get executions count from Postgres: ${error}`);
		}

		const sharedWorkflowIds = await getSharedWorkflowIds(user);

		const count = await Db.collections.Execution.count({
			where: {
				workflowId: In(sharedWorkflowIds),
			},
		});

		return { count, estimated: false };
	}

	/**
	 * Rewrites the `waitTill` entry of a filter, in place, into a TypeORM operator:
	 * `waitTill === true` becomes `Not(IsNull())`, `finished === false` becomes
	 * `IsNull()`, and in every other case `waitTill` is removed from the filter.
	 */
	static massageFilters(filter: IDataObject): void {
		if (filter) {
			if (filter.waitTill === true) {
				filter.waitTill = Not(IsNull());
				// eslint-disable-next-line @typescript-eslint/no-unnecessary-boolean-literal-compare
			} else if (filter.finished === false) {
				filter.waitTill = IsNull();
			} else {
				delete filter.waitTill;
			}
		}
	}

	/**
	 * Lists executions of the workflows the requesting user may access, excluding
	 * executions that are currently queued or active.
	 *
	 * Reads `filter` (JSON string), `limit`, `lastId` and `firstId` from the query.
	 *
	 * @returns the total count (possibly estimated) and the execution summaries
	 * @throws ResponseHelper.InternalServerError when `filter` cannot be parsed as JSON
	 */
	static async getExecutionsList(req: ExecutionRequest.GetAll): Promise<IExecutionsListResponse> {
		const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
		if (sharedWorkflowIds.length === 0) {
			// return early since without shared workflows there can be no hits
			// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
			return {
				count: 0,
				estimated: false,
				results: [],
			};
		}

		// parse incoming filter object and remove non-valid fields
		let filter: IGetExecutionsQueryFilter | undefined = undefined;
		if (req.query.filter) {
			try {
				const filterJson: JsonObject = jsonParse(req.query.filter);
				if (filterJson) {
					Object.keys(filterJson).forEach((key) => {
						if (!allowedExecutionsQueryFilterFields.includes(key)) delete filterJson[key];
					});
					if (jsonSchemaValidate(filterJson, schemaGetExecutionsQueryFilter).valid) {
						filter = filterJson as IGetExecutionsQueryFilter;
					}
				}
			} catch (error) {
				LoggerProxy.error('Failed to parse filter', {
					userId: req.user.id,
					filter: req.query.filter,
				});
				throw new ResponseHelper.InternalServerError(
					'Parameter "filter" contained invalid JSON string.',
				);
			}
		}

		// safeguard against querying workflowIds not shared with the user
		const workflowId = filter?.workflowId?.toString();
		if (workflowId !== undefined && !sharedWorkflowIds.includes(workflowId)) {
			LoggerProxy.verbose(
				`User ${req.user.id} attempted to query non-shared workflow ${workflowId}`,
			);
			return {
				count: 0,
				estimated: false,
				results: [],
			};
		}

		// A non-numeric `limit` would otherwise reach the query builder as NaN.
		const requestedLimit = req.query.limit ? parseInt(req.query.limit, 10) : NaN;
		const limit = Number.isNaN(requestedLimit)
			? GenericHelpers.DEFAULT_EXECUTIONS_GET_ALL_LIMIT
			: requestedLimit;

		// Despite the name, this collects the ids of executions that are queued or running.
		const executingWorkflowIds: string[] = [];

		if (config.getEnv('executions.mode') === 'queue') {
			const queue = Container.get(Queue);
			const currentJobs = await queue.getJobs(['active', 'waiting']);
			executingWorkflowIds.push(...currentJobs.map(({ data }) => data.executionId));
		}

		// We may have manual executions even with queue so we must account for these.
		executingWorkflowIds.push(
			...Container.get(ActiveExecutions)
				.getActiveExecutions()
				.map(({ id }) => id),
		);

		const findWhere: FindOptionsWhere<ExecutionEntity> = {
			workflowId: In(sharedWorkflowIds),
		};
		if (filter?.status) {
			Object.assign(findWhere, { status: In(filter.status) });
		}
		if (filter?.finished) {
			Object.assign(findWhere, { finished: filter.finished });
		}

		const rangeQuery: string[] = [];
		const rangeQueryParams: {
			lastId?: string;
			firstId?: string;
			executingWorkflowIds?: string[];
		} = {};

		if (req.query.lastId) {
			rangeQuery.push('id < :lastId');
			rangeQueryParams.lastId = req.query.lastId;
		}

		if (req.query.firstId) {
			rangeQuery.push('id > :firstId');
			rangeQueryParams.firstId = req.query.firstId;
		}

		if (executingWorkflowIds.length > 0) {
			rangeQuery.push('id NOT IN (:...executingWorkflowIds)');
			rangeQueryParams.executingWorkflowIds = executingWorkflowIds;
		}

		if (rangeQuery.length) {
			Object.assign(findWhere, {
				id: Raw(() => rangeQuery.join(' and '), rangeQueryParams),
			});
		}

		// Omit `data` from the Execution since it is the largest and not necessary for the list.
		let query = Db.collections.Execution.createQueryBuilder('execution')
			.select([
				'execution.id',
				'execution.finished',
				'execution.mode',
				'execution.retryOf',
				'execution.retrySuccessId',
				'execution.waitTill',
				'execution.startedAt',
				'execution.stoppedAt',
				'execution.workflowData',
				'execution.status',
			])
			.orderBy('id', 'DESC')
			.take(limit)
			.where(findWhere);

		const countFilter = deepCopy(filter ?? {});
		// deepcopy breaks the In operator so we need to reapply it
		if (filter?.status) {
			// Capture the raw list first: once `filter.status` is overwritten with an
			// operator, reading it again would wrap an operator inside another `In`.
			const statuses = filter.status;
			Object.assign(filter, { status: In(statuses) });
			Object.assign(countFilter, { status: In(statuses) });
		}

		if (filter) {
			this.massageFilters(filter as IDataObject);
			query = query.andWhere(filter);
		}

		this.massageFilters(countFilter as IDataObject);
		countFilter.id = Not(In(executingWorkflowIds));

		const executions = await query.getMany();

		const { count, estimated } = await this.getExecutionsCount(
			countFilter as IDataObject,
			req.user,
		);

		const formattedExecutions: IExecutionsSummary[] = executions.map((execution) => {
			// inject potential node execution errors into the execution response
			const nodeExecutionStatus = {};
			let lastNodeExecuted;
			let executionError;
			// fill execution status for old executions that will return null
			if (!execution.status) {
				execution.status = getStatusUsingPreviousExecutionStatusMethod(execution);
			}
			try {
				// NOTE(review): `execution.data` is not in the select list above, so this
				// parse presumably throws and the empty catch leaves the fields below
				// unset. Confirm whether that is intended.
				const data = parse(execution.data) as IRunExecutionData;
				lastNodeExecuted = data?.resultData?.lastNodeExecuted ?? '';
				executionError = data?.resultData?.error;
				if (data?.resultData?.runData) {
					for (const key of Object.keys(data.resultData.runData)) {
						const errors = data.resultData.runData[key]
							?.filter((taskdata) => taskdata.error?.name)
							?.map((taskdata) => {
								if (taskdata.error?.name === 'NodeOperationError') {
									return {
										name: (taskdata.error as NodeOperationError).name,
										message: (taskdata.error as NodeOperationError).message,
										description: (taskdata.error as NodeOperationError).description,
									};
								} else {
									return {
										name: taskdata.error?.name,
									};
								}
							});
						Object.assign(nodeExecutionStatus, {
							[key]: {
								executionStatus: data.resultData.runData[key][0].executionStatus,
								errors,
								data: data.resultData.runData[key][0].data ?? undefined,
							},
						});
					}
				}
			} catch {}
			return {
				id: execution.id,
				finished: execution.finished,
				mode: execution.mode,
				retryOf: execution.retryOf?.toString(),
				retrySuccessId: execution?.retrySuccessId?.toString(),
				waitTill: execution.waitTill as Date | undefined,
				startedAt: execution.startedAt,
				stoppedAt: execution.stoppedAt,
				workflowId: execution.workflowData?.id ?? '',
				workflowName: execution.workflowData?.name,
				status: execution.status,
				lastNodeExecuted,
				executionError,
				nodeExecutionStatus,
			} as IExecutionsSummary;
		});

		return {
			count,
			results: formattedExecutions,
			estimated,
		};
	}

	/**
	 * Loads a single execution by id, restricted to workflows the user may access.
	 *
	 * @returns the execution (unflattened when `unflattedResponse=true` is passed in
	 * the query), or `undefined` when it does not exist or is not accessible
	 */
	static async getExecution(
		req: ExecutionRequest.Get,
	): Promise<IExecutionResponse | IExecutionFlattedResponse | undefined> {
		const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
		if (!sharedWorkflowIds.length) return undefined;

		const { id: executionId } = req.params;
		const execution = await Db.collections.Execution.findOne({
			where: {
				id: executionId,
				workflowId: In(sharedWorkflowIds),
			},
		});

		if (!execution) {
			LoggerProxy.info('Attempt to read execution was blocked due to insufficient permissions', {
				userId: req.user.id,
				executionId,
			});
			return undefined;
		}

		if (!execution.status) {
			execution.status = getStatusUsingPreviousExecutionStatusMethod(execution);
		}

		if (req.query.unflattedResponse === 'true') {
			return ResponseHelper.unflattenExecutionData(execution);
		}

		// @ts-ignore
		return execution;
	}

	/**
	 * Re-runs an unfinished execution, optionally against the currently saved
	 * version of its workflow (`req.body.loadWorkflow`).
	 *
	 * @returns whether the retried execution finished; `false` when the user has
	 * no accessible workflows
	 * @throws ResponseHelper.NotFoundError when the execution is not found among the
	 * user's workflows
	 * @throws Error when the execution already finished, the saved workflow or one of
	 * the stacked nodes cannot be found, or the retry yields no execution data
	 */
	static async retryExecution(req: ExecutionRequest.Retry): Promise<boolean> {
		const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
		if (!sharedWorkflowIds.length) return false;

		const { id: executionId } = req.params;
		const execution = await Db.collections.Execution.findOne({
			where: {
				id: executionId,
				workflowId: In(sharedWorkflowIds),
			},
		});

		if (!execution) {
			LoggerProxy.info(
				'Attempt to retry an execution was blocked due to insufficient permissions',
				{
					userId: req.user.id,
					executionId,
				},
			);
			throw new ResponseHelper.NotFoundError(
				`The execution with the ID "${executionId}" does not exist.`,
			);
		}

		const fullExecutionData = ResponseHelper.unflattenExecutionData(execution);

		if (fullExecutionData.finished) {
			throw new Error('The execution succeeded, so it cannot be retried.');
		}

		const executionMode = 'retry';

		fullExecutionData.workflowData.active = false;

		// Start the workflow
		const data: IWorkflowExecutionDataProcess = {
			executionMode,
			executionData: fullExecutionData.data,
			retryOf: req.params.id,
			workflowData: fullExecutionData.workflowData,
			userId: req.user.id,
		};

		const { lastNodeExecuted } = data.executionData!.resultData;

		if (lastNodeExecuted) {
			// Remove the old error and the data of the last run of the node that it can be replaced
			delete data.executionData!.resultData.error;
			// The node may have no run data entry at all, so guard before indexing.
			const lastNodeRuns = data.executionData!.resultData.runData[lastNodeExecuted];
			if (
				lastNodeRuns !== undefined &&
				lastNodeRuns.length > 0 &&
				lastNodeRuns[lastNodeRuns.length - 1].error !== undefined
			) {
				// Remove results only if it is an error.
				// If we are retrying due to a crash, the information is simply success info from last node
				lastNodeRuns.pop();
				// Stack will determine what to run next
			}
		}

		if (req.body.loadWorkflow) {
			// Loads the currently saved workflow to execute instead of the
			// one saved at the time of the execution.
			const workflowId = fullExecutionData.workflowData.id as string;
			const workflowData = (await Db.collections.Workflow.findOneBy({
				id: workflowId,
			})) as IWorkflowBase | null;

			// `findOneBy` resolves to `null` (not `undefined`) when nothing matches.
			if (!workflowData) {
				throw new Error(
					`The workflow with the ID "${workflowId}" could not be found and so the data not be loaded for the retry.`,
				);
			}

			data.workflowData = workflowData;
			const nodeTypes = Container.get(NodeTypes);
			const workflowInstance = new Workflow({
				id: workflowData.id as string,
				name: workflowData.name,
				nodes: workflowData.nodes,
				connections: workflowData.connections,
				active: false,
				nodeTypes,
				staticData: undefined,
				settings: workflowData.settings,
			});

			// Replace all of the nodes in the execution stack with the ones of the new workflow
			for (const stack of data.executionData!.executionData!.nodeExecutionStack) {
				// Find the data of the last executed node in the new workflow
				const node = workflowInstance.getNode(stack.node.name);
				if (node === null) {
					LoggerProxy.error('Failed to retry an execution because a node could not be found', {
						userId: req.user.id,
						executionId,
						nodeName: stack.node.name,
					});
					throw new Error(
						`Could not find the node "${stack.node.name}" in workflow. It probably got deleted or renamed. Without it the workflow can sadly not be retried.`,
					);
				}

				// Replace the node data in the stack that it really uses the current data
				stack.node = node;
			}
		}

		const workflowRunner = new WorkflowRunner();
		const retriedExecutionId = await workflowRunner.run(data);

		const executionData = await Container.get(ActiveExecutions).getPostExecutePromise(
			retriedExecutionId,
		);

		if (!executionData) {
			throw new Error('The retry did not start for an unknown reason.');
		}

		return !!executionData.finished;
	}

	/**
	 * Deletes executions (and their binary data) either by explicit `ids` or by
	 * `deleteBefore` date plus optional `filters`, limited to workflows the user
	 * may access.
	 *
	 * @throws Error when neither `deleteBefore` nor `ids` is present in the body
	 */
	static async deleteExecutions(req: ExecutionRequest.Delete): Promise<void> {
		const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user);
		if (sharedWorkflowIds.length === 0) {
			// return early since without shared workflows there can be no hits
			// (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners)
			return;
		}
		const { deleteBefore, ids, filters: requestFiltersRaw } = req.body;
		let requestFilters;
		if (requestFiltersRaw) {
			try {
				Object.keys(requestFiltersRaw).forEach((key) => {
					if (!allowedExecutionsQueryFilterFields.includes(key)) delete requestFiltersRaw[key];
				});
				// NOTE(review): when validation fails, `requestFilters` stays undefined and
				// the date based delete below runs without any of the requested filters.
				if (jsonSchemaValidate(requestFiltersRaw, schemaGetExecutionsQueryFilter).valid) {
					requestFilters = requestFiltersRaw as IGetExecutionsQueryFilter;
				}
			} catch (error) {
				throw new ResponseHelper.InternalServerError(
					'Parameter "filter" contained invalid JSON string.',
				);
			}
		}

		if (!deleteBefore && !ids) {
			throw new Error('Either "deleteBefore" or "ids" must be present in the request body');
		}

		const where: FindOptionsWhere<ExecutionEntity> = { workflowId: In(sharedWorkflowIds) };

		if (deleteBefore) {
			// delete executions by date, if user may access the underlying workflows
			where.startedAt = LessThanOrEqual(deleteBefore);
			Object.assign(where, requestFilters);
			if (where.status) {
				where.status = In(requestFiltersRaw!.status as string[]);
			}
		} else if (ids) {
			// delete executions by IDs, if user may access the underlying workflows
			where.id = In(ids);
		} else return;

		const executions = await Db.collections.Execution.find({
			select: ['id'],
			where,
		});

		if (!executions.length) {
			if (ids) {
				LoggerProxy.error('Failed to delete an execution due to insufficient permissions', {
					userId: req.user.id,
					executionIds: ids,
				});
			}
			return;
		}

		const idsToDelete = executions.map(({ id }) => id);

		const binaryDataManager = BinaryDataManager.getInstance();
		await Promise.all(
			idsToDelete.map(async (id) => binaryDataManager.deleteBinaryDataByExecutionId(id)),
		);

		do {
			// Delete in batches to avoid "SQLITE_ERROR: Expression tree is too large (maximum depth 1000)" error
			const batch = idsToDelete.splice(0, 500);
			await Db.collections.Execution.delete(batch);
		} while (idsToDelete.length > 0);
	}
}