remove Rx queue. Now all incoming Rx telegrams are processed immediately

This commit is contained in:
proddy
2020-08-17 17:56:25 +02:00
parent 51807f2678
commit c87e532457
4 changed files with 24 additions and 85 deletions

View File

@@ -58,7 +58,7 @@ uint8_t EMSESP::actual_master_thermostat_ = EMSESP_DEFAULT_MASTER_THERMOSTAT; /
uint16_t EMSESP::watch_id_ = WATCH_ID_NONE; // for when log is TRACE. 0 means no trace set uint16_t EMSESP::watch_id_ = WATCH_ID_NONE; // for when log is TRACE. 0 means no trace set
uint8_t EMSESP::watch_ = 0; // trace off uint8_t EMSESP::watch_ = 0; // trace off
uint16_t EMSESP::read_id_ = WATCH_ID_NONE; uint16_t EMSESP::read_id_ = WATCH_ID_NONE;
bool EMSESP::tap_water_active_ = false; // for when Boiler states we having running warm water. used in Shower() bool EMSESP::tap_water_active_ = false; // for when Boiler states we having running warm water. used in Shower()
uint32_t EMSESP::last_fetch_ = 0; uint32_t EMSESP::last_fetch_ = 0;
uint8_t EMSESP::unique_id_count_ = 0; uint8_t EMSESP::unique_id_count_ = 0;
@@ -219,19 +219,6 @@ void EMSESP::show_ems(uuid::console::Shell & shell) {
shell.println(); shell.println();
// Rx queue
auto rx_telegrams = rxservice_.queue();
if (rx_telegrams.empty()) {
shell.printfln(F("Rx Queue is empty"));
} else {
shell.printfln(F("Rx Queue (%ld telegram%s):"), rx_telegrams.size(), rx_telegrams.size() == 1 ? "" : "s");
for (const auto & it : rx_telegrams) {
shell.printfln(F(" [%02d] %s"), it.id_, pretty_telegram(it.telegram_).c_str());
}
}
shell.println();
// Tx queue // Tx queue
auto tx_telegrams = txservice_.queue(); auto tx_telegrams = txservice_.queue();
if (tx_telegrams.empty()) { if (tx_telegrams.empty()) {
@@ -829,12 +816,11 @@ void EMSESP::loop() {
return; return;
} }
system_.loop(); // does LED and checks system health, and syslog service system_.loop(); // does LED and checks system health, and syslog service
rxservice_.loop(); // process what ever is in the rx queue shower_.loop(); // check for shower on/off
shower_.loop(); // check for shower on/off sensors_.loop(); // this will also send out via MQTT
sensors_.loop(); // this will also send out via MQTT mqtt_.loop(); // sends out anything in the queue via MQTT
mqtt_.loop(); // sends out anything in the queue via MQTT console_.loop(); // telnet/serial console
console_.loop(); // telnet/serial console
// force a query on the EMS devices to fetch latest data at a set interval (1 min) // force a query on the EMS devices to fetch latest data at a set interval (1 min)
if ((uuid::get_uptime() - last_fetch_ > EMS_FETCH_FREQUENCY)) { if ((uuid::get_uptime() - last_fetch_ > EMS_FETCH_FREQUENCY)) {

View File

@@ -127,23 +127,6 @@ std::string Telegram::to_string_message() const {
return Helpers::data_to_hex(this->message_data, this->message_length); return Helpers::data_to_hex(this->message_data, this->message_length);
} }
// empty queue, don't process them
void RxService::flush_rx_queue() {
rx_telegrams_.clear();
rx_telegram_id_ = 0;
}
// Rx loop, run as many times as you can
// processes all telegrams on the queue. Assumes there are valid (i.e. CRC checked)
void RxService::loop() {
while (!rx_telegrams_.empty()) {
auto telegram = rx_telegrams_.front().telegram_;
(void)EMSESP::process_telegram(telegram); // further process the telegram
increment_telegram_count(); // increase count
rx_telegrams_.pop_front(); // remove it from the queue
}
}
// add a new rx telegram object // add a new rx telegram object
// data is the whole telegram, assuming last byte holds the CRC // data is the whole telegram, assuming last byte holds the CRC
// length includes the CRC // length includes the CRC
@@ -225,14 +208,11 @@ void RxService::add(uint8_t * data, uint8_t length) {
src = EMSESP::check_master_device(src, type_id, true); src = EMSESP::check_master_device(src, type_id, true);
// create the telegram // create the telegram
auto telegram = std::make_shared<Telegram>(Telegram::Operation::RX, src, dest, type_id, offset, message_data, message_length); rx_telegram = std::make_shared<Telegram>(Telegram::Operation::RX, src, dest, type_id, offset, message_data, message_length);
// check if queue is full, if so remove top item to make space // process it immediately
if (rx_telegrams_.size() >= MAX_RX_TELEGRAMS) { EMSESP::process_telegram(rx_telegram); // further process the telegram
rx_telegrams_.pop_front(); increment_telegram_count(); // increase count
}
rx_telegrams_.emplace_back(rx_telegram_id_++, std::move(telegram)); // add to queue
} }
// //
@@ -280,7 +260,6 @@ void TxService::send() {
} }
// if there's nothing in the queue to transmit, send back a poll and quit // if there's nothing in the queue to transmit, send back a poll and quit
// unless tx_mode is 0
if (tx_telegrams_.empty()) { if (tx_telegrams_.empty()) {
send_poll(); send_poll();
return; return;

View File

@@ -195,64 +195,39 @@ class EMSbus {
class RxService : public EMSbus { class RxService : public EMSbus {
public: public:
static constexpr size_t MAX_RX_TELEGRAMS = 10;
RxService() = default; RxService() = default;
~RxService() = default; ~RxService() = default;
void loop();
void add(uint8_t * data, uint8_t length); void add(uint8_t * data, uint8_t length);
void flush_rx_queue();
uint16_t telegram_count() const { uint16_t telegram_count() const {
return telegram_count_; return telegram_count_;
} }
uint16_t telegram_error_count() const {
return telegram_error_count_;
}
void increment_telegram_count() { void increment_telegram_count() {
telegram_count_++; telegram_count_++;
} }
uint16_t telegram_error_count() const {
return telegram_error_count_;
}
void increment_telegram_error_count() { void increment_telegram_error_count() {
telegram_error_count_++; telegram_error_count_++;
} }
class QueuedRxTelegram {
public:
const uint16_t id_;
const std::shared_ptr<const Telegram> telegram_;
~QueuedRxTelegram() = default;
QueuedRxTelegram(uint16_t id, std::shared_ptr<Telegram> && telegram)
: id_(id)
, telegram_(std::move(telegram)) {
}
};
const std::list<QueuedRxTelegram> queue() const {
return rx_telegrams_;
}
private: private:
uint32_t last_rx_check_ = 0; uint16_t telegram_count_ = 0; // # Rx received
uint8_t rx_telegram_id_ = 0; // queue counter uint16_t telegram_error_count_ = 0; // # Rx CRC errors
uint16_t telegram_count_ = 0; // # Rx received std::shared_ptr<const Telegram> rx_telegram; // the incoming Rx telegram
uint16_t telegram_error_count_ = 0; // # Rx CRC errors
std::list<QueuedRxTelegram> rx_telegrams_;
}; };
class TxService : public EMSbus { class TxService : public EMSbus {
public: public:
static constexpr size_t MAX_TX_TELEGRAMS = 20; // size of Tx queue static constexpr size_t MAX_TX_TELEGRAMS = 20; // size of Tx queue
static constexpr uint8_t TX_WRITE_FAIL = 4; static constexpr uint8_t TX_WRITE_FAIL = 4; // EMS return code for fail
static constexpr uint8_t TX_WRITE_SUCCESS = 1; static constexpr uint8_t TX_WRITE_SUCCESS = 1; // EMS return code for success
TxService() = default; TxService() = default;
~TxService() = default; ~TxService() = default;
@@ -358,10 +333,7 @@ class TxService : public EMSbus {
#endif #endif
private: private:
uint8_t tx_telegram_id_ = 0; // queue counter std::list<QueuedTxTelegram> tx_telegrams_; // the Tx queue
uint32_t last_tx_check_ = 0;
std::list<QueuedTxTelegram> tx_telegrams_;
uint16_t telegram_read_count_ = 0; // # Tx successful reads uint16_t telegram_read_count_ = 0; // # Tx successful reads
uint16_t telegram_write_count_ = 0; // # Tx successful writes uint16_t telegram_write_count_ = 0; // # Tx successful writes
@@ -369,7 +341,9 @@ class TxService : public EMSbus {
std::shared_ptr<Telegram> telegram_last_; std::shared_ptr<Telegram> telegram_last_;
uint16_t telegram_last_post_send_query_; // which type ID to query after a successful send, to read back the values just written uint16_t telegram_last_post_send_query_; // which type ID to query after a successful send, to read back the values just written
uint8_t retry_count_ = 0; // count for # Tx retries uint8_t retry_count_ = 0; // count for # Tx retries
uint8_t tx_telegram_id_ = 0; // queue counter
void send_telegram(const QueuedTxTelegram & tx_telegram); void send_telegram(const QueuedTxTelegram & tx_telegram);
void send_telegram(const uint8_t * data, const uint8_t length); void send_telegram(const uint8_t * data, const uint8_t length);

View File

@@ -1 +1 @@
#define EMSESP_APP_VERSION "2.0.0b12-4" #define EMSESP_APP_VERSION "2.0.0b12-5"