Add Snowflake Node (#1230)

This commit is contained in:
Ricardo Espinoza 2020-12-10 04:17:16 -05:00 committed by GitHub
parent e8f53effb4
commit c87382c086
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 380 additions and 0 deletions

View file

@ -0,0 +1,70 @@
import {
ICredentialType,
NodePropertyTypes,
} from 'n8n-workflow';
export class Snowflake implements ICredentialType {
name = 'snowflake';
displayName = 'Snowflake';
documentationUrl = 'snowflake';
properties = [
{
displayName: 'Account',
name: 'account',
type: 'string' as NodePropertyTypes,
default: '',
description: 'Enter the name of your Snowflake account.',
},
{
displayName: 'Database',
name: 'database',
type: 'string' as NodePropertyTypes,
default: '',
description: 'Specify the database you want to use after creating the connection.',
},
{
displayName: 'Warehouse',
name: 'warehouse',
type: 'string' as NodePropertyTypes,
default: '',
description: 'The default virtual warehouse to use for the session after connecting. Used for performing queries, loading data, etc.',
},
{
displayName: 'Username',
name: 'username',
type: 'string' as NodePropertyTypes,
default: '',
},
{
displayName: 'Password',
name: 'password',
type: 'string' as NodePropertyTypes,
typeOptions: {
password: true,
},
default: '',
},
{
displayName: 'Schema',
name: 'schema',
type: 'string' as NodePropertyTypes,
default: '',
description: 'Enter the schema you want to use after creating the connection',
},
{
displayName: 'Role',
name: 'role',
type: 'string' as NodePropertyTypes,
default: '',
description: 'Enter the security role you want to use after creating the connection',
},
{
displayName: 'Client Session Keep Alive',
name: 'clientSessionKeepAlive',
type: 'boolean' as NodePropertyTypes,
default: false,
description: `By default, client connections typically time out approximately 3-4 hours after the most recent query was executed.<br>
If the parameter clientSessionKeepAlive is set to true, the clients connection to the server will be kept alive indefinitely, even if no queries are executed.`,
},
];
}

View file

@ -0,0 +1,62 @@
import {
IDataObject,
INodeExecutionData,
} from 'n8n-workflow';
import * as snowflake from 'snowflake-sdk';
export function connect(conn: snowflake.Connection) {
return new Promise((resolve, reject) => {
conn.connect((err, conn) => {
if (!err) {
resolve();
} else {
reject(err);
}
});
});
}
export function destroy(conn: snowflake.Connection) {
return new Promise((resolve, reject) => {
conn.destroy((err, conn) => {
if (!err) {
resolve();
} else {
reject(err);
}
});
});
}
export function execute(conn: snowflake.Connection, sqlText: string, binds: snowflake.InsertBinds) {
return new Promise((resolve, reject) => {
conn.execute({
sqlText,
binds,
complete: (err, stmt, rows) => {
if (!err) {
resolve(rows);
} else {
reject(err);
}
},
});
});
}
export function copyInputItems(items: INodeExecutionData[], properties: string[]): IDataObject[] {
// Prepare the data to insert and copy it to be returned
let newItem: IDataObject;
return items.map((item) => {
newItem = {};
for (const property of properties) {
if (item.json[property] === undefined) {
newItem[property] = null;
} else {
newItem[property] = JSON.parse(JSON.stringify(item.json[property]));
}
}
return newItem;
});
}

View file

@ -0,0 +1,244 @@
import {
IExecuteFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeExecutionData,
INodeType,
INodeTypeDescription,
} from 'n8n-workflow';
import {
connect,
copyInputItems,
destroy,
execute,
} from './GenericFunctions';
import * as snowflake from 'snowflake-sdk';
export class Snowflake implements INodeType {
description: INodeTypeDescription = {
displayName: 'Snowflake',
name: 'snowflake',
icon: 'file:snowflake.png',
group: ['input'],
version: 1,
description: 'Get, add and update data in Snowflake.',
defaults: {
name: 'Snowflake',
color: '#5ebbeb',
},
inputs: ['main'],
outputs: ['main'],
credentials: [
{
name: 'snowflake',
required: true,
},
],
properties: [
{
displayName: 'Operation',
name: 'operation',
type: 'options',
options: [
{
name: 'Execute Query',
value: 'executeQuery',
description: 'Execute an SQL query.',
},
{
name: 'Insert',
value: 'insert',
description: 'Insert rows in database.',
},
{
name: 'Update',
value: 'update',
description: 'Update rows in database.',
},
],
default: 'insert',
description: 'The operation to perform.',
},
// ----------------------------------
// executeQuery
// ----------------------------------
{
displayName: 'Query',
name: 'query',
type: 'string',
typeOptions: {
rows: 5,
},
displayOptions: {
show: {
operation: [
'executeQuery',
],
},
},
default: '',
placeholder: 'SELECT id, name FROM product WHERE id < 40',
required: true,
description: 'The SQL query to execute.',
},
// ----------------------------------
// insert
// ----------------------------------
{
displayName: 'Table',
name: 'table',
type: 'string',
displayOptions: {
show: {
operation: [
'insert',
],
},
},
default: '',
required: true,
description: 'Name of the table in which to insert data to.',
},
{
displayName: 'Columns',
name: 'columns',
type: 'string',
displayOptions: {
show: {
operation: [
'insert',
],
},
},
default: '',
placeholder: 'id,name,description',
description: 'Comma separated list of the properties which should used as columns for the new rows.',
},
// ----------------------------------
// update
// ----------------------------------
{
displayName: 'Table',
name: 'table',
type: 'string',
displayOptions: {
show: {
operation: [
'update',
],
},
},
default: '',
required: true,
description: 'Name of the table in which to update data in',
},
{
displayName: 'Update Key',
name: 'updateKey',
type: 'string',
displayOptions: {
show: {
operation: [
'update',
],
},
},
default: 'id',
required: true,
description: 'Name of the property which decides which rows in the database should be updated. Normally that would be "id".',
},
{
displayName: 'Columns',
name: 'columns',
type: 'string',
displayOptions: {
show: {
operation: [
'update',
],
},
},
default: '',
placeholder: 'name,description',
description: 'Comma separated list of the properties which should used as columns for rows to update.',
},
],
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const credentials = this.getCredentials('snowflake') as unknown as snowflake.ConnectionOptions;
const returnData: IDataObject[] = [];
let responseData;
const connection = snowflake.createConnection(credentials);
await connect(connection);
const items = this.getInputData();
const operation = this.getNodeParameter('operation', 0) as string;
if (operation === 'executeQuery') {
// ----------------------------------
// executeQuery
// ----------------------------------
for (let i = 0; i < items.length; i++) {
const query = this.getNodeParameter('query', i) as string;
responseData = await execute(connection, query, []);
returnData.push.apply(returnData, responseData as IDataObject[]);
}
}
if (operation === 'insert') {
// ----------------------------------
// insert
// ----------------------------------
const table = this.getNodeParameter('table', 0) as string;
const columnString = this.getNodeParameter('columns', 0) as string;
const columns = columnString.split(',').map(column => column.trim());
const query = `INSERT INTO ${table}(${columns.join(',')}) VALUES (${columns.map(column => '?').join(',')})`;
const data = copyInputItems(items, columns);
const binds = data.map((element => Object.values(element)));
await execute(connection, query, binds as unknown as snowflake.InsertBinds);
returnData.push.apply(returnData, data);
}
if (operation === 'update') {
// ----------------------------------
// update
// ----------------------------------
const table = this.getNodeParameter('table', 0) as string;
const updateKey = this.getNodeParameter('updateKey', 0) as string;
const columnString = this.getNodeParameter('columns', 0) as string;
const columns = columnString.split(',').map(column => column.trim());
if (!columns.includes(updateKey)) {
columns.unshift(updateKey);
}
const query = `UPDATE ${table} SET ${columns.map(column => `${column} = ?`).join(',')} WHERE ${updateKey} = ?;`;
const data = copyInputItems(items, columns);
const binds = data.map((element => Object.values(element).concat(element[updateKey])));
for (let i = 0; i < binds.length; i++) {
await execute(connection, query, binds[i] as unknown as snowflake.InsertBinds);
}
returnData.push.apply(returnData, data);
}
await destroy(connection);
return [this.helpers.returnJsonArray(returnData)];
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

View file

@ -183,6 +183,7 @@
"dist/credentials/SlackApi.credentials.js",
"dist/credentials/SlackOAuth2Api.credentials.js",
"dist/credentials/Sms77Api.credentials.js",
"dist/credentials/Snowflake.credentials.js",
"dist/credentials/Smtp.credentials.js",
"dist/credentials/SpotifyOAuth2Api.credentials.js",
"dist/credentials/StravaOAuth2Api.credentials.js",
@ -410,6 +411,7 @@
"dist/nodes/Signl4/Signl4.node.js",
"dist/nodes/Slack/Slack.node.js",
"dist/nodes/Sms77/Sms77.node.js",
"dist/nodes/Snowflake/Snowflake.node.js",
"dist/nodes/SplitInBatches.node.js",
"dist/nodes/Spontit/Spontit.node.js",
"dist/nodes/Spotify/Spotify.node.js",
@ -497,6 +499,7 @@
},
"dependencies": {
"@types/promise-ftp": "^1.3.4",
"@types/snowflake-sdk": "^1.5.1",
"aws4": "^1.8.0",
"basic-auth": "^2.0.1",
"change-case": "^4.1.1",
@ -531,6 +534,7 @@
"request": "^2.88.2",
"rhea": "^1.0.11",
"rss-parser": "^3.7.0",
"snowflake-sdk": "^1.5.3",
"ssh2-sftp-client": "^5.2.1",
"tmp-promise": "^3.0.2",
"uuid": "^3.4.0",