diff options
author | dimitri staessens <[email protected]> | 2017-07-13 09:43:09 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-07-18 13:03:05 +0200 |
commit | 6e739b09bef860a4830328630ea07622bdd79d79 (patch) | |
tree | 205ea90bd2f59a0a707c7b4a14df2a54fd7b4a50 /src/ipcpd/normal/dht.c | |
parent | 0bcb3ab0804bbfd31d056c08548cb40591598f4b (diff) | |
download | ouroboros-6e739b09bef860a4830328630ea07622bdd79d79.tar.gz ouroboros-6e739b09bef860a4830328630ea07622bdd79d79.zip |
ipcpd: Add DHT as directory in normal IPCP
This implements a Distributed Hash Table (DHT) based on the Kademlia
protocol, with default parameters set as used in the BitTorrent
Mainline DHT. This initial implementation is almost feature complete,
except for some things to be done after a testing period: caching and
stale peer bumping, and setting the expiration timeout via the IRM
tool.
Diffstat (limited to 'src/ipcpd/normal/dht.c')
-rw-r--r-- | src/ipcpd/normal/dht.c | 2369 |
1 files changed, 2369 insertions, 0 deletions
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c new file mode 100644 index 00000000..0b00e2f5 --- /dev/null +++ b/src/ipcpd/normal/dht.c @@ -0,0 +1,2369 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#define OUROBOROS_PREFIX "dht" + +#include <ouroboros/config.h> +#include <ouroboros/hash.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/list.h> +#include <ouroboros/random.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> + +#include "dht.h" +#include "dt.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +#include "kademlia.pb-c.h" +typedef KadMsg kad_msg_t; +typedef KadContactMsg kad_contact_msg_t; + +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 6 /* Response time to wait for a join. */ +#define KAD_T_RESP 2 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ + +enum dht_state { + DHT_INIT = 0, + DHT_RUNNING, + DHT_SHUTDOWN, +}; + +enum kad_code { + KAD_JOIN = 0, + KAD_FIND_NODE, + KAD_FIND_VALUE, + /* Messages without a response below. */ + KAD_STORE, + KAD_RESPONSE +}; + +enum kad_req_state { + REQ_NULL = 0, + REQ_INIT, + REQ_PENDING, + REQ_RESPONSE, + REQ_DONE, + REQ_DESTROY +}; + +enum lookup_state { + LU_NULL = 0, + LU_INIT, + LU_PENDING, + LU_UPDATE, + LU_COMPLETE, + LU_DONE, + LU_DESTROY +}; + +struct kad_req { + struct list_head next; + + uint32_t cookie; + enum kad_code code; + uint8_t * key; + uint64_t addr; + + enum kad_req_state state; + pthread_cond_t cond; + pthread_mutex_t lock; + + time_t t_exp; +}; + +struct lookup { + struct list_head next; + + uint8_t * key; + + struct list_head contacts; + size_t n_contacts; + + uint64_t * addrs; + size_t n_addrs; + + enum lookup_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct val { + struct list_head next; + + uint64_t addr; + + time_t t_exp; + time_t t_rep; +}; + +struct ref_entry { + struct list_head next; + + uint8_t * key; + + time_t t_rep; +}; + +struct dht_entry { + struct list_head next; + + uint8_t * key; + size_t n_vals; + struct list_head vals; +}; + +struct contact { + struct list_head next; + + uint8_t * id; + uint64_t addr; + + size_t fails; + time_t t_seen; +}; + +struct bucket { + struct list_head contacts; + size_t n_contacts; + + struct list_head alts; + size_t n_alts; + + time_t t_refr; + + size_t depth; + uint8_t mask; + + struct bucket * parent; + struct bucket * children[1L << KAD_BETA]; +}; + +struct dht { + size_t alpha; + size_t b; + size_t k; + + time_t t_expire; + time_t t_refresh; + time_t t_replic; + time_t t_repub; + + uint8_t * id; + uint64_t addr; + + struct bucket * buckets; + + struct list_head entries; + + struct list_head refs; + + struct list_head lookups; + + struct list_head requests; + struct bmp * cookies; + + enum dht_state state; + pthread_mutex_t mtx; + + pthread_rwlock_t lock; + + int fd; + + pthread_t worker; +}; + +static uint8_t * dht_dup_key(const uint8_t * key, + size_t len) +{ + uint8_t * dup; + + dup = malloc(sizeof(*dup) * len); + if (dup == NULL) + return NULL; + + memcpy(dup, key, len); + + return dup; +} + +static enum dht_state dht_get_state(struct dht * dht) +{ + enum dht_state state; + + pthread_mutex_lock(&dht->mtx); + + state = dht->state; + + pthread_mutex_unlock(&dht->mtx); + + return state; +} + +static void dht_set_state(struct dht * dht, + enum dht_state state) +{ + pthread_mutex_lock(&dht->mtx); + + dht->state = state; + + pthread_mutex_unlock(&dht->mtx); +} + +static uint8_t * create_id(size_t len) +{ + uint8_t * id; + + id = malloc(len); + if (id == NULL) + return NULL; + + if (random_buffer(id, len) < 0) { + free(id); + return NULL; + } + + return id; +} + +static struct kad_req * kad_req_create(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) +{ + struct kad_req * req; + pthread_condattr_t cattr; + struct timespec t; + + req = malloc(sizeof(*req)); + if (req == NULL) + return NULL; + + list_head_init(&req->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + req->t_exp = t.tv_sec + KAD_T_RESP; + req->addr = addr; + req->state = REQ_INIT; + req->cookie = msg->cookie; + req->code = msg->code; + req->key = NULL; + + if (msg->has_key) { + req->key = dht_dup_key(msg->key.data, dht->b); + if (req->key == NULL) { + free(req); + return NULL; + } + } + + if (pthread_mutex_init(&req->lock, NULL)) { + free(req->key); + free(req); + return NULL; + } + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&req->cond, &cattr)) { + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&req->lock); + free(req->key); + free(req); + return NULL; + } + + pthread_condattr_destroy(&cattr); + + return req; +} + +static void kad_req_destroy(struct kad_req * req) +{ + assert(req); + + if (req->key != NULL) + free(req->key); + + pthread_mutex_lock(&req->lock); + + switch (req->state) { + case REQ_DESTROY: + pthread_mutex_unlock(&req->lock); + return; + case REQ_PENDING: + req->state = REQ_DESTROY; + pthread_cond_signal(&req->cond); + break; + case REQ_INIT: + case REQ_DONE: + req->state = REQ_NULL; + break; + case REQ_RESPONSE: + case REQ_NULL: + default: + break; + } + + while (req->state != REQ_NULL) + pthread_cond_wait(&req->cond, &req->lock); + + pthread_mutex_unlock(&req->lock); + + pthread_cond_destroy(&req->cond); + pthread_mutex_destroy(&req->lock); + + free(req); +} + +static int kad_req_wait(struct kad_req * req, + time_t t) +{ + struct timespec timeo = {t, 0}; + struct timespec abs; + int ret = 0; + + assert(req); + + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&abs, &timeo, &abs); + + pthread_mutex_lock(&req->lock); + + req->state = REQ_PENDING; + + while (req->state == REQ_PENDING && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + + switch(req->state) { + case REQ_DESTROY: + ret = -1; + req->state = REQ_NULL; + pthread_cond_signal(&req->cond); + break; + case REQ_PENDING: /* ETIMEDOUT */ + case REQ_RESPONSE: + req->state = REQ_DONE; + pthread_cond_signal(&req->cond); + break; + default: + break; + } + + pthread_mutex_unlock(&req->lock); + + return ret; +} + +static void kad_req_respond(struct kad_req * req) +{ + pthread_mutex_lock(&req->lock); + + req->state = REQ_RESPONSE; + pthread_cond_signal(&req->cond); + + pthread_mutex_unlock(&req->lock); +} + +static struct contact * contact_create(const uint8_t * id, + size_t len, + uint64_t addr) +{ + struct contact * c; + struct timespec t; + + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; + + list_head_init(&c->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id, len); + if (c->id == NULL) { + free(c); + return NULL; + } + + return c; +} + +static void contact_destroy(struct contact * c) +{ + if (c != NULL) + free(c->id); + + free(c); +} + +static struct bucket * iter_bucket(struct bucket * b, + const uint8_t * id) +{ + uint8_t byte; + uint8_t mask; + + assert(b); + + if (b->children[0] == NULL) + return b; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + + return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * dht_get_bucket(struct dht * dht, + const uint8_t * id) +{ + assert(dht->buckets); + + return iter_bucket(dht->buckets, id); +} + +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, + const uint8_t * dst) +{ + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} + +static size_t list_add_sorted(struct list_head * l, + struct contact * c, + const uint8_t * key) +{ + struct list_head * p; + + assert(l); + assert(c); + assert(key); + assert(c->id); + + list_for_each(p, l) { + struct contact * e = list_entry(p, struct contact, next); + if (dist(c->id, key) > dist(e->id, key)) + break; + } + + list_add_tail(&c->next, p); + + return 1; +} + +static size_t dht_contact_list(struct dht * dht, + struct list_head * l, + const uint8_t * key) +{ + struct list_head * p; + struct bucket * b; + size_t len = 0; + size_t i; + struct timespec t; + + assert(l); + assert(dht); + assert(key); + assert(list_is_empty(l)); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + b = dht_get_bucket(dht, key); + if (b == NULL) + return 0; + + b->t_refr = t.tv_sec + KAD_T_REFR; + + if (b->n_contacts == dht->k || b->parent == NULL) { + list_for_each(p, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (list_add_sorted(l, c, key) == 1) + if (++len > dht->k) + break; + } + } else { + struct bucket * d = b->parent; + for (i = 0; i < (1L << KAD_BETA); ++i) { + list_for_each(p, &d->children[i]->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (c == NULL) + continue; + if (list_add_sorted(l, c, key) == 1) + if (++len > dht->k) + break; + } + } + } + + assert(len == dht->k || b->parent == NULL); + + return len; +} + +static struct lookup * lookup_create(struct dht * dht, + const uint8_t * id) +{ + struct lookup * lu; + pthread_condattr_t cattr; + + assert(dht); + assert(id); + + lu = malloc(sizeof(*lu)); + if (lu == NULL) + goto fail_malloc; + + list_head_init(&lu->contacts); + + lu->state = LU_INIT; + lu->addrs = NULL; + lu->n_addrs = 0; + lu->key = dht_dup_key(id, dht->b); + if (lu->key == NULL) + goto fail_id; + + if (pthread_mutex_init(&lu->lock, NULL)) + goto fail_mutex; + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&lu->cond, &cattr)) + goto fail_cond; + + pthread_condattr_destroy(&cattr); + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&lu->next, &dht->lookups); + + lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + + pthread_rwlock_unlock(&dht->lock); + + return lu; + + fail_cond: + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&lu->lock); + fail_mutex: + free(lu->key); + fail_id: + free(lu); + fail_malloc: + return NULL; +} + +static void lookup_destroy(struct lookup * lu) +{ + struct list_head * p; + struct list_head * h; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + switch (lu->state) { + case LU_DESTROY: + pthread_mutex_unlock(&lu->lock); + return; + case LU_PENDING: + lu->state = LU_DESTROY; + pthread_cond_signal(&lu->cond); + break; + case LU_INIT: + case LU_DONE: + case LU_UPDATE: + case LU_COMPLETE: + lu->state = REQ_NULL; + break; + case LU_NULL: + default: + break; + } + + while (lu->state != LU_NULL) + pthread_cond_wait(&lu->cond, &lu->lock); + + if (lu->key != NULL) + free(lu->key); + if (lu->addrs != NULL) + free(lu->addrs); + + list_for_each_safe(p, h, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } + + pthread_mutex_unlock(&lu->lock); + + pthread_cond_destroy(&lu->cond); + pthread_mutex_destroy(&lu->lock); + + free(lu); +} + +static void lookup_update(struct dht * dht, + struct lookup * lu, + kad_msg_t * msg) +{ + struct list_head * p = NULL; + struct contact * c = NULL; + size_t n; + size_t pos = 0; + + assert(lu); + assert(msg); + + if (dht_get_state(dht) != DHT_RUNNING) + return; + + pthread_mutex_lock(&lu->lock); + + if (msg->n_addrs > 0) { + if (lu->addrs == NULL) { + lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); + for (n = 0; n < msg->n_addrs; ++n) + lu->addrs[n] = msg->addrs[n]; + lu->n_addrs = msg->n_addrs; + } + lu->state = LU_COMPLETE; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; + } + + while (lu->state == LU_INIT) + pthread_cond_wait(&lu->cond, &lu->lock); + + for (n = 0; n < msg->n_contacts; ++n) { + c = contact_create(msg->contacts[n]->id.data, + dht->b, msg->contacts[n]->addr); + if (c == NULL) + continue; + + list_for_each(p, &lu->contacts) { + struct contact * e; + e = list_entry(p, struct contact, next); + if (!memcmp(e->id, c->id, dht->b)) { + contact_destroy(c); + goto finish_node; + } + + if (dist(c->id, lu->key) > dist(e->id, lu->key)) + break; + pos++; + } + + } + + if (pos == dht->k) { + contact_destroy(c); + goto finish_node; + } else { + struct contact * d; + d = list_last_entry(&lu->contacts, struct contact, next); + list_del(&d->next); + list_add_tail(&c->next, p); + contact_destroy(d); + } + + finish_node: + lu->state = LU_UPDATE; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; +} + +static ssize_t lookup_get_addrs(struct lookup * lu, + uint64_t * addrs) +{ + ssize_t n; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + for (n = 0; (size_t) n < lu->n_addrs; ++n) + addrs[n] = lu->addrs[n]; + + assert((size_t) n == lu->n_addrs); + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static ssize_t lookup_contact_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + ssize_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + addrs[n] = c->addr; + n++; + } + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static ssize_t lookup_new_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + ssize_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + /* Uses fails to check if the contact has been contacted. */ + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (c->fails == 0) { + c->fails = 1; + addrs[n] = c->addr; + n++; + } + + if (n == KAD_ALPHA) + break; + } + + if (n == 0) + lu->state = LU_DONE; + + pthread_mutex_unlock(&lu->lock); + + assert(n <= KAD_ALPHA); + + return n; +} + +static enum lookup_state lookup_wait(struct lookup * lu) +{ + enum lookup_state state; + + pthread_mutex_lock(&lu->lock); + + lu->state = LU_PENDING; + pthread_cond_signal(&lu->cond); + + pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); + + while (lu->state == LU_PENDING) + pthread_cond_wait(&lu->cond, &lu->lock); + + pthread_cleanup_pop(false); + + if (lu->state == LU_DESTROY) { + lu->state = LU_NULL; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return -1; + } + + state = lu->state; + + pthread_mutex_unlock(&lu->lock); + + return state; +} + +static struct kad_req * dht_find_request(struct dht * dht, + kad_msg_t * msg) +{ + struct list_head * p; + + assert(dht); + assert(msg); + + list_for_each(p, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + if (r->cookie == msg->cookie) + return r; + } + + return NULL; +} + +static struct lookup * dht_find_lookup(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + assert(dht); + assert(key); + + list_for_each(p, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + if (!memcmp(l->key, key, dht->b)) + return l; + } + + return NULL; +} + +static struct val * val_create(uint64_t addr, + time_t exp) +{ + struct val * v; + struct timespec t; + + v = malloc(sizeof(*v)); + if (v == NULL) + return NULL; + + list_head_init(&v->next); + v->addr = addr; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + + return v; +} + +static void val_destroy(struct val * v) +{ + assert(v); + + free(v); +} + +static struct ref_entry * ref_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + struct timespec t; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + e->t_rep = t.tv_sec + dht->t_repub; + + return e; +} + +static void ref_entry_destroy(struct ref_entry * e) +{ + free(e->key); + free(e); +} + +static struct dht_entry * dht_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + list_head_init(&e->next); + list_head_init(&e->vals); + + e->n_vals = 0; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + return e; +} + +static void dht_entry_destroy(struct dht_entry * e) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + list_del(&v->next); + val_destroy(v); + } + + free(e->key); + + free(e); +} + +static int dht_entry_add_addr(struct dht_entry * e, + uint64_t addr, + time_t exp) +{ + struct list_head * p; + struct val * val; + struct timespec t; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + if (v->t_exp < t.tv_sec + exp) { + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + } + + return 0; + } + } + + val = val_create(addr, exp); + if (val == NULL) + return -ENOMEM; + + list_add(&val->next, &e->vals); + ++e->n_vals; + + return 0; +} + + +static void dht_entry_del_addr(struct dht_entry * e, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + list_del(&v->next); + val_destroy(v); + --e->n_vals; + } + } + + if (e->n_vals == 0) { + list_del(&e->next); + dht_entry_destroy(e); + } +} + +static uint64_t dht_entry_get_addr(struct dht * dht, + struct dht_entry * e) +{ + struct list_head * p; + + assert(e); + assert(!list_is_empty(&e->vals)); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr != dht->addr) + return v->addr; + } + + return 0; +} + +/* Forward declaration. */ +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * key, + enum kad_code code); + + +/* Build a refresh list. */ +static void bucket_refresh(struct dht * dht, + struct bucket * b, + time_t t, + struct list_head * r) +{ + size_t i; + + if (*b->children != NULL) + for (i = 0; i < (1L << KAD_BETA); ++i) + bucket_refresh(dht, b->children[i], t, r); + + if (b->n_contacts == 0) + return; + + if (t > b->t_refr) { + struct contact * c; + struct contact * d; + c = list_first_entry(&b->contacts, struct contact, next); + d = contact_create(c->id, dht->b, c->addr); + if (c != NULL) + list_add(&d->next, r); + return; + } +} + + +static struct bucket * bucket_create(void) +{ + struct bucket * b; + struct timespec t; + size_t i; + + b = malloc(sizeof(*b)); + if (b == NULL) + return NULL; + + list_head_init(&b->contacts); + b->n_contacts = 0; + + list_head_init(&b->alts); + b->n_alts = 0; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + b->t_refr = t.tv_sec + KAD_T_REFR; + + for (i = 0; i < (1L << KAD_BETA); ++i) + b->children[i] = NULL; + + b->parent = NULL; + b->depth = 0; + + return b; +} + +static void bucket_destroy(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + size_t i; + + assert(b); + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i] != NULL) + bucket_destroy(b->children[i]); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + list_for_each_safe(p, h, &b->alts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + free(b); +} + +static bool bucket_has_id(struct bucket * b, + const uint8_t * id) +{ + uint8_t mask; + uint8_t byte; + + if (b->depth == 0) + return true; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + + return ((byte & mask) == b->mask); +} + +static int split_bucket(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + uint8_t mask = 0; + size_t i; + size_t c; + + assert(b); + assert(b->n_alts == 0); + assert(b->n_contacts); + assert(b->children[0] == NULL); + + c = b->n_contacts; + + for (i = 0; i < (1L << KAD_BETA); ++i) { + b->children[i] = bucket_create(); + if (b->children[i] == NULL) { + size_t j; + for (j = 0; j < i; ++j) + bucket_destroy(b->children[j]); + return -1; + } + + b->children[i]->depth = b->depth + 1; + b->children[i]->mask = mask; + b->children[i]->parent = b; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + if (bucket_has_id(b->children[i], c->id)) { + list_del(&c->next); + --b->n_contacts; + list_add(&c->next, &b->children[i]->contacts); + ++b->children[i]->n_contacts; + } + } + + mask++; + } + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i]->n_contacts == c) + split_bucket(b->children[i]); + + return 0; +} + +/* Locked externally to mandate update as (final) part of join transaction. */ +static int dht_update_bucket(struct dht * dht, + const uint8_t * id, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + struct contact * c; + + assert(dht); + + b = dht_get_bucket(dht, id); + if (b == NULL) + return -1; + + c = contact_create(id, dht->b, addr); + if (c == NULL) + return -1; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * d = list_entry(p, struct contact, next); + if (d->addr == addr) { + list_del(&d->next); + contact_destroy(d); + --b->n_contacts; + } + } + + if (b->n_contacts == dht->k) { + if (bucket_has_id(b, dht->id)) { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + if (split_bucket(b)) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + } else if (b->n_alts == dht->k) { + struct contact * d; + d = list_first_entry(&b->alts, struct contact, next); + list_del(&d->next); + contact_destroy(d); + list_add_tail(&c->next, &b->alts); + } else { + list_add_tail(&c->next, &b->alts); + ++b->n_alts; + } + } else { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + } + + return 0; +} + +static int send_msg(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) +{ + struct shm_du_buff * sdb; + struct kad_req * req; + size_t len; + + pthread_rwlock_wrlock(&dht->lock); + + if (dht->id != NULL) { + msg->has_s_id = true; + msg->s_id.data = dht->id; + msg->s_id.len = dht->b; + } + + msg->s_addr = dht->addr; + + if (msg->code < KAD_STORE) { + msg->cookie = bmp_allocate(dht->cookies); + if (!bmp_is_id_valid(dht->cookies, msg->cookie)) + goto fail_bmp_alloc; + } + + len = kad_msg__get_packed_size(msg); + if (len == 0) + goto fail_msg; + + if (ipcp_sdb_reserve(&sdb, len)) + goto fail_msg; + + kad_msg__pack(msg, shm_du_buff_head(sdb)); + +#ifndef __DHT_TEST__ + if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb)) + goto fail_write; +#else + (void) addr; + ipcp_sdb_release(sdb); +#endif /* __DHT_TEST__ */ + + if (msg->code < KAD_STORE) { + req = kad_req_create(dht, msg, addr); + if (req != NULL) + list_add(&req->next, &dht->requests); + } + + pthread_rwlock_unlock(&dht->lock); + + return 0; + +#ifndef __DHT_TEST__ + fail_write: + ipcp_sdb_release(sdb); +#endif + fail_msg: + bmp_release(dht->cookies, msg->cookie); + fail_bmp_alloc: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +static struct dht_entry * dht_find_entry(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + list_for_each(p, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht->b)) + return e; + } + + return NULL; +} + +static int kad_add(struct dht * dht, + const kad_contact_msg_t * contacts, + ssize_t n, + time_t exp) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + while (--n >= 0) { + if (contacts[n].id.len != dht->b) + log_warn("Bad key length in contact data."); + + e = dht_find_entry(dht, contacts[n].id.data); + if (e != NULL) { + if (dht_entry_add_addr(e, contacts[n].addr, exp)) + goto fail; + } else { + e = dht_entry_create(dht, contacts[n].id.data); + if (e == NULL) + goto fail; + + if (dht_entry_add_addr(e, contacts[n].addr, exp)) { + dht_entry_destroy(e); + goto fail; + } + + list_add(&e->next, &dht->entries); + } + } + + pthread_rwlock_unlock(&dht->lock); + return 0; + + fail: + pthread_rwlock_unlock(&dht->lock); + return -ENOMEM; +} + +static int wait_resp(struct dht * dht, + kad_msg_t * msg, + time_t timeo) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + pthread_rwlock_unlock(&dht->lock); + + return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht * dht, + const uint8_t * key, + uint64_t addr, + uint64_t r_addr, + time_t ttl) +{ + kad_msg_t msg = KAD_MSG__INIT; + kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; + kad_contact_msg_t * cmsgp[1]; + + cmsg.id.data = (uint8_t *) key; + cmsg.id.len = dht->b; + cmsg.addr = addr; + + cmsgp[0] = &cmsg; + + msg.code = KAD_STORE; + msg.has_t_expire = true; + msg.t_expire = ttl; + msg.n_contacts = 1; + msg.contacts = cmsgp; + + if (send_msg(dht, &msg, r_addr)) + return -1; + + return 0; +} + +static ssize_t kad_find(struct dht * dht, + const uint8_t * key, + const uint64_t * addrs, + enum kad_code code) +{ + kad_msg_t msg = KAD_MSG__INIT; + ssize_t sent = 0; + + assert(dht); + assert(key); + + msg.code = code; + + msg.has_key = true; + msg.key.data = (uint8_t *) key; + msg.key.len = dht->b; + + while (*addrs != 0) { + if (*addrs != dht->addr) { + send_msg(dht, &msg, *addrs); + sent++; + } + ++addrs; + } + + return sent; +} + +static void lookup_set_state(struct lookup * lu, + enum lookup_state state) +{ + pthread_mutex_lock(&lu->lock); + + lu->state = state; + + pthread_mutex_unlock(&lu->lock); +} + +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * id, + enum kad_code code) +{ + uint64_t addrs[KAD_ALPHA + 1]; + enum lookup_state state; + struct lookup * lu; + + lu = lookup_create(dht, id); + if (lu == NULL) + return NULL; + + addrs[lookup_new_addrs(lu, addrs)] = 0; + + if (addrs[0] == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lookup_destroy(lu); + return NULL; + } + + if (kad_find(dht, id, addrs, code) == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lu->state = LU_COMPLETE; + return lu; + } + + while ((state = lookup_wait(lu)) != LU_COMPLETE) { + switch (state) { + case LU_UPDATE: + addrs[lookup_new_addrs(lu, addrs)] = 0; + if (addrs[0] == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + return lu; + } + + kad_find(dht, id, addrs, code); + break; + case LU_DESTROY: + lookup_set_state(lu, LU_NULL); + return NULL; + default: + break; + }; + } + + assert(state = LU_COMPLETE); + + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + + return lu; +} + +static void kad_publish(struct dht * dht, + const uint8_t * key, + uint64_t addr, + time_t exp) +{ + struct lookup * lu; + uint64_t addrs[KAD_K]; + ssize_t n; + + assert(dht); + assert(key); + + lu = kad_lookup(dht, key, KAD_FIND_NODE); + if (lu == NULL) + return; + + n = lookup_contact_addrs(lu, addrs); + + while (--n > 0) { + if (addrs[n] == dht->addr) { + kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; + msg.id.data = (uint8_t *) key; + msg.id.len = dht->b; + msg.addr = addr; + kad_add(dht, &msg, 1, exp); + } else { + if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) + log_warn("Failed to send store message."); + } + } + + lookup_destroy(lu); +} + +static int kad_join(struct dht * dht, + uint64_t addr) +{ + kad_msg_t msg = KAD_MSG__INIT; + struct lookup * lu; + + msg.code = KAD_JOIN; + + msg.has_alpha = true; + msg.has_b = true; + msg.has_k = true; + msg.has_t_refresh = true; + msg.has_t_replicate = true; + msg.alpha = KAD_ALPHA; + msg.b = dht->b; + msg.k = KAD_K; + msg.t_refresh = KAD_T_REFR; + msg.t_replicate = KAD_T_REPL; + + if (send_msg(dht, &msg, addr)) + return -1; + + if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) + return -1; + + dht->id = create_id(dht->b); + if (dht->id == NULL) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + if (lu == NULL) + log_warn("Join response not yet added."); + else + lookup_destroy(lu); + + return 0; +} + +static void dht_dead_peer(struct dht * dht, + uint8_t * key, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + + b = dht_get_bucket(dht, key); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (b->n_contacts + b->n_alts <= dht->k) { + ++c->fails; + return; + } + + if (c->addr == addr) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + break; + } + } + + while (b->n_contacts < dht->k && b->n_alts > 0) { + struct contact * c; + c = list_first_entry(&b->alts, struct contact, next); + list_del(&c->next); + --b->n_alts; + list_add(&c->next, &b->contacts); + ++b->n_contacts; + } +} + +static int dht_del(struct dht * dht, + const uint8_t * key, + uint64_t addr) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + dht_entry_del_addr(e, addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static buffer_t dht_retrieve(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct list_head * p; + buffer_t buf; + uint64_t * pos; + + buf.len = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return buf; + } + + buf.data = malloc(sizeof(dht->addr) * e->n_vals); + if (buf.data == NULL) { + pthread_rwlock_unlock(&dht->lock); + return buf; + } + + buf.len = e->n_vals; + + pos = (uint64_t *) buf.data;; + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + *pos++ = v->addr; + } + + pthread_rwlock_unlock(&dht->lock); + + return buf; +} + +static ssize_t dht_get_contacts(struct dht * dht, + const uint8_t * key, + kad_contact_msg_t *** msgs) +{ + struct list_head l; + struct list_head * p; + struct list_head * h; + size_t len; + size_t i = 0; + + list_head_init(&l); + + pthread_rwlock_rdlock(&dht->lock); + + len = dht_contact_list(dht, &l, key); + if (len == 0) + return 0; + + *msgs = malloc(len * sizeof(**msgs)); + if (*msgs == NULL) + return 0; + + list_for_each_safe(p, h, &l) { + struct contact * c = list_entry(p, struct contact, next); + (*msgs)[i] = malloc(sizeof(***msgs)); + if ((*msgs)[i] == NULL) { + pthread_rwlock_unlock(&dht->lock); + while (i > 0) + free(*msgs[--i]); + free(*msgs); + return 0; + } + + kad_contact_msg__init((*msgs)[i]); + + (*msgs)[i]->id.data = c->id; + (*msgs)[i]->id.len = dht->b; + (*msgs)[i++]->addr = c->addr; + list_del(&c->next); + free(c); + } + + pthread_rwlock_unlock(&dht->lock); + + return i; +} + +static time_t gcd(time_t a, + time_t b) +{ + if (a == 0) + return b; + + return gcd(b % a, a); +} + +static void * work(void * o) +{ + struct dht * dht; + struct timespec now; + struct list_head * p; + struct list_head * h; + struct list_head reflist; + time_t intv; + struct lookup * lu; + + dht = (struct dht *) o; + + intv = gcd(dht->t_expire, dht->t_repub); + intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + + list_head_init(&reflist); + + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht->lock); + + /* Republish registered hashes. */ + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e; + e = list_entry(p, struct ref_entry, next); + if (now.tv_sec > e->t_rep) { + kad_publish(dht, e->key, dht->addr, + dht->t_expire); + e->t_rep = now.tv_sec + dht->t_repub; + } + } + + /* Remove stale entries and republish if necessary. */ + list_for_each_safe(p, h, &dht->entries) { + struct list_head * p1; + struct list_head * h1; + struct dht_entry * e; + e = list_entry (p, struct dht_entry, next); + list_for_each_safe(p1, h1, &e->vals) { + struct val * v; + v = list_entry(p1, struct val, next); + if (now.tv_sec > v->t_exp) { + list_del(&v->next); + val_destroy(v); + } + + if (now.tv_sec > v->t_rep) { + kad_publish(dht, e->key, v->addr, + dht->t_expire - now.tv_sec); + v->t_rep = now.tv_sec + dht->t_replic; + } + } + } + + /* Check the requests list for unresponsive nodes. */ + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r; + r = list_entry(p, struct kad_req, next); + if (now.tv_sec > r->t_exp) { + list_del(&r->next); + bmp_release(dht->cookies, r->cookie); + dht_dead_peer(dht, r->key, r->addr); + kad_req_destroy(r); + } + } + + /* Refresh unaccessed buckets. */ + bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + + pthread_rwlock_unlock(&dht->lock); + + list_for_each_safe(p, h, &reflist) { + struct contact * c; + c = list_entry(p, struct contact, next); + lu = kad_lookup(dht, c->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + list_del(&c->next); + contact_destroy(c); + } + + sleep(intv); + } + + return (void *) 0; +} + +static int kad_handle_join_resp(struct dht * dht, + struct kad_req * req, + kad_msg_t * msg) +{ + assert(dht); + assert(req); + assert(msg); + + /* We might send version numbers later to warn of updates if needed. */ + if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && + msg->has_t_refresh && msg->has_t_replicate)) { + log_warn("Join refused by remote."); + return -1; + } + + if (msg->b < sizeof(uint64_t)) { + log_err("Hash sizes less than 8 bytes unsupported."); + return -1; + } + + pthread_rwlock_wrlock(&dht->lock); + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + /* Likely corrupt packet. The member will refuse, we might here too. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) + log_warn("Different kademlia parameters detected."); + + if (msg->t_replicate != KAD_T_REPL) + log_warn("Different kademlia replication time detected."); + + if (msg->t_refresh != KAD_T_REFR) + log_warn("Different kademlia refresh time detected."); + + dht->k = msg->k; + dht->b = msg->b; + dht->t_expire = msg->t_expire; + dht->t_repub = MAX(1, dht->t_expire - 10); + + if (pthread_create(&dht->worker, NULL, work, dht)) { + bucket_destroy(dht->buckets); + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + dht->state = DHT_RUNNING; + + kad_req_respond(req); + + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + + pthread_rwlock_unlock(&dht->lock); + + log_dbg("Enrollment of DHT completed."); + + return 0; +} + +static int kad_handle_find_resp(struct dht * dht, + struct kad_req * req, + kad_msg_t * msg) +{ + struct lookup * lu; + + assert(dht); + assert(req); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + lu = dht_find_lookup(dht, req->key); + if (lu == NULL) { + log_dbg("Response for unknown lookup."); + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + lookup_update(dht, lu, msg); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static void kad_handle_response(struct dht * dht, + kad_msg_t * msg) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_wrlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return; + } + + bmp_release(dht->cookies, req->cookie); + list_del(&req->next); + + pthread_rwlock_unlock(&dht->lock); + + switch(req->code) { + case KAD_JOIN: + if (kad_handle_join_resp(dht, req, msg)) + log_err("Enrollment of DHT failed."); + break; + case KAD_FIND_VALUE: + case KAD_FIND_NODE: + if (dht_get_state(dht) != DHT_RUNNING) + return; + if (kad_handle_find_resp(dht, req, msg)) + log_dbg("Invalid or outdated response."); + break; + default: + break; + } + + kad_req_destroy(req); +} + +int dht_bootstrap(struct dht * dht, + size_t b, + time_t t_expire) +{ + assert(dht); + + pthread_rwlock_wrlock(&dht->lock); + + dht->id = create_id(b); + if (dht->id == NULL) + goto fail_id; + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) + goto fail_buckets; + + dht->buckets->depth = 0; + dht->buckets->mask = 0; + + dht->b = b / CHAR_BIT; + dht->t_expire = MAX(2, t_expire); + dht->t_repub = MAX(1, t_expire - 10); + dht->k = KAD_K; + + if (pthread_create(&dht->worker, NULL, work, dht)) + goto fail_pthread_create; + + dht->state = DHT_RUNNING; + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; + + fail_pthread_create: + bucket_destroy(dht->buckets); + dht->buckets = NULL; + fail_buckets: + free(dht->id); + dht->id = NULL; + fail_id: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +int dht_enroll(struct dht * dht, + uint64_t addr) +{ + assert(dht); + + return kad_join(dht, addr); +} + +int dht_reg(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + + assert(dht); + assert(key); + assert(dht->addr != 0); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + e = ref_entry_create(dht, key); + if (e == NULL) + return -ENOMEM; + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&e->next, &dht->refs); + + pthread_rwlock_unlock(&dht->lock); + + kad_publish(dht, key, dht->addr, dht->t_expire); + + return 0; +} + +int dht_unreg(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + struct list_head * h; + + assert(dht); + assert(key); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * r = list_entry(p, struct ref_entry, next); + if (!memcmp(key, r->key, dht-> b) ) { + list_del(&r->next); + ref_entry_destroy(r); + } + } + + dht_del(dht, key, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +uint64_t dht_query(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct lookup * lu; + uint64_t addrs[KAD_K]; + size_t n; + + addrs[0] = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e != NULL) + addrs[0] = dht_entry_get_addr(dht, e); + + pthread_rwlock_unlock(&dht->lock); + + if (addrs[0] != 0 && addrs[0] != dht->addr) + return addrs[0]; + + lu = kad_lookup(dht, key, KAD_FIND_VALUE); + if (lu == NULL) + return 0; + + n = lookup_get_addrs(lu, addrs); + if (n == 0) { + lookup_destroy(lu); + return 0; + } + + lookup_destroy(lu); + + /* Current behaviour is anycast and return the first peer address. */ + if (addrs[0] != dht->addr) + return addrs[0]; + + if (n > 1) + return addrs[1]; + + return 0; +} + +void dht_post_sdu(void * ae, + struct shm_du_buff * sdb) +{ + struct dht * dht; + kad_msg_t * msg; + kad_contact_msg_t ** cmsgs; + kad_msg_t resp_msg = KAD_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + + assert(ae); + assert(sdb); + + memset(&buf, 0, sizeof(buf)); + + dht = (struct dht *) ae; + + msg = kad_msg__unpack(NULL, + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + shm_du_buff_head(sdb)); + + ipcp_sdb_release(sdb); + + if (msg == NULL) { + log_err("Failed to unpack message."); + return; + } + + if (msg->has_key && msg->key.len != dht->b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + return; + } + + if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", msg->code); + return; + } + + if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { + kad_msg__free_unpacked(msg, NULL); + return; + } + + addr = msg->s_addr; + + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; + + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } + + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); + + break; + } + + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = dht->b; + resp_msg.k = KAD_K; + resp_msg.t_expire = dht->t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; + break; + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } + + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); + break; + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); + break; + } + + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); + } + + if (msg->code < KAD_STORE) + send_msg(dht, &resp_msg, addr); + + kad_msg__free_unpacked(msg, NULL); + + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); + + if (resp_msg.n_contacts == 0) + return; + + for (i = 0; i < resp_msg.n_contacts; ++i) + kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); + free(resp_msg.contacts); +} + +void dht_destroy(struct dht * dht) +{ + struct list_head * p; + struct list_head * h; + + if (dht == NULL) + return; + + if (dht_get_state(dht) == DHT_RUNNING) + dht_set_state(dht, DHT_SHUTDOWN); + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + list_del(&r->next); + free(r); + } + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e = list_entry(p, struct ref_entry, next); + list_del(&e->next); + ref_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + list_del(&l->next); + lookup_destroy(l); + } + + pthread_rwlock_unlock(&dht->lock); + + if (dht_get_state(dht) == DHT_SHUTDOWN) { + pthread_cancel(dht->worker); + pthread_join(dht->worker, NULL); + } + + if (dht->buckets != NULL) + bucket_destroy(dht->buckets); + + bmp_destroy(dht->cookies); + + pthread_mutex_destroy(&dht->mtx); + + pthread_rwlock_destroy(&dht->lock); + + free(dht->id); + + free(dht); +} + +struct dht * dht_create(uint64_t addr) +{ + struct dht * dht; + + dht = malloc(sizeof(*dht)); + if (dht == NULL) + goto fail_malloc; + + dht->buckets = NULL; + + list_head_init(&dht->entries); + list_head_init(&dht->requests); + list_head_init(&dht->refs); + list_head_init(&dht->lookups); + + if (pthread_rwlock_init(&dht->lock, NULL)) + goto fail_rwlock; + + if (pthread_mutex_init(&dht->mtx, NULL)) + goto fail_mutex; + + dht->cookies = bmp_create(DHT_MAX_REQS, 1); + if (dht->cookies == NULL) + goto fail_bmp; + + dht->b = 0; + dht->addr = addr; + dht->id = NULL; +#ifndef __DHT_TEST__ + dht->fd = dt_reg_ae(dht, &dht_post_sdu); +#endif /* __DHT_TEST__ */ + + dht->state = DHT_INIT; + + return dht; + + fail_bmp: + pthread_mutex_destroy(&dht->mtx); + fail_mutex: + pthread_rwlock_destroy(&dht->lock); + fail_rwlock: + free(dht); + fail_malloc: + return NULL; +} |