diff --git a/freedcs/telemetry/management/commands/mqtt-to-db.py b/freedcs/telemetry/management/commands/mqtt-to-db.py new file mode 100644 index 0000000..5fd9858 --- /dev/null +++ b/freedcs/telemetry/management/commands/mqtt-to-db.py @@ -0,0 +1,52 @@ +import asyncio +import json +import time +import paho.mqtt.client as mqtt +from asgiref.sync import sync_to_async +from asyncio_mqtt import Client + +from django.conf import settings +from django.core.management.base import BaseCommand +from api.models import Device +from telemetry.models import Telemetry + +MQTT_HOST = settings.MQTT_BROKER['HOST'] +MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + + +class Command(BaseCommand): + help = 'MQTT to DB deamon' + + @sync_to_async + def get_device(self, serial): + return Device.objects.get(serial=serial) + + @sync_to_async + def store_telemetry(self, device, payload): + Telemetry.objects.create( + device=device, + transport='mqtt', + clock=payload['clock'], + payload=payload['payload'] + ) + + async def mqtt_broker(self): + async with Client(MQTT_HOST, port=MQTT_PORT) as client: + await client.subscribe("#") + async with client.unfiltered_messages() as messages: + async for message in messages: + payload = json.loads(message.payload.decode('utf-8')) + device = await self.get_device(message.topic) + await self.store_telemetry(device, payload) + + def handle(self, *args, **options): + client = mqtt.Client() + while True: + try: + client.connect(MQTT_HOST, MQTT_PORT) + break + except ConnectionRefusedError: + self.stdout.write('WARNING: Broker not available') + time.sleep(5) + client.disconnect() + asyncio.run(self.mqtt_broker())