From 3d80129a28169043a2ab20feae7a7141c1e820cf Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Wed, 10 Mar 2021 10:50:07 +0100 Subject: [PATCH] :bug: Fix performance and crashes with latest updates (#1524) * Removing unnecessary data from process communication * Wrapping progress save in a try ... catch block to prevent crashes * Migration to fix mysql problems with big data on executions * :zap: Minor formatting fixes Co-authored-by: Jan Oberhauser --- .../cli/src/WorkflowExecuteAdditionalData.ts | 98 ++++++++++--------- packages/cli/src/WorkflowRunnerProcess.ts | 4 +- .../1615306975123-ChangeDataSize.ts | 18 ++++ .../src/databases/mysqldb/migrations/index.ts | 2 + 4 files changed, 76 insertions(+), 46 deletions(-) create mode 100644 packages/cli/src/databases/mysqldb/migrations/1615306975123-ChangeDataSize.ts diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index d1ef25985e..b40491cea4 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -230,54 +230,64 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx return; } - const execution = await Db.collections.Execution!.findOne(this.executionId); + try { + const execution = await Db.collections.Execution!.findOne(this.executionId); - if (execution === undefined) { - // Something went badly wrong if this happens. - // This check is here mostly to make typescript happy. - return undefined; - } - const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution); + if (execution === undefined) { + // Something went badly wrong if this happens. + // This check is here mostly to make typescript happy. + return undefined; + } + const fullExecutionData: IExecutionResponse = ResponseHelper.unflattenExecutionData(execution); - if (fullExecutionData.finished) { - // We already received ´workflowExecuteAfter´ webhook, so this is just an async call - // that was left behind. We skip saving because the other call should have saved everything - // so this one is safe to ignore - return; + if (fullExecutionData.finished) { + // We already received ´workflowExecuteAfter´ webhook, so this is just an async call + // that was left behind. We skip saving because the other call should have saved everything + // so this one is safe to ignore + return; + } + + + if (fullExecutionData.data === undefined) { + fullExecutionData.data = { + startData: { + }, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + waitingExecution: {}, + }, + }; + } + + if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) { + // Append data if array exists + fullExecutionData.data.resultData.runData[nodeName].push(data); + } else { + // Initialize array and save data + fullExecutionData.data.resultData.runData[nodeName] = [data]; + } + + fullExecutionData.data.executionData = executionData.executionData; + + // Set last executed node so that it may resume on failure + fullExecutionData.data.resultData.lastNodeExecuted = nodeName; + + const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData); + + await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb); + } catch (err) { + // TODO: Improve in the future! + // Errors here might happen because of database access + // For busy machines, we may get "Database is locked" errors. + + // We do this to prevent crashes and executions ending in `unknown` state. + console.log(`Failed saving execution progress to database for execution ID ${this.executionId}`, err); } - - if (fullExecutionData.data === undefined) { - fullExecutionData.data = { - startData: { - }, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - nodeExecutionStack: [], - waitingExecution: {}, - }, - }; - } - - if (Array.isArray(fullExecutionData.data.resultData.runData[nodeName])) { - // Append data if array exists - fullExecutionData.data.resultData.runData[nodeName].push(data); - } else { - // Initialize array and save data - fullExecutionData.data.resultData.runData[nodeName] = [data]; - } - - fullExecutionData.data.executionData = executionData.executionData; - - // Set last executed node so that it may resume on failure - fullExecutionData.data.resultData.lastNodeExecuted = nodeName; - - const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData); - - await Db.collections.Execution!.update(this.executionId, flattenedExecutionData as IExecutionFlattedDb); }, ], }; diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 95b9bf5adb..50c623beef 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -174,8 +174,8 @@ export class WorkflowRunnerProcess { }, ], nodeExecuteAfter: [ - async (nodeName: string, data: ITaskData, executionData: IRunExecutionData): Promise => { - this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data, executionData]); + async (nodeName: string, data: ITaskData): Promise => { + this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]); }, ], workflowExecuteBefore: [ diff --git a/packages/cli/src/databases/mysqldb/migrations/1615306975123-ChangeDataSize.ts b/packages/cli/src/databases/mysqldb/migrations/1615306975123-ChangeDataSize.ts new file mode 100644 index 0000000000..793b178ef3 --- /dev/null +++ b/packages/cli/src/databases/mysqldb/migrations/1615306975123-ChangeDataSize.ts @@ -0,0 +1,18 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import * as config from '../../../../config'; + +export class ChangeDataSize1615306975123 implements MigrationInterface { + name = 'ChangeDataSize1615306975123'; + + async up(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query('ALTER TABLE `' + tablePrefix + 'execution_entity` MODIFY COLUMN `data` MEDIUMTEXT NOT NULL'); + } + + async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.get('database.tablePrefix'); + + await queryRunner.query('ALTER TABLE `' + tablePrefix + 'execution_entity` MODIFY COLUMN `data` TEXT NOT NULL'); + } +} diff --git a/packages/cli/src/databases/mysqldb/migrations/index.ts b/packages/cli/src/databases/mysqldb/migrations/index.ts index 4c736d57ed..08ca45edbf 100644 --- a/packages/cli/src/databases/mysqldb/migrations/index.ts +++ b/packages/cli/src/databases/mysqldb/migrations/index.ts @@ -3,6 +3,7 @@ import { WebhookModel1592447867632 } from './1592447867632-WebhookModel'; import { CreateIndexStoppedAt1594902918301 } from './1594902918301-CreateIndexStoppedAt'; import { AddWebhookId1611149998770 } from './1611149998770-AddWebhookId'; import { MakeStoppedAtNullable1607431743767 } from './1607431743767-MakeStoppedAtNullable'; +import { ChangeDataSize1615306975123 } from './1615306975123-ChangeDataSize'; export const mysqlMigrations = [ InitialMigration1588157391238, @@ -10,4 +11,5 @@ export const mysqlMigrations = [ CreateIndexStoppedAt1594902918301, AddWebhookId1611149998770, MakeStoppedAtNullable1607431743767, + ChangeDataSize1615306975123, ];