diff options
author | Sander Vrijders <[email protected]> | 2017-04-21 11:21:44 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2017-04-21 12:46:37 +0200 |
commit | a9d71381a84886007625958b9daea6b2d4a50563 (patch) | |
tree | 67b8576e9747d7815c7eed7170f49a10e5a4e0e0 /src/ipcpd/normal/fmgr.c | |
parent | 4bfd6c07281847405e127e9588376fcf20d07a7e (diff) | |
download | ouroboros-a9d71381a84886007625958b9daea6b2d4a50563.tar.gz ouroboros-a9d71381a84886007625958b9daea6b2d4a50563.zip |
ipcpd: normal: Split flow manager into DT and FA
This splits the flow manager into the Data Transfer AE, which is in
charge of routing SDUs, and the Flow Allocator AE, which handles flow
allocations.
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 748 |
1 files changed, 0 insertions, 748 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c deleted file mode 100644 index d055b311..00000000 --- a/src/ipcpd/normal/fmgr.c +++ /dev/null @@ -1,748 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2017 - * - * Flow manager of the IPC Process - * - * Dimitri Staessens <[email protected]> - * Sander Vrijders <[email protected]> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include <ouroboros/config.h> -#include <ouroboros/logs.h> -#include <ouroboros/dev.h> -#include <ouroboros/list.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/fqueue.h> -#include <ouroboros/errno.h> -#include <ouroboros/cacep.h> -#include <ouroboros/rib.h> - -#include "connmgr.h" -#include "fmgr.h" -#include "frct.h" -#include "ipcp.h" -#include "shm_pci.h" -#include "ribconfig.h" -#include "pff.h" -#include "neighbors.h" -#include "gam.h" -#include "routing.h" -#include "sdu_sched.h" - -#include <stdlib.h> -#include <stdbool.h> -#include <pthread.h> -#include <string.h> -#include <inttypes.h> - -#include "flow_alloc.pb-c.h" -typedef FlowAllocMsg flow_alloc_msg_t; - -#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ - -struct { - pthread_rwlock_t np1_flows_lock; - cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS]; - int np1_cep_id_to_fd[IPCPD_MAX_CONNS]; - - flow_set_t * np1_set[QOS_CUBE_MAX]; - struct sdu_sched * np1_sdu_sched; - - flow_set_t * nm1_set[QOS_CUBE_MAX]; - struct sdu_sched * nm1_sdu_sched; - - struct pff * pff[QOS_CUBE_MAX]; - struct routing_i * routing[QOS_CUBE_MAX]; - - struct gam * gam; - struct nbs * nbs; - struct ae * ae; - - struct nb_notifier nb_notifier; -} fmgr; - -static int fmgr_neighbor_event(enum nb_event event, - struct conn conn) -{ - qoscube_t cube; - - /* We are only interested in neighbors being added and removed. */ - switch (event) { - case NEIGHBOR_ADDED: - ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); - flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd); - log_dbg("Added fd %d to flow set.", conn.flow_info.fd); - break; - case NEIGHBOR_REMOVED: - ipcp_flow_get_qoscube(conn.flow_info.fd, &cube); - flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd); - log_dbg("Removed fd %d from flow set.", conn.flow_info.fd); - break; - default: - break; - } - - return 0; -} - -static int np1_sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) -{ - (void) qc; - - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - - if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - log_warn("Failed to hand SDU to FRCT."); - return -1; - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return 0; -} - -static int nm1_sdu_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) -{ - struct pci pci; - - memset(&pci, 0, sizeof(pci)); - - shm_pci_des(sdb, &pci); - - if (pci.dst_addr != ipcpi.dt_addr) { - if (pci.ttl == 0) { - log_dbg("TTL was zero."); - ipcp_flow_del(sdb); - return 0; - } - - pff_lock(fmgr.pff[qc]); - - fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[qc]); - log_err("No next hop for %" PRIu64, pci.dst_addr); - ipcp_flow_del(sdb); - return -1; - } - - pff_unlock(fmgr.pff[qc]); - - if (ipcp_flow_write(fd, sdb)) { - log_err("Failed to write SDU to fd %d.", fd); - ipcp_flow_del(sdb); - return -1; - } - } else { - shm_pci_shrink(sdb); - - if (frct_nm1_post_sdu(&pci, sdb)) { - log_err("Failed to hand PDU to FRCT."); - return -1; - } - } - - return 0; -} - -static void fmgr_destroy_flows(void) -{ - int i; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - flow_set_destroy(fmgr.nm1_set[i]); - flow_set_destroy(fmgr.np1_set[i]); - } -} - -static void fmgr_destroy_routing(void) -{ - int i; - - for (i = 0; i < QOS_CUBE_MAX; ++i) - routing_i_destroy(fmgr.routing[i]); -} - -static void fmgr_destroy_pff(void) -{ - int i; - - for (i = 0; i < QOS_CUBE_MAX; ++i) - pff_destroy(fmgr.pff[i]); -} - -int fmgr_init(void) -{ - int i; - int j; - struct conn_info info; - - for (i = 0; i < AP_MAX_FLOWS; ++i) - fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; - - for (i = 0; i < IPCPD_MAX_CONNS; ++i) - fmgr.np1_cep_id_to_fd[i] = -1; - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fmgr.np1_set[i] = flow_set_create(); - if (fmgr.np1_set[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } - - fmgr.nm1_set[i] = flow_set_create(); - if (fmgr.nm1_set[i] == NULL) { - fmgr_destroy_flows(); - return -1; - } - } - - if (shm_pci_init()) { - log_err("Failed to init shm pci."); - fmgr_destroy_flows(); - return -1; - } - - memset(&info, 0, sizeof(info)); - - strcpy(info.ae_name, DT_AE); - strcpy(info.protocol, FRCT_PROTO); - info.pref_version = 1; - info.pref_syntax = PROTO_FIXED; - info.addr = ipcpi.dt_addr; - - fmgr.ae = connmgr_ae_create(info); - if (fmgr.ae == NULL) { - log_err("Failed to create AE struct."); - fmgr_destroy_flows(); - return -1; - } - - fmgr.nbs = nbs_create(); - if (fmgr.nbs == NULL) { - log_err("Failed to create neighbors struct."); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - fmgr.nb_notifier.notify_call = fmgr_neighbor_event; - if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) { - log_err("Failed to register notifier."); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - if (routing_init(fmgr.nbs)) { - log_err("Failed to init routing."); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) { - routing_fini(); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - fmgr.pff[i] = pff_create(); - if (fmgr.pff[i] == NULL) { - for (j = 0; j < i; ++j) - pff_destroy(fmgr.pff[j]); - pthread_rwlock_destroy(&fmgr.np1_flows_lock); - routing_fini(); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - - fmgr.routing[i] = routing_i_create(fmgr.pff[i]); - if (fmgr.routing[i] == NULL) { - for (j = 0; j < i; ++j) - routing_i_destroy(fmgr.routing[j]); - fmgr_destroy_pff(); - pthread_rwlock_destroy(&fmgr.np1_flows_lock); - routing_fini(); - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - nbs_destroy(fmgr.nbs); - fmgr_destroy_flows(); - connmgr_ae_destroy(fmgr.ae); - return -1; - } - } - - return 0; -} - -void fmgr_fini() -{ - nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier); - - fmgr_destroy_routing(); - - fmgr_destroy_pff(); - - routing_fini(); - - fmgr_destroy_flows(); - - connmgr_ae_destroy(fmgr.ae); - - nbs_destroy(fmgr.nbs); -} - -int fmgr_start(void) -{ - enum pol_gam pg; - - if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg)) - != sizeof(pg)) { - log_err("Failed to read policy for ribmgr gam."); - return -1; - } - - fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae); - if (fmgr.gam == NULL) { - log_err("Failed to init dt graph adjacency manager."); - return -1; - } - - fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); - if (fmgr.nm1_sdu_sched == NULL) { - log_err("Failed to create N-1 SDU scheduler."); - gam_destroy(fmgr.gam); - return -1; - } - - fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); - if (fmgr.np1_sdu_sched == NULL) { - log_err("Failed to create N+1 SDU scheduler."); - sdu_sched_destroy(fmgr.nm1_sdu_sched); - gam_destroy(fmgr.gam); - return -1; - } - - return 0; -} - -void fmgr_stop(void) -{ - sdu_sched_destroy(fmgr.np1_sdu_sched); - - sdu_sched_destroy(fmgr.nm1_sdu_sched); - - gam_destroy(fmgr.gam); -} - -int fmgr_np1_alloc(int fd, - const uint8_t * dst, - qoscube_t cube) -{ - cep_id_t cep_id; - buffer_t buf; - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - char path[RIB_MAX_PATH_LEN + 1]; - uint64_t addr; - ssize_t ch; - ssize_t i; - char ** children; - char hashstr[ipcp_dir_hash_strlen() + 1]; - char * dst_ipcp = NULL; - - ipcp_hash_str(hashstr, dst); - - assert(strlen(hashstr) + strlen(DIR_PATH) + 1 - < RIB_MAX_PATH_LEN); - - strcpy(path, DIR_PATH); - - rib_path_append(path, hashstr); - - ch = rib_children(path, &children); - if (ch <= 0) - return -1; - - for (i = 0; i < ch; ++i) - if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0) - dst_ipcp = children[i]; - else - free(children[i]); - - free(children); - - if (dst_ipcp == NULL) - return -1; - - strcpy(path, MEMBERS_PATH); - - rib_path_append(path, dst_ipcp); - - free(dst_ipcp); - - if (rib_read(path, &addr, sizeof(addr)) < 0) - return -1; - - msg.code = FLOW_ALLOC_CODE__FLOW_REQ; - msg.has_hash = true; - msg.hash.len = ipcp_dir_hash_len(); - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = cube; - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -1; - - flow_alloc_msg__pack(&msg, buf.data); - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - cep_id = frct_i_create(addr, &buf, cube); - if (cep_id == INVALID_CEP_ID) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - free(buf.data); - return -1; - } - - free(buf.data); - - fmgr.np1_fd_to_cep_id[fd] = cep_id; - fmgr.np1_cep_id_to_fd[cep_id] = fd; - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return 0; -} - -/* Call under np1_flows lock */ -static int np1_flow_dealloc(int fd) -{ - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - int ret; - qoscube_t cube; - - ipcp_flow_get_qoscube(fd, &cube); - flow_set_del(fmgr.np1_set[cube], fd); - - msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -ENOMEM; - - flow_alloc_msg__pack(&msg, buf.data); - - ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - - fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; - fmgr.np1_fd_to_cep_id[fd] = -1; - - free(buf.data); - - return ret; -} - -int fmgr_np1_alloc_resp(int fd, - int response) -{ - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - - msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; - msg.response = response; - msg.has_response = true; - - pthread_mutex_lock(&ipcpi.alloc_lock); - - while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &ts); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - ipcpi.alloc_id = -1; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -ENOMEM; - - flow_alloc_msg__pack(&msg, buf.data); - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - if (response < 0) { - frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); - free(buf.data); - fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] - = INVALID_CEP_ID; - fmgr.np1_fd_to_cep_id[fd] = -1; - } else { - qoscube_t cube; - ipcp_flow_get_qoscube(fd, &cube); - if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - free(buf.data); - return -1; - } - flow_set_add(fmgr.np1_set[cube], fd); - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - free(buf.data); - - return 0; -} - -int fmgr_np1_dealloc(int fd) -{ - int ret; - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - ret = np1_flow_dealloc(fd); - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return ret; -} - -int fmgr_np1_post_buf(cep_id_t cep_id, - buffer_t * buf) -{ - struct timespec ts = {0, FD_UPDATE_TIMEOUT * 1000}; - int ret = 0; - int fd; - flow_alloc_msg_t * msg; - qoscube_t cube; - - /* Depending on the message call the function in ipcp-dev.h */ - - msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); - if (msg == NULL) { - log_err("Failed to unpack flow alloc message"); - return -1; - } - - switch (msg->code) { - case FLOW_ALLOC_CODE__FLOW_REQ: - pthread_mutex_lock(&ipcpi.alloc_lock); - - if (!msg->has_hash) { - log_err("Bad flow request."); - return -1; - } - - while (ipcpi.alloc_id != -1 && - ipcp_get_state() == IPCP_OPERATIONAL) - pthread_cond_timedwait(&ipcpi.alloc_cond, - &ipcpi.alloc_lock, - &ts); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_dbg("Won't allocate over non-operational IPCP."); - pthread_mutex_unlock(&ipcpi.alloc_lock); - return -1; - } - - assert(ipcpi.alloc_id == -1); - - fd = ipcp_flow_req_arr(getpid(), - msg->hash.data, - ipcp_dir_hash_len(), - msg->qoscube); - if (fd < 0) { - pthread_mutex_unlock(&ipcpi.alloc_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - log_err("Failed to get fd for flow."); - return -1; - } - - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - fmgr.np1_fd_to_cep_id[fd] = cep_id; - fmgr.np1_cep_id_to_fd[cep_id] = fd; - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - ipcpi.alloc_id = fd; - pthread_cond_broadcast(&ipcpi.alloc_cond); - - pthread_mutex_unlock(&ipcpi.alloc_lock); - - break; - case FLOW_ALLOC_CODE__FLOW_REPLY: - pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - - fd = fmgr.np1_cep_id_to_fd[cep_id]; - ret = ipcp_flow_alloc_reply(fd, msg->response); - if (msg->response < 0) { - fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; - fmgr.np1_cep_id_to_fd[cep_id] = -1; - } else { - ipcp_flow_get_qoscube(fd, &cube); - flow_set_add(fmgr.np1_set[cube], - fmgr.np1_cep_id_to_fd[cep_id]); - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - break; - case FLOW_ALLOC_CODE__FLOW_DEALLOC: - fd = fmgr.np1_cep_id_to_fd[cep_id]; - ipcp_flow_get_qoscube(fd, &cube); - flow_set_del(fmgr.np1_set[cube], fd); - ret = flow_dealloc(fd); - break; - default: - log_err("Got an unknown flow allocation message."); - ret = -1; - break; - } - - flow_alloc_msg__free_unpacked(msg, NULL); - - return ret; -} - -int fmgr_np1_post_sdu(cep_id_t cep_id, - struct shm_du_buff * sdb) -{ - int fd; - - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - - fd = fmgr.np1_cep_id_to_fd[cep_id]; - if (ipcp_flow_write(fd, sdb)) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - log_err("Failed to hand SDU to N flow."); - return -1; - } - - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - - return 0; -} - -int fmgr_nm1_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) -{ - int fd; - - if (pci == NULL || sdb == NULL) - return -EINVAL; - - pff_lock(fmgr.pff[pci->qos_id]); - fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[pci->qos_id]); - log_err("Could not get nhop for address %" PRIu64, - pci->dst_addr); - ipcp_flow_del(sdb); - return -1; - } - pff_unlock(fmgr.pff[pci->qos_id]); - - if (shm_pci_ser(sdb, pci)) { - log_err("Failed to serialize PDU."); - ipcp_flow_del(sdb); - return -1; - } - - if (ipcp_flow_write(fd, sdb)) { - log_err("Failed to write SDU to fd %d.", fd); - ipcp_flow_del(sdb); - return -1; - } - - return 0; -} - -int fmgr_nm1_write_buf(struct pci * pci, - buffer_t * buf) -{ - buffer_t * buffer; - int fd; - - if (pci == NULL || buf == NULL || buf->data == NULL) - return -EINVAL; - - pff_lock(fmgr.pff[pci->qos_id]); - fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr); - if (fd < 0) { - pff_unlock(fmgr.pff[pci->qos_id]); - log_err("Could not get nhop for address %" PRIu64, - pci->dst_addr); - return -1; - } - pff_unlock(fmgr.pff[pci->qos_id]); - - buffer = shm_pci_ser_buf(buf, pci); - if (buffer == NULL) { - log_err("Failed to serialize buffer."); - return -1; - } - - if (flow_write(fd, buffer->data, buffer->len) == -1) { - log_err("Failed to write buffer to fd."); - free(buffer); - return -1; - } - - free(buffer->data); - free(buffer); - return 0; -} |