1
0
mirror of https://github.com/DCC-EX/CommandStation-EX.git synced 2025-07-28 18:03:45 +02:00

Compare commits

...

48 Commits

Author SHA1 Message Date
Gregor Baues
e721303ed1 Cleanup diags 2021-06-09 08:58:27 +02:00
Gregor Baues
e742a8b120 Merge branch 'RefactorCallbacks' into MQTT 2021-06-09 08:53:07 +02:00
Gregor Baues
1b687808cb Refactored 2021-06-09 08:49:10 +02:00
Gregor Baues
21e19128f9 Added ifndef to the <s> command not showing T/S/O 2021-06-07 09:42:46 +02:00
Gregor Baues
c30f1c7cbe freememory() available more generally
fixed some mem diags
2021-06-04 13:46:21 +02:00
Gregor Baues
083c73ebc3 Wait for ENC based chips
Memory measurements (temp)
2021-06-04 11:09:22 +02:00
Gregor Baues
9fcc69d273 Diag output over MQTT added 2021-06-03 21:42:14 +02:00
Gregor Baues
e51279b202 loopPing added
check for returning diag messages
on the main channel
2021-06-02 20:41:48 +02:00
Gregor Baues
3c89d713fd added StringLogger 2021-06-02 20:21:30 +02:00
Gregor Baues
dc9368665d fixed includes 2021-05-25 10:50:02 +02:00
Gregor Baues
6b6ae4e904 disabled WIFI andEthernet enabled MQTT 2021-05-20 16:31:13 +02:00
Gregor Baues
a52e864a6f deleted unnecessary files 2021-05-20 16:27:01 +02:00
Gregor Baues
d72636b4ec Minor fix removing misleading message 2021-05-20 15:40:04 +02:00
Gregor Baues
a33b463c74 updated config example
check for MQTT / Ethernet setup
2021-05-19 17:21:27 +02:00
Gregor Baues
41ed2aeeeb Merge branch 'master' into MQTT 2021-05-19 15:49:14 +02:00
Gregor Baues
a58bc63764 Merge branch 'master' of https://github.com/DCC-EX/CommandStation-EX 2021-05-19 15:38:05 +02:00
Gregor Baues
b59def8e7b V1.0-alpha 2021-05-19 15:37:39 +02:00
Gregor Baues
be5ca00b7e Fixed constructor 2021-05-19 15:33:08 +02:00
Gregor Baues
b5520f13ba Broker handling / connection updated 2021-05-19 15:31:30 +02:00
Gregor Baues
428628f6f0 Minor updates/cleanup 2021-05-19 11:37:20 +02:00
Gregor Baues
c3abb0018d Working inital version 2021-05-19 09:49:18 +02:00
Gregor Baues
981453d399 State and define.h changed 2021-05-14 10:53:52 +02:00
Gregor Baues
8f2f052e2a MQ and Ethenet support are independent 2021-05-14 10:53:17 +02:00
Gregor Baues
598fb116a1 still with Gremlin 2021-05-12 22:09:07 +02:00
Gregor Baues
ce154abe94 still with Gremlin 2021-05-12 22:08:46 +02:00
Gregor Baues
6fd866d273 ok with gremlin after 3/4 recieves 2021-05-12 21:30:15 +02:00
Gregor Baues
35d81cd848 working with multiple clients and
the ringstream for processing
2021-05-12 09:23:48 +02:00
Gregor Baues
508b1fcfce subscriber topics ok 2021-05-07 09:08:09 +02:00
Gregor Baues
0b0744cc94 channel setup ok
channel subscrition not ok yet
(maye be bc its done during the
callback)
2021-05-06 13:06:16 +02:00
Gregor Baues
595b6bad93 start adding MQTT client channel implementation 2021-05-05 11:23:59 +02:00
Gregor Baues
c042240019 Added ObjectPool 2021-05-04 10:35:07 +02:00
Gregor Baues
866833a19e update #2 2021-05-03 09:18:53 +02:00
Gregor Baues
851228fba6 update #1 2021-05-03 09:05:05 +02:00
Gregor Baues
6bd9e28be4 MQTT firt send/recv ok 2021-05-01 12:38:27 +02:00
Gregor Baues
da85e4e245 update 2021-04-27 17:01:21 +02:00
Gregor Baues
a6a36b50e3 Broker definition reconfig 2021-04-27 17:01:03 +02:00
Gregor Baues
4efa260003 MQ Init 2021-04-27 15:03:15 +02:00
Gregor Baues
7442e3452e update 2021-04-27 14:20:49 +02:00
Gregor Baues
caaad92887 update 2021-04-27 11:44:22 +02:00
Gregor Baues
c740e25cc1 update 2021-04-27 11:44:09 +02:00
Gregor Baues
8acfdc6190 update 2021-04-27 10:58:39 +02:00
Gregor Baues
f48965a676 updated 2021-04-27 10:58:12 +02:00
Gregor Baues
afc01f1967 More inital setup for MQTT 2021-04-27 09:56:16 +02:00
Gregor Baues
8fc1470e4e Conditional compile for MQTT 2021-04-25 21:59:39 +02:00
Gregor Baues
33f2474c20 Merge branch 'master' of https://github.com/DCC-EX/CommandStation-EX 2021-04-23 12:18:55 +02:00
Gregor Baues
35c5e875d3 addes v1 of the patch sh script 2021-04-16 21:24:16 +02:00
Gregor Baues
6166484783 update driver for my shield 2021-04-16 15:27:49 +02:00
Gregor Baues
4ab21294ce enable eth & wifi at the same time 2021-04-14 10:23:32 +02:00
27 changed files with 1723 additions and 94 deletions

View File

@@ -20,12 +20,20 @@
#include "CommandDistributor.h"
#include "WiThrottle.h"
DCCEXParser * CommandDistributor::parser=0;
DCCEXParser *CommandDistributor::parser = 0;
void CommandDistributor::parse(byte clientId,byte * buffer, RingStream * streamer) {
if (buffer[0] == '<') {
if (!parser) parser = new DCCEXParser();
parser->parse(streamer, buffer, streamer);
void CommandDistributor::parse(byte clientId, byte *buffer, RingStream *streamer)
{
if (buffer[0] == '<')
{
if (!parser)
{
parser = new DCCEXParser();
}
parser->parse(streamer, buffer, streamer);
}
else
{
WiThrottle::getThrottle(clientId)->parse(streamer, buffer);
}
else WiThrottle::getThrottle(clientId)->parse(streamer, buffer);
}

View File

@@ -1,29 +1,28 @@
////////////////////////////////////////////////////////////////////////////////////
// DCC-EX CommandStation-EX Please see https://DCC-EX.com
// DCC-EX CommandStation-EX Please see https://DCC-EX.com
//
// This file is the main sketch for the Command Station.
//
// CONFIGURATION:
//
// CONFIGURATION:
// Configuration is normally performed by editing a file called config.h.
// This file is NOT shipped with the code so that if you pull a later version
// of the code, your configuration will not be overwritten.
//
// If you used the automatic installer program, config.h will have been created automatically.
//
// To obtain a starting copy of config.h please copy the file config.example.h which is
// shipped with the code and may be updated as new features are added.
//
//
// To obtain a starting copy of config.h please copy the file config.example.h which is
// shipped with the code and may be updated as new features are added.
//
// If config.h is not found, config.example.h will be used with all defaults.
////////////////////////////////////////////////////////////////////////////////////
#if __has_include ( "config.h")
#include "config.h"
#if __has_include("config.h")
#include "config.h"
#else
#warning config.h not found. Using defaults from config.example.h
#include "config.example.h"
#warning config.h not found. Using defaults from config.example.h
#include "config.example.h"
#endif
/*
* © 2020,2021 Chris Harlow, Harald Barth, David Cutting,
* Fred Decker, Gregor Baues, Anthony W - Dayton All rights reserved.
@@ -43,10 +42,9 @@
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*/
#include "DCCEX.h"
// Create a serial command parser for the USB connection,
// Create a serial command parser for the USB connection,
// This supports JMRI or manual diagnostics and commands
// to be issued from the USB serial console.
DCCEXParser serialParser;
@@ -58,14 +56,15 @@ void setup()
// Responsibility 1: Start the usb connection for diagnostics
// This is normally Serial but uses SerialUSB on a SAMD processor
Serial.begin(115200);
CONDITIONAL_LCD_START {
// This block is still executed for DIAGS if LCD not in use
LCD(0,F("DCC++ EX v%S"),F(VERSION));
LCD(1,F("Starting"));
}
// Start the WiFi interface on a MEGA, Uno cannot currently handle WiFi
CONDITIONAL_LCD_START
{
// This block is still executed for DIAGS if LCD not in use
LCD(0, F("DCC++ EX v%S"), F(VERSION));
LCD(1, F("Starting"));
}
// Start the WiFi interface on a MEGA, Uno cannot currently handle WiFi
#if WIFI_ON
WifiInterface::setup(WIFI_SERIAL_LINK_SPEED, F(WIFI_SSID), F(WIFI_PASSWORD), F(WIFI_HOSTNAME), IP_PORT, WIFI_CHANNEL);
@@ -75,6 +74,10 @@ void setup()
EthernetInterface::setup();
#endif // ETHERNET_ON
#if MQTT_ON
MQTTInterface::setup();
#endif
// Responsibility 3: Start the DCC engine.
// Note: this provides DCC with two motor drivers, main and prog, which handle the motor shield(s)
// Standard supported devices have pre-configured macros but custome hardware installations require
@@ -82,25 +85,26 @@ void setup()
// STANDARD_MOTOR_SHIELD, POLOLU_MOTOR_SHIELD, FIREBOX_MK1, FIREBOX_MK1S are pre defined in MotorShields.h
DCC::begin(MOTOR_SHIELD_TYPE);
#if defined(RMFT_ACTIVE)
RMFT::begin();
#endif
DCC::begin(MOTOR_SHIELD_TYPE);
#if __has_include ( "mySetup.h")
#define SETUP(cmd) serialParser.parse(F(cmd))
#include "mySetup.h"
#undef SETUP
#endif
#if defined(RMFT_ACTIVE)
RMFT::begin();
#endif
#if defined(LCN_SERIAL)
LCN_SERIAL.begin(115200);
LCN::init(LCN_SERIAL);
#endif
#if __has_include("mySetup.h")
#define SETUP(cmd) serialParser.parse(F(cmd))
#include "mySetup.h"
#undef SETUP
#endif
LCD(1,F("Ready"));
#if defined(LCN_SERIAL)
LCN_SERIAL.begin(115200);
LCN::init(LCN_SERIAL);
#endif
LCD(1, F("Ready"));
}
void loop()
@@ -118,27 +122,32 @@ void loop()
#if WIFI_ON
WifiInterface::loop();
#endif
#if ETHERNET_ON
EthernetInterface::loop();
#endif
#if defined(RMFT_ACTIVE)
#if MQTT_ON
MQTTInterface::loop();
#endif
#if defined(RMFT_ACTIVE)
RMFT::loop();
#endif
#if defined(LCN_SERIAL)
LCN::loop();
#endif
#if defined(LCN_SERIAL)
LCN::loop();
#endif
LCDDisplay::loop(); // ignored if LCD not in use
LCDDisplay::loop(); // ignored if LCD not in use
// Report any decrease in memory (will automatically trigger on first call)
static int ramLowWatermark = __INT_MAX__; // replaced on first loop
static int ramLowWatermark = __INT_MAX__; // replaced on first loop
int freeNow = minimumFreeMemory();
if (freeNow < ramLowWatermark)
{
ramLowWatermark = freeNow;
LCD(2,F("Free RAM=%5db"), ramLowWatermark);
LCD(2, F("Free RAM=%5db"), ramLowWatermark);
}
}

View File

@@ -10,9 +10,15 @@
#include "DCCEXParser.h"
#include "version.h"
#include "WifiInterface.h"
#if ETHERNET_ON == true
#include "EthernetInterface.h"
#endif
#if MQTT_ON == true
#include "MQTTInterface.h"
#endif
#include "LCD_Implementation.h"
#include "LCN.h"
#include "freeMemory.h"

View File

@@ -56,6 +56,7 @@ const int16_t HASH_KEYWORD_LCN = 15137;
const int16_t HASH_KEYWORD_RESET = 26133;
const int16_t HASH_KEYWORD_SPEED28 = -17064;
const int16_t HASH_KEYWORD_SPEED128 = 25816;
const int16_t HASH_KEYWORD_MQTT = 28220;
int16_t DCCEXParser::stashP[MAX_COMMAND_PARAMS];
bool DCCEXParser::stashBusy;
@@ -500,9 +501,11 @@ void DCCEXParser::parse(Print *stream, byte *com, RingStream * ringStream)
case 's': // <s>
StringFormatter::send(stream, F("<p%d>\n"), DCCWaveform::mainTrack.getPowerMode() == POWERMODE::ON);
StringFormatter::send(stream, F("<iDCC-EX V-%S / %S / %S G-%S>\n"), F(VERSION), F(ARDUINO_TYPE), DCC::getMotorShieldName(), F(GITHUB_SHA));
#ifndef MQTT_ON // the return can get really large depending on the #of items defined and the outputbuffer will overflow
Turnout::printAll(stream); //send all Turnout states
Output::printAll(stream); //send all Output states
Sensor::printAll(stream); //send all Sensor states
#endif
// TODO Send stats of speed reminders table
return;
@@ -802,6 +805,10 @@ bool DCCEXParser::parseD(Print *stream, int16_t params, int16_t p[])
StringFormatter::send(stream, F("128 Speedsteps"));
return true;
case HASH_KEYWORD_MQTT: // <D LCN ON/OFF>
Diag::MQTT = onOff;
return true;
default: // invalid/unknown
break;
}

7
DIAG.h
View File

@@ -16,10 +16,15 @@
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef DIAG_h
#define DIAG_h
#include "StringFormatter.h"
#define DIAG StringFormatter::diag
#include "DiagLogger.h"
// #define DIAG StringFormatter::diag // Std logging to serial only
#define DIAG DiagLogger::get().diag // allows to add other log writers
#define LCD StringFormatter::lcd
#endif

63
DiagLogger.cpp Normal file
View File

@@ -0,0 +1,63 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#include "DiagLogger.h"
// DIAG.h the #define DIAG points to here ...
// EthernetSetup , Wifisetup, etc can register a function to be called allowing the channel
// to publish the diag info to
// serial is default end enabled all the time
DiagLogger DiagLogger::singleton; // static instantiation;
void DiagLogger::addDiagWriter(DiagWriter l) {
if ( registered == MAXWRITERS ) {
Serial.println("Error: Max amount of writers exceeded.");
return;
}
writers[registered] = l;
registered++;
}
void DiagLogger::diag(const FSH *input,...)
{
va_list args;
va_start(args, input);
int len = 0;
len += sprintf(&b1[len], "<* ");
len += vsprintf_P(&b1[len], (const char *)input, args);
len += sprintf(&b1[len], " *>\n");
if ( len >= 256 ) { Serial.print("ERROR : Diag Buffer overflow"); return; }
// allways print to Serial
Serial.print(b1);
// callback the other registered diag writers
for (size_t i = 0; i < (size_t) registered; i++)
{
writers[i](b1, len);
}
va_end(args);
}

59
DiagLogger.h Normal file
View File

@@ -0,0 +1,59 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#ifndef DiagLogger_h
#define DiagLogger_h
#include "StringFormatter.h"
#define MAXWRITERS 10
typedef void (*DiagWriter)(const char *msg, const int length);
class DiagLogger
{
private:
// Methods
DiagLogger() = default;
DiagLogger(const DiagLogger &); // non construction-copyable
DiagLogger &operator=(const DiagLogger &); // non copyable
// Members
static DiagLogger singleton; // unique instance of the MQTTInterface object
DiagWriter writers[MAXWRITERS];
int registered = 0; // number of registered writers ( Serial is not counted as always used )
char b1[256];
public:
// Methods
static DiagLogger &get() noexcept
{ // return a reference to the unique instance
return singleton;
}
void diag(const FSH *input...);
void addDiagWriter(DiagWriter l);
~DiagLogger() = default;
// Members
};
#endif

View File

@@ -51,14 +51,26 @@ class EthernetInterface {
public:
static void setup();
static void loop();
static void setup();
static void loop();
bool isConnected() { return connected; };
static EthernetInterface *get() { return singleton; };
EthernetClient *getClient(int socket) { return &clients[socket]; };
EthernetServer *getServer() { return server; };
~EthernetInterface() = default;
private:
static EthernetInterface * singleton;
bool connected;
EthernetInterface();
void loop2();
EthernetInterface();
EthernetInterface(const EthernetInterface&); // non construction-copyable
EthernetInterface& operator=( const EthernetInterface& ); // non copyable
static EthernetInterface * singleton;
bool connected;
void loop2();
EthernetServer * server;
EthernetClient clients[MAX_SOCK_NUM]; // accept up to MAX_SOCK_NUM client connections at the same time; This depends on the chipset used on the Shield
uint8_t buffer[MAX_ETH_BUFFER+1]; // buffer used by TCP for the recv

42
MQTTBrokers.h Normal file
View File

@@ -0,0 +1,42 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#ifndef _MQTTBrokers_h_
#define _MQTTBrokers_h_
// Defines preconfigured mqtt broker configurations
// EthernetShields / Arduino do not support secure transport i.e. on either port 443 or 8883 for MQTTS on most broker installations
// Once we support the ESP / Wifi as Transport medium we may get TLS capabilities for data in transit i.e. can use the 443/8883 ports
#define MQPWD F(MQTT_PWD)
#define MQUID F(MQTT_USER)
#define MQPREFIX F(MQTT_PREFIX)
// Cloud server provided by the DccEX team for testing purposes; apply for a uid/pwd on discord
#define DCCEX_MQTT_BROKER F("DccexMQ"), new MQTTBroker( 9883, F("dcclms.modelrailroad.ovh"), MQUID, MQPWD, MQPREFIX)
// Mosquitto test server
#define DCCEX_MOSQUITTO F("Mosquitto"), new MQTTBroker(1883, F("test.mosquitto.org"))
// HiveMQ test server
#define DCCEX_HIVEMQ F("HiveMQ"), new MQTTBroker(1883, F("broker.hivemq.com"))
#endif

180
MQTTCallbackHandlers.cpp Normal file
View File

@@ -0,0 +1,180 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#if __has_include("config.h")
#include "config.h"
#else
#warning config.h not found. Using defaults from config.example.h
#include "config.example.h"
#endif
#include "defines.h"
#include <errno.h>
#include <limits.h>
#include "MQTTInterface.h"
// Fwd decl for the callback handlers
void mqttDCCEXCallback(MQTTInterface *mqtt, csmsg_t &tm);
void mqttProtocolCallback(MQTTInterface *mqtt, csmsg_t &tm);
void mqttMCallback(MQTTInterface *mqtt, csmsg_t &tm);
typedef void (*CallbackFunc)(MQTTInterface *mqtt, csmsg_t &tm);
template<class M, class N>
struct CallbackFunction {
M first;
N second;
};
using CallbackFunctions = CallbackFunction<char, CallbackFunc>[MAX_CALLBACKS];
// lookup table for the protocol handle functions
constexpr CallbackFunctions vec = {
{'<', mqttDCCEXCallback},
{'{', mqttProtocolCallback},
{'m', mqttMCallback}
};
long cantorEncode(long a, long b)
{
return (((a + b) * (a + b + 1)) / 2) + b;
}
void cantorDecode(int32_t c, int *a, int *b)
{
int w = floor((sqrt(8 * c + 1) - 1) / 2);
int t = (w * (w + 1)) / 2;
*b = c - t;
*a = w - *b;
}
/**
* @brief lookup of the proper function for < or { based commands
*
* @param c
* @return CallbackFunc
*/
auto protocolDistributor(const char c) -> CallbackFunc {
for (auto &&f : vec)
{
if (f.first == c)
return f.second;
}
return nullptr;
}
void protocolHandler(MQTTInterface *mqtt, csmsg_t &tm) {
protocolDistributor(tm.cmd[0])(mqtt, tm);
}
/**
* @brief Callback for handling 'm' MQTT Protocol commands (deprecated)
* @deprecated to be replaced by '{' commands in simple JSON format
*/
void mqttMCallback(MQTTInterface *mqtt, csmsg_t &tm)
{
auto clients = mqtt->getClients();
// DIAG(F("MQTT m - Callback"));
switch (tm.cmd[1])
{
case 'i': // Inital handshake message to create the tunnel
{
char buffer[MAXPAYLOAD];
char *tmp = tm.cmd + 3;
auto length = strlen(tm.cmd);
strlcpy(buffer, tmp, length);
buffer[length - 4] = '\0';
// DIAG(F("MQTT buffer %s - %s - %s - %d"), tm.cmd, tmp, buffer, length);
auto distantid = strtol(buffer, NULL, 10);
if (errno == ERANGE || distantid > UCHAR_MAX)
{
DIAG(F("MQTT Invalid Handshake ID; must be between 0 and 255"));
return;
}
if (distantid == 0)
{
DIAG(F("MQTT Invalid Handshake ID"));
return;
}
// Create a new MQTT client
auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
if (subscriberid == 0)
{
DIAG(F("MQTT no more connections are available"));
return;
}
auto topicid = cantorEncode((long)subscriberid, (long)distantid);
DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid);
// extract the number delivered from & initalize the new mqtt client object
clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available
auto sq = mqtt->getSubscriptionQueue();
sq->push(subscriberid);
return;
}
default:
{
return;
}
}
}
/**
* @brief Callback for handling '{' MQTT Protocol commands
*/
void mqttProtocolCallback(MQTTInterface *mqtt, csmsg_t &tm)
{
// DIAG(F("MQTT Protocol - Callback"));
}
/**
* @brief Callback for handling '<' DccEX commands
*/
void mqttDCCEXCallback(MQTTInterface *mqtt, csmsg_t &tm)
// void mqttDCCEXCallback(MQTTInterface *mqtt, char *topic, char *payload, unsigned int length)
{
// DIAG(F("MQTT DCCEX - Callback"));
if (!tm.mqsocket)
{
DIAG(F("MQTT Can't identify sender; command send on wrong topic"));
return;
}
int idx = mqtt->getPool()->setItem(tm); // Add the recieved command to the pool
if (idx == -1)
{
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
return;
}
mqtt->getIncomming()->push(idx); // Add the index of the pool item to the incomming queue
// don't show the topic as we would have to save it also just like the payload
if (Diag::MQTT)
DIAG(F("MQTT Message arrived: [%s]"), tm.cmd);
}

540
MQTTInterface.cpp Normal file
View File

@@ -0,0 +1,540 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#if __has_include("config.h")
#include "config.h"
#else
#warning config.h not found. Using defaults from config.example.h
#include "config.example.h"
#endif
#include "defines.h"
#include <errno.h>
#include <limits.h>
#include "MQTTInterface.h"
#include "MQTTBrokers.h"
#include "DCCTimer.h"
#include "CommandDistributor.h"
#include "freeMemory.h"
MQTTInterface *MQTTInterface::singleton = NULL;
void protocolHandler(MQTTInterface *mqtt, csmsg_t &tm);
/**
* @brief callback used from DIAG to send diag messages to the broker / clients
*
* @param msg
* @param length
*/
void mqttDiag(const char *msg, const int length)
{
if (MQTTInterface::get()->getState() == CONNECTED)
{
// if not connected all goes only to Serial;
// if CONNECTED we have at least the root topic subscribed to
auto mqSocket = MQTTInterface::get()->getActive();
char topic[MAXTSTR];
memset(topic, 0, MAXTSTR);
if (mqSocket == 0)
{ // send to root topic of the commandstation as it doen't concern a specific client at this point
sprintf(topic, "%s", MQTTInterface::get()->getClientID());
}
else
{
sprintf(topic, "%s/%ld/diag", MQTTInterface::get()->getClientID(), MQTTInterface::get()->getClients()[mqSocket].topic);
}
// Serial.print(" ---- MQTT pub to: ");
// Serial.print(topic);
// Serial.print(" Msg: ");
// Serial.print(msg);
MQTTInterface::get()->publish(topic, msg);
}
}
void MQTTInterface::setup()
{
DiagLogger::get().addDiagWriter(mqttDiag);
singleton = new MQTTInterface();
if (!singleton->connected)
{
singleton = NULL;
}
if (Diag::MQTT)
DIAG(F("MQTT Interface instance: [%x] - Setup done"), singleton);
};
MQTTInterface::MQTTInterface()
{
this->connected = this->setupNetwork();
if (!this->connected)
{
DIAG(F("Network setup failed"));
}
else
{
this->setup(CSMQTTBROKER);
}
this->outboundRing = new RingStream(OUT_BOUND_SIZE);
};
/**
* @brief determine the mqsocket from a topic
*
* @return byte the mqsocketid for the message recieved
*/
byte senderMqSocket(MQTTInterface *mqtt, char *topic)
{
// list of all available clients from which we can determine the mqsocket
auto clients = mqtt->getClients();
const char s[2] = "/"; // topic delimiter is /
char *token;
byte mqsocket = 0;
/* get the first token = ClientID */
token = strtok(topic, s);
/* get the second token = topicID */
token = strtok(NULL, s);
if (token != NULL) // topic didn't contain any topicID
{
auto topicid = atoi(token);
// verify that there is a MQTT client with that topic id connected
// check in the array of clients if we have one with the topicid
// start at 1 as 0 is not allocated as mqsocket
for (int i = 1; i <= mqtt->getClientSize(); i++)
{
if (clients[i].topic == topicid)
{
mqsocket = i;
break; // we are done
}
}
// if we get here we have a topic but no associated client
}
// if mqsocket == 0 here we haven't got any Id in the topic string
return mqsocket;
}
/**
* @brief MQTT Interface callback recieving all incomming messages from the PubSubClient
*
* @param topic
* @param payload
* @param length
*/
void mqttCallback(char *topic, byte *pld, unsigned int length)
{
// it's a bounced diag message ignore in all cases
// but it should not be necessary here .. that means the active mqsocket is wrong when sending to diag message
if ((pld[0] == '<') && (pld[1] == '*'))
{
return;
}
// ignore anything above the PAYLOAD limit of 64 char which should be enough
// in general things rejected here is the bounce of the inital messages setting up the chnanel etc
if (length >= MAXPAYLOAD)
{
return;
}
MQTTInterface *mqtt = MQTTInterface::get();
csmsg_t tm; // topic message
// FOR DIAGS and MQTT ON in the callback we need to copy the payload buffer
// as during the publish of the diag messages the original payload gets destroyed
// so we setup the csmsg_t now to save yet another buffer
// if tm not used it will just be discarded at the end of the function call
memset(tm.cmd, 0, MAXPAYLOAD); // Clean up the cmd buffer - should not be necessary
strlcpy(tm.cmd, (char *)pld, length + 1); // Message payload
tm.mqsocket = senderMqSocket(mqtt, topic); // On which socket did we recieve the mq message
mqtt->setActive(tm.mqsocket); // connection from where we recieved the command is active now
if (Diag::MQTT)
DIAG(F("MQTT Callback:[%s/%d] [%s] [%d] on interface [%x]"), topic, tm.mqsocket, tm.cmd, length, mqtt);
protocolHandler(mqtt, tm);
}
/**
* @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID
*
* @param array array containing bytes
* @param len length of the array
* @param buffer buffer to which the string will be written; make sure the buffer has appropriate length
*/
static void array_to_string(byte array[], unsigned int len, char buffer[])
{
for (unsigned int i = 0; i < len; i++)
{
byte nib1 = (array[i] >> 4) & 0x0F;
byte nib2 = (array[i] >> 0) & 0x0F;
buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA;
buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA;
}
buffer[len * 2] = '\0';
}
/**
* @brief Connect to the MQTT broker; Parameters for this function are defined in
* like the motoshield configurations there are mqtt broker configurations in config.h
*
* @param id Name provided to the broker configuration
* @param b MQTT broker object containing the main configuration parameters
*/
void MQTTInterface::setup(const FSH *id, MQTTBroker *b)
{
//Create the MQTT environment and establish inital connection to the Broker
broker = b;
DIAG(F("[%d] MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), freeMemory(), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port);
// initalize MQ Broker
mqttClient = new PubSubClient(broker->ip, broker->port, mqttCallback, ethClient);
if (Diag::MQTT)
DIAG(F("MQTT Client created ok..."));
array_to_string(mac, CLIENTIDSIZE, clientID);
DIAG(F("[%d] MQTT Client ID : %s"), freeMemory(), clientID);
connect(); // inital connection as well as reconnects
}
/**
* @brief MQTT broker connection / reconnection
*
*/
void MQTTInterface::connect()
{
int reconnectCount = 0;
connectID[0] = '\0';
// Build the connect ID : Prefix + clientID
if (broker->prefix != nullptr)
{
strcpy_P(connectID, (const char *)broker->prefix);
}
strcat(connectID, clientID);
// Connect to the broker
DIAG(F("[%d] MQTT %s (re)connecting ..."), freeMemory(), connectID);
while (!mqttClient->connected() && reconnectCount < MAXRECONNECT)
{
switch (broker->cType)
{
// no uid no pwd
case 1:
{ // port(p), ip(i), domain(d),
DIAG(F("[%d] MQTT Broker connecting anonymous ..."), freeMemory());
if (mqttClient->connect(connectID))
{
DIAG(F("[%d] MQTT Broker connected ..."),freeMemory());
auto sub = subscribe(clientID); // set up the main subscription on which we will recieve the intal mi message from a subscriber
if (Diag::MQTT)
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
mqState = CONNECTED;
}
else
{
DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient->state());
reconnectCount++;
}
break;
}
// with uid passwd
case 2:
{
DIAG(F("MQTT Broker connecting with uid/pwd ..."));
char user[strlen_P((const char *)broker->user)];
char pwd[strlen_P((const char *)broker->pwd)];
// need to copy from progmem to lacal
strcpy_P(user, (const char *)broker->user);
strcpy_P(pwd, (const char *)broker->pwd);
if (mqttClient->connect(connectID, user, pwd))
{
DIAG(F("MQTT Broker connected ..."));
auto sub = subscribe(clientID); // set up the main subscription on which we will recieve the intal mi message from a subscriber
if (Diag::MQTT)
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
mqState = CONNECTED;
}
else
{
DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient->state());
reconnectCount++;
}
break;
// ! add last will messages for the client
// (connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0))
}
}
if (reconnectCount == MAXRECONNECT)
{
DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT);
mqState = CONNECTION_FAILED;
}
}
}
/**
* @brief for the time being only one topic at the root
* which is the unique clientID from the MCU
* QoS is 0 by default
*
* @param topic to subsribe to
* @return boolean true if successful false otherwise
*/
boolean MQTTInterface::subscribe(const char *topic)
{
auto res = mqttClient->subscribe(topic);
return res;
}
void MQTTInterface::publish(const char *topic, const char *payload)
{
mqttClient->publish(topic, payload);
}
/**
* @brief Connect the Ethernet network;
*
* @return true if connections was successful
*/
bool MQTTInterface::setupNetwork()
{
// setup Ethernet connection first
DIAG(F("[%d] Starting network setup ... "), freeMemory());
DCCTimer::getSimulatedMacAddress(mac);
#ifdef IP_ADDRESS
Ethernet.begin(mac, IP_ADDRESS);
#else
if (Ethernet.begin(mac) == 0)
{
DIAG(F("Ethernet.begin FAILED"));
return false;
}
#endif
DIAG(F("[%d] Ethernet.begin OK"), freeMemory());
if (Ethernet.hardwareStatus() == EthernetNoHardware)
{
DIAG(F("Ethernet shield not found"));
return false;
}
// For slower cards like the ENC courtesy @PaulS
// wait max 5 sec before bailing out on the connection
unsigned long startmilli = millis();
while ((millis() - startmilli) < 5500)
{
if (Ethernet.linkStatus() == LinkON)
break;
DIAG(F("Ethernet waiting for link (1sec) "));
delay(1000);
}
if (Ethernet.linkStatus() == LinkOFF)
{
DIAG(F("Ethernet cable not connected"));
return false;
}
DIAG(F("[%d] Ethernet link is up"),freeMemory());
IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address
DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]);
DIAG(F("Port:%d"), IP_PORT);
return true;
}
/**
* @brief handle the incomming queue in the loop
*
*/
void inLoop(Queue<int> &in, ObjectPool<csmsg_t, MAXPOOLSIZE> &pool, RingStream *outboundRing)
{
bool state;
if (in.count() > 0)
{
// pop a command index from the incomming queue and get the command from the pool
int idx = in.pop();
csmsg_t *c = pool.getItem(idx, &state);
MQTTInterface::get()->setActive(c->mqsocket); // connection from where we recieved the command is active now
// execute the command and collect results
outboundRing->mark((uint8_t)c->mqsocket);
CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing);
outboundRing->commit();
// free the slot in the command pool
pool.returnItem(idx);
}
}
/**
* @brief handle the outgoing messages in the loop
*
*/
void outLoop(PubSubClient *mq)
{
// handle at most 1 outbound transmission
MQTTInterface *mqtt = MQTTInterface::get();
auto clients = mqtt->getClients();
auto outboundRing = mqtt->getRingStream();
int mqSocket = outboundRing->read();
if (mqSocket >= 0) // mqsocket / clientid can't be 0 ....
{
int count = outboundRing->count();
char buffer[MAXTSTR];
buffer[0] = '\0';
sprintf(buffer, "%s/%d/result", mqtt->getClientID(), (int)clients[mqSocket].topic);
if (Diag::MQTT)
DIAG(F("MQTT publish to mqSocket=%d, count=:%d on topic %s"), mqSocket, count, buffer);
if (mq->beginPublish(buffer, count, false))
{
for (; count > 0; count--)
{
mq->write(outboundRing->read());
}
}
else
{
DIAG(F("MQTT error start publishing result)"));
};
if (!mq->endPublish())
{
DIAG(F("MQTT error finalizing published result)"));
};
}
}
/**
* @brief check if there are new subscribers connected and create the channels
*
* @param sq if the callback captured a client there will be an entry in the sq with the subscriber number
* @param clients the clients array where we find the info to setup the subsciptions and print out the publish topics for info
*/
void checkSubscribers(Queue<int> &sq, csmqttclient_t *clients)
{
MQTTInterface *mqtt = MQTTInterface::get();
if (sq.count() > 0)
{
// new subscriber
auto s = sq.pop();
char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTSTR];
sprintf(tbuffer, "%s/%ld/cmd", mqtt->getClientID(), clients[s].topic);
auto ok = mqtt->subscribe(tbuffer);
if (Diag::MQTT)
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
// send the topic on which the CS will listen for commands and the ones on which it will publish for the connecting
// client to pickup. Once the connecting client has setup other topic setup messages on the main channel shall be
// ignored
// JSON message { init: <number> channels: {result: <string>, diag: <string> }}
char buffer[MAXPAYLOAD * 2];
memset(buffer, 0, MAXPAYLOAD * 2);
// sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic);
sprintf(buffer, "{ \"init\": %d, \"subscribeto\": {\"result\": \"%s/%ld/result\" , \"diag\": \"%s/%ld/diag\" }, \"publishto\": {\"cmd\": \"%s/%ld/cmd\" } }",
(int)clients[s].distant,
mqtt->getClientID(),
clients[s].topic,
mqtt->getClientID(),
clients[s].topic,
mqtt->getClientID(),
clients[s].topic);
if (Diag::MQTT)
DIAG(F("MQTT channel setup message: [%s]"), buffer);
mqtt->publish(mqtt->getClientID(), buffer);
// on the cs side all is set and we declare that the cs is open for business
clients[s].open = true;
}
}
void MQTTInterface::loop()
{
if (!singleton)
return;
singleton->loop2();
}
bool showonce = false;
auto s = millis();
void loopPing(int interval)
{
auto c = millis();
if (c - s > 2000)
{
DIAG(F("loop alive")); // ping every 2 sec
s = c;
}
}
void MQTTInterface::loop2()
{
// loopPing(2000); // ping every 2 sec
// Connection impossible so just don't do anything
if (singleton->mqState == CONNECTION_FAILED)
{
if (!showonce)
{
DIAG(F("MQTT connection failed..."));
showonce = true;
}
return;
}
if (!mqttClient->connected())
{
DIAG(F("MQTT no connection trying to reconnect ..."));
connect();
}
if (!mqttClient->loop())
{
DIAG(F("mqttClient returned with error; state: %d"), mqttClient->state());
return;
};
checkSubscribers(subscriberQueue, clients);
inLoop(in, pool, outboundRing);
outLoop(mqttClient);
}

225
MQTTInterface.h Normal file
View File

@@ -0,0 +1,225 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#ifndef _MQTTInterface_h_
#define _MQTTInterface_h_
#if __has_include("config.h")
#include "config.h"
#else
#warning config.h not found. Using defaults from config.example.h
#include "config.example.h"
#endif
#include "defines.h"
#include <Arduino.h>
#include <Ethernet.h>
#include <Dns.h>
#include <PubSubClient.h>
#include "DCCEXParser.h"
#include "Queue.h"
#include "ObjectPool.h"
// #include "MemoryFree.h"
#include "freeMemory.h"
#define MAXPAYLOAD 64 // max length of a payload recieved
#define MAXDOMAINLENGTH 32 // domain name length for the broker e.g. test.mosquitto.org
#define MAXTBUF 64 //!< max length of the buffer for building the topic name ;to be checked
#define MAXTMSG 64 //!< max length of the messages for a topic ;to be checked PROGMEM ?
#define MAXTSTR 32 //!< max length of a topic string
#define MAXCONNECTID 32 // broker connection id length incl possible prefixes
#define CLIENTIDSIZE 6 // max length of the clientid used for connection to the broker
#define MAXRECONNECT 5 // reconnection tries before final failure
#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers
#define OUT_BOUND_SIZE 128 // Size of the RingStream used to provide results from the parser and publish
#define MAX_POOL_SIZE 16 // recieved command store size
#define MAX_CALLBACKS 10
// extern int freeMemory();
struct MQTTBroker
{
int port;
IPAddress ip;
const FSH *domain = nullptr;
const FSH *user = nullptr;
const FSH *pwd = nullptr;
const FSH *prefix = nullptr;
byte cType; // connection type to identify valid params
IPAddress resovleBroker(const FSH *d)
{
DNSClient dns;
IPAddress bip;
char domain[MAXDOMAINLENGTH];
strcpy_P(domain, (const char *)d);
dns.begin(Ethernet.dnsServerIP());
if (dns.getHostByName(domain, bip) == 1)
{
DIAG(F("[%d] MQTT Broker: %s = %d.%d.%d.%d"), freeMemory(), domain, bip[0], bip[1], bip[2], bip[3]);
}
else
{
DIAG(F("MQTT Dns lookup for %s failed"), domain);
}
return bip;
}
// all boils down to the ip address type = 1 without user authentication 2 with user authentication
// no ssl support !
// port & ip address
MQTTBroker(int p, IPAddress i) : port(p), ip(i), cType(1){};
// port & domain name
MQTTBroker(int p, const FSH *d) : port(p), domain(d), cType(1)
{
ip = resovleBroker(d);
};
// port & ip & prefix
MQTTBroker(int p, IPAddress i, const FSH *pfix) : port(p), ip(i), prefix(pfix), cType(1){};
// port & domain & prefix
MQTTBroker(int p, const FSH *d, const FSH *pfix) : port(p), domain(d), prefix(pfix), cType(1)
{
ip = resovleBroker(d);
};
// port & ip & user & pwd
MQTTBroker(int p, IPAddress i, const FSH *uid, const FSH *pass) : port(p), ip(i), user(uid), pwd(pass), cType(2){};
// port & domain & user & pwd
MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass) : port(p), domain(d), user(uid), pwd(pass), cType(2)
{
ip = resovleBroker(d);
};
// port & ip & user & pwd & prefix
MQTTBroker(int p, IPAddress i, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), ip(i), user(uid), pwd(pass), prefix(pfix), cType(2){};
// port & domain & user & pwd & prefix
MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), domain(d), user(uid), pwd(pass), prefix(pfix), cType(2)
{
ip = resovleBroker(d);
};
};
/**
* @brief dcc-ex command as recieved via MQ
*
*/
typedef struct csmsg_t
{
char cmd[MAXPAYLOAD]; // recieved command message
byte mqsocket; // from which mqsocket / subscriberid
} csmsg_t;
typedef struct csmqttclient_t
{
int distant; // random int number recieved from the subscriber
byte mqsocket; // mqtt socket = subscriberid provided by the cs
long topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands
bool open; // true as soon as we have send the id to the mq broker for the client to pickup
} csmqttclient_t;
enum MQTTInterfaceState
{
INIT,
CONFIGURED, // server/client objects set
CONNECTED, // mqtt broker is connected
CONNECTION_FAILED // Impossible to get the connection set after MAXRECONNECT tries
};
class MQTTInterface
{
private:
// Methods
MQTTInterface();
MQTTInterface(const MQTTInterface &); // non construction-copyable
MQTTInterface &operator=(const MQTTInterface &); // non copyable
void setup(const FSH *id, MQTTBroker *broker); // instantiates the broker
void connect(); // (re)connects to the broker
bool setupNetwork(); // sets up the network connection for the PubSub system
void loop2();
// Members
static MQTTInterface *singleton; // unique instance of the MQTTInterface object
EthernetClient ethClient; // TCP Client object for the MQ Connection
byte mac[6]; // simulated mac address
IPAddress server; // MQTT server object
MQTTBroker *broker; // Broker configuration object as set in config.h
ObjectPool<csmsg_t, MAXPOOLSIZE> pool; // Pool of commands recieved for the CS
Queue<int> in; // Queue of indexes into the pool according to incomming cmds
Queue<int> subscriberQueue; // Queue for incomming subscribers; push the subscriber into the queue for setup in a loop cycle
char clientID[(CLIENTIDSIZE * 2) + 1]; // unique ID of the commandstation; not to confused with the connectionID
csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients
char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker
byte subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection
byte activeSubscriber = 0; // if its 0 no active Subscriber; set as soon as we recieve a command of go into processing on the CS
bool connected = false; // set to true if the ethernet connection is available
MQTTInterfaceState mqState = INIT; // Status of the MQBroker connection
RingStream *outboundRing; // Buffer for collecting the results from the command parser
PubSubClient *mqttClient; // PubSub Endpoint for data exchange
public:
static MQTTInterface *get() noexcept { return singleton;}
boolean subscribe(const char *topic);
void publish(const char *topic, const char *payload);
ObjectPool<csmsg_t, MAXPOOLSIZE> *getPool() { return &pool; };
Queue<int> *getIncomming() { return &in; };
Queue<int> *getSubscriptionQueue() { return &subscriberQueue; };
MQTTInterfaceState getState() { return mqState; };
byte getActive() { return activeSubscriber; };
void setActive(byte mqSocket) { activeSubscriber = mqSocket; };
char *getClientID() { return clientID; };
uint8_t getClientSize() { return subscriberid; };
// initalized to 0 so that the first id comming back is 1
// index 0 in the clients array is not used therefore
uint8_t obtainSubscriberID()
{
if (subscriberid == MAXMQTTCONNECTIONS)
{
return 0; // no more subscriber id available
}
return (++subscriberid);
}
csmqttclient_t *getClients() { return clients; };
RingStream *getRingStream() { return outboundRing; };
static void setup();
static void loop();
~MQTTInterface() = default;
};
#endif

View File

@@ -25,7 +25,7 @@
// Arduino standard Motor Shield
#define STANDARD_MOTOR_SHIELD F("STANDARD_MOTOR_SHIELD"), \
new MotorDriver(3, 12, UNUSED_PIN, UNUSED_PIN, A0, 2.99, 2000, UNUSED_PIN), \
new MotorDriver(11, 13, UNUSED_PIN, UNUSED_PIN, A1, 2.99, 2000, UNUSED_PIN)
new MotorDriver(11, 13, UNUSED_PIN, UNUSED_PIN, A2, 2.99, 2000, UNUSED_PIN)
// Pololu Motor Shield
#define POLOLU_MOTOR_SHIELD F("POLOLU_MOTOR_SHIELD"), \

110
ObjectPool.h Normal file
View File

@@ -0,0 +1,110 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#ifndef _ObjectPool_h_
#define _ObjectPool_h_
#include <DIAG.h>
#define MAXPOOLSIZE 32
template <typename T, int length>
class ObjectPool
{
// just make sure that we don't create a pool eating up all memory @compiletime
static_assert(length <= MAXPOOLSIZE);
struct item
{
T i;
bool free = true; // boolean 1 free i.e. i can be reused; 0 occupied
};
private:
item p[length]; // MAXPOOLSIZE items of struct item
const int size = length; // size of the pool
int findFreeIdx()
{ // find the first free index or return -1 if there is none
for (int i = 0; i < length; i++)
{
if (p[i].free)
{
return i;
}
}
return -1; // if we are here there is no free slot available
}
public:
int setItem(T i)
{ // add an item to the pool at a free slot
int idx = findFreeIdx();
if (idx != -1)
{
p[idx].i = i;
p[idx].free = false;
}
return idx;
}
/**
* @brief returns the slot for an object to the pool i.e. frees the slot for reuse of the data member and
* clears out the memory
*
* @param idx
* @return true if the return is ok
* @return false otherwise
*/
bool returnItem(int idx)
{ // clear item at pool index idx
if (idx > size)
{ // can't return an item outside of the pool size; returns false;
return false;
}
memset(&p[idx].i, 0, sizeof(T)); // clear out the memory but keep the allocation for reuse
p[idx].free = true;
return true; // set the free flag
}
/**
* @brief Obtain a pool item
* @note This should only be used for debugging.
* It allows to change actually the content of the pool item where this should only be allowed for the setItem method.
* @param idx Index of the pool item to retrieve
* @param state State of the pool item ( 1 available, 0 occupied)
* @return T* returns the pointer to the pool item
*/
T *getItem(int idx, bool *state)
{
*state = p[idx].free;
return &p[idx].i;
}
int getSize()
{
return size;
}
ObjectPool() = default;
~ObjectPool() = default;
};
#endif

121
Queue.h Normal file
View File

@@ -0,0 +1,121 @@
/*
* © 2021, Gregor Baues, All rights reserved.
*
* This file is part of DCC-EX/CommandStation-EX
*
* This is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* It is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
*
*/
#ifndef _Queue_h_
#define _Queue_h_
#include <Arduino.h>
template<class T>
class Queue {
private:
int _front, _back, _count;
T *_data;
int _maxitems;
public:
Queue(int maxitems = 256) {
_front = 0;
_back = 0;
_count = 0;
_maxitems = maxitems;
_data = new T[maxitems + 1];
}
~Queue() {
delete[] _data;
}
inline int count();
inline int front();
inline int back();
void push(const T &item);
T peek();
T pop();
void clear();
};
template<class T>
inline int Queue<T>::count()
{
return _count;
}
template<class T>
inline int Queue<T>::front()
{
return _front;
}
template<class T>
inline int Queue<T>::back()
{
return _back;
}
template<class T>
void Queue<T>::push(const T &item)
{
if(_count < _maxitems) { // Drops out when full
_data[_back++]=item;
++_count;
// Check wrap around
if (_back > _maxitems)
_back -= (_maxitems + 1);
}
}
template<class T>
T Queue<T>::pop() {
if(_count <= 0) return T(); // Returns empty
else {
T result = _data[_front];
_front++;
--_count;
// Check wrap around
if (_front > _maxitems)
_front -= (_maxitems + 1);
return result;
}
}
template<class T>
T Queue<T>::peek() {
if(_count <= 0) return T(); // Returns empty
else return _data[_front];
}
template<class T>
void Queue<T>::clear()
{
_front = _back;
_count = 0;
}
#endif // _Queue_h_

View File

@@ -103,3 +103,43 @@ bool RingStream::commit() {
_buffer[_mark]=lowByte(_count);
return true; // commit worked
}
// grbba to be removed
// print the buffer one line for 10 chars in the array
// void RingStream::printBuffer() {
// int j = 0;
// for ( int k = 0; k < _len; k++ ) {
// if ( j == 10) {
// j = 0;
// Serial.println();
// }
// j++;
// Serial.print((char) _buffer[k]);
// Serial.print(" ");
// }
// }
// void RingStream::printInfo() {
// Serial.print("_len: "); Serial.println(_len);
// Serial.print("_pos_write: "); Serial.println(_pos_write);
// Serial.print("_pos_read: "); Serial.println(_pos_read);
// Serial.print("_overflow: "); Serial.println(_overflow);
// Serial.print("_mark: "); Serial.println(_mark);
// Serial.print("_count: ");Serial.println(_count);
// }
// void RingStream::reset(const uint16_t len)
// {
// _len=len;
// memset(_buffer,0,len);
// // _buffer=new byte[len];
// _pos_write=0;
// _pos_read=0;
// _buffer[0]=0;
// _overflow=false;
// _mark=0;
// _count=0;
// }
// grbba to be removed

View File

@@ -21,10 +21,12 @@
#include <Arduino.h>
// template <size_t S>
class RingStream : public Print {
public:
RingStream( const uint16_t len);
~RingStream() = default;
virtual size_t write(uint8_t b);
using Print::write;
@@ -34,15 +36,27 @@ class RingStream : public Print {
void mark(uint8_t b);
bool commit();
uint8_t peekTargetMark();
int size() {return _len;}
byte *getBuffer() { return _buffer; }
// grbba to be removed
// void printBuffer();
// void printInfo();
// void reset(const uint16_t len);
// grbba to be removed
int getLen() { return _len; };
private:
int _len;
int _pos_write;
// int _len = S;
int _pos_write ;
int _pos_read;
bool _overflow;
int _mark;
int _count;
byte * _buffer;
// byte _buffer[S];
byte *_buffer;
};
#endif

View File

@@ -35,6 +35,7 @@ bool Diag::WIFI=false;
bool Diag::WITHROTTLE=false;
bool Diag::ETHERNET=false;
bool Diag::LCN=false;
bool Diag::MQTT=false;
void StringFormatter::diag( const FSH* input...) {

View File

@@ -34,6 +34,7 @@ class Diag {
static bool WITHROTTLE;
static bool ETHERNET;
static bool LCN;
static bool MQTT;
};

View File

@@ -37,7 +37,7 @@ The configuration file for DCC-EX Command Station
// NOTE: Only supported on Arduino Mega
// Set to false if you not even want it on the Arduino Mega
//
#define ENABLE_WIFI true
// #define ENABLE_WIFI true
/////////////////////////////////////////////////////////////////////////////////////
//
@@ -93,6 +93,44 @@ The configuration file for DCC-EX Command Station
//#define IP_ADDRESS { 192, 168, 1, 200 }
// ENABLE_MQTT: if set to true you have to have an Arduino Ethernet card (wired). This
// is not for Wifi. You will need the Arduino Ethernet library as well as the PubSub
// library from <add link here> or get via the libray manager either from the IDE
// or PIO
// The following is only needed if the Broker requires it. cf broker descriptions below
#define MQTT_USER "your broker user name"
#define MQTT_PWD "your broker passwd"
#define MQTT_PREFIX "prefix if required by the broker"
// UNCOMMENT THE FOLLOWING LINE TO ENABLE MQTT
#define ENABLE_MQTT true
// Set the used broker to one of the configurations from MQTTBrokers.h where some
// public freely avaiable brokers are configured
// DEFINE THE MQTT BROKER BELOW ACCORDING TO THE FOLLOWING TABLE:
//
// DCCEX_MQTT_BROKER : DCCEX Team best effort operated MQTT broker; pls apply for user/pwd on discord in the mqtt channel if you want to try it
// DCCEX_MOSQUITTO : Mosquitto.org public test broker no user / pwd required so anyone can subscribe/publish to any topic here; good for testing only
// DCCEX_HIVEMQ : Provided by HiveMQ; Public no user / pwd required
// |
// +-----------------------v
#define CSMQTTBROKER DCCEX_MOSQUITTO
// --------------------------
// CUSTOMIZED EXAMPLE
// Configuration for a broker installed on a machine on you home netowrk where the IP address of the machine runing the broker
// is 192.168.0.2 and requires user authentication. The uid ad pwd are set in the config.h file
// Port IPAddress Username (opt) Password(opt) Prefix (opt)
// #define MY_PERSONAL_BROKER F("MYBROKERMQ"), new MQTTBroker( 1883, {192, 168, 0, 2}, F("username"), F("password"), F("prefix-if-required"))
//
// If you have access to a broker on the internet replace the IPAddress by F("my-broker-domain-name")
// -------------------------
/////////////////////////////////////////////////////////////////////////////////////
//
// DEFINE LCD SCREEN USAGE BY THE BASE STATION

122
csexpatch.sh Normal file
View File

@@ -0,0 +1,122 @@
#!/bin/bash
# Files to be added to the CS as is
# Session.cpp
# Session.h
# Diag.cpp
# Queue.cpp
# Queue.h
# patches to apply
# file / "marker" / "replace with marker + target" / 0 marker before 1 marker after
patch2=(WifiInboundHandler.cpp "runningClientId);" "Connection::type = _WIFI; Connection::id = runningClientId;" 0)
#main include of the class for handling the CLI session
patch3=(DCCEX.h "#define DCCEX_h" "\n#include \"Session.h\"" 0)
#added testing if the motoshield has been started and thus the Waveform gen is running
patch7=(DCCWaveform.cpp "progTripValue=0;" "bool DCCWaveform::running=false;" 0)
patch13=(DCCWaveform.cpp "interruptHandler);" "\nrunning=true;" 0)
patch8=(DCCWaveform.h "public:" "\nstatic bool isRunning() { return running; }" 0)
patch9=(DCCWaveform.h "private:" "\nstatic bool running;" 0)
#definitions needed for handling latching i.e. sending the diag output to the currentmy 'active' connection
#prepared for WiFi but that is not implemented
patch10=(DIAG.h "StringFormatter::lcd" "\nenum Transport { _WIFI, _ETHERNET}; \
\nstruct Connection { static Transport type; static byte id;}; \
\nstruct Latch { static Transport type; static byte id;};" 0)
#Ethernet Interface changes to get to the connection for sending information to the CLI
patch11=(EthernetInterface.h "loop();" "\nbool isConnected() { return connected; };\
\nstatic EthernetInterface *get() { return singleton; };\
\nEthernetClient *getClient(int socket) { return \&clients[socket]; };" 0)
patch12=(EthernetInterface.cpp "socket,buffer);" "\nConnection::type = _ETHERNET; Connection::id = socket;" 0)
#Adding a) the LATCH diagnostic command to the parseD; allowing to send set the diag output to the active ethernet
#connection; DOes not work for WiFi and b) handling of the atCommandCallback piggy backing the + command so need check
#if the Waveform gen has statred as otherwise we try to poweroff a non exisiting motorshield
patch4=(DCCEXParser.cpp "26133;" "\nconst int16_t HASH_KEYWORD_LATCH = 1618;" 0)
patch5=(DCCEXParser.cpp "(atCommandCallback) {" "\nif (DCCWaveform::isRunning()) {" 0)
patch6=(DCCEXParser.cpp "progTrack.setPowerMode(POWERMODE::OFF);" "\n}" 0)
patch14=(DCCEXParser.cpp "case HASH_KEYWORD_CABS:" "\n case HASH_KEYWORD_LATCH:\
\n Diag::LATCH = onOff; \
\n Latch::type = Connection::type; \
\n Latch::id = Connection::id; \
\n return true; \n " 1 )
#StringFormatter : adding things needed for Latching the Wifi or Ethernet connection to reciev the diag output
#remove a bracket on line 47 which will be added again in patch 17; If that is not done we end up with one
#bracket too much; Brittle and prone to issues as they change stuff but so far the best i can get
patch1=(StringFormatter.h "LCN;" "static bool LATCH;" 0)
sed -i -e '47d' StringFormatter.cpp
patch15=(StringFormatter.cpp "LCN=false;" "\nbool Diag::LATCH=false;" 0)
patch16=(StringFormatter.cpp "if (!diagSerial) return;" "\n#if ETHERNET_ON == true || WIFI_ON == true \
\n auto t = diagSerial;\
\n if (Diag::LATCH)\
\n {\
\n switch (Latch::type)\
\n {\
\n case _ETHERNET:\
\n {\
\n#if ETHERNET_ON == true\
\n auto i = EthernetInterface::get();\
\n auto s = i->getClient(Latch::id); \
\n if (s->connected())\
\n { \
\n diagSerial = s;\
\n }\
\n#endif\
\n break;\
\n }\
\n case _WIFI:\
\n {\
\n DIAG(F(\"Latch on Wifi is not possible for now ...\"));\
\n break;\
\n }\
\n }\
\n }\
\n#endif\n" 1)
patch17=(StringFormatter.cpp "void StringFormatter::lcd" "\n#if ETHERNET_ON == true || WIFI_ON == true \
\n if (Diag::LATCH)\
\n {\
\n diagSerial = t;\
\n }\
\n#endif\n}\n" 1)
patch=(patch1 patch2 patch3 patch4 patch5 patch6 patch7 patch8 patch9 patch10 patch11 patch12 patch13 patch14 patch15 patch16 patch17)
# patch=(patch17)
declare -n elmv1
for elmv1 in "${patch[@]}"; do
file="${elmv1[0]}"
marker="${elmv1[1]}"
markerpos="${elmv1[3]}"
if [ $markerpos = 1 ]
then
target="${elmv1[2]} $marker"
else
target="$marker ${elmv1[2]}"
fi
echo $marker
# echo $target
echo $file
grep -q $marker $file
if [ $? -eq 0 ]
then
echo "Patching $file with $marker --> $target ..."
sed -i "s/$marker/$target/" $file
else
echo "Patching $file failed."
exit 1
fi
done

View File

@@ -33,13 +33,24 @@
#endif
#if ENABLE_ETHERNET && (defined(ARDUINO_AVR_MEGA) || defined(ARDUINO_AVR_MEGA2560) || defined(ARDUINO_SAMD_ZERO) || defined(TEENSYDUINO))
#define ETHERNET_ON true
#define ETHERNET_ON true
#else
#define ETHERNET_ON false
#define ETHERNET_ON false
#endif
// MQTT handles ethernet on it's own
#if ENABLE_MQTT && (defined(ARDUINO_AVR_MEGA) || defined(ARDUINO_AVR_MEGA2560) || defined(ARDUINO_SAMD_ZERO) || defined(TEENSYDUINO))
#if ENABLE_ETHERNET
#error Ethernet and MQTT can not be enabled simultaneaously
#elif ENABLE_WIFI
#error WIFI and MQTT can not be enabled simultaneaously
#else
#define MQTT_ON true
#endif
#endif
#if WIFI_ON && ETHERNET_ON
#error Command Station does not support WIFI and ETHERNET at the same time.
#error Command Station does not support WIFI and ETHERNET at the same time.
#endif
////////////////////////////////////////////////////////////////////////////////

View File

@@ -23,7 +23,7 @@
// thanks go to https://github.com/mpflaga/Arduino-MemoryFree
#if defined(__arm__)
extern "C" char* sbrk(int);
extern "C" char *sbrk(int);
#elif defined(__AVR__)
extern char *__brkval;
extern char *__malloc_heap_start;
@@ -31,14 +31,16 @@ extern char *__malloc_heap_start;
#error Unsupported board type
#endif
static volatile int minimum_free_memory = __INT_MAX__;
#if !defined(__IMXRT1062__)
static inline int freeMemory() {
inline int freeMemory()
// static inline int freeMemory()
{
char top;
#if defined(__arm__)
return &top - reinterpret_cast<char*>(sbrk(0));
return &top - reinterpret_cast<char *>(sbrk(0));
#elif defined(__AVR__)
return __brkval ? &top - __brkval : &top - __malloc_heap_start;
#else
@@ -47,7 +49,8 @@ static inline int freeMemory() {
}
// Return low memory value.
int minimumFreeMemory() {
int minimumFreeMemory()
{
byte sreg_save = SREG;
noInterrupts(); // Disable interrupts
int retval = minimum_free_memory;
@@ -57,34 +60,36 @@ int minimumFreeMemory() {
#else
#if defined(ARDUINO_TEENSY40)
static const unsigned DTCM_START = 0x20000000UL;
static const unsigned OCRAM_START = 0x20200000UL;
static const unsigned OCRAM_SIZE = 512;
static const unsigned FLASH_SIZE = 1984;
static const unsigned DTCM_START = 0x20000000UL;
static const unsigned OCRAM_START = 0x20200000UL;
static const unsigned OCRAM_SIZE = 512;
static const unsigned FLASH_SIZE = 1984;
#elif defined(ARDUINO_TEENSY41)
static const unsigned DTCM_START = 0x20000000UL;
static const unsigned OCRAM_START = 0x20200000UL;
static const unsigned OCRAM_SIZE = 512;
static const unsigned FLASH_SIZE = 7936;
#if TEENSYDUINO>151
extern "C" uint8_t external_psram_size;
static const unsigned DTCM_START = 0x20000000UL;
static const unsigned OCRAM_START = 0x20200000UL;
static const unsigned OCRAM_SIZE = 512;
static const unsigned FLASH_SIZE = 7936;
#if TEENSYDUINO > 151
extern "C" uint8_t external_psram_size;
#endif
#endif
static inline int freeMemory() {
static inline int freeMemory()
{
extern unsigned long _ebss;
extern unsigned long _sdata;
extern unsigned long _estack;
const unsigned DTCM_START = 0x20000000UL;
unsigned dtcm = (unsigned)&_estack - DTCM_START;
unsigned stackinuse = (unsigned) &_estack - (unsigned) __builtin_frame_address(0);
unsigned stackinuse = (unsigned)&_estack - (unsigned)__builtin_frame_address(0);
unsigned varsinuse = (unsigned)&_ebss - (unsigned)&_sdata;
unsigned freemem = dtcm - (stackinuse + varsinuse);
return freemem;
}
// Return low memory value.
int minimumFreeMemory() {
int minimumFreeMemory()
{
//byte sreg_save = SREG;
//noInterrupts(); // Disable interrupts
int retval = minimum_free_memory;
@@ -93,19 +98,20 @@ int minimumFreeMemory() {
}
#endif
// Update low ram level. Allow for extra bytes to be specified
// by estimation or inspection, that may be used by other
// by estimation or inspection, that may be used by other
// called subroutines. Must be called with interrupts disabled.
//
//
// Although __brkval may go up and down as heap memory is allocated
// and freed, this function records only the worst case encountered.
// So even if all of the heap is freed, the reported minimum free
// So even if all of the heap is freed, the reported minimum free
// memory will not increase.
//
void updateMinimumFreeMemory(unsigned char extraBytes) {
int spare = freeMemory()-extraBytes;
if (spare < 0) spare = 0;
if (spare < minimum_free_memory) minimum_free_memory = spare;
void updateMinimumFreeMemory(unsigned char extraBytes)
{
int spare = freeMemory() - extraBytes;
if (spare < 0)
spare = 0;
if (spare < minimum_free_memory)
minimum_free_memory = spare;
}

View File

@@ -20,6 +20,8 @@
#ifndef freeMemory_h
#define freeMemory_h
void updateMinimumFreeMemory(unsigned char extraBytes=0);
int minimumFreeMemory();
int freeMemory();
#endif

View File

@@ -23,6 +23,7 @@ framework = arduino
upload_protocol = atmel-ice
lib_deps =
${env.lib_deps}
PubSubClient
SparkFun External EEPROM Arduino Library
monitor_speed = 115200
monitor_flags = --echo
@@ -33,6 +34,7 @@ board = megaatmega2560
framework = arduino
lib_deps =
${env.lib_deps}
PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200
@@ -44,6 +46,7 @@ board = uno
framework = arduino
lib_deps =
${env.lib_deps}
PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200
@@ -55,6 +58,7 @@ board = uno_wifi_rev2
framework = arduino
lib_deps =
${env.lib_deps}
PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200
@@ -67,6 +71,7 @@ board = uno
framework = arduino
lib_deps =
${env.lib_deps}
PubSubClient
arduino-libraries/Ethernet
SPI
monitor_speed = 115200

1
test/mpub.sh Executable file
View File

@@ -0,0 +1 @@
mosquitto_pub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -m "mi(255)"

1
test/msub.sh Executable file
View File

@@ -0,0 +1 @@
mosquitto_sub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -k 600