fix(Postgres Node): Connection pool of the database object has been destroyed (#7074)

Github issue / Community forum post (link here to close automatically):
This commit is contained in:
Michael Kret 2023-09-01 22:19:10 +03:00 committed by GitHub
parent 008cdcce56
commit 9dd5f0e579
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 83 additions and 27 deletions

View file

@ -89,7 +89,7 @@ describe('Test PostgresV2, deleteTable operation', () => {
},
],
},
options: {},
options: { typeVersion: 2.1 },
};
const nodeOptions = nodeParameters.options as IDataObject;
@ -168,7 +168,7 @@ describe('Test PostgresV2, deleteTable operation', () => {
cachedResultName: 'my_table',
},
deleteCommand: 'drop',
options: {},
options: { typeVersion: 2.1 },
};
const nodeOptions = nodeParameters.options as IDataObject;
@ -256,7 +256,7 @@ describe('Test PostgresV2, insert operation', () => {
},
],
},
options: {},
options: { typeVersion: 2.1 },
};
const columnsInfo: ColumnInfo[] = [
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
@ -299,7 +299,7 @@ describe('Test PostgresV2, insert operation', () => {
mode: 'list',
},
dataMode: 'autoMapInputData',
options: {},
options: { typeVersion: 2.1 },
};
const columnsInfo: ColumnInfo[] = [
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
@ -509,6 +509,7 @@ describe('Test PostgresV2, update operation', () => {
},
options: {
outputColumns: ['json', 'foo'],
typeVersion: 2.1,
},
};
const columnsInfo: ColumnInfo[] = [
@ -565,7 +566,7 @@ describe('Test PostgresV2, update operation', () => {
},
dataMode: 'autoMapInputData',
columnToMatchOn: 'id',
options: {},
options: { typeVersion: 2.1 },
};
const columnsInfo: ColumnInfo[] = [
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
@ -668,6 +669,7 @@ describe('Test PostgresV2, upsert operation', () => {
},
options: {
outputColumns: ['json'],
typeVersion: 2.1,
},
};
const columnsInfo: ColumnInfo[] = [
@ -724,7 +726,7 @@ describe('Test PostgresV2, upsert operation', () => {
},
dataMode: 'autoMapInputData',
columnToMatchOn: 'id',
options: {},
options: { typeVersion: 2.1 },
};
const columnsInfo: ColumnInfo[] = [
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },

View file

@ -17,6 +17,7 @@ import type {
import {
addReturning,
checkItemAgainstSchema,
configureTableSchemaUpdater,
getTableSchema,
prepareItem,
replaceEmptyStringsByNulls,
@ -161,15 +162,28 @@ export async function execute(
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
const nodeVersion = nodeOptions.typeVersion as number;
let schema = this.getNodeParameter('schema', 0, undefined, {
extractValue: true,
}) as string;
let table = this.getNodeParameter('table', 0, undefined, {
extractValue: true,
}) as string;
const updateTableSchema = configureTableSchemaUpdater(schema, table);
let tableSchema = await getTableSchema(db, schema, table);
const queries: QueryWithValues[] = [];
for (let i = 0; i < items.length; i++) {
const schema = this.getNodeParameter('schema', i, undefined, {
schema = this.getNodeParameter('schema', i, undefined, {
extractValue: true,
}) as string;
const table = this.getNodeParameter('table', i, undefined, {
table = this.getNodeParameter('table', i, undefined, {
extractValue: true,
}) as string;
@ -183,7 +197,6 @@ export async function execute(
let query = `INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv)${onConflict}`;
let values: QueryValues = [schema, table];
const nodeVersion = this.getNode().typeVersion;
const dataMode =
nodeVersion < 2.2
? (this.getNodeParameter('dataMode', i) as string)
@ -209,7 +222,7 @@ export async function execute(
}
}
const tableSchema = await getTableSchema(db, schema, table);
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
values.push(checkItemAgainstSchema(this.getNode(), item, tableSchema, i));

View file

@ -18,6 +18,7 @@ import type {
import {
addReturning,
checkItemAgainstSchema,
configureTableSchemaUpdater,
doesRowExist,
getTableSchema,
prepareItem,
@ -198,19 +199,31 @@ export async function execute(
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
const nodeVersion = nodeOptions.typeVersion as number;
let schema = this.getNodeParameter('schema', 0, undefined, {
extractValue: true,
}) as string;
let table = this.getNodeParameter('table', 0, undefined, {
extractValue: true,
}) as string;
const updateTableSchema = configureTableSchemaUpdater(schema, table);
let tableSchema = await getTableSchema(db, schema, table);
const queries: QueryWithValues[] = [];
for (let i = 0; i < items.length; i++) {
const schema = this.getNodeParameter('schema', i, undefined, {
schema = this.getNodeParameter('schema', i, undefined, {
extractValue: true,
}) as string;
const table = this.getNodeParameter('table', i, undefined, {
table = this.getNodeParameter('table', i, undefined, {
extractValue: true,
}) as string;
const nodeVersion = this.getNode().typeVersion;
const columnsToMatchOn: string[] =
nodeVersion < 2.2
? [this.getNodeParameter('columnToMatchOn', i) as string]
@ -286,7 +299,7 @@ export async function execute(
}
}
const tableSchema = await getTableSchema(db, schema, table);
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);

View file

@ -21,6 +21,7 @@ import {
getTableSchema,
prepareItem,
replaceEmptyStringsByNulls,
configureTableSchemaUpdater,
} from '../../helpers/utils';
import { optionsCollection } from '../common.descriptions';
@ -197,19 +198,31 @@ export async function execute(
db: PgpDatabase,
): Promise<INodeExecutionData[]> {
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
const nodeVersion = nodeOptions.typeVersion as number;
let schema = this.getNodeParameter('schema', 0, undefined, {
extractValue: true,
}) as string;
let table = this.getNodeParameter('table', 0, undefined, {
extractValue: true,
}) as string;
const updateTableSchema = configureTableSchemaUpdater(schema, table);
let tableSchema = await getTableSchema(db, schema, table);
const queries: QueryWithValues[] = [];
for (let i = 0; i < items.length; i++) {
const schema = this.getNodeParameter('schema', i, undefined, {
schema = this.getNodeParameter('schema', i, undefined, {
extractValue: true,
}) as string;
const table = this.getNodeParameter('table', i, undefined, {
table = this.getNodeParameter('table', i, undefined, {
extractValue: true,
}) as string;
const nodeVersion = this.getNode().typeVersion;
const columnsToMatchOn: string[] =
nodeVersion < 2.2
? [this.getNodeParameter('columnToMatchOn', i) as string]
@ -255,7 +268,7 @@ export async function execute(
);
}
const tableSchema = await getTableSchema(db, schema, table);
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);

View file

@ -57,7 +57,8 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
if (sshClient) {
sshClient.end();
}
pgp.end();
await db.$pool.end();
}
return this.prepareOutputData(returnData);

View file

@ -15,7 +15,7 @@ export type ColumnInfo = {
column_name: string;
data_type: string;
is_nullable: string;
udt_name: string;
udt_name?: string;
column_default?: string;
};
export type EnumInfo = {

View file

@ -458,3 +458,16 @@ export function checkItemAgainstSchema(
return item;
}
export const configureTableSchemaUpdater = (initialSchema: string, initialTable: string) => {
let currentSchema = initialSchema;
let currentTable = initialTable;
return async (db: PgpDatabase, tableSchema: ColumnInfo[], schema: string, table: string) => {
if (currentSchema !== schema || currentTable !== table) {
currentSchema = schema;
currentTable = table;
tableSchema = await getTableSchema(db, schema, table);
}
return tableSchema;
};
};

View file

@ -6,7 +6,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion };
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
const { db, sshClient } = await configurePostgres(credentials, options);
try {
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
@ -23,14 +23,14 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
if (sshClient) {
sshClient.end();
}
pgp.end();
await db.$pool.end();
}
}
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion };
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
const { db, sshClient } = await configurePostgres(credentials, options);
const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
@ -54,6 +54,6 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
if (sshClient) {
sshClient.end();
}
pgp.end();
await db.$pool.end();
}
}

View file

@ -7,7 +7,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion };
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
const { db, sshClient } = await configurePostgres(credentials, options);
const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
@ -31,7 +31,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
if (sshClient) {
sshClient.end();
}
pgp.end();
await db.$pool.end();
}
}

View file

@ -70,7 +70,8 @@ export async function getMappingColumns(
const canBeUsedToMatch =
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
const type = mapPostgresType(col.data_type);
const options = type === 'options' ? getEnumValues(enumInfo, col.udt_name) : undefined;
const options =
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
const isAutoIncrement = col.column_default?.startsWith('nextval');
return {
id: col.column_name,