diff options
author | dimitri staessens <[email protected]> | 2017-07-25 19:54:39 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-07-26 10:39:51 +0200 |
commit | 809abada865727ea986d69afcf2a9a3b00df560a (patch) | |
tree | e6cdc6bfba21e87be04df6d6fa62490813d94ce3 /src/ipcpd/ipcp.c | |
parent | 36343a9c19fca9494881a9529b5fbbb0d51c1900 (diff) | |
download | ouroboros-809abada865727ea986d69afcf2a9a3b00df560a.tar.gz ouroboros-809abada865727ea986d69afcf2a9a3b00df560a.zip |
lib: Add threadpool manager
This adds a threadpool manager component in the library that is used
in the IRMd and IPCPs. The threadpool manager now doesn't detach
threads but does a join when they exit. This solves a data race in the
previous implementation where some threads were not completely finished
upon release of some resources.
Diffstat (limited to 'src/ipcpd/ipcp.c')
-rw-r--r-- | src/ipcpd/ipcp.c | 181 |
1 files changed, 21 insertions, 160 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 0d6d850f..a56e46f7 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -31,6 +31,7 @@ #include <ouroboros/dev.h> #include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> +#include <ouroboros/tpm.h> #include "ipcp.h" @@ -56,6 +57,8 @@ void ipcp_sig_handler(int sig, if (ipcp_get_state() == IPCP_OPERATIONAL) ipcp_set_state(IPCP_SHUTDOWN); } + + tpm_stop(); default: return; } @@ -87,51 +90,7 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } -static void thread_inc(void) -{ - pthread_mutex_lock(&ipcpi.threads_lock); - - ++ipcpi.threads; - pthread_cond_signal(&ipcpi.threads_cond); - - pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void thread_dec(void) -{ - pthread_mutex_lock(&ipcpi.threads_lock); - - --ipcpi.threads; - pthread_cond_signal(&ipcpi.threads_cond); - - pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static bool thread_check(void) -{ - int ret; - - pthread_mutex_lock(&ipcpi.threads_lock); - - ret = ipcpi.threads > ipcpi.max_threads; - - pthread_mutex_unlock(&ipcpi.threads_lock); - - return ret; -} - -static void thread_exit(ssize_t id) -{ - pthread_mutex_lock(&ipcpi.threads_lock); - bmp_release(ipcpi.thread_ids, id); - - --ipcpi.threads; - pthread_cond_signal(&ipcpi.threads_cond); - - pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void * ipcp_main_loop(void * o) +static void * mainloop(void * o) { int lsockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; @@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o) struct timeval ltv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; - ssize_t id = (ssize_t) o; + (void) o; while (true) { #ifdef __FreeBSD__ @@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o) if (ipcp_get_state() == IPCP_SHUTDOWN || ipcp_get_state() == IPCP_NULL || - thread_check()) { - thread_exit(id); + tpm_check()) { + tpm_exit(); break; } @@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o) continue; } - thread_dec(); + tpm_dec(); switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -408,7 +367,7 @@ static void * ipcp_main_loop(void * o) if (buffer.len == 0) { log_err("Failed to pack reply message"); close(lsockfd); - thread_inc(); + tpm_inc(); continue; } @@ -416,7 +375,7 @@ static void * ipcp_main_loop(void * o) if (buffer.data == NULL) { log_err("Failed to create reply buffer."); close(lsockfd); - thread_inc(); + tpm_inc(); continue; } @@ -426,14 +385,14 @@ static void * ipcp_main_loop(void * o) log_err("Failed to send reply message"); free(buffer.data); close(lsockfd); - thread_inc(); + tpm_inc(); continue; } free(buffer.data); close(lsockfd); - thread_inc(); + tpm_inc(); } return (void *) 0; @@ -496,15 +455,6 @@ int ipcp_init(int argc, ipcpi.state = IPCP_NULL; ipcpi.shim_data = NULL; - ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS); - if (ipcpi.threadpool == NULL) { - ret = -ENOMEM; - goto fail_thr; - } - - ipcpi.threads = 0; - ipcpi.max_threads = IPCP_MIN_AV_THREADS; - ipcpi.sock_path = ipcp_sock_path(getpid()); if (ipcpi.sock_path == NULL) goto fail_sock_path; @@ -526,11 +476,6 @@ int ipcp_init(int argc, goto fail_state_mtx; } - if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) { - log_err("Could not create mutex."); - goto fail_thread_lock; - } - if (pthread_condattr_init(&cattr)) { log_err("Could not create condattr."); goto fail_cond_attr; @@ -544,17 +489,6 @@ int ipcp_init(int argc, goto fail_state_cond; } - if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) { - log_err("Could not init condvar."); - goto fail_thread_cond; - } - - ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0); - if (ipcpi.thread_ids == NULL) { - log_err("Could not init condvar."); - goto fail_bmp; - } - if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) { log_err("Failed to init mutex."); goto fail_alloc_lock; @@ -587,94 +521,21 @@ int ipcp_init(int argc, fail_alloc_cond: pthread_mutex_destroy(&ipcpi.alloc_lock); fail_alloc_lock: - bmp_destroy(ipcpi.thread_ids); - fail_bmp: - pthread_cond_destroy(&ipcpi.threads_cond); - fail_thread_cond: pthread_cond_destroy(&ipcpi.state_cond); fail_state_cond: pthread_condattr_destroy(&cattr); fail_cond_attr: - pthread_mutex_destroy(&ipcpi.threads_lock); - fail_thread_lock: pthread_mutex_destroy(&ipcpi.state_mtx); fail_state_mtx: close(ipcpi.sockfd); fail_serv_sock: free(ipcpi.sock_path); fail_sock_path: - free(ipcpi.threadpool); - fail_thr: ouroboros_fini(); return ret; } -void * threadpoolmgr(void * o) -{ - pthread_attr_t pattr; - struct timespec dl; - struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), - (IRMD_TPM_TIMEOUT % 1000) * MILLION}; - (void) o; - - if (pthread_attr_init(&pattr)) - return (void *) -1; - - pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); - - while (true) { - clock_gettime(PTHREAD_COND_CLOCK, &dl); - ts_add(&dl, &to, &dl); - - if (ipcp_get_state() == IPCP_SHUTDOWN || - ipcp_get_state() == IPCP_NULL) { - pthread_attr_destroy(&pattr); - log_dbg("Waiting for threads to exit."); - pthread_mutex_lock(&ipcpi.threads_lock); - while (ipcpi.threads > 0) - pthread_cond_wait(&ipcpi.threads_cond, - &ipcpi.threads_lock); - pthread_mutex_unlock(&ipcpi.threads_lock); - - log_dbg("Threadpool manager done."); - break; - } - - pthread_mutex_lock(&ipcpi.threads_lock); - - if (ipcpi.threads < IPCP_MIN_AV_THREADS) { - log_dbg("Increasing threadpool."); - ipcpi.max_threads = IPCP_MAX_AV_THREADS; - - while (ipcpi.threads < ipcpi.max_threads) { - ssize_t id = bmp_allocate(ipcpi.thread_ids); - if (!bmp_is_id_valid(ipcpi.thread_ids, id)) { - log_warn("IPCP threadpool exhausted."); - break; - } - - if (pthread_create(&ipcpi.threadpool[id], - &pattr, ipcp_main_loop, - (void *) id)) - log_warn("Failed to start new thread."); - else - ++ipcpi.threads; - } - } - - if (pthread_cond_timedwait(&ipcpi.threads_cond, - &ipcpi.threads_lock, - &dl) == ETIMEDOUT) - if (ipcpi.threads > IPCP_MIN_AV_THREADS) - --ipcpi.max_threads; - - pthread_mutex_unlock(&ipcpi.threads_lock); - } - - return (void *) 0; -} - int ipcp_boot() { struct sigaction sig_act; @@ -699,9 +560,15 @@ int ipcp_boot() pthread_sigmask(SIG_BLOCK, &sigset, NULL); - ipcp_set_state(IPCP_INIT); + if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) + return -1; + + if (tpm_start()) { + tpm_fini(); + return -1; + } - pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL); + ipcp_set_state(IPCP_INIT); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); @@ -710,8 +577,7 @@ int ipcp_boot() void ipcp_shutdown() { - pthread_join(ipcpi.tpm, NULL); - + tpm_fini(); log_info("IPCP %d shutting down.", getpid()); } @@ -721,16 +587,11 @@ void ipcp_fini() if (unlink(ipcpi.sock_path)) log_warn("Could not unlink %s.", ipcpi.sock_path); - bmp_destroy(ipcpi.thread_ids); - free(ipcpi.sock_path); - free(ipcpi.threadpool); shim_data_destroy(ipcpi.shim_data); pthread_cond_destroy(&ipcpi.state_cond); - pthread_cond_destroy(&ipcpi.threads_cond); - pthread_mutex_destroy(&ipcpi.threads_lock); pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); |