fix(Postgres Node): Re-use connection pool across executions (#12346)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Danny Martini 2024-12-27 08:15:37 +00:00 committed by GitHub
parent 7b2630d1a0
commit 2ca37f5f7f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 504 additions and 200 deletions

View file

@ -102,7 +102,6 @@ export async function searchSchema(this: ILoadOptionsFunctions): Promise<INodeLi
name: s.schema_name as string,
value: s.schema_name as string,
}));
await db.$pool.end();
return { results };
}
@ -122,6 +121,5 @@ export async function searchTables(this: ILoadOptionsFunctions): Promise<INodeLi
name: s.table_name as string,
value: s.table_name as string,
}));
await db.$pool.end();
return { results };
}

View file

@ -321,7 +321,6 @@ export class PostgresTrigger implements INodeType {
}
} finally {
connection.client.removeListener('notification', onNotification);
if (!db.$pool.ending) await db.$pool.end();
}
};

View file

@ -320,7 +320,6 @@ export class PostgresV1 implements INodeType {
const db = pgp(config);
await db.connect();
await db.$pool.end();
} catch (error) {
return {
status: 'Error',
@ -409,16 +408,12 @@ export class PostgresV1 implements INodeType {
returnItems = wrapData(updateItems);
} else {
await db.$pool.end();
throw new NodeOperationError(
this.getNode(),
`The operation "${operation}" is not supported!`,
);
}
// shuts down the connection pool associated with the db object to allow the process to finish
await db.$pool.end();
return [returnItems];
}
}

View file

@ -35,25 +35,21 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
operation,
} as PostgresType;
try {
switch (postgresNodeData.resource) {
case 'database':
returnData = await database[postgresNodeData.operation].execute.call(
this,
runQueries,
items,
options,
db,
);
break;
default:
throw new NodeOperationError(
this.getNode(),
`The operation "${operation}" is not supported!`,
);
}
} finally {
if (!db.$pool.ending) await db.$pool.end();
switch (postgresNodeData.resource) {
case 'database':
returnData = await database[postgresNodeData.operation].execute.call(
this,
runQueries,
items,
options,
db,
);
break;
default:
throw new NodeOperationError(
this.getNode(),
`The operation "${operation}" is not supported!`,
);
}
if (operation === 'select' && items.length > 1 && !node.executeOnce) {

View file

@ -9,18 +9,14 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
const { db } = await configurePostgres.call(this, credentials, options);
try {
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
return {
results: response.map((schema) => ({
name: schema.schema_name as string,
value: schema.schema_name as string,
})),
};
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
return {
results: response.map((schema) => ({
name: schema.schema_name as string,
value: schema.schema_name as string,
})),
};
}
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials<PostgresNodeCredentials>('postgres');
@ -32,19 +28,15 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
extractValue: true,
}) as string;
try {
const response = await db.any(
'SELECT table_name FROM information_schema.tables WHERE table_schema=$1',
[schema],
);
const response = await db.any(
'SELECT table_name FROM information_schema.tables WHERE table_schema=$1',
[schema],
);
return {
results: response.map((table) => ({
name: table.table_name as string,
value: table.table_name as string,
})),
};
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
return {
results: response.map((table) => ({
name: table.table_name as string,
value: table.table_name as string,
})),
};
}

View file

@ -18,17 +18,13 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
extractValue: true,
}) as string;
try {
const columns = await getTableSchema(db, schema, table);
const columns = await getTableSchema(db, schema, table);
return columns.map((column) => ({
name: column.column_name,
value: column.column_name,
description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`,
}));
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
return columns.map((column) => ({
name: column.column_name,
value: column.column_name,
description: `Type: ${column.data_type.toUpperCase()}, Nullable: ${column.is_nullable}`,
}));
}
export async function getColumnsMultiOptions(

View file

@ -63,34 +63,30 @@ export async function getMappingColumns(
extractValue: true,
}) as string;
try {
const columns = await getTableSchema(db, schema, table, { getColumnsForResourceMapper: true });
const unique = operation === 'upsert' ? await uniqueColumns(db, table, schema) : [];
const enumInfo = await getEnums(db);
const fields = await Promise.all(
columns.map(async (col) => {
const canBeUsedToMatch =
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
const type = mapPostgresType(col.data_type);
const options =
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
const hasDefault = Boolean(col.column_default);
const isGenerated = col.is_generated === 'ALWAYS' || col.identity_generation === 'ALWAYS';
const nullable = col.is_nullable === 'YES';
return {
id: col.column_name,
displayName: col.column_name,
required: !nullable && !hasDefault && !isGenerated,
defaultMatch: (col.column_name === 'id' && canBeUsedToMatch) || false,
display: true,
type,
canBeUsedToMatch,
options,
};
}),
);
return { fields };
} finally {
if (!db.$pool.ending) await db.$pool.end();
}
const columns = await getTableSchema(db, schema, table, { getColumnsForResourceMapper: true });
const unique = operation === 'upsert' ? await uniqueColumns(db, table, schema) : [];
const enumInfo = await getEnums(db);
const fields = await Promise.all(
columns.map(async (col) => {
const canBeUsedToMatch =
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
const type = mapPostgresType(col.data_type);
const options =
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
const hasDefault = Boolean(col.column_default);
const isGenerated = col.is_generated === 'ALWAYS' || col.identity_generation === 'ALWAYS';
const nullable = col.is_nullable === 'YES';
return {
id: col.column_name,
displayName: col.column_name,
required: !nullable && !hasDefault && !isGenerated,
defaultMatch: (col.column_name === 'id' && canBeUsedToMatch) || false,
display: true,
type,
canBeUsedToMatch,
options,
};
}),
);
return { fields };
}

View file

@ -7,6 +7,7 @@ import type {
import { createServer, type AddressInfo } from 'node:net';
import pgPromise from 'pg-promise';
import { ConnectionPoolManager } from '@utils/connection-pool-manager';
import { LOCALHOST } from '@utils/constants';
import { formatPrivateKey } from '@utils/utilities';
@ -56,120 +57,135 @@ export async function configurePostgres(
credentials: PostgresNodeCredentials,
options: PostgresNodeOptions = {},
): Promise<ConnectionsData> {
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed immediately after, but several could be open at the same time
noWarnings: true,
});
const poolManager = ConnectionPoolManager.getInstance();
if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings
[pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => {
pgp.pg.types.setTypeParser(type, (value: string) => {
const parsedDate = new Date(value);
const fallBackHandler = async () => {
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed immediately after, but several could be open at the same time
noWarnings: true,
});
if (isNaN(parsedDate.getTime())) {
return value;
if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings
[pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => {
pgp.pg.types.setTypeParser(type, (value: string) => {
const parsedDate = new Date(value);
if (isNaN(parsedDate.getTime())) {
return value;
}
return parsedDate.toISOString();
});
});
}
if (options.largeNumbersOutput === 'numbers') {
pgp.pg.types.setTypeParser(20, (value: string) => {
return parseInt(value, 10);
});
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}
const dbConfig = getPostgresConfig(credentials, options);
if (!credentials.sshTunnel) {
const db = pgp(dbConfig);
return { db, pgp };
} else {
if (credentials.sshAuthenticateWith === 'privateKey' && credentials.privateKey) {
credentials.privateKey = formatPrivateKey(credentials.privateKey);
}
const sshClient = await this.helpers.getSSHClient(credentials);
// Create a TCP proxy listening on a random available port
const proxy = createServer();
const proxyPort = await new Promise<number>((resolve) => {
proxy.listen(0, LOCALHOST, () => {
resolve((proxy.address() as AddressInfo).port);
});
});
const close = () => {
proxy.close();
sshClient.off('end', close);
sshClient.off('error', close);
};
sshClient.on('end', close);
sshClient.on('error', close);
await new Promise<void>((resolve, reject) => {
proxy.on('error', (err) => reject(err));
proxy.on('connection', (localSocket) => {
sshClient.forwardOut(
LOCALHOST,
localSocket.remotePort!,
credentials.host,
credentials.port,
(err, clientChannel) => {
if (err) {
proxy.close();
localSocket.destroy();
} else {
localSocket.pipe(clientChannel);
clientChannel.pipe(localSocket);
}
},
);
});
resolve();
}).catch((err) => {
proxy.close();
let message = err.message;
let description = err.description;
if (err.message.includes('ECONNREFUSED')) {
message = 'Connection refused';
try {
description = err.message.split('ECONNREFUSED ')[1].trim();
} catch (e) {}
}
return parsedDate.toISOString();
if (err.message.includes('ENOTFOUND')) {
message = 'Host not found';
try {
description = err.message.split('ENOTFOUND ')[1].trim();
} catch (e) {}
}
if (err.message.includes('ETIMEDOUT')) {
message = 'Connection timed out';
try {
description = err.message.split('ETIMEDOUT ')[1].trim();
} catch (e) {}
}
err.message = message;
err.description = description;
throw err;
});
});
}
if (options.largeNumbersOutput === 'numbers') {
pgp.pg.types.setTypeParser(20, (value: string) => {
return parseInt(value, 10);
});
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}
const dbConfig = getPostgresConfig(credentials, options);
if (!credentials.sshTunnel) {
const db = pgp(dbConfig);
return { db, pgp };
} else {
if (credentials.sshAuthenticateWith === 'privateKey' && credentials.privateKey) {
credentials.privateKey = formatPrivateKey(credentials.privateKey);
const db = pgp({
...dbConfig,
port: proxyPort,
host: LOCALHOST,
});
return { db, pgp };
}
const sshClient = await this.helpers.getSSHClient(credentials);
};
// Create a TCP proxy listening on a random available port
const proxy = createServer();
const proxyPort = await new Promise<number>((resolve) => {
proxy.listen(0, LOCALHOST, () => {
resolve((proxy.address() as AddressInfo).port);
});
});
const close = () => {
proxy.close();
sshClient.off('end', close);
sshClient.off('error', close);
};
sshClient.on('end', close);
sshClient.on('error', close);
await new Promise<void>((resolve, reject) => {
proxy.on('error', (err) => reject(err));
proxy.on('connection', (localSocket) => {
sshClient.forwardOut(
LOCALHOST,
localSocket.remotePort!,
credentials.host,
credentials.port,
(err, clientChannel) => {
if (err) {
proxy.close();
localSocket.destroy();
} else {
localSocket.pipe(clientChannel);
clientChannel.pipe(localSocket);
}
},
);
});
resolve();
}).catch((err) => {
proxy.close();
let message = err.message;
let description = err.description;
if (err.message.includes('ECONNREFUSED')) {
message = 'Connection refused';
try {
description = err.message.split('ECONNREFUSED ')[1].trim();
} catch (e) {}
}
if (err.message.includes('ENOTFOUND')) {
message = 'Host not found';
try {
description = err.message.split('ENOTFOUND ')[1].trim();
} catch (e) {}
}
if (err.message.includes('ETIMEDOUT')) {
message = 'Connection timed out';
try {
description = err.message.split('ETIMEDOUT ')[1].trim();
} catch (e) {}
}
err.message = message;
err.description = description;
throw err;
});
const db = pgp({
...dbConfig,
port: proxyPort,
host: LOCALHOST,
});
return { db, pgp };
}
return await poolManager.getConnection({
credentials,
nodeType: 'postgres',
nodeVersion: options.nodeVersion as unknown as string,
fallBackHandler,
cleanUpHandler: async ({ db }) => {
await db.$pool.end();
},
});
}

View file

@ -0,0 +1,179 @@
import { ConnectionPoolManager } from '@utils/connection-pool-manager';

// Keep these in sync with the constants declared in ConnectionPoolManager.
const TTL = 5 * 60 * 1000; // 5 minutes
const CLEANUP_INTERVAL = 60 * 1000; // 1 minute

let poolManager: ConnectionPoolManager;

beforeAll(() => {
  jest.useFakeTimers();
  poolManager = ConnectionPoolManager.getInstance();
});

// Start every test from an empty manager so pools cannot leak between tests.
beforeEach(async () => {
  await poolManager.purgeConnections();
});

afterAll(() => {
  poolManager.onShutdown();
});

test('getInstance returns a singleton', () => {
  const first = ConnectionPoolManager.getInstance();
  const second = ConnectionPoolManager.getInstance();

  expect(first).toBe(second);
});

describe('getConnection', () => {
  test('calls fallBackHandler only once and returns the first value', async () => {
    // ARRANGE
    const pool = {};
    const fallBackHandler = jest.fn().mockResolvedValue(pool);
    const cleanUpHandler = jest.fn();
    const options = {
      credentials: {},
      nodeType: 'example',
      nodeVersion: '1',
      fallBackHandler,
      cleanUpHandler,
    };

    // ACT 1
    const firstResult = await poolManager.getConnection<string>(options);

    // ASSERT 1
    expect(fallBackHandler).toHaveBeenCalledTimes(1);
    expect(firstResult).toBe(pool);

    // ACT 2
    const secondResult = await poolManager.getConnection<string>(options);

    // ASSERT 2
    expect(fallBackHandler).toHaveBeenCalledTimes(1);
    expect(secondResult).toBe(pool);
  });

  test('creates different pools for different node versions', async () => {
    // ARRANGE
    const poolV1 = {};
    const fallBackHandlerV1 = jest.fn().mockResolvedValue(poolV1);
    const cleanUpHandlerV1 = jest.fn();

    const poolV2 = {};
    const fallBackHandlerV2 = jest.fn().mockResolvedValue(poolV2);
    const cleanUpHandlerV2 = jest.fn();

    // ACT 1
    const connectionV1 = await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '1',
      fallBackHandler: fallBackHandlerV1,
      cleanUpHandler: cleanUpHandlerV1,
    });
    const connectionV2 = await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '2',
      fallBackHandler: fallBackHandlerV2,
      cleanUpHandler: cleanUpHandlerV2,
    });

    // ASSERT
    expect(fallBackHandlerV1).toHaveBeenCalledTimes(1);
    expect(connectionV1).toBe(poolV1);
    expect(fallBackHandlerV2).toHaveBeenCalledTimes(1);
    expect(connectionV2).toBe(poolV2);
    expect(connectionV1).not.toBe(connectionV2);
  });

  test('calls cleanUpHandler after TTL expires', async () => {
    // ARRANGE
    const pool = {};
    const fallBackHandler = jest.fn().mockResolvedValue(pool);
    const cleanUpHandler = jest.fn();
    await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '1',
      fallBackHandler,
      cleanUpHandler,
    });

    // ACT: jump past the TTL plus two sweep intervals so the stale-pool sweep runs.
    jest.advanceTimersByTime(TTL + CLEANUP_INTERVAL * 2);

    // ASSERT
    expect(cleanUpHandler).toHaveBeenCalledTimes(1);
  });
});

describe('onShutdown', () => {
  test('calls all clean up handlers', async () => {
    // ARRANGE
    const poolV1 = {};
    const fallBackHandlerV1 = jest.fn().mockResolvedValue(poolV1);
    const cleanUpHandlerV1 = jest.fn();
    await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '1',
      fallBackHandler: fallBackHandlerV1,
      cleanUpHandler: cleanUpHandlerV1,
    });

    const poolV2 = {};
    const fallBackHandlerV2 = jest.fn().mockResolvedValue(poolV2);
    const cleanUpHandlerV2 = jest.fn();
    await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '2',
      fallBackHandler: fallBackHandlerV2,
      cleanUpHandler: cleanUpHandlerV2,
    });

    // ACT
    poolManager.onShutdown();

    // ASSERT
    expect(cleanUpHandlerV1).toHaveBeenCalledTimes(1);
    expect(cleanUpHandlerV2).toHaveBeenCalledTimes(1);
  });

  test('calls all clean up handlers when `exit` is emitted on process', async () => {
    // ARRANGE
    const poolV1 = {};
    const fallBackHandlerV1 = jest.fn().mockResolvedValue(poolV1);
    const cleanUpHandlerV1 = jest.fn();
    await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '1',
      fallBackHandler: fallBackHandlerV1,
      cleanUpHandler: cleanUpHandlerV1,
    });

    const poolV2 = {};
    const fallBackHandlerV2 = jest.fn().mockResolvedValue(poolV2);
    const cleanUpHandlerV2 = jest.fn();
    await poolManager.getConnection<string>({
      credentials: {},
      nodeType: 'example',
      nodeVersion: '2',
      fallBackHandler: fallBackHandlerV2,
      cleanUpHandler: cleanUpHandlerV2,
    });

    // ACT
    // @ts-expect-error we're not supposed to emit `exit` so it's missing from
    // the type definition
    process.emit('exit');

    // ASSERT
    expect(cleanUpHandlerV1).toHaveBeenCalledTimes(1);
    expect(cleanUpHandlerV2).toHaveBeenCalledTimes(1);
  });
});

View file

@ -0,0 +1,137 @@
import { createHash } from 'crypto';
let instance: ConnectionPoolManager;

// How long a pool may sit unused before it is considered stale: 5 minutes
const ttl = 5 * 60 * 1000;
// How often the stale-pool sweep runs: 1 minute
const cleanUpInterval = 60 * 1000;

type RegistrationOptions = {
  credentials: unknown;
  nodeType: string;
  nodeVersion?: string;
};

type GetConnectionOption<Pool> = RegistrationOptions & {
  /** When a node requests a connection pool, but none is available, this handler is called to create a new instance of the pool, which is then cached and re-used until it goes stale. */
  fallBackHandler: () => Promise<Pool>;

  /** When a pool hasn't been used in a while, or when the server is shutting down, this handler is invoked to close the pool */
  cleanUpHandler: (pool: Pool) => Promise<void>;
};

type Registration<Pool> = {
  /** This is an instance of a Connection Pool class, that gets reused across multiple executions */
  pool: Pool;

  /** @see GetConnectionOption['cleanUpHandler'] */
  cleanUpHandler: (pool: Pool) => Promise<void>;

  /** We keep this timestamp to check if a pool hasn't been used in a while, and if it needs to be closed */
  lastUsed: number;
};

export class ConnectionPoolManager {
  /**
   * Gets the singleton instance of the ConnectionPoolManager.
   * Creates a new instance if one doesn't exist.
   */
  static getInstance(): ConnectionPoolManager {
    if (!instance) {
      instance = new ConnectionPoolManager();
    }
    return instance;
  }

  /** Registrations keyed by a hash of credentials + node type/version. */
  private map = new Map<string, Registration<unknown>>();

  /**
   * Private constructor that initializes the connection pool manager.
   * Sets up cleanup handlers for process exit and stale connections.
   */
  private constructor() {
    // Close all open pools when the process exits
    process.on('exit', () => this.onShutdown());

    // Regularly close stale pools. `unref()` so this interval does not keep
    // the Node.js process alive on its own once all other work is done.
    setInterval(() => this.cleanupStaleConnections(), cleanUpInterval).unref();
  }

  /**
   * Generates a unique key for connection pool identification.
   * Hashes the credentials and node information for security.
   */
  private makeKey({ credentials, nodeType, nodeVersion }: RegistrationOptions): string {
    // The credential contains decrypted secrets, that's why we hash it.
    return createHash('sha1')
      .update(
        JSON.stringify({
          credentials,
          nodeType,
          nodeVersion,
        }),
      )
      .digest('base64');
  }

  /**
   * Gets or creates a connection pool for the given options.
   * Updates the last used timestamp for existing connections.
   */
  async getConnection<T>(options: GetConnectionOption<T>): Promise<T> {
    const key = this.makeKey(options);

    const existing = this.map.get(key);
    if (existing) {
      existing.lastUsed = Date.now();
      return existing.pool as T;
    }

    // NOTE(review): two concurrent first calls for the same key can both get
    // here and create two pools; the second overwrites the first in the map,
    // leaving the first without a cleanup path — TODO confirm acceptable.
    const registration: Registration<T> = {
      pool: await options.fallBackHandler(),
      cleanUpHandler: options.cleanUpHandler,
      lastUsed: Date.now(),
    };
    // T is erased at the map boundary; `pool` and `cleanUpHandler` are only
    // ever used together, so this widening is safe.
    this.map.set(key, registration as Registration<unknown>);
    return registration.pool;
  }

  /**
   * Removes and cleans up connection pools that haven't been used within the
   * TTL.
   */
  private cleanupStaleConnections() {
    const now = Date.now();
    for (const [key, { cleanUpHandler, lastUsed, pool }] of this.map.entries()) {
      if (now - lastUsed > ttl) {
        // Fire-and-forget: a slow pool shutdown must not block the sweep.
        void cleanUpHandler(pool);
        this.map.delete(key);
      }
    }
  }

  /**
   * Removes and cleans up all existing connection pools.
   */
  async purgeConnections(): Promise<void> {
    await Promise.all(
      [...this.map.entries()].map(async ([key, value]) => {
        this.map.delete(key);
        return await value.cleanUpHandler(value.pool);
      }),
    );
  }

  /**
   * Cleans up all connection pools when the process is shutting down.
   * Does not wait for cleanup promises to resolve also does not remove the
   * references from the pool.
   *
   * Only call this on process shutdown.
   */
  onShutdown() {
    for (const { cleanUpHandler, pool } of this.map.values()) {
      void cleanUpHandler(pool);
    }
  }
}