From 580f3ea45c86f4a24aab3ac5d3d44dd7c46843ee Mon Sep 17 00:00:00 2001 From: proddy Date: Sat, 25 Jul 2020 18:27:37 +0200 Subject: [PATCH] optimizing mqtt --- src/mqtt.cpp | 152 +++++++++++++++++++++++++++++---------------------- src/mqtt.h | 35 ++++++------ 2 files changed, 105 insertions(+), 82 deletions(-) diff --git a/src/mqtt.cpp b/src/mqtt.cpp index a616ec4c8..ce596dc66 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -35,7 +35,7 @@ std::string Mqtt::hostname_; uint8_t Mqtt::mqtt_qos_; uint16_t Mqtt::publish_time_; -std::vector Mqtt::mqtt_functions_; +std::vector Mqtt::mqtt_subfunctions_; uint16_t Mqtt::mqtt_publish_fails_ = 0; size_t Mqtt::maximum_mqtt_messages_ = Mqtt::MAX_MQTT_MESSAGES; uint16_t Mqtt::mqtt_message_id_ = 0; @@ -51,62 +51,56 @@ Mqtt::QueuedMqttMessage::QueuedMqttMessage(uint16_t id, std::shared_ptrtopic.c_str()) == 0)) { exists = true; } } } if (!exists) { - mqtt_functions_.emplace_back(device_id, std::move(full_topic), cb); // register a call back function for a specific telegram type + mqtt_subfunctions_.emplace_back(device_id, std::move(message->topic), cb); // register a call back function for a specific telegram type } - - queue_subscribe_message(topic); // add subscription to queue } // subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device -void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) { +void Mqtt::subscribe(const std::string & topic, mqtt_subfunction_p cb) { subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP } // resubscribe to all MQTT topics again void Mqtt::resubscribe() { - if (mqtt_functions_.empty()) { + if (mqtt_subfunctions_.empty()) { return; } - for (const auto & mqtt_function : mqtt_functions_) { - queue_subscribe_message(mqtt_function.topic_); + for (const auto & mqtt_subfunction : mqtt_subfunctions_) { + queue_message(Operation::SUBSCRIBE, mqtt_subfunction.topic_, "", false, true); // no payload, no topic prefixing } } @@ -146,8 +140,8 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { // show subscriptions shell.printfln(F("MQTT subscriptions:")); - for (const auto & mqtt_function : mqtt_functions_) { - shell.printfln(F(" %s"), mqtt_function.topic_.c_str()); + for (const auto & mqtt_subfunction : mqtt_subfunctions_) { + shell.printfln(F(" %s"), mqtt_subfunction.topic_.c_str()); } shell.println(); @@ -216,9 +210,9 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) { // see if we have this topic in our subscription list, then call its callback handler // note: this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail. Not sure if this is going to be an issue? - for (const auto & mf : mqtt_functions_) { + for (const auto & mf : mqtt_subfunctions_) { if (strcmp(topic, mf.topic_.c_str()) == 0) { - (mf.mqtt_function_)(message); + (mf.mqtt_subfunction_)(message); return; } } @@ -229,15 +223,17 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) { // print all the topics related to a specific device_id void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_id) { - if (std::count_if(mqtt_functions_.cbegin(), mqtt_functions_.cend(), [=](MQTTFunction const & mqtt_function) { return device_id == mqtt_function.device_id_; }) + if (std::count_if(mqtt_subfunctions_.cbegin(), + mqtt_subfunctions_.cend(), + [=](MQTTSubFunction const & mqtt_subfunction) { return device_id == mqtt_subfunction.device_id_; }) == 0) { return; } shell.print(F(" Subscribed MQTT topics: ")); - for (const auto & mqtt_function : mqtt_functions_) { - if (mqtt_function.device_id_ == device_id) { - shell.printf(F("%s "), mqtt_function.topic_.c_str()); + for (const auto & mqtt_subfunction : mqtt_subfunctions_) { + if (mqtt_subfunction.device_id_ == device_id) { + shell.printf(F("%s "), mqtt_subfunction.topic_.c_str()); } } shell.println(); @@ -285,7 +281,6 @@ char * Mqtt::make_topic(char * result, const std::string & topic) { } void Mqtt::start() { - mqttClient_ = EMSESP::esp8266React.getMqttClient(); // get the hostname, which we'll use to prefix to all topics @@ -298,15 +293,18 @@ void Mqtt::start() { }); mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); }); - mqttClient_->setWill(make_topic(will_topic_, "status"), 1, true, "offline"); // with qos 1, retain true + + // create will_topic with the hostname prefixed. It has to be static because asyncmqttclient destroys the reference + static char will_topic[MQTT_TOPIC_MAX_SIZE]; + strlcpy(will_topic, hostname_.c_str(), MQTT_TOPIC_MAX_SIZE); + strlcat(will_topic, "/", MQTT_TOPIC_MAX_SIZE); + strlcat(will_topic, "status", MQTT_TOPIC_MAX_SIZE); + mqttClient_->setWill(will_topic, 1, true, "offline"); // with qos 1, retain true + mqttClient_->onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { on_message(topic, payload, len); mqttClient_->onPublish([this](uint16_t packetId) { on_publish(packetId); }); }); - - // add the system MQTT subscriptions - Mqtt::subscribe("cmd", System::mqtt_commands); - // Mqtt::subscribe("cmd", std::bind(&System::mqtt_commands, this, std::placeholders::_1)); } void Mqtt::set_publish_time(uint16_t publish_time) { @@ -334,47 +332,50 @@ void Mqtt::on_connect() { resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics + // add the system MQTT subscriptions, only if its a fresh start with no previous subscriptions + if (mqtt_subfunctions_.empty()) { + Mqtt::subscribe("cmd", System::mqtt_commands); + } + LOG_INFO(F("MQTT connected")); } -// add MQTT message to queue, payload is a string -void Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) { - // can't have bogus topics, but empty payloads are ok +// add sub or pub task to the queue. When the message is created, the topic will have +// automatically the hostname prefixed. +std::shared_ptr +Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain, bool no_prefix) { if (topic.empty()) { - return; + return nullptr; } - // prefix the hostname to the topic - char full_topic[MQTT_TOPIC_MAX_SIZE]; - make_topic(full_topic, topic); - - auto message = std::make_shared(Operation::PUBLISH, full_topic, payload, retain); + // take the topic and prefix the hostname, unless its for HA + std::shared_ptr message; + if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0) || no_prefix) { + // leave topic as it is + message = std::make_shared(operation, topic, payload, retain); + } else { + // prefix the hostname + std::string full_topic = Mqtt::hostname_ + "/" + topic; + message = std::make_shared(operation, full_topic, payload, retain); + } // if the queue is full, make room but removing the last one if (mqtt_messages_.size() >= maximum_mqtt_messages_) { mqtt_messages_.pop_front(); } - mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message)); + + return mqtt_messages_.back().content_; // this is because the message has been moved +} + +// add MQTT message to queue, payload is a string +std::shared_ptr Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) { + return queue_message(Operation::PUBLISH, topic, payload, retain); } // add MQTT subscribe message to queue -void Mqtt::queue_subscribe_message(const std::string & topic) { - if (topic.empty()) { - return; - } - - auto message = std::make_shared(Operation::SUBSCRIBE, topic, "", false); -#ifdef DEBUG - LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str()); -#endif - - // if the queue is full, make room but removing the last one - if (mqtt_messages_.size() >= maximum_mqtt_messages_) { - mqtt_messages_.pop_front(); - } - - mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message)); +std::shared_ptr Mqtt::queue_subscribe_message(const std::string & topic) { + return queue_message(Operation::SUBSCRIBE, topic, "", false); // no payload } // MQTT Publish, using a specific retain flag @@ -383,9 +384,8 @@ void Mqtt::publish(const std::string & topic, const std::string & payload, bool } void Mqtt::publish(const std::string & topic, const JsonDocument & payload, bool retain) { - // convert json to string std::string payload_text; - serializeJson(payload, payload_text); + serializeJson(payload, payload_text); // convert json to string queue_publish_message(topic, payload_text, retain); } @@ -413,6 +413,26 @@ void Mqtt::process_queue() { return; } + // show queue - Debug only + /* + Serial.printf("MQTT queue:\n\r"); + for (const auto & message : mqtt_messages_) { + auto content = message.content_; + if (content->operation == Operation::PUBLISH) { + // Publish messages + Serial.printf(" [%02d] (Pub) topic=%s payload=%s (pid %d, retry #%d)\n\r", + message.id_, + content->topic.c_str(), + content->payload.c_str(), + message.packet_id_, + message.retry_count_); + } else { + // Subscribe messages + Serial.printf(" [%02d] (Sub) topic=%s\n\r", message.id_, content->topic.c_str()); + } + } + */ + // fetch first from queue and create the full topic name auto mqtt_message = mqtt_messages_.front(); auto message = mqtt_message.content_; diff --git a/src/mqtt.h b/src/mqtt.h index 992286d12..c9d233c76 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -43,11 +43,11 @@ using uuid::console::Shell; namespace emsesp { -using mqtt_function_p = std::function; +using mqtt_subfunction_p = std::function; using namespace std::placeholders; // for `_1` struct MqttMessage { - MqttMessage(uint8_t operation, const std::string & topic, const std::string & payload, bool retain); + MqttMessage(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain); ~MqttMessage() = default; const uint8_t operation; @@ -68,8 +68,8 @@ class Mqtt { static constexpr uint8_t MQTT_TOPIC_MAX_SIZE = 100; - static void subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb); - static void subscribe(const std::string & topic, mqtt_function_p cb); + static void subscribe(const uint8_t device_id, const std::string & topic, mqtt_subfunction_p cb); + static void subscribe(const std::string & topic, mqtt_subfunction_p cb); static void resubscribe(); static void publish(const std::string & topic, const std::string & payload, bool retain = false); @@ -100,6 +100,8 @@ class Mqtt { mqtt_publish_fails_ = 0; } + static std::string hostname_; + private: static uuid::log::Logger logger_; @@ -125,8 +127,9 @@ class Mqtt { static constexpr uint32_t MQTT_PUBLISH_WAIT = 200; // delay between sending publishes, to account for large payloads static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing - static void queue_publish_message(const std::string & topic, const std::string & payload, const bool retain); - static void queue_subscribe_message(const std::string & topic); + static std::shared_ptr queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain, bool no_prefix = false); + static std::shared_ptr queue_publish_message(const std::string & topic, const std::string & payload, const bool retain); + static std::shared_ptr queue_subscribe_message(const std::string & topic); void on_publish(uint16_t packetId); void on_message(char * topic, char * payload, size_t len); @@ -136,25 +139,25 @@ class Mqtt { static uint16_t mqtt_publish_fails_; - class MQTTFunction { + // function handlers for MQTT subscriptions + class MQTTSubFunction { public: - MQTTFunction(uint8_t device_id, const std::string && topic, mqtt_function_p mqtt_function); - ~MQTTFunction() = default; + MQTTSubFunction(const uint8_t device_id, const std::string && topic, mqtt_subfunction_p mqtt_subfunction); + ~MQTTSubFunction() = default; - uint8_t device_id_; // which device ID owns this - std::string topic_; - mqtt_function_p mqtt_function_; + const uint8_t device_id_; // which device ID owns this + const std::string topic_; + mqtt_subfunction_p mqtt_subfunction_; }; - static std::vector mqtt_functions_; // list of mqtt subscribe callbacks for all devices + static std::vector mqtt_subfunctions_; // list of mqtt subscribe callbacks for all devices uint32_t last_mqtt_poll_ = 0; uint32_t last_publish_ = 0; // settings, copied over - static std::string hostname_; - static uint8_t mqtt_qos_; - static uint16_t publish_time_; + static uint8_t mqtt_qos_; + static uint16_t publish_time_; }; } // namespace emsesp