mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-06 15:59:52 +03:00
Merge remote-tracking branch 'origin/flash_optimization' into dev #646
This commit is contained in:
247
src/mqtt.cpp
247
src/mqtt.cpp
@@ -186,26 +186,26 @@ void Mqtt::loop() {
|
||||
|
||||
// print MQTT log and other stuff to console
|
||||
void Mqtt::show_mqtt(uuid::console::Shell & shell) {
|
||||
shell.printfln(F("MQTT is %s"), connected() ? read_flash_string(F_(connected)).c_str() : read_flash_string(F_(disconnected)).c_str());
|
||||
shell.printfln("MQTT is %s", connected() ? F_(connected) : F_(disconnected));
|
||||
|
||||
shell.printfln(F("MQTT publish errors: %lu"), mqtt_publish_fails_);
|
||||
shell.printfln("MQTT publish errors: %lu", mqtt_publish_fails_);
|
||||
shell.println();
|
||||
|
||||
// show subscriptions
|
||||
shell.printfln(F("MQTT topic subscriptions:"));
|
||||
shell.printfln("MQTT topic subscriptions:");
|
||||
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
|
||||
shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
|
||||
shell.printfln(" %s/%s", mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
|
||||
}
|
||||
shell.println();
|
||||
|
||||
// show queues
|
||||
if (mqtt_messages_.empty()) {
|
||||
shell.printfln(F("MQTT queue is empty"));
|
||||
shell.printfln("MQTT queue is empty");
|
||||
shell.println();
|
||||
return;
|
||||
}
|
||||
|
||||
shell.printfln(F("MQTT queue (%d/%d messages):"), mqtt_messages_.size(), MAX_MQTT_MESSAGES);
|
||||
shell.printfln(("MQTT queue (%d/%d messages):"), mqtt_messages_.size(), MAX_MQTT_MESSAGES);
|
||||
|
||||
for (const auto & message : mqtt_messages_) {
|
||||
auto content = message.content_;
|
||||
@@ -222,12 +222,12 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
|
||||
// Publish messages
|
||||
if (message.retry_count_ == 0) {
|
||||
if (message.packet_id_ == 0) {
|
||||
shell.printfln(F(" [%02d] (Pub) topic=%s payload=%s"), message.id_, topic, content->payload.c_str());
|
||||
shell.printfln((" [%02d] (Pub) topic=%s payload=%s"), message.id_, topic, content->payload.c_str());
|
||||
} else {
|
||||
shell.printfln(F(" [%02d] (Pub) topic=%s payload=%s (pid %d)"), message.id_, topic, content->payload.c_str(), message.packet_id_);
|
||||
shell.printfln((" [%02d] (Pub) topic=%s payload=%s (pid %d)"), message.id_, topic, content->payload.c_str(), message.packet_id_);
|
||||
}
|
||||
} else {
|
||||
shell.printfln(F(" [%02d] (Pub) topic=%s payload=%s (pid %d, retry #%d)"),
|
||||
shell.printfln((" [%02d] (Pub) topic=%s payload=%s (pid %d, retry #%d)"),
|
||||
message.id_,
|
||||
topic,
|
||||
content->payload.c_str(),
|
||||
@@ -236,7 +236,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
|
||||
}
|
||||
} else {
|
||||
// Subscribe messages
|
||||
shell.printfln(F(" [%02d] (Sub) topic=%s"), message.id_, topic);
|
||||
shell.printfln((" [%02d] (Sub) topic=%s"), message.id_, topic);
|
||||
}
|
||||
}
|
||||
shell.println();
|
||||
@@ -264,16 +264,16 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) cons
|
||||
|
||||
#if defined(EMSESP_DEBUG)
|
||||
if (len) {
|
||||
LOG_DEBUG(F("Received topic `%s` => payload `%s` (length %d)"), topic, message, len);
|
||||
LOG_DEBUG(("Received topic `%s` => payload `%s` (length %d)"), topic, message, len);
|
||||
} else {
|
||||
LOG_DEBUG(F("Received topic `%s`"), topic);
|
||||
LOG_DEBUG("Received topic `%s`", topic);
|
||||
}
|
||||
#endif
|
||||
// remove HA topics if we don't use discovery
|
||||
if (strncmp(topic, discovery_prefix().c_str(), discovery_prefix().size()) == 0) {
|
||||
if (!ha_enabled_ && len) { // don't ping pong the empty message
|
||||
queue_publish_message(topic, "", true);
|
||||
LOG_DEBUG(F("Remove topic %s"), topic);
|
||||
LOG_DEBUG("Remove topic %s", topic);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -285,7 +285,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) cons
|
||||
snprintf(full_topic, sizeof(full_topic), "%s/%s", mqtt_base_.c_str(), mf.topic_.c_str());
|
||||
if ((!strcmp(topic, full_topic)) && (mf.mqtt_subfunction_)) {
|
||||
if (!(mf.mqtt_subfunction_)(message)) {
|
||||
LOG_ERROR(F("error: invalid payload %s for this topic %s"), message, topic);
|
||||
LOG_ERROR("error: invalid payload %s for this topic %s", message, topic);
|
||||
if (send_response_) {
|
||||
Mqtt::publish(F_(response), "error: invalid data");
|
||||
}
|
||||
@@ -341,10 +341,10 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
|
||||
return;
|
||||
}
|
||||
|
||||
// shell.print(F(" Subscribed MQTT topics: "));
|
||||
// shell.print(" Subscribed MQTT topics: ");
|
||||
// for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
|
||||
// if (mqtt_subfunction.device_type_ == device_type) {
|
||||
// shell.printf(F("%s "), mqtt_subfunction.topic_.c_str());
|
||||
// shell.printf("%s ", mqtt_subfunction.topic_.c_str());
|
||||
// }
|
||||
// }
|
||||
shell.println();
|
||||
@@ -358,7 +358,7 @@ void Mqtt::on_publish(uint16_t packetId) const {
|
||||
// find the MQTT message in the queue and remove it
|
||||
if (mqtt_messages_.empty()) {
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] No message stored for ACK pid %d"), packetId);
|
||||
LOG_DEBUG("[DEBUG] No message stored for ACK pid %d", packetId);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
@@ -368,18 +368,18 @@ void Mqtt::on_publish(uint16_t packetId) const {
|
||||
// if the last published failed, don't bother checking it. wait for the next retry
|
||||
if (mqtt_message.packet_id_ == 0) {
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] ACK for failed message pid 0"));
|
||||
LOG_DEBUG("[DEBUG] ACK for failed message pid 0");
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
if (mqtt_message.packet_id_ != packetId) {
|
||||
LOG_ERROR(F("Mismatch, expecting PID %d, got %d"), mqtt_message.packet_id_, packetId);
|
||||
LOG_ERROR("Mismatch, expecting PID %d, got %d", mqtt_message.packet_id_, packetId);
|
||||
mqtt_publish_fails_++; // increment error count
|
||||
}
|
||||
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] ACK pid %d"), packetId);
|
||||
LOG_DEBUG("[DEBUG] ACK pid %d", packetId);
|
||||
#endif
|
||||
|
||||
mqtt_messages_.pop_front(); // always remove from queue, regardless if there was a successful ACK
|
||||
@@ -450,17 +450,17 @@ void Mqtt::start() {
|
||||
}
|
||||
connecting_ = false;
|
||||
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
|
||||
LOG_WARNING(F("MQTT disconnected: TCP"));
|
||||
LOG_WARNING("MQTT disconnected: TCP");
|
||||
} else if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) {
|
||||
LOG_WARNING(F("MQTT disconnected: Identifier Rejected"));
|
||||
LOG_WARNING("MQTT disconnected: Identifier Rejected");
|
||||
} else if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) {
|
||||
LOG_WARNING(F("MQTT disconnected: Server unavailable"));
|
||||
LOG_WARNING("MQTT disconnected: Server unavailable");
|
||||
} else if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) {
|
||||
LOG_WARNING(F("MQTT disconnected: Malformed credentials"));
|
||||
LOG_WARNING("MQTT disconnected: Malformed credentials");
|
||||
} else if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
|
||||
LOG_WARNING(F("MQTT disconnected: Not authorized"));
|
||||
LOG_WARNING("MQTT disconnected: Not authorized");
|
||||
} else {
|
||||
LOG_WARNING(F("MQTT disconnected: code %d"), reason);
|
||||
LOG_WARNING("MQTT disconnected: code %d", reason);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -535,7 +535,7 @@ void Mqtt::on_connect() {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_INFO(F("MQTT connected"));
|
||||
LOG_INFO("MQTT connected");
|
||||
|
||||
connecting_ = true;
|
||||
connectcount_++;
|
||||
@@ -546,15 +546,15 @@ void Mqtt::on_connect() {
|
||||
StaticJsonDocument<EMSESP_JSON_SIZE_MEDIUM> doc;
|
||||
// first time to connect
|
||||
if (connectcount_ == 1) {
|
||||
doc["event"] = FJSON("start");
|
||||
doc["event"] = "start";
|
||||
} else {
|
||||
doc["event"] = FJSON("reconnect");
|
||||
doc["event"] = "reconnect";
|
||||
}
|
||||
|
||||
doc["version"] = EMSESP_APP_VERSION;
|
||||
#ifndef EMSESP_STANDALONE
|
||||
if (WiFi.status() == WL_CONNECTED) {
|
||||
doc["connection"] = F("WiFi");
|
||||
doc["connection"] = ("WiFi");
|
||||
doc["hostname"] = WiFi.getHostname();
|
||||
doc["SSID"] = WiFi.SSID();
|
||||
doc["BSSID"] = WiFi.BSSIDstr();
|
||||
@@ -567,7 +567,7 @@ void Mqtt::on_connect() {
|
||||
doc["IPv6 address"] = uuid::printable_to_string(WiFi.localIPv6());
|
||||
}
|
||||
} else if (EMSESP::system_.ethernet_connected()) {
|
||||
doc["connection"] = F("Ethernet");
|
||||
doc["connection"] = ("Ethernet");
|
||||
doc["hostname"] = ETH.getHostname();
|
||||
doc["MAC"] = ETH.macAddress();
|
||||
doc["IPv4 address"] = uuid::printable_to_string(ETH.localIP()) + "/" + uuid::printable_to_string(ETH.subnetMask());
|
||||
@@ -600,7 +600,7 @@ void Mqtt::on_connect() {
|
||||
// re-subscribe to all custom registered MQTT topics
|
||||
resubscribe();
|
||||
|
||||
publish_retain(F("status"), "online", true); // say we're alive to the Last Will topic, with retain on
|
||||
publish_retain("status", "online", true); // say we're alive to the Last Will topic, with retain on
|
||||
|
||||
mqtt_publish_fails_ = 0; // reset fail count to 0
|
||||
|
||||
@@ -609,7 +609,7 @@ void Mqtt::on_connect() {
|
||||
LOG_INFO("Queue size: %d", mqtt_messages_.size());
|
||||
for (const auto & message : mqtt_messages_) {
|
||||
auto content = message.content_;
|
||||
LOG_INFO(F(" [%02d] (%d) topic=%s payload=%s"), message.id_, content->operation, content->topic.c_str(), content->payload.c_str());
|
||||
LOG_INFO((" [%02d] (%d) topic=%s payload=%s"), message.id_, content->operation, content->topic.c_str(), content->payload.c_str());
|
||||
}
|
||||
*/
|
||||
}
|
||||
@@ -620,21 +620,21 @@ void Mqtt::on_connect() {
|
||||
void Mqtt::ha_status() {
|
||||
StaticJsonDocument<EMSESP_JSON_SIZE_HA_CONFIG> doc;
|
||||
|
||||
doc["uniq_id"] = FJSON("ems-esp-system");
|
||||
doc["uniq_id"] = "ems-esp-system";
|
||||
doc["~"] = mqtt_base_; // default ems-esp
|
||||
// doc["avty_t"] = FJSON("~/status"); // commented out, as it causes errors in HA sometimes
|
||||
// doc["json_attr_t"] = FJSON("~/heartbeat"); // store also as HA attributes
|
||||
doc["stat_t"] = FJSON("~/heartbeat");
|
||||
doc["object_id"] = FJSON("ems_esp_status");
|
||||
doc["name"] = FJSON("EMS-ESP status");
|
||||
// doc["avty_t"] = "~/status"; // commented out, as it causes errors in HA sometimes
|
||||
// doc["json_attr_t"] = "~/heartbeat"; // store also as HA attributes
|
||||
doc["stat_t"] = "~/heartbeat";
|
||||
doc["object_id"] = "ems_esp_status";
|
||||
doc["name"] = "EMS-ESP status";
|
||||
doc["ic"] = F_(icondevice);
|
||||
doc["val_tpl"] = FJSON("{{value_json['bus_status']}}");
|
||||
doc["val_tpl"] = "{{value_json['bus_status']}}";
|
||||
|
||||
JsonObject dev = doc.createNestedObject("dev");
|
||||
dev["name"] = F_(EMSESP); // "EMS-ESP"
|
||||
dev["name"] = "EMS-ESP";
|
||||
dev["sw"] = "v" + std::string(EMSESP_APP_VERSION);
|
||||
dev["mf"] = FJSON("proddy");
|
||||
dev["mdl"] = F_(EMSESP); // "EMS-ESP"
|
||||
dev["mf"] = "proddy";
|
||||
dev["mdl"] = "EMS-ESP";
|
||||
JsonArray ids = dev.createNestedArray("ids");
|
||||
ids.add("ems-esp");
|
||||
|
||||
@@ -644,19 +644,20 @@ void Mqtt::ha_status() {
|
||||
|
||||
// create the sensors - must match the MQTT payload keys
|
||||
if (!EMSESP::system_.ethernet_connected()) {
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("WiFi RSSI"), F("rssi"), DeviceValueUOM::DBM);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("WiFi strength"), F("wifistrength"), DeviceValueUOM::PERCENT);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("WiFi RSSI"), ("rssi"), DeviceValueUOM::DBM);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("WiFi strength"), ("wifistrength"), DeviceValueUOM::PERCENT);
|
||||
}
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Uptime"), F("uptime"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Uptime (sec)"), F("uptime_sec"), DeviceValueUOM::SECONDS);
|
||||
publish_system_ha_sensor_config(DeviceValueType::BOOL, F("NTP status"), F("ntp_status"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Free memory"), F("freemem"), DeviceValueUOM::KB);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("MQTT fails"), F("mqttfails"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Rx received"), F("rxreceived"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Rx fails"), F("rxfails"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Tx reads"), F("txreads"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Tx writes"), F("txwrites"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, F("Tx fails"), F("txfails"), DeviceValueUOM::NONE);
|
||||
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Uptime"), ("uptime"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Uptime (sec)"), ("uptime_sec"), DeviceValueUOM::SECONDS);
|
||||
publish_system_ha_sensor_config(DeviceValueType::BOOL, ("NTP status"), ("ntp_status"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Free memory"), ("freemem"), DeviceValueUOM::KB);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("MQTT fails"), ("mqttfails"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Rx received"), ("rxreceived"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Rx fails"), ("rxfails"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Tx reads"), ("txreads"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Tx writes"), ("txwrites"), DeviceValueUOM::NONE);
|
||||
publish_system_ha_sensor_config(DeviceValueType::INT, ("Tx fails"), ("txfails"), DeviceValueUOM::NONE);
|
||||
}
|
||||
|
||||
// add sub or pub task to the queue.
|
||||
@@ -686,7 +687,7 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation,
|
||||
// if the queue is full, make room but removing the last one
|
||||
if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES) {
|
||||
mqtt_messages_.pop_front();
|
||||
LOG_WARNING(F("Queue overflow, removing one message"));
|
||||
LOG_WARNING("Queue overflow, removing one message");
|
||||
mqtt_publish_fails_++;
|
||||
}
|
||||
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
|
||||
@@ -718,17 +719,17 @@ void Mqtt::publish(const std::string & topic, const std::string & payload) {
|
||||
}
|
||||
|
||||
// MQTT Publish, using a user's retain flag - except for char * strings
|
||||
void Mqtt::publish(const __FlashStringHelper * topic, const char * payload) {
|
||||
queue_publish_message(read_flash_string(topic), payload, mqtt_retain_);
|
||||
void Mqtt::publish(const char * topic, const char * payload) {
|
||||
queue_publish_message((topic), payload, mqtt_retain_);
|
||||
}
|
||||
|
||||
// MQTT Publish, using a specific retain flag, topic is a flash string
|
||||
void Mqtt::publish(const __FlashStringHelper * topic, const std::string & payload) {
|
||||
queue_publish_message(read_flash_string(topic), payload, mqtt_retain_);
|
||||
void Mqtt::publish(const char * topic, const std::string & payload) {
|
||||
queue_publish_message((topic), payload, mqtt_retain_);
|
||||
}
|
||||
|
||||
void Mqtt::publish(const __FlashStringHelper * topic, const JsonObject & payload) {
|
||||
publish(read_flash_string(topic), payload);
|
||||
void Mqtt::publish(const char * topic, const JsonObject & payload) {
|
||||
publish_retain(topic, payload, mqtt_retain_);
|
||||
}
|
||||
|
||||
// publish json doc, only if its not empty
|
||||
@@ -737,12 +738,16 @@ void Mqtt::publish(const std::string & topic, const JsonObject & payload) {
|
||||
}
|
||||
|
||||
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
|
||||
void Mqtt::publish_retain(const __FlashStringHelper * topic, const std::string & payload, bool retain) {
|
||||
queue_publish_message(read_flash_string(topic), payload, retain);
|
||||
void Mqtt::publish_retain(const char * topic, const std::string & payload, bool retain) {
|
||||
queue_publish_message((topic), payload, retain);
|
||||
}
|
||||
|
||||
// publish json doc, only if its not empty, using the retain flag
|
||||
void Mqtt::publish_retain(const std::string & topic, const JsonObject & payload, bool retain) {
|
||||
publish_retain(topic.c_str(), payload, retain);
|
||||
}
|
||||
|
||||
void Mqtt::publish_retain(const char * topic, const JsonObject & payload, bool retain) {
|
||||
if (enabled() && payload.size()) {
|
||||
std::string payload_text;
|
||||
serializeJson(payload, payload_text); // convert json to string
|
||||
@@ -750,30 +755,22 @@ void Mqtt::publish_retain(const std::string & topic, const JsonObject & payload,
|
||||
}
|
||||
}
|
||||
|
||||
void Mqtt::publish_retain(const __FlashStringHelper * topic, const JsonObject & payload, bool retain) {
|
||||
publish_retain(read_flash_string(topic), payload, retain);
|
||||
}
|
||||
|
||||
void Mqtt::publish_ha(const __FlashStringHelper * topic, const JsonObject & payload) {
|
||||
publish_ha(read_flash_string(topic), payload);
|
||||
}
|
||||
|
||||
// publish empty payload to remove the topic
|
||||
void Mqtt::publish_ha(const std::string & topic) {
|
||||
void Mqtt::publish_ha(const char * topic) {
|
||||
if (!enabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::string fulltopic = Mqtt::discovery_prefix() + topic;
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] Publishing empty HA topic=%s"), fulltopic.c_str());
|
||||
LOG_DEBUG("[DEBUG] Publishing empty HA topic=%s", fulltopic.c_str());
|
||||
#endif
|
||||
|
||||
queue_publish_message(fulltopic, "", true); // publish with retain to remove from broker
|
||||
}
|
||||
|
||||
// publish a Home Assistant config topic and payload, with retain flag off.
|
||||
void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) {
|
||||
void Mqtt::publish_ha(const char * topic, const JsonObject & payload) {
|
||||
if (!enabled()) {
|
||||
return;
|
||||
}
|
||||
@@ -784,9 +781,9 @@ void Mqtt::publish_ha(const std::string & topic, const JsonObject & payload) {
|
||||
|
||||
std::string fulltopic = Mqtt::discovery_prefix() + topic;
|
||||
#if defined(EMSESP_STANDALONE)
|
||||
LOG_DEBUG(F("Publishing HA topic=%s, payload=%s"), fulltopic.c_str(), payload_text.c_str());
|
||||
LOG_DEBUG("Publishing HA topic=%s, payload=%s", fulltopic.c_str(), payload_text.c_str());
|
||||
#elif defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[debug] Publishing HA topic=%s, payload=%s"), fulltopic.c_str(), payload_text.c_str());
|
||||
LOG_DEBUG("[debug] Publishing HA topic=%s, payload=%s", fulltopic.c_str(), payload_text.c_str());
|
||||
#endif
|
||||
|
||||
// queue messages if the MQTT connection is not yet established. to ensure we don't miss messages
|
||||
@@ -815,7 +812,7 @@ void Mqtt::process_queue() {
|
||||
// it will have a real packet ID
|
||||
if (mqtt_message.packet_id_ > 0) {
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] Waiting for QOS-ACK"));
|
||||
LOG_DEBUG("[DEBUG] Waiting for QOS-ACK");
|
||||
#endif
|
||||
// if we don't get the ack within 10 minutes, republish with new packet_id
|
||||
if (uuid::get_uptime_sec() - last_publish_queue_ < 600) {
|
||||
@@ -826,13 +823,13 @@ void Mqtt::process_queue() {
|
||||
|
||||
// if we're subscribing...
|
||||
if (message->operation == Operation::SUBSCRIBE) {
|
||||
LOG_DEBUG(F("Subscribing to topic '%s'"), topic);
|
||||
LOG_DEBUG("Subscribing to topic '%s'", topic);
|
||||
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
|
||||
if (!packet_id) {
|
||||
if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) {
|
||||
return;
|
||||
}
|
||||
LOG_ERROR(F("Error subscribing to topic '%s'"), topic);
|
||||
LOG_ERROR("Error subscribing to topic '%s'", topic);
|
||||
mqtt_publish_fails_++; // increment failure counter
|
||||
}
|
||||
|
||||
@@ -843,13 +840,13 @@ void Mqtt::process_queue() {
|
||||
|
||||
// if we're unsubscribing...
|
||||
if (message->operation == Operation::UNSUBSCRIBE) {
|
||||
LOG_DEBUG(F("Subscribing to topic '%s'"), topic);
|
||||
LOG_DEBUG("Subscribing to topic '%s'", topic);
|
||||
uint16_t packet_id = mqttClient_->unsubscribe(topic);
|
||||
if (!packet_id) {
|
||||
if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) {
|
||||
return;
|
||||
}
|
||||
LOG_ERROR(F("Error unsubscribing to topic '%s'"), topic);
|
||||
LOG_ERROR("Error unsubscribing to topic '%s'", topic);
|
||||
mqtt_publish_fails_++; // increment failure counter
|
||||
}
|
||||
|
||||
@@ -860,7 +857,7 @@ void Mqtt::process_queue() {
|
||||
|
||||
// else try and publish it
|
||||
uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
|
||||
LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)"),
|
||||
LOG_DEBUG(("Publishing topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)"),
|
||||
topic,
|
||||
mqtt_message.id_,
|
||||
message->retain,
|
||||
@@ -871,14 +868,14 @@ void Mqtt::process_queue() {
|
||||
if (packet_id == 0) {
|
||||
// it failed. if we retried n times, give up. remove from queue
|
||||
if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) {
|
||||
LOG_ERROR(F("Failed to publish to %s after %d attempts"), topic, mqtt_message.retry_count_ + 1);
|
||||
LOG_ERROR("Failed to publish to %s after %d attempts", topic, mqtt_message.retry_count_ + 1);
|
||||
mqtt_publish_fails_++; // increment failure counter
|
||||
mqtt_messages_.pop_front(); // delete
|
||||
return;
|
||||
} else {
|
||||
// update the record
|
||||
mqtt_messages_.front().retry_count_++;
|
||||
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), topic, mqtt_message.retry_count_ + 1);
|
||||
LOG_DEBUG("Failed to publish to %s. Trying again, #%d", topic, mqtt_message.retry_count_ + 1);
|
||||
return; // leave on queue for next time so it gets republished
|
||||
}
|
||||
}
|
||||
@@ -888,7 +885,7 @@ void Mqtt::process_queue() {
|
||||
if (mqtt_qos_ != 0) {
|
||||
mqtt_messages_.front().packet_id_ = packet_id;
|
||||
#if defined(EMSESP_DEBUG)
|
||||
LOG_DEBUG(F("[DEBUG] Setting packetID for ACK to %d"), packet_id);
|
||||
LOG_DEBUG("[DEBUG] Setting packetID for ACK to %d", packet_id);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
@@ -927,8 +924,8 @@ void Mqtt::publish_ha_sensor_config(DeviceValue & dv, const std::string & model,
|
||||
|
||||
publish_ha_sensor_config(dv.type,
|
||||
dv.tag,
|
||||
dv.get_fullname(),
|
||||
dv.fullname[0],
|
||||
dv.get_fullname().c_str(),
|
||||
(dv.fullname ? dv.fullname[0] : nullptr), // EN name
|
||||
dv.device_type,
|
||||
dv.short_name,
|
||||
dv.uom,
|
||||
@@ -942,37 +939,35 @@ void Mqtt::publish_ha_sensor_config(DeviceValue & dv, const std::string & model,
|
||||
}
|
||||
|
||||
// publish HA sensor for System using the heartbeat tag
|
||||
void Mqtt::publish_system_ha_sensor_config(uint8_t type, const __FlashStringHelper * name, const __FlashStringHelper * entity, const uint8_t uom) {
|
||||
void Mqtt::publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom) {
|
||||
StaticJsonDocument<EMSESP_JSON_SIZE_HA_CONFIG> doc;
|
||||
JsonObject dev_json = doc.createNestedObject("dev");
|
||||
|
||||
JsonArray ids = dev_json.createNestedArray("ids");
|
||||
ids.add("ems-esp");
|
||||
|
||||
auto fullname = read_flash_string(name);
|
||||
|
||||
publish_ha_sensor_config(type, DeviceValueTAG::TAG_HEARTBEAT, fullname, name, EMSdevice::DeviceType::SYSTEM, entity, uom, false, false, nullptr, 0, 0, 0, dev_json);
|
||||
publish_ha_sensor_config(type, DeviceValueTAG::TAG_HEARTBEAT, name, name, EMSdevice::DeviceType::SYSTEM, entity, uom, false, false, nullptr, 0, 0, 0, dev_json);
|
||||
}
|
||||
|
||||
// MQTT discovery configs
|
||||
// entity must match the key/value pair in the *_data topic
|
||||
// note: some extra string copying done here, it looks messy but does help with heap fragmentation issues
|
||||
void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdevice::DeviceValueType
|
||||
uint8_t tag, // EMSdevice::DeviceValueTAG
|
||||
const std::string & fullname, // fullname, already translated
|
||||
const __FlashStringHelper * const en_name,
|
||||
const uint8_t device_type, // EMSdevice::DeviceType
|
||||
const __FlashStringHelper * const entity, // same as shortname
|
||||
const uint8_t uom, // EMSdevice::DeviceValueUOM (0=NONE)
|
||||
const bool remove, // true if we want to remove this topic
|
||||
const bool has_cmd,
|
||||
const __FlashStringHelper * const ** options,
|
||||
uint8_t options_size,
|
||||
const int16_t dv_set_min,
|
||||
const int16_t dv_set_max,
|
||||
const JsonObject & dev_json) {
|
||||
void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdevice::DeviceValueType
|
||||
uint8_t tag, // EMSdevice::DeviceValueTAG
|
||||
const char * const fullname, // fullname, already translated
|
||||
const char * const en_name,
|
||||
const uint8_t device_type, // EMSdevice::DeviceType
|
||||
const char * const entity, // same as shortname
|
||||
const uint8_t uom, // EMSdevice::DeviceValueUOM (0=NONE)
|
||||
const bool remove, // true if we want to remove this topic
|
||||
const bool has_cmd,
|
||||
const char * const ** options,
|
||||
uint8_t options_size,
|
||||
const int16_t dv_set_min,
|
||||
const int16_t dv_set_max,
|
||||
const JsonObject & dev_json) {
|
||||
// ignore if name (fullname) is empty
|
||||
if (fullname.empty() || en_name == nullptr) {
|
||||
if (fullname == nullptr || en_name == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -983,9 +978,9 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
// create entity by add the hc/wwc tag if present, separating with a .
|
||||
char new_entity[50];
|
||||
if (tag >= DeviceValueTAG::TAG_HC1) {
|
||||
snprintf(new_entity, sizeof(new_entity), "%s.%s", EMSdevice::tag_to_string(tag).c_str(), read_flash_string(entity).c_str());
|
||||
snprintf(new_entity, sizeof(new_entity), "%s.%s", EMSdevice::tag_to_string(tag).c_str(), (entity));
|
||||
} else {
|
||||
snprintf(new_entity, sizeof(new_entity), "%s", read_flash_string(entity).c_str());
|
||||
snprintf(new_entity, sizeof(new_entity), "%s", (entity));
|
||||
}
|
||||
|
||||
// build unique identifier which will be used in the topic, replacing all . with _ as not to break HA
|
||||
@@ -1037,7 +1032,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
// if we're asking to remove this topic, send an empty payload and exit
|
||||
// https://github.com/emsesp/EMS-ESP32/issues/196
|
||||
if (remove) {
|
||||
LOG_DEBUG(F("Removing HA config for %s"), uniq);
|
||||
LOG_DEBUG("Removing HA config for %s", uniq);
|
||||
publish_ha(topic);
|
||||
return;
|
||||
}
|
||||
@@ -1107,7 +1102,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
|
||||
// friendly name = <tag> <name>
|
||||
char ha_name[70];
|
||||
char * F_name = strdup(fullname.c_str());
|
||||
char * F_name = strdup(fullname);
|
||||
F_name[0] = toupper(F_name[0]); // capitalize first letter
|
||||
if (have_tag) {
|
||||
snprintf(ha_name, sizeof(ha_name), "%s %s", EMSdevice::tag_to_string(tag).c_str(), F_name);
|
||||
@@ -1120,14 +1115,12 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
// entity id is generated from the name, see https://www.home-assistant.io/docs/mqtt/discovery/#use-object_id-to-influence-the-entity-id
|
||||
// so we override it to make it unique using entity_id
|
||||
// See https://github.com/emsesp/EMS-ESP32/issues/596
|
||||
// keep it compatible to v3.4, use english fullname, no prefix (basename prefix commmented out)
|
||||
// keep it compatible to v3.4, use english fullname, no prefix (basename prefix commented out)
|
||||
char object_id[130];
|
||||
if (have_tag) {
|
||||
// snprintf(object_id, sizeof(object_id), "%s_%s_%s_%s", mqtt_basename_, device_name, EMSdevice::tag_to_string(tag).c_str(), read_flash_string(en_name).c_str());
|
||||
snprintf(object_id, sizeof(object_id), "%s_%s_%s", device_name, EMSdevice::tag_to_string(tag).c_str(), read_flash_string(en_name).c_str());
|
||||
snprintf(object_id, sizeof(object_id), "%s_%s_%s", device_name, EMSdevice::tag_to_string(tag).c_str(), en_name);
|
||||
} else {
|
||||
// snprintf(object_id, sizeof(object_id), "%s_%s_%s", mqtt_basename_, device_name, read_flash_string(en_name).c_str());
|
||||
snprintf(object_id, sizeof(object_id), "%s_%s", device_name, read_flash_string(en_name).c_str());
|
||||
snprintf(object_id, sizeof(object_id), "%s_%s", device_name, en_name);
|
||||
}
|
||||
doc["object_id"] = object_id;
|
||||
|
||||
@@ -1137,7 +1130,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
if (is_nested()) {
|
||||
snprintf(val_tpl, sizeof(val_tpl), "{{value_json.%s}}", new_entity);
|
||||
} else {
|
||||
snprintf(val_tpl, sizeof(val_tpl), "{{value_json.%s}}", read_flash_string(entity).c_str());
|
||||
snprintf(val_tpl, sizeof(val_tpl), "{{value_json.%s}}", entity);
|
||||
}
|
||||
doc["val_tpl"] = val_tpl;
|
||||
|
||||
@@ -1147,9 +1140,9 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
// and has no unit of measure or icon
|
||||
if (type == DeviceValueType::BOOL) {
|
||||
char result[10];
|
||||
doc[F("payload_on")] = Helpers::render_boolean(result, true);
|
||||
doc[F("payload_off")] = Helpers::render_boolean(result, false);
|
||||
doc[sc_ha] = F_(measurement);
|
||||
doc[("payload_on")] = Helpers::render_boolean(result, true);
|
||||
doc[("payload_off")] = Helpers::render_boolean(result, false);
|
||||
doc[sc_ha] = F_(measurement);
|
||||
} else {
|
||||
// always set the uom
|
||||
if (uom != DeviceValueUOM::NONE) {
|
||||
@@ -1169,11 +1162,11 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
case DeviceValueUOM::DEGREES:
|
||||
case DeviceValueUOM::DEGREES_R:
|
||||
doc[sc_ha] = F_(measurement);
|
||||
doc[dc_ha] = F("temperature"); // no icon needed
|
||||
doc[dc_ha] = "temperature"; // no icon needed
|
||||
break;
|
||||
case DeviceValueUOM::PERCENT:
|
||||
doc[sc_ha] = F_(measurement);
|
||||
doc[dc_ha] = F("power_factor"); // no icon needed
|
||||
doc[dc_ha] = "power_factor"; // no icon needed
|
||||
break;
|
||||
case DeviceValueUOM::SECONDS:
|
||||
case DeviceValueUOM::MINUTES:
|
||||
@@ -1198,11 +1191,11 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
} else {
|
||||
doc[sc_ha] = F_(measurement);
|
||||
}
|
||||
doc[dc_ha] = F("energy"); // no icon needed
|
||||
doc[dc_ha] = "energy"; // no icon needed
|
||||
break;
|
||||
case DeviceValueUOM::KWH:
|
||||
doc[sc_ha] = F_(total_increasing);
|
||||
doc[dc_ha] = F("energy"); // no icon needed
|
||||
doc[dc_ha] = "energy"; // no icon needed
|
||||
break;
|
||||
case DeviceValueUOM::UA:
|
||||
doc[ic_ha] = F_(iconua);
|
||||
@@ -1210,16 +1203,16 @@ void Mqtt::publish_ha_sensor_config(uint8_t type,
|
||||
break;
|
||||
case DeviceValueUOM::BAR:
|
||||
doc[sc_ha] = F_(measurement);
|
||||
doc[dc_ha] = F("pressure");
|
||||
doc[dc_ha] = "pressure";
|
||||
break;
|
||||
case DeviceValueUOM::W:
|
||||
case DeviceValueUOM::KW:
|
||||
doc[sc_ha] = F_(measurement);
|
||||
doc[dc_ha] = F("power");
|
||||
doc[dc_ha] = "power";
|
||||
break;
|
||||
case DeviceValueUOM::DBM:
|
||||
doc[sc_ha] = F_(measurement);
|
||||
doc[dc_ha] = F("signal_strength");
|
||||
doc[dc_ha] = "signal_strength";
|
||||
break;
|
||||
case DeviceValueUOM::NONE:
|
||||
// for device entities which have numerical values, with no UOM
|
||||
|
||||
Reference in New Issue
Block a user