Handle multiple lookip connections using a single FDSET

This commit is contained in:
Martin Willi 2012-10-09 10:03:15 +02:00
parent 28683ef137
commit 31576ceddf

View File

@ -57,6 +57,11 @@ struct private_lookip_socket_t {
*/ */
linked_list_t *registered; linked_list_t *registered;
/**
* List of connected clients, as uintptr_t FD
*/
linked_list_t *connected;
/** /**
* Mutex to lock clients list * Mutex to lock clients list
*/ */
@ -223,73 +228,179 @@ static void subscribe(private_lookip_socket_t *this, int fd, int type)
} }
/** /**
* Accept client connections, dispatch * Check if a client is subscribed for notifications
*/ */
static job_requeue_t receive(private_lookip_socket_t *this) static bool subscribed(private_lookip_socket_t *this, int fd)
{
enumerator_t *enumerator;
bool subscribed = FALSE;
entry_t *entry;
this->mutex->lock(this->mutex);
enumerator = this->registered->create_enumerator(this->registered);
while (enumerator->enumerate(enumerator, &entry))
{
if (entry->fd == fd)
{
subscribed = TRUE;
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
return subscribed;
}
/**
* Create a fd_set from all bound sockets
*/
static int build_fds(private_lookip_socket_t *this, fd_set *fds)
{
enumerator_t *enumerator;
uintptr_t fd;
int maxfd;
FD_ZERO(fds);
FD_SET(this->socket, fds);
maxfd = this->socket;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &fd))
{
FD_SET(fd, fds);
maxfd = max(maxfd, fd);
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
return maxfd + 1;
}
/**
* Find the socket select()ed
*/
static int scan_fds(private_lookip_socket_t *this, fd_set *fds)
{
enumerator_t *enumerator;
uintptr_t fd;
int selected = -1;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &fd))
{
if (FD_ISSET(fd, fds))
{
selected = fd;
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
return selected;
}
/**
* Dispatch from a socket, return TRUE to end communication
*/
static bool dispatch(private_lookip_socket_t *this, int fd)
{ {
struct sockaddr_un addr;
int fd, len = sizeof(addr);
lookip_request_t req; lookip_request_t req;
bool oldstate, subscribed = FALSE; int len;
oldstate = thread_cancelability(TRUE);
fd = accept(this->socket, (struct sockaddr*)&addr, &len);
thread_cancelability(oldstate);
if (fd != -1)
{
while (TRUE)
{
oldstate = thread_cancelability(TRUE);
len = recv(fd, &req, sizeof(req), 0); len = recv(fd, &req, sizeof(req), 0);
thread_cancelability(oldstate); if (len != sizeof(req))
if (len == sizeof(req))
{
switch (req.type)
{
case LOOKIP_LOOKUP:
query(this, fd, &req);
continue;
case LOOKIP_DUMP:
query(this, fd, NULL);
continue;
case LOOKIP_REGISTER_UP:
subscribe(this, fd, LOOKIP_NOTIFY_UP);
subscribed = TRUE;
continue;
case LOOKIP_REGISTER_DOWN:
subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
subscribed = TRUE;
continue;
case LOOKIP_END:
break;
default:
DBG1(DBG_CFG, "received unknown lookip command");
break;
}
}
else
{ {
if (len != 0) if (len != 0)
{ {
DBG1(DBG_CFG, "receiving lookip request failed: %s", DBG1(DBG_CFG, "receiving lookip request failed: %s",
strerror(errno)); strerror(errno));
} }
return TRUE;
}
switch (req.type)
{
case LOOKIP_LOOKUP:
query(this, fd, &req);
return FALSE;
case LOOKIP_DUMP:
query(this, fd, NULL);
return FALSE;
case LOOKIP_REGISTER_UP:
subscribe(this, fd, LOOKIP_NOTIFY_UP);
return FALSE;
case LOOKIP_REGISTER_DOWN:
subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
return FALSE;
case LOOKIP_END:
return TRUE;
default:
DBG1(DBG_CFG, "received unknown lookip command");
return TRUE;
}
}
/**
* Accept client connections, dispatch
*/
static job_requeue_t receive(private_lookip_socket_t *this)
{
struct sockaddr_un addr;
int fd, maxfd, len;
bool oldstate;
fd_set fds;
while (TRUE)
{
maxfd = build_fds(this, &fds);
oldstate = thread_cancelability(TRUE);
if (select(maxfd, &fds, NULL, NULL, NULL) <= 0)
{
thread_cancelability(oldstate);
DBG1(DBG_CFG, "selecting lookip sockets failed: %s",
strerror(errno));
break; break;
} }
break; thread_cancelability(oldstate);
}
if (!subscribed) if (FD_ISSET(this->socket, &fds))
{ /* don't close if we queued the fd */ { /* new connection, accept() */
close(fd); len = sizeof(addr);
} fd = accept(this->socket, (struct sockaddr*)&addr, &len);
if (fd != -1)
{
this->mutex->lock(this->mutex);
this->connected->insert_last(this->connected,
(void*)(uintptr_t)fd);
this->mutex->unlock(this->mutex);
} }
else else
{ {
DBG1(DBG_CFG, "accepting lookip connection failed: %s", DBG1(DBG_CFG, "accepting lookip connection failed: %s",
strerror(errno)); strerror(errno));
} }
continue;
}
fd = scan_fds(this, &fds);
if (fd == -1)
{
continue;
}
if (dispatch(this, fd))
{
this->mutex->lock(this->mutex);
this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL);
this->mutex->unlock(this->mutex);
if (!subscribed(this, fd))
{
close(fd);
}
}
}
return JOB_REQUEUE_FAIR; return JOB_REQUEUE_FAIR;
} }
@ -297,6 +408,7 @@ METHOD(lookip_socket_t, destroy, void,
private_lookip_socket_t *this) private_lookip_socket_t *this)
{ {
this->registered->destroy_function(this->registered, (void*)entry_destroy); this->registered->destroy_function(this->registered, (void*)entry_destroy);
this->connected->destroy(this->connected);
this->mutex->destroy(this->mutex); this->mutex->destroy(this->mutex);
close(this->socket); close(this->socket);
free(this); free(this);
@ -315,6 +427,7 @@ lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
}, },
.listener = listener, .listener = listener,
.registered = linked_list_create(), .registered = linked_list_create(),
.connected = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT), .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
); );