summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-07-25 19:54:39 +0200
committerdimitri staessens <[email protected]>2017-07-26 10:39:51 +0200
commit809abada865727ea986d69afcf2a9a3b00df560a (patch)
treee6cdc6bfba21e87be04df6d6fa62490813d94ce3 /src/irmd
parent36343a9c19fca9494881a9529b5fbbb0d51c1900 (diff)
downloadouroboros-809abada865727ea986d69afcf2a9a3b00df560a.tar.gz
ouroboros-809abada865727ea986d69afcf2a9a3b00df560a.zip
lib: Add threadpool manager
This adds a threadpool manager component in the library that is used in the IRMd and IPCPs. The threadpool manager now doesn't detach threads but does a join when they exit. This solves a data race in the previous implementation where some threads were not completely finished upon release of some resources.
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/main.c209
1 files changed, 26 insertions, 183 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index b72893ba..8b22bdef 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -36,6 +36,7 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/qos.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
#include <ouroboros/logs.h>
#include "utils.h"
@@ -99,18 +100,9 @@ struct irm {
struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */
int sockfd; /* UNIX socket */
- pthread_t * threadpool; /* pool of mainloop threads */
-
- struct bmp * thread_ids; /* ids for mainloop threads */
- size_t max_threads; /* max threads set by tpm */
- size_t threads; /* available mainloop threads */
- pthread_cond_t threads_cond; /* signal thread entry/exit */
- pthread_mutex_t threads_lock; /* mutex for threads/condvar */
-
enum irm_state state; /* state of the irmd */
pthread_rwlock_t state_lock; /* lock for the entire irmd */
- pthread_t tpm; /* threadpool manager */
pthread_t irm_sanitize; /* clean up irmd resources */
pthread_t shm_sanitize; /* keep track of rdrbuff use */
} irmd;
@@ -478,7 +470,7 @@ static int enroll_ipcp(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
if (ipcp_enroll(api, dst_name, &info) < 0) {
- log_err("Could not enroll IPCP.");
+ log_err("Could not enroll IPCP %d.", api);
return -1;
}
@@ -1426,16 +1418,6 @@ static void irm_fini(void)
if (irmd_get_state() != IRMD_NULL)
log_warn("Unsafe destroy.");
- pthread_mutex_lock(&irmd.threads_lock);
-
- if (irmd.thread_ids != NULL)
- bmp_destroy(irmd.thread_ids);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-
- if (irmd.threadpool != NULL)
- free(irmd.threadpool);
-
pthread_rwlock_wrlock(&irmd.flows_lock);
if (irmd.port_ids != NULL)
@@ -1509,8 +1491,8 @@ void irmd_sig_handler(int sig,
}
log_info("IRMd shutting down...");
-
irmd_set_state(IRMD_NULL);
+ tpm_stop();
break;
case SIGPIPE:
log_dbg("Ignored SIGPIPE.");
@@ -1692,55 +1674,11 @@ void * irm_sanitize(void * o)
}
}
-static void thread_inc(void)
-{
- pthread_mutex_lock(&irmd.threads_lock);
-
- ++irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
-static void thread_dec(void)
-{
- pthread_mutex_lock(&irmd.threads_lock);
-
- --irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
-static bool thread_check(void)
-{
- int ret;
-
- pthread_mutex_lock(&irmd.threads_lock);
-
- ret = irmd.threads > irmd.max_threads;
-
- pthread_mutex_unlock(&irmd.threads_lock);
-
- return ret;
-}
-
-static void thread_exit(ssize_t id)
-{
- pthread_mutex_lock(&irmd.threads_lock);
- bmp_release(irmd.thread_ids, id);
-
- --irmd.threads;
- pthread_cond_signal(&irmd.threads_cond);
-
- pthread_mutex_unlock(&irmd.threads_lock);
-}
-
void * mainloop(void * o)
{
uint8_t buf[IRM_MSG_BUF_SIZE];
- ssize_t id = (ssize_t) o;
+ (void) o;
while (true) {
#ifdef __FreeBSD__
@@ -1760,8 +1698,8 @@ void * mainloop(void * o)
struct timeval tv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
- if (irmd_get_state() != IRMD_RUNNING || thread_check()) {
- thread_exit(id);
+ if (irmd_get_state() != IRMD_RUNNING || tpm_check()) {
+ tpm_exit();
break;
}
@@ -1772,7 +1710,6 @@ void * mainloop(void * o)
if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0)
continue;
#endif
-
cli_sockfd = accept(irmd.sockfd, 0, 0);
if (cli_sockfd < 0)
continue;
@@ -1790,7 +1727,7 @@ void * mainloop(void * o)
if (irmd_get_state() != IRMD_RUNNING) {
close(cli_sockfd);
- thread_exit(id);
+ tpm_exit();
break;
}
@@ -1800,7 +1737,7 @@ void * mainloop(void * o)
continue;
}
- thread_dec();
+ tpm_dec();
if (msg->has_timeo_sec) {
assert(msg->has_timeo_nsec);
@@ -1929,7 +1866,7 @@ void * mainloop(void * o)
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1939,7 +1876,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1948,7 +1885,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
- thread_inc();
+ tpm_inc();
continue;
}
@@ -1963,70 +1900,7 @@ void * mainloop(void * o)
free(buffer.data);
close(cli_sockfd);
- thread_inc();
- }
-
- return (void *) 0;
-}
-
-void * threadpoolmgr(void * o)
-{
- pthread_attr_t pattr;
- struct timespec dl;
- struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
- (IRMD_TPM_TIMEOUT % 1000) * MILLION};
- (void) o;
-
- if (pthread_attr_init(&pattr))
- return (void *) -1;
-
- pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
-
- while (true) {
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
- if (irmd_get_state() != IRMD_RUNNING) {
- pthread_attr_destroy(&pattr);
- log_dbg("Waiting for threads to exit.");
- pthread_mutex_lock(&irmd.threads_lock);
- while (irmd.threads > 0)
- pthread_cond_wait(&irmd.threads_cond,
- &irmd.threads_lock);
- pthread_mutex_unlock(&irmd.threads_lock);
- log_dbg("Threadpool manager done.");
- break;
- }
-
- pthread_mutex_lock(&irmd.threads_lock);
-
- if (irmd.threads < IRMD_MIN_AV_THREADS) {
- log_dbg("Increasing threadpool.");
- irmd.max_threads = IRMD_MAX_AV_THREADS;
-
- while (irmd.threads < irmd.max_threads) {
- ssize_t id = bmp_allocate(irmd.thread_ids);
- if (!bmp_is_id_valid(irmd.thread_ids, id)) {
- log_warn("IRMd threadpool exhausted.");
- break;
- }
-
- if (pthread_create(&irmd.threadpool[id],
- &pattr, mainloop,
- (void *) id))
- log_warn("Failed to start new thread.");
- else
- ++irmd.threads;
- }
- }
-
- if (pthread_cond_timedwait(&irmd.threads_cond,
- &irmd.threads_lock,
- &dl) == ETIMEDOUT)
- if (irmd.threads > IRMD_MIN_AV_THREADS )
- --irmd.max_threads;
-
- pthread_mutex_unlock(&irmd.threads_lock);
+ tpm_inc();
}
return (void *) 0;
@@ -2035,7 +1909,6 @@ void * threadpoolmgr(void * o)
static int irm_init(void)
{
struct stat st;
- pthread_condattr_t cattr;
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
@@ -2058,24 +1931,6 @@ static int irm_init(void)
goto fail_flows_lock;
}
- if (pthread_mutex_init(&irmd.threads_lock, NULL)) {
- log_err("Failed to initialize mutex.");
- goto fail_threads_lock;
- }
-
- if (pthread_condattr_init(&cattr)) {
- log_err("Failed to initialize condattr.");
- goto fail_cattr;
- }
-
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&irmd.threads_cond, &cattr)) {
- log_err("Failed to initialize cond.");
- goto fail_threads_cond;
- }
-
list_head_init(&irmd.ipcps);
list_head_init(&irmd.api_table);
list_head_init(&irmd.apn_table);
@@ -2089,18 +1944,6 @@ static int irm_init(void)
goto fail_port_ids;
}
- irmd.thread_ids = bmp_create(IRMD_MAX_THREADS, 0);
- if (irmd.thread_ids == NULL) {
- log_err("Failed to thread thread_ids bitmap.");
- goto fail_thread_ids;
- }
-
- irmd.threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);
- if (irmd.threadpool == NULL) {
- log_err("Failed to malloc threadpool");
- goto fail_thrpool;
- }
-
if ((irmd.lf = lockfile_create()) == NULL) {
if ((irmd.lf = lockfile_open()) == NULL) {
log_err("Lockfile error.");
@@ -2155,8 +1998,6 @@ static int irm_init(void)
goto fail_rdrbuff;
}
- irmd.threads = 0;
- irmd.max_threads = IRMD_MIN_AV_THREADS;
irmd.state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2172,18 +2013,8 @@ fail_sock_path:
fail_stat:
lockfile_destroy(irmd.lf);
fail_lockfile:
- free(irmd.threadpool);
-fail_thrpool:
- bmp_destroy(irmd.thread_ids);
-fail_thread_ids:
bmp_destroy(irmd.port_ids);
fail_port_ids:
- pthread_cond_destroy(&irmd.threads_cond);
-fail_threads_cond:
- pthread_condattr_destroy(&cattr);
-fail_cattr:
- pthread_mutex_destroy(&irmd.threads_lock);
-fail_threads_lock:
pthread_rwlock_destroy(&irmd.flows_lock);
fail_flows_lock:
pthread_rwlock_destroy(&irmd.reg_lock);
@@ -2253,12 +2084,24 @@ int main(int argc,
exit(EXIT_FAILURE);
}
- pthread_create(&irmd.tpm, NULL, threadpoolmgr, NULL);
+ if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) {
+ log_fini();
+ exit(EXIT_FAILURE);
+ }
+
+ if (tpm_start()) {
+ tpm_fini();
+ log_fini();
+ exit(EXIT_FAILURE);
+ }
pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb);
- pthread_join(irmd.tpm, NULL);
+ /* tpm_stop() called from sighandler */
+
+ tpm_fini();
+
pthread_join(irmd.irm_sanitize, NULL);
pthread_join(irmd.shm_sanitize, NULL);