mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-22 18:11:29 -08:00
fix(AMQP Sender Node): Node hangs forever on disconnect (#10026)
This commit is contained in:
parent
f78f4ea349
commit
27410ab2af
|
@ -12,6 +12,7 @@ export class Amqp implements ICredentialType {
|
|||
displayName: 'Hostname',
|
||||
name: 'hostname',
|
||||
type: 'string',
|
||||
placeholder: 'e.g. localhost',
|
||||
default: '',
|
||||
},
|
||||
{
|
||||
|
@ -24,12 +25,14 @@ export class Amqp implements ICredentialType {
|
|||
displayName: 'User',
|
||||
name: 'username',
|
||||
type: 'string',
|
||||
placeholder: 'e.g. guest',
|
||||
default: '',
|
||||
},
|
||||
{
|
||||
displayName: 'Password',
|
||||
name: 'password',
|
||||
type: 'string',
|
||||
placeholder: 'e.g. guest',
|
||||
typeOptions: {
|
||||
password: true,
|
||||
},
|
||||
|
@ -39,8 +42,9 @@ export class Amqp implements ICredentialType {
|
|||
displayName: 'Transport Type',
|
||||
name: 'transportType',
|
||||
type: 'string',
|
||||
placeholder: 'e.g. tcp',
|
||||
default: '',
|
||||
description: 'Optional Transport Type to use. Either tcp or tls.',
|
||||
hint: 'Optional transport type to use, either tcp or tls',
|
||||
},
|
||||
];
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import type { Connection, ContainerOptions, Dictionary, EventContext } from 'rhea';
|
||||
import type { Connection, ContainerOptions, Dictionary, EventContext, Sender } from 'rhea';
|
||||
import { create_container } from 'rhea';
|
||||
|
||||
import type {
|
||||
|
@ -14,6 +14,46 @@ import type {
|
|||
} from 'n8n-workflow';
|
||||
import { NodeOperationError } from 'n8n-workflow';
|
||||
|
||||
async function checkIfCredentialsValid(
|
||||
credentials: IDataObject,
|
||||
): Promise<INodeCredentialTestResult> {
|
||||
const connectOptions: ContainerOptions = {
|
||||
reconnect: false,
|
||||
host: credentials.hostname as string,
|
||||
hostname: credentials.hostname as string,
|
||||
port: credentials.port as number,
|
||||
username: credentials.username ? (credentials.username as string) : undefined,
|
||||
password: credentials.password ? (credentials.password as string) : undefined,
|
||||
transport: credentials.transportType ? (credentials.transportType as string) : undefined,
|
||||
};
|
||||
|
||||
let conn: Connection | undefined = undefined;
|
||||
try {
|
||||
const container = create_container();
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
container.on('connection_open', function (_context: EventContext) {
|
||||
resolve();
|
||||
});
|
||||
container.on('disconnected', function (context: EventContext) {
|
||||
reject(context.error ?? new Error('unknown error'));
|
||||
});
|
||||
conn = container.connect(connectOptions);
|
||||
});
|
||||
} catch (error) {
|
||||
return {
|
||||
status: 'Error',
|
||||
message: (error as Error).message,
|
||||
};
|
||||
} finally {
|
||||
if (conn) (conn as Connection).close();
|
||||
}
|
||||
|
||||
return {
|
||||
status: 'OK',
|
||||
message: 'Connection successful!',
|
||||
};
|
||||
}
|
||||
|
||||
export class Amqp implements INodeType {
|
||||
description: INodeTypeDescription = {
|
||||
displayName: 'AMQP Sender',
|
||||
|
@ -40,7 +80,7 @@ export class Amqp implements INodeType {
|
|||
name: 'sink',
|
||||
type: 'string',
|
||||
default: '',
|
||||
placeholder: 'topic://sourcename.something',
|
||||
placeholder: 'e.g. topic://sourcename.something',
|
||||
description: 'Name of the queue of topic to publish to',
|
||||
},
|
||||
// Header Parameters
|
||||
|
@ -106,49 +146,27 @@ export class Amqp implements INodeType {
|
|||
credential: ICredentialsDecrypted,
|
||||
): Promise<INodeCredentialTestResult> {
|
||||
const credentials = credential.data as ICredentialDataDecryptedObject;
|
||||
const connectOptions: ContainerOptions = {
|
||||
reconnect: false,
|
||||
host: credentials.hostname as string,
|
||||
hostname: credentials.hostname as string,
|
||||
port: credentials.port as number,
|
||||
username: credentials.username ? (credentials.username as string) : undefined,
|
||||
password: credentials.password ? (credentials.password as string) : undefined,
|
||||
transport: credentials.transportType ? (credentials.transportType as string) : undefined,
|
||||
};
|
||||
|
||||
let conn: Connection | undefined = undefined;
|
||||
try {
|
||||
const container = create_container();
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
container.on('connection_open', function (_contex: EventContext) {
|
||||
resolve();
|
||||
});
|
||||
container.on('disconnected', function (context: EventContext) {
|
||||
reject(context.error ?? new Error('unknown error'));
|
||||
});
|
||||
conn = container.connect(connectOptions);
|
||||
});
|
||||
} catch (error) {
|
||||
return {
|
||||
status: 'Error',
|
||||
message: (error as Error).message,
|
||||
};
|
||||
} finally {
|
||||
if (conn) (conn as Connection).close();
|
||||
}
|
||||
|
||||
return {
|
||||
status: 'OK',
|
||||
message: 'Connection successful!',
|
||||
};
|
||||
return await checkIfCredentialsValid(credentials);
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
|
||||
const container = create_container();
|
||||
let connection: Connection | undefined = undefined;
|
||||
let sender: Sender | undefined = undefined;
|
||||
|
||||
try {
|
||||
const credentials = await this.getCredentials('amqp');
|
||||
|
||||
// check if credentials are valid to avoid unnecessary reconnects
|
||||
const credentialsTestResult = await checkIfCredentialsValid(credentials);
|
||||
if (credentialsTestResult.status === 'Error') {
|
||||
throw new NodeOperationError(this.getNode(), credentialsTestResult.message, {
|
||||
description: 'Check your credentials and try again',
|
||||
});
|
||||
}
|
||||
|
||||
const sink = this.getNodeParameter('sink', 0, '') as string;
|
||||
const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as
|
||||
| string
|
||||
|
@ -169,30 +187,50 @@ export class Amqp implements INodeType {
|
|||
throw new NodeOperationError(this.getNode(), 'Queue or Topic required!');
|
||||
}
|
||||
|
||||
const container = create_container();
|
||||
|
||||
/*
|
||||
Values are documentet here: https://github.com/amqp/rhea#container
|
||||
Values are documented here: https://github.com/amqp/rhea#container
|
||||
*/
|
||||
const connectOptions: ContainerOptions = {
|
||||
host: credentials.hostname,
|
||||
hostname: credentials.hostname,
|
||||
port: credentials.port,
|
||||
reconnect: containerReconnect,
|
||||
reconnect_limit: containerReconnectLimit,
|
||||
username: credentials.username ? credentials.username : undefined,
|
||||
password: credentials.password ? credentials.password : undefined,
|
||||
transport: credentials.transportType ? credentials.transportType : undefined,
|
||||
container_id: containerId ? containerId : undefined,
|
||||
id: containerId ? containerId : undefined,
|
||||
reconnect: containerReconnect,
|
||||
reconnect_limit: containerReconnectLimit,
|
||||
};
|
||||
const conn = container.connect(connectOptions);
|
||||
|
||||
const sender = conn.open_sender(sink);
|
||||
const node = this.getNode();
|
||||
|
||||
const responseData: INodeExecutionData[] = await new Promise((resolve, reject) => {
|
||||
connection = container.connect(connectOptions);
|
||||
sender = connection.open_sender(sink);
|
||||
let limit = containerReconnectLimit;
|
||||
|
||||
container.on('disconnected', function (context: EventContext) {
|
||||
//handling this manually as container, despite reconnect_limit, does reconnect on disconnect
|
||||
if (limit <= 0) {
|
||||
connection!.options.reconnect = false;
|
||||
const error = new NodeOperationError(
|
||||
node,
|
||||
((context.error as Error) ?? {}).message ?? 'Disconnected',
|
||||
{
|
||||
description: `Check your credentials${options.reconnect ? '' : ', and consider enabling reconnect in the options'}`,
|
||||
itemIndex: 0,
|
||||
},
|
||||
);
|
||||
|
||||
reject(error);
|
||||
}
|
||||
|
||||
limit--;
|
||||
});
|
||||
|
||||
const responseData: IDataObject[] = await new Promise((resolve) => {
|
||||
container.once('sendable', (context: EventContext) => {
|
||||
const returnData = [];
|
||||
const returnData: INodeExecutionData[] = [];
|
||||
|
||||
const items = this.getInputData();
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
|
@ -214,23 +252,23 @@ export class Amqp implements INodeType {
|
|||
body,
|
||||
});
|
||||
|
||||
returnData.push({ id: result?.id });
|
||||
returnData.push({ json: { id: result?.id }, pairedItems: { item: i } });
|
||||
}
|
||||
|
||||
resolve(returnData);
|
||||
});
|
||||
});
|
||||
|
||||
sender.close();
|
||||
conn.close();
|
||||
|
||||
return [this.helpers.returnJsonArray(responseData)];
|
||||
return [responseData];
|
||||
} catch (error) {
|
||||
if (this.continueOnFail(error)) {
|
||||
return [this.helpers.returnJsonArray({ error: error.message })];
|
||||
return [[{ json: { error: error.message }, pairedItems: { item: 0 } }]];
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
} finally {
|
||||
if (sender) (sender as Sender).close();
|
||||
if (connection) (connection as Connection).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue