Run migration in chunks (#2393)

This commit is contained in:
Ben Hesseldieck 2021-11-03 16:12:48 +01:00 committed by GitHub
parent 4f9aee14b5
commit 0877f485d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 560 additions and 247 deletions

View file

@ -0,0 +1,39 @@
import { QueryRunner } from 'typeorm';
export class MigrationHelpers {
queryRunner: QueryRunner;
constructor(queryRunner: QueryRunner) {
this.queryRunner = queryRunner;
}
// runs an operation sequential on chunks of a query that returns a potentially large Array.
/* eslint-disable no-await-in-loop */
async runChunked(
query: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
operation: (results: any[]) => Promise<void>,
limit = 100,
): Promise<void> {
let offset = 0;
let chunkedQuery: string;
let chunkedQueryResults: unknown[];
do {
chunkedQuery = this.chunkQuery(query, limit, offset);
chunkedQueryResults = (await this.queryRunner.query(chunkedQuery)) as unknown[];
// pass a copy to prevent errors from mutation
await operation([...chunkedQueryResults]);
offset += limit;
} while (chunkedQueryResults.length === limit);
}
/* eslint-enable no-await-in-loop */
private chunkQuery(query: string, limit: number, offset = 0): string {
return `
${query}
LIMIT ${limit}
OFFSET ${offset}
`;
}
}

View file

@ -1,5 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm'; import { MigrationInterface, QueryRunner } from 'typeorm';
import config = require('../../../../config'); import config = require('../../../../config');
import { MigrationHelpers } from '../../MigrationHelpers';
// replacing the credentials in workflows and execution // replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }` // `nodeType: name` changes to `nodeType: { id, name }`
@ -8,58 +9,100 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
name = 'UpdateWorkflowCredentials1630451444017'; name = 'UpdateWorkflowCredentials1630451444017';
public async up(queryRunner: QueryRunner): Promise<void> { public async up(queryRunner: QueryRunner): Promise<void> {
console.log('Start migration', this.name);
console.time(this.name);
const tablePrefix = config.get('database.tablePrefix'); const tablePrefix = config.get('database.tablePrefix');
const helpers = new MigrationHelpers(queryRunner);
const credentialsEntities = await queryRunner.query(` const credentialsEntities = await queryRunner.query(`
SELECT id, name, type SELECT id, name, type
FROM ${tablePrefix}credentials_entity FROM ${tablePrefix}credentials_entity
`); `);
const workflows = await queryRunner.query(` const workflowsQuery = `
SELECT id, nodes SELECT id, nodes
FROM ${tablePrefix}workflow_entity FROM ${tablePrefix}workflow_entity
`); `;
// @ts-ignore // @ts-ignore
workflows.forEach(async (workflow) => { await helpers.runChunked(workflowsQuery, (workflows) => {
const nodes = workflow.nodes; workflows.forEach(async (workflow) => {
let credentialsUpdated = false; const nodes = workflow.nodes;
// @ts-ignore let credentialsUpdated = false;
nodes.forEach((node) => { // @ts-ignore
if (node.credentials) { nodes.forEach((node) => {
const allNodeCredentials = Object.entries(node.credentials); if (node.credentials) {
for (const [type, name] of allNodeCredentials) { const allNodeCredentials = Object.entries(node.credentials);
if (typeof name === 'string') { for (const [type, name] of allNodeCredentials) {
// @ts-ignore if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.name === name && credentials.type === type, (credentials) => credentials.name === name && credentials.type === type,
); );
node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name };
credentialsUpdated = true; credentialsUpdated = true;
}
} }
} }
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
} }
}); });
if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
}); });
const waitingExecutions = await queryRunner.query(` const waitingExecutionsQuery = `
SELECT id, workflowData SELECT id, workflowData
FROM ${tablePrefix}execution_entity FROM ${tablePrefix}execution_entity
WHERE waitTill IS NOT NULL AND finished = 0 WHERE waitTill IS NOT NULL AND finished = 0
`); `;
// @ts-ignore
await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
// @ts-ignore
data.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find(
// @ts-ignore
(credentials) => credentials.name === name && credentials.type === type,
);
node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name };
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}execution_entity
SET workflowData = :data
WHERE id = '${execution.id}'
`,
{ data: JSON.stringify(data) },
{},
);
queryRunner.query(updateQuery, updateParams);
}
});
});
const retryableExecutions = await queryRunner.query(` const retryableExecutions = await queryRunner.query(`
SELECT id, workflowData SELECT id, workflowData
@ -68,8 +111,8 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
ORDER BY startedAt DESC ORDER BY startedAt DESC
LIMIT 200 LIMIT 200
`); `);
// @ts-ignore
[...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { retryableExecutions.forEach(async (execution) => {
const data = execution.workflowData; const data = execution.workflowData;
let credentialsUpdated = false; let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
@ -78,7 +121,6 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
const allNodeCredentials = Object.entries(node.credentials); const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) { for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') { if (typeof name === 'string') {
// @ts-ignore
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.name === name && credentials.type === type, (credentials) => credentials.name === name && credentials.type === type,
@ -92,77 +134,124 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
if (credentialsUpdated) { if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
` `
UPDATE ${tablePrefix}execution_entity UPDATE ${tablePrefix}execution_entity
SET workflowData = :data SET workflowData = :data
WHERE id = '${execution.id}' WHERE id = '${execution.id}'
`, `,
{ data: JSON.stringify(data) }, { data: JSON.stringify(data) },
{}, {},
); );
await queryRunner.query(updateQuery, updateParams); queryRunner.query(updateQuery, updateParams);
} }
}); });
console.timeEnd(this.name);
} }
public async down(queryRunner: QueryRunner): Promise<void> { public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix'); const tablePrefix = config.get('database.tablePrefix');
const helpers = new MigrationHelpers(queryRunner);
const credentialsEntities = await queryRunner.query(` const credentialsEntities = await queryRunner.query(`
SELECT id, name, type SELECT id, name, type
FROM ${tablePrefix}credentials_entity FROM ${tablePrefix}credentials_entity
`); `);
const workflows = await queryRunner.query(` const workflowsQuery = `
SELECT id, nodes SELECT id, nodes
FROM ${tablePrefix}workflow_entity FROM ${tablePrefix}workflow_entity
`); `;
// @ts-ignore // @ts-ignore
workflows.forEach(async (workflow) => { await helpers.runChunked(workflowsQuery, (workflows) => {
const nodes = workflow.nodes; workflows.forEach(async (workflow) => {
let credentialsUpdated = false; const nodes = workflow.nodes;
// @ts-ignore let credentialsUpdated = false;
nodes.forEach((node) => { // @ts-ignore
if (node.credentials) { nodes.forEach((node) => {
const allNodeCredentials = Object.entries(node.credentials); if (node.credentials) {
for (const [type, creds] of allNodeCredentials) { const allNodeCredentials = Object.entries(node.credentials);
if (typeof creds === 'object') { for (const [type, creds] of allNodeCredentials) {
// @ts-ignore if (typeof creds === 'object') {
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.id === creds.id && credentials.type === type, (credentials) => credentials.id === creds.id && credentials.type === type,
); );
if (matchingCredentials) { if (matchingCredentials) {
node.credentials[type] = matchingCredentials.name; node.credentials[type] = matchingCredentials.name;
} else { } else {
// @ts-ignore // @ts-ignore
node.credentials[type] = creds.name; node.credentials[type] = creds.name;
}
credentialsUpdated = true;
} }
credentialsUpdated = true;
} }
} }
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
} }
}); });
if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
}); });
const waitingExecutions = await queryRunner.query(` const waitingExecutionsQuery = `
SELECT id, workflowData SELECT id, workflowData
FROM ${tablePrefix}execution_entity FROM ${tablePrefix}execution_entity
WHERE waitTill IS NOT NULL AND finished = 0 WHERE waitTill IS NOT NULL AND finished = 0
`); `;
// @ts-ignore
await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
// @ts-ignore
data.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') {
// @ts-ignore
const matchingCredentials = credentialsEntities.find(
// @ts-ignore
(credentials) => credentials.id === creds.id && credentials.type === type,
);
if (matchingCredentials) {
node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}execution_entity
SET workflowData = :data
WHERE id = '${execution.id}'
`,
{ data: JSON.stringify(data) },
{},
);
queryRunner.query(updateQuery, updateParams);
}
});
});
const retryableExecutions = await queryRunner.query(` const retryableExecutions = await queryRunner.query(`
SELECT id, workflowData SELECT id, workflowData
@ -171,8 +260,8 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
ORDER BY startedAt DESC ORDER BY startedAt DESC
LIMIT 200 LIMIT 200
`); `);
// @ts-ignore
[...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { retryableExecutions.forEach(async (execution) => {
const data = execution.workflowData; const data = execution.workflowData;
let credentialsUpdated = false; let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
@ -200,15 +289,15 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
if (credentialsUpdated) { if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
` `
UPDATE ${tablePrefix}execution_entity UPDATE ${tablePrefix}execution_entity
SET workflowData = :data SET workflowData = :data
WHERE id = '${execution.id}' WHERE id = '${execution.id}'
`, `,
{ data: JSON.stringify(data) }, { data: JSON.stringify(data) },
{}, {},
); );
await queryRunner.query(updateQuery, updateParams); queryRunner.query(updateQuery, updateParams);
} }
}); });
} }

View file

@ -1,5 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm'; import { MigrationInterface, QueryRunner } from 'typeorm';
import config = require('../../../../config'); import config = require('../../../../config');
import { MigrationHelpers } from '../../MigrationHelpers';
// replacing the credentials in workflows and execution // replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }` // `nodeType: name` changes to `nodeType: { id, name }`
@ -8,62 +9,104 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
name = 'UpdateWorkflowCredentials1630419189837'; name = 'UpdateWorkflowCredentials1630419189837';
public async up(queryRunner: QueryRunner): Promise<void> { public async up(queryRunner: QueryRunner): Promise<void> {
console.log('Start migration', this.name);
console.time(this.name);
let tablePrefix = config.get('database.tablePrefix'); let tablePrefix = config.get('database.tablePrefix');
const schema = config.get('database.postgresdb.schema'); const schema = config.get('database.postgresdb.schema');
if (schema) { if (schema) {
tablePrefix = schema + '.' + tablePrefix; tablePrefix = schema + '.' + tablePrefix;
} }
const helpers = new MigrationHelpers(queryRunner);
const credentialsEntities = await queryRunner.query(` const credentialsEntities = await queryRunner.query(`
SELECT id, name, type SELECT id, name, type
FROM ${tablePrefix}credentials_entity FROM ${tablePrefix}credentials_entity
`); `);
const workflows = await queryRunner.query(` const workflowsQuery = `
SELECT id, nodes SELECT id, nodes
FROM ${tablePrefix}workflow_entity FROM ${tablePrefix}workflow_entity
`); `;
// @ts-ignore // @ts-ignore
workflows.forEach(async (workflow) => { await helpers.runChunked(workflowsQuery, (workflows) => {
const nodes = workflow.nodes; workflows.forEach(async (workflow) => {
let credentialsUpdated = false; const nodes = workflow.nodes;
// @ts-ignore let credentialsUpdated = false;
nodes.forEach((node) => { // @ts-ignore
if (node.credentials) { nodes.forEach((node) => {
const allNodeCredentials = Object.entries(node.credentials); if (node.credentials) {
for (const [type, name] of allNodeCredentials) { const allNodeCredentials = Object.entries(node.credentials);
if (typeof name === 'string') { for (const [type, name] of allNodeCredentials) {
// @ts-ignore if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.name === name && credentials.type === type, (credentials) => credentials.name === name && credentials.type === type,
); );
node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name }; node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name };
credentialsUpdated = true; credentialsUpdated = true;
}
} }
} }
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
} }
}); });
if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
}); });
const waitingExecutions = await queryRunner.query(` const waitingExecutionsQuery = `
SELECT id, "workflowData" SELECT id, "workflowData"
FROM ${tablePrefix}execution_entity FROM ${tablePrefix}execution_entity
WHERE "waitTill" IS NOT NULL AND finished = FALSE WHERE "waitTill" IS NOT NULL AND finished = FALSE
`); `;
// @ts-ignore
await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
// @ts-ignore
data.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find(
// @ts-ignore
(credentials) => credentials.name === name && credentials.type === type,
);
node.credentials[type] = { id: matchingCredentials?.id.toString() || null, name };
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}execution_entity
SET "workflowData" = :data
WHERE id = '${execution.id}'
`,
{ data: JSON.stringify(data) },
{},
);
queryRunner.query(updateQuery, updateParams);
}
});
});
const retryableExecutions = await queryRunner.query(` const retryableExecutions = await queryRunner.query(`
SELECT id, "workflowData" SELECT id, "workflowData"
@ -73,7 +116,8 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
LIMIT 200 LIMIT 200
`); `);
[...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { // @ts-ignore
retryableExecutions.forEach(async (execution) => {
const data = execution.workflowData; const data = execution.workflowData;
let credentialsUpdated = false; let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
@ -104,9 +148,10 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
{}, {},
); );
await queryRunner.query(updateQuery, updateParams); queryRunner.query(updateQuery, updateParams);
} }
}); });
console.timeEnd(this.name);
} }
public async down(queryRunner: QueryRunner): Promise<void> { public async down(queryRunner: QueryRunner): Promise<void> {
@ -115,62 +160,109 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
if (schema) { if (schema) {
tablePrefix = schema + '.' + tablePrefix; tablePrefix = schema + '.' + tablePrefix;
} }
const helpers = new MigrationHelpers(queryRunner);
const credentialsEntities = await queryRunner.query(` const credentialsEntities = await queryRunner.query(`
SELECT id, name, type SELECT id, name, type
FROM ${tablePrefix}credentials_entity FROM ${tablePrefix}credentials_entity
`); `);
const workflows = await queryRunner.query(` const workflowsQuery = `
SELECT id, nodes SELECT id, nodes
FROM ${tablePrefix}workflow_entity FROM ${tablePrefix}workflow_entity
`); `;
// @ts-ignore // @ts-ignore
workflows.forEach(async (workflow) => { await helpers.runChunked(workflowsQuery, (workflows) => {
const nodes = workflow.nodes; workflows.forEach(async (workflow) => {
let credentialsUpdated = false; const nodes = workflow.nodes;
// @ts-ignore let credentialsUpdated = false;
nodes.forEach((node) => { // @ts-ignore
if (node.credentials) { nodes.forEach((node) => {
const allNodeCredentials = Object.entries(node.credentials); if (node.credentials) {
for (const [type, creds] of allNodeCredentials) { const allNodeCredentials = Object.entries(node.credentials);
if (typeof creds === 'object') { for (const [type, creds] of allNodeCredentials) {
// @ts-ignore if (typeof creds === 'object') {
const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.id === creds.id && credentials.type === type, const matchingCredentials = credentialsEntities.find(
); // @ts-ignore
if (matchingCredentials) { (credentials) => credentials.id === creds.id && credentials.type === type,
node.credentials[type] = matchingCredentials.name; );
} else { if (matchingCredentials) {
// @ts-ignore node.credentials[type] = matchingCredentials.name;
node.credentials[type] = creds.name; } else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
} }
credentialsUpdated = true;
} }
} }
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
} }
}); });
if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
}); });
const waitingExecutions = await queryRunner.query(` const waitingExecutionsQuery = `
SELECT id, "workflowData" SELECT id, "workflowData"
FROM ${tablePrefix}execution_entity FROM ${tablePrefix}execution_entity
WHERE "waitTill" IS NOT NULL AND finished = FALSE WHERE "waitTill" IS NOT NULL AND finished = FALSE
`); `;
// @ts-ignore
await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
// @ts-ignore
data.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') {
// @ts-ignore
const matchingCredentials = credentialsEntities.find(
// @ts-ignore
(credentials) => credentials.id === creds.id && credentials.type === type,
);
if (matchingCredentials) {
node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}execution_entity
SET "workflowData" = :data
WHERE id = '${execution.id}'
`,
{ data: JSON.stringify(data) },
{},
);
queryRunner.query(updateQuery, updateParams);
}
});
});
const retryableExecutions = await queryRunner.query(` const retryableExecutions = await queryRunner.query(`
SELECT id, "workflowData" SELECT id, "workflowData"
@ -179,8 +271,8 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
ORDER BY "startedAt" DESC ORDER BY "startedAt" DESC
LIMIT 200 LIMIT 200
`); `);
// @ts-ignore
[...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { retryableExecutions.forEach(async (execution) => {
const data = execution.workflowData; const data = execution.workflowData;
let credentialsUpdated = false; let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
@ -208,15 +300,15 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
if (credentialsUpdated) { if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
` `
UPDATE ${tablePrefix}execution_entity UPDATE ${tablePrefix}execution_entity
SET "workflowData" = :data SET "workflowData" = :data
WHERE id = '${execution.id}' WHERE id = '${execution.id}'
`, `,
{ data: JSON.stringify(data) }, { data: JSON.stringify(data) },
{}, {},
); );
await queryRunner.query(updateQuery, updateParams); queryRunner.query(updateQuery, updateParams);
} }
}); });
} }

View file

@ -1,5 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm'; import { MigrationInterface, QueryRunner } from 'typeorm';
import config = require('../../../../config'); import config = require('../../../../config');
import { MigrationHelpers } from '../../MigrationHelpers';
// replacing the credentials in workflows and execution // replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }` // `nodeType: name` changes to `nodeType: { id, name }`
@ -8,58 +9,101 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
name = 'UpdateWorkflowCredentials1630330987096'; name = 'UpdateWorkflowCredentials1630330987096';
public async up(queryRunner: QueryRunner): Promise<void> { public async up(queryRunner: QueryRunner): Promise<void> {
console.log('Start migration', this.name);
console.time(this.name);
const tablePrefix = config.get('database.tablePrefix'); const tablePrefix = config.get('database.tablePrefix');
const helpers = new MigrationHelpers(queryRunner);
const credentialsEntities = await queryRunner.query(` const credentialsEntities = await queryRunner.query(`
SELECT id, name, type SELECT id, name, type
FROM "${tablePrefix}credentials_entity" FROM "${tablePrefix}credentials_entity"
`); `);
const workflows = await queryRunner.query(` const workflowsQuery = `
SELECT id, nodes SELECT id, nodes
FROM "${tablePrefix}workflow_entity" FROM "${tablePrefix}workflow_entity"
`); `;
// @ts-ignore // @ts-ignore
workflows.forEach(async (workflow) => { await helpers.runChunked(workflowsQuery, (workflows) => {
const nodes = JSON.parse(workflow.nodes); workflows.forEach(async (workflow) => {
let credentialsUpdated = false; const nodes = JSON.parse(workflow.nodes);
// @ts-ignore let credentialsUpdated = false;
nodes.forEach((node) => { // @ts-ignore
if (node.credentials) { nodes.forEach((node) => {
const allNodeCredentials = Object.entries(node.credentials); if (node.credentials) {
for (const [type, name] of allNodeCredentials) { const allNodeCredentials = Object.entries(node.credentials);
if (typeof name === 'string') { for (const [type, name] of allNodeCredentials) {
// @ts-ignore if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.name === name && credentials.type === type, (credentials) => credentials.name === name && credentials.type === type,
); );
node.credentials[type] = { id: matchingCredentials?.id || null, name }; node.credentials[type] = { id: matchingCredentials?.id || null, name };
credentialsUpdated = true; credentialsUpdated = true;
}
} }
} }
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
} }
}); });
if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
}); });
const waitingExecutions = await queryRunner.query(` const waitingExecutionsQuery = `
SELECT id, "workflowData" SELECT id, "workflowData"
FROM "${tablePrefix}execution_entity" FROM "${tablePrefix}execution_entity"
WHERE "waitTill" IS NOT NULL AND finished = 0 WHERE "waitTill" IS NOT NULL AND finished = 0
`); `;
// @ts-ignore
await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = JSON.parse(execution.workflowData);
let credentialsUpdated = false;
// @ts-ignore
data.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') {
const matchingCredentials = credentialsEntities.find(
// @ts-ignore
(credentials) => credentials.name === name && credentials.type === type,
);
node.credentials[type] = { id: matchingCredentials?.id || null, name };
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}execution_entity"
SET "workflowData" = :data
WHERE id = '${execution.id}'
`,
{ data: JSON.stringify(data) },
{},
);
queryRunner.query(updateQuery, updateParams);
}
});
});
const retryableExecutions = await queryRunner.query(` const retryableExecutions = await queryRunner.query(`
SELECT id, "workflowData" SELECT id, "workflowData"
@ -68,8 +112,8 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
ORDER BY "startedAt" DESC ORDER BY "startedAt" DESC
LIMIT 200 LIMIT 200
`); `);
// @ts-ignore
[...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { retryableExecutions.forEach(async (execution) => {
const data = JSON.parse(execution.workflowData); const data = JSON.parse(execution.workflowData);
let credentialsUpdated = false; let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
@ -78,7 +122,6 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
const allNodeCredentials = Object.entries(node.credentials); const allNodeCredentials = Object.entries(node.credentials);
for (const [type, name] of allNodeCredentials) { for (const [type, name] of allNodeCredentials) {
if (typeof name === 'string') { if (typeof name === 'string') {
// @ts-ignore
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.name === name && credentials.type === type, (credentials) => credentials.name === name && credentials.type === type,
@ -92,77 +135,127 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
if (credentialsUpdated) { if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
` `
UPDATE "${tablePrefix}execution_entity" UPDATE "${tablePrefix}execution_entity"
SET "workflowData" = :data SET "workflowData" = :data
WHERE id = '${execution.id}' WHERE id = '${execution.id}'
`, `,
{ data: JSON.stringify(data) }, { data: JSON.stringify(data) },
{}, {},
); );
await queryRunner.query(updateQuery, updateParams); queryRunner.query(updateQuery, updateParams);
} }
}); });
console.timeEnd(this.name);
} }
public async down(queryRunner: QueryRunner): Promise<void> { public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.get('database.tablePrefix'); const tablePrefix = config.get('database.tablePrefix');
const helpers = new MigrationHelpers(queryRunner);
const credentialsEntities = await queryRunner.query(` const credentialsEntities = await queryRunner.query(`
SELECT id, name, type SELECT id, name, type
FROM "${tablePrefix}credentials_entity" FROM "${tablePrefix}credentials_entity"
`); `);
const workflows = await queryRunner.query(` const workflowsQuery = `
SELECT id, nodes SELECT id, nodes
FROM "${tablePrefix}workflow_entity" FROM "${tablePrefix}workflow_entity"
`); `;
// @ts-ignore // @ts-ignore
workflows.forEach(async (workflow) => { await helpers.runChunked(workflowsQuery, (workflows) => {
const nodes = JSON.parse(workflow.nodes);
let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
nodes.forEach((node) => { workflows.forEach(async (workflow) => {
if (node.credentials) { const nodes = JSON.parse(workflow.nodes);
const allNodeCredentials = Object.entries(node.credentials); let credentialsUpdated = false;
for (const [type, creds] of allNodeCredentials) { // @ts-ignore
if (typeof creds === 'object') { nodes.forEach((node) => {
// @ts-ignore if (node.credentials) {
const matchingCredentials = credentialsEntities.find( const allNodeCredentials = Object.entries(node.credentials);
// @ts-ignore for (const [type, creds] of allNodeCredentials) {
(credentials) => credentials.id === creds.id && credentials.type === type, if (typeof creds === 'object') {
); const matchingCredentials = credentialsEntities.find(
if (matchingCredentials) { // @ts-ignore
node.credentials[type] = matchingCredentials.name; (credentials) => credentials.id === creds.id && credentials.type === type,
} else { );
// @ts-ignore if (matchingCredentials) {
node.credentials[type] = creds.name; node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
} }
credentialsUpdated = true;
} }
} }
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
} }
}); });
if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
}); });
const waitingExecutions = await queryRunner.query(` const waitingExecutionsQuery = `
SELECT id, "workflowData" SELECT id, "workflowData"
FROM "${tablePrefix}execution_entity" FROM "${tablePrefix}execution_entity"
WHERE "waitTill" IS NOT NULL AND finished = 0 WHERE "waitTill" IS NOT NULL AND finished = 0
`); `;
// @ts-ignore
await helpers.runChunked(waitingExecutionsQuery, (waitingExecutions) => {
// @ts-ignore
waitingExecutions.forEach(async (execution) => {
const data = JSON.parse(execution.workflowData);
let credentialsUpdated = false;
// @ts-ignore
data.nodes.forEach((node) => {
if (node.credentials) {
const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') {
const matchingCredentials = credentialsEntities.find(
// @ts-ignore
(credentials) => credentials.id === creds.id && credentials.type === type,
);
if (matchingCredentials) {
node.credentials[type] = matchingCredentials.name;
} else {
// @ts-ignore
node.credentials[type] = creds.name;
}
credentialsUpdated = true;
}
}
}
});
if (credentialsUpdated) {
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}execution_entity"
SET "workflowData" = :data
WHERE id = '${execution.id}'
`,
{ data: JSON.stringify(data) },
{},
);
await queryRunner.query(updateQuery, updateParams);
}
});
});
const retryableExecutions = await queryRunner.query(` const retryableExecutions = await queryRunner.query(`
SELECT id, "workflowData" SELECT id, "workflowData"
@ -172,7 +265,8 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
LIMIT 200 LIMIT 200
`); `);
[...waitingExecutions, ...retryableExecutions].forEach(async (execution) => { // @ts-ignore
retryableExecutions.forEach(async (execution) => {
const data = JSON.parse(execution.workflowData); const data = JSON.parse(execution.workflowData);
let credentialsUpdated = false; let credentialsUpdated = false;
// @ts-ignore // @ts-ignore
@ -181,7 +275,6 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
const allNodeCredentials = Object.entries(node.credentials); const allNodeCredentials = Object.entries(node.credentials);
for (const [type, creds] of allNodeCredentials) { for (const [type, creds] of allNodeCredentials) {
if (typeof creds === 'object') { if (typeof creds === 'object') {
// @ts-ignore
const matchingCredentials = credentialsEntities.find( const matchingCredentials = credentialsEntities.find(
// @ts-ignore // @ts-ignore
(credentials) => credentials.id === creds.id && credentials.type === type, (credentials) => credentials.id === creds.id && credentials.type === type,
@ -200,15 +293,15 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
if (credentialsUpdated) { if (credentialsUpdated) {
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
` `
UPDATE "${tablePrefix}execution_entity" UPDATE "${tablePrefix}execution_entity"
SET "workflowData" = :data SET "workflowData" = :data
WHERE id = '${execution.id}' WHERE id = '${execution.id}'
`, `,
{ data: JSON.stringify(data) }, { data: JSON.stringify(data) },
{}, {},
); );
await queryRunner.query(updateQuery, updateParams); queryRunner.query(updateQuery, updateParams);
} }
}); });
} }