From 6fd866d2731e3b1a56ce1d7928fa32db258b1157 Mon Sep 17 00:00:00 2001 From: Gregor Baues Date: Wed, 12 May 2021 21:30:15 +0200 Subject: [PATCH] ok with gremlin after 3/4 recieves --- CommandDistributor.cpp | 7 +- DCCEXParser.cpp | 6 ++ DccMQTT.cpp | 156 +++++++++++++++++++++-------------------- 3 files changed, 93 insertions(+), 76 deletions(-) diff --git a/CommandDistributor.cpp b/CommandDistributor.cpp index 3bf994d..4f5f261 100644 --- a/CommandDistributor.cpp +++ b/CommandDistributor.cpp @@ -19,13 +19,18 @@ #include #include "CommandDistributor.h" #include "WiThrottle.h" +#include "DIAG.h" DCCEXParser * CommandDistributor::parser=0; void CommandDistributor::parse(byte clientId,byte * buffer, RingStream * streamer) { + DIAG(F("CDS %d : %s : %x"), clientId, (char *) buffer, streamer); if (buffer[0] == '<') { if (!parser) parser = new DCCEXParser(); parser->parse(streamer, buffer, streamer); } - else WiThrottle::getThrottle(clientId)->parse(streamer, buffer); + else { + DIAG(F("CDS WiThrottle")); + WiThrottle::getThrottle(clientId)->parse(streamer, buffer); + } } diff --git a/DCCEXParser.cpp b/DCCEXParser.cpp index bc2045e..beaffc2 100644 --- a/DCCEXParser.cpp +++ b/DCCEXParser.cpp @@ -499,11 +499,17 @@ void DCCEXParser::parse(Print *stream, byte *com, RingStream * ringStream) return; case 's': // + // DIAG(F("s1")); StringFormatter::send(stream, F("\n"), DCCWaveform::mainTrack.getPowerMode() == POWERMODE::ON); + // DIAG(F("s2")); StringFormatter::send(stream, F("\n"), F(VERSION), F(ARDUINO_TYPE), DCC::getMotorShieldName(), F(GITHUB_SHA)); + // DIAG(F("s3")); Turnout::printAll(stream); //send all Turnout states + // DIAG(F("s4")); Output::printAll(stream); //send all Output states + // DIAG(F("s5")); Sensor::printAll(stream); //send all Sensor states + //DIAG(F("s6")); // TODO Send stats of speed reminders table return; diff --git a/DccMQTT.cpp b/DccMQTT.cpp index 36559c9..6cbeefc 100644 --- a/DccMQTT.cpp +++ b/DccMQTT.cpp @@ -43,7 +43,7 @@ #include #include #include -#include // Base (sync) MQTT library +#include #include #include #include @@ -55,6 +55,7 @@ #include #include #include +#include //--------- // Variables @@ -111,6 +112,8 @@ static void array_to_string(byte array[], unsigned int len, char buffer[]) // callback when a message arrives from the broker; push cmd into the incommming queue void mqttCallback(char *topic, byte *payload, unsigned int length) { + + auto clients = mqtt->getClients(); errno = 0; payload[length] = '\0'; // make sure we have the string terminator in place @@ -121,68 +124,75 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) case '<': { - const char s[2] = "/"; // topic delimiter is / + const char s[2] = "/"; // topic delimiter is / char *token; byte mqsocket; /* get the first token = ClientID */ token = strtok(topic, s); - /* get the second token = topicID */ + /* get the second token = topicID */ token = strtok(NULL, s); - if ( token == NULL ) { + if (token == NULL) + { DIAG(F("Can't identify sender #1; command send on wrong topic")); - return; + return; // don't do anything as we wont know where to send the results // normally the topicid shall be valid as we only have subscribed to that one and nothing else // comes here; The only issue is when recieveing on the open csid channel ( which stays open in order to // able to accept other connections ) - } else { + } + else + { auto topicid = atoi(token); // verify that there is a MQTT client with that topic id connected - auto clients = mqtt->getClients(); + // auto clients = mqtt->getClients(); bool isClient = false; // check in the array of clients if we have one with the topicid - // start at 1 as 0 is not allocated as mqsocket - for( int i = 1; i <= mqtt->getClientSize(); i++ ) { - if (clients[i].topic == topicid) { - isClient = true; - mqsocket = i; - break; - } + // start at 1 as 0 is not allocated as mqsocket + for (int i = 1; i <= mqtt->getClientSize(); i++) + { + if (clients[i].topic == topicid) + { + isClient = true; + mqsocket = i; + break; + } } - if(!isClient) { + if (!isClient) + { // no such client connected DIAG(F("Can't identify sender #2; command send on wrong topic")); return; } - } // if we make it until here we dont even need to test the last "cmd" element from the topic as there is no - // subscription for anything else + // subscription for anything else // DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid); // Prepare the DCC-EX command - auto pool = mqtt->getPool(); // message pool - auto q = mqtt->getIncomming(); // incomming queue + auto pool = mqtt->getPool(); // message pool + auto q = mqtt->getIncomming(); // incomming queue - csmsg_t tm; // topic message + csmsg_t tm; // topic message - - strlcpy(tm.cmd, (char *)payload, length + 1); // message payload - tm.mqsocket = mqsocket; // on which socket did we recieve the mq message - int idx = pool->setItem(tm); // Add the recieved command to the pool + if ( length + 1 > MAXPAYLOAD) { + DIAG(F("MQTT Command too long (> %d characters)"),MAXPAYLOAD); + } + strlcpy(tm.cmd, (char *)payload, length + 1); // message payload + tm.mqsocket = mqsocket; // on which socket did we recieve the mq message + int idx = pool->setItem(tm); // Add the recieved command to the pool if (idx == -1) { DIAG(F("MQTT Command pool full. Could not handle recieved command.")); return; } - - q->push(idx); // Add the index of the pool item to the incomming queue - + + q->push(idx); // Add the index of the pool item to the incomming queue + DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd); - + break; } case 'm': @@ -191,7 +201,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) { case 'i': { - auto clients = mqtt->getClients(); + char buffer[30]; memset(buffer, 0, 30); @@ -225,7 +235,8 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer - if(subscriberid == 0) { + if (subscriberid == 0) + { DIAG(F("MQTT no more connections are available")); return; } @@ -238,10 +249,10 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid); // extract the number delivered from // we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one - + // initalize the new mqtt client object - clients[subscriberid] = {(int)distantid, subscriberid, topicid, true}; - + clients[subscriberid] = {(int)distantid, subscriberid, topicid, true}; + // add/subcribe to the topic for listening on cmds recieved via the channel for the client with // subscriberid as identifier @@ -252,16 +263,16 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) // send the topicid on which the CS will listen for commands to the MQTT client on the root topic memset(buffer, 0, 30); - sprintf(buffer, "mc(%d,%ld)", (int)distantid, (long) topicid); + sprintf(buffer, "mc(%d,%ld)", (int)distantid, (long)topicid); DIAG(F("Publishing: [%s] to [%s]"), buffer, mqtt->getClientID()); mqtt->publish(mqtt->getClientID(), buffer); // on the cs side all is set and we declare that the cs is open for business clients[subscriberid].open = true; - + // we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there - // in the < case we should test that we got the command on the right topic ... - + // in the < case we should test that we got the command on the right topic ... + DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer); memset(buffer, 0, 30); sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), topicid); @@ -301,12 +312,11 @@ void DccMQTT::connect() // Build the connect ID : Prefix + clientID if (broker->prefix != nullptr) { - connectID[0] ='\0'; + connectID[0] = '\0'; strcpy_P(connectID, (const char *)broker->prefix); } strcat(connectID, clientID); - // Connect to the broker DIAG(F("MQTT %s (re)connecting ..."), connectID); while (!mqttClient.connected() && reconnectCount < MAXRECONNECT) @@ -387,11 +397,9 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) // get a eth client session DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port); + ethClient = EthernetInterface::get()->getServer()->accept(); // get a cleint from the EthernetInterface - // ethClient = EthernetInterface::get()->getServer()->available(); - ethClient = EthernetInterface::get()->getServer()->accept(); // initalize MQ Broker - mqttClient = PubSubClient(ethClient); mqttClient.setServer(broker->ip, broker->port); @@ -406,13 +414,11 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) array_to_string(mqbid, CLIENTIDSIZE, clientID); DIAG(F("MQTT Client ID : %s"), clientID); - connect(); // inital connection as well as reconnects auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed"); - - outboundRing = new RingStream(OUT_BOUND_SIZE); + outboundRing = new RingStream(OUT_BOUND_SIZE); } void DccMQTT::loop() @@ -427,6 +433,7 @@ void DccMQTT::loop() DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state()); }; + // read incomming queue for processing; one per loop bool state; if (in.count() > 0) @@ -434,39 +441,38 @@ void DccMQTT::loop() auto idx = in.peek(); auto c = pool.getItem(in.pop(), &state); DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket); - outboundRing->mark((uint8_t)c->mqsocket); - DIAG(F("#1")); - CommandDistributor::parse(c->mqsocket,(byte *)c->cmd,outboundRing); - DIAG(F("#2")); + DIAG(F("Ring free space1: %d"),outboundRing->freeSpace()); + outboundRing->mark((uint8_t)c->mqsocket); + CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing); + DIAG(F("Return from CDS")); outboundRing->commit(); - DIAG(F("#3")); + DIAG(F("Ring free space2: %d"),outboundRing->freeSpace()); + pool.returnItem(idx); } - // handle at most 1 outbound transmission - int socketOut=outboundRing->read(); - if (socketOut>=0) { - int count=outboundRing->count(); - DIAG(F("MQTT publish to mqsocket=%d, count=:%d"), socketOut,count); - - // for(;count>0;count--) clients[socketOut].write(outboundRing->read()); - - for(;count>0;count--) Serial.print((char) outboundRing->read()); - - // clients[socketOut].flush(); //maybe + // + // handle at most 1 outbound transmission + int socketOut = outboundRing->read(); + if (socketOut > 0) // mqsocket / clientid can't be 0 .... + { + int count = outboundRing->count(); + DIAG(F("Ring free space3: %d"),outboundRing->freeSpace()); + // construct the topic : clientID/topicId/result + buffer[0] = '\0'; + sprintf(buffer, "%s/%d/result", clientID, (int)clients[socketOut].topic); + DIAG(F("MQTT publish to mqsocket=%d, count=:%d on topic %s"), socketOut, count, buffer); + // construct the payload + char payload[count]; + payload[count]='\0'; + char *tmp = payload; + for (; count > 0; count--) + { + *tmp = (char)outboundRing->read(); + tmp++; } + DIAG(F("Ring free space4: %d"),outboundRing->freeSpace()); + DIAG(F("MQTT publish with payload:\n%s"), payload); + // mqtt->publish(buffer, payload); + } - // read outgoing queue for publishing replies; one per loop - // if (out.count() > 0) - // { - // auto m = pool.getItem(out.pop(), &state); - // DIAG(F("MQTT Publish reply from command %s"), m->cmd); - // } - - // DccMQTTProc::loop(); //!< give time to the command processor to handle msg .. - // take a command from the incomming queue - // execute it - // store the results in the outgoing queue - - // DccMQTT::publish(); //!< publish waiting messages from the outgoing queue - // if there is someting in the outgoing queue publish on response } \ No newline at end of file