watcher: Use poll(2) instead of select

This commit is contained in:
Martin Willi 2014-11-05 17:31:17 +01:00
parent 10743ac9d6
commit 03bccc240c

View File

@ -24,9 +24,6 @@
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#ifndef WIN32
#include <sys/select.h>
#endif
#include <fcntl.h> #include <fcntl.h>
typedef struct private_watcher_t private_watcher_t; typedef struct private_watcher_t private_watcher_t;
@ -244,6 +241,23 @@ static void activate_all(private_watcher_t *this)
this->mutex->unlock(this->mutex); this->mutex->unlock(this->mutex);
} }
/**
* Find flagged revents in a pollfd set by fd
*/
static int find_revents(struct pollfd *pfd, int count, int fd)
{
int i;
for (i = 0; i < count; i++)
{
if (pfd[i].fd == fd)
{
return pfd[i].revents;
}
}
return 0;
}
/** /**
* Dispatching function * Dispatching function
*/ */
@ -251,17 +265,14 @@ static job_requeue_t watch(private_watcher_t *this)
{ {
enumerator_t *enumerator; enumerator_t *enumerator;
entry_t *entry; entry_t *entry;
fd_set rd, wr, ex; struct pollfd *pfd;
int maxfd = 0, res; int count = 0, res;
bool rebuild = FALSE; bool rebuild = FALSE;
FD_ZERO(&rd);
FD_ZERO(&wr);
FD_ZERO(&ex);
this->mutex->lock(this->mutex); this->mutex->lock(this->mutex);
if (this->fds->get_count(this->fds) == 0) count = this->fds->get_count(this->fds);
if (count == 0)
{ {
this->state = WATCHER_STOPPED; this->state = WATCHER_STOPPED;
this->mutex->unlock(this->mutex); this->mutex->unlock(this->mutex);
@ -272,33 +283,34 @@ static job_requeue_t watch(private_watcher_t *this)
this->state = WATCHER_RUNNING; this->state = WATCHER_RUNNING;
} }
if (this->notify[0] != -1) pfd = alloca(sizeof(*pfd) * (count + 1));
{ pfd[0].fd = this->notify[0];
FD_SET(this->notify[0], &rd); pfd[0].events = POLLIN;
maxfd = this->notify[0]; count = 1;
}
enumerator = this->fds->create_enumerator(this->fds); enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry)) while (enumerator->enumerate(enumerator, &entry))
{ {
if (!entry->in_callback) if (!entry->in_callback)
{ {
pfd[count].fd = entry->fd;
pfd[count].events = 0;
if (entry->events & WATCHER_READ) if (entry->events & WATCHER_READ)
{ {
DBG3(DBG_JOB, " watching %d for reading", entry->fd); DBG3(DBG_JOB, " watching %d for reading", entry->fd);
FD_SET(entry->fd, &rd); pfd[count].events |= POLLIN;
} }
if (entry->events & WATCHER_WRITE) if (entry->events & WATCHER_WRITE)
{ {
DBG3(DBG_JOB, " watching %d for writing", entry->fd); DBG3(DBG_JOB, " watching %d for writing", entry->fd);
FD_SET(entry->fd, &wr); pfd[count].events |= POLLOUT;
} }
if (entry->events & WATCHER_EXCEPT) if (entry->events & WATCHER_EXCEPT)
{ {
DBG3(DBG_JOB, " watching %d for exceptions", entry->fd); DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
FD_SET(entry->fd, &ex); pfd[count].events |= POLLERR;
} }
maxfd = max(maxfd, entry->fd); count++;
} }
} }
enumerator->destroy(enumerator); enumerator->destroy(enumerator);
@ -306,6 +318,7 @@ static job_requeue_t watch(private_watcher_t *this)
while (!rebuild) while (!rebuild)
{ {
int revents;
char buf[1]; char buf[1];
bool old; bool old;
ssize_t len; ssize_t len;
@ -315,13 +328,13 @@ static job_requeue_t watch(private_watcher_t *this)
thread_cleanup_push((void*)activate_all, this); thread_cleanup_push((void*)activate_all, this);
old = thread_cancelability(TRUE); old = thread_cancelability(TRUE);
res = select(maxfd + 1, &rd, &wr, &ex, NULL); res = poll(pfd, count, -1);
thread_cancelability(old); thread_cancelability(old);
thread_cleanup_pop(FALSE); thread_cleanup_pop(FALSE);
if (res > 0) if (res > 0)
{ {
if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd)) if (pfd[0].revents & POLLIN)
{ {
while (TRUE) while (TRUE)
{ {
@ -354,17 +367,18 @@ static job_requeue_t watch(private_watcher_t *this)
rebuild = TRUE; rebuild = TRUE;
break; break;
} }
if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ)) revents = find_revents(pfd, count, entry->fd);
if ((revents & POLLIN) && (entry->events & WATCHER_READ))
{ {
DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd); DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
notify(this, entry, WATCHER_READ); notify(this, entry, WATCHER_READ);
} }
if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE)) if ((revents & POLLOUT) && (entry->events & WATCHER_WRITE))
{ {
DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd); DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
notify(this, entry, WATCHER_WRITE); notify(this, entry, WATCHER_WRITE);
} }
if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT)) if ((revents & POLLERR) && (entry->events & WATCHER_EXCEPT))
{ {
DBG2(DBG_JOB, "watched FD %d has exception", entry->fd); DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
notify(this, entry, WATCHER_EXCEPT); notify(this, entry, WATCHER_EXCEPT);