From b8194f1e7d94c351dbf6281dcfcb84e8eba1abac Mon Sep 17 00:00:00 2001 From: Ryan Bullock Date: Fri, 3 Apr 2020 11:37:49 -0700 Subject: [PATCH 1/3] Enables the use of EPOLLEXCLUSIVE for UDP workers where supported. Normal behaviour for multiple processes using EPOLL to listen on a single FD is for every process to be woken on every IO event. This can cause a thundering herd effect, increasing context switches and cpu usage. With EPOLLEXCLUSIVE only a single UDP worker will be woken to handle an IO request greatly reducing context switching and contention, especially as the number of processes grow. One potential downside to using EPOLLEXCLUSIVE is that EPOLL may coalesce multiple events on a file descriptor into a single wakeup. This has the potential to increase latency if only a single process is woken to handle potentially multiple SIP messages. To help balance latency and reduced thundering this patch causes the first worker for a socket to not use EPOLLEXCLUSIVE and thus ALWAYS get woken for events. If present, at least one other worker using EPOLLEXCLUSIVE will also be woken. --- io_wait.h | 14 ++++---------- net/net_udp.c | 23 +++++++++++++++++------ reactor_defs.h | 9 ++++++--- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/io_wait.h b/io_wait.h index 47255440998..37d56a9a83f 100644 --- a/io_wait.h +++ b/io_wait.h @@ -347,7 +347,8 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto] void* data, int prio, unsigned int timeout, - int flags) + int flags, + int exclusive) { /* helper macros */ @@ -506,16 +507,9 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto] ep_event.events|=EPOLLOUT; if (!already) { again1: -#if 0 -/* This is currently broken, because when using EPOLLEXCLUSIVE, the OS will - * send sequential events to the same process - thus our pseudo-dispatcher - * will no longer work, since events on a pipe will be queued by a single - * process. 
- razvanc - */ -#if (defined __OS_linux) && (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 24) - if (e->flags & IO_WATCH_READ) +#ifdef EPOLLEXCLUSIVE + if (e->flags & IO_WATCH_READ && exclusive == 1) ep_event.events|=EPOLLEXCLUSIVE; -#endif #endif n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event); if (n==-1){ diff --git a/net/net_udp.c b/net/net_udp.c index b9194a6b79f..4024360667f 100644 --- a/net/net_udp.c +++ b/net/net_udp.c @@ -331,7 +331,7 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) } -int udp_proc_reactor_init( struct socket_info *si ) +int udp_proc_reactor_init( struct socket_info *si, int si_rank ) { /* create the reactor for UDP proc */ @@ -359,9 +359,18 @@ int udp_proc_reactor_init( struct socket_info *si ) } /* init: start watching the SIP UDP fd */ - if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) { - LM_CRIT("failed to add UDP listen socket to reactor\n"); - goto error; + //First child per socket becomes 'Master', will wake on every event + if (si_rank == 0) { + if (reactor_add_reader( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) { + LM_CRIT("failed to add UDP listen socket to reactor\n"); + goto error; + } + } else { + //Subsequent processes are helpers, only one should be woken to help at a time + if (reactor_add_reader_exclusive( si->socket, F_UDP_READ, RCT_PRIO_NET, si)<0) { + LM_CRIT("failed to add UDP listen socket to reactor\n"); + goto error; + } } return 0; @@ -389,7 +398,9 @@ static int fork_dynamic_udp_process(void *si_filter) bind_address=si; /* shortcut */ /* we first need to init the reactor to be able to add fd * into it in child_init routines */ - if (udp_proc_reactor_init(si) < 0 || + /* Since this is in addition to the master process, si_rank should be > 0 to enable + * exlusive polling with EPOLL */ + if (udp_proc_reactor_init(si, 1) < 0 || init_child(10000/*FIXME*/) < 0) { goto error; } @@ -486,7 +497,7 @@ int udp_start_processes(int *chd_rank, int *startup_done) bind_address=si; /* 
shortcut */ /* we first need to init the reactor to be able to add fd * into it in child_init routines */ - if (udp_proc_reactor_init(si) < 0 || + if (udp_proc_reactor_init(si, i) < 0 || init_child(*chd_rank) < 0) { report_failure_status(); if (*chd_rank == 1 && startup_done) diff --git a/reactor_defs.h b/reactor_defs.h index 6c167fa56fa..ef92fac8796 100644 --- a/reactor_defs.h +++ b/reactor_defs.h @@ -78,13 +78,16 @@ int init_reactor_size(void); init_io_wait(&_worker_io, _name, reactor_size, io_poll_method, _prio_max) #define reactor_add_reader( _fd, _type, _prio, _data) \ - io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ) + io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ, 0) + +#define reactor_add_reader_exclusive( _fd, _type, _prio, _data) \ + io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ, 1) #define reactor_add_reader_with_timeout( _fd, _type, _prio, _t, _data) \ - io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ) + io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ, 0) #define reactor_add_writer( _fd, _type, _prio, _data) \ - io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE) + io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE, 0) #define reactor_del_reader( _fd, _idx, _io_flags) \ io_watch_del(&_worker_io, _fd, _idx, _io_flags, IO_WATCH_READ) From 950a6b62639506308d610e8e0d7770e9f9925739 Mon Sep 17 00:00:00 2001 From: Ryan Bullock Date: Fri, 3 Apr 2020 11:52:06 -0700 Subject: [PATCH 2/3] This change causes epoll_wait to only return events for a single FD on every call. With the use of EPOLLEXCLUSIVE we want to ensure that a single process is not being woken to handle multiple FDs at once. This allows EPOLL to better distribute wake-ups to processes that are actually ready to run. Without this a single process may be woken to handle IO for multiple FDs, while other processes are available waiting for work. 
--- io_wait_loop.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/io_wait_loop.h b/io_wait_loop.h index d07a3be5163..cd6e296a438 100644 --- a/io_wait_loop.h +++ b/io_wait_loop.h @@ -175,7 +175,14 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat) unsigned int curr_time; again: +#ifdef EPOLLEXCLUSIVE + /* When using EPOLLEXCLUSIVE we don't want a single wakeup to handle multiple fds at once + as it could introduce latency in handling requests. + Limit each wakeup to handling events from a single fd */ + ret=n=epoll_wait(h->epfd, h->ep_array, 1, t*1000); +#else ret=n=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000); +#endif if (n==-1){ if (errno == EINTR) { goto again; /* signal, ignore it */ From d9e3fbe59cebc744fc657352f47b86b70669baaa Mon Sep 17 00:00:00 2001 From: Ryan Bullock Date: Fri, 3 Apr 2020 12:24:39 -0700 Subject: [PATCH 3/3] Optimize IO handling by draining FDs on every wake-up. A single IO wake-up can correspond to multiple actual IO events/waiting IO. Currently, after handling a single event we go back to waiting on the FD, where we will be immediately woken again because of the already waiting IO. This increases context switches and can increase latency. By handling all the IO possible on every wakeup before waiting again we can reduce both of these. 
--- ipc.c | 37 +++++++++++++++++---------------- net/net_udp.c | 11 ++++++++-- net/proto_udp/proto_udp.c | 6 +++--- timer.c | 43 +++++++++++++++++++++------------------ 4 files changed, 55 insertions(+), 42 deletions(-) diff --git a/ipc.c b/ipc.c index 05869da40ef..601b1225711 100644 --- a/ipc.c +++ b/ipc.c @@ -278,26 +278,29 @@ void ipc_handle_job(int fd) ipc_job job; int n; - /* read one IPC job from the pipe; even if the read is blocking, - * we are here triggered from the reactor, on a READ event, so - * we shouldn;t ever block */ - n = read(fd, &job, sizeof(job) ); - if (n==-1) { - if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + //Process all jobs until handle is drained + while (1) { + /* read one IPC job from the pipe; even if the read is blocking, + * we are here triggered from the reactor, on a READ event, so + * we shouldn;t ever block */ + n = read(fd, &job, sizeof(job) ); + if (n==-1) { + if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + return; + LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); return; - LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); - return; - } + } - LM_DBG("received job type %d[%s] from process %d\n", - job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc); + LM_DBG("received job type %d[%s] from process %d\n", + job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc); - /* custom handling for RPC type */ - if (job.handler_type==ipc_rpc_type) { - ((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2); - } else { - /* generic registered type */ - ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1); + /* custom handling for RPC type */ + if (job.handler_type==ipc_rpc_type) { + ((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2); + } else { + /* generic registered type */ + ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1); + } } return; diff --git a/net/net_udp.c b/net/net_udp.c index 4024360667f..50b7eb17b05 100644 --- 
a/net/net_udp.c +++ b/net/net_udp.c @@ -292,8 +292,11 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) switch(fm->type){ case F_UDP_READ: - n = protos[((struct socket_info*)fm->data)->proto].net. - read( fm->data /*si*/, &read); + do { + n = protos[((struct socket_info*)fm->data)->proto].net. + read( fm->data /*si*/, &read); + //Continue reading packets until we get an error + } while (n == 0); break; case F_TIMER_JOB: handle_timer_job(); @@ -327,6 +330,10 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type) post_run_handle_script_reload(); pt_become_idle(); + + if (n == 1) { + n = 0; + } return n; } diff --git a/net/proto_udp/proto_udp.c b/net/proto_udp/proto_udp.c index 8f2c4cabdd7..56ad3f3b62b 100644 --- a/net/proto_udp/proto_udp.c +++ b/net/proto_udp/proto_udp.c @@ -135,9 +135,9 @@ static int udp_read_req(struct socket_info *si, int* bytes_read) /* coverity[overrun-buffer-arg: FALSE] - union has 28 bytes, CID #200029 */ len=recvfrom(bind_address->socket, buf, BUF_SIZE,0,&ri.src_su.s,&fromlen); if (len==-1){ - if (errno==EAGAIN) - return 0; - if ((errno==EINTR)||(errno==EWOULDBLOCK)|| (errno==ECONNREFUSED)) + if (errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR) + return 1; + if (errno==ECONNREFUSED) return -1; LM_ERR("recvfrom:[%d] %s\n", errno, strerror(errno)); return -2; diff --git a/timer.c b/timer.c index d3970ffd535..32c223421bd 100644 --- a/timer.c +++ b/timer.c @@ -840,32 +840,35 @@ void handle_timer_job(void) struct os_timer *t; ssize_t l; - /* read one "os_timer" pointer from the pipe (non-blocking) */ - l = read( timer_fd_out, &t, sizeof(t) ); - if (l==-1) { - if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + /* Read events until epipe is empty */ + while(1) { + /* read one "os_timer" pointer from the pipe (non-blocking) */ + l = read( timer_fd_out, &t, sizeof(t) ); + if (l==-1) { + if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK ) + return; + LM_ERR("read failed:[%d] %s\n", errno, 
strerror(errno)); return; - LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); - return; - } + } - /* run the handler */ - if (t->flags&TIMER_FLAG_IS_UTIMER) { + /* run the handler */ + if (t->flags&TIMER_FLAG_IS_UTIMER) { - if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) - LM_WARN("utimer job <%s> has a %lld us delay in execution\n", - t->label, *ijiffies-t->trigger_time); - t->u.utimer_f( t->time , t->t_param); - t->trigger_time = 0; + if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) + LM_WARN("utimer job <%s> has a %lld us delay in execution\n", + t->label, *ijiffies-t->trigger_time); + t->u.utimer_f( t->time , t->t_param); + t->trigger_time = 0; - } else { + } else { - if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) - LM_WARN("timer job <%s> has a %lld us delay in execution\n", - t->label, *ijiffies-t->trigger_time); - t->u.timer_f( (unsigned int)t->time , t->t_param); - t->trigger_time = 0; + if (t->trigger_time<(*ijiffies-ITIMER_TICK) ) + LM_WARN("timer job <%s> has a %lld us delay in execution\n", + t->label, *ijiffies-t->trigger_time); + t->u.timer_f( (unsigned int)t->time , t->t_param); + t->trigger_time = 0; + } } return;