MQTT updates: added HA discovery, removed heartbeat - HomeAssistant Discovery #288

This commit is contained in:
proddy
2020-09-28 18:17:23 +02:00
parent e7d069fdb7
commit b5062df8f4
26 changed files with 503 additions and 318 deletions

View File

@@ -34,6 +34,7 @@ uint32_t Mqtt::publish_time_solar_;
uint32_t Mqtt::publish_time_mixing_;
uint32_t Mqtt::publish_time_other_;
uint32_t Mqtt::publish_time_sensor_;
uint8_t Mqtt::mqtt_format_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
@@ -50,13 +51,19 @@ uuid::log::Logger Mqtt::logger_{F_(mqtt), uuid::log::Facility::DAEMON};
void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_subfunction_p cb) {
// check if we already have the topic subscribed, if so don't add it again
if (!mqtt_subfunctions_.empty()) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
for (auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) {
// add the function, in case its not there
if (cb) {
mqtt_subfunction.mqtt_subfunction_ = cb;
}
return; // it exists, exit
}
}
}
LOG_DEBUG(F("Subscribing MQTT topic %s for device type %s"), topic.c_str(), EMSdevice::device_type_2_device_name(device_type).c_str());
// add to MQTT queue as a subscribe operation
auto message = queue_subscribe_message(topic);
@@ -77,6 +84,7 @@ void Mqtt::register_command(const uint8_t device_type, const uint8_t device_id,
}
}
}
if (!exists) {
Mqtt::subscribe(device_type, cmd_topic, nullptr); // use an empty function handler to signal this is a command function
}
@@ -151,10 +159,7 @@ void Mqtt::loop() {
void Mqtt::show_mqtt(uuid::console::Shell & shell) {
shell.printfln(F("MQTT is %s"), connected() ? uuid::read_flash_string(F_(connected)).c_str() : uuid::read_flash_string(F_(disconnected)).c_str());
EMSESP::esp8266React.getMqttSettingsService()->read([&](MqttSettings & settings) {
shell.printfln(F_(mqtt_heartbeat_fmt), settings.system_heartbeat ? F_(enabled) : F_(disabled));
shell.printfln(F_(mqtt_format_fmt), settings.mqtt_format);
});
EMSESP::esp8266React.getMqttSettingsService()->read([&](MqttSettings & settings) { shell.printfln(F_(mqtt_format_fmt), settings.mqtt_format); });
shell.printfln(F("MQTT publish fails count: %lu"), mqtt_publish_fails_);
shell.println();
@@ -215,7 +220,7 @@ void Mqtt::incoming(const char * topic, const char * payload) {
// received an MQTT message that we subscribed too
void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
if (len == 0) {
return;
return; // ignore empty payloads
}
// convert payload to a null-terminated char string
@@ -230,56 +235,59 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
for (const auto & mf : mqtt_subfunctions_) {
if (strcmp(topic, mf.full_topic_.c_str()) == 0) {
if (mf.mqtt_subfunction_) {
(mf.mqtt_subfunction_)(message); // matching function, call it
return;
} else {
// empty function. It's a command then. Find the command from the json and call it directly.
StaticJsonDocument<EMSESP_MAX_JSON_SIZE_SMALL> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
LOG_ERROR(F("MQTT error: payload %s, error %s"), message, error.c_str());
return;
}
const char * command = doc["cmd"];
if (command == nullptr) {
LOG_ERROR(F("MQTT error: invalid payload cmd format. message=%s"), message);
return;
}
// check for hc and id, and convert to int
int8_t n = -1; // no value
if (doc.containsKey("hc")) {
n = doc["hc"];
} else if (doc.containsKey("id")) {
n = doc["id"];
}
bool cmd_known = false;
JsonVariant data = doc["data"];
JsonObject output; // empty object
if (data.is<char *>()) {
cmd_known = Command::call(mf.device_type_, command, data.as<char *>(), n, output);
} else if (data.is<int>()) {
char data_str[10];
cmd_known = Command::call(mf.device_type_, command, Helpers::itoa(data_str, (int16_t)data.as<int>()), n, output);
} else if (data.is<float>()) {
char data_str[10];
cmd_known = Command::call(mf.device_type_, command, Helpers::render_value(data_str, (float)data.as<float>(), 2), n, output);
} else if (data.isNull()) {
cmd_known = Command::call(mf.device_type_, command, "", n, output);
}
if (!cmd_known) {
LOG_ERROR(F("MQTT: no matching cmd or invalid data: %s"), message);
// matching function, call it. If it returns true keep quit
if ((mf.mqtt_subfunction_)(message)) {
return; // function executed successfully
}
}
// empty function. It's a command then. Find the command from the json and call it directly.
StaticJsonDocument<EMSESP_MAX_JSON_SIZE_SMALL> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
LOG_ERROR(F("MQTT error: payload %s, error %s"), message, error.c_str());
return;
}
const char * command = doc["cmd"];
if (command == nullptr) {
LOG_ERROR(F("MQTT error: invalid payload cmd format. message=%s"), message);
return;
}
// check for hc and id, and convert to int
int8_t n = -1; // no value
if (doc.containsKey("hc")) {
n = doc["hc"];
} else if (doc.containsKey("id")) {
n = doc["id"];
}
bool cmd_known = false;
JsonVariant data = doc["data"];
JsonObject output; // empty object
if (data.is<char *>()) {
cmd_known = Command::call(mf.device_type_, command, data.as<char *>(), n, output);
} else if (data.is<int>()) {
char data_str[10];
cmd_known = Command::call(mf.device_type_, command, Helpers::itoa(data_str, (int16_t)data.as<int>()), n, output);
} else if (data.is<float>()) {
char data_str[10];
cmd_known = Command::call(mf.device_type_, command, Helpers::render_value(data_str, (float)data.as<float>(), 2), n, output);
} else if (data.isNull()) {
cmd_known = Command::call(mf.device_type_, command, "", n, output);
}
if (!cmd_known) {
LOG_ERROR(F("MQTT: no matching cmd (%s), invalid data or command failed"), command);
}
return;
}
}
// if we got here we didn't find a topic match
LOG_ERROR(F("No MQTT handler found for topic %s and payload %s"), topic, message);
}
@@ -343,6 +351,7 @@ void Mqtt::start() {
publish_time_sensor_ = mqttSettings.publish_time_sensor * 1000;
mqtt_qos_ = mqttSettings.mqtt_qos;
mqtt_retain_ = mqttSettings.mqtt_retain;
mqtt_format_ = mqttSettings.mqtt_format;
});
mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); });
@@ -458,9 +467,36 @@ void Mqtt::on_connect() {
resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics
if (mqtt_format() == Format::HA) {
ha_status(); // create a device in HA
}
LOG_INFO(F("MQTT connected"));
}
// Home Assistant Discovery
// homeassistant/sensor/ems-esp/status/config
void Mqtt::ha_status() {
StaticJsonDocument<EMSESP_MAX_JSON_SIZE_MEDIUM> doc;
doc["name"] = F("EMS-ESP status");
doc["uniq_id"] = F("status");
doc["avty_t"] = F("ems-esp/status");
doc["json_attr_t"] = F("ems-esp/heartbeat");
doc["stat_t"] = F("ems-esp/heartbeat");
doc["val_tpl"] = F("{{value_json['status']}}");
doc["ic"] = F("mdi:home-thermometer-outline");
JsonObject dev = doc.createNestedObject("dev");
dev["name"] = F("EMS-ESP");
dev["sw"] = EMSESP_APP_VERSION;
dev["mf"] = F("proddy");
dev["mdl"] = F("EMS-ESP");
JsonArray ids = dev.createNestedArray("ids");
ids.add("ems-esp");
Mqtt::publish_retain(F("homeassistant/sensor/ems-esp/status/config"), doc.as<JsonObject>(), true); // publish the config payload with retain flag
}
// add sub or pub task to the queue.
// a fully-qualified topic is created by prefixing the hostname, unless it's HA
// returns a pointer to the message created
@@ -478,8 +514,8 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation,
} else {
// prefix the hostname
std::string full_topic(50, '\0');
snprintf_P(&full_topic[0], full_topic.capacity() + 1, PSTR("%s/%s"), Mqtt::hostname_.c_str(), topic.c_str());
std::string full_topic(100, '\0');
snprintf_P(&full_topic[0], full_topic.capacity() + 1, PSTR("%s/%s"), hostname_.c_str(), topic.c_str());
// message = std::make_shared<MqttMessage>(operation, full_topic, std::move(payload), retain);
message = std::make_shared<MqttMessage>(operation, full_topic, std::move(payload), retain);
}
@@ -508,6 +544,12 @@ void Mqtt::publish(const std::string & topic, const std::string & payload) {
queue_publish_message(topic, payload, mqtt_retain_);
}
// MQTT Publish, using a user's retain flag - except for char * strings
void Mqtt::publish(const __FlashStringHelper * topic, const char * payload) {
queue_publish_message(uuid::read_flash_string(topic), payload, mqtt_retain_);
}
// MQTT Publish, using a specific retain flag, topic is a flash string
void Mqtt::publish(const __FlashStringHelper * topic, const std::string & payload) {
queue_publish_message(uuid::read_flash_string(topic), payload, mqtt_retain_);
@@ -517,25 +559,13 @@ void Mqtt::publish(const __FlashStringHelper * topic, const JsonObject & payload
publish(uuid::read_flash_string(topic), payload);
}
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
void Mqtt::publish_retain(const __FlashStringHelper * topic, const std::string & payload, bool retain) {
queue_publish_message(uuid::read_flash_string(topic), payload, retain);
}
void Mqtt::publish_retain(const std::string & topic, const JsonObject & payload, bool retain) {
std::string payload_text;
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(topic, payload_text, retain);
}
void Mqtt::publish_retain(const __FlashStringHelper * topic, const JsonObject & payload, bool retain) {
publish_retain(uuid::read_flash_string(topic), payload, retain);
}
// publish json doc, only if its not empty
void Mqtt::publish(const std::string & topic, const JsonObject & payload) {
std::string payload_text;
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(topic, payload_text, mqtt_retain_);
if (payload.size()) {
std::string payload_text;
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(topic, payload_text, mqtt_retain_);
}
}
// for booleans, which get converted to string values 1 and 0
@@ -552,6 +582,24 @@ void Mqtt::publish(const std::string & topic) {
queue_publish_message(topic, "", false);
}
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
void Mqtt::publish_retain(const __FlashStringHelper * topic, const std::string & payload, bool retain) {
queue_publish_message(uuid::read_flash_string(topic), payload, retain);
}
// publish json doc, only if its not empty, using the retain flag
void Mqtt::publish_retain(const std::string & topic, const JsonObject & payload, bool retain) {
if (payload.size()) {
std::string payload_text;
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(topic, payload_text, retain);
}
}
void Mqtt::publish_retain(const __FlashStringHelper * topic, const JsonObject & payload, bool retain) {
publish_retain(uuid::read_flash_string(topic), payload, retain);
}
// take top from queue and perform the publish or subscribe action
// assumes there is an MQTT connection
void Mqtt::process_queue() {
@@ -632,4 +680,80 @@ void Mqtt::process_queue() {
mqtt_messages_.pop_front(); // remove the message from the queue
}
// HA config for a binary_sensor
void Mqtt::register_mqtt_ha_binary_sensor(const __FlashStringHelper * name, const char * entity) {
if (mqtt_format() != Format::HA) {
return;
}
StaticJsonDocument<EMSESP_MAX_JSON_SIZE_MEDIUM> doc;
doc["name"] = name;
doc["uniq_id"] = entity;
std::string state_t(50, '\0');
snprintf_P(&state_t[0], state_t.capacity() + 1, PSTR("%s/%s"), hostname_.c_str(), entity);
doc["stat_t"] = state_t;
EMSESP::emsespSettingsService.read([&](EMSESPSettings & settings) {
if (settings.bool_format == BOOL_FORMAT_ONOFF) {
doc[F("payload_on")] = F("on");
doc[F("payload_off")] = F("off");
} else if (settings.bool_format == BOOL_FORMAT_TRUEFALSE) {
doc[F("payload_on")] = F("true");
doc[F("payload_off")] = F("false");
} else {
doc[F("payload_on")] = "1";
doc[F("payload_off")] = "0";
}
});
JsonObject dev = doc.createNestedObject(F("dev"));
JsonArray ids = dev.createNestedArray(F("ids"));
ids.add(F("ems-esp"));
std::string topic(100, '\0');
snprintf_P(&topic[0], topic.capacity() + 1, PSTR("homeassistant/binary_sensor/ems-esp/%s/config"), entity);
Mqtt::publish_retain(topic, doc.as<JsonObject>(), true); // publish the config payload with retain flag
}
// HA config for a normal sensor
void Mqtt::register_mqtt_ha_sensor(const __FlashStringHelper * name, const uint8_t device_type, const char * entity, const char * uom, const char * icon) {
if (mqtt_format() != Format::HA) {
return;
}
StaticJsonDocument<EMSESP_MAX_JSON_SIZE_MEDIUM> doc;
doc["name"] = name;
std::string uniq(50, '\0');
snprintf_P(&uniq[0], uniq.capacity() + 1, PSTR("%s"), entity);
doc["uniq_id"] = uniq;
doc["unit_of_meas"] = uom;
std::string state_t(50, '\0');
snprintf_P(&state_t[0], state_t.capacity() + 1, PSTR("%s/%s_data"), hostname_.c_str(), EMSdevice::device_type_2_device_name(device_type).c_str());
doc["stat_t"] = state_t;
std::string tpl(50, '\0');
snprintf_P(&tpl[0], tpl.capacity() + 1, PSTR("{{value_json.%s}}"), entity);
doc["val_tpl"] = tpl;
if (strlen(icon)) {
doc["ic"] = icon;
}
JsonObject dev = doc.createNestedObject(F("dev"));
JsonArray ids = dev.createNestedArray(F("ids"));
ids.add(F("ems-esp"));
std::string topic(100, '\0');
snprintf_P(&topic[0], topic.capacity() + 1, PSTR("homeassistant/sensor/ems-esp/%s/config"), entity);
Mqtt::publish_retain(topic, doc.as<JsonObject>(), true); // publish the config payload with retain flag
}
} // namespace emsesp