diff --git a/DccMQTT.cpp b/DccMQTT.cpp index 62e3037..e973793 100644 --- a/DccMQTT.cpp +++ b/DccMQTT.cpp @@ -50,7 +50,29 @@ char topicName[MAXTBUF]; char topicMessage[MAXTMSG]; // char keyword[MAX_KEYWORD_LENGTH]; +// pairing functions used for creating a client identifier +// when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published +// shall be a admin message with a random number +// the cs will based on a counter use a second number to create the cantor encoding of both numbers and publish the cantor code +// this message will be seen by all throttles and they can decode the number which provides the first number they send and the +// second number to be used as tpoic for the external system from then on. The CS will recieve on all topics the commands and +// during processing then send the replies to the topic from which the command was recieved. +// Thus the main channel shall not be used for any p2p coms ev just for broadcast from the CS to the subscribed clients +int cantorEncode(int a, int b) +{ + + return (((a + b) * (a + b + 1)) / 2) + b; +} + +void cantorDecode(int c, int *a, int *b) +{ + + int w = floor((sqrt(8 * c + 1) - 1) / 2); + int t = (w * (w + 1)) / 2; + *b = c - t; + *a = w - *b; +} /** * @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID @@ -78,20 +100,43 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) topicMessage[0] = '\0'; strcpy(topicName, topic); - auto pool = mqtt->getPool(); - auto q = mqtt->getIncomming(); + switch (payload[0]) + { + case '<': + { + // DCC-EX command + auto pool = mqtt->getPool(); + auto q = mqtt->getIncomming(); - csmsg_t tm; - strlcpy(tm.cmd, (char *)payload, length + 1); - // Add the recieved command to the pool - int idx = pool->setItem(tm); - if ( idx == -1) { - DIAG(F("MQTT Command pool full. Could not handle recieved command.")); - return; + csmsg_t tm; + strlcpy(tm.cmd, (char *)payload, length + 1); + // Add the recieved command to the pool + int idx = pool->setItem(tm); + if (idx == -1) + { + DIAG(F("MQTT Command pool full. Could not handle recieved command.")); + return; + } + // Add the index of the pool item to the incomming queue + q->push(idx); + DIAG(F("MQTT Message arrived [%s]: [%s]"), topicName, tm.cmd); + break; + } + case '(': + { + // MQTT Ctrl command + payload[length] = '\0'; + DIAG(F("MQTT Ctrl Message arrived [%s]: [%s]"), topicName, (char *)payload); + break; + } + default: + { + // invalid command + payload[length] = '\0'; + DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload); + break; + } } - // Add the index of the pool item to the incomming queue - q->push(idx); - DIAG(F("MQTT Message arrived [%s]: [%s]"), topicName, tm.cmd); } /** @@ -105,8 +150,9 @@ void DccMQTT::connect() connectID[0] = '\0'; int reconnectCount = 0; - - if(broker->prefix != nullptr) { + + if (broker->prefix != nullptr) + { char tmp[20]; strcpy_P(tmp, (const char *)broker->prefix); connectID[0] = '\0'; @@ -118,7 +164,7 @@ void DccMQTT::connect() DIAG(F("MQTT %s (re)connecting ..."), connectID); // Build the connect ID : Prefix + clientID -while (!mqttClient.connected() && reconnectCount < MAXRECONNECT) + while (!mqttClient.connected() && reconnectCount < MAXRECONNECT) { DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType); switch (broker->cType) @@ -162,8 +208,9 @@ while (!mqttClient.connected() && reconnectCount < MAXRECONNECT) // for the time being only one topic at the root which os the unique clientID from the MCU // QoS is 0 by default -boolean DccMQTT::subscribe() { - return mqttClient.subscribe(clientID); +boolean DccMQTT::subscribe() +{ + return mqttClient.subscribe(clientID); } /** @@ -210,10 +257,10 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) array_to_string(mqbid, CLIENTIDSIZE, clientID); DIAG(F("MQTT Client ID : %s"), clientID); - connect(); // inital connection as well as reconnects - auto sub = DccMQTT::subscribe(); // set up all subscriptions + connect(); // inital connection as well as reconnects + auto sub = DccMQTT::subscribe(); // set up all subscriptions - DIAG(F("MQTT subscriptons %s..."), sub ? "ok":"failed"); + DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed"); mqttClient.publish(clientID, "Hello from DccEX"); @@ -228,12 +275,9 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) // mqttDccExParser = p; } - void DccMQTT::loop() { - // DccTelemetry::deltaT(1); - if (!mqttClient.connected()) { connect(); @@ -242,9 +286,30 @@ void DccMQTT::loop() { DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state()); }; + + // read incomming queue for processing; one per loop + bool state; + if (in.count() > 0) + { + auto idx = in.peek(); + auto c = pool.getItem(in.pop(), &state); + DIAG(F("MQTT Processing pool: %d with command: %s"), idx, c->cmd); + } + + // read outgoing queue for publishing replies; one per loop + if (out.count() > 0) + { + auto m = pool.getItem(out.pop(), &state); + DIAG(F("MQTT Publish reply from command %s"), m->cmd); + } + // DccMQTTProc::loop(); //!< give time to the command processor to handle msg .. + // take a command from the incomming queue + // execute it + // store the results in the outgoing queue + // DccMQTT::publish(); //!< publish waiting messages from the outgoing queue - // DccTelemetry::deltaT(1); + // if there is someting in the outgoing queue publish on response } // #include // for PROGMEM use diff --git a/DccMQTT.h b/DccMQTT.h index ee23d86..c7874b7 100644 --- a/DccMQTT.h +++ b/DccMQTT.h @@ -22,9 +22,10 @@ #define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked #define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ? #define MAXTSTR 30 //!< max length of a topic string -#define MAXCONNECTID 40 -#define CLIENTIDSIZE 6 -#define MAXRECONNECT 5 +#define MAXCONNECTID 40 // Broker connection id length incl possible prefixes +#define CLIENTIDSIZE 6 // +#define MAXRECONNECT 5 // reconnection tries before final failure +#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers // Define Broker configurations; Values are provided in the following order // MQTT_BROKER_PORT 9883 @@ -79,10 +80,20 @@ struct MQTTBroker }; }; +/** + * @brief dcc-ex command as recieved via MQ + * + */ typedef struct csmsg_t { char cmd[MAXPAYLOAD]; } csmsg_t; +typedef struct csmqttclient_t { + uint8_t subscriber; + uint8_t cs; + +} csmqttclient_t + enum DccMQTTState { INIT, @@ -100,7 +111,9 @@ private: void setup(const FSH *id, MQTTBroker *broker); void connect(); // (re)connects to the broker - boolean subscribe(); + boolean subscribe(); + // static int cantorEncode(int a, int b); + // static void cantorDecode(int c, int *a, int *b); EthernetClient ethClient; // TCP Client object for the MQ Connection IPAddress server; // MQTT server object @@ -115,6 +128,9 @@ private: DccMQTTState mqState = INIT; + + + public: static DccMQTT *get() noexcept { diff --git a/DccMQTTProc.cpp b/DccMQTTProc.cpp new file mode 100644 index 0000000..2526799 --- /dev/null +++ b/DccMQTTProc.cpp @@ -0,0 +1,16 @@ +/** + * @file DccMQTTProc.cpp + * @author Gregor Baues (gr2bba@gmail.com) + * @brief Eexecuting DCC commands recieved through MQTT. + * @version 0.1 + * @date 08-07-2020 + * + * @copyright Copyright (c) 2021 + * + * Licenced under GPLv3 + */ + +#include +#include + + diff --git a/DccMQTTProc.h b/DccMQTTProc.h new file mode 100644 index 0000000..f6addbb --- /dev/null +++ b/DccMQTTProc.h @@ -0,0 +1,15 @@ +#ifndef _DccMQTTProc_h_ +#define _DccMQTTProc_h_ + +class DccMQTTProc +{ +private: + +public: + static void loop(); + + DccMQTTProc() = default; + ~DccMQTTProc() = default; +}; + +#endif \ No newline at end of file