refactor(Postgres Node): Backport connection pooling to postgres v1 (#12484)

This commit is contained in:
Danny Martini 2025-01-16 11:09:12 +01:00 committed by GitHub
parent c97bd48a77
commit 35cb10c5e7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 49 additions and 69 deletions

View file

@ -1,9 +1,9 @@
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */ /* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import { PostgresChatMessageHistory } from '@langchain/community/stores/message/postgres'; import { PostgresChatMessageHistory } from '@langchain/community/stores/message/postgres';
import { BufferMemory, BufferWindowMemory } from 'langchain/memory'; import { BufferMemory, BufferWindowMemory } from 'langchain/memory';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/transport';
import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces'; import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces';
import { postgresConnectionTest } from 'n8n-nodes-base/dist/nodes/Postgres/v2/methods/credentialTest'; import { postgresConnectionTest } from 'n8n-nodes-base/dist/nodes/Postgres/v2/methods/credentialTest';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/v2/transport';
import type { import type {
ISupplyDataFunctions, ISupplyDataFunctions,
INodeType, INodeType,

View file

@ -4,8 +4,8 @@ import {
type PGVectorStoreArgs, type PGVectorStoreArgs,
} from '@langchain/community/vectorstores/pgvector'; } from '@langchain/community/vectorstores/pgvector';
import type { EmbeddingsInterface } from '@langchain/core/embeddings'; import type { EmbeddingsInterface } from '@langchain/core/embeddings';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/transport';
import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces'; import type { PostgresNodeCredentials } from 'n8n-nodes-base/dist/nodes/Postgres/v2/helpers/interfaces';
import { configurePostgres } from 'n8n-nodes-base/dist/nodes/Postgres/v2/transport';
import type { INodeProperties } from 'n8n-workflow'; import type { INodeProperties } from 'n8n-workflow';
import type pg from 'pg'; import type pg from 'pg';

View file

@ -37,6 +37,14 @@ export class Postgres implements ICredentialType {
}, },
default: '', default: '',
}, },
{
displayName: 'Maximum Number of Connections',
name: 'maxConnections',
type: 'number',
default: 100,
description:
'Make sure this value times the number of workers you have is lower than the maximum number of connections your postgres instance allows.',
},
{ {
displayName: 'Ignore SSL Issues (Insecure)', displayName: 'Ignore SSL Issues (Insecure)',
name: 'allowUnauthorizedCerts', name: 'allowUnauthorizedCerts',

View file

@ -7,8 +7,8 @@ import type {
INodeListSearchItems, INodeListSearchItems,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { configurePostgres } from './transport';
import type { PgpDatabase, PostgresNodeCredentials } from './v2/helpers/interfaces'; import type { PgpDatabase, PostgresNodeCredentials } from './v2/helpers/interfaces';
import { configurePostgres } from './v2/transport';
export function prepareNames(id: string, mode: string, additionalFields: IDataObject) { export function prepareNames(id: string, mode: string, additionalFields: IDataObject) {
let suffix = id.replace(/-/g, '_'); let suffix = id.replace(/-/g, '_');

View file

@ -16,7 +16,7 @@ import type {
PgpConnectionParameters, PgpConnectionParameters,
PostgresNodeCredentials, PostgresNodeCredentials,
PostgresNodeOptions, PostgresNodeOptions,
} from '../helpers/interfaces'; } from '../v2/helpers/interfaces';
const getPostgresConfig = ( const getPostgresConfig = (
credentials: PostgresNodeCredentials, credentials: PostgresNodeCredentials,
@ -29,6 +29,7 @@ const getPostgresConfig = (
user: credentials.user, user: credentials.user,
password: credentials.password, password: credentials.password,
keepAlive: true, keepAlive: true,
max: credentials.maxConnections,
}; };
if (options.connectionTimeout) { if (options.connectionTimeout) {

View file

@ -1,7 +1,6 @@
import type { import type {
ICredentialsDecrypted, ICredentialsDecrypted,
ICredentialTestFunctions, ICredentialTestFunctions,
IDataObject,
IExecuteFunctions, IExecuteFunctions,
INodeCredentialTestResult, INodeCredentialTestResult,
INodeExecutionData, INodeExecutionData,
@ -10,11 +9,12 @@ import type {
INodeTypeDescription, INodeTypeDescription,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import pgPromise from 'pg-promise';
import { oldVersionNotice } from '@utils/descriptions'; import { oldVersionNotice } from '@utils/descriptions';
import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './genericFunctions'; import { pgInsertV2, pgQueryV2, pgUpdate, wrapData } from './genericFunctions';
import { configurePostgres } from '../transport';
import type { PgpConnection, PostgresNodeCredentials } from '../v2/helpers/interfaces';
const versionDescription: INodeTypeDescription = { const versionDescription: INodeTypeDescription = {
displayName: 'Postgres', displayName: 'Postgres',
@ -298,33 +298,27 @@ export class PostgresV1 implements INodeType {
this: ICredentialTestFunctions, this: ICredentialTestFunctions,
credential: ICredentialsDecrypted, credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> { ): Promise<INodeCredentialTestResult> {
const credentials = credential.data as IDataObject; const credentials = credential.data as PostgresNodeCredentials;
let connection: PgpConnection | undefined;
try { try {
const pgp = pgPromise(); const { db } = await configurePostgres.call(this, credentials, {});
const config: IDataObject = {
host: credentials.host as string,
port: credentials.port as number,
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
};
if (credentials.allowUnauthorizedCerts === true) { // Acquires a new connection that can be used to to run multiple
config.ssl = { // queries on the same connection and must be released again
rejectUnauthorized: false, // manually.
}; connection = await db.connect();
} else {
config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
config.sslmode = (credentials.ssl as string) || 'disable';
}
const db = pgp(config);
await db.connect();
} catch (error) { } catch (error) {
return { return {
status: 'Error', status: 'Error',
message: error.message, message: error.message,
}; };
} finally {
if (connection) {
// release connection
await connection.done();
}
} }
return { return {
status: 'OK', status: 'OK',
@ -335,42 +329,19 @@ export class PostgresV1 implements INodeType {
}; };
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const credentials = await this.getCredentials('postgres'); const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');
const largeNumbersOutput = this.getNodeParameter( const largeNumbersOutput = this.getNodeParameter(
'additionalFields.largeNumbersOutput', 'additionalFields.largeNumbersOutput',
0, 0,
'', '',
) as string; ) as string;
const pgp = pgPromise(); const { db, pgp } = await configurePostgres.call(this, credentials, {
largeNumbersOutput:
if (largeNumbersOutput === 'numbers') { largeNumbersOutput === 'numbers' || largeNumbersOutput === 'text'
pgp.pg.types.setTypeParser(20, (value: string) => { ? largeNumbersOutput
return parseInt(value, 10); : undefined,
}); });
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}
const config: IDataObject = {
host: credentials.host as string,
port: credentials.port as number,
database: credentials.database as string,
user: credentials.user as string,
password: credentials.password as string,
};
if (credentials.allowUnauthorizedCerts === true) {
config.ssl = {
rejectUnauthorized: false,
};
} else {
config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
config.sslmode = (credentials.ssl as string) || 'disable';
}
const db = pgp(config);
let returnItems: INodeExecutionData[] = []; let returnItems: INodeExecutionData[] = [];

View file

@ -3,9 +3,9 @@ import { NodeExecutionOutput, NodeOperationError } from 'n8n-workflow';
import * as database from './database/Database.resource'; import * as database from './database/Database.resource';
import type { PostgresType } from './node.type'; import type { PostgresType } from './node.type';
import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials, PostgresNodeOptions } from '../helpers/interfaces'; import type { PostgresNodeCredentials, PostgresNodeOptions } from '../helpers/interfaces';
import { configureQueryRunner } from '../helpers/utils'; import { configureQueryRunner } from '../helpers/utils';
import { configurePostgres } from '../transport';
export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let returnData: INodeExecutionData[] = []; let returnData: INodeExecutionData[] = [];

View file

@ -28,6 +28,7 @@ export type EnumInfo = {
export type PgpClient = pgPromise.IMain<{}, pg.IClient>; export type PgpClient = pgPromise.IMain<{}, pg.IClient>;
export type PgpDatabase = pgPromise.IDatabase<{}, pg.IClient>; export type PgpDatabase = pgPromise.IDatabase<{}, pg.IClient>;
export type PgpConnectionParameters = pg.IConnectionParameters<pg.IClient>; export type PgpConnectionParameters = pg.IConnectionParameters<pg.IClient>;
export type PgpConnection = pgPromise.IConnected<{}, pg.IClient>;
export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient }; export type ConnectionsData = { db: PgpDatabase; pgp: PgpClient };
export type QueriesRunner = ( export type QueriesRunner = (
@ -57,6 +58,7 @@ export type PostgresNodeCredentials = {
database: string; database: string;
user: string; user: string;
password: string; password: string;
maxConnections: number;
allowUnauthorizedCerts?: boolean; allowUnauthorizedCerts?: boolean;
ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full'; ssl?: 'disable' | 'allow' | 'require' | 'verify' | 'verify-full';
} & ( } & (

View file

@ -4,8 +4,8 @@ import type {
INodeCredentialTestResult, INodeCredentialTestResult,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { PgpClient, PostgresNodeCredentials } from '../helpers/interfaces'; import { configurePostgres } from '../../transport';
import { configurePostgres } from '../transport'; import type { PgpConnection, PostgresNodeCredentials } from '../helpers/interfaces';
export async function postgresConnectionTest( export async function postgresConnectionTest(
this: ICredentialTestFunctions, this: ICredentialTestFunctions,
@ -13,14 +13,12 @@ export async function postgresConnectionTest(
): Promise<INodeCredentialTestResult> { ): Promise<INodeCredentialTestResult> {
const credentials = credential.data as PostgresNodeCredentials; const credentials = credential.data as PostgresNodeCredentials;
let pgpClientCreated: PgpClient | undefined; let connection: PgpConnection | undefined;
try { try {
const { db, pgp } = await configurePostgres.call(this, credentials, {}); const { db } = await configurePostgres.call(this, credentials, {});
pgpClientCreated = pgp; connection = await db.connect();
await db.connect();
} catch (error) { } catch (error) {
let message = error.message as string; let message = error.message as string;
@ -41,8 +39,8 @@ export async function postgresConnectionTest(
message, message,
}; };
} finally { } finally {
if (pgpClientCreated) { if (connection) {
pgpClientCreated.end(); await connection.done();
} }
} }
return { return {

View file

@ -1,7 +1,7 @@
import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow'; import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';
import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces'; import type { PostgresNodeCredentials } from '../helpers/interfaces';
import { configurePostgres } from '../transport';
export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> { export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres'); const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');

View file

@ -1,8 +1,8 @@
import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow'; import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';
import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces'; import type { PostgresNodeCredentials } from '../helpers/interfaces';
import { getTableSchema } from '../helpers/utils'; import { getTableSchema } from '../helpers/utils';
import { configurePostgres } from '../transport';
export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> { export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres'); const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');

View file

@ -1,8 +1,8 @@
import type { ILoadOptionsFunctions, ResourceMapperFields, FieldType } from 'n8n-workflow'; import type { ILoadOptionsFunctions, ResourceMapperFields, FieldType } from 'n8n-workflow';
import { configurePostgres } from '../../transport';
import type { PostgresNodeCredentials } from '../helpers/interfaces'; import type { PostgresNodeCredentials } from '../helpers/interfaces';
import { getEnumValues, getEnums, getTableSchema, uniqueColumns } from '../helpers/utils'; import { getEnumValues, getEnums, getTableSchema, uniqueColumns } from '../helpers/utils';
import { configurePostgres } from '../transport';
const fieldTypeMapping: Partial<Record<FieldType, string[]>> = { const fieldTypeMapping: Partial<Record<FieldType, string[]>> = {
string: ['text', 'varchar', 'character varying', 'character', 'char'], string: ['text', 'varchar', 'character varying', 'character', 'char'],