From c72634b5d921bc06d8e06afb2a60a05a1acb7ee2 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sat, 1 Apr 2017 13:45:51 +0200 Subject: irmd: Add dynamic threadpool This makes the IRMd add/remove worker threads dynamically. IRMD_TPM_TIMEOUT sets a timer in the threadpool manager for checking idle threads. Each time this timer expires, it will reduce the threadpool by one. IRMD_MIN_AV_THREADS is the minimum number of available worker threads. If the number of active threads goes under this threshold, the threadpool manager will create threads to get the number of threads to IRMD_MAX_AV_THREADS, unless IRMD_MAX_THREADS is reached. --- src/irmd/main.c | 227 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 196 insertions(+), 31 deletions(-) (limited to 'src/irmd/main.c') diff --git a/src/irmd/main.c b/src/irmd/main.c index 39f44c44..966be500 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -75,31 +75,37 @@ enum irm_state { }; struct irm { - struct list_head registry; + struct list_head registry; /* registered names known */ - struct list_head ipcps; + struct list_head ipcps; /* list of ipcps in system */ - struct list_head api_table; - struct list_head apn_table; - struct list_head spawned_apis; - pthread_rwlock_t reg_lock; + struct list_head api_table; /* ap instances */ + struct list_head apn_table; /* ap names known */ + struct list_head spawned_apis; /* child ap instances */ + pthread_rwlock_t reg_lock; /* lock for registration info */ - /* keep track of all flows in this processing system */ - struct bmp * port_ids; - /* maps port_ids to api pair */ - struct list_head irm_flows; - pthread_rwlock_t flows_lock; + struct bmp * port_ids; /* port_ids for flows */ + struct list_head irm_flows; /* flow information */ + pthread_rwlock_t flows_lock; /* lock for flows */ - struct lockfile * lf; - struct shm_rdrbuff * rdrb; - pthread_t * threadpool; - int sockfd; + struct lockfile * lf; /* single irmd per system */ + struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ + int sockfd; /* UNIX socket */ - enum irm_state state; - pthread_rwlock_t state_lock; + pthread_t * threadpool; /* pool of mainloop threads */ - pthread_t irm_sanitize; - pthread_t shm_sanitize; + struct bmp * thread_ids; /* ids for mainloop threads */ + size_t max_threads; /* max threads set by tpm */ + size_t threads; /* available mainloop threads */ + pthread_cond_t threads_cond; /* signal thread entry/exit */ + pthread_mutex_t threads_lock; /* mutex for threads/condvar */ + + enum irm_state state; /* state of the irmd */ + pthread_rwlock_t state_lock; /* lock for the entire irmd */ + + pthread_t tpm; /* threadpool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ + pthread_t shm_sanitize; /* keep track of rdrbuff use */ } * irmd; static void clear_irm_flow(struct irm_flow * f) { @@ -1449,6 +1455,13 @@ static void irm_destroy(void) if (irmd->state != IRMD_NULL) log_warn("Unsafe destroy."); + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->thread_ids != NULL) + bmp_destroy(irmd->thread_ids); + + pthread_mutex_unlock(&irmd->threads_lock); + if (irmd->threadpool != NULL) free(irmd->threadpool); @@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o) } } +static void thread_inc(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + ++irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static void thread_dec(void) +{ + pthread_mutex_lock(&irmd->threads_lock); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + +static bool thread_check(void) +{ + int ret; + + pthread_mutex_lock(&irmd->threads_lock); + + ret = irmd->threads > irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +static void thread_exit(ssize_t id) +{ + pthread_mutex_lock(&irmd->threads_lock); + bmp_release(irmd->thread_ids, id); + + --irmd->threads; + pthread_cond_signal(&irmd->threads_cond); + + pthread_mutex_unlock(&irmd->threads_lock); +} + void * mainloop(void * o) { uint8_t buf[IRM_MSG_BUF_SIZE]; - (void) o; + ssize_t id = (ssize_t) o; while (true) { #ifdef __FreeBSD__ @@ -1747,10 +1804,13 @@ void * mainloop(void * o) (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd->state_lock); - if (irmd->state != IRMD_RUNNING) { + + if (irmd->state != IRMD_RUNNING || thread_check()) { + thread_exit(id); pthread_rwlock_unlock(&irmd->state_lock); break; } + pthread_rwlock_unlock(&irmd->state_lock); ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1760,6 +1820,7 @@ void * mainloop(void * o) if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0) continue; #endif + cli_sockfd = accept(irmd->sockfd, 0, 0); if (cli_sockfd < 0) continue; @@ -1781,6 +1842,8 @@ void * mainloop(void * o) continue; } + thread_dec(); + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1909,6 +1972,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1917,6 +1981,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); + thread_inc(); continue; } @@ -1930,6 +1995,82 @@ void * mainloop(void * o) free(buffer.data); close(cli_sockfd); + + thread_inc(); + } + + return (void *) 0; +} + +static bool is_thread_alive(ssize_t id) +{ + bool ret; + pthread_mutex_lock(&irmd->threads_lock); + + ret = bmp_is_id_used(irmd->thread_ids, id); + + pthread_mutex_unlock(&irmd->threads_lock); + + return ret; +} + +void * threadpoolmgr(void * o) +{ + struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), + (IRMD_TPM_TIMEOUT % 1000) * MILLION}; + struct timespec dl; + size_t t; + + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_rwlock_rdlock(&irmd->state_lock); + if (irmd->state != IRMD_RUNNING) { + pthread_rwlock_unlock(&irmd->state_lock); + log_dbg("Threadpool manager exiting."); + for (t = 0; t < IRMD_MAX_THREADS; ++t) + if (is_thread_alive(t)) { + log_dbg("Waiting for thread %zd.", t); + pthread_join(irmd->threadpool[t], NULL); + } + + log_dbg("Threadpool manager done."); + break; + } + + pthread_rwlock_unlock(&irmd->state_lock); + + pthread_mutex_lock(&irmd->threads_lock); + + if (irmd->threads < IRMD_MIN_AV_THREADS) { + log_dbg("Increasing threadpool."); + irmd->max_threads = IRMD_MAX_AV_THREADS; + + while (irmd->threads < irmd->max_threads) { + ssize_t id = bmp_allocate(irmd->thread_ids); + if (!bmp_is_id_valid(irmd->thread_ids, id)) { + log_warn("IRMd threadpool exhausted."); + break; + } + + if (pthread_create(&irmd->threadpool[id], + NULL, mainloop, (void *) id)) + log_warn("Failed to start new thread."); + else + ++irmd->threads; + } + } + + if (pthread_cond_timedwait(&irmd->threads_cond, + &irmd->threads_lock, + &dl) == ETIMEDOUT) + if (irmd->threads > IRMD_MIN_AV_THREADS) + --irmd->max_threads; + + pthread_mutex_unlock(&irmd->threads_lock); } return (void *) 0; @@ -1938,6 +2079,7 @@ void * mainloop(void * o) static int irm_create(void) { struct stat st; + pthread_condattr_t cattr; struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -1967,6 +2109,27 @@ static int irm_create(void) return -1; } + if (pthread_mutex_init(&irmd->threads_lock, NULL)) { + log_err("Failed to initialize mutex."); + free(irmd); + return -1; + } + + if (pthread_condattr_init(&cattr)) { + log_err("Failed to initialize condattr."); + free(irmd); + return -1; + } + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&irmd->threads_cond, &cattr)) { + log_err("Failed to initialize cond."); + free(irmd); + return -1; + } + list_head_init(&irmd->ipcps); list_head_init(&irmd->api_table); list_head_init(&irmd->apn_table); @@ -1980,7 +2143,13 @@ static int irm_create(void) return -ENOMEM; } - irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0); + if (irmd->thread_ids == NULL) { + irm_destroy(); + return -ENOMEM; + } + + irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); if (irmd->threadpool == NULL) { irm_destroy(); return -ENOMEM; @@ -2045,7 +2214,9 @@ static int irm_create(void) return -1; } - irmd->state = IRMD_RUNNING; + irmd->threads = 0; + irmd->max_threads = IRMD_MIN_AV_THREADS; + irmd->state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2063,8 +2234,6 @@ int main(int argc, { struct sigaction sig_act; - int t = 0; - bool use_stdout = false; if (geteuid() != 0) { @@ -2108,16 +2277,12 @@ int main(int argc, exit(EXIT_FAILURE); } - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL); + pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL); + pthread_join(irmd->tpm, NULL); pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb); - /* Wait for (all of them) to return. */ - for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) - pthread_join(irmd->threadpool[t], NULL); - pthread_join(irmd->irm_sanitize, NULL); pthread_cancel(irmd->shm_sanitize); -- cgit v1.2.3