diff --git a/MQTTInterface.cpp b/MQTTInterface.cpp index 58cb8c0..1f48ae7 100644 --- a/MQTTInterface.cpp +++ b/MQTTInterface.cpp @@ -74,7 +74,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) DIAG(F("MQTT Callback:[%s] [%s] [%d] on interface [%x]"), topic, (char *)payload, length, mqtt); switch (payload[0]) { - case '<': + case '<': // Recieved a DCC-EX Command { const char s[2] = "/"; // topic delimiter is / char *token; @@ -120,8 +120,6 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) // if we make it until here we dont even need to test the last "cmd" element from the topic as there is no // subscription for anything else - // DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid); - // Prepare the DCC-EX command csmsg_t tm; // topic message @@ -146,16 +144,17 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd); break; } - case 'm': + case 'm': // Recieved an MQTT Connection management message { switch (payload[1]) { - case 'i': + case 'i': // Inital handshake message to create the tunnel { char buffer[MAXPAYLOAD]; char *tmp = (char *)payload + 3; strlcpy(buffer, tmp, length); buffer[length - 4] = '\0'; + // DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length); auto distantid = strtol(buffer, NULL, 10); @@ -170,15 +169,8 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) DIAG(F("MQTT Invalid Handshake ID")); return; } - // --------------------------- + // Create a new MQTT client - // --------------------------- - - // check in the clients if the distantid has been set already somewhere - // if so we either have a new one with the same id then we have a collision -> publish a collision - // or its the same i.e; the message comming back as we are subscribed -> stop here - - // All is ok so set up the channel; MQTT Ctrl command auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer @@ -190,10 +182,8 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) auto topicid = cantorEncode((long)subscriberid, (long)distantid); DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid); - // extract the number delivered from - // we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one - // initalize the new mqtt client object + // extract the number delivered from & initalize the new mqtt client object clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available auto sq = mqtt->getSubscriptionQueue(); @@ -201,16 +191,15 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) return; } - default: + default: // Invalid message { // ignore return; } } } - default: - { - // invalid command + default: // invalid command / message + { DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload); break; } @@ -473,26 +462,33 @@ void checkSubscribers(Queue &sq, csmqttclient_t *clients) if (Diag::MQTT) DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK"); - // send the topicid on which the CS will listen for commands to the MQTT client on the root topic - char buffer[30]; - memset(buffer, 0, 30); - sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic); + // send the topic on which the CS will listen for commands and the ones on which it will publish for the connecting + // client to pickup. Once the connecting client has setup other topic setup messages on the main channel shall be + // ignored + // JSON message { init: channels: {result: , diag: }} + + char buffer[MAXPAYLOAD*2]; + memset(buffer, 0, MAXPAYLOAD*2); + + // sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic); + + sprintf(buffer, "{ \"init\": %d, \"subscribeto\": {\"result\": \"%s/%ld/result\" , \"diag\": \"%s/%ld/diag\" }, \"publishto\": {\"cmd\": \"%s/%ld/cmd\" } }", + (int)clients[s].distant, + mqtt->getClientID(), + clients[s].topic, + mqtt->getClientID(), + clients[s].topic, + mqtt->getClientID(), + clients[s].topic + ); + if (Diag::MQTT) - DIAG(F("MQTT Publishing: [%s] to [%s]"), buffer, mqtt->getClientID()); + DIAG(F("MQTT channel setup message: [%s]"), buffer); + mqtt->publish(mqtt->getClientID(), buffer); // on the cs side all is set and we declare that the cs is open for business clients[s].open = true; - - // we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there - // in the < case we should test that we got the command on the right topic ... - DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer); - memset(buffer, 0, 30); - sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), clients[s].topic); - DIAG(F("MQTT CS is publishing return information to [%s]"), buffer); - memset(buffer, 0, 30); - sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), clients[s].topic); - DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer); } }