Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add first ultra hacky AsyncUDP support #48

Merged
merged 4 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions cores/portduino/AsyncUDP.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#include "AsyncUDP.h"
#include "Utility.h"

void _asyncudp_async_cb(uv_async_t *handle) {
AsyncUDP *udp = (AsyncUDP *)handle->data;
udp->_DO_NOT_CALL_async_cb();
}

AsyncUDP::AsyncUDP() {
_handler = NULL;
_connected = false;
uv_loop_init(&_loop);
_async.data = this;
uv_async_init(&_loop, &_async, _asyncudp_async_cb);
}

AsyncUDP::~AsyncUDP() {
_quit.store(true);
uv_async_send(&_async);
_ioThread.join();
uv_loop_close(&_loop);
}

asyncUDPSendTask::asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) {
this->data = (uint8_t*)malloc(len);
memcpy(this->data, data, len);
this->len = len;
this->addr = addr;
this->port = port;
}

void _asyncudp_alloc_buffer_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = (char *)malloc(suggested_size);
buf->len = suggested_size;
}

void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
AsyncUDP *udp = (AsyncUDP *)handle->data;
udp->_DO_NOT_CALL_uv_on_read(handle, nread, buf, addr, flags);
}

void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
_handlerMutex.lock();
auto h = _handler;
_handlerMutex.unlock();
if (h) {
AsyncUDPPacket packet((uint8_t*)buf->base, nread);
h(packet);
}
free(buf->base);
}

bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) {
if (_connected) {
return false;
}
if (uv_udp_init(&_loop, &_socket) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_init failed");
}
_socket.data = this;
// FIXME: don't do bytes → string → bytes IP conversion
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
char addr_str[maxIpLength+1]; // +1 for null terminator
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
addr_str[maxIpLength] = '\0';
struct sockaddr uvAddr;
uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr);
if (uv_udp_bind(&_socket, (const struct sockaddr *)&uvAddr, 0) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_bind failed");
}
if (uv_udp_set_multicast_loop(&_socket, false) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_set_multicast_loop failed");
}
if (uv_udp_set_multicast_ttl(&_socket, ttl) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_set_multicast_ttl failed");
}
if (uv_udp_set_membership(&_socket, addr_str, NULL, UV_JOIN_GROUP) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_set_membership failed");
}
if (uv_udp_recv_start(&_socket, _asyncudp_alloc_buffer_cb, _asyncudp_on_read_cb) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_recv_start failed");
}

_ioThread = std::thread([this](){
uv_run(&_loop, UV_RUN_DEFAULT);
});

_listenIP = addr;
_connected = true;
return true;
}

size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
auto task = std::make_unique<asyncUDPSendTask>((uint8_t*)data, len, addr, port);
_sendQueueMutex.lock();
_sendQueue.push_back(std::move(task));
_sendQueueMutex.unlock();
uv_async_send(&_async);
return len;
}

void AsyncUDP::_DO_NOT_CALL_async_cb() {
_sendQueueMutex.lock();
while (!_sendQueue.empty()) {
auto task = std::move(_sendQueue.back());
_sendQueue.pop_back();
_sendQueueMutex.unlock();
_doWrite(task->data, task->len, task->addr, task->port);
_sendQueueMutex.lock();
}
_sendQueueMutex.unlock();
if (_quit.load()) {
uv_udp_recv_stop(&_socket);
// FIXME: don't do bytes → string → bytes IP conversion
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
char addr_str[maxIpLength+1]; // +1 for null terminator
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", _listenIP[0], _listenIP[1], _listenIP[2], _listenIP[3]);
addr_str[maxIpLength] = '\0';
uv_udp_set_membership(&_socket, addr_str, NULL, UV_LEAVE_GROUP);
uv_stop(&_loop);
}
}

void _asyncudp_send_cb(uv_udp_send_t *req, int status) {
free(req);
}

void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
// FIXME: don't do bytes → string → bytes IP conversion
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
char addr_str[maxIpLength+1]; // +1 for null terminator
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
addr_str[maxIpLength] = '\0';

// FIXME: implement error handling rather than raising SIGSEGV
struct sockaddr uvAddr;
uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr);

uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t));
uv_buf_t msg;
msg.base = (char *)data;
msg.len = len;
if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) {
portduinoError("FIXME: implement proper error handling; uv_udp_send failed");
}
}
110 changes: 110 additions & 0 deletions cores/portduino/AsyncUDP.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#ifndef ESPASYNCUDP_H
#define ESPASYNCUDP_H

#include "IPAddress.h"
#include "Print.h"
#include <functional>
#include <atomic>
#include <mutex>
#include <memory>
#include <thread>
#include <uv.h>

class AsyncUDP;

class AsyncUDPPacket final
{
private:
uint8_t *_data;
size_t _len;

protected:
AsyncUDPPacket(uint8_t* byte, size_t len) {
_data = byte;
_len = len;
};

public:
uint8_t * data() {
return _data;
};
size_t length() {
return _len;
};

friend AsyncUDP;
};

class asyncUDPSendTask final {
protected:
uint8_t *data;
size_t len;
IPAddress addr;
uint16_t port;

public:
asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port);

~asyncUDPSendTask() {
free(data);
};

friend AsyncUDP;
};

typedef std::function<void(AsyncUDPPacket& packet)> AuPacketHandlerFunction;
typedef std::function<void(void * arg, AsyncUDPPacket& packet)> AuPacketHandlerFunctionWithArg;

class AsyncUDP final
{
private:
std::mutex _handlerMutex;
AuPacketHandlerFunction _handler;

std::mutex _sendQueueMutex;
// the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback
std::vector<std::unique_ptr<asyncUDPSendTask>> _sendQueue;

std::atomic<bool> _quit;
std::thread _ioThread;

bool _connected;
IPAddress _listenIP;

uv_loop_t _loop;
uv_udp_t _socket;
uv_async_t _async;

public:
AsyncUDP();
~AsyncUDP();

void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) {
onPacket(std::bind(cb, arg, std::placeholders::_1));
};
void onPacket(AuPacketHandlerFunction cb) {
_handlerMutex.lock();
_handler = cb;
_handlerMutex.unlock();
};

bool listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl=1);

size_t writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);

IPAddress listenIP() {
return _listenIP;
};
operator bool() {
return _connected;
};

// do not call, used internally as callback from libuv's C callback
void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags);
void _DO_NOT_CALL_async_cb();

private:
void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
};

#endif