#include "AsyncMqttClient.hpp" AsyncMqttClient::AsyncMqttClient() : _client() , _head(nullptr) , _tail(nullptr) , _sent(0) , _state(DISCONNECTED) , _disconnectReason(AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) , _lastClientActivity(0) , _lastServerActivity(0) , _lastPingRequestTime(0) , _generatedClientId{0} , _ip() , _ipv6() , _host(nullptr) , _useIp(false) , _useIpv6(false) #if ASYNC_TCP_SSL_ENABLED , _secure(false) #endif , _port(0) , _keepAlive(15) , _cleanSession(true) , _clientId(nullptr) , _username(nullptr) , _password(nullptr) , _willTopic(nullptr) , _willPayload(nullptr) , _willPayloadLength(0) , _willQos(0) , _willRetain(false) #if ASYNC_TCP_SSL_ENABLED , _secureServerFingerprints() #endif , _onConnectUserCallbacks() , _onDisconnectUserCallbacks() , _onSubscribeUserCallbacks() , _onUnsubscribeUserCallbacks() , _onMessageUserCallbacks() , _onPublishUserCallbacks() , _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE } , _currentParsedPacket(nullptr) , _remainingLengthBufferPosition(0) , _remainingLengthBuffer{0} , _pendingPubRels() { _client.onConnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onConnect(); }, this); _client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast(obj))->_onDisconnect(); }, this); // _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast(obj))->_onError(error); }, this); // _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast(obj))->_onTimeout(); }, this); _client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast(obj))->_onAck(len); }, this); _client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast(obj))->_onData(static_cast(data), len); }, this); _client.onPoll([](void* obj, AsyncClient* c) { (static_cast(obj))->_onPoll(); }, this); _client.setNoDelay(true); // send small packets immediately (PINGREQ/DISCONN are only 2 bytes) #ifdef ESP32 sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac()); _xSemaphore = xSemaphoreCreateMutex(); #elif defined(ESP8266) sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId()); #endif _clientId = _generatedClientId; setMaxTopicLength(128); } AsyncMqttClient::~AsyncMqttClient() { delete _currentParsedPacket; delete[] _parsingInformation.topicBuffer; _clear(); _pendingPubRels.clear(); _pendingPubRels.shrink_to_fit(); _clearQueue(false); // _clear() doesn't clear session data #ifdef ESP32 vSemaphoreDelete(_xSemaphore); #endif } AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) { _keepAlive = keepAlive; return *this; } AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) { _clientId = clientId; return *this; } AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) { _cleanSession = cleanSession; return *this; } AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) { _parsingInformation.maxTopicLength = maxTopicLength; delete[] _parsingInformation.topicBuffer; _parsingInformation.topicBuffer = new char[maxTopicLength + 1]; return *this; } AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) { _username = username; _password = password; return *this; } AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) { _willTopic = topic; _willQos = qos; _willRetain = retain; _willPayload = payload; _willPayloadLength = length; return *this; } AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) { _useIp = true; _useIpv6 = false; _ip = ip; _port = port; return *this; } AsyncMqttClient& AsyncMqttClient::setServer(IPv6Address ipv6, uint16_t port) { _useIpv6 = true; _useIp = false; _ipv6 = ipv6; _port = port; return *this; } AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) { _port = port; _useIp = false; _useIpv6 = false; _host = host; if (_ipv6.fromString(host)) { _useIpv6 = true; _useIp = false; } else if (_ip.fromString(host)) { _useIpv6 = false; _useIp = true; } return *this; } #if ASYNC_TCP_SSL_ENABLED AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) { _secure = secure; return *this; } AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) { std::array newFingerprint; memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE); _secureServerFingerprints.push_back(newFingerprint); return *this; } #endif AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) { _onConnectUserCallbacks.push_back(callback); return *this; } AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) { _onDisconnectUserCallbacks.push_back(callback); return *this; } AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) { _onSubscribeUserCallbacks.push_back(callback); return *this; } AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) { _onUnsubscribeUserCallbacks.push_back(callback); return *this; } AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) { _onMessageUserCallbacks.push_back(callback); return *this; } AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) { _onPublishUserCallbacks.push_back(callback); return *this; } void AsyncMqttClient::_freeCurrentParsedPacket() { delete _currentParsedPacket; _currentParsedPacket = nullptr; } void AsyncMqttClient::_clear() { _lastPingRequestTime = 0; _freeCurrentParsedPacket(); _clearQueue(true); // keep session data for now _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; _client.setRxTimeout(0); } /* TCP */ void AsyncMqttClient::_onConnect() { log_i("TCP conn, MQTT CONNECT"); #if ASYNC_TCP_SSL_ENABLED if (_secure && _secureServerFingerprints.size() > 0) { SSL* clientSsl = _client.getSSL(); bool sslFoundFingerprint = false; for (std::array fingerprint : _secureServerFingerprints) { if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) { sslFoundFingerprint = true; break; } } if (!sslFoundFingerprint) { _disconnectReason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT; _client.close(true); return; } } #endif AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession, _username, _password, _willTopic, _willRetain, _willQos, _willPayload, _willPayloadLength, _keepAlive, _clientId); _addFront(msg); _handleQueue(); } void AsyncMqttClient::_onDisconnect() { log_i("TCP disconn"); _state = DISCONNECTED; _clear(); for (auto callback : _onDisconnectUserCallbacks) callback(_disconnectReason); } /* void AsyncMqttClient::_onError(int8_t error) { (void)error; // _onDisconnect called anyway } void AsyncMqttClient::_onTimeout() { // disconnection will be handled by ping/pong management } */ void AsyncMqttClient::_onAck(size_t len) { log_i("ack %u", len); _handleQueue(); } void AsyncMqttClient::_onData(char* data, size_t len) { log_i("data rcv (%u)", len); size_t currentBytePosition = 0; char currentByte; _lastServerActivity = millis(); do { switch (_parsingInformation.bufferState) { case AsyncMqttClientInternals::BufferState::NONE: currentByte = data[currentBytePosition++]; _parsingInformation.packetType = currentByte >> 4; _parsingInformation.packetFlags = (currentByte << 4) >> 4; _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH; switch (_parsingInformation.packetType) { case AsyncMqttClientInternals::PacketType.CONNACK: log_i("rcv CONNACK"); _currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2)); _client.setRxTimeout(0); break; case AsyncMqttClientInternals::PacketType.PINGRESP: log_i("rcv PINGRESP"); _currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this)); break; case AsyncMqttClientInternals::PacketType.SUBACK: log_i("rcv SUBACK"); _currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2)); break; case AsyncMqttClientInternals::PacketType.UNSUBACK: log_i("rcv UNSUBACK"); _currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1)); break; case AsyncMqttClientInternals::PacketType.PUBLISH: log_i("rcv PUBLISH"); _currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2)); break; case AsyncMqttClientInternals::PacketType.PUBREL: log_i("rcv PUBREL"); _currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1)); break; case AsyncMqttClientInternals::PacketType.PUBACK: log_i("rcv PUBACK"); _currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1)); break; case AsyncMqttClientInternals::PacketType.PUBREC: log_i("rcv PUBREC"); _currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1)); break; case AsyncMqttClientInternals::PacketType.PUBCOMP: log_i("rcv PUBCOMP"); _currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1)); break; default: log_i("rcv PROTOCOL VIOLATION"); disconnect(true); break; } break; case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH: currentByte = data[currentBytePosition++]; _remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte; if (currentByte >> 7 == 0) { _parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer); _remainingLengthBufferPosition = 0; if (_parsingInformation.remainingLength > 0) { _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER; } else { // PINGRESP is a special case where it has no variable header, so the packet ends right here _parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE; _onPingResp(); } } break; case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER: _currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition); break; case AsyncMqttClientInternals::BufferState::PAYLOAD: _currentParsedPacket->parsePayload(data, len, ¤tBytePosition); break; default: currentBytePosition = len; } } while (currentBytePosition != len); } void AsyncMqttClient::_onPoll() { // if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) { log_w("PING t/o, disconnecting"); disconnect(true); return; } // send ping to ensure the server will receive at least one message inside keepalive window if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) { _sendPing(); // send ping to verify if the server is still there (ensure this is not a half connection) } else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) { _sendPing(); } _handleQueue(); } /* QUEUE */ void AsyncMqttClient::_insert(AsyncMqttClientInternals::OutPacket* packet) { // We only use this for QoS2 PUBREL so there must be a PUBLISH packet present. // The queue therefore cannot be empty and _head points to this PUBLISH packet. SEMAPHORE_TAKE(); log_i("new insert #%u", packet->packetType()); packet->next = _head->next; _head->next = packet; if (_head == _tail) { // PUB packet is the only one in the queue _tail = packet; } SEMAPHORE_GIVE(); _handleQueue(); } void AsyncMqttClient::_addFront(AsyncMqttClientInternals::OutPacket* packet) { // This is only used for the CONNECT packet, to be able to establish a connection // before anything else. The queue can be empty or has packets from the continued session. // In both cases, _head should always point to the CONNECT packet afterwards. SEMAPHORE_TAKE(); log_i("new front #%u", packet->packetType()); if (_head == nullptr) { _tail = packet; } else { packet->next = _head; } _head = packet; SEMAPHORE_GIVE(); _handleQueue(); } void AsyncMqttClient::_addBack(AsyncMqttClientInternals::OutPacket* packet) { SEMAPHORE_TAKE(); log_i("new back #%u", packet->packetType()); if (!_tail) { _head = packet; } else { _tail->next = packet; } _tail = packet; _tail->next = nullptr; SEMAPHORE_GIVE(); _handleQueue(); } void AsyncMqttClient::_handleQueue() { SEMAPHORE_TAKE(); // On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock bool disconnect = false; while (_head && _client.space() > 10) { // safe but arbitrary value, send at least 10 bytes // 1. try to send if (_head->size() > _sent) { // On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length. // So we calculate the amount to be written ourselves. size_t willSend = std::min(_head->size() - _sent, _client.space()); size_t realSent = _client.add(reinterpret_cast(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY); // flag is set by LWIP anyway, added for clarity _sent += willSend; (void)realSent; _client.send(); _lastClientActivity = millis(); // _lastPingRequestTime = 0; // https://github.com/marvinroger/async-mqtt-client/issues/281#issuecomment-1112897839 #if ASYNC_TCP_SSL_ENABLED log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size()); #else log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size()); #endif if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) { disconnect = true; } } // 2. stop processing when we have to wait for an MQTT acknowledgment if (_head->size() == _sent) { if (_head->released()) { log_i("p #%d rel", _head->packetType()); AsyncMqttClientInternals::OutPacket* tmp = _head; _head = _head->next; if (!_head) _tail = nullptr; delete tmp; _sent = 0; } else { break; // sending is complete however send next only after mqtt confirmation } } } SEMAPHORE_GIVE(); if (disconnect) { log_i("snd DISCONN, disconnecting"); _client.close(); } } void AsyncMqttClient::_clearQueue(bool keepSessionData) { SEMAPHORE_TAKE(); AsyncMqttClientInternals::OutPacket* packet = _head; _head = nullptr; _tail = nullptr; while (packet) { /* MQTT spec 3.1.2.4 Clean Session: * - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged. * - QoS 2 messages which have been received from the Server, but have not been completely acknowledged. * + (unsent PUB messages with QoS > 0) * * To be kept: * - possibly first message (sent to server but not acked) * - PUBREC messages (QoS 2 PUB received but not acked) * - PUBCOMP messages (QoS 2 PUBREL received but not acked) */ if (keepSessionData) { if (packet->qos() > 0 && packet->size() <= _sent) { // check for qos includes check for PUB-packet type reinterpret_cast(packet)->setDup(); AsyncMqttClientInternals::OutPacket* next = packet->next; log_i("keep #%u", packet->packetType()); SEMAPHORE_GIVE(); _addBack(packet); SEMAPHORE_TAKE(); packet = next; } else if (packet->qos() > 0 || packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC || packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) { AsyncMqttClientInternals::OutPacket* next = packet->next; log_i("keep #%u", packet->packetType()); SEMAPHORE_GIVE(); _addBack(packet); SEMAPHORE_TAKE(); packet = next; } else { AsyncMqttClientInternals::OutPacket* next = packet->next; delete packet; packet = next; } /* Delete everything when not keeping session data */ } else { AsyncMqttClientInternals::OutPacket* next = packet->next; delete packet; packet = next; } } _sent = 0; SEMAPHORE_GIVE(); } /* MQTT */ void AsyncMqttClient::_onPingResp() { log_i("PINGRESP"); _freeCurrentParsedPacket(); _lastPingRequestTime = 0; } void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) { log_i("CONNACK"); _freeCurrentParsedPacket(); if (!sessionPresent) { _pendingPubRels.clear(); _pendingPubRels.shrink_to_fit(); _clearQueue(false); // remove session data } if (connectReturnCode == 0) { _state = CONNECTED; for (auto callback : _onConnectUserCallbacks) callback(sessionPresent); } else { // Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib _disconnectReason = static_cast(connectReturnCode); return; } _handleQueue(); // send any remaining data from continued session } void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) { log_i("SUBACK"); _freeCurrentParsedPacket(); SEMAPHORE_TAKE(); if (_head && _head->packetId() == packetId) { _head->release(); log_i("SUB released"); } SEMAPHORE_GIVE(); for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status); _handleQueue(); // subscribe confirmed, ready to send next queued item } void AsyncMqttClient::_onUnsubAck(uint16_t packetId) { log_i("UNSUBACK"); _freeCurrentParsedPacket(); SEMAPHORE_TAKE(); if (_head && _head->packetId() == packetId) { _head->release(); log_i("UNSUB released"); } SEMAPHORE_GIVE(); for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId); _handleQueue(); // unsubscribe confirmed, ready to send next queued item } void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) { bool notifyPublish = true; if (qos == 2) { for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { if (pendingPubRel.packetId == packetId) { notifyPublish = false; break; } } } if (notifyPublish) { AsyncMqttClientMessageProperties properties; properties.qos = qos; properties.dup = dup; properties.retain = retain; for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total); } } void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) { AsyncMqttClientInternals::PendingAck pendingAck; if (qos == 1) { pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK; pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED; pendingAck.packetId = packetId; AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); _addBack(msg); } else if (qos == 2) { pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC; pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED; pendingAck.packetId = packetId; AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); _addBack(msg); bool pubRelAwaiting = false; for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) { if (pendingPubRel.packetId == packetId) { pubRelAwaiting = true; break; } } if (!pubRelAwaiting) { AsyncMqttClientInternals::PendingPubRel pendingPubRel; pendingPubRel.packetId = packetId; _pendingPubRels.push_back(pendingPubRel); } } _freeCurrentParsedPacket(); } void AsyncMqttClient::_onPubRel(uint16_t packetId) { _freeCurrentParsedPacket(); AsyncMqttClientInternals::PendingAck pendingAck; pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP; pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED; pendingAck.packetId = packetId; if (_head && _head->packetId() == packetId) { AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); _head->release(); _insert(msg); log_i("PUBREC released"); } for (size_t i = 0; i < _pendingPubRels.size(); i++) { if (_pendingPubRels[i].packetId == packetId) { _pendingPubRels.erase(_pendingPubRels.begin() + i); _pendingPubRels.shrink_to_fit(); } } } void AsyncMqttClient::_onPubAck(uint16_t packetId) { _freeCurrentParsedPacket(); if (_head && _head->packetId() == packetId) { _head->release(); log_i("PUB released"); } for (auto callback : _onPublishUserCallbacks) callback(packetId); } void AsyncMqttClient::_onPubRec(uint16_t packetId) { _freeCurrentParsedPacket(); // We will only be sending 1 QoS>0 PUB message at a time (to honor message // ordering). So no need to store ACKS in a separate container as it will // be stored in the outgoing queue until a PUBCOMP comes in. AsyncMqttClientInternals::PendingAck pendingAck; pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL; pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED; pendingAck.packetId = packetId; log_i("snd PUBREL"); AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck); if (_head && _head->packetId() == packetId) { _head->release(); log_i("PUB released"); } _insert(msg); } void AsyncMqttClient::_onPubComp(uint16_t packetId) { _freeCurrentParsedPacket(); // _head points to the PUBREL package if (_head && _head->packetId() == packetId) { _head->release(); log_i("PUBREL released"); } for (auto callback : _onPublishUserCallbacks) callback(packetId); } void AsyncMqttClient::_sendPing() { log_i("PING"); _lastPingRequestTime = millis(); AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket; _addBack(msg); } bool AsyncMqttClient::connected() const { return _state == CONNECTED; } void AsyncMqttClient::connect() { if (_state != DISCONNECTED) return; log_i("CONNECTING"); _state = CONNECTING; _disconnectReason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; // reset any previous _client.setRxTimeout(_keepAlive); #if ASYNC_TCP_SSL_ENABLED if (_useIp) { _client.connect(_ip, _port, _secure); } else { _client.connect(_host, _port, _secure); } #else if (_useIp) { _client.connect(_ip, _port); } else if (_useIpv6) { _client.connect(_ipv6, _port); } else { _client.connect(_host, _port); } #endif } void AsyncMqttClient::disconnect(bool force) { if (_state == DISCONNECTED) return; log_i("DISCONNECT (f:%d)", force); if (force) { _state = DISCONNECTED; _client.close(true); } else if (_state != DISCONNECTING) { _state = DISCONNECTING; AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket; _addBack(msg); } } uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) { if (_state != CONNECTED) return 0; log_i("SUBSCRIBE"); AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos); _addBack(msg); return msg->packetId(); } uint16_t AsyncMqttClient::unsubscribe(const char* topic) { if (_state != CONNECTED) return 0; log_i("UNSUBSCRIBE"); AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic); _addBack(msg); return msg->packetId(); } uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) { if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0; log_i("PUBLISH"); AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length); _addBack(msg); return msg->packetId(); } bool AsyncMqttClient::clearQueue() { if (_state != DISCONNECTED) return false; _clearQueue(false); return true; } const char* AsyncMqttClient::getClientId() const { return _clientId; }