From 300fff1909227f772fe7fb13022ac3982280e31c Mon Sep 17 00:00:00 2001 From: MichaelDvP Date: Thu, 4 Mar 2021 16:10:34 +0100 Subject: [PATCH] add Mqtt-base on publish, do not store fulltopic in each message --- src/mqtt.cpp | 156 ++++++++++++++++++++++++++++++++++----------------- src/mqtt.h | 8 ++- 2 files changed, 109 insertions(+), 55 deletions(-) diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 34e531143..4d643bb1d 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -83,7 +83,7 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_ // register in our libary with the callback function. // We store both the original topic and the fully-qualified one - mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(message->topic), std::move(cb)); + mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb)); } // subscribe to the command topic if it doesn't exist yet @@ -131,37 +131,51 @@ void Mqtt::loop() { uint32_t currentMillis = uuid::get_uptime(); - // create publish messages for each of the EMS device values, adding to queue - if (publish_time_boiler_ && (currentMillis - last_publish_boiler_ > publish_time_boiler_)) { - last_publish_boiler_ = currentMillis; - EMSESP::publish_device_values(EMSdevice::DeviceType::BOILER); - } - - if (publish_time_thermostat_ && (currentMillis - last_publish_thermostat_ > publish_time_thermostat_)) { - last_publish_thermostat_ = currentMillis; - EMSESP::publish_device_values(EMSdevice::DeviceType::THERMOSTAT); - } - - if (publish_time_solar_ && (currentMillis - last_publish_solar_ > publish_time_solar_)) { - last_publish_solar_ = currentMillis; - EMSESP::publish_device_values(EMSdevice::DeviceType::SOLAR); - } - - if (publish_time_mixer_ && (currentMillis - last_publish_mixer_ > publish_time_mixer_)) { - last_publish_mixer_ = currentMillis; - EMSESP::publish_device_values(EMSdevice::DeviceType::MIXER); - } - - if (currentMillis - last_publish_sensor_ > publish_time_sensor_) { - last_publish_sensor_ = currentMillis; - EMSESP::publish_sensor_values(publish_time_sensor_ != 0); - } - // publish top item from MQTT queue to stop flooding if ((uint32_t)(currentMillis - last_mqtt_poll_) > MQTT_PUBLISH_WAIT) { last_mqtt_poll_ = currentMillis; process_queue(); } + + // dallas publish on change + if (!publish_time_sensor_) { + EMSESP::publish_sensor_values(false); + } + + if (!mqtt_messages_.empty()) { + return; + } + + // create publish messages for each of the EMS device values, adding to queue, only one device per loop + if (publish_time_boiler_ && (currentMillis - last_publish_boiler_ > publish_time_boiler_)) { + last_publish_boiler_ = (currentMillis / publish_time_boiler_) * publish_time_boiler_; + EMSESP::publish_device_values(EMSdevice::DeviceType::BOILER); + } else + + if (publish_time_thermostat_ && (currentMillis - last_publish_thermostat_ > publish_time_thermostat_)) { + last_publish_thermostat_ = (currentMillis / publish_time_thermostat_) * publish_time_thermostat_; + EMSESP::publish_device_values(EMSdevice::DeviceType::THERMOSTAT); + } else + + if (publish_time_solar_ && (currentMillis - last_publish_solar_ > publish_time_solar_)) { + last_publish_solar_ = (currentMillis / publish_time_solar_) * publish_time_solar_; + EMSESP::publish_device_values(EMSdevice::DeviceType::SOLAR); + } else + + if (publish_time_mixer_ && (currentMillis - last_publish_mixer_ > publish_time_mixer_)) { + last_publish_mixer_ = (currentMillis / publish_time_mixer_) * publish_time_mixer_; + EMSESP::publish_device_values(EMSdevice::DeviceType::MIXER); + } else + + if (publish_time_other_ && (currentMillis - last_publish_other_ > publish_time_other_)) { + last_publish_other_ = (currentMillis / publish_time_other_) * publish_time_other_; + EMSESP::publish_other_values(); + } else + + if (publish_time_sensor_ && (currentMillis - last_publish_sensor_ > publish_time_sensor_)) { + last_publish_sensor_ = (currentMillis / publish_time_sensor_) * publish_time_sensor_; + EMSESP::publish_sensor_values(true); + } } // print MQTT log and other stuff to console @@ -174,7 +188,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { // show subscriptions shell.printfln(F("MQTT topic subscriptions:")); for (const auto & mqtt_subfunction : mqtt_subfunctions_) { - shell.printfln(F(" %s"), mqtt_subfunction.full_topic_.c_str()); + shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str()); } shell.println(); @@ -214,10 +228,17 @@ void Mqtt::incoming(const char * topic, const char * payload) { } // received an MQTT message that we subscribed too -void Mqtt::on_message(const char * topic, const char * payload, size_t len) { +void Mqtt::on_message(const char * fulltopic, const char * payload, size_t len) { if (len == 0) { + LOG_DEBUG(F("Received empty message %s"), fulltopic); return; // ignore empty payloads } + if (strncmp(fulltopic, mqtt_base_.c_str(), strlen(mqtt_base_.c_str())) != 0) { + LOG_DEBUG(F("Received unknown message %s - %s"), fulltopic, payload); + return; // not for us + } + char topic[100]; + strlcpy(topic, &fulltopic[1 + strlen(mqtt_base_.c_str())], 100); // convert payload to a null-terminated char string char message[len + 2]; @@ -227,7 +248,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) { // see if we have this topic in our subscription list, then call its callback handler for (const auto & mf : mqtt_subfunctions_) { - if (strcmp(topic, mf.full_topic_.c_str()) == 0) { + if (strcmp(topic, mf.topic_.c_str()) == 0) { if (mf.mqtt_subfunction_) { // matching function, call it. If it returns true keep quit if ((mf.mqtt_subfunction_)(message)) { @@ -293,7 +314,7 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic shell.print(F(" Subscribed MQTT topics: ")); for (const auto & mqtt_subfunction : mqtt_subfunctions_) { if (mqtt_subfunction.device_type_ == device_type) { - shell.printf(F("%s "), mqtt_subfunction.topic_.c_str()); + shell.printf(F("%s/%s "), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str()); } } shell.println(); @@ -306,6 +327,9 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic void Mqtt::on_publish(uint16_t packetId) { // find the MQTT message in the queue and remove it if (mqtt_messages_.empty()) { +#if defined(EMSESP_DEBUG) + LOG_DEBUG(F("[DEBUG] No message stored for ACK pid %d"), packetId); +#endif return; } @@ -313,6 +337,9 @@ void Mqtt::on_publish(uint16_t packetId) { // if the last published failed, don't bother checking it. wait for the next retry if (mqtt_message.packet_id_ == 0) { +#if defined(EMSESP_DEBUG) + LOG_DEBUG(F("[DEBUG] ACK for failed message pid 0")); +#endif return; } @@ -321,6 +348,10 @@ void Mqtt::on_publish(uint16_t packetId) { mqtt_publish_fails_++; // increment error count } +#if defined(EMSESP_DEBUG) + LOG_DEBUG(F("[DEBUG] ACK pid %d"), packetId); +#endif + mqtt_messages_.pop_front(); // always remove from queue, regardless if there was a successful ACK } @@ -356,6 +387,9 @@ void Mqtt::start() { mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); }); mqttClient_->onDisconnect([this](AsyncMqttClientDisconnectReason reason) { + if (!connecting_) { + return; + } connecting_ = false; if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) { LOG_INFO(F("MQTT disconnected: TCP")); @@ -372,6 +406,14 @@ void Mqtt::start() { if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) { LOG_INFO(F("MQTT disconnected: Not authorized")); } + // remove message with pending ack + if (!mqtt_messages_.empty()) { + auto mqtt_message = mqtt_messages_.front(); + if (mqtt_message.packet_id_ != 0) { + mqtt_messages_.pop_front(); + } + } + // mqtt_messages_.clear(); }); // create will_topic with the base prefixed. It has to be static because asyncmqttclient destroys the reference @@ -390,7 +432,7 @@ void Mqtt::start() { }); // create space for command buffer, to avoid heap memory fragmentation - mqtt_subfunctions_.reserve(10); + mqtt_subfunctions_.reserve(50); } void Mqtt::set_publish_time_boiler(uint16_t publish_time) { @@ -469,11 +511,15 @@ void Mqtt::on_connect() { bool_format_ = mqttSettings.bool_format; }); - // first time to connect - if (connectcount_ == 1) { // send info topic appended with the version information as JSON StaticJsonDocument doc; + // first time to connect + if (connectcount_ == 1) { doc["event"] = FJSON("start"); + } else { + doc["event"] = FJSON("reconnect"); + } + doc["version"] = EMSESP_APP_VERSION; #ifndef EMSESP_STANDALONE doc["ip"] = WiFi.localIP().toString(); @@ -489,7 +535,8 @@ void Mqtt::on_connect() { EMSESP::shower_.send_mqtt_stat(false); // Send shower_activated as false EMSESP::system_.send_heartbeat(); // send heatbeat - } else { + // } else { + if (connectcount_ > 1) { // we doing a re-connect from a TCP break // only re-subscribe again to all MQTT topics resubscribe(); @@ -559,15 +606,7 @@ std::shared_ptr Mqtt::queue_message(const uint8_t operation, // take the topic and prefix the base, unless its for HA std::shared_ptr message; - if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0)) { - // leave topic as it is - message = std::make_shared(operation, topic, payload, retain); - } else { - // prefix the base - std::string full_topic(MQTT_TOPIC_MAX_SIZE, '\0'); - snprintf_P(&full_topic[0], full_topic.capacity() + 1, PSTR("%s/%s"), mqtt_base_.c_str(), topic.c_str()); - message = std::make_shared(operation, full_topic, payload, retain); - } + message = std::make_shared(operation, topic, payload, retain); // LOG_INFO("Added to queue: %s %s", message->topic.c_str(), message->payload.c_str()); // debugging only @@ -677,13 +716,20 @@ void Mqtt::process_queue() { // fetch first from queue and create the full topic name auto mqtt_message = mqtt_messages_.front(); auto message = mqtt_message.content_; + char topic[MQTT_TOPIC_MAX_SIZE]; + if ((strncmp(message->topic.c_str(), "homeassistant/", 13) == 0)) { + // leave topic as it is + strcpy(topic, message->topic.c_str()); + } else { + snprintf_P(topic, MQTT_TOPIC_MAX_SIZE, PSTR("%s/%s"), mqtt_base_.c_str(), message->topic.c_str()); + } // if we're subscribing... if (message->operation == Operation::SUBSCRIBE) { - LOG_DEBUG(F("Subscribing to topic: %s"), message->topic.c_str()); - uint16_t packet_id = mqttClient_->subscribe(message->topic.c_str(), mqtt_qos_); + LOG_DEBUG(F("Subscribing to topic: %s"), topic); + uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_); if (!packet_id) { - LOG_DEBUG(F("Error subscribing to %s"), message->topic.c_str()); + LOG_DEBUG(F("Error subscribing to %s"), topic); } mqtt_messages_.pop_front(); // remove the message from the queue @@ -694,24 +740,27 @@ void Mqtt::process_queue() { // if this has already been published and we're waiting for an ACK, don't publish again // it will have a real packet ID if (mqtt_message.packet_id_ > 0) { +#if defined(EMSESP_DEBUG) + LOG_DEBUG(F("[DEBUG] Waitig for QOS-ACK")); +#endif return; } // else try and publish it - uint16_t packet_id = mqttClient_->publish(message->topic.c_str(), mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); - LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, try#%d, size %d, pid %d)"), message->topic.c_str(), mqtt_message.id_, message->retain, mqtt_message.retry_count_ + 1, message->payload.size(), packet_id); + uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); + LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, try#%d, size %d, pid %d)"), topic, mqtt_message.id_, message->retain, mqtt_message.retry_count_ + 1, message->payload.size(), packet_id); if (packet_id == 0) { // it failed. if we retried n times, give up. remove from queue if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) { - LOG_ERROR(F("Failed to publish to %s after %d attempts"), message->topic.c_str(), mqtt_message.retry_count_ + 1); + LOG_ERROR(F("Failed to publish to %s after %d attempts"), topic, mqtt_message.retry_count_ + 1); mqtt_publish_fails_++; // increment failure counter mqtt_messages_.pop_front(); // delete return; } else { // update the record mqtt_messages_.front().retry_count_++; - LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), message->topic.c_str(), mqtt_message.retry_count_ + 1); + LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), topic, mqtt_message.retry_count_ + 1); return; // leave on queue for next time so it gets republished } } @@ -818,7 +867,7 @@ void Mqtt::publish_mqtt_ha_sensor(uint8_t type, // EMSdevice snprintf_P(topic, sizeof(topic), PSTR("homeassistant/sensor/%s/%s/config"), mqtt_base_.c_str(), uniq.c_str()); // topic // unit of measure and map the HA icon - if (uom != DeviceValueUOM::NONE) { + if (uom != DeviceValueUOM::NONE && uom != DeviceValueUOM::PUMP) { doc["unit_of_meas"] = EMSdevice::uom_to_string(uom); } switch (uom) { @@ -828,6 +877,9 @@ void Mqtt::publish_mqtt_ha_sensor(uint8_t type, // EMSdevice case DeviceValueUOM::PERCENT: doc["ic"] = F_(iconpercent); break; + case DeviceValueUOM::PUMP: + doc["ic"] = F_(iconpump); + break; case DeviceValueUOM::NONE: default: break; diff --git a/src/mqtt.h b/src/mqtt.h index 6804f12ce..079a7dbd1 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -180,6 +180,10 @@ class Mqtt { mqtt_retain_ = mqtt_retain; } + static bool is_empty() { + return mqtt_messages_.empty(); + } + /* struct QueuedMqttMessage { uint16_t id_; @@ -226,13 +230,11 @@ class Mqtt { struct MQTTSubFunction { uint8_t device_type_; // which device type, from DeviceType:: const std::string topic_; // short topic name - const std::string full_topic_; // the fully qualified topic name, usually with the hostname prefixed mqtt_subfunction_p mqtt_subfunction_; // can be empty - MQTTSubFunction(uint8_t device_type, const std::string && topic, const std::string && full_topic, mqtt_subfunction_p mqtt_subfunction) + MQTTSubFunction(uint8_t device_type, const std::string && topic, mqtt_subfunction_p mqtt_subfunction) : device_type_(device_type) , topic_(topic) - , full_topic_(full_topic) , mqtt_subfunction_(mqtt_subfunction) { } };