feat(core, editor): Support pairedItem for pinned data (#3843)

* 📘 Adjust interface

* Adjust pinData in state store

* Add utils

* Replace utils calls

* Adjust pinData intake and display

* 🔥 Remove excess BE fixes

* 📝 Update comment

* 🧪 Adjust tests

* 🔥 Remove unneeded helper

* 🚚 Improve naming

* 🧹 Clean up `ormconfig.ts`

* 📘 Add types and type guards

* Improve serializer for sqlite

* Create migration utils

* Set up sqlite serializer

* 🗃️ Write sqlite migration

* 🗃️ Write MySQL migration

* 🗃️ Write Postgres migration

* Add imports and exports to barrels

* 🚚 Rename `runChunked` to `runInBatches`

* Improve migration loggers

* ♻️ Address feedback

* 🚚 Improve naming
Iván Ovejero authored on 2022-08-22 17:46:22 +02:00, committed by GitHub
parent 6bd7a09a45
commit b1e715299d
24 changed files with 399 additions and 143 deletions
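
The core change is the one shown last below, in packages/workflow/src/Interfaces.ts: `IPinData` now stores pinned items as `INodeExecutionData[]` (each item wrapped in a `json` key) rather than bare `IDataObject[]`, so pinned data has the same shape as live execution output and can carry `pairedItem` metadata. A minimal before/after sketch (the `Spotify` node name is borrowed from the tests in this commit):

import type { IDataObject, INodeExecutionData } from 'n8n-workflow';

// Before: a pinned item was a bare data object, with no room for item metadata.
const oldPinData: { [nodeName: string]: IDataObject[] } = {
	Spotify: [{ myKey: 'myValue' }],
};

// After: each pinned item is wrapped in a `json` key, mirroring execution output.
const newPinData: { [nodeName: string]: INodeExecutionData[] } = {
	Spotify: [{ json: { myKey: 'myValue' } }],
};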

packages/cli/src/databases/entities/WorkflowEntity.ts View file

@@ -22,7 +22,7 @@ import * as config from '../../../config';
import { DatabaseType, IWorkflowDb } from '../..';
import { TagEntity } from './TagEntity';
import { SharedWorkflow } from './SharedWorkflow';
import { objectRetriever, serializer } from '../utils/transformers';
import { objectRetriever, sqlite } from '../utils/transformers';
function resolveDataType(dataType: string) {
const dbType = config.getEnv('database.type');
@@ -120,7 +120,7 @@ export class WorkflowEntity implements IWorkflowDb {
@Column({
type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json',
nullable: true,
transformer: serializer,
transformer: sqlite.jsonColumn,
})
pinData: IPinData;

packages/cli/src/databases/migrations/mysqldb/1630451444017-UpdateWorkflowCredentials.ts View file

@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
// replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }`
@@ -22,7 +22,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
@@ -65,7 +65,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
WHERE waitTill IS NOT NULL AND finished = 0
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
@@ -158,7 +158,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
FROM ${tablePrefix}workflow_entity
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
@@ -206,7 +206,7 @@ export class UpdateWorkflowCredentials1630451444017 implements MigrationInterfac
WHERE waitTill IS NOT NULL AND finished = 0
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;

packages/cli/src/databases/migrations/mysqldb/1658932910559-AddNodeIds.ts View file

@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
import { v4 as uuid } from 'uuid';
// add node ids in workflow objects
@@ -17,7 +17,7 @@ export class AddNodeIds1658932910559 implements MigrationInterface {
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
let nodes = workflow.nodes;
if (typeof nodes === 'string') {
@@ -31,16 +31,15 @@ export class AddNodeIds1658932910559 implements MigrationInterface {
}
});
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
});
@@ -56,22 +55,21 @@ export class AddNodeIds1658932910559 implements MigrationInterface {
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
// @ts-ignore
nodes.forEach((node) => delete node.id );
nodes.forEach((node) => delete node.id);
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
});

packages/cli/src/databases/migrations/mysqldb/1659895550980-AddJsonKeyPinData.ts View file

@@ -0,0 +1,46 @@
import {
logMigrationStart,
logMigrationEnd,
runInBatches,
getTablePrefix,
} from '../../utils/migrationHelpers';
import { addJsonKeyToPinDataColumn } from '../sqlite/1659888469333-AddJsonKeyPinData';
import type { MigrationInterface, QueryRunner } from 'typeorm';
/**
* Convert JSON-type `pinData` column in `workflow_entity` table from
* `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }`
*/
export class AddJsonKeyPinData1659895550980 implements MigrationInterface {
name = 'AddJsonKeyPinData1659895550980';
async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const workflowTable = `${getTablePrefix()}workflow_entity`;
const PINDATA_SELECT_QUERY = `
SELECT id, pinData
FROM \`${workflowTable}\`
WHERE pinData IS NOT NULL;
`;
const PINDATA_UPDATE_STATEMENT = `
UPDATE \`${workflowTable}\`
SET \`pinData\` = :pinData
WHERE id = :id;
`;
await runInBatches(
queryRunner,
PINDATA_SELECT_QUERY,
addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT),
);
logMigrationEnd(this.name);
}
async down() {
// irreversible migration
}
}

packages/cli/src/databases/migrations/mysqldb/index.ts View file

@@ -18,6 +18,7 @@ import { CommunityNodes1652254514003 } from './1652254514003-CommunityNodes';
import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn';
import { IntroducePinData1654090101303 } from './1654090101303-IntroducePinData';
import { AddNodeIds1658932910559 } from './1658932910559-AddNodeIds';
import { AddJsonKeyPinData1659895550980 } from './1659895550980-AddJsonKeyPinData';
export const mysqlMigrations = [
InitialMigration1588157391238,
@@ -40,4 +41,5 @@ export const mysqlMigrations = [
AddAPIKeyColumn1652905585850,
IntroducePinData1654090101303,
AddNodeIds1658932910559,
AddJsonKeyPinData1659895550980,
];

packages/cli/src/databases/migrations/postgresdb/1630419189837-UpdateWorkflowCredentials.ts View file

@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
// replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }`
@@ -17,7 +17,6 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
await queryRunner.query(`SET search_path TO ${schema};`);
const credentialsEntities = await queryRunner.query(`
SELECT id, name, type
FROM ${tablePrefix}credentials_entity
@@ -29,7 +28,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
@@ -72,7 +71,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
WHERE "waitTill" IS NOT NULL AND finished = FALSE
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;
@@ -172,7 +171,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
FROM ${tablePrefix}workflow_entity
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
let credentialsUpdated = false;
@@ -221,7 +220,7 @@ export class UpdateWorkflowCredentials1630419189837 implements MigrationInterfac
WHERE "waitTill" IS NOT NULL AND finished = FALSE
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = execution.workflowData;
let credentialsUpdated = false;

packages/cli/src/databases/migrations/postgresdb/1658932090381-AddNodeIds.ts View file

@@ -1,6 +1,6 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
import { v4 as uuid } from 'uuid';
// add node ids in workflow objects
@@ -23,7 +23,7 @@ export class AddNodeIds1658932090381 implements MigrationInterface {
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
// @ts-ignore
@@ -33,16 +33,15 @@ export class AddNodeIds1658932090381 implements MigrationInterface {
}
});
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
});
@@ -64,22 +63,21 @@ export class AddNodeIds1658932090381 implements MigrationInterface {
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = workflow.nodes;
// @ts-ignore
nodes.forEach((node) => delete node.id );
nodes.forEach((node) => delete node.id);
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE ${tablePrefix}workflow_entity
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
});

packages/cli/src/databases/migrations/postgresdb/1659902242948-AddJsonKeyPinData.ts View file

@@ -0,0 +1,46 @@
import {
getTablePrefix,
logMigrationEnd,
logMigrationStart,
runInBatches,
} from '../../utils/migrationHelpers';
import { addJsonKeyToPinDataColumn } from '../sqlite/1659888469333-AddJsonKeyPinData';
import type { MigrationInterface, QueryRunner } from 'typeorm';
/**
* Convert JSON-type `pinData` column in `workflow_entity` table from
* `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }`
*/
export class AddJsonKeyPinData1659902242948 implements MigrationInterface {
name = 'AddJsonKeyPinData1659902242948';
async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const workflowTable = `${getTablePrefix()}workflow_entity`;
const PINDATA_SELECT_QUERY = `
SELECT id, "pinData"
FROM ${workflowTable}
WHERE "pinData" IS NOT NULL;
`;
const PINDATA_UPDATE_STATEMENT = `
UPDATE ${workflowTable}
SET "pinData" = :pinData
WHERE id = :id;
`;
await runInBatches(
queryRunner,
PINDATA_SELECT_QUERY,
addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT),
);
logMigrationEnd(this.name);
}
async down() {
// irreversible migration
}
}

packages/cli/src/databases/migrations/postgresdb/index.ts View file

@@ -16,6 +16,7 @@ import { CommunityNodes1652254514002 } from './1652254514002-CommunityNodes';
import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn';
import { IntroducePinData1654090467022 } from './1654090467022-IntroducePinData';
import { AddNodeIds1658932090381 } from './1658932090381-AddNodeIds';
import { AddJsonKeyPinData1659902242948 } from './1659902242948-AddJsonKeyPinData';
export const postgresMigrations = [
InitialMigration1587669153312,
@@ -36,4 +37,5 @@ export const postgresMigrations = [
AddAPIKeyColumn1652905585850,
IntroducePinData1654090467022,
AddNodeIds1658932090381,
AddJsonKeyPinData1659902242948,
];

packages/cli/src/databases/migrations/sqlite/1630330987096-UpdateWorkflowCredentials.ts View file

@@ -1,7 +1,7 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
// replacing the credentials in workflows and execution
// `nodeType: name` changes to `nodeType: { id, name }`
@@ -25,7 +25,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = JSON.parse(workflow.nodes);
let credentialsUpdated = false;
@@ -68,7 +68,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
WHERE "waitTill" IS NOT NULL AND finished = 0
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
waitingExecutions.forEach(async (execution) => {
const data = JSON.parse(execution.workflowData);
let credentialsUpdated = false;
@@ -164,7 +164,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
// @ts-ignore
workflows.forEach(async (workflow) => {
const nodes = JSON.parse(workflow.nodes);
@@ -214,7 +214,7 @@ export class UpdateWorkflowCredentials1630330987096 implements MigrationInterfac
`;
// @ts-ignore
await runChunked(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
await runInBatches(queryRunner, waitingExecutionsQuery, (waitingExecutions) => {
// @ts-ignore
waitingExecutions.forEach(async (execution) => {
const data = JSON.parse(execution.workflowData);

packages/cli/src/databases/migrations/sqlite/1658930531669-AddNodeIds.ts View file

@@ -2,7 +2,7 @@ import { INode } from 'n8n-workflow';
import { MigrationInterface, QueryRunner } from 'typeorm';
import * as config from '../../../../config';
import { logMigrationEnd, logMigrationStart } from '../../utils/migrationHelpers';
import { runChunked } from '../../utils/migrationHelpers';
import { runInBatches } from '../../utils/migrationHelpers';
import { v4 as uuid } from 'uuid';
// add node ids in workflow objects
@@ -21,7 +21,7 @@ export class AddNodeIds1658930531669 implements MigrationInterface {
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = JSON.parse(workflow.nodes);
nodes.forEach((node: INode) => {
@@ -30,16 +30,15 @@ export class AddNodeIds1658930531669 implements MigrationInterface {
}
});
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
});
@@ -48,7 +47,6 @@ export class AddNodeIds1658930531669 implements MigrationInterface {
logMigrationEnd(this.name);
}
public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = config.getEnv('database.tablePrefix');
@@ -58,22 +56,21 @@ export class AddNodeIds1658930531669 implements MigrationInterface {
`;
// @ts-ignore
await runChunked(queryRunner, workflowsQuery, (workflows) => {
await runInBatches(queryRunner, workflowsQuery, (workflows) => {
workflows.forEach(async (workflow) => {
const nodes = JSON.parse(workflow.nodes);
// @ts-ignore
nodes.forEach((node) => delete node.id );
nodes.forEach((node) => delete node.id);
const [updateQuery, updateParams] =
queryRunner.connection.driver.escapeQueryWithParameters(
`
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET nodes = :nodes
WHERE id = '${workflow.id}'
`,
{ nodes: JSON.stringify(nodes) },
{},
);
{ nodes: JSON.stringify(nodes) },
{},
);
queryRunner.query(updateQuery, updateParams);
});

packages/cli/src/databases/migrations/sqlite/1659888469333-AddJsonKeyPinData.ts View file

@@ -0,0 +1,93 @@
import {
logMigrationStart,
logMigrationEnd,
runInBatches,
getTablePrefix,
escapeQuery,
} from '../../utils/migrationHelpers';
import type { MigrationInterface, QueryRunner } from 'typeorm';
import { isJsonKeyObject, PinData } from '../../utils/migrations.types';
/**
* Convert TEXT-type `pinData` column in `workflow_entity` table from
* `{ [nodeName: string]: IDataObject[] }` to `{ [nodeName: string]: INodeExecutionData[] }`
*/
export class AddJsonKeyPinData1659888469333 implements MigrationInterface {
name = 'AddJsonKeyPinData1659888469333';
async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const workflowTable = `${getTablePrefix()}workflow_entity`;
const PINDATA_SELECT_QUERY = `
SELECT id, pinData
FROM "${workflowTable}"
WHERE pinData IS NOT NULL;
`;
const PINDATA_UPDATE_STATEMENT = `
UPDATE "${workflowTable}"
SET "pinData" = :pinData
WHERE id = :id;
`;
await runInBatches(
queryRunner,
PINDATA_SELECT_QUERY,
addJsonKeyToPinDataColumn(queryRunner, PINDATA_UPDATE_STATEMENT),
);
logMigrationEnd(this.name);
}
async down() {
// irreversible migration
}
}
export const addJsonKeyToPinDataColumn =
(queryRunner: QueryRunner, updateStatement: string) =>
async (fetchedWorkflows: PinData.FetchedWorkflow[]) => {
makeUpdateParams(fetchedWorkflows).forEach((param) => {
const params = {
pinData: param.pinData,
id: param.id,
};
const [escapedStatement, escapedParams] = escapeQuery(queryRunner, updateStatement, params);
queryRunner.query(escapedStatement, escapedParams);
});
};
function makeUpdateParams(fetchedWorkflows: PinData.FetchedWorkflow[]) {
return fetchedWorkflows.reduce<PinData.FetchedWorkflow[]>(
(updateParams, { id, pinData: rawPinData }) => {
const pinDataPerWorkflow: PinData.Old | PinData.New =
typeof rawPinData === 'string' ? JSON.parse(rawPinData) : rawPinData;
const newPinDataPerWorkflow = Object.keys(pinDataPerWorkflow).reduce<PinData.New>(
(newPinDataPerWorkflow, nodeName) => {
const pinDataPerNode = pinDataPerWorkflow[nodeName];
if (pinDataPerNode.every((item) => item.json)) return newPinDataPerWorkflow;
newPinDataPerWorkflow[nodeName] = pinDataPerNode.map((item) =>
isJsonKeyObject(item) ? item : { json: item },
);
return newPinDataPerWorkflow;
},
{},
);
if (Object.keys(newPinDataPerWorkflow).length > 0) {
updateParams.push({ id, pinData: JSON.stringify(newPinDataPerWorkflow) });
}
return updateParams;
},
[],
);
}
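
As a worked example of the conversion performed by `makeUpdateParams` above (the row is hypothetical): items lacking a `json` key are wrapped, and workflows whose pin data is already fully migrated produce no update parameters at all.

// Hypothetical row, shaped like the results of PINDATA_SELECT_QUERY:
const fetched = [{ id: 1, pinData: '{"Spotify":[{"myKey":"myValue"}]}' }];

// makeUpdateParams(fetched) then yields:
// [{ id: 1, pinData: '{"Spotify":[{"json":{"myKey":"myValue"}}]}' }]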

packages/cli/src/databases/migrations/sqlite/index.ts View file

@@ -11,10 +11,11 @@ import { AddExecutionEntityIndexes1644421939510 } from './1644421939510-AddExecu
import { CreateUserManagement1646992772331 } from './1646992772331-CreateUserManagement';
import { LowerCaseUserEmail1648740597343 } from './1648740597343-LowerCaseUserEmail';
import { AddUserSettings1652367743993 } from './1652367743993-AddUserSettings';
import { CommunityNodes1652254514001 } from './1652254514001-CommunityNodes'
import { CommunityNodes1652254514001 } from './1652254514001-CommunityNodes';
import { AddAPIKeyColumn1652905585850 } from './1652905585850-AddAPIKeyColumn';
import { IntroducePinData1654089251344 } from './1654089251344-IntroducePinData';
import { AddNodeIds1658930531669 } from './1658930531669-AddNodeIds';
import { AddJsonKeyPinData1659888469333 } from './1659888469333-AddJsonKeyPinData';
const sqliteMigrations = [
InitialMigration1588102412422,
@@ -34,6 +35,7 @@ const sqliteMigrations = [
AddAPIKeyColumn1652905585850,
IntroducePinData1654089251344,
AddNodeIds1658930531669,
AddJsonKeyPinData1659888469333,
];
export { sqliteMigrations };

packages/cli/src/databases/ormconfig.ts View file

@@ -2,6 +2,9 @@ import path from 'path';
import { UserSettings } from 'n8n-core';
import { entities } from './entities';
const MIGRATIONS_DIR = path.resolve('src', 'databases', 'migrations');
const ENTITIES_DIR = path.resolve('src', 'databases', 'entities');
export default [
{
name: 'sqlite',
@@ -9,10 +12,10 @@ export default [
logging: true,
entities: Object.values(entities),
database: path.resolve(UserSettings.getUserN8nFolderPath(), 'database.sqlite'),
migrations: [path.resolve('migrations', 'sqlite', 'index.ts')],
migrations: [path.resolve(MIGRATIONS_DIR, 'sqlite', 'index.ts')],
cli: {
entitiesDir: path.resolve('entities'),
migrationsDir: path.resolve('migrations', 'sqlite'),
entitiesDir: ENTITIES_DIR,
migrationsDir: path.resolve(MIGRATIONS_DIR, 'sqlite'),
},
},
{
@@ -26,10 +29,10 @@ export default [
port: 5432,
logging: false,
entities: Object.values(entities),
migrations: [path.resolve('migrations', 'postgresdb', 'index.ts')],
migrations: [path.resolve(MIGRATIONS_DIR, 'postgresdb', 'index.ts')],
cli: {
entitiesDir: path.resolve('entities'),
migrationsDir: path.resolve('migrations', 'postgresdb'),
entitiesDir: ENTITIES_DIR,
migrationsDir: path.resolve(MIGRATIONS_DIR, 'postgresdb'),
},
},
{
@@ -42,10 +45,10 @@ export default [
port: 3306,
logging: false,
entities: Object.values(entities),
migrations: [path.resolve('migrations', 'mysqldb', 'index.ts')],
migrations: [path.resolve(MIGRATIONS_DIR, 'mysqldb', 'index.ts')],
cli: {
entitiesDir: path.resolve('entities'),
migrationsDir: path.resolve('migrations', 'mysqldb'),
entitiesDir: ENTITIES_DIR,
migrationsDir: path.resolve(MIGRATIONS_DIR, 'mysqldb'),
},
},
{
@@ -58,10 +61,10 @@ export default [
port: 3306,
logging: false,
entities: Object.values(entities),
migrations: [path.resolve('migrations', 'mysqldb', 'index.ts')],
migrations: [path.resolve(MIGRATIONS_DIR, 'mysqldb', 'index.ts')],
cli: {
entitiesDir: path.resolve('entities'),
migrationsDir: path.resolve('migrations', 'mysqldb'),
entitiesDir: ENTITIES_DIR,
migrationsDir: path.resolve(MIGRATIONS_DIR, 'mysqldb'),
},
},
];

packages/cli/src/databases/utils/migrationHelpers.ts View file

@@ -1,7 +1,8 @@
/* eslint-disable no-await-in-loop */
import { readFileSync, rmSync } from 'fs';
import { UserSettings } from 'n8n-core';
import { QueryRunner } from 'typeorm/query-runner/QueryRunner';
import type { QueryRunner } from 'typeorm/query-runner/QueryRunner';
import config from '../../../config';
import { getLogger } from '../../Logger';
const PERSONALIZATION_SURVEY_FILENAME = 'personalizationSurvey.json';
@@ -35,28 +36,36 @@ export function loadSurveyFromDisk(): string | null {
}
let logFinishTimeout: NodeJS.Timeout;
const disableLogging = process.argv[1].split('/').includes('jest');
export function logMigrationStart(migrationName: string): void {
export function logMigrationStart(
migrationName: string,
disableLogging = process.env.NODE_ENV === 'test',
): void {
if (disableLogging) return;
const logger = getLogger();
if (!logFinishTimeout) {
logger.warn('Migrations in progress, please do NOT stop the process.');
getLogger().warn('Migrations in progress, please do NOT stop the process.');
}
logger.debug(`Starting migration ${migrationName}`);
getLogger().debug(`Starting migration ${migrationName}`);
clearTimeout(logFinishTimeout);
}
export function logMigrationEnd(migrationName: string): void {
export function logMigrationEnd(
migrationName: string,
disableLogging = process.env.NODE_ENV === 'test',
): void {
if (disableLogging) return;
const logger = getLogger();
logger.debug(`Finished migration ${migrationName}`);
getLogger().debug(`Finished migration ${migrationName}`);
logFinishTimeout = setTimeout(() => {
logger.warn('Migrations finished.');
getLogger().warn('Migrations finished.');
}, 100);
}
export function chunkQuery(query: string, limit: number, offset = 0): string {
export function batchQuery(query: string, limit: number, offset = 0): string {
return `
${query}
LIMIT ${limit}
@@ -64,7 +73,7 @@ export function chunkQuery(query: string, limit: number, offset = 0): string {
`;
}
export async function runChunked(
export async function runInBatches(
queryRunner: QueryRunner,
query: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -72,14 +81,42 @@ export async function runChunked(
limit = 100,
): Promise<void> {
let offset = 0;
let chunkedQuery: string;
let chunkedQueryResults: unknown[];
let batchedQuery: string;
let batchedQueryResults: unknown[];
// eslint-disable-next-line no-param-reassign
if (query.trim().endsWith(';')) query = query.trim().slice(0, -1);
do {
chunkedQuery = chunkQuery(query, limit, offset);
chunkedQueryResults = (await queryRunner.query(chunkedQuery)) as unknown[];
batchedQuery = batchQuery(query, limit, offset);
batchedQueryResults = (await queryRunner.query(batchedQuery)) as unknown[];
// pass a copy to prevent errors from mutation
await operation([...chunkedQueryResults]);
await operation([...batchedQueryResults]);
offset += limit;
} while (chunkedQueryResults.length === limit);
} while (batchedQueryResults.length === limit);
}
export const getTablePrefix = () => {
const tablePrefix = config.getEnv('database.tablePrefix');
if (config.getEnv('database.type') === 'postgresdb') {
const schema = config.getEnv('database.postgresdb.schema');
return [schema, tablePrefix].join('.');
}
return tablePrefix;
};
export const escapeQuery = (
queryRunner: QueryRunner,
query: string,
params: { [property: string]: unknown },
): [string, unknown[]] =>
queryRunner.connection.driver.escapeQueryWithParameters(
query,
{
pinData: params.pinData,
id: params.id,
},
{},
);
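
A usage sketch for the renamed helper (the query is illustrative, not from this diff): `runInBatches` strips any trailing semicolon, appends `LIMIT`/`OFFSET`, and hands each batch to the callback until a batch comes back smaller than the limit. On Postgres, `getTablePrefix()` also prepends the configured schema to the table prefix.

// Hypothetical: process workflow rows 100 at a time (the default batch size).
const selectQuery = `SELECT id, nodes FROM ${getTablePrefix()}workflow_entity`;

await runInBatches(queryRunner, selectQuery, async (rows) => {
	// Each batch arrives as a copy, so callers can mutate `rows` safely.
	rows.forEach((row) => {
		/* transform the row and issue an UPDATE here */
	});
});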

packages/cli/src/databases/utils/migrations.types.ts View file

@@ -0,0 +1,22 @@
import type { IDataObject, INodeExecutionData } from 'n8n-workflow';
export namespace PinData {
export type Old = { [nodeName: string]: IDataObject[] };
export type New = { [nodeName: string]: INodeExecutionData[] };
export type FetchedWorkflow = { id: number; pinData: string | object };
}
export function isObjectLiteral(maybeObject: unknown): maybeObject is { [key: string]: string } {
return typeof maybeObject === 'object' && maybeObject !== null && !Array.isArray(maybeObject);
}
export function isJsonKeyObject(item: unknown): item is {
json: unknown;
[otherKeys: string]: unknown;
} {
if (!isObjectLiteral(item)) return false;
return Object.keys(item).includes('json');
}
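
A quick sketch of how the guard separates legacy pin data items from migrated ones:

isJsonKeyObject({ json: { myKey: 'myValue' } }); // true: already in the new shape
isJsonKeyObject({ myKey: 'myValue' }); // false: legacy item, needs wrapping
isJsonKeyObject([{ json: {} }]); // false: arrays are not object literals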

packages/cli/src/databases/utils/transformers.ts View file

@@ -1,4 +1,5 @@
import { ValueTransformer } from 'typeorm';
import config from '../../../config';
export const idStringifier = {
from: (value: number): string | number => (typeof value === 'number' ? value.toString() : value),
@@ -20,11 +21,14 @@ export const objectRetriever: ValueTransformer = {
};
/**
* Transformer to store object as string and retrieve string as object.
* Transformer for sqlite JSON columns to mimic JSON-as-object behavior
* from Postgres and MySQL.
*/
export const serializer: ValueTransformer = {
to: (value: object | string): string =>
typeof value === 'object' ? JSON.stringify(value) : value,
const jsonColumn: ValueTransformer = {
to: (value: object): string | object =>
config.getEnv('database.type') === 'sqlite' ? JSON.stringify(value) : value,
from: (value: string | object): object =>
typeof value === 'string' ? (JSON.parse(value) as object) : value,
};
export const sqlite = { jsonColumn };
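
A round-trip sketch of the renamed transformer: when `database.type` is `sqlite`, `to` stringifies the object for the TEXT column, while on Postgres and MySQL it passes the object through and the native JSON column handles serialization; `from` parses only when a string comes back.

// With database.type === 'sqlite':
sqlite.jsonColumn.to({ Spotify: [{ json: { myKey: 'myValue' } }] });
// -> '{"Spotify":[{"json":{"myKey":"myValue"}}]}' (stored as TEXT)

sqlite.jsonColumn.from('{"Spotify":[{"json":{"myKey":"myValue"}}]}');
// -> { Spotify: [{ json: { myKey: 'myValue' } }] }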

View file

@@ -3,8 +3,9 @@ import express from 'express';
import * as utils from './shared/utils';
import * as testDb from './shared/testDb';
import { WorkflowEntity } from '../../src/databases/entities/WorkflowEntity';
import type { Role } from '../../src/databases/entities/Role';
import { IPinData } from 'n8n-workflow';
import type { IPinData } from 'n8n-workflow';
jest.mock('../../src/telemetry');
@@ -46,7 +47,7 @@ test('POST /workflows should store pin data for node in workflow', async () => {
const { pinData } = response.body.data as { pinData: IPinData };
expect(pinData).toMatchObject({ Spotify: [{ myKey: 'myValue' }] });
expect(pinData).toMatchObject(MOCK_PINDATA);
});
test('POST /workflows should set pin data to null if no pin data', async () => {
@@ -80,7 +81,7 @@ test('GET /workflows/:id should return pin data', async () => {
const { pinData } = workflowRetrievalResponse.body.data as { pinData: IPinData };
expect(pinData).toMatchObject({ Spotify: [{ myKey: 'myValue' }] });
expect(pinData).toMatchObject(MOCK_PINDATA);
});
function makeWorkflow({ withPinData }: { withPinData: boolean }) {
@@ -101,8 +102,10 @@ function makeWorkflow({ withPinData }: { withPinData: boolean }) {
];
if (withPinData) {
workflow.pinData = { Spotify: [{ myKey: 'myValue' }] };
workflow.pinData = MOCK_PINDATA;
}
return workflow;
}
const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] };

packages/core/src/WorkflowExecute.ts View file

@@ -933,14 +933,9 @@ export class WorkflowExecute {
const { pinData } = this.runExecutionData.resultData;
if (pinData && !executionNode.disabled && pinData[executionNode.name] !== undefined) {
let nodePinData = pinData[executionNode.name];
const nodePinData = pinData[executionNode.name];
if (!Array.isArray(nodePinData)) nodePinData = [nodePinData];
const itemsPerRun = nodePinData.map((item, index) => {
return { json: item, pairedItem: { item: index } };
});
nodeSuccessData = [itemsPerRun]; // always zeroth runIndex
nodeSuccessData = [nodePinData]; // always zeroth runIndex
} else {
Logger.debug(`Running node "${executionNode.name}" started`, {
node: executionNode.name,

packages/editor-ui/src/components/RunData.vue View file

@@ -381,6 +381,7 @@ import { CodeEditor } from "@/components/forms";
import { dataPinningEventBus } from '../event-bus/data-pinning-event-bus';
import { stringSizeInBytes } from './helpers';
import RunDataTable from './RunDataTable.vue';
import { isJsonKeyObject } from '@/utils';
// A path that does not exist so that nothing is selected by default
const deselectedPlaceholder = '_!^&*';
@@ -631,13 +632,7 @@ export default mixins(
let inputData = this.rawInputData;
if (this.node && this.pinData) {
inputData = Array.isArray(this.pinData)
? this.pinData.map((value) => ({
json: value,
}))
: [{
json: this.pinData,
}];
inputData = this.pinData;
}
const offset = this.pageSize * (this.currentPage - 1);
@@ -734,7 +729,10 @@ export default mixins(
localStorage.setItem(LOCAL_STORAGE_PIN_DATA_DISCOVERY_CANVAS_FLAG, 'true');
},
enterEditMode({ origin }: EnterEditModeArgs) {
const inputData = this.pinData ? this.pinData : this.convertToJson(this.rawInputData);
const inputData = this.pinData
? this.clearJsonKey(this.pinData)
: this.convertToJson(this.rawInputData);
const data = inputData.length > 0
? inputData
: TEST_PIN_DATA;
@@ -773,25 +771,18 @@ export default mixins(
}
this.$store.commit('ui/setOutputPanelEditModeEnabled', false);
this.$store.commit('pinData', { node: this.node, data: this.removeJsonKeys(value) });
this.$store.commit('pinData', { node: this.node, data: this.clearJsonKey(value) });
this.onDataPinningSuccess({ source: 'save-edit' });
this.onExitEditMode({ type: 'save' });
},
removeJsonKeys(value: string) {
const parsed = JSON.parse(value);
clearJsonKey(userInput: string | object) {
const parsedUserInput = typeof userInput === 'string' ? JSON.parse(userInput) : userInput;
return Array.isArray(parsed)
? parsed.map(item => this.isJsonKeyObject(item) ? item.json : item)
: parsed;
},
isJsonKeyObject(item: unknown): item is { json: unknown } {
if (!this.isObjectLiteral(item)) return false;
if (!Array.isArray(parsedUserInput)) return parsedUserInput;
const keys = Object.keys(item);
return keys.length === 1 && keys[0] === 'json';
return parsedUserInput.map(item => isJsonKeyObject(item) ? item.json : item);
},
onExitEditMode({ type }: { type: 'save' | 'cancel' }) {
this.$telemetry.track('User closed ndv edit state', {
@@ -1186,7 +1177,7 @@ export default mixins(
let selectedValue = this.selectedOutput.value;
if (isNotSelected) {
if (this.hasPinData) {
selectedValue = this.pinData as object;
selectedValue = this.clearJsonKey(this.pinData as object);
} else {
selectedValue = this.convertToJson(this.getNodeInputData(this.node, this.runIndex, this.currentOutputIndex));
}

packages/editor-ui/src/components/mixins/nodeHelpers.ts View file

@@ -34,6 +34,7 @@ import { get } from 'lodash';
import mixins from 'vue-typed-mixins';
import { mapGetters } from 'vuex';
import { isObjectLiteral } from '@/utils';
export const nodeHelpers = mixins(
restApi,
@@ -47,14 +48,10 @@ export const nodeHelpers = mixins(
return Object.keys(node.parameters).includes('nodeCredentialType');
},
isObjectLiteral(maybeObject: unknown): maybeObject is { [key: string]: string } {
return typeof maybeObject === 'object' && maybeObject !== null && !Array.isArray(maybeObject);
},
isCustomApiCallSelected (nodeValues: INodeParameters): boolean {
const { parameters } = nodeValues;
if (!this.isObjectLiteral(parameters)) return false;
if (!isObjectLiteral(parameters)) return false;
return (
parameters.resource !== undefined && parameters.resource.includes(CUSTOM_API_CALL_KEY) ||

packages/editor-ui/src/store.ts View file

@@ -11,6 +11,7 @@ import {
IConnections,
IDataObject,
INodeConnections,
INodeExecutionData,
INodeIssueData,
INodeTypeDescription,
IPinData,
@@ -48,6 +49,7 @@ import {stringSizeInBytes} from "@/components/helpers";
import {dataPinningEventBus} from "@/event-bus/data-pinning-event-bus";
import communityNodes from './modules/communityNodes';
import { isCommunityPackageName } from './components/helpers';
import { isJsonKeyObject } from './utils';
Vue.use(Vuex);
@@ -214,15 +216,21 @@ export const store = new Vuex.Store({
},
// Pin data
pinData(state, payload: { node: INodeUi, data: IPinData[string] }) {
pinData(state, payload: { node: INodeUi, data: INodeExecutionData[] }) {
if (!state.workflow.pinData) {
Vue.set(state.workflow, 'pinData', {});
}
Vue.set(state.workflow.pinData!, payload.node.name, payload.data);
if (!Array.isArray(payload.data)) {
payload.data = [payload.data];
}
const storedPinData = payload.data.map(item => isJsonKeyObject(item) ? item : { json: item });
Vue.set(state.workflow.pinData!, payload.node.name, storedPinData);
state.stateIsDirty = true;
dataPinningEventBus.$emit('pin-data', { [payload.node.name]: payload.data });
dataPinningEventBus.$emit('pin-data', { [payload.node.name]: storedPinData });
},
unpinData(state, payload: { node: INodeUi }) {
if (!state.workflow.pinData) {
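
The `pinData` mutation above now normalizes whatever a caller commits, so legacy-shaped payloads are wrapped on the way into the store. A sketch of the effect (hypothetical component code):

// Items without a `json` key are wrapped before being stored:
this.$store.commit('pinData', { node: this.node, data: [{ myKey: 'myValue' }] });
// state.workflow.pinData[this.node.name] is now [{ json: { myKey: 'myValue' } }]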

packages/editor-ui/src/utils.ts View file

@@ -1 +1,14 @@
export const omit = (keyToOmit: string, { [keyToOmit]: _, ...remainder }) => remainder;
export function isObjectLiteral(maybeObject: unknown): maybeObject is { [key: string]: string } {
return typeof maybeObject === 'object' && maybeObject !== null && !Array.isArray(maybeObject);
}
export function isJsonKeyObject(item: unknown): item is {
json: unknown;
[otherKeys: string]: unknown;
} {
if (!isObjectLiteral(item)) return false;
return Object.keys(item).includes('json');
}

packages/workflow/src/Interfaces.ts View file

@@ -843,7 +843,7 @@ export interface INode {
}
export interface IPinData {
[nodeName: string]: IDataObject[];
[nodeName: string]: INodeExecutionData[];
}
export interface INodes {