mirror of
https://github.com/DCC-EX/CommandStation-EX.git
synced 2025-02-22 00:36:04 +01:00
Minor updates/cleanup
This commit is contained in:
parent
c3abb0018d
commit
428628f6f0
@ -74,7 +74,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
DIAG(F("MQTT Callback:[%s] [%s] [%d] on interface [%x]"), topic, (char *)payload, length, mqtt);
|
DIAG(F("MQTT Callback:[%s] [%s] [%d] on interface [%x]"), topic, (char *)payload, length, mqtt);
|
||||||
switch (payload[0])
|
switch (payload[0])
|
||||||
{
|
{
|
||||||
case '<':
|
case '<': // Recieved a DCC-EX Command
|
||||||
{
|
{
|
||||||
const char s[2] = "/"; // topic delimiter is /
|
const char s[2] = "/"; // topic delimiter is /
|
||||||
char *token;
|
char *token;
|
||||||
@ -120,8 +120,6 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
// if we make it until here we dont even need to test the last "cmd" element from the topic as there is no
|
// if we make it until here we dont even need to test the last "cmd" element from the topic as there is no
|
||||||
// subscription for anything else
|
// subscription for anything else
|
||||||
|
|
||||||
// DIAG(F("MQTT Message arrived on [%s]: [%d]"), buf, topicid);
|
|
||||||
|
|
||||||
// Prepare the DCC-EX command
|
// Prepare the DCC-EX command
|
||||||
csmsg_t tm; // topic message
|
csmsg_t tm; // topic message
|
||||||
|
|
||||||
@ -146,16 +144,17 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd);
|
DIAG(F("MQTT Message arrived [%s]: [%s]"), topic, tm.cmd);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'm':
|
case 'm': // Recieved an MQTT Connection management message
|
||||||
{
|
{
|
||||||
switch (payload[1])
|
switch (payload[1])
|
||||||
{
|
{
|
||||||
case 'i':
|
case 'i': // Inital handshake message to create the tunnel
|
||||||
{
|
{
|
||||||
char buffer[MAXPAYLOAD];
|
char buffer[MAXPAYLOAD];
|
||||||
char *tmp = (char *)payload + 3;
|
char *tmp = (char *)payload + 3;
|
||||||
strlcpy(buffer, tmp, length);
|
strlcpy(buffer, tmp, length);
|
||||||
buffer[length - 4] = '\0';
|
buffer[length - 4] = '\0';
|
||||||
|
|
||||||
// DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length);
|
// DIAG(F("MQTT buffer %s - %s - %s - %d"), payload, tmp, buffer, length);
|
||||||
|
|
||||||
auto distantid = strtol(buffer, NULL, 10);
|
auto distantid = strtol(buffer, NULL, 10);
|
||||||
@ -170,15 +169,8 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
DIAG(F("MQTT Invalid Handshake ID"));
|
DIAG(F("MQTT Invalid Handshake ID"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// ---------------------------
|
|
||||||
// Create a new MQTT client
|
// Create a new MQTT client
|
||||||
// ---------------------------
|
|
||||||
|
|
||||||
// check in the clients if the distantid has been set already somewhere
|
|
||||||
// if so we either have a new one with the same id then we have a collision -> publish a collision
|
|
||||||
// or its the same i.e; the message comming back as we are subscribed -> stop here
|
|
||||||
|
|
||||||
// All is ok so set up the channel; MQTT Ctrl command
|
|
||||||
|
|
||||||
auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
|
auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
|
||||||
|
|
||||||
@ -190,10 +182,8 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
|
|
||||||
auto topicid = cantorEncode((long)subscriberid, (long)distantid);
|
auto topicid = cantorEncode((long)subscriberid, (long)distantid);
|
||||||
DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid);
|
DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid);
|
||||||
// extract the number delivered from
|
|
||||||
// we need to check if the id we got from the client has been used allready and if yes reject and ask for a different one
|
|
||||||
|
|
||||||
// initalize the new mqtt client object
|
// extract the number delivered from & initalize the new mqtt client object
|
||||||
clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available
|
clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available
|
||||||
|
|
||||||
auto sq = mqtt->getSubscriptionQueue();
|
auto sq = mqtt->getSubscriptionQueue();
|
||||||
@ -201,16 +191,15 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
|||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
default:
|
default: // Invalid message
|
||||||
{
|
{
|
||||||
// ignore
|
// ignore
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default: // invalid command / message
|
||||||
{
|
{
|
||||||
// invalid command
|
|
||||||
DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload);
|
DIAG(F("MQTT Invalid DCC-EX command: %s"), (char *)payload);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -473,26 +462,33 @@ void checkSubscribers(Queue<int> &sq, csmqttclient_t *clients)
|
|||||||
if (Diag::MQTT)
|
if (Diag::MQTT)
|
||||||
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
|
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
|
||||||
|
|
||||||
// send the topicid on which the CS will listen for commands to the MQTT client on the root topic
|
// send the topic on which the CS will listen for commands and the ones on which it will publish for the connecting
|
||||||
char buffer[30];
|
// client to pickup. Once the connecting client has setup other topic setup messages on the main channel shall be
|
||||||
memset(buffer, 0, 30);
|
// ignored
|
||||||
sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic);
|
// JSON message { init: <number> channels: {result: <string>, diag: <string> }}
|
||||||
|
|
||||||
|
char buffer[MAXPAYLOAD*2];
|
||||||
|
memset(buffer, 0, MAXPAYLOAD*2);
|
||||||
|
|
||||||
|
// sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic);
|
||||||
|
|
||||||
|
sprintf(buffer, "{ \"init\": %d, \"subscribeto\": {\"result\": \"%s/%ld/result\" , \"diag\": \"%s/%ld/diag\" }, \"publishto\": {\"cmd\": \"%s/%ld/cmd\" } }",
|
||||||
|
(int)clients[s].distant,
|
||||||
|
mqtt->getClientID(),
|
||||||
|
clients[s].topic,
|
||||||
|
mqtt->getClientID(),
|
||||||
|
clients[s].topic,
|
||||||
|
mqtt->getClientID(),
|
||||||
|
clients[s].topic
|
||||||
|
);
|
||||||
|
|
||||||
if (Diag::MQTT)
|
if (Diag::MQTT)
|
||||||
DIAG(F("MQTT Publishing: [%s] to [%s]"), buffer, mqtt->getClientID());
|
DIAG(F("MQTT channel setup message: [%s]"), buffer);
|
||||||
|
|
||||||
mqtt->publish(mqtt->getClientID(), buffer);
|
mqtt->publish(mqtt->getClientID(), buffer);
|
||||||
|
|
||||||
// on the cs side all is set and we declare that the cs is open for business
|
// on the cs side all is set and we declare that the cs is open for business
|
||||||
clients[s].open = true;
|
clients[s].open = true;
|
||||||
|
|
||||||
// we now need to subscribe to the ../clientid/topicid/cmd topic as we shall recieve the cmds from there
|
|
||||||
// in the < case we should test that we got the command on the right topic ...
|
|
||||||
DIAG(F("MQTT CS is listening for commands on [%s]"), tbuffer);
|
|
||||||
memset(buffer, 0, 30);
|
|
||||||
sprintf(buffer, "%s/%ld/result", mqtt->getClientID(), clients[s].topic);
|
|
||||||
DIAG(F("MQTT CS is publishing return information to [%s]"), buffer);
|
|
||||||
memset(buffer, 0, 30);
|
|
||||||
sprintf(buffer, "%s/%ld/diag", mqtt->getClientID(), clients[s].topic);
|
|
||||||
DIAG(F("MQTT CS is publishing diagnostic information to [%s]"), buffer);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user