diff --git a/README.md b/README.md index 8017c6b..6246779 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Playing with IoT [![Build Status](https://travis-ci.com/daniviga/bite.svg?branch=master)](https://travis-ci.com/daniviga/bite) ![AGPLv3](./docs/.badges/agpl3.svg) -![Python 3.9](./docs/.badges/python.svg) +![Python 3.11](./docs/.badges/python.svg) ![MQTT](./docs/.badges/mqtt.svg) ![Moby](./docs/.badges/moby.svg) ![docker-compose 3.7+](./docs/.badges/docker-compose.svg) @@ -15,11 +15,6 @@ production. ![Application Schema](./docs/application_chart.png) -### Future implementations - -- Broker HA via [VerneMQ clustering](https://docs.vernemq.com/clustering/introduction) -- Stream analytics via [Apache Spark](https://spark.apache.org/) - ## Installation ### Requirements @@ -62,8 +57,10 @@ The application stack is composed by the following components: - [Django](https://www.djangoproject.com/) with [Django REST framework](https://www.django-rest-framework.org/) web application (running via `gunicorn` in production mode) - - `mqtt-to-db` custom daemon to dump telemetry into the timeseries database + - `dispatcher` custom daemon to dump telemetry into the Kafka queue + - `handler` custom daemon to dump telemetry into the timeseries database from the Kafka queue - telemetry payload is stored as json object (via PostgreSQL JSON data type) +- [Kafka](https://kafka.apache.org/) broker - [Timescale](https://www.timescale.com/) DB, a [PostgreSQL](https://www.postgresql.org/) database with a timeseries extension - [Mosquitto](https://mosquitto.org/) MQTT broker (see alternatives below) diff --git a/arduino/tempLightSensor/tempLightSensor.ino b/arduino/tempLightSensor/tempLightSensor.ino index 5891ca1..63c6d8a 100644 --- a/arduino/tempLightSensor/tempLightSensor.ino +++ b/arduino/tempLightSensor/tempLightSensor.ino @@ -55,7 +55,7 @@ struct netConfig { } config; char serial[9]; -const String apiURL = "/api/device/subscribe/"; +const String dpsURL = "/dps/device/subscribe/"; const String telemetryURL = "/telemetry/"; void setup(void) { @@ -63,7 +63,7 @@ void setup(void) { analogReference(EXTERNAL); - StaticJsonDocument<20> api; + StaticJsonDocument<20> dps; byte mac[6]; int eeAddress = 0; @@ -110,8 +110,8 @@ void setup(void) { Serial.println("DEBUG: clock updated via NTP."); #endif - api["serial"] = serial; - postData(config, apiURL, api); + dps["serial"] = serial; + postData(config, dpsURL, dps); telemetry["device"] = serial; // payload["id"] = serverName; diff --git a/bite/bite/local_settings.py.sample b/bite/bite/local_settings.py.sample new file mode 100644 index 0000000..1c573b2 --- /dev/null +++ b/bite/bite/local_settings.py.sample @@ -0,0 +1,20 @@ +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.postgresql', + 'NAME': 'bite', + 'USER': 'bite', + 'PASSWORD': 'password', + 'HOST': 'localhost', + 'PORT': '5432', + } +} + +MQTT_BROKER = { + 'HOST': 'localhost', + 'PORT': '1883', +} + +KAFKA_BROKER = { + 'HOST': 'localhost', + 'PORT': '29092', +} diff --git a/bite/bite/settings.py b/bite/bite/settings.py index 363dfb4..8014f2c 100644 --- a/bite/bite/settings.py +++ b/bite/bite/settings.py @@ -61,7 +61,7 @@ INSTALLED_APPS = [ # 'health_check.storage', 'rest_framework', 'bite', - 'api', + 'dps', 'telemetry', ] @@ -151,6 +151,10 @@ STATIC_URL = '/static/' STATIC_ROOT = '/srv/appdata/bite/static' +REST_FRAMEWORK = { + 'DEFAULT_AUTHENTICATION_CLASSES': [] +} + SKIP_WHITELIST = True MQTT_BROKER = { @@ -158,11 +162,17 @@ MQTT_BROKER = { 'PORT': '1883', } -# If no local_settings.py is availble in the current folder let's try to -# load it from the application root +KAFKA_BROKER = { + 'HOST': 'kafka', + 'PORT': '9092', +} + +try: + from bite.local_settings import * +except ImportError: + pass + try: from bite.production import * except ImportError: - # If a local_setting.py does not exist - # settings in this file only will be used pass diff --git a/bite/bite/urls.py b/bite/bite/urls.py index 878546a..b5ee9a7 100644 --- a/bite/bite/urls.py +++ b/bite/bite/urls.py @@ -37,13 +37,13 @@ from django.contrib import admin from django.conf import settings from django.urls import include, path -from api import urls as api_urls +from dps import urls as dps_urls from telemetry import urls as telemetry_urls urlpatterns = [ path('admin/', admin.site.urls), path('ht/', include('health_check.urls')), - path('api/', include(api_urls)), + path('dps/', include(dps_urls)), path('telemetry/', include(telemetry_urls)), ] diff --git a/bite/api/__init__.py b/bite/dps/__init__.py similarity index 100% rename from bite/api/__init__.py rename to bite/dps/__init__.py diff --git a/bite/api/admin.py b/bite/dps/admin.py similarity index 97% rename from bite/api/admin.py rename to bite/dps/admin.py index 9a83191..bd56cb9 100644 --- a/bite/api/admin.py +++ b/bite/dps/admin.py @@ -18,7 +18,7 @@ # along with this program. If not, see . from django.contrib import admin -from api.models import Device, WhiteList +from dps.models import Device, WhiteList @admin.register(Device) diff --git a/bite/api/apps.py b/bite/dps/apps.py similarity index 94% rename from bite/api/apps.py rename to bite/dps/apps.py index 63e4e00..5090321 100644 --- a/bite/api/apps.py +++ b/bite/dps/apps.py @@ -20,5 +20,5 @@ from django.apps import AppConfig -class ApiConfig(AppConfig): - name = 'api' +class DPSConfig(AppConfig): + name = 'dps' diff --git a/bite/api/migrations/0001_initial.py b/bite/dps/migrations/0001_initial.py similarity index 94% rename from bite/api/migrations/0001_initial.py rename to bite/dps/migrations/0001_initial.py index 0276db1..269131a 100644 --- a/bite/api/migrations/0001_initial.py +++ b/bite/dps/migrations/0001_initial.py @@ -1,6 +1,6 @@ # Generated by Django 3.1.3 on 2021-03-19 08:08 -import api.models +import dps.models from django.db import migrations, models import uuid @@ -16,7 +16,7 @@ class Migration(migrations.Migration): migrations.CreateModel( name='Device', fields=[ - ('serial', models.CharField(max_length=128, unique=True, validators=[api.models.device_validation])), + ('serial', models.CharField(max_length=128, unique=True, validators=[dps.models.device_validation])), ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), ('creation_time', models.DateTimeField(auto_now_add=True)), ('updated_time', models.DateTimeField(auto_now=True)), diff --git a/bite/api/migrations/__init__.py b/bite/dps/migrations/__init__.py similarity index 100% rename from bite/api/migrations/__init__.py rename to bite/dps/migrations/__init__.py diff --git a/bite/api/models.py b/bite/dps/models.py similarity index 100% rename from bite/api/models.py rename to bite/dps/models.py diff --git a/bite/api/serializers.py b/bite/dps/serializers.py similarity index 96% rename from bite/api/serializers.py rename to bite/dps/serializers.py index 876f195..91ad6d4 100644 --- a/bite/api/serializers.py +++ b/bite/dps/serializers.py @@ -18,7 +18,7 @@ # along with this program. If not, see . from rest_framework import serializers -from api.models import Device, device_validation +from dps.models import Device, device_validation class DeviceSerializer(serializers.ModelSerializer): diff --git a/bite/api/tests.py b/bite/dps/tests.py similarity index 81% rename from bite/api/tests.py rename to bite/dps/tests.py index 1b787c4..2a030e1 100644 --- a/bite/api/tests.py +++ b/bite/dps/tests.py @@ -18,10 +18,10 @@ # along with this program. If not, see . from django.test import TestCase, Client -from api.models import Device, WhiteList +from dps.models import Device, WhiteList -class ApiTestCase(TestCase): +class DPSTestCase(TestCase): c = Client() def setUp(self): @@ -29,17 +29,17 @@ class ApiTestCase(TestCase): Device.objects.create(serial='test1234') def test_no_whitelist(self): - response = self.c.post('/api/device/subscribe/', + response = self.c.post('/dps/device/provision/', {'serial': 'test12345'}) self.assertEqual(response.status_code, 400) - def test_subscribe_post(self): + def test_provision_post(self): WhiteList.objects.create(serial='test12345') - response = self.c.post('/api/device/subscribe/', + response = self.c.post('/dps/device/provision/', {'serial': 'test12345'}) self.assertEqual(response.status_code, 201) - def test_subscribe_get(self): - response = self.c.get('/api/device/list/') + def test_provision_get(self): + response = self.c.get('/dps/device/list/') self.assertEqual( response.json()[0]['serial'], 'test1234') diff --git a/bite/api/tests/sample.json b/bite/dps/tests/sample.json similarity index 100% rename from bite/api/tests/sample.json rename to bite/dps/tests/sample.json diff --git a/bite/api/urls.py b/bite/dps/urls.py similarity index 88% rename from bite/api/urls.py rename to bite/dps/urls.py index cdcae17..6272de4 100644 --- a/bite/api/urls.py +++ b/bite/dps/urls.py @@ -33,13 +33,13 @@ Including another URLconf 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) """ from django.urls import path -from api.views import APISubscribe +from dps.views import DPS urlpatterns = [ - path('device/subscribe/', - APISubscribe.as_view({'post': 'create'}), - name='device-subscribe'), + path('device/provision/', + DPS.as_view({'post': 'create'}), + name='device-provision'), path('device/list/', - APISubscribe.as_view({'get': 'list'}), + DPS.as_view({'get': 'list'}), name='device-list'), ] diff --git a/bite/api/views.py b/bite/dps/views.py similarity index 89% rename from bite/api/views.py rename to bite/dps/views.py index 4705e69..3c31e7c 100644 --- a/bite/api/views.py +++ b/bite/dps/views.py @@ -19,10 +19,10 @@ from rest_framework.viewsets import ModelViewSet -from api.models import Device -from api.serializers import DeviceSerializer +from dps.models import Device +from dps.serializers import DeviceSerializer -class APISubscribe(ModelViewSet): +class DPS(ModelViewSet): queryset = Device.objects.all() serializer_class = DeviceSerializer diff --git a/bite/telemetry/management/commands/mqtt-to-db.py b/bite/telemetry/management/commands/dispatcher.py similarity index 53% rename from bite/telemetry/management/commands/mqtt-to-db.py rename to bite/telemetry/management/commands/dispatcher.py index a161519..abed5ae 100644 --- a/bite/telemetry/management/commands/mqtt-to-db.py +++ b/bite/telemetry/management/commands/dispatcher.py @@ -22,22 +22,26 @@ import asyncio import json import time import paho.mqtt.client as mqtt +from kafka import KafkaProducer +from kafka.errors import NoBrokersAvailable from asgiref.sync import sync_to_async -from asyncio_mqtt import Client +from aiomqtt import Client from django.conf import settings from django.core.management.base import BaseCommand from django.core.exceptions import ObjectDoesNotExist -from api.models import Device -from telemetry.models import Telemetry - -MQTT_HOST = settings.MQTT_BROKER['HOST'] -MQTT_PORT = int(settings.MQTT_BROKER['PORT']) +from dps.models import Device class Command(BaseCommand): - help = 'MQTT to DB deamon' + help = "Telemetry dispatcher" + + MQTT_HOST = settings.MQTT_BROKER["HOST"] + MQTT_PORT = int(settings.MQTT_BROKER["PORT"]) + KAFKA_HOST = settings.KAFKA_BROKER["HOST"] + KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"]) + producer = None @sync_to_async def get_device(self, serial): @@ -47,40 +51,54 @@ class Command(BaseCommand): return None @sync_to_async - def store_telemetry(self, device, payload): - Telemetry.objects.create( - device=device, - transport='mqtt', - clock=payload['clock'], - payload=payload['payload'] - ) + def dispatch(self, message): + self.producer.send("telemetry", {"transport": "mqtt", "body": message}) async def mqtt_broker(self): - async with Client(MQTT_HOST, port=MQTT_PORT) as client: + async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client: # use shared subscription for HA/balancing await client.subscribe("$share/telemetry/#") - async with client.unfiltered_messages() as messages: + async with client.messages() as messages: async for message in messages: - payload = json.loads(message.payload.decode('utf-8')) device = await self.get_device(message.topic) if device is not None: - await self.store_telemetry(device, payload) + message_body = json.loads( + message.payload.decode("utf-8") + ) + await self.dispatch(message_body) else: self.stdout.write( - self.style.ERROR( - 'DEBUG: message discarded')) + self.style.ERROR("DEBUG: message discarded") + ) def handle(self, *args, **options): client = mqtt.Client() while True: try: - client.connect(MQTT_HOST, MQTT_PORT) + client.connect(self.MQTT_HOST, self.MQTT_PORT) break except (socket.gaierror, ConnectionRefusedError): self.stdout.write( - self.style.WARNING('WARNING: Broker not available')) + self.style.WARNING("WARNING: MQTT broker not available") + ) time.sleep(5) - self.stdout.write(self.style.SUCCESS('INFO: Broker subscribed')) + while True: + try: + self.producer = KafkaProducer( + bootstrap_servers="{}:{}".format( + self.KAFKA_HOST, self.KAFKA_PORT + ), + value_serializer=lambda v: json.dumps(v).encode("utf-8"), + retries=5, + ) + break + except NoBrokersAvailable: + self.stdout.write( + self.style.WARNING("WARNING: Kafka broker not available") + ) + time.sleep(5) + + self.stdout.write(self.style.SUCCESS("INFO: Brokers subscribed")) client.disconnect() asyncio.run(self.mqtt_broker()) diff --git a/bite/telemetry/management/commands/handler.py b/bite/telemetry/management/commands/handler.py new file mode 100644 index 0000000..2502ee5 --- /dev/null +++ b/bite/telemetry/management/commands/handler.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# BITE - A Basic/IoT/Example +# Copyright (C) 2020-2021 Daniele ViganĂ² +# +# BITE is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# BITE is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import json +import time +from kafka import KafkaConsumer +from kafka.errors import NoBrokersAvailable + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.core.exceptions import ObjectDoesNotExist + +from dps.models import Device +from telemetry.models import Telemetry + + +class Command(BaseCommand): + help = "Telemetry handler" + + KAFKA_HOST = settings.KAFKA_BROKER["HOST"] + KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"]) + + def get_device(self, serial): + try: + return Device.objects.get(serial=serial) + except ObjectDoesNotExist: + return None + + def store_telemetry(self, transport, message): + Telemetry.objects.create( + transport=transport, + device=self.get_device(message["device"]), + clock=message["clock"], + payload=message["payload"], + ) + + def handle(self, *args, **options): + while True: + try: + consumer = KafkaConsumer( + "telemetry", + bootstrap_servers="{}:{}".format( + self.KAFKA_HOST, self.KAFKA_PORT + ), + group_id="handler", + value_deserializer=lambda m: json.loads(m.decode("utf8")), + ) + break + except NoBrokersAvailable: + self.stdout.write( + self.style.WARNING("WARNING: Kafka broker not available") + ) + time.sleep(5) + + self.stdout.write(self.style.SUCCESS("INFO: Kafka broker subscribed")) + for message in consumer: + self.store_telemetry( + message.value["transport"], message.value["body"] + ) + consumer.unsuscribe() diff --git a/bite/telemetry/migrations/0001_initial.py b/bite/telemetry/migrations/0001_initial.py index 9a6222a..8d30fba 100644 --- a/bite/telemetry/migrations/0001_initial.py +++ b/bite/telemetry/migrations/0001_initial.py @@ -11,7 +11,7 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('api', '0001_initial'), + ('dps', '0001_initial'), ] operations = [ @@ -23,7 +23,7 @@ class Migration(migrations.Migration): ('transport', models.CharField(choices=[('http', 'http'), ('mqtt', 'mqtt')], default='http', max_length=4)), ('clock', models.IntegerField(null=True, validators=[django.core.validators.MinValueValidator(0)])), ('payload', models.JSONField(validators=[telemetry.models.telemetry_validation])), - ('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='api.device')), + ('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='dps.device')), ], options={ 'verbose_name_plural': 'Telemetry', diff --git a/bite/telemetry/models.py b/bite/telemetry/models.py index c7cafe9..3f7cec7 100644 --- a/bite/telemetry/models.py +++ b/bite/telemetry/models.py @@ -21,7 +21,7 @@ from django.db import models from django.core.validators import MinValueValidator from django.core.exceptions import ValidationError -from api.models import Device +from dps.models import Device def telemetry_validation(value): diff --git a/bite/telemetry/serializers.py b/bite/telemetry/serializers.py index d657682..251762e 100644 --- a/bite/telemetry/serializers.py +++ b/bite/telemetry/serializers.py @@ -18,7 +18,7 @@ # along with this program. If not, see . from rest_framework import serializers -from api.models import Device +from dps.models import Device from telemetry.models import Telemetry diff --git a/bite/telemetry/tests.py b/bite/telemetry/tests.py index f9f8034..468e50e 100644 --- a/bite/telemetry/tests.py +++ b/bite/telemetry/tests.py @@ -19,7 +19,7 @@ import json from django.test import TestCase, Client -from api.models import Device, WhiteList +from dps.models import Device, WhiteList class ApiTestCase(TestCase): diff --git a/docker/django/Dockerfile b/docker/django/Dockerfile index bbae1ff..417c3e9 100644 --- a/docker/django/Dockerfile +++ b/docker/django/Dockerfile @@ -17,20 +17,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM python:3.9-alpine AS builder +FROM python:3.11-alpine AS builder RUN apk update && apk add gcc musl-dev postgresql-dev \ && pip install psycopg2-binary # --- -FROM python:3.9-alpine +FROM python:3.11-alpine ENV PYTHONUNBUFFERED 1 ENV DJANGO_SETTINGS_MODULE "bite.settings" RUN apk update && apk add --no-cache postgresql-libs \ - && wget https://github.com/jwilder/dockerize/releases/download/v0.6.1/dockerize-alpine-linux-amd64-v0.6.1.tar.gz -qO- \ + && wget https://github.com/jwilder/dockerize/releases/download/v0.7.0/dockerize-alpine-linux-amd64-v0.7.0.tar.gz -qO- \ | tar -xz -C /usr/local/bin -COPY --from=builder /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/ +COPY --from=builder /usr/local/lib/python3.11/site-packages/ /usr/local/lib/python3.11/site-packages/ COPY --chown=1000:1000 requirements.txt /srv/app/bite/requirements.txt RUN pip3 install --no-cache-dir -r /srv/app/bite/requirements.txt diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index 07d4da1..7b75d4a 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -36,6 +36,13 @@ services: ports: - "${CUSTOM_DOCKER_IP:-0.0.0.0}:8000:8000" + kafka: + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + ports: + - "${CUSTOM_DOCKER_IP:-0.0.0.0}:29092:29092" + data-migration: volumes: - ../bite:/srv/app/bite @@ -44,6 +51,10 @@ services: volumes: - ../bite:/srv/app/bite - mqtt-to-db: + dispatcher: + volumes: + - ../bite:/srv/app/bite + + handler: volumes: - ../bite:/srv/app/bite diff --git a/docker/docker-compose.prod.yml b/docker/docker-compose.prod.yml index 4598288..5aee5cb 100644 --- a/docker/docker-compose.prod.yml +++ b/docker/docker-compose.prod.yml @@ -29,6 +29,10 @@ services: volumes: - ./django/production.py.sample:/srv/app/bite/bite/production.py - mqtt-to-db: + dispatcher: + volumes: + - ./django/production.py.sample:/srv/app/bite/bite/production.py + + handler: volumes: - ./django/production.py.sample:/srv/app/bite/bite/production.py diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a099e28..12149ed 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -62,6 +62,28 @@ services: ports: - "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883" + zookeeper: + image: confluentinc/cp-zookeeper:latest + networks: + - net + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + networks: + - net + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ingress: <<: *service_default image: nginx:stable-alpine @@ -104,13 +126,21 @@ services: - "staticdata:/srv/appdata/bite/static:U" # REMOVE ':U' ON MOBY/DOCKER command: ["python3", "manage.py", "collectstatic", "--noinput"] - mqtt-to-db: + dispatcher: <<: *service_default image: daniviga/bite - command: ["python3", "manage.py", "mqtt-to-db"] + command: ["python3", "manage.py", "dispatcher"] + networks: + - net + depends_on: + - broker + + handler: + <<: *service_default + image: daniviga/bite + command: ["python3", "manage.py", "handler"] networks: - net depends_on: - data-migration - timescale - - broker diff --git a/docker/ntpd/Dockerfile b/docker/ntpd/Dockerfile index 6b50ff6..2c1e570 100644 --- a/docker/ntpd/Dockerfile +++ b/docker/ntpd/Dockerfile @@ -17,9 +17,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM alpine:3.15 +FROM alpine:3.18 -RUN apk update && apk add chrony && \ +RUN apk add --no-cache chrony && \ chown -R chrony:chrony /var/lib/chrony COPY ./chrony.conf /etc/chrony/chrony.conf diff --git a/docker/simulator/Dockerfile b/docker/simulator/Dockerfile index 4f1a738..c284db6 100644 --- a/docker/simulator/Dockerfile +++ b/docker/simulator/Dockerfile @@ -17,7 +17,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -FROM python:3.9-alpine +FROM python:3.11-alpine RUN pip3 install urllib3 paho-mqtt COPY ./device_simulator.py /opt/bite/device_simulator.py diff --git a/docker/simulator/device_simulator.py b/docker/simulator/device_simulator.py index 13a5c17..822aade 100755 --- a/docker/simulator/device_simulator.py +++ b/docker/simulator/device_simulator.py @@ -29,7 +29,7 @@ import argparse from time import sleep import paho.mqtt.publish as publish -DEBUG = bool(os.environ.get('IOT_DEBUG', False)) +DEBUG = bool(os.environ.get("IOT_DEBUG", False)) http = urllib3.PoolManager() @@ -39,15 +39,16 @@ def post_json(endpoint, url, data): if DEBUG: print(json_data) - encoded_data = json_data.encode('utf8') + encoded_data = json_data.encode("utf8") while True: try: r = http.request( - 'POST', + "POST", endpoint + url, body=encoded_data, - headers={'content-type': 'application/json'}) + headers={"content-type": "application/json"}, + ) return r except urllib3.exceptions.MaxRetryError: pass @@ -57,74 +58,89 @@ def post_json(endpoint, url, data): def publish_json(transport, endpoint, data): json_data = json.dumps(data) - serial = data['device'] + serial = data["device"] if DEBUG: print(json_data) - encoded_data = json_data.encode('utf8') + encoded_data = json_data.encode("utf8") publish.single( topic=serial, payload=encoded_data, - hostname=endpoint.split(':')[0], - port=int(endpoint.split(':')[1]), + hostname=endpoint.split(":")[0], + port=int(endpoint.split(":")[1]), client_id=serial, - transport=('websockets' if transport == 'ws' else 'tcp'), + transport=("websockets" if transport == "ws" else "tcp"), # auth=auth FIXME ) def main(): - parser = argparse.ArgumentParser( - description='IoT simulator oprions') + parser = argparse.ArgumentParser(description="IoT simulator oprions") - parser.add_argument('-e', '--endpoint', - default=os.environ.get('IOT_HTTP', - 'http://127.0.0.1:8000'), - help='IoT HTTP endpoint') - parser.add_argument('-m', '--mqtt', - default=os.environ.get('IOT_MQTT', - '127.0.0.1:1883'), - help='IoT MQTT endpoint') - parser.add_argument('-t', '--transport', - choices=['mqtt', 'ws', 'http'], - default=os.environ.get('IOT_TL', 'http'), - help='IoT transport layer') - parser.add_argument('-s', '--serial', - default=os.environ.get('IOT_SERIAL'), - help='IoT device serial number') - parser.add_argument('-d', '--delay', metavar='s', type=int, - default=os.environ.get('IOT_DELAY', 10), - help='Delay between requests') + parser.add_argument( + "-e", + "--endpoint", + default=os.environ.get("IOT_HTTP", "http://127.0.0.1:8000"), + help="IoT HTTP endpoint", + ) + parser.add_argument( + "-m", + "--mqtt", + default=os.environ.get("IOT_MQTT", "127.0.0.1:1883"), + help="IoT MQTT endpoint", + ) + parser.add_argument( + "-t", + "--transport", + choices=["mqtt", "ws", "http"], + default=os.environ.get("IOT_TL", "http"), + help="IoT transport layer", + ) + parser.add_argument( + "-s", + "--serial", + default=os.environ.get("IOT_SERIAL"), + help="IoT device serial number", + ) + parser.add_argument( + "-d", + "--delay", + metavar="s", + type=float, + default=os.environ.get("IOT_DELAY", 10), + help="Delay between requests", + ) args = parser.parse_args() - subscribe = '/api/device/subscribe/' - telemetry = '/telemetry/' + dps = "/dps/device/provision/" + telemetry = "/telemetry/" if args.serial is None: - args.serial = ''.join( - random.choices(string.ascii_lowercase + string.digits, k=8)) + args.serial = "".join( + random.choices(string.ascii_lowercase + string.digits, k=8) + ) - data = {'serial': args.serial} - post_json(args.endpoint, subscribe, data) + data = {"serial": args.serial} + post_json(args.endpoint, dps, data) while True: data = { - 'device': args.serial, - 'clock': int(datetime.datetime.now().timestamp()), + "device": args.serial, + "clock": int(datetime.datetime.now().timestamp()), } payload = { - 'id': 'device_simulator', - 'light': random.randint(300, 500), - 'temperature': { - 'celsius': round(random.uniform(20, 28), 1)} + "id": "device_simulator", + "light": random.randint(300, 500), + "temperature": {"celsius": round(random.uniform(20, 28), 1)}, } - if args.transport == 'http': - post_json(args.endpoint, telemetry, {**data, 'payload': payload}) - elif args.transport in ('mqtt', 'ws'): + if args.transport == "http": + post_json(args.endpoint, telemetry, {**data, "payload": payload}) + elif args.transport in ("mqtt", "ws"): publish_json( - args.transport, args.mqtt, {**data, 'payload': payload}) + args.transport, args.mqtt, {**data, "payload": payload} + ) else: raise NotImplementedError sleep(args.delay) diff --git a/docs/.badges/python.svg b/docs/.badges/python.svg index 0546923..599d97f 100644 --- a/docs/.badges/python.svg +++ b/docs/.badges/python.svg @@ -1 +1 @@ -pythonpython3.93.9 +pythonpython3.113.11 diff --git a/docs/application_chart.odg b/docs/application_chart.odg index e5d4a33..e7ca904 100644 Binary files a/docs/application_chart.odg and b/docs/application_chart.odg differ diff --git a/docs/application_chart.png b/docs/application_chart.png index 5ba6e24..1d02671 100644 Binary files a/docs/application_chart.png and b/docs/application_chart.png differ diff --git a/esp32/rssiHall/rssiHall.ino b/esp32/rssiHall/rssiHall.ino index 8059ddb..378418a 100644 --- a/esp32/rssiHall/rssiHall.ino +++ b/esp32/rssiHall/rssiHall.ino @@ -55,13 +55,13 @@ struct netConfig { } config; char* serial; -const String apiURL = "/api/device/subscribe/"; +const String dpsURL = "/dps/device/subscribe/"; const String telemetryURL = "/telemetry/"; void setup(void) { Serial.begin(115200); - StaticJsonDocument<64> api; + StaticJsonDocument<64> dps; preferences.begin("iot"); // Get the serial number from flash @@ -117,8 +117,8 @@ void setup(void) { Serial.println("DEBUG: clock updated via NTP."); #endif - api["serial"] = serial; - postData(config, apiURL, api); + dps["serial"] = serial; + postData(config, dpsURL, dps); telemetry["device"] = serial; // payload["id"] = serverName; diff --git a/requirements-dev.txt b/requirements-dev.txt index f37be1f..199768c 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,3 +4,4 @@ ipython flake8 pyinstrument django-debug-toolbar +urllib3 diff --git a/requirements.txt b/requirements.txt index 27023fb..9d18c6e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,8 @@ djangorestframework django-health-check psycopg2-binary paho-mqtt -asyncio-mqtt +kafka-python +aiomqtt PyYAML uritemplate pygments