From e3785d46697639f893395456e35ec470eec9d9e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 18:18:42 +0200 Subject: [PATCH] Add kafka settings --- bite/bite/settings.py | 5 +++++ bite/telemetry/management/commands/dispatcher.py | 6 +++++- bite/telemetry/management/commands/handler.py | 10 ++++++---- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/bite/bite/settings.py b/bite/bite/settings.py index 363dfb4..42807a0 100644 --- a/bite/bite/settings.py +++ b/bite/bite/settings.py @@ -158,6 +158,11 @@ MQTT_BROKER = { 'PORT': '1883', } +KAFKA_BROKER = { + 'HOST': 'kafka', + 'PORT': '9092', +} + # If no local_settings.py is availble in the current folder let's try to # load it from the application root try: diff --git a/bite/telemetry/management/commands/dispatcher.py b/bite/telemetry/management/commands/dispatcher.py index 15c0d29..f304357 100644 --- a/bite/telemetry/management/commands/dispatcher.py +++ b/bite/telemetry/management/commands/dispatcher.py @@ -39,6 +39,8 @@ class Command(BaseCommand): MQTT_HOST = settings.MQTT_BROKER['HOST'] MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + KAFKA_HOST = settings.KAFKA_BROKER['HOST'] + KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) producer = None @sync_to_async @@ -85,7 +87,9 @@ class Command(BaseCommand): while True: try: self.producer = KafkaProducer( - bootstrap_servers='localhost:9092', + bootstrap_servers='{}:{}'.format( + self.KAFKA_HOST, self.KAFKA_PORT + ), value_serializer=lambda v: json.dumps(v).encode('utf-8'), retries=5 ) diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py index d15ac0f..14d2747 100644 --- a/bite/telemetry/management/commands/handler.py +++ b/bite/telemetry/management/commands/handler.py @@ -29,13 +29,13 @@ from django.core.exceptions import ObjectDoesNotExist from api.models import Device from telemetry.models import Telemetry -MQTT_HOST = settings.MQTT_BROKER['HOST'] -MQTT_PORT = int(settings.MQTT_BROKER['PORT']) - class Command(BaseCommand): help = 'MQTT to DB deamon' + KAFKA_HOST = settings.KAFKA_BROKER['HOST'] + KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) + def get_device(self, serial): try: return Device.objects.get(serial=serial) @@ -56,7 +56,9 @@ class Command(BaseCommand): consumer = KafkaConsumer( "telemetry", value_deserializer=lambda m: json.loads(m.decode('utf8')), - bootstrap_servers='localhost:9092' + bootstrap_servers='{}:{}'.format( + self.KAFKA_HOST, self.KAFKA_PORT + ), ) break except NoBrokersAvailable: