mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-03 17:07:29 -08:00
471 lines
12 KiB
TypeScript
471 lines
12 KiB
TypeScript
import type {
|
|
IDataObject,
|
|
IExecuteFunctions,
|
|
INode,
|
|
INodeExecutionData,
|
|
IPairedItemData,
|
|
NodeExecutionWithMetadata,
|
|
} from 'n8n-workflow';
|
|
|
|
import { NodeOperationError } from 'n8n-workflow';
|
|
|
|
import { generatePairedItemData } from '../../../../utils/utilities';
|
|
import type {
|
|
Mysql2Pool,
|
|
QueryMode,
|
|
QueryValues,
|
|
QueryWithValues,
|
|
SortRule,
|
|
WhereClause,
|
|
} from './interfaces';
|
|
|
|
import { BATCH_MODE } from './interfaces';
|
|
|
|
export function escapeSqlIdentifier(identifier: string): string {
|
|
const parts = identifier.match(/(`[^`]*`|[^.`]+)/g) ?? [];
|
|
|
|
return parts
|
|
.map((part) => {
|
|
const trimmedPart = part.trim();
|
|
|
|
if (trimmedPart.startsWith('`') && trimmedPart.endsWith('`')) {
|
|
return trimmedPart;
|
|
}
|
|
|
|
return `\`${trimmedPart}\``;
|
|
})
|
|
.join('.');
|
|
}
|
|
|
|
export const prepareQueryAndReplacements = (rawQuery: string, replacements?: QueryValues) => {
|
|
if (replacements === undefined) {
|
|
return { query: rawQuery, values: [] };
|
|
}
|
|
// in UI for replacements we use syntax identical to Postgres Query Replacement, but we need to convert it to mysql2 replacement syntax
|
|
let query: string = rawQuery;
|
|
const values: QueryValues = [];
|
|
|
|
const regex = /\$(\d+)(?::name)?/g;
|
|
const matches = rawQuery.match(regex) || [];
|
|
|
|
for (const match of matches) {
|
|
if (match.includes(':name')) {
|
|
const matchIndex = Number(match.replace('$', '').replace(':name', '')) - 1;
|
|
query = query.replace(match, escapeSqlIdentifier(replacements[matchIndex].toString()));
|
|
} else {
|
|
const matchIndex = Number(match.replace('$', '')) - 1;
|
|
query = query.replace(match, '?');
|
|
values.push(replacements[matchIndex]);
|
|
}
|
|
}
|
|
|
|
return { query, values };
|
|
};
|
|
|
|
export function prepareErrorItem(
|
|
item: IDataObject,
|
|
error: IDataObject | NodeOperationError | Error,
|
|
index: number,
|
|
) {
|
|
return {
|
|
json: { message: error.message, item: { ...item }, itemIndex: index, error: { ...error } },
|
|
pairedItem: { item: index },
|
|
} as INodeExecutionData;
|
|
}
|
|
|
|
export function parseMySqlError(
|
|
this: IExecuteFunctions,
|
|
error: any,
|
|
itemIndex = 0,
|
|
queries?: string[],
|
|
) {
|
|
let message: string = error.message;
|
|
const description = `sql: ${error.sql}, code: ${error.code}`;
|
|
|
|
if (
|
|
queries?.length &&
|
|
(message || '').toLowerCase().includes('you have an error in your sql syntax')
|
|
) {
|
|
let queryIndex = itemIndex;
|
|
const failedStatement = ((message.split("near '")[1] || '').split("' at")[0] || '').split(
|
|
';',
|
|
)[0];
|
|
|
|
if (failedStatement) {
|
|
if (queryIndex === 0 && queries.length > 1) {
|
|
const failedQueryIndex = queries.findIndex((query) => query.includes(failedStatement));
|
|
if (failedQueryIndex !== -1) {
|
|
queryIndex = failedQueryIndex;
|
|
}
|
|
}
|
|
const lines = queries[queryIndex].split('\n');
|
|
|
|
const failedLine = lines.findIndex((line) => line.includes(failedStatement));
|
|
if (failedLine !== -1) {
|
|
message = `You have an error in your SQL syntax on line ${
|
|
failedLine + 1
|
|
} near '${failedStatement}'`;
|
|
}
|
|
}
|
|
}
|
|
|
|
if ((error?.message as string).includes('ECONNREFUSED')) {
|
|
message = 'Connection refused';
|
|
}
|
|
|
|
return new NodeOperationError(this.getNode(), error as Error, {
|
|
message,
|
|
description,
|
|
itemIndex,
|
|
});
|
|
}
|
|
|
|
export function wrapData(data: IDataObject | IDataObject[]): INodeExecutionData[] {
|
|
if (!Array.isArray(data)) {
|
|
return [{ json: data }];
|
|
}
|
|
return data.map((item) => ({
|
|
json: item,
|
|
}));
|
|
}
|
|
|
|
export function prepareOutput(
|
|
response: IDataObject[],
|
|
options: IDataObject,
|
|
statements: string[],
|
|
constructExecutionHelper: (
|
|
inputData: INodeExecutionData[],
|
|
options: {
|
|
itemData: IPairedItemData | IPairedItemData[];
|
|
},
|
|
) => NodeExecutionWithMetadata[],
|
|
itemData: IPairedItemData | IPairedItemData[],
|
|
) {
|
|
let returnData: INodeExecutionData[] = [];
|
|
|
|
if (options.detailedOutput) {
|
|
response.forEach((entry, index) => {
|
|
const item = {
|
|
sql: statements[index],
|
|
data: entry,
|
|
};
|
|
|
|
const executionData = constructExecutionHelper(wrapData(item), {
|
|
itemData,
|
|
});
|
|
|
|
returnData = returnData.concat(executionData);
|
|
});
|
|
} else {
|
|
response
|
|
.filter((entry) => Array.isArray(entry))
|
|
.forEach((entry, index) => {
|
|
const executionData = constructExecutionHelper(wrapData(entry), {
|
|
itemData: Array.isArray(itemData) ? itemData[index] : itemData,
|
|
});
|
|
|
|
returnData = returnData.concat(executionData);
|
|
});
|
|
}
|
|
|
|
if (!returnData.length) {
|
|
if ((options?.nodeVersion as number) < 2.2) {
|
|
returnData.push({ json: { success: true }, pairedItem: itemData });
|
|
} else {
|
|
const isSelectQuery = statements
|
|
.filter((statement) => !statement.startsWith('--'))
|
|
.every((statement) =>
|
|
statement
|
|
.replace(/\/\*.*?\*\//g, '') // remove multiline comments
|
|
.replace(/\n/g, '')
|
|
.toLowerCase()
|
|
.startsWith('select'),
|
|
);
|
|
|
|
if (!isSelectQuery) {
|
|
returnData.push({ json: { success: true }, pairedItem: itemData });
|
|
}
|
|
}
|
|
}
|
|
|
|
return returnData;
|
|
}
|
|
const END_OF_STATEMENT = /;(?=(?:[^'\\]|'[^']*?'|\\[\s\S])*?$)/g;
|
|
export const splitQueryToStatements = (query: string, filterOutEmpty = true) => {
|
|
const statements = query
|
|
.replace(/\n/g, '')
|
|
.split(END_OF_STATEMENT)
|
|
.map((statement) => statement.trim());
|
|
return filterOutEmpty ? statements.filter((statement) => statement !== '') : statements;
|
|
};
|
|
|
|
export function configureQueryRunner(
|
|
this: IExecuteFunctions,
|
|
options: IDataObject,
|
|
pool: Mysql2Pool,
|
|
) {
|
|
return async (queries: QueryWithValues[]) => {
|
|
if (queries.length === 0) {
|
|
return [];
|
|
}
|
|
|
|
let returnData: INodeExecutionData[] = [];
|
|
const mode = (options.queryBatching as QueryMode) || BATCH_MODE.SINGLE;
|
|
|
|
const connection = await pool.getConnection();
|
|
|
|
if (mode === BATCH_MODE.SINGLE) {
|
|
const formattedQueries = queries.map(({ query, values }) => connection.format(query, values));
|
|
try {
|
|
//releasing connection after formatting queries, otherwise pool.query() will fail with timeout
|
|
connection.release();
|
|
|
|
let singleQuery = '';
|
|
if (formattedQueries.length > 1) {
|
|
singleQuery = formattedQueries.map((query) => query.trim().replace(/;$/, '')).join(';');
|
|
} else {
|
|
singleQuery = formattedQueries[0];
|
|
}
|
|
|
|
let response: IDataObject | IDataObject[] = (
|
|
await pool.query(singleQuery)
|
|
)[0] as unknown as IDataObject;
|
|
|
|
if (!response) return [];
|
|
|
|
let statements;
|
|
if ((options?.nodeVersion as number) <= 2.3) {
|
|
statements = singleQuery
|
|
.replace(/\n/g, '')
|
|
.split(';')
|
|
.filter((statement) => statement !== '');
|
|
} else {
|
|
statements = splitQueryToStatements(singleQuery);
|
|
}
|
|
|
|
if (Array.isArray(response)) {
|
|
if (statements.length === 1) response = [response];
|
|
} else {
|
|
response = [response];
|
|
}
|
|
|
|
//because single query is used in this mode mapping itemIndex not possible, setting all items as paired
|
|
const pairedItem = generatePairedItemData(queries.length);
|
|
|
|
returnData = returnData.concat(
|
|
prepareOutput(
|
|
response,
|
|
options,
|
|
statements,
|
|
this.helpers.constructExecutionMetaData,
|
|
pairedItem,
|
|
),
|
|
);
|
|
} catch (err) {
|
|
const error = parseMySqlError.call(this, err, 0, formattedQueries);
|
|
|
|
if (!this.continueOnFail()) throw error;
|
|
returnData.push({ json: { message: error.message, error: { ...error } } });
|
|
}
|
|
} else {
|
|
if (mode === BATCH_MODE.INDEPENDENTLY) {
|
|
let formattedQuery = '';
|
|
for (const [index, queryWithValues] of queries.entries()) {
|
|
try {
|
|
const { query, values } = queryWithValues;
|
|
formattedQuery = connection.format(query, values);
|
|
|
|
let statements;
|
|
if ((options?.nodeVersion as number) <= 2.3) {
|
|
statements = formattedQuery.split(';').map((q) => q.trim());
|
|
} else {
|
|
statements = splitQueryToStatements(formattedQuery, false);
|
|
}
|
|
|
|
const responses: IDataObject[] = [];
|
|
for (const statement of statements) {
|
|
if (statement === '') continue;
|
|
const response = (await connection.query(statement))[0] as unknown as IDataObject;
|
|
|
|
responses.push(response);
|
|
}
|
|
|
|
returnData = returnData.concat(
|
|
prepareOutput(
|
|
responses,
|
|
options,
|
|
statements,
|
|
this.helpers.constructExecutionMetaData,
|
|
{ item: index },
|
|
),
|
|
);
|
|
} catch (err) {
|
|
const error = parseMySqlError.call(this, err, index, [formattedQuery]);
|
|
|
|
if (!this.continueOnFail()) {
|
|
connection.release();
|
|
throw error;
|
|
}
|
|
returnData.push(prepareErrorItem(queries[index], error as Error, index));
|
|
}
|
|
}
|
|
}
|
|
|
|
if (mode === BATCH_MODE.TRANSACTION) {
|
|
await connection.beginTransaction();
|
|
|
|
let formattedQuery = '';
|
|
for (const [index, queryWithValues] of queries.entries()) {
|
|
try {
|
|
const { query, values } = queryWithValues;
|
|
formattedQuery = connection.format(query, values);
|
|
|
|
let statements;
|
|
if ((options?.nodeVersion as number) <= 2.3) {
|
|
statements = formattedQuery.split(';').map((q) => q.trim());
|
|
} else {
|
|
statements = splitQueryToStatements(formattedQuery, false);
|
|
}
|
|
|
|
const responses: IDataObject[] = [];
|
|
for (const statement of statements) {
|
|
if (statement === '') continue;
|
|
const response = (await connection.query(statement))[0] as unknown as IDataObject;
|
|
|
|
responses.push(response);
|
|
}
|
|
|
|
returnData = returnData.concat(
|
|
prepareOutput(
|
|
responses,
|
|
options,
|
|
statements,
|
|
this.helpers.constructExecutionMetaData,
|
|
{ item: index },
|
|
),
|
|
);
|
|
} catch (err) {
|
|
const error = parseMySqlError.call(this, err, index, [formattedQuery]);
|
|
|
|
if (connection) {
|
|
await connection.rollback();
|
|
connection.release();
|
|
}
|
|
|
|
if (!this.continueOnFail()) throw error;
|
|
returnData.push(prepareErrorItem(queries[index], error as Error, index));
|
|
|
|
// Return here because we already rolled back the transaction
|
|
return returnData;
|
|
}
|
|
}
|
|
|
|
await connection.commit();
|
|
}
|
|
|
|
connection.release();
|
|
}
|
|
|
|
return returnData;
|
|
};
|
|
}
|
|
|
|
export function addWhereClauses(
|
|
node: INode,
|
|
itemIndex: number,
|
|
query: string,
|
|
clauses: WhereClause[],
|
|
replacements: QueryValues,
|
|
combineConditions?: string,
|
|
): [string, QueryValues] {
|
|
if (clauses.length === 0) return [query, replacements];
|
|
|
|
let combineWith = 'AND';
|
|
|
|
if (combineConditions === 'OR') {
|
|
combineWith = 'OR';
|
|
}
|
|
|
|
let whereQuery = ' WHERE';
|
|
const values: QueryValues = [];
|
|
|
|
clauses.forEach((clause, index) => {
|
|
if (clause.condition === 'equal') {
|
|
clause.condition = '=';
|
|
}
|
|
|
|
if (['>', '<', '>=', '<='].includes(clause.condition)) {
|
|
const value = Number(clause.value);
|
|
|
|
if (Number.isNaN(value)) {
|
|
throw new NodeOperationError(
|
|
node,
|
|
`Operator in entry ${index + 1} of 'Select Rows' works with numbers, but value ${
|
|
clause.value
|
|
} is not a number`,
|
|
{
|
|
itemIndex,
|
|
},
|
|
);
|
|
}
|
|
|
|
clause.value = value;
|
|
}
|
|
|
|
let valueReplacement = ' ';
|
|
if (clause.condition !== 'IS NULL' && clause.condition !== 'IS NOT NULL') {
|
|
valueReplacement = ' ?';
|
|
values.push(clause.value);
|
|
}
|
|
|
|
const operator = index === clauses.length - 1 ? '' : ` ${combineWith}`;
|
|
|
|
whereQuery += ` ${escapeSqlIdentifier(clause.column)} ${
|
|
clause.condition
|
|
}${valueReplacement}${operator}`;
|
|
});
|
|
|
|
return [`${query}${whereQuery}`, replacements.concat(...values)];
|
|
}
|
|
|
|
export function addSortRules(
|
|
query: string,
|
|
rules: SortRule[],
|
|
replacements: QueryValues,
|
|
): [string, QueryValues] {
|
|
if (rules.length === 0) return [query, replacements];
|
|
|
|
let orderByQuery = ' ORDER BY';
|
|
const values: string[] = [];
|
|
|
|
rules.forEach((rule, index) => {
|
|
const endWith = index === rules.length - 1 ? '' : ',';
|
|
|
|
orderByQuery += ` ${escapeSqlIdentifier(rule.column)} ${rule.direction}${endWith}`;
|
|
});
|
|
|
|
return [`${query}${orderByQuery}`, replacements.concat(...values)];
|
|
}
|
|
|
|
export function replaceEmptyStringsByNulls(
|
|
items: INodeExecutionData[],
|
|
replace?: boolean,
|
|
): INodeExecutionData[] {
|
|
if (!replace) return [...items];
|
|
|
|
const returnData: INodeExecutionData[] = items.map((item) => {
|
|
const newItem = { ...item };
|
|
const keys = Object.keys(newItem.json);
|
|
|
|
for (const key of keys) {
|
|
if (newItem.json[key] === '') {
|
|
newItem.json[key] = null;
|
|
}
|
|
}
|
|
|
|
return newItem;
|
|
});
|
|
|
|
return returnData;
|
|
}
|