n8n/packages/nodes-base/nodes/Postgres/v2/transport/index.ts

169 lines
4.5 KiB
TypeScript

import { createServer, type AddressInfo } from 'node:net';
import pgPromise from 'pg-promise';
import type {
IExecuteFunctions,
ICredentialTestFunctions,
ILoadOptionsFunctions,
ITriggerFunctions,
} from 'n8n-workflow';
import type {
ConnectionsData,
PgpConnectionParameters,
PostgresNodeCredentials,
PostgresNodeOptions,
} from '../helpers/interfaces';
import { formatPrivateKey } from '@utils/utilities';
import { LOCALHOST } from '@utils/constants';
const getPostgresConfig = (
credentials: PostgresNodeCredentials,
options: PostgresNodeOptions = {},
) => {
const dbConfig: PgpConnectionParameters = {
host: credentials.host,
port: credentials.port,
database: credentials.database,
user: credentials.user,
password: credentials.password,
keepAlive: true,
};
if (options.connectionTimeout) {
dbConfig.connectionTimeoutMillis = options.connectionTimeout * 1000;
}
if (options.delayClosingIdleConnection) {
dbConfig.keepAliveInitialDelayMillis = options.delayClosingIdleConnection * 1000;
}
if (credentials.allowUnauthorizedCerts === true) {
dbConfig.ssl = {
rejectUnauthorized: false,
};
} else {
dbConfig.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined);
// @ts-ignore these typings need to be updated
dbConfig.sslmode = credentials.ssl || 'disable';
}
return dbConfig;
};
export async function configurePostgres(
this: IExecuteFunctions | ICredentialTestFunctions | ILoadOptionsFunctions | ITriggerFunctions,
credentials: PostgresNodeCredentials,
options: PostgresNodeOptions = {},
): Promise<ConnectionsData> {
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed immediately after, but several could be open at the same time
noWarnings: true,
});
if (typeof options.nodeVersion === 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings
[pgp.pg.types.builtins.TIMESTAMP, pgp.pg.types.builtins.TIMESTAMPTZ].forEach((type) => {
pgp.pg.types.setTypeParser(type, (value: string) => {
return new Date(value).toISOString();
});
});
}
if (options.largeNumbersOutput === 'numbers') {
pgp.pg.types.setTypeParser(20, (value: string) => {
return parseInt(value, 10);
});
pgp.pg.types.setTypeParser(1700, (value: string) => {
return parseFloat(value);
});
}
const dbConfig = getPostgresConfig(credentials, options);
if (!credentials.sshTunnel) {
const db = pgp(dbConfig);
return { db, pgp };
} else {
if (credentials.sshAuthenticateWith === 'privateKey' && credentials.privateKey) {
credentials.privateKey = formatPrivateKey(credentials.privateKey);
}
const sshClient = await this.helpers.getSSHClient(credentials);
// Create a TCP proxy listening on a random available port
const proxy = createServer();
const proxyPort = await new Promise<number>((resolve) => {
proxy.listen(0, LOCALHOST, () => {
resolve((proxy.address() as AddressInfo).port);
});
});
const close = () => {
proxy.close();
sshClient.off('end', close);
sshClient.off('error', close);
};
sshClient.on('end', close);
sshClient.on('error', close);
await new Promise<void>((resolve, reject) => {
proxy.on('error', (err) => reject(err));
proxy.on('connection', (localSocket) => {
sshClient.forwardOut(
LOCALHOST,
localSocket.remotePort!,
credentials.host,
credentials.port,
(err, clientChannel) => {
if (err) {
proxy.close();
localSocket.destroy();
} else {
localSocket.pipe(clientChannel);
clientChannel.pipe(localSocket);
}
},
);
});
resolve();
}).catch((err) => {
proxy.close();
let message = err.message;
let description = err.description;
if (err.message.includes('ECONNREFUSED')) {
message = 'Connection refused';
try {
description = err.message.split('ECONNREFUSED ')[1].trim();
} catch (e) {}
}
if (err.message.includes('ENOTFOUND')) {
message = 'Host not found';
try {
description = err.message.split('ENOTFOUND ')[1].trim();
} catch (e) {}
}
if (err.message.includes('ETIMEDOUT')) {
message = 'Connection timed out';
try {
description = err.message.split('ETIMEDOUT ')[1].trim();
} catch (e) {}
}
err.message = message;
err.description = description;
throw err;
});
const db = pgp({
...dbConfig,
port: proxyPort,
host: LOCALHOST,
});
return { db, pgp };
}
}