mirror of
https://github.com/DCC-EX/CommandStation-EX.git
synced 2024-11-27 01:56:14 +01:00
MQ and Ethenet support are independent
This commit is contained in:
parent
598fb116a1
commit
8f2f052e2a
85
DccMQTT.cpp
85
DccMQTT.cpp
|
@ -117,7 +117,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||||
errno = 0;
|
errno = 0;
|
||||||
payload[length] = '\0'; // make sure we have the string terminator in place
|
payload[length] = '\0'; // make sure we have the string terminator in place
|
||||||
|
|
||||||
// DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length);
|
DIAG(F("MQTT Callback:[%s] [%s] [%d]"), topic, (char *)payload, length);
|
||||||
|
|
||||||
switch (payload[0])
|
switch (payload[0])
|
||||||
{
|
{
|
||||||
|
@ -175,8 +175,9 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||||
|
|
||||||
csmsg_t tm; // topic message
|
csmsg_t tm; // topic message
|
||||||
|
|
||||||
if ( length + 1 > MAXPAYLOAD) {
|
if (length + 1 > MAXPAYLOAD)
|
||||||
DIAG(F("MQTT Command too long (> %d characters)"),MAXPAYLOAD);
|
{
|
||||||
|
DIAG(F("MQTT Command too long (> %d characters)"), MAXPAYLOAD);
|
||||||
}
|
}
|
||||||
strlcpy(tm.cmd, (char *)payload, length + 1); // message payload
|
strlcpy(tm.cmd, (char *)payload, length + 1); // message payload
|
||||||
tm.mqsocket = mqsocket; // on which socket did we recieve the mq message
|
tm.mqsocket = mqsocket; // on which socket did we recieve the mq message
|
||||||
|
@ -200,8 +201,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||||
{
|
{
|
||||||
case 'i':
|
case 'i':
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
char buffer[30];
|
char buffer[30];
|
||||||
char *tmp = (char *)payload + 3;
|
char *tmp = (char *)payload + 3;
|
||||||
strlcpy(buffer, tmp, length);
|
strlcpy(buffer, tmp, length);
|
||||||
|
@ -301,11 +301,11 @@ void DccMQTT::connect()
|
||||||
{
|
{
|
||||||
|
|
||||||
int reconnectCount = 0;
|
int reconnectCount = 0;
|
||||||
|
connectID[0] = '\0';
|
||||||
|
|
||||||
// Build the connect ID : Prefix + clientID
|
// Build the connect ID : Prefix + clientID
|
||||||
if (broker->prefix != nullptr)
|
if (broker->prefix != nullptr)
|
||||||
{
|
{
|
||||||
connectID[0] = '\0';
|
|
||||||
strcpy_P(connectID, (const char *)broker->prefix);
|
strcpy_P(connectID, (const char *)broker->prefix);
|
||||||
}
|
}
|
||||||
strcat(connectID, clientID);
|
strcat(connectID, clientID);
|
||||||
|
@ -324,6 +324,7 @@ void DccMQTT::connect()
|
||||||
if (mqttClient.connect(connectID))
|
if (mqttClient.connect(connectID))
|
||||||
{
|
{
|
||||||
DIAG(F("MQTT Broker connected ..."));
|
DIAG(F("MQTT Broker connected ..."));
|
||||||
|
mqState = CONNECTED;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -350,6 +351,7 @@ void DccMQTT::connect()
|
||||||
if (reconnectCount == MAXRECONNECT)
|
if (reconnectCount == MAXRECONNECT)
|
||||||
{
|
{
|
||||||
DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT);
|
DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT);
|
||||||
|
mqState = CONNECTION_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -373,6 +375,37 @@ void DccMQTT::publish(char *topic, char *payload)
|
||||||
*/
|
*/
|
||||||
void DccMQTT::setup()
|
void DccMQTT::setup()
|
||||||
{
|
{
|
||||||
|
// setup Ethnet connection first
|
||||||
|
byte mac[6];
|
||||||
|
DCCTimer::getSimulatedMacAddress(mac);
|
||||||
|
|
||||||
|
#ifdef IP_ADDRESS
|
||||||
|
Ethernet.begin(mac, IP_ADDRESS);
|
||||||
|
#else
|
||||||
|
if (Ethernet.begin(mac) == 0)
|
||||||
|
{
|
||||||
|
DIAG(F("Ethernet.begin FAILED"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
DIAG(F("Ethernet.begin OK."));
|
||||||
|
if (Ethernet.hardwareStatus() == EthernetNoHardware)
|
||||||
|
{
|
||||||
|
DIAG(F("Ethernet shield not found"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (Ethernet.linkStatus() == LinkOFF)
|
||||||
|
{
|
||||||
|
DIAG(F("Ethernet cable not connected"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address
|
||||||
|
|
||||||
|
DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]);
|
||||||
|
DIAG(F("Port:%d"), IP_PORT);
|
||||||
|
|
||||||
|
// setup the MQBroker
|
||||||
setup(CSMQTTBROKER);
|
setup(CSMQTTBROKER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,26 +420,21 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b)
|
||||||
|
|
||||||
//Create the MQTT environment and establish inital connection to the Broker
|
//Create the MQTT environment and establish inital connection to the Broker
|
||||||
broker = b;
|
broker = b;
|
||||||
// get a eth client session
|
|
||||||
|
|
||||||
DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port);
|
DIAG(F("MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port);
|
||||||
ethClient = EthernetInterface::get()->getServer()->accept(); // get a cleint from the EthernetInterface
|
|
||||||
|
|
||||||
// initalize MQ Broker
|
|
||||||
mqttClient = PubSubClient(ethClient);
|
|
||||||
mqttClient.setServer(broker->ip, broker->port);
|
|
||||||
|
|
||||||
DIAG(F("MQTT Client : Server ok ...%x/%x"), ethClient, mqttClient);
|
|
||||||
|
|
||||||
mqttClient.setCallback(mqttCallback); // Initalize callback function for incomming messages
|
|
||||||
|
|
||||||
DIAG(F("MQTT Client : Callback set ..."));
|
|
||||||
|
|
||||||
byte mqbid[CLIENTIDSIZE] = {0};
|
byte mqbid[CLIENTIDSIZE] = {0};
|
||||||
DCCTimer::getSimulatedMacAddress(mqbid);
|
DCCTimer::getSimulatedMacAddress(mqbid);
|
||||||
|
|
||||||
|
// initalize MQ Broker
|
||||||
|
|
||||||
|
mqttClient = PubSubClient(broker->ip, broker->port, mqttCallback, ethClient);
|
||||||
|
DIAG(F("MQTT Client created ok..."));
|
||||||
|
|
||||||
array_to_string(mqbid, CLIENTIDSIZE, clientID);
|
array_to_string(mqbid, CLIENTIDSIZE, clientID);
|
||||||
DIAG(F("MQTT Client ID : %s"), clientID);
|
DIAG(F("MQTT Client ID : %s"), clientID);
|
||||||
|
|
||||||
connect(); // inital connection as well as reconnects
|
connect(); // inital connection as well as reconnects
|
||||||
auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions
|
auto sub = DccMQTT::subscribe(clientID); // set up all subscriptions
|
||||||
|
|
||||||
|
@ -416,7 +444,11 @@ void DccMQTT::setup(const FSH *id, MQTTBroker *b)
|
||||||
|
|
||||||
void DccMQTT::loop()
|
void DccMQTT::loop()
|
||||||
{
|
{
|
||||||
|
// Connection impossible so just don't do anything
|
||||||
|
if (mqState == CONNECTION_FAILED)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (!mqttClient.connected())
|
if (!mqttClient.connected())
|
||||||
{
|
{
|
||||||
connect();
|
connect();
|
||||||
|
@ -426,26 +458,26 @@ void DccMQTT::loop()
|
||||||
DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state());
|
DIAG(F("mqttClient returned with error; state: %d"), mqttClient.state());
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
// read incomming queue for processing; one per loop
|
// read incomming queue for processing; one per loop
|
||||||
bool state;
|
bool state;
|
||||||
|
DIAG(F("in.count: %d"), in.count());
|
||||||
if (in.count() > 0)
|
if (in.count() > 0)
|
||||||
{
|
{
|
||||||
auto idx = in.peek();
|
auto idx = in.peek();
|
||||||
auto c = pool.getItem(in.pop(), &state);
|
auto c = pool.getItem(in.pop(), &state);
|
||||||
DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket);
|
DIAG(F("MQTT Processing pool: %d with command: %s from client %d"), idx, c->cmd, c->mqsocket);
|
||||||
DIAG(F("Ring free space1: %d"),outboundRing->freeSpace());
|
DIAG(F("Ring free space1: %d"), outboundRing->freeSpace());
|
||||||
outboundRing->mark((uint8_t)c->mqsocket);
|
outboundRing->mark((uint8_t)c->mqsocket);
|
||||||
// CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing);
|
CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing);
|
||||||
StringFormatter::send(outboundRing, F("Test result message"));
|
// StringFormatter::send(outboundRing, F("Test result message"));
|
||||||
outboundRing->commit();
|
outboundRing->commit();
|
||||||
DIAG(F("Ring free space2: %d"),outboundRing->freeSpace());
|
DIAG(F("Ring free space2: %d"), outboundRing->freeSpace());
|
||||||
pool.returnItem(idx);
|
pool.returnItem(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// handle at most 1 outbound transmission
|
// handle at most 1 outbound transmission
|
||||||
int socketOut = outboundRing->read();
|
int socketOut = outboundRing->read();
|
||||||
|
DIAG(F("socketOut: %d"), socketOut);
|
||||||
if (socketOut > 0) // mqsocket / clientid can't be 0 ....
|
if (socketOut > 0) // mqsocket / clientid can't be 0 ....
|
||||||
{
|
{
|
||||||
int count = outboundRing->count();
|
int count = outboundRing->count();
|
||||||
|
@ -454,7 +486,7 @@ void DccMQTT::loop()
|
||||||
DIAG(F("MQTT publish to mqsocket=%d, count=:%d on topic %s"), socketOut, count, buffer);
|
DIAG(F("MQTT publish to mqsocket=%d, count=:%d on topic %s"), socketOut, count, buffer);
|
||||||
// construct the payload
|
// construct the payload
|
||||||
char payload[count];
|
char payload[count];
|
||||||
payload[count]='\0';
|
payload[count] = '\0';
|
||||||
char *tmp = payload;
|
char *tmp = payload;
|
||||||
for (; count > 0; count--)
|
for (; count > 0; count--)
|
||||||
{
|
{
|
||||||
|
@ -465,5 +497,4 @@ void DccMQTT::loop()
|
||||||
DIAG(F("MQTT publish with payload:\n%s"), payload);
|
DIAG(F("MQTT publish with payload:\n%s"), payload);
|
||||||
// mqtt->publish(buffer, payload);
|
// mqtt->publish(buffer, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user