summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-04-01 12:31:05 +0000
committerSander Vrijders <[email protected]>2017-04-01 12:31:05 +0000
commit2725780520d0e2c6a2c49ac8e2124b5088cbe1bb (patch)
tree833a7f4133fb3f3f8d746343cff6fb9fc40f7829 /src
parentd9f3619d791fef7d79127556304a4aa4f1cda50a (diff)
parentc72634b5d921bc06d8e06afb2a60a05a1acb7ee2 (diff)
downloadouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.tar.gz
ouroboros-2725780520d0e2c6a2c49ac8e2124b5088cbe1bb.zip
Merged in dstaesse/ouroboros/be-irmd-threadpool (pull request #448)
Be irmd threadpool
Diffstat (limited to 'src')
-rw-r--r--src/irmd/main.c227
-rw-r--r--src/lib/bitmap.c61
-rw-r--r--src/lib/tests/bitmap_test.c35
3 files changed, 261 insertions, 62 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 39f44c44..966be500 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -75,31 +75,37 @@ enum irm_state {
};
struct irm {
- struct list_head registry;
+ struct list_head registry; /* registered names known */
- struct list_head ipcps;
+ struct list_head ipcps; /* list of ipcps in system */
- struct list_head api_table;
- struct list_head apn_table;
- struct list_head spawned_apis;
- pthread_rwlock_t reg_lock;
+ struct list_head api_table; /* ap instances */
+ struct list_head apn_table; /* ap names known */
+ struct list_head spawned_apis; /* child ap instances */
+ pthread_rwlock_t reg_lock; /* lock for registration info */
- /* keep track of all flows in this processing system */
- struct bmp * port_ids;
- /* maps port_ids to api pair */
- struct list_head irm_flows;
- pthread_rwlock_t flows_lock;
+ struct bmp * port_ids; /* port_ids for flows */
+ struct list_head irm_flows; /* flow information */
+ pthread_rwlock_t flows_lock; /* lock for flows */
- struct lockfile * lf;
- struct shm_rdrbuff * rdrb;
- pthread_t * threadpool;
- int sockfd;
+ struct lockfile * lf; /* single irmd per system */
+ struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */
+ int sockfd; /* UNIX socket */
- enum irm_state state;
- pthread_rwlock_t state_lock;
+ pthread_t * threadpool; /* pool of mainloop threads */
- pthread_t irm_sanitize;
- pthread_t shm_sanitize;
+ struct bmp * thread_ids; /* ids for mainloop threads */
+ size_t max_threads; /* max threads set by tpm */
+ size_t threads; /* available mainloop threads */
+ pthread_cond_t threads_cond; /* signal thread entry/exit */
+ pthread_mutex_t threads_lock; /* mutex for threads/condvar */
+
+ enum irm_state state; /* state of the irmd */
+ pthread_rwlock_t state_lock; /* lock for the entire irmd */
+
+ pthread_t tpm; /* threadpool manager */
+ pthread_t irm_sanitize; /* clean up irmd resources */
+ pthread_t shm_sanitize; /* keep track of rdrbuff use */
} * irmd;
static void clear_irm_flow(struct irm_flow * f) {
@@ -1449,6 +1455,13 @@ static void irm_destroy(void)
if (irmd->state != IRMD_NULL)
log_warn("Unsafe destroy.");
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ if (irmd->thread_ids != NULL)
+ bmp_destroy(irmd->thread_ids);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
if (irmd->threadpool != NULL)
free(irmd->threadpool);
@@ -1724,11 +1737,55 @@ void * irm_sanitize(void * o)
}
}
+static void thread_inc(void)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ++irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
+static void thread_dec(void)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ --irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
+static bool thread_check(void)
+{
+ int ret;
+
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ret = irmd->threads > irmd->max_threads;
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
+ return ret;
+}
+
+static void thread_exit(ssize_t id)
+{
+ pthread_mutex_lock(&irmd->threads_lock);
+ bmp_release(irmd->thread_ids, id);
+
+ --irmd->threads;
+ pthread_cond_signal(&irmd->threads_cond);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+}
+
void * mainloop(void * o)
{
uint8_t buf[IRM_MSG_BUF_SIZE];
- (void) o;
+ ssize_t id = (ssize_t) o;
while (true) {
#ifdef __FreeBSD__
@@ -1747,10 +1804,13 @@ void * mainloop(void * o)
(SOCKET_TIMEOUT % 1000) * 1000};
pthread_rwlock_rdlock(&irmd->state_lock);
- if (irmd->state != IRMD_RUNNING) {
+
+ if (irmd->state != IRMD_RUNNING || thread_check()) {
+ thread_exit(id);
pthread_rwlock_unlock(&irmd->state_lock);
break;
}
+
pthread_rwlock_unlock(&irmd->state_lock);
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
@@ -1760,6 +1820,7 @@ void * mainloop(void * o)
if (select(irmd->sockfd, &fds, NULL, NULL, &timeout) <= 0)
continue;
#endif
+
cli_sockfd = accept(irmd->sockfd, 0, 0);
if (cli_sockfd < 0)
continue;
@@ -1781,6 +1842,8 @@ void * mainloop(void * o)
continue;
}
+ thread_dec();
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -1909,6 +1972,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
+ thread_inc();
continue;
}
@@ -1917,6 +1981,7 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
close(cli_sockfd);
+ thread_inc();
continue;
}
@@ -1930,6 +1995,82 @@ void * mainloop(void * o)
free(buffer.data);
close(cli_sockfd);
+
+ thread_inc();
+ }
+
+ return (void *) 0;
+}
+
+static bool is_thread_alive(ssize_t id)
+{
+ bool ret;
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ ret = bmp_is_id_used(irmd->thread_ids, id);
+
+ pthread_mutex_unlock(&irmd->threads_lock);
+
+ return ret;
+}
+
+void * threadpoolmgr(void * o)
+{
+ struct timespec to = {(IRMD_TPM_TIMEOUT / 1000),
+ (IRMD_TPM_TIMEOUT % 1000) * MILLION};
+ struct timespec dl;
+ size_t t;
+
+ (void) o;
+
+ while (true) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ if (irmd->state != IRMD_RUNNING) {
+ pthread_rwlock_unlock(&irmd->state_lock);
+ log_dbg("Threadpool manager exiting.");
+ for (t = 0; t < IRMD_MAX_THREADS; ++t)
+ if (is_thread_alive(t)) {
+ log_dbg("Waiting for thread %zd.", t);
+ pthread_join(irmd->threadpool[t], NULL);
+ }
+
+ log_dbg("Threadpool manager done.");
+ break;
+ }
+
+ pthread_rwlock_unlock(&irmd->state_lock);
+
+ pthread_mutex_lock(&irmd->threads_lock);
+
+ if (irmd->threads < IRMD_MIN_AV_THREADS) {
+ log_dbg("Increasing threadpool.");
+ irmd->max_threads = IRMD_MAX_AV_THREADS;
+
+ while (irmd->threads < irmd->max_threads) {
+ ssize_t id = bmp_allocate(irmd->thread_ids);
+ if (!bmp_is_id_valid(irmd->thread_ids, id)) {
+ log_warn("IRMd threadpool exhausted.");
+ break;
+ }
+
+ if (pthread_create(&irmd->threadpool[id],
+ NULL, mainloop, (void *) id))
+ log_warn("Failed to start new thread.");
+ else
+ ++irmd->threads;
+ }
+ }
+
+ if (pthread_cond_timedwait(&irmd->threads_cond,
+ &irmd->threads_lock,
+ &dl) == ETIMEDOUT)
+ if (irmd->threads > IRMD_MIN_AV_THREADS)
+ --irmd->max_threads;
+
+ pthread_mutex_unlock(&irmd->threads_lock);
}
return (void *) 0;
@@ -1938,6 +2079,7 @@ void * mainloop(void * o)
static int irm_create(void)
{
struct stat st;
+ pthread_condattr_t cattr;
struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000),
(IRMD_ACCEPT_TIMEOUT % 1000) * 1000};
@@ -1967,6 +2109,27 @@ static int irm_create(void)
return -1;
}
+ if (pthread_mutex_init(&irmd->threads_lock, NULL)) {
+ log_err("Failed to initialize mutex.");
+ free(irmd);
+ return -1;
+ }
+
+ if (pthread_condattr_init(&cattr)) {
+ log_err("Failed to initialize condattr.");
+ free(irmd);
+ return -1;
+ }
+
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&irmd->threads_cond, &cattr)) {
+ log_err("Failed to initialize cond.");
+ free(irmd);
+ return -1;
+ }
+
list_head_init(&irmd->ipcps);
list_head_init(&irmd->api_table);
list_head_init(&irmd->apn_table);
@@ -1980,7 +2143,13 @@ static int irm_create(void)
return -ENOMEM;
}
- irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ irmd->thread_ids = bmp_create(IRMD_MAX_THREADS, 0);
+ if (irmd->thread_ids == NULL) {
+ irm_destroy();
+ return -ENOMEM;
+ }
+
+ irmd->threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS);
if (irmd->threadpool == NULL) {
irm_destroy();
return -ENOMEM;
@@ -2045,7 +2214,9 @@ static int irm_create(void)
return -1;
}
- irmd->state = IRMD_RUNNING;
+ irmd->threads = 0;
+ irmd->max_threads = IRMD_MIN_AV_THREADS;
+ irmd->state = IRMD_RUNNING;
log_info("Ouroboros IPC Resource Manager daemon started...");
@@ -2063,8 +2234,6 @@ int main(int argc,
{
struct sigaction sig_act;
- int t = 0;
-
bool use_stdout = false;
if (geteuid() != 0) {
@@ -2108,16 +2277,12 @@ int main(int argc,
exit(EXIT_FAILURE);
}
- for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_create(&irmd->threadpool[t], NULL, mainloop, NULL);
+ pthread_create(&irmd->tpm, NULL, threadpoolmgr, NULL);
+ pthread_join(irmd->tpm, NULL);
pthread_create(&irmd->irm_sanitize, NULL, irm_sanitize, NULL);
pthread_create(&irmd->shm_sanitize, NULL, shm_sanitize, irmd->rdrb);
- /* Wait for (all of them) to return. */
- for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
- pthread_join(irmd->threadpool[t], NULL);
-
pthread_join(irmd->irm_sanitize, NULL);
pthread_cancel(irmd->shm_sanitize);
diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c
index 93ffda77..bf9bb99d 100644
--- a/src/lib/bitmap.c
+++ b/src/lib/bitmap.c
@@ -38,7 +38,8 @@
#define BITS_TO_LONGS(nr) \
DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(size_t))
-static size_t find_next_zero_bit(const size_t * addr, size_t nbits)
+static size_t find_next_zero_bit(const size_t * addr,
+ size_t nbits)
{
size_t tmp;
size_t start = 0;
@@ -65,13 +66,15 @@ static size_t find_next_zero_bit(const size_t * addr, size_t nbits)
return (start * BITS_PER_LONG) + pos;
}
-static void bitmap_zero(size_t * dst, size_t nbits)
+static void bitmap_zero(size_t * dst,
+ size_t nbits)
{
size_t len = BITS_TO_LONGS(nbits) * sizeof(size_t);
memset(dst, 0, len);
}
-static void bitmap_clear(size_t * map, size_t start)
+static void bitmap_clear(size_t * map,
+ size_t start)
{
size_t * p = map + BIT_WORD(start);
size_t mask = ~(1UL << (start % (BITS_PER_LONG)));
@@ -79,7 +82,8 @@ static void bitmap_clear(size_t * map, size_t start)
*p &= mask;
}
-static void bitmap_set(size_t * map, size_t start)
+static void bitmap_set(size_t * map,
+ size_t start)
{
size_t * p = map + BIT_WORD(start);
size_t mask = 1UL << (start % (BITS_PER_LONG));
@@ -94,7 +98,8 @@ struct bmp {
size_t * bitmap;
};
-struct bmp * bmp_create(size_t bits, ssize_t offset)
+struct bmp * bmp_create(size_t bits,
+ ssize_t offset)
{
struct bmp * tmp;
@@ -118,20 +123,15 @@ struct bmp * bmp_create(size_t bits, ssize_t offset)
return tmp;
}
-int bmp_destroy(struct bmp * b)
+void bmp_destroy(struct bmp * b)
{
if (b == NULL)
- return -1;
+ return;
- if (b->bitmap == NULL) {
- free(b);
- return -1;
- }
+ if (b->bitmap != NULL)
+ free(b->bitmap);
- free(b->bitmap);
free(b);
-
- return 0;
}
static ssize_t bad_id(struct bmp * b)
@@ -158,7 +158,8 @@ ssize_t bmp_allocate(struct bmp * b)
return id + b->offset;
}
-static bool is_id_valid(struct bmp * b, ssize_t id)
+static bool is_id_valid(struct bmp * b,
+ ssize_t id)
{
assert(b);
@@ -168,7 +169,17 @@ static bool is_id_valid(struct bmp * b, ssize_t id)
return true;
}
-bool bmp_is_id_valid(struct bmp * b, ssize_t id)
+static bool is_id_used(size_t * map,
+ size_t start)
+{
+ size_t * p = map + BIT_WORD(start);
+ size_t mask = 1UL << (start % (BITS_PER_LONG));
+
+ return (*p & mask) != 0;
+}
+
+bool bmp_is_id_valid(struct bmp * b,
+ ssize_t id)
{
if (b == NULL)
return false;
@@ -176,19 +187,25 @@ bool bmp_is_id_valid(struct bmp * b, ssize_t id)
return is_id_valid(b, id);
}
-int bmp_release(struct bmp * b, ssize_t id)
+int bmp_release(struct bmp * b,
+ ssize_t id)
{
- size_t rid;
-
if (b == NULL)
return -1;
if (!is_id_valid(b, id))
return -1;
- rid = id - b->offset;
-
- bitmap_clear(b->bitmap, rid);
+ bitmap_clear(b->bitmap, id - b->offset);
return 0;
}
+
+bool bmp_is_id_used(struct bmp * b,
+ ssize_t id)
+{
+ if (b == NULL)
+ return false;
+
+ return is_id_used(b->bitmap, id - b->offset);
+}
diff --git a/src/lib/tests/bitmap_test.c b/src/lib/tests/bitmap_test.c
index 7480600e..e438f217 100644
--- a/src/lib/tests/bitmap_test.c
+++ b/src/lib/tests/bitmap_test.c
@@ -23,6 +23,7 @@
#include "bitmap.c"
#include <time.h>
#include <stdlib.h>
+#include <stdio.h>
#define BITMAP_SIZE 200
@@ -41,40 +42,56 @@ int bitmap_test(int argc, char ** argv)
srand(time(NULL));
bmp = bmp_create(bits, offset);
- if (bmp == NULL)
+ if (bmp == NULL) {
+ printf("Failed to create bmp.\n");
return -1;
+ }
- if (bmp_destroy(bmp))
- return -1;
+ bmp_destroy(bmp);
bmp = bmp_create(bits, offset);
- if (bmp == NULL)
+ if (bmp == NULL) {
+ printf("Failed to re-create bmp.\n");
return -1;
+ }
for (i = offset; i < BITMAP_SIZE + 5 + offset; i++) {
id = bmp_allocate(bmp);
if (!bmp_is_id_valid(bmp, id))
continue;
- if (id != i)
+ if (!bmp_is_id_used(bmp, id)) {
+ printf("ID not marked in use.\n");
+ bmp_destroy(bmp);
return -1;
+ }
+
+ if (id != i) {
+ printf("Wrong ID returned.\n");
+ bmp_destroy(bmp);
+ return -1;
+ }
}
for (i = 0; i < BITMAP_SIZE + 5; i++) {
r = (ssize_t) (rand() % BITMAP_SIZE) + offset;
- if (bmp_release(bmp, r))
+ if (bmp_release(bmp, r)) {
+ printf("Failed to release ID.\n");
return -1;
+ }
id = bmp_allocate(bmp);
if (!bmp_is_id_valid(bmp, id))
continue;
- if (id != r)
+ if (id != r) {
+ printf("Wrong prev ID returned.\n");
+ bmp_destroy(bmp);
return -1;
+ }
}
- if (bmp_destroy(bmp))
- return -1;
+ bmp_destroy(bmp);
return 0;
}