From 11e22c5d1d55a32cc484e4b74246b54c91853d60 Mon Sep 17 00:00:00 2001 From: Asbelos Date: Tue, 13 Oct 2020 17:37:40 +0100 Subject: [PATCH] Working Wifi ringbuffer implementation Notice 1kb output buffer Aslo no need to copy command in Withrottle --- CommandDistributor.cpp | 45 +-------------- CommandDistributor.h | 9 +-- RingStream.cpp | 70 +++++++++++++++++++++++ RingStream.h | 46 +++++++++++++++ WiThrottle.cpp | 10 +--- WifiInboundHandler.cpp | 123 ++++++++++++++++++----------------------- WifiInboundHandler.h | 32 ++++------- 7 files changed, 184 insertions(+), 151 deletions(-) create mode 100644 RingStream.cpp create mode 100644 RingStream.h diff --git a/CommandDistributor.cpp b/CommandDistributor.cpp index 47551b2..4d13f40 100644 --- a/CommandDistributor.cpp +++ b/CommandDistributor.cpp @@ -22,51 +22,10 @@ DCCEXParser * CommandDistributor::parser=0; -bool CommandDistributor::parse(byte clientId,byte * buffer, Print * streamer) { - - - // SIDE EFFECT WARNING::: - // We know that parser will read the entire buffer before starting to write to it. - // Otherwise we would have to copy the buffer elsewhere and RAM is in short supply. - - - bool closeAfter=false; - // Intercept HTTP requests - if (isHTTP(buffer)) { - if (httpCallback) httpCallback(streamer, buffer); - closeAfter = true; - } - else if (buffer[0] == '<') { +void CommandDistributor::parse(byte clientId,byte * buffer, Print * streamer) { + if (buffer[0] == '<') { if (!parser) parser = new DCCEXParser(); parser->parse(streamer, buffer, true); // tell JMRI parser that ACKS are blocking because we can't handle the async } else WiThrottle::getThrottle(clientId)->parse(*streamer, buffer); - - return closeAfter; } - -bool CommandDistributor::isHTTP(byte * buffer) { - - // POST GET PUT PATCH DELETE - // You may think a simple strstr() is better... but not when ram & time is in short supply - switch (buffer[0]) { - case 'P': - if (buffer[1] == 'U' && buffer[2] == 'T' && buffer[3] == ' ' ) return true; - if (buffer[1] == 'O' && buffer[2] == 'S' && buffer[3] == 'T' && buffer[4] == ' ') return true; - if (buffer[1] == 'A' && buffer[2] == 'T' && buffer[3] == 'C' && buffer[4] == 'H' && buffer[5] == ' ') return true; - return false; - case 'G': - if (buffer[1] == 'E' && buffer[2] == 'T' && buffer[3] == ' ' ) return true; - return false; - case 'D': - if (buffer[1] == 'E' && buffer[2] == 'L' && buffer[3] == 'E' && buffer[4] == 'T' && buffer[5] == 'E' && buffer[6] == ' ') return true; - return false; - default: - return false; - } -} - -void CommandDistributor::setHTTPCallback(HTTP_CALLBACK callback) { - httpCallback = callback; -} -HTTP_CALLBACK CommandDistributor::httpCallback=0; diff --git a/CommandDistributor.h b/CommandDistributor.h index 93e4d7c..2bec81b 100644 --- a/CommandDistributor.h +++ b/CommandDistributor.h @@ -20,18 +20,11 @@ #define CommandDistributor_h #include "DCCEXParser.h" -typedef void (*HTTP_CALLBACK)(Print *stream, byte *cmd); - class CommandDistributor { public : - static void setHTTPCallback(HTTP_CALLBACK callback); - static bool parse(byte clientId,byte* buffer, Print * streamer); - - + static void parse(byte clientId,byte* buffer, Print * streamer); private: - static HTTP_CALLBACK httpCallback; - static bool isHTTP(byte * buffer); static DCCEXParser * parser; }; diff --git a/RingStream.cpp b/RingStream.cpp new file mode 100644 index 0000000..d026e9b --- /dev/null +++ b/RingStream.cpp @@ -0,0 +1,70 @@ +/* + + (c) 2015 Ingo Fischer + buffer serial device + based on Arduino SoftwareSerial + + Constructor warning messages fixed by Chris Harlow. + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*/ + +#include "RingStream.h" +#include "DIAG.h" + +RingStream::RingStream( const uint16_t len) +{ + _len=len; + _buffer=new byte[len]; + _pos_write=0; + _pos_read=0; + _buffer[0]=0; + _overflow=false; +} + +size_t RingStream::write(uint8_t byte) { + if (_overflow) return 0; + _buffer[_pos_write] = byte; + ++_pos_write; + if (_pos_write>=_len) _pos_write=0; + if (_pos_write==_pos_read) { + _overflow=true; + DIAG(F("\nRingStream(%d) OVERFLOW %d %d \n"),_len, _pos_write, _pos_read); + return 0; + } + return 1; +} + +int RingStream::read() { + if (_pos_read==_pos_write) return -1; + byte b=_buffer[_pos_read]; + _pos_read++; + if (_pos_read>=_len) _pos_read=0; + _overflow=false; + return b; +} + + +int RingStream::count() { + int peek=_pos_read; + int counter=0; + while(_buffer[peek]) { + counter++; + peek++; + if (peek >= _len) peek=0; + } + return counter; + } diff --git a/RingStream.h b/RingStream.h new file mode 100644 index 0000000..dee00b4 --- /dev/null +++ b/RingStream.h @@ -0,0 +1,46 @@ +#ifndef RingStream_h +#define RingStream_h +/* + + (c) 2015 Ingo Fischer + buffer serial device + based on Arduino SoftwareSerial + + Constructor warning messages fixed by Chris Harlow. + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*/ +#include + +class RingStream : public Print { + + public: + RingStream( const uint16_t len); + + virtual size_t write(uint8_t b); + using Print::write; + int read(); + int count(); + + private: + int _len; + int _pos_write; + int _pos_read; + bool _overflow; + byte * _buffer; +}; + +#endif diff --git a/WiThrottle.cpp b/WiThrottle.cpp index ca706fe..a0ec254 100644 --- a/WiThrottle.cpp +++ b/WiThrottle.cpp @@ -101,15 +101,7 @@ WiThrottle::~WiThrottle() { void WiThrottle::parse(Print & stream, byte * cmdx) { - // we have to take a copy of the cmd buffer as the reply will get built into the cmdx - byte local[150]; - for (byte i=0;i #include "WifiInboundHandler.h" +#include "RingStream.h" #include "CommandDistributor.h" #include "DIAG.h" @@ -16,14 +17,10 @@ void WifiInboundHandler::loop() { WifiInboundHandler::WifiInboundHandler(Stream * ESStream) { wifiStream=ESStream; - for (int clientId=0;clientIdavailable() - clientBuffer[clientId]=new byte[MAX_WIFI_BUFFER+1]; - clientStream[clientId]=new MemStream(clientBuffer[clientId], MAX_WIFI_BUFFER); - } clientPendingCIPSEND=-1; + inboundRing=new RingStream(INBOUND_RING); + outboundRing=new RingStream(OUTBOUND_RING); + pendingCipsend=false; } @@ -31,38 +28,50 @@ WifiInboundHandler::WifiInboundHandler(Stream * ESStream) { // +IPD,x,lll:data is stored in streamer[x] // Other input returns void WifiInboundHandler::loop1() { - - // First handle all inbound traffic events - if (loop2()!=INBOUND_IDLE) return; + // First handle all inbound traffic events because they will block the sending + if (loop2()!=INBOUND_IDLE) return; // if nothing is already CIPSEND pending, we can CIPSEND one reply if (clientPendingCIPSEND<0) { - for (int clientId=0;clientIdavailable()); - StringFormatter::send(wifiStream, F("AT+CIPSEND=%d,%d\r\n"), clientId, clientStream[clientId]->available()); - clientStatus[clientId]=CIPSEND_PENDING; - return; + int next=outboundRing->read(); + if (next>=0) { + currentReplySize=outboundRing->count(); + if (currentReplySize==0) { + outboundRing->read(); // drop end marker } - } - } - - // if something waiting to close we can call one of them - - for (int clientId=0;clientIdread(); + if (next>0) { + int clientId=next-'0'; //convert char to int + int count=inboundRing->count(); + if (Diag::WIFI) DIAG(F("\nExec waiting %d %d:"),clientId,count); + byte cmd[count+1]; + for (int i=0;iread(); + cmd[count]=0; + if (Diag::WIFI) DIAG(F("%e\n"),cmd); + + outboundRing->print(clientId); + CommandDistributor::parse(clientId,cmd,outboundRing); + outboundRing->write((byte)0); return; } } -} + // This is a Finite State Automation (FSA) handling the inbound bytes from an ES AT command processor @@ -86,10 +95,11 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { } if (ch=='>') { - if (Diag::WIFI) DIAG(F("[[XMIT %d]]"),clientStream[clientPendingCIPSEND]->available()); - wifiStream->write(clientBuffer[clientPendingCIPSEND], clientStream[clientPendingCIPSEND]->available()); - clientStatus[clientPendingCIPSEND]=clientCloseAfterReply[clientPendingCIPSEND]? CLOSE_AFTER_SEND: UNUSED; + if (Diag::WIFI) DIAG(F("[[XMIT %d]]"),currentReplySize); + for (int i=0;iwrite(outboundRing->read()); + outboundRing->read(); // drop the end marker clientPendingCIPSEND=-1; + pendingCipsend=false; loopState=SKIPTOEND; break; } @@ -100,15 +110,12 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { } if (ch=='b') { // This is a busy indicator... probabaly must restart a CIPSEND - if (clientPendingCIPSEND>=0) { - clientStatus[clientPendingCIPSEND]=REPLY_PENDING; - clientPendingCIPSEND=-1; - } + pendingCipsend=(clientPendingCIPSEND>=0); loopState=SKIPTOEND; break; } - if (ch>='0' && ch<=('0'+MAX_CLIENTS)) { + if (ch>='0' && ch<='9') { runningClientId=ch-'0'; loopState=GOT_CLIENT_ID; break; @@ -133,7 +140,7 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { break; case IPD4_CLIENT: // reading connection id - if (ch >= '0' || ch <('0'+MAX_CLIENTS)){ + if (ch >= '0' || ch <='9'){ runningClientId=ch-'0'; loopState=IPD5; } @@ -151,8 +158,8 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { loopState=ANYTHING; break; } - clientStream[runningClientId]->flush(); // prepare streamer for input - clientStatus[runningClientId]=INBOUND_ARRIVING; + if (Diag::WIFI) DIAG(F("\nWifi inbound data(%d:%d):"),runningClientId,dataLength); + inboundRing->print(runningClientId); // prefix inbound with client id loopState=IPD_DATA; break; } @@ -160,12 +167,10 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { break; case IPD_DATA: // reading data - clientStream[runningClientId]->write(ch); // NOTE: The MemStream will throw away bytes that do not fit in the buffer. - // This protects against buffer overflows even with things as innocent - // as a browser which send massive, irrlevent HTTP headers. + inboundRing->write(ch); dataLength--; if (dataLength == 0) { - clientStatus[runningClientId]=READY_TO_PROCESS; + inboundRing->write((byte)0); loopState = ANYTHING; } break; @@ -181,8 +186,10 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { case GOT_CLIENT_ID3: // got "x C" before CLOSE or CONNECTED (which is ignored) if(ch=='L') { // CLOSE - clientStatus[runningClientId]=UNUSED; - if (runningClientId==clientPendingCIPSEND) clientPendingCIPSEND=-1; + if (runningClientId==clientPendingCIPSEND) { + // clear the outbound for this client + for (int i=0;i<=currentReplySize;i++) outboundRing->read(); + } } loopState=SKIPTOEND; break; @@ -194,25 +201,3 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { } // available return (loopState==ANYTHING) ? INBOUND_IDLE: INBOUND_BUSY; } - - -void WifiInboundHandler::processCommand(byte clientId) { - clientStatus[clientId]=PROCESSING; - byte * buffer=clientBuffer[clientId]; - MemStream * streamer=clientStream[clientId]; - buffer[streamer->available()]='\0'; - - if (Diag::WIFI) DIAG(F("\n%l Wifi(%d)<-[%e]\n"), millis(),clientId, buffer); - streamer->setBufferContentPosition(0, 0); // reset write position to start of buffer - - clientCloseAfterReply[clientId]=CommandDistributor::parse(clientId,buffer,streamer); - - if (streamer->available() == 0) { - clientStatus[clientId]=UNUSED; - } - else { - buffer[streamer->available()]='\0'; // mark end of buffer, so it can be used as a string later - if (Diag::WIFI) DIAG(F("%l WiFi(%d)->[%e] l(%d)\n"), millis(), clientId, buffer, streamer->available()); - clientStatus[clientId]=REPLY_PENDING; - } -} diff --git a/WifiInboundHandler.h b/WifiInboundHandler.h index 87ac89a..fa41aa4 100644 --- a/WifiInboundHandler.h +++ b/WifiInboundHandler.h @@ -1,8 +1,7 @@ #ifndef WifiInboundHandler_h #define WifiInboundHandler_h -#include "MemStream.h" -#include "DCCEXParser.h" +#include "RingStream.h" #include "DIAG.h" class WifiInboundHandler { @@ -14,9 +13,7 @@ class WifiInboundHandler { static WifiInboundHandler * singleton; - static const byte MAX_CLIENTS=5; - static const byte MAX_WIFI_BUFFER=255; - + enum INBOUND_STATE { INBOUND_BUSY, // keep calling in loop() INBOUND_IDLE // Nothing happening, outbound may xcall CIPSEND @@ -41,32 +38,23 @@ class WifiInboundHandler { GOT_CLIENT_ID3 // clientid prefix to CONNECTED / CLOSED }; - enum CLIENT_STATUS { - UNUSED, // client slot not in use - INBOUND_ARRIVING, // data is arriving - READY_TO_PROCESS, // data has arrived, may call parser now - PROCESSING, // command in progress - REPLY_PENDING, // reply is ready to CIPSEND - CIPSEND_PENDING, // CIPSEND waiting for > - CLOSE_PENDING, // CLOSE received - CLOSE_AFTER_SEND // Send CLOSE after CIPSEND completed - }; WifiInboundHandler(Stream * ESStream); void loop1(); INBOUND_STATE loop2(); - void processCommand(byte clientId); Stream * wifiStream; - DCCEXParser *parser; - + static const int INBOUND_RING = 200; + static const int OUTBOUND_RING = 1024; + + RingStream * inboundRing; + RingStream * outboundRing; + LOOP_STATE loopState=ANYTHING; int runningClientId; // latest client inbound processing data or CLOSE int dataLength; // dataLength of +IPD - byte * clientBuffer[MAX_CLIENTS]; - MemStream * clientStream[MAX_CLIENTS]; - CLIENT_STATUS clientStatus[MAX_CLIENTS]; - bool clientCloseAfterReply[MAX_CLIENTS]; int clientPendingCIPSEND=-1; + int currentReplySize; + bool pendingCipsend; }; #endif