From 23dfb6837df4d01d244f816b3c01bbf95fdcb3d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 18:06:33 +0200 Subject: [PATCH] Implement Kafka dispatcher and handler --- .../commands/{mqtt-to-db.py => dispatcher.py} | 50 ++++++++----- bite/telemetry/management/commands/handler.py | 73 +++++++++++++++++++ 2 files changed, 105 insertions(+), 18 deletions(-) rename bite/telemetry/management/commands/{mqtt-to-db.py => dispatcher.py} (61%) create mode 100644 bite/telemetry/management/commands/handler.py diff --git a/bite/telemetry/management/commands/mqtt-to-db.py b/bite/telemetry/management/commands/dispatcher.py similarity index 61% rename from bite/telemetry/management/commands/mqtt-to-db.py rename to bite/telemetry/management/commands/dispatcher.py index a161519..15c0d29 100644 --- a/bite/telemetry/management/commands/mqtt-to-db.py +++ b/bite/telemetry/management/commands/dispatcher.py @@ -22,23 +22,25 @@ import asyncio import json import time import paho.mqtt.client as mqtt +from kafka import KafkaProducer +from kafka.errors import NoBrokersAvailable from asgiref.sync import sync_to_async -from asyncio_mqtt import Client +from aiomqtt import Client from django.conf import settings from django.core.management.base import BaseCommand from django.core.exceptions import ObjectDoesNotExist from api.models import Device -from telemetry.models import Telemetry - -MQTT_HOST = settings.MQTT_BROKER['HOST'] -MQTT_PORT = int(settings.MQTT_BROKER['PORT']) class Command(BaseCommand): help = 'MQTT to DB deamon' + MQTT_HOST = settings.MQTT_BROKER['HOST'] + MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + producer = None + @sync_to_async def get_device(self, serial): try: @@ -47,24 +49,23 @@ class Command(BaseCommand): return None @sync_to_async - def store_telemetry(self, device, payload): - Telemetry.objects.create( - device=device, - transport='mqtt', - clock=payload['clock'], - payload=payload['payload'] + def dispatch(self, message): + self.producer.send( + 'telemetry', {"transport": 'mqtt', + "body": message} ) async def mqtt_broker(self): - async with Client(MQTT_HOST, port=MQTT_PORT) as client: + async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client: # use shared subscription for HA/balancing await client.subscribe("$share/telemetry/#") - async with client.unfiltered_messages() as messages: + async with client.messages() as messages: async for message in messages: - payload = json.loads(message.payload.decode('utf-8')) device = await self.get_device(message.topic) if device is not None: - await self.store_telemetry(device, payload) + message_body = json.loads( + message.payload.decode('utf-8')) + await self.dispatch(message_body) else: self.stdout.write( self.style.ERROR( @@ -74,13 +75,26 @@ class Command(BaseCommand): client = mqtt.Client() while True: try: - client.connect(MQTT_HOST, MQTT_PORT) + client.connect(self.MQTT_HOST, self.MQTT_PORT) break except (socket.gaierror, ConnectionRefusedError): self.stdout.write( - self.style.WARNING('WARNING: Broker not available')) + self.style.WARNING('WARNING: MQTT broker not available')) time.sleep(5) - self.stdout.write(self.style.SUCCESS('INFO: Broker subscribed')) + while True: + try: + self.producer = KafkaProducer( + bootstrap_servers='localhost:9092', + value_serializer=lambda v: json.dumps(v).encode('utf-8'), + retries=5 + ) + break + except NoBrokersAvailable: + self.stdout.write( + self.style.WARNING('WARNING: Kafka broker not available')) + time.sleep(5) + + self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed')) client.disconnect() asyncio.run(self.mqtt_broker()) diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py new file mode 100644 index 0000000..d15ac0f --- /dev/null +++ b/bite/telemetry/management/commands/handler.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# BITE - A Basic/IoT/Example +# Copyright (C) 2020-2021 Daniele ViganĂ² +# +# BITE is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# BITE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import json +import time +from kafka import KafkaConsumer +from kafka.errors import NoBrokersAvailable + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.core.exceptions import ObjectDoesNotExist + +from api.models import Device +from telemetry.models import Telemetry + +MQTT_HOST = settings.MQTT_BROKER['HOST'] +MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + + +class Command(BaseCommand): + help = 'MQTT to DB deamon' + + def get_device(self, serial): + try: + return Device.objects.get(serial=serial) + except ObjectDoesNotExist: + return None + + def store_telemetry(self, transport, message): + Telemetry.objects.create( + transport=transport, + device=self.get_device(message["device"]), + clock=message["clock"], + payload=message["payload"] + ) + + def handle(self, *args, **options): + while True: + try: + consumer = KafkaConsumer( + "telemetry", + value_deserializer=lambda m: json.loads(m.decode('utf8')), + bootstrap_servers='localhost:9092' + ) + break + except NoBrokersAvailable: + self.stdout.write( + self.style.WARNING('WARNING: Kafka broker not available')) + time.sleep(5) + + self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed')) + for message in consumer: + self.store_telemetry( + message.value["transport"], + message.value["body"] + ) + consumer.unsuscribe()