diff --git a/src/mqtt.cpp b/src/mqtt.cpp index fbe669579..143412e04 100644 --- a/src/mqtt.cpp +++ b/src/mqtt.cpp @@ -93,6 +93,11 @@ void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_fu queue_subscribe_message(topic); // add subscription to queue } +// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device +void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) { + subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP +} + // resubscribe to all MQTT topics again void Mqtt::resubscribe() { if (mqtt_functions_.empty()) { @@ -104,11 +109,6 @@ void Mqtt::resubscribe() { } } -// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device -void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) { - subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP -} - // Main MQTT loop // Checks for connection, establishes a connection if not // sends out top item on publish queue @@ -138,6 +138,13 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) { shell.printfln(F("MQTT publish fails: %lu"), mqtt_publish_fails_); shell.println(); + // show subscriptions + shell.printfln(F("MQTT subscriptions:")); + for (const auto & mqtt_function : mqtt_functions_) { + shell.printfln(F(" %s"), mqtt_function.topic_.c_str()); + } + shell.println(); + // show queues if (mqtt_messages_.empty()) { shell.printfln(F("MQTT queue is empty")); @@ -283,13 +290,16 @@ void Mqtt::start(AsyncMqttClient * mqttClient) { mqtt_qos_ = mqttSettings.mqtt_qos; }); - mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); }); mqttClient_->setWill(make_topic(will_topic_, "status"), 1, true, "offline"); // with qos 1, retain true mqttClient_->onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { on_message(topic, payload, len); mqttClient_->onPublish([this](uint16_t packetId) { on_publish(packetId); }); }); + + // add the system MQTT subscriptions + Mqtt::subscribe("cmd", System::mqtt_commands); + // Mqtt::subscribe("cmd", std::bind(&System::mqtt_commands, this, std::placeholders::_1)); } void Mqtt::set_publish_time(uint16_t publish_time) { @@ -327,7 +337,11 @@ void Mqtt::queue_publish_message(const std::string & topic, const std::string & return; } - auto message = std::make_shared(Operation::PUBLISH, topic, payload, retain); + // prefix the hostname to the topic + char full_topic[MQTT_TOPIC_MAX_SIZE]; + make_topic(full_topic, topic); + + auto message = std::make_shared(Operation::PUBLISH, full_topic, payload, retain); // if the queue is full, make room but removing the last one if (mqtt_messages_.size() >= maximum_mqtt_messages_) { @@ -396,17 +410,12 @@ void Mqtt::process_queue() { auto mqtt_message = mqtt_messages_.front(); auto message = mqtt_message.content_; - // append the hostname to the topic, unless we're doing native HA which has a different format - // if the topic starts with "homeassistant" we leave it untouched, otherwise append host - char full_topic[MQTT_TOPIC_MAX_SIZE]; - make_topic(full_topic, message->topic); - // if we're subscribing... if (message->operation == Operation::SUBSCRIBE) { - LOG_DEBUG(F("Subscribing to topic: %s"), full_topic); - uint16_t packet_id = mqttClient_->subscribe(full_topic, mqtt_qos_); + LOG_DEBUG(F("Subscribing to topic: %s"), message->topic.c_str()); + uint16_t packet_id = mqttClient_->subscribe(message->topic.c_str(), mqtt_qos_); if (!packet_id) { - LOG_DEBUG(F("Error subscribing to %s, error %d"), full_topic, packet_id); + LOG_DEBUG(F("Error subscribing to %s, error %d"), message->topic.c_str(), packet_id); } mqtt_messages_.pop_front(); // remove the message from the queue @@ -421,19 +430,20 @@ void Mqtt::process_queue() { } // else try and publish it - uint16_t packet_id = mqttClient_->publish(full_topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); - LOG_DEBUG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), full_topic, mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id); + uint16_t packet_id = + mqttClient_->publish(message->topic.c_str(), mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_); + LOG_DEBUG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), message->topic.c_str(), mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id); if (packet_id == 0) { // it failed. if we retried n times, give up. remove from queue if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) { - LOG_ERROR(F("Failed to publish to %s after %d attempts"), full_topic, mqtt_message.retry_count_ + 1); + LOG_ERROR(F("Failed to publish to %s after %d attempts"), message->topic.c_str(), mqtt_message.retry_count_ + 1); mqtt_publish_fails_++; // increment failure counter mqtt_messages_.pop_front(); // delete return; } else { mqtt_messages_.front().retry_count_++; - LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), full_topic, mqtt_message.retry_count_ + 1); + LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), message->topic.c_str(), mqtt_message.retry_count_ + 1); return; // leave on queue for next time so it gets republished } }