Working with multiply MQTT brokers

This commit is contained in:
2024-03-11 01:09:53 +03:00
parent 0770e32b31
commit 242db1552e
3 changed files with 42 additions and 48 deletions

View File

@@ -39,7 +39,8 @@ enum payloadType
OTAFrame=4, OTAFrame=4,
auth=5, auth=5,
metric=6, metric=6,
sysCmd=7 sysCmd=7,
rawPinCtrl=8
}; };
enum metricType enum metricType

View File

@@ -113,13 +113,13 @@ char cryptoKey[] = QUOTE(SHAREDSECRET);
#if defined(__SAM3X8E__) || defined(ARDUINO_ARCH_STM32) #if defined(__SAM3X8E__) || defined(ARDUINO_ARCH_STM32)
UID UniqueID; UID UniqueID;
#endif #endif
uint8_t brokers = 0;
char *deviceName = NULL; char *deviceName = NULL;
aJsonObject *topics = NULL; aJsonObject *topics = NULL;
aJsonObject *root = NULL; aJsonObject *root = NULL;
aJsonObject *items = NULL; aJsonObject *items = NULL;
aJsonObject *inputs = NULL; aJsonObject *inputs = NULL;
aJsonObject *brokersArr = NULL;
aJsonObject *mqttArr = NULL; aJsonObject *mqttArr = NULL;
#ifdef _modbus #ifdef _modbus
aJsonObject *modbusObj = NULL; aJsonObject *modbusObj = NULL;
@@ -246,8 +246,10 @@ debugSerial<<F("Deleting conf. RAM was:")<<freeRam();
inputs = NULL; inputs = NULL;
items = NULL; items = NULL;
topics = NULL; topics = NULL;
brokersArr = NULL;
mqttArr = NULL; mqttArr = NULL;
deviceName = NULL; deviceName = NULL;
brokers = 0;
#ifdef _dmxout #ifdef _dmxout
dmxArr = NULL; dmxArr = NULL;
@@ -447,7 +449,7 @@ if (lanStatus == RETAINING_COLLECTING)
pfxlen = inTopic(topic,T_DEV); pfxlen = inTopic(topic,T_DEV);
if (!pfxlen) return; // Not command topic if (!pfxlen) return; // Not command topic
if (strrchr(topic,'$')) return; if (strrchr(topic,'$')) return;
debugSerial<<F("CleanUp retained topic ")<<topic<<endl; debugSerial<<F("MQTT: CleanUp retained topic ")<<topic<<endl;
mqttClient.deleteTopic(topic); mqttClient.deleteTopic(topic);
} }
return; return;
@@ -481,7 +483,7 @@ else
//if (lanStatus != RETAINING_COLLECTING && (mqttClient.isRetained())) //if (lanStatus != RETAINING_COLLECTING && (mqttClient.isRetained()))
{ {
debugSerial<<F("Complete. Remove topic ")<<savedTopic<<endl; debugSerial<<F("MQTT: Complete. Remove topic ")<<savedTopic<<endl;
mqttClient.deleteTopic(savedTopic); mqttClient.deleteTopic(savedTopic);
} }
@@ -720,7 +722,7 @@ lan_status lanLoop() {
#endif #endif
lanStatus = OPERATION;//3; lanStatus = OPERATION;//3;
infoSerial<<F("Accepting commands...\n"); infoSerial<<F("MQTT: Accepting commands...\n");
} }
break; break;
case OPERATION: case OPERATION:
@@ -788,6 +790,7 @@ lan_status lanLoop() {
case DO_GET: case DO_GET:
if (mqttClient.connected()) mqttClient.disconnect(); if (mqttClient.connected()) mqttClient.disconnect();
setFirstBroker();
timerLanCheckTime = millis();// + 5000; timerLanCheckTime = millis();// + 5000;
lanStatus = GET; lanStatus = GET;
break; break;
@@ -1009,6 +1012,20 @@ configLocked--;
#endif #endif
} }
void setFirstBroker()
{
if (!brokers) return;
if (brokersArr) mqttArr = brokersArr->child;
infoSerial<<F("MQTT: set first broker")<<endl;
}
void setNextBroker()
{
if (!brokers) return;
mqttArr = mqttArr->next;
if (!mqttArr) setFirstBroker();
else infoSerial << F("MQTT: tryiyng nenxt broker")<<endl;
}
void ip_ready_config_loaded_connecting_to_broker() { void ip_ready_config_loaded_connecting_to_broker() {
@@ -1028,7 +1045,7 @@ void ip_ready_config_loaded_connecting_to_broker() {
if (!mqttArr || ((n = aJson.getArraySize(mqttArr)) < 2)) //At least device name and broker IP must be configured if (!mqttArr || ((n = aJson.getArraySize(mqttArr)) < 2)) //At least device name and broker IP must be configured
{ {
errorSerial<<F("At least device name and broker IP must be configured")<<endl; errorSerial<<F("MQTT: At least device name and broker IP must be configured")<<endl;
lanStatus = DO_READ_RE_CONFIG; lanStatus = DO_READ_RE_CONFIG;
return; return;
} }
@@ -1071,7 +1088,7 @@ void ip_ready_config_loaded_connecting_to_broker() {
strncat_P(willTopic, state_P, sizeof(willTopic)); strncat_P(willTopic, state_P, sizeof(willTopic));
infoSerial<<F("\nAttempting MQTT connection to ")<<servername<<F(":")<<port<<F(" user:")<<user<<F(" ..."); infoSerial<<F("\nMQTT: Attempting connection to ")<<servername<<F(":")<<port<<F(" user:")<<user<<F(" ...");
if (!strlen(user)) if (!strlen(user))
{ {
user = NULL; user = NULL;
@@ -1106,15 +1123,16 @@ void ip_ready_config_loaded_connecting_to_broker() {
// if (_once) {DMXput(); _once=0;} // if (_once) {DMXput(); _once=0;}
lanStatus = RETAINING_COLLECTING;//4; lanStatus = RETAINING_COLLECTING;//4;
timerLanCheckTime = millis();// + 5000; timerLanCheckTime = millis();// + 5000;
infoSerial<<F("Awaiting for retained topics")<<endl; infoSerial<<F("MQTT: Awaiting for retained topics")<<endl;
} else } else
{ {
errorSerial<<F("failed, rc=")<<mqttClient.state()<<F(" try again in 5 seconds")<<endl; errorSerial<<F("MQTT: failed, rc=")<<mqttClient.state()<<F(" try again in 5 seconds")<<endl;
setNextBroker();
timerLanCheckTime = millis();// + 5000; timerLanCheckTime = millis();// + 5000;
#ifdef RESTART_LAN_ON_MQTT_ERRORS #ifdef RESTART_LAN_ON_MQTT_ERRORS
mqttErrorRate++; mqttErrorRate++;
if(mqttErrorRate>50){ if(mqttErrorRate>50){
errorSerial<<F("Too many MQTT connection errors. Restart LAN")<<endl; errorSerial<<F("MQTT: Too many connection errors. Restart LAN")<<endl;
mqttErrorRate=0; mqttErrorRate=0;
#ifdef RESET_PIN #ifdef RESET_PIN
resetHard(); resetHard();
@@ -1355,8 +1373,16 @@ infoSerial<<F("Applying config")<<endl;
items = aJson.getObjectItem(root, "items"); items = aJson.getObjectItem(root, "items");
topics = aJson.getObjectItem(root, "topics"); topics = aJson.getObjectItem(root, "topics");
inputs = aJson.getObjectItem(root, "in"); inputs = aJson.getObjectItem(root, "in");
mqttArr = aJson.getObjectItem(root, "mqtt"); brokersArr = aJson.getObjectItem(root, "mqtt");
if (brokersArr && brokersArr->child && (brokersArr->child->type == aJson_Array))
{
infoSerial<<F("Brokers configured:")<<(brokers=aJson.getArraySize(brokersArr))<<endl;
mqttArr = brokersArr->child;
}
else
mqttArr=brokersArr;
setupSyslog(); setupSyslog();
@@ -1574,42 +1600,6 @@ int loadConfigFromEEPROM()
} }
int loadConfigFromCAN()
{
if (configLocked) return 0;
configLocked++;
infoSerial<<F("Loading Config from CAN")<<endl;
CANConfStream.open(LHCAN.getControllerID(),payloadType::configFrame,'r');
if (CANConfStream.peek() == '{') {
debugSerial<<F("JSON detected")<<endl;
aJsonStream as = aJsonStream(&CANConfStream);
cleanConf(false);
root = aJson.parse(&as);
CANConfStream.close();
if (!root) {
errorSerial<<F("load failed")<<endl;
sysConf.setETAG("");
// sysConfStream.close();
configLocked--;
return 0;
}
infoSerial<<F("Loaded from CAN")<<endl;
configLocked--;
applyConfig();
sysConf.loadETAG();
return 1;
}
CANConfStream.close();
infoSerial<<F("No stored config")<<endl;
configLocked--;
return 0;
}
int cmdFunctionSave(int arg_cnt, char **args) int cmdFunctionSave(int arg_cnt, char **args)
{ {

View File

@@ -338,4 +338,7 @@ void printCurentLanConfig();
int16_t attachMaturaTimer(); int16_t attachMaturaTimer();
void setFirstBroker();
void setNextBroker();
//void printFreeRam(); //void printFreeRam();