Files
EMS-ESP32/lib/async-mqtt-client/src/AsyncMqttClient.cpp
2021-07-08 10:17:50 +02:00

771 lines
26 KiB
C++

#include "AsyncMqttClient.hpp"
AsyncMqttClient::AsyncMqttClient()
: _client()
, _head(nullptr)
, _tail(nullptr)
, _sent(0)
, _state(DISCONNECTED)
, _tlsBadFingerprint(false)
, _lastClientActivity(0)
, _lastServerActivity(0)
, _lastPingRequestTime(0)
, _generatedClientId{0}
, _ip()
, _ipv6()
, _host(nullptr)
, _useIp(false)
, _useIpv6(false)
#if ASYNC_TCP_SSL_ENABLED
, _secure(false)
#endif
, _port(0)
, _keepAlive(15)
, _cleanSession(true)
, _clientId(nullptr)
, _username(nullptr)
, _password(nullptr)
, _willTopic(nullptr)
, _willPayload(nullptr)
, _willPayloadLength(0)
, _willQos(0)
, _willRetain(false)
#if ASYNC_TCP_SSL_ENABLED
, _secureServerFingerprints()
#endif
, _onConnectUserCallbacks()
, _onDisconnectUserCallbacks()
, _onSubscribeUserCallbacks()
, _onUnsubscribeUserCallbacks()
, _onMessageUserCallbacks()
, _onPublishUserCallbacks()
, _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE }
, _currentParsedPacket(nullptr)
, _remainingLengthBufferPosition(0)
, _remainingLengthBuffer{0}
, _pendingPubRels() {
_client.onConnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onConnect(); }, this);
_client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onDisconnect(); }, this);
// _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast<AsyncMqttClient*>(obj))->_onError(error); }, this);
// _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onTimeout(); }, this);
_client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onAck(len); }, this);
_client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast<AsyncMqttClient*>(obj))->_onData(static_cast<char*>(data), len); }, this);
_client.onPoll([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onPoll(); }, this);
_client.setNoDelay(true); // send small packets immediately (PINGREQ/DISCONN are only 2 bytes)
#ifdef ESP32
sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac());
_xSemaphore = xSemaphoreCreateMutex();
#elif defined(ESP8266)
sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId());
#endif
_clientId = _generatedClientId;
setMaxTopicLength(128);
}
AsyncMqttClient::~AsyncMqttClient() {
delete _currentParsedPacket;
delete[] _parsingInformation.topicBuffer;
_clear();
_pendingPubRels.clear();
_pendingPubRels.shrink_to_fit();
_clearQueue(false); // _clear() doesn't clear session data
#ifdef ESP32
vSemaphoreDelete(_xSemaphore);
#endif
}
AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) {
_keepAlive = keepAlive;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) {
_clientId = clientId;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) {
_cleanSession = cleanSession;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) {
_parsingInformation.maxTopicLength = maxTopicLength;
delete[] _parsingInformation.topicBuffer;
_parsingInformation.topicBuffer = new char[maxTopicLength + 1];
return *this;
}
AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) {
_username = username;
_password = password;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) {
_willTopic = topic;
_willQos = qos;
_willRetain = retain;
_willPayload = payload;
_willPayloadLength = length;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) {
_useIp = true;
_useIpv6 = false;
_ip = ip;
_port = port;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setServer(IPv6Address ipv6, uint16_t port) {
_useIpv6 = true;
_useIp = false;
_ipv6 = ipv6;
_port = port;
return *this;
}
AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) {
_port = port;
_useIp = false;
_useIpv6 = false;
_host = host;
if (_ipv6.fromString(host)) {
_useIpv6 = true;
_useIp = false;
} else if (_ip.fromString(host)) {
_useIpv6 = false;
_useIp = true;
}
return *this;
}
#if ASYNC_TCP_SSL_ENABLED
AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) {
_secure = secure;
return *this;
}
AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) {
std::array<uint8_t, SHA1_SIZE> newFingerprint;
memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE);
_secureServerFingerprints.push_back(newFingerprint);
return *this;
}
#endif
AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) {
_onConnectUserCallbacks.push_back(callback);
return *this;
}
AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) {
_onDisconnectUserCallbacks.push_back(callback);
return *this;
}
AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) {
_onSubscribeUserCallbacks.push_back(callback);
return *this;
}
AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) {
_onUnsubscribeUserCallbacks.push_back(callback);
return *this;
}
AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) {
_onMessageUserCallbacks.push_back(callback);
return *this;
}
AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) {
_onPublishUserCallbacks.push_back(callback);
return *this;
}
void AsyncMqttClient::_freeCurrentParsedPacket() {
delete _currentParsedPacket;
_currentParsedPacket = nullptr;
}
void AsyncMqttClient::_clear() {
_lastPingRequestTime = 0;
_tlsBadFingerprint = false;
_freeCurrentParsedPacket();
_clearQueue(true); // keep session data for now
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
}
/* TCP */
void AsyncMqttClient::_onConnect() {
log_i("TCP conn, MQTT CONNECT");
#if ASYNC_TCP_SSL_ENABLED
if (_secure && _secureServerFingerprints.size() > 0) {
SSL* clientSsl = _client.getSSL();
bool sslFoundFingerprint = false;
for (std::array<uint8_t, SHA1_SIZE> fingerprint : _secureServerFingerprints) {
if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) {
sslFoundFingerprint = true;
break;
}
}
if (!sslFoundFingerprint) {
_tlsBadFingerprint = true;
_client.close(true);
return;
}
}
#endif
AsyncMqttClientInternals::OutPacket* msg =
new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession,
_username,
_password,
_willTopic,
_willRetain,
_willQos,
_willPayload,
_willPayloadLength,
_keepAlive,
_clientId);
_addFront(msg);
_handleQueue();
}
void AsyncMqttClient::_onDisconnect() {
log_i("TCP disconn");
_state = DISCONNECTED;
AsyncMqttClientDisconnectReason reason;
if (_tlsBadFingerprint) {
reason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT;
} else {
reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED;
}
_clear();
for (auto callback : _onDisconnectUserCallbacks) callback(reason);
}
/*
void AsyncMqttClient::_onError(int8_t error) {
(void)error;
// _onDisconnect called anyway
}
void AsyncMqttClient::_onTimeout() {
// disconnection will be handled by ping/pong management
}
*/
void AsyncMqttClient::_onAck(size_t len) {
log_i("ack %u", len);
_handleQueue();
}
void AsyncMqttClient::_onData(char* data, size_t len) {
log_i("data rcv (%u)", len);
size_t currentBytePosition = 0;
char currentByte;
_lastServerActivity = millis();
do {
switch (_parsingInformation.bufferState) {
case AsyncMqttClientInternals::BufferState::NONE:
currentByte = data[currentBytePosition++];
_parsingInformation.packetType = currentByte >> 4;
_parsingInformation.packetFlags = (currentByte << 4) >> 4;
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH;
switch (_parsingInformation.packetType) {
case AsyncMqttClientInternals::PacketType.CONNACK:
log_i("rcv CONNACK");
_currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2));
break;
case AsyncMqttClientInternals::PacketType.PINGRESP:
log_i("rcv PINGRESP");
_currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this));
break;
case AsyncMqttClientInternals::PacketType.SUBACK:
log_i("rcv SUBACK");
_currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2));
break;
case AsyncMqttClientInternals::PacketType.UNSUBACK:
log_i("rcv UNSUBACK");
_currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1));
break;
case AsyncMqttClientInternals::PacketType.PUBLISH:
log_i("rcv PUBLISH");
_currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2));
break;
case AsyncMqttClientInternals::PacketType.PUBREL:
log_i("rcv PUBREL");
_currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1));
break;
case AsyncMqttClientInternals::PacketType.PUBACK:
log_i("rcv PUBACK");
_currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1));
break;
case AsyncMqttClientInternals::PacketType.PUBREC:
log_i("rcv PUBREC");
_currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1));
break;
case AsyncMqttClientInternals::PacketType.PUBCOMP:
log_i("rcv PUBCOMP");
_currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1));
break;
default:
log_i("rcv PROTOCOL VIOLATION");
disconnect(true);
break;
}
break;
case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH:
currentByte = data[currentBytePosition++];
_remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte;
if (currentByte >> 7 == 0) {
_parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer);
_remainingLengthBufferPosition = 0;
if (_parsingInformation.remainingLength > 0) {
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER;
} else {
// PINGRESP is a special case where it has no variable header, so the packet ends right here
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
_onPingResp();
}
}
break;
case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER:
_currentParsedPacket->parseVariableHeader(data, len, &currentBytePosition);
break;
case AsyncMqttClientInternals::BufferState::PAYLOAD:
_currentParsedPacket->parsePayload(data, len, &currentBytePosition);
break;
default:
currentBytePosition = len;
}
} while (currentBytePosition != len);
}
void AsyncMqttClient::_onPoll() {
// if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections
if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) {
log_w("PING t/o, disconnecting");
disconnect(true);
return;
}
// send ping to ensure the server will receive at least one message inside keepalive window
if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) {
_sendPing();
// send ping to verify if the server is still there (ensure this is not a half connection)
} else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) {
_sendPing();
}
_handleQueue();
}
/* QUEUE */
void AsyncMqttClient::_insert(AsyncMqttClientInternals::OutPacket* packet) {
// We only use this for QoS2 PUBREL so there must be a PUBLISH packet present.
// The queue therefore cannot be empty and _head points to this PUBLISH packet.
SEMAPHORE_TAKE();
log_i("new insert #%u", packet->packetType());
packet->next = _head->next;
_head->next = packet;
if (_head == _tail) { // PUB packet is the only one in the queue
_tail = packet;
}
SEMAPHORE_GIVE();
_handleQueue();
}
void AsyncMqttClient::_addFront(AsyncMqttClientInternals::OutPacket* packet) {
// This is only used for the CONNECT packet, to be able to establish a connection
// before anything else. The queue can be empty or has packets from the continued session.
// In both cases, _head should always point to the CONNECT packet afterwards.
SEMAPHORE_TAKE();
log_i("new front #%u", packet->packetType());
if (_head == nullptr) {
_tail = packet;
} else {
packet->next = _head;
}
_head = packet;
SEMAPHORE_GIVE();
_handleQueue();
}
void AsyncMqttClient::_addBack(AsyncMqttClientInternals::OutPacket* packet) {
SEMAPHORE_TAKE();
log_i("new back #%u", packet->packetType());
if (!_tail) {
_head = packet;
} else {
_tail->next = packet;
}
_tail = packet;
_tail->next = nullptr;
SEMAPHORE_GIVE();
_handleQueue();
}
void AsyncMqttClient::_handleQueue() {
SEMAPHORE_TAKE();
// On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock
bool disconnect = false;
while (_head && _client.space() > 10) { // safe but arbitrary value, send at least 10 bytes
// 1. try to send
if (_head->size() > _sent) {
// On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length.
// So we calculate the amount to be written ourselves.
size_t willSend = std::min(_head->size() - _sent, _client.space());
size_t realSent = _client.add(reinterpret_cast<const char*>(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY); // flag is set by LWIP anyway, added for clarity
_sent += willSend;
(void)realSent;
_client.send();
_lastClientActivity = millis();
_lastPingRequestTime = 0;
#if ASYNC_TCP_SSL_ENABLED
log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size());
#else
log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size());
#endif
if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) {
disconnect = true;
}
}
// 2. stop processing when we have to wait for an MQTT acknowledgment
if (_head->size() == _sent) {
if (_head->released()) {
log_i("p #%d rel", _head->packetType());
AsyncMqttClientInternals::OutPacket* tmp = _head;
_head = _head->next;
if (!_head) _tail = nullptr;
delete tmp;
_sent = 0;
} else {
break; // sending is complete however send next only after mqtt confirmation
}
}
}
SEMAPHORE_GIVE();
if (disconnect) {
log_i("snd DISCONN, disconnecting");
_client.close();
}
}
void AsyncMqttClient::_clearQueue(bool keepSessionData) {
SEMAPHORE_TAKE();
AsyncMqttClientInternals::OutPacket* packet = _head;
_head = nullptr;
_tail = nullptr;
while (packet) {
/* MQTT spec 3.1.2.4 Clean Session:
* - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
* - QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
* + (unsent PUB messages with QoS > 0)
*
* To be kept:
* - possibly first message (sent to server but not acked)
* - PUBREC messages (QoS 2 PUB received but not acked)
* - PUBCOMP messages (QoS 2 PUBREL received but not acked)
*/
if (keepSessionData) {
if (packet->qos() > 0 && packet->size() <= _sent) { // check for qos includes check for PUB-packet type
reinterpret_cast<AsyncMqttClientInternals::PublishOutPacket*>(packet)->setDup();
AsyncMqttClientInternals::OutPacket* next = packet->next;
log_i("keep #%u", packet->packetType());
SEMAPHORE_GIVE();
_addBack(packet);
SEMAPHORE_TAKE();
packet = next;
} else if (packet->qos() > 0 ||
packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC ||
packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) {
AsyncMqttClientInternals::OutPacket* next = packet->next;
log_i("keep #%u", packet->packetType());
SEMAPHORE_GIVE();
_addBack(packet);
SEMAPHORE_TAKE();
packet = next;
} else {
AsyncMqttClientInternals::OutPacket* next = packet->next;
delete packet;
packet = next;
}
/* Delete everything when not keeping session data
*/
} else {
AsyncMqttClientInternals::OutPacket* next = packet->next;
delete packet;
packet = next;
}
}
_sent = 0;
SEMAPHORE_GIVE();
}
/* MQTT */
void AsyncMqttClient::_onPingResp() {
log_i("PINGRESP");
_freeCurrentParsedPacket();
_lastPingRequestTime = 0;
}
void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) {
log_i("CONNACK");
_freeCurrentParsedPacket();
if (!sessionPresent) {
_pendingPubRels.clear();
_pendingPubRels.shrink_to_fit();
_clearQueue(false); // remove session data
}
if (connectReturnCode == 0) {
_state = CONNECTED;
for (auto callback : _onConnectUserCallbacks) callback(sessionPresent);
} else {
// Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib
}
_handleQueue(); // send any remaining data from continued session
}
void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) {
log_i("SUBACK");
_freeCurrentParsedPacket();
SEMAPHORE_TAKE();
if (_head && _head->packetId() == packetId) {
_head->release();
log_i("SUB released");
}
SEMAPHORE_GIVE();
for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status);
_handleQueue(); // subscribe confirmed, ready to send next queued item
}
void AsyncMqttClient::_onUnsubAck(uint16_t packetId) {
log_i("UNSUBACK");
_freeCurrentParsedPacket();
SEMAPHORE_TAKE();
if (_head && _head->packetId() == packetId) {
_head->release();
log_i("UNSUB released");
}
SEMAPHORE_GIVE();
for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId);
_handleQueue(); // unsubscribe confirmed, ready to send next queued item
}
void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) {
bool notifyPublish = true;
if (qos == 2) {
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
if (pendingPubRel.packetId == packetId) {
notifyPublish = false;
break;
}
}
}
if (notifyPublish) {
AsyncMqttClientMessageProperties properties;
properties.qos = qos;
properties.dup = dup;
properties.retain = retain;
for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total);
}
}
void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
AsyncMqttClientInternals::PendingAck pendingAck;
if (qos == 1) {
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK;
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED;
pendingAck.packetId = packetId;
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
_addBack(msg);
} else if (qos == 2) {
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC;
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED;
pendingAck.packetId = packetId;
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
_addBack(msg);
bool pubRelAwaiting = false;
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
if (pendingPubRel.packetId == packetId) {
pubRelAwaiting = true;
break;
}
}
if (!pubRelAwaiting) {
AsyncMqttClientInternals::PendingPubRel pendingPubRel;
pendingPubRel.packetId = packetId;
_pendingPubRels.push_back(pendingPubRel);
}
}
_freeCurrentParsedPacket();
}
void AsyncMqttClient::_onPubRel(uint16_t packetId) {
_freeCurrentParsedPacket();
AsyncMqttClientInternals::PendingAck pendingAck;
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP;
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED;
pendingAck.packetId = packetId;
if (_head && _head->packetId() == packetId) {
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
_head->release();
_insert(msg);
log_i("PUBREC released");
}
for (size_t i = 0; i < _pendingPubRels.size(); i++) {
if (_pendingPubRels[i].packetId == packetId) {
_pendingPubRels.erase(_pendingPubRels.begin() + i);
_pendingPubRels.shrink_to_fit();
}
}
}
void AsyncMqttClient::_onPubAck(uint16_t packetId) {
_freeCurrentParsedPacket();
if (_head && _head->packetId() == packetId) {
_head->release();
log_i("PUB released");
}
for (auto callback : _onPublishUserCallbacks) callback(packetId);
}
void AsyncMqttClient::_onPubRec(uint16_t packetId) {
_freeCurrentParsedPacket();
// We will only be sending 1 QoS>0 PUB message at a time (to honor message
// ordering). So no need to store ACKS in a separate container as it will
// be stored in the outgoing queue until a PUBCOMP comes in.
AsyncMqttClientInternals::PendingAck pendingAck;
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL;
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED;
pendingAck.packetId = packetId;
log_i("snd PUBREL");
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
if (_head && _head->packetId() == packetId) {
_head->release();
log_i("PUB released");
}
_insert(msg);
}
void AsyncMqttClient::_onPubComp(uint16_t packetId) {
_freeCurrentParsedPacket();
// _head points to the PUBREL package
if (_head && _head->packetId() == packetId) {
_head->release();
log_i("PUBREL released");
}
for (auto callback : _onPublishUserCallbacks) callback(packetId);
}
void AsyncMqttClient::_sendPing() {
log_i("PING");
_lastPingRequestTime = millis();
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket;
_addBack(msg);
}
bool AsyncMqttClient::connected() const {
return _state == CONNECTED;
}
void AsyncMqttClient::connect() {
if (_state != DISCONNECTED) return;
log_i("CONNECTING");
_state = CONNECTING;
#if ASYNC_TCP_SSL_ENABLED
if (_useIp) {
_client.connect(_ip, _port, _secure);
} else {
_client.connect(_host, _port, _secure);
}
#else
if (_useIp) {
_client.connect(_ip, _port);
} else if (_useIpv6) {
_client.connect(_ipv6, _port);
} else {
_client.connect(_host, _port);
}
#endif
}
void AsyncMqttClient::disconnect(bool force) {
if (_state == DISCONNECTED) return;
log_i("DISCONNECT (f:%d)", force);
if (force) {
_state = DISCONNECTED;
_client.close(true);
} else if (_state != DISCONNECTING) {
_state = DISCONNECTING;
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket;
_addBack(msg);
}
}
uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
if (_state != CONNECTED) return 0;
log_i("SUBSCRIBE");
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos);
_addBack(msg);
return msg->packetId();
}
uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
if (_state != CONNECTED) return 0;
log_i("UNSUBSCRIBE");
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic);
_addBack(msg);
return msg->packetId();
}
uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) {
if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0;
log_i("PUBLISH");
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length);
_addBack(msg);
return msg->packetId();
}
const char* AsyncMqttClient::getClientId() const {
return _clientId;
}