1
0
mirror of https://github.com/DCC-EX/CommandStation-EX.git synced 2025-04-10 15:30:12 +02:00

start adding MQTT client channel implementation

This commit is contained in:
Gregor Baues 2021-05-05 11:23:59 +02:00
parent c042240019
commit 595b6bad93
4 changed files with 140 additions and 28 deletions

View File

@ -50,7 +50,29 @@ char topicName[MAXTBUF];
char topicMessage[MAXTMSG];
// char keyword[MAX_KEYWORD_LENGTH];
// pairing functions used for creating a client identifier
// when a external system connects via MQ to the CS i.e. subscribes to the main channel the first message to be published
// shall be a admin message with a random number
// the cs will based on a counter use a second number to create the cantor encoding of both numbers and publish the cantor code
// this message will be seen by all throttles and they can decode the number which provides the first number they send and the
// second number to be used as tpoic for the external system from then on. The CS will recieve on all topics the commands and
// during processing then send the replies to the topic from which the command was recieved.
// Thus the main channel shall not be used for any p2p coms ev just for broadcast from the CS to the subscribed clients
int cantorEncode(int a, int b)
{
return (((a + b) * (a + b + 1)) / 2) + b;
}
void cantorDecode(int c, int *a, int *b)
{
int w = floor((sqrt(8 * c + 1) - 1) / 2);
int t = (w * (w + 1)) / 2;
*b = c - t;
*a = w - *b;
}
/**
* @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID
@ -78,20 +100,43 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
topicMessage[0] = '\0';
strcpy(topicName, topic);
auto pool = mqtt->getPool();
auto q = mqtt->getIncomming();
switch (payload[0])
{
case '<':
{
// DCC-EX command
auto pool = mqtt->getPool();
auto q = mqtt->getIncomming();
csmsg_t tm;
strlcpy(tm.cmd, (char *)payload, length + 1);
// Add the recieved command to the pool
int idx = pool->setItem(tm);
if ( idx == -1) {
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
return;
csmsg_t tm;
strlcpy(tm.cmd, (char *)payload, length + 1);
// Add the recieved command to the pool
int idx = pool->setItem(tm);
if (idx == -1)
{
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
return;
}
// Add the index of the pool item to the incomming queue
q->push(idx);
DIAG(F("MQTT Message arrived [%s]: [%s]"), topicName, tm.cmd);
break;
}
case '(':
{
// MQTT Ctrl command
payload[length] = '\0';
DIAG(F("MQTT Ctrl Message arrived [%s]: [%s]"), topicName, (char *)payload);
break;
}
default:
{
// invalid command
payload[length] = '\0';
DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload);
break;
}
}
// Add the index of the pool item to the incomming queue
q->push(idx);
DIAG(F("MQTT Message arrived [%s]: [%s]"), topicName, tm.cmd);
}
/**
@ -105,8 +150,9 @@ void DccMQTT::connect()
connectID[0] = '\0';
int reconnectCount = 0;
if(broker->prefix != nullptr) {
if (broker->prefix != nullptr)
{
char tmp[20];
strcpy_P(tmp, (const char *)broker->prefix);
connectID[0] = '\0';
@ -118,7 +164,7 @@ void DccMQTT::connect()
DIAG(F("MQTT %s (re)connecting ..."), connectID);
// Build the connect ID : Prefix + clientID
while (!mqttClient.connected() && reconnectCount < MAXRECONNECT)
while (!mqttClient.connected() && reconnectCount < MAXRECONNECT)
{
DIAG(F("Attempting MQTT Broker connection[%d]..."), broker->cType);
switch (broker->cType)
@ -162,8 +208,9 @@ while (!mqttClient.connected() && reconnectCount < MAXRECONNECT)
// for the time being only one topic at the root which os the unique clientID from the MCU
// QoS is 0 by default
boolean DccMQTT::subscribe() {
return mqttClient.subscribe(clientID);
boolean DccMQTT::subscribe()
{
return mqttClient.subscribe(clientID);
}
/**
@ -210,10 +257,10 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b)
array_to_string(mqbid, CLIENTIDSIZE, clientID);
DIAG(F("MQTT Client ID : %s"), clientID);
connect(); // inital connection as well as reconnects
auto sub = DccMQTT::subscribe(); // set up all subscriptions
connect(); // inital connection as well as reconnects
auto sub = DccMQTT::subscribe(); // set up all subscriptions
DIAG(F("MQTT subscriptons %s..."), sub ? "ok":"failed");
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
mqttClient.publish(clientID, "Hello from DccEX");
@ -228,12 +275,9 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b)
// mqttDccExParser = p;
}
void DccMQTT::loop()
{
// DccTelemetry::deltaT(1);
if (!mqttClient.connected())
{
connect();
@ -242,9 +286,30 @@ void DccMQTT::loop()
{
DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state());
};
// read incomming queue for processing; one per loop
bool state;
if (in.count() > 0)
{
auto idx = in.peek();
auto c = pool.getItem(in.pop(), &state);
DIAG(F("MQTT Processing pool: %d with command: %s"), idx, c->cmd);
}
// read outgoing queue for publishing replies; one per loop
if (out.count() > 0)
{
auto m = pool.getItem(out.pop(), &state);
DIAG(F("MQTT Publish reply from command %s"), m->cmd);
}
// DccMQTTProc::loop(); //!< give time to the command processor to handle msg ..
// take a command from the incomming queue
// execute it
// store the results in the outgoing queue
// DccMQTT::publish(); //!< publish waiting messages from the outgoing queue
// DccTelemetry::deltaT(1);
// if there is someting in the outgoing queue publish on response
}
// #include <avr/pgmspace.h> // for PROGMEM use

View File

@ -22,9 +22,10 @@
#define MAXTBUF 50 //!< max length of the buffer for building the topic name ;to be checked
#define MAXTMSG 120 //!< max length of the messages for a topic ;to be checked PROGMEM ?
#define MAXTSTR 30 //!< max length of a topic string
#define MAXCONNECTID 40
#define CLIENTIDSIZE 6
#define MAXRECONNECT 5
#define MAXCONNECTID 40 // Broker connection id length incl possible prefixes
#define CLIENTIDSIZE 6 //
#define MAXRECONNECT 5 // reconnection tries before final failure
#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers
// Define Broker configurations; Values are provided in the following order
// MQTT_BROKER_PORT 9883
@ -79,10 +80,20 @@ struct MQTTBroker
};
};
/**
* @brief dcc-ex command as recieved via MQ
*
*/
typedef struct csmsg_t {
char cmd[MAXPAYLOAD];
} csmsg_t;
typedef struct csmqttclient_t {
uint8_t subscriber;
uint8_t cs;
} csmqttclient_t
enum DccMQTTState
{
INIT,
@ -100,7 +111,9 @@ private:
void setup(const FSH *id, MQTTBroker *broker);
void connect(); // (re)connects to the broker
boolean subscribe();
boolean subscribe();
// static int cantorEncode(int a, int b);
// static void cantorDecode(int c, int *a, int *b);
EthernetClient ethClient; // TCP Client object for the MQ Connection
IPAddress server; // MQTT server object
@ -115,6 +128,9 @@ private:
DccMQTTState mqState = INIT;
public:
static DccMQTT *get() noexcept
{

16
DccMQTTProc.cpp Normal file
View File

@ -0,0 +1,16 @@
/**
* @file DccMQTTProc.cpp
* @author Gregor Baues (gr2bba@gmail.com)
* @brief Eexecuting DCC commands recieved through MQTT.
* @version 0.1
* @date 08-07-2020
*
* @copyright Copyright (c) 2021
*
* Licenced under GPLv3
*/
#include <Arduino.h>
#include <Diag.h>

15
DccMQTTProc.h Normal file
View File

@ -0,0 +1,15 @@
#ifndef _DccMQTTProc_h_
#define _DccMQTTProc_h_
class DccMQTTProc
{
private:
public:
static void loop();
DccMQTTProc() = default;
~DccMQTTProc() = default;
};
#endif