mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-06 15:59:52 +03:00
remove unused async-mqtt-client-lib
This commit is contained in:
@@ -1,21 +0,0 @@
|
|||||||
The MIT License (MIT)
|
|
||||||
|
|
||||||
Copyright (c) 2015-2021 Marvin Roger
|
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
|
||||||
in the Software without restriction, including without limitation the rights
|
|
||||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
copies of the Software, and to permit persons to whom the Software is
|
|
||||||
furnished to do so, subject to the following conditions:
|
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall be included in all
|
|
||||||
copies or substantial portions of the Software.
|
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
SOFTWARE.
|
|
||||||
@@ -1,776 +0,0 @@
|
|||||||
#include "AsyncMqttClient.hpp"
|
|
||||||
|
|
||||||
AsyncMqttClient::AsyncMqttClient()
|
|
||||||
: _client()
|
|
||||||
, _head(nullptr)
|
|
||||||
, _tail(nullptr)
|
|
||||||
, _sent(0)
|
|
||||||
, _state(DISCONNECTED)
|
|
||||||
, _disconnectReason(AsyncMqttClientDisconnectReason::TCP_DISCONNECTED)
|
|
||||||
, _lastClientActivity(0)
|
|
||||||
, _lastServerActivity(0)
|
|
||||||
, _lastPingRequestTime(0)
|
|
||||||
, _generatedClientId{0}
|
|
||||||
, _ip()
|
|
||||||
, _ipv6()
|
|
||||||
, _host(nullptr)
|
|
||||||
, _useIp(false)
|
|
||||||
, _useIpv6(false)
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
, _secure(false)
|
|
||||||
#endif
|
|
||||||
, _port(0)
|
|
||||||
, _keepAlive(15)
|
|
||||||
, _cleanSession(true)
|
|
||||||
, _clientId(nullptr)
|
|
||||||
, _username(nullptr)
|
|
||||||
, _password(nullptr)
|
|
||||||
, _willTopic(nullptr)
|
|
||||||
, _willPayload(nullptr)
|
|
||||||
, _willPayloadLength(0)
|
|
||||||
, _willQos(0)
|
|
||||||
, _willRetain(false)
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
, _secureServerFingerprints()
|
|
||||||
#endif
|
|
||||||
, _onConnectUserCallbacks()
|
|
||||||
, _onDisconnectUserCallbacks()
|
|
||||||
, _onSubscribeUserCallbacks()
|
|
||||||
, _onUnsubscribeUserCallbacks()
|
|
||||||
, _onMessageUserCallbacks()
|
|
||||||
, _onPublishUserCallbacks()
|
|
||||||
, _parsingInformation { .bufferState = AsyncMqttClientInternals::BufferState::NONE }
|
|
||||||
, _currentParsedPacket(nullptr)
|
|
||||||
, _remainingLengthBufferPosition(0)
|
|
||||||
, _remainingLengthBuffer{0}
|
|
||||||
, _pendingPubRels() {
|
|
||||||
_client.onConnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onConnect(); }, this);
|
|
||||||
_client.onDisconnect([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onDisconnect(); }, this);
|
|
||||||
// _client.onError([](void* obj, AsyncClient* c, int8_t error) { (static_cast<AsyncMqttClient*>(obj))->_onError(error); }, this);
|
|
||||||
// _client.onTimeout([](void* obj, AsyncClient* c, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onTimeout(); }, this);
|
|
||||||
_client.onAck([](void* obj, AsyncClient* c, size_t len, uint32_t time) { (static_cast<AsyncMqttClient*>(obj))->_onAck(len); }, this);
|
|
||||||
_client.onData([](void* obj, AsyncClient* c, void* data, size_t len) { (static_cast<AsyncMqttClient*>(obj))->_onData(static_cast<char*>(data), len); }, this);
|
|
||||||
_client.onPoll([](void* obj, AsyncClient* c) { (static_cast<AsyncMqttClient*>(obj))->_onPoll(); }, this);
|
|
||||||
_client.setNoDelay(true); // send small packets immediately (PINGREQ/DISCONN are only 2 bytes)
|
|
||||||
#ifdef ESP32
|
|
||||||
sprintf(_generatedClientId, "esp32-%06llx", ESP.getEfuseMac());
|
|
||||||
_xSemaphore = xSemaphoreCreateMutex();
|
|
||||||
#elif defined(ESP8266)
|
|
||||||
sprintf(_generatedClientId, "esp8266-%06x", ESP.getChipId());
|
|
||||||
#endif
|
|
||||||
_clientId = _generatedClientId;
|
|
||||||
|
|
||||||
setMaxTopicLength(128);
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient::~AsyncMqttClient() {
|
|
||||||
delete _currentParsedPacket;
|
|
||||||
delete[] _parsingInformation.topicBuffer;
|
|
||||||
_clear();
|
|
||||||
_pendingPubRels.clear();
|
|
||||||
_pendingPubRels.shrink_to_fit();
|
|
||||||
_clearQueue(false); // _clear() doesn't clear session data
|
|
||||||
#ifdef ESP32
|
|
||||||
vSemaphoreDelete(_xSemaphore);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setKeepAlive(uint16_t keepAlive) {
|
|
||||||
_keepAlive = keepAlive;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setClientId(const char* clientId) {
|
|
||||||
_clientId = clientId;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setCleanSession(bool cleanSession) {
|
|
||||||
_cleanSession = cleanSession;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setMaxTopicLength(uint16_t maxTopicLength) {
|
|
||||||
_parsingInformation.maxTopicLength = maxTopicLength;
|
|
||||||
delete[] _parsingInformation.topicBuffer;
|
|
||||||
_parsingInformation.topicBuffer = new char[maxTopicLength + 1];
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setCredentials(const char* username, const char* password) {
|
|
||||||
_username = username;
|
|
||||||
_password = password;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setWill(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) {
|
|
||||||
_willTopic = topic;
|
|
||||||
_willQos = qos;
|
|
||||||
_willRetain = retain;
|
|
||||||
_willPayload = payload;
|
|
||||||
_willPayloadLength = length;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setServer(IPAddress ip, uint16_t port) {
|
|
||||||
_useIp = true;
|
|
||||||
_useIpv6 = false;
|
|
||||||
_ip = ip;
|
|
||||||
_port = port;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setServer(IPv6Address ipv6, uint16_t port) {
|
|
||||||
_useIpv6 = true;
|
|
||||||
_useIp = false;
|
|
||||||
_ipv6 = ipv6;
|
|
||||||
_port = port;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setServer(const char* host, uint16_t port) {
|
|
||||||
_port = port;
|
|
||||||
_useIp = false;
|
|
||||||
_useIpv6 = false;
|
|
||||||
_host = host;
|
|
||||||
if (_ipv6.fromString(host)) {
|
|
||||||
_useIpv6 = true;
|
|
||||||
_useIp = false;
|
|
||||||
} else if (_ip.fromString(host)) {
|
|
||||||
_useIpv6 = false;
|
|
||||||
_useIp = true;
|
|
||||||
}
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
AsyncMqttClient& AsyncMqttClient::setSecure(bool secure) {
|
|
||||||
_secure = secure;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::addServerFingerprint(const uint8_t* fingerprint) {
|
|
||||||
std::array<uint8_t, SHA1_SIZE> newFingerprint;
|
|
||||||
memcpy(newFingerprint.data(), fingerprint, SHA1_SIZE);
|
|
||||||
_secureServerFingerprints.push_back(newFingerprint);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback) {
|
|
||||||
_onConnectUserCallbacks.push_back(callback);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback) {
|
|
||||||
_onDisconnectUserCallbacks.push_back(callback);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback) {
|
|
||||||
_onSubscribeUserCallbacks.push_back(callback);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback) {
|
|
||||||
_onUnsubscribeUserCallbacks.push_back(callback);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback) {
|
|
||||||
_onMessageUserCallbacks.push_back(callback);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncMqttClient& AsyncMqttClient::onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback) {
|
|
||||||
_onPublishUserCallbacks.push_back(callback);
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_freeCurrentParsedPacket() {
|
|
||||||
delete _currentParsedPacket;
|
|
||||||
_currentParsedPacket = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_clear() {
|
|
||||||
_lastPingRequestTime = 0;
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
_clearQueue(true); // keep session data for now
|
|
||||||
|
|
||||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
|
|
||||||
|
|
||||||
_client.setRxTimeout(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* TCP */
|
|
||||||
void AsyncMqttClient::_onConnect() {
|
|
||||||
log_i("TCP conn, MQTT CONNECT");
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
if (_secure && _secureServerFingerprints.size() > 0) {
|
|
||||||
SSL* clientSsl = _client.getSSL();
|
|
||||||
|
|
||||||
bool sslFoundFingerprint = false;
|
|
||||||
for (std::array<uint8_t, SHA1_SIZE> fingerprint : _secureServerFingerprints) {
|
|
||||||
if (ssl_match_fingerprint(clientSsl, fingerprint.data()) == SSL_OK) {
|
|
||||||
sslFoundFingerprint = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!sslFoundFingerprint) {
|
|
||||||
_disconnectReason = AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT;
|
|
||||||
_client.close(true);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg =
|
|
||||||
new AsyncMqttClientInternals::ConnectOutPacket(_cleanSession,
|
|
||||||
_username,
|
|
||||||
_password,
|
|
||||||
_willTopic,
|
|
||||||
_willRetain,
|
|
||||||
_willQos,
|
|
||||||
_willPayload,
|
|
||||||
_willPayloadLength,
|
|
||||||
_keepAlive,
|
|
||||||
_clientId);
|
|
||||||
_addFront(msg);
|
|
||||||
_handleQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onDisconnect() {
|
|
||||||
log_i("TCP disconn");
|
|
||||||
_state = DISCONNECTED;
|
|
||||||
|
|
||||||
_clear();
|
|
||||||
|
|
||||||
for (auto callback : _onDisconnectUserCallbacks) callback(_disconnectReason);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
void AsyncMqttClient::_onError(int8_t error) {
|
|
||||||
(void)error;
|
|
||||||
// _onDisconnect called anyway
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onTimeout() {
|
|
||||||
// disconnection will be handled by ping/pong management
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onAck(size_t len) {
|
|
||||||
log_i("ack %u", len);
|
|
||||||
_handleQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onData(char* data, size_t len) {
|
|
||||||
log_i("data rcv (%u)", len);
|
|
||||||
size_t currentBytePosition = 0;
|
|
||||||
char currentByte;
|
|
||||||
_lastServerActivity = millis();
|
|
||||||
do {
|
|
||||||
switch (_parsingInformation.bufferState) {
|
|
||||||
case AsyncMqttClientInternals::BufferState::NONE:
|
|
||||||
currentByte = data[currentBytePosition++];
|
|
||||||
_parsingInformation.packetType = currentByte >> 4;
|
|
||||||
_parsingInformation.packetFlags = (currentByte << 4) >> 4;
|
|
||||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::REMAINING_LENGTH;
|
|
||||||
switch (_parsingInformation.packetType) {
|
|
||||||
case AsyncMqttClientInternals::PacketType.CONNACK:
|
|
||||||
log_i("rcv CONNACK");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::ConnAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onConnAck, this, std::placeholders::_1, std::placeholders::_2));
|
|
||||||
_client.setRxTimeout(0);
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.PINGRESP:
|
|
||||||
log_i("rcv PINGRESP");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::PingRespPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPingResp, this));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.SUBACK:
|
|
||||||
log_i("rcv SUBACK");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::SubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onSubAck, this, std::placeholders::_1, std::placeholders::_2));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.UNSUBACK:
|
|
||||||
log_i("rcv UNSUBACK");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::UnsubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onUnsubAck, this, std::placeholders::_1));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.PUBLISH:
|
|
||||||
log_i("rcv PUBLISH");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::PublishPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6, std::placeholders::_7, std::placeholders::_8, std::placeholders::_9), std::bind(&AsyncMqttClient::_onPublish, this, std::placeholders::_1, std::placeholders::_2));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.PUBREL:
|
|
||||||
log_i("rcv PUBREL");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::PubRelPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRel, this, std::placeholders::_1));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.PUBACK:
|
|
||||||
log_i("rcv PUBACK");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::PubAckPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubAck, this, std::placeholders::_1));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.PUBREC:
|
|
||||||
log_i("rcv PUBREC");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::PubRecPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubRec, this, std::placeholders::_1));
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::PacketType.PUBCOMP:
|
|
||||||
log_i("rcv PUBCOMP");
|
|
||||||
_currentParsedPacket = new AsyncMqttClientInternals::PubCompPacket(&_parsingInformation, std::bind(&AsyncMqttClient::_onPubComp, this, std::placeholders::_1));
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
log_i("rcv PROTOCOL VIOLATION");
|
|
||||||
disconnect(true);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::BufferState::REMAINING_LENGTH:
|
|
||||||
currentByte = data[currentBytePosition++];
|
|
||||||
_remainingLengthBuffer[_remainingLengthBufferPosition++] = currentByte;
|
|
||||||
if (currentByte >> 7 == 0) {
|
|
||||||
_parsingInformation.remainingLength = AsyncMqttClientInternals::Helpers::decodeRemainingLength(_remainingLengthBuffer);
|
|
||||||
_remainingLengthBufferPosition = 0;
|
|
||||||
if (_parsingInformation.remainingLength > 0) {
|
|
||||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::VARIABLE_HEADER;
|
|
||||||
} else {
|
|
||||||
// PINGRESP is a special case where it has no variable header, so the packet ends right here
|
|
||||||
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
|
|
||||||
_onPingResp();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::BufferState::VARIABLE_HEADER:
|
|
||||||
_currentParsedPacket->parseVariableHeader(data, len, ¤tBytePosition);
|
|
||||||
break;
|
|
||||||
case AsyncMqttClientInternals::BufferState::PAYLOAD:
|
|
||||||
_currentParsedPacket->parsePayload(data, len, ¤tBytePosition);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
currentBytePosition = len;
|
|
||||||
}
|
|
||||||
} while (currentBytePosition != len);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onPoll() {
|
|
||||||
// if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections
|
|
||||||
if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) {
|
|
||||||
log_w("PING t/o, disconnecting");
|
|
||||||
disconnect(true);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// send ping to ensure the server will receive at least one message inside keepalive window
|
|
||||||
if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) {
|
|
||||||
_sendPing();
|
|
||||||
// send ping to verify if the server is still there (ensure this is not a half connection)
|
|
||||||
} else if (_state == CONNECTED && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) {
|
|
||||||
_sendPing();
|
|
||||||
}
|
|
||||||
_handleQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* QUEUE */
|
|
||||||
|
|
||||||
void AsyncMqttClient::_insert(AsyncMqttClientInternals::OutPacket* packet) {
|
|
||||||
// We only use this for QoS2 PUBREL so there must be a PUBLISH packet present.
|
|
||||||
// The queue therefore cannot be empty and _head points to this PUBLISH packet.
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
log_i("new insert #%u", packet->packetType());
|
|
||||||
packet->next = _head->next;
|
|
||||||
_head->next = packet;
|
|
||||||
if (_head == _tail) { // PUB packet is the only one in the queue
|
|
||||||
_tail = packet;
|
|
||||||
}
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
_handleQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_addFront(AsyncMqttClientInternals::OutPacket* packet) {
|
|
||||||
// This is only used for the CONNECT packet, to be able to establish a connection
|
|
||||||
// before anything else. The queue can be empty or has packets from the continued session.
|
|
||||||
// In both cases, _head should always point to the CONNECT packet afterwards.
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
log_i("new front #%u", packet->packetType());
|
|
||||||
if (_head == nullptr) {
|
|
||||||
_tail = packet;
|
|
||||||
} else {
|
|
||||||
packet->next = _head;
|
|
||||||
}
|
|
||||||
_head = packet;
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
_handleQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_addBack(AsyncMqttClientInternals::OutPacket* packet) {
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
log_i("new back #%u", packet->packetType());
|
|
||||||
if (!_tail) {
|
|
||||||
_head = packet;
|
|
||||||
} else {
|
|
||||||
_tail->next = packet;
|
|
||||||
}
|
|
||||||
_tail = packet;
|
|
||||||
_tail->next = nullptr;
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
_handleQueue();
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_handleQueue() {
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
// On ESP32, onDisconnect is called within the close()-call. So we need to make sure we don't lock
|
|
||||||
bool disconnect = false;
|
|
||||||
|
|
||||||
while (_head && _client.space() > 10) { // safe but arbitrary value, send at least 10 bytes
|
|
||||||
// 1. try to send
|
|
||||||
if (_head->size() > _sent) {
|
|
||||||
// On SSL the TCP library returns the total amount of bytes, not just the unencrypted payload length.
|
|
||||||
// So we calculate the amount to be written ourselves.
|
|
||||||
size_t willSend = std::min(_head->size() - _sent, _client.space());
|
|
||||||
size_t realSent = _client.add(reinterpret_cast<const char*>(_head->data(_sent)), willSend, ASYNC_WRITE_FLAG_COPY); // flag is set by LWIP anyway, added for clarity
|
|
||||||
_sent += willSend;
|
|
||||||
(void)realSent;
|
|
||||||
_client.send();
|
|
||||||
_lastClientActivity = millis();
|
|
||||||
// _lastPingRequestTime = 0; // https://github.com/marvinroger/async-mqtt-client/issues/281#issuecomment-1112897839
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
log_i("snd #%u: (tls: %u) %u/%u", _head->packetType(), realSent, _sent, _head->size());
|
|
||||||
#else
|
|
||||||
log_i("snd #%u: %u/%u", _head->packetType(), _sent, _head->size());
|
|
||||||
#endif
|
|
||||||
if (_head->packetType() == AsyncMqttClientInternals::PacketType.DISCONNECT) {
|
|
||||||
disconnect = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. stop processing when we have to wait for an MQTT acknowledgment
|
|
||||||
if (_head->size() == _sent) {
|
|
||||||
if (_head->released()) {
|
|
||||||
log_i("p #%d rel", _head->packetType());
|
|
||||||
AsyncMqttClientInternals::OutPacket* tmp = _head;
|
|
||||||
_head = _head->next;
|
|
||||||
if (!_head) _tail = nullptr;
|
|
||||||
delete tmp;
|
|
||||||
_sent = 0;
|
|
||||||
} else {
|
|
||||||
break; // sending is complete however send next only after mqtt confirmation
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
if (disconnect) {
|
|
||||||
log_i("snd DISCONN, disconnecting");
|
|
||||||
_client.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_clearQueue(bool keepSessionData) {
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
AsyncMqttClientInternals::OutPacket* packet = _head;
|
|
||||||
_head = nullptr;
|
|
||||||
_tail = nullptr;
|
|
||||||
|
|
||||||
while (packet) {
|
|
||||||
/* MQTT spec 3.1.2.4 Clean Session:
|
|
||||||
* - QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
|
|
||||||
* - QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
|
|
||||||
* + (unsent PUB messages with QoS > 0)
|
|
||||||
*
|
|
||||||
* To be kept:
|
|
||||||
* - possibly first message (sent to server but not acked)
|
|
||||||
* - PUBREC messages (QoS 2 PUB received but not acked)
|
|
||||||
* - PUBCOMP messages (QoS 2 PUBREL received but not acked)
|
|
||||||
*/
|
|
||||||
if (keepSessionData) {
|
|
||||||
if (packet->qos() > 0 && packet->size() <= _sent) { // check for qos includes check for PUB-packet type
|
|
||||||
reinterpret_cast<AsyncMqttClientInternals::PublishOutPacket*>(packet)->setDup();
|
|
||||||
AsyncMqttClientInternals::OutPacket* next = packet->next;
|
|
||||||
log_i("keep #%u", packet->packetType());
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
_addBack(packet);
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
packet = next;
|
|
||||||
} else if (packet->qos() > 0 ||
|
|
||||||
packet->packetType() == AsyncMqttClientInternals::PacketType.PUBREC ||
|
|
||||||
packet->packetType() == AsyncMqttClientInternals::PacketType.PUBCOMP) {
|
|
||||||
AsyncMqttClientInternals::OutPacket* next = packet->next;
|
|
||||||
log_i("keep #%u", packet->packetType());
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
_addBack(packet);
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
packet = next;
|
|
||||||
} else {
|
|
||||||
AsyncMqttClientInternals::OutPacket* next = packet->next;
|
|
||||||
delete packet;
|
|
||||||
packet = next;
|
|
||||||
}
|
|
||||||
/* Delete everything when not keeping session data
|
|
||||||
*/
|
|
||||||
} else {
|
|
||||||
AsyncMqttClientInternals::OutPacket* next = packet->next;
|
|
||||||
delete packet;
|
|
||||||
packet = next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_sent = 0;
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* MQTT */
|
|
||||||
void AsyncMqttClient::_onPingResp() {
|
|
||||||
log_i("PINGRESP");
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
_lastPingRequestTime = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onConnAck(bool sessionPresent, uint8_t connectReturnCode) {
|
|
||||||
log_i("CONNACK");
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
|
|
||||||
if (!sessionPresent) {
|
|
||||||
_pendingPubRels.clear();
|
|
||||||
_pendingPubRels.shrink_to_fit();
|
|
||||||
_clearQueue(false); // remove session data
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connectReturnCode == 0) {
|
|
||||||
_state = CONNECTED;
|
|
||||||
for (auto callback : _onConnectUserCallbacks) callback(sessionPresent);
|
|
||||||
} else {
|
|
||||||
// Callbacks are handled by the onDisconnect function which is called from the AsyncTcp lib
|
|
||||||
_disconnectReason = static_cast<AsyncMqttClientDisconnectReason>(connectReturnCode);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_handleQueue(); // send any remaining data from continued session
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onSubAck(uint16_t packetId, char status) {
|
|
||||||
log_i("SUBACK");
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
if (_head && _head->packetId() == packetId) {
|
|
||||||
_head->release();
|
|
||||||
log_i("SUB released");
|
|
||||||
}
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
|
|
||||||
for (auto callback : _onSubscribeUserCallbacks) callback(packetId, status);
|
|
||||||
|
|
||||||
_handleQueue(); // subscribe confirmed, ready to send next queued item
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onUnsubAck(uint16_t packetId) {
|
|
||||||
log_i("UNSUBACK");
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
SEMAPHORE_TAKE();
|
|
||||||
if (_head && _head->packetId() == packetId) {
|
|
||||||
_head->release();
|
|
||||||
log_i("UNSUB released");
|
|
||||||
}
|
|
||||||
SEMAPHORE_GIVE();
|
|
||||||
|
|
||||||
for (auto callback : _onUnsubscribeUserCallbacks) callback(packetId);
|
|
||||||
|
|
||||||
_handleQueue(); // unsubscribe confirmed, ready to send next queued item
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId) {
|
|
||||||
bool notifyPublish = true;
|
|
||||||
|
|
||||||
if (qos == 2) {
|
|
||||||
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
|
|
||||||
if (pendingPubRel.packetId == packetId) {
|
|
||||||
notifyPublish = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (notifyPublish) {
|
|
||||||
AsyncMqttClientMessageProperties properties;
|
|
||||||
properties.qos = qos;
|
|
||||||
properties.dup = dup;
|
|
||||||
properties.retain = retain;
|
|
||||||
|
|
||||||
for (auto callback : _onMessageUserCallbacks) callback(topic, payload, properties, len, index, total);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
|
|
||||||
AsyncMqttClientInternals::PendingAck pendingAck;
|
|
||||||
|
|
||||||
if (qos == 1) {
|
|
||||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK;
|
|
||||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED;
|
|
||||||
pendingAck.packetId = packetId;
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
|
|
||||||
_addBack(msg);
|
|
||||||
} else if (qos == 2) {
|
|
||||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC;
|
|
||||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED;
|
|
||||||
pendingAck.packetId = packetId;
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
|
|
||||||
_addBack(msg);
|
|
||||||
|
|
||||||
bool pubRelAwaiting = false;
|
|
||||||
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
|
|
||||||
if (pendingPubRel.packetId == packetId) {
|
|
||||||
pubRelAwaiting = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pubRelAwaiting) {
|
|
||||||
AsyncMqttClientInternals::PendingPubRel pendingPubRel;
|
|
||||||
pendingPubRel.packetId = packetId;
|
|
||||||
_pendingPubRels.push_back(pendingPubRel);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onPubRel(uint16_t packetId) {
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
|
|
||||||
AsyncMqttClientInternals::PendingAck pendingAck;
|
|
||||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP;
|
|
||||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED;
|
|
||||||
pendingAck.packetId = packetId;
|
|
||||||
if (_head && _head->packetId() == packetId) {
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
|
|
||||||
_head->release();
|
|
||||||
_insert(msg);
|
|
||||||
log_i("PUBREC released");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t i = 0; i < _pendingPubRels.size(); i++) {
|
|
||||||
if (_pendingPubRels[i].packetId == packetId) {
|
|
||||||
_pendingPubRels.erase(_pendingPubRels.begin() + i);
|
|
||||||
_pendingPubRels.shrink_to_fit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onPubAck(uint16_t packetId) {
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
if (_head && _head->packetId() == packetId) {
|
|
||||||
_head->release();
|
|
||||||
log_i("PUB released");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto callback : _onPublishUserCallbacks) callback(packetId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onPubRec(uint16_t packetId) {
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
|
|
||||||
// We will only be sending 1 QoS>0 PUB message at a time (to honor message
|
|
||||||
// ordering). So no need to store ACKS in a separate container as it will
|
|
||||||
// be stored in the outgoing queue until a PUBCOMP comes in.
|
|
||||||
AsyncMqttClientInternals::PendingAck pendingAck;
|
|
||||||
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL;
|
|
||||||
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED;
|
|
||||||
pendingAck.packetId = packetId;
|
|
||||||
log_i("snd PUBREL");
|
|
||||||
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PubAckOutPacket(pendingAck);
|
|
||||||
if (_head && _head->packetId() == packetId) {
|
|
||||||
_head->release();
|
|
||||||
log_i("PUB released");
|
|
||||||
}
|
|
||||||
_insert(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_onPubComp(uint16_t packetId) {
|
|
||||||
_freeCurrentParsedPacket();
|
|
||||||
|
|
||||||
// _head points to the PUBREL package
|
|
||||||
if (_head && _head->packetId() == packetId) {
|
|
||||||
_head->release();
|
|
||||||
log_i("PUBREL released");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto callback : _onPublishUserCallbacks) callback(packetId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::_sendPing() {
|
|
||||||
log_i("PING");
|
|
||||||
_lastPingRequestTime = millis();
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PingReqOutPacket;
|
|
||||||
_addBack(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool AsyncMqttClient::connected() const {
|
|
||||||
return _state == CONNECTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::connect() {
|
|
||||||
if (_state != DISCONNECTED) return;
|
|
||||||
log_i("CONNECTING");
|
|
||||||
_state = CONNECTING;
|
|
||||||
_disconnectReason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED; // reset any previous
|
|
||||||
|
|
||||||
_client.setRxTimeout(_keepAlive);
|
|
||||||
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
if (_useIp) {
|
|
||||||
_client.connect(_ip, _port, _secure);
|
|
||||||
} else {
|
|
||||||
_client.connect(_host, _port, _secure);
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
if (_useIp) {
|
|
||||||
_client.connect(_ip, _port);
|
|
||||||
} else if (_useIpv6) {
|
|
||||||
_client.connect(_ipv6, _port);
|
|
||||||
} else {
|
|
||||||
_client.connect(_host, _port);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncMqttClient::disconnect(bool force) {
|
|
||||||
if (_state == DISCONNECTED) return;
|
|
||||||
log_i("DISCONNECT (f:%d)", force);
|
|
||||||
if (force) {
|
|
||||||
_state = DISCONNECTED;
|
|
||||||
_client.close(true);
|
|
||||||
} else if (_state != DISCONNECTING) {
|
|
||||||
_state = DISCONNECTING;
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::DisconnOutPacket;
|
|
||||||
_addBack(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {
|
|
||||||
if (_state != CONNECTED) return 0;
|
|
||||||
log_i("SUBSCRIBE");
|
|
||||||
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::SubscribeOutPacket(topic, qos);
|
|
||||||
_addBack(msg);
|
|
||||||
return msg->packetId();
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t AsyncMqttClient::unsubscribe(const char* topic) {
|
|
||||||
if (_state != CONNECTED) return 0;
|
|
||||||
log_i("UNSUBSCRIBE");
|
|
||||||
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::UnsubscribeOutPacket(topic);
|
|
||||||
_addBack(msg);
|
|
||||||
return msg->packetId();
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t AsyncMqttClient::publish(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length, bool dup, uint16_t message_id) {
|
|
||||||
if (_state != CONNECTED || GET_FREE_MEMORY() < MQTT_MIN_FREE_MEMORY) return 0;
|
|
||||||
log_i("PUBLISH");
|
|
||||||
|
|
||||||
AsyncMqttClientInternals::OutPacket* msg = new AsyncMqttClientInternals::PublishOutPacket(topic, qos, retain, payload, length);
|
|
||||||
_addBack(msg);
|
|
||||||
return msg->packetId();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool AsyncMqttClient::clearQueue() {
|
|
||||||
if (_state != DISCONNECTED) return false;
|
|
||||||
_clearQueue(false);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* AsyncMqttClient::getClientId() const {
|
|
||||||
return _clientId;
|
|
||||||
}
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
#ifndef SRC_ASYNCMQTTCLIENT_H_
|
|
||||||
#define SRC_ASYNCMQTTCLIENT_H_
|
|
||||||
|
|
||||||
#include "AsyncMqttClient.hpp"
|
|
||||||
|
|
||||||
#endif // SRC_ASYNCMQTTCLIENT_H_
|
|
||||||
@@ -1,182 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <functional>
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
|
|
||||||
#ifndef MQTT_MIN_FREE_MEMORY
|
|
||||||
#define MQTT_MIN_FREE_MEMORY 4096
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef ESP32
|
|
||||||
#include <AsyncTCP.h>
|
|
||||||
#include <freertos/semphr.h>
|
|
||||||
#elif defined(ESP8266)
|
|
||||||
#include <ESPAsyncTCP.h>
|
|
||||||
#else
|
|
||||||
#error Platform not supported
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
#include <tcp_axtls.h>
|
|
||||||
#define SHA1_SIZE 20
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include "AsyncMqttClient/Flags.hpp"
|
|
||||||
#include "AsyncMqttClient/ParsingInformation.hpp"
|
|
||||||
#include "AsyncMqttClient/MessageProperties.hpp"
|
|
||||||
#include "AsyncMqttClient/Helpers.hpp"
|
|
||||||
#include "AsyncMqttClient/Callbacks.hpp"
|
|
||||||
#include "AsyncMqttClient/DisconnectReasons.hpp"
|
|
||||||
#include "AsyncMqttClient/Storage.hpp"
|
|
||||||
|
|
||||||
#include "AsyncMqttClient/Packets/Packet.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/ConnAckPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/PingRespPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/SubAckPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/UnsubAckPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/PublishPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/PubRelPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/PubAckPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/PubRecPacket.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/PubCompPacket.hpp"
|
|
||||||
|
|
||||||
#include "AsyncMqttClient/Packets/Out/Connect.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/Out/PingReq.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/Out/PubAck.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/Out/Disconn.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/Out/Subscribe.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/Out/Unsubscribe.hpp"
|
|
||||||
#include "AsyncMqttClient/Packets/Out/Publish.hpp"
|
|
||||||
|
|
||||||
class AsyncMqttClient {
|
|
||||||
public:
|
|
||||||
AsyncMqttClient();
|
|
||||||
~AsyncMqttClient();
|
|
||||||
|
|
||||||
AsyncMqttClient& setKeepAlive(uint16_t keepAlive);
|
|
||||||
AsyncMqttClient& setClientId(const char* clientId);
|
|
||||||
AsyncMqttClient& setCleanSession(bool cleanSession);
|
|
||||||
AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength);
|
|
||||||
AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr);
|
|
||||||
AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0);
|
|
||||||
AsyncMqttClient& setServer(IPAddress ip, uint16_t port);
|
|
||||||
AsyncMqttClient& setServer(IPv6Address ipv6, uint16_t port);
|
|
||||||
AsyncMqttClient& setServer(const char* host, uint16_t port);
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
AsyncMqttClient& setSecure(bool secure);
|
|
||||||
AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback);
|
|
||||||
AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback);
|
|
||||||
AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback);
|
|
||||||
AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback);
|
|
||||||
AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback);
|
|
||||||
AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback);
|
|
||||||
|
|
||||||
bool connected() const;
|
|
||||||
void connect();
|
|
||||||
void disconnect(bool force = false);
|
|
||||||
uint16_t subscribe(const char* topic, uint8_t qos);
|
|
||||||
uint16_t unsubscribe(const char* topic);
|
|
||||||
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);
|
|
||||||
bool clearQueue(); // Not MQTT compliant!
|
|
||||||
|
|
||||||
const char* getClientId() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
AsyncClient _client;
|
|
||||||
AsyncMqttClientInternals::OutPacket* _head;
|
|
||||||
AsyncMqttClientInternals::OutPacket* _tail;
|
|
||||||
size_t _sent;
|
|
||||||
enum {
|
|
||||||
CONNECTING,
|
|
||||||
CONNECTED,
|
|
||||||
DISCONNECTING,
|
|
||||||
DISCONNECTED
|
|
||||||
} _state;
|
|
||||||
AsyncMqttClientDisconnectReason _disconnectReason;
|
|
||||||
uint32_t _lastClientActivity;
|
|
||||||
uint32_t _lastServerActivity;
|
|
||||||
uint32_t _lastPingRequestTime;
|
|
||||||
|
|
||||||
char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456
|
|
||||||
IPAddress _ip;
|
|
||||||
IPv6Address _ipv6;
|
|
||||||
const char* _host;
|
|
||||||
bool _useIp;
|
|
||||||
bool _useIpv6;
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
bool _secure;
|
|
||||||
#endif
|
|
||||||
uint16_t _port;
|
|
||||||
uint16_t _keepAlive;
|
|
||||||
bool _cleanSession;
|
|
||||||
const char* _clientId;
|
|
||||||
const char* _username;
|
|
||||||
const char* _password;
|
|
||||||
const char* _willTopic;
|
|
||||||
const char* _willPayload;
|
|
||||||
uint16_t _willPayloadLength;
|
|
||||||
uint8_t _willQos;
|
|
||||||
bool _willRetain;
|
|
||||||
|
|
||||||
#if ASYNC_TCP_SSL_ENABLED
|
|
||||||
std::vector<std::array<uint8_t, SHA1_SIZE>> _secureServerFingerprints;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
std::vector<AsyncMqttClientInternals::OnConnectUserCallback> _onConnectUserCallbacks;
|
|
||||||
std::vector<AsyncMqttClientInternals::OnDisconnectUserCallback> _onDisconnectUserCallbacks;
|
|
||||||
std::vector<AsyncMqttClientInternals::OnSubscribeUserCallback> _onSubscribeUserCallbacks;
|
|
||||||
std::vector<AsyncMqttClientInternals::OnUnsubscribeUserCallback> _onUnsubscribeUserCallbacks;
|
|
||||||
std::vector<AsyncMqttClientInternals::OnMessageUserCallback> _onMessageUserCallbacks;
|
|
||||||
std::vector<AsyncMqttClientInternals::OnPublishUserCallback> _onPublishUserCallbacks;
|
|
||||||
|
|
||||||
AsyncMqttClientInternals::ParsingInformation _parsingInformation;
|
|
||||||
AsyncMqttClientInternals::Packet* _currentParsedPacket;
|
|
||||||
uint8_t _remainingLengthBufferPosition;
|
|
||||||
char _remainingLengthBuffer[4];
|
|
||||||
|
|
||||||
std::vector<AsyncMqttClientInternals::PendingPubRel> _pendingPubRels;
|
|
||||||
|
|
||||||
#if defined(ESP32)
|
|
||||||
SemaphoreHandle_t _xSemaphore = nullptr;
|
|
||||||
#elif defined(ESP8266)
|
|
||||||
bool _xSemaphore = false;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void _clear();
|
|
||||||
void _freeCurrentParsedPacket();
|
|
||||||
|
|
||||||
// TCP
|
|
||||||
void _onConnect();
|
|
||||||
void _onDisconnect();
|
|
||||||
// void _onError(int8_t error);
|
|
||||||
// void _onTimeout();
|
|
||||||
void _onAck(size_t len);
|
|
||||||
void _onData(char* data, size_t len);
|
|
||||||
void _onPoll();
|
|
||||||
|
|
||||||
// QUEUE
|
|
||||||
void _insert(AsyncMqttClientInternals::OutPacket* packet); // for PUBREL
|
|
||||||
void _addFront(AsyncMqttClientInternals::OutPacket* packet); // for CONNECT
|
|
||||||
void _addBack(AsyncMqttClientInternals::OutPacket* packet); // all the rest
|
|
||||||
void _handleQueue();
|
|
||||||
void _clearQueue(bool keepSessionData);
|
|
||||||
|
|
||||||
// MQTT
|
|
||||||
void _onPingResp();
|
|
||||||
void _onConnAck(bool sessionPresent, uint8_t connectReturnCode);
|
|
||||||
void _onSubAck(uint16_t packetId, char status);
|
|
||||||
void _onUnsubAck(uint16_t packetId);
|
|
||||||
void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId);
|
|
||||||
void _onPublish(uint16_t packetId, uint8_t qos);
|
|
||||||
void _onPubRel(uint16_t packetId);
|
|
||||||
void _onPubAck(uint16_t packetId);
|
|
||||||
void _onPubRec(uint16_t packetId);
|
|
||||||
void _onPubComp(uint16_t packetId);
|
|
||||||
|
|
||||||
void _sendPing();
|
|
||||||
};
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <functional>
|
|
||||||
|
|
||||||
#include "DisconnectReasons.hpp"
|
|
||||||
#include "MessageProperties.hpp"
|
|
||||||
#include "Errors.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
// user callbacks
|
|
||||||
typedef std::function<void(bool sessionPresent)> OnConnectUserCallback;
|
|
||||||
typedef std::function<void(AsyncMqttClientDisconnectReason reason)> OnDisconnectUserCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId, uint8_t qos)> OnSubscribeUserCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnUnsubscribeUserCallback;
|
|
||||||
typedef std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)> OnMessageUserCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnPublishUserCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId, AsyncMqttClientError error)> OnErrorUserCallback;
|
|
||||||
|
|
||||||
// internal callbacks
|
|
||||||
typedef std::function<void(bool sessionPresent, uint8_t connectReturnCode)> OnConnAckInternalCallback;
|
|
||||||
typedef std::function<void()> OnPingRespInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId, char status)> OnSubAckInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnUnsubAckInternalCallback;
|
|
||||||
typedef std::function<void(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId)> OnMessageInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId, uint8_t qos)> OnPublishInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnPubRelInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnPubAckInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnPubRecInternalCallback;
|
|
||||||
typedef std::function<void(uint16_t packetId)> OnPubCompInternalCallback;
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,15 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
enum class AsyncMqttClientDisconnectReason : uint8_t {
|
|
||||||
TCP_DISCONNECTED = 0,
|
|
||||||
|
|
||||||
MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 1,
|
|
||||||
MQTT_IDENTIFIER_REJECTED = 2,
|
|
||||||
MQTT_SERVER_UNAVAILABLE = 3,
|
|
||||||
MQTT_MALFORMED_CREDENTIALS = 4,
|
|
||||||
MQTT_NOT_AUTHORIZED = 5,
|
|
||||||
|
|
||||||
ESP8266_NOT_ENOUGH_SPACE = 6,
|
|
||||||
|
|
||||||
TLS_BAD_FINGERPRINT = 7
|
|
||||||
};
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
enum class AsyncMqttClientError : uint8_t {
|
|
||||||
MAX_RETRIES = 0,
|
|
||||||
OUT_OF_MEMORY = 1
|
|
||||||
};
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
constexpr struct {
|
|
||||||
const uint8_t RESERVED = 0;
|
|
||||||
const uint8_t CONNECT = 1;
|
|
||||||
const uint8_t CONNACK = 2;
|
|
||||||
const uint8_t PUBLISH = 3;
|
|
||||||
const uint8_t PUBACK = 4;
|
|
||||||
const uint8_t PUBREC = 5;
|
|
||||||
const uint8_t PUBREL = 6;
|
|
||||||
const uint8_t PUBCOMP = 7;
|
|
||||||
const uint8_t SUBSCRIBE = 8;
|
|
||||||
const uint8_t SUBACK = 9;
|
|
||||||
const uint8_t UNSUBSCRIBE = 10;
|
|
||||||
const uint8_t UNSUBACK = 11;
|
|
||||||
const uint8_t PINGREQ = 12;
|
|
||||||
const uint8_t PINGRESP = 13;
|
|
||||||
const uint8_t DISCONNECT = 14;
|
|
||||||
const uint8_t RESERVED2 = 1;
|
|
||||||
} PacketType;
|
|
||||||
|
|
||||||
constexpr struct {
|
|
||||||
const uint8_t CONNECT_RESERVED = 0x00;
|
|
||||||
const uint8_t CONNACK_RESERVED = 0x00;
|
|
||||||
const uint8_t PUBLISH_DUP = 0x08;
|
|
||||||
const uint8_t PUBLISH_QOS0 = 0x00;
|
|
||||||
const uint8_t PUBLISH_QOS1 = 0x02;
|
|
||||||
const uint8_t PUBLISH_QOS2 = 0x04;
|
|
||||||
const uint8_t PUBLISH_QOSRESERVED = 0x06;
|
|
||||||
const uint8_t PUBLISH_RETAIN = 0x01;
|
|
||||||
const uint8_t PUBACK_RESERVED = 0x00;
|
|
||||||
const uint8_t PUBREC_RESERVED = 0x00;
|
|
||||||
const uint8_t PUBREL_RESERVED = 0x02;
|
|
||||||
const uint8_t PUBCOMP_RESERVED = 0x00;
|
|
||||||
const uint8_t SUBSCRIBE_RESERVED = 0x02;
|
|
||||||
const uint8_t SUBACK_RESERVED = 0x00;
|
|
||||||
const uint8_t UNSUBSCRIBE_RESERVED = 0x02;
|
|
||||||
const uint8_t UNSUBACK_RESERVED = 0x00;
|
|
||||||
const uint8_t PINGREQ_RESERVED = 0x00;
|
|
||||||
const uint8_t PINGRESP_RESERVED = 0x00;
|
|
||||||
const uint8_t DISCONNECT_RESERVED = 0x00;
|
|
||||||
const uint8_t RESERVED2_RESERVED = 0x00;
|
|
||||||
} HeaderFlag;
|
|
||||||
|
|
||||||
constexpr struct {
|
|
||||||
const uint8_t USERNAME = 0x80;
|
|
||||||
const uint8_t PASSWORD = 0x40;
|
|
||||||
const uint8_t WILL_RETAIN = 0x20;
|
|
||||||
const uint8_t WILL_QOS0 = 0x00;
|
|
||||||
const uint8_t WILL_QOS1 = 0x08;
|
|
||||||
const uint8_t WILL_QOS2 = 0x10;
|
|
||||||
const uint8_t WILL = 0x04;
|
|
||||||
const uint8_t CLEAN_SESSION = 0x02;
|
|
||||||
const uint8_t RESERVED = 0x00;
|
|
||||||
} ConnectFlag;
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class Helpers {
|
|
||||||
public:
|
|
||||||
static uint32_t decodeRemainingLength(char* bytes) {
|
|
||||||
uint32_t multiplier = 1;
|
|
||||||
uint32_t value = 0;
|
|
||||||
uint8_t currentByte = 0;
|
|
||||||
uint8_t encodedByte;
|
|
||||||
do {
|
|
||||||
encodedByte = bytes[currentByte++];
|
|
||||||
value += (encodedByte & 127) * multiplier;
|
|
||||||
multiplier *= 128;
|
|
||||||
} while ((encodedByte & 128) != 0);
|
|
||||||
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
|
|
||||||
static uint8_t encodeRemainingLength(uint32_t remainingLength, char* destination) {
|
|
||||||
uint8_t currentByte = 0;
|
|
||||||
uint8_t bytesNeeded = 0;
|
|
||||||
|
|
||||||
do {
|
|
||||||
uint8_t encodedByte = remainingLength % 128;
|
|
||||||
remainingLength /= 128;
|
|
||||||
if (remainingLength > 0) {
|
|
||||||
encodedByte = encodedByte | 128;
|
|
||||||
}
|
|
||||||
|
|
||||||
destination[currentByte++] = encodedByte;
|
|
||||||
bytesNeeded++;
|
|
||||||
} while (remainingLength > 0);
|
|
||||||
|
|
||||||
return bytesNeeded;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
#if defined(ARDUINO_ARCH_ESP32)
|
|
||||||
#define SEMAPHORE_TAKE() xSemaphoreTake(_xSemaphore, portMAX_DELAY)
|
|
||||||
#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore)
|
|
||||||
#define GET_FREE_MEMORY() ESP.getMaxAllocHeap()
|
|
||||||
#include <esp32-hal-log.h>
|
|
||||||
#elif defined(ARDUINO_ARCH_ESP8266)
|
|
||||||
#define SEMAPHORE_TAKE(X) while (_xSemaphore) { /*ESP.wdtFeed();*/ } _xSemaphore = true
|
|
||||||
#define SEMAPHORE_GIVE() _xSemaphore = false
|
|
||||||
#define GET_FREE_MEMORY() ESP.getMaxFreeBlockSize()
|
|
||||||
#if defined(DEBUG_ESP_PORT) && defined(DEBUG_ASYNC_MQTT_CLIENT)
|
|
||||||
#define log_i(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n")
|
|
||||||
#define log_e(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n")
|
|
||||||
#define log_w(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n")
|
|
||||||
#else
|
|
||||||
#define log_i(...)
|
|
||||||
#define log_e(...)
|
|
||||||
#define log_w(...)
|
|
||||||
#endif
|
|
||||||
#else
|
|
||||||
#pragma error "No valid architecture"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
struct AsyncMqttClientMessageProperties {
|
|
||||||
uint8_t qos;
|
|
||||||
bool dup;
|
|
||||||
bool retain;
|
|
||||||
};
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#include "ConnAckPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::ConnAckPacket;
|
|
||||||
|
|
||||||
ConnAckPacket::ConnAckPacket(ParsingInformation* parsingInformation, OnConnAckInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _sessionPresent(false)
|
|
||||||
, _connectReturnCode(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
ConnAckPacket::~ConnAckPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_sessionPresent = (currentByte << 7) >> 7;
|
|
||||||
} else {
|
|
||||||
_connectReturnCode = currentByte;
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_sessionPresent, _connectReturnCode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class ConnAckPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit ConnAckPacket(ParsingInformation* parsingInformation, OnConnAckInternalCallback callback);
|
|
||||||
~ConnAckPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnConnAckInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
bool _sessionPresent;
|
|
||||||
uint8_t _connectReturnCode;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,162 +0,0 @@
|
|||||||
#include "Connect.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::ConnectOutPacket;
|
|
||||||
|
|
||||||
ConnectOutPacket::ConnectOutPacket(bool cleanSession,
|
|
||||||
const char* username,
|
|
||||||
const char* password,
|
|
||||||
const char* willTopic,
|
|
||||||
bool willRetain,
|
|
||||||
uint8_t willQos,
|
|
||||||
const char* willPayload,
|
|
||||||
uint16_t willPayloadLength,
|
|
||||||
uint16_t keepAlive,
|
|
||||||
const char* clientId) {
|
|
||||||
char fixedHeader[5];
|
|
||||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.CONNECT;
|
|
||||||
fixedHeader[0] = fixedHeader[0] << 4;
|
|
||||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.CONNECT_RESERVED;
|
|
||||||
|
|
||||||
uint16_t protocolNameLength = 4;
|
|
||||||
char protocolNameLengthBytes[2];
|
|
||||||
protocolNameLengthBytes[0] = protocolNameLength >> 8;
|
|
||||||
protocolNameLengthBytes[1] = protocolNameLength & 0xFF;
|
|
||||||
|
|
||||||
char protocolLevel[1];
|
|
||||||
protocolLevel[0] = 0x04;
|
|
||||||
|
|
||||||
char connectFlags[1];
|
|
||||||
connectFlags[0] = 0;
|
|
||||||
if (cleanSession) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.CLEAN_SESSION;
|
|
||||||
if (username != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.USERNAME;
|
|
||||||
if (password != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.PASSWORD;
|
|
||||||
if (willTopic != nullptr) {
|
|
||||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL;
|
|
||||||
if (willRetain) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_RETAIN;
|
|
||||||
switch (willQos) {
|
|
||||||
case 0:
|
|
||||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS0;
|
|
||||||
break;
|
|
||||||
case 1:
|
|
||||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS1;
|
|
||||||
break;
|
|
||||||
case 2:
|
|
||||||
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS2;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
char keepAliveBytes[2];
|
|
||||||
keepAliveBytes[0] = keepAlive >> 8;
|
|
||||||
keepAliveBytes[1] = keepAlive & 0xFF;
|
|
||||||
|
|
||||||
uint16_t clientIdLength = strlen(clientId);
|
|
||||||
char clientIdLengthBytes[2];
|
|
||||||
clientIdLengthBytes[0] = clientIdLength >> 8;
|
|
||||||
clientIdLengthBytes[1] = clientIdLength & 0xFF;
|
|
||||||
|
|
||||||
// Optional fields
|
|
||||||
uint16_t willTopicLength = 0;
|
|
||||||
char willTopicLengthBytes[2];
|
|
||||||
char willPayloadLengthBytes[2];
|
|
||||||
if (willTopic != nullptr) {
|
|
||||||
willTopicLength = strlen(willTopic);
|
|
||||||
willTopicLengthBytes[0] = willTopicLength >> 8;
|
|
||||||
willTopicLengthBytes[1] = willTopicLength & 0xFF;
|
|
||||||
|
|
||||||
if (willPayload != nullptr && willPayloadLength == 0) willPayloadLength = strlen(willPayload);
|
|
||||||
|
|
||||||
willPayloadLengthBytes[0] = willPayloadLength >> 8;
|
|
||||||
willPayloadLengthBytes[1] = willPayloadLength & 0xFF;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t usernameLength = 0;
|
|
||||||
char usernameLengthBytes[2];
|
|
||||||
if (username != nullptr) {
|
|
||||||
usernameLength = strlen(username);
|
|
||||||
usernameLengthBytes[0] = usernameLength >> 8;
|
|
||||||
usernameLengthBytes[1] = usernameLength & 0xFF;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t passwordLength = 0;
|
|
||||||
char passwordLengthBytes[2];
|
|
||||||
if (password != nullptr) {
|
|
||||||
passwordLength = strlen(password);
|
|
||||||
passwordLengthBytes[0] = passwordLength >> 8;
|
|
||||||
passwordLengthBytes[1] = passwordLength & 0xFF;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t remainingLength = 2 + protocolNameLength + 1 + 1 + 2 + 2 + clientIdLength; // always present
|
|
||||||
if (willTopic != nullptr) remainingLength += 2 + willTopicLength + 2 + willPayloadLength;
|
|
||||||
if (username != nullptr) remainingLength += 2 + usernameLength;
|
|
||||||
if (password != nullptr) remainingLength += 2 + passwordLength;
|
|
||||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
|
|
||||||
|
|
||||||
uint32_t neededSpace = 1 + remainingLengthLength;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += protocolNameLength;
|
|
||||||
neededSpace += 1;
|
|
||||||
neededSpace += 1;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += clientIdLength;
|
|
||||||
if (willTopic != nullptr) {
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += willTopicLength;
|
|
||||||
|
|
||||||
neededSpace += 2;
|
|
||||||
if (willPayload != nullptr) neededSpace += willPayloadLength;
|
|
||||||
}
|
|
||||||
if (username != nullptr) {
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += usernameLength;
|
|
||||||
}
|
|
||||||
if (password != nullptr) {
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += passwordLength;
|
|
||||||
}
|
|
||||||
|
|
||||||
_data.reserve(neededSpace);
|
|
||||||
|
|
||||||
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
|
|
||||||
|
|
||||||
_data.push_back(protocolNameLengthBytes[0]);
|
|
||||||
_data.push_back(protocolNameLengthBytes[1]);
|
|
||||||
|
|
||||||
_data.push_back('M');
|
|
||||||
_data.push_back('Q');
|
|
||||||
_data.push_back('T');
|
|
||||||
_data.push_back('T');
|
|
||||||
|
|
||||||
_data.push_back(protocolLevel[0]);
|
|
||||||
_data.push_back(connectFlags[0]);
|
|
||||||
_data.push_back(keepAliveBytes[0]);
|
|
||||||
_data.push_back(keepAliveBytes[1]);
|
|
||||||
_data.push_back(clientIdLengthBytes[0]);
|
|
||||||
_data.push_back(clientIdLengthBytes[1]);
|
|
||||||
|
|
||||||
_data.insert(_data.end(), clientId, clientId + clientIdLength);
|
|
||||||
if (willTopic != nullptr) {
|
|
||||||
_data.insert(_data.end(), willTopicLengthBytes, willTopicLengthBytes + 2);
|
|
||||||
_data.insert(_data.end(), willTopic, willTopic + willTopicLength);
|
|
||||||
|
|
||||||
_data.insert(_data.end(), willPayloadLengthBytes, willPayloadLengthBytes + 2);
|
|
||||||
if (willPayload != nullptr) _data.insert(_data.end(), willPayload, willPayload + willPayloadLength);
|
|
||||||
}
|
|
||||||
if (username != nullptr) {
|
|
||||||
_data.insert(_data.end(), usernameLengthBytes, usernameLengthBytes + 2);
|
|
||||||
_data.insert(_data.end(), username, username + usernameLength);
|
|
||||||
}
|
|
||||||
if (password != nullptr) {
|
|
||||||
_data.insert(_data.end(), passwordLengthBytes, passwordLengthBytes + 2);
|
|
||||||
_data.insert(_data.end(), password, password + passwordLength);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* ConnectOutPacket::data(size_t index) const {
|
|
||||||
return &_data.data()[index];
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t ConnectOutPacket::size() const {
|
|
||||||
return _data.size();
|
|
||||||
}
|
|
||||||
@@ -1,29 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <vector>
|
|
||||||
#include <cstring> // strlen
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class ConnectOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
ConnectOutPacket(bool cleanSession,
|
|
||||||
const char* username,
|
|
||||||
const char* password,
|
|
||||||
const char* willTopic,
|
|
||||||
bool willRetain,
|
|
||||||
uint8_t willQos,
|
|
||||||
const char* willPayload,
|
|
||||||
uint16_t willPayloadLength,
|
|
||||||
uint16_t keepAlive,
|
|
||||||
const char* clientId);
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<uint8_t> _data;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,18 +0,0 @@
|
|||||||
#include "Disconn.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::DisconnOutPacket;
|
|
||||||
|
|
||||||
DisconnOutPacket::DisconnOutPacket() {
|
|
||||||
_data[0] = AsyncMqttClientInternals::PacketType.DISCONNECT;
|
|
||||||
_data[0] = _data[0] << 4;
|
|
||||||
_data[0] = _data[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED;
|
|
||||||
_data[1] = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* DisconnOutPacket::data(size_t index) const {
|
|
||||||
return &_data[index];
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t DisconnOutPacket::size() const {
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
@@ -1,17 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class DisconnOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
DisconnOutPacket();
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint8_t _data[2];
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
#include "OutPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::OutPacket;
|
|
||||||
|
|
||||||
OutPacket::OutPacket()
|
|
||||||
: next(nullptr)
|
|
||||||
, timeout(0)
|
|
||||||
, noTries(0)
|
|
||||||
, _released(true)
|
|
||||||
, _packetId(0) {}
|
|
||||||
|
|
||||||
OutPacket::~OutPacket() {}
|
|
||||||
|
|
||||||
bool OutPacket::released() const {
|
|
||||||
return _released;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t OutPacket::packetType() const {
|
|
||||||
return data(0)[0] >> 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t OutPacket::packetId() const {
|
|
||||||
return _packetId;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t OutPacket::qos() const {
|
|
||||||
if (packetType() == AsyncMqttClientInternals::PacketType.PUBLISH) {
|
|
||||||
return (data()[1] & 0x06) >> 1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void OutPacket::release() {
|
|
||||||
_released = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t OutPacket::_nextPacketId = 0;
|
|
||||||
|
|
||||||
uint16_t OutPacket::_getNextPacketId() {
|
|
||||||
if (++_nextPacketId == 0) {
|
|
||||||
++_nextPacketId;
|
|
||||||
}
|
|
||||||
return _nextPacketId;
|
|
||||||
}
|
|
||||||
@@ -1,35 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <stdint.h> // uint*_t
|
|
||||||
#include <stddef.h> // size_t
|
|
||||||
#include <algorithm> // std::min
|
|
||||||
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class OutPacket {
|
|
||||||
public:
|
|
||||||
OutPacket();
|
|
||||||
virtual ~OutPacket();
|
|
||||||
virtual const uint8_t* data(size_t index = 0) const = 0;
|
|
||||||
virtual size_t size() const = 0;
|
|
||||||
bool released() const;
|
|
||||||
uint8_t packetType() const;
|
|
||||||
uint16_t packetId() const;
|
|
||||||
uint8_t qos() const;
|
|
||||||
void release();
|
|
||||||
|
|
||||||
public:
|
|
||||||
OutPacket* next;
|
|
||||||
uint32_t timeout;
|
|
||||||
uint8_t noTries;
|
|
||||||
|
|
||||||
protected:
|
|
||||||
static uint16_t _getNextPacketId();
|
|
||||||
bool _released;
|
|
||||||
uint16_t _packetId;
|
|
||||||
|
|
||||||
private:
|
|
||||||
static uint16_t _nextPacketId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,18 +0,0 @@
|
|||||||
#include "PingReq.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PingReqOutPacket;
|
|
||||||
|
|
||||||
PingReqOutPacket::PingReqOutPacket() {
|
|
||||||
_data[0] = AsyncMqttClientInternals::PacketType.PINGREQ;
|
|
||||||
_data[0] = _data[0] << 4;
|
|
||||||
_data[0] = _data[0] | AsyncMqttClientInternals::HeaderFlag.PINGREQ_RESERVED;
|
|
||||||
_data[1] = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* PingReqOutPacket::data(size_t index) const {
|
|
||||||
return &_data[index];;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t PingReqOutPacket::size() const {
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
@@ -1,17 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PingReqOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
PingReqOutPacket();
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint8_t _data[2];
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#include "PubAck.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PubAckOutPacket;
|
|
||||||
|
|
||||||
PubAckOutPacket::PubAckOutPacket(PendingAck pendingAck) {
|
|
||||||
_data[0] = pendingAck.packetType;
|
|
||||||
_data[0] = _data[0] << 4;
|
|
||||||
_data[0] = _data[0] | pendingAck.headerFlag;
|
|
||||||
_data[1] = 2;
|
|
||||||
_packetId = pendingAck.packetId;
|
|
||||||
_data[2] = pendingAck.packetId >> 8;
|
|
||||||
_data[3] = pendingAck.packetId & 0xFF;
|
|
||||||
if (packetType() == AsyncMqttClientInternals::PacketType.PUBREL ||
|
|
||||||
packetType() == AsyncMqttClientInternals::PacketType.PUBREC) {
|
|
||||||
_released = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* PubAckOutPacket::data(size_t index) const {
|
|
||||||
return &_data[index];
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t PubAckOutPacket::size() const {
|
|
||||||
return 4;
|
|
||||||
}
|
|
||||||
@@ -1,18 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
#include "../../Storage.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PubAckOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
explicit PubAckOutPacket(PendingAck pendingAck);
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint8_t _data[4];
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,69 +0,0 @@
|
|||||||
#include "Publish.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PublishOutPacket;
|
|
||||||
|
|
||||||
PublishOutPacket::PublishOutPacket(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) {
|
|
||||||
char fixedHeader[5];
|
|
||||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBLISH;
|
|
||||||
fixedHeader[0] = fixedHeader[0] << 4;
|
|
||||||
// if (dup) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP;
|
|
||||||
if (retain) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_RETAIN;
|
|
||||||
switch (qos) {
|
|
||||||
case 0:
|
|
||||||
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS0;
|
|
||||||
break;
|
|
||||||
case 1:
|
|
||||||
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS1;
|
|
||||||
break;
|
|
||||||
case 2:
|
|
||||||
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS2;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint16_t topicLength = strlen(topic);
|
|
||||||
char topicLengthBytes[2];
|
|
||||||
topicLengthBytes[0] = topicLength >> 8;
|
|
||||||
topicLengthBytes[1] = topicLength & 0xFF;
|
|
||||||
|
|
||||||
uint32_t payloadLength = length;
|
|
||||||
if (payload != nullptr && payloadLength == 0) payloadLength = strlen(payload);
|
|
||||||
|
|
||||||
uint32_t remainingLength = 2 + topicLength + payloadLength;
|
|
||||||
if (qos != 0) remainingLength += 2;
|
|
||||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
|
|
||||||
|
|
||||||
size_t neededSpace = 0;
|
|
||||||
neededSpace += 1 + remainingLengthLength;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += topicLength;
|
|
||||||
if (qos != 0) neededSpace += 2;
|
|
||||||
if (payload != nullptr) neededSpace += payloadLength;
|
|
||||||
|
|
||||||
_data.reserve(neededSpace);
|
|
||||||
|
|
||||||
_packetId = (qos !=0) ? _getNextPacketId() : 1;
|
|
||||||
char packetIdBytes[2];
|
|
||||||
packetIdBytes[0] = _packetId >> 8;
|
|
||||||
packetIdBytes[1] = _packetId & 0xFF;
|
|
||||||
|
|
||||||
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
|
|
||||||
_data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2);
|
|
||||||
_data.insert(_data.end(), topic, topic + topicLength);
|
|
||||||
if (qos != 0) {
|
|
||||||
_data.insert(_data.end(), packetIdBytes, packetIdBytes + 2);
|
|
||||||
_released = false;
|
|
||||||
}
|
|
||||||
if (payload != nullptr) _data.insert(_data.end(), payload, payload + payloadLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* PublishOutPacket::data(size_t index) const {
|
|
||||||
return &_data.data()[index];
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t PublishOutPacket::size() const {
|
|
||||||
return _data.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
void PublishOutPacket::setDup() {
|
|
||||||
_data[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP;
|
|
||||||
}
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <cstring> // strlen
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
#include "../../Storage.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PublishOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
PublishOutPacket(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length);
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
void setDup(); // you cannot unset dup
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<uint8_t> _data;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,49 +0,0 @@
|
|||||||
#include "Subscribe.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::SubscribeOutPacket;
|
|
||||||
|
|
||||||
SubscribeOutPacket::SubscribeOutPacket(const char* topic, uint8_t qos) {
|
|
||||||
char fixedHeader[5];
|
|
||||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.SUBSCRIBE;
|
|
||||||
fixedHeader[0] = fixedHeader[0] << 4;
|
|
||||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.SUBSCRIBE_RESERVED;
|
|
||||||
|
|
||||||
uint16_t topicLength = strlen(topic);
|
|
||||||
char topicLengthBytes[2];
|
|
||||||
topicLengthBytes[0] = topicLength >> 8;
|
|
||||||
topicLengthBytes[1] = topicLength & 0xFF;
|
|
||||||
|
|
||||||
char qosByte[1];
|
|
||||||
qosByte[0] = qos;
|
|
||||||
|
|
||||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength + 1, fixedHeader + 1);
|
|
||||||
|
|
||||||
size_t neededSpace = 0;
|
|
||||||
neededSpace += 1 + remainingLengthLength;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += topicLength;
|
|
||||||
neededSpace += 1;
|
|
||||||
|
|
||||||
_data.reserve(neededSpace);
|
|
||||||
|
|
||||||
_packetId = _getNextPacketId();
|
|
||||||
char packetIdBytes[2];
|
|
||||||
packetIdBytes[0] = _packetId >> 8;
|
|
||||||
packetIdBytes[1] = _packetId & 0xFF;
|
|
||||||
|
|
||||||
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
|
|
||||||
_data.insert(_data.end(), packetIdBytes, packetIdBytes + 2);
|
|
||||||
_data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2);
|
|
||||||
_data.insert(_data.end(), topic, topic + topicLength);
|
|
||||||
_data.push_back(qosByte[0]);
|
|
||||||
_released = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* SubscribeOutPacket::data(size_t index) const {
|
|
||||||
return &_data.data()[index];
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t SubscribeOutPacket::size() const {
|
|
||||||
return _data.size();
|
|
||||||
}
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <cstring> // strlen
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
#include "../../Storage.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class SubscribeOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
SubscribeOutPacket(const char* topic, uint8_t qos);
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<uint8_t> _data;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,42 +0,0 @@
|
|||||||
#include "Unsubscribe.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::UnsubscribeOutPacket;
|
|
||||||
|
|
||||||
UnsubscribeOutPacket::UnsubscribeOutPacket(const char* topic) {
|
|
||||||
char fixedHeader[5];
|
|
||||||
fixedHeader[0] = AsyncMqttClientInternals::PacketType.UNSUBSCRIBE;
|
|
||||||
fixedHeader[0] = fixedHeader[0] << 4;
|
|
||||||
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.UNSUBSCRIBE_RESERVED;
|
|
||||||
|
|
||||||
uint16_t topicLength = strlen(topic);
|
|
||||||
char topicLengthBytes[2];
|
|
||||||
topicLengthBytes[0] = topicLength >> 8;
|
|
||||||
topicLengthBytes[1] = topicLength & 0xFF;
|
|
||||||
|
|
||||||
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength, fixedHeader + 1);
|
|
||||||
|
|
||||||
size_t neededSpace = 0;
|
|
||||||
neededSpace += 1 + remainingLengthLength;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += 2;
|
|
||||||
neededSpace += topicLength;
|
|
||||||
|
|
||||||
_packetId = _getNextPacketId();
|
|
||||||
char packetIdBytes[2];
|
|
||||||
packetIdBytes[0] = _packetId >> 8;
|
|
||||||
packetIdBytes[1] = _packetId & 0xFF;
|
|
||||||
|
|
||||||
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
|
|
||||||
_data.insert(_data.end(), packetIdBytes, packetIdBytes + 2);
|
|
||||||
_data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2);
|
|
||||||
_data.insert(_data.end(), topic, topic + topicLength);
|
|
||||||
_released = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
const uint8_t* UnsubscribeOutPacket::data(size_t index) const {
|
|
||||||
return &_data.data()[index];
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t UnsubscribeOutPacket::size() const {
|
|
||||||
return _data.size();
|
|
||||||
}
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <cstring> // strlen
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
#include "OutPacket.hpp"
|
|
||||||
#include "../../Flags.hpp"
|
|
||||||
#include "../../Helpers.hpp"
|
|
||||||
#include "../../Storage.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class UnsubscribeOutPacket : public OutPacket {
|
|
||||||
public:
|
|
||||||
explicit UnsubscribeOutPacket(const char* topic);
|
|
||||||
const uint8_t* data(size_t index = 0) const;
|
|
||||||
size_t size() const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<uint8_t> _data;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,11 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class Packet {
|
|
||||||
public:
|
|
||||||
virtual ~Packet() {}
|
|
||||||
|
|
||||||
virtual void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) = 0;
|
|
||||||
virtual void parsePayload(char* data, size_t len, size_t* currentBytePosition) = 0;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
#include "PingRespPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PingRespPacket;
|
|
||||||
|
|
||||||
PingRespPacket::PingRespPacket(ParsingInformation* parsingInformation, OnPingRespInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback) {
|
|
||||||
}
|
|
||||||
|
|
||||||
PingRespPacket::~PingRespPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void PingRespPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
|
|
||||||
void PingRespPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PingRespPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit PingRespPacket(ParsingInformation* parsingInformation, OnPingRespInternalCallback callback);
|
|
||||||
~PingRespPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnPingRespInternalCallback _callback;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#include "PubAckPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PubAckPacket;
|
|
||||||
|
|
||||||
PubAckPacket::PubAckPacket(ParsingInformation* parsingInformation, OnPubAckInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
PubAckPacket::~PubAckPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_packetId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PubAckPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit PubAckPacket(ParsingInformation* parsingInformation, OnPubAckInternalCallback callback);
|
|
||||||
~PubAckPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnPubAckInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#include "PubCompPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PubCompPacket;
|
|
||||||
|
|
||||||
PubCompPacket::PubCompPacket(ParsingInformation* parsingInformation, OnPubCompInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
PubCompPacket::~PubCompPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubCompPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_packetId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubCompPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PubCompPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit PubCompPacket(ParsingInformation* parsingInformation, OnPubCompInternalCallback callback);
|
|
||||||
~PubCompPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnPubCompInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#include "PubRecPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PubRecPacket;
|
|
||||||
|
|
||||||
PubRecPacket::PubRecPacket(ParsingInformation* parsingInformation, OnPubRecInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
PubRecPacket::~PubRecPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubRecPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_packetId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubRecPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PubRecPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit PubRecPacket(ParsingInformation* parsingInformation, OnPubRecInternalCallback callback);
|
|
||||||
~PubRecPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnPubRecInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#include "PubRelPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PubRelPacket;
|
|
||||||
|
|
||||||
PubRelPacket::PubRelPacket(ParsingInformation* parsingInformation, OnPubRelInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
PubRelPacket::~PubRelPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubRelPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_packetId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void PubRelPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PubRelPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit PubRelPacket(ParsingInformation* parsingInformation, OnPubRelInternalCallback callback);
|
|
||||||
~PubRelPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnPubRelInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,91 +0,0 @@
|
|||||||
#include "PublishPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::PublishPacket;
|
|
||||||
|
|
||||||
PublishPacket::PublishPacket(ParsingInformation* parsingInformation, OnMessageInternalCallback dataCallback, OnPublishInternalCallback completeCallback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _dataCallback(dataCallback)
|
|
||||||
, _completeCallback(completeCallback)
|
|
||||||
, _dup(false)
|
|
||||||
, _qos(0)
|
|
||||||
, _retain(0)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _topicLengthMsb(0)
|
|
||||||
, _topicLength(0)
|
|
||||||
, _ignore(false)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0)
|
|
||||||
, _payloadLength(0)
|
|
||||||
, _payloadBytesRead(0) {
|
|
||||||
_dup = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_DUP;
|
|
||||||
_retain = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_RETAIN;
|
|
||||||
char qosMasked = _parsingInformation->packetFlags & 0x06;
|
|
||||||
switch (qosMasked) {
|
|
||||||
case HeaderFlag.PUBLISH_QOS0:
|
|
||||||
_qos = 0;
|
|
||||||
break;
|
|
||||||
case HeaderFlag.PUBLISH_QOS1:
|
|
||||||
_qos = 1;
|
|
||||||
break;
|
|
||||||
case HeaderFlag.PUBLISH_QOS2:
|
|
||||||
_qos = 2;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
PublishPacket::~PublishPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void PublishPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition == 0) {
|
|
||||||
_topicLengthMsb = currentByte;
|
|
||||||
} else if (_bytePosition == 1) {
|
|
||||||
_topicLength = currentByte | _topicLengthMsb << 8;
|
|
||||||
if (_topicLength > _parsingInformation->maxTopicLength) {
|
|
||||||
_ignore = true;
|
|
||||||
} else {
|
|
||||||
_parsingInformation->topicBuffer[_topicLength] = '\0';
|
|
||||||
}
|
|
||||||
} else if (_bytePosition >= 2 && _bytePosition < 2 + _topicLength) {
|
|
||||||
// Starting from here, _ignore might be true
|
|
||||||
if (!_ignore) _parsingInformation->topicBuffer[_bytePosition - 2] = currentByte;
|
|
||||||
if (_bytePosition == 2 + _topicLength - 1 && _qos == 0) {
|
|
||||||
_preparePayloadHandling(_parsingInformation->remainingLength - (_bytePosition + 1));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else if (_bytePosition == 2 + _topicLength) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_preparePayloadHandling(_parsingInformation->remainingLength - (_bytePosition + 1));
|
|
||||||
}
|
|
||||||
_bytePosition++;
|
|
||||||
}
|
|
||||||
|
|
||||||
void PublishPacket::_preparePayloadHandling(uint32_t payloadLength) {
|
|
||||||
_payloadLength = payloadLength;
|
|
||||||
if (payloadLength == 0) {
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
if (!_ignore) {
|
|
||||||
_dataCallback(_parsingInformation->topicBuffer, nullptr, _qos, _dup, _retain, 0, 0, 0, _packetId);
|
|
||||||
_completeCallback(_packetId, _qos);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
_parsingInformation->bufferState = BufferState::PAYLOAD;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void PublishPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
size_t remainToRead = len - (*currentBytePosition);
|
|
||||||
if (_payloadBytesRead + remainToRead > _payloadLength) remainToRead = _payloadLength - _payloadBytesRead;
|
|
||||||
|
|
||||||
if (!_ignore) _dataCallback(_parsingInformation->topicBuffer, data + (*currentBytePosition), _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId);
|
|
||||||
_payloadBytesRead += remainToRead;
|
|
||||||
(*currentBytePosition) += remainToRead;
|
|
||||||
|
|
||||||
if (_payloadBytesRead == _payloadLength) {
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
if (!_ignore) _completeCallback(_packetId, _qos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../Flags.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class PublishPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit PublishPacket(ParsingInformation* parsingInformation, OnMessageInternalCallback dataCallback, OnPublishInternalCallback completeCallback);
|
|
||||||
~PublishPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnMessageInternalCallback _dataCallback;
|
|
||||||
OnPublishInternalCallback _completeCallback;
|
|
||||||
|
|
||||||
void _preparePayloadHandling(uint32_t payloadLength);
|
|
||||||
|
|
||||||
bool _dup;
|
|
||||||
uint8_t _qos;
|
|
||||||
bool _retain;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _topicLengthMsb;
|
|
||||||
uint16_t _topicLength;
|
|
||||||
bool _ignore;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
uint32_t _payloadLength;
|
|
||||||
uint32_t _payloadBytesRead;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,46 +0,0 @@
|
|||||||
#include "SubAckPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::SubAckPacket;
|
|
||||||
|
|
||||||
SubAckPacket::SubAckPacket(ParsingInformation* parsingInformation, OnSubAckInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
SubAckPacket::~SubAckPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void SubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_parsingInformation->bufferState = BufferState::PAYLOAD;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void SubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char status = data[(*currentBytePosition)++];
|
|
||||||
|
|
||||||
/* switch (status) {
|
|
||||||
case 0:
|
|
||||||
Serial.println("Success QoS 0");
|
|
||||||
break;
|
|
||||||
case 1:
|
|
||||||
Serial.println("Success QoS 1");
|
|
||||||
break;
|
|
||||||
case 2:
|
|
||||||
Serial.println("Success QoS 2");
|
|
||||||
break;
|
|
||||||
case 0x80:
|
|
||||||
Serial.println("Failure");
|
|
||||||
break;
|
|
||||||
} */
|
|
||||||
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_packetId, status);
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class SubAckPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit SubAckPacket(ParsingInformation* parsingInformation, OnSubAckInternalCallback callback);
|
|
||||||
~SubAckPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnSubAckInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
#include "UnsubAckPacket.hpp"
|
|
||||||
|
|
||||||
using AsyncMqttClientInternals::UnsubAckPacket;
|
|
||||||
|
|
||||||
UnsubAckPacket::UnsubAckPacket(ParsingInformation* parsingInformation, OnUnsubAckInternalCallback callback)
|
|
||||||
: _parsingInformation(parsingInformation)
|
|
||||||
, _callback(callback)
|
|
||||||
, _bytePosition(0)
|
|
||||||
, _packetIdMsb(0)
|
|
||||||
, _packetId(0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
UnsubAckPacket::~UnsubAckPacket() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void UnsubAckPacket::parseVariableHeader(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
char currentByte = data[(*currentBytePosition)++];
|
|
||||||
if (_bytePosition++ == 0) {
|
|
||||||
_packetIdMsb = currentByte;
|
|
||||||
} else {
|
|
||||||
_packetId = currentByte | _packetIdMsb << 8;
|
|
||||||
_parsingInformation->bufferState = BufferState::NONE;
|
|
||||||
_callback(_packetId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void UnsubAckPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
|
|
||||||
(void)data;
|
|
||||||
(void)currentBytePosition;
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include "Arduino.h"
|
|
||||||
#include "Packet.hpp"
|
|
||||||
#include "../ParsingInformation.hpp"
|
|
||||||
#include "../Callbacks.hpp"
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
class UnsubAckPacket : public Packet {
|
|
||||||
public:
|
|
||||||
explicit UnsubAckPacket(ParsingInformation* parsingInformation, OnUnsubAckInternalCallback callback);
|
|
||||||
~UnsubAckPacket();
|
|
||||||
|
|
||||||
void parseVariableHeader(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
void parsePayload(char* data, size_t len, size_t* currentBytePosition);
|
|
||||||
|
|
||||||
private:
|
|
||||||
ParsingInformation* _parsingInformation;
|
|
||||||
OnUnsubAckInternalCallback _callback;
|
|
||||||
|
|
||||||
uint8_t _bytePosition;
|
|
||||||
char _packetIdMsb;
|
|
||||||
uint16_t _packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
enum class BufferState : uint8_t {
|
|
||||||
NONE = 0,
|
|
||||||
REMAINING_LENGTH = 2,
|
|
||||||
VARIABLE_HEADER = 3,
|
|
||||||
PAYLOAD = 4
|
|
||||||
};
|
|
||||||
|
|
||||||
struct ParsingInformation {
|
|
||||||
BufferState bufferState;
|
|
||||||
|
|
||||||
uint16_t maxTopicLength;
|
|
||||||
char* topicBuffer;
|
|
||||||
|
|
||||||
uint8_t packetType;
|
|
||||||
uint16_t packetFlags;
|
|
||||||
uint32_t remainingLength;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
@@ -1,13 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
namespace AsyncMqttClientInternals {
|
|
||||||
struct PendingPubRel {
|
|
||||||
uint16_t packetId;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct PendingAck {
|
|
||||||
uint8_t packetType;
|
|
||||||
uint8_t headerFlag;
|
|
||||||
uint16_t packetId;
|
|
||||||
};
|
|
||||||
} // namespace AsyncMqttClientInternals
|
|
||||||
Reference in New Issue
Block a user