diff --git a/DccMQTT.cpp b/DccMQTT.cpp index fd651c7..36559c9 100644 --- a/DccMQTT.cpp +++ b/DccMQTT.cpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -62,6 +63,9 @@ DccMQTT DccMQTT::singleton; auto mqtt = DccMQTT::get(); +// The RingBuffer size / no Withrottle over MQ so we don't need the huge buffer as for ethernet +#define OUT_BOUND_SIZE 256 + // pairing functions used for creating a client identifier // when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published // shall be a admin message with a random number @@ -71,7 +75,7 @@ auto mqtt = DccMQTT::get(); // during processing then send the replies to the topic from which the command was recieved. // Thus the main channel shall not be used for any p2p coms ev just for broadcast from the CS to the subscribed clients -int32_t cantorEncode(int a, int b) +long cantorEncode(long a, long b) { return (((a + b) * (a + b + 1)) / 2) + b; } @@ -109,28 +113,76 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) { errno = 0; payload[length] = '\0'; // make sure we have the string terminator in place - DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length); + + // DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length); switch (payload[0]) { case '<': { - // DCC-EX command - auto pool = mqtt->getPool(); - auto q = mqtt->getIncomming(); - csmsg_t tm; - strlcpy(tm.cmd, (char *)payload, length + 1); - // Add the recieved command to the pool - int idx = pool->setItem(tm); + const char s[2] = "/"; // topic delimiter is / + char *token; + byte mqsocket; + + /* get the first token = ClientID */ + token = strtok(topic, s); + /* get the second token = topicID */ + token = strtok(NULL, s); + if ( token == NULL ) { + DIAG(F("Can't identify sender #1; command send on wrong topic")); + return; + // don't do anything as we wont know where to send the results + // normally the topicid shall be valid as we only have subscribed to that one and nothing else + // comes here; The only issue is when recieveing on the open csid channel ( which stays open in order to + // able to accept other connections ) + } else { + auto topicid = atoi(token); + // verify that there is a MQTT client with that topic id connected + auto clients = mqtt->getClients(); + bool isClient = false; + // check in the array of clients if we have one with the topicid + // start at 1 as 0 is not allocated as mqsocket + for( int i = 1; i <= mqtt->getClientSize(); i++ ) { + if (clients[i].topic == topicid) { + isClient = true; + mqsocket = i; + break; + } + } + if(!isClient) { + // no such client connected + DIAG(F("Can't identify sender #2; command send on wrong topic")); + return; + } + + } + // if we make it until here we dont even need to test the last "cmd" element from the topic as there is no + // subscription for anything else + + // DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid); + + // Prepare the DCC-EX command + auto pool = mqtt->getPool(); // message pool + auto q = mqtt->getIncomming(); // incomming queue + + csmsg_t tm; // topic message + + + strlcpy(tm.cmd, (char *)payload, length + 1); // message payload + tm.mqsocket = mqsocket; // on which socket did we recieve the mq message + int idx = pool->setItem(tm); // Add the recieved command to the pool + if (idx == -1) { DIAG(F("MQTT Command pool full. Could not handle recieved command.")); return; } - // Add the index of the pool item to the incomming queue - q->push(idx); + + q->push(idx); // Add the index of the pool item to the incomming queue + DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd); + break; } case 'm': @@ -143,13 +195,11 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) char buffer[30]; memset(buffer, 0, 30); - - char *tmp = (char *)payload + 3; strlcpy(buffer, tmp, length); buffer[length - 4] = '\0'; - DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length); + // DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length); auto distantid = strtol(buffer, NULL, 10); @@ -163,12 +213,16 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) DIAG(F("Invalid Handshake ID")); return; } + // --------------------------- + // Create a new MQTT client + // --------------------------- // check in the clients if the distantid has been set already somewhere // if so we either have a new one with the same id then we have a collision -> publish a collision // or its the same i.e; the message comming back as we are subscribed -> stop here // All is ok so set up the channel; MQTT Ctrl command + auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer if(subscriberid == 0) { @@ -176,25 +230,46 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) return; } - auto topicid = cantorEncode(subscriberid, (int)distantid); - + // long a = subscriberid; + // long b = distantid; + // auto topicid = cantorEncode(a, b); + + auto topicid = cantorEncode((long)subscriberid, (long)distantid); DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid); // extract the number delivered from // we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one - - clients[subscriberid] = {(int)distantid, subscriberid, topicid, true}; // add subscribertopic + + // initalize the new mqtt client object + clients[subscriberid] = {(int)distantid, subscriberid, topicid, true}; + + // add/subcribe to the topic for listening on cmds recieved via the channel for the client with + // subscriberid as identifier char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTOPICLENGTH]; mqtt->getSubscriberTopic(subscriberid, tbuffer); auto ok = mqtt->subscribe(tbuffer); DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK"); + // send the topicid on which the CS will listen for commands to the MQTT client on the root topic memset(buffer, 0, 30); sprintf(buffer, "mc(%d,%ld)", (int)distantid, (long) topicid); DIAG(F("Publishing: [%s] to [%s]"), buffer, mqtt->getClientID()); mqtt->publish(mqtt->getClientID(), buffer); + + // on the cs side all is set and we declare that the cs is open for business clients[subscriberid].open = true; - // we are done + + // we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there + // in the < case we should test that we got the command on the right topic ... + + DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer); + memset(buffer, 0, 30); + sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), topicid); + DIAG(F("MQTT CS is publishing return information to [%s]"), buffer); + memset(buffer, 0, 30); + sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), topicid); + DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer); + return; } default: @@ -221,24 +296,19 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) void DccMQTT::connect() { - char *connectID = new char[MAXCONNECTID]; - connectID[0] = '\0'; - int reconnectCount = 0; + // Build the connect ID : Prefix + clientID if (broker->prefix != nullptr) { - char tmp[20]; - strcpy_P(tmp, (const char *)broker->prefix); - connectID[0] = '\0'; - strcat(connectID, tmp); + connectID[0] ='\0'; + strcpy_P(connectID, (const char *)broker->prefix); } - strcat(connectID, clientID); - DIAG(F("MQTT %s (re)connecting ..."), connectID); - // Build the connect ID : Prefix + clientID + // Connect to the broker + DIAG(F("MQTT %s (re)connecting ..."), connectID); while (!mqttClient.connected() && reconnectCount < MAXRECONNECT) { DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType); @@ -342,7 +412,7 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed"); - // mqttClient.publish(clientID, "Hello from DccEX"); + outboundRing = new RingStream(OUT_BOUND_SIZE); } void DccMQTT::loop() @@ -363,15 +433,34 @@ void DccMQTT::loop() { auto idx = in.peek(); auto c = pool.getItem(in.pop(), &state); - DIAG(F("MQTT Processing pool: %d with command: %s"), idx, c->cmd); + DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket); + outboundRing->mark((uint8_t)c->mqsocket); + DIAG(F("#1")); + CommandDistributor::parse(c->mqsocket,(byte *)c->cmd,outboundRing); + DIAG(F("#2")); + outboundRing->commit(); + DIAG(F("#3")); } + // handle at most 1 outbound transmission + int socketOut=outboundRing->read(); + if (socketOut>=0) { + int count=outboundRing->count(); + DIAG(F("MQTT publish to mqsocket=%d, count=:%d"), socketOut,count); + + // for(;count>0;count--) clients[socketOut].write(outboundRing->read()); + + for(;count>0;count--) Serial.print((char) outboundRing->read()); + + // clients[socketOut].flush(); //maybe + } + // read outgoing queue for publishing replies; one per loop - if (out.count() > 0) - { - auto m = pool.getItem(out.pop(), &state); - DIAG(F("MQTT Publish reply from command %s"), m->cmd); - } + // if (out.count() > 0) + // { + // auto m = pool.getItem(out.pop(), &state); + // DIAG(F("MQTT Publish reply from command %s"), m->cmd); + // } // DccMQTTProc::loop(); //!< give time to the command processor to handle msg .. // take a command from the incomming queue diff --git a/DccMQTT.h b/DccMQTT.h index 4025c62..5067526 100644 --- a/DccMQTT.h +++ b/DccMQTT.h @@ -87,14 +87,15 @@ struct MQTTBroker * */ typedef struct csmsg_t { - char cmd[MAXPAYLOAD]; + char cmd[MAXPAYLOAD]; // recieved command message + byte mqsocket; // from which mqsocket / subscriberid } csmsg_t; typedef struct csmqttclient_t { - int distant; // random int number recieved from the subscriber - byte mqsocket; // mqtt socket = subscriberid provided by the cs - int32_t topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands - bool open; // true as soon as we have send the id to the mq broker for the client to pickup + int distant; // random int number recieved from the subscriber + byte mqsocket; // mqtt socket = subscriberid provided by the cs + int32_t topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands + bool open; // true as soon as we have send the id to the mq broker for the client to pickup } csmqttclient_t; enum DccMQTTState @@ -107,32 +108,31 @@ enum DccMQTTState class DccMQTT { private: - static DccMQTT singleton; - DccMQTT() = default; - DccMQTT(const DccMQTT &); // non construction-copyable - DccMQTT &operator=(const DccMQTT &); // non copyable +// Methods + DccMQTT() = default; + DccMQTT(const DccMQTT &); // non construction-copyable + DccMQTT &operator=(const DccMQTT &); // non copyable - void setup(const FSH *id, MQTTBroker *broker); - void connect(); // (re)connects to the broker - // boolean subscribe(); - - - - EthernetClient ethClient; // TCP Client object for the MQ Connection - IPAddress server; // MQTT server object - PubSubClient mqttClient; // PubSub Endpoint for data exchange - MQTTBroker *broker; // Broker configuration object as set in config.h +void setup(const FSH *id, MQTTBroker *broker); +void connect(); // (re)connects to the broker +// Members +static DccMQTT singleton; + EthernetClient ethClient; // TCP Client object for the MQ Connection + IPAddress server; // MQTT server object + PubSubClient mqttClient; // PubSub Endpoint for data exchange + MQTTBroker *broker; // Broker configuration object as set in config.h - ObjectPool pool; - Queue in; - Queue out; + ObjectPool pool; // Pool of commands recieved for the CS + Queue in; // Queue of indexes into the pool according to incomming cmds - char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID - csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients - - uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital - // handshake in form of a random number - DccMQTTState mqState = INIT; + char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID + csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients + char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker + uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection + + DccMQTTState mqState = INIT; + RingStream *outboundRing; + char buffer[MAXTMSG]; // temp buffer for manipulating strings / messages public: static DccMQTT *get() noexcept @@ -149,10 +149,13 @@ public: ObjectPool *getPool() { return &pool; }; Queue *getIncomming() { return ∈ }; - Queue *getOutgoing() { return &out; }; char *getClientID() { return clientID; }; + uint8_t getClientSize() { return subscriberid; } + // initalized to 0 so that the first id comming back is 1 + // index 0 in the clients array is not used therefore + //! improvement here to be done to save some bytes uint8_t obtainSubscriberID(){ if ( subscriberid == MAXMQTTCONNECTIONS) { return 0; // no more subscriber id available @@ -164,7 +167,7 @@ public: // but to save space we calculate it at each publish void getSubscriberTopic( uint8_t subscriberid, char *tbuffer ){ - sprintf(tbuffer, "%s/%ld", clientID, (long) clients[subscriberid].topic); + sprintf(tbuffer, "%s/%ld/cmd", clientID, (long) clients[subscriberid].topic); } csmqttclient_t *getClients() { return clients; };