diff options
author | Dimitri Staessens <[email protected]> | 2018-02-10 10:12:02 +0100 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2018-02-10 13:13:49 +0100 |
commit | 5d8519018bda6af03c116604a2922625b00f8e52 (patch) | |
tree | 36834e9ef8ed4bb1e3cd027ecb50424f2fe44cb0 /src/ipcpd | |
parent | 544ad6f9759de6acc109307caee2100478cba8ed (diff) | |
download | ouroboros-5d8519018bda6af03c116604a2922625b00f8e52.tar.gz ouroboros-5d8519018bda6af03c116604a2922625b00f8e52.zip |
ipcpd: Revise lookup tracking in DHT
The lookups now track the responses by cookie instead of just counting
the remaining number of responses. This is needed because simultaneous
lookups for the same hash interfere with eachother and lead to missed
responses.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/normal/dht.c | 136 | ||||
-rw-r--r-- | src/ipcpd/normal/fa.c | 6 |
2 files changed, 88 insertions, 54 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index 9296130a..d2ea9985 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -60,13 +60,13 @@ typedef KadContactMsg kad_contact_msg_t; #define KAD_K 8 /* Replication factor, MDHT value. */ #define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ #define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ -#define KAD_T_JOIN 6 /* Response time to wait for a join. */ -#define KAD_T_RESP 2 /* Response time to wait for a response. */ +#define KAD_T_JOIN 8 /* Response time to wait for a join. */ +#define KAD_T_RESP 5 /* Response time to wait for a response. */ #define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ #define KAD_QUEER 15 /* Time to declare peer questionable. */ #define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ #define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ -#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */ +#define KAD_JOIN_RETR 8 /* Number of retries sending a join. */ #define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */ #define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */ @@ -101,7 +101,6 @@ enum lookup_state { LU_PENDING, LU_UPDATE, LU_COMPLETE, - LU_DONE, LU_DESTROY }; @@ -120,16 +119,22 @@ struct kad_req { time_t t_exp; }; +struct cookie_el { + struct list_head next; + + uint32_t cookie; +}; + struct lookup { struct list_head next; + struct list_head cookies; + uint8_t * key; struct list_head contacts; size_t n_contacts; - size_t out; - uint64_t * addrs; size_t n_addrs; @@ -645,11 +650,11 @@ static struct lookup * lookup_create(struct dht * dht, goto fail_malloc; list_head_init(&lu->contacts); + list_head_init(&lu->cookies); lu->state = LU_INIT; lu->addrs = NULL; lu->n_addrs = 0; - lu->out = 0; lu->key = dht_dup_key(id, dht->b); if (lu->key == NULL) goto fail_id; @@ -688,16 +693,6 @@ static struct lookup * lookup_create(struct dht * dht, return NULL; } -static void lookup_add_out(struct lookup * lu, - size_t n) -{ - pthread_mutex_lock(&lu->lock); - - lu->out += n; - - pthread_mutex_unlock(&lu->lock); -} - static void cancel_lookup_destroy(void * o) { struct lookup * lu; @@ -717,6 +712,12 @@ static void cancel_lookup_destroy(void * o) contact_destroy(c); } + list_for_each_safe(p, h, &lu->cookies) { + struct cookie_el * c = list_entry(p, struct cookie_el, next); + list_del(&c->next); + free(c); + } + pthread_mutex_unlock(&lu->lock); pthread_mutex_destroy(&lu->lock); @@ -739,7 +740,6 @@ static void lookup_destroy(struct lookup * lu) pthread_cond_broadcast(&lu->cond); break; case LU_INIT: - case LU_DONE: case LU_UPDATE: case LU_COMPLETE: lu->state = LU_NULL; @@ -762,6 +762,7 @@ static void lookup_update(struct dht * dht, kad_msg_t * msg) { struct list_head * p = NULL; + struct list_head * h; struct contact * c = NULL; size_t n; size_t pos = 0; @@ -775,7 +776,19 @@ static void lookup_update(struct dht * dht, pthread_mutex_lock(&lu->lock); - --lu->out; + list_for_each_safe(p, h, &lu->cookies) { + struct cookie_el * e = list_entry(p, struct cookie_el, next); + if (e->cookie == msg->cookie) { + list_del(&e->next); + free(e); + break; + } + } + + if (lu->state == LU_COMPLETE) { + pthread_mutex_unlock(&lu->lock); + return; + } if (msg->n_addrs > 0) { if (lu->addrs == NULL) { @@ -784,6 +797,7 @@ static void lookup_update(struct dht * dht, lu->addrs[n] = msg->addrs[n]; lu->n_addrs = msg->n_addrs; } + lu->state = LU_COMPLETE; pthread_cond_broadcast(&lu->cond); pthread_mutex_unlock(&lu->lock); @@ -850,7 +864,7 @@ static void lookup_update(struct dht * dht, } } - if (lu->out == 0 && !mod) + if (list_is_empty(&lu->cookies) && !mod) lu->state = LU_COMPLETE; else lu->state = LU_UPDATE; @@ -929,11 +943,6 @@ static void lookup_new_addrs(struct lookup * lu, addrs[n] = 0; - if (n == 0) { - lu->state = LU_DONE; - pthread_cond_signal(&lu->cond); - } - pthread_mutex_unlock(&lu->lock); } @@ -969,10 +978,8 @@ static enum lookup_state lookup_wait(struct lookup * lu) pthread_mutex_lock(&lu->lock); - if (lu->state == LU_INIT) { + if (lu->state == LU_INIT || lu->state == LU_UPDATE) lu->state = LU_PENDING; - pthread_cond_signal(&lu->cond); - } pthread_cleanup_push(cleanup_wait, lu); @@ -1009,17 +1016,26 @@ static struct kad_req * dht_find_request(struct dht * dht, } static struct lookup * dht_find_lookup(struct dht * dht, - const uint8_t * key) + uint32_t cookie) { struct list_head * p; + struct list_head * p2; assert(dht); - assert(key); + assert(cookie > 0); list_for_each(p, &dht->lookups) { struct lookup * l = list_entry(p, struct lookup, next); - if (!memcmp(l->key, key, dht->b)) - return l; + pthread_mutex_lock(&l->lock); + list_for_each(p2, &l->cookies) { + struct cookie_el * e; + e = list_entry(p2, struct cookie_el, next); + if (e->cookie == cookie) { + pthread_mutex_unlock(&l->lock); + return l; + } + } + pthread_mutex_unlock(&l->lock); } return NULL; @@ -1479,7 +1495,7 @@ static int send_msg(struct dht * dht, if (msg->code < KAD_STORE && dht_get_state(dht) != DHT_SHUTDOWN) kad_req_create(dht, msg, addr); - return 0; + return msg->cookie; #ifndef __DHT_TEST__ fail_msg: pthread_rwlock_wrlock(&dht->lock); @@ -1592,14 +1608,14 @@ static int kad_store(struct dht * dht, msg.n_contacts = 1; msg.contacts = cmsgp; - if (send_msg(dht, &msg, r_addr)) + if (send_msg(dht, &msg, r_addr) < 0) return -1; return 0; } static ssize_t kad_find(struct dht * dht, - const uint8_t * key, + struct lookup * lu, const uint64_t * addrs, enum kad_code code) { @@ -1607,19 +1623,40 @@ static ssize_t kad_find(struct dht * dht, ssize_t sent = 0; assert(dht); - assert(key); + assert(lu->key); msg.code = code; msg.has_key = true; - msg.key.data = (uint8_t *) key; + msg.key.data = (uint8_t *) lu->key; msg.key.len = dht->b; while (*addrs != 0) { - if (*addrs != dht->addr) { - send_msg(dht, &msg, *addrs); - sent++; + struct cookie_el * c; + int ret; + + if (*addrs == dht->addr) { + ++addrs; + continue; } + + ret = send_msg(dht, &msg, *addrs); + if (ret < 0) + break; + + c = malloc(sizeof(*c)); + if (c == NULL) + break; + + c->cookie = (uint32_t) ret; + + pthread_mutex_lock(&lu->lock); + + list_add_tail(&c->next, &lu->cookies); + + pthread_mutex_unlock(&lu->lock); + + ++sent; ++addrs; } @@ -1643,7 +1680,6 @@ static struct lookup * kad_lookup(struct dht * dht, uint64_t addrs[KAD_ALPHA + 1]; enum lookup_state state; struct lookup * lu; - size_t out; lu = lookup_create(dht, id); if (lu == NULL) @@ -1657,14 +1693,11 @@ static struct lookup * kad_lookup(struct dht * dht, return NULL; } - out = kad_find(dht, id, addrs, code); - if (out == 0) { + if (kad_find(dht, lu, addrs, code) == 0) { lookup_detach(dht, lu); return lu; } - lookup_add_out(lu, out); - while ((state = lookup_wait(lu)) != LU_COMPLETE) { switch (state) { case LU_UPDATE: @@ -1672,10 +1705,8 @@ static struct lookup * kad_lookup(struct dht * dht, if (addrs[0] == 0) break; - out = kad_find(dht, id, addrs, code); - lookup_add_out(lu, out); + kad_find(dht, lu, addrs, code); break; - case LU_DONE: case LU_DESTROY: lookup_detach(dht, lu); lookup_set_state(lu, LU_NULL); @@ -1714,7 +1745,6 @@ static void kad_publish(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - addrs = malloc(k * sizeof(*addrs)); if (addrs == NULL) return; @@ -1768,7 +1798,7 @@ static int kad_join(struct dht * dht, pthread_rwlock_unlock(&dht->lock); - if (send_msg(dht, &msg, addr)) + if (send_msg(dht, &msg, addr) < 0) return -1; if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) @@ -2104,6 +2134,8 @@ static int kad_handle_join_resp(struct dht * dht, kad_req_respond(req); + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + pthread_rwlock_unlock(&dht->lock); log_dbg("Enrollment of DHT completed."); @@ -2123,7 +2155,7 @@ static int kad_handle_find_resp(struct dht * dht, pthread_rwlock_rdlock(&dht->lock); - lu = dht_find_lookup(dht, req->key); + lu = dht_find_lookup(dht, req->cookie); if (lu == NULL) { pthread_rwlock_unlock(&dht->lock); return -1; @@ -2515,7 +2547,7 @@ static void * dht_handle_sdu(void * o) pthread_rwlock_unlock(&dht->lock); } - if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr)) + if (msg->code < KAD_STORE && send_msg(dht, &resp_msg, addr) < 0) log_warn("Failed to send response."); kad_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 8d8b51ee..2b3f5c2a 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -229,10 +229,12 @@ int fa_alloc(int fd, uint64_t addr; struct shm_du_buff * sdb; - if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) + addr = dir_query(dst); + if (addr == 0) return -1; - addr = dir_query(dst); + if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + ipcp_dir_hash_len())) + return -1; msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REQ; |