update espMqttClient

This commit is contained in:
MichaelDvP
2023-08-31 11:50:31 +02:00
parent ae1bf1cbfb
commit 005463c41f
6 changed files with 37 additions and 33 deletions

View File

@@ -103,7 +103,7 @@ bool MqttClient::disconnected() const {
}
bool MqttClient::connect() {
bool result = true;
bool result = false;
if (_state == State::disconnected) {
EMC_SEMAPHORE_TAKE();
if (_addPacketFront(_cleanSession,
@@ -116,19 +116,21 @@ bool MqttClient::connect() {
_willPayloadLength,
(uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally
_clientId)) {
result = true;
_state = State::connectingTcp1;
#if defined(ARDUINO_ARCH_ESP32)
if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
vTaskResume(_taskHandle);
}
#endif
_state = State::connectingTcp1;
} else {
EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create CONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
result = false;
}
EMC_SEMAPHORE_GIVE();
} else if (_state <= State::connected) { // already connected or connecting
result = true;
}
return result;
}
@@ -196,6 +198,14 @@ const char * MqttClient::getClientId() const {
return _clientId;
}
size_t MqttClient::queueSize() {
size_t ret = 0;
EMC_SEMAPHORE_TAKE();
ret = _outbox.size();
EMC_SEMAPHORE_GIVE();
return ret;
}
void MqttClient::loop() {
switch ((State)_state) { // modified by proddy for EMS-ESP compiling standalone
case State::disconnected:
@@ -335,7 +345,6 @@ int MqttClient::_sendPacket() {
size_t wantToWrite = 0;
size_t written = 0;
if (packet && (wantToWrite == written)) {
// mixing signed with unsigned here but safe because of MQTT packet size limits
wantToWrite = packet->packet.available(_bytesSent);
if (wantToWrite == 0) {
EMC_SEMAPHORE_GIVE();
@@ -630,9 +639,6 @@ void MqttClient::_onPubcomp() {
// if it doesn't match the ID, return
if ((it.get()->packet.packetType()) == PacketType.PUBREL) {
if (it.get()->packet.packetId() == idToMatch) {
// if (!_addPacket(PacketType.PUBCOMP, idToMatch)) {
// emc_log_e("Could not create PUBCOMP packet");
// }
callback = true;
_outbox.remove(it);
break;
@@ -697,20 +703,6 @@ void MqttClient::_onUnsuback() {
}
}
uint16_t MqttClient::getQueue() const {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<OutgoingPacket>::Iterator it = _outbox.front();
uint16_t count = 0;
while (it) {
// if (it.get()->packet.packetType() == PacketType.PUBLISH) {
++count;
// }
++it;
}
EMC_SEMAPHORE_GIVE();
return count;
}
void MqttClient::_clearQueue(int clearData) {
emc_log_i("clearing queue (clear session: %d)", clearData);
EMC_SEMAPHORE_TAKE();

View File

@@ -65,7 +65,7 @@ class MqttClient {
uint16_t publish(const char * topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true!
const char * getClientId() const;
uint16_t getQueue() const;
size_t queueSize(); // No const because of mutex
void loop();
protected:
@@ -131,9 +131,11 @@ class MqttClient {
uint32_t timeSent;
espMqttClientInternals::Packet packet;
template <typename... Args>
OutgoingPacket(uint32_t t, espMqttClientTypes::Error& error, Args&&... args) : // NOLINT(runtime/references)
timeSent(t),
packet(error, std::forward<Args>(args)...) {}
OutgoingPacket(uint32_t t, espMqttClientTypes::Error & error, Args &&... args)
: // NOLINT(runtime/references)
timeSent(t)
, packet(error, std::forward<Args>(args)...) {
}
};
espMqttClientInternals::Outbox<OutgoingPacket> _outbox;
size_t _bytesSent;

View File

@@ -163,6 +163,16 @@ class Outbox {
return false;
}
size_t size() const {
Node* n = _first;
size_t count = 0;
while (n) {
n = n->next;
++count;
}
return count;
}
private:
Node* _first;
Node* _last;

View File

@@ -100,7 +100,7 @@ Packet::Packet(espMqttClientTypes::Error& error,
(password ? 2 + strlen(password) : 0);
// allocate memory
if (!_allocate(remainingLength)) {
if (!_allocate(remainingLength, false)) {
error = espMqttClientTypes::Error::OUT_OF_MEMORY;
return;
}
@@ -300,8 +300,8 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type)
}
bool Packet::_allocate(size_t remainingLength) {
if (EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
bool Packet::_allocate(size_t remainingLength, bool check) {
if (check && EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) {
emc_log_w("Packet buffer not allocated: low memory");
return false;
}

View File

@@ -133,7 +133,7 @@ class Packet {
private:
// pass remainingLength = total size - header - remainingLengthLength!
bool _allocate(size_t remainingLength);
bool _allocate(size_t remainingLength, bool check = true);
// fills header and returns index of next available byte in buffer
size_t _fillPublishHeader(uint16_t packetId,

View File

@@ -124,7 +124,7 @@ void Mqtt::resubscribe() {
// Main MQTT loop - sends out top item on publish queue
void Mqtt::loop() {
queuecount_ = mqttClient_->getQueue();
queuecount_ = mqttClient_->queueSize();
// exit if MQTT is not enabled or if there is no network connection
if (!connected()) {
@@ -484,7 +484,7 @@ void Mqtt::on_connect() {
connecting_ = true;
connectcount_++; // count # reconnects. not currently used.
queuecount_ = mqttClient_->getQueue();
queuecount_ = mqttClient_->queueSize();
load_settings(); // reload MQTT settings - in case they have changes
@@ -606,9 +606,9 @@ bool Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
char fulltopic[MQTT_TOPIC_MAX_SIZE];
if (topic.find(discovery_prefix_) == 0) {
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave topic as it is
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave discovery topic as it is
} else {
// it's a discovery topic, added the mqtt base to the topic path
// it's not a discovery topic, added the mqtt base to the topic path
snprintf(fulltopic, sizeof(fulltopic), "%s/%s", mqtt_base_.c_str(), topic.c_str()); // uses base
}