diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index ab7f2e21b..14637d44a 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,5 +1,9 @@ -import { ContainerOptions, Dictionary, EventContext } from 'rhea'; -import rhea = require('rhea'); +import { + create_container, + ContainerOptions, + Dictionary, + EventContext, +} from 'rhea'; import { IExecuteFunctions } from 'n8n-core'; import { @@ -51,6 +55,13 @@ export class Amqp implements INodeType { placeholder: 'Add Option', default: {}, options: [ + { + displayName: 'Container ID', + name: 'containerId', + type: 'string', + default: '', + description: 'Will be used to pass to the RHEA Backend as container_id', + }, { displayName: 'Data as Object', name: 'dataAsObject', @@ -58,6 +69,20 @@ export class Amqp implements INodeType { default: false, description: 'Send the data as an object.', }, + { + displayName: 'Reconnect', + name: 'reconnect', + type: 'boolean', + default: true, + description: 'Automatically reconnect if disconnected', + }, + { + displayName: 'Reconnect Limit', + name: 'reconnectLimit', + type: 'number', + default: 50, + description: 'Maximum number of reconnect attempts', + }, { displayName: 'Send property', name: 'sendOnlyProperty', @@ -65,33 +90,12 @@ export class Amqp implements INodeType { default: '', description: 'The only property to send. If empty the whole item will be sent.', }, - { - displayName: 'Container ID', - name: 'containerID', - type: 'string', - default: '', - description: 'Will be used to pass to the RHEA Backend as container_id', - }, - { - displayName: 'Reconnect', - name: 'reconnect', - type: 'boolean', - default: true, - description: 'If on, the library will automatically attempt to reconnect if disconnected', - }, - { - displayName: 'Reconnect limit', - name: 'reconnectLimit', - type: 'number', - default: 50, - description: 'maximum number of reconnect attempts', - }, ], }, ], }; - async execute(this: IExecuteFunctions): Promise < INodeExecutionData[][] > { + async execute(this: IExecuteFunctions): Promise { const credentials = this.getCredentials('amqp'); if (!credentials) { throw new Error('Credentials are mandatory!'); @@ -100,22 +104,22 @@ export class Amqp implements INodeType { const sink = this.getNodeParameter('sink', 0, '') as string; const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as string | object; const options = this.getNodeParameter('options', 0, {}) as IDataObject; - const container_id = options.containerID as string; - const containerReconnect = options.reconnect as boolean || true ; + const containerId = options.containerId as string; + const containerReconnect = options.reconnect as boolean || true; const containerReconnectLimit = options.reconnectLimit as number || 50; - let headerProperties : Dictionary; + let headerProperties: Dictionary; // tslint:disable-line:no-any if (typeof applicationProperties === 'string' && applicationProperties !== '') { headerProperties = JSON.parse(applicationProperties); } else { headerProperties = applicationProperties as object; - } + } if (sink === '') { throw new Error('Queue or Topic required!'); } - const container = rhea.create_container(); + const container = create_container(); /* Values are documentet here: https://github.com/amqp/rhea#container @@ -124,20 +128,20 @@ export class Amqp implements INodeType { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, - reconnect: containerReconnect, + reconnect: containerReconnect, reconnect_limit: containerReconnectLimit, username: credentials.username ? credentials.username : undefined, password: credentials.password ? credentials.password : undefined, transport: credentials.transportType ? credentials.transportType : undefined, - container_id: container_id ? container_id : undefined, - id: container_id ? container_id : undefined, + container_id: containerId ? containerId : undefined, + id: containerId ? containerId : undefined, }; const conn = container.connect(connectOptions); const sender = conn.open_sender(sink); const responseData: IDataObject[] = await new Promise((resolve) => { - container.once('sendable', (context: EventContext) => { + container.once('sendable', (context: EventContext) => { const returnData = []; const items = this.getInputData(); diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 3b3bf8a55..cd9510a9a 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -1,5 +1,10 @@ -import { ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea'; -import rhea = require("rhea"); +import { + create_container, + ContainerOptions, + EventContext, + Message, + ReceiverOptions, +} from 'rhea'; import { ITriggerFunctions } from 'n8n-core'; import { @@ -56,26 +61,19 @@ export class AmqpTrigger implements INodeType { description: 'Leave empty for non-durable topic subscriptions or queues', }, { - displayName: 'Pull N Messages per Cicle', - name: 'pullMessagesNumber', - type: 'number', - default: 100, - description: 'Number of messages to pull from the bus for every cicle', - }, - { - displayName: 'Sleep time after cicle', - name: 'sleepTime', - type: 'number', - default: 10, - description: 'Milliseconds to sleep after every cicle', - }, - { displayName: 'Options', name: 'options', type: 'collection', placeholder: 'Add Option', default: {}, options: [ + { + displayName: 'Container ID', + name: 'containerId', + type: 'string', + default: '', + description: 'Will be used to pass to the RHEA Backend as container_id', + }, { displayName: 'Convert Body To String', name: 'jsonConvertByteArrayToString', @@ -90,6 +88,13 @@ export class AmqpTrigger implements INodeType { default: false, description: 'Parse the body to an object.', }, + { + displayName: 'Messages per Cicle', + name: 'pullMessagesNumber', + type: 'number', + default: 100, + description: 'Number of messages to pull from the bus for every cicle', + }, { displayName: 'Only Body', name: 'onlyBody', @@ -98,11 +103,18 @@ export class AmqpTrigger implements INodeType { description: 'Returns only the body property.', }, { - displayName: 'Messages per Cicle', - name: 'pullMessagesNumber', + displayName: 'Reconnect', + name: 'reconnect', + type: 'boolean', + default: true, + description: 'Automatically reconnect if disconnected', + }, + { + displayName: 'Reconnect Limit', + name: 'reconnectLimit', type: 'number', - default: 100, - description: 'Number of messages to pull from the bus for every cicle', + default: 50, + description: 'Maximum number of reconnect attempts', }, { displayName: 'Sleep Time', @@ -111,30 +123,9 @@ export class AmqpTrigger implements INodeType { default: 10, description: 'Milliseconds to sleep after every cicle.', }, - { - displayName: 'Container ID', - name: 'containerID', - type: 'string', - default: '', - description: 'Will be used to pass to the RHEA Backend as container_id', - }, - { - displayName: 'Reconnect', - name: 'reconnect', - type: 'boolean', - default: true, - description: 'If on, the library will automatically attempt to reconnect if disconnected', - }, - { - displayName: 'Reconnect limit', - name: 'reconnectLimit', - type: 'number', - default: 50, - description: 'maximum number of reconnect attempts', - }, ], }, - ] + ], }; @@ -150,8 +141,8 @@ export class AmqpTrigger implements INodeType { const subscription = this.getNodeParameter('subscription', '') as string; const options = this.getNodeParameter('options', {}) as IDataObject; const pullMessagesNumber = options.pullMessagesNumber as number || 100; - const container_id = options.containerID as string; - const containerReconnect = options.reconnect as boolean || true ; + const containerId = options.containerId as string; + const containerReconnect = options.reconnect as boolean || true; const containerReconnectLimit = options.reconnectLimit as number || 50; if (sink === '') { @@ -164,20 +155,21 @@ export class AmqpTrigger implements INodeType { durable = true; } - const container = rhea.create_container(); + const container = create_container(); - let lastMsgId: string| number | Buffer | undefined = undefined; + let lastMsgId: string | number | Buffer | undefined = undefined; const self = this; - container.on('receiver_open', (context: EventContext) => { + container.on('receiver_open', (context: EventContext) => { context.receiver?.add_credit(pullMessagesNumber); }); - container.on('message', (context: EventContext) => { + container.on('message', (context: EventContext) => { // No message in the context - if(!context.message) - return; + if (!context.message) { + return; + } // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code if (context.message.message_id && context.message.message_id === lastMsgId) { @@ -186,11 +178,11 @@ export class AmqpTrigger implements INodeType { lastMsgId = context.message.message_id; let data = context.message; - - if(options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { + + if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { // The buffer is not ready... Stringify and parse back to load it. - let cont = JSON.stringify(data.body.content); - data.body = String.fromCharCode.apply(null,JSON.parse(cont).data); + const cont = JSON.stringify(data.body.content); + data.body = String.fromCharCode.apply(null, JSON.parse(cont).data); } if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { @@ -213,7 +205,7 @@ export class AmqpTrigger implements INodeType { } - self.emit([self.helpers.returnJsonArray([data as any])]); + self.emit([self.helpers.returnJsonArray([data as any])]); // tslint:disable-line:no-any if (!context.receiver?.has_credit()) { setTimeout(() => { @@ -229,17 +221,17 @@ export class AmqpTrigger implements INodeType { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, - reconnect: containerReconnect, + reconnect: containerReconnect, reconnect_limit: containerReconnectLimit, username: credentials.username ? credentials.username : undefined, password: credentials.password ? credentials.password : undefined, transport: credentials.transportType ? credentials.transportType : undefined, - container_id: container_id ? container_id : undefined, - id: container_id ? container_id : undefined, + container_id: containerId ? containerId : undefined, + id: containerId ? containerId : undefined, }; const connection = container.connect(connectOptions); - let clientOptions : ReceiverOptions = { + const clientOptions: ReceiverOptions = { name: subscription ? subscription : undefined, source: { address: sink, @@ -258,7 +250,7 @@ export class AmqpTrigger implements INodeType { container.removeAllListeners('message'); connection.close(); } - + // The "manualTriggerFunction" function gets called by n8n // when a user is in the workflow editor and starts the // workflow manually. @@ -268,8 +260,8 @@ export class AmqpTrigger implements INodeType { await new Promise((resolve, reject) => { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); - }, 3000); - container.on('message', (context: EventContext) => { + }, 30000); + container.on('message', (context: EventContext) => { // Check if the only property present in the message is body // in which case we only emit the content of the body property // otherwise we emit all properties and their content @@ -277,7 +269,7 @@ export class AmqpTrigger implements INodeType { if (Object.keys(message)[0] === 'body' && Object.keys(message).length === 1) { self.emit([self.helpers.returnJsonArray([message.body])]); } else { - self.emit([self.helpers.returnJsonArray([message as any])]); + self.emit([self.helpers.returnJsonArray([message as any])]); // tslint:disable-line:no-any } clearTimeout(timeoutHandler); resolve(true);