mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-26 05:04:05 -08:00
7ce7285f7a
* Changes to types so that credentials can be always loaded from DB This first commit changes all return types from the execute functions and calls to get credentials to be async so we can use await. This is a first step as previously credentials were loaded in memory and always available. We will now be loading them from the DB which requires turning the whole call chain async. * Fix updated files * Removed unnecessary credential loading to improve performance * Fix typo * ⚡ Fix issue * Updated new nodes to load credentials async * ⚡ Remove not needed comment Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
410 lines
9.6 KiB
TypeScript
410 lines
9.6 KiB
TypeScript
import {
|
|
IExecuteFunctions,
|
|
} from 'n8n-core';
|
|
|
|
import {
|
|
IDataObject,
|
|
INodeExecutionData,
|
|
INodeType,
|
|
INodeTypeDescription,
|
|
NodeOperationError,
|
|
} from 'n8n-workflow';
|
|
|
|
import {
|
|
chunk,
|
|
flatten,
|
|
} from '../../utils/utilities';
|
|
|
|
import * as mssql from 'mssql';
|
|
|
|
import {
|
|
ITables,
|
|
} from './TableInterface';
|
|
|
|
import {
|
|
copyInputItem,
|
|
createTableStruct,
|
|
executeQueryQueue,
|
|
extractDeleteValues,
|
|
extractUpdateCondition,
|
|
extractUpdateSet,
|
|
extractValues,
|
|
formatColumns,
|
|
} from './GenericFunctions';
|
|
|
|
export class MicrosoftSql implements INodeType {
|
|
description: INodeTypeDescription = {
|
|
displayName: 'Microsoft SQL',
|
|
name: 'microsoftSql',
|
|
icon: 'file:mssql.svg',
|
|
group: ['input'],
|
|
version: 1,
|
|
description: 'Get, add and update data in Microsoft SQL',
|
|
defaults: {
|
|
name: 'Microsoft SQL',
|
|
color: '#bcbcbd',
|
|
},
|
|
inputs: ['main'],
|
|
outputs: ['main'],
|
|
credentials: [
|
|
{
|
|
name: 'microsoftSql',
|
|
required: true,
|
|
},
|
|
],
|
|
properties: [
|
|
{
|
|
displayName: 'Operation',
|
|
name: 'operation',
|
|
type: 'options',
|
|
options: [
|
|
{
|
|
name: 'Execute Query',
|
|
value: 'executeQuery',
|
|
description: 'Execute an SQL query',
|
|
},
|
|
{
|
|
name: 'Insert',
|
|
value: 'insert',
|
|
description: 'Insert rows in database',
|
|
},
|
|
{
|
|
name: 'Update',
|
|
value: 'update',
|
|
description: 'Update rows in database',
|
|
},
|
|
{
|
|
name: 'Delete',
|
|
value: 'delete',
|
|
description: 'Delete rows in database',
|
|
},
|
|
],
|
|
default: 'insert',
|
|
description: 'The operation to perform.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// executeQuery
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Query',
|
|
name: 'query',
|
|
type: 'string',
|
|
typeOptions: {
|
|
rows: 5,
|
|
},
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['executeQuery'],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'SELECT id, name FROM product WHERE id < 40',
|
|
required: true,
|
|
description: 'The SQL query to execute.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// insert
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Table',
|
|
name: 'table',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['insert'],
|
|
},
|
|
},
|
|
default: '',
|
|
required: true,
|
|
description: 'Name of the table in which to insert data to.',
|
|
},
|
|
{
|
|
displayName: 'Columns',
|
|
name: 'columns',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['insert'],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'id,name,description',
|
|
description:
|
|
'Comma separated list of the properties which should used as columns for the new rows.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// update
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Table',
|
|
name: 'table',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: '',
|
|
required: true,
|
|
description: 'Name of the table in which to update data in',
|
|
},
|
|
{
|
|
displayName: 'Update Key',
|
|
name: 'updateKey',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: 'id',
|
|
required: true,
|
|
description:
|
|
'Name of the property which decides which rows in the database should be updated. Normally that would be "id".',
|
|
},
|
|
{
|
|
displayName: 'Columns',
|
|
name: 'columns',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['update'],
|
|
},
|
|
},
|
|
default: '',
|
|
placeholder: 'name,description',
|
|
description:
|
|
'Comma separated list of the properties which should used as columns for rows to update.',
|
|
},
|
|
|
|
// ----------------------------------
|
|
// delete
|
|
// ----------------------------------
|
|
{
|
|
displayName: 'Table',
|
|
name: 'table',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['delete'],
|
|
},
|
|
},
|
|
default: '',
|
|
required: true,
|
|
description: 'Name of the table in which to delete data.',
|
|
},
|
|
{
|
|
displayName: 'Delete Key',
|
|
name: 'deleteKey',
|
|
type: 'string',
|
|
displayOptions: {
|
|
show: {
|
|
operation: ['delete'],
|
|
},
|
|
},
|
|
default: 'id',
|
|
required: true,
|
|
description:
|
|
'Name of the property which decides which rows in the database should be deleted. Normally that would be "id".',
|
|
},
|
|
],
|
|
};
|
|
|
|
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
|
|
const credentials = await this.getCredentials('microsoftSql');
|
|
|
|
if (credentials === undefined) {
|
|
throw new NodeOperationError(this.getNode(), 'No credentials got returned!');
|
|
}
|
|
|
|
const config = {
|
|
server: credentials.server as string,
|
|
port: credentials.port as number,
|
|
database: credentials.database as string,
|
|
user: credentials.user as string,
|
|
password: credentials.password as string,
|
|
domain: credentials.domain ? (credentials.domain as string) : undefined,
|
|
connectionTimeout: credentials.connectTimeout as number,
|
|
requestTimeout: credentials.requestTimeout as number,
|
|
options: {
|
|
encrypt: credentials.tls as boolean,
|
|
enableArithAbort: false,
|
|
},
|
|
};
|
|
|
|
const pool = new mssql.ConnectionPool(config);
|
|
await pool.connect();
|
|
|
|
let returnItems = [];
|
|
|
|
const items = this.getInputData();
|
|
const operation = this.getNodeParameter('operation', 0) as string;
|
|
|
|
try {
|
|
if (operation === 'executeQuery') {
|
|
// ----------------------------------
|
|
// executeQuery
|
|
// ----------------------------------
|
|
|
|
const rawQuery = this.getNodeParameter('query', 0) as string;
|
|
|
|
const queryResult = await pool.request().query(rawQuery);
|
|
|
|
const result =
|
|
queryResult.recordsets.length > 1
|
|
? flatten(queryResult.recordsets)
|
|
: queryResult.recordsets[0];
|
|
|
|
returnItems = this.helpers.returnJsonArray(result as IDataObject[]);
|
|
} else if (operation === 'insert') {
|
|
// ----------------------------------
|
|
// insert
|
|
// ----------------------------------
|
|
|
|
const tables = createTableStruct(this.getNodeParameter, items);
|
|
await executeQueryQueue(
|
|
tables,
|
|
({
|
|
table,
|
|
columnString,
|
|
items,
|
|
}: {
|
|
table: string;
|
|
columnString: string;
|
|
items: IDataObject[];
|
|
}): Array<Promise<object>> => {
|
|
return chunk(items, 1000).map(insertValues => {
|
|
const values = insertValues
|
|
.map((item: IDataObject) => extractValues(item))
|
|
.join(',');
|
|
return pool
|
|
.request()
|
|
.query(
|
|
`INSERT INTO ${table}(${formatColumns(columnString)}) VALUES ${values};`,
|
|
);
|
|
});
|
|
},
|
|
);
|
|
|
|
returnItems = items;
|
|
} else if (operation === 'update') {
|
|
// ----------------------------------
|
|
// update
|
|
// ----------------------------------
|
|
|
|
const updateKeys = items.map(
|
|
(item, index) => this.getNodeParameter('updateKey', index) as string,
|
|
);
|
|
const tables = createTableStruct(
|
|
this.getNodeParameter,
|
|
items,
|
|
['updateKey'].concat(updateKeys),
|
|
'updateKey',
|
|
);
|
|
await executeQueryQueue(
|
|
tables,
|
|
({
|
|
table,
|
|
columnString,
|
|
items,
|
|
}: {
|
|
table: string;
|
|
columnString: string;
|
|
items: IDataObject[];
|
|
}): Array<Promise<object>> => {
|
|
return items.map(item => {
|
|
const columns = columnString
|
|
.split(',')
|
|
.map(column => column.trim());
|
|
|
|
const setValues = extractUpdateSet(item, columns);
|
|
const condition = extractUpdateCondition(
|
|
item,
|
|
item.updateKey as string,
|
|
);
|
|
|
|
return pool
|
|
.request()
|
|
.query(`UPDATE ${table} SET ${setValues} WHERE ${condition};`);
|
|
});
|
|
},
|
|
);
|
|
|
|
returnItems = items;
|
|
} else if (operation === 'delete') {
|
|
// ----------------------------------
|
|
// delete
|
|
// ----------------------------------
|
|
|
|
const tables = items.reduce((tables, item, index) => {
|
|
const table = this.getNodeParameter('table', index) as string;
|
|
const deleteKey = this.getNodeParameter('deleteKey', index) as string;
|
|
if (tables[table] === undefined) {
|
|
tables[table] = {};
|
|
}
|
|
if (tables[table][deleteKey] === undefined) {
|
|
tables[table][deleteKey] = [];
|
|
}
|
|
tables[table][deleteKey].push(item);
|
|
return tables;
|
|
}, {} as ITables);
|
|
|
|
const queriesResults = await Promise.all(
|
|
Object.keys(tables).map(table => {
|
|
const deleteKeyResults = Object.keys(tables[table]).map(
|
|
deleteKey => {
|
|
const deleteItemsList = chunk(
|
|
tables[table][deleteKey].map(item =>
|
|
copyInputItem(item as INodeExecutionData, [deleteKey]),
|
|
),
|
|
1000,
|
|
);
|
|
const queryQueue = deleteItemsList.map(deleteValues => {
|
|
return pool
|
|
.request()
|
|
.query(
|
|
`DELETE FROM ${table} WHERE "${deleteKey}" IN ${extractDeleteValues(
|
|
deleteValues,
|
|
deleteKey,
|
|
)};`,
|
|
);
|
|
});
|
|
return Promise.all(queryQueue);
|
|
},
|
|
);
|
|
return Promise.all(deleteKeyResults);
|
|
}),
|
|
);
|
|
|
|
const rowsDeleted = flatten(queriesResults).reduce(
|
|
(acc: number, resp: mssql.IResult<object>): number =>
|
|
(acc += resp.rowsAffected.reduce((sum, val) => (sum += val))),
|
|
0,
|
|
);
|
|
|
|
returnItems = this.helpers.returnJsonArray({
|
|
rowsDeleted,
|
|
} as IDataObject);
|
|
} else {
|
|
await pool.close();
|
|
throw new NodeOperationError(this.getNode(), `The operation "${operation}" is not supported!`);
|
|
}
|
|
} catch (error) {
|
|
if (this.continueOnFail() === true) {
|
|
returnItems = items;
|
|
} else {
|
|
await pool.close();
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
// Close the connection
|
|
await pool.close();
|
|
|
|
return this.prepareOutputData(returnItems);
|
|
}
|
|
}
|