From ff5063ad0e7902ce59864a466bd9d8d606d788e4 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 24 Sep 2017 14:34:03 +0200 Subject: ipcpd: Add threadpool manager to DHT This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads. --- src/ipcpd/ipcp.c | 99 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 54 insertions(+), 45 deletions(-) (limited to 'src/ipcpd/ipcp.c') diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 85d543da..513c638a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -36,7 +36,6 @@ #include #include #include -#include #include "ipcp.h" @@ -44,6 +43,14 @@ #include #include +struct cmd { + struct list_head next; + + uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t len; + int fd; +}; + void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -107,7 +114,8 @@ static void * acceptloop(void * o) while (ipcp_get_state() != IPCP_SHUTDOWN && ipcp_get_state() != IPCP_NULL) { - ssize_t count; + struct cmd * cmd; + #if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(ipcpi.sockfd, &fds); @@ -122,25 +130,28 @@ static void * acceptloop(void * o) (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - pthread_mutex_lock(&ipcpi.cmd_lock); + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Out of memory"); + break; + } - assert(ipcpi.csockfd == -1); + pthread_mutex_lock(&ipcpi.cmd_lock); - count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { + cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE); + if (cmd->len <= 0) { pthread_mutex_unlock(&ipcpi.cmd_lock); log_err("Failed to read from socket."); close(csockfd); + free(cmd); continue; } - ipcpi.cmd_len = count; - ipcpi.csockfd = csockfd; + cmd->fd = csockfd; - pthread_cond_signal(&ipcpi.cmd_cond); + list_add(&cmd->next, &ipcpi.cmds); - while (ipcpi.csockfd != -1) - pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock); + pthread_cond_signal(&ipcpi.cmd_cond); pthread_mutex_unlock(&ipcpi.cmd_lock); } @@ -159,13 +170,15 @@ static void * mainloop(void * o) struct timespec dl; struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; - (void) o; + + (void) o; while (true) { int ret = 0; ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; int fd = -1; + struct cmd * cmd; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -174,33 +187,34 @@ static void * mainloop(void * o) pthread_mutex_lock(&ipcpi.cmd_lock); - while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) + while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, &ipcpi.cmd_lock, &dl); - sfd = ipcpi.csockfd; - ipcpi.csockfd = -1; - - if (sfd == -1) { + if (ret == -ETIMEDOUT) { pthread_mutex_unlock(&ipcpi.cmd_lock); - if (tpm_check()) + if (tpm_check(ipcpi.tpm)) break; continue; } - pthread_cond_signal(&ipcpi.acc_cond); + cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + + msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); + sfd = cmd->fd; + + free(cmd); - msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - pthread_mutex_unlock(&ipcpi.cmd_lock); close(sfd); continue; } - pthread_mutex_unlock(&ipcpi.cmd_lock); - - tpm_dec(); + tpm_dec(ipcpi.tpm); switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -474,7 +488,7 @@ static void * mainloop(void * o) if (buffer.len == 0) { log_err("Failed to pack reply message"); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -482,7 +496,7 @@ static void * mainloop(void * o) if (buffer.data == NULL) { log_err("Failed to create reply buffer."); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -492,17 +506,17 @@ static void * mainloop(void * o) log_err("Failed to send reply message"); free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); } - tpm_exit(); + tpm_exit(ipcpi.tpm); return (void *) 0; } @@ -617,20 +631,14 @@ int ipcp_init(int argc, goto fail_cmd_cond; } - if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) { - log_err("Failed to init convar."); - goto fail_acc_cond; - } + list_head_init(&ipcpi.cmds); ipcpi.alloc_id = -1; - ipcpi.csockfd = -1; pthread_condattr_destroy(&cattr); return 0; - fail_acc_cond: - pthread_cond_destroy(&ipcpi.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&ipcpi.cmd_lock); fail_cmd_lock: @@ -675,12 +683,14 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - goto fail_tpm_init; + ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, + mainloop, NULL); + if (ipcpi.tpm == NULL) + goto fail_tpm_create; pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_start()) + if (tpm_start(ipcpi.tpm)) goto fail_tpm_start; ipcp_set_state(IPCP_INIT); @@ -696,18 +706,18 @@ int ipcp_boot() return 0; fail_acceptor: - tpm_stop(); + tpm_stop(ipcpi.tpm); fail_tpm_start: - tpm_fini(); - fail_tpm_init: + tpm_destroy(ipcpi.tpm); + fail_tpm_create: return -1; } void ipcp_shutdown() { pthread_join(ipcpi.acceptor, NULL); - tpm_stop(); - tpm_fini(); + tpm_stop(ipcpi.tpm); + tpm_destroy(ipcpi.tpm); log_info("IPCP %d shutting down.", getpid()); } @@ -724,7 +734,6 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); - pthread_cond_destroy(&ipcpi.acc_cond); pthread_cond_destroy(&ipcpi.cmd_cond); pthread_mutex_destroy(&ipcpi.cmd_lock); -- cgit v1.2.3