diff --git a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts index f46eba1481..6dcb91ce7d 100644 --- a/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts +++ b/packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts @@ -102,7 +102,6 @@ export async function searchSchema(this: ILoadOptionsFunctions): Promise 1 && !node.executeOnce) { diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts index c20319d5ac..67482d4381 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts @@ -9,18 +9,14 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise ({ - name: schema.schema_name as string, - value: schema.schema_name as string, - })), - }; - } finally { - if (!db.$pool.ending) await db.$pool.end(); - } + return { + results: response.map((schema) => ({ + name: schema.schema_name as string, + value: schema.schema_name as string, + })), + }; } export async function tableSearch(this: ILoadOptionsFunctions): Promise { const credentials = await this.getCredentials('postgres'); @@ -32,19 +28,15 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise ({ - name: table.table_name as string, - value: table.table_name as string, - })), - }; - } finally { - if (!db.$pool.ending) await db.$pool.end(); - } + return { + results: response.map((table) => ({ + name: table.table_name as string, + value: table.table_name as string, + })), + }; } diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts index d906c7e0bd..c8fb75436d 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts @@ -18,17 +18,13 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise ({ - name: column.column_name, - value: column.column_name, - description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`, - })); - } finally { - if (!db.$pool.ending) await db.$pool.end(); - } + return columns.map((column) => ({ + name: column.column_name, + value: column.column_name, + description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`, + })); } export async function getColumnsMultiOptions( diff --git a/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts b/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts index 99f40cb1da..acc44b5314 100644 --- a/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts +++ b/packages/nodes-base/nodes/Postgres/v2/methods/resourceMapping.ts @@ -63,34 +63,30 @@ export async function getMappingColumns( extractValue: true, }) as string; - try { - const columns = await getTableSchema(db, schema, table, { getColumnsForResourceMapper: true }); - const unique = operation === 'upsert' ? await uniqueColumns(db, table, schema) : []; - const enumInfo = await getEnums(db); - const fields = await Promise.all( - columns.map(async (col) => { - const canBeUsedToMatch = - operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true; - const type = mapPostgresType(col.data_type); - const options = - type === 'options' ? 
getEnumValues(enumInfo, col.udt_name as string) : undefined; - const hasDefault = Boolean(col.column_default); - const isGenerated = col.is_generated === 'ALWAYS' || col.identity_generation === 'ALWAYS'; - const nullable = col.is_nullable === 'YES'; - return { - id: col.column_name, - displayName: col.column_name, - required: !nullable && !hasDefault && !isGenerated, - defaultMatch: (col.column_name === 'id' && canBeUsedToMatch) || false, - display: true, - type, - canBeUsedToMatch, - options, - }; - }), - ); - return { fields }; - } finally { - if (!db.$pool.ending) await db.$pool.end(); - } + const columns = await getTableSchema(db, schema, table, { getColumnsForResourceMapper: true }); + const unique = operation === 'upsert' ? await uniqueColumns(db, table, schema) : []; + const enumInfo = await getEnums(db); + const fields = await Promise.all( + columns.map(async (col) => { + const canBeUsedToMatch = + operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true; + const type = mapPostgresType(col.data_type); + const options = + type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined; + const hasDefault = Boolean(col.column_default); + const isGenerated = col.is_generated === 'ALWAYS' || col.identity_generation === 'ALWAYS'; + const nullable = col.is_nullable === 'YES'; + return { + id: col.column_name, + displayName: col.column_name, + required: !nullable && !hasDefault && !isGenerated, + defaultMatch: (col.column_name === 'id' && canBeUsedToMatch) || false, + display: true, + type, + canBeUsedToMatch, + options, + }; + }), + ); + return { fields }; } diff --git a/packages/nodes-base/nodes/Postgres/v2/transport/index.ts b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts index 07362be3dc..0f1dfbf72b 100644 --- a/packages/nodes-base/nodes/Postgres/v2/transport/index.ts +++ b/packages/nodes-base/nodes/Postgres/v2/transport/index.ts @@ -7,6 +7,7 @@ import type { import { createServer, type AddressInfo } from 'node:net'; import pgPromise from 'pg-promise'; +import { ConnectionPoolManager } from '@utils/connection-pool-manager'; import { LOCALHOST } from '@utils/constants'; import { formatPrivateKey } from '@utils/utilities'; @@ -56,120 +57,135 @@ export async function configurePostgres( credentials: PostgresNodeCredentials, options: PostgresNodeOptions = {}, ): Promise { - const pgp = pgPromise({ - // prevent spam in console "WARNING: Creating a duplicate database object for the same connection." - // duplicate connections created when auto loading parameters, they are closed immediately after, but several could be open at the same time - noWarnings: true, - }); + const poolManager = ConnectionPoolManager.getInstance(); - if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) { - // Always return dates as ISO strings - [pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => { - pgp.pg.types.setTypeParser(type, (value: string) => { - const parsedDate = new Date(value); + const fallBackHandler = async () => { + const pgp = pgPromise({ + // prevent spam in console "WARNING: Creating a duplicate database object for the same connection." 
+ // duplicate connections created when auto loading parameters, they are closed immediately after, but several could be open at the same time + noWarnings: true, + }); - if (isNaN(parsedDate.getTime())) { - return value; + if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) { + // Always return dates as ISO strings + [pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => { + pgp.pg.types.setTypeParser(type, (value: string) => { + const parsedDate = new Date(value); + + if (isNaN(parsedDate.getTime())) { + return value; + } + + return parsedDate.toISOString(); + }); + }); + } + + if (options.largeNumbersOutput === 'numbers') { + pgp.pg.types.setTypeParser(20, (value: string) => { + return parseInt(value, 10); + }); + pgp.pg.types.setTypeParser(1700, (value: string) => { + return parseFloat(value); + }); + } + + const dbConfig = getPostgresConfig(credentials, options); + + if (!credentials.sshTunnel) { + const db = pgp(dbConfig); + + return { db, pgp }; + } else { + if (credentials.sshAuthenticateWith === 'privateKey' && credentials.privateKey) { + credentials.privateKey = formatPrivateKey(credentials.privateKey); + } + const sshClient = await this.helpers.getSSHClient(credentials); + + // Create a TCP proxy listening on a random available port + const proxy = createServer(); + const proxyPort = await new Promise((resolve) => { + proxy.listen(0, LOCALHOST, () => { + resolve((proxy.address() as AddressInfo).port); + }); + }); + + const close = () => { + proxy.close(); + sshClient.off('end', close); + sshClient.off('error', close); + }; + sshClient.on('end', close); + sshClient.on('error', close); + + await new Promise((resolve, reject) => { + proxy.on('error', (err) => reject(err)); + proxy.on('connection', (localSocket) => { + sshClient.forwardOut( + LOCALHOST, + localSocket.remotePort!, + credentials.host, + credentials.port, + (err, clientChannel) => { + if (err) { + proxy.close(); + localSocket.destroy(); + } else { + localSocket.pipe(clientChannel); + clientChannel.pipe(localSocket); + } + }, + ); + }); + resolve(); + }).catch((err) => { + proxy.close(); + + let message = err.message; + let description = err.description; + + if (err.message.includes('ECONNREFUSED')) { + message = 'Connection refused'; + try { + description = err.message.split('ECONNREFUSED ')[1].trim(); + } catch (e) {} } - return parsedDate.toISOString(); + if (err.message.includes('ENOTFOUND')) { + message = 'Host not found'; + try { + description = err.message.split('ENOTFOUND ')[1].trim(); + } catch (e) {} + } + + if (err.message.includes('ETIMEDOUT')) { + message = 'Connection timed out'; + try { + description = err.message.split('ETIMEDOUT ')[1].trim(); + } catch (e) {} + } + + err.message = message; + err.description = description; + throw err; }); - }); - } - if (options.largeNumbersOutput === 'numbers') { - pgp.pg.types.setTypeParser(20, (value: string) => { - return parseInt(value, 10); - }); - pgp.pg.types.setTypeParser(1700, (value: string) => { - return parseFloat(value); - }); - } - - const dbConfig = getPostgresConfig(credentials, options); - - if (!credentials.sshTunnel) { - const db = pgp(dbConfig); - return { db, pgp }; - } else { - if (credentials.sshAuthenticateWith === 'privateKey' && credentials.privateKey) { - credentials.privateKey = formatPrivateKey(credentials.privateKey); + const db = pgp({ + ...dbConfig, + port: proxyPort, + host: LOCALHOST, + }); + return { db, pgp }; } - const sshClient = await this.helpers.getSSHClient(credentials); + 
}; - // Create a TCP proxy listening on a random available port - const proxy = createServer(); - const proxyPort = await new Promise((resolve) => { - proxy.listen(0, LOCALHOST, () => { - resolve((proxy.address() as AddressInfo).port); - }); - }); - - const close = () => { - proxy.close(); - sshClient.off('end', close); - sshClient.off('error', close); - }; - sshClient.on('end', close); - sshClient.on('error', close); - - await new Promise((resolve, reject) => { - proxy.on('error', (err) => reject(err)); - proxy.on('connection', (localSocket) => { - sshClient.forwardOut( - LOCALHOST, - localSocket.remotePort!, - credentials.host, - credentials.port, - (err, clientChannel) => { - if (err) { - proxy.close(); - localSocket.destroy(); - } else { - localSocket.pipe(clientChannel); - clientChannel.pipe(localSocket); - } - }, - ); - }); - resolve(); - }).catch((err) => { - proxy.close(); - - let message = err.message; - let description = err.description; - - if (err.message.includes('ECONNREFUSED')) { - message = 'Connection refused'; - try { - description = err.message.split('ECONNREFUSED ')[1].trim(); - } catch (e) {} - } - - if (err.message.includes('ENOTFOUND')) { - message = 'Host not found'; - try { - description = err.message.split('ENOTFOUND ')[1].trim(); - } catch (e) {} - } - - if (err.message.includes('ETIMEDOUT')) { - message = 'Connection timed out'; - try { - description = err.message.split('ETIMEDOUT ')[1].trim(); - } catch (e) {} - } - - err.message = message; - err.description = description; - throw err; - }); - - const db = pgp({ - ...dbConfig, - port: proxyPort, - host: LOCALHOST, - }); - return { db, pgp }; - } + return await poolManager.getConnection({ + credentials, + nodeType: 'postgres', + nodeVersion: options.nodeVersion as unknown as string, + fallBackHandler, + cleanUpHandler: async ({ db }) => { + await db.$pool.end(); + }, + }); } diff --git a/packages/nodes-base/utils/__tests__/connection-pool-manager.test.ts b/packages/nodes-base/utils/__tests__/connection-pool-manager.test.ts new file mode 100644 index 0000000000..fe2c814582 --- /dev/null +++ b/packages/nodes-base/utils/__tests__/connection-pool-manager.test.ts @@ -0,0 +1,179 @@ +import { ConnectionPoolManager } from '@utils/connection-pool-manager'; + +const ttl = 5 * 60 * 1000; +const cleanUpInterval = 60 * 1000; + +let cpm: ConnectionPoolManager; + +beforeAll(() => { + jest.useFakeTimers(); + cpm = ConnectionPoolManager.getInstance(); +}); + +beforeEach(async () => { + await cpm.purgeConnections(); +}); + +afterAll(() => { + cpm.onShutdown(); +}); + +test('getInstance returns a singleton', () => { + const instance1 = ConnectionPoolManager.getInstance(); + const instance2 = ConnectionPoolManager.getInstance(); + + expect(instance1).toBe(instance2); +}); + +describe('getConnection', () => { + test('calls fallBackHandler only once and returns the first value', async () => { + // ARRANGE + const connectionType = {}; + const fallBackHandler = jest.fn().mockResolvedValue(connectionType); + const cleanUpHandler = jest.fn(); + const options = { + credentials: {}, + nodeType: 'example', + nodeVersion: '1', + fallBackHandler, + cleanUpHandler, + }; + + // ACT 1 + const connection = await cpm.getConnection(options); + + // ASSERT 1 + expect(fallBackHandler).toHaveBeenCalledTimes(1); + expect(connection).toBe(connectionType); + + // ACT 2 + const connection2 = await cpm.getConnection(options); + // ASSERT 2 + expect(fallBackHandler).toHaveBeenCalledTimes(1); + expect(connection2).toBe(connectionType); + }); + + 
test('creates different pools for different node versions', async () => { + // ARRANGE + const connectionType1 = {}; + const fallBackHandler1 = jest.fn().mockResolvedValue(connectionType1); + const cleanUpHandler1 = jest.fn(); + + const connectionType2 = {}; + const fallBackHandler2 = jest.fn().mockResolvedValue(connectionType2); + const cleanUpHandler2 = jest.fn(); + + // ACT 1 + const connection1 = await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '1', + fallBackHandler: fallBackHandler1, + cleanUpHandler: cleanUpHandler1, + }); + const connection2 = await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '2', + fallBackHandler: fallBackHandler2, + cleanUpHandler: cleanUpHandler2, + }); + + // ASSERT + expect(fallBackHandler1).toHaveBeenCalledTimes(1); + expect(connection1).toBe(connectionType1); + + expect(fallBackHandler2).toHaveBeenCalledTimes(1); + expect(connection2).toBe(connectionType2); + + expect(connection1).not.toBe(connection2); + }); + + test('calls cleanUpHandler after TTL expires', async () => { + // ARRANGE + const connectionType = {}; + const fallBackHandler = jest.fn().mockResolvedValue(connectionType); + const cleanUpHandler = jest.fn(); + await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '1', + fallBackHandler, + cleanUpHandler, + }); + + // ACT + jest.advanceTimersByTime(ttl + cleanUpInterval * 2); + + // ASSERT + expect(cleanUpHandler).toHaveBeenCalledTimes(1); + }); +}); + +describe('onShutdown', () => { + test('calls all clean up handlers', async () => { + // ARRANGE + const connectionType1 = {}; + const fallBackHandler1 = jest.fn().mockResolvedValue(connectionType1); + const cleanUpHandler1 = jest.fn(); + await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '1', + fallBackHandler: fallBackHandler1, + cleanUpHandler: cleanUpHandler1, + }); + + const connectionType2 = {}; + const fallBackHandler2 = jest.fn().mockResolvedValue(connectionType2); + const cleanUpHandler2 = jest.fn(); + await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '2', + fallBackHandler: fallBackHandler2, + cleanUpHandler: cleanUpHandler2, + }); + + // ACT 1 + cpm.onShutdown(); + + // ASSERT + expect(cleanUpHandler1).toHaveBeenCalledTimes(1); + expect(cleanUpHandler2).toHaveBeenCalledTimes(1); + }); + + test('calls all clean up handlers when `exit` is emitted on process', async () => { + // ARRANGE + const connectionType1 = {}; + const fallBackHandler1 = jest.fn().mockResolvedValue(connectionType1); + const cleanUpHandler1 = jest.fn(); + await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '1', + fallBackHandler: fallBackHandler1, + cleanUpHandler: cleanUpHandler1, + }); + + const connectionType2 = {}; + const fallBackHandler2 = jest.fn().mockResolvedValue(connectionType2); + const cleanUpHandler2 = jest.fn(); + await cpm.getConnection({ + credentials: {}, + nodeType: 'example', + nodeVersion: '2', + fallBackHandler: fallBackHandler2, + cleanUpHandler: cleanUpHandler2, + }); + + // ACT 1 + // @ts-expect-error we're not supposed to emit `exit` so it's missing from + // the type definition + process.emit('exit'); + + // ASSERT + expect(cleanUpHandler1).toHaveBeenCalledTimes(1); + expect(cleanUpHandler2).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/nodes-base/utils/connection-pool-manager.ts b/packages/nodes-base/utils/connection-pool-manager.ts new file mode 100644 index 0000000000..9cdf86b0b4 --- 
/dev/null
+++ b/packages/nodes-base/utils/connection-pool-manager.ts
@@ -0,0 +1,137 @@
+import { createHash } from 'crypto';
+
+let instance: ConnectionPoolManager;
+
+// 5 minutes
+const ttl = 5 * 60 * 1000;
+
+// 1 minute
+const cleanUpInterval = 60 * 1000;
+
+type RegistrationOptions = {
+	credentials: unknown;
+	nodeType: string;
+	nodeVersion?: string;
+};
+
+type GetConnectionOption<Pool> = RegistrationOptions & {
+	/**
+	 * When a node requests a connection pool and none is available, this handler
+	 * is called to create a new instance of the pool, which is then cached and
+	 * re-used until it goes stale.
+	 */
+	fallBackHandler: () => Promise<Pool>;
+
+	/**
+	 * When a pool hasn't been used in a while, or when the server is shutting
+	 * down, this handler is invoked to close the pool.
+	 */
+	cleanUpHandler: (pool: Pool) => Promise<void>;
+};
+
+type Registration<Pool> = {
+	/** An instance of a connection pool class that gets reused across multiple executions */
+	pool: Pool;
+
+	/** @see GetConnectionOption['cleanUpHandler'] */
+	cleanUpHandler: (pool: Pool) => Promise<void>;
+
+	/** We keep this timestamp to check if a pool hasn't been used in a while and needs to be closed */
+	lastUsed: number;
+};
+
+export class ConnectionPoolManager {
+	/**
+	 * Gets the singleton instance of the ConnectionPoolManager.
+	 * Creates a new instance if one doesn't exist.
+	 */
+	static getInstance(): ConnectionPoolManager {
+		if (!instance) {
+			instance = new ConnectionPoolManager();
+		}
+		return instance;
+	}
+
+	private map = new Map<string, Registration<unknown>>();
+
+	/**
+	 * Private constructor that initializes the connection pool manager.
+	 * Sets up cleanup handlers for process exit and stale connections.
+	 */
+	private constructor() {
+		// Close all open pools when the process exits
+		process.on('exit', () => this.onShutdown());
+
+		// Regularly close stale pools
+		setInterval(() => this.cleanupStaleConnections(), cleanUpInterval);
+	}
+
+	/**
+	 * Generates a unique key for connection pool identification.
+	 * Hashes the credentials and node information for security.
+	 */
+	private makeKey({ credentials, nodeType, nodeVersion }: RegistrationOptions): string {
+		// The credentials contain decrypted secrets, that's why we hash them.
+		return createHash('sha1')
+			.update(
+				JSON.stringify({
+					credentials,
+					nodeType,
+					nodeVersion,
+				}),
+			)
+			.digest('base64');
+	}
+
+	/**
+	 * Gets or creates a connection pool for the given options.
+	 * Updates the last-used timestamp for existing connections.
+	 */
+	async getConnection<T>(options: GetConnectionOption<T>): Promise<T> {
+		const key = this.makeKey(options);
+
+		let value = this.map.get(key);
+		if (!value) {
+			value = {
+				pool: await options.fallBackHandler(),
+				cleanUpHandler: options.cleanUpHandler,
+			} as Registration<unknown>;
+		}
+
+		this.map.set(key, { ...value, lastUsed: Date.now() });
+		return value.pool as T;
+	}
+
+	/**
+	 * Removes and cleans up connection pools that haven't been used within the
+	 * TTL.
+	 */
+	private cleanupStaleConnections() {
+		const now = Date.now();
+		for (const [key, { cleanUpHandler, lastUsed, pool }] of this.map.entries()) {
+			if (now - lastUsed > ttl) {
+				void cleanUpHandler(pool);
+				this.map.delete(key);
+			}
+		}
+	}
+
+	/**
+	 * Removes and cleans up all existing connection pools.
+	 */
+	async purgeConnections(): Promise<void> {
+		await Promise.all(
+			[...this.map.entries()].map(async ([key, value]) => {
+				this.map.delete(key);
+
+				return await value.cleanUpHandler(value.pool);
+			}),
+		);
+	}
+
+	/**
+	 * Cleans up all connection pools when the process is shutting down.
+	 * Does not wait for the cleanup promises to resolve, and it does not remove
+	 * the references from the pool map.
+	 *
+	 * Only call this on process shutdown.
+	 */
+	onShutdown() {
+		for (const { cleanUpHandler, pool } of this.map.values()) {
+			void cleanUpHandler(pool);
+		}
+	}
+}
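
Usage note (not part of the diff): the sketch below illustrates how another transport layer could reuse the new ConnectionPoolManager in the same way transport/index.ts now does for Postgres. It is a minimal sketch under assumptions: the MySQL credential shape, the getMySqlPool helper, and the mysql2 import are illustrative and do not appear in this change set, and the generic getConnection<T> signature is the one reconstructed above.

import { createPool, type Pool } from 'mysql2/promise';

import { ConnectionPoolManager } from '@utils/connection-pool-manager';

// Hypothetical credential shape, used only for this example.
type MySqlCredentials = {
	host: string;
	port: number;
	user: string;
	password: string;
	database: string;
};

export async function getMySqlPool(
	credentials: MySqlCredentials,
	nodeVersion?: string,
): Promise<Pool> {
	const poolManager = ConnectionPoolManager.getInstance();

	return await poolManager.getConnection<Pool>({
		// Hashed together with nodeType/nodeVersion to form the cache key
		credentials,
		nodeType: 'mysql',
		nodeVersion,
		// Only called when no cached pool exists for this key
		fallBackHandler: async () => createPool(credentials),
		// Called once the pool has been idle past the TTL, or on process shutdown
		cleanUpHandler: async (pool) => {
			await pool.end();
		},
	});
}

Because the cache key is a SHA-1 hash of the decrypted credentials plus nodeType and nodeVersion, executions that share credentials reuse one pool, while a change in credentials or node version transparently gets its own pool and the stale one is closed by the TTL sweep.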