mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
fix(Postgres Node): Connection pool of the database object has been destroyed (#7074)
Github issue / Community forum post (link here to close automatically):
This commit is contained in:
parent
008cdcce56
commit
9dd5f0e579
|
@ -89,7 +89,7 @@ describe('Test PostgresV2, deleteTable operation', () => {
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
options: {},
|
options: { typeVersion: 2.1 },
|
||||||
};
|
};
|
||||||
const nodeOptions = nodeParameters.options as IDataObject;
|
const nodeOptions = nodeParameters.options as IDataObject;
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ describe('Test PostgresV2, deleteTable operation', () => {
|
||||||
cachedResultName: 'my_table',
|
cachedResultName: 'my_table',
|
||||||
},
|
},
|
||||||
deleteCommand: 'drop',
|
deleteCommand: 'drop',
|
||||||
options: {},
|
options: { typeVersion: 2.1 },
|
||||||
};
|
};
|
||||||
const nodeOptions = nodeParameters.options as IDataObject;
|
const nodeOptions = nodeParameters.options as IDataObject;
|
||||||
|
|
||||||
|
@ -256,7 +256,7 @@ describe('Test PostgresV2, insert operation', () => {
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
options: {},
|
options: { typeVersion: 2.1 },
|
||||||
};
|
};
|
||||||
const columnsInfo: ColumnInfo[] = [
|
const columnsInfo: ColumnInfo[] = [
|
||||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||||
|
@ -299,7 +299,7 @@ describe('Test PostgresV2, insert operation', () => {
|
||||||
mode: 'list',
|
mode: 'list',
|
||||||
},
|
},
|
||||||
dataMode: 'autoMapInputData',
|
dataMode: 'autoMapInputData',
|
||||||
options: {},
|
options: { typeVersion: 2.1 },
|
||||||
};
|
};
|
||||||
const columnsInfo: ColumnInfo[] = [
|
const columnsInfo: ColumnInfo[] = [
|
||||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||||
|
@ -509,6 +509,7 @@ describe('Test PostgresV2, update operation', () => {
|
||||||
},
|
},
|
||||||
options: {
|
options: {
|
||||||
outputColumns: ['json', 'foo'],
|
outputColumns: ['json', 'foo'],
|
||||||
|
typeVersion: 2.1,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const columnsInfo: ColumnInfo[] = [
|
const columnsInfo: ColumnInfo[] = [
|
||||||
|
@ -565,7 +566,7 @@ describe('Test PostgresV2, update operation', () => {
|
||||||
},
|
},
|
||||||
dataMode: 'autoMapInputData',
|
dataMode: 'autoMapInputData',
|
||||||
columnToMatchOn: 'id',
|
columnToMatchOn: 'id',
|
||||||
options: {},
|
options: { typeVersion: 2.1 },
|
||||||
};
|
};
|
||||||
const columnsInfo: ColumnInfo[] = [
|
const columnsInfo: ColumnInfo[] = [
|
||||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||||
|
@ -668,6 +669,7 @@ describe('Test PostgresV2, upsert operation', () => {
|
||||||
},
|
},
|
||||||
options: {
|
options: {
|
||||||
outputColumns: ['json'],
|
outputColumns: ['json'],
|
||||||
|
typeVersion: 2.1,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const columnsInfo: ColumnInfo[] = [
|
const columnsInfo: ColumnInfo[] = [
|
||||||
|
@ -724,7 +726,7 @@ describe('Test PostgresV2, upsert operation', () => {
|
||||||
},
|
},
|
||||||
dataMode: 'autoMapInputData',
|
dataMode: 'autoMapInputData',
|
||||||
columnToMatchOn: 'id',
|
columnToMatchOn: 'id',
|
||||||
options: {},
|
options: { typeVersion: 2.1 },
|
||||||
};
|
};
|
||||||
const columnsInfo: ColumnInfo[] = [
|
const columnsInfo: ColumnInfo[] = [
|
||||||
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
{ column_name: 'id', data_type: 'integer', is_nullable: 'NO', udt_name: '' },
|
||||||
|
|
|
@ -17,6 +17,7 @@ import type {
|
||||||
import {
|
import {
|
||||||
addReturning,
|
addReturning,
|
||||||
checkItemAgainstSchema,
|
checkItemAgainstSchema,
|
||||||
|
configureTableSchemaUpdater,
|
||||||
getTableSchema,
|
getTableSchema,
|
||||||
prepareItem,
|
prepareItem,
|
||||||
replaceEmptyStringsByNulls,
|
replaceEmptyStringsByNulls,
|
||||||
|
@ -161,15 +162,28 @@ export async function execute(
|
||||||
db: PgpDatabase,
|
db: PgpDatabase,
|
||||||
): Promise<INodeExecutionData[]> {
|
): Promise<INodeExecutionData[]> {
|
||||||
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
||||||
|
const nodeVersion = nodeOptions.typeVersion as number;
|
||||||
|
|
||||||
|
let schema = this.getNodeParameter('schema', 0, undefined, {
|
||||||
|
extractValue: true,
|
||||||
|
}) as string;
|
||||||
|
|
||||||
|
let table = this.getNodeParameter('table', 0, undefined, {
|
||||||
|
extractValue: true,
|
||||||
|
}) as string;
|
||||||
|
|
||||||
|
const updateTableSchema = configureTableSchemaUpdater(schema, table);
|
||||||
|
|
||||||
|
let tableSchema = await getTableSchema(db, schema, table);
|
||||||
|
|
||||||
const queries: QueryWithValues[] = [];
|
const queries: QueryWithValues[] = [];
|
||||||
|
|
||||||
for (let i = 0; i < items.length; i++) {
|
for (let i = 0; i < items.length; i++) {
|
||||||
const schema = this.getNodeParameter('schema', i, undefined, {
|
schema = this.getNodeParameter('schema', i, undefined, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
}) as string;
|
}) as string;
|
||||||
|
|
||||||
const table = this.getNodeParameter('table', i, undefined, {
|
table = this.getNodeParameter('table', i, undefined, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
}) as string;
|
}) as string;
|
||||||
|
|
||||||
|
@ -183,7 +197,6 @@ export async function execute(
|
||||||
let query = `INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv)${onConflict}`;
|
let query = `INSERT INTO $1:name.$2:name($3:name) VALUES($3:csv)${onConflict}`;
|
||||||
let values: QueryValues = [schema, table];
|
let values: QueryValues = [schema, table];
|
||||||
|
|
||||||
const nodeVersion = this.getNode().typeVersion;
|
|
||||||
const dataMode =
|
const dataMode =
|
||||||
nodeVersion < 2.2
|
nodeVersion < 2.2
|
||||||
? (this.getNodeParameter('dataMode', i) as string)
|
? (this.getNodeParameter('dataMode', i) as string)
|
||||||
|
@ -209,7 +222,7 @@ export async function execute(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const tableSchema = await getTableSchema(db, schema, table);
|
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
|
||||||
|
|
||||||
values.push(checkItemAgainstSchema(this.getNode(), item, tableSchema, i));
|
values.push(checkItemAgainstSchema(this.getNode(), item, tableSchema, i));
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ import type {
|
||||||
import {
|
import {
|
||||||
addReturning,
|
addReturning,
|
||||||
checkItemAgainstSchema,
|
checkItemAgainstSchema,
|
||||||
|
configureTableSchemaUpdater,
|
||||||
doesRowExist,
|
doesRowExist,
|
||||||
getTableSchema,
|
getTableSchema,
|
||||||
prepareItem,
|
prepareItem,
|
||||||
|
@ -198,19 +199,31 @@ export async function execute(
|
||||||
db: PgpDatabase,
|
db: PgpDatabase,
|
||||||
): Promise<INodeExecutionData[]> {
|
): Promise<INodeExecutionData[]> {
|
||||||
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
||||||
|
const nodeVersion = nodeOptions.typeVersion as number;
|
||||||
|
|
||||||
|
let schema = this.getNodeParameter('schema', 0, undefined, {
|
||||||
|
extractValue: true,
|
||||||
|
}) as string;
|
||||||
|
|
||||||
|
let table = this.getNodeParameter('table', 0, undefined, {
|
||||||
|
extractValue: true,
|
||||||
|
}) as string;
|
||||||
|
|
||||||
|
const updateTableSchema = configureTableSchemaUpdater(schema, table);
|
||||||
|
|
||||||
|
let tableSchema = await getTableSchema(db, schema, table);
|
||||||
|
|
||||||
const queries: QueryWithValues[] = [];
|
const queries: QueryWithValues[] = [];
|
||||||
|
|
||||||
for (let i = 0; i < items.length; i++) {
|
for (let i = 0; i < items.length; i++) {
|
||||||
const schema = this.getNodeParameter('schema', i, undefined, {
|
schema = this.getNodeParameter('schema', i, undefined, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
}) as string;
|
}) as string;
|
||||||
|
|
||||||
const table = this.getNodeParameter('table', i, undefined, {
|
table = this.getNodeParameter('table', i, undefined, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
}) as string;
|
}) as string;
|
||||||
|
|
||||||
const nodeVersion = this.getNode().typeVersion;
|
|
||||||
const columnsToMatchOn: string[] =
|
const columnsToMatchOn: string[] =
|
||||||
nodeVersion < 2.2
|
nodeVersion < 2.2
|
||||||
? [this.getNodeParameter('columnToMatchOn', i) as string]
|
? [this.getNodeParameter('columnToMatchOn', i) as string]
|
||||||
|
@ -286,7 +299,7 @@ export async function execute(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const tableSchema = await getTableSchema(db, schema, table);
|
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
|
||||||
|
|
||||||
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
|
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import {
|
||||||
getTableSchema,
|
getTableSchema,
|
||||||
prepareItem,
|
prepareItem,
|
||||||
replaceEmptyStringsByNulls,
|
replaceEmptyStringsByNulls,
|
||||||
|
configureTableSchemaUpdater,
|
||||||
} from '../../helpers/utils';
|
} from '../../helpers/utils';
|
||||||
|
|
||||||
import { optionsCollection } from '../common.descriptions';
|
import { optionsCollection } from '../common.descriptions';
|
||||||
|
@ -197,19 +198,31 @@ export async function execute(
|
||||||
db: PgpDatabase,
|
db: PgpDatabase,
|
||||||
): Promise<INodeExecutionData[]> {
|
): Promise<INodeExecutionData[]> {
|
||||||
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
items = replaceEmptyStringsByNulls(items, nodeOptions.replaceEmptyStrings as boolean);
|
||||||
|
const nodeVersion = nodeOptions.typeVersion as number;
|
||||||
|
|
||||||
|
let schema = this.getNodeParameter('schema', 0, undefined, {
|
||||||
|
extractValue: true,
|
||||||
|
}) as string;
|
||||||
|
|
||||||
|
let table = this.getNodeParameter('table', 0, undefined, {
|
||||||
|
extractValue: true,
|
||||||
|
}) as string;
|
||||||
|
|
||||||
|
const updateTableSchema = configureTableSchemaUpdater(schema, table);
|
||||||
|
|
||||||
|
let tableSchema = await getTableSchema(db, schema, table);
|
||||||
|
|
||||||
const queries: QueryWithValues[] = [];
|
const queries: QueryWithValues[] = [];
|
||||||
|
|
||||||
for (let i = 0; i < items.length; i++) {
|
for (let i = 0; i < items.length; i++) {
|
||||||
const schema = this.getNodeParameter('schema', i, undefined, {
|
schema = this.getNodeParameter('schema', i, undefined, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
}) as string;
|
}) as string;
|
||||||
|
|
||||||
const table = this.getNodeParameter('table', i, undefined, {
|
table = this.getNodeParameter('table', i, undefined, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
}) as string;
|
}) as string;
|
||||||
|
|
||||||
const nodeVersion = this.getNode().typeVersion;
|
|
||||||
const columnsToMatchOn: string[] =
|
const columnsToMatchOn: string[] =
|
||||||
nodeVersion < 2.2
|
nodeVersion < 2.2
|
||||||
? [this.getNodeParameter('columnToMatchOn', i) as string]
|
? [this.getNodeParameter('columnToMatchOn', i) as string]
|
||||||
|
@ -255,7 +268,7 @@ export async function execute(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const tableSchema = await getTableSchema(db, schema, table);
|
tableSchema = await updateTableSchema(db, tableSchema, schema, table);
|
||||||
|
|
||||||
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
|
item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,8 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
|
||||||
if (sshClient) {
|
if (sshClient) {
|
||||||
sshClient.end();
|
sshClient.end();
|
||||||
}
|
}
|
||||||
pgp.end();
|
|
||||||
|
await db.$pool.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
return this.prepareOutputData(returnData);
|
return this.prepareOutputData(returnData);
|
||||||
|
|
|
@ -15,7 +15,7 @@ export type ColumnInfo = {
|
||||||
column_name: string;
|
column_name: string;
|
||||||
data_type: string;
|
data_type: string;
|
||||||
is_nullable: string;
|
is_nullable: string;
|
||||||
udt_name: string;
|
udt_name?: string;
|
||||||
column_default?: string;
|
column_default?: string;
|
||||||
};
|
};
|
||||||
export type EnumInfo = {
|
export type EnumInfo = {
|
||||||
|
|
|
@ -458,3 +458,16 @@ export function checkItemAgainstSchema(
|
||||||
|
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const configureTableSchemaUpdater = (initialSchema: string, initialTable: string) => {
|
||||||
|
let currentSchema = initialSchema;
|
||||||
|
let currentTable = initialTable;
|
||||||
|
return async (db: PgpDatabase, tableSchema: ColumnInfo[], schema: string, table: string) => {
|
||||||
|
if (currentSchema !== schema || currentTable !== table) {
|
||||||
|
currentSchema = schema;
|
||||||
|
currentTable = table;
|
||||||
|
tableSchema = await getTableSchema(db, schema, table);
|
||||||
|
}
|
||||||
|
return tableSchema;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
|
@ -6,7 +6,7 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
|
||||||
const credentials = await this.getCredentials('postgres');
|
const credentials = await this.getCredentials('postgres');
|
||||||
const options = { nodeVersion: this.getNode().typeVersion };
|
const options = { nodeVersion: this.getNode().typeVersion };
|
||||||
|
|
||||||
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
|
const { db, sshClient } = await configurePostgres(credentials, options);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
|
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
|
||||||
|
@ -23,14 +23,14 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
|
||||||
if (sshClient) {
|
if (sshClient) {
|
||||||
sshClient.end();
|
sshClient.end();
|
||||||
}
|
}
|
||||||
pgp.end();
|
await db.$pool.end();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
|
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
|
||||||
const credentials = await this.getCredentials('postgres');
|
const credentials = await this.getCredentials('postgres');
|
||||||
const options = { nodeVersion: this.getNode().typeVersion };
|
const options = { nodeVersion: this.getNode().typeVersion };
|
||||||
|
|
||||||
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
|
const { db, sshClient } = await configurePostgres(credentials, options);
|
||||||
|
|
||||||
const schema = this.getNodeParameter('schema', 0, {
|
const schema = this.getNodeParameter('schema', 0, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
|
@ -54,6 +54,6 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
|
||||||
if (sshClient) {
|
if (sshClient) {
|
||||||
sshClient.end();
|
sshClient.end();
|
||||||
}
|
}
|
||||||
pgp.end();
|
await db.$pool.end();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
|
||||||
const credentials = await this.getCredentials('postgres');
|
const credentials = await this.getCredentials('postgres');
|
||||||
const options = { nodeVersion: this.getNode().typeVersion };
|
const options = { nodeVersion: this.getNode().typeVersion };
|
||||||
|
|
||||||
const { db, pgp, sshClient } = await configurePostgres(credentials, options);
|
const { db, sshClient } = await configurePostgres(credentials, options);
|
||||||
|
|
||||||
const schema = this.getNodeParameter('schema', 0, {
|
const schema = this.getNodeParameter('schema', 0, {
|
||||||
extractValue: true,
|
extractValue: true,
|
||||||
|
@ -31,7 +31,7 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
|
||||||
if (sshClient) {
|
if (sshClient) {
|
||||||
sshClient.end();
|
sshClient.end();
|
||||||
}
|
}
|
||||||
pgp.end();
|
await db.$pool.end();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,8 @@ export async function getMappingColumns(
|
||||||
const canBeUsedToMatch =
|
const canBeUsedToMatch =
|
||||||
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
|
operation === 'upsert' ? unique.some((u) => u.attname === col.column_name) : true;
|
||||||
const type = mapPostgresType(col.data_type);
|
const type = mapPostgresType(col.data_type);
|
||||||
const options = type === 'options' ? getEnumValues(enumInfo, col.udt_name) : undefined;
|
const options =
|
||||||
|
type === 'options' ? getEnumValues(enumInfo, col.udt_name as string) : undefined;
|
||||||
const isAutoIncrement = col.column_default?.startsWith('nextval');
|
const isAutoIncrement = col.column_default?.startsWith('nextval');
|
||||||
return {
|
return {
|
||||||
id: col.column_name,
|
id: col.column_name,
|
||||||
|
|
Loading…
Reference in a new issue