diff --git a/CommandDistributor.cpp b/CommandDistributor.cpp index 4f5f261..dd9c7cd 100644 --- a/CommandDistributor.cpp +++ b/CommandDistributor.cpp @@ -19,18 +19,21 @@ #include #include "CommandDistributor.h" #include "WiThrottle.h" -#include "DIAG.h" -DCCEXParser * CommandDistributor::parser=0; +DCCEXParser *CommandDistributor::parser = 0; -void CommandDistributor::parse(byte clientId,byte * buffer, RingStream * streamer) { - DIAG(F("CDS %d : %s : %x"), clientId, (char *) buffer, streamer); - if (buffer[0] == '<') { - if (!parser) parser = new DCCEXParser(); - parser->parse(streamer, buffer, streamer); +void CommandDistributor::parse(byte clientId, byte *buffer, RingStream *streamer) +{ + if (buffer[0] == '<') + { + if (!parser) + { + parser = new DCCEXParser(); + } + parser->parse(streamer, buffer, streamer); } - else { - DIAG(F("CDS WiThrottle")); + else + { WiThrottle::getThrottle(clientId)->parse(streamer, buffer); } } diff --git a/CommandStation-EX.ino b/CommandStation-EX.ino index 745668c..106ab6f 100644 --- a/CommandStation-EX.ino +++ b/CommandStation-EX.ino @@ -1,29 +1,28 @@ //////////////////////////////////////////////////////////////////////////////////// -// DCC-EX CommandStation-EX Please see https://DCC-EX.com +// DCC-EX CommandStation-EX Please see https://DCC-EX.com // // This file is the main sketch for the Command Station. -// -// CONFIGURATION: +// +// CONFIGURATION: // Configuration is normally performed by editing a file called config.h. // This file is NOT shipped with the code so that if you pull a later version // of the code, your configuration will not be overwritten. // // If you used the automatic installer program, config.h will have been created automatically. -// -// To obtain a starting copy of config.h please copy the file config.example.h which is -// shipped with the code and may be updated as new features are added. -// +// +// To obtain a starting copy of config.h please copy the file config.example.h which is +// shipped with the code and may be updated as new features are added. +// // If config.h is not found, config.example.h will be used with all defaults. //////////////////////////////////////////////////////////////////////////////////// -#if __has_include ( "config.h") - #include "config.h" +#if __has_include("config.h") +#include "config.h" #else - #warning config.h not found. Using defaults from config.example.h - #include "config.example.h" +#warning config.h not found. Using defaults from config.example.h +#include "config.example.h" #endif - /* * © 2020,2021 Chris Harlow, Harald Barth, David Cutting, * Fred Decker, Gregor Baues, Anthony W - Dayton All rights reserved. @@ -43,10 +42,9 @@ * along with CommandStation. If not, see . */ - #include "DCCEX.h" -// Create a serial command parser for the USB connection, +// Create a serial command parser for the USB connection, // This supports JMRI or manual diagnostics and commands // to be issued from the USB serial console. DCCEXParser serialParser; @@ -58,14 +56,15 @@ void setup() // Responsibility 1: Start the usb connection for diagnostics // This is normally Serial but uses SerialUSB on a SAMD processor Serial.begin(115200); - - CONDITIONAL_LCD_START { - // This block is still executed for DIAGS if LCD not in use - LCD(0,F("DCC++ EX v%S"),F(VERSION)); - LCD(1,F("Starting")); - } -// Start the WiFi interface on a MEGA, Uno cannot currently handle WiFi + CONDITIONAL_LCD_START + { + // This block is still executed for DIAGS if LCD not in use + LCD(0, F("DCC++ EX v%S"), F(VERSION)); + LCD(1, F("Starting")); + } + + // Start the WiFi interface on a MEGA, Uno cannot currently handle WiFi #if WIFI_ON WifiInterface::setup(WIFI_SERIAL_LINK_SPEED, F(WIFI_SSID), F(WIFI_PASSWORD), F(WIFI_HOSTNAME), IP_PORT, WIFI_CHANNEL); @@ -76,12 +75,9 @@ void setup() #endif // ETHERNET_ON #if MQTT_ON - DccMQTT::get()->setup(); + MQTTInterface::setup(); #endif - - - // Responsibility 3: Start the DCC engine. // Note: this provides DCC with two motor drivers, main and prog, which handle the motor shield(s) // Standard supported devices have pre-configured macros but custome hardware installations require @@ -89,25 +85,26 @@ void setup() // STANDARD_MOTOR_SHIELD, POLOLU_MOTOR_SHIELD, FIREBOX_MK1, FIREBOX_MK1S are pre defined in MotorShields.h - - DCC::begin(MOTOR_SHIELD_TYPE); - - #if defined(RMFT_ACTIVE) - RMFT::begin(); - #endif + DCC::begin(MOTOR_SHIELD_TYPE); - #if __has_include ( "mySetup.h") - #define SETUP(cmd) serialParser.parse(F(cmd)) - #include "mySetup.h" - #undef SETUP - #endif +#if defined(RMFT_ACTIVE) + RMFT::begin(); +#endif - #if defined(LCN_SERIAL) - LCN_SERIAL.begin(115200); - LCN::init(LCN_SERIAL); - #endif +#if __has_include("mySetup.h") +#define SETUP(cmd) serialParser.parse(F(cmd)) +#include "mySetup.h" +#undef SETUP +#endif - LCD(1,F("Ready")); +#if defined(LCN_SERIAL) + LCN_SERIAL.begin(115200); + LCN::init(LCN_SERIAL); +#endif + + + + LCD(1, F("Ready")); } void loop() @@ -125,31 +122,32 @@ void loop() #if WIFI_ON WifiInterface::loop(); #endif + #if ETHERNET_ON EthernetInterface::loop(); #endif + #if MQTT_ON - DccMQTT::get()->loop(); + MQTTInterface::loop(); #endif - -#if defined(RMFT_ACTIVE) +#if defined(RMFT_ACTIVE) RMFT::loop(); #endif - #if defined(LCN_SERIAL) - LCN::loop(); - #endif +#if defined(LCN_SERIAL) + LCN::loop(); +#endif + + LCDDisplay::loop(); // ignored if LCD not in use - LCDDisplay::loop(); // ignored if LCD not in use - // Report any decrease in memory (will automatically trigger on first call) - static int ramLowWatermark = __INT_MAX__; // replaced on first loop + static int ramLowWatermark = __INT_MAX__; // replaced on first loop int freeNow = minimumFreeMemory(); if (freeNow < ramLowWatermark) { ramLowWatermark = freeNow; - LCD(2,F("Free RAM=%5db"), ramLowWatermark); + LCD(2, F("Free RAM=%5db"), ramLowWatermark); } } diff --git a/DCCEX.h b/DCCEX.h index d274e45..f9a3aa8 100644 --- a/DCCEX.h +++ b/DCCEX.h @@ -16,7 +16,7 @@ #endif #if MQTT_ON == true -#include "DccMQTT.h" +#include "MQTTInterface.h" #endif #include "LCD_Implementation.h" diff --git a/DCCEXParser.cpp b/DCCEXParser.cpp index beaffc2..bc2045e 100644 --- a/DCCEXParser.cpp +++ b/DCCEXParser.cpp @@ -499,17 +499,11 @@ void DCCEXParser::parse(Print *stream, byte *com, RingStream * ringStream) return; case 's': // - // DIAG(F("s1")); StringFormatter::send(stream, F("\n"), DCCWaveform::mainTrack.getPowerMode() == POWERMODE::ON); - // DIAG(F("s2")); StringFormatter::send(stream, F("\n"), F(VERSION), F(ARDUINO_TYPE), DCC::getMotorShieldName(), F(GITHUB_SHA)); - // DIAG(F("s3")); Turnout::printAll(stream); //send all Turnout states - // DIAG(F("s4")); Output::printAll(stream); //send all Output states - // DIAG(F("s5")); Sensor::printAll(stream); //send all Sensor states - //DIAG(F("s6")); // TODO Send stats of speed reminders table return; diff --git a/DccMQTT.cpp b/DccMQTT.cpp deleted file mode 100644 index d1f58ee..0000000 --- a/DccMQTT.cpp +++ /dev/null @@ -1,500 +0,0 @@ -/** - * @file DccMQTT.cpp - * @author Gregor Baues - * @brief MQTT protocol controller for DCC-EX. Sets up and maintains the connection to the MQTT broker incl setting up the topics. - * Topics are created specifically for the command station on which the code runs. Manages subsriptions as well as recieving/sending of messages on the different topics. - * @version 0.1 - * @date 2020-07-08 - * - * @copyright Copyright (c) 2020 - * - * This is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * It is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details . - * - * Notes: - * At most 20 channels are allowed (MAXMQTTCONNECTIONS ) this can be pushed put to 255 - * notwithstanding memeory consumption for maintaining the session info - * - * once the channel is open i.E the topic id has been send via MQ the CS will subscribe to - * clientid/topicid/cmd for recieveing commands and publish on - * clientid/topicid/diag - * clientid/topicid/result - * i.e. the consumer connected via MQTT to the cs has/should to subscribe to - * clientid/topicid/diag - * clientid/topicid/result - * and publish on clientid/topicid/cmd - */ - -#if __has_include("config.h") -#include "config.h" -#else -#warning config.h not found. Using defaults from config.example.h -#include "config.example.h" -#endif -#include "defines.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -//--------- -// Variables -//--------- - -DccMQTT DccMQTT::singleton; -auto mqtt = DccMQTT::get(); - -// The RingBuffer size / no Withrottle over MQ so we don't need the huge buffer as for ethernet -#define OUT_BOUND_SIZE 256 - -// pairing functions used for creating a client identifier -// when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published -// shall be a admin message with a random number -// the cs will based on a counter use a second number to create the cantor encoding of both numbers and publish the cantor code -// this message will be seen by all throttles and they can decode the number which provides the first number they send and the -// second number to be used as tpoic for the external system from then on. The CS will recieve on all topics the commands and -// during processing then send the replies to the topic from which the command was recieved. -// Thus the main channel shall not be used for any p2p coms ev just for broadcast from the CS to the subscribed clients - -long cantorEncode(long a, long b) -{ - return (((a + b) * (a + b + 1)) / 2) + b; -} - -void cantorDecode(int32_t c, int *a, int *b) -{ - - int w = floor((sqrt(8 * c + 1) - 1) / 2); - int t = (w * (w + 1)) / 2; - *b = c - t; - *a = w - *b; -} - -/** - * @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID - * - * @param array array containing bytes - * @param len length of the array - * @param buffer buffer to which the string will be written; make sure the buffer has appropriate length - */ -static void array_to_string(byte array[], unsigned int len, char buffer[]) -{ - for (unsigned int i = 0; i < len; i++) - { - byte nib1 = (array[i] >> 4) & 0x0F; - byte nib2 = (array[i] >> 0) & 0x0F; - buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA; - buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA; - } - buffer[len * 2] = '\0'; -} - -// callback when a message arrives from the broker; push cmd into the incommming queue -void mqttCallback(char *topic, byte *payload, unsigned int length) -{ - - auto clients = mqtt->getClients(); - errno = 0; - payload[length] = '\0'; // make sure we have the string terminator in place - - DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length); - - switch (payload[0]) - { - case '<': - { - - const char s[2] = "/"; // topic delimiter is / - char *token; - byte mqsocket; - - /* get the first token = ClientID */ - token = strtok(topic, s); - /* get the second token = topicID */ - token = strtok(NULL, s); - if (token == NULL) - { - DIAG(F("Can't identify sender #1; command send on wrong topic")); - return; - // don't do anything as we wont know where to send the results - // normally the topicid shall be valid as we only have subscribed to that one and nothing else - // comes here; The only issue is when recieveing on the open csid channel ( which stays open in order to - // able to accept other connections ) - } - else - { - auto topicid = atoi(token); - // verify that there is a MQTT client with that topic id connected - bool isClient = false; - // check in the array of clients if we have one with the topicid - // start at 1 as 0 is not allocated as mqsocket - for (int i = 1; i <= mqtt->getClientSize(); i++) - { - if (clients[i].topic == topicid) - { - isClient = true; - mqsocket = i; - break; - } - } - if (!isClient) - { - // no such client connected - DIAG(F("Can't identify sender #2; command send on wrong topic")); - return; - } - } - // if we make it until here we dont even need to test the last "cmd" element from the topic as there is no - // subscription for anything else - - // DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid); - - // Prepare the DCC-EX command - auto pool = mqtt->getPool(); // message pool - auto q = mqtt->getIncomming(); // incomming queue - - csmsg_t tm; // topic message - - if (length + 1 > MAXPAYLOAD) - { - DIAG(F("MQTT Command too long (> %d characters)"), MAXPAYLOAD); - } - strlcpy(tm.cmd, (char *)payload, length + 1); // message payload - tm.mqsocket = mqsocket; // on which socket did we recieve the mq message - int idx = pool->setItem(tm); // Add the recieved command to the pool - - if (idx == -1) - { - DIAG(F("MQTT Command pool full. Could not handle recieved command.")); - return; - } - - q->push(idx); // Add the index of the pool item to the incomming queue - - DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd); - - break; - } - case 'm': - { - switch (payload[1]) - { - case 'i': - { - - char buffer[30]; - char *tmp = (char *)payload + 3; - strlcpy(buffer, tmp, length); - buffer[length - 4] = '\0'; - - // DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length); - - auto distantid = strtol(buffer, NULL, 10); - - if (errno == ERANGE || distantid > UCHAR_MAX) - { - DIAG(F("Invalid Handshake ID; must be between 0 and 255")); - return; - } - if (distantid == 0) - { - DIAG(F("Invalid Handshake ID")); - return; - } - // --------------------------- - // Create a new MQTT client - // --------------------------- - - // check in the clients if the distantid has been set already somewhere - // if so we either have a new one with the same id then we have a collision -> publish a collision - // or its the same i.e; the message comming back as we are subscribed -> stop here - - // All is ok so set up the channel; MQTT Ctrl command - - auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer - - if (subscriberid == 0) - { - DIAG(F("MQTT no more connections are available")); - return; - } - - auto topicid = cantorEncode((long)subscriberid, (long)distantid); - DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid); - // extract the number delivered from - // we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one - - // initalize the new mqtt client object - clients[subscriberid] = {(int)distantid, subscriberid, topicid, true}; - - // add/subcribe to the topic for listening on cmds recieved via the channel for the client with - // subscriberid as identifier - - char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTOPICLENGTH]; - mqtt->getSubscriberTopic(subscriberid, tbuffer); - auto ok = mqtt->subscribe(tbuffer); - DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK"); - - // send the topicid on which the CS will listen for commands to the MQTT client on the root topic - memset(buffer, 0, 30); - sprintf(buffer, "mc(%d,%ld)", (int)distantid, topicid); - DIAG(F("Publishing: [%s] to [%s]"), buffer, mqtt->getClientID()); - mqtt->publish(mqtt->getClientID(), buffer); - - // on the cs side all is set and we declare that the cs is open for business - clients[subscriberid].open = true; - - // we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there - // in the < case we should test that we got the command on the right topic ... - - DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer); - memset(buffer, 0, 30); - sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), topicid); - DIAG(F("MQTT CS is publishing return information to [%s]"), buffer); - memset(buffer, 0, 30); - sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), topicid); - DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer); - - return; - } - default: - { - // ignore - return; - } - } - } - default: - { - // invalid command - DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload); - break; - } - } -} - -/** - * @brief MQTT broker connection / reconnection - * - */ -void DccMQTT::connect() -{ - - int reconnectCount = 0; - connectID[0] = '\0'; - - // Build the connect ID : Prefix + clientID - if (broker->prefix != nullptr) - { - strcpy_P(connectID, (const char *)broker->prefix); - } - strcat(connectID, clientID); - - // Connect to the broker - DIAG(F("MQTT %s (re)connecting ..."), connectID); - while (!mqttClient.connected() && reconnectCount < MAXRECONNECT) - { - DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType); - switch (broker->cType) - { - // no uid no pwd - case 6: - case 1: - { // port(p), ip(i), domain(d), - if (mqttClient.connect(connectID)) - { - DIAG(F("MQTT Broker connected ...")); - mqState = CONNECTED; - } - else - { - DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient.state()); - reconnectCount++; - } - break; - } - // with uid passwd - case 5: - case 2: - { // port(p), ip(i), domain(d), user(uid), pwd(pass), - break; - } - // with uid, passwd & prefix - case 4: - case 3: - { // port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix) - // port(p), domain(d), user(uid), pwd(pass), prefix(pfix) - // mqttClient.connect(connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0)) - break; - } - } - if (reconnectCount == MAXRECONNECT) - { - DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT); - mqState = CONNECTION_FAILED; - } - } -} - -// for the time being only one topic at the root which os the unique clientID from the MCU -// QoS is 0 by default -boolean DccMQTT::subscribe(char *topic) -{ - return mqttClient.subscribe(topic); -} - -void DccMQTT::publish(char *topic, char *payload) -{ - mqttClient.publish(topic, payload); -} - -/** - * @brief Public part of the MQTT setup function. Will call the secondary private setup function following the broker - * configuration from config.h - * - */ -void DccMQTT::setup() -{ - // setup Ethnet connection first - byte mac[6]; - DCCTimer::getSimulatedMacAddress(mac); - -#ifdef IP_ADDRESS - Ethernet.begin(mac, IP_ADDRESS); -#else - if (Ethernet.begin(mac) == 0) - { - DIAG(F("Ethernet.begin FAILED")); - return; - } -#endif - DIAG(F("Ethernet.begin OK.")); - if (Ethernet.hardwareStatus() == EthernetNoHardware) - { - DIAG(F("Ethernet shield not found")); - return; - } - if (Ethernet.linkStatus() == LinkOFF) - { - DIAG(F("Ethernet cable not connected")); - return; - } - - IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address - - DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]); - DIAG(F("Port:%d"), IP_PORT); - - // setup the MQBroker - setup(CSMQTTBROKER); -} - -/** - * @brief Private part of the MQTT setup function. Realizes all required actions for establishing the MQTT connection. - * - * @param id Name provided to the broker configuration - * @param b MQTT broker object containing the main configuration parameters - */ -void DccMQTT::setup(const FSH *id, MQTTBroker *b) -{ - - //Create the MQTT environment and establish inital connection to the Broker - broker = b; - - DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port); - - - byte mqbid[CLIENTIDSIZE] = {0}; - DCCTimer::getSimulatedMacAddress(mqbid); - - // initalize MQ Broker - - mqttClient = PubSubClient(broker->ip, broker->port, mqttCallback, ethClient); - DIAG(F("MQTT Client created ok...")); - - array_to_string(mqbid, CLIENTIDSIZE, clientID); - DIAG(F("MQTT Client ID : %s"), clientID); - - connect(); // inital connection as well as reconnects - auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions - - DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed"); - outboundRing = new RingStream(OUT_BOUND_SIZE); -} - -void DccMQTT::loop() -{ - // Connection impossible so just don't do anything - if (mqState == CONNECTION_FAILED) - { - return; - } - if (!mqttClient.connected()) - { - connect(); - } - if (!mqttClient.loop()) - { - DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state()); - }; - - // read incomming queue for processing; one per loop - bool state; - DIAG(F("in.count: %d"), in.count()); - if (in.count() > 0) - { - auto idx = in.peek(); - auto c = pool.getItem(in.pop(), &state); - DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket); - DIAG(F("Ring free space1: %d"), outboundRing->freeSpace()); - outboundRing->mark((uint8_t)c->mqsocket); - CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing); - // StringFormatter::send(outboundRing, F("Test result message")); - outboundRing->commit(); - DIAG(F("Ring free space2: %d"), outboundRing->freeSpace()); - pool.returnItem(idx); - } - - // handle at most 1 outbound transmission - int socketOut = outboundRing->read(); - DIAG(F("socketOut: %d"), socketOut); - if (socketOut > 0) // mqsocket / clientid can't be 0 .... - { - int count = outboundRing->count(); - buffer[0] = '\0'; - sprintf(buffer, "%s/%d/result", clientID, (int)clients[socketOut].topic); - DIAG(F("MQTT publish to mqsocket=%d, count=:%d on topic %s"), socketOut, count, buffer); - // construct the payload - char payload[count]; - payload[count] = '\0'; - char *tmp = payload; - for (; count > 0; count--) - { - *tmp = (char)outboundRing->read(); - tmp++; - } - // DIAG(F("Ring free space4: %d"),outboundRing->freeSpace()); - DIAG(F("MQTT publish with payload:\n%s"), payload); - // mqtt->publish(buffer, payload); - } -} \ No newline at end of file diff --git a/DccMQTT.h b/DccMQTT.h deleted file mode 100644 index 8600877..0000000 --- a/DccMQTT.h +++ /dev/null @@ -1,275 +0,0 @@ -#ifndef _DccMQTT_h_ -#define _DccMQTT_h_ - -#if __has_include("config.h") -#include "config.h" -#else -#warning config.h not found. Using defaults from config.example.h -#include "config.example.h" -#endif -#include "defines.h" - -#include -#include -#include -#include -#include -#include -#include - -#define MAXPAYLOAD 64 -#define MAXDOMAINLENGTH 32 - -#define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked -#define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ? -#define MAXTSTR 30 //!< max length of a topic string -#define MAXCONNECTID 40 // Broker connection id length incl possible prefixes -#define CLIENTIDSIZE 6 // -#define MAXRECONNECT 5 // reconnection tries before final failure -#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers -#define MAXTOPICLENGTH 20 - -// Define Broker configurations; Values are provided in the following order -// MQTT_BROKER_PORT 9883 -// MQTT_BROKER_DOMAIN "dcclms.modelrailroad.ovh" -// MQTT_BROKER_ADDRESS 51, 210, 151, 143 -// MQTT_BROKER_USER "dcccs" -// MQTT_BROKER_PASSWD "dcccs$3020" -// MQTT_BROKER_CLIENTID_PREFIX "dcc$lms-" -struct MQTTBroker -{ - int port; - IPAddress ip; - const FSH *domain = nullptr; - const FSH *user = nullptr; - const FSH *pwd = nullptr; - const FSH *prefix = nullptr; - byte cType; // connection type to identify valid params - - IPAddress resovleBroker(const FSH *d){ - DNSClient dns; - IPAddress bip; - - char domain[MAXDOMAINLENGTH]; - strcpy_P(domain, (const char *)d); - - dns.begin(Ethernet.dnsServerIP()); - if (dns.getHostByName(domain, bip) == 1) - { - DIAG(F("MQTT Broker/ %s = %d.%d.%d.%d"), domain, bip[0], bip[1],bip[2],bip[3]); - } - else - { - DIAG(F("MQTT Dns lookup for %s failed"), domain); - } - return bip; - } - - MQTTBroker(int p, IPAddress i, const FSH *d) : port(p), ip(i), domain(d), cType(1) {}; - MQTTBroker(int p, IPAddress i, const FSH *d, const FSH *uid, const FSH *pass) : port(p), ip(i), domain(d), user(uid), pwd(pass), cType(2){}; - MQTTBroker(int p, IPAddress i, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix), cType(3){}; - MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), domain(d), user(uid), pwd(pass), prefix(pfix), cType(4) - { - ip = resovleBroker(d); - }; - MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass) : port(p), domain(d), user(uid), pwd(pass), cType(5) - { - ip = resovleBroker(d); - }; - MQTTBroker(int p, const FSH *d) : port(p), domain(d), cType(6) - { - ip = resovleBroker(d); - }; -}; - -/** - * @brief dcc-ex command as recieved via MQ - * - */ -typedef struct csmsg_t { - char cmd[MAXPAYLOAD]; // recieved command message - byte mqsocket; // from which mqsocket / subscriberid -} csmsg_t; - -typedef struct csmqttclient_t { - int distant; // random int number recieved from the subscriber - byte mqsocket; // mqtt socket = subscriberid provided by the cs - long topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands - bool open; // true as soon as we have send the id to the mq broker for the client to pickup -} csmqttclient_t; - -enum DccMQTTState -{ - INIT, - CONFIGURED, // server/client objects set - CONNECTED, // mqtt broker is connected - CONNECTION_FAILED // Impossible to get the connection set after MAXRECONNECT tries -}; - -class DccMQTT -{ -private: -// Methods - DccMQTT() = default; - DccMQTT(const DccMQTT &); // non construction-copyable - DccMQTT &operator=(const DccMQTT &); // non copyable - -void setup(const FSH *id, MQTTBroker *broker); -void connect(); // (re)connects to the broker -// Members -static DccMQTT singleton; - EthernetClient ethClient; // TCP Client object for the MQ Connection - IPAddress server; // MQTT server object - PubSubClient mqttClient; // PubSub Endpoint for data exchange - MQTTBroker *broker; // Broker configuration object as set in config.h - - ObjectPool pool; // Pool of commands recieved for the CS - Queue in; // Queue of indexes into the pool according to incomming cmds - - char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID - csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients - char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker - uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection - - DccMQTTState mqState = INIT; - RingStream *outboundRing; - char buffer[MAXTMSG]; // temp buffer for manipulating strings / messages - -public: - static DccMQTT *get() noexcept - { - return &singleton; - } - - boolean subscribe(char *topic); - void publish(char *topic, char* payload); - - bool isConfigured() { return mqState == CONFIGURED; }; - bool isConnected() { return mqState == CONNECTED; }; - void setState(DccMQTTState s) { mqState = s; }; - - ObjectPool *getPool() { return &pool; }; - Queue *getIncomming() { return ∈ }; - - char *getClientID() { return clientID; }; - uint8_t getClientSize() { return subscriberid; } - - // initalized to 0 so that the first id comming back is 1 - // index 0 in the clients array is not used therefore - //! improvement here to be done to save some bytes - uint8_t obtainSubscriberID(){ - if ( subscriberid == MAXMQTTCONNECTIONS) { - return 0; // no more subscriber id available - } - return (++subscriberid); - } - - // this could be calculated once forever at each new connect and be stored - // but to save space we calculate it at each publish - - void getSubscriberTopic( uint8_t subscriberid, char *tbuffer ){ - sprintf(tbuffer, "%s/%ld/cmd", clientID, (long) clients[subscriberid].topic); - } - - csmqttclient_t *getClients() { return clients; }; - - void setup(); // called at setup in the main ino file - void loop(); - - ~DccMQTT() = default; -}; - -// /** -// * @brief MQTT broker configuration done in config.h -// */ - -// // Class for setting up the MQTT connection / topics / queues for processing commands and sendig back results - -// #define MAXDEVICEID 20 // maximum length of the unique id / device id -// #define MAXTOPICS 8 // command L,T,S,A plus response plus admin for inital handshake -// #define TCMDROOT "command/" // root of command topics -// #define TCMRESROOT "result/" // root of the result topic -// #define ADMROOT "admin/" // root of the admin topic where whe can do hanshakes for the inital setup -// ; // esp for sec reasons i.e. making sure we are talking to the right device and -// ; // not some one elses -// #define TELEMETRYROOT "telemetry/" // telemetry topic -// #define DIAGROOT "diag/" // diagnostics -// #define JRMIROOT "jrmi/" - -// #define NOOFDCCTOPICS 11 -// enum DccTopics { -// CMD_L, // L is Loco or Layout(power on/off) -// CMD_T, -// CMD_S, -// CMD_A, -// RESULT, -// ADMIN, -// TELEMETRY, -// DIAGNOSTIC, -// JRMI, -// INVALID_T -// }; - -// /** -// * @brief List of keywords used in the command protocol -// * -// */ -// #define MAX_KEYWORD_LENGTH 11 -// PROGMEM const char _kRead[] = {"read"}; -// PROGMEM const char _kWrite[] = {"write"}; -// PROGMEM const char _kPower[] = {"power"}; -// PROGMEM const char _kThrottle[] = {"throttle"}; -// PROGMEM const char _kFunction[] = {"function"}; -// PROGMEM const char _kCv[] = {"cv"}; -// PROGMEM const char _kSpeed[] = {"speed"}; -// PROGMEM const char _kLocomotive[] = {"locomotive"}; -// PROGMEM const char _kValue[] = {"value"}; -// PROGMEM const char _kDirection[] = {"direction"}; -// PROGMEM const char _kState[] = {"state"}; -// PROGMEM const char _kFn[] = {"fn"}; -// PROGMEM const char _kTrack[] = {"track"}; -// PROGMEM const char _kBit[] = {"bit"}; - -// /** -// * @brief The ingoin and outgoing queues can hold 20 messages each; this should be bigger than the number -// * of statically allocated pool items whose pointers are getting pushed into the queues. -// * -// */ -// #define MAXQUEUE 20 // MAX message queue length - -// class DccMQTT -// { -// private: - -// static char *deviceID; // Unique Device Identifier; based on the chip -// static Queue inComming; // incomming messages queue; the queue only contains indexes to the message pool -// static Queue outGoing; // outgoing messages queue; the queue only contains indexes to the message pool - -// public: -// static char **topics; // list of pub/sub topics -// static PubSubClient *mqClient; - -// static void setup(DCCEXParser p); // main entry to get things going -// static void loop(); // recieveing commands / processing commands / publish results -// static bool connected(); // true if the MQ client is connected - -// static char *getDeviceID(); -// static void setDeviceID(); -// static void subscribe(); // subscribes to all relevant topics -// static void subscribeT(char *topic);// subscribe to a particular topic for other than the std ones in subscribe (e.g. telemetry) -// static void publish(); // publishes a JSON message constructed from the outgoing queue (cid and result) -// static void printTopics(); // prints the list of subscribed topics - debug use -// static bool inIsEmpty(); // test if the incomming queue is empty -// static bool outIsEmpty(); // test if the outgoing queue is empty -// static void pushIn(uint8_t midx); // push a command struct into the incomming queue for processing -// static void pushOut(uint8_t midx); // push a command struct into the incomming queue for processing -// static uint8_t popOut(); // pop a command struct with the result to be published -// static uint8_t popIn(); // pop a command struct from the in comming queue for processing - -// static void pub_free_memory(int fm); - -// DccMQTT(); -// ~DccMQTT(); -// }; - -#endif \ No newline at end of file diff --git a/MQTTBrokers.h b/MQTTBrokers.h new file mode 100644 index 0000000..aa48960 --- /dev/null +++ b/MQTTBrokers.h @@ -0,0 +1,27 @@ +#ifndef _MQTTBrokers_h_ +#define _MQTTBrokers_h_ + +// Define Broker configurations; Values are provided in the following order +// MQTT_BROKER_PORT 9883 +// MQTT_BROKER_DOMAIN "dcclms.modelrailroad.ovh" +// MQTT_BROKER_ADDRESS 51, 210, 151, 143 +// MQTT_BROKER_USER "dcccs" +// MQTT_BROKER_PASSWD "dcccs$3020" +// MQTT_BROKER_CLIENTID_PREFIX "dcc$lms-" + +// Local server no user / pwd / prefix required +// EthernetShields / Arduino do not support securte transport i.e. on either port 443 or 8883 for MQTTS on most broker installations +// Once we support the ESP / Wifi as Transport medium we may get TLS capabilities for data in transit i.e. can use the 443/8883 ports +#define LOCAL_MQTT_BROKER F("LOCALMQ"), new MQTTBroker( 1883, {192, 168, 0, 51}, F("my.local.server")) +// Local server with user / pwd and no prefix +#define LOCAL_MQTT_USER_BROKER F("LOCALMQ"), new MQTTBroker( 1883, {192, 168, 0, 51}, F("my.local.server"), F("myuser"), F("mypassword")) +// Cloud server +#define DCCEX_MQTT_BROKER F("DCCEXMQ"), new MQTTBroker( 9883, {51, 210, 151, 143}, F("dcclms.modelrailroad.ovh"), F("dcccs"), F("dcccs$3020"), F("dcc$lms-")) +// Cloud server +#define DCCEX_MQTT_DOMAIN_BROKER F("DCCEXMQ"), new MQTTBroker( 9883, F("dcclms.modelrailroad.ovh"), F("dcccs"), F("dcccs$3020"), F("dcc$lms-")) +// Mosquitto test server +#define DCCEX_MOSQUITTO F("Mosquitto"), new MQTTBroker(1883, F("test.mosquitto.org")) +// HiveMQ test server +#define DCCEX_HIVEMQ F("HiveMQ"), new MQTTBroker(1883, F("broker.hivemq.com")) + +#endif \ No newline at end of file diff --git a/MQTTInterface.cpp b/MQTTInterface.cpp new file mode 100644 index 0000000..58cb8c0 --- /dev/null +++ b/MQTTInterface.cpp @@ -0,0 +1,529 @@ +#if __has_include("config.h") +#include "config.h" +#else +#warning config.h not found. Using defaults from config.example.h +#include "config.example.h" +#endif +#include "defines.h" + +#include "MQTTInterface.h" +#include "MQTTBrokers.h" +#include "DCCTimer.h" +#include +#include +#include + +MQTTInterface *MQTTInterface::singleton = NULL; + +long cantorEncode(long a, long b) +{ + return (((a + b) * (a + b + 1)) / 2) + b; +} + +void cantorDecode(int32_t c, int *a, int *b) +{ + + int w = floor((sqrt(8 * c + 1) - 1) / 2); + int t = (w * (w + 1)) / 2; + *b = c - t; + *a = w - *b; +} + +void MQTTInterface::setup() +{ + singleton = new MQTTInterface(); + + if (!singleton->connected) + { + singleton = NULL; + } + if (Diag::MQTT) + DIAG(F("MQTT Interface instance: [%x] - Setup done"), singleton); +}; + +MQTTInterface::MQTTInterface() +{ + + this->connected = this->setupNetwork(); + if (!this->connected) + { + DIAG(F("Network setup failed")); + } + else + { + this->setup(CSMQTTBROKER); + } + this->outboundRing = new RingStream(OUT_BOUND_SIZE); +}; + +/** + * @brief MQTT Interface callback recieving all incomming messages from the PubSubClient + * + * @param topic + * @param payload + * @param length + */ +void mqttCallback(char *topic, byte *payload, unsigned int length) +{ + MQTTInterface *mqtt = MQTTInterface::get(); + auto clients = mqtt->getClients(); + + errno = 0; + payload[length] = '\0'; // make sure we have the string terminator in place + if (Diag::MQTT) + DIAG(F("MQTT Callback:[%s] [%s] [%d] on interface [%x]"), topic, (char *)payload, length, mqtt); + switch (payload[0]) + { + case '<': + { + const char s[2] = "/"; // topic delimiter is / + char *token; + byte mqsocket; + + /* get the first token = ClientID */ + token = strtok(topic, s); + /* get the second token = topicID */ + token = strtok(NULL, s); + if (token == NULL) + { + DIAG(F("MQTT Can't identify sender #1; command send on wrong topic")); + return; + // don't do anything as we wont know where to send the results + // normally the topicid shall be valid as we only have subscribed to that one and nothing else + // comes here; The only issue is when recieveing on the open csid channel ( which stays open in order to + // able to accept other connections ) + } + else + { + auto topicid = atoi(token); + // verify that there is a MQTT client with that topic id connected + bool isClient = false; + // check in the array of clients if we have one with the topicid + // start at 1 as 0 is not allocated as mqsocket + for (int i = 1; i <= mqtt->getClientSize(); i++) + // for (int i = 1; i <= subscriberid; i++) + { + if (clients[i].topic == topicid) + { + isClient = true; + mqsocket = i; + break; + } + } + if (!isClient) + { + // no such client connected + DIAG(F("MQTT Can't identify sender #2; command send on wrong topic")); + return; + } + } + // if we make it until here we dont even need to test the last "cmd" element from the topic as there is no + // subscription for anything else + + // DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid); + + // Prepare the DCC-EX command + csmsg_t tm; // topic message + + if (length >= MAXPAYLOAD) + { + DIAG(F("MQTT Command too long (> [%d] characters)"), MAXPAYLOAD); + } + memset(tm.cmd, 0, MAXPAYLOAD); // Clean up the cmd buffer - should not be necessary + strlcpy(tm.cmd, (char *)payload, length + 1); // Message payload + tm.mqsocket = mqsocket; // On which socket did we recieve the mq message + int idx = mqtt->getPool()->setItem(tm); // Add the recieved command to the pool + + if (idx == -1) + { + DIAG(F("MQTT Command pool full. Could not handle recieved command.")); + return; + } + + mqtt->getIncomming()->push(idx); // Add the index of the pool item to the incomming queue + + if (Diag::MQTT) + DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd); + break; + } + case 'm': + { + switch (payload[1]) + { + case 'i': + { + char buffer[MAXPAYLOAD]; + char *tmp = (char *)payload + 3; + strlcpy(buffer, tmp, length); + buffer[length - 4] = '\0'; + // DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length); + + auto distantid = strtol(buffer, NULL, 10); + + if (errno == ERANGE || distantid > UCHAR_MAX) + { + DIAG(F("MQTT Invalid Handshake ID; must be between 0 and 255")); + return; + } + if (distantid == 0) + { + DIAG(F("MQTT Invalid Handshake ID")); + return; + } + // --------------------------- + // Create a new MQTT client + // --------------------------- + + // check in the clients if the distantid has been set already somewhere + // if so we either have a new one with the same id then we have a collision -> publish a collision + // or its the same i.e; the message comming back as we are subscribed -> stop here + + // All is ok so set up the channel; MQTT Ctrl command + + auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer + + if (subscriberid == 0) + { + DIAG(F("MQTT no more connections are available")); + return; + } + + auto topicid = cantorEncode((long)subscriberid, (long)distantid); + DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid); + // extract the number delivered from + // we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one + + // initalize the new mqtt client object + clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available + + auto sq = mqtt->getSubscriptionQueue(); + sq->push(subscriberid); + + return; + } + default: + { + // ignore + return; + } + } + } + default: + { + // invalid command + DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload); + break; + } + } +} + +/** + * @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID + * + * @param array array containing bytes + * @param len length of the array + * @param buffer buffer to which the string will be written; make sure the buffer has appropriate length + */ +static void array_to_string(byte array[], unsigned int len, char buffer[]) +{ + for (unsigned int i = 0; i < len; i++) + { + byte nib1 = (array[i] >> 4) & 0x0F; + byte nib2 = (array[i] >> 0) & 0x0F; + buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA; + buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA; + } + buffer[len * 2] = '\0'; +} + +/** + * @brief Connect to the MQTT broker; Parameters for this function are defined in + * like the motoshield configurations there are mqtt broker configurations in config.h + * + * @param id Name provided to the broker configuration + * @param b MQTT broker object containing the main configuration parameters + */ +void MQTTInterface::setup(const FSH *id, MQTTBroker *b) +{ + + //Create the MQTT environment and establish inital connection to the Broker + broker = b; + + DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port); + + // initalize MQ Broker + mqttClient = new PubSubClient(broker->ip, broker->port, mqttCallback, ethClient); + + if (Diag::MQTT) + DIAG(F("MQTT Client created ok...")); + array_to_string(mac, CLIENTIDSIZE, clientID); + DIAG(F("MQTT Client ID : %s"), clientID); + connect(); // inital connection as well as reconnects +} + +/** + * @brief MQTT broker connection / reconnection + * + */ +void MQTTInterface::connect() +{ + + int reconnectCount = 0; + connectID[0] = '\0'; + + // Build the connect ID : Prefix + clientID + if (broker->prefix != nullptr) + { + strcpy_P(connectID, (const char *)broker->prefix); + } + strcat(connectID, clientID); + + // Connect to the broker + DIAG(F("MQTT %s (re)connecting ..."), connectID); + while (!mqttClient->connected() && reconnectCount < MAXRECONNECT) + { + switch (broker->cType) + { + // no uid no pwd + case 6: + case 1: + { // port(p), ip(i), domain(d), + if (mqttClient->connect(connectID)) + { + DIAG(F("MQTT Broker connected ...")); + auto sub = subscribe(clientID); // set up the main subscription on which we will recieve the intal mi message from a subscriber + if (Diag::MQTT) + DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed"); + mqState = CONNECTED; + } + else + { + DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient->state()); + mqState = CONNECTION_FAILED; + reconnectCount++; + } + break; + } + // with uid passwd + case 5: + case 2: + { // port(p), ip(i), domain(d), user(uid), pwd(pass), + break; + } + // with uid, passwd & prefix + case 4: + case 3: + { // port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix) + // port(p), domain(d), user(uid), pwd(pass), prefix(pfix) + // mqttClient.connect(connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0)) + break; + } + } + if (reconnectCount == MAXRECONNECT) + { + DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT); + mqState = CONNECTION_FAILED; + } + } +} + +/** + * @brief for the time being only one topic at the root + * which is the unique clientID from the MCU + * QoS is 0 by default + * + * @param topic to subsribe to + * @return boolean true if successful false otherwise + */ +boolean MQTTInterface::subscribe(const char *topic) +{ + auto res = mqttClient->subscribe(topic); + return res; +} + +void MQTTInterface::publish(const char *topic, const char *payload) +{ + mqttClient->publish(topic, payload); +} + +/** + * @brief Connect the Ethernet network; + * + * @return true if connections was successful + */ +bool MQTTInterface::setupNetwork() +{ + + // setup Ethernet connection first + + DCCTimer::getSimulatedMacAddress(mac); + +#ifdef IP_ADDRESS + Ethernet.begin(mac, IP_ADDRESS); +#else + if (Ethernet.begin(mac) == 0) + { + DIAG(F("Ethernet.begin FAILED")); + return false; + } +#endif + DIAG(F("Ethernet.begin OK.")); + if (Ethernet.hardwareStatus() == EthernetNoHardware) + { + DIAG(F("Ethernet shield not found")); + return false; + } + if (Ethernet.linkStatus() == LinkOFF) + { + DIAG(F("Ethernet cable not connected")); + return false; + } + + IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address + DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]); + DIAG(F("Port:%d"), IP_PORT); + + return true; +} + +/** + * @brief handle the incomming queue in the loop + * + */ +void inLoop(Queue &in, ObjectPool &pool, RingStream *outboundRing) +{ + + bool state; + if (in.count() > 0) + { + // pop a command index from the incomming queue and get the command from the pool + int idx = in.pop(); + csmsg_t *c = pool.getItem(idx, &state); + + // execute the command and collect results + outboundRing->mark((uint8_t)c->mqsocket); + CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing); + outboundRing->commit(); + + // free the slot in the command pool + pool.returnItem(idx); + } +} + +/** + * @brief handle the outgoing messages in the loop + * + */ +void outLoop(PubSubClient *mq) +{ + // handle at most 1 outbound transmission + MQTTInterface *mqtt = MQTTInterface::get(); + + auto clients = mqtt->getClients(); + auto outboundRing = mqtt->getRingStream(); + + int mqSocket = outboundRing->read(); + if (mqSocket >= 0) // mqsocket / clientid can't be 0 .... + { + int count = outboundRing->count(); + char buffer[MAXTSTR]; + buffer[0] = '\0'; + sprintf(buffer, "%s/%d/result", mqtt->getClientID(), (int)clients[mqSocket].topic); + if (Diag::MQTT) + DIAG(F("MQTT publish to mqSocket=%d, count=:%d on topic %s"), mqSocket, count, buffer); + + if (mq->beginPublish(buffer, count, false)) + { + for (; count > 0; count--) + { + mq->write(outboundRing->read()); + } + } + else + { + DIAG(F("MQTT error start publishing result)")); + }; + + if (!mq->endPublish()) + { + DIAG(F("MQTT error finalizing published result)")); + }; + } +} + +/** + * @brief check if there are new subscribers connected and create the channels + * + * @param sq if the callback captured a client there will be an entry in the sq with the subscriber number + * @param clients the clients array where we find the info to setup the subsciptions and print out the publish topics for info + */ +void checkSubscribers(Queue &sq, csmqttclient_t *clients) +{ + MQTTInterface *mqtt = MQTTInterface::get(); + if (sq.count() > 0) + { + // new subscriber + auto s = sq.pop(); + + char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTSTR]; + sprintf(tbuffer, "%s/%ld/cmd", mqtt->getClientID(), clients[s].topic); + + auto ok = mqtt->subscribe(tbuffer); + + if (Diag::MQTT) + DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK"); + + // send the topicid on which the CS will listen for commands to the MQTT client on the root topic + char buffer[30]; + memset(buffer, 0, 30); + sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic); + if (Diag::MQTT) + DIAG(F("MQTT Publishing: [%s] to [%s]"), buffer, mqtt->getClientID()); + mqtt->publish(mqtt->getClientID(), buffer); + + // on the cs side all is set and we declare that the cs is open for business + clients[s].open = true; + + // we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there + // in the < case we should test that we got the command on the right topic ... + DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer); + memset(buffer, 0, 30); + sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), clients[s].topic); + DIAG(F("MQTT CS is publishing return information to [%s]"), buffer); + memset(buffer, 0, 30); + sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), clients[s].topic); + DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer); + } +} + +void MQTTInterface::loop() +{ + + if (!singleton) + return; + singleton->loop2(); +} + +void MQTTInterface::loop2() +{ + // Connection impossible so just don't do anything + if (singleton->mqState == CONNECTION_FAILED) + { + DIAG(F("MQTT connection failed...")); + return; + } + if (!mqttClient->connected()) + { + DIAG(F("MQTT no connection trying to reconnect ...")); + connect(); + } + if (!mqttClient->loop()) + { + DIAG(F("mqttClient returned with error; state: %d"), mqttClient->state()); + return; + }; + + checkSubscribers(subscriberQueue, clients); + inLoop(in, pool, outboundRing); + outLoop(mqttClient); +} \ No newline at end of file diff --git a/MQTTInterface.h b/MQTTInterface.h new file mode 100644 index 0000000..28bc6f5 --- /dev/null +++ b/MQTTInterface.h @@ -0,0 +1,186 @@ +#ifndef _MQTTInterface_h_ +#define _MQTTInterface_h_ + +#if __has_include("config.h") +#include "config.h" +#else +#warning config.h not found. Using defaults from config.example.h +#include "config.example.h" +#endif +#include "defines.h" + +#include +#include +#include +#include +#include +#include +#include + +#define MAXPAYLOAD 64 // max length of a payload recieved +#define MAXDOMAINLENGTH 32 // domain name length for the broker e.g. test.mosquitto.org + +#define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked +#define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ? +#define MAXTSTR 30 //!< max length of a topic string +#define MAXCONNECTID 40 // broker connection id length incl possible prefixes +#define CLIENTIDSIZE 6 // max length of the clientid used for connection to the broker +#define MAXRECONNECT 5 // reconnection tries before final failure +#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers +#define OUT_BOUND_SIZE 256 // Size of the RingStream used to provide results from the parser and publish +#define MAX_POOL_SIZE 32 // recieved command store size + +// Define Broker configurations; Values are provided in the following order +// MQTT_BROKER_PORT 9883 +// MQTT_BROKER_DOMAIN "dcclms.modelrailroad.ovh" +// MQTT_BROKER_ADDRESS 51, 210, 151, 143 +// MQTT_BROKER_USER "dcccs" +// MQTT_BROKER_PASSWD "dcccs$3020" +// MQTT_BROKER_CLIENTID_PREFIX "dcc$lms-" +struct MQTTBroker +{ + int port; + IPAddress ip; + const FSH *domain = nullptr; + const FSH *user = nullptr; + const FSH *pwd = nullptr; + const FSH *prefix = nullptr; + byte cType; // connection type to identify valid params + + IPAddress resovleBroker(const FSH *d) + { + DNSClient dns; + IPAddress bip; + + char domain[MAXDOMAINLENGTH]; + strcpy_P(domain, (const char *)d); + + dns.begin(Ethernet.dnsServerIP()); + if (dns.getHostByName(domain, bip) == 1) + { + DIAG(F("MQTT Broker/ %s = %d.%d.%d.%d"), domain, bip[0], bip[1], bip[2], bip[3]); + } + else + { + DIAG(F("MQTT Dns lookup for %s failed"), domain); + } + return bip; + } + + MQTTBroker(int p, IPAddress i, const FSH *d) : port(p), ip(i), domain(d), cType(1){}; + MQTTBroker(int p, IPAddress i, const FSH *d, const FSH *uid, const FSH *pass) : port(p), ip(i), domain(d), user(uid), pwd(pass), cType(2){}; + MQTTBroker(int p, IPAddress i, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix), cType(3){}; + MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), domain(d), user(uid), pwd(pass), prefix(pfix), cType(4) + { + ip = resovleBroker(d); + }; + MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass) : port(p), domain(d), user(uid), pwd(pass), cType(5) + { + ip = resovleBroker(d); + }; + MQTTBroker(int p, const FSH *d) : port(p), domain(d), cType(6) + { + ip = resovleBroker(d); + }; +}; + +/** + * @brief dcc-ex command as recieved via MQ + * + */ +typedef struct csmsg_t +{ + char cmd[MAXPAYLOAD]; // recieved command message + byte mqsocket; // from which mqsocket / subscriberid +} csmsg_t; + +typedef struct csmqttclient_t +{ + int distant; // random int number recieved from the subscriber + byte mqsocket; // mqtt socket = subscriberid provided by the cs + long topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands + bool open; // true as soon as we have send the id to the mq broker for the client to pickup +} csmqttclient_t; + +enum MQTTInterfaceState +{ + INIT, + CONFIGURED, // server/client objects set + CONNECTED, // mqtt broker is connected + CONNECTION_FAILED // Impossible to get the connection set after MAXRECONNECT tries +}; + +class MQTTInterface +{ +private: + // Methods + MQTTInterface(); + MQTTInterface(const MQTTInterface &); // non construction-copyable + MQTTInterface &operator=(const MQTTInterface &); // non copyable + + void setup(const FSH *id, MQTTBroker *broker); // instantiates the broker + void connect(); // (re)connects to the broker + bool setupNetwork(); // sets up the network connection for the PubSub system + void loop2(); + + // Members + static MQTTInterface *singleton; // unique instance of the MQTTInterface object + EthernetClient ethClient; // TCP Client object for the MQ Connection + byte mac[6]; // simulated mac address + IPAddress server; // MQTT server object + MQTTBroker *broker; // Broker configuration object as set in config.h + + ObjectPool pool; // Pool of commands recieved for the CS + Queue in; // Queue of indexes into the pool according to incomming cmds + Queue subscriberQueue; // Queue for incomming subscribers; push the subscriber into the queue for setup in a loop cycle + + char clientID[(CLIENTIDSIZE * 2) + 1]; // unique ID of the commandstation; not to confused with the connectionID + csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients + char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker + uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection + + bool connected = false; // set to true if the ethernet connection is available + MQTTInterfaceState mqState = INIT; // Status of the MQBroker connection + RingStream *outboundRing; // Buffer for collecting the results from the command parser + PubSubClient *mqttClient; // PubSub Endpoint for data exchange + +public: + static MQTTInterface *get() noexcept + { + return singleton; + } + + boolean subscribe(const char *topic); + + void publish(const char *topic, const char *payload); + + ObjectPool *getPool() { return &pool; }; + Queue *getIncomming() { return ∈ }; + Queue *getSubscriptionQueue() { return &subscriberQueue; }; + + char *getClientID() { return clientID; }; + uint8_t getClientSize() { return subscriberid; } + + // initalized to 0 so that the first id comming back is 1 + // index 0 in the clients array is not used therefore + //! improvement here to be done to save some bytes + + uint8_t obtainSubscriberID() + { + if (subscriberid == MAXMQTTCONNECTIONS) + { + return 0; // no more subscriber id available + } + return (++subscriberid); + } + + csmqttclient_t *getClients() { return clients; }; + RingStream *getRingStream() { return outboundRing; }; // debug only + + static void setup(); + static void loop(); + + ~MQTTInterface() = default; +}; + +#endif \ No newline at end of file diff --git a/MemStream.cpp b/MemStream.cpp new file mode 100644 index 0000000..ca443e2 --- /dev/null +++ b/MemStream.cpp @@ -0,0 +1,98 @@ +/* + + (c) 2015 Ingo Fischer + buffer serial device + based on Arduino SoftwareSerial + + Constructor warning messages fixed by Chris Harlow. + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*/ + +#include "MemStream.h" + +MemStream::MemStream(uint8_t *buffer, const uint16_t len, uint16_t content_len, bool allowWrite) +:_buffer(buffer),_len(len), _buffer_overflow(false), _pos_read(0), _allowWrite(allowWrite) +{ + if (content_len==0) memset(_buffer, 0, _len); + _pos_write=(content_len>len)? len: content_len; +} + +size_t MemStream::write(uint8_t byte) { + if (! _allowWrite) return -1; + if (_pos_write >= _len) { + _buffer_overflow = true; + return 0; + } + _buffer[_pos_write] = byte; + ++_pos_write; + return 1; +} + +void MemStream::flush() { + memset(_buffer, 0, _len); + _pos_write = 0; + _pos_read = 0; +} + +int MemStream::read() { + if (_pos_read >= _len) { + _buffer_overflow = true; + return -1; + } + if (_pos_read >= _pos_write) { + return -1; + } + return _buffer[_pos_read++]; +} + +int MemStream::peek() { + if (_pos_read >= _len) { + _buffer_overflow = true; + return -1; + } + if (_pos_read >= _pos_write) { + return -1; + } + return _buffer[_pos_read+1]; +} + +int MemStream::available() { + int ret=_pos_write-_pos_read; + if (ret<0) ret=0; + return ret; +} + +void MemStream::setBufferContent(uint8_t *buffer, uint16_t content_len) { + memset(_buffer, 0, _len); + memcpy(_buffer, buffer, content_len); + _buffer_overflow=false; + _pos_write=content_len; + _pos_read=0; +} + +void MemStream::setBufferContentFromProgmem(uint8_t *buffer, uint16_t content_len) { + memset(_buffer, 0, _len); + memcpy_P(_buffer, buffer, content_len); + _buffer_overflow=false; + _pos_write=content_len; + _pos_read=0; +} + +void MemStream::setBufferContentPosition(uint16_t read_pos, uint16_t write_pos) { + _pos_write=write_pos; + _pos_read=read_pos; +} \ No newline at end of file diff --git a/MemStream.h b/MemStream.h new file mode 100644 index 0000000..4c5c154 --- /dev/null +++ b/MemStream.h @@ -0,0 +1,78 @@ +/* + + (c) 2015 Ingo FIscher + buffer serial device + based on Arduino SoftwareSerial + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*/ + +#ifndef MemStream_h +#define MemStream_h + +#include +#if defined(ARDUINO_ARCH_MEGAAVR) +#include +#else +#include +#endif + +#include + +class MemStream : public Stream +{ +private: + uint8_t *_buffer; + const uint16_t _len; + bool _buffer_overflow; + uint16_t _pos_read; + uint16_t _pos_write; + bool _allowWrite; + +public: + // public methods + MemStream(uint8_t *buffer, const uint16_t len, uint16_t content_len = 0, bool allowWrite = true); + ~MemStream() {} + + operator const uint8_t *() const { return _buffer; } + operator const char *() const { return (const char *)_buffer; } + + uint16_t current_length() const { return _pos_write; } + + bool listen() { return true; } + void end() {} + bool isListening() { return true; } + bool overflow() + { + bool ret = _buffer_overflow; + _buffer_overflow = false; + return ret; + } + int peek(); + + virtual size_t write(uint8_t byte); + virtual int read(); + virtual int available(); + virtual void flush(); + + void setBufferContent(uint8_t *buffer, uint16_t content_len); + void setBufferContentFromProgmem(uint8_t *buffer, uint16_t content_len); + void setBufferContentPosition(uint16_t read_pos, uint16_t write_pos); + + using Print::write; +}; + +#endif \ No newline at end of file diff --git a/RingStream.cpp b/RingStream.cpp index 2c28d1b..8a59ceb 100644 --- a/RingStream.cpp +++ b/RingStream.cpp @@ -103,3 +103,43 @@ bool RingStream::commit() { _buffer[_mark]=lowByte(_count); return true; // commit worked } + +// grbba to be removed +// print the buffer one line for 10 chars in the array +// void RingStream::printBuffer() { +// int j = 0; +// for ( int k = 0; k < _len; k++ ) { +// if ( j == 10) { +// j = 0; +// Serial.println(); +// } +// j++; +// Serial.print((char) _buffer[k]); +// Serial.print(" "); +// } +// } + +// void RingStream::printInfo() { +// Serial.print("_len: "); Serial.println(_len); +// Serial.print("_pos_write: "); Serial.println(_pos_write); +// Serial.print("_pos_read: "); Serial.println(_pos_read); +// Serial.print("_overflow: "); Serial.println(_overflow); +// Serial.print("_mark: "); Serial.println(_mark); +// Serial.print("_count: ");Serial.println(_count); + +// } + +// void RingStream::reset(const uint16_t len) +// { +// _len=len; +// memset(_buffer,0,len); +// // _buffer=new byte[len]; +// _pos_write=0; +// _pos_read=0; +// _buffer[0]=0; +// _overflow=false; +// _mark=0; +// _count=0; +// } + +// grbba to be removed \ No newline at end of file diff --git a/RingStream.h b/RingStream.h index 790c66e..40c205c 100644 --- a/RingStream.h +++ b/RingStream.h @@ -21,10 +21,12 @@ #include +// template class RingStream : public Print { public: RingStream( const uint16_t len); + ~RingStream() = default; virtual size_t write(uint8_t b); using Print::write; @@ -34,15 +36,27 @@ class RingStream : public Print { void mark(uint8_t b); bool commit(); uint8_t peekTargetMark(); - + + int size() {return _len;} + byte *getBuffer() { return _buffer; } +// grbba to be removed + // void printBuffer(); + // void printInfo(); + // void reset(const uint16_t len); +// grbba to be removed + + int getLen() { return _len; }; + private: int _len; - int _pos_write; + // int _len = S; + int _pos_write ; int _pos_read; bool _overflow; int _mark; int _count; - byte * _buffer; + // byte _buffer[S]; + byte *_buffer; }; #endif diff --git a/config.example.h b/config.example.h index 396d3dd..6097acd 100644 --- a/config.example.h +++ b/config.example.h @@ -92,6 +92,22 @@ The configuration file for DCC-EX Command Station // //#define IP_ADDRESS { 192, 168, 1, 200 } +// +// ENABLE_MQTT: if set to true you have to have an Arduino Ethernet card (wired). This +// is not for Wifi. You will need the Arduino Ethernet library as well as the PubSub +// library from or get via the libray manager either from the IDE +// or PIO + + +// #define ENABLE_MQTT true +// Set the used broker to one of the configurations from MQTTBrokers.h where some +// public freely avaiable brokers are configured + +#define CSMQTTBROKER DCCEX_MOSQUITTO + +// Example for configuring your own MQTT broker + + ///////////////////////////////////////////////////////////////////////////////////// //