diff --git a/packages/nodes-base/credentials/Aws.credentials.ts b/packages/nodes-base/credentials/Aws.credentials.ts index 69fe9f1a47..732817d4b8 100644 --- a/packages/nodes-base/credentials/Aws.credentials.ts +++ b/packages/nodes-base/credentials/Aws.credentials.ts @@ -96,6 +96,21 @@ export class Aws implements ICredentialType { default: '', placeholder: 'https://email.{region}.amazonaws.com', }, + { + displayName: 'SQS Endpoint', + name: 'sqsEndpoint', + description: 'If you use Amazon VPC to host n8n, you can establish a connection between your VPC and SQS using a VPC endpoint. Leave blank to use the default endpoint.', + type: 'string' as NodePropertyTypes, + displayOptions: { + show: { + customEndpoints: [ + true, + ], + }, + }, + default: '', + placeholder: 'https://sqs.{region}.amazonaws.com', + }, { displayName: 'S3 Endpoint', name: 's3Endpoint', diff --git a/packages/nodes-base/nodes/Aws/GenericFunctions.ts b/packages/nodes-base/nodes/Aws/GenericFunctions.ts index 9bccee16c3..edc733af64 100644 --- a/packages/nodes-base/nodes/Aws/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Aws/GenericFunctions.ts @@ -20,6 +20,8 @@ function getEndpointForService(service: string, credentials: ICredentialDataDecr endpoint = credentials.lambdaEndpoint; } else if (service === 'sns' && credentials.snsEndpoint) { endpoint = credentials.snsEndpoint; + } else if (service === 'sqs' && credentials.sqsEndpoint) { + endpoint = credentials.sqsEndpoint; } else { endpoint = `https://${service}.${credentials.region}.amazonaws.com`; } diff --git a/packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.json b/packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.json new file mode 100644 index 0000000000..ec0fa1cc0b --- /dev/null +++ b/packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.json @@ -0,0 +1,21 @@ +{ + "node": "n8n-nodes-base.awsSqs", + "nodeVersion": "1.0", + "codexVersion": "1.0", + "categories": [ + "Development", + "Communication" + ], + "resources": { + "credentialDocumentation": [ + { + "url": "https://docs.n8n.io/credentials/aws" + } + ], + "primaryDocumentation": [ + { + "url": "https://docs.n8n.io/nodes/n8n-nodes-base.awsSqs/" + } + ] + } +} \ No newline at end of file diff --git a/packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.ts b/packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.ts new file mode 100644 index 0000000000..fb58bd7139 --- /dev/null +++ b/packages/nodes-base/nodes/Aws/SQS/AwsSqs.node.ts @@ -0,0 +1,387 @@ +import { + BINARY_ENCODING, + IExecuteFunctions, +} from 'n8n-core'; + +import { + IDataObject, + ILoadOptionsFunctions, + INodeExecutionData, + INodeParameters, + INodePropertyOptions, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +import { + URL, +} from 'url'; + +import { + awsApiRequestSOAP, +} from '../GenericFunctions'; + +export class AwsSqs implements INodeType { + description: INodeTypeDescription = { + displayName: 'AWS SQS', + name: 'awsSqs', + icon: 'file:sqs.svg', + group: ['output'], + version: 1, + subtitle: `={{$parameter["operation"]}}`, + description: 'Sends messages to AWS SQS', + defaults: { + name: 'AWS SQS', + color: '#FF9900', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'aws', + required: true, + }, + ], + properties: [ + { + displayName: 'Operation', + name: 'operation', + type: 'options', + options: [ + { + name: 'Send message', + value: 'sendMessage', + description: 'Send a message to a queue.', + }, + ], + default: 'sendMessage', + description: 'The operation to perform.', + }, + { + displayName: 'Queue', + name: 'queue', + type: 'options', + typeOptions: { + loadOptionsMethod: 'getQueues', + }, + displayOptions: { + show: { + operation: [ + 'sendMessage', + ], + }, + }, + options: [], + default: '', + required: true, + description: 'Queue to send a message to.', + }, + { + displayName: 'Queue Type', + name: 'queueType', + type: 'options', + options: [ + { + name: 'FIFO', + value: 'fifo', + description: 'FIFO SQS queue.', + }, + { + name: 'Standard', + value: 'standard', + description: 'Standard SQS queue.', + }, + ], + default: 'standard', + description: 'The operation to perform.', + }, + { + displayName: 'Send Input Data', + name: 'sendInputData', + type: 'boolean', + default: true, + description: 'Send the data the node receives as JSON to SQS.', + }, + { + displayName: 'Message', + name: 'message', + type: 'string', + displayOptions: { + show: { + operation: [ + 'sendMessage', + ], + sendInputData: [ + false, + ], + }, + }, + required: true, + typeOptions: { + alwaysOpenEditWindow: true, + }, + default: '', + description: 'Message to send to the queue.', + }, + { + displayName: 'Message Group ID', + name: 'messageGroupId', + type: 'string', + default: '', + description: 'Tag that specifies that a message belongs to a specific message group. Applies only to FIFO (first-in-first-out) queues.', + displayOptions: { + show: { + queueType: [ + 'fifo', + ], + }, + }, + required: true, + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + displayOptions: { + show: { + operation: [ + 'sendMessage', + ], + }, + }, + default: {}, + placeholder: 'Add Option', + options: [ + { + displayName: 'Delay Seconds', + name: 'delaySeconds', + type: 'number', + displayOptions: { + show: { + '/queueType': [ + 'standard', + ], + }, + }, + description: 'How long, in seconds, to delay a message for.', + default: 0, + typeOptions: { + minValue: 0, + maxValue: 900, + }, + }, + { + displayName: 'Message Attributes', + name: 'messageAttributes', + placeholder: 'Add Attribute', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + description: 'Attributes to set.', + default: {}, + options: [ + { + name: 'binary', + displayName: 'Binary', + values: [ + { + displayName: 'Name', + name: 'name', + type: 'string', + default: '', + description: 'Name of the attribute.', + }, + { + displayName: 'Property Name', + name: 'dataPropertyName', + type: 'string', + default: 'data', + description: 'Name of the binary property which contains the data for the message attribute.', + }, + ], + }, + { + name: 'number', + displayName: 'Number', + values: [ + { + displayName: 'Name', + name: 'name', + type: 'string', + default: '', + description: 'Name of the attribute.', + }, + { + displayName: 'Value', + name: 'value', + type: 'number', + default: 0, + description: 'Number value of the attribute.', + }, + ], + }, + { + name: 'string', + displayName: 'String', + values: [ + { + displayName: 'Name', + name: 'name', + type: 'string', + default: '', + description: 'Name of the attribute.', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + description: 'String value of attribute.', + }, + ], + }, + ], + }, + { + displayName: 'Message Deduplication ID', + name: 'messageDeduplicationId', + type: 'string', + default: '', + description: 'Token used for deduplication of sent messages. Applies only to FIFO (first-in-first-out) queues.', + displayOptions: { + show: { + '/queueType': [ + 'fifo', + ], + }, + }, + }, + ], + }, + ], + }; + + methods = { + loadOptions: { + // Get all the available queues to display them to user so that it can be selected easily + async getQueues(this: ILoadOptionsFunctions): Promise { + let data; + try { + // loads first 1000 queues from SQS + data = await awsApiRequestSOAP.call(this, 'sqs', 'GET', `?Action=ListQueues`); + } catch (err) { + throw new Error(`AWS Error: ${err}`); + } + + let queues = data.ListQueuesResponse.ListQueuesResult.QueueUrl; + if (!queues) { + return []; + } + + if (!Array.isArray(queues)) { + // If user has only a single queue no array get returned so we make + // one manually to be able to process everything identically + queues = [queues]; + } + + return queues.map((queueUrl: string) => { + const urlParts = queueUrl.split('/'); + const name = urlParts[urlParts.length - 1]; + + return { + name, + value: queueUrl, + }; + }); + }, + }, + }; + + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + const returnData: IDataObject[] = []; + + const operation = this.getNodeParameter('operation', 0) as string; + + for (let i = 0; i < items.length; i++) { + const queueUrl = this.getNodeParameter('queue', i) as string; + const queuePath = new URL(queueUrl).pathname; + const params = []; + + const options = this.getNodeParameter('options', i, {}) as IDataObject; + const sendInputData = this.getNodeParameter('sendInputData', i) as boolean; + + const message = sendInputData ? JSON.stringify(items[i].json) : this.getNodeParameter('message', i) as string; + params.push(`MessageBody=${message}`); + + if (options.delaySeconds) { + params.push(`DelaySeconds=${options.delaySeconds}`); + } + + const queueType = this.getNodeParameter('queueType', i, {}) as string; + if (queueType === 'fifo') { + const messageDeduplicationId = this.getNodeParameter('options.messageDeduplicationId', i, '') as string; + if (messageDeduplicationId) { + params.push(`MessageDeduplicationId=${messageDeduplicationId}`); + } + + const messageGroupId = this.getNodeParameter('messageGroupId', i) as string; + if (messageGroupId) { + params.push(`MessageGroupId=${messageGroupId}`); + } + } + + let attributeCount = 0; + // Add string values + (this.getNodeParameter('options.messageAttributes.string', i, []) as INodeParameters[]).forEach((attribute) => { + attributeCount++; + params.push(`MessageAttribute.${attributeCount}.Name=${attribute.name}`); + params.push(`MessageAttribute.${attributeCount}.Value.StringValue=${attribute.value}`); + params.push(`MessageAttribute.${attributeCount}.Value.DataType=String`); + }); + + // Add binary values + (this.getNodeParameter('options.messageAttributes.binary', i, []) as INodeParameters[]).forEach((attribute) => { + attributeCount++; + const dataPropertyName = attribute.dataPropertyName as string; + const item = items[i]; + + if (item.binary === undefined) { + throw new Error('No binary data set. So message attribute cannot be added!'); + } + + if (item.binary[dataPropertyName] === undefined) { + throw new Error(`The binary property "${dataPropertyName}" does not exist. So message attribute cannot be added!`); + } + + const binaryData = item.binary[dataPropertyName].data; + + params.push(`MessageAttribute.${attributeCount}.Name=${attribute.name}`); + params.push(`MessageAttribute.${attributeCount}.Value.BinaryValue=${binaryData}`); + params.push(`MessageAttribute.${attributeCount}.Value.DataType=Binary`); + }); + + // Add number values + (this.getNodeParameter('options.messageAttributes.number', i, []) as INodeParameters[]).forEach((attribute) => { + attributeCount++; + params.push(`MessageAttribute.${attributeCount}.Name=${attribute.name}`); + params.push(`MessageAttribute.${attributeCount}.Value.StringValue=${attribute.value}`); + params.push(`MessageAttribute.${attributeCount}.Value.DataType=Number`); + }); + + let responseData; + try { + responseData = await awsApiRequestSOAP.call(this, 'sqs', 'GET', `${queuePath}/?Action=${operation}&` + params.join('&')); + } catch (err) { + throw new Error(`AWS Error: ${err}`); + } + + const result = responseData.SendMessageResponse.SendMessageResult; + returnData.push(result as IDataObject); + } + + return [this.helpers.returnJsonArray(returnData)]; + } +} diff --git a/packages/nodes-base/nodes/Aws/SQS/sqs.svg b/packages/nodes-base/nodes/Aws/SQS/sqs.svg new file mode 100644 index 0000000000..5d51fef2a8 --- /dev/null +++ b/packages/nodes-base/nodes/Aws/SQS/sqs.svg @@ -0,0 +1 @@ + diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index d3d39335f6..b545998de8 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -288,6 +288,7 @@ "dist/nodes/Aws/Rekognition/AwsRekognition.node.js", "dist/nodes/Aws/S3/AwsS3.node.js", "dist/nodes/Aws/SES/AwsSes.node.js", + "dist/nodes/Aws/SQS/AwsSqs.node.js", "dist/nodes/Aws/AwsSns.node.js", "dist/nodes/Aws/AwsSnsTrigger.node.js", "dist/nodes/Bannerbear/Bannerbear.node.js",