diff options
author | dimitri staessens <[email protected]> | 2017-08-12 22:29:04 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-08-12 23:01:11 +0200 |
commit | 7729888c3fe454733759903a56c0e3e82ac3f31b (patch) | |
tree | 1fdde62b7a8b16aead8bcfa39d0edd2cc8bfd63a /src/lib | |
parent | 2b42b1e1121dfd715a78502a3652d326330b8160 (diff) | |
download | ouroboros-7729888c3fe454733759903a56c0e3e82ac3f31b.tar.gz ouroboros-7729888c3fe454733759903a56c0e3e82ac3f31b.zip |
lib: Fix instability in threadpool manager
The threadpool manager now tracks threads to prevent cyclic behaviour
where too many threads shut down and the TPM responds with creating
additional threads.
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/tpm.c | 88 |
1 files changed, 57 insertions, 31 deletions
diff --git a/src/lib/tpm.c b/src/lib/tpm.c index 8298eeb5..f45744ee 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -29,6 +29,7 @@ #include <pthread.h> #include <stdlib.h> +#include <assert.h> #define TPM_TIMEOUT 1000 @@ -36,6 +37,7 @@ struct pthr_el { struct list_head next; bool join; + bool kill; pthread_t thr; }; @@ -49,8 +51,8 @@ enum tpm_state { struct { size_t min; size_t inc; - size_t max; size_t cur; + size_t wrk; void * (* func)(void *); @@ -71,9 +73,14 @@ static void tpm_join(void) list_for_each_safe(p, h, &tpm.pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (tpm.state != TPM_RUNNING) + if (tpm.state != TPM_RUNNING) { + if (!e->kill) { + e->kill = true; + --tpm.cur; + } while (!e->join) pthread_cond_wait(&tpm.cond, &tpm.lock); + } if (e->join) { pthread_join(e->thr, NULL); @@ -83,6 +90,37 @@ static void tpm_join(void) } } +static struct pthr_el * tpm_pthr_el(pthread_t thr) +{ + struct list_head * p; + struct pthr_el * e; + + list_for_each(p, &tpm.pool) { + e = list_entry(p, struct pthr_el, next); + if (e->thr == thr) + return e; + + } + + assert(false); + + return NULL; +} + +static void tpm_kill(void) +{ + struct list_head * p; + + list_for_each(p, &tpm.pool) { + struct pthr_el * e = list_entry(p, struct pthr_el, next); + if (!e->kill) { + e->kill = true; + --tpm.cur; + return; + } + } +} + static void * tpmgr(void * o) { struct timespec dl; @@ -96,39 +134,40 @@ static void * tpmgr(void * o) pthread_mutex_lock(&tpm.lock); - tpm_join(); - if (tpm.state != TPM_RUNNING) { - tpm.max = 0; tpm_join(); pthread_mutex_unlock(&tpm.lock); break; } - if (tpm.cur < tpm.min) { - tpm.max = tpm.inc; + tpm_join(); - while (tpm.cur < tpm.max) { + if (tpm.cur - tpm.wrk < tpm.min) { + size_t i; + for (i = 0; i < tpm.inc; ++i) { struct pthr_el * e = malloc(sizeof(*e)); if (e == NULL) break; e->join = false; + e->kill = false; if (pthread_create(&e->thr, NULL, tpm.func, NULL)) { free(e); - } else { - list_add(&e->next, &tpm.pool); - ++tpm.cur; + break; } + + list_add(&e->next, &tpm.pool); } + + tpm.cur += tpm.inc; } if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) == ETIMEDOUT) - if (tpm.cur > tpm.min ) - --tpm.max; + if (tpm.cur > tpm.min) + tpm_kill(); pthread_mutex_unlock(&tpm.lock); } @@ -162,8 +201,8 @@ int tpm_init(size_t min, tpm.func = func; tpm.min = min; tpm.inc = inc; - tpm.max = 0; tpm.cur = 0; + tpm.wrk = 0; return 0; @@ -214,7 +253,7 @@ bool tpm_check(void) pthread_mutex_lock(&tpm.lock); - ret = tpm.cur > tpm.max; + ret = tpm_pthr_el(pthread_self())->kill; pthread_mutex_unlock(&tpm.lock); @@ -225,7 +264,7 @@ void tpm_inc(void) { pthread_mutex_lock(&tpm.lock); - ++tpm.cur; + --tpm.wrk; pthread_mutex_unlock(&tpm.lock); } @@ -234,7 +273,7 @@ void tpm_dec(void) { pthread_mutex_lock(&tpm.lock); - --tpm.cur; + ++tpm.wrk; pthread_cond_signal(&tpm.cond); @@ -243,22 +282,9 @@ void tpm_dec(void) void tpm_exit(void) { - struct list_head * p; - pthread_t id; - - id = pthread_self(); - pthread_mutex_lock(&tpm.lock); - --tpm.cur; - - list_for_each(p, &tpm.pool) { - struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (e->thr == id) { - e->join = true; - break; - } - } + tpm_pthr_el(pthread_self())->join = true; pthread_cond_signal(&tpm.cond); |