From f0167930862e57a2aa22520cd574f0368cb1032c Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Mon, 9 Jan 2017 16:06:48 +0100 Subject: ipcpd: normal: Add graph adjacency manager This commit adds the graph adjacency manager to the normal IPCP, which sets up N-1 flows to other members. --- src/ipcpd/normal/CMakeLists.txt | 1 + src/ipcpd/normal/fmgr.c | 197 ++++++++++++++--------- src/ipcpd/normal/fmgr.h | 10 +- src/ipcpd/normal/gam.c | 339 ++++++++++++++++++++++++++++++++++++++++ src/ipcpd/normal/gam.h | 44 ++++++ src/ipcpd/normal/main.c | 11 +- src/ipcpd/normal/ribmgr.c | 6 +- 7 files changed, 523 insertions(+), 85 deletions(-) create mode 100644 src/ipcpd/normal/gam.c create mode 100644 src/ipcpd/normal/gam.h (limited to 'src/ipcpd/normal') diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index bdcb78ae..43059c3e 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -29,6 +29,7 @@ set(SOURCE_FILES dir.c fmgr.c frct.c + gam.c main.c pathname.c pff.c diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index a419e9f5..d839cf1b 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -42,15 +43,24 @@ #include "dir.h" #include "pathname.h" #include "ro.h" +#include "gam.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ +struct nm1_flow { + struct list_head next; + int fd; + qosspec_t qs; + struct cacep_info * info; +}; + struct { flow_set_t * nm1_set[QOS_CUBE_MAX]; fqueue_t * nm1_fqs[QOS_CUBE_MAX]; + struct list_head nm1_flows; pthread_rwlock_t nm1_flows_lock; flow_set_t * np1_set[QOS_CUBE_MAX]; @@ -60,21 +70,23 @@ struct { cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - pthread_t nm1_sdu_reader; pthread_t np1_sdu_reader; + pthread_t nm1_sdu_reader; + pthread_t nm1_flow_wait; /* FIXME: Replace with PFF */ int fd; + + struct gam * gam; } fmgr; static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - int fd; - - int i = 0; - int ret; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + int fd; + int i = 0; + int ret; (void) o; @@ -118,12 +130,12 @@ static void * fmgr_np1_sdu_reader(void * o) void * fmgr_nm1_sdu_reader(void * o) { - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct shm_du_buff * sdb; - struct pci * pci; - int fd; - int i = 0; - int ret; + struct pci * pci; + int fd; + int i = 0; + int ret; (void) o; @@ -202,6 +214,49 @@ void * fmgr_nm1_sdu_reader(void * o) return (void *) 0; } +static void * fmgr_nm1_flow_wait(void * o) +{ + qoscube_t cube; + struct cacep_info * info; + int fd; + qosspec_t qs; + struct nm1_flow * flow; + + (void) o; + + while (true) { + if (gam_flow_wait(fmgr.gam, &fd, &info, &qs)) { + LOG_ERR("Failed to get next flow descriptor."); + continue;; + } + + ipcp_flow_get_qoscube(fd, &cube); + flow_set_add(fmgr.nm1_set[cube], fd); + + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; + + pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); + flow = malloc(sizeof(*flow)); + if (flow == NULL) { + free(info); + pthread_rwlock_unlock(&fmgr.nm1_flows_lock); + continue; + } + + flow->info = info; + flow->fd = fd; + flow->qs = qs; + + INIT_LIST_HEAD(&flow->next); + list_add(&flow->next, &fmgr.nm1_flows); + + pthread_rwlock_unlock(&fmgr.nm1_flows_lock); + } + + return (void *) 0; +} + static void fmgr_destroy_flows(void) { int i; @@ -224,9 +279,6 @@ int fmgr_init() for (i = 0; i < IPCPD_MAX_CONNS; ++i) fmgr.np1_cep_id_to_fd[i] = -1; - pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); - pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); - for (i = 0; i < QOS_CUBE_MAX; ++i) { fmgr.np1_set[i] = flow_set_create(); if (fmgr.np1_set[i] == NULL) { @@ -253,29 +305,55 @@ int fmgr_init() } } + fmgr.gam = gam_create(DT_AE); + if (fmgr.gam == NULL) { + LOG_ERR("Failed to create graph adjacency manager."); + fmgr_destroy_flows(); + return -1; + } + + INIT_LIST_HEAD(&fmgr.nm1_flows); + + pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); + pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); + pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); + pthread_create(&fmgr.nm1_flow_wait, NULL, fmgr_nm1_flow_wait, NULL); return 0; } int fmgr_fini() { - int i; - int j; + struct list_head * pos = NULL; + struct list_head * n = NULL; + qoscube_t cube; pthread_cancel(fmgr.np1_sdu_reader); pthread_cancel(fmgr.nm1_sdu_reader); + pthread_cancel(fmgr.nm1_flow_wait); pthread_join(fmgr.np1_sdu_reader, NULL); pthread_join(fmgr.nm1_sdu_reader, NULL); + pthread_join(fmgr.nm1_flow_wait, NULL); - for (i = 0; i < AP_MAX_FLOWS; ++i) - for (j = 0; j < QOS_CUBE_MAX; ++j) - if (flow_set_has(fmgr.nm1_set[j], i)) { - flow_dealloc(i); - flow_set_del(fmgr.nm1_set[j], i); - } + gam_destroy(fmgr.gam); + + pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); + + list_for_each_safe(pos, n, &fmgr.nm1_flows) { + struct nm1_flow * flow = + list_entry(pos, struct nm1_flow, next); + list_del(&flow->next); + flow_dealloc(flow->fd); + ipcp_flow_get_qoscube(flow->fd, &cube); + flow_set_del(fmgr.nm1_set[cube], flow->fd); + free(flow->info); + free(flow); + } + + pthread_rwlock_unlock(&fmgr.nm1_flows_lock); pthread_rwlock_destroy(&fmgr.nm1_flows_lock); pthread_rwlock_destroy(&fmgr.np1_flows_lock); @@ -290,12 +368,12 @@ int fmgr_np1_alloc(int fd, char * src_ae_name, qoscube_t cube) { - cep_id_t cep_id; - buffer_t buf; + cep_id_t cep_id; + buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - char * path; - uint8_t * ro_data; - uint64_t addr; + char * path; + uint8_t * ro_data; + uint64_t addr; path = pathname_create(RO_DIR); if (path == NULL) @@ -359,9 +437,9 @@ int fmgr_np1_alloc(int fd, static int np1_flow_dealloc(int fd) { flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - int ret; - qoscube_t cube; + buffer_t buf; + int ret; + qoscube_t cube; ipcp_flow_get_qoscube(fd, &cube); flow_set_del(fmgr.np1_set[cube], fd); @@ -388,10 +466,11 @@ static int np1_flow_dealloc(int fd) return ret; } -int fmgr_np1_alloc_resp(int fd, int response) +int fmgr_np1_alloc_resp(int fd, + int response) { flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; + buffer_t buf; msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; msg.response = response; @@ -443,7 +522,8 @@ int fmgr_np1_dealloc(int fd) return ret; } -int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, + buffer_t * buf) { int ret = 0; int fd; @@ -512,7 +592,8 @@ int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf) return ret; } -int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) +int fmgr_np1_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) { int fd; @@ -530,52 +611,21 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb) return 0; } -/* FIXME: do this in a topologymanager instance */ -int fmgr_nm1_add_flow(int fd) +int fmgr_nm1_flow_arr(int fd, + qosspec_t qs) { - qoscube_t qos; + assert(fmgr.gam); - if (flow_alloc_resp(fd, 0) < 0) { - LOG_ERR("Could not respond to new flow."); + if (gam_flow_arr(fmgr.gam, fd, qs)) { + LOG_ERR("Failed to hand to connectivy manager."); return -1; } - ipcp_flow_get_qoscube(fd, &qos); - flow_set_add(fmgr.nm1_set[qos], fd); - - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - - return 0; -} - -int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos) -{ - int fd; - int result; - - /* FIXME: Map qos cube on correct QoS. */ - fd = flow_alloc(dst_name, DT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s.", dst_name); - return -1; - } - - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Allocate flow to %s result %d.", dst_name, result); - return -1; - } - - flow_set_add(fmgr.nm1_set[qos], fd); - - /* FIXME: Temporary, until we have a PFF */ - fmgr.fd = fd; - return 0; } -int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb) +int fmgr_nm1_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) { if (pci == NULL || sdb == NULL) return -1; @@ -595,7 +645,8 @@ int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb) return 0; } -int fmgr_nm1_write_buf(struct pci * pci, buffer_t * buf) +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf) { buffer_t * buffer; diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 85731081..ae5c8ea8 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -23,6 +23,7 @@ #define OUROBOROS_IPCPD_NORMAL_FMGR_H #include +#include #include "ae.h" #include "frct.h" @@ -47,15 +48,14 @@ int fmgr_np1_post_buf(cep_id_t id, int fmgr_np1_post_sdu(cep_id_t id, struct shm_du_buff * sdb); -int fmgr_nm1_add_flow(int fd); - -int fmgr_nm1_dt_flow(char * dst_name, - qoscube_t qos); - int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb); int fmgr_nm1_write_buf(struct pci * pci, buffer_t * buf); +int fmgr_nm1_flow_arr(int fd, + qosspec_t qs); + + #endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */ diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c new file mode 100644 index 00000000..a749563d --- /dev/null +++ b/src/ipcpd/normal/gam.c @@ -0,0 +1,339 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Graph adjacency manager for IPC Process components + * + * Dimitri Staeesens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "graph-adjacency-manager" + +#include +#include +#include +#include +#include +#include + +#include "ribmgr.h" +#include "ipcp.h" +#include "ro.h" +#include "pathname.h" +#include "gam.h" + +#include +#include +#include +#include + +#define RO_DIR "neighbors" + +struct ga { + struct list_head next; + + qosspec_t qs; + int fd; + struct cacep_info * info; +}; + +struct gam { + struct list_head gas; + pthread_mutex_t gas_lock; + pthread_cond_t gas_cond; + + char * ae_name; + + /* FIXME: Keep a list of known members */ + + pthread_t allocator; +}; + +static void * allocator(void * o) +{ + qosspec_t qs; + ssize_t len; + char ** children; + struct gam * instance; + int i; + char * ro_name; + + instance = (struct gam *) o; + + qs.delay = 0; + qs.jitter = 0; + + ro_name = pathname_create(RO_DIR); + if (ro_name == NULL) + return (void *) -1; + + len = ro_children(ro_name, &children); + if (len > 0) { + for (i = 0; i < len; i++) { + if (strcmp(children[i], ipcpi.name) == 0) + continue; + gam_flow_alloc(instance, children[i], qs); + } + } + + pathname_destroy(ro_name); + + return (void *) 0; +} + +struct gam * gam_create(char * ae_name) +{ + struct gam * tmp; + struct ro_attr attr; + char * ro_name; + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + return NULL; + + INIT_LIST_HEAD(&tmp->gas); + + tmp->ae_name = strdup(ae_name); + if (tmp->ae_name == NULL) { + free(tmp); + return NULL; + } + + if (pthread_mutex_init(&tmp->gas_lock, NULL)) { + free(tmp->ae_name); + free(tmp); + return NULL; + } + + if (pthread_cond_init(&tmp->gas_cond, NULL)) { + pthread_mutex_destroy(&tmp->gas_lock); + free(tmp->ae_name); + free(tmp); + return NULL; + } + + ro_attr_init(&attr); + attr.enrol_sync = true; + attr.recv_set = ALL_MEMBERS; + + ro_name = pathname_create(RO_DIR); + if (ro_name == NULL) { + pthread_mutex_destroy(&tmp->gas_lock); + free(tmp->ae_name); + free(tmp); + return NULL; + } + + if (!ro_exists(RO_DIR)) { + if (ro_create(ro_name, &attr, NULL, 0)) { + pathname_destroy(ro_name); + pthread_mutex_destroy(&tmp->gas_lock); + free(tmp->ae_name); + free(tmp); + return NULL; + } + } + + ro_name = pathname_append(ro_name, ipcpi.name); + if (ro_name == NULL) { + pathname_destroy(ro_name); + pthread_mutex_destroy(&tmp->gas_lock); + free(tmp->ae_name); + free(tmp); + return NULL; + } + + if (ro_create(ro_name, &attr, NULL, 0)) { + pathname_destroy(ro_name); + pthread_mutex_destroy(&tmp->gas_lock); + free(tmp->ae_name); + free(tmp); + return NULL; + } + pathname_destroy(ro_name); + + if (pthread_create(&tmp->allocator, NULL, allocator, (void *) tmp)) { + pthread_cond_destroy(&tmp->gas_cond); + pthread_mutex_destroy(&tmp->gas_lock); + free(tmp->ae_name); + free(tmp); + return NULL; + } + + return tmp; +} + +void gam_destroy(struct gam * instance) +{ + struct list_head * p = NULL; + struct list_head * n = NULL; + + assert(instance); + + pthread_cancel(instance->allocator); + pthread_join(instance->allocator, NULL); + + pthread_mutex_destroy(&instance->gas_lock); + pthread_cond_destroy(&instance->gas_cond); + + list_for_each_safe(p, n, &instance->gas) { + struct ga * e = list_entry(p, struct ga, next); + list_del(&e->next); + free(e->info); + free(e); + } + + free(instance->ae_name); + free(instance); +} + +static int add_ga(struct gam * instance, + int fd, + qosspec_t qs, + struct cacep_info * info) +{ + struct ga * ga; + + ga = malloc(sizeof(*ga)); + if (ga == NULL) + return -ENOMEM; + + ga->fd = fd; + ga->info = info; + ga->qs = qs; + + INIT_LIST_HEAD(&ga->next); + + pthread_mutex_lock(&instance->gas_lock); + list_add(&ga->next, &instance->gas); + pthread_cond_signal(&instance->gas_cond); + pthread_mutex_unlock(&instance->gas_lock); + + return 0; +} + +int gam_flow_arr(struct gam * instance, + int fd, + qosspec_t qs) +{ + struct cacep * cacep; + struct cacep_info * info; + + if (flow_alloc_resp(fd, 0) < 0) { + LOG_ERR("Could not respond to new flow."); + return -1; + } + + cacep = cacep_create(fd, ipcpi.name, ribmgr_address()); + if (cacep == NULL) { + LOG_ERR("Failed to create CACEP instance."); + return -1; + } + + info = cacep_auth_wait(cacep); + if (info == NULL) { + LOG_ERR("Other side failed to authenticate."); + cacep_destroy(cacep); + return -1; + } + cacep_destroy(cacep); + + if (add_ga(instance, fd, qs, info)) { + LOG_ERR("Failed to add ga to graph adjacency manager list."); + free(info); + return -1; + } + + return 0; +} + +int gam_flow_alloc(struct gam * instance, + char * dst_name, + qosspec_t qs) +{ + struct cacep * cacep; + struct cacep_info * info; + int fd; + + fd = flow_alloc(dst_name, instance->ae_name, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s.", dst_name); + return -1; + } + + if (flow_alloc_res(fd)) { + LOG_ERR("Flow allocation to %s failed.", dst_name); + flow_dealloc(fd); + return -1; + } + + cacep = cacep_create(fd, ipcpi.name, ribmgr_address()); + if (cacep == NULL) { + LOG_ERR("Failed to create CACEP instance."); + return -1; + } + + info = cacep_auth(cacep); + if (info == NULL) { + LOG_ERR("Failed to authenticate."); + cacep_destroy(cacep); + return -1; + } + cacep_destroy(cacep); + + if (add_ga(instance, fd, qs, info)) { + LOG_ERR("Failed to add ga to graph adjacency manager list."); + free(info); + return -1; + } + + return 0; +} + +int gam_flow_wait(struct gam * instance, + int * fd, + struct cacep_info ** info, + qosspec_t * qs) +{ + struct ga * ga; + + assert(fd); + assert(info); + assert(qs); + + pthread_mutex_lock(&instance->gas_lock); + + while (list_empty(&instance->gas)) + pthread_cond_wait(&instance->gas_cond, &instance->gas_lock); + + ga = list_first_entry((&instance->gas), struct ga, next); + if (ga == NULL) { + pthread_mutex_unlock(&instance->gas_lock); + LOG_ERR("Ga was NULL."); + return -1; + } + + *fd = ga->fd; + *info = ga->info; + *qs = ga->qs; + + list_del(&ga->next); + free(ga); + + pthread_mutex_unlock(&instance->gas_lock); + + return 0; +} diff --git a/src/ipcpd/normal/gam.h b/src/ipcpd/normal/gam.h new file mode 100644 index 00000000..309cb46d --- /dev/null +++ b/src/ipcpd/normal/gam.h @@ -0,0 +1,44 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Graph adjacency manager for IPC Process components + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_GAM_H +#define OUROBOROS_IPCPD_NORMAL_GAM_H + +/* FIXME: Will take a policy */ +struct gam * gam_create(char * ae_name); + +void gam_destroy(struct gam * instance); + +int gam_flow_arr(struct gam * instance, + int fd, + qosspec_t qs); + +int gam_flow_alloc(struct gam * instance, + char * dst_name, + qosspec_t qs); + +int gam_flow_wait(struct gam * instance, + int * fd, + struct cacep_info ** info, + qosspec_t * qs); + +#endif diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 94f463af..85f56ab0 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -48,7 +48,9 @@ int irmd_api; pthread_t acceptor; -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +void ipcp_sig_handler(int sig, + siginfo_t * info, + void * c) { (void) c; @@ -102,7 +104,7 @@ static void * flow_acceptor(void * o) if (strcmp(ae_name, MGMT_AE) == 0) { ribmgr_add_nm1_flow(fd); } else if (strcmp(ae_name, DT_AE) == 0) { - fmgr_nm1_add_flow(fd); + fmgr_nm1_flow_arr(fd, qs); } else { LOG_DBG("Flow allocation request for unknown AE %s.", ae_name); @@ -291,10 +293,11 @@ static struct ipcp_ops normal_ops = { .ipcp_flow_dealloc = fmgr_np1_dealloc }; -int main(int argc, char * argv[]) +int main(int argc, + char * argv[]) { struct sigaction sig_act; - sigset_t sigset; + sigset_t sigset; if (ap_init(argv[0])) { LOG_ERR("Failed to init AP"); diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index e52db08a..f2c4cda2 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -1577,8 +1577,8 @@ ssize_t ro_children(const char * name, char *** children) } child = node->child; - **children = malloc(len); - if (**children == NULL) { + *children = malloc(len); + if (*children == NULL) { pthread_mutex_unlock(&rib.ro_lock); return -1; } @@ -1590,7 +1590,7 @@ ssize_t ro_children(const char * name, char *** children) free((*children)[i]); i--; } - free(**children); + free(*children); pthread_mutex_unlock(&rib.ro_lock); return -1; } -- cgit v1.2.3