diff --git a/DccMQTT.cpp b/DccMQTT.cpp index 9f8281e..d1f58ee 100644 --- a/DccMQTT.cpp +++ b/DccMQTT.cpp @@ -117,7 +117,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) errno = 0; payload[length] = '\0'; // make sure we have the string terminator in place - // DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length); + DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length); switch (payload[0]) { @@ -175,8 +175,9 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) csmsg_t tm; // topic message - if ( length + 1 > MAXPAYLOAD) { - DIAG(F("MQTT Command too long (> %d characters)"),MAXPAYLOAD); + if (length + 1 > MAXPAYLOAD) + { + DIAG(F("MQTT Command too long (> %d characters)"), MAXPAYLOAD); } strlcpy(tm.cmd, (char *)payload, length + 1); // message payload tm.mqsocket = mqsocket; // on which socket did we recieve the mq message @@ -200,8 +201,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) { case 'i': { - - + char buffer[30]; char *tmp = (char *)payload + 3; strlcpy(buffer, tmp, length); @@ -301,11 +301,11 @@ void DccMQTT::connect() { int reconnectCount = 0; + connectID[0] = '\0'; // Build the connect ID : Prefix + clientID if (broker->prefix != nullptr) { - connectID[0] = '\0'; strcpy_P(connectID, (const char *)broker->prefix); } strcat(connectID, clientID); @@ -324,6 +324,7 @@ void DccMQTT::connect() if (mqttClient.connect(connectID)) { DIAG(F("MQTT Broker connected ...")); + mqState = CONNECTED; } else { @@ -350,6 +351,7 @@ void DccMQTT::connect() if (reconnectCount == MAXRECONNECT) { DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT); + mqState = CONNECTION_FAILED; } } } @@ -373,6 +375,37 @@ void DccMQTT::publish(char *topic, char *payload) */ void DccMQTT::setup() { + // setup Ethnet connection first + byte mac[6]; + DCCTimer::getSimulatedMacAddress(mac); + +#ifdef IP_ADDRESS + Ethernet.begin(mac, IP_ADDRESS); +#else + if (Ethernet.begin(mac) == 0) + { + DIAG(F("Ethernet.begin FAILED")); + return; + } +#endif + DIAG(F("Ethernet.begin OK.")); + if (Ethernet.hardwareStatus() == EthernetNoHardware) + { + DIAG(F("Ethernet shield not found")); + return; + } + if (Ethernet.linkStatus() == LinkOFF) + { + DIAG(F("Ethernet cable not connected")); + return; + } + + IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address + + DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]); + DIAG(F("Port:%d"), IP_PORT); + + // setup the MQBroker setup(CSMQTTBROKER); } @@ -387,26 +420,21 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) //Create the MQTT environment and establish inital connection to the Broker broker = b; - // get a eth client session DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port); - ethClient = EthernetInterface::get()->getServer()->accept(); // get a cleint from the EthernetInterface - // initalize MQ Broker - mqttClient = PubSubClient(ethClient); - mqttClient.setServer(broker->ip, broker->port); - - DIAG(F("MQTT Client : Server ok ...%x/%x"), ethClient, mqttClient); - - mqttClient.setCallback(mqttCallback); // Initalize callback function for incomming messages - - DIAG(F("MQTT Client : Callback set ...")); byte mqbid[CLIENTIDSIZE] = {0}; DCCTimer::getSimulatedMacAddress(mqbid); + // initalize MQ Broker + + mqttClient = PubSubClient(broker->ip, broker->port, mqttCallback, ethClient); + DIAG(F("MQTT Client created ok...")); + array_to_string(mqbid, CLIENTIDSIZE, clientID); DIAG(F("MQTT Client ID : %s"), clientID); + connect(); // inital connection as well as reconnects auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions @@ -416,7 +444,11 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) void DccMQTT::loop() { - + // Connection impossible so just don't do anything + if (mqState == CONNECTION_FAILED) + { + return; + } if (!mqttClient.connected()) { connect(); @@ -426,26 +458,26 @@ void DccMQTT::loop() DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state()); }; - // read incomming queue for processing; one per loop bool state; + DIAG(F("in.count: %d"), in.count()); if (in.count() > 0) { auto idx = in.peek(); auto c = pool.getItem(in.pop(), &state); DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket); - DIAG(F("Ring free space1: %d"),outboundRing->freeSpace()); + DIAG(F("Ring free space1: %d"), outboundRing->freeSpace()); outboundRing->mark((uint8_t)c->mqsocket); - // CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing); - StringFormatter::send(outboundRing, F("Test result message")); + CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing); + // StringFormatter::send(outboundRing, F("Test result message")); outboundRing->commit(); - DIAG(F("Ring free space2: %d"),outboundRing->freeSpace()); - pool.returnItem(idx); + DIAG(F("Ring free space2: %d"), outboundRing->freeSpace()); + pool.returnItem(idx); } - // handle at most 1 outbound transmission int socketOut = outboundRing->read(); + DIAG(F("socketOut: %d"), socketOut); if (socketOut > 0) // mqsocket / clientid can't be 0 .... { int count = outboundRing->count(); @@ -454,7 +486,7 @@ void DccMQTT::loop() DIAG(F("MQTT publish to mqsocket=%d, count=:%d on topic %s"), socketOut, count, buffer); // construct the payload char payload[count]; - payload[count]='\0'; + payload[count] = '\0'; char *tmp = payload; for (; count > 0; count--) { @@ -465,5 +497,4 @@ void DccMQTT::loop() DIAG(F("MQTT publish with payload:\n%s"), payload); // mqtt->publish(buffer, payload); } - } \ No newline at end of file