1
0
mirror of https://github.com/daniviga/bite.git synced 2024-11-23 13:26:14 +01:00
bite/freedcs/telemetry/management/commands/mqtt-to-db.py
Daniele Viganò 8e5a407b28
Implement MQTT ingestion and improve dockerization (#13)
* Add mqtt-to-db command

* Minor fixes

* Ignore production.py on git

* Add a production conf

* Add django container

* Add gunicorn for prod and traefik
2020-06-15 22:47:55 +02:00

53 lines
1.6 KiB
Python

import asyncio
import json
import time
import paho.mqtt.client as mqtt
from asgiref.sync import sync_to_async
from asyncio_mqtt import Client
from django.conf import settings
from django.core.management.base import BaseCommand
from api.models import Device
from telemetry.models import Telemetry
# Broker host, taken from the 'HOST' entry of the MQTT_BROKER Django setting.
MQTT_HOST = settings.MQTT_BROKER['HOST']
# Broker port from the 'PORT' entry, coerced to int
# (presumably the setting may be supplied as a string -- confirm).
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
class Command(BaseCommand):
    """Long-running command that stores MQTT messages as Telemetry rows.

    Each message is expected to carry a UTF-8 encoded JSON object with
    ``clock`` and ``payload`` keys, published on a topic equal to the
    serial of an existing ``Device``. Messages that do not match this
    shape are reported on stderr and skipped, so a single bad publisher
    cannot stop the ingestion loop.
    """

    help = 'MQTT to DB daemon'

    @sync_to_async
    def get_device(self, serial):
        """Return the ``Device`` whose serial equals *serial*.

        Wrapped with ``sync_to_async`` so the ORM query can be awaited
        from the async consumer.

        Raises:
            Device.DoesNotExist: if no device has that serial.
        """
        return Device.objects.get(serial=serial)

    @sync_to_async
    def store_telemetry(self, device, payload):
        """Create a ``Telemetry`` row for *device* from a decoded message.

        Args:
            device: the ``Device`` the telemetry belongs to.
            payload: mapping providing the ``clock`` and ``payload`` keys.
        """
        Telemetry.objects.create(
            device=device,
            transport='mqtt',
            clock=payload['clock'],
            payload=payload['payload']
        )

    async def _ingest(self, message):
        """Decode one MQTT message and store it, skipping unusable ones."""
        try:
            payload = json.loads(message.payload.decode('utf-8'))
        except ValueError as exc:
            # UnicodeDecodeError and json.JSONDecodeError are both
            # ValueError subclasses.
            self.stderr.write(
                f'WARNING: Discarding undecodable message on topic '
                f'{message.topic!r}: {exc}'
            )
            return

        # Valid JSON is not necessarily an object with the keys we need.
        if (
            not isinstance(payload, dict)
            or 'clock' not in payload
            or 'payload' not in payload
        ):
            self.stderr.write(
                f'WARNING: Discarding malformed message on topic '
                f'{message.topic!r}'
            )
            return

        try:
            device = await self.get_device(message.topic)
        except Device.DoesNotExist:
            # The wildcard subscription delivers every topic, including
            # ones that do not correspond to a registered device.
            self.stderr.write(
                f'WARNING: Unknown device for topic {message.topic!r}'
            )
            return

        await self.store_telemetry(device, payload)

    async def mqtt_broker(self):
        """Subscribe to every topic and ingest messages until cancelled."""
        async with Client(MQTT_HOST, port=MQTT_PORT) as client:
            await client.subscribe("#")
            async with client.unfiltered_messages() as messages:
                async for message in messages:
                    await self._ingest(message)

    def handle(self, *args, **options):
        """Wait for the broker to accept connections, then start ingesting.

        A throwaway synchronous client probes the broker every 5 seconds
        until a connection succeeds; only then is the async consumer run.
        """
        client = mqtt.Client()
        while True:
            try:
                client.connect(MQTT_HOST, MQTT_PORT)
            except OSError:
                # OSError covers ConnectionRefusedError as well as DNS
                # failures and timeouts that occur while the broker
                # is still starting up.
                self.stdout.write('WARNING: Broker not available')
                time.sleep(5)
            else:
                break

        client.disconnect()
        asyncio.run(self.mqtt_broker())