diff --git a/CommandStation-EX.ino b/CommandStation-EX.ino
index 5355376..d197d80 100644
--- a/CommandStation-EX.ino
+++ b/CommandStation-EX.ino
@@ -75,6 +75,14 @@ void setup()
EthernetInterface::setup();
#endif // ETHERNET_ON
+#if MQTT_ON
+ // We assume here that we have a network connection
+ DccMQTT::setup(serialParser);
+#endif
+
+
+
+
// Responsibility 3: Start the DCC engine.
// Note: this provides DCC with two motor drivers, main and prog, which handle the motor shield(s)
// Standard supported devices have pre-configured macros but custome hardware installations require
diff --git a/DCCEX.h b/DCCEX.h
index 244be19..d274e45 100644
--- a/DCCEX.h
+++ b/DCCEX.h
@@ -10,9 +10,15 @@
#include "DCCEXParser.h"
#include "version.h"
#include "WifiInterface.h"
+
#if ETHERNET_ON == true
#include "EthernetInterface.h"
#endif
+
+#if MQTT_ON == true
+#include "DccMQTT.h"
+#endif
+
#include "LCD_Implementation.h"
#include "LCN.h"
#include "freeMemory.h"
diff --git a/DccMQTT.cpp b/DccMQTT.cpp
new file mode 100644
index 0000000..666cba8
--- /dev/null
+++ b/DccMQTT.cpp
@@ -0,0 +1,964 @@
+/**
+ * @file DccMQTT.cpp
+ * @author Gregor Baues
+ * @brief MQTT protocol controller for DCC-EX. Sets up and maintains the connection to the MQTT broker incl setting up the topics.
+ * Topics are created specifically for the command station on which the code runs. Manages subsriptions as well as recieving/sending of messages on the different topics.
+ * @version 0.1
+ * @date 2020-07-08
+ *
+ * @copyright Copyright (c) 2020
+ *
+ * This is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * It is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details .
+ */
+
+#if __has_include ( "config.h")
+ #include "config.h"
+#else
+ #warning config.h not found. Using defaults from config.example.h
+ #include "config.example.h"
+#endif
+#include "defines.h"
+
+#include
+#include // JSON format handling library
+#include // Base (sync) MQTT library
+#include // for PROGMEM use
+
+#include // Diagnostig output to the serial terminal
+#include // Diagnostic/metrics output to MQTT
+#include // Ethernet setup; todo: abstract this away into a network interface so that we can use WiFi or Ethernet
+#include // Std Ethernet library
+
+#include
+#include // MQTT Message controller
+#include // MQTT Message processor
+#include // MQTT Message model
+
+#include // DCC++-EX Parser;; used for pushing JRMI commands directly recieved
+
+//---------
+// Defines
+//---------
+
+#define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked
+#define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ?
+#define MAXTSTR 30 //!< max length of a topic string
+
+//---------
+// Variables
+//---------
+
+char topicName[MAXTBUF];
+char topicMessage[MAXTMSG];
+char keyword[MAX_KEYWORD_LENGTH];
+
+void mqttCallback(char *topic, byte *payload, unsigned int length);
+void dccmqttCommandHandler(char *topicName, char *topicMessage);
+bool setMsgParams(int mdix, Parameters p, uint16_t v);
+bool setMsgParamsByObj(int mdix, Parameters p, JsonObject v, bool mandatory);
+bool validateParam(Parameters p, uint16_t v);
+char *getPGMkeyword(const char *keyword);
+
+// Ethernet connection to the MQTT server
+IPAddress server(MQTT_BROKER_ADDRESS);
+EthernetClient ethClient = ETHNetwork::getServer().available();
+
+// MQTT connection
+PubSubClient mqttClient(ethClient);
+PubSubClient *DccMQTT::mqClient = &mqttClient;
+
+char _deviceId[MAXDEVICEID]; //!< string holding the device id
+char *DccMQTT::deviceID = _deviceId; //!< assign device ID to the class member
+char **DccMQTT::topics = new char *[MAXTOPICS]; //!< array of pointesr to the topic
+
+// Queue setup
+Queue DccMQTT::inComming(sizeof(DccMQTTCommandMsg *), MAXQUEUE, FIFO); //!< incomming messages : struct after preprocessing. type to be defined
+Queue DccMQTT::outGoing(sizeof(DccMQTTCommandMsg *), MAXQUEUE, FIFO); //!< outgoing messages; string in JSON format
+
+// JSON buffer
+StaticJsonDocument doc;
+
+// index into the message pool
+uint8_t midx;
+
+DCCEXParser mqttDccExParser;
+
+//---------
+// Functions
+//---------
+
+/**
+ * @brief Copies an array to a string; used for generating the unique Arduino ID
+ *
+ * @param array array containing bytes
+ * @param len length of the array
+ * @param buffer buffer to which the string will be written; make sure the buffer has appropriate length
+ */
+static void array_to_string(byte array[], unsigned int len, char buffer[])
+{
+ for (unsigned int i = 0; i < len; i++)
+ {
+ byte nib1 = (array[i] >> 4) & 0x0F;
+ byte nib2 = (array[i] >> 0) & 0x0F;
+ buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA;
+ buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA;
+ }
+ buffer[len * 2] = '\0';
+}
+
+/**
+ * @brief Maps a command recieved from the JSON string to the corresponding enum value
+ *
+ * @param c string containing the command
+ * @return Commands enum value from the Command enum
+ * @throw Returns an INVALID_C - invalid command for unknown Commands send
+ * @see Commands
+ */
+Commands resolveCommand(const char *c)
+{
+ DBG(F("Resolving Command: %s"), c);
+ if (strcmp_P(c, _kRead) == 0)
+ return READ;
+ if (strcmp_P(c, _kWrite) == 0)
+ return WRITE;
+ if (strcmp_P(c, _kPower) == 0)
+ return POWER;
+ if (strcmp_P(c, _kThrottle) == 0)
+ return THROTTLE;
+ if (strcmp_P(c, _kFunction) == 0)
+ return FUNCTION;
+ return INVALID_C;
+};
+
+DccTopics resolveTopics(const char *c)
+{
+ int i = 0;
+ while ((strcmp(c, DccMQTT::topics[i]) != 0))
+ {
+ i++;
+ if (i > NOOFDCCTOPICS)
+ {
+ return INVALID_T;
+ }
+ }
+ return (DccTopics)i;
+}
+
+/**
+ * @brief Maps parameters names recieved from the JSON string to the corresponding enum value
+ *
+ * @param c string containing the parameter name
+ * @return parameters enum value from the Parameter enum
+ */
+Parameters resolveParameters(char *c)
+{
+ DBG(F("Resolving parameter: %s"), c);
+ if (strcmp_P(c, _kCv) == 0)
+ return CV;
+ if (strcmp_P(c, _kValue) == 0)
+ return VALUE;
+ if (strcmp_P(c, _kLocomotive) == 0)
+ return LCOCMOTIVE;
+ if (strcmp_P(c, _kSpeed) == 0)
+ return SPEED;
+ if (strcmp_P(c, _kDirection) == 0)
+ return DIRECTION;
+ if (strcmp_P(c, _kFn) == 0)
+ return FN;
+ if (strcmp_P(c, _kState) == 0)
+ return STATE;
+ if (strcmp_P(c, _kTrack) == 0)
+ return TRACK;
+ if (strcmp_P(c, _kBit) == 0)
+ return BIT;
+ return INVALID_P;
+};
+
+/**
+ * @brief Maps Parameters enum values to the corresponding string
+ *
+ * @param p : Enum value of the Parameter
+ * @return const char*
+ */
+const char *resolveParameters(Parameters p)
+{
+ switch (p)
+ {
+ case CV:
+ return _kCv;
+ case VALUE:
+ return _kValue;
+ case LCOCMOTIVE:
+ return _kLocomotive;
+ case SPEED:
+ return _kSpeed;
+ case DIRECTION:
+ return _kDirection;
+ case FN:
+ return _kFn;
+ case STATE:
+ return _kState;
+ case TRACK:
+ return _kTrack;
+ case BIT:
+ return _kBit;
+ case INVALID_P:
+ return NULL;
+ }
+ return NULL;
+}
+
+/**
+ * @brief Callback executed upon reception of a message by the PubSubClient
+ *
+ * @param topic topic string
+ * @param payload serialized content of the message
+ * @param length length of the recieved message
+ *
+ */
+void mqttCallback(char *topic, byte *payload, unsigned int length)
+{
+ topicName[0] = '\0';
+ topicMessage[0] = '\0';
+ strcpy(topicName, topic);
+ strlcpy(topicMessage, (char *)payload, length + 1);
+
+ DccTopics t = resolveTopics(topicName);
+
+ switch (t)
+ {
+ case CMD_L:
+ case CMD_T:
+ case CMD_S:
+ case CMD_A:
+ {
+ INFO(F("MQTT Message arrived [%s]: %s"), topicName, topicMessage);
+ dccmqttCommandHandler(topicName, topicMessage); // /command topic recieved
+ break;
+ }
+ case JRMI:
+ {
+ INFO(F("JRMI Message arrived [%s]: %s"), topicName, topicMessage);
+ // mqttDccExParser.parse(Serial, (const byte *)topicMessage, 0); // send the message to the DCC parser for handling and return;
+ mqttDccExParser.parse(&Serial, (byte *)topicMessage, 0); // send the message to the DCC parser for handling and return;
+ return;
+ }
+ case ADMIN: {
+ INFO(F("Admin Message arrived [%s]: %s"), topicName, topicMessage);
+ return;
+ }
+ case TELEMETRY:
+ case RESULT:
+ case DIAGNOSTIC:
+ case INVALID_T:
+ {
+ // don't do anything for those
+ return;
+ }
+ }
+}
+
+/**
+ * @brief Central nervous system. All messages are passed through here parse and a pool item gets build. Syntax and Semantic checks are done here on all commands parameters etc ...
+ * All error get printed to Serial for now but will be send over MQ as well in the future for further processing by the client
+ *
+ * @param topicName : Topic on which the message has been recieved
+ * @param topicMessage : Message that has been recieved
+ *
+ */
+void dccmqttCommandHandler(char *topicName, char *topicMessage)
+{
+
+ DeserializationError err = deserializeJson(doc, topicMessage);
+
+ if (err)
+ {
+ ERR(F("deserializeJson() failed: %s"), err.c_str());
+ ERR(F("Message ignored ..."));
+ return;
+ }
+
+ JsonObject p;
+ if (doc.containsKey("p"))
+ {
+ p = doc["p"]; // parameters recieved - present in any command recieved the payload may differ though
+ }
+ else
+ {
+ ERR(F("No parameters provided in Message"));
+ return;
+ }
+
+ midx = DccMQTTCommandMsg::setMsg(resolveCommand(doc["c"]), doc["cid"]);
+ DccMQTTCommandMsg::msg[midx].nParams = 0;
+
+ DBG(F("Recieved command: [%d] : cid [%s]"), DccMQTTCommandMsg::msg[midx].cmd, DccMQTTCommandMsg::msg[midx]._cid);
+
+ switch (DccMQTTCommandMsg::msg[midx].cmd)
+ {
+ case READ:
+ {
+ // Mandatory parameters
+ if (!setMsgParamsByObj(midx, CV, p, true)) // reqd parameter not available; error printing done in setMsgParams
+ {
+ return;
+ }
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ DBG(F("Read: recieved parameter cv: [%d]"), DccMQTTCommandMsg::msg[midx].params[CV]);
+
+ // Optional parameters
+ if (setMsgParamsByObj(midx, BIT, p, false)) // found correct bit parameter if true
+ {
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ DBG(F("Read: recieved optional parameter bit: [%d]"), DccMQTTCommandMsg::msg[midx].params[BIT]);
+ }
+ break;
+ }
+ case WRITE:
+ {
+ // Mandatory parameters
+ if (!setMsgParamsByObj(midx, CV, p, true))
+ return;
+ if (!setMsgParamsByObj(midx, VALUE, p, true))
+ return;
+
+ DccMQTTCommandMsg::msg[midx].nParams = 2;
+
+ // Optional parameters
+ // If loco is present we need the track to be main
+ // Writing on Prog doesn't need a loco to be present as there shall be only one loco anyway
+ if (setMsgParamsByObj(midx, LCOCMOTIVE, p, false))
+ {
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ // verify that track is M
+ if (setMsgParamsByObj(midx, TRACK, p, false))
+ {
+ if (DccMQTTCommandMsg::msg[midx].params[TRACK] == MAIN)
+ {
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ }
+ else
+ {
+ ERR(F("Track has to be main as Locomotive has been provided "));
+ return;
+ }
+ }
+ }
+
+ if (setMsgParamsByObj(midx, BIT, p, false)) // found correct bit parameter if true
+ {
+ // verify if value is 0 or 1
+ uint16_t v = DccMQTTCommandMsg::msg[midx].params[VALUE];
+ if (v == 0 || v == 1)
+ {
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ }
+ else
+ {
+ ERR(F("Value has to be 0 or 1 as bit to be written has been set"));
+ return;
+ }
+ }
+ break;
+ }
+ case POWER:
+ {
+ // Mandatory parameters
+ if (!setMsgParamsByObj(midx, TRACK, p, true)) // reqd parameter track not available; error printing done in setMsgParams
+ {
+ return;
+ };
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ if (!setMsgParamsByObj(midx, STATE, p, true)) // reqd parameter state not available; error printing done in setMsgParams
+ {
+ return;
+ }
+ DccMQTTCommandMsg::msg[midx].nParams++;
+ DBG(F("Power: recieved parameter track: %d state: %d "), DccMQTTCommandMsg::msg[midx].params[TRACK], DccMQTTCommandMsg::msg[midx].params[STATE]);
+ break;
+ }
+ case THROTTLE:
+ {
+ // Mandatory parameters
+ if (!setMsgParamsByObj(midx, LCOCMOTIVE, p, true))
+ return;
+ if (!setMsgParamsByObj(midx, SPEED, p, true))
+ return;
+ if (!setMsgParamsByObj(midx, DIRECTION, p, true))
+ return;
+ DccMQTTCommandMsg::msg[midx].nParams = 3;
+ DBG(F("Throttle: recieved parameter locomotive: %d speed: %d direction: %d "), DccMQTTCommandMsg::msg[midx].params[LCOCMOTIVE], DccMQTTCommandMsg::msg[midx].params[SPEED], DccMQTTCommandMsg::msg[midx].params[DIRECTION]);
+ break;
+ }
+ case FUNCTION:
+ {
+ // Mandatory parameters
+ if (!setMsgParamsByObj(midx, LCOCMOTIVE, p, true))
+ return;
+ if (!setMsgParamsByObj(midx, FN, p, true))
+ return;
+ if (!setMsgParamsByObj(midx, STATE, p, true))
+ return;
+ DccMQTTCommandMsg::msg[midx].nParams = 3;
+ break;
+ }
+ case INVALID_C:
+ default:
+ {
+ ERR(F("Invalid command recieved"));
+ return;
+ }
+ }
+ // Note: Do not use the client in the callback to publish, subscribe or
+ // unsubscribe as it may cause deadlocks when other things arrive while
+ // sending and receiving acknowledgments. Instead, change a global variable,
+ // or push to a queue and handle it in the loop after calling `DccMQTT::loop()`
+
+ // enqueue the cmdMsg pool item used for passing the commands
+ DccMQTT::pushIn(midx);
+}
+
+/**
+ * @brief Set the parameters in the pool item to be send to the processor
+ *
+ * @param mdix : Index of the pool item
+ * @param p : Parameter to be set
+ * @param v : Json object of the parameter value recieved
+ * @param mandatory : if the parameter is mandatory
+ * @return true
+ * @return false the value is 0 return false
+ */
+bool setMsgParamsByObj(int mdix, Parameters p, JsonObject v, bool mandatory)
+{
+ // This does make two calls deserialzing the parameter; may possibly be optimized
+ bool success = true;
+ char *kw = getPGMkeyword(resolveParameters(p));
+ if (v.containsKey(kw) == false)
+ {
+ if (mandatory)
+ {
+ ERR(F("Wrong parameter provided: [%s]"), kw);
+ return false;
+ }
+ else
+ {
+ INFO(F("Ignoring optional parameter: [%s]"), kw);
+ return false;
+ }
+ }
+ switch (p)
+ {
+ case BIT:
+ {
+ DccMQTTCommandMsg::msg[midx].params[p] = v[kw];
+ success = validateParam(BIT, v[kw]);
+ break;
+ }
+ case TRACK:
+ {
+ char tr[2] = {0};
+ strncpy(tr, v[kw], 2);
+ DccMQTTCommandMsg::msg[midx].params[p] = tr[0];
+ success = validateParam(TRACK, tr[0]);
+ break;
+ }
+ case DIRECTION:
+ {
+ char tr[2] = {0};
+ strncpy(tr, v[kw], 2);
+ success = validateParam(DIRECTION, tr[0]);
+ if (tr[0] == 'F') {
+ DccMQTTCommandMsg::msg[midx].params[p] = 1;
+ } else {
+ DccMQTTCommandMsg::msg[midx].params[p] = 0;
+ }
+ break;
+ }
+ case STATE:
+ {
+ char st[4] = {0};
+ strncpy(st, v[kw], 3);
+ /**
+ * @todo validate ON/OFF keywords here so that we don't handle any garbage comming in
+ *
+ */
+ DBG(F("State keyword [%s] [%s]"), kw, st);
+ if (st[1] == 'N') {
+ DccMQTTCommandMsg::msg[midx].params[p] = 1;
+ } else {
+ DccMQTTCommandMsg::msg[midx].params[p] = 0;
+ }
+ // DccMQTTCommandMsg::msg[midx].params[p] = v.getMember(kw);
+ break;
+ }
+ case CV:
+ case VALUE:
+ case LCOCMOTIVE:
+ case SPEED:
+ case FN:
+ {
+ DccMQTTCommandMsg::msg[midx].params[p] = v.getMember(kw);
+ break;
+ }
+ case INVALID_P:
+ {
+ success = false;
+ break;
+ }
+ }
+ DBG(F("Set Parameter [%s] to [%d] [%s]"), kw, DccMQTTCommandMsg::msg[midx].params[p], success ? "OK" : "FAILED");
+ return success;
+};
+
+/**
+ * @brief Validates permitted values for Parameters
+ *
+ * @param p : Parameter
+ * @param v : value to validate for the given Parameter
+ * @return true : if value is allowed
+ * @return false : false if value is not allowed
+ */
+bool validateParam(Parameters p, uint16_t v)
+{
+
+ bool valid;
+
+ switch (p)
+ {
+ case CV:
+ case VALUE:
+ case LCOCMOTIVE:
+ case SPEED:
+ case FN:
+ case STATE:
+ case INVALID_P:
+ {
+ valid = true;
+ break;
+ }
+ case DIRECTION:
+ {
+ if (v == 'F' || v == 'R')
+ {
+ valid = true;
+ }
+ else
+ {
+ ERR(F("Wrong value for direction provided (F or R)"));
+ valid = false;
+ }
+ break;
+ }
+ case TRACK:
+ {
+ if (v == MAIN || v == ALL || v == PROG)
+ {
+ valid = true;
+ }
+ else
+ {
+ ERR(F("Wrong value for track provided (M or A or P)"));
+ valid = false;
+ }
+ break;
+ }
+ case BIT:
+ {
+ if (v >= 0 && v <= 7)
+ {
+ valid = true;
+ }
+ else
+ {
+ ERR(F("Wrong parameter value provided for bit (>= 0 and <= 7)"));
+ valid = false;
+ }
+ break;
+ }
+ }
+ DBG(F("Validated parameter:[%s] Value:[%d] [%s]"), getPGMkeyword(resolveParameters(p)), v, valid ? "OK" : "FAILED");
+ return valid;
+}
+
+/**
+ * @brief Retrieves a keyword from PROGMEM
+ *
+ * @param k keyword to retrieve
+ * @return char* string copied into SRAM
+ */
+char *getPGMkeyword(const char *k)
+{
+ strncpy_P(keyword, k, MAX_KEYWORD_LENGTH);
+ return keyword;
+};
+
+/**
+ * @brief Pushes a message into the the in comming queue; locks the pool item
+ *
+ * @param midx the index of the message in the pool to be pushed into the queue
+ */
+void DccMQTT::pushIn(uint8_t midx)
+{
+ static uint32_t msgInCnt = 101;
+ if (!DccMQTT::inComming.isFull())
+ {
+ DccMQTTCommandMsg::msg[midx].msgId = msgInCnt;
+ DccMQTTCommandMsg::msg[midx].free = false;
+ DccMQTTCommandMsg::msg[midx].mIdx = midx;
+ DccMQTT::inComming.push(&DccMQTTCommandMsg::msg[midx].mIdx);
+ DBG(F("Enqueued incomming msg[%d] #%d"), midx, DccMQTTCommandMsg::msg[midx].msgId);
+ // DccMQTTCommandMsg::printCmdMsg(&DccMQTTCommandMsg::msg[midx]);
+
+ msgInCnt++;
+ }
+ else
+ {
+ ERR(F("Dcc incomming message queue is full; Message ignored"));
+ }
+}
+
+/**
+ * @brief Pushes a message into the the outgoing queue; locks the pool item;
+ *
+ * @param midx the index of the message in the pool to be pushed into the queue
+ */
+void DccMQTT::pushOut(uint8_t midx)
+{
+ static uint32_t msgOutCnt = 501;
+ if (!DccMQTT::outGoing.isFull())
+ {
+ DccMQTTCommandMsg::msg[midx].msgId = msgOutCnt;
+ DccMQTTCommandMsg::msg[midx].free = false;
+ DccMQTTCommandMsg::msg[midx].mIdx = midx;
+ DccMQTT::outGoing.push(&DccMQTTCommandMsg::msg[midx].mIdx);
+ DBG(F("Enqueued outgoing msg #%d"), DccMQTTCommandMsg::msg[midx].msgId);
+ msgOutCnt++;
+ }
+ else
+ {
+ ERR(F("Dcc outgoing message queue is full; Message ignored"));
+ }
+}
+
+/**
+ * @brief pops a message from the the in comming queue; The pool item used is still in use and stays locked
+ *
+ * @param midx the index of the message in the pool to be poped
+ * @return index in the message pool of the message which has been poped; -1 if the queue is empty
+ */
+uint8_t DccMQTT::popIn()
+{
+ uint8_t i;
+ if (!DccMQTT::inComming.isEmpty())
+ {
+ DccMQTT::inComming.pop(&i);
+ DccMQTTCommandMsg::msg[i].free = false;
+ DBG(F("Dequeued incomming msg #%d, cid: %s"), DccMQTTCommandMsg::msg[i].msgId, DccMQTTCommandMsg::msg[i]._cid);
+ return i;
+ }
+ return -1;
+}
+/**
+ * @brief pops a message from the the outgoing queue; The pool item used is freed and can be reused
+ *
+ * @param midx the index of the message in the pool to be poped
+ * @return index in the message pool of the message which has been poped; -1 if the queue is empty
+ */
+uint8_t DccMQTT::popOut()
+{
+ uint8_t i;
+ if (!DccMQTT::outGoing.isEmpty())
+ {
+ DccMQTT::outGoing.pop(&i);
+ DccMQTTCommandMsg::msg[i].free = true;
+ DBG(F("Dequeued outgoing msg #%d, cid: %s"), DccMQTTCommandMsg::msg[i].msgId, DccMQTTCommandMsg::msg[i]._cid);
+ // DccMQTTCommandMsg::printCmdMsg(&DccMQTTCommandMsg::msg[i]);
+ // DccMQTTCommandMsg::printCmdMsgPool();
+ return i;
+ }
+ return -1;
+}
+
+/**
+ * @brief in case we lost the connection to the MQTT broker try to restablish a conection
+ *
+ */
+static void reconnect()
+{
+ DBG(F("MQTT (re)connecting ..."));
+
+ while (!mqttClient.connected())
+ {
+ INFO(F("Attempting MQTT Broker connection..."));
+ // Attempt to connect
+#ifdef CLOUDBROKER
+ char *connectID = new char[40];
+
+ connectID[0] = '\0';
+ strcat(connectID, MQTT_BROKER_CLIENTID_PREFIX);
+ strcat(connectID,DccMQTT::getDeviceID());
+
+ INFO(F("ConnectID: %s %s %s"), connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD);
+ #ifdef MQTT_BROKER_USER
+ DBG(F("MQTT (re)connecting (Cloud/User) ..."));
+ if (mqttClient.connect(connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0))
+ #else
+ DBG(F("MQTT (re)connecting (Cloud) ..."));
+ if (mqttClient.connect(DccMQTT::getDeviceID()))
+ #endif
+#else
+ #ifdef MQTT_BROKER_USER
+ DBG(F("MQTT (re)connecting (Local/User) ..."));
+ if (mqttClient.connect(DccMQTT::getDeviceID(), MQTT_BROKER_USER, MQTT_BROKER_PASSWD))
+ #else
+ DBG(F("MQTT (re)connecting (Local) ..."));
+ if (mqttClient.connect(DccMQTT::getDeviceID()))
+ #endif
+#endif
+ {
+ INFO(F("MQTT broker connected ..."));
+ // publish on the $connected topic
+ DccMQTT::subscribe(); // required in case of a connection loss to do it again (this causes a mem leak !! of 200bytes each time!!)
+ }
+ else
+ {
+ INFO(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient.state());
+ }
+ }
+}
+
+/**
+ * @brief Test if the mqtt client is connected
+ *
+ * @return true - connected
+ * @return false - not connected
+ */
+bool DccMQTT::connected()
+{
+ return mqttClient.connected();
+}
+
+/**
+ * @brief builds the topic strings for the intents together with the unique ID and the Ressource letter
+ *
+ * @param c char for specific chnalle under the intent; if '_' it will not be used
+ * @param t index for the topic in the list of topics
+ * @return char* of the topic string build
+ */
+static char *buildTopicID(char c, DccTopics t)
+{
+ char *str = new char[MAXTSTR];
+ str[0] = '\0'; // flush the string
+ switch (t)
+ {
+ case RESULT:
+ {
+ strcat(str, TCMRESROOT); // results channel
+ strcat(str, DccMQTT::getDeviceID());
+ break;
+ }
+ case ADMIN:
+ {
+ strcat(str, ADMROOT); // admin
+ strcat(str, DccMQTT::getDeviceID());
+ break;
+ }
+ case TELEMETRY:
+ {
+ strcat(str, TELEMETRYROOT); // telemetry
+ strcat(str, DccMQTT::getDeviceID());
+ break;
+ }
+ case DIAGNOSTIC:
+ {
+ strcat(str, DIAGROOT); // diganostics
+ strcat(str, DccMQTT::getDeviceID());
+ break;
+ }
+ case JRMI:
+ {
+ strcat(str, JRMIROOT); // for JRMI formmated input from the JRMI topic
+ strcat(str, DccMQTT::getDeviceID());
+ break;
+ }
+ case CMD_L:
+ case CMD_A:
+ case CMD_T:
+ case CMD_S:
+ {
+ strcat(str, TCMDROOT);
+ strcat(str, DccMQTT::getDeviceID());
+ strncat(str, "/", 1);
+ strncat(str, &c, 1);
+ break;
+ }
+ case INVALID_T:
+ return NULL;
+ }
+
+ DccMQTT::topics[t] = str;
+ INFO(F("Topic: %s created"), DccMQTT::topics[t]);
+ return DccMQTT::topics[t];
+}
+
+void DccMQTT::printTopics()
+{
+ INFO(F("List of subcribed Topics ... "));
+ for (int i = 0; i < MAXTOPICS; i++)
+ {
+ INFO(F("Topic: %s"), DccMQTT::topics[i]);
+ }
+}
+
+void DccMQTT::subscribe()
+{
+ static bool subscribed = false;
+ if (subscribed == false) // Subscribe for the first time
+ {
+ mqttClient.subscribe(buildTopicID('L', CMD_L)); // command//L - Subscription CSID is 18 in length
+ mqttClient.subscribe(buildTopicID('T', CMD_T)); // command//T - Subscription only
+ mqttClient.subscribe(buildTopicID('S', CMD_S)); // command//S - Subscription only
+ mqttClient.subscribe(buildTopicID('A', CMD_A)); // command//A - Subscription only
+ mqttClient.subscribe(buildTopicID('_', RESULT)); // command Response - Publish only
+ mqttClient.subscribe(buildTopicID('_', ADMIN)); // admin - Publish / Subscribe
+ mqttClient.subscribe(buildTopicID('_', DIAGNOSTIC)); // diagnostics - Publish / Subscribe
+ mqttClient.subscribe(buildTopicID('_', TELEMETRY)); // metrics - Publish / Subscribe
+ mqttClient.subscribe(buildTopicID('_', JRMI)); // metrics - Publish / Subscribe
+ subscribed = true;
+ }
+ else // already subscribed once so we have the topics stored no need to rebuild them
+ {
+ for (int i = 0; i < MAXTOPICS; i++)
+ {
+ mqttClient.subscribe(DccMQTT::topics[i]);
+ // mqttClient.subscribe("diag/memory", 0); // to be replaced by telemetry
+ }
+ }
+}
+
+void DccMQTT::subscribeT(char *topic)
+{
+ mqttClient.subscribe(topic);
+}
+
+#define PUB_RES_FMT "{\"cid\":\"%s\", \"result\":\"%d\"}"
+PROGMEM const char resfmt[] = {PUB_RES_FMT};
+
+void DccMQTT::publish()
+{
+ char _memMsg[64]{'\0'}; //!< string buffer for the serialized message to return
+ if (!DccMQTT::outGoing.isEmpty())
+ {
+ uint8_t c = DccMQTT::popOut();
+ if (c != -1)
+ {
+ /**
+ * @todo check for the length of theoverall response and make sure its smalle than the _memMsg buffer size
+ */
+ sprintf_P(_memMsg, resfmt, DccMQTTCommandMsg::msg[c]._cid, DccMQTTCommandMsg::msg[c].result);
+
+ INFO(F("Sending result: %s length: %d"), _memMsg, strlen(_memMsg));
+ mqttClient.publish(DccMQTT::topics[RESULT], _memMsg);
+ };
+ };
+}
+/**
+ * @brief Initalizes the MQTT broker connection; subcribes to all reqd topics and sends the deviceID to the broker on the /admin channel
+ *
+ */
+#define PUB_CSID_FMT "{\"csid\":\"%s\"}"
+PROGMEM const char csidfmt[] = {PUB_CSID_FMT};
+
+
+void DccMQTT::setup(DCCEXParser p)
+{
+ char _csidMsg[64]{'\0'}; //!< string buffer for the serialized message to return
+ mqttClient.setServer(server, MQTT_BROKER_PORT); // Initalize MQ broker
+ DBG(F("MQTT Client : Server ok ..."));
+ mqttClient.setCallback(mqttCallback); // Initalize callback function for incomming messages
+ DBG(F("MQTT Client : Callback set ..."));
+
+ DccMQTT::setDeviceID(); // set the unique device ID to bu used for creating / listening to topic
+
+ /**
+ * @todo check for connection failure
+ */
+ reconnect(); // inital connection as well as reconnects
+ DccMQTT::subscribe(); // set up all subscriptionn
+ INFO(F("MQTT subscriptons done..."));
+ sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID());
+ mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic
+
+
+ /**
+ * @todo set the connect status with a retained message on the $connected topic /admin//$connected as used in the connect
+ *
+ */
+
+ DccTelemetry::setup();
+ mqttDccExParser = p;
+}
+
+void DccMQTT::setDeviceID()
+{
+ array_to_string(UniqueID, UniqueIDsize, _deviceId);
+ INFO(F("UniqueID: %s"), _deviceId);
+}
+
+char *DccMQTT::getDeviceID()
+{
+ return DccMQTT::deviceID;
+};
+
+void DccMQTT::loop()
+{
+
+ DccTelemetry::deltaT(1);
+
+ if (!mqttClient.connected())
+ {
+ reconnect();
+ }
+ if (!mqttClient.loop())
+ {
+ ERR(F("mqttClient returned with error; state: %d"), mqttClient.state());
+ };
+ DccMQTTProc::loop(); //!< give time to the command processor to handle msg ..
+ DccMQTT::publish(); //!< publish waiting messages from the outgoing queue
+ DccTelemetry::deltaT(1);
+}
+
+bool DccMQTT::inIsEmpty()
+{
+ if (DccMQTT::inComming.isEmpty())
+ {
+ return true;
+ }
+ return false;
+}
+
+bool DccMQTT::outIsEmpty()
+{
+ if (DccMQTT::outGoing.isEmpty())
+ {
+ return true;
+ }
+ return false;
+};
+
+DccMQTT::DccMQTT()
+{
+ // long l = random();
+}
+
+DccMQTT::~DccMQTT()
+{
+}
\ No newline at end of file
diff --git a/DccMQTT.h b/DccMQTT.h
new file mode 100644
index 0000000..bbf4b10
--- /dev/null
+++ b/DccMQTT.h
@@ -0,0 +1,125 @@
+#ifndef _DccMQTT_h_
+#define _DccMQTT_h_
+/**
+ * @file DccMQTT.h
+ * @author Gregor Baues
+ * @brief MQTT protocol controller for DCC-EX. Sets up and maintains the connection to the MQTT broker incl setting up the topics.
+ * Topics are created specifically for the command station on which the code runs. Manages subsriptions as well as recieving/sending of messages on the different topics.
+ * @version 0.1
+ * @date 2020-07-08
+ *
+ * @copyright Copyright (c) 2020
+ *
+ * This is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * It is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details .
+ */
+
+// #include
+#include
+// #include
+#include
+#include
+
+/**
+ * @brief MQTT broker configuration done in config.h
+ */
+
+
+// Class for setting up the MQTT connection / topics / queues for processing commands and sendig back results
+
+#define MAXDEVICEID 20 // maximum length of the unique id / device id
+#define MAXTOPICS 8 // command L,T,S,A plus response plus admin for inital handshake
+#define TCMDROOT "command/" // root of command topics
+#define TCMRESROOT "result/" // root of the result topic
+#define ADMROOT "admin/" // root of the admin topic where whe can do hanshakes for the inital setup
+; // esp for sec reasons i.e. making sure we are talking to the right device and
+; // not some one elses
+#define TELEMETRYROOT "telemetry/" // telemetry topic
+#define DIAGROOT "diag/" // diagnostics
+#define JRMIROOT "jrmi/"
+
+#define NOOFDCCTOPICS 11
+enum DccTopics {
+ CMD_L, // L is Loco or Layout(power on/off)
+ CMD_T,
+ CMD_S,
+ CMD_A,
+ RESULT,
+ ADMIN,
+ TELEMETRY,
+ DIAGNOSTIC,
+ JRMI,
+ INVALID_T
+};
+
+/**
+ * @brief List of keywords used in the command protocol
+ *
+ */
+#define MAX_KEYWORD_LENGTH 11
+PROGMEM const char _kRead[] = {"read"};
+PROGMEM const char _kWrite[] = {"write"};
+PROGMEM const char _kPower[] = {"power"};
+PROGMEM const char _kThrottle[] = {"throttle"};
+PROGMEM const char _kFunction[] = {"function"};
+PROGMEM const char _kCv[] = {"cv"};
+PROGMEM const char _kSpeed[] = {"speed"};
+PROGMEM const char _kLocomotive[] = {"locomotive"};
+PROGMEM const char _kValue[] = {"value"};
+PROGMEM const char _kDirection[] = {"direction"};
+PROGMEM const char _kState[] = {"state"};
+PROGMEM const char _kFn[] = {"fn"};
+PROGMEM const char _kTrack[] = {"track"};
+PROGMEM const char _kBit[] = {"bit"};
+
+/**
+ * @brief The ingoin and outgoing queues can hold 20 messages each; this should be bigger than the number
+ * of statically allocated pool items whose pointers are getting pushed into the queues.
+ *
+ */
+#define MAXQUEUE 20 // MAX message queue length
+
+class DccMQTT
+{
+private:
+
+ static char *deviceID; // Unique Device Identifier; based on the chip
+ static Queue inComming; // incomming messages queue; the queue only contains indexes to the message pool
+ static Queue outGoing; // outgoing messages queue; the queue only contains indexes to the message pool
+
+public:
+ static char **topics; // list of pub/sub topics
+ static PubSubClient *mqClient;
+
+ static void setup(DCCEXParser p); // main entry to get things going
+ static void loop(); // recieveing commands / processing commands / publish results
+ static bool connected(); // true if the MQ client is connected
+
+ static char *getDeviceID();
+ static void setDeviceID();
+ static void subscribe(); // subscribes to all relevant topics
+ static void subscribeT(char *topic);// subscribe to a particular topic for other than the std ones in subscribe (e.g. telemetry)
+ static void publish(); // publishes a JSON message constructed from the outgoing queue (cid and result)
+ static void printTopics(); // prints the list of subscribed topics - debug use
+ static bool inIsEmpty(); // test if the incomming queue is empty
+ static bool outIsEmpty(); // test if the outgoing queue is empty
+ static void pushIn(uint8_t midx); // push a command struct into the incomming queue for processing
+ static void pushOut(uint8_t midx); // push a command struct into the incomming queue for processing
+ static uint8_t popOut(); // pop a command struct with the result to be published
+ static uint8_t popIn(); // pop a command struct from the in comming queue for processing
+
+ static void pub_free_memory(int fm);
+
+ DccMQTT();
+ ~DccMQTT();
+};
+
+
+#endif
\ No newline at end of file
diff --git a/Queue.h b/Queue.h
new file mode 100644
index 0000000..3f3256d
--- /dev/null
+++ b/Queue.h
@@ -0,0 +1,106 @@
+/*
+ * Queue.h
+ *
+ */
+
+#ifndef _Queue_h_
+#define _Queue_h_
+
+#include
+
+template
+class Queue {
+
+ private:
+ int _front, _back, _count;
+ T *_data;
+ int _maxitems;
+
+ public:
+ Queue(int maxitems = 256) {
+ _front = 0;
+ _back = 0;
+ _count = 0;
+ _maxitems = maxitems;
+ _data = new T[maxitems + 1];
+ }
+ ~Queue() {
+ delete[] _data;
+ }
+ inline int count();
+ inline int front();
+ inline int back();
+ void push(const T &item);
+ T peek();
+ T pop();
+ void clear();
+};
+
+template
+inline int Queue::count()
+{
+ return _count;
+}
+
+template
+inline int Queue::front()
+{
+ return _front;
+}
+
+template
+inline int Queue::back()
+{
+ return _back;
+}
+
+template
+void Queue::push(const T &item)
+{
+ if(_count < _maxitems) { // Drops out when full
+ _data[_back++]=item;
+ ++_count;
+ // Check wrap around
+ if (_back > _maxitems)
+ _back -= (_maxitems + 1);
+ }
+}
+
+template
+T Queue::pop() {
+ if(_count <= 0) return T(); // Returns empty
+ else {
+ T result = _data[_front];
+ _front++;
+ --_count;
+ // Check wrap around
+ if (_front > _maxitems)
+ _front -= (_maxitems + 1);
+ return result;
+ }
+}
+
+template
+T Queue::peek() {
+ if(_count <= 0) return T(); // Returns empty
+ else return _data[_front];
+}
+
+template
+void Queue::clear()
+{
+ _front = _back;
+ _count = 0;
+}
+
+#endif // _Queue_h_
+
+
+
+
+
+
+
+
+
+
diff --git a/defines.h b/defines.h
index 3bf314f..4d8bce0 100644
--- a/defines.h
+++ b/defines.h
@@ -40,8 +40,9 @@
// MQTT is on only if the ethernet is enabled
#if ENABLE_MQTT && ETHERNET_ON
- #define MQTT_ON
+ #define MQTT_ON true
#else
+ #define MQTT_ON false
#warning MQTT requires Ethernet to be enabled.
#endif
diff --git a/platformio.ini b/platformio.ini
index a05a191..047c7b9 100644
--- a/platformio.ini
+++ b/platformio.ini
@@ -23,6 +23,7 @@ framework = arduino
upload_protocol = atmel-ice
lib_deps =
${env.lib_deps}
+ PubSubClient
SparkFun External EEPROM Arduino Library
monitor_speed = 115200
monitor_flags = --echo
@@ -33,6 +34,7 @@ board = megaatmega2560
framework = arduino
lib_deps =
${env.lib_deps}
+ PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200
@@ -44,6 +46,7 @@ board = uno
framework = arduino
lib_deps =
${env.lib_deps}
+ PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200
@@ -55,6 +58,7 @@ board = uno_wifi_rev2
framework = arduino
lib_deps =
${env.lib_deps}
+ PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200
@@ -67,6 +71,7 @@ board = uno
framework = arduino
lib_deps =
${env.lib_deps}
+ PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200