use espMqttClient, qos2 fixed

This commit is contained in:
MichaelDvP
2023-06-03 16:36:53 +02:00
parent d9c2fe0fb9
commit 7865ddc51f
43 changed files with 3749 additions and 230 deletions

View File

@@ -0,0 +1,77 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
Parts are based on the original work of Marvin Roger:
https://github.com/marvinroger/async-mqtt-client
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#pragma once
#include <stdint.h>
namespace espMqttClientInternals {
constexpr const char PROTOCOL[] = "MQTT";
constexpr const uint8_t PROTOCOL_LEVEL = 0b00000100;
typedef uint8_t MQTTPacketType;
constexpr struct {
const uint8_t RESERVED1 = 0;
const uint8_t CONNECT = 1 << 4;
const uint8_t CONNACK = 2 << 4;
const uint8_t PUBLISH = 3 << 4;
const uint8_t PUBACK = 4 << 4;
const uint8_t PUBREC = 5 << 4;
const uint8_t PUBREL = 6 << 4;
const uint8_t PUBCOMP = 7 << 4;
const uint8_t SUBSCRIBE = 8 << 4;
const uint8_t SUBACK = 9 << 4;
const uint8_t UNSUBSCRIBE = 10 << 4;
const uint8_t UNSUBACK = 11 << 4;
const uint8_t PINGREQ = 12 << 4;
const uint8_t PINGRESP = 13 << 4;
const uint8_t DISCONNECT = 14 << 4;
const uint8_t RESERVED2 = 1 << 4;
} PacketType;
constexpr struct {
const uint8_t CONNECT_RESERVED = 0x00;
const uint8_t CONNACK_RESERVED = 0x00;
const uint8_t PUBLISH_DUP = 0x08;
const uint8_t PUBLISH_QOS0 = 0x00;
const uint8_t PUBLISH_QOS1 = 0x02;
const uint8_t PUBLISH_QOS2 = 0x04;
const uint8_t PUBLISH_QOSRESERVED = 0x06;
const uint8_t PUBLISH_RETAIN = 0x01;
const uint8_t PUBACK_RESERVED = 0x00;
const uint8_t PUBREC_RESERVED = 0x00;
const uint8_t PUBREL_RESERVED = 0x02;
const uint8_t PUBCOMP_RESERVED = 0x00;
const uint8_t SUBSCRIBE_RESERVED = 0x02;
const uint8_t SUBACK_RESERVED = 0x00;
const uint8_t UNSUBSCRIBE_RESERVED = 0x02;
const uint8_t UNSUBACK_RESERVED = 0x00;
const uint8_t PINGREQ_RESERVED = 0x00;
const uint8_t PINGRESP_RESERVED = 0x00;
const uint8_t DISCONNECT_RESERVED = 0x00;
const uint8_t RESERVED2_RESERVED = 0x00;
} HeaderFlag;
constexpr struct {
const uint8_t USERNAME = 0x80;
const uint8_t PASSWORD = 0x40;
const uint8_t WILL_RETAIN = 0x20;
const uint8_t WILL_QOS0 = 0x00;
const uint8_t WILL_QOS1 = 0x08;
const uint8_t WILL_QOS2 = 0x10;
const uint8_t WILL = 0x04;
const uint8_t CLEAN_SESSION = 0x02;
const uint8_t RESERVED = 0x00;
} ConnectFlag;
} // end namespace espMqttClientInternals

View File

@@ -0,0 +1,438 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#include "Packet.h"
namespace espMqttClientInternals {
Packet::~Packet() {
free(_data);
}
size_t Packet::available(size_t index) {
if (index >= _size) return 0;
if (!_getPayload) return _size - index;
return _chunkedAvailable(index);
}
const uint8_t* Packet::data(size_t index) const {
if (!_getPayload) {
if (!_data) return nullptr;
if (index >= _size) return nullptr;
return &_data[index];
}
return _chunkedData(index);
}
size_t Packet::size() const {
return _size;
}
void Packet::setDup() {
if (!_data) return;
if (packetType() != PacketType.PUBLISH) return;
if (_packetId == 0) return;
_data[0] |= 0x08;
}
uint16_t Packet::packetId() const {
return _packetId;
}
MQTTPacketType Packet::packetType() const {
if (_data) return static_cast<MQTTPacketType>(_data[0] & 0xF0);
return static_cast<MQTTPacketType>(0);
}
bool Packet::removable() const {
if (_packetId == 0) return true;
if ((packetType() == PacketType.PUBACK) || (packetType() == PacketType.PUBCOMP)) return true;
return false;
}
Packet::Packet(espMqttClientTypes::Error& error,
bool cleanSession,
const char* username,
const char* password,
const char* willTopic,
bool willRetain,
uint8_t willQos,
const uint8_t* willPayload,
uint16_t willPayloadLength,
uint16_t keepAlive,
const char* clientId)
: _packetId(0)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
if (willPayload && willPayloadLength == 0) {
size_t length = strlen(reinterpret_cast<const char*>(willPayload));
if (length > UINT16_MAX) {
emc_log_w("Payload length truncated (l:%zu)", length);
willPayloadLength = UINT16_MAX;
} else {
willPayloadLength = length;
}
}
if (!clientId || strlen(clientId) == 0) {
emc_log_w("clientId not set error");
error = espMqttClientTypes::Error::MALFORMED_PARAMETER;
return;
}
// Calculate size
size_t remainingLength =
6 + // protocol
1 + // protocol level
1 + // connect flags
2 + // keepalive
2 + strlen(clientId) +
(willTopic ? 2 + strlen(willTopic) + 2 + willPayloadLength : 0) +
(username ? 2 + strlen(username) : 0) +
(password ? 2 + strlen(password) : 0);
// allocate memory
if (!_allocate(remainingLength)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
// serialize
size_t pos = 0;
// FIXED HEADER
_data[pos++] = PacketType.CONNECT | HeaderFlag.CONNECT_RESERVED;
pos += encodeRemainingLength(remainingLength, &_data[pos]);
pos += encodeString(PROTOCOL, &_data[pos]);
_data[pos++] = PROTOCOL_LEVEL;
uint8_t connectFlags = 0;
if (cleanSession) connectFlags |= espMqttClientInternals::ConnectFlag.CLEAN_SESSION;
if (username != nullptr) connectFlags |= espMqttClientInternals::ConnectFlag.USERNAME;
if (password != nullptr) connectFlags |= espMqttClientInternals::ConnectFlag.PASSWORD;
if (willTopic != nullptr) {
connectFlags |= espMqttClientInternals::ConnectFlag.WILL;
if (willRetain) connectFlags |= espMqttClientInternals::ConnectFlag.WILL_RETAIN;
switch (willQos) {
case 0:
connectFlags |= espMqttClientInternals::ConnectFlag.WILL_QOS0;
break;
case 1:
connectFlags |= espMqttClientInternals::ConnectFlag.WILL_QOS1;
break;
case 2:
connectFlags |= espMqttClientInternals::ConnectFlag.WILL_QOS2;
break;
}
}
_data[pos++] = connectFlags;
_data[pos++] = keepAlive >> 8;
_data[pos++] = keepAlive & 0xFF;
// PAYLOAD
// client ID
pos += encodeString(clientId, &_data[pos]);
// will
if (willTopic != nullptr && willPayload != nullptr) {
pos += encodeString(willTopic, &_data[pos]);
_data[pos++] = willPayloadLength >> 8;
_data[pos++] = willPayloadLength & 0xFF;
memcpy(&_data[pos], willPayload, willPayloadLength);
pos += willPayloadLength;
}
// credentials
if (username != nullptr) pos += encodeString(username, &_data[pos]);
if (password != nullptr) encodeString(password, &_data[pos]);
error = espMqttClientTypes::Error::SUCCESS;
}
Packet::Packet(espMqttClientTypes::Error& error,
uint16_t packetId,
const char* topic,
const uint8_t* payload,
size_t payloadLength,
uint8_t qos,
bool retain)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
size_t remainingLength =
2 + strlen(topic) + // topic length + topic
2 + // packet ID
payloadLength;
if (qos == 0) {
remainingLength -= 2;
_packetId = 0;
}
if (!_allocate(remainingLength)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
size_t pos = _fillPublishHeader(packetId, topic, remainingLength, qos, retain);
// PAYLOAD
memcpy(&_data[pos], payload, payloadLength);
error = espMqttClientTypes::Error::SUCCESS;
}
Packet::Packet(espMqttClientTypes::Error& error,
uint16_t packetId,
const char* topic,
espMqttClientTypes::PayloadCallback payloadCallback,
size_t payloadLength,
uint8_t qos,
bool retain)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(payloadCallback) {
size_t remainingLength =
2 + strlen(topic) + // topic length + topic
2 + // packet ID
payloadLength;
if (qos == 0) {
remainingLength -= 2;
_packetId = 0;
}
if (!_allocate(remainingLength - payloadLength + std::min(payloadLength, static_cast<size_t>(EMC_RX_BUFFER_SIZE)))) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
size_t pos = _fillPublishHeader(packetId, topic, remainingLength, qos, retain);
// payload will be added by 'Packet::available'
_size = pos + payloadLength;
_payloadIndex = pos;
_payloadStartIndex = _payloadIndex;
_payloadEndIndex = _payloadIndex;
error = espMqttClientTypes::Error::SUCCESS;
}
Packet::Packet(espMqttClientTypes::Error& error, uint16_t packetId, const char* topic, uint8_t qos)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
SubscribeItem list[1] = {topic, qos};
_createSubscribe(error, list, 1);
}
Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type, uint16_t packetId)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
if (!_allocate(2)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
size_t pos = 0;
_data[pos] = type;
if (type == PacketType.PUBREL) {
_data[pos++] |= HeaderFlag.PUBREL_RESERVED;
} else {
pos++;
}
pos += encodeRemainingLength(2, &_data[pos]);
_data[pos++] = packetId >> 8;
_data[pos] = packetId & 0xFF;
error = espMqttClientTypes::Error::SUCCESS;
}
Packet::Packet(espMqttClientTypes::Error& error, uint16_t packetId, const char* topic)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
const char* list[1] = {topic};
_createUnsubscribe(error, list, 1);
}
Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)
: _packetId(0)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
if (!_allocate(0)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
_data[0] |= type;
error = espMqttClientTypes::Error::SUCCESS;
}
bool Packet::_allocate(size_t remainingLength) {
if (EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
emc_log_w("Packet buffer not allocated: low memory");
return false;
}
_size = 1 + remainingLengthLength(remainingLength) + remainingLength;
_data = reinterpret_cast<uint8_t*>(malloc(_size));
if (!_data) {
_size = 0;
emc_log_w("Alloc failed (l:%zu)", _size);
return false;
}
emc_log_i("Alloc (l:%zu)", _size);
memset(_data, 0, _size);
return true;
}
size_t Packet::_fillPublishHeader(uint16_t packetId,
const char* topic,
size_t remainingLength,
uint8_t qos,
bool retain) {
size_t index = 0;
// FIXED HEADER
_data[index] = PacketType.PUBLISH;
if (retain) _data[index] |= HeaderFlag.PUBLISH_RETAIN;
if (qos == 0) {
_data[index++] |= HeaderFlag.PUBLISH_QOS0;
} else if (qos == 1) {
_data[index++] |= HeaderFlag.PUBLISH_QOS1;
} else if (qos == 2) {
_data[index++] |= HeaderFlag.PUBLISH_QOS2;
}
index += encodeRemainingLength(remainingLength, &_data[index]);
// VARIABLE HEADER
index += encodeString(topic, &_data[index]);
if (qos > 0) {
_data[index++] = packetId >> 8;
_data[index++] = packetId & 0xFF;
}
return index;
}
void Packet::_createSubscribe(espMqttClientTypes::Error& error,
SubscribeItem* list,
size_t numberTopics) {
// Calculate size
size_t payload = 0;
for (size_t i = 0; i < numberTopics; ++i) {
payload += 2 + strlen(list[i].topic) + 1; // length bytes, string, qos
}
size_t remainingLength = 2 + payload; // packetId + payload
// allocate memory
if (!_allocate(remainingLength)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
// serialize
size_t pos = 0;
_data[pos++] = PacketType.SUBSCRIBE | HeaderFlag.SUBSCRIBE_RESERVED;
pos += encodeRemainingLength(remainingLength, &_data[pos]);
_data[pos++] = _packetId >> 8;
_data[pos++] = _packetId & 0xFF;
for (size_t i = 0; i < numberTopics; ++i) {
pos += encodeString(list[i].topic, &_data[pos]);
_data[pos++] = list[i].qos;
}
error = espMqttClientTypes::Error::SUCCESS;
}
void Packet::_createUnsubscribe(espMqttClientTypes::Error& error,
const char** list,
size_t numberTopics) {
// Calculate size
size_t payload = 0;
for (size_t i = 0; i < numberTopics; ++i) {
payload += 2 + strlen(list[i]); // length bytes, string
}
size_t remainingLength = 2 + payload; // packetId + payload
// allocate memory
if (!_allocate(remainingLength)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
// serialize
size_t pos = 0;
_data[pos++] = PacketType.UNSUBSCRIBE | HeaderFlag.UNSUBSCRIBE_RESERVED;
pos += encodeRemainingLength(remainingLength, &_data[pos]);
_data[pos++] = _packetId >> 8;
_data[pos++] = _packetId & 0xFF;
for (size_t i = 0; i < numberTopics; ++i) {
pos += encodeString(list[i], &_data[pos]);
}
error = espMqttClientTypes::Error::SUCCESS;
}
size_t Packet::_chunkedAvailable(size_t index) {
// index vs size check done in 'available(index)'
// index points to header or first payload byte
if (index < _payloadIndex) {
if (_size > _payloadIndex && _payloadEndIndex != 0) {
size_t copied = _getPayload(&_data[_payloadIndex], std::min(static_cast<size_t>(EMC_TX_BUFFER_SIZE), _size - _payloadStartIndex), index);
_payloadStartIndex = _payloadIndex;
_payloadEndIndex = _payloadStartIndex + copied - 1;
}
// index points to payload unavailable
} else if (index > _payloadEndIndex || _payloadStartIndex > index) {
_payloadStartIndex = index;
size_t copied = _getPayload(&_data[_payloadIndex], std::min(static_cast<size_t>(EMC_TX_BUFFER_SIZE), _size - _payloadStartIndex), index);
_payloadEndIndex = _payloadStartIndex + copied - 1;
}
// now index points to header or payload available
return _payloadEndIndex - index + 1;
}
const uint8_t* Packet::_chunkedData(size_t index) const {
// CAUTION!! available(index) has to be called first to check available data and possibly fill payloadbuffer
if (index < _payloadIndex) {
return &_data[index];
}
return &_data[index - _payloadStartIndex + _payloadIndex];
}
} // end namespace espMqttClientInternals

View File

@@ -0,0 +1,155 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include "Constants.h"
#include "../Config.h"
#include "../TypeDefs.h"
#include "../Helpers.h"
#include "../Logging.h"
#include "RemainingLength.h"
#include "String.h"
namespace espMqttClientInternals {
class Packet {
public:
~Packet();
size_t available(size_t index);
const uint8_t* data(size_t index) const;
size_t size() const;
void setDup();
uint16_t packetId() const;
MQTTPacketType packetType() const;
bool removable() const;
protected:
uint16_t _packetId; // save as separate variable: will be accessed frequently
uint8_t* _data;
size_t _size;
// variables for chunked payload handling
size_t _payloadIndex;
size_t _payloadStartIndex;
size_t _payloadEndIndex;
espMqttClientTypes::PayloadCallback _getPayload;
struct SubscribeItem {
const char* topic;
uint8_t qos;
};
public:
// CONNECT
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
bool cleanSession,
const char* username,
const char* password,
const char* willTopic,
bool willRetain,
uint8_t willQos,
const uint8_t* willPayload,
uint16_t willPayloadLength,
uint16_t keepAlive,
const char* clientId);
// PUBLISH
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
uint16_t packetId,
const char* topic,
const uint8_t* payload,
size_t payloadLength,
uint8_t qos,
bool retain);
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
uint16_t packetId,
const char* topic,
espMqttClientTypes::PayloadCallback payloadCallback,
size_t payloadLength,
uint8_t qos,
bool retain);
// SUBSCRIBE
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
uint16_t packetId,
const char* topic,
uint8_t qos);
template<typename ... Args>
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
uint16_t packetId,
const char* topic1,
uint8_t qos1,
const char* topic2,
uint8_t qos2,
Args&& ... args)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
static_assert(sizeof...(Args) % 2 == 0, "Subscribe should be in topic/qos pairs");
size_t numberTopics = 2 + (sizeof...(Args) / 2);
SubscribeItem list[numberTopics] = {topic1, qos1, topic2, qos2, args...};
_createSubscribe(error, list, numberTopics);
}
// UNSUBSCRIBE
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
uint16_t packetId,
const char* topic);
template<typename ... Args>
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
uint16_t packetId,
const char* topic1,
const char* topic2,
Args&& ... args)
: _packetId(packetId)
, _data(nullptr)
, _size(0)
, _payloadIndex(0)
, _payloadStartIndex(0)
, _payloadEndIndex(0)
, _getPayload(nullptr) {
size_t numberTopics = 2 + sizeof...(Args);
const char* list[numberTopics] = {topic1, topic2, args...};
_createUnsubscribe(error, list, numberTopics);
}
// PUBACK, PUBREC, PUBREL, PUBCOMP
Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
MQTTPacketType type,
uint16_t packetId);
// PING, DISCONN
explicit Packet(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
MQTTPacketType type);
private:
// pass remainingLength = total size - header - remainingLengthLength!
bool _allocate(size_t remainingLength);
// fills header and returns index of next available byte in buffer
size_t _fillPublishHeader(uint16_t packetId,
const char* topic,
size_t remainingLength,
uint8_t qos,
bool retain);
void _createSubscribe(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
SubscribeItem* list,
size_t numberTopics);
void _createUnsubscribe(espMqttClientTypes::Error& error, // NOLINT(runtime/references)
const char** list,
size_t numberTopics);
size_t _chunkedAvailable(size_t index);
const uint8_t* _chunkedData(size_t index) const;
};
} // end namespace espMqttClientInternals

View File

@@ -0,0 +1,316 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#include "Parser.h"
namespace espMqttClientInternals {
uint8_t IncomingPacket::qos() const {
if ((fixedHeader.packetType & 0xF0) != PacketType.PUBLISH) return 0;
return (fixedHeader.packetType & 0x06) >> 1; // mask 0x00000110
}
bool IncomingPacket::retain() const {
if ((fixedHeader.packetType & 0xF0) != PacketType.PUBLISH) return 0;
return fixedHeader.packetType & 0x01; // mask 0x00000001
}
bool IncomingPacket::dup() const {
if ((fixedHeader.packetType & 0xF0) != PacketType.PUBLISH) return 0;
return fixedHeader.packetType & 0x08; // mask 0x00001000
}
void IncomingPacket::reset() {
fixedHeader.packetType = 0;
variableHeader.topicLength = 0;
variableHeader.fixed.packetId = 0;
payload.index = 0;
payload.length = 0;
}
Parser::Parser()
: _data(nullptr)
, _len(0)
, _bytesRead(0)
, _bytePos(0)
, _parse(_fixedHeader)
, _packet()
, _payloadBuffer{0} {
// empty
}
ParserResult Parser::parse(const uint8_t* data, size_t len, size_t* bytesRead) {
_data = data;
_len = len;
_bytesRead = 0;
ParserResult result = ParserResult::awaitData;
while (result == ParserResult::awaitData && _bytesRead < _len) {
result = _parse(this);
++_bytesRead;
}
(*bytesRead) += _bytesRead;
return result;
}
const IncomingPacket& Parser::getPacket() const {
return _packet;
}
void Parser::reset() {
_parse = _fixedHeader;
_bytesRead = 0;
_bytePos = 0;
_packet.reset();
}
ParserResult Parser::_fixedHeader(Parser* p) {
p->_packet.reset();
p->_packet.fixedHeader.packetType = p->_data[p->_bytesRead];
// keep PUBLISH out of the switch and handle in separate if/else
if ((p->_packet.fixedHeader.packetType & 0xF0) == PacketType.PUBLISH) {
uint8_t headerFlags = p->_packet.fixedHeader.packetType & 0x0F;
/* flags can be: 0b0000 --> no dup, qos 0, no retain
0x0001 --> no dup, qos 0, retain
0x0010 --> no dup, qos 1, no retain
0x0011 --> no dup, qos 1, retain
0x0100 --> no dup, qos 2, no retain
0x0101 --> no dup, qos 2, retain
0x1010 --> dup, qos 1, no retain
0x1011 --> dup, qos 1, retain
0x1100 --> dup, qos 2, no retain
0x1101 --> dup, qos 2, retain
*/
if (headerFlags <= 0x05 || headerFlags >= 0x0A) {
p->_parse = _remainingLengthVariable;
p->_bytePos = 0;
} else {
emc_log_w("Invalid packet header: 0x%02x", p->_packet.fixedHeader.packetType);
return ParserResult::protocolError;
}
} else {
switch (p->_packet.fixedHeader.packetType) {
case PacketType.CONNACK | HeaderFlag.CONNACK_RESERVED:
case PacketType.PUBACK | HeaderFlag.PUBACK_RESERVED:
case PacketType.PUBREC | HeaderFlag.PUBREC_RESERVED:
case PacketType.PUBREL | HeaderFlag.PUBREL_RESERVED:
case PacketType.PUBCOMP | HeaderFlag.PUBCOMP_RESERVED:
case PacketType.UNSUBACK | HeaderFlag.UNSUBACK_RESERVED:
p->_parse = _remainingLengthFixed;
break;
case PacketType.SUBACK | HeaderFlag.SUBACK_RESERVED:
p->_parse = _remainingLengthVariable;
p->_bytePos = 0;
break;
case PacketType.PINGRESP | HeaderFlag.PINGRESP_RESERVED:
p->_parse = _remainingLengthNone;
break;
default:
emc_log_w("Invalid packet header: 0x%02x", p->_packet.fixedHeader.packetType);
return ParserResult::protocolError;
}
}
emc_log_i("Packet type: 0x%02x", p->_packet.fixedHeader.packetType);
return ParserResult::awaitData;
}
ParserResult Parser::_remainingLengthFixed(Parser* p) {
p->_packet.fixedHeader.remainingLength.remainingLength = p->_data[p->_bytesRead];
if (p->_packet.fixedHeader.remainingLength.remainingLength == 2) { // variable header is 2 bytes long
if ((p->_packet.fixedHeader.packetType & 0xF0) != PacketType.CONNACK) {
p->_parse = _varHeaderPacketId1;
} else {
p->_parse = _varHeaderConnack1;
}
emc_log_i("Remaining length: %zu", p->_packet.fixedHeader.remainingLength.remainingLength);
return ParserResult::awaitData;
}
p->_parse = _fixedHeader;
emc_log_w("Invalid remaining length (fixed): %zu", p->_packet.fixedHeader.remainingLength.remainingLength);
return ParserResult::protocolError;
}
ParserResult Parser::_remainingLengthVariable(Parser* p) {
p->_packet.fixedHeader.remainingLength.remainingLengthRaw[p->_bytePos] = p->_data[p->_bytesRead];
if (p->_packet.fixedHeader.remainingLength.remainingLengthRaw[p->_bytePos] & 0x80) {
p->_bytePos++;
if (p->_bytePos == 4) {
emc_log_w("Invalid remaining length (variable)");
return ParserResult::protocolError;
} else {
return ParserResult::awaitData;
}
}
// no need to check for negative decoded length, check is already done
p->_packet.fixedHeader.remainingLength.remainingLength = decodeRemainingLength(p->_packet.fixedHeader.remainingLength.remainingLengthRaw);
if ((p->_packet.fixedHeader.packetType & 0xF0) == PacketType.PUBLISH) {
p->_parse = _varHeaderTopicLength1;
emc_log_i("Remaining length: %zu", p->_packet.fixedHeader.remainingLength.remainingLength);
return ParserResult::awaitData;
} else {
int32_t payloadSize = p->_packet.fixedHeader.remainingLength.remainingLength - 2; // total - packet ID
if (0 < payloadSize && payloadSize < EMC_PAYLOAD_BUFFER_SIZE) {
p->_bytePos = 0;
p->_packet.payload.data = p->_payloadBuffer;
p->_packet.payload.index = 0;
p->_packet.payload.length = payloadSize;
p->_packet.payload.total = payloadSize;
p->_parse = _varHeaderPacketId1;
emc_log_i("Remaining length: %zu", p->_packet.fixedHeader.remainingLength.remainingLength);
return ParserResult::awaitData;
} else {
emc_log_w("Invalid payload length");
}
}
p->_parse = _fixedHeader;
return ParserResult::protocolError;
}
ParserResult Parser::_remainingLengthNone(Parser* p) {
p->_packet.fixedHeader.remainingLength.remainingLength = p->_data[p->_bytesRead];
p->_parse = _fixedHeader;
if (p->_packet.fixedHeader.remainingLength.remainingLength == 0) {
emc_log_i("Remaining length: %zu", p->_packet.fixedHeader.remainingLength.remainingLength);
return ParserResult::packet;
}
emc_log_w("Invalid remaining length (none)");
return ParserResult::protocolError;
}
ParserResult Parser::_varHeaderConnack1(Parser* p) {
uint8_t data = p->_data[p->_bytesRead];
if (data < 2) { // session present flag: equal to 0 or 1
p->_packet.variableHeader.fixed.connackVarHeader.sessionPresent = data;
p->_parse = _varHeaderConnack2;
return ParserResult::awaitData;
}
p->_parse = _fixedHeader;
emc_log_w("Invalid session flags");
return ParserResult::protocolError;
}
ParserResult Parser::_varHeaderConnack2(Parser* p) {
uint8_t data = p->_data[p->_bytesRead];
p->_parse = _fixedHeader;
if (data <= 5) { // connect return code max is 5
p->_packet.variableHeader.fixed.connackVarHeader.returnCode = data;
emc_log_i("Packet complete");
return ParserResult::packet;
}
emc_log_w("Invalid connack return code");
return ParserResult::protocolError;
}
ParserResult Parser::_varHeaderPacketId1(Parser* p) {
p->_packet.variableHeader.fixed.packetId |= p->_data[p->_bytesRead] << 8;
p->_parse = _varHeaderPacketId2;
return ParserResult::awaitData;
}
ParserResult Parser::_varHeaderPacketId2(Parser* p) {
p->_packet.variableHeader.fixed.packetId |= p->_data[p->_bytesRead];
p->_parse = _fixedHeader;
if (p->_packet.variableHeader.fixed.packetId != 0) {
emc_log_i("Packet variable header complete");
if ((p->_packet.fixedHeader.packetType & 0xF0) == PacketType.SUBACK) {
p->_parse = _payloadSuback;
return ParserResult::awaitData;
} else if ((p->_packet.fixedHeader.packetType & 0xF0) == PacketType.PUBLISH) {
p->_packet.payload.total -= 2; // substract packet id length from payload
if (p->_packet.payload.total == 0) {
p->_parse = _fixedHeader;
return ParserResult::packet;
} else {
p->_parse = _payloadPublish;
}
return ParserResult::awaitData;
} else {
return ParserResult::packet;
}
} else {
emc_log_w("Invalid packet id");
return ParserResult::protocolError;
}
}
ParserResult Parser::_varHeaderTopicLength1(Parser* p) {
p->_packet.variableHeader.topicLength = p->_data[p->_bytesRead] << 8;
p->_parse = _varHeaderTopicLength2;
return ParserResult::awaitData;
}
ParserResult Parser::_varHeaderTopicLength2(Parser* p) {
p->_packet.variableHeader.topicLength |= p->_data[p->_bytesRead];
size_t maxTopicLength =
p->_packet.fixedHeader.remainingLength.remainingLength
- 2 // topic length bytes
- ((p->_packet.fixedHeader.packetType & (HeaderFlag.PUBLISH_QOS1 | HeaderFlag.PUBLISH_QOS2)) ? 2 : 0);
if (p->_packet.variableHeader.topicLength <= maxTopicLength) {
p->_parse = _varHeaderTopic;
p->_bytePos = 0;
p->_packet.payload.total = p->_packet.fixedHeader.remainingLength.remainingLength - 2 - p->_packet.variableHeader.topicLength;
return ParserResult::awaitData;
}
emc_log_w("Invalid topic length: %u > %zu", p->_packet.variableHeader.topicLength, maxTopicLength);
p->_parse = _fixedHeader;
return ParserResult::protocolError;
}
ParserResult Parser::_varHeaderTopic(Parser* p) {
// no checking for character [MQTT-3.3.2-1] [MQTT-3.3.2-2]
p->_packet.variableHeader.topic[p->_bytePos] = static_cast<char>(p->_data[p->_bytesRead]);
p->_bytePos++;
if (p->_bytePos == p->_packet.variableHeader.topicLength || p->_bytePos == EMC_MAX_TOPIC_LENGTH) {
p->_packet.variableHeader.topic[p->_bytePos] = 0x00; // add c-string delimiter
emc_log_i("Packet variable header topic complete");
if (p->_packet.fixedHeader.packetType & (HeaderFlag.PUBLISH_QOS1 | HeaderFlag.PUBLISH_QOS2)) {
p->_parse = _varHeaderPacketId1;
} else if (p->_packet.payload.total == 0) {
p->_parse = _fixedHeader;
return ParserResult::packet;
} else {
p->_parse = _payloadPublish;
}
}
return ParserResult::awaitData;
}
ParserResult Parser::_payloadSuback(Parser* p) {
uint8_t data = p->_data[p->_bytesRead];
if (data < 0x03 || data == 0x80) {
p->_payloadBuffer[p->_bytePos] = data;
p->_bytePos++;
} else {
p->_parse = _fixedHeader;
emc_log_w("Invalid suback return code");
return ParserResult::protocolError;
}
if (p->_bytePos == p->_packet.payload.total) {
p->_parse = _fixedHeader;
emc_log_i("Packet complete");
return ParserResult::packet;
}
return ParserResult::awaitData;
}
ParserResult Parser::_payloadPublish(Parser* p) {
p->_packet.payload.index += p->_packet.payload.length;
p->_packet.payload.data = &p->_data[p->_bytesRead];
emc_log_i("payload: index %zu, total %zu, avail %zu/%zu", p->_packet.payload.index, p->_packet.payload.total, p->_len - p->_bytesRead, p->_len);
p->_packet.payload.length = std::min(p->_len - p->_bytesRead, p->_packet.payload.total - p->_packet.payload.index);
p->_bytesRead += p->_packet.payload.length - 1; // compensate for increment in _parse-loop
if (p->_packet.payload.index + p->_packet.payload.length == p->_packet.payload.total) {
p->_parse = _fixedHeader;
}
return ParserResult::packet;
}
} // end namespace espMqttClientInternals

View File

@@ -0,0 +1,100 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <algorithm>
#include "../Config.h"
#include "Constants.h"
#include "../Logging.h"
#include "RemainingLength.h"
namespace espMqttClientInternals {
struct IncomingPacket {
struct __attribute__((__packed__)) {
MQTTPacketType packetType;
union {
size_t remainingLength;
uint8_t remainingLengthRaw[4];
} remainingLength;
} fixedHeader;
struct __attribute__((__packed__)) {
uint16_t topicLength;
char topic[EMC_MAX_TOPIC_LENGTH + 1]; // + 1 for c-string delimiter
union {
struct {
uint8_t sessionPresent;
uint8_t returnCode;
} connackVarHeader;
uint16_t packetId;
} fixed;
} variableHeader;
struct {
const uint8_t* data;
size_t length;
size_t index;
size_t total;
} payload;
uint8_t qos() const;
bool retain() const;
bool dup() const;
void reset();
};
enum class ParserResult : uint8_t {
awaitData,
packet,
protocolError
};
class Parser;
typedef ParserResult(*ParserFunc)(Parser*);
class Parser {
public:
Parser();
ParserResult parse(const uint8_t* data, size_t len, size_t* bytesRead);
const IncomingPacket& getPacket() const;
void reset();
private:
// keep data variables in class to avoid copying on every iteration of the parser
const uint8_t* _data;
size_t _len;
size_t _bytesRead;
size_t _bytePos;
ParserFunc _parse;
IncomingPacket _packet;
uint8_t _payloadBuffer[EMC_PAYLOAD_BUFFER_SIZE];
static ParserResult _fixedHeader(Parser* p);
static ParserResult _remainingLengthFixed(Parser* p);
static ParserResult _remainingLengthNone(Parser* p);
static ParserResult _remainingLengthVariable(Parser* p);
static ParserResult _varHeaderConnack1(Parser* p);
static ParserResult _varHeaderConnack2(Parser* p);
static ParserResult _varHeaderPacketId1(Parser* p);
static ParserResult _varHeaderPacketId2(Parser* p);
static ParserResult _varHeaderTopicLength1(Parser* p);
static ParserResult _varHeaderTopicLength2(Parser* p);
static ParserResult _varHeaderTopic(Parser* p);
static ParserResult _payloadSuback(Parser* p);
static ParserResult _payloadPublish(Parser* p);
};
} // end namespace espMqttClientInternals

View File

@@ -0,0 +1,57 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#include "RemainingLength.h"
namespace espMqttClientInternals {
int32_t decodeRemainingLength(const uint8_t* stream) {
uint32_t multiplier = 1;
int32_t remainingLength = 0;
uint8_t currentByte = 0;
uint8_t encodedByte;
do {
encodedByte = stream[currentByte++];
remainingLength += (encodedByte & 127) * multiplier;
if (multiplier > 128 * 128 * 128) {
emc_log_e("Malformed Remaining Length");
return -1;
}
multiplier *= 128;
} while ((encodedByte & 128) != 0);
return remainingLength;
}
uint8_t remainingLengthLength(uint32_t remainingLength) {
if (remainingLength < 128) return 1;
if (remainingLength < 16384) return 2;
if (remainingLength < 2097152) return 3;
if (remainingLength > 268435455) return 0;
return 4;
}
uint8_t encodeRemainingLength(uint32_t remainingLength, uint8_t* destination) {
uint8_t currentByte = 0;
uint8_t bytesNeeded = 0;
do {
uint8_t encodedByte = remainingLength % 128;
remainingLength /= 128;
if (remainingLength > 0) {
encodedByte = encodedByte | 128;
}
destination[currentByte++] = encodedByte;
bytesNeeded++;
} while (remainingLength > 0);
return bytesNeeded;
}
} // namespace espMqttClientInternals

View File

@@ -0,0 +1,32 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#pragma once
#include <stdint.h>
#include "../Logging.h"
namespace espMqttClientInternals {
// Calculations are based on non normative comment in section 2.2.3 Remaining Length of the MQTT specification
// returns decoded length based on input stream
// stream is expected to contain full encoded remaining length
// return -1 on error.
int32_t decodeRemainingLength(const uint8_t* stream);
// returns the number of bytes needed to encode the remaining length
uint8_t remainingLengthLength(uint32_t remainingLength);
// encodes the given remaining length to destination and returns number of bytes used
// destination is expected to be large enough to hold the number of bytes needed
uint8_t encodeRemainingLength(uint32_t remainingLength, uint8_t* destination);
} // namespace espMqttClientInternals

View File

@@ -0,0 +1,26 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#include "String.h"
namespace espMqttClientInternals {
size_t encodeString(const char* source, uint8_t* dest) {
size_t length = strlen(source);
if (length > 65535) {
emc_log_e("String length error");
return 0;
}
dest[0] = static_cast<uint16_t>(length) >> 8;
dest[1] = static_cast<uint16_t>(length) & 0xFF;
memcpy(&dest[2], source, length);
return 2 + length;
}
} // namespace espMqttClientInternals

View File

@@ -0,0 +1,22 @@
/*
Copyright (c) 2022 Bert Melis. All rights reserved.
This work is licensed under the terms of the MIT license.
For a copy, see <https://opensource.org/licenses/MIT> or
the LICENSE file.
*/
#pragma once
#include <stdint.h>
#include <cstring> // memcpy
#include "../Logging.h"
namespace espMqttClientInternals {
// encodes the given source string into destination and returns number of bytes used
// destination is expected to be large enough to hold the number of bytes needed
size_t encodeString(const char* source, uint8_t* dest);
} // namespace espMqttClientInternals