diff --git a/src/analogsensor.cpp b/src/analogsensor.cpp index a46cc0b7e..2a36ffce2 100644 --- a/src/analogsensor.cpp +++ b/src/analogsensor.cpp @@ -385,7 +385,7 @@ void AnalogSensor::publish_sensor(const Sensor & sensor) const { snprintf(topic, sizeof(topic), "%s%s/%s", F_(analogsensor), "_data", sensor.name().c_str()); } char payload[10]; - Mqtt::publish(topic, Helpers::render_value(payload, sensor.value(), 2)); // always publish as doubles + Mqtt::queue_publish(topic, Helpers::render_value(payload, sensor.value(), 2)); // always publish as doubles } } @@ -398,7 +398,7 @@ void AnalogSensor::remove_ha_topic(const uint8_t gpio) const { LOG_DEBUG("Removing HA config for analog sensor GPIO %02d", gpio); char topic[Mqtt::MQTT_TOPIC_MAX_SIZE]; snprintf(topic, sizeof(topic), "sensor/%s/analogsensor_%02d/config", Mqtt::basename().c_str(), gpio); - Mqtt::remove_topic(topic); + Mqtt::queue_remove_topic(topic); } // send all sensor values as a JSON package to MQTT @@ -495,14 +495,14 @@ void AnalogSensor::publish_values(const bool force) { char topic[Mqtt::MQTT_TOPIC_MAX_SIZE]; snprintf(topic, sizeof(topic), "sensor/%s/analogsensor_%02d/config", Mqtt::basename().c_str(), sensor.gpio()); - Mqtt::publish_ha(topic, config.as()); + Mqtt::queue_ha(topic, config.as()); sensor.ha_registered = true; } } } - Mqtt::publish("analogsensor_data", doc.as()); + Mqtt::queue_publish("analogsensor_data", doc.as()); } // called from emsesp.cpp, similar to the emsdevice->get_value_info diff --git a/src/dallassensor.cpp b/src/dallassensor.cpp index 7897355a6..278315940 100644 --- a/src/dallassensor.cpp +++ b/src/dallassensor.cpp @@ -447,7 +447,7 @@ void DallasSensor::publish_sensor(const Sensor & sensor) { snprintf(topic, sizeof(topic), "%s%s/%s", (F_(dallassensor)), "_data", sensor.name().c_str()); } char payload[10]; - Mqtt::publish(topic, Helpers::render_value(payload, sensor.temperature_c, 10, EMSESP::system_.fahrenheit() ? 2 : 0)); + Mqtt::queue_publish(topic, Helpers::render_value(payload, sensor.temperature_c, 10, EMSESP::system_.fahrenheit() ? 2 : 0)); } } @@ -462,7 +462,7 @@ void DallasSensor::remove_ha_topic(const std::string & id) { std::replace(sensorid.begin(), sensorid.end(), '-', '_'); char topic[Mqtt::MQTT_TOPIC_MAX_SIZE]; snprintf(topic, sizeof(topic), "sensor/%s/dallassensor_%s/config", Mqtt::basename().c_str(), sensorid.c_str()); - Mqtt::remove_topic(topic); + Mqtt::queue_remove_topic(topic); } // send all dallas sensor values as a JSON package to MQTT @@ -555,14 +555,14 @@ void DallasSensor::publish_values(const bool force) { snprintf(topic, sizeof(topic), "sensor/%s/dallassensor_%s/config", Mqtt::basename().c_str(), sensorid.c_str()); - Mqtt::publish_ha(topic, config.as()); + Mqtt::queue_ha(topic, config.as()); sensor.ha_registered = true; } } } - Mqtt::publish("dallassensor_data", doc.as()); + Mqtt::queue_publish("dallassensor_data", doc.as()); } diff --git a/src/devices/boiler.cpp b/src/devices/boiler.cpp index bae5d35fa..1b56edb70 100644 --- a/src/devices/boiler.cpp +++ b/src/devices/boiler.cpp @@ -847,7 +847,7 @@ void Boiler::check_active(const bool force) { if (heatingActive_ != val || force) { heatingActive_ = val; char s[12]; - Mqtt::publish(F_(heating_active), Helpers::render_boolean(s, b)); + Mqtt::queue_publish(F_(heating_active), Helpers::render_boolean(s, b)); } // check if we can use tapactivated in flow systems @@ -871,7 +871,7 @@ void Boiler::check_active(const bool force) { if (tapwaterActive_ != val || force) { tapwaterActive_ = val; char s[12]; - Mqtt::publish(F_(tapwater_active), Helpers::render_boolean(s, b)); + Mqtt::queue_publish(F_(tapwater_active), Helpers::render_boolean(s, b)); EMSESP::tap_water_active(b); // let EMS-ESP know, used in the Shower class } } diff --git a/src/emsdevice.cpp b/src/emsdevice.cpp index 282399e69..e023ec06c 100644 --- a/src/emsdevice.cpp +++ b/src/emsdevice.cpp @@ -770,7 +770,7 @@ void EMSdevice::publish_value(void * value_p) const { } if (payload[0] != '\0') { - Mqtt::publish(topic, payload); + Mqtt::queue_publish(topic, payload); } } } @@ -1639,6 +1639,7 @@ bool EMSdevice::generate_values(JsonObject & output, const uint8_t tag_filter, c json[name] = time_value; } } + // commenting out - we don't want Commands in MQTT or Console // else if (dv.type == DeviceValueType::CMD && output_target != EMSdevice::OUTPUT_TARGET::MQTT) { // json[name] = ""; @@ -1670,7 +1671,6 @@ bool EMSdevice::generate_values(JsonObject & output, const uint8_t tag_filter, c // remove the Home Assistant configs for each device value/entity if its not visible or active or marked as read-only // this is called when an MQTT publish is done via an EMS Device in emsesp.cpp::publish_device_values() -// TODO remove topic remove on cold start void EMSdevice::mqtt_ha_entity_config_remove() { for (auto & dv : devicevalues_) { if (dv.has_state(DeviceValueState::DV_HA_CONFIG_CREATED) @@ -1681,6 +1681,7 @@ void EMSdevice::mqtt_ha_entity_config_remove() { if (dv.short_name == FL_(climate)[0]) { Mqtt::publish_ha_climate_config(dv.tag, false, true); // delete topic (remove = true) } else { + // TODO check if we still need to remove topic on a cold start? Mqtt::publish_ha_sensor_config(dv, "", "", true); // delete topic (remove = true) } } @@ -1696,7 +1697,7 @@ void EMSdevice::mqtt_ha_entity_config_create() { // create climate if roomtemp is visible // create the discovery topic if if hasn't already been created, not a command (like reset) and is active and visible for (auto & dv : devicevalues_) { - // TODO removed + // TODO remove // if (dv.has_state(DeviceValueState::DV_HA_CONFIG_RECREATE)) { // dv.remove_state(DeviceValueState::DV_HA_CONFIG_CREATED); // dv.remove_state(DeviceValueState::DV_HA_CONFIG_RECREATE); @@ -1730,7 +1731,7 @@ void EMSdevice::mqtt_ha_entity_config_create() { // remove all config topics in HA void EMSdevice::ha_config_clear() { for (auto & dv : devicevalues_) { - // dv.add_state(DeviceValueState::DV_HA_CONFIG_RECREATE); // TODO removed + // dv.add_state(DeviceValueState::DV_HA_CONFIG_RECREATE); // TODO remove if (ha_config_firstrun()) { dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); // make sure it is removed if not active } diff --git a/src/emsesp.cpp b/src/emsesp.cpp index 272b29b23..e831b3509 100644 --- a/src/emsesp.cpp +++ b/src/emsesp.cpp @@ -482,7 +482,7 @@ void EMSESP::publish_all(bool force) { } } -// on command "publish HA" loop and wait between devices for publishing all sensors +// loop and wait between devices for publishing all values void EMSESP::publish_all_loop() { if (!Mqtt::connected() || !publish_all_idx_) { return; @@ -557,14 +557,14 @@ void EMSESP::publish_device_values(uint8_t device_type) { // we may have some RETAINED /config topics that reference fields in the data payloads that no longer exist // remove them immediately to prevent HA from complaining // we need to do this first before the data payload is published, and only done once! - if (emsdevice->ha_config_firstrun()) { - emsdevice->ha_config_clear(); - emsdevice->ha_config_firstrun(false); - return; - } else { - // see if we need to delete and /config topics before adding the payloads - emsdevice->mqtt_ha_entity_config_remove(); - } + // TODO remove + // if (emsdevice->ha_config_firstrun()) { + // emsdevice->ha_config_clear(); + // emsdevice->ha_config_firstrun(false); + // return; + // } else { + // see if we need to delete and /config topics before adding the payloads + emsdevice->mqtt_ha_entity_config_remove(); } } } @@ -581,7 +581,7 @@ void EMSESP::publish_device_values(uint8_t device_type) { } } if (need_publish && ((!nested && tag >= DeviceValueTAG::TAG_DEVICE_DATA_WW) || (tag == DeviceValueTAG::TAG_BOILER_DATA_WW))) { - Mqtt::publish(Mqtt::tag_to_topic(device_type, tag), json); + Mqtt::queue_publish(Mqtt::tag_to_topic(device_type, tag), json); json = doc.to(); need_publish = false; } @@ -590,7 +590,7 @@ void EMSESP::publish_device_values(uint8_t device_type) { if (doc.overflowed()) { LOG_WARNING("MQTT buffer overflow, please use individual topics"); } - Mqtt::publish(Mqtt::tag_to_topic(device_type, DeviceValueTAG::TAG_NONE), json); + Mqtt::queue_publish(Mqtt::tag_to_topic(device_type, DeviceValueTAG::TAG_NONE), json); } // we want to create the /config topic after the data payload to prevent HA from throwing up a warning @@ -598,6 +598,7 @@ void EMSESP::publish_device_values(uint8_t device_type) { for (const auto & emsdevice : emsdevices) { if (emsdevice && (emsdevice->device_type() == device_type)) { emsdevice->mqtt_ha_entity_config_create(); + // EMSESP::mqtt_.loop(); // TODO experimental } } } @@ -651,7 +652,7 @@ void EMSESP::publish_response(std::shared_ptr telegram) { doc["value"] = value; } - Mqtt::publish("response", doc.as()); + Mqtt::queue_publish("response", doc.as()); } // builds json with the detail of each value, for a specific EMS device type or the dallas sensor diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 5d92bb37f..ffd9f7ec3 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -301,7 +301,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) cons if (!(mf.mqtt_subfunction_)(message)) { LOG_ERROR("error: invalid payload %s for this topic %s", message, topic); if (send_response_) { - Mqtt::publish("response", "error: invalid data"); + Mqtt::queue_publish("response", "error: invalid data"); } } return; @@ -336,12 +336,12 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) cons } LOG_ERROR(error); if (send_response_) { - Mqtt::publish("response", error); + Mqtt::queue_publish("response", error); } } else { // all good, send back json output from call if (send_response_) { - Mqtt::publish("response", output); + Mqtt::queue_publish("response", output); } } } @@ -589,7 +589,7 @@ void Mqtt::on_connect() { resubscribe(); // publish to the last will topic (see Mqtt::start() function) to say we're alive - publish_retain("status", "online", true); // with retain on + queue_publish_retain("status", "online", true); // with retain on mqtt_publish_fails_ = 0; // reset fail count to 0 @@ -643,7 +643,7 @@ void Mqtt::ha_status() { char topic[MQTT_TOPIC_MAX_SIZE]; snprintf(topic, sizeof(topic), "binary_sensor/%s/system_status/config", mqtt_basename_.c_str()); - Mqtt::publish_ha(topic, doc.as()); // publish the config payload with retain flag + Mqtt::queue_ha(topic, doc.as()); // publish the config payload with retain flag // create the sensors - must match the MQTT payload keys // these are all from the heartbeat MQTT topic @@ -673,6 +673,17 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con return; } +#ifndef EMSESP_STANDALONE + // anything below 65MB available free heap is dangerously low, so we stop adding to prevent a crash + // instead of doing a mqtt_messages_.pop_front() + auto free_heap = ESP.getFreeHeap() / 1024; + if (free_heap < 65) { + LOG_WARNING("Queue overflow (size %d, heap=%d)", mqtt_messages_.size(), free_heap); + mqtt_publish_fails_++; + return; // don't add to top of queue + } +#endif + // take the topic and prefix the base, unless its for HA std::shared_ptr message = std::make_shared(operation, topic, payload, retain); @@ -686,21 +697,6 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con LOG_DEBUG("Adding to queue: (subscribe) topic='%s'", message->topic.c_str()); } -#ifndef EMSESP_STANDALONE - // TODO to look at with @MichaelDvP ... - // TODO also reduce the time to process the queue so it empties quicker? I changed MQTT_PUBLISH_WAIT from 100 to 75 - // TODO or call process_queue() to process the front of queue immediately? - // TODO because it takes 10 seconds (default publish interval) before the queue gets published - // TODO and does returning with mqtt_messages_.pop_front() have any negative side affects? - - // anything below 65MB available free heap is dangerously low - if (ESP.getFreeHeap() < (65 * 1024)) { - LOG_WARNING("Queue overflow (size %d)", mqtt_messages_.size()); - mqtt_publish_fails_++; - return; // don't add to top of queue - } -#endif - mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message)); return; @@ -725,40 +721,40 @@ void Mqtt::queue_unsubscribe_message(const std::string & topic) { } // MQTT Publish, using a user's retain flag -void Mqtt::publish(const std::string & topic, const std::string & payload) { +void Mqtt::queue_publish(const std::string & topic, const std::string & payload) { queue_publish_message(topic, payload, mqtt_retain_); } // MQTT Publish, using a user's retain flag - except for char * strings -void Mqtt::publish(const char * topic, const char * payload) { +void Mqtt::queue_publish(const char * topic, const char * payload) { queue_publish_message((topic), payload, mqtt_retain_); } // MQTT Publish, using a specific retain flag, topic is a flash string -void Mqtt::publish(const char * topic, const std::string & payload) { +void Mqtt::queue_publish(const char * topic, const std::string & payload) { queue_publish_message((topic), payload, mqtt_retain_); } -void Mqtt::publish(const char * topic, const JsonObject & payload) { - publish_retain(topic, payload, mqtt_retain_); +void Mqtt::queue_publish(const char * topic, const JsonObject & payload) { + queue_publish_retain(topic, payload, mqtt_retain_); } // publish json doc, only if its not empty -void Mqtt::publish(const std::string & topic, const JsonObject & payload) { - publish_retain(topic, payload, mqtt_retain_); +void Mqtt::queue_publish(const std::string & topic, const JsonObject & payload) { + queue_publish_retain(topic, payload, mqtt_retain_); } // MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag -void Mqtt::publish_retain(const char * topic, const std::string & payload, bool retain) { +void Mqtt::queue_publish_retain(const char * topic, const std::string & payload, bool retain) { queue_publish_message((topic), payload, retain); } // publish json doc, only if its not empty, using the retain flag -void Mqtt::publish_retain(const std::string & topic, const JsonObject & payload, bool retain) { - publish_retain(topic.c_str(), payload, retain); +void Mqtt::queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain) { + queue_publish_retain(topic.c_str(), payload, retain); } -void Mqtt::publish_retain(const char * topic, const JsonObject & payload, bool retain) { +void Mqtt::queue_publish_retain(const char * topic, const JsonObject & payload, bool retain) { if (enabled() && payload.size()) { std::string payload_text; serializeJson(payload, payload_text); // convert json to string @@ -767,7 +763,7 @@ void Mqtt::publish_retain(const char * topic, const JsonObject & payload, bool r } // publish empty payload to remove the topic -void Mqtt::remove_topic(const char * topic) { +void Mqtt::queue_remove_topic(const char * topic) { if (!enabled()) { return; } @@ -781,8 +777,8 @@ void Mqtt::remove_topic(const char * topic) { } } -// publish a Home Assistant config topic and payload, with retain flag off. -void Mqtt::publish_ha(const char * topic, const JsonObject & payload) { +// queue a Home Assistant config topic and payload, with retain flag off. +void Mqtt::queue_ha(const char * topic, const JsonObject & payload) { if (!enabled()) { return; } @@ -863,6 +859,7 @@ void Mqtt::process_queue() { } // else try and publish it + // this is where the *real* publish happens uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); lasttopic_ = topic; lastpayload_ = message->payload; @@ -1074,7 +1071,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev // https://github.com/emsesp/EMS-ESP32/issues/196 if (remove) { LOG_DEBUG("Removing HA config for %s", uniq_id); - remove_topic(topic); + queue_remove_topic(topic); return; } @@ -1304,7 +1301,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev // add "availability" section add_avty_to_doc(stat_t, doc.as(), val_cond); - publish_ha(topic, doc.as()); + queue_ha(topic, doc.as()); } void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) { @@ -1328,7 +1325,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, snprintf(topic, sizeof(topic), "climate/%s/thermostat_hc%d/config", mqtt_basename_.c_str(), hc_num); if (remove) { - remove_topic(topic); // publish empty payload with retain flag + queue_remove_topic(topic); // publish empty payload with retain flag return; } @@ -1413,7 +1410,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, // add "availability" section add_avty_to_doc(topic_t, doc.as(), seltemp_cond, has_roomtemp ? currtemp_cond : nullptr, hc_mode_cond); - publish_ha(topic, doc.as()); // publish the config payload with retain flag + queue_ha(topic, doc.as()); // publish the config payload with retain flag } // based on the device and tag, create the MQTT topic name (without the basename) diff --git a/src/mqtt.h b/src/mqtt.h index 24c6d63c9..5f72faf7c 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -75,16 +75,16 @@ class Mqtt { static void subscribe(const std::string & topic); static void resubscribe(); - static void publish(const std::string & topic, const std::string & payload); - static void publish(const char * topic, const char * payload); - static void publish(const std::string & topic, const JsonObject & payload); - static void publish(const char * topic, const JsonObject & payload); - static void publish(const char * topic, const std::string & payload); - static void publish_retain(const std::string & topic, const JsonObject & payload, bool retain); - static void publish_retain(const char * topic, const std::string & payload, bool retain); - static void publish_retain(const char * topic, const JsonObject & payload, bool retain); - static void publish_ha(const char * topic, const JsonObject & payload); - static void remove_topic(const char * topic); + static void queue_publish(const std::string & topic, const std::string & payload); + static void queue_publish(const char * topic, const char * payload); + static void queue_publish(const std::string & topic, const JsonObject & payload); + static void queue_publish(const char * topic, const JsonObject & payload); + static void queue_publish(const char * topic, const std::string & payload); + static void queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain); + static void queue_publish_retain(const char * topic, const std::string & payload, bool retain); + static void queue_publish_retain(const char * topic, const JsonObject & payload, bool retain); + static void queue_ha(const char * topic, const JsonObject & payload); + static void queue_remove_topic(const char * topic); static void publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config = false); static void publish_ha_sensor_config(uint8_t type, @@ -263,8 +263,8 @@ class Mqtt { static AsyncMqttClient * mqttClient_; static uint32_t mqtt_message_id_; - static constexpr uint32_t MQTT_PUBLISH_WAIT = 75; // delay in ms between sending publishes, to account for large payloads - static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing + static constexpr uint32_t MQTT_PUBLISH_WAIT = 100; // delay in ms between sending publishes, to account for large payloads + static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing static void queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain); static void queue_publish_message(const std::string & topic, const std::string & payload, bool retain); diff --git a/src/shower.cpp b/src/shower.cpp index ecac7a3ed..d5ad3af12 100644 --- a/src/shower.cpp +++ b/src/shower.cpp @@ -82,7 +82,7 @@ void Shower::loop() { char s[50]; snprintf(s, 50, "%d minutes and %d seconds", (uint8_t)(duration_ / 60000), (uint8_t)((duration_ / 1000) % 60)); doc["duration"] = s; - Mqtt::publish("shower_data", doc.as()); + Mqtt::queue_publish("shower_data", doc.as()); LOG_DEBUG("[Shower] finished with duration %d", duration_); } } @@ -144,7 +144,7 @@ void Shower::set_shower_state(bool state, bool force) { // always publish as a string char s[12]; - Mqtt::publish("shower_active", Helpers::render_boolean(s, shower_state_)); // https://github.com/emsesp/EMS-ESP/issues/369 + Mqtt::queue_publish("shower_active", Helpers::render_boolean(s, shower_state_)); // https://github.com/emsesp/EMS-ESP/issues/369 // send out HA MQTT Discovery config topic if ((Mqtt::ha_enabled()) && (!ha_configdone_ || force)) { @@ -188,7 +188,8 @@ void Shower::set_shower_state(bool state, bool force) { char topic[Mqtt::MQTT_TOPIC_MAX_SIZE]; snprintf(topic, sizeof(topic), "binary_sensor/%s/shower_active/config", Mqtt::basename().c_str()); - Mqtt::publish_ha(topic, doc.as()); // publish the config payload with retain flag + + Mqtt::queue_ha(topic, doc.as()); // publish the config payload with retain flag } } diff --git a/src/system.cpp b/src/system.cpp index 7ee311762..fe706cb67 100644 --- a/src/system.cpp +++ b/src/system.cpp @@ -185,9 +185,9 @@ bool System::command_watch(const char * value, const int8_t id) { } if (Mqtt::publish_single() && w != EMSESP::watch()) { if (Mqtt::publish_single2cmd()) { - Mqtt::publish("system/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w])); + Mqtt::queue_publish("system/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w])); } else { - Mqtt::publish("system_data/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w])); + Mqtt::queue_publish("system_data/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w])); } } EMSESP::watch(w); @@ -195,9 +195,9 @@ bool System::command_watch(const char * value, const int8_t id) { } else if (i) { if (Mqtt::publish_single() && i != EMSESP::watch_id()) { if (Mqtt::publish_single2cmd()) { - Mqtt::publish("system/watch", Helpers::hextoa(i)); + Mqtt::queue_publish("system/watch", Helpers::hextoa(i)); } else { - Mqtt::publish("system_data/watch", Helpers::hextoa(i)); + Mqtt::queue_publish("system_data/watch", Helpers::hextoa(i)); } } EMSESP::watch_id(i); @@ -273,21 +273,21 @@ void System::syslog_init() { } if (Mqtt::publish_single()) { if (Mqtt::publish_single2cmd()) { - Mqtt::publish("system/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off"); + Mqtt::queue_publish("system/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off"); if (EMSESP::watch_id() == 0 || EMSESP::watch() == 0) { - Mqtt::publish("system/watch", - EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()])); + Mqtt::queue_publish("system/watch", + EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()])); } else { - Mqtt::publish("system/watch", Helpers::hextoa(EMSESP::watch_id())); + Mqtt::queue_publish("system/watch", Helpers::hextoa(EMSESP::watch_id())); } } else { - Mqtt::publish("system_data/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off"); + Mqtt::queue_publish("system_data/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off"); if (EMSESP::watch_id() == 0 || EMSESP::watch() == 0) { - Mqtt::publish("system_data/watch", - EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()])); + Mqtt::queue_publish("system_data/watch", + EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()])); } else { - Mqtt::publish("system_data/watch", Helpers::hextoa(EMSESP::watch_id())); + Mqtt::queue_publish("system_data/watch", Helpers::hextoa(EMSESP::watch_id())); } } } @@ -565,7 +565,7 @@ void System::send_info_mqtt(const char * event_str, bool send_ntp) { } } #endif - Mqtt::publish_retain(F_(info), doc.as(), true); // topic called "info" and it's Retained + Mqtt::queue_publish_retain(F_(info), doc.as(), true); // topic called "info" and it's Retained } // create the json for heartbeat @@ -638,7 +638,7 @@ void System::send_heartbeat() { JsonObject json = doc.to(); if (heartbeat_json(json)) { - Mqtt::publish(F_(heartbeat), json); // send to MQTT with retain off. This will add to MQTT queue. + Mqtt::queue_publish(F_(heartbeat), json); // send to MQTT with retain off. This will add to MQTT queue. } } diff --git a/src/test/test.cpp b/src/test/test.cpp index f559ac9dd..9c70310f7 100644 --- a/src/test/test.cpp +++ b/src/test/test.cpp @@ -1472,7 +1472,7 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & cmd, const shell.printfln("Size of JSON payload = %d", jo.memoryUsage()); shell.printfln("Length of JSON payload = %d", measureJson(jo)); - Mqtt::publish("test", jo); + Mqtt::queue_publish("test", jo); Mqtt::show_mqtt(shell); // show queue ok = true; } @@ -1523,7 +1523,7 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & cmd, const strlcpy(system_topic, "ems-esp/system", sizeof(system_topic)); // test publishing - EMSESP::mqtt_.publish(boiler_topic, "test me"); + EMSESP::mqtt_.queue_publish(boiler_topic, "test me"); // test receiving EMSESP::mqtt_.incoming(boiler_topic, ""); // test if ignore empty payloads, should return values