From 1e3285b299c291261a30c7f4209c42290660d577 Mon Sep 17 00:00:00 2001 From: proddy Date: Wed, 11 Sep 2024 22:30:26 +0200 Subject: [PATCH] applied bugfix --- .../src/AsyncEventSource.cpp | 206 +++++++++--------- lib/ESPAsyncWebServer/src/AsyncEventSource.h | 40 ++-- 2 files changed, 123 insertions(+), 123 deletions(-) diff --git a/lib/ESPAsyncWebServer/src/AsyncEventSource.cpp b/lib/ESPAsyncWebServer/src/AsyncEventSource.cpp index 04d7a3d50..69a0253ea 100644 --- a/lib/ESPAsyncWebServer/src/AsyncEventSource.cpp +++ b/lib/ESPAsyncWebServer/src/AsyncEventSource.cpp @@ -21,24 +21,24 @@ #include "AsyncEventSource.h" static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){ - String ev; + String ev = ""; if(reconnect){ - ev += F("retry: "); - ev += reconnect; - ev += F("\r\n"); + ev += "retry: "; + ev += String(reconnect); + ev += "\r\n"; } if(id){ - ev += F("id: "); + ev += "id: "; ev += String(id); - ev += F("\r\n"); + ev += "\r\n"; } if(event != NULL){ - ev += F("event: "); + ev += "event: "; ev += String(event); - ev += F("\r\n"); + ev += "\r\n"; } if(message != NULL){ @@ -54,9 +54,9 @@ static String generateEventMessage(const char *message, const char *event, uint3 if(ldata != NULL){ memcpy(ldata, lineStart, llen); ldata[llen] = 0; - ev += F("data: "); + ev += "data: "; ev += ldata; - ev += F("\r\n\r\n"); + ev += "\r\n\r\n"; free(ldata); } lineStart = (char *)message + messageLen; @@ -89,14 +89,14 @@ static String generateEventMessage(const char *message, const char *event, uint3 if(ldata != NULL){ memcpy(ldata, lineStart, llen); ldata[llen] = 0; - ev += F("data: "); + ev += "data: "; ev += ldata; - ev += F("\r\n"); + ev += "\r\n"; free(ldata); } lineStart = nextLine; if(lineStart == ((char *)message + messageLen)) - ev += F("\r\n"); + ev += "\r\n"; } } while(lineStart < ((char *)message + messageLen)); } @@ -123,8 +123,7 @@ AsyncEventSourceMessage::~AsyncEventSourceMessage() { free(_data); } -size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) { - (void)time; +size_t AsyncEventSourceMessage::ack(size_t len) { // If the whole message is now acked... if(_acked + len > _len){ // Return the number of extra bytes acked (they will be carried on to the next message) @@ -137,17 +136,22 @@ size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) { return 0; } -// This could also return void as the return value is not used. -// Leaving as-is for compatibility... +size_t AsyncEventSourceMessage::write_buffer(AsyncClient *client) { + if (!client->canSend()) + return 0; + const size_t len = _len - _sent; + if(client->space() < len){ + return 0; + } + size_t sent = client->add((const char *)_data, len); + _sent += sent; + return sent; +} + size_t AsyncEventSourceMessage::send(AsyncClient *client) { - if (_sent >= _len) { - return 0; - } - const size_t len_to_send = _len - _sent; - auto position = reinterpret_cast(_data + _sent); - const size_t sent_now = client->write(position, len_to_send); - _sent += sent_now; - return sent_now; + size_t sent = write_buffer(client); + client->send(); + return sent; } // Client @@ -158,8 +162,8 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A _client = request->client(); _server = server; _lastId = 0; - if(request->hasHeader(F("Last-Event-ID"))) - _lastId = atoi(request->getHeader(F("Last-Event-ID"))->value().c_str()); + if(request->hasHeader("Last-Event-ID")) + _lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str()); _client->setRxTimeout(0); _client->onError(NULL, NULL); @@ -171,12 +175,12 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A _server->_addClient(this); delete request; + + _client->setNoDelay(true); } AsyncEventSourceClient::~AsyncEventSourceClient(){ - _lockmq.lock(); - _messageQueue.free(); - _lockmq.unlock(); + _messageQueue.free(); close(); } @@ -187,41 +191,28 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage) delete dataMessage; return; } - //length() is not thread-safe, thus acquiring the lock before this call.. - _lockmq.lock(); + if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){ - // ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); + ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event"); delete dataMessage; } else { - _messageQueue.add(dataMessage); - // runqueue trigger when new messages added - if(_client->canSend()) { - _runQueue(); - } + _messageQueue.add(dataMessage); } - _lockmq.unlock(); + if(_client->canSend()) + _runQueue(); } void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){ - // Same here, acquiring the lock early - _lockmq.lock(); - while(len && !_messageQueue.isEmpty()){ - len = _messageQueue.front()->ack(len, time); - if(_messageQueue.front()->finished()) - _messageQueue.remove(_messageQueue.front()); - } _runQueue(); - _lockmq.unlock(); } void AsyncEventSourceClient::_onPoll(){ - _lockmq.lock(); if(!_messageQueue.isEmpty()){ _runQueue(); } - _lockmq.unlock(); } + void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){ _client->close(true); } @@ -245,24 +236,52 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32 _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length())); } -size_t AsyncEventSourceClient::packetsWaiting() const { - size_t len; - _lockmq.lock(); - len = _messageQueue.length(); - _lockmq.unlock(); - return len; -} +void AsyncEventSourceClient::_runQueue(){ +#if defined(ESP32) + if(!this->_messageQueue_mutex.try_lock()) { + return; + } +#else + if(this->_messageQueue_processing){ + return; + } + this->_messageQueue_processing = true; +#endif // ESP32 -void AsyncEventSourceClient::_runQueue() { - // Calls to this private method now already protected by _lockmq acquisition - // so no extra call of _lockmq.lock() here.. - for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) { - // If it crashes here, iterator (i) has been invalidated as _messageQueue - // has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) ) - if (!(*i)->sent()) { - (*i)->send(_client); + size_t total_bytes_written = 0; + for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) + { + if(!(*i)->sent()) { + size_t bytes_written = (*i)->write_buffer(_client); + total_bytes_written += bytes_written; + if(bytes_written == 0) + break; + // todo: there is a further optimization to write a partial event to squeeze the last few bytes into the outgoing tcp send buffer, in + // fact all of this code is already set up to do so, it's only write_buffer that needs to be updated to allow it instead of + // returning zero when the full event won't fit into what's left of the buffer + // todo: windows is taking 40-50ms to send an ack back while it waits for more data which won't come since this code must wait for ack first + // due to system resource limitations - if the dashboard javascript just sends a single byte back per event received (which this + // code would of course throw away as meaningless) then windows (or whatever other host runs the webbrower) will piggyback an ack + // onto that outgoing packet for us, reducing roundtrip ack latency and potentially as much as trippling throughput again + // (measured: ESP-01: 20ms to send another packet after ack received, windows: 40-50ms to ack after receiving a packet) } } + if(total_bytes_written > 0) + _client->send(); + + size_t len = total_bytes_written; + while(len && !_messageQueue.isEmpty()){ + len = _messageQueue.front()->ack(len); + if(_messageQueue.front()->finished()){ + _messageQueue.remove(_messageQueue.front()); + } + } + +#if defined(ESP32) + this->_messageQueue_mutex.unlock(); +#else + this->_messageQueue_processing = false; +#endif // ESP32 } @@ -282,10 +301,6 @@ void AsyncEventSource::onConnect(ArEventHandlerFunction cb){ _connectcb = cb; } -void AsyncEventSource::authorizeConnect(ArAuthorizeConnectHandler cb){ - _authorizeConnectHandler = cb; -} - void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ /*char * temp = (char *)malloc(2054); if(temp != NULL){ @@ -299,22 +314,17 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ client->write((const char *)temp, 2053); free(temp); }*/ - AsyncWebLockGuard l(_client_queue_lock); + _clients.add(client); if(_connectcb) _connectcb(client); } void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){ - AsyncWebLockGuard l(_client_queue_lock); _clients.remove(client); } void AsyncEventSource::close(){ - // While the whole loop is not done, the linked list is locked and so the - // iterator should remain valid even when AsyncEventSource::_handleDisconnect() - // is called very early - AsyncWebLockGuard l(_client_queue_lock); for(const auto &c: _clients){ if(c->connected()) c->close(); @@ -323,25 +333,26 @@ void AsyncEventSource::close(){ // pmb fix size_t AsyncEventSource::avgPacketsWaiting() const { - size_t aql = 0; - uint32_t nConnectedClients = 0; - AsyncWebLockGuard l(_client_queue_lock); - if (_clients.isEmpty()) { + if(_clients.isEmpty()) return 0; - } + + size_t aql=0; + uint32_t nConnectedClients=0; + for(const auto &c: _clients){ if(c->connected()) { - aql += c->packetsWaiting(); + aql+=c->packetsWaiting(); ++nConnectedClients; } } - return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up +// return aql / nConnectedClients; + return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up } -void AsyncEventSource::send( - const char *message, const char *event, uint32_t id, uint32_t reconnect){ +void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){ + + String ev = generateEventMessage(message, event, id, reconnect); - AsyncWebLockGuard l(_client_queue_lock); for(const auto &c: _clients){ if(c->connected()) { c->write(ev.c_str(), ev.length()); @@ -350,32 +361,22 @@ void AsyncEventSource::send( } size_t AsyncEventSource::count() const { - size_t n_clients; - AsyncWebLockGuard l(_client_queue_lock); - n_clients = _clients.count_if([](AsyncEventSourceClient *c){ - return c->connected(); - }); - return n_clients; + return _clients.count_if([](AsyncEventSourceClient *c){ + return c->connected(); + }); } bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){ if(request->method() != HTTP_GET || !request->url().equals(_url)) { return false; } - request->addInterestingHeader(F("Last-Event-ID")); - request->addInterestingHeader("Cookie"); + request->addInterestingHeader("Last-Event-ID"); return true; } void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){ - if((_username.length() && _password.length()) && !request->authenticate(_username.c_str(), _password.c_str())) { + if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str())) return request->requestAuthentication(); - } - if(_authorizeConnectHandler != NULL){ - if(!_authorizeConnectHandler(request)){ - return request->send(401); - } - } request->send(new AsyncEventSourceResponse(this)); } @@ -384,10 +385,10 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){ AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){ _server = server; _code = 200; - _contentType = F("text/event-stream"); + _contentType = "text/event-stream"; _sendContentLength = false; - addHeader(F("Cache-Control"), F("no-cache")); - addHeader(F("Connection"), F("keep-alive")); + addHeader("Cache-Control", "no-cache"); + addHeader("Connection","keep-alive"); } void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){ @@ -402,4 +403,3 @@ size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len } return 0; } - diff --git a/lib/ESPAsyncWebServer/src/AsyncEventSource.h b/lib/ESPAsyncWebServer/src/AsyncEventSource.h index 18868bafc..e52d91bae 100644 --- a/lib/ESPAsyncWebServer/src/AsyncEventSource.h +++ b/lib/ESPAsyncWebServer/src/AsyncEventSource.h @@ -21,16 +21,19 @@ #define ASYNCEVENTSOURCE_H_ #include -#ifdef ESP32 +#include +#if defined(ESP32) || defined(LIBRETINY) #include -#ifndef SSE_MAX_QUEUED_MESSAGES -#define SSE_MAX_QUEUED_MESSAGES 32 -#endif #else #include -#ifndef SSE_MAX_QUEUED_MESSAGES -#define SSE_MAX_QUEUED_MESSAGES 8 #endif + +#if defined(ESP32) +#include +#endif // ESP32 + +#ifndef SSE_MAX_QUEUED_MESSAGES +#define SSE_MAX_QUEUED_MESSAGES 32 #endif #include @@ -44,7 +47,7 @@ #endif #endif -#ifdef ESP32 +#if defined(ESP32) || defined(LIBRETINY) #define DEFAULT_MAX_SSE_CLIENTS 8 #else #define DEFAULT_MAX_SSE_CLIENTS 4 @@ -54,7 +57,6 @@ class AsyncEventSource; class AsyncEventSourceResponse; class AsyncEventSourceClient; typedef std::function ArEventHandlerFunction; -typedef std::function ArAuthorizeConnectHandler; class AsyncEventSourceMessage { private: @@ -66,7 +68,8 @@ class AsyncEventSourceMessage { public: AsyncEventSourceMessage(const char * data, size_t len); ~AsyncEventSourceMessage(); - size_t ack(size_t len, uint32_t time __attribute__((unused))); + size_t ack(size_t len); + size_t write_buffer(AsyncClient *client); size_t send(AsyncClient *client); bool finished(){ return _acked == _len; } bool sent() { return _sent == _len; } @@ -77,9 +80,12 @@ class AsyncEventSourceClient { AsyncClient *_client; AsyncEventSource *_server; uint32_t _lastId; +#if defined(ESP32) + std::mutex _messageQueue_mutex; +#else + bool _messageQueue_processing{false}; +#endif // ESP32 LinkedList _messageQueue; - // ArFi 2020-08-27 for protecting/serializing _messageQueue - AsyncPlainLock _lockmq; void _queueMessage(AsyncEventSourceMessage *dataMessage); void _runQueue(); @@ -94,7 +100,7 @@ class AsyncEventSourceClient { void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); bool connected() const { return (_client != NULL) && _client->connected(); } uint32_t lastId() const { return _lastId; } - size_t packetsWaiting() const; + size_t packetsWaiting() const { return _messageQueue.length(); } //system callbacks (do not call) void _onAck(size_t len, uint32_t time); @@ -107,11 +113,7 @@ class AsyncEventSource: public AsyncWebHandler { private: String _url; LinkedList _clients; - // Same as for individual messages, protect mutations of _clients list - // since simultaneous access from different tasks is possible - AsyncWebLock _client_queue_lock; ArEventHandlerFunction _connectcb; - ArAuthorizeConnectHandler _authorizeConnectHandler; public: AsyncEventSource(const String& url); ~AsyncEventSource(); @@ -119,10 +121,8 @@ class AsyncEventSource: public AsyncWebHandler { const char * url() const { return _url.c_str(); } void close(); void onConnect(ArEventHandlerFunction cb); - void authorizeConnect(ArAuthorizeConnectHandler cb); void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0); - // number of clients connected - size_t count() const; + size_t count() const; //number clinets connected size_t avgPacketsWaiting() const; //system callbacks (do not call) @@ -144,4 +144,4 @@ class AsyncEventSourceResponse: public AsyncWebServerResponse { }; -#endif /* ASYNCEVENTSOURCE_H_ */ +#endif /* ASYNCEVENTSOURCE_H_ */ \ No newline at end of file