From da5fddad5a9ec6284308fd597965de8100eb2ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther?= Date: Wed, 11 Nov 2020 08:44:59 +0100 Subject: [PATCH] :zap: Amqp - Message Throttling - Close Connection on Send (#1127) * Azuer Service Bus * Message throttling * remove the Events the WF is desabled * close connections after send * sendable once --- packages/nodes-base/nodes/Amqp/Amqp.node.ts | 12 +++-- .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 47 ++++++++++++++++--- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 6cbdcf4b4..50f6f20a9 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -66,7 +66,7 @@ export class Amqp implements INodeType { }, ], }, - ], + ] }; async executeSingle(this: IExecuteSingleFunctions): Promise { @@ -110,7 +110,7 @@ export class Amqp implements INodeType { } const allSent = new Promise(( resolve ) => { - container.on('sendable', (context: any) => { // tslint:disable-line:no-any + container.once('sendable', (context: any) => { // tslint:disable-line:no-any let body: IDataObject | string = item.json; const sendOnlyProperty = options.sendOnlyProperty as string; @@ -125,7 +125,7 @@ export class Amqp implements INodeType { const message = { application_properties: headerProperties, - body, + body }; const sendResult = context.sender.send(message); @@ -134,10 +134,14 @@ export class Amqp implements INodeType { }); }); - container.connect(connectOptions).open_sender(sink); + const conn = container.connect(connectOptions); + const sender = conn.open_sender(sink); const sendResult: Delivery = await allSent as Delivery; // sendResult has a a property that causes circular reference if returned + sender.close(); + conn.close(); + return { json: { id: sendResult.id } } as INodeExecutionData; } } diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 1b1e679f1..d63d785c2 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -55,6 +55,20 @@ export class AmqpTrigger implements INodeType { description: 'Leave empty for non-durable topic subscriptions or queues', }, { + displayName: 'Pull N Messages per Cicle', + name: 'pullMessagesNumber', + type: 'number', + default: 100, + description: 'Number of messages to pull from the bus for every cicle', + }, + { + displayName: 'Sleep time after cicle', + name: 'sleepTime', + type: 'number', + default: 10, + description: 'Milliseconds to sleep after every cicle', + }, + { displayName: 'Options', name: 'options', type: 'collection', @@ -84,7 +98,7 @@ export class AmqpTrigger implements INodeType { }, ], }, - ], + ] }; @@ -99,6 +113,8 @@ export class AmqpTrigger implements INodeType { const clientname = this.getNodeParameter('clientname', '') as string; const subscription = this.getNodeParameter('subscription', '') as string; const options = this.getNodeParameter('options', {}) as IDataObject; + const pullMessagesNumber = this.getNodeParameter('pullMessagesNumber', {}) as number; + const sleepTime = this.getNodeParameter('sleepTime', {}) as number; if (sink === '') { throw new Error('Queue or Topic required!'); @@ -117,7 +133,7 @@ export class AmqpTrigger implements INodeType { port: credentials.port, reconnect: true, // this id the default anyway reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm - container_id: (durable ? clientname : null), + container_id: (durable ? clientname : null) }; if (credentials.username || credentials.password) { // Old rhea implementation. not shure if it is neccessary @@ -131,9 +147,15 @@ export class AmqpTrigger implements INodeType { } + let lastMsgId: number | undefined = undefined; const self = this; + container.on('receiver_open', function (context: any) { + console.log("Connection opened"); + context.receiver.add_credit(pullMessagesNumber); + }); + container.on('message', (context: any) => { // tslint:disable-line:no-any // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code if (context.message.message_id && context.message.message_id === lastMsgId) { @@ -142,6 +164,12 @@ export class AmqpTrigger implements INodeType { lastMsgId = context.message.message_id; let data = context.message; + + if(options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { + // The buffer is not ready... Stringify and parse back to load it. + let cont = JSON.stringify(data.body.content); + data.body = String.fromCharCode.apply(null,JSON.parse(cont).data); + } if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { // The buffer is not ready... Stringify and parse back to load it. @@ -158,6 +186,9 @@ export class AmqpTrigger implements INodeType { self.emit([self.helpers.returnJsonArray([data])]); + + if(context.receiver.credit ==0) + setTimeout(function(){ context.receiver.add_credit(pullMessagesNumber); }, sleepTime || 0); }); const connection = container.connect(connectOptions); @@ -168,16 +199,16 @@ export class AmqpTrigger implements INodeType { source: { address: sink, durable: 2, - expiry_policy: 'never', + expiry_policy: 'never' }, - credit_window: 1, // prefetch 1 + credit_window: 0 // prefetch 1 }; } else { clientOptions = { source: { address: sink, }, - credit_window: 1, // prefetch 1 + credit_window: 0 // prefetch 1 }; } connection.open_receiver(clientOptions); @@ -186,9 +217,11 @@ export class AmqpTrigger implements INodeType { // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. async function closeFunction() { + container.removeAllListeners("receiver_open"); + container.removeAllListeners("message"); connection.close(); } - + // The "manualTriggerFunction" function gets called by n8n // when a user is in the workflow editor and starts the // workflow manually. @@ -198,7 +231,7 @@ export class AmqpTrigger implements INodeType { await new Promise((resolve, reject) => { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); - }, 30000); + }, 3000); container.on('message', (context: any) => { // tslint:disable-line:no-any // Check if the only property present in the message is body // in which case we only emit the content of the body property