diff --git a/README.md b/README.md index 85bb2fa..21bfc37 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ state-threads -============= Fork from http://sourceforge.net/projects/state-threads, patched for [SRS](https://github.com/ossrs/srs/tree/2.0release). diff --git a/common.h b/common.h index 0c0685b..7fb6029 100644 --- a/common.h +++ b/common.h @@ -228,6 +228,8 @@ typedef struct _st_eventsys_ops { int (*fd_new)(int); /* New descriptor allocated */ int (*fd_close)(int); /* Descriptor closed */ int (*fd_getlimit)(void); /* Descriptor hard limit */ + int (*pollq_add)(_st_pollq_t *pq); + void (*pollq_del)(_st_pollq_t *pq); } _st_eventsys_t; diff --git a/event.c b/event.c index 142882a..70caa27 100644 --- a/event.c +++ b/event.c @@ -119,6 +119,12 @@ typedef struct _epoll_fd_data { int wr_ref_cnt; int ex_ref_cnt; int revents; + /* The following members aren't touched after forking. */ + union { + _st_pollq_t *pq; + _st_pollq_t **pqs; + }; + int pq_cnt; } _epoll_fd_data_t; static struct _st_epolldata { @@ -1194,6 +1200,85 @@ ST_HIDDEN int _st_epoll_pollset_add(struct pollfd *pds, int npds) return 0; } +ST_HIDDEN void _st_epoll_pollq_del(_st_pollq_t *pq) +{ + struct pollfd *pd = pq->pds; + struct pollfd *pd_end = pd + pq->npds; + _epoll_fd_data_t *efd; + int i; + + while (pd < pd_end) { + efd = &_st_epoll_data->fd_data[pd->fd]; + if (efd->pq_cnt == 1) { + if (efd->pq == pq) + efd->pq = NULL; + } else if (efd->pq_cnt > 0) { + for (i = 0; i < efd->pq_cnt; ++i) { + if (efd->pqs[i] == pq) { + efd->pqs[i] = NULL; + break; + } + } + } + ++pd; + } +} + +ST_HIDDEN int _st_epoll_pollq_add(_st_pollq_t *pq) +{ + struct pollfd *pd = pq->pds; + struct pollfd *pd_end = pd + pq->npds; + _epoll_fd_data_t *efd; + int i; + _st_pollq_t **pqs; + + while (pd < pd_end) { + efd = &_st_epoll_data->fd_data[pd->fd]; + if (efd->pq_cnt == 0) { + efd->pq = pq; + efd->pq_cnt = 1; + } else if (efd->pq_cnt == 1) { + if (efd->pq == NULL) { + efd->pq = pq; + } else { + assert(efd->pq != pq); + pqs = malloc(sizeof(*pqs) * 2); + if (!pqs) { + _st_epoll_pollq_del(pq); + errno = ENOMEM; + return -1; + } + pqs[0] = efd->pq; + pqs[1] = pq; + efd->pqs = pqs; + efd->pq_cnt = 2; + } + } else { + for (i = 0; i < efd->pq_cnt; ++i) { + if (efd->pqs[i] == NULL) { + efd->pqs[i] = pq; + break; + } else { + assert(efd->pqs[i] != pq); + } + } + if (i == efd->pq_cnt) { + pqs = realloc(efd->pqs, sizeof(*pqs) * (efd->pq_cnt + 1)); + if (!pqs) { + _st_epoll_pollq_del(pq); + errno = ENOMEM; + return -1; + } + efd->pqs = pqs; + efd->pqs[efd->pq_cnt++] = pq; + } + } + ++pd; + } + + return 0; +} + ST_HIDDEN void _st_epoll_dispatch(void) { st_utime_t min_timeout; @@ -1201,9 +1286,11 @@ ST_HIDDEN void _st_epoll_dispatch(void) _st_pollq_t *pq; struct pollfd *pds, *epds; struct epoll_event ev; - int timeout, nfd, i, osfd, notify; + int timeout, nfd, i, j, osfd, notify; int events, op; short revents; + _epoll_fd_data_t *efd; + _st_pollq_t **pqs; if (_ST_SLEEPQ == NULL) { timeout = -1; @@ -1224,7 +1311,10 @@ ST_HIDDEN void _st_epoll_dispatch(void) _st_epoll_data->pid = getpid(); /* Put all descriptors on ioq into new epoll set */ - memset(_st_epoll_data->fd_data, 0, _st_epoll_data->fd_data_size * sizeof(_epoll_fd_data_t)); + for (i = 0; i < _st_epoll_data->fd_data_size; ++i) { + memset(&_st_epoll_data->fd_data[i], 0, + offsetof(_epoll_fd_data_t, pq)); + } _st_epoll_data->evtlist_cnt = 0; for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) { pq = _ST_POLLQUEUE_PTR(q); @@ -1245,48 +1335,63 @@ ST_HIDDEN void _st_epoll_dispatch(void) } } - for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) { - pq = _ST_POLLQUEUE_PTR(q); - notify = 0; - epds = pq->pds + pq->npds; - - for (pds = pq->pds; pds < epds; pds++) { - if (_ST_EPOLL_REVENTS(pds->fd) == 0) { - pds->revents = 0; + for (i = 0; i < nfd; ++i) { + osfd = _st_epoll_data->evtlist[i].data.fd; + efd = &_st_epoll_data->fd_data[osfd]; + assert(efd->pq_cnt > 0); + if (efd->pq_cnt == 1) + pqs = &efd->pq; + else + pqs = efd->pqs; + for (j = 0; j < efd->pq_cnt; ++j) { + pq = pqs[j]; + if (!pq) continue; - } - osfd = pds->fd; - events = pds->events; - revents = 0; - if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN)) - revents |= POLLIN; - if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT)) - revents |= POLLOUT; - if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI)) - revents |= POLLPRI; - if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR) - revents |= POLLERR; - if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP) - revents |= POLLHUP; + notify = 0; + epds = pq->pds + pq->npds; - pds->revents = revents; - if (revents) { - notify = 1; + for (pds = pq->pds; pds < epds; pds++) { + if (_ST_EPOLL_REVENTS(pds->fd) == 0) { + pds->revents = 0; + continue; + } + osfd = pds->fd; + events = pds->events; + revents = 0; + if ((events & POLLIN) && + (_ST_EPOLL_REVENTS(osfd) & EPOLLIN)) + revents |= POLLIN; + if ((events & POLLOUT) && + (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT)) + revents |= POLLOUT; + if ((events & POLLPRI) && + (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI)) + revents |= POLLPRI; + if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR) + revents |= POLLERR; + if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP) + revents |= POLLHUP; + + pds->revents = revents; + if (revents) { + notify = 1; + } } - } - if (notify) { - ST_REMOVE_LINK(&pq->links); - pq->on_ioq = 0; - /* - * Here we will only delete/modify descriptors that - * didn't fire (see comments in _st_epoll_pollset_del()). - */ - _st_epoll_pollset_del(pq->pds, pq->npds); + if (notify) { + _st_epoll_pollq_del(pq); + ST_REMOVE_LINK(&pq->links); + pq->on_ioq = 0; + /* + * Here we will only delete/modify descriptors that + * didn't fire (see comments in _st_epoll_pollset_del()). + */ + _st_epoll_pollset_del(pq->pds, pq->npds); - if (pq->thread->flags & _ST_FL_ON_SLEEPQ) - _ST_DEL_SLEEPQ(pq->thread); - pq->thread->state = _ST_ST_RUNNABLE; - _ST_ADD_RUNQ(pq->thread); + if (pq->thread->flags & _ST_FL_ON_SLEEPQ) + _ST_DEL_SLEEPQ(pq->thread); + pq->thread->state = _ST_ST_RUNNABLE; + _ST_ADD_RUNQ(pq->thread); + } } } @@ -1353,7 +1458,9 @@ static _st_eventsys_t _st_epoll_eventsys = { _st_epoll_pollset_del, _st_epoll_fd_new, _st_epoll_fd_close, - _st_epoll_fd_getlimit + _st_epoll_fd_getlimit, + _st_epoll_pollq_add, + _st_epoll_pollq_del }; #endif /* MD_HAVE_EPOLL */ diff --git a/sched.c b/sched.c index 8751582..9c6f52e 100644 --- a/sched.c +++ b/sched.c @@ -83,6 +83,8 @@ int st_poll(struct pollfd *pds, int npds, st_utime_t timeout) pq.npds = npds; pq.thread = me; pq.on_ioq = 1; + if (*_st_eventsys->pollq_add && (*_st_eventsys->pollq_add)(&pq)) + return -1; _ST_ADD_IOQ(pq); if (timeout != ST_UTIME_NO_TIMEOUT) _ST_ADD_SLEEPQ(me, timeout); @@ -92,6 +94,8 @@ int st_poll(struct pollfd *pds, int npds, st_utime_t timeout) n = 0; if (pq.on_ioq) { + if (*_st_eventsys->pollq_del) + (*_st_eventsys->pollq_del)(&pq); /* If we timed out, the pollq might still be on the ioq. Remove it */ _ST_DEL_IOQ(pq); (*_st_eventsys->pollset_del)(pds, npds);