use only espMqttClient queue

This commit is contained in:
MichaelDvP
2023-06-05 10:06:19 +02:00
parent d2ff44e1cf
commit b28865a283
5 changed files with 124 additions and 362 deletions

View File

@@ -47,13 +47,13 @@ bool Mqtt::send_response_;
bool Mqtt::publish_single_;
bool Mqtt::publish_single2cmd_;
std::deque<Mqtt::QueuedMqttMessage> Mqtt::mqtt_messages_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
uint32_t Mqtt::mqtt_publish_fails_ = 0;
bool Mqtt::connecting_ = false;
bool Mqtt::initialized_ = false;
bool Mqtt::ha_climate_reset_ = false;
uint16_t Mqtt::queuecount_ = 0;
uint8_t Mqtt::connectcount_ = 0;
uint32_t Mqtt::mqtt_message_id_ = 0;
char will_topic_[Mqtt::MQTT_TOPIC_MAX_SIZE]; // because MQTT library keeps only char pointer
@@ -95,7 +95,7 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_
// We store the original topic string without base
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb));
if (!enabled()) {
if (!enabled() || !connected()) {
return;
}
@@ -117,14 +117,7 @@ void Mqtt::resubscribe() {
}
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
bool found = false;
for (const auto & message : mqtt_messages_) {
found |= ((message.content_->operation == Operation::SUBSCRIBE) && (mqtt_subfunction.topic_ == message.content_->topic));
}
if (!found) {
queue_subscribe_message(mqtt_subfunction.topic_);
}
queue_subscribe_message(mqtt_subfunction.topic_);
}
}
@@ -137,12 +130,6 @@ void Mqtt::loop() {
uint32_t currentMillis = uuid::get_uptime();
// publish MQTT queue, but timed to avoid overloading the TCP pipe
if ((uint32_t)(currentMillis - last_mqtt_poll_) > MQTT_PUBLISH_WAIT) {
last_mqtt_poll_ = currentMillis;
process_queue();
}
// send heartbeat
if ((currentMillis - last_publish_heartbeat_ > publish_time_heartbeat_)) {
last_publish_heartbeat_ = currentMillis;
@@ -154,7 +141,8 @@ void Mqtt::loop() {
EMSESP::publish_sensor_values(false);
}
if (!mqtt_messages_.empty()) {
queuecount_ = mqttClient_->getQueue();
if (queuecount_ > 0) {
return;
}
@@ -197,6 +185,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
shell.printfln("MQTT Entity ID format is %d", entity_format_);
shell.printfln("MQTT publish errors: %lu", mqtt_publish_fails_);
shell.printfln("MQTT queue: %d", queuecount_);
shell.println();
// show subscriptions
@@ -206,47 +195,6 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
}
shell.println();
// show queues
if (mqtt_messages_.empty()) {
shell.printfln("MQTT queue is empty");
shell.println();
return;
}
shell.printfln("MQTT queue (%d):", mqtt_messages_.size());
for (const auto & message : mqtt_messages_) {
auto content = message.content_;
char topic[MQTT_TOPIC_MAX_SIZE];
// prefix base, only if it's not a discovery topic
if (content->topic.compare(0, discovery_prefix().size(), discovery_prefix()) == 0) {
snprintf(topic, sizeof(topic), "%s/%s", mqtt_base_.c_str(), content->topic.c_str());
} else {
snprintf(topic, sizeof(topic), "%s", content->topic.c_str());
}
if (content->operation == Operation::PUBLISH) {
// Publish messages
if (message.retry_count_ == 0) {
if (message.packet_id_ == 0) {
shell.printfln(" [%02d] (Pub) topic=%s payload=%s", message.id_, topic, content->payload.c_str());
} else {
shell.printfln(" [%02d] (Pub) topic=%s payload=%s (pid %d)", message.id_, topic, content->payload.c_str(), message.packet_id_);
}
} else {
shell.printfln(" [%02d] (Pub) topic=%s payload=%s (pid %d, retry #%d)",
message.id_,
topic,
content->payload.c_str(),
message.packet_id_,
message.retry_count_);
}
} else {
// Subscribe messages
shell.printfln(" [%02d] (Sub) topic=%s", message.id_, topic);
}
}
shell.println();
}
@@ -265,7 +213,7 @@ void Mqtt::incoming(const char * topic, const char * payload) {
void Mqtt::on_message(const char * topic, const char * payload, size_t len) const {
// sometimes the payload is not terminated correctly, so make a copy
// convert payload to a null-terminated char string
char message[len + 2];
char message[len + 2] = {'\0'};
if (payload != nullptr) {
strlcpy(message, payload, len + 1);
}
@@ -366,42 +314,12 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
}
// called when an MQTT Publish ACK is received
// its a poor man's QOS we assume the ACK represents the last Publish sent
// check if ACK matches the last Publish we sent, if not report an error. Only if qos is 1 or 2
// and always remove from queue
void Mqtt::on_publish(uint16_t packetId) const {
// find the MQTT message in the queue and remove it
if (mqtt_messages_.empty()) {
LOG_DEBUG("No message stored for ACK pid %d", packetId);
return;
}
auto mqtt_message = mqtt_messages_.front();
// if the last published failed, don't bother checking it. wait for the next retry
if (mqtt_message.packet_id_ == 0) {
#if defined(EMSESP_DEBUG)
LOG_DEBUG("ACK for failed message pid 0");
#endif
return;
}
if (mqtt_message.packet_id_ != packetId) {
LOG_ERROR("Mismatch, expecting PID %d, got %d", mqtt_message.packet_id_, packetId);
mqtt_publish_fails_++; // increment error count
}
LOG_DEBUG("ACK pid %d", packetId);
mqtt_messages_.pop_front(); // always remove from queue, regardless if there was a successful ACK
LOG_DEBUG("Packet %d sent successfull", packetId);
}
// called when MQTT settings have changed via the Web forms
void Mqtt::reset_mqtt() {
if (!mqtt_enabled_) {
mqtt_messages_.clear();
}
if (!mqttClient_) {
return;
}
@@ -465,6 +383,8 @@ void Mqtt::start() {
static char will_topic[MQTT_TOPIC_MAX_SIZE];
if (!mqtt_base_.empty()) {
snprintf(will_topic, MQTT_TOPIC_MAX_SIZE, "%s/status", mqtt_base_.c_str());
} else {
snprintf(will_topic, MQTT_TOPIC_MAX_SIZE, "status");
}
mqttClient_->setWill(will_topic, 1, true, "offline"); // with qos 1, retain true
@@ -474,9 +394,11 @@ void Mqtt::start() {
on_message(topic, (const char *)payload, len); // receiving mqtt
});
/*
mqttClient_->onPublish([this](uint16_t packetId) {
on_publish(packetId); // publish
});
*/
}
void Mqtt::set_publish_time_boiler(uint16_t publish_time) {
@@ -567,6 +489,7 @@ void Mqtt::on_connect() {
connecting_ = true;
connectcount_++; // count # reconnects. not currently used.
queuecount_ = 0;
load_settings(); // reload MQTT settings - in case they have changes
@@ -595,15 +518,6 @@ void Mqtt::on_connect() {
queue_publish_retain("status", "online", true); // with retain on
mqtt_publish_fails_ = 0; // reset fail count to 0
/*
// for debugging only
LOG_INFO("Queue size: %d", mqtt_messages_.size());
for (const auto & message : mqtt_messages_) {
auto content = message.content_;
LOG_INFO(" [%02d] (%d) topic=%s payload=%s", message.id_, content->operation, content->topic.c_str(), content->payload.c_str());
}
*/
}
// Home Assistant Discovery - the main master Device called EMS-ESP
@@ -673,45 +587,44 @@ void Mqtt::ha_status() {
// add sub or pub task to the queue.
// the base is not included in the topic
void Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain) {
bool Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain) {
if (topic.empty()) {
return;
return false;
}
uint16_t packet_id = 0;
char fulltopic[MQTT_TOPIC_MAX_SIZE];
#ifndef EMSESP_STANDALONE
// anything below 60MB available free heap is dangerously low, so we stop adding to prevent a crash
// instead of doing a mqtt_messages_.pop_front()
if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES || (!emsesp::EMSESP::system_.PSram() && ESP.getFreeHeap() < (60 * 1024))) {
LOG_WARNING("Queue overflow (queue count=%d, topic=%s)", mqtt_messages_.size(), topic.c_str());
mqtt_publish_fails_++;
return; // don't add to top of queue
if (topic.find(discovery_prefix_) == 0) {
strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave topic as it is
} else {
// it's a discovery topic, added the mqtt base to the topic path
snprintf(fulltopic, sizeof(fulltopic), "%s/%s", mqtt_base_.c_str(), topic.c_str()); // uses base
}
#endif
// take the topic and prefix the base, unless its for HA
std::shared_ptr<MqttMessage> message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
if (operation == Operation::PUBLISH) {
if (message->payload.empty()) {
LOG_DEBUG("Adding to queue: (remove) topic='%s'", message->topic.c_str());
} else {
LOG_DEBUG("Adding to queue: (publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str());
}
} else {
LOG_DEBUG("Adding to queue: (subscribe) topic='%s'", message->topic.c_str());
packet_id = mqttClient_->publish(fulltopic, mqtt_qos_, retain, payload.c_str());
mqtt_message_id_++;
LOG_DEBUG("Publishing topic '%s', pid %d", fulltopic, packet_id);
} else if (operation == Operation::SUBSCRIBE) {
packet_id = mqttClient_->subscribe(fulltopic, mqtt_qos_);
LOG_DEBUG("Subscribing to topic '%s', pid %d", fulltopic, packet_id);
} else if (operation == Operation::UNSUBSCRIBE) {
packet_id = mqttClient_->unsubscribe(fulltopic);
LOG_DEBUG("Unsubscribing to topic '%s', pid %d", fulltopic, packet_id);
}
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
return;
if (packet_id == 0) {
LOG_WARNING("%s failed: %s", operation == Operation::PUBLISH ? "Publish" : operation == Operation::SUBSCRIBE ? "Subscribe" : "Unsubscribe", fulltopic);
mqtt_publish_fails_++;
}
return (packet_id != 0);
}
// add MQTT message to queue, payload is a string
void Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, bool retain) {
bool Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) {
if (!enabled()) {
return;
return false;
};
queue_message(Operation::PUBLISH, topic, payload, retain);
return queue_message(Operation::PUBLISH, topic, payload, retain);
}
// add MQTT subscribe message to queue
@@ -725,187 +638,77 @@ void Mqtt::queue_unsubscribe_message(const std::string & topic) {
}
// MQTT Publish, using a user's retain flag
void Mqtt::queue_publish(const std::string & topic, const std::string & payload) {
queue_publish_message(topic, payload, mqtt_retain_);
bool Mqtt::queue_publish(const std::string & topic, const std::string & payload) {
return queue_publish_message(topic, payload, mqtt_retain_);
}
// MQTT Publish, using a user's retain flag - except for char * strings
void Mqtt::queue_publish(const char * topic, const char * payload) {
queue_publish_message((topic), payload, mqtt_retain_);
bool Mqtt::queue_publish(const char * topic, const char * payload) {
return queue_publish_message((topic), payload, mqtt_retain_);
}
// MQTT Publish, using a specific retain flag, topic is a flash string
void Mqtt::queue_publish(const char * topic, const std::string & payload) {
queue_publish_message((topic), payload, mqtt_retain_);
bool Mqtt::queue_publish(const char * topic, const std::string & payload) {
return queue_publish_message((topic), payload, mqtt_retain_);
}
void Mqtt::queue_publish(const char * topic, const JsonObject & payload) {
queue_publish_retain(topic, payload, mqtt_retain_);
bool Mqtt::queue_publish(const char * topic, const JsonObject & payload) {
return queue_publish_retain(topic, payload, mqtt_retain_);
}
// publish json doc, only if its not empty
void Mqtt::queue_publish(const std::string & topic, const JsonObject & payload) {
queue_publish_retain(topic, payload, mqtt_retain_);
bool Mqtt::queue_publish(const std::string & topic, const JsonObject & payload) {
return queue_publish_retain(topic, payload, mqtt_retain_);
}
// MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag
void Mqtt::queue_publish_retain(const char * topic, const std::string & payload, bool retain) {
queue_publish_message((topic), payload, retain);
bool Mqtt::queue_publish_retain(const char * topic, const std::string & payload, const bool retain) {
return queue_publish_message((topic), payload, retain);
}
// publish json doc, only if its not empty, using the retain flag
void Mqtt::queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain) {
queue_publish_retain(topic.c_str(), payload, retain);
bool Mqtt::queue_publish_retain(const std::string & topic, const JsonObject & payload, const bool retain) {
return queue_publish_retain(topic.c_str(), payload, retain);
}
void Mqtt::queue_publish_retain(const char * topic, const JsonObject & payload, bool retain) {
bool Mqtt::queue_publish_retain(const char * topic, const JsonObject & payload, const bool retain) {
if (enabled() && payload.size()) {
std::string payload_text;
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(topic, payload_text, retain);
return queue_publish_message(topic, payload_text, retain);
}
return false;
}
// publish empty payload to remove the topic
void Mqtt::queue_remove_topic(const char * topic) {
bool Mqtt::queue_remove_topic(const char * topic) {
if (!enabled()) {
return;
return false;
}
if (ha_enabled_) {
queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker
return queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker
} else {
queue_publish_message(topic, "", true); // publish with retain to remove from broker
return queue_publish_message(topic, "", true); // publish with retain to remove from broker
}
}
// queue a Home Assistant config topic and payload, with retain flag off.
void Mqtt::queue_ha(const char * topic, const JsonObject & payload) {
bool Mqtt::queue_ha(const char * topic, const JsonObject & payload) {
if (!enabled()) {
return;
return false;
}
std::string payload_text;
payload_text.reserve(measureJson(payload) + 1);
serializeJson(payload, payload_text); // convert json to string
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(Mqtt::discovery_prefix() + topic, payload_text, true); // with retain true
}
// take top from queue and perform the publish or subscribe action
// assumes there is an MQTT connection
void Mqtt::process_queue() {
if (mqtt_messages_.empty()) {
return;
}
// fetch first from queue and create the full topic name
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;
char topic[MQTT_TOPIC_MAX_SIZE];
if (message->topic.find(discovery_prefix_) == 0) {
strlcpy(topic, message->topic.c_str(), sizeof(topic)); // leave topic as it is
} else {
// it's a discovery topic, added the mqtt base to the topic path
snprintf(topic, MQTT_TOPIC_MAX_SIZE, "%s/%s", mqtt_base_.c_str(), message->topic.c_str()); // uses base
}
// if this has already been published and we're waiting for an ACK, don't publish again
// it will have a real packet ID
if (mqtt_message.packet_id_ > 0) {
LOG_DEBUG("Waiting for QOS-ACK");
// if we don't get the ack within 10 seconds, republish with new packet_id
if (uuid::get_uptime_sec() - last_publish_queue_ < 10) {
return;
}
}
last_publish_queue_ = uuid::get_uptime_sec();
// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
LOG_DEBUG("Subscribing to topic '%s'", topic);
uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_);
if (!packet_id) {
if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) {
return;
}
LOG_ERROR("Error subscribing to topic '%s'", topic);
mqtt_publish_fails_++; // increment failure counter
}
mqtt_messages_.pop_front(); // remove the message from the queue
return;
}
// if we're unsubscribing...
if (message->operation == Operation::UNSUBSCRIBE) {
LOG_DEBUG("Subscribing to topic '%s'", topic);
uint16_t packet_id = mqttClient_->unsubscribe(topic);
if (!packet_id) {
if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) {
return;
}
LOG_ERROR("Error unsubscribing to topic '%s'", topic);
mqtt_publish_fails_++; // increment failure counter
}
mqtt_messages_.pop_front(); // remove the message from the queue
return;
}
// else try and publish it
// this is where the *real* publish happens
// uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str());
lasttopic_ = topic;
lastpayload_ = message->payload;
LOG_DEBUG("Published topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)",
topic,
mqtt_message.id_,
message->retain,
mqtt_message.retry_count_ + 1,
message->payload.size(),
packet_id);
/*
if (!message->payload.empty()) {
LOG_DEBUG("Payload: %s", message->payload.c_str()); // extra message for #784
}
*/
if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue
if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) {
LOG_ERROR("Failed to publish to %s after %d attempts", topic, mqtt_message.retry_count_ + 1);
mqtt_publish_fails_++; // increment failure counter
mqtt_messages_.pop_front(); // delete
return;
} else {
// update the record
mqtt_messages_.front().retry_count_++;
LOG_DEBUG("Failed to publish to %s. Trying again, #%d", topic, mqtt_message.retry_count_ + 1);
return; // leave on queue for next time so it gets republished
}
}
// if we have ACK set with QOS 1 or 2, leave on queue and let the ACK process remove it
// but add the packet_id so we can check it later
if (mqtt_qos_ != 0) {
mqtt_messages_.front().packet_id_ = packet_id;
LOG_DEBUG("Setting packetID for ACK to %d", packet_id);
return;
}
mqtt_messages_.pop_front(); // remove the message from the queue
return queue_publish_message(Mqtt::discovery_prefix() + topic, payload_text, true); // with retain true
}
// create's a ha sensor config topic from a device value object
// and also takes a flag (create_device_config) used to also create the main HA device config. This is only needed for one entity
void Mqtt::publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config) {
bool Mqtt::publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config) {
StaticJsonDocument<EMSESP_JSON_SIZE_LARGE> dev_json;
// always create the ids
@@ -934,38 +737,39 @@ void Mqtt::publish_ha_sensor_config(DeviceValue & dv, const char * model, const
// unless the entity has been marked as read-only and so it'll default to using the sensor/ type
bool has_cmd = dv.has_cmd && !dv.has_state(DeviceValueState::DV_READONLY);
publish_ha_sensor_config(dv.type,
dv.tag,
dv.get_fullname().c_str(),
(dv.fullname ? dv.fullname[0] : nullptr), // EN name
dv.device_type,
dv.short_name,
dv.uom,
remove,
has_cmd,
dv.options,
dv.options_size,
dv_set_min,
dv_set_max,
dv.numeric_operator,
dev_json.as<JsonObject>());
return publish_ha_sensor_config(dv.type,
dv.tag,
dv.get_fullname().c_str(),
(dv.fullname ? dv.fullname[0] : nullptr), // EN name
dv.device_type,
dv.short_name,
dv.uom,
remove,
has_cmd,
dv.options,
dv.options_size,
dv_set_min,
dv_set_max,
dv.numeric_operator,
dev_json.as<JsonObject>());
}
// publish HA sensor for System using the heartbeat tag
void Mqtt::publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom) {
bool Mqtt::publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom) {
StaticJsonDocument<EMSESP_JSON_SIZE_LARGE> doc;
JsonObject dev_json = doc.createNestedObject("dev");
JsonArray ids = dev_json.createNestedArray("ids");
ids.add("ems-esp");
publish_ha_sensor_config(type, DeviceValueTAG::TAG_HEARTBEAT, name, name, EMSdevice::DeviceType::SYSTEM, entity, uom, false, false, nullptr, 0, 0, 0, 0, dev_json);
return publish_ha_sensor_config(
type, DeviceValueTAG::TAG_HEARTBEAT, name, name, EMSdevice::DeviceType::SYSTEM, entity, uom, false, false, nullptr, 0, 0, 0, 0, dev_json);
}
// MQTT discovery configs
// entity must match the key/value pair in the *_data topic
// note: some extra string copying done here, it looks messy but does help with heap fragmentation issues
void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdevice::DeviceValueType
bool Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdevice::DeviceValueType
uint8_t tag, // EMSdevice::DeviceValueTAG
const char * const fullname, // fullname, already translated
const char * const en_name, // original name in english
@@ -982,7 +786,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
const JsonObject & dev_json) {
// ignore if name (fullname) is empty
if (!fullname || !en_name) {
return;
return false;
}
// create the device name
@@ -1074,8 +878,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
// https://github.com/emsesp/EMS-ESP32/issues/196
if (remove) {
LOG_DEBUG("Queuing removing topic %s", topic);
queue_remove_topic(topic);
return;
return queue_remove_topic(topic);
}
// build the payload
@@ -1305,10 +1108,10 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev
add_avty_to_doc(stat_t, doc.as<JsonObject>(), val_cond);
// TODO queue it or send it directly via publish?
queue_ha(topic, doc.as<JsonObject>());
return queue_ha(topic, doc.as<JsonObject>());
}
void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) {
bool Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) {
uint8_t hc_num = tag - DeviceValueTAG::TAG_HC1 + 1;
char topic[Mqtt::MQTT_TOPIC_MAX_SIZE];
@@ -1329,8 +1132,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp,
snprintf(topic, sizeof(topic), "climate/%s/thermostat_hc%d/config", mqtt_basename_.c_str(), hc_num);
if (remove) {
queue_remove_topic(topic); // publish empty payload with retain flag
return;
return queue_remove_topic(topic); // publish empty payload with retain flag
}
if (Mqtt::is_nested()) {
@@ -1415,7 +1217,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp,
// add "availability" section
add_avty_to_doc(topic_t, doc.as<JsonObject>(), seltemp_cond, has_roomtemp ? currtemp_cond : nullptr, hc_mode_cond);
queue_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
return queue_ha(topic, doc.as<JsonObject>()); // publish the config payload with retain flag
}
// based on the device and tag, create the MQTT topic name (without the basename)