diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index a2c669206..c936735d3 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -6,6 +6,8 @@ import { TopicMessages, } from 'kafkajs'; +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; + import { IExecuteFunctions, } from 'n8n-core'; @@ -73,6 +75,44 @@ export class Kafka implements INodeType { type: 'boolean', default: false, }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Confluent Schema Registry.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + placeholder: 'https://schema-registry-domain:8081', + default: '', + description: 'URL of the schema registry.', + }, + { + displayName: 'Event Name', + name: 'eventName', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'Namespace and Name of Schema in Schema Registry (namespace.name).', + }, { displayName: 'Headers', name: 'headersUi', @@ -170,6 +210,8 @@ export class Kafka implements INodeType { const options = this.getNodeParameter('options', 0) as IDataObject; const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + const timeout = options.timeout as number; let compression = CompressionTypes.None; @@ -211,7 +253,7 @@ export class Kafka implements INodeType { await producer.connect(); - let message: string; + let message: string | Buffer; for (let i = 0; i < length; i++) { if (sendInputData === true) { @@ -220,6 +262,20 @@ export class Kafka implements INodeType { message = this.getNodeParameter('message', i) as string; } + if (useSchemaRegistry) { + try { + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + const eventName = this.getNodeParameter('eventName', 0) as string; + + const registry = new SchemaRegistry({ host: schemaRegistryUrl }); + const id = await registry.getLatestSchemaId(eventName); + + message = await registry.encode(id, JSON.parse(message)); + } catch (exception) { + throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration'); + } + } + const topic = this.getNodeParameter('topic', i) as string; const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean; diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index be7e544e0..935cb4fe4 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -5,6 +5,8 @@ import { SASLOptions, } from 'kafkajs'; +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; + import { ITriggerFunctions, } from 'n8n-core'; @@ -55,6 +57,29 @@ export class KafkaTrigger implements INodeType { placeholder: 'n8n-kafka', description: 'ID of the consumer group.', }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Confluent Schema Registry.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + placeholder: 'https://schema-registry-domain:8081', + default: '', + description: 'URL of the schema registry.', + }, { displayName: 'Options', name: 'options', @@ -104,6 +129,13 @@ export class KafkaTrigger implements INodeType { default: 30000, description: 'The time to await a response in ms.', }, + { + displayName: 'Return headers', + name: 'returnHeaders', + type: 'boolean', + default: false, + description: 'Return the headers received from Kafka', + }, ], }, ], @@ -153,6 +185,10 @@ export class KafkaTrigger implements INodeType { const self = this; + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => { @@ -166,6 +202,23 @@ export class KafkaTrigger implements INodeType { } catch (error) { } } + if (useSchemaRegistry) { + try { + const registry = new SchemaRegistry({ host: schemaRegistryUrl }); + value = await registry.decode(message.value as Buffer); + } catch (error) { } + } + + if (options.returnHeaders) { + const headers: {[key: string]: string} = {}; + for (const key in message.headers) { + const header = message.headers[key]; + headers[key] = header?.toString('utf8') || ''; + } + + data.headers = headers; + } + data.message = value; data.topic = topic; diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 25bb7deec..08a157ef5 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -696,6 +696,7 @@ "typescript": "~4.3.5" }, "dependencies": { + "@kafkajs/confluent-schema-registry": "1.0.6", "@types/lossless-json": "^1.0.0", "@types/promise-ftp": "^1.3.4", "@types/snowflake-sdk": "^1.5.1", diff --git a/packages/nodes-base/tsconfig.json b/packages/nodes-base/tsconfig.json index 1d0670252..2dbcc429d 100644 --- a/packages/nodes-base/tsconfig.json +++ b/packages/nodes-base/tsconfig.json @@ -1,6 +1,7 @@ { "compilerOptions": { "lib": [ + "dom", "es2017", "es2019.array" ],