summaryrefslogtreecommitdiff
path: root/src/ipcpd/ipcp.c
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-07-25 19:54:39 +0200
committerdimitri staessens <[email protected]>2017-07-26 10:39:51 +0200
commit809abada865727ea986d69afcf2a9a3b00df560a (patch)
treee6cdc6bfba21e87be04df6d6fa62490813d94ce3 /src/ipcpd/ipcp.c
parent36343a9c19fca9494881a9529b5fbbb0d51c1900 (diff)
downloadouroboros-809abada865727ea986d69afcf2a9a3b00df560a.tar.gz
ouroboros-809abada865727ea986d69afcf2a9a3b00df560a.zip
lib: Add threadpool manager
This adds a threadpool manager component in the library that is used in the IRMd and IPCPs. The threadpool manager now doesn't detach threads but does a join when they exit. This solves a data race in the previous implementation where some threads were not completely finished upon release of some resources.
Diffstat (limited to 'src/ipcpd/ipcp.c')
-rw-r--r--src/ipcpd/ipcp.c181
1 files changed, 21 insertions, 160 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 0d6d850f..a56e46f7 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -31,6 +31,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
+#include <ouroboros/tpm.h>
#include "ipcp.h"
@@ -56,6 +57,8 @@ void ipcp_sig_handler(int sig,
if (ipcp_get_state() == IPCP_OPERATIONAL)
ipcp_set_state(IPCP_SHUTDOWN);
}
+
+ tpm_stop();
default:
return;
}
@@ -87,51 +90,7 @@ void ipcp_hash_str(char * buf,
buf[2 * i] = '\0';
}
-static void thread_inc(void)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- ++ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static void thread_dec(void)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- --ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static bool thread_check(void)
-{
- int ret;
-
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- ret = ipcpi.threads > ipcpi.max_threads;
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-
- return ret;
-}
-
-static void thread_exit(ssize_t id)
-{
- pthread_mutex_lock(&ipcpi.threads_lock);
- bmp_release(ipcpi.thread_ids, id);
-
- --ipcpi.threads;
- pthread_cond_signal(&ipcpi.threads_cond);
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
-}
-
-static void * ipcp_main_loop(void * o)
+static void * mainloop(void * o)
{
int lsockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
@@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o)
struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- ssize_t id = (ssize_t) o;
+ (void) o;
while (true) {
#ifdef __FreeBSD__
@@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o)
if (ipcp_get_state() == IPCP_SHUTDOWN ||
ipcp_get_state() == IPCP_NULL ||
- thread_check()) {
- thread_exit(id);
+ tpm_check()) {
+ tpm_exit();
break;
}
@@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o)
continue;
}
- thread_dec();
+ tpm_dec();
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
@@ -408,7 +367,7 @@ static void * ipcp_main_loop(void * o)
if (buffer.len == 0) {
log_err("Failed to pack reply message");
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -416,7 +375,7 @@ static void * ipcp_main_loop(void * o)
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -426,14 +385,14 @@ static void * ipcp_main_loop(void * o)
log_err("Failed to send reply message");
free(buffer.data);
close(lsockfd);
- thread_inc();
+ tpm_inc();
continue;
}
free(buffer.data);
close(lsockfd);
- thread_inc();
+ tpm_inc();
}
return (void *) 0;
@@ -496,15 +455,6 @@ int ipcp_init(int argc,
ipcpi.state = IPCP_NULL;
ipcpi.shim_data = NULL;
- ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS);
- if (ipcpi.threadpool == NULL) {
- ret = -ENOMEM;
- goto fail_thr;
- }
-
- ipcpi.threads = 0;
- ipcpi.max_threads = IPCP_MIN_AV_THREADS;
-
ipcpi.sock_path = ipcp_sock_path(getpid());
if (ipcpi.sock_path == NULL)
goto fail_sock_path;
@@ -526,11 +476,6 @@ int ipcp_init(int argc,
goto fail_state_mtx;
}
- if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) {
- log_err("Could not create mutex.");
- goto fail_thread_lock;
- }
-
if (pthread_condattr_init(&cattr)) {
log_err("Could not create condattr.");
goto fail_cond_attr;
@@ -544,17 +489,6 @@ int ipcp_init(int argc,
goto fail_state_cond;
}
- if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) {
- log_err("Could not init condvar.");
- goto fail_thread_cond;
- }
-
- ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0);
- if (ipcpi.thread_ids == NULL) {
- log_err("Could not init condvar.");
- goto fail_bmp;
- }
-
if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) {
log_err("Failed to init mutex.");
goto fail_alloc_lock;
@@ -587,94 +521,21 @@ int ipcp_init(int argc,
fail_alloc_cond:
pthread_mutex_destroy(&ipcpi.alloc_lock);
fail_alloc_lock:
- bmp_destroy(ipcpi.thread_ids);
- fail_bmp:
- pthread_cond_destroy(&ipcpi.threads_cond);
- fail_thread_cond:
pthread_cond_destroy(&ipcpi.state_cond);
fail_state_cond:
pthread_condattr_destroy(&cattr);
fail_cond_attr:
- pthread_mutex_destroy(&ipcpi.threads_lock);
- fail_thread_lock:
pthread_mutex_destroy(&ipcpi.state_mtx);
fail_state_mtx:
close(ipcpi.sockfd);
fail_serv_sock:
free(ipcpi.sock_path);
fail_sock_path:
- free(ipcpi.threadpool);
- fail_thr:
ouroboros_fini();
return ret;
}
-void * threadpoolmgr(void * o)
-{
- pthread_attr_t pattr;
- struct timespec dl;
- struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
- (IRMD_TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
-
- if (pthread_attr_init(&pattr))
- return (void *) -1;
-
- pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
-
- while (true) {
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
- if (ipcp_get_state() == IPCP_SHUTDOWN ||
- ipcp_get_state() == IPCP_NULL) {
- pthread_attr_destroy(&pattr);
- log_dbg("Waiting for threads to exit.");
- pthread_mutex_lock(&ipcpi.threads_lock);
- while (ipcpi.threads > 0)
- pthread_cond_wait(&ipcpi.threads_cond,
- &ipcpi.threads_lock);
- pthread_mutex_unlock(&ipcpi.threads_lock);
-
- log_dbg("Threadpool manager done.");
- break;
- }
-
- pthread_mutex_lock(&ipcpi.threads_lock);
-
- if (ipcpi.threads < IPCP_MIN_AV_THREADS) {
- log_dbg("Increasing threadpool.");
- ipcpi.max_threads = IPCP_MAX_AV_THREADS;
-
- while (ipcpi.threads < ipcpi.max_threads) {
- ssize_t id = bmp_allocate(ipcpi.thread_ids);
- if (!bmp_is_id_valid(ipcpi.thread_ids, id)) {
- log_warn("IPCP threadpool exhausted.");
- break;
- }
-
- if (pthread_create(&ipcpi.threadpool[id],
- &pattr, ipcp_main_loop,
- (void *) id))
- log_warn("Failed to start new thread.");
- else
- ++ipcpi.threads;
- }
- }
-
- if (pthread_cond_timedwait(&ipcpi.threads_cond,
- &ipcpi.threads_lock,
- &dl) == ETIMEDOUT)
- if (ipcpi.threads > IPCP_MIN_AV_THREADS)
- --ipcpi.max_threads;
-
- pthread_mutex_unlock(&ipcpi.threads_lock);
- }
-
- return (void *) 0;
-}
-
int ipcp_boot()
{
struct sigaction sig_act;
@@ -699,9 +560,15 @@ int ipcp_boot()
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- ipcp_set_state(IPCP_INIT);
+ if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
+ return -1;
+
+ if (tpm_start()) {
+ tpm_fini();
+ return -1;
+ }
- pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL);
+ ipcp_set_state(IPCP_INIT);
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
@@ -710,8 +577,7 @@ int ipcp_boot()
void ipcp_shutdown()
{
- pthread_join(ipcpi.tpm, NULL);
-
+ tpm_fini();
log_info("IPCP %d shutting down.", getpid());
}
@@ -721,16 +587,11 @@ void ipcp_fini()
if (unlink(ipcpi.sock_path))
log_warn("Could not unlink %s.", ipcpi.sock_path);
- bmp_destroy(ipcpi.thread_ids);
-
free(ipcpi.sock_path);
- free(ipcpi.threadpool);
shim_data_destroy(ipcpi.shim_data);
pthread_cond_destroy(&ipcpi.state_cond);
- pthread_cond_destroy(&ipcpi.threads_cond);
- pthread_mutex_destroy(&ipcpi.threads_lock);
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);