diff options
author | dimitri staessens <[email protected]> | 2016-09-18 06:27:43 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-10-04 15:16:00 +0200 |
commit | c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (patch) | |
tree | acd08f09f5a094e897020e97961b2847209df043 /src/ipcpd/normal | |
parent | 2e561a634ae3e747b293a4e05eaf44726968dc1a (diff) | |
download | ouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.tar.gz ouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.zip |
lib, ipcp: Revise fast path and flow interfaces
IPCPs can now use ap_init() to initialize the memory. All flows are
accessed using flow descriptors, this greatly simplifies IPCP
development. Reverts the fast path to a single ap_rbuff per process.
Splits lib/ipcp into irmd/ipcp and lib/ipcp-dev. Adds a lib/shim-dev
holding tailored functions for shims. Moves the buffer_t to utils.h.
Fixes the shim-eth-llc length field. Removes the flow from shared.h.
Fixes #4
Fixes #5
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 202 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.h | 12 | ||||
-rw-r--r-- | src/ipcpd/normal/frct.h | 2 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 172 | ||||
-rw-r--r-- | src/ipcpd/normal/ribmgr.c | 210 |
5 files changed, 218 insertions, 380 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 79b1bb4b..b6ec1984 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -26,7 +26,7 @@ #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/list.h> -#include <ouroboros/ipcp.h> +#include <ouroboros/ipcp-dev.h> #include <stdlib.h> #include <stdbool.h> @@ -41,10 +41,8 @@ #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; -extern struct ipcp * _ipcp; - struct n_flow { - struct flow flow; + int fd; struct frct_i * frct_i; enum qos_cube qos; @@ -57,7 +55,7 @@ struct n_1_flow { struct list_head next; }; -struct fmgr { +struct { pthread_t listen_thread; struct list_head n_1_flows; @@ -66,10 +64,9 @@ struct fmgr { struct list_head n_flows; /* FIXME: Make this a read/write lock */ pthread_mutex_t n_flows_lock; -} * fmgr = NULL; +} fmgr; -static int add_n_1_fd(int fd, - char * ae_name) +static int add_n_1_fd(int fd, char * ae_name) { struct n_1_flow * tmp; @@ -85,9 +82,9 @@ static int add_n_1_fd(int fd, INIT_LIST_HEAD(&tmp->next); - pthread_mutex_lock(&fmgr->n_1_flows_lock); - list_add(&tmp->next, &fmgr->n_1_flows); - pthread_mutex_unlock(&fmgr->n_1_flows_lock); + pthread_mutex_lock(&fmgr.n_1_flows_lock); + list_add(&tmp->next, &fmgr.n_1_flows); + pthread_mutex_unlock(&fmgr.n_1_flows_lock); return 0; } @@ -98,16 +95,16 @@ static void * fmgr_listen(void * o) char * ae_name; while (true) { - ipcp_wait_state(_ipcp, IPCP_ENROLLED, NULL); + ipcp_wait_state(IPCP_ENROLLED, NULL); - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() == IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); fd = flow_accept(&ae_name); if (fd < 0) { @@ -161,17 +158,13 @@ static void * fmgr_listen(void * o) int fmgr_init() { - fmgr = malloc(sizeof(*fmgr)); - if (fmgr == NULL) - return -1; + INIT_LIST_HEAD(&fmgr.n_1_flows); + INIT_LIST_HEAD(&fmgr.n_flows); - INIT_LIST_HEAD(&fmgr->n_1_flows); - INIT_LIST_HEAD(&fmgr->n_flows); + pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); + pthread_mutex_init(&fmgr.n_flows_lock, NULL); - pthread_mutex_init(&fmgr->n_1_flows_lock, NULL); - pthread_mutex_init(&fmgr->n_flows_lock, NULL); - - pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL); + pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL); return 0; } @@ -180,23 +173,20 @@ int fmgr_fini() { struct list_head * pos = NULL; - pthread_cancel(fmgr->listen_thread); + pthread_cancel(fmgr.listen_thread); - pthread_join(fmgr->listen_thread, NULL); + pthread_join(fmgr.listen_thread, NULL); - list_for_each(pos, &fmgr->n_1_flows) { - struct n_1_flow * e = - list_entry(pos, struct n_1_flow, next); + list_for_each(pos, &fmgr.n_1_flows) { + struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); if (e->ae_name != NULL) free(e->ae_name); if (ribmgr_remove_flow(e->fd)) LOG_ERR("Failed to remove management flow."); } - pthread_mutex_destroy(&fmgr->n_1_flows_lock); - pthread_mutex_destroy(&fmgr->n_flows_lock); - - free(fmgr); + pthread_mutex_destroy(&fmgr.n_1_flows_lock); + pthread_mutex_destroy(&fmgr.n_flows_lock); return 0; } @@ -243,8 +233,7 @@ int fmgr_mgmt_flow(char * dst_name) return 0; } -int fmgr_dt_flow(char * dst_name, - enum qos_cube qos) +int fmgr_dt_flow(char * dst_name, enum qos_cube qos) { int fd; int result; @@ -288,14 +277,13 @@ int fmgr_dt_flow(char * dst_name, } /* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_port_id(int port_id) +static struct n_flow * get_n_flow_by_fd(int fd) { struct list_head * pos = NULL; - list_for_each(pos, &fmgr->n_flows) { - struct n_flow * e = - list_entry(pos, struct n_flow, next); - if (e->flow.port_id == port_id) + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); + if (e->fd == fd) return e; } @@ -307,9 +295,8 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) { struct list_head * pos = NULL; - list_for_each(pos, &fmgr->n_flows) { - struct n_flow * e = - list_entry(pos, struct n_flow, next); + list_for_each(pos, &fmgr.n_flows) { + struct n_flow * e = list_entry(pos, struct n_flow, next); if (e->frct_i == frct_i) return e; } @@ -317,8 +304,7 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i) return NULL; } -int fmgr_flow_alloc(pid_t n_api, - int port_id, +int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) @@ -355,49 +341,40 @@ int fmgr_flow_alloc(pid_t n_api, flow_alloc_msg__pack(&msg, buf.data); - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); frct_i = frct_i_create(address, &buf, qos); if (frct_i == NULL) { free(buf.data); free(flow); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } free(buf.data); - flow->flow.rb = shm_ap_rbuff_open_s(n_api); - if (flow->flow.rb == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - free(flow); - return -1; - } - - flow->flow.api = n_api; - flow->flow.port_id = port_id; - flow->flow.state = FLOW_PENDING; + flow->fd = fd; flow->frct_i = frct_i; - flow->qos = qos; + flow->qos = qos; INIT_LIST_HEAD(&flow->next); - list_add(&flow->next, &fmgr->n_flows); + list_add(&flow->next, &fmgr.n_flows); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return 0; } /* Call under n_flows lock */ -static int n_flow_dealloc(int port_id) +static int n_flow_dealloc(int fd) { struct n_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; - flow = get_n_flow_by_port_id(port_id); + flow = get_n_flow_by_fd(fd); if (flow == NULL) return -1; @@ -414,8 +391,6 @@ static int n_flow_dealloc(int port_id) flow_alloc_msg__pack(&msg, buf.data); ret = frct_i_destroy(flow->frct_i, &buf); - if (flow->flow.rb != NULL) - shm_ap_rbuff_close(flow->flow.rb); list_del(&flow->next); free(flow); @@ -424,25 +399,17 @@ static int n_flow_dealloc(int port_id) return ret; } -int fmgr_flow_alloc_resp(pid_t n_api, - int port_id, - int response) +int fmgr_flow_alloc_resp(int fd, int response) { struct n_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); - flow = get_n_flow_by_port_id(port_id); + flow = get_n_flow_by_fd(fd); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } - - if (flow->flow.state != FLOW_PENDING) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - LOG_ERR("Flow is not pending."); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } @@ -452,13 +419,13 @@ int fmgr_flow_alloc_resp(pid_t n_api, buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } buf.data = malloc(buf.len); if (buf.data == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return -1; } @@ -469,106 +436,85 @@ int fmgr_flow_alloc_resp(pid_t n_api, free(buf.data); list_del(&flow->next); free(flow); - } else { - if (frct_i_accept(flow->frct_i, &buf)) { - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } - - flow->flow.state = FLOW_ALLOCATED; - flow->flow.api = n_api; - - flow->flow.rb = shm_ap_rbuff_open_s(n_api); - if (flow->flow.rb == NULL) { - n_flow_dealloc(port_id); - pthread_mutex_unlock(&fmgr->n_flows_lock); - return -1; - } + } else if (frct_i_accept(flow->frct_i, &buf)) { + pthread_mutex_unlock(&fmgr.n_flows_lock); + return -1; } - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); return 0; } -int fmgr_flow_dealloc(int port_id) +int fmgr_flow_dealloc(int fd) { int ret; - pthread_mutex_lock(&fmgr->n_flows_lock); - ret = n_flow_dealloc(port_id); - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); + ret = n_flow_dealloc(fd); + pthread_mutex_unlock(&fmgr.n_flows_lock); return ret; } -int fmgr_flow_alloc_msg(struct frct_i * frct_i, - buffer_t * buf) +int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf) { struct n_flow * flow; int ret = 0; - int port_id; + int fd; flow_alloc_msg_t * msg; - pthread_mutex_lock(&fmgr->n_flows_lock); + pthread_mutex_lock(&fmgr.n_flows_lock); - /* Depending on what is in the message call the function in ipcp.h */ + /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: - flow = malloc(sizeof(*flow)); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); return -1; } - flow->flow.state = FLOW_PENDING; flow->frct_i = frct_i; flow->qos = msg->qos_cube; - flow->flow.rb = NULL; - flow->flow.api = 0; - - port_id = ipcp_flow_req_arr(getpid(), - msg->dst_name, - msg->src_ae_name); - if (port_id < 0) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + + fd = ipcp_flow_req_arr(getpid(), + msg->dst_name, + msg->src_ae_name); + if (fd < 0) { + pthread_mutex_unlock(&fmgr.n_flows_lock); free(flow); flow_alloc_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to get port-id from IRMd."); + LOG_ERR("Failed to get fd for flow."); return -1; } - flow->flow.port_id = port_id; + flow->fd = fd; INIT_LIST_HEAD(&flow->next); - list_add(&flow->next, &fmgr->n_flows); + list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: flow = get_n_flow_by_frct_i(frct_i); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - ret = ipcp_flow_alloc_reply(getpid(), - flow->flow.port_id, - msg->response); - + ret = ipcp_flow_alloc_reply(flow->fd, msg->response); if (msg->response < 0) { - shm_ap_rbuff_close(flow->flow.rb); list_del(&flow->next); free(flow); } @@ -577,13 +523,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, case FLOW_ALLOC_CODE__FLOW_DEALLOC: flow = get_n_flow_by_frct_i(frct_i); if (flow == NULL) { - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - ret = irm_flow_dealloc(flow->flow.port_id); + ret = flow_dealloc(flow->fd); break; default: LOG_ERR("Got an unknown flow allocation message."); @@ -591,7 +537,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, break; } - pthread_mutex_unlock(&fmgr->n_flows_lock); + pthread_mutex_unlock(&fmgr.n_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 342410ca..7e3ef5f4 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -35,25 +35,25 @@ #define DT_AE "Data transfer" int fmgr_init(); + int fmgr_fini(); /* N-flow ops */ int fmgr_mgmt_flow(char * dst_name); + int fmgr_dt_flow(char * dst_name, enum qos_cube qos); /* N+1-flow ops, local */ -int fmgr_flow_alloc(pid_t n_api, - int port_id, +int fmgr_flow_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos); -int fmgr_flow_alloc_resp(pid_t n_api, - int port_id, - int response); +int fmgr_flow_alloc_resp(int fd, + int response); -int fmgr_flow_dealloc(int port_id); +int fmgr_flow_dealloc(int fd); /* N+1-flow ops, remote */ int fmgr_flow_alloc_msg(struct frct_i * frct_i, diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 09873445..0ee87004 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -24,7 +24,7 @@ #define OUROBOROS_IPCP_FRCT_H #include <ouroboros/shared.h> -#include <ouroboros/common.h> +#include <ouroboros/utils.h> #include "dt_const.h" diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 082973f4..4611408d 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -24,10 +24,8 @@ #include <ouroboros/config.h> #include <ouroboros/logs.h> -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/dev.h> -#include <ouroboros/ipcp.h> +#include <ouroboros/ipcp-dev.h> #include <ouroboros/time_utils.h> #include <stdbool.h> @@ -47,26 +45,8 @@ /* global for trapping signal */ int irmd_api; -struct ipcp * _ipcp; - -#define normal_data(type) ((struct normal_ipcp_data *) type->data) - -struct normal_ipcp_data { - /* Keep ipcp_data first for polymorphism. */ - struct ipcp_data ipcp_data; - - struct shm_rdrbuff * rdrb; - struct shm_ap_rbuff * rb; - - pthread_t mainloop; -}; - void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - switch(sig) { case SIGINT: case SIGTERM: @@ -75,11 +55,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) LOG_DBG("IPCP %d terminating by order of %d. Bye.", getpid(), info->si_pid); - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - ipcp_set_state(_ipcp, IPCP_SHUTDOWN); + ipcp_set_state(IPCP_SHUTDOWN); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); } default: return; @@ -88,15 +68,15 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) static int normal_ipcp_name_reg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_data_add_reg_entry(_ipcp->data, name)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(ipcpi.data, name)) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to add %s to local registry.", name); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Registered %s.", name); @@ -105,11 +85,11 @@ static int normal_ipcp_name_reg(char * name) static int normal_ipcp_name_unreg(char * name) { - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - ipcp_data_del_reg_entry(_ipcp->data, name); + ipcp_data_del_reg_entry(ipcpi.data, name); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -119,59 +99,59 @@ static int normal_ipcp_enroll(char * dif_name) struct timespec timeout = {(ENROLL_TIMEOUT / 1000), (ENROLL_TIMEOUT % 1000) * MILLION}; - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Won't enroll an IPCP that is not in INIT."); return -1; /* -ENOTINIT */ } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (fmgr_mgmt_flow(dif_name)) { LOG_ERR("Failed to establish management flow."); return -1; } - if (ipcp_wait_state(_ipcp, IPCP_ENROLLED, &timeout) == -ETIMEDOUT) { + if (ipcp_wait_state(IPCP_ENROLLED, &timeout) == -ETIMEDOUT) { LOG_ERR("Enrollment timed out."); return -1; } - pthread_rwlock_rdlock(&_ipcp->state_lock); + pthread_rwlock_rdlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_ENROLLED) { + pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } static int normal_ipcp_bootstrap(struct dif_config * conf) { - pthread_rwlock_wrlock(&_ipcp->state_lock); + pthread_rwlock_wrlock(&ipcpi.state_lock); - if (ipcp_get_state(_ipcp) != IPCP_INIT) { - pthread_rwlock_unlock(&_ipcp->state_lock); + if (ipcp_get_state() != IPCP_INIT) { + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Won't bootstrap an IPCP that is not in INIT."); return -1; /* -ENOTINIT */ } if (ribmgr_bootstrap(conf)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to bootstrap RIB manager."); return -1; } - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); - _ipcp->data->dif_name = conf->dif_name; + ipcpi.data->dif_name = conf->dif_name; - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_DBG("Bootstrapped in DIF %s.", conf->dif_name); @@ -188,67 +168,6 @@ static struct ipcp_ops normal_ops = { .ipcp_flow_dealloc = fmgr_flow_dealloc }; -struct normal_ipcp_data * normal_ipcp_data_create() -{ - struct normal_ipcp_data * normal_data; - enum ipcp_type ipcp_type; - - normal_data = malloc(sizeof(*normal_data)); - if (normal_data == NULL) { - LOG_ERR("Failed to allocate."); - return NULL; - } - - ipcp_type = THIS_TYPE; - if (ipcp_data_init((struct ipcp_data *) normal_data, - ipcp_type) == NULL) { - free(normal_data); - return NULL; - } - - normal_data->rdrb = shm_rdrbuff_open(); - if (normal_data->rdrb == NULL) { - free(normal_data); - return NULL; - } - - normal_data->rb = shm_ap_rbuff_create_n(); - if (normal_data->rb == NULL) { - shm_rdrbuff_close(normal_data->rdrb); - free(normal_data); - return NULL; - } - - return normal_data; -} - - -void normal_ipcp_data_destroy() -{ - int idx = 0; - - if (_ipcp == NULL) - return; - - pthread_rwlock_rdlock(&_ipcp->state_lock); - - if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN) - LOG_WARN("Cleaning up while not in shutdown."); - - /* remove all remaining sdus */ - while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0) - shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx); - - if (normal_data(_ipcp)->rdrb != NULL) - shm_rdrbuff_close(normal_data(_ipcp)->rdrb); - if (normal_data(_ipcp)->rb != NULL) - shm_ap_rbuff_close(normal_data(_ipcp)->rb); - - ipcp_data_destroy(_ipcp->data); - - pthread_rwlock_unlock(&_ipcp->state_lock); -} - int main(int argc, char * argv[]) { struct sigaction sig_act; @@ -285,56 +204,38 @@ int main(int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_instance_create(); - if (_ipcp == NULL) { - LOG_ERR("Failed to create instance."); - close_logfile(); - exit(EXIT_FAILURE); - } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); - _ipcp->data = (struct ipcp_data *) normal_ipcp_data_create(); - if (_ipcp->data == NULL) { - LOG_ERR("Failed to create instance data."); - free(_ipcp); + if (ipcp_init(THIS_TYPE, &normal_ops) < 0) { + LOG_ERR("Failed to create instance."); close_logfile(); exit(EXIT_FAILURE); } - _ipcp->ops = &normal_ops; - _ipcp->state = IPCP_INIT; + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); if (fmgr_init()) { - normal_ipcp_data_destroy(); - free(_ipcp); + ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); } if (ribmgr_init()) { - normal_ipcp_data_destroy(); fmgr_fini(); - free(_ipcp); + ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); } - pthread_sigmask(SIG_BLOCK, &sigset, NULL); - - pthread_create(&normal_data(_ipcp)->mainloop, NULL, - ipcp_main_loop, _ipcp); - - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); - if (ipcp_create_r(getpid())) { LOG_ERR("Failed to notify IRMd we are initialized."); - normal_ipcp_data_destroy(); fmgr_fini(); - free(_ipcp); + ipcp_fini(); close_logfile(); exit(EXIT_FAILURE); } - pthread_join(normal_data(_ipcp)->mainloop, NULL); + ipcp_fini(); if (fmgr_fini()) LOG_ERR("Failed to finalize flow manager."); @@ -345,10 +246,9 @@ int main(int argc, char * argv[]) if (frct_fini()) LOG_ERR("Failed to finalize FRCT."); - normal_ipcp_data_destroy(); - free(_ipcp); close_logfile(); ap_fini(); + exit(EXIT_SUCCESS); } diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 9733abc9..99d156f5 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -27,6 +27,7 @@ #include <ouroboros/cdap.h> #include <ouroboros/list.h> #include <ouroboros/time_utils.h> +#include <ouroboros/ipcp-dev.h> #include <stdlib.h> #include <pthread.h> @@ -45,15 +46,13 @@ typedef StaticInfoMsg static_info_msg_t; #define ENROLLMENT "enrollment" #define STATIC_INFO "static DIF information" -extern struct ipcp * _ipcp; - struct mgmt_flow { struct cdap * instance; int fd; struct list_head next; }; -struct rib { +struct { struct dt_const dtc; uint32_t address; @@ -63,7 +62,7 @@ struct rib { struct list_head cdap_reqs; pthread_mutex_t cdap_reqs_lock; -} * rib = NULL; +} rib; /* Call while holding cdap_reqs_lock */ /* FIXME: better not to call blocking functions under any lock */ @@ -84,13 +83,13 @@ int cdap_result_wait(struct cdap * instance, return -1; } - list_add(&req->next, &rib->cdap_reqs); + list_add(&req->next, &rib.cdap_reqs); - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); ret = cdap_request_wait(req); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); if (ret == -1) /* should only be on ipcp shutdown */ LOG_DBG("Waiting CDAP request destroyed."); @@ -112,22 +111,16 @@ int cdap_result_wait(struct cdap * instance, int ribmgr_init() { - rib = malloc(sizeof(*rib)); - if (rib == NULL) - return -1; + INIT_LIST_HEAD(&rib.flows); + INIT_LIST_HEAD(&rib.cdap_reqs); - INIT_LIST_HEAD(&rib->flows); - INIT_LIST_HEAD(&rib->cdap_reqs); - - if (pthread_rwlock_init(&rib->flows_lock, NULL)) { + if (pthread_rwlock_init(&rib.flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); - free(rib); return -1; } - if (pthread_mutex_init(&rib->cdap_reqs_lock, NULL)) { + if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { LOG_ERR("Failed to initialize mutex."); - free(rib); return -1; } @@ -139,19 +132,18 @@ int ribmgr_fini() struct list_head * pos = NULL; struct list_head * n = NULL; - pthread_mutex_lock(&rib->cdap_reqs_lock); - list_for_each_safe(pos, n, &rib->cdap_reqs) { + pthread_mutex_lock(&rib.cdap_reqs_lock); + list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); - free(req->name); list_del(&req->next); free(req); } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); - pthread_rwlock_wrlock(&rib->flows_lock); - list_for_each_safe(pos, n, &rib->flows) { + pthread_rwlock_wrlock(&rib.flows_lock); + list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (cdap_destroy(flow->instance)) @@ -159,9 +151,10 @@ int ribmgr_fini() list_del(&flow->next); free(flow); } - pthread_rwlock_unlock(&rib->flows_lock); + pthread_rwlock_unlock(&rib.flows_lock); - free(rib); + pthread_mutex_destroy(&rib.cdap_reqs_lock); + pthread_rwlock_destroy(&rib.flows_lock); return 0; } @@ -174,9 +167,9 @@ int ribmgr_cdap_reply(struct cdap * instance, { struct list_head * pos, * n = NULL; - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); - list_for_each_safe(pos, n, &rib->cdap_reqs) { + list_for_each_safe(pos, n, &rib.cdap_reqs) { struct cdap_request * req = list_entry(pos, struct cdap_request, next); if (req->instance == instance && @@ -191,15 +184,15 @@ int ribmgr_cdap_reply(struct cdap * instance, "executed succesfully", req->code, req->name); - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); /* FIXME: In case of a read, update values here */ cdap_request_respond(req, result); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); } } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); return 0; } @@ -223,34 +216,34 @@ int ribmgr_cdap_write(struct cdap * instance, static_info_msg_t * msg; int ret = 0; - pthread_rwlock_wrlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL && + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, STATIC_INFO) == 0) { LOG_DBG("Received static DIF information."); msg = static_info_msg__unpack(NULL, len, data); if (msg == NULL) { - ipcp_set_state(_ipcp, IPCP_INIT); - pthread_rwlock_unlock(&_ipcp->state_lock); + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); LOG_ERR("Failed to unpack static info message."); return -1; } - rib->dtc.addr_size = msg->addr_size; - rib->dtc.cep_id_size = msg->cep_id_size; - rib->dtc.pdu_length_size = msg->pdu_length_size; - rib->dtc.seqno_size = msg->seqno_size; - rib->dtc.has_ttl = msg->has_ttl; - rib->dtc.has_chk = msg->has_chk; - rib->dtc.min_pdu_size = msg->min_pdu_size; - rib->dtc.max_pdu_size = msg->max_pdu_size; + rib.dtc.addr_size = msg->addr_size; + rib.dtc.cep_id_size = msg->cep_id_size; + rib.dtc.pdu_length_size = msg->pdu_length_size; + rib.dtc.seqno_size = msg->seqno_size; + rib.dtc.has_ttl = msg->has_ttl; + rib.dtc.has_chk = msg->has_chk; + rib.dtc.min_pdu_size = msg->min_pdu_size; + rib.dtc.max_pdu_size = msg->max_pdu_size; - rib->address = msg->address; + rib.address = msg->address; - if (frct_init(&rib->dtc, rib->address)) { - ipcp_set_state(_ipcp, IPCP_INIT); - pthread_rwlock_unlock(&_ipcp->state_lock); + if (frct_init(&rib.dtc, rib.address)) { + ipcp_set_state(IPCP_INIT); + pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); static_info_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to init FRCT"); @@ -262,7 +255,7 @@ int ribmgr_cdap_write(struct cdap * instance, ret = -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { LOG_ERR("Failed to send reply to write request."); @@ -303,39 +296,39 @@ int ribmgr_cdap_start(struct cdap * instance, size_t len = 0; int iid = 0; - pthread_rwlock_wrlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_ENROLLED && + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_ENROLLED && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("New enrollment request."); if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to enrollment request."); return -1; } - stat_info.addr_size = rib->dtc.addr_size; - stat_info.cep_id_size = rib->dtc.cep_id_size; - stat_info.pdu_length_size = rib->dtc.pdu_length_size; - stat_info.seqno_size = rib->dtc.seqno_size; - stat_info.has_ttl = rib->dtc.has_ttl; - stat_info.has_chk = rib->dtc.has_chk; - stat_info.min_pdu_size = rib->dtc.min_pdu_size; - stat_info.max_pdu_size = rib->dtc.max_pdu_size; + stat_info.addr_size = rib.dtc.addr_size; + stat_info.cep_id_size = rib.dtc.cep_id_size; + stat_info.pdu_length_size = rib.dtc.pdu_length_size; + stat_info.seqno_size = rib.dtc.seqno_size; + stat_info.has_ttl = rib.dtc.has_ttl; + stat_info.has_chk = rib.dtc.has_chk; + stat_info.min_pdu_size = rib.dtc.min_pdu_size; + stat_info.max_pdu_size = rib.dtc.max_pdu_size; /* FIXME: Hand out an address. */ stat_info.address = 0; len = static_info_msg__get_packed_size(&stat_info); if (len == 0) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to get size of static information."); return -1; } data = malloc(len); if (data == NULL) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to allocate memory."); return -1; } @@ -344,59 +337,59 @@ int ribmgr_cdap_start(struct cdap * instance, LOG_DBGF("Sending static info..."); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_write(instance, STATIC_INFO, data, len, 0); if (iid < 0) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Failed to send static information."); return -1; } if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Remote did not receive static information."); return -1; } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); /* FIXME: Send neighbors here. */ LOG_DBGF("Sending stop enrollment..."); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_stop(instance, ENROLLMENT); if (iid < 0) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Failed to send stop of enrollment."); return -1; } if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); free(data); LOG_ERR("Remote failed to complete enrollment."); return -1; } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); free(data); } else { if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to start request."); return -1; } } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -407,21 +400,21 @@ int ribmgr_cdap_stop(struct cdap * instance, { int ret = 0; - pthread_rwlock_wrlock(&_ipcp->state_lock); - if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL && + pthread_rwlock_wrlock(&ipcpi.state_lock); + if (ipcp_get_state() == IPCP_PENDING_ENROLL && strcmp(name, ENROLLMENT) == 0) { LOG_DBG("Stop enrollment received."); - ipcp_set_state(_ipcp, IPCP_ENROLLED); + ipcp_set_state(IPCP_ENROLLED); } else ret = -1; if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to send reply to stop request."); return -1; } - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -457,19 +450,18 @@ int ribmgr_add_flow(int fd) flow->instance = instance; flow->fd = fd; - pthread_rwlock_wrlock(&_ipcp->state_lock); - pthread_rwlock_wrlock(&rib->flows_lock); - if (list_empty(&rib->flows) && - ipcp_get_state(_ipcp) == IPCP_INIT) { - ipcp_set_state(_ipcp, IPCP_PENDING_ENROLL); + pthread_rwlock_wrlock(&ipcpi.state_lock); + pthread_rwlock_wrlock(&rib.flows_lock); + if (list_empty(&rib.flows) && ipcp_get_state() == IPCP_INIT) { + ipcp_set_state(IPCP_PENDING_ENROLL); - pthread_mutex_lock(&rib->cdap_reqs_lock); + pthread_mutex_lock(&rib.cdap_reqs_lock); iid = cdap_send_start(instance, ENROLLMENT); if (iid < 0) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&rib->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); @@ -477,20 +469,20 @@ int ribmgr_add_flow(int fd) } if (cdap_result_wait(instance, START, ENROLLMENT, iid)) { - pthread_mutex_unlock(&rib->cdap_reqs_lock); - pthread_rwlock_unlock(&rib->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); + pthread_rwlock_unlock(&rib.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); LOG_ERR("Failed to start enrollment."); cdap_destroy(instance); free(flow); return -1; } - pthread_mutex_unlock(&rib->cdap_reqs_lock); + pthread_mutex_unlock(&rib.cdap_reqs_lock); } - list_add(&flow->next, &rib->flows); - pthread_rwlock_unlock(&rib->flows_lock); - pthread_rwlock_unlock(&_ipcp->state_lock); + list_add(&flow->next, &rib.flows); + pthread_rwlock_unlock(&rib.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); return 0; } @@ -499,20 +491,20 @@ int ribmgr_remove_flow(int fd) { struct list_head * pos, * n = NULL; - pthread_rwlock_wrlock(&rib->flows_lock); - list_for_each_safe(pos, n, &rib->flows) { + pthread_rwlock_wrlock(&rib.flows_lock); + list_for_each_safe(pos, n, &rib.flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (flow->fd == fd) { if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); list_del(&flow->next); - pthread_rwlock_unlock(&rib->flows_lock); + pthread_rwlock_unlock(&rib.flows_lock); free(flow); return 0; } } - pthread_rwlock_unlock(&rib->flows_lock); + pthread_rwlock_unlock(&rib.flows_lock); return -1; } @@ -525,19 +517,19 @@ int ribmgr_bootstrap(struct dif_config * conf) return -1; } - rib->dtc.addr_size = conf->addr_size; - rib->dtc.cep_id_size = conf->cep_id_size; - rib->dtc.pdu_length_size = conf->pdu_length_size; - rib->dtc.seqno_size = conf->seqno_size; - rib->dtc.has_ttl = conf->has_ttl; - rib->dtc.has_chk = conf->has_chk; - rib->dtc.min_pdu_size = conf->min_pdu_size; - rib->dtc.max_pdu_size = conf->max_pdu_size; + rib.dtc.addr_size = conf->addr_size; + rib.dtc.cep_id_size = conf->cep_id_size; + rib.dtc.pdu_length_size = conf->pdu_length_size; + rib.dtc.seqno_size = conf->seqno_size; + rib.dtc.has_ttl = conf->has_ttl; + rib.dtc.has_chk = conf->has_chk; + rib.dtc.min_pdu_size = conf->min_pdu_size; + rib.dtc.max_pdu_size = conf->max_pdu_size; /* FIXME: Set correct address. */ - rib->address = 0; + rib.address = 0; - if (frct_init(&rib->dtc, rib->address)) { + if (frct_init(&rib.dtc, rib.address)) { LOG_ERR("Failed to initialize FRCT."); return -1; } |