From 07dc0e4b4075f1fac98d5685a99a38187bca741b Mon Sep 17 00:00:00 2001
From: Michael Kret <88898367+michael-radency@users.noreply.github.com>
Date: Mon, 3 Apr 2023 18:18:01 +0300
Subject: [PATCH] feat(Postgres Node): Overhaul node
---
.../credentials/Postgres.credentials.ts | 115 +++
.../nodes-base/nodes/CrateDb/CrateDb.node.ts | 2 +-
.../nodes/Postgres/Postgres.node.ts | 418 +--------
.../nodes/Postgres/test/v2/operations.test.ts | 817 ++++++++++++++++++
.../nodes/Postgres/test/v2/runQueries.test.ts | 53 ++
.../nodes/Postgres/test/v2/utils.test.ts | 375 ++++++++
.../nodes/Postgres/v1/PostgresV1.node.ts | 424 +++++++++
.../genericFunctions.ts} | 0
.../nodes/Postgres/v2/PostgresV2.node.ts | 29 +
.../v2/actions/common.descriptions.ts | 335 +++++++
.../v2/actions/database/Database.resource.ts | 74 ++
.../actions/database/deleteTable.operation.ts | 159 ++++
.../database/executeQuery.operation.ts | 84 ++
.../v2/actions/database/insert.operation.ts | 172 ++++
.../v2/actions/database/select.operation.ts | 134 +++
.../v2/actions/database/update.operation.ts | 216 +++++
.../v2/actions/database/upsert.operation.ts | 217 +++++
.../nodes/Postgres/v2/actions/node.type.ts | 9 +
.../nodes/Postgres/v2/actions/router.ts | 67 ++
.../Postgres/v2/actions/versionDescription.ts | 42 +
.../nodes/Postgres/v2/helpers/interfaces.ts | 37 +
.../nodes/Postgres/v2/helpers/utils.ts | 360 ++++++++
.../Postgres/v2/methods/credentialTest.ts | 68 ++
.../nodes/Postgres/v2/methods/index.ts | 3 +
.../nodes/Postgres/v2/methods/listSearch.ts | 47 +
.../nodes/Postgres/v2/methods/loadOptions.ts | 46 +
.../nodes/Postgres/v2/transport/index.ts | 209 +++++
.../nodes-base/nodes/QuestDb/QuestDb.node.ts | 2 +-
.../nodes/TimescaleDb/TimescaleDb.node.ts | 2 +-
.../Postgres/Postgres.node.functions.test.js | 2 +-
30 files changed, 4114 insertions(+), 404 deletions(-)
create mode 100644 packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts
create mode 100644 packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts
create mode 100644 packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts
rename packages/nodes-base/nodes/Postgres/{Postgres.node.functions.ts => v1/genericFunctions.ts} (100%)
create mode 100644 packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/router.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/methods/index.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
create mode 100644 packages/nodes-base/nodes/Postgres/v2/transport/index.ts
diff --git a/packages/nodes-base/credentials/Postgres.credentials.ts b/packages/nodes-base/credentials/Postgres.credentials.ts
index e217b41c05..caf139e3d1 100644
--- a/packages/nodes-base/credentials/Postgres.credentials.ts
+++ b/packages/nodes-base/credentials/Postgres.credentials.ts
@@ -81,5 +81,120 @@ export class Postgres implements ICredentialType {
type: 'number',
default: 5432,
},
+ {
+ displayName: 'SSH Tunnel',
+ name: 'sshTunnel',
+ type: 'boolean',
+ default: false,
+ },
+ {
+ displayName: 'SSH Authenticate with',
+ name: 'sshAuthenticateWith',
+ type: 'options',
+ default: 'password',
+ options: [
+ {
+ name: 'Password',
+ value: 'password',
+ },
+ {
+ name: 'Private Key',
+ value: 'privateKey',
+ },
+ ],
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ },
+ },
+ },
+ {
+ displayName: 'SSH Host',
+ name: 'sshHost',
+ type: 'string',
+ default: 'localhost',
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ },
+ },
+ },
+ {
+ displayName: 'SSH Port',
+ name: 'sshPort',
+ type: 'number',
+ default: 22,
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ },
+ },
+ },
+ {
+ displayName: 'SSH Postgres Port',
+ name: 'sshPostgresPort',
+ type: 'number',
+ default: 5432,
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ },
+ },
+ },
+ {
+ displayName: 'SSH User',
+ name: 'sshUser',
+ type: 'string',
+ default: 'root',
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ },
+ },
+ },
+ {
+ displayName: 'SSH Password',
+ name: 'sshPassword',
+ type: 'string',
+ typeOptions: {
+ password: true,
+ },
+ default: '',
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ sshAuthenticateWith: ['password'],
+ },
+ },
+ },
+ {
+ displayName: 'Private Key',
+ name: 'privateKey',
+ type: 'string',
+ typeOptions: {
+ rows: 4,
+ password: true,
+ },
+ default: '',
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ sshAuthenticateWith: ['privateKey'],
+ },
+ },
+ },
+ {
+ displayName: 'Passphrase',
+ name: 'passphrase',
+ type: 'string',
+ default: '',
+ description: 'Passphase used to create the key, if no passphase was used leave empty',
+ displayOptions: {
+ show: {
+ sshTunnel: [true],
+ sshAuthenticateWith: ['privateKey'],
+ },
+ },
+ },
];
}
diff --git a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts
index 489bbdb617..a11ff18670 100644
--- a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts
+++ b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts
@@ -13,7 +13,7 @@ import {
pgInsert,
pgQuery,
pgUpdate,
-} from '../Postgres/Postgres.node.functions';
+} from '../Postgres/v1/genericFunctions';
import pgPromise from 'pg-promise';
diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.ts
index ed1ca9171b..69aee12ada 100644
--- a/packages/nodes-base/nodes/Postgres/Postgres.node.ts
+++ b/packages/nodes-base/nodes/Postgres/Postgres.node.ts
@@ -1,407 +1,25 @@
-import type {
- IExecuteFunctions,
- ICredentialsDecrypted,
- ICredentialTestFunctions,
- IDataObject,
- INodeCredentialTestResult,
- INodeExecutionData,
- INodeType,
- INodeTypeDescription,
-} from 'n8n-workflow';
-import { NodeOperationError } from 'n8n-workflow';
+import type { INodeTypeBaseDescription, IVersionedNodeType } from 'n8n-workflow';
+import { VersionedNodeType } from 'n8n-workflow';
-import pgPromise from 'pg-promise';
+import { PostgresV1 } from './v1/PostgresV1.node';
+import { PostgresV2 } from './v2/PostgresV2.node';
-import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './Postgres.node.functions';
-
-export class Postgres implements INodeType {
- description: INodeTypeDescription = {
- displayName: 'Postgres',
- name: 'postgres',
- icon: 'file:postgres.svg',
- group: ['input'],
- version: 1,
- description: 'Get, add and update data in Postgres',
- defaults: {
- name: 'Postgres',
- },
- inputs: ['main'],
- outputs: ['main'],
- credentials: [
- {
- name: 'postgres',
- required: true,
- testedBy: 'postgresConnectionTest',
- },
- ],
- properties: [
- {
- displayName: 'Operation',
- name: 'operation',
- type: 'options',
- noDataExpression: true,
- options: [
- {
- name: 'Execute Query',
- value: 'executeQuery',
- description: 'Execute an SQL query',
- action: 'Execute a SQL query',
- },
- {
- name: 'Insert',
- value: 'insert',
- description: 'Insert rows in database',
- action: 'Insert rows in database',
- },
- {
- name: 'Update',
- value: 'update',
- description: 'Update rows in database',
- action: 'Update rows in database',
- },
- ],
- default: 'insert',
- },
-
- // ----------------------------------
- // executeQuery
- // ----------------------------------
- {
- displayName: 'Query',
- name: 'query',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['executeQuery'],
- },
- },
- default: '',
- placeholder: 'SELECT id, name FROM product WHERE quantity > $1 AND price <= $2',
- required: true,
- description:
- 'The SQL query to execute. You can use n8n expressions or $1 and $2 in conjunction with query parameters.',
- },
- // ----------------------------------
- // insert
- // ----------------------------------
- {
- displayName: 'Schema',
- name: 'schema',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['insert'],
- },
- },
- default: 'public',
- required: true,
- description: 'Name of the schema the table belongs to',
- },
- {
- displayName: 'Table',
- name: 'table',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['insert'],
- },
- },
- default: '',
- required: true,
- description: 'Name of the table in which to insert data to',
- },
- {
- displayName: 'Columns',
- name: 'columns',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['insert'],
- },
- },
- default: '',
- // eslint-disable-next-line n8n-nodes-base/node-param-placeholder-miscased-id
- placeholder: 'id:int,name:text,description',
- // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id
- description:
- 'Comma-separated list of the properties which should used as columns for the new rows. You can use type casting with colons (:) like id:int.',
- },
-
- // ----------------------------------
- // update
- // ----------------------------------
- {
- displayName: 'Schema',
- name: 'schema',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['update'],
- },
- },
- default: 'public',
- description: 'Name of the schema the table belongs to',
- },
- {
- displayName: 'Table',
- name: 'table',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['update'],
- },
- },
- default: '',
- required: true,
- description: 'Name of the table in which to update data in',
- },
- {
- displayName: 'Update Key',
- name: 'updateKey',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['update'],
- },
- },
- default: 'id',
- required: true,
- // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id
- description:
- 'Comma-separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".',
- },
- {
- displayName: 'Columns',
- name: 'columns',
- type: 'string',
- displayOptions: {
- show: {
- operation: ['update'],
- },
- },
- default: '',
- placeholder: 'name:text,description',
- // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id
- description:
- 'Comma-separated list of the properties which should used as columns for rows to update. You can use type casting with colons (:) like id:int.',
- },
-
- // ----------------------------------
- // insert,update
- // ----------------------------------
- {
- displayName: 'Return Fields',
- name: 'returnFields',
- type: 'string',
- requiresDataPath: 'multiple',
- displayOptions: {
- show: {
- operation: ['insert', 'update'],
- },
- },
- default: '*',
- description: 'Comma-separated list of the fields that the operation will return',
- },
- // ----------------------------------
- // Additional fields
- // ----------------------------------
- {
- displayName: 'Additional Fields',
- name: 'additionalFields',
- type: 'collection',
- placeholder: 'Add Field',
- default: {},
- options: [
- {
- displayName: 'Mode',
- name: 'mode',
- type: 'options',
- options: [
- {
- name: 'Independently',
- value: 'independently',
- description: 'Execute each query independently',
- },
- {
- name: 'Multiple Queries',
- value: 'multiple',
- description: 'Default. Sends multiple queries at once to database.',
- },
- {
- name: 'Transaction',
- value: 'transaction',
- description: 'Executes all queries in a single transaction',
- },
- ],
- default: 'multiple',
- description:
- 'The way queries should be sent to database. Can be used in conjunction with Continue on Fail. See the docs for more examples',
- },
- {
- displayName: 'Output Large-Format Numbers As',
- name: 'largeNumbersOutput',
- type: 'options',
- options: [
- {
- name: 'Numbers',
- value: 'numbers',
- },
- {
- name: 'Text',
- value: 'text',
- description:
- 'Use this if you expect numbers longer than 16 digits (otherwise numbers may be incorrect)',
- },
- ],
- hint: 'Applies to NUMERIC and BIGINT columns only',
- default: 'text',
- },
- {
- displayName: 'Query Parameters',
- name: 'queryParams',
- type: 'string',
- displayOptions: {
- show: {
- '/operation': ['executeQuery'],
- },
- },
- default: '',
- placeholder: 'quantity,price',
- description:
- 'Comma-separated list of properties which should be used as query parameters',
- },
- ],
- },
- ],
- };
-
- methods = {
- credentialTest: {
- async postgresConnectionTest(
- this: ICredentialTestFunctions,
- credential: ICredentialsDecrypted,
- ): Promise {
- const credentials = credential.data as IDataObject;
- try {
- const pgp = pgPromise();
- const config: IDataObject = {
- host: credentials.host as string,
- port: credentials.port as number,
- database: credentials.database as string,
- user: credentials.user as string,
- password: credentials.password as string,
- };
-
- if (credentials.allowUnauthorizedCerts === true) {
- config.ssl = {
- rejectUnauthorized: false,
- };
- } else {
- config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
- config.sslmode = (credentials.ssl as string) || 'disable';
- }
-
- const db = pgp(config);
- await db.connect();
- pgp.end();
- } catch (error) {
- return {
- status: 'Error',
- message: error.message,
- };
- }
- return {
- status: 'OK',
- message: 'Connection successful!',
- };
- },
- },
- };
-
- async execute(this: IExecuteFunctions): Promise {
- const credentials = await this.getCredentials('postgres');
- const largeNumbersOutput = this.getNodeParameter(
- 'additionalFields.largeNumbersOutput',
- 0,
- '',
- ) as string;
-
- const pgp = pgPromise();
-
- if (largeNumbersOutput === 'numbers') {
- pgp.pg.types.setTypeParser(20, (value: string) => {
- return parseInt(value, 10);
- });
- pgp.pg.types.setTypeParser(1700, (value: string) => {
- return parseFloat(value);
- });
- }
-
- const config: IDataObject = {
- host: credentials.host as string,
- port: credentials.port as number,
- database: credentials.database as string,
- user: credentials.user as string,
- password: credentials.password as string,
+export class Postgres extends VersionedNodeType {
+ constructor() {
+ const baseDescription: INodeTypeBaseDescription = {
+ displayName: 'Postgres',
+ name: 'postgres',
+ icon: 'file:postgres.svg',
+ group: ['input'],
+ defaultVersion: 2,
+ description: 'Get, add and update data in Postgres',
};
- if (credentials.allowUnauthorizedCerts === true) {
- config.ssl = {
- rejectUnauthorized: false,
- };
- } else {
- config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
- config.sslmode = (credentials.ssl as string) || 'disable';
- }
+ const nodeVersions: IVersionedNodeType['nodeVersions'] = {
+ 1: new PostgresV1(baseDescription),
+ 2: new PostgresV2(baseDescription),
+ };
- const db = pgp(config);
-
- let returnItems: INodeExecutionData[] = [];
-
- const items = this.getInputData();
- const operation = this.getNodeParameter('operation', 0);
-
- if (operation === 'executeQuery') {
- // ----------------------------------
- // executeQuery
- // ----------------------------------
-
- const queryResult = await pgQueryV2.call(this, pgp, db, items, this.continueOnFail());
- returnItems = queryResult as INodeExecutionData[];
- } else if (operation === 'insert') {
- // ----------------------------------
- // insert
- // ----------------------------------
-
- const insertData = await pgInsertV2.call(this, pgp, db, items, this.continueOnFail());
-
- // returnItems = this.helpers.returnJsonArray(insertData);
- returnItems = insertData as INodeExecutionData[];
- } else if (operation === 'update') {
- // ----------------------------------
- // update
- // ----------------------------------
-
- const updateItems = await pgUpdate(
- this.getNodeParameter,
- pgp,
- db,
- items,
- this.continueOnFail(),
- );
-
- returnItems = wrapData(updateItems);
- } else {
- pgp.end();
- throw new NodeOperationError(
- this.getNode(),
- `The operation "${operation}" is not supported!`,
- );
- }
-
- // Close the connection
- pgp.end();
-
- return this.prepareOutputData(returnItems);
+ super(nodeVersions, baseDescription);
}
}
diff --git a/packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts b/packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts
new file mode 100644
index 0000000000..4de24f6b4c
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts
@@ -0,0 +1,817 @@
+import type { IDataObject, IExecuteFunctions, IGetNodeParameterOptions, INode } from 'n8n-workflow';
+
+import type { ColumnInfo, PgpDatabase, QueriesRunner } from '../../v2/helpers/interfaces';
+
+import { get } from 'lodash';
+
+import * as deleteTable from '../../v2/actions/database/deleteTable.operation';
+import * as executeQuery from '../../v2/actions/database/executeQuery.operation';
+import * as insert from '../../v2/actions/database/insert.operation';
+import * as select from '../../v2/actions/database/select.operation';
+import * as update from '../../v2/actions/database/update.operation';
+import * as upsert from '../../v2/actions/database/upsert.operation';
+
+const runQueries: QueriesRunner = jest.fn();
+
+const node: INode = {
+ id: '1',
+ name: 'Postgres node',
+ typeVersion: 2,
+ type: 'n8n-nodes-base.postgres',
+ position: [60, 760],
+ parameters: {
+ operation: 'executeQuery',
+ },
+};
+
+const items = [{ json: {} }];
+
+const createMockExecuteFunction = (nodeParameters: IDataObject) => {
+ const fakeExecuteFunction = {
+ getNodeParameter(
+ parameterName: string,
+ _itemIndex: number,
+ fallbackValue?: IDataObject | undefined,
+ options?: IGetNodeParameterOptions | undefined,
+ ) {
+ const parameter = options?.extractValue ? `${parameterName}.value` : parameterName;
+ return get(nodeParameters, parameter, fallbackValue);
+ },
+ getNode() {
+ return node;
+ },
+ } as unknown as IExecuteFunctions;
+ return fakeExecuteFunction;
+};
+
+const createMockDb = (columnInfo: ColumnInfo[]) => {
+ return {
+ async any() {
+ return columnInfo;
+ },
+ } as unknown as PgpDatabase;
+};
+
+// if node parameters copied from canvas all default parameters has to be added manualy as JSON would not have them
+describe('Test PostgresV2, deleteTable operation', () => {
+ afterEach(() => {
+ jest.clearAllMocks();
+ });
+
+ it('deleteCommand: delete, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'deleteTable',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ cachedResultName: 'my_table',
+ },
+ deleteCommand: 'delete',
+ where: {
+ values: [
+ {
+ column: 'id',
+ condition: 'LIKE',
+ value: '1',
+ },
+ ],
+ },
+ options: {},
+ };
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await deleteTable.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query: 'DELETE FROM $1:name.$2:name WHERE $3:name LIKE $4',
+ values: ['public', 'my_table', 'id', '1'],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+
+ it('deleteCommand: truncate, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'deleteTable',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ cachedResultName: 'my_table',
+ },
+ deleteCommand: 'truncate',
+ restartSequences: true,
+ options: {
+ cascade: true,
+ },
+ };
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await deleteTable.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query: 'TRUNCATE TABLE $1:name.$2:name RESTART IDENTITY CASCADE',
+ values: ['public', 'my_table'],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+
+ it('deleteCommand: drop, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'deleteTable',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ cachedResultName: 'my_table',
+ },
+ deleteCommand: 'drop',
+ options: {},
+ };
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await deleteTable.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query: 'DROP TABLE IF EXISTS $1:name.$2:name',
+ values: ['public', 'my_table'],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+});
+
+describe('Test PostgresV2, executeQuery operation', () => {
+ afterEach(() => {
+ jest.clearAllMocks();
+ });
+
+ it('should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'executeQuery',
+ query: 'select * from $1:name;',
+ options: {
+ queryReplacement: 'my_table',
+ },
+ };
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await executeQuery.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [{ query: 'select * from $1:name;', values: ['my_table'] }],
+ items,
+ nodeOptions,
+ );
+ });
+});
+
+describe('Test PostgresV2, insert operation', () => {
+ afterEach(() => {
+ jest.clearAllMocks();
+ });
+
+ it('dataMode: define, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ },
+ dataMode: 'defineBelow',
+ valuesToSend: {
+ values: [
+ {
+ column: 'json',
+ value: '{"test":15}',
+ },
+ {
+ column: 'foo',
+ value: 'select 5',
+ },
+ {
+ column: 'id',
+ value: '4',
+ },
+ ],
+ },
+ options: {},
+ };
+ const columnsInfo: ColumnInfo[] = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await insert.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ createMockDb(columnsInfo),
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *',
+ values: ['public', 'my_table', { json: '{"test":15}', foo: 'select 5', id: '4' }],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+
+ it('dataMode: autoMapInputData, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ },
+ dataMode: 'autoMapInputData',
+ options: {},
+ };
+ const columnsInfo: ColumnInfo[] = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const inputItems = [
+ {
+ json: {
+ id: 1,
+ json: {
+ test: 15,
+ },
+ foo: 'data 1',
+ },
+ },
+ {
+ json: {
+ id: 2,
+ json: {
+ test: 10,
+ },
+ foo: 'data 2',
+ },
+ },
+ {
+ json: {
+ id: 3,
+ json: {
+ test: 5,
+ },
+ foo: 'data 3',
+ },
+ },
+ ];
+
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await insert.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ inputItems,
+ nodeOptions,
+ createMockDb(columnsInfo),
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *',
+ values: ['public', 'my_table', { id: 1, json: { test: 15 }, foo: 'data 1' }],
+ },
+ {
+ query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *',
+ values: ['public', 'my_table', { id: 2, json: { test: 10 }, foo: 'data 2' }],
+ },
+ {
+ query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *',
+ values: ['public', 'my_table', { id: 3, json: { test: 5 }, foo: 'data 3' }],
+ },
+ ],
+ inputItems,
+ nodeOptions,
+ );
+ });
+});
+
+describe('Test PostgresV2, select operation', () => {
+ afterEach(() => {
+ jest.clearAllMocks();
+ });
+
+ it('returnAll, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'select',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ cachedResultName: 'my_table',
+ },
+ returnAll: true,
+ options: {},
+ };
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await select.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query: 'SELECT * FROM $1:name.$2:name',
+ values: ['public', 'my_table'],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+
+ it('limit, whereClauses, sortRules, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'select',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ cachedResultName: 'my_table',
+ },
+ limit: 5,
+ where: {
+ values: [
+ {
+ column: 'id',
+ condition: '>=',
+ value: 2,
+ },
+ {
+ column: 'foo',
+ condition: 'equal',
+ value: 'data 2',
+ },
+ ],
+ },
+ sort: {
+ values: [
+ {
+ column: 'id',
+ },
+ ],
+ },
+ options: {
+ outputColumns: ['json', 'id'],
+ },
+ };
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await select.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query:
+ 'SELECT $3:name FROM $1:name.$2:name WHERE $4:name >= $5 AND $6:name = $7 ORDER BY $8:name ASC LIMIT 5',
+ values: ['public', 'my_table', ['json', 'id'], 'id', 2, 'foo', 'data 2', 'id'],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+});
+
+describe('Test PostgresV2, update operation', () => {
+ afterEach(() => {
+ jest.clearAllMocks();
+ });
+
+ it('dataMode: define, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'update',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ },
+ dataMode: 'defineBelow',
+ columnToMatchOn: 'id',
+ valueToMatchOn: '1',
+ valuesToSend: {
+ values: [
+ {
+ column: 'json',
+ value: { text: 'some text' },
+ },
+ {
+ column: 'foo',
+ value: 'updated',
+ },
+ ],
+ },
+ options: {
+ outputColumns: ['json', 'foo'],
+ },
+ };
+ const columnsInfo: ColumnInfo[] = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await update.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ createMockDb(columnsInfo),
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query:
+ 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING $9:name',
+ values: [
+ 'public',
+ 'my_table',
+ 'id',
+ '1',
+ 'json',
+ { text: 'some text' },
+ 'foo',
+ 'updated',
+ ['json', 'foo'],
+ ],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+
+ it('dataMode: autoMapInputData, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'update',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ },
+ dataMode: 'autoMapInputData',
+ columnToMatchOn: 'id',
+ options: {},
+ };
+ const columnsInfo: ColumnInfo[] = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const inputItems = [
+ {
+ json: {
+ id: 1,
+ json: {
+ test: 15,
+ },
+ foo: 'data 1',
+ },
+ },
+ {
+ json: {
+ id: 2,
+ json: {
+ test: 10,
+ },
+ foo: 'data 2',
+ },
+ },
+ {
+ json: {
+ id: 3,
+ json: {
+ test: 5,
+ },
+ foo: 'data 3',
+ },
+ },
+ ];
+
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await update.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ inputItems,
+ nodeOptions,
+ createMockDb(columnsInfo),
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query:
+ 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING *',
+ values: ['public', 'my_table', 'id', 1, 'json', { test: 15 }, 'foo', 'data 1'],
+ },
+ {
+ query:
+ 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING *',
+ values: ['public', 'my_table', 'id', 2, 'json', { test: 10 }, 'foo', 'data 2'],
+ },
+ {
+ query:
+ 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING *',
+ values: ['public', 'my_table', 'id', 3, 'json', { test: 5 }, 'foo', 'data 3'],
+ },
+ ],
+ inputItems,
+ nodeOptions,
+ );
+ });
+});
+
+describe('Test PostgresV2, upsert operation', () => {
+ it('dataMode: define, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'upsert',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ },
+ dataMode: 'defineBelow',
+ columnToMatchOn: 'id',
+ valueToMatchOn: '5',
+ valuesToSend: {
+ values: [
+ {
+ column: 'json',
+ value: '{ "test": 5 }',
+ },
+ {
+ column: 'foo',
+ value: 'data 5',
+ },
+ ],
+ },
+ options: {
+ outputColumns: ['json'],
+ },
+ };
+ const columnsInfo: ColumnInfo[] = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await upsert.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ items,
+ nodeOptions,
+ createMockDb(columnsInfo),
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query:
+ 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING $9:name',
+ values: [
+ 'public',
+ 'my_table',
+ 'id',
+ { json: '{ "test": 5 }', foo: 'data 5', id: '5' },
+ 'json',
+ '{ "test": 5 }',
+ 'foo',
+ 'data 5',
+ ['json'],
+ ],
+ },
+ ],
+ items,
+ nodeOptions,
+ );
+ });
+
+ it('dataMode: autoMapInputData, should call runQueries with', async () => {
+ const nodeParameters: IDataObject = {
+ operation: 'upsert',
+ schema: {
+ __rl: true,
+ mode: 'list',
+ value: 'public',
+ },
+ table: {
+ __rl: true,
+ value: 'my_table',
+ mode: 'list',
+ },
+ dataMode: 'autoMapInputData',
+ columnToMatchOn: 'id',
+ options: {},
+ };
+ const columnsInfo: ColumnInfo[] = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const inputItems = [
+ {
+ json: {
+ id: 1,
+ json: {
+ test: 15,
+ },
+ foo: 'data 1',
+ },
+ },
+ {
+ json: {
+ id: 2,
+ json: {
+ test: 10,
+ },
+ foo: 'data 2',
+ },
+ },
+ {
+ json: {
+ id: 3,
+ json: {
+ test: 5,
+ },
+ foo: 'data 3',
+ },
+ },
+ ];
+
+ const nodeOptions = nodeParameters.options as IDataObject;
+
+ await upsert.execute.call(
+ createMockExecuteFunction(nodeParameters),
+ runQueries,
+ inputItems,
+ nodeOptions,
+ createMockDb(columnsInfo),
+ );
+
+ expect(runQueries).toHaveBeenCalledWith(
+ [
+ {
+ query:
+ 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING *',
+ values: [
+ 'public',
+ 'my_table',
+ 'id',
+ { id: 1, json: { test: 15 }, foo: 'data 1' },
+ 'json',
+ { test: 15 },
+ 'foo',
+ 'data 1',
+ ],
+ },
+ {
+ query:
+ 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING *',
+ values: [
+ 'public',
+ 'my_table',
+ 'id',
+ { id: 2, json: { test: 10 }, foo: 'data 2' },
+ 'json',
+ { test: 10 },
+ 'foo',
+ 'data 2',
+ ],
+ },
+ {
+ query:
+ 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING *',
+ values: [
+ 'public',
+ 'my_table',
+ 'id',
+ { id: 3, json: { test: 5 }, foo: 'data 3' },
+ 'json',
+ { test: 5 },
+ 'foo',
+ 'data 3',
+ ],
+ },
+ ],
+ inputItems,
+ nodeOptions,
+ );
+ });
+});
diff --git a/packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts b/packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts
new file mode 100644
index 0000000000..9bf94936fe
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts
@@ -0,0 +1,53 @@
+import { constructExecutionMetaData } from 'n8n-core';
+import type { IDataObject, INode } from 'n8n-workflow';
+
+import type { PgpDatabase } from '../../v2/helpers/interfaces';
+import { configureQueryRunner } from '../../v2/helpers/utils';
+
+import pgPromise from 'pg-promise';
+
+const node: INode = {
+ id: '1',
+ name: 'Postgres node',
+ typeVersion: 2,
+ type: 'n8n-nodes-base.postgres',
+ position: [60, 760],
+ parameters: {
+ operation: 'executeQuery',
+ },
+};
+
+const createMockDb = (returnData: IDataObject | IDataObject[]) => {
+ return {
+ async any() {
+ return returnData;
+ },
+ async multi() {
+ return returnData;
+ },
+ async tx() {
+ return returnData;
+ },
+ async task() {
+ return returnData;
+ },
+ } as unknown as PgpDatabase;
+};
+
+describe('Test PostgresV2, runQueries', () => {
+ it('should execute, should return success true', async () => {
+ const pgp = pgPromise();
+ const db = createMockDb([]);
+
+ const dbMultiSpy = jest.spyOn(db, 'multi');
+
+ const runQueries = configureQueryRunner(node, constructExecutionMetaData, false, pgp, db);
+
+ const result = await runQueries([{ query: 'SELECT * FROM table', values: [] }], [], {});
+
+ expect(result).toBeDefined();
+ expect(result).toHaveLength(1);
+ expect(result).toEqual([{ json: { success: true } }]);
+ expect(dbMultiSpy).toHaveBeenCalledWith('SELECT * FROM table');
+ });
+});
diff --git a/packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts b/packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts
new file mode 100644
index 0000000000..8a707d592b
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts
@@ -0,0 +1,375 @@
+import type { IDataObject, INode } from 'n8n-workflow';
+import { NodeOperationError } from 'n8n-workflow';
+import {
+ addSortRules,
+ addReturning,
+ addWhereClauses,
+ checkItemAgainstSchema,
+ parsePostgresError,
+ prepareErrorItem,
+ prepareItem,
+ replaceEmptyStringsByNulls,
+ wrapData,
+} from '../../v2/helpers/utils';
+
+const node: INode = {
+ id: '1',
+ name: 'Postgres node',
+ typeVersion: 2,
+ type: 'n8n-nodes-base.postgres',
+ position: [60, 760],
+ parameters: {
+ operation: 'executeQuery',
+ },
+};
+
+describe('Test PostgresV2, wrapData', () => {
+ it('should wrap object in json', () => {
+ const data = {
+ id: 1,
+ name: 'Name',
+ };
+ const wrappedData = wrapData(data);
+ expect(wrappedData).toBeDefined();
+ expect(wrappedData).toEqual([{ json: data }]);
+ });
+ it('should wrap each object in array in json', () => {
+ const data = [
+ {
+ id: 1,
+ name: 'Name',
+ },
+ {
+ id: 2,
+ name: 'Name 2',
+ },
+ ];
+ const wrappedData = wrapData(data);
+ expect(wrappedData).toBeDefined();
+ expect(wrappedData).toEqual([{ json: data[0] }, { json: data[1] }]);
+ });
+ it('json key from source should be inside json', () => {
+ const data = {
+ json: {
+ id: 1,
+ name: 'Name',
+ },
+ };
+ const wrappedData = wrapData(data);
+ expect(wrappedData).toBeDefined();
+ expect(wrappedData).toEqual([{ json: data }]);
+ expect(Object.keys(wrappedData[0].json)).toContain('json');
+ });
+});
+
+describe('Test PostgresV2, prepareErrorItem', () => {
+ it('should return error info item', () => {
+ const items = [
+ {
+ json: {
+ id: 1,
+ name: 'Name 1',
+ },
+ },
+ {
+ json: {
+ id: 2,
+ name: 'Name 2',
+ },
+ },
+ ];
+
+ const error = new Error('Test error');
+ const item = prepareErrorItem(items, error, 1);
+ expect(item).toBeDefined();
+
+ expect((item.json.item as IDataObject)?.id).toEqual(2);
+ expect(item.json.message).toEqual('Test error');
+ expect(item.json.error).toBeDefined();
+ });
+});
+
+describe('Test PostgresV2, parsePostgresError', () => {
+ it('should return NodeOperationError', () => {
+ const error = new Error('Test error');
+
+ const parsedError = parsePostgresError(node, error, [], 1);
+ expect(parsedError).toBeDefined();
+ expect(parsedError.message).toEqual('Test error');
+ expect(parsedError instanceof NodeOperationError).toEqual(true);
+ });
+
+ it('should update message that includes ECONNREFUSED', () => {
+ const error = new Error('ECONNREFUSED');
+
+ const parsedError = parsePostgresError(node, error, [], 1);
+ expect(parsedError).toBeDefined();
+ expect(parsedError.message).toEqual('Connection refused');
+ expect(parsedError instanceof NodeOperationError).toEqual(true);
+ });
+
+ it('should update message with syntax error', () => {
+ // eslint-disable-next-line n8n-local-rules/no-unneeded-backticks
+ const errorMessage = String.raw`syntax error at or near "seelect"`;
+ const error = new Error();
+ error.message = errorMessage;
+
+ const parsedError = parsePostgresError(node, error, [
+ { query: 'seelect * from my_table', values: [] },
+ ]);
+ expect(parsedError).toBeDefined();
+ expect(parsedError.message).toEqual('Syntax error at line 1 near "seelect"');
+ expect(parsedError instanceof NodeOperationError).toEqual(true);
+ });
+});
+
+describe('Test PostgresV2, addWhereClauses', () => {
+ it('should add where clauses to query', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const whereClauses = [{ column: 'id', condition: 'equal', value: '1' }];
+
+ const [updatedQuery, updatedValues] = addWhereClauses(
+ node,
+ 0,
+ query,
+ whereClauses,
+ values,
+ 'AND',
+ );
+
+ expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name WHERE $3:name = $4');
+ expect(updatedValues).toEqual(['public', 'my_table', 'id', '1']);
+ });
+
+ it('should combine where clauses by OR', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const whereClauses = [
+ { column: 'id', condition: 'equal', value: '1' },
+ { column: 'foo', condition: 'equal', value: 'select 2' },
+ ];
+
+ const [updatedQuery, updatedValues] = addWhereClauses(
+ node,
+ 0,
+ query,
+ whereClauses,
+ values,
+ 'OR',
+ );
+
+ expect(updatedQuery).toEqual(
+ 'SELECT * FROM $1:name.$2:name WHERE $3:name = $4 OR $5:name = $6',
+ );
+ expect(updatedValues).toEqual(['public', 'my_table', 'id', '1', 'foo', 'select 2']);
+ });
+
+ it('should ignore incorect combine conition ad use AND', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const whereClauses = [
+ { column: 'id', condition: 'equal', value: '1' },
+ { column: 'foo', condition: 'equal', value: 'select 2' },
+ ];
+
+ const [updatedQuery, updatedValues] = addWhereClauses(
+ node,
+ 0,
+ query,
+ whereClauses,
+ values,
+ 'SELECT * FROM my_table',
+ );
+
+ expect(updatedQuery).toEqual(
+ 'SELECT * FROM $1:name.$2:name WHERE $3:name = $4 AND $5:name = $6',
+ );
+ expect(updatedValues).toEqual(['public', 'my_table', 'id', '1', 'foo', 'select 2']);
+ });
+});
+
+describe('Test PostgresV2, addSortRules', () => {
+ it('should ORDER BY ASC', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const sortRules = [{ column: 'id', direction: 'ASC' }];
+
+ const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values);
+
+ expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name ORDER BY $3:name ASC');
+ expect(updatedValues).toEqual(['public', 'my_table', 'id']);
+ });
+ it('should ORDER BY DESC', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const sortRules = [{ column: 'id', direction: 'DESC' }];
+
+ const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values);
+
+ expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name ORDER BY $3:name DESC');
+ expect(updatedValues).toEqual(['public', 'my_table', 'id']);
+ });
+ it('should ignore incorect direction', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const sortRules = [{ column: 'id', direction: 'SELECT * FROM my_table' }];
+
+ const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values);
+
+ expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name ORDER BY $3:name ASC');
+ expect(updatedValues).toEqual(['public', 'my_table', 'id']);
+ });
+ it('should add multiple sort rules', () => {
+ const query = 'SELECT * FROM $1:name.$2:name';
+ const values = ['public', 'my_table'];
+ const sortRules = [
+ { column: 'id', direction: 'ASC' },
+ { column: 'foo', direction: 'DESC' },
+ ];
+
+ const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values);
+
+ expect(updatedQuery).toEqual(
+ 'SELECT * FROM $1:name.$2:name ORDER BY $3:name ASC, $4:name DESC',
+ );
+ expect(updatedValues).toEqual(['public', 'my_table', 'id', 'foo']);
+ });
+});
+
+describe('Test PostgresV2, addReturning', () => {
+ it('should add RETURNING', () => {
+ const query = 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4';
+ const values = ['public', 'my_table', 'id', '1', 'foo', 'updated'];
+ const outputColumns = ['id', 'foo'];
+
+ const [updatedQuery, updatedValues] = addReturning(query, outputColumns, values);
+
+ expect(updatedQuery).toEqual(
+ 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4 RETURNING $7:name',
+ );
+ expect(updatedValues).toEqual([
+ 'public',
+ 'my_table',
+ 'id',
+ '1',
+ 'foo',
+ 'updated',
+ ['id', 'foo'],
+ ]);
+ });
+ it('should add RETURNING *', () => {
+ const query = 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4';
+ const values = ['public', 'my_table', 'id', '1', 'foo', 'updated'];
+ const outputColumns = ['id', 'foo', '*'];
+
+ const [updatedQuery, updatedValues] = addReturning(query, outputColumns, values);
+
+ expect(updatedQuery).toEqual(
+ 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4 RETURNING *',
+ );
+ expect(updatedValues).toEqual(['public', 'my_table', 'id', '1', 'foo', 'updated']);
+ });
+});
+
+describe('Test PostgresV2, replaceEmptyStringsByNulls', () => {
+ it('should replace empty string by null', () => {
+ const items = [
+ { json: { foo: 'bar', bar: '', spam: undefined } },
+ { json: { foo: '', bar: '', spam: '' } },
+ { json: { foo: 0, bar: NaN, spam: false } },
+ ];
+
+ const updatedItems = replaceEmptyStringsByNulls(items, true);
+
+ expect(updatedItems).toBeDefined();
+ expect(updatedItems).toEqual([
+ { json: { foo: 'bar', bar: null, spam: undefined } },
+ { json: { foo: null, bar: null, spam: null } },
+ { json: { foo: 0, bar: NaN, spam: false } },
+ ]);
+ });
+ it('should do nothing', () => {
+ const items = [
+ { json: { foo: 'bar', bar: '', spam: undefined } },
+ { json: { foo: '', bar: '', spam: '' } },
+ { json: { foo: 0, bar: NaN, spam: false } },
+ ];
+
+ const updatedItems = replaceEmptyStringsByNulls(items);
+
+ expect(updatedItems).toBeDefined();
+ expect(updatedItems).toEqual(items);
+ });
+});
+
+describe('Test PostgresV2, prepareItem', () => {
+ it('should convert fixedColections values to object', () => {
+ const values = [
+ {
+ column: 'id',
+ value: '1',
+ },
+ {
+ column: 'foo',
+ value: 'bar',
+ },
+ {
+ column: 'bar',
+ value: 'foo',
+ },
+ ];
+
+ const item = prepareItem(values);
+
+ expect(item).toBeDefined();
+ expect(item).toEqual({
+ id: '1',
+ foo: 'bar',
+ bar: 'foo',
+ });
+ });
+});
+
+describe('Test PostgresV2, checkItemAgainstSchema', () => {
+ it('should not throw error', () => {
+ const item = { foo: 'updated', id: 2 };
+ const columnsInfo = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ const result = checkItemAgainstSchema(node, item, columnsInfo, 0);
+
+ expect(result).toBeDefined();
+ expect(result).toEqual(item);
+ });
+ it('should throw error on not existing column', () => {
+ const item = { foo: 'updated', bar: 'updated' };
+ const columnsInfo = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'json', data_type: 'json', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ try {
+ checkItemAgainstSchema(node, item, columnsInfo, 0);
+ } catch (error) {
+ expect(error.message).toEqual("Column 'bar' does not exist in selected table");
+ }
+ });
+ it('should throw error on not nullable column', () => {
+ const item = { foo: null };
+ const columnsInfo = [
+ { column_name: 'id', data_type: 'integer', is_nullable: 'NO' },
+ { column_name: 'foo', data_type: 'text', is_nullable: 'NO' },
+ ];
+
+ try {
+ checkItemAgainstSchema(node, item, columnsInfo, 0);
+ } catch (error) {
+ expect(error.message).toEqual("Column 'foo' is not nullable");
+ }
+ });
+});
diff --git a/packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts b/packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts
new file mode 100644
index 0000000000..ef5cef53e6
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts
@@ -0,0 +1,424 @@
+/* eslint-disable n8n-nodes-base/node-filename-against-convention */
+import type { IExecuteFunctions } from 'n8n-core';
+import type {
+ ICredentialsDecrypted,
+ ICredentialTestFunctions,
+ IDataObject,
+ INodeCredentialTestResult,
+ INodeExecutionData,
+ INodeType,
+ INodeTypeBaseDescription,
+ INodeTypeDescription,
+} from 'n8n-workflow';
+import { NodeOperationError } from 'n8n-workflow';
+
+import pgPromise from 'pg-promise';
+
+import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './genericFunctions';
+
+const versionDescription: INodeTypeDescription = {
+ displayName: 'Postgres',
+ name: 'postgres',
+ icon: 'file:postgres.svg',
+ group: ['input'],
+ version: 1,
+ description: 'Get, add and update data in Postgres',
+ defaults: {
+ name: 'Postgres',
+ },
+ inputs: ['main'],
+ outputs: ['main'],
+ credentials: [
+ {
+ name: 'postgres',
+ required: true,
+ testedBy: 'postgresConnectionTest',
+ },
+ ],
+ properties: [
+ {
+ displayName: 'Version 1',
+ name: 'versionNotice',
+ type: 'notice',
+ default: '',
+ },
+ {
+ displayName: 'Operation',
+ name: 'operation',
+ type: 'options',
+ noDataExpression: true,
+ options: [
+ {
+ name: 'Execute Query',
+ value: 'executeQuery',
+ description: 'Execute an SQL query',
+ action: 'Execute a SQL query',
+ },
+ {
+ name: 'Insert',
+ value: 'insert',
+ description: 'Insert rows in database',
+ action: 'Insert rows in database',
+ },
+ {
+ name: 'Update',
+ value: 'update',
+ description: 'Update rows in database',
+ action: 'Update rows in database',
+ },
+ ],
+ default: 'insert',
+ },
+
+ // ----------------------------------
+ // executeQuery
+ // ----------------------------------
+ {
+ displayName: 'Query',
+ name: 'query',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['executeQuery'],
+ },
+ },
+ default: '',
+ placeholder: 'SELECT id, name FROM product WHERE quantity > $1 AND price <= $2',
+ required: true,
+ description:
+ 'The SQL query to execute. You can use n8n expressions or $1 and $2 in conjunction with query parameters.',
+ },
+ // ----------------------------------
+ // insert
+ // ----------------------------------
+ {
+ displayName: 'Schema',
+ name: 'schema',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['insert'],
+ },
+ },
+ default: 'public',
+ required: true,
+ description: 'Name of the schema the table belongs to',
+ },
+ {
+ displayName: 'Table',
+ name: 'table',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['insert'],
+ },
+ },
+ default: '',
+ required: true,
+ description: 'Name of the table in which to insert data to',
+ },
+ {
+ displayName: 'Columns',
+ name: 'columns',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['insert'],
+ },
+ },
+ default: '',
+ // eslint-disable-next-line n8n-nodes-base/node-param-placeholder-miscased-id
+ placeholder: 'id:int,name:text,description',
+ // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id
+ description:
+ 'Comma-separated list of the properties which should used as columns for the new rows. You can use type casting with colons (:) like id:int.',
+ },
+
+ // ----------------------------------
+ // update
+ // ----------------------------------
+ {
+ displayName: 'Schema',
+ name: 'schema',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['update'],
+ },
+ },
+ default: 'public',
+ description: 'Name of the schema the table belongs to',
+ },
+ {
+ displayName: 'Table',
+ name: 'table',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['update'],
+ },
+ },
+ default: '',
+ required: true,
+ description: 'Name of the table in which to update data in',
+ },
+ {
+ displayName: 'Update Key',
+ name: 'updateKey',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['update'],
+ },
+ },
+ default: 'id',
+ required: true,
+ // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id
+ description:
+ 'Comma-separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".',
+ },
+ {
+ displayName: 'Columns',
+ name: 'columns',
+ type: 'string',
+ displayOptions: {
+ show: {
+ operation: ['update'],
+ },
+ },
+ default: '',
+ placeholder: 'name:text,description',
+ // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id
+ description:
+ 'Comma-separated list of the properties which should used as columns for rows to update. You can use type casting with colons (:) like id:int.',
+ },
+
+ // ----------------------------------
+ // insert,update
+ // ----------------------------------
+ {
+ displayName: 'Return Fields',
+ name: 'returnFields',
+ type: 'string',
+ requiresDataPath: 'multiple',
+ displayOptions: {
+ show: {
+ operation: ['insert', 'update'],
+ },
+ },
+ default: '*',
+ description: 'Comma-separated list of the fields that the operation will return',
+ },
+ // ----------------------------------
+ // Additional fields
+ // ----------------------------------
+ {
+ displayName: 'Additional Fields',
+ name: 'additionalFields',
+ type: 'collection',
+ placeholder: 'Add Field',
+ default: {},
+ options: [
+ {
+ displayName: 'Mode',
+ name: 'mode',
+ type: 'options',
+ options: [
+ {
+ name: 'Independently',
+ value: 'independently',
+ description: 'Execute each query independently',
+ },
+ {
+ name: 'Multiple Queries',
+ value: 'multiple',
+ description: 'Default. Sends multiple queries at once to database.',
+ },
+ {
+ name: 'Transaction',
+ value: 'transaction',
+ description: 'Executes all queries in a single transaction',
+ },
+ ],
+ default: 'multiple',
+ description:
+ 'The way queries should be sent to database. Can be used in conjunction with Continue on Fail. See the docs for more examples',
+ },
+ {
+ displayName: 'Output Large-Format Numbers As',
+ name: 'largeNumbersOutput',
+ type: 'options',
+ options: [
+ {
+ name: 'Numbers',
+ value: 'numbers',
+ },
+ {
+ name: 'Text',
+ value: 'text',
+ description:
+ 'Use this if you expect numbers longer than 16 digits (otherwise numbers may be incorrect)',
+ },
+ ],
+ hint: 'Applies to NUMERIC and BIGINT columns only',
+ default: 'text',
+ },
+ {
+ displayName: 'Query Parameters',
+ name: 'queryParams',
+ type: 'string',
+ displayOptions: {
+ show: {
+ '/operation': ['executeQuery'],
+ },
+ },
+ default: '',
+ placeholder: 'quantity,price',
+ description:
+ 'Comma-separated list of properties which should be used as query parameters',
+ },
+ ],
+ },
+ ],
+};
+
+export class PostgresV1 implements INodeType {
+ description: INodeTypeDescription;
+
+ constructor(baseDescription: INodeTypeBaseDescription) {
+ this.description = {
+ ...baseDescription,
+ ...versionDescription,
+ };
+ }
+
+ methods = {
+ credentialTest: {
+ async postgresConnectionTest(
+ this: ICredentialTestFunctions,
+ credential: ICredentialsDecrypted,
+ ): Promise {
+ const credentials = credential.data as IDataObject;
+ try {
+ const pgp = pgPromise();
+ const config: IDataObject = {
+ host: credentials.host as string,
+ port: credentials.port as number,
+ database: credentials.database as string,
+ user: credentials.user as string,
+ password: credentials.password as string,
+ };
+
+ if (credentials.allowUnauthorizedCerts === true) {
+ config.ssl = {
+ rejectUnauthorized: false,
+ };
+ } else {
+ config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
+ config.sslmode = (credentials.ssl as string) || 'disable';
+ }
+
+ const db = pgp(config);
+ await db.connect();
+ pgp.end();
+ } catch (error) {
+ return {
+ status: 'Error',
+ message: error.message,
+ };
+ }
+ return {
+ status: 'OK',
+ message: 'Connection successful!',
+ };
+ },
+ },
+ };
+
+ async execute(this: IExecuteFunctions): Promise {
+ const credentials = await this.getCredentials('postgres');
+ const largeNumbersOutput = this.getNodeParameter(
+ 'additionalFields.largeNumbersOutput',
+ 0,
+ '',
+ ) as string;
+
+ const pgp = pgPromise();
+
+ if (largeNumbersOutput === 'numbers') {
+ pgp.pg.types.setTypeParser(20, (value: string) => {
+ return parseInt(value, 10);
+ });
+ pgp.pg.types.setTypeParser(1700, (value: string) => {
+ return parseFloat(value);
+ });
+ }
+
+ const config: IDataObject = {
+ host: credentials.host as string,
+ port: credentials.port as number,
+ database: credentials.database as string,
+ user: credentials.user as string,
+ password: credentials.password as string,
+ };
+
+ if (credentials.allowUnauthorizedCerts === true) {
+ config.ssl = {
+ rejectUnauthorized: false,
+ };
+ } else {
+ config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
+ config.sslmode = (credentials.ssl as string) || 'disable';
+ }
+
+ const db = pgp(config);
+
+ let returnItems: INodeExecutionData[] = [];
+
+ const items = this.getInputData();
+ const operation = this.getNodeParameter('operation', 0);
+
+ if (operation === 'executeQuery') {
+ // ----------------------------------
+ // executeQuery
+ // ----------------------------------
+
+ const queryResult = await pgQueryV2.call(this, pgp, db, items, this.continueOnFail());
+ returnItems = queryResult as INodeExecutionData[];
+ } else if (operation === 'insert') {
+ // ----------------------------------
+ // insert
+ // ----------------------------------
+
+ const insertData = await pgInsertV2.call(this, pgp, db, items, this.continueOnFail());
+
+ // returnItems = this.helpers.returnJsonArray(insertData);
+ returnItems = insertData as INodeExecutionData[];
+ } else if (operation === 'update') {
+ // ----------------------------------
+ // update
+ // ----------------------------------
+
+ const updateItems = await pgUpdate(
+ this.getNodeParameter,
+ pgp,
+ db,
+ items,
+ this.continueOnFail(),
+ );
+
+ returnItems = wrapData(updateItems);
+ } else {
+ pgp.end();
+ throw new NodeOperationError(
+ this.getNode(),
+ `The operation "${operation}" is not supported!`,
+ );
+ }
+
+ // Close the connection
+ pgp.end();
+
+ return this.prepareOutputData(returnItems);
+ }
+}
diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts b/packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts
similarity index 100%
rename from packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts
rename to packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts
diff --git a/packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts b/packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts
new file mode 100644
index 0000000000..e435f4ddbe
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts
@@ -0,0 +1,29 @@
+/* eslint-disable n8n-nodes-base/node-filename-against-convention */
+import type { IExecuteFunctions } from 'n8n-core';
+import type {
+ INodeExecutionData,
+ INodeType,
+ INodeTypeBaseDescription,
+ INodeTypeDescription,
+} from 'n8n-workflow';
+import { router } from './actions/router';
+
+import { versionDescription } from './actions/versionDescription';
+import { credentialTest, listSearch, loadOptions } from './methods';
+
+export class PostgresV2 implements INodeType {
+ description: INodeTypeDescription;
+
+ constructor(baseDescription: INodeTypeBaseDescription) {
+ this.description = {
+ ...baseDescription,
+ ...versionDescription,
+ };
+ }
+
+ methods = { credentialTest, listSearch, loadOptions };
+
+ async execute(this: IExecuteFunctions): Promise {
+ return router.call(this);
+ }
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts b/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts
new file mode 100644
index 0000000000..0ea7b35962
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts
@@ -0,0 +1,335 @@
+import type { INodeProperties } from 'n8n-workflow';
+
+export const optionsCollection: INodeProperties = {
+ displayName: 'Options',
+ name: 'options',
+ type: 'collection',
+ placeholder: 'Add Option',
+ default: {},
+ options: [
+ {
+ displayName: 'Cascade',
+ name: 'cascade',
+ type: 'boolean',
+ default: false,
+ description:
+ 'Whether to drop all objects that depend on the table, such as views and sequences',
+ displayOptions: {
+ show: {
+ '/operation': ['deleteTable'],
+ },
+ hide: {
+ '/deleteCommand': ['delete'],
+ },
+ },
+ },
+ {
+ displayName: 'Connection Timeout',
+ name: 'connectionTimeout',
+ type: 'number',
+ default: 30,
+ description: 'Number of seconds reserved for connecting to the database',
+ },
+ {
+ displayName: 'Query Batching',
+ name: 'queryBatching',
+ type: 'options',
+ noDataExpression: true,
+ options: [
+ {
+ name: 'Single Query',
+ value: 'single',
+ description: 'A single query for all incoming items',
+ },
+ {
+ name: 'Independently',
+ value: 'independently',
+ description: 'Execute one query per incoming item of the run',
+ },
+ {
+ name: 'Transaction',
+ value: 'transaction',
+ description:
+ 'Execute all queries in a transaction, if a failure occurs, all changes are rolled back',
+ },
+ ],
+ default: 'single',
+ description: 'The way queries should be sent to the database',
+ },
+ {
+ displayName: 'Query Parameters',
+ name: 'queryReplacement',
+ type: 'string',
+ default: '',
+ description:
+ 'Comma-separated list of the values you want to use as query parameters. More info.',
+ hint: 'Comma-separated list of values: reference them in your query as $1, $2, $3…',
+ placeholder: 'e.g. value1,value2,value3',
+ displayOptions: {
+ show: { '/operation': ['executeQuery'] },
+ },
+ },
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-multi-options
+ displayName: 'Output Columns',
+ name: 'outputColumns',
+ type: 'multiOptions',
+ description:
+ 'Choose from the list, or specify IDs using an expression',
+ typeOptions: {
+ loadOptionsMethod: 'getColumnsMultiOptions',
+ loadOptionsDependsOn: ['table.value'],
+ },
+ default: [],
+ displayOptions: {
+ show: { '/operation': ['select', 'insert', 'update', 'upsert'] },
+ },
+ },
+ {
+ displayName: 'Output Large-Format Numbers As',
+ name: 'largeNumbersOutput',
+ type: 'options',
+ options: [
+ {
+ name: 'Numbers',
+ value: 'numbers',
+ },
+ {
+ name: 'Text',
+ value: 'text',
+ description:
+ 'Use this if you expect numbers longer than 16 digits (otherwise numbers may be incorrect)',
+ },
+ ],
+ hint: 'Applies to NUMERIC and BIGINT columns only',
+ default: 'text',
+ },
+ {
+ displayName: 'Skip on Conflict',
+ name: 'skipOnConflict',
+ type: 'boolean',
+ default: false,
+ description:
+ 'Whether to skip the row and do not throw error if a unique constraint or exclusion constraint is violated',
+ displayOptions: {
+ show: {
+ '/operation': ['insert'],
+ },
+ },
+ },
+ {
+ displayName: 'Replace Empty Strings with NULL',
+ name: 'replaceEmptyStrings',
+ type: 'boolean',
+ default: false,
+ description:
+ 'Whether to replace empty strings with NULL in input, could be useful when data come from spreadsheet',
+ displayOptions: {
+ show: {
+ '/operation': ['insert', 'update', 'upsert', 'executeQuery'],
+ },
+ },
+ },
+ ],
+};
+
+export const schemaRLC: INodeProperties = {
+ displayName: 'Schema',
+ name: 'schema',
+ type: 'resourceLocator',
+ default: { mode: 'list', value: 'public' },
+ required: true,
+ placeholder: 'e.g. public',
+ description: 'The schema that contains the table you want to work on',
+ modes: [
+ {
+ displayName: 'From List',
+ name: 'list',
+ type: 'list',
+ typeOptions: {
+ searchListMethod: 'schemaSearch',
+ },
+ },
+ {
+ displayName: 'By Name',
+ name: 'name',
+ type: 'string',
+ },
+ ],
+};
+
+export const tableRLC: INodeProperties = {
+ displayName: 'Table',
+ name: 'table',
+ type: 'resourceLocator',
+ default: { mode: 'list', value: '' },
+ required: true,
+ description: 'The table you want to work on',
+ modes: [
+ {
+ displayName: 'From List',
+ name: 'list',
+ type: 'list',
+ typeOptions: {
+ searchListMethod: 'tableSearch',
+ },
+ },
+ {
+ displayName: 'By Name',
+ name: 'name',
+ type: 'string',
+ },
+ ],
+};
+
+export const whereFixedCollection: INodeProperties = {
+ displayName: 'Select Rows',
+ name: 'where',
+ type: 'fixedCollection',
+ typeOptions: {
+ multipleValues: true,
+ },
+ placeholder: 'Add Condition',
+ default: {},
+ description: 'If not set, all rows will be selected',
+ options: [
+ {
+ displayName: 'Values',
+ name: 'values',
+ values: [
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column',
+ name: 'column',
+ type: 'options',
+ description:
+ 'Choose from the list, or specify an ID using an expression',
+ default: '',
+ placeholder: 'e.g. ID',
+ typeOptions: {
+ loadOptionsMethod: 'getColumns',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ },
+ {
+ displayName: 'Operator',
+ name: 'condition',
+ type: 'options',
+ description:
+ "The operator to check the column against. When using 'LIKE' operator percent sign ( %) matches zero or more characters, underscore ( _ ) matches any single character.",
+ // eslint-disable-next-line n8n-nodes-base/node-param-options-type-unsorted-items
+ options: [
+ {
+ name: 'Equal',
+ value: 'equal',
+ },
+ {
+ name: 'Not Equal',
+ value: '!=',
+ },
+ {
+ name: 'Like',
+ value: 'LIKE',
+ },
+ {
+ name: 'Greater Than',
+ value: '>',
+ },
+ {
+ name: 'Less Than',
+ value: '<',
+ },
+ {
+ name: 'Greater Than Or Equal',
+ value: '>=',
+ },
+ {
+ name: 'Less Than Or Equal',
+ value: '<=',
+ },
+ {
+ name: 'Is Null',
+ value: 'IS NULL',
+ },
+ ],
+ default: 'equal',
+ },
+ {
+ displayName: 'Value',
+ name: 'value',
+ type: 'string',
+ default: '',
+ },
+ ],
+ },
+ ],
+};
+
+export const sortFixedCollection: INodeProperties = {
+ displayName: 'Sort',
+ name: 'sort',
+ type: 'fixedCollection',
+ typeOptions: {
+ multipleValues: true,
+ },
+ placeholder: 'Add Sort Rule',
+ default: {},
+ options: [
+ {
+ displayName: 'Values',
+ name: 'values',
+ values: [
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column',
+ name: 'column',
+ type: 'options',
+ description:
+ 'Choose from the list, or specify an ID using an expression',
+ default: '',
+ typeOptions: {
+ loadOptionsMethod: 'getColumns',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ },
+ {
+ displayName: 'Direction',
+ name: 'direction',
+ type: 'options',
+ options: [
+ {
+ name: 'ASC',
+ value: 'ASC',
+ },
+ {
+ name: 'DESC',
+ value: 'DESC',
+ },
+ ],
+ default: 'ASC',
+ },
+ ],
+ },
+ ],
+};
+
+export const combineConditionsCollection: INodeProperties = {
+ displayName: 'Combine Conditions',
+ name: 'combineConditions',
+ type: 'options',
+ description:
+ 'How to combine the conditions defined in "Select Rows": AND requires all conditions to be true, OR requires at least one condition to be true',
+ options: [
+ {
+ name: 'AND',
+ value: 'AND',
+ description: 'Only rows that meet all the conditions are selected',
+ },
+ {
+ name: 'OR',
+ value: 'OR',
+ description: 'Rows that meet at least one condition are selected',
+ },
+ ],
+ default: 'AND',
+};
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts
new file mode 100644
index 0000000000..125ebaa1d7
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts
@@ -0,0 +1,74 @@
+import type { INodeProperties } from 'n8n-workflow';
+import { schemaRLC, tableRLC } from '../common.descriptions';
+
+import * as deleteTable from './deleteTable.operation';
+import * as executeQuery from './executeQuery.operation';
+import * as insert from './insert.operation';
+import * as select from './select.operation';
+import * as update from './update.operation';
+import * as upsert from './upsert.operation';
+
+export { deleteTable, executeQuery, insert, select, update, upsert };
+
+export const description: INodeProperties[] = [
+ {
+ displayName: 'Operation',
+ name: 'operation',
+ type: 'options',
+ noDataExpression: true,
+ options: [
+ {
+ name: 'Delete',
+ value: 'deleteTable',
+ description: 'Delete an entire table or rows in a table',
+ action: 'Delete table or rows',
+ },
+ {
+ name: 'Execute Query',
+ value: 'executeQuery',
+ description: 'Execute an SQL query',
+ action: 'Execute a SQL query',
+ },
+ {
+ name: 'Insert',
+ value: 'insert',
+ description: 'Insert rows in a table',
+ action: 'Insert rows in a table',
+ },
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-option-name-wrong-for-upsert
+ name: 'Insert or Update',
+ value: 'upsert',
+ // eslint-disable-next-line n8n-nodes-base/node-param-description-wrong-for-upsert
+ description: 'Insert or update rows in a table',
+ action: 'Insert or update rows in a table',
+ },
+ {
+ name: 'Select',
+ value: 'select',
+ description: 'Select rows from a table',
+ action: 'Select rows from a table',
+ },
+ {
+ name: 'Update',
+ value: 'update',
+ description: 'Update rows in a table',
+ action: 'Update rows in a table',
+ },
+ ],
+ displayOptions: {
+ show: {
+ resource: ['database'],
+ },
+ },
+ default: 'insert',
+ },
+ { ...schemaRLC, displayOptions: { hide: { operation: ['executeQuery'] } } },
+ { ...tableRLC, displayOptions: { hide: { operation: ['executeQuery'] } } },
+ ...deleteTable.description,
+ ...executeQuery.description,
+ ...insert.description,
+ ...select.description,
+ ...update.description,
+ ...upsert.description,
+];
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts
new file mode 100644
index 0000000000..c0d4eba9a4
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts
@@ -0,0 +1,159 @@
+import type { IExecuteFunctions } from 'n8n-core';
+import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
+import { NodeOperationError } from 'n8n-workflow';
+
+import { updateDisplayOptions } from '../../../../../utils/utilities';
+
+import type {
+ PgpDatabase,
+ QueriesRunner,
+ QueryValues,
+ QueryWithValues,
+ WhereClause,
+} from '../../helpers/interfaces';
+
+import { addWhereClauses } from '../../helpers/utils';
+
+import {
+ combineConditionsCollection,
+ optionsCollection,
+ whereFixedCollection,
+} from '../common.descriptions';
+
+const properties: INodeProperties[] = [
+ {
+ displayName: 'Command',
+ name: 'deleteCommand',
+ type: 'options',
+ default: 'truncate',
+ options: [
+ {
+ name: 'Truncate',
+ value: 'truncate',
+ description: "Only removes the table's data and preserves the table's structure",
+ },
+ {
+ name: 'Delete',
+ value: 'delete',
+ description:
+ "Delete the rows that match the 'Select Rows' conditions below. If no selection is made, all rows in the table are deleted.",
+ },
+ {
+ name: 'Drop',
+ value: 'drop',
+ description: "Deletes the table's data and also the table's structure permanently",
+ },
+ ],
+ },
+ {
+ displayName: 'Restart Sequences',
+ name: 'restartSequences',
+ type: 'boolean',
+ default: false,
+ description: 'Whether to reset identity (auto-increment) columns to their initial values',
+ displayOptions: {
+ show: {
+ deleteCommand: ['truncate'],
+ },
+ },
+ },
+ {
+ ...whereFixedCollection,
+ displayOptions: {
+ show: {
+ deleteCommand: ['delete'],
+ },
+ },
+ },
+ {
+ ...combineConditionsCollection,
+ displayOptions: {
+ show: {
+ deleteCommand: ['delete'],
+ },
+ },
+ },
+ optionsCollection,
+];
+
+const displayOptions = {
+ show: {
+ resource: ['database'],
+ operation: ['deleteTable'],
+ },
+ hide: {
+ table: [''],
+ },
+};
+
+export const description = updateDisplayOptions(displayOptions, properties);
+
+export async function execute(
+ this: IExecuteFunctions,
+ runQueries: QueriesRunner,
+ items: INodeExecutionData[],
+ nodeOptions: IDataObject,
+ _db?: PgpDatabase,
+): Promise {
+ const queries: QueryWithValues[] = [];
+
+ for (let i = 0; i < items.length; i++) {
+ const options = this.getNodeParameter('options', i, {});
+
+ const schema = this.getNodeParameter('schema', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const table = this.getNodeParameter('table', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const deleteCommand = this.getNodeParameter('deleteCommand', i) as string;
+
+ let query = '';
+ let values: QueryValues = [schema, table];
+
+ if (deleteCommand === 'drop') {
+ const cascade = options.cascade ? ' CASCADE' : '';
+ query = `DROP TABLE IF EXISTS $1:name.$2:name${cascade}`;
+ }
+
+ if (deleteCommand === 'truncate') {
+ const identity = this.getNodeParameter('restartSequences', i, false)
+ ? ' RESTART IDENTITY'
+ : '';
+ const cascade = options.cascade ? ' CASCADE' : '';
+ query = `TRUNCATE TABLE $1:name.$2:name${identity}${cascade}`;
+ }
+
+ if (deleteCommand === 'delete') {
+ const whereClauses =
+ ((this.getNodeParameter('where', i, []) as IDataObject).values as WhereClause[]) || [];
+
+ const combineConditions = this.getNodeParameter('combineConditions', i, 'AND') as string;
+
+ [query, values] = addWhereClauses(
+ this.getNode(),
+ i,
+ 'DELETE FROM $1:name.$2:name',
+ whereClauses,
+ values,
+ combineConditions,
+ );
+ }
+
+ if (query === '') {
+ throw new NodeOperationError(
+ this.getNode(),
+ 'Invalid delete command, only drop, delete and truncate are supported ',
+ { itemIndex: i },
+ );
+ }
+
+ const queryWithValues = { query, values };
+
+ queries.push(queryWithValues);
+ }
+
+ return runQueries(queries, items, nodeOptions);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts
new file mode 100644
index 0000000000..41aedcf156
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts
@@ -0,0 +1,84 @@
+import type { IExecuteFunctions } from 'n8n-core';
+import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
+import { NodeOperationError } from 'n8n-workflow';
+
+import { updateDisplayOptions } from '../../../../../utils/utilities';
+
+import type { PgpDatabase, QueriesRunner, QueryWithValues } from '../../helpers/interfaces';
+
+import { replaceEmptyStringsByNulls } from '../../helpers/utils';
+
+import { optionsCollection } from '../common.descriptions';
+
+const properties: INodeProperties[] = [
+ {
+ displayName: 'Query',
+ name: 'query',
+ type: 'string',
+ default: '',
+ placeholder: 'e.g. SELECT id, name FROM product WHERE quantity > $1 AND price <= $2',
+ required: true,
+ description:
+ "The SQL query to execute. You can use n8n expressions and $1, $2, $3, etc to refer to the 'Query Parameters' set in options below.",
+ typeOptions: {
+ rows: 3,
+ },
+ hint: 'Prefer using query parameters over n8n expressions to avoid SQL injection attacks',
+ },
+ {
+ displayName: `
+ To use query parameters in your SQL query, reference them as $1, $2, $3, etc in the corresponding order. More info.
+ `,
+ name: 'notice',
+ type: 'notice',
+ default: '',
+ },
+ optionsCollection,
+];
+
+const displayOptions = {
+ show: {
+ resource: ['database'],
+ operation: ['executeQuery'],
+ },
+};
+
+export const description = updateDisplayOptions(displayOptions, properties);
+
+export async function execute(
+ this: IExecuteFunctions,
+ runQueries: QueriesRunner,
+ items: INodeExecutionData[],
+ nodeOptions: IDataObject,
+ _db?: PgpDatabase,
+): Promise {
+ items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
+
+ const queries: QueryWithValues[] = [];
+
+ for (let i = 0; i < items.length; i++) {
+ const query = this.getNodeParameter('query', i) as string;
+
+ let values: IDataObject[] = [];
+
+ let queryReplacement = this.getNodeParameter('options.queryReplacement', i, '');
+
+ if (typeof queryReplacement === 'string') {
+ queryReplacement = queryReplacement.split(',').map((entry) => entry.trim());
+ }
+
+ if (Array.isArray(queryReplacement)) {
+ values = queryReplacement as IDataObject[];
+ } else {
+ throw new NodeOperationError(
+ this.getNode(),
+ 'Query Replacement must be a string of comma-separated values, or an array of values',
+ { itemIndex: i },
+ );
+ }
+
+ queries.push({ query, values });
+ }
+
+ return runQueries(queries, items, nodeOptions);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts
new file mode 100644
index 0000000000..a30b178bff
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts
@@ -0,0 +1,172 @@
+import type { IExecuteFunctions } from 'n8n-core';
+import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
+
+import { updateDisplayOptions } from '../../../../../utils/utilities';
+
+import type {
+ PgpDatabase,
+ QueriesRunner,
+ QueryValues,
+ QueryWithValues,
+} from '../../helpers/interfaces';
+
+import {
+ addReturning,
+ checkItemAgainstSchema,
+ getTableSchema,
+ prepareItem,
+ replaceEmptyStringsByNulls,
+} from '../../helpers/utils';
+
+import { optionsCollection } from '../common.descriptions';
+
+const properties: INodeProperties[] = [
+ {
+ displayName: 'Data Mode',
+ name: 'dataMode',
+ type: 'options',
+ options: [
+ {
+ name: 'Auto-Map Input Data to Columns',
+ value: 'autoMapInputData',
+ description: 'Use when node input properties names exactly match the table column names',
+ },
+ {
+ name: 'Map Each Column Manually',
+ value: 'defineBelow',
+ description: 'Set the value for each destination column manually',
+ },
+ ],
+ default: 'autoMapInputData',
+ description:
+ 'Whether to map node input properties and the table data automatically or manually',
+ },
+ {
+ displayName: `
+ In this mode, make sure incoming data fields are named the same as the columns in your table. If needed, use a 'Set' node before this node to change the field names.
+ `,
+ name: 'notice',
+ type: 'notice',
+ default: '',
+ displayOptions: {
+ show: {
+ dataMode: ['autoMapInputData'],
+ },
+ },
+ },
+ {
+ displayName: 'Values to Send',
+ name: 'valuesToSend',
+ placeholder: 'Add Value',
+ type: 'fixedCollection',
+ typeOptions: {
+ multipleValueButtonText: 'Add Value',
+ multipleValues: true,
+ },
+ displayOptions: {
+ show: {
+ dataMode: ['defineBelow'],
+ },
+ },
+ default: {},
+ options: [
+ {
+ displayName: 'Values',
+ name: 'values',
+ values: [
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column',
+ name: 'column',
+ type: 'options',
+ description:
+ 'Choose from the list, or specify an ID using an expression',
+ typeOptions: {
+ loadOptionsMethod: 'getColumns',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ default: [],
+ },
+ {
+ displayName: 'Value',
+ name: 'value',
+ type: 'string',
+ default: '',
+ },
+ ],
+ },
+ ],
+ },
+ optionsCollection,
+];
+
+const displayOptions = {
+ show: {
+ resource: ['database'],
+ operation: ['insert'],
+ },
+ hide: {
+ table: [''],
+ },
+};
+
+export const description = updateDisplayOptions(displayOptions, properties);
+
+export async function execute(
+ this: IExecuteFunctions,
+ runQueries: QueriesRunner,
+ items: INodeExecutionData[],
+ nodeOptions: IDataObject,
+ db: PgpDatabase,
+): Promise {
+ items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
+
+ const queries: QueryWithValues[] = [];
+
+ for (let i = 0; i < items.length; i++) {
+ const schema = this.getNodeParameter('schema', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const table = this.getNodeParameter('table', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const options = this.getNodeParameter('options', i, {});
+
+ let onConflict = '';
+ if (options.skipOnConflict) {
+ onConflict = ' ON CONFLICT DO NOTHING';
+ }
+
+ let query = `INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv)${onConflict}`;
+ let values: QueryValues = [schema, table];
+
+ const dataMode = this.getNodeParameter('dataMode', i) as string;
+
+ let item: IDataObject = {};
+
+ if (dataMode === 'autoMapInputData') {
+ item = items[i].json;
+ }
+
+ if (dataMode === 'defineBelow') {
+ const valuesToSend = (this.getNodeParameter('valuesToSend', i, []) as IDataObject)
+ .values as IDataObject[];
+
+ item = prepareItem(valuesToSend);
+ }
+
+ const tableSchema = await getTableSchema(db, schema, table);
+
+ values.push(checkItemAgainstSchema(this.getNode(), item, tableSchema, i));
+
+ const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[];
+
+ [query, values] = addReturning(query, outputColumns, values);
+
+ queries.push({ query, values });
+ }
+
+ return runQueries(queries, items, nodeOptions);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts
new file mode 100644
index 0000000000..2844893dc9
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts
@@ -0,0 +1,134 @@
+import type { IExecuteFunctions } from 'n8n-core';
+import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
+
+import { updateDisplayOptions } from '../../../../../utils/utilities';
+
+import type {
+ PgpDatabase,
+ QueriesRunner,
+ QueryValues,
+ QueryWithValues,
+ SortRule,
+ WhereClause,
+} from '../../helpers/interfaces';
+
+import { addSortRules, addWhereClauses, replaceEmptyStringsByNulls } from '../../helpers/utils';
+
+import {
+ combineConditionsCollection,
+ optionsCollection,
+ sortFixedCollection,
+ whereFixedCollection,
+} from '../common.descriptions';
+
+const properties: INodeProperties[] = [
+ {
+ displayName: 'Return All',
+ name: 'returnAll',
+ type: 'boolean',
+ default: false,
+ description: 'Whether to return all results or only up to a given limit',
+ displayOptions: {
+ show: {
+ resource: ['event'],
+ operation: ['getAll'],
+ },
+ },
+ },
+ {
+ displayName: 'Limit',
+ name: 'limit',
+ type: 'number',
+ default: 50,
+ description: 'Max number of results to return',
+ typeOptions: {
+ minValue: 1,
+ },
+ displayOptions: {
+ show: {
+ returnAll: [false],
+ },
+ },
+ },
+ whereFixedCollection,
+ combineConditionsCollection,
+ sortFixedCollection,
+ optionsCollection,
+];
+
+const displayOptions = {
+ show: {
+ resource: ['database'],
+ operation: ['select'],
+ },
+ hide: {
+ table: [''],
+ },
+};
+
+export const description = updateDisplayOptions(displayOptions, properties);
+
+export async function execute(
+ this: IExecuteFunctions,
+ runQueries: QueriesRunner,
+ items: INodeExecutionData[],
+ nodeOptions: IDataObject,
+ _db?: PgpDatabase,
+): Promise {
+ items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
+
+ const queries: QueryWithValues[] = [];
+
+ for (let i = 0; i < items.length; i++) {
+ const schema = this.getNodeParameter('schema', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const table = this.getNodeParameter('table', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ let values: QueryValues = [schema, table];
+
+ const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[];
+
+ let query = '';
+
+ if (outputColumns.includes('*')) {
+ query = 'SELECT * FROM $1:name.$2:name';
+ } else {
+ values.push(outputColumns);
+ query = `SELECT $${values.length}:name FROM $1:name.$2:name`;
+ }
+
+ const whereClauses =
+ ((this.getNodeParameter('where', i, []) as IDataObject).values as WhereClause[]) || [];
+
+ const combineConditions = this.getNodeParameter('combineConditions', i, 'AND') as string;
+
+ [query, values] = addWhereClauses(
+ this.getNode(),
+ i,
+ query,
+ whereClauses,
+ values,
+ combineConditions,
+ );
+
+ const sortRules =
+ ((this.getNodeParameter('sort', i, []) as IDataObject).values as SortRule[]) || [];
+
+ [query, values] = addSortRules(query, sortRules, values);
+
+ const returnAll = this.getNodeParameter('returnAll', i, false);
+ if (!returnAll) {
+ const limit = this.getNodeParameter('limit', i, 50);
+ query += ` LIMIT ${limit}`;
+ }
+
+ const queryWithValues = { query, values };
+ queries.push(queryWithValues);
+ }
+
+ return runQueries(queries, items, nodeOptions);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts
new file mode 100644
index 0000000000..be474904b6
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts
@@ -0,0 +1,216 @@
+import type { IExecuteFunctions } from 'n8n-core';
+import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
+
+import { updateDisplayOptions } from '../../../../../utils/utilities';
+
+import type {
+ PgpDatabase,
+ QueriesRunner,
+ QueryValues,
+ QueryWithValues,
+} from '../../helpers/interfaces';
+
+import {
+ addReturning,
+ checkItemAgainstSchema,
+ getTableSchema,
+ prepareItem,
+ replaceEmptyStringsByNulls,
+} from '../../helpers/utils';
+
+import { optionsCollection } from '../common.descriptions';
+
+const properties: INodeProperties[] = [
+ {
+ displayName: 'Data Mode',
+ name: 'dataMode',
+ type: 'options',
+ options: [
+ {
+ name: 'Auto-Map Input Data to Columns',
+ value: 'autoMapInputData',
+ description: 'Use when node input properties names exactly match the table column names',
+ },
+ {
+ name: 'Map Each Column Manually',
+ value: 'defineBelow',
+ description: 'Set the value for each destination column manually',
+ },
+ ],
+ default: 'autoMapInputData',
+ description:
+ 'Whether to map node input properties and the table data automatically or manually',
+ },
+ {
+ displayName: `
+ In this mode, make sure incoming data fields are named the same as the columns in your table. If needed, use a 'Set' node before this node to change the field names.
+ `,
+ name: 'notice',
+ type: 'notice',
+ default: '',
+ displayOptions: {
+ show: {
+ dataMode: ['autoMapInputData'],
+ },
+ },
+ },
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased, n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column to Match On',
+ name: 'columnToMatchOn',
+ type: 'options',
+ required: true,
+ description:
+ 'The column to compare when finding the rows to update. Choose from the list, or specify an ID using an expression.',
+ typeOptions: {
+ loadOptionsMethod: 'getColumns',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ default: '',
+ hint: 'The column that identifies the row(s) to modify',
+ },
+ {
+ displayName: 'Value of Column to Match On',
+ name: 'valueToMatchOn',
+ type: 'string',
+ default: '',
+ description:
+ 'Rows with a value in the specified "Column to Match On" that corresponds to the value in this field will be updated',
+ displayOptions: {
+ show: {
+ dataMode: ['defineBelow'],
+ },
+ },
+ },
+ {
+ displayName: 'Values to Send',
+ name: 'valuesToSend',
+ placeholder: 'Add Value',
+ type: 'fixedCollection',
+ typeOptions: {
+ multipleValueButtonText: 'Add Value',
+ multipleValues: true,
+ },
+ displayOptions: {
+ show: {
+ dataMode: ['defineBelow'],
+ },
+ },
+ default: {},
+ options: [
+ {
+ displayName: 'Values',
+ name: 'values',
+ values: [
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column',
+ name: 'column',
+ type: 'options',
+ description:
+ 'Choose from the list, or specify an ID using an expression',
+ typeOptions: {
+ loadOptionsMethod: 'getColumnsWithoutColumnToMatchOn',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ default: [],
+ },
+ {
+ displayName: 'Value',
+ name: 'value',
+ type: 'string',
+ default: '',
+ },
+ ],
+ },
+ ],
+ },
+ optionsCollection,
+];
+
+const displayOptions = {
+ show: {
+ resource: ['database'],
+ operation: ['update'],
+ },
+ hide: {
+ table: [''],
+ },
+};
+
+export const description = updateDisplayOptions(displayOptions, properties);
+
+export async function execute(
+ this: IExecuteFunctions,
+ runQueries: QueriesRunner,
+ items: INodeExecutionData[],
+ nodeOptions: IDataObject,
+ db: PgpDatabase,
+): Promise {
+ items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
+
+ const queries: QueryWithValues[] = [];
+
+ for (let i = 0; i < items.length; i++) {
+ const schema = this.getNodeParameter('schema', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const table = this.getNodeParameter('table', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const columnToMatchOn = this.getNodeParameter('columnToMatchOn', i) as string;
+
+ const dataMode = this.getNodeParameter('dataMode', i) as string;
+
+ let item: IDataObject = {};
+ let valueToMatchOn: string | IDataObject = '';
+
+ if (dataMode === 'autoMapInputData') {
+ item = items[i].json;
+ valueToMatchOn = item[columnToMatchOn] as string;
+ }
+
+ if (dataMode === 'defineBelow') {
+ const valuesToSend = (this.getNodeParameter('valuesToSend', i, []) as IDataObject)
+ .values as IDataObject[];
+
+ item = prepareItem(valuesToSend);
+
+ valueToMatchOn = this.getNodeParameter('valueToMatchOn', i) as string;
+ }
+
+ const tableSchema = await getTableSchema(db, schema, table);
+
+ item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
+
+ let values: QueryValues = [schema, table];
+
+ let valuesLength = values.length + 1;
+
+ const condition = `$${valuesLength}:name = $${valuesLength + 1}`;
+ valuesLength = valuesLength + 2;
+ values.push(columnToMatchOn, valueToMatchOn);
+
+ const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn);
+
+ const updates: string[] = [];
+
+ for (const column of updateColumns) {
+ updates.push(`$${valuesLength}:name = $${valuesLength + 1}`);
+ valuesLength = valuesLength + 2;
+ values.push(column, item[column] as string);
+ }
+
+ let query = `UPDATE $1:name.$2:name SET ${updates.join(', ')} WHERE ${condition}`;
+
+ const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[];
+
+ [query, values] = addReturning(query, outputColumns, values);
+
+ queries.push({ query, values });
+ }
+
+ return runQueries(queries, items, nodeOptions);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts
new file mode 100644
index 0000000000..972f171d9c
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts
@@ -0,0 +1,217 @@
+import type { IExecuteFunctions } from 'n8n-core';
+import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
+
+import { updateDisplayOptions } from '../../../../../utils/utilities';
+
+import type {
+ PgpDatabase,
+ QueriesRunner,
+ QueryValues,
+ QueryWithValues,
+} from '../../helpers/interfaces';
+
+import {
+ addReturning,
+ checkItemAgainstSchema,
+ getTableSchema,
+ prepareItem,
+ replaceEmptyStringsByNulls,
+} from '../../helpers/utils';
+
+import { optionsCollection } from '../common.descriptions';
+
+const properties: INodeProperties[] = [
+ {
+ displayName: 'Data Mode',
+ name: 'dataMode',
+ type: 'options',
+ options: [
+ {
+ name: 'Auto-Map Input Data to Columns',
+ value: 'autoMapInputData',
+ description: 'Use when node input properties names exactly match the table column names',
+ },
+ {
+ name: 'Map Each Column Manually',
+ value: 'defineBelow',
+ description: 'Set the value for each destination column manually',
+ },
+ ],
+ default: 'autoMapInputData',
+ description:
+ 'Whether to map node input properties and the table data automatically or manually',
+ },
+ {
+ displayName: `
+ In this mode, make sure incoming data fields are named the same as the columns in your table. If needed, use a 'Set' node before this node to change the field names.
+ `,
+ name: 'notice',
+ type: 'notice',
+ default: '',
+ displayOptions: {
+ show: {
+ dataMode: ['autoMapInputData'],
+ },
+ },
+ },
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased, n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column to Match On',
+ name: 'columnToMatchOn',
+ type: 'options',
+ required: true,
+ description:
+ 'The column to compare when finding the rows to update. Choose from the list, or specify an ID using an expression.',
+ typeOptions: {
+ loadOptionsMethod: 'getColumns',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ default: '',
+ hint: "Used to find the correct row(s) to update. Doesn't get changed. Has to be unique.",
+ },
+ {
+ displayName: 'Value of Column to Match On',
+ name: 'valueToMatchOn',
+ type: 'string',
+ default: '',
+ description:
+ 'Rows with a value in the specified "Column to Match On" that corresponds to the value in this field will be updated. New rows will be created for non-matching items.',
+ displayOptions: {
+ show: {
+ dataMode: ['defineBelow'],
+ },
+ },
+ },
+ {
+ displayName: 'Values to Send',
+ name: 'valuesToSend',
+ placeholder: 'Add Value',
+ type: 'fixedCollection',
+ typeOptions: {
+ multipleValueButtonText: 'Add Value',
+ multipleValues: true,
+ },
+ displayOptions: {
+ show: {
+ dataMode: ['defineBelow'],
+ },
+ },
+ default: {},
+ options: [
+ {
+ displayName: 'Values',
+ name: 'values',
+ values: [
+ {
+ // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options
+ displayName: 'Column',
+ name: 'column',
+ type: 'options',
+ description:
+ 'Choose from the list, or specify an ID using an expression',
+ typeOptions: {
+ loadOptionsMethod: 'getColumnsWithoutColumnToMatchOn',
+ loadOptionsDependsOn: ['schema.value', 'table.value'],
+ },
+ default: [],
+ },
+ {
+ displayName: 'Value',
+ name: 'value',
+ type: 'string',
+ default: '',
+ },
+ ],
+ },
+ ],
+ },
+ optionsCollection,
+];
+
+const displayOptions = {
+ show: {
+ resource: ['database'],
+ operation: ['upsert'],
+ },
+ hide: {
+ table: [''],
+ },
+};
+
+export const description = updateDisplayOptions(displayOptions, properties);
+
+export async function execute(
+ this: IExecuteFunctions,
+ runQueries: QueriesRunner,
+ items: INodeExecutionData[],
+ nodeOptions: IDataObject,
+ db: PgpDatabase,
+): Promise {
+ items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
+
+ const queries: QueryWithValues[] = [];
+
+ for (let i = 0; i < items.length; i++) {
+ const schema = this.getNodeParameter('schema', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const table = this.getNodeParameter('table', i, undefined, {
+ extractValue: true,
+ }) as string;
+
+ const columnToMatchOn = this.getNodeParameter('columnToMatchOn', i) as string;
+
+ const dataMode = this.getNodeParameter('dataMode', i) as string;
+
+ let item: IDataObject = {};
+
+ if (dataMode === 'autoMapInputData') {
+ item = items[i].json;
+ }
+
+ if (dataMode === 'defineBelow') {
+ const valuesToSend = (this.getNodeParameter('valuesToSend', i, []) as IDataObject)
+ .values as IDataObject[];
+
+ item = prepareItem(valuesToSend);
+
+ item[columnToMatchOn] = this.getNodeParameter('valueToMatchOn', i) as string;
+ }
+
+ const tableSchema = await getTableSchema(db, schema, table);
+
+ item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
+
+ let values: QueryValues = [schema, table];
+
+ let valuesLength = values.length + 1;
+ const onConflict = ` ON CONFLICT ($${valuesLength}:name) DO UPDATE `;
+ valuesLength = valuesLength + 1;
+ values.push(columnToMatchOn);
+
+ const insertQuery = `INSERT INTO $1:name.$2:name($${valuesLength}:name) VALUES($${valuesLength}:csv)${onConflict}`;
+ valuesLength = valuesLength + 1;
+ values.push(item);
+
+ const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn);
+
+ const updates: string[] = [];
+
+ for (const column of updateColumns) {
+ updates.push(`$${valuesLength}:name = $${valuesLength + 1}`);
+ valuesLength = valuesLength + 2;
+ values.push(column, item[column] as string);
+ }
+
+ let query = `${insertQuery} SET ${updates.join(', ')}`;
+
+ const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[];
+
+ [query, values] = addReturning(query, outputColumns, values);
+
+ queries.push({ query, values });
+ }
+
+ return runQueries(queries, items, nodeOptions);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts b/packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts
new file mode 100644
index 0000000000..80a11ce0f8
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts
@@ -0,0 +1,9 @@
+import type { AllEntities, Entity } from 'n8n-workflow';
+
+type PostgresMap = {
+ database: 'deleteTable' | 'executeQuery' | 'insert' | 'select' | 'update' | 'upsert';
+};
+
+export type PostgresType = AllEntities;
+
+export type PostgresDatabaseType = Entity;
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/router.ts b/packages/nodes-base/nodes/Postgres/v2/actions/router.ts
new file mode 100644
index 0000000000..68289d5a87
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/router.ts
@@ -0,0 +1,67 @@
+import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
+import { NodeOperationError } from 'n8n-workflow';
+
+import type { PostgresType } from './node.type';
+
+import * as database from './database/Database.resource';
+import { Connections } from '../transport';
+import { configureQueryRunner } from '../helpers/utils';
+import type { ConnectionsData } from '../helpers/interfaces';
+
+export async function router(this: IExecuteFunctions): Promise {
+ let returnData: INodeExecutionData[] = [];
+
+ const items = this.getInputData();
+ const resource = this.getNodeParameter('resource', 0);
+ const operation = this.getNodeParameter('operation', 0);
+
+ const credentials = await this.getCredentials('postgres');
+ const options = this.getNodeParameter('options', 0, {});
+
+ const { db, pgp, sshClient } = (await Connections.getInstance(
+ credentials,
+ options,
+ true,
+ )) as ConnectionsData;
+
+ const runQueries = configureQueryRunner(
+ this.getNode(),
+ this.helpers.constructExecutionMetaData,
+ this.continueOnFail(),
+ pgp,
+ db,
+ );
+
+ const postgresNodeData = {
+ resource,
+ operation,
+ } as PostgresType;
+
+ try {
+ switch (postgresNodeData.resource) {
+ case 'database':
+ returnData = await database[postgresNodeData.operation].execute.call(
+ this,
+ runQueries,
+ items,
+ options,
+ db,
+ );
+ break;
+ default:
+ throw new NodeOperationError(
+ this.getNode(),
+ `The operation "${operation}" is not supported!`,
+ );
+ }
+ } catch (error) {
+ throw error;
+ } finally {
+ if (sshClient) {
+ sshClient.end();
+ }
+ pgp.end();
+ }
+
+ return this.prepareOutputData(returnData);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts b/packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts
new file mode 100644
index 0000000000..031f178d2e
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts
@@ -0,0 +1,42 @@
+/* eslint-disable n8n-nodes-base/node-filename-against-convention */
+import type { INodeTypeDescription } from 'n8n-workflow';
+
+import * as database from './database/Database.resource';
+
+export const versionDescription: INodeTypeDescription = {
+ displayName: 'Postgres',
+ name: 'postgres',
+ icon: 'file:postgres.svg',
+ group: ['input'],
+ version: 2,
+ subtitle: '={{ $parameter["operation"] }}',
+ description: 'Get, add and update data in Postgres',
+ defaults: {
+ name: 'Postgres',
+ },
+ inputs: ['main'],
+ outputs: ['main'],
+ credentials: [
+ {
+ name: 'postgres',
+ required: true,
+ testedBy: 'postgresConnectionTest',
+ },
+ ],
+ properties: [
+ {
+ displayName: 'Resource',
+ name: 'resource',
+ type: 'hidden',
+ noDataExpression: true,
+ options: [
+ {
+ name: 'Database',
+ value: 'database',
+ },
+ ],
+ default: 'database',
+ },
+ ...database.description,
+ ],
+};
diff --git a/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts b/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts
new file mode 100644
index 0000000000..e96cae5b1e
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts
@@ -0,0 +1,37 @@
+import type {
+ IDataObject,
+ INodeExecutionData,
+ IPairedItemData,
+ NodeExecutionWithMetadata,
+} from 'n8n-workflow';
+import type pgPromise from 'pg-promise';
+import type pg from 'pg-promise/typescript/pg-subset';
+import type { Client } from 'ssh2';
+
+export type QueryMode = 'single' | 'transaction' | 'independently';
+
+export type QueryValue = string | number | IDataObject | string[];
+export type QueryValues = QueryValue[];
+export type QueryWithValues = { query: string; values?: QueryValues };
+
+export type WhereClause = { column: string; condition: string; value: string | number };
+export type SortRule = { column: string; direction: string };
+export type ColumnInfo = { column_name: string; data_type: string; is_nullable: string };
+
+export type PgpClient = pgPromise.IMain<{}, pg.IClient>;
+export type PgpDatabase = pgPromise.IDatabase<{}, pg.IClient>;
+export type PgpConnectionParameters = pg.IConnectionParameters;
+export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient; sshClient?: Client };
+
+export type ConstructExecutionMetaData = (
+ inputData: INodeExecutionData[],
+ options: {
+ itemData: IPairedItemData | IPairedItemData[];
+ },
+) => NodeExecutionWithMetadata[];
+
+export type QueriesRunner = (
+ queries: QueryWithValues[],
+ items: INodeExecutionData[],
+ options: IDataObject,
+) => Promise;
diff --git a/packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts b/packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts
new file mode 100644
index 0000000000..152c4a344f
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts
@@ -0,0 +1,360 @@
+import type { IDataObject, INode, INodeExecutionData } from 'n8n-workflow';
+import { NodeOperationError } from 'n8n-workflow';
+
+import type {
+ ColumnInfo,
+ ConstructExecutionMetaData,
+ PgpClient,
+ PgpDatabase,
+ QueryMode,
+ QueryValues,
+ QueryWithValues,
+ SortRule,
+ WhereClause,
+} from './interfaces';
+
+export function wrapData(data: IDataObject | IDataObject[]): INodeExecutionData[] {
+ if (!Array.isArray(data)) {
+ return [{ json: data }];
+ }
+ return data.map((item) => ({
+ json: item,
+ }));
+}
+
+export function prepareErrorItem(
+ items: INodeExecutionData[],
+ error: IDataObject | NodeOperationError | Error,
+ index: number,
+) {
+ return {
+ json: { message: error.message, item: { ...items[index].json }, error: { ...error } },
+ pairedItem: { item: index },
+ } as INodeExecutionData;
+}
+
+export function parsePostgresError(
+ node: INode,
+ error: any,
+ queries: QueryWithValues[],
+ itemIndex?: number,
+) {
+ if (error.message.includes('syntax error at or near') && queries.length) {
+ try {
+ const snippet = error.message.match(/syntax error at or near "(.*)"/)[1] as string;
+ const failedQureryIndex = queries.findIndex((query) => query.query.includes(snippet));
+
+ if (failedQureryIndex !== -1) {
+ if (!itemIndex) {
+ itemIndex = failedQureryIndex;
+ }
+ const failedQuery = queries[failedQureryIndex].query;
+ const lines = failedQuery.split('\n');
+ const lineIndex = lines.findIndex((line) => line.includes(snippet));
+ const errorMessage = `Syntax error at line ${lineIndex + 1} near "${snippet}"`;
+ error.message = errorMessage;
+ }
+ } catch {}
+ }
+
+ let message = error.message;
+ const errorDescription = error.description ? error.description : error.detail || error.hint;
+ let description = errorDescription;
+
+ if (!description && queries[itemIndex || 0]?.query) {
+ description = `Failed query: ${queries[itemIndex || 0].query}`;
+ }
+
+ if (error.message.includes('ECONNREFUSED')) {
+ message = 'Connection refused';
+ try {
+ description = error.message.split('ECONNREFUSED ')[1].trim();
+ } catch (e) {}
+ }
+
+ if (error.message.includes('ENOTFOUND')) {
+ message = 'Host not found';
+ try {
+ description = error.message.split('ENOTFOUND ')[1].trim();
+ } catch (e) {}
+ }
+
+ if (error.message.includes('ETIMEDOUT')) {
+ message = 'Connection timed out';
+ try {
+ description = error.message.split('ETIMEDOUT ')[1].trim();
+ } catch (e) {}
+ }
+
+ return new NodeOperationError(node, error as Error, {
+ message,
+ description,
+ itemIndex,
+ });
+}
+
+export function addWhereClauses(
+ node: INode,
+ itemIndex: number,
+ query: string,
+ clauses: WhereClause[],
+ replacements: QueryValues,
+ combineConditions: string,
+): [string, QueryValues] {
+ if (clauses.length === 0) return [query, replacements];
+
+ let combineWith = 'AND';
+
+ if (combineConditions === 'OR') {
+ combineWith = 'OR';
+ }
+
+ let replacementIndex = replacements.length + 1;
+
+ let whereQuery = ' WHERE';
+ const values: QueryValues = [];
+
+ clauses.forEach((clause, index) => {
+ if (clause.condition === 'equal') {
+ clause.condition = '=';
+ }
+ if (['>', '<', '>=', '<='].includes(clause.condition)) {
+ const value = Number(clause.value);
+
+ if (Number.isNaN(value)) {
+ throw new NodeOperationError(
+ node,
+ `Operator in entry ${index + 1} of 'Select Rows' works with numbers, but value ${
+ clause.value
+ } is not a number`,
+ {
+ itemIndex,
+ },
+ );
+ }
+
+ clause.value = value;
+ }
+ const columnReplacement = `$${replacementIndex}:name`;
+ values.push(clause.column);
+ replacementIndex = replacementIndex + 1;
+
+ let valueReplacement = '';
+ if (clause.condition !== 'IS NULL') {
+ valueReplacement = ` $${replacementIndex}`;
+ values.push(clause.value);
+ replacementIndex = replacementIndex + 1;
+ }
+
+ const operator = index === clauses.length - 1 ? '' : ` ${combineWith}`;
+
+ whereQuery += ` ${columnReplacement} ${clause.condition}${valueReplacement}${operator}`;
+ });
+
+ return [`${query}${whereQuery}`, replacements.concat(...values)];
+}
+
+export function addSortRules(
+ query: string,
+ rules: SortRule[],
+ replacements: QueryValues,
+): [string, QueryValues] {
+ if (rules.length === 0) return [query, replacements];
+
+ let replacementIndex = replacements.length + 1;
+
+ let orderByQuery = ' ORDER BY';
+ const values: string[] = [];
+
+ rules.forEach((rule, index) => {
+ const columnReplacement = `$${replacementIndex}:name`;
+ values.push(rule.column);
+ replacementIndex = replacementIndex + 1;
+
+ const endWith = index === rules.length - 1 ? '' : ',';
+
+ const sortDirection = rule.direction === 'DESC' ? 'DESC' : 'ASC';
+
+ orderByQuery += ` ${columnReplacement} ${sortDirection}${endWith}`;
+ });
+
+ return [`${query}${orderByQuery}`, replacements.concat(...values)];
+}
+
+export function addReturning(
+ query: string,
+ outputColumns: string[],
+ replacements: QueryValues,
+): [string, QueryValues] {
+ if (outputColumns.includes('*')) return [`${query} RETURNING *`, replacements];
+
+ const replacementIndex = replacements.length + 1;
+
+ return [`${query} RETURNING $${replacementIndex}:name`, [...replacements, outputColumns]];
+}
+
+export const configureQueryRunner =
+ (
+ node: INode,
+ constructExecutionMetaData: ConstructExecutionMetaData,
+ continueOnFail: boolean,
+ pgp: PgpClient,
+ db: PgpDatabase,
+ ) =>
+ async (queries: QueryWithValues[], items: INodeExecutionData[], options: IDataObject) => {
+ let returnData: INodeExecutionData[] = [];
+
+ const queryBatching = (options.queryBatching as QueryMode) || 'single';
+
+ if (queryBatching === 'single') {
+ try {
+ returnData = (await db.multi(pgp.helpers.concat(queries)))
+ .map((result, i) => {
+ return constructExecutionMetaData(wrapData(result as IDataObject[]), {
+ itemData: { item: i },
+ });
+ })
+ .flat();
+ returnData = returnData.length ? returnData : [{ json: { success: true } }];
+ } catch (err) {
+ const error = parsePostgresError(node, err, queries);
+ if (!continueOnFail) throw error;
+
+ return [
+ {
+ json: {
+ message: error.message,
+ error: { ...error },
+ },
+ },
+ ];
+ }
+ }
+
+ if (queryBatching === 'transaction') {
+ returnData = await db.tx(async (transaction) => {
+ const result: INodeExecutionData[] = [];
+ for (let i = 0; i < queries.length; i++) {
+ try {
+ const transactionResult: IDataObject[] = await transaction.any(
+ queries[i].query,
+ queries[i].values,
+ );
+
+ const executionData = constructExecutionMetaData(
+ wrapData(transactionResult.length ? transactionResult : [{ success: true }]),
+ { itemData: { item: i } },
+ );
+
+ result.push(...executionData);
+ } catch (err) {
+ const error = parsePostgresError(node, err, queries, i);
+ if (!continueOnFail) throw error;
+ result.push(prepareErrorItem(items, error, i));
+ return result;
+ }
+ }
+ return result;
+ });
+ }
+
+ if (queryBatching === 'independently') {
+ returnData = await db.task(async (t) => {
+ const result: INodeExecutionData[] = [];
+ for (let i = 0; i < queries.length; i++) {
+ try {
+ const transactionResult: IDataObject[] = await t.any(
+ queries[i].query,
+ queries[i].values,
+ );
+
+ const executionData = constructExecutionMetaData(
+ wrapData(transactionResult.length ? transactionResult : [{ success: true }]),
+ { itemData: { item: i } },
+ );
+
+ result.push(...executionData);
+ } catch (err) {
+ const error = parsePostgresError(node, err, queries, i);
+ if (!continueOnFail) throw error;
+ result.push(prepareErrorItem(items, error, i));
+ }
+ }
+ return result;
+ });
+ }
+
+ return returnData;
+ };
+
+export function replaceEmptyStringsByNulls(
+ items: INodeExecutionData[],
+ replace?: boolean,
+): INodeExecutionData[] {
+ if (!replace) return items;
+
+ const returnData: INodeExecutionData[] = items.map((item) => {
+ const newItem = { ...item };
+ const keys = Object.keys(newItem.json);
+
+ for (const key of keys) {
+ if (newItem.json[key] === '') {
+ newItem.json[key] = null;
+ }
+ }
+
+ return newItem;
+ });
+
+ return returnData;
+}
+
+export function prepareItem(values: IDataObject[]) {
+ const item = values.reduce((acc, { column, value }) => {
+ acc[column as string] = value;
+ return acc;
+ }, {} as IDataObject);
+
+ return item;
+}
+
+export async function getTableSchema(
+ db: PgpDatabase,
+ schema: string,
+ table: string,
+): Promise {
+ const columns = await db.any(
+ 'SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2',
+ [schema, table],
+ );
+
+ return columns;
+}
+
+export function checkItemAgainstSchema(
+ node: INode,
+ item: IDataObject,
+ columnsInfo: ColumnInfo[],
+ index: number,
+) {
+ if (columnsInfo.length === 0) return item;
+ const schema = columnsInfo.reduce((acc, { column_name, data_type, is_nullable }) => {
+ acc[column_name] = { type: data_type.toUpperCase(), nullable: is_nullable === 'YES' };
+ return acc;
+ }, {} as IDataObject);
+
+ for (const key of Object.keys(item)) {
+ if (schema[key] === undefined) {
+ throw new NodeOperationError(node, `Column '${key}' does not exist in selected table`, {
+ itemIndex: index,
+ });
+ }
+ if (item[key] === null && !(schema[key] as IDataObject)?.nullable) {
+ throw new NodeOperationError(node, `Column '${key}' is not nullable`, {
+ itemIndex: index,
+ });
+ }
+ }
+
+ return item;
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts b/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts
new file mode 100644
index 0000000000..614f732859
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts
@@ -0,0 +1,68 @@
+import type {
+ ICredentialsDecrypted,
+ ICredentialTestFunctions,
+ IDataObject,
+ INodeCredentialTestResult,
+} from 'n8n-workflow';
+
+import { Connections } from '../transport';
+
+import { Client } from 'ssh2';
+import type { ConnectionsData, PgpClient } from '../helpers/interfaces';
+
+export async function postgresConnectionTest(
+ this: ICredentialTestFunctions,
+ credential: ICredentialsDecrypted,
+): Promise {
+ const credentials = credential.data as IDataObject;
+
+ let sshClientCreated: Client | undefined = new Client();
+ let pgpClientCreated: PgpClient | undefined;
+
+ try {
+ const { db, pgp, sshClient } = (await Connections.getInstance(
+ credentials,
+ {},
+ true,
+ sshClientCreated,
+ )) as ConnectionsData;
+
+ sshClientCreated = sshClient;
+ pgpClientCreated = pgp;
+
+ await db.connect();
+ } catch (error) {
+ let message = error.message as string;
+
+ if (error.message.includes('ECONNREFUSED')) {
+ message = 'Connection refused';
+ }
+
+ if (error.message.includes('ENOTFOUND')) {
+ message = 'Host not found, please check your host name';
+ }
+
+ if (error.message.includes('ETIMEDOUT')) {
+ message = 'Connection timed out';
+ }
+
+ return {
+ status: 'Error',
+ message,
+ };
+ } finally {
+ if (sshClientCreated) {
+ sshClientCreated.end();
+ }
+ if (pgpClientCreated) {
+ pgpClientCreated.end();
+ }
+
+ //set the connection instance to null so that it can be recreated
+ await Connections.getInstance({}, {}, false, undefined, true);
+ }
+ return {
+ status: 'OK',
+ message: 'Connection successful!',
+ };
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/index.ts b/packages/nodes-base/nodes/Postgres/v2/methods/index.ts
new file mode 100644
index 0000000000..8d2d0278dd
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/methods/index.ts
@@ -0,0 +1,3 @@
+export * as credentialTest from './credentialTest';
+export * as listSearch from './listSearch';
+export * as loadOptions from './loadOptions';
diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
new file mode 100644
index 0000000000..8a8b35ab39
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
@@ -0,0 +1,47 @@
+import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';
+import type { ConnectionsData } from '../helpers/interfaces';
+import { Connections } from '../transport';
+
+export async function schemaSearch(this: ILoadOptionsFunctions): Promise {
+ const credentials = await this.getCredentials('postgres');
+
+ const { db } = (await Connections.getInstance(credentials)) as ConnectionsData;
+
+ try {
+ const response = await db.any('SELECT schema_name FROM information_schema.schemata');
+
+ return {
+ results: response.map((schema) => ({
+ name: schema.schema_name as string,
+ value: schema.schema_name as string,
+ })),
+ };
+ } catch (error) {
+ throw error;
+ }
+}
+export async function tableSearch(this: ILoadOptionsFunctions): Promise {
+ const credentials = await this.getCredentials('postgres');
+
+ const { db } = (await Connections.getInstance(credentials)) as ConnectionsData;
+
+ const schema = this.getNodeParameter('schema', 0, {
+ extractValue: true,
+ }) as string;
+
+ try {
+ const response = await db.any(
+ 'SELECT table_name FROM information_schema.tables WHERE table_schema=$1',
+ [schema],
+ );
+
+ return {
+ results: response.map((table) => ({
+ name: table.table_name as string,
+ value: table.table_name as string,
+ })),
+ };
+ } catch (error) {
+ throw error;
+ }
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
new file mode 100644
index 0000000000..acc6fb9247
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
@@ -0,0 +1,46 @@
+import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';
+import type { ConnectionsData } from '../helpers/interfaces';
+import { getTableSchema } from '../helpers/utils';
+import { Connections } from '../transport';
+
+export async function getColumns(this: ILoadOptionsFunctions): Promise {
+ const credentials = await this.getCredentials('postgres');
+
+ const { db } = (await Connections.getInstance(credentials)) as ConnectionsData;
+
+ const schema = this.getNodeParameter('schema', 0, {
+ extractValue: true,
+ }) as string;
+
+ const table = this.getNodeParameter('table', 0, {
+ extractValue: true,
+ }) as string;
+
+ try {
+ const columns = await getTableSchema(db, schema, table);
+
+ return columns.map((column) => ({
+ name: column.column_name,
+ value: column.column_name,
+ description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`,
+ }));
+ } catch (error) {
+ throw error;
+ }
+}
+
+export async function getColumnsMultiOptions(
+ this: ILoadOptionsFunctions,
+): Promise {
+ const returnData = await getColumns.call(this);
+ const returnAll = { name: '*', value: '*', description: 'All columns' };
+ return [returnAll, ...returnData];
+}
+
+export async function getColumnsWithoutColumnToMatchOn(
+ this: ILoadOptionsFunctions,
+): Promise {
+ const columnToMatchOn = this.getNodeParameter('columnToMatchOn') as string;
+ const returnData = await getColumns.call(this);
+ return returnData.filter((column) => column.value !== columnToMatchOn);
+}
diff --git a/packages/nodes-base/nodes/Postgres/v2/transport/index.ts b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts
new file mode 100644
index 0000000000..76240a05ab
--- /dev/null
+++ b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts
@@ -0,0 +1,209 @@
+import type { IDataObject } from 'n8n-workflow';
+
+import { Client } from 'ssh2';
+import type { ConnectConfig } from 'ssh2';
+
+import type { Server } from 'net';
+import { createServer } from 'net';
+
+import pgPromise from 'pg-promise';
+
+import { rm, writeFile } from 'fs/promises';
+import { file } from 'tmp-promise';
+
+import type { PgpClient, PgpDatabase } from '../helpers/interfaces';
+
+async function createSshConnectConfig(credentials: IDataObject) {
+ if (credentials.sshAuthenticateWith === 'password') {
+ return {
+ host: credentials.sshHost as string,
+ port: credentials.sshPort as number,
+ username: credentials.sshUser as string,
+ password: credentials.sshPassword as string,
+ } as ConnectConfig;
+ } else {
+ const { path } = await file({ prefix: 'n8n-ssh-' });
+ await writeFile(path, credentials.privateKey as string);
+
+ const options: ConnectConfig = {
+ host: credentials.host as string,
+ username: credentials.username as string,
+ port: credentials.port as number,
+ privateKey: path,
+ };
+
+ if (credentials.passphrase) {
+ options.passphrase = credentials.passphrase as string;
+ }
+
+ return options;
+ }
+}
+
+async function configurePostgres(
+ credentials: IDataObject,
+ options: IDataObject = {},
+ createdSshClient?: Client,
+) {
+ const pgp = pgPromise();
+
+ if (options.largeNumbersOutput === 'numbers') {
+ pgp.pg.types.setTypeParser(20, (value: string) => {
+ return parseInt(value, 10);
+ });
+ pgp.pg.types.setTypeParser(1700, (value: string) => {
+ return parseFloat(value);
+ });
+ }
+
+ const dbConfig: IDataObject = {
+ host: credentials.host as string,
+ port: credentials.port as number,
+ database: credentials.database as string,
+ user: credentials.user as string,
+ password: credentials.password as string,
+ };
+
+ if (options.connectionTimeout) {
+ dbConfig.connectionTimeoutMillis = (options.connectionTimeout as number) * 1000;
+ }
+
+ if (credentials.allowUnauthorizedCerts === true) {
+ dbConfig.ssl = {
+ rejectUnauthorized: false,
+ };
+ } else {
+ dbConfig.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
+ dbConfig.sslmode = (credentials.ssl as string) || 'disable';
+ }
+
+ if (!credentials.sshTunnel) {
+ const db = pgp(dbConfig);
+ return { db, pgp };
+ } else {
+ const sshClient = createdSshClient || new Client();
+
+ const tunnelConfig = await createSshConnectConfig(credentials);
+
+ const localHost = '127.0.0.1';
+ const localPort = credentials.sshPostgresPort as number;
+
+ let proxy: Server | undefined;
+
+ const db = await new Promise((resolve, reject) => {
+ let sshClientReady = false;
+
+ proxy = createServer((socket) => {
+ if (!sshClientReady) return socket.destroy();
+
+ sshClient.forwardOut(
+ socket.remoteAddress as string,
+ socket.remotePort as number,
+ credentials.host as string,
+ credentials.port as number,
+ (err, stream) => {
+ if (err) reject(err);
+
+ socket.pipe(stream);
+ stream.pipe(socket);
+ },
+ );
+ }).listen(localPort, localHost);
+
+ proxy.on('error', (err) => {
+ reject(err);
+ });
+
+ sshClient.connect(tunnelConfig);
+
+ sshClient.on('ready', () => {
+ sshClientReady = true;
+
+ const updatedDbConfig = {
+ ...dbConfig,
+ port: localPort,
+ host: localHost,
+ };
+ const dbConnection = pgp(updatedDbConfig);
+ resolve(dbConnection);
+ });
+
+ sshClient.on('error', (err) => {
+ reject(err);
+ });
+
+ sshClient.on('end', async () => {
+ if (tunnelConfig.privateKey) {
+ await rm(tunnelConfig.privateKey as string, { force: true });
+ }
+ if (proxy) proxy.close();
+ });
+ }).catch((err) => {
+ if (proxy) proxy.close();
+ if (sshClient) sshClient.end();
+
+ let message = err.message;
+ let description = err.description;
+
+ if (err.message.includes('ECONNREFUSED')) {
+ message = 'Connection refused';
+ try {
+ description = err.message.split('ECONNREFUSED ')[1].trim();
+ } catch (e) {}
+ }
+
+ if (err.message.includes('ENOTFOUND')) {
+ message = 'Host not found';
+ try {
+ description = err.message.split('ENOTFOUND ')[1].trim();
+ } catch (e) {}
+ }
+
+ if (err.message.includes('ETIMEDOUT')) {
+ message = 'Connection timed out';
+ try {
+ description = err.message.split('ETIMEDOUT ')[1].trim();
+ } catch (e) {}
+ }
+
+ err.message = message;
+ err.description = description;
+ throw err;
+ });
+
+ return { db, pgp, sshClient };
+ }
+}
+
+export const Connections = (function () {
+ let instance: { db: PgpDatabase; pgp: PgpClient; sshClient?: Client } | null = null;
+
+ return {
+ async getInstance(
+ credentials: IDataObject = {},
+ options: IDataObject = {},
+ reload = false,
+ createdSshClient?: Client,
+ nulify = false,
+ ) {
+ if (nulify) {
+ instance = null;
+ return instance;
+ }
+
+ if (instance !== null && reload) {
+ if (instance.sshClient) {
+ instance.sshClient.end();
+ }
+ instance.pgp.end();
+
+ instance = null;
+ }
+
+ if (instance === null && Object.keys(credentials).length) {
+ instance = await configurePostgres(credentials, options, createdSshClient);
+ }
+ return instance;
+ },
+ };
+})();
diff --git a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts
index b21699c725..5987d4007e 100644
--- a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts
+++ b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts
@@ -8,7 +8,7 @@ import { NodeOperationError } from 'n8n-workflow';
import pgPromise from 'pg-promise';
-import { pgInsert, pgQuery } from '../Postgres/Postgres.node.functions';
+import { pgInsert, pgQuery } from '../Postgres/v1/genericFunctions';
export class QuestDb implements INodeType {
description: INodeTypeDescription = {
diff --git a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts
index 3381fa25b0..5a43f7c500 100644
--- a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts
+++ b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts
@@ -6,7 +6,7 @@ import type {
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
-import { pgInsert, pgQuery, pgUpdate } from '../Postgres/Postgres.node.functions';
+import { pgInsert, pgQuery, pgUpdate } from '../Postgres/v1/genericFunctions';
import pgPromise from 'pg-promise';
diff --git a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js
index db409d1165..29b72932c8 100644
--- a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js
+++ b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js
@@ -1,4 +1,4 @@
-const PostgresFun = require('../../../nodes/Postgres/Postgres.node.functions');
+const PostgresFun = require('../../../nodes/Postgres/v1/genericFunctions');
const pgPromise = require('pg-promise');
describe('pgUpdate', () => {