diff options
author | dimitri staessens <[email protected]> | 2017-08-15 19:30:03 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-08-16 08:18:03 +0200 |
commit | c995538b1c6483996c979df62feee3d79acd0e45 (patch) | |
tree | 67fb33f13b02be7b9b65b47957ce5065ebb97411 /src/ipcpd | |
parent | 095c2414425952836e97d88a6dde6f4415725c68 (diff) | |
download | ouroboros-c995538b1c6483996c979df62feee3d79acd0e45.tar.gz ouroboros-c995538b1c6483996c979df62feee3d79acd0e45.zip |
irmd, ipcpd: Listen on a dedicated accept() thread
The IRMd and IPCPs will now call accept on their command sockets from
a single thread that will dispatch work to the other threads.
This solves a problem on OS X and FreeBSD where accept() doesn't time
out when setting SO_RCVTIMEO on the socket. Calling kqueue or select()
on that socket to wait for events before calling accept() didn't solve
it since select() or kqueue() might wake up multiple threads, with the
non-working threads again blocked on the accept() on shutdown.
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/ipcp.c | 199 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 8 |
2 files changed, 146 insertions, 61 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a8ff4c94..c5769f9e 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -91,67 +91,115 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } +static void * acceptloop(void * o) +{ + int csockfd; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; +#if defined(__FreeBSD__) || defined(__APPLE__) + fd_set fds; + struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; +#endif + (void) o; + + while (ipcp_get_state() != IPCP_SHUTDOWN && + ipcp_get_state() != IPCP_NULL) { + ssize_t count; +#if defined(__FreeBSD__) || defined(__APPLE__) + FD_ZERO(&fds); + FD_SET(ipcpi.sockfd, &fds); + if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) + continue; +#endif + csockfd = accept(ipcpi.sockfd, 0, 0); + if (csockfd < 0) + continue; + + if (setsockopt(csockfd, SOL_SOCKET, SO_RCVTIMEO, + (void *) &tv, sizeof(tv))) + log_warn("Failed to set timeout on socket."); + + pthread_mutex_lock(&ipcpi.cmd_lock); + + assert(ipcpi.csockfd == -1); + + count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); + if (count <= 0) { + pthread_mutex_unlock(&ipcpi.cmd_lock); + log_err("Failed to read from socket."); + close(csockfd); + continue; + } + + ipcpi.cmd_len = count; + ipcpi.csockfd = csockfd; + + pthread_cond_signal(&ipcpi.cmd_cond); + + while (ipcpi.csockfd != -1) + pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + } + + return (void *) 0; +} + static void * mainloop(void * o) { - int lsockfd; - uint8_t buf[IPCP_MSG_BUF_SIZE]; - ssize_t count; + int sfd; buffer_t buffer; struct ipcp_config conf; struct dif_info info; - ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; - ipcp_msg_t ret_msg = IPCP_MSG__INIT; - dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; - struct timeval ltv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; - + struct timespec dl; + struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), + (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; (void) o; while (true) { -#ifdef __FreeBSD__ - fd_set fds; - struct timeval timeout = {(IPCP_ACCEPT_TIMEOUT / 1000), - (IPCP_ACCEPT_TIMEOUT % 1000) * 1000}; -#endif - int fd = -1; - - if (ipcp_get_state() == IPCP_SHUTDOWN || - ipcp_get_state() == IPCP_NULL || - tpm_check()) { - tpm_exit(); - break; - } + int ret = 0; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; + dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; + int fd = -1; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; -#ifdef __FreeBSD__ - FD_ZERO(&fds); - FD_SET(ipcpi.sockfd, &fds); - if (select(ipcpi.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) - continue; -#endif - lsockfd = accept(ipcpi.sockfd, 0, 0); - if (lsockfd < 0) - continue; - if (setsockopt(lsockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) <v, sizeof(ltv))) - log_warn("Failed to set timeout on socket."); + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); - count = read(lsockfd, buf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { - log_err("Failed to read from socket"); - close(lsockfd); + pthread_mutex_lock(&ipcpi.cmd_lock); + + while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, + &ipcpi.cmd_lock, + &dl); + + sfd = ipcpi.csockfd; + ipcpi.csockfd = -1; + + if (sfd == -1) { + pthread_mutex_unlock(&ipcpi.cmd_lock); + if (tpm_check()) { + close(sfd); + break; + } continue; } - msg = ipcp_msg__unpack(NULL, count, buf); + pthread_cond_broadcast(&ipcpi.cmd_cond); + + msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - close(lsockfd); + pthread_mutex_unlock(&ipcpi.cmd_lock); + close(sfd); continue; } + pthread_mutex_unlock(&ipcpi.cmd_lock); + tpm_dec(); switch (msg->code) { @@ -398,7 +446,7 @@ static void * mainloop(void * o) buffer.len = ipcp_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { log_err("Failed to pack reply message"); - close(lsockfd); + close(sfd); tpm_inc(); continue; } @@ -406,27 +454,29 @@ static void * mainloop(void * o) buffer.data = malloc(buffer.len); if (buffer.data == NULL) { log_err("Failed to create reply buffer."); - close(lsockfd); + close(sfd); tpm_inc(); continue; } ipcp_msg__pack(&ret_msg, buffer.data); - if (write(lsockfd, buffer.data, buffer.len) == -1) { + if (write(sfd, buffer.data, buffer.len) == -1) { log_err("Failed to send reply message"); free(buffer.data); - close(lsockfd); + close(sfd); tpm_inc(); continue; } free(buffer.data); - close(lsockfd); + close(sfd); tpm_inc(); } + tpm_exit(); + return (void *) 0; } @@ -526,22 +576,30 @@ int ipcp_init(int argc, goto fail_alloc_lock; } - if (pthread_cond_init(&ipcpi.alloc_cond, NULL)) { + if (pthread_cond_init(&ipcpi.alloc_cond, &cattr)) { log_err("Failed to init convar."); goto fail_alloc_cond; } - ipcpi.alloc_id = -1; + if (pthread_mutex_init(&ipcpi.cmd_lock, NULL)) { + log_err("Failed to init mutex."); + goto fail_cmd_lock; + } - if (type == IPCP_NORMAL) { - pthread_condattr_destroy(&cattr); - return 0; + if (pthread_cond_init(&ipcpi.cmd_cond, &cattr)) { + log_err("Failed to init convar."); + goto fail_cmd_cond; } - ipcpi.shim_data = shim_data_create(); - if (ipcpi.shim_data == NULL) { - ret = -ENOMEM; - goto fail_shim_data; + ipcpi.alloc_id = -1; + ipcpi.csockfd = -1; + + if (type != IPCP_NORMAL) { + ipcpi.shim_data = shim_data_create(); + if (ipcpi.shim_data == NULL) { + ret = -ENOMEM; + goto fail_shim_data; + } } pthread_condattr_destroy(&cattr); @@ -549,6 +607,10 @@ int ipcp_init(int argc, return 0; fail_shim_data: + pthread_cond_destroy(&ipcpi.cmd_cond); + fail_cmd_cond: + pthread_mutex_destroy(&ipcpi.cmd_lock); + fail_cmd_lock: pthread_cond_destroy(&ipcpi.alloc_cond); fail_alloc_cond: pthread_mutex_destroy(&ipcpi.alloc_lock); @@ -590,26 +652,39 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - return -1; + goto fail_tpm_init; - if (tpm_start()) { - tpm_fini(); - return -1; - } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + + if (tpm_start()) + goto fail_tpm_start; ipcp_set_state(IPCP_INIT); + if (pthread_create(&ipcpi.acceptor, NULL, acceptloop, NULL)) { + log_err("Failed to create acceptor thread."); + ipcp_set_state(IPCP_NULL); + goto fail_acceptor; + } + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); return 0; + + fail_acceptor: + tpm_stop(); + fail_tpm_start: + tpm_fini(); + fail_tpm_init: + return -1; } void ipcp_shutdown() { tpm_fini(); + pthread_join(ipcpi.acceptor, NULL); + log_info("IPCP %d shutting down.", getpid()); } @@ -627,6 +702,8 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); + pthread_cond_destroy(&ipcpi.cmd_cond); + pthread_mutex_destroy(&ipcpi.cmd_lock); log_info("IPCP %d out.", getpid()); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9ce3ed77..d2ad7cde 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -26,6 +26,7 @@ #include <ouroboros/config.h> #include <ouroboros/hash.h> #include <ouroboros/ipcp.h> +#include <ouroboros/sockets.h> #include "shim-data.h" @@ -89,10 +90,17 @@ struct ipcp { int sockfd; char * sock_path; + uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t cmd_len; + int csockfd; + pthread_cond_t cmd_cond; + pthread_mutex_t cmd_lock; + int alloc_id; pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; + pthread_t acceptor; } ipcpi; int ipcp_init(int argc, |