Implement the MQTT protocol (#11)

* Add MQTT support on Arduino

* Add MQTT simulator

* Use a single python simulator

* Minor improvements to dockerfiles

* Add transport internal field to telemetry

* Add PubSubClient
This commit is contained in:
Daniele Viganò 2020-06-08 23:17:16 +02:00 committed by GitHub
parent bbb92100f8
commit a7cebefbf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 238 additions and 93 deletions

3
.gitmodules vendored
View File

@ -7,3 +7,6 @@
[submodule "arduino/libraries/NTPClient"]
path = arduino/libraries/NTPClient
url = https://github.com/daniviga/NTPClient.git
[submodule "arduino/libraries/pubsubclient"]
path = arduino/libraries/pubsubclient
url = https://github.com/knolleary/pubsubclient.git

@ -0,0 +1 @@
Subproject commit 2d228f2f862a95846c65a8518c79f48dfc8f188c

View File

@ -1,12 +1,16 @@
#include <EEPROM.h>
#include <Ethernet.h>
#include <EthernetUdp.h>
#include <PubSubClient.h>
#include <NTPClient.h>
#include <ArduinoJson.h>
#define DEBUG_TO_SERIAL 1
#define DEBUG_TO_SERIAL 1 // debug on serial port
#define USE_MQTT 0 // use mqtt protocol instead of http post
#define USE_INTERNAL_NTP 0 // use default ntp server or the internal one
#define AREF_VOLTAGE 3.3
#define AREF_VOLTAGE 3.3 // set aref voltage to 3.3v instead of default 5v
char serial[9];
// const String serverName = "sensor.server.domain";
const size_t capacity = 2 * JSON_OBJECT_SIZE(3) + JSON_OBJECT_SIZE(2) + 20;
@ -17,11 +21,13 @@ JsonObject temp = payload.createNestedObject("temperature");
unsigned int counter = 0;
EthernetUDP ntpUDP;
NTPClient timeClient(ntpUDP);
bool NTPValid = false;
EthernetClient ethClient;
PubSubClient clientMQTT(ethClient);
struct netConfig {
IPAddress address;
unsigned int port;
@ -39,8 +45,6 @@ void setup(void) {
StaticJsonDocument<20> api;
byte mac[6];
char serial[9];
int eeAddress = 0;
EEPROM.get(eeAddress, mac);
@ -60,10 +64,10 @@ void setup(void) {
Serial.print("IoT #");
Serial.print(serial);
Serial.println(" at address:");
Serial.print(" at address: ");
Serial.println(Ethernet.localIP());
Serial.println();
Serial.println("Connecting to:");
Serial.print("Connecting to: ");
Serial.print(config.address);
Serial.print(":");
Serial.println(config.port);
@ -85,6 +89,10 @@ void setup(void) {
telemetry["device"] = serial;
// payload["id"] = serverName;
#if USE_MQTT
clientMQTT.setServer(config.address, 1883);
#endif
}
void loop(void) {
@ -107,7 +115,11 @@ void loop(void) {
temp["raw"] = tempReading;
temp["volts"] = tempVoltage;
#if USE_MQTT
publishData(config, telemetry);
#else
postData(config, telemetryURL, telemetry);
#endif
if (counter == 6 * 120) { // Update clock every 6 times * 10 sec * 120 minutes = 2 hrs
timeClient.update();
@ -122,6 +134,22 @@ void loop(void) {
delay(postDelay);
}
#if USE_MQTT
void publishData(const netConfig &mqtt, const DynamicJsonDocument &json) {
if (clientMQTT.connect(serial, "freedcs", "password")) {
char buffer[256];
serializeJson(json, buffer);
clientMQTT.publish(serial, buffer);
#if DEBUG_TO_SERIAL
Serial.println("DEBUG: MQTT PUBLISH>>>");
serializeJsonPretty(json, Serial);
Serial.println("\n<<<");
#endif
}
}
#endif
void postData(const netConfig &postAPI, const String &URL, const DynamicJsonDocument &json) {
if (EthernetClient client = client.connect(postAPI.address, postAPI.port)) {
client.print("POST ");
@ -140,7 +168,7 @@ void postData(const netConfig &postAPI, const String &URL, const DynamicJsonDocu
client.stop();
#if DEBUG_TO_SERIAL
Serial.println("DEBUG: >>>");
Serial.println("DEBUG: HTTP POST>>>");
serializeJsonPretty(json, Serial);
Serial.println("\n<<<");
#endif

View File

@ -7,12 +7,14 @@ volumes:
pgdata:
x-op-service-default: &service_default
restart: unless-stopped
restart: always # unless-stopped
init: true
services:
ntpd:
<<: *service_default
build:
context: ./ntpd
image: daniviga/ntpd
networks:
- net
@ -32,18 +34,35 @@ services:
ports:
- "127.0.0.1:5432:5432"
# mosquitto simple deployment
# mqtt:
# <<: *service_default
# # image: vernemq/vernemq
# # environment:
# # DOCKER_VERNEMQ_ALLOW_ANONYMOUS: "on"
# # DOCKER_VERNEMQ_ACCEPT_EULA: "yes"
# image: eclipse-mosquitto
# networks:
# - net
# ports:
# - "1883:1883"
# # - "9001:9001" # mqtt via websocket
rabbitmq:
<<: *service_default
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_VHOST: "freedcs"
RABBITMQ_DEFAULT_USER: "freedcs"
RABBITMQ_DEFAULT_PASS: "password"
image: rabbitmq:3-management-alpine
# environment: # we use unauth access atm
# RABBITMQ_DEFAULT_VHOST: "freedcs"
# RABBITMQ_DEFAULT_USER: "freedcs"
# RABBITMQ_DEFAULT_PASS: "password"
volumes:
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
networks:
- net
ports:
- "15672:15672"
- "1883:1883"
- "5672:5672"
- "15672:15672"
edge:
<<: *service_default

View File

@ -4,17 +4,35 @@ networks:
localnet:
x-op-service-default: &service_default
restart: unless-stopped
restart: always
init: true
tty: true
services:
device-http:
<<: *service_default
image: daniviga/freedcs-device-http
build:
context: ../simulator
image: daniviga/freedcs-device-simulator
environment:
IOT_HOST: "http://192.168.10.123:8000"
IOT_HTTP: "http://192.168.10.123:8000"
# IOT_SERIAL: "abcd1234"
# IOT_DELAY: 10
IOT_DEBUG: 1
networks:
- localnet
device-mqtt:
<<: *service_default
build:
context: ../simulator
image: daniviga/freedcs-device-simulator
environment:
IOT_HTTP: "http://192.168.10.123:8000"
IOT_MQTT: "192.168.10.123:1883"
# IOT_SERIAL: "abcd1234"
# IOT_DELAY: 10
IOT_DEBUG: 1
command: ["/opt/freedcs/device_simulator.py", "-t", "mqtt"]
networks:
- localnet

View File

@ -0,0 +1 @@
[rabbitmq_management,rabbitmq_mqtt].

View File

@ -0,0 +1,7 @@
FROM python:3.8-alpine
RUN pip3 install urllib3 paho-mqtt
COPY ./device_simulator.py /opt/freedcs/device_simulator.py
ENTRYPOINT ["python3"]
CMD ["/opt/freedcs/device_simulator.py"]

View File

@ -0,0 +1,115 @@
#!/usr/bin/env python3
import os
import json
import string
import random
import datetime
import urllib3
import argparse
from time import sleep
import paho.mqtt.publish as publish
DEBUG = bool(os.environ.get('IOT_DEBUG', False))
http = urllib3.PoolManager()
def post_json(endpoint, url, data):
json_data = json.dumps(data)
if DEBUG:
print(json_data)
encoded_data = json_data.encode('utf8')
while True:
try:
r = http.request(
'POST',
endpoint + url,
body=encoded_data,
headers={'content-type': 'application/json'})
return r
except urllib3.exceptions.MaxRetryError:
pass
sleep(10) # retry in 10 seconds
def publish_json(endpoint, data):
json_data = json.dumps(data)
serial = data['device']
if DEBUG:
print(json_data)
encoded_data = json_data.encode('utf8')
publish.single(
topic=serial,
payload=encoded_data,
hostname=endpoint.split(':')[0],
port=int(endpoint.split(':')[1]),
client_id=serial,
# auth=auth FIXME
)
def main():
parser = argparse.ArgumentParser(
description='IoT simulator oprions')
parser.add_argument('-e', '--endpoint',
default=os.environ.get('IOT_HTTP',
'http://127.0.0.1:8000'),
help='IoT HTTP endpoint')
parser.add_argument('-m', '--mqtt',
default=os.environ.get('IOT_MQTT',
'127.0.0.1:1883'),
help='IoT MQTT endpoint')
parser.add_argument('-t', '--transport',
choices=['mqtt', 'http'],
default=os.environ.get('IOT_TL', 'http'),
help='IoT transport layer')
parser.add_argument('-s', '--serial',
default=os.environ.get('IOT_SERIAL'),
help='IoT device serial number')
parser.add_argument('-d', '--delay', metavar='s', type=int,
default=os.environ.get('IOT_DELAY', 10),
help='Delay between requests')
args = parser.parse_args()
subscribe = '/api/device/subscribe/'
telemetry = '/telemetry/'
if args.serial is None:
args.serial = ''.join(
random.choices(string.ascii_lowercase + string.digits, k=8))
data = {'serial': args.serial}
post_json(args.endpoint, subscribe, data)
data = {
'device': args.serial,
'clock': int(datetime.datetime.now().timestamp()),
}
while True:
payload = {
'id': 'device_simulator',
'light': random.randint(300, 500),
'temperature': {
'celsius': round(random.uniform(20, 28), 1)}
}
if args.transport == 'http':
post_json(args.endpoint, telemetry, {**data, 'payload': payload})
elif args.transport == 'mqtt':
publish_json(args.mqtt, {**data, 'payload': payload})
else:
raise NotImplementedError
sleep(args.delay)
if __name__ == "__main__":
main()

View File

@ -1,6 +0,0 @@
FROM python:3.8-alpine
RUN pip3 install urllib3
COPY ./simulator_http.py /opt/freedcs/simulator_http.py
ENTRYPOINT ["python3", "/opt/freedcs/simulator_http.py"]

View File

@ -1,68 +0,0 @@
#!/usr/bin/env python3
import os
import json
import string
import random
import datetime
import urllib3
from time import sleep
DEBUG = bool(os.environ.get('IOT_DEBUG', False))
http = urllib3.PoolManager()
def post_json(host, url, data):
json_data = json.dumps(data)
if DEBUG:
print(json_data)
encoded_data = json_data.encode('utf8')
while True:
try:
r = http.request(
'POST',
host + url,
body=encoded_data,
headers={'content-type': 'application/json'})
return r
except urllib3.exceptions.MaxRetryError:
pass
sleep(10) # retry in 10 seconds
def main():
host = os.environ.get('IOT_HOST', 'http://127.0.0.1:8000')
subscribe = '/api/device/subscribe/'
telemetry = '/telemetry/'
delay = int(os.environ.get('IOT_DELAY', 10))
serial = os.environ.get('IOT_SERIAL')
if serial is None:
serial = ''.join(
random.choices(string.ascii_lowercase + string.digits, k=8))
data = {'serial': serial}
post_json(host, subscribe, data)
data = {
'device': serial,
'clock': int(datetime.datetime.now().timestamp()),
}
while True:
payload = {
'id': 'device_http_simulator',
'light': random.randint(300, 500),
'temperature': {
'celsius': round(random.uniform(20, 28), 1)}
}
post_json(host, telemetry, {**data, 'payload': payload})
sleep(delay)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,18 @@
# Generated by Django 3.0.6 on 2020-06-08 20:07
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telemetry', '0006_auto_20200603_1317'),
]
operations = [
migrations.AddField(
model_name='telemetry',
name='transport',
field=models.CharField(choices=[('http', 'http'), ('mqtt', 'mqtt')], default='http', max_length=4),
),
]

View File

@ -8,6 +8,9 @@ from api.models import Device
class Telemetry(models.Model):
device = models.ForeignKey(Device, on_delete=models.CASCADE)
time = models.DateTimeField(primary_key=True, auto_now_add=True)
transport = models.CharField(max_length=4,
choices=[('http', 'http'), ('mqtt', 'mqtt')],
default='http')
clock = models.IntegerField(
validators=[MinValueValidator(0)],
null=True)

View File

@ -11,4 +11,10 @@ class TelemetrySerializer(serializers.ModelSerializer):
class Meta:
model = Telemetry
fields = ('time', 'device', 'clock', 'payload',)
fields = ('time', 'device', 'clock', 'transport', 'payload',)
read_only_fields = ['transport']
def create(self, validated_data):
validated_data['transport'] = 'http'
telemetry = Telemetry.objects.create(**validated_data)
return telemetry