diff --git a/packages/nodes-base/credentials/Postgres.credentials.ts b/packages/nodes-base/credentials/Postgres.credentials.ts index e217b41c05..caf139e3d1 100644 --- a/packages/nodes-base/credentials/Postgres.credentials.ts +++ b/packages/nodes-base/credentials/Postgres.credentials.ts @@ -81,5 +81,120 @@ export class Postgres implements ICredentialType { type: 'number', default: 5432, }, + { + displayName: 'SSH Tunnel', + name: 'sshTunnel', + type: 'boolean', + default: false, + }, + { + displayName: 'SSH Authenticate with', + name: 'sshAuthenticateWith', + type: 'options', + default: 'password', + options: [ + { + name: 'Password', + value: 'password', + }, + { + name: 'Private Key', + value: 'privateKey', + }, + ], + displayOptions: { + show: { + sshTunnel: [true], + }, + }, + }, + { + displayName: 'SSH Host', + name: 'sshHost', + type: 'string', + default: 'localhost', + displayOptions: { + show: { + sshTunnel: [true], + }, + }, + }, + { + displayName: 'SSH Port', + name: 'sshPort', + type: 'number', + default: 22, + displayOptions: { + show: { + sshTunnel: [true], + }, + }, + }, + { + displayName: 'SSH Postgres Port', + name: 'sshPostgresPort', + type: 'number', + default: 5432, + displayOptions: { + show: { + sshTunnel: [true], + }, + }, + }, + { + displayName: 'SSH User', + name: 'sshUser', + type: 'string', + default: 'root', + displayOptions: { + show: { + sshTunnel: [true], + }, + }, + }, + { + displayName: 'SSH Password', + name: 'sshPassword', + type: 'string', + typeOptions: { + password: true, + }, + default: '', + displayOptions: { + show: { + sshTunnel: [true], + sshAuthenticateWith: ['password'], + }, + }, + }, + { + displayName: 'Private Key', + name: 'privateKey', + type: 'string', + typeOptions: { + rows: 4, + password: true, + }, + default: '', + displayOptions: { + show: { + sshTunnel: [true], + sshAuthenticateWith: ['privateKey'], + }, + }, + }, + { + displayName: 'Passphrase', + name: 'passphrase', + type: 'string', + default: '', + description: 'Passphase used to create the key, if no passphase was used leave empty', + displayOptions: { + show: { + sshTunnel: [true], + sshAuthenticateWith: ['privateKey'], + }, + }, + }, ]; } diff --git a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts index 489bbdb617..a11ff18670 100644 --- a/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts +++ b/packages/nodes-base/nodes/CrateDb/CrateDb.node.ts @@ -13,7 +13,7 @@ import { pgInsert, pgQuery, pgUpdate, -} from '../Postgres/Postgres.node.functions'; +} from '../Postgres/v1/genericFunctions'; import pgPromise from 'pg-promise'; diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.ts b/packages/nodes-base/nodes/Postgres/Postgres.node.ts index ed1ca9171b..69aee12ada 100644 --- a/packages/nodes-base/nodes/Postgres/Postgres.node.ts +++ b/packages/nodes-base/nodes/Postgres/Postgres.node.ts @@ -1,407 +1,25 @@ -import type { - IExecuteFunctions, - ICredentialsDecrypted, - ICredentialTestFunctions, - IDataObject, - INodeCredentialTestResult, - INodeExecutionData, - INodeType, - INodeTypeDescription, -} from 'n8n-workflow'; -import { NodeOperationError } from 'n8n-workflow'; +import type { INodeTypeBaseDescription, IVersionedNodeType } from 'n8n-workflow'; +import { VersionedNodeType } from 'n8n-workflow'; -import pgPromise from 'pg-promise'; +import { PostgresV1 } from './v1/PostgresV1.node'; +import { PostgresV2 } from './v2/PostgresV2.node'; -import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './Postgres.node.functions'; - -export class Postgres implements INodeType { - description: INodeTypeDescription = { - displayName: 'Postgres', - name: 'postgres', - icon: 'file:postgres.svg', - group: ['input'], - version: 1, - description: 'Get, add and update data in Postgres', - defaults: { - name: 'Postgres', - }, - inputs: ['main'], - outputs: ['main'], - credentials: [ - { - name: 'postgres', - required: true, - testedBy: 'postgresConnectionTest', - }, - ], - properties: [ - { - displayName: 'Operation', - name: 'operation', - type: 'options', - noDataExpression: true, - options: [ - { - name: 'Execute Query', - value: 'executeQuery', - description: 'Execute an SQL query', - action: 'Execute a SQL query', - }, - { - name: 'Insert', - value: 'insert', - description: 'Insert rows in database', - action: 'Insert rows in database', - }, - { - name: 'Update', - value: 'update', - description: 'Update rows in database', - action: 'Update rows in database', - }, - ], - default: 'insert', - }, - - // ---------------------------------- - // executeQuery - // ---------------------------------- - { - displayName: 'Query', - name: 'query', - type: 'string', - displayOptions: { - show: { - operation: ['executeQuery'], - }, - }, - default: '', - placeholder: 'SELECT id, name FROM product WHERE quantity > $1 AND price <= $2', - required: true, - description: - 'The SQL query to execute. You can use n8n expressions or $1 and $2 in conjunction with query parameters.', - }, - // ---------------------------------- - // insert - // ---------------------------------- - { - displayName: 'Schema', - name: 'schema', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: 'public', - required: true, - description: 'Name of the schema the table belongs to', - }, - { - displayName: 'Table', - name: 'table', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: '', - required: true, - description: 'Name of the table in which to insert data to', - }, - { - displayName: 'Columns', - name: 'columns', - type: 'string', - displayOptions: { - show: { - operation: ['insert'], - }, - }, - default: '', - // eslint-disable-next-line n8n-nodes-base/node-param-placeholder-miscased-id - placeholder: 'id:int,name:text,description', - // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id - description: - 'Comma-separated list of the properties which should used as columns for the new rows. You can use type casting with colons (:) like id:int.', - }, - - // ---------------------------------- - // update - // ---------------------------------- - { - displayName: 'Schema', - name: 'schema', - type: 'string', - displayOptions: { - show: { - operation: ['update'], - }, - }, - default: 'public', - description: 'Name of the schema the table belongs to', - }, - { - displayName: 'Table', - name: 'table', - type: 'string', - displayOptions: { - show: { - operation: ['update'], - }, - }, - default: '', - required: true, - description: 'Name of the table in which to update data in', - }, - { - displayName: 'Update Key', - name: 'updateKey', - type: 'string', - displayOptions: { - show: { - operation: ['update'], - }, - }, - default: 'id', - required: true, - // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id - description: - 'Comma-separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".', - }, - { - displayName: 'Columns', - name: 'columns', - type: 'string', - displayOptions: { - show: { - operation: ['update'], - }, - }, - default: '', - placeholder: 'name:text,description', - // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id - description: - 'Comma-separated list of the properties which should used as columns for rows to update. You can use type casting with colons (:) like id:int.', - }, - - // ---------------------------------- - // insert,update - // ---------------------------------- - { - displayName: 'Return Fields', - name: 'returnFields', - type: 'string', - requiresDataPath: 'multiple', - displayOptions: { - show: { - operation: ['insert', 'update'], - }, - }, - default: '*', - description: 'Comma-separated list of the fields that the operation will return', - }, - // ---------------------------------- - // Additional fields - // ---------------------------------- - { - displayName: 'Additional Fields', - name: 'additionalFields', - type: 'collection', - placeholder: 'Add Field', - default: {}, - options: [ - { - displayName: 'Mode', - name: 'mode', - type: 'options', - options: [ - { - name: 'Independently', - value: 'independently', - description: 'Execute each query independently', - }, - { - name: 'Multiple Queries', - value: 'multiple', - description: 'Default. Sends multiple queries at once to database.', - }, - { - name: 'Transaction', - value: 'transaction', - description: 'Executes all queries in a single transaction', - }, - ], - default: 'multiple', - description: - 'The way queries should be sent to database. Can be used in conjunction with Continue on Fail. See the docs for more examples', - }, - { - displayName: 'Output Large-Format Numbers As', - name: 'largeNumbersOutput', - type: 'options', - options: [ - { - name: 'Numbers', - value: 'numbers', - }, - { - name: 'Text', - value: 'text', - description: - 'Use this if you expect numbers longer than 16 digits (otherwise numbers may be incorrect)', - }, - ], - hint: 'Applies to NUMERIC and BIGINT columns only', - default: 'text', - }, - { - displayName: 'Query Parameters', - name: 'queryParams', - type: 'string', - displayOptions: { - show: { - '/operation': ['executeQuery'], - }, - }, - default: '', - placeholder: 'quantity,price', - description: - 'Comma-separated list of properties which should be used as query parameters', - }, - ], - }, - ], - }; - - methods = { - credentialTest: { - async postgresConnectionTest( - this: ICredentialTestFunctions, - credential: ICredentialsDecrypted, - ): Promise { - const credentials = credential.data as IDataObject; - try { - const pgp = pgPromise(); - const config: IDataObject = { - host: credentials.host as string, - port: credentials.port as number, - database: credentials.database as string, - user: credentials.user as string, - password: credentials.password as string, - }; - - if (credentials.allowUnauthorizedCerts === true) { - config.ssl = { - rejectUnauthorized: false, - }; - } else { - config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); - config.sslmode = (credentials.ssl as string) || 'disable'; - } - - const db = pgp(config); - await db.connect(); - pgp.end(); - } catch (error) { - return { - status: 'Error', - message: error.message, - }; - } - return { - status: 'OK', - message: 'Connection successful!', - }; - }, - }, - }; - - async execute(this: IExecuteFunctions): Promise { - const credentials = await this.getCredentials('postgres'); - const largeNumbersOutput = this.getNodeParameter( - 'additionalFields.largeNumbersOutput', - 0, - '', - ) as string; - - const pgp = pgPromise(); - - if (largeNumbersOutput === 'numbers') { - pgp.pg.types.setTypeParser(20, (value: string) => { - return parseInt(value, 10); - }); - pgp.pg.types.setTypeParser(1700, (value: string) => { - return parseFloat(value); - }); - } - - const config: IDataObject = { - host: credentials.host as string, - port: credentials.port as number, - database: credentials.database as string, - user: credentials.user as string, - password: credentials.password as string, +export class Postgres extends VersionedNodeType { + constructor() { + const baseDescription: INodeTypeBaseDescription = { + displayName: 'Postgres', + name: 'postgres', + icon: 'file:postgres.svg', + group: ['input'], + defaultVersion: 2, + description: 'Get, add and update data in Postgres', }; - if (credentials.allowUnauthorizedCerts === true) { - config.ssl = { - rejectUnauthorized: false, - }; - } else { - config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); - config.sslmode = (credentials.ssl as string) || 'disable'; - } + const nodeVersions: IVersionedNodeType['nodeVersions'] = { + 1: new PostgresV1(baseDescription), + 2: new PostgresV2(baseDescription), + }; - const db = pgp(config); - - let returnItems: INodeExecutionData[] = []; - - const items = this.getInputData(); - const operation = this.getNodeParameter('operation', 0); - - if (operation === 'executeQuery') { - // ---------------------------------- - // executeQuery - // ---------------------------------- - - const queryResult = await pgQueryV2.call(this, pgp, db, items, this.continueOnFail()); - returnItems = queryResult as INodeExecutionData[]; - } else if (operation === 'insert') { - // ---------------------------------- - // insert - // ---------------------------------- - - const insertData = await pgInsertV2.call(this, pgp, db, items, this.continueOnFail()); - - // returnItems = this.helpers.returnJsonArray(insertData); - returnItems = insertData as INodeExecutionData[]; - } else if (operation === 'update') { - // ---------------------------------- - // update - // ---------------------------------- - - const updateItems = await pgUpdate( - this.getNodeParameter, - pgp, - db, - items, - this.continueOnFail(), - ); - - returnItems = wrapData(updateItems); - } else { - pgp.end(); - throw new NodeOperationError( - this.getNode(), - `The operation "${operation}" is not supported!`, - ); - } - - // Close the connection - pgp.end(); - - return this.prepareOutputData(returnItems); + super(nodeVersions, baseDescription); } } diff --git a/packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts b/packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts new file mode 100644 index 0000000000..4de24f6b4c --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/test/v2/operations.test.ts @@ -0,0 +1,817 @@ +import type { IDataObject, IExecuteFunctions, IGetNodeParameterOptions, INode } from 'n8n-workflow'; + +import type { ColumnInfo, PgpDatabase, QueriesRunner } from '../../v2/helpers/interfaces'; + +import { get } from 'lodash'; + +import * as deleteTable from '../../v2/actions/database/deleteTable.operation'; +import * as executeQuery from '../../v2/actions/database/executeQuery.operation'; +import * as insert from '../../v2/actions/database/insert.operation'; +import * as select from '../../v2/actions/database/select.operation'; +import * as update from '../../v2/actions/database/update.operation'; +import * as upsert from '../../v2/actions/database/upsert.operation'; + +const runQueries: QueriesRunner = jest.fn(); + +const node: INode = { + id: '1', + name: 'Postgres node', + typeVersion: 2, + type: 'n8n-nodes-base.postgres', + position: [60, 760], + parameters: { + operation: 'executeQuery', + }, +}; + +const items = [{ json: {} }]; + +const createMockExecuteFunction = (nodeParameters: IDataObject) => { + const fakeExecuteFunction = { + getNodeParameter( + parameterName: string, + _itemIndex: number, + fallbackValue?: IDataObject | undefined, + options?: IGetNodeParameterOptions | undefined, + ) { + const parameter = options?.extractValue ? `${parameterName}.value` : parameterName; + return get(nodeParameters, parameter, fallbackValue); + }, + getNode() { + return node; + }, + } as unknown as IExecuteFunctions; + return fakeExecuteFunction; +}; + +const createMockDb = (columnInfo: ColumnInfo[]) => { + return { + async any() { + return columnInfo; + }, + } as unknown as PgpDatabase; +}; + +// if node parameters copied from canvas all default parameters has to be added manualy as JSON would not have them +describe('Test PostgresV2, deleteTable operation', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('deleteCommand: delete, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'deleteTable', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + cachedResultName: 'my_table', + }, + deleteCommand: 'delete', + where: { + values: [ + { + column: 'id', + condition: 'LIKE', + value: '1', + }, + ], + }, + options: {}, + }; + const nodeOptions = nodeParameters.options as IDataObject; + + await deleteTable.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: 'DELETE FROM $1:name.$2:name WHERE $3:name LIKE $4', + values: ['public', 'my_table', 'id', '1'], + }, + ], + items, + nodeOptions, + ); + }); + + it('deleteCommand: truncate, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'deleteTable', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + cachedResultName: 'my_table', + }, + deleteCommand: 'truncate', + restartSequences: true, + options: { + cascade: true, + }, + }; + const nodeOptions = nodeParameters.options as IDataObject; + + await deleteTable.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: 'TRUNCATE TABLE $1:name.$2:name RESTART IDENTITY CASCADE', + values: ['public', 'my_table'], + }, + ], + items, + nodeOptions, + ); + }); + + it('deleteCommand: drop, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'deleteTable', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + cachedResultName: 'my_table', + }, + deleteCommand: 'drop', + options: {}, + }; + const nodeOptions = nodeParameters.options as IDataObject; + + await deleteTable.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: 'DROP TABLE IF EXISTS $1:name.$2:name', + values: ['public', 'my_table'], + }, + ], + items, + nodeOptions, + ); + }); +}); + +describe('Test PostgresV2, executeQuery operation', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'executeQuery', + query: 'select * from $1:name;', + options: { + queryReplacement: 'my_table', + }, + }; + const nodeOptions = nodeParameters.options as IDataObject; + + await executeQuery.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + ); + + expect(runQueries).toHaveBeenCalledWith( + [{ query: 'select * from $1:name;', values: ['my_table'] }], + items, + nodeOptions, + ); + }); +}); + +describe('Test PostgresV2, insert operation', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('dataMode: define, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + }, + dataMode: 'defineBelow', + valuesToSend: { + values: [ + { + column: 'json', + value: '{"test":15}', + }, + { + column: 'foo', + value: 'select 5', + }, + { + column: 'id', + value: '4', + }, + ], + }, + options: {}, + }; + const columnsInfo: ColumnInfo[] = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const nodeOptions = nodeParameters.options as IDataObject; + + await insert.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + createMockDb(columnsInfo), + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *', + values: ['public', 'my_table', { json: '{"test":15}', foo: 'select 5', id: '4' }], + }, + ], + items, + nodeOptions, + ); + }); + + it('dataMode: autoMapInputData, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + }, + dataMode: 'autoMapInputData', + options: {}, + }; + const columnsInfo: ColumnInfo[] = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const inputItems = [ + { + json: { + id: 1, + json: { + test: 15, + }, + foo: 'data 1', + }, + }, + { + json: { + id: 2, + json: { + test: 10, + }, + foo: 'data 2', + }, + }, + { + json: { + id: 3, + json: { + test: 5, + }, + foo: 'data 3', + }, + }, + ]; + + const nodeOptions = nodeParameters.options as IDataObject; + + await insert.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + inputItems, + nodeOptions, + createMockDb(columnsInfo), + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *', + values: ['public', 'my_table', { id: 1, json: { test: 15 }, foo: 'data 1' }], + }, + { + query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *', + values: ['public', 'my_table', { id: 2, json: { test: 10 }, foo: 'data 2' }], + }, + { + query: 'INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv) RETURNING *', + values: ['public', 'my_table', { id: 3, json: { test: 5 }, foo: 'data 3' }], + }, + ], + inputItems, + nodeOptions, + ); + }); +}); + +describe('Test PostgresV2, select operation', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('returnAll, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'select', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + cachedResultName: 'my_table', + }, + returnAll: true, + options: {}, + }; + const nodeOptions = nodeParameters.options as IDataObject; + + await select.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: 'SELECT * FROM $1:name.$2:name', + values: ['public', 'my_table'], + }, + ], + items, + nodeOptions, + ); + }); + + it('limit, whereClauses, sortRules, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'select', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + cachedResultName: 'my_table', + }, + limit: 5, + where: { + values: [ + { + column: 'id', + condition: '>=', + value: 2, + }, + { + column: 'foo', + condition: 'equal', + value: 'data 2', + }, + ], + }, + sort: { + values: [ + { + column: 'id', + }, + ], + }, + options: { + outputColumns: ['json', 'id'], + }, + }; + const nodeOptions = nodeParameters.options as IDataObject; + + await select.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: + 'SELECT $3:name FROM $1:name.$2:name WHERE $4:name >= $5 AND $6:name = $7 ORDER BY $8:name ASC LIMIT 5', + values: ['public', 'my_table', ['json', 'id'], 'id', 2, 'foo', 'data 2', 'id'], + }, + ], + items, + nodeOptions, + ); + }); +}); + +describe('Test PostgresV2, update operation', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('dataMode: define, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'update', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + }, + dataMode: 'defineBelow', + columnToMatchOn: 'id', + valueToMatchOn: '1', + valuesToSend: { + values: [ + { + column: 'json', + value: { text: 'some text' }, + }, + { + column: 'foo', + value: 'updated', + }, + ], + }, + options: { + outputColumns: ['json', 'foo'], + }, + }; + const columnsInfo: ColumnInfo[] = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const nodeOptions = nodeParameters.options as IDataObject; + + await update.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + createMockDb(columnsInfo), + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: + 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING $9:name', + values: [ + 'public', + 'my_table', + 'id', + '1', + 'json', + { text: 'some text' }, + 'foo', + 'updated', + ['json', 'foo'], + ], + }, + ], + items, + nodeOptions, + ); + }); + + it('dataMode: autoMapInputData, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'update', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + }, + dataMode: 'autoMapInputData', + columnToMatchOn: 'id', + options: {}, + }; + const columnsInfo: ColumnInfo[] = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const inputItems = [ + { + json: { + id: 1, + json: { + test: 15, + }, + foo: 'data 1', + }, + }, + { + json: { + id: 2, + json: { + test: 10, + }, + foo: 'data 2', + }, + }, + { + json: { + id: 3, + json: { + test: 5, + }, + foo: 'data 3', + }, + }, + ]; + + const nodeOptions = nodeParameters.options as IDataObject; + + await update.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + inputItems, + nodeOptions, + createMockDb(columnsInfo), + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: + 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING *', + values: ['public', 'my_table', 'id', 1, 'json', { test: 15 }, 'foo', 'data 1'], + }, + { + query: + 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING *', + values: ['public', 'my_table', 'id', 2, 'json', { test: 10 }, 'foo', 'data 2'], + }, + { + query: + 'UPDATE $1:name.$2:name SET $5:name = $6, $7:name = $8 WHERE $3:name = $4 RETURNING *', + values: ['public', 'my_table', 'id', 3, 'json', { test: 5 }, 'foo', 'data 3'], + }, + ], + inputItems, + nodeOptions, + ); + }); +}); + +describe('Test PostgresV2, upsert operation', () => { + it('dataMode: define, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'upsert', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + }, + dataMode: 'defineBelow', + columnToMatchOn: 'id', + valueToMatchOn: '5', + valuesToSend: { + values: [ + { + column: 'json', + value: '{ "test": 5 }', + }, + { + column: 'foo', + value: 'data 5', + }, + ], + }, + options: { + outputColumns: ['json'], + }, + }; + const columnsInfo: ColumnInfo[] = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const nodeOptions = nodeParameters.options as IDataObject; + + await upsert.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + items, + nodeOptions, + createMockDb(columnsInfo), + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: + 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING $9:name', + values: [ + 'public', + 'my_table', + 'id', + { json: '{ "test": 5 }', foo: 'data 5', id: '5' }, + 'json', + '{ "test": 5 }', + 'foo', + 'data 5', + ['json'], + ], + }, + ], + items, + nodeOptions, + ); + }); + + it('dataMode: autoMapInputData, should call runQueries with', async () => { + const nodeParameters: IDataObject = { + operation: 'upsert', + schema: { + __rl: true, + mode: 'list', + value: 'public', + }, + table: { + __rl: true, + value: 'my_table', + mode: 'list', + }, + dataMode: 'autoMapInputData', + columnToMatchOn: 'id', + options: {}, + }; + const columnsInfo: ColumnInfo[] = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const inputItems = [ + { + json: { + id: 1, + json: { + test: 15, + }, + foo: 'data 1', + }, + }, + { + json: { + id: 2, + json: { + test: 10, + }, + foo: 'data 2', + }, + }, + { + json: { + id: 3, + json: { + test: 5, + }, + foo: 'data 3', + }, + }, + ]; + + const nodeOptions = nodeParameters.options as IDataObject; + + await upsert.execute.call( + createMockExecuteFunction(nodeParameters), + runQueries, + inputItems, + nodeOptions, + createMockDb(columnsInfo), + ); + + expect(runQueries).toHaveBeenCalledWith( + [ + { + query: + 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING *', + values: [ + 'public', + 'my_table', + 'id', + { id: 1, json: { test: 15 }, foo: 'data 1' }, + 'json', + { test: 15 }, + 'foo', + 'data 1', + ], + }, + { + query: + 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING *', + values: [ + 'public', + 'my_table', + 'id', + { id: 2, json: { test: 10 }, foo: 'data 2' }, + 'json', + { test: 10 }, + 'foo', + 'data 2', + ], + }, + { + query: + 'INSERT INTO $1:name.$2:name($4:name) VALUES($4:csv) ON CONFLICT ($3:name) DO UPDATE SET $5:name = $6, $7:name = $8 RETURNING *', + values: [ + 'public', + 'my_table', + 'id', + { id: 3, json: { test: 5 }, foo: 'data 3' }, + 'json', + { test: 5 }, + 'foo', + 'data 3', + ], + }, + ], + inputItems, + nodeOptions, + ); + }); +}); diff --git a/packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts b/packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts new file mode 100644 index 0000000000..9bf94936fe --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/test/v2/runQueries.test.ts @@ -0,0 +1,53 @@ +import { constructExecutionMetaData } from 'n8n-core'; +import type { IDataObject, INode } from 'n8n-workflow'; + +import type { PgpDatabase } from '../../v2/helpers/interfaces'; +import { configureQueryRunner } from '../../v2/helpers/utils'; + +import pgPromise from 'pg-promise'; + +const node: INode = { + id: '1', + name: 'Postgres node', + typeVersion: 2, + type: 'n8n-nodes-base.postgres', + position: [60, 760], + parameters: { + operation: 'executeQuery', + }, +}; + +const createMockDb = (returnData: IDataObject | IDataObject[]) => { + return { + async any() { + return returnData; + }, + async multi() { + return returnData; + }, + async tx() { + return returnData; + }, + async task() { + return returnData; + }, + } as unknown as PgpDatabase; +}; + +describe('Test PostgresV2, runQueries', () => { + it('should execute, should return success true', async () => { + const pgp = pgPromise(); + const db = createMockDb([]); + + const dbMultiSpy = jest.spyOn(db, 'multi'); + + const runQueries = configureQueryRunner(node, constructExecutionMetaData, false, pgp, db); + + const result = await runQueries([{ query: 'SELECT * FROM table', values: [] }], [], {}); + + expect(result).toBeDefined(); + expect(result).toHaveLength(1); + expect(result).toEqual([{ json: { success: true } }]); + expect(dbMultiSpy).toHaveBeenCalledWith('SELECT * FROM table'); + }); +}); diff --git a/packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts b/packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts new file mode 100644 index 0000000000..8a707d592b --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/test/v2/utils.test.ts @@ -0,0 +1,375 @@ +import type { IDataObject, INode } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; +import { + addSortRules, + addReturning, + addWhereClauses, + checkItemAgainstSchema, + parsePostgresError, + prepareErrorItem, + prepareItem, + replaceEmptyStringsByNulls, + wrapData, +} from '../../v2/helpers/utils'; + +const node: INode = { + id: '1', + name: 'Postgres node', + typeVersion: 2, + type: 'n8n-nodes-base.postgres', + position: [60, 760], + parameters: { + operation: 'executeQuery', + }, +}; + +describe('Test PostgresV2, wrapData', () => { + it('should wrap object in json', () => { + const data = { + id: 1, + name: 'Name', + }; + const wrappedData = wrapData(data); + expect(wrappedData).toBeDefined(); + expect(wrappedData).toEqual([{ json: data }]); + }); + it('should wrap each object in array in json', () => { + const data = [ + { + id: 1, + name: 'Name', + }, + { + id: 2, + name: 'Name 2', + }, + ]; + const wrappedData = wrapData(data); + expect(wrappedData).toBeDefined(); + expect(wrappedData).toEqual([{ json: data[0] }, { json: data[1] }]); + }); + it('json key from source should be inside json', () => { + const data = { + json: { + id: 1, + name: 'Name', + }, + }; + const wrappedData = wrapData(data); + expect(wrappedData).toBeDefined(); + expect(wrappedData).toEqual([{ json: data }]); + expect(Object.keys(wrappedData[0].json)).toContain('json'); + }); +}); + +describe('Test PostgresV2, prepareErrorItem', () => { + it('should return error info item', () => { + const items = [ + { + json: { + id: 1, + name: 'Name 1', + }, + }, + { + json: { + id: 2, + name: 'Name 2', + }, + }, + ]; + + const error = new Error('Test error'); + const item = prepareErrorItem(items, error, 1); + expect(item).toBeDefined(); + + expect((item.json.item as IDataObject)?.id).toEqual(2); + expect(item.json.message).toEqual('Test error'); + expect(item.json.error).toBeDefined(); + }); +}); + +describe('Test PostgresV2, parsePostgresError', () => { + it('should return NodeOperationError', () => { + const error = new Error('Test error'); + + const parsedError = parsePostgresError(node, error, [], 1); + expect(parsedError).toBeDefined(); + expect(parsedError.message).toEqual('Test error'); + expect(parsedError instanceof NodeOperationError).toEqual(true); + }); + + it('should update message that includes ECONNREFUSED', () => { + const error = new Error('ECONNREFUSED'); + + const parsedError = parsePostgresError(node, error, [], 1); + expect(parsedError).toBeDefined(); + expect(parsedError.message).toEqual('Connection refused'); + expect(parsedError instanceof NodeOperationError).toEqual(true); + }); + + it('should update message with syntax error', () => { + // eslint-disable-next-line n8n-local-rules/no-unneeded-backticks + const errorMessage = String.raw`syntax error at or near "seelect"`; + const error = new Error(); + error.message = errorMessage; + + const parsedError = parsePostgresError(node, error, [ + { query: 'seelect * from my_table', values: [] }, + ]); + expect(parsedError).toBeDefined(); + expect(parsedError.message).toEqual('Syntax error at line 1 near "seelect"'); + expect(parsedError instanceof NodeOperationError).toEqual(true); + }); +}); + +describe('Test PostgresV2, addWhereClauses', () => { + it('should add where clauses to query', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const whereClauses = [{ column: 'id', condition: 'equal', value: '1' }]; + + const [updatedQuery, updatedValues] = addWhereClauses( + node, + 0, + query, + whereClauses, + values, + 'AND', + ); + + expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name WHERE $3:name = $4'); + expect(updatedValues).toEqual(['public', 'my_table', 'id', '1']); + }); + + it('should combine where clauses by OR', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const whereClauses = [ + { column: 'id', condition: 'equal', value: '1' }, + { column: 'foo', condition: 'equal', value: 'select 2' }, + ]; + + const [updatedQuery, updatedValues] = addWhereClauses( + node, + 0, + query, + whereClauses, + values, + 'OR', + ); + + expect(updatedQuery).toEqual( + 'SELECT * FROM $1:name.$2:name WHERE $3:name = $4 OR $5:name = $6', + ); + expect(updatedValues).toEqual(['public', 'my_table', 'id', '1', 'foo', 'select 2']); + }); + + it('should ignore incorect combine conition ad use AND', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const whereClauses = [ + { column: 'id', condition: 'equal', value: '1' }, + { column: 'foo', condition: 'equal', value: 'select 2' }, + ]; + + const [updatedQuery, updatedValues] = addWhereClauses( + node, + 0, + query, + whereClauses, + values, + 'SELECT * FROM my_table', + ); + + expect(updatedQuery).toEqual( + 'SELECT * FROM $1:name.$2:name WHERE $3:name = $4 AND $5:name = $6', + ); + expect(updatedValues).toEqual(['public', 'my_table', 'id', '1', 'foo', 'select 2']); + }); +}); + +describe('Test PostgresV2, addSortRules', () => { + it('should ORDER BY ASC', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const sortRules = [{ column: 'id', direction: 'ASC' }]; + + const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values); + + expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name ORDER BY $3:name ASC'); + expect(updatedValues).toEqual(['public', 'my_table', 'id']); + }); + it('should ORDER BY DESC', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const sortRules = [{ column: 'id', direction: 'DESC' }]; + + const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values); + + expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name ORDER BY $3:name DESC'); + expect(updatedValues).toEqual(['public', 'my_table', 'id']); + }); + it('should ignore incorect direction', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const sortRules = [{ column: 'id', direction: 'SELECT * FROM my_table' }]; + + const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values); + + expect(updatedQuery).toEqual('SELECT * FROM $1:name.$2:name ORDER BY $3:name ASC'); + expect(updatedValues).toEqual(['public', 'my_table', 'id']); + }); + it('should add multiple sort rules', () => { + const query = 'SELECT * FROM $1:name.$2:name'; + const values = ['public', 'my_table']; + const sortRules = [ + { column: 'id', direction: 'ASC' }, + { column: 'foo', direction: 'DESC' }, + ]; + + const [updatedQuery, updatedValues] = addSortRules(query, sortRules, values); + + expect(updatedQuery).toEqual( + 'SELECT * FROM $1:name.$2:name ORDER BY $3:name ASC, $4:name DESC', + ); + expect(updatedValues).toEqual(['public', 'my_table', 'id', 'foo']); + }); +}); + +describe('Test PostgresV2, addReturning', () => { + it('should add RETURNING', () => { + const query = 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4'; + const values = ['public', 'my_table', 'id', '1', 'foo', 'updated']; + const outputColumns = ['id', 'foo']; + + const [updatedQuery, updatedValues] = addReturning(query, outputColumns, values); + + expect(updatedQuery).toEqual( + 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4 RETURNING $7:name', + ); + expect(updatedValues).toEqual([ + 'public', + 'my_table', + 'id', + '1', + 'foo', + 'updated', + ['id', 'foo'], + ]); + }); + it('should add RETURNING *', () => { + const query = 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4'; + const values = ['public', 'my_table', 'id', '1', 'foo', 'updated']; + const outputColumns = ['id', 'foo', '*']; + + const [updatedQuery, updatedValues] = addReturning(query, outputColumns, values); + + expect(updatedQuery).toEqual( + 'UPDATE $1:name.$2:name SET $5:name = $6 WHERE $3:name = $4 RETURNING *', + ); + expect(updatedValues).toEqual(['public', 'my_table', 'id', '1', 'foo', 'updated']); + }); +}); + +describe('Test PostgresV2, replaceEmptyStringsByNulls', () => { + it('should replace empty string by null', () => { + const items = [ + { json: { foo: 'bar', bar: '', spam: undefined } }, + { json: { foo: '', bar: '', spam: '' } }, + { json: { foo: 0, bar: NaN, spam: false } }, + ]; + + const updatedItems = replaceEmptyStringsByNulls(items, true); + + expect(updatedItems).toBeDefined(); + expect(updatedItems).toEqual([ + { json: { foo: 'bar', bar: null, spam: undefined } }, + { json: { foo: null, bar: null, spam: null } }, + { json: { foo: 0, bar: NaN, spam: false } }, + ]); + }); + it('should do nothing', () => { + const items = [ + { json: { foo: 'bar', bar: '', spam: undefined } }, + { json: { foo: '', bar: '', spam: '' } }, + { json: { foo: 0, bar: NaN, spam: false } }, + ]; + + const updatedItems = replaceEmptyStringsByNulls(items); + + expect(updatedItems).toBeDefined(); + expect(updatedItems).toEqual(items); + }); +}); + +describe('Test PostgresV2, prepareItem', () => { + it('should convert fixedColections values to object', () => { + const values = [ + { + column: 'id', + value: '1', + }, + { + column: 'foo', + value: 'bar', + }, + { + column: 'bar', + value: 'foo', + }, + ]; + + const item = prepareItem(values); + + expect(item).toBeDefined(); + expect(item).toEqual({ + id: '1', + foo: 'bar', + bar: 'foo', + }); + }); +}); + +describe('Test PostgresV2, checkItemAgainstSchema', () => { + it('should not throw error', () => { + const item = { foo: 'updated', id: 2 }; + const columnsInfo = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + const result = checkItemAgainstSchema(node, item, columnsInfo, 0); + + expect(result).toBeDefined(); + expect(result).toEqual(item); + }); + it('should throw error on not existing column', () => { + const item = { foo: 'updated', bar: 'updated' }; + const columnsInfo = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'json', data_type: 'json', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + try { + checkItemAgainstSchema(node, item, columnsInfo, 0); + } catch (error) { + expect(error.message).toEqual("Column 'bar' does not exist in selected table"); + } + }); + it('should throw error on not nullable column', () => { + const item = { foo: null }; + const columnsInfo = [ + { column_name: 'id', data_type: 'integer', is_nullable: 'NO' }, + { column_name: 'foo', data_type: 'text', is_nullable: 'NO' }, + ]; + + try { + checkItemAgainstSchema(node, item, columnsInfo, 0); + } catch (error) { + expect(error.message).toEqual("Column 'foo' is not nullable"); + } + }); +}); diff --git a/packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts b/packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts new file mode 100644 index 0000000000..ef5cef53e6 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v1/PostgresV1.node.ts @@ -0,0 +1,424 @@ +/* eslint-disable n8n-nodes-base/node-filename-against-convention */ +import type { IExecuteFunctions } from 'n8n-core'; +import type { + ICredentialsDecrypted, + ICredentialTestFunctions, + IDataObject, + INodeCredentialTestResult, + INodeExecutionData, + INodeType, + INodeTypeBaseDescription, + INodeTypeDescription, +} from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +import pgPromise from 'pg-promise'; + +import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './genericFunctions'; + +const versionDescription: INodeTypeDescription = { + displayName: 'Postgres', + name: 'postgres', + icon: 'file:postgres.svg', + group: ['input'], + version: 1, + description: 'Get, add and update data in Postgres', + defaults: { + name: 'Postgres', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'postgres', + required: true, + testedBy: 'postgresConnectionTest', + }, + ], + properties: [ + { + displayName: 'Version 1', + name: 'versionNotice', + type: 'notice', + default: '', + }, + { + displayName: 'Operation', + name: 'operation', + type: 'options', + noDataExpression: true, + options: [ + { + name: 'Execute Query', + value: 'executeQuery', + description: 'Execute an SQL query', + action: 'Execute a SQL query', + }, + { + name: 'Insert', + value: 'insert', + description: 'Insert rows in database', + action: 'Insert rows in database', + }, + { + name: 'Update', + value: 'update', + description: 'Update rows in database', + action: 'Update rows in database', + }, + ], + default: 'insert', + }, + + // ---------------------------------- + // executeQuery + // ---------------------------------- + { + displayName: 'Query', + name: 'query', + type: 'string', + displayOptions: { + show: { + operation: ['executeQuery'], + }, + }, + default: '', + placeholder: 'SELECT id, name FROM product WHERE quantity > $1 AND price <= $2', + required: true, + description: + 'The SQL query to execute. You can use n8n expressions or $1 and $2 in conjunction with query parameters.', + }, + // ---------------------------------- + // insert + // ---------------------------------- + { + displayName: 'Schema', + name: 'schema', + type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: 'public', + required: true, + description: 'Name of the schema the table belongs to', + }, + { + displayName: 'Table', + name: 'table', + type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: '', + required: true, + description: 'Name of the table in which to insert data to', + }, + { + displayName: 'Columns', + name: 'columns', + type: 'string', + displayOptions: { + show: { + operation: ['insert'], + }, + }, + default: '', + // eslint-disable-next-line n8n-nodes-base/node-param-placeholder-miscased-id + placeholder: 'id:int,name:text,description', + // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id + description: + 'Comma-separated list of the properties which should used as columns for the new rows. You can use type casting with colons (:) like id:int.', + }, + + // ---------------------------------- + // update + // ---------------------------------- + { + displayName: 'Schema', + name: 'schema', + type: 'string', + displayOptions: { + show: { + operation: ['update'], + }, + }, + default: 'public', + description: 'Name of the schema the table belongs to', + }, + { + displayName: 'Table', + name: 'table', + type: 'string', + displayOptions: { + show: { + operation: ['update'], + }, + }, + default: '', + required: true, + description: 'Name of the table in which to update data in', + }, + { + displayName: 'Update Key', + name: 'updateKey', + type: 'string', + displayOptions: { + show: { + operation: ['update'], + }, + }, + default: 'id', + required: true, + // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id + description: + 'Comma-separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".', + }, + { + displayName: 'Columns', + name: 'columns', + type: 'string', + displayOptions: { + show: { + operation: ['update'], + }, + }, + default: '', + placeholder: 'name:text,description', + // eslint-disable-next-line n8n-nodes-base/node-param-description-miscased-id + description: + 'Comma-separated list of the properties which should used as columns for rows to update. You can use type casting with colons (:) like id:int.', + }, + + // ---------------------------------- + // insert,update + // ---------------------------------- + { + displayName: 'Return Fields', + name: 'returnFields', + type: 'string', + requiresDataPath: 'multiple', + displayOptions: { + show: { + operation: ['insert', 'update'], + }, + }, + default: '*', + description: 'Comma-separated list of the fields that the operation will return', + }, + // ---------------------------------- + // Additional fields + // ---------------------------------- + { + displayName: 'Additional Fields', + name: 'additionalFields', + type: 'collection', + placeholder: 'Add Field', + default: {}, + options: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Independently', + value: 'independently', + description: 'Execute each query independently', + }, + { + name: 'Multiple Queries', + value: 'multiple', + description: 'Default. Sends multiple queries at once to database.', + }, + { + name: 'Transaction', + value: 'transaction', + description: 'Executes all queries in a single transaction', + }, + ], + default: 'multiple', + description: + 'The way queries should be sent to database. Can be used in conjunction with Continue on Fail. See the docs for more examples', + }, + { + displayName: 'Output Large-Format Numbers As', + name: 'largeNumbersOutput', + type: 'options', + options: [ + { + name: 'Numbers', + value: 'numbers', + }, + { + name: 'Text', + value: 'text', + description: + 'Use this if you expect numbers longer than 16 digits (otherwise numbers may be incorrect)', + }, + ], + hint: 'Applies to NUMERIC and BIGINT columns only', + default: 'text', + }, + { + displayName: 'Query Parameters', + name: 'queryParams', + type: 'string', + displayOptions: { + show: { + '/operation': ['executeQuery'], + }, + }, + default: '', + placeholder: 'quantity,price', + description: + 'Comma-separated list of properties which should be used as query parameters', + }, + ], + }, + ], +}; + +export class PostgresV1 implements INodeType { + description: INodeTypeDescription; + + constructor(baseDescription: INodeTypeBaseDescription) { + this.description = { + ...baseDescription, + ...versionDescription, + }; + } + + methods = { + credentialTest: { + async postgresConnectionTest( + this: ICredentialTestFunctions, + credential: ICredentialsDecrypted, + ): Promise { + const credentials = credential.data as IDataObject; + try { + const pgp = pgPromise(); + const config: IDataObject = { + host: credentials.host as string, + port: credentials.port as number, + database: credentials.database as string, + user: credentials.user as string, + password: credentials.password as string, + }; + + if (credentials.allowUnauthorizedCerts === true) { + config.ssl = { + rejectUnauthorized: false, + }; + } else { + config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); + config.sslmode = (credentials.ssl as string) || 'disable'; + } + + const db = pgp(config); + await db.connect(); + pgp.end(); + } catch (error) { + return { + status: 'Error', + message: error.message, + }; + } + return { + status: 'OK', + message: 'Connection successful!', + }; + }, + }, + }; + + async execute(this: IExecuteFunctions): Promise { + const credentials = await this.getCredentials('postgres'); + const largeNumbersOutput = this.getNodeParameter( + 'additionalFields.largeNumbersOutput', + 0, + '', + ) as string; + + const pgp = pgPromise(); + + if (largeNumbersOutput === 'numbers') { + pgp.pg.types.setTypeParser(20, (value: string) => { + return parseInt(value, 10); + }); + pgp.pg.types.setTypeParser(1700, (value: string) => { + return parseFloat(value); + }); + } + + const config: IDataObject = { + host: credentials.host as string, + port: credentials.port as number, + database: credentials.database as string, + user: credentials.user as string, + password: credentials.password as string, + }; + + if (credentials.allowUnauthorizedCerts === true) { + config.ssl = { + rejectUnauthorized: false, + }; + } else { + config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); + config.sslmode = (credentials.ssl as string) || 'disable'; + } + + const db = pgp(config); + + let returnItems: INodeExecutionData[] = []; + + const items = this.getInputData(); + const operation = this.getNodeParameter('operation', 0); + + if (operation === 'executeQuery') { + // ---------------------------------- + // executeQuery + // ---------------------------------- + + const queryResult = await pgQueryV2.call(this, pgp, db, items, this.continueOnFail()); + returnItems = queryResult as INodeExecutionData[]; + } else if (operation === 'insert') { + // ---------------------------------- + // insert + // ---------------------------------- + + const insertData = await pgInsertV2.call(this, pgp, db, items, this.continueOnFail()); + + // returnItems = this.helpers.returnJsonArray(insertData); + returnItems = insertData as INodeExecutionData[]; + } else if (operation === 'update') { + // ---------------------------------- + // update + // ---------------------------------- + + const updateItems = await pgUpdate( + this.getNodeParameter, + pgp, + db, + items, + this.continueOnFail(), + ); + + returnItems = wrapData(updateItems); + } else { + pgp.end(); + throw new NodeOperationError( + this.getNode(), + `The operation "${operation}" is not supported!`, + ); + } + + // Close the connection + pgp.end(); + + return this.prepareOutputData(returnItems); + } +} diff --git a/packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts b/packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts similarity index 100% rename from packages/nodes-base/nodes/Postgres/Postgres.node.functions.ts rename to packages/nodes-base/nodes/Postgres/v1/genericFunctions.ts diff --git a/packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts b/packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts new file mode 100644 index 0000000000..e435f4ddbe --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/PostgresV2.node.ts @@ -0,0 +1,29 @@ +/* eslint-disable n8n-nodes-base/node-filename-against-convention */ +import type { IExecuteFunctions } from 'n8n-core'; +import type { + INodeExecutionData, + INodeType, + INodeTypeBaseDescription, + INodeTypeDescription, +} from 'n8n-workflow'; +import { router } from './actions/router'; + +import { versionDescription } from './actions/versionDescription'; +import { credentialTest, listSearch, loadOptions } from './methods'; + +export class PostgresV2 implements INodeType { + description: INodeTypeDescription; + + constructor(baseDescription: INodeTypeBaseDescription) { + this.description = { + ...baseDescription, + ...versionDescription, + }; + } + + methods = { credentialTest, listSearch, loadOptions }; + + async execute(this: IExecuteFunctions): Promise { + return router.call(this); + } +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts b/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts new file mode 100644 index 0000000000..0ea7b35962 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/common.descriptions.ts @@ -0,0 +1,335 @@ +import type { INodeProperties } from 'n8n-workflow'; + +export const optionsCollection: INodeProperties = { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'Cascade', + name: 'cascade', + type: 'boolean', + default: false, + description: + 'Whether to drop all objects that depend on the table, such as views and sequences', + displayOptions: { + show: { + '/operation': ['deleteTable'], + }, + hide: { + '/deleteCommand': ['delete'], + }, + }, + }, + { + displayName: 'Connection Timeout', + name: 'connectionTimeout', + type: 'number', + default: 30, + description: 'Number of seconds reserved for connecting to the database', + }, + { + displayName: 'Query Batching', + name: 'queryBatching', + type: 'options', + noDataExpression: true, + options: [ + { + name: 'Single Query', + value: 'single', + description: 'A single query for all incoming items', + }, + { + name: 'Independently', + value: 'independently', + description: 'Execute one query per incoming item of the run', + }, + { + name: 'Transaction', + value: 'transaction', + description: + 'Execute all queries in a transaction, if a failure occurs, all changes are rolled back', + }, + ], + default: 'single', + description: 'The way queries should be sent to the database', + }, + { + displayName: 'Query Parameters', + name: 'queryReplacement', + type: 'string', + default: '', + description: + 'Comma-separated list of the values you want to use as query parameters. More info.', + hint: 'Comma-separated list of values: reference them in your query as $1, $2, $3…', + placeholder: 'e.g. value1,value2,value3', + displayOptions: { + show: { '/operation': ['executeQuery'] }, + }, + }, + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-multi-options + displayName: 'Output Columns', + name: 'outputColumns', + type: 'multiOptions', + description: + 'Choose from the list, or specify IDs using an expression', + typeOptions: { + loadOptionsMethod: 'getColumnsMultiOptions', + loadOptionsDependsOn: ['table.value'], + }, + default: [], + displayOptions: { + show: { '/operation': ['select', 'insert', 'update', 'upsert'] }, + }, + }, + { + displayName: 'Output Large-Format Numbers As', + name: 'largeNumbersOutput', + type: 'options', + options: [ + { + name: 'Numbers', + value: 'numbers', + }, + { + name: 'Text', + value: 'text', + description: + 'Use this if you expect numbers longer than 16 digits (otherwise numbers may be incorrect)', + }, + ], + hint: 'Applies to NUMERIC and BIGINT columns only', + default: 'text', + }, + { + displayName: 'Skip on Conflict', + name: 'skipOnConflict', + type: 'boolean', + default: false, + description: + 'Whether to skip the row and do not throw error if a unique constraint or exclusion constraint is violated', + displayOptions: { + show: { + '/operation': ['insert'], + }, + }, + }, + { + displayName: 'Replace Empty Strings with NULL', + name: 'replaceEmptyStrings', + type: 'boolean', + default: false, + description: + 'Whether to replace empty strings with NULL in input, could be useful when data come from spreadsheet', + displayOptions: { + show: { + '/operation': ['insert', 'update', 'upsert', 'executeQuery'], + }, + }, + }, + ], +}; + +export const schemaRLC: INodeProperties = { + displayName: 'Schema', + name: 'schema', + type: 'resourceLocator', + default: { mode: 'list', value: 'public' }, + required: true, + placeholder: 'e.g. public', + description: 'The schema that contains the table you want to work on', + modes: [ + { + displayName: 'From List', + name: 'list', + type: 'list', + typeOptions: { + searchListMethod: 'schemaSearch', + }, + }, + { + displayName: 'By Name', + name: 'name', + type: 'string', + }, + ], +}; + +export const tableRLC: INodeProperties = { + displayName: 'Table', + name: 'table', + type: 'resourceLocator', + default: { mode: 'list', value: '' }, + required: true, + description: 'The table you want to work on', + modes: [ + { + displayName: 'From List', + name: 'list', + type: 'list', + typeOptions: { + searchListMethod: 'tableSearch', + }, + }, + { + displayName: 'By Name', + name: 'name', + type: 'string', + }, + ], +}; + +export const whereFixedCollection: INodeProperties = { + displayName: 'Select Rows', + name: 'where', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + placeholder: 'Add Condition', + default: {}, + description: 'If not set, all rows will be selected', + options: [ + { + displayName: 'Values', + name: 'values', + values: [ + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column', + name: 'column', + type: 'options', + description: + 'Choose from the list, or specify an ID using an expression', + default: '', + placeholder: 'e.g. ID', + typeOptions: { + loadOptionsMethod: 'getColumns', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + }, + { + displayName: 'Operator', + name: 'condition', + type: 'options', + description: + "The operator to check the column against. When using 'LIKE' operator percent sign ( %) matches zero or more characters, underscore ( _ ) matches any single character.", + // eslint-disable-next-line n8n-nodes-base/node-param-options-type-unsorted-items + options: [ + { + name: 'Equal', + value: 'equal', + }, + { + name: 'Not Equal', + value: '!=', + }, + { + name: 'Like', + value: 'LIKE', + }, + { + name: 'Greater Than', + value: '>', + }, + { + name: 'Less Than', + value: '<', + }, + { + name: 'Greater Than Or Equal', + value: '>=', + }, + { + name: 'Less Than Or Equal', + value: '<=', + }, + { + name: 'Is Null', + value: 'IS NULL', + }, + ], + default: 'equal', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], +}; + +export const sortFixedCollection: INodeProperties = { + displayName: 'Sort', + name: 'sort', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + placeholder: 'Add Sort Rule', + default: {}, + options: [ + { + displayName: 'Values', + name: 'values', + values: [ + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column', + name: 'column', + type: 'options', + description: + 'Choose from the list, or specify an ID using an expression', + default: '', + typeOptions: { + loadOptionsMethod: 'getColumns', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + }, + { + displayName: 'Direction', + name: 'direction', + type: 'options', + options: [ + { + name: 'ASC', + value: 'ASC', + }, + { + name: 'DESC', + value: 'DESC', + }, + ], + default: 'ASC', + }, + ], + }, + ], +}; + +export const combineConditionsCollection: INodeProperties = { + displayName: 'Combine Conditions', + name: 'combineConditions', + type: 'options', + description: + 'How to combine the conditions defined in "Select Rows": AND requires all conditions to be true, OR requires at least one condition to be true', + options: [ + { + name: 'AND', + value: 'AND', + description: 'Only rows that meet all the conditions are selected', + }, + { + name: 'OR', + value: 'OR', + description: 'Rows that meet at least one condition are selected', + }, + ], + default: 'AND', +}; diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts new file mode 100644 index 0000000000..125ebaa1d7 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/Database.resource.ts @@ -0,0 +1,74 @@ +import type { INodeProperties } from 'n8n-workflow'; +import { schemaRLC, tableRLC } from '../common.descriptions'; + +import * as deleteTable from './deleteTable.operation'; +import * as executeQuery from './executeQuery.operation'; +import * as insert from './insert.operation'; +import * as select from './select.operation'; +import * as update from './update.operation'; +import * as upsert from './upsert.operation'; + +export { deleteTable, executeQuery, insert, select, update, upsert }; + +export const description: INodeProperties[] = [ + { + displayName: 'Operation', + name: 'operation', + type: 'options', + noDataExpression: true, + options: [ + { + name: 'Delete', + value: 'deleteTable', + description: 'Delete an entire table or rows in a table', + action: 'Delete table or rows', + }, + { + name: 'Execute Query', + value: 'executeQuery', + description: 'Execute an SQL query', + action: 'Execute a SQL query', + }, + { + name: 'Insert', + value: 'insert', + description: 'Insert rows in a table', + action: 'Insert rows in a table', + }, + { + // eslint-disable-next-line n8n-nodes-base/node-param-option-name-wrong-for-upsert + name: 'Insert or Update', + value: 'upsert', + // eslint-disable-next-line n8n-nodes-base/node-param-description-wrong-for-upsert + description: 'Insert or update rows in a table', + action: 'Insert or update rows in a table', + }, + { + name: 'Select', + value: 'select', + description: 'Select rows from a table', + action: 'Select rows from a table', + }, + { + name: 'Update', + value: 'update', + description: 'Update rows in a table', + action: 'Update rows in a table', + }, + ], + displayOptions: { + show: { + resource: ['database'], + }, + }, + default: 'insert', + }, + { ...schemaRLC, displayOptions: { hide: { operation: ['executeQuery'] } } }, + { ...tableRLC, displayOptions: { hide: { operation: ['executeQuery'] } } }, + ...deleteTable.description, + ...executeQuery.description, + ...insert.description, + ...select.description, + ...update.description, + ...upsert.description, +]; diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts new file mode 100644 index 0000000000..c0d4eba9a4 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/deleteTable.operation.ts @@ -0,0 +1,159 @@ +import type { IExecuteFunctions } from 'n8n-core'; +import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +import { updateDisplayOptions } from '../../../../../utils/utilities'; + +import type { + PgpDatabase, + QueriesRunner, + QueryValues, + QueryWithValues, + WhereClause, +} from '../../helpers/interfaces'; + +import { addWhereClauses } from '../../helpers/utils'; + +import { + combineConditionsCollection, + optionsCollection, + whereFixedCollection, +} from '../common.descriptions'; + +const properties: INodeProperties[] = [ + { + displayName: 'Command', + name: 'deleteCommand', + type: 'options', + default: 'truncate', + options: [ + { + name: 'Truncate', + value: 'truncate', + description: "Only removes the table's data and preserves the table's structure", + }, + { + name: 'Delete', + value: 'delete', + description: + "Delete the rows that match the 'Select Rows' conditions below. If no selection is made, all rows in the table are deleted.", + }, + { + name: 'Drop', + value: 'drop', + description: "Deletes the table's data and also the table's structure permanently", + }, + ], + }, + { + displayName: 'Restart Sequences', + name: 'restartSequences', + type: 'boolean', + default: false, + description: 'Whether to reset identity (auto-increment) columns to their initial values', + displayOptions: { + show: { + deleteCommand: ['truncate'], + }, + }, + }, + { + ...whereFixedCollection, + displayOptions: { + show: { + deleteCommand: ['delete'], + }, + }, + }, + { + ...combineConditionsCollection, + displayOptions: { + show: { + deleteCommand: ['delete'], + }, + }, + }, + optionsCollection, +]; + +const displayOptions = { + show: { + resource: ['database'], + operation: ['deleteTable'], + }, + hide: { + table: [''], + }, +}; + +export const description = updateDisplayOptions(displayOptions, properties); + +export async function execute( + this: IExecuteFunctions, + runQueries: QueriesRunner, + items: INodeExecutionData[], + nodeOptions: IDataObject, + _db?: PgpDatabase, +): Promise { + const queries: QueryWithValues[] = []; + + for (let i = 0; i < items.length; i++) { + const options = this.getNodeParameter('options', i, {}); + + const schema = this.getNodeParameter('schema', i, undefined, { + extractValue: true, + }) as string; + + const table = this.getNodeParameter('table', i, undefined, { + extractValue: true, + }) as string; + + const deleteCommand = this.getNodeParameter('deleteCommand', i) as string; + + let query = ''; + let values: QueryValues = [schema, table]; + + if (deleteCommand === 'drop') { + const cascade = options.cascade ? ' CASCADE' : ''; + query = `DROP TABLE IF EXISTS $1:name.$2:name${cascade}`; + } + + if (deleteCommand === 'truncate') { + const identity = this.getNodeParameter('restartSequences', i, false) + ? ' RESTART IDENTITY' + : ''; + const cascade = options.cascade ? ' CASCADE' : ''; + query = `TRUNCATE TABLE $1:name.$2:name${identity}${cascade}`; + } + + if (deleteCommand === 'delete') { + const whereClauses = + ((this.getNodeParameter('where', i, []) as IDataObject).values as WhereClause[]) || []; + + const combineConditions = this.getNodeParameter('combineConditions', i, 'AND') as string; + + [query, values] = addWhereClauses( + this.getNode(), + i, + 'DELETE FROM $1:name.$2:name', + whereClauses, + values, + combineConditions, + ); + } + + if (query === '') { + throw new NodeOperationError( + this.getNode(), + 'Invalid delete command, only drop, delete and truncate are supported ', + { itemIndex: i }, + ); + } + + const queryWithValues = { query, values }; + + queries.push(queryWithValues); + } + + return runQueries(queries, items, nodeOptions); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts new file mode 100644 index 0000000000..41aedcf156 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/executeQuery.operation.ts @@ -0,0 +1,84 @@ +import type { IExecuteFunctions } from 'n8n-core'; +import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +import { updateDisplayOptions } from '../../../../../utils/utilities'; + +import type { PgpDatabase, QueriesRunner, QueryWithValues } from '../../helpers/interfaces'; + +import { replaceEmptyStringsByNulls } from '../../helpers/utils'; + +import { optionsCollection } from '../common.descriptions'; + +const properties: INodeProperties[] = [ + { + displayName: 'Query', + name: 'query', + type: 'string', + default: '', + placeholder: 'e.g. SELECT id, name FROM product WHERE quantity > $1 AND price <= $2', + required: true, + description: + "The SQL query to execute. You can use n8n expressions and $1, $2, $3, etc to refer to the 'Query Parameters' set in options below.", + typeOptions: { + rows: 3, + }, + hint: 'Prefer using query parameters over n8n expressions to avoid SQL injection attacks', + }, + { + displayName: ` + To use query parameters in your SQL query, reference them as $1, $2, $3, etc in the corresponding order. More info. + `, + name: 'notice', + type: 'notice', + default: '', + }, + optionsCollection, +]; + +const displayOptions = { + show: { + resource: ['database'], + operation: ['executeQuery'], + }, +}; + +export const description = updateDisplayOptions(displayOptions, properties); + +export async function execute( + this: IExecuteFunctions, + runQueries: QueriesRunner, + items: INodeExecutionData[], + nodeOptions: IDataObject, + _db?: PgpDatabase, +): Promise { + items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); + + const queries: QueryWithValues[] = []; + + for (let i = 0; i < items.length; i++) { + const query = this.getNodeParameter('query', i) as string; + + let values: IDataObject[] = []; + + let queryReplacement = this.getNodeParameter('options.queryReplacement', i, ''); + + if (typeof queryReplacement === 'string') { + queryReplacement = queryReplacement.split(',').map((entry) => entry.trim()); + } + + if (Array.isArray(queryReplacement)) { + values = queryReplacement as IDataObject[]; + } else { + throw new NodeOperationError( + this.getNode(), + 'Query Replacement must be a string of comma-separated values, or an array of values', + { itemIndex: i }, + ); + } + + queries.push({ query, values }); + } + + return runQueries(queries, items, nodeOptions); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts new file mode 100644 index 0000000000..a30b178bff --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/insert.operation.ts @@ -0,0 +1,172 @@ +import type { IExecuteFunctions } from 'n8n-core'; +import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; + +import { updateDisplayOptions } from '../../../../../utils/utilities'; + +import type { + PgpDatabase, + QueriesRunner, + QueryValues, + QueryWithValues, +} from '../../helpers/interfaces'; + +import { + addReturning, + checkItemAgainstSchema, + getTableSchema, + prepareItem, + replaceEmptyStringsByNulls, +} from '../../helpers/utils'; + +import { optionsCollection } from '../common.descriptions'; + +const properties: INodeProperties[] = [ + { + displayName: 'Data Mode', + name: 'dataMode', + type: 'options', + options: [ + { + name: 'Auto-Map Input Data to Columns', + value: 'autoMapInputData', + description: 'Use when node input properties names exactly match the table column names', + }, + { + name: 'Map Each Column Manually', + value: 'defineBelow', + description: 'Set the value for each destination column manually', + }, + ], + default: 'autoMapInputData', + description: + 'Whether to map node input properties and the table data automatically or manually', + }, + { + displayName: ` + In this mode, make sure incoming data fields are named the same as the columns in your table. If needed, use a 'Set' node before this node to change the field names. + `, + name: 'notice', + type: 'notice', + default: '', + displayOptions: { + show: { + dataMode: ['autoMapInputData'], + }, + }, + }, + { + displayName: 'Values to Send', + name: 'valuesToSend', + placeholder: 'Add Value', + type: 'fixedCollection', + typeOptions: { + multipleValueButtonText: 'Add Value', + multipleValues: true, + }, + displayOptions: { + show: { + dataMode: ['defineBelow'], + }, + }, + default: {}, + options: [ + { + displayName: 'Values', + name: 'values', + values: [ + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column', + name: 'column', + type: 'options', + description: + 'Choose from the list, or specify an ID using an expression', + typeOptions: { + loadOptionsMethod: 'getColumns', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + default: [], + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], + }, + optionsCollection, +]; + +const displayOptions = { + show: { + resource: ['database'], + operation: ['insert'], + }, + hide: { + table: [''], + }, +}; + +export const description = updateDisplayOptions(displayOptions, properties); + +export async function execute( + this: IExecuteFunctions, + runQueries: QueriesRunner, + items: INodeExecutionData[], + nodeOptions: IDataObject, + db: PgpDatabase, +): Promise { + items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); + + const queries: QueryWithValues[] = []; + + for (let i = 0; i < items.length; i++) { + const schema = this.getNodeParameter('schema', i, undefined, { + extractValue: true, + }) as string; + + const table = this.getNodeParameter('table', i, undefined, { + extractValue: true, + }) as string; + + const options = this.getNodeParameter('options', i, {}); + + let onConflict = ''; + if (options.skipOnConflict) { + onConflict = ' ON CONFLICT DO NOTHING'; + } + + let query = `INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv)${onConflict}`; + let values: QueryValues = [schema, table]; + + const dataMode = this.getNodeParameter('dataMode', i) as string; + + let item: IDataObject = {}; + + if (dataMode === 'autoMapInputData') { + item = items[i].json; + } + + if (dataMode === 'defineBelow') { + const valuesToSend = (this.getNodeParameter('valuesToSend', i, []) as IDataObject) + .values as IDataObject[]; + + item = prepareItem(valuesToSend); + } + + const tableSchema = await getTableSchema(db, schema, table); + + values.push(checkItemAgainstSchema(this.getNode(), item, tableSchema, i)); + + const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[]; + + [query, values] = addReturning(query, outputColumns, values); + + queries.push({ query, values }); + } + + return runQueries(queries, items, nodeOptions); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts new file mode 100644 index 0000000000..2844893dc9 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/select.operation.ts @@ -0,0 +1,134 @@ +import type { IExecuteFunctions } from 'n8n-core'; +import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; + +import { updateDisplayOptions } from '../../../../../utils/utilities'; + +import type { + PgpDatabase, + QueriesRunner, + QueryValues, + QueryWithValues, + SortRule, + WhereClause, +} from '../../helpers/interfaces'; + +import { addSortRules, addWhereClauses, replaceEmptyStringsByNulls } from '../../helpers/utils'; + +import { + combineConditionsCollection, + optionsCollection, + sortFixedCollection, + whereFixedCollection, +} from '../common.descriptions'; + +const properties: INodeProperties[] = [ + { + displayName: 'Return All', + name: 'returnAll', + type: 'boolean', + default: false, + description: 'Whether to return all results or only up to a given limit', + displayOptions: { + show: { + resource: ['event'], + operation: ['getAll'], + }, + }, + }, + { + displayName: 'Limit', + name: 'limit', + type: 'number', + default: 50, + description: 'Max number of results to return', + typeOptions: { + minValue: 1, + }, + displayOptions: { + show: { + returnAll: [false], + }, + }, + }, + whereFixedCollection, + combineConditionsCollection, + sortFixedCollection, + optionsCollection, +]; + +const displayOptions = { + show: { + resource: ['database'], + operation: ['select'], + }, + hide: { + table: [''], + }, +}; + +export const description = updateDisplayOptions(displayOptions, properties); + +export async function execute( + this: IExecuteFunctions, + runQueries: QueriesRunner, + items: INodeExecutionData[], + nodeOptions: IDataObject, + _db?: PgpDatabase, +): Promise { + items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); + + const queries: QueryWithValues[] = []; + + for (let i = 0; i < items.length; i++) { + const schema = this.getNodeParameter('schema', i, undefined, { + extractValue: true, + }) as string; + + const table = this.getNodeParameter('table', i, undefined, { + extractValue: true, + }) as string; + + let values: QueryValues = [schema, table]; + + const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[]; + + let query = ''; + + if (outputColumns.includes('*')) { + query = 'SELECT * FROM $1:name.$2:name'; + } else { + values.push(outputColumns); + query = `SELECT $${values.length}:name FROM $1:name.$2:name`; + } + + const whereClauses = + ((this.getNodeParameter('where', i, []) as IDataObject).values as WhereClause[]) || []; + + const combineConditions = this.getNodeParameter('combineConditions', i, 'AND') as string; + + [query, values] = addWhereClauses( + this.getNode(), + i, + query, + whereClauses, + values, + combineConditions, + ); + + const sortRules = + ((this.getNodeParameter('sort', i, []) as IDataObject).values as SortRule[]) || []; + + [query, values] = addSortRules(query, sortRules, values); + + const returnAll = this.getNodeParameter('returnAll', i, false); + if (!returnAll) { + const limit = this.getNodeParameter('limit', i, 50); + query += ` LIMIT ${limit}`; + } + + const queryWithValues = { query, values }; + queries.push(queryWithValues); + } + + return runQueries(queries, items, nodeOptions); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts new file mode 100644 index 0000000000..be474904b6 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/update.operation.ts @@ -0,0 +1,216 @@ +import type { IExecuteFunctions } from 'n8n-core'; +import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; + +import { updateDisplayOptions } from '../../../../../utils/utilities'; + +import type { + PgpDatabase, + QueriesRunner, + QueryValues, + QueryWithValues, +} from '../../helpers/interfaces'; + +import { + addReturning, + checkItemAgainstSchema, + getTableSchema, + prepareItem, + replaceEmptyStringsByNulls, +} from '../../helpers/utils'; + +import { optionsCollection } from '../common.descriptions'; + +const properties: INodeProperties[] = [ + { + displayName: 'Data Mode', + name: 'dataMode', + type: 'options', + options: [ + { + name: 'Auto-Map Input Data to Columns', + value: 'autoMapInputData', + description: 'Use when node input properties names exactly match the table column names', + }, + { + name: 'Map Each Column Manually', + value: 'defineBelow', + description: 'Set the value for each destination column manually', + }, + ], + default: 'autoMapInputData', + description: + 'Whether to map node input properties and the table data automatically or manually', + }, + { + displayName: ` + In this mode, make sure incoming data fields are named the same as the columns in your table. If needed, use a 'Set' node before this node to change the field names. + `, + name: 'notice', + type: 'notice', + default: '', + displayOptions: { + show: { + dataMode: ['autoMapInputData'], + }, + }, + }, + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased, n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column to Match On', + name: 'columnToMatchOn', + type: 'options', + required: true, + description: + 'The column to compare when finding the rows to update. Choose from the list, or specify an ID using an expression.', + typeOptions: { + loadOptionsMethod: 'getColumns', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + default: '', + hint: 'The column that identifies the row(s) to modify', + }, + { + displayName: 'Value of Column to Match On', + name: 'valueToMatchOn', + type: 'string', + default: '', + description: + 'Rows with a value in the specified "Column to Match On" that corresponds to the value in this field will be updated', + displayOptions: { + show: { + dataMode: ['defineBelow'], + }, + }, + }, + { + displayName: 'Values to Send', + name: 'valuesToSend', + placeholder: 'Add Value', + type: 'fixedCollection', + typeOptions: { + multipleValueButtonText: 'Add Value', + multipleValues: true, + }, + displayOptions: { + show: { + dataMode: ['defineBelow'], + }, + }, + default: {}, + options: [ + { + displayName: 'Values', + name: 'values', + values: [ + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column', + name: 'column', + type: 'options', + description: + 'Choose from the list, or specify an ID using an expression', + typeOptions: { + loadOptionsMethod: 'getColumnsWithoutColumnToMatchOn', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + default: [], + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], + }, + optionsCollection, +]; + +const displayOptions = { + show: { + resource: ['database'], + operation: ['update'], + }, + hide: { + table: [''], + }, +}; + +export const description = updateDisplayOptions(displayOptions, properties); + +export async function execute( + this: IExecuteFunctions, + runQueries: QueriesRunner, + items: INodeExecutionData[], + nodeOptions: IDataObject, + db: PgpDatabase, +): Promise { + items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); + + const queries: QueryWithValues[] = []; + + for (let i = 0; i < items.length; i++) { + const schema = this.getNodeParameter('schema', i, undefined, { + extractValue: true, + }) as string; + + const table = this.getNodeParameter('table', i, undefined, { + extractValue: true, + }) as string; + + const columnToMatchOn = this.getNodeParameter('columnToMatchOn', i) as string; + + const dataMode = this.getNodeParameter('dataMode', i) as string; + + let item: IDataObject = {}; + let valueToMatchOn: string | IDataObject = ''; + + if (dataMode === 'autoMapInputData') { + item = items[i].json; + valueToMatchOn = item[columnToMatchOn] as string; + } + + if (dataMode === 'defineBelow') { + const valuesToSend = (this.getNodeParameter('valuesToSend', i, []) as IDataObject) + .values as IDataObject[]; + + item = prepareItem(valuesToSend); + + valueToMatchOn = this.getNodeParameter('valueToMatchOn', i) as string; + } + + const tableSchema = await getTableSchema(db, schema, table); + + item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i); + + let values: QueryValues = [schema, table]; + + let valuesLength = values.length + 1; + + const condition = `$${valuesLength}:name = $${valuesLength + 1}`; + valuesLength = valuesLength + 2; + values.push(columnToMatchOn, valueToMatchOn); + + const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn); + + const updates: string[] = []; + + for (const column of updateColumns) { + updates.push(`$${valuesLength}:name = $${valuesLength + 1}`); + valuesLength = valuesLength + 2; + values.push(column, item[column] as string); + } + + let query = `UPDATE $1:name.$2:name SET ${updates.join(', ')} WHERE ${condition}`; + + const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[]; + + [query, values] = addReturning(query, outputColumns, values); + + queries.push({ query, values }); + } + + return runQueries(queries, items, nodeOptions); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts b/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts new file mode 100644 index 0000000000..972f171d9c --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/database/upsert.operation.ts @@ -0,0 +1,217 @@ +import type { IExecuteFunctions } from 'n8n-core'; +import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; + +import { updateDisplayOptions } from '../../../../../utils/utilities'; + +import type { + PgpDatabase, + QueriesRunner, + QueryValues, + QueryWithValues, +} from '../../helpers/interfaces'; + +import { + addReturning, + checkItemAgainstSchema, + getTableSchema, + prepareItem, + replaceEmptyStringsByNulls, +} from '../../helpers/utils'; + +import { optionsCollection } from '../common.descriptions'; + +const properties: INodeProperties[] = [ + { + displayName: 'Data Mode', + name: 'dataMode', + type: 'options', + options: [ + { + name: 'Auto-Map Input Data to Columns', + value: 'autoMapInputData', + description: 'Use when node input properties names exactly match the table column names', + }, + { + name: 'Map Each Column Manually', + value: 'defineBelow', + description: 'Set the value for each destination column manually', + }, + ], + default: 'autoMapInputData', + description: + 'Whether to map node input properties and the table data automatically or manually', + }, + { + displayName: ` + In this mode, make sure incoming data fields are named the same as the columns in your table. If needed, use a 'Set' node before this node to change the field names. + `, + name: 'notice', + type: 'notice', + default: '', + displayOptions: { + show: { + dataMode: ['autoMapInputData'], + }, + }, + }, + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-miscased, n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column to Match On', + name: 'columnToMatchOn', + type: 'options', + required: true, + description: + 'The column to compare when finding the rows to update. Choose from the list, or specify an ID using an expression.', + typeOptions: { + loadOptionsMethod: 'getColumns', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + default: '', + hint: "Used to find the correct row(s) to update. Doesn't get changed. Has to be unique.", + }, + { + displayName: 'Value of Column to Match On', + name: 'valueToMatchOn', + type: 'string', + default: '', + description: + 'Rows with a value in the specified "Column to Match On" that corresponds to the value in this field will be updated. New rows will be created for non-matching items.', + displayOptions: { + show: { + dataMode: ['defineBelow'], + }, + }, + }, + { + displayName: 'Values to Send', + name: 'valuesToSend', + placeholder: 'Add Value', + type: 'fixedCollection', + typeOptions: { + multipleValueButtonText: 'Add Value', + multipleValues: true, + }, + displayOptions: { + show: { + dataMode: ['defineBelow'], + }, + }, + default: {}, + options: [ + { + displayName: 'Values', + name: 'values', + values: [ + { + // eslint-disable-next-line n8n-nodes-base/node-param-display-name-wrong-for-dynamic-options + displayName: 'Column', + name: 'column', + type: 'options', + description: + 'Choose from the list, or specify an ID using an expression', + typeOptions: { + loadOptionsMethod: 'getColumnsWithoutColumnToMatchOn', + loadOptionsDependsOn: ['schema.value', 'table.value'], + }, + default: [], + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], + }, + optionsCollection, +]; + +const displayOptions = { + show: { + resource: ['database'], + operation: ['upsert'], + }, + hide: { + table: [''], + }, +}; + +export const description = updateDisplayOptions(displayOptions, properties); + +export async function execute( + this: IExecuteFunctions, + runQueries: QueriesRunner, + items: INodeExecutionData[], + nodeOptions: IDataObject, + db: PgpDatabase, +): Promise { + items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean); + + const queries: QueryWithValues[] = []; + + for (let i = 0; i < items.length; i++) { + const schema = this.getNodeParameter('schema', i, undefined, { + extractValue: true, + }) as string; + + const table = this.getNodeParameter('table', i, undefined, { + extractValue: true, + }) as string; + + const columnToMatchOn = this.getNodeParameter('columnToMatchOn', i) as string; + + const dataMode = this.getNodeParameter('dataMode', i) as string; + + let item: IDataObject = {}; + + if (dataMode === 'autoMapInputData') { + item = items[i].json; + } + + if (dataMode === 'defineBelow') { + const valuesToSend = (this.getNodeParameter('valuesToSend', i, []) as IDataObject) + .values as IDataObject[]; + + item = prepareItem(valuesToSend); + + item[columnToMatchOn] = this.getNodeParameter('valueToMatchOn', i) as string; + } + + const tableSchema = await getTableSchema(db, schema, table); + + item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i); + + let values: QueryValues = [schema, table]; + + let valuesLength = values.length + 1; + const onConflict = ` ON CONFLICT ($${valuesLength}:name) DO UPDATE `; + valuesLength = valuesLength + 1; + values.push(columnToMatchOn); + + const insertQuery = `INSERT INTO $1:name.$2:name($${valuesLength}:name) VALUES($${valuesLength}:csv)${onConflict}`; + valuesLength = valuesLength + 1; + values.push(item); + + const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn); + + const updates: string[] = []; + + for (const column of updateColumns) { + updates.push(`$${valuesLength}:name = $${valuesLength + 1}`); + valuesLength = valuesLength + 2; + values.push(column, item[column] as string); + } + + let query = `${insertQuery} SET ${updates.join(', ')}`; + + const outputColumns = this.getNodeParameter('options.outputColumns', i, ['*']) as string[]; + + [query, values] = addReturning(query, outputColumns, values); + + queries.push({ query, values }); + } + + return runQueries(queries, items, nodeOptions); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts b/packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts new file mode 100644 index 0000000000..80a11ce0f8 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/node.type.ts @@ -0,0 +1,9 @@ +import type { AllEntities, Entity } from 'n8n-workflow'; + +type PostgresMap = { + database: 'deleteTable' | 'executeQuery' | 'insert' | 'select' | 'update' | 'upsert'; +}; + +export type PostgresType = AllEntities; + +export type PostgresDatabaseType = Entity; diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/router.ts b/packages/nodes-base/nodes/Postgres/v2/actions/router.ts new file mode 100644 index 0000000000..68289d5a87 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/router.ts @@ -0,0 +1,67 @@ +import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +import type { PostgresType } from './node.type'; + +import * as database from './database/Database.resource'; +import { Connections } from '../transport'; +import { configureQueryRunner } from '../helpers/utils'; +import type { ConnectionsData } from '../helpers/interfaces'; + +export async function router(this: IExecuteFunctions): Promise { + let returnData: INodeExecutionData[] = []; + + const items = this.getInputData(); + const resource = this.getNodeParameter('resource', 0); + const operation = this.getNodeParameter('operation', 0); + + const credentials = await this.getCredentials('postgres'); + const options = this.getNodeParameter('options', 0, {}); + + const { db, pgp, sshClient } = (await Connections.getInstance( + credentials, + options, + true, + )) as ConnectionsData; + + const runQueries = configureQueryRunner( + this.getNode(), + this.helpers.constructExecutionMetaData, + this.continueOnFail(), + pgp, + db, + ); + + const postgresNodeData = { + resource, + operation, + } as PostgresType; + + try { + switch (postgresNodeData.resource) { + case 'database': + returnData = await database[postgresNodeData.operation].execute.call( + this, + runQueries, + items, + options, + db, + ); + break; + default: + throw new NodeOperationError( + this.getNode(), + `The operation "${operation}" is not supported!`, + ); + } + } catch (error) { + throw error; + } finally { + if (sshClient) { + sshClient.end(); + } + pgp.end(); + } + + return this.prepareOutputData(returnData); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts b/packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts new file mode 100644 index 0000000000..031f178d2e --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/actions/versionDescription.ts @@ -0,0 +1,42 @@ +/* eslint-disable n8n-nodes-base/node-filename-against-convention */ +import type { INodeTypeDescription } from 'n8n-workflow'; + +import * as database from './database/Database.resource'; + +export const versionDescription: INodeTypeDescription = { + displayName: 'Postgres', + name: 'postgres', + icon: 'file:postgres.svg', + group: ['input'], + version: 2, + subtitle: '={{ $parameter["operation"] }}', + description: 'Get, add and update data in Postgres', + defaults: { + name: 'Postgres', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'postgres', + required: true, + testedBy: 'postgresConnectionTest', + }, + ], + properties: [ + { + displayName: 'Resource', + name: 'resource', + type: 'hidden', + noDataExpression: true, + options: [ + { + name: 'Database', + value: 'database', + }, + ], + default: 'database', + }, + ...database.description, + ], +}; diff --git a/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts b/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts new file mode 100644 index 0000000000..e96cae5b1e --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/helpers/interfaces.ts @@ -0,0 +1,37 @@ +import type { + IDataObject, + INodeExecutionData, + IPairedItemData, + NodeExecutionWithMetadata, +} from 'n8n-workflow'; +import type pgPromise from 'pg-promise'; +import type pg from 'pg-promise/typescript/pg-subset'; +import type { Client } from 'ssh2'; + +export type QueryMode = 'single' | 'transaction' | 'independently'; + +export type QueryValue = string | number | IDataObject | string[]; +export type QueryValues = QueryValue[]; +export type QueryWithValues = { query: string; values?: QueryValues }; + +export type WhereClause = { column: string; condition: string; value: string | number }; +export type SortRule = { column: string; direction: string }; +export type ColumnInfo = { column_name: string; data_type: string; is_nullable: string }; + +export type PgpClient = pgPromise.IMain<{}, pg.IClient>; +export type PgpDatabase = pgPromise.IDatabase<{}, pg.IClient>; +export type PgpConnectionParameters = pg.IConnectionParameters; +export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient; sshClient?: Client }; + +export type ConstructExecutionMetaData = ( + inputData: INodeExecutionData[], + options: { + itemData: IPairedItemData | IPairedItemData[]; + }, +) => NodeExecutionWithMetadata[]; + +export type QueriesRunner = ( + queries: QueryWithValues[], + items: INodeExecutionData[], + options: IDataObject, +) => Promise; diff --git a/packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts b/packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts new file mode 100644 index 0000000000..152c4a344f --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/helpers/utils.ts @@ -0,0 +1,360 @@ +import type { IDataObject, INode, INodeExecutionData } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +import type { + ColumnInfo, + ConstructExecutionMetaData, + PgpClient, + PgpDatabase, + QueryMode, + QueryValues, + QueryWithValues, + SortRule, + WhereClause, +} from './interfaces'; + +export function wrapData(data: IDataObject | IDataObject[]): INodeExecutionData[] { + if (!Array.isArray(data)) { + return [{ json: data }]; + } + return data.map((item) => ({ + json: item, + })); +} + +export function prepareErrorItem( + items: INodeExecutionData[], + error: IDataObject | NodeOperationError | Error, + index: number, +) { + return { + json: { message: error.message, item: { ...items[index].json }, error: { ...error } }, + pairedItem: { item: index }, + } as INodeExecutionData; +} + +export function parsePostgresError( + node: INode, + error: any, + queries: QueryWithValues[], + itemIndex?: number, +) { + if (error.message.includes('syntax error at or near') && queries.length) { + try { + const snippet = error.message.match(/syntax error at or near "(.*)"/)[1] as string; + const failedQureryIndex = queries.findIndex((query) => query.query.includes(snippet)); + + if (failedQureryIndex !== -1) { + if (!itemIndex) { + itemIndex = failedQureryIndex; + } + const failedQuery = queries[failedQureryIndex].query; + const lines = failedQuery.split('\n'); + const lineIndex = lines.findIndex((line) => line.includes(snippet)); + const errorMessage = `Syntax error at line ${lineIndex + 1} near "${snippet}"`; + error.message = errorMessage; + } + } catch {} + } + + let message = error.message; + const errorDescription = error.description ? error.description : error.detail || error.hint; + let description = errorDescription; + + if (!description && queries[itemIndex || 0]?.query) { + description = `Failed query: ${queries[itemIndex || 0].query}`; + } + + if (error.message.includes('ECONNREFUSED')) { + message = 'Connection refused'; + try { + description = error.message.split('ECONNREFUSED ')[1].trim(); + } catch (e) {} + } + + if (error.message.includes('ENOTFOUND')) { + message = 'Host not found'; + try { + description = error.message.split('ENOTFOUND ')[1].trim(); + } catch (e) {} + } + + if (error.message.includes('ETIMEDOUT')) { + message = 'Connection timed out'; + try { + description = error.message.split('ETIMEDOUT ')[1].trim(); + } catch (e) {} + } + + return new NodeOperationError(node, error as Error, { + message, + description, + itemIndex, + }); +} + +export function addWhereClauses( + node: INode, + itemIndex: number, + query: string, + clauses: WhereClause[], + replacements: QueryValues, + combineConditions: string, +): [string, QueryValues] { + if (clauses.length === 0) return [query, replacements]; + + let combineWith = 'AND'; + + if (combineConditions === 'OR') { + combineWith = 'OR'; + } + + let replacementIndex = replacements.length + 1; + + let whereQuery = ' WHERE'; + const values: QueryValues = []; + + clauses.forEach((clause, index) => { + if (clause.condition === 'equal') { + clause.condition = '='; + } + if (['>', '<', '>=', '<='].includes(clause.condition)) { + const value = Number(clause.value); + + if (Number.isNaN(value)) { + throw new NodeOperationError( + node, + `Operator in entry ${index + 1} of 'Select Rows' works with numbers, but value ${ + clause.value + } is not a number`, + { + itemIndex, + }, + ); + } + + clause.value = value; + } + const columnReplacement = `$${replacementIndex}:name`; + values.push(clause.column); + replacementIndex = replacementIndex + 1; + + let valueReplacement = ''; + if (clause.condition !== 'IS NULL') { + valueReplacement = ` $${replacementIndex}`; + values.push(clause.value); + replacementIndex = replacementIndex + 1; + } + + const operator = index === clauses.length - 1 ? '' : ` ${combineWith}`; + + whereQuery += ` ${columnReplacement} ${clause.condition}${valueReplacement}${operator}`; + }); + + return [`${query}${whereQuery}`, replacements.concat(...values)]; +} + +export function addSortRules( + query: string, + rules: SortRule[], + replacements: QueryValues, +): [string, QueryValues] { + if (rules.length === 0) return [query, replacements]; + + let replacementIndex = replacements.length + 1; + + let orderByQuery = ' ORDER BY'; + const values: string[] = []; + + rules.forEach((rule, index) => { + const columnReplacement = `$${replacementIndex}:name`; + values.push(rule.column); + replacementIndex = replacementIndex + 1; + + const endWith = index === rules.length - 1 ? '' : ','; + + const sortDirection = rule.direction === 'DESC' ? 'DESC' : 'ASC'; + + orderByQuery += ` ${columnReplacement} ${sortDirection}${endWith}`; + }); + + return [`${query}${orderByQuery}`, replacements.concat(...values)]; +} + +export function addReturning( + query: string, + outputColumns: string[], + replacements: QueryValues, +): [string, QueryValues] { + if (outputColumns.includes('*')) return [`${query} RETURNING *`, replacements]; + + const replacementIndex = replacements.length + 1; + + return [`${query} RETURNING $${replacementIndex}:name`, [...replacements, outputColumns]]; +} + +export const configureQueryRunner = + ( + node: INode, + constructExecutionMetaData: ConstructExecutionMetaData, + continueOnFail: boolean, + pgp: PgpClient, + db: PgpDatabase, + ) => + async (queries: QueryWithValues[], items: INodeExecutionData[], options: IDataObject) => { + let returnData: INodeExecutionData[] = []; + + const queryBatching = (options.queryBatching as QueryMode) || 'single'; + + if (queryBatching === 'single') { + try { + returnData = (await db.multi(pgp.helpers.concat(queries))) + .map((result, i) => { + return constructExecutionMetaData(wrapData(result as IDataObject[]), { + itemData: { item: i }, + }); + }) + .flat(); + returnData = returnData.length ? returnData : [{ json: { success: true } }]; + } catch (err) { + const error = parsePostgresError(node, err, queries); + if (!continueOnFail) throw error; + + return [ + { + json: { + message: error.message, + error: { ...error }, + }, + }, + ]; + } + } + + if (queryBatching === 'transaction') { + returnData = await db.tx(async (transaction) => { + const result: INodeExecutionData[] = []; + for (let i = 0; i < queries.length; i++) { + try { + const transactionResult: IDataObject[] = await transaction.any( + queries[i].query, + queries[i].values, + ); + + const executionData = constructExecutionMetaData( + wrapData(transactionResult.length ? transactionResult : [{ success: true }]), + { itemData: { item: i } }, + ); + + result.push(...executionData); + } catch (err) { + const error = parsePostgresError(node, err, queries, i); + if (!continueOnFail) throw error; + result.push(prepareErrorItem(items, error, i)); + return result; + } + } + return result; + }); + } + + if (queryBatching === 'independently') { + returnData = await db.task(async (t) => { + const result: INodeExecutionData[] = []; + for (let i = 0; i < queries.length; i++) { + try { + const transactionResult: IDataObject[] = await t.any( + queries[i].query, + queries[i].values, + ); + + const executionData = constructExecutionMetaData( + wrapData(transactionResult.length ? transactionResult : [{ success: true }]), + { itemData: { item: i } }, + ); + + result.push(...executionData); + } catch (err) { + const error = parsePostgresError(node, err, queries, i); + if (!continueOnFail) throw error; + result.push(prepareErrorItem(items, error, i)); + } + } + return result; + }); + } + + return returnData; + }; + +export function replaceEmptyStringsByNulls( + items: INodeExecutionData[], + replace?: boolean, +): INodeExecutionData[] { + if (!replace) return items; + + const returnData: INodeExecutionData[] = items.map((item) => { + const newItem = { ...item }; + const keys = Object.keys(newItem.json); + + for (const key of keys) { + if (newItem.json[key] === '') { + newItem.json[key] = null; + } + } + + return newItem; + }); + + return returnData; +} + +export function prepareItem(values: IDataObject[]) { + const item = values.reduce((acc, { column, value }) => { + acc[column as string] = value; + return acc; + }, {} as IDataObject); + + return item; +} + +export async function getTableSchema( + db: PgpDatabase, + schema: string, + table: string, +): Promise { + const columns = await db.any( + 'SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2', + [schema, table], + ); + + return columns; +} + +export function checkItemAgainstSchema( + node: INode, + item: IDataObject, + columnsInfo: ColumnInfo[], + index: number, +) { + if (columnsInfo.length === 0) return item; + const schema = columnsInfo.reduce((acc, { column_name, data_type, is_nullable }) => { + acc[column_name] = { type: data_type.toUpperCase(), nullable: is_nullable === 'YES' }; + return acc; + }, {} as IDataObject); + + for (const key of Object.keys(item)) { + if (schema[key] === undefined) { + throw new NodeOperationError(node, `Column '${key}' does not exist in selected table`, { + itemIndex: index, + }); + } + if (item[key] === null && !(schema[key] as IDataObject)?.nullable) { + throw new NodeOperationError(node, `Column '${key}' is not nullable`, { + itemIndex: index, + }); + } + } + + return item; +} diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts b/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts new file mode 100644 index 0000000000..614f732859 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts @@ -0,0 +1,68 @@ +import type { + ICredentialsDecrypted, + ICredentialTestFunctions, + IDataObject, + INodeCredentialTestResult, +} from 'n8n-workflow'; + +import { Connections } from '../transport'; + +import { Client } from 'ssh2'; +import type { ConnectionsData, PgpClient } from '../helpers/interfaces'; + +export async function postgresConnectionTest( + this: ICredentialTestFunctions, + credential: ICredentialsDecrypted, +): Promise { + const credentials = credential.data as IDataObject; + + let sshClientCreated: Client | undefined = new Client(); + let pgpClientCreated: PgpClient | undefined; + + try { + const { db, pgp, sshClient } = (await Connections.getInstance( + credentials, + {}, + true, + sshClientCreated, + )) as ConnectionsData; + + sshClientCreated = sshClient; + pgpClientCreated = pgp; + + await db.connect(); + } catch (error) { + let message = error.message as string; + + if (error.message.includes('ECONNREFUSED')) { + message = 'Connection refused'; + } + + if (error.message.includes('ENOTFOUND')) { + message = 'Host not found, please check your host name'; + } + + if (error.message.includes('ETIMEDOUT')) { + message = 'Connection timed out'; + } + + return { + status: 'Error', + message, + }; + } finally { + if (sshClientCreated) { + sshClientCreated.end(); + } + if (pgpClientCreated) { + pgpClientCreated.end(); + } + + //set the connection instance to null so that it can be recreated + await Connections.getInstance({}, {}, false, undefined, true); + } + return { + status: 'OK', + message: 'Connection successful!', + }; +} diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/index.ts b/packages/nodes-base/nodes/Postgres/v2/methods/index.ts new file mode 100644 index 0000000000..8d2d0278dd --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/methods/index.ts @@ -0,0 +1,3 @@ +export * as credentialTest from './credentialTest'; +export * as listSearch from './listSearch'; +export * as loadOptions from './loadOptions'; diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts new file mode 100644 index 0000000000..8a8b35ab39 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts @@ -0,0 +1,47 @@ +import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow'; +import type { ConnectionsData } from '../helpers/interfaces'; +import { Connections } from '../transport'; + +export async function schemaSearch(this: ILoadOptionsFunctions): Promise { + const credentials = await this.getCredentials('postgres'); + + const { db } = (await Connections.getInstance(credentials)) as ConnectionsData; + + try { + const response = await db.any('SELECT schema_name FROM information_schema.schemata'); + + return { + results: response.map((schema) => ({ + name: schema.schema_name as string, + value: schema.schema_name as string, + })), + }; + } catch (error) { + throw error; + } +} +export async function tableSearch(this: ILoadOptionsFunctions): Promise { + const credentials = await this.getCredentials('postgres'); + + const { db } = (await Connections.getInstance(credentials)) as ConnectionsData; + + const schema = this.getNodeParameter('schema', 0, { + extractValue: true, + }) as string; + + try { + const response = await db.any( + 'SELECT table_name FROM information_schema.tables WHERE table_schema=$1', + [schema], + ); + + return { + results: response.map((table) => ({ + name: table.table_name as string, + value: table.table_name as string, + })), + }; + } catch (error) { + throw error; + } +} diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts new file mode 100644 index 0000000000..acc6fb9247 --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts @@ -0,0 +1,46 @@ +import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow'; +import type { ConnectionsData } from '../helpers/interfaces'; +import { getTableSchema } from '../helpers/utils'; +import { Connections } from '../transport'; + +export async function getColumns(this: ILoadOptionsFunctions): Promise { + const credentials = await this.getCredentials('postgres'); + + const { db } = (await Connections.getInstance(credentials)) as ConnectionsData; + + const schema = this.getNodeParameter('schema', 0, { + extractValue: true, + }) as string; + + const table = this.getNodeParameter('table', 0, { + extractValue: true, + }) as string; + + try { + const columns = await getTableSchema(db, schema, table); + + return columns.map((column) => ({ + name: column.column_name, + value: column.column_name, + description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`, + })); + } catch (error) { + throw error; + } +} + +export async function getColumnsMultiOptions( + this: ILoadOptionsFunctions, +): Promise { + const returnData = await getColumns.call(this); + const returnAll = { name: '*', value: '*', description: 'All columns' }; + return [returnAll, ...returnData]; +} + +export async function getColumnsWithoutColumnToMatchOn( + this: ILoadOptionsFunctions, +): Promise { + const columnToMatchOn = this.getNodeParameter('columnToMatchOn') as string; + const returnData = await getColumns.call(this); + return returnData.filter((column) => column.value !== columnToMatchOn); +} diff --git a/packages/nodes-base/nodes/Postgres/v2/transport/index.ts b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts new file mode 100644 index 0000000000..76240a05ab --- /dev/null +++ b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts @@ -0,0 +1,209 @@ +import type { IDataObject } from 'n8n-workflow'; + +import { Client } from 'ssh2'; +import type { ConnectConfig } from 'ssh2'; + +import type { Server } from 'net'; +import { createServer } from 'net'; + +import pgPromise from 'pg-promise'; + +import { rm, writeFile } from 'fs/promises'; +import { file } from 'tmp-promise'; + +import type { PgpClient, PgpDatabase } from '../helpers/interfaces'; + +async function createSshConnectConfig(credentials: IDataObject) { + if (credentials.sshAuthenticateWith === 'password') { + return { + host: credentials.sshHost as string, + port: credentials.sshPort as number, + username: credentials.sshUser as string, + password: credentials.sshPassword as string, + } as ConnectConfig; + } else { + const { path } = await file({ prefix: 'n8n-ssh-' }); + await writeFile(path, credentials.privateKey as string); + + const options: ConnectConfig = { + host: credentials.host as string, + username: credentials.username as string, + port: credentials.port as number, + privateKey: path, + }; + + if (credentials.passphrase) { + options.passphrase = credentials.passphrase as string; + } + + return options; + } +} + +async function configurePostgres( + credentials: IDataObject, + options: IDataObject = {}, + createdSshClient?: Client, +) { + const pgp = pgPromise(); + + if (options.largeNumbersOutput === 'numbers') { + pgp.pg.types.setTypeParser(20, (value: string) => { + return parseInt(value, 10); + }); + pgp.pg.types.setTypeParser(1700, (value: string) => { + return parseFloat(value); + }); + } + + const dbConfig: IDataObject = { + host: credentials.host as string, + port: credentials.port as number, + database: credentials.database as string, + user: credentials.user as string, + password: credentials.password as string, + }; + + if (options.connectionTimeout) { + dbConfig.connectionTimeoutMillis = (options.connectionTimeout as number) * 1000; + } + + if (credentials.allowUnauthorizedCerts === true) { + dbConfig.ssl = { + rejectUnauthorized: false, + }; + } else { + dbConfig.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); + dbConfig.sslmode = (credentials.ssl as string) || 'disable'; + } + + if (!credentials.sshTunnel) { + const db = pgp(dbConfig); + return { db, pgp }; + } else { + const sshClient = createdSshClient || new Client(); + + const tunnelConfig = await createSshConnectConfig(credentials); + + const localHost = '127.0.0.1'; + const localPort = credentials.sshPostgresPort as number; + + let proxy: Server | undefined; + + const db = await new Promise((resolve, reject) => { + let sshClientReady = false; + + proxy = createServer((socket) => { + if (!sshClientReady) return socket.destroy(); + + sshClient.forwardOut( + socket.remoteAddress as string, + socket.remotePort as number, + credentials.host as string, + credentials.port as number, + (err, stream) => { + if (err) reject(err); + + socket.pipe(stream); + stream.pipe(socket); + }, + ); + }).listen(localPort, localHost); + + proxy.on('error', (err) => { + reject(err); + }); + + sshClient.connect(tunnelConfig); + + sshClient.on('ready', () => { + sshClientReady = true; + + const updatedDbConfig = { + ...dbConfig, + port: localPort, + host: localHost, + }; + const dbConnection = pgp(updatedDbConfig); + resolve(dbConnection); + }); + + sshClient.on('error', (err) => { + reject(err); + }); + + sshClient.on('end', async () => { + if (tunnelConfig.privateKey) { + await rm(tunnelConfig.privateKey as string, { force: true }); + } + if (proxy) proxy.close(); + }); + }).catch((err) => { + if (proxy) proxy.close(); + if (sshClient) sshClient.end(); + + let message = err.message; + let description = err.description; + + if (err.message.includes('ECONNREFUSED')) { + message = 'Connection refused'; + try { + description = err.message.split('ECONNREFUSED ')[1].trim(); + } catch (e) {} + } + + if (err.message.includes('ENOTFOUND')) { + message = 'Host not found'; + try { + description = err.message.split('ENOTFOUND ')[1].trim(); + } catch (e) {} + } + + if (err.message.includes('ETIMEDOUT')) { + message = 'Connection timed out'; + try { + description = err.message.split('ETIMEDOUT ')[1].trim(); + } catch (e) {} + } + + err.message = message; + err.description = description; + throw err; + }); + + return { db, pgp, sshClient }; + } +} + +export const Connections = (function () { + let instance: { db: PgpDatabase; pgp: PgpClient; sshClient?: Client } | null = null; + + return { + async getInstance( + credentials: IDataObject = {}, + options: IDataObject = {}, + reload = false, + createdSshClient?: Client, + nulify = false, + ) { + if (nulify) { + instance = null; + return instance; + } + + if (instance !== null && reload) { + if (instance.sshClient) { + instance.sshClient.end(); + } + instance.pgp.end(); + + instance = null; + } + + if (instance === null && Object.keys(credentials).length) { + instance = await configurePostgres(credentials, options, createdSshClient); + } + return instance; + }, + }; +})(); diff --git a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts index b21699c725..5987d4007e 100644 --- a/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts +++ b/packages/nodes-base/nodes/QuestDb/QuestDb.node.ts @@ -8,7 +8,7 @@ import { NodeOperationError } from 'n8n-workflow'; import pgPromise from 'pg-promise'; -import { pgInsert, pgQuery } from '../Postgres/Postgres.node.functions'; +import { pgInsert, pgQuery } from '../Postgres/v1/genericFunctions'; export class QuestDb implements INodeType { description: INodeTypeDescription = { diff --git a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts index 3381fa25b0..5a43f7c500 100644 --- a/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts +++ b/packages/nodes-base/nodes/TimescaleDb/TimescaleDb.node.ts @@ -6,7 +6,7 @@ import type { } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow'; -import { pgInsert, pgQuery, pgUpdate } from '../Postgres/Postgres.node.functions'; +import { pgInsert, pgQuery, pgUpdate } from '../Postgres/v1/genericFunctions'; import pgPromise from 'pg-promise'; diff --git a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js index db409d1165..29b72932c8 100644 --- a/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js +++ b/packages/nodes-base/test/nodes/Postgres/Postgres.node.functions.test.js @@ -1,4 +1,4 @@ -const PostgresFun = require('../../../nodes/Postgres/Postgres.node.functions'); +const PostgresFun = require('../../../nodes/Postgres/v1/genericFunctions'); const pgPromise = require('pg-promise'); describe('pgUpdate', () => {