initial commit with refactored mqtt commands

This commit is contained in:
proddy
2020-08-03 23:14:43 +02:00
parent 56c4e043fe
commit 6154ff38f2
41 changed files with 1208 additions and 1351 deletions

View File

@@ -20,12 +20,6 @@
#include "emsesp.h"
#include "version.h"
MAKE_PSTR_WORD(connected)
MAKE_PSTR_WORD(disconnected)
MAKE_PSTR(system_heartbeat_fmt, "Heartbeat is %s")
MAKE_PSTR(logger_name, "mqtt")
namespace emsesp {
AsyncMqttClient * Mqtt::mqttClient_;
@@ -35,14 +29,16 @@ std::string Mqtt::hostname_;
uint8_t Mqtt::mqtt_qos_;
uint16_t Mqtt::publish_time_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
std::vector<Mqtt::MQTTCmdFunction> Mqtt::mqtt_cmdfunctions_;
uint16_t Mqtt::mqtt_publish_fails_ = 0;
size_t Mqtt::maximum_mqtt_messages_ = Mqtt::MAX_MQTT_MESSAGES;
uint16_t Mqtt::mqtt_message_id_ = 0;
std::deque<Mqtt::QueuedMqttMessage> Mqtt::mqtt_messages_;
char will_topic_[Mqtt::MQTT_TOPIC_MAX_SIZE]; // because MQTT library keeps only char pointer
uuid::log::Logger Mqtt::logger_{F_(logger_name), uuid::log::Facility::DAEMON};
uuid::log::Logger Mqtt::logger_{F_(mqtt), uuid::log::Facility::DAEMON};
Mqtt::QueuedMqttMessage::QueuedMqttMessage(uint16_t id, std::shared_ptr<MqttMessage> && content)
: id_(id)
@@ -58,34 +54,65 @@ MqttMessage::MqttMessage(const uint8_t operation, const std::string & topic, con
, retain(retain) {
}
Mqtt::MQTTSubFunction::MQTTSubFunction(const uint8_t device_id, const std::string && topic, mqtt_subfunction_p mqtt_subfunction)
: device_id_(device_id)
Mqtt::MQTTSubFunction::MQTTSubFunction(const uint8_t device_type, const std::string && topic, const std::string && full_topic, mqtt_subfunction_p mqtt_subfunction)
: device_type_(device_type)
, topic_(topic)
, full_topic_(full_topic)
, mqtt_subfunction_(mqtt_subfunction) {
}
Mqtt::MQTTCmdFunction::MQTTCmdFunction(const uint8_t device_type, const __FlashStringHelper * cmd, mqtt_cmdfunction_p mqtt_cmdfunction)
: device_type_(device_type)
, cmd_(cmd)
, mqtt_cmdfunction_(mqtt_cmdfunction) {
}
// subscribe to an MQTT topic, and store the associated callback function
void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_subfunction_p cb) {
auto message = queue_subscribe_message(topic); // add subscription to queue. The hostname will automatically be appended
if (message == nullptr) {
return;
}
// the message will contain the full topic, with the hostname prefixed
// only if it already hasn't been added
void Mqtt::subscribe(const uint8_t device_type, const std::string & topic, mqtt_subfunction_p cb) {
// check if we already have the topic subscribed, if so don't add it again
bool exists = false;
if (!mqtt_subfunctions_.empty()) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_id_ == device_id) && (strcmp(mqtt_subfunction.topic_.c_str(), message->topic.c_str()) == 0)) {
exists = true;
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), topic.c_str()) == 0)) {
return; // it exists, exit
}
}
}
if (!exists) {
mqtt_subfunctions_.emplace_back(device_id, std::move(message->topic), cb); // register a call back function for a specific telegram type
// add to MQTT queue as a subscribe operation
auto message = queue_subscribe_message(topic);
// register in our libary with the callback function.
// We store both the original topic and the fully-qualified
mqtt_subfunctions_.emplace_back(device_type, std::move(topic), std::move(message->topic), cb);
}
// adds a command and callback function for a specific device
void Mqtt::add_command(const uint8_t device_type, const __FlashStringHelper * cmd, mqtt_cmdfunction_p cb) {
// subscribe to the command topic if it doesn't exist yet
// create the cmd topic for a device like "<device_type>_cmd" e.g. "boiler_cmd"
// unless its a system MQTT command
std::string cmd_topic(40, '\0');
if (device_type == EMSdevice::DeviceType::SERVICEKEY) {
cmd_topic = "system"; // hard-coded system
} else {
snprintf_P(&cmd_topic[0], 40, PSTR("%s_cmd"), EMSdevice::device_type_topic_name(device_type).c_str());
}
bool exists = false;
if (!mqtt_subfunctions_.empty()) {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if ((mqtt_subfunction.device_type_ == device_type) && (strcmp(mqtt_subfunction.topic_.c_str(), cmd_topic.c_str()) == 0)) {
exists = true;
}
}
}
if (!exists) {
Mqtt::subscribe(device_type, cmd_topic, nullptr); // use an empty function handler to signal this is a command function
}
// add the function to our list
mqtt_cmdfunctions_.emplace_back(device_type, cmd, cb);
}
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a specific device
@@ -142,7 +169,20 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
// show subscriptions
shell.printfln(F("MQTT subscriptions:"));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
shell.printfln(F(" %s"), mqtt_subfunction.topic_.c_str());
shell.printfln(F(" %s"), mqtt_subfunction.full_topic_.c_str());
}
shell.println();
// show command handlers
shell.printfln(F("MQTT commands:"));
for (const auto & mqtt_cmdfunction : mqtt_cmdfunctions_) {
if (mqtt_cmdfunction.device_type_ == EMSdevice::DeviceType::SERVICEKEY) {
shell.printfln(F(" on topic: system, cmd: %s"), uuid::read_flash_string(mqtt_cmdfunction.cmd_).c_str()); // hardcoded topic is system
} else {
shell.printfln(F(" on topic: %s_cmd, cmd: %s"),
EMSdevice::device_type_topic_name(mqtt_cmdfunction.device_type_).c_str(),
uuid::read_flash_string(mqtt_cmdfunction.cmd_).c_str());
}
}
shell.println();
@@ -190,6 +230,31 @@ void Mqtt::incoming(char * topic, char * payload) {
on_message(topic, payload, strlen(payload));
}
// calls a command, context is the device_type
// id may be used to represent a heating circuit for example
bool Mqtt::call_command(const uint8_t device_type, const char * cmd, const char * value, const int8_t id) {
#ifdef EMSESP_DEBUG
if (id == -1) {
LOG_DEBUG(F("calling command %s, value %s, id is default"), cmd, value);
} else {
LOG_DEBUG(F("calling command %s, value %s, id is %d"), cmd, value, id);
}
#endif
if (!mqtt_cmdfunctions_.empty()) {
for (const auto & cf : mqtt_cmdfunctions_) {
if (cf.device_type_ == device_type) {
const char * cf_cmd = uuid::read_flash_string(cf.cmd_).c_str();
if (strcmp(cf_cmd, cmd) == 0) {
(cf.mqtt_cmdfunction_)(value, id); // call function, data needs to be a string and can be null
return true;
}
}
}
}
return false;
}
// received an MQTT message that we subscribed too
void Mqtt::on_message(char * topic, char * payload, size_t len) {
if (len == 0) {
@@ -199,33 +264,65 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
// convert payload to a null-terminated char string
char message[len + 2];
strlcpy(message, payload, len + 1);
#ifdef EMSESP_DEBUG
LOG_DEBUG(F("[DEBUG] Received %s => %s (length %d)"), topic, message, len);
#endif
// see if we have this topic in our subscription list, then call its callback handler
// note: this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail. Not sure if this is going to be an issue?
for (const auto & mf : mqtt_subfunctions_) {
if (strcmp(topic, mf.topic_.c_str()) == 0) {
(mf.mqtt_subfunction_)(message);
return;
if (strcmp(topic, mf.full_topic_.c_str()) == 0) {
if (mf.mqtt_subfunction_) {
(mf.mqtt_subfunction_)(message); // matching function, call it
return;
} else {
// empty function. It's a command then. Find the command from the json and call it directly.
StaticJsonDocument<EMSESP_MAX_JSON_SIZE_SMALL> doc;
DeserializationError error = deserializeJson(doc, message);
if (error) {
LOG_ERROR(F("MQTT error: payload %s, error %s"), message, error.c_str());
return;
}
const char * command = doc["cmd"];
if (command == nullptr) {
LOG_ERROR(F("MQTT error: invalid payload cmd format. message=%s"), message);
return;
}
// check for hc and id
int8_t n = -1; // no value
if (doc.containsKey("hc")) {
n = doc["hc"];
} else if (doc.containsKey("id")) {
n = doc["id"];
}
if (!call_command(mf.device_type_, command, doc["data"], n)) {
// if we got here we didn't find a matching command
LOG_ERROR(F("MQTT error: no matching cmd: %s"), command);
}
return;
}
}
}
// if we got here we didn't find a topic match
LOG_DEBUG(F("No responding handler found for topic %s"), topic);
LOG_ERROR(F("No MQTT handler found for topic %s and payload %s"), topic, message);
}
// print all the topics related to a specific device_id
void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_id) {
// print all the topics related to a specific device type
void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t device_type) {
if (std::count_if(mqtt_subfunctions_.cbegin(),
mqtt_subfunctions_.cend(),
[=](MQTTSubFunction const & mqtt_subfunction) { return device_id == mqtt_subfunction.device_id_; })
[=](MQTTSubFunction const & mqtt_subfunction) { return device_type == mqtt_subfunction.device_type_; })
== 0) {
return;
}
shell.print(F(" Subscribed MQTT topics: "));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
if (mqtt_subfunction.device_id_ == device_id) {
if (mqtt_subfunction.device_type_ == device_type) {
shell.printf(F("%s "), mqtt_subfunction.topic_.c_str());
}
}
@@ -297,8 +394,13 @@ void Mqtt::start() {
mqttClient_->setWill(will_topic, 1, true, "offline"); // with qos 1, retain true
mqttClient_->onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
// receiving mqtt
on_message(topic, payload, len);
mqttClient_->onPublish([this](uint16_t packetId) { on_publish(packetId); });
});
mqttClient_->onPublish([this](uint16_t packetId) {
// publish
on_publish(packetId);
});
}
@@ -328,15 +430,18 @@ void Mqtt::on_connect() {
resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics
// add the system MQTT subscriptions, only if its a fresh start with no previous subscriptions
// these commands respond to the topic "system" and take a payload like {cmd:"", data:"", id:""}
if (mqtt_subfunctions_.empty()) {
Mqtt::subscribe("cmd", System::mqtt_commands);
add_command(EMSdevice::DeviceType::SERVICEKEY, F("gpio"), System::mqtt_command_gpio);
add_command(EMSdevice::DeviceType::SERVICEKEY, F("send"), System::mqtt_command_send);
}
LOG_INFO(F("MQTT connected"));
}
// add sub or pub task to the queue. When the message is created, the topic will have
// automatically the hostname prefixed.
// add sub or pub task to the queue.
// a fully-qualified topic is created by prefixing the hostname, unless it's HA
// returns a pointer to the message created
std::shared_ptr<const MqttMessage>
Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain, bool no_prefix) {
if (topic.empty()) {