experimental mqtt client supporting large packets

This commit is contained in:
proddy
2021-02-27 14:45:51 +01:00
parent 333c5c16c0
commit ce45374b54
28 changed files with 1358 additions and 1046 deletions

View File

@@ -1,18 +0,0 @@
Async MQTT client for ESP8266 and ESP32
=============================
[![Build Status](https://img.shields.io/travis/marvinroger/async-mqtt-client/master.svg?style=flat-square)](https://travis-ci.org/marvinroger/async-mqtt-client)
An Arduino for ESP8266 and ESP32 asynchronous [MQTT](http://mqtt.org/) client implementation, built on [me-no-dev/ESPAsyncTCP (ESP8266)](https://github.com/me-no-dev/ESPAsyncTCP) | [me-no-dev/AsyncTCP (ESP32)](https://github.com/me-no-dev/AsyncTCP) .
## Features
* Compliant with the 3.1.1 version of the protocol
* Fully asynchronous
* Subscribe at QoS 0, 1 and 2
* Publish at QoS 0, 1 and 2
* SSL/TLS support
* Available in the [PlatformIO registry](http://platformio.org/lib/show/346/AsyncMqttClient)
## Requirements, installation and usage
The project is documented in the [/docs folder](docs).

File diff suppressed because it is too large Load Diff

View File

@@ -3,4 +3,4 @@
#include "AsyncMqttClient.hpp" #include "AsyncMqttClient.hpp"
#endif // SRC_ASYNCMQTTCLIENT_H_ #endif // SRC_ASYNCMQTTCLIENT_H_

View File

@@ -5,6 +5,10 @@
#include "Arduino.h" #include "Arduino.h"
#ifndef MQTT_MIN_FREE_MEMORY
#define MQTT_MIN_FREE_MEMORY 4096
#endif
#ifdef ESP32 #ifdef ESP32
#include <AsyncTCP.h> #include <AsyncTCP.h>
#include <freertos/semphr.h> #include <freertos/semphr.h>
@@ -38,137 +42,137 @@
#include "AsyncMqttClient/Packets/PubRecPacket.hpp" #include "AsyncMqttClient/Packets/PubRecPacket.hpp"
#include "AsyncMqttClient/Packets/PubCompPacket.hpp" #include "AsyncMqttClient/Packets/PubCompPacket.hpp"
#if ESP32 #include "AsyncMqttClient/Packets/Out/Connect.hpp"
#define SEMAPHORE_TAKE(X) \ #include "AsyncMqttClient/Packets/Out/PingReq.hpp"
if (xSemaphoreTake(_xSemaphore, 1000 / portTICK_PERIOD_MS) != pdTRUE) { \ #include "AsyncMqttClient/Packets/Out/PubAck.hpp"
return X; \ #include "AsyncMqttClient/Packets/Out/Disconn.hpp"
} // Waits max 1000ms #include "AsyncMqttClient/Packets/Out/Subscribe.hpp"
#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore); #include "AsyncMqttClient/Packets/Out/Unsubscribe.hpp"
#elif defined(ESP8266) #include "AsyncMqttClient/Packets/Out/Publish.hpp"
#define SEMAPHORE_TAKE(X) void()
#define SEMAPHORE_GIVE() void()
#endif
class AsyncMqttClient { class AsyncMqttClient {
public: public:
AsyncMqttClient(); AsyncMqttClient();
~AsyncMqttClient(); ~AsyncMqttClient();
AsyncMqttClient & setKeepAlive(uint16_t keepAlive); AsyncMqttClient& setKeepAlive(uint16_t keepAlive);
AsyncMqttClient & setClientId(const char * clientId); AsyncMqttClient& setClientId(const char* clientId);
AsyncMqttClient & setCleanSession(bool cleanSession); AsyncMqttClient& setCleanSession(bool cleanSession);
AsyncMqttClient & setMaxTopicLength(uint16_t maxTopicLength); AsyncMqttClient& setMaxTopicLength(uint16_t maxTopicLength);
AsyncMqttClient & setCredentials(const char * username, const char * password = nullptr); AsyncMqttClient& setCredentials(const char* username, const char* password = nullptr);
AsyncMqttClient & setWill(const char * topic, uint8_t qos, bool retain, const char * payload = nullptr, size_t length = 0); AsyncMqttClient& setWill(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0);
AsyncMqttClient & setServer(IPAddress ip, uint16_t port); AsyncMqttClient& setServer(IPAddress ip, uint16_t port);
AsyncMqttClient & setServer(const char * host, uint16_t port); AsyncMqttClient& setServer(const char* host, uint16_t port);
#if ASYNC_TCP_SSL_ENABLED #if ASYNC_TCP_SSL_ENABLED
AsyncMqttClient & setSecure(bool secure); AsyncMqttClient& setSecure(bool secure);
AsyncMqttClient & addServerFingerprint(const uint8_t * fingerprint); AsyncMqttClient& addServerFingerprint(const uint8_t* fingerprint);
#endif #endif
AsyncMqttClient & onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback); AsyncMqttClient& onConnect(AsyncMqttClientInternals::OnConnectUserCallback callback);
AsyncMqttClient & onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback); AsyncMqttClient& onDisconnect(AsyncMqttClientInternals::OnDisconnectUserCallback callback);
AsyncMqttClient & onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback); AsyncMqttClient& onSubscribe(AsyncMqttClientInternals::OnSubscribeUserCallback callback);
AsyncMqttClient & onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback); AsyncMqttClient& onUnsubscribe(AsyncMqttClientInternals::OnUnsubscribeUserCallback callback);
AsyncMqttClient & onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char * _userTopic = "#"); AsyncMqttClient& onMessage(AsyncMqttClientInternals::OnMessageUserCallback callback);
AsyncMqttClient & onFilteredMessage(AsyncMqttClientInternals::OnMessageUserCallback callback, const char * _userTopic); AsyncMqttClient& onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback);
AsyncMqttClient & onPublish(AsyncMqttClientInternals::OnPublishUserCallback callback);
bool connected() const; bool connected() const;
void connect(); void connect();
void disconnect(bool force = false); void disconnect(bool force = false);
uint16_t subscribe(const char * topic, uint8_t qos); uint16_t subscribe(const char* topic, uint8_t qos);
uint16_t subscribe(const char * topic, uint8_t qos, AsyncMqttClientInternals::OnMessageUserCallback callback); uint16_t unsubscribe(const char* topic);
uint16_t unsubscribe(const char * topic); uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);
uint16_t publish(const char * topic, uint8_t qos, bool retain, const char * payload = nullptr, size_t length = 0, bool dup = false, uint16_t message_id = 0);
const char * getClientId(); const char* getClientId() const;
private: private:
AsyncClient _client; AsyncClient _client;
AsyncMqttClientInternals::OutPacket* _head;
AsyncMqttClientInternals::OutPacket* _tail;
size_t _sent;
enum {
CONNECTING,
CONNECTED,
DISCONNECTING,
DISCONNECTED
} _state;
bool _tlsBadFingerprint;
uint32_t _lastClientActivity;
uint32_t _lastServerActivity;
uint32_t _lastPingRequestTime;
bool _connected; char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456
bool _lockMutiConnections; IPAddress _ip;
bool _connectPacketNotEnoughSpace; const char* _host;
bool _disconnectFlagged; bool _useIp;
bool _tlsBadFingerprint;
uint32_t _lastClientActivity;
uint32_t _lastServerActivity;
uint32_t _lastPingRequestTime;
char _generatedClientId[18 + 1]; // esp8266-abc123 and esp32-abcdef123456
IPAddress _ip;
const char * _host;
bool _useIp;
#if ASYNC_TCP_SSL_ENABLED #if ASYNC_TCP_SSL_ENABLED
bool _secure; bool _secure;
#endif #endif
uint16_t _port; uint16_t _port;
uint16_t _keepAlive; uint16_t _keepAlive;
bool _cleanSession; bool _cleanSession;
const char * _clientId; const char* _clientId;
const char * _username; const char* _username;
const char * _password; const char* _password;
const char * _willTopic; const char* _willTopic;
const char * _willPayload; const char* _willPayload;
uint16_t _willPayloadLength; uint16_t _willPayloadLength;
uint8_t _willQos; uint8_t _willQos;
bool _willRetain; bool _willRetain;
#if ASYNC_TCP_SSL_ENABLED #if ASYNC_TCP_SSL_ENABLED
std::vector<std::array<uint8_t, SHA1_SIZE>> _secureServerFingerprints; std::vector<std::array<uint8_t, SHA1_SIZE>> _secureServerFingerprints;
#endif #endif
std::vector<AsyncMqttClientInternals::OnConnectUserCallback> _onConnectUserCallbacks; std::vector<AsyncMqttClientInternals::OnConnectUserCallback> _onConnectUserCallbacks;
std::vector<AsyncMqttClientInternals::OnDisconnectUserCallback> _onDisconnectUserCallbacks; std::vector<AsyncMqttClientInternals::OnDisconnectUserCallback> _onDisconnectUserCallbacks;
std::vector<AsyncMqttClientInternals::OnSubscribeUserCallback> _onSubscribeUserCallbacks; std::vector<AsyncMqttClientInternals::OnSubscribeUserCallback> _onSubscribeUserCallbacks;
std::vector<AsyncMqttClientInternals::OnUnsubscribeUserCallback> _onUnsubscribeUserCallbacks; std::vector<AsyncMqttClientInternals::OnUnsubscribeUserCallback> _onUnsubscribeUserCallbacks;
std::vector<AsyncMqttClientInternals::onFilteredMessageUserCallback> _onMessageUserCallbacks; std::vector<AsyncMqttClientInternals::OnMessageUserCallback> _onMessageUserCallbacks;
std::vector<AsyncMqttClientInternals::OnPublishUserCallback> _onPublishUserCallbacks; std::vector<AsyncMqttClientInternals::OnPublishUserCallback> _onPublishUserCallbacks;
AsyncMqttClientInternals::ParsingInformation _parsingInformation; AsyncMqttClientInternals::ParsingInformation _parsingInformation;
AsyncMqttClientInternals::Packet * _currentParsedPacket; AsyncMqttClientInternals::Packet* _currentParsedPacket;
uint8_t _remainingLengthBufferPosition; uint8_t _remainingLengthBufferPosition;
char _remainingLengthBuffer[4]; char _remainingLengthBuffer[4];
uint16_t _nextPacketId; std::vector<AsyncMqttClientInternals::PendingPubRel> _pendingPubRels;
std::vector<AsyncMqttClientInternals::PendingPubRel> _pendingPubRels; #if defined(ESP32)
SemaphoreHandle_t _xSemaphore = nullptr;
std::vector<AsyncMqttClientInternals::PendingAck> _toSendAcks; #elif defined(ESP8266)
bool _xSemaphore = false;
#ifdef ESP32
SemaphoreHandle_t _xSemaphore = nullptr;
#endif #endif
void _clear(); void _clear();
void _freeCurrentParsedPacket(); void _freeCurrentParsedPacket();
// TCP // TCP
void _onConnect(AsyncClient * client); void _onConnect();
void _onDisconnect(AsyncClient * client); void _onDisconnect();
static void _onError(AsyncClient * client, int8_t error); // void _onError(int8_t error);
void _onTimeout(AsyncClient * client, uint32_t time); // void _onTimeout();
static void _onAck(AsyncClient * client, size_t len, uint32_t time); void _onAck(size_t len);
void _onData(AsyncClient * client, char * data, size_t len); void _onData(char* data, size_t len);
void _onPoll(AsyncClient * client); void _onPoll();
// MQTT // QUEUE
void _onPingResp(); void _insert(AsyncMqttClientInternals::OutPacket* packet); // for PUBREL
void _onConnAck(bool sessionPresent, uint8_t connectReturnCode); void _addFront(AsyncMqttClientInternals::OutPacket* packet); // for CONNECT
void _onSubAck(uint16_t packetId, char status); void _addBack(AsyncMqttClientInternals::OutPacket* packet); // all the rest
void _onUnsubAck(uint16_t packetId); void _handleQueue();
void _onMessage(char * topic, char * payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId); void _clearQueue(bool keepSessionData);
void _onPublish(uint16_t packetId, uint8_t qos);
void _onPubRel(uint16_t packetId);
void _onPubAck(uint16_t packetId);
void _onPubRec(uint16_t packetId);
void _onPubComp(uint16_t packetId);
bool _sendPing(); // MQTT
void _sendAcks(); void _onPingResp();
bool _sendDisconnect(); void _onConnAck(bool sessionPresent, uint8_t connectReturnCode);
void _onSubAck(uint16_t packetId, char status);
void _onUnsubAck(uint16_t packetId);
void _onMessage(char* topic, char* payload, uint8_t qos, bool dup, bool retain, size_t len, size_t index, size_t total, uint16_t packetId);
void _onPublish(uint16_t packetId, uint8_t qos);
void _onPubRel(uint16_t packetId);
void _onPubAck(uint16_t packetId);
void _onPubRec(uint16_t packetId);
void _onPubComp(uint16_t packetId);
uint16_t _getNextPacketId(); void _sendPing();
}; };

View File

@@ -4,6 +4,7 @@
#include "DisconnectReasons.hpp" #include "DisconnectReasons.hpp"
#include "MessageProperties.hpp" #include "MessageProperties.hpp"
#include "Errors.hpp"
namespace AsyncMqttClientInternals { namespace AsyncMqttClientInternals {
// user callbacks // user callbacks
@@ -12,9 +13,8 @@ typedef std::function<void(AsyncMqttClientDisconnectReason reason)> OnDisconnect
typedef std::function<void(uint16_t packetId, uint8_t qos)> OnSubscribeUserCallback; typedef std::function<void(uint16_t packetId, uint8_t qos)> OnSubscribeUserCallback;
typedef std::function<void(uint16_t packetId)> OnUnsubscribeUserCallback; typedef std::function<void(uint16_t packetId)> OnUnsubscribeUserCallback;
typedef std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)> OnMessageUserCallback; typedef std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)> OnMessageUserCallback;
// typedef std::pair<std::string, std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)>> onFilteredMessageUserCallback;
typedef std::pair<char*, std::function<void(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total)>> onFilteredMessageUserCallback;
typedef std::function<void(uint16_t packetId)> OnPublishUserCallback; typedef std::function<void(uint16_t packetId)> OnPublishUserCallback;
typedef std::function<void(uint16_t packetId, AsyncMqttClientError error)> OnErrorUserCallback;
// internal callbacks // internal callbacks
typedef std::function<void(bool sessionPresent, uint8_t connectReturnCode)> OnConnAckInternalCallback; typedef std::function<void(bool sessionPresent, uint8_t connectReturnCode)> OnConnAckInternalCallback;

View File

@@ -0,0 +1,6 @@
#pragma once
enum class AsyncMqttClientError : uint8_t {
MAX_RETRIES = 0,
OUT_OF_MEMORY = 1
};

View File

@@ -35,4 +35,27 @@ class Helpers {
return bytesNeeded; return bytesNeeded;
} }
}; };
#if defined(ARDUINO_ARCH_ESP32)
#define SEMAPHORE_TAKE() xSemaphoreTake(_xSemaphore, portMAX_DELAY)
#define SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore)
#define GET_FREE_MEMORY() ESP.getMaxAllocHeap()
#include <esp32-hal-log.h>
#elif defined(ARDUINO_ARCH_ESP8266)
#define SEMAPHORE_TAKE(X) while (_xSemaphore) { /*ESP.wdtFeed();*/ } _xSemaphore = true
#define SEMAPHORE_GIVE() _xSemaphore = false
#define GET_FREE_MEMORY() ESP.getMaxFreeBlockSize()
#if defined(DEBUG_ESP_PORT) && defined(DEBUG_ASYNC_MQTT_CLIENT)
#define log_i(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n")
#define log_e(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n")
#define log_w(...) DEBUG_ESP_PORT.printf(__VA_ARGS__); DEBUG_ESP_PORT.print("\n")
#else
#define log_i(...)
#define log_e(...)
#define log_w(...)
#endif
#else
#pragma error "No valid architecture"
#endif
} // namespace AsyncMqttClientInternals } // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,162 @@
#include "Connect.hpp"
using AsyncMqttClientInternals::ConnectOutPacket;
ConnectOutPacket::ConnectOutPacket(bool cleanSession,
const char* username,
const char* password,
const char* willTopic,
bool willRetain,
uint8_t willQos,
const char* willPayload,
uint16_t willPayloadLength,
uint16_t keepAlive,
const char* clientId) {
char fixedHeader[5];
fixedHeader[0] = AsyncMqttClientInternals::PacketType.CONNECT;
fixedHeader[0] = fixedHeader[0] << 4;
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.CONNECT_RESERVED;
uint16_t protocolNameLength = 4;
char protocolNameLengthBytes[2];
protocolNameLengthBytes[0] = protocolNameLength >> 8;
protocolNameLengthBytes[1] = protocolNameLength & 0xFF;
char protocolLevel[1];
protocolLevel[0] = 0x04;
char connectFlags[1];
connectFlags[0] = 0;
if (cleanSession) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.CLEAN_SESSION;
if (username != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.USERNAME;
if (password != nullptr) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.PASSWORD;
if (willTopic != nullptr) {
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL;
if (willRetain) connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_RETAIN;
switch (willQos) {
case 0:
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS0;
break;
case 1:
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS1;
break;
case 2:
connectFlags[0] |= AsyncMqttClientInternals::ConnectFlag.WILL_QOS2;
break;
}
}
char keepAliveBytes[2];
keepAliveBytes[0] = keepAlive >> 8;
keepAliveBytes[1] = keepAlive & 0xFF;
uint16_t clientIdLength = strlen(clientId);
char clientIdLengthBytes[2];
clientIdLengthBytes[0] = clientIdLength >> 8;
clientIdLengthBytes[1] = clientIdLength & 0xFF;
// Optional fields
uint16_t willTopicLength = 0;
char willTopicLengthBytes[2];
char willPayloadLengthBytes[2];
if (willTopic != nullptr) {
willTopicLength = strlen(willTopic);
willTopicLengthBytes[0] = willTopicLength >> 8;
willTopicLengthBytes[1] = willTopicLength & 0xFF;
if (willPayload != nullptr && willPayloadLength == 0) willPayloadLength = strlen(willPayload);
willPayloadLengthBytes[0] = willPayloadLength >> 8;
willPayloadLengthBytes[1] = willPayloadLength & 0xFF;
}
uint16_t usernameLength = 0;
char usernameLengthBytes[2];
if (username != nullptr) {
usernameLength = strlen(username);
usernameLengthBytes[0] = usernameLength >> 8;
usernameLengthBytes[1] = usernameLength & 0xFF;
}
uint16_t passwordLength = 0;
char passwordLengthBytes[2];
if (password != nullptr) {
passwordLength = strlen(password);
passwordLengthBytes[0] = passwordLength >> 8;
passwordLengthBytes[1] = passwordLength & 0xFF;
}
uint32_t remainingLength = 2 + protocolNameLength + 1 + 1 + 2 + 2 + clientIdLength; // always present
if (willTopic != nullptr) remainingLength += 2 + willTopicLength + 2 + willPayloadLength;
if (username != nullptr) remainingLength += 2 + usernameLength;
if (password != nullptr) remainingLength += 2 + passwordLength;
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
uint32_t neededSpace = 1 + remainingLengthLength;
neededSpace += 2;
neededSpace += protocolNameLength;
neededSpace += 1;
neededSpace += 1;
neededSpace += 2;
neededSpace += 2;
neededSpace += clientIdLength;
if (willTopic != nullptr) {
neededSpace += 2;
neededSpace += willTopicLength;
neededSpace += 2;
if (willPayload != nullptr) neededSpace += willPayloadLength;
}
if (username != nullptr) {
neededSpace += 2;
neededSpace += usernameLength;
}
if (password != nullptr) {
neededSpace += 2;
neededSpace += passwordLength;
}
_data.reserve(neededSpace);
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
_data.push_back(protocolNameLengthBytes[0]);
_data.push_back(protocolNameLengthBytes[1]);
_data.push_back('M');
_data.push_back('Q');
_data.push_back('T');
_data.push_back('T');
_data.push_back(protocolLevel[0]);
_data.push_back(connectFlags[0]);
_data.push_back(keepAliveBytes[0]);
_data.push_back(keepAliveBytes[1]);
_data.push_back(clientIdLengthBytes[0]);
_data.push_back(clientIdLengthBytes[1]);
_data.insert(_data.end(), clientId, clientId + clientIdLength);
if (willTopic != nullptr) {
_data.insert(_data.end(), willTopicLengthBytes, willTopicLengthBytes + 2);
_data.insert(_data.end(), willTopic, willTopic + willTopicLength);
_data.insert(_data.end(), willPayloadLengthBytes, willPayloadLengthBytes + 2);
if (willPayload != nullptr) _data.insert(_data.end(), willPayload, willPayload + willPayloadLength);
}
if (username != nullptr) {
_data.insert(_data.end(), usernameLengthBytes, usernameLengthBytes + 2);
_data.insert(_data.end(), username, username + usernameLength);
}
if (password != nullptr) {
_data.insert(_data.end(), passwordLengthBytes, passwordLengthBytes + 2);
_data.insert(_data.end(), password, password + passwordLength);
}
}
const uint8_t* ConnectOutPacket::data(size_t index) const {
return &_data.data()[index];
}
size_t ConnectOutPacket::size() const {
return _data.size();
}

View File

@@ -0,0 +1,29 @@
#pragma once
#include <vector>
#include <cstring> // strlen
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
namespace AsyncMqttClientInternals {
class ConnectOutPacket : public OutPacket {
public:
ConnectOutPacket(bool cleanSession,
const char* username,
const char* password,
const char* willTopic,
bool willRetain,
uint8_t willQos,
const char* willPayload,
uint16_t willPayloadLength,
uint16_t keepAlive,
const char* clientId);
const uint8_t* data(size_t index = 0) const;
size_t size() const;
private:
std::vector<uint8_t> _data;
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,18 @@
#include "Disconn.hpp"
using AsyncMqttClientInternals::DisconnOutPacket;
DisconnOutPacket::DisconnOutPacket() {
_data[0] = AsyncMqttClientInternals::PacketType.DISCONNECT;
_data[0] = _data[0] << 4;
_data[0] = _data[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED;
_data[1] = 0;
}
const uint8_t* DisconnOutPacket::data(size_t index) const {
return &_data[index];
}
size_t DisconnOutPacket::size() const {
return 2;
}

View File

@@ -0,0 +1,17 @@
#pragma once
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
namespace AsyncMqttClientInternals {
class DisconnOutPacket : public OutPacket {
public:
DisconnOutPacket();
const uint8_t* data(size_t index = 0) const;
size_t size() const;
private:
uint8_t _data[2];
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,44 @@
#include "OutPacket.hpp"
using AsyncMqttClientInternals::OutPacket;
OutPacket::OutPacket()
: next(nullptr)
, timeout(0)
, noTries(0)
, _released(true)
, _packetId(0) {}
OutPacket::~OutPacket() {}
bool OutPacket::released() const {
return _released;
}
uint8_t OutPacket::packetType() const {
return data(0)[0] >> 4;
}
uint16_t OutPacket::packetId() const {
return _packetId;
}
uint8_t OutPacket::qos() const {
if (packetType() == AsyncMqttClientInternals::PacketType.PUBLISH) {
return (data()[1] & 0x06) >> 1;
}
return 0;
}
void OutPacket::release() {
_released = true;
}
uint16_t OutPacket::_nextPacketId = 0;
uint16_t OutPacket::_getNextPacketId() {
if (++_nextPacketId == 0) {
++_nextPacketId;
}
return _nextPacketId;
}

View File

@@ -0,0 +1,35 @@
#pragma once
#include <stdint.h> // uint*_t
#include <stddef.h> // size_t
#include <algorithm> // std::min
#include "../../Flags.hpp"
namespace AsyncMqttClientInternals {
class OutPacket {
public:
OutPacket();
virtual ~OutPacket();
virtual const uint8_t* data(size_t index = 0) const = 0;
virtual size_t size() const = 0;
bool released() const;
uint8_t packetType() const;
uint16_t packetId() const;
uint8_t qos() const;
void release();
public:
OutPacket* next;
uint32_t timeout;
uint8_t noTries;
protected:
static uint16_t _getNextPacketId();
bool _released;
uint16_t _packetId;
private:
static uint16_t _nextPacketId;
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,18 @@
#include "PingReq.hpp"
using AsyncMqttClientInternals::PingReqOutPacket;
PingReqOutPacket::PingReqOutPacket() {
_data[0] = AsyncMqttClientInternals::PacketType.PINGREQ;
_data[0] = _data[0] << 4;
_data[0] = _data[0] | AsyncMqttClientInternals::HeaderFlag.PINGREQ_RESERVED;
_data[1] = 0;
}
const uint8_t* PingReqOutPacket::data(size_t index) const {
return &_data[index];;
}
size_t PingReqOutPacket::size() const {
return 2;
}

View File

@@ -0,0 +1,17 @@
#pragma once
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
namespace AsyncMqttClientInternals {
class PingReqOutPacket : public OutPacket {
public:
PingReqOutPacket();
const uint8_t* data(size_t index = 0) const;
size_t size() const;
private:
uint8_t _data[2];
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,26 @@
#include "PubAck.hpp"
using AsyncMqttClientInternals::PubAckOutPacket;
PubAckOutPacket::PubAckOutPacket(PendingAck pendingAck) {
_data[0] = pendingAck.packetType;
_data[0] = _data[0] << 4;
_data[0] = _data[0] | pendingAck.headerFlag;
_data[1] = 2;
_packetId = pendingAck.packetId;
_data[2] = pendingAck.packetId >> 8;
_data[3] = pendingAck.packetId & 0xFF;
_released = false;
if (packetType() == AsyncMqttClientInternals::PacketType.PUBREL ||
packetType() == AsyncMqttClientInternals::PacketType.PUBREC) {
_released = false;
}
}
const uint8_t* PubAckOutPacket::data(size_t index) const {
return &_data[index];
}
size_t PubAckOutPacket::size() const {
return 4;
}

View File

@@ -0,0 +1,18 @@
#pragma once
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
#include "../../Storage.hpp"
namespace AsyncMqttClientInternals {
class PubAckOutPacket : public OutPacket {
public:
explicit PubAckOutPacket(PendingAck pendingAck);
const uint8_t* data(size_t index = 0) const;
size_t size() const;
private:
uint8_t _data[4];
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,69 @@
#include "Publish.hpp"
using AsyncMqttClientInternals::PublishOutPacket;
PublishOutPacket::PublishOutPacket(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length) {
char fixedHeader[5];
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBLISH;
fixedHeader[0] = fixedHeader[0] << 4;
// if (dup) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP;
if (retain) fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_RETAIN;
switch (qos) {
case 0:
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS0;
break;
case 1:
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS1;
break;
case 2:
fixedHeader[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_QOS2;
break;
}
uint16_t topicLength = strlen(topic);
char topicLengthBytes[2];
topicLengthBytes[0] = topicLength >> 8;
topicLengthBytes[1] = topicLength & 0xFF;
uint32_t payloadLength = length;
if (payload != nullptr && payloadLength == 0) payloadLength = strlen(payload);
uint32_t remainingLength = 2 + topicLength + payloadLength;
if (qos != 0) remainingLength += 2;
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
size_t neededSpace = 0;
neededSpace += 1 + remainingLengthLength;
neededSpace += 2;
neededSpace += topicLength;
if (qos != 0) neededSpace += 2;
if (payload != nullptr) neededSpace += payloadLength;
_data.reserve(neededSpace);
_packetId = (qos !=0) ? _getNextPacketId() : 1;
char packetIdBytes[2];
packetIdBytes[0] = _packetId >> 8;
packetIdBytes[1] = _packetId & 0xFF;
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
_data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2);
_data.insert(_data.end(), topic, topic + topicLength);
if (qos != 0) {
_data.insert(_data.end(), packetIdBytes, packetIdBytes + 2);
_released = false;
}
if (payload != nullptr) _data.insert(_data.end(), payload, payload + payloadLength);
}
const uint8_t* PublishOutPacket::data(size_t index) const {
return &_data.data()[index];
}
size_t PublishOutPacket::size() const {
return _data.size();
}
void PublishOutPacket::setDup() {
_data[0] |= AsyncMqttClientInternals::HeaderFlag.PUBLISH_DUP;
}

View File

@@ -0,0 +1,23 @@
#pragma once
#include <cstring> // strlen
#include <vector>
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
#include "../../Storage.hpp"
namespace AsyncMqttClientInternals {
class PublishOutPacket : public OutPacket {
public:
PublishOutPacket(const char* topic, uint8_t qos, bool retain, const char* payload, size_t length);
const uint8_t* data(size_t index = 0) const;
size_t size() const;
void setDup(); // you cannot unset dup
private:
std::vector<uint8_t> _data;
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,49 @@
#include "Subscribe.hpp"
using AsyncMqttClientInternals::SubscribeOutPacket;
SubscribeOutPacket::SubscribeOutPacket(const char* topic, uint8_t qos) {
char fixedHeader[5];
fixedHeader[0] = AsyncMqttClientInternals::PacketType.SUBSCRIBE;
fixedHeader[0] = fixedHeader[0] << 4;
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.SUBSCRIBE_RESERVED;
uint16_t topicLength = strlen(topic);
char topicLengthBytes[2];
topicLengthBytes[0] = topicLength >> 8;
topicLengthBytes[1] = topicLength & 0xFF;
char qosByte[1];
qosByte[0] = qos;
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength + 1, fixedHeader + 1);
size_t neededSpace = 0;
neededSpace += 1 + remainingLengthLength;
neededSpace += 2;
neededSpace += 2;
neededSpace += topicLength;
neededSpace += 1;
_data.reserve(neededSpace);
_packetId = _getNextPacketId();
char packetIdBytes[2];
packetIdBytes[0] = _packetId >> 8;
packetIdBytes[1] = _packetId & 0xFF;
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
_data.insert(_data.end(), packetIdBytes, packetIdBytes + 2);
_data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2);
_data.insert(_data.end(), topic, topic + topicLength);
_data.push_back(qosByte[0]);
_released = false;
}
const uint8_t* SubscribeOutPacket::data(size_t index) const {
return &_data.data()[index];
}
size_t SubscribeOutPacket::size() const {
return _data.size();
}

View File

@@ -0,0 +1,21 @@
#pragma once
#include <cstring> // strlen
#include <vector>
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
#include "../../Storage.hpp"
namespace AsyncMqttClientInternals {
class SubscribeOutPacket : public OutPacket {
public:
SubscribeOutPacket(const char* topic, uint8_t qos);
const uint8_t* data(size_t index = 0) const;
size_t size() const;
private:
std::vector<uint8_t> _data;
};
} // namespace AsyncMqttClientInternals

View File

@@ -0,0 +1,42 @@
#include "Unsubscribe.hpp"
using AsyncMqttClientInternals::UnsubscribeOutPacket;
UnsubscribeOutPacket::UnsubscribeOutPacket(const char* topic) {
char fixedHeader[5];
fixedHeader[0] = AsyncMqttClientInternals::PacketType.UNSUBSCRIBE;
fixedHeader[0] = fixedHeader[0] << 4;
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.UNSUBSCRIBE_RESERVED;
uint16_t topicLength = strlen(topic);
char topicLengthBytes[2];
topicLengthBytes[0] = topicLength >> 8;
topicLengthBytes[1] = topicLength & 0xFF;
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(2 + 2 + topicLength, fixedHeader + 1);
size_t neededSpace = 0;
neededSpace += 1 + remainingLengthLength;
neededSpace += 2;
neededSpace += 2;
neededSpace += topicLength;
_packetId = _getNextPacketId();
char packetIdBytes[2];
packetIdBytes[0] = _packetId >> 8;
packetIdBytes[1] = _packetId & 0xFF;
_data.insert(_data.end(), fixedHeader, fixedHeader + 1 + remainingLengthLength);
_data.insert(_data.end(), packetIdBytes, packetIdBytes + 2);
_data.insert(_data.end(), topicLengthBytes, topicLengthBytes + 2);
_data.insert(_data.end(), topic, topic + topicLength);
_released = false;
}
const uint8_t* UnsubscribeOutPacket::data(size_t index) const {
return &_data.data()[index];
}
size_t UnsubscribeOutPacket::size() const {
return _data.size();
}

View File

@@ -0,0 +1,21 @@
#pragma once
#include <cstring> // strlen
#include <vector>
#include "OutPacket.hpp"
#include "../../Flags.hpp"
#include "../../Helpers.hpp"
#include "../../Storage.hpp"
namespace AsyncMqttClientInternals {
class UnsubscribeOutPacket : public OutPacket {
public:
explicit UnsubscribeOutPacket(const char* topic);
const uint8_t* data(size_t index = 0) const;
size_t size() const;
private:
std::vector<uint8_t> _data;
};
} // namespace AsyncMqttClientInternals

View File

@@ -16,8 +16,7 @@ PublishPacket::PublishPacket(ParsingInformation* parsingInformation, OnMessageIn
, _packetIdMsb(0) , _packetIdMsb(0)
, _packetId(0) , _packetId(0)
, _payloadLength(0) , _payloadLength(0)
, _payloadBytesRead(0) , _payloadBytesRead(0) {
, _ptempbuff(0) {
_dup = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_DUP; _dup = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_DUP;
_retain = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_RETAIN; _retain = _parsingInformation->packetFlags & HeaderFlag.PUBLISH_RETAIN;
char qosMasked = _parsingInformation->packetFlags & 0x06; char qosMasked = _parsingInformation->packetFlags & 0x06;
@@ -79,36 +78,14 @@ void PublishPacket::_preparePayloadHandling(uint32_t payloadLength) {
void PublishPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) { void PublishPacket::parsePayload(char* data, size_t len, size_t* currentBytePosition) {
size_t remainToRead = len - (*currentBytePosition); size_t remainToRead = len - (*currentBytePosition);
if (_payloadBytesRead + remainToRead > _payloadLength) if (_payloadBytesRead + remainToRead > _payloadLength) remainToRead = _payloadLength - _payloadBytesRead;
remainToRead = _payloadLength - _payloadBytesRead;
if (!_ignore) { if (!_ignore) _dataCallback(_parsingInformation->topicBuffer, data + (*currentBytePosition), _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId);
if (remainToRead < _payloadLength) {
if (!_ptempbuff) {
_ptempbuff = new char[_payloadLength + 1];
if (_ptempbuff == nullptr) {
_ignore = true;
return;
}
memset(_ptempbuff, 0, _payloadLength + 1);
memcpy(&_ptempbuff[_payloadBytesRead], &data[(*currentBytePosition)], remainToRead);
} else {
memcpy(&_ptempbuff[_payloadBytesRead], &data[(*currentBytePosition)], remainToRead);
if ((_payloadBytesRead + remainToRead) == _payloadLength) {
_dataCallback(_parsingInformation->topicBuffer, _ptempbuff, _qos, _dup, _retain, _payloadLength, 0, _payloadLength, _packetId);
delete[] _ptempbuff;
_ptempbuff = NULL;
}
}
} else {
_dataCallback(_parsingInformation->topicBuffer, &data[(*currentBytePosition)], _qos, _dup, _retain, remainToRead, _payloadBytesRead, _payloadLength, _packetId);
}
}
_payloadBytesRead += remainToRead; _payloadBytesRead += remainToRead;
(*currentBytePosition) += remainToRead; (*currentBytePosition) += remainToRead;
if (_payloadBytesRead == _payloadLength) { if (_payloadBytesRead == _payloadLength) {
_parsingInformation->bufferState = BufferState::NONE; _parsingInformation->bufferState = BufferState::NONE;
if (!_ignore) if (!_ignore) _completeCallback(_packetId, _qos);
_completeCallback(_packetId, _qos);
} }
} }

View File

@@ -34,6 +34,5 @@ class PublishPacket : public Packet {
uint16_t _packetId; uint16_t _packetId;
uint32_t _payloadLength; uint32_t _payloadLength;
uint32_t _payloadBytesRead; uint32_t _payloadBytesRead;
char* _ptempbuff;
}; };
} // namespace AsyncMqttClientInternals } // namespace AsyncMqttClientInternals

View File

@@ -656,7 +656,6 @@ void Mqtt::publish_ha(const __FlashStringHelper * topic, const JsonObject & payl
} }
// publish a Home Assistant config topic and payload, with retain flag off. // publish a Home Assistant config topic and payload, with retain flag off.
// for ESP32 its added to the queue, for ESP8266 is sent immediatelty
void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) { void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) {
if (!enabled()) { if (!enabled()) {
return; return;
@@ -676,20 +675,7 @@ void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) {
#endif #endif
// queue messages if the MQTT connection is not yet established. to ensure we don't miss messages // queue messages if the MQTT connection is not yet established. to ensure we don't miss messages
bool queued = !connected(); queue_publish_message(topic, payload_text, true); // with retain true
if (queued) {
queue_publish_message(topic, payload_text, true); // with retain true
return;
}
// send immediately and then wait a while
if (!mqttClient_->publish(topic.c_str(), 0, true, payload_text.c_str())) {
LOG_ERROR(F("Failed to publish topic %s"), topic.c_str());
mqtt_publish_fails_++; // increment failure counter
}
delay(MQTT_HA_PUBLISH_DELAY); // enough time to send the short message out
} }
// take top from queue and perform the publish or subscribe action // take top from queue and perform the publish or subscribe action

View File

@@ -42,13 +42,7 @@ using uuid::console::Shell;
#define MQTT_HA_PUBLISH_DELAY 50 #define MQTT_HA_PUBLISH_DELAY 50
// size of queue // size of queue
#if defined(EMSESP_STANDALONE)
#define MAX_MQTT_MESSAGES 70
#elif defined(ESP32)
#define MAX_MQTT_MESSAGES 100 #define MAX_MQTT_MESSAGES 100
#else
#define MAX_MQTT_MESSAGES 20
#endif
enum { BOOL_FORMAT_ONOFF = 1, BOOL_FORMAT_TRUEFALSE, BOOL_FORMAT_10 }; // matches Web UI settings enum { BOOL_FORMAT_ONOFF = 1, BOOL_FORMAT_TRUEFALSE, BOOL_FORMAT_10 }; // matches Web UI settings

View File

@@ -1 +1 @@
#define EMSESP_APP_VERSION "3.0.0b2" #define EMSESP_APP_VERSION "3.0.0b3"