mirror of https://github.com/DCC-EX/CommandStation-EX.git synced 2025-03-13 17:43:08 +01:00

channel setup ok

channel subscrition not ok yet
(maye be bc its done during the
This commit is contained in:
Gregor Baues 2021-05-06 13:06:16 +02:00
parent 595b6bad93
commit 0b0744cc94
5 changed files with 93 additions and 28 deletions

View File

@ -38,6 +38,8 @@
#include <DccMQTT.h>
#include <Queue.h>
#include <ObjectPool.h>
#include <errno.h>
#include <limits.h>
// Variables
@ -46,10 +48,6 @@
DccMQTT DccMQTT::singleton;
auto mqtt = DccMQTT::get();
char topicName[MAXTBUF];
char topicMessage[MAXTMSG];
// char keyword[MAX_KEYWORD_LENGTH];
// pairing functions used for creating a client identifier
// when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published
// shall be a admin message with a random number
@ -61,7 +59,6 @@ char topicMessage[MAXTMSG];
int cantorEncode(int a, int b)
return (((a + b) * (a + b + 1)) / 2) + b;
@ -96,9 +93,7 @@ static void array_to_string(byte array[], unsigned int len, char buffer[])
// callback when a message arrives from the broker; push cmd into the incommming queue
void mqttCallback(char *topic, byte *payload, unsigned int length)
topicName[0] = '\0';
topicMessage[0] = '\0';
strcpy(topicName, topic);
errno = 0;
switch (payload[0])
@ -119,14 +114,56 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
// Add the index of the pool item to the incomming queue
DIAG(F("MQTT Message arrived [%s]: [%s]"), topicName, tm.cmd);
DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd);
case '(':
// MQTT Ctrl command
char buffer[30];
memset(buffer, 0, 30);
payload[length] = '\0';
DIAG(F("MQTT Ctrl Message arrived [%s]: [%s]"), topicName, (char *)payload);
char *tmp = (char *)payload + 1;
strlcpy(buffer, tmp, length);
buffer[length - 2] = '\0';
DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length);
auto distantid = strtol(buffer, NULL, 10);
if (errno == ERANGE || distantid > INT16_MAX)
DIAG(F("Invalid Handshake ID; must be in the range of int"));
if (distantid == 0)
DIAG(F("Invalid Handshake ID"));
// All is ok so set up the channel; MQTT Ctrl command
auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
auto topicid = cantorEncode(subscriberid, (int)distantid);
DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid);
// extract the number delivered from
auto clients = mqtt->getClients();
// we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one
clients[subscriberid] = {(int)distantid, subscriberid, topicid}; // add subscribertopic
char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTOPICLENGTH];
mqtt->getSubscriberTopic(subscriberid, tbuffer);
auto ok = mqtt->subscribe(tbuffer);
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK":"NOK");
memset(buffer, 0, 30);
sprintf(buffer, "(%d,%d)", (int) distantid, topicid );
mqtt->publish(topic, buffer);
@ -208,9 +245,14 @@ void DccMQTT::connect()
// for the time being only one topic at the root which os the unique clientID from the MCU
// QoS is 0 by default
boolean DccMQTT::subscribe()
boolean DccMQTT::subscribe(char *topic)
return mqttClient.subscribe(clientID);
return mqttClient.subscribe(topic);
void DccMQTT::publish(char *topic, char *payload)
mqttClient.publish(topic, payload);
@ -258,11 +300,11 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b)
DIAG(F("MQTT Client ID : %s"), clientID);
connect(); // inital connection as well as reconnects
auto sub = DccMQTT::subscribe(); // set up all subscriptions
auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
mqttClient.publish(clientID, "Hello from DccEX");
// mqttClient.publish(clientID, "Hello from DccEX");
// sprintf_P(_csidMsg, csidfmt, DccMQTT::getDeviceID());
// mqttClient.publish(DccMQTT::topics[ADMIN], _csidMsg); // say hello to the broker and the API who listens to this topic

View File

@ -15,6 +15,7 @@
#include <Ethernet.h>
#include <Dns.h>
#include <ObjectPool.h>
#include <limits.h>
#define MAXPAYLOAD 64
@ -26,6 +27,7 @@
#define CLIENTIDSIZE 6 //
#define MAXRECONNECT 5 // reconnection tries before final failure
#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers
// Define Broker configurations; Values are provided in the following order
@ -89,10 +91,10 @@ typedef struct csmsg_t {
} csmsg_t;
typedef struct csmqttclient_t {
uint8_t subscriber;
uint8_t cs;
} csmqttclient_t
int distant; // random int number recieved from the subscriber
byte mqsocket; // mqtt socket = subscriberid provided by the cs
int topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands
} csmqttclient_t;
enum DccMQTTState
@ -111,10 +113,10 @@ private:
void setup(const FSH *id, MQTTBroker *broker);
void connect(); // (re)connects to the broker
boolean subscribe();
// static int cantorEncode(int a, int b);
// static void cantorDecode(int c, int *a, int *b);
// boolean subscribe();
EthernetClient ethClient; // TCP Client object for the MQ Connection
IPAddress server; // MQTT server object
PubSubClient mqttClient; // PubSub Endpoint for data exchange
@ -124,18 +126,21 @@ private:
Queue<int> in;
Queue<int> out;
char clientID[(CLIENTIDSIZE*2)+1];
char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID
csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients
uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital
// handshake in form of a random number
DccMQTTState mqState = INIT;
static DccMQTT *get() noexcept
return &singleton;
boolean subscribe(char *topic);
void publish(char *topic, char* payload);
bool isConfigured() { return mqState == CONFIGURED; };
bool isConnected() { return mqState == CONNECTED; };
@ -145,6 +150,23 @@ public:
Queue<int> *getIncomming() { return &in; };
Queue<int> *getOutgoing() { return &out; };
char *getClientID() { return clientID; };
uint8_t obtainSubscriberID(){
if ( subscriberid == UCHAR_MAX) {
return 0; // no more subscriber id available
return (++subscriberid);
// this could be calculated once forever at each new connect and be stored
// but to save space we calculate it at each publish
void getSubscriberTopic( uint8_t subscriberid, char *tbuffer ){
sprintf(tbuffer, "%s/%d", clientID, clients[subscriberid].topic);
csmqttclient_t *getClients() { return clients; };
void setup(); // called at setup in the main ino file
void loop();

View File

@ -11,7 +11,6 @@ class ObjectPool
// just make sure that we don't create a pool eating up all memory @compiletime
static_assert(length <= MAXPOOLSIZE);
struct item
T i;

test/mpub.sh Executable file
View File

@ -0,0 +1 @@
mosquitto_pub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -m "(255)"

test/msub.sh Executable file
View File

@ -0,0 +1 @@
mosquitto_sub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -k 600