diff --git a/packages/cli/BREAKING-CHANGES.md b/packages/cli/BREAKING-CHANGES.md index aeb4f7e275..5435714e8e 100644 --- a/packages/cli/BREAKING-CHANGES.md +++ b/packages/cli/BREAKING-CHANGES.md @@ -2,6 +2,26 @@ This list shows all the versions which include breaking changes and how to upgrade. +## 0.117.0 + +### What changed? + +Changed the behavior for nodes that use Postgres Wire Protocol: Postgres, QuestDB, CrateDB and TimescaleDB. + +All nodes have been standardized and now follow the same patterns. Behavior will be the same for most cases, but new added functionality can now be explored. + +You can now also inform how you would like n8n to execute queries. Default mode is `Multiple queries` which translates to previous behavior, but you can now run them `Independently` or `Transaction`. Also, `Continue on Fail` now plays a major role for the new modes. + +The node output for `insert` operations now rely on the new parameter `Return fields`, just like `update` operations did previously. + +### When is action necessary? + +If you rely on the output returned by `insert` operations for any of the mentioned nodes, we recommend you review your workflows. + +By default, all `insert` operations will have `Return fields: *` as the default, setting, returning all information inserted. + +Previously, the node would return all information it received, without taking into account what actually happened in the database. + ## 0.113.0 ### What changed? diff --git a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts index a2577282b8..da74abdd6c 100644 --- a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts +++ b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts @@ -8,9 +8,12 @@ import { } from 'n8n-workflow'; import { + generateReturning, getItemCopy, + getItemsCopy, pgInsert, pgQuery, + pgUpdate, } from '../Postgres/Postgres.node.functions'; import * as pgPromise from 'pg-promise'; @@ -125,22 +128,23 @@ export class CrateDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for the new rows.', }, - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', - }, // ---------------------------------- // update // ---------------------------------- + { + displayName: 'Schema', + name: 'schema', + type: 'string', + displayOptions: { + show: { + operation: ['update'], + }, + }, + default: 'doc', + required: true, + description: 'Name of the schema the table belongs to', + }, { displayName: 'Table', name: 'table', @@ -166,7 +170,7 @@ export class CrateDb implements INodeType { default: 'id', required: true, description: - 'Name of the property which decides which rows in the database should be updated. Normally that would be "id".', + 'Comma separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".', }, { displayName: 'Columns', @@ -182,6 +186,57 @@ export class CrateDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for rows to update.', }, + + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + ], + default: 'multiple', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, ], }; @@ -206,7 +261,7 @@ export class CrateDb implements INodeType { const db = pgp(config); - let returnItems = []; + let returnItems: INodeExecutionData[] = []; const items = this.getInputData(); const operation = this.getNodeParameter('operation', 0) as string; @@ -216,66 +271,68 @@ export class CrateDb implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const [insertData, insertItems] = await pgInsert(this.getNodeParameter, pgp, db, items); + const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - // Add the id to the data for (let i = 0; i < insertData.length; i++) { returnItems.push({ - json: { - ...insertData[i], - ...insertItems[i], - }, + json: insertData[i], }); } } else if (operation === 'update') { // ---------------------------------- // update // ---------------------------------- - const tableName = this.getNodeParameter('table', 0) as string; - const updateKey = this.getNodeParameter('updateKey', 0) as string; - const queries : string[] = []; - const updatedKeys : string[] = []; - let updateKeyValue : string | number; - let columns : string[] = []; + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const mode = additionalFields.mode ?? 'multiple' as string; - items.map(item => { - const setOperations : string[] = []; - columns = Object.keys(item.json); - columns.map((col : string) => { - if (col !== updateKey) { - if (typeof item.json[col] === 'string') { - setOperations.push(`${col} = \'${item.json[col]}\'`); - } else { - setOperations.push(`${col} = ${item.json[col]}`); - } + if(mode === 'independently') { + const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail()); + + returnItems = this.helpers.returnJsonArray(updateItems); + } else if(mode === 'multiple') { + // Crate db does not support multiple-update queries + // Therefore we cannot invoke `pgUpdate` using multiple mode + // so we have to call multiple updates manually here + + const table = this.getNodeParameter('table', 0) as string; + const schema = this.getNodeParameter('schema', 0) as string; + const updateKeys = (this.getNodeParameter('updateKey', 0) as string).split(',').map(column => column.trim()); + const columns = (this.getNodeParameter('columns', 0) as string).split(',').map(column => column.trim()); + const queryColumns = columns.slice(); + + updateKeys.forEach(updateKey => { + if (!queryColumns.includes(updateKey)) { + columns.unshift(updateKey); + queryColumns.unshift('?' + updateKey); } }); - updateKeyValue = item.json[updateKey] as string | number; + const cs = new pgp.helpers.ColumnSet(queryColumns, { table: { table, schema } }); - if (updateKeyValue === undefined) { - throw new NodeOperationError(this.getNode(), 'No value found for update key!'); + const where = ' WHERE ' + updateKeys.map(updateKey => pgp.as.name(updateKey) + ' = ${' + updateKey + '}').join(' AND '); + // updateKeyValue = item.json[updateKey] as string | number; + // if (updateKeyValue === undefined) { + // throw new NodeOperationError(this.getNode(), 'No value found for update key!'); + // } + + const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string); + const queries:string[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columns); + queries.push(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning); } - - updatedKeys.push(updateKeyValue as string); - - const query = `UPDATE "${tableName}" SET ${setOperations.join(',')} WHERE ${updateKey} = ${updateKeyValue};`; - queries.push(query); - }); - - - await db.any(pgp.helpers.concat(queries)); - - returnItems = this.helpers.returnJsonArray(getItemCopy(items, columns) as IDataObject[]); + const updateItems = await db.multi(pgp.helpers.concat(queries)); + returnItems = this.helpers.returnJsonArray(getItemsCopy(items, columns) as IDataObject[]); + } } else { await pgp.end(); throw new NodeOperationError(this.getNode(), `The operation "${operation}" is not supported!`); diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts index df1f10cbbc..30cf358579 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts @@ -3,29 +3,50 @@ import pgPromise = require('pg-promise'); import pg = require('pg-promise/typescript/pg-subset'); /** - * Returns of copy of the items which only contains the json data and + * Returns of a shallow copy of the items which only contains the json data and * of that only the define properties * * @param {INodeExecutionData[]} items The items to copy * @param {string[]} properties The properties it should include * @returns */ -export function getItemCopy(items: INodeExecutionData[], properties: string[]): IDataObject[] { - // Prepare the data to insert and copy it to be returned +export function getItemsCopy(items: INodeExecutionData[], properties: string[]): IDataObject[] { let newItem: IDataObject; return items.map(item => { newItem = {}; for (const property of properties) { - if (item.json[property] === undefined) { - newItem[property] = null; - } else { - newItem[property] = JSON.parse(JSON.stringify(item.json[property])); - } + newItem[property] = item.json[property]; } return newItem; }); } +/** + * Returns of a shallow copy of the item which only contains the json data and + * of that only the define properties + * + * @param {INodeExecutionData} item The item to copy + * @param {string[]} properties The properties it should include + * @returns + */ +export function getItemCopy(item: INodeExecutionData, properties: string[]): IDataObject { + const newItem: IDataObject = {}; + for (const property of properties) { + newItem[property] = item.json[property]; + } + return newItem; +} + +/** + * Returns a returning clause from a comma separated string + * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance + * @param string returning The comma separated string + * @returns string + */ +export function generateReturning(pgp: pgPromise.IMain<{}, pg.IClient>, returning: string): string { + return ' RETURNING ' + returning.split(',').map(returnedField => pgp.as.name(returnedField.trim())).join(', '); +} + /** * Executes the given SQL query on the database. * @@ -33,20 +54,53 @@ export function getItemCopy(items: INodeExecutionData[], properties: string[]): * @param {pgPromise.IMain<{}, pg.IClient>} pgp The pgPromise instance * @param {pgPromise.IDatabase<{}, pg.IClient>} db The pgPromise database connection * @param {input[]} input The Node's input data - * @returns Promise> + * @returns Promise> */ -export function pgQuery( +export async function pgQuery( getNodeParam: Function, pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, input: INodeExecutionData[], -): Promise { - const queries: string[] = []; - for (let i = 0; i < input.length; i++) { - queries.push(getNodeParam('query', i) as string); + continueOnFail: boolean, + overrideMode?: string, +): Promise { + const additionalFields = getNodeParam('additionalFields', 0) as IDataObject; + const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string; + if (mode === 'multiple') { + const queries: string[] = []; + for (let i = 0; i < input.length; i++) { + queries.push(getNodeParam('query', i) as string); + } + return (await db.multi(pgp.helpers.concat(queries))).flat(1); + } else if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < input.length; i++) { + try { + Array.prototype.push.apply(result, await t.any(getNodeParam('query', i) as string)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...input[i].json, code: err.code, message: err.message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < input.length; i++) { + try { + Array.prototype.push.apply(result, await t.any(getNodeParam('query', i) as string)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...input[i].json, code: err.code, message: err.message }); + } + } + return result; + }); } - - return db.any(pgp.helpers.concat(queries)); + throw new Error('multiple, independently or transaction are valid options'); } /** @@ -63,33 +117,63 @@ export async function pgInsert( pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, items: INodeExecutionData[], -): Promise { + continueOnFail: boolean, + overrideMode?: string, +): Promise { const table = getNodeParam('table', 0) as string; const schema = getNodeParam('schema', 0) as string; - let returnFields = (getNodeParam('returnFields', 0) as string).split(',') as string[]; const columnString = getNodeParam('columns', 0) as string; const columns = columnString.split(',') .map(column => column.trim().split(':')) .map(([name, cast]) => ({ name, cast })); - - const te = new pgp.helpers.TableName({ table, schema }); - - // Prepare the data to insert and copy it to be returned const columnNames = columns.map(column => column.name); - const insertItems = getItemCopy(items, columnNames); - const columnSet = new pgp.helpers.ColumnSet(columns); + const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } }); - // Generate the multi-row insert query and return the id of new row - returnFields = returnFields.map(value => value.trim()).filter(value => !!value); - const query = - pgp.helpers.insert(insertItems, columnSet, te) + - (returnFields.length ? ` RETURNING ${returnFields.join(',')}` : ''); + const additionalFields = getNodeParam('additionalFields', 0) as IDataObject; + const mode = overrideMode ? overrideMode : (additionalFields.mode ?? 'multiple') as string; - // Executing the query to insert the data - const insertData = await db.manyOrNone(query); + const returning = generateReturning(pgp, getNodeParam('returnFields', 0) as string); + if (mode === 'multiple') { + const query = pgp.helpers.insert(getItemsCopy(items, columnNames), cs) + returning; + return db.any(query); + } else if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + result.push(await t.one(pgp.helpers.insert(itemCopy, cs) + returning)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: err.code, message: err.message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + const insertResult = await t.oneOrNone(pgp.helpers.insert(itemCopy, cs) + returning); + if (insertResult !== null) { + result.push(insertResult); + } + } catch (err) { + if (continueOnFail === false) { + throw err; + } + result.push({ ...itemCopy, code: err.code, message: err.message }); + } + } + return result; + }); + } - return [insertData, insertItems]; + throw new Error('multiple, independently or transaction are valid options'); } /** @@ -106,45 +190,80 @@ export async function pgUpdate( pgp: pgPromise.IMain<{}, pg.IClient>, db: pgPromise.IDatabase<{}, pg.IClient>, items: INodeExecutionData[], + continueOnFail = false, ): Promise { const table = getNodeParam('table', 0) as string; const schema = getNodeParam('schema', 0) as string; const updateKey = getNodeParam('updateKey', 0) as string; const columnString = getNodeParam('columns', 0) as string; - - const [updateColumnName, updateColumnCast] = updateKey.split(':'); - const updateColumn = { - name: updateColumnName, - cast: updateColumnCast, - }; - const columns = columnString.split(',') .map(column => column.trim().split(':')) .map(([name, cast]) => ({ name, cast })); - const te = new pgp.helpers.TableName({ table, schema }); + const updateKeys = updateKey.split(',').map(key => { + const [name, cast] = key.trim().split(':'); + const updateColumn = { name, cast }; + const targetCol = columns.find((column) => column.name === name); + if (!targetCol) { + columns.unshift(updateColumn); + } + else if (!targetCol.cast) { + targetCol.cast = updateColumn.cast || targetCol.cast; + } + return updateColumn; + }); - // Make sure that the updateKey does also get queried - const targetCol = columns.find((column) => column.name === updateColumn.name); - if (!targetCol) { - columns.unshift(updateColumn); - } - else if (!targetCol.cast) { - targetCol.cast = updateColumn.cast || targetCol.cast; - } + const additionalFields = getNodeParam('additionalFields', 0) as IDataObject; + const mode = additionalFields.mode ?? 'multiple' as string; + + const cs = new pgp.helpers.ColumnSet(columns, { table: { table, schema } }); // Prepare the data to update and copy it to be returned const columnNames = columns.map(column => column.name); - const updateItems = getItemCopy(items, columnNames); + const updateItems = getItemsCopy(items, columnNames); - const columnSet = new pgp.helpers.ColumnSet(columns); - - // Generate the multi-row update query - const query = - pgp.helpers.update(updateItems, columnSet, te) + ' WHERE v.' + updateColumn.name + ' = t.' + updateColumn.name; - - // Executing the query to update the data - await db.none(query); - - return updateItems; + const returning = generateReturning(pgp, getNodeParam('returnFields', 0) as string); + if (mode === 'multiple') { + const query = + pgp.helpers.update(updateItems, cs) + + ' WHERE ' + updateKeys.map(updateKey => { + const key = pgp.as.name(updateKey.name); + return 'v.' + key + ' = t.' + key; + }).join(' AND ') + + returning; + return await db.any(query); + } else { + const where = ' WHERE ' + updateKeys.map(updateKey => pgp.as.name(updateKey.name) + ' = ${' + updateKey.name + '}').join(' AND '); + if (mode === 'transaction') { + return db.tx(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + Array.prototype.push.apply(result, await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: err.code, message: err.message }); + return result; + } + } + return result; + }); + } else if (mode === 'independently') { + return db.task(async t => { + const result: IDataObject[] = []; + for (let i = 0; i < items.length; i++) { + const itemCopy = getItemCopy(items[i], columnNames); + try { + Array.prototype.push.apply(result, await t.any(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning)); + } catch (err) { + if (continueOnFail === false) throw err; + result.push({ ...itemCopy, code: err.code, message: err.message }); + } + } + return result; + }); + } + } + throw new Error('multiple, independently or transaction are valid options'); } diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.ts index 9b62d57db5..ff30f77ecf 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.ts @@ -121,18 +121,6 @@ export class Postgres implements INodeType { description: 'Comma separated list of the properties which should used as columns for the new rows.
You can use type casting with colons (:) like id:int.', }, - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', - }, // ---------------------------------- // update @@ -147,7 +135,7 @@ export class Postgres implements INodeType { }, }, default: 'public', - required: true, + required: false, description: 'Name of the schema the table belongs to', }, { @@ -174,8 +162,7 @@ export class Postgres implements INodeType { }, default: 'id', required: true, - description: - 'Name of the property which decides which rows in the database should be updated. Normally that would be "id".', + description: 'Comma separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".', }, { displayName: 'Columns', @@ -191,6 +178,62 @@ export class Postgres implements INodeType { description: 'Comma separated list of the properties which should used as columns for rows to update.
You can use type casting with colons (:) like id:int.', }, + + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // Additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'multiple', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, ], }; @@ -232,23 +275,19 @@ export class Postgres implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const [insertData, insertItems] = await pgInsert(this.getNodeParameter, pgp, db, items); + const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - // Add the id to the data for (let i = 0; i < insertData.length; i++) { returnItems.push({ - json: { - ...insertData[i], - ...insertItems[i], - }, + json: insertData[i], }); } } else if (operation === 'update') { @@ -256,7 +295,7 @@ export class Postgres implements INodeType { // update // ---------------------------------- - const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items); + const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail()); returnItems = this.helpers.returnJsonArray(updateItems); } else { diff --git a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts index e3f8b124bc..67a1ceca41 100644 --- a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts +++ b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts @@ -9,7 +9,10 @@ import { import * as pgPromise from 'pg-promise'; -import { pgQuery } from '../Postgres/Postgres.node.functions'; +import { + pgInsert, + pgQuery, +} from '../Postgres/Postgres.node.functions'; export class QuestDb implements INodeType { description: INodeTypeDescription = { @@ -81,7 +84,7 @@ export class QuestDb implements INodeType { { displayName: 'Schema', name: 'schema', - type: 'string', + type: 'hidden', // Schema is used by pgInsert displayOptions: { show: { operation: [ @@ -89,8 +92,7 @@ export class QuestDb implements INodeType { ], }, }, - default: 'public', - required: true, + default: '', description: 'Name of the schema the table belongs to', }, { @@ -108,10 +110,79 @@ export class QuestDb implements INodeType { required: true, description: 'Name of the table in which to insert data to.', }, + { + displayName: 'Columns', + name: 'columns', + type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: '', + placeholder: 'id,name,description', + description: + 'Comma separated list of the properties which should used as columns for the new rows.', + }, { displayName: 'Return Fields', name: 'returnFields', type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + displayOptions: { + show: { + operation: [ + 'executeQuery', + ], + }, + }, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'independently', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'hidden', + default: {}, displayOptions: { show: { operation: [ @@ -119,8 +190,6 @@ export class QuestDb implements INodeType { ], }, }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', }, ], }; @@ -156,37 +225,30 @@ export class QuestDb implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; + const mode = (additionalFields.mode || 'independently') as string; - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail(), mode); + + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const tableName = this.getNodeParameter('table', 0) as string; + + // Transaction and multiple won't work properly with QuestDB. + // So we send queries independently. + await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail(), 'independently'); + const returnFields = this.getNodeParameter('returnFields', 0) as string; + const table = this.getNodeParameter('table', 0) as string; - const queries : string[] = []; - items.map(item => { - const columns = Object.keys(item.json); - - const values : string = columns.map((col : string) => { - if (typeof item.json[col] === 'string') { - return `\'${item.json[col]}\'`; - } else { - return item.json[col]; - } - }).join(','); - - const query = `INSERT INTO ${tableName} (${columns.join(',')}) VALUES (${values});`; - queries.push(query); + const insertData = await db.any('SELECT ${columns:name} from ${table:name}', { + columns: returnFields.split(',').map(value => value.trim()).filter(value => !!value), + table, }); - await db.any(pgp.helpers.concat(queries)); - - const returnedItems = await db.any(`SELECT ${returnFields} from ${tableName}`); - - returnItems = this.helpers.returnJsonArray(returnedItems as IDataObject[]); + returnItems = this.helpers.returnJsonArray(insertData); } else { await pgp.end(); throw new NodeOperationError(this.getNode(), `The operation "${operation}" is not supported!`); diff --git a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts index 72bf645efe..77ad53fb6a 100644 --- a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts +++ b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts @@ -137,20 +137,6 @@ export class TimescaleDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for the new rows.', }, - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - displayOptions: { - show: { - operation: [ - 'insert', - ], - }, - }, - default: '*', - description: 'Comma separated list of the fields that the operation will return', - }, // ---------------------------------- // update @@ -217,6 +203,61 @@ export class TimescaleDb implements INodeType { description: 'Comma separated list of the properties which should used as columns for rows to update.', }, + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma separated list of the fields that the operation will return', + }, + // ---------------------------------- + // additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'multiple', + description: [ + 'The way queries should be sent to database.', + 'Can be used in conjunction with Continue on Fail.', + 'See the docs for more examples', + ].join('
'), + }, + ], + }, ], }; @@ -251,22 +292,20 @@ export class TimescaleDb implements INodeType { // executeQuery // ---------------------------------- - const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items); + const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail()); - returnItems = this.helpers.returnJsonArray(queryResult as IDataObject[]); + returnItems = this.helpers.returnJsonArray(queryResult); } else if (operation === 'insert') { // ---------------------------------- // insert // ---------------------------------- - const [insertData, insertItems] = await pgInsert(this.getNodeParameter, pgp, db, items); + const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail()); // Add the id to the data for (let i = 0; i < insertData.length; i++) { returnItems.push({ - json: { - ...insertData[i], - }, + json: insertData[i], }); } } else if (operation === 'update') { @@ -274,7 +313,7 @@ export class TimescaleDb implements INodeType { // update // ---------------------------------- - const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items); + const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail()); returnItems = this.helpers.returnJsonArray(updateItems); diff --git a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js index bc3bcbb758..dd2b37ee30 100644 --- a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js +++ b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js @@ -8,12 +8,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id', - columns: 'id,name' + columns: 'id,name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -21,10 +23,9 @@ describe('pgUpdate', () => { } ]; - const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) + await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); it('runs query to update db if updateKey is not in columns', async () => { @@ -33,12 +34,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id', - columns: 'name' + columns: 'name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -48,8 +51,7 @@ describe('pgUpdate', () => { const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values(1234,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); it('runs query to update db with cast as updateKey', async () => { @@ -58,12 +60,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id:uuid', - columns: 'name' + columns: 'name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -71,10 +75,9 @@ describe('pgUpdate', () => { } ]; - const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) + await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); it('runs query to update db with cast in target columns', async () => { @@ -83,12 +86,14 @@ describe('pgUpdate', () => { table: 'mytable', schema: 'myschema', updateKey: 'id', - columns: 'id:uuid,name' + columns: 'id:uuid,name', + additionalFields: {}, + returnFields: '*', }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const none = jest.fn(); - const db = {none}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -96,10 +101,9 @@ describe('pgUpdate', () => { } ]; - const results = await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) + await PostgresFun.pgUpdate(getNodeParam, pgp, db, items) - expect(db.none).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.id = t.id`); - expect(results).toEqual([updateItem]); + expect(db.any).toHaveBeenCalledWith(`update \"myschema\".\"mytable\" as t set \"id\"=v.\"id\",\"name\"=v.\"name\" from (values('1234'::uuid,'test')) as v(\"id\",\"name\") WHERE v.\"id\" = t.\"id\" RETURNING *`); }); }); @@ -113,11 +117,12 @@ describe('pgInsert', () => { schema: 'myschema', columns: 'id,name,age', returnFields: '*', + additionalFields: {}, }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const manyOrNone = jest.fn(); - const db = {manyOrNone}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -125,10 +130,9 @@ describe('pgInsert', () => { }, ]; - const results = await PostgresFun.pgInsert(getNodeParam, pgp, db, items); + await PostgresFun.pgInsert(getNodeParam, pgp, db, items); - expect(db.manyOrNone).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234,'test',34) RETURNING *`); - expect(results).toEqual([undefined, [insertItem]]); + expect(db.any).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234,'test',34) RETURNING *`); }); it('runs query to insert with type casting', async () => { @@ -138,11 +142,12 @@ describe('pgInsert', () => { schema: 'myschema', columns: 'id:int,name:text,age', returnFields: '*', + additionalFields: {}, }; const getNodeParam = (key) => nodeParams[key]; const pgp = pgPromise(); - const manyOrNone = jest.fn(); - const db = {manyOrNone}; + const any = jest.fn(); + const db = {any}; const items = [ { @@ -150,9 +155,8 @@ describe('pgInsert', () => { }, ]; - const results = await PostgresFun.pgInsert(getNodeParam, pgp, db, items); + await PostgresFun.pgInsert(getNodeParam, pgp, db, items); - expect(db.manyOrNone).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234::int,'test'::text,34) RETURNING *`); - expect(results).toEqual([undefined, [insertItem]]); + expect(db.any).toHaveBeenCalledWith(`insert into \"myschema\".\"mytable\"(\"id\",\"name\",\"age\") values(1234::int,'test'::text,34) RETURNING *`); }); }); diff --git a/packages/nodes-base/tsconfig.json b/packages/nodes-base/tsconfig.json index b7e291996e..cbcbf98937 100644 --- a/packages/nodes-base/tsconfig.json +++ b/packages/nodes-base/tsconfig.json @@ -1,7 +1,8 @@ { "compilerOptions": { "lib": [ - "es2017" + "es2017", + "es2019.array" ], "types": [ "node",