1
0
mirror of https://github.com/daniviga/bite.git synced 2024-11-22 21:16:12 +01:00

Implement Kafka dispatcher and handler

This commit is contained in:
Daniele Viganò 2023-09-08 18:06:33 +02:00
parent 681f99d2f4
commit 23dfb6837d
2 changed files with 105 additions and 18 deletions

View File

@ -22,23 +22,25 @@ import asyncio
import json import json
import time import time
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
from asgiref.sync import sync_to_async from asgiref.sync import sync_to_async
from asyncio_mqtt import Client from aiomqtt import Client
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from api.models import Device from api.models import Device
from telemetry.models import Telemetry
MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
class Command(BaseCommand): class Command(BaseCommand):
help = 'MQTT to DB deamon' help = 'MQTT to DB deamon'
MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
producer = None
@sync_to_async @sync_to_async
def get_device(self, serial): def get_device(self, serial):
try: try:
@ -47,24 +49,23 @@ class Command(BaseCommand):
return None return None
@sync_to_async @sync_to_async
def store_telemetry(self, device, payload): def dispatch(self, message):
Telemetry.objects.create( self.producer.send(
device=device, 'telemetry', {"transport": 'mqtt',
transport='mqtt', "body": message}
clock=payload['clock'],
payload=payload['payload']
) )
async def mqtt_broker(self): async def mqtt_broker(self):
async with Client(MQTT_HOST, port=MQTT_PORT) as client: async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client:
# use shared subscription for HA/balancing # use shared subscription for HA/balancing
await client.subscribe("$share/telemetry/#") await client.subscribe("$share/telemetry/#")
async with client.unfiltered_messages() as messages: async with client.messages() as messages:
async for message in messages: async for message in messages:
payload = json.loads(message.payload.decode('utf-8'))
device = await self.get_device(message.topic) device = await self.get_device(message.topic)
if device is not None: if device is not None:
await self.store_telemetry(device, payload) message_body = json.loads(
message.payload.decode('utf-8'))
await self.dispatch(message_body)
else: else:
self.stdout.write( self.stdout.write(
self.style.ERROR( self.style.ERROR(
@ -74,13 +75,26 @@ class Command(BaseCommand):
client = mqtt.Client() client = mqtt.Client()
while True: while True:
try: try:
client.connect(MQTT_HOST, MQTT_PORT) client.connect(self.MQTT_HOST, self.MQTT_PORT)
break break
except (socket.gaierror, ConnectionRefusedError): except (socket.gaierror, ConnectionRefusedError):
self.stdout.write( self.stdout.write(
self.style.WARNING('WARNING: Broker not available')) self.style.WARNING('WARNING: MQTT broker not available'))
time.sleep(5) time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Broker subscribed')) while True:
try:
self.producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
retries=5
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available'))
time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed'))
client.disconnect() client.disconnect()
asyncio.run(self.mqtt_broker()) asyncio.run(self.mqtt_broker())

View File

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# BITE - A Basic/IoT/Example
# Copyright (C) 2020-2021 Daniele Viganò <daniele@vigano.me>
#
# BITE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# BITE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import time
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist
from api.models import Device
from telemetry.models import Telemetry
MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
class Command(BaseCommand):
help = 'MQTT to DB deamon'
def get_device(self, serial):
try:
return Device.objects.get(serial=serial)
except ObjectDoesNotExist:
return None
def store_telemetry(self, transport, message):
Telemetry.objects.create(
transport=transport,
device=self.get_device(message["device"]),
clock=message["clock"],
payload=message["payload"]
)
def handle(self, *args, **options):
while True:
try:
consumer = KafkaConsumer(
"telemetry",
value_deserializer=lambda m: json.loads(m.decode('utf8')),
bootstrap_servers='localhost:9092'
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available'))
time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed'))
for message in consumer:
self.store_telemetry(
message.value["transport"],
message.value["body"]
)
consumer.unsuscribe()