From ab6cc43a4c1681c062abfe8f8e8105a669c25317 Mon Sep 17 00:00:00 2001 From: Jan Oberhauser Date: Sat, 2 Nov 2019 12:16:20 +0100 Subject: [PATCH] :shirt: Fix lint issues --- .../credentials/Amqp.credentials.ts | 3 -- packages/nodes-base/nodes/Amqp/Amqp.node.ts | 50 +++++++------------ .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 41 +++++++-------- 3 files changed, 36 insertions(+), 58 deletions(-) diff --git a/packages/nodes-base/credentials/Amqp.credentials.ts b/packages/nodes-base/credentials/Amqp.credentials.ts index 640dedb18..ff6b23ed1 100644 --- a/packages/nodes-base/credentials/Amqp.credentials.ts +++ b/packages/nodes-base/credentials/Amqp.credentials.ts @@ -8,9 +8,6 @@ export class Amqp implements ICredentialType { name = 'amqp'; displayName = 'AMQP'; properties = [ - // The credentials to get from user and save encrypted. - // Properties can be defined exactly in the same way - // as node properties. { displayName: 'Hostname', name: 'hostname', diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 9dcb3ca5e..66ed1aa25 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,11 +1,11 @@ +import { ContainerOptions, Delivery } from 'rhea'; + import { IExecuteSingleFunctions } from 'n8n-core'; import { - IDataObject, INodeExecutionData, INodeType, INodeTypeDescription, } from 'n8n-workflow'; -import { Delivery } from 'rhea'; export class Amqp implements INodeType { description: INodeTypeDescription = { @@ -26,23 +26,6 @@ export class Amqp implements INodeType { required: true, }], properties: [ - { - displayName: 'Host', - name: 'hostname', - type: 'string', - default: 'localhost', - description: 'hostname of the amqp server', - }, - { - displayName: 'Port', - name: 'port', - type: 'number', - typeOptions: { - minValue: 1, - }, - default: 5672, - description: 'TCP Port to connect to', - }, { displayName: 'Queue / Topic', name: 'sink', @@ -71,46 +54,47 @@ export class Amqp implements INodeType { } const sink = this.getNodeParameter('sink', '') as string; - let applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object; + const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object; let headerProperties = applicationProperties; - if(typeof applicationProperties === 'string' && applicationProperties != '') { - headerProperties = JSON.parse(applicationProperties) + if(typeof applicationProperties === 'string' && applicationProperties !== '') { + headerProperties = JSON.parse(applicationProperties); } - if (sink == '') { + if (sink === '') { throw new Error('Queue or Topic required!'); } - let container = require('rhea'); + const container = require('rhea'); - let connectOptions = { + const connectOptions: ContainerOptions = { host: credentials.hostname, port: credentials.port, reconnect: true, // this id the default anyway reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm - } + }; if (credentials.username || credentials.password) { container.options.username = credentials.username; container.options.password = credentials.password; } - let allSent = new Promise( function( resolve ) { - container.on('sendable', function (context: any) { + const allSent = new Promise(( resolve ) => { + container.on('sendable', (context: any) => { // tslint:disable-line:no-any - let message = { + const message = { application_properties: headerProperties, body: JSON.stringify(item) - } - let sendResult = context.sender.send(message); + }; + + const sendResult = context.sender.send(message); resolve(sendResult); }); }); - + container.connect(connectOptions).open_sender(sink); - let sendResult: Delivery = await allSent as Delivery; // sendResult has a a property that causes circular reference if returned + const sendResult: Delivery = await allSent as Delivery; // sendResult has a a property that causes circular reference if returned return { json: { id: sendResult.id } } as INodeExecutionData; } diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index c65f9996e..91a4eff55 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -1,9 +1,10 @@ +import { ContainerOptions } from 'rhea'; + import { ITriggerFunctions } from 'n8n-core'; import { INodeType, INodeTypeDescription, ITriggerResponse, - } from 'n8n-workflow'; @@ -67,40 +68,40 @@ export class AmqpTrigger implements INodeType { const clientname = this.getNodeParameter('clientname', '') as string; const subscription = this.getNodeParameter('subscription', '') as string; - if (sink == '') { + if (sink === '') { throw new Error('Queue or Topic required!'); } - let durable: boolean = false; + let durable = false; if(subscription && clientname) { durable = true; } - let container = require('rhea'); - let connectOptions = { + const container = require('rhea'); + const connectOptions: ContainerOptions = { host: credentials.hostname, port: credentials.port, reconnect: true, // this id the default anyway reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm container_id: (durable ? clientname : null) - } + }; if (credentials.username || credentials.password) { container.options.username = credentials.username; container.options.password = credentials.password; } - let lastMsgId: any = undefined; - let self = this; + let lastMsgId: number | undefined = undefined; + const self = this; - container.on('message', function (context: any) { - if (context.message.message_id && context.message.message_id == lastMsgId) { + container.on('message', (context: any) => { // tslint:disable-line:no-any + if (context.message.message_id && context.message.message_id === lastMsgId) { // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code lastMsgId = context.message.message_id; return; } self.emit([self.helpers.returnJsonArray([context.message])]); }); - - let connection = container.connect(connectOptions); + + const connection = container.connect(connectOptions); let clientOptions = undefined; if (durable) { clientOptions = { @@ -111,14 +112,14 @@ export class AmqpTrigger implements INodeType { expiry_policy: 'never' }, credit_window: 1 // prefetch 1 - } + }; } else { clientOptions = { source: { address: sink, }, credit_window: 1 // prefetch 1 - } + }; } connection.open_receiver(clientOptions); @@ -135,15 +136,11 @@ export class AmqpTrigger implements INodeType { // for AMQP it doesn't make much sense to wait here but // for a new user who doesn't know how this works, it's better to wait and show a respective info message async function manualTriggerFunction() { - - await new Promise( function( resolve ) { - let timeoutHandler = setTimeout(function() { - self.emit([self.helpers.returnJsonArray([{ - error: 'Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.' - }])]); - resolve(true); + await new Promise(( resolve, reject ) => { + const timeoutHandler = setTimeout(() => { + reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); }, 30000); - container.on('message', function (context: any) { + container.on('message', (context: any) => { // tslint:disable-line:no-any clearTimeout(timeoutHandler); resolve(true); });