optimizing mqtt

This commit is contained in:
proddy
2020-07-25 18:27:37 +02:00
parent 82d8210754
commit 580f3ea45c
2 changed files with 105 additions and 82 deletions

View File

@@ -35,7 +35,7 @@ std::string Mqtt::hostname_;
uint8_t Mqtt::mqtt_qos_;
uint16_t Mqtt::publish_time_;
std::vector<Mqtt::MQTTFunction> Mqtt::mqtt_functions_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
uint16_t Mqtt::mqtt_publish_fails_ = 0;
size_t Mqtt::maximum_mqtt_messages_ = Mqtt::MAX_MQTT_MESSAGES;
uint16_t Mqtt::mqtt_message_id_ = 0;
@@ -51,62 +51,56 @@ Mqtt::QueuedMqttMessage::QueuedMqttMessage(uint16_t id, std::shared_ptr<MqttMess
packet_id_ = 0;
}
MqttMessage::MqttMessage(uint8_t operation, const std::string & topic, const std::string & payload, bool retain)
MqttMessage::MqttMessage(const uint8_t operation, const std::string & topic, const std::string & payload, bool retain)
: operation(operation)
, topic(topic)
, payload(payload)
, retain(retain) {
}
Mqtt::MQTTFunction::MQTTFunction(uint8_t device_id, const std::string && topic, mqtt_function_p mqtt_function)
Mqtt::MQTTSubFunction::MQTTSubFunction(const uint8_t device_id, const std::string && topic, mqtt_subfunction_p mqtt_subfunction)
: device_id_(device_id)
, topic_(topic)
, mqtt_function_(mqtt_function) {
, mqtt_subfunction_(mqtt_subfunction) {
}
// subscribe to an MQTT topic, and store the associated callback function
void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb) {
/*
// We don't want to store the whole topic string in our lookup, just the last cmd, as this can take up too much memory
// strip out everything until the last /
size_t topic_pos = topic.find_last_of("/"); // returns npos which is -1
topic_pos += 1; // skip the /
*/
void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_subfunction_p cb) {
auto message = queue_subscribe_message(topic); // add subscription to queue. The hostname will automatically be appended
// convert the topic to it's full path, so either prefixed with the hostname unless hardcoded like 'homeassistant'
char full_topic[MQTT_TOPIC_MAX_SIZE];
make_topic(full_topic, topic);
if (message == nullptr) {
return;
}
// the message will contain the full topic, with the hostname prefixed
// check if we already have the topic subscribed, if so don't add it again
bool exists = false;
if (!mqtt_functions_.empty()) {
for (const auto & mqtt_function : mqtt_functions_) {
if ((mqtt_function.device_id_ == device_id) && (strcmp(mqtt_function.topic_.c_str(), full_topic) == 0)) {
if (!mqtt_subfunctions_.empty()) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_id_ == device_id) && (strcmp(mqtt_subfunction.topic_.c_str(), message->topic.c_str()) == 0)) {
exists = true;
}
}
}
if (!exists) {
mqtt_functions_.emplace_back(device_id, std::move(full_topic), cb); // register a call back function for a specific telegram type
mqtt_subfunctions_.emplace_back(device_id, std::move(message->topic), cb); // register a call back function for a specific telegram type
}
queue_subscribe_message(topic); // add subscription to queue
}
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device
void Mqtt::subscribe(const std::string & topic, mqtt_function_p cb) {
void Mqtt::subscribe(const std::string & topic, mqtt_subfunction_p cb) {
subscribe(0, topic, cb); // no device_id needed, if generic to EMS-ESP
}
// resubscribe to all MQTT topics again
void Mqtt::resubscribe() {
if (mqtt_functions_.empty()) {
if (mqtt_subfunctions_.empty()) {
return;
}
for (const auto & mqtt_function : mqtt_functions_) {
queue_subscribe_message(mqtt_function.topic_);
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
queue_message(Operation::SUBSCRIBE, mqtt_subfunction.topic_, "", false, true); // no payload, no topic prefixing
}
}
@@ -146,8 +140,8 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
// show subscriptions
shell.printfln(F("MQTT subscriptions:"));
for (const auto & mqtt_function : mqtt_functions_) {
shell.printfln(F(" %s"), mqtt_function.topic_.c_str());
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
shell.printfln(F(" %s"), mqtt_subfunction.topic_.c_str());
}
shell.println();
@@ -216,9 +210,9 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
// see if we have this topic in our subscription list, then call its callback handler
// note: this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail. Not sure if this is going to be an issue?
for (const auto & mf : mqtt_functions_) {
for (const auto & mf : mqtt_subfunctions_) {
if (strcmp(topic, mf.topic_.c_str()) == 0) {
(mf.mqtt_function_)(message);
(mf.mqtt_subfunction_)(message);
return;
}
}
@@ -229,15 +223,17 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
// print all the topics related to a specific device_id
void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_id) {
if (std::count_if(mqtt_functions_.cbegin(), mqtt_functions_.cend(), [=](MQTTFunction const & mqtt_function) { return device_id == mqtt_function.device_id_; })
if (std::count_if(mqtt_subfunctions_.cbegin(),
mqtt_subfunctions_.cend(),
[=](MQTTSubFunction const & mqtt_subfunction) { return device_id == mqtt_subfunction.device_id_; })
== 0) {
return;
}
shell.print(F(" Subscribed MQTT topics: "));
for (const auto & mqtt_function : mqtt_functions_) {
if (mqtt_function.device_id_ == device_id) {
shell.printf(F("%s "), mqtt_function.topic_.c_str());
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if (mqtt_subfunction.device_id_ == device_id) {
shell.printf(F("%s "), mqtt_subfunction.topic_.c_str());
}
}
shell.println();
@@ -285,7 +281,6 @@ char * Mqtt::make_topic(char * result, const std::string & topic) {
}
void Mqtt::start() {
mqttClient_ = EMSESP::esp8266React.getMqttClient();
// get the hostname, which we'll use to prefix to all topics
@@ -298,15 +293,18 @@ void Mqtt::start() {
});
mqttClient_->onConnect([this](bool sessionPresent) { on_connect(); });
mqttClient_->setWill(make_topic(will_topic_, "status"), 1, true, "offline"); // with qos 1, retain true
// create will_topic with the hostname prefixed. It has to be static because asyncmqttclient destroys the reference
static char will_topic[MQTT_TOPIC_MAX_SIZE];
strlcpy(will_topic, hostname_.c_str(), MQTT_TOPIC_MAX_SIZE);
strlcat(will_topic, "/", MQTT_TOPIC_MAX_SIZE);
strlcat(will_topic, "status", MQTT_TOPIC_MAX_SIZE);
mqttClient_->setWill(will_topic, 1, true, "offline"); // with qos 1, retain true
mqttClient_->onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
on_message(topic, payload, len);
mqttClient_->onPublish([this](uint16_t packetId) { on_publish(packetId); });
});
// add the system MQTT subscriptions
Mqtt::subscribe("cmd", System::mqtt_commands);
// Mqtt::subscribe("cmd", std::bind(&System::mqtt_commands, this, std::placeholders::_1));
}
void Mqtt::set_publish_time(uint16_t publish_time) {
@@ -334,47 +332,50 @@ void Mqtt::on_connect() {
resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics
// add the system MQTT subscriptions, only if its a fresh start with no previous subscriptions
if (mqtt_subfunctions_.empty()) {
Mqtt::subscribe("cmd", System::mqtt_commands);
}
LOG_INFO(F("MQTT connected"));
}
// add MQTT message to queue, payload is a string
void Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) {
// can't have bogus topics, but empty payloads are ok
// add sub or pub task to the queue. When the message is created, the topic will have
// automatically the hostname prefixed.
std::shared_ptr<const MqttMessage>
Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain, bool no_prefix) {
if (topic.empty()) {
return;
return nullptr;
}
// prefix the hostname to the topic
char full_topic[MQTT_TOPIC_MAX_SIZE];
make_topic(full_topic, topic);
auto message = std::make_shared<MqttMessage>(Operation::PUBLISH, full_topic, payload, retain);
// take the topic and prefix the hostname, unless its for HA
std::shared_ptr<MqttMessage> message;
if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0) || no_prefix) {
// leave topic as it is
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
} else {
// prefix the hostname
std::string full_topic = Mqtt::hostname_ + "/" + topic;
message = std::make_shared<MqttMessage>(operation, full_topic, payload, retain);
}
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
mqtt_messages_.pop_front();
}
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
return mqtt_messages_.back().content_; // this is because the message has been moved
}
// add MQTT message to queue, payload is a string
std::shared_ptr<const MqttMessage> Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) {
return queue_message(Operation::PUBLISH, topic, payload, retain);
}
// add MQTT subscribe message to queue
void Mqtt::queue_subscribe_message(const std::string & topic) {
if (topic.empty()) {
return;
}
auto message = std::make_shared<MqttMessage>(Operation::SUBSCRIBE, topic, "", false);
#ifdef DEBUG
LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str());
#endif
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
mqtt_messages_.pop_front();
}
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
std::shared_ptr<const MqttMessage> Mqtt::queue_subscribe_message(const std::string & topic) {
return queue_message(Operation::SUBSCRIBE, topic, "", false); // no payload
}
// MQTT Publish, using a specific retain flag
@@ -383,9 +384,8 @@ void Mqtt::publish(const std::string & topic, const std::string & payload, bool
}
void Mqtt::publish(const std::string & topic, const JsonDocument & payload, bool retain) {
// convert json to string
std::string payload_text;
serializeJson(payload, payload_text);
serializeJson(payload, payload_text); // convert json to string
queue_publish_message(topic, payload_text, retain);
}
@@ -413,6 +413,26 @@ void Mqtt::process_queue() {
return;
}
// show queue - Debug only
/*
Serial.printf("MQTT queue:\n\r");
for (const auto & message : mqtt_messages_) {
auto content = message.content_;
if (content->operation == Operation::PUBLISH) {
// Publish messages
Serial.printf(" [%02d] (Pub) topic=%s payload=%s (pid %d, retry #%d)\n\r",
message.id_,
content->topic.c_str(),
content->payload.c_str(),
message.packet_id_,
message.retry_count_);
} else {
// Subscribe messages
Serial.printf(" [%02d] (Sub) topic=%s\n\r", message.id_, content->topic.c_str());
}
}
*/
// fetch first from queue and create the full topic name
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;