mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-07 00:09:51 +03:00
305
src/MyESP.cpp
305
src/MyESP.cpp
@@ -23,6 +23,15 @@ union system_rtcmem_t {
|
||||
uint32_t value;
|
||||
};
|
||||
|
||||
struct mqtt_message_t {
|
||||
uint16_t packetId = 0;
|
||||
char * topic = nullptr;
|
||||
char * payload = nullptr;
|
||||
bool retain = false;
|
||||
uint8_t retry_count = 0;
|
||||
};
|
||||
std::deque<mqtt_message_t> _mqtt_queue;
|
||||
|
||||
// nasty global variables that are called from internal ws functions
|
||||
static char * _general_password = nullptr;
|
||||
static bool _shouldRestart = false;
|
||||
@@ -111,14 +120,6 @@ MyESP::MyESP() {
|
||||
|
||||
// get the build time
|
||||
_buildTime = _getBuildTime();
|
||||
|
||||
// MQTT log
|
||||
for (uint8_t i = 0; i < MYESP_MQTTLOG_MAX; i++) {
|
||||
MQTT_log[i].type = 0;
|
||||
MQTT_log[i].timestamp = 0;
|
||||
MQTT_log[i].topic = nullptr;
|
||||
MQTT_log[i].payload = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
MyESP::~MyESP() {
|
||||
@@ -383,21 +384,17 @@ bool MyESP::mqttSubscribe(const char * topic) {
|
||||
if (mqttClient.connected() && (strlen(topic) > 0)) {
|
||||
char * topic_s = _mqttTopic(topic);
|
||||
|
||||
uint16_t packet_id = mqttClient.subscribe(topic_s, _mqtt_qos);
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Subscribing to %s"), topic_s);
|
||||
#endif
|
||||
|
||||
if (packet_id) {
|
||||
// add to mqtt log
|
||||
_addMQTTLog(topic_s, "", MYESP_MQTTLOGTYPE_SUBSCRIBE); // Has an empty payload for now
|
||||
return true;
|
||||
} else {
|
||||
uint16_t packet_id = mqttClient.subscribe(topic_s, _mqtt_qos);
|
||||
if (!packet_id) {
|
||||
myDebug_P(PSTR("[MQTT] Error subscribing to %s, error %d"), _mqttTopic(topic), packet_id);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false; // didn't work
|
||||
return true;
|
||||
}
|
||||
|
||||
// MQTT unsubscribe
|
||||
@@ -408,33 +405,152 @@ void MyESP::mqttUnsubscribe(const char * topic) {
|
||||
}
|
||||
}
|
||||
|
||||
// print MQTT log
|
||||
void MyESP::_printMQTTLog() {
|
||||
myDebug_P(PSTR("MQTT publish queue:"));
|
||||
|
||||
if (_mqtt_queue.empty()) {
|
||||
myDebug_P(PSTR(" queue is empty!"));
|
||||
myDebug_P(PSTR("")); // newline
|
||||
return;
|
||||
}
|
||||
|
||||
for (mqtt_message_t it : _mqtt_queue) {
|
||||
if (it.retry_count == 0) {
|
||||
if (it.packetId == 0) {
|
||||
myDebug_P(PSTR(" topic=%s payload=%s"), it.topic, it.payload);
|
||||
} else {
|
||||
myDebug_P(PSTR(" topic=%s payload=%s (pid %d)"), it.topic, it.payload, it.packetId);
|
||||
}
|
||||
} else {
|
||||
myDebug_P(PSTR(" topic=%s payload=%s (pid %d, retry #%d)"), it.topic, it.payload, it.packetId, it.retry_count);
|
||||
}
|
||||
}
|
||||
|
||||
myDebug_P(PSTR("")); // newline
|
||||
}
|
||||
|
||||
// Publish using the user's custom retain flag
|
||||
bool MyESP::mqttPublish(const char * topic, const char * payload) {
|
||||
return mqttPublish(topic, payload, _mqtt_retain);
|
||||
void MyESP::mqttPublish(const char * topic, const char * payload) {
|
||||
mqttPublish(topic, payload, _mqtt_retain);
|
||||
}
|
||||
|
||||
// MQTT Publish
|
||||
// returns true if all good
|
||||
bool MyESP::mqttPublish(const char * topic, const char * payload, bool retain) {
|
||||
if (!mqttClient.connected() || !_hasValue(topic) || !_hasValue(payload)) {
|
||||
void MyESP::mqttPublish(const char * topic, const char * payload, bool retain) {
|
||||
if (!_hasValue(topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
_mqttQueue(topic, payload, retain); // queue the message
|
||||
}
|
||||
|
||||
bool MyESP::_mqttQueue(const char * topic, const char * payload, bool retain) {
|
||||
// Queue is not meant to send message "offline"
|
||||
// We must prevent the queue does not get full while offline
|
||||
if (!mqttClient.connected() || (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// create a new message
|
||||
mqtt_message_t element;
|
||||
element.topic = strdup(topic);
|
||||
element.retain = retain;
|
||||
element.packetId = 0;
|
||||
element.retry_count = 0;
|
||||
if (NULL != payload) {
|
||||
element.payload = strdup(payload);
|
||||
}
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Sending publish to %s with payload %s"), _mqttTopic(topic), payload);
|
||||
myDebug_P(PSTR("[MQTT] Adding to queue: #%d [%s] %s"), _mqtt_queue.size(), element.topic, element.payload);
|
||||
#endif
|
||||
uint16_t packet_id = mqttClient.publish(_mqttTopic(topic), _mqtt_qos, retain, payload);
|
||||
_mqtt_queue.push_back(element);
|
||||
|
||||
if (packet_id) {
|
||||
_addMQTTLog(topic, payload, MYESP_MQTTLOGTYPE_PUBLISH); // add to the log
|
||||
return true;
|
||||
return true;
|
||||
}
|
||||
|
||||
// called when an MQTT Publish ACK is received
|
||||
// check if ACK matches the last Publish we sent, if not report an error
|
||||
// and always remove from queue
|
||||
void MyESP::_mqttOnPublish(uint16_t packetId) {
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Publish ACK for PID %d"), packetId);
|
||||
#endif
|
||||
|
||||
// find the MQTT message in the queue and remove it
|
||||
if ((_mqtt_queue.empty()) || (_mqtt_qos == 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// it failed, we should try again https://github.com/proddy/EMS-ESP/issues/264
|
||||
myDebug_P(PSTR("[MQTT] Error publishing to %s with payload %s [error %d]"), _mqttTopic(topic), payload, packet_id);
|
||||
_mqtt_publish_fails++; // increment failure counter
|
||||
mqtt_message_t element = _mqtt_queue.front(); // get top of list
|
||||
|
||||
return false; // failed
|
||||
// if the last published failed, don't bother checking it. wait for the re-try
|
||||
if (element.packetId == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (element.packetId == packetId) {
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Found PID %d. Removing from queue."), packetId);
|
||||
#endif
|
||||
} else {
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Mismatch, expecting PID %d, got %d."), element.packetId, packetId);
|
||||
_mqtt_publish_fails++; // increment error count
|
||||
#endif
|
||||
}
|
||||
|
||||
_mqttRemoveLastPublish(); // always remove
|
||||
}
|
||||
|
||||
// removes top of queue
|
||||
void MyESP::_mqttRemoveLastPublish() {
|
||||
mqtt_message_t element = _mqtt_queue.front(); // get top of list
|
||||
free(element.topic);
|
||||
if (element.payload) {
|
||||
free(element.payload);
|
||||
}
|
||||
_mqtt_queue.pop_front();
|
||||
}
|
||||
|
||||
// take top from queue and try and publish it
|
||||
void MyESP::_mqttPublishQueue() {
|
||||
if ((!mqttClient.connected()) || (_mqtt_queue.empty())) {
|
||||
return;
|
||||
}
|
||||
|
||||
mqtt_message_t element = _mqtt_queue.front(); // fetch from queue
|
||||
|
||||
// try and publish it
|
||||
uint16_t packet_id = mqttClient.publish(_mqttTopic(element.topic), _mqtt_qos, element.retain, element.payload);
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Sent publish (attempt #%d, pid %d) [%s] [%s]"), element.retry_count, packet_id, _mqttTopic(element.topic), element.payload);
|
||||
#endif
|
||||
|
||||
if (packet_id == 0) {
|
||||
// it failed
|
||||
// if we retried 3 times, give up. remove from queue
|
||||
if (element.retry_count == 2) {
|
||||
myDebug_P(PSTR("[MQTT] Failed to publish to %s with payload %s"), _mqttTopic(element.topic), element.payload);
|
||||
_mqtt_publish_fails++; // increment failure counter
|
||||
_mqttRemoveLastPublish();
|
||||
} else {
|
||||
_mqtt_queue[0].retry_count++;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// if we have ACK set with QOS 1 or 2, leave on queue and let the ACK process remove it
|
||||
// but add the packet_id so we can check it later
|
||||
if (_mqtt_qos != 0) {
|
||||
_mqtt_queue[0].packetId = packet_id;
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug_P(PSTR("[MQTT] Setting packetID %d"), packet_id);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
// delete it from queue
|
||||
_mqttRemoveLastPublish();
|
||||
}
|
||||
|
||||
// MQTT onConnect - when a connect is established
|
||||
@@ -456,7 +572,7 @@ void MyESP::_mqttOnConnect() {
|
||||
mqttPublish(MQTT_TOPIC_START, MQTT_TOPIC_START_PAYLOAD, false);
|
||||
|
||||
// send heartbeat if enabled
|
||||
_heartbeatCheck(true);
|
||||
heartbeatCheck(true);
|
||||
|
||||
// call custom function to handle mqtt receives
|
||||
(_mqtt_callback_f)(MQTT_CONNECT_EVENT, nullptr, nullptr);
|
||||
@@ -493,7 +609,8 @@ void MyESP::_mqtt_setup() {
|
||||
});
|
||||
|
||||
//mqttClient.onSubscribe([this](uint16_t packetId, uint8_t qos) { myDebug_P(PSTR("[MQTT] Subscribe ACK for PID %d"), packetId); });
|
||||
//mqttClient.onPublish([this](uint16_t packetId) { myDebug_P(PSTR("[MQTT] Publish ACK for PID %d"), packetId); });
|
||||
|
||||
mqttClient.onPublish([this](uint16_t packetId) { _mqttOnPublish(packetId); });
|
||||
|
||||
mqttClient.onMessage([this](char * topic, char * payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
|
||||
_mqttOnMessage(topic, payload, len);
|
||||
@@ -719,7 +836,7 @@ void MyESP::_consoleShowHelp() {
|
||||
myDebug_P(PSTR("*"));
|
||||
myDebug_P(PSTR("* Commands:"));
|
||||
myDebug_P(PSTR("* ?/help=show commands, CTRL-D/quit=end telnet session"));
|
||||
myDebug_P(PSTR("* set, system, restart, mqttlog [all], kick, save"));
|
||||
myDebug_P(PSTR("* set, system, restart, mqttlog, kick, save"));
|
||||
|
||||
#ifdef CRASH
|
||||
myDebug_P(PSTR("* crash <dump | clear | test [n]>"));
|
||||
@@ -876,8 +993,8 @@ char * MyESP::_telnet_readWord(bool allow_all_chars) {
|
||||
// messy code but effective since we don't have too many settings
|
||||
// wc is word count, number of parameters after the 'set' command
|
||||
bool MyESP::_changeSetting(uint8_t wc, const char * setting, const char * value) {
|
||||
bool save_config = false;
|
||||
bool restart = false;
|
||||
bool save_config = false;
|
||||
bool restart = false;
|
||||
|
||||
MYESP_FSACTION_t save_custom_config = MYESP_FSACTION_ERR; // default
|
||||
|
||||
@@ -942,7 +1059,7 @@ bool MyESP::_changeSetting(uint8_t wc, const char * setting, const char * value)
|
||||
// finally check for any custom commands
|
||||
if (_fs_setlist_callback_f) {
|
||||
save_custom_config = (_fs_setlist_callback_f)(MYESP_FSACTION_SET, wc, setting, value);
|
||||
restart = (save_custom_config == MYESP_FSACTION_RESTART);
|
||||
restart = (save_custom_config == MYESP_FSACTION_RESTART);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1048,7 +1165,7 @@ void MyESP::_telnetCommand(char * commandLine) {
|
||||
|
||||
// print mqtt log command
|
||||
if (strcmp(ptrToCommandName, "mqttlog") == 0) {
|
||||
_printMQTTLog(wc != 1);
|
||||
_printMQTTLog();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1461,14 +1578,14 @@ void MyESP::showSystemStats() {
|
||||
/*
|
||||
* Send heartbeat via MQTT with all system data
|
||||
*/
|
||||
void MyESP::_heartbeatCheck(bool force) {
|
||||
void MyESP::heartbeatCheck(bool force) {
|
||||
static uint32_t last_heartbeat = 0;
|
||||
|
||||
if ((millis() - last_heartbeat > MYESP_HEARTBEAT_INTERVAL) || force) {
|
||||
last_heartbeat = millis();
|
||||
|
||||
#ifdef MYESP_DEBUG
|
||||
_printHeap("[HEARTBEAT] ");
|
||||
_printHeap("[HEARTBEAT]");
|
||||
#endif
|
||||
if (!isMQTTConnected() || !(_mqtt_heartbeat)) {
|
||||
return;
|
||||
@@ -2498,30 +2615,6 @@ void MyESP::_sendStatus() {
|
||||
sprintf(uptime, "%d day%s %d hour%s %d minute%s %d second%s", d, (d == 1) ? "" : "s", h, (h == 1) ? "" : "s", m, (m == 1) ? "" : "s", sec, (sec == 1) ? "" : "s");
|
||||
root["uptime"] = uptime;
|
||||
|
||||
char topic_s[MQTT_MAX_TOPIC_SIZE] = {0};
|
||||
if (_hasValue(_mqtt_base)) {
|
||||
strlcpy(topic_s, _mqtt_base, sizeof(topic_s));
|
||||
strlcat(topic_s, "/", sizeof(topic_s));
|
||||
strlcat(topic_s, _general_hostname, sizeof(topic_s));
|
||||
} else {
|
||||
strlcpy(topic_s, _general_hostname, sizeof(topic_s));
|
||||
}
|
||||
strlcat(topic_s, "/", sizeof(topic_s));
|
||||
root["mqttloghdr"] = topic_s;
|
||||
|
||||
// create MQTT log
|
||||
JsonArray list = root.createNestedArray("mqttlog");
|
||||
|
||||
// only send Publish
|
||||
for (uint8_t i = 0; i < MYESP_MQTTLOG_MAX; i++) {
|
||||
if ((MQTT_log[i].type == 1) && (MQTT_log[i].topic != nullptr)) {
|
||||
JsonObject item = list.createNestedObject();
|
||||
item["topic"] = MQTT_log[i].topic;
|
||||
item["payload"] = MQTT_log[i].payload;
|
||||
item["time"] = MQTT_log[i].timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
char buffer[MQTT_MAX_PAYLOAD_SIZE_LARGE];
|
||||
size_t len = serializeJson(root, buffer);
|
||||
|
||||
@@ -2688,80 +2781,6 @@ void MyESP::_printHeap(const char * prefix) {
|
||||
100 * free_memory / total_memory);
|
||||
}
|
||||
|
||||
// print MQTT log - everything that was published last per topic
|
||||
void MyESP::_printMQTTLog(bool show_sub = false) {
|
||||
myDebug_P(PSTR("MQTT publish log:"));
|
||||
uint8_t i;
|
||||
|
||||
for (i = 0; i < MYESP_MQTTLOG_MAX; i++) {
|
||||
if ((MQTT_log[i].topic != nullptr) && (MQTT_log[i].type == MYESP_MQTTLOGTYPE_PUBLISH)) {
|
||||
myDebug_P(PSTR(" (%02d:%02d:%02d) Topic: %s Payload: %s"),
|
||||
to_hour(MQTT_log[i].timestamp),
|
||||
to_minute(MQTT_log[i].timestamp),
|
||||
to_second(MQTT_log[i].timestamp),
|
||||
MQTT_log[i].topic,
|
||||
MQTT_log[i].payload);
|
||||
}
|
||||
}
|
||||
|
||||
// show subscriptions
|
||||
if (show_sub) {
|
||||
myDebug_P(PSTR("")); // newline
|
||||
myDebug_P(PSTR("MQTT subscriptions:"));
|
||||
|
||||
for (i = 0; i < MYESP_MQTTLOG_MAX; i++) {
|
||||
if ((MQTT_log[i].topic != nullptr) && (MQTT_log[i].type == MYESP_MQTTLOGTYPE_SUBSCRIBE)) {
|
||||
myDebug_P(PSTR(" Topic: %s"), MQTT_log[i].topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
myDebug_P(PSTR("")); // newline
|
||||
}
|
||||
|
||||
// add an MQTT log entry to our buffer
|
||||
void MyESP::_addMQTTLog(const char * topic, const char * payload, const MYESP_MQTTLOGTYPE_t type) {
|
||||
static uint8_t logCount = 0;
|
||||
uint8_t logPointer = 0;
|
||||
bool found = false;
|
||||
|
||||
#ifdef MYESP_DEBUG
|
||||
myDebug("_addMQTTLog [#%d] %s (%d) [%s] (%d)", logCount, topic, strlen(topic), payload, strlen(payload));
|
||||
#endif
|
||||
|
||||
// find the topic
|
||||
// topics must be unique for either publish or subscribe
|
||||
while ((logPointer < MYESP_MQTTLOG_MAX) && (_hasValue(MQTT_log[logPointer].topic))) {
|
||||
if ((strcmp(MQTT_log[logPointer].topic, topic) == 0) && (MQTT_log[logPointer].type == type)) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
logPointer++;
|
||||
}
|
||||
|
||||
// if not found add it and increment next free space pointer
|
||||
if (!found) {
|
||||
logPointer = logCount;
|
||||
if (++logCount == MYESP_MQTTLOG_MAX) {
|
||||
logCount = 0; // rotate
|
||||
}
|
||||
}
|
||||
|
||||
// delete old record
|
||||
if (MQTT_log[logPointer].topic) {
|
||||
free(MQTT_log[logPointer].topic);
|
||||
}
|
||||
if (MQTT_log[logPointer].payload) {
|
||||
free(MQTT_log[logPointer].payload);
|
||||
}
|
||||
|
||||
// and add new record
|
||||
MQTT_log[logPointer].type = type;
|
||||
MQTT_log[logPointer].topic = strdup(topic);
|
||||
MQTT_log[logPointer].payload = strdup(payload);
|
||||
MQTT_log[logPointer].timestamp = now();
|
||||
}
|
||||
|
||||
// send UTC time via ws
|
||||
void MyESP::_sendTime() {
|
||||
StaticJsonDocument<MYESP_JSON_MAXSIZE_SMALL> doc;
|
||||
@@ -2898,7 +2917,7 @@ void MyESP::begin(const char * app_hostname, const char * app_name, const char *
|
||||
void MyESP::loop() {
|
||||
_calculateLoad();
|
||||
_systemCheckLoop();
|
||||
_heartbeatCheck();
|
||||
heartbeatCheck();
|
||||
_bootupSequence(); // see if a reset was pressed during bootup
|
||||
|
||||
jw.loop(); // WiFi
|
||||
@@ -2911,6 +2930,14 @@ void MyESP::loop() {
|
||||
|
||||
_mqttConnect(); // MQTT
|
||||
|
||||
// every second check MQTT queue for publishing
|
||||
static unsigned long lastMqttPoll = 0;
|
||||
unsigned long currentMillis = millis();
|
||||
if ((unsigned long)(currentMillis - lastMqttPoll) >= MQTT_PUBLISH_WAIT) {
|
||||
_mqttPublishQueue();
|
||||
lastMqttPoll = currentMillis;
|
||||
}
|
||||
|
||||
// SysLog
|
||||
uuid::loop();
|
||||
syslog.loop();
|
||||
@@ -2939,4 +2966,6 @@ void MyESP::loop() {
|
||||
delay(MYESP_DELAY); // some time to WiFi and everything else to catch up, calls yield, and also prevent overheating
|
||||
}
|
||||
|
||||
|
||||
|
||||
MyESP myESP;
|
||||
|
||||
Reference in New Issue
Block a user