diff options
author | Sander Vrijders <[email protected]> | 2016-10-04 15:23:54 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-10-04 15:23:54 +0200 |
commit | 1a7c0923206cfb98d43122621a585027c67040ea (patch) | |
tree | acd08f09f5a094e897020e97961b2847209df043 /src/irmd/main.c | |
parent | ecdf47b97abb8c5107846f4ef4a17bd62ba6dc82 (diff) | |
parent | c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (diff) | |
download | ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.tar.gz ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.zip |
Merged in dstaesse/ouroboros/be-unify (pull request #251)
lib, ipcp: Revise fast path and flow interfaces
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r-- | src/irmd/main.c | 176 |
1 files changed, 51 insertions, 125 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index cc9160bf..523741ef 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -21,14 +21,9 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "irmd" - #include <ouroboros/config.h> #include <ouroboros/errno.h> -#include <ouroboros/logs.h> #include <ouroboros/sockets.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/nsm.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/irm_config.h> @@ -36,14 +31,19 @@ #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/bitmap.h> -#include <ouroboros/flow.h> #include <ouroboros/qos.h> #include <ouroboros/time_utils.h> +#define OUROBOROS_PREFIX "irmd" + +#include <ouroboros/logs.h> + + #include "utils.h" #include "registry.h" #include "irm_flow.h" #include "api_table.h" +#include "ipcp.h" #include <sys/socket.h> #include <sys/un.h> @@ -60,10 +60,12 @@ struct ipcp_entry { struct list_head next; + char * name; pid_t api; enum ipcp_type type; char * dif_name; + pthread_cond_t init_cond; pthread_mutex_t init_lock; bool init; @@ -100,7 +102,7 @@ struct irm { pthread_t irm_sanitize; pthread_t shm_sanitize; -} * irmd = NULL; +} * irmd; static struct irm_flow * get_irm_flow(int port_id) { @@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->port_id == port_id) return e; } @@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->n_api == n_api) return e; } @@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name) return NULL; } - LOG_INFO("New instance (%d) of %s added.", api, e->apn); - + LOG_DBG("New instance (%d) of %s added.", api, e->apn); LOG_DBG("This instance accepts flows for:"); list_for_each(p, &e->names) { struct str_el * s = list_entry(p, struct str_el, next); @@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api, struct api_entry * e = NULL; int ret = -1; - pid_t f_n_1_api; - pid_t f_n_api; + pid_t api_n1; + pid_t api_n; pthread_rwlock_rdlock(&irmd->state_lock); @@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api, return -1; } - f_n_api = f->n_api; - f_n_1_api = f->n_1_api; - - if (!response) { - f->state = FLOW_ALLOCATED; - pthread_cond_signal(&f->state_cond); - } + api_n = f->n_api; + api_n1 = f->n_1_api; pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - ret = ipcp_flow_alloc_resp(f_n_1_api, - port_id, - f_n_api, - response); + ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); + + if (!(response || ret)) + irm_flow_set_state(f, FLOW_ALLOCATED); + return ret; } @@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t api, { struct irm_flow * f; pid_t ipcp; + int port_id; /* FIXME: Map qos_spec to qos_cube */ @@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t api, f->n_api = api; f->state = FLOW_PENDING; + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) LOG_WARN("Failed to set timestamp."); @@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - f->port_id = bmp_allocate(irmd->port_ids); + port_id = f->port_id = bmp_allocate(irmd->port_ids); f->n_1_api = ipcp; list_add(&f->next, &irmd->irm_flows); @@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - if (ipcp_flow_alloc(ipcp, - f->port_id, - f->n_api, - dst_name, - src_ae_name, - QOS_CUBE_BE) < 0) { + if (ipcp_flow_alloc(ipcp, port_id, api, + dst_name, src_ae_name, QOS_CUBE_BE) < 0) { pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); list_del(&f->next); bmp_release(irmd->port_ids, f->port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - free(f); + irm_flow_destroy(f); return NULL; } @@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id) f = get_irm_flow(port_id); if (f == NULL) { - LOG_ERR("Could not find port %d.", port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + LOG_ERR("Could not find port %d.", port_id); return -1; } - if (f->state == FLOW_NULL) { - LOG_INFO("Port %d is deprecated.", port_id); + if (irm_flow_get_state(f) == FLOW_NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + LOG_INFO("Port %d is deprecated.", port_id); return -1; } - if (f->state == FLOW_ALLOCATED) { + if (irm_flow_get_state(f) == FLOW_ALLOCATED) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; @@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - pthread_mutex_lock(&f->state_lock); - - while (f->state == FLOW_PENDING) - pthread_cond_wait(&f->state_cond, &f->state_lock); - - pthread_mutex_unlock(&f->state_lock); - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - pthread_mutex_lock(&f->state_lock); - - if (f->state == FLOW_ALLOCATED) { - pthread_cond_broadcast(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); + if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) return 0; - } - - f->state = FLOW_NULL; - pthread_cond_broadcast(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); return -1; } -static int flow_dealloc(int port_id) +static int flow_dealloc(pid_t api, int port_id) { pid_t n_1_api; int ret = 0; @@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); - ret = ipcp_flow_dealloc(n_1_api, port_id); + if (api != n_1_api) + ret = ipcp_flow_dealloc(n_1_api, port_id); pthread_rwlock_unlock(&irmd->state_lock); @@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t api, struct pid_el * c_api; pid_t h_api = -1; + LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.", + api, dst_name, ae_name); + f = irm_flow_create(); if (f == NULL) { LOG_ERR("Failed to create irm_flow."); @@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t api, return f; } -static int flow_alloc_reply(int port_id, - int response) +static int flow_alloc_reply(int port_id, int response) { struct irm_flow * f; @@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id, return -1; } - pthread_mutex_lock(&f->state_lock); - if (!response) - f->state = FLOW_ALLOCATED; - + irm_flow_set_state(f, FLOW_ALLOCATED); else - f->state = FLOW_NULL; - - if (pthread_cond_signal(&f->state_cond)) - LOG_ERR("Failed to send signal."); - - pthread_mutex_unlock(&f->state_lock); + irm_flow_set_state(f, FLOW_NULL); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); @@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id, return 0; } -static int flow_dealloc_ipcp(int port_id) -{ - struct irm_flow * f = NULL; - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return 0; - } - - list_del(&f->next); - - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - irm_flow_destroy(f); - - return 0; -} - static void irm_destroy() { struct list_head * p; @@ -1729,46 +1671,35 @@ void * irm_sanitize() struct irm_flow * f = list_entry(p, struct irm_flow, next); - pthread_mutex_lock(&f->state_lock); - - if (f->state == FLOW_PENDING && - ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { + if (irm_flow_get_state(f) == FLOW_PENDING + && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { LOG_INFO("Pending port_id %d timed out.", f->port_id); - f->state = FLOW_NULL; - pthread_cond_signal(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); + irm_flow_set_state(f, FLOW_NULL); continue; } - pthread_mutex_unlock(&f->state_lock); - if (kill(f->n_api, 0) < 0) { - struct shm_ap_rbuff * n_rb = - shm_ap_rbuff_open_s(f->n_api); + struct shm_ap_rbuff * rb = + shm_ap_rbuff_open(f->n_api); bmp_release(irmd->port_ids, f->port_id); - list_del(&f->next); LOG_INFO("AP-I %d gone, flow %d deallocated.", f->n_api, f->port_id); ipcp_flow_dealloc(f->n_1_api, f->port_id); - if (n_rb != NULL) - shm_ap_rbuff_destroy(n_rb); + if (rb != NULL) + shm_ap_rbuff_destroy(rb); irm_flow_destroy(f); continue; } if (kill(f->n_1_api, 0) < 0) { - struct shm_ap_rbuff * n_1_rb_s = - shm_ap_rbuff_open_s(f->n_1_api); - struct shm_ap_rbuff * n_1_rb_n = - shm_ap_rbuff_open_n(f->n_1_api); + struct shm_ap_rbuff * rb = + shm_ap_rbuff_open(f->n_1_api); list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); - if (n_1_rb_n != NULL) - shm_ap_rbuff_destroy(n_1_rb_n); - if (n_1_rb_s != NULL) - shm_ap_rbuff_destroy(n_1_rb_s); + if (rb != NULL) + shm_ap_rbuff_destroy(rb); irm_flow_destroy(f); } } @@ -1939,7 +1870,7 @@ void * mainloop() break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->port_id); + ret_msg.result = flow_dealloc(msg->api, msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->api, @@ -1950,7 +1881,6 @@ void * mainloop() ret_msg.result = -1; break; } - /* FIXME: badly timed dealloc may give SEGV */ ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_api = true; @@ -1961,10 +1891,6 @@ void * mainloop() ret_msg.result = flow_alloc_reply(msg->port_id, msg->response); break; - case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: - ret_msg.has_result = true; - ret_msg.result = flow_dealloc_ipcp(msg->port_id); - break; default: LOG_ERR("Don't know that message code."); break; |