MQTT topics config & CallBack refactoring

This commit is contained in:
2019-03-04 04:49:35 +03:00
parent 23e493097e
commit 141bb9c657
13 changed files with 258 additions and 78 deletions

View File

@@ -115,13 +115,13 @@ unsigned long nextSyslogPingTime;
lan_status lanStatus = INITIAL_STATE;
const char outprefix[] PROGMEM = OUTTOPIC;
const char inprefix[] PROGMEM = INTOPIC;
const char configserver[] PROGMEM = CONFIG_SERVER;
unsigned int UniqueID[5] = {0,0,0,0,0};
char *deviceName = NULL;
aJsonObject *topics = NULL;
aJsonObject *root = NULL;
aJsonObject *items = NULL;
aJsonObject *inputs = NULL;
@@ -181,29 +181,63 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) {
debugSerial<<((char) payload[i]);
debugSerial<<endl;
if(!strcmp(topic,CMDTOPIC)) {
cmd_parse((char *)payload);
return;
}
short intopic = 0;
short pfxlen = 0;
char * itemName;
{
char buf[MQTT_TOPIC_LENGTH + 1];
strncpy_P(buf, inprefix, sizeof(buf));
intopic = strncmp(topic, buf, strlen(inprefix));
// strncpy_P(buf, inprefix, sizeof(buf));
if (lanStatus == RETAINING_COLLECTING)
{
setTopic(buf,sizeof(buf),T_OUT);
pfxlen = strlen(buf);
intopic = strncmp(topic, buf, pfxlen);
}
else
{
setTopic(buf,sizeof(buf),T_BCST);
pfxlen = strlen(buf);
intopic = strncmp(topic, buf, pfxlen );
if (intopic)
{
setTopic(buf,sizeof(buf),T_DEV);
pfxlen = strlen(buf);
intopic = strncmp(topic, buf, pfxlen);
}
}
}
// in Retaining status - trying to restore previous state from retained output topic. Retained input topics are not relevant.
if ((lanStatus == RETAINING_COLLECTING) && !intopic) {
if (intopic) {
debugSerial<<F("Skipping..");
return;
}
char subtopic[MQTT_SUBJECT_LENGTH] = "";
// int cmd = 0;
//cmd = txt2cmd((char *) payload);
itemName=topic+pfxlen;
if(!strcmp(itemName,CMDTOPIC)) {
cmd_parse((char *)payload);
return;
}
//char subtopic[MQTT_SUBJECT_LENGTH] = "";
char *t;
if (t = strrchr(topic, '/'))
strncpy(subtopic, t + 1, MQTT_SUBJECT_LENGTH - 1);
Item item(subtopic);
if (t = strchr(itemName, '/'))
{
*t = 0;
t++;
debugSerial<<F("Subtopic:")<<t<<endl;
//strncpy(subtopic, t + 1, MQTT_SUBJECT_LENGTH - 1);
}
debugSerial<<F("Item:")<<itemName<<endl;
Item item(itemName);
if (item.isValid()) {
if (item.itemType == CH_GROUP && (lanStatus == RETAINING_COLLECTING))
return; //Do not restore group channels - they consist not relevant data
@@ -211,14 +245,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length) {
} //valid item
}
void printIPAddress(IPAddress ipAddress) {
for (byte i = 0; i < 4; i++)
#ifdef WITH_PRINTEX_LIB
(i < 3) ? debugSerial << (ipAddress[i]) << F(".") : debugSerial << (ipAddress[i])<<F(", ");
#else
(i < 3) ? debugSerial << _DEC(ipAddress[i]) << F(".") : debugSerial << _DEC(ipAddress[i]) << F(", ");
#endif
}
void printMACAddress() {
debugSerial<<F("Configured MAC:");
@@ -230,10 +257,6 @@ void printMACAddress() {
#endif
}
void restoreState() {
// Once connected, publish an announcement... // Once connected, publish an announcement...
//mqttClient.publish("/myhome/out/RestoreState", "ON");
};
lan_status lanLoop() {
@@ -263,7 +286,8 @@ lan_status lanLoop() {
char buf[MQTT_TOPIC_LENGTH];
//Unsubscribe from status topics..
strncpy_P(buf, outprefix, sizeof(buf));
//strncpy_P(buf, outprefix, sizeof(buf));
setTopic(buf,sizeof(buf),T_OUT);
strncat(buf, "#", sizeof(buf));
mqttClient.unsubscribe(buf);
@@ -353,24 +377,28 @@ void onMQTTConnect(){
char buf[128] = "";
// High level homie topics publishing
strncpy_P(topic, outprefix, sizeof(topic));
//strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat_P(topic, state_P, sizeof(topic));
strncpy_P(buf, ready_P, sizeof(buf));
mqttClient.publish(topic,buf,true);
strncpy_P(topic, outprefix, sizeof(topic));
//strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat_P(topic, name_P, sizeof(topic));
strncpy_P(buf, nameval_P, sizeof(buf));
mqttClient.publish(topic,buf,true);
strncpy_P(topic, outprefix, sizeof(topic));
//strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat_P(topic, stats_P, sizeof(topic));
strncpy_P(buf, statsval_P, sizeof(buf));
mqttClient.publish(topic,buf,true);
#ifndef NO_HOMIE
strncpy_P(topic, outprefix, sizeof(topic));
// strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat_P(topic, homie_P, sizeof(topic));
strncpy_P(buf, homiever_P, sizeof(buf));
mqttClient.publish(topic,buf,true);
@@ -411,7 +439,9 @@ void onMQTTConnect(){
break;
} //switch
strncpy_P(topic, outprefix, sizeof(topic));
//strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat(topic,item->name,sizeof(topic));
strncat(topic,"/",sizeof(topic));
strncat_P(topic,datatype_P,sizeof(topic));
@@ -419,7 +449,9 @@ void onMQTTConnect(){
if (format[0])
{
strncpy_P(topic, outprefix, sizeof(topic));
//strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat(topic,item->name,sizeof(topic));
strncat(topic,"/",sizeof(topic));
strncat_P(topic,format_P,sizeof(topic));
@@ -427,7 +459,8 @@ void onMQTTConnect(){
}
item = item->next;
} //if
strncpy_P(topic, outprefix, sizeof(topic));
//strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_OUT);
strncat_P(topic, nodes_P, sizeof(topic));
mqttClient.publish(topic,buf,true);
}
@@ -441,18 +474,26 @@ void ip_ready_config_loaded_connecting_to_broker() {
char *user = &empty;
char passwordBuf[16] = "";
char *password = passwordBuf;
int syslogPort = 514;
char syslogDeviceHostname[16];
if (mqttArr && (aJson.getArraySize(mqttArr))) deviceName = aJson.getArrayItem(mqttArr, 0)->valuestring;
#ifdef SYSLOG_ENABLE
debugSerial<<"debugSerial:";
//debugSerial<<"debugSerial:";
delay(100);
if (udpSyslogArr && aJson.getArraySize(udpSyslogArr)) {
if (udpSyslogArr && (n = aJson.getArraySize(udpSyslogArr))) {
char *syslogServer = aJson.getArrayItem(udpSyslogArr, 0)->valuestring;
int syslogPort = aJson.getArrayItem(udpSyslogArr, 1)->valueint;
if (n>1) syslogPort = aJson.getArrayItem(udpSyslogArr, 1)->valueint;
inet_ntoa_r(Ethernet.localIP(),syslogDeviceHostname,sizeof(syslogDeviceHostname));
/*
char *syslogDeviceHostname = aJson.getArrayItem(udpSyslogArr, 2)->valuestring;
char *syslogAppname = aJson.getArrayItem(udpSyslogArr, 3)->valuestring;
debugSerial<<F("Syslog params:")<<syslogServer<<syslogPort<<syslogDeviceHostname<<syslogAppname;
*/
debugSerial<<F("Syslog params:")<<syslogServer<<":"<<syslogPort<<":"<<syslogDeviceHostname<<":"<<deviceName<<endl;
udpSyslog.server(syslogServer, syslogPort);
udpSyslog.deviceHostname(syslogDeviceHostname);
udpSyslog.appName(syslogAppname);
if (deviceName) udpSyslog.appName(deviceName);
udpSyslog.defaultPriority(LOG_KERN);
udpSyslog.log(LOG_INFO, F("UDP Syslog initialized!"));
debugSerial<<F("UDP Syslog initialized!\n");
@@ -460,7 +501,7 @@ void ip_ready_config_loaded_connecting_to_broker() {
#endif
if (!mqttClient.connected() && mqttArr && ((n = aJson.getArraySize(mqttArr)) > 1)) {
char *client_id = aJson.getArrayItem(mqttArr, 0)->valuestring;
// char *client_id = aJson.getArrayItem(mqttArr, 0)->valuestring;
char *servername = aJson.getArrayItem(mqttArr, 1)->valuestring;
if (n >= 3) port = aJson.getArrayItem(mqttArr, 2)->valueint;
if (n >= 4) user = aJson.getArrayItem(mqttArr, 3)->valuestring;
@@ -477,26 +518,35 @@ void ip_ready_config_loaded_connecting_to_broker() {
strncpy_P(willMessage,disconnected_P,sizeof(willMessage));
strncpy_P(willTopic, outprefix, sizeof(willTopic));
// strncpy_P(willTopic, outprefix, sizeof(willTopic));
setTopic(willTopic,sizeof(willTopic),T_OUT);
strncat_P(willTopic, state_P, sizeof(willTopic));
debugSerial<<F("\nAttempting MQTT connection to ")<<servername<<F(":")<<port<<F(" user:")<<user<<F(" ...");
wdt_dis(); //potential unsafe for ethernetIdle(), but needed to avoid cyclic reboot if mosquitto out of order
if (mqttClient.connect(client_id, user, password,willTopic,MQTTQOS1,true,willMessage)) {
if (mqttClient.connect(deviceName, user, password,willTopic,MQTTQOS1,true,willMessage)) {
mqttErrorRate = 0;
debugSerial<<F("connected as ")<<client_id <<endl;
debugSerial<<F("connected as ")<<deviceName <<endl;
wdt_en();
configOk = true;
// ... Temporary subscribe to status topic
char buf[MQTT_TOPIC_LENGTH];
strncpy_P(buf, outprefix, sizeof(buf));
// strncpy_P(buf, outprefix, sizeof(buf));
setTopic(buf,sizeof(buf),T_OUT);
strncat(buf, "#", sizeof(buf));
mqttClient.subscribe(buf);
//Subscribing for command topics
strncpy_P(buf, inprefix, sizeof(buf));
//strncpy_P(buf, inprefix, sizeof(buf));
setTopic(buf,sizeof(buf),T_BCST);
strncat(buf, "#", sizeof(buf));
Serial.println(buf);
mqttClient.subscribe(buf);
setTopic(buf,sizeof(buf),T_DEV);
strncat(buf, "#", sizeof(buf));
Serial.println(buf);
mqttClient.subscribe(buf);
@@ -695,7 +745,8 @@ void Changed(int i, DeviceAddress addr, float currentTemp) {
}
#endif
strcpy_P(addrstr, outprefix);
//strcpy_P(addrstr, outprefix);
setTopic(addrstr,sizeof(addrstr),T_OUT);
strncat(addrstr, owEmitString, sizeof(addrstr));
mqttClient.publish(addrstr, valstr);
}
@@ -792,6 +843,7 @@ void applyConfig() {
}
#endif
items = aJson.getObjectItem(root, "items");
topics = aJson.getObjectItem(root, "topics");
// Digital output related Items initialization
pollingItem=NULL;
@@ -852,7 +904,7 @@ void printConfigSummary() {
void cmdFunctionLoad(int arg_cnt, char **args) {
loadConfigFromEEPROM();
restoreState();
// restoreState();
}
int loadConfigFromEEPROM()
@@ -882,7 +934,7 @@ int loadConfigFromEEPROM()
void cmdFunctionReq(int arg_cnt, char **args) {
mqttConfigRequest(arg_cnt, args);
restoreState();
// restoreState();
}
@@ -1297,10 +1349,16 @@ void printFirmwareVersionAndBuildOptions() {
#else
debugSerial<<F("\n(+)OWIRE");
#endif
#ifndef DHT_COUNTER_DISABLE
debugSerial<<F("\n(+)DHT COUNTER");
#ifndef DHT_DISABLE
debugSerial<<F("\n(+)DHT");
#else
debugSerial<<F("\n(-)DHT COUNTER");
debugSerial<<F("\n(-)DHT");
#endif
#ifndef COUNTER_DISABLE
debugSerial<<F("\n(+)COUNTER");
#else
debugSerial<<F("\n(-)COUNTER");
#endif
#ifdef SD_CARD_INSERTED
@@ -1323,7 +1381,7 @@ debugSerial<<endl;
// WDT_Disable( WDT ) ;
Serial.println("Reading 128 bits unique identifier \n\r" ) ;
Serial.println(F("Reading 128 bits unique identifier") ) ;
ReadUniqueID( UniqueID ) ;
Serial.print ("ID: ") ;
@@ -1333,6 +1391,9 @@ debugSerial<<endl;
}
void publishStat(){
long fr = freeRam();
char topic[64];
@@ -1340,15 +1401,14 @@ void publishStat(){
long ut = millis()/1000;
// debugSerial<<F("\nfree RAM: ")<<fr;
strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_DEV);
strncat_P(topic, stats_P, sizeof(topic));
strncat(topic, "/", sizeof(topic));
strncat_P(topic, freeheap_P, sizeof(topic));
mqttClient.publish(topic,itoa(fr,intbuf,10),true);
strncpy_P(topic, outprefix, sizeof(topic));
setTopic(topic,sizeof(topic),T_DEV);
strncat_P(topic, stats_P, sizeof(topic));
strncat(topic, "/", sizeof(topic));
strncat_P(topic, uptime_P, sizeof(topic));
@@ -1427,9 +1487,9 @@ void loop_main() {
#ifndef MODBUS_DISABLE
if (lanStatus != RETAINING_COLLECTING) pollingLoop();
#endif
#ifdef _owire
//#ifdef _owire
thermoLoop();
#endif
//#endif
}