Refactor MQTT subscriptions #173

This commit is contained in:
proddy
2021-11-01 23:31:30 +01:00
parent 40a7026d4c
commit 01bace4048
38 changed files with 873 additions and 942 deletions

View File

@@ -38,7 +38,7 @@ bool Mqtt::mqtt_enabled_;
uint8_t Mqtt::ha_climate_format_;
bool Mqtt::ha_enabled_;
uint8_t Mqtt::nested_format_;
uint8_t Mqtt::subscribe_format_;
bool Mqtt::send_response_;
std::deque<Mqtt::QueuedMqttMessage> Mqtt::mqtt_messages_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
@@ -54,16 +54,17 @@ uuid::log::Logger Mqtt::logger_{F_(mqtt), uuid::log::Facility::DAEMON};
// subscribe to an MQTT topic, and store the associated callback function
// only if it already hasn't been added
// topics exclude the base
void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_sub_function_p cb) {
// check if we already have the topic subscribed for this specific device type, if so don't add it again
// add the function (in case its not there) and quit because it already exists
if (!mqtt_subfunctions_.empty()) {
for (auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) {
// add the function (in case its not there) and quit because it already exists
if (cb) {
mqtt_subfunction.mqtt_subfunction_ = cb;
}
return;
return; // exit - don't add
}
}
}
@@ -80,86 +81,23 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_
queue_subscribe_message(topic);
}
// subscribe to the command topic if it doesn't exist yet
void Mqtt::sub_command(const uint8_t device_type, const __FlashStringHelper * cmd, cmdfunction_p cb, uint8_t flags) {
if (!mqtt_enabled_) {
return;
}
std::string topic = EMSdevice::device_type_2_device_name(device_type); // thermostat, boiler, etc...
// see if we have already a handler for the device type (boiler, thermostat). If not add it
bool exists = false;
if (!mqtt_subfunctions_.empty()) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) {
exists = true;
}
}
}
if (!exists) {
Mqtt::subscribe(device_type, topic, nullptr); // use an empty function handler to signal this is a command function only (e.g. ems-esp/boiler)
}
// add the individual commands too (e.g. ems-esp/boiler/wwonetime)
// https://github.com/emsesp/EMS-ESP32/issues/31
if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && ((flags & CommandFlag::MQTT_SUB_FLAG_HC) == CommandFlag::MQTT_SUB_FLAG_HC)) {
std::string hc_topic(MQTT_TOPIC_MAX_SIZE, '\0');
hc_topic = topic + "/hc1/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
hc_topic = topic + "/hc2/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
hc_topic = topic + "/hc3/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
hc_topic = topic + "/hc4/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
} else if (subscribe_format_ != Subscribe_Format::GENERAL && ((flags & CommandFlag::MQTT_SUB_FLAG_NOSUB) != CommandFlag::MQTT_SUB_FLAG_NOSUB)) {
std::string hc_topic(MQTT_TOPIC_MAX_SIZE, '\0');
hc_topic = topic + "/" + uuid::read_flash_string(cmd);
queue_subscribe_message(hc_topic);
}
}
// subscribe to an MQTT topic, and store the associated callback function
// For generic functions not tied to a specific device
void Mqtt::subscribe(const std::string & topic, mqtt_sub_function_p cb) {
subscribe(0, topic, cb); // no device_id needed if generic to EMS-ESP
}
// resubscribe to all MQTT topics
// if it's already in the queue, ignore it
void Mqtt::resubscribe() {
if (mqtt_subfunctions_.empty()) {
return;
}
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
// if it's already in the queue, ignore it
bool found = false;
for (const auto & message : mqtt_messages_) {
found |= ((message.content_->operation == Operation::SUBSCRIBE) && (mqtt_subfunction.topic_ == message.content_->topic));
}
if (!found) {
queue_subscribe_message(mqtt_subfunction.topic_);
}
}
for (const auto & cf : Command::commands()) {
std::string topic(MQTT_TOPIC_MAX_SIZE, '\0');
if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && cf.has_flags(CommandFlag::MQTT_SUB_FLAG_HC)) {
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc1/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc2/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc3/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc4/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
} else if (subscribe_format_ != Subscribe_Format::GENERAL && !cf.has_flags(CommandFlag::MQTT_SUB_FLAG_NOSUB)) {
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
}
}
}
// Main MQTT loop - sends out top item on publish queue
@@ -230,33 +168,6 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
}
// now show the commands...
for (const auto & cf : Command::commands()) {
if (subscribe_format_ == Subscribe_Format::INDIVIDUAL_ALL_HC && cf.has_flags(CommandFlag::MQTT_SUB_FLAG_HC)) {
shell.printfln(F(" %s/%s/hc1/%s"),
mqtt_base_.c_str(),
EMSdevice::device_type_2_device_name(cf.device_type_).c_str(),
uuid::read_flash_string(cf.cmd_).c_str());
shell.printfln(F(" %s/%s/hc2/%s"),
mqtt_base_.c_str(),
EMSdevice::device_type_2_device_name(cf.device_type_).c_str(),
uuid::read_flash_string(cf.cmd_).c_str());
shell.printfln(F(" %s/%s/hc3/%s"),
mqtt_base_.c_str(),
EMSdevice::device_type_2_device_name(cf.device_type_).c_str(),
uuid::read_flash_string(cf.cmd_).c_str());
shell.printfln(F(" %s/%s/hc4/%s"),
mqtt_base_.c_str(),
EMSdevice::device_type_2_device_name(cf.device_type_).c_str(),
uuid::read_flash_string(cf.cmd_).c_str());
} else if (subscribe_format_ != Subscribe_Format::GENERAL && !cf.has_flags(CommandFlag::MQTT_SUB_FLAG_NOSUB)) {
shell.printfln(F(" %s/%s/%s"),
mqtt_base_.c_str(),
EMSdevice::device_type_2_device_name(cf.device_type_).c_str(),
uuid::read_flash_string(cf.cmd_).c_str());
}
}
shell.println();
// show queues
@@ -301,129 +212,75 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
shell.println();
}
#if defined(EMSESP_DEBUG)
// simulate receiving a MQTT message, used only for testing
void Mqtt::incoming(const char * topic, const char * payload) {
on_message(topic, payload, strlen(payload));
}
#endif
// received an MQTT message that we subscribed too
void Mqtt::on_message(const char * fulltopic, const char * payload, size_t len) {
if (len == 0) {
LOG_DEBUG(F("Received empty message %s"), fulltopic);
return; // ignore empty payloads
}
if (strncmp(fulltopic, mqtt_base_.c_str(), strlen(mqtt_base_.c_str())) != 0) {
LOG_DEBUG(F("Received unknown message %s - %s"), fulltopic, payload);
return; // not for us
}
char topic[100];
strlcpy(topic, &fulltopic[1 + strlen(mqtt_base_.c_str())], 100);
// strip the topic substrings
char * topic_end = strchr(topic, '/');
if (topic_end != nullptr) {
topic_end[0] = '\0';
}
// topic is the full path
// payload is json or a single string and converted to a json with key 'value'
void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
// sometimes the payload is not terminated correctly, so make a copy
// convert payload to a null-terminated char string
char message[len + 2];
strlcpy(message, payload, len + 1);
LOG_DEBUG(F("Received %s => %s (length %d)"), topic, message, len);
#if defined(EMSESP_DEBUG)
if (len) {
LOG_DEBUG(F("Received topic `%s` => payload `%s` (length %d)"), topic, message, len);
} else {
LOG_DEBUG(F("Received topic `%s`"), topic);
}
#endif
// see if we have this topic in our subscription list, then call its callback handler
// check first againts any of our subscribed topics
for (const auto & mf : mqtt_subfunctions_) {
if (strcmp(topic, mf.topic_.c_str()) == 0) {
// if we have callback function then call it
// otherwise proceed as process as a command
// add the base back
char full_topic[MQTT_TOPIC_MAX_SIZE];
snprintf(full_topic, sizeof(full_topic), "%s/%s", mqtt_base_.c_str(), mf.topic_.c_str());
if (!strcmp(topic, full_topic)) {
if (mf.mqtt_subfunction_) {
if (!(mf.mqtt_subfunction_)(message)) {
LOG_ERROR(F("MQTT error: invalid payload %s for this topic %s"), message, topic);
Mqtt::publish(F_(response), "invalid");
LOG_ERROR(F("error: invalid payload %s for this topic %s"), message, topic);
if (send_response_) {
Mqtt::publish(F_(response), "error: invalid data");
}
}
return;
}
// check if it's not json, then try and extract the command from the topic name
if (message[0] != '{') {
// get topic with substrings again
strlcpy(topic, &fulltopic[1 + strlen(mqtt_base_.c_str())], 100);
char * cmd_only = strchr(topic, '/');
if (cmd_only == NULL) {
return; // invalid topic name
}
cmd_only++; // skip the /
// LOG_INFO(F("devicetype= %d, topic = %s, cmd = %s, message = %s), mf.device_type_, topic, cmd_only, message);
// call command, assume admin authentication is allowed
uint8_t cmd_return = Command::call(mf.device_type_, cmd_only, message, true);
if (cmd_return == CommandRet::NOT_FOUND) {
LOG_ERROR(F("No matching cmd (%s) in topic %s"), cmd_only, topic);
Mqtt::publish(F_(response), "unknown");
} else if (cmd_return != CommandRet::OK) {
LOG_ERROR(F("Invalid data with cmd (%s) in topic %s"), cmd_only, topic);
Mqtt::publish(F_(response), "unknown");
}
return;
}
// It's a command then with the payload being JSON like {"cmd":"<cmd>", "data":<data>, "id":<n>}
// Find the command from the json and call it directly
StaticJsonDocument<EMSESP_JSON_SIZE_SMALL> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
LOG_ERROR(F("MQTT error: payload %s, error %s"), message, error.c_str());
return;
}
const char * command = doc["cmd"];
if (command == nullptr) {
LOG_ERROR(F("MQTT error: invalid payload cmd format. message=%s"), message);
return;
}
// check for hc and id, and convert to int
int8_t n = -1; // no value
if (doc.containsKey("hc")) {
n = doc["hc"];
} else if (doc.containsKey("id")) {
n = doc["id"];
}
uint8_t cmd_return = CommandRet::OK;
JsonVariant data = doc["data"];
if (data.is<const char *>()) {
cmd_return = Command::call(mf.device_type_, command, data.as<const char *>(), true, n);
} else if (data.is<int>()) {
char data_str[10];
cmd_return = Command::call(mf.device_type_, command, Helpers::itoa(data_str, (int16_t)data.as<int>()), true, n);
} else if (data.is<float>()) {
char data_str[10];
cmd_return = Command::call(mf.device_type_, command, Helpers::render_value(data_str, (float)data.as<float>(), 2), true, n);
} else if (data.isNull()) {
DynamicJsonDocument resp(EMSESP_JSON_SIZE_XLARGE_DYN);
JsonObject json = resp.to<JsonObject>();
cmd_return = Command::call(mf.device_type_, command, "", true, n, json);
if (json.size()) {
Mqtt::publish(F_(response), resp.as<JsonObject>());
return;
}
}
if (cmd_return == CommandRet::NOT_FOUND) {
LOG_ERROR(F("No matching cmd (%s)"), command);
Mqtt::publish(F_(response), "unknown");
} else if (cmd_return != CommandRet::OK) {
LOG_ERROR(F("Invalid data for cmd (%s)"), command);
Mqtt::publish(F_(response), "unknown");
}
return;
}
}
// if we got here we didn't find a topic match
LOG_ERROR(F("No MQTT handler found for topic %s and payload %s"), topic, message);
// convert payload into a json doc, if it's not empty
// if the payload is a single parameter (not JSON) create a JSON with the key 'value'
StaticJsonDocument<EMSESP_JSON_SIZE_SMALL> input;
if (len != 0) {
DeserializationError error = deserializeJson(input, message);
if (error == DeserializationError::Code::InvalidInput) {
input.clear(); // this is important to clear first
input["value"] = (const char *)message; // always a string
}
}
// parse and call the command
StaticJsonDocument<EMSESP_JSON_SIZE_LARGE_DYN> output_doc;
JsonObject output = output_doc.to<JsonObject>();
uint8_t return_code = Command::process(topic, true, input.as<JsonObject>(), output); // mqtt is always authenticated
// send MQTT response if enabled
if (!send_response_ || output.isNull()) {
return;
}
if (return_code != CommandRet::OK) {
Mqtt::publish(F_(response), (const char *)output["message"]);
} else {
Mqtt::publish(F_(response), output); // output response from call
}
}
// print all the topics related to a specific device type
@@ -438,7 +295,7 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
shell.print(F(" Subscribed MQTT topics: "));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if (mqtt_subfunction.device_type_ == device_type) {
shell.printf(F("%s/%s "), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
shell.printf(F("%s "), mqtt_subfunction.topic_.c_str());
}
}
shell.println();
@@ -499,7 +356,7 @@ void Mqtt::load_settings() {
ha_enabled_ = mqttSettings.ha_enabled;
ha_climate_format_ = mqttSettings.ha_climate_format;
nested_format_ = mqttSettings.nested_format;
subscribe_format_ = mqttSettings.subscribe_format;
send_response_ = mqttSettings.send_response;
// convert to milliseconds
publish_time_boiler_ = mqttSettings.publish_time_boiler * 1000;
@@ -564,17 +421,12 @@ void Mqtt::start() {
mqttClient_->setWill(will_topic, 1, true, "offline"); // with qos 1, retain true
mqttClient_->onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
// receiving mqtt
on_message(topic, payload, len);
on_message(topic, payload, len); // receiving mqtt
});
mqttClient_->onPublish([this](uint16_t packetId) {
// publish
on_publish(packetId);
on_publish(packetId); // publish
});
// register command
Command::add(EMSdevice::DeviceType::SYSTEM, F_(publish), System::command_publish, F("forces a MQTT publish"), CommandFlag::ADMIN_ONLY);
}
void Mqtt::set_publish_time_boiler(uint16_t publish_time) {
@@ -672,8 +524,9 @@ void Mqtt::on_connect() {
EMSESP::shower_.send_mqtt_stat(false); // Send shower_activated as false
EMSESP::system_.send_heartbeat(); // send heatbeat
// re-subscribe to all MQTT topics
// re-subscribe to all custom registered MQTT topics
resubscribe();
EMSESP::reset_mqtt_ha(); // re-create all HA devices if there are any
publish_retain(F("status"), "online", true); // say we're alive to the Last Will topic, with retain on
@@ -740,22 +593,27 @@ void Mqtt::ha_status() {
}
// add sub or pub task to the queue.
// a fully-qualified topic is created by prefixing the base, unless it's HA
// returns a pointer to the message created
// the base is not included in the topic
std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain) {
if (topic.empty()) {
return nullptr;
}
// if it's a publish and the payload is empty, stop
if ((operation == Operation::PUBLISH) && (payload.empty())) {
return nullptr;
}
// take the topic and prefix the base, unless its for HA
std::shared_ptr<MqttMessage> message;
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
#ifdef EMSESP_DEBUG
if (operation == Operation::PUBLISH) {
LOG_INFO("Adding to queue: (Publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str());
LOG_INFO("[DEBUG] Adding to queue: (Publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str());
} else {
LOG_INFO("Adding to queue: (Subscribe) topic='%s'", message->topic.c_str());
LOG_INFO("[DEBUG] Adding to queue: (Subscribe) topic='%s'", message->topic.c_str());
}
#endif
@@ -867,9 +725,9 @@ void Mqtt::process_queue() {
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;
char topic[MQTT_TOPIC_MAX_SIZE];
if (message->topic.find(uuid::read_flash_string(F_(homeassistant))) == 0) {
// leave topic as it is
strcpy(topic, message->topic.c_str());
strcpy(topic, message->topic.c_str()); // leave topic as it is
} else {
snprintf(topic, MQTT_TOPIC_MAX_SIZE, "%s/%s", mqtt_base_.c_str(), message->topic.c_str());
}
@@ -879,7 +737,7 @@ void Mqtt::process_queue() {
LOG_DEBUG(F("Subscribing to topic '%s'"), topic);
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
if (!packet_id) {
LOG_DEBUG(F("Error subscribing to topic '%s'"), topic);
LOG_ERROR(F("Error subscribing to topic '%s'"), topic);
}
mqtt_messages_.pop_front(); // remove the message from the queue
@@ -1150,5 +1008,4 @@ const std::string Mqtt::tag_to_topic(uint8_t device_type, uint8_t tag) {
}
}
} // namespace emsesp