From ddf787c087e523871891fd363f810075943b5e7b Mon Sep 17 00:00:00 2001 From: freya Date: Wed, 30 Nov 2022 13:00:28 +0000 Subject: [PATCH] fix(core): Ensure executions list is properly filtered for all users (#4765) Also updates executions API to have EE version --- packages/cli/src/Server.ts | 2 +- .../executions/executions.controller.ee.ts | 70 +++++ .../src/executions/executions.controller.ts | 74 +++++ .../src/executions/executions.service.ee.ts | 13 + .../executions.service.ts} | 287 +++++++----------- 5 files changed, 274 insertions(+), 172 deletions(-) create mode 100644 packages/cli/src/executions/executions.controller.ee.ts create mode 100644 packages/cli/src/executions/executions.controller.ts create mode 100644 packages/cli/src/executions/executions.service.ee.ts rename packages/cli/src/{api/executions.api.ts => executions/executions.service.ts} (74%) diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 13b11bc6dc..913d2ef69c 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -109,7 +109,7 @@ import type { import { userManagementRouter } from '@/UserManagement'; import { resolveJwt } from '@/UserManagement/auth/jwt'; -import { executionsController } from '@/api/executions.api'; +import { executionsController } from '@/executions/executions.controller'; import { nodeTypesController } from '@/api/nodeTypes.api'; import { tagsController } from '@/api/tags.api'; import { loadPublicApiVersions } from '@/PublicApi'; diff --git a/packages/cli/src/executions/executions.controller.ee.ts b/packages/cli/src/executions/executions.controller.ee.ts new file mode 100644 index 0000000000..200f6faf0b --- /dev/null +++ b/packages/cli/src/executions/executions.controller.ee.ts @@ -0,0 +1,70 @@ +import express from 'express'; +import config from '@/config'; +import { + IExecutionFlattedResponse, + IExecutionResponse, + IExecutionsListResponse, +} from '@/Interfaces'; +import type { ExecutionRequest } from '@/requests'; +import * as ResponseHelper from '@/ResponseHelper'; +import { isSharingEnabled } from '@/UserManagement/UserManagementHelper'; +import { EEExecutionsService } from './executions.service.ee'; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const EEExecutionsController = express.Router(); + +EEExecutionsController.use((req, res, next) => { + if (!isSharingEnabled() || !config.getEnv('enterprise.workflowSharingEnabled')) { + // skip ee router and use free one + next('router'); + return; + } + // use ee router + next(); +}); + +/** + * GET /executions + */ +EEExecutionsController.get( + '/', + ResponseHelper.send(async (req: ExecutionRequest.GetAll): Promise => { + return EEExecutionsService.getExecutionsList(req); + }), +); + +/** + * GET /executions/:id + */ +EEExecutionsController.get( + '/:id', + ResponseHelper.send( + async ( + req: ExecutionRequest.Get, + ): Promise => { + return EEExecutionsService.getExecution(req); + }, + ), +); + +/** + * POST /executions/:id/retry + */ +EEExecutionsController.post( + '/:id/retry', + ResponseHelper.send(async (req: ExecutionRequest.Retry): Promise => { + return EEExecutionsService.retryExecution(req); + }), +); + +/** + * POST /executions/delete + * INFORMATION: We use POST instead of DELETE to not run into any issues with the query data + * getting too long + */ +EEExecutionsController.post( + '/delete', + ResponseHelper.send(async (req: ExecutionRequest.Delete): Promise => { + await EEExecutionsService.deleteExecutions(req); + }), +); diff --git a/packages/cli/src/executions/executions.controller.ts b/packages/cli/src/executions/executions.controller.ts new file mode 100644 index 0000000000..af33367611 --- /dev/null +++ b/packages/cli/src/executions/executions.controller.ts @@ -0,0 +1,74 @@ +import express from 'express'; +import { LoggerProxy } from 'n8n-workflow'; +import { + IExecutionFlattedResponse, + IExecutionResponse, + IExecutionsListResponse, +} from '@/Interfaces'; +import * as ResponseHelper from '@/ResponseHelper'; +import { getLogger } from '@/Logger'; +import type { ExecutionRequest } from '@/requests'; +import { EEExecutionsController } from './executions.controller.ee'; +import { ExecutionsService } from './executions.service'; + +export const executionsController = express.Router(); + +/** + * Initialise Logger if needed + */ +executionsController.use((req, res, next) => { + try { + LoggerProxy.getInstance(); + } catch (error) { + LoggerProxy.init(getLogger()); + } + next(); +}); + +executionsController.use('/', EEExecutionsController); + +/** + * GET /executions + */ +executionsController.get( + '/', + ResponseHelper.send(async (req: ExecutionRequest.GetAll): Promise => { + return ExecutionsService.getExecutionsList(req); + }), +); + +/** + * GET /executions/:id + */ +executionsController.get( + '/:id', + ResponseHelper.send( + async ( + req: ExecutionRequest.Get, + ): Promise => { + return ExecutionsService.getExecution(req); + }, + ), +); + +/** + * POST /executions/:id/retry + */ +executionsController.post( + '/:id/retry', + ResponseHelper.send(async (req: ExecutionRequest.Retry): Promise => { + return ExecutionsService.retryExecution(req); + }), +); + +/** + * POST /executions/delete + * INFORMATION: We use POST instead of DELETE to not run into any issues with the query data + * getting too long + */ +executionsController.post( + '/delete', + ResponseHelper.send(async (req: ExecutionRequest.Delete): Promise => { + await ExecutionsService.deleteExecutions(req); + }), +); diff --git a/packages/cli/src/executions/executions.service.ee.ts b/packages/cli/src/executions/executions.service.ee.ts new file mode 100644 index 0000000000..6df410a562 --- /dev/null +++ b/packages/cli/src/executions/executions.service.ee.ts @@ -0,0 +1,13 @@ +import { User } from '@/databases/entities/User'; +import { getSharedWorkflowIds } from '@/WorkflowHelpers'; +import { ExecutionsService } from './executions.service'; + +export class EEExecutionsService extends ExecutionsService { + /** + * Function to get the workflow Ids for a User regardless of role + */ + static async getWorkflowIdsForUser(user: User): Promise { + // Get all workflows + return getSharedWorkflowIds(user); + } +} diff --git a/packages/cli/src/api/executions.api.ts b/packages/cli/src/executions/executions.service.ts similarity index 74% rename from packages/cli/src/api/executions.api.ts rename to packages/cli/src/executions/executions.service.ts index 8229e5557a..19982c7573 100644 --- a/packages/cli/src/api/executions.api.ts +++ b/packages/cli/src/executions/executions.service.ts @@ -1,47 +1,39 @@ -/* eslint-disable no-restricted-syntax */ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -/* eslint-disable @typescript-eslint/no-unsafe-return */ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ -/* eslint-disable @typescript-eslint/no-unsafe-argument */ -/* eslint-disable @typescript-eslint/no-unused-vars */ -import express from 'express'; import { validate as jsonSchemaValidate } from 'jsonschema'; import { BinaryDataManager } from 'n8n-core'; -import { - deepCopy, - IDataObject, - IWorkflowBase, - JsonObject, - jsonParse, - LoggerProxy, - Workflow, -} from 'n8n-workflow'; +import { deepCopy, IDataObject, LoggerProxy, JsonObject, jsonParse, Workflow } from 'n8n-workflow'; import { FindOperator, In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm'; - import * as ActiveExecutions from '@/ActiveExecutions'; -import * as Db from '@/Db'; -import * as GenericHelpers from '@/GenericHelpers'; +import config from '@/config'; +import { User } from '@/databases/entities/User'; +import { DEFAULT_EXECUTIONS_GET_ALL_LIMIT } from '@/GenericHelpers'; import { - DatabaseType, IExecutionFlattedResponse, IExecutionResponse, IExecutionsListResponse, + IWorkflowBase, IWorkflowExecutionDataProcess, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import * as ResponseHelper from '@/ResponseHelper'; -import { WorkflowRunner } from '@/WorkflowRunner'; -import config from '@/config'; -import { User } from '@db/entities/User'; -import { DEFAULT_EXECUTIONS_GET_ALL_LIMIT } from '@/GenericHelpers'; -import { getLogger } from '@/Logger'; import * as Queue from '@/Queue'; import type { ExecutionRequest } from '@/requests'; +import * as ResponseHelper from '@/ResponseHelper'; import { getSharedWorkflowIds } from '@/WorkflowHelpers'; +import { WorkflowRunner } from '@/WorkflowRunner'; +import { DatabaseType, Db, GenericHelpers } from '..'; -export const executionsController = express.Router(); +interface IGetExecutionsQueryFilter { + id?: FindOperator; + finished?: boolean; + mode?: string; + retryOf?: string; + retrySuccessId?: string; + workflowId?: number | string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + waitTill?: FindOperator | boolean; +} const schemaGetExecutionsQueryFilter = { $id: '/IGetExecutionsQueryFilter', @@ -58,91 +50,68 @@ const schemaGetExecutionsQueryFilter = { const allowedExecutionsQueryFilterFields = Object.keys(schemaGetExecutionsQueryFilter.properties); -interface IGetExecutionsQueryFilter { - id?: FindOperator; - finished?: boolean; - mode?: string; - retryOf?: string; - retrySuccessId?: string; - workflowId?: number | string; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - waitTill?: FindOperator | boolean; -} - -/** - * Initialise Logger if needed - */ -executionsController.use((req, res, next) => { - try { - LoggerProxy.getInstance(); - } catch (error) { - LoggerProxy.init(getLogger()); +export class ExecutionsService { + /** + * Function to get the workflow Ids for a User + * Overridden in EE version to ignore roles + */ + static async getWorkflowIdsForUser(user: User): Promise { + // Get all workflows using owner role + return getSharedWorkflowIds(user, ['owner']); } - next(); -}); -/** - * Helper function to retrieve count of Executions - */ -async function getExecutionsCount( - countFilter: IDataObject, - user: User, -): Promise<{ count: number; estimated: boolean }> { - const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType; - const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id'); + /** + * Helper function to retrieve count of Executions + */ + static async getExecutionsCount( + countFilter: IDataObject, + user: User, + ): Promise<{ count: number; estimated: boolean }> { + const dbType = (await GenericHelpers.getConfigValue('database.type')) as DatabaseType; + const filteredFields = Object.keys(countFilter).filter((field) => field !== 'id'); + + // For databases other than Postgres, do a regular count + // when filtering based on `workflowId` or `finished` fields. + if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') { + const sharedWorkflowIds = await this.getWorkflowIdsForUser(user); + + const countParams = { where: { workflowId: In(sharedWorkflowIds), ...countFilter } }; + const count = await Db.collections.Execution.count(countParams); + return { count, estimated: false }; + } + + try { + // Get an estimate of rows count. + const estimateRowsNumberSql = + "SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';"; + const rows: Array<{ n_live_tup: string }> = await Db.collections.Execution.query( + estimateRowsNumberSql, + ); + + const estimate = parseInt(rows[0].n_live_tup, 10); + // If over 100k, return just an estimate. + if (estimate > 100_000) { + // if less than 100k, we get the real count as even a full + // table scan should not take so long. + return { count: estimate, estimated: true }; + } + } catch (error) { + LoggerProxy.warn(`Failed to get executions count from Postgres: ${error}`); + } - // For databases other than Postgres, do a regular count - // when filtering based on `workflowId` or `finished` fields. - if (dbType !== 'postgresdb' || filteredFields.length > 0 || user.globalRole.name !== 'owner') { const sharedWorkflowIds = await getSharedWorkflowIds(user); const count = await Db.collections.Execution.count({ where: { workflowId: In(sharedWorkflowIds), - ...countFilter, }, }); return { count, estimated: false }; } - try { - // Get an estimate of rows count. - const estimateRowsNumberSql = - "SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';"; - const rows: Array<{ n_live_tup: string }> = await Db.collections.Execution.query( - estimateRowsNumberSql, - ); - - const estimate = parseInt(rows[0].n_live_tup, 10); - // If over 100k, return just an estimate. - if (estimate > 100_000) { - // if less than 100k, we get the real count as even a full - // table scan should not take so long. - return { count: estimate, estimated: true }; - } - } catch (error) { - LoggerProxy.warn(`Failed to get executions count from Postgres: ${error}`); - } - - const sharedWorkflowIds = await getSharedWorkflowIds(user); - - const count = await Db.collections.Execution.count({ - where: { - workflowId: In(sharedWorkflowIds), - }, - }); - - return { count, estimated: false }; -} - -/** - * GET /executions - */ -executionsController.get( - '/', - ResponseHelper.send(async (req: ExecutionRequest.GetAll): Promise => { - const sharedWorkflowIds = await getSharedWorkflowIds(req.user); + static async getExecutionsList(req: ExecutionRequest.GetAll): Promise { + const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user); if (sharedWorkflowIds.length === 0) { // return early since without shared workflows there can be no hits // (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners) @@ -263,7 +232,10 @@ executionsController.get( const executions = await query.getMany(); - const { count, estimated } = await getExecutionsCount(countFilter as IDataObject, req.user); + const { count, estimated } = await this.getExecutionsCount( + countFilter as IDataObject, + req.user, + ); const formattedExecutions = executions.map((execution) => { return { @@ -285,66 +257,48 @@ executionsController.get( results: formattedExecutions, estimated, }; - }), -); + } -/** - * GET /executions/:id - */ -executionsController.get( - '/:id', - ResponseHelper.send( - async ( - req: ExecutionRequest.Get, - ): Promise => { - const { id: executionId } = req.params; + static async getExecution( + req: ExecutionRequest.Get, + ): Promise { + const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user); + if (!sharedWorkflowIds.length) return undefined; - const sharedWorkflowIds = await getSharedWorkflowIds(req.user); - - if (!sharedWorkflowIds.length) return undefined; - - const execution = await Db.collections.Execution.findOne({ - where: { - id: executionId, - workflowId: In(sharedWorkflowIds), - }, - }); - - if (!execution) { - LoggerProxy.info('Attempt to read execution was blocked due to insufficient permissions', { - userId: req.user.id, - executionId, - }); - return undefined; - } - - if (req.query.unflattedResponse === 'true') { - return ResponseHelper.unflattenExecutionData(execution); - } - - const { id, ...rest } = execution; - - // @ts-ignore - return { - id: id.toString(), - ...rest, - }; - }, - ), -); - -/** - * POST /executions/:id/retry - */ -executionsController.post( - '/:id/retry', - ResponseHelper.send(async (req: ExecutionRequest.Retry): Promise => { const { id: executionId } = req.params; + const execution = await Db.collections.Execution.findOne({ + where: { + id: executionId, + workflowId: In(sharedWorkflowIds), + }, + }); - const sharedWorkflowIds = await getSharedWorkflowIds(req.user); + if (!execution) { + LoggerProxy.info('Attempt to read execution was blocked due to insufficient permissions', { + userId: req.user.id, + executionId, + }); + return undefined; + } + if (req.query.unflattedResponse === 'true') { + return ResponseHelper.unflattenExecutionData(execution); + } + + const { id, ...rest } = execution; + + // @ts-ignore + return { + id: id.toString(), + ...rest, + }; + } + + static async retryExecution(req: ExecutionRequest.Retry): Promise { + const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user); if (!sharedWorkflowIds.length) return false; + const { id: executionId } = req.params; const execution = await Db.collections.Execution.findOne({ where: { id: executionId, @@ -404,7 +358,7 @@ executionsController.post( if (req.body.loadWorkflow) { // Loads the currently saved workflow to execute instead of the // one saved at the time of the execution. - const workflowId = fullExecutionData.workflowData.id; + const workflowId = fullExecutionData.workflowData.id as string; const workflowData = (await Db.collections.Workflow.findOne(workflowId)) as IWorkflowBase; if (workflowData === undefined) { @@ -458,17 +412,15 @@ executionsController.post( } return !!executionData.finished; - }), -); + } -/** - * POST /executions/delete - * INFORMATION: We use POST instead of DELETE to not run into any issues with the query data - * getting too long - */ -executionsController.post( - '/delete', - ResponseHelper.send(async (req: ExecutionRequest.Delete): Promise => { + static async deleteExecutions(req: ExecutionRequest.Delete): Promise { + const sharedWorkflowIds = await this.getWorkflowIdsForUser(req.user); + if (sharedWorkflowIds.length === 0) { + // return early since without shared workflows there can be no hits + // (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners) + return; + } const { deleteBefore, ids, filters: requestFiltersRaw } = req.body; let requestFilters; if (requestFiltersRaw) { @@ -490,13 +442,6 @@ executionsController.post( throw new Error('Either "deleteBefore" or "ids" must be present in the request body'); } - const sharedWorkflowIds = await getSharedWorkflowIds(req.user); - if (sharedWorkflowIds.length === 0) { - // return early since without shared workflows there can be no hits - // (note: getSharedWorkflowIds() returns _all_ workflow ids for global owners) - return; - } - const binaryDataManager = BinaryDataManager.getInstance(); // delete executions by date, if user may access the underlying workflows @@ -558,5 +503,5 @@ executionsController.post( await Db.collections.Execution.delete(idsToDelete); } - }), -); + } +}