More stability: MQTT Publish suppressed if input Loop invoked from ethernet Idle (reentrance issue? low-memory for stack?)

Side effects - some MQTT publishings may lost
Watchdog enabled when MQTT connection pending
This commit is contained in:
2019-04-06 23:53:59 +03:00
parent ebc908a3a1
commit 3d598a1217
3 changed files with 34 additions and 20 deletions

View File

@@ -33,6 +33,7 @@ e-mail anklimov@gmail.com
extern PubSubClient mqttClient;
extern aJsonObject *root;
extern int8_t ethernetIdleCount;
#if !defined(DHT_DISABLE) || !defined(COUNTER_DISABLE)
static volatile unsigned long nextPollMillisValue[5];
@@ -223,7 +224,7 @@ void Input::counterPoll() {
strncpy(addrstr,emit->valuestring,sizeof(addrstr));
if (!strchr(addrstr,'/')) setTopic(addrstr,sizeof(addrstr),T_OUT,emit->valuestring);
sprintf(valstr, "%d", counterValue);
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, valstr);
setNextPollTime(millis() + DHT_POLL_DELAY_DEFAULT);
debugSerial<<F(" NextPollMillis=")<<nextPollTime();
@@ -287,7 +288,7 @@ void Input::uptimePoll() {
char valstr[11];
// printUlongValueToStr(valstr,millis());
printUlongValueToStr(valstr, millis());
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(emit->valuestring, valstr);
}
setNextPollTime(millis() + UPTIME_POLL_DELAY_DEFAULT);
@@ -387,11 +388,11 @@ void Input::dht22Poll() {
if (!strchr(addrstr,'/')) setTopic(addrstr,sizeof(addrstr),T_OUT,emit->valuestring);
strcat(addrstr, "T");
printFloatValueToStr(temp, valstr);
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, valstr);
addrstr[strlen(addrstr) - 1] = 'H';
printFloatValueToStr(humidity, valstr);
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, valstr);
setNextPollTime(millis() + DHT_POLL_DELAY_DEFAULT);
@@ -528,7 +529,7 @@ void Input::onContactChanged(int newValue) {
{
char addrstr[MQTT_TOPIC_LENGTH];
strncpy(addrstr,emit->valuestring,sizeof(addrstr));
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
{
if (!strchr(addrstr,'/')) setTopic(addrstr,sizeof(addrstr),T_OUT,emit->valuestring);
if (newValue) { //send set command
@@ -578,7 +579,7 @@ void Input::onAnalogChanged(int newValue) {
if (!strchr(addrstr,'/')) setTopic(addrstr,sizeof(addrstr),T_OUT,emit->valuestring);
char strVal[16];
itoa(newValue,strVal,10);
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, strVal, true);
}
@@ -601,7 +602,7 @@ bool Input::publishDataToDomoticz(int pollTimeIncrement, aJsonObject *emit, cons
vsnprintf(valstr, sizeof(valstr) - 1, format, args);
va_end(args);
debugSerial << valstr;
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(emit->valuestring, valstr);
if (pollTimeIncrement)
setNextPollTime(millis() + pollTimeIncrement);

View File

@@ -63,6 +63,7 @@ extern aJsonObject *pollingItem;
//int modbusSet(int addr, uint16_t _reg, int _mask, uint16_t value);
extern PubSubClient mqttClient;
extern int8_t ethernetIdleCount;
//extern char outprefix[];
//const char outprefix[] PROGMEM = OUTTOPIC;
@@ -1049,14 +1050,14 @@ int Item::checkFM() {
else aJson.addNumberToObject(out, "pwr", 0);
if (ftemp > FM_OVERHEAT_CELSIUS && set) {
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish("/alarm/ovrht", itemArr->name);
Ctrl(CMD_OFF); //Shut down
}
} else
debugSerial << F("Modbus polling error=") << _HEX(result);
outch = aJson.print(out);
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, outch);
free(outch);
aJson.deleteItem(out);
@@ -1296,7 +1297,7 @@ int Item::SendStatus(short cmd, short n, int *Par, boolean deffered) {
return -1;
}
debugSerial<<F("Pub: ")<<addrstr<<F("->")<<valstr<<endl;
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, valstr,true);
return 0;
}

View File

@@ -151,6 +151,7 @@ aJsonObject *pollingItem = NULL;
bool owReady = false;
bool configOk = false;
int8_t ethernetIdleCount =0;
#ifdef _modbus
ModbusMaster node;
@@ -170,20 +171,32 @@ void watchdogSetup(void) {} //Do not remove - strong re-definition WDT Init f
void cleanConf()
{
if (!root) return;
debugSerial<<F("Deleting conf. RAM was:")<<freeRam();
aJson.deleteItem(root);
root = NULL;
//aJson.deleteItem(inputs);
inputs = NULL;
//aJson.deleteItem(items);
items = NULL;
//aJson.deleteItem(topics);
topics = NULL;
//aJson.deleteItem(mqttArr);
mqttArr = NULL;
#ifndef DMX_DISABLE
//aJson.deleteItem(dmxArr);
dmxArr = NULL;
#endif
#ifndef OWIRE_DISABLE
//aJson.deleteItem(owArr);
owArr = NULL;
#endif
#ifndef MODBUS_DISABLE
//aJson.deleteItem(modbusArr);
modbusArr = NULL;
#endif
debugSerial<<F(" is ")<<freeRam()<<endl;
}
void mqttCallback(char *topic, byte *payload, unsigned int length) {
@@ -555,11 +568,11 @@ void ip_ready_config_loaded_connecting_to_broker() {
debugSerial<<F("\nAttempting MQTT connection to ")<<servername<<F(":")<<port<<F(" user:")<<user<<F(" ...");
wdt_dis(); //potential unsafe for ethernetIdle(), but needed to avoid cyclic reboot if mosquitto out of order
// wdt_dis(); //potential unsafe for ethernetIdle(), but needed to avoid cyclic reboot if mosquitto out of order
if (mqttClient.connect(deviceName, user, password,willTopic,MQTTQOS1,true,willMessage)) {
mqttErrorRate = 0;
debugSerial<<F("connected as ")<<deviceName <<endl;
wdt_en();
// wdt_en();
configOk = true;
// ... Temporary subscribe to status topic
char buf[MQTT_TOPIC_LENGTH];
@@ -776,7 +789,7 @@ void Changed(int i, DeviceAddress addr, float currentTemp) {
char valstr[50];
sprintf(valstr, "{\"idx\":%s,\"svalue\":\"%.1f\"}", idx->valuestring, currentTemp);
debugSerial << valstr;
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(owEmitString, valstr);
return;
}
@@ -785,7 +798,7 @@ void Changed(int i, DeviceAddress addr, float currentTemp) {
//strcpy_P(addrstr, outprefix);
setTopic(addrstr,sizeof(addrstr),T_OUT);
strncat(addrstr, owEmitString, sizeof(addrstr));
if (mqttClient.connected())
if (mqttClient.connected() && !ethernetIdleCount)
mqttClient.publish(addrstr, valstr);
}
// And translate temp to internal items
@@ -944,6 +957,7 @@ void printConfigSummary() {
printBool(udpSyslogArr);
#endif
debugSerial << endl;
debugSerial<<F("RAM=")<<freeRam()<<endl;
}
void cmdFunctionLoad(int arg_cnt, char **args) {
@@ -960,7 +974,6 @@ int loadConfigFromEEPROM()
ch = EEPROM.read(EEPROM_offset);
if (ch == '{') {
aJsonEEPROMStream as = aJsonEEPROMStream(EEPROM_offset);
aJson.deleteItem(root);
cleanConf();
root = aJson.parse(&as);
if (!root) {
@@ -1163,7 +1176,7 @@ lan_status loadConfigFromHttp(int arg_cnt, char **args)
//HTTPClient hclientPrint(configServer, 80);
// FILE is the return STREAM type of the HTTPClient
configStream = hclient.getURI(URI);
Serial.println("got--");delay(500);
//Serial.println("got--");delay(500);
responseStatusCode = hclient.getLastReturnCode();
wdt_en();
@@ -1174,7 +1187,6 @@ lan_status loadConfigFromHttp(int arg_cnt, char **args)
char c;
aJsonFileStream as = aJsonFileStream(configStream);
noInterrupts();
aJson.deleteItem(root);
cleanConf();
root = aJson.parse(&as);
interrupts();
@@ -1237,7 +1249,6 @@ lan_status loadConfigFromHttp(int arg_cnt, char **args)
//debugSerial<<"GET Response: ");
if (responseStatusCode == 200) {
aJson.deleteItem(root);
cleanConf()
root = aJson.parse((char *) response.c_str());
@@ -1270,7 +1281,6 @@ lan_status loadConfigFromHttp(int arg_cnt, char **args)
if (httpResponseCode == HTTP_CODE_OK) {
String response = httpClient.getString();
debugSerial<<response;
aJson.deleteItem(root);
cleanConf()
root = aJson.parse((char *) response.c_str());
if (!root) {
@@ -1601,8 +1611,10 @@ void owIdle(void) {
#endif
}
void ethernetIdle(void){
ethernetIdleCount++;
wdt_res();
inputLoop();
ethernetIdleCount--;
};
void modbusIdle(void) {