mirror of
https://github.com/DCC-EX/CommandStation-EX.git
synced 2024-11-27 01:56:14 +01:00
529 lines
16 KiB
C++
529 lines
16 KiB
C++
|
#if __has_include("config.h")
|
||
|
#include "config.h"
|
||
|
#else
|
||
|
#warning config.h not found. Using defaults from config.example.h
|
||
|
#include "config.example.h"
|
||
|
#endif
|
||
|
#include "defines.h"
|
||
|
|
||
|
#include "MQTTInterface.h"
|
||
|
#include "MQTTBrokers.h"
|
||
|
#include "DCCTimer.h"
|
||
|
#include <CommandDistributor.h>
|
||
|
#include <errno.h>
|
||
|
#include <limits.h>
|
||
|
|
||
|
MQTTInterface *MQTTInterface::singleton = NULL;
|
||
|
|
||
|
long cantorEncode(long a, long b)
|
||
|
{
|
||
|
return (((a + b) * (a + b + 1)) / 2) + b;
|
||
|
}
|
||
|
|
||
|
void cantorDecode(int32_t c, int *a, int *b)
|
||
|
{
|
||
|
|
||
|
int w = floor((sqrt(8 * c + 1) - 1) / 2);
|
||
|
int t = (w * (w + 1)) / 2;
|
||
|
*b = c - t;
|
||
|
*a = w - *b;
|
||
|
}
|
||
|
|
||
|
void MQTTInterface::setup()
|
||
|
{
|
||
|
singleton = new MQTTInterface();
|
||
|
|
||
|
if (!singleton->connected)
|
||
|
{
|
||
|
singleton = NULL;
|
||
|
}
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT Interface instance: [%x] - Setup done"), singleton);
|
||
|
};
|
||
|
|
||
|
MQTTInterface::MQTTInterface()
|
||
|
{
|
||
|
|
||
|
this->connected = this->setupNetwork();
|
||
|
if (!this->connected)
|
||
|
{
|
||
|
DIAG(F("Network setup failed"));
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
this->setup(CSMQTTBROKER);
|
||
|
}
|
||
|
this->outboundRing = new RingStream(OUT_BOUND_SIZE);
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* @brief MQTT Interface callback recieving all incomming messages from the PubSubClient
|
||
|
*
|
||
|
* @param topic
|
||
|
* @param payload
|
||
|
* @param length
|
||
|
*/
|
||
|
void mqttCallback(char *topic, byte *payload, unsigned int length)
|
||
|
{
|
||
|
MQTTInterface *mqtt = MQTTInterface::get();
|
||
|
auto clients = mqtt->getClients();
|
||
|
|
||
|
errno = 0;
|
||
|
payload[length] = '\0'; // make sure we have the string terminator in place
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT Callback:[%s] [%s] [%d] on interface [%x]"), topic, (char *)payload, length, mqtt);
|
||
|
switch (payload[0])
|
||
|
{
|
||
|
case '<':
|
||
|
{
|
||
|
const char s[2] = "/"; // topic delimiter is /
|
||
|
char *token;
|
||
|
byte mqsocket;
|
||
|
|
||
|
/* get the first token = ClientID */
|
||
|
token = strtok(topic, s);
|
||
|
/* get the second token = topicID */
|
||
|
token = strtok(NULL, s);
|
||
|
if (token == NULL)
|
||
|
{
|
||
|
DIAG(F("MQTT Can't identify sender #1; command send on wrong topic"));
|
||
|
return;
|
||
|
// don't do anything as we wont know where to send the results
|
||
|
// normally the topicid shall be valid as we only have subscribed to that one and nothing else
|
||
|
// comes here; The only issue is when recieveing on the open csid channel ( which stays open in order to
|
||
|
// able to accept other connections )
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
auto topicid = atoi(token);
|
||
|
// verify that there is a MQTT client with that topic id connected
|
||
|
bool isClient = false;
|
||
|
// check in the array of clients if we have one with the topicid
|
||
|
// start at 1 as 0 is not allocated as mqsocket
|
||
|
for (int i = 1; i <= mqtt->getClientSize(); i++)
|
||
|
// for (int i = 1; i <= subscriberid; i++)
|
||
|
{
|
||
|
if (clients[i].topic == topicid)
|
||
|
{
|
||
|
isClient = true;
|
||
|
mqsocket = i;
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
if (!isClient)
|
||
|
{
|
||
|
// no such client connected
|
||
|
DIAG(F("MQTT Can't identify sender #2; command send on wrong topic"));
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
// if we make it until here we dont even need to test the last "cmd" element from the topic as there is no
|
||
|
// subscription for anything else
|
||
|
|
||
|
// DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid);
|
||
|
|
||
|
// Prepare the DCC-EX command
|
||
|
csmsg_t tm; // topic message
|
||
|
|
||
|
if (length >= MAXPAYLOAD)
|
||
|
{
|
||
|
DIAG(F("MQTT Command too long (> [%d] characters)"), MAXPAYLOAD);
|
||
|
}
|
||
|
memset(tm.cmd, 0, MAXPAYLOAD); // Clean up the cmd buffer - should not be necessary
|
||
|
strlcpy(tm.cmd, (char *)payload, length + 1); // Message payload
|
||
|
tm.mqsocket = mqsocket; // On which socket did we recieve the mq message
|
||
|
int idx = mqtt->getPool()->setItem(tm); // Add the recieved command to the pool
|
||
|
|
||
|
if (idx == -1)
|
||
|
{
|
||
|
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
mqtt->getIncomming()->push(idx); // Add the index of the pool item to the incomming queue
|
||
|
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd);
|
||
|
break;
|
||
|
}
|
||
|
case 'm':
|
||
|
{
|
||
|
switch (payload[1])
|
||
|
{
|
||
|
case 'i':
|
||
|
{
|
||
|
char buffer[MAXPAYLOAD];
|
||
|
char *tmp = (char *)payload + 3;
|
||
|
strlcpy(buffer, tmp, length);
|
||
|
buffer[length - 4] = '\0';
|
||
|
// DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length);
|
||
|
|
||
|
auto distantid = strtol(buffer, NULL, 10);
|
||
|
|
||
|
if (errno == ERANGE || distantid > UCHAR_MAX)
|
||
|
{
|
||
|
DIAG(F("MQTT Invalid Handshake ID; must be between 0 and 255"));
|
||
|
return;
|
||
|
}
|
||
|
if (distantid == 0)
|
||
|
{
|
||
|
DIAG(F("MQTT Invalid Handshake ID"));
|
||
|
return;
|
||
|
}
|
||
|
// ---------------------------
|
||
|
// Create a new MQTT client
|
||
|
// ---------------------------
|
||
|
|
||
|
// check in the clients if the distantid has been set already somewhere
|
||
|
// if so we either have a new one with the same id then we have a collision -> publish a collision
|
||
|
// or its the same i.e; the message comming back as we are subscribed -> stop here
|
||
|
|
||
|
// All is ok so set up the channel; MQTT Ctrl command
|
||
|
|
||
|
auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
|
||
|
|
||
|
if (subscriberid == 0)
|
||
|
{
|
||
|
DIAG(F("MQTT no more connections are available"));
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
auto topicid = cantorEncode((long)subscriberid, (long)distantid);
|
||
|
DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid);
|
||
|
// extract the number delivered from
|
||
|
// we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one
|
||
|
|
||
|
// initalize the new mqtt client object
|
||
|
clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available
|
||
|
|
||
|
auto sq = mqtt->getSubscriptionQueue();
|
||
|
sq->push(subscriberid);
|
||
|
|
||
|
return;
|
||
|
}
|
||
|
default:
|
||
|
{
|
||
|
// ignore
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
default:
|
||
|
{
|
||
|
// invalid command
|
||
|
DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload);
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID
|
||
|
*
|
||
|
* @param array array containing bytes
|
||
|
* @param len length of the array
|
||
|
* @param buffer buffer to which the string will be written; make sure the buffer has appropriate length
|
||
|
*/
|
||
|
static void array_to_string(byte array[], unsigned int len, char buffer[])
|
||
|
{
|
||
|
for (unsigned int i = 0; i < len; i++)
|
||
|
{
|
||
|
byte nib1 = (array[i] >> 4) & 0x0F;
|
||
|
byte nib2 = (array[i] >> 0) & 0x0F;
|
||
|
buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA;
|
||
|
buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA;
|
||
|
}
|
||
|
buffer[len * 2] = '\0';
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief Connect to the MQTT broker; Parameters for this function are defined in
|
||
|
* like the motoshield configurations there are mqtt broker configurations in config.h
|
||
|
*
|
||
|
* @param id Name provided to the broker configuration
|
||
|
* @param b MQTT broker object containing the main configuration parameters
|
||
|
*/
|
||
|
void MQTTInterface::setup(const FSH *id, MQTTBroker *b)
|
||
|
{
|
||
|
|
||
|
//Create the MQTT environment and establish inital connection to the Broker
|
||
|
broker = b;
|
||
|
|
||
|
DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port);
|
||
|
|
||
|
// initalize MQ Broker
|
||
|
mqttClient = new PubSubClient(broker->ip, broker->port, mqttCallback, ethClient);
|
||
|
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT Client created ok..."));
|
||
|
array_to_string(mac, CLIENTIDSIZE, clientID);
|
||
|
DIAG(F("MQTT Client ID : %s"), clientID);
|
||
|
connect(); // inital connection as well as reconnects
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief MQTT broker connection / reconnection
|
||
|
*
|
||
|
*/
|
||
|
void MQTTInterface::connect()
|
||
|
{
|
||
|
|
||
|
int reconnectCount = 0;
|
||
|
connectID[0] = '\0';
|
||
|
|
||
|
// Build the connect ID : Prefix + clientID
|
||
|
if (broker->prefix != nullptr)
|
||
|
{
|
||
|
strcpy_P(connectID, (const char *)broker->prefix);
|
||
|
}
|
||
|
strcat(connectID, clientID);
|
||
|
|
||
|
// Connect to the broker
|
||
|
DIAG(F("MQTT %s (re)connecting ..."), connectID);
|
||
|
while (!mqttClient->connected() && reconnectCount < MAXRECONNECT)
|
||
|
{
|
||
|
switch (broker->cType)
|
||
|
{
|
||
|
// no uid no pwd
|
||
|
case 6:
|
||
|
case 1:
|
||
|
{ // port(p), ip(i), domain(d),
|
||
|
if (mqttClient->connect(connectID))
|
||
|
{
|
||
|
DIAG(F("MQTT Broker connected ..."));
|
||
|
auto sub = subscribe(clientID); // set up the main subscription on which we will recieve the intal mi message from a subscriber
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
|
||
|
mqState = CONNECTED;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient->state());
|
||
|
mqState = CONNECTION_FAILED;
|
||
|
reconnectCount++;
|
||
|
}
|
||
|
break;
|
||
|
}
|
||
|
// with uid passwd
|
||
|
case 5:
|
||
|
case 2:
|
||
|
{ // port(p), ip(i), domain(d), user(uid), pwd(pass),
|
||
|
break;
|
||
|
}
|
||
|
// with uid, passwd & prefix
|
||
|
case 4:
|
||
|
case 3:
|
||
|
{ // port(p), ip(i), domain(d), user(uid), pwd(pass), prefix(pfix)
|
||
|
// port(p), domain(d), user(uid), pwd(pass), prefix(pfix)
|
||
|
// mqttClient.connect(connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0))
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
if (reconnectCount == MAXRECONNECT)
|
||
|
{
|
||
|
DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT);
|
||
|
mqState = CONNECTION_FAILED;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief for the time being only one topic at the root
|
||
|
* which is the unique clientID from the MCU
|
||
|
* QoS is 0 by default
|
||
|
*
|
||
|
* @param topic to subsribe to
|
||
|
* @return boolean true if successful false otherwise
|
||
|
*/
|
||
|
boolean MQTTInterface::subscribe(const char *topic)
|
||
|
{
|
||
|
auto res = mqttClient->subscribe(topic);
|
||
|
return res;
|
||
|
}
|
||
|
|
||
|
void MQTTInterface::publish(const char *topic, const char *payload)
|
||
|
{
|
||
|
mqttClient->publish(topic, payload);
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief Connect the Ethernet network;
|
||
|
*
|
||
|
* @return true if connections was successful
|
||
|
*/
|
||
|
bool MQTTInterface::setupNetwork()
|
||
|
{
|
||
|
|
||
|
// setup Ethernet connection first
|
||
|
|
||
|
DCCTimer::getSimulatedMacAddress(mac);
|
||
|
|
||
|
#ifdef IP_ADDRESS
|
||
|
Ethernet.begin(mac, IP_ADDRESS);
|
||
|
#else
|
||
|
if (Ethernet.begin(mac) == 0)
|
||
|
{
|
||
|
DIAG(F("Ethernet.begin FAILED"));
|
||
|
return false;
|
||
|
}
|
||
|
#endif
|
||
|
DIAG(F("Ethernet.begin OK."));
|
||
|
if (Ethernet.hardwareStatus() == EthernetNoHardware)
|
||
|
{
|
||
|
DIAG(F("Ethernet shield not found"));
|
||
|
return false;
|
||
|
}
|
||
|
if (Ethernet.linkStatus() == LinkOFF)
|
||
|
{
|
||
|
DIAG(F("Ethernet cable not connected"));
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address
|
||
|
DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]);
|
||
|
DIAG(F("Port:%d"), IP_PORT);
|
||
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief handle the incomming queue in the loop
|
||
|
*
|
||
|
*/
|
||
|
void inLoop(Queue<int> &in, ObjectPool<csmsg_t, MAXPOOLSIZE> &pool, RingStream *outboundRing)
|
||
|
{
|
||
|
|
||
|
bool state;
|
||
|
if (in.count() > 0)
|
||
|
{
|
||
|
// pop a command index from the incomming queue and get the command from the pool
|
||
|
int idx = in.pop();
|
||
|
csmsg_t *c = pool.getItem(idx, &state);
|
||
|
|
||
|
// execute the command and collect results
|
||
|
outboundRing->mark((uint8_t)c->mqsocket);
|
||
|
CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing);
|
||
|
outboundRing->commit();
|
||
|
|
||
|
// free the slot in the command pool
|
||
|
pool.returnItem(idx);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief handle the outgoing messages in the loop
|
||
|
*
|
||
|
*/
|
||
|
void outLoop(PubSubClient *mq)
|
||
|
{
|
||
|
// handle at most 1 outbound transmission
|
||
|
MQTTInterface *mqtt = MQTTInterface::get();
|
||
|
|
||
|
auto clients = mqtt->getClients();
|
||
|
auto outboundRing = mqtt->getRingStream();
|
||
|
|
||
|
int mqSocket = outboundRing->read();
|
||
|
if (mqSocket >= 0) // mqsocket / clientid can't be 0 ....
|
||
|
{
|
||
|
int count = outboundRing->count();
|
||
|
char buffer[MAXTSTR];
|
||
|
buffer[0] = '\0';
|
||
|
sprintf(buffer, "%s/%d/result", mqtt->getClientID(), (int)clients[mqSocket].topic);
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT publish to mqSocket=%d, count=:%d on topic %s"), mqSocket, count, buffer);
|
||
|
|
||
|
if (mq->beginPublish(buffer, count, false))
|
||
|
{
|
||
|
for (; count > 0; count--)
|
||
|
{
|
||
|
mq->write(outboundRing->read());
|
||
|
}
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
DIAG(F("MQTT error start publishing result)"));
|
||
|
};
|
||
|
|
||
|
if (!mq->endPublish())
|
||
|
{
|
||
|
DIAG(F("MQTT error finalizing published result)"));
|
||
|
};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* @brief check if there are new subscribers connected and create the channels
|
||
|
*
|
||
|
* @param sq if the callback captured a client there will be an entry in the sq with the subscriber number
|
||
|
* @param clients the clients array where we find the info to setup the subsciptions and print out the publish topics for info
|
||
|
*/
|
||
|
void checkSubscribers(Queue<int> &sq, csmqttclient_t *clients)
|
||
|
{
|
||
|
MQTTInterface *mqtt = MQTTInterface::get();
|
||
|
if (sq.count() > 0)
|
||
|
{
|
||
|
// new subscriber
|
||
|
auto s = sq.pop();
|
||
|
|
||
|
char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTSTR];
|
||
|
sprintf(tbuffer, "%s/%ld/cmd", mqtt->getClientID(), clients[s].topic);
|
||
|
|
||
|
auto ok = mqtt->subscribe(tbuffer);
|
||
|
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
|
||
|
|
||
|
// send the topicid on which the CS will listen for commands to the MQTT client on the root topic
|
||
|
char buffer[30];
|
||
|
memset(buffer, 0, 30);
|
||
|
sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic);
|
||
|
if (Diag::MQTT)
|
||
|
DIAG(F("MQTT Publishing: [%s] to [%s]"), buffer, mqtt->getClientID());
|
||
|
mqtt->publish(mqtt->getClientID(), buffer);
|
||
|
|
||
|
// on the cs side all is set and we declare that the cs is open for business
|
||
|
clients[s].open = true;
|
||
|
|
||
|
// we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there
|
||
|
// in the < case we should test that we got the command on the right topic ...
|
||
|
DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer);
|
||
|
memset(buffer, 0, 30);
|
||
|
sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), clients[s].topic);
|
||
|
DIAG(F("MQTT CS is publishing return information to [%s]"), buffer);
|
||
|
memset(buffer, 0, 30);
|
||
|
sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), clients[s].topic);
|
||
|
DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void MQTTInterface::loop()
|
||
|
{
|
||
|
|
||
|
if (!singleton)
|
||
|
return;
|
||
|
singleton->loop2();
|
||
|
}
|
||
|
|
||
|
void MQTTInterface::loop2()
|
||
|
{
|
||
|
// Connection impossible so just don't do anything
|
||
|
if (singleton->mqState == CONNECTION_FAILED)
|
||
|
{
|
||
|
DIAG(F("MQTT connection failed..."));
|
||
|
return;
|
||
|
}
|
||
|
if (!mqttClient->connected())
|
||
|
{
|
||
|
DIAG(F("MQTT no connection trying to reconnect ..."));
|
||
|
connect();
|
||
|
}
|
||
|
if (!mqttClient->loop())
|
||
|
{
|
||
|
DIAG(F("mqttClient returned with error; state: %d"), mqttClient->state());
|
||
|
return;
|
||
|
};
|
||
|
|
||
|
checkSubscribers(subscriberQueue, clients);
|
||
|
inLoop(in, pool, outboundRing);
|
||
|
outLoop(mqttClient);
|
||
|
}
|