mirror of
https://github.com/DCC-EX/CommandStation-EX.git
synced 2025-07-28 18:03:45 +02:00
Compare commits
48 Commits
v5.4.14-Pr
...
MQTT
Author | SHA1 | Date | |
---|---|---|---|
|
e721303ed1 | ||
|
e742a8b120 | ||
|
1b687808cb | ||
|
21e19128f9 | ||
|
c30f1c7cbe | ||
|
083c73ebc3 | ||
|
9fcc69d273 | ||
|
e51279b202 | ||
|
3c89d713fd | ||
|
dc9368665d | ||
|
6b6ae4e904 | ||
|
a52e864a6f | ||
|
d72636b4ec | ||
|
a33b463c74 | ||
|
41ed2aeeeb | ||
|
a58bc63764 | ||
|
b59def8e7b | ||
|
be5ca00b7e | ||
|
b5520f13ba | ||
|
428628f6f0 | ||
|
c3abb0018d | ||
|
981453d399 | ||
|
8f2f052e2a | ||
|
598fb116a1 | ||
|
ce154abe94 | ||
|
6fd866d273 | ||
|
35d81cd848 | ||
|
508b1fcfce | ||
|
0b0744cc94 | ||
|
595b6bad93 | ||
|
c042240019 | ||
|
866833a19e | ||
|
851228fba6 | ||
|
6bd9e28be4 | ||
|
da85e4e245 | ||
|
a6a36b50e3 | ||
|
4efa260003 | ||
|
7442e3452e | ||
|
caaad92887 | ||
|
c740e25cc1 | ||
|
8acfdc6190 | ||
|
f48965a676 | ||
|
afc01f1967 | ||
|
8fc1470e4e | ||
|
33f2474c20 | ||
|
35c5e875d3 | ||
|
6166484783 | ||
|
4ab21294ce |
@@ -20,12 +20,20 @@
|
||||
#include "CommandDistributor.h"
|
||||
#include "WiThrottle.h"
|
||||
|
||||
DCCEXParser * CommandDistributor::parser=0;
|
||||
DCCEXParser *CommandDistributor::parser = 0;
|
||||
|
||||
void CommandDistributor::parse(byte clientId,byte * buffer, RingStream * streamer) {
|
||||
if (buffer[0] == '<') {
|
||||
if (!parser) parser = new DCCEXParser();
|
||||
parser->parse(streamer, buffer, streamer);
|
||||
void CommandDistributor::parse(byte clientId, byte *buffer, RingStream *streamer)
|
||||
{
|
||||
if (buffer[0] == '<')
|
||||
{
|
||||
if (!parser)
|
||||
{
|
||||
parser = new DCCEXParser();
|
||||
}
|
||||
parser->parse(streamer, buffer, streamer);
|
||||
}
|
||||
else
|
||||
{
|
||||
WiThrottle::getThrottle(clientId)->parse(streamer, buffer);
|
||||
}
|
||||
else WiThrottle::getThrottle(clientId)->parse(streamer, buffer);
|
||||
}
|
||||
|
@@ -1,29 +1,28 @@
|
||||
////////////////////////////////////////////////////////////////////////////////////
|
||||
// DCC-EX CommandStation-EX Please see https://DCC-EX.com
|
||||
// DCC-EX CommandStation-EX Please see https://DCC-EX.com
|
||||
//
|
||||
// This file is the main sketch for the Command Station.
|
||||
//
|
||||
// CONFIGURATION:
|
||||
//
|
||||
// CONFIGURATION:
|
||||
// Configuration is normally performed by editing a file called config.h.
|
||||
// This file is NOT shipped with the code so that if you pull a later version
|
||||
// of the code, your configuration will not be overwritten.
|
||||
//
|
||||
// If you used the automatic installer program, config.h will have been created automatically.
|
||||
//
|
||||
// To obtain a starting copy of config.h please copy the file config.example.h which is
|
||||
// shipped with the code and may be updated as new features are added.
|
||||
//
|
||||
//
|
||||
// To obtain a starting copy of config.h please copy the file config.example.h which is
|
||||
// shipped with the code and may be updated as new features are added.
|
||||
//
|
||||
// If config.h is not found, config.example.h will be used with all defaults.
|
||||
////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#if __has_include ( "config.h")
|
||||
#include "config.h"
|
||||
#if __has_include("config.h")
|
||||
#include "config.h"
|
||||
#else
|
||||
#warning config.h not found. Using defaults from config.example.h
|
||||
#include "config.example.h"
|
||||
#warning config.h not found. Using defaults from config.example.h
|
||||
#include "config.example.h"
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* © 2020,2021 Chris Harlow, Harald Barth, David Cutting,
|
||||
* Fred Decker, Gregor Baues, Anthony W - Dayton All rights reserved.
|
||||
@@ -43,10 +42,9 @@
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
#include "DCCEX.h"
|
||||
|
||||
// Create a serial command parser for the USB connection,
|
||||
// Create a serial command parser for the USB connection,
|
||||
// This supports JMRI or manual diagnostics and commands
|
||||
// to be issued from the USB serial console.
|
||||
DCCEXParser serialParser;
|
||||
@@ -58,14 +56,15 @@ void setup()
|
||||
// Responsibility 1: Start the usb connection for diagnostics
|
||||
// This is normally Serial but uses SerialUSB on a SAMD processor
|
||||
Serial.begin(115200);
|
||||
|
||||
CONDITIONAL_LCD_START {
|
||||
// This block is still executed for DIAGS if LCD not in use
|
||||
LCD(0,F("DCC++ EX v%S"),F(VERSION));
|
||||
LCD(1,F("Starting"));
|
||||
}
|
||||
|
||||
// Start the WiFi interface on a MEGA, Uno cannot currently handle WiFi
|
||||
CONDITIONAL_LCD_START
|
||||
{
|
||||
// This block is still executed for DIAGS if LCD not in use
|
||||
LCD(0, F("DCC++ EX v%S"), F(VERSION));
|
||||
LCD(1, F("Starting"));
|
||||
}
|
||||
|
||||
// Start the WiFi interface on a MEGA, Uno cannot currently handle WiFi
|
||||
|
||||
#if WIFI_ON
|
||||
WifiInterface::setup(WIFI_SERIAL_LINK_SPEED, F(WIFI_SSID), F(WIFI_PASSWORD), F(WIFI_HOSTNAME), IP_PORT, WIFI_CHANNEL);
|
||||
@@ -75,6 +74,10 @@ void setup()
|
||||
EthernetInterface::setup();
|
||||
#endif // ETHERNET_ON
|
||||
|
||||
#if MQTT_ON
|
||||
MQTTInterface::setup();
|
||||
#endif
|
||||
|
||||
// Responsibility 3: Start the DCC engine.
|
||||
// Note: this provides DCC with two motor drivers, main and prog, which handle the motor shield(s)
|
||||
// Standard supported devices have pre-configured macros but custome hardware installations require
|
||||
@@ -82,25 +85,26 @@ void setup()
|
||||
|
||||
// STANDARD_MOTOR_SHIELD, POLOLU_MOTOR_SHIELD, FIREBOX_MK1, FIREBOX_MK1S are pre defined in MotorShields.h
|
||||
|
||||
|
||||
DCC::begin(MOTOR_SHIELD_TYPE);
|
||||
|
||||
#if defined(RMFT_ACTIVE)
|
||||
RMFT::begin();
|
||||
#endif
|
||||
DCC::begin(MOTOR_SHIELD_TYPE);
|
||||
|
||||
#if __has_include ( "mySetup.h")
|
||||
#define SETUP(cmd) serialParser.parse(F(cmd))
|
||||
#include "mySetup.h"
|
||||
#undef SETUP
|
||||
#endif
|
||||
#if defined(RMFT_ACTIVE)
|
||||
RMFT::begin();
|
||||
#endif
|
||||
|
||||
#if defined(LCN_SERIAL)
|
||||
LCN_SERIAL.begin(115200);
|
||||
LCN::init(LCN_SERIAL);
|
||||
#endif
|
||||
#if __has_include("mySetup.h")
|
||||
#define SETUP(cmd) serialParser.parse(F(cmd))
|
||||
#include "mySetup.h"
|
||||
#undef SETUP
|
||||
#endif
|
||||
|
||||
LCD(1,F("Ready"));
|
||||
#if defined(LCN_SERIAL)
|
||||
LCN_SERIAL.begin(115200);
|
||||
LCN::init(LCN_SERIAL);
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
LCD(1, F("Ready"));
|
||||
}
|
||||
|
||||
void loop()
|
||||
@@ -118,27 +122,32 @@ void loop()
|
||||
#if WIFI_ON
|
||||
WifiInterface::loop();
|
||||
#endif
|
||||
|
||||
#if ETHERNET_ON
|
||||
EthernetInterface::loop();
|
||||
#endif
|
||||
|
||||
#if defined(RMFT_ACTIVE)
|
||||
#if MQTT_ON
|
||||
MQTTInterface::loop();
|
||||
#endif
|
||||
|
||||
#if defined(RMFT_ACTIVE)
|
||||
RMFT::loop();
|
||||
#endif
|
||||
|
||||
#if defined(LCN_SERIAL)
|
||||
LCN::loop();
|
||||
#endif
|
||||
#if defined(LCN_SERIAL)
|
||||
LCN::loop();
|
||||
#endif
|
||||
|
||||
LCDDisplay::loop(); // ignored if LCD not in use
|
||||
|
||||
LCDDisplay::loop(); // ignored if LCD not in use
|
||||
|
||||
// Report any decrease in memory (will automatically trigger on first call)
|
||||
static int ramLowWatermark = __INT_MAX__; // replaced on first loop
|
||||
static int ramLowWatermark = __INT_MAX__; // replaced on first loop
|
||||
|
||||
int freeNow = minimumFreeMemory();
|
||||
if (freeNow < ramLowWatermark)
|
||||
{
|
||||
ramLowWatermark = freeNow;
|
||||
LCD(2,F("Free RAM=%5db"), ramLowWatermark);
|
||||
LCD(2, F("Free RAM=%5db"), ramLowWatermark);
|
||||
}
|
||||
}
|
||||
|
6
DCCEX.h
6
DCCEX.h
@@ -10,9 +10,15 @@
|
||||
#include "DCCEXParser.h"
|
||||
#include "version.h"
|
||||
#include "WifiInterface.h"
|
||||
|
||||
#if ETHERNET_ON == true
|
||||
#include "EthernetInterface.h"
|
||||
#endif
|
||||
|
||||
#if MQTT_ON == true
|
||||
#include "MQTTInterface.h"
|
||||
#endif
|
||||
|
||||
#include "LCD_Implementation.h"
|
||||
#include "LCN.h"
|
||||
#include "freeMemory.h"
|
||||
|
@@ -56,6 +56,7 @@ const int16_t HASH_KEYWORD_LCN = 15137;
|
||||
const int16_t HASH_KEYWORD_RESET = 26133;
|
||||
const int16_t HASH_KEYWORD_SPEED28 = -17064;
|
||||
const int16_t HASH_KEYWORD_SPEED128 = 25816;
|
||||
const int16_t HASH_KEYWORD_MQTT = 28220;
|
||||
|
||||
int16_t DCCEXParser::stashP[MAX_COMMAND_PARAMS];
|
||||
bool DCCEXParser::stashBusy;
|
||||
@@ -500,9 +501,11 @@ void DCCEXParser::parse(Print *stream, byte *com, RingStream * ringStream)
|
||||
case 's': // <s>
|
||||
StringFormatter::send(stream, F("<p%d>\n"), DCCWaveform::mainTrack.getPowerMode() == POWERMODE::ON);
|
||||
StringFormatter::send(stream, F("<iDCC-EX V-%S / %S / %S G-%S>\n"), F(VERSION), F(ARDUINO_TYPE), DCC::getMotorShieldName(), F(GITHUB_SHA));
|
||||
#ifndef MQTT_ON // the return can get really large depending on the #of items defined and the outputbuffer will overflow
|
||||
Turnout::printAll(stream); //send all Turnout states
|
||||
Output::printAll(stream); //send all Output states
|
||||
Sensor::printAll(stream); //send all Sensor states
|
||||
#endif
|
||||
// TODO Send stats of speed reminders table
|
||||
return;
|
||||
|
||||
@@ -802,6 +805,10 @@ bool DCCEXParser::parseD(Print *stream, int16_t params, int16_t p[])
|
||||
StringFormatter::send(stream, F("128 Speedsteps"));
|
||||
return true;
|
||||
|
||||
case HASH_KEYWORD_MQTT: // <D LCN ON/OFF>
|
||||
Diag::MQTT = onOff;
|
||||
return true;
|
||||
|
||||
default: // invalid/unknown
|
||||
break;
|
||||
}
|
||||
|
7
DIAG.h
7
DIAG.h
@@ -16,10 +16,15 @@
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef DIAG_h
|
||||
#define DIAG_h
|
||||
|
||||
#include "StringFormatter.h"
|
||||
#define DIAG StringFormatter::diag
|
||||
#include "DiagLogger.h"
|
||||
|
||||
// #define DIAG StringFormatter::diag // Std logging to serial only
|
||||
#define DIAG DiagLogger::get().diag // allows to add other log writers
|
||||
#define LCD StringFormatter::lcd
|
||||
|
||||
#endif
|
||||
|
63
DiagLogger.cpp
Normal file
63
DiagLogger.cpp
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#include "DiagLogger.h"
|
||||
|
||||
// DIAG.h the #define DIAG points to here ...
|
||||
// EthernetSetup , Wifisetup, etc can register a function to be called allowing the channel
|
||||
// to publish the diag info to
|
||||
// serial is default end enabled all the time
|
||||
|
||||
DiagLogger DiagLogger::singleton; // static instantiation;
|
||||
|
||||
void DiagLogger::addDiagWriter(DiagWriter l) {
|
||||
if ( registered == MAXWRITERS ) {
|
||||
Serial.println("Error: Max amount of writers exceeded.");
|
||||
return;
|
||||
}
|
||||
writers[registered] = l;
|
||||
registered++;
|
||||
}
|
||||
|
||||
|
||||
void DiagLogger::diag(const FSH *input,...)
|
||||
{
|
||||
|
||||
va_list args;
|
||||
va_start(args, input);
|
||||
|
||||
int len = 0;
|
||||
len += sprintf(&b1[len], "<* ");
|
||||
len += vsprintf_P(&b1[len], (const char *)input, args);
|
||||
len += sprintf(&b1[len], " *>\n");
|
||||
|
||||
if ( len >= 256 ) { Serial.print("ERROR : Diag Buffer overflow"); return; }
|
||||
// allways print to Serial
|
||||
Serial.print(b1);
|
||||
|
||||
// callback the other registered diag writers
|
||||
for (size_t i = 0; i < (size_t) registered; i++)
|
||||
{
|
||||
writers[i](b1, len);
|
||||
}
|
||||
|
||||
va_end(args);
|
||||
|
||||
}
|
59
DiagLogger.h
Normal file
59
DiagLogger.h
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef DiagLogger_h
|
||||
#define DiagLogger_h
|
||||
|
||||
#include "StringFormatter.h"
|
||||
|
||||
|
||||
#define MAXWRITERS 10
|
||||
typedef void (*DiagWriter)(const char *msg, const int length);
|
||||
|
||||
class DiagLogger
|
||||
{
|
||||
|
||||
private:
|
||||
// Methods
|
||||
DiagLogger() = default;
|
||||
DiagLogger(const DiagLogger &); // non construction-copyable
|
||||
DiagLogger &operator=(const DiagLogger &); // non copyable
|
||||
|
||||
// Members
|
||||
static DiagLogger singleton; // unique instance of the MQTTInterface object
|
||||
DiagWriter writers[MAXWRITERS];
|
||||
int registered = 0; // number of registered writers ( Serial is not counted as always used )
|
||||
char b1[256];
|
||||
|
||||
public:
|
||||
// Methods
|
||||
static DiagLogger &get() noexcept
|
||||
{ // return a reference to the unique instance
|
||||
return singleton;
|
||||
}
|
||||
|
||||
void diag(const FSH *input...);
|
||||
void addDiagWriter(DiagWriter l);
|
||||
~DiagLogger() = default;
|
||||
|
||||
// Members
|
||||
};
|
||||
|
||||
#endif
|
@@ -51,14 +51,26 @@ class EthernetInterface {
|
||||
|
||||
public:
|
||||
|
||||
static void setup();
|
||||
static void loop();
|
||||
|
||||
static void setup();
|
||||
static void loop();
|
||||
|
||||
bool isConnected() { return connected; };
|
||||
static EthernetInterface *get() { return singleton; };
|
||||
EthernetClient *getClient(int socket) { return &clients[socket]; };
|
||||
EthernetServer *getServer() { return server; };
|
||||
|
||||
~EthernetInterface() = default;
|
||||
|
||||
private:
|
||||
static EthernetInterface * singleton;
|
||||
bool connected;
|
||||
EthernetInterface();
|
||||
void loop2();
|
||||
|
||||
EthernetInterface();
|
||||
EthernetInterface(const EthernetInterface&); // non construction-copyable
|
||||
EthernetInterface& operator=( const EthernetInterface& ); // non copyable
|
||||
static EthernetInterface * singleton;
|
||||
bool connected;
|
||||
|
||||
void loop2();
|
||||
|
||||
EthernetServer * server;
|
||||
EthernetClient clients[MAX_SOCK_NUM]; // accept up to MAX_SOCK_NUM client connections at the same time; This depends on the chipset used on the Shield
|
||||
uint8_t buffer[MAX_ETH_BUFFER+1]; // buffer used by TCP for the recv
|
||||
|
42
MQTTBrokers.h
Normal file
42
MQTTBrokers.h
Normal file
@@ -0,0 +1,42 @@
|
||||
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
#ifndef _MQTTBrokers_h_
|
||||
#define _MQTTBrokers_h_
|
||||
|
||||
// Defines preconfigured mqtt broker configurations
|
||||
|
||||
// EthernetShields / Arduino do not support secure transport i.e. on either port 443 or 8883 for MQTTS on most broker installations
|
||||
// Once we support the ESP / Wifi as Transport medium we may get TLS capabilities for data in transit i.e. can use the 443/8883 ports
|
||||
|
||||
|
||||
#define MQPWD F(MQTT_PWD)
|
||||
#define MQUID F(MQTT_USER)
|
||||
#define MQPREFIX F(MQTT_PREFIX)
|
||||
|
||||
// Cloud server provided by the DccEX team for testing purposes; apply for a uid/pwd on discord
|
||||
#define DCCEX_MQTT_BROKER F("DccexMQ"), new MQTTBroker( 9883, F("dcclms.modelrailroad.ovh"), MQUID, MQPWD, MQPREFIX)
|
||||
// Mosquitto test server
|
||||
#define DCCEX_MOSQUITTO F("Mosquitto"), new MQTTBroker(1883, F("test.mosquitto.org"))
|
||||
// HiveMQ test server
|
||||
#define DCCEX_HIVEMQ F("HiveMQ"), new MQTTBroker(1883, F("broker.hivemq.com"))
|
||||
|
||||
|
||||
#endif
|
180
MQTTCallbackHandlers.cpp
Normal file
180
MQTTCallbackHandlers.cpp
Normal file
@@ -0,0 +1,180 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#if __has_include("config.h")
|
||||
#include "config.h"
|
||||
#else
|
||||
#warning config.h not found. Using defaults from config.example.h
|
||||
#include "config.example.h"
|
||||
#endif
|
||||
#include "defines.h"
|
||||
|
||||
#include <errno.h>
|
||||
#include <limits.h>
|
||||
|
||||
#include "MQTTInterface.h"
|
||||
|
||||
|
||||
// Fwd decl for the callback handlers
|
||||
void mqttDCCEXCallback(MQTTInterface *mqtt, csmsg_t &tm);
|
||||
void mqttProtocolCallback(MQTTInterface *mqtt, csmsg_t &tm);
|
||||
void mqttMCallback(MQTTInterface *mqtt, csmsg_t &tm);
|
||||
|
||||
typedef void (*CallbackFunc)(MQTTInterface *mqtt, csmsg_t &tm);
|
||||
|
||||
template<class M, class N>
|
||||
struct CallbackFunction {
|
||||
M first;
|
||||
N second;
|
||||
};
|
||||
using CallbackFunctions = CallbackFunction<char, CallbackFunc>[MAX_CALLBACKS];
|
||||
|
||||
// lookup table for the protocol handle functions
|
||||
constexpr CallbackFunctions vec = {
|
||||
{'<', mqttDCCEXCallback},
|
||||
{'{', mqttProtocolCallback},
|
||||
{'m', mqttMCallback}
|
||||
};
|
||||
|
||||
long cantorEncode(long a, long b)
|
||||
{
|
||||
return (((a + b) * (a + b + 1)) / 2) + b;
|
||||
}
|
||||
|
||||
void cantorDecode(int32_t c, int *a, int *b)
|
||||
{
|
||||
int w = floor((sqrt(8 * c + 1) - 1) / 2);
|
||||
int t = (w * (w + 1)) / 2;
|
||||
*b = c - t;
|
||||
*a = w - *b;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief lookup of the proper function for < or { based commands
|
||||
*
|
||||
* @param c
|
||||
* @return CallbackFunc
|
||||
*/
|
||||
auto protocolDistributor(const char c) -> CallbackFunc {
|
||||
for (auto &&f : vec)
|
||||
{
|
||||
if (f.first == c)
|
||||
return f.second;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void protocolHandler(MQTTInterface *mqtt, csmsg_t &tm) {
|
||||
protocolDistributor(tm.cmd[0])(mqtt, tm);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Callback for handling 'm' MQTT Protocol commands (deprecated)
|
||||
* @deprecated to be replaced by '{' commands in simple JSON format
|
||||
*/
|
||||
void mqttMCallback(MQTTInterface *mqtt, csmsg_t &tm)
|
||||
{
|
||||
auto clients = mqtt->getClients();
|
||||
// DIAG(F("MQTT m - Callback"));
|
||||
switch (tm.cmd[1])
|
||||
{
|
||||
case 'i': // Inital handshake message to create the tunnel
|
||||
{
|
||||
char buffer[MAXPAYLOAD];
|
||||
char *tmp = tm.cmd + 3;
|
||||
auto length = strlen(tm.cmd);
|
||||
strlcpy(buffer, tmp, length);
|
||||
buffer[length - 4] = '\0';
|
||||
|
||||
// DIAG(F("MQTT buffer %s - %s - %s - %d"), tm.cmd, tmp, buffer, length);
|
||||
|
||||
auto distantid = strtol(buffer, NULL, 10);
|
||||
|
||||
if (errno == ERANGE || distantid > UCHAR_MAX)
|
||||
{
|
||||
DIAG(F("MQTT Invalid Handshake ID; must be between 0 and 255"));
|
||||
return;
|
||||
}
|
||||
if (distantid == 0)
|
||||
{
|
||||
DIAG(F("MQTT Invalid Handshake ID"));
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a new MQTT client
|
||||
|
||||
auto subscriberid = mqtt->obtainSubscriberID(); // to be used in the parsing process for the clientid in the ringbuffer
|
||||
|
||||
if (subscriberid == 0)
|
||||
{
|
||||
DIAG(F("MQTT no more connections are available"));
|
||||
return;
|
||||
}
|
||||
|
||||
auto topicid = cantorEncode((long)subscriberid, (long)distantid);
|
||||
DIAG(F("MQTT Client connected : subscriber [%d] : distant [%d] : topic: [%d]"), subscriberid, (int)distantid, topicid);
|
||||
|
||||
// extract the number delivered from & initalize the new mqtt client object
|
||||
clients[subscriberid] = {(int)distantid, subscriberid, topicid, false}; // set to true once the channels are available
|
||||
|
||||
auto sq = mqtt->getSubscriptionQueue();
|
||||
sq->push(subscriberid);
|
||||
|
||||
return;
|
||||
}
|
||||
default:
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Callback for handling '{' MQTT Protocol commands
|
||||
*/
|
||||
void mqttProtocolCallback(MQTTInterface *mqtt, csmsg_t &tm)
|
||||
{
|
||||
// DIAG(F("MQTT Protocol - Callback"));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Callback for handling '<' DccEX commands
|
||||
*/
|
||||
void mqttDCCEXCallback(MQTTInterface *mqtt, csmsg_t &tm)
|
||||
// void mqttDCCEXCallback(MQTTInterface *mqtt, char *topic, char *payload, unsigned int length)
|
||||
{
|
||||
// DIAG(F("MQTT DCCEX - Callback"));
|
||||
if (!tm.mqsocket)
|
||||
{
|
||||
DIAG(F("MQTT Can't identify sender; command send on wrong topic"));
|
||||
return;
|
||||
}
|
||||
int idx = mqtt->getPool()->setItem(tm); // Add the recieved command to the pool
|
||||
if (idx == -1)
|
||||
{
|
||||
DIAG(F("MQTT Command pool full. Could not handle recieved command."));
|
||||
return;
|
||||
}
|
||||
mqtt->getIncomming()->push(idx); // Add the index of the pool item to the incomming queue
|
||||
|
||||
// don't show the topic as we would have to save it also just like the payload
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT Message arrived: [%s]"), tm.cmd);
|
||||
}
|
540
MQTTInterface.cpp
Normal file
540
MQTTInterface.cpp
Normal file
@@ -0,0 +1,540 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#if __has_include("config.h")
|
||||
#include "config.h"
|
||||
#else
|
||||
#warning config.h not found. Using defaults from config.example.h
|
||||
#include "config.example.h"
|
||||
#endif
|
||||
#include "defines.h"
|
||||
|
||||
#include <errno.h>
|
||||
#include <limits.h>
|
||||
|
||||
#include "MQTTInterface.h"
|
||||
#include "MQTTBrokers.h"
|
||||
#include "DCCTimer.h"
|
||||
#include "CommandDistributor.h"
|
||||
#include "freeMemory.h"
|
||||
|
||||
MQTTInterface *MQTTInterface::singleton = NULL;
|
||||
|
||||
void protocolHandler(MQTTInterface *mqtt, csmsg_t &tm);
|
||||
|
||||
/**
|
||||
* @brief callback used from DIAG to send diag messages to the broker / clients
|
||||
*
|
||||
* @param msg
|
||||
* @param length
|
||||
*/
|
||||
void mqttDiag(const char *msg, const int length)
|
||||
{
|
||||
|
||||
if (MQTTInterface::get()->getState() == CONNECTED)
|
||||
{
|
||||
// if not connected all goes only to Serial;
|
||||
// if CONNECTED we have at least the root topic subscribed to
|
||||
auto mqSocket = MQTTInterface::get()->getActive();
|
||||
char topic[MAXTSTR];
|
||||
memset(topic, 0, MAXTSTR);
|
||||
|
||||
if (mqSocket == 0)
|
||||
{ // send to root topic of the commandstation as it doen't concern a specific client at this point
|
||||
sprintf(topic, "%s", MQTTInterface::get()->getClientID());
|
||||
}
|
||||
else
|
||||
{
|
||||
sprintf(topic, "%s/%ld/diag", MQTTInterface::get()->getClientID(), MQTTInterface::get()->getClients()[mqSocket].topic);
|
||||
}
|
||||
// Serial.print(" ---- MQTT pub to: ");
|
||||
// Serial.print(topic);
|
||||
// Serial.print(" Msg: ");
|
||||
// Serial.print(msg);
|
||||
MQTTInterface::get()->publish(topic, msg);
|
||||
}
|
||||
}
|
||||
|
||||
void MQTTInterface::setup()
|
||||
{
|
||||
DiagLogger::get().addDiagWriter(mqttDiag);
|
||||
singleton = new MQTTInterface();
|
||||
|
||||
if (!singleton->connected)
|
||||
{
|
||||
singleton = NULL;
|
||||
}
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT Interface instance: [%x] - Setup done"), singleton);
|
||||
};
|
||||
|
||||
MQTTInterface::MQTTInterface()
|
||||
{
|
||||
|
||||
this->connected = this->setupNetwork();
|
||||
if (!this->connected)
|
||||
{
|
||||
DIAG(F("Network setup failed"));
|
||||
}
|
||||
else
|
||||
{
|
||||
this->setup(CSMQTTBROKER);
|
||||
}
|
||||
this->outboundRing = new RingStream(OUT_BOUND_SIZE);
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief determine the mqsocket from a topic
|
||||
*
|
||||
* @return byte the mqsocketid for the message recieved
|
||||
*/
|
||||
byte senderMqSocket(MQTTInterface *mqtt, char *topic)
|
||||
{
|
||||
// list of all available clients from which we can determine the mqsocket
|
||||
auto clients = mqtt->getClients();
|
||||
const char s[2] = "/"; // topic delimiter is /
|
||||
char *token;
|
||||
byte mqsocket = 0;
|
||||
|
||||
/* get the first token = ClientID */
|
||||
token = strtok(topic, s);
|
||||
/* get the second token = topicID */
|
||||
token = strtok(NULL, s);
|
||||
if (token != NULL) // topic didn't contain any topicID
|
||||
{
|
||||
auto topicid = atoi(token);
|
||||
// verify that there is a MQTT client with that topic id connected
|
||||
// check in the array of clients if we have one with the topicid
|
||||
// start at 1 as 0 is not allocated as mqsocket
|
||||
for (int i = 1; i <= mqtt->getClientSize(); i++)
|
||||
{
|
||||
if (clients[i].topic == topicid)
|
||||
{
|
||||
mqsocket = i;
|
||||
break; // we are done
|
||||
}
|
||||
}
|
||||
// if we get here we have a topic but no associated client
|
||||
}
|
||||
// if mqsocket == 0 here we haven't got any Id in the topic string
|
||||
return mqsocket;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief MQTT Interface callback recieving all incomming messages from the PubSubClient
|
||||
*
|
||||
* @param topic
|
||||
* @param payload
|
||||
* @param length
|
||||
*/
|
||||
void mqttCallback(char *topic, byte *pld, unsigned int length)
|
||||
{
|
||||
// it's a bounced diag message ignore in all cases
|
||||
// but it should not be necessary here .. that means the active mqsocket is wrong when sending to diag message
|
||||
if ((pld[0] == '<') && (pld[1] == '*'))
|
||||
{
|
||||
return;
|
||||
}
|
||||
// ignore anything above the PAYLOAD limit of 64 char which should be enough
|
||||
// in general things rejected here is the bounce of the inital messages setting up the chnanel etc
|
||||
if (length >= MAXPAYLOAD)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MQTTInterface *mqtt = MQTTInterface::get();
|
||||
csmsg_t tm; // topic message
|
||||
|
||||
// FOR DIAGS and MQTT ON in the callback we need to copy the payload buffer
|
||||
// as during the publish of the diag messages the original payload gets destroyed
|
||||
// so we setup the csmsg_t now to save yet another buffer
|
||||
// if tm not used it will just be discarded at the end of the function call
|
||||
|
||||
memset(tm.cmd, 0, MAXPAYLOAD); // Clean up the cmd buffer - should not be necessary
|
||||
strlcpy(tm.cmd, (char *)pld, length + 1); // Message payload
|
||||
tm.mqsocket = senderMqSocket(mqtt, topic); // On which socket did we recieve the mq message
|
||||
mqtt->setActive(tm.mqsocket); // connection from where we recieved the command is active now
|
||||
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT Callback:[%s/%d] [%s] [%d] on interface [%x]"), topic, tm.mqsocket, tm.cmd, length, mqtt);
|
||||
|
||||
protocolHandler(mqtt, tm);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Copies an byte array to a hex representation as string; used for generating the unique Arduino ID
|
||||
*
|
||||
* @param array array containing bytes
|
||||
* @param len length of the array
|
||||
* @param buffer buffer to which the string will be written; make sure the buffer has appropriate length
|
||||
*/
|
||||
static void array_to_string(byte array[], unsigned int len, char buffer[])
|
||||
{
|
||||
for (unsigned int i = 0; i < len; i++)
|
||||
{
|
||||
byte nib1 = (array[i] >> 4) & 0x0F;
|
||||
byte nib2 = (array[i] >> 0) & 0x0F;
|
||||
buffer[i * 2 + 0] = nib1 < 0xA ? '0' + nib1 : 'A' + nib1 - 0xA;
|
||||
buffer[i * 2 + 1] = nib2 < 0xA ? '0' + nib2 : 'A' + nib2 - 0xA;
|
||||
}
|
||||
buffer[len * 2] = '\0';
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Connect to the MQTT broker; Parameters for this function are defined in
|
||||
* like the motoshield configurations there are mqtt broker configurations in config.h
|
||||
*
|
||||
* @param id Name provided to the broker configuration
|
||||
* @param b MQTT broker object containing the main configuration parameters
|
||||
*/
|
||||
void MQTTInterface::setup(const FSH *id, MQTTBroker *b)
|
||||
{
|
||||
|
||||
//Create the MQTT environment and establish inital connection to the Broker
|
||||
broker = b;
|
||||
|
||||
DIAG(F("[%d] MQTT Connect to %S at %S/%d.%d.%d.%d:%d"), freeMemory(), id, broker->domain, broker->ip[0], broker->ip[1], broker->ip[2], broker->ip[3], broker->port);
|
||||
|
||||
// initalize MQ Broker
|
||||
mqttClient = new PubSubClient(broker->ip, broker->port, mqttCallback, ethClient);
|
||||
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT Client created ok..."));
|
||||
array_to_string(mac, CLIENTIDSIZE, clientID);
|
||||
DIAG(F("[%d] MQTT Client ID : %s"), freeMemory(), clientID);
|
||||
connect(); // inital connection as well as reconnects
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief MQTT broker connection / reconnection
|
||||
*
|
||||
*/
|
||||
void MQTTInterface::connect()
|
||||
{
|
||||
|
||||
int reconnectCount = 0;
|
||||
connectID[0] = '\0';
|
||||
|
||||
// Build the connect ID : Prefix + clientID
|
||||
if (broker->prefix != nullptr)
|
||||
{
|
||||
strcpy_P(connectID, (const char *)broker->prefix);
|
||||
}
|
||||
strcat(connectID, clientID);
|
||||
|
||||
// Connect to the broker
|
||||
DIAG(F("[%d] MQTT %s (re)connecting ..."), freeMemory(), connectID);
|
||||
while (!mqttClient->connected() && reconnectCount < MAXRECONNECT)
|
||||
{
|
||||
switch (broker->cType)
|
||||
{
|
||||
// no uid no pwd
|
||||
case 1:
|
||||
{ // port(p), ip(i), domain(d),
|
||||
DIAG(F("[%d] MQTT Broker connecting anonymous ..."), freeMemory());
|
||||
if (mqttClient->connect(connectID))
|
||||
{
|
||||
DIAG(F("[%d] MQTT Broker connected ..."),freeMemory());
|
||||
auto sub = subscribe(clientID); // set up the main subscription on which we will recieve the intal mi message from a subscriber
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
|
||||
mqState = CONNECTED;
|
||||
}
|
||||
else
|
||||
{
|
||||
DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient->state());
|
||||
reconnectCount++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
// with uid passwd
|
||||
case 2:
|
||||
{
|
||||
DIAG(F("MQTT Broker connecting with uid/pwd ..."));
|
||||
char user[strlen_P((const char *)broker->user)];
|
||||
char pwd[strlen_P((const char *)broker->pwd)];
|
||||
|
||||
// need to copy from progmem to lacal
|
||||
strcpy_P(user, (const char *)broker->user);
|
||||
strcpy_P(pwd, (const char *)broker->pwd);
|
||||
|
||||
if (mqttClient->connect(connectID, user, pwd))
|
||||
{
|
||||
DIAG(F("MQTT Broker connected ..."));
|
||||
auto sub = subscribe(clientID); // set up the main subscription on which we will recieve the intal mi message from a subscriber
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT subscriptons %s..."), sub ? "ok" : "failed");
|
||||
mqState = CONNECTED;
|
||||
}
|
||||
else
|
||||
{
|
||||
DIAG(F("MQTT broker connection failed, rc=%d, trying to reconnect"), mqttClient->state());
|
||||
reconnectCount++;
|
||||
}
|
||||
break;
|
||||
// ! add last will messages for the client
|
||||
// (connectID, MQTT_BROKER_USER, MQTT_BROKER_PASSWD, "$connected", 0, true, "0", 0))
|
||||
}
|
||||
}
|
||||
if (reconnectCount == MAXRECONNECT)
|
||||
{
|
||||
DIAG(F("MQTT Connection aborted after %d tries"), MAXRECONNECT);
|
||||
mqState = CONNECTION_FAILED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief for the time being only one topic at the root
|
||||
* which is the unique clientID from the MCU
|
||||
* QoS is 0 by default
|
||||
*
|
||||
* @param topic to subsribe to
|
||||
* @return boolean true if successful false otherwise
|
||||
*/
|
||||
boolean MQTTInterface::subscribe(const char *topic)
|
||||
{
|
||||
auto res = mqttClient->subscribe(topic);
|
||||
return res;
|
||||
}
|
||||
|
||||
void MQTTInterface::publish(const char *topic, const char *payload)
|
||||
{
|
||||
mqttClient->publish(topic, payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Connect the Ethernet network;
|
||||
*
|
||||
* @return true if connections was successful
|
||||
*/
|
||||
bool MQTTInterface::setupNetwork()
|
||||
{
|
||||
// setup Ethernet connection first
|
||||
DIAG(F("[%d] Starting network setup ... "), freeMemory());
|
||||
DCCTimer::getSimulatedMacAddress(mac);
|
||||
|
||||
#ifdef IP_ADDRESS
|
||||
Ethernet.begin(mac, IP_ADDRESS);
|
||||
#else
|
||||
if (Ethernet.begin(mac) == 0)
|
||||
{
|
||||
DIAG(F("Ethernet.begin FAILED"));
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
DIAG(F("[%d] Ethernet.begin OK"), freeMemory());
|
||||
if (Ethernet.hardwareStatus() == EthernetNoHardware)
|
||||
{
|
||||
DIAG(F("Ethernet shield not found"));
|
||||
return false;
|
||||
}
|
||||
|
||||
// For slower cards like the ENC courtesy @PaulS
|
||||
// wait max 5 sec before bailing out on the connection
|
||||
|
||||
unsigned long startmilli = millis();
|
||||
while ((millis() - startmilli) < 5500)
|
||||
{
|
||||
if (Ethernet.linkStatus() == LinkON)
|
||||
break;
|
||||
DIAG(F("Ethernet waiting for link (1sec) "));
|
||||
delay(1000);
|
||||
}
|
||||
|
||||
if (Ethernet.linkStatus() == LinkOFF)
|
||||
{
|
||||
DIAG(F("Ethernet cable not connected"));
|
||||
return false;
|
||||
}
|
||||
DIAG(F("[%d] Ethernet link is up"),freeMemory());
|
||||
IPAddress ip = Ethernet.localIP(); // reassign the obtained ip address
|
||||
DIAG(F("IP: %d.%d.%d.%d"), ip[0], ip[1], ip[2], ip[3]);
|
||||
DIAG(F("Port:%d"), IP_PORT);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief handle the incomming queue in the loop
|
||||
*
|
||||
*/
|
||||
void inLoop(Queue<int> &in, ObjectPool<csmsg_t, MAXPOOLSIZE> &pool, RingStream *outboundRing)
|
||||
{
|
||||
|
||||
bool state;
|
||||
if (in.count() > 0)
|
||||
{
|
||||
// pop a command index from the incomming queue and get the command from the pool
|
||||
int idx = in.pop();
|
||||
csmsg_t *c = pool.getItem(idx, &state);
|
||||
|
||||
MQTTInterface::get()->setActive(c->mqsocket); // connection from where we recieved the command is active now
|
||||
// execute the command and collect results
|
||||
outboundRing->mark((uint8_t)c->mqsocket);
|
||||
CommandDistributor::parse(c->mqsocket, (byte *)c->cmd, outboundRing);
|
||||
outboundRing->commit();
|
||||
|
||||
// free the slot in the command pool
|
||||
pool.returnItem(idx);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief handle the outgoing messages in the loop
|
||||
*
|
||||
*/
|
||||
void outLoop(PubSubClient *mq)
|
||||
{
|
||||
// handle at most 1 outbound transmission
|
||||
MQTTInterface *mqtt = MQTTInterface::get();
|
||||
|
||||
auto clients = mqtt->getClients();
|
||||
auto outboundRing = mqtt->getRingStream();
|
||||
|
||||
int mqSocket = outboundRing->read();
|
||||
if (mqSocket >= 0) // mqsocket / clientid can't be 0 ....
|
||||
{
|
||||
int count = outboundRing->count();
|
||||
char buffer[MAXTSTR];
|
||||
buffer[0] = '\0';
|
||||
sprintf(buffer, "%s/%d/result", mqtt->getClientID(), (int)clients[mqSocket].topic);
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT publish to mqSocket=%d, count=:%d on topic %s"), mqSocket, count, buffer);
|
||||
|
||||
if (mq->beginPublish(buffer, count, false))
|
||||
{
|
||||
for (; count > 0; count--)
|
||||
{
|
||||
mq->write(outboundRing->read());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
DIAG(F("MQTT error start publishing result)"));
|
||||
};
|
||||
|
||||
if (!mq->endPublish())
|
||||
{
|
||||
DIAG(F("MQTT error finalizing published result)"));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief check if there are new subscribers connected and create the channels
|
||||
*
|
||||
* @param sq if the callback captured a client there will be an entry in the sq with the subscriber number
|
||||
* @param clients the clients array where we find the info to setup the subsciptions and print out the publish topics for info
|
||||
*/
|
||||
void checkSubscribers(Queue<int> &sq, csmqttclient_t *clients)
|
||||
{
|
||||
MQTTInterface *mqtt = MQTTInterface::get();
|
||||
if (sq.count() > 0)
|
||||
{
|
||||
// new subscriber
|
||||
auto s = sq.pop();
|
||||
|
||||
char tbuffer[(CLIENTIDSIZE * 2) + 1 + MAXTSTR];
|
||||
sprintf(tbuffer, "%s/%ld/cmd", mqtt->getClientID(), clients[s].topic);
|
||||
|
||||
auto ok = mqtt->subscribe(tbuffer);
|
||||
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT new subscriber topic: %s %s"), tbuffer, ok ? "OK" : "NOK");
|
||||
|
||||
// send the topic on which the CS will listen for commands and the ones on which it will publish for the connecting
|
||||
// client to pickup. Once the connecting client has setup other topic setup messages on the main channel shall be
|
||||
// ignored
|
||||
// JSON message { init: <number> channels: {result: <string>, diag: <string> }}
|
||||
|
||||
char buffer[MAXPAYLOAD * 2];
|
||||
memset(buffer, 0, MAXPAYLOAD * 2);
|
||||
|
||||
// sprintf(buffer, "mc(%d,%ld)", (int)clients[s].distant, clients[s].topic);
|
||||
|
||||
sprintf(buffer, "{ \"init\": %d, \"subscribeto\": {\"result\": \"%s/%ld/result\" , \"diag\": \"%s/%ld/diag\" }, \"publishto\": {\"cmd\": \"%s/%ld/cmd\" } }",
|
||||
(int)clients[s].distant,
|
||||
mqtt->getClientID(),
|
||||
clients[s].topic,
|
||||
mqtt->getClientID(),
|
||||
clients[s].topic,
|
||||
mqtt->getClientID(),
|
||||
clients[s].topic);
|
||||
|
||||
if (Diag::MQTT)
|
||||
DIAG(F("MQTT channel setup message: [%s]"), buffer);
|
||||
|
||||
mqtt->publish(mqtt->getClientID(), buffer);
|
||||
|
||||
// on the cs side all is set and we declare that the cs is open for business
|
||||
clients[s].open = true;
|
||||
}
|
||||
}
|
||||
|
||||
void MQTTInterface::loop()
|
||||
{
|
||||
if (!singleton)
|
||||
return;
|
||||
singleton->loop2();
|
||||
}
|
||||
|
||||
bool showonce = false;
|
||||
auto s = millis();
|
||||
|
||||
void loopPing(int interval)
|
||||
{
|
||||
auto c = millis();
|
||||
if (c - s > 2000)
|
||||
{
|
||||
DIAG(F("loop alive")); // ping every 2 sec
|
||||
s = c;
|
||||
}
|
||||
}
|
||||
|
||||
void MQTTInterface::loop2()
|
||||
{
|
||||
|
||||
// loopPing(2000); // ping every 2 sec
|
||||
// Connection impossible so just don't do anything
|
||||
if (singleton->mqState == CONNECTION_FAILED)
|
||||
{
|
||||
if (!showonce)
|
||||
{
|
||||
DIAG(F("MQTT connection failed..."));
|
||||
showonce = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (!mqttClient->connected())
|
||||
{
|
||||
DIAG(F("MQTT no connection trying to reconnect ..."));
|
||||
connect();
|
||||
}
|
||||
if (!mqttClient->loop())
|
||||
{
|
||||
DIAG(F("mqttClient returned with error; state: %d"), mqttClient->state());
|
||||
return;
|
||||
};
|
||||
|
||||
checkSubscribers(subscriberQueue, clients);
|
||||
inLoop(in, pool, outboundRing);
|
||||
outLoop(mqttClient);
|
||||
}
|
225
MQTTInterface.h
Normal file
225
MQTTInterface.h
Normal file
@@ -0,0 +1,225 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef _MQTTInterface_h_
|
||||
#define _MQTTInterface_h_
|
||||
|
||||
#if __has_include("config.h")
|
||||
#include "config.h"
|
||||
#else
|
||||
#warning config.h not found. Using defaults from config.example.h
|
||||
#include "config.example.h"
|
||||
#endif
|
||||
#include "defines.h"
|
||||
|
||||
#include <Arduino.h>
|
||||
#include <Ethernet.h>
|
||||
#include <Dns.h>
|
||||
#include <PubSubClient.h>
|
||||
|
||||
#include "DCCEXParser.h"
|
||||
#include "Queue.h"
|
||||
#include "ObjectPool.h"
|
||||
// #include "MemoryFree.h"
|
||||
#include "freeMemory.h"
|
||||
|
||||
#define MAXPAYLOAD 64 // max length of a payload recieved
|
||||
#define MAXDOMAINLENGTH 32 // domain name length for the broker e.g. test.mosquitto.org
|
||||
|
||||
#define MAXTBUF 64 //!< max length of the buffer for building the topic name ;to be checked
|
||||
#define MAXTMSG 64 //!< max length of the messages for a topic ;to be checked PROGMEM ?
|
||||
#define MAXTSTR 32 //!< max length of a topic string
|
||||
#define MAXCONNECTID 32 // broker connection id length incl possible prefixes
|
||||
#define CLIENTIDSIZE 6 // max length of the clientid used for connection to the broker
|
||||
#define MAXRECONNECT 5 // reconnection tries before final failure
|
||||
#define MAXMQTTCONNECTIONS 20 // maximum number of unique tpoics available for subscribers
|
||||
#define OUT_BOUND_SIZE 128 // Size of the RingStream used to provide results from the parser and publish
|
||||
#define MAX_POOL_SIZE 16 // recieved command store size
|
||||
#define MAX_CALLBACKS 10
|
||||
|
||||
// extern int freeMemory();
|
||||
struct MQTTBroker
|
||||
{
|
||||
int port;
|
||||
IPAddress ip;
|
||||
const FSH *domain = nullptr;
|
||||
const FSH *user = nullptr;
|
||||
const FSH *pwd = nullptr;
|
||||
const FSH *prefix = nullptr;
|
||||
byte cType; // connection type to identify valid params
|
||||
|
||||
IPAddress resovleBroker(const FSH *d)
|
||||
{
|
||||
DNSClient dns;
|
||||
IPAddress bip;
|
||||
|
||||
char domain[MAXDOMAINLENGTH];
|
||||
strcpy_P(domain, (const char *)d);
|
||||
|
||||
dns.begin(Ethernet.dnsServerIP());
|
||||
if (dns.getHostByName(domain, bip) == 1)
|
||||
{
|
||||
DIAG(F("[%d] MQTT Broker: %s = %d.%d.%d.%d"), freeMemory(), domain, bip[0], bip[1], bip[2], bip[3]);
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
DIAG(F("MQTT Dns lookup for %s failed"), domain);
|
||||
}
|
||||
return bip;
|
||||
}
|
||||
|
||||
// all boils down to the ip address type = 1 without user authentication 2 with user authentication
|
||||
// no ssl support !
|
||||
|
||||
// port & ip address
|
||||
MQTTBroker(int p, IPAddress i) : port(p), ip(i), cType(1){};
|
||||
// port & domain name
|
||||
MQTTBroker(int p, const FSH *d) : port(p), domain(d), cType(1)
|
||||
{
|
||||
ip = resovleBroker(d);
|
||||
};
|
||||
|
||||
// port & ip & prefix
|
||||
MQTTBroker(int p, IPAddress i, const FSH *pfix) : port(p), ip(i), prefix(pfix), cType(1){};
|
||||
// port & domain & prefix
|
||||
MQTTBroker(int p, const FSH *d, const FSH *pfix) : port(p), domain(d), prefix(pfix), cType(1)
|
||||
{
|
||||
ip = resovleBroker(d);
|
||||
};
|
||||
|
||||
// port & ip & user & pwd
|
||||
MQTTBroker(int p, IPAddress i, const FSH *uid, const FSH *pass) : port(p), ip(i), user(uid), pwd(pass), cType(2){};
|
||||
// port & domain & user & pwd
|
||||
MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass) : port(p), domain(d), user(uid), pwd(pass), cType(2)
|
||||
{
|
||||
ip = resovleBroker(d);
|
||||
};
|
||||
|
||||
// port & ip & user & pwd & prefix
|
||||
MQTTBroker(int p, IPAddress i, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), ip(i), user(uid), pwd(pass), prefix(pfix), cType(2){};
|
||||
// port & domain & user & pwd & prefix
|
||||
MQTTBroker(int p, const FSH *d, const FSH *uid, const FSH *pass, const FSH *pfix) : port(p), domain(d), user(uid), pwd(pass), prefix(pfix), cType(2)
|
||||
{
|
||||
ip = resovleBroker(d);
|
||||
};
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief dcc-ex command as recieved via MQ
|
||||
*
|
||||
*/
|
||||
typedef struct csmsg_t
|
||||
{
|
||||
char cmd[MAXPAYLOAD]; // recieved command message
|
||||
byte mqsocket; // from which mqsocket / subscriberid
|
||||
} csmsg_t;
|
||||
|
||||
typedef struct csmqttclient_t
|
||||
{
|
||||
int distant; // random int number recieved from the subscriber
|
||||
byte mqsocket; // mqtt socket = subscriberid provided by the cs
|
||||
long topic; // cantor(subscriber,cs) encoded tpoic used to send / recieve commands
|
||||
bool open; // true as soon as we have send the id to the mq broker for the client to pickup
|
||||
} csmqttclient_t;
|
||||
|
||||
enum MQTTInterfaceState
|
||||
{
|
||||
INIT,
|
||||
CONFIGURED, // server/client objects set
|
||||
CONNECTED, // mqtt broker is connected
|
||||
CONNECTION_FAILED // Impossible to get the connection set after MAXRECONNECT tries
|
||||
};
|
||||
|
||||
class MQTTInterface
|
||||
{
|
||||
private:
|
||||
// Methods
|
||||
MQTTInterface();
|
||||
MQTTInterface(const MQTTInterface &); // non construction-copyable
|
||||
MQTTInterface &operator=(const MQTTInterface &); // non copyable
|
||||
|
||||
void setup(const FSH *id, MQTTBroker *broker); // instantiates the broker
|
||||
void connect(); // (re)connects to the broker
|
||||
bool setupNetwork(); // sets up the network connection for the PubSub system
|
||||
void loop2();
|
||||
|
||||
// Members
|
||||
static MQTTInterface *singleton; // unique instance of the MQTTInterface object
|
||||
EthernetClient ethClient; // TCP Client object for the MQ Connection
|
||||
byte mac[6]; // simulated mac address
|
||||
IPAddress server; // MQTT server object
|
||||
MQTTBroker *broker; // Broker configuration object as set in config.h
|
||||
|
||||
ObjectPool<csmsg_t, MAXPOOLSIZE> pool; // Pool of commands recieved for the CS
|
||||
Queue<int> in; // Queue of indexes into the pool according to incomming cmds
|
||||
Queue<int> subscriberQueue; // Queue for incomming subscribers; push the subscriber into the queue for setup in a loop cycle
|
||||
|
||||
char clientID[(CLIENTIDSIZE * 2) + 1]; // unique ID of the commandstation; not to confused with the connectionID
|
||||
csmqttclient_t clients[MAXMQTTCONNECTIONS]; // array of connected mqtt clients
|
||||
char connectID[MAXCONNECTID]; // clientId plus possible prefix if required by the broker
|
||||
byte subscriberid = 0; // id assigned to a mqtt client when recieving the inital handshake; +1 at each connection
|
||||
byte activeSubscriber = 0; // if its 0 no active Subscriber; set as soon as we recieve a command of go into processing on the CS
|
||||
|
||||
bool connected = false; // set to true if the ethernet connection is available
|
||||
MQTTInterfaceState mqState = INIT; // Status of the MQBroker connection
|
||||
RingStream *outboundRing; // Buffer for collecting the results from the command parser
|
||||
PubSubClient *mqttClient; // PubSub Endpoint for data exchange
|
||||
|
||||
|
||||
public:
|
||||
static MQTTInterface *get() noexcept { return singleton;}
|
||||
|
||||
boolean subscribe(const char *topic);
|
||||
void publish(const char *topic, const char *payload);
|
||||
|
||||
ObjectPool<csmsg_t, MAXPOOLSIZE> *getPool() { return &pool; };
|
||||
Queue<int> *getIncomming() { return ∈ };
|
||||
Queue<int> *getSubscriptionQueue() { return &subscriberQueue; };
|
||||
MQTTInterfaceState getState() { return mqState; };
|
||||
byte getActive() { return activeSubscriber; };
|
||||
void setActive(byte mqSocket) { activeSubscriber = mqSocket; };
|
||||
char *getClientID() { return clientID; };
|
||||
uint8_t getClientSize() { return subscriberid; };
|
||||
|
||||
// initalized to 0 so that the first id comming back is 1
|
||||
// index 0 in the clients array is not used therefore
|
||||
|
||||
uint8_t obtainSubscriberID()
|
||||
{
|
||||
if (subscriberid == MAXMQTTCONNECTIONS)
|
||||
{
|
||||
return 0; // no more subscriber id available
|
||||
}
|
||||
return (++subscriberid);
|
||||
}
|
||||
|
||||
csmqttclient_t *getClients() { return clients; };
|
||||
RingStream *getRingStream() { return outboundRing; };
|
||||
|
||||
static void setup();
|
||||
static void loop();
|
||||
|
||||
~MQTTInterface() = default;
|
||||
};
|
||||
|
||||
|
||||
#endif
|
@@ -25,7 +25,7 @@
|
||||
// Arduino standard Motor Shield
|
||||
#define STANDARD_MOTOR_SHIELD F("STANDARD_MOTOR_SHIELD"), \
|
||||
new MotorDriver(3, 12, UNUSED_PIN, UNUSED_PIN, A0, 2.99, 2000, UNUSED_PIN), \
|
||||
new MotorDriver(11, 13, UNUSED_PIN, UNUSED_PIN, A1, 2.99, 2000, UNUSED_PIN)
|
||||
new MotorDriver(11, 13, UNUSED_PIN, UNUSED_PIN, A2, 2.99, 2000, UNUSED_PIN)
|
||||
|
||||
// Pololu Motor Shield
|
||||
#define POLOLU_MOTOR_SHIELD F("POLOLU_MOTOR_SHIELD"), \
|
||||
|
110
ObjectPool.h
Normal file
110
ObjectPool.h
Normal file
@@ -0,0 +1,110 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef _ObjectPool_h_
|
||||
#define _ObjectPool_h_
|
||||
|
||||
#include <DIAG.h>
|
||||
|
||||
#define MAXPOOLSIZE 32
|
||||
|
||||
template <typename T, int length>
|
||||
class ObjectPool
|
||||
{
|
||||
|
||||
// just make sure that we don't create a pool eating up all memory @compiletime
|
||||
static_assert(length <= MAXPOOLSIZE);
|
||||
struct item
|
||||
{
|
||||
T i;
|
||||
bool free = true; // boolean 1 free i.e. i can be reused; 0 occupied
|
||||
};
|
||||
|
||||
private:
|
||||
item p[length]; // MAXPOOLSIZE items of struct item
|
||||
const int size = length; // size of the pool
|
||||
|
||||
int findFreeIdx()
|
||||
{ // find the first free index or return -1 if there is none
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
if (p[i].free)
|
||||
{
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1; // if we are here there is no free slot available
|
||||
}
|
||||
|
||||
public:
|
||||
int setItem(T i)
|
||||
{ // add an item to the pool at a free slot
|
||||
int idx = findFreeIdx();
|
||||
if (idx != -1)
|
||||
{
|
||||
p[idx].i = i;
|
||||
p[idx].free = false;
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief returns the slot for an object to the pool i.e. frees the slot for reuse of the data member and
|
||||
* clears out the memory
|
||||
*
|
||||
* @param idx
|
||||
* @return true if the return is ok
|
||||
* @return false otherwise
|
||||
*/
|
||||
bool returnItem(int idx)
|
||||
{ // clear item at pool index idx
|
||||
if (idx > size)
|
||||
{ // can't return an item outside of the pool size; returns false;
|
||||
return false;
|
||||
}
|
||||
memset(&p[idx].i, 0, sizeof(T)); // clear out the memory but keep the allocation for reuse
|
||||
p[idx].free = true;
|
||||
return true; // set the free flag
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Obtain a pool item
|
||||
* @note This should only be used for debugging.
|
||||
* It allows to change actually the content of the pool item where this should only be allowed for the setItem method.
|
||||
* @param idx Index of the pool item to retrieve
|
||||
* @param state State of the pool item ( 1 available, 0 occupied)
|
||||
* @return T* returns the pointer to the pool item
|
||||
*/
|
||||
T *getItem(int idx, bool *state)
|
||||
{
|
||||
*state = p[idx].free;
|
||||
return &p[idx].i;
|
||||
}
|
||||
|
||||
int getSize()
|
||||
{
|
||||
return size;
|
||||
}
|
||||
|
||||
ObjectPool() = default;
|
||||
~ObjectPool() = default;
|
||||
};
|
||||
|
||||
#endif
|
121
Queue.h
Normal file
121
Queue.h
Normal file
@@ -0,0 +1,121 @@
|
||||
/*
|
||||
* © 2021, Gregor Baues, All rights reserved.
|
||||
*
|
||||
* This file is part of DCC-EX/CommandStation-EX
|
||||
*
|
||||
* This is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* It is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with CommandStation. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef _Queue_h_
|
||||
#define _Queue_h_
|
||||
|
||||
#include <Arduino.h>
|
||||
|
||||
template<class T>
|
||||
class Queue {
|
||||
|
||||
private:
|
||||
int _front, _back, _count;
|
||||
T *_data;
|
||||
int _maxitems;
|
||||
|
||||
public:
|
||||
Queue(int maxitems = 256) {
|
||||
_front = 0;
|
||||
_back = 0;
|
||||
_count = 0;
|
||||
_maxitems = maxitems;
|
||||
_data = new T[maxitems + 1];
|
||||
}
|
||||
~Queue() {
|
||||
delete[] _data;
|
||||
}
|
||||
inline int count();
|
||||
inline int front();
|
||||
inline int back();
|
||||
void push(const T &item);
|
||||
T peek();
|
||||
T pop();
|
||||
void clear();
|
||||
};
|
||||
|
||||
template<class T>
|
||||
inline int Queue<T>::count()
|
||||
{
|
||||
return _count;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
inline int Queue<T>::front()
|
||||
{
|
||||
return _front;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
inline int Queue<T>::back()
|
||||
{
|
||||
return _back;
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void Queue<T>::push(const T &item)
|
||||
{
|
||||
if(_count < _maxitems) { // Drops out when full
|
||||
_data[_back++]=item;
|
||||
++_count;
|
||||
// Check wrap around
|
||||
if (_back > _maxitems)
|
||||
_back -= (_maxitems + 1);
|
||||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
T Queue<T>::pop() {
|
||||
if(_count <= 0) return T(); // Returns empty
|
||||
else {
|
||||
T result = _data[_front];
|
||||
_front++;
|
||||
--_count;
|
||||
// Check wrap around
|
||||
if (_front > _maxitems)
|
||||
_front -= (_maxitems + 1);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
template<class T>
|
||||
T Queue<T>::peek() {
|
||||
if(_count <= 0) return T(); // Returns empty
|
||||
else return _data[_front];
|
||||
}
|
||||
|
||||
template<class T>
|
||||
void Queue<T>::clear()
|
||||
{
|
||||
_front = _back;
|
||||
_count = 0;
|
||||
}
|
||||
|
||||
#endif // _Queue_h_
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@@ -103,3 +103,43 @@ bool RingStream::commit() {
|
||||
_buffer[_mark]=lowByte(_count);
|
||||
return true; // commit worked
|
||||
}
|
||||
|
||||
// grbba to be removed
|
||||
// print the buffer one line for 10 chars in the array
|
||||
// void RingStream::printBuffer() {
|
||||
// int j = 0;
|
||||
// for ( int k = 0; k < _len; k++ ) {
|
||||
// if ( j == 10) {
|
||||
// j = 0;
|
||||
// Serial.println();
|
||||
// }
|
||||
// j++;
|
||||
// Serial.print((char) _buffer[k]);
|
||||
// Serial.print(" ");
|
||||
// }
|
||||
// }
|
||||
|
||||
// void RingStream::printInfo() {
|
||||
// Serial.print("_len: "); Serial.println(_len);
|
||||
// Serial.print("_pos_write: "); Serial.println(_pos_write);
|
||||
// Serial.print("_pos_read: "); Serial.println(_pos_read);
|
||||
// Serial.print("_overflow: "); Serial.println(_overflow);
|
||||
// Serial.print("_mark: "); Serial.println(_mark);
|
||||
// Serial.print("_count: ");Serial.println(_count);
|
||||
|
||||
// }
|
||||
|
||||
// void RingStream::reset(const uint16_t len)
|
||||
// {
|
||||
// _len=len;
|
||||
// memset(_buffer,0,len);
|
||||
// // _buffer=new byte[len];
|
||||
// _pos_write=0;
|
||||
// _pos_read=0;
|
||||
// _buffer[0]=0;
|
||||
// _overflow=false;
|
||||
// _mark=0;
|
||||
// _count=0;
|
||||
// }
|
||||
|
||||
// grbba to be removed
|
20
RingStream.h
20
RingStream.h
@@ -21,10 +21,12 @@
|
||||
|
||||
#include <Arduino.h>
|
||||
|
||||
// template <size_t S>
|
||||
class RingStream : public Print {
|
||||
|
||||
public:
|
||||
RingStream( const uint16_t len);
|
||||
~RingStream() = default;
|
||||
|
||||
virtual size_t write(uint8_t b);
|
||||
using Print::write;
|
||||
@@ -34,15 +36,27 @@ class RingStream : public Print {
|
||||
void mark(uint8_t b);
|
||||
bool commit();
|
||||
uint8_t peekTargetMark();
|
||||
|
||||
|
||||
int size() {return _len;}
|
||||
byte *getBuffer() { return _buffer; }
|
||||
// grbba to be removed
|
||||
// void printBuffer();
|
||||
// void printInfo();
|
||||
// void reset(const uint16_t len);
|
||||
// grbba to be removed
|
||||
|
||||
int getLen() { return _len; };
|
||||
|
||||
private:
|
||||
int _len;
|
||||
int _pos_write;
|
||||
// int _len = S;
|
||||
int _pos_write ;
|
||||
int _pos_read;
|
||||
bool _overflow;
|
||||
int _mark;
|
||||
int _count;
|
||||
byte * _buffer;
|
||||
// byte _buffer[S];
|
||||
byte *_buffer;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@@ -35,6 +35,7 @@ bool Diag::WIFI=false;
|
||||
bool Diag::WITHROTTLE=false;
|
||||
bool Diag::ETHERNET=false;
|
||||
bool Diag::LCN=false;
|
||||
bool Diag::MQTT=false;
|
||||
|
||||
|
||||
void StringFormatter::diag( const FSH* input...) {
|
||||
|
@@ -34,6 +34,7 @@ class Diag {
|
||||
static bool WITHROTTLE;
|
||||
static bool ETHERNET;
|
||||
static bool LCN;
|
||||
static bool MQTT;
|
||||
|
||||
};
|
||||
|
||||
|
@@ -37,7 +37,7 @@ The configuration file for DCC-EX Command Station
|
||||
// NOTE: Only supported on Arduino Mega
|
||||
// Set to false if you not even want it on the Arduino Mega
|
||||
//
|
||||
#define ENABLE_WIFI true
|
||||
// #define ENABLE_WIFI true
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
@@ -93,6 +93,44 @@ The configuration file for DCC-EX Command Station
|
||||
//#define IP_ADDRESS { 192, 168, 1, 200 }
|
||||
|
||||
|
||||
// ENABLE_MQTT: if set to true you have to have an Arduino Ethernet card (wired). This
|
||||
// is not for Wifi. You will need the Arduino Ethernet library as well as the PubSub
|
||||
// library from <add link here> or get via the libray manager either from the IDE
|
||||
// or PIO
|
||||
|
||||
// The following is only needed if the Broker requires it. cf broker descriptions below
|
||||
#define MQTT_USER "your broker user name"
|
||||
#define MQTT_PWD "your broker passwd"
|
||||
#define MQTT_PREFIX "prefix if required by the broker"
|
||||
|
||||
// UNCOMMENT THE FOLLOWING LINE TO ENABLE MQTT
|
||||
#define ENABLE_MQTT true
|
||||
|
||||
// Set the used broker to one of the configurations from MQTTBrokers.h where some
|
||||
// public freely avaiable brokers are configured
|
||||
|
||||
// DEFINE THE MQTT BROKER BELOW ACCORDING TO THE FOLLOWING TABLE:
|
||||
//
|
||||
// DCCEX_MQTT_BROKER : DCCEX Team best effort operated MQTT broker; pls apply for user/pwd on discord in the mqtt channel if you want to try it
|
||||
// DCCEX_MOSQUITTO : Mosquitto.org public test broker no user / pwd required so anyone can subscribe/publish to any topic here; good for testing only
|
||||
// DCCEX_HIVEMQ : Provided by HiveMQ; Public no user / pwd required
|
||||
// |
|
||||
// +-----------------------v
|
||||
|
||||
#define CSMQTTBROKER DCCEX_MOSQUITTO
|
||||
|
||||
// --------------------------
|
||||
// CUSTOMIZED EXAMPLE
|
||||
// Configuration for a broker installed on a machine on you home netowrk where the IP address of the machine runing the broker
|
||||
// is 192.168.0.2 and requires user authentication. The uid ad pwd are set in the config.h file
|
||||
|
||||
// Port IPAddress Username (opt) Password(opt) Prefix (opt)
|
||||
// #define MY_PERSONAL_BROKER F("MYBROKERMQ"), new MQTTBroker( 1883, {192, 168, 0, 2}, F("username"), F("password"), F("prefix-if-required"))
|
||||
//
|
||||
// If you have access to a broker on the internet replace the IPAddress by F("my-broker-domain-name")
|
||||
// -------------------------
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// DEFINE LCD SCREEN USAGE BY THE BASE STATION
|
||||
|
122
csexpatch.sh
Normal file
122
csexpatch.sh
Normal file
@@ -0,0 +1,122 @@
|
||||
#!/bin/bash
|
||||
# Files to be added to the CS as is
|
||||
# Session.cpp
|
||||
# Session.h
|
||||
# Diag.cpp
|
||||
# Queue.cpp
|
||||
# Queue.h
|
||||
|
||||
# patches to apply
|
||||
# file / "marker" / "replace with marker + target" / 0 marker before 1 marker after
|
||||
|
||||
|
||||
patch2=(WifiInboundHandler.cpp "runningClientId);" "Connection::type = _WIFI; Connection::id = runningClientId;" 0)
|
||||
|
||||
#main include of the class for handling the CLI session
|
||||
patch3=(DCCEX.h "#define DCCEX_h" "\n#include \"Session.h\"" 0)
|
||||
|
||||
#added testing if the motoshield has been started and thus the Waveform gen is running
|
||||
patch7=(DCCWaveform.cpp "progTripValue=0;" "bool DCCWaveform::running=false;" 0)
|
||||
patch13=(DCCWaveform.cpp "interruptHandler);" "\nrunning=true;" 0)
|
||||
patch8=(DCCWaveform.h "public:" "\nstatic bool isRunning() { return running; }" 0)
|
||||
patch9=(DCCWaveform.h "private:" "\nstatic bool running;" 0)
|
||||
|
||||
#definitions needed for handling latching i.e. sending the diag output to the currentmy 'active' connection
|
||||
#prepared for WiFi but that is not implemented
|
||||
patch10=(DIAG.h "StringFormatter::lcd" "\nenum Transport { _WIFI, _ETHERNET}; \
|
||||
\nstruct Connection { static Transport type; static byte id;}; \
|
||||
\nstruct Latch { static Transport type; static byte id;};" 0)
|
||||
|
||||
#Ethernet Interface changes to get to the connection for sending information to the CLI
|
||||
patch11=(EthernetInterface.h "loop();" "\nbool isConnected() { return connected; };\
|
||||
\nstatic EthernetInterface *get() { return singleton; };\
|
||||
\nEthernetClient *getClient(int socket) { return \&clients[socket]; };" 0)
|
||||
patch12=(EthernetInterface.cpp "socket,buffer);" "\nConnection::type = _ETHERNET; Connection::id = socket;" 0)
|
||||
|
||||
#Adding a) the LATCH diagnostic command to the parseD; allowing to send set the diag output to the active ethernet
|
||||
#connection; DOes not work for WiFi and b) handling of the atCommandCallback piggy backing the + command so need check
|
||||
#if the Waveform gen has statred as otherwise we try to poweroff a non exisiting motorshield
|
||||
|
||||
patch4=(DCCEXParser.cpp "26133;" "\nconst int16_t HASH_KEYWORD_LATCH = 1618;" 0)
|
||||
patch5=(DCCEXParser.cpp "(atCommandCallback) {" "\nif (DCCWaveform::isRunning()) {" 0)
|
||||
patch6=(DCCEXParser.cpp "progTrack.setPowerMode(POWERMODE::OFF);" "\n}" 0)
|
||||
patch14=(DCCEXParser.cpp "case HASH_KEYWORD_CABS:" "\n case HASH_KEYWORD_LATCH:\
|
||||
\n Diag::LATCH = onOff; \
|
||||
\n Latch::type = Connection::type; \
|
||||
\n Latch::id = Connection::id; \
|
||||
\n return true; \n " 1 )
|
||||
|
||||
#StringFormatter : adding things needed for Latching the Wifi or Ethernet connection to reciev the diag output
|
||||
#remove a bracket on line 47 which will be added again in patch 17; If that is not done we end up with one
|
||||
#bracket too much; Brittle and prone to issues as they change stuff but so far the best i can get
|
||||
patch1=(StringFormatter.h "LCN;" "static bool LATCH;" 0)
|
||||
|
||||
sed -i -e '47d' StringFormatter.cpp
|
||||
|
||||
patch15=(StringFormatter.cpp "LCN=false;" "\nbool Diag::LATCH=false;" 0)
|
||||
patch16=(StringFormatter.cpp "if (!diagSerial) return;" "\n#if ETHERNET_ON == true || WIFI_ON == true \
|
||||
\n auto t = diagSerial;\
|
||||
\n if (Diag::LATCH)\
|
||||
\n {\
|
||||
\n switch (Latch::type)\
|
||||
\n {\
|
||||
\n case _ETHERNET:\
|
||||
\n {\
|
||||
\n#if ETHERNET_ON == true\
|
||||
\n auto i = EthernetInterface::get();\
|
||||
\n auto s = i->getClient(Latch::id); \
|
||||
\n if (s->connected())\
|
||||
\n { \
|
||||
\n diagSerial = s;\
|
||||
\n }\
|
||||
\n#endif\
|
||||
\n break;\
|
||||
\n }\
|
||||
\n case _WIFI:\
|
||||
\n {\
|
||||
\n DIAG(F(\"Latch on Wifi is not possible for now ...\"));\
|
||||
\n break;\
|
||||
\n }\
|
||||
\n }\
|
||||
\n }\
|
||||
\n#endif\n" 1)
|
||||
patch17=(StringFormatter.cpp "void StringFormatter::lcd" "\n#if ETHERNET_ON == true || WIFI_ON == true \
|
||||
\n if (Diag::LATCH)\
|
||||
\n {\
|
||||
\n diagSerial = t;\
|
||||
\n }\
|
||||
\n#endif\n}\n" 1)
|
||||
|
||||
patch=(patch1 patch2 patch3 patch4 patch5 patch6 patch7 patch8 patch9 patch10 patch11 patch12 patch13 patch14 patch15 patch16 patch17)
|
||||
|
||||
# patch=(patch17)
|
||||
|
||||
declare -n elmv1
|
||||
|
||||
for elmv1 in "${patch[@]}"; do
|
||||
file="${elmv1[0]}"
|
||||
marker="${elmv1[1]}"
|
||||
markerpos="${elmv1[3]}"
|
||||
|
||||
if [ $markerpos = 1 ]
|
||||
then
|
||||
target="${elmv1[2]} $marker"
|
||||
else
|
||||
target="$marker ${elmv1[2]}"
|
||||
fi
|
||||
|
||||
echo $marker
|
||||
# echo $target
|
||||
echo $file
|
||||
|
||||
grep -q $marker $file
|
||||
if [ $? -eq 0 ]
|
||||
then
|
||||
echo "Patching $file with $marker --> $target ..."
|
||||
sed -i "s/$marker/$target/" $file
|
||||
else
|
||||
echo "Patching $file failed."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
done
|
17
defines.h
17
defines.h
@@ -33,13 +33,24 @@
|
||||
#endif
|
||||
|
||||
#if ENABLE_ETHERNET && (defined(ARDUINO_AVR_MEGA) || defined(ARDUINO_AVR_MEGA2560) || defined(ARDUINO_SAMD_ZERO) || defined(TEENSYDUINO))
|
||||
#define ETHERNET_ON true
|
||||
#define ETHERNET_ON true
|
||||
#else
|
||||
#define ETHERNET_ON false
|
||||
#define ETHERNET_ON false
|
||||
#endif
|
||||
|
||||
// MQTT handles ethernet on it's own
|
||||
#if ENABLE_MQTT && (defined(ARDUINO_AVR_MEGA) || defined(ARDUINO_AVR_MEGA2560) || defined(ARDUINO_SAMD_ZERO) || defined(TEENSYDUINO))
|
||||
#if ENABLE_ETHERNET
|
||||
#error Ethernet and MQTT can not be enabled simultaneaously
|
||||
#elif ENABLE_WIFI
|
||||
#error WIFI and MQTT can not be enabled simultaneaously
|
||||
#else
|
||||
#define MQTT_ON true
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#if WIFI_ON && ETHERNET_ON
|
||||
#error Command Station does not support WIFI and ETHERNET at the same time.
|
||||
#error Command Station does not support WIFI and ETHERNET at the same time.
|
||||
#endif
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@@ -23,7 +23,7 @@
|
||||
|
||||
// thanks go to https://github.com/mpflaga/Arduino-MemoryFree
|
||||
#if defined(__arm__)
|
||||
extern "C" char* sbrk(int);
|
||||
extern "C" char *sbrk(int);
|
||||
#elif defined(__AVR__)
|
||||
extern char *__brkval;
|
||||
extern char *__malloc_heap_start;
|
||||
@@ -31,14 +31,16 @@ extern char *__malloc_heap_start;
|
||||
#error Unsupported board type
|
||||
#endif
|
||||
|
||||
|
||||
static volatile int minimum_free_memory = __INT_MAX__;
|
||||
|
||||
#if !defined(__IMXRT1062__)
|
||||
static inline int freeMemory() {
|
||||
|
||||
inline int freeMemory()
|
||||
// static inline int freeMemory()
|
||||
{
|
||||
char top;
|
||||
#if defined(__arm__)
|
||||
return &top - reinterpret_cast<char*>(sbrk(0));
|
||||
return &top - reinterpret_cast<char *>(sbrk(0));
|
||||
#elif defined(__AVR__)
|
||||
return __brkval ? &top - __brkval : &top - __malloc_heap_start;
|
||||
#else
|
||||
@@ -47,7 +49,8 @@ static inline int freeMemory() {
|
||||
}
|
||||
|
||||
// Return low memory value.
|
||||
int minimumFreeMemory() {
|
||||
int minimumFreeMemory()
|
||||
{
|
||||
byte sreg_save = SREG;
|
||||
noInterrupts(); // Disable interrupts
|
||||
int retval = minimum_free_memory;
|
||||
@@ -57,34 +60,36 @@ int minimumFreeMemory() {
|
||||
|
||||
#else
|
||||
#if defined(ARDUINO_TEENSY40)
|
||||
static const unsigned DTCM_START = 0x20000000UL;
|
||||
static const unsigned OCRAM_START = 0x20200000UL;
|
||||
static const unsigned OCRAM_SIZE = 512;
|
||||
static const unsigned FLASH_SIZE = 1984;
|
||||
static const unsigned DTCM_START = 0x20000000UL;
|
||||
static const unsigned OCRAM_START = 0x20200000UL;
|
||||
static const unsigned OCRAM_SIZE = 512;
|
||||
static const unsigned FLASH_SIZE = 1984;
|
||||
#elif defined(ARDUINO_TEENSY41)
|
||||
static const unsigned DTCM_START = 0x20000000UL;
|
||||
static const unsigned OCRAM_START = 0x20200000UL;
|
||||
static const unsigned OCRAM_SIZE = 512;
|
||||
static const unsigned FLASH_SIZE = 7936;
|
||||
#if TEENSYDUINO>151
|
||||
extern "C" uint8_t external_psram_size;
|
||||
static const unsigned DTCM_START = 0x20000000UL;
|
||||
static const unsigned OCRAM_START = 0x20200000UL;
|
||||
static const unsigned OCRAM_SIZE = 512;
|
||||
static const unsigned FLASH_SIZE = 7936;
|
||||
#if TEENSYDUINO > 151
|
||||
extern "C" uint8_t external_psram_size;
|
||||
#endif
|
||||
#endif
|
||||
|
||||
static inline int freeMemory() {
|
||||
static inline int freeMemory()
|
||||
{
|
||||
extern unsigned long _ebss;
|
||||
extern unsigned long _sdata;
|
||||
extern unsigned long _estack;
|
||||
const unsigned DTCM_START = 0x20000000UL;
|
||||
unsigned dtcm = (unsigned)&_estack - DTCM_START;
|
||||
unsigned stackinuse = (unsigned) &_estack - (unsigned) __builtin_frame_address(0);
|
||||
unsigned stackinuse = (unsigned)&_estack - (unsigned)__builtin_frame_address(0);
|
||||
unsigned varsinuse = (unsigned)&_ebss - (unsigned)&_sdata;
|
||||
unsigned freemem = dtcm - (stackinuse + varsinuse);
|
||||
return freemem;
|
||||
}
|
||||
|
||||
// Return low memory value.
|
||||
int minimumFreeMemory() {
|
||||
int minimumFreeMemory()
|
||||
{
|
||||
//byte sreg_save = SREG;
|
||||
//noInterrupts(); // Disable interrupts
|
||||
int retval = minimum_free_memory;
|
||||
@@ -93,19 +98,20 @@ int minimumFreeMemory() {
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
// Update low ram level. Allow for extra bytes to be specified
|
||||
// by estimation or inspection, that may be used by other
|
||||
// by estimation or inspection, that may be used by other
|
||||
// called subroutines. Must be called with interrupts disabled.
|
||||
//
|
||||
//
|
||||
// Although __brkval may go up and down as heap memory is allocated
|
||||
// and freed, this function records only the worst case encountered.
|
||||
// So even if all of the heap is freed, the reported minimum free
|
||||
// So even if all of the heap is freed, the reported minimum free
|
||||
// memory will not increase.
|
||||
//
|
||||
void updateMinimumFreeMemory(unsigned char extraBytes) {
|
||||
int spare = freeMemory()-extraBytes;
|
||||
if (spare < 0) spare = 0;
|
||||
if (spare < minimum_free_memory) minimum_free_memory = spare;
|
||||
void updateMinimumFreeMemory(unsigned char extraBytes)
|
||||
{
|
||||
int spare = freeMemory() - extraBytes;
|
||||
if (spare < 0)
|
||||
spare = 0;
|
||||
if (spare < minimum_free_memory)
|
||||
minimum_free_memory = spare;
|
||||
}
|
||||
|
||||
|
@@ -20,6 +20,8 @@
|
||||
|
||||
#ifndef freeMemory_h
|
||||
#define freeMemory_h
|
||||
|
||||
void updateMinimumFreeMemory(unsigned char extraBytes=0);
|
||||
int minimumFreeMemory();
|
||||
int freeMemory();
|
||||
#endif
|
||||
|
@@ -23,6 +23,7 @@ framework = arduino
|
||||
upload_protocol = atmel-ice
|
||||
lib_deps =
|
||||
${env.lib_deps}
|
||||
PubSubClient
|
||||
SparkFun External EEPROM Arduino Library
|
||||
monitor_speed = 115200
|
||||
monitor_flags = --echo
|
||||
@@ -33,6 +34,7 @@ board = megaatmega2560
|
||||
framework = arduino
|
||||
lib_deps =
|
||||
${env.lib_deps}
|
||||
PubSubClient
|
||||
arduino-libraries/Ethernet
|
||||
SPI
|
||||
monitor_speed = 115200
|
||||
@@ -44,6 +46,7 @@ board = uno
|
||||
framework = arduino
|
||||
lib_deps =
|
||||
${env.lib_deps}
|
||||
PubSubClient
|
||||
arduino-libraries/Ethernet
|
||||
SPI
|
||||
monitor_speed = 115200
|
||||
@@ -55,6 +58,7 @@ board = uno_wifi_rev2
|
||||
framework = arduino
|
||||
lib_deps =
|
||||
${env.lib_deps}
|
||||
PubSubClient
|
||||
arduino-libraries/Ethernet
|
||||
SPI
|
||||
monitor_speed = 115200
|
||||
@@ -67,6 +71,7 @@ board = uno
|
||||
framework = arduino
|
||||
lib_deps =
|
||||
${env.lib_deps}
|
||||
PubSubClient
|
||||
arduino-libraries/Ethernet
|
||||
SPI
|
||||
monitor_speed = 115200
|
||||
|
1
test/mpub.sh
Executable file
1
test/mpub.sh
Executable file
@@ -0,0 +1 @@
|
||||
mosquitto_pub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -m "mi(255)"
|
1
test/msub.sh
Executable file
1
test/msub.sh
Executable file
@@ -0,0 +1 @@
|
||||
mosquitto_sub -h test.mosquitto.org -p 1883 -t 6E756E6B776F -k 600
|
Reference in New Issue
Block a user