diff --git a/packages/cli/src/databases/MigrationHelpers.ts b/packages/cli/src/databases/MigrationHelpers.ts new file mode 100644 index 0000000000..7db121bd85 --- /dev/null +++ b/packages/cli/src/databases/MigrationHelpers.ts @@ -0,0 +1,39 @@ +import { QueryRunner } from 'typeorm'; + +export class MigrationHelpers { + queryRunner: QueryRunner; + + constructor(queryRunner: QueryRunner) { + this.queryRunner = queryRunner; + } + + // runs an operation sequential on chunks of a query that returns a potentially large Array. + /* eslint-disable no-await-in-loop */ + async runChunked( + query: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + operation: (results: any[]) => Promise, + limit = 100, + ): Promise { + let offset = 0; + let chunkedQuery: string; + let chunkedQueryResults: unknown[]; + + do { + chunkedQuery = this.chunkQuery(query, limit, offset); + chunkedQueryResults = (await this.queryRunner.query(chunkedQuery)) as unknown[]; + // pass a copy to prevent errors from mutation + await operation([...chunkedQueryResults]); + offset += limit; + } while (chunkedQueryResults.length === limit); + } + /* eslint-enable no-await-in-loop */ + + private chunkQuery(query: string, limit: number, offset = 0): string { + return ` + ${query} + LIMIT ${limit} + OFFSET ${offset} + `; + } +} diff --git a/packages/cli/src/databases/mysqldb/migrations/1630451444017-UpdateWorkflowCredentials.ts b/packages/cli/src/databases/mysqldb/migrations/1630451444017-UpdateWorkflowCredentials.ts index 0012ee0aa1..0061052c2a 100644 --- a/packages/cli/src/databases/mysqldb/migrations/1630451444017-UpdateWorkflowCredentials.ts +++ b/packages/cli/src/databases/mysqldb/migrations/1630451444017-UpdateWorkflowCredentials.ts @@ -1,5 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import config = require('../../../../config'); +import { MigrationHelpers } from '../../MigrationHelpers'; // replacing the credentials in workflows and execution // `nodeType: name` changes to `nodeType: { id, name }` @@ -8,58 +9,100 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac name = 'UpdateWorkflowCredentials1630451444017'; public async up(queryRunner: QueryRunner): Promise { + console.log('Start migration', this.name); + console.time(this.name); const tablePrefix = config.get('database.tablePrefix'); + const helpers = new MigrationHelpers(queryRunner); const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM ${tablePrefix}credentials_entity `); - const workflows = await queryRunner.query(` + const workflowsQuery = ` SELECT id, nodes FROM ${tablePrefix}workflow_entity - `); + `; // @ts-ignore - workflows.forEach(async (workflow) => { - const nodes = workflow.nodes; - let credentialsUpdated = false; - // @ts-ignore - nodes.forEach((node) => { - if (node.credentials) { - const allNodeCredentials = Object.entries(node.credentials); - for (const [type, name] of allNodeCredentials) { - if (typeof name === 'string') { - // @ts-ignore - const matchingCredentials = credentialsEntities.find( - // @ts-ignore - (credentials) => credentials.name === name && credentials.type === type, - ); - node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; - credentialsUpdated = true; + await helpers.runChunked(workflowsQuery, (workflows) => { + workflows.forEach(async (workflow) => { + const nodes = workflow.nodes; + let credentialsUpdated = false; + // @ts-ignore + nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, name] of allNodeCredentials) { + if (typeof name === 'string') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.name === name && credentials.type === type, + ); + node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; + credentialsUpdated = true; + } } } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}workflow_entity + SET nodes = :nodes + WHERE id = '${workflow.id}' + `, + { nodes: JSON.stringify(nodes) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); } }); - if (credentialsUpdated) { - const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( - ` - UPDATE ${tablePrefix}workflow_entity - SET nodes = :nodes - WHERE id = '${workflow.id}' - `, - { nodes: JSON.stringify(nodes) }, - {}, - ); - - await queryRunner.query(updateQuery, updateParams); - } }); - const waitingExecutions = await queryRunner.query(` + const waitingExecutionsQuery = ` SELECT id, workflowData FROM ${tablePrefix}execution_entity WHERE waitTill IS NOT NULL AND finished = 0 - `); + `; + // @ts-ignore + await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => { + waitingExecutions.forEach(async (execution) => { + const data = execution.workflowData; + let credentialsUpdated = false; + // @ts-ignore + data.nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, name] of allNodeCredentials) { + if (typeof name === 'string') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.name === name && credentials.type === type, + ); + node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; + credentialsUpdated = true; + } + } + } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}execution_entity + SET workflowData = :data + WHERE id = '${execution.id}' + `, + { data: JSON.stringify(data) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); + } + }); + }); const retryableExecutions = await queryRunner.query(` SELECT id, workflowData @@ -68,8 +111,8 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac ORDER BY startedAt DESC LIMIT 200 `); - - [...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { + // @ts-ignore + retryableExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; // @ts-ignore @@ -78,7 +121,6 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac const allNodeCredentials = Object.entries(node.credentials); for (const [type, name] of allNodeCredentials) { if (typeof name === 'string') { - // @ts-ignore const matchingCredentials = credentialsEntities.find( // @ts-ignore (credentials) => credentials.name === name && credentials.type === type, @@ -92,77 +134,124 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac if (credentialsUpdated) { const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( ` - UPDATE ${tablePrefix}execution_entity - SET workflowData = :data - WHERE id = '${execution.id}' - `, + UPDATE ${tablePrefix}execution_entity + SET workflowData = :data + WHERE id = '${execution.id}' + `, { data: JSON.stringify(data) }, {}, ); - await queryRunner.query(updateQuery, updateParams); + queryRunner.query(updateQuery, updateParams); } }); + console.timeEnd(this.name); } public async down(queryRunner: QueryRunner): Promise { const tablePrefix = config.get('database.tablePrefix'); + const helpers = new MigrationHelpers(queryRunner); const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM ${tablePrefix}credentials_entity `); - const workflows = await queryRunner.query(` + const workflowsQuery = ` SELECT id, nodes FROM ${tablePrefix}workflow_entity - `); + `; // @ts-ignore - workflows.forEach(async (workflow) => { - const nodes = workflow.nodes; - let credentialsUpdated = false; - // @ts-ignore - nodes.forEach((node) => { - if (node.credentials) { - const allNodeCredentials = Object.entries(node.credentials); - for (const [type, creds] of allNodeCredentials) { - if (typeof creds === 'object') { - // @ts-ignore - const matchingCredentials = credentialsEntities.find( - // @ts-ignore - (credentials) => credentials.id === creds.id && credentials.type === type, - ); - if (matchingCredentials) { - node.credentials[type] = matchingCredentials.name; - } else { - // @ts-ignore - node.credentials[type] = creds.name; + await helpers.runChunked(workflowsQuery, (workflows) => { + workflows.forEach(async (workflow) => { + const nodes = workflow.nodes; + let credentialsUpdated = false; + // @ts-ignore + nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, creds] of allNodeCredentials) { + if (typeof creds === 'object') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.id === creds.id && credentials.type === type, + ); + if (matchingCredentials) { + node.credentials[type] = matchingCredentials.name; + } else { + // @ts-ignore + node.credentials[type] = creds.name; + } + credentialsUpdated = true; } - credentialsUpdated = true; } } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}workflow_entity + SET nodes = :nodes + WHERE id = '${workflow.id}' + `, + { nodes: JSON.stringify(nodes) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); } }); - if (credentialsUpdated) { - const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( - ` - UPDATE ${tablePrefix}workflow_entity - SET nodes = :nodes - WHERE id = '${workflow.id}' - `, - { nodes: JSON.stringify(nodes) }, - {}, - ); - - await queryRunner.query(updateQuery, updateParams); - } }); - const waitingExecutions = await queryRunner.query(` + const waitingExecutionsQuery = ` SELECT id, workflowData FROM ${tablePrefix}execution_entity WHERE waitTill IS NOT NULL AND finished = 0 - `); + `; + // @ts-ignore + await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => { + waitingExecutions.forEach(async (execution) => { + const data = execution.workflowData; + let credentialsUpdated = false; + // @ts-ignore + data.nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, creds] of allNodeCredentials) { + if (typeof creds === 'object') { + // @ts-ignore + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.id === creds.id && credentials.type === type, + ); + if (matchingCredentials) { + node.credentials[type] = matchingCredentials.name; + } else { + // @ts-ignore + node.credentials[type] = creds.name; + } + credentialsUpdated = true; + } + } + } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}execution_entity + SET workflowData = :data + WHERE id = '${execution.id}' + `, + { data: JSON.stringify(data) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); + } + }); + }); const retryableExecutions = await queryRunner.query(` SELECT id, workflowData @@ -171,8 +260,8 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac ORDER BY startedAt DESC LIMIT 200 `); - - [...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { + // @ts-ignore + retryableExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; // @ts-ignore @@ -200,15 +289,15 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac if (credentialsUpdated) { const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( ` - UPDATE ${tablePrefix}execution_entity - SET workflowData = :data - WHERE id = '${execution.id}' - `, + UPDATE ${tablePrefix}execution_entity + SET workflowData = :data + WHERE id = '${execution.id}' + `, { data: JSON.stringify(data) }, {}, ); - await queryRunner.query(updateQuery, updateParams); + queryRunner.query(updateQuery, updateParams); } }); } diff --git a/packages/cli/src/databases/postgresdb/migrations/1630419189837-UpdateWorkflowCredentials.ts b/packages/cli/src/databases/postgresdb/migrations/1630419189837-UpdateWorkflowCredentials.ts index 357d7c2974..ad3e44f0e6 100644 --- a/packages/cli/src/databases/postgresdb/migrations/1630419189837-UpdateWorkflowCredentials.ts +++ b/packages/cli/src/databases/postgresdb/migrations/1630419189837-UpdateWorkflowCredentials.ts @@ -1,5 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import config = require('../../../../config'); +import { MigrationHelpers } from '../../MigrationHelpers'; // replacing the credentials in workflows and execution // `nodeType: name` changes to `nodeType: { id, name }` @@ -8,62 +9,104 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac name = 'UpdateWorkflowCredentials1630419189837'; public async up(queryRunner: QueryRunner): Promise { + console.log('Start migration', this.name); + console.time(this.name); let tablePrefix = config.get('database.tablePrefix'); const schema = config.get('database.postgresdb.schema'); if (schema) { tablePrefix = schema + '.' + tablePrefix; } + const helpers = new MigrationHelpers(queryRunner); const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM ${tablePrefix}credentials_entity `); - const workflows = await queryRunner.query(` + const workflowsQuery = ` SELECT id, nodes FROM ${tablePrefix}workflow_entity - `); + `; // @ts-ignore - workflows.forEach(async (workflow) => { - const nodes = workflow.nodes; - let credentialsUpdated = false; - // @ts-ignore - nodes.forEach((node) => { - if (node.credentials) { - const allNodeCredentials = Object.entries(node.credentials); - for (const [type, name] of allNodeCredentials) { - if (typeof name === 'string') { - // @ts-ignore - const matchingCredentials = credentialsEntities.find( - // @ts-ignore - (credentials) => credentials.name === name && credentials.type === type, - ); - node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; - credentialsUpdated = true; + await helpers.runChunked(workflowsQuery, (workflows) => { + workflows.forEach(async (workflow) => { + const nodes = workflow.nodes; + let credentialsUpdated = false; + // @ts-ignore + nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, name] of allNodeCredentials) { + if (typeof name === 'string') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.name === name && credentials.type === type, + ); + node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; + credentialsUpdated = true; + } } } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}workflow_entity + SET nodes = :nodes + WHERE id = '${workflow.id}' + `, + { nodes: JSON.stringify(nodes) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); } }); - if (credentialsUpdated) { - const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( - ` - UPDATE ${tablePrefix}workflow_entity - SET nodes = :nodes - WHERE id = '${workflow.id}' - `, - { nodes: JSON.stringify(nodes) }, - {}, - ); - - await queryRunner.query(updateQuery, updateParams); - } }); - const waitingExecutions = await queryRunner.query(` + const waitingExecutionsQuery = ` SELECT id, "workflowData" FROM ${tablePrefix}execution_entity WHERE "waitTill" IS NOT NULL AND finished = FALSE - `); + `; + // @ts-ignore + await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => { + waitingExecutions.forEach(async (execution) => { + const data = execution.workflowData; + let credentialsUpdated = false; + // @ts-ignore + data.nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, name] of allNodeCredentials) { + if (typeof name === 'string') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.name === name && credentials.type === type, + ); + node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; + credentialsUpdated = true; + } + } + } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}execution_entity + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, + { data: JSON.stringify(data) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); + } + }); + }); const retryableExecutions = await queryRunner.query(` SELECT id, "workflowData" @@ -73,7 +116,8 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac LIMIT 200 `); - [...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { + // @ts-ignore + retryableExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; // @ts-ignore @@ -104,9 +148,10 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac {}, ); - await queryRunner.query(updateQuery, updateParams); + queryRunner.query(updateQuery, updateParams); } }); + console.timeEnd(this.name); } public async down(queryRunner: QueryRunner): Promise { @@ -115,62 +160,109 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac if (schema) { tablePrefix = schema + '.' + tablePrefix; } + const helpers = new MigrationHelpers(queryRunner); const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM ${tablePrefix}credentials_entity `); - const workflows = await queryRunner.query(` + const workflowsQuery = ` SELECT id, nodes FROM ${tablePrefix}workflow_entity - `); + `; // @ts-ignore - workflows.forEach(async (workflow) => { - const nodes = workflow.nodes; - let credentialsUpdated = false; - // @ts-ignore - nodes.forEach((node) => { - if (node.credentials) { - const allNodeCredentials = Object.entries(node.credentials); - for (const [type, creds] of allNodeCredentials) { - if (typeof creds === 'object') { - // @ts-ignore - const matchingCredentials = credentialsEntities.find( + await helpers.runChunked(workflowsQuery, (workflows) => { + workflows.forEach(async (workflow) => { + const nodes = workflow.nodes; + let credentialsUpdated = false; + // @ts-ignore + nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, creds] of allNodeCredentials) { + if (typeof creds === 'object') { // @ts-ignore - (credentials) => credentials.id === creds.id && credentials.type === type, - ); - if (matchingCredentials) { - node.credentials[type] = matchingCredentials.name; - } else { - // @ts-ignore - node.credentials[type] = creds.name; + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.id === creds.id && credentials.type === type, + ); + if (matchingCredentials) { + node.credentials[type] = matchingCredentials.name; + } else { + // @ts-ignore + node.credentials[type] = creds.name; + } + credentialsUpdated = true; } - credentialsUpdated = true; } } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}workflow_entity + SET nodes = :nodes + WHERE id = '${workflow.id}' + `, + { nodes: JSON.stringify(nodes) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); } }); - if (credentialsUpdated) { - const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( - ` - UPDATE ${tablePrefix}workflow_entity - SET nodes = :nodes - WHERE id = '${workflow.id}' - `, - { nodes: JSON.stringify(nodes) }, - {}, - ); - - await queryRunner.query(updateQuery, updateParams); - } }); - const waitingExecutions = await queryRunner.query(` + const waitingExecutionsQuery = ` SELECT id, "workflowData" FROM ${tablePrefix}execution_entity WHERE "waitTill" IS NOT NULL AND finished = FALSE - `); + `; + // @ts-ignore + await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => { + waitingExecutions.forEach(async (execution) => { + const data = execution.workflowData; + let credentialsUpdated = false; + // @ts-ignore + data.nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, creds] of allNodeCredentials) { + if (typeof creds === 'object') { + // @ts-ignore + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.id === creds.id && credentials.type === type, + ); + if (matchingCredentials) { + node.credentials[type] = matchingCredentials.name; + } else { + // @ts-ignore + node.credentials[type] = creds.name; + } + credentialsUpdated = true; + } + } + } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE ${tablePrefix}execution_entity + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, + { data: JSON.stringify(data) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); + } + }); + }); const retryableExecutions = await queryRunner.query(` SELECT id, "workflowData" @@ -179,8 +271,8 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac ORDER BY "startedAt" DESC LIMIT 200 `); - - [...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { + // @ts-ignore + retryableExecutions.forEach(async (execution) => { const data = execution.workflowData; let credentialsUpdated = false; // @ts-ignore @@ -208,15 +300,15 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac if (credentialsUpdated) { const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( ` - UPDATE ${tablePrefix}execution_entity - SET "workflowData" = :data - WHERE id = '${execution.id}' - `, + UPDATE ${tablePrefix}execution_entity + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, { data: JSON.stringify(data) }, {}, ); - await queryRunner.query(updateQuery, updateParams); + queryRunner.query(updateQuery, updateParams); } }); } diff --git a/packages/cli/src/databases/sqlite/migrations/1630330987096-UpdateWorkflowCredentials.ts b/packages/cli/src/databases/sqlite/migrations/1630330987096-UpdateWorkflowCredentials.ts index f2a6f0a19a..147e5e49b2 100644 --- a/packages/cli/src/databases/sqlite/migrations/1630330987096-UpdateWorkflowCredentials.ts +++ b/packages/cli/src/databases/sqlite/migrations/1630330987096-UpdateWorkflowCredentials.ts @@ -1,5 +1,6 @@ import { MigrationInterface, QueryRunner } from 'typeorm'; import config = require('../../../../config'); +import { MigrationHelpers } from '../../MigrationHelpers'; // replacing the credentials in workflows and execution // `nodeType: name` changes to `nodeType: { id, name }` @@ -8,58 +9,101 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac name = 'UpdateWorkflowCredentials1630330987096'; public async up(queryRunner: QueryRunner): Promise { + console.log('Start migration', this.name); + console.time(this.name); const tablePrefix = config.get('database.tablePrefix'); + const helpers = new MigrationHelpers(queryRunner); const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM "${tablePrefix}credentials_entity" `); - const workflows = await queryRunner.query(` + const workflowsQuery = ` SELECT id, nodes FROM "${tablePrefix}workflow_entity" - `); + `; + // @ts-ignore - workflows.forEach(async (workflow) => { - const nodes = JSON.parse(workflow.nodes); - let credentialsUpdated = false; - // @ts-ignore - nodes.forEach((node) => { - if (node.credentials) { - const allNodeCredentials = Object.entries(node.credentials); - for (const [type, name] of allNodeCredentials) { - if (typeof name === 'string') { - // @ts-ignore - const matchingCredentials = credentialsEntities.find( - // @ts-ignore - (credentials) => credentials.name === name && credentials.type === type, - ); - node.credentials[type] = { id: matchingCredentials?.id || null, name }; - credentialsUpdated = true; + await helpers.runChunked(workflowsQuery, (workflows) => { + workflows.forEach(async (workflow) => { + const nodes = JSON.parse(workflow.nodes); + let credentialsUpdated = false; + // @ts-ignore + nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, name] of allNodeCredentials) { + if (typeof name === 'string') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.name === name && credentials.type === type, + ); + node.credentials[type] = { id: matchingCredentials?.id || null, name }; + credentialsUpdated = true; + } } } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE "${tablePrefix}workflow_entity" + SET nodes = :nodes + WHERE id = '${workflow.id}' + `, + { nodes: JSON.stringify(nodes) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); } }); - if (credentialsUpdated) { - const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( - ` - UPDATE "${tablePrefix}workflow_entity" - SET nodes = :nodes - WHERE id = '${workflow.id}' - `, - { nodes: JSON.stringify(nodes) }, - {}, - ); - - await queryRunner.query(updateQuery, updateParams); - } }); - const waitingExecutions = await queryRunner.query(` + const waitingExecutionsQuery = ` SELECT id, "workflowData" FROM "${tablePrefix}execution_entity" WHERE "waitTill" IS NOT NULL AND finished = 0 - `); + `; + // @ts-ignore + await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => { + waitingExecutions.forEach(async (execution) => { + const data = JSON.parse(execution.workflowData); + let credentialsUpdated = false; + // @ts-ignore + data.nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, name] of allNodeCredentials) { + if (typeof name === 'string') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.name === name && credentials.type === type, + ); + node.credentials[type] = { id: matchingCredentials?.id || null, name }; + credentialsUpdated = true; + } + } + } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE "${tablePrefix}execution_entity" + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, + { data: JSON.stringify(data) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); + } + }); + }); const retryableExecutions = await queryRunner.query(` SELECT id, "workflowData" @@ -68,8 +112,8 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac ORDER BY "startedAt" DESC LIMIT 200 `); - - [...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { + // @ts-ignore + retryableExecutions.forEach(async (execution) => { const data = JSON.parse(execution.workflowData); let credentialsUpdated = false; // @ts-ignore @@ -78,7 +122,6 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac const allNodeCredentials = Object.entries(node.credentials); for (const [type, name] of allNodeCredentials) { if (typeof name === 'string') { - // @ts-ignore const matchingCredentials = credentialsEntities.find( // @ts-ignore (credentials) => credentials.name === name && credentials.type === type, @@ -92,77 +135,127 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac if (credentialsUpdated) { const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( ` - UPDATE "${tablePrefix}execution_entity" - SET "workflowData" = :data - WHERE id = '${execution.id}' - `, + UPDATE "${tablePrefix}execution_entity" + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, { data: JSON.stringify(data) }, {}, ); - await queryRunner.query(updateQuery, updateParams); + queryRunner.query(updateQuery, updateParams); } }); + console.timeEnd(this.name); } public async down(queryRunner: QueryRunner): Promise { const tablePrefix = config.get('database.tablePrefix'); + const helpers = new MigrationHelpers(queryRunner); const credentialsEntities = await queryRunner.query(` SELECT id, name, type FROM "${tablePrefix}credentials_entity" `); - const workflows = await queryRunner.query(` + const workflowsQuery = ` SELECT id, nodes FROM "${tablePrefix}workflow_entity" - `); + `; + // @ts-ignore - workflows.forEach(async (workflow) => { - const nodes = JSON.parse(workflow.nodes); - let credentialsUpdated = false; + await helpers.runChunked(workflowsQuery, (workflows) => { // @ts-ignore - nodes.forEach((node) => { - if (node.credentials) { - const allNodeCredentials = Object.entries(node.credentials); - for (const [type, creds] of allNodeCredentials) { - if (typeof creds === 'object') { - // @ts-ignore - const matchingCredentials = credentialsEntities.find( - // @ts-ignore - (credentials) => credentials.id === creds.id && credentials.type === type, - ); - if (matchingCredentials) { - node.credentials[type] = matchingCredentials.name; - } else { - // @ts-ignore - node.credentials[type] = creds.name; + workflows.forEach(async (workflow) => { + const nodes = JSON.parse(workflow.nodes); + let credentialsUpdated = false; + // @ts-ignore + nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, creds] of allNodeCredentials) { + if (typeof creds === 'object') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.id === creds.id && credentials.type === type, + ); + if (matchingCredentials) { + node.credentials[type] = matchingCredentials.name; + } else { + // @ts-ignore + node.credentials[type] = creds.name; + } + credentialsUpdated = true; } - credentialsUpdated = true; } } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE "${tablePrefix}workflow_entity" + SET nodes = :nodes + WHERE id = '${workflow.id}' + `, + { nodes: JSON.stringify(nodes) }, + {}, + ); + + queryRunner.query(updateQuery, updateParams); } }); - if (credentialsUpdated) { - const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( - ` - UPDATE "${tablePrefix}workflow_entity" - SET nodes = :nodes - WHERE id = '${workflow.id}' - `, - { nodes: JSON.stringify(nodes) }, - {}, - ); - - await queryRunner.query(updateQuery, updateParams); - } }); - const waitingExecutions = await queryRunner.query(` + const waitingExecutionsQuery = ` SELECT id, "workflowData" FROM "${tablePrefix}execution_entity" WHERE "waitTill" IS NOT NULL AND finished = 0 - `); + `; + + // @ts-ignore + await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => { + // @ts-ignore + waitingExecutions.forEach(async (execution) => { + const data = JSON.parse(execution.workflowData); + let credentialsUpdated = false; + // @ts-ignore + data.nodes.forEach((node) => { + if (node.credentials) { + const allNodeCredentials = Object.entries(node.credentials); + for (const [type, creds] of allNodeCredentials) { + if (typeof creds === 'object') { + const matchingCredentials = credentialsEntities.find( + // @ts-ignore + (credentials) => credentials.id === creds.id && credentials.type === type, + ); + if (matchingCredentials) { + node.credentials[type] = matchingCredentials.name; + } else { + // @ts-ignore + node.credentials[type] = creds.name; + } + credentialsUpdated = true; + } + } + } + }); + if (credentialsUpdated) { + const [updateQuery, updateParams] = + queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE "${tablePrefix}execution_entity" + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, + { data: JSON.stringify(data) }, + {}, + ); + + await queryRunner.query(updateQuery, updateParams); + } + }); + }); const retryableExecutions = await queryRunner.query(` SELECT id, "workflowData" @@ -172,7 +265,8 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac LIMIT 200 `); - [...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { + // @ts-ignore + retryableExecutions.forEach(async (execution) => { const data = JSON.parse(execution.workflowData); let credentialsUpdated = false; // @ts-ignore @@ -181,7 +275,6 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac const allNodeCredentials = Object.entries(node.credentials); for (const [type, creds] of allNodeCredentials) { if (typeof creds === 'object') { - // @ts-ignore const matchingCredentials = credentialsEntities.find( // @ts-ignore (credentials) => credentials.id === creds.id && credentials.type === type, @@ -200,15 +293,15 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac if (credentialsUpdated) { const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( ` - UPDATE "${tablePrefix}execution_entity" - SET "workflowData" = :data - WHERE id = '${execution.id}' - `, + UPDATE "${tablePrefix}execution_entity" + SET "workflowData" = :data + WHERE id = '${execution.id}' + `, { data: JSON.stringify(data) }, {}, ); - await queryRunner.query(updateQuery, updateParams); + queryRunner.query(updateQuery, updateParams); } }); }