add Mqtt-base on publish, do not store fulltopic in each message

This commit is contained in:
MichaelDvP
2021-03-04 16:10:34 +01:00
parent e86df03b66
commit 300fff1909
2 changed files with 109 additions and 55 deletions

View File

@@ -83,7 +83,7 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_
// register in our libary with the callback function.
// We store both the original topic and the fully-qualified one
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(message->topic), std::move(cb));
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb));
}
// subscribe to the command topic if it doesn't exist yet
@@ -131,37 +131,51 @@ void Mqtt::loop() {
uint32_t currentMillis = uuid::get_uptime();
// create publish messages for each of the EMS device values, adding to queue
if (publish_time_boiler_ && (currentMillis - last_publish_boiler_ > publish_time_boiler_)) {
last_publish_boiler_ = currentMillis;
EMSESP::publish_device_values(EMSdevice::DeviceType::BOILER);
}
if (publish_time_thermostat_ && (currentMillis - last_publish_thermostat_ > publish_time_thermostat_)) {
last_publish_thermostat_ = currentMillis;
EMSESP::publish_device_values(EMSdevice::DeviceType::THERMOSTAT);
}
if (publish_time_solar_ && (currentMillis - last_publish_solar_ > publish_time_solar_)) {
last_publish_solar_ = currentMillis;
EMSESP::publish_device_values(EMSdevice::DeviceType::SOLAR);
}
if (publish_time_mixer_ && (currentMillis - last_publish_mixer_ > publish_time_mixer_)) {
last_publish_mixer_ = currentMillis;
EMSESP::publish_device_values(EMSdevice::DeviceType::MIXER);
}
if (currentMillis - last_publish_sensor_ > publish_time_sensor_) {
last_publish_sensor_ = currentMillis;
EMSESP::publish_sensor_values(publish_time_sensor_ != 0);
}
// publish top item from MQTT queue to stop flooding
if ((uint32_t)(currentMillis - last_mqtt_poll_) > MQTT_PUBLISH_WAIT) {
last_mqtt_poll_ = currentMillis;
process_queue();
}
// dallas publish on change
if (!publish_time_sensor_) {
EMSESP::publish_sensor_values(false);
}
if (!mqtt_messages_.empty()) {
return;
}
// create publish messages for each of the EMS device values, adding to queue, only one device per loop
if (publish_time_boiler_ && (currentMillis - last_publish_boiler_ > publish_time_boiler_)) {
last_publish_boiler_ = (currentMillis / publish_time_boiler_) * publish_time_boiler_;
EMSESP::publish_device_values(EMSdevice::DeviceType::BOILER);
} else
if (publish_time_thermostat_ && (currentMillis - last_publish_thermostat_ > publish_time_thermostat_)) {
last_publish_thermostat_ = (currentMillis / publish_time_thermostat_) * publish_time_thermostat_;
EMSESP::publish_device_values(EMSdevice::DeviceType::THERMOSTAT);
} else
if (publish_time_solar_ && (currentMillis - last_publish_solar_ > publish_time_solar_)) {
last_publish_solar_ = (currentMillis / publish_time_solar_) * publish_time_solar_;
EMSESP::publish_device_values(EMSdevice::DeviceType::SOLAR);
} else
if (publish_time_mixer_ && (currentMillis - last_publish_mixer_ > publish_time_mixer_)) {
last_publish_mixer_ = (currentMillis / publish_time_mixer_) * publish_time_mixer_;
EMSESP::publish_device_values(EMSdevice::DeviceType::MIXER);
} else
if (publish_time_other_ && (currentMillis - last_publish_other_ > publish_time_other_)) {
last_publish_other_ = (currentMillis / publish_time_other_) * publish_time_other_;
EMSESP::publish_other_values();
} else
if (publish_time_sensor_ && (currentMillis - last_publish_sensor_ > publish_time_sensor_)) {
last_publish_sensor_ = (currentMillis / publish_time_sensor_) * publish_time_sensor_;
EMSESP::publish_sensor_values(true);
}
}
// print MQTT log and other stuff to console
@@ -174,7 +188,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
// show subscriptions
shell.printfln(F("MQTT topic subscriptions:"));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
shell.printfln(F(" %s"), mqtt_subfunction.full_topic_.c_str());
shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
}
shell.println();
@@ -214,10 +228,17 @@ void Mqtt::incoming(const char * topic, const char * payload) {
}
// received an MQTT message that we subscribed too
void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
void Mqtt::on_message(const char * fulltopic, const char * payload, size_t len) {
if (len == 0) {
LOG_DEBUG(F("Received empty message %s"), fulltopic);
return; // ignore empty payloads
}
if (strncmp(fulltopic, mqtt_base_.c_str(), strlen(mqtt_base_.c_str())) != 0) {
LOG_DEBUG(F("Received unknown message %s - %s"), fulltopic, payload);
return; // not for us
}
char topic[100];
strlcpy(topic, &fulltopic[1 + strlen(mqtt_base_.c_str())], 100);
// convert payload to a null-terminated char string
char message[len + 2];
@@ -227,7 +248,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
// see if we have this topic in our subscription list, then call its callback handler
for (const auto & mf : mqtt_subfunctions_) {
if (strcmp(topic, mf.full_topic_.c_str()) == 0) {
if (strcmp(topic, mf.topic_.c_str()) == 0) {
if (mf.mqtt_subfunction_) {
// matching function, call it. If it returns true keep quit
if ((mf.mqtt_subfunction_)(message)) {
@@ -293,7 +314,7 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
shell.print(F(" Subscribed MQTT topics: "));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if (mqtt_subfunction.device_type_ == device_type) {
shell.printf(F("%s "), mqtt_subfunction.topic_.c_str());
shell.printf(F("%s/%s "), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
}
}
shell.println();
@@ -306,6 +327,9 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
void Mqtt::on_publish(uint16_t packetId) {
// find the MQTT message in the queue and remove it
if (mqtt_messages_.empty()) {
#if defined(EMSESP_DEBUG)
LOG_DEBUG(F("[DEBUG] No message stored for ACK pid %d"), packetId);
#endif
return;
}
@@ -313,6 +337,9 @@ void Mqtt::on_publish(uint16_t packetId) {
// if the last published failed, don't bother checking it. wait for the next retry
if (mqtt_message.packet_id_ == 0) {
#if defined(EMSESP_DEBUG)
LOG_DEBUG(F("[DEBUG] ACK for failed message pid 0"));
#endif
return;
}
@@ -321,6 +348,10 @@ void Mqtt::on_publish(uint16_t packetId) {
mqtt_publish_fails_++; // increment error count
}
#if defined(EMSESP_DEBUG)
LOG_DEBUG(F("[DEBUG] ACK pid %d"), packetId);
#endif
mqtt_messages_.pop_front(); // always remove from queue, regardless if there was a successful ACK
}
@@ -356,6 +387,9 @@ void Mqtt::start() {
mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); });
mqttClient_->onDisconnect([this](AsyncMqttClientDisconnectReason reason) {
if (!connecting_) {
return;
}
connecting_ = false;
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
LOG_INFO(F("MQTT disconnected: TCP"));
@@ -372,6 +406,14 @@ void Mqtt::start() {
if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
LOG_INFO(F("MQTT disconnected: Not authorized"));
}
// remove message with pending ack
if (!mqtt_messages_.empty()) {
auto mqtt_message = mqtt_messages_.front();
if (mqtt_message.packet_id_ != 0) {
mqtt_messages_.pop_front();
}
}
// mqtt_messages_.clear();
});
// create will_topic with the base prefixed. It has to be static because asyncmqttclient destroys the reference
@@ -390,7 +432,7 @@ void Mqtt::start() {
});
// create space for command buffer, to avoid heap memory fragmentation
mqtt_subfunctions_.reserve(10);
mqtt_subfunctions_.reserve(50);
}
void Mqtt::set_publish_time_boiler(uint16_t publish_time) {
@@ -469,11 +511,15 @@ void Mqtt::on_connect() {
bool_format_ = mqttSettings.bool_format;
});
// first time to connect
if (connectcount_ == 1) {
// send info topic appended with the version information as JSON
StaticJsonDocument<EMSESP_JSON_SIZE_SMALL> doc;
// first time to connect
if (connectcount_ == 1) {
doc["event"] = FJSON("start");
} else {
doc["event"] = FJSON("reconnect");
}
doc["version"] = EMSESP_APP_VERSION;
#ifndef EMSESP_STANDALONE
doc["ip"] = WiFi.localIP().toString();
@@ -489,7 +535,8 @@ void Mqtt::on_connect() {
EMSESP::shower_.send_mqtt_stat(false); // Send shower_activated as false
EMSESP::system_.send_heartbeat(); // send heatbeat
} else {
// } else {
if (connectcount_ > 1) {
// we doing a re-connect from a TCP break
// only re-subscribe again to all MQTT topics
resubscribe();
@@ -559,15 +606,7 @@ std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation,
// take the topic and prefix the base, unless its for HA
std::shared_ptr<MqttMessage> message;
if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0)) {
// leave topic as it is
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
} else {
// prefix the base
std::string full_topic(MQTT_TOPIC_MAX_SIZE, '\0');
snprintf_P(&full_topic[0], full_topic.capacity() + 1, PSTR("%s/%s"), mqtt_base_.c_str(), topic.c_str());
message = std::make_shared<MqttMessage>(operation, full_topic, payload, retain);
}
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
// LOG_INFO("Added to queue: %s %s", message->topic.c_str(), message->payload.c_str()); // debugging only
@@ -677,13 +716,20 @@ void Mqtt::process_queue() {
// fetch first from queue and create the full topic name
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;
char topic[MQTT_TOPIC_MAX_SIZE];
if ((strncmp(message->topic.c_str(), "homeassistant/", 13) == 0)) {
// leave topic as it is
strcpy(topic, message->topic.c_str());
} else {
snprintf_P(topic, MQTT_TOPIC_MAX_SIZE, PSTR("%s/%s"), mqtt_base_.c_str(), message->topic.c_str());
}
// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
LOG_DEBUG(F("Subscribing to topic: %s"), message->topic.c_str());
uint16_t packet_id = mqttClient_->subscribe(message->topic.c_str(), mqtt_qos_);
LOG_DEBUG(F("Subscribing to topic: %s"), topic);
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
if (!packet_id) {
LOG_DEBUG(F("Error subscribing to %s"), message->topic.c_str());
LOG_DEBUG(F("Error subscribing to %s"), topic);
}
mqtt_messages_.pop_front(); // remove the message from the queue
@@ -694,24 +740,27 @@ void Mqtt::process_queue() {
// if this has already been published and we're waiting for an ACK, don't publish again
// it will have a real packet ID
if (mqtt_message.packet_id_ > 0) {
#if defined(EMSESP_DEBUG)
LOG_DEBUG(F("[DEBUG] Waitig for QOS-ACK"));
#endif
return;
}
// else try and publish it
uint16_t packet_id = mqttClient_->publish(message->topic.c_str(), mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, try#%d, size %d, pid %d)"), message->topic.c_str(), mqtt_message.id_, message->retain, mqtt_message.retry_count_ + 1, message->payload.size(), packet_id);
uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
LOG_DEBUG(F("Publishing topic %s (#%02d, retain=%d, try#%d, size %d, pid %d)"), topic, mqtt_message.id_, message->retain, mqtt_message.retry_count_ + 1, message->payload.size(), packet_id);
if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue
if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) {
LOG_ERROR(F("Failed to publish to %s after %d attempts"), message->topic.c_str(), mqtt_message.retry_count_ + 1);
LOG_ERROR(F("Failed to publish to %s after %d attempts"), topic, mqtt_message.retry_count_ + 1);
mqtt_publish_fails_++; // increment failure counter
mqtt_messages_.pop_front(); // delete
return;
} else {
// update the record
mqtt_messages_.front().retry_count_++;
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), message->topic.c_str(), mqtt_message.retry_count_ + 1);
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), topic, mqtt_message.retry_count_ + 1);
return; // leave on queue for next time so it gets republished
}
}
@@ -818,7 +867,7 @@ void Mqtt::publish_mqtt_ha_sensor(uint8_t type, // EMSdevice
snprintf_P(topic, sizeof(topic), PSTR("homeassistant/sensor/%s/%s/config"), mqtt_base_.c_str(), uniq.c_str()); // topic
// unit of measure and map the HA icon
if (uom != DeviceValueUOM::NONE) {
if (uom != DeviceValueUOM::NONE && uom != DeviceValueUOM::PUMP) {
doc["unit_of_meas"] = EMSdevice::uom_to_string(uom);
}
switch (uom) {
@@ -828,6 +877,9 @@ void Mqtt::publish_mqtt_ha_sensor(uint8_t type, // EMSdevice
case DeviceValueUOM::PERCENT:
doc["ic"] = F_(iconpercent);
break;
case DeviceValueUOM::PUMP:
doc["ic"] = F_(iconpump);
break;
case DeviceValueUOM::NONE:
default:
break;

View File

@@ -180,6 +180,10 @@ class Mqtt {
mqtt_retain_ = mqtt_retain;
}
static bool is_empty() {
return mqtt_messages_.empty();
}
/*
struct QueuedMqttMessage {
uint16_t id_;
@@ -226,13 +230,11 @@ class Mqtt {
struct MQTTSubFunction {
uint8_t device_type_; // which device type, from DeviceType::
const std::string topic_; // short topic name
const std::string full_topic_; // the fully qualified topic name, usually with the hostname prefixed
mqtt_subfunction_p mqtt_subfunction_; // can be empty
MQTTSubFunction(uint8_t device_type, const std::string && topic, const std::string && full_topic, mqtt_subfunction_p mqtt_subfunction)
MQTTSubFunction(uint8_t device_type, const std::string && topic, mqtt_subfunction_p mqtt_subfunction)
: device_type_(device_type)
, topic_(topic)
, full_topic_(full_topic)
, mqtt_subfunction_(mqtt_subfunction) {
}
};