diff options
author | dimitri staessens <[email protected]> | 2017-04-01 12:31:05 +0000 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2017-04-01 12:31:05 +0000 |
commit | 2725780520d0e2c6a2c49ac8e2124b5088cbe1bb (patch) | |
tree | 833a7f4133fb3f3f8d746343cff6fb9fc40f7829 /src | |
parent | d9f3619d791fef7d79127556304a4aa4f1cda50a (diff) | |
parent | c72634b5d921bc06d8e06afb2a60a05a1acb7ee2 (diff) | |
download | ouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.tar.gz ouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.zip |
Merged in dstaesse/ouroboros/be-irmd-threadpool (pull request #448)
Be irmd threadpool
Diffstat (limited to 'src')
-rw-r--r-- | src/irmd/main.c | 227 | ||||
-rw-r--r-- | src/lib/bitmap.c | 61 | ||||
-rw-r--r-- | src/lib/tests/bitmap_test.c | 35 |
3 files changed, 261 insertions, 62 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state { }; struct irm { - struct list_head registry; + struct list_head registry; /* registered names known */ - struct list_head ipcps; + struct list_head ipcps; /* list of ipcps in system */ - struct list_head api_table; - struct list_head apn_table; - struct list_head spawned_apis; - pthread_rwlock_t reg_lock; + struct list_head api_table; /* ap instances */ + struct list_head apn_table; /* ap names known */ + struct list_head spawned_apis; /* child ap instances */ + pthread_rwlock_t reg_lock; /* lock for registration info */ - /* keep track of all flows in this processing system */ - struct bmp * port_ids; - /* maps port_ids to api pair */ - struct list_head irm_flows; - pthread_rwlock_t flows_lock; + struct bmp * port_ids; /* port_ids for flows */ + struct list_head irm_flows; /* flow information */ + pthread_rwlock_t flows_lock; /* lock for flows */ - struct lockfile * lf; - struct shm_rdrbuff * rdrb; - pthread_t * threadpool; - int sockfd; + struct lockfile * lf; /* single irmd per system */ + struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ - enum irm_state state; - pthread_rwlock_t state_lock; + pthread_t * threadpool; /* pool of mainloop threads */ - pthread_t irm_sanitize; - pthread_t shm_sanitize; + struct bmp * thread_ids; /* ids for mainloop threads */ + size_t max_threads; /* max threads set by tpm */ + size_t threads; /* available mainloop threads */ + pthread_cond_t threads_cond; /* signal thread entry/exit */ + pthread_mutex_t threads_lock; /* mutex for threads/condvar */ + + enum irm_state state; /* state of the irmd */ + pthread_rwlock_t state_lock; /* lock for the entire irmd */ + + pthread_t tpm; /* threadpool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ + pthread_t shm_sanitize; /* keep track of rdrbuff use */ } * irmd; static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void) if (irmd->state != IRMD_NULL) log_warn("Unsafe destroy."); + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->thread_ids != NULL) + bmp_destroy(irmd->thread_ids); + + pthread_mutex_unlock(&irmd->threads_lock); + if (irmd->threadpool != NULL) free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o) } } +static void thread_inc(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + ++irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ + int ret; + + pthread_mutex_lock(&irmd->threads_lock); + + ret = irmd->threads > irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +static void thread_exit(ssize_t id) +{ + pthread_mutex_lock(&irmd->threads_lock); + bmp_release(irmd->thread_ids, id); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + void * mainloop(void * o) { uint8_t buf[IRM_MSG_BUF_SIZE]; - (void) o; + ssize_t id = (ssize_t) o; while (true) { #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o) (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd->state_lock); - if (irmd->state != IRMD_RUNNING) { + + if (irmd->state != IRMD_RUNNING || thread_check()) { + thread_exit(id); pthread_rwlock_unlock(&irmd->state_lock); break; } + pthread_rwlock_unlock(&irmd->state_lock); ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o) if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0) continue; #endif + cli_sockfd = accept(irmd->sockfd, 0, 0); if (cli_sockfd < 0) continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o) continue; } + thread_dec(); + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1917,6 +1981,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1930,6 +1995,82 @@ void * mainloop(void * o) free(buffer.data); close(cli_sockfd); + + thread_inc(); + } + + return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ + bool ret; + pthread_mutex_lock(&irmd->threads_lock); + + ret = bmp_is_id_used(irmd->thread_ids, id); + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +void * threadpoolmgr(void * o) +{ + struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), + (IRMD_TPM_TIMEOUT % 1000) * MILLION}; + struct timespec dl; + size_t t; + + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_rwlock_rdlock(&irmd->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); + log_dbg("Threadpool manager exiting."); + for (t = 0; t < IRMD_MAX_THREADS; ++t) + if (is_thread_alive(t)) { + log_dbg("Waiting for thread %zd.", t); + pthread_join(irmd->threadpool[t], NULL); + } + + log_dbg("Threadpool manager done."); + break; + } + + pthread_rwlock_unlock(&irmd->state_lock); + + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->threads < IRMD_MIN_AV_THREADS) { + log_dbg("Increasing threadpool."); + irmd->max_threads = IRMD_MAX_AV_THREADS; + + while (irmd->threads < irmd->max_threads) { + ssize_t id = bmp_allocate(irmd->thread_ids); + if (!bmp_is_id_valid(irmd->thread_ids, id)) { + log_warn("IRMd threadpool exhausted."); + break; + } + + if (pthread_create(&irmd->threadpool[id], + NULL, mainloop, (void *) id)) + log_warn("Failed to start new thread."); + else + ++irmd->threads; + } + } + + if (pthread_cond_timedwait(&irmd->threads_cond, + &irmd->threads_lock, + &dl) == ETIMEDOUT) + if (irmd->threads > IRMD_MIN_AV_THREADS) + --irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); } return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o) static int irm_create(void) { struct stat st; + pthread_condattr_t cattr; struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void) return -1; } + if (pthread_mutex_init(&irmd->threads_lock, NULL)) { + log_err("Failed to initialize mutex."); + free(irmd); + return -1; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize condattr."); + free(irmd); + return -1; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd->threads_cond, &cattr)) { + log_err("Failed to initialize cond."); + free(irmd); + return -1; + } + list_head_init(&irmd->ipcps); list_head_init(&irmd->api_table); list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void) return -ENOMEM; } - irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); + if (irmd->thread_ids == NULL) { + irm_destroy(); + return -ENOMEM; + } + + irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); if (irmd->threadpool == NULL) { irm_destroy(); return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void) return -1; } - irmd->state = IRMD_RUNNING; + irmd->threads = 0; + irmd->max_threads = IRMD_MIN_AV_THREADS; + irmd->state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int argc, { struct sigaction sig_act; - int t = 0; - bool use_stdout = false; if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int argc, exit(EXIT_FAILURE); } - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); + pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); + pthread_join(irmd->tpm, NULL); pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); - /* Wait for (all of them) to return. */ - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_join(irmd->threadpool[t], NULL); - pthread_join(irmd->irm_sanitize, NULL); pthread_cancel(irmd->shm_sanitize); diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 93ffda77..bf9bb99d 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -38,7 +38,8 @@ #define BITS_TO_LONGS(nr) \ DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(size_t)) -static size_t find_next_zero_bit(const size_t * addr, size_t nbits) +static size_t find_next_zero_bit(const size_t * addr, + size_t nbits) { size_t tmp; size_t start = 0; @@ -65,13 +66,15 @@ static size_t find_next_zero_bit(const size_t * addr, size_t nbits) return (start * BITS_PER_LONG) + pos; } -static void bitmap_zero(size_t * dst, size_t nbits) +static void bitmap_zero(size_t * dst, + size_t nbits) { size_t len = BITS_TO_LONGS(nbits) * sizeof(size_t); memset(dst, 0, len); } -static void bitmap_clear(size_t * map, size_t start) +static void bitmap_clear(size_t * map, + size_t start) { size_t * p = map + BIT_WORD(start); size_t mask = ~(1UL << (start % (BITS_PER_LONG))); @@ -79,7 +82,8 @@ static void bitmap_clear(size_t * map, size_t start) *p &= mask; } -static void bitmap_set(size_t * map, size_t start) +static void bitmap_set(size_t * map, + size_t start) { size_t * p = map + BIT_WORD(start); size_t mask = 1UL << (start % (BITS_PER_LONG)); @@ -94,7 +98,8 @@ struct bmp { size_t * bitmap; }; -struct bmp * bmp_create(size_t bits, ssize_t offset) +struct bmp * bmp_create(size_t bits, + ssize_t offset) { struct bmp * tmp; @@ -118,20 +123,15 @@ struct bmp * bmp_create(size_t bits, ssize_t offset) return tmp; } -int bmp_destroy(struct bmp * b) +void bmp_destroy(struct bmp * b) { if (b == NULL) - return -1; + return; - if (b->bitmap == NULL) { - free(b); - return -1; - } + if (b->bitmap != NULL) + free(b->bitmap); - free(b->bitmap); free(b); - - return 0; } static ssize_t bad_id(struct bmp * b) @@ -158,7 +158,8 @@ ssize_t bmp_allocate(struct bmp * b) return id + b->offset; } -static bool is_id_valid(struct bmp * b, ssize_t id) +static bool is_id_valid(struct bmp * b, + ssize_t id) { assert(b); @@ -168,7 +169,17 @@ static bool is_id_valid(struct bmp * b, ssize_t id) return true; } -bool bmp_is_id_valid(struct bmp * b, ssize_t id) +static bool is_id_used(size_t * map, + size_t start) +{ + size_t * p = map + BIT_WORD(start); + size_t mask = 1UL << (start % (BITS_PER_LONG)); + + return (*p & mask) != 0; +} + +bool bmp_is_id_valid(struct bmp * b, + ssize_t id) { if (b == NULL) return false; @@ -176,19 +187,25 @@ bool bmp_is_id_valid(struct bmp * b, ssize_t id) return is_id_valid(b, id); } -int bmp_release(struct bmp * b, ssize_t id) +int bmp_release(struct bmp * b, + ssize_t id) { - size_t rid; - if (b == NULL) return -1; if (!is_id_valid(b, id)) return -1; - rid = id - b->offset; - - bitmap_clear(b->bitmap, rid); + bitmap_clear(b->bitmap, id - b->offset); return 0; } + +bool bmp_is_id_used(struct bmp * b, + ssize_t id) +{ + if (b == NULL) + return false; + + return is_id_used(b->bitmap, id - b->offset); +} diff --git a/src/lib/tests/bitmap_test.c b/src/lib/tests/bitmap_test.c index 7480600e..e438f217 100644 --- a/src/lib/tests/bitmap_test.c +++ b/src/lib/tests/bitmap_test.c @@ -23,6 +23,7 @@ #include "bitmap.c" #include <time.h> #include <stdlib.h> +#include <stdio.h> #define BITMAP_SIZE 200 @@ -41,40 +42,56 @@ int bitmap_test(int argc, char ** argv) srand(time(NULL)); bmp = bmp_create(bits, offset); - if (bmp == NULL) + if (bmp == NULL) { + printf("Failed to create bmp.\n"); return -1; + } - if (bmp_destroy(bmp)) - return -1; + bmp_destroy(bmp); bmp = bmp_create(bits, offset); - if (bmp == NULL) + if (bmp == NULL) { + printf("Failed to re-create bmp.\n"); return -1; + } for (i = offset; i < BITMAP_SIZE + 5 + offset; i++) { id = bmp_allocate(bmp); if (!bmp_is_id_valid(bmp, id)) continue; - if (id != i) + if (!bmp_is_id_used(bmp, id)) { + printf("ID not marked in use.\n"); + bmp_destroy(bmp); return -1; + } + + if (id != i) { + printf("Wrong ID returned.\n"); + bmp_destroy(bmp); + return -1; + } } for (i = 0; i < BITMAP_SIZE + 5; i++) { r = (ssize_t) (rand() % BITMAP_SIZE) + offset; - if (bmp_release(bmp, r)) + if (bmp_release(bmp, r)) { + printf("Failed to release ID.\n"); return -1; + } id = bmp_allocate(bmp); if (!bmp_is_id_valid(bmp, id)) continue; - if (id != r) + if (id != r) { + printf("Wrong prev ID returned.\n"); + bmp_destroy(bmp); return -1; + } } - if (bmp_destroy(bmp)) - return -1; + bmp_destroy(bmp); return 0; } |