From a4617aa0edd27b3f64a76371c9f19def4e29aee0 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 17 Mar 2025 20:44:47 +0100 Subject: [PATCH 1/4] add first ultra hacky AsyncUDP support I am not proud of this code to say the least. --- cores/portduino/AsyncUDP.cpp | 125 +++++++++++++++++++++++++++++++++++ cores/portduino/AsyncUDP.h | 119 +++++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 cores/portduino/AsyncUDP.cpp create mode 100644 cores/portduino/AsyncUDP.h diff --git a/cores/portduino/AsyncUDP.cpp b/cores/portduino/AsyncUDP.cpp new file mode 100644 index 0000000..479ac6f --- /dev/null +++ b/cores/portduino/AsyncUDP.cpp @@ -0,0 +1,125 @@ +#include "AsyncUDP.h" + +void _asyncudp_async_cb(uv_async_t *handle) { + AsyncUDP *udp = (AsyncUDP *)handle->data; + udp->_DO_NOT_CALL_async_cb(); +}; + +AsyncUDP::AsyncUDP() { + _handler = NULL; + _connected = false; + uv_loop_init(&_loop); + _async.data = this; + uv_async_init(&_loop, &_async, _asyncudp_async_cb); +}; + +void _asyncudp_alloc_buffer_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + buf->base = (char *)malloc(suggested_size); + buf->len = suggested_size; +}; + +void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { + AsyncUDP *udp = (AsyncUDP *)handle->data; + udp->_DO_NOT_CALL_uv_on_read(handle, nread, buf, addr, flags); +} + +void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { + _handlerMutex.lock(); + auto h = _handler; + _handlerMutex.unlock(); + if (h) { + AsyncUDPPacket packet((uint8_t*)buf->base, nread); + h(packet); + } + free(buf->base); +}; + +bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) { + if (_connected) { + return false; + } + // FIXME: implement error handling rather than raising SIGSEGV + if (uv_udp_init(&_loop, &_socket) < 0) { + raise(SIGSEGV); + } + _socket.data = this; + // FIXME: don't do bytes → string → bytes IP conversion + int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots + char addr_str[maxIpLength+1]; // +1 for null terminator + snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]); + addr_str[maxIpLength] = '\0'; + struct sockaddr uvAddr; + if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) { + raise(SIGSEGV); + } + if (uv_udp_bind(&_socket, (const struct sockaddr *)&uvAddr, 0) < 0) { + raise(SIGSEGV); + } + if (uv_udp_set_multicast_loop(&_socket, false) < 0) { + raise(SIGSEGV); + } + if (uv_udp_set_multicast_ttl(&_socket, ttl) < 0) { + raise(SIGSEGV); + } + if (uv_udp_set_membership(&_socket, addr_str, NULL, UV_JOIN_GROUP) < 0) { + raise(SIGSEGV); + } + if (uv_udp_recv_start(&_socket, _asyncudp_alloc_buffer_cb, _asyncudp_on_read_cb) < 0) { + raise(SIGSEGV); + } + + _ioThread = std::thread([this](){ + uv_run(&_loop, UV_RUN_DEFAULT); + }); + + _listenIP = addr; + _connected = true; + return true; +}; + +size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) { + auto task = std::make_unique((uint8_t*)data, len, addr, port); + _sendQueueMutex.lock(); + _sendQueue.push_back(std::move(task)); + _sendQueueMutex.unlock(); + uv_async_send(&_async); + return len; +}; + +void AsyncUDP::_DO_NOT_CALL_async_cb() { + _sendQueueMutex.lock(); + while (!_sendQueue.empty()) { + auto task = std::move(_sendQueue.back()); + _sendQueue.pop_back(); + _sendQueueMutex.unlock(); + _doWrite(task->data, task->len, task->addr, task->port); + _sendQueueMutex.lock(); + } + _sendQueueMutex.unlock(); +}; + +void _asyncudp_send_cb(uv_udp_send_t *req, int status) { + free(req); +}; + +void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) { + // FIXME: don't do bytes → string → bytes IP conversion + int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots + char addr_str[maxIpLength+1]; // +1 for null terminator + snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]); + addr_str[maxIpLength] = '\0'; + + // FIXME: implement error handling rather than raising SIGSEGV + struct sockaddr uvAddr; + if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) { + raise(SIGSEGV); + } + + uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t)); + uv_buf_t msg; + msg.base = (char *)data; + msg.len = len; + if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) { + raise(SIGSEGV); + } +}; \ No newline at end of file diff --git a/cores/portduino/AsyncUDP.h b/cores/portduino/AsyncUDP.h new file mode 100644 index 0000000..bbfad2d --- /dev/null +++ b/cores/portduino/AsyncUDP.h @@ -0,0 +1,119 @@ +// FIXME: this is a really hacky implementation that just has the things required to make the firmware compile and run. + +#ifndef ESPASYNCUDP_H +#define ESPASYNCUDP_H + +#include +#include "IPAddress.h" +#include "Print.h" +#include +#include +#include +#include +#include + +class AsyncUDP; + +class AsyncUDPPacket final +{ +private: + uint8_t *_data; + size_t _len; + +protected: + AsyncUDPPacket(uint8_t* byte, size_t len) { + _data = byte; + _len = len; + }; + +public: + uint8_t * data() { + return _data; + }; + size_t length() { + return _len; + }; + + friend AsyncUDP; +}; + +class asyncUDPSendTask final { + protected: + uint8_t *data; + size_t len; + IPAddress addr; + uint16_t port; + + public: + asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) { + this->data = (uint8_t*)malloc(len); + memcpy(this->data, data, len); + this->len = len; + this->addr = addr; + this->port = port; + }; + + ~asyncUDPSendTask() { + free(data); + }; + + friend AsyncUDP; +}; + +typedef std::function AuPacketHandlerFunction; +typedef std::function AuPacketHandlerFunctionWithArg; + +class AsyncUDP final +{ +private: + std::mutex _handlerMutex; + AuPacketHandlerFunction _handler; + + std::mutex _sendQueueMutex; + // the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback + std::vector> _sendQueue; + + std::thread _ioThread; + + bool _connected; + IPAddress _listenIP; + + uv_loop_t _loop; + uv_udp_t _socket; + uv_async_t _async; + +public: + AsyncUDP(); + ~AsyncUDP() { + raise(SIGSEGV); // FIXME: implement closing and teardown + } + + void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) { + onPacket(std::bind(cb, arg, std::placeholders::_1)); + }; + void onPacket(AuPacketHandlerFunction cb) { + _handlerMutex.lock(); + _handler = cb; + _handlerMutex.unlock(); + }; + + bool listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl=1); + + size_t writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port); + + IPAddress listenIP() { + return _listenIP; + }; + operator bool() { + return _connected; + }; + + // do not call, used internally as callback from libuv's C callback + void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags); + void _DO_NOT_CALL_async_cb(); + +private: + void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port); +}; + +#endif From ce9335d6857ca5e720965a87fd1b6f07512c2ce9 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 20 Mar 2025 02:19:09 +0100 Subject: [PATCH 2/4] implement AsyncUDP teardown in destructor and review comments --- cores/portduino/AsyncUDP.cpp | 34 ++++++++++++++++++++-------------- cores/portduino/AsyncUDP.h | 8 ++++++-- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/cores/portduino/AsyncUDP.cpp b/cores/portduino/AsyncUDP.cpp index 479ac6f..74ea42f 100644 --- a/cores/portduino/AsyncUDP.cpp +++ b/cores/portduino/AsyncUDP.cpp @@ -1,4 +1,5 @@ #include "AsyncUDP.h" +#include "Utility.h" void _asyncudp_async_cb(uv_async_t *handle) { AsyncUDP *udp = (AsyncUDP *)handle->data; @@ -38,9 +39,8 @@ bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) if (_connected) { return false; } - // FIXME: implement error handling rather than raising SIGSEGV if (uv_udp_init(&_loop, &_socket) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_init failed"); } _socket.data = this; // FIXME: don't do bytes → string → bytes IP conversion @@ -49,23 +49,21 @@ bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]); addr_str[maxIpLength] = '\0'; struct sockaddr uvAddr; - if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) { - raise(SIGSEGV); - } + uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr); if (uv_udp_bind(&_socket, (const struct sockaddr *)&uvAddr, 0) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_bind failed"); } if (uv_udp_set_multicast_loop(&_socket, false) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_set_multicast_loop failed"); } if (uv_udp_set_multicast_ttl(&_socket, ttl) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_set_multicast_ttl failed"); } if (uv_udp_set_membership(&_socket, addr_str, NULL, UV_JOIN_GROUP) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_set_membership failed"); } if (uv_udp_recv_start(&_socket, _asyncudp_alloc_buffer_cb, _asyncudp_on_read_cb) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_recv_start failed"); } _ioThread = std::thread([this](){ @@ -96,6 +94,16 @@ void AsyncUDP::_DO_NOT_CALL_async_cb() { _sendQueueMutex.lock(); } _sendQueueMutex.unlock(); + if (_quit.load()) { + uv_udp_recv_stop(&_socket); + // FIXME: don't do bytes → string → bytes IP conversion + int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots + char addr_str[maxIpLength+1]; // +1 for null terminator + snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", _listenIP[0], _listenIP[1], _listenIP[2], _listenIP[3]); + addr_str[maxIpLength] = '\0'; + uv_udp_set_membership(&_socket, addr_str, NULL, UV_LEAVE_GROUP); + uv_stop(&_loop); + } }; void _asyncudp_send_cb(uv_udp_send_t *req, int status) { @@ -111,15 +119,13 @@ void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, u // FIXME: implement error handling rather than raising SIGSEGV struct sockaddr uvAddr; - if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) { - raise(SIGSEGV); - } + uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr); uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t)); uv_buf_t msg; msg.base = (char *)data; msg.len = len; if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) { - raise(SIGSEGV); + portduinoError("FIXME: implement proper error handling; uv_udp_send failed"); } }; \ No newline at end of file diff --git a/cores/portduino/AsyncUDP.h b/cores/portduino/AsyncUDP.h index bbfad2d..920862d 100644 --- a/cores/portduino/AsyncUDP.h +++ b/cores/portduino/AsyncUDP.h @@ -3,10 +3,10 @@ #ifndef ESPASYNCUDP_H #define ESPASYNCUDP_H -#include #include "IPAddress.h" #include "Print.h" #include +#include #include #include #include @@ -73,6 +73,7 @@ class AsyncUDP final // the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback std::vector> _sendQueue; + std::atomic _quit; std::thread _ioThread; bool _connected; @@ -85,7 +86,10 @@ class AsyncUDP final public: AsyncUDP(); ~AsyncUDP() { - raise(SIGSEGV); // FIXME: implement closing and teardown + _quit.store(true); + uv_async_send(&_async); + _ioThread.join(); + uv_loop_close(&_loop); } void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) { From db40e94df2eb8d3bfa3b938df2d01e0eb029aad7 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 20 Mar 2025 02:54:38 +0100 Subject: [PATCH 3/4] clean up a bit AsyncUDP --- cores/portduino/AsyncUDP.cpp | 33 ++++++++++++++++++++++++--------- cores/portduino/AsyncUDP.h | 15 ++------------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/cores/portduino/AsyncUDP.cpp b/cores/portduino/AsyncUDP.cpp index 74ea42f..ee17075 100644 --- a/cores/portduino/AsyncUDP.cpp +++ b/cores/portduino/AsyncUDP.cpp @@ -4,7 +4,7 @@ void _asyncudp_async_cb(uv_async_t *handle) { AsyncUDP *udp = (AsyncUDP *)handle->data; udp->_DO_NOT_CALL_async_cb(); -}; +} AsyncUDP::AsyncUDP() { _handler = NULL; @@ -12,12 +12,27 @@ AsyncUDP::AsyncUDP() { uv_loop_init(&_loop); _async.data = this; uv_async_init(&_loop, &_async, _asyncudp_async_cb); -}; +} + +AsyncUDP::~AsyncUDP() { + _quit.store(true); + uv_async_send(&_async); + _ioThread.join(); + uv_loop_close(&_loop); +} + +asyncUDPSendTask::asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) { + this->data = (uint8_t*)malloc(len); + memcpy(this->data, data, len); + this->len = len; + this->addr = addr; + this->port = port; +} void _asyncudp_alloc_buffer_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->base = (char *)malloc(suggested_size); buf->len = suggested_size; -}; +} void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { AsyncUDP *udp = (AsyncUDP *)handle->data; @@ -33,7 +48,7 @@ void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv h(packet); } free(buf->base); -}; +} bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) { if (_connected) { @@ -73,7 +88,7 @@ bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) _listenIP = addr; _connected = true; return true; -}; +} size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) { auto task = std::make_unique((uint8_t*)data, len, addr, port); @@ -82,7 +97,7 @@ size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, _sendQueueMutex.unlock(); uv_async_send(&_async); return len; -}; +} void AsyncUDP::_DO_NOT_CALL_async_cb() { _sendQueueMutex.lock(); @@ -104,11 +119,11 @@ void AsyncUDP::_DO_NOT_CALL_async_cb() { uv_udp_set_membership(&_socket, addr_str, NULL, UV_LEAVE_GROUP); uv_stop(&_loop); } -}; +} void _asyncudp_send_cb(uv_udp_send_t *req, int status) { free(req); -}; +} void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) { // FIXME: don't do bytes → string → bytes IP conversion @@ -128,4 +143,4 @@ void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, u if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) { portduinoError("FIXME: implement proper error handling; uv_udp_send failed"); } -}; \ No newline at end of file +} diff --git a/cores/portduino/AsyncUDP.h b/cores/portduino/AsyncUDP.h index 920862d..55fd6bb 100644 --- a/cores/portduino/AsyncUDP.h +++ b/cores/portduino/AsyncUDP.h @@ -45,13 +45,7 @@ class asyncUDPSendTask final { uint16_t port; public: - asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) { - this->data = (uint8_t*)malloc(len); - memcpy(this->data, data, len); - this->len = len; - this->addr = addr; - this->port = port; - }; + asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port); ~asyncUDPSendTask() { free(data); @@ -85,12 +79,7 @@ class AsyncUDP final public: AsyncUDP(); - ~AsyncUDP() { - _quit.store(true); - uv_async_send(&_async); - _ioThread.join(); - uv_loop_close(&_loop); - } + ~AsyncUDP(); void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) { onPacket(std::bind(cb, arg, std::placeholders::_1)); From b3cfafd3647bfc9bbd732ce681afa0a1664086b4 Mon Sep 17 00:00:00 2001 From: Ben Meadors Date: Wed, 19 Mar 2025 21:06:51 -0500 Subject: [PATCH 4/4] Update AsyncUDP.h --- cores/portduino/AsyncUDP.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/cores/portduino/AsyncUDP.h b/cores/portduino/AsyncUDP.h index 55fd6bb..e98fa76 100644 --- a/cores/portduino/AsyncUDP.h +++ b/cores/portduino/AsyncUDP.h @@ -1,5 +1,3 @@ -// FIXME: this is a really hacky implementation that just has the things required to make the firmware compile and run. - #ifndef ESPASYNCUDP_H #define ESPASYNCUDP_H