From ae766b09e13036de1a037f1d2ee28a0a8253ebf5 Mon Sep 17 00:00:00 2001 From: Paul Date: Fri, 16 Aug 2019 21:49:52 +0200 Subject: [PATCH] added mqtt log --- src/MyESP.cpp | 235 +++++++++++++++++++++++++++++------- src/MyESP.h | 50 +++++--- src/custom.htm | 19 ++- src/custom.js | 4 + src/ems-esp.cpp | 101 +++++++++------- src/ems.cpp | 18 ++- src/version.h | 2 +- src/websrc/index.html | 3 +- src/websrc/myesp.htm | 16 ++- src/websrc/myesp.js | 56 ++++++++- tools/wsemulator/wserver.js | 38 ++++-- 11 files changed, 398 insertions(+), 144 deletions(-) diff --git a/src/MyESP.cpp b/src/MyESP.cpp index 4fb0b7732..dd1bc70f2 100644 --- a/src/MyESP.cpp +++ b/src/MyESP.cpp @@ -22,7 +22,6 @@ union system_rtcmem_t { uint32_t value; }; - // nasty global variables that are called from internal ws functions static char * _general_password = nullptr; static bool _shouldRestart = false; @@ -103,6 +102,13 @@ MyESP::MyESP() { // get the build time _buildTime = _getBuildTime(); + + // MQTT log + for (uint8_t i = 0; i < MYESP_MQTTLOG_MAX; i++) { + MQTT_log[i].timestamp = 0; + MQTT_log[i].topic = nullptr; + MQTT_log[i].payload = nullptr; + } } MyESP::~MyESP() { @@ -190,12 +196,6 @@ uint32_t MyESP::_getInitialFreeHeap() { return _heap; } -// used heap mem -// note calls to getFreeHeap sometimes causes some ESPs to crash -uint32_t MyESP::_getUsedHeap() { - return _getInitialFreeHeap() - ESP.getFreeHeap(); -} - // called when WiFi is connected, and used to start OTA, MQTT void MyESP::_wifiCallback(justwifi_messages_t code, char * parameter) { if ((code == MESSAGE_CONNECTED)) { @@ -370,6 +370,8 @@ void MyESP::mqttUnsubscribe(const char * topic) { void MyESP::mqttPublish(const char * topic, const char * payload) { // myDebug_P(PSTR("[MQTT] Sending pubish to %s with payload %s"), _mqttTopic(topic), payload); // for debugging mqttClient.publish(_mqttTopic(topic), _mqtt_qos, _mqtt_retain, payload); + + _addMQTTLog(topic, payload); // add to the log } // MQTT onConnect - when a connect is established @@ -380,7 +382,7 @@ void MyESP::_mqttOnConnect() { _mqtt_last_connection = millis(); // say we're alive to the Last Will topic - mqttClient.publish(_mqttTopic(_mqtt_will_topic), 1, true, _mqtt_will_online_payload); + mqttClient.publish(_mqttTopic(_mqtt_will_topic), 1, true, _mqtt_will_online_payload); // qos=1, retain=true // subscribe to general subs mqttSubscribe(MQTT_TOPIC_RESTART); @@ -389,6 +391,9 @@ void MyESP::_mqttOnConnect() { mqttSubscribe(MQTT_TOPIC_START); mqttPublish(MQTT_TOPIC_START, MQTT_TOPIC_START_PAYLOAD); + // send heartbeat if enabled + _heartbeatCheck(true); + // call custom function to handle mqtt receives (_mqtt_callback_f)(MQTT_CONNECT_EVENT, nullptr, nullptr); } @@ -604,7 +609,7 @@ void MyESP::_consoleShowHelp() { myDebug_P(PSTR("*")); myDebug_P(PSTR("* Commands:")); myDebug_P(PSTR("* ?=help, CTRL-D/quit=exit telnet session")); - myDebug_P(PSTR("* set, system, restart")); + myDebug_P(PSTR("* set, system, restart, mqttlog")); #ifdef CRASH myDebug_P(PSTR("* crash ")); #endif @@ -944,6 +949,13 @@ void MyESP::_telnetCommand(char * commandLine) { // restart command if ((strcmp(ptrToCommandName, "restart") == 0) && (wc == 1)) { resetESP(); + return; + } + + // print mqtt log command + if ((strcmp(ptrToCommandName, "mqttlog") == 0) && (wc == 1)) { + _printMQTTLog(); + return; } // show system stats @@ -1108,7 +1120,7 @@ bool MyESP::_rtcmemStatus() { if (reason == REASON_EXT_SYS_RST) { // external system reset if (getSystemBootStatus() == MYESP_BOOTSTATUS_BOOTING) { _setSystemBootStatus(MYESP_BOOTSTATUS_RESETNEEDED); - // _formatreq = true; // do a wipe next in the loop() TODO commented out for now + // _formatreq = true; // do a wipe next in the loop() - commented out for now because we use the web } else { _setSystemBootStatus(MYESP_BOOTSTATUS_POWERON); } @@ -1199,7 +1211,6 @@ void MyESP::_systemCheckLoop() { } } - // print out ESP system stats // for battery power is ESP.getVcc() void MyESP::showSystemStats() { @@ -1340,6 +1351,8 @@ void MyESP::_heartbeatCheck(bool force = false) { if ((millis() - last_heartbeat > MYESP_HEARTBEAT_INTERVAL) || force) { last_heartbeat = millis(); + // _printHeap("Heartbeat"); // for heartbeat debugging + if (!isMQTTConnected() || !(_mqtt_heartbeat)) { return; } @@ -1819,7 +1832,7 @@ bool MyESP::_fs_createCustomConfig() { // if it doesn't exist try and create it void MyESP::_fs_setup() { if (!SPIFFS.begin()) { - Serial.print(F("[WARN] Formatting filesystem...")); + myDebug_P(PSTR("[FS] Formatting filesystem...")); if (SPIFFS.format()) { _writeEvent("WARN", "sys", "File system formatted", ""); } else { @@ -2108,7 +2121,7 @@ void MyESP::_writeEvent(const char * type, const char * src, const char * desc, return; } - StaticJsonDocument<300> root; + StaticJsonDocument root; root["type"] = type; root["src"] = src; root["desc"] = desc; @@ -2132,9 +2145,16 @@ void MyESP::_sendEventLog(uint8_t page) { if (!eventlog) { eventlog.close(); myDebug_P(PSTR("[WEB] Event log is missing")); + if (_ota_post_callback_f) { + (_ota_post_callback_f)(); // call custom function + } return; // file can't be opened } + if (_ota_pre_callback_f) { + (_ota_pre_callback_f)(); // call custom function + } + // the size of the json will be quite big so best not to use stack (StaticJsonDocument) DynamicJsonDocument doc(MYESP_JSON_MAXSIZE); JsonObject root = doc.to(); @@ -2143,31 +2163,67 @@ void MyESP::_sendEventLog(uint8_t page) { JsonArray list = doc.createNestedArray("list"); - uint8_t first = (page - 1) * 10; - uint8_t last = page * 10; - uint8_t i = 0; + uint8_t first = ((page - 1) * 10) + 1; + uint8_t last = page * 10; + uint8_t char_count = 0; + uint8_t line_count = 0; + uint16_t read_count = 0; + bool abort = false; + char char_buffer[MYESP_JSON_LOG_MAXSIZE]; - while (eventlog.available()) { - String item = String(); - item = eventlog.readStringUntil('\n'); - if (i >= first && i < last) { - list.add(item); + // if at start, start immediately recording + bool record = (first == 1) ? true : false; + + // start at top and read until we find the page we want (sets of 10) + while (eventlog.available() && !abort) { + char c = eventlog.read(); + + // see if we've overrun, which means corrupt so ignore rest + if (read_count++ > MYESP_JSON_LOG_MAXSIZE - 1) { + abort = true; } - i++; - } - eventlog.close(); - float pages = i / 10.0; + // see if we have reached the end of the string + if (c == '\0' || c == '\n') { + line_count++; + + // save line + if (record) { + char_buffer[char_count] = '\0'; + list.add(char_buffer); + } + + char_count = 0; + read_count = 0; + if (line_count == first - 1) { // have we come to the start position, start recording + record = true; + } else if (line_count == last) { // finish recording and exit loop + record = false; + } + } else { + // add the char to the buffer if recording + if (record && (char_count < MYESP_JSON_LOG_MAXSIZE)) { + char_buffer[char_count++] = c; + } + } + } + eventlog.close(); // close SPIFFS + + float pages = line_count / 10.0; root["haspages"] = ceil(pages); char buffer[MYESP_JSON_MAXSIZE]; size_t len = serializeJson(root, buffer); - //Serial.printf("\nEVENTLOG: page %d\n", page); // turn on for debugging - //serializeJson(root, Serial); // turn on for debugging + //Serial.printf("\nEVENTLOG: page %d\n", page); // turn on for debugging XXX + //serializeJson(root, Serial); // turn on for debugging _ws->textAll(buffer, len); _ws->textAll("{\"command\":\"result\",\"resultof\":\"eventlist\",\"result\": true}"); + + if (_ota_post_callback_f) { + (_ota_post_callback_f)(); // call custom function + } } // Handles WebSocket Events @@ -2334,22 +2390,27 @@ void MyESP::_sendCustomStatus() { // send system status via ws void MyESP::_sendStatus() { - StaticJsonDocument<400> doc; - JsonObject root = doc.to(); + // capture memory before we stick in a huge json buffer on the heap! + uint32_t total_memory = _getInitialFreeHeap(); + uint32_t free_memory = ESP.getFreeHeap(); + + DynamicJsonDocument doc(MQTT_MAX_PAYLOAD_SIZE_LARGE); + JsonObject root = doc.to(); + root["command"] = "status"; FSInfo fsinfo; if (!SPIFFS.info(fsinfo)) { myDebug("[SYSTEM] Error getting info on SPIFFS"); + } else { + root["availspiffs"] = (fsinfo.totalBytes - fsinfo.usedBytes) / 1000; + root["spiffssize"] = (fsinfo.totalBytes / 1000); } - root["command"] = "status"; - // all sizes in bytes converted to KB - root["heap"] = ESP.getFreeHeap() / 1000; - root["sketchsize"] = ESP.getSketchSize() / 1000; - root["availsize"] = ESP.getFreeSketchSpace() / 1000; - root["availspiffs"] = (fsinfo.totalBytes - fsinfo.usedBytes) / 1000; - root["spiffssize"] = (fsinfo.totalBytes / 1000); + root["initheap"] = total_memory; + root["heap"] = free_memory; + root["sketchsize"] = ESP.getSketchSize() / 1000; + root["availsize"] = ESP.getFreeSketchSpace() / 1000; if (isAPmode()) { root["ip"] = WiFi.softAPIP().toString(); @@ -2377,8 +2438,32 @@ void MyESP::_sendStatus() { sprintf(uptime, "%d day%s %d hour%s %d minute%s %d second%s", d, (d == 1) ? "" : "s", h, (h == 1) ? "" : "s", m, (m == 1) ? "" : "s", sec, (sec == 1) ? "" : "s"); root["uptime"] = uptime; - char buffer[400]; + char topic_s[MQTT_MAX_TOPIC_SIZE] = {0}; + if (_hasValue(_mqtt_base)) { + strlcpy(topic_s, _mqtt_base, sizeof(topic_s)); + strlcat(topic_s, "/", sizeof(topic_s)); + strlcat(topic_s, _general_hostname, sizeof(topic_s)); + } else { + strlcpy(topic_s, _general_hostname, sizeof(topic_s)); + } + strlcat(topic_s, "/", sizeof(topic_s)); + root["mqttloghdr"] = topic_s; + + // create MQTT log + JsonArray list = root.createNestedArray("mqttlog"); + + for (uint8_t i = 0; i < MYESP_MQTTLOG_MAX; i++) { + if (MQTT_log[i].topic != nullptr) { + JsonObject item = list.createNestedObject(); + item["topic"] = MQTT_log[i].topic; + item["payload"] = MQTT_log[i].payload; + item["time"] = MQTT_log[i].timestamp; + } + } + + char buffer[MQTT_MAX_PAYLOAD_SIZE_LARGE]; size_t len = serializeJson(root, buffer); + _ws->textAll(buffer, len); } @@ -2525,6 +2610,72 @@ void MyESP::_webserver_setup() { myDebug_P(PSTR("[WEB] Web server started")); } +// print memory +void MyESP::_printHeap(const char * s) { + uint32_t total_memory = _getInitialFreeHeap(); + uint32_t free_memory = ESP.getFreeHeap(); + + myDebug(" [%s] Free Heap: %d bytes initially | %d bytes used (%2u%%) | %d bytes free (%2u%%)", + s, + total_memory, + total_memory - free_memory, + 100 * (total_memory - free_memory) / total_memory, + free_memory, + 100 * free_memory / total_memory); +} + +// print MQTT log - everything that was published last per topic +void MyESP::_printMQTTLog() { + myDebug_P(PSTR("MQTT publish log:")); + + for (uint8_t i = 0; i < MYESP_MQTTLOG_MAX; i++) { + if (MQTT_log[i].topic != nullptr) { + myDebug_P(PSTR("(%d) [%lu] Topic:%s Payload:%s"), i, MQTT_log[i].timestamp, MQTT_log[i].topic, MQTT_log[i].payload); + } + } + + myDebug_P(PSTR("")); // newline +} + +// add an MQTT log entry +void MyESP::_addMQTTLog(const char * topic, const char * payload) { + static uint8_t logCount = 0; + uint8_t logPointer = 0; + bool found = false; + + // myDebug("Publish [#%d] %s (%d) %s (%d)", logCount, topic, strlen(topic), payload, strlen(payload)); // for debugging + + // find the topic + while ((_hasValue(MQTT_log[logPointer].topic) && logPointer < MYESP_MQTTLOG_MAX)) { + if (strcmp(MQTT_log[logPointer].topic, topic) == 0) { + found = true; + break; + } + logPointer++; + } + + // if not found add it and increment next free space pointer + if (!found) { + logPointer = logCount; + if (++logCount == MYESP_MQTTLOG_MAX) { + logCount = 0; // rotate + } + } + + // delete old record + if (MQTT_log[logPointer].topic) { + free(MQTT_log[logPointer].topic); + } + + if (MQTT_log[logPointer].payload) { + free(MQTT_log[logPointer].payload); + } + + // add new record + MQTT_log[logPointer].topic = strdup(topic); + MQTT_log[logPointer].payload = strdup(payload); + MQTT_log[logPointer].timestamp = now(); +} // send UTC time via ws void MyESP::_sendTime() { @@ -2594,6 +2745,10 @@ void MyESP::begin(const char * app_hostname, const char * app_name, const char * _telnet_setup(); // Telnet setup, called first to set Serial + // _fs_printFile(MYESP_CONFIG_FILE); // for debugging + //_fs_printFile(MYESP_CUSTOMCONFIG_FILE); // for debugging + //_fs_printFile(MYESP_EVENTLOG_FILE); // for debugging + // print a welcome message myDebug_P(PSTR("\n\n* %s version %s"), _app_name, _app_version); @@ -2613,10 +2768,6 @@ void MyESP::begin(const char * app_hostname, const char * app_name, const char * _heartbeatCheck(true); // force heartbeat SerialAndTelnet.flush(); - - //_fs_printFile(MYESP_CONFIG_FILE); // for debugging - //_fs_printFile(MYESP_CUSTOMCONFIG_FILE); // for debugging - //_fs_printFile(MYESP_EVENTLOG_FILE); // for debugging } /* @@ -2643,7 +2794,7 @@ void MyESP::loop() { } if (_formatreq) { - myDebug("[SYSTEM] Factory reset initiated"); + myDebug("[SYSTEM] Factory reset initiated. Please wait. System will automatically restart when complete..."); SPIFFS.end(); _ws->enable(false); SPIFFS.format(); diff --git a/src/MyESP.h b/src/MyESP.h index 146adc2af..913ee033e 100644 --- a/src/MyESP.h +++ b/src/MyESP.h @@ -72,7 +72,6 @@ extern struct rst_info resetInfo; #define MQTT_RECONNECT_DELAY_MIN 2000 // Try to reconnect in 3 seconds upon disconnection #define MQTT_RECONNECT_DELAY_STEP 3000 // Increase the reconnect delay in 3 seconds after each failed attempt #define MQTT_RECONNECT_DELAY_MAX 120000 // Set reconnect time to 2 minutes at most -#define MQTT_MAX_TOPIC_SIZE 50 // max length of MQTT topic #define MQTT_TOPIC_START "start" #define MQTT_TOPIC_HEARTBEAT "heartbeat" #define MQTT_TOPIC_START_PAYLOAD "start" @@ -82,7 +81,13 @@ extern struct rst_info resetInfo; #define MQTT_RETAIN false #define MQTT_KEEPALIVE 60 // 1 minute #define MQTT_QOS 1 -#define MQTT_WILL_TOPIC "status" // for last will & testament topic name +#define MQTT_WILL_TOPIC "status" // for last will & testament topic name +#define MQTT_MAX_TOPIC_SIZE 50 // max length of MQTT topic +#define MQTT_MAX_PAYLOAD_SIZE 500 // max size of a JSON object. See https://arduinojson.org/v6/assistant/ +#define MQTT_MAX_PAYLOAD_SIZE_LARGE 2000 // max size of a large JSON object, like for sending MQTT log +#define MYESP_JSON_MAXSIZE 2000 // for large Dynamic json files +#define MYESP_MQTTLOG_MAX 20 // max number of log entries for MQTT publishes +#define MYESP_JSON_LOG_MAXSIZE 300 // max size of an JSON log entry // Internal MQTT events #define MQTT_CONNECT_EVENT 0 @@ -134,7 +139,6 @@ PROGMEM const char * const custom_reset_string[] = {custom_reset_hardware, cus // SPIFFS #define MYESP_SPIFFS_MAXSIZE 800 // https://arduinojson.org/v6/assistant/ -#define MYESP_JSON_MAXSIZE 2000 // for large Dynamic json files // CRASH /** @@ -186,8 +190,8 @@ struct RtcmemData { static_assert(sizeof(RtcmemData) <= (RTCMEM_BLOCKS * 4u), "RTCMEM struct is too big"); -#define MYESP_SYSTEM_CHECK_TIME 60000 // The system is considered stable after these many millis (1 minute) -#define MYESP_SYSTEM_CHECK_MAX 10 // After this many crashes on boot +#define MYESP_SYSTEM_CHECK_TIME 60000 // The system is considered stable after these many millis (1 minute) +#define MYESP_SYSTEM_CHECK_MAX 10 // After this many crashes on boot #define MYESP_HEARTBEAT_INTERVAL 120000 // in milliseconds, how often the MQTT heartbeat is sent (2 mins) typedef struct { @@ -205,6 +209,13 @@ typedef enum { MYESP_BOOTSTATUS_RESETNEEDED = 3 } MYESP_BOOTSTATUS; // boot messages +// for storing all MQTT publish messages +typedef struct { + char * topic; + char * payload; + time_t timestamp; +} _MQTT_Log; + typedef std::function mqtt_callback_f; typedef std::function wifi_callback_f; typedef std::function ota_callback_f; @@ -297,28 +308,34 @@ class MyESP { private: // mqtt - AsyncMqttClient mqttClient; - unsigned long _mqtt_reconnect_delay; - void _mqttOnMessage(char * topic, char * payload, size_t len); - void _mqttConnect(); - void _mqtt_setup(); + void _mqttOnMessage(char * topic, char * payload, size_t len); + void _mqttConnect(); + void _mqtt_setup(); + void _mqttOnConnect(); + void _sendStart(); + char * _mqttTopic(const char * topic); + + // mqtt log + _MQTT_Log MQTT_log[MYESP_MQTTLOG_MAX]; // log for publish messages + void _printMQTTLog(); + void _addMQTTLog(const char * topic, const char * payload); + + AsyncMqttClient mqttClient; // the MQTT class + uint32_t _mqtt_reconnect_delay; mqtt_callback_f _mqtt_callback_f; - void _mqttOnConnect(); - void _sendStart(); - char * _mqttTopic(const char * topic); char * _mqtt_ip; char * _mqtt_user; char * _mqtt_password; int _mqtt_port; char * _mqtt_base; bool _mqtt_enabled; - unsigned long _mqtt_keepalive; - unsigned char _mqtt_qos; + uint32_t _mqtt_keepalive; + uint8_t _mqtt_qos; bool _mqtt_retain; char * _mqtt_will_topic; char * _mqtt_will_online_payload; char * _mqtt_will_offline_payload; - unsigned long _mqtt_last_connection; + uint32_t _mqtt_last_connection; bool _mqtt_connecting; bool _mqtt_heartbeat; @@ -383,6 +400,7 @@ class MyESP { bool _timerequest; bool _formatreq; bool _hasValue(char * s); + void _printHeap(const char * s); // reset reason and rtcmem bool _rtcmem_status; diff --git a/src/custom.htm b/src/custom.htm index a5fe891f9..f1f68e630 100644 --- a/src/custom.htm +++ b/src/custom.htm @@ -9,7 +9,7 @@ + data-content="Please choose if you want to enable an LED to show status">