diff --git a/lib/espMqttClient/src/MqttClient.cpp b/lib/espMqttClient/src/MqttClient.cpp index c371c9f93..2f1d0d643 100644 --- a/lib/espMqttClient/src/MqttClient.cpp +++ b/lib/espMqttClient/src/MqttClient.cpp @@ -707,9 +707,7 @@ uint16_t MqttClient::getQueue() const { espMqttClientInternals::Outbox::Iterator it = _outbox.front(); uint16_t count = 0; while (it) { - if (it.get()->packet.packetType() == PacketType.PUBLISH) { - ++count; - } + ++count; ++it; } EMC_SEMAPHORE_GIVE(); diff --git a/src/emsdevice.cpp b/src/emsdevice.cpp index aa22b4b14..91de94609 100644 --- a/src/emsdevice.cpp +++ b/src/emsdevice.cpp @@ -1689,23 +1689,26 @@ void EMSdevice::mqtt_ha_entity_config_create() { for (auto & dv : devicevalues_) { if (!strcmp(dv.short_name, FL_(haclimate)[0]) && !dv.has_state(DeviceValueState::DV_API_MQTT_EXCLUDE) && dv.has_state(DeviceValueState::DV_ACTIVE)) { if (*(int8_t *)(dv.value_p) == 1 && (!dv.has_state(DeviceValueState::DV_HA_CONFIG_CREATED) || dv.has_state(DeviceValueState::DV_HA_CLIMATE_NO_RT))) { - dv.remove_state(DeviceValueState::DV_HA_CLIMATE_NO_RT); - dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); - Mqtt::publish_ha_climate_config(dv.tag, true, false, dv.min, dv.max); // roomTemp + if (Mqtt::publish_ha_climate_config(dv.tag, true, false, dv.min, dv.max)) { // roomTemp + dv.remove_state(DeviceValueState::DV_HA_CLIMATE_NO_RT); + dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); + } } else if (*(int8_t *)(dv.value_p) == 0 && (!dv.has_state(DeviceValueState::DV_HA_CONFIG_CREATED) || !dv.has_state(DeviceValueState::DV_HA_CLIMATE_NO_RT))) { - dv.add_state(DeviceValueState::DV_HA_CLIMATE_NO_RT); - dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); - Mqtt::publish_ha_climate_config(dv.tag, false, false, dv.min, dv.max); // no roomTemp + if (Mqtt::publish_ha_climate_config(dv.tag, false, false, dv.min, dv.max)) { // no roomTemp + dv.add_state(DeviceValueState::DV_HA_CLIMATE_NO_RT); + dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); + } } } if (!dv.has_state(DeviceValueState::DV_HA_CONFIG_CREATED) && (dv.type != DeviceValueType::CMD) && dv.has_state(DeviceValueState::DV_ACTIVE) && !dv.has_state(DeviceValueState::DV_API_MQTT_EXCLUDE)) { // create_device_config is only done once for the EMS device. It can added to any entity, so we take the first - Mqtt::publish_ha_sensor_config(dv, name(), brand_to_char(), false, create_device_config); - dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); - create_device_config = false; // only create the main config once + if (Mqtt::publish_ha_sensor_config(dv, name(), brand_to_char(), false, create_device_config)) { + dv.add_state(DeviceValueState::DV_HA_CONFIG_CREATED); + create_device_config = false; // only create the main config once + } #ifndef EMSESP_STANDALONE // always create minimum one config if (ESP.getMaxAllocHeap() < (6 * 1024) || (!emsesp::EMSESP::system_.PSram() && ESP.getFreeHeap() < (65 * 1024))) { diff --git a/src/emsesp.cpp b/src/emsesp.cpp index f9ec33684..ceeb25c6f 100644 --- a/src/emsesp.cpp +++ b/src/emsesp.cpp @@ -493,7 +493,7 @@ void EMSESP::publish_all_loop() { } // wait for free queue before sending next message, HA-messages are also queued - if (!Mqtt::is_empty()) { + if (Mqtt::publish_queued() > 0) { return; } diff --git a/src/mqtt.cpp b/src/mqtt.cpp index 3461403c5..974e60740 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -47,13 +47,13 @@ bool Mqtt::send_response_; bool Mqtt::publish_single_; bool Mqtt::publish_single2cmd_; -std::deque Mqtt::mqtt_messages_; -std::vector Mqtt::mqtt_subfunctions_; +std::vector Mqtt::mqtt_subfunctions_; uint32_t Mqtt::mqtt_publish_fails_ = 0; bool Mqtt::connecting_ = false; bool Mqtt::initialized_ = false; bool Mqtt::ha_climate_reset_ = false; +uint16_t Mqtt::queuecount_ = 0; uint8_t Mqtt::connectcount_ = 0; uint32_t Mqtt::mqtt_message_id_ = 0; char will_topic_[Mqtt::MQTT_TOPIC_MAX_SIZE]; // because MQTT library keeps only char pointer @@ -95,7 +95,7 @@ void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_ // We store the original topic string without base mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(cb)); - if (!enabled()) { + if (!enabled() || !connected()) { return; } @@ -117,14 +117,7 @@ void Mqtt::resubscribe() { } for (const auto & mqtt_subfunction : mqtt_subfunctions_) { - bool found = false; - for (const auto & message : mqtt_messages_) { - found |= ((message.content_->operation == Operation::SUBSCRIBE) && (mqtt_subfunction.topic_ == message.content_->topic)); - } - - if (!found) { - queue_subscribe_message(mqtt_subfunction.topic_); - } + queue_subscribe_message(mqtt_subfunction.topic_); } } @@ -137,12 +130,6 @@ void Mqtt::loop() { uint32_t currentMillis = uuid::get_uptime(); - // publish MQTT queue, but timed to avoid overloading the TCP pipe - if ((uint32_t)(currentMillis - last_mqtt_poll_) > MQTT_PUBLISH_WAIT) { - last_mqtt_poll_ = currentMillis; - process_queue(); - } - // send heartbeat if ((currentMillis - last_publish_heartbeat_ > publish_time_heartbeat_)) { last_publish_heartbeat_ = currentMillis; @@ -154,7 +141,8 @@ void Mqtt::loop() { EMSESP::publish_sensor_values(false); } - if (!mqtt_messages_.empty()) { + queuecount_ = mqttClient_->getQueue(); + if (queuecount_ > 0) { return; } @@ -197,6 +185,7 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { shell.printfln("MQTT Entity ID format is %d", entity_format_); shell.printfln("MQTT publish errors: %lu", mqtt_publish_fails_); + shell.printfln("MQTT queue: %d", queuecount_); shell.println(); // show subscriptions @@ -206,47 +195,6 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { } shell.println(); - // show queues - if (mqtt_messages_.empty()) { - shell.printfln("MQTT queue is empty"); - shell.println(); - return; - } - - shell.printfln("MQTT queue (%d):", mqtt_messages_.size()); - - for (const auto & message : mqtt_messages_) { - auto content = message.content_; - char topic[MQTT_TOPIC_MAX_SIZE]; - - // prefix base, only if it's not a discovery topic - if (content->topic.compare(0, discovery_prefix().size(), discovery_prefix()) == 0) { - snprintf(topic, sizeof(topic), "%s/%s", mqtt_base_.c_str(), content->topic.c_str()); - } else { - snprintf(topic, sizeof(topic), "%s", content->topic.c_str()); - } - - if (content->operation == Operation::PUBLISH) { - // Publish messages - if (message.retry_count_ == 0) { - if (message.packet_id_ == 0) { - shell.printfln(" [%02d] (Pub) topic=%s payload=%s", message.id_, topic, content->payload.c_str()); - } else { - shell.printfln(" [%02d] (Pub) topic=%s payload=%s (pid %d)", message.id_, topic, content->payload.c_str(), message.packet_id_); - } - } else { - shell.printfln(" [%02d] (Pub) topic=%s payload=%s (pid %d, retry #%d)", - message.id_, - topic, - content->payload.c_str(), - message.packet_id_, - message.retry_count_); - } - } else { - // Subscribe messages - shell.printfln(" [%02d] (Sub) topic=%s", message.id_, topic); - } - } shell.println(); } @@ -265,7 +213,7 @@ void Mqtt::incoming(const char * topic, const char * payload) { void Mqtt::on_message(const char * topic, const char * payload, size_t len) const { // sometimes the payload is not terminated correctly, so make a copy // convert payload to a null-terminated char string - char message[len + 2]; + char message[len + 2] = {'\0'}; if (payload != nullptr) { strlcpy(message, payload, len + 1); } @@ -366,42 +314,12 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic } // called when an MQTT Publish ACK is received -// its a poor man's QOS we assume the ACK represents the last Publish sent -// check if ACK matches the last Publish we sent, if not report an error. Only if qos is 1 or 2 -// and always remove from queue void Mqtt::on_publish(uint16_t packetId) const { - // find the MQTT message in the queue and remove it - if (mqtt_messages_.empty()) { - LOG_DEBUG("No message stored for ACK pid %d", packetId); - return; - } - - auto mqtt_message = mqtt_messages_.front(); - - // if the last published failed, don't bother checking it. wait for the next retry - if (mqtt_message.packet_id_ == 0) { -#if defined(EMSESP_DEBUG) - LOG_DEBUG("ACK for failed message pid 0"); -#endif - return; - } - - if (mqtt_message.packet_id_ != packetId) { - LOG_ERROR("Mismatch, expecting PID %d, got %d", mqtt_message.packet_id_, packetId); - mqtt_publish_fails_++; // increment error count - } - - LOG_DEBUG("ACK pid %d", packetId); - - mqtt_messages_.pop_front(); // always remove from queue, regardless if there was a successful ACK + LOG_DEBUG("Packet %d sent successfull", packetId); } // called when MQTT settings have changed via the Web forms void Mqtt::reset_mqtt() { - if (!mqtt_enabled_) { - mqtt_messages_.clear(); - } - if (!mqttClient_) { return; } @@ -465,6 +383,8 @@ void Mqtt::start() { static char will_topic[MQTT_TOPIC_MAX_SIZE]; if (!mqtt_base_.empty()) { snprintf(will_topic, MQTT_TOPIC_MAX_SIZE, "%s/status", mqtt_base_.c_str()); + } else { + snprintf(will_topic, MQTT_TOPIC_MAX_SIZE, "status"); } mqttClient_->setWill(will_topic, 1, true, "offline"); // with qos 1, retain true @@ -474,9 +394,11 @@ void Mqtt::start() { on_message(topic, (const char *)payload, len); // receiving mqtt }); + /* mqttClient_->onPublish([this](uint16_t packetId) { on_publish(packetId); // publish }); + */ } void Mqtt::set_publish_time_boiler(uint16_t publish_time) { @@ -567,6 +489,7 @@ void Mqtt::on_connect() { connecting_ = true; connectcount_++; // count # reconnects. not currently used. + queuecount_ = 0; load_settings(); // reload MQTT settings - in case they have changes @@ -595,15 +518,6 @@ void Mqtt::on_connect() { queue_publish_retain("status", "online", true); // with retain on mqtt_publish_fails_ = 0; // reset fail count to 0 - - /* - // for debugging only - LOG_INFO("Queue size: %d", mqtt_messages_.size()); - for (const auto & message : mqtt_messages_) { - auto content = message.content_; - LOG_INFO(" [%02d] (%d) topic=%s payload=%s", message.id_, content->operation, content->topic.c_str(), content->payload.c_str()); - } - */ } // Home Assistant Discovery - the main master Device called EMS-ESP @@ -673,45 +587,44 @@ void Mqtt::ha_status() { // add sub or pub task to the queue. // the base is not included in the topic -void Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain) { +bool Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain) { if (topic.empty()) { - return; + return false; } + uint16_t packet_id = 0; + char fulltopic[MQTT_TOPIC_MAX_SIZE]; -#ifndef EMSESP_STANDALONE - // anything below 60MB available free heap is dangerously low, so we stop adding to prevent a crash - // instead of doing a mqtt_messages_.pop_front() - if (mqtt_messages_.size() >= MAX_MQTT_MESSAGES || (!emsesp::EMSESP::system_.PSram() && ESP.getFreeHeap() < (60 * 1024))) { - LOG_WARNING("Queue overflow (queue count=%d, topic=%s)", mqtt_messages_.size(), topic.c_str()); - mqtt_publish_fails_++; - return; // don't add to top of queue + if (topic.find(discovery_prefix_) == 0) { + strlcpy(fulltopic, topic.c_str(), sizeof(fulltopic)); // leave topic as it is + } else { + // it's a discovery topic, added the mqtt base to the topic path + snprintf(fulltopic, sizeof(fulltopic), "%s/%s", mqtt_base_.c_str(), topic.c_str()); // uses base } -#endif - - // take the topic and prefix the base, unless its for HA - std::shared_ptr message = std::make_shared(operation, topic, payload, retain); if (operation == Operation::PUBLISH) { - if (message->payload.empty()) { - LOG_DEBUG("Adding to queue: (remove) topic='%s'", message->topic.c_str()); - } else { - LOG_DEBUG("Adding to queue: (publish) topic='%s' payload=%s", message->topic.c_str(), message->payload.c_str()); - } - } else { - LOG_DEBUG("Adding to queue: (subscribe) topic='%s'", message->topic.c_str()); + packet_id = mqttClient_->publish(fulltopic, mqtt_qos_, retain, payload.c_str()); + mqtt_message_id_++; + LOG_DEBUG("Publishing topic '%s', pid %d", fulltopic, packet_id); + } else if (operation == Operation::SUBSCRIBE) { + packet_id = mqttClient_->subscribe(fulltopic, mqtt_qos_); + LOG_DEBUG("Subscribing to topic '%s', pid %d", fulltopic, packet_id); + } else if (operation == Operation::UNSUBSCRIBE) { + packet_id = mqttClient_->unsubscribe(fulltopic); + LOG_DEBUG("Unsubscribing to topic '%s', pid %d", fulltopic, packet_id); } - - mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message)); - - return; + if (packet_id == 0) { + LOG_WARNING("%s failed: %s", operation == Operation::PUBLISH ? "Publish" : operation == Operation::SUBSCRIBE ? "Subscribe" : "Unsubscribe", fulltopic); + mqtt_publish_fails_++; + } + return (packet_id != 0); } // add MQTT message to queue, payload is a string -void Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, bool retain) { +bool Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) { if (!enabled()) { - return; + return false; }; - queue_message(Operation::PUBLISH, topic, payload, retain); + return queue_message(Operation::PUBLISH, topic, payload, retain); } // add MQTT subscribe message to queue @@ -725,187 +638,77 @@ void Mqtt::queue_unsubscribe_message(const std::string & topic) { } // MQTT Publish, using a user's retain flag -void Mqtt::queue_publish(const std::string & topic, const std::string & payload) { - queue_publish_message(topic, payload, mqtt_retain_); +bool Mqtt::queue_publish(const std::string & topic, const std::string & payload) { + return queue_publish_message(topic, payload, mqtt_retain_); } // MQTT Publish, using a user's retain flag - except for char * strings -void Mqtt::queue_publish(const char * topic, const char * payload) { - queue_publish_message((topic), payload, mqtt_retain_); +bool Mqtt::queue_publish(const char * topic, const char * payload) { + return queue_publish_message((topic), payload, mqtt_retain_); } // MQTT Publish, using a specific retain flag, topic is a flash string -void Mqtt::queue_publish(const char * topic, const std::string & payload) { - queue_publish_message((topic), payload, mqtt_retain_); +bool Mqtt::queue_publish(const char * topic, const std::string & payload) { + return queue_publish_message((topic), payload, mqtt_retain_); } -void Mqtt::queue_publish(const char * topic, const JsonObject & payload) { - queue_publish_retain(topic, payload, mqtt_retain_); +bool Mqtt::queue_publish(const char * topic, const JsonObject & payload) { + return queue_publish_retain(topic, payload, mqtt_retain_); } // publish json doc, only if its not empty -void Mqtt::queue_publish(const std::string & topic, const JsonObject & payload) { - queue_publish_retain(topic, payload, mqtt_retain_); +bool Mqtt::queue_publish(const std::string & topic, const JsonObject & payload) { + return queue_publish_retain(topic, payload, mqtt_retain_); } // MQTT Publish, using a specific retain flag, topic is a flash string, forcing retain flag -void Mqtt::queue_publish_retain(const char * topic, const std::string & payload, bool retain) { - queue_publish_message((topic), payload, retain); +bool Mqtt::queue_publish_retain(const char * topic, const std::string & payload, const bool retain) { + return queue_publish_message((topic), payload, retain); } // publish json doc, only if its not empty, using the retain flag -void Mqtt::queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain) { - queue_publish_retain(topic.c_str(), payload, retain); +bool Mqtt::queue_publish_retain(const std::string & topic, const JsonObject & payload, const bool retain) { + return queue_publish_retain(topic.c_str(), payload, retain); } -void Mqtt::queue_publish_retain(const char * topic, const JsonObject & payload, bool retain) { +bool Mqtt::queue_publish_retain(const char * topic, const JsonObject & payload, const bool retain) { if (enabled() && payload.size()) { std::string payload_text; serializeJson(payload, payload_text); // convert json to string - queue_publish_message(topic, payload_text, retain); + return queue_publish_message(topic, payload_text, retain); } + return false; } // publish empty payload to remove the topic -void Mqtt::queue_remove_topic(const char * topic) { +bool Mqtt::queue_remove_topic(const char * topic) { if (!enabled()) { - return; + return false; } if (ha_enabled_) { - queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker + return queue_publish_message(Mqtt::discovery_prefix() + topic, "", true); // publish with retain to remove from broker } else { - queue_publish_message(topic, "", true); // publish with retain to remove from broker + return queue_publish_message(topic, "", true); // publish with retain to remove from broker } } // queue a Home Assistant config topic and payload, with retain flag off. -void Mqtt::queue_ha(const char * topic, const JsonObject & payload) { +bool Mqtt::queue_ha(const char * topic, const JsonObject & payload) { if (!enabled()) { - return; + return false; } std::string payload_text; payload_text.reserve(measureJson(payload) + 1); - serializeJson(payload, payload_text); // convert json to string + serializeJson(payload, payload_text); // convert json to string - queue_publish_message(Mqtt::discovery_prefix() + topic, payload_text, true); // with retain true -} - -// take top from queue and perform the publish or subscribe action -// assumes there is an MQTT connection -void Mqtt::process_queue() { - if (mqtt_messages_.empty()) { - return; - } - - // fetch first from queue and create the full topic name - auto mqtt_message = mqtt_messages_.front(); - auto message = mqtt_message.content_; - char topic[MQTT_TOPIC_MAX_SIZE]; - - if (message->topic.find(discovery_prefix_) == 0) { - strlcpy(topic, message->topic.c_str(), sizeof(topic)); // leave topic as it is - } else { - // it's a discovery topic, added the mqtt base to the topic path - snprintf(topic, MQTT_TOPIC_MAX_SIZE, "%s/%s", mqtt_base_.c_str(), message->topic.c_str()); // uses base - } - - // if this has already been published and we're waiting for an ACK, don't publish again - // it will have a real packet ID - if (mqtt_message.packet_id_ > 0) { - LOG_DEBUG("Waiting for QOS-ACK"); - // if we don't get the ack within 10 seconds, republish with new packet_id - if (uuid::get_uptime_sec() - last_publish_queue_ < 10) { - return; - } - } - last_publish_queue_ = uuid::get_uptime_sec(); - - // if we're subscribing... - if (message->operation == Operation::SUBSCRIBE) { - LOG_DEBUG("Subscribing to topic '%s'", topic); - uint16_t packet_id = mqttClient_->subscribe(topic, mqtt_qos_); - if (!packet_id) { - if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) { - return; - } - LOG_ERROR("Error subscribing to topic '%s'", topic); - mqtt_publish_fails_++; // increment failure counter - } - - mqtt_messages_.pop_front(); // remove the message from the queue - - return; - } - - // if we're unsubscribing... - if (message->operation == Operation::UNSUBSCRIBE) { - LOG_DEBUG("Subscribing to topic '%s'", topic); - uint16_t packet_id = mqttClient_->unsubscribe(topic); - if (!packet_id) { - if (++mqtt_messages_.front().retry_count_ < MQTT_PUBLISH_MAX_RETRY) { - return; - } - LOG_ERROR("Error unsubscribing to topic '%s'", topic); - mqtt_publish_fails_++; // increment failure counter - } - - mqtt_messages_.pop_front(); // remove the message from the queue - - return; - } - - // else try and publish it - // this is where the *real* publish happens - // uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); - uint16_t packet_id = mqttClient_->publish(topic, mqtt_qos_, message->retain, message->payload.c_str()); - lasttopic_ = topic; - lastpayload_ = message->payload; - - LOG_DEBUG("Published topic %s (#%02d, retain=%d, retry=%d, size=%d, pid=%d)", - topic, - mqtt_message.id_, - message->retain, - mqtt_message.retry_count_ + 1, - message->payload.size(), - packet_id); - - /* - if (!message->payload.empty()) { - LOG_DEBUG("Payload: %s", message->payload.c_str()); // extra message for #784 - } - */ - - if (packet_id == 0) { - // it failed. if we retried n times, give up. remove from queue - if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) { - LOG_ERROR("Failed to publish to %s after %d attempts", topic, mqtt_message.retry_count_ + 1); - mqtt_publish_fails_++; // increment failure counter - mqtt_messages_.pop_front(); // delete - return; - } else { - // update the record - mqtt_messages_.front().retry_count_++; - LOG_DEBUG("Failed to publish to %s. Trying again, #%d", topic, mqtt_message.retry_count_ + 1); - return; // leave on queue for next time so it gets republished - } - } - - // if we have ACK set with QOS 1 or 2, leave on queue and let the ACK process remove it - // but add the packet_id so we can check it later - if (mqtt_qos_ != 0) { - mqtt_messages_.front().packet_id_ = packet_id; - LOG_DEBUG("Setting packetID for ACK to %d", packet_id); - return; - } - - mqtt_messages_.pop_front(); // remove the message from the queue + return queue_publish_message(Mqtt::discovery_prefix() + topic, payload_text, true); // with retain true } // create's a ha sensor config topic from a device value object // and also takes a flag (create_device_config) used to also create the main HA device config. This is only needed for one entity -void Mqtt::publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config) { +bool Mqtt::publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config) { StaticJsonDocument dev_json; // always create the ids @@ -934,38 +737,39 @@ void Mqtt::publish_ha_sensor_config(DeviceValue & dv, const char * model, const // unless the entity has been marked as read-only and so it'll default to using the sensor/ type bool has_cmd = dv.has_cmd && !dv.has_state(DeviceValueState::DV_READONLY); - publish_ha_sensor_config(dv.type, - dv.tag, - dv.get_fullname().c_str(), - (dv.fullname ? dv.fullname[0] : nullptr), // EN name - dv.device_type, - dv.short_name, - dv.uom, - remove, - has_cmd, - dv.options, - dv.options_size, - dv_set_min, - dv_set_max, - dv.numeric_operator, - dev_json.as()); + return publish_ha_sensor_config(dv.type, + dv.tag, + dv.get_fullname().c_str(), + (dv.fullname ? dv.fullname[0] : nullptr), // EN name + dv.device_type, + dv.short_name, + dv.uom, + remove, + has_cmd, + dv.options, + dv.options_size, + dv_set_min, + dv_set_max, + dv.numeric_operator, + dev_json.as()); } // publish HA sensor for System using the heartbeat tag -void Mqtt::publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom) { +bool Mqtt::publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom) { StaticJsonDocument doc; JsonObject dev_json = doc.createNestedObject("dev"); JsonArray ids = dev_json.createNestedArray("ids"); ids.add("ems-esp"); - publish_ha_sensor_config(type, DeviceValueTAG::TAG_HEARTBEAT, name, name, EMSdevice::DeviceType::SYSTEM, entity, uom, false, false, nullptr, 0, 0, 0, 0, dev_json); + return publish_ha_sensor_config( + type, DeviceValueTAG::TAG_HEARTBEAT, name, name, EMSdevice::DeviceType::SYSTEM, entity, uom, false, false, nullptr, 0, 0, 0, 0, dev_json); } // MQTT discovery configs // entity must match the key/value pair in the *_data topic // note: some extra string copying done here, it looks messy but does help with heap fragmentation issues -void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdevice::DeviceValueType +bool Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdevice::DeviceValueType uint8_t tag, // EMSdevice::DeviceValueTAG const char * const fullname, // fullname, already translated const char * const en_name, // original name in english @@ -982,7 +786,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev const JsonObject & dev_json) { // ignore if name (fullname) is empty if (!fullname || !en_name) { - return; + return false; } // create the device name @@ -1074,8 +878,7 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev // https://github.com/emsesp/EMS-ESP32/issues/196 if (remove) { LOG_DEBUG("Queuing removing topic %s", topic); - queue_remove_topic(topic); - return; + return queue_remove_topic(topic); } // build the payload @@ -1305,10 +1108,10 @@ void Mqtt::publish_ha_sensor_config(uint8_t type, // EMSdev add_avty_to_doc(stat_t, doc.as(), val_cond); // TODO queue it or send it directly via publish? - queue_ha(topic, doc.as()); + return queue_ha(topic, doc.as()); } -void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) { +bool Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove, const int16_t min, const uint16_t max) { uint8_t hc_num = tag - DeviceValueTAG::TAG_HC1 + 1; char topic[Mqtt::MQTT_TOPIC_MAX_SIZE]; @@ -1329,8 +1132,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, snprintf(topic, sizeof(topic), "climate/%s/thermostat_hc%d/config", mqtt_basename_.c_str(), hc_num); if (remove) { - queue_remove_topic(topic); // publish empty payload with retain flag - return; + return queue_remove_topic(topic); // publish empty payload with retain flag } if (Mqtt::is_nested()) { @@ -1415,7 +1217,7 @@ void Mqtt::publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, // add "availability" section add_avty_to_doc(topic_t, doc.as(), seltemp_cond, has_roomtemp ? currtemp_cond : nullptr, hc_mode_cond); - queue_ha(topic, doc.as()); // publish the config payload with retain flag + return queue_ha(topic, doc.as()); // publish the config payload with retain flag } // based on the device and tag, create the MQTT topic name (without the basename) diff --git a/src/mqtt.h b/src/mqtt.h index ad77fb53b..a6f54ee8d 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -31,26 +31,8 @@ using uuid::console::Shell; namespace emsesp { -// size of queue -static constexpr uint16_t MAX_MQTT_MESSAGES = 300; - using mqtt_sub_function_p = std::function; -struct MqttMessage { - const uint8_t operation; - const std::string topic; - const std::string payload; - const bool retain; - - MqttMessage(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain) - : operation(operation) - , topic(topic) - , payload(payload) - , retain(retain) { - } - ~MqttMessage() = default; -}; - class Mqtt { public: enum discoveryType : uint8_t { HOMEASSISTANT, DOMOTICZ }; @@ -77,24 +59,23 @@ class Mqtt { static void on_connect(); static void on_disconnect(espMqttClientTypes::DisconnectReason reason); - static void subscribe(const uint8_t device_type, const std::string & topic, mqtt_sub_function_p cb); static void subscribe(const std::string & topic); static void resubscribe(); - static void queue_publish(const std::string & topic, const std::string & payload); - static void queue_publish(const char * topic, const char * payload); - static void queue_publish(const std::string & topic, const JsonObject & payload); - static void queue_publish(const char * topic, const JsonObject & payload); - static void queue_publish(const char * topic, const std::string & payload); - static void queue_publish_retain(const std::string & topic, const JsonObject & payload, bool retain); - static void queue_publish_retain(const char * topic, const std::string & payload, bool retain); - static void queue_publish_retain(const char * topic, const JsonObject & payload, bool retain); - static void queue_ha(const char * topic, const JsonObject & payload); - static void queue_remove_topic(const char * topic); + static bool queue_publish(const std::string & topic, const std::string & payload); + static bool queue_publish(const char * topic, const char * payload); + static bool queue_publish(const std::string & topic, const JsonObject & payload); + static bool queue_publish(const char * topic, const JsonObject & payload); + static bool queue_publish(const char * topic, const std::string & payload); + static bool queue_publish_retain(const std::string & topic, const JsonObject & payload, const bool retain); + static bool queue_publish_retain(const char * topic, const std::string & payload, const bool retain); + static bool queue_publish_retain(const char * topic, const JsonObject & payload, const bool retain); + static bool queue_ha(const char * topic, const JsonObject & payload); + static bool queue_remove_topic(const char * topic); - static void publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config = false); - static void publish_ha_sensor_config(uint8_t type, + static bool publish_ha_sensor_config(DeviceValue & dv, const char * model, const char * brand, const bool remove, const bool create_device_config = false); + static bool publish_ha_sensor_config(uint8_t type, uint8_t tag, const char * const fullname, const char * const en_name, @@ -110,8 +91,8 @@ class Mqtt { const int8_t num_op, const JsonObject & dev_json); - static void publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom); - static void publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove = false, const int16_t min = 5, const uint16_t max = 30); + static bool publish_system_ha_sensor_config(uint8_t type, const char * name, const char * entity, const uint8_t uom); + static bool publish_ha_climate_config(const uint8_t tag, const bool has_roomtemp, const bool remove = false, const int16_t min = 5, const uint16_t max = 30); static void show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_type); static void show_mqtt(uuid::console::Shell & shell); @@ -171,7 +152,7 @@ class Mqtt { } static uint32_t publish_queued() { - return mqtt_messages_.size(); + return queuecount_; } static uint8_t connect_count() { @@ -240,47 +221,24 @@ class Mqtt { mqtt_retain_ = mqtt_retain; } - static bool is_empty() { - return mqtt_messages_.empty(); - } - static std::string tag_to_topic(uint8_t device_type, uint8_t tag); static void add_avty_to_doc(const char * state_t, const JsonObject & doc, const char * cond1 = nullptr, const char * cond2 = nullptr, const char * negcond = nullptr); - struct QueuedMqttMessage { - const uint32_t id_; - const std::shared_ptr content_; - uint8_t retry_count_ = 0; - uint16_t packet_id_ = 0; - - ~QueuedMqttMessage() = default; - QueuedMqttMessage(uint32_t id, std::shared_ptr && content) - : id_(id) - , content_(std::move(content)) { - } - }; - static std::deque mqtt_messages_; - - private: static uuid::log::Logger logger_; static espMqttClient * mqttClient_; - static uint32_t mqtt_message_id_; + static uint32_t mqtt_message_id_; - static constexpr uint32_t MQTT_PUBLISH_WAIT = 100; // delay in ms between sending publishes, to account for large payloads - static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing - - static void queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain); - static void queue_publish_message(const std::string & topic, const std::string & payload, bool retain); + static bool queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain); + static bool queue_publish_message(const std::string & topic, const std::string & payload, const bool retain); static void queue_subscribe_message(const std::string & topic); static void queue_unsubscribe_message(const std::string & topic); void on_publish(uint16_t packetId) const; void on_message(const char * topic, const char * payload, size_t len) const; - void process_queue(); // function handlers for MQTT subscriptions struct MQTTSubFunction { @@ -310,6 +268,7 @@ class Mqtt { static bool connecting_; static bool initialized_; static uint32_t mqtt_publish_fails_; + static uint16_t queuecount_; static uint8_t connectcount_; static bool ha_climate_reset_;