diff --git a/packages/nodes-base/credentials/Mqtt.credentials.ts b/packages/nodes-base/credentials/Mqtt.credentials.ts index 05d4b0ec5..a474e0501 100644 --- a/packages/nodes-base/credentials/Mqtt.credentials.ts +++ b/packages/nodes-base/credentials/Mqtt.credentials.ts @@ -3,15 +3,11 @@ import { NodePropertyTypes, } from 'n8n-workflow'; - export class Mqtt implements ICredentialType { name = 'mqtt'; displayName = 'MQTT'; documentationUrl = 'mqtt'; properties = [ - // The credentials to get from user and save encrypted. - // Properties can be defined exactly in the same way - // as node properties. { displayName: 'Protocol', name: 'protocol', @@ -55,5 +51,19 @@ export class Mqtt implements ICredentialType { }, default: '', }, + { + displayName: 'Clean Session', + name: 'clean', + type: 'boolean' as NodePropertyTypes, + default: true, + description: `Set to false to receive QoS 1 and 2 messages while offline.`, + }, + { + displayName: 'Client ID', + name: 'clientId', + type: 'string' as NodePropertyTypes, + default: '', + description: 'Client ID. If left empty, one is autogenrated for you', + }, ]; } diff --git a/packages/nodes-base/nodes/MQTT/Mqtt.node.json b/packages/nodes-base/nodes/MQTT/Mqtt.node.json new file mode 100644 index 000000000..595ac2b74 --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/Mqtt.node.json @@ -0,0 +1,21 @@ +{ + "node": "n8n-nodes-base.mqtt", + "nodeVersion": "1.0", + "codexVersion": "1.0", + "categories": [ + "Communication", + "Development" + ], + "resources": { + "credentialDocumentation": [ + { + "url": "https://docs.n8n.io/credentials/mqtt" + } + ], + "primaryDocumentation": [ + { + "url": "https://docs.n8n.io/nodes/n8n-nodes-base.mqtt/" + } + ] + } +} \ No newline at end of file diff --git a/packages/nodes-base/nodes/MQTT/Mqtt.node.ts b/packages/nodes-base/nodes/MQTT/Mqtt.node.ts new file mode 100644 index 000000000..fc71fe622 --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/Mqtt.node.ts @@ -0,0 +1,171 @@ +import { + IExecuteFunctions, +} from 'n8n-core'; + +import { + IDataObject, + INodeExecutionData, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +import * as mqtt from 'mqtt'; + +import { + IClientOptions, +} from 'mqtt'; + +export class Mqtt implements INodeType { + description: INodeTypeDescription = { + displayName: 'MQTT', + name: 'mqtt', + icon: 'file:mqtt.svg', + group: ['input'], + version: 1, + description: 'Push messages to MQTT', + defaults: { + name: 'MQTT', + color: '#9b27af', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'mqtt', + required: true, + }, + ], + properties: [ + { + displayName: 'Topic', + name: 'topic', + type: 'string', + required: true, + default: '', + description: `The topic to publish to`, + }, + { + displayName: 'Send Input Data', + name: 'sendInputData', + type: 'boolean', + default: true, + description: 'Send the the data the node receives as JSON.', + }, + { + displayName: 'Message', + name: 'message', + type: 'string', + required: true, + displayOptions: { + show: { + sendInputData: [ + false, + ], + }, + }, + default: '', + description: 'The message to publish', + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'QoS', + name: 'qos', + type: 'options', + options: [ + { + name: 'Received at Most Once', + value: 0, + }, + { + name: 'Received at Least Once', + value: 1, + }, + { + name: 'Exactly Once', + value: 2, + }, + ], + default: 0, + description: 'QoS subscription level', + }, + { + displayName: 'Retain', + name: 'retain', + type: 'boolean', + default: false, + description: `Normally if a publisher publishes a message to a topic, and no one is subscribed to
+ that topic the message is simply discarded by the broker. However the publisher can tell the broker
+ to keep the last message on that topic by setting the retain flag to true.`, + }, + ], + }, + ], + }; + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + const length = (items.length as unknown) as number; + const credentials = this.getCredentials('mqtt') as IDataObject; + + const protocol = credentials.protocol as string || 'mqtt'; + const host = credentials.host as string; + const brokerUrl = `${protocol}://${host}`; + const port = credentials.port as number || 1883; + const clientId = credentials.clientId as string || `mqttjs_${Math.random().toString(16).substr(2, 8)}`; + const clean = credentials.clean as boolean; + + const clientOptions: IClientOptions = { + port, + clean, + clientId, + }; + + if (credentials.username && credentials.password) { + clientOptions.username = credentials.username as string; + clientOptions.password = credentials.password as string; + } + + const client = mqtt.connect(brokerUrl, clientOptions); + const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + + // tslint:disable-next-line: no-any + const data = await new Promise((resolve, reject): any => { + client.on('connect', () => { + for (let i = 0; i < length; i++) { + + let message; + const topic = (this.getNodeParameter('topic', i) as string); + const options = (this.getNodeParameter('options', i) as IDataObject); + + try { + if (sendInputData === true) { + message = JSON.stringify(items[i].json); + } else { + message = this.getNodeParameter('message', i) as string; + } + client.publish(topic, message, options); + } catch (e) { + reject(e); + } + } + //wait for the in-flight messages to be acked. + //needed for messages with QoS 1 & 2 + client.end(false, {}, () => { + resolve([items]); + }); + + client.on('error', (e: string | undefined) => { + reject(e); + }); + }); + }); + + return data as INodeExecutionData[][]; + } +} diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts index c99078e91..94bbda8c1 100644 --- a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -13,14 +13,14 @@ import { import * as mqtt from 'mqtt'; import { - IClientOptions, + IClientOptions, ISubscriptionMap, } from 'mqtt'; export class MqttTrigger implements INodeType { description: INodeTypeDescription = { displayName: 'MQTT Trigger', name: 'mqttTrigger', - icon: 'file:mqtt.png', + icon: 'file:mqtt.svg', group: ['trigger'], version: 1, description: 'Listens to MQTT events', @@ -43,7 +43,9 @@ export class MqttTrigger implements INodeType { type: 'string', default: '', description: `Topics to subscribe to, multiple can be defined with comma.
- wildcard characters are supported (+ - for single level and # - for multi level)`, + wildcard characters are supported (+ - for single level and # - for multi level)
+ By default all subscription used QoS=0. To set a different QoS, write the QoS desired
+ after the topic preceded by a colom. For Example: topicA:1,topicB:2`, }, { displayName: 'Options', @@ -52,6 +54,13 @@ export class MqttTrigger implements INodeType { placeholder: 'Add Option', default: {}, options: [ + { + displayName: 'JSON Parse Body', + name: 'jsonParseBody', + type: 'boolean', + default: false, + description: 'Try to parse the message to an object.', + }, { displayName: 'Only Message', name: 'onlyMessage', @@ -59,13 +68,6 @@ export class MqttTrigger implements INodeType { default: false, description: 'Returns only the message property.', }, - { - displayName: 'JSON Parse Message', - name: 'jsonParseMessage', - type: 'boolean', - default: false, - description: 'Try to parse the message to an object.', - }, ], }, ], @@ -81,6 +83,13 @@ export class MqttTrigger implements INodeType { const topics = (this.getNodeParameter('topics') as string).split(','); + const topicsQoS: IDataObject = {}; + + for (const data of topics) { + const [topic, qos] = data.split(':'); + topicsQoS[topic] = (qos) ? { qos: parseInt(qos, 10) } : { qos: 0 }; + } + const options = this.getNodeParameter('options') as IDataObject; if (!topics) { @@ -91,9 +100,13 @@ export class MqttTrigger implements INodeType { const host = credentials.host as string; const brokerUrl = `${protocol}://${host}`; const port = credentials.port as number || 1883; + const clientId = credentials.clientId as string || `mqttjs_${Math.random().toString(16).substr(2, 8)}`; + const clean = credentials.clean as boolean; const clientOptions: IClientOptions = { port, + clean, + clientId, }; if (credentials.username && credentials.password) { @@ -108,20 +121,19 @@ export class MqttTrigger implements INodeType { async function manualTriggerFunction() { await new Promise((resolve, reject) => { client.on('connect', () => { - client.subscribe(topics, (err, granted) => { + client.subscribe(topicsQoS as ISubscriptionMap, (err, granted) => { if (err) { reject(err); } client.on('message', (topic: string, message: Buffer | string) => { // tslint:disable-line:no-any - let result: IDataObject = {}; message = message.toString() as string; - if (options.jsonParseMessage) { + if (options.jsonParseBody) { try { message = JSON.parse(message.toString()); - } catch (error) { } + } catch (err) { } } result.message = message; @@ -129,10 +141,9 @@ export class MqttTrigger implements INodeType { if (options.onlyMessage) { //@ts-ignore - result = message; + result = [message as string]; } - - self.emit([self.helpers.returnJsonArray([result])]); + self.emit([self.helpers.returnJsonArray(result)]); resolve(true); }); }); @@ -144,7 +155,9 @@ export class MqttTrigger implements INodeType { }); } - manualTriggerFunction(); + if (this.getMode() === 'trigger') { + manualTriggerFunction(); + } async function closeFunction() { client.end(); diff --git a/packages/nodes-base/nodes/MQTT/mqtt.png b/packages/nodes-base/nodes/MQTT/mqtt.png deleted file mode 100644 index 12c5f2495..000000000 Binary files a/packages/nodes-base/nodes/MQTT/mqtt.png and /dev/null differ diff --git a/packages/nodes-base/nodes/MQTT/mqtt.svg b/packages/nodes-base/nodes/MQTT/mqtt.svg new file mode 100644 index 000000000..3e202aa2c --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/mqtt.svg @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 963d6c591..516fa72e4 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -439,6 +439,7 @@ "dist/nodes/Mocean/Mocean.node.js", "dist/nodes/MondayCom/MondayCom.node.js", "dist/nodes/MongoDb/MongoDb.node.js", + "dist/nodes/MQTT/Mqtt.node.js", "dist/nodes/MQTT/MqttTrigger.node.js", "dist/nodes/MoveBinaryData.node.js", "dist/nodes/Msg91/Msg91.node.js",