diff --git a/lib/async-mqtt-client/README.md b/lib/async-mqtt-client/README.md deleted file mode 100644 index af66beea6..000000000 --- a/lib/async-mqtt-client/README.md +++ /dev/null @@ -1,18 +0,0 @@ -Async MQTT client for ESP8266 and ESP32 -============================= - -[![Build Status](https://img.shields.io/travis/marvinroger/async-mqtt-client/master.svg?style=flat-square)](https://travis-ci.org/marvinroger/async-mqtt-client) - -An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client implementation, built on [me-no-dev/ESPAsyncTCP (ESP8266)](https://github.com/me-no-dev/ESPAsyncTCP) | [me-no-dev/AsyncTCP (ESP32)](https://github.com/me-no-dev/AsyncTCP) . -## Features - -* Compliant with the 3.1.1 version of the protocol -* Fully asynchronous -* Subscribe at QoS 0, 1 and 2 -* Publish at QoS 0, 1 and 2 -* SSL/TLS support -* Available in the [PlatformIO registry](http://platformio.org/lib/show/346/AsyncMqttClient) - -## Requirements, installation and usage - -The project is documented in the [/docs folder](docs). diff --git a/lib/async-mqtt-client/src/AsyncMqttClient.cpp b/lib/async-mqtt-client/src/AsyncMqttClient.cpp index bec082cb5..2b57425b4 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient.cpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient.cpp @@ -1,1017 +1,749 @@ #include "AsyncMqttClient.hpp" AsyncMqttClient::AsyncMqttClient() - : _connected(false) - , _lockMutiConnections(false) - , _connectPacketNotEnoughSpace(false) - , _disconnectFlagged(false) - , _tlsBadFingerprint(false) - , _lastClientActivity(0) - , _lastServerActivity(0) - , _lastPingRequestTime(0) - , _host(nullptr) - , _useIp(false) +: _client() +, _head(nullptr) +, _tail(nullptr) +, _sent(0) +, _state(DISCONNECTED) +, _tlsBadFingerprint(false) +, _lastClientActivity(0) +, _lastServerActivity(0) +, _lastPingRequestTime(0) +, _generatedClientId{0} +, _ip() +, _host(nullptr) +, _useIp(false) #if ASYNC_TCP_SSL_ENABLED - , _secure(false) +, _secure(false) #endif - , _port(0) - , _keepAlive(15) - , _cleanSession(true) - , _clientId(nullptr) - , _username(nullptr) - , _password(nullptr) - , _willTopic(nullptr) - , _willPayload(nullptr) - , _willPayloadLength(0) - , _willQos(0) - , _willRetain(false) - , _parsingInformation{.bufferState = AsyncMqttClientInternals::BufferState::NONE} - , _currentParsedPacket(nullptr) - , _remainingLengthBufferPosition(0) - , _nextPacketId(1) { - _client.onConnect([](void * obj, AsyncClient * c) { (static_cast(obj))->_onConnect(c); }, this); - _client.onDisconnect([](void * obj, AsyncClient * c) { (static_cast(obj))->_onDisconnect(c); }, this); - _client.onError([](void * obj, AsyncClient * c, int8_t error) { (static_cast(obj))->_onError(c, error); }, this); - _client.onTimeout([](void * obj, AsyncClient * c, uint32_t time) { (static_cast(obj))->_onTimeout(c, time); }, this); - _client.onAck([](void * obj, AsyncClient * c, size_t len, uint32_t time) { (static_cast(obj))->_onAck(c, len, time); }, this); - _client - .onData([](void * obj, AsyncClient * c, void * data, size_t len) { (static_cast(obj))->_onData(c, static_cast(data), len); }, - this); - _client.onPoll([](void * obj, AsyncClient * c) { (static_cast(obj))->_onPoll(c); }, this); - +, _port(0) +, _keepAlive(15) +, _cleanSession(true) +, _clientId(nullptr) +, _username(nullptr) +, _password(nullptr) +, _willTopic(nullptr) +, _willPayload(nullptr) +, _willPayloadLength(0) +, _willQos(0) +, _willRetain(false) +#if ASYNC_TCP_SSL_ENABLED +, _secureServerFingerprints() +#endif +, _onConnectUserCallbacks() +, _onDisconnectUserCallbacks() +, _onSubscribeUserCallbacks() +, _onUnsubscribeUserCallbacks() +, _onMessageUserCallbacks() +, _onPublishUserCallbacks() +, _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE } +, _currentParsedPacket(nullptr) +, _remainingLengthBufferPosition(0) +, _remainingLengthBuffer{0} +, _pendingPubRels() { + _client.onConnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onConnect(); }, this); + _client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onDisconnect(); }, this); + // _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast(obj))->_onError(error); }, this); + // _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast(obj))->_onTimeout(); }, this); + _client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast(obj))->_onAck(len); }, this); + _client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast(obj))->_onData(static_cast(data), len); }, this); + _client.onPoll([](void* obj, AsyncClient* c) { (static_cast(obj))->_onPoll(); }, this); + _client.setNoDelay(true); // send small packets immediately (PINGREQ/DISCONN are only 2 bytes) #ifdef ESP32 - sprintf(_generatedClientId, "esp32%06x", (int)ESP.getEfuseMac()); - _xSemaphore = xSemaphoreCreateMutex(); + sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac()); + _xSemaphore = xSemaphoreCreateMutex(); #elif defined(ESP8266) - sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId()); + sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId()); #endif - _clientId = _generatedClientId; + _clientId = _generatedClientId; - setMaxTopicLength(128); + setMaxTopicLength(128); } AsyncMqttClient::~AsyncMqttClient() { - delete _currentParsedPacket; - delete[] _parsingInformation.topicBuffer; - for (auto callback : _onMessageUserCallbacks) { - delete callback.first; - } + delete _currentParsedPacket; + delete[] _parsingInformation.topicBuffer; + _clear(); + _pendingPubRels.clear(); + _pendingPubRels.shrink_to_fit(); + _clearQueue(false); // _clear() doesn't clear session data #ifdef ESP32 - vSemaphoreDelete(_xSemaphore); + vSemaphoreDelete(_xSemaphore); #endif } -AsyncMqttClient & AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { - _keepAlive = keepAlive; - return *this; +AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { + _keepAlive = keepAlive; + return *this; } -AsyncMqttClient & AsyncMqttClient::setClientId(const char * clientId) { - _clientId = clientId; - return *this; +AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) { + _clientId = clientId; + return *this; } -AsyncMqttClient & AsyncMqttClient::setCleanSession(bool cleanSession) { - _cleanSession = cleanSession; - return *this; +AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) { + _cleanSession = cleanSession; + return *this; } -AsyncMqttClient & AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) { - _parsingInformation.maxTopicLength = maxTopicLength; - delete[] _parsingInformation.topicBuffer; - _parsingInformation.topicBuffer = new char[maxTopicLength + 1]; - return *this; +AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) { + _parsingInformation.maxTopicLength = maxTopicLength; + delete[] _parsingInformation.topicBuffer; + _parsingInformation.topicBuffer = new char[maxTopicLength + 1]; + return *this; } -AsyncMqttClient & AsyncMqttClient::setCredentials(const char * username, const char * password) { - _username = username; - _password = password; - return *this; +AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) { + _username = username; + _password = password; + return *this; } -AsyncMqttClient & AsyncMqttClient::setWill(const char * topic, uint8_t qos, bool retain, const char * payload, size_t length) { - _willTopic = topic; - _willQos = qos; - _willRetain = retain; - _willPayload = payload; - _willPayloadLength = length; - return *this; +AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) { + _willTopic = topic; + _willQos = qos; + _willRetain = retain; + _willPayload = payload; + _willPayloadLength = length; + return *this; } -AsyncMqttClient & AsyncMqttClient::setServer(IPAddress ip, uint16_t port) { - _useIp = true; - _ip = ip; - _port = port; - return *this; +AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) { + _useIp = true; + _ip = ip; + _port = port; + return *this; } -AsyncMqttClient & AsyncMqttClient::setServer(const char * host, uint16_t port) { - _useIp = false; - _host = host; - _port = port; - return *this; +AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) { + _useIp = false; + _host = host; + _port = port; + return *this; } #if ASYNC_TCP_SSL_ENABLED -AsyncMqttClient & AsyncMqttClient::setSecure(bool secure) { - _secure = secure; - return *this; +AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) { + _secure = secure; + return *this; } -AsyncMqttClient & AsyncMqttClient::addServerFingerprint(const uint8_t * fingerprint) { - std::array newFingerprint; - memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE); - _secureServerFingerprints.push_back(newFingerprint); - return *this; +AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) { + std::array newFingerprint; + memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE); + _secureServerFingerprints.push_back(newFingerprint); + return *this; } #endif -AsyncMqttClient & AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) { - _onConnectUserCallbacks.push_back(callback); - return *this; +AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) { + _onConnectUserCallbacks.push_back(callback); + return *this; } -AsyncMqttClient & AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) { - _onDisconnectUserCallbacks.push_back(callback); - return *this; +AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) { + _onDisconnectUserCallbacks.push_back(callback); + return *this; } -AsyncMqttClient & AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) { - _onSubscribeUserCallbacks.push_back(callback); - return *this; +AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) { + _onSubscribeUserCallbacks.push_back(callback); + return *this; } -AsyncMqttClient & AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) { - _onUnsubscribeUserCallbacks.push_back(callback); - return *this; +AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) { + _onUnsubscribeUserCallbacks.push_back(callback); + return *this; } -AsyncMqttClient & AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char * _userTopic) { - onFilteredMessage(callback, _userTopic); - return *this; +AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) { + _onMessageUserCallbacks.push_back(callback); + return *this; } -AsyncMqttClient & AsyncMqttClient::onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char * _userTopic) { - // _onMessageUserCallbacks.push_back(AsyncMqttClientInternals::onFilteredMessageUserCallback(_userTopic, callback)); - _onMessageUserCallbacks.push_back(AsyncMqttClientInternals::onFilteredMessageUserCallback(strdup(_userTopic), callback)); // leakage issue - return *this; -} - -AsyncMqttClient & AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) { - _onPublishUserCallbacks.push_back(callback); - return *this; +AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) { + _onPublishUserCallbacks.push_back(callback); + return *this; } void AsyncMqttClient::_freeCurrentParsedPacket() { - delete _currentParsedPacket; - _currentParsedPacket = nullptr; + delete _currentParsedPacket; + _currentParsedPacket = nullptr; } void AsyncMqttClient::_clear() { - _lastPingRequestTime = 0; - _connected = false; - _disconnectFlagged = false; - _connectPacketNotEnoughSpace = false; - _tlsBadFingerprint = false; - _freeCurrentParsedPacket(); + _lastPingRequestTime = 0; + _tlsBadFingerprint = false; + _freeCurrentParsedPacket(); + _clearQueue(true); // keep session data for now - _pendingPubRels.clear(); - _pendingPubRels.shrink_to_fit(); - - _toSendAcks.clear(); - _toSendAcks.shrink_to_fit(); - - _nextPacketId = 1; - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; } /* TCP */ -void AsyncMqttClient::_onConnect(AsyncClient * client) { - (void)client; - _lockMutiConnections = true; - +void AsyncMqttClient::_onConnect() { + log_i("TCP conn, MQTT CONNECT"); #if ASYNC_TCP_SSL_ENABLED - if (_secure && _secureServerFingerprints.size() > 0) { - SSL * clientSsl = _client.getSSL(); + if (_secure && _secureServerFingerprints.size() > 0) { + SSL* clientSsl = _client.getSSL(); - bool sslFoundFingerprint = false; - for (std::array fingerprint : _secureServerFingerprints) { - if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) { - sslFoundFingerprint = true; - break; - } - } - - if (!sslFoundFingerprint) { - _tlsBadFingerprint = true; - _client.close(true); - return; - } + bool sslFoundFingerprint = false; + for (std::array fingerprint : _secureServerFingerprints) { + if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) { + sslFoundFingerprint = true; + break; + } } + + if (!sslFoundFingerprint) { + _tlsBadFingerprint = true; + _client.close(true); + return; + } + } #endif + AsyncMqttClientInternals::OutPacket* msg = + new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession, + _username, + _password, + _willTopic, + _willRetain, + _willQos, + _willPayload, + _willPayloadLength, + _keepAlive, + _clientId); + _addFront(msg); + _handleQueue(); +} - char fixedHeader[5]; - fixedHeader[0] = AsyncMqttClientInternals::PacketType.CONNECT; - fixedHeader[0] = fixedHeader[0] << 4; - fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.CONNECT_RESERVED; +void AsyncMqttClient::_onDisconnect() { + log_i("TCP disconn"); + _state = DISCONNECTED; + AsyncMqttClientDisconnectReason reason; - uint16_t protocolNameLength = 4; - char protocolNameLengthBytes[2]; - protocolNameLengthBytes[0] = protocolNameLength >> 8; - protocolNameLengthBytes[1] = protocolNameLength & 0xFF; + if (_tlsBadFingerprint) { + reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; + } else { + reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; + } - char protocolLevel[1]; - protocolLevel[0] = 0x04; + _clear(); - char connectFlags[1]; - connectFlags[0] = 0; - if (_cleanSession) - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.CLEAN_SESSION; - if (_username != nullptr) - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.USERNAME; - if (_password != nullptr) - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.PASSWORD; - if (_willTopic != nullptr) { - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL; - if (_willRetain) - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_RETAIN; - switch (_willQos) { - case 0: - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS0; + for (auto callback : _onDisconnectUserCallbacks) callback(reason); +} + +/* +void AsyncMqttClient::_onError(int8_t error) { + (void)error; + // _onDisconnect called anyway +} + +void AsyncMqttClient::_onTimeout() { + // disconnection will be handled by ping/pong management +} +*/ + +void AsyncMqttClient::_onAck(size_t len) { + log_i("ack %u", len); + _handleQueue(); +} + +void AsyncMqttClient::_onData(char* data, size_t len) { + log_i("data rcv (%u)", len); + size_t currentBytePosition = 0; + char currentByte; + _lastServerActivity = millis(); + do { + switch (_parsingInformation.bufferState) { + case AsyncMqttClientInternals::BufferState::NONE: + currentByte = data[currentBytePosition++]; + _parsingInformation.packetType = currentByte >> 4; + _parsingInformation.packetFlags = (currentByte << 4) >> 4; + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH; + switch (_parsingInformation.packetType) { + case AsyncMqttClientInternals::PacketType.CONNACK: + log_i("rcv CONNACK"); + _currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2)); break; - case 1: - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS1; + case AsyncMqttClientInternals::PacketType.PINGRESP: + log_i("rcv PINGRESP"); + _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this)); break; - case 2: - connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS2; + case AsyncMqttClientInternals::PacketType.SUBACK: + log_i("rcv SUBACK"); + _currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.UNSUBACK: + log_i("rcv UNSUBACK"); + _currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBLISH: + log_i("rcv PUBLISH"); + _currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2)); + break; + case AsyncMqttClientInternals::PacketType.PUBREL: + log_i("rcv PUBREL"); + _currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBACK: + log_i("rcv PUBACK"); + _currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBREC: + log_i("rcv PUBREC"); + _currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1)); + break; + case AsyncMqttClientInternals::PacketType.PUBCOMP: + log_i("rcv PUBCOMP"); + _currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1)); + break; + default: + log_i("rcv PROTOCOL VIOLATION"); + disconnect(true); break; } + break; + case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH: + currentByte = data[currentBytePosition++]; + _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte; + if (currentByte >> 7 == 0) { + _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer); + _remainingLengthBufferPosition = 0; + if (_parsingInformation.remainingLength > 0) { + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER; + } else { + // PINGRESP is a special case where it has no variable header, so the packet ends right here + _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; + _onPingResp(); + } + } + break; + case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER: + _currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition); + break; + case AsyncMqttClientInternals::BufferState::PAYLOAD: + _currentParsedPacket->parsePayload(data, len, ¤tBytePosition); + break; + default: + currentBytePosition = len; + } + } while (currentBytePosition != len); +} + +void AsyncMqttClient::_onPoll() { + // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections + if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) { + log_w("PING t/o, disconnecting"); + disconnect(true); + return; + } + // send ping to ensure the server will receive at least one message inside keepalive window + if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) { + _sendPing(); + // send ping to verify if the server is still there (ensure this is not a half connection) + } else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) { + _sendPing(); + } + _handleQueue(); +} + +/* QUEUE */ + +void AsyncMqttClient::_insert(AsyncMqttClientInternals::OutPacket* packet) { + // We only use this for QoS2 PUBREL so there must be a PUBLISH packet present. + // The queue therefore cannot be empty and _head points to this PUBLISH packet. + SEMAPHORE_TAKE(); + log_i("new insert #%u", packet->packetType()); + packet->next = _head->next; + _head->next = packet; + if (_head == _tail) { // PUB packet is the only one in the queue + _tail = packet; + } + SEMAPHORE_GIVE(); + _handleQueue(); +} + +void AsyncMqttClient::_addFront(AsyncMqttClientInternals::OutPacket* packet) { + // This is only used for the CONNECT packet, to be able to establish a connection + // before anything else. The queue can be empty or has packets from the continued session. + // In both cases, _head should always point to the CONNECT packet afterwards. + SEMAPHORE_TAKE(); + log_i("new front #%u", packet->packetType()); + if (_head == nullptr) { + _tail = packet; + } else { + packet->next = _head; + } + _head = packet; + SEMAPHORE_GIVE(); + _handleQueue(); +} + +void AsyncMqttClient::_addBack(AsyncMqttClientInternals::OutPacket* packet) { + SEMAPHORE_TAKE(); + log_i("new back #%u", packet->packetType()); + if (!_tail) { + _head = packet; + } else { + _tail->next = packet; + } + _tail = packet; + _tail->next = nullptr; + SEMAPHORE_GIVE(); + _handleQueue(); +} + +void AsyncMqttClient::_handleQueue() { + SEMAPHORE_TAKE(); + // On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock + bool disconnect = false; + + while (_head && _client.space() > 10) { // safe but arbitrary value, send at least 10 bytes + // 1. try to send + if (_head->size() > _sent) { + // On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length. + // So we calculate the amount to be written ourselves. + size_t willSend = std::min(_head->size() - _sent, _client.space()); + size_t realSent = _client.add(reinterpret_cast(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY); // flag is set by LWIP anyway, added for clarity + _sent += willSend; + (void)realSent; + _client.send(); + _lastClientActivity = millis(); + _lastPingRequestTime = 0; + #if ASYNC_TCP_SSL_ENABLED + log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size()); + #else + log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size()); + #endif + if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) { + disconnect = true; + } } - char keepAliveBytes[2]; - keepAliveBytes[0] = _keepAlive >> 8; - keepAliveBytes[1] = _keepAlive & 0xFF; - - uint16_t clientIdLength = strlen(_clientId); - char clientIdLengthBytes[2]; - clientIdLengthBytes[0] = clientIdLength >> 8; - clientIdLengthBytes[1] = clientIdLength & 0xFF; - - // Optional fields - uint16_t willTopicLength = 0; - char willTopicLengthBytes[2]; - uint16_t willPayloadLength = _willPayloadLength; - char willPayloadLengthBytes[2]; - if (_willTopic != nullptr) { - willTopicLength = strlen(_willTopic); - willTopicLengthBytes[0] = willTopicLength >> 8; - willTopicLengthBytes[1] = willTopicLength & 0xFF; - - if (_willPayload != nullptr && willPayloadLength == 0) - willPayloadLength = strlen(_willPayload); - - willPayloadLengthBytes[0] = willPayloadLength >> 8; - willPayloadLengthBytes[1] = willPayloadLength & 0xFF; + // 2. stop processing when we have to wait for an MQTT acknowledgment + if (_head->size() == _sent) { + if (_head->released()) { + log_i("p #%d rel", _head->packetType()); + AsyncMqttClientInternals::OutPacket* tmp = _head; + _head = _head->next; + if (!_head) _tail = nullptr; + delete tmp; + _sent = 0; + } else { + break; // sending is complete however send next only after mqtt confirmation + } } + } - uint16_t usernameLength = 0; - char usernameLengthBytes[2]; - if (_username != nullptr) { - usernameLength = strlen(_username); - usernameLengthBytes[0] = usernameLength >> 8; - usernameLengthBytes[1] = usernameLength & 0xFF; - } + SEMAPHORE_GIVE(); + if (disconnect) { + log_i("snd DISCONN, disconnecting"); + _client.close(); + } +} - uint16_t passwordLength = 0; - char passwordLengthBytes[2]; - if (_password != nullptr) { - passwordLength = strlen(_password); - passwordLengthBytes[0] = passwordLength >> 8; - passwordLengthBytes[1] = passwordLength & 0xFF; - } +void AsyncMqttClient::_clearQueue(bool keepSessionData) { + SEMAPHORE_TAKE(); + AsyncMqttClientInternals::OutPacket* packet = _head; + _head = nullptr; + _tail = nullptr; - uint32_t remainingLength = 2 + protocolNameLength + 1 + 1 + 2 + 2 + clientIdLength; // always present - if (_willTopic != nullptr) - remainingLength += 2 + willTopicLength + 2 + willPayloadLength; - if (_username != nullptr) - remainingLength += 2 + usernameLength; - if (_password != nullptr) - remainingLength += 2 + passwordLength; - uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1); - - uint32_t neededSpace = 1 + remainingLengthLength; - neededSpace += 2; - neededSpace += protocolNameLength; - neededSpace += 1; - neededSpace += 1; - neededSpace += 2; - neededSpace += 2; - neededSpace += clientIdLength; - if (_willTopic != nullptr) { - neededSpace += 2; - neededSpace += willTopicLength; - - neededSpace += 2; - if (_willPayload != nullptr) - neededSpace += willPayloadLength; - } - if (_username != nullptr) { - neededSpace += 2; - neededSpace += usernameLength; - } - if (_password != nullptr) { - neededSpace += 2; - neededSpace += passwordLength; - } - - SEMAPHORE_TAKE(); - if (_client.space() < neededSpace) { - _connectPacketNotEnoughSpace = true; - _client.close(true); + while (packet) { + /* MQTT spec 3.1.2.4 Clean Session: + * - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged. + * - QoS 2 messages which have been received from the Server, but have not been completely acknowledged. + * + (unsent PUB messages with QoS > 0) + * + * To be kept: + * - possibly first message (sent to server but not acked) + * - PUBREC messages (QoS 2 PUB received but not acked) + * - PUBCOMP messages (QoS 2 PUBREL received but not acked) + */ + if (keepSessionData) { + if (packet->qos() > 0 && packet->size() <= _sent) { // check for qos includes check for PUB-packet type + reinterpret_cast(packet)->setDup(); + AsyncMqttClientInternals::OutPacket* next = packet->next; + log_i("keep #%u", packet->packetType()); SEMAPHORE_GIVE(); - return; - } - - _client.add(fixedHeader, 1 + remainingLengthLength, ASYNC_WRITE_FLAG_COPY); - _client.add(protocolNameLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add("MQTT", protocolNameLength, ASYNC_WRITE_FLAG_COPY); - _client.add(protocolLevel, 1, ASYNC_WRITE_FLAG_COPY); - _client.add(connectFlags, 1, ASYNC_WRITE_FLAG_COPY); - _client.add(keepAliveBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(clientIdLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(_clientId, clientIdLength, ASYNC_WRITE_FLAG_COPY); - if (_willTopic != nullptr) { - _client.add(willTopicLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(_willTopic, willTopicLength, ASYNC_WRITE_FLAG_COPY); - - _client.add(willPayloadLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - if (_willPayload != nullptr) - _client.add(_willPayload, willPayloadLength, ASYNC_WRITE_FLAG_COPY); - } - if (_username != nullptr) { - _client.add(usernameLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(_username, usernameLength, ASYNC_WRITE_FLAG_COPY); - } - if (_password != nullptr) { - _client.add(passwordLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(_password, passwordLength, ASYNC_WRITE_FLAG_COPY); - } - _client.send(); - _lastClientActivity = millis(); - SEMAPHORE_GIVE(); -} - -void AsyncMqttClient::_onDisconnect(AsyncClient * client) { - (void)client; - _lockMutiConnections = false; - if (!_disconnectFlagged) { - AsyncMqttClientDisconnectReason reason; - - if (_connectPacketNotEnoughSpace) { - reason = AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE; - } else if (_tlsBadFingerprint) { - reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; - } else { - reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; - } - for (auto callback : _onDisconnectUserCallbacks) - callback(reason); - } - _clear(); -} - -void AsyncMqttClient::_onError(AsyncClient * client, int8_t error) { - (void)client; - (void)error; - // _onDisconnect called anyway -} - -void AsyncMqttClient::_onTimeout(AsyncClient * client, uint32_t time) { - (void)client; - (void)time; - // disconnection will be handled by ping/pong management -} - -void AsyncMqttClient::_onAck(AsyncClient * client, size_t len, uint32_t time) { - (void)client; - (void)len; - (void)time; -} - -void AsyncMqttClient::_onData(AsyncClient * client, char * data, size_t len) { - (void)client; - size_t currentBytePosition = 0; - char currentByte; - do { - switch (_parsingInformation.bufferState) { - case AsyncMqttClientInternals::BufferState::NONE: - currentByte = data[currentBytePosition++]; - _parsingInformation.packetType = currentByte >> 4; - _parsingInformation.packetFlags = (currentByte << 4) >> 4; - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH; - _lastServerActivity = millis(); - switch (_parsingInformation.packetType) { - case AsyncMqttClientInternals::PacketType.CONNACK: - _currentParsedPacket = - new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, - std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2)); - break; - case AsyncMqttClientInternals::PacketType.PINGRESP: - _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this)); - break; - case AsyncMqttClientInternals::PacketType.SUBACK: - _currentParsedPacket = - new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, - std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2)); - break; - case AsyncMqttClientInternals::PacketType.UNSUBACK: - _currentParsedPacket = - new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBLISH: - _currentParsedPacket = - new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, - std::bind(&AsyncMqttClient::_onMessage, - this, - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4, - std::placeholders::_5, - std::placeholders::_6, - std::placeholders::_7, - std::placeholders::_8, - std::placeholders::_9), - std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2)); - break; - case AsyncMqttClientInternals::PacketType.PUBREL: - _currentParsedPacket = - new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBACK: - _currentParsedPacket = - new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBREC: - _currentParsedPacket = - new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1)); - break; - case AsyncMqttClientInternals::PacketType.PUBCOMP: - _currentParsedPacket = - new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1)); - break; - default: - break; - } - break; - case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH: - currentByte = data[currentBytePosition++]; - _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte; - if (currentByte >> 7 == 0) { - _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer); - _remainingLengthBufferPosition = 0; - if (_parsingInformation.remainingLength > 0) { - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER; - } else { - // PINGRESP is a special case where it has no variable header, so the packet ends right here - _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; - _onPingResp(); - } - } - break; - case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER: - _currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition); - break; - case AsyncMqttClientInternals::BufferState::PAYLOAD: - _currentParsedPacket->parsePayload(data, len, ¤tBytePosition); - break; - default: - currentBytePosition = len; - } - } while (currentBytePosition != len); -} - -void AsyncMqttClient::_onPoll(AsyncClient * client) { - if (!_connected) - return; - - // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections - if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) { - disconnect(true); - return; - } else if (millis() - _lastServerActivity >= (_keepAlive * 1000 * 2)) { - disconnect(true); - return; - // send ping to ensure the server will receive at least one message inside keepalive window - } else if (_lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * static_cast(1000 * 0.7))) { - _sendPing(); - - // send ping to verify if the server is still there (ensure this is not a half connection) - } else if (_connected && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * static_cast(1000 * 0.7))) { - _sendPing(); - } - - - // handle to send ack packets - - _sendAcks(); - - // handle disconnect - - if (_disconnectFlagged) { - _sendDisconnect(); + _addBack(packet); + SEMAPHORE_TAKE(); + packet = next; + } else if (packet->qos() > 0 || + packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC || + packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) { + AsyncMqttClientInternals::OutPacket* next = packet->next; + log_i("keep #%u", packet->packetType()); + SEMAPHORE_GIVE(); + _addBack(packet); + SEMAPHORE_TAKE(); + packet = next; + } else { + AsyncMqttClientInternals::OutPacket* next = packet->next; + delete packet; + packet = next; + } + /* Delete everything when not keeping session data + */ + } else { + AsyncMqttClientInternals::OutPacket* next = packet->next; + delete packet; + packet = next; } + } + _sent = 0; + SEMAPHORE_GIVE(); } /* MQTT */ void AsyncMqttClient::_onPingResp() { - _freeCurrentParsedPacket(); - _lastPingRequestTime = 0; + log_i("PINGRESP"); + _freeCurrentParsedPacket(); + _lastPingRequestTime = 0; } void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) { - (void)sessionPresent; - _freeCurrentParsedPacket(); + log_i("CONNACK"); + _freeCurrentParsedPacket(); - if (connectReturnCode == 0) { - _connected = true; - for (auto callback : _onConnectUserCallbacks) - callback(sessionPresent); - } else { - for (auto callback : _onDisconnectUserCallbacks) - callback(static_cast(connectReturnCode)); - _disconnectFlagged = true; - } + if (!sessionPresent) { + _pendingPubRels.clear(); + _pendingPubRels.shrink_to_fit(); + _clearQueue(false); // remove session data + } + + if (connectReturnCode == 0) { + _state = CONNECTED; + for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); + } else { + // Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib + } + _handleQueue(); // send any remaining data from continued session } void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) { - _freeCurrentParsedPacket(); + log_i("SUBACK"); + _freeCurrentParsedPacket(); + SEMAPHORE_TAKE(); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("SUB released"); + } + SEMAPHORE_GIVE(); - for (auto callback : _onSubscribeUserCallbacks) - callback(packetId, status); + for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status); + + _handleQueue(); // subscribe confirmed, ready to send next queued item } void AsyncMqttClient::_onUnsubAck(uint16_t packetId) { - _freeCurrentParsedPacket(); + log_i("UNSUBACK"); + _freeCurrentParsedPacket(); + SEMAPHORE_TAKE(); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("UNSUB released"); + } + SEMAPHORE_GIVE(); - for (auto callback : _onUnsubscribeUserCallbacks) - callback(packetId); + for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId); + + _handleQueue(); // unsubscribe confirmed, ready to send next queued item } -void AsyncMqttClient::_onMessage(char * topic, char * payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) { - bool notifyPublish = true; +void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) { + bool notifyPublish = true; - if (qos == 2) { - for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { - if (pendingPubRel.packetId == packetId) { - notifyPublish = false; - break; - } - } + if (qos == 2) { + for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { + if (pendingPubRel.packetId == packetId) { + notifyPublish = false; + break; + } } + } - if (notifyPublish) { - AsyncMqttClientMessageProperties properties; - properties.qos = qos; - properties.dup = dup; - properties.retain = retain; + if (notifyPublish) { + AsyncMqttClientMessageProperties properties; + properties.qos = qos; + properties.dup = dup; + properties.retain = retain; - for (auto callback : _onMessageUserCallbacks) { - bool mqttTopicMatch = false; - - if (strcmp(callback.first, "#") == 0 || strcmp(callback.first, topic) == 0) { - mqttTopicMatch = true; - } else { - char * messageTopic = strdup(topic); - char * userTopic = strdup(callback.first); - char * messageSubTopic = strtok_r(messageTopic, "/", &messageTopic); - char * userSubTopic = strtok_r(userTopic, "/", &userTopic); - - while (messageSubTopic != NULL || userSubTopic != NULL) { - if (messageSubTopic != NULL && userSubTopic == NULL) { - mqttTopicMatch = false; - break; - } else if (messageSubTopic == NULL && userSubTopic != NULL) { - mqttTopicMatch = false; - break; - } else if (mqttTopicMatch && strcmp(userSubTopic, "#") == 0) { - mqttTopicMatch = true; - break; - } else if (strcmp(messageSubTopic, userSubTopic) == 0 || strcmp(userSubTopic, "+") == 0) { - messageSubTopic = strtok_r(messageTopic, "/", &messageTopic); - userSubTopic = strtok_r(userTopic, "/", &userTopic); - mqttTopicMatch = true; - } else { - mqttTopicMatch = false; - break; - } - } - } - - if (mqttTopicMatch) { - callback.second(topic, payload, properties, len, index, total); - } - } - } + for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total); + } } void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) { - AsyncMqttClientInternals::PendingAck pendingAck; + AsyncMqttClientInternals::PendingAck pendingAck; - if (qos == 1) { - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED; - pendingAck.packetId = packetId; - _toSendAcks.push_back(pendingAck); - } else if (qos == 2) { - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED; - pendingAck.packetId = packetId; - _toSendAcks.push_back(pendingAck); + if (qos == 1) { + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED; + pendingAck.packetId = packetId; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + _addBack(msg); + } else if (qos == 2) { + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED; + pendingAck.packetId = packetId; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + _addBack(msg); - bool pubRelAwaiting = false; - for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { - if (pendingPubRel.packetId == packetId) { - pubRelAwaiting = true; - break; - } - } - - if (!pubRelAwaiting) { - AsyncMqttClientInternals::PendingPubRel pendingPubRel; - pendingPubRel.packetId = packetId; - _pendingPubRels.push_back(pendingPubRel); - } - - _sendAcks(); + bool pubRelAwaiting = false; + for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { + if (pendingPubRel.packetId == packetId) { + pubRelAwaiting = true; + break; + } } - _freeCurrentParsedPacket(); + if (!pubRelAwaiting) { + AsyncMqttClientInternals::PendingPubRel pendingPubRel; + pendingPubRel.packetId = packetId; + _pendingPubRels.push_back(pendingPubRel); + } + } + + _freeCurrentParsedPacket(); } void AsyncMqttClient::_onPubRel(uint16_t packetId) { - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); - AsyncMqttClientInternals::PendingAck pendingAck; - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED; - pendingAck.packetId = packetId; - _toSendAcks.push_back(pendingAck); + AsyncMqttClientInternals::PendingAck pendingAck; + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED; + pendingAck.packetId = packetId; + if (_head && _head->packetId() == packetId) { + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + _head->release(); + _insert(msg); + log_i("PUBREC released"); + } - for (size_t i = 0; i < _pendingPubRels.size(); i++) { - if (_pendingPubRels[i].packetId == packetId) { - _pendingPubRels.erase(_pendingPubRels.begin() + i); - _pendingPubRels.shrink_to_fit(); - } + for (size_t i = 0; i < _pendingPubRels.size(); i++) { + if (_pendingPubRels[i].packetId == packetId) { + _pendingPubRels.erase(_pendingPubRels.begin() + i); + _pendingPubRels.shrink_to_fit(); } - - _sendAcks(); + } } void AsyncMqttClient::_onPubAck(uint16_t packetId) { - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("PUB released"); + } - for (auto callback : _onPublishUserCallbacks) - callback(packetId); + for (auto callback : _onPublishUserCallbacks) callback(packetId); } void AsyncMqttClient::_onPubRec(uint16_t packetId) { - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); - AsyncMqttClientInternals::PendingAck pendingAck; - pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL; - pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED; - pendingAck.packetId = packetId; - _toSendAcks.push_back(pendingAck); + // We will only be sending 1 QoS>0 PUB message at a time (to honor message + // ordering). So no need to store ACKS in a separate container as it will + // be stored in the outgoing queue until a PUBCOMP comes in. + AsyncMqttClientInternals::PendingAck pendingAck; + pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL; + pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED; + pendingAck.packetId = packetId; + log_i("snd PUBREL"); - _sendAcks(); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("PUB released"); + } + _insert(msg); } void AsyncMqttClient::_onPubComp(uint16_t packetId) { - _freeCurrentParsedPacket(); + _freeCurrentParsedPacket(); - for (auto callback : _onPublishUserCallbacks) - callback(packetId); + // _head points to the PUBREL package + if (_head && _head->packetId() == packetId) { + _head->release(); + log_i("PUBREL released"); + } + + for (auto callback : _onPublishUserCallbacks) callback(packetId); } -bool AsyncMqttClient::_sendPing() { - char fixedHeader[2]; - fixedHeader[0] = AsyncMqttClientInternals::PacketType.PINGREQ; - fixedHeader[0] = fixedHeader[0] << 4; - fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PINGREQ_RESERVED; - fixedHeader[1] = 0; - - size_t neededSpace = 2; - - SEMAPHORE_TAKE(false); - if (_client.space() < neededSpace) { - SEMAPHORE_GIVE(); - return false; - } - - _client.add(fixedHeader, 2, ASYNC_WRITE_FLAG_COPY); - _client.send(); - _lastClientActivity = millis(); - _lastPingRequestTime = millis(); - - SEMAPHORE_GIVE(); - return true; -} - -void AsyncMqttClient::_sendAcks() { - uint8_t neededAckSpace = 2 + 2; - - SEMAPHORE_TAKE(); - for (size_t i = 0; i < _toSendAcks.size(); i++) { - if (_client.space() < neededAckSpace) - break; - - AsyncMqttClientInternals::PendingAck pendingAck = _toSendAcks[i]; - - char fixedHeader[2]; - fixedHeader[0] = pendingAck.packetType; - fixedHeader[0] = fixedHeader[0] << 4; - fixedHeader[0] = fixedHeader[0] | pendingAck.headerFlag; - fixedHeader[1] = 2; - - char packetIdBytes[2]; - packetIdBytes[0] = pendingAck.packetId >> 8; - packetIdBytes[1] = pendingAck.packetId & 0xFF; - - _client.add(fixedHeader, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(packetIdBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.send(); - - _toSendAcks.erase(_toSendAcks.begin() + i); - _toSendAcks.shrink_to_fit(); - - _lastClientActivity = millis(); - } - SEMAPHORE_GIVE(); -} - -bool AsyncMqttClient::_sendDisconnect() { - if (!_connected) - return true; - - const uint8_t neededSpace = 2; - - SEMAPHORE_TAKE(false); - - if (_client.space() < neededSpace) { - SEMAPHORE_GIVE(); - _client.close(true); - _clear(); - return false; - } - - char fixedHeader[2]; - fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT; - fixedHeader[0] = fixedHeader[0] << 4; - fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED; - fixedHeader[1] = 0; - - _client.add(fixedHeader, 2, ASYNC_WRITE_FLAG_COPY); - _client.send(); - _client.close(true); - - _disconnectFlagged = false; - - SEMAPHORE_GIVE(); - return true; -} - -uint16_t AsyncMqttClient::_getNextPacketId() { - uint16_t nextPacketId = _nextPacketId; - - if (_nextPacketId == 65535) - _nextPacketId = 0; // 0 is forbidden - _nextPacketId++; - - return nextPacketId; +void AsyncMqttClient::_sendPing() { + log_i("PING"); + _lastPingRequestTime = millis(); + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket; + _addBack(msg); } bool AsyncMqttClient::connected() const { - return _connected; + return _state == CONNECTED; } void AsyncMqttClient::connect() { - if (_connected) - return; - if (_lockMutiConnections) - return; - _lockMutiConnections = true; + if (_state != DISCONNECTED) return; + log_i("CONNECTING"); + _state = CONNECTING; + #if ASYNC_TCP_SSL_ENABLED - if (_useIp) { - _client.connect(_ip, _port, _secure); - } else { - _client.connect(_host, _port, _secure); - } + if (_useIp) { + _client.connect(_ip, _port, _secure); + } else { + _client.connect(_host, _port, _secure); + } #else - if (_useIp) { - _client.connect(_ip, _port); - } else { - _client.connect(_host, _port); - } + if (_useIp) { + _client.connect(_ip, _port); + } else { + _client.connect(_host, _port); + } #endif } void AsyncMqttClient::disconnect(bool force) { - if (!_connected) - return; - if (!_lockMutiConnections) - return; - _lockMutiConnections = false; - if (force) { - _client.close(true); - _clear(); - } else { - _disconnectFlagged = true; - _sendDisconnect(); - } + if (_state == DISCONNECTED) return; + log_i("DISCONNECT (f:%d)", force); + if (force) { + _state = DISCONNECTED; + _client.close(true); + } else if (_state != DISCONNECTING) { + _state = DISCONNECTING; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket; + _addBack(msg); + } } -uint16_t AsyncMqttClient::subscribe(const char * topic, uint8_t qos) { - if (!_connected) - return 0; +uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { + if (_state != CONNECTED) return 0; + log_i("SUBSCRIBE"); - char fixedHeader[5]; - fixedHeader[0] = AsyncMqttClientInternals::PacketType.SUBSCRIBE; - fixedHeader[0] = fixedHeader[0] << 4; - fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.SUBSCRIBE_RESERVED; - - uint16_t topicLength = strlen(topic); - char topicLengthBytes[2]; - topicLengthBytes[0] = topicLength >> 8; - topicLengthBytes[1] = topicLength & 0xFF; - - char qosByte[1]; - qosByte[0] = qos; - - uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength + 1, fixedHeader + 1); - - size_t neededSpace = 0; - neededSpace += 1 + remainingLengthLength; - neededSpace += 2; - neededSpace += 2; - neededSpace += topicLength; - neededSpace += 1; - - SEMAPHORE_TAKE(0); - if (_client.space() < neededSpace) { - SEMAPHORE_GIVE(); - return 0; - } - - uint16_t packetId = _getNextPacketId(); - char packetIdBytes[2]; - packetIdBytes[0] = packetId >> 8; - packetIdBytes[1] = packetId & 0xFF; - - _client.add(fixedHeader, 1 + remainingLengthLength, ASYNC_WRITE_FLAG_COPY); - _client.add(packetIdBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(topicLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(topic, topicLength, ASYNC_WRITE_FLAG_COPY); - _client.add(qosByte, 1, ASYNC_WRITE_FLAG_COPY); - _client.send(); - _lastClientActivity = millis(); - - SEMAPHORE_GIVE(); - return packetId; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos); + _addBack(msg); + return msg->packetId(); } -uint16_t AsyncMqttClient::subscribe(const char * topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback) { - onFilteredMessage(callback, topic); - return subscribe(topic, qos); +uint16_t AsyncMqttClient::unsubscribe(const char* topic) { + if (_state != CONNECTED) return 0; + log_i("UNSUBSCRIBE"); + + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic); + _addBack(msg); + return msg->packetId(); } -uint16_t AsyncMqttClient::unsubscribe(const char * topic) { - if (!_connected) - return 0; +uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) { + if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0; + log_i("PUBLISH"); - char fixedHeader[5]; - fixedHeader[0] = AsyncMqttClientInternals::PacketType.UNSUBSCRIBE; - fixedHeader[0] = fixedHeader[0] << 4; - fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.UNSUBSCRIBE_RESERVED; - - uint16_t topicLength = strlen(topic); - char topicLengthBytes[2]; - topicLengthBytes[0] = topicLength >> 8; - topicLengthBytes[1] = topicLength & 0xFF; - - uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength, fixedHeader + 1); - - size_t neededSpace = 0; - neededSpace += 1 + remainingLengthLength; - neededSpace += 2; - neededSpace += 2; - neededSpace += topicLength; - - SEMAPHORE_TAKE(0); - if (_client.space() < neededSpace) { - SEMAPHORE_GIVE(); - return 0; - } - - uint16_t packetId = _getNextPacketId(); - char packetIdBytes[2]; - packetIdBytes[0] = packetId >> 8; - packetIdBytes[1] = packetId & 0xFF; - - _client.add(fixedHeader, 1 + remainingLengthLength, ASYNC_WRITE_FLAG_COPY); - _client.add(packetIdBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(topicLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(topic, topicLength, ASYNC_WRITE_FLAG_COPY); - _client.send(); - _lastClientActivity = millis(); - - SEMAPHORE_GIVE(); - return packetId; + AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length); + _addBack(msg); + return msg->packetId(); } -uint16_t AsyncMqttClient::publish(const char * topic, uint8_t qos, bool retain, const char * payload, size_t length, bool dup, uint16_t message_id) { - if (!_connected) - return 0; - - char fixedHeader[5]; - fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBLISH; - fixedHeader[0] = fixedHeader[0] << 4; - if (dup) - fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP; - if (retain) - fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_RETAIN; - switch (qos) { - case 0: - fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS0; - break; - case 1: - fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS1; - break; - case 2: - fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS2; - break; - } - - uint16_t topicLength = strlen(topic); - char topicLengthBytes[2]; - topicLengthBytes[0] = topicLength >> 8; - topicLengthBytes[1] = topicLength & 0xFF; - - uint32_t payloadLength = length; - if (payload != nullptr && payloadLength == 0) - payloadLength = strlen(payload); - - uint32_t remainingLength = 2 + topicLength + payloadLength; - if (qos != 0) - remainingLength += 2; - uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1); - - size_t neededSpace = 0; - neededSpace += 1 + remainingLengthLength; - neededSpace += 2; - neededSpace += topicLength; - if (qos != 0) - neededSpace += 2; - if (payload != nullptr) - neededSpace += payloadLength; - - SEMAPHORE_TAKE(0); - if (_client.space() < neededSpace) { - SEMAPHORE_GIVE(); - return 0; - } - - uint16_t packetId = 0; - char packetIdBytes[2]; - if (qos != 0) { - if (dup && message_id > 0) { - packetId = message_id; - } else { - packetId = _getNextPacketId(); - } - - packetIdBytes[0] = packetId >> 8; - packetIdBytes[1] = packetId & 0xFF; - } - - _client.add(fixedHeader, 1 + remainingLengthLength, ASYNC_WRITE_FLAG_COPY); - _client.add(topicLengthBytes, 2, ASYNC_WRITE_FLAG_COPY); - _client.add(topic, topicLength, ASYNC_WRITE_FLAG_COPY); - if (qos != 0) - _client.add(packetIdBytes, 2, ASYNC_WRITE_FLAG_COPY); - if (payload != nullptr) - _client.add(payload, payloadLength, ASYNC_WRITE_FLAG_COPY); - _client.send(); - _lastClientActivity = millis(); - - SEMAPHORE_GIVE(); - if (qos != 0) { - return packetId; - } else { - return 1; - } -} - -const char * AsyncMqttClient::getClientId() { - return _clientId; +const char* AsyncMqttClient::getClientId() const { + return _clientId; } diff --git a/lib/async-mqtt-client/src/AsyncMqttClient.h b/lib/async-mqtt-client/src/AsyncMqttClient.h index c3d22c585..23d30554d 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient.h +++ b/lib/async-mqtt-client/src/AsyncMqttClient.h @@ -3,4 +3,4 @@ #include "AsyncMqttClient.hpp" -#endif // SRC_ASYNCMQTTCLIENT_H_ +#endif // SRC_ASYNCMQTTCLIENT_H_ diff --git a/lib/async-mqtt-client/src/AsyncMqttClient.hpp b/lib/async-mqtt-client/src/AsyncMqttClient.hpp index 48976a296..9ae566223 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient.hpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient.hpp @@ -5,6 +5,10 @@ #include "Arduino.h" +#ifndef MQTT_MIN_FREE_MEMORY +#define MQTT_MIN_FREE_MEMORY 4096 +#endif + #ifdef ESP32 #include #include @@ -38,137 +42,137 @@ #include "AsyncMqttClient/Packets/PubRecPacket.hpp" #include "AsyncMqttClient/Packets/PubCompPacket.hpp" -#if ESP32 -#define SEMAPHORE_TAKE(X) \ - if (xSemaphoreTake(_xSemaphore, 1000 / portTICK_PERIOD_MS) != pdTRUE) { \ - return X; \ - } // Waits max 1000ms -#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore); -#elif defined(ESP8266) -#define SEMAPHORE_TAKE(X) void() -#define SEMAPHORE_GIVE() void() -#endif +#include "AsyncMqttClient/Packets/Out/Connect.hpp" +#include "AsyncMqttClient/Packets/Out/PingReq.hpp" +#include "AsyncMqttClient/Packets/Out/PubAck.hpp" +#include "AsyncMqttClient/Packets/Out/Disconn.hpp" +#include "AsyncMqttClient/Packets/Out/Subscribe.hpp" +#include "AsyncMqttClient/Packets/Out/Unsubscribe.hpp" +#include "AsyncMqttClient/Packets/Out/Publish.hpp" class AsyncMqttClient { - public: - AsyncMqttClient(); - ~AsyncMqttClient(); + public: + AsyncMqttClient(); + ~AsyncMqttClient(); - AsyncMqttClient & setKeepAlive(uint16_t keepAlive); - AsyncMqttClient & setClientId(const char * clientId); - AsyncMqttClient & setCleanSession(bool cleanSession); - AsyncMqttClient & setMaxTopicLength(uint16_t maxTopicLength); - AsyncMqttClient & setCredentials(const char * username, const char * password = nullptr); - AsyncMqttClient & setWill(const char * topic, uint8_t qos, bool retain, const char * payload = nullptr, size_t length = 0); - AsyncMqttClient & setServer(IPAddress ip, uint16_t port); - AsyncMqttClient & setServer(const char * host, uint16_t port); + AsyncMqttClient& setKeepAlive(uint16_t keepAlive); + AsyncMqttClient& setClientId(const char* clientId); + AsyncMqttClient& setCleanSession(bool cleanSession); + AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength); + AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr); + AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0); + AsyncMqttClient& setServer(IPAddress ip, uint16_t port); + AsyncMqttClient& setServer(const char* host, uint16_t port); #if ASYNC_TCP_SSL_ENABLED - AsyncMqttClient & setSecure(bool secure); - AsyncMqttClient & addServerFingerprint(const uint8_t * fingerprint); + AsyncMqttClient& setSecure(bool secure); + AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint); #endif - AsyncMqttClient & onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback); - AsyncMqttClient & onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); - AsyncMqttClient & onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); - AsyncMqttClient & onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); - AsyncMqttClient & onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char * _userTopic = "#"); - AsyncMqttClient & onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char * _userTopic); - AsyncMqttClient & onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); + AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback); + AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); + AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); + AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); + AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback); + AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback); - bool connected() const; - void connect(); - void disconnect(bool force = false); - uint16_t subscribe(const char * topic, uint8_t qos); - uint16_t subscribe(const char * topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback); - uint16_t unsubscribe(const char * topic); - uint16_t publish(const char * topic, uint8_t qos, bool retain, const char * payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); + bool connected() const; + void connect(); + void disconnect(bool force = false); + uint16_t subscribe(const char* topic, uint8_t qos); + uint16_t unsubscribe(const char* topic); + uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0); - const char * getClientId(); + const char* getClientId() const; - private: - AsyncClient _client; + private: + AsyncClient _client; + AsyncMqttClientInternals::OutPacket* _head; + AsyncMqttClientInternals::OutPacket* _tail; + size_t _sent; + enum { + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED + } _state; + bool _tlsBadFingerprint; + uint32_t _lastClientActivity; + uint32_t _lastServerActivity; + uint32_t _lastPingRequestTime; - bool _connected; - bool _lockMutiConnections; - bool _connectPacketNotEnoughSpace; - bool _disconnectFlagged; - bool _tlsBadFingerprint; - uint32_t _lastClientActivity; - uint32_t _lastServerActivity; - uint32_t _lastPingRequestTime; - - char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456 - IPAddress _ip; - const char * _host; - bool _useIp; + char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456 + IPAddress _ip; + const char* _host; + bool _useIp; #if ASYNC_TCP_SSL_ENABLED - bool _secure; + bool _secure; #endif - uint16_t _port; - uint16_t _keepAlive; - bool _cleanSession; - const char * _clientId; - const char * _username; - const char * _password; - const char * _willTopic; - const char * _willPayload; - uint16_t _willPayloadLength; - uint8_t _willQos; - bool _willRetain; + uint16_t _port; + uint16_t _keepAlive; + bool _cleanSession; + const char* _clientId; + const char* _username; + const char* _password; + const char* _willTopic; + const char* _willPayload; + uint16_t _willPayloadLength; + uint8_t _willQos; + bool _willRetain; #if ASYNC_TCP_SSL_ENABLED - std::vector> _secureServerFingerprints; + std::vector> _secureServerFingerprints; #endif - std::vector _onConnectUserCallbacks; - std::vector _onDisconnectUserCallbacks; - std::vector _onSubscribeUserCallbacks; - std::vector _onUnsubscribeUserCallbacks; - std::vector _onMessageUserCallbacks; - std::vector _onPublishUserCallbacks; + std::vector _onConnectUserCallbacks; + std::vector _onDisconnectUserCallbacks; + std::vector _onSubscribeUserCallbacks; + std::vector _onUnsubscribeUserCallbacks; + std::vector _onMessageUserCallbacks; + std::vector _onPublishUserCallbacks; - AsyncMqttClientInternals::ParsingInformation _parsingInformation; - AsyncMqttClientInternals::Packet * _currentParsedPacket; - uint8_t _remainingLengthBufferPosition; - char _remainingLengthBuffer[4]; + AsyncMqttClientInternals::ParsingInformation _parsingInformation; + AsyncMqttClientInternals::Packet* _currentParsedPacket; + uint8_t _remainingLengthBufferPosition; + char _remainingLengthBuffer[4]; - uint16_t _nextPacketId; + std::vector _pendingPubRels; - std::vector _pendingPubRels; - - std::vector _toSendAcks; - -#ifdef ESP32 - SemaphoreHandle_t _xSemaphore = nullptr; +#if defined(ESP32) + SemaphoreHandle_t _xSemaphore = nullptr; +#elif defined(ESP8266) + bool _xSemaphore = false; #endif - void _clear(); - void _freeCurrentParsedPacket(); + void _clear(); + void _freeCurrentParsedPacket(); - // TCP - void _onConnect(AsyncClient * client); - void _onDisconnect(AsyncClient * client); - static void _onError(AsyncClient * client, int8_t error); - void _onTimeout(AsyncClient * client, uint32_t time); - static void _onAck(AsyncClient * client, size_t len, uint32_t time); - void _onData(AsyncClient * client, char * data, size_t len); - void _onPoll(AsyncClient * client); + // TCP + void _onConnect(); + void _onDisconnect(); + // void _onError(int8_t error); + // void _onTimeout(); + void _onAck(size_t len); + void _onData(char* data, size_t len); + void _onPoll(); - // MQTT - void _onPingResp(); - void _onConnAck(bool sessionPresent, uint8_t connectReturnCode); - void _onSubAck(uint16_t packetId, char status); - void _onUnsubAck(uint16_t packetId); - void _onMessage(char * topic, char * payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId); - void _onPublish(uint16_t packetId, uint8_t qos); - void _onPubRel(uint16_t packetId); - void _onPubAck(uint16_t packetId); - void _onPubRec(uint16_t packetId); - void _onPubComp(uint16_t packetId); + // QUEUE + void _insert(AsyncMqttClientInternals::OutPacket* packet); // for PUBREL + void _addFront(AsyncMqttClientInternals::OutPacket* packet); // for CONNECT + void _addBack(AsyncMqttClientInternals::OutPacket* packet); // all the rest + void _handleQueue(); + void _clearQueue(bool keepSessionData); - bool _sendPing(); - void _sendAcks(); - bool _sendDisconnect(); + // MQTT + void _onPingResp(); + void _onConnAck(bool sessionPresent, uint8_t connectReturnCode); + void _onSubAck(uint16_t packetId, char status); + void _onUnsubAck(uint16_t packetId); + void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId); + void _onPublish(uint16_t packetId, uint8_t qos); + void _onPubRel(uint16_t packetId); + void _onPubAck(uint16_t packetId); + void _onPubRec(uint16_t packetId); + void _onPubComp(uint16_t packetId); - uint16_t _getNextPacketId(); + void _sendPing(); }; diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Callbacks.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Callbacks.hpp index b0d722265..414034daa 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient/Callbacks.hpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Callbacks.hpp @@ -4,6 +4,7 @@ #include "DisconnectReasons.hpp" #include "MessageProperties.hpp" +#include "Errors.hpp" namespace AsyncMqttClientInternals { // user callbacks @@ -12,9 +13,8 @@ typedef std::function OnDisconnect typedef std::function OnSubscribeUserCallback; typedef std::function OnUnsubscribeUserCallback; typedef std::function OnMessageUserCallback; -// typedef std::pair> onFilteredMessageUserCallback; -typedef std::pair> onFilteredMessageUserCallback; typedef std::function OnPublishUserCallback; +typedef std::function OnErrorUserCallback; // internal callbacks typedef std::function OnConnAckInternalCallback; diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Errors.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Errors.hpp new file mode 100644 index 000000000..f93e80e39 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Errors.hpp @@ -0,0 +1,6 @@ +#pragma once + +enum class AsyncMqttClientError : uint8_t { + MAX_RETRIES = 0, + OUT_OF_MEMORY = 1 +}; diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Helpers.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Helpers.hpp index 711303328..ecb620f74 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient/Helpers.hpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Helpers.hpp @@ -35,4 +35,27 @@ class Helpers { return bytesNeeded; } }; + +#if defined(ARDUINO_ARCH_ESP32) + #define SEMAPHORE_TAKE() xSemaphoreTake(_xSemaphore, portMAX_DELAY) + #define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore) + #define GET_FREE_MEMORY() ESP.getMaxAllocHeap() + #include +#elif defined(ARDUINO_ARCH_ESP8266) + #define SEMAPHORE_TAKE(X) while (_xSemaphore) { /*ESP.wdtFeed();*/ } _xSemaphore = true + #define SEMAPHORE_GIVE() _xSemaphore = false + #define GET_FREE_MEMORY() ESP.getMaxFreeBlockSize() + #if defined(DEBUG_ESP_PORT) && defined(DEBUG_ASYNC_MQTT_CLIENT) + #define log_i(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n") + #define log_e(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n") + #define log_w(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n") + #else + #define log_i(...) + #define log_e(...) + #define log_w(...) + #endif +#else + #pragma error "No valid architecture" +#endif + } // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Connect.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Connect.cpp new file mode 100644 index 000000000..a9a86e453 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Connect.cpp @@ -0,0 +1,162 @@ +#include "Connect.hpp" + +using AsyncMqttClientInternals::ConnectOutPacket; + +ConnectOutPacket::ConnectOutPacket(bool cleanSession, + const char* username, + const char* password, + const char* willTopic, + bool willRetain, + uint8_t willQos, + const char* willPayload, + uint16_t willPayloadLength, + uint16_t keepAlive, + const char* clientId) { + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.CONNECT; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.CONNECT_RESERVED; + + uint16_t protocolNameLength = 4; + char protocolNameLengthBytes[2]; + protocolNameLengthBytes[0] = protocolNameLength >> 8; + protocolNameLengthBytes[1] = protocolNameLength & 0xFF; + + char protocolLevel[1]; + protocolLevel[0] = 0x04; + + char connectFlags[1]; + connectFlags[0] = 0; + if (cleanSession) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.CLEAN_SESSION; + if (username != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.USERNAME; + if (password != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.PASSWORD; + if (willTopic != nullptr) { + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL; + if (willRetain) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_RETAIN; + switch (willQos) { + case 0: + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS0; + break; + case 1: + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS1; + break; + case 2: + connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS2; + break; + } + } + + char keepAliveBytes[2]; + keepAliveBytes[0] = keepAlive >> 8; + keepAliveBytes[1] = keepAlive & 0xFF; + + uint16_t clientIdLength = strlen(clientId); + char clientIdLengthBytes[2]; + clientIdLengthBytes[0] = clientIdLength >> 8; + clientIdLengthBytes[1] = clientIdLength & 0xFF; + + // Optional fields + uint16_t willTopicLength = 0; + char willTopicLengthBytes[2]; + char willPayloadLengthBytes[2]; + if (willTopic != nullptr) { + willTopicLength = strlen(willTopic); + willTopicLengthBytes[0] = willTopicLength >> 8; + willTopicLengthBytes[1] = willTopicLength & 0xFF; + + if (willPayload != nullptr && willPayloadLength == 0) willPayloadLength = strlen(willPayload); + + willPayloadLengthBytes[0] = willPayloadLength >> 8; + willPayloadLengthBytes[1] = willPayloadLength & 0xFF; + } + + uint16_t usernameLength = 0; + char usernameLengthBytes[2]; + if (username != nullptr) { + usernameLength = strlen(username); + usernameLengthBytes[0] = usernameLength >> 8; + usernameLengthBytes[1] = usernameLength & 0xFF; + } + + uint16_t passwordLength = 0; + char passwordLengthBytes[2]; + if (password != nullptr) { + passwordLength = strlen(password); + passwordLengthBytes[0] = passwordLength >> 8; + passwordLengthBytes[1] = passwordLength & 0xFF; + } + + uint32_t remainingLength = 2 + protocolNameLength + 1 + 1 + 2 + 2 + clientIdLength; // always present + if (willTopic != nullptr) remainingLength += 2 + willTopicLength + 2 + willPayloadLength; + if (username != nullptr) remainingLength += 2 + usernameLength; + if (password != nullptr) remainingLength += 2 + passwordLength; + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1); + + uint32_t neededSpace = 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += protocolNameLength; + neededSpace += 1; + neededSpace += 1; + neededSpace += 2; + neededSpace += 2; + neededSpace += clientIdLength; + if (willTopic != nullptr) { + neededSpace += 2; + neededSpace += willTopicLength; + + neededSpace += 2; + if (willPayload != nullptr) neededSpace += willPayloadLength; + } + if (username != nullptr) { + neededSpace += 2; + neededSpace += usernameLength; + } + if (password != nullptr) { + neededSpace += 2; + neededSpace += passwordLength; + } + + _data.reserve(neededSpace); + + _data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength); + + _data.push_back(protocolNameLengthBytes[0]); + _data.push_back(protocolNameLengthBytes[1]); + + _data.push_back('M'); + _data.push_back('Q'); + _data.push_back('T'); + _data.push_back('T'); + + _data.push_back(protocolLevel[0]); + _data.push_back(connectFlags[0]); + _data.push_back(keepAliveBytes[0]); + _data.push_back(keepAliveBytes[1]); + _data.push_back(clientIdLengthBytes[0]); + _data.push_back(clientIdLengthBytes[1]); + + _data.insert(_data.end(), clientId, clientId + clientIdLength); + if (willTopic != nullptr) { + _data.insert(_data.end(), willTopicLengthBytes, willTopicLengthBytes + 2); + _data.insert(_data.end(), willTopic, willTopic + willTopicLength); + + _data.insert(_data.end(), willPayloadLengthBytes, willPayloadLengthBytes + 2); + if (willPayload != nullptr) _data.insert(_data.end(), willPayload, willPayload + willPayloadLength); + } + if (username != nullptr) { + _data.insert(_data.end(), usernameLengthBytes, usernameLengthBytes + 2); + _data.insert(_data.end(), username, username + usernameLength); + } + if (password != nullptr) { + _data.insert(_data.end(), passwordLengthBytes, passwordLengthBytes + 2); + _data.insert(_data.end(), password, password + passwordLength); + } +} + +const uint8_t* ConnectOutPacket::data(size_t index) const { + return &_data.data()[index]; +} + +size_t ConnectOutPacket::size() const { + return _data.size(); +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Connect.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Connect.hpp new file mode 100644 index 000000000..5b17632fd --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Connect.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include +#include // strlen + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" + +namespace AsyncMqttClientInternals { +class ConnectOutPacket : public OutPacket { + public: + ConnectOutPacket(bool cleanSession, + const char* username, + const char* password, + const char* willTopic, + bool willRetain, + uint8_t willQos, + const char* willPayload, + uint16_t willPayloadLength, + uint16_t keepAlive, + const char* clientId); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + private: + std::vector _data; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Disconn.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Disconn.cpp new file mode 100644 index 000000000..3e2890df1 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Disconn.cpp @@ -0,0 +1,18 @@ +#include "Disconn.hpp" + +using AsyncMqttClientInternals::DisconnOutPacket; + +DisconnOutPacket::DisconnOutPacket() { + _data[0] = AsyncMqttClientInternals::PacketType.DISCONNECT; + _data[0] = _data[0] << 4; + _data[0] = _data[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED; + _data[1] = 0; +} + +const uint8_t* DisconnOutPacket::data(size_t index) const { + return &_data[index]; +} + +size_t DisconnOutPacket::size() const { + return 2; +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Disconn.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Disconn.hpp new file mode 100644 index 000000000..38dc9152f --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Disconn.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" + +namespace AsyncMqttClientInternals { +class DisconnOutPacket : public OutPacket { + public: + DisconnOutPacket(); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + private: + uint8_t _data[2]; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/OutPacket.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/OutPacket.cpp new file mode 100644 index 000000000..e69a87f8f --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/OutPacket.cpp @@ -0,0 +1,44 @@ +#include "OutPacket.hpp" + +using AsyncMqttClientInternals::OutPacket; + +OutPacket::OutPacket() +: next(nullptr) +, timeout(0) +, noTries(0) +, _released(true) +, _packetId(0) {} + +OutPacket::~OutPacket() {} + +bool OutPacket::released() const { + return _released; +} + +uint8_t OutPacket::packetType() const { + return data(0)[0] >> 4; +} + +uint16_t OutPacket::packetId() const { + return _packetId; +} + +uint8_t OutPacket::qos() const { + if (packetType() == AsyncMqttClientInternals::PacketType.PUBLISH) { + return (data()[1] & 0x06) >> 1; + } + return 0; +} + +void OutPacket::release() { + _released = true; +} + +uint16_t OutPacket::_nextPacketId = 0; + +uint16_t OutPacket::_getNextPacketId() { + if (++_nextPacketId == 0) { + ++_nextPacketId; + } + return _nextPacketId; +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/OutPacket.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/OutPacket.hpp new file mode 100644 index 000000000..52c37de0b --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/OutPacket.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include // uint*_t +#include // size_t +#include // std::min + +#include "../../Flags.hpp" + +namespace AsyncMqttClientInternals { +class OutPacket { + public: + OutPacket(); + virtual ~OutPacket(); + virtual const uint8_t* data(size_t index = 0) const = 0; + virtual size_t size() const = 0; + bool released() const; + uint8_t packetType() const; + uint16_t packetId() const; + uint8_t qos() const; + void release(); + + public: + OutPacket* next; + uint32_t timeout; + uint8_t noTries; + + protected: + static uint16_t _getNextPacketId(); + bool _released; + uint16_t _packetId; + + private: + static uint16_t _nextPacketId; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PingReq.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PingReq.cpp new file mode 100644 index 000000000..d59cf3dc1 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PingReq.cpp @@ -0,0 +1,18 @@ +#include "PingReq.hpp" + +using AsyncMqttClientInternals::PingReqOutPacket; + +PingReqOutPacket::PingReqOutPacket() { + _data[0] = AsyncMqttClientInternals::PacketType.PINGREQ; + _data[0] = _data[0] << 4; + _data[0] = _data[0] | AsyncMqttClientInternals::HeaderFlag.PINGREQ_RESERVED; + _data[1] = 0; +} + +const uint8_t* PingReqOutPacket::data(size_t index) const { + return &_data[index];; +} + +size_t PingReqOutPacket::size() const { + return 2; +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PingReq.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PingReq.hpp new file mode 100644 index 000000000..1cb19a39e --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PingReq.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" + +namespace AsyncMqttClientInternals { +class PingReqOutPacket : public OutPacket { + public: + PingReqOutPacket(); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + private: + uint8_t _data[2]; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PubAck.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PubAck.cpp new file mode 100644 index 000000000..8538404d5 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PubAck.cpp @@ -0,0 +1,26 @@ +#include "PubAck.hpp" + +using AsyncMqttClientInternals::PubAckOutPacket; + +PubAckOutPacket::PubAckOutPacket(PendingAck pendingAck) { + _data[0] = pendingAck.packetType; + _data[0] = _data[0] << 4; + _data[0] = _data[0] | pendingAck.headerFlag; + _data[1] = 2; + _packetId = pendingAck.packetId; + _data[2] = pendingAck.packetId >> 8; + _data[3] = pendingAck.packetId & 0xFF; + _released = false; + if (packetType() == AsyncMqttClientInternals::PacketType.PUBREL || + packetType() == AsyncMqttClientInternals::PacketType.PUBREC) { + _released = false; + } +} + +const uint8_t* PubAckOutPacket::data(size_t index) const { + return &_data[index]; +} + +size_t PubAckOutPacket::size() const { + return 4; +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PubAck.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PubAck.hpp new file mode 100644 index 000000000..9cd830ee4 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/PubAck.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" +#include "../../Storage.hpp" + +namespace AsyncMqttClientInternals { +class PubAckOutPacket : public OutPacket { + public: + explicit PubAckOutPacket(PendingAck pendingAck); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + private: + uint8_t _data[4]; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Publish.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Publish.cpp new file mode 100644 index 000000000..3f4365b2d --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Publish.cpp @@ -0,0 +1,69 @@ +#include "Publish.hpp" + +using AsyncMqttClientInternals::PublishOutPacket; + +PublishOutPacket::PublishOutPacket(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) { + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBLISH; + fixedHeader[0] = fixedHeader[0] << 4; + // if (dup) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP; + if (retain) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_RETAIN; + switch (qos) { + case 0: + fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS0; + break; + case 1: + fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS1; + break; + case 2: + fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS2; + break; + } + + uint16_t topicLength = strlen(topic); + char topicLengthBytes[2]; + topicLengthBytes[0] = topicLength >> 8; + topicLengthBytes[1] = topicLength & 0xFF; + + uint32_t payloadLength = length; + if (payload != nullptr && payloadLength == 0) payloadLength = strlen(payload); + + uint32_t remainingLength = 2 + topicLength + payloadLength; + if (qos != 0) remainingLength += 2; + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1); + + size_t neededSpace = 0; + neededSpace += 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += topicLength; + if (qos != 0) neededSpace += 2; + if (payload != nullptr) neededSpace += payloadLength; + + _data.reserve(neededSpace); + + _packetId = (qos !=0) ? _getNextPacketId() : 1; + char packetIdBytes[2]; + packetIdBytes[0] = _packetId >> 8; + packetIdBytes[1] = _packetId & 0xFF; + + _data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength); + _data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2); + _data.insert(_data.end(), topic, topic + topicLength); + if (qos != 0) { + _data.insert(_data.end(), packetIdBytes, packetIdBytes + 2); + _released = false; + } + if (payload != nullptr) _data.insert(_data.end(), payload, payload + payloadLength); +} + +const uint8_t* PublishOutPacket::data(size_t index) const { + return &_data.data()[index]; +} + +size_t PublishOutPacket::size() const { + return _data.size(); +} + +void PublishOutPacket::setDup() { + _data[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP; +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Publish.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Publish.hpp new file mode 100644 index 000000000..6b8272ec9 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Publish.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include // strlen +#include + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" +#include "../../Storage.hpp" + +namespace AsyncMqttClientInternals { +class PublishOutPacket : public OutPacket { + public: + PublishOutPacket(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + void setDup(); // you cannot unset dup + + private: + std::vector _data; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Subscribe.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Subscribe.cpp new file mode 100644 index 000000000..85c10db4b --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Subscribe.cpp @@ -0,0 +1,49 @@ +#include "Subscribe.hpp" + +using AsyncMqttClientInternals::SubscribeOutPacket; + +SubscribeOutPacket::SubscribeOutPacket(const char* topic, uint8_t qos) { + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.SUBSCRIBE; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.SUBSCRIBE_RESERVED; + + uint16_t topicLength = strlen(topic); + char topicLengthBytes[2]; + topicLengthBytes[0] = topicLength >> 8; + topicLengthBytes[1] = topicLength & 0xFF; + + char qosByte[1]; + qosByte[0] = qos; + + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength + 1, fixedHeader + 1); + + size_t neededSpace = 0; + neededSpace += 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += 2; + neededSpace += topicLength; + neededSpace += 1; + + _data.reserve(neededSpace); + + _packetId = _getNextPacketId(); + char packetIdBytes[2]; + packetIdBytes[0] = _packetId >> 8; + packetIdBytes[1] = _packetId & 0xFF; + + _data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength); + _data.insert(_data.end(), packetIdBytes, packetIdBytes + 2); + _data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2); + _data.insert(_data.end(), topic, topic + topicLength); + _data.push_back(qosByte[0]); + _released = false; +} + +const uint8_t* SubscribeOutPacket::data(size_t index) const { + return &_data.data()[index]; +} + +size_t SubscribeOutPacket::size() const { + return _data.size(); +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Subscribe.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Subscribe.hpp new file mode 100644 index 000000000..1f85f5913 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Subscribe.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include // strlen +#include + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" +#include "../../Storage.hpp" + +namespace AsyncMqttClientInternals { +class SubscribeOutPacket : public OutPacket { + public: + SubscribeOutPacket(const char* topic, uint8_t qos); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + private: + std::vector _data; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Unsubscribe.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Unsubscribe.cpp new file mode 100644 index 000000000..4d859c9a4 --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Unsubscribe.cpp @@ -0,0 +1,42 @@ +#include "Unsubscribe.hpp" + +using AsyncMqttClientInternals::UnsubscribeOutPacket; + +UnsubscribeOutPacket::UnsubscribeOutPacket(const char* topic) { + char fixedHeader[5]; + fixedHeader[0] = AsyncMqttClientInternals::PacketType.UNSUBSCRIBE; + fixedHeader[0] = fixedHeader[0] << 4; + fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.UNSUBSCRIBE_RESERVED; + + uint16_t topicLength = strlen(topic); + char topicLengthBytes[2]; + topicLengthBytes[0] = topicLength >> 8; + topicLengthBytes[1] = topicLength & 0xFF; + + uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength, fixedHeader + 1); + + size_t neededSpace = 0; + neededSpace += 1 + remainingLengthLength; + neededSpace += 2; + neededSpace += 2; + neededSpace += topicLength; + + _packetId = _getNextPacketId(); + char packetIdBytes[2]; + packetIdBytes[0] = _packetId >> 8; + packetIdBytes[1] = _packetId & 0xFF; + + _data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength); + _data.insert(_data.end(), packetIdBytes, packetIdBytes + 2); + _data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2); + _data.insert(_data.end(), topic, topic + topicLength); + _released = false; +} + +const uint8_t* UnsubscribeOutPacket::data(size_t index) const { + return &_data.data()[index]; +} + +size_t UnsubscribeOutPacket::size() const { + return _data.size(); +} diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Unsubscribe.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Unsubscribe.hpp new file mode 100644 index 000000000..621802f6f --- /dev/null +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/Out/Unsubscribe.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include // strlen +#include + +#include "OutPacket.hpp" +#include "../../Flags.hpp" +#include "../../Helpers.hpp" +#include "../../Storage.hpp" + +namespace AsyncMqttClientInternals { +class UnsubscribeOutPacket : public OutPacket { + public: + explicit UnsubscribeOutPacket(const char* topic); + const uint8_t* data(size_t index = 0) const; + size_t size() const; + + private: + std::vector _data; +}; +} // namespace AsyncMqttClientInternals diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.cpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.cpp index ee3ce9734..2c5192f0c 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.cpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.cpp @@ -16,8 +16,7 @@ PublishPacket::PublishPacket(ParsingInformation* parsingInformation, OnMessageIn , _packetIdMsb(0) , _packetId(0) , _payloadLength(0) -, _payloadBytesRead(0) -, _ptempbuff(0) { +, _payloadBytesRead(0) { _dup = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_DUP; _retain = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_RETAIN; char qosMasked = _parsingInformation->packetFlags & 0x06; @@ -79,36 +78,14 @@ void PublishPacket::_preparePayloadHandling(uint32_t payloadLength) { void PublishPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { size_t remainToRead = len - (*currentBytePosition); - if (_payloadBytesRead + remainToRead > _payloadLength) - remainToRead = _payloadLength - _payloadBytesRead; - if (!_ignore) { - if (remainToRead < _payloadLength) { - if (!_ptempbuff) { - _ptempbuff = new char[_payloadLength + 1]; - if (_ptempbuff == nullptr) { - _ignore = true; - return; - } - memset(_ptempbuff, 0, _payloadLength + 1); - memcpy(&_ptempbuff[_payloadBytesRead], &data[(*currentBytePosition)], remainToRead); - } else { - memcpy(&_ptempbuff[_payloadBytesRead], &data[(*currentBytePosition)], remainToRead); - if ((_payloadBytesRead + remainToRead) == _payloadLength) { - _dataCallback(_parsingInformation->topicBuffer, _ptempbuff, _qos, _dup, _retain, _payloadLength, 0, _payloadLength, _packetId); - delete[] _ptempbuff; - _ptempbuff = NULL; - } - } - } else { - _dataCallback(_parsingInformation->topicBuffer, &data[(*currentBytePosition)], _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId); - } - } + if (_payloadBytesRead + remainToRead > _payloadLength) remainToRead = _payloadLength - _payloadBytesRead; + + if (!_ignore) _dataCallback(_parsingInformation->topicBuffer, data + (*currentBytePosition), _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId); _payloadBytesRead += remainToRead; (*currentBytePosition) += remainToRead; if (_payloadBytesRead == _payloadLength) { _parsingInformation->bufferState = BufferState::NONE; - if (!_ignore) - _completeCallback(_packetId, _qos); + if (!_ignore) _completeCallback(_packetId, _qos); } } diff --git a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.hpp b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.hpp index 7d7cf92c4..d97205ce1 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.hpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient/Packets/PublishPacket.hpp @@ -34,6 +34,5 @@ class PublishPacket : public Packet { uint16_t _packetId; uint32_t _payloadLength; uint32_t _payloadBytesRead; - char* _ptempbuff; }; } // namespace AsyncMqttClientInternals diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 6aaf5f0df..9935e4298 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -656,7 +656,6 @@ void Mqtt::publish_ha(const __FlashStringHelper * topic, const JsonObject & payl } // publish a Home Assistant config topic and payload, with retain flag off. -// for ESP32 its added to the queue, for ESP8266 is sent immediatelty void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) { if (!enabled()) { return; @@ -676,20 +675,7 @@ void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) { #endif // queue messages if the MQTT connection is not yet established. to ensure we don't miss messages - bool queued = !connected(); - - if (queued) { - queue_publish_message(topic, payload_text, true); // with retain true - return; - } - - // send immediately and then wait a while - if (!mqttClient_->publish(topic.c_str(), 0, true, payload_text.c_str())) { - LOG_ERROR(F("Failed to publish topic %s"), topic.c_str()); - mqtt_publish_fails_++; // increment failure counter - } - - delay(MQTT_HA_PUBLISH_DELAY); // enough time to send the short message out + queue_publish_message(topic, payload_text, true); // with retain true } // take top from queue and perform the publish or subscribe action diff --git a/src/mqtt.h b/src/mqtt.h index 67bf9f292..f120a8ecc 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -42,13 +42,7 @@ using uuid::console::Shell; #define MQTT_HA_PUBLISH_DELAY 50 // size of queue -#if defined(EMSESP_STANDALONE) -#define MAX_MQTT_MESSAGES 70 -#elif defined(ESP32) #define MAX_MQTT_MESSAGES 100 -#else -#define MAX_MQTT_MESSAGES 20 -#endif enum { BOOL_FORMAT_ONOFF = 1, BOOL_FORMAT_TRUEFALSE, BOOL_FORMAT_10 }; // matches Web UI settings diff --git a/src/version.h b/src/version.h index ae42691a6..e8e30fdde 100644 --- a/src/version.h +++ b/src/version.h @@ -1 +1 @@ -#define EMSESP_APP_VERSION "3.0.0b2" +#define EMSESP_APP_VERSION "3.0.0b3"