diff options
author | dimitri staessens <[email protected]> | 2017-09-24 14:34:03 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-09-24 14:34:03 +0200 |
commit | ff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch) | |
tree | 17f66b04659a06c018494eb732adb661111d63f2 | |
parent | 7cef269be64f64b920763c6f2455931422c8bfe9 (diff) | |
download | ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip |
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the
detached thread could cause a data race on shutdown.
The threadpool manager is revised to allow multiple instances in a
single program.
The irmd and ipcp now store commands in a buffer (list) instead of a
single buffer before passing it to handler threads.
-rw-r--r-- | include/ouroboros/tpm.h | 23 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 99 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 9 | ||||
-rw-r--r-- | src/ipcpd/normal/dht.c | 339 | ||||
-rw-r--r-- | src/irmd/main.c | 104 | ||||
-rw-r--r-- | src/lib/tpm.c | 162 |
6 files changed, 417 insertions, 319 deletions
diff --git a/include/ouroboros/tpm.h b/include/ouroboros/tpm.h index dc57f485..74e45035 100644 --- a/include/ouroboros/tpm.h +++ b/include/ouroboros/tpm.h @@ -25,22 +25,25 @@ #include <stdbool.h> -int tpm_init(size_t min, - size_t inc, - void * (* func)(void *)); +struct tpm; -int tpm_start(void); +struct tpm * tpm_create(size_t min, + size_t inc, + void * (* func)(void *), + void * o); -void tpm_stop(void); +void tpm_destroy(struct tpm * tpm); -void tpm_fini(void); +int tpm_start(struct tpm * tpm); -bool tpm_check(void); +void tpm_stop(struct tpm * tpm); -void tpm_exit(void); +bool tpm_check(struct tpm * tpm); -void tpm_dec(void); +void tpm_exit(struct tpm * tpm); -void tpm_inc(void); +void tpm_dec(struct tpm * tpm); + +void tpm_inc(struct tpm * tpm); #endif /* OUROBOROS_LIB_TPM_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 85d543da..513c638a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -36,7 +36,6 @@ #include <ouroboros/dev.h> #include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> -#include <ouroboros/tpm.h> #include "ipcp.h" @@ -44,6 +43,14 @@ #include <sys/socket.h> #include <stdlib.h> +struct cmd { + struct list_head next; + + uint8_t cbuf[IPCP_MSG_BUF_SIZE]; + size_t len; + int fd; +}; + void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -107,7 +114,8 @@ static void * acceptloop(void * o) while (ipcp_get_state() != IPCP_SHUTDOWN && ipcp_get_state() != IPCP_NULL) { - ssize_t count; + struct cmd * cmd; + #if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(ipcpi.sockfd, &fds); @@ -122,25 +130,28 @@ static void * acceptloop(void * o) (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - pthread_mutex_lock(&ipcpi.cmd_lock); + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Out of memory"); + break; + } - assert(ipcpi.csockfd == -1); + pthread_mutex_lock(&ipcpi.cmd_lock); - count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE); - if (count <= 0) { + cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE); + if (cmd->len <= 0) { pthread_mutex_unlock(&ipcpi.cmd_lock); log_err("Failed to read from socket."); close(csockfd); + free(cmd); continue; } - ipcpi.cmd_len = count; - ipcpi.csockfd = csockfd; + cmd->fd = csockfd; - pthread_cond_signal(&ipcpi.cmd_cond); + list_add(&cmd->next, &ipcpi.cmds); - while (ipcpi.csockfd != -1) - pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock); + pthread_cond_signal(&ipcpi.cmd_cond); pthread_mutex_unlock(&ipcpi.cmd_lock); } @@ -159,13 +170,15 @@ static void * mainloop(void * o) struct timespec dl; struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; - (void) o; + + (void) o; while (true) { int ret = 0; ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; int fd = -1; + struct cmd * cmd; ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; @@ -174,33 +187,34 @@ static void * mainloop(void * o) pthread_mutex_lock(&ipcpi.cmd_lock); - while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT) + while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, &ipcpi.cmd_lock, &dl); - sfd = ipcpi.csockfd; - ipcpi.csockfd = -1; - - if (sfd == -1) { + if (ret == -ETIMEDOUT) { pthread_mutex_unlock(&ipcpi.cmd_lock); - if (tpm_check()) + if (tpm_check(ipcpi.tpm)) break; continue; } - pthread_cond_signal(&ipcpi.acc_cond); + cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_mutex_unlock(&ipcpi.cmd_lock); + + msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); + sfd = cmd->fd; + + free(cmd); - msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf); if (msg == NULL) { - pthread_mutex_unlock(&ipcpi.cmd_lock); close(sfd); continue; } - pthread_mutex_unlock(&ipcpi.cmd_lock); - - tpm_dec(); + tpm_dec(ipcpi.tpm); switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -474,7 +488,7 @@ static void * mainloop(void * o) if (buffer.len == 0) { log_err("Failed to pack reply message"); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -482,7 +496,7 @@ static void * mainloop(void * o) if (buffer.data == NULL) { log_err("Failed to create reply buffer."); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } @@ -492,17 +506,17 @@ static void * mainloop(void * o) log_err("Failed to send reply message"); free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); continue; } free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(ipcpi.tpm); } - tpm_exit(); + tpm_exit(ipcpi.tpm); return (void *) 0; } @@ -617,20 +631,14 @@ int ipcp_init(int argc, goto fail_cmd_cond; } - if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) { - log_err("Failed to init convar."); - goto fail_acc_cond; - } + list_head_init(&ipcpi.cmds); ipcpi.alloc_id = -1; - ipcpi.csockfd = -1; pthread_condattr_destroy(&cattr); return 0; - fail_acc_cond: - pthread_cond_destroy(&ipcpi.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&ipcpi.cmd_lock); fail_cmd_lock: @@ -675,12 +683,14 @@ int ipcp_boot() sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) - goto fail_tpm_init; + ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS, + mainloop, NULL); + if (ipcpi.tpm == NULL) + goto fail_tpm_create; pthread_sigmask(SIG_BLOCK, &sigset, NULL); - if (tpm_start()) + if (tpm_start(ipcpi.tpm)) goto fail_tpm_start; ipcp_set_state(IPCP_INIT); @@ -696,18 +706,18 @@ int ipcp_boot() return 0; fail_acceptor: - tpm_stop(); + tpm_stop(ipcpi.tpm); fail_tpm_start: - tpm_fini(); - fail_tpm_init: + tpm_destroy(ipcpi.tpm); + fail_tpm_create: return -1; } void ipcp_shutdown() { pthread_join(ipcpi.acceptor, NULL); - tpm_stop(); - tpm_fini(); + tpm_stop(ipcpi.tpm); + tpm_destroy(ipcpi.tpm); log_info("IPCP %d shutting down.", getpid()); } @@ -724,7 +734,6 @@ void ipcp_fini() pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); - pthread_cond_destroy(&ipcpi.acc_cond); pthread_cond_destroy(&ipcpi.cmd_cond); pthread_mutex_destroy(&ipcpi.cmd_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 1b2a0334..d47d224b 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -25,8 +25,10 @@ #include <ouroboros/hash.h> #include <ouroboros/ipcp.h> +#include <ouroboros/list.h> #include <ouroboros/qoscube.h> #include <ouroboros/sockets.h> +#include <ouroboros/tpm.h> #include <pthread.h> #include <time.h> @@ -92,10 +94,7 @@ struct ipcp { int sockfd; char * sock_path; - uint8_t cbuf[IPCP_MSG_BUF_SIZE]; - size_t cmd_len; - int csockfd; - pthread_cond_t acc_cond; + struct list_head cmds; pthread_cond_t cmd_cond; pthread_mutex_t cmd_lock; @@ -103,6 +102,8 @@ struct ipcp { pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; + struct tpm * tpm; + pthread_t acceptor; } ipcpi; diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 548ae03a..93fd4e4e 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -34,6 +34,7 @@ #include <ouroboros/notifier.h> #include <ouroboros/random.h> #include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h> #include <ouroboros/utils.h> #include "connmgr.h" @@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ #define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ #define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ +#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ enum dht_state { DHT_INIT = 0, @@ -187,6 +189,12 @@ struct bucket { struct bucket * children[1L << KAD_BETA]; }; +struct cmd { + struct list_head next; + + struct shm_du_buff * sdb; +}; + struct dht { size_t alpha; size_t b; @@ -212,6 +220,7 @@ struct dht { struct bmp * cookies; enum dht_state state; + struct list_head cmds; pthread_cond_t cond; pthread_mutex_t mtx; @@ -219,6 +228,8 @@ struct dht { int fd; + struct tpm * tpm; + pthread_t worker; }; @@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht, ipcp_sdb_release(sdb); #endif fail_msg: + pthread_rwlock_wrlock(&dht->lock); bmp_release(dht->cookies, msg->cookie); + pthread_rwlock_unlock(&dht->lock); fail_bmp_alloc: return -1; } @@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht * dht, list_head_init(&l); - pthread_rwlock_rdlock(&dht->lock); + pthread_rwlock_wrlock(&dht->lock); len = dht_contact_list(dht, &l, key); if (len == 0) { @@ -1898,9 +1911,13 @@ static void * work(void * o) dht = (struct dht *) o; + pthread_rwlock_rdlock(&dht->lock); + intv = gcd(dht->t_expire, dht->t_repub); intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + pthread_rwlock_unlock(&dht->lock); + list_head_init(&reflist); while (true) { @@ -2189,7 +2206,7 @@ int dht_reg(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - kad_publish(dht, key, >addr, t_expire); + kad_publish(dht, key, addr, t_expire); return 0; } @@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht * dht, static void * dht_handle_sdu(void * o) { - struct dht * dht; - struct shm_du_buff * sdb; - kad_msg_t * msg; - kad_contact_msg_t ** cmsgs; - kad_msg_t resp_msg = KAD_MSG__INIT; - uint64_t addr; - buffer_t buf; - size_t i; - size_t b; - size_t t_expire; + struct dht * dht = (struct dht *) o; + struct timespec dl; + struct timespec to = {(HANDLE_TIMEO / 1000), + (HANDLE_TIMEO % 1000) * MILLION}; + assert(dht); - assert(o); + while (true) { + kad_msg_t * msg; + kad_contact_msg_t ** cmsgs; + kad_msg_t resp_msg = KAD_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + size_t b; + size_t t_expire; + struct cmd * cmd; + int ret = 0; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&dht->mtx); + + while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&dht->cond, + &dht->mtx, &dl); + + if (ret == -ETIMEDOUT) { + pthread_mutex_unlock(&dht->mtx); + if (tpm_check(dht->tpm)) + break; + continue; + } - memset(&buf, 0, sizeof(buf)); + cmd = list_last_entry(&dht->cmds, struct cmd, next); + list_del(&cmd->next); - dht = ((struct sdu_info *) o)->dht; - sdb = ((struct sdu_info *) o)->sdb; + pthread_mutex_unlock(&dht->mtx); - assert(dht); - assert(sdb); + i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); - msg = kad_msg__unpack(NULL, - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - shm_du_buff_head(sdb)); + msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb)); - ipcp_sdb_release(sdb); + ipcp_sdb_release(cmd->sdb); + free(cmd); - free(o); + if (msg == NULL) { + log_err("Failed to unpack message."); + continue; + } - if (msg == NULL) { - log_err("Failed to unpack message."); - return (void *) -1; - } + pthread_rwlock_rdlock(&dht->lock); - pthread_rwlock_rdlock(&dht->lock); + b = dht->b; + t_expire = dht->t_expire; - b = dht->b; - t_expire = dht->t_expire; + pthread_rwlock_unlock(&dht->lock); - pthread_rwlock_unlock(&dht->lock); + if (msg->has_key && msg->key.len != b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + continue; + } - if (msg->has_key && msg->key.len != b) { - kad_msg__free_unpacked(msg, NULL); - log_warn("Bad key in message."); - return (void *) -1; - } + if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", + msg->code); + continue; + } - if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) { - kad_msg__free_unpacked(msg, NULL); - log_warn("Bad source ID in message of type %d.", msg->code); - return (void *) -1; - } + if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { + kad_msg__free_unpacked(msg, NULL); + log_dbg("Got a request message when not running."); + continue; + } - if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) { - kad_msg__free_unpacked(msg, NULL); - log_dbg("Got a request message when not running."); - return (void *) -1; - } + tpm_dec(dht->tpm); - addr = msg->s_addr; + addr = msg->s_addr; - resp_msg.code = KAD_RESPONSE; - resp_msg.cookie = msg->cookie; + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; - switch(msg->code) { - case KAD_JOIN: - /* Refuse enrollee on check fails. */ - if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { - log_warn("Parameter mismatch. " - "DHT enrolment refused."); - break; - } + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } - if (msg->t_replicate != KAD_T_REPL) { - log_warn("Replication time mismatch. " - "DHT enrolment refused."); + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); - break; + break; } - if (msg->t_refresh != KAD_T_REFR) { - log_warn("Refresh time mismatch. " - "DHT enrolment refused."); + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = b; + resp_msg.k = KAD_K; + resp_msg.t_expire = t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; break; - } + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } - resp_msg.has_alpha = true; - resp_msg.has_b = true; - resp_msg.has_k = true; - resp_msg.has_t_expire = true; - resp_msg.has_t_refresh = true; - resp_msg.has_t_replicate = true; - resp_msg.alpha = KAD_ALPHA; - resp_msg.b = b; - resp_msg.k = KAD_K; - resp_msg.t_expire = t_expire; - resp_msg.t_refresh = KAD_T_REFR; - resp_msg.t_replicate = KAD_T_REPL; - break; - case KAD_FIND_VALUE: - buf = dht_retrieve(dht, msg->key.data); - if (buf.len != 0) { - resp_msg.n_addrs = buf.len; - resp_msg.addrs = (uint64_t *) buf.data; + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, + msg->t_expire); break; - } - /* FALLTHRU */ - case KAD_FIND_NODE: - /* Return k closest contacts. */ - resp_msg.n_contacts = - dht_get_contacts(dht, msg->key.data, &cmsgs); - resp_msg.contacts = cmsgs; - break; - case KAD_STORE: - if (msg->n_contacts < 1) { - log_warn("No contacts in store message."); + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); break; } - if (!msg->has_t_expire) { - log_warn("No expiry time in store message."); - break; + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); } - kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); - break; - case KAD_RESPONSE: - kad_handle_response(dht, msg); - break; - default: - assert(false); - break; - } + if (msg->code < KAD_STORE) { + if (send_msg(dht, &resp_msg, addr)) + log_warn("Failed to send response."); + } - if (msg->code != KAD_JOIN) { - pthread_rwlock_wrlock(&dht->lock); - if (dht_update_bucket(dht, msg->s_id.data, addr)) - log_warn("Failed to update bucket."); - pthread_rwlock_unlock(&dht->lock); - } + kad_msg__free_unpacked(msg, NULL); - if (msg->code < KAD_STORE) { - if (send_msg(dht, &resp_msg, addr)) - log_warn("Failed to send response."); - } + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); - kad_msg__free_unpacked(msg, NULL); + if (resp_msg.n_contacts == 0) { + tpm_inc(dht->tpm); + continue; + } - if (resp_msg.n_addrs > 0) - free(resp_msg.addrs); + for (i = 0; i < resp_msg.n_contacts; ++i) + kad_contact_msg__free_unpacked(resp_msg.contacts[i], + NULL); + free(resp_msg.contacts); - if (resp_msg.n_contacts == 0) - return (void *) -1; + tpm_inc(dht->tpm); + } - for (i = 0; i < resp_msg.n_contacts; ++i) - kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); - free(resp_msg.contacts); + tpm_exit(dht->tpm); return (void *) 0; } @@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o) static void dht_post_sdu(void * comp, struct shm_du_buff * sdb) { - pthread_t thr; - struct sdu_info * info; + struct cmd * cmd; + struct dht * dht = (struct dht *) comp; - info = malloc(sizeof(*info)); - if (info == NULL) + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Command failed. Out of memory."); return; + } - info->dht = (struct dht *) comp; - info->sdb = sdb; + cmd->sdb = sdb; - if (pthread_create(&thr, NULL, dht_handle_sdu, info)) { - free(info); - return; - } + pthread_mutex_lock(&dht->mtx); + + list_add(&cmd->next, &dht->cmds); - pthread_detach(thr); + pthread_cond_signal(&dht->cond); + + pthread_mutex_unlock(&dht->mtx); } void dht_destroy(struct dht * dht) @@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht) if (dht == NULL) return; +#ifndef __DHT_TEST__ + tpm_stop(dht->tpm); + + tpm_destroy(dht->tpm); +#endif if (dht_get_state(dht) == DHT_RUNNING) { dht_set_state(dht, DHT_SHUTDOWN); pthread_cancel(dht->worker); @@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr) list_head_init(&dht->requests); list_head_init(&dht->refs); list_head_init(&dht->lookups); + list_head_init(&dht->cmds); if (pthread_rwlock_init(&dht->lock, NULL)) goto fail_rwlock; @@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr) dht->addr = addr; dht->id = NULL; #ifndef __DHT_TEST__ + dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht); + if (dht->tpm == NULL) + goto fail_tpm_create; + + if (tpm_start(dht->tpm)) + goto fail_tpm_start; + dht->fd = dt_reg_ae(dht, &dht_post_sdu); notifier_reg(handle_event, dht); #else (void) handle_event; + (void) dht_handle_sdu; (void) dht_post_sdu; #endif dht->state = DHT_INIT; return dht; - +#ifndef __DHT_TEST__ + fail_tpm_start: + tpm_destroy(dht->tpm); + fail_tpm_create: + bmp_destroy(dht->cookies); +#endif fail_bmp: pthread_cond_destroy(&dht->cond); fail_cond: diff --git a/src/irmd/main.c b/src/irmd/main.c index 27c771a6..3fceadb6 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -93,6 +93,14 @@ enum irm_state { IRMD_RUNNING }; +struct cmd { + struct list_head next; + + uint8_t cbuf[IB_LEN]; + size_t len; + int fd; +}; + struct { struct list_head registry; /* registered names known */ @@ -112,16 +120,15 @@ struct { int sockfd; /* UNIX socket */ - uint8_t cbuf[IB_LEN]; /* cmd message buffer */ - size_t cmd_len; /* length of cmd in cbuf */ - int csockfd; /* cmd UNIX socket */ - pthread_cond_t acc_cond; /* cmd accepted condvar */ + struct list_head cmds; /* pending commands */ pthread_cond_t cmd_cond; /* cmd signal condvar */ pthread_mutex_t cmd_lock; /* cmd signal lock */ enum irm_state state; /* state of the irmd */ pthread_rwlock_t state_lock; /* lock for the entire irmd */ + struct tpm * tpm; /* thread pool manager */ + pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t shm_sanitize; /* keep track of rdrbuff use */ pthread_t acceptor; /* accept new commands */ @@ -1631,7 +1638,6 @@ static void irm_fini(void) lockfile_destroy(irmd.lf); pthread_mutex_destroy(&irmd.cmd_lock); - pthread_cond_destroy(&irmd.acc_cond); pthread_cond_destroy(&irmd.cmd_cond); pthread_rwlock_destroy(&irmd.reg_lock); pthread_rwlock_destroy(&irmd.state_lock); @@ -1864,7 +1870,8 @@ static void * acceptloop(void * o) (void) o; while (irmd_get_state() == IRMD_RUNNING) { - ssize_t count; + struct cmd * cmd; + #if defined(__FreeBSD__) || defined(__APPLE__) FD_ZERO(&fds); FD_SET(irmd.sockfd, &fds); @@ -1879,25 +1886,28 @@ static void * acceptloop(void * o) (void *) &tv, sizeof(tv))) log_warn("Failed to set timeout on socket."); - pthread_mutex_lock(&irmd.cmd_lock); + cmd = malloc(sizeof(*cmd)); + if (cmd == NULL) { + log_err("Out of memory."); + break; + } - assert(irmd.csockfd == -1); + pthread_mutex_lock(&irmd.cmd_lock); - count = read(csockfd, irmd.cbuf, IRM_MSG_BUF_SIZE); - if (count <= 0) { + cmd->len = read(csockfd, cmd->cbuf, IRM_MSG_BUF_SIZE); + if (cmd->len <= 0) { pthread_mutex_unlock(&irmd.cmd_lock); log_err("Failed to read from socket."); close(csockfd); + free(cmd); continue; } - irmd.cmd_len = count; - irmd.csockfd = csockfd; + cmd->fd = csockfd; - pthread_cond_signal(&irmd.cmd_cond); + list_add(&cmd->next, &irmd.cmds); - while(irmd.csockfd != -1) - pthread_cond_wait(&irmd.acc_cond, &irmd.cmd_lock); + pthread_cond_signal(&irmd.cmd_cond); pthread_mutex_unlock(&irmd.cmd_lock); } @@ -1923,6 +1933,7 @@ void * mainloop(void * o) pid_t * apis = NULL; struct timespec * timeo = NULL; struct timespec ts = {0, 0}; + struct cmd * cmd; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; @@ -1931,33 +1942,34 @@ void * mainloop(void * o) pthread_mutex_lock(&irmd.cmd_lock); - while (irmd.csockfd == -1 && ret != -ETIMEDOUT) + while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&irmd.cmd_cond, &irmd.cmd_lock, &dl); - sfd = irmd.csockfd; - irmd.csockfd = -1; - - if (sfd == -1) { + if (ret == -ETIMEDOUT) { pthread_mutex_unlock(&irmd.cmd_lock); - if (tpm_check()) + if (tpm_check(irmd.tpm)) break; continue; } - pthread_cond_signal(&irmd.acc_cond); + cmd = list_last_entry(&irmd.cmds, struct cmd, next); + list_del(&cmd->next); + + pthread_mutex_unlock(&irmd.cmd_lock); + + msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf); + sfd = cmd->fd; + + free(cmd); - msg = irm_msg__unpack(NULL, irmd.cmd_len, irmd.cbuf); if (msg == NULL) { - pthread_mutex_unlock(&irmd.cmd_lock); close(sfd); continue; } - pthread_mutex_unlock(&irmd.cmd_lock); - - tpm_dec(); + tpm_dec(irmd.tpm); if (msg->has_timeo_sec) { assert(msg->has_timeo_nsec); @@ -2098,7 +2110,7 @@ void * mainloop(void * o) if (ret_msg.result == -EPIPE || !ret_msg.has_result) { close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); continue; } @@ -2108,7 +2120,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); continue; } @@ -2117,7 +2129,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); continue; } @@ -2133,10 +2145,10 @@ void * mainloop(void * o) free(buffer.data); close(sfd); - tpm_inc(); + tpm_inc(irmd.tpm); } - tpm_exit(); + tpm_exit(irmd.tpm); return (void *) 0; } @@ -2184,12 +2196,6 @@ static int irm_init(void) goto fail_cmd_cond; } - if (pthread_cond_init(&irmd.acc_cond, &cattr)) { - log_err("Failed to initialize condvar."); - pthread_condattr_destroy(&cattr); - goto fail_acc_cond; - } - pthread_condattr_destroy(&cattr); list_head_init(&irmd.ipcps); @@ -2198,6 +2204,7 @@ static int irm_init(void) list_head_init(&irmd.spawned_apis); list_head_init(&irmd.registry); list_head_init(&irmd.irm_flows); + list_head_init(&irmd.cmds); irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0); if (irmd.port_ids == NULL) { @@ -2272,7 +2279,6 @@ static int irm_init(void) gcry_control(GCRYCTL_INITIALIZATION_FINISHED); #endif - irmd.csockfd = -1; irmd.state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2294,8 +2300,6 @@ static int irm_init(void) fail_lockfile: bmp_destroy(irmd.port_ids); fail_port_ids: - pthread_cond_destroy(&irmd.acc_cond); - fail_acc_cond: pthread_cond_destroy(&irmd.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&irmd.cmd_lock); @@ -2367,12 +2371,14 @@ int main(int argc, if (irm_init() < 0) goto fail_irm_init; - if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { + irmd.tpm = tpm_create(IRMD_MIN_THREADS, IRMD_ADD_THREADS, + mainloop, NULL); + if (irmd.tpm == NULL) { irmd_set_state(IRMD_NULL); - goto fail_tpm_init; + goto fail_tpm_create; } - if (tpm_start()) { + if (tpm_start(irmd.tpm)) { irmd_set_state(IRMD_NULL); goto fail_tpm_start; } @@ -2396,9 +2402,9 @@ int main(int argc, pthread_join(irmd.irm_sanitize, NULL); pthread_join(irmd.shm_sanitize, NULL); - tpm_stop(); + tpm_stop(irmd.tpm); - tpm_fini(); + tpm_destroy(irmd.tpm); pthread_sigmask(SIG_BLOCK, &sigset, NULL); @@ -2417,10 +2423,10 @@ int main(int argc, fail_shm_sanitize: pthread_join(irmd.irm_sanitize, NULL); fail_irm_sanitize: - tpm_stop(); + tpm_stop(irmd.tpm); fail_tpm_start: - tpm_fini(); - fail_tpm_init: + tpm_destroy(irmd.tpm); + fail_tpm_create: irm_fini(); fail_irm_init: log_fini(); diff --git a/src/lib/tpm.c b/src/lib/tpm.c index dd71d276..c883e0a8 100644 --- a/src/lib/tpm.c +++ b/src/lib/tpm.c @@ -50,38 +50,38 @@ enum tpm_state { TPM_RUNNING }; -struct { +struct tpm { size_t min; size_t inc; size_t cur; size_t wrk; void * (* func)(void *); + void * o; struct list_head pool; enum tpm_state state; - pthread_cond_t cond; pthread_mutex_t lock; pthread_t mgr; -} tpm; +}; -static void tpm_join(void) +static void tpm_join(struct tpm * tpm) { struct list_head * p; struct list_head * h; - list_for_each_safe(p, h, &tpm.pool) { + list_for_each_safe(p, h, &tpm->pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); - if (tpm.state != TPM_RUNNING) { + if (tpm->state != TPM_RUNNING) { if (!e->kill) { e->kill = true; - --tpm.cur; + --tpm->cur; } while (!e->join) - pthread_cond_wait(&tpm.cond, &tpm.lock); + pthread_cond_wait(&tpm->cond, &tpm->lock); } if (e->join) { @@ -92,12 +92,13 @@ static void tpm_join(void) } } -static struct pthr_el * tpm_pthr_el(pthread_t thr) +static struct pthr_el * tpm_pthr_el(struct tpm * tpm, + pthread_t thr) { struct list_head * p; struct pthr_el * e; - list_for_each(p, &tpm.pool) { + list_for_each(p, &tpm->pool) { e = list_entry(p, struct pthr_el, next); if (e->thr == thr) return e; @@ -109,15 +110,15 @@ static struct pthr_el * tpm_pthr_el(pthread_t thr) return NULL; } -static void tpm_kill(void) +static void tpm_kill(struct tpm * tpm) { struct list_head * p; - list_for_each(p, &tpm.pool) { + list_for_each(p, &tpm->pool) { struct pthr_el * e = list_entry(p, struct pthr_el, next); if (!e->kill) { e->kill = true; - --tpm.cur; + --tpm->cur; return; } } @@ -128,25 +129,25 @@ static void * tpmgr(void * o) struct timespec dl; struct timespec to = {(TPM_TIMEOUT / 1000), (TPM_TIMEOUT % 1000) * MILLION}; - (void) o; + struct tpm * tpm = (struct tpm *) o; while (true) { clock_gettime(PTHREAD_COND_CLOCK, &dl); ts_add(&dl, &to, &dl); - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - if (tpm.state != TPM_RUNNING) { - tpm_join(); - pthread_mutex_unlock(&tpm.lock); + if (tpm->state != TPM_RUNNING) { + tpm_join(tpm); + pthread_mutex_unlock(&tpm->lock); break; } - tpm_join(); + tpm_join(tpm); - if (tpm.cur - tpm.wrk < tpm.min) { + if (tpm->cur - tpm->wrk < tpm->min) { size_t i; - for (i = 0; i < tpm.inc; ++i) { + for (i = 0; i < tpm->inc; ++i) { struct pthr_el * e = malloc(sizeof(*e)); if (e == NULL) break; @@ -155,35 +156,41 @@ static void * tpmgr(void * o) e->kill = false; if (pthread_create(&e->thr, NULL, - tpm.func, NULL)) { + tpm->func, tpm->o)) { free(e); break; } - list_add(&e->next, &tpm.pool); + list_add(&e->next, &tpm->pool); } - tpm.cur += i; + tpm->cur += i; } - if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) + if (pthread_cond_timedwait(&tpm->cond, &tpm->lock, &dl) == ETIMEDOUT) - if (tpm.cur > tpm.min) - tpm_kill(); + if (tpm->cur > tpm->min) + tpm_kill(tpm); - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } return (void *) 0; } -int tpm_init(size_t min, - size_t inc, - void * (* func)(void *)) +struct tpm * tpm_create(size_t min, + size_t inc, + void * (* func)(void *), + void * o) { + struct tpm * tpm; pthread_condattr_t cattr; - if (pthread_mutex_init(&tpm.lock, NULL)) + tpm = malloc(sizeof(*tpm)); + if (tpm == NULL) + goto fail_malloc; + + if (pthread_mutex_init(&tpm->lock, NULL)) goto fail_lock; if (pthread_condattr_init(&cattr)) @@ -192,103 +199,108 @@ int tpm_init(size_t min, #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - if (pthread_cond_init(&tpm.cond, &cattr)) + if (pthread_cond_init(&tpm->cond, &cattr)) goto fail_cond; - list_head_init(&tpm.pool); + list_head_init(&tpm->pool); pthread_condattr_destroy(&cattr); - tpm.state = TPM_INIT; - tpm.func = func; - tpm.min = min; - tpm.inc = inc; - tpm.cur = 0; - tpm.wrk = 0; + tpm->state = TPM_INIT; + tpm->func = func; + tpm->o = o; + tpm->min = min; + tpm->inc = inc; + tpm->cur = 0; + tpm->wrk = 0; - return 0; + return tpm; fail_cond: pthread_condattr_destroy(&cattr); fail_cattr: - pthread_mutex_destroy(&tpm.lock); + pthread_mutex_destroy(&tpm->lock); fail_lock: - return -1; + free(tpm); + fail_malloc: + return NULL; } -int tpm_start(void) +int tpm_start(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) { - pthread_mutex_unlock(&tpm.lock); + if (pthread_create(&tpm->mgr, NULL, tpmgr, tpm)) { + pthread_mutex_unlock(&tpm->lock); return -1; } - tpm.state = TPM_RUNNING; + tpm->state = TPM_RUNNING; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); return 0; } -void tpm_stop(void) +void tpm_stop(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - tpm.state = TPM_NULL; + tpm->state = TPM_NULL; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -void tpm_fini(void) +void tpm_destroy(struct tpm * tpm) { - pthread_join(tpm.mgr, NULL); + pthread_join(tpm->mgr, NULL); + + pthread_mutex_destroy(&tpm->lock); + pthread_cond_destroy(&tpm->cond); - pthread_mutex_destroy(&tpm.lock); - pthread_cond_destroy(&tpm.cond); + free(tpm); } -bool tpm_check(void) +bool tpm_check(struct tpm * tpm) { bool ret; - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - ret = tpm_pthr_el(pthread_self())->kill; + ret = tpm_pthr_el(tpm, pthread_self())->kill; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); return ret; } -void tpm_inc(void) +void tpm_inc(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - --tpm.wrk; + --tpm->wrk; - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -void tpm_dec(void) +void tpm_dec(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - ++tpm.wrk; + ++tpm->wrk; - pthread_cond_signal(&tpm.cond); + pthread_cond_signal(&tpm->cond); - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } -void tpm_exit(void) +void tpm_exit(struct tpm * tpm) { - pthread_mutex_lock(&tpm.lock); + pthread_mutex_lock(&tpm->lock); - tpm_pthr_el(pthread_self())->join = true; + tpm_pthr_el(tpm, pthread_self())->join = true; - pthread_cond_signal(&tpm.cond); + pthread_cond_signal(&tpm->cond); - pthread_mutex_unlock(&tpm.lock); + pthread_mutex_unlock(&tpm->lock); } |