/*
  Asynchronous WebServer library for Espressif MCUs

  Copyright (c) 2016 Hristo Gochkov. All rights reserved.

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/
#include "Arduino.h"
#include "AsyncEventSource.h"

/**
 * Append one "data: <line><terminator>" field to an event string.
 *
 * The bytes [start, start + len) are copied into a temporary NUL-terminated
 * buffer before being appended. If that buffer cannot be allocated, nothing
 * is appended at all (neither the field name nor the terminator).
 */
static void appendDataLine(String &ev, const char *start, size_t len, const char *terminator){
  char *line = (char *)malloc(len + 1);
  if(line == nullptr)
    return;
  memcpy(line, start, len);
  line[len] = 0;
  ev += "data: ";
  ev += line;
  ev += terminator;
  free(line);
}

/**
 * Build the text of one server-sent event.
 *
 * @param message   Payload; may be NULL. It is split on "\n", "\r", "\r\n"
 *                  and "\n\r", and each piece becomes its own "data:" field.
 * @param event     Event name; the "event:" field is omitted when NULL.
 * @param id        Event id; the "id:" field is omitted when 0.
 * @param reconnect Retry value; the "retry:" field is omitted when 0.
 * @return The assembled fields, each terminated by "\r\n". When a message is
 *         given, an additional "\r\n" (blank line) follows the last data field.
 */
static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  String ev = "";

  if(reconnect){
    ev += "retry: ";
    ev += String(reconnect);
    ev += "\r\n";
  }

  if(id){
    ev += "id: ";
    ev += String(id);
    ev += "\r\n";
  }

  if(event != nullptr){
    ev += "event: ";
    ev += String(event);
    ev += "\r\n";
  }

  if(message != nullptr){
    const char *const messageEnd = message + strlen(message);
    const char *lineStart = message;
    do {
      const char *nextN = strchr(lineStart, '\n');
      const char *nextR = strchr(lineStart, '\r');
      if(nextN == nullptr && nextR == nullptr){
        // No more line breaks: the remainder is the final data line, and the
        // blank line closing the event is emitted together with it.
        appendDataLine(ev, lineStart, messageEnd - lineStart, "\r\n\r\n");
        lineStart = messageEnd;
      } else {
        const char *lineEnd;
        const char *nextLine;
        if(nextN != nullptr && nextR != nullptr){
          // Both break characters occur ahead: cut at whichever comes first,
          // and swallow the other one too if it follows immediately.
          if(nextR < nextN){
            lineEnd = nextR;
            nextLine = (nextN == nextR + 1) ? nextN + 1 : nextR + 1;
          } else {
            lineEnd = nextN;
            nextLine = (nextR == nextN + 1) ? nextR + 1 : nextN + 1;
          }
        } else if(nextN != nullptr){
          lineEnd = nextN;
          nextLine = nextN + 1;
        } else {
          lineEnd = nextR;
          nextLine = nextR + 1;
        }

        appendDataLine(ev, lineStart, lineEnd - lineStart, "\r\n");
        lineStart = nextLine;

        // The message ended with a line break: close the event here, because
        // the loop will not run again to emit a final data line.
        if(lineStart == messageEnd)
          ev += "\r\n";
      }
    } while(lineStart < messageEnd);
  }

  return ev;
}

// Message

/**
 * Copy len bytes of data into a freshly allocated, NUL-terminated buffer.
 * If the allocation fails the message is left empty (_len == 0).
 */
AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len)
: _data(nullptr), _len(len), _sent(0), _acked(0)
{
  _data = (uint8_t*)malloc(_len+1);
  if(_data == nullptr){
    _len = 0;
  } else {
    memcpy(_data, data, len);
    _data[_len] = 0;
  }
}

/** Release the message buffer, if one was allocated. */
AsyncEventSourceMessage::~AsyncEventSourceMessage() {
  if(_data != nullptr)
    free(_data);
}

/**
 * Account for len acknowledged bytes.
 *
 * @return The number of bytes beyond the end of this message (to be carried
 *         on to the next message), or 0 when len fits within this message.
 */
size_t AsyncEventSourceMessage::ack(size_t len) {
  // If the ack covers more than what is left of this message...
  if(_acked + len > _len){
    // ...return the number of extra bytes acked (they will be carried on to the next message)
    const size_t extra = _acked + len - _len;
    _acked = _len;
    return extra;
  }
  // No extra bytes left.
  _acked += len;
  return 0;
}

/**
 * Queue the unsent remainder of this message on the client without flushing.
 *
 * @return The byte count reported by client->add(), or 0 when the client
 *         cannot send or has less free space than the whole remainder needs.
 */
size_t AsyncEventSourceMessage::write_buffer(AsyncClient *client) {
  if(!client->canSend())
    return 0;

  const size_t len = _len - _sent;
  if(client->space() < len){
    return 0;
  }

  const size_t sent = client->add((const char *)_data, len);
  _sent += sent;
  return sent;
}

/**
 * Queue the unsent remainder via write_buffer() and then call client->send().
 *
 * @return The byte count returned by write_buffer().
 */
size_t AsyncEventSourceMessage::send(AsyncClient *client) {
  const size_t sent = write_buffer(client);
  client->send();
  return sent;
}

// Client

/**
 * Take over the TCP client of an accepted request and register with server.
 *
 * Reads the "Last-Event-ID" request header (if present), installs this
 * object's handlers on the client, adds itself to the server and deletes
 * the request object.
 */
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
: _messageQueue([](AsyncEventSourceMessage *m){ delete m; })
{
  _client = request->client();
  _server = server;
  _lastId = 0;
  if(request->hasHeader("Last-Event-ID"))
    _lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str());

  _client->setRxTimeout(0);
  _client->onError(nullptr, nullptr);
  // Each callback receives this object through its void* argument, so the
  // lambdas need no captures.
  _client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){
    (void)c;
    ((AsyncEventSourceClient*)(r))->_onAck(len, time);
  }, this);
  _client->onPoll([](void *r, AsyncClient* c){
    (void)c;
    ((AsyncEventSourceClient*)(r))->_onPoll();
  }, this);
  _client->onData(nullptr, nullptr);
  _client->onTimeout([](void *r, AsyncClient* c, uint32_t time){
    (void)c;
    ((AsyncEventSourceClient*)(r))->_onTimeout(time);
  }, this);
  _client->onDisconnect([](void *r, AsyncClient* c){
    ((AsyncEventSourceClient*)(r))->_onDisconnect();
    delete c;
  }, this);

  _server->_addClient(this);
  delete request;

  _client->setNoDelay(true);
}

/** Drop all queued messages and close the connection if still open. */
AsyncEventSourceClient::~AsyncEventSourceClient(){
  _messageQueue.free();
  close();
}

/**
 * Take ownership of dataMessage and enqueue it for sending.
 *
 * The message is deleted instead of queued when the client is not connected
 * or when the queue already holds SSE_MAX_QUEUED_MESSAGES entries.
 */
void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
  if(dataMessage == nullptr)
    return;

  if(!connected()){
    delete dataMessage;
    return;
  }

  if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
    ets_printf("AsyncEventSourceClient: ERROR: Queue is full, communications too slow, dropping event");
    delete dataMessage;
  } else {
    _messageQueue.add(dataMessage);
  }

  if(_client->canSend())
    _runQueue();
}

/** Ack callback: the arguments are not used; the queue is simply run again. */
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
  (void)len;
  (void)time;
  _runQueue();
}

/** Poll callback: run the queue if anything is pending. */
void AsyncEventSourceClient::_onPoll(){
  if(!_messageQueue.isEmpty()){
    _runQueue();
  }
}

/** Timeout callback: close the client with close(true). */
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
  _client->close(true);
}

/** Disconnect callback: forget the client pointer and notify the server. */
void AsyncEventSourceClient::_onDisconnect(){
  _client = nullptr;
  _server->_handleDisconnect(this);
}

/** Close the underlying client, if there still is one. */
void AsyncEventSourceClient::close(){
  if(_client != nullptr)
    _client->close();
}

/** Queue len raw bytes of message for sending, unmodified. */
void AsyncEventSourceClient::write(const char * message, size_t len){
  _queueMessage(new AsyncEventSourceMessage(message, len));
}

/** Format an event from the given fields and queue it for this client. */
void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  String ev = generateEventMessage(message, event, id, reconnect);
  _queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
}

/**
 * Push as many queued messages as fit into the client's send buffer.
 *
 * Re-entry is refused: on ESP32 via try_lock() on _messageQueue_mutex,
 * elsewhere via the _messageQueue_processing flag. After flushing, the bytes
 * written in this pass are fed to ack() on the front messages, and every
 * message that then reports finished() is removed from the queue.
 */
void AsyncEventSourceClient::_runQueue(){
#if defined(ESP32)
  if(!this->_messageQueue_mutex.try_lock()) {
    return;
  }
#else
  if(this->_messageQueue_processing){
    return;
  }
  this->_messageQueue_processing = true;
#endif // ESP32

  size_t total_bytes_written = 0;
  for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
    if(!(*i)->sent()) {
      const size_t bytes_written = (*i)->write_buffer(_client);
      total_bytes_written += bytes_written;
      if(bytes_written == 0)
        break;
      // todo: there is a further optimization to write a partial event to squeeze the last few bytes into the outgoing tcp send buffer, in
      //       fact all of this code is already set up to do so, it's only write_buffer that needs to be updated to allow it instead of
      //       returning zero when the full event won't fit into what's left of the buffer
      // todo: windows is taking 40-50ms to send an ack back while it waits for more data which won't come since this code must wait for ack first
      //       due to system resource limitations - if the dashboard javascript just sends a single byte back per event received (which this
      //       code would of course throw away as meaningless) then windows (or whatever other host runs the web browser) will piggyback an ack
      //       onto that outgoing packet for us, reducing roundtrip ack latency and potentially as much as tripling throughput again
      //       (measured: ESP-01: 20ms to send another packet after ack received, windows: 40-50ms to ack after receiving a packet)
    }
  }
  if(total_bytes_written > 0)
    _client->send();

  size_t len = total_bytes_written;
  while(len && !_messageQueue.isEmpty()){
    len = _messageQueue.front()->ack(len);
    if(_messageQueue.front()->finished()){
      _messageQueue.remove(_messageQueue.front());
    }
  }

#if defined(ESP32)
  this->_messageQueue_mutex.unlock();
#else
  this->_messageQueue_processing = false;
#endif // ESP32
}

// Handler

/** Create an event source for url; the client list deletes entries on removal. */
AsyncEventSource::AsyncEventSource(const String& url)
  : _url(url)
  , _clients([](AsyncEventSourceClient *c){ delete c; })
  , _connectcb(nullptr)
{}

/** Close every connected client. */
AsyncEventSource::~AsyncEventSource(){
  close();
}

/** Set the callback invoked from _addClient() for each new client. */
void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
  _connectcb = cb;
}

/** Add client to the client list and invoke the connect callback, if set. */
void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
  /*char * temp = (char *)malloc(2054);
  if(temp != NULL){
    memset(temp+1,' ',2048);
    temp[0] = ':';
    temp[2049] = '\r';
    temp[2050] = '\n';
    temp[2051] = '\r';
    temp[2052] = '\n';
    temp[2053] = 0;
    client->write((const char *)temp, 2053);
    free(temp);
  }*/

  _clients.add(client);
  if(_connectcb)
    _connectcb(client);
}

/** Remove client from the client list. */
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
  _clients.remove(client);
}

/** Call close() on every client that reports connected(). */
void AsyncEventSource::close(){
  for(const auto &c: _clients){
    if(c->connected())
      c->close();
  }
}

// pmb fix
/**
 * Average of packetsWaiting() over the connected clients, rounded to the
 * nearest integer.
 *
 * @return 0 when there are no clients or none of them is connected.
 */
size_t AsyncEventSource::avgPacketsWaiting() const {
  if(_clients.isEmpty())
    return 0;

  size_t aql = 0;
  uint32_t nConnectedClients = 0;

  for(const auto &c: _clients){
    if(c->connected()) {
      aql += c->packetsWaiting();
      ++nConnectedClients;
    }
  }

  // The list may hold only clients that are not connected; without this guard
  // the division below would be a division by zero.
  if(nConnectedClients == 0)
    return 0;

  // Adding half the divisor before dividing rounds to the nearest integer.
  return (aql + (nConnectedClients / 2)) / nConnectedClients;
}

/** Format one event and write it to every connected client. */
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
  String ev = generateEventMessage(message, event, id, reconnect);
  for(const auto &c: _clients){
    if(c->connected()) {
      c->write(ev.c_str(), ev.length());
    }
  }
}

/** Number of clients that report connected(). */
size_t AsyncEventSource::count() const {
  return _clients.count_if([](AsyncEventSourceClient *c){
    return c->connected();
  });
}

/**
 * Accept only GET requests whose URL equals this source's URL; for those,
 * register interest in the "Last-Event-ID" header.
 */
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){
  if(request->method() != HTTP_GET || !request->url().equals(_url)) {
    return false;
  }
  request->addInterestingHeader("Last-Event-ID");
  return true;
}

/**
 * Answer the request with an event-stream response. When both a username and
 * a password are set and the request fails authenticate(), authentication is
 * requested instead.
 */
void AsyncEventSource::handleRequest(AsyncWebServerRequest *request){
  if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str())){
    request->requestAuthentication();
    return;
  }
  request->send(new AsyncEventSourceResponse(this));
}

// Response

/** Prepare a 200 "text/event-stream" response with no content length. */
AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server){
  _server = server;
  _code = 200;
  _contentType = "text/event-stream";
  _sendContentLength = false;
  addHeader("Cache-Control", "no-cache");
  addHeader("Connection","keep-alive");
}

/** Write the assembled response head to the client and wait for its ack. */
void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request){
  String out = _assembleHead(request->version());
  request->client()->write(out.c_str(), _headLength);
  _state = RESPONSE_WAIT_ACK;
}

/**
 * On a non-empty ack, create the AsyncEventSourceClient for this request;
 * its constructor registers it with the server and deletes the request.
 *
 * @return Always 0.
 */
size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))){
  if(len){
    new AsyncEventSourceClient(request, _server);
  }
  return 0;
}