added back MAX_MQTT_MESSAGES

This commit is contained in:
Proddy
2023-02-23 19:26:58 +01:00
parent 077a9937fe
commit 425cf33f74
2 changed files with 29 additions and 25 deletions

View File

@@ -674,11 +674,10 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
} }
#ifndef EMSESP_STANDALONE #ifndef EMSESP_STANDALONE
// anything below 65MB available free heap is dangerously low, so we stop adding to prevent a crash // anything below 60MB available free heap is dangerously low, so we stop adding to prevent a crash
// instead of doing a mqtt_messages_.pop_front() // instead of doing a mqtt_messages_.pop_front()
auto free_heap = ESP.getFreeHeap() / 1024; if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES || ESP.getFreeHeap() < (60 * 1024)) {
if (free_heap < 65) { LOG_WARNING("Queue overflow (queue count=%d, topic=%s)", mqtt_messages_.size(), topic.c_str());
LOG_WARNING("Queue overflow (size %d, heap=%d)", mqtt_messages_.size(), free_heap);
mqtt_publish_fails_++; mqtt_publish_fails_++;
return; // don't add to top of queue return; // don't add to top of queue
} }
@@ -689,7 +688,7 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
if (operation == Operation::PUBLISH) { if (operation == Operation::PUBLISH) {
if (message->payload.empty()) { if (message->payload.empty()) {
LOG_DEBUG("Adding to queue: (publish) topic='%s' empty payload", message->topic.c_str()); LOG_DEBUG("Adding to queue: (remove) topic='%s'", message->topic.c_str());
} else { } else {
LOG_DEBUG("Adding to queue: (publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str()); LOG_DEBUG("Adding to queue: (publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str());
} }
@@ -769,10 +768,8 @@ void Mqtt::queue_remove_topic(const char * topic) {
} }
if (ha_enabled_) { if (ha_enabled_) {
LOG_DEBUG("Publishing HA topic %s%s", Mqtt::discovery_prefix().c_str(), topic);
queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker
} else { } else {
LOG_DEBUG("Publishing topic %s", topic);
queue_publish_message(topic, "", true); // publish with retain to remove from broker queue_publish_message(topic, "", true); // publish with retain to remove from broker
} }
} }
@@ -787,11 +784,7 @@ void Mqtt::queue_ha(const char * topic, const JsonObject & payload) {
payload_text.reserve(measureJson(payload) + 1); payload_text.reserve(measureJson(payload) + 1);
serializeJson(payload, payload_text); // convert json to string serializeJson(payload, payload_text); // convert json to string
std::string fulltopic = Mqtt::discovery_prefix() + topic; queue_publish_message(Mqtt::discovery_prefix() + topic, payload_text, true); // with retain true
LOG_DEBUG("Publishing HA topic=%s, payload=%s", fulltopic.c_str(), payload_text.c_str());
// queue messages if the MQTT connection is not yet established. to ensure we don't miss messages
queue_publish_message(fulltopic, payload_text, true); // with retain true
} }
// take top from queue and perform the publish or subscribe action // take top from queue and perform the publish or subscribe action
@@ -864,7 +857,7 @@ void Mqtt::process_queue() {
lasttopic_ = topic; lasttopic_ = topic;
lastpayload_ = message->payload; lastpayload_ = message->payload;
LOG_DEBUG("Publishing topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)", LOG_DEBUG("Published topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)",
topic, topic,
mqtt_message.id_, mqtt_message.id_,
message->retain, message->retain,
@@ -872,9 +865,11 @@ void Mqtt::process_queue() {
message->payload.size(), message->payload.size(),
packet_id); packet_id);
/*
if (!message->payload.empty()) { if (!message->payload.empty()) {
LOG_DEBUG("Payload:%s", message->payload.c_str()); // extra message for #784 LOG_DEBUG("Payload: %s", message->payload.c_str()); // extra message for #784
} }
*/
if (packet_id == 0) { if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue // it failed. if we retried n times, give up. remove from queue
@@ -987,15 +982,17 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
// create the device name // create the device name
auto device_name = EMSdevice::device_type_2_device_name(device_type); auto device_name = EMSdevice::device_type_2_device_name(device_type);
// create entity by add the hc/wwc tag if present, separating with a _ bool has_tag = ((tag < DeviceValue::NUM_TAGS) && (tag != DeviceValue::DeviceValueTAG::TAG_NONE) && strlen(DeviceValue::DeviceValueTAG_s[tag][0]));
// create entity by add the hc/wwc tag if present, separating with an _
char entity_with_tag[50]; char entity_with_tag[50];
if (tag >= DeviceValueTAG::TAG_HC1) { if (has_tag) {
snprintf(entity_with_tag, sizeof(entity_with_tag), "%s_%s", EMSdevice::tag_to_mqtt(tag), entity); snprintf(entity_with_tag, sizeof(entity_with_tag), "%s_%s", EMSdevice::tag_to_mqtt(tag), entity);
} else { } else {
snprintf(entity_with_tag, sizeof(entity_with_tag), "%s", entity); snprintf(entity_with_tag, sizeof(entity_with_tag), "%s", entity);
} }
// build unique identifier also used as object_id and becomes the Entity ID in HA // build unique identifier also used as object_id which also becomes the Entity ID in HA
char uniq_id[80]; char uniq_id[80];
if (Mqtt::entity_format() == 2) { if (Mqtt::entity_format() == 2) {
// prefix base name to each uniq_id and use the shortname // prefix base name to each uniq_id and use the shortname
@@ -1009,10 +1006,10 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
char uniq_s[60]; char uniq_s[60];
strlcpy(uniq_s, en_name, sizeof(uniq_s)); strlcpy(uniq_s, en_name, sizeof(uniq_s));
Helpers::replace_char(uniq_s, ' ', '_'); Helpers::replace_char(uniq_s, ' ', '_');
if (tag == DeviceValueTAG::TAG_NONE) { if (has_tag) {
snprintf(uniq_id, sizeof(uniq_id), "%s_%s", device_name, Helpers::toLower(uniq_s).c_str()); snprintf(uniq_id, sizeof(uniq_id), "%s_%s", device_name, Helpers::toLower(uniq_s).c_str());
} else { } else {
snprintf(uniq_id, sizeof(uniq_id), "%s_%s_%s", device_name, EMSdevice::tag_to_string(tag, false), Helpers::toLower(uniq_s).c_str()); snprintf(uniq_id, sizeof(uniq_id), "%s_%s_%s", device_name, DeviceValue::DeviceValueTAG_s[tag][0], Helpers::toLower(uniq_s).c_str());
} }
} }
@@ -1066,11 +1063,10 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
} }
} }
// if we're asking to remove this topic, send an empty payload and exit // if we're asking to remove this topic, send an empty payload and exit
// https://github.com/emsesp/EMS-ESP32/issues/196 // https://github.com/emsesp/EMS-ESP32/issues/196
if (remove) { if (remove) {
LOG_DEBUG("Removing HA config for %s", uniq_id); LOG_DEBUG("Queuing removing topic %s", topic);
queue_remove_topic(topic); queue_remove_topic(topic);
return; return;
} }
@@ -1150,9 +1146,9 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
char ha_name[70]; char ha_name[70];
char * F_name = strdup(fullname); char * F_name = strdup(fullname);
Helpers::CharToUpperUTF8(F_name); // capitalize first letter Helpers::CharToUpperUTF8(F_name); // capitalize first letter
if (tag > DeviceValueTAG::TAG_HEARTBEAT) { if (has_tag) {
// exclude heartbeat tag // exclude heartbeat tag
snprintf(ha_name, sizeof(ha_name), "%s %s", EMSdevice::tag_to_string(tag), F_name); snprintf(ha_name, sizeof(ha_name), "%s %s", DeviceValue::DeviceValueTAG_s[tag][0], F_name);
} else { } else {
snprintf(ha_name, sizeof(ha_name), "%s", F_name); // no tag snprintf(ha_name, sizeof(ha_name), "%s", F_name); // no tag
} }
@@ -1301,6 +1297,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
// add "availability" section // add "availability" section
add_avty_to_doc(stat_t, doc.as<JsonObject>(), val_cond); add_avty_to_doc(stat_t, doc.as<JsonObject>(), val_cond);
// TODO queue it or send it directly via publish?
queue_ha(topic, doc.as<JsonObject>()); queue_ha(topic, doc.as<JsonObject>());
} }
@@ -1312,8 +1309,8 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp,
char hc_mode_s[30]; char hc_mode_s[30];
char seltemp_s[30]; char seltemp_s[30];
char currtemp_s[30]; char currtemp_s[30];
char hc_mode_cond[70]; char hc_mode_cond[80];
char seltemp_cond[70]; char seltemp_cond[80];
char currtemp_cond[170]; char currtemp_cond[170];
char mode_str_tpl[400]; char mode_str_tpl[400];
char name_s[10]; char name_s[10];
@@ -1358,6 +1355,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp,
snprintf(mode_str_tpl, snprintf(mode_str_tpl,
sizeof(mode_str_tpl), sizeof(mode_str_tpl),
"{%%if %s%%}off{%%elif %s=='manual'%%}heat{%%elif %s=='day'%%}heat{%%elif %s=='night'%%}off{%%elif %s=='off'%%}off{%%else%%}auto{%%endif%%}", "{%%if %s%%}off{%%elif %s=='manual'%%}heat{%%elif %s=='day'%%}heat{%%elif %s=='night'%%}off{%%elif %s=='off'%%}off{%%else%%}auto{%%endif%%}",
hc_mode_cond,
hc_mode_s, hc_mode_s,
hc_mode_s, hc_mode_s,
hc_mode_s, hc_mode_s,
@@ -1445,6 +1443,7 @@ void Mqtt::add_avty_to_doc(const char * state_t, const JsonObject & doc, const c
snprintf(tpl, sizeof(tpl), tpl_draft, "value == 'online'"); snprintf(tpl, sizeof(tpl), tpl_draft, "value == 'online'");
avty_json["val_tpl"] = tpl; avty_json["val_tpl"] = tpl;
avty.add(avty_json); avty.add(avty_json);
avty.clear();
avty_json["t"] = state_t; avty_json["t"] = state_t;
snprintf(tpl, sizeof(tpl), tpl_draft, cond1 == nullptr ? "value is defined" : cond1); snprintf(tpl, sizeof(tpl), tpl_draft, cond1 == nullptr ? "value is defined" : cond1);
@@ -1452,12 +1451,14 @@ void Mqtt::add_avty_to_doc(const char * state_t, const JsonObject & doc, const c
avty.add(avty_json); avty.add(avty_json);
if (cond2 != nullptr) { if (cond2 != nullptr) {
avty.clear();
snprintf(tpl, sizeof(tpl), tpl_draft, cond2); snprintf(tpl, sizeof(tpl), tpl_draft, cond2);
avty_json["val_tpl"] = tpl; avty_json["val_tpl"] = tpl;
avty.add(avty_json); avty.add(avty_json);
} }
if (negcond != nullptr) { if (negcond != nullptr) {
avty.clear();
snprintf(tpl, sizeof(tpl), "{{'offline' if %s else 'online'}}", negcond); snprintf(tpl, sizeof(tpl), "{{'offline' if %s else 'online'}}", negcond);
avty_json["val_tpl"] = tpl; avty_json["val_tpl"] = tpl;
avty.add(avty_json); avty.add(avty_json);

View File

@@ -31,6 +31,9 @@ using uuid::console::Shell;
namespace emsesp { namespace emsesp {
// size of queue
static constexpr uint16_t MAX_MQTT_MESSAGES = 300;
using mqtt_sub_function_p = std::function<bool(const char * message)>; using mqtt_sub_function_p = std::function<bool(const char * message)>;
struct MqttMessage { struct MqttMessage {