1
0
mirror of https://github.com/daniviga/bite.git synced 2024-11-25 06:16:13 +01:00

Compare commits

..

No commits in common. "b4b6294aa7b9a8dd30f98f95507331ee167d0152" and "cc93c5ae7596f1553f78d4e35bcff7f643844396" have entirely different histories.

35 changed files with 130 additions and 314 deletions

View File

@ -4,7 +4,7 @@ Playing with IoT
[![Build Status](https://travis-ci.com/daniviga/bite.svg?branch=master)](https://travis-ci.com/daniviga/bite) [![Build Status](https://travis-ci.com/daniviga/bite.svg?branch=master)](https://travis-ci.com/daniviga/bite)
![AGPLv3](./docs/.badges/agpl3.svg) ![AGPLv3](./docs/.badges/agpl3.svg)
![Python 3.11](./docs/.badges/python.svg) ![Python 3.9](./docs/.badges/python.svg)
![MQTT](./docs/.badges/mqtt.svg) ![MQTT](./docs/.badges/mqtt.svg)
![Moby](./docs/.badges/moby.svg) ![Moby](./docs/.badges/moby.svg)
![docker-compose 3.7+](./docs/.badges/docker-compose.svg) ![docker-compose 3.7+](./docs/.badges/docker-compose.svg)
@ -15,6 +15,11 @@ production.
![Application Schema](./docs/application_chart.png) ![Application Schema](./docs/application_chart.png)
### Future implementations
- Broker HA via [VerneMQ clustering](https://docs.vernemq.com/clustering/introduction)
- Stream analytics via [Apache Spark](https://spark.apache.org/)
## Installation ## Installation
### Requirements ### Requirements
@ -57,10 +62,8 @@ The application stack is composed by the following components:
- [Django](https://www.djangoproject.com/) with - [Django](https://www.djangoproject.com/) with
[Django REST framework](https://www.django-rest-framework.org/) [Django REST framework](https://www.django-rest-framework.org/)
web application (running via `gunicorn` in production mode) web application (running via `gunicorn` in production mode)
- `dispatcher` custom daemon to dump telemetry into the Kafka queue - `mqtt-to-db` custom daemon to dump telemetry into the timeseries database
- `handler` custom daemon to dump telemetry into the timeseries database from the Kafka queue
- telemetry payload is stored as json object (via PostgreSQL JSON data type) - telemetry payload is stored as json object (via PostgreSQL JSON data type)
- [Kafka](https://kafka.apache.org/) broker
- [Timescale](https://www.timescale.com/) DB, - [Timescale](https://www.timescale.com/) DB,
a [PostgreSQL](https://www.postgresql.org/) database with a timeseries extension a [PostgreSQL](https://www.postgresql.org/) database with a timeseries extension
- [Mosquitto](https://mosquitto.org/) MQTT broker (see alternatives below) - [Mosquitto](https://mosquitto.org/) MQTT broker (see alternatives below)

View File

@ -55,7 +55,7 @@ struct netConfig {
} config; } config;
char serial[9]; char serial[9];
const String dpsURL = "/dps/device/subscribe/"; const String apiURL = "/api/device/subscribe/";
const String telemetryURL = "/telemetry/"; const String telemetryURL = "/telemetry/";
void setup(void) { void setup(void) {
@ -63,7 +63,7 @@ void setup(void) {
analogReference(EXTERNAL); analogReference(EXTERNAL);
StaticJsonDocument<20> dps; StaticJsonDocument<20> api;
byte mac[6]; byte mac[6];
int eeAddress = 0; int eeAddress = 0;
@ -110,8 +110,8 @@ void setup(void) {
Serial.println("DEBUG: clock updated via NTP."); Serial.println("DEBUG: clock updated via NTP.");
#endif #endif
dps["serial"] = serial; api["serial"] = serial;
postData(config, dpsURL, dps); postData(config, apiURL, api);
telemetry["device"] = serial; telemetry["device"] = serial;
// payload["id"] = serverName; // payload["id"] = serverName;

View File

@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import admin from django.contrib import admin
from dps.models import Device, WhiteList from api.models import Device, WhiteList
@admin.register(Device) @admin.register(Device)

View File

@ -20,5 +20,5 @@
from django.apps import AppConfig from django.apps import AppConfig
class DPSConfig(AppConfig): class ApiConfig(AppConfig):
name = 'dps' name = 'api'

View File

@ -1,6 +1,6 @@
# Generated by Django 3.1.3 on 2021-03-19 08:08 # Generated by Django 3.1.3 on 2021-03-19 08:08
import dps.models import api.models
from django.db import migrations, models from django.db import migrations, models
import uuid import uuid
@ -16,7 +16,7 @@ class Migration(migrations.Migration):
migrations.CreateModel( migrations.CreateModel(
name='Device', name='Device',
fields=[ fields=[
('serial', models.CharField(max_length=128, unique=True, validators=[dps.models.device_validation])), ('serial', models.CharField(max_length=128, unique=True, validators=[api.models.device_validation])),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('creation_time', models.DateTimeField(auto_now_add=True)), ('creation_time', models.DateTimeField(auto_now_add=True)),
('updated_time', models.DateTimeField(auto_now=True)), ('updated_time', models.DateTimeField(auto_now=True)),

View File

@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from rest_framework import serializers from rest_framework import serializers
from dps.models import Device, device_validation from api.models import Device, device_validation
class DeviceSerializer(serializers.ModelSerializer): class DeviceSerializer(serializers.ModelSerializer):

View File

@ -18,10 +18,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.test import TestCase, Client from django.test import TestCase, Client
from dps.models import Device, WhiteList from api.models import Device, WhiteList
class DPSTestCase(TestCase): class ApiTestCase(TestCase):
c = Client() c = Client()
def setUp(self): def setUp(self):
@ -29,17 +29,17 @@ class DPSTestCase(TestCase):
Device.objects.create(serial='test1234') Device.objects.create(serial='test1234')
def test_no_whitelist(self): def test_no_whitelist(self):
response = self.c.post('/dps/device/provision/', response = self.c.post('/api/device/subscribe/',
{'serial': 'test12345'}) {'serial': 'test12345'})
self.assertEqual(response.status_code, 400) self.assertEqual(response.status_code, 400)
def test_provision_post(self): def test_subscribe_post(self):
WhiteList.objects.create(serial='test12345') WhiteList.objects.create(serial='test12345')
response = self.c.post('/dps/device/provision/', response = self.c.post('/api/device/subscribe/',
{'serial': 'test12345'}) {'serial': 'test12345'})
self.assertEqual(response.status_code, 201) self.assertEqual(response.status_code, 201)
def test_provision_get(self): def test_subscribe_get(self):
response = self.c.get('/dps/device/list/') response = self.c.get('/api/device/list/')
self.assertEqual( self.assertEqual(
response.json()[0]['serial'], 'test1234') response.json()[0]['serial'], 'test1234')

View File

@ -33,13 +33,13 @@ Including another URLconf
2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) 2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
""" """
from django.urls import path from django.urls import path
from dps.views import DPS from api.views import APISubscribe
urlpatterns = [ urlpatterns = [
path('device/provision/', path('device/subscribe/',
DPS.as_view({'post': 'create'}), APISubscribe.as_view({'post': 'create'}),
name='device-provision'), name='device-subscribe'),
path('device/list/', path('device/list/',
DPS.as_view({'get': 'list'}), APISubscribe.as_view({'get': 'list'}),
name='device-list'), name='device-list'),
] ]

View File

@ -19,10 +19,10 @@
from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ModelViewSet
from dps.models import Device from api.models import Device
from dps.serializers import DeviceSerializer from api.serializers import DeviceSerializer
class DPS(ModelViewSet): class APISubscribe(ModelViewSet):
queryset = Device.objects.all() queryset = Device.objects.all()
serializer_class = DeviceSerializer serializer_class = DeviceSerializer

View File

@ -1,20 +0,0 @@
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'bite',
'USER': 'bite',
'PASSWORD': 'password',
'HOST': 'localhost',
'PORT': '5432',
}
}
MQTT_BROKER = {
'HOST': 'localhost',
'PORT': '1883',
}
KAFKA_BROKER = {
'HOST': 'localhost',
'PORT': '29092',
}

View File

@ -61,7 +61,7 @@ INSTALLED_APPS = [
# 'health_check.storage', # 'health_check.storage',
'rest_framework', 'rest_framework',
'bite', 'bite',
'dps', 'api',
'telemetry', 'telemetry',
] ]
@ -151,10 +151,6 @@ STATIC_URL = '/static/'
STATIC_ROOT = '/srv/appdata/bite/static' STATIC_ROOT = '/srv/appdata/bite/static'
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': []
}
SKIP_WHITELIST = True SKIP_WHITELIST = True
MQTT_BROKER = { MQTT_BROKER = {
@ -162,17 +158,11 @@ MQTT_BROKER = {
'PORT': '1883', 'PORT': '1883',
} }
KAFKA_BROKER = { # If no local_settings.py is availble in the current folder let's try to
'HOST': 'kafka', # load it from the application root
'PORT': '9092',
}
try:
from bite.local_settings import *
except ImportError:
pass
try: try:
from bite.production import * from bite.production import *
except ImportError: except ImportError:
# If a local_setting.py does not exist
# settings in this file only will be used
pass pass

View File

@ -37,13 +37,13 @@ from django.contrib import admin
from django.conf import settings from django.conf import settings
from django.urls import include, path from django.urls import include, path
from dps import urls as dps_urls from api import urls as api_urls
from telemetry import urls as telemetry_urls from telemetry import urls as telemetry_urls
urlpatterns = [ urlpatterns = [
path('admin/', admin.site.urls), path('admin/', admin.site.urls),
path('ht/', include('health_check.urls')), path('ht/', include('health_check.urls')),
path('dps/', include(dps_urls)), path('api/', include(api_urls)),
path('telemetry/', include(telemetry_urls)), path('telemetry/', include(telemetry_urls)),
] ]

View File

@ -1,76 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# BITE - A Basic/IoT/Example
# Copyright (C) 2020-2021 Daniele Viganò <daniele@vigano.me>
#
# BITE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# BITE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import time
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist
from dps.models import Device
from telemetry.models import Telemetry
class Command(BaseCommand):
help = "Telemetry handler"
KAFKA_HOST = settings.KAFKA_BROKER["HOST"]
KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"])
def get_device(self, serial):
try:
return Device.objects.get(serial=serial)
except ObjectDoesNotExist:
return None
def store_telemetry(self, transport, message):
Telemetry.objects.create(
transport=transport,
device=self.get_device(message["device"]),
clock=message["clock"],
payload=message["payload"],
)
def handle(self, *args, **options):
while True:
try:
consumer = KafkaConsumer(
"telemetry",
bootstrap_servers="{}:{}".format(
self.KAFKA_HOST, self.KAFKA_PORT
),
group_id="handler",
value_deserializer=lambda m: json.loads(m.decode("utf8")),
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING("WARNING: Kafka broker not available")
)
time.sleep(5)
self.stdout.write(self.style.SUCCESS("INFO: Kafka broker subscribed"))
for message in consumer:
self.store_telemetry(
message.value["transport"], message.value["body"]
)
consumer.unsuscribe()

View File

@ -22,26 +22,22 @@ import asyncio
import json import json
import time import time
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
from asgiref.sync import sync_to_async from asgiref.sync import sync_to_async
from aiomqtt import Client from asyncio_mqtt import Client
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from dps.models import Device from api.models import Device
from telemetry.models import Telemetry
MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
class Command(BaseCommand): class Command(BaseCommand):
help = "Telemetry dispatcher" help = 'MQTT to DB deamon'
MQTT_HOST = settings.MQTT_BROKER["HOST"]
MQTT_PORT = int(settings.MQTT_BROKER["PORT"])
KAFKA_HOST = settings.KAFKA_BROKER["HOST"]
KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"])
producer = None
@sync_to_async @sync_to_async
def get_device(self, serial): def get_device(self, serial):
@ -51,54 +47,40 @@ class Command(BaseCommand):
return None return None
@sync_to_async @sync_to_async
def dispatch(self, message): def store_telemetry(self, device, payload):
self.producer.send("telemetry", {"transport": "mqtt", "body": message}) Telemetry.objects.create(
device=device,
transport='mqtt',
clock=payload['clock'],
payload=payload['payload']
)
async def mqtt_broker(self): async def mqtt_broker(self):
async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client: async with Client(MQTT_HOST, port=MQTT_PORT) as client:
# use shared subscription for HA/balancing # use shared subscription for HA/balancing
await client.subscribe("$share/telemetry/#") await client.subscribe("$share/telemetry/#")
async with client.messages() as messages: async with client.unfiltered_messages() as messages:
async for message in messages: async for message in messages:
payload = json.loads(message.payload.decode('utf-8'))
device = await self.get_device(message.topic) device = await self.get_device(message.topic)
if device is not None: if device is not None:
message_body = json.loads( await self.store_telemetry(device, payload)
message.payload.decode("utf-8")
)
await self.dispatch(message_body)
else: else:
self.stdout.write( self.stdout.write(
self.style.ERROR("DEBUG: message discarded") self.style.ERROR(
) 'DEBUG: message discarded'))
def handle(self, *args, **options): def handle(self, *args, **options):
client = mqtt.Client() client = mqtt.Client()
while True: while True:
try: try:
client.connect(self.MQTT_HOST, self.MQTT_PORT) client.connect(MQTT_HOST, MQTT_PORT)
break break
except (socket.gaierror, ConnectionRefusedError): except (socket.gaierror, ConnectionRefusedError):
self.stdout.write( self.stdout.write(
self.style.WARNING("WARNING: MQTT broker not available") self.style.WARNING('WARNING: Broker not available'))
)
time.sleep(5) time.sleep(5)
while True: self.stdout.write(self.style.SUCCESS('INFO: Broker subscribed'))
try:
self.producer = KafkaProducer(
bootstrap_servers="{}:{}".format(
self.KAFKA_HOST, self.KAFKA_PORT
),
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
retries=5,
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING("WARNING: Kafka broker not available")
)
time.sleep(5)
self.stdout.write(self.style.SUCCESS("INFO: Brokers subscribed"))
client.disconnect() client.disconnect()
asyncio.run(self.mqtt_broker()) asyncio.run(self.mqtt_broker())

View File

@ -11,7 +11,7 @@ class Migration(migrations.Migration):
initial = True initial = True
dependencies = [ dependencies = [
('dps', '0001_initial'), ('api', '0001_initial'),
] ]
operations = [ operations = [
@ -23,7 +23,7 @@ class Migration(migrations.Migration):
('transport', models.CharField(choices=[('http', 'http'), ('mqtt', 'mqtt')], default='http', max_length=4)), ('transport', models.CharField(choices=[('http', 'http'), ('mqtt', 'mqtt')], default='http', max_length=4)),
('clock', models.IntegerField(null=True, validators=[django.core.validators.MinValueValidator(0)])), ('clock', models.IntegerField(null=True, validators=[django.core.validators.MinValueValidator(0)])),
('payload', models.JSONField(validators=[telemetry.models.telemetry_validation])), ('payload', models.JSONField(validators=[telemetry.models.telemetry_validation])),
('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='dps.device')), ('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='api.device')),
], ],
options={ options={
'verbose_name_plural': 'Telemetry', 'verbose_name_plural': 'Telemetry',

View File

@ -21,7 +21,7 @@ from django.db import models
from django.core.validators import MinValueValidator from django.core.validators import MinValueValidator
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from dps.models import Device from api.models import Device
def telemetry_validation(value): def telemetry_validation(value):

View File

@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from rest_framework import serializers from rest_framework import serializers
from dps.models import Device from api.models import Device
from telemetry.models import Telemetry from telemetry.models import Telemetry

View File

@ -19,7 +19,7 @@
import json import json
from django.test import TestCase, Client from django.test import TestCase, Client
from dps.models import Device, WhiteList from api.models import Device, WhiteList
class ApiTestCase(TestCase): class ApiTestCase(TestCase):

View File

@ -17,20 +17,20 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
FROM python:3.11-alpine AS builder FROM python:3.9-alpine AS builder
RUN apk update && apk add gcc musl-dev postgresql-dev \ RUN apk update && apk add gcc musl-dev postgresql-dev \
&& pip install psycopg2-binary && pip install psycopg2-binary
# --- # ---
FROM python:3.11-alpine FROM python:3.9-alpine
ENV PYTHONUNBUFFERED 1 ENV PYTHONUNBUFFERED 1
ENV DJANGO_SETTINGS_MODULE "bite.settings" ENV DJANGO_SETTINGS_MODULE "bite.settings"
RUN apk update && apk add --no-cache postgresql-libs \ RUN apk update && apk add --no-cache postgresql-libs \
&& wget https://github.com/jwilder/dockerize/releases/download/v0.7.0/dockerize-alpine-linux-amd64-v0.7.0.tar.gz -qO- \ && wget https://github.com/jwilder/dockerize/releases/download/v0.6.1/dockerize-alpine-linux-amd64-v0.6.1.tar.gz -qO- \
| tar -xz -C /usr/local/bin | tar -xz -C /usr/local/bin
COPY --from=builder /usr/local/lib/python3.11/site-packages/ /usr/local/lib/python3.11/site-packages/ COPY --from=builder /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/
COPY --chown=1000:1000 requirements.txt /srv/app/bite/requirements.txt COPY --chown=1000:1000 requirements.txt /srv/app/bite/requirements.txt
RUN pip3 install --no-cache-dir -r /srv/app/bite/requirements.txt RUN pip3 install --no-cache-dir -r /srv/app/bite/requirements.txt

View File

@ -36,13 +36,6 @@ services:
ports: ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:8000:8000" - "${CUSTOM_DOCKER_IP:-0.0.0.0}:8000:8000"
kafka:
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:29092:29092"
data-migration: data-migration:
volumes: volumes:
- ../bite:/srv/app/bite - ../bite:/srv/app/bite
@ -51,10 +44,6 @@ services:
volumes: volumes:
- ../bite:/srv/app/bite - ../bite:/srv/app/bite
dispatcher: mqtt-to-db:
volumes:
- ../bite:/srv/app/bite
handler:
volumes: volumes:
- ../bite:/srv/app/bite - ../bite:/srv/app/bite

View File

@ -29,10 +29,6 @@ services:
volumes: volumes:
- ./django/production.py.sample:/srv/app/bite/bite/production.py - ./django/production.py.sample:/srv/app/bite/bite/production.py
dispatcher: mqtt-to-db:
volumes:
- ./django/production.py.sample:/srv/app/bite/bite/production.py
handler:
volumes: volumes:
- ./django/production.py.sample:/srv/app/bite/bite/production.py - ./django/production.py.sample:/srv/app/bite/bite/production.py

View File

@ -62,28 +62,6 @@ services:
ports: ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883" - "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883"
zookeeper:
image: confluentinc/cp-zookeeper:latest
networks:
- net
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
networks:
- net
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ingress: ingress:
<<: *service_default <<: *service_default
image: nginx:stable-alpine image: nginx:stable-alpine
@ -126,21 +104,13 @@ services:
- "staticdata:/srv/appdata/bite/static:U" # REMOVE ':U' ON MOBY/DOCKER - "staticdata:/srv/appdata/bite/static:U" # REMOVE ':U' ON MOBY/DOCKER
command: ["python3", "manage.py", "collectstatic", "--noinput"] command: ["python3", "manage.py", "collectstatic", "--noinput"]
dispatcher: mqtt-to-db:
<<: *service_default <<: *service_default
image: daniviga/bite image: daniviga/bite
command: ["python3", "manage.py", "dispatcher"] command: ["python3", "manage.py", "mqtt-to-db"]
networks:
- net
depends_on:
- broker
handler:
<<: *service_default
image: daniviga/bite
command: ["python3", "manage.py", "handler"]
networks: networks:
- net - net
depends_on: depends_on:
- data-migration - data-migration
- timescale - timescale
- broker

View File

@ -17,9 +17,9 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
FROM alpine:3.18 FROM alpine:3.15
RUN apk add --no-cache chrony && \ RUN apk update && apk add chrony && \
chown -R chrony:chrony /var/lib/chrony chown -R chrony:chrony /var/lib/chrony
COPY ./chrony.conf /etc/chrony/chrony.conf COPY ./chrony.conf /etc/chrony/chrony.conf

View File

@ -17,7 +17,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
FROM python:3.11-alpine FROM python:3.9-alpine
RUN pip3 install urllib3 paho-mqtt RUN pip3 install urllib3 paho-mqtt
COPY ./device_simulator.py /opt/bite/device_simulator.py COPY ./device_simulator.py /opt/bite/device_simulator.py

View File

@ -29,7 +29,7 @@ import argparse
from time import sleep from time import sleep
import paho.mqtt.publish as publish import paho.mqtt.publish as publish
DEBUG = bool(os.environ.get("IOT_DEBUG", False)) DEBUG = bool(os.environ.get('IOT_DEBUG', False))
http = urllib3.PoolManager() http = urllib3.PoolManager()
@ -39,16 +39,15 @@ def post_json(endpoint, url, data):
if DEBUG: if DEBUG:
print(json_data) print(json_data)
encoded_data = json_data.encode("utf8") encoded_data = json_data.encode('utf8')
while True: while True:
try: try:
r = http.request( r = http.request(
"POST", 'POST',
endpoint + url, endpoint + url,
body=encoded_data, body=encoded_data,
headers={"content-type": "application/json"}, headers={'content-type': 'application/json'})
)
return r return r
except urllib3.exceptions.MaxRetryError: except urllib3.exceptions.MaxRetryError:
pass pass
@ -58,89 +57,74 @@ def post_json(endpoint, url, data):
def publish_json(transport, endpoint, data): def publish_json(transport, endpoint, data):
json_data = json.dumps(data) json_data = json.dumps(data)
serial = data["device"] serial = data['device']
if DEBUG: if DEBUG:
print(json_data) print(json_data)
encoded_data = json_data.encode("utf8") encoded_data = json_data.encode('utf8')
publish.single( publish.single(
topic=serial, topic=serial,
payload=encoded_data, payload=encoded_data,
hostname=endpoint.split(":")[0], hostname=endpoint.split(':')[0],
port=int(endpoint.split(":")[1]), port=int(endpoint.split(':')[1]),
client_id=serial, client_id=serial,
transport=("websockets" if transport == "ws" else "tcp"), transport=('websockets' if transport == 'ws' else 'tcp'),
# auth=auth FIXME # auth=auth FIXME
) )
def main(): def main():
parser = argparse.ArgumentParser(description="IoT simulator oprions") parser = argparse.ArgumentParser(
description='IoT simulator oprions')
parser.add_argument( parser.add_argument('-e', '--endpoint',
"-e", default=os.environ.get('IOT_HTTP',
"--endpoint", 'http://127.0.0.1:8000'),
default=os.environ.get("IOT_HTTP", "http://127.0.0.1:8000"), help='IoT HTTP endpoint')
help="IoT HTTP endpoint", parser.add_argument('-m', '--mqtt',
) default=os.environ.get('IOT_MQTT',
parser.add_argument( '127.0.0.1:1883'),
"-m", help='IoT MQTT endpoint')
"--mqtt", parser.add_argument('-t', '--transport',
default=os.environ.get("IOT_MQTT", "127.0.0.1:1883"), choices=['mqtt', 'ws', 'http'],
help="IoT MQTT endpoint", default=os.environ.get('IOT_TL', 'http'),
) help='IoT transport layer')
parser.add_argument( parser.add_argument('-s', '--serial',
"-t", default=os.environ.get('IOT_SERIAL'),
"--transport", help='IoT device serial number')
choices=["mqtt", "ws", "http"], parser.add_argument('-d', '--delay', metavar='s', type=int,
default=os.environ.get("IOT_TL", "http"), default=os.environ.get('IOT_DELAY', 10),
help="IoT transport layer", help='Delay between requests')
)
parser.add_argument(
"-s",
"--serial",
default=os.environ.get("IOT_SERIAL"),
help="IoT device serial number",
)
parser.add_argument(
"-d",
"--delay",
metavar="s",
type=float,
default=os.environ.get("IOT_DELAY", 10),
help="Delay between requests",
)
args = parser.parse_args() args = parser.parse_args()
dps = "/dps/device/provision/" subscribe = '/api/device/subscribe/'
telemetry = "/telemetry/" telemetry = '/telemetry/'
if args.serial is None: if args.serial is None:
args.serial = "".join( args.serial = ''.join(
random.choices(string.ascii_lowercase + string.digits, k=8) random.choices(string.ascii_lowercase + string.digits, k=8))
)
data = {"serial": args.serial} data = {'serial': args.serial}
post_json(args.endpoint, dps, data) post_json(args.endpoint, subscribe, data)
while True: while True:
data = { data = {
"device": args.serial, 'device': args.serial,
"clock": int(datetime.datetime.now().timestamp()), 'clock': int(datetime.datetime.now().timestamp()),
} }
payload = { payload = {
"id": "device_simulator", 'id': 'device_simulator',
"light": random.randint(300, 500), 'light': random.randint(300, 500),
"temperature": {"celsius": round(random.uniform(20, 28), 1)}, 'temperature': {
'celsius': round(random.uniform(20, 28), 1)}
} }
if args.transport == "http": if args.transport == 'http':
post_json(args.endpoint, telemetry, {**data, "payload": payload}) post_json(args.endpoint, telemetry, {**data, 'payload': payload})
elif args.transport in ("mqtt", "ws"): elif args.transport in ('mqtt', 'ws'):
publish_json( publish_json(
args.transport, args.mqtt, {**data, "payload": payload} args.transport, args.mqtt, {**data, 'payload': payload})
)
else: else:
raise NotImplementedError raise NotImplementedError
sleep(args.delay) sleep(args.delay)

View File

@ -1 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="93.0" height="20"><linearGradient id="smooth" x2="0" y2="100%"><stop offset="0" stop-color="#bbb" stop-opacity=".1"/><stop offset="1" stop-opacity=".1"/></linearGradient><clipPath id="round"><rect width="93.0" height="20" rx="3" fill="#fff"/></clipPath><g clip-path="url(#round)"><rect width="65.5" height="20" fill="#555"/><rect x="65.5" width="27.5" height="20" fill="#007ec6"/><rect width="93.0" height="20" fill="url(#smooth)"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="110"><image x="5" y="3" width="14" height="14" xlink:href=""/><text x="422.5" y="150" fill="#010101" fill-opacity=".3" transform="scale(0.1)" textLength="385.0" lengthAdjust="spacing">python</text><text x="422.5" y="140" transform="scale(0.1)" textLength="385.0" lengthAdjust="spacing">python</text><text x="782.5" y="150" fill="#010101" fill-opacity=".3" transform="scale(0.1)" textLength="175.0" lengthAdjust="spacing">3.11</text><text x="782.5" y="140" transform="scale(0.1)" textLength="175.0" lengthAdjust="spacing">3.11</text></g></svg> <svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="93.0" height="20"><linearGradient id="smooth" x2="0" y2="100%"><stop offset="0" stop-color="#bbb" stop-opacity=".1"/><stop offset="1" stop-opacity=".1"/></linearGradient><clipPath id="round"><rect width="93.0" height="20" rx="3" fill="#fff"/></clipPath><g clip-path="url(#round)"><rect width="65.5" height="20" fill="#555"/><rect x="65.5" width="27.5" height="20" fill="#007ec6"/><rect width="93.0" height="20" fill="url(#smooth)"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="110"><image x="5" y="3" width="14" height="14" xlink:href=""/><text x="422.5" y="150" fill="#010101" fill-opacity=".3" transform="scale(0.1)" textLength="385.0" lengthAdjust="spacing">python</text><text x="422.5" y="140" transform="scale(0.1)" textLength="385.0" lengthAdjust="spacing">python</text><text x="782.5" y="150" fill="#010101" fill-opacity=".3" transform="scale(0.1)" textLength="175.0" lengthAdjust="spacing">3.9</text><text x="782.5" y="140" transform="scale(0.1)" textLength="175.0" lengthAdjust="spacing">3.9</text></g></svg>

Before

Width:  |  Height:  |  Size: 2.4 KiB

After

Width:  |  Height:  |  Size: 2.4 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 122 KiB

After

Width:  |  Height:  |  Size: 101 KiB

View File

@ -55,13 +55,13 @@ struct netConfig {
} config; } config;
char* serial; char* serial;
const String dpsURL = "/dps/device/subscribe/"; const String apiURL = "/api/device/subscribe/";
const String telemetryURL = "/telemetry/"; const String telemetryURL = "/telemetry/";
void setup(void) { void setup(void) {
Serial.begin(115200); Serial.begin(115200);
StaticJsonDocument<64> dps; StaticJsonDocument<64> api;
preferences.begin("iot"); preferences.begin("iot");
// Get the serial number from flash // Get the serial number from flash
@ -117,8 +117,8 @@ void setup(void) {
Serial.println("DEBUG: clock updated via NTP."); Serial.println("DEBUG: clock updated via NTP.");
#endif #endif
dps["serial"] = serial; api["serial"] = serial;
postData(config, dpsURL, dps); postData(config, apiURL, api);
telemetry["device"] = serial; telemetry["device"] = serial;
// payload["id"] = serverName; // payload["id"] = serverName;

View File

@ -4,4 +4,3 @@ ipython
flake8 flake8
pyinstrument pyinstrument
django-debug-toolbar django-debug-toolbar
urllib3

View File

@ -4,8 +4,7 @@ djangorestframework
django-health-check django-health-check
psycopg2-binary psycopg2-binary
paho-mqtt paho-mqtt
kafka-python asyncio-mqtt
aiomqtt
PyYAML PyYAML
uritemplate uritemplate
pygments pygments