diff --git a/CHANGELOG_LATEST.md b/CHANGELOG_LATEST.md index 944616eab..90e1e5577 100644 --- a/CHANGELOG_LATEST.md +++ b/CHANGELOG_LATEST.md @@ -23,6 +23,7 @@ - Manually Controlling Solar Circuit [#107](https://github.com/emsesp/EMS-ESP32/issues/107) - Fix thermostat commands not defaulting to the master thermostat [#110](https://github.com/emsesp/EMS-ESP32/issues/110) - Enlarge parse-buffer for long names like `cylinderpumpmodulation` +- MQTT not subscribing to all device entities [#166](https://github.com/emsesp/EMS-ESP32/issues/166) ## Changed diff --git a/src/mqtt.cpp b/src/mqtt.cpp index e131fd8b9..a8f6620ab 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -55,7 +55,7 @@ uuid::log::Logger Mqtt::logger_{F_(mqtt), uuid::log::Facility::DAEMON}; // subscribe to an MQTT topic, and store the associated callback function // only if it already hasn't been added void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_sub_function_p cb) { - // check if we already have the topic subscribed, if so don't add it again + // check if we already have the topic subscribed for this specific device type, if so don't add it again if (!mqtt_subfunctions_.empty()) { for (auto & mqtt_subfunction : mqtt_subfunctions_) { if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) { @@ -69,58 +69,55 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_ } // register in our libary with the callback function. - // We store the original topic without base + // We store the original topic string without base mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb)); if (!enabled()) { return; } - LOG_DEBUG(F("Subscribing MQTT topic %s for device type %s"), topic.c_str(), EMSdevice::device_type_2_device_name(device_type).c_str()); - // add to MQTT queue as a subscribe operation queue_subscribe_message(topic); } // subscribe to the command topic if it doesn't exist yet -void Mqtt::register_command(const uint8_t device_type, const __FlashStringHelper * cmd, cmdfunction_p cb, uint8_t flags) { - std::string cmd_topic = EMSdevice::device_type_2_device_name(device_type); // thermostat, boiler, etc... +void Mqtt::sub_command(const uint8_t device_type, const __FlashStringHelper * cmd, cmdfunction_p cb, uint8_t flags) { + if (!mqtt_enabled_) { + return; + } + + std::string topic = EMSdevice::device_type_2_device_name(device_type); // thermostat, boiler, etc... // see if we have already a handler for the device type (boiler, thermostat). If not add it bool exists = false; if (!mqtt_subfunctions_.empty()) { for (const auto & mqtt_subfunction : mqtt_subfunctions_) { - if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), cmd_topic.c_str()) == 0)) { + if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) { exists = true; } } } if (!exists) { - Mqtt::subscribe(device_type, cmd_topic, nullptr); // use an empty function handler to signal this is a command function only (e.g. ems-esp/boiler) - LOG_DEBUG(F("Registering MQTT cmd %s with topic %s"), uuid::read_flash_string(cmd).c_str(), EMSdevice::device_type_2_device_name(device_type).c_str()); + Mqtt::subscribe(device_type, topic, nullptr); // use an empty function handler to signal this is a command function only (e.g. ems-esp/boiler) } - if (!enabled()) { - return; - } - - // register the individual commands too (e.g. ems-esp/boiler/wwonetime) + // add the individual commands too (e.g. ems-esp/boiler/wwonetime) // https://github.com/emsesp/EMS-ESP32/issues/31 if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && ((flags & CommandFlag::MQTT_SUB_FLAG_HC) == CommandFlag::MQTT_SUB_FLAG_HC)) { - std::string topic(MQTT_TOPIC_MAX_SIZE, '\0'); - topic = cmd_topic + "/hc1/" + uuid::read_flash_string(cmd); - queue_subscribe_message(topic); - topic = cmd_topic + "/hc2/" + uuid::read_flash_string(cmd); - queue_subscribe_message(topic); - topic = cmd_topic + "/hc3/" + uuid::read_flash_string(cmd); - queue_subscribe_message(topic); - topic = cmd_topic + "/hc4/" + uuid::read_flash_string(cmd); - queue_subscribe_message(topic); - } else if (subscribe_format_ != Subscribe_Format::GENERAL && ((flags & CommandFlag::MQTT_SUB_FLAG_NOSUB) == CommandFlag::MQTT_SUB_FLAG_NOSUB)) { - std::string topic(MQTT_TOPIC_MAX_SIZE, '\0'); - topic = cmd_topic + "/" + uuid::read_flash_string(cmd); - queue_subscribe_message(topic); + std::string hc_topic(MQTT_TOPIC_MAX_SIZE, '\0'); + hc_topic = topic + "/hc1/" + uuid::read_flash_string(cmd); + queue_subscribe_message(hc_topic); + hc_topic = topic + "/hc2/" + uuid::read_flash_string(cmd); + queue_subscribe_message(hc_topic); + hc_topic = topic + "/hc3/" + uuid::read_flash_string(cmd); + queue_subscribe_message(hc_topic); + hc_topic = topic + "/hc4/" + uuid::read_flash_string(cmd); + queue_subscribe_message(hc_topic); + } else if (subscribe_format_ != Subscribe_Format::GENERAL && ((flags & CommandFlag::MQTT_SUB_FLAG_NOSUB) != CommandFlag::MQTT_SUB_FLAG_NOSUB)) { + std::string hc_topic(MQTT_TOPIC_MAX_SIZE, '\0'); + hc_topic = topic + "/" + uuid::read_flash_string(cmd); + queue_subscribe_message(hc_topic); } } @@ -137,8 +134,16 @@ void Mqtt::resubscribe() { } for (const auto & mqtt_subfunction : mqtt_subfunctions_) { - queue_subscribe_message(mqtt_subfunction.topic_); + // if it's already in the queue, ignore it + bool found = false; + for (const auto & message : mqtt_messages_) { + found |= ((message.content_->operation == Operation::SUBSCRIBE) && (mqtt_subfunction.topic_ == message.content_->topic)); + } + if (!found) { + queue_subscribe_message(mqtt_subfunction.topic_); + } } + for (const auto & cf : Command::commands()) { std::string topic(MQTT_TOPIC_MAX_SIZE, '\0'); if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && cf.has_flags(CommandFlag::MQTT_SUB_FLAG_HC)) { @@ -568,8 +573,8 @@ void Mqtt::start() { on_publish(packetId); }); - // create space for command buffer, to avoid heap memory fragmentation - mqtt_subfunctions_.reserve(5); + // register command + Command::add(EMSdevice::DeviceType::SYSTEM, F_(publish), System::command_publish, F("forces a MQTT publish"), CommandFlag::ADMIN_ONLY); } void Mqtt::set_publish_time_boiler(uint16_t publish_time) { @@ -746,7 +751,13 @@ std::shared_ptr Mqtt::queue_message(const uint8_t operation, std::shared_ptr message; message = std::make_shared(operation, topic, payload, retain); - // LOG_INFO("Added to queue: %s %s", message->topic.c_str(), message->payload.c_str()); // debugging only +#ifdef EMSESP_DEBUG + if (operation == Operation::PUBLISH) { + LOG_INFO("Adding to queue: (Publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str()); + } else { + LOG_INFO("Adding to queue: (Subscribe) topic='%s'", message->topic.c_str()); + } +#endif // if the queue is full, make room but removing the last one if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES) { @@ -865,10 +876,10 @@ void Mqtt::process_queue() { // if we're subscribing... if (message->operation == Operation::SUBSCRIBE) { - LOG_DEBUG(F("Subscribing to topic: %s"), topic); + LOG_DEBUG(F("Subscribing to topic '%s'"), topic); uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_); if (!packet_id) { - LOG_DEBUG(F("Error subscribing to %s"), topic); + LOG_DEBUG(F("Error subscribing to topic '%s'"), topic); } mqtt_messages_.pop_front(); // remove the message from the queue diff --git a/src/test/test.cpp b/src/test/test.cpp index 2ef6e7c9d..abbe90e31 100644 --- a/src/test/test.cpp +++ b/src/test/test.cpp @@ -473,15 +473,16 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & cmd) { if (command == "mqtt_individual") { shell.printfln(F("Testing individual MQTT")); - Mqtt::ha_enabled(false); // turn off HA Discovery to stop the chatter - Mqtt::subscribe_format(2); // individual topics, all HC - // Mqtt::subscribe_format(1); // individual topics, only main HC + Mqtt::ha_enabled(false); // turn off HA Discovery to stop the chatter + Mqtt::nested_format(1); + // Mqtt::subscribe_format(2); // individual topics, all HC + Mqtt::subscribe_format(1); // individual topics, only main HC run_test("boiler"); run_test("thermostat"); - shell.invoke_command("show mqtt"); - EMSESP::mqtt_.incoming("ems-esp/boiler/wwcircpump", "off"); + // shell.invoke_command("show mqtt"); + // EMSESP::mqtt_.incoming("ems-esp/boiler/wwcircpump", "off"); } if (command == "mqtt_nested") { diff --git a/src/version.h b/src/version.h index ea04ed793..d867dc969 100644 --- a/src/version.h +++ b/src/version.h @@ -1 +1 @@ -#define EMSESP_APP_VERSION "3.2.2b13" +#define EMSESP_APP_VERSION "3.2.2b14"