From 425cf33f74682513b13a34f29b704a3d373078d0 Mon Sep 17 00:00:00 2001 From: Proddy Date: Thu, 23 Feb 2023 19:26:58 +0100 Subject: [PATCH] added back MAX_MQTT_MESSAGES --- src/mqtt.cpp | 51 ++++++++++++++++++++++++++------------------------- src/mqtt.h | 3 +++ 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/mqtt.cpp b/src/mqtt.cpp index ffd9f7ec3..1964bdd56 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -674,11 +674,10 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con } #ifndef EMSESP_STANDALONE - // anything below 65MB available free heap is dangerously low, so we stop adding to prevent a crash + // anything below 60MB available free heap is dangerously low, so we stop adding to prevent a crash // instead of doing a mqtt_messages_.pop_front() - auto free_heap = ESP.getFreeHeap() / 1024; - if (free_heap < 65) { - LOG_WARNING("Queue overflow (size %d, heap=%d)", mqtt_messages_.size(), free_heap); + if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES || ESP.getFreeHeap() < (60 * 1024)) { + LOG_WARNING("Queue overflow (queue count=%d, topic=%s)", mqtt_messages_.size(), topic.c_str()); mqtt_publish_fails_++; return; // don't add to top of queue } @@ -689,7 +688,7 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con if (operation == Operation::PUBLISH) { if (message->payload.empty()) { - LOG_DEBUG("Adding to queue: (publish) topic='%s' empty payload", message->topic.c_str()); + LOG_DEBUG("Adding to queue: (remove) topic='%s'", message->topic.c_str()); } else { LOG_DEBUG("Adding to queue: (publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str()); } @@ -769,10 +768,8 @@ void Mqtt::queue_remove_topic(const char * topic) { } if (ha_enabled_) { - LOG_DEBUG("Publishing HA topic %s%s", Mqtt::discovery_prefix().c_str(), topic); queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker } else { - LOG_DEBUG("Publishing topic %s", topic); queue_publish_message(topic, "", true); // publish with retain to remove from broker } } @@ -787,11 +784,7 @@ void Mqtt::queue_ha(const char * topic, const JsonObject & payload) { payload_text.reserve(measureJson(payload) + 1); serializeJson(payload, payload_text); // convert json to string - std::string fulltopic = Mqtt::discovery_prefix() + topic; - LOG_DEBUG("Publishing HA topic=%s, payload=%s", fulltopic.c_str(), payload_text.c_str()); - - // queue messages if the MQTT connection is not yet established. to ensure we don't miss messages - queue_publish_message(fulltopic, payload_text, true); // with retain true + queue_publish_message(Mqtt::discovery_prefix() + topic, payload_text, true); // with retain true } // take top from queue and perform the publish or subscribe action @@ -864,7 +857,7 @@ void Mqtt::process_queue() { lasttopic_ = topic; lastpayload_ = message->payload; - LOG_DEBUG("Publishing topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)", + LOG_DEBUG("Published topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)", topic, mqtt_message.id_, message->retain, @@ -872,9 +865,11 @@ void Mqtt::process_queue() { message->payload.size(), packet_id); + /* if (!message->payload.empty()) { - LOG_DEBUG("Payload:%s", message->payload.c_str()); // extra message for #784 + LOG_DEBUG("Payload: %s", message->payload.c_str()); // extra message for #784 } + */ if (packet_id == 0) { // it failed. if we retried n times, give up. remove from queue @@ -987,15 +982,17 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev // create the device name auto device_name = EMSdevice::device_type_2_device_name(device_type); - // create entity by add the hc/wwc tag if present, separating with a _ + bool has_tag = ((tag < DeviceValue::NUM_TAGS) && (tag != DeviceValue::DeviceValueTAG::TAG_NONE) && strlen(DeviceValue::DeviceValueTAG_s[tag][0])); + + // create entity by add the hc/wwc tag if present, separating with an _ char entity_with_tag[50]; - if (tag >= DeviceValueTAG::TAG_HC1) { + if (has_tag) { snprintf(entity_with_tag, sizeof(entity_with_tag), "%s_%s", EMSdevice::tag_to_mqtt(tag), entity); } else { snprintf(entity_with_tag, sizeof(entity_with_tag), "%s", entity); } - // build unique identifier also used as object_id and becomes the Entity ID in HA + // build unique identifier also used as object_id which also becomes the Entity ID in HA char uniq_id[80]; if (Mqtt::entity_format() == 2) { // prefix base name to each uniq_id and use the shortname @@ -1009,10 +1006,10 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev char uniq_s[60]; strlcpy(uniq_s, en_name, sizeof(uniq_s)); Helpers::replace_char(uniq_s, ' ', '_'); - if (tag == DeviceValueTAG::TAG_NONE) { + if (has_tag) { snprintf(uniq_id, sizeof(uniq_id), "%s_%s", device_name, Helpers::toLower(uniq_s).c_str()); } else { - snprintf(uniq_id, sizeof(uniq_id), "%s_%s_%s", device_name, EMSdevice::tag_to_string(tag, false), Helpers::toLower(uniq_s).c_str()); + snprintf(uniq_id, sizeof(uniq_id), "%s_%s_%s", device_name, DeviceValue::DeviceValueTAG_s[tag][0], Helpers::toLower(uniq_s).c_str()); } } @@ -1066,11 +1063,10 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev } } - // if we're asking to remove this topic, send an empty payload and exit // https://github.com/emsesp/EMS-ESP32/issues/196 if (remove) { - LOG_DEBUG("Removing HA config for %s", uniq_id); + LOG_DEBUG("Queuing removing topic %s", topic); queue_remove_topic(topic); return; } @@ -1150,9 +1146,9 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev char ha_name[70]; char * F_name = strdup(fullname); Helpers::CharToUpperUTF8(F_name); // capitalize first letter - if (tag > DeviceValueTAG::TAG_HEARTBEAT) { + if (has_tag) { // exclude heartbeat tag - snprintf(ha_name, sizeof(ha_name), "%s %s", EMSdevice::tag_to_string(tag), F_name); + snprintf(ha_name, sizeof(ha_name), "%s %s", DeviceValue::DeviceValueTAG_s[tag][0], F_name); } else { snprintf(ha_name, sizeof(ha_name), "%s", F_name); // no tag } @@ -1301,6 +1297,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev // add "availability" section add_avty_to_doc(stat_t, doc.as(), val_cond); + // TODO queue it or send it directly via publish? queue_ha(topic, doc.as()); } @@ -1312,8 +1309,8 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, char hc_mode_s[30]; char seltemp_s[30]; char currtemp_s[30]; - char hc_mode_cond[70]; - char seltemp_cond[70]; + char hc_mode_cond[80]; + char seltemp_cond[80]; char currtemp_cond[170]; char mode_str_tpl[400]; char name_s[10]; @@ -1358,6 +1355,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, snprintf(mode_str_tpl, sizeof(mode_str_tpl), "{%%if %s%%}off{%%elif %s=='manual'%%}heat{%%elif %s=='day'%%}heat{%%elif %s=='night'%%}off{%%elif %s=='off'%%}off{%%else%%}auto{%%endif%%}", + hc_mode_cond, hc_mode_s, hc_mode_s, hc_mode_s, @@ -1445,6 +1443,7 @@ void Mqtt::add_avty_to_doc(const char * state_t, const JsonObject & doc, const c snprintf(tpl, sizeof(tpl), tpl_draft, "value == 'online'"); avty_json["val_tpl"] = tpl; avty.add(avty_json); + avty.clear(); avty_json["t"] = state_t; snprintf(tpl, sizeof(tpl), tpl_draft, cond1 == nullptr ? "value is defined" : cond1); @@ -1452,12 +1451,14 @@ void Mqtt::add_avty_to_doc(const char * state_t, const JsonObject & doc, const c avty.add(avty_json); if (cond2 != nullptr) { + avty.clear(); snprintf(tpl, sizeof(tpl), tpl_draft, cond2); avty_json["val_tpl"] = tpl; avty.add(avty_json); } if (negcond != nullptr) { + avty.clear(); snprintf(tpl, sizeof(tpl), "{{'offline' if %s else 'online'}}", negcond); avty_json["val_tpl"] = tpl; avty.add(avty_json); diff --git a/src/mqtt.h b/src/mqtt.h index 5f72faf7c..19db5c866 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -31,6 +31,9 @@ using uuid::console::Shell; namespace emsesp { +// size of queue +static constexpr uint16_t MAX_MQTT_MESSAGES = 300; + using mqtt_sub_function_p = std::function; struct MqttMessage {