diff --git a/bite/telemetry/management/commands/dispatcher.py b/bite/telemetry/management/commands/dispatcher.py index f304357..a97beb6 100644 --- a/bite/telemetry/management/commands/dispatcher.py +++ b/bite/telemetry/management/commands/dispatcher.py @@ -35,12 +35,12 @@ from api.models import Device class Command(BaseCommand): - help = 'MQTT to DB deamon' + help = "Telemetry dispatcher" - MQTT_HOST = settings.MQTT_BROKER['HOST'] - MQTT_PORT = int(settings.MQTT_BROKER['PORT']) - KAFKA_HOST = settings.KAFKA_BROKER['HOST'] - KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) + MQTT_HOST = settings.MQTT_BROKER["HOST"] + MQTT_PORT = int(settings.MQTT_BROKER["PORT"]) + KAFKA_HOST = settings.KAFKA_BROKER["HOST"] + KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"]) producer = None @sync_to_async @@ -52,10 +52,7 @@ class Command(BaseCommand): @sync_to_async def dispatch(self, message): - self.producer.send( - 'telemetry', {"transport": 'mqtt', - "body": message} - ) + self.producer.send("telemetry", {"transport": "mqtt", "body": message}) async def mqtt_broker(self): async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client: @@ -66,12 +63,13 @@ class Command(BaseCommand): device = await self.get_device(message.topic) if device is not None: message_body = json.loads( - message.payload.decode('utf-8')) + message.payload.decode("utf-8") + ) await self.dispatch(message_body) else: self.stdout.write( - self.style.ERROR( - 'DEBUG: message discarded')) + self.style.ERROR("DEBUG: message discarded") + ) def handle(self, *args, **options): client = mqtt.Client() @@ -81,24 +79,26 @@ class Command(BaseCommand): break except (socket.gaierror, ConnectionRefusedError): self.stdout.write( - self.style.WARNING('WARNING: MQTT broker not available')) + self.style.WARNING("WARNING: MQTT broker not available") + ) time.sleep(5) while True: try: self.producer = KafkaProducer( - bootstrap_servers='{}:{}'.format( + bootstrap_servers="{}:{}".format( self.KAFKA_HOST, self.KAFKA_PORT ), - value_serializer=lambda v: json.dumps(v).encode('utf-8'), - retries=5 + value_serializer=lambda v: json.dumps(v).encode("utf-8"), + retries=5, ) break except NoBrokersAvailable: self.stdout.write( - self.style.WARNING('WARNING: Kafka broker not available')) + self.style.WARNING("WARNING: Kafka broker not available") + ) time.sleep(5) - self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed')) + self.stdout.write(self.style.SUCCESS("INFO: Brokers subscribed")) client.disconnect() asyncio.run(self.mqtt_broker()) diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py index 4f707bf..f06a753 100644 --- a/bite/telemetry/management/commands/handler.py +++ b/bite/telemetry/management/commands/handler.py @@ -31,10 +31,10 @@ from telemetry.models import Telemetry class Command(BaseCommand): - help = 'MQTT to DB deamon' + help = "Telemetry handler" - KAFKA_HOST = settings.KAFKA_BROKER['HOST'] - KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) + KAFKA_HOST = settings.KAFKA_BROKER["HOST"] + KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"]) def get_device(self, serial): try: @@ -47,7 +47,7 @@ class Command(BaseCommand): transport=transport, device=self.get_device(message["device"]), clock=message["clock"], - payload=message["payload"] + payload=message["payload"], ) def handle(self, *args, **options): @@ -55,22 +55,22 @@ class Command(BaseCommand): try: consumer = KafkaConsumer( "telemetry", - bootstrap_servers='{}:{}'.format( + bootstrap_servers="{}:{}".format( self.KAFKA_HOST, self.KAFKA_PORT ), group_id="handler", - value_deserializer=lambda m: json.loads(m.decode('utf8')), + value_deserializer=lambda m: json.loads(m.decode("utf8")), ) break except NoBrokersAvailable: self.stdout.write( - self.style.WARNING('WARNING: Kafka broker not available')) + self.style.WARNING("WARNING: Kafka broker not available") + ) time.sleep(5) - self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed')) + self.stdout.write(self.style.SUCCESS("INFO: Kafka broker subscribed")) for message in consumer: self.store_telemetry( - message.value["transport"], - message.value["body"] + message.value["transport"], message.value["body"] ) consumer.unsuscribe() diff --git a/docker/simulator/device_simulator.py b/docker/simulator/device_simulator.py index 682a6ad..6ffb1e6 100755 --- a/docker/simulator/device_simulator.py +++ b/docker/simulator/device_simulator.py @@ -29,7 +29,7 @@ import argparse from time import sleep import paho.mqtt.publish as publish -DEBUG = bool(os.environ.get('IOT_DEBUG', False)) +DEBUG = bool(os.environ.get("IOT_DEBUG", False)) http = urllib3.PoolManager() @@ -39,15 +39,16 @@ def post_json(endpoint, url, data): if DEBUG: print(json_data) - encoded_data = json_data.encode('utf8') + encoded_data = json_data.encode("utf8") while True: try: r = http.request( - 'POST', + "POST", endpoint + url, body=encoded_data, - headers={'content-type': 'application/json'}) + headers={"content-type": "application/json"}, + ) return r except urllib3.exceptions.MaxRetryError: pass @@ -57,74 +58,89 @@ def post_json(endpoint, url, data): def publish_json(transport, endpoint, data): json_data = json.dumps(data) - serial = data['device'] + serial = data["device"] if DEBUG: print(json_data) - encoded_data = json_data.encode('utf8') + encoded_data = json_data.encode("utf8") publish.single( topic=serial, payload=encoded_data, - hostname=endpoint.split(':')[0], - port=int(endpoint.split(':')[1]), + hostname=endpoint.split(":")[0], + port=int(endpoint.split(":")[1]), client_id=serial, - transport=('websockets' if transport == 'ws' else 'tcp'), + transport=("websockets" if transport == "ws" else "tcp"), # auth=auth FIXME ) def main(): - parser = argparse.ArgumentParser( - description='IoT simulator oprions') + parser = argparse.ArgumentParser(description="IoT simulator oprions") - parser.add_argument('-e', '--endpoint', - default=os.environ.get('IOT_HTTP', - 'http://127.0.0.1:8000'), - help='IoT HTTP endpoint') - parser.add_argument('-m', '--mqtt', - default=os.environ.get('IOT_MQTT', - '127.0.0.1:1883'), - help='IoT MQTT endpoint') - parser.add_argument('-t', '--transport', - choices=['mqtt', 'ws', 'http'], - default=os.environ.get('IOT_TL', 'http'), - help='IoT transport layer') - parser.add_argument('-s', '--serial', - default=os.environ.get('IOT_SERIAL'), - help='IoT device serial number') - parser.add_argument('-d', '--delay', metavar='s', type=float, - default=os.environ.get('IOT_DELAY', 10), - help='Delay between requests') + parser.add_argument( + "-e", + "--endpoint", + default=os.environ.get("IOT_HTTP", "http://127.0.0.1:8000"), + help="IoT HTTP endpoint", + ) + parser.add_argument( + "-m", + "--mqtt", + default=os.environ.get("IOT_MQTT", "127.0.0.1:1883"), + help="IoT MQTT endpoint", + ) + parser.add_argument( + "-t", + "--transport", + choices=["mqtt", "ws", "http"], + default=os.environ.get("IOT_TL", "http"), + help="IoT transport layer", + ) + parser.add_argument( + "-s", + "--serial", + default=os.environ.get("IOT_SERIAL"), + help="IoT device serial number", + ) + parser.add_argument( + "-d", + "--delay", + metavar="s", + type=float, + default=os.environ.get("IOT_DELAY", 10), + help="Delay between requests", + ) args = parser.parse_args() - subscribe = '/api/device/subscribe/' - telemetry = '/telemetry/' + subscribe = "/api/device/subscribe/" + telemetry = "/telemetry/" if args.serial is None: - args.serial = ''.join( - random.choices(string.ascii_lowercase + string.digits, k=8)) + args.serial = "".join( + random.choices(string.ascii_lowercase + string.digits, k=8) + ) - data = {'serial': args.serial} + data = {"serial": args.serial} post_json(args.endpoint, subscribe, data) while True: data = { - 'device': args.serial, - 'clock': int(datetime.datetime.now().timestamp()), + "device": args.serial, + "clock": int(datetime.datetime.now().timestamp()), } payload = { - 'id': 'device_simulator', - 'light': random.randint(300, 500), - 'temperature': { - 'celsius': round(random.uniform(20, 28), 1)} + "id": "device_simulator", + "light": random.randint(300, 500), + "temperature": {"celsius": round(random.uniform(20, 28), 1)}, } - if args.transport == 'http': - post_json(args.endpoint, telemetry, {**data, 'payload': payload}) - elif args.transport in ('mqtt', 'ws'): + if args.transport == "http": + post_json(args.endpoint, telemetry, {**data, "payload": payload}) + elif args.transport in ("mqtt", "ws"): publish_json( - args.transport, args.mqtt, {**data, 'payload': payload}) + args.transport, args.mqtt, {**data, "payload": payload} + ) else: raise NotImplementedError sleep(args.delay)