This commit is contained in:
Paul
2020-05-18 21:00:38 +02:00
parent 97ddf80ea8
commit be4f075663
18 changed files with 270 additions and 141 deletions

View File

@@ -24,7 +24,9 @@ MAKE_PSTR_WORD(qos)
MAKE_PSTR_WORD(base)
MAKE_PSTR_WORD(heartbeat)
MAKE_PSTR_WORD(ip)
MAKE_PSTR_WORD(nested_json)
MAKE_PSTR_WORD(nested)
MAKE_PSTR_WORD(single)
MAKE_PSTR_WORD(ha)
MAKE_PSTR_WORD(publish_time)
MAKE_PSTR_WORD(publish)
MAKE_PSTR_WORD(connected)
@@ -38,7 +40,7 @@ MAKE_PSTR(mqtt_enabled_fmt, "MQTT is %s")
MAKE_PSTR(mqtt_base_fmt, "Base = %s")
MAKE_PSTR(mqtt_qos_fmt, "QOS = %ld")
MAKE_PSTR(mqtt_retain_fmt, "Retain Flag = %s")
MAKE_PSTR(mqtt_nestedjson_fmt, "Use nested JSON = %s")
MAKE_PSTR(mqtt_format_fmt, "JSON format = %s")
MAKE_PSTR(mqtt_heartbeat_fmt, "Heartbeat = %s")
MAKE_PSTR(mqtt_publish_time_fmt, "Publish time = %d seconds")
@@ -54,6 +56,7 @@ std::vector<Mqtt::MQTTFunction> Mqtt::mqtt_functions_;
bool Mqtt::mqtt_retain_;
uint8_t Mqtt::mqtt_qos_;
std::string Mqtt::mqtt_hostname_; // copy of hostname
uint8_t Mqtt::mqtt_format_;
std::string Mqtt::mqtt_base_;
uint16_t Mqtt::mqtt_publish_fails_ = 0;
size_t Mqtt::maximum_mqtt_messages_ = Mqtt::MAX_MQTT_MESSAGES;
@@ -71,9 +74,8 @@ Mqtt::QueuedMqttMessage::QueuedMqttMessage(uint16_t id, std::shared_ptr<MqttMess
packet_id_ = 0;
}
MqttMessage::MqttMessage(uint64_t uptime_ms, uint8_t operation, const std::string & topic, const std::string & payload, bool retain)
: uptime_ms(uptime_ms)
, operation(operation)
MqttMessage::MqttMessage(uint8_t operation, const std::string & topic, const std::string & payload, bool retain)
: operation(operation)
, topic(topic)
, payload(payload)
, retain(retain) {
@@ -107,6 +109,7 @@ void Mqtt::start() {
mqtt_hostname_ = settings.hostname();
mqtt_base_ = settings.mqtt_base();
mqtt_qos_ = settings.mqtt_qos();
mqtt_format_ = settings.mqtt_format();
mqtt_retain_ = settings.mqtt_retain();
mqtt_heartbeat_ = settings.mqtt_heartbeat();
mqtt_publish_time_ = settings.mqtt_publish_time() * 1000; // convert to seconds
@@ -196,8 +199,12 @@ Mqtt::MQTTFunction::MQTTFunction(uint8_t device_id, const std::string && topic,
// subscribe to an MQTT topic, and store the associated callback function
void Mqtt::subscribe(const uint8_t device_id, const std::string & topic, mqtt_function_p cb) {
mqtt_functions_.emplace_back(device_id, std::move(topic), cb); // register a call back function for a specific telegram type
queue_subscribe_message(topic); // add subscription to queue
// We don't want to store the whole topic string in our lookup, just the last cmd, as this is wasteful.
// strip out everything until the last /
size_t found = topic.find_last_of("/"); // returns npos which is -1
mqtt_functions_.emplace_back(device_id, std::move(topic.substr(found + 1)), cb); // register a call back function for a specific telegram type
queue_subscribe_message(topic); // add subscription to queue
}
// subscribe to an MQTT topic, and store the associated callback function. For generic functions not tied to a device
@@ -224,7 +231,7 @@ void Mqtt::loop() {
force_publish_ = false;
send_heartbeat(); // create a heartbeat payload
EMSESP::publish_all_values(); // add sensors and mqtt to queue
publish_all_queue(); // publish everything on queue
process_all_queue(); // publish everything on queue
}
// send out heartbeat
@@ -243,7 +250,7 @@ void Mqtt::loop() {
// publish top item from MQTT queue to stop flooding
if ((uint32_t)(currentMillis - last_mqtt_poll_) > MQTT_PUBLISH_WAIT) {
last_mqtt_poll_ = currentMillis;
publish_queue();
process_queue();
}
return;
@@ -325,7 +332,7 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
return;
}
// convert payload to a null-terminate char string
// convert payload to a null-terminated char string
char message[len + 2];
strlcpy(message, payload, len + 1);
@@ -333,7 +340,8 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
DEBUG_LOG(F("Received %s => %s (length %d)"), topic, message, len);
#endif
char * topic_magnitude = strrchr(topic, '/'); // strip out everything until last /
// strip out everything until the last /
char * topic_magnitude = strrchr(topic, '/');
if (topic_magnitude != nullptr) {
topic = topic_magnitude + 1;
}
@@ -341,7 +349,7 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
// Send message event to custom service
// this will pick the first topic that matches, so for multiple devices of the same type it's gonna fail
for (const auto & mf : mqtt_functions_) {
if (topic == mf.topic_) {
if (strcmp(topic, mf.topic_.c_str()) == 0) {
(mf.mqtt_function_)(message);
return;
}
@@ -470,7 +478,7 @@ void Mqtt::queue_publish_message(const char * topic, const JsonDocument & payloa
std::string payload_text;
serializeJson(payload, payload_text);
auto message = std::make_shared<MqttMessage>(uuid::get_uptime_ms(), Operation::PUBLISH, topic, payload_text, retain);
auto message = std::make_shared<MqttMessage>(Operation::PUBLISH, topic, payload_text, retain);
// DEBUG_LOG(F("Adding JSON publish message created with topic %s, message %s"), topic, payload_text.c_str());
@@ -484,11 +492,12 @@ void Mqtt::queue_publish_message(const char * topic, const JsonDocument & payloa
// add MQTT message to queue, payload is a string
void Mqtt::queue_publish_message(const char * topic, const std::string & payload, const bool retain) {
// can't have bogus topics, but empty payloads are ok
if (strlen(topic) == 0) {
return;
}
auto message = std::make_shared<MqttMessage>(uuid::get_uptime_ms(), Operation::PUBLISH, topic, payload, retain);
auto message = std::make_shared<MqttMessage>(Operation::PUBLISH, topic, payload, retain);
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
@@ -498,13 +507,13 @@ void Mqtt::queue_publish_message(const char * topic, const std::string & payload
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
}
// add MQTT message to queue, payload is a string
// add MQTT subscribe message to queue
void Mqtt::queue_subscribe_message(const std::string & topic) {
if (topic.empty()) {
return;
}
auto message = std::make_shared<MqttMessage>(uuid::get_uptime_ms(), Operation::SUBSCRIBE, topic, "", false);
auto message = std::make_shared<MqttMessage>(Operation::SUBSCRIBE, topic, "", false);
DEBUG_LOG(F("Adding a subscription for %s"), topic.c_str());
// if the queue is full, make room but removing the last one
@@ -537,16 +546,21 @@ void Mqtt::publish(const char * topic, const bool value) {
queue_publish_message(topic, value ? "1" : "0", mqtt_retain_);
}
// no payload
void Mqtt::publish(const char * topic) {
queue_publish_message(topic, "", mqtt_retain_);
}
// publish all queued messages to MQTT
void Mqtt::publish_all_queue() {
void Mqtt::process_all_queue() {
while (!mqtt_messages_.empty()) {
publish_queue();
process_queue();
}
}
// take top from queue and try and publish it
// assumes there is an MQTT connection
void Mqtt::publish_queue() {
void Mqtt::process_queue() {
if (mqtt_messages_.empty()) {
return;
}
@@ -554,8 +568,16 @@ void Mqtt::publish_queue() {
// fetch first from queue and create the full topic name
auto mqtt_message = mqtt_messages_.front();
auto message = mqtt_message.content_;
// append the hostname and base to the topic, unless we're doing native HA which has a different format
char full_topic[MQTT_TOPIC_MAX_SIZE];
make_topic(full_topic, message->topic);
// if the topic starts with "homeassistant" we leave it untouched, otherwise append ho st and base
if (strncmp(message->topic.c_str(), "homeassistant/", 13) == 0) {
strcpy(full_topic, message->topic.c_str());
} else {
make_topic(full_topic, message->topic);
}
// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
@@ -586,7 +608,7 @@ void Mqtt::publish_queue() {
#else
uint16_t packet_id = 1;
#endif
DEBUG_LOG(F("Published topic %s (#%02d, attempt #%d, pid %d)"), full_topic, mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id);
DEBUG_LOG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), full_topic, mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id);
if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue
@@ -612,7 +634,7 @@ void Mqtt::publish_queue() {
}
mqtt_messages_.pop_front(); // remove the message from the queue
}
} // namespace emsesp
// add console commands
void Mqtt::console_commands() {
@@ -641,25 +663,27 @@ void Mqtt::console_commands() {
EMSESPShell::commands->add_command(
ShellContext::MQTT,
CommandFlags::ADMIN,
flash_string_vector{F_(set), F_(nested_json)},
flash_string_vector{F_(bool_mandatory)},
flash_string_vector{F_(set), F_(format)},
flash_string_vector{F_(name_mandatory)},
[](Shell & shell, const std::vector<std::string> & arguments) {
Settings settings;
if (arguments[0] == read_flash_string(F_(on))) {
settings.mqtt_nestedjson(true);
settings.commit();
shell.println(F("Please restart EMS-ESP"));
} else if (arguments[0] == read_flash_string(F_(off))) {
settings.mqtt_nestedjson(false);
settings.commit();
shell.println(F("Please restart EMS-ESP"));
uint8_t value;
if (arguments[0] == read_flash_string(F_(single))) {
value = Settings::MQTT_format::SINGLE;
} else if (arguments[0] == read_flash_string(F_(nested))) {
value = Settings::MQTT_format::NESTED;
} else if (arguments[0] == read_flash_string(F_(ha))) {
value = Settings::MQTT_format::HA;
} else {
shell.println(F("Must be on or off"));
shell.println(F("Must be single, nested or ha"));
return;
}
settings.mqtt_format(value);
settings.commit();
shell.println(F("Please restart EMS-ESP"));
},
[](Shell & shell __attribute__((unused)), const std::vector<std::string> & arguments __attribute__((unused))) -> const std::vector<std::string> {
return std::vector<std::string>{read_flash_string(F_(on)), read_flash_string(F_(off))};
return std::vector<std::string>{read_flash_string(F_(single)), read_flash_string(F_(nested)), read_flash_string(F_(ha))};
});
EMSESPShell::commands->add_command(ShellContext::MQTT,
@@ -818,11 +842,17 @@ void Mqtt::console_commands() {
shell.printfln(F_(mqtt_base_fmt), settings.mqtt_base().empty() ? uuid::read_flash_string(F_(unset)).c_str() : settings.mqtt_base().c_str());
shell.printfln(F_(mqtt_qos_fmt), settings.mqtt_qos());
shell.printfln(F_(mqtt_retain_fmt), settings.mqtt_retain() ? F_(enabled) : F_(disabled));
shell.printfln(F_(mqtt_nestedjson_fmt), settings.mqtt_nestedjson() ? F_(enabled) : F_(disabled));
if (settings.mqtt_format() == Settings::MQTT_format::SINGLE) {
shell.printfln(F_(mqtt_format_fmt), F_(single));
} else if (settings.mqtt_format() == Settings::MQTT_format::NESTED) {
shell.printfln(F_(mqtt_format_fmt), F_(nested));
} else if (settings.mqtt_format() == Settings::MQTT_format::HA) {
shell.printfln(F_(mqtt_format_fmt), F_(ha));
}
shell.printfln(F_(mqtt_heartbeat_fmt), settings.mqtt_heartbeat() ? F_(enabled) : F_(disabled));
shell.printfln(F_(mqtt_publish_time_fmt), settings.mqtt_publish_time());
shell.println();
});
}
} // namespace emsesp
} // namespace emsesp