summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-09-24 14:34:03 +0200
committerdimitri staessens <[email protected]>2017-09-24 14:34:03 +0200
commitff5063ad0e7902ce59864a466bd9d8d606d788e4 (patch)
tree17f66b04659a06c018494eb732adb661111d63f2 /src/ipcpd
parent7cef269be64f64b920763c6f2455931422c8bfe9 (diff)
downloadouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.tar.gz
ouroboros-ff5063ad0e7902ce59864a466bd9d8d606d788e4.zip
ipcpd: Add threadpool manager to DHT
This adds a threadpool manager to the DHT. This was needed because the detached thread could cause a data race on shutdown. The threadpool manager is revised to allow multiple instances in a single program. The irmd and ipcp now store commands in a buffer (list) instead of a single buffer before passing it to handler threads.
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/ipcp.c99
-rw-r--r--src/ipcpd/ipcp.h9
-rw-r--r--src/ipcpd/normal/dht.c339
3 files changed, 262 insertions, 185 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 85d543da..513c638a 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -36,7 +36,6 @@
#include <ouroboros/dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
-#include <ouroboros/tpm.h>
#include "ipcp.h"
@@ -44,6 +43,14 @@
#include <sys/socket.h>
#include <stdlib.h>
+struct cmd {
+ struct list_head next;
+
+ uint8_t cbuf[IPCP_MSG_BUF_SIZE];
+ size_t len;
+ int fd;
+};
+
void ipcp_sig_handler(int sig,
siginfo_t * info,
void * c)
@@ -107,7 +114,8 @@ static void * acceptloop(void * o)
while (ipcp_get_state() != IPCP_SHUTDOWN &&
ipcp_get_state() != IPCP_NULL) {
- ssize_t count;
+ struct cmd * cmd;
+
#if defined(__FreeBSD__) || defined(__APPLE__)
FD_ZERO(&fds);
FD_SET(ipcpi.sockfd, &fds);
@@ -122,25 +130,28 @@ static void * acceptloop(void * o)
(void *) &tv, sizeof(tv)))
log_warn("Failed to set timeout on socket.");
- pthread_mutex_lock(&ipcpi.cmd_lock);
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Out of memory");
+ break;
+ }
- assert(ipcpi.csockfd == -1);
+ pthread_mutex_lock(&ipcpi.cmd_lock);
- count = read(csockfd, ipcpi.cbuf, IPCP_MSG_BUF_SIZE);
- if (count <= 0) {
+ cmd->len = read(csockfd, cmd->cbuf, IPCP_MSG_BUF_SIZE);
+ if (cmd->len <= 0) {
pthread_mutex_unlock(&ipcpi.cmd_lock);
log_err("Failed to read from socket.");
close(csockfd);
+ free(cmd);
continue;
}
- ipcpi.cmd_len = count;
- ipcpi.csockfd = csockfd;
+ cmd->fd = csockfd;
- pthread_cond_signal(&ipcpi.cmd_cond);
+ list_add(&cmd->next, &ipcpi.cmds);
- while (ipcpi.csockfd != -1)
- pthread_cond_wait(&ipcpi.acc_cond, &ipcpi.cmd_lock);
+ pthread_cond_signal(&ipcpi.cmd_cond);
pthread_mutex_unlock(&ipcpi.cmd_lock);
}
@@ -159,13 +170,15 @@ static void * mainloop(void * o)
struct timespec dl;
struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000),
(IPCP_ACCEPT_TIMEOUT % 1000) * MILLION};
- (void) o;
+
+ (void) o;
while (true) {
int ret = 0;
ipcp_msg_t ret_msg = IPCP_MSG__INIT;
dif_info_msg_t dif_info = DIF_INFO_MSG__INIT;
int fd = -1;
+ struct cmd * cmd;
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
@@ -174,33 +187,34 @@ static void * mainloop(void * o)
pthread_mutex_lock(&ipcpi.cmd_lock);
- while (ipcpi.csockfd == -1 && ret != -ETIMEDOUT)
+ while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT)
ret = -pthread_cond_timedwait(&ipcpi.cmd_cond,
&ipcpi.cmd_lock,
&dl);
- sfd = ipcpi.csockfd;
- ipcpi.csockfd = -1;
-
- if (sfd == -1) {
+ if (ret == -ETIMEDOUT) {
pthread_mutex_unlock(&ipcpi.cmd_lock);
- if (tpm_check())
+ if (tpm_check(ipcpi.tpm))
break;
continue;
}
- pthread_cond_signal(&ipcpi.acc_cond);
+ cmd = list_last_entry(&ipcpi.cmds, struct cmd, next);
+ list_del(&cmd->next);
+
+ pthread_mutex_unlock(&ipcpi.cmd_lock);
+
+ msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf);
+ sfd = cmd->fd;
+
+ free(cmd);
- msg = ipcp_msg__unpack(NULL, ipcpi.cmd_len, ipcpi.cbuf);
if (msg == NULL) {
- pthread_mutex_unlock(&ipcpi.cmd_lock);
close(sfd);
continue;
}
- pthread_mutex_unlock(&ipcpi.cmd_lock);
-
- tpm_dec();
+ tpm_dec(ipcpi.tpm);
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
@@ -474,7 +488,7 @@ static void * mainloop(void * o)
if (buffer.len == 0) {
log_err("Failed to pack reply message");
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
@@ -482,7 +496,7 @@ static void * mainloop(void * o)
if (buffer.data == NULL) {
log_err("Failed to create reply buffer.");
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
@@ -492,17 +506,17 @@ static void * mainloop(void * o)
log_err("Failed to send reply message");
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
continue;
}
free(buffer.data);
close(sfd);
- tpm_inc();
+ tpm_inc(ipcpi.tpm);
}
- tpm_exit();
+ tpm_exit(ipcpi.tpm);
return (void *) 0;
}
@@ -617,20 +631,14 @@ int ipcp_init(int argc,
goto fail_cmd_cond;
}
- if (pthread_cond_init(&ipcpi.acc_cond, &cattr)) {
- log_err("Failed to init convar.");
- goto fail_acc_cond;
- }
+ list_head_init(&ipcpi.cmds);
ipcpi.alloc_id = -1;
- ipcpi.csockfd = -1;
pthread_condattr_destroy(&cattr);
return 0;
- fail_acc_cond:
- pthread_cond_destroy(&ipcpi.cmd_cond);
fail_cmd_cond:
pthread_mutex_destroy(&ipcpi.cmd_lock);
fail_cmd_lock:
@@ -675,12 +683,14 @@ int ipcp_boot()
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop))
- goto fail_tpm_init;
+ ipcpi.tpm = tpm_create(IPCP_MIN_THREADS, IPCP_ADD_THREADS,
+ mainloop, NULL);
+ if (ipcpi.tpm == NULL)
+ goto fail_tpm_create;
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- if (tpm_start())
+ if (tpm_start(ipcpi.tpm))
goto fail_tpm_start;
ipcp_set_state(IPCP_INIT);
@@ -696,18 +706,18 @@ int ipcp_boot()
return 0;
fail_acceptor:
- tpm_stop();
+ tpm_stop(ipcpi.tpm);
fail_tpm_start:
- tpm_fini();
- fail_tpm_init:
+ tpm_destroy(ipcpi.tpm);
+ fail_tpm_create:
return -1;
}
void ipcp_shutdown()
{
pthread_join(ipcpi.acceptor, NULL);
- tpm_stop();
- tpm_fini();
+ tpm_stop(ipcpi.tpm);
+ tpm_destroy(ipcpi.tpm);
log_info("IPCP %d shutting down.", getpid());
}
@@ -724,7 +734,6 @@ void ipcp_fini()
pthread_mutex_destroy(&ipcpi.state_mtx);
pthread_cond_destroy(&ipcpi.alloc_cond);
pthread_mutex_destroy(&ipcpi.alloc_lock);
- pthread_cond_destroy(&ipcpi.acc_cond);
pthread_cond_destroy(&ipcpi.cmd_cond);
pthread_mutex_destroy(&ipcpi.cmd_lock);
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 1b2a0334..d47d224b 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -25,8 +25,10 @@
#include <ouroboros/hash.h>
#include <ouroboros/ipcp.h>
+#include <ouroboros/list.h>
#include <ouroboros/qoscube.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/tpm.h>
#include <pthread.h>
#include <time.h>
@@ -92,10 +94,7 @@ struct ipcp {
int sockfd;
char * sock_path;
- uint8_t cbuf[IPCP_MSG_BUF_SIZE];
- size_t cmd_len;
- int csockfd;
- pthread_cond_t acc_cond;
+ struct list_head cmds;
pthread_cond_t cmd_cond;
pthread_mutex_t cmd_lock;
@@ -103,6 +102,8 @@ struct ipcp {
pthread_cond_t alloc_cond;
pthread_mutex_t alloc_lock;
+ struct tpm * tpm;
+
pthread_t acceptor;
} ipcpi;
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index 548ae03a..93fd4e4e 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -34,6 +34,7 @@
#include <ouroboros/notifier.h>
#include <ouroboros/random.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/tpm.h>
#include <ouroboros/utils.h>
#include "connmgr.h"
@@ -67,6 +68,7 @@ typedef KadContactMsg kad_contact_msg_t;
#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */
#define KAD_JOIN_RETR 5 /* Number of retries sending a join. */
#define KAD_JOIN_INTV 1 /* Time (seconds) between join retries. */
+#define HANDLE_TIMEO 1000 /* Timeout for dht_handle_sdu tpm check (ms) */
enum dht_state {
DHT_INIT = 0,
@@ -187,6 +189,12 @@ struct bucket {
struct bucket * children[1L << KAD_BETA];
};
+struct cmd {
+ struct list_head next;
+
+ struct shm_du_buff * sdb;
+};
+
struct dht {
size_t alpha;
size_t b;
@@ -212,6 +220,7 @@ struct dht {
struct bmp * cookies;
enum dht_state state;
+ struct list_head cmds;
pthread_cond_t cond;
pthread_mutex_t mtx;
@@ -219,6 +228,8 @@ struct dht {
int fd;
+ struct tpm * tpm;
+
pthread_t worker;
};
@@ -1428,7 +1439,9 @@ static int send_msg(struct dht * dht,
ipcp_sdb_release(sdb);
#endif
fail_msg:
+ pthread_rwlock_wrlock(&dht->lock);
bmp_release(dht->cookies, msg->cookie);
+ pthread_rwlock_unlock(&dht->lock);
fail_bmp_alloc:
return -1;
}
@@ -1838,7 +1851,7 @@ static ssize_t dht_get_contacts(struct dht * dht,
list_head_init(&l);
- pthread_rwlock_rdlock(&dht->lock);
+ pthread_rwlock_wrlock(&dht->lock);
len = dht_contact_list(dht, &l, key);
if (len == 0) {
@@ -1898,9 +1911,13 @@ static void * work(void * o)
dht = (struct dht *) o;
+ pthread_rwlock_rdlock(&dht->lock);
+
intv = gcd(dht->t_expire, dht->t_repub);
intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2;
+ pthread_rwlock_unlock(&dht->lock);
+
list_head_init(&reflist);
while (true) {
@@ -2189,7 +2206,7 @@ int dht_reg(struct dht * dht,
pthread_rwlock_unlock(&dht->lock);
- kad_publish(dht, key, >addr, t_expire);
+ kad_publish(dht, key, addr, t_expire);
return 0;
}
@@ -2271,163 +2288,192 @@ uint64_t dht_query(struct dht * dht,
static void * dht_handle_sdu(void * o)
{
- struct dht * dht;
- struct shm_du_buff * sdb;
- kad_msg_t * msg;
- kad_contact_msg_t ** cmsgs;
- kad_msg_t resp_msg = KAD_MSG__INIT;
- uint64_t addr;
- buffer_t buf;
- size_t i;
- size_t b;
- size_t t_expire;
+ struct dht * dht = (struct dht *) o;
+ struct timespec dl;
+ struct timespec to = {(HANDLE_TIMEO / 1000),
+ (HANDLE_TIMEO % 1000) * MILLION};
+ assert(dht);
- assert(o);
+ while (true) {
+ kad_msg_t * msg;
+ kad_contact_msg_t ** cmsgs;
+ kad_msg_t resp_msg = KAD_MSG__INIT;
+ uint64_t addr;
+ buffer_t buf;
+ size_t i;
+ size_t b;
+ size_t t_expire;
+ struct cmd * cmd;
+ int ret = 0;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, &to, &dl);
+
+ pthread_mutex_lock(&dht->mtx);
+
+ while(list_is_empty(&dht->cmds) && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&dht->cond,
+ &dht->mtx, &dl);
+
+ if (ret == -ETIMEDOUT) {
+ pthread_mutex_unlock(&dht->mtx);
+ if (tpm_check(dht->tpm))
+ break;
+ continue;
+ }
- memset(&buf, 0, sizeof(buf));
+ cmd = list_last_entry(&dht->cmds, struct cmd, next);
+ list_del(&cmd->next);
- dht = ((struct sdu_info *) o)->dht;
- sdb = ((struct sdu_info *) o)->sdb;
+ pthread_mutex_unlock(&dht->mtx);
- assert(dht);
- assert(sdb);
+ i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
- msg = kad_msg__unpack(NULL,
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- shm_du_buff_head(sdb));
+ msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
- ipcp_sdb_release(sdb);
+ ipcp_sdb_release(cmd->sdb);
+ free(cmd);
- free(o);
+ if (msg == NULL) {
+ log_err("Failed to unpack message.");
+ continue;
+ }
- if (msg == NULL) {
- log_err("Failed to unpack message.");
- return (void *) -1;
- }
+ pthread_rwlock_rdlock(&dht->lock);
- pthread_rwlock_rdlock(&dht->lock);
+ b = dht->b;
+ t_expire = dht->t_expire;
- b = dht->b;
- t_expire = dht->t_expire;
+ pthread_rwlock_unlock(&dht->lock);
- pthread_rwlock_unlock(&dht->lock);
+ if (msg->has_key && msg->key.len != b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad key in message.");
+ continue;
+ }
- if (msg->has_key && msg->key.len != b) {
- kad_msg__free_unpacked(msg, NULL);
- log_warn("Bad key in message.");
- return (void *) -1;
- }
+ if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_warn("Bad source ID in message of type %d.",
+ msg->code);
+ continue;
+ }
- if (msg->has_s_id && !msg->has_b && msg->s_id.len != b) {
- kad_msg__free_unpacked(msg, NULL);
- log_warn("Bad source ID in message of type %d.", msg->code);
- return (void *) -1;
- }
+ if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
+ kad_msg__free_unpacked(msg, NULL);
+ log_dbg("Got a request message when not running.");
+ continue;
+ }
- if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {
- kad_msg__free_unpacked(msg, NULL);
- log_dbg("Got a request message when not running.");
- return (void *) -1;
- }
+ tpm_dec(dht->tpm);
- addr = msg->s_addr;
+ addr = msg->s_addr;
- resp_msg.code = KAD_RESPONSE;
- resp_msg.cookie = msg->cookie;
+ resp_msg.code = KAD_RESPONSE;
+ resp_msg.cookie = msg->cookie;
- switch(msg->code) {
- case KAD_JOIN:
- /* Refuse enrollee on check fails. */
- if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
- log_warn("Parameter mismatch. "
- "DHT enrolment refused.");
- break;
- }
+ switch(msg->code) {
+ case KAD_JOIN:
+ /* Refuse enrollee on check fails. */
+ if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) {
+ log_warn("Parameter mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
- if (msg->t_replicate != KAD_T_REPL) {
- log_warn("Replication time mismatch. "
- "DHT enrolment refused.");
+ if (msg->t_replicate != KAD_T_REPL) {
+ log_warn("Replication time mismatch. "
+ "DHT enrolment refused.");
- break;
+ break;
}
- if (msg->t_refresh != KAD_T_REFR) {
- log_warn("Refresh time mismatch. "
- "DHT enrolment refused.");
+ if (msg->t_refresh != KAD_T_REFR) {
+ log_warn("Refresh time mismatch. "
+ "DHT enrolment refused.");
+ break;
+ }
+
+ resp_msg.has_alpha = true;
+ resp_msg.has_b = true;
+ resp_msg.has_k = true;
+ resp_msg.has_t_expire = true;
+ resp_msg.has_t_refresh = true;
+ resp_msg.has_t_replicate = true;
+ resp_msg.alpha = KAD_ALPHA;
+ resp_msg.b = b;
+ resp_msg.k = KAD_K;
+ resp_msg.t_expire = t_expire;
+ resp_msg.t_refresh = KAD_T_REFR;
+ resp_msg.t_replicate = KAD_T_REPL;
break;
- }
+ case KAD_FIND_VALUE:
+ buf = dht_retrieve(dht, msg->key.data);
+ if (buf.len != 0) {
+ resp_msg.n_addrs = buf.len;
+ resp_msg.addrs = (uint64_t *) buf.data;
+ break;
+ }
+ /* FALLTHRU */
+ case KAD_FIND_NODE:
+ /* Return k closest contacts. */
+ resp_msg.n_contacts =
+ dht_get_contacts(dht, msg->key.data, &cmsgs);
+ resp_msg.contacts = cmsgs;
+ break;
+ case KAD_STORE:
+ if (msg->n_contacts < 1) {
+ log_warn("No contacts in store message.");
+ break;
+ }
- resp_msg.has_alpha = true;
- resp_msg.has_b = true;
- resp_msg.has_k = true;
- resp_msg.has_t_expire = true;
- resp_msg.has_t_refresh = true;
- resp_msg.has_t_replicate = true;
- resp_msg.alpha = KAD_ALPHA;
- resp_msg.b = b;
- resp_msg.k = KAD_K;
- resp_msg.t_expire = t_expire;
- resp_msg.t_refresh = KAD_T_REFR;
- resp_msg.t_replicate = KAD_T_REPL;
- break;
- case KAD_FIND_VALUE:
- buf = dht_retrieve(dht, msg->key.data);
- if (buf.len != 0) {
- resp_msg.n_addrs = buf.len;
- resp_msg.addrs = (uint64_t *) buf.data;
+ if (!msg->has_t_expire) {
+ log_warn("No expiry time in store message.");
+ break;
+ }
+
+ kad_add(dht, *msg->contacts, msg->n_contacts,
+ msg->t_expire);
break;
- }
- /* FALLTHRU */
- case KAD_FIND_NODE:
- /* Return k closest contacts. */
- resp_msg.n_contacts =
- dht_get_contacts(dht, msg->key.data, &cmsgs);
- resp_msg.contacts = cmsgs;
- break;
- case KAD_STORE:
- if (msg->n_contacts < 1) {
- log_warn("No contacts in store message.");
+ case KAD_RESPONSE:
+ kad_handle_response(dht, msg);
+ break;
+ default:
+ assert(false);
break;
}
- if (!msg->has_t_expire) {
- log_warn("No expiry time in store message.");
- break;
+ if (msg->code != KAD_JOIN) {
+ pthread_rwlock_wrlock(&dht->lock);
+ if (dht_update_bucket(dht, msg->s_id.data, addr))
+ log_warn("Failed to update bucket.");
+ pthread_rwlock_unlock(&dht->lock);
}
- kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire);
- break;
- case KAD_RESPONSE:
- kad_handle_response(dht, msg);
- break;
- default:
- assert(false);
- break;
- }
+ if (msg->code < KAD_STORE) {
+ if (send_msg(dht, &resp_msg, addr))
+ log_warn("Failed to send response.");
+ }
- if (msg->code != KAD_JOIN) {
- pthread_rwlock_wrlock(&dht->lock);
- if (dht_update_bucket(dht, msg->s_id.data, addr))
- log_warn("Failed to update bucket.");
- pthread_rwlock_unlock(&dht->lock);
- }
+ kad_msg__free_unpacked(msg, NULL);
- if (msg->code < KAD_STORE) {
- if (send_msg(dht, &resp_msg, addr))
- log_warn("Failed to send response.");
- }
+ if (resp_msg.n_addrs > 0)
+ free(resp_msg.addrs);
- kad_msg__free_unpacked(msg, NULL);
+ if (resp_msg.n_contacts == 0) {
+ tpm_inc(dht->tpm);
+ continue;
+ }
- if (resp_msg.n_addrs > 0)
- free(resp_msg.addrs);
+ for (i = 0; i < resp_msg.n_contacts; ++i)
+ kad_contact_msg__free_unpacked(resp_msg.contacts[i],
+ NULL);
+ free(resp_msg.contacts);
- if (resp_msg.n_contacts == 0)
- return (void *) -1;
+ tpm_inc(dht->tpm);
+ }
- for (i = 0; i < resp_msg.n_contacts; ++i)
- kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL);
- free(resp_msg.contacts);
+ tpm_exit(dht->tpm);
return (void *) 0;
}
@@ -2435,22 +2481,24 @@ static void * dht_handle_sdu(void * o)
static void dht_post_sdu(void * comp,
struct shm_du_buff * sdb)
{
- pthread_t thr;
- struct sdu_info * info;
+ struct cmd * cmd;
+ struct dht * dht = (struct dht *) comp;
- info = malloc(sizeof(*info));
- if (info == NULL)
+ cmd = malloc(sizeof(*cmd));
+ if (cmd == NULL) {
+ log_err("Command failed. Out of memory.");
return;
+ }
- info->dht = (struct dht *) comp;
- info->sdb = sdb;
+ cmd->sdb = sdb;
- if (pthread_create(&thr, NULL, dht_handle_sdu, info)) {
- free(info);
- return;
- }
+ pthread_mutex_lock(&dht->mtx);
+
+ list_add(&cmd->next, &dht->cmds);
- pthread_detach(thr);
+ pthread_cond_signal(&dht->cond);
+
+ pthread_mutex_unlock(&dht->mtx);
}
void dht_destroy(struct dht * dht)
@@ -2461,6 +2509,11 @@ void dht_destroy(struct dht * dht)
if (dht == NULL)
return;
+#ifndef __DHT_TEST__
+ tpm_stop(dht->tpm);
+
+ tpm_destroy(dht->tpm);
+#endif
if (dht_get_state(dht) == DHT_RUNNING) {
dht_set_state(dht, DHT_SHUTDOWN);
pthread_cancel(dht->worker);
@@ -2594,6 +2647,7 @@ struct dht * dht_create(uint64_t addr)
list_head_init(&dht->requests);
list_head_init(&dht->refs);
list_head_init(&dht->lookups);
+ list_head_init(&dht->cmds);
if (pthread_rwlock_init(&dht->lock, NULL))
goto fail_rwlock;
@@ -2612,16 +2666,29 @@ struct dht * dht_create(uint64_t addr)
dht->addr = addr;
dht->id = NULL;
#ifndef __DHT_TEST__
+ dht->tpm = tpm_create(2, 1, dht_handle_sdu, dht);
+ if (dht->tpm == NULL)
+ goto fail_tpm_create;
+
+ if (tpm_start(dht->tpm))
+ goto fail_tpm_start;
+
dht->fd = dt_reg_ae(dht, &dht_post_sdu);
notifier_reg(handle_event, dht);
#else
(void) handle_event;
+ (void) dht_handle_sdu;
(void) dht_post_sdu;
#endif
dht->state = DHT_INIT;
return dht;
-
+#ifndef __DHT_TEST__
+ fail_tpm_start:
+ tpm_destroy(dht->tpm);
+ fail_tpm_create:
+ bmp_destroy(dht->cookies);
+#endif
fail_bmp:
pthread_cond_destroy(&dht->cond);
fail_cond: