This commit is contained in:
proddy
2020-10-07 19:16:55 +02:00
parent 8bf55e74c5
commit 07992c391b
3 changed files with 218 additions and 228 deletions

View File

@@ -23,7 +23,6 @@ class MsgPackDeserializer {
_stringStorage(stringStorage), _stringStorage(stringStorage),
_error(DeserializationError::Ok) {} _error(DeserializationError::Ok) {}
// TODO: add support for filter
DeserializationError parse(VariantData &variant, AllowAllFilter, DeserializationError parse(VariantData &variant, AllowAllFilter,
NestingLimit nestingLimit) { NestingLimit nestingLimit) {
parseVariant(variant, nestingLimit); parseVariant(variant, nestingLimit);

View File

@@ -29,49 +29,49 @@
#include "ESPAsyncTCPbuffer.h" #include "ESPAsyncTCPbuffer.h"
AsyncTCPbuffer::AsyncTCPbuffer(AsyncClient* client) { AsyncTCPbuffer::AsyncTCPbuffer(AsyncClient * client) {
if(client == NULL) { if (client == NULL) {
DEBUG_ASYNC_TCP("[A-TCP] client is null!!!\n"); DEBUG_ASYNC_TCP("[A-TCP] client is null!!!\n");
panic(); panic();
} }
_client = client; _client = client;
_TXbufferWrite = new (std::nothrow) cbuf(TCP_MSS); _TXbufferWrite = new (std::nothrow) cbuf(TCP_MSS);
_TXbufferRead = _TXbufferWrite; _TXbufferRead = _TXbufferWrite;
_RXbuffer = new (std::nothrow) cbuf(100); _RXbuffer = new (std::nothrow) cbuf(100);
_RXmode = ATB_RX_MODE_FREE; _RXmode = ATB_RX_MODE_FREE;
_rxSize = 0; _rxSize = 0;
_rxTerminator = 0x00; _rxTerminator = 0x00;
_rxReadBytesPtr = NULL; _rxReadBytesPtr = NULL;
_rxReadStringPtr = NULL; _rxReadStringPtr = NULL;
_cbDisconnect = NULL; _cbDisconnect = NULL;
_cbRX = NULL; _cbRX = NULL;
_cbDone = NULL; _cbDone = NULL;
_attachCallbacks(); _attachCallbacks();
} }
AsyncTCPbuffer::~AsyncTCPbuffer() { AsyncTCPbuffer::~AsyncTCPbuffer() {
if(_client) { if (_client) {
_client->close(); _client->close();
} }
if(_RXbuffer) { if (_RXbuffer) {
delete _RXbuffer; delete _RXbuffer;
_RXbuffer = NULL; _RXbuffer = NULL;
} }
if(_TXbufferWrite) { if (_TXbufferWrite) {
// will be deleted in _TXbufferRead chain // will be deleted in _TXbufferRead chain
_TXbufferWrite = NULL; _TXbufferWrite = NULL;
} }
if(_TXbufferRead) { if (_TXbufferRead) {
cbuf * next = _TXbufferRead->next; cbuf * next = _TXbufferRead->next;
delete _TXbufferRead; delete _TXbufferRead;
while(next != NULL) { while (next != NULL) {
_TXbufferRead = next; _TXbufferRead = next;
next = _TXbufferRead->next; next = _TXbufferRead->next;
delete _TXbufferRead; delete _TXbufferRead;
} }
_TXbufferRead = NULL; _TXbufferRead = NULL;
@@ -86,12 +86,12 @@ size_t AsyncTCPbuffer::write(uint8_t data) {
return write(&data, 1); return write(&data, 1);
} }
size_t AsyncTCPbuffer::write(const char* data) { size_t AsyncTCPbuffer::write(const char * data) {
return write((const uint8_t *) data, strlen(data)); return write((const uint8_t *)data, strlen(data));
} }
size_t AsyncTCPbuffer::write(const char *data, size_t len) { size_t AsyncTCPbuffer::write(const char * data, size_t len) {
return write((const uint8_t *) data, len); return write((const uint8_t *)data, len);
} }
/** /**
@@ -100,29 +100,28 @@ size_t AsyncTCPbuffer::write(const char *data, size_t len) {
* @param len * @param len
* @return * @return
*/ */
size_t AsyncTCPbuffer::write(const uint8_t *data, size_t len) { size_t AsyncTCPbuffer::write(const uint8_t * data, size_t len) {
if(_TXbufferWrite == NULL || _client == NULL || !_client->connected() || data == NULL || len == 0) { if (_TXbufferWrite == NULL || _client == NULL || !_client->connected() || data == NULL || len == 0) {
return 0; return 0;
} }
size_t bytesLeft = len; size_t bytesLeft = len;
while(bytesLeft) { while (bytesLeft) {
size_t w = _TXbufferWrite->write((const char*) data, bytesLeft); size_t w = _TXbufferWrite->write((const char *)data, bytesLeft);
bytesLeft -= w; bytesLeft -= w;
data += w; data += w;
_sendBuffer(); _sendBuffer();
// add new buffer since we have more data // add new buffer since we have more data
if(_TXbufferWrite->full() && bytesLeft > 0) { if (_TXbufferWrite->full() && bytesLeft > 0) {
// to less ram!!! // to less ram!!!
if(ESP.getFreeHeap() < 4096) { if (ESP.getFreeHeap() < 4096) {
DEBUG_ASYNC_TCP("[A-TCP] run out of Heap can not send all Data!\n"); DEBUG_ASYNC_TCP("[A-TCP] run out of Heap can not send all Data!\n");
return (len - bytesLeft); return (len - bytesLeft);
} }
cbuf * next = new (std::nothrow) cbuf(TCP_MSS); cbuf * next = new (std::nothrow) cbuf(TCP_MSS);
if(next == NULL) { if (next == NULL) {
DEBUG_ASYNC_TCP("[A-TCP] run out of Heap!\n"); DEBUG_ASYNC_TCP("[A-TCP] run out of Heap!\n");
panic(); panic();
} else { } else {
@@ -138,19 +137,18 @@ size_t AsyncTCPbuffer::write(const uint8_t *data, size_t len) {
} }
return len; return len;
} }
/** /**
* wait until all data has send out * wait until all data has send out
*/ */
void AsyncTCPbuffer::flush() { void AsyncTCPbuffer::flush() {
while(!_TXbufferWrite->empty()) { while (!_TXbufferWrite->empty()) {
while(connected() && !_client->canSend()) { while (connected() && !_client->canSend()) {
delay(0); delay(0);
} }
if(!connected()) if (!connected())
return; return;
_sendBuffer(); _sendBuffer();
} }
} }
@@ -160,16 +158,16 @@ void AsyncTCPbuffer::noCallback() {
} }
void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done) { void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done) {
if(_client == NULL) { if (_client == NULL) {
return; return;
} }
DEBUG_ASYNC_TCP("[A-TCP] readStringUntil terminator: %02X\n", terminator); DEBUG_ASYNC_TCP("[A-TCP] readStringUntil terminator: %02X\n", terminator);
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone = done; _cbDone = done;
_rxReadStringPtr = str; _rxReadStringPtr = str;
_rxTerminator = terminator; _rxTerminator = terminator;
_rxSize = 0; _rxSize = 0;
_RXmode = ATB_RX_MODE_TERMINATOR_STRING; _RXmode = ATB_RX_MODE_TERMINATOR_STRING;
} }
/* /*
@@ -188,30 +186,30 @@ void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbuff
} }
*/ */
void AsyncTCPbuffer::readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done) { void AsyncTCPbuffer::readBytes(char * buffer, size_t length, AsyncTCPbufferDoneCb done) {
if(_client == NULL) { if (_client == NULL) {
return; return;
} }
DEBUG_ASYNC_TCP("[A-TCP] readBytes length: %d\n", length); DEBUG_ASYNC_TCP("[A-TCP] readBytes length: %d\n", length);
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone = done; _cbDone = done;
_rxReadBytesPtr = (uint8_t *) buffer; _rxReadBytesPtr = (uint8_t *)buffer;
_rxSize = length; _rxSize = length;
_RXmode = ATB_RX_MODE_READ_BYTES; _RXmode = ATB_RX_MODE_READ_BYTES;
} }
void AsyncTCPbuffer::readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) { void AsyncTCPbuffer::readBytes(uint8_t * buffer, size_t length, AsyncTCPbufferDoneCb done) {
readBytes((char *) buffer, length, done); readBytes((char *)buffer, length, done);
} }
void AsyncTCPbuffer::onData(AsyncTCPbufferDataCb cb) { void AsyncTCPbuffer::onData(AsyncTCPbufferDataCb cb) {
if(_client == NULL) { if (_client == NULL) {
return; return;
} }
DEBUG_ASYNC_TCP("[A-TCP] onData\n"); DEBUG_ASYNC_TCP("[A-TCP] onData\n");
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone = NULL; _cbDone = NULL;
_cbRX = cb; _cbRX = cb;
_RXmode = ATB_RX_MODE_FREE; _RXmode = ATB_RX_MODE_FREE;
} }
@@ -220,44 +218,43 @@ void AsyncTCPbuffer::onDisconnect(AsyncTCPbufferDisconnectCb cb) {
} }
IPAddress AsyncTCPbuffer::remoteIP() { IPAddress AsyncTCPbuffer::remoteIP() {
if(!_client) { if (!_client) {
return IPAddress(0U); return IPAddress(0U);
} }
return _client->remoteIP(); return _client->remoteIP();
} }
uint16_t AsyncTCPbuffer::remotePort() { uint16_t AsyncTCPbuffer::remotePort() {
if(!_client) { if (!_client) {
return 0; return 0;
} }
return _client->remotePort(); return _client->remotePort();
} }
bool AsyncTCPbuffer::connected() { bool AsyncTCPbuffer::connected() {
if(!_client) { if (!_client) {
return false; return false;
} }
return _client->connected(); return _client->connected();
} }
void AsyncTCPbuffer::stop() { void AsyncTCPbuffer::stop() {
if (!_client) {
if(!_client) {
return; return;
} }
_client->stop(); _client->stop();
_client = NULL; _client = NULL;
if(_cbDone) { if (_cbDone) {
switch(_RXmode) { switch (_RXmode) {
case ATB_RX_MODE_READ_BYTES: case ATB_RX_MODE_READ_BYTES:
case ATB_RX_MODE_TERMINATOR: case ATB_RX_MODE_TERMINATOR:
case ATB_RX_MODE_TERMINATOR_STRING: case ATB_RX_MODE_TERMINATOR_STRING:
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone(false, NULL); _cbDone(false, NULL);
break; break;
default: default:
break; break;
} }
} }
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
@@ -274,56 +271,66 @@ void AsyncTCPbuffer::close() {
* attachCallbacks to AsyncClient class * attachCallbacks to AsyncClient class
*/ */
void AsyncTCPbuffer::_attachCallbacks() { void AsyncTCPbuffer::_attachCallbacks() {
if(!_client) { if (!_client) {
return; return;
} }
DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks\n"); DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks\n");
_client->onPoll([](void *obj, AsyncClient* c) { _client->onPoll(
(void)c; [](void * obj, AsyncClient * c) {
AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); (void)c;
if((b->_TXbufferRead != NULL) && !b->_TXbufferRead->empty()) { AsyncTCPbuffer * b = ((AsyncTCPbuffer *)(obj));
b->_sendBuffer(); if ((b->_TXbufferRead != NULL) && !b->_TXbufferRead->empty()) {
} b->_sendBuffer();
// if(!b->_RXbuffer->empty()) { }
// b->_handleRxBuffer(NULL, 0); // if(!b->_RXbuffer->empty()) {
// } // b->_handleRxBuffer(NULL, 0);
}, this); // }
},
this);
_client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time) { _client->onAck(
(void)c; [](void * obj, AsyncClient * c, size_t len, uint32_t time) {
(void)len; (void)c;
(void)time; (void)len;
DEBUG_ASYNC_TCP("[A-TCP] onAck\n"); (void)time;
((AsyncTCPbuffer*)(obj))->_sendBuffer(); DEBUG_ASYNC_TCP("[A-TCP] onAck\n");
}, this); ((AsyncTCPbuffer *)(obj))->_sendBuffer();
},
this);
_client->onDisconnect([](void *obj, AsyncClient* c) { _client->onDisconnect(
DEBUG_ASYNC_TCP("[A-TCP] onDisconnect\n"); [](void * obj, AsyncClient * c) {
AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); DEBUG_ASYNC_TCP("[A-TCP] onDisconnect\n");
b->_client = NULL; AsyncTCPbuffer * b = ((AsyncTCPbuffer *)(obj));
bool del = true; b->_client = NULL;
if(b->_cbDisconnect) { bool del = true;
del = b->_cbDisconnect(b); if (b->_cbDisconnect) {
} del = b->_cbDisconnect(b);
delete c; }
if(del) { delete c;
delete b; if (del) {
} delete b;
}, this); }
},
this);
_client->onData([](void *obj, AsyncClient* c, void *buf, size_t len) { _client->onData(
(void)c; [](void * obj, AsyncClient * c, void * buf, size_t len) {
AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj)); (void)c;
b->_rxData((uint8_t *)buf, len); AsyncTCPbuffer * b = ((AsyncTCPbuffer *)(obj));
}, this); b->_rxData((uint8_t *)buf, len);
},
this);
_client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){ _client->onTimeout(
(void)obj; [](void * obj, AsyncClient * c, uint32_t time) {
(void)time; (void)obj;
DEBUG_ASYNC_TCP("[A-TCP] onTimeout\n"); (void)time;
c->close(); DEBUG_ASYNC_TCP("[A-TCP] onTimeout\n");
}, this); c->close();
},
this);
DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks Done.\n"); DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks Done.\n");
} }
@@ -334,20 +341,19 @@ void AsyncTCPbuffer::_attachCallbacks() {
void AsyncTCPbuffer::_sendBuffer() { void AsyncTCPbuffer::_sendBuffer() {
//DEBUG_ASYNC_TCP("[A-TCP] _sendBuffer...\n"); //DEBUG_ASYNC_TCP("[A-TCP] _sendBuffer...\n");
size_t available = _TXbufferRead->available(); size_t available = _TXbufferRead->available();
if(available == 0 || _client == NULL || !_client->connected() || !_client->canSend()) { if (available == 0 || _client == NULL || !_client->connected() || !_client->canSend()) {
return; return;
} }
while(connected() && (_client->space() > 0) && (_TXbufferRead->available() > 0) && _client->canSend()) { while (connected() && (_client->space() > 0) && (_TXbufferRead->available() > 0) && _client->canSend()) {
available = _TXbufferRead->available(); available = _TXbufferRead->available();
if(available > _client->space()) { if (available > _client->space()) {
available = _client->space(); available = _client->space();
} }
char *out = new (std::nothrow) char[available]; char * out = new (std::nothrow) char[available];
if(out == NULL) { if (out == NULL) {
DEBUG_ASYNC_TCP("[A-TCP] to less heap, try later.\n"); DEBUG_ASYNC_TCP("[A-TCP] to less heap, try later.\n");
return; return;
} }
@@ -356,10 +362,10 @@ void AsyncTCPbuffer::_sendBuffer() {
_TXbufferRead->peek(out, available); _TXbufferRead->peek(out, available);
// send data // send data
size_t send = _client->write((const char*) out, available); size_t send = _client->write((const char *)out, available);
if(send != available) { if (send != available) {
DEBUG_ASYNC_TCP("[A-TCP] write failed send: %d available: %d \n", send, available); DEBUG_ASYNC_TCP("[A-TCP] write failed send: %d available: %d \n", send, available);
if(!connected()) { if (!connected()) {
DEBUG_ASYNC_TCP("[A-TCP] incomplete transfer, connection lost.\n"); DEBUG_ASYNC_TCP("[A-TCP] incomplete transfer, connection lost.\n");
} }
} }
@@ -368,8 +374,8 @@ void AsyncTCPbuffer::_sendBuffer() {
_TXbufferRead->remove(send); _TXbufferRead->remove(send);
// if buffer is empty and there is a other buffer in chain delete the empty one // if buffer is empty and there is a other buffer in chain delete the empty one
if(_TXbufferRead->available() == 0 && _TXbufferRead->next != NULL) { if (_TXbufferRead->available() == 0 && _TXbufferRead->next != NULL) {
cbuf * old = _TXbufferRead; cbuf * old = _TXbufferRead;
_TXbufferRead = _TXbufferRead->next; _TXbufferRead = _TXbufferRead->next;
delete old; delete old;
DEBUG_ASYNC_TCP("[A-TCP] delete cbuf\n"); DEBUG_ASYNC_TCP("[A-TCP] delete cbuf\n");
@@ -377,7 +383,6 @@ void AsyncTCPbuffer::_sendBuffer() {
delete out; delete out;
} }
} }
/** /**
@@ -385,12 +390,12 @@ void AsyncTCPbuffer::_sendBuffer() {
* @param buf * @param buf
* @param len * @param len
*/ */
void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) { void AsyncTCPbuffer::_rxData(uint8_t * buf, size_t len) {
if(!_client || !_client->connected()) { if (!_client || !_client->connected()) {
DEBUG_ASYNC_TCP("[A-TCP] not connected!\n"); DEBUG_ASYNC_TCP("[A-TCP] not connected!\n");
return; return;
} }
if(!_RXbuffer) { if (!_RXbuffer) {
DEBUG_ASYNC_TCP("[A-TCP] _rxData no _RXbuffer!\n"); DEBUG_ASYNC_TCP("[A-TCP] _rxData no _RXbuffer!\n");
return; return;
} }
@@ -398,14 +403,14 @@ void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) {
size_t handled = 0; size_t handled = 0;
if(_RXmode != ATB_RX_MODE_NONE) { if (_RXmode != ATB_RX_MODE_NONE) {
handled = _handleRxBuffer((uint8_t *) buf, len); handled = _handleRxBuffer((uint8_t *)buf, len);
buf += handled; buf += handled;
len -= handled; len -= handled;
// handle as much as possible before using the buffer // handle as much as possible before using the buffer
if(_RXbuffer->empty()) { if (_RXbuffer->empty()) {
while(_RXmode != ATB_RX_MODE_NONE && handled != 0 && len > 0) { while (_RXmode != ATB_RX_MODE_NONE && handled != 0 && len > 0) {
handled = _handleRxBuffer(buf, len); handled = _handleRxBuffer(buf, len);
buf += handled; buf += handled;
len -= handled; len -= handled;
@@ -413,88 +418,86 @@ void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) {
} }
} }
if(len > 0) { if (len > 0) {
if (_RXbuffer->room() < len) {
if(_RXbuffer->room() < len) {
// to less space // to less space
DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer full try resize\n"); DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer full try resize\n");
_RXbuffer->resizeAdd((len + _RXbuffer->room())); _RXbuffer->resizeAdd((len + _RXbuffer->room()));
if(_RXbuffer->room() < len) { if (_RXbuffer->room() < len) {
DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer to full can only handle %d!!!\n", _RXbuffer->room()); DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer to full can only handle %d!!!\n", _RXbuffer->room());
} }
} }
_RXbuffer->write((const char *) (buf), len); _RXbuffer->write((const char *)(buf), len);
} }
if(!_RXbuffer->empty() && _RXmode != ATB_RX_MODE_NONE) { if (!_RXbuffer->empty() && _RXmode != ATB_RX_MODE_NONE) {
// handle as much as possible data in buffer // handle as much as possible data in buffer
handled = _handleRxBuffer(NULL, 0); handled = _handleRxBuffer(NULL, 0);
while(_RXmode != ATB_RX_MODE_NONE && handled != 0) { while (_RXmode != ATB_RX_MODE_NONE && handled != 0) {
handled = _handleRxBuffer(NULL, 0); handled = _handleRxBuffer(NULL, 0);
} }
} }
// clean up ram // clean up ram
if(_RXbuffer->empty() && _RXbuffer->room() != 100) { if (_RXbuffer->empty() && _RXbuffer->room() != 100) {
_RXbuffer->resize(100); _RXbuffer->resize(100);
} }
} }
/** /**
* *
*/ */
size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) { size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t * buf, size_t len) {
if(!_client || !_client->connected() || _RXbuffer == NULL) { if (!_client || !_client->connected() || _RXbuffer == NULL) {
return 0; return 0;
} }
DEBUG_ASYNC_TCP("[A-TCP] _handleRxBuffer len: %d RXmode: %d\n", len, _RXmode); DEBUG_ASYNC_TCP("[A-TCP] _handleRxBuffer len: %d RXmode: %d\n", len, _RXmode);
size_t BufferAvailable = _RXbuffer->available(); size_t BufferAvailable = _RXbuffer->available();
size_t r = 0; size_t r = 0;
if(_RXmode == ATB_RX_MODE_NONE) { if (_RXmode == ATB_RX_MODE_NONE) {
return 0; return 0;
} else if(_RXmode == ATB_RX_MODE_FREE) { } else if (_RXmode == ATB_RX_MODE_FREE) {
if(_cbRX == NULL) { if (_cbRX == NULL) {
return 0; return 0;
} }
if(BufferAvailable > 0) { if (BufferAvailable > 0) {
uint8_t * b = new (std::nothrow) uint8_t[BufferAvailable]; uint8_t * b = new (std::nothrow) uint8_t[BufferAvailable];
if(b == NULL){ if (b == NULL) {
panic(); //TODO: What action should this be ? panic();
} }
_RXbuffer->peek((char *) b, BufferAvailable); _RXbuffer->peek((char *)b, BufferAvailable);
r = _cbRX(b, BufferAvailable); r = _cbRX(b, BufferAvailable);
_RXbuffer->remove(r); _RXbuffer->remove(r);
} }
if(r == BufferAvailable && buf && (len > 0)) { if (r == BufferAvailable && buf && (len > 0)) {
return _cbRX(buf, len); return _cbRX(buf, len);
} else { } else {
return 0; return 0;
} }
} else if(_RXmode == ATB_RX_MODE_READ_BYTES) { } else if (_RXmode == ATB_RX_MODE_READ_BYTES) {
if(_rxReadBytesPtr == NULL || _cbDone == NULL) { if (_rxReadBytesPtr == NULL || _cbDone == NULL) {
return 0; return 0;
} }
size_t newReadCount = 0; size_t newReadCount = 0;
if(BufferAvailable) { if (BufferAvailable) {
r = _RXbuffer->read((char *) _rxReadBytesPtr, _rxSize); r = _RXbuffer->read((char *)_rxReadBytesPtr, _rxSize);
_rxSize -= r; _rxSize -= r;
_rxReadBytesPtr += r; _rxReadBytesPtr += r;
} }
if(_RXbuffer->empty() && (len > 0) && buf) { if (_RXbuffer->empty() && (len > 0) && buf) {
r = len; r = len;
if(r > _rxSize) { if (r > _rxSize) {
r = _rxSize; r = _rxSize;
} }
memcpy(_rxReadBytesPtr, buf, r); memcpy(_rxReadBytesPtr, buf, r);
@@ -503,7 +506,7 @@ size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) {
newReadCount += r; newReadCount += r;
} }
if(_rxSize == 0) { if (_rxSize == 0) {
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone(true, NULL); _cbDone(true, NULL);
} }
@@ -511,19 +514,18 @@ size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) {
// add left over bytes to Buffer // add left over bytes to Buffer
return newReadCount; return newReadCount;
} else if(_RXmode == ATB_RX_MODE_TERMINATOR) { } else if (_RXmode == ATB_RX_MODE_TERMINATOR) {
// TODO implement read terminator non string //
} else if (_RXmode == ATB_RX_MODE_TERMINATOR_STRING) {
} else if(_RXmode == ATB_RX_MODE_TERMINATOR_STRING) { if (_rxReadStringPtr == NULL || _cbDone == NULL) {
if(_rxReadStringPtr == NULL || _cbDone == NULL) {
return 0; return 0;
} }
// handle Buffer // handle Buffer
if(BufferAvailable > 0) { if (BufferAvailable > 0) {
while(!_RXbuffer->empty()) { while (!_RXbuffer->empty()) {
char c = _RXbuffer->read(); char c = _RXbuffer->read();
if(c == _rxTerminator || c == 0x00) { if (c == _rxTerminator || c == 0x00) {
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone(true, _rxReadStringPtr); _cbDone(true, _rxReadStringPtr);
return 0; return 0;
@@ -533,13 +535,13 @@ size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) {
} }
} }
if(_RXbuffer->empty() && (len > 0) && buf) { if (_RXbuffer->empty() && (len > 0) && buf) {
size_t newReadCount = 0; size_t newReadCount = 0;
while(newReadCount < len) { while (newReadCount < len) {
char c = (char) *buf; char c = (char)*buf;
buf++; buf++;
newReadCount++; newReadCount++;
if(c == _rxTerminator || c == 0x00) { if (c == _rxTerminator || c == 0x00) {
_RXmode = ATB_RX_MODE_NONE; _RXmode = ATB_RX_MODE_NONE;
_cbDone(true, _rxReadStringPtr); _cbDone(true, _rxReadStringPtr);
return newReadCount; return newReadCount;

View File

@@ -38,81 +38,70 @@
typedef enum { typedef enum { ATB_RX_MODE_NONE, ATB_RX_MODE_FREE, ATB_RX_MODE_READ_BYTES, ATB_RX_MODE_TERMINATOR, ATB_RX_MODE_TERMINATOR_STRING } atbRxMode_t;
ATB_RX_MODE_NONE,
ATB_RX_MODE_FREE,
ATB_RX_MODE_READ_BYTES,
ATB_RX_MODE_TERMINATOR,
ATB_RX_MODE_TERMINATOR_STRING
} atbRxMode_t;
class AsyncTCPbuffer: public Print { class AsyncTCPbuffer : public Print {
public:
typedef std::function<size_t(uint8_t * payload, size_t length)> AsyncTCPbufferDataCb;
typedef std::function<void(bool ok, void * ret)> AsyncTCPbufferDoneCb;
typedef std::function<bool(AsyncTCPbuffer * obj)> AsyncTCPbufferDisconnectCb;
public: AsyncTCPbuffer(AsyncClient * c);
virtual ~AsyncTCPbuffer();
typedef std::function<size_t(uint8_t * payload, size_t length)> AsyncTCPbufferDataCb; size_t write(String & data);
typedef std::function<void(bool ok, void * ret)> AsyncTCPbufferDoneCb; size_t write(uint8_t data);
typedef std::function<bool(AsyncTCPbuffer * obj)> AsyncTCPbufferDisconnectCb; size_t write(const char * data);
size_t write(const char * data, size_t len);
size_t write(const uint8_t * data, size_t len);
AsyncTCPbuffer(AsyncClient* c); void flush();
virtual ~AsyncTCPbuffer();
size_t write(String & data); void noCallback();
size_t write(uint8_t data);
size_t write(const char* data);
size_t write(const char *data, size_t len);
size_t write(const uint8_t *data, size_t len);
void flush(); void readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done);
void noCallback(); //void readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done);
//void readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done);
void readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done); void readBytes(char * buffer, size_t length, AsyncTCPbufferDoneCb done);
void readBytes(uint8_t * buffer, size_t length, AsyncTCPbufferDoneCb done);
// TODO implement read terminator non string // void setTimeout(size_t timeout);
//void readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done);
//void readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done);
void readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done); void onData(AsyncTCPbufferDataCb cb);
void readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done); void onDisconnect(AsyncTCPbufferDisconnectCb cb);
// TODO implement IPAddress remoteIP();
// void setTimeout(size_t timeout); uint16_t remotePort();
IPAddress localIP();
uint16_t localPort();
void onData(AsyncTCPbufferDataCb cb); bool connected();
void onDisconnect(AsyncTCPbufferDisconnectCb cb);
IPAddress remoteIP(); void stop();
uint16_t remotePort(); void close();
IPAddress localIP();
uint16_t localPort();
bool connected(); protected:
AsyncClient * _client;
cbuf * _TXbufferRead;
cbuf * _TXbufferWrite;
cbuf * _RXbuffer;
atbRxMode_t _RXmode;
size_t _rxSize;
char _rxTerminator;
uint8_t * _rxReadBytesPtr;
String * _rxReadStringPtr;
void stop(); AsyncTCPbufferDataCb _cbRX;
void close(); AsyncTCPbufferDoneCb _cbDone;
AsyncTCPbufferDisconnectCb _cbDisconnect;
protected:
AsyncClient* _client;
cbuf * _TXbufferRead;
cbuf * _TXbufferWrite;
cbuf * _RXbuffer;
atbRxMode_t _RXmode;
size_t _rxSize;
char _rxTerminator;
uint8_t * _rxReadBytesPtr;
String * _rxReadStringPtr;
AsyncTCPbufferDataCb _cbRX;
AsyncTCPbufferDoneCb _cbDone;
AsyncTCPbufferDisconnectCb _cbDisconnect;
void _attachCallbacks();
void _sendBuffer();
void _on_close();
void _rxData(uint8_t *buf, size_t len);
size_t _handleRxBuffer(uint8_t *buf, size_t len);
void _attachCallbacks();
void _sendBuffer();
void _on_close();
void _rxData(uint8_t * buf, size_t len);
size_t _handleRxBuffer(uint8_t * buf, size_t len);
}; };
#endif /* ESPASYNCTCPBUFFER_H_ */ #endif /* ESPASYNCTCPBUFFER_H_ */