fix(Postgres Node): Remove reusable connections (no-changelog) (#6259)

This commit is contained in:
Michael Kret 2023-05-19 16:42:24 +03:00 committed by GitHub
parent 4b5cbe7750
commit be5d3264ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 64 additions and 61 deletions

View file

@ -1,5 +1,6 @@
import type { IExecuteFunctions } from 'n8n-core'; import type { IExecuteFunctions } from 'n8n-core';
import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { updateDisplayOptions } from '../../../../../utils/utilities'; import { updateDisplayOptions } from '../../../../../utils/utilities';
@ -181,6 +182,13 @@ export async function execute(
valueToMatchOn = this.getNodeParameter('valueToMatchOn', i) as string; valueToMatchOn = this.getNodeParameter('valueToMatchOn', i) as string;
} }
if (!item[columnToMatchOn] && dataMode === 'autoMapInputData') {
throw new NodeOperationError(
this.getNode(),
"Column to match on not found in input item. Add a column to match on or set the 'Data Mode' to 'Define Below' to define the value to match on.",
);
}
const tableSchema = await getTableSchema(db, schema, table); const tableSchema = await getTableSchema(db, schema, table);
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i); item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
@ -195,6 +203,13 @@ export async function execute(
const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn); const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn);
if (!Object.keys(updateColumns).length) {
throw new NodeOperationError(
this.getNode(),
"Add values to update to the input item or set the 'Data Mode' to 'Define Below' to define the values to update.",
);
}
const updates: string[] = []; const updates: string[] = [];
for (const column of updateColumns) { for (const column of updateColumns) {

View file

@ -1,5 +1,6 @@
import type { IExecuteFunctions } from 'n8n-core'; import type { IExecuteFunctions } from 'n8n-core';
import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow'; import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { updateDisplayOptions } from '../../../../../utils/utilities'; import { updateDisplayOptions } from '../../../../../utils/utilities';
@ -179,6 +180,20 @@ export async function execute(
item[columnToMatchOn] = this.getNodeParameter('valueToMatchOn', i) as string; item[columnToMatchOn] = this.getNodeParameter('valueToMatchOn', i) as string;
} }
if (!item[columnToMatchOn]) {
throw new NodeOperationError(
this.getNode(),
"Column to match on not found in input item. Add a column to match on or set the 'Data Mode' to 'Define Below' to define the value to match on.",
);
}
if (item[columnToMatchOn] && Object.keys(item).length === 1) {
throw new NodeOperationError(
this.getNode(),
"Add values to update or insert to the input item or set the 'Data Mode' to 'Define Below' to define the values to insert or update.",
);
}
const tableSchema = await getTableSchema(db, schema, table); const tableSchema = await getTableSchema(db, schema, table);
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i); item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);

View file

@ -4,9 +4,8 @@ import { NodeOperationError } from 'n8n-workflow';
import type { PostgresType } from './node.type'; import type { PostgresType } from './node.type';
import * as database from './database/Database.resource'; import * as database from './database/Database.resource';
import { Connections } from '../transport'; import { configurePostgres } from '../transport';
import { configureQueryRunner } from '../helpers/utils'; import { configureQueryRunner } from '../helpers/utils';
import type { ConnectionsData } from '../helpers/interfaces';
export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let returnData: INodeExecutionData[] = []; let returnData: INodeExecutionData[] = [];
@ -19,11 +18,7 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
const options = this.getNodeParameter('options', 0, {}); const options = this.getNodeParameter('options', 0, {});
options.nodeVersion = this.getNode().typeVersion; options.nodeVersion = this.getNode().typeVersion;
const { db, pgp, sshClient } = (await Connections.getInstance( const { db, pgp, sshClient } = await configurePostgres(credentials, options);
credentials,
options,
true,
)) as ConnectionsData;
const runQueries = configureQueryRunner( const runQueries = configureQueryRunner(
this.getNode(), this.getNode(),

View file

@ -5,10 +5,10 @@ import type {
INodeCredentialTestResult, INodeCredentialTestResult,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { Connections } from '../transport'; import { configurePostgres } from '../transport';
import { Client } from 'ssh2'; import { Client } from 'ssh2';
import type { ConnectionsData, PgpClient } from '../helpers/interfaces'; import type { PgpClient } from '../helpers/interfaces';
export async function postgresConnectionTest( export async function postgresConnectionTest(
this: ICredentialTestFunctions, this: ICredentialTestFunctions,
@ -20,12 +20,7 @@ export async function postgresConnectionTest(
let pgpClientCreated: PgpClient | undefined; let pgpClientCreated: PgpClient | undefined;
try { try {
const { db, pgp, sshClient } = (await Connections.getInstance( const { db, pgp, sshClient } = await configurePostgres(credentials, {}, sshClientCreated);
credentials,
{},
true,
sshClientCreated,
)) as ConnectionsData;
sshClientCreated = sshClient; sshClientCreated = sshClient;
pgpClientCreated = pgp; pgpClientCreated = pgp;
@ -57,9 +52,6 @@ export async function postgresConnectionTest(
if (pgpClientCreated) { if (pgpClientCreated) {
pgpClientCreated.end(); pgpClientCreated.end();
} }
//set the connection instance to null so that it can be recreated
await Connections.getInstance({}, {}, false, undefined, true);
} }
return { return {
status: 'OK', status: 'OK',

View file

@ -1,12 +1,12 @@
import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow'; import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';
import type { ConnectionsData } from '../helpers/interfaces';
import { Connections } from '../transport'; import { configurePostgres } from '../transport';
export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> { export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres'); const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion }; const options = { nodeVersion: this.getNode().typeVersion };
const { db } = (await Connections.getInstance(credentials, options)) as ConnectionsData; const { db, pgp, sshClient } = await configurePostgres(credentials, options);
try { try {
const response = await db.any('SELECT schema_name FROM information_schema.schemata'); const response = await db.any('SELECT schema_name FROM information_schema.schemata');
@ -19,13 +19,18 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
}; };
} catch (error) { } catch (error) {
throw error; throw error;
} finally {
if (sshClient) {
sshClient.end();
}
pgp.end();
} }
} }
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> { export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres'); const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion }; const options = { nodeVersion: this.getNode().typeVersion };
const { db } = (await Connections.getInstance(credentials, options)) as ConnectionsData; const { db, pgp, sshClient } = await configurePostgres(credentials, options);
const schema = this.getNodeParameter('schema', 0, { const schema = this.getNodeParameter('schema', 0, {
extractValue: true, extractValue: true,
@ -45,5 +50,10 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
}; };
} catch (error) { } catch (error) {
throw error; throw error;
} finally {
if (sshClient) {
sshClient.end();
}
pgp.end();
} }
} }

View file

@ -1,13 +1,13 @@
import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow'; import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';
import type { ConnectionsData } from '../helpers/interfaces';
import { getTableSchema } from '../helpers/utils'; import { getTableSchema } from '../helpers/utils';
import { Connections } from '../transport'; import { configurePostgres } from '../transport';
export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> { export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const credentials = await this.getCredentials('postgres'); const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion }; const options = { nodeVersion: this.getNode().typeVersion };
const { db } = (await Connections.getInstance(credentials, options)) as ConnectionsData; const { db, pgp, sshClient } = await configurePostgres(credentials, options);
const schema = this.getNodeParameter('schema', 0, { const schema = this.getNodeParameter('schema', 0, {
extractValue: true, extractValue: true,
@ -27,6 +27,11 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
})); }));
} catch (error) { } catch (error) {
throw error; throw error;
} finally {
if (sshClient) {
sshClient.end();
}
pgp.end();
} }
} }

View file

@ -11,7 +11,7 @@ import pgPromise from 'pg-promise';
import { rm, writeFile } from 'fs/promises'; import { rm, writeFile } from 'fs/promises';
import { file } from 'tmp-promise'; import { file } from 'tmp-promise';
import type { PgpClient, PgpDatabase } from '../helpers/interfaces'; import type { PgpDatabase } from '../helpers/interfaces';
async function createSshConnectConfig(credentials: IDataObject) { async function createSshConnectConfig(credentials: IDataObject) {
if (credentials.sshAuthenticateWith === 'password') { if (credentials.sshAuthenticateWith === 'password') {
@ -40,12 +40,16 @@ async function createSshConnectConfig(credentials: IDataObject) {
} }
} }
async function configurePostgres( export async function configurePostgres(
credentials: IDataObject, credentials: IDataObject,
options: IDataObject = {}, options: IDataObject = {},
createdSshClient?: Client, createdSshClient?: Client,
) { ) {
const pgp = pgPromise(); const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed imidiatly after, but several could be open at the same time
noWarnings: true,
});
if (typeof options.nodeVersion == 'number' && options.nodeVersion >= 2.1) { if (typeof options.nodeVersion == 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings // Always return dates as ISO strings
@ -183,36 +187,3 @@ async function configurePostgres(
return { db, pgp, sshClient }; return { db, pgp, sshClient };
} }
} }
export const Connections = (function () {
let instance: { db: PgpDatabase; pgp: PgpClient; sshClient?: Client } | null = null;
return {
async getInstance(
credentials: IDataObject = {},
options: IDataObject = {},
reload = false,
createdSshClient?: Client,
nulify = false,
) {
if (nulify) {
instance = null;
return instance;
}
if (instance !== null && reload) {
if (instance.sshClient) {
instance.sshClient.end();
}
instance.pgp.end();
instance = null;
}
if (instance === null && Object.keys(credentials).length) {
instance = await configurePostgres(credentials, options, createdSshClient);
}
return instance;
},
};
})();