From 41669c0e0f8911661ef22c1efc029e7abe324029 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Sat, 24 Apr 2021 22:55:14 +0200 Subject: [PATCH] :sparkles: Add options to run queries as transactions (#1612) * add multi return * add independently and transaction to query * pgInsert normal and transaction * independently for pgInsert * normal, transaction and independently for pgUpdate * cleanup * implement it in other nodes * multiple fixes * add optional returning support * clean up Postgres functions * fix other postgres based dbs * Added option to run queries as a transaction to Postgres This commit allows users to configure Postgres, CrateDB, TimescaleDB and QuestDB to run queries independently or as transactions as well as the previous mode which is to execute multiple queries at once. Previous behavior remains untouched so we only added new options. * Standardize behavior across nodes that use postgres protocol Also fixed unit tests. * Added breaking change notice * Added more information to breaking changes * :zap: Styling fixes Co-authored-by: lublak Co-authored-by: Jan Oberhauser --- packages/cli/BREAKING-CHANGES.md | 20 ++ .../nodes-base/nodes/CrateDb/CrateDb.node.ts | 161 ++++++++---- .../nodes/Postgres/Postgres.node.functions.ts | 237 +++++++++++++----- .../nodes/Postgres/Postgres.node.ts | 87 +++++-- .../nodes-base/nodes/QuestDb/QuestDb.node.ts | 118 ++++++--- .../nodes/TimescaleDb/TimescaleDb.node.ts | 81 ++++-- .../Postgres/Postgres.node.functions.test.js | 70 +++--- packages/nodes-base/tsconfig.json | 3 +- 8 files changed, 559 insertions(+), 218 deletions(-) diff --git a/packages/cli/BREAKING-CHANGES.md b/packages/cli/BREAKING-CHANGES.md index aeb4f7e275..5435714e8e 100644 --- a/packages/cli/BREAKING-CHANGES.md +++ b/packages/cli/BREAKING-CHANGES.md @@ -2,6 +2,26 @@ This list shows all the versions which include breaking changes and how to upgrade. +## 0.117.0 + +### What changed? + +Changed the behavior for nodes that use Postgres Wire Protocol: Postgres, QuestDB, CrateDB and TimescaleDB. + +All nodes have been standardized and now follow the same patterns. Behavior will be the same for most cases, but new added functionality can now be explored. + +You can now also inform how you would like n8n to execute queries. Default mode is `Multiple queries` which translates to previous behavior, but you can now run them `Independently` or `Transaction`. Also, `Continue on Fail` now plays a major role for the new modes. + +The node output for `insert` operations now rely on the new parameter `Return fields`, just like `update` operations did previously. + +### When is action necessary? + +If you rely on the output returned by `insert` operations for any of the mentioned nodes, we recommend you review your workflows. + +By default, all `insert` operations will have `Return fields: *` as the default, setting, returning all information inserted. + +Previously, the node would return all information it received, without taking into account what actually happened in the database. + ## 0.113.0 ### What changed? diff --git a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts index a2577282b8..da74abdd6c 100644 --- a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts +++ b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts @@ -8,9 +8,12 @@ import { } from 'n8n-workflow'; import { + generateReturning, getItemCopy, + getItemsCopy, pgInsert, pgQuery, + pgUpdate, } from '../Postgres/Postgres.node.functions'; import * as pgPromise from 'pg-promise'; @@ -125,22 +128,23 @@ export class CrateDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for the new rows.', }, - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', - }, // ---------------------------------- // update // ---------------------------------- + { + displayName: 'Schema', + name: 'schema', + type: 'string', + displayOptions: { + show: { + operation: ['update'], + }, + }, + default: 'doc', + required: true, + description: 'Name of the schema the table belongs to', + }, { displayName: 'Table', name: 'table', @@ -166,7 +170,7 @@ export class CrateDb implements INodeType { default: 'id', required: true, description: - 'Name of the property which decides which rows in the database should be updated. Normally that would be "id".', + 'Comma separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".', }, { displayName: 'Columns', @@ -182,6 +186,57 @@ export class CrateDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for rows to update.', }, + + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + ], + default: 'multiple', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, ], }; @@ -206,7 +261,7 @@ export class CrateDb implements INodeType { const db = pgp(config); - let returnItems = []; + let returnItems: INodeExecutionData[] = []; const items = this.getInputData(); const operation = this.getNodeParameter('operation', 0) as string; @@ -216,66 +271,68 @@ export class CrateDb implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const [insertData, insertItems] = await pgInsert(this.getNodeParameter, pgp, db, items); + const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - // Add the id to the data for (let i = 0; i < insertData.length; i++) { returnItems.push({ - json: { - ...insertData[i], - ...insertItems[i], - }, + json: insertData[i], }); } } else if (operation === 'update') { // ---------------------------------- // update // ---------------------------------- - const tableName = this.getNodeParameter('table', 0) as string; - const updateKey = this.getNodeParameter('updateKey', 0) as string; - const queries : string[] = []; - const updatedKeys : string[] = []; - let updateKeyValue : string | number; - let columns : string[] = []; + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const mode = additionalFields.mode ?? 'multiple' as string; - items.map(item => { - const setOperations : string[] = []; - columns = Object.keys(item.json); - columns.map((col : string) => { - if (col !== updateKey) { - if (typeof item.json[col] === 'string') { - setOperations.push(`${col} = \'${item.json[col]}\'`); - } else { - setOperations.push(`${col} = ${item.json[col]}`); - } + if(mode === 'independently') { + const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail()); + + returnItems = this.helpers.returnJsonArray(updateItems); + } else if(mode === 'multiple') { + // Crate db does not support multiple-update queries + // Therefore we cannot invoke `pgUpdate` using multiple mode + // so we have to call multiple updates manually here + + const table = this.getNodeParameter('table', 0) as string; + const schema = this.getNodeParameter('schema', 0) as string; + const updateKeys = (this.getNodeParameter('updateKey', 0) as string).split(',').map(column => column.trim()); + const columns = (this.getNodeParameter('columns', 0) as string).split(',').map(column => column.trim()); + const queryColumns = columns.slice(); + + updateKeys.forEach(updateKey => { + if (!queryColumns.includes(updateKey)) { + columns.unshift(updateKey); + queryColumns.unshift('?' + updateKey); } }); - updateKeyValue = item.json[updateKey] as string | number; + const cs = new pgp.helpers.ColumnSet(queryColumns, { table: { table, schema } }); - if (updateKeyValue === undefined) { - throw new NodeOperationError(this.getNode(), 'No value found for update key!'); + const where = ' WHERE ' + updateKeys.map(updateKey => pgp.as.name(updateKey) + ' = ${' + updateKey + '}').join(' AND '); + // updateKeyValue = item.json[updateKey] as string | number; + // if (updateKeyValue === undefined) { + // throw new NodeOperationError(this.getNode(), 'No value found for update key!'); + // } + + const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string); + const queries:string[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columns); + queries.push(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning); } - - updatedKeys.push(updateKeyValue as string); - - const query = `UPDATE "${tableName}" SET ${setOperations.join(',')} WHERE ${updateKey} = ${updateKeyValue};`; - queries.push(query); - }); - - - await db.any(pgp.helpers.concat(queries)); - - returnItems = this.helpers.returnJsonArray(getItemCopy(items, columns) as IDataObject[]); + const updateItems = await db.multi(pgp.helpers.concat(queries)); + returnItems = this.helpers.returnJsonArray(getItemsCopy(items, columns) as IDataObject[]); + } } else { await pgp.end(); throw new NodeOperationError(this.getNode(), `The operation "${operation}" is not supported!`); diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts index df1f10cbbc..30cf358579 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts @@ -3,29 +3,50 @@ import pgPromise = require('pg-promise'); import pg = require('pg-promise/typescript/pg-subset'); /** - * Returns of copy of the items which only contains the json data and + * Returns of a shallow copy of the items which only contains the json data and * of that only the define properties * * @param {INodeExecutionData[]} items The items to copy * @param {string[]} properties The properties it should include * @returns */ -export function getItemCopy(items: INodeExecutionData[], properties: string[]): IDataObject[] { - // Prepare the data to insert and copy it to be returned +export function getItemsCopy(items: INodeExecutionData[], properties: string[]): IDataObject[] { let newItem: IDataObject; return items.map(item => { newItem = {}; for (const property of properties) { - if (item.json[property] === undefined) { - newItem[property] = null; - } else { - newItem[property] = JSON.parse(JSON.stringify(item.json[property])); - } + newItem[property] = item.json[property]; } return newItem; }); } +/** + * Returns of a shallow copy of the item which only contains the json data and + * of that only the define properties + * + * @param {INodeExecutionData} item The item to copy + * @param {string[]} properties The properties it should include + * @returns + */ +export function getItemCopy(item: INodeExecutionData, properties: string[]): IDataObject { + const newItem: IDataObject = {}; + for (const property of properties) { + newItem[property] = item.json[property]; + } + return newItem; +} + +/** + * Returns a returning clause from a comma separated string + * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance + * @param string returning The comma separated string + * @returns string + */ +export function generateReturning(pgp: pgPromise.IMain<{}, pg.IClient>, returning: string): string { + return ' RETURNING ' + returning.split(',').map(returnedField => pgp.as.name(returnedField.trim())).join(', '); +} + /** * Executes the given SQL query on the database. * @@ -33,20 +54,53 @@ export function getItemCopy(items: INodeExecutionData[], properties: string[]): * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance * @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection * @param {input[]} input The Node's input data - * @returns Promise> + * @returns Promise> */ -export function pgQuery( +export async function pgQuery( getNodeParam: Function, pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, input: INodeExecutionData[], -): Promise { - const queries: string[] = []; - for (let i = 0; i < input.length; i++) { - queries.push(getNodeParam('query', i) as string); + continueOnFail: boolean, + overrideMode?: string, +): Promise { + const additionalFields = getNodeParam('additionalFields', 0) as IDataObject; + const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string; + if (mode === 'multiple') { + const queries: string[] = []; + for (let i = 0; i < input.length; i++) { + queries.push(getNodeParam('query', i) as string); + } + return (await db.multi(pgp.helpers.concat(queries))).flat(1); + } else if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < input.length; i++) { + try { + Array.prototype.push.apply(result, await t.any(getNodeParam('query', i) as string)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...input[i].json, code: err.code, message: err.message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < input.length; i++) { + try { + Array.prototype.push.apply(result, await t.any(getNodeParam('query', i) as string)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...input[i].json, code: err.code, message: err.message }); + } + } + return result; + }); } - - return db.any(pgp.helpers.concat(queries)); + throw new Error('multiple, independently or transaction are valid options'); } /** @@ -63,33 +117,63 @@ export async function pgInsert( pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, items: INodeExecutionData[], -): Promise { + continueOnFail: boolean, + overrideMode?: string, +): Promise { const table = getNodeParam('table', 0) as string; const schema = getNodeParam('schema', 0) as string; - let returnFields = (getNodeParam('returnFields', 0) as string).split(',') as string[]; const columnString = getNodeParam('columns', 0) as string; const columns = columnString.split(',') .map(column => column.trim().split(':')) .map(([name, cast]) => ({ name, cast })); - - const te = new pgp.helpers.TableName({ table, schema }); - - // Prepare the data to insert and copy it to be returned const columnNames = columns.map(column => column.name); - const insertItems = getItemCopy(items, columnNames); - const columnSet = new pgp.helpers.ColumnSet(columns); + const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } }); - // Generate the multi-row insert query and return the id of new row - returnFields = returnFields.map(value => value.trim()).filter(value => !!value); - const query = - pgp.helpers.insert(insertItems, columnSet, te) + - (returnFields.length ? ` RETURNING ${returnFields.join(',')}` : ''); + const additionalFields = getNodeParam('additionalFields', 0) as IDataObject; + const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string; - // Executing the query to insert the data - const insertData = await db.manyOrNone(query); + const returning = generateReturning(pgp, getNodeParam('returnFields', 0) as string); + if (mode === 'multiple') { + const query = pgp.helpers.insert(getItemsCopy(items, columnNames), cs) + returning; + return db.any(query); + } else if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + result.push(await t.one(pgp.helpers.insert(itemCopy, cs) + returning)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: err.code, message: err.message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + const insertResult = await t.oneOrNone(pgp.helpers.insert(itemCopy, cs) + returning); + if (insertResult !== null) { + result.push(insertResult); + } + } catch (err) { + if (continueOnFail === false) { + throw err; + } + result.push({ ...itemCopy, code: err.code, message: err.message }); + } + } + return result; + }); + } - return [insertData, insertItems]; + throw new Error('multiple, independently or transaction are valid options'); } /** @@ -106,45 +190,80 @@ export async function pgUpdate( pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, items: INodeExecutionData[], + continueOnFail = false, ): Promise { const table = getNodeParam('table', 0) as string; const schema = getNodeParam('schema', 0) as string; const updateKey = getNodeParam('updateKey', 0) as string; const columnString = getNodeParam('columns', 0) as string; - - const [updateColumnName, updateColumnCast] = updateKey.split(':'); - const updateColumn = { - name: updateColumnName, - cast: updateColumnCast, - }; - const columns = columnString.split(',') .map(column => column.trim().split(':')) .map(([name, cast]) => ({ name, cast })); - const te = new pgp.helpers.TableName({ table, schema }); + const updateKeys = updateKey.split(',').map(key => { + const [name, cast] = key.trim().split(':'); + const updateColumn = { name, cast }; + const targetCol = columns.find((column) => column.name === name); + if (!targetCol) { + columns.unshift(updateColumn); + } + else if (!targetCol.cast) { + targetCol.cast = updateColumn.cast || targetCol.cast; + } + return updateColumn; + }); - // Make sure that the updateKey does also get queried - const targetCol = columns.find((column) => column.name === updateColumn.name); - if (!targetCol) { - columns.unshift(updateColumn); - } - else if (!targetCol.cast) { - targetCol.cast = updateColumn.cast || targetCol.cast; - } + const additionalFields = getNodeParam('additionalFields', 0) as IDataObject; + const mode = additionalFields.mode ?? 'multiple' as string; + + const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } }); // Prepare the data to update and copy it to be returned const columnNames = columns.map(column => column.name); - const updateItems = getItemCopy(items, columnNames); + const updateItems = getItemsCopy(items, columnNames); - const columnSet = new pgp.helpers.ColumnSet(columns); - - // Generate the multi-row update query - const query = - pgp.helpers.update(updateItems, columnSet, te) + ' WHERE v.' + updateColumn.name + ' = t.' + updateColumn.name; - - // Executing the query to update the data - await db.none(query); - - return updateItems; + const returning = generateReturning(pgp, getNodeParam('returnFields', 0) as string); + if (mode === 'multiple') { + const query = + pgp.helpers.update(updateItems, cs) + + ' WHERE ' + updateKeys.map(updateKey => { + const key = pgp.as.name(updateKey.name); + return 'v.' + key + ' = t.' + key; + }).join(' AND ') + + returning; + return await db.any(query); + } else { + const where = ' WHERE ' + updateKeys.map(updateKey => pgp.as.name(updateKey.name) + ' = ${' + updateKey.name + '}').join(' AND '); + if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + Array.prototype.push.apply(result, await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: err.code, message: err.message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + Array.prototype.push.apply(result, await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: err.code, message: err.message }); + } + } + return result; + }); + } + } + throw new Error('multiple, independently or transaction are valid options'); } diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.ts index 9b62d57db5..ff30f77ecf 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.ts @@ -121,18 +121,6 @@ export class Postgres implements INodeType { description: 'Comma separated list of the properties which should used as columns for the new rows.
You can use type casting with colons (:) like id:int.', }, - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', - }, // ---------------------------------- // update @@ -147,7 +135,7 @@ export class Postgres implements INodeType { }, }, default: 'public', - required: true, + required: false, description: 'Name of the schema the table belongs to', }, { @@ -174,8 +162,7 @@ export class Postgres implements INodeType { }, default: 'id', required: true, - description: - 'Name of the property which decides which rows in the database should be updated. Normally that would be "id".', + description: 'Comma separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".', }, { displayName: 'Columns', @@ -191,6 +178,62 @@ export class Postgres implements INodeType { description: 'Comma separated list of the properties which should used as columns for rows to update.
You can use type casting with colons (:) like id:int.', }, + + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // Additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'multiple', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, ], }; @@ -232,23 +275,19 @@ export class Postgres implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const [insertData, insertItems] = await pgInsert(this.getNodeParameter, pgp, db, items); + const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - // Add the id to the data for (let i = 0; i < insertData.length; i++) { returnItems.push({ - json: { - ...insertData[i], - ...insertItems[i], - }, + json: insertData[i], }); } } else if (operation === 'update') { @@ -256,7 +295,7 @@ export class Postgres implements INodeType { // update // ---------------------------------- - const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items); + const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail()); returnItems = this.helpers.returnJsonArray(updateItems); } else { diff --git a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts index e3f8b124bc..67a1ceca41 100644 --- a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts +++ b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts @@ -9,7 +9,10 @@ import { import * as pgPromise from 'pg-promise'; -import { pgQuery } from '../Postgres/Postgres.node.functions'; +import { + pgInsert, + pgQuery, +} from '../Postgres/Postgres.node.functions'; export class QuestDb implements INodeType { description: INodeTypeDescription = { @@ -81,7 +84,7 @@ export class QuestDb implements INodeType { { displayName: 'Schema', name: 'schema', - type: 'string', + type: 'hidden', // Schema is used by pgInsert displayOptions: { show: { operation: [ @@ -89,8 +92,7 @@ export class QuestDb implements INodeType { ], }, }, - default: 'public', - required: true, + default: '', description: 'Name of the schema the table belongs to', }, { @@ -108,10 +110,79 @@ export class QuestDb implements INodeType { required: true, description: 'Name of the table in which to insert data to.', }, + { + displayName: 'Columns', + name: 'columns', + type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: '', + placeholder: 'id,name,description', + description: + 'Comma separated list of the properties which should used as columns for the new rows.', + }, { displayName: 'Return Fields', name: 'returnFields', type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + displayOptions: { + show: { + operation: [ + 'executeQuery', + ], + }, + }, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'independently', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'hidden', + default: {}, displayOptions: { show: { operation: [ @@ -119,8 +190,6 @@ export class QuestDb implements INodeType { ], }, }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', }, ], }; @@ -156,37 +225,30 @@ export class QuestDb implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const mode = (additionalFields.mode || 'independently') as string; - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail(), mode); + + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const tableName = this.getNodeParameter('table', 0) as string; + + // Transaction and multiple won't work properly with QuestDB. + // So we send queries independently. + await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail(), 'independently'); + const returnFields = this.getNodeParameter('returnFields', 0) as string; + const table = this.getNodeParameter('table', 0) as string; - const queries : string[] = []; - items.map(item => { - const columns = Object.keys(item.json); - - const values : string = columns.map((col : string) => { - if (typeof item.json[col] === 'string') { - return `\'${item.json[col]}\'`; - } else { - return item.json[col]; - } - }).join(','); - - const query = `INSERT INTO ${tableName} (${columns.join(',')}) VALUES (${values});`; - queries.push(query); + const insertData = await db.any('SELECT ${columns:name} from ${table:name}', { + columns: returnFields.split(',').map(value => value.trim()).filter(value => !!value), + table, }); - await db.any(pgp.helpers.concat(queries)); - - const returnedItems = await db.any(`SELECT ${returnFields} from ${tableName}`); - - returnItems = this.helpers.returnJsonArray(returnedItems as IDataObject[]); + returnItems = this.helpers.returnJsonArray(insertData); } else { await pgp.end(); throw new NodeOperationError(this.getNode(), `The operation "${operation}" is not supported!`); diff --git a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts index 72bf645efe..77ad53fb6a 100644 --- a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts +++ b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts @@ -137,20 +137,6 @@ export class TimescaleDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for the new rows.', }, - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - displayOptions: { - show: { - operation: [ - 'insert', - ], - }, - }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', - }, // ---------------------------------- // update @@ -217,6 +203,61 @@ export class TimescaleDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for rows to update.', }, + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'multiple', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, ], }; @@ -251,22 +292,20 @@ export class TimescaleDb implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const [insertData, insertItems] = await pgInsert(this.getNodeParameter, pgp, db, items); + const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); // Add the id to the data for (let i = 0; i < insertData.length; i++) { returnItems.push({ - json: { - ...insertData[i], - }, + json: insertData[i], }); } } else if (operation === 'update') { @@ -274,7 +313,7 @@ export class TimescaleDb implements INodeType { // update // ---------------------------------- - const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items); + const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail()); returnItems = this.helpers.returnJsonArray(updateItems); diff --git a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js index bc3bcbb758..dd2b37ee30 100644 --- a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js +++ b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js @@ -8,12 +8,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id', - columns: 'id,name' + columns: 'id,name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -21,10 +23,9 @@ describe('pgUpdate', () => { } ]; - const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) + await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); it('runs query to update db if updateKey is not in columns', async () => { @@ -33,12 +34,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id', - columns: 'name' + columns: 'name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -48,8 +51,7 @@ describe('pgUpdate', () => { const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); it('runs query to update db with cast as updateKey', async () => { @@ -58,12 +60,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id:uuid', - columns: 'name' + columns: 'name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -71,10 +75,9 @@ describe('pgUpdate', () => { } ]; - const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) + await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); it('runs query to update db with cast in target columns', async () => { @@ -83,12 +86,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id', - columns: 'id:uuid,name' + columns: 'id:uuid,name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -96,10 +101,9 @@ describe('pgUpdate', () => { } ]; - const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) + await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); }); @@ -113,11 +117,12 @@ describe('pgInsert', () => { schema: 'myschema', columns: 'id,name,age', returnFields: '*', + additionalFields: {}, }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const manyOrNone = jest.fn(); - const db = {manyOrNone}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -125,10 +130,9 @@ describe('pgInsert', () => { }, ]; - const results = await PostgresFun.pgInsert(getNodeParam, pgp, db, items); + await PostgresFun.pgInsert(getNodeParam, pgp, db, items); - expect(db.manyOrNone).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234,'test',34) RETURNING *`); - expect(results).toEqual([undefined, [insertItem]]); + expect(db.any).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234,'test',34) RETURNING *`); }); it('runs query to insert with type casting', async () => { @@ -138,11 +142,12 @@ describe('pgInsert', () => { schema: 'myschema', columns: 'id:int,name:text,age', returnFields: '*', + additionalFields: {}, }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const manyOrNone = jest.fn(); - const db = {manyOrNone}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -150,9 +155,8 @@ describe('pgInsert', () => { }, ]; - const results = await PostgresFun.pgInsert(getNodeParam, pgp, db, items); + await PostgresFun.pgInsert(getNodeParam, pgp, db, items); - expect(db.manyOrNone).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234::int,'test'::text,34) RETURNING *`); - expect(results).toEqual([undefined, [insertItem]]); + expect(db.any).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234::int,'test'::text,34) RETURNING *`); }); }); diff --git a/packages/nodes-base/tsconfig.json b/packages/nodes-base/tsconfig.json index b7e291996e..cbcbf98937 100644 --- a/packages/nodes-base/tsconfig.json +++ b/packages/nodes-base/tsconfig.json @@ -1,7 +1,8 @@ { "compilerOptions": { "lib": [ - "es2017" + "es2017", + "es2019.array" ], "types": [ "node",