fixed type_id for EMS+, cleaned up code, added watch, added option for Tx at top of queue

This commit is contained in:
proddy
2020-06-13 15:49:38 +02:00
parent 803505b1da
commit 10b6682842
3 changed files with 97 additions and 93 deletions

View File

@@ -29,12 +29,7 @@
#include "telegram.h" #include "telegram.h"
#if defined(ESP8266) #if defined(ESP8266)
#include "uart/emsuart_esp8266.h"
#include <RTCVars.h> #include <RTCVars.h>
#elif defined(ESP32)
#include "uart/emsuart_esp32.h"
#elif defined(EMSESP_STANDALONE)
#include <emsuart_standalone.h>
#endif #endif
#ifndef EMSESP_STANDALONE #ifndef EMSESP_STANDALONE

View File

@@ -68,8 +68,8 @@ Telegram::Telegram(uint8_t operation, uint8_t src, uint8_t dest, uint16_t type_i
, type_id(type_id) , type_id(type_id)
, offset(offset) , offset(offset)
, message_length(message_length) { , message_length(message_length) {
// copy complete telegram data over // copy complete telegram data over, preventing buffer overflow
for (uint8_t i = 0; i < message_length; i++) { for (uint8_t i = 0; ((i < message_length) && (i != EMS_MAX_TELEGRAM_MESSAGE_LENGTH - 1)); i++) {
message_data[i] = data[i]; message_data[i] = data[i];
} }
} }
@@ -211,36 +211,26 @@ void RxService::flush_rx_queue() {
rx_telegram_id_ = 0; rx_telegram_id_ = 0;
} }
// start and initialize the Rx incoming buffer
void RxService::start() {
// LOG_DEBUG(F("RxStart"));
// function not currently used
}
// Rx loop, run as many times as you can // Rx loop, run as many times as you can
// processes all telegrams on the queue. Assumes there are valid (i.e. CRC checked) // processes all telegrams on the queue. Assumes there are valid (i.e. CRC checked)
void RxService::loop() { void RxService::loop() {
/*
#ifndef EMSESP_STANDALONE #ifndef EMSESP_STANDALONE
// give rx some breathing space give rx some breathing space if ((uuid::get_uptime() - last_rx_check_) < RX_LOOP_WAIT) {
//if ((uuid::get_uptime() - last_rx_check_) < RX_LOOP_WAIT) { return;
// return; }
//} last_rx_check_ = uuid::get_uptime();
//last_rx_check_ = uuid::get_uptime();
#endif #endif
*/
while (!rx_telegrams_.empty()) { while (!rx_telegrams_.empty()) {
auto telegram = rx_telegrams_.front().telegram_; auto telegram = rx_telegrams_.front().telegram_;
// rx_telegrams_overflow_ = false;
(void)EMSESP::process_telegram(telegram); // further process the telegram (void)EMSESP::process_telegram(telegram); // further process the telegram
increment_telegram_count(); // increase count increment_telegram_count(); // increase count
// remove it from the queue rx_telegrams_.pop_front(); // remove it from the queue
// if (!rx_telegrams_overflow_) {
rx_telegrams_.pop_front();
// }
} }
} }
@@ -249,10 +239,15 @@ void RxService::loop() {
// length includes the CRC // length includes the CRC
// for EMS+ the type_id has the value + 256. We look for these type of telegrams with F7, F9 and FF in 3rd byte // for EMS+ the type_id has the value + 256. We look for these type of telegrams with F7, F9 and FF in 3rd byte
void RxService::add(uint8_t * data, uint8_t length) { void RxService::add(uint8_t * data, uint8_t length) {
if (length < 2) {
return;
}
// validate the CRC // validate the CRC
uint8_t crc = calculate_crc(data, length - 1); uint8_t crc = calculate_crc(data, length - 1);
if (data[length - 1] != crc) {
LOG_TRACE(F("Rx: %s %s(BAD, CRC %02X != %02X)%s"), Helpers::data_to_hex(data, length).c_str(), COLOR_RED, data[length - 1], crc, COLOR_RESET); if ((data[length - 1] != crc) && (EMSESP::watch() != 0)) {
LOG_ERROR(F("Rx: %s %s(BAD, CRC %02X != %02X)%s"), Helpers::data_to_hex(data, length).c_str(), COLOR_RED, data[length - 1], crc, COLOR_RESET);
increment_telegram_error_count(); increment_telegram_error_count();
return; return;
} }
@@ -270,42 +265,36 @@ void RxService::add(uint8_t * data, uint8_t length) {
uint8_t dest = data[1] & 0x7F; // strip MSB, don't care if its read or write for processing uint8_t dest = data[1] & 0x7F; // strip MSB, don't care if its read or write for processing
uint8_t offset = data[3]; // offset is always 4th byte uint8_t offset = data[3]; // offset is always 4th byte
uint16_t type_id = 0; // this could be 2 bytes for ems+ // set default values, which will be adjusted depending on the EMS1.0/2.0 logic below
uint8_t * message_data; uint16_t type_id = 0;
uint8_t message_length; uint8_t * message_data = data;
uint8_t message_length = length;
// work out depending on the type where the data message block starts // work out depending on the type where the data message block starts
if (data[2] < 0xF0 || length < 6) { if (data[2] < 0xF0 || length < 6) {
// EMS 1.0 // EMS 1.0
type_id = data[2]; type_id = data[2];
message_data = data + 4; // message block starts at 5th byte message_data += 4; // message block starts at 5th byte
message_length = length - 5; // remove 4 bytes header plus CRC message_length -= 5; // remove 4 bytes header plus CRC
} else { } else {
// EMS 2.0 / EMS+ // EMS 2.0 / EMS+
if (data[2] == 0xFF) { if (data[2] == 0xFF) {
// check for empty data // check for empty data
// special broadcast telegrams on ems+ have no data values, some even don't have a type ID, e.g. "21 0B FF 00" // special broadcast telegrams on ems+ have no data values, some even don't have a type ID, e.g. "21 0B FF 00"
if (length <= 7) { if (length > 8) {
message_data = data; // bogus pointer, will not be used message_length -= 7; // remove 6 byte header plus CRC
message_length = 0; message_data += 6; // message block starts at 7th position
if (length <= 5) {
type_id = 0; // has also an empty type_id
} else {
type_id = (data[4] << 8) + data[5] + 256;
} }
} else { if (length > 5) {
message_length = length - 7; // remove 6 byte header plus CRC type_id = (data[4] << 8) + data[5] + 256; // set type_id if there is one
message_data = data + 6; // message block starts at 7th position
} }
} else { } else {
// its F9 or F7 // its F9 or F7
uint8_t shift = (data[4] != 0xFF); // true (1) if 5th byte is not 0xFF, then telegram is 1 byte longer uint8_t shift = (data[4] != 0xFF); // true (1) if 5th byte is not 0xFF, then telegram is 1 byte longer
type_id = (data[5 + shift] << 8) + data[6 + shift] + 256; type_id = (data[5 + shift] << 8) + data[6 + shift] + 256;
message_data = data + 6 + shift; // there is a special byte after the typeID which we ignore for now message_data += 6 + shift; // there is a special byte after the typeID which we ignore for now
if (length <= (9 + shift)) { if (length > (9 + shift)) {
message_length = 0; // special broadcast on ems+ have no data values message_length -= (9 + shift);
} else {
message_length = length - (9 + shift);
} }
} }
} }
@@ -315,26 +304,27 @@ void RxService::add(uint8_t * data, uint8_t length) {
return; return;
} }
// if we're in "trace" and "raw" print out actual telegram as bytes to the console
if (EMSESP::watch() == 2) {
uint16_t trace_watch_id = EMSESP::watch_id();
if ((trace_watch_id == WATCH_NONE) || (src == trace_watch_id) || (dest == trace_watch_id) || (type_id == trace_watch_id)) {
LOG_INFO(F("Rx: %s"), Helpers::data_to_hex(data, length).c_str());
}
}
#ifdef EMSESP_DEBUG
LOG_DEBUG(F("[DEBUG] New Rx [#%d] telegram, message length %d"), rx_telegram_id_, message_length);
#endif
// create the telegram // create the telegram
auto telegram = std::make_shared<Telegram>(Telegram::Operation::RX, src, dest, type_id, offset, message_data, message_length); auto telegram = std::make_shared<Telegram>(Telegram::Operation::RX, src, dest, type_id, offset, message_data, message_length);
// check if queue is full, if so remove top item to make space // check if queue is full, if so remove top item to make space
if (rx_telegrams_.size() >= MAX_RX_TELEGRAMS) { if (rx_telegrams_.size() >= MAX_RX_TELEGRAMS) {
// rx_telegrams_overflow_ = true;
rx_telegrams_.pop_front(); rx_telegrams_.pop_front();
} }
// add to queue rx_telegrams_.emplace_back(rx_telegram_id_++, std::move(telegram)); // add to queue
LOG_DEBUG(F("New Rx [#%d] telegram, length %d"), rx_telegram_id_, message_length);
rx_telegrams_.emplace_back(rx_telegram_id_++, std::move(telegram));
// if we're in "trace" and "raw" print out actual telegram as bytes to the console
if (logger_.enabled(Level::TRACE) && EMSESP::trace_raw()) {
uint16_t trace_watch_id = EMSESP::trace_watch_id();
if ((trace_watch_id == LOG_TRACE_WATCH_NONE) || (src == trace_watch_id) || (dest == trace_watch_id) || (type_id == trace_watch_id)) {
LOG_TRACE(F("Rx: %s"), Helpers::data_to_hex(data, length).c_str());
}
}
} }
@@ -379,7 +369,7 @@ void TxService::loop() {
// sends a 1 byte poll which is our own device ID // sends a 1 byte poll which is our own device ID
void TxService::send_poll() { void TxService::send_poll() {
//LOG_TRACE(F("Ack %02X"),ems_bus_id() ^ ems_mask()); //LOG_DEBUG(F("Ack %02X"),ems_bus_id() ^ ems_mask());
EMSuart::send_poll(ems_bus_id() ^ ems_mask()); EMSuart::send_poll(ems_bus_id() ^ ems_mask());
} }
@@ -476,23 +466,19 @@ void TxService::send_telegram(const QueuedTxTelegram & tx_telegram) {
tx_telegram.id_, tx_telegram.id_,
telegram->to_string(telegram_raw, length).c_str()); telegram->to_string(telegram_raw, length).c_str());
// if we're watching an ID, then always show the full telegram
if ((logger_.enabled(Level::TRACE))
&& ((telegram->src == EMSESP::trace_watch_id()) || (telegram->dest == EMSESP::trace_watch_id()) || (telegram->type_id == EMSESP::trace_watch_id()))) {
logger_.trace(F("Sending %s Tx [#%d], telegram: %s"),
(telegram->operation == Telegram::Operation::TX_WRITE) ? F("write") : F("read"),
tx_telegram.id_,
telegram->to_string(telegram_raw, length).c_str());
}
// send the telegram to the UART Tx // send the telegram to the UART Tx
EMSUART_STATUS status = EMSuart::transmit(telegram_raw, length); uint16_t status = EMSuart::transmit(telegram_raw, length);
#ifdef EMSESP_DEBUG #ifdef EMSESP_DEBUG
LOG_TRACE(F("Tx: %s"), Helpers::data_to_hex(telegram_raw, length).c_str()); // if watching in 'raw' mode
if (EMSESP::watch() == 2) {
LOG_INFO(F("[DEBUG] Tx: %s"), Helpers::data_to_hex(telegram_raw, length).c_str());
}
#endif #endif
if (status != EMS_TX_STATUS_OK) { if (status == EMS_TX_STATUS_ERR) {
LOG_ERROR(F("Failed to transmit Tx via UART. Error: %s"), status == EMS_TX_WTD_TIMEOUT ? F("Timeout") : F("BRK")); LOG_ERROR(F("Failed to transmit Tx via UART."));
increment_telegram_fail_count(); // another Tx fail
tx_waiting(false); // nothing send, tx not in wait state tx_waiting(false); // nothing send, tx not in wait state
return; return;
} }
@@ -513,11 +499,15 @@ void TxService::send_telegram(const uint8_t * data, const uint8_t length) {
LOG_DEBUG(F("Sending Raw telegram: %s (length=%d)"), Helpers::data_to_hex(telegram_raw, length).c_str(), length); LOG_DEBUG(F("Sending Raw telegram: %s (length=%d)"), Helpers::data_to_hex(telegram_raw, length).c_str(), length);
tx_waiting(false); // no post validation
// send the telegram to the UART Tx // send the telegram to the UART Tx
EMSUART_STATUS status = EMSuart::transmit(telegram_raw, length); uint16_t status = EMSuart::transmit(telegram_raw, length);
//LOG_TRACE(F("Tx: %s"), Helpers::data_to_hex(telegram_raw, length).c_str()); //LOG_DEBUG(F("Tx: %s"), Helpers::data_to_hex(telegram_raw, length).c_str());
if (status != EMS_TX_STATUS_OK) {
LOG_ERROR(F("Failed to transmit Tx via UART. Error: %s"), status == EMS_TX_WTD_TIMEOUT ? F("Timeout") : F("BRK")); if (status == EMS_TX_STATUS_ERR) {
LOG_ERROR(F("Failed to transmit Tx via UART."));
increment_telegram_fail_count(); // another Tx fail
} }
} }
@@ -525,7 +515,9 @@ void TxService::send_telegram(const uint8_t * data, const uint8_t length) {
// given some details like the destination, type, offset and message block // given some details like the destination, type, offset and message block
void TxService::add(const uint8_t operation, const uint8_t dest, const uint16_t type_id, const uint8_t offset, uint8_t * message_data, const uint8_t message_length) { void TxService::add(const uint8_t operation, const uint8_t dest, const uint16_t type_id, const uint8_t offset, uint8_t * message_data, const uint8_t message_length) {
auto telegram = std::make_shared<Telegram>(operation, ems_bus_id(), dest, type_id, offset, message_data, message_length); auto telegram = std::make_shared<Telegram>(operation, ems_bus_id(), dest, type_id, offset, message_data, message_length);
LOG_DEBUG(F("New Tx [#%d] telegram, length %d"), tx_telegram_id_, message_length); #ifdef EMSESP_DEBUG
LOG_DEBUG(F("[DEBUG] New Tx [#%d] telegram, length %d"), tx_telegram_id_, message_length);
#endif
// if the queue is full, make room but removing the last one // if the queue is full, make room but removing the last one
if (tx_telegrams_.size() >= MAX_TX_TELEGRAMS) { if (tx_telegrams_.size() >= MAX_TX_TELEGRAMS) {
@@ -536,14 +528,17 @@ void TxService::add(const uint8_t operation, const uint8_t dest, const uint16_t
} }
// builds a Tx telegram and adds to queue, using only raw data // builds a Tx telegram and adds to queue, using only raw data
// format is EMS 1.0 (src, dest, type_id, offset, data)
// length is the length of the whole telegram data // length is the length of the whole telegram data
void TxService::add(uint8_t * data, const uint8_t length) { // front = true if adding to the front of queue, e.g. with an Tx retry. Default is false.
uint8_t message_length = length - 4; void TxService::add(uint8_t * data, const uint8_t length, bool front) {
if (!message_length) { if (length < 5) {
LOG_ERROR(F("Bad Tx telegram, too short (message length is %d)"), message_length); LOG_ERROR(F("Tx telegram too short (telegram length is %d)"), length);
return; return;
} }
uint8_t message_length = length - 4;
// build header // build header
uint8_t src = data[0]; uint8_t src = data[0];
uint8_t dest = data[1]; uint8_t dest = data[1];
@@ -558,8 +553,16 @@ void TxService::add(uint8_t * data, const uint8_t length) {
tx_telegrams_.pop_front(); tx_telegrams_.pop_front();
} }
LOG_DEBUG(F("New Tx [#%d] telegram, length %d"), tx_telegram_id_, message_length); #ifdef EMSESP_DEBUG
LOG_DEBUG(F("[DEBUG] New Tx [#%d] telegram, length %d"), tx_telegram_id_, message_length);
#endif
// add to either front or back of queue
if (front) {
tx_telegrams_.emplace_front(tx_telegram_id_++, std::move(telegram));
} else {
tx_telegrams_.emplace_back(tx_telegram_id_++, std::move(telegram)); tx_telegrams_.emplace_back(tx_telegram_id_++, std::move(telegram));
}
} }
// send a Tx telegram to request data from an EMS device // send a Tx telegram to request data from an EMS device
@@ -622,7 +625,7 @@ uint8_t TxService::retry_tx() {
return 0; return 0;
} }
add(telegram_last_, telegram_last_length_); // add the last Tx telegram to the tx queue, at the top add(telegram_last_, telegram_last_length_, true); // add the last Tx telegram to the front of the tx queue, at the top
return retry_count_; return retry_count_;
} }

View File

@@ -23,7 +23,15 @@
#include <deque> #include <deque>
#include <memory> // for unique ptrs #include <memory> // for unique ptrs
#include <vector> #include <vector>
// #include <atomic> // for overflow
// UART drivers
#if defined(ESP8266)
#include "uart/emsuart_esp8266.h"
#elif defined(ESP32)
#include "uart/emsuart_esp32.h"
#elif defined(EMSESP_STANDALONE)
#include <emsuart_standalone.h>
#endif
#include <uuid/log.h> #include <uuid/log.h>
@@ -43,8 +51,8 @@ static constexpr int16_t EMS_VALUE_SHORT_INVALID = 0x8000;
static constexpr uint32_t EMS_VALUE_ULONG_NOTSET = 0xFFFFFFFF; // for 3-byte and 4-byte longs static constexpr uint32_t EMS_VALUE_ULONG_NOTSET = 0xFFFFFFFF; // for 3-byte and 4-byte longs
static constexpr uint32_t EMS_VALUE_ULONG_INVALID = 0x80000000; static constexpr uint32_t EMS_VALUE_ULONG_INVALID = 0x80000000;
static constexpr uint8_t EMS_MAX_TELEGRAM_LENGTH = 32; // max length of a telegram static constexpr uint8_t EMS_MAX_TELEGRAM_LENGTH = 32; // max length of a complete EMS telegram
static constexpr uint8_t EMS_MAX_TELEGRAM_MESSAGE_LENGTH = EMS_MAX_TELEGRAM_LENGTH - 5; // max length of message block static constexpr uint8_t EMS_MAX_TELEGRAM_MESSAGE_LENGTH = 27; // max length of message block, assuming EMS1.0
namespace emsesp { namespace emsesp {
@@ -170,7 +178,6 @@ class RxService : public EMSbus {
RxService() = default; RxService() = default;
~RxService() = default; ~RxService() = default;
void start();
void loop(); void loop();
void add(uint8_t * data, uint8_t length); void add(uint8_t * data, uint8_t length);
@@ -210,7 +217,6 @@ class RxService : public EMSbus {
static constexpr uint32_t RX_LOOP_WAIT = 800; // delay in processing Rx queue static constexpr uint32_t RX_LOOP_WAIT = 800; // delay in processing Rx queue
uint32_t last_rx_check_ = 0; uint32_t last_rx_check_ = 0;
// std::atomic<bool> rx_telegrams_overflow_{false};
uint8_t rx_telegram_id_ = 0; // queue counter uint8_t rx_telegram_id_ = 0; // queue counter
uint16_t telegram_count_ = 0; // # Rx received uint16_t telegram_count_ = 0; // # Rx received
@@ -234,7 +240,7 @@ class TxService : public EMSbus {
void send(); void send();
void add(const uint8_t operation, const uint8_t dest, const uint16_t type_id, const uint8_t offset, uint8_t * message_data, const uint8_t message_length); void add(const uint8_t operation, const uint8_t dest, const uint16_t type_id, const uint8_t offset, uint8_t * message_data, const uint8_t message_length);
void add(uint8_t * data, const uint8_t length); void add(uint8_t * data, const uint8_t length, bool front = false);
void read_request(const uint16_t type_id, const uint8_t dest, const uint8_t offset = 0); void read_request(const uint16_t type_id, const uint8_t dest, const uint8_t offset = 0);