1
0
mirror of https://github.com/daniviga/bite.git synced 2024-11-24 13:56:13 +01:00

Compare commits

...

7 Commits

28 changed files with 162 additions and 123 deletions

View File

@ -13,6 +13,8 @@ This project is for educational purposes only. It does not implement any
authentication and/or encryption protocol, so it is not suitable for real
production.
![Application Schema](./docs/application_chart.png)
## Installation
### Requirements

View File

@ -55,7 +55,7 @@ struct netConfig {
} config;
char serial[9];
const String apiURL = "/api/device/subscribe/";
const String dpsURL = "/dps/device/subscribe/";
const String telemetryURL = "/telemetry/";
void setup(void) {
@ -63,7 +63,7 @@ void setup(void) {
analogReference(EXTERNAL);
StaticJsonDocument<20> api;
StaticJsonDocument<20> dps;
byte mac[6];
int eeAddress = 0;
@ -110,8 +110,8 @@ void setup(void) {
Serial.println("DEBUG: clock updated via NTP.");
#endif
api["serial"] = serial;
postData(config, apiURL, api);
dps["serial"] = serial;
postData(config, dpsURL, dps);
telemetry["device"] = serial;
// payload["id"] = serverName;

View File

@ -0,0 +1,20 @@
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'bite',
'USER': 'bite',
'PASSWORD': 'password',
'HOST': 'localhost',
'PORT': '5432',
}
}
MQTT_BROKER = {
'HOST': 'localhost',
'PORT': '1883',
}
KAFKA_BROKER = {
'HOST': 'localhost',
'PORT': '29092',
}

View File

@ -61,7 +61,7 @@ INSTALLED_APPS = [
# 'health_check.storage',
'rest_framework',
'bite',
'api',
'dps',
'telemetry',
]
@ -167,11 +167,12 @@ KAFKA_BROKER = {
'PORT': '9092',
}
# If no local_settings.py is availble in the current folder let's try to
# load it from the application root
try:
from bite.local_settings import *
except ImportError:
pass
try:
from bite.production import *
except ImportError:
# If a local_setting.py does not exist
# settings in this file only will be used
pass

View File

@ -37,13 +37,13 @@ from django.contrib import admin
from django.conf import settings
from django.urls import include, path
from api import urls as api_urls
from dps import urls as dps_urls
from telemetry import urls as telemetry_urls
urlpatterns = [
path('admin/', admin.site.urls),
path('ht/', include('health_check.urls')),
path('api/', include(api_urls)),
path('dps/', include(dps_urls)),
path('telemetry/', include(telemetry_urls)),
]

View File

@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib import admin
from api.models import Device, WhiteList
from dps.models import Device, WhiteList
@admin.register(Device)

View File

@ -20,5 +20,5 @@
from django.apps import AppConfig
class ApiConfig(AppConfig):
name = 'api'
class DPSConfig(AppConfig):
name = 'dps'

View File

@ -1,6 +1,6 @@
# Generated by Django 3.1.3 on 2021-03-19 08:08
import api.models
import dps.models
from django.db import migrations, models
import uuid
@ -16,7 +16,7 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name='Device',
fields=[
('serial', models.CharField(max_length=128, unique=True, validators=[api.models.device_validation])),
('serial', models.CharField(max_length=128, unique=True, validators=[dps.models.device_validation])),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('creation_time', models.DateTimeField(auto_now_add=True)),
('updated_time', models.DateTimeField(auto_now=True)),

View File

@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from rest_framework import serializers
from api.models import Device, device_validation
from dps.models import Device, device_validation
class DeviceSerializer(serializers.ModelSerializer):

View File

@ -18,10 +18,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.test import TestCase, Client
from api.models import Device, WhiteList
from dps.models import Device, WhiteList
class ApiTestCase(TestCase):
class DPSTestCase(TestCase):
c = Client()
def setUp(self):
@ -29,17 +29,17 @@ class ApiTestCase(TestCase):
Device.objects.create(serial='test1234')
def test_no_whitelist(self):
response = self.c.post('/api/device/subscribe/',
response = self.c.post('/dps/device/provision/',
{'serial': 'test12345'})
self.assertEqual(response.status_code, 400)
def test_subscribe_post(self):
def test_provision_post(self):
WhiteList.objects.create(serial='test12345')
response = self.c.post('/api/device/subscribe/',
response = self.c.post('/dps/device/provision/',
{'serial': 'test12345'})
self.assertEqual(response.status_code, 201)
def test_subscribe_get(self):
response = self.c.get('/api/device/list/')
def test_provision_get(self):
response = self.c.get('/dps/device/list/')
self.assertEqual(
response.json()[0]['serial'], 'test1234')

View File

@ -33,13 +33,13 @@ Including another URLconf
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.urls import path
from api.views import APISubscribe
from dps.views import DPS
urlpatterns = [
path('device/subscribe/',
APISubscribe.as_view({'post': 'create'}),
name='device-subscribe'),
path('device/provision/',
DPS.as_view({'post': 'create'}),
name='device-provision'),
path('device/list/',
APISubscribe.as_view({'get': 'list'}),
DPS.as_view({'get': 'list'}),
name='device-list'),
]

View File

@ -19,10 +19,10 @@
from rest_framework.viewsets import ModelViewSet
from api.models import Device
from api.serializers import DeviceSerializer
from dps.models import Device
from dps.serializers import DeviceSerializer
class APISubscribe(ModelViewSet):
class DPS(ModelViewSet):
queryset = Device.objects.all()
serializer_class = DeviceSerializer

View File

@ -31,16 +31,16 @@ from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist
from api.models import Device
from dps.models import Device
class Command(BaseCommand):
help = 'MQTT to DB deamon'
help = "Telemetry dispatcher"
MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
KAFKA_HOST = settings.KAFKA_BROKER['HOST']
KAFKA_PORT = int(settings.KAFKA_BROKER['PORT'])
MQTT_HOST = settings.MQTT_BROKER["HOST"]
MQTT_PORT = int(settings.MQTT_BROKER["PORT"])
KAFKA_HOST = settings.KAFKA_BROKER["HOST"]
KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"])
producer = None
@sync_to_async
@ -52,10 +52,7 @@ class Command(BaseCommand):
@sync_to_async
def dispatch(self, message):
self.producer.send(
'telemetry', {"transport": 'mqtt',
"body": message}
)
self.producer.send("telemetry", {"transport": "mqtt", "body": message})
async def mqtt_broker(self):
async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client:
@ -66,12 +63,13 @@ class Command(BaseCommand):
device = await self.get_device(message.topic)
if device is not None:
message_body = json.loads(
message.payload.decode('utf-8'))
message.payload.decode("utf-8")
)
await self.dispatch(message_body)
else:
self.stdout.write(
self.style.ERROR(
'DEBUG: message discarded'))
self.style.ERROR("DEBUG: message discarded")
)
def handle(self, *args, **options):
client = mqtt.Client()
@ -81,24 +79,26 @@ class Command(BaseCommand):
break
except (socket.gaierror, ConnectionRefusedError):
self.stdout.write(
self.style.WARNING('WARNING: MQTT broker not available'))
self.style.WARNING("WARNING: MQTT broker not available")
)
time.sleep(5)
while True:
try:
self.producer = KafkaProducer(
bootstrap_servers='{}:{}'.format(
bootstrap_servers="{}:{}".format(
self.KAFKA_HOST, self.KAFKA_PORT
),
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
retries=5
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
retries=5,
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available'))
self.style.WARNING("WARNING: Kafka broker not available")
)
time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed'))
self.stdout.write(self.style.SUCCESS("INFO: Brokers subscribed"))
client.disconnect()
asyncio.run(self.mqtt_broker())

View File

@ -26,15 +26,15 @@ from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist
from api.models import Device
from dps.models import Device
from telemetry.models import Telemetry
class Command(BaseCommand):
help = 'MQTT to DB deamon'
help = "Telemetry handler"
KAFKA_HOST = settings.KAFKA_BROKER['HOST']
KAFKA_PORT = int(settings.KAFKA_BROKER['PORT'])
KAFKA_HOST = settings.KAFKA_BROKER["HOST"]
KAFKA_PORT = int(settings.KAFKA_BROKER["PORT"])
def get_device(self, serial):
try:
@ -47,7 +47,7 @@ class Command(BaseCommand):
transport=transport,
device=self.get_device(message["device"]),
clock=message["clock"],
payload=message["payload"]
payload=message["payload"],
)
def handle(self, *args, **options):
@ -55,22 +55,22 @@ class Command(BaseCommand):
try:
consumer = KafkaConsumer(
"telemetry",
bootstrap_servers='{}:{}'.format(
bootstrap_servers="{}:{}".format(
self.KAFKA_HOST, self.KAFKA_PORT
),
group_id="handler",
value_deserializer=lambda m: json.loads(m.decode('utf8')),
value_deserializer=lambda m: json.loads(m.decode("utf8")),
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available'))
self.style.WARNING("WARNING: Kafka broker not available")
)
time.sleep(5)
self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed'))
self.stdout.write(self.style.SUCCESS("INFO: Kafka broker subscribed"))
for message in consumer:
self.store_telemetry(
message.value["transport"],
message.value["body"]
message.value["transport"], message.value["body"]
)
consumer.unsuscribe()

View File

@ -11,7 +11,7 @@ class Migration(migrations.Migration):
initial = True
dependencies = [
('api', '0001_initial'),
('dps', '0001_initial'),
]
operations = [
@ -23,7 +23,7 @@ class Migration(migrations.Migration):
('transport', models.CharField(choices=[('http', 'http'), ('mqtt', 'mqtt')], default='http', max_length=4)),
('clock', models.IntegerField(null=True, validators=[django.core.validators.MinValueValidator(0)])),
('payload', models.JSONField(validators=[telemetry.models.telemetry_validation])),
('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='api.device')),
('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='dps.device')),
],
options={
'verbose_name_plural': 'Telemetry',

View File

@ -21,7 +21,7 @@ from django.db import models
from django.core.validators import MinValueValidator
from django.core.exceptions import ValidationError
from api.models import Device
from dps.models import Device
def telemetry_validation(value):

View File

@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from rest_framework import serializers
from api.models import Device
from dps.models import Device
from telemetry.models import Telemetry

View File

@ -19,7 +19,7 @@
import json
from django.test import TestCase, Client
from api.models import Device, WhiteList
from dps.models import Device, WhiteList
class ApiTestCase(TestCase):

View File

@ -37,8 +37,11 @@ services:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:8000:8000"
kafka:
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:9092:9092"
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:29092:29092"
data-migration:
volumes:

View File

@ -43,7 +43,7 @@ services:
timescale:
<<: *service_default
image: timescale/timescaledb:latest-pg12
image: timescale/timescaledb:latest-pg15
environment:
POSTGRES_USER: "bite"
POSTGRES_PASSWORD: "password"
@ -69,8 +69,6 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
@ -81,8 +79,8 @@ services:
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
@ -99,7 +97,6 @@ services:
- staticdata:/srv/appdata/bite/static
- ./ingress/nginx.conf:/etc/nginx/nginx.conf
bite:
<<: *service_default
build:

View File

@ -29,7 +29,7 @@ import argparse
from time import sleep
import paho.mqtt.publish as publish
DEBUG = bool(os.environ.get('IOT_DEBUG', False))
DEBUG = bool(os.environ.get("IOT_DEBUG", False))
http = urllib3.PoolManager()
@ -39,15 +39,16 @@ def post_json(endpoint, url, data):
if DEBUG:
print(json_data)
encoded_data = json_data.encode('utf8')
encoded_data = json_data.encode("utf8")
while True:
try:
r = http.request(
'POST',
"POST",
endpoint + url,
body=encoded_data,
headers={'content-type': 'application/json'})
headers={"content-type": "application/json"},
)
return r
except urllib3.exceptions.MaxRetryError:
pass
@ -57,74 +58,89 @@ def post_json(endpoint, url, data):
def publish_json(transport, endpoint, data):
json_data = json.dumps(data)
serial = data['device']
serial = data["device"]
if DEBUG:
print(json_data)
encoded_data = json_data.encode('utf8')
encoded_data = json_data.encode("utf8")
publish.single(
topic=serial,
payload=encoded_data,
hostname=endpoint.split(':')[0],
port=int(endpoint.split(':')[1]),
hostname=endpoint.split(":")[0],
port=int(endpoint.split(":")[1]),
client_id=serial,
transport=('websockets' if transport == 'ws' else 'tcp'),
transport=("websockets" if transport == "ws" else "tcp"),
# auth=auth FIXME
)
def main():
parser = argparse.ArgumentParser(
description='IoT simulator oprions')
parser = argparse.ArgumentParser(description="IoT simulator oprions")
parser.add_argument('-e', '--endpoint',
default=os.environ.get('IOT_HTTP',
'http://127.0.0.1:8000'),
help='IoT HTTP endpoint')
parser.add_argument('-m', '--mqtt',
default=os.environ.get('IOT_MQTT',
'127.0.0.1:1883'),
help='IoT MQTT endpoint')
parser.add_argument('-t', '--transport',
choices=['mqtt', 'ws', 'http'],
default=os.environ.get('IOT_TL', 'http'),
help='IoT transport layer')
parser.add_argument('-s', '--serial',
default=os.environ.get('IOT_SERIAL'),
help='IoT device serial number')
parser.add_argument('-d', '--delay', metavar='s', type=float,
default=os.environ.get('IOT_DELAY', 10),
help='Delay between requests')
parser.add_argument(
"-e",
"--endpoint",
default=os.environ.get("IOT_HTTP", "http://127.0.0.1:8000"),
help="IoT HTTP endpoint",
)
parser.add_argument(
"-m",
"--mqtt",
default=os.environ.get("IOT_MQTT", "127.0.0.1:1883"),
help="IoT MQTT endpoint",
)
parser.add_argument(
"-t",
"--transport",
choices=["mqtt", "ws", "http"],
default=os.environ.get("IOT_TL", "http"),
help="IoT transport layer",
)
parser.add_argument(
"-s",
"--serial",
default=os.environ.get("IOT_SERIAL"),
help="IoT device serial number",
)
parser.add_argument(
"-d",
"--delay",
metavar="s",
type=float,
default=os.environ.get("IOT_DELAY", 10),
help="Delay between requests",
)
args = parser.parse_args()
subscribe = '/api/device/subscribe/'
telemetry = '/telemetry/'
dps = "/dps/device/provision/"
telemetry = "/telemetry/"
if args.serial is None:
args.serial = ''.join(
random.choices(string.ascii_lowercase + string.digits, k=8))
args.serial = "".join(
random.choices(string.ascii_lowercase + string.digits, k=8)
)
data = {'serial': args.serial}
post_json(args.endpoint, subscribe, data)
data = {"serial": args.serial}
post_json(args.endpoint, dps, data)
while True:
data = {
'device': args.serial,
'clock': int(datetime.datetime.now().timestamp()),
"device": args.serial,
"clock": int(datetime.datetime.now().timestamp()),
}
payload = {
'id': 'device_simulator',
'light': random.randint(300, 500),
'temperature': {
'celsius': round(random.uniform(20, 28), 1)}
"id": "device_simulator",
"light": random.randint(300, 500),
"temperature": {"celsius": round(random.uniform(20, 28), 1)},
}
if args.transport == 'http':
post_json(args.endpoint, telemetry, {**data, 'payload': payload})
elif args.transport in ('mqtt', 'ws'):
if args.transport == "http":
post_json(args.endpoint, telemetry, {**data, "payload": payload})
elif args.transport in ("mqtt", "ws"):
publish_json(
args.transport, args.mqtt, {**data, 'payload': payload})
args.transport, args.mqtt, {**data, "payload": payload}
)
else:
raise NotImplementedError
sleep(args.delay)

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 101 KiB

After

Width:  |  Height:  |  Size: 122 KiB

View File

@ -55,13 +55,13 @@ struct netConfig {
} config;
char* serial;
const String apiURL = "/api/device/subscribe/";
const String dpsURL = "/dps/device/subscribe/";
const String telemetryURL = "/telemetry/";
void setup(void) {
Serial.begin(115200);
StaticJsonDocument<64> api;
StaticJsonDocument<64> dps;
preferences.begin("iot");
// Get the serial number from flash
@ -117,8 +117,8 @@ void setup(void) {
Serial.println("DEBUG: clock updated via NTP.");
#endif
api["serial"] = serial;
postData(config, apiURL, api);
dps["serial"] = serial;
postData(config, dpsURL, dps);
telemetry["device"] = serial;
// payload["id"] = serverName;