refactor tx retry

This commit is contained in:
proddy
2020-06-19 20:26:58 +02:00
parent 020f7d205c
commit cf8404be86
2 changed files with 86 additions and 75 deletions

View File

@@ -320,9 +320,10 @@ void RxService::add(uint8_t * data, uint8_t length) {
// Tx CODE starts here...
//
TxService::QueuedTxTelegram::QueuedTxTelegram(uint16_t id, std::shared_ptr<Telegram> && telegram)
TxService::QueuedTxTelegram::QueuedTxTelegram(uint16_t id, std::shared_ptr<Telegram> && telegram, bool retry)
: id_(id)
, telegram_(std::move(telegram)) {
, telegram_(std::move(telegram))
, retry_(retry) {
}
// empty queue, don't process
@@ -385,7 +386,7 @@ void TxService::send() {
// process a Tx telegram
void TxService::send_telegram(const QueuedTxTelegram & tx_telegram) {
uint8_t telegram_raw[EMS_MAX_TELEGRAM_LENGTH];
static uint8_t telegram_raw[EMS_MAX_TELEGRAM_LENGTH];
// build the header
auto telegram = tx_telegram.telegram_;
@@ -443,15 +444,15 @@ void TxService::send_telegram(const QueuedTxTelegram & tx_telegram) {
uint8_t length = message_p;
remember_tx(telegram->operation, telegram_raw, length); // make a copy of it in case we want to re-send it, without the CRC
telegram_last_ = std::make_shared<Telegram>(*telegram); // make a copy of the telegram
telegram_raw[length] = calculate_crc(telegram_raw, length); // generate and append CRC to the end
length++; // add one since we want to now include the CRC
// logging interferes with the UART so disable this
#if defined(ESP8266)
Settings settings;
if (settings.ems_tx_mode() <= 4) {
if (Settings().ems_tx_mode() <= 4) {
#endif
// This logging causes errors with timer based tx-modes on esp8266!
LOG_DEBUG(F("Sending %s Tx [#%d], telegram: %s"),
@@ -468,6 +469,7 @@ void TxService::send_telegram(const QueuedTxTelegram & tx_telegram) {
#if defined(ESP8266)
}
#endif
// send the telegram to the UART Tx
uint16_t status = EMSuart::transmit(telegram_raw, length);
@@ -516,28 +518,46 @@ void TxService::add(const uint8_t operation, const uint8_t dest, const uint16_t
tx_telegrams_.pop_front();
}
tx_telegrams_.emplace_back(tx_telegram_id_++, std::move(telegram));
tx_telegrams_.emplace_back(tx_telegram_id_++, std::move(telegram), false); // first tx, no retry
}
// builds a Tx telegram and adds to queue
// this is used by the retry() function to put the last failed Tx back into the queue
// format is EMS 1.0 (src, dest, type_id, offset, data)
// length is the length of the whole telegram data
// front = true if adding to the front of queue, e.g. with an Tx retry. Default is false.
void TxService::add(uint8_t operation, uint8_t * data, const uint8_t length, bool front) {
if (length < 5) {
LOG_ERROR(F("Tx telegram too short (telegram length is %d)"), length);
return;
void TxService::add(uint8_t operation, uint8_t * data, const uint8_t length) {
// build header. src, dest and offset have fixed positions
uint8_t src = data[0];
uint8_t dest = data[1];
uint8_t offset = data[3];
uint16_t type_id;
uint8_t * message_data; // where the message block starts
uint8_t message_length; // length of the message block, excluding CRC
// work out depending on the type, where the data message block starts and the message length
// same logic as in RxService::add()
if (data[2] < 0xF0) {
// EMS 1.0
type_id = data[2];
message_data = data + 4;
message_length = length - 5;
} else {
// EMS 2.0 / EMS+
uint8_t shift = 0; // default when data[2] is 0xFF
if (data[2] != 0xFF) {
// its F9 or F7, re-calculate shift. If the 5th byte is not 0xFF then telegram is 1 byte longer
shift = (data[4] != 0xFF) ? 2 : 1;
}
type_id = (data[4 + shift] << 8) + data[5 + shift] + 256;
message_data = data + 6 + shift;
message_length = length - 6 - shift;
}
uint8_t message_length = length - 4;
// build header
uint8_t src = data[0];
uint8_t dest = data[1];
uint8_t type_id = data[2];
uint8_t offset = data[3];
uint8_t * message_data = data + 4;
// if we don't have a type_id or empty data block, exit
if ((type_id == 0) || (message_length == 0)) {
return;
}
auto telegram = std::make_shared<Telegram>(operation, src, dest, type_id, offset, message_data, message_length); // operation is TX_WRITE or TX_READ
@@ -550,12 +570,7 @@ void TxService::add(uint8_t operation, uint8_t * data, const uint8_t length, boo
LOG_DEBUG(F("[DEBUG] New Tx [#%d] telegram, length %d"), tx_telegram_id_, message_length);
#endif
// add to either front or back of queue
if (front) {
tx_telegrams_.emplace_front(tx_telegram_id_++, std::move(telegram));
} else {
tx_telegrams_.emplace_back(tx_telegram_id_++, std::move(telegram));
}
tx_telegrams_.emplace_back(tx_telegram_id_++, std::move(telegram), false); // add to back of queue
}
// send a Tx telegram to request data from an EMS device
@@ -603,56 +618,60 @@ void TxService::send_raw(const char * telegram_data) {
add(Telegram::Operation::TX_RAW, data, count + 1); // add to Tx queue
}
// save the last Tx sent, only the telegram, excluding the CRC
void TxService::remember_tx(const uint8_t operation, const uint8_t * data, const uint8_t length) {
for (uint8_t i = 0; i < length; i++) {
telegram_last_[i] = data[i];
}
telegram_last_length_ = length;
telegram_last_op_ = operation;
if (ems_mask() != EMS_MASK_UNSET) {
telegram_last_[0] ^= ems_mask();
}
}
// add last Tx to tx queue and increment count
// returns retry count, or 0 if all done
uint8_t TxService::retry_tx() {
if (++retry_count_ == MAXIMUM_TX_RETRIES) {
void TxService::retry_tx(const uint8_t operation, const uint8_t * data, const uint8_t length) {
retry_count_++;
// have we reached the limit? if so, reset count and give up
if (retry_count_ > MAXIMUM_TX_RETRIES) {
reset_retry_count(); // give up
increment_telegram_fail_count(); // another Tx fail
return 0;
LOG_ERROR(F("Last Tx %s operation failed after %d retries. Ignoring request."),
(operation == Telegram::Operation::TX_WRITE) ? F("Write") : F("Read"),
MAXIMUM_TX_RETRIES);
} else {
LOG_DEBUG(F("[DEBUG] Last Tx %s operation failed. Retry #%d. sent message data: %s, received: %s"),
(operation == Telegram::Operation::TX_WRITE) ? F("Write") : F("Read"),
retry_count_,
telegram_last_->to_string().c_str(),
Helpers::data_to_hex(data, length).c_str());
return;
}
add(telegram_last_op_, telegram_last_, telegram_last_length_, true); // add the last Tx telegram to the front of the tx queue, at the top
// add to the top of the queue
if (tx_telegrams_.size() >= MAX_TX_TELEGRAMS) {
tx_telegrams_.pop_back();
}
return retry_count_;
tx_telegrams_.emplace_front(tx_telegram_id_++, std::move(telegram_last_), true);
}
// checks if a telegram is sent to us matches the last Tx request
// incoming Rx src must match the last Tx dest
// and incoming Rx dest must be us (our ems_bus_id)
// for both src and dest we strip the MSB 8th bit
// returns true if the src/dest match the last Tx sent
bool TxService::is_last_tx(const uint8_t src, const uint8_t dest) const {
// LOG_DEBUG(F("Comparing %02X=%02X , %02X,%02X"), (telegram_last_[1] & 0x7F), (src & 0x7F), (dest & 0x7F), ems_bus_id());
return (((telegram_last_[1] & 0x7F) == (src & 0x7F)) && ((dest & 0x7F) == ems_bus_id()));
return (((telegram_last_->dest & 0x7F) == (src & 0x7F)) && ((dest & 0x7F) == ems_bus_id()));
}
// sends a type_id read request to fetch values after a successful Tx write operation
void TxService::post_send_query() {
if (telegram_last_post_send_query_) {
uint8_t dest = (telegram_last_[1] & 0x7F);
LOG_DEBUG(F("Sending post validate read, type ID 0x%02X to dest 0x%02X"), telegram_last_post_send_query_, dest);
uint8_t dest = (telegram_last_->dest & 0x7F);
read_request(telegram_last_post_send_query_, dest, 0); // no offset
LOG_DEBUG(F("Sending post validate read, type ID 0x%02X to dest 0x%02X"), telegram_last_post_send_query_, dest);
}
}
// returns details of the last Tx message that was sent (for debugging)
std::string TxService::last_tx_to_string() const {
return Helpers::data_to_hex(telegram_last_, telegram_last_length_);
// print out the last Tx that was sent
void TxService::print_last_tx() {
LOG_DEBUG(F("Last Tx %s operation: %s"),
(telegram_last_->operation == Telegram::Operation::TX_WRITE) ? F("Write") : F("Read"),
telegram_last_->to_string().c_str());
}
} // namespace emsesp

View File

@@ -69,11 +69,7 @@ class Telegram {
const uint16_t type_id;
const uint8_t offset;
const uint8_t message_length;
uint8_t message_data[EMS_MAX_TELEGRAM_MESSAGE_LENGTH];
std::string to_string() const;
std::string to_string(const uint8_t * telegram, uint8_t length) const;
uint8_t message_data[EMS_MAX_TELEGRAM_MESSAGE_LENGTH];
enum Operation : uint8_t {
NONE = 0,
@@ -83,6 +79,9 @@ class Telegram {
TX_WRITE,
};
std::string to_string() const;
std::string to_string(const uint8_t * telegram, uint8_t length) const;
void read_value(uint16_t & param, const uint8_t index) const;
void read_value(uint32_t & param, const uint8_t index) const;
void read_value32(uint32_t & param, const uint8_t index) const;
@@ -145,8 +144,6 @@ class EMSbus {
bus_connected_ = true;
}
static bool tx_active() {
return tx_active_;
}
@@ -173,7 +170,7 @@ class EMSbus {
static uint32_t last_bus_activity_; // timestamp of last time a valid Rx came in
static bool bus_connected_; // start assuming the bus hasn't been connected
static uint8_t ems_mask_; // unset 0x00 buderus 0x80 junkers/ht3
static uint8_t ems_mask_; // unset=0xFF, buderus=0x00, junkers/ht3=0x80
static uint8_t ems_bus_id_; // the bus id, which configurable and stored in settings
static uint8_t tx_waiting_; // state of the Tx line (NONE or waiting on a TX_READ or TX_WRITE)
static bool tx_active_; // is true is we have a working Tx connection
@@ -248,7 +245,7 @@ class TxService : public EMSbus {
void send();
void add(const uint8_t operation, const uint8_t dest, const uint16_t type_id, const uint8_t offset, uint8_t * message_data, const uint8_t message_length);
void add(const uint8_t operation, uint8_t * data, const uint8_t length, bool front = false);
void add(const uint8_t operation, uint8_t * data, const uint8_t length);
void read_request(const uint16_t type_id, const uint8_t dest, const uint8_t offset = 0);
@@ -258,9 +255,7 @@ class TxService : public EMSbus {
void flush_tx_queue();
void remember_tx(const uint8_t operation, const uint8_t * data, const uint8_t length);
uint8_t retry_tx();
void retry_tx(const uint8_t operation, const uint8_t * data, const uint8_t length);
uint8_t retry_count() const {
return retry_count_;
@@ -302,21 +297,22 @@ class TxService : public EMSbus {
void post_send_query();
void print_last_tx();
class QueuedTxTelegram {
public:
QueuedTxTelegram(uint16_t id, std::shared_ptr<Telegram> && telegram);
QueuedTxTelegram(uint16_t id, std::shared_ptr<Telegram> && telegram, bool retry);
~QueuedTxTelegram() = default;
uint16_t id_; // sequential identifier
const std::shared_ptr<const Telegram> telegram_;
bool retry_; // is a retry
};
const std::deque<QueuedTxTelegram> queue() const {
return tx_telegrams_;
}
std::string last_tx_to_string() const;
static constexpr uint8_t MAXIMUM_TX_RETRIES = 3;
private:
@@ -331,13 +327,9 @@ class TxService : public EMSbus {
uint16_t telegram_write_count_ = 0; // # Tx successful writes
uint16_t telegram_fail_count_ = 0; // # Tx unsuccessful transmits
const std::shared_ptr<const Telegram> telegram_last;
uint8_t telegram_last_[EMS_MAX_TELEGRAM_LENGTH]; // copy of last telegram
uint8_t telegram_last_length_; // and its length
uint16_t telegram_last_post_send_query_; // which type ID to query after a successful send, to read back the values just written
uint8_t telegram_last_op_; // TX_WRITE or TX_READ
uint8_t retry_count_ = 0; // count for # Tx retries
std::shared_ptr<Telegram> telegram_last_;
uint16_t telegram_last_post_send_query_; // which type ID to query after a successful send, to read back the values just written
uint8_t retry_count_ = 0; // count for # Tx retries
void send_telegram(const QueuedTxTelegram & tx_telegram);
void send_telegram(const uint8_t * data, const uint8_t length);