2021-04-02 10:48:22 -07:00
import {
IExecuteFunctions ,
} from 'n8n-core' ;
import {
IDataObject ,
ILoadOptionsFunctions ,
INodeExecutionData ,
INodeParameters ,
INodePropertyOptions ,
INodeType ,
INodeTypeDescription ,
2021-04-16 09:33:36 -07:00
NodeApiError ,
NodeOperationError ,
2021-04-02 10:48:22 -07:00
} from 'n8n-workflow' ;
import {
URL ,
} from 'url' ;
import {
awsApiRequestSOAP ,
} from '../GenericFunctions' ;
2021-04-17 00:59:11 -07:00
import {
pascalCase ,
} from 'change-case' ;
2021-04-02 10:48:22 -07:00
/**
 * Builds a single `name=value` query-string pair for the SQS query API.
 *
 * The value is percent-encoded so that characters such as `&`, `=`, `+`, `#`
 * or whitespace inside message bodies and attribute values cannot split or
 * corrupt the query string.
 */
function sqsQueryParam(name: string, value: unknown): string {
	return `${name}=${encodeURIComponent(String(value))}`;
}

/**
 * n8n node that sends messages to an AWS SQS queue (standard or FIFO).
 *
 * The node exposes a single operation, `sendMessage`. The message body is
 * either the incoming item serialised as JSON or a user supplied string, and
 * may be accompanied by string, number and binary message attributes.
 */
export class AwsSqs implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'AWS SQS',
		name: 'awsSqs',
		icon: 'file:sqs.svg',
		group: ['output'],
		version: 1,
		subtitle: `={{$parameter["operation"]}}`,
		description: 'Sends messages to AWS SQS',
		defaults: {
			name: 'AWS SQS',
		},
		inputs: ['main'],
		outputs: ['main'],
		credentials: [
			{
				name: 'aws',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Operation',
				name: 'operation',
				type: 'options',
				noDataExpression: true,
				options: [
					{
						name: 'Send Message',
						value: 'sendMessage',
						description: 'Send a message to a queue',
						action: 'Send a message to a queue',
					},
				],
				default: 'sendMessage',
			},
			{
				displayName: 'Queue Name or ID',
				name: 'queue',
				type: 'options',
				typeOptions: {
					loadOptionsMethod: 'getQueues',
				},
				displayOptions: {
					show: {
						operation: [
							'sendMessage',
						],
					},
				},
				options: [],
				default: '',
				required: true,
				description: 'Queue to send a message to. Choose from the list, or specify an ID using an <a href="https://docs.n8n.io/code-examples/expressions/">expression</a>.',
			},
			{
				displayName: 'Queue Type',
				name: 'queueType',
				type: 'options',
				options: [
					{
						name: 'FIFO',
						value: 'fifo',
						description: 'FIFO SQS queue',
					},
					{
						name: 'Standard',
						value: 'standard',
						description: 'Standard SQS queue',
					},
				],
				default: 'standard',
			},
			{
				displayName: 'Send Input Data',
				name: 'sendInputData',
				type: 'boolean',
				default: true,
				description: 'Whether to send the data the node receives as JSON to SQS',
			},
			{
				displayName: 'Message',
				name: 'message',
				type: 'string',
				displayOptions: {
					show: {
						operation: [
							'sendMessage',
						],
						sendInputData: [
							false,
						],
					},
				},
				required: true,
				typeOptions: {
					alwaysOpenEditWindow: true,
				},
				default: '',
				description: 'Message to send to the queue',
			},
			{
				displayName: 'Message Group ID',
				name: 'messageGroupId',
				type: 'string',
				default: '',
				description: 'Tag that specifies that a message belongs to a specific message group. Applies only to FIFO (first-in-first-out) queues.',
				displayOptions: {
					show: {
						queueType: [
							'fifo',
						],
					},
				},
				required: true,
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				displayOptions: {
					show: {
						operation: [
							'sendMessage',
						],
					},
				},
				default: {},
				placeholder: 'Add Option',
				options: [
					{
						displayName: 'Delay Seconds',
						name: 'delaySeconds',
						type: 'number',
						displayOptions: {
							show: {
								'/queueType': [
									'standard',
								],
							},
						},
						description: 'How long, in seconds, to delay a message for',
						default: 0,
						typeOptions: {
							minValue: 0,
							maxValue: 900,
						},
					},
					{
						displayName: 'Message Attributes',
						name: 'messageAttributes',
						placeholder: 'Add Attribute',
						type: 'fixedCollection',
						typeOptions: {
							multipleValues: true,
						},
						description: 'Attributes to set',
						default: {},
						options: [
							{
								name: 'binary',
								displayName: 'Binary',
								values: [
									{
										displayName: 'Name',
										name: 'name',
										type: 'string',
										default: '',
										description: 'Name of the attribute',
									},
									{
										displayName: 'Property Name',
										name: 'dataPropertyName',
										type: 'string',
										default: 'data',
										description: 'Name of the binary property which contains the data for the message attribute',
									},
								],
							},
							{
								name: 'number',
								displayName: 'Number',
								values: [
									{
										displayName: 'Name',
										name: 'name',
										type: 'string',
										default: '',
										description: 'Name of the attribute',
									},
									{
										displayName: 'Value',
										name: 'value',
										type: 'number',
										default: 0,
										description: 'Number value of the attribute',
									},
								],
							},
							{
								name: 'string',
								displayName: 'String',
								values: [
									{
										displayName: 'Name',
										name: 'name',
										type: 'string',
										default: '',
										description: 'Name of the attribute',
									},
									{
										displayName: 'Value',
										name: 'value',
										type: 'string',
										default: '',
										description: 'String value of attribute',
									},
								],
							},
						],
					},
					{
						displayName: 'Message Deduplication ID',
						name: 'messageDeduplicationId',
						type: 'string',
						default: '',
						description: 'Token used for deduplication of sent messages. Applies only to FIFO (first-in-first-out) queues.',
						displayOptions: {
							show: {
								'/queueType': [
									'fifo',
								],
							},
						},
					},
				],
			},
		],
	};

	methods = {
		loadOptions: {
			/**
			 * Lists the available queues so the user can pick one in the UI.
			 *
			 * Each option is named after the last path segment of the queue URL
			 * and carries the full queue URL as its value.
			 *
			 * @returns One option per queue, or an empty list when none exist.
			 * @throws NodeApiError when the ListQueues request fails.
			 */
			async getQueues(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
				const params = [
					'Version=2012-11-05',
					`Action=ListQueues`,
				];

				let data;
				try {
					// loads first 1000 queues from SQS
					data = await awsApiRequestSOAP.call(this, 'sqs', 'GET', `?${params.join('&')}`);
				} catch (error) {
					throw new NodeApiError(this.getNode(), error);
				}

				let queues = data.ListQueuesResponse.ListQueuesResult.QueueUrl;
				if (!queues) {
					return [];
				}

				if (!Array.isArray(queues)) {
					// If user has only a single queue no array get returned so we make
					// one manually to be able to process everything identically
					queues = [queues];
				}

				return queues.map((queueUrl: string) => {
					const urlParts = queueUrl.split('/');
					const name = urlParts[urlParts.length - 1];

					return {
						name,
						value: queueUrl,
					};
				});
			},
		},
	};

	/**
	 * Sends one SQS message per input item.
	 *
	 * For every item the request is assembled as a query string (message body,
	 * optional delay, FIFO identifiers and message attributes) and issued against
	 * the path of the selected queue URL. All user supplied values are
	 * percent-encoded before being placed in the query string.
	 *
	 * @returns The `SendMessageResult` of each request; when "continue on fail"
	 * is enabled, failed items yield `{ error }` instead of aborting the run.
	 * @throws NodeApiError when the SQS request fails.
	 * @throws NodeOperationError when a referenced binary property is missing.
	 */
	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		const items = this.getInputData();
		const returnData: IDataObject[] = [];
		const operation = this.getNodeParameter('operation', 0) as string;

		for (let i = 0; i < items.length; i++) {
			try {
				const queueUrl = this.getNodeParameter('queue', i) as string;
				// Only the path is needed; the helper resolves the service endpoint itself.
				const queuePath = new URL(queueUrl).pathname;

				const params = [
					'Version=2012-11-05',
					`Action=${pascalCase(operation)}`,
				];

				const options = this.getNodeParameter('options', i, {}) as IDataObject;
				const sendInputData = this.getNodeParameter('sendInputData', i) as boolean;

				const message = sendInputData
					? JSON.stringify(items[i].json)
					: this.getNodeParameter('message', i) as string;
				// The body routinely contains `&`, `=`, quotes and spaces, so it must be encoded.
				params.push(sqsQueryParam('MessageBody', message));

				if (options.delaySeconds) {
					params.push(sqsQueryParam('DelaySeconds', options.delaySeconds));
				}

				const queueType = this.getNodeParameter('queueType', i, 'standard') as string;
				if (queueType === 'fifo') {
					const messageDeduplicationId = this.getNodeParameter('options.messageDeduplicationId', i, '') as string;
					if (messageDeduplicationId) {
						params.push(sqsQueryParam('MessageDeduplicationId', messageDeduplicationId));
					}

					const messageGroupId = this.getNodeParameter('messageGroupId', i) as string;
					if (messageGroupId) {
						params.push(sqsQueryParam('MessageGroupId', messageGroupId));
					}
				}

				// Attributes are numbered consecutively across all three kinds.
				let attributeCount = 0;

				// Add string values
				const stringAttributes = this.getNodeParameter('options.messageAttributes.string', i, []) as INodeParameters[];
				for (const attribute of stringAttributes) {
					attributeCount++;
					params.push(sqsQueryParam(`MessageAttribute.${attributeCount}.Name`, attribute.name));
					params.push(sqsQueryParam(`MessageAttribute.${attributeCount}.Value.StringValue`, attribute.value));
					params.push(`MessageAttribute.${attributeCount}.Value.DataType=String`);
				}

				// Add binary values
				const binaryAttributes = this.getNodeParameter('options.messageAttributes.binary', i, []) as INodeParameters[];
				for (const attribute of binaryAttributes) {
					attributeCount++;
					const dataPropertyName = attribute.dataPropertyName as string;
					const item = items[i];

					if (item.binary === undefined) {
						throw new NodeOperationError(this.getNode(), 'No binary data set. So message attribute cannot be added!', { itemIndex: i });
					}

					if (item.binary[dataPropertyName] === undefined) {
						throw new NodeOperationError(this.getNode(), `The binary property "${dataPropertyName}" does not exist. So message attribute cannot be added!`, { itemIndex: i });
					}

					const binaryData = item.binary[dataPropertyName].data;

					params.push(sqsQueryParam(`MessageAttribute.${attributeCount}.Name`, attribute.name));
					// Base64 may contain `+`, `/` and `=`; unencoded, `+` would be read as a space.
					params.push(sqsQueryParam(`MessageAttribute.${attributeCount}.Value.BinaryValue`, binaryData));
					params.push(`MessageAttribute.${attributeCount}.Value.DataType=Binary`);
				}

				// Add number values (transmitted as StringValue with DataType=Number)
				const numberAttributes = this.getNodeParameter('options.messageAttributes.number', i, []) as INodeParameters[];
				for (const attribute of numberAttributes) {
					attributeCount++;
					params.push(sqsQueryParam(`MessageAttribute.${attributeCount}.Name`, attribute.name));
					params.push(sqsQueryParam(`MessageAttribute.${attributeCount}.Value.StringValue`, attribute.value));
					params.push(`MessageAttribute.${attributeCount}.Value.DataType=Number`);
				}

				let responseData;
				try {
					responseData = await awsApiRequestSOAP.call(this, 'sqs', 'GET', `${queuePath}?${params.join('&')}`);
				} catch (error) {
					throw new NodeApiError(this.getNode(), error);
				}

				const result = responseData.SendMessageResponse.SendMessageResult;
				returnData.push(result as IDataObject);
			} catch (error) {
				if (this.continueOnFail()) {
					// Not every error carries a description; fall back to its message
					// so the failed item never reports an empty error.
					returnData.push({ error: error.description || error.message });
					continue;
				}
				throw error;
			}
		}

		return [this.helpers.returnJsonArray(returnData)];
	}
}