mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-06 07:49:52 +03:00
applied bugfix
This commit is contained in:
@@ -21,24 +21,24 @@
|
||||
#include "AsyncEventSource.h"
|
||||
|
||||
static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
|
||||
String ev;
|
||||
String ev = "";
|
||||
|
||||
if(reconnect){
|
||||
ev += F("retry: ");
|
||||
ev += reconnect;
|
||||
ev += F("\r\n");
|
||||
ev += "retry: ";
|
||||
ev += String(reconnect);
|
||||
ev += "\r\n";
|
||||
}
|
||||
|
||||
if(id){
|
||||
ev += F("id: ");
|
||||
ev += "id: ";
|
||||
ev += String(id);
|
||||
ev += F("\r\n");
|
||||
ev += "\r\n";
|
||||
}
|
||||
|
||||
if(event != NULL){
|
||||
ev += F("event: ");
|
||||
ev += "event: ";
|
||||
ev += String(event);
|
||||
ev += F("\r\n");
|
||||
ev += "\r\n";
|
||||
}
|
||||
|
||||
if(message != NULL){
|
||||
@@ -54,9 +54,9 @@ static String generateEventMessage(const char *message, const char *event, uint3
|
||||
if(ldata != NULL){
|
||||
memcpy(ldata, lineStart, llen);
|
||||
ldata[llen] = 0;
|
||||
ev += F("data: ");
|
||||
ev += "data: ";
|
||||
ev += ldata;
|
||||
ev += F("\r\n\r\n");
|
||||
ev += "\r\n\r\n";
|
||||
free(ldata);
|
||||
}
|
||||
lineStart = (char *)message + messageLen;
|
||||
@@ -89,14 +89,14 @@ static String generateEventMessage(const char *message, const char *event, uint3
|
||||
if(ldata != NULL){
|
||||
memcpy(ldata, lineStart, llen);
|
||||
ldata[llen] = 0;
|
||||
ev += F("data: ");
|
||||
ev += "data: ";
|
||||
ev += ldata;
|
||||
ev += F("\r\n");
|
||||
ev += "\r\n";
|
||||
free(ldata);
|
||||
}
|
||||
lineStart = nextLine;
|
||||
if(lineStart == ((char *)message + messageLen))
|
||||
ev += F("\r\n");
|
||||
ev += "\r\n";
|
||||
}
|
||||
} while(lineStart < ((char *)message + messageLen));
|
||||
}
|
||||
@@ -123,8 +123,7 @@ AsyncEventSourceMessage::~AsyncEventSourceMessage() {
|
||||
free(_data);
|
||||
}
|
||||
|
||||
size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
|
||||
(void)time;
|
||||
size_t AsyncEventSourceMessage::ack(size_t len) {
|
||||
// If the whole message is now acked...
|
||||
if(_acked + len > _len){
|
||||
// Return the number of extra bytes acked (they will be carried on to the next message)
|
||||
@@ -137,17 +136,22 @@ size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// This could also return void as the return value is not used.
|
||||
// Leaving as-is for compatibility...
|
||||
size_t AsyncEventSourceMessage::write_buffer(AsyncClient *client) {
|
||||
if (!client->canSend())
|
||||
return 0;
|
||||
const size_t len = _len - _sent;
|
||||
if(client->space() < len){
|
||||
return 0;
|
||||
}
|
||||
size_t sent = client->add((const char *)_data, len);
|
||||
_sent += sent;
|
||||
return sent;
|
||||
}
|
||||
|
||||
size_t AsyncEventSourceMessage::send(AsyncClient *client) {
|
||||
if (_sent >= _len) {
|
||||
return 0;
|
||||
}
|
||||
const size_t len_to_send = _len - _sent;
|
||||
auto position = reinterpret_cast<const char*>(_data + _sent);
|
||||
const size_t sent_now = client->write(position, len_to_send);
|
||||
_sent += sent_now;
|
||||
return sent_now;
|
||||
size_t sent = write_buffer(client);
|
||||
client->send();
|
||||
return sent;
|
||||
}
|
||||
|
||||
// Client
|
||||
@@ -158,8 +162,8 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
|
||||
_client = request->client();
|
||||
_server = server;
|
||||
_lastId = 0;
|
||||
if(request->hasHeader(F("Last-Event-ID")))
|
||||
_lastId = atoi(request->getHeader(F("Last-Event-ID"))->value().c_str());
|
||||
if(request->hasHeader("Last-Event-ID"))
|
||||
_lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str());
|
||||
|
||||
_client->setRxTimeout(0);
|
||||
_client->onError(NULL, NULL);
|
||||
@@ -171,12 +175,12 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
|
||||
|
||||
_server->_addClient(this);
|
||||
delete request;
|
||||
|
||||
_client->setNoDelay(true);
|
||||
}
|
||||
|
||||
AsyncEventSourceClient::~AsyncEventSourceClient(){
|
||||
_lockmq.lock();
|
||||
_messageQueue.free();
|
||||
_lockmq.unlock();
|
||||
_messageQueue.free();
|
||||
close();
|
||||
}
|
||||
|
||||
@@ -187,41 +191,28 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
|
||||
delete dataMessage;
|
||||
return;
|
||||
}
|
||||
//length() is not thread-safe, thus acquiring the lock before this call..
|
||||
_lockmq.lock();
|
||||
|
||||
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
|
||||
// ets_printf(String(F("ERROR: Too many messages queued\n")).c_str());
|
||||
ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event");
|
||||
delete dataMessage;
|
||||
} else {
|
||||
_messageQueue.add(dataMessage);
|
||||
// runqueue trigger when new messages added
|
||||
if(_client->canSend()) {
|
||||
_runQueue();
|
||||
}
|
||||
_messageQueue.add(dataMessage);
|
||||
}
|
||||
_lockmq.unlock();
|
||||
if(_client->canSend())
|
||||
_runQueue();
|
||||
}
|
||||
|
||||
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
|
||||
// Same here, acquiring the lock early
|
||||
_lockmq.lock();
|
||||
while(len && !_messageQueue.isEmpty()){
|
||||
len = _messageQueue.front()->ack(len, time);
|
||||
if(_messageQueue.front()->finished())
|
||||
_messageQueue.remove(_messageQueue.front());
|
||||
}
|
||||
_runQueue();
|
||||
_lockmq.unlock();
|
||||
}
|
||||
|
||||
void AsyncEventSourceClient::_onPoll(){
|
||||
_lockmq.lock();
|
||||
if(!_messageQueue.isEmpty()){
|
||||
_runQueue();
|
||||
}
|
||||
_lockmq.unlock();
|
||||
}
|
||||
|
||||
|
||||
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
|
||||
_client->close(true);
|
||||
}
|
||||
@@ -245,24 +236,52 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
|
||||
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
|
||||
}
|
||||
|
||||
size_t AsyncEventSourceClient::packetsWaiting() const {
|
||||
size_t len;
|
||||
_lockmq.lock();
|
||||
len = _messageQueue.length();
|
||||
_lockmq.unlock();
|
||||
return len;
|
||||
}
|
||||
void AsyncEventSourceClient::_runQueue(){
|
||||
#if defined(ESP32)
|
||||
if(!this->_messageQueue_mutex.try_lock()) {
|
||||
return;
|
||||
}
|
||||
#else
|
||||
if(this->_messageQueue_processing){
|
||||
return;
|
||||
}
|
||||
this->_messageQueue_processing = true;
|
||||
#endif // ESP32
|
||||
|
||||
void AsyncEventSourceClient::_runQueue() {
|
||||
// Calls to this private method now already protected by _lockmq acquisition
|
||||
// so no extra call of _lockmq.lock() here..
|
||||
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
|
||||
// If it crashes here, iterator (i) has been invalidated as _messageQueue
|
||||
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
|
||||
if (!(*i)->sent()) {
|
||||
(*i)->send(_client);
|
||||
size_t total_bytes_written = 0;
|
||||
for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
|
||||
{
|
||||
if(!(*i)->sent()) {
|
||||
size_t bytes_written = (*i)->write_buffer(_client);
|
||||
total_bytes_written += bytes_written;
|
||||
if(bytes_written == 0)
|
||||
break;
|
||||
// todo: there is a further optimization to write a partial event to squeeze the last few bytes into the outgoing tcp send buffer, in
|
||||
// fact all of this code is already set up to do so, it's only write_buffer that needs to be updated to allow it instead of
|
||||
// returning zero when the full event won't fit into what's left of the buffer
|
||||
// todo: windows is taking 40-50ms to send an ack back while it waits for more data which won't come since this code must wait for ack first
|
||||
// due to system resource limitations - if the dashboard javascript just sends a single byte back per event received (which this
|
||||
// code would of course throw away as meaningless) then windows (or whatever other host runs the webbrower) will piggyback an ack
|
||||
// onto that outgoing packet for us, reducing roundtrip ack latency and potentially as much as trippling throughput again
|
||||
// (measured: ESP-01: 20ms to send another packet after ack received, windows: 40-50ms to ack after receiving a packet)
|
||||
}
|
||||
}
|
||||
if(total_bytes_written > 0)
|
||||
_client->send();
|
||||
|
||||
size_t len = total_bytes_written;
|
||||
while(len && !_messageQueue.isEmpty()){
|
||||
len = _messageQueue.front()->ack(len);
|
||||
if(_messageQueue.front()->finished()){
|
||||
_messageQueue.remove(_messageQueue.front());
|
||||
}
|
||||
}
|
||||
|
||||
#if defined(ESP32)
|
||||
this->_messageQueue_mutex.unlock();
|
||||
#else
|
||||
this->_messageQueue_processing = false;
|
||||
#endif // ESP32
|
||||
}
|
||||
|
||||
|
||||
@@ -282,10 +301,6 @@ void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
|
||||
_connectcb = cb;
|
||||
}
|
||||
|
||||
void AsyncEventSource::authorizeConnect(ArAuthorizeConnectHandler cb){
|
||||
_authorizeConnectHandler = cb;
|
||||
}
|
||||
|
||||
void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
|
||||
/*char * temp = (char *)malloc(2054);
|
||||
if(temp != NULL){
|
||||
@@ -299,22 +314,17 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
|
||||
client->write((const char *)temp, 2053);
|
||||
free(temp);
|
||||
}*/
|
||||
AsyncWebLockGuard l(_client_queue_lock);
|
||||
|
||||
_clients.add(client);
|
||||
if(_connectcb)
|
||||
_connectcb(client);
|
||||
}
|
||||
|
||||
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
|
||||
AsyncWebLockGuard l(_client_queue_lock);
|
||||
_clients.remove(client);
|
||||
}
|
||||
|
||||
void AsyncEventSource::close(){
|
||||
// While the whole loop is not done, the linked list is locked and so the
|
||||
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
|
||||
// is called very early
|
||||
AsyncWebLockGuard l(_client_queue_lock);
|
||||
for(const auto &c: _clients){
|
||||
if(c->connected())
|
||||
c->close();
|
||||
@@ -323,25 +333,26 @@ void AsyncEventSource::close(){
|
||||
|
||||
// pmb fix
|
||||
size_t AsyncEventSource::avgPacketsWaiting() const {
|
||||
size_t aql = 0;
|
||||
uint32_t nConnectedClients = 0;
|
||||
AsyncWebLockGuard l(_client_queue_lock);
|
||||
if (_clients.isEmpty()) {
|
||||
if(_clients.isEmpty())
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t aql=0;
|
||||
uint32_t nConnectedClients=0;
|
||||
|
||||
for(const auto &c: _clients){
|
||||
if(c->connected()) {
|
||||
aql += c->packetsWaiting();
|
||||
aql+=c->packetsWaiting();
|
||||
++nConnectedClients;
|
||||
}
|
||||
}
|
||||
return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up
|
||||
// return aql / nConnectedClients;
|
||||
return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
|
||||
}
|
||||
|
||||
void AsyncEventSource::send(
|
||||
const char *message, const char *event, uint32_t id, uint32_t reconnect){
|
||||
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
|
||||
|
||||
|
||||
String ev = generateEventMessage(message, event, id, reconnect);
|
||||
AsyncWebLockGuard l(_client_queue_lock);
|
||||
for(const auto &c: _clients){
|
||||
if(c->connected()) {
|
||||
c->write(ev.c_str(), ev.length());
|
||||
@@ -350,32 +361,22 @@ void AsyncEventSource::send(
|
||||
}
|
||||
|
||||
size_t AsyncEventSource::count() const {
|
||||
size_t n_clients;
|
||||
AsyncWebLockGuard l(_client_queue_lock);
|
||||
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
|
||||
return c->connected();
|
||||
});
|
||||
return n_clients;
|
||||
return _clients.count_if([](AsyncEventSourceClient *c){
|
||||
return c->connected();
|
||||
});
|
||||
}
|
||||
|
||||
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
|
||||
if(request->method() != HTTP_GET || !request->url().equals(_url)) {
|
||||
return false;
|
||||
}
|
||||
request->addInterestingHeader(F("Last-Event-ID"));
|
||||
request->addInterestingHeader("Cookie");
|
||||
request->addInterestingHeader("Last-Event-ID");
|
||||
return true;
|
||||
}
|
||||
|
||||
void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
|
||||
if((_username.length() && _password.length()) && !request->authenticate(_username.c_str(), _password.c_str())) {
|
||||
if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str()))
|
||||
return request->requestAuthentication();
|
||||
}
|
||||
if(_authorizeConnectHandler != NULL){
|
||||
if(!_authorizeConnectHandler(request)){
|
||||
return request->send(401);
|
||||
}
|
||||
}
|
||||
request->send(new AsyncEventSourceResponse(this));
|
||||
}
|
||||
|
||||
@@ -384,10 +385,10 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
|
||||
AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){
|
||||
_server = server;
|
||||
_code = 200;
|
||||
_contentType = F("text/event-stream");
|
||||
_contentType = "text/event-stream";
|
||||
_sendContentLength = false;
|
||||
addHeader(F("Cache-Control"), F("no-cache"));
|
||||
addHeader(F("Connection"), F("keep-alive"));
|
||||
addHeader("Cache-Control", "no-cache");
|
||||
addHeader("Connection","keep-alive");
|
||||
}
|
||||
|
||||
void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){
|
||||
@@ -402,4 +403,3 @@ size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -21,16 +21,19 @@
|
||||
#define ASYNCEVENTSOURCE_H_
|
||||
|
||||
#include <Arduino.h>
|
||||
#ifdef ESP32
|
||||
#include <Arduino.h>
|
||||
#if defined(ESP32) || defined(LIBRETINY)
|
||||
#include <AsyncTCP.h>
|
||||
#ifndef SSE_MAX_QUEUED_MESSAGES
|
||||
#define SSE_MAX_QUEUED_MESSAGES 32
|
||||
#endif
|
||||
#else
|
||||
#include <ESPAsyncTCP.h>
|
||||
#ifndef SSE_MAX_QUEUED_MESSAGES
|
||||
#define SSE_MAX_QUEUED_MESSAGES 8
|
||||
#endif
|
||||
|
||||
#if defined(ESP32)
|
||||
#include <mutex>
|
||||
#endif // ESP32
|
||||
|
||||
#ifndef SSE_MAX_QUEUED_MESSAGES
|
||||
#define SSE_MAX_QUEUED_MESSAGES 32
|
||||
#endif
|
||||
|
||||
#include <ESPAsyncWebServer.h>
|
||||
@@ -44,7 +47,7 @@
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#ifdef ESP32
|
||||
#if defined(ESP32) || defined(LIBRETINY)
|
||||
#define DEFAULT_MAX_SSE_CLIENTS 8
|
||||
#else
|
||||
#define DEFAULT_MAX_SSE_CLIENTS 4
|
||||
@@ -54,7 +57,6 @@ class AsyncEventSource;
|
||||
class AsyncEventSourceResponse;
|
||||
class AsyncEventSourceClient;
|
||||
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction;
|
||||
typedef std::function<bool(AsyncWebServerRequest *request)> ArAuthorizeConnectHandler;
|
||||
|
||||
class AsyncEventSourceMessage {
|
||||
private:
|
||||
@@ -66,7 +68,8 @@ class AsyncEventSourceMessage {
|
||||
public:
|
||||
AsyncEventSourceMessage(const char * data, size_t len);
|
||||
~AsyncEventSourceMessage();
|
||||
size_t ack(size_t len, uint32_t time __attribute__((unused)));
|
||||
size_t ack(size_t len);
|
||||
size_t write_buffer(AsyncClient *client);
|
||||
size_t send(AsyncClient *client);
|
||||
bool finished(){ return _acked == _len; }
|
||||
bool sent() { return _sent == _len; }
|
||||
@@ -77,9 +80,12 @@ class AsyncEventSourceClient {
|
||||
AsyncClient *_client;
|
||||
AsyncEventSource *_server;
|
||||
uint32_t _lastId;
|
||||
#if defined(ESP32)
|
||||
std::mutex _messageQueue_mutex;
|
||||
#else
|
||||
bool _messageQueue_processing{false};
|
||||
#endif // ESP32
|
||||
LinkedList<AsyncEventSourceMessage *> _messageQueue;
|
||||
// ArFi 2020-08-27 for protecting/serializing _messageQueue
|
||||
AsyncPlainLock _lockmq;
|
||||
void _queueMessage(AsyncEventSourceMessage *dataMessage);
|
||||
void _runQueue();
|
||||
|
||||
@@ -94,7 +100,7 @@ class AsyncEventSourceClient {
|
||||
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
|
||||
bool connected() const { return (_client != NULL) && _client->connected(); }
|
||||
uint32_t lastId() const { return _lastId; }
|
||||
size_t packetsWaiting() const;
|
||||
size_t packetsWaiting() const { return _messageQueue.length(); }
|
||||
|
||||
//system callbacks (do not call)
|
||||
void _onAck(size_t len, uint32_t time);
|
||||
@@ -107,11 +113,7 @@ class AsyncEventSource: public AsyncWebHandler {
|
||||
private:
|
||||
String _url;
|
||||
LinkedList<AsyncEventSourceClient *> _clients;
|
||||
// Same as for individual messages, protect mutations of _clients list
|
||||
// since simultaneous access from different tasks is possible
|
||||
AsyncWebLock _client_queue_lock;
|
||||
ArEventHandlerFunction _connectcb;
|
||||
ArAuthorizeConnectHandler _authorizeConnectHandler;
|
||||
public:
|
||||
AsyncEventSource(const String& url);
|
||||
~AsyncEventSource();
|
||||
@@ -119,10 +121,8 @@ class AsyncEventSource: public AsyncWebHandler {
|
||||
const char * url() const { return _url.c_str(); }
|
||||
void close();
|
||||
void onConnect(ArEventHandlerFunction cb);
|
||||
void authorizeConnect(ArAuthorizeConnectHandler cb);
|
||||
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
|
||||
// number of clients connected
|
||||
size_t count() const;
|
||||
size_t count() const; //number clinets connected
|
||||
size_t avgPacketsWaiting() const;
|
||||
|
||||
//system callbacks (do not call)
|
||||
|
||||
Reference in New Issue
Block a user