mirror of
https://github.com/DCC-EX/CommandStation-EX.git
synced 2025-02-19 23:46:02 +01:00
working with multiple clients and
the ringstream for processing
This commit is contained in:
parent
508b1fcfce
commit
35d81cd848
161
DccMQTT.cpp
161
DccMQTT.cpp
@ -49,6 +49,7 @@
|
|||||||
#include <Dns.h>
|
#include <Dns.h>
|
||||||
#include <DCCTimer.h>
|
#include <DCCTimer.h>
|
||||||
#include <DccMQTT.h>
|
#include <DccMQTT.h>
|
||||||
|
#include <CommandDistributor.h>
|
||||||
#include <Queue.h>
|
#include <Queue.h>
|
||||||
#include <ObjectPool.h>
|
#include <ObjectPool.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
@ -62,6 +63,9 @@
|
|||||||
DccMQTT DccMQTT::singleton;
|
DccMQTT DccMQTT::singleton;
|
||||||
auto mqtt = DccMQTT::get();
|
auto mqtt = DccMQTT::get();
|
||||||
|
|
||||||
|
// The RingBuffer size / no Withrottle over MQ so we don't need the huge buffer as for ethernet
|
||||||
|
#define OUT_BOUND_SIZE 256
|
||||||
|
|
||||||
// pairing functions used for creating a client identifier
|
// pairing functions used for creating a client identifier
|
||||||
// when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published
|
// when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published
|
||||||
// shall be a admin message with a random number
|
// shall be a admin message with a random number
|
||||||
@ -71,7 +75,7 @@ auto mqtt = DccMQTT::get();
|
|||||||
// during processing then send the replies to the topic from which the command was recieved.
|
// during processing then send the replies to the topic from which the command was recieved.
|
||||||
// Thus the main channel shall not be used for any p2p coms ev just for broadcast from the CS to the subscribed clients
|
// Thus the main channel shall not be used for any p2p coms ev just for broadcast from the CS to the subscribed clients
|
||||||
|
|
||||||
int32_t cantorEncode(int a, int b)
|
long cantorEncode(long a, long b)
|
||||||
{
|
{
|
||||||
return (((a + b) * (a + b + 1)) / 2) + b;
|
return (((a + b) * (a + b + 1)) / 2) + b;
|
||||||
}
|
}
|
||||||
@ -109,28 +113,76 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
payload[length] = '\0'; // make sure we have the string terminator in place
|
payload[length] = '\0'; // make sure we have the string terminator in place
|
||||||
DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length);
|
|
||||||
|
// DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length);
|
||||||
|
|
||||||
switch (payload[0])
|
switch (payload[0])
|
||||||
{
|
{
|
||||||
case '<':
|
case '<':
|
||||||
{
|
{
|
||||||
// DCC-EX command
|
|
||||||
auto pool = mqtt->getPool();
|
|
||||||
auto q = mqtt->getIncomming();
|
|
||||||
|
|
||||||
csmsg_t tm;
|
const char s[2] = "/"; // topic delimiter is /
|
||||||
strlcpy(tm.cmd, (char *)payload, length + 1);
|
char *token;
|
||||||
// Add the recieved command to the pool
|
byte mqsocket;
|
||||||
int idx = pool->setItem(tm);
|
|
||||||
|
/* get the first token = ClientID */
|
||||||
|
token = strtok(topic, s);
|
||||||
|
/* get the second token = topicID */
|
||||||
|
token = strtok(NULL, s);
|
||||||
|
if ( token == NULL ) {
|
||||||
|
DIAG(F("Can't identify sender #1; command send on wrong topic"));
|
||||||
|
return;
|
||||||
|
// don't do anything as we wont know where to send the results
|
||||||
|
// normally the topicid shall be valid as we only have subscribed to that one and nothing else
|
||||||
|
// comes here; The only issue is when recieveing on the open csid channel ( which stays open in order to
|
||||||
|
// able to accept other connections )
|
||||||
|
} else {
|
||||||
|
auto topicid = atoi(token);
|
||||||
|
// verify that there is a MQTT client with that topic id connected
|
||||||
|
auto clients = mqtt->getClients();
|
||||||
|
bool isClient = false;
|
||||||
|
// check in the array of clients if we have one with the topicid
|
||||||
|
// start at 1 as 0 is not allocated as mqsocket
|
||||||
|
for( int i = 1; i <= mqtt->getClientSize(); i++ ) {
|
||||||
|
if (clients[i].topic == topicid) {
|
||||||
|
isClient = true;
|
||||||
|
mqsocket = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(!isClient) {
|
||||||
|
// no such client connected
|
||||||
|
DIAG(F("Can't identify sender #2; command send on wrong topic"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// if we make it until here we dont even need to test the last "cmd" element from the topic as there is no
|
||||||
|
// subscription for anything else
|
||||||
|
|
||||||
|
// DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid);
|
||||||
|
|
||||||
|
// Prepare the DCC-EX command
|
||||||
|
auto pool = mqtt->getPool(); // message pool
|
||||||
|
auto q = mqtt->getIncomming(); // incomming queue
|
||||||
|
|
||||||
|
csmsg_t tm; // topic message
|
||||||
|
|
||||||
|
|
||||||
|
strlcpy(tm.cmd, (char *)payload, length + 1); // message payload
|
||||||
|
tm.mqsocket = mqsocket; // on which socket did we recieve the mq message
|
||||||
|
int idx = pool->setItem(tm); // Add the recieved command to the pool
|
||||||
|
|
||||||
if (idx == -1)
|
if (idx == -1)
|
||||||
{
|
{
|
||||||
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
|
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Add the index of the pool item to the incomming queue
|
|
||||||
q->push(idx);
|
q->push(idx); // Add the index of the pool item to the incomming queue
|
||||||
|
|
||||||
DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd);
|
DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'm':
|
case 'm':
|
||||||
@ -143,13 +195,11 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
|
|
||||||
char buffer[30];
|
char buffer[30];
|
||||||
memset(buffer, 0, 30);
|
memset(buffer, 0, 30);
|
||||||
|
|
||||||
|
|
||||||
char *tmp = (char *)payload + 3;
|
char *tmp = (char *)payload + 3;
|
||||||
strlcpy(buffer, tmp, length);
|
strlcpy(buffer, tmp, length);
|
||||||
buffer[length - 4] = '\0';
|
buffer[length - 4] = '\0';
|
||||||
|
|
||||||
DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length);
|
// DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length);
|
||||||
|
|
||||||
auto distantid = strtol(buffer, NULL, 10);
|
auto distantid = strtol(buffer, NULL, 10);
|
||||||
|
|
||||||
@ -163,12 +213,16 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
DIAG(F("Invalid Handshake ID"));
|
DIAG(F("Invalid Handshake ID"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// ---------------------------
|
||||||
|
// Create a new MQTT client
|
||||||
|
// ---------------------------
|
||||||
|
|
||||||
// check in the clients if the distantid has been set already somewhere
|
// check in the clients if the distantid has been set already somewhere
|
||||||
// if so we either have a new one with the same id then we have a collision -> publish a collision
|
// if so we either have a new one with the same id then we have a collision -> publish a collision
|
||||||
// or its the same i.e; the message comming back as we are subscribed -> stop here
|
// or its the same i.e; the message comming back as we are subscribed -> stop here
|
||||||
|
|
||||||
// All is ok so set up the channel; MQTT Ctrl command
|
// All is ok so set up the channel; MQTT Ctrl command
|
||||||
|
|
||||||
auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
|
auto subscriberid = DccMQTT::get()->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
|
||||||
|
|
||||||
if(subscriberid == 0) {
|
if(subscriberid == 0) {
|
||||||
@ -176,25 +230,46 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto topicid = cantorEncode(subscriberid, (int)distantid);
|
// long a = subscriberid;
|
||||||
|
// long b = distantid;
|
||||||
|
// auto topicid = cantorEncode(a, b);
|
||||||
|
|
||||||
|
auto topicid = cantorEncode((long)subscriberid, (long)distantid);
|
||||||
DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid);
|
DIAG(F("MQTT Ctrl Message arrived [%s] : subscriber [%d] : distant [%d] : topic: [%d]"), buffer, subscriberid, (int)distantid, topicid);
|
||||||
// extract the number delivered from
|
// extract the number delivered from
|
||||||
// we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one
|
// we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one
|
||||||
|
|
||||||
clients[subscriberid] = {(int)distantid, subscriberid, topicid, true}; // add subscribertopic
|
// initalize the new mqtt client object
|
||||||
|
clients[subscriberid] = {(int)distantid, subscriberid, topicid, true};
|
||||||
|
|
||||||
|
// add/subcribe to the topic for listening on cmds recieved via the channel for the client with
|
||||||
|
// subscriberid as identifier
|
||||||
|
|
||||||
char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTOPICLENGTH];
|
char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTOPICLENGTH];
|
||||||
mqtt->getSubscriberTopic(subscriberid, tbuffer);
|
mqtt->getSubscriberTopic(subscriberid, tbuffer);
|
||||||
auto ok = mqtt->subscribe(tbuffer);
|
auto ok = mqtt->subscribe(tbuffer);
|
||||||
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
|
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
|
||||||
|
|
||||||
|
// send the topicid on which the CS will listen for commands to the MQTT client on the root topic
|
||||||
memset(buffer, 0, 30);
|
memset(buffer, 0, 30);
|
||||||
sprintf(buffer, "mc(%d,%ld)", (int)distantid, (long) topicid);
|
sprintf(buffer, "mc(%d,%ld)", (int)distantid, (long) topicid);
|
||||||
DIAG(F("Publishing: [%s] to [%s]"), buffer, mqtt->getClientID());
|
DIAG(F("Publishing: [%s] to [%s]"), buffer, mqtt->getClientID());
|
||||||
mqtt->publish(mqtt->getClientID(), buffer);
|
mqtt->publish(mqtt->getClientID(), buffer);
|
||||||
|
|
||||||
|
// on the cs side all is set and we declare that the cs is open for business
|
||||||
clients[subscriberid].open = true;
|
clients[subscriberid].open = true;
|
||||||
// we are done
|
|
||||||
|
// we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there
|
||||||
|
// in the < case we should test that we got the command on the right topic ...
|
||||||
|
|
||||||
|
DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer);
|
||||||
|
memset(buffer, 0, 30);
|
||||||
|
sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), topicid);
|
||||||
|
DIAG(F("MQTT CS is publishing return information to [%s]"), buffer);
|
||||||
|
memset(buffer, 0, 30);
|
||||||
|
sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), topicid);
|
||||||
|
DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -221,24 +296,19 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
void DccMQTT::connect()
|
void DccMQTT::connect()
|
||||||
{
|
{
|
||||||
|
|
||||||
char *connectID = new char[MAXCONNECTID];
|
|
||||||
connectID[0] = '\0';
|
|
||||||
|
|
||||||
int reconnectCount = 0;
|
int reconnectCount = 0;
|
||||||
|
|
||||||
|
// Build the connect ID : Prefix + clientID
|
||||||
if (broker->prefix != nullptr)
|
if (broker->prefix != nullptr)
|
||||||
{
|
{
|
||||||
char tmp[20];
|
connectID[0] ='\0';
|
||||||
strcpy_P(tmp, (const char *)broker->prefix);
|
strcpy_P(connectID, (const char *)broker->prefix);
|
||||||
connectID[0] = '\0';
|
|
||||||
strcat(connectID, tmp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
strcat(connectID, clientID);
|
strcat(connectID, clientID);
|
||||||
|
|
||||||
DIAG(F("MQTT %s (re)connecting ..."), connectID);
|
|
||||||
// Build the connect ID : Prefix + clientID
|
|
||||||
|
|
||||||
|
// Connect to the broker
|
||||||
|
DIAG(F("MQTT %s (re)connecting ..."), connectID);
|
||||||
while (!mqttClient.connected() && reconnectCount < MAXRECONNECT)
|
while (!mqttClient.connected() && reconnectCount < MAXRECONNECT)
|
||||||
{
|
{
|
||||||
DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType);
|
DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType);
|
||||||
@ -342,7 +412,7 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b)
|
|||||||
|
|
||||||
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
|
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
|
||||||
|
|
||||||
// mqttClient.publish(clientID, "Hello from DccEX");
|
outboundRing = new RingStream(OUT_BOUND_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DccMQTT::loop()
|
void DccMQTT::loop()
|
||||||
@ -363,15 +433,34 @@ void DccMQTT::loop()
|
|||||||
{
|
{
|
||||||
auto idx = in.peek();
|
auto idx = in.peek();
|
||||||
auto c = pool.getItem(in.pop(), &state);
|
auto c = pool.getItem(in.pop(), &state);
|
||||||
DIAG(F("MQTT Processing pool: %d with command: %s"), idx, c->cmd);
|
DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket);
|
||||||
|
outboundRing->mark((uint8_t)c->mqsocket);
|
||||||
|
DIAG(F("#1"));
|
||||||
|
CommandDistributor::parse(c->mqsocket,(byte *)c->cmd,outboundRing);
|
||||||
|
DIAG(F("#2"));
|
||||||
|
outboundRing->commit();
|
||||||
|
DIAG(F("#3"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle at most 1 outbound transmission
|
||||||
|
int socketOut=outboundRing->read();
|
||||||
|
if (socketOut>=0) {
|
||||||
|
int count=outboundRing->count();
|
||||||
|
DIAG(F("MQTT publish to mqsocket=%d, count=:%d"), socketOut,count);
|
||||||
|
|
||||||
|
// for(;count>0;count--) clients[socketOut].write(outboundRing->read());
|
||||||
|
|
||||||
|
for(;count>0;count--) Serial.print((char) outboundRing->read());
|
||||||
|
|
||||||
|
// clients[socketOut].flush(); //maybe
|
||||||
|
}
|
||||||
|
|
||||||
// read outgoing queue for publishing replies; one per loop
|
// read outgoing queue for publishing replies; one per loop
|
||||||
if (out.count() > 0)
|
// if (out.count() > 0)
|
||||||
{
|
// {
|
||||||
auto m = pool.getItem(out.pop(), &state);
|
// auto m = pool.getItem(out.pop(), &state);
|
||||||
DIAG(F("MQTT Publish reply from command %s"), m->cmd);
|
// DIAG(F("MQTT Publish reply from command %s"), m->cmd);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// DccMQTTProc::loop(); //!< give time to the command processor to handle msg ..
|
// DccMQTTProc::loop(); //!< give time to the command processor to handle msg ..
|
||||||
// take a command from the incomming queue
|
// take a command from the incomming queue
|
||||||
|
63
DccMQTT.h
63
DccMQTT.h
@ -87,14 +87,15 @@ struct MQTTBroker
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
typedef struct csmsg_t {
|
typedef struct csmsg_t {
|
||||||
char cmd[MAXPAYLOAD];
|
char cmd[MAXPAYLOAD]; // recieved command message
|
||||||
|
byte mqsocket; // from which mqsocket / subscriberid
|
||||||
} csmsg_t;
|
} csmsg_t;
|
||||||
|
|
||||||
typedef struct csmqttclient_t {
|
typedef struct csmqttclient_t {
|
||||||
int distant; // random int number recieved from the subscriber
|
int distant; // random int number recieved from the subscriber
|
||||||
byte mqsocket; // mqtt socket = subscriberid provided by the cs
|
byte mqsocket; // mqtt socket = subscriberid provided by the cs
|
||||||
int32_t topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands
|
int32_t topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands
|
||||||
bool open; // true as soon as we have send the id to the mq broker for the client to pickup
|
bool open; // true as soon as we have send the id to the mq broker for the client to pickup
|
||||||
} csmqttclient_t;
|
} csmqttclient_t;
|
||||||
|
|
||||||
enum DccMQTTState
|
enum DccMQTTState
|
||||||
@ -107,32 +108,31 @@ enum DccMQTTState
|
|||||||
class DccMQTT
|
class DccMQTT
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
static DccMQTT singleton;
|
// Methods
|
||||||
DccMQTT() = default;
|
DccMQTT() = default;
|
||||||
DccMQTT(const DccMQTT &); // non construction-copyable
|
DccMQTT(const DccMQTT &); // non construction-copyable
|
||||||
DccMQTT &operator=(const DccMQTT &); // non copyable
|
DccMQTT &operator=(const DccMQTT &); // non copyable
|
||||||
|
|
||||||
void setup(const FSH *id, MQTTBroker *broker);
|
void setup(const FSH *id, MQTTBroker *broker);
|
||||||
void connect(); // (re)connects to the broker
|
void connect(); // (re)connects to the broker
|
||||||
// boolean subscribe();
|
// Members
|
||||||
|
static DccMQTT singleton;
|
||||||
|
EthernetClient ethClient; // TCP Client object for the MQ Connection
|
||||||
|
IPAddress server; // MQTT server object
|
||||||
EthernetClient ethClient; // TCP Client object for the MQ Connection
|
PubSubClient mqttClient; // PubSub Endpoint for data exchange
|
||||||
IPAddress server; // MQTT server object
|
MQTTBroker *broker; // Broker configuration object as set in config.h
|
||||||
PubSubClient mqttClient; // PubSub Endpoint for data exchange
|
|
||||||
MQTTBroker *broker; // Broker configuration object as set in config.h
|
|
||||||
|
|
||||||
ObjectPool<csmsg_t,MAXPOOLSIZE> pool;
|
ObjectPool<csmsg_t,MAXPOOLSIZE> pool; // Pool of commands recieved for the CS
|
||||||
Queue<int> in;
|
Queue<int> in; // Queue of indexes into the pool according to incomming cmds
|
||||||
Queue<int> out;
|
|
||||||
|
|
||||||
char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID
|
char clientID[(CLIENTIDSIZE*2)+1]; // unique ID of the commandstation; not to confused with the connectionID
|
||||||
csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients
|
csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients
|
||||||
|
char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker
|
||||||
uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital
|
uint8_t subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection
|
||||||
// handshake in form of a random number
|
|
||||||
DccMQTTState mqState = INIT;
|
DccMQTTState mqState = INIT;
|
||||||
|
RingStream *outboundRing;
|
||||||
|
char buffer[MAXTMSG]; // temp buffer for manipulating strings / messages
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static DccMQTT *get() noexcept
|
static DccMQTT *get() noexcept
|
||||||
@ -149,10 +149,13 @@ public:
|
|||||||
|
|
||||||
ObjectPool<csmsg_t,MAXPOOLSIZE> *getPool() { return &pool; };
|
ObjectPool<csmsg_t,MAXPOOLSIZE> *getPool() { return &pool; };
|
||||||
Queue<int> *getIncomming() { return ∈ };
|
Queue<int> *getIncomming() { return ∈ };
|
||||||
Queue<int> *getOutgoing() { return &out; };
|
|
||||||
|
|
||||||
char *getClientID() { return clientID; };
|
char *getClientID() { return clientID; };
|
||||||
|
uint8_t getClientSize() { return subscriberid; }
|
||||||
|
|
||||||
|
// initalized to 0 so that the first id comming back is 1
|
||||||
|
// index 0 in the clients array is not used therefore
|
||||||
|
//! improvement here to be done to save some bytes
|
||||||
uint8_t obtainSubscriberID(){
|
uint8_t obtainSubscriberID(){
|
||||||
if ( subscriberid == MAXMQTTCONNECTIONS) {
|
if ( subscriberid == MAXMQTTCONNECTIONS) {
|
||||||
return 0; // no more subscriber id available
|
return 0; // no more subscriber id available
|
||||||
@ -164,7 +167,7 @@ public:
|
|||||||
// but to save space we calculate it at each publish
|
// but to save space we calculate it at each publish
|
||||||
|
|
||||||
void getSubscriberTopic( uint8_t subscriberid, char *tbuffer ){
|
void getSubscriberTopic( uint8_t subscriberid, char *tbuffer ){
|
||||||
sprintf(tbuffer, "%s/%ld", clientID, (long) clients[subscriberid].topic);
|
sprintf(tbuffer, "%s/%ld/cmd", clientID, (long) clients[subscriberid].topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
csmqttclient_t *getClients() { return clients; };
|
csmqttclient_t *getClients() { return clients; };
|
||||||
|
Loading…
Reference in New Issue
Block a user