This commit is contained in:
Paul
2020-05-25 17:13:05 +02:00
parent b2bb8e2b5a
commit d3953d90ca
29 changed files with 461 additions and 657 deletions

View File

@@ -93,11 +93,11 @@ void Mqtt::reconnect() {
#ifndef EMSESP_STANDALONE
mqttClient_.disconnect();
#endif
DEBUG_LOG(F("Reconnecting..."));
LOG_DEBUG(F("Reconnecting..."));
}
// MQTT setup
void Mqtt::start() {
void Mqtt::setup() {
// exit if already initialized
if (mqtt_start_) {
return;
@@ -145,7 +145,7 @@ void Mqtt::start() {
mqtt_last_connection_ = millis();
mqtt_reconnect_delay_ = Mqtt::MQTT_RECONNECT_DELAY_MIN;
DEBUG_LOG(F("Configuring MQTT service..."));
LOG_DEBUG(F("Configuring MQTT service..."));
}
// MQTT init callbacks
@@ -161,19 +161,19 @@ void Mqtt::init() {
mqttClient_.onDisconnect([this](AsyncMqttClientDisconnectReason reason) {
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
logger_.err(F("Disconnected from server"));
LOG_DEBUG(F("Disconnected from server"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) {
logger_.err(F("Server identifier Rejected"));
LOG_ERROR(F("Server identifier Rejected"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) {
logger_.err(F("Server unavailable"));
LOG_ERROR(F("Server unavailable"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) {
logger_.err(F("Malformed credentials"));
LOG_ERROR(F("Malformed credentials"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
logger_.err(F("Not authorized"));
LOG_ERROR(F("Not authorized"));
}
// Reset reconnection delay
@@ -182,7 +182,7 @@ void Mqtt::init() {
mqtt_start_ = false; // will force a new start()
});
// mqttClient.onSubscribe([this](uint16_t packetId, uint8_t qos) { DEBUG_LOG(F("Subscribe ACK for PID %d"), packetId); });
// mqttClient.onSubscribe([this](uint16_t packetId, uint8_t qos) { LOG_DEBUG(F("Subscribe ACK for PID %d"), packetId); });
mqttClient_.onPublish([this](uint16_t packetId) { on_publish(packetId); });
@@ -271,8 +271,8 @@ void Mqtt::loop() {
}
#ifndef EMSESP_STANDALONE
start();
logger_.info(F("Connecting to MQTT server..."));
setup();
LOG_INFO(F("Connecting to the MQTT server..."));
mqttClient_.connect(); // Connect to the MQTT broker
#endif
}
@@ -338,7 +338,7 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
strlcpy(message, payload, len + 1);
#ifdef EMSESP_DEBUG
DEBUG_LOG(F("Received %s => %s (length %d)"), topic, message, len);
LOG_DEBUG(F("Received %s => %s (length %d)"), topic, message, len);
#endif
// strip out everything until the last /
@@ -357,7 +357,7 @@ void Mqtt::on_message(char * topic, char * payload, size_t len) {
}
// if we got here we didn't find a topic match
DEBUG_LOG(F("No responding handler found for topic %s"), topic);
LOG_DEBUG(F("No responding handler found for topic %s"), topic);
}
// print all the topics related to a specific device_id
@@ -382,7 +382,7 @@ void Mqtt::show_topic_handlers(uuid::console::Shell & shell, const uint8_t devic
// and always remove from queue
void Mqtt::on_publish(uint16_t packetId) {
// find the MQTT message in the queue and remove it
if ((mqtt_messages_.empty()) || (mqtt_qos_ == 0)) {
if (mqtt_messages_.empty()) {
return;
}
@@ -394,9 +394,9 @@ void Mqtt::on_publish(uint16_t packetId) {
}
if (mqtt_message.packet_id_ == packetId) {
DEBUG_LOG(F("Acknowledged PID %d. Removing from queue"), packetId);
LOG_DEBUG(F("Acknowledged PID %d. Removing from queue"), packetId);
} else {
DEBUG_LOG(F("Mismatch, expecting PID %d, got %d"), mqtt_message.packet_id_, packetId);
LOG_DEBUG(F("Mismatch, expecting PID %d, got %d"), mqtt_message.packet_id_, packetId);
mqtt_publish_fails_++; // increment error count
}
@@ -419,31 +419,26 @@ char * Mqtt::make_topic(char * result, const std::string & topic) {
return result;
}
void Mqtt::start() {
publish("status", "online", true); // say we're alive to the Last Will topic, with retain on
send_start_topic();
send_heartbeat(); // send heartbeat if enabled
}
// send online appended with the version information as JSON
void Mqtt::send_start_topic() {
StaticJsonDocument<90> doc;
doc["event"] = "start";
doc["version"] = Settings().app_version();
#ifndef EMSESP_STANDALONE
doc["IP"] = WiFi.localIP().toString();
#endif
publish("info", doc, false); // send with retain off
}
// MQTT onConnect - when a connect is established
void Mqtt::on_connect() {
DEBUG_LOG(F("MQTT connected"));
mqtt_reconnect_delay_ = Mqtt::MQTT_RECONNECT_DELAY_MIN;
mqtt_last_connection_ = millis();
mqtt_connecting_ = false;
publish("status", "online", true); // say we're alive to the Last Will topic, with retain on
send_start_topic();
send_heartbeat(); // send heartbeat if enabled
LOG_INFO(F("MQTT connected"));
}
// send periodic MQTT message with system information
@@ -462,38 +457,6 @@ void Mqtt::send_heartbeat() {
publish("heartbeat", doc, false); // send to MQTT with retain off
}
// add MQTT message to queue, payload is a JSON doc.
// NOTE this only prints first 255 chars
void Mqtt::queue_publish_message(const std::string & topic, const JsonDocument & payload, const bool retain) {
// can't have bogus topics, but empty payloads are ok
if (topic.empty()) {
return;
}
/*
// check for empty JSON doc - we don't like those
size_t capacity = measureJson(payload);
if (capacity <= 3) {
// DEBUG_LOG(("Empty JSON payload for topic %s. Skipping"), topic);
return;
}
*/
std::string payload_text;
serializeJson(payload, payload_text);
auto message = std::make_shared<MqttMessage>(Operation::PUBLISH, topic, payload_text, retain);
// DEBUG_LOG(F("Adding JSON publish message created with topic %s, message %s"), topic, payload_text.c_str());
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
mqtt_messages_.pop_front();
}
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
}
// add MQTT message to queue, payload is a string
void Mqtt::queue_publish_message(const std::string & topic, const std::string & payload, const bool retain) {
// can't have bogus topics, but empty payloads are ok
@@ -518,7 +481,7 @@ void Mqtt::queue_subscribe_message(const std::string & topic) {
}
auto message = std::make_shared<MqttMessage>(Operation::SUBSCRIBE, topic, "", false);
DEBUG_LOG(F("Adding a subscription for %s"), topic.c_str());
LOG_DEBUG(F("Adding a subscription for %s"), topic.c_str());
// if the queue is full, make room but removing the last one
if (mqtt_messages_.size() >= maximum_mqtt_messages_) {
@@ -528,24 +491,27 @@ void Mqtt::queue_subscribe_message(const std::string & topic) {
mqtt_messages_.emplace_back(mqtt_message_id_++, std::move(message));
}
// Publish using the user's custom retain flag
void Mqtt::publish(const std::string & topic, const std::string & payload) {
publish(topic, payload, mqtt_retain_);
}
void Mqtt::publish(const std::string & topic, const JsonDocument & payload) {
publish(topic, payload, mqtt_retain_);
}
// MQTT Publish, using a specific retain flag
void Mqtt::publish(const std::string & topic, const std::string & payload, bool retain) {
queue_publish_message(topic, payload, retain);
}
// Publish using the user's custom retain flag
void Mqtt::publish(const std::string & topic, const std::string & payload) {
publish(topic, payload, mqtt_retain_);
}
void Mqtt::publish(const std::string & topic, const JsonDocument & payload) {
publish(topic, payload, mqtt_retain_);
}
void Mqtt::publish(const std::string & topic, const JsonDocument & payload, bool retain) {
queue_publish_message(topic, payload, retain);
// convert json to string
std::string payload_text;
serializeJson(payload, payload_text);
queue_publish_message(topic, payload_text, retain);
}
// for booleans, which get converted to string values 1 and 0
void Mqtt::publish(const std::string & topic, const bool value) {
queue_publish_message(topic, value ? "1" : "0", mqtt_retain_);
}
@@ -576,7 +542,7 @@ void Mqtt::process_queue() {
// append the hostname and base to the topic, unless we're doing native HA which has a different format
char full_topic[MQTT_TOPIC_MAX_SIZE];
// if the topic starts with "homeassistant" we leave it untouched, otherwise append ho st and base
// if the topic starts with "homeassistant" we leave it untouched, otherwise append host and base
if (strncmp(message->topic.c_str(), "homeassistant/", 13) == 0) {
strcpy(full_topic, message->topic.c_str());
} else {
@@ -585,14 +551,14 @@ void Mqtt::process_queue() {
// if we're subscribing...
if (message->operation == Operation::SUBSCRIBE) {
DEBUG_LOG(F("Subscribing to topic: %s"), full_topic);
LOG_DEBUG(F("Subscribing to topic: %s"), full_topic);
#ifndef EMSESP_STANDALONE
uint16_t packet_id = mqttClient_.subscribe(full_topic, mqtt_qos_);
#else
uint16_t packet_id = 1;
#endif
if (!packet_id) {
DEBUG_LOG(F("Error subscribing to %s, error %d"), full_topic, packet_id);
LOG_DEBUG(F("Error subscribing to %s, error %d"), full_topic, packet_id);
}
mqtt_messages_.pop_front(); // remove the message from the queue
@@ -606,25 +572,26 @@ void Mqtt::process_queue() {
return;
}
// else try and publish it
// else try and publish it
#ifndef EMSESP_STANDALONE
uint16_t packet_id = mqttClient_.publish(full_topic, mqtt_qos_, message->retain, message->payload.c_str());
// uint16_t packet_id = mqttClient_.publish(full_topic, mqtt_qos_, message->retain, message->payload.c_str());
uint16_t packet_id = mqttClient_.publish(full_topic, mqtt_qos_, message->retain, message->payload.c_str(), message->payload.size(), false, mqtt_message.id_);
#else
uint16_t packet_id = 1;
#endif
DEBUG_LOG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), full_topic, mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id);
LOG_DEBUG(F("Publishing topic %s (#%02d, attempt #%d, pid %d)"), full_topic, mqtt_message.id_, mqtt_message.retry_count_ + 1, packet_id);
if (packet_id == 0) {
// it failed. if we retried n times, give up. remove from queue
if (mqtt_message.retry_count_ == (MQTT_PUBLISH_MAX_RETRY - 1)) {
logger_.err(F("Failed to publish to %s after %d attempts"), full_topic, mqtt_message.retry_count_ + 1);
LOG_ERROR(F("Failed to publish to %s after %d attempts"), full_topic, mqtt_message.retry_count_ + 1);
mqtt_publish_fails_++; // increment failure counter
mqtt_messages_.pop_front(); // delete
return;
} else {
mqtt_messages_.front().retry_count_++;
// logger_.err(F("Failed to publish to %s. Trying again, #%d"), full_topic, mqtt_message.retry_count_ + 1);
DEBUG_LOG(F("Failed to publish to %s. Trying again, #%d"), full_topic, mqtt_message.retry_count_ + 1);
LOG_DEBUG(F("Failed to publish to %s. Trying again, #%d"), full_topic, mqtt_message.retry_count_ + 1);
return; // leave on queue for next time so it gets republished
}
}
@@ -633,7 +600,7 @@ void Mqtt::process_queue() {
// but add the packet_id so we can check it later
if (mqtt_qos_ != 0) {
mqtt_messages_.front().packet_id_ = packet_id;
DEBUG_LOG(F("Setting packetID for ACK to %d"), packet_id);
LOG_DEBUG(F("Setting packetID for ACK to %d"), packet_id);
return;
}