From a7cebefbf5c7714163fa8de5b4884fb99fe57b34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniele=20Vigan=C3=B2?= Date: Mon, 8 Jun 2020 23:17:16 +0200 Subject: [PATCH] Implement the MQTT protocol (#11) * Add MQTT support on Arduino * Add MQTT simulator * Use a single python simulator * Minor improvements to dockerfiles * Add transport internal field to telemetry * Add PubSubClient --- .gitmodules | 3 + arduino/libraries/pubsubclient | 1 + arduino/tempLightSensor/tempLightSensor.ino | 44 +++++-- docker/docker-compose.yml | 33 +++-- docker/edge/docker-compose.yml | 24 +++- docker/rabbitmq/enabled_plugins | 1 + docker/simulator/Dockerfile | 7 ++ docker/simulator/device_simulator.py | 115 ++++++++++++++++++ docker/simulators/Dockerfile.http | 6 - docker/simulators/simulator_http.py | 68 ----------- .../migrations/0007_telemetry_transport.py | 18 +++ freedcs/telemetry/models.py | 3 + freedcs/telemetry/serializers.py | 8 +- 13 files changed, 238 insertions(+), 93 deletions(-) create mode 160000 arduino/libraries/pubsubclient create mode 100644 docker/rabbitmq/enabled_plugins create mode 100644 docker/simulator/Dockerfile create mode 100644 docker/simulator/device_simulator.py delete mode 100644 docker/simulators/Dockerfile.http delete mode 100644 docker/simulators/simulator_http.py create mode 100644 freedcs/telemetry/migrations/0007_telemetry_transport.py diff --git a/.gitmodules b/.gitmodules index 8207b76..295f707 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "arduino/libraries/NTPClient"] path = arduino/libraries/NTPClient url = https://github.com/daniviga/NTPClient.git +[submodule "arduino/libraries/pubsubclient"] + path = arduino/libraries/pubsubclient + url = https://github.com/knolleary/pubsubclient.git diff --git a/arduino/libraries/pubsubclient b/arduino/libraries/pubsubclient new file mode 160000 index 0000000..2d228f2 --- /dev/null +++ b/arduino/libraries/pubsubclient @@ -0,0 +1 @@ +Subproject commit 2d228f2f862a95846c65a8518c79f48dfc8f188c diff --git a/arduino/tempLightSensor/tempLightSensor.ino b/arduino/tempLightSensor/tempLightSensor.ino index 3956ccd..fd06f85 100644 --- a/arduino/tempLightSensor/tempLightSensor.ino +++ b/arduino/tempLightSensor/tempLightSensor.ino @@ -1,12 +1,16 @@ #include #include #include +#include #include #include -#define DEBUG_TO_SERIAL 1 +#define DEBUG_TO_SERIAL 1 // debug on serial port +#define USE_MQTT 0 // use mqtt protocol instead of http post #define USE_INTERNAL_NTP 0 // use default ntp server or the internal one -#define AREF_VOLTAGE 3.3 +#define AREF_VOLTAGE 3.3 // set aref voltage to 3.3v instead of default 5v + +char serial[9]; // const String serverName = "sensor.server.domain"; const size_t capacity = 2 * JSON_OBJECT_SIZE(3) + JSON_OBJECT_SIZE(2) + 20; @@ -17,11 +21,13 @@ JsonObject temp = payload.createNestedObject("temperature"); unsigned int counter = 0; - EthernetUDP ntpUDP; NTPClient timeClient(ntpUDP); bool NTPValid = false; +EthernetClient ethClient; +PubSubClient clientMQTT(ethClient); + struct netConfig { IPAddress address; unsigned int port; @@ -39,8 +45,6 @@ void setup(void) { StaticJsonDocument<20> api; byte mac[6]; - char serial[9]; - int eeAddress = 0; EEPROM.get(eeAddress, mac); @@ -60,10 +64,10 @@ void setup(void) { Serial.print("IoT #"); Serial.print(serial); - Serial.println(" at address:"); + Serial.print(" at address: "); Serial.println(Ethernet.localIP()); Serial.println(); - Serial.println("Connecting to:"); + Serial.print("Connecting to: "); Serial.print(config.address); Serial.print(":"); Serial.println(config.port); @@ -85,6 +89,10 @@ void setup(void) { telemetry["device"] = serial; // payload["id"] = serverName; + +#if USE_MQTT + clientMQTT.setServer(config.address, 1883); +#endif } void loop(void) { @@ -107,7 +115,11 @@ void loop(void) { temp["raw"] = tempReading; temp["volts"] = tempVoltage; +#if USE_MQTT + publishData(config, telemetry); +#else postData(config, telemetryURL, telemetry); +#endif if (counter == 6 * 120) { // Update clock every 6 times * 10 sec * 120 minutes = 2 hrs timeClient.update(); @@ -122,6 +134,22 @@ void loop(void) { delay(postDelay); } +#if USE_MQTT +void publishData(const netConfig &mqtt, const DynamicJsonDocument &json) { + if (clientMQTT.connect(serial, "freedcs", "password")) { + char buffer[256]; + serializeJson(json, buffer); + clientMQTT.publish(serial, buffer); + +#if DEBUG_TO_SERIAL + Serial.println("DEBUG: MQTT PUBLISH>>>"); + serializeJsonPretty(json, Serial); + Serial.println("\n<<<"); +#endif + } +} +#endif + void postData(const netConfig &postAPI, const String &URL, const DynamicJsonDocument &json) { if (EthernetClient client = client.connect(postAPI.address, postAPI.port)) { client.print("POST "); @@ -140,7 +168,7 @@ void postData(const netConfig &postAPI, const String &URL, const DynamicJsonDocu client.stop(); #if DEBUG_TO_SERIAL - Serial.println("DEBUG: >>>"); + Serial.println("DEBUG: HTTP POST>>>"); serializeJsonPretty(json, Serial); Serial.println("\n<<<"); #endif diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 191b27f..ad8d5d6 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,12 +7,14 @@ volumes: pgdata: x-op-service-default: &service_default - restart: unless-stopped + restart: always # unless-stopped init: true services: ntpd: <<: *service_default + build: + context: ./ntpd image: daniviga/ntpd networks: - net @@ -32,18 +34,35 @@ services: ports: - "127.0.0.1:5432:5432" + # mosquitto simple deployment + # mqtt: + # <<: *service_default + # # image: vernemq/vernemq + # # environment: + # # DOCKER_VERNEMQ_ALLOW_ANONYMOUS: "on" + # # DOCKER_VERNEMQ_ACCEPT_EULA: "yes" + # image: eclipse-mosquitto + # networks: + # - net + # ports: + # - "1883:1883" + # # - "9001:9001" # mqtt via websocket + rabbitmq: <<: *service_default - image: rabbitmq:3-management - environment: - RABBITMQ_DEFAULT_VHOST: "freedcs" - RABBITMQ_DEFAULT_USER: "freedcs" - RABBITMQ_DEFAULT_PASS: "password" + image: rabbitmq:3-management-alpine + # environment: # we use unauth access atm + # RABBITMQ_DEFAULT_VHOST: "freedcs" + # RABBITMQ_DEFAULT_USER: "freedcs" + # RABBITMQ_DEFAULT_PASS: "password" + volumes: + - ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins networks: - net ports: - - "15672:15672" + - "1883:1883" - "5672:5672" + - "15672:15672" edge: <<: *service_default diff --git a/docker/edge/docker-compose.yml b/docker/edge/docker-compose.yml index 25d509e..2b778ae 100644 --- a/docker/edge/docker-compose.yml +++ b/docker/edge/docker-compose.yml @@ -4,17 +4,35 @@ networks: localnet: x-op-service-default: &service_default - restart: unless-stopped + restart: always init: true + tty: true services: device-http: <<: *service_default - image: daniviga/freedcs-device-http + build: + context: ../simulator + image: daniviga/freedcs-device-simulator environment: - IOT_HOST: "http://192.168.10.123:8000" + IOT_HTTP: "http://192.168.10.123:8000" # IOT_SERIAL: "abcd1234" # IOT_DELAY: 10 IOT_DEBUG: 1 networks: - localnet + + device-mqtt: + <<: *service_default + build: + context: ../simulator + image: daniviga/freedcs-device-simulator + environment: + IOT_HTTP: "http://192.168.10.123:8000" + IOT_MQTT: "192.168.10.123:1883" + # IOT_SERIAL: "abcd1234" + # IOT_DELAY: 10 + IOT_DEBUG: 1 + command: ["/opt/freedcs/device_simulator.py", "-t", "mqtt"] + networks: + - localnet diff --git a/docker/rabbitmq/enabled_plugins b/docker/rabbitmq/enabled_plugins new file mode 100644 index 0000000..5358cb0 --- /dev/null +++ b/docker/rabbitmq/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management,rabbitmq_mqtt]. diff --git a/docker/simulator/Dockerfile b/docker/simulator/Dockerfile new file mode 100644 index 0000000..5ec0069 --- /dev/null +++ b/docker/simulator/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.8-alpine + +RUN pip3 install urllib3 paho-mqtt +COPY ./device_simulator.py /opt/freedcs/device_simulator.py + +ENTRYPOINT ["python3"] +CMD ["/opt/freedcs/device_simulator.py"] diff --git a/docker/simulator/device_simulator.py b/docker/simulator/device_simulator.py new file mode 100644 index 0000000..7fff546 --- /dev/null +++ b/docker/simulator/device_simulator.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 + +import os +import json +import string +import random +import datetime +import urllib3 +import argparse + +from time import sleep +import paho.mqtt.publish as publish + +DEBUG = bool(os.environ.get('IOT_DEBUG', False)) +http = urllib3.PoolManager() + + +def post_json(endpoint, url, data): + json_data = json.dumps(data) + + if DEBUG: + print(json_data) + + encoded_data = json_data.encode('utf8') + + while True: + try: + r = http.request( + 'POST', + endpoint + url, + body=encoded_data, + headers={'content-type': 'application/json'}) + return r + except urllib3.exceptions.MaxRetryError: + pass + + sleep(10) # retry in 10 seconds + + +def publish_json(endpoint, data): + json_data = json.dumps(data) + serial = data['device'] + + if DEBUG: + print(json_data) + + encoded_data = json_data.encode('utf8') + + publish.single( + topic=serial, + payload=encoded_data, + hostname=endpoint.split(':')[0], + port=int(endpoint.split(':')[1]), + client_id=serial, + # auth=auth FIXME + ) + + +def main(): + parser = argparse.ArgumentParser( + description='IoT simulator oprions') + + parser.add_argument('-e', '--endpoint', + default=os.environ.get('IOT_HTTP', + 'http://127.0.0.1:8000'), + help='IoT HTTP endpoint') + parser.add_argument('-m', '--mqtt', + default=os.environ.get('IOT_MQTT', + '127.0.0.1:1883'), + help='IoT MQTT endpoint') + parser.add_argument('-t', '--transport', + choices=['mqtt', 'http'], + default=os.environ.get('IOT_TL', 'http'), + help='IoT transport layer') + parser.add_argument('-s', '--serial', + default=os.environ.get('IOT_SERIAL'), + help='IoT device serial number') + parser.add_argument('-d', '--delay', metavar='s', type=int, + default=os.environ.get('IOT_DELAY', 10), + help='Delay between requests') + args = parser.parse_args() + + subscribe = '/api/device/subscribe/' + telemetry = '/telemetry/' + + if args.serial is None: + args.serial = ''.join( + random.choices(string.ascii_lowercase + string.digits, k=8)) + + data = {'serial': args.serial} + post_json(args.endpoint, subscribe, data) + + data = { + 'device': args.serial, + 'clock': int(datetime.datetime.now().timestamp()), + } + + while True: + payload = { + 'id': 'device_simulator', + 'light': random.randint(300, 500), + 'temperature': { + 'celsius': round(random.uniform(20, 28), 1)} + } + if args.transport == 'http': + post_json(args.endpoint, telemetry, {**data, 'payload': payload}) + elif args.transport == 'mqtt': + publish_json(args.mqtt, {**data, 'payload': payload}) + else: + raise NotImplementedError + sleep(args.delay) + + +if __name__ == "__main__": + main() diff --git a/docker/simulators/Dockerfile.http b/docker/simulators/Dockerfile.http deleted file mode 100644 index ec666be..0000000 --- a/docker/simulators/Dockerfile.http +++ /dev/null @@ -1,6 +0,0 @@ -FROM python:3.8-alpine - -RUN pip3 install urllib3 -COPY ./simulator_http.py /opt/freedcs/simulator_http.py - -ENTRYPOINT ["python3", "/opt/freedcs/simulator_http.py"] diff --git a/docker/simulators/simulator_http.py b/docker/simulators/simulator_http.py deleted file mode 100644 index bcbfd29..0000000 --- a/docker/simulators/simulator_http.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 - -import os -import json -import string -import random -import datetime -import urllib3 -from time import sleep - -DEBUG = bool(os.environ.get('IOT_DEBUG', False)) -http = urllib3.PoolManager() - - -def post_json(host, url, data): - json_data = json.dumps(data) - - if DEBUG: - print(json_data) - - encoded_data = json_data.encode('utf8') - - while True: - try: - r = http.request( - 'POST', - host + url, - body=encoded_data, - headers={'content-type': 'application/json'}) - return r - except urllib3.exceptions.MaxRetryError: - pass - - sleep(10) # retry in 10 seconds - - -def main(): - host = os.environ.get('IOT_HOST', 'http://127.0.0.1:8000') - subscribe = '/api/device/subscribe/' - telemetry = '/telemetry/' - delay = int(os.environ.get('IOT_DELAY', 10)) - - serial = os.environ.get('IOT_SERIAL') - if serial is None: - serial = ''.join( - random.choices(string.ascii_lowercase + string.digits, k=8)) - - data = {'serial': serial} - post_json(host, subscribe, data) - - data = { - 'device': serial, - 'clock': int(datetime.datetime.now().timestamp()), - } - - while True: - payload = { - 'id': 'device_http_simulator', - 'light': random.randint(300, 500), - 'temperature': { - 'celsius': round(random.uniform(20, 28), 1)} - } - post_json(host, telemetry, {**data, 'payload': payload}) - sleep(delay) - - -if __name__ == "__main__": - main() diff --git a/freedcs/telemetry/migrations/0007_telemetry_transport.py b/freedcs/telemetry/migrations/0007_telemetry_transport.py new file mode 100644 index 0000000..29395d8 --- /dev/null +++ b/freedcs/telemetry/migrations/0007_telemetry_transport.py @@ -0,0 +1,18 @@ +# Generated by Django 3.0.6 on 2020-06-08 20:07 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('telemetry', '0006_auto_20200603_1317'), + ] + + operations = [ + migrations.AddField( + model_name='telemetry', + name='transport', + field=models.CharField(choices=[('http', 'http'), ('mqtt', 'mqtt')], default='http', max_length=4), + ), + ] diff --git a/freedcs/telemetry/models.py b/freedcs/telemetry/models.py index 2c8a04b..60f1ff0 100644 --- a/freedcs/telemetry/models.py +++ b/freedcs/telemetry/models.py @@ -8,6 +8,9 @@ from api.models import Device class Telemetry(models.Model): device = models.ForeignKey(Device, on_delete=models.CASCADE) time = models.DateTimeField(primary_key=True, auto_now_add=True) + transport = models.CharField(max_length=4, + choices=[('http', 'http'), ('mqtt', 'mqtt')], + default='http') clock = models.IntegerField( validators=[MinValueValidator(0)], null=True) diff --git a/freedcs/telemetry/serializers.py b/freedcs/telemetry/serializers.py index 2149af4..372039b 100644 --- a/freedcs/telemetry/serializers.py +++ b/freedcs/telemetry/serializers.py @@ -11,4 +11,10 @@ class TelemetrySerializer(serializers.ModelSerializer): class Meta: model = Telemetry - fields = ('time', 'device', 'clock', 'payload',) + fields = ('time', 'device', 'clock', 'transport', 'payload',) + read_only_fields = ['transport'] + + def create(self, validated_data): + validated_data['transport'] = 'http' + telemetry = Telemetry.objects.create(**validated_data) + return telemetry