fix publish, added subscriptions to show mqtt. Subscribe to cmd during initialization

This commit is contained in:
proddy
2020-07-21 14:35:57 +02:00
parent bab2ac8d65
commit 2346682eea

View File

@@ -93,6 +93,11 @@ void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_fu
queue_subscribe_message(topic); // add subscription to queue
}
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device
void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) {
subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP
}
// resubscribe to all MQTT topics again
void Mqtt::resubscribe() {
if (mqtt_functions_.empty()) {
@@ -104,11 +109,6 @@ void Mqtt::resubscribe() {
}
}
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device
void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) {
subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP
}
// Main MQTT loop
// Checks for connection, establishes a connection if not
// sends out top item on publish queue
@@ -138,6 +138,13 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
shell.printfln(F("MQTT publish fails: %lu"), mqtt_publish_fails_);
shell.println();
// show subscriptions
shell.printfln(F("MQTT subscriptions:"));
for (const auto & mqtt_function : mqtt_functions_) {
shell.printfln(F(" %s"), mqtt_function.topic_.c_str());
}
shell.println();
// show queues
if (mqtt_messages_.empty()) {
shell.printfln(F("MQTT queue is empty"));
@@ -283,13 +290,16 @@ void Mqtt::start(AsyncMqttClient * mqttClient) {
mqtt_qos_ = mqttSettings.mqtt_qos;
});
mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); });
mqttClient_->setWill(make_topic(will_topic_, "status"), 1, true, "offline"); // with qos 1, retain true
mqttClient_->onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
on_message(topic, payload, len);
mqttClient_->onPublish([this](uint16_t packetId) { on_publish(packetId); });
});
// add the system MQTT subscriptions
Mqtt::subscribe("cmd", System::mqtt_commands);
// Mqtt::subscribe("cmd", std::bind(&System::mqtt_commands, this, std::placeholders::_1));
}
void Mqtt::set_publish_time(uint16_t publish_time) {
@@ -327,7 +337,11 @@ void Mqtt::queue_publish_message(const std::string & topic, const std::string &
return;
}
auto message = std::make_shared<MqttMessage>(Operation::PUBLISH, topic, payload, retain);
// prefix the hostname to the topic
char full_topic[MQTT_TOPIC_MAX_SIZE];
make_topic(full_topic, topic);
auto message = std::make_shared<MqttMessage>(Operation::PUBLISH, full_topic, payload, retain);
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
@@ -396,17 +410,12 @@ void Mqtt::process_queue() {
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;
// append the hostname to the topic, unless we're doing native HA which has a different format
// if the topic starts with "homeassistant" we leave it untouched, otherwise append host
char full_topic[MQTT_TOPIC_MAX_SIZE];
make_topic(full_topic, message->topic);
// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
LOG_DEBUG(F("Subscribing to topic: %s"), full_topic);
uint16_t packet_id = mqttClient_->subscribe(full_topic, mqtt_qos_);
LOG_DEBUG(F("Subscribing to topic: %s"), message->topic.c_str());
uint16_t packet_id = mqttClient_->subscribe(message->topic.c_str(), mqtt_qos_);
if (!packet_id) {
LOG_DEBUG(F("Error subscribing to %s, error %d"), full_topic, packet_id);
LOG_DEBUG(F("Error subscribing to %s, error %d"), message->topic.c_str(), packet_id);
}
mqtt_messages_.pop_front(); // remove the message from the queue
@@ -421,19 +430,20 @@ void Mqtt::process_queue() {
}
// else try and publish it
uint16_t packet_id = mqttClient_->publish(full_topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
LOG_DEBUG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), full_topic, mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id);
uint16_t packet_id =
mqttClient_->publish(message->topic.c_str(), mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
LOG_DEBUG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), message->topic.c_str(), mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id);
if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue
if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) {
LOG_ERROR(F("Failed to publish to %s after %d attempts"), full_topic, mqtt_message.retry_count_ + 1);
LOG_ERROR(F("Failed to publish to %s after %d attempts"), message->topic.c_str(), mqtt_message.retry_count_ + 1);
mqtt_publish_fails_++; // increment failure counter
mqtt_messages_.pop_front(); // delete
return;
} else {
mqtt_messages_.front().retry_count_++;
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), full_topic, mqtt_message.retry_count_ + 1);
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), message->topic.c_str(), mqtt_message.retry_count_ + 1);
return; // leave on queue for next time so it gets republished
}
}