From 23dfb6837df4d01d244f816b3c01bbf95fdcb3d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 18:06:33 +0200 Subject: [PATCH 1/6] Implement Kafka dispatcher and handler --- .../commands/{mqtt-to-db.py => dispatcher.py} | 50 ++++++++----- bite/telemetry/management/commands/handler.py | 73 +++++++++++++++++++ 2 files changed, 105 insertions(+), 18 deletions(-) rename bite/telemetry/management/commands/{mqtt-to-db.py => dispatcher.py} (61%) create mode 100644 bite/telemetry/management/commands/handler.py diff --git a/bite/telemetry/management/commands/mqtt-to-db.py b/bite/telemetry/management/commands/dispatcher.py similarity index 61% rename from bite/telemetry/management/commands/mqtt-to-db.py rename to bite/telemetry/management/commands/dispatcher.py index a161519..15c0d29 100644 --- a/bite/telemetry/management/commands/mqtt-to-db.py +++ b/bite/telemetry/management/commands/dispatcher.py @@ -22,23 +22,25 @@ import asyncio import json import time import paho.mqtt.client as mqtt +from kafka import KafkaProducer +from kafka.errors import NoBrokersAvailable from asgiref.sync import sync_to_async -from asyncio_mqtt import Client +from aiomqtt import Client from django.conf import settings from django.core.management.base import BaseCommand from django.core.exceptions import ObjectDoesNotExist from api.models import Device -from telemetry.models import Telemetry - -MQTT_HOST = settings.MQTT_BROKER['HOST'] -MQTT_PORT = int(settings.MQTT_BROKER['PORT']) class Command(BaseCommand): help = 'MQTT to DB deamon' + MQTT_HOST = settings.MQTT_BROKER['HOST'] + MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + producer = None + @sync_to_async def get_device(self, serial): try: @@ -47,24 +49,23 @@ class Command(BaseCommand): return None @sync_to_async - def store_telemetry(self, device, payload): - Telemetry.objects.create( - device=device, - transport='mqtt', - clock=payload['clock'], - payload=payload['payload'] + def dispatch(self, message): + self.producer.send( + 'telemetry', {"transport": 'mqtt', + "body": message} ) async def mqtt_broker(self): - async with Client(MQTT_HOST, port=MQTT_PORT) as client: + async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client: # use shared subscription for HA/balancing await client.subscribe("$share/telemetry/#") - async with client.unfiltered_messages() as messages: + async with client.messages() as messages: async for message in messages: - payload = json.loads(message.payload.decode('utf-8')) device = await self.get_device(message.topic) if device is not None: - await self.store_telemetry(device, payload) + message_body = json.loads( + message.payload.decode('utf-8')) + await self.dispatch(message_body) else: self.stdout.write( self.style.ERROR( @@ -74,13 +75,26 @@ class Command(BaseCommand): client = mqtt.Client() while True: try: - client.connect(MQTT_HOST, MQTT_PORT) + client.connect(self.MQTT_HOST, self.MQTT_PORT) break except (socket.gaierror, ConnectionRefusedError): self.stdout.write( - self.style.WARNING('WARNING: Broker not available')) + self.style.WARNING('WARNING: MQTT broker not available')) time.sleep(5) - self.stdout.write(self.style.SUCCESS('INFO: Broker subscribed')) + while True: + try: + self.producer = KafkaProducer( + bootstrap_servers='localhost:9092', + value_serializer=lambda v: json.dumps(v).encode('utf-8'), + retries=5 + ) + break + except NoBrokersAvailable: + self.stdout.write( + self.style.WARNING('WARNING: Kafka broker not available')) + time.sleep(5) + + self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed')) client.disconnect() asyncio.run(self.mqtt_broker()) diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py new file mode 100644 index 0000000..d15ac0f --- /dev/null +++ b/bite/telemetry/management/commands/handler.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# BITE - A Basic/IoT/Example +# Copyright (C) 2020-2021 Daniele ViganĂ² +# +# BITE is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# BITE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import json +import time +from kafka import KafkaConsumer +from kafka.errors import NoBrokersAvailable + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.core.exceptions import ObjectDoesNotExist + +from api.models import Device +from telemetry.models import Telemetry + +MQTT_HOST = settings.MQTT_BROKER['HOST'] +MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + + +class Command(BaseCommand): + help = 'MQTT to DB deamon' + + def get_device(self, serial): + try: + return Device.objects.get(serial=serial) + except ObjectDoesNotExist: + return None + + def store_telemetry(self, transport, message): + Telemetry.objects.create( + transport=transport, + device=self.get_device(message["device"]), + clock=message["clock"], + payload=message["payload"] + ) + + def handle(self, *args, **options): + while True: + try: + consumer = KafkaConsumer( + "telemetry", + value_deserializer=lambda m: json.loads(m.decode('utf8')), + bootstrap_servers='localhost:9092' + ) + break + except NoBrokersAvailable: + self.stdout.write( + self.style.WARNING('WARNING: Kafka broker not available')) + time.sleep(5) + + self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed')) + for message in consumer: + self.store_telemetry( + message.value["transport"], + message.value["body"] + ) + consumer.unsuscribe() From 49211437d2c5d963c782afcd68bd19aca426ea54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 18:14:14 +0200 Subject: [PATCH 2/6] WIP: extend docker compose --- docker/docker-compose.dev.yml | 6 +++++- docker/docker-compose.prod.yml | 6 +++++- docker/docker-compose.yml | 14 +++++++++++--- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index 07d4da1..f0da724 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -44,6 +44,10 @@ services: volumes: - ../bite:/srv/app/bite - mqtt-to-db: + dispatcher: + volumes: + - ../bite:/srv/app/bite + + handler: volumes: - ../bite:/srv/app/bite diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml index 4598288..5aee5cb 100644 --- a/docker/docker-compose.prod.yml +++ b/docker/docker-compose.prod.yml @@ -29,6 +29,10 @@ services: volumes: - ./django/production.py.sample:/srv/app/bite/bite/production.py - mqtt-to-db: + dispatcher: + volumes: + - ./django/production.py.sample:/srv/app/bite/bite/production.py + + handler: volumes: - ./django/production.py.sample:/srv/app/bite/bite/production.py diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index f9f6922..fe391db 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -104,13 +104,21 @@ services: - staticdata:/srv/appdata/bite/static command: ["python3", "manage.py", "collectstatic", "--noinput"] - mqtt-to-db: + dispatcher: <<: *service_default image: daniviga/bite - command: ["python3", "manage.py", "mqtt-to-db"] + command: ["python3", "manage.py", "dispatcher"] + networks: + - net + depends_on: + - broker + + handler: + <<: *service_default + image: daniviga/bite + command: ["python3", "manage.py", "handler"] networks: - net depends_on: - data-migration - timescale - - broker From e3785d46697639f893395456e35ec470eec9d9e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 18:18:42 +0200 Subject: [PATCH 3/6] Add kafka settings --- bite/bite/settings.py | 5 +++++ bite/telemetry/management/commands/dispatcher.py | 6 +++++- bite/telemetry/management/commands/handler.py | 10 ++++++---- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/bite/bite/settings.py b/bite/bite/settings.py index 363dfb4..42807a0 100644 --- a/bite/bite/settings.py +++ b/bite/bite/settings.py @@ -158,6 +158,11 @@ MQTT_BROKER = { 'PORT': '1883', } +KAFKA_BROKER = { + 'HOST': 'kafka', + 'PORT': '9092', +} + # If no local_settings.py is availble in the current folder let's try to # load it from the application root try: diff --git a/bite/telemetry/management/commands/dispatcher.py b/bite/telemetry/management/commands/dispatcher.py index 15c0d29..f304357 100644 --- a/bite/telemetry/management/commands/dispatcher.py +++ b/bite/telemetry/management/commands/dispatcher.py @@ -39,6 +39,8 @@ class Command(BaseCommand): MQTT_HOST = settings.MQTT_BROKER['HOST'] MQTT_PORT = int(settings.MQTT_BROKER['PORT']) + KAFKA_HOST = settings.KAFKA_BROKER['HOST'] + KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) producer = None @sync_to_async @@ -85,7 +87,9 @@ class Command(BaseCommand): while True: try: self.producer = KafkaProducer( - bootstrap_servers='localhost:9092', + bootstrap_servers='{}:{}'.format( + self.KAFKA_HOST, self.KAFKA_PORT + ), value_serializer=lambda v: json.dumps(v).encode('utf-8'), retries=5 ) diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py index d15ac0f..14d2747 100644 --- a/bite/telemetry/management/commands/handler.py +++ b/bite/telemetry/management/commands/handler.py @@ -29,13 +29,13 @@ from django.core.exceptions import ObjectDoesNotExist from api.models import Device from telemetry.models import Telemetry -MQTT_HOST = settings.MQTT_BROKER['HOST'] -MQTT_PORT = int(settings.MQTT_BROKER['PORT']) - class Command(BaseCommand): help = 'MQTT to DB deamon' + KAFKA_HOST = settings.KAFKA_BROKER['HOST'] + KAFKA_PORT = int(settings.KAFKA_BROKER['PORT']) + def get_device(self, serial): try: return Device.objects.get(serial=serial) @@ -56,7 +56,9 @@ class Command(BaseCommand): consumer = KafkaConsumer( "telemetry", value_deserializer=lambda m: json.loads(m.decode('utf8')), - bootstrap_servers='localhost:9092' + bootstrap_servers='{}:{}'.format( + self.KAFKA_HOST, self.KAFKA_PORT + ), ) break except NoBrokersAvailable: From ea9f9ef705aca642e3b50cff447aa4bff1e2a6f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 23:10:49 +0200 Subject: [PATCH 4/6] Implement kafka in Docker compose and set group_id in handlers --- bite/bite/settings.py | 4 ++++ bite/telemetry/management/commands/handler.py | 3 ++- docker/django/Dockerfile | 8 +++---- docker/docker-compose.dev.yml | 4 ++++ docker/docker-compose.yml | 24 +++++++++++++++++++ docker/ntpd/Dockerfile | 4 ++-- docker/simulator/Dockerfile | 2 +- docker/simulator/device_simulator.py | 2 +- requirements.txt | 3 ++- 9 files changed, 44 insertions(+), 10 deletions(-) diff --git a/bite/bite/settings.py b/bite/bite/settings.py index 42807a0..8fc4ea3 100644 --- a/bite/bite/settings.py +++ b/bite/bite/settings.py @@ -151,6 +151,10 @@ STATIC_URL = '/static/' STATIC_ROOT = '/srv/appdata/bite/static' +REST_FRAMEWORK = { + 'DEFAULT_AUTHENTICATION_CLASSES': [] +} + SKIP_WHITELIST = True MQTT_BROKER = { diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py index 14d2747..4f707bf 100644 --- a/bite/telemetry/management/commands/handler.py +++ b/bite/telemetry/management/commands/handler.py @@ -55,10 +55,11 @@ class Command(BaseCommand): try: consumer = KafkaConsumer( "telemetry", - value_deserializer=lambda m: json.loads(m.decode('utf8')), bootstrap_servers='{}:{}'.format( self.KAFKA_HOST, self.KAFKA_PORT ), + group_id="handler", + value_deserializer=lambda m: json.loads(m.decode('utf8')), ) break except NoBrokersAvailable: diff --git a/docker/django/Dockerfile b/docker/django/Dockerfile index bbae1ff..417c3e9 100644 --- a/docker/django/Dockerfile +++ b/docker/django/Dockerfile @@ -17,20 +17,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM python:3.9-alpine AS builder +FROM python:3.11-alpine AS builder RUN apk update && apk add gcc musl-dev postgresql-dev \ && pip install psycopg2-binary # --- -FROM python:3.9-alpine +FROM python:3.11-alpine ENV PYTHONUNBUFFERED 1 ENV DJANGO_SETTINGS_MODULE "bite.settings" RUN apk update && apk add --no-cache postgresql-libs \ - && wget https://github.com/jwilder/dockerize/releases/download/v0.6.1/dockerize-alpine-linux-amd64-v0.6.1.tar.gz -qO- \ + && wget https://github.com/jwilder/dockerize/releases/download/v0.7.0/dockerize-alpine-linux-amd64-v0.7.0.tar.gz -qO- \ | tar -xz -C /usr/local/bin -COPY --from=builder /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/ +COPY --from=builder /usr/local/lib/python3.11/site-packages/ /usr/local/lib/python3.11/site-packages/ COPY --chown=1000:1000 requirements.txt /srv/app/bite/requirements.txt RUN pip3 install --no-cache-dir -r /srv/app/bite/requirements.txt diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index f0da724..5bf7abc 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -36,6 +36,10 @@ services: ports: - "${CUSTOM_DOCKER_IP:-0.0.0.0}:8000:8000" + kafka: + ports: + - "${CUSTOM_DOCKER_IP:-0.0.0.0}:9092:9092" + data-migration: volumes: - ../bite:/srv/app/bite diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index fe391db..5a74303 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -62,6 +62,30 @@ services: ports: - "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883" + zookeeper: + image: confluentinc/cp-zookeeper:latest + networks: + - net + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + networks: + - net + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ingress: <<: *service_default image: nginx:stable-alpine diff --git a/docker/ntpd/Dockerfile b/docker/ntpd/Dockerfile index 6b50ff6..2c1e570 100644 --- a/docker/ntpd/Dockerfile +++ b/docker/ntpd/Dockerfile @@ -17,9 +17,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM alpine:3.15 +FROM alpine:3.18 -RUN apk update && apk add chrony && \ +RUN apk add --no-cache chrony && \ chown -R chrony:chrony /var/lib/chrony COPY ./chrony.conf /etc/chrony/chrony.conf diff --git a/docker/simulator/Dockerfile b/docker/simulator/Dockerfile index 4f1a738..c284db6 100644 --- a/docker/simulator/Dockerfile +++ b/docker/simulator/Dockerfile @@ -17,7 +17,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM python:3.9-alpine +FROM python:3.11-alpine RUN pip3 install urllib3 paho-mqtt COPY ./device_simulator.py /opt/bite/device_simulator.py diff --git a/docker/simulator/device_simulator.py b/docker/simulator/device_simulator.py index 13a5c17..682a6ad 100755 --- a/docker/simulator/device_simulator.py +++ b/docker/simulator/device_simulator.py @@ -94,7 +94,7 @@ def main(): parser.add_argument('-s', '--serial', default=os.environ.get('IOT_SERIAL'), help='IoT device serial number') - parser.add_argument('-d', '--delay', metavar='s', type=int, + parser.add_argument('-d', '--delay', metavar='s', type=float, default=os.environ.get('IOT_DELAY', 10), help='Delay between requests') args = parser.parse_args() diff --git a/requirements.txt b/requirements.txt index 27023fb..9d18c6e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,8 @@ djangorestframework django-health-check psycopg2-binary paho-mqtt -asyncio-mqtt +kafka-python +aiomqtt PyYAML uritemplate pygments From 7e689eca23a6eab2f7423e6c5b0b577769d1de0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 23:30:03 +0200 Subject: [PATCH 5/6] Update README --- README.md | 11 +++-------- requirements-dev.txt | 1 + 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index b290866..42a227d 100644 --- a/README.md +++ b/README.md @@ -13,13 +13,6 @@ This project is for educational purposes only. It does not implement any authentication and/or encryption protocol, so it is not suitable for real production. -![Application Schema](./docs/application_chart.png) - -### Future implementations - -- Broker HA via [VerneMQ clustering](https://docs.vernemq.com/clustering/introduction) -- Stream analytics via [Apache Spark](https://spark.apache.org/) - ## Installation ### Requirements @@ -36,8 +29,10 @@ The application stack is composed by the following components: - [Django](https://www.djangoproject.com/) with [Django REST framework](https://www.django-rest-framework.org/) web application (running via `gunicorn` in production mode) - - `mqtt-to-db` custom daemon to dump telemetry into the timeseries database + - `dispatcher` custom daemon to dump telemetry into the Kafka queue + - `handler` custom daemon to dump telemetry into the timeseries database from the Kafka queue - telemetry payload is stored as json object (via PostgreSQL JSON data type) +- [Kafka](https://kafka.apache.org/) broker - [Timescale](https://www.timescale.com/) DB, a [PostgreSQL](https://www.postgresql.org/) database with a timeseries extension - [Mosquitto](https://mosquitto.org/) MQTT broker (see alternatives below) diff --git a/requirements-dev.txt b/requirements-dev.txt index f37be1f..199768c 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,3 +4,4 @@ ipython flake8 pyinstrument django-debug-toolbar +urllib3 From 465870a9c5d7a14994f5a41ea13cc30bc602d06e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Fri, 8 Sep 2023 23:31:51 +0200 Subject: [PATCH 6/6] Update README --- README.md | 2 +- docs/.badges/python.svg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 42a227d..e9a2426 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Playing with IoT [![Build Status](https://travis-ci.com/daniviga/bite.svg?branch=master)](https://travis-ci.com/daniviga/bite) ![AGPLv3](./docs/.badges/agpl3.svg) -![Python 3.9](./docs/.badges/python.svg) +![Python 3.11](./docs/.badges/python.svg) ![MQTT](./docs/.badges/mqtt.svg) ![Moby](./docs/.badges/moby.svg) ![docker-compose 3.7+](./docs/.badges/docker-compose.svg) diff --git a/docs/.badges/python.svg b/docs/.badges/python.svg index 0546923..599d97f 100644 --- a/docs/.badges/python.svg +++ b/docs/.badges/python.svg @@ -1 +1 @@ -pythonpython3.93.9 +pythonpython3.113.11