mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 05:17:28 -08:00
fix(Postgres Node): Connection pool of the database object has been destroyed (#7074)
Github issue / Community forum post (link here to close automatically):
This commit is contained in:
parent
008cdcce56
commit
9dd5f0e579
|
@ -89,7 +89,7 @@ describe('Test PostgresV2, deleteTable operation', () => {
|
|||
},
|
||||
],
|
||||
},
|
||||
options: {},
|
||||
options: { typeVersion: 2.1 },
|
||||
};
|
||||
const nodeOptions = nodeParameters.options as IDataObject;
|
||||
|
||||
|
@ -168,7 +168,7 @@ describe('Test PostgresV2, deleteTable operation', () => {
|
|||
cachedResultName: 'my_table',
|
||||
},
|
||||
deleteCommand: 'drop',
|
||||
options: {},
|
||||
options: { typeVersion: 2.1 },
|
||||
};
|
||||
const nodeOptions = nodeParameters.options as IDataObject;
|
||||
|
||||
|
@ -256,7 +256,7 @@ describe('Test PostgresV2, insert operation', () => {
|
|||
},
|
||||
],
|
||||
},
|
||||
options: {},
|
||||
options: { typeVersion: 2.1 },
|
||||
};
|
||||
const columnsInfo: ColumnInfo[] = [
|
||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||
|
@ -299,7 +299,7 @@ describe('Test PostgresV2, insert operation', () => {
|
|||
mode: 'list',
|
||||
},
|
||||
dataMode: 'autoMapInputData',
|
||||
options: {},
|
||||
options: { typeVersion: 2.1 },
|
||||
};
|
||||
const columnsInfo: ColumnInfo[] = [
|
||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||
|
@ -509,6 +509,7 @@ describe('Test PostgresV2, update operation', () => {
|
|||
},
|
||||
options: {
|
||||
outputColumns: ['json', 'foo'],
|
||||
typeVersion: 2.1,
|
||||
},
|
||||
};
|
||||
const columnsInfo: ColumnInfo[] = [
|
||||
|
@ -565,7 +566,7 @@ describe('Test PostgresV2, update operation', () => {
|
|||
},
|
||||
dataMode: 'autoMapInputData',
|
||||
columnToMatchOn: 'id',
|
||||
options: {},
|
||||
options: { typeVersion: 2.1 },
|
||||
};
|
||||
const columnsInfo: ColumnInfo[] = [
|
||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||
|
@ -668,6 +669,7 @@ describe('Test PostgresV2, upsert operation', () => {
|
|||
},
|
||||
options: {
|
||||
outputColumns: ['json'],
|
||||
typeVersion: 2.1,
|
||||
},
|
||||
};
|
||||
const columnsInfo: ColumnInfo[] = [
|
||||
|
@ -724,7 +726,7 @@ describe('Test PostgresV2, upsert operation', () => {
|
|||
},
|
||||
dataMode: 'autoMapInputData',
|
||||
columnToMatchOn: 'id',
|
||||
options: {},
|
||||
options: { typeVersion: 2.1 },
|
||||
};
|
||||
const columnsInfo: ColumnInfo[] = [
|
||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||
|
|
|
@ -17,6 +17,7 @@ import type {
|
|||
import {
|
||||
addReturning,
|
||||
checkItemAgainstSchema,
|
||||
configureTableSchemaUpdater,
|
||||
getTableSchema,
|
||||
prepareItem,
|
||||
replaceEmptyStringsByNulls,
|
||||
|
@ -161,15 +162,28 @@ export async function execute(
|
|||
db: PgpDatabase,
|
||||
): Promise<INodeExecutionData[]> {
|
||||
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
||||
const nodeVersion = nodeOptions.typeVersion as number;
|
||||
|
||||
let schema = this.getNodeParameter('schema', 0, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
let table = this.getNodeParameter('table', 0, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const updateTableSchema = configureTableSchemaUpdater(schema, table);
|
||||
|
||||
let tableSchema = await getTableSchema(db, schema, table);
|
||||
|
||||
const queries: QueryWithValues[] = [];
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
const schema = this.getNodeParameter('schema', i, undefined, {
|
||||
schema = this.getNodeParameter('schema', i, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const table = this.getNodeParameter('table', i, undefined, {
|
||||
table = this.getNodeParameter('table', i, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
|
@ -183,7 +197,6 @@ export async function execute(
|
|||
let query = `INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv)${onConflict}`;
|
||||
let values: QueryValues = [schema, table];
|
||||
|
||||
const nodeVersion = this.getNode().typeVersion;
|
||||
const dataMode =
|
||||
nodeVersion < 2.2
|
||||
? (this.getNodeParameter('dataMode', i) as string)
|
||||
|
@ -209,7 +222,7 @@ export async function execute(
|
|||
}
|
||||
}
|
||||
|
||||
const tableSchema = await getTableSchema(db, schema, table);
|
||||
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
|
||||
|
||||
values.push(checkItemAgainstSchema(this.getNode(), item, tableSchema, i));
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ import type {
|
|||
import {
|
||||
addReturning,
|
||||
checkItemAgainstSchema,
|
||||
configureTableSchemaUpdater,
|
||||
doesRowExist,
|
||||
getTableSchema,
|
||||
prepareItem,
|
||||
|
@ -198,19 +199,31 @@ export async function execute(
|
|||
db: PgpDatabase,
|
||||
): Promise<INodeExecutionData[]> {
|
||||
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
||||
const nodeVersion = nodeOptions.typeVersion as number;
|
||||
|
||||
let schema = this.getNodeParameter('schema', 0, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
let table = this.getNodeParameter('table', 0, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const updateTableSchema = configureTableSchemaUpdater(schema, table);
|
||||
|
||||
let tableSchema = await getTableSchema(db, schema, table);
|
||||
|
||||
const queries: QueryWithValues[] = [];
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
const schema = this.getNodeParameter('schema', i, undefined, {
|
||||
schema = this.getNodeParameter('schema', i, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const table = this.getNodeParameter('table', i, undefined, {
|
||||
table = this.getNodeParameter('table', i, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const nodeVersion = this.getNode().typeVersion;
|
||||
const columnsToMatchOn: string[] =
|
||||
nodeVersion < 2.2
|
||||
? [this.getNodeParameter('columnToMatchOn', i) as string]
|
||||
|
@ -286,7 +299,7 @@ export async function execute(
|
|||
}
|
||||
}
|
||||
|
||||
const tableSchema = await getTableSchema(db, schema, table);
|
||||
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
|
||||
|
||||
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import {
|
|||
getTableSchema,
|
||||
prepareItem,
|
||||
replaceEmptyStringsByNulls,
|
||||
configureTableSchemaUpdater,
|
||||
} from '../../helpers/utils';
|
||||
|
||||
import { optionsCollection } from '../common.descriptions';
|
||||
|
@ -197,19 +198,31 @@ export async function execute(
|
|||
db: PgpDatabase,
|
||||
): Promise<INodeExecutionData[]> {
|
||||
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
||||
const nodeVersion = nodeOptions.typeVersion as number;
|
||||
|
||||
let schema = this.getNodeParameter('schema', 0, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
let table = this.getNodeParameter('table', 0, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const updateTableSchema = configureTableSchemaUpdater(schema, table);
|
||||
|
||||
let tableSchema = await getTableSchema(db, schema, table);
|
||||
|
||||
const queries: QueryWithValues[] = [];
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
const schema = this.getNodeParameter('schema', i, undefined, {
|
||||
schema = this.getNodeParameter('schema', i, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const table = this.getNodeParameter('table', i, undefined, {
|
||||
table = this.getNodeParameter('table', i, undefined, {
|
||||
extractValue: true,
|
||||
}) as string;
|
||||
|
||||
const nodeVersion = this.getNode().typeVersion;
|
||||
const columnsToMatchOn: string[] =
|
||||
nodeVersion < 2.2
|
||||
? [this.getNodeParameter('columnToMatchOn', i) as string]
|
||||
|
@ -255,7 +268,7 @@ export async function execute(
|
|||
);
|
||||
}
|
||||
|
||||
const tableSchema = await getTableSchema(db, schema, table);
|
||||
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
|
||||
|
||||
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
|
||||
|
||||
|
|
|
@ -57,7 +57,8 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
|
|||
if (sshClient) {
|
||||
sshClient.end();
|
||||
}
|
||||
pgp.end();
|
||||
|
||||
await db.$pool.end();
|
||||
}
|
||||
|
||||
return this.prepareOutputData(returnData);
|
||||
|
|
|
@ -15,7 +15,7 @@ export type ColumnInfo = {
|
|||
column_name: string;
|
||||
data_type: string;
|
||||
is_nullable: string;
|
||||
udt_name: string;
|
||||
udt_name?: string;
|
||||
column_default?: string;
|
||||
};
|
||||
export type EnumInfo = {
|
||||
|
|
|
@ -458,3 +458,16 @@ export function checkItemAgainstSchema(
|
|||
|
||||
return item;
|
||||
}
|
||||
|
||||
export const configureTableSchemaUpdater = (initialSchema: string, initialTable: string) => {
|
||||
let currentSchema = initialSchema;
|
||||
let currentTable = initialTable;
|
||||
return async (db: PgpDatabase, tableSchema: ColumnInfo[], schema: string, table: string) => {
|
||||
if (currentSchema !== schema || currentTable !== table) {
|
||||
currentSchema = schema;
|
||||
currentTable = table;
|
||||
tableSchema = await getTableSchema(db, schema, table);
|
||||
}
|
||||
return tableSchema;
|
||||
};
|
||||
};
|
||||
|
|
|
@ -6,7 +6,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
|
|||
const credentials = await this.getCredentials('postgres');
|
||||
const options = { nodeVersion: this.getNode().typeVersion };
|
||||
|
||||
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
|
||||
const { db, sshClient } = await configurePostgres(credentials, options);
|
||||
|
||||
try {
|
||||
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
|
||||
|
@ -23,14 +23,14 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
|
|||
if (sshClient) {
|
||||
sshClient.end();
|
||||
}
|
||||
pgp.end();
|
||||
await db.$pool.end();
|
||||
}
|
||||
}
|
||||
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
|
||||
const credentials = await this.getCredentials('postgres');
|
||||
const options = { nodeVersion: this.getNode().typeVersion };
|
||||
|
||||
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
|
||||
const { db, sshClient } = await configurePostgres(credentials, options);
|
||||
|
||||
const schema = this.getNodeParameter('schema', 0, {
|
||||
extractValue: true,
|
||||
|
@ -54,6 +54,6 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
|
|||
if (sshClient) {
|
||||
sshClient.end();
|
||||
}
|
||||
pgp.end();
|
||||
await db.$pool.end();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
|
|||
const credentials = await this.getCredentials('postgres');
|
||||
const options = { nodeVersion: this.getNode().typeVersion };
|
||||
|
||||
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
|
||||
const { db, sshClient } = await configurePostgres(credentials, options);
|
||||
|
||||
const schema = this.getNodeParameter('schema', 0, {
|
||||
extractValue: true,
|
||||
|
@ -31,7 +31,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
|
|||
if (sshClient) {
|
||||
sshClient.end();
|
||||
}
|
||||
pgp.end();
|
||||
await db.$pool.end();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,8 @@ export async function getMappingColumns(
|
|||
const canBeUsedToMatch =
|
||||
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
|
||||
const type = mapPostgresType(col.data_type);
|
||||
const options = type === 'options' ? getEnumValues(enumInfo, col.udt_name) : undefined;
|
||||
const options =
|
||||
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
|
||||
const isAutoIncrement = col.column_default?.startsWith('nextval');
|
||||
return {
|
||||
id: col.column_name,
|
||||
|
|
Loading…
Reference in a new issue