summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-09-24 14:34:03 +0200
committerdimitri staessens <[email protected]>2017-09-24 14:34:03 +0200
commitff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch)
tree17f66b04659a06c018494eb732adb661111d63f2 /src/ipcpd/normal
parent7cef269be64f64b920763c6f2455931422c8bfe9 (diff)
downloadouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz
ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads.
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/dht.c339
1 files changed, 203 insertions, 136 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 548ae03a..93fd4e4e 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -34,6 +34,7 @@
#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
#include <ouroboros/utils.h>
#include "connmgr.h"
@@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t;
#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */
#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
+#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */
enum dht_state {
DHT_INIT = 0,
@@ -187,6 +189,12 @@ struct bucket {
struct bucket * children[1L << KAD_BETA];
};
+struct cmd {
+ struct list_head next;
+
+ struct shm_du_buff * sdb;
+};
+
struct dht {
size_t alpha;
size_t b;
@@ -212,6 +220,7 @@ struct dht {
struct bmp * cookies;
enum dht_state state;
+ struct list_head cmds;
pthread_cond_t cond;
pthread_mutex_t mtx;
@@ -219,6 +228,8 @@ struct dht {
int fd;
+ struct tpm * tpm;
+
pthread_t worker;
};
@@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht,
ipcp_sdb_release(sdb);
#endif
fail_msg:
+ pthread_rwlock_wrlock(&dht->lock);
bmp_release(dht->cookies, msg->cookie);
+ pthread_rwlock_unlock(&dht->lock);
fail_bmp_alloc:
return -1;
}
@@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht * dht,
list_head_init(&l);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht->lock);
len = dht_contact_list(dht, &l, key);
if (len == 0) {
@@ -1898,9 +1911,13 @@ static void * work(void * o)
dht = (struct dht *) o;
+ pthread_rwlock_rdlock(&dht->lock);
+
intv = gcd(dht->t_expire, dht->t_repub);
intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+ pthread_rwlock_unlock(&dht->lock);
+
list_head_init(&reflist);
while (true) {
@@ -2189,7 +2206,7 @@ int dht_reg(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, >addr, t_expire);
+ kad_publish(dht, key, addr, t_expire);
return 0;
}
@@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht * dht,
static void * dht_handle_sdu(void * o)
{
- struct dht * dht;
- struct shm_du_buff * sdb;
- kad_msg_t * msg;
- kad_contact_msg_t ** cmsgs;
- kad_msg_t resp_msg = KAD_MSG__INIT;
- uint64_t addr;
- buffer_t buf;
- size_t i;
- size_t b;
- size_t t_expire;
+ struct dht * dht = (struct dht *) o;
+ struct timespec dl;
+ struct timespec to = {(HANDLE_TIMEO / 1000),
+ (HANDLE_TIMEO % 1000) * MILLION};
+ assert(dht);
- assert(o);
+ while (true) {
+ kad_msg_t * msg;
+ kad_contact_msg_t ** cmsgs;
+ kad_msg_t resp_msg = KAD_MSG__INIT;
+ uint64_t addr;
+ buffer_t buf;
+ size_t i;
+ size_t b;
+ size_t t_expire;
+ struct cmd * cmd;
+ int ret = 0;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_mutex_lock(&dht->mtx);
+
+ while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&dht->cond,
+ &dht->mtx, &dl);
+
+ if (ret == -ETIMEDOUT) {
+ pthread_mutex_unlock(&dht->mtx);
+ if (tpm_check(dht->tpm))
+ break;
+ continue;
+ }
- memset(&buf, 0, sizeof(buf));
+ cmd = list_last_entry(&dht->cmds, struct cmd, next);
+ list_del(&cmd->next);
- dht = ((struct sdu_info *) o)->dht;
- sdb = ((struct sdu_info *) o)->sdb;
+ pthread_mutex_unlock(&dht->mtx);
- assert(dht);
- assert(sdb);
+ i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
- msg = kad_msg__unpack(NULL,
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- shm_du_buff_head(sdb));
+ msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
- ipcp_sdb_release(sdb);
+ ipcp_sdb_release(cmd->sdb);
+ free(cmd);
- free(o);
+ if (msg == NULL) {
+ log_err("Failed to unpack message.");
+ continue;
+ }
- if (msg == NULL) {
- log_err("Failed to unpack message.");
- return (void *) -1;
- }
+ pthread_rwlock_rdlock(&dht->lock);
- pthread_rwlock_rdlock(&dht->lock);
+ b = dht->b;
+ t_expire = dht->t_expire;
- b = dht->b;
- t_expire = dht->t_expire;
+ pthread_rwlock_unlock(&dht->lock);
- pthread_rwlock_unlock(&dht->lock);
+ if (msg->has_key && msg->key.len != b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad key in message.");
+ continue;
+ }
- if (msg->has_key && msg->key.len != b) {
- kad_msg__free_unpacked(msg, NULL);
- log_warn("Bad key in message.");
- return (void *) -1;
- }
+ if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad source ID in message of type %d.",
+ msg->code);
+ continue;
+ }
- if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
- kad_msg__free_unpacked(msg, NULL);
- log_warn("Bad source ID in message of type %d.", msg->code);
- return (void *) -1;
- }
+ if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_dbg("Got a request message when not running.");
+ continue;
+ }
- if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
- kad_msg__free_unpacked(msg, NULL);
- log_dbg("Got a request message when not running.");
- return (void *) -1;
- }
+ tpm_dec(dht->tpm);
- addr = msg->s_addr;
+ addr = msg->s_addr;
- resp_msg.code = KAD_RESPONSE;
- resp_msg.cookie = msg->cookie;
+ resp_msg.code = KAD_RESPONSE;
+ resp_msg.cookie = msg->cookie;
- switch(msg->code) {
- case KAD_JOIN:
- /* Refuse enrollee on check fails. */
- if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
- log_warn("Parameter mismatch. "
- "DHT enrolment refused.");
- break;
- }
+ switch(msg->code) {
+ case KAD_JOIN:
+ /* Refuse enrollee on check fails. */
+ if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
+ log_warn("Parameter mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
- if (msg->t_replicate != KAD_T_REPL) {
- log_warn("Replication time mismatch. "
- "DHT enrolment refused.");
+ if (msg->t_replicate != KAD_T_REPL) {
+ log_warn("Replication time mismatch. "
+ "DHT enrolment refused.");
- break;
+ break;
}
- if (msg->t_refresh != KAD_T_REFR) {
- log_warn("Refresh time mismatch. "
- "DHT enrolment refused.");
+ if (msg->t_refresh != KAD_T_REFR) {
+ log_warn("Refresh time mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
+
+ resp_msg.has_alpha = true;
+ resp_msg.has_b = true;
+ resp_msg.has_k = true;
+ resp_msg.has_t_expire = true;
+ resp_msg.has_t_refresh = true;
+ resp_msg.has_t_replicate = true;
+ resp_msg.alpha = KAD_ALPHA;
+ resp_msg.b = b;
+ resp_msg.k = KAD_K;
+ resp_msg.t_expire = t_expire;
+ resp_msg.t_refresh = KAD_T_REFR;
+ resp_msg.t_replicate = KAD_T_REPL;
break;
- }
+ case KAD_FIND_VALUE:
+ buf = dht_retrieve(dht, msg->key.data);
+ if (buf.len != 0) {
+ resp_msg.n_addrs = buf.len;
+ resp_msg.addrs = (uint64_t *) buf.data;
+ break;
+ }
+ /* FALLTHRU */
+ case KAD_FIND_NODE:
+ /* Return k closest contacts. */
+ resp_msg.n_contacts =
+ dht_get_contacts(dht, msg->key.data, &cmsgs);
+ resp_msg.contacts = cmsgs;
+ break;
+ case KAD_STORE:
+ if (msg->n_contacts < 1) {
+ log_warn("No contacts in store message.");
+ break;
+ }
- resp_msg.has_alpha = true;
- resp_msg.has_b = true;
- resp_msg.has_k = true;
- resp_msg.has_t_expire = true;
- resp_msg.has_t_refresh = true;
- resp_msg.has_t_replicate = true;
- resp_msg.alpha = KAD_ALPHA;
- resp_msg.b = b;
- resp_msg.k = KAD_K;
- resp_msg.t_expire = t_expire;
- resp_msg.t_refresh = KAD_T_REFR;
- resp_msg.t_replicate = KAD_T_REPL;
- break;
- case KAD_FIND_VALUE:
- buf = dht_retrieve(dht, msg->key.data);
- if (buf.len != 0) {
- resp_msg.n_addrs = buf.len;
- resp_msg.addrs = (uint64_t *) buf.data;
+ if (!msg->has_t_expire) {
+ log_warn("No expiry time in store message.");
+ break;
+ }
+
+ kad_add(dht, *msg->contacts, msg->n_contacts,
+ msg->t_expire);
break;
- }
- /* FALLTHRU */
- case KAD_FIND_NODE:
- /* Return k closest contacts. */
- resp_msg.n_contacts =
- dht_get_contacts(dht, msg->key.data, &cmsgs);
- resp_msg.contacts = cmsgs;
- break;
- case KAD_STORE:
- if (msg->n_contacts < 1) {
- log_warn("No contacts in store message.");
+ case KAD_RESPONSE:
+ kad_handle_response(dht, msg);
+ break;
+ default:
+ assert(false);
break;
}
- if (!msg->has_t_expire) {
- log_warn("No expiry time in store message.");
- break;
+ if (msg->code != KAD_JOIN) {
+ pthread_rwlock_wrlock(&dht->lock);
+ if (dht_update_bucket(dht, msg->s_id.data, addr))
+ log_warn("Failed to update bucket.");
+ pthread_rwlock_unlock(&dht->lock);
}
- kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire);
- break;
- case KAD_RESPONSE:
- kad_handle_response(dht, msg);
- break;
- default:
- assert(false);
- break;
- }
+ if (msg->code < KAD_STORE) {
+ if (send_msg(dht, &resp_msg, addr))
+ log_warn("Failed to send response.");
+ }
- if (msg->code != KAD_JOIN) {
- pthread_rwlock_wrlock(&dht->lock);
- if (dht_update_bucket(dht, msg->s_id.data, addr))
- log_warn("Failed to update bucket.");
- pthread_rwlock_unlock(&dht->lock);
- }
+ kad_msg__free_unpacked(msg, NULL);
- if (msg->code < KAD_STORE) {
- if (send_msg(dht, &resp_msg, addr))
- log_warn("Failed to send response.");
- }
+ if (resp_msg.n_addrs > 0)
+ free(resp_msg.addrs);
- kad_msg__free_unpacked(msg, NULL);
+ if (resp_msg.n_contacts == 0) {
+ tpm_inc(dht->tpm);
+ continue;
+ }
- if (resp_msg.n_addrs > 0)
- free(resp_msg.addrs);
+ for (i = 0; i < resp_msg.n_contacts; ++i)
+ kad_contact_msg__free_unpacked(resp_msg.contacts[i],
+ NULL);
+ free(resp_msg.contacts);
- if (resp_msg.n_contacts == 0)
- return (void *) -1;
+ tpm_inc(dht->tpm);
+ }
- for (i = 0; i < resp_msg.n_contacts; ++i)
- kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);
- free(resp_msg.contacts);
+ tpm_exit(dht->tpm);
return (void *) 0;
}
@@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o)
static void dht_post_sdu(void * comp,
struct shm_du_buff * sdb)
{
- pthread_t thr;
- struct sdu_info * info;
+ struct cmd * cmd;
+ struct dht * dht = (struct dht *) comp;
- info = malloc(sizeof(*info));
- if (info == NULL)
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command failed. Out of memory.");
return;
+ }
- info->dht = (struct dht *) comp;
- info->sdb = sdb;
+ cmd->sdb = sdb;
- if (pthread_create(&thr, NULL, dht_handle_sdu, info)) {
- free(info);
- return;
- }
+ pthread_mutex_lock(&dht->mtx);
+
+ list_add(&cmd->next, &dht->cmds);
- pthread_detach(thr);
+ pthread_cond_signal(&dht->cond);
+
+ pthread_mutex_unlock(&dht->mtx);
}
void dht_destroy(struct dht * dht)
@@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht)
if (dht == NULL)
return;
+#ifndef __DHT_TEST__
+ tpm_stop(dht->tpm);
+
+ tpm_destroy(dht->tpm);
+#endif
if (dht_get_state(dht) == DHT_RUNNING) {
dht_set_state(dht, DHT_SHUTDOWN);
pthread_cancel(dht->worker);
@@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr)
list_head_init(&dht->requests);
list_head_init(&dht->refs);
list_head_init(&dht->lookups);
+ list_head_init(&dht->cmds);
if (pthread_rwlock_init(&dht->lock, NULL))
goto fail_rwlock;
@@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr)
dht->addr = addr;
dht->id = NULL;
#ifndef __DHT_TEST__
+ dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht);
+ if (dht->tpm == NULL)
+ goto fail_tpm_create;
+
+ if (tpm_start(dht->tpm))
+ goto fail_tpm_start;
+
dht->fd = dt_reg_ae(dht, &dht_post_sdu);
notifier_reg(handle_event, dht);
#else
(void) handle_event;
+ (void) dht_handle_sdu;
(void) dht_post_sdu;
#endif
dht->state = DHT_INIT;
return dht;
-
+#ifndef __DHT_TEST__
+ fail_tpm_start:
+ tpm_destroy(dht->tpm);
+ fail_tpm_create:
+ bmp_destroy(dht->cookies);
+#endif
fail_bmp:
pthread_cond_destroy(&dht->cond);
fail_cond: