From 005463c41fef790b832a0b53f9de639949d3ea0d Mon Sep 17 00:00:00 2001 From: MichaelDvP Date: Thu, 31 Aug 2023 11:50:31 +0200 Subject: [PATCH] update espMqttClient --- lib/espMqttClient/src/MqttClient.cpp | 34 +++++++++--------------- lib/espMqttClient/src/MqttClient.h | 10 ++++--- lib/espMqttClient/src/Outbox.h | 10 +++++++ lib/espMqttClient/src/Packets/Packet.cpp | 6 ++--- lib/espMqttClient/src/Packets/Packet.h | 2 +- src/mqtt.cpp | 8 +++--- 6 files changed, 37 insertions(+), 33 deletions(-) diff --git a/lib/espMqttClient/src/MqttClient.cpp b/lib/espMqttClient/src/MqttClient.cpp index a428158ed..97db0dcce 100644 --- a/lib/espMqttClient/src/MqttClient.cpp +++ b/lib/espMqttClient/src/MqttClient.cpp @@ -103,7 +103,7 @@ bool MqttClient::disconnected() const { } bool MqttClient::connect() { - bool result = true; + bool result = false; if (_state == State::disconnected) { EMC_SEMAPHORE_TAKE(); if (_addPacketFront(_cleanSession, @@ -116,19 +116,21 @@ bool MqttClient::connect() { _willPayloadLength, (uint16_t)(_keepAlive / 1000), // 32b to 16b doesn't overflow because it comes from 16b orignally _clientId)) { + result = true; + _state = State::connectingTcp1; #if defined(ARDUINO_ARCH_ESP32) if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) { vTaskResume(_taskHandle); } #endif - _state = State::connectingTcp1; } else { EMC_SEMAPHORE_GIVE(); emc_log_e("Could not create CONNECT packet"); _onError(0, Error::OUT_OF_MEMORY); - result = false; } EMC_SEMAPHORE_GIVE(); + } else if (_state <= State::connected) { // already connected or connecting + result = true; } return result; } @@ -196,6 +198,14 @@ const char * MqttClient::getClientId() const { return _clientId; } +size_t MqttClient::queueSize() { + size_t ret = 0; + EMC_SEMAPHORE_TAKE(); + ret = _outbox.size(); + EMC_SEMAPHORE_GIVE(); + return ret; +} + void MqttClient::loop() { switch ((State)_state) { // modified by proddy for EMS-ESP compiling standalone case State::disconnected: @@ -335,7 +345,6 @@ int MqttClient::_sendPacket() { size_t wantToWrite = 0; size_t written = 0; if (packet && (wantToWrite == written)) { - // mixing signed with unsigned here but safe because of MQTT packet size limits wantToWrite = packet->packet.available(_bytesSent); if (wantToWrite == 0) { EMC_SEMAPHORE_GIVE(); @@ -630,9 +639,6 @@ void MqttClient::_onPubcomp() { // if it doesn't match the ID, return if ((it.get()->packet.packetType()) == PacketType.PUBREL) { if (it.get()->packet.packetId() == idToMatch) { - // if (!_addPacket(PacketType.PUBCOMP, idToMatch)) { - // emc_log_e("Could not create PUBCOMP packet"); - // } callback = true; _outbox.remove(it); break; @@ -697,20 +703,6 @@ void MqttClient::_onUnsuback() { } } -uint16_t MqttClient::getQueue() const { - EMC_SEMAPHORE_TAKE(); - espMqttClientInternals::Outbox::Iterator it = _outbox.front(); - uint16_t count = 0; - while (it) { - // if (it.get()->packet.packetType() == PacketType.PUBLISH) { - ++count; - // } - ++it; - } - EMC_SEMAPHORE_GIVE(); - return count; -} - void MqttClient::_clearQueue(int clearData) { emc_log_i("clearing queue (clear session: %d)", clearData); EMC_SEMAPHORE_TAKE(); diff --git a/lib/espMqttClient/src/MqttClient.h b/lib/espMqttClient/src/MqttClient.h index 6cf8d0664..dba4bf245 100644 --- a/lib/espMqttClient/src/MqttClient.h +++ b/lib/espMqttClient/src/MqttClient.h @@ -65,7 +65,7 @@ class MqttClient { uint16_t publish(const char * topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length); void clearQueue(bool deleteSessionData = false); // Not MQTT compliant and may cause unpredictable results when `deleteSessionData` = true! const char * getClientId() const; - uint16_t getQueue() const; + size_t queueSize(); // No const because of mutex void loop(); protected: @@ -131,9 +131,11 @@ class MqttClient { uint32_t timeSent; espMqttClientInternals::Packet packet; template - OutgoingPacket(uint32_t t, espMqttClientTypes::Error& error, Args&&... args) : // NOLINT(runtime/references) - timeSent(t), - packet(error, std::forward(args)...) {} + OutgoingPacket(uint32_t t, espMqttClientTypes::Error & error, Args &&... args) + : // NOLINT(runtime/references) + timeSent(t) + , packet(error, std::forward(args)...) { + } }; espMqttClientInternals::Outbox _outbox; size_t _bytesSent; diff --git a/lib/espMqttClient/src/Outbox.h b/lib/espMqttClient/src/Outbox.h index dfbbd13c0..cfb9f244d 100644 --- a/lib/espMqttClient/src/Outbox.h +++ b/lib/espMqttClient/src/Outbox.h @@ -163,6 +163,16 @@ class Outbox { return false; } + size_t size() const { + Node* n = _first; + size_t count = 0; + while (n) { + n = n->next; + ++count; + } + return count; + } + private: Node* _first; Node* _last; diff --git a/lib/espMqttClient/src/Packets/Packet.cpp b/lib/espMqttClient/src/Packets/Packet.cpp index df463ef7b..2f84b503a 100644 --- a/lib/espMqttClient/src/Packets/Packet.cpp +++ b/lib/espMqttClient/src/Packets/Packet.cpp @@ -100,7 +100,7 @@ Packet::Packet(espMqttClientTypes::Error& error, (password ? 2 + strlen(password) : 0); // allocate memory - if (!_allocate(remainingLength)) { + if (!_allocate(remainingLength, false)) { error = espMqttClientTypes::Error::OUT_OF_MEMORY; return; } @@ -300,8 +300,8 @@ Packet::Packet(espMqttClientTypes::Error& error, MQTTPacketType type) } -bool Packet::_allocate(size_t remainingLength) { - if (EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) { +bool Packet::_allocate(size_t remainingLength, bool check) { + if (check && EMC_GET_FREE_MEMORY() < EMC_MIN_FREE_MEMORY) { emc_log_w("Packet buffer not allocated: low memory"); return false; } diff --git a/lib/espMqttClient/src/Packets/Packet.h b/lib/espMqttClient/src/Packets/Packet.h index 1af2f06af..f2b290293 100644 --- a/lib/espMqttClient/src/Packets/Packet.h +++ b/lib/espMqttClient/src/Packets/Packet.h @@ -133,7 +133,7 @@ class Packet { private: // pass remainingLength = total size - header - remainingLengthLength! - bool _allocate(size_t remainingLength); + bool _allocate(size_t remainingLength, bool check = true); // fills header and returns index of next available byte in buffer size_t _fillPublishHeader(uint16_t packetId, diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 2b931be8c..d62367503 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -124,7 +124,7 @@ void Mqtt::resubscribe() { // Main MQTT loop - sends out top item on publish queue void Mqtt::loop() { - queuecount_ = mqttClient_->getQueue(); + queuecount_ = mqttClient_->queueSize(); // exit if MQTT is not enabled or if there is no network connection if (!connected()) { @@ -484,7 +484,7 @@ void Mqtt::on_connect() { connecting_ = true; connectcount_++; // count # reconnects. not currently used. - queuecount_ = mqttClient_->getQueue(); + queuecount_ = mqttClient_->queueSize(); load_settings(); // reload MQTT settings - in case they have changes @@ -606,9 +606,9 @@ bool Mqtt::queue_message(const uint8_t operation, const std::string & topic, con char fulltopic[MQTT_TOPIC_MAX_SIZE]; if (topic.find(discovery_prefix_) == 0) { - strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave topic as it is + strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave discovery topic as it is } else { - // it's a discovery topic, added the mqtt base to the topic path + // it's not a discovery topic, added the mqtt base to the topic path snprintf(fulltopic, sizeof(fulltopic), "%s/%s", mqtt_base_.c_str(), topic.c_str()); // uses base }