From 0c0508619959b5ac98b31b4389fcfadf5ee26d9b Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 27 Jul 2016 16:46:23 +0200 Subject: ipcpd: normal: Provide initial steps for enrollment This provides the normal IPCP with bootstrapping and the initial steps for enrollment. Next step is actually reacting to an enrollment request and sending the data transfer constants. --- src/ipcpd/normal/frct.c | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) (limited to 'src/ipcpd/normal/frct.c') diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 22f8a9fc..49006276 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -22,6 +22,8 @@ #define OUROBOROS_PREFIX "flow-rtx-control" +#include + #include #include "frct.h" @@ -30,16 +32,29 @@ struct frct_i { }; -int frct_init(struct dt_const * dt_const) +struct frct { + struct dt_const * dtc; + +} * frct = NULL; + +int frct_init(struct dt_const * dtc) { - LOG_MISSING; + if (dtc == NULL) + return -1; + + frct = malloc(sizeof(*frct)); + if (frct == NULL) { + return -1; + } + + frct->dtc = dtc; return 0; } int frct_fini() { - LOG_MISSING; + free(frct); return 0; } -- cgit v1.2.3 From cf719963be2e42026012e152ae49f4c764dd9b4f Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Thu, 28 Jul 2016 17:23:42 +0200 Subject: ipcpd: normal: Allow initiating enrollment This will add more functionality for enrolling two normal IPCPs with each other. Some bugs were fixed in CDAP. Now on enrolling, an IPCP will send a START message to the other IPCP. Next step is syncing the RIBs. --- src/ipcpd/ipcp.h | 1 + src/ipcpd/normal/fmgr.c | 42 ++++++++++++++++++++++++--------------- src/ipcpd/normal/fmgr.h | 3 --- src/ipcpd/normal/frct.c | 5 +---- src/ipcpd/normal/main.c | 12 ++++++------ src/ipcpd/normal/ribmgr.c | 50 +++++++++++++++++++++++++++++++++++------------ src/ipcpd/normal/ribmgr.h | 7 +++---- src/lib/cdap.c | 4 ++++ src/lib/shm_ap_rbuff.c | 2 +- 9 files changed, 80 insertions(+), 46 deletions(-) (limited to 'src/ipcpd/normal/frct.c') diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index bbf1d1f7..630f7922 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -33,6 +33,7 @@ enum ipcp_state { IPCP_INIT = 0, + IPCP_PENDING_ENROLL, IPCP_ENROLLED, IPCP_DISCONNECTED, IPCP_SHUTDOWN diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 4b3c49e6..a539b289 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -35,6 +35,9 @@ #include "fmgr.h" #include "ribmgr.h" #include "frct.h" +#include "ipcp.h" + +extern struct ipcp * _ipcp; struct n_1_flow { int fd; @@ -77,28 +80,37 @@ static void * fmgr_listen(void * o) int fd; char * ae_name; - /* FIXME: Only start to listen once we are enrolled */ + /* FIXME: Avoid busy wait and react to pthread_cond_t */ + pthread_rwlock_rdlock(&_ipcp->state_lock); + while (_ipcp->state != IPCP_ENROLLED || + _ipcp->state != IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&_ipcp->state_lock); + sched_yield(); + pthread_rwlock_rdlock(&_ipcp->state_lock); + } - while (true) { + while (_ipcp->state != IPCP_SHUTDOWN) { + pthread_rwlock_unlock(&_ipcp->state_lock); fd = flow_accept(&ae_name); if (fd < 0) { LOG_ERR("Failed to accept flow."); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } - LOG_DBG("New flow alloc request for AE %s", ae_name); - if (!(strcmp(ae_name, MGMT_AE) == 0 || strcmp(ae_name, DT_AE) == 0)) { if (flow_alloc_resp(fd, -1)) LOG_ERR("Failed to reply to flow allocation."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } if (flow_alloc_resp(fd, 0)) { LOG_ERR("Failed to reply to flow allocation."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } @@ -106,9 +118,10 @@ static void * fmgr_listen(void * o) ae_name); if (strcmp(ae_name, MGMT_AE) == 0) { - if (ribmgr_mgmt_flow(fd)) { + if (ribmgr_add_flow(fd)) { LOG_ERR("Failed to hand fd to RIB."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } } @@ -117,6 +130,7 @@ static void * fmgr_listen(void * o) if (frct_dt_flow(fd)) { LOG_ERR("Failed to hand fd to FRCT."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } } @@ -124,8 +138,11 @@ static void * fmgr_listen(void * o) if (add_n_1_fd(fd, ae_name)) { LOG_ERR("Failed to add file descriptor to list."); flow_dealloc(fd); + pthread_rwlock_rdlock(&_ipcp->state_lock); continue; } + + pthread_rwlock_rdlock(&_ipcp->state_lock); } return (void *) 0; @@ -134,9 +151,8 @@ static void * fmgr_listen(void * o) int fmgr_init() { fmgr = malloc(sizeof(*fmgr)); - if (fmgr == NULL) { + if (fmgr == NULL) return -1; - } INIT_LIST_HEAD(&fmgr->n_1_flows); @@ -164,7 +180,8 @@ int fmgr_fini() list_entry(pos, struct n_1_flow, next); if (e->ae_name != NULL) free(e->ae_name); - flow_dealloc(e->fd); + if (ribmgr_remove_flow(e->fd)) + LOG_ERR("Failed to remove management flow."); } free(fmgr); @@ -191,7 +208,7 @@ int fmgr_mgmt_flow(char * dst_name) return -1; } - if (ribmgr_mgmt_flow(fd)) { + if (ribmgr_add_flow(fd)) { LOG_ERR("Failed to hand file descriptor to RIB manager"); flow_dealloc(fd); return -1; @@ -239,10 +256,3 @@ int fmgr_flow_dealloc(int port_id) return -1; } - -int fmgr_flow_msg() -{ - LOG_MISSING; - - return -1; -} diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 867cbff6..dc88bbdf 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -52,7 +52,4 @@ int fmgr_flow_alloc_resp(pid_t n_api, int fmgr_flow_dealloc(int port_id); -/* RIB Manager calls this (param will be of type fmgr_msg_t) */ -int fmgr_flow_msg(); - #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index 49006276..ba465540 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -29,12 +29,10 @@ #include "frct.h" struct frct_i { - }; struct frct { struct dt_const * dtc; - } * frct = NULL; int frct_init(struct dt_const * dtc) @@ -43,9 +41,8 @@ int frct_init(struct dt_const * dtc) return -1; frct = malloc(sizeof(*frct)); - if (frct == NULL) { + if (frct == NULL) return -1; - } frct->dtc = dtc; diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 54ebd674..57fb72df 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -80,14 +80,14 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c) pthread_cancel(normal_data(_ipcp)->mainloop); + if (fmgr_fini()) + LOG_ERR("Failed to finalize flow manager."); + if (ribmgr_fini()) LOG_ERR("Failed to finalize RIB manager."); if (frct_fini()) LOG_ERR("Failed to finalize FRCT."); - - if (fmgr_fini()) - LOG_ERR("Failed to finalize flow manager."); } default: return; @@ -138,15 +138,15 @@ static int normal_ipcp_enroll(char * dif_name) return -1; /* -ENOTINIT */ } + pthread_rwlock_unlock(&_ipcp->state_lock); + if (fmgr_mgmt_flow(dif_name)) { pthread_rwlock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to establish management flow."); return -1; } - _ipcp->state = IPCP_ENROLLED; - - pthread_rwlock_unlock(&_ipcp->state_lock); + /* FIXME: Wait until state changed to ENROLLED */ return 0; } diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 4d29b098..8bb320c0 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -34,9 +34,12 @@ #include "ribmgr.h" #include "dt_const.h" #include "frct.h" +#include "ipcp.h" #define ENROLLMENT "enrollment" +extern struct ipcp * _ipcp; + enum cdap_opcode { READ = 0, WRITE, @@ -103,11 +106,11 @@ int cdap_request_add(struct cdap * instance, int ribmgr_init() { rib = malloc(sizeof(*rib)); - if (rib == NULL) { + if (rib == NULL) return -1; - } INIT_LIST_HEAD(&rib->flows); + INIT_LIST_HEAD(&rib->cdap_reqs); if (pthread_rwlock_init(&rib->flows_lock, NULL)) { LOG_ERR("Failed to initialize rwlock."); @@ -141,11 +144,13 @@ int ribmgr_fini() pthread_mutex_unlock(&rib->cdap_reqs_lock); pthread_rwlock_wrlock(&rib->flows_lock); - list_for_each_safe(pos, n, &rib->cdap_reqs) { + list_for_each_safe(pos, n, &rib->flows) { struct mgmt_flow * flow = list_entry(pos, struct mgmt_flow, next); if (cdap_destroy(flow->instance)) LOG_ERR("Failed to destroy CDAP instance."); + list_del(&flow->next); + free(flow); } pthread_rwlock_unlock(&rib->flows_lock); @@ -232,7 +237,7 @@ static struct cdap_ops ribmgr_ops = { .cdap_stop = ribmgr_cdap_stop }; -int ribmgr_mgmt_flow(int fd) +int ribmgr_add_flow(int fd) { struct cdap * instance = NULL; struct mgmt_flow * flow; @@ -253,8 +258,14 @@ int ribmgr_mgmt_flow(int fd) flow->instance = instance; flow->fd = fd; + pthread_rwlock_rdlock(&_ipcp->state_lock); pthread_rwlock_wrlock(&rib->flows_lock); - if (list_empty(&rib->flows)) { + if (list_empty(&rib->flows) && + (_ipcp->state == IPCP_INIT || + _ipcp->state == IPCP_DISCONNECTED)) { + _ipcp->state = IPCP_PENDING_ENROLL; + pthread_rwlock_unlock(&_ipcp->state_lock); + pthread_mutex_lock(&rib->cdap_reqs_lock); iid = cdap_send_start(instance, ENROLLMENT); @@ -277,6 +288,7 @@ int ribmgr_mgmt_flow(int fd) } pthread_mutex_unlock(&rib->cdap_reqs_lock); } + pthread_rwlock_unlock(&_ipcp->state_lock); list_add(&flow->next, &rib->flows); pthread_rwlock_unlock(&rib->flows_lock); @@ -284,6 +296,27 @@ int ribmgr_mgmt_flow(int fd) return 0; } +int ribmgr_remove_flow(int fd) +{ + struct list_head * pos, * n = NULL; + + pthread_rwlock_wrlock(&rib->flows_lock); + list_for_each_safe(pos, n, &rib->flows) { + struct mgmt_flow * flow = + list_entry(pos, struct mgmt_flow, next); + if (flow->fd == fd) { + if (cdap_destroy(flow->instance)) + LOG_ERR("Failed to destroy CDAP instance."); + list_del(&flow->next); + free(flow); + return 0; + } + } + pthread_rwlock_unlock(&rib->flows_lock); + + return -1; +} + int ribmgr_bootstrap(struct dif_config * conf) { if (conf == NULL || @@ -310,10 +343,3 @@ int ribmgr_bootstrap(struct dif_config * conf) return 0; } - -int ribmgr_fmgr_msg() -{ - LOG_MISSING; - - return -1; -} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index 335189f9..e85c65be 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -28,10 +28,9 @@ int ribmgr_init(); int ribmgr_fini(); -int ribmgr_mgmt_flow(int fd); -int ribmgr_bootstrap(struct dif_config * conf); +int ribmgr_add_flow(int fd); +int ribmgr_remove_flow(int fd); -/* Called by Flow Manager (param of type fmgr_msg_t) */ -int ribmgr_fmgr_msg(); +int ribmgr_bootstrap(struct dif_config * conf); #endif diff --git a/src/lib/cdap.c b/src/lib/cdap.c index 441f7e44..4599fd8b 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -212,6 +212,7 @@ struct cdap * cdap_create(struct cdap_ops * ops, } instance->ops = ops; + instance->fd = fd; instance->ids = bmp_create(IDS_SIZE, 0); if (instance->ids == NULL) { @@ -234,6 +235,9 @@ int cdap_destroy(struct cdap * instance) pthread_cancel(instance->reader); + if (flow_dealloc(instance->fd)) + return -1; + pthread_mutex_lock(&instance->ids_lock); bmp_destroy(instance->ids); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 618c4c19..86570d98 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -296,7 +296,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) while (tail_el_ptr->port_id < 0) *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); - while(shm_rbuff_empty(rb)) + while (shm_rbuff_empty(rb)) if (pthread_cond_wait(rb->work, rb->shm_mutex) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); -- cgit v1.2.3