diff --git a/MQTTInterface.cpp b/MQTTInterface.cpp index d5fed9c..f93546a 100644 --- a/MQTTInterface.cpp +++ b/MQTTInterface.cpp @@ -50,8 +50,39 @@ void cantorDecode(int32_t c, int *a, int *b) *a = w - *b; } +/** + * @brief callback used from DIAG to send diag messages to the broker / clients + * + * @param msg + * @param length + */ +void mqttDiag(const char *msg, const int length) +{ + + if (MQTTInterface::get()->getState() == CONNECTED) + { + // if not connected all goes only to Serial; + // if CONNECTED we have at least the root topic subscribed to + auto mqSocket = MQTTInterface::get()->getActive(); + char topic[MAXTSTR]; + memset(topic, 0, MAXTSTR); + + if (mqSocket == 0) + { // send to root topic of the commandstation as it doen't concern a specific client at this point + sprintf(topic, "%s", MQTTInterface::get()->getClientID()); + } + else + { + sprintf(topic, "%s/%ld/diag", MQTTInterface::get()->getClientID(), MQTTInterface::get()->getClients()[mqSocket].topic); + } + // Serial.print(" ---- MQTT pub to: "); Serial.print(topic); Serial.print(" Msg: "); Serial.print(msg); + MQTTInterface::get()->publish(topic, msg); + } +} + void MQTTInterface::setup() { + StringLogger::get().addDiagWriter(mqttDiag); singleton = new MQTTInterface(); if (!singleton->connected) @@ -97,6 +128,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) { case '<': // Recieved a DCC-EX Command { + if (payload[1] == '*') { return;} // it's a bounced diag message const char s[2] = "/"; // topic delimiter is / char *token; byte mqsocket; @@ -538,8 +570,22 @@ void MQTTInterface::loop() bool showonce = false; +auto s = millis(); +void loopPing(int interval) +{ + auto c = millis(); + if (c - s > 2000) + { + DIAG(F("loop alive")); // ping every 2 sec + s = c; + } +} + + void MQTTInterface::loop2() { + + loopPing(2000); // ping every 2 sec // Connection impossible so just don't do anything if (singleton->mqState == CONNECTION_FAILED) { diff --git a/MQTTInterface.h b/MQTTInterface.h index 4b980a1..e6f5db2 100644 --- a/MQTTInterface.h +++ b/MQTTInterface.h @@ -178,34 +178,33 @@ private: char clientID[(CLIENTIDSIZE * 2) + 1]; // unique ID of the commandstation; not to confused with the connectionID csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker - uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection + byte subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection + byte activeSubscriber = 0; // if its 0 no active Subscriber; set as soon as we recieve a command of go into processing on the CS bool connected = false; // set to true if the ethernet connection is available MQTTInterfaceState mqState = INIT; // Status of the MQBroker connection RingStream *outboundRing; // Buffer for collecting the results from the command parser PubSubClient *mqttClient; // PubSub Endpoint for data exchange + public: - static MQTTInterface *get() noexcept - { - return singleton; - } + static MQTTInterface *get() noexcept { return singleton;} - boolean subscribe(const char *topic); + boolean subscribe(const char *topic); + void publish(const char *topic, const char *payload); - void publish(const char *topic, const char *payload); - - ObjectPool *getPool() { return &pool; }; - Queue *getIncomming() { return ∈ }; - Queue *getSubscriptionQueue() { return &subscriberQueue; }; - - char *getClientID() { return clientID; }; - uint8_t getClientSize() { return subscriberid; } + ObjectPool *getPool() { return &pool; }; + Queue *getIncomming() { return ∈ }; + Queue *getSubscriptionQueue() { return &subscriberQueue; }; + MQTTInterfaceState getState() { return mqState; }; + byte getActive() { return activeSubscriber; }; + void setActive(byte mqSocket) { activeSubscriber = mqSocket; }; + char *getClientID() { return clientID; }; + uint8_t getClientSize() { return subscriberid; }; // initalized to 0 so that the first id comming back is 1 // index 0 in the clients array is not used therefore - //! improvement here to be done to save some bytes - + uint8_t obtainSubscriberID() { if (subscriberid == MAXMQTTCONNECTIONS) @@ -216,7 +215,7 @@ public: } csmqttclient_t *getClients() { return clients; }; - RingStream *getRingStream() { return outboundRing; }; // debug only + RingStream *getRingStream() { return outboundRing; }; static void setup(); static void loop(); @@ -224,4 +223,5 @@ public: ~MQTTInterface() = default; }; + #endif \ No newline at end of file