diff options
34 files changed, 3274 insertions, 890 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c7266eb..85040496 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,6 +81,12 @@ add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND}) find_package(ProtobufC REQUIRED) include_directories(${PROTOBUF_C_INCLUDE_DIRS}) +include(CheckSymbolExists) +list(APPEND CMAKE_REQUIRED_DEFINITIONS -D_POSIX_C_SOURCE=200809L) +list(APPEND CMAKE_REQUIRED_DEFINITIONS -D__XSI_VISIBLE=500) +list(APPEND CMAKE_REQUIRED_LIBRARIES pthread) +check_symbol_exists(pthread_mutexattr_setrobust pthread.h HAVE_ROBUST_MUTEX) + add_subdirectory(src) add_subdirectory(include) add_subdirectory(doc) diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in index bae2d89e..4c255da5 100644 --- a/include/ouroboros/config.h.in +++ b/include/ouroboros/config.h.in @@ -3,7 +3,8 @@ * * Configuration information * - * Sander Vrijders <[email protected]> + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -35,6 +36,7 @@ #define IPCP_SHIM_ETH_LLC_EXEC "@IPCP_SHIM_ETH_LLC_TARGET@" #define IPCP_NORMAL_EXEC "@IPCP_NORMAL_TARGET@" #define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@" +#cmakedefine HAVE_ROBUST_MUTEX #define AP_MAX_FLOWS 2048 #define AP_RES_FDS 64 #define AP_MAX_FQUEUES 64 @@ -50,25 +52,21 @@ #define SHM_FLOW_SET_PREFIX "/ouroboros.sets." #define IRMD_MAX_FLOWS 4096 /* IRMD dynamic threadpooling */ -#define IRMD_MIN_AV_THREADS 16 -#define IRMD_MAX_AV_THREADS 64 -#define IRMD_MAX_THREADS 256 +#define IRMD_MIN_THREADS 16 +#define IRMD_ADD_THREADS 32 /* IPCP dynamic threadpooling */ -#define IPCP_MIN_AV_THREADS 4 -#define IPCP_MAX_AV_THREADS 32 -#define IPCP_MAX_THREADS 64 - +#define IPCP_MIN_THREADS 4 +#define IPCP_ADD_THREADS 16 +#define IPCP_SCHED_THREADS 8 #define IPCPD_MAX_CONNS IRMD_MAX_FLOWS #define PTHREAD_COND_CLOCK CLOCK_MONOTONIC #define PFT_SIZE 1 << 12 /* Timeout values */ -#define IRMD_TPM_TIMEOUT 1000 -#define IPCP_TPM_TIMEOUT 1000 #define IRMD_ACCEPT_TIMEOUT 100 #define IRMD_REQ_ARR_TIMEOUT 500 #define IRMD_FLOW_TIMEOUT 5000 #define IPCP_ACCEPT_TIMEOUT 100 -#define SOCKET_TIMEOUT 4000 +#define SOCKET_TIMEOUT 10000 #define CDAP_REPLY_TIMEOUT 1000 #define ENROLL_TIMEOUT 2000 diff --git a/include/ouroboros/endian.h b/include/ouroboros/endian.h index 16200028..873aff73 100644 --- a/include/ouroboros/endian.h +++ b/include/ouroboros/endian.h @@ -24,7 +24,7 @@ #ifndef OUROBOROS_ENDIAN_H #define OUROBOROS_ENDIAN_H -#if defined(__linux__) || defined(__CYGWIN__) +#if defined(__linux__) || defined(__CYGWIN__) || defined(__MACH__) #ifndef _BSD_SOURCE #define _BSD_SOURCE diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index a4e94b89..3db2a0dd 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -24,10 +24,13 @@ #ifndef OUROBOROS_NP1_FLOW_H #define OUROBOROS_NP1_FLOW_H +#include <ouroboros/qoscube.h> + #include <unistd.h> -int np1_flow_alloc(pid_t n_api, - int port_id); +int np1_flow_alloc(pid_t n_api, + int port_id, + qoscube_t qc); int np1_flow_resp(int port_id); diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h index 0d65c15d..660709bf 100644 --- a/include/ouroboros/sockets.h +++ b/include/ouroboros/sockets.h @@ -30,9 +30,6 @@ typedef IpcpConfigMsg ipcp_config_msg_t; typedef DifInfoMsg dif_info_msg_t; -#include "frct_enroll.pb-c.h" -typedef FrctEnrollMsg frct_enroll_msg_t; - #include "irmd_messages.pb-c.h" typedef IrmMsg irm_msg_t; diff --git a/src/lib/frct_enroll.proto b/include/ouroboros/tpm.h index 497d6acc..d34f06f3 100644 --- a/src/lib/frct_enroll.proto +++ b/include/ouroboros/tpm.h @@ -1,10 +1,10 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * QoS messages + * Threadpool management * - * Dimitri Staessens <[email protected]> - * Sander Vrijders <[email protected]> + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -21,12 +21,27 @@ * 02110-1301 USA */ -syntax = "proto2"; +#ifndef OUROBOROS_LIB_TPM_H +#define OUROBOROS_LIB_TPM_H -message frct_enroll_msg { - required bool resource_control = 1; - required bool reliable = 2; - required bool error_check = 3; - required bool ordered = 4; - required bool partial = 5; -}; +#include <stdbool.h> + +int tpm_init(size_t min, + size_t inc, + void * (* func)(void *)); + +int tpm_start(void); + +void tpm_stop(void); + +void tpm_fini(void); + +bool tpm_check(void); + +void tpm_exit(void); + +void tpm_dec(void); + +void tpm_inc(void); + +#endif /* OUROBOROS_LIB_TPM_H */ diff --git a/src/ipcpd/CMakeLists.txt b/src/ipcpd/CMakeLists.txt index 00baa762..e0b375b6 100644 --- a/src/ipcpd/CMakeLists.txt +++ b/src/ipcpd/CMakeLists.txt @@ -8,7 +8,7 @@ set(IPCP_SOURCES add_subdirectory(local) add_subdirectory(normal) add_subdirectory(shim-udp) -if (NOT APPLE) +if (NOT APPLE AND NOT CMAKE_SYSTEM_NAME STREQUAL GNU) add_subdirectory(shim-eth-llc) endif () add_subdirectory(tests) diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index fdd1edc4..48ff046c 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -31,6 +31,7 @@ #include <ouroboros/dev.h> #include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> +#include <ouroboros/tpm.h> #include "ipcp.h" @@ -56,6 +57,8 @@ void ipcp_sig_handler(int sig, if (ipcp_get_state() == IPCP_OPERATIONAL) ipcp_set_state(IPCP_SHUTDOWN); } + + tpm_stop(); default: return; } @@ -87,51 +90,7 @@ void ipcp_hash_str(char * buf, buf[2 * i] = '\0'; } -static void thread_inc(void) -{ - pthread_mutex_lock(&ipcpi.threads_lock); - - ++ipcpi.threads; - pthread_cond_signal(&ipcpi.threads_cond); - - pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void thread_dec(void) -{ - pthread_mutex_lock(&ipcpi.threads_lock); - - --ipcpi.threads; - pthread_cond_signal(&ipcpi.threads_cond); - - pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static bool thread_check(void) -{ - int ret; - - pthread_mutex_lock(&ipcpi.threads_lock); - - ret = ipcpi.threads > ipcpi.max_threads; - - pthread_mutex_unlock(&ipcpi.threads_lock); - - return ret; -} - -static void thread_exit(ssize_t id) -{ - pthread_mutex_lock(&ipcpi.threads_lock); - bmp_release(ipcpi.thread_ids, id); - - --ipcpi.threads; - pthread_cond_signal(&ipcpi.threads_cond); - - pthread_mutex_unlock(&ipcpi.threads_lock); -} - -static void * ipcp_main_loop(void * o) +static void * mainloop(void * o) { int lsockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; @@ -147,7 +106,7 @@ static void * ipcp_main_loop(void * o) struct timeval ltv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; - ssize_t id = (ssize_t) o; + (void) o; while (true) { #ifdef __FreeBSD__ @@ -159,8 +118,8 @@ static void * ipcp_main_loop(void * o) if (ipcp_get_state() == IPCP_SHUTDOWN || ipcp_get_state() == IPCP_NULL || - thread_check()) { - thread_exit(id); + tpm_check()) { + tpm_exit(); break; } @@ -192,7 +151,7 @@ static void * ipcp_main_loop(void * o) continue; } - thread_dec(); + tpm_dec(); switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: @@ -260,7 +219,6 @@ static void * ipcp_main_loop(void * o) ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dst_name, &info); - if (ret_msg.result == 0) { ret_msg.dif_info = &dif_info; dif_info.dir_hash_algo = info.dir_hash_algo; @@ -332,7 +290,9 @@ static void * ipcp_main_loop(void * o) break; } - fd = np1_flow_alloc(msg->api, msg->port_id); + fd = np1_flow_alloc(msg->api, + msg->port_id, + msg->qoscube); if (fd < 0) { log_err("Failed allocating fd on port_id %d.", msg->port_id); @@ -409,7 +369,7 @@ static void * ipcp_main_loop(void * o) if (buffer.len == 0) { log_err("Failed to pack reply message"); close(lsockfd); - thread_inc(); + tpm_inc(); continue; } @@ -417,7 +377,7 @@ static void * ipcp_main_loop(void * o) if (buffer.data == NULL) { log_err("Failed to create reply buffer."); close(lsockfd); - thread_inc(); + tpm_inc(); continue; } @@ -427,14 +387,14 @@ static void * ipcp_main_loop(void * o) log_err("Failed to send reply message"); free(buffer.data); close(lsockfd); - thread_inc(); + tpm_inc(); continue; } free(buffer.data); close(lsockfd); - thread_inc(); + tpm_inc(); } return (void *) 0; @@ -497,15 +457,6 @@ int ipcp_init(int argc, ipcpi.state = IPCP_NULL; ipcpi.shim_data = NULL; - ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCP_MAX_THREADS); - if (ipcpi.threadpool == NULL) { - ret = -ENOMEM; - goto fail_thr; - } - - ipcpi.threads = 0; - ipcpi.max_threads = IPCP_MIN_AV_THREADS; - ipcpi.sock_path = ipcp_sock_path(getpid()); if (ipcpi.sock_path == NULL) goto fail_sock_path; @@ -527,11 +478,6 @@ int ipcp_init(int argc, goto fail_state_mtx; } - if (pthread_mutex_init(&ipcpi.threads_lock, NULL)) { - log_err("Could not create mutex."); - goto fail_thread_lock; - } - if (pthread_condattr_init(&cattr)) { log_err("Could not create condattr."); goto fail_cond_attr; @@ -545,17 +491,6 @@ int ipcp_init(int argc, goto fail_state_cond; } - if (pthread_cond_init(&ipcpi.threads_cond, &cattr)) { - log_err("Could not init condvar."); - goto fail_thread_cond; - } - - ipcpi.thread_ids = bmp_create(IPCP_MAX_THREADS, 0); - if (ipcpi.thread_ids == NULL) { - log_err("Could not init condvar."); - goto fail_bmp; - } - if (pthread_mutex_init(&ipcpi.alloc_lock, NULL)) { log_err("Failed to init mutex."); goto fail_alloc_lock; @@ -588,94 +523,21 @@ int ipcp_init(int argc, fail_alloc_cond: pthread_mutex_destroy(&ipcpi.alloc_lock); fail_alloc_lock: - bmp_destroy(ipcpi.thread_ids); - fail_bmp: - pthread_cond_destroy(&ipcpi.threads_cond); - fail_thread_cond: pthread_cond_destroy(&ipcpi.state_cond); fail_state_cond: pthread_condattr_destroy(&cattr); fail_cond_attr: - pthread_mutex_destroy(&ipcpi.threads_lock); - fail_thread_lock: pthread_mutex_destroy(&ipcpi.state_mtx); fail_state_mtx: close(ipcpi.sockfd); fail_serv_sock: free(ipcpi.sock_path); fail_sock_path: - free(ipcpi.threadpool); - fail_thr: ouroboros_fini(); return ret; } -void * threadpoolmgr(void * o) -{ - pthread_attr_t pattr; - struct timespec dl; - struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), - (IRMD_TPM_TIMEOUT % 1000) * MILLION}; - (void) o; - - if (pthread_attr_init(&pattr)) - return (void *) -1; - - pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); - - while (true) { - clock_gettime(PTHREAD_COND_CLOCK, &dl); - ts_add(&dl, &to, &dl); - - if (ipcp_get_state() == IPCP_SHUTDOWN || - ipcp_get_state() == IPCP_NULL) { - pthread_attr_destroy(&pattr); - log_dbg("Waiting for threads to exit."); - pthread_mutex_lock(&ipcpi.threads_lock); - while (ipcpi.threads > 0) - pthread_cond_wait(&ipcpi.threads_cond, - &ipcpi.threads_lock); - pthread_mutex_unlock(&ipcpi.threads_lock); - - log_dbg("Threadpool manager done."); - break; - } - - pthread_mutex_lock(&ipcpi.threads_lock); - - if (ipcpi.threads < IPCP_MIN_AV_THREADS) { - log_dbg("Increasing threadpool."); - ipcpi.max_threads = IPCP_MAX_AV_THREADS; - - while (ipcpi.threads < ipcpi.max_threads) { - ssize_t id = bmp_allocate(ipcpi.thread_ids); - if (!bmp_is_id_valid(ipcpi.thread_ids, id)) { - log_warn("IPCP threadpool exhausted."); - break; - } - - if (pthread_create(&ipcpi.threadpool[id], - &pattr, ipcp_main_loop, - (void *) id)) - log_warn("Failed to start new thread."); - else - ++ipcpi.threads; - } - } - - if (pthread_cond_timedwait(&ipcpi.threads_cond, - &ipcpi.threads_lock, - &dl) == ETIMEDOUT) - if (ipcpi.threads > IPCP_MIN_AV_THREADS) - --ipcpi.max_threads; - - pthread_mutex_unlock(&ipcpi.threads_lock); - } - - return (void *) 0; -} - int ipcp_boot() { struct sigaction sig_act; @@ -700,9 +562,15 @@ int ipcp_boot() pthread_sigmask(SIG_BLOCK, &sigset, NULL); - ipcp_set_state(IPCP_INIT); + if (tpm_init(IPCP_MIN_THREADS, IPCP_ADD_THREADS, mainloop)) + return -1; + + if (tpm_start()) { + tpm_fini(); + return -1; + } - pthread_create(&ipcpi.tpm, NULL, threadpoolmgr, NULL); + ipcp_set_state(IPCP_INIT); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); @@ -711,8 +579,7 @@ int ipcp_boot() void ipcp_shutdown() { - pthread_join(ipcpi.tpm, NULL); - + tpm_fini(); log_info("IPCP %d shutting down.", getpid()); } @@ -722,16 +589,11 @@ void ipcp_fini() if (unlink(ipcpi.sock_path)) log_warn("Could not unlink %s.", ipcpi.sock_path); - bmp_destroy(ipcpi.thread_ids); - free(ipcpi.sock_path); - free(ipcpi.threadpool); shim_data_destroy(ipcpi.shim_data); pthread_cond_destroy(&ipcpi.state_cond); - pthread_cond_destroy(&ipcpi.threads_cond); - pthread_mutex_destroy(&ipcpi.threads_lock); pthread_mutex_destroy(&ipcpi.state_mtx); pthread_cond_destroy(&ipcpi.alloc_cond); pthread_mutex_destroy(&ipcpi.alloc_lock); diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 3f5e1bd6..fb69df5c 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -93,15 +93,6 @@ struct ipcp { pthread_cond_t alloc_cond; pthread_mutex_t alloc_lock; - pthread_t * threadpool; - - struct bmp * thread_ids; - size_t max_threads; - size_t threads; - pthread_cond_t threads_cond; - pthread_mutex_t threads_lock; - - pthread_t tpm; } ipcpi; int ipcp_init(int argc, diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 336b0e8f..8c2d4efc 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -15,6 +15,8 @@ include_directories(${CMAKE_BINARY_DIR}/include) set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET") protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto) +protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto) + # Add GPB sources of policies last protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto) @@ -22,6 +24,7 @@ set(SOURCE_FILES # Add source files here addr_auth.c connmgr.c + dht.c dir.c dt.c dt_pci.c @@ -42,7 +45,7 @@ set(SOURCE_FILES ) add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES} - ${FLOW_ALLOC_SRCS} ${FSO_SRCS}) + ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS}) target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros) include(AddCompileFlags) @@ -53,3 +56,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug) install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin) add_subdirectory(pol/tests) +add_subdirectory(tests) diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c new file mode 100644 index 00000000..74618658 --- /dev/null +++ b/src/ipcpd/normal/dht.c @@ -0,0 +1,2383 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#define OUROBOROS_PREFIX "dht" + +#include <ouroboros/config.h> +#include <ouroboros/hash.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/list.h> +#include <ouroboros/random.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/utils.h> + +#include "dht.h" +#include "dt.h" + +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <inttypes.h> + +#include "kademlia.pb-c.h" +typedef KadMsg kad_msg_t; +typedef KadContactMsg kad_contact_msg_t; + +#define DHT_MAX_REQS 2048 /* KAD recommends rnd(), bmp can be changed. */ +#define KAD_ALPHA 3 /* Parallel factor, proven optimal value. */ +#define KAD_K 8 /* Replication factor, MDHT value. */ +#define KAD_T_REPL 900 /* Replication time, tied to k. MDHT value. */ +#define KAD_T_REFR 900 /* Refresh time stale bucket, MDHT value. */ +#define KAD_T_JOIN 6 /* Response time to wait for a join. */ +#define KAD_T_RESP 2 /* Response time to wait for a response. */ +#define KAD_R_PING 2 /* Ping retries before declaring peer dead. */ +#define KAD_QUEER 15 /* Time to declare peer questionable. */ +#define KAD_BETA 8 /* Bucket split factor, must be 1, 2, 4 or 8. */ +#define KAD_RESP_RETR 6 /* Number of retries on sending a response. */ + +enum dht_state { + DHT_INIT = 0, + DHT_RUNNING, + DHT_SHUTDOWN, +}; + +enum kad_code { + KAD_JOIN = 0, + KAD_FIND_NODE, + KAD_FIND_VALUE, + /* Messages without a response below. */ + KAD_STORE, + KAD_RESPONSE +}; + +enum kad_req_state { + REQ_NULL = 0, + REQ_INIT, + REQ_PENDING, + REQ_RESPONSE, + REQ_DONE, + REQ_DESTROY +}; + +enum lookup_state { + LU_NULL = 0, + LU_INIT, + LU_PENDING, + LU_UPDATE, + LU_COMPLETE, + LU_DONE, + LU_DESTROY +}; + +struct kad_req { + struct list_head next; + + uint32_t cookie; + enum kad_code code; + uint8_t * key; + uint64_t addr; + + enum kad_req_state state; + pthread_cond_t cond; + pthread_mutex_t lock; + + time_t t_exp; +}; + +struct lookup { + struct list_head next; + + uint8_t * key; + + struct list_head contacts; + size_t n_contacts; + + uint64_t * addrs; + size_t n_addrs; + + enum lookup_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct val { + struct list_head next; + + uint64_t addr; + + time_t t_exp; + time_t t_rep; +}; + +struct ref_entry { + struct list_head next; + + uint8_t * key; + + time_t t_rep; +}; + +struct dht_entry { + struct list_head next; + + uint8_t * key; + size_t n_vals; + struct list_head vals; +}; + +struct contact { + struct list_head next; + + uint8_t * id; + uint64_t addr; + + size_t fails; + time_t t_seen; +}; + +struct bucket { + struct list_head contacts; + size_t n_contacts; + + struct list_head alts; + size_t n_alts; + + time_t t_refr; + + size_t depth; + uint8_t mask; + + struct bucket * parent; + struct bucket * children[1L << KAD_BETA]; +}; + +struct dht { + size_t alpha; + size_t b; + size_t k; + + time_t t_expire; + time_t t_refresh; + time_t t_replic; + time_t t_repub; + + uint8_t * id; + uint64_t addr; + + struct bucket * buckets; + + struct list_head entries; + + struct list_head refs; + + struct list_head lookups; + + struct list_head requests; + struct bmp * cookies; + + enum dht_state state; + pthread_mutex_t mtx; + + pthread_rwlock_t lock; + + int fd; + + pthread_t worker; +}; + +static uint8_t * dht_dup_key(const uint8_t * key, + size_t len) +{ + uint8_t * dup; + + dup = malloc(sizeof(*dup) * len); + if (dup == NULL) + return NULL; + + memcpy(dup, key, len); + + return dup; +} + +static enum dht_state dht_get_state(struct dht * dht) +{ + enum dht_state state; + + pthread_mutex_lock(&dht->mtx); + + state = dht->state; + + pthread_mutex_unlock(&dht->mtx); + + return state; +} + +static void dht_set_state(struct dht * dht, + enum dht_state state) +{ + pthread_mutex_lock(&dht->mtx); + + dht->state = state; + + pthread_mutex_unlock(&dht->mtx); +} + +static uint8_t * create_id(size_t len) +{ + uint8_t * id; + + id = malloc(len); + if (id == NULL) + return NULL; + + if (random_buffer(id, len) < 0) { + free(id); + return NULL; + } + + return id; +} + +static struct kad_req * kad_req_create(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) +{ + struct kad_req * req; + pthread_condattr_t cattr; + struct timespec t; + + req = malloc(sizeof(*req)); + if (req == NULL) + return NULL; + + list_head_init(&req->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + req->t_exp = t.tv_sec + KAD_T_RESP; + req->addr = addr; + req->state = REQ_INIT; + req->cookie = msg->cookie; + req->code = msg->code; + req->key = NULL; + + if (msg->has_key) { + req->key = dht_dup_key(msg->key.data, dht->b); + if (req->key == NULL) { + free(req); + return NULL; + } + } + + if (pthread_mutex_init(&req->lock, NULL)) { + free(req->key); + free(req); + return NULL; + } + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&req->cond, &cattr)) { + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&req->lock); + free(req->key); + free(req); + return NULL; + } + + pthread_condattr_destroy(&cattr); + + return req; +} + +static void kad_req_destroy(struct kad_req * req) +{ + assert(req); + + if (req->key != NULL) + free(req->key); + + pthread_mutex_lock(&req->lock); + + switch (req->state) { + case REQ_DESTROY: + pthread_mutex_unlock(&req->lock); + return; + case REQ_PENDING: + req->state = REQ_DESTROY; + pthread_cond_signal(&req->cond); + break; + case REQ_INIT: + case REQ_DONE: + req->state = REQ_NULL; + break; + case REQ_RESPONSE: + case REQ_NULL: + default: + break; + } + + while (req->state != REQ_NULL) + pthread_cond_wait(&req->cond, &req->lock); + + pthread_mutex_unlock(&req->lock); + + pthread_cond_destroy(&req->cond); + pthread_mutex_destroy(&req->lock); + + free(req); +} + +static int kad_req_wait(struct kad_req * req, + time_t t) +{ + struct timespec timeo = {t, 0}; + struct timespec abs; + int ret = 0; + + assert(req); + + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + ts_add(&abs, &timeo, &abs); + + pthread_mutex_lock(&req->lock); + + req->state = REQ_PENDING; + + while (req->state == REQ_PENDING && ret != -ETIMEDOUT) + ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); + + switch(req->state) { + case REQ_DESTROY: + ret = -1; + req->state = REQ_NULL; + pthread_cond_signal(&req->cond); + break; + case REQ_PENDING: /* ETIMEDOUT */ + case REQ_RESPONSE: + req->state = REQ_DONE; + pthread_cond_signal(&req->cond); + break; + default: + break; + } + + pthread_mutex_unlock(&req->lock); + + return ret; +} + +static void kad_req_respond(struct kad_req * req) +{ + pthread_mutex_lock(&req->lock); + + req->state = REQ_RESPONSE; + pthread_cond_signal(&req->cond); + + pthread_mutex_unlock(&req->lock); +} + +static struct contact * contact_create(const uint8_t * id, + size_t len, + uint64_t addr) +{ + struct contact * c; + struct timespec t; + + c = malloc(sizeof(*c)); + if (c == NULL) + return NULL; + + list_head_init(&c->next); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + c->addr = addr; + c->fails = 0; + c->t_seen = t.tv_sec; + c->id = dht_dup_key(id, len); + if (c->id == NULL) { + free(c); + return NULL; + } + + return c; +} + +static void contact_destroy(struct contact * c) +{ + if (c != NULL) + free(c->id); + + free(c); +} + +static struct bucket * iter_bucket(struct bucket * b, + const uint8_t * id) +{ + uint8_t byte; + uint8_t mask; + + assert(b); + + if (b->children[0] == NULL) + return b; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth) * KAD_BETA) & (CHAR_BIT - 1)); + + return iter_bucket(b->children[(byte & mask)], id); +} + +static struct bucket * dht_get_bucket(struct dht * dht, + const uint8_t * id) +{ + assert(dht->buckets); + + return iter_bucket(dht->buckets, id); +} + +/* + * If someone builds a network where the n (n > k) closest nodes all + * have IDs starting with the same 64 bits: by all means, change this. + */ +static uint64_t dist(const uint8_t * src, + const uint8_t * dst) +{ + return betoh64(*((uint64_t *) src) ^ *((uint64_t *) dst)); +} + +static size_t list_add_sorted(struct list_head * l, + struct contact * c, + const uint8_t * key) +{ + struct list_head * p; + + assert(l); + assert(c); + assert(key); + assert(c->id); + + list_for_each(p, l) { + struct contact * e = list_entry(p, struct contact, next); + if (dist(c->id, key) > dist(e->id, key)) + break; + } + + list_add_tail(&c->next, p); + + return 1; +} + +static size_t dht_contact_list(struct dht * dht, + struct list_head * l, + const uint8_t * key) +{ + struct list_head * p; + struct bucket * b; + size_t len = 0; + size_t i; + struct timespec t; + + assert(l); + assert(dht); + assert(key); + assert(list_is_empty(l)); + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + b = dht_get_bucket(dht, key); + if (b == NULL) + return 0; + + b->t_refr = t.tv_sec + KAD_T_REFR; + + if (b->n_contacts == dht->k || b->parent == NULL) { + list_for_each(p, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (list_add_sorted(l, c, key) == 1) + if (++len > dht->k) + break; + } + } else { + struct bucket * d = b->parent; + for (i = 0; i < (1L << KAD_BETA); ++i) { + list_for_each(p, &d->children[i]->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + c = contact_create(c->id, dht->b, c->addr); + if (c == NULL) + continue; + if (list_add_sorted(l, c, key) == 1) + if (++len > dht->k) + break; + } + } + } + + assert(len == dht->k || b->parent == NULL); + + return len; +} + +static struct lookup * lookup_create(struct dht * dht, + const uint8_t * id) +{ + struct lookup * lu; + pthread_condattr_t cattr; + + assert(dht); + assert(id); + + lu = malloc(sizeof(*lu)); + if (lu == NULL) + goto fail_malloc; + + list_head_init(&lu->contacts); + + lu->state = LU_INIT; + lu->addrs = NULL; + lu->n_addrs = 0; + lu->key = dht_dup_key(id, dht->b); + if (lu->key == NULL) + goto fail_id; + + if (pthread_mutex_init(&lu->lock, NULL)) + goto fail_mutex; + + pthread_condattr_init(&cattr); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_cond_init(&lu->cond, &cattr)) + goto fail_cond; + + pthread_condattr_destroy(&cattr); + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&lu->next, &dht->lookups); + + lu->n_contacts = dht_contact_list(dht, &lu->contacts, id); + + pthread_rwlock_unlock(&dht->lock); + + return lu; + + fail_cond: + pthread_condattr_destroy(&cattr); + pthread_mutex_destroy(&lu->lock); + fail_mutex: + free(lu->key); + fail_id: + free(lu); + fail_malloc: + return NULL; +} + +static void lookup_destroy(struct lookup * lu) +{ + struct list_head * p; + struct list_head * h; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + switch (lu->state) { + case LU_DESTROY: + pthread_mutex_unlock(&lu->lock); + return; + case LU_PENDING: + lu->state = LU_DESTROY; + pthread_cond_signal(&lu->cond); + break; + case LU_INIT: + case LU_DONE: + case LU_UPDATE: + case LU_COMPLETE: + lu->state = REQ_NULL; + break; + case LU_NULL: + default: + break; + } + + while (lu->state != LU_NULL) + pthread_cond_wait(&lu->cond, &lu->lock); + + if (lu->key != NULL) + free(lu->key); + if (lu->addrs != NULL) + free(lu->addrs); + + list_for_each_safe(p, h, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } + + pthread_mutex_unlock(&lu->lock); + + pthread_cond_destroy(&lu->cond); + pthread_mutex_destroy(&lu->lock); + + free(lu); +} + +static void lookup_update(struct dht * dht, + struct lookup * lu, + kad_msg_t * msg) +{ + struct list_head * p = NULL; + struct contact * c = NULL; + size_t n; + size_t pos = 0; + + assert(lu); + assert(msg); + + if (dht_get_state(dht) != DHT_RUNNING) + return; + + pthread_mutex_lock(&lu->lock); + + if (msg->n_addrs > 0) { + if (lu->addrs == NULL) { + lu->addrs = malloc(sizeof(*lu->addrs) * msg->n_addrs); + for (n = 0; n < msg->n_addrs; ++n) + lu->addrs[n] = msg->addrs[n]; + lu->n_addrs = msg->n_addrs; + } + lu->state = LU_COMPLETE; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; + } + + while (lu->state == LU_INIT) + pthread_cond_wait(&lu->cond, &lu->lock); + + for (n = 0; n < msg->n_contacts; ++n) { + c = contact_create(msg->contacts[n]->id.data, + dht->b, msg->contacts[n]->addr); + if (c == NULL) + continue; + + list_for_each(p, &lu->contacts) { + struct contact * e; + e = list_entry(p, struct contact, next); + if (!memcmp(e->id, c->id, dht->b)) { + contact_destroy(c); + c = NULL; + break; + } + + if (dist(c->id, lu->key) > dist(e->id, lu->key)) + break; + pos++; + } + + if (c == NULL) + continue; + + if (lu->n_contacts < dht->k) { + list_add_tail(&c->next, p); + ++lu->n_contacts; + } else if (pos == dht->k) { + contact_destroy(c); + continue; + } else { + struct contact * d; + list_add_tail(&c->next, p); + d = list_last_entry(&lu->contacts, struct contact, next); + list_del(&d->next); + contact_destroy(d); + } + } + + lu->state = LU_UPDATE; + pthread_cond_signal(&lu->cond); + pthread_mutex_unlock(&lu->lock); + return; +} + +static ssize_t lookup_get_addrs(struct lookup * lu, + uint64_t * addrs) +{ + ssize_t n; + + assert(lu); + + pthread_mutex_lock(&lu->lock); + + for (n = 0; (size_t) n < lu->n_addrs; ++n) + addrs[n] = lu->addrs[n]; + + assert((size_t) n == lu->n_addrs); + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static ssize_t lookup_contact_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + ssize_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + addrs[n] = c->addr; + n++; + } + + pthread_mutex_unlock(&lu->lock); + + return n; +} + +static void lookup_new_addrs(struct lookup * lu, + uint64_t * addrs) +{ + struct list_head * p; + size_t n = 0; + + assert(lu); + assert(addrs); + + pthread_mutex_lock(&lu->lock); + + /* Uses fails to check if the contact has been contacted. */ + list_for_each(p, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (c->fails == 0) { + c->fails = 1; + addrs[n] = c->addr; + n++; + } + + if (n == KAD_ALPHA) + break; + } + + assert(n <= KAD_ALPHA); + + addrs[n] = 0; + + if (n == 0) + lu->state = LU_DONE; + + pthread_mutex_unlock(&lu->lock); +} + +static enum lookup_state lookup_wait(struct lookup * lu) +{ + enum lookup_state state; + + pthread_mutex_lock(&lu->lock); + + lu->state = LU_PENDING; + pthread_cond_signal(&lu->cond); + + pthread_cleanup_push((void (*)(void *)) lookup_destroy, (void *) lu); + + while (lu->state == LU_PENDING) + pthread_cond_wait(&lu->cond, &lu->lock); + + pthread_cleanup_pop(false); + + state = lu->state; + + pthread_mutex_unlock(&lu->lock); + + return state; +} + +static struct kad_req * dht_find_request(struct dht * dht, + kad_msg_t * msg) +{ + struct list_head * p; + + assert(dht); + assert(msg); + + list_for_each(p, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + if (r->cookie == msg->cookie) + return r; + } + + return NULL; +} + +static struct lookup * dht_find_lookup(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + assert(dht); + assert(key); + + list_for_each(p, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + if (!memcmp(l->key, key, dht->b)) + return l; + } + + return NULL; +} + +static struct val * val_create(uint64_t addr, + time_t exp) +{ + struct val * v; + struct timespec t; + + v = malloc(sizeof(*v)); + if (v == NULL) + return NULL; + + list_head_init(&v->next); + v->addr = addr; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + + return v; +} + +static void val_destroy(struct val * v) +{ + assert(v); + + free(v); +} + +static struct ref_entry * ref_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + struct timespec t; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + e->t_rep = t.tv_sec + dht->t_repub; + + return e; +} + +static void ref_entry_destroy(struct ref_entry * e) +{ + free(e->key); + free(e); +} + +static struct dht_entry * dht_entry_create(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + + assert(dht); + assert(key); + + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + list_head_init(&e->next); + list_head_init(&e->vals); + + e->n_vals = 0; + + e->key = dht_dup_key(key, dht->b); + if (e->key == NULL) { + free(e); + return NULL; + } + + return e; +} + +static void dht_entry_destroy(struct dht_entry * e) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + list_del(&v->next); + val_destroy(v); + } + + free(e->key); + + free(e); +} + +static int dht_entry_add_addr(struct dht_entry * e, + uint64_t addr, + time_t exp) +{ + struct list_head * p; + struct val * val; + struct timespec t; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + if (v->t_exp < t.tv_sec + exp) { + v->t_exp = t.tv_sec + exp; + v->t_rep = t.tv_sec + KAD_T_REPL; + } + + return 0; + } + } + + val = val_create(addr, exp); + if (val == NULL) + return -ENOMEM; + + list_add(&val->next, &e->vals); + ++e->n_vals; + + return 0; +} + + +static void dht_entry_del_addr(struct dht_entry * e, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + + assert(e); + + list_for_each_safe(p, h, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr == addr) { + list_del(&v->next); + val_destroy(v); + --e->n_vals; + } + } + + if (e->n_vals == 0) { + list_del(&e->next); + dht_entry_destroy(e); + } +} + +static uint64_t dht_entry_get_addr(struct dht * dht, + struct dht_entry * e) +{ + struct list_head * p; + + assert(e); + assert(!list_is_empty(&e->vals)); + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + if (v->addr != dht->addr) + return v->addr; + } + + return 0; +} + +/* Forward declaration. */ +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * key, + enum kad_code code); + + +/* Build a refresh list. */ +static void bucket_refresh(struct dht * dht, + struct bucket * b, + time_t t, + struct list_head * r) +{ + size_t i; + + if (*b->children != NULL) + for (i = 0; i < (1L << KAD_BETA); ++i) + bucket_refresh(dht, b->children[i], t, r); + + if (b->n_contacts == 0) + return; + + if (t > b->t_refr) { + struct contact * c; + struct contact * d; + c = list_first_entry(&b->contacts, struct contact, next); + d = contact_create(c->id, dht->b, c->addr); + if (c != NULL) + list_add(&d->next, r); + return; + } +} + + +static struct bucket * bucket_create(void) +{ + struct bucket * b; + struct timespec t; + size_t i; + + b = malloc(sizeof(*b)); + if (b == NULL) + return NULL; + + list_head_init(&b->contacts); + b->n_contacts = 0; + + list_head_init(&b->alts); + b->n_alts = 0; + + clock_gettime(CLOCK_REALTIME_COARSE, &t); + b->t_refr = t.tv_sec + KAD_T_REFR; + + for (i = 0; i < (1L << KAD_BETA); ++i) + b->children[i] = NULL; + + b->parent = NULL; + b->depth = 0; + + return b; +} + +static void bucket_destroy(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + size_t i; + + assert(b); + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i] != NULL) + bucket_destroy(b->children[i]); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + list_for_each_safe(p, h, &b->alts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + + free(b); +} + +static bool bucket_has_id(struct bucket * b, + const uint8_t * id) +{ + uint8_t mask; + uint8_t byte; + + if (b->depth == 0) + return true; + + byte = id[(b->depth * KAD_BETA) / CHAR_BIT]; + + mask = ((1L << KAD_BETA) - 1) & 0xFF; + + byte >>= (CHAR_BIT - KAD_BETA) - + (((b->depth - 1) * KAD_BETA) & (CHAR_BIT - 1)); + + return ((byte & mask) == b->mask); +} + +static int split_bucket(struct bucket * b) +{ + struct list_head * p; + struct list_head * h; + uint8_t mask = 0; + size_t i; + size_t c; + + assert(b); + assert(b->n_alts == 0); + assert(b->n_contacts); + assert(b->children[0] == NULL); + + c = b->n_contacts; + + for (i = 0; i < (1L << KAD_BETA); ++i) { + b->children[i] = bucket_create(); + if (b->children[i] == NULL) { + size_t j; + for (j = 0; j < i; ++j) + bucket_destroy(b->children[j]); + return -1; + } + + b->children[i]->depth = b->depth + 1; + b->children[i]->mask = mask; + b->children[i]->parent = b; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c; + c = list_entry(p, struct contact, next); + if (bucket_has_id(b->children[i], c->id)) { + list_del(&c->next); + --b->n_contacts; + list_add(&c->next, &b->children[i]->contacts); + ++b->children[i]->n_contacts; + } + } + + mask++; + } + + for (i = 0; i < (1L << KAD_BETA); ++i) + if (b->children[i]->n_contacts == c) + split_bucket(b->children[i]); + + return 0; +} + +/* Locked externally to mandate update as (final) part of join transaction. */ +static int dht_update_bucket(struct dht * dht, + const uint8_t * id, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + struct contact * c; + + assert(dht); + + b = dht_get_bucket(dht, id); + if (b == NULL) + return -1; + + c = contact_create(id, dht->b, addr); + if (c == NULL) + return -1; + + list_for_each_safe(p, h, &b->contacts) { + struct contact * d = list_entry(p, struct contact, next); + if (d->addr == addr) { + list_del(&d->next); + contact_destroy(d); + --b->n_contacts; + } + } + + if (b->n_contacts == dht->k) { + if (bucket_has_id(b, dht->id)) { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + if (split_bucket(b)) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + } + } else if (b->n_alts == dht->k) { + struct contact * d; + d = list_first_entry(&b->alts, struct contact, next); + list_del(&d->next); + contact_destroy(d); + list_add_tail(&c->next, &b->alts); + } else { + list_add_tail(&c->next, &b->alts); + ++b->n_alts; + } + } else { + list_add_tail(&c->next, &b->contacts); + ++b->n_contacts; + } + + return 0; +} + +static int send_msg(struct dht * dht, + kad_msg_t * msg, + uint64_t addr) +{ + struct shm_du_buff * sdb; + struct kad_req * req; + size_t len; + int retr = 0; + + if (msg->code == KAD_RESPONSE) + retr = KAD_RESP_RETR; + + pthread_rwlock_wrlock(&dht->lock); + + if (dht->id != NULL) { + msg->has_s_id = true; + msg->s_id.data = dht->id; + msg->s_id.len = dht->b; + } + + msg->s_addr = dht->addr; + + if (msg->code < KAD_STORE) { + msg->cookie = bmp_allocate(dht->cookies); + if (!bmp_is_id_valid(dht->cookies, msg->cookie)) + goto fail_bmp_alloc; + } + + len = kad_msg__get_packed_size(msg); + if (len == 0) + goto fail_msg; + + if (ipcp_sdb_reserve(&sdb, len)) + goto fail_msg; + + kad_msg__pack(msg, shm_du_buff_head(sdb)); + +#ifndef __DHT_TEST__ + while (retr >= 0) { + if (dt_write_sdu(addr, QOS_CUBE_BE, dht->fd, sdb)) + retr--; + else + break; + sleep(1); + } + + if (retr < 0) + goto fail_write; +#else + (void) addr; + (void) retr; + ipcp_sdb_release(sdb); +#endif /* __DHT_TEST__ */ + + if (msg->code < KAD_STORE && dht->state != DHT_SHUTDOWN) { + req = kad_req_create(dht, msg, addr); + if (req != NULL) + list_add(&req->next, &dht->requests); + } + + pthread_rwlock_unlock(&dht->lock); + + return 0; + +#ifndef __DHT_TEST__ + fail_write: + ipcp_sdb_release(sdb); +#endif + fail_msg: + bmp_release(dht->cookies, msg->cookie); + fail_bmp_alloc: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +static struct dht_entry * dht_find_entry(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + + list_for_each(p, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + if (!memcmp(key, e->key, dht->b)) + return e; + } + + return NULL; +} + +static int kad_add(struct dht * dht, + const kad_contact_msg_t * contacts, + ssize_t n, + time_t exp) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + while (n-- > 0) { + if (contacts[n].id.len != dht->b) + log_warn("Bad key length in contact data."); + + e = dht_find_entry(dht, contacts[n].id.data); + if (e != NULL) { + if (dht_entry_add_addr(e, contacts[n].addr, exp)) + goto fail; + } else { + e = dht_entry_create(dht, contacts[n].id.data); + if (e == NULL) + goto fail; + + if (dht_entry_add_addr(e, contacts[n].addr, exp)) { + dht_entry_destroy(e); + goto fail; + } + + list_add(&e->next, &dht->entries); + } + } + + pthread_rwlock_unlock(&dht->lock); + return 0; + + fail: + pthread_rwlock_unlock(&dht->lock); + return -ENOMEM; +} + +static int wait_resp(struct dht * dht, + kad_msg_t * msg, + time_t timeo) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + pthread_rwlock_unlock(&dht->lock); + + return kad_req_wait(req, timeo); +} + +static int kad_store(struct dht * dht, + const uint8_t * key, + uint64_t addr, + uint64_t r_addr, + time_t ttl) +{ + kad_msg_t msg = KAD_MSG__INIT; + kad_contact_msg_t cmsg = KAD_CONTACT_MSG__INIT; + kad_contact_msg_t * cmsgp[1]; + + cmsg.id.data = (uint8_t *) key; + cmsg.id.len = dht->b; + cmsg.addr = addr; + + cmsgp[0] = &cmsg; + + msg.code = KAD_STORE; + msg.has_t_expire = true; + msg.t_expire = ttl; + msg.n_contacts = 1; + msg.contacts = cmsgp; + + if (send_msg(dht, &msg, r_addr)) + return -1; + + return 0; +} + +static ssize_t kad_find(struct dht * dht, + const uint8_t * key, + const uint64_t * addrs, + enum kad_code code) +{ + kad_msg_t msg = KAD_MSG__INIT; + ssize_t sent = 0; + + assert(dht); + assert(key); + + msg.code = code; + + msg.has_key = true; + msg.key.data = (uint8_t *) key; + msg.key.len = dht->b; + + while (*addrs != 0) { + if (*addrs != dht->addr) { + send_msg(dht, &msg, *addrs); + sent++; + } + ++addrs; + } + + return sent; +} + +static void lookup_set_state(struct lookup * lu, + enum lookup_state state) +{ + pthread_mutex_lock(&lu->lock); + + lu->state = state; + pthread_cond_signal(&lu->cond); + + pthread_mutex_unlock(&lu->lock); +} + +static struct lookup * kad_lookup(struct dht * dht, + const uint8_t * id, + enum kad_code code) +{ + uint64_t addrs[KAD_ALPHA + 1]; + enum lookup_state state; + struct lookup * lu; + + lu = lookup_create(dht, id); + if (lu == NULL) + return NULL; + + lookup_new_addrs(lu, addrs); + + if (addrs[0] == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lookup_destroy(lu); + return NULL; + } + + if (kad_find(dht, id, addrs, code) == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lu->state = LU_COMPLETE; + return lu; + } + + while ((state = lookup_wait(lu)) != LU_COMPLETE) { + switch (state) { + case LU_UPDATE: + lookup_new_addrs(lu, addrs); + if (addrs[0] == 0) { + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + return lu; + } + + kad_find(dht, id, addrs, code); + break; + case LU_DESTROY: + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + lookup_set_state(lu, LU_NULL); + return NULL; + default: + break; + }; + } + + assert(state = LU_COMPLETE); + + pthread_rwlock_wrlock(&dht->lock); + list_del(&lu->next); + pthread_rwlock_unlock(&dht->lock); + + return lu; +} + +static void kad_publish(struct dht * dht, + const uint8_t * key, + uint64_t addr, + time_t exp) +{ + struct lookup * lu; + uint64_t addrs[KAD_K]; + ssize_t n; + + assert(dht); + assert(key); + + lu = kad_lookup(dht, key, KAD_FIND_NODE); + if (lu == NULL) + return; + + n = lookup_contact_addrs(lu, addrs); + + while (n-- > 0) { + if (addrs[n] == dht->addr) { + kad_contact_msg_t msg = KAD_CONTACT_MSG__INIT; + msg.id.data = (uint8_t *) key; + msg.id.len = dht->b; + msg.addr = addr; + kad_add(dht, &msg, 1, exp); + } else { + if (kad_store(dht, key, addr, addrs[n], dht->t_expire)) + log_warn("Failed to send store message."); + } + } + + lookup_destroy(lu); +} + +static int kad_join(struct dht * dht, + uint64_t addr) +{ + kad_msg_t msg = KAD_MSG__INIT; + struct lookup * lu; + + msg.code = KAD_JOIN; + + msg.has_alpha = true; + msg.has_b = true; + msg.has_k = true; + msg.has_t_refresh = true; + msg.has_t_replicate = true; + msg.alpha = KAD_ALPHA; + msg.b = dht->b; + msg.k = KAD_K; + msg.t_refresh = KAD_T_REFR; + msg.t_replicate = KAD_T_REPL; + + if (send_msg(dht, &msg, addr)) + return -1; + + if (wait_resp(dht, &msg, KAD_T_JOIN) < 0) + return -1; + + dht->id = create_id(dht->b); + if (dht->id == NULL) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + + return 0; +} + +static void dht_dead_peer(struct dht * dht, + uint8_t * key, + uint64_t addr) +{ + struct list_head * p; + struct list_head * h; + struct bucket * b; + + b = dht_get_bucket(dht, key); + + list_for_each_safe(p, h, &b->contacts) { + struct contact * c = list_entry(p, struct contact, next); + if (b->n_contacts + b->n_alts <= dht->k) { + ++c->fails; + return; + } + + if (c->addr == addr) { + list_del(&c->next); + contact_destroy(c); + --b->n_contacts; + break; + } + } + + while (b->n_contacts < dht->k && b->n_alts > 0) { + struct contact * c; + c = list_first_entry(&b->alts, struct contact, next); + list_del(&c->next); + --b->n_alts; + list_add(&c->next, &b->contacts); + ++b->n_contacts; + } +} + +static int dht_del(struct dht * dht, + const uint8_t * key, + uint64_t addr) +{ + struct dht_entry * e; + + pthread_rwlock_wrlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -EPERM; + } + + dht_entry_del_addr(e, addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static buffer_t dht_retrieve(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct list_head * p; + buffer_t buf; + uint64_t * pos; + + buf.len = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e == NULL) { + pthread_rwlock_unlock(&dht->lock); + return buf; + } + + buf.data = malloc(sizeof(dht->addr) * e->n_vals); + if (buf.data == NULL) { + pthread_rwlock_unlock(&dht->lock); + return buf; + } + + buf.len = e->n_vals; + + pos = (uint64_t *) buf.data;; + + list_for_each(p, &e->vals) { + struct val * v = list_entry(p, struct val, next); + *pos++ = v->addr; + } + + pthread_rwlock_unlock(&dht->lock); + + return buf; +} + +static ssize_t dht_get_contacts(struct dht * dht, + const uint8_t * key, + kad_contact_msg_t *** msgs) +{ + struct list_head l; + struct list_head * p; + struct list_head * h; + size_t len; + size_t i = 0; + + list_head_init(&l); + + pthread_rwlock_rdlock(&dht->lock); + + len = dht_contact_list(dht, &l, key); + if (len == 0) + return 0; + + *msgs = malloc(len * sizeof(**msgs)); + if (*msgs == NULL) + return 0; + + list_for_each_safe(p, h, &l) { + struct contact * c = list_entry(p, struct contact, next); + (*msgs)[i] = malloc(sizeof(***msgs)); + if ((*msgs)[i] == NULL) { + pthread_rwlock_unlock(&dht->lock); + while (i > 0) + free(*msgs[--i]); + free(*msgs); + return 0; + } + + kad_contact_msg__init((*msgs)[i]); + + (*msgs)[i]->id.data = c->id; + (*msgs)[i]->id.len = dht->b; + (*msgs)[i++]->addr = c->addr; + list_del(&c->next); + free(c); + } + + pthread_rwlock_unlock(&dht->lock); + + return i; +} + +static time_t gcd(time_t a, + time_t b) +{ + if (a == 0) + return b; + + return gcd(b % a, a); +} + +static void * work(void * o) +{ + struct dht * dht; + struct timespec now; + struct list_head * p; + struct list_head * h; + struct list_head reflist; + time_t intv; + struct lookup * lu; + + dht = (struct dht *) o; + + intv = gcd(dht->t_expire, dht->t_repub); + intv = gcd(intv, gcd(KAD_T_REPL, KAD_T_REFR)) / 2; + + list_head_init(&reflist); + + while (true) { + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&dht->lock); + + /* Republish registered hashes. */ + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e; + e = list_entry(p, struct ref_entry, next); + if (now.tv_sec > e->t_rep) { + kad_publish(dht, e->key, dht->addr, + dht->t_expire); + e->t_rep = now.tv_sec + dht->t_repub; + } + } + + /* Remove stale entries and republish if necessary. */ + list_for_each_safe(p, h, &dht->entries) { + struct list_head * p1; + struct list_head * h1; + struct dht_entry * e; + e = list_entry (p, struct dht_entry, next); + list_for_each_safe(p1, h1, &e->vals) { + struct val * v; + v = list_entry(p1, struct val, next); + if (now.tv_sec > v->t_exp) { + list_del(&v->next); + val_destroy(v); + } + + if (now.tv_sec > v->t_rep) { + kad_publish(dht, e->key, v->addr, + dht->t_expire - now.tv_sec); + v->t_rep = now.tv_sec + dht->t_replic; + } + } + } + + /* Check the requests list for unresponsive nodes. */ + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r; + r = list_entry(p, struct kad_req, next); + if (now.tv_sec > r->t_exp) { + list_del(&r->next); + bmp_release(dht->cookies, r->cookie); + dht_dead_peer(dht, r->key, r->addr); + kad_req_destroy(r); + } + } + + /* Refresh unaccessed buckets. */ + bucket_refresh(dht, dht->buckets, now.tv_sec, &reflist); + + pthread_rwlock_unlock(&dht->lock); + + list_for_each_safe(p, h, &reflist) { + struct contact * c; + c = list_entry(p, struct contact, next); + lu = kad_lookup(dht, c->id, KAD_FIND_NODE); + if (lu != NULL) + lookup_destroy(lu); + list_del(&c->next); + contact_destroy(c); + } + + sleep(intv); + } + + return (void *) 0; +} + +static int kad_handle_join_resp(struct dht * dht, + struct kad_req * req, + kad_msg_t * msg) +{ + assert(dht); + assert(req); + assert(msg); + + /* We might send version numbers later to warn of updates if needed. */ + if (!(msg->has_alpha && msg->has_b && msg->has_k && msg->has_t_expire && + msg->has_t_refresh && msg->has_t_replicate)) { + log_warn("Join refused by remote."); + return -1; + } + + if (msg->b < sizeof(uint64_t)) { + log_err("Hash sizes less than 8 bytes unsupported."); + return -1; + } + + pthread_rwlock_wrlock(&dht->lock); + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + /* Likely corrupt packet. The member will refuse, we might here too. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) + log_warn("Different kademlia parameters detected."); + + if (msg->t_replicate != KAD_T_REPL) + log_warn("Different kademlia replication time detected."); + + if (msg->t_refresh != KAD_T_REFR) + log_warn("Different kademlia refresh time detected."); + + dht->k = msg->k; + dht->b = msg->b; + dht->t_expire = msg->t_expire; + dht->t_repub = MAX(1, dht->t_expire - 10); + + if (pthread_create(&dht->worker, NULL, work, dht)) { + bucket_destroy(dht->buckets); + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + dht->state = DHT_RUNNING; + + kad_req_respond(req); + + dht_update_bucket(dht, msg->s_id.data, msg->s_addr); + + pthread_rwlock_unlock(&dht->lock); + + log_dbg("Enrollment of DHT completed."); + + return 0; +} + +static int kad_handle_find_resp(struct dht * dht, + struct kad_req * req, + kad_msg_t * msg) +{ + struct lookup * lu; + + assert(dht); + assert(req); + assert(msg); + + pthread_rwlock_rdlock(&dht->lock); + + lu = dht_find_lookup(dht, req->key); + if (lu == NULL) { + pthread_rwlock_unlock(&dht->lock); + return -1; + } + + lookup_update(dht, lu, msg); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +static void kad_handle_response(struct dht * dht, + kad_msg_t * msg) +{ + struct kad_req * req; + + assert(dht); + assert(msg); + + pthread_rwlock_wrlock(&dht->lock); + + req = dht_find_request(dht, msg); + if (req == NULL) { + pthread_rwlock_unlock(&dht->lock); + return; + } + + bmp_release(dht->cookies, req->cookie); + list_del(&req->next); + + pthread_rwlock_unlock(&dht->lock); + + switch(req->code) { + case KAD_JOIN: + if (kad_handle_join_resp(dht, req, msg)) + log_err("Enrollment of DHT failed."); + break; + case KAD_FIND_VALUE: + case KAD_FIND_NODE: + if (dht_get_state(dht) != DHT_RUNNING) + return; + kad_handle_find_resp(dht, req, msg); + break; + default: + break; + } + + kad_req_destroy(req); +} + +int dht_bootstrap(struct dht * dht, + size_t b, + time_t t_expire) +{ + assert(dht); + + pthread_rwlock_wrlock(&dht->lock); + + dht->id = create_id(b); + if (dht->id == NULL) + goto fail_id; + + dht->buckets = bucket_create(); + if (dht->buckets == NULL) + goto fail_buckets; + + dht->buckets->depth = 0; + dht->buckets->mask = 0; + + dht->b = b / CHAR_BIT; + dht->t_expire = MAX(2, t_expire); + dht->t_repub = MAX(1, t_expire - 10); + dht->k = KAD_K; + + if (pthread_create(&dht->worker, NULL, work, dht)) + goto fail_pthread_create; + + dht->state = DHT_RUNNING; + + dht_update_bucket(dht, dht->id, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; + + fail_pthread_create: + bucket_destroy(dht->buckets); + dht->buckets = NULL; + fail_buckets: + free(dht->id); + dht->id = NULL; + fail_id: + pthread_rwlock_unlock(&dht->lock); + return -1; +} + +int dht_enroll(struct dht * dht, + uint64_t addr) +{ + assert(dht); + + return kad_join(dht, addr); +} + +int dht_reg(struct dht * dht, + const uint8_t * key) +{ + struct ref_entry * e; + + assert(dht); + assert(key); + assert(dht->addr != 0); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + e = ref_entry_create(dht, key); + if (e == NULL) + return -ENOMEM; + + pthread_rwlock_wrlock(&dht->lock); + + list_add(&e->next, &dht->refs); + + pthread_rwlock_unlock(&dht->lock); + + kad_publish(dht, key, dht->addr, dht->t_expire); + + return 0; +} + +int dht_unreg(struct dht * dht, + const uint8_t * key) +{ + struct list_head * p; + struct list_head * h; + + assert(dht); + assert(key); + + if (dht_get_state(dht) != DHT_RUNNING) + return -1; + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * r = list_entry(p, struct ref_entry, next); + if (!memcmp(key, r->key, dht-> b) ) { + list_del(&r->next); + ref_entry_destroy(r); + } + } + + dht_del(dht, key, dht->addr); + + pthread_rwlock_unlock(&dht->lock); + + return 0; +} + +uint64_t dht_query(struct dht * dht, + const uint8_t * key) +{ + struct dht_entry * e; + struct lookup * lu; + uint64_t addrs[KAD_K]; + size_t n; + + addrs[0] = 0; + + pthread_rwlock_rdlock(&dht->lock); + + e = dht_find_entry(dht, key); + if (e != NULL) + addrs[0] = dht_entry_get_addr(dht, e); + + pthread_rwlock_unlock(&dht->lock); + + if (addrs[0] != 0 && addrs[0] != dht->addr) + return addrs[0]; + + lu = kad_lookup(dht, key, KAD_FIND_VALUE); + if (lu == NULL) + return 0; + + n = lookup_get_addrs(lu, addrs); + if (n == 0) { + lookup_destroy(lu); + return 0; + } + + lookup_destroy(lu); + + /* Current behaviour is anycast and return the first peer address. */ + if (addrs[0] != dht->addr) + return addrs[0]; + + if (n > 1) + return addrs[1]; + + return 0; +} + +void dht_post_sdu(void * ae, + struct shm_du_buff * sdb) +{ + struct dht * dht; + kad_msg_t * msg; + kad_contact_msg_t ** cmsgs; + kad_msg_t resp_msg = KAD_MSG__INIT; + uint64_t addr; + buffer_t buf; + size_t i; + + assert(ae); + assert(sdb); + + memset(&buf, 0, sizeof(buf)); + + dht = (struct dht *) ae; + + msg = kad_msg__unpack(NULL, + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + shm_du_buff_head(sdb)); + + ipcp_sdb_release(sdb); + + if (msg == NULL) { + log_err("Failed to unpack message."); + return; + } + + if (msg->has_key && msg->key.len != dht->b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad key in message."); + return; + } + + if (msg->has_s_id && !msg->has_b && msg->s_id.len != dht->b) { + kad_msg__free_unpacked(msg, NULL); + log_warn("Bad source ID in message of type %d.", msg->code); + return; + } + + if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { + kad_msg__free_unpacked(msg, NULL); + return; + } + + addr = msg->s_addr; + + resp_msg.code = KAD_RESPONSE; + resp_msg.cookie = msg->cookie; + + switch(msg->code) { + case KAD_JOIN: + /* Refuse enrollee on check fails. */ + if (msg->alpha != KAD_ALPHA || msg->k != KAD_K) { + log_warn("Parameter mismatch. " + "DHT enrolment refused."); + break; + } + + if (msg->t_replicate != KAD_T_REPL) { + log_warn("Replication time mismatch. " + "DHT enrolment refused."); + + break; + } + + if (msg->t_refresh != KAD_T_REFR) { + log_warn("Refresh time mismatch. " + "DHT enrolment refused."); + break; + } + + resp_msg.has_alpha = true; + resp_msg.has_b = true; + resp_msg.has_k = true; + resp_msg.has_t_expire = true; + resp_msg.has_t_refresh = true; + resp_msg.has_t_replicate = true; + resp_msg.alpha = KAD_ALPHA; + resp_msg.b = dht->b; + resp_msg.k = KAD_K; + resp_msg.t_expire = dht->t_expire; + resp_msg.t_refresh = KAD_T_REFR; + resp_msg.t_replicate = KAD_T_REPL; + break; + case KAD_FIND_VALUE: + buf = dht_retrieve(dht, msg->key.data); + if (buf.len != 0) { + resp_msg.n_addrs = buf.len; + resp_msg.addrs = (uint64_t *) buf.data; + break; + } + /* FALLTHRU */ + case KAD_FIND_NODE: + /* Return k closest contacts. */ + resp_msg.n_contacts = + dht_get_contacts(dht, msg->key.data, &cmsgs); + resp_msg.contacts = cmsgs; + break; + case KAD_STORE: + if (msg->n_contacts < 1) { + log_warn("No contacts in store message."); + break; + } + + if (!msg->has_t_expire) { + log_warn("No expiry time in store message."); + break; + } + + kad_add(dht, *msg->contacts, msg->n_contacts, msg->t_expire); + break; + case KAD_RESPONSE: + kad_handle_response(dht, msg); + break; + default: + assert(false); + break; + } + + if (msg->code != KAD_JOIN) { + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, msg->s_id.data, addr)) + log_warn("Failed to update bucket."); + pthread_rwlock_unlock(&dht->lock); + } + + if (msg->code < KAD_STORE) { + if (send_msg(dht, &resp_msg, addr)) + log_warn("Failed to send response."); + } + + kad_msg__free_unpacked(msg, NULL); + + if (resp_msg.n_addrs > 0) + free(resp_msg.addrs); + + if (resp_msg.n_contacts == 0) + return; + + for (i = 0; i < resp_msg.n_contacts; ++i) + kad_contact_msg__free_unpacked(resp_msg.contacts[i], NULL); + free(resp_msg.contacts); +} + +void dht_destroy(struct dht * dht) +{ + struct list_head * p; + struct list_head * h; + + if (dht == NULL) + return; + + if (dht_get_state(dht) == DHT_RUNNING) + dht_set_state(dht, DHT_SHUTDOWN); + + pthread_rwlock_wrlock(&dht->lock); + + list_for_each_safe(p, h, &dht->entries) { + struct dht_entry * e = list_entry(p, struct dht_entry, next); + list_del(&e->next); + dht_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->requests) { + struct kad_req * r = list_entry(p, struct kad_req, next); + list_del(&r->next); + free(r); + } + + list_for_each_safe(p, h, &dht->refs) { + struct ref_entry * e = list_entry(p, struct ref_entry, next); + list_del(&e->next); + ref_entry_destroy(e); + } + + list_for_each_safe(p, h, &dht->lookups) { + struct lookup * l = list_entry(p, struct lookup, next); + list_del(&l->next); + lookup_destroy(l); + } + + pthread_rwlock_unlock(&dht->lock); + + if (dht_get_state(dht) == DHT_SHUTDOWN) { + pthread_cancel(dht->worker); + pthread_join(dht->worker, NULL); + } + + if (dht->buckets != NULL) + bucket_destroy(dht->buckets); + + bmp_destroy(dht->cookies); + + pthread_mutex_destroy(&dht->mtx); + + pthread_rwlock_destroy(&dht->lock); + + free(dht->id); + + free(dht); +} + +struct dht * dht_create(uint64_t addr) +{ + struct dht * dht; + + dht = malloc(sizeof(*dht)); + if (dht == NULL) + goto fail_malloc; + + dht->buckets = NULL; + + list_head_init(&dht->entries); + list_head_init(&dht->requests); + list_head_init(&dht->refs); + list_head_init(&dht->lookups); + + if (pthread_rwlock_init(&dht->lock, NULL)) + goto fail_rwlock; + + if (pthread_mutex_init(&dht->mtx, NULL)) + goto fail_mutex; + + dht->cookies = bmp_create(DHT_MAX_REQS, 1); + if (dht->cookies == NULL) + goto fail_bmp; + + dht->b = 0; + dht->addr = addr; + dht->id = NULL; +#ifndef __DHT_TEST__ + dht->fd = dt_reg_ae(dht, &dht_post_sdu); +#endif /* __DHT_TEST__ */ + + dht->state = DHT_INIT; + + return dht; + + fail_bmp: + pthread_mutex_destroy(&dht->mtx); + fail_mutex: + pthread_rwlock_destroy(&dht->lock); + fail_rwlock: + free(dht); + fail_malloc: + return NULL; +} diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h new file mode 100644 index 00000000..5d7fc894 --- /dev/null +++ b/src/ipcpd/normal/dht.h @@ -0,0 +1,54 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Distributed Hash Table based on Kademlia + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_DHT_H +#define OUROBOROS_IPCPD_NORMAL_DHT_H + +#include <ouroboros/ipcp-dev.h> + +#include <stdint.h> +#include <sys/types.h> + +struct dht; + +struct dht * dht_create(uint64_t addr); + +int dht_bootstrap(struct dht * dht, + size_t b, + time_t t_expire); + +int dht_enroll(struct dht * dht, + uint64_t addr); + +void dht_destroy(struct dht * dht); + +int dht_reg(struct dht * dht, + const uint8_t * key); + +int dht_unreg(struct dht * dht, + const uint8_t * key); + +uint64_t dht_query(struct dht * dht, + const uint8_t * key); + +#endif /* OUROBOROS_IPCPD_NORMAL_DHT_H */ diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 5ea8a300..69b7e90e 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -20,129 +20,134 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ +#define OUROBOROS_PREFIX "directory" + #include <ouroboros/config.h> +#include <ouroboros/endian.h> #include <ouroboros/errno.h> +#include <ouroboros/logs.h> #include <ouroboros/rib.h> +#include <ouroboros/utils.h> #include "dir.h" +#include "dht.h" #include "ipcp.h" #include "ribconfig.h" #include <stdlib.h> #include <string.h> #include <assert.h> +#include <inttypes.h> -static char dir_path[RIB_MAX_PATH_LEN + 1]; +#define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) +#define ENROL_RETR 6 +#define ENROL_INTV 1 -static void dir_path_reset(void) { - dir_path[strlen(DIR_PATH)]= '\0'; - assert(strcmp(DIR_PATH, dir_path) == 0); -} +struct dht * dht; -int dir_init(void) +static uint64_t find_peer_addr(void) { - /* FIXME: set ribmgr dissemination here */ - if (rib_add(RIB_ROOT, DIR_NAME)) - return -1; + ssize_t i; + char ** members; + ssize_t n_members; + size_t reset; + char path[RIB_MAX_PATH_LEN + 1]; - strcpy(dir_path, DIR_PATH); + strcpy(path, MEMBERS_PATH); - return 0; -} + reset = strlen(path); -int dir_fini(void) -{ - /* FIXME: remove ribmgr dissemination here*/ + n_members = rib_children(path, &members); + if (n_members == 1) { + freepp(ssize_t, members, n_members); + return 0; + } + + for (i = 0; i < n_members; ++i) { + uint64_t addr; + rib_path_append(path, members[i]); + if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) { + log_err("Failed to read address from RIB."); + freepp(ssize_t, members, n_members); + return ipcpi.dt_addr; + } + + if (addr != ipcpi.dt_addr) { + freepp(ssize_t, members, n_members); + return addr; + } + + path[reset] = '\0'; + } - dir_path_reset(); - rib_del(dir_path); + freepp(ssize_t, members, n_members); return 0; } -int dir_reg(const uint8_t * hash) +int dir_init() { - char hashstr[ipcp_dir_hash_strlen() + 1]; - int ret; - - assert(hash); + uint64_t addr; - dir_path_reset(); - - ipcp_hash_str(hashstr, hash); - - ret = rib_add(dir_path, hashstr); - if (ret == -ENOMEM) - return -ENOMEM; - - rib_path_append(dir_path, hashstr); + dht = dht_create(ipcpi.dt_addr); + if (dht == NULL) + return -ENOMEM; - ret = rib_add(dir_path, ipcpi.name); - if (ret == -EPERM) + addr = find_peer_addr(); + if (addr == ipcpi.dt_addr) { + log_err("Failed to get peer address."); + dht_destroy(dht); return -EPERM; - if (ret == -ENOMEM) { - if (rib_children(dir_path, NULL) == 0) - rib_del(dir_path); - return -ENOMEM; } - return 0; -} - -int dir_unreg(const uint8_t * hash) -{ - char hashstr[ipcp_dir_hash_strlen() + 1]; - size_t len; - - assert(hash); + if (addr != 0) { + size_t retr = 0; + log_dbg("Enrolling directory with peer %" PRIu64 ".", addr); + /* NOTE: we could try other members if dht_enroll times out. */ + while (dht_enroll(dht, addr)) { + if (retr++ == ENROL_RETR) { + dht_destroy(dht); + return -EPERM; + } - dir_path_reset(); + log_dbg("Directory enrollment failed, retrying..."); + sleep(ENROL_INTV); + } - ipcp_hash_str(hashstr, hash); + log_dbg("Directory enrolled."); - rib_path_append(dir_path, hashstr); - - if (!rib_has(dir_path)) return 0; + } - len = strlen(dir_path); - - rib_path_append(dir_path, ipcpi.name); - - rib_del(dir_path); + log_dbg("Bootstrapping directory."); - dir_path[len] = '\0'; + /* TODO: get parameters for bootstrap from IRM tool. */ + if (dht_bootstrap(dht, KAD_B, 86400)) { + dht_destroy(dht); + return -ENOMEM; + } - if (rib_children(dir_path, NULL) == 0) - rib_del(dir_path); + log_dbg("Directory bootstrapped."); return 0; } -int dir_query(const uint8_t * hash) +void dir_fini(void) { - char hashstr[ipcp_dir_hash_strlen() + 1]; - size_t len; - - dir_path_reset(); - - ipcp_hash_str(hashstr, hash); - - rib_path_append(dir_path, hashstr); - - if (!rib_has(dir_path)) - return -1; - - /* FIXME: assert after local IPCP is deprecated */ - len = strlen(dir_path); + dht_destroy(dht); +} - rib_path_append(dir_path, ipcpi.name); +int dir_reg(const uint8_t * hash) +{ + return dht_reg(dht, hash); +} - if (rib_has(dir_path)) { - dir_path[len] = '\0'; - if (rib_children(dir_path, NULL) == 1) - return -1; - } +int dir_unreg(const uint8_t * hash) +{ + return dht_unreg(dht, hash); +} - return 0; +uint64_t dir_query(const uint8_t * hash) +{ + return dht_query(dht, hash); } diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h index 1b28a5c0..4091a3e8 100644 --- a/src/ipcpd/normal/dir.h +++ b/src/ipcpd/normal/dir.h @@ -23,14 +23,14 @@ #ifndef OUROBOROS_IPCPD_NORMAL_DIR_H #define OUROBOROS_IPCPD_NORMAL_DIR_H -int dir_init(void); +int dir_init(void); -int dir_fini(void); +void dir_fini(void); -int dir_reg(const uint8_t * hash); +int dir_reg(const uint8_t * hash); -int dir_unreg(const uint8_t * hash); +int dir_unreg(const uint8_t * hash); -int dir_query(const uint8_t * hash); +uint64_t dir_query(const uint8_t * hash); #endif /* OUROBOROS_IPCPD_NORMAL_DIR_H */ diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 1867c13b..5fcc5865 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -50,7 +50,7 @@ #include <assert.h> struct ae_info { - int (*post_sdu)(void * ae, struct shm_du_buff * sdb); + void (* post_sdu)(void * ae, struct shm_du_buff * sdb); void * ae; }; @@ -131,11 +131,14 @@ static int sdu_handler(int fd, return 0; } - if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) { + if (dt.aes[dt_pci.fd].post_sdu == NULL) { + log_err("No registered AE on fd %d.", dt_pci.fd); ipcp_sdb_release(sdb); - return -1; + return -EPERM; } + dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb); + return 0; } @@ -295,7 +298,7 @@ void dt_stop(void) } int dt_reg_ae(void * ae, - int (* func)(void * func, struct shm_du_buff *)) + void (* func)(void * func, struct shm_du_buff *)) { int res_fd; @@ -330,10 +333,11 @@ int dt_write_sdu(uint64_t dst_addr, struct dt_pci dt_pci; assert(sdb); + assert(dst_addr != ipcpi.dt_addr); fd = pff_nhop(dt.pff[qc], dst_addr); if (fd < 0) { - log_err("Could not get nhop for addr %" PRIu64 ".", dst_addr); + log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); return -1; } diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 0e1a8cc3..15ef51f0 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -38,7 +38,7 @@ int dt_start(void); void dt_stop(void); int dt_reg_ae(void * ae, - int (* func)(void * ae, struct shm_du_buff * sdb)); + void (* func)(void * ae, struct shm_du_buff * sdb)); int dt_write_sdu(uint64_t dst_addr, qoscube_t qc, diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 40a680c3..704f4f16 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -30,6 +30,7 @@ #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include "dir.h" #include "dt_pci.h" #include "fa.h" #include "sdu_sched.h" @@ -79,8 +80,8 @@ static void destroy_conn(int fd) fa.r_addr[fd] = INVALID_ADDR; } -static int fa_post_sdu(void * ae, - struct shm_du_buff * sdb) +static void fa_post_sdu(void * ae, + struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; struct timespec abstime; @@ -98,9 +99,12 @@ static int fa_post_sdu(void * ae, shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), shm_du_buff_head(sdb)); + + ipcp_sdb_release(sdb); + if (msg == NULL) { log_err("Failed to unpack flow alloc message."); - return -1; + return; } switch (msg->code) { @@ -113,7 +117,7 @@ static int fa_post_sdu(void * ae, log_err("Bad flow request."); pthread_mutex_unlock(&ipcpi.alloc_lock); flow_alloc_msg__free_unpacked(msg, NULL); - return -1; + return; } while (ipcpi.alloc_id != -1 && @@ -128,7 +132,7 @@ static int fa_post_sdu(void * ae, log_dbg("Won't allocate over non-operational IPCP."); pthread_mutex_unlock(&ipcpi.alloc_lock); flow_alloc_msg__free_unpacked(msg, NULL); - return -1; + return; } assert(ipcpi.alloc_id == -1); @@ -141,7 +145,7 @@ static int fa_post_sdu(void * ae, pthread_mutex_unlock(&ipcpi.alloc_lock); flow_alloc_msg__free_unpacked(msg, NULL); log_err("Failed to get fd for flow."); - return -1; + return; } pthread_rwlock_wrlock(&fa.flows_lock); @@ -173,13 +177,10 @@ static int fa_post_sdu(void * ae, default: log_err("Got an unknown flow allocation message."); flow_alloc_msg__free_unpacked(msg, NULL); - return -1; + return; } flow_alloc_msg__free_unpacked(msg, NULL); - ipcp_sdb_release(sdb); - - return 0; } int fa_init(void) @@ -240,47 +241,10 @@ int fa_alloc(int fd, qoscube_t qc) { flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - char path[RIB_MAX_PATH_LEN + 1]; uint64_t addr; - ssize_t ch; - ssize_t i; - char ** children; - char hashstr[ipcp_dir_hash_strlen() + 1]; - char * dst_ipcp = NULL; struct shm_du_buff * sdb; - ipcp_hash_str(hashstr, dst); - - assert(strlen(hashstr) + strlen(DIR_PATH) + 1 - < RIB_MAX_PATH_LEN); - - strcpy(path, DIR_PATH); - - rib_path_append(path, hashstr); - - ch = rib_children(path, &children); - if (ch <= 0) - return -1; - - for (i = 0; i < ch; ++i) - if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) - dst_ipcp = children[i]; - else - free(children[i]); - - free(children); - - if (dst_ipcp == NULL) - return -1; - - strcpy(path, MEMBERS_PATH); - - rib_path_append(path, dst_ipcp); - - free(dst_ipcp); - - if (rib_read(path, &addr, sizeof(addr)) != sizeof(addr)) - return -1; + addr = dir_query(dst); msg.code = FLOW_ALLOC_CODE__FLOW_REQ; msg.has_hash = true; diff --git a/src/ipcpd/normal/kademlia.proto b/src/ipcpd/normal/kademlia.proto new file mode 100644 index 00000000..0b7e8beb --- /dev/null +++ b/src/ipcpd/normal/kademlia.proto @@ -0,0 +1,46 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * KAD protocol + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +syntax = "proto2"; + +message kad_contact_msg { + required bytes id = 1; + required uint64 addr = 2; +}; + +message kad_msg { + required uint32 code = 1; + required uint32 cookie = 2; + required uint64 s_addr = 3; + optional bytes s_id = 4; + optional bytes key = 5; + repeated uint64 addrs = 6; + repeated kad_contact_msg contacts = 7; + // enrolment parameters + optional uint32 alpha = 8; + optional uint32 b = 9; + optional uint32 k = 10; + optional uint32 t_expire = 11; + optional uint32 t_refresh = 12; + optional uint32 t_replicate = 13; +};
\ No newline at end of file diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 8c28de78..f94c15de 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -116,11 +116,6 @@ static int boot_components(void) log_dbg("Starting ribmgr."); - if (dir_init()) { - log_err("Failed to initialize directory."); - goto fail_dir; - } - if (ribmgr_init()) { log_err("Failed to initialize RIB manager."); goto fail_ribmgr; @@ -148,6 +143,11 @@ static int boot_components(void) goto fail_fa_start; } + if (dir_init()) { + log_err("Failed to initialize directory."); + goto fail_dir; + } + if (enroll_start()) { log_err("Failed to start enroll."); goto fail_enroll_start; @@ -166,6 +166,8 @@ static int boot_components(void) ipcp_set_state(IPCP_INIT); enroll_stop(); fail_enroll_start: + dir_fini(); + fail_dir: fa_stop(); fail_fa_start: dt_stop(); @@ -176,8 +178,6 @@ static int boot_components(void) fail_dt: ribmgr_fini(); fail_ribmgr: - dir_fini(); - fail_dir: addr_auth_fini(); fail_addr_auth: free(ipcpi.dif_name); @@ -191,6 +191,8 @@ void shutdown_components(void) enroll_stop(); + dir_fini(); + fa_stop(); dt_stop(); @@ -201,8 +203,6 @@ void shutdown_components(void) ribmgr_fini(); - dir_fini(); - addr_auth_fini(); free(ipcpi.dif_name); @@ -227,10 +227,9 @@ static int normal_ipcp_enroll(const char * dst, return -1; } - log_dbg("Enrolled with " HASH_FMT, HASH_VAL(dst)); + log_dbg("Enrolled with %s.", dst); info->dir_hash_algo = ipcpi.dir_hash_algo; - strcpy(info->dif_name, ipcpi.dif_name); return 0; @@ -347,12 +346,17 @@ static int normal_ipcp_bootstrap(const struct ipcp_config * conf) return 0; } +static int normal_ipcp_query(const uint8_t * dst) +{ + return dir_query(dst) ? 0 : -1; +} + static struct ipcp_ops normal_ops = { .ipcp_bootstrap = normal_ipcp_bootstrap, .ipcp_enroll = normal_ipcp_enroll, .ipcp_reg = dir_reg, .ipcp_unreg = dir_unreg, - .ipcp_query = dir_query, + .ipcp_query = normal_ipcp_query, .ipcp_flow_alloc = fa_alloc, .ipcp_flow_alloc_resp = fa_alloc_resp, .ipcp_flow_dealloc = fa_dealloc diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c index e709da7c..0907cf7a 100644 --- a/src/ipcpd/normal/pol/flat.c +++ b/src/ipcpd/normal/pol/flat.c @@ -56,7 +56,7 @@ static int addr_taken(char * name, char path[RIB_MAX_PATH_LEN + 1]; size_t reset; - strcpy(path, "/" MEMBERS_NAME); + strcpy(path, MEMBERS_PATH); reset = strlen(path); @@ -102,7 +102,7 @@ uint64_t flat_address(void) char ** members; ssize_t n_members; - strcpy(path, "/" MEMBERS_NAME); + strcpy(path, MEMBERS_PATH); if (!rib_has(path)) { log_err("Could not read members from RIB."); diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h index 31c79fbe..db1ff1bb 100644 --- a/src/ipcpd/normal/ribconfig.h +++ b/src/ipcpd/normal/ribconfig.h @@ -29,9 +29,7 @@ #define DLR "/" #define BOOT_NAME "boot" #define MEMBERS_NAME "members" -#define DIR_NAME "directory" #define ROUTING_NAME "fsdb" -#define DIR_PATH DLR DIR_NAME #define BOOT_PATH DLR BOOT_NAME #define MEMBERS_PATH DLR MEMBERS_NAME #define ROUTING_PATH DLR ROUTING_NAME diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 266a628d..3beb917c 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -299,9 +299,8 @@ static void * sync_rib(void *o) rib_path_append(path, children[--ch]); free(children[ch]); - /* Only sync fsdb, members and directory */ + /* Only sync fsdb and members */ if (strcmp(path, MEMBERS_PATH) == 0 - || strcmp(path, DIR_PATH) == 0 || strcmp(path, ROUTING_PATH) == 0) ribmgr_sync(path); } diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c index 63259430..a4b9e074 100644 --- a/src/ipcpd/normal/sdu_sched.c +++ b/src/ipcpd/normal/sdu_sched.c @@ -36,11 +36,19 @@ struct sdu_sched { flow_set_t * set[QOS_CUBE_MAX]; - fqueue_t * fqs[QOS_CUBE_MAX]; next_sdu_t callback; - pthread_t sdu_reader; + pthread_t sdu_readers[IPCP_SCHED_THREADS]; }; +static void cleanup_reader(void * o) +{ + int i; + fqueue_t ** fqs = (fqueue_t **) o; + + for (i = 0; i < QOS_CUBE_MAX; ++i) + fqueue_destroy(fqs[i]); +} + static void * sdu_reader(void * o) { struct sdu_sched * sched; @@ -49,14 +57,27 @@ static void * sdu_reader(void * o) int fd; int i = 0; int ret; + fqueue_t * fqs[QOS_CUBE_MAX]; sched = (struct sdu_sched *) o; + for (i = 0; i < QOS_CUBE_MAX; ++i) { + fqs[i] = fqueue_create(); + if (fqs[i] == NULL) { + int j; + for (j = 0; j < i; ++j) + fqueue_destroy(fqs[j]); + return (void *) -1; + } + } + + pthread_cleanup_push(cleanup_reader, fqs); + while (true) { /* FIXME: replace with scheduling policy call */ i = (i + 1) % QOS_CUBE_MAX; - ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); + ret = flow_event_wait(sched->set[i], fqs[i], &timeout); if (ret == -ETIMEDOUT) continue; @@ -65,7 +86,7 @@ static void * sdu_reader(void * o) continue; } - while ((fd = fqueue_next(sched->fqs[i])) >= 0) { + while ((fd = fqueue_next(fqs[i])) >= 0) { if (ipcp_flow_read(fd, &sdb)) { log_warn("Failed to read SDU from fd %d.", fd); continue; @@ -78,6 +99,8 @@ static void * sdu_reader(void * o) } } + pthread_cleanup_pop(true); + return (void *) 0; } @@ -89,7 +112,7 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback) sdu_sched = malloc(sizeof(*sdu_sched)); if (sdu_sched == NULL) - return NULL; + goto fail_malloc; sdu_sched->callback = callback; @@ -98,31 +121,27 @@ struct sdu_sched * sdu_sched_create(next_sdu_t callback) if (sdu_sched->set[i] == NULL) { for (j = 0; j < i; ++j) flow_set_destroy(sdu_sched->set[j]); - goto fail_sdu_sched; + goto fail_flow_set; } } - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sdu_sched->fqs[i] = fqueue_create(); - if (sdu_sched->fqs[i] == NULL) { - for (j = 0; j < i; ++j) - fqueue_destroy(sdu_sched->fqs[j]); + for (i = 0; i < IPCP_SCHED_THREADS; ++i) { + if (pthread_create(&sdu_sched->sdu_readers[i], NULL, + sdu_reader, sdu_sched)) { + int j; + for (j = 0; j < i; ++j) { + pthread_cancel(sdu_sched->sdu_readers[j]); + pthread_join(sdu_sched->sdu_readers[j], NULL); + } goto fail_flow_set; } } - pthread_create(&sdu_sched->sdu_reader, - NULL, - sdu_reader, - (void *) sdu_sched); - return sdu_sched; fail_flow_set: - for (i = 0; i < QOS_CUBE_MAX; ++i) - flow_set_destroy(sdu_sched->set[i]); - fail_sdu_sched: free(sdu_sched); + fail_malloc: return NULL; } @@ -132,14 +151,13 @@ void sdu_sched_destroy(struct sdu_sched * sdu_sched) assert(sdu_sched); - pthread_cancel(sdu_sched->sdu_reader); - - pthread_join(sdu_sched->sdu_reader, NULL); + for (i = 0; i < IPCP_SCHED_THREADS; ++i) { + pthread_cancel(sdu_sched->sdu_readers[i]); + pthread_join(sdu_sched->sdu_readers[i], NULL); + } - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fqueue_destroy(sdu_sched->fqs[i]); + for (i = 0; i < QOS_CUBE_MAX; ++i) flow_set_destroy(sdu_sched->set[i]); - } free(sdu_sched); } diff --git a/src/ipcpd/normal/tests/CMakeLists.txt b/src/ipcpd/normal/tests/CMakeLists.txt new file mode 100644 index 00000000..d975caf6 --- /dev/null +++ b/src/ipcpd/normal/tests/CMakeLists.txt @@ -0,0 +1,37 @@ +get_filename_component(CURRENT_SOURCE_PARENT_DIR + ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(CURRENT_BINARY_PARENT_DIR + ${CMAKE_CURRENT_BINARY_DIR} DIRECTORY) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +include_directories(${CURRENT_SOURCE_PARENT_DIR}) +include_directories(${CURRENT_BINARY_PARENT_DIR}) + +include_directories(${CMAKE_SOURCE_DIR}/include) +include_directories(${CMAKE_BINARY_DIR}/include) + +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c + # Add new tests here + dht_test.c +) + +set_source_files_properties(${KAD_PROTO_SRCS} PROPERTIES GENERATED TRUE) + +add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests} + ${KAD_PROTO_SRCS}) +target_link_libraries(${PARENT_DIR}_test ouroboros) + +add_dependencies(check ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +remove(tests_to_run test_suite.c) + +foreach (test ${tests_to_run}) + get_filename_component(test_name ${test} NAME_WE) + add_test(${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) +endforeach (test) diff --git a/src/ipcpd/normal/tests/dht_test.c b/src/ipcpd/normal/tests/dht_test.c new file mode 100644 index 00000000..861ae10a --- /dev/null +++ b/src/ipcpd/normal/tests/dht_test.c @@ -0,0 +1,99 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Unit tests of the DHT AE + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define __DHT_TEST__ + +#include "dht.c" + +#include <pthread.h> +#include <time.h> +#include <stdlib.h> +#include <stdio.h> + +#define KEY_LEN 32 + +#define EXP 86400 +#define CONTACTS 1000 + +int dht_test(int argc, + char ** argv) +{ + struct dht * dht; + uint64_t addr = 0x0D1F; + uint8_t key[KEY_LEN]; + size_t i; + + (void) argc; + (void) argv; + + dht = dht_create(addr); + if (dht == NULL) { + printf("Failed to create dht.\n"); + return -1; + } + + dht_destroy(dht); + + dht = dht_create(addr); + if (dht == NULL) { + printf("Failed to re-create dht.\n"); + return -1; + } + + if (dht_bootstrap(dht, KEY_LEN, EXP)) { + printf("Failed to bootstrap dht.\n"); + dht_destroy(dht); + return -1; + } + + dht_destroy(dht); + + dht = dht_create(addr); + if (dht == NULL) { + printf("Failed to re-create dht.\n"); + return -1; + } + + if (dht_bootstrap(dht, KEY_LEN, EXP)) { + printf("Failed to bootstrap dht.\n"); + dht_destroy(dht); + return -1; + } + + for (i = 0; i < CONTACTS; ++i) { + uint64_t addr; + random_buffer(&addr, sizeof(addr)); + random_buffer(key, KEY_LEN); + pthread_rwlock_wrlock(&dht->lock); + if (dht_update_bucket(dht, key, addr)) { + pthread_rwlock_unlock(&dht->lock); + printf("Failed to update bucket.\n"); + dht_destroy(dht); + return -1; + } + pthread_rwlock_unlock(&dht->lock); + } + + dht_destroy(dht); + + return 0; +} diff --git a/src/irmd/main.c b/src/irmd/main.c index 63ae6b13..8b22bdef 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -36,6 +36,7 @@ #include <ouroboros/bitmap.h> #include <ouroboros/qos.h> #include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h> #include <ouroboros/logs.h> #include "utils.h" @@ -99,18 +100,9 @@ struct irm { struct shm_rdrbuff * rdrb; /* rdrbuff for SDUs */ int sockfd; /* UNIX socket */ - pthread_t * threadpool; /* pool of mainloop threads */ - - struct bmp * thread_ids; /* ids for mainloop threads */ - size_t max_threads; /* max threads set by tpm */ - size_t threads; /* available mainloop threads */ - pthread_cond_t threads_cond; /* signal thread entry/exit */ - pthread_mutex_t threads_lock; /* mutex for threads/condvar */ - enum irm_state state; /* state of the irmd */ pthread_rwlock_t state_lock; /* lock for the entire irmd */ - pthread_t tpm; /* threadpool manager */ pthread_t irm_sanitize; /* clean up irmd resources */ pthread_t shm_sanitize; /* keep track of rdrbuff use */ } irmd; @@ -315,7 +307,7 @@ static pid_t create_ipcp(char * name, break; } - list_add_tail(&tmp->next, &irmd.ipcps); + list_add_tail(&tmp->next, p); list_add(&api->next, &irmd.spawned_apis); @@ -478,7 +470,7 @@ static int enroll_ipcp(pid_t api, pthread_rwlock_unlock(&irmd.reg_lock); if (ipcp_enroll(api, dst_name, &info) < 0) { - log_err("Could not enroll IPCP."); + log_err("Could not enroll IPCP %d.", api); return -1; } @@ -1426,16 +1418,6 @@ static void irm_fini(void) if (irmd_get_state() != IRMD_NULL) log_warn("Unsafe destroy."); - pthread_mutex_lock(&irmd.threads_lock); - - if (irmd.thread_ids != NULL) - bmp_destroy(irmd.thread_ids); - - pthread_mutex_unlock(&irmd.threads_lock); - - if (irmd.threadpool != NULL) - free(irmd.threadpool); - pthread_rwlock_wrlock(&irmd.flows_lock); if (irmd.port_ids != NULL) @@ -1509,8 +1491,8 @@ void irmd_sig_handler(int sig, } log_info("IRMd shutting down..."); - irmd_set_state(IRMD_NULL); + tpm_stop(); break; case SIGPIPE: log_dbg("Ignored SIGPIPE."); @@ -1692,55 +1674,11 @@ void * irm_sanitize(void * o) } } -static void thread_inc(void) -{ - pthread_mutex_lock(&irmd.threads_lock); - - ++irmd.threads; - pthread_cond_signal(&irmd.threads_cond); - - pthread_mutex_unlock(&irmd.threads_lock); -} - -static void thread_dec(void) -{ - pthread_mutex_lock(&irmd.threads_lock); - - --irmd.threads; - pthread_cond_signal(&irmd.threads_cond); - - pthread_mutex_unlock(&irmd.threads_lock); -} - -static bool thread_check(void) -{ - int ret; - - pthread_mutex_lock(&irmd.threads_lock); - - ret = irmd.threads > irmd.max_threads; - - pthread_mutex_unlock(&irmd.threads_lock); - - return ret; -} - -static void thread_exit(ssize_t id) -{ - pthread_mutex_lock(&irmd.threads_lock); - bmp_release(irmd.thread_ids, id); - - --irmd.threads; - pthread_cond_signal(&irmd.threads_cond); - - pthread_mutex_unlock(&irmd.threads_lock); -} - void * mainloop(void * o) { uint8_t buf[IRM_MSG_BUF_SIZE]; - ssize_t id = (ssize_t) o; + (void) o; while (true) { #ifdef __FreeBSD__ @@ -1760,8 +1698,8 @@ void * mainloop(void * o) struct timeval tv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; - if (irmd_get_state() != IRMD_RUNNING || thread_check()) { - thread_exit(id); + if (irmd_get_state() != IRMD_RUNNING || tpm_check()) { + tpm_exit(); break; } @@ -1772,7 +1710,6 @@ void * mainloop(void * o) if (select(irmd.sockfd + 1, &fds, NULL, NULL, &timeout) <= 0) continue; #endif - cli_sockfd = accept(irmd.sockfd, 0, 0); if (cli_sockfd < 0) continue; @@ -1790,7 +1727,7 @@ void * mainloop(void * o) if (irmd_get_state() != IRMD_RUNNING) { close(cli_sockfd); - thread_exit(id); + tpm_exit(); break; } @@ -1800,7 +1737,7 @@ void * mainloop(void * o) continue; } - thread_dec(); + tpm_dec(); if (msg->has_timeo_sec) { assert(msg->has_timeo_nsec); @@ -1929,7 +1866,7 @@ void * mainloop(void * o) if (ret_msg.result == -EPIPE || !ret_msg.has_result) { close(cli_sockfd); - thread_inc(); + tpm_inc(); continue; } @@ -1939,7 +1876,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); - thread_inc(); + tpm_inc(); continue; } @@ -1948,7 +1885,7 @@ void * mainloop(void * o) if (apis != NULL) free(apis); close(cli_sockfd); - thread_inc(); + tpm_inc(); continue; } @@ -1963,70 +1900,7 @@ void * mainloop(void * o) free(buffer.data); close(cli_sockfd); - thread_inc(); - } - - return (void *) 0; -} - -void * threadpoolmgr(void * o) -{ - pthread_attr_t pattr; - struct timespec dl; - struct timespec to = {(IRMD_TPM_TIMEOUT / 1000), - (IRMD_TPM_TIMEOUT % 1000) * MILLION}; - (void) o; - - if (pthread_attr_init(&pattr)) - return (void *) -1; - - pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); - - while (true) { - clock_gettime(PTHREAD_COND_CLOCK, &dl); - ts_add(&dl, &to, &dl); - - if (irmd_get_state() != IRMD_RUNNING) { - pthread_attr_destroy(&pattr); - log_dbg("Waiting for threads to exit."); - pthread_mutex_lock(&irmd.threads_lock); - while (irmd.threads > 0) - pthread_cond_wait(&irmd.threads_cond, - &irmd.threads_lock); - pthread_mutex_unlock(&irmd.threads_lock); - log_dbg("Threadpool manager done."); - break; - } - - pthread_mutex_lock(&irmd.threads_lock); - - if (irmd.threads < IRMD_MIN_AV_THREADS) { - log_dbg("Increasing threadpool."); - irmd.max_threads = IRMD_MAX_AV_THREADS; - - while (irmd.threads < irmd.max_threads) { - ssize_t id = bmp_allocate(irmd.thread_ids); - if (!bmp_is_id_valid(irmd.thread_ids, id)) { - log_warn("IRMd threadpool exhausted."); - break; - } - - if (pthread_create(&irmd.threadpool[id], - &pattr, mainloop, - (void *) id)) - log_warn("Failed to start new thread."); - else - ++irmd.threads; - } - } - - if (pthread_cond_timedwait(&irmd.threads_cond, - &irmd.threads_lock, - &dl) == ETIMEDOUT) - if (irmd.threads > IRMD_MIN_AV_THREADS ) - --irmd.max_threads; - - pthread_mutex_unlock(&irmd.threads_lock); + tpm_inc(); } return (void *) 0; @@ -2035,7 +1909,6 @@ void * threadpoolmgr(void * o) static int irm_init(void) { struct stat st; - pthread_condattr_t cattr; struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; @@ -2058,24 +1931,6 @@ static int irm_init(void) goto fail_flows_lock; } - if (pthread_mutex_init(&irmd.threads_lock, NULL)) { - log_err("Failed to initialize mutex."); - goto fail_threads_lock; - } - - if (pthread_condattr_init(&cattr)) { - log_err("Failed to initialize condattr."); - goto fail_cattr; - } - -#ifndef __APPLE__ - pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif - if (pthread_cond_init(&irmd.threads_cond, &cattr)) { - log_err("Failed to initialize cond."); - goto fail_threads_cond; - } - list_head_init(&irmd.ipcps); list_head_init(&irmd.api_table); list_head_init(&irmd.apn_table); @@ -2089,18 +1944,6 @@ static int irm_init(void) goto fail_port_ids; } - irmd.thread_ids = bmp_create(IRMD_MAX_THREADS, 0); - if (irmd.thread_ids == NULL) { - log_err("Failed to thread thread_ids bitmap."); - goto fail_thread_ids; - } - - irmd.threadpool = malloc(sizeof(pthread_t) * IRMD_MAX_THREADS); - if (irmd.threadpool == NULL) { - log_err("Failed to malloc threadpool"); - goto fail_thrpool; - } - if ((irmd.lf = lockfile_create()) == NULL) { if ((irmd.lf = lockfile_open()) == NULL) { log_err("Lockfile error."); @@ -2155,8 +1998,6 @@ static int irm_init(void) goto fail_rdrbuff; } - irmd.threads = 0; - irmd.max_threads = IRMD_MIN_AV_THREADS; irmd.state = IRMD_RUNNING; log_info("Ouroboros IPC Resource Manager daemon started..."); @@ -2172,18 +2013,8 @@ fail_sock_path: fail_stat: lockfile_destroy(irmd.lf); fail_lockfile: - free(irmd.threadpool); -fail_thrpool: - bmp_destroy(irmd.thread_ids); -fail_thread_ids: bmp_destroy(irmd.port_ids); fail_port_ids: - pthread_cond_destroy(&irmd.threads_cond); -fail_threads_cond: - pthread_condattr_destroy(&cattr); -fail_cattr: - pthread_mutex_destroy(&irmd.threads_lock); -fail_threads_lock: pthread_rwlock_destroy(&irmd.flows_lock); fail_flows_lock: pthread_rwlock_destroy(&irmd.reg_lock); @@ -2253,12 +2084,24 @@ int main(int argc, exit(EXIT_FAILURE); } - pthread_create(&irmd.tpm, NULL, threadpoolmgr, NULL); + if (tpm_init(IRMD_MIN_THREADS, IRMD_ADD_THREADS, mainloop)) { + log_fini(); + exit(EXIT_FAILURE); + } + + if (tpm_start()) { + tpm_fini(); + log_fini(); + exit(EXIT_FAILURE); + } pthread_create(&irmd.irm_sanitize, NULL, irm_sanitize, NULL); pthread_create(&irmd.shm_sanitize, NULL, shm_sanitize, irmd.rdrb); - pthread_join(irmd.tpm, NULL); + /* tpm_stop() called from sighandler */ + + tpm_fini(); + pthread_join(irmd.irm_sanitize, NULL); pthread_join(irmd.shm_sanitize, NULL); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index e08869b8..fe4dd88c 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -11,7 +11,6 @@ protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto) protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto) protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto) -protobuf_generate_c(FRCT_ENROLL_SRCS FRCT_ENROLL_HDRS frct_enroll.proto) if (NOT APPLE) find_library(LIBRT_LIBRARIES rt) @@ -59,12 +58,13 @@ set(SOURCE_FILES shm_rdrbuff.c sockets.c time_utils.c + tpm.c utils.c ) add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS} ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS} - ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS} ${FRCT_ENROLL_SRCS}) + ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS}) include(AddCompileFlags) if (CMAKE_BUILD_TYPE MATCHES Debug) @@ -77,9 +77,8 @@ else() if (${LINUX_RND_HDR} STREQUAL "LINUX_RND_HDR-NOTFOUND") find_package(OpenSSL) if (NOT OPENSSL_FOUND) - message(STATUS "No secure random generation, please install OpenSSL.") + message(FATAL_ERROR "No secure random generation, please install libssl.") else() - message(STATUS "OpenSSL found") include_directories($OPENSSL_INCLUDE_DIR}) add_compile_flags(ouroboros -DHAVE_OPENSSL) endif() diff --git a/src/lib/dev.c b/src/lib/dev.c index 14971528..c8e43778 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -85,7 +85,6 @@ struct flow { struct { char * ap_name; - char * daf_name; pid_t api; struct shm_rdrbuff * rdrb; @@ -205,7 +204,7 @@ static int api_announce(char * ap_name) return ret; } -static void init_flow(int fd) +static void flow_clear(int fd) { assert(!(fd < 0)); @@ -216,9 +215,9 @@ static void init_flow(int fd) ai.flows[fd].cube = QOS_CUBE_BE; } -static void reset_flow(int fd) +static void flow_fini(int fd) { - assert (!(fd < 0)); + assert(!(fd < 0)); if (ai.flows[fd].port_id != -1) port_destroy(&ai.ports[ai.flows[fd].port_id]); @@ -232,7 +231,59 @@ static void reset_flow(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); - init_flow(fd); + flow_clear(fd); +} + +static int flow_init(int port_id, + pid_t api, + qoscube_t qc) +{ + int fd; + + pthread_rwlock_wrlock(&ai.flows_lock); + + fd = bmp_allocate(ai.fds); + if (!bmp_is_id_valid(ai.fds, fd)) { + pthread_rwlock_unlock(&ai.flows_lock); + return -EBADF; + } + + ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); + if (ai.flows[fd].rx_rb == NULL) { + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + return -ENOMEM; + } + + ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id); + if (ai.flows[fd].tx_rb == NULL) { + flow_fini(fd); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + return -ENOMEM; + } + + ai.flows[fd].set = shm_flow_set_open(api); + if (ai.flows[fd].set == NULL) { + flow_fini(fd); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.flows_lock); + return -ENOMEM; + } + + ai.flows[fd].port_id = port_id; + ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].api = api; + ai.flows[fd].cube = qc; + ai.flows[fd].spec = qos_cube_to_spec(qc); + + ai.ports[port_id].fd = fd; + + port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + + pthread_rwlock_unlock(&ai.flows_lock); + + return fd; } int ouroboros_init(const char * ap_name) @@ -242,7 +293,6 @@ int ouroboros_init(const char * ap_name) assert(ai.ap_name == NULL); ai.api = getpid(); - ai.daf_name = NULL; ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS); if (ai.fds == NULL) @@ -279,7 +329,7 @@ int ouroboros_init(const char * ap_name) } for (i = 0; i < AP_MAX_FLOWS; ++i) - init_flow(i); + flow_clear(i); ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); if (ai.ports == NULL) { @@ -333,9 +383,6 @@ void ouroboros_fini() shm_flow_set_destroy(ai.fqset); - if (ai.daf_name != NULL) - free(ai.daf_name); - if (ai.ap_name != NULL) free(ai.ap_name); @@ -346,7 +393,7 @@ void ouroboros_fini() ssize_t idx; while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); - reset_flow(i); + flow_fini(i); } } @@ -368,13 +415,9 @@ void ouroboros_fini() int flow_accept(qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int fd = -1; - frct_enroll_msg_t * frct_enroll; - qosspec_t spec; - uint8_t data[BUF_SIZE]; - ssize_t n; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + int fd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_api = true; @@ -408,83 +451,15 @@ int flow_accept(qosspec_t * qs, return -EIRMD; } - pthread_rwlock_wrlock(&ai.flows_lock); - - fd = bmp_allocate(ai.fds); - if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -EBADF; - } - - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); - if (ai.flows[fd].rx_rb == NULL) { - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -ENOMEM; - } - - ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); - if (ai.flows[fd].tx_rb == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -ENOMEM; - } - - ai.flows[fd].set = shm_flow_set_open(recv_msg->api); - if (ai.flows[fd].set == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -ENOMEM; - } - - ai.flows[fd].port_id = recv_msg->port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - ai.flows[fd].api = recv_msg->api; - ai.flows[fd].cube = recv_msg->qoscube; - - assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); - - spec = qos_cube_to_spec(recv_msg->qoscube); - - ai.ports[recv_msg->port_id].fd = fd; - ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; - - pthread_rwlock_unlock(&ai.flows_lock); + fd = flow_init(recv_msg->port_id, recv_msg->api, recv_msg->qoscube); irm_msg__free_unpacked(recv_msg, NULL); - n = flow_read(fd, data, BUF_SIZE); - if (n < 0) { - flow_dealloc(fd); - return n; - } - - frct_enroll = frct_enroll_msg__unpack(NULL, n, data); - if (frct_enroll == NULL) { - flow_dealloc(fd); - return -1; - } - - spec.resource_control = frct_enroll->resource_control; - spec.reliable = frct_enroll->reliable; - spec.error_check = frct_enroll->error_check; - spec.ordered = frct_enroll->ordered; - spec.partial = frct_enroll->partial; - - frct_enroll_msg__free_unpacked(frct_enroll, NULL); - - pthread_rwlock_wrlock(&ai.flows_lock); - ai.flows[fd].spec = spec; - pthread_rwlock_unlock(&ai.flows_lock); + if (fd < 0) + return fd; if (qs != NULL) - *qs = spec; + *qs = ai.flows[fd].spec; return fd; } @@ -493,14 +468,10 @@ int flow_alloc(const char * dst_name, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - frct_enroll_msg_t frct_enroll = FRCT_ENROLL_MSG__INIT; - irm_msg_t * recv_msg = NULL; - qoscube_t qc = QOS_CUBE_BE; - int fd; - ssize_t len; - uint8_t * data; - int ret; + irm_msg_t msg = IRM_MSG__INIT; + irm_msg_t * recv_msg = NULL; + qoscube_t qc = QOS_CUBE_BE; + int fd; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = (char *) dst_name; @@ -508,15 +479,8 @@ int flow_alloc(const char * dst_name, msg.has_qoscube = true; msg.api = ai.api; - if (qs != NULL) { - frct_enroll.resource_control = qs->resource_control; - frct_enroll.reliable = qs->reliable; - frct_enroll.error_check = qs->error_check; - frct_enroll.ordered = qs->ordered; - frct_enroll.partial = qs->partial; - + if (qs != NULL) qc = qos_spec_to_cube(*qs); - } msg.qoscube = qc; @@ -547,78 +511,10 @@ int flow_alloc(const char * dst_name, return -EIRMD; } - pthread_rwlock_wrlock(&ai.flows_lock); - - fd = bmp_allocate(ai.fds); - if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -EBADF; - } - - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, recv_msg->port_id); - if (ai.flows[fd].rx_rb == NULL) { - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -ENOMEM; - } - - ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id); - if (ai.flows[fd].tx_rb == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -ENOMEM; - } - - ai.flows[fd].set = shm_flow_set_open(recv_msg->api); - if (ai.flows[fd].set == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -ENOMEM; - } - - ai.flows[fd].port_id = recv_msg->port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - ai.flows[fd].api = recv_msg->api; - ai.flows[fd].cube = recv_msg->qoscube; - - assert(ai.ports[recv_msg->port_id].state == PORT_INIT); - - ai.ports[recv_msg->port_id].fd = fd; - ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; + fd = flow_init(recv_msg->port_id, recv_msg->api, qc); irm_msg__free_unpacked(recv_msg, NULL); - pthread_rwlock_unlock(&ai.flows_lock); - - len = frct_enroll_msg__get_packed_size(&frct_enroll); - if (len < 0) { - flow_dealloc(fd); - return -1; - } - - data = malloc(len); - if (data == NULL) { - flow_dealloc(fd); - return -ENOMEM; - } - - frct_enroll_msg__pack(&frct_enroll, data); - - ret = flow_write(fd, data, len); - if (ret < 0) { - flow_dealloc(fd); - free(data); - return ret; - } - - free(data); - return fd; } @@ -657,7 +553,7 @@ int flow_dealloc(int fd) pthread_rwlock_wrlock(&ai.flows_lock); - reset_flow(fd); + flow_fini(fd); bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); @@ -1073,53 +969,11 @@ int flow_event_wait(struct flow_set * set, /* ipcp-dev functions */ -int np1_flow_alloc(pid_t n_api, - int port_id) +int np1_flow_alloc(pid_t n_api, + int port_id, + qoscube_t qc) { - int fd; - - pthread_rwlock_wrlock(&ai.flows_lock); - - fd = bmp_allocate(ai.fds); - if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.flows_lock); - return -1; - } - - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); - if (ai.flows[fd].rx_rb == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - return -1; - } - - ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id); - if (ai.flows[fd].tx_rb == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - return -1; - } - - ai.flows[fd].set = shm_flow_set_open(n_api); - if (ai.flows[fd].set == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - return -1; - } - - ai.flows[fd].port_id = port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - ai.flows[fd].api = n_api; - - ai.ports[port_id].fd = fd; - ai.ports[port_id].state = PORT_ID_ASSIGNED; - - pthread_rwlock_unlock(&ai.flows_lock); - - return fd; + return flow_init(port_id, n_api, qc); } int np1_flow_dealloc(int port_id) @@ -1182,11 +1036,10 @@ int ipcp_create_r(pid_t api, int ipcp_flow_req_arr(pid_t api, const uint8_t * dst, size_t len, - qoscube_t cube) + qoscube_t qc) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int port_id = -1; int fd = -1; if (dst == NULL) @@ -1199,88 +1052,24 @@ int ipcp_flow_req_arr(pid_t api, msg.hash.len = len; msg.hash.data = (uint8_t *) dst; msg.has_qoscube = true; - msg.qoscube = cube; - - pthread_rwlock_wrlock(&ai.flows_lock); - - fd = bmp_allocate(ai.fds); - if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.flows_lock); - return -1; /* -ENOMOREFDS */ - } - - pthread_rwlock_unlock(&ai.flows_lock); + msg.qoscube = qc; recv_msg = send_recv_irm_msg(&msg); - pthread_rwlock_wrlock(&ai.flows_lock); - - if (recv_msg == NULL) { - ai.ports[fd].state = PORT_INIT; - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); + if (recv_msg == NULL) return -EIRMD; - } if (!recv_msg->has_port_id || !recv_msg->has_api) { - ai.ports[fd].state = PORT_INIT; - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } if (recv_msg->has_result && recv_msg->result) { - ai.ports[fd].state = PORT_INIT; - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - port_id = recv_msg->port_id; - if (port_id < 0) { - ai.ports[fd].state = PORT_INIT; - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); - if (ai.flows[fd].rx_rb == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id); - if (ai.flows[fd].tx_rb == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ai.flows[fd].set = shm_flow_set_open(recv_msg->api); - if (ai.flows[fd].set == NULL) { - reset_flow(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.flows_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ai.flows[fd].port_id = port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; - - ai.ports[port_id].fd = fd; - port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); - - pthread_rwlock_unlock(&ai.flows_lock); + fd = flow_init(recv_msg->port_id, recv_msg->api, qc); irm_msg__free_unpacked(recv_msg, NULL); diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 67abbb5b..7660b1dd 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -117,7 +117,7 @@ struct shm_flow_set * shm_flow_set_create() (set->fqueues + AP_MAX_FQUEUES * (SHM_BUFFER_SIZE)); pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); #endif pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -336,7 +336,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, assert(idx < AP_MAX_FQUEUES); assert(fqueue); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(set->lock); #else if (pthread_mutex_lock(set->lock) == EOWNERDEAD) @@ -358,7 +358,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set, else ret = -pthread_cond_wait(set->conds + idx, set->lock); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX if (ret == -EOWNERDEAD) pthread_mutex_consistent(set->lock); #endif diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 1e072b21..757f65c8 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -127,7 +127,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) rb->del = rb->add + 1; pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); #endif pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -299,7 +299,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, ts_add(&abstime, timeout, &abstime); } -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -315,7 +315,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, &abstime); else idx = -pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX if (idx == -EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif @@ -356,7 +356,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb) if (shm_rbuff_empty(rb)) return; -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -367,7 +367,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb) (void *) rb->lock); while (!shm_rbuff_empty(rb)) -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_cond_wait(rb->del, rb->lock); #else if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index d3c1e143..1b9f07d1 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -124,7 +124,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) rb->del = rb->add + 1; pthread_mutexattr_init(&mattr); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); #endif pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -231,7 +231,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) assert(rb); assert(idx < SHM_BUFFER_SIZE); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -264,7 +264,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -297,7 +297,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, ts_add(&abstime, timeout, &abstime); } -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -313,7 +313,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, &abstime); else idx = -pthread_cond_wait(rb->add, rb->lock); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX if (idx == -EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif @@ -334,7 +334,7 @@ void shm_rbuff_block(struct shm_rbuff * rb) { assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -349,7 +349,7 @@ void shm_rbuff_unblock(struct shm_rbuff * rb) { assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -364,7 +364,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb) { assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) @@ -376,7 +376,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb) (void *) rb->lock); while (!shm_rbuff_empty(rb)) -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_cond_wait(rb->del, rb->lock); #else if (pthread_cond_wait(rb->del, rb->lock) == EOWNERDEAD) @@ -391,7 +391,7 @@ size_t shm_rbuff_queued(struct shm_rbuff * rb) assert(rb); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); #else if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 1f999f93..d454fef8 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -192,7 +192,7 @@ struct shm_rdrbuff * shm_rdrbuff_create() pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ +#ifdef HAVE_ROBUST_MUTEX pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); #endif pthread_mutex_init(rdrb->lock, &mattr); @@ -274,7 +274,7 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb, ts_add(&abstime, timeo, &abstime); } -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) @@ -282,9 +282,9 @@ int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb, #endif while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX if (pthread_cond_timedwait(rdrb->full, - rdrb->lock + rdrb->lock, &abstime) == ETIMEDOUT) { pthread_mutex_unlock(rdrb->lock); return -ETIMEDOUT; @@ -358,7 +358,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, if (sz > SHM_RDRB_BLOCK_SIZE) return -EMSGSIZE; #endif -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) @@ -437,7 +437,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, if (sz > SHM_RDRB_BLOCK_SIZE) return -EMSGSIZE; #endif -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) @@ -535,7 +535,7 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, assert(rdrb); assert(idx < (SHM_BUFFER_SIZE)); -#ifdef __APPLE__ +#ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) diff --git a/src/lib/tests/time_utils_test.c b/src/lib/tests/time_utils_test.c index 6d1ae32f..86636c15 100644 --- a/src/lib/tests/time_utils_test.c +++ b/src/lib/tests/time_utils_test.c @@ -28,12 +28,12 @@ static void ts_print(struct timespec * s) { - printf("timespec is %zd:%zd.\n", s->tv_sec, s->tv_nsec); + printf("timespec is %zd:%ld.\n", (ssize_t) s->tv_sec, s->tv_nsec); } static void tv_print(struct timeval * v) { - printf("timeval is %zd:%zd.\n", v->tv_sec, v->tv_usec); + printf("timeval is %zd:%ld.\n", (ssize_t) v->tv_sec, v->tv_usec); } static void ts_init(struct timespec * s, diff --git a/src/lib/tpm.c b/src/lib/tpm.c new file mode 100644 index 00000000..8298eeb5 --- /dev/null +++ b/src/lib/tpm.c @@ -0,0 +1,266 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Threadpool management + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + +#include <ouroboros/config.h> +#include <ouroboros/errno.h> +#include <ouroboros/list.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/tpm.h> + +#include <pthread.h> +#include <stdlib.h> + +#define TPM_TIMEOUT 1000 + +struct pthr_el { + struct list_head next; + + bool join; + + pthread_t thr; +}; + +enum tpm_state { + TPM_NULL = 0, + TPM_INIT, + TPM_RUNNING +}; + +struct { + size_t min; + size_t inc; + size_t max; + size_t cur; + + void * (* func)(void *); + + struct list_head pool; + + enum tpm_state state; + + pthread_cond_t cond; + pthread_mutex_t lock; + + pthread_t mgr; +} tpm; + +static void tpm_join(void) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &tpm.pool) { + struct pthr_el * e = list_entry(p, struct pthr_el, next); + if (tpm.state != TPM_RUNNING) + while (!e->join) + pthread_cond_wait(&tpm.cond, &tpm.lock); + + if (e->join) { + pthread_join(e->thr, NULL); + list_del(&e->next); + free(e); + } + } +} + +static void * tpmgr(void * o) +{ + struct timespec dl; + struct timespec to = {(TPM_TIMEOUT / 1000), + (TPM_TIMEOUT % 1000) * MILLION}; + (void) o; + + while (true) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&tpm.lock); + + tpm_join(); + + if (tpm.state != TPM_RUNNING) { + tpm.max = 0; + tpm_join(); + pthread_mutex_unlock(&tpm.lock); + break; + } + + if (tpm.cur < tpm.min) { + tpm.max = tpm.inc; + + while (tpm.cur < tpm.max) { + struct pthr_el * e = malloc(sizeof(*e)); + if (e == NULL) + break; + + e->join = false; + + if (pthread_create(&e->thr, NULL, + tpm.func, NULL)) { + free(e); + } else { + list_add(&e->next, &tpm.pool); + ++tpm.cur; + } + } + } + + if (pthread_cond_timedwait(&tpm.cond, &tpm.lock, &dl) + == ETIMEDOUT) + if (tpm.cur > tpm.min ) + --tpm.max; + + pthread_mutex_unlock(&tpm.lock); + } + + return (void *) 0; +} + +int tpm_init(size_t min, + size_t inc, + void * (* func)(void *)) +{ + pthread_condattr_t cattr; + + if (pthread_mutex_init(&tpm.lock, NULL)) + goto fail_lock; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&tpm.cond, &cattr)) + goto fail_cond; + + list_head_init(&tpm.pool); + + pthread_condattr_destroy(&cattr); + + tpm.state = TPM_INIT; + tpm.func = func; + tpm.min = min; + tpm.inc = inc; + tpm.max = 0; + tpm.cur = 0; + + return 0; + + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&tpm.lock); + fail_lock: + return -1; +} + +int tpm_start(void) +{ + pthread_mutex_lock(&tpm.lock); + + if (pthread_create(&tpm.mgr, NULL, tpmgr, NULL)) { + pthread_mutex_unlock(&tpm.lock); + return -1; + } + + tpm.state = TPM_RUNNING; + + pthread_mutex_unlock(&tpm.lock); + + return 0; +} + +void tpm_stop(void) +{ + pthread_mutex_lock(&tpm.lock); + + tpm.state = TPM_NULL; + + pthread_mutex_unlock(&tpm.lock); +} + +void tpm_fini(void) +{ + pthread_join(tpm.mgr, NULL); + + pthread_mutex_destroy(&tpm.lock); + pthread_cond_destroy(&tpm.cond); +} + +bool tpm_check(void) +{ + bool ret; + + pthread_mutex_lock(&tpm.lock); + + ret = tpm.cur > tpm.max; + + pthread_mutex_unlock(&tpm.lock); + + return ret; +} + +void tpm_inc(void) +{ + pthread_mutex_lock(&tpm.lock); + + ++tpm.cur; + + pthread_mutex_unlock(&tpm.lock); +} + +void tpm_dec(void) +{ + pthread_mutex_lock(&tpm.lock); + + --tpm.cur; + + pthread_cond_signal(&tpm.cond); + + pthread_mutex_unlock(&tpm.lock); +} + +void tpm_exit(void) +{ + struct list_head * p; + pthread_t id; + + id = pthread_self(); + + pthread_mutex_lock(&tpm.lock); + + --tpm.cur; + + list_for_each(p, &tpm.pool) { + struct pthr_el * e = list_entry(p, struct pthr_el, next); + if (e->thr == id) { + e->join = true; + break; + } + } + + pthread_cond_signal(&tpm.cond); + + pthread_mutex_unlock(&tpm.lock); +} |