From 88d5fd458063d20c1b378e19f56594cc97880e30 Mon Sep 17 00:00:00 2001 From: Asbelos Date: Mon, 4 Dec 2023 07:14:33 +0000 Subject: [PATCH] bidirectional communication! --- CommandDistributor.cpp | 18 ++++++++++--- CommandDistributor.h | 2 +- Websockets.cpp | 60 +++++++++++++++++++++++++++++++++++++++++- Websockets.h | 4 +++ WifiInboundHandler.cpp | 18 +++++++++---- 5 files changed, 92 insertions(+), 10 deletions(-) diff --git a/CommandDistributor.cpp b/CommandDistributor.cpp index aef461c..866dbaf 100644 --- a/CommandDistributor.cpp +++ b/CommandDistributor.cpp @@ -75,7 +75,7 @@ void CommandDistributor::parse(byte clientId,byte * buffer, RingStream * stream if (clients[clientId] == NONE_TYPE) { auto websock=Websockets::checkConnectionString(clientId,buffer,stream); if (websock) { - clients[clientId]=COMMAND_TYPE; + clients[clientId]=WEBSOCK_CONNECTING_TYPE; return; } if (buffer[0] == '<') @@ -84,16 +84,28 @@ void CommandDistributor::parse(byte clientId,byte * buffer, RingStream * stream clients[clientId]=WITHROTTLE_TYPE; } + // after first inbound transmission the websocket is connected + if (clients[clientId]==WEBSOCK_CONNECTING_TYPE) + clients[clientId]=WEBSOCKET_TYPE; + + // mark buffer that is sent to parser - ring->mark(clientId); - // When type is known, send the string // to the right parser if (clients[clientId] == COMMAND_TYPE) { + ring->mark(clientId); DCCEXParser::parse(stream, buffer, ring); } else if (clients[clientId] == WITHROTTLE_TYPE) { + ring->mark(clientId); WiThrottle::getThrottle(clientId)->parse(ring, buffer); } + else if (clients[clientId] == WEBSOCKET_TYPE) { + buffer=Websockets::unmask(clientId,ring, buffer); + if (!buffer) return; // unmask may have handled it alrerday (ping/pong) + // mark ring with client flagged as websocket for transmission later + ring->mark(clientId | Websockets::WEBSOCK_CLIENT_MARKER); + DCCEXParser::parse(stream, buffer, ring); + } if (ring->peekTargetMark()!=RingStream::NO_CLIENT) { // The commit call will either write the length bytes diff --git a/CommandDistributor.h b/CommandDistributor.h index e4dff5d..3626cef 100644 --- a/CommandDistributor.h +++ b/CommandDistributor.h @@ -36,7 +36,7 @@ class CommandDistributor { public: - enum clientType: byte {NONE_TYPE,COMMAND_TYPE,WITHROTTLE_TYPE}; + enum clientType: byte {NONE_TYPE,COMMAND_TYPE,WITHROTTLE_TYPE,WEBSOCK_CONNECTING_TYPE,WEBSOCKET_TYPE}; private: static void broadcastToClients(clientType type); static StringBuffer * broadcastBufferWriter; diff --git a/Websockets.cpp b/Websockets.cpp index 97e8f4c..2c5fc86 100644 --- a/Websockets.cpp +++ b/Websockets.cpp @@ -59,7 +59,7 @@ bool Websockets::checkConnectionString(byte clientId,byte * cmd, RingStream * ou SHA1Update(&ctx, (unsigned char *)replyKey, strlen(replyKey)); SHA1Final(sha1HashBin, &ctx); - // ghenerate the response and embed the base64 encode + // generate the response and embed the base64 encode // of the key outbound->mark(clientId); outbound->print("HTTP/1.1 101 Switching Protocols\r\n" @@ -84,3 +84,61 @@ bool Websockets::checkConnectionString(byte clientId,byte * cmd, RingStream * ou return true; } +byte * Websockets::unmask(byte clientId,RingStream *ring, byte * buffer) { + // buffer should have a websocket header + //byte opcode=buffer[0] & 0x0f; + DIAG(F("Websock in: %x %x %x %x %x %x %x %x"), + buffer[0],buffer[1],buffer[2],buffer[3], + buffer[4],buffer[5],buffer[6]); + + byte opcode=buffer[0]; + if (opcode!=0x81) { + DIAG(F("unknown opcode ")); + return nullptr; + } + bool maskbit=buffer[1]&0x80; + int16_t payloadLength=buffer[1]&0x7f; + + byte * mask; + if (payloadLength<126) { + mask=buffer+2; + } + else { + payloadLength=(buffer[3]<<8)|(buffer[2]); + mask=buffer+4; + } + DIAG(F("Websock mb=%b pl=%d m=%x %x %x %x"), maskbit, payloadLength, + mask[0],mask[1],mask[2], mask[3]); + + if (payloadLength>100) return nullptr; // remove this check + byte * payload=mask+4; + for (int i=0;i=126)? 4:2; + } + + void Websockets::writeOutboundHeader(Print * stream,uint16_t dataLength) { + // write the outbound header + // length patched if necessary. + // text opcode, flag(126= use 2 length bytes, no mask bit) , length + if (dataLength>=126) { + const byte prefix[]={0x81,126, + (byte)(dataLength & 0xFF), (byte)(dataLength>>8)}; + stream->write(prefix,sizeof(prefix)); + } + else { + const byte prefix[]={0x81,(byte)dataLength}; + stream->write(prefix,sizeof(prefix)); + } + + } + + + diff --git a/Websockets.h b/Websockets.h index 0f8be04..25ed71d 100644 --- a/Websockets.h +++ b/Websockets.h @@ -5,6 +5,10 @@ class Websockets { public: static bool checkConnectionString(byte clientId,byte * cmd, RingStream * outbound ); + static byte * unmask(byte clientId,RingStream *ring, byte * buffer); + static int16_t getOutboundHeaderSize(uint16_t dataLength); + static void writeOutboundHeader(Print * stream,uint16_t dataLength); + static const byte WEBSOCK_CLIENT_MARKER=0x80; }; #endif \ No newline at end of file diff --git a/WifiInboundHandler.cpp b/WifiInboundHandler.cpp index b570527..aba473a 100644 --- a/WifiInboundHandler.cpp +++ b/WifiInboundHandler.cpp @@ -1,8 +1,7 @@ /* * © 2021 Fred Decker * © 2021 Fred Decker - * © 2020-2021 Chris Harlow - * © 2020, Chris Harlow. All rights reserved. + * © 2020-2023, Chris Harlow. All rights reserved. * © 2020, Harald Barth. * * This file is part of Asbelos DCC API @@ -26,6 +25,7 @@ #include "RingStream.h" #include "CommandDistributor.h" #include "DIAG.h" +#include "Websockets.h" WifiInboundHandler * WifiInboundHandler::singleton; @@ -67,8 +67,13 @@ void WifiInboundHandler::loop1() { if (pendingCipsend && millis()-lastCIPSEND > CIPSENDgap) { - if (Diag::WIFI) DIAG( F("WiFi: [[CIPSEND=%d,%d]]"), clientPendingCIPSEND, currentReplySize); - StringFormatter::send(wifiStream, F("AT+CIPSEND=%d,%d\r\n"), clientPendingCIPSEND, currentReplySize); + // add allowances for websockets + bool websocket=clientPendingCIPSEND & Websockets::WEBSOCK_CLIENT_MARKER; + byte realClient=clientPendingCIPSEND & ~Websockets::WEBSOCK_CLIENT_MARKER; + int16_t realSize=currentReplySize; + if (websocket) realSize+=Websockets::getOutboundHeaderSize(currentReplySize); + if (Diag::WIFI) DIAG( F("WiFi: [[CIPSEND=%d,%d]]"), realClient, realSize); + StringFormatter::send(wifiStream, F("AT+CIPSEND=%d,%d\r\n"), realClient,realSize); pendingCipsend=false; return; } @@ -112,9 +117,12 @@ WifiInboundHandler::INBOUND_STATE WifiInboundHandler::loop2() { } if (ch=='>') { - if (Diag::WIFI) DIAG(F("[XMIT %d]"),currentReplySize); + bool websocket=clientPendingCIPSEND & Websockets::WEBSOCK_CLIENT_MARKER; + if (Diag::WIFI) DIAG(F("[XMIT %d ws=%b]"),currentReplySize,websocket); + if (websocket) Websockets::writeOutboundHeader(wifiStream,currentReplySize); for (int i=0;iread(); + if (websocket && (cout=='\n')) cout=' '; wifiStream->write(cout); if (Diag::WIFI) StringFormatter::printEscape(cout); // DIAG in disguise }