diff --git a/DccMQTT.cpp b/DccMQTT.cpp index e973793..80cde3b 100644 --- a/DccMQTT.cpp +++ b/DccMQTT.cpp @@ -38,6 +38,8 @@ #include #include #include +#include +#include //--------- // Variables @@ -46,10 +48,6 @@ DccMQTT DccMQTT::singleton; auto mqtt = DccMQTT::get(); -char topicName[MAXTBUF]; -char topicMessage[MAXTMSG]; -// char keyword[MAX_KEYWORD_LENGTH]; - // pairing functions used for creating a client identifier // when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published // shall be a admin message with a random number @@ -61,7 +59,6 @@ char topicMessage[MAXTMSG]; int cantorEncode(int a, int b) { - return (((a + b) * (a + b + 1)) / 2) + b; } @@ -96,9 +93,7 @@ static void array_to_string(byte array[], unsigned int len, char buffer[]) // callback when a message arrives from the broker; push cmd into the incommming queue void mqttCallback(char *topic, byte *payload, unsigned int length) { - topicName[0] = '\0'; - topicMessage[0] = '\0'; - strcpy(topicName, topic); + errno = 0; switch (payload[0]) { @@ -119,14 +114,56 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) } // Add the index of the pool item to the incomming queue q->push(idx); - DIAG(F("MQTT Message arrived [%s]: [%s]"), topicName, tm.cmd); + DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd); break; } case '(': { - // MQTT Ctrl command + char buffer[30]; + memset(buffer, 0, 30); payload[length] = '\0'; - DIAG(F("MQTT Ctrl Message arrived [%s]: [%s]"), topicName, (char *)payload); + + char *tmp = (char *)payload + 1; + strlcpy(buffer, tmp, length); + buffer[length - 2] = '\0'; + + DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length); + + auto distantid = strtol(buffer, NULL, 10); + + if (errno == ERANGE || distantid > INT16_MAX) + { + DIAG(F("Invalid Handshake ID; must be in the range of int")); + return; + } + if (distantid == 0) + { + DIAG(F("Invalid Handshake ID")); + return; + } + + // All is ok so set up the channel; MQTT Ctrl command + auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer + auto topicid = cantorEncode(subscriberid, (int)distantid); + + DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid); + // extract the number delivered from + + auto clients = mqtt->getClients(); + + // we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one + + clients[subscriberid] = {(int)distantid, subscriberid, topicid}; // add subscribertopic + + char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTOPICLENGTH]; + mqtt->getSubscriberTopic(subscriberid, tbuffer); + auto ok = mqtt->subscribe(tbuffer); + DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK":"NOK"); + + memset(buffer, 0, 30); + sprintf(buffer, "(%d,%d)", (int) distantid, topicid ); + mqtt->publish(topic, buffer); + break; } default: @@ -208,9 +245,14 @@ void DccMQTT::connect() // for the time being only one topic at the root which os the unique clientID from the MCU // QoS is 0 by default -boolean DccMQTT::subscribe() +boolean DccMQTT::subscribe(char *topic) { - return mqttClient.subscribe(clientID); + return mqttClient.subscribe(topic); +} + +void DccMQTT::publish(char *topic, char *payload) +{ + mqttClient.publish(topic, payload); } /** @@ -258,11 +300,11 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b) DIAG(F("MQTT Client ID : %s"), clientID); connect(); // inital connection as well as reconnects - auto sub = DccMQTT::subscribe(); // set up all subscriptions + auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed"); - mqttClient.publish(clientID, "Hello from DccEX"); + // mqttClient.publish(clientID, "Hello from DccEX"); // sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID()); // mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic diff --git a/DccMQTT.h b/DccMQTT.h index c7874b7..0b4c672 100644 --- a/DccMQTT.h +++ b/DccMQTT.h @@ -15,6 +15,7 @@ #include #include #include +#include #define MAXPAYLOAD 64 #define MAXDOMAINLENGTH 32 @@ -26,6 +27,7 @@ #define CLIENTIDSIZE 6 // #define MAXRECONNECT 5 // reconnection tries before final failure #define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers +#define MAXTOPICLENGTH 20 // Define Broker configurations; Values are provided in the following order // MQTT_BROKER_PORT 9883 @@ -89,10 +91,10 @@ typedef struct csmsg_t { } csmsg_t; typedef struct csmqttclient_t { - uint8_t subscriber; - uint8_t cs; - -} csmqttclient_t + int distant; // random int number recieved from the subscriber + byte mqsocket; // mqtt socket = subscriberid provided by the cs + int topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands +} csmqttclient_t; enum DccMQTTState { @@ -111,10 +113,10 @@ private: void setup(const FSH *id, MQTTBroker *broker); void connect(); // (re)connects to the broker - boolean subscribe(); - // static int cantorEncode(int a, int b); - // static void cantorDecode(int c, int *a, int *b); - + // boolean subscribe(); + + + EthernetClient ethClient; // TCP Client object for the MQ Connection IPAddress server; // MQTT server object PubSubClient mqttClient; // PubSub Endpoint for data exchange @@ -124,18 +126,21 @@ private: Queue in; Queue out; - char clientID[(CLIENTIDSIZE*2)+1]; + char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID + csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients + uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital + // handshake in form of a random number DccMQTTState mqState = INIT; - - - public: static DccMQTT *get() noexcept { return &singleton; } + + boolean subscribe(char *topic); + void publish(char *topic, char* payload); bool isConfigured() { return mqState == CONFIGURED; }; bool isConnected() { return mqState == CONNECTED; }; @@ -145,6 +150,23 @@ public: Queue *getIncomming() { return ∈ }; Queue *getOutgoing() { return &out; }; + char *getClientID() { return clientID; }; + + uint8_t obtainSubscriberID(){ + if ( subscriberid == UCHAR_MAX) { + return 0; // no more subscriber id available + } + return (++subscriberid); + } + + // this could be calculated once forever at each new connect and be stored + // but to save space we calculate it at each publish + + void getSubscriberTopic( uint8_t subscriberid, char *tbuffer ){ + sprintf(tbuffer, "%s/%d", clientID, clients[subscriberid].topic); + } + + csmqttclient_t *getClients() { return clients; }; void setup(); // called at setup in the main ino file void loop(); diff --git a/ObjectPool.h b/ObjectPool.h index 5149a54..54691bc 100644 --- a/ObjectPool.h +++ b/ObjectPool.h @@ -11,7 +11,6 @@ class ObjectPool // just make sure that we don't create a pool eating up all memory @compiletime static_assert(length <= MAXPOOLSIZE); - struct item { T i; diff --git a/test/mpub.sh b/test/mpub.sh new file mode 100755 index 0000000..128843a --- /dev/null +++ b/test/mpub.sh @@ -0,0 +1 @@ +mosquitto_pub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -m "(255)" diff --git a/test/msub.sh b/test/msub.sh new file mode 100755 index 0000000..49d84d8 --- /dev/null +++ b/test/msub.sh @@ -0,0 +1 @@ + mosquitto_sub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -k 600