Skip to content

Commit 21969db

Browse files
committed
add first ultra hacky AsyncUDP support
I am not proud of this code to say the least.
1 parent e54db5e commit 21969db

File tree

2 files changed

+251
-0
lines changed

2 files changed

+251
-0
lines changed

cores/portduino/AsyncUDP.cpp

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#include "AsyncUDP.h"
2+
3+
void _asyncudp_async_cb(uv_async_t *handle) {
4+
AsyncUDP *udp = (AsyncUDP *)handle->data;
5+
udp->_DO_NOT_CALL_async_cb();
6+
};
7+
8+
AsyncUDP::AsyncUDP() {
9+
_handler = NULL;
10+
_connected = false;
11+
uv_loop_init(&_loop);
12+
_async.data = this;
13+
uv_async_init(&_loop, &_async, _asyncudp_async_cb);
14+
};
15+
16+
void _asyncudp_alloc_buffer_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
17+
buf->base = (char *)malloc(suggested_size);
18+
buf->len = suggested_size;
19+
};
20+
21+
void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
22+
// on the handle.data void pointer cast to AsyncUDP and call the _on_read function
23+
AsyncUDP *udp = (AsyncUDP *)handle->data;
24+
udp->_DO_NOT_CALL_uv_on_read(handle, nread, buf, addr, flags);
25+
}
26+
27+
void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
28+
if (nread == 0) {
29+
// no data
30+
return;
31+
}
32+
33+
_handlerMutex.lock();
34+
auto h = _handler;
35+
_handlerMutex.unlock();
36+
if (h) {
37+
AsyncUDPPacket packet((uint8_t*)buf->base, nread);
38+
h(packet);
39+
}
40+
free(buf->base);
41+
};
42+
43+
bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) {
44+
if (_connected) {
45+
return false;
46+
}
47+
// FIXME: implement error handling rather than raising SIGSEGV
48+
if (uv_udp_init(&_loop, &_socket) < 0) {
49+
raise(SIGSEGV);
50+
}
51+
_socket.data = this;
52+
// FIXME: don't do bytes → string → bytes IP conversion
53+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
54+
char addr_str[maxIpLength+1]; // +1 for null terminator
55+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
56+
addr_str[maxIpLength] = '\0';
57+
struct sockaddr uvAddr;
58+
if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) {
59+
raise(SIGSEGV);
60+
}
61+
if (uv_udp_bind(&_socket, (const struct sockaddr *)&uvAddr, 0) < 0) {
62+
raise(SIGSEGV);
63+
}
64+
if (uv_udp_set_multicast_loop(&_socket, false) < 0) {
65+
raise(SIGSEGV);
66+
}
67+
if (uv_udp_set_multicast_ttl(&_socket, ttl) < 0) {
68+
raise(SIGSEGV);
69+
}
70+
if (uv_udp_set_membership(&_socket, addr_str, NULL, UV_JOIN_GROUP) < 0) {
71+
raise(SIGSEGV);
72+
}
73+
if (uv_udp_recv_start(&_socket, _asyncudp_alloc_buffer_cb, _asyncudp_on_read_cb) < 0) {
74+
raise(SIGSEGV);
75+
}
76+
77+
_ioThread = std::thread([this](){
78+
uv_run(&_loop, UV_RUN_DEFAULT);
79+
});
80+
81+
_listenIP = addr;
82+
_connected = true;
83+
return true;
84+
};
85+
86+
size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
87+
auto task = std::make_unique<asyncUDPSendTask>((uint8_t*)data, len, addr, port);
88+
_sendQueueMutex.lock();
89+
_sendQueue.push_back(std::move(task));
90+
_sendQueueMutex.unlock();
91+
uv_async_send(&_async);
92+
return len;
93+
};
94+
95+
void AsyncUDP::_DO_NOT_CALL_async_cb() {
96+
_sendQueueMutex.lock();
97+
while (!_sendQueue.empty()) {
98+
auto task = std::move(_sendQueue.back());
99+
_sendQueue.pop_back();
100+
_sendQueueMutex.unlock();
101+
_doWrite(task->data, task->len, task->addr, task->port);
102+
_sendQueueMutex.lock();
103+
}
104+
_sendQueueMutex.unlock();
105+
};
106+
107+
void _asyncudp_send_cb(uv_udp_send_t *req, int status) {
108+
free(req);
109+
};
110+
111+
void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
112+
// FIXME: don't do bytes → string → bytes IP conversion
113+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
114+
char addr_str[maxIpLength+1]; // +1 for null terminator
115+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
116+
addr_str[maxIpLength] = '\0';
117+
118+
// FIXME: implement error handling rather than raising SIGSEGV
119+
struct sockaddr uvAddr;
120+
if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) {
121+
raise(SIGSEGV);
122+
}
123+
124+
uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t));
125+
uv_buf_t msg;
126+
msg.base = (char *)data;
127+
msg.len = len;
128+
if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) {
129+
raise(SIGSEGV);
130+
}
131+
};

cores/portduino/AsyncUDP.h

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// FIXME: this is a really hacky implementation that just has the things required to make the firmware compile and run.
2+
3+
#ifndef ESPASYNCUDP_H
4+
#define ESPASYNCUDP_H
5+
6+
#include <signal.h>
7+
#include "IPAddress.h"
8+
#include "Print.h"
9+
#include <functional>
10+
#include <mutex>
11+
#include <memory>
12+
#include <thread>
13+
#include <uv.h>
14+
15+
class AsyncUDP;
16+
17+
class AsyncUDPPacket final
18+
{
19+
private:
20+
uint8_t *_data;
21+
size_t _len;
22+
23+
protected:
24+
AsyncUDPPacket(uint8_t* byte, size_t len) {
25+
_data = byte;
26+
_len = len;
27+
};
28+
29+
public:
30+
uint8_t * data() {
31+
return _data;
32+
};
33+
size_t length() {
34+
return _len;
35+
};
36+
37+
friend AsyncUDP;
38+
};
39+
40+
class asyncUDPSendTask final {
41+
protected:
42+
uint8_t *data;
43+
size_t len;
44+
IPAddress addr;
45+
uint16_t port;
46+
47+
public:
48+
asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) {
49+
this->data = (uint8_t*)malloc(len);
50+
memcpy(this->data, data, len);
51+
this->len = len;
52+
this->addr = addr;
53+
this->port = port;
54+
};
55+
56+
~asyncUDPSendTask() {
57+
free(data);
58+
};
59+
60+
friend AsyncUDP;
61+
};
62+
63+
typedef std::function<void(AsyncUDPPacket& packet)> AuPacketHandlerFunction;
64+
typedef std::function<void(void * arg, AsyncUDPPacket& packet)> AuPacketHandlerFunctionWithArg;
65+
66+
class AsyncUDP final
67+
{
68+
private:
69+
// use atomic AuPacketHandlerFunction _handler
70+
std::mutex _handlerMutex;
71+
AuPacketHandlerFunction _handler;
72+
73+
std::mutex _sendQueueMutex;
74+
// the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback
75+
std::vector<std::unique_ptr<asyncUDPSendTask>> _sendQueue;
76+
77+
std::thread _ioThread;
78+
79+
bool _connected;
80+
IPAddress _listenIP;
81+
82+
uv_loop_t _loop;
83+
uv_udp_t _socket;
84+
uv_async_t _async;
85+
86+
public:
87+
AsyncUDP();
88+
~AsyncUDP() {
89+
raise(SIGSEGV); // FIXME: implement closing and teardown
90+
}
91+
92+
void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) {
93+
onPacket(std::bind(cb, arg, std::placeholders::_1));
94+
};
95+
void onPacket(AuPacketHandlerFunction cb) {
96+
_handlerMutex.lock();
97+
_handler = cb;
98+
_handlerMutex.unlock();
99+
};
100+
101+
bool listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl=1);
102+
103+
size_t writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
104+
105+
IPAddress listenIP() {
106+
return _listenIP;
107+
};
108+
operator bool() {
109+
return _connected;
110+
};
111+
112+
// do not call, used internally as callback from libuv's C callback
113+
void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags);
114+
void _DO_NOT_CALL_async_cb();
115+
116+
private:
117+
void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
118+
};
119+
120+
#endif

0 commit comments

Comments
 (0)