mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-07 00:09:51 +03:00
rename publish functions to queue_* because that is what they do
This commit is contained in:
@@ -385,7 +385,7 @@ void AnalogSensor::publish_sensor(const Sensor & sensor) const {
|
|||||||
snprintf(topic, sizeof(topic), "%s%s/%s", F_(analogsensor), "_data", sensor.name().c_str());
|
snprintf(topic, sizeof(topic), "%s%s/%s", F_(analogsensor), "_data", sensor.name().c_str());
|
||||||
}
|
}
|
||||||
char payload[10];
|
char payload[10];
|
||||||
Mqtt::publish(topic, Helpers::render_value(payload, sensor.value(), 2)); // always publish as doubles
|
Mqtt::queue_publish(topic, Helpers::render_value(payload, sensor.value(), 2)); // always publish as doubles
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -398,7 +398,7 @@ void AnalogSensor::remove_ha_topic(const uint8_t gpio) const {
|
|||||||
LOG_DEBUG("Removing HA config for analog sensor GPIO %02d", gpio);
|
LOG_DEBUG("Removing HA config for analog sensor GPIO %02d", gpio);
|
||||||
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
||||||
snprintf(topic, sizeof(topic), "sensor/%s/analogsensor_%02d/config", Mqtt::basename().c_str(), gpio);
|
snprintf(topic, sizeof(topic), "sensor/%s/analogsensor_%02d/config", Mqtt::basename().c_str(), gpio);
|
||||||
Mqtt::remove_topic(topic);
|
Mqtt::queue_remove_topic(topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
// send all sensor values as a JSON package to MQTT
|
// send all sensor values as a JSON package to MQTT
|
||||||
@@ -495,14 +495,14 @@ void AnalogSensor::publish_values(const bool force) {
|
|||||||
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
||||||
snprintf(topic, sizeof(topic), "sensor/%s/analogsensor_%02d/config", Mqtt::basename().c_str(), sensor.gpio());
|
snprintf(topic, sizeof(topic), "sensor/%s/analogsensor_%02d/config", Mqtt::basename().c_str(), sensor.gpio());
|
||||||
|
|
||||||
Mqtt::publish_ha(topic, config.as<JsonObject>());
|
Mqtt::queue_ha(topic, config.as<JsonObject>());
|
||||||
|
|
||||||
sensor.ha_registered = true;
|
sensor.ha_registered = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Mqtt::publish("analogsensor_data", doc.as<JsonObject>());
|
Mqtt::queue_publish("analogsensor_data", doc.as<JsonObject>());
|
||||||
}
|
}
|
||||||
|
|
||||||
// called from emsesp.cpp, similar to the emsdevice->get_value_info
|
// called from emsesp.cpp, similar to the emsdevice->get_value_info
|
||||||
|
|||||||
@@ -447,7 +447,7 @@ void DallasSensor::publish_sensor(const Sensor & sensor) {
|
|||||||
snprintf(topic, sizeof(topic), "%s%s/%s", (F_(dallassensor)), "_data", sensor.name().c_str());
|
snprintf(topic, sizeof(topic), "%s%s/%s", (F_(dallassensor)), "_data", sensor.name().c_str());
|
||||||
}
|
}
|
||||||
char payload[10];
|
char payload[10];
|
||||||
Mqtt::publish(topic, Helpers::render_value(payload, sensor.temperature_c, 10, EMSESP::system_.fahrenheit() ? 2 : 0));
|
Mqtt::queue_publish(topic, Helpers::render_value(payload, sensor.temperature_c, 10, EMSESP::system_.fahrenheit() ? 2 : 0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -462,7 +462,7 @@ void DallasSensor::remove_ha_topic(const std::string & id) {
|
|||||||
std::replace(sensorid.begin(), sensorid.end(), '-', '_');
|
std::replace(sensorid.begin(), sensorid.end(), '-', '_');
|
||||||
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
||||||
snprintf(topic, sizeof(topic), "sensor/%s/dallassensor_%s/config", Mqtt::basename().c_str(), sensorid.c_str());
|
snprintf(topic, sizeof(topic), "sensor/%s/dallassensor_%s/config", Mqtt::basename().c_str(), sensorid.c_str());
|
||||||
Mqtt::remove_topic(topic);
|
Mqtt::queue_remove_topic(topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
// send all dallas sensor values as a JSON package to MQTT
|
// send all dallas sensor values as a JSON package to MQTT
|
||||||
@@ -555,14 +555,14 @@ void DallasSensor::publish_values(const bool force) {
|
|||||||
|
|
||||||
snprintf(topic, sizeof(topic), "sensor/%s/dallassensor_%s/config", Mqtt::basename().c_str(), sensorid.c_str());
|
snprintf(topic, sizeof(topic), "sensor/%s/dallassensor_%s/config", Mqtt::basename().c_str(), sensorid.c_str());
|
||||||
|
|
||||||
Mqtt::publish_ha(topic, config.as<JsonObject>());
|
Mqtt::queue_ha(topic, config.as<JsonObject>());
|
||||||
|
|
||||||
sensor.ha_registered = true;
|
sensor.ha_registered = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Mqtt::publish("dallassensor_data", doc.as<JsonObject>());
|
Mqtt::queue_publish("dallassensor_data", doc.as<JsonObject>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -847,7 +847,7 @@ void Boiler::check_active(const bool force) {
|
|||||||
if (heatingActive_ != val || force) {
|
if (heatingActive_ != val || force) {
|
||||||
heatingActive_ = val;
|
heatingActive_ = val;
|
||||||
char s[12];
|
char s[12];
|
||||||
Mqtt::publish(F_(heating_active), Helpers::render_boolean(s, b));
|
Mqtt::queue_publish(F_(heating_active), Helpers::render_boolean(s, b));
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if we can use tapactivated in flow systems
|
// check if we can use tapactivated in flow systems
|
||||||
@@ -871,7 +871,7 @@ void Boiler::check_active(const bool force) {
|
|||||||
if (tapwaterActive_ != val || force) {
|
if (tapwaterActive_ != val || force) {
|
||||||
tapwaterActive_ = val;
|
tapwaterActive_ = val;
|
||||||
char s[12];
|
char s[12];
|
||||||
Mqtt::publish(F_(tapwater_active), Helpers::render_boolean(s, b));
|
Mqtt::queue_publish(F_(tapwater_active), Helpers::render_boolean(s, b));
|
||||||
EMSESP::tap_water_active(b); // let EMS-ESP know, used in the Shower class
|
EMSESP::tap_water_active(b); // let EMS-ESP know, used in the Shower class
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -770,7 +770,7 @@ void EMSdevice::publish_value(void * value_p) const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (payload[0] != '\0') {
|
if (payload[0] != '\0') {
|
||||||
Mqtt::publish(topic, payload);
|
Mqtt::queue_publish(topic, payload);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1639,6 +1639,7 @@ bool EMSdevice::generate_values(JsonObject & output, const uint8_t tag_filter, c
|
|||||||
json[name] = time_value;
|
json[name] = time_value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// commenting out - we don't want Commands in MQTT or Console
|
// commenting out - we don't want Commands in MQTT or Console
|
||||||
// else if (dv.type == DeviceValueType::CMD && output_target != EMSdevice::OUTPUT_TARGET::MQTT) {
|
// else if (dv.type == DeviceValueType::CMD && output_target != EMSdevice::OUTPUT_TARGET::MQTT) {
|
||||||
// json[name] = "";
|
// json[name] = "";
|
||||||
@@ -1670,7 +1671,6 @@ bool EMSdevice::generate_values(JsonObject & output, const uint8_t tag_filter, c
|
|||||||
|
|
||||||
// remove the Home Assistant configs for each device value/entity if its not visible or active or marked as read-only
|
// remove the Home Assistant configs for each device value/entity if its not visible or active or marked as read-only
|
||||||
// this is called when an MQTT publish is done via an EMS Device in emsesp.cpp::publish_device_values()
|
// this is called when an MQTT publish is done via an EMS Device in emsesp.cpp::publish_device_values()
|
||||||
// TODO remove topic remove on cold start
|
|
||||||
void EMSdevice::mqtt_ha_entity_config_remove() {
|
void EMSdevice::mqtt_ha_entity_config_remove() {
|
||||||
for (auto & dv : devicevalues_) {
|
for (auto & dv : devicevalues_) {
|
||||||
if (dv.has_state(DeviceValueState::DV_HA_CONFIG_CREATED)
|
if (dv.has_state(DeviceValueState::DV_HA_CONFIG_CREATED)
|
||||||
@@ -1681,6 +1681,7 @@ void EMSdevice::mqtt_ha_entity_config_remove() {
|
|||||||
if (dv.short_name == FL_(climate)[0]) {
|
if (dv.short_name == FL_(climate)[0]) {
|
||||||
Mqtt::publish_ha_climate_config(dv.tag, false, true); // delete topic (remove = true)
|
Mqtt::publish_ha_climate_config(dv.tag, false, true); // delete topic (remove = true)
|
||||||
} else {
|
} else {
|
||||||
|
// TODO check if we still need to remove topic on a cold start?
|
||||||
Mqtt::publish_ha_sensor_config(dv, "", "", true); // delete topic (remove = true)
|
Mqtt::publish_ha_sensor_config(dv, "", "", true); // delete topic (remove = true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1696,7 +1697,7 @@ void EMSdevice::mqtt_ha_entity_config_create() {
|
|||||||
// create climate if roomtemp is visible
|
// create climate if roomtemp is visible
|
||||||
// create the discovery topic if if hasn't already been created, not a command (like reset) and is active and visible
|
// create the discovery topic if if hasn't already been created, not a command (like reset) and is active and visible
|
||||||
for (auto & dv : devicevalues_) {
|
for (auto & dv : devicevalues_) {
|
||||||
// TODO removed
|
// TODO remove
|
||||||
// if (dv.has_state(DeviceValueState::DV_HA_CONFIG_RECREATE)) {
|
// if (dv.has_state(DeviceValueState::DV_HA_CONFIG_RECREATE)) {
|
||||||
// dv.remove_state(DeviceValueState::DV_HA_CONFIG_CREATED);
|
// dv.remove_state(DeviceValueState::DV_HA_CONFIG_CREATED);
|
||||||
// dv.remove_state(DeviceValueState::DV_HA_CONFIG_RECREATE);
|
// dv.remove_state(DeviceValueState::DV_HA_CONFIG_RECREATE);
|
||||||
@@ -1730,7 +1731,7 @@ void EMSdevice::mqtt_ha_entity_config_create() {
|
|||||||
// remove all config topics in HA
|
// remove all config topics in HA
|
||||||
void EMSdevice::ha_config_clear() {
|
void EMSdevice::ha_config_clear() {
|
||||||
for (auto & dv : devicevalues_) {
|
for (auto & dv : devicevalues_) {
|
||||||
// dv.add_state(DeviceValueState::DV_HA_CONFIG_RECREATE); // TODO removed
|
// dv.add_state(DeviceValueState::DV_HA_CONFIG_RECREATE); // TODO remove
|
||||||
if (ha_config_firstrun()) {
|
if (ha_config_firstrun()) {
|
||||||
dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); // make sure it is removed if not active
|
dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); // make sure it is removed if not active
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -482,7 +482,7 @@ void EMSESP::publish_all(bool force) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// on command "publish HA" loop and wait between devices for publishing all sensors
|
// loop and wait between devices for publishing all values
|
||||||
void EMSESP::publish_all_loop() {
|
void EMSESP::publish_all_loop() {
|
||||||
if (!Mqtt::connected() || !publish_all_idx_) {
|
if (!Mqtt::connected() || !publish_all_idx_) {
|
||||||
return;
|
return;
|
||||||
@@ -557,17 +557,17 @@ void EMSESP::publish_device_values(uint8_t device_type) {
|
|||||||
// we may have some RETAINED /config topics that reference fields in the data payloads that no longer exist
|
// we may have some RETAINED /config topics that reference fields in the data payloads that no longer exist
|
||||||
// remove them immediately to prevent HA from complaining
|
// remove them immediately to prevent HA from complaining
|
||||||
// we need to do this first before the data payload is published, and only done once!
|
// we need to do this first before the data payload is published, and only done once!
|
||||||
if (emsdevice->ha_config_firstrun()) {
|
// TODO remove
|
||||||
emsdevice->ha_config_clear();
|
// if (emsdevice->ha_config_firstrun()) {
|
||||||
emsdevice->ha_config_firstrun(false);
|
// emsdevice->ha_config_clear();
|
||||||
return;
|
// emsdevice->ha_config_firstrun(false);
|
||||||
} else {
|
// return;
|
||||||
|
// } else {
|
||||||
// see if we need to delete and /config topics before adding the payloads
|
// see if we need to delete and /config topics before adding the payloads
|
||||||
emsdevice->mqtt_ha_entity_config_remove();
|
emsdevice->mqtt_ha_entity_config_remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
for (uint8_t tag = DeviceValueTAG::TAG_BOILER_DATA_WW; tag <= DeviceValueTAG::TAG_HS16; tag++) {
|
for (uint8_t tag = DeviceValueTAG::TAG_BOILER_DATA_WW; tag <= DeviceValueTAG::TAG_HS16; tag++) {
|
||||||
JsonObject json_hc = json;
|
JsonObject json_hc = json;
|
||||||
bool nest_created = false;
|
bool nest_created = false;
|
||||||
@@ -581,7 +581,7 @@ void EMSESP::publish_device_values(uint8_t device_type) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (need_publish && ((!nested && tag >= DeviceValueTAG::TAG_DEVICE_DATA_WW) || (tag == DeviceValueTAG::TAG_BOILER_DATA_WW))) {
|
if (need_publish && ((!nested && tag >= DeviceValueTAG::TAG_DEVICE_DATA_WW) || (tag == DeviceValueTAG::TAG_BOILER_DATA_WW))) {
|
||||||
Mqtt::publish(Mqtt::tag_to_topic(device_type, tag), json);
|
Mqtt::queue_publish(Mqtt::tag_to_topic(device_type, tag), json);
|
||||||
json = doc.to<JsonObject>();
|
json = doc.to<JsonObject>();
|
||||||
need_publish = false;
|
need_publish = false;
|
||||||
}
|
}
|
||||||
@@ -590,7 +590,7 @@ void EMSESP::publish_device_values(uint8_t device_type) {
|
|||||||
if (doc.overflowed()) {
|
if (doc.overflowed()) {
|
||||||
LOG_WARNING("MQTT buffer overflow, please use individual topics");
|
LOG_WARNING("MQTT buffer overflow, please use individual topics");
|
||||||
}
|
}
|
||||||
Mqtt::publish(Mqtt::tag_to_topic(device_type, DeviceValueTAG::TAG_NONE), json);
|
Mqtt::queue_publish(Mqtt::tag_to_topic(device_type, DeviceValueTAG::TAG_NONE), json);
|
||||||
}
|
}
|
||||||
|
|
||||||
// we want to create the /config topic after the data payload to prevent HA from throwing up a warning
|
// we want to create the /config topic after the data payload to prevent HA from throwing up a warning
|
||||||
@@ -598,6 +598,7 @@ void EMSESP::publish_device_values(uint8_t device_type) {
|
|||||||
for (const auto & emsdevice : emsdevices) {
|
for (const auto & emsdevice : emsdevices) {
|
||||||
if (emsdevice && (emsdevice->device_type() == device_type)) {
|
if (emsdevice && (emsdevice->device_type() == device_type)) {
|
||||||
emsdevice->mqtt_ha_entity_config_create();
|
emsdevice->mqtt_ha_entity_config_create();
|
||||||
|
// EMSESP::mqtt_.loop(); // TODO experimental
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -651,7 +652,7 @@ void EMSESP::publish_response(std::shared_ptr<const Telegram> telegram) {
|
|||||||
doc["value"] = value;
|
doc["value"] = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
Mqtt::publish("response", doc.as<JsonObject>());
|
Mqtt::queue_publish("response", doc.as<JsonObject>());
|
||||||
}
|
}
|
||||||
|
|
||||||
// builds json with the detail of each value, for a specific EMS device type or the dallas sensor
|
// builds json with the detail of each value, for a specific EMS device type or the dallas sensor
|
||||||
|
|||||||
73
src/mqtt.cpp
73
src/mqtt.cpp
@@ -301,7 +301,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) cons
|
|||||||
if (!(mf.mqtt_subfunction_)(message)) {
|
if (!(mf.mqtt_subfunction_)(message)) {
|
||||||
LOG_ERROR("error: invalid payload %s for this topic %s", message, topic);
|
LOG_ERROR("error: invalid payload %s for this topic %s", message, topic);
|
||||||
if (send_response_) {
|
if (send_response_) {
|
||||||
Mqtt::publish("response", "error: invalid data");
|
Mqtt::queue_publish("response", "error: invalid data");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@@ -336,12 +336,12 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) cons
|
|||||||
}
|
}
|
||||||
LOG_ERROR(error);
|
LOG_ERROR(error);
|
||||||
if (send_response_) {
|
if (send_response_) {
|
||||||
Mqtt::publish("response", error);
|
Mqtt::queue_publish("response", error);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// all good, send back json output from call
|
// all good, send back json output from call
|
||||||
if (send_response_) {
|
if (send_response_) {
|
||||||
Mqtt::publish("response", output);
|
Mqtt::queue_publish("response", output);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -589,7 +589,7 @@ void Mqtt::on_connect() {
|
|||||||
resubscribe();
|
resubscribe();
|
||||||
|
|
||||||
// publish to the last will topic (see Mqtt::start() function) to say we're alive
|
// publish to the last will topic (see Mqtt::start() function) to say we're alive
|
||||||
publish_retain("status", "online", true); // with retain on
|
queue_publish_retain("status", "online", true); // with retain on
|
||||||
|
|
||||||
mqtt_publish_fails_ = 0; // reset fail count to 0
|
mqtt_publish_fails_ = 0; // reset fail count to 0
|
||||||
|
|
||||||
@@ -643,7 +643,7 @@ void Mqtt::ha_status() {
|
|||||||
|
|
||||||
char topic[MQTT_TOPIC_MAX_SIZE];
|
char topic[MQTT_TOPIC_MAX_SIZE];
|
||||||
snprintf(topic, sizeof(topic), "binary_sensor/%s/system_status/config", mqtt_basename_.c_str());
|
snprintf(topic, sizeof(topic), "binary_sensor/%s/system_status/config", mqtt_basename_.c_str());
|
||||||
Mqtt::publish_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
|
Mqtt::queue_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
|
||||||
|
|
||||||
// create the sensors - must match the MQTT payload keys
|
// create the sensors - must match the MQTT payload keys
|
||||||
// these are all from the heartbeat MQTT topic
|
// these are all from the heartbeat MQTT topic
|
||||||
@@ -673,6 +673,17 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifndef EMSESP_STANDALONE
|
||||||
|
// anything below 65MB available free heap is dangerously low, so we stop adding to prevent a crash
|
||||||
|
// instead of doing a mqtt_messages_.pop_front()
|
||||||
|
auto free_heap = ESP.getFreeHeap() / 1024;
|
||||||
|
if (free_heap < 65) {
|
||||||
|
LOG_WARNING("Queue overflow (size %d, heap=%d)", mqtt_messages_.size(), free_heap);
|
||||||
|
mqtt_publish_fails_++;
|
||||||
|
return; // don't add to top of queue
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// take the topic and prefix the base, unless its for HA
|
// take the topic and prefix the base, unless its for HA
|
||||||
std::shared_ptr<MqttMessage> message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
|
std::shared_ptr<MqttMessage> message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
|
||||||
|
|
||||||
@@ -686,21 +697,6 @@ void Mqtt::queue_message(const uint8_t operation, const std::string & topic, con
|
|||||||
LOG_DEBUG("Adding to queue: (subscribe) topic='%s'", message->topic.c_str());
|
LOG_DEBUG("Adding to queue: (subscribe) topic='%s'", message->topic.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifndef EMSESP_STANDALONE
|
|
||||||
// TODO to look at with @MichaelDvP ...
|
|
||||||
// TODO also reduce the time to process the queue so it empties quicker? I changed MQTT_PUBLISH_WAIT from 100 to 75
|
|
||||||
// TODO or call process_queue() to process the front of queue immediately?
|
|
||||||
// TODO because it takes 10 seconds (default publish interval) before the queue gets published
|
|
||||||
// TODO and does returning with mqtt_messages_.pop_front() have any negative side affects?
|
|
||||||
|
|
||||||
// anything below 65MB available free heap is dangerously low
|
|
||||||
if (ESP.getFreeHeap() < (65 * 1024)) {
|
|
||||||
LOG_WARNING("Queue overflow (size %d)", mqtt_messages_.size());
|
|
||||||
mqtt_publish_fails_++;
|
|
||||||
return; // don't add to top of queue
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
|
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@@ -725,40 +721,40 @@ void Mqtt::queue_unsubscribe_message(const std::string & topic) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MQTT Publish, using a user's retain flag
|
// MQTT Publish, using a user's retain flag
|
||||||
void Mqtt::publish(const std::string & topic, const std::string & payload) {
|
void Mqtt::queue_publish(const std::string & topic, const std::string & payload) {
|
||||||
queue_publish_message(topic, payload, mqtt_retain_);
|
queue_publish_message(topic, payload, mqtt_retain_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// MQTT Publish, using a user's retain flag - except for char * strings
|
// MQTT Publish, using a user's retain flag - except for char * strings
|
||||||
void Mqtt::publish(const char * topic, const char * payload) {
|
void Mqtt::queue_publish(const char * topic, const char * payload) {
|
||||||
queue_publish_message((topic), payload, mqtt_retain_);
|
queue_publish_message((topic), payload, mqtt_retain_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// MQTT Publish, using a specific retain flag, topic is a flash string
|
// MQTT Publish, using a specific retain flag, topic is a flash string
|
||||||
void Mqtt::publish(const char * topic, const std::string & payload) {
|
void Mqtt::queue_publish(const char * topic, const std::string & payload) {
|
||||||
queue_publish_message((topic), payload, mqtt_retain_);
|
queue_publish_message((topic), payload, mqtt_retain_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Mqtt::publish(const char * topic, const JsonObject & payload) {
|
void Mqtt::queue_publish(const char * topic, const JsonObject & payload) {
|
||||||
publish_retain(topic, payload, mqtt_retain_);
|
queue_publish_retain(topic, payload, mqtt_retain_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// publish json doc, only if its not empty
|
// publish json doc, only if its not empty
|
||||||
void Mqtt::publish(const std::string & topic, const JsonObject & payload) {
|
void Mqtt::queue_publish(const std::string & topic, const JsonObject & payload) {
|
||||||
publish_retain(topic, payload, mqtt_retain_);
|
queue_publish_retain(topic, payload, mqtt_retain_);
|
||||||
}
|
}
|
||||||
|
|
||||||
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
|
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
|
||||||
void Mqtt::publish_retain(const char * topic, const std::string & payload, bool retain) {
|
void Mqtt::queue_publish_retain(const char * topic, const std::string & payload, bool retain) {
|
||||||
queue_publish_message((topic), payload, retain);
|
queue_publish_message((topic), payload, retain);
|
||||||
}
|
}
|
||||||
|
|
||||||
// publish json doc, only if its not empty, using the retain flag
|
// publish json doc, only if its not empty, using the retain flag
|
||||||
void Mqtt::publish_retain(const std::string & topic, const JsonObject & payload, bool retain) {
|
void Mqtt::queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain) {
|
||||||
publish_retain(topic.c_str(), payload, retain);
|
queue_publish_retain(topic.c_str(), payload, retain);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Mqtt::publish_retain(const char * topic, const JsonObject & payload, bool retain) {
|
void Mqtt::queue_publish_retain(const char * topic, const JsonObject & payload, bool retain) {
|
||||||
if (enabled() && payload.size()) {
|
if (enabled() && payload.size()) {
|
||||||
std::string payload_text;
|
std::string payload_text;
|
||||||
serializeJson(payload, payload_text); // convert json to string
|
serializeJson(payload, payload_text); // convert json to string
|
||||||
@@ -767,7 +763,7 @@ void Mqtt::publish_retain(const char * topic, const JsonObject & payload, bool r
|
|||||||
}
|
}
|
||||||
|
|
||||||
// publish empty payload to remove the topic
|
// publish empty payload to remove the topic
|
||||||
void Mqtt::remove_topic(const char * topic) {
|
void Mqtt::queue_remove_topic(const char * topic) {
|
||||||
if (!enabled()) {
|
if (!enabled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -781,8 +777,8 @@ void Mqtt::remove_topic(const char * topic) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// publish a Home Assistant config topic and payload, with retain flag off.
|
// queue a Home Assistant config topic and payload, with retain flag off.
|
||||||
void Mqtt::publish_ha(const char * topic, const JsonObject & payload) {
|
void Mqtt::queue_ha(const char * topic, const JsonObject & payload) {
|
||||||
if (!enabled()) {
|
if (!enabled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -863,6 +859,7 @@ void Mqtt::process_queue() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// else try and publish it
|
// else try and publish it
|
||||||
|
// this is where the *real* publish happens
|
||||||
uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
|
uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
|
||||||
lasttopic_ = topic;
|
lasttopic_ = topic;
|
||||||
lastpayload_ = message->payload;
|
lastpayload_ = message->payload;
|
||||||
@@ -1074,7 +1071,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
|
|||||||
// https://github.com/emsesp/EMS-ESP32/issues/196
|
// https://github.com/emsesp/EMS-ESP32/issues/196
|
||||||
if (remove) {
|
if (remove) {
|
||||||
LOG_DEBUG("Removing HA config for %s", uniq_id);
|
LOG_DEBUG("Removing HA config for %s", uniq_id);
|
||||||
remove_topic(topic);
|
queue_remove_topic(topic);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1304,7 +1301,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
|
|||||||
// add "availability" section
|
// add "availability" section
|
||||||
add_avty_to_doc(stat_t, doc.as<JsonObject>(), val_cond);
|
add_avty_to_doc(stat_t, doc.as<JsonObject>(), val_cond);
|
||||||
|
|
||||||
publish_ha(topic, doc.as<JsonObject>());
|
queue_ha(topic, doc.as<JsonObject>());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) {
|
void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) {
|
||||||
@@ -1328,7 +1325,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp,
|
|||||||
|
|
||||||
snprintf(topic, sizeof(topic), "climate/%s/thermostat_hc%d/config", mqtt_basename_.c_str(), hc_num);
|
snprintf(topic, sizeof(topic), "climate/%s/thermostat_hc%d/config", mqtt_basename_.c_str(), hc_num);
|
||||||
if (remove) {
|
if (remove) {
|
||||||
remove_topic(topic); // publish empty payload with retain flag
|
queue_remove_topic(topic); // publish empty payload with retain flag
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1413,7 +1410,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp,
|
|||||||
// add "availability" section
|
// add "availability" section
|
||||||
add_avty_to_doc(topic_t, doc.as<JsonObject>(), seltemp_cond, has_roomtemp ? currtemp_cond : nullptr, hc_mode_cond);
|
add_avty_to_doc(topic_t, doc.as<JsonObject>(), seltemp_cond, has_roomtemp ? currtemp_cond : nullptr, hc_mode_cond);
|
||||||
|
|
||||||
publish_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
|
queue_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
|
||||||
}
|
}
|
||||||
|
|
||||||
// based on the device and tag, create the MQTT topic name (without the basename)
|
// based on the device and tag, create the MQTT topic name (without the basename)
|
||||||
|
|||||||
22
src/mqtt.h
22
src/mqtt.h
@@ -75,16 +75,16 @@ class Mqtt {
|
|||||||
static void subscribe(const std::string & topic);
|
static void subscribe(const std::string & topic);
|
||||||
static void resubscribe();
|
static void resubscribe();
|
||||||
|
|
||||||
static void publish(const std::string & topic, const std::string & payload);
|
static void queue_publish(const std::string & topic, const std::string & payload);
|
||||||
static void publish(const char * topic, const char * payload);
|
static void queue_publish(const char * topic, const char * payload);
|
||||||
static void publish(const std::string & topic, const JsonObject & payload);
|
static void queue_publish(const std::string & topic, const JsonObject & payload);
|
||||||
static void publish(const char * topic, const JsonObject & payload);
|
static void queue_publish(const char * topic, const JsonObject & payload);
|
||||||
static void publish(const char * topic, const std::string & payload);
|
static void queue_publish(const char * topic, const std::string & payload);
|
||||||
static void publish_retain(const std::string & topic, const JsonObject & payload, bool retain);
|
static void queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain);
|
||||||
static void publish_retain(const char * topic, const std::string & payload, bool retain);
|
static void queue_publish_retain(const char * topic, const std::string & payload, bool retain);
|
||||||
static void publish_retain(const char * topic, const JsonObject & payload, bool retain);
|
static void queue_publish_retain(const char * topic, const JsonObject & payload, bool retain);
|
||||||
static void publish_ha(const char * topic, const JsonObject & payload);
|
static void queue_ha(const char * topic, const JsonObject & payload);
|
||||||
static void remove_topic(const char * topic);
|
static void queue_remove_topic(const char * topic);
|
||||||
|
|
||||||
static void publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config = false);
|
static void publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config = false);
|
||||||
static void publish_ha_sensor_config(uint8_t type,
|
static void publish_ha_sensor_config(uint8_t type,
|
||||||
@@ -263,7 +263,7 @@ class Mqtt {
|
|||||||
static AsyncMqttClient * mqttClient_;
|
static AsyncMqttClient * mqttClient_;
|
||||||
static uint32_t mqtt_message_id_;
|
static uint32_t mqtt_message_id_;
|
||||||
|
|
||||||
static constexpr uint32_t MQTT_PUBLISH_WAIT = 75; // delay in ms between sending publishes, to account for large payloads
|
static constexpr uint32_t MQTT_PUBLISH_WAIT = 100; // delay in ms between sending publishes, to account for large payloads
|
||||||
static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing
|
static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing
|
||||||
|
|
||||||
static void queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain);
|
static void queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain);
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ void Shower::loop() {
|
|||||||
char s[50];
|
char s[50];
|
||||||
snprintf(s, 50, "%d minutes and %d seconds", (uint8_t)(duration_ / 60000), (uint8_t)((duration_ / 1000) % 60));
|
snprintf(s, 50, "%d minutes and %d seconds", (uint8_t)(duration_ / 60000), (uint8_t)((duration_ / 1000) % 60));
|
||||||
doc["duration"] = s;
|
doc["duration"] = s;
|
||||||
Mqtt::publish("shower_data", doc.as<JsonObject>());
|
Mqtt::queue_publish("shower_data", doc.as<JsonObject>());
|
||||||
LOG_DEBUG("[Shower] finished with duration %d", duration_);
|
LOG_DEBUG("[Shower] finished with duration %d", duration_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -144,7 +144,7 @@ void Shower::set_shower_state(bool state, bool force) {
|
|||||||
|
|
||||||
// always publish as a string
|
// always publish as a string
|
||||||
char s[12];
|
char s[12];
|
||||||
Mqtt::publish("shower_active", Helpers::render_boolean(s, shower_state_)); // https://github.com/emsesp/EMS-ESP/issues/369
|
Mqtt::queue_publish("shower_active", Helpers::render_boolean(s, shower_state_)); // https://github.com/emsesp/EMS-ESP/issues/369
|
||||||
|
|
||||||
// send out HA MQTT Discovery config topic
|
// send out HA MQTT Discovery config topic
|
||||||
if ((Mqtt::ha_enabled()) && (!ha_configdone_ || force)) {
|
if ((Mqtt::ha_enabled()) && (!ha_configdone_ || force)) {
|
||||||
@@ -188,7 +188,8 @@ void Shower::set_shower_state(bool state, bool force) {
|
|||||||
|
|
||||||
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
|
||||||
snprintf(topic, sizeof(topic), "binary_sensor/%s/shower_active/config", Mqtt::basename().c_str());
|
snprintf(topic, sizeof(topic), "binary_sensor/%s/shower_active/config", Mqtt::basename().c_str());
|
||||||
Mqtt::publish_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
|
|
||||||
|
Mqtt::queue_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -185,9 +185,9 @@ bool System::command_watch(const char * value, const int8_t id) {
|
|||||||
}
|
}
|
||||||
if (Mqtt::publish_single() && w != EMSESP::watch()) {
|
if (Mqtt::publish_single() && w != EMSESP::watch()) {
|
||||||
if (Mqtt::publish_single2cmd()) {
|
if (Mqtt::publish_single2cmd()) {
|
||||||
Mqtt::publish("system/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w]));
|
Mqtt::queue_publish("system/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w]));
|
||||||
} else {
|
} else {
|
||||||
Mqtt::publish("system_data/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w]));
|
Mqtt::queue_publish("system_data/watch", EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(w) : (FL_(list_watch)[w]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EMSESP::watch(w);
|
EMSESP::watch(w);
|
||||||
@@ -195,9 +195,9 @@ bool System::command_watch(const char * value, const int8_t id) {
|
|||||||
} else if (i) {
|
} else if (i) {
|
||||||
if (Mqtt::publish_single() && i != EMSESP::watch_id()) {
|
if (Mqtt::publish_single() && i != EMSESP::watch_id()) {
|
||||||
if (Mqtt::publish_single2cmd()) {
|
if (Mqtt::publish_single2cmd()) {
|
||||||
Mqtt::publish("system/watch", Helpers::hextoa(i));
|
Mqtt::queue_publish("system/watch", Helpers::hextoa(i));
|
||||||
} else {
|
} else {
|
||||||
Mqtt::publish("system_data/watch", Helpers::hextoa(i));
|
Mqtt::queue_publish("system_data/watch", Helpers::hextoa(i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EMSESP::watch_id(i);
|
EMSESP::watch_id(i);
|
||||||
@@ -273,21 +273,21 @@ void System::syslog_init() {
|
|||||||
}
|
}
|
||||||
if (Mqtt::publish_single()) {
|
if (Mqtt::publish_single()) {
|
||||||
if (Mqtt::publish_single2cmd()) {
|
if (Mqtt::publish_single2cmd()) {
|
||||||
Mqtt::publish("system/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off");
|
Mqtt::queue_publish("system/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off");
|
||||||
if (EMSESP::watch_id() == 0 || EMSESP::watch() == 0) {
|
if (EMSESP::watch_id() == 0 || EMSESP::watch() == 0) {
|
||||||
Mqtt::publish("system/watch",
|
Mqtt::queue_publish("system/watch",
|
||||||
EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()]));
|
EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()]));
|
||||||
} else {
|
} else {
|
||||||
Mqtt::publish("system/watch", Helpers::hextoa(EMSESP::watch_id()));
|
Mqtt::queue_publish("system/watch", Helpers::hextoa(EMSESP::watch_id()));
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
Mqtt::publish("system_data/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off");
|
Mqtt::queue_publish("system_data/syslog", syslog_enabled_ ? (FL_(list_syslog_level)[syslog_level_ + 1]) : "off");
|
||||||
if (EMSESP::watch_id() == 0 || EMSESP::watch() == 0) {
|
if (EMSESP::watch_id() == 0 || EMSESP::watch() == 0) {
|
||||||
Mqtt::publish("system_data/watch",
|
Mqtt::queue_publish("system_data/watch",
|
||||||
EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()]));
|
EMSESP::system_.enum_format() == ENUM_FORMAT_INDEX ? Helpers::itoa(EMSESP::watch()) : (FL_(list_watch)[EMSESP::watch()]));
|
||||||
} else {
|
} else {
|
||||||
Mqtt::publish("system_data/watch", Helpers::hextoa(EMSESP::watch_id()));
|
Mqtt::queue_publish("system_data/watch", Helpers::hextoa(EMSESP::watch_id()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -565,7 +565,7 @@ void System::send_info_mqtt(const char * event_str, bool send_ntp) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
Mqtt::publish_retain(F_(info), doc.as<JsonObject>(), true); // topic called "info" and it's Retained
|
Mqtt::queue_publish_retain(F_(info), doc.as<JsonObject>(), true); // topic called "info" and it's Retained
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the json for heartbeat
|
// create the json for heartbeat
|
||||||
@@ -638,7 +638,7 @@ void System::send_heartbeat() {
|
|||||||
JsonObject json = doc.to<JsonObject>();
|
JsonObject json = doc.to<JsonObject>();
|
||||||
|
|
||||||
if (heartbeat_json(json)) {
|
if (heartbeat_json(json)) {
|
||||||
Mqtt::publish(F_(heartbeat), json); // send to MQTT with retain off. This will add to MQTT queue.
|
Mqtt::queue_publish(F_(heartbeat), json); // send to MQTT with retain off. This will add to MQTT queue.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1472,7 +1472,7 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & cmd, const
|
|||||||
shell.printfln("Size of JSON payload = %d", jo.memoryUsage());
|
shell.printfln("Size of JSON payload = %d", jo.memoryUsage());
|
||||||
shell.printfln("Length of JSON payload = %d", measureJson(jo));
|
shell.printfln("Length of JSON payload = %d", measureJson(jo));
|
||||||
|
|
||||||
Mqtt::publish("test", jo);
|
Mqtt::queue_publish("test", jo);
|
||||||
Mqtt::show_mqtt(shell); // show queue
|
Mqtt::show_mqtt(shell); // show queue
|
||||||
ok = true;
|
ok = true;
|
||||||
}
|
}
|
||||||
@@ -1523,7 +1523,7 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & cmd, const
|
|||||||
strlcpy(system_topic, "ems-esp/system", sizeof(system_topic));
|
strlcpy(system_topic, "ems-esp/system", sizeof(system_topic));
|
||||||
|
|
||||||
// test publishing
|
// test publishing
|
||||||
EMSESP::mqtt_.publish(boiler_topic, "test me");
|
EMSESP::mqtt_.queue_publish(boiler_topic, "test me");
|
||||||
|
|
||||||
// test receiving
|
// test receiving
|
||||||
EMSESP::mqtt_.incoming(boiler_topic, ""); // test if ignore empty payloads, should return values
|
EMSESP::mqtt_.incoming(boiler_topic, ""); // test if ignore empty payloads, should return values
|
||||||
|
|||||||
Reference in New Issue
Block a user