1
0
mirror of https://github.com/DCC-EX/CommandStation-EX.git synced 2024-11-23 08:06:13 +01:00

MQTT firt send/recv ok

This commit is contained in:
Gregor Baues 2021-05-01 12:38:27 +02:00
parent da85e4e245
commit 6bd9e28be4
4 changed files with 297 additions and 164 deletions

View File

@ -76,7 +76,7 @@ void setup()
#endif // ETHERNET_ON #endif // ETHERNET_ON
#if MQTT_ON #if MQTT_ON
DccMQTT::get()->setup(LOCAL_MQTT_BROKER); DccMQTT::get()->setup();
#endif #endif
@ -128,6 +128,10 @@ void loop()
#if ETHERNET_ON #if ETHERNET_ON
EthernetInterface::loop(); EthernetInterface::loop();
#endif #endif
#if MQTT_ON
DccMQTT::get()->loop();
#endif
#if defined(RMFT_ACTIVE) #if defined(RMFT_ACTIVE)
RMFT::loop(); RMFT::loop();

View File

@ -19,29 +19,25 @@
* GNU General Public License for more details <https://www.gnu.org/licenses/>. * GNU General Public License for more details <https://www.gnu.org/licenses/>.
*/ */
#if __has_include ( "config.h") #if __has_include("config.h")
#include "config.h" #include "config.h"
#else #else
#warning config.h not found. Using defaults from config.example.h #warning config.h not found. Using defaults from config.example.h
#include "config.example.h" #include "config.example.h"
#endif #endif
#include "defines.h" #include "defines.h"
#include <Arduino.h> #include <Arduino.h>
#include <avr/pgmspace.h>
#include <EthernetInterface.h> #include <EthernetInterface.h>
#include <PubSubClient.h> // Base (sync) MQTT library #include <PubSubClient.h> // Base (sync) MQTT library
#include <DIAG.h> #include <DIAG.h>
#include <Ethernet.h>
#include <Dns.h>
#include <DCCTimer.h>
#include <DccMQTT.h> #include <DccMQTT.h>
//---------
// Defines
//---------
#define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked
#define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ?
#define MAXTSTR 30 //!< max length of a topic string
//--------- //---------
// Variables // Variables
//--------- //---------
@ -52,99 +48,171 @@ char topicMessage[MAXTMSG];
DccMQTT DccMQTT::singleton; DccMQTT DccMQTT::singleton;
/**
* @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID
*
* @param array array containing bytes
* @param len length of the array
* @param buffer buffer to which the string will be written; make sure the buffer has appropriate length
*/
static void array_to_string(byte array[], unsigned int len, char buffer[])
{
for (unsigned int i = 0; i < len; i++)
{
byte nib1 = (array[i] >> 4) & 0x0F;
byte nib2 = (array[i] >> 0) & 0x0F;
buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA;
buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA;
}
buffer[len * 2] = '\0';
}
// callback when a message arrives from the broker; push cmd into the incommming queue
// callback when a message arrives from the broker
void mqttCallback(char *topic, byte *payload, unsigned int length) void mqttCallback(char *topic, byte *payload, unsigned int length)
{ {
topicName[0] = '\0'; topicName[0] = '\0';
topicMessage[0] = '\0'; topicMessage[0] = '\0';
strcpy(topicName, topic); strcpy(topicName, topic);
strlcpy(topicMessage, (char *)payload, length + 1); strlcpy(topicMessage, (char *)payload, length + 1);
Serial.println("some msg arrived");
DIAG(F("MQTT Message arrived [%s]: %s"), topicName, topicMessage); DIAG(F("MQTT Message arrived [%s]: %s"), topicName, topicMessage);
} }
/** /**
* @brief MQTT broker connection / reconnection * @brief MQTT broker connection / reconnection
* *
*/ */
static void reconnect() void DccMQTT::connect()
{ {
DIAG(F("MQTT (re)connecting ..."));
while (!mqttClient.connected())
{
DIAG(F("Attempting MQTT Broker connection..."));
// Attempt to connect
#ifdef CLOUDBROKER
char *connectID = new char[40];
char *connectID = new char[MAXCONNECTID];
connectID[0] = '\0'; connectID[0] = '\0';
strcat(connectID, MQTT_BROKER_CLIENTID_PREFIX);
strcat(connectID,DccMQTT::getDeviceID());
INFO(F("ConnectID: %s %s %s"), connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD); int reconnectCount = 0;
#ifdef MQTT_BROKER_USER
DBG(F("MQTT (re)connecting (Cloud/User) ...")); // if(broker->prefix != nullptr) {
if (mqttClient.connect(connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0)) // char tmp[20];
#else // strcpy_P(tmp, (const char *)broker->prefix);
DBG(F("MQTT (re)connecting (Cloud) ...")); // Serial.println(tmp);
if (mqttClient.connect(DccMQTT::getDeviceID())) // Serial.println(broker->prefix);
#endif // connectID[0] = '\0';
#else // strcat(connectID, tmp);
#ifdef MQTT_BROKER_USER // }
DBG(F("MQTT (re)connecting (Local/User) ..."));
if (mqttClient.connect(DccMQTT::getDeviceID(), MQTT_BROKER_USER, MQTT_BROKER_PASSWD)) strcat(connectID, clientID);
#else
DBG(F("MQTT (re)connecting (Local) ..."));
if (mqttClient.connect(DccMQTT::getDeviceID())) DIAG(F("MQTT %s (re)connecting ..."), connectID);
#endif // Build the connect ID : Prefix + clientID
#endif
while (!mqttClient.connected() && reconnectCount < MAXRECONNECT)
{ {
INFO(F("MQTT broker connected ...")); DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType);
// publish on the $connected topic switch (broker->cType)
DccMQTT::subscribe(); // required in case of a connection loss to do it again (this causes a mem leak !! of 200bytes each time!!) {
case 6:
case 1:
{ // port(p), ip(i), domain(d),
if (mqttClient.connect(clientID))
{
DIAG(F("MQTT broker connected ..."));
} }
else else
{ {
INFO(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient.state()); DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient.state());
reconnectCount++;
}
break;
}
case 2:
{ // port(p), ip(i), domain(d), user(uid), pwd(pass),
break;
}
case 3:
{ // port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix)
// mqttClient.connect(connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0))
break;
}
case 4:
{ // port(p), domain(d), user(uid), pwd(pass), prefix(pfix)
break;
}
case 5:
{ // port(p), domain(d), user(uid), pwd(pass)
break;
}
// case 6:
// { // port(p), domain(d)
// mqttClient.connect()
// break;
// }
}
if (reconnectCount == MAXRECONNECT) {
DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT);
} }
} }
} }
// for the time being only one topic at the root which os the unique clientID from the MCU
// QoS is 0 by default
boolean DccMQTT::subscribe() {
return mqttClient.subscribe(clientID);
}
/**
* @brief Public part of the MQTT setup function. Will call the secondary private setup function following the broker
* configuration from config.h
*
*/
void DccMQTT::setup() void DccMQTT::setup()
{ {
setup(CSMQTTBROKER);
}
/**
* @brief Private part of the MQTT setup function. Realizes all required actions for establishing the MQTT connection.
*
* @param id Name provided to the broker configuration
* @param b MQTT broker object containing the main configuration parameters
*/
void DccMQTT::setup(const FSH *id, MQTTBroker *b)
{
//Create the MQTT environment and establish inital connection to the Broker //Create the MQTT environment and establish inital connection to the Broker
broker = b;
// get a eth client session // get a eth client session
ethClient = EthernetInterface::get()->getServer()->available();
DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port);
// ethClient = EthernetInterface::get()->getServer()->available();
ethClient = EthernetInterface::get()->getServer()->accept();
// initalize MQ Broker // initalize MQ Broker
mqttClient = PubSubClient(ethClient);
mqttClient.setServer(IPAddress(MQTT_BROKER_ADDRESS), MQTT_BROKER_PORT);
DIAG(F("MQTT Client : Server ok ...")); mqttClient = PubSubClient(ethClient);
mqttClient.setServer(broker->ip, broker->port);
DIAG(F("MQTT Client : Server ok ...%x/%x"), ethClient, mqttClient);
mqttClient.setCallback(mqttCallback); // Initalize callback function for incomming messages mqttClient.setCallback(mqttCallback); // Initalize callback function for incomming messages
DIAG(F("MQTT Client : Callback set ...")); DIAG(F("MQTT Client : Callback set ..."));
// DccMQTT::setDeviceID(); // set the unique device ID to bu used for creating / listening to topic byte mqbid[CLIENTIDSIZE] = {0};
DCCTimer::getSimulatedMacAddress(mqbid);
array_to_string(mqbid, CLIENTIDSIZE, clientID);
DIAG(F("MQTT Client ID : %s"), clientID);
reconnect(); // inital connection as well as reconnects connect(); // inital connection as well as reconnects
// DccMQTT::subscribe(); // set up all subscriptionn auto sub = DccMQTT::subscribe(); // set up all subscriptions
DIAG(F("MQTT subscriptons done..."));
DIAG(F("MQTT subscriptons %s..."), sub ? "ok":"failed");
mqttClient.publish(clientID, "Hello from DccEX");
// sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID()); // sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID());
// mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic // mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic
// /** // /**
// * @todo set the connect status with a retained message on the $connected topic /admin/<csid>/$connected as used in the connect // * @todo set the connect status with a retained message on the $connected topic /admin/<csid>/$connected as used in the connect
// * // *
@ -154,6 +222,24 @@ void DccMQTT::setup()
} }
void DccMQTT::loop()
{
// DccTelemetry::deltaT(1);
if (!mqttClient.connected())
{
connect();
}
if (!mqttClient.loop())
{
DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state());
};
// DccMQTTProc::loop(); //!< give time to the command processor to handle msg ..
// DccMQTT::publish(); //!< publish waiting messages from the outgoing queue
// DccTelemetry::deltaT(1);
}
// #include <avr/pgmspace.h> // for PROGMEM use // #include <avr/pgmspace.h> // for PROGMEM use
// #include <Diag/DIAG.h> // Diagnostig output to the serial terminal // #include <Diag/DIAG.h> // Diagnostig output to the serial terminal
@ -1001,7 +1087,6 @@ void DccMQTT::setup()
// #define PUB_CSID_FMT "{\"csid\":\"%s\"}" // #define PUB_CSID_FMT "{\"csid\":\"%s\"}"
// PROGMEM const char csidfmt[] = {PUB_CSID_FMT}; // PROGMEM const char csidfmt[] = {PUB_CSID_FMT};
// void DccMQTT::setup(DCCEXParser p) // void DccMQTT::setup(DCCEXParser p)
// { // {
// char _csidMsg[64]{'\0'}; //!< string buffer for the serialized message to return // char _csidMsg[64]{'\0'}; //!< string buffer for the serialized message to return
@ -1021,7 +1106,6 @@ void DccMQTT::setup()
// sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID()); // sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID());
// mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic // mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic
// /** // /**
// * @todo set the connect status with a retained message on the $connected topic /admin/<csid>/$connected as used in the connect // * @todo set the connect status with a retained message on the $connected topic /admin/<csid>/$connected as used in the connect
// * // *

View File

@ -1,11 +1,11 @@
#ifndef _DccMQTT_h_ #ifndef _DccMQTT_h_
#define _DccMQTT_h_ #define _DccMQTT_h_
#if __has_include ( "config.h") #if __has_include("config.h")
#include "config.h" #include "config.h"
#else #else
#warning config.h not found. Using defaults from config.example.h #warning config.h not found. Using defaults from config.example.h
#include "config.example.h" #include "config.example.h"
#endif #endif
#include "defines.h" #include "defines.h"
@ -13,8 +13,17 @@
#include <DCCEXParser.h> #include <DCCEXParser.h>
#include <Queue.h> #include <Queue.h>
#include <Ethernet.h> #include <Ethernet.h>
#include <Dns.h>
#define MAXPAYLOAD 64 #define MAXPAYLOAD 64
#define MAXDOMAINLENGTH 32
#define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked
#define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ?
#define MAXTSTR 30 //!< max length of a topic string
#define MAXCONNECTID 40
#define CLIENTIDSIZE 6
#define MAXRECONNECT 5
// Define Broker configurations; Values are provided in the following order // Define Broker configurations; Values are provided in the following order
// MQTT_BROKER_PORT 9883 // MQTT_BROKER_PORT 9883
@ -23,51 +32,91 @@
// MQTT_BROKER_USER "dcccs" // MQTT_BROKER_USER "dcccs"
// MQTT_BROKER_PASSWD "dcccs$3020" // MQTT_BROKER_PASSWD "dcccs$3020"
// MQTT_BROKER_CLIENTID_PREFIX "dcc$lms-" // MQTT_BROKER_CLIENTID_PREFIX "dcc$lms-"
struct MQTTBroker
{
#define LOCAL_MQTT_BROKER F("LOCAL_MQTT_BROKER"), new MQTTBroker( 1883, {10, 0, 0, 2}, "my.local.server", MQ_UNUSED, MQ_UNUSED, MQ_UNUSED)
#define DCCEX_MQTT_BROKER F("DCCEX_MQTT_BROKER"), new MQTTBroker( 9883, {51, 210, 151, 143}, "dcclms.modelrailroad.ovh", "dcccs", "dcccs$3020", "dcc$lms-")
struct MQTTBroker {
int port; int port;
IPAddress ip;
const FSH *domain = nullptr;
const FSH *user = nullptr;
const FSH *pwd = nullptr;
const FSH *prefix = nullptr;
byte cType; // connection type to identify valid params
MQTTBroker(int p)(port(p)){}; IPAddress resovleBroker(const FSH *d){
DNSClient dns;
IPAddress bip;
char domain[MAXDOMAINLENGTH];
strcpy_P(domain, (const char *)d);
dns.begin(Ethernet.dnsServerIP());
if (dns.getHostByName(domain, bip) == 1)
{
DIAG(F("MQTT Broker/ %s = %d.%d.%d.%d"), domain, bip[0], bip[1],bip[2],bip[3]);
}
else
{
DIAG(F("MQTT Dns lookup for %s failed"), domain);
}
return bip;
}
MQTTBroker(int p, IPAddress i, const FSH *d) : port(p), ip(i), domain(d), cType(1) {};
MQTTBroker(int p, IPAddress i, const FSH *d, const FSH *uid, const FSH *pass) : port(p), ip(i), domain(d), user(uid), pwd(pass), cType(2){};
MQTTBroker(int p, IPAddress i, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix), cType(3){};
MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), domain(d), user(uid), pwd(pass), prefix(pfix), cType(4)
{
ip = resovleBroker(d);
};
MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass) : port(p), domain(d), user(uid), pwd(pass), cType(5)
{
ip = resovleBroker(d);
};
MQTTBroker(int p, const FSH *d) : port(p), domain(d), cType(6)
{
ip = resovleBroker(d);
};
}; };
struct DccMQTTMsg {
struct DccMQTTMsg
{
char payload[MAXPAYLOAD]; char payload[MAXPAYLOAD];
}; };
enum DccMQTTState
enum DccMQTTState { {
INIT, INIT,
CONFIGURED, // server/client objects set CONFIGURED, // server/client objects set
CONNECTED // mqtt broker is connected CONNECTED // mqtt broker is connected
}; };
class DccMQTT class DccMQTT
{ {
private: private:
static DccMQTT singleton; static DccMQTT singleton;
DccMQTT() = default; DccMQTT() = default;
DccMQTT(const DccMQTT&); // non construction-copyable DccMQTT(const DccMQTT &); // non construction-copyable
DccMQTT& operator=( const DccMQTT& ); // non copyable DccMQTT &operator=(const DccMQTT &); // non copyable
void setup(const FSH *id, MQTTBroker *broker);
void connect(); // (re)connects to the broker
boolean subscribe();
EthernetClient ethClient; // TCP Client object for the MQ Connection EthernetClient ethClient; // TCP Client object for the MQ Connection
IPAddress server; // MQTT server object IPAddress server; // MQTT server object
PubSubClient mqttClient; // PubSub Endpoint for data exchange PubSubClient mqttClient; // PubSub Endpoint for data exchange
MQTTBroker *broker; // Broker configuration object as set in config.h
// EthernetClient ethClient = ETHNetwork::getServer().available();
Queue<DccMQTTMsg> in; Queue<DccMQTTMsg> in;
Queue<DccMQTTMsg> out; Queue<DccMQTTMsg> out;
char clientID[(CLIENTIDSIZE*2)+1];
DccMQTTState mqState = INIT; DccMQTTState mqState = INIT;
public: public:
static DccMQTT *get() noexcept
static DccMQTT *get() noexcept { {
return &singleton; return &singleton;
} }
@ -81,14 +130,10 @@ public:
~DccMQTT() = default; ~DccMQTT() = default;
}; };
// /** // /**
// * @brief MQTT broker configuration done in config.h // * @brief MQTT broker configuration done in config.h
// */ // */
// // Class for setting up the MQTT connection / topics / queues for processing commands and sendig back results // // Class for setting up the MQTT connection / topics / queues for processing commands and sendig back results
// #define MAXDEVICEID 20 // maximum length of the unique id / device id // #define MAXDEVICEID 20 // maximum length of the unique id / device id
@ -178,5 +223,4 @@ public:
// ~DccMQTT(); // ~DccMQTT();
// }; // };
#endif #endif

View File

@ -35,6 +35,7 @@ bool Diag::WIFI=false;
bool Diag::WITHROTTLE=false; bool Diag::WITHROTTLE=false;
bool Diag::ETHERNET=false; bool Diag::ETHERNET=false;
bool Diag::LCN=false; bool Diag::LCN=false;
bool Diag::MQTT=false;
void StringFormatter::diag( const FSH* input...) { void StringFormatter::diag( const FSH* input...) {