This commit is contained in:
proddy
2021-10-21 22:56:35 +02:00
parent fb7bafdb87
commit 50f6d0ab26
4 changed files with 52 additions and 39 deletions

View File

@@ -23,6 +23,7 @@
- Manually Controlling Solar Circuit [#107](https://github.com/emsesp/EMS-ESP32/issues/107)
- Fix thermostat commands not defaulting to the master thermostat [#110](https://github.com/emsesp/EMS-ESP32/issues/110)
- Enlarge parse-buffer for long names like `cylinderpumpmodulation`
- MQTT not subscribing to all device entities [#166](https://github.com/emsesp/EMS-ESP32/issues/166)
## Changed

View File

@@ -55,7 +55,7 @@ uuid::log::Logger Mqtt::logger_{F_(mqtt), uuid::log::Facility::DAEMON};
// subscribe to an MQTT topic, and store the associated callback function
// only if it already hasn't been added
void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_sub_function_p cb) {
// check if we already have the topic subscribed, if so don't add it again
// check if we already have the topic subscribed for this specific device type, if so don't add it again
if (!mqtt_subfunctions_.empty()) {
for (auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) {
@@ -69,58 +69,55 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_
}
// register in our libary with the callback function.
// We store the original topic without base
// We store the original topic string without base
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb));
if (!enabled()) {
return;
}
LOG_DEBUG(F("Subscribing MQTT topic %s for device type %s"), topic.c_str(), EMSdevice::device_type_2_device_name(device_type).c_str());
// add to MQTT queue as a subscribe operation
queue_subscribe_message(topic);
}
// subscribe to the command topic if it doesn't exist yet
void Mqtt::register_command(const uint8_t device_type, const __FlashStringHelper * cmd, cmdfunction_p cb, uint8_t flags) {
std::string cmd_topic = EMSdevice::device_type_2_device_name(device_type); // thermostat, boiler, etc...
void Mqtt::sub_command(const uint8_t device_type, const __FlashStringHelper * cmd, cmdfunction_p cb, uint8_t flags) {
if (!mqtt_enabled_) {
return;
}
std::string topic = EMSdevice::device_type_2_device_name(device_type); // thermostat, boiler, etc...
// see if we have already a handler for the device type (boiler, thermostat). If not add it
bool exists = false;
if (!mqtt_subfunctions_.empty()) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), cmd_topic.c_str()) == 0)) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) {
exists = true;
}
}
}
if (!exists) {
Mqtt::subscribe(device_type, cmd_topic, nullptr); // use an empty function handler to signal this is a command function only (e.g. ems-esp/boiler)
LOG_DEBUG(F("Registering MQTT cmd %s with topic %s"), uuid::read_flash_string(cmd).c_str(), EMSdevice::device_type_2_device_name(device_type).c_str());
Mqtt::subscribe(device_type, topic, nullptr); // use an empty function handler to signal this is a command function only (e.g. ems-esp/boiler)
}
if (!enabled()) {
return;
}
// register the individual commands too (e.g. ems-esp/boiler/wwonetime)
// add the individual commands too (e.g. ems-esp/boiler/wwonetime)
// https://github.com/emsesp/EMS-ESP32/issues/31
if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && ((flags & CommandFlag::MQTT_SUB_FLAG_HC) == CommandFlag::MQTT_SUB_FLAG_HC)) {
std::string topic(MQTT_TOPIC_MAX_SIZE, '\0');
topic = cmd_topic + "/hc1/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
topic = cmd_topic + "/hc2/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
topic = cmd_topic + "/hc3/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
topic = cmd_topic + "/hc4/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
} else if (subscribe_format_ != Subscribe_Format::GENERAL && ((flags & CommandFlag::MQTT_SUB_FLAG_NOSUB) == CommandFlag::MQTT_SUB_FLAG_NOSUB)) {
std::string topic(MQTT_TOPIC_MAX_SIZE, '\0');
topic = cmd_topic + "/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
std::string hc_topic(MQTT_TOPIC_MAX_SIZE, '\0');
hc_topic = topic + "/hc1/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
hc_topic = topic + "/hc2/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
hc_topic = topic + "/hc3/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
hc_topic = topic + "/hc4/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
} else if (subscribe_format_ != Subscribe_Format::GENERAL && ((flags & CommandFlag::MQTT_SUB_FLAG_NOSUB) != CommandFlag::MQTT_SUB_FLAG_NOSUB)) {
std::string hc_topic(MQTT_TOPIC_MAX_SIZE, '\0');
hc_topic = topic + "/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
}
}
@@ -137,8 +134,16 @@ void Mqtt::resubscribe() {
}
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
// if it's already in the queue, ignore it
bool found = false;
for (const auto & message : mqtt_messages_) {
found |= ((message.content_->operation == Operation::SUBSCRIBE) && (mqtt_subfunction.topic_ == message.content_->topic));
}
if (!found) {
queue_subscribe_message(mqtt_subfunction.topic_);
}
}
for (const auto & cf : Command::commands()) {
std::string topic(MQTT_TOPIC_MAX_SIZE, '\0');
if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && cf.has_flags(CommandFlag::MQTT_SUB_FLAG_HC)) {
@@ -568,8 +573,8 @@ void Mqtt::start() {
on_publish(packetId);
});
// create space for command buffer, to avoid heap memory fragmentation
mqtt_subfunctions_.reserve(5);
// register command
Command::add(EMSdevice::DeviceType::SYSTEM, F_(publish), System::command_publish, F("forces a MQTT publish"), CommandFlag::ADMIN_ONLY);
}
void Mqtt::set_publish_time_boiler(uint16_t publish_time) {
@@ -746,7 +751,13 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation,
std::shared_ptr<MqttMessage> message;
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
// LOG_INFO("Added to queue: %s %s", message->topic.c_str(), message->payload.c_str()); // debugging only
#ifdef EMSESP_DEBUG
if (operation == Operation::PUBLISH) {
LOG_INFO("Adding to queue: (Publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str());
} else {
LOG_INFO("Adding to queue: (Subscribe) topic='%s'", message->topic.c_str());
}
#endif
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES) {
@@ -865,10 +876,10 @@ void Mqtt::process_queue() {
// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
LOG_DEBUG(F("Subscribing to topic: %s"), topic);
LOG_DEBUG(F("Subscribing to topic '%s'"), topic);
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
if (!packet_id) {
LOG_DEBUG(F("Error subscribing to %s"), topic);
LOG_DEBUG(F("Error subscribing to topic '%s'"), topic);
}
mqtt_messages_.pop_front(); // remove the message from the queue

View File

@@ -474,14 +474,15 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & cmd) {
if (command == "mqtt_individual") {
shell.printfln(F("Testing individual MQTT"));
Mqtt::ha_enabled(false); // turn off HA Discovery to stop the chatter
Mqtt::subscribe_format(2); // individual topics, all HC
// Mqtt::subscribe_format(1); // individual topics, only main HC
Mqtt::nested_format(1);
// Mqtt::subscribe_format(2); // individual topics, all HC
Mqtt::subscribe_format(1); // individual topics, only main HC
run_test("boiler");
run_test("thermostat");
shell.invoke_command("show mqtt");
EMSESP::mqtt_.incoming("ems-esp/boiler/wwcircpump", "off");
// shell.invoke_command("show mqtt");
// EMSESP::mqtt_.incoming("ems-esp/boiler/wwcircpump", "off");
}
if (command == "mqtt_nested") {

View File

@@ -1 +1 @@
#define EMSESP_APP_VERSION "3.2.2b13"
#define EMSESP_APP_VERSION "3.2.2b14"