optimized MQTT json

This commit is contained in:
Paul
2020-02-23 14:23:10 +01:00
parent 7d402d4518
commit 89a2dcac05
4 changed files with 252 additions and 190 deletions

View File

@@ -433,22 +433,23 @@ void MyESP::_printMQTTLog() {
// Publish using the user's custom retain flag
void MyESP::mqttPublish(const char * topic, const char * payload) {
mqttPublish(topic, payload, _mqtt_retain);
_mqttQueue(topic, payload, _mqtt_retain);
}
void MyESP::mqttPublish(const char * topic, JsonDocument payload) {
_mqttQueue(topic, payload, _mqtt_retain);
}
// MQTT Publish
void MyESP::mqttPublish(const char * topic, const char * payload, bool retain) {
if (!_hasValue(topic)) {
return;
}
_mqttQueue(topic, payload, retain); // queue the message
_mqttQueue(topic, payload, retain);
}
void MyESP::mqttPublish(const char * topic, JsonDocument payload, bool retain) {
_mqttQueue(topic, payload, retain);
}
// put a payload string into the queue
bool MyESP::_mqttQueue(const char * topic, const char * payload, bool retain) {
// Queue is not meant to send message "offline"
// We must prevent the queue does not get full while offline
if (!mqttClient.connected() || (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE)) {
if (!mqttClient.connected() || _mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE || !_hasValue(topic)) {
return false;
}
@@ -458,7 +459,7 @@ bool MyESP::_mqttQueue(const char * topic, const char * payload, bool retain) {
element.retain = retain;
element.packetId = 0;
element.retry_count = 0;
if (NULL != payload) {
if (payload != NULL) {
element.payload = strdup(payload);
}
#ifdef MYESP_DEBUG
@@ -469,6 +470,34 @@ bool MyESP::_mqttQueue(const char * topic, const char * payload, bool retain) {
return true;
}
// convert json doc to a string buffer and place on queue
bool MyESP::_mqttQueue(const char * topic, JsonDocument payload, bool retain) {
if (!mqttClient.connected() || _mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE || !_hasValue(topic)) {
return false;
}
// create a new message
mqtt_message_t element;
element.topic = strdup(topic);
element.retain = retain;
element.packetId = 0;
element.retry_count = 0;
// reserve space for buffer and serialize json into it
const size_t capacity = measureJson(payload) + 1;
if (capacity) {
element.payload = (char *)malloc(capacity);
serializeJson(payload, (char *)element.payload, capacity);
}
#ifdef MYESP_DEBUG
myDebug_P(PSTR("[MQTT] Adding to queue: #%d [%s] %s"), _mqtt_queue.size(), element.topic, element.payload);
#endif
_mqtt_queue.push_back(element);
return true;
}
// called when an MQTT Publish ACK is received
// check if ACK matches the last Publish we sent, if not report an error
// and always remove from queue
@@ -1604,11 +1633,10 @@ void MyESP::heartbeatCheck(bool force) {
uint32_t free_memory = ESP.getFreeHeap();
uint8_t mem_available = 100 * free_memory / total_memory; // as a %
StaticJsonDocument<MYESP_JSON_MAXSIZE_SMALL> doc;
JsonObject rootHeartbeat = doc.to<JsonObject>();
const size_t capacity = JSON_OBJECT_SIZE(6);
StaticJsonDocument<capacity> doc;
JsonObject rootHeartbeat = doc.to<JsonObject>();
//rootHeartbeat["version"] = _app_version;
//rootHeartbeat["IP"] = WiFi.localIP().toString();
rootHeartbeat["rssid"] = getWifiQuality();
rootHeartbeat["load"] = getSystemLoadAverage();
rootHeartbeat["uptime"] = _getUptime();
@@ -1616,10 +1644,7 @@ void MyESP::heartbeatCheck(bool force) {
rootHeartbeat["tcpdrops"] = _getSystemDropoutCounter();
rootHeartbeat["mqttpublishfails"] = _mqtt_publish_fails;
char data[300] = {0};
serializeJson(doc, data, sizeof(data));
(void)mqttPublish(MQTT_TOPIC_HEARTBEAT, data, false); // send to MQTT with retain off
mqttPublish(MQTT_TOPIC_HEARTBEAT, doc, false); // send to MQTT with retain off
}
}
@@ -1632,13 +1657,13 @@ void MyESP::heartbeatPrint() {
uint32_t total_memory = _getInitialFreeHeap();
uint32_t free_memory = ESP.getFreeHeap();
myDebug("[%d] uptime:%d bytesfree:%d (%2u%%), load:%d, dropouts:%d",
i++,
_getUptime(),
free_memory,
100 * free_memory / total_memory,
getSystemLoadAverage(),
_getSystemDropoutCounter()
myDebug_P(PSTR("[%d] uptime:%d bytesfree:%d (%2u%%), load:%d, dropouts:%d"),
i++,
_getUptime(),
free_memory,
100 * free_memory / total_memory,
getSystemLoadAverage(),
_getSystemDropoutCounter()
);
}
@@ -2418,7 +2443,7 @@ void MyESP::writeLogEvent(const uint8_t type, const char * msg) {
// Handles WebSocket Events
void MyESP::_onWsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventType type, void * arg, uint8_t * data, size_t len) {
if (type == WS_EVT_ERROR) {
myDebug("[WEB] WebSocket[%s][%u] error(%u): %s\r\n", server->url(), client->id(), *((uint16_t *)arg), (char *)data);
myDebug_P(PSTR("[WEB] WebSocket[%s][%u] error(%u): %s\r\n"), server->url(), client->id(), *((uint16_t *)arg), (char *)data);
} else if (type == WS_EVT_DATA) {
AwsFrameInfo * info = (AwsFrameInfo *)arg;
uint64_t index = info->index;
@@ -2550,7 +2575,7 @@ bool MyESP::_fs_sendConfig() {
// send custom status via ws
void MyESP::_sendCustomStatus() {
DynamicJsonDocument doc(MYESP_JSON_MAXSIZE_LARGE);
DynamicJsonDocument doc(MYESP_JSON_MAXSIZE_MEDIUM);
JsonObject root = doc.to<JsonObject>();
@@ -2566,7 +2591,7 @@ void MyESP::_sendCustomStatus() {
(_web_callback_f)(root);
}
char buffer[MYESP_JSON_MAXSIZE_LARGE];
char buffer[MYESP_JSON_MAXSIZE_MEDIUM];
size_t len = serializeJson(root, buffer);
#ifdef MYESP_DEBUG
@@ -2582,13 +2607,13 @@ void MyESP::_sendStatus() {
uint32_t total_memory = _getInitialFreeHeap();
uint32_t free_memory = ESP.getFreeHeap();
DynamicJsonDocument doc(MQTT_MAX_PAYLOAD_SIZE_LARGE);
DynamicJsonDocument doc(MYESP_JSON_MAXSIZE_MEDIUM);
JsonObject root = doc.to<JsonObject>();
root["command"] = "status";
FSInfo fsinfo;
if (!SPIFFS.info(fsinfo)) {
myDebug("[SYSTEM] Error getting info on SPIFFS");
myDebug_P(PSTR("[SYSTEM] Error getting info on SPIFFS"));
} else {
root["availspiffs"] = (fsinfo.totalBytes - fsinfo.usedBytes) / 1000;
root["spiffssize"] = (fsinfo.totalBytes / 1000);
@@ -2626,7 +2651,7 @@ void MyESP::_sendStatus() {
sprintf(uptime, "%d day%s %d hour%s %d minute%s %d second%s", d, (d == 1) ? "" : "s", h, (h == 1) ? "" : "s", m, (m == 1) ? "" : "s", sec, (sec == 1) ? "" : "s");
root["uptime"] = uptime;
char buffer[MQTT_MAX_PAYLOAD_SIZE_LARGE];
char buffer[MYESP_JSON_MAXSIZE_MEDIUM];
size_t len = serializeJson(root, buffer);
_ws->textAll(buffer, len);
@@ -2783,13 +2808,13 @@ void MyESP::_printHeap(const char * prefix) {
uint32_t total_memory = _getInitialFreeHeap();
uint32_t free_memory = ESP.getFreeHeap();
myDebug("%s Free Heap: %d bytes initially | %d bytes used (%2u%%) | %d bytes free (%2u%%)",
prefix,
total_memory,
total_memory - free_memory,
100 * (total_memory - free_memory) / total_memory,
free_memory,
100 * free_memory / total_memory);
myDebug_P(PSTR("%s Free Heap: %d bytes initially | %d bytes used (%2u%%) | %d bytes free (%2u%%)"),
prefix,
total_memory,
total_memory - free_memory,
100 * (total_memory - free_memory) / total_memory,
free_memory,
100 * free_memory / total_memory);
}
// send UTC time via ws
@@ -2959,7 +2984,7 @@ void MyESP::loop() {
}
if (_formatreq) {
myDebug("[SYSTEM] Factory reset initiated. Please wait. System will automatically restart when complete...");
myDebug_P(PSTR("[SYSTEM] Factory reset initiated. Please wait. System will automatically restart when complete..."));
SPIFFS.end();
_ws->enable(false);
SPIFFS.format();
@@ -2969,7 +2994,7 @@ void MyESP::loop() {
if (_shouldRestart) {
writeLogEvent(MYESP_SYSLOG_INFO, "System is restarting");
myDebug("[SYSTEM] Restarting...");
myDebug_P(PSTR("[SYSTEM] Restarting..."));
_deferredReset(500, CUSTOM_RESET_TERMINAL);
ESP.restart();
}
@@ -2977,6 +3002,4 @@ void MyESP::loop() {
delay(MYESP_DELAY); // some time to WiFi and everything else to catch up, calls yield, and also prevent overheating
}
MyESP myESP;