mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-07 08:19:52 +03:00
resubscribe to mqtt on re-connect - #421
This commit is contained in:
76
src/mqtt.cpp
76
src/mqtt.cpp
@@ -67,15 +67,50 @@ Mqtt::MQTTFunction::MQTTFunction(uint8_t device_id, const std::string && topic,
|
|||||||
|
|
||||||
// subscribe to an MQTT topic, and store the associated callback function
|
// subscribe to an MQTT topic, and store the associated callback function
|
||||||
void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb) {
|
void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb) {
|
||||||
// We don't want to store the whole topic string in our lookup, just the last cmd, as this is wasteful.
|
/*
|
||||||
|
// We don't want to store the whole topic string in our lookup, just the last cmd, as this can take up too much memory
|
||||||
// strip out everything until the last /
|
// strip out everything until the last /
|
||||||
size_t found = topic.find_last_of("/"); // returns npos which is -1
|
size_t topic_pos = topic.find_last_of("/"); // returns npos which is -1
|
||||||
mqtt_functions_.emplace_back(device_id, std::move(topic.substr(found + 1)), cb); // register a call back function for a specific telegram type
|
topic_pos += 1; // skip the /
|
||||||
|
*/
|
||||||
|
|
||||||
|
// convert the topic to it's full path, so either prefixed with the hostname unless hardcoded like 'homeassistant'
|
||||||
|
char full_topic[MQTT_TOPIC_MAX_SIZE];
|
||||||
|
make_topic(full_topic, topic);
|
||||||
|
|
||||||
|
// check if we already have the topic subscribed, if so don't add it again
|
||||||
|
bool exists = false;
|
||||||
|
if (!mqtt_functions_.empty()) {
|
||||||
|
for (const auto & mqtt_function : mqtt_functions_) {
|
||||||
|
if ((mqtt_function.device_id_ == device_id) && (strcmp(mqtt_function.topic_.c_str(), full_topic) == 0)) {
|
||||||
|
exists = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!exists) {
|
||||||
|
mqtt_functions_.emplace_back(device_id, std::move(full_topic), cb); // register a call back function for a specific telegram type
|
||||||
|
} else {
|
||||||
|
#ifdef EMSESP_DEBUG
|
||||||
|
LOG_DEBUG(F("[DEBUG] Resubscribing to topic %s"), full_topic);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
queue_subscribe_message(topic); // add subscription to queue
|
queue_subscribe_message(topic); // add subscription to queue
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a device
|
// resubscribe to all MQTT topics again
|
||||||
|
void Mqtt::resubscribe() {
|
||||||
|
if (mqtt_functions_.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto & mqtt_function : mqtt_functions_) {
|
||||||
|
queue_subscribe_message(mqtt_function.topic_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device
|
||||||
void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) {
|
void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) {
|
||||||
subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP
|
subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP
|
||||||
}
|
}
|
||||||
@@ -84,7 +119,7 @@ void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) {
|
|||||||
// Checks for connection, establishes a connection if not
|
// Checks for connection, establishes a connection if not
|
||||||
// sends out top item on publish queue
|
// sends out top item on publish queue
|
||||||
void Mqtt::loop() {
|
void Mqtt::loop() {
|
||||||
// exit if MQTT is not enabled or ig there is no WIFI
|
// exit if MQTT is not enabled or if there is no WIFI
|
||||||
#ifndef EMSESP_STANDALONE
|
#ifndef EMSESP_STANDALONE
|
||||||
if (!connected()) {
|
if (!connected()) {
|
||||||
#else
|
#else
|
||||||
@@ -152,12 +187,12 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
|
|||||||
shell.println();
|
shell.println();
|
||||||
}
|
}
|
||||||
|
|
||||||
// simulate receiving a MQTT message, only for testing
|
// simulate receiving a MQTT message, used only for testing
|
||||||
void Mqtt::incoming(char * topic, char * payload) {
|
void Mqtt::incoming(char * topic, char * payload) {
|
||||||
on_message(topic, payload, strlen(payload));
|
on_message(topic, payload, strlen(payload));
|
||||||
}
|
}
|
||||||
|
|
||||||
// received MQTT message
|
// received an MQTT message that we subscribed too
|
||||||
void Mqtt::on_message(char * topic, char * payload, size_t len) {
|
void Mqtt::on_message(char * topic, char * payload, size_t len) {
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
return;
|
return;
|
||||||
@@ -171,14 +206,16 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
|
|||||||
LOG_DEBUG(F("[DEBUG] Received %s => %s (length %d)"), topic, message, len);
|
LOG_DEBUG(F("[DEBUG] Received %s => %s (length %d)"), topic, message, len);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
// strip out everything until the last /
|
// strip out everything until the last /
|
||||||
char * topic_magnitude = strrchr(topic, '/');
|
char * topic_magnitude = strrchr(topic, '/');
|
||||||
if (topic_magnitude != nullptr) {
|
if (topic_magnitude != nullptr) {
|
||||||
topic = topic_magnitude + 1;
|
topic = topic_magnitude + 1;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
// Send message event to custom service
|
// see if we have this topic in our subscription list, then call its callback handler
|
||||||
// this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail
|
// note: this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail. Not sure if this is going to be an issue?
|
||||||
for (const auto & mf : mqtt_functions_) {
|
for (const auto & mf : mqtt_functions_) {
|
||||||
if (strcmp(topic, mf.topic_.c_str()) == 0) {
|
if (strcmp(topic, mf.topic_.c_str()) == 0) {
|
||||||
(mf.mqtt_function_)(message);
|
(mf.mqtt_function_)(message);
|
||||||
@@ -232,7 +269,14 @@ void Mqtt::on_publish(uint16_t packetId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// builds up a topic by prefixing the hostname
|
// builds up a topic by prefixing the hostname
|
||||||
|
// unless it's hardcoded like "homeassistant"
|
||||||
char * Mqtt::make_topic(char * result, const std::string & topic) {
|
char * Mqtt::make_topic(char * result, const std::string & topic) {
|
||||||
|
// check for homesassistant
|
||||||
|
if (strncmp(topic.c_str(), "homeassistant/", 13) == 0) {
|
||||||
|
strlcpy(result, topic.c_str(), MQTT_TOPIC_MAX_SIZE);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
strlcpy(result, hostname_.c_str(), MQTT_TOPIC_MAX_SIZE);
|
strlcpy(result, hostname_.c_str(), MQTT_TOPIC_MAX_SIZE);
|
||||||
strlcat(result, "/", MQTT_TOPIC_MAX_SIZE);
|
strlcat(result, "/", MQTT_TOPIC_MAX_SIZE);
|
||||||
strlcat(result, topic.c_str(), MQTT_TOPIC_MAX_SIZE);
|
strlcat(result, topic.c_str(), MQTT_TOPIC_MAX_SIZE);
|
||||||
@@ -283,6 +327,8 @@ void Mqtt::on_connect() {
|
|||||||
|
|
||||||
reset_publish_fails(); // reset fail count to 0
|
reset_publish_fails(); // reset fail count to 0
|
||||||
|
|
||||||
|
resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics
|
||||||
|
|
||||||
LOG_INFO(F("MQTT connected"));
|
LOG_INFO(F("MQTT connected"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -310,7 +356,9 @@ void Mqtt::queue_subscribe_message(const std::string & topic) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto message = std::make_shared<MqttMessage>(Operation::SUBSCRIBE, topic, "", false);
|
auto message = std::make_shared<MqttMessage>(Operation::SUBSCRIBE, topic, "", false);
|
||||||
// LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str());
|
#ifdef DEBUG
|
||||||
|
LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str());
|
||||||
|
#endif
|
||||||
|
|
||||||
// if the queue is full, make room but removing the last one
|
// if the queue is full, make room but removing the last one
|
||||||
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
|
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
|
||||||
@@ -349,7 +397,7 @@ void Mqtt::process_all_queue() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// take top from queue and try and publish it
|
// take top from queue and perform the publish or subscribe action
|
||||||
// assumes there is an MQTT connection
|
// assumes there is an MQTT connection
|
||||||
void Mqtt::process_queue() {
|
void Mqtt::process_queue() {
|
||||||
if (mqtt_messages_.empty()) {
|
if (mqtt_messages_.empty()) {
|
||||||
@@ -363,11 +411,7 @@ void Mqtt::process_queue() {
|
|||||||
// append the hostname to the topic, unless we're doing native HA which has a different format
|
// append the hostname to the topic, unless we're doing native HA which has a different format
|
||||||
// if the topic starts with "homeassistant" we leave it untouched, otherwise append host
|
// if the topic starts with "homeassistant" we leave it untouched, otherwise append host
|
||||||
char full_topic[MQTT_TOPIC_MAX_SIZE];
|
char full_topic[MQTT_TOPIC_MAX_SIZE];
|
||||||
if (strncmp(message->topic.c_str(), "homeassistant/", 13) == 0) {
|
make_topic(full_topic, message->topic);
|
||||||
strcpy(full_topic, message->topic.c_str());
|
|
||||||
} else {
|
|
||||||
make_topic(full_topic, message->topic);
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we're subscribing...
|
// if we're subscribing...
|
||||||
if (message->operation == Operation::SUBSCRIBE) {
|
if (message->operation == Operation::SUBSCRIBE) {
|
||||||
|
|||||||
@@ -69,11 +69,11 @@ class Mqtt {
|
|||||||
|
|
||||||
enum Operation { PUBLISH, SUBSCRIBE };
|
enum Operation { PUBLISH, SUBSCRIBE };
|
||||||
|
|
||||||
static constexpr uint8_t MQTT_TOPIC_MAX_SIZE = 60;
|
static constexpr uint8_t MQTT_TOPIC_MAX_SIZE = 100;
|
||||||
|
|
||||||
// are static to be accessed from EMS devices
|
|
||||||
static void subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb);
|
static void subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb);
|
||||||
static void subscribe(const std::string & topic, mqtt_function_p cb);
|
static void subscribe(const std::string & topic, mqtt_function_p cb);
|
||||||
|
static void resubscribe();
|
||||||
|
|
||||||
static void publish(const std::string & topic, const std::string & payload, bool retain = false);
|
static void publish(const std::string & topic, const std::string & payload, bool retain = false);
|
||||||
static void publish(const std::string & topic, const JsonDocument & payload, bool retain = false);
|
static void publish(const std::string & topic, const JsonDocument & payload, bool retain = false);
|
||||||
@@ -81,7 +81,6 @@ class Mqtt {
|
|||||||
static void publish(const std::string & topic);
|
static void publish(const std::string & topic);
|
||||||
|
|
||||||
static void show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_id);
|
static void show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_id);
|
||||||
|
|
||||||
static void show_mqtt(uuid::console::Shell & shell);
|
static void show_mqtt(uuid::console::Shell & shell);
|
||||||
|
|
||||||
static void on_connect();
|
static void on_connect();
|
||||||
@@ -161,7 +160,7 @@ class Mqtt {
|
|||||||
mqtt_function_p mqtt_function_;
|
mqtt_function_p mqtt_function_;
|
||||||
};
|
};
|
||||||
|
|
||||||
static std::vector<MQTTFunction> mqtt_functions_; // list of mqtt callbacks for all devices
|
static std::vector<MQTTFunction> mqtt_functions_; // list of mqtt subscribe callbacks for all devices
|
||||||
|
|
||||||
uint32_t last_mqtt_poll_ = 0;
|
uint32_t last_mqtt_poll_ = 0;
|
||||||
uint32_t last_publish_ = 0;
|
uint32_t last_publish_ = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user