/* #ifndef WebSocketTxRx_h #define WebSocketTxRx_h #include #include #include #define WEB_SOCKET_CLIENT_ID_MSG_SIZE 128 #define WEB_SOCKET_ORIGIN "websocket" #define WEB_SOCKET_ORIGIN_CLIENT_ID_PREFIX "websocket:" using namespace std::placeholders; // for `_1` etc template class WebSocketConnector { protected: StatefulService * _statefulService; PsychicHttpServer * _server; AsyncWebSocket _webSocket; size_t _bufferSize; WebSocketConnector(StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, SecurityManager * securityManager, AuthenticationPredicate authenticationPredicate, size_t bufferSize) : _statefulService(statefulService) , _server(server) , _webSocket(webSocketPath) , _bufferSize(bufferSize) { _webSocket.setFilter(securityManager->filterRequest(authenticationPredicate)); _webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent, this, _1, _2, _3, _4, _5, _6)); _server->addHandler(&_webSocket); _server->on(webSocketPath, HTTP_GET, std::bind(&WebSocketConnector::forbidden, this, _1)); } WebSocketConnector(StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, size_t bufferSize) : _statefulService(statefulService) , _server(server) , _webSocket(webSocketPath) , _bufferSize(bufferSize) { _webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent, this, _1, _2, _3, _4, _5, _6)); _server->addHandler(&_webSocket); } virtual void onWSEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventType type, void * arg, uint8_t * data, size_t len) = 0; String clientId(AsyncWebSocketClient * client) { return WEB_SOCKET_ORIGIN_CLIENT_ID_PREFIX + String(client->id()); } private: void forbidden(PsychicRequest * request) { request->send(403); } }; template class WebSocketTx : virtual public WebSocketConnector { public: WebSocketTx(JsonStateReader stateReader, StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, SecurityManager * securityManager, AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, size_t bufferSize = DEFAULT_BUFFER_SIZE) : WebSocketConnector(statefulService, server, webSocketPath, securityManager, authenticationPredicate, bufferSize) , _stateReader(stateReader) { WebSocketConnector::_statefulService->addUpdateHandler([&](const String & originId) { transmitData(nullptr, originId); }, false); } WebSocketTx(JsonStateReader stateReader, StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, size_t bufferSize = DEFAULT_BUFFER_SIZE) : WebSocketConnector(statefulService, server, webSocketPath, bufferSize) , _stateReader(stateReader) { WebSocketConnector::_statefulService->addUpdateHandler([&](const String & originId) { transmitData(nullptr, originId); }, false); } protected: virtual void onWSEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventType type, void * arg, uint8_t * data, size_t len) { if (type == WS_EVT_CONNECT) { // when a client connects, we transmit it's id and the current payload transmitId(client); transmitData(client, WEB_SOCKET_ORIGIN); } } private: JsonStateReader _stateReader; void transmitId(AsyncWebSocketClient * client) { DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_CLIENT_ID_MSG_SIZE); JsonObject root = jsonDocument.to(); root["type"] = "id"; root["id"] = WebSocketConnector::clientId(client); size_t len = measureJson(jsonDocument); AsyncWebSocketMessageBuffer * buffer = WebSocketConnector::_webSocket.makeBuffer(len); if (buffer) { serializeJson(jsonDocument, (char *)buffer->get(), len + 1); client->text(buffer); } } // Broadcasts the payload to the destination, if provided. Otherwise broadcasts to all clients except the origin, if // specified. // // Original implementation sent clients their own IDs so they could ignore updates they initiated. This approach // simplifies the client and the server implementation but may not be sufficent for all use-cases. // void transmitData(AsyncWebSocketClient * client, const String & originId) { DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector::_bufferSize); JsonObject root = jsonDocument.to(); root["type"] = "payload"; root["origin_id"] = originId; JsonObject payload = root.createNestedObject("payload"); WebSocketConnector::_statefulService->read(payload, _stateReader); size_t len = measureJson(jsonDocument); AsyncWebSocketMessageBuffer * buffer = WebSocketConnector::_webSocket.makeBuffer(len); if (buffer) { serializeJson(jsonDocument, (char *)buffer->get(), len + 1); if (client) { client->text(buffer); } else { WebSocketConnector::_webSocket.textAll(buffer); } } } }; template class WebSocketRx : virtual public WebSocketConnector { public: WebSocketRx(JsonStateUpdater stateUpdater, StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, SecurityManager * securityManager, AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, size_t bufferSize = DEFAULT_BUFFER_SIZE) : WebSocketConnector(statefulService, server, webSocketPath, securityManager, authenticationPredicate, bufferSize) , _stateUpdater(stateUpdater) { } WebSocketRx(JsonStateUpdater stateUpdater, StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, size_t bufferSize = DEFAULT_BUFFER_SIZE) : WebSocketConnector(statefulService, server, webSocketPath, bufferSize) , _stateUpdater(stateUpdater) { } protected: virtual void onWSEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventType type, void * arg, uint8_t * data, size_t len) { if (type == WS_EVT_DATA) { AwsFrameInfo * info = (AwsFrameInfo *)arg; if (info->final && info->index == 0 && info->len == len) { if (info->opcode == WS_TEXT) { DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector::_bufferSize); DeserializationError error = deserializeJson(jsonDocument, (char *)data); if (!error && jsonDocument.is()) { JsonObject jsonObject = jsonDocument.as(); WebSocketConnector::_statefulService->update(jsonObject, _stateUpdater, WebSocketConnector::clientId(client)); } } } } } private: JsonStateUpdater _stateUpdater; }; template class WebSocketTxRx : public WebSocketTx, public WebSocketRx { public: WebSocketTxRx(JsonStateReader stateReader, JsonStateUpdater stateUpdater, StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, SecurityManager * securityManager, AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, size_t bufferSize = DEFAULT_BUFFER_SIZE) : WebSocketConnector(statefulService, server, webSocketPath, securityManager, authenticationPredicate, bufferSize) , WebSocketTx(stateReader, statefulService, server, webSocketPath, securityManager, authenticationPredicate, bufferSize) , WebSocketRx(stateUpdater, statefulService, server, webSocketPath, securityManager, authenticationPredicate, bufferSize) { } WebSocketTxRx(JsonStateReader stateReader, JsonStateUpdater stateUpdater, StatefulService * statefulService, PsychicHttpServer * server, const char * webSocketPath, size_t bufferSize = DEFAULT_BUFFER_SIZE) : WebSocketConnector(statefulService, server, webSocketPath, bufferSize) , WebSocketTx(stateReader, statefulService, server, webSocketPath, bufferSize) , WebSocketRx(stateUpdater, statefulService, server, webSocketPath, bufferSize) { } protected: void onWSEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventType type, void * arg, uint8_t * data, size_t len) { WebSocketRx::onWSEvent(server, client, type, arg, data, len); WebSocketTx::onWSEvent(server, client, type, arg, data, len); } }; #endif */