mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
fc54f7c82b
* Adding support to ParameterizedQuery on Postgres Node * Created another parameter to toggle on replacement so it's clear to users what is happening. * Fixed lint issues * ⚡ Formatting * Improvements to questDB node so it is more consistent * Fixed lint issues * Fixed typing issue * ⚡ Apply suggestions BHesseldieck Co-authored-by: Ben Hesseldieck <1849459+BHesseldieck@users.noreply.github.com> * Standardized output for postgres and postgres-like nodes This changes the behavior of CrateDB, Postgres, QuestDB and TimescaleDB nodes. The Execute Query operation used to execute multiple queries but return the result from only one of the queries. This change causes the node output to containt results from all queries that ran, making the behavior more consistent across all n8n. * Fixing lint issues * ⚡ Minor improvements * ⚡ Fix breaking changes files Co-authored-by: Gustavo Arjones <gustavo.arjones@ank.app> Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com> Co-authored-by: Jan <janober@users.noreply.github.com> Co-authored-by: Ben Hesseldieck <1849459+BHesseldieck@users.noreply.github.com>
362 lines
9.5 KiB
TypeScript
362 lines
9.5 KiB
TypeScript
import { IExecuteFunctions } from 'n8n-core';
|
|
import {
|
|
IDataObject,
|
|
INodeExecutionData,
|
|
INodeType,
|
|
INodeTypeDescription,
|
|
NodeOperationError,
|
|
} from 'n8n-workflow';
|
|
|
|
import {
|
|
generateReturning,
|
|
getItemCopy,
|
|
getItemsCopy,
|
|
pgInsert,
|
|
pgQuery,
|
|
pgUpdate,
|
|
} from '../Postgres/Postgres.node.functions';
|
|
|
|
import * as pgPromise from 'pg-promise';
|
|
|
|
export class CrateDb implements INodeType {
|
|
description: INodeTypeDescription = {
|
|
displayName: 'CrateDB',
|
|
name: 'crateDb',
|
|
icon: 'file:cratedb.png',
|
|
group: ['input'],
|
|
version: 1,
|
|
description: 'Add and update data in CrateDB.',
|
|
defaults: {
|
|
name: 'CrateDB',
|
|
color: '#47889f',
|
|
},
|
|
inputs: ['main'],
|
|
outputs: ['main'],
|
|
credentials: [
|
|
{
|
|
name: 'crateDb',
|
|
required: true,
|
|
},
|
|
],
|
|
properties: [
|
|
{
|
|
displayName: 'Operation',
|
|
name: 'operation',
|
|
type: 'options',
|
|
options: [
|
|
{
|
|
name: 'Execute Query',
|
|
value: 'executeQuery',
|
|
description: 'Execute an SQL query',
|
|
},
|
|
{
|
|
name: 'Insert',
|
|
value: 'insert',
|
|
description: 'Insert rows in database',
|
|
},
|
|
{
|
|
name: 'Update',
|
|
value: 'update',
|
|
description: 'Update rows in database',
|
|
},
|
|
],
|
|
default: 'insert',
|
|
description: 'The operation to perform.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// executeQuery
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Query',
|
|
name: 'query',
|
|
type: 'string',
|
|
typeOptions: {
|
|
rows: 5,
|
|
},
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['executeQuery'],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'SELECT id, name FROM product WHERE quantity > $1 AND price <= $2',
|
|
required: true,
|
|
description: 'The SQL query to execute. You can use n8n expressions or $1 and $2 in conjunction with query parameters.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// insert
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Schema',
|
|
name: 'schema',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['insert'],
|
|
},
|
|
},
|
|
default: 'doc',
|
|
required: true,
|
|
description: 'Name of the schema the table belongs to',
|
|
},
|
|
{
|
|
displayName: 'Table',
|
|
name: 'table',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['insert'],
|
|
},
|
|
},
|
|
default: '',
|
|
required: true,
|
|
description: 'Name of the table in which to insert data to.',
|
|
},
|
|
{
|
|
displayName: 'Columns',
|
|
name: 'columns',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['insert'],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'id,name,description',
|
|
description:
|
|
'Comma separated list of the properties which should used as columns for the new rows.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// update
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Schema',
|
|
name: 'schema',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: 'doc',
|
|
required: true,
|
|
description: 'Name of the schema the table belongs to',
|
|
},
|
|
{
|
|
displayName: 'Table',
|
|
name: 'table',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: '',
|
|
required: true,
|
|
description: 'Name of the table in which to update data in',
|
|
},
|
|
{
|
|
displayName: 'Update Key',
|
|
name: 'updateKey',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: 'id',
|
|
required: true,
|
|
description:
|
|
'Comma separated list of the properties which decides which rows in the database should be updated. Normally that would be "id".',
|
|
},
|
|
{
|
|
displayName: 'Columns',
|
|
name: 'columns',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'name,description',
|
|
description:
|
|
'Comma separated list of the properties which should used as columns for rows to update.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// insert,update
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Return Fields',
|
|
name: 'returnFields',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['insert', 'update'],
|
|
},
|
|
},
|
|
default: '*',
|
|
description: 'Comma separated list of the fields that the operation will return',
|
|
},
|
|
// ----------------------------------
|
|
// additional fields
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Additional Fields',
|
|
name: 'additionalFields',
|
|
type: 'collection',
|
|
placeholder: 'Add Field',
|
|
default: {},
|
|
options: [
|
|
{
|
|
displayName: 'Mode',
|
|
name: 'mode',
|
|
type: 'options',
|
|
options: [
|
|
{
|
|
name: 'Independently',
|
|
value: 'independently',
|
|
description: 'Execute each query independently',
|
|
},
|
|
{
|
|
name: 'Multiple queries',
|
|
value: 'multiple',
|
|
description: '<b>Default</b>. Sends multiple queries at once to database.',
|
|
},
|
|
],
|
|
default: 'multiple',
|
|
description: [
|
|
'The way queries should be sent to database.',
|
|
'Can be used in conjunction with <b>Continue on Fail</b>.',
|
|
'See the docs for more examples',
|
|
].join('<br>'),
|
|
},
|
|
{
|
|
displayName: 'Query Parameters',
|
|
name: 'queryParams',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
'/operation': [
|
|
'executeQuery',
|
|
],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'quantity,price',
|
|
description: 'Comma separated list of properties which should be used as query parameters.',
|
|
},
|
|
],
|
|
},
|
|
],
|
|
};
|
|
|
|
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
|
|
const credentials = this.getCredentials('crateDb');
|
|
|
|
if (credentials === undefined) {
|
|
throw new NodeOperationError(this.getNode(), 'No credentials got returned!');
|
|
}
|
|
|
|
const pgp = pgPromise();
|
|
|
|
const config = {
|
|
host: credentials.host as string,
|
|
port: credentials.port as number,
|
|
database: credentials.database as string,
|
|
user: credentials.user as string,
|
|
password: credentials.password as string,
|
|
ssl: !['disable', undefined].includes(credentials.ssl as string | undefined),
|
|
sslmode: (credentials.ssl as string) || 'disable',
|
|
};
|
|
|
|
const db = pgp(config);
|
|
|
|
let returnItems: INodeExecutionData[] = [];
|
|
|
|
const items = this.getInputData();
|
|
const operation = this.getNodeParameter('operation', 0) as string;
|
|
|
|
if (operation === 'executeQuery') {
|
|
// ----------------------------------
|
|
// executeQuery
|
|
// ----------------------------------
|
|
|
|
const queryResult = await pgQuery(this.getNodeParameter, pgp, db, items, this.continueOnFail());
|
|
|
|
returnItems = this.helpers.returnJsonArray(queryResult);
|
|
} else if (operation === 'insert') {
|
|
// ----------------------------------
|
|
// insert
|
|
// ----------------------------------
|
|
|
|
const insertData = await pgInsert(this.getNodeParameter, pgp, db, items, this.continueOnFail());
|
|
|
|
for (let i = 0; i < insertData.length; i++) {
|
|
returnItems.push({
|
|
json: insertData[i],
|
|
});
|
|
}
|
|
} else if (operation === 'update') {
|
|
// ----------------------------------
|
|
// update
|
|
// ----------------------------------
|
|
|
|
const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject;
|
|
const mode = additionalFields.mode ?? 'multiple' as string;
|
|
|
|
if(mode === 'independently') {
|
|
const updateItems = await pgUpdate(this.getNodeParameter, pgp, db, items, this.continueOnFail());
|
|
|
|
returnItems = this.helpers.returnJsonArray(updateItems);
|
|
} else if(mode === 'multiple') {
|
|
// Crate db does not support multiple-update queries
|
|
// Therefore we cannot invoke `pgUpdate` using multiple mode
|
|
// so we have to call multiple updates manually here
|
|
|
|
const table = this.getNodeParameter('table', 0) as string;
|
|
const schema = this.getNodeParameter('schema', 0) as string;
|
|
const updateKeys = (this.getNodeParameter('updateKey', 0) as string).split(',').map(column => column.trim());
|
|
const columns = (this.getNodeParameter('columns', 0) as string).split(',').map(column => column.trim());
|
|
const queryColumns = columns.slice();
|
|
|
|
updateKeys.forEach(updateKey => {
|
|
if (!queryColumns.includes(updateKey)) {
|
|
columns.unshift(updateKey);
|
|
queryColumns.unshift('?' + updateKey);
|
|
}
|
|
});
|
|
|
|
const cs = new pgp.helpers.ColumnSet(queryColumns, { table: { table, schema } });
|
|
|
|
const where = ' WHERE ' + updateKeys.map(updateKey => pgp.as.name(updateKey) + ' = ${' + updateKey + '}').join(' AND ');
|
|
// updateKeyValue = item.json[updateKey] as string | number;
|
|
// if (updateKeyValue === undefined) {
|
|
// throw new NodeOperationError(this.getNode(), 'No value found for update key!');
|
|
// }
|
|
|
|
const returning = generateReturning(pgp, this.getNodeParameter('returnFields', 0) as string);
|
|
const queries:string[] = [];
|
|
for (let i = 0; i < items.length; i++) {
|
|
const itemCopy = getItemCopy(items[i], columns);
|
|
queries.push(pgp.helpers.update(itemCopy, cs) + pgp.as.format(where, itemCopy) + returning);
|
|
}
|
|
const updateItems = await db.multi(pgp.helpers.concat(queries));
|
|
returnItems = this.helpers.returnJsonArray(getItemsCopy(items, columns) as IDataObject[]);
|
|
}
|
|
} else {
|
|
await pgp.end();
|
|
throw new NodeOperationError(this.getNode(), `The operation "${operation}" is not supported!`);
|
|
}
|
|
|
|
// Close the connection
|
|
await pgp.end();
|
|
|
|
return this.prepareOutputData(returnItems);
|
|
}
|
|
}
|