individual subscriptions: resubscribe, show, some system commands

This commit is contained in:
MichaelDvP
2021-03-23 15:28:33 +01:00
parent 1dae9f8beb
commit a83d3a12fb
5 changed files with 53 additions and 30 deletions

View File

@@ -91,7 +91,7 @@ void Command::add(const uint8_t device_type, const __FlashStringHelper * cmd, cm
return;
}
cmdfunctions_.emplace_back(device_type, cmd, cb, nullptr);
cmdfunctions_.emplace_back(device_type, flag, cmd, cb, nullptr);
// see if we need to subscribe
if (Mqtt::enabled()) {
@@ -106,7 +106,7 @@ void Command::add_with_json(const uint8_t device_type, const __FlashStringHelper
return;
}
cmdfunctions_.emplace_back(device_type, cmd, nullptr, cb); // add command
cmdfunctions_.emplace_back(device_type, MqttSubFlag::FLAG_NOSUB, cmd, nullptr, cb); // add command
}
// see if a command exists for that device type

View File

@@ -41,12 +41,14 @@ class Command {
public:
struct CmdFunction {
uint8_t device_type_; // DeviceType::
uint8_t flag_;
const __FlashStringHelper * cmd_;
cmdfunction_p cmdfunction_;
cmdfunction_json_p cmdfunction_json_;
CmdFunction(const uint8_t device_type, const __FlashStringHelper * cmd, cmdfunction_p cmdfunction, cmdfunction_json_p cmdfunction_json)
CmdFunction(const uint8_t device_type, const uint8_t flag, const __FlashStringHelper * cmd, cmdfunction_p cmdfunction, cmdfunction_json_p cmdfunction_json)
: device_type_(device_type)
, flag_(flag)
, cmd_(cmd)
, cmdfunction_(cmdfunction)
, cmdfunction_json_(cmdfunction_json) {

View File

@@ -140,9 +140,10 @@ enum DeviceValueTAG : uint8_t {
// mqtt flags for command subscriptions
enum MqttSubFlag : uint8_t {
FLAG_NONE = 0,
FLAG_NORMAL = 0,
FLAG_HC,
FLAG_WWC
FLAG_WWC,
FLAG_NOSUB
};
class EMSdevice {

View File

@@ -107,14 +107,8 @@ void Mqtt::register_command(const uint8_t device_type, const __FlashStringHelper
LOG_DEBUG(F("Registering MQTT cmd %s with topic %s"), uuid::read_flash_string(cmd).c_str(), EMSdevice::device_type_2_device_name(device_type).c_str());
}
// only general device topics
if (subscribes_ == 0) {
return;
}
// register the individual commands too (e.g. ems-esp/boiler/wwonetime)
// https://github.com/emsesp/EMS-ESP32/issues/31
if (device_type != EMSdevice::DeviceType::SYSTEM) {
std::string topic(MQTT_TOPIC_MAX_SIZE, '\0');
if (subscribes_ == 2 && flag == MqttSubFlag::FLAG_HC) {
topic = cmd_topic + "/hc1/" + uuid::read_flash_string(cmd);
@@ -125,12 +119,11 @@ void Mqtt::register_command(const uint8_t device_type, const __FlashStringHelper
queue_subscribe_message(topic);
topic = cmd_topic + "/hc4/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
} else {
} else if (subscribes_ && flag != MqttSubFlag::FLAG_NOSUB) {
topic = cmd_topic + "/" + uuid::read_flash_string(cmd);
queue_subscribe_message(topic);
}
}
}
// subscribe to an MQTT topic, and store the associated callback function
// For generic functions not tied to a specific device
@@ -147,6 +140,22 @@ void Mqtt::resubscribe() {
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
queue_subscribe_message(mqtt_subfunction.topic_);
}
for (const auto & cf : Command::commands()) {
std::string topic(MQTT_TOPIC_MAX_SIZE, '\0');
if (subscribes_ == 2 && cf.flag_ == MqttSubFlag::FLAG_HC) {
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc1/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc2/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc3/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/hc4/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
} else if (subscribes_ && cf.flag_ != MqttSubFlag::FLAG_NOSUB) {
topic = EMSdevice::device_type_2_device_name(cf.device_type_) + "/" + uuid::read_flash_string(cf.cmd_);
queue_subscribe_message(topic);
}
}
}
// Main MQTT loop - sends out top item on publish queue
@@ -215,7 +224,17 @@ void Mqtt::show_mqtt(uuid::console::Shell & shell) {
// show subscriptions
shell.printfln(F("MQTT topic subscriptions:"));
for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
shell.printfln(F(" %s/%s (%s)"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str(), EMSdevice::device_type_2_device_name(mqtt_subfunction.device_type_).c_str());
shell.printfln(F(" %s/%s"), mqtt_base_.c_str(), mqtt_subfunction.topic_.c_str());
}
for (const auto & cf : Command::commands()) {
if (subscribes_ == 2 && cf.flag_ == MqttSubFlag::FLAG_HC) {
shell.printfln(F(" %s/%s/hc1/%s"), mqtt_base_.c_str(), EMSdevice::device_type_2_device_name(cf.device_type_).c_str(), uuid::read_flash_string(cf.cmd_).c_str());
shell.printfln(F(" %s/%s/hc2/%s"), mqtt_base_.c_str(), EMSdevice::device_type_2_device_name(cf.device_type_).c_str(), uuid::read_flash_string(cf.cmd_).c_str());
shell.printfln(F(" %s/%s/hc3/%s"), mqtt_base_.c_str(), EMSdevice::device_type_2_device_name(cf.device_type_).c_str(), uuid::read_flash_string(cf.cmd_).c_str());
shell.printfln(F(" %s/%s/hc4/%s"), mqtt_base_.c_str(), EMSdevice::device_type_2_device_name(cf.device_type_).c_str(), uuid::read_flash_string(cf.cmd_).c_str());
} else if (subscribes_ && cf.flag_ != MqttSubFlag::FLAG_NOSUB) {
shell.printfln(F(" %s/%s/%s"), mqtt_base_.c_str(), EMSdevice::device_type_2_device_name(cf.device_type_).c_str(), uuid::read_flash_string(cf.cmd_).c_str());
}
}
shell.println();
@@ -517,7 +536,7 @@ void Mqtt::start() {
});
// create space for command buffer, to avoid heap memory fragmentation
mqtt_subfunctions_.reserve(50);
mqtt_subfunctions_.reserve(5);
}
void Mqtt::set_publish_time_boiler(uint16_t publish_time) {

View File

@@ -400,7 +400,7 @@ void System::send_heartbeat() {
doc["uptime"] = uuid::log::format_timestamp_ms(uuid::get_uptime_ms(), 3);
doc["uptime_sec"] = uuid::get_uptime_sec();
doc["mqttfails"] = Mqtt::publish_fails();
doc["rxsent"] = EMSESP::rxservice_.telegram_count();
doc["rxreceived"] = EMSESP::rxservice_.telegram_count();
doc["rxfails"] = EMSESP::rxservice_.telegram_error_count();
doc["txread"] = EMSESP::txservice_.telegram_read_count();
doc["txwrite"] = EMSESP::txservice_.telegram_write_count();
@@ -553,8 +553,9 @@ void System::system_check() {
// commands - takes static function pointers
// these commands respond to the topic "system" and take a payload like {cmd:"", data:"", id:""}
// no individual subsribe for pin command because id is needed
void System::commands_init() {
Command::add(EMSdevice::DeviceType::SYSTEM, F_(pin), System::command_pin);
Command::add(EMSdevice::DeviceType::SYSTEM, F_(pin), System::command_pin, MqttSubFlag::FLAG_NOSUB);
Command::add(EMSdevice::DeviceType::SYSTEM, F_(send), System::command_send);
Command::add(EMSdevice::DeviceType::SYSTEM, F_(publish), System::command_publish);
Command::add(EMSdevice::DeviceType::SYSTEM, F_(fetch), System::command_fetch);