mirror of
https://github.com/emsesp/EMS-ESP32.git
synced 2025-12-06 15:59:52 +03:00
mqtt publish peiod per device, publish on change, ADC
This commit is contained in:
99
src/mqtt.cpp
99
src/mqtt.cpp
@@ -27,8 +27,13 @@ AsyncMqttClient * Mqtt::mqttClient_;
|
||||
// static parameters we make global
|
||||
std::string Mqtt::hostname_;
|
||||
uint8_t Mqtt::mqtt_qos_;
|
||||
uint16_t Mqtt::publish_time_;
|
||||
uint8_t Mqtt::bus_id_;
|
||||
uint32_t Mqtt::publish_time_boiler_;
|
||||
uint32_t Mqtt::publish_time_thermostat_;
|
||||
uint32_t Mqtt::publish_time_solar_;
|
||||
uint32_t Mqtt::publish_time_mixing_;
|
||||
uint32_t Mqtt::publish_time_other_;
|
||||
uint32_t Mqtt::publish_time_sensor_;
|
||||
|
||||
std::vector<Mqtt::MQTTSubFunction> Mqtt::mqtt_subfunctions_;
|
||||
std::vector<Mqtt::MQTTCmdFunction> Mqtt::mqtt_cmdfunctions_;
|
||||
@@ -111,11 +116,30 @@ void Mqtt::loop() {
|
||||
uint32_t currentMillis = uuid::get_uptime();
|
||||
|
||||
// create publish messages for each of the EMS device values, adding to queue
|
||||
if (publish_time_ && (currentMillis - last_publish_ > publish_time_)) {
|
||||
last_publish_ = currentMillis;
|
||||
EMSESP::publish_all_values();
|
||||
if (publish_time_boiler_ && (currentMillis - last_publish_boiler_ > publish_time_boiler_)) {
|
||||
last_publish_boiler_ = currentMillis;
|
||||
EMSESP::publish_device_values(EMSdevice::DeviceType::BOILER);
|
||||
}
|
||||
if (publish_time_thermostat_ && (currentMillis - last_publish_thermostat_ > publish_time_thermostat_)) {
|
||||
last_publish_thermostat_ = currentMillis;
|
||||
EMSESP::publish_device_values(EMSdevice::DeviceType::THERMOSTAT);
|
||||
}
|
||||
if (publish_time_solar_ && (currentMillis - last_publish_solar_ > publish_time_solar_)) {
|
||||
last_publish_solar_ = currentMillis;
|
||||
EMSESP::publish_device_values(EMSdevice::DeviceType::SOLAR);
|
||||
}
|
||||
if (publish_time_mixing_ && (currentMillis - last_publish_mixing_ > publish_time_mixing_)) {
|
||||
last_publish_mixing_ = currentMillis;
|
||||
EMSESP::publish_device_values(EMSdevice::DeviceType::MIXING);
|
||||
}
|
||||
if (publish_time_other_ && (currentMillis - last_publish_other_ > publish_time_other_)) {
|
||||
last_publish_other_ = currentMillis;
|
||||
EMSESP::publish_other_values();
|
||||
}
|
||||
if (currentMillis - last_publish_sensor_ > publish_time_sensor_) {
|
||||
last_publish_sensor_ = currentMillis;
|
||||
EMSESP::publish_sensor_values(publish_time_sensor_ != 0);
|
||||
}
|
||||
|
||||
// publish top item from MQTT queue to stop flooding
|
||||
if ((uint32_t)(currentMillis - last_mqtt_poll_) > MQTT_PUBLISH_WAIT) {
|
||||
last_mqtt_poll_ = currentMillis;
|
||||
@@ -280,7 +304,7 @@ void Mqtt::on_message(const char * topic, const char * payload, size_t len) {
|
||||
}
|
||||
|
||||
if (!cmd_known) {
|
||||
LOG_ERROR(F("MQTT: no matching cmd or invalid data: %s"), command);
|
||||
LOG_ERROR(F("MQTT: no matching cmd or invalid data: %s"), message);
|
||||
}
|
||||
|
||||
return;
|
||||
@@ -342,8 +366,13 @@ void Mqtt::start() {
|
||||
|
||||
// fetch MQTT settings
|
||||
EMSESP::esp8266React.getMqttSettingsService()->read([&](MqttSettings & mqttSettings) {
|
||||
publish_time_ = mqttSettings.publish_time * 1000; // convert to milliseconds
|
||||
mqtt_qos_ = mqttSettings.mqtt_qos;
|
||||
publish_time_boiler_ = mqttSettings.publish_time_boiler * 1000; // convert to milliseconds
|
||||
publish_time_thermostat_ = mqttSettings.publish_time_thermostat * 1000;
|
||||
publish_time_solar_ = mqttSettings.publish_time_solar * 1000;
|
||||
publish_time_mixing_ = mqttSettings.publish_time_mixing * 1000;
|
||||
publish_time_other_ = mqttSettings.publish_time_other * 1000;
|
||||
publish_time_sensor_ = mqttSettings.publish_time_sensor * 1000;
|
||||
mqtt_qos_ = mqttSettings.mqtt_qos;
|
||||
});
|
||||
|
||||
EMSESP::emsespSettingsService.read([&](EMSESPSettings & settings) { bus_id_ = settings.ems_bus_id; });
|
||||
@@ -390,8 +419,51 @@ void Mqtt::start() {
|
||||
mqtt_subfunctions_.reserve(10);
|
||||
}
|
||||
|
||||
void Mqtt::set_publish_time(uint16_t publish_time) {
|
||||
publish_time_ = publish_time * 1000; // convert to milliseconds
|
||||
void Mqtt::set_publish_time_boiler(uint16_t publish_time) {
|
||||
publish_time_boiler_ = publish_time * 1000; // convert to milliseconds
|
||||
}
|
||||
|
||||
void Mqtt::set_publish_time_thermostat(uint16_t publish_time) {
|
||||
publish_time_thermostat_ = publish_time * 1000; // convert to milliseconds
|
||||
}
|
||||
|
||||
void Mqtt::set_publish_time_solar(uint16_t publish_time) {
|
||||
publish_time_solar_ = publish_time * 1000; // convert to milliseconds
|
||||
}
|
||||
|
||||
void Mqtt::set_publish_time_mixing(uint16_t publish_time) {
|
||||
publish_time_mixing_ = publish_time * 1000; // convert to milliseconds
|
||||
}
|
||||
|
||||
void Mqtt::set_publish_time_other(uint16_t publish_time) {
|
||||
publish_time_other_ = publish_time * 1000; // convert to milliseconds
|
||||
}
|
||||
|
||||
void Mqtt::set_publish_time_sensor(uint16_t publish_time) {
|
||||
publish_time_sensor_ = publish_time * 1000; // convert to milliseconds
|
||||
}
|
||||
|
||||
bool Mqtt::get_publish_onchange(uint8_t device_type) {
|
||||
if (device_type == EMSdevice::DeviceType::BOILER) {
|
||||
if (!publish_time_boiler_) {
|
||||
return true;
|
||||
}
|
||||
} else if (device_type == EMSdevice::DeviceType::THERMOSTAT) {
|
||||
if (!publish_time_thermostat_) {
|
||||
return true;
|
||||
}
|
||||
} else if (device_type == EMSdevice::DeviceType::SOLAR) {
|
||||
if (!publish_time_solar_) {
|
||||
return true;
|
||||
}
|
||||
} else if (device_type == EMSdevice::DeviceType::MIXING) {
|
||||
if (!publish_time_mixing_) {
|
||||
return true;
|
||||
}
|
||||
} else if (!publish_time_other_) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void Mqtt::set_qos(uint8_t mqtt_qos) {
|
||||
@@ -482,13 +554,6 @@ void Mqtt::publish(const std::string & topic) {
|
||||
queue_publish_message(topic, "", false);
|
||||
}
|
||||
|
||||
// publish all queued messages to MQTT
|
||||
void Mqtt::process_all_queue() {
|
||||
while (!mqtt_messages_.empty()) {
|
||||
process_queue();
|
||||
}
|
||||
}
|
||||
|
||||
// take top from queue and perform the publish or subscribe action
|
||||
// assumes there is an MQTT connection
|
||||
void Mqtt::process_queue() {
|
||||
|
||||
Reference in New Issue
Block a user