From c31324feed0c06c1182cff60ef9ebca7c30775f1 Mon Sep 17 00:00:00 2001 From: proddy Date: Sun, 19 Jul 2020 16:50:29 +0200 Subject: [PATCH] resubscribe to mqtt on re-connect - #421 --- src/mqtt.cpp | 76 +++++++++++++++++++++++++++++++++++++++++----------- src/mqtt.h | 7 +++-- 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/src/mqtt.cpp b/src/mqtt.cpp index c1cd5e63b..0b603460d 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -67,15 +67,50 @@ Mqtt::MQTTFunction::MQTTFunction(uint8_t device_id, const std::string && topic, // subscribe to an MQTT topic, and store the associated callback function void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb) { - // We don't want to store the whole topic string in our lookup, just the last cmd, as this is wasteful. + /* + // We don't want to store the whole topic string in our lookup, just the last cmd, as this can take up too much memory // strip out everything until the last / - size_t found = topic.find_last_of("/"); // returns npos which is -1 - mqtt_functions_.emplace_back(device_id, std::move(topic.substr(found + 1)), cb); // register a call back function for a specific telegram type + size_t topic_pos = topic.find_last_of("/"); // returns npos which is -1 + topic_pos += 1; // skip the / + */ + + // convert the topic to it's full path, so either prefixed with the hostname unless hardcoded like 'homeassistant' + char full_topic[MQTT_TOPIC_MAX_SIZE]; + make_topic(full_topic, topic); + + // check if we already have the topic subscribed, if so don't add it again + bool exists = false; + if (!mqtt_functions_.empty()) { + for (const auto & mqtt_function : mqtt_functions_) { + if ((mqtt_function.device_id_ == device_id) && (strcmp(mqtt_function.topic_.c_str(), full_topic) == 0)) { + exists = true; + } + } + } + + if (!exists) { + mqtt_functions_.emplace_back(device_id, std::move(full_topic), cb); // register a call back function for a specific telegram type + } else { +#ifdef EMSESP_DEBUG + LOG_DEBUG(F("[DEBUG] Resubscribing to topic %s"), full_topic); +#endif + } queue_subscribe_message(topic); // add subscription to queue } -// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a device +// resubscribe to all MQTT topics again +void Mqtt::resubscribe() { + if (mqtt_functions_.empty()) { + return; + } + + for (const auto & mqtt_function : mqtt_functions_) { + queue_subscribe_message(mqtt_function.topic_); + } +} + +// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) { subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP } @@ -84,7 +119,7 @@ void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) { // Checks for connection, establishes a connection if not // sends out top item on publish queue void Mqtt::loop() { - // exit if MQTT is not enabled or ig there is no WIFI + // exit if MQTT is not enabled or if there is no WIFI #ifndef EMSESP_STANDALONE if (!connected()) { #else @@ -152,12 +187,12 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { shell.println(); } -// simulate receiving a MQTT message, only for testing +// simulate receiving a MQTT message, used only for testing void Mqtt::incoming(char * topic, char * payload) { on_message(topic, payload, strlen(payload)); } -// received MQTT message +// received an MQTT message that we subscribed too void Mqtt::on_message(char * topic, char * payload, size_t len) { if (len == 0) { return; @@ -171,14 +206,16 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) { LOG_DEBUG(F("[DEBUG] Received %s => %s (length %d)"), topic, message, len); #endif + /* // strip out everything until the last / char * topic_magnitude = strrchr(topic, '/'); if (topic_magnitude != nullptr) { topic = topic_magnitude + 1; } + */ - // Send message event to custom service - // this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail + // see if we have this topic in our subscription list, then call its callback handler + // note: this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail. Not sure if this is going to be an issue? for (const auto & mf : mqtt_functions_) { if (strcmp(topic, mf.topic_.c_str()) == 0) { (mf.mqtt_function_)(message); @@ -232,7 +269,14 @@ void Mqtt::on_publish(uint16_t packetId) { } // builds up a topic by prefixing the hostname +// unless it's hardcoded like "homeassistant" char * Mqtt::make_topic(char * result, const std::string & topic) { + // check for homesassistant + if (strncmp(topic.c_str(), "homeassistant/", 13) == 0) { + strlcpy(result, topic.c_str(), MQTT_TOPIC_MAX_SIZE); + return result; + } + strlcpy(result, hostname_.c_str(), MQTT_TOPIC_MAX_SIZE); strlcat(result, "/", MQTT_TOPIC_MAX_SIZE); strlcat(result, topic.c_str(), MQTT_TOPIC_MAX_SIZE); @@ -283,6 +327,8 @@ void Mqtt::on_connect() { reset_publish_fails(); // reset fail count to 0 + resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics + LOG_INFO(F("MQTT connected")); } @@ -310,7 +356,9 @@ void Mqtt::queue_subscribe_message(const std::string & topic) { } auto message = std::make_shared(Operation::SUBSCRIBE, topic, "", false); - // LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str()); +#ifdef DEBUG + LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str()); +#endif // if the queue is full, make room but removing the last one if (mqtt_messages_.size() >= maximum_mqtt_messages_) { @@ -349,7 +397,7 @@ void Mqtt::process_all_queue() { } } -// take top from queue and try and publish it +// take top from queue and perform the publish or subscribe action // assumes there is an MQTT connection void Mqtt::process_queue() { if (mqtt_messages_.empty()) { @@ -363,11 +411,7 @@ void Mqtt::process_queue() { // append the hostname to the topic, unless we're doing native HA which has a different format // if the topic starts with "homeassistant" we leave it untouched, otherwise append host char full_topic[MQTT_TOPIC_MAX_SIZE]; - if (strncmp(message->topic.c_str(), "homeassistant/", 13) == 0) { - strcpy(full_topic, message->topic.c_str()); - } else { - make_topic(full_topic, message->topic); - } + make_topic(full_topic, message->topic); // if we're subscribing... if (message->operation == Operation::SUBSCRIBE) { diff --git a/src/mqtt.h b/src/mqtt.h index 6fe18ef1c..56f53132b 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -69,11 +69,11 @@ class Mqtt { enum Operation { PUBLISH, SUBSCRIBE }; - static constexpr uint8_t MQTT_TOPIC_MAX_SIZE = 60; + static constexpr uint8_t MQTT_TOPIC_MAX_SIZE = 100; - // are static to be accessed from EMS devices static void subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb); static void subscribe(const std::string & topic, mqtt_function_p cb); + static void resubscribe(); static void publish(const std::string & topic, const std::string & payload, bool retain = false); static void publish(const std::string & topic, const JsonDocument & payload, bool retain = false); @@ -81,7 +81,6 @@ class Mqtt { static void publish(const std::string & topic); static void show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_id); - static void show_mqtt(uuid::console::Shell & shell); static void on_connect(); @@ -161,7 +160,7 @@ class Mqtt { mqtt_function_p mqtt_function_; }; - static std::vector mqtt_functions_; // list of mqtt callbacks for all devices + static std::vector mqtt_functions_; // list of mqtt subscribe callbacks for all devices uint32_t last_mqtt_poll_ = 0; uint32_t last_publish_ = 0;