mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-06 15:59:52 +03:00
Mqtt: remove all HA if not active, timeout QoS, option single2cmd
This commit is contained in:
122
src/mqtt.cpp
122
src/mqtt.cpp
@@ -41,6 +41,7 @@ uint8_t Mqtt::nested_format_;
|
||||
std::string Mqtt::discovery_prefix_;
|
||||
bool Mqtt::send_response_;
|
||||
bool Mqtt::publish_single_;
|
||||
bool Mqtt::publish_single2cmd_;
|
||||
|
||||
std::deque<Mqtt::QueuedMqttMessage> Mqtt::mqtt_messages_;
|
||||
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
|
||||
@@ -264,6 +265,14 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
|
||||
LOG_DEBUG(F("Received topic `%s`"), topic);
|
||||
}
|
||||
#endif
|
||||
// remove HA topics if we don't use discovery
|
||||
if (strncmp(topic, discovery_prefix().c_str(), discovery_prefix().size()) == 0) {
|
||||
if (!ha_enabled_ && len) { // don't ping pong the empty message
|
||||
queue_publish_message(topic, "", true);
|
||||
LOG_DEBUG(F("Remove topic %s"), topic);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// check first againts any of our subscribed topics
|
||||
for (const auto & mf : mqtt_subfunctions_) {
|
||||
@@ -290,7 +299,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
|
||||
// convert payload into a json doc
|
||||
// if the payload doesn't not contain the key 'value' or 'data', treat the whole payload as the 'value'
|
||||
if (len != 0) {
|
||||
DeserializationError error = deserializeJson(input_doc, message);
|
||||
DeserializationError error = deserializeJson(input_doc, (const char *)message);
|
||||
if ((!input_doc.containsKey("value") && !input_doc.containsKey("data")) || error) {
|
||||
input_doc.clear();
|
||||
input_doc["value"] = (const char *)message; // always a string
|
||||
@@ -387,15 +396,16 @@ void Mqtt::reset_mqtt() {
|
||||
|
||||
void Mqtt::load_settings() {
|
||||
EMSESP::esp8266React.getMqttSettingsService()->read([&](MqttSettings & mqttSettings) {
|
||||
mqtt_base_ = mqttSettings.base.c_str(); // Convert String to std::string
|
||||
mqtt_qos_ = mqttSettings.mqtt_qos;
|
||||
mqtt_retain_ = mqttSettings.mqtt_retain;
|
||||
mqtt_enabled_ = mqttSettings.enabled;
|
||||
ha_enabled_ = mqttSettings.ha_enabled;
|
||||
nested_format_ = mqttSettings.nested_format;
|
||||
publish_single_ = mqttSettings.publish_single;
|
||||
send_response_ = mqttSettings.send_response;
|
||||
discovery_prefix_ = mqttSettings.discovery_prefix.c_str();
|
||||
mqtt_base_ = mqttSettings.base.c_str(); // Convert String to std::string
|
||||
mqtt_qos_ = mqttSettings.mqtt_qos;
|
||||
mqtt_retain_ = mqttSettings.mqtt_retain;
|
||||
mqtt_enabled_ = mqttSettings.enabled;
|
||||
ha_enabled_ = mqttSettings.ha_enabled;
|
||||
nested_format_ = mqttSettings.nested_format;
|
||||
publish_single_ = mqttSettings.publish_single;
|
||||
publish_single2cmd_ = mqttSettings.publish_single2cmd;
|
||||
send_response_ = mqttSettings.send_response;
|
||||
discovery_prefix_ = mqttSettings.discovery_prefix.c_str();
|
||||
|
||||
// convert to milliseconds
|
||||
publish_time_boiler_ = mqttSettings.publish_time_boiler * 1000;
|
||||
@@ -444,14 +454,6 @@ void Mqtt::start() {
|
||||
if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
|
||||
LOG_INFO(F("MQTT disconnected: Not authorized"));
|
||||
}
|
||||
// remove message with pending ack
|
||||
if (!mqtt_messages_.empty()) {
|
||||
auto mqtt_message = mqtt_messages_.front();
|
||||
if (mqtt_message.packet_id_ != 0) {
|
||||
mqtt_messages_.pop_front();
|
||||
}
|
||||
}
|
||||
// mqtt_messages_.clear();
|
||||
});
|
||||
|
||||
// create will_topic with the base prefixed. It has to be static because asyncmqttclient destroys the reference
|
||||
@@ -570,9 +572,23 @@ void Mqtt::on_connect() {
|
||||
#endif
|
||||
publish(F_(info), doc.as<JsonObject>()); // topic called "info"
|
||||
|
||||
// create the EMS-ESP device in HA, which is MQTT retained
|
||||
if (ha_enabled()) {
|
||||
ha_status();
|
||||
if (ha_enabled_) {
|
||||
queue_unsubscribe_message(discovery_prefix_ + "/climate/" + mqtt_base_ + "/#");
|
||||
queue_unsubscribe_message(discovery_prefix_ + "/sensor/" + mqtt_base_ + "/#");
|
||||
queue_unsubscribe_message(discovery_prefix_ + "/binary_sensor/" + mqtt_base_ + "/#");
|
||||
queue_unsubscribe_message(discovery_prefix_ + "/number/" + mqtt_base_ + "/#");
|
||||
queue_unsubscribe_message(discovery_prefix_ + "/select/" + mqtt_base_ + "/#");
|
||||
queue_unsubscribe_message(discovery_prefix_ + "/switch/" + mqtt_base_ + "/#");
|
||||
EMSESP::reset_mqtt_ha(); // re-create all HA devices if there are any
|
||||
ha_status(); // create the EMS-ESP device in HA, which is MQTT retained
|
||||
} else {
|
||||
queue_subscribe_message(discovery_prefix_ + "/climate/" + mqtt_base_ + "/#");
|
||||
queue_subscribe_message(discovery_prefix_ + "/sensor/" + mqtt_base_ + "/#");
|
||||
queue_subscribe_message(discovery_prefix_ + "/binary_sensor/" + mqtt_base_ + "/#");
|
||||
queue_subscribe_message(discovery_prefix_ + "/number/" + mqtt_base_ + "/#");
|
||||
queue_subscribe_message(discovery_prefix_ + "/select/" + mqtt_base_ + "/#");
|
||||
queue_subscribe_message(discovery_prefix_ + "/switch/" + mqtt_base_ + "/#");
|
||||
LOG_INFO(F("start removing topics %s/+/%s/#"), discovery_prefix_.c_str(), mqtt_base_.c_str());
|
||||
}
|
||||
|
||||
// send initial MQTT messages for some of our services
|
||||
@@ -582,8 +598,6 @@ void Mqtt::on_connect() {
|
||||
// re-subscribe to all custom registered MQTT topics
|
||||
resubscribe();
|
||||
|
||||
EMSESP::reset_mqtt_ha(); // re-create all HA devices if there are any
|
||||
|
||||
publish_retain(F("status"), "online", true); // say we're alive to the Last Will topic, with retain on
|
||||
|
||||
mqtt_publish_fails_ = 0; // reset fail count to 0
|
||||
@@ -669,6 +683,7 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation,
|
||||
if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES) {
|
||||
mqtt_messages_.pop_front();
|
||||
LOG_WARNING(F("Queue overflow, removing one message"));
|
||||
mqtt_publish_fails_++;
|
||||
}
|
||||
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
|
||||
|
||||
@@ -688,6 +703,11 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_subscribe_message(const std::stri
|
||||
return queue_message(Operation::SUBSCRIBE, topic, "", false); // no payload
|
||||
}
|
||||
|
||||
// add MQTT unsubscribe message to queue
|
||||
std::shared_ptr<const MqttMessage> Mqtt::queue_unsubscribe_message(const std::string & topic) {
|
||||
return queue_message(Operation::UNSUBSCRIBE, topic, "", false); // no payload
|
||||
}
|
||||
|
||||
// MQTT Publish, using a user's retain flag
|
||||
void Mqtt::publish(const std::string & topic, const std::string & payload) {
|
||||
queue_publish_message(topic, payload, mqtt_retain_);
|
||||
@@ -712,11 +732,6 @@ void Mqtt::publish(const std::string & topic, const JsonObject & payload) {
|
||||
publish_retain(topic, payload, mqtt_retain_);
|
||||
}
|
||||
|
||||
// no payload
|
||||
void Mqtt::publish(const std::string & topic) {
|
||||
queue_publish_message(topic, "", false);
|
||||
}
|
||||
|
||||
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
|
||||
void Mqtt::publish_retain(const __FlashStringHelper * topic, const std::string & payload, bool retain) {
|
||||
queue_publish_message(read_flash_string(topic), payload, retain);
|
||||
@@ -750,7 +765,7 @@ void Mqtt::publish_ha(const std::string & topic) {
|
||||
LOG_DEBUG(F("[DEBUG] Publishing empty HA topic=%s"), fulltopic.c_str());
|
||||
#endif
|
||||
|
||||
publish(fulltopic);
|
||||
queue_publish_message(fulltopic, "", true); // publish with retain to remove from broker
|
||||
}
|
||||
|
||||
// publish a Home Assistant config topic and payload, with retain flag off.
|
||||
@@ -792,12 +807,29 @@ void Mqtt::process_queue() {
|
||||
snprintf(topic, MQTT_TOPIC_MAX_SIZE, "%s/%s", mqtt_base_.c_str(), message->topic.c_str());
|
||||
}
|
||||
|
||||
// if this has already been published and we're waiting for an ACK, don't publish again
|
||||
// it will have a real packet ID
|
||||
if (mqtt_message.packet_id_ > 0) {
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] Waiting for QOS-ACK"));
|
||||
#endif
|
||||
// if we don't get the ack within 10 minutes, republish with new packet_id
|
||||
if (uuid::get_uptime_sec() - last_publish_queue_ < 600) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
last_publish_queue_ = uuid::get_uptime_sec();
|
||||
|
||||
// if we're subscribing...
|
||||
if (message->operation == Operation::SUBSCRIBE) {
|
||||
LOG_DEBUG(F("Subscribing to topic '%s'"), topic);
|
||||
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
|
||||
if (!packet_id) {
|
||||
if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) {
|
||||
return;
|
||||
}
|
||||
LOG_ERROR(F("Error subscribing to topic '%s'"), topic);
|
||||
mqtt_publish_fails_++; // increment failure counter
|
||||
}
|
||||
|
||||
mqtt_messages_.pop_front(); // remove the message from the queue
|
||||
@@ -805,12 +837,20 @@ void Mqtt::process_queue() {
|
||||
return;
|
||||
}
|
||||
|
||||
// if this has already been published and we're waiting for an ACK, don't publish again
|
||||
// it will have a real packet ID
|
||||
if (mqtt_message.packet_id_ > 0) {
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] Waiting for QOS-ACK"));
|
||||
#endif
|
||||
// if we're unsubscribing...
|
||||
if (message->operation == Operation::UNSUBSCRIBE) {
|
||||
LOG_DEBUG(F("Subscribing to topic '%s'"), topic);
|
||||
uint16_t packet_id = mqttClient_->unsubscribe(topic);
|
||||
if (!packet_id) {
|
||||
if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) {
|
||||
return;
|
||||
}
|
||||
LOG_ERROR(F("Error unsubscribing to topic '%s'"), topic);
|
||||
mqtt_publish_fails_++; // increment failure counter
|
||||
}
|
||||
|
||||
mqtt_messages_.pop_front(); // remove the message from the queue
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -992,8 +1032,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
return;
|
||||
}
|
||||
|
||||
bool have_tag = !EMSdevice::tag_to_string(tag).empty();
|
||||
bool is_nested = (nested_format_ == 1); // nested_format is 1 if nested, otherwise 2 for single topics
|
||||
bool have_tag = !EMSdevice::tag_to_string(tag).empty();
|
||||
|
||||
// build the payload
|
||||
DynamicJsonDocument doc(EMSESP_JSON_SIZE_HA_CONFIG);
|
||||
@@ -1069,7 +1108,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
// value template
|
||||
// if its nested mqtt format then use the appended entity name, otherwise take the original
|
||||
char val_tpl[75];
|
||||
if (is_nested) {
|
||||
if (is_nested()) {
|
||||
snprintf(val_tpl, sizeof(val_tpl), "{{value_json.%s}}", new_entity);
|
||||
} else {
|
||||
snprintf(val_tpl, sizeof(val_tpl), "{{value_json.%s}}", read_flash_string(entity).c_str());
|
||||
@@ -1193,10 +1232,11 @@ const std::string Mqtt::tag_to_topic(uint8_t device_type, uint8_t tag) {
|
||||
}
|
||||
|
||||
// if there is a tag add it
|
||||
if ((EMSdevice::tag_to_mqtt(tag).empty()) || ((nested_format_ == 1) && (device_type != EMSdevice::DeviceType::BOILER))) {
|
||||
return EMSdevice::device_type_2_device_name(device_type) + "_data";
|
||||
} else {
|
||||
if (!EMSdevice::tag_to_mqtt(tag).empty()
|
||||
&& ((device_type == EMSdevice::DeviceType::BOILER && tag == DeviceValueTAG::TAG_DEVICE_DATA_WW) || (!is_nested() && tag >= DeviceValueTAG::TAG_HC1))) {
|
||||
return EMSdevice::device_type_2_device_name(device_type) + "_data_" + EMSdevice::tag_to_mqtt(tag);
|
||||
} else {
|
||||
return EMSdevice::device_type_2_device_name(device_type) + "_data";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user