diff options
author | dimitri staessens <[email protected]> | 2017-04-05 20:02:28 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-04-06 10:36:24 +0200 |
commit | e1c0714d5827cd927961f3a687d9720e6e9aa802 (patch) | |
tree | d8e793cffbe829d64855eaa5a429b90ebe3dc3a4 /src/irmd | |
parent | c6ad4f96f8bb2f1ee749e92308e7173523ddd0b8 (diff) | |
download | ouroboros-e1c0714d5827cd927961f3a687d9720e6e9aa802.tar.gz ouroboros-e1c0714d5827cd927961f3a687d9720e6e9aa802.zip |
lib, irmd: Implement flow allocation timeout
Setting the timeouts on flow_alloc and flow_accept will now work. This
makes some changes to the UNIX sockets used for management
communication between the APs, IRMd and IPCPs.
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/api_table.c | 85 | ||||
-rw-r--r-- | src/irmd/api_table.h | 2 | ||||
-rw-r--r-- | src/irmd/ipcp.c | 14 | ||||
-rw-r--r-- | src/irmd/irm_flow.c | 37 | ||||
-rw-r--r-- | src/irmd/irm_flow.h | 5 | ||||
-rw-r--r-- | src/irmd/main.c | 182 |
6 files changed, 213 insertions, 112 deletions
diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index 5ff0fcf6..268f8231 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -31,14 +31,17 @@ #include <stdlib.h> #include <unistd.h> #include <limits.h> +#include <assert.h> -struct api_entry * api_entry_create(pid_t api, char * apn) +#define ENTRY_SLEEP_TIMEOUT 10 /* ms */ + +struct api_entry * api_entry_create(pid_t api, + char * apn) { struct api_entry * e; pthread_condattr_t cattr; - if (apn == NULL) - return NULL; + assert(apn); e = malloc(sizeof(*e)); if (e == NULL) @@ -84,8 +87,7 @@ void api_entry_destroy(struct api_entry * e) struct list_head * p; struct list_head * h; - if (e == NULL) - return; + assert(e); pthread_mutex_lock(&e->state_lock); @@ -121,11 +123,13 @@ void api_entry_destroy(struct api_entry * e) free(e); } -int api_entry_add_name(struct api_entry * e, char * name) +int api_entry_add_name(struct api_entry * e, + char * name) { struct str_el * s; - if (e == NULL || name == NULL) - return -EINVAL; + + assert(e); + assert(name); s = malloc(sizeof(*s)); if (s == NULL) @@ -137,11 +141,15 @@ int api_entry_add_name(struct api_entry * e, char * name) return 0; } -void api_entry_del_name(struct api_entry * e, char * name) +void api_entry_del_name(struct api_entry * e, + char * name) { struct list_head * p = NULL; struct list_head * h = NULL; + assert(e); + assert(name); + list_for_each_safe(p, h, &e->names) { struct str_el * s = list_entry(p, struct str_el, next); if (!wildcard_match(name, s->str)) { @@ -153,31 +161,34 @@ void api_entry_del_name(struct api_entry * e, char * name) } } +void api_entry_cancel(struct api_entry * e) +{ + pthread_mutex_lock(&e->state_lock); + + e->state = API_INIT; + pthread_cond_broadcast(&e->state_cond); + + pthread_mutex_unlock(&e->state_lock); +} + int api_entry_sleep(struct api_entry * e) { - struct timespec timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), - (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION}; + struct timespec timeout = {(ENTRY_SLEEP_TIMEOUT / 1000), + (ENTRY_SLEEP_TIMEOUT % 1000) * MILLION}; struct timespec now; struct timespec dl; int ret = 0; - if (e == NULL) - return -EINVAL; - - e->re = NULL; + assert(e); clock_gettime(PTHREAD_COND_CLOCK, &now); - ts_add(&now, &timeout, &dl); pthread_mutex_lock(&e->state_lock); - if (e->state != API_INIT) { - pthread_mutex_unlock(&e->state_lock); - return -EINVAL; - } - e->state = API_SLEEP; + if (e->state != API_WAKE && e->state != API_DESTROY) + e->state = API_SLEEP; while (e->state == API_SLEEP && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&e->state_cond, @@ -190,17 +201,20 @@ int api_entry_sleep(struct api_entry * e) ret = -1; } - e->state = API_INIT; + if (ret != -ETIMEDOUT) + e->state = API_INIT; + pthread_cond_broadcast(&e->state_cond); pthread_mutex_unlock(&e->state_lock); return ret; } -void api_entry_wake(struct api_entry * e, struct reg_entry * re) +void api_entry_wake(struct api_entry * e, + struct reg_entry * re) { - if (e == NULL) - return; + assert(e); + assert(re); pthread_mutex_lock(&e->state_lock); @@ -217,24 +231,32 @@ void api_entry_wake(struct api_entry * e, struct reg_entry * re) while (e->state == API_WAKE) pthread_cond_wait(&e->state_cond, &e->state_lock); + if (e->state == API_DESTROY) + e->state = API_INIT; + pthread_mutex_unlock(&e->state_lock); } -int api_table_add(struct list_head * api_table, struct api_entry * e) +int api_table_add(struct list_head * api_table, + struct api_entry * e) { - if (api_table == NULL || e == NULL) - return -EINVAL; + + assert(api_table); + assert(e); list_add(&e->next, api_table); return 0; } -void api_table_del(struct list_head * api_table, pid_t api) +void api_table_del(struct list_head * api_table, + pid_t api) { struct list_head * p; struct list_head * h; + assert(api_table); + list_for_each_safe(p, h, api_table) { struct api_entry * e = list_entry(p, struct api_entry, next); if (api == e->api) { @@ -244,10 +266,13 @@ void api_table_del(struct list_head * api_table, pid_t api) } } -struct api_entry * api_table_get(struct list_head * api_table, pid_t api) +struct api_entry * api_table_get(struct list_head * api_table, + pid_t api) { struct list_head * h; + assert(api_table); + list_for_each(h, api_table) { struct api_entry * e = list_entry(h, struct api_entry, next); if (api == e->api) diff --git a/src/irmd/api_table.h b/src/irmd/api_table.h index c7998c7f..f9c4d0aa 100644 --- a/src/irmd/api_table.h +++ b/src/irmd/api_table.h @@ -61,6 +61,8 @@ int api_entry_sleep(struct api_entry * e); void api_entry_wake(struct api_entry * e, struct reg_entry * re); +void api_entry_cancel(struct api_entry * e); + int api_entry_add_name(struct api_entry * e, char * name); diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index a8263580..eb0c2de0 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -53,10 +53,12 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api, char * sock_path = NULL; ssize_t count = 0; ipcp_msg_t * recv_msg = NULL; - struct timeval tv = {(SOCKET_TIMEOUT / 1000), (SOCKET_TIMEOUT % 1000) * 1000}; + if (kill(api, 0) < 0) + return NULL; + sock_path = ipcp_sock_path(api); if (sock_path == NULL) return NULL; @@ -67,10 +69,6 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api, return NULL; } - if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, - (void *) &tv, sizeof(tv))) - log_warn("Failed to set timeout on socket."); - free(sock_path); buf.len = ipcp_msg__get_packed_size(msg); @@ -85,6 +83,10 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t api, return NULL; } + if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, + (void *) &tv, sizeof(tv))) + log_warn("Failed to set timeout on socket."); + pthread_cleanup_push(close_ptr, (void *) &sockfd); pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); @@ -184,7 +186,7 @@ int ipcp_destroy(pid_t api) return 0; } -int ipcp_bootstrap(pid_t api, +int ipcp_bootstrap(pid_t api, dif_config_msg_t * conf) { ipcp_msg_t msg = IPCP_MSG__INIT; diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index 7a02b01a..8b85f36f 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -23,7 +23,9 @@ #define OUROBOROS_PREFIX "irm_flow" #include <ouroboros/config.h> +#include <ouroboros/errno.h> #include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> #include "irm_flow.h" @@ -142,31 +144,52 @@ void irm_flow_set_state(struct irm_flow * f, pthread_mutex_unlock(&f->state_lock); } -enum flow_state irm_flow_wait_state(struct irm_flow * f, - enum flow_state state) +int irm_flow_wait_state(struct irm_flow * f, + enum flow_state state, + struct timespec * timeo) { + int ret = 0; + int s; + + struct timespec dl; + assert(f); assert(state != FLOW_NULL); assert(state != FLOW_DESTROY); assert(state != FLOW_DEALLOC_PENDING); + if (timeo != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, timeo, &dl); + } + pthread_mutex_lock(&f->state_lock); assert(f->state != FLOW_NULL); while (!(f->state == state || f->state == FLOW_DESTROY || - f->state == FLOW_DEALLOC_PENDING)) - pthread_cond_wait(&f->state_cond, &f->state_lock); + f->state == FLOW_DEALLOC_PENDING) && + ret != -ETIMEDOUT) { + if (timeo == NULL) + ret = -pthread_cond_wait(&f->state_cond, + &f->state_lock); + else + ret = -pthread_cond_timedwait(&f->state_cond, + &f->state_lock, + &dl); + } - if (f->state == FLOW_DESTROY || f->state == FLOW_DEALLOC_PENDING) { + if (f->state == FLOW_DESTROY || + f->state == FLOW_DEALLOC_PENDING || + ret == -ETIMEDOUT) { f->state = FLOW_NULL; pthread_cond_broadcast(&f->state_cond); } - state = f->state; + s = f->state; pthread_mutex_unlock(&f->state_lock); - return state; + return ret ? ret : s; } diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index 97770117..8902a6ab 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -71,7 +71,8 @@ enum flow_state irm_flow_get_state(struct irm_flow * f); void irm_flow_set_state(struct irm_flow * f, enum flow_state state); -enum flow_state irm_flow_wait_state(struct irm_flow * f, - enum flow_state state); +int irm_flow_wait_state(struct irm_flow * f, + enum flow_state state, + struct timespec * timeo); #endif /* OUROBOROS_IRMD_IRM_FLOW_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index 673e39ea..41beb049 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -983,23 +983,33 @@ static int api_announce(pid_t api, return 0; } -static struct irm_flow * flow_accept(pid_t api) +static int flow_accept(pid_t api, + struct timespec * timeo, + struct irm_flow ** fl) { - struct irm_flow * f = NULL; + struct irm_flow * f = NULL; struct api_entry * e = NULL; struct reg_entry * re = NULL; struct list_head * p = NULL; + struct timespec dl; + struct timespec now; + pid_t api_n1; pid_t api_n; int port_id; int ret; + if (timeo != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, timeo, &dl); + } + pthread_rwlock_rdlock(&irmd.state_lock); if (irmd.state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -EIRMD; } pthread_rwlock_wrlock(&irmd.reg_lock); @@ -1010,7 +1020,7 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_unlock(&irmd.state_lock); log_err("Unknown instance %d calling accept.", api); - return NULL; + return -EINVAL; } log_dbg("New instance (%d) of %s added.", api, e->apn); @@ -1027,18 +1037,33 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_unlock(&irmd.state_lock); - while ((ret = api_entry_sleep(e)) == -ETIMEDOUT) { + while (true) { + if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) { + log_dbg("Accept timed out."); + return -ETIMEDOUT; + } + pthread_rwlock_rdlock(&irmd.state_lock); + if (irmd.state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -EIRMD; } + pthread_rwlock_unlock(&irmd.state_lock); - } - if (ret == -1) { - /* The process died, we can exit here. */ - return NULL; + ret = api_entry_sleep(e); + if (ret == -ETIMEDOUT) { + clock_gettime(PTHREAD_COND_CLOCK, &now); + api_entry_cancel(e); + continue; + } + + if (ret == -1) + return -EPIPE; + + if (ret == 0) + break; } pthread_rwlock_rdlock(&irmd.state_lock); @@ -1046,7 +1071,7 @@ static struct irm_flow * flow_accept(pid_t api) if (irmd.state != IRMD_RUNNING) { reg_entry_set_state(re, REG_NAME_NULL); pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -EIRMD; } pthread_rwlock_rdlock(&irmd.flows_lock); @@ -1056,7 +1081,7 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_unlock(&irmd.state_lock); log_warn("Port_id was not created yet."); - return NULL; + return -EPERM; } api_n = f->n_api; @@ -1079,7 +1104,7 @@ static struct irm_flow * flow_accept(pid_t api) irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); log_dbg("Process gone while accepting flow."); - return NULL; + return -EPERM; } pthread_mutex_lock(&e->state_lock); @@ -1100,7 +1125,7 @@ static struct irm_flow * flow_accept(pid_t api) irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); log_err("Entry in wrong state."); - return NULL; + return -EPERM; } registry_del_api(&irmd.registry, api); @@ -1118,29 +1143,34 @@ static struct irm_flow * flow_accept(pid_t api) clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); - return NULL; + return -EPERM; } irm_flow_set_state(f, FLOW_ALLOCATED); log_info("Flow on port_id %d allocated.", f->port_id); - return f; + *fl = f; + + return 0; } -static struct irm_flow * flow_alloc(pid_t api, - char * dst_name, - qoscube_t cube) +static int flow_alloc(pid_t api, + char * dst_name, + qoscube_t cube, + struct timespec * timeo, + struct irm_flow ** e) { struct irm_flow * f; - pid_t ipcp; - int port_id; + pid_t ipcp; + int port_id; + int state; pthread_rwlock_rdlock(&irmd.state_lock); if (irmd.state != IRMD_RUNNING) { pthread_rwlock_unlock(&irmd.state_lock); - return NULL; + return -1; } pthread_rwlock_rdlock(&irmd.reg_lock); @@ -1150,7 +1180,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_unlock(&irmd.state_lock); log_info("Destination unreachable."); - return NULL; + return -1; } pthread_rwlock_unlock(&irmd.reg_lock); @@ -1160,7 +1190,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_unlock(&irmd.state_lock); log_err("Could not allocate port_id."); - return NULL; + return -EBADF; } f = irm_flow_create(api, ipcp, port_id, cube); @@ -1169,7 +1199,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_unlock(&irmd.state_lock); log_err("Could not allocate port_id."); - return NULL; + return -ENOMEM; } list_add(&f->next, &irmd.irm_flows); @@ -1179,22 +1209,30 @@ static struct irm_flow * flow_alloc(pid_t api, assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); - if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { + if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube)) { /* sanitizer cleans this */ - log_info("Failed to respond to alloc."); - return NULL; + log_info("Flow_allocation failed."); + return -EAGAIN; } - if (irm_flow_wait_state(f, FLOW_ALLOCATED) != FLOW_ALLOCATED) { - log_info("Pending flow on port_id %d torn down.", port_id); - return NULL; + state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo); + if (state != FLOW_ALLOCATED) { + if (state == -ETIMEDOUT) { + log_dbg("Flow allocation timed out"); + return -ETIMEDOUT; + } + + log_info("Pending flow to %s torn down.", dst_name); + return -EPIPE; } assert(irm_flow_get_state(f) == FLOW_ALLOCATED); + *e = f; + log_info("Flow on port_id %d allocated.", port_id); - return f; + return 0; } static int flow_dealloc(pid_t api, @@ -1382,7 +1420,6 @@ static struct irm_flow * flow_req_arr(pid_t api, return NULL; } - pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); port_id = bmp_allocate(irmd.port_ids); @@ -1798,15 +1835,17 @@ void * mainloop(void * o) struct timeval timeout = {(IRMD_ACCEPT_TIMEOUT / 1000), (IRMD_ACCEPT_TIMEOUT % 1000) * 1000}; #endif - int cli_sockfd; - irm_msg_t * msg; - ssize_t count; - buffer_t buffer; - irm_msg_t ret_msg = IRM_MSG__INIT; - struct irm_flow * e = NULL; - pid_t * apis = NULL; - struct timeval tv = {(SOCKET_TIMEOUT / 1000), - (SOCKET_TIMEOUT % 1000) * 1000}; + int cli_sockfd; + irm_msg_t * msg; + ssize_t count; + buffer_t buffer; + irm_msg_t ret_msg = IRM_MSG__INIT; + struct irm_flow * e = NULL; + pid_t * apis = NULL; + struct timespec * timeo = NULL; + struct timespec ts = {0, 0}; + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; pthread_rwlock_rdlock(&irmd.state_lock); @@ -1849,6 +1888,14 @@ void * mainloop(void * o) thread_dec(); + if (msg->has_timeo_sec) { + assert(msg->has_timeo_nsec); + + ts.tv_sec = msg->timeo_sec; + ts.tv_nsec = msg->timeo_nsec; + timeo = &ts; + } + switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; @@ -1897,9 +1944,9 @@ void * mainloop(void * o) ret_msg.result = unbind_api(msg->api, msg->dst_name); break; case IRM_MSG_CODE__IRM_LIST_IPCPS: + ret_msg.has_result = true; ret_msg.n_apis = list_ipcps(msg->dst_name, &apis); ret_msg.apis = apis; - ret_msg.has_result = true; break; case IRM_MSG_CODE__IRM_REG: ret_msg.has_result = true; @@ -1914,32 +1961,27 @@ void * mainloop(void * o) msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - e = flow_accept(msg->api); - if (e == NULL) { - ret_msg.has_result = true; - ret_msg.result = -EIRMD; - break; + ret_msg.has_result = true; + ret_msg.result = flow_accept(msg->api, timeo, &e); + if (ret_msg.result == 0) { + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_api = true; + ret_msg.api = e->n_1_api; + ret_msg.has_qoscube = true; + ret_msg.qoscube = e->qc; } - ret_msg.has_port_id = true; - ret_msg.port_id = e->port_id; - ret_msg.has_api = true; - ret_msg.api = e->n_1_api; - ret_msg.has_qoscube = true; - ret_msg.qoscube = e->qc; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: - e = flow_alloc(msg->api, - msg->dst_name, - msg->qoscube); - if (e == NULL) { - ret_msg.has_result = true; - ret_msg.result = -1; - break; + ret_msg.has_result = true; + ret_msg.result = flow_alloc(msg->api, msg->dst_name, + msg->qoscube, timeo, &e); + if (ret_msg.result == 0) { + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_api = true; + ret_msg.api = e->n_1_api; } - ret_msg.has_port_id = true; - ret_msg.port_id = e->port_id; - ret_msg.has_api = true; - ret_msg.api = e->n_1_api; break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; @@ -1949,8 +1991,8 @@ void * mainloop(void * o) e = flow_req_arr(msg->api, msg->dst_name, msg->qoscube); + ret_msg.has_result = true; if (e == NULL) { - ret_msg.has_result = true; ret_msg.result = -1; break; } @@ -1971,6 +2013,12 @@ void * mainloop(void * o) irm_msg__free_unpacked(msg, NULL); + if (ret_msg.result == -EPIPE || !ret_msg.has_result) { + close(cli_sockfd); + thread_inc(); + continue; + } + buffer.len = irm_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { log_err("Failed to calculate length of reply message."); @@ -2065,7 +2113,7 @@ void * threadpoolmgr(void * o) if (pthread_cond_timedwait(&irmd.threads_cond, &irmd.threads_lock, &dl) == ETIMEDOUT) - if (irmd.threads > IRMD_MIN_AV_THREADS) + if (irmd.threads > IRMD_MIN_AV_THREADS ) --irmd.max_threads; pthread_mutex_unlock(&irmd.threads_lock); |