Skip to content

Commit

Permalink
Merge from #5 @xiaosuo
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 2, 2018
2 parents 7a74a6a + 50912bf commit 14a0d65
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 42 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
state-threads
=============

Fork from http://sourceforge.net/projects/state-threads, patched for [SRS](https://github.com/ossrs/srs/tree/2.0release).

Expand Down
2 changes: 2 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ typedef struct _st_eventsys_ops {
int (*fd_new)(int); /* New descriptor allocated */
int (*fd_close)(int); /* Descriptor closed */
int (*fd_getlimit)(void); /* Descriptor hard limit */
int (*pollq_add)(_st_pollq_t *pq);
void (*pollq_del)(_st_pollq_t *pq);
} _st_eventsys_t;


Expand Down
189 changes: 148 additions & 41 deletions event.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ typedef struct _epoll_fd_data {
int wr_ref_cnt;
int ex_ref_cnt;
int revents;
/* The following members aren't touched after forking. */
union {
_st_pollq_t *pq;
_st_pollq_t **pqs;
};
int pq_cnt;
} _epoll_fd_data_t;

static struct _st_epolldata {
Expand Down Expand Up @@ -1194,16 +1200,97 @@ ST_HIDDEN int _st_epoll_pollset_add(struct pollfd *pds, int npds)
return 0;
}

ST_HIDDEN void _st_epoll_pollq_del(_st_pollq_t *pq)
{
struct pollfd *pd = pq->pds;
struct pollfd *pd_end = pd + pq->npds;
_epoll_fd_data_t *efd;
int i;

while (pd < pd_end) {
efd = &_st_epoll_data->fd_data[pd->fd];
if (efd->pq_cnt == 1) {
if (efd->pq == pq)
efd->pq = NULL;
} else if (efd->pq_cnt > 0) {
for (i = 0; i < efd->pq_cnt; ++i) {
if (efd->pqs[i] == pq) {
efd->pqs[i] = NULL;
break;
}
}
}
++pd;
}
}

ST_HIDDEN int _st_epoll_pollq_add(_st_pollq_t *pq)
{
struct pollfd *pd = pq->pds;
struct pollfd *pd_end = pd + pq->npds;
_epoll_fd_data_t *efd;
int i;
_st_pollq_t **pqs;

while (pd < pd_end) {
efd = &_st_epoll_data->fd_data[pd->fd];
if (efd->pq_cnt == 0) {
efd->pq = pq;
efd->pq_cnt = 1;
} else if (efd->pq_cnt == 1) {
if (efd->pq == NULL) {
efd->pq = pq;
} else {
assert(efd->pq != pq);
pqs = malloc(sizeof(*pqs) * 2);
if (!pqs) {
_st_epoll_pollq_del(pq);
errno = ENOMEM;
return -1;
}
pqs[0] = efd->pq;
pqs[1] = pq;
efd->pqs = pqs;
efd->pq_cnt = 2;
}
} else {
for (i = 0; i < efd->pq_cnt; ++i) {
if (efd->pqs[i] == NULL) {
efd->pqs[i] = pq;
break;
} else {
assert(efd->pqs[i] != pq);
}
}
if (i == efd->pq_cnt) {
pqs = realloc(efd->pqs, sizeof(*pqs) * (efd->pq_cnt + 1));
if (!pqs) {
_st_epoll_pollq_del(pq);
errno = ENOMEM;
return -1;
}
efd->pqs = pqs;
efd->pqs[efd->pq_cnt++] = pq;
}
}
++pd;
}

return 0;
}

ST_HIDDEN void _st_epoll_dispatch(void)
{
st_utime_t min_timeout;
_st_clist_t *q;
_st_pollq_t *pq;
struct pollfd *pds, *epds;
struct epoll_event ev;
int timeout, nfd, i, osfd, notify;
int timeout, nfd, i, j, osfd, notify;
int events, op;
short revents;
_epoll_fd_data_t *efd;
_st_pollq_t **pqs;

if (_ST_SLEEPQ == NULL) {
timeout = -1;
Expand All @@ -1224,7 +1311,10 @@ ST_HIDDEN void _st_epoll_dispatch(void)
_st_epoll_data->pid = getpid();

/* Put all descriptors on ioq into new epoll set */
memset(_st_epoll_data->fd_data, 0, _st_epoll_data->fd_data_size * sizeof(_epoll_fd_data_t));
for (i = 0; i < _st_epoll_data->fd_data_size; ++i) {
memset(&_st_epoll_data->fd_data[i], 0,
offsetof(_epoll_fd_data_t, pq));
}
_st_epoll_data->evtlist_cnt = 0;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
Expand All @@ -1245,48 +1335,63 @@ ST_HIDDEN void _st_epoll_dispatch(void)
}
}

for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
epds = pq->pds + pq->npds;

for (pds = pq->pds; pds < epds; pds++) {
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
for (i = 0; i < nfd; ++i) {
osfd = _st_epoll_data->evtlist[i].data.fd;
efd = &_st_epoll_data->fd_data[osfd];
assert(efd->pq_cnt > 0);
if (efd->pq_cnt == 1)
pqs = &efd->pq;
else
pqs = efd->pqs;
for (j = 0; j < efd->pq_cnt; ++j) {
pq = pqs[j];
if (!pq)
continue;
}
osfd = pds->fd;
events = pds->events;
revents = 0;
if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT))
revents |= POLLOUT;
if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI))
revents |= POLLPRI;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR)
revents |= POLLERR;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP)
revents |= POLLHUP;
notify = 0;
epds = pq->pds + pq->npds;

pds->revents = revents;
if (revents) {
notify = 1;
for (pds = pq->pds; pds < epds; pds++) {
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
continue;
}
osfd = pds->fd;
events = pds->events;
revents = 0;
if ((events & POLLIN) &&
(_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
if ((events & POLLOUT) &&
(_ST_EPOLL_REVENTS(osfd) & EPOLLOUT))
revents |= POLLOUT;
if ((events & POLLPRI) &&
(_ST_EPOLL_REVENTS(osfd) & EPOLLPRI))
revents |= POLLPRI;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR)
revents |= POLLERR;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP)
revents |= POLLHUP;

pds->revents = revents;
if (revents) {
notify = 1;
}
}
}
if (notify) {
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);
if (notify) {
_st_epoll_pollq_del(pq);
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);

if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
}
}
}

Expand Down Expand Up @@ -1353,7 +1458,9 @@ static _st_eventsys_t _st_epoll_eventsys = {
_st_epoll_pollset_del,
_st_epoll_fd_new,
_st_epoll_fd_close,
_st_epoll_fd_getlimit
_st_epoll_fd_getlimit,
_st_epoll_pollq_add,
_st_epoll_pollq_del
};
#endif /* MD_HAVE_EPOLL */

Expand Down
4 changes: 4 additions & 0 deletions sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
if (*_st_eventsys->pollq_add && (*_st_eventsys->pollq_add)(&pq))
return -1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
Expand All @@ -92,6 +94,8 @@ int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)

n = 0;
if (pq.on_ioq) {
if (*_st_eventsys->pollq_del)
(*_st_eventsys->pollq_del)(&pq);
/* If we timed out, the pollq might still be on the ioq. Remove it */
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
Expand Down

0 comments on commit 14a0d65

Please sign in to comment.