mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-23 10:32:17 -08:00
✨ Add AWS SQS node (#1530)
* implement sqs node to send message * remove console logs * update node to use url as value * add message attributes * tslint --fix * fix comment * address comments, fix subtitle bug, fix style issues * add comma * update pathname handling * fix spacing * fix queue loading for no queues and more than 1 * change logo to svg, support sending input as json * add binary data property support * fix node read to depend on item * 🎨 Fix SVG size to prevent cropping * ⚡ Ajust imports and display names per codebase * ✏️ Edit parameter descriptions * address comments * tslint fixes * update subtitle to operation * update fifo handling * ⚡ Improvements * ⚡ Minor fix Co-authored-by: Iván Ovejero <ivov.src@gmail.com> Co-authored-by: ricardo <ricardoespinoza105@gmail.com> Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
parent
dd6d523b85
commit
99bb6e40c1
|
@ -96,6 +96,21 @@ export class Aws implements ICredentialType {
|
|||
default: '',
|
||||
placeholder: 'https://email.{region}.amazonaws.com',
|
||||
},
|
||||
{
|
||||
displayName: 'SQS Endpoint',
|
||||
name: 'sqsEndpoint',
|
||||
description: 'If you use Amazon VPC to host n8n, you can establish a connection between your VPC and SQS using a VPC endpoint. Leave blank to use the default endpoint.',
|
||||
type: 'string' as NodePropertyTypes,
|
||||
displayOptions: {
|
||||
show: {
|
||||
customEndpoints: [
|
||||
true,
|
||||
],
|
||||
},
|
||||
},
|
||||
default: '',
|
||||
placeholder: 'https://sqs.{region}.amazonaws.com',
|
||||
},
|
||||
{
|
||||
displayName: 'S3 Endpoint',
|
||||
name: 's3Endpoint',
|
||||
|
|
|
@ -20,6 +20,8 @@ function getEndpointForService(service: string, credentials: ICredentialDataDecr
|
|||
endpoint = credentials.lambdaEndpoint;
|
||||
} else if (service === 'sns' && credentials.snsEndpoint) {
|
||||
endpoint = credentials.snsEndpoint;
|
||||
} else if (service === 'sqs' && credentials.sqsEndpoint) {
|
||||
endpoint = credentials.sqsEndpoint;
|
||||
} else {
|
||||
endpoint = `https://${service}.${credentials.region}.amazonaws.com`;
|
||||
}
|
||||
|
|
21
packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.json
Normal file
21
packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.json
Normal file
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"node": "n8n-nodes-base.awsSqs",
|
||||
"nodeVersion": "1.0",
|
||||
"codexVersion": "1.0",
|
||||
"categories": [
|
||||
"Development",
|
||||
"Communication"
|
||||
],
|
||||
"resources": {
|
||||
"credentialDocumentation": [
|
||||
{
|
||||
"url": "https://docs.n8n.io/credentials/aws"
|
||||
}
|
||||
],
|
||||
"primaryDocumentation": [
|
||||
{
|
||||
"url": "https://docs.n8n.io/nodes/n8n-nodes-base.awsSqs/"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
387
packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.ts
Normal file
387
packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.ts
Normal file
|
@ -0,0 +1,387 @@
|
|||
import {
|
||||
BINARY_ENCODING,
|
||||
IExecuteFunctions,
|
||||
} from 'n8n-core';
|
||||
|
||||
import {
|
||||
IDataObject,
|
||||
ILoadOptionsFunctions,
|
||||
INodeExecutionData,
|
||||
INodeParameters,
|
||||
INodePropertyOptions,
|
||||
INodeType,
|
||||
INodeTypeDescription,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import {
|
||||
URL,
|
||||
} from 'url';
|
||||
|
||||
import {
|
||||
awsApiRequestSOAP,
|
||||
} from '../GenericFunctions';
|
||||
|
||||
export class AwsSqs implements INodeType {
|
||||
description: INodeTypeDescription = {
|
||||
displayName: 'AWS SQS',
|
||||
name: 'awsSqs',
|
||||
icon: 'file:sqs.svg',
|
||||
group: ['output'],
|
||||
version: 1,
|
||||
subtitle: `={{$parameter["operation"]}}`,
|
||||
description: 'Sends messages to AWS SQS',
|
||||
defaults: {
|
||||
name: 'AWS SQS',
|
||||
color: '#FF9900',
|
||||
},
|
||||
inputs: ['main'],
|
||||
outputs: ['main'],
|
||||
credentials: [
|
||||
{
|
||||
name: 'aws',
|
||||
required: true,
|
||||
},
|
||||
],
|
||||
properties: [
|
||||
{
|
||||
displayName: 'Operation',
|
||||
name: 'operation',
|
||||
type: 'options',
|
||||
options: [
|
||||
{
|
||||
name: 'Send message',
|
||||
value: 'sendMessage',
|
||||
description: 'Send a message to a queue.',
|
||||
},
|
||||
],
|
||||
default: 'sendMessage',
|
||||
description: 'The operation to perform.',
|
||||
},
|
||||
{
|
||||
displayName: 'Queue',
|
||||
name: 'queue',
|
||||
type: 'options',
|
||||
typeOptions: {
|
||||
loadOptionsMethod: 'getQueues',
|
||||
},
|
||||
displayOptions: {
|
||||
show: {
|
||||
operation: [
|
||||
'sendMessage',
|
||||
],
|
||||
},
|
||||
},
|
||||
options: [],
|
||||
default: '',
|
||||
required: true,
|
||||
description: 'Queue to send a message to.',
|
||||
},
|
||||
{
|
||||
displayName: 'Queue Type',
|
||||
name: 'queueType',
|
||||
type: 'options',
|
||||
options: [
|
||||
{
|
||||
name: 'FIFO',
|
||||
value: 'fifo',
|
||||
description: 'FIFO SQS queue.',
|
||||
},
|
||||
{
|
||||
name: 'Standard',
|
||||
value: 'standard',
|
||||
description: 'Standard SQS queue.',
|
||||
},
|
||||
],
|
||||
default: 'standard',
|
||||
description: 'The operation to perform.',
|
||||
},
|
||||
{
|
||||
displayName: 'Send Input Data',
|
||||
name: 'sendInputData',
|
||||
type: 'boolean',
|
||||
default: true,
|
||||
description: 'Send the data the node receives as JSON to SQS.',
|
||||
},
|
||||
{
|
||||
displayName: 'Message',
|
||||
name: 'message',
|
||||
type: 'string',
|
||||
displayOptions: {
|
||||
show: {
|
||||
operation: [
|
||||
'sendMessage',
|
||||
],
|
||||
sendInputData: [
|
||||
false,
|
||||
],
|
||||
},
|
||||
},
|
||||
required: true,
|
||||
typeOptions: {
|
||||
alwaysOpenEditWindow: true,
|
||||
},
|
||||
default: '',
|
||||
description: 'Message to send to the queue.',
|
||||
},
|
||||
{
|
||||
displayName: 'Message Group ID',
|
||||
name: 'messageGroupId',
|
||||
type: 'string',
|
||||
default: '',
|
||||
description: 'Tag that specifies that a message belongs to a specific message group. Applies only to FIFO (first-in-first-out) queues.',
|
||||
displayOptions: {
|
||||
show: {
|
||||
queueType: [
|
||||
'fifo',
|
||||
],
|
||||
},
|
||||
},
|
||||
required: true,
|
||||
},
|
||||
{
|
||||
displayName: 'Options',
|
||||
name: 'options',
|
||||
type: 'collection',
|
||||
displayOptions: {
|
||||
show: {
|
||||
operation: [
|
||||
'sendMessage',
|
||||
],
|
||||
},
|
||||
},
|
||||
default: {},
|
||||
placeholder: 'Add Option',
|
||||
options: [
|
||||
{
|
||||
displayName: 'Delay Seconds',
|
||||
name: 'delaySeconds',
|
||||
type: 'number',
|
||||
displayOptions: {
|
||||
show: {
|
||||
'/queueType': [
|
||||
'standard',
|
||||
],
|
||||
},
|
||||
},
|
||||
description: 'How long, in seconds, to delay a message for.',
|
||||
default: 0,
|
||||
typeOptions: {
|
||||
minValue: 0,
|
||||
maxValue: 900,
|
||||
},
|
||||
},
|
||||
{
|
||||
displayName: 'Message Attributes',
|
||||
name: 'messageAttributes',
|
||||
placeholder: 'Add Attribute',
|
||||
type: 'fixedCollection',
|
||||
typeOptions: {
|
||||
multipleValues: true,
|
||||
},
|
||||
description: 'Attributes to set.',
|
||||
default: {},
|
||||
options: [
|
||||
{
|
||||
name: 'binary',
|
||||
displayName: 'Binary',
|
||||
values: [
|
||||
{
|
||||
displayName: 'Name',
|
||||
name: 'name',
|
||||
type: 'string',
|
||||
default: '',
|
||||
description: 'Name of the attribute.',
|
||||
},
|
||||
{
|
||||
displayName: 'Property Name',
|
||||
name: 'dataPropertyName',
|
||||
type: 'string',
|
||||
default: 'data',
|
||||
description: 'Name of the binary property which contains the data for the message attribute.',
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
name: 'number',
|
||||
displayName: 'Number',
|
||||
values: [
|
||||
{
|
||||
displayName: 'Name',
|
||||
name: 'name',
|
||||
type: 'string',
|
||||
default: '',
|
||||
description: 'Name of the attribute.',
|
||||
},
|
||||
{
|
||||
displayName: 'Value',
|
||||
name: 'value',
|
||||
type: 'number',
|
||||
default: 0,
|
||||
description: 'Number value of the attribute.',
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
name: 'string',
|
||||
displayName: 'String',
|
||||
values: [
|
||||
{
|
||||
displayName: 'Name',
|
||||
name: 'name',
|
||||
type: 'string',
|
||||
default: '',
|
||||
description: 'Name of the attribute.',
|
||||
},
|
||||
{
|
||||
displayName: 'Value',
|
||||
name: 'value',
|
||||
type: 'string',
|
||||
default: '',
|
||||
description: 'String value of attribute.',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
displayName: 'Message Deduplication ID',
|
||||
name: 'messageDeduplicationId',
|
||||
type: 'string',
|
||||
default: '',
|
||||
description: 'Token used for deduplication of sent messages. Applies only to FIFO (first-in-first-out) queues.',
|
||||
displayOptions: {
|
||||
show: {
|
||||
'/queueType': [
|
||||
'fifo',
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
methods = {
|
||||
loadOptions: {
|
||||
// Get all the available queues to display them to user so that it can be selected easily
|
||||
async getQueues(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
|
||||
let data;
|
||||
try {
|
||||
// loads first 1000 queues from SQS
|
||||
data = await awsApiRequestSOAP.call(this, 'sqs', 'GET', `?Action=ListQueues`);
|
||||
} catch (err) {
|
||||
throw new Error(`AWS Error: ${err}`);
|
||||
}
|
||||
|
||||
let queues = data.ListQueuesResponse.ListQueuesResult.QueueUrl;
|
||||
if (!queues) {
|
||||
return [];
|
||||
}
|
||||
|
||||
if (!Array.isArray(queues)) {
|
||||
// If user has only a single queue no array get returned so we make
|
||||
// one manually to be able to process everything identically
|
||||
queues = [queues];
|
||||
}
|
||||
|
||||
return queues.map((queueUrl: string) => {
|
||||
const urlParts = queueUrl.split('/');
|
||||
const name = urlParts[urlParts.length - 1];
|
||||
|
||||
return {
|
||||
name,
|
||||
value: queueUrl,
|
||||
};
|
||||
});
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
|
||||
const items = this.getInputData();
|
||||
const returnData: IDataObject[] = [];
|
||||
|
||||
const operation = this.getNodeParameter('operation', 0) as string;
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
const queueUrl = this.getNodeParameter('queue', i) as string;
|
||||
const queuePath = new URL(queueUrl).pathname;
|
||||
const params = [];
|
||||
|
||||
const options = this.getNodeParameter('options', i, {}) as IDataObject;
|
||||
const sendInputData = this.getNodeParameter('sendInputData', i) as boolean;
|
||||
|
||||
const message = sendInputData ? JSON.stringify(items[i].json) : this.getNodeParameter('message', i) as string;
|
||||
params.push(`MessageBody=${message}`);
|
||||
|
||||
if (options.delaySeconds) {
|
||||
params.push(`DelaySeconds=${options.delaySeconds}`);
|
||||
}
|
||||
|
||||
const queueType = this.getNodeParameter('queueType', i, {}) as string;
|
||||
if (queueType === 'fifo') {
|
||||
const messageDeduplicationId = this.getNodeParameter('options.messageDeduplicationId', i, '') as string;
|
||||
if (messageDeduplicationId) {
|
||||
params.push(`MessageDeduplicationId=${messageDeduplicationId}`);
|
||||
}
|
||||
|
||||
const messageGroupId = this.getNodeParameter('messageGroupId', i) as string;
|
||||
if (messageGroupId) {
|
||||
params.push(`MessageGroupId=${messageGroupId}`);
|
||||
}
|
||||
}
|
||||
|
||||
let attributeCount = 0;
|
||||
// Add string values
|
||||
(this.getNodeParameter('options.messageAttributes.string', i, []) as INodeParameters[]).forEach((attribute) => {
|
||||
attributeCount++;
|
||||
params.push(`MessageAttribute.${attributeCount}.Name=${attribute.name}`);
|
||||
params.push(`MessageAttribute.${attributeCount}.Value.StringValue=${attribute.value}`);
|
||||
params.push(`MessageAttribute.${attributeCount}.Value.DataType=String`);
|
||||
});
|
||||
|
||||
// Add binary values
|
||||
(this.getNodeParameter('options.messageAttributes.binary', i, []) as INodeParameters[]).forEach((attribute) => {
|
||||
attributeCount++;
|
||||
const dataPropertyName = attribute.dataPropertyName as string;
|
||||
const item = items[i];
|
||||
|
||||
if (item.binary === undefined) {
|
||||
throw new Error('No binary data set. So message attribute cannot be added!');
|
||||
}
|
||||
|
||||
if (item.binary[dataPropertyName] === undefined) {
|
||||
throw new Error(`The binary property "${dataPropertyName}" does not exist. So message attribute cannot be added!`);
|
||||
}
|
||||
|
||||
const binaryData = item.binary[dataPropertyName].data;
|
||||
|
||||
params.push(`MessageAttribute.${attributeCount}.Name=${attribute.name}`);
|
||||
params.push(`MessageAttribute.${attributeCount}.Value.BinaryValue=${binaryData}`);
|
||||
params.push(`MessageAttribute.${attributeCount}.Value.DataType=Binary`);
|
||||
});
|
||||
|
||||
// Add number values
|
||||
(this.getNodeParameter('options.messageAttributes.number', i, []) as INodeParameters[]).forEach((attribute) => {
|
||||
attributeCount++;
|
||||
params.push(`MessageAttribute.${attributeCount}.Name=${attribute.name}`);
|
||||
params.push(`MessageAttribute.${attributeCount}.Value.StringValue=${attribute.value}`);
|
||||
params.push(`MessageAttribute.${attributeCount}.Value.DataType=Number`);
|
||||
});
|
||||
|
||||
let responseData;
|
||||
try {
|
||||
responseData = await awsApiRequestSOAP.call(this, 'sqs', 'GET', `${queuePath}/?Action=${operation}&` + params.join('&'));
|
||||
} catch (err) {
|
||||
throw new Error(`AWS Error: ${err}`);
|
||||
}
|
||||
|
||||
const result = responseData.SendMessageResponse.SendMessageResult;
|
||||
returnData.push(result as IDataObject);
|
||||
}
|
||||
|
||||
return [this.helpers.returnJsonArray(returnData)];
|
||||
}
|
||||
}
|
1
packages/nodes-base/nodes/Aws/SQS/sqs.svg
Normal file
1
packages/nodes-base/nodes/Aws/SQS/sqs.svg
Normal file
|
@ -0,0 +1 @@
|
|||
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="-5 0 85 85" fill="#fff" fill-rule="evenodd" stroke="#000" stroke-linecap="round" stroke-linejoin="round"><use xlink:href="#A" x="2.188" y="2.5"/><symbol id="A" overflow="visible"><g fill="#876929" stroke="none"><path d="M0 25.938L.021 63.44 35 80l34.98-16.56v-9.368l-29.745-3.508.02-21.127L70 25.948V16.57L35 0 0 16.56v9.378z"/><path d="M.021 54.062l34.98 9.942V80L.021 63.44v-9.378z"/><path d="M4.465 65.549L0 63.431V16.56l4.475-2.109-.01 51.098z"/></g><path d="M40.255 50.564l-35.79 4.762.01-30.661 35.78 4.772v21.127zM70 25.948l-35-9.951V0l35 16.57v9.378zm-.02 28.124L35 64.004V80l34.98-16.56v-9.368z" stroke="none" fill="#d9a741"/><path d="M22.109 48.581l12.892 1.526V29.815L22.109 31.36v17.221zM9.125 47.065l8.365.982V31.924l-8.365 1.001v14.14z" stroke="none" fill="#876929"/><path d="M4.475 24.665L35 15.996l35 9.951-29.745 3.489-35.78-4.772z" fill="#624a1e" stroke="none"/><path d="M4.465 55.326L35 64.004l34.98-9.932-29.724-3.508-35.79 4.762z" fill="#fad791" stroke="none"/><path d="M69.98 45.918L35 50.107V29.815l34.98 4.218v11.885z" fill="#d9a741" stroke="none"/></symbol></svg>
|
After Width: | Height: | Size: 1.2 KiB |
|
@ -288,6 +288,7 @@
|
|||
"dist/nodes/Aws/Rekognition/AwsRekognition.node.js",
|
||||
"dist/nodes/Aws/S3/AwsS3.node.js",
|
||||
"dist/nodes/Aws/SES/AwsSes.node.js",
|
||||
"dist/nodes/Aws/SQS/AwsSqs.node.js",
|
||||
"dist/nodes/Aws/AwsSns.node.js",
|
||||
"dist/nodes/Aws/AwsSnsTrigger.node.js",
|
||||
"dist/nodes/Bannerbear/Bannerbear.node.js",
|
||||
|
|
Loading…
Reference in a new issue