diff options
-rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/ipcpd/normal/config.h | 28 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 591 | ||||
-rw-r--r-- | src/ipcpd/normal/fmgr.h | 43 | ||||
-rw-r--r-- | src/ipcpd/normal/frct.c | 33 | ||||
-rw-r--r-- | src/ipcpd/normal/frct.h | 9 | ||||
-rw-r--r-- | src/ipcpd/normal/main.c | 8 | ||||
-rw-r--r-- | src/ipcpd/normal/ribmgr.c | 26 | ||||
-rw-r--r-- | src/ipcpd/normal/ribmgr.h | 5 | ||||
-rw-r--r-- | src/ipcpd/normal/rmt.c | 195 | ||||
-rw-r--r-- | src/ipcpd/normal/rmt.h | 46 |
11 files changed, 411 insertions, 574 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 654bb127..151721a2 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -28,7 +28,6 @@ set(SOURCE_FILES frct.c main.c ribmgr.c - rmt.c shm_pci.c ) diff --git a/src/ipcpd/normal/config.h b/src/ipcpd/normal/config.h deleted file mode 100644 index 0febf3fd..00000000 --- a/src/ipcpd/normal/config.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP configuration constants - * - * Sander Vrijders <[email protected]> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_CONFIG_H -#define OUROBOROS_IPCP_CONFIG_H - -#define FD_UPDATE_TIMEOUT 100 /* microseconds */ - -#endif diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 25898661..8c627641 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -39,46 +39,46 @@ #include "ribmgr.h" #include "frct.h" #include "ipcp.h" -#include "rmt.h" #include "shm_pci.h" -#include "config.h" #include "flow_alloc.pb-c.h" typedef FlowAllocMsg flow_alloc_msg_t; -struct n_flow { +#define FD_UPDATE_TIMEOUT 100 /* microseconds */ + +struct np1_flow { int fd; cep_id_t cep_id; enum qos_cube qos; - - struct list_head next; }; -struct n_1_flow { - int fd; - char * ae_name; - struct list_head next; +struct nm1_flow { + int fd; + char * ae_name; + enum qos_cube qos; }; struct { - pthread_t n_1_flow_acceptor; - - /* FIXME: Make this a table */ - struct list_head n_1_flows; - pthread_mutex_t n_1_flows_lock; - - /* FIXME: Make this a table */ - struct list_head n_flows; - /* FIXME: Make this a read/write lock */ - pthread_mutex_t n_flows_lock; - - struct flow_set * set; - pthread_t n_reader; + pthread_t nm1_flow_acceptor; + struct nm1_flow ** nm1_flows; + pthread_rwlock_t nm1_flows_lock; + flow_set_t * nm1_set; + + struct np1_flow ** np1_flows; + struct np1_flow ** np1_flows_cep; + pthread_rwlock_t np1_flows_lock; + flow_set_t * np1_set; + pthread_t np1_sdu_reader; + + /* FIXME: Replace with PFF */ + int fd; } fmgr; -static int add_n_1_fd(int fd, char * ae_name) +static int add_nm1_fd(int fd, + char * ae_name, + enum qos_cube qos) { - struct n_1_flow * tmp; + struct nm1_flow * tmp; if (ae_name == NULL) return -1; @@ -89,45 +89,39 @@ static int add_n_1_fd(int fd, char * ae_name) tmp->fd = fd; tmp->ae_name = ae_name; + tmp->qos = qos; - INIT_LIST_HEAD(&tmp->next); + pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); + fmgr.nm1_flows[fd] = tmp; + pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - pthread_mutex_lock(&fmgr.n_1_flows_lock); - list_add(&tmp->next, &fmgr.n_1_flows); - pthread_mutex_unlock(&fmgr.n_1_flows_lock); + /* FIXME: Temporary, until we have a PFF */ + fmgr.fd = fd; return 0; } -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_fd(int fd) +static int add_np1_fd(int fd, + cep_id_t cep_id, + enum qos_cube qos) { - struct list_head * pos = NULL; + struct np1_flow * flow; - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->fd == fd) - return e; - } - - return NULL; -} + flow = malloc(sizeof(*flow)); + if (flow == NULL) + return -1; -/* Call under n_flows lock */ -static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id) -{ - struct list_head * pos = NULL; + flow->cep_id = cep_id; + flow->qos = qos; + flow->fd = fd; - list_for_each(pos, &fmgr.n_flows) { - struct n_flow * e = list_entry(pos, struct n_flow, next); - if (e->cep_id == cep_id) - return e; - } + fmgr.np1_flows[fd] = flow; + fmgr.np1_flows_cep[fd] = flow; - return NULL; + return 0; } -static void * fmgr_n_1_acceptor(void * o) +static void * fmgr_nm1_acceptor(void * o) { int fd; char * ae_name; @@ -175,16 +169,8 @@ static void * fmgr_n_1_acceptor(void * o) } } - if (strcmp(ae_name, DT_AE) == 0) { - /* FIXME: Pass correct QoS cube */ - if (rmt_dt_flow(fd, 0)) { - LOG_ERR("Failed to hand fd to FRCT."); - flow_dealloc(fd); - continue; - } - } - - if (add_n_1_fd(fd, ae_name)) { + /* FIXME: Pass correct QoS cube */ + if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) { LOG_ERR("Failed to add file descriptor to list."); flow_dealloc(fd); continue; @@ -194,14 +180,14 @@ static void * fmgr_n_1_acceptor(void * o) return (void *) 0; } -static void * fmgr_n_reader(void * o) +static void * fmgr_np1_sdu_reader(void * o) { struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct n_flow * flow; + struct np1_flow * flow; while (true) { - int fd = flow_select(fmgr.set, &timeout); + int fd = flow_select(fmgr.np1_set, &timeout); if (fd == -ETIMEDOUT) continue; @@ -215,172 +201,194 @@ static void * fmgr_n_reader(void * o) continue; } - pthread_mutex_lock(&fmgr.n_flows_lock); - flow = get_n_flow_by_fd(fd); + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + flow = fmgr.np1_flows[fd]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); LOG_ERR("Failed to retrieve flow."); continue; } if (frct_i_write_sdu(flow->cep_id, sdb)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); ipcp_flow_del(sdb); LOG_ERR("Failed to hand SDU to FRCT."); continue; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); } return (void *) 0; } -int fmgr_init() +void * fmgr_nm1_sdu_reader(void * o) { - INIT_LIST_HEAD(&fmgr.n_1_flows); - INIT_LIST_HEAD(&fmgr.n_flows); - - pthread_mutex_init(&fmgr.n_1_flows_lock, NULL); - pthread_mutex_init(&fmgr.n_flows_lock, NULL); + struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; + struct shm_du_buff * sdb; + struct pci * pci; - fmgr.set = flow_set_create(); - if (fmgr.set == NULL) - return -1; + while (true) { + int fd = flow_select(fmgr.nm1_set, &timeout); + if (fd == -ETIMEDOUT) + continue; - pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL); - pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL); + if (fd < 0) { + LOG_ERR("Failed to get active fd."); + continue; + } - return 0; -} + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } -int fmgr_fini() -{ - struct list_head * pos = NULL; + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - pthread_cancel(fmgr.n_1_flow_acceptor); - pthread_cancel(fmgr.n_reader); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); - pthread_join(fmgr.n_1_flow_acceptor, NULL); - pthread_join(fmgr.n_reader, NULL); + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } - list_for_each(pos, &fmgr.n_1_flows) { - struct n_1_flow * e = list_entry(pos, struct n_1_flow, next); - if (e->ae_name != NULL) - free(e->ae_name); - if (ribmgr_remove_flow(e->fd)) - LOG_ERR("Failed to remove management flow."); - } + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ + ipcp_flow_del(sdb); + free(pci); + continue; + } - pthread_mutex_destroy(&fmgr.n_1_flows_lock); - pthread_mutex_destroy(&fmgr.n_flows_lock); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); + ipcp_flow_del(sdb); + free(pci); + continue; + } - flow_set_destroy(fmgr.set); + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + } - return 0; + return (void *) 0; } -int fmgr_mgmt_flow(char * dst_name) +int fmgr_init() { - int fd; - int result; - char * ae_name; + int i; - ae_name = strdup(MGMT_AE); - if (ae_name == NULL) + fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); + if (fmgr.nm1_flows == NULL) return -1; - /* FIXME: Request retransmission. */ - fd = flow_alloc(dst_name, MGMT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); - free(ae_name); + fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); + if (fmgr.np1_flows == NULL) { + free(fmgr.nm1_flows); return -1; } - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); - free(ae_name); + fmgr.np1_flows_cep = + malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); + if (fmgr.np1_flows_cep == NULL) { + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } - if (ribmgr_add_flow(fd)) { - LOG_ERR("Failed to hand file descriptor to RIB manager"); - flow_dealloc(fd); - free(ae_name); + for (i = 0; i < IRMD_MAX_FLOWS; i++) { + fmgr.nm1_flows[i] = NULL; + fmgr.np1_flows[i] = NULL; + fmgr.np1_flows_cep[i] = NULL; + } + + pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL); + pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); + + fmgr.np1_set = flow_set_create(); + if (fmgr.np1_set == NULL) { + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } - if (add_n_1_fd(fd, ae_name)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); + fmgr.nm1_set = flow_set_create(); + if (fmgr.nm1_set == NULL) { + flow_set_destroy(fmgr.np1_set); + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return -1; } + pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); + pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); + return 0; } -int fmgr_dt_flow(char * dst_name, enum qos_cube qos) +int fmgr_fini() { - int fd; - int result; - char * ae_name; + int i; - ae_name = strdup(DT_AE); - if (ae_name == NULL) - return -1; + pthread_cancel(fmgr.nm1_flow_acceptor); + pthread_cancel(fmgr.np1_sdu_reader); - /* FIXME: Map qos cube on correct QoS. */ - fd = flow_alloc(dst_name, DT_AE, NULL); - if (fd < 0) { - LOG_ERR("Failed to allocate flow to %s", dst_name); - free(ae_name); - return -1; - } + pthread_join(fmgr.nm1_flow_acceptor, NULL); + pthread_join(fmgr.np1_sdu_reader, NULL); - result = flow_alloc_res(fd); - if (result < 0) { - LOG_ERR("Result of flow allocation to %s is %d", - dst_name, result); - free(ae_name); - return -1; + for (i = 0; i < IRMD_MAX_FLOWS; i++) { + if (fmgr.nm1_flows[i] == NULL) + continue; + if (fmgr.nm1_flows[i]->ae_name != NULL) + free(fmgr.nm1_flows[i]->ae_name); + if (ribmgr_remove_flow(fmgr.nm1_flows[i]->fd)) + LOG_ERR("Failed to remove management flow."); } - if (rmt_dt_flow(fd, qos)) { - LOG_ERR("Failed to hand file descriptor to FRCT"); - flow_dealloc(fd); - free(ae_name); - return -1; - } + pthread_rwlock_destroy(&fmgr.nm1_flows_lock); + pthread_rwlock_destroy(&fmgr.np1_flows_lock); - if (add_n_1_fd(fd, ae_name)) { - LOG_ERR("Failed to add file descriptor to list."); - flow_dealloc(fd); - free(ae_name); - return -1; - } + flow_set_destroy(fmgr.nm1_set); + flow_set_destroy(fmgr.np1_set); + free(fmgr.np1_flows_cep); + free(fmgr.np1_flows); + free(fmgr.nm1_flows); return 0; } -int fmgr_flow_alloc(int fd, +int fmgr_np1_alloc(int fd, char * dst_ap_name, char * src_ae_name, enum qos_cube qos) { - struct n_flow * flow; cep_id_t cep_id; uint32_t address = 0; buffer_t buf; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - flow = malloc(sizeof(*flow)); - if (flow == NULL) - return -1; - /* FIXME: Obtain correct address here from DIF NSM */ msg.code = FLOW_ALLOC_CODE__FLOW_REQ; @@ -390,55 +398,47 @@ int fmgr_flow_alloc(int fd, msg.has_qos_cube = true; buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) { - free(flow); + if (buf.len == 0) return -1; - } buf.data = malloc(buf.len); - if (buf.data == NULL) { - free(flow); + if (buf.data == NULL) return -1; - } flow_alloc_msg__pack(&msg, buf.data); - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); cep_id = frct_i_create(address, &buf, qos); if (cep_id == INVALID_CEP_ID) { free(buf.data); - free(flow); - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } free(buf.data); - flow->fd = fd; - flow->cep_id = cep_id; - flow->qos = qos; - - INIT_LIST_HEAD(&flow->next); - - list_add(&flow->next, &fmgr.n_flows); + if (add_np1_fd(fd, cep_id, qos)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + return -1; + } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return 0; } -/* Call under n_flows lock */ -static int n_flow_dealloc(int fd) +/* Call under np1_flows lock */ +static int np1_flow_dealloc(int fd) { - struct n_flow * flow; + struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; int ret; - flow_set_del(fmgr.set, fd); + flow_set_del(fmgr.np1_set, fd); - flow = get_n_flow_by_fd(fd); + flow = fmgr.np1_flows[fd]; if (flow == NULL) return -1; @@ -455,7 +455,9 @@ static int n_flow_dealloc(int fd) flow_alloc_msg__pack(&msg, buf.data); ret = frct_i_destroy(flow->cep_id, &buf); - list_del(&flow->next); + + fmgr.np1_flows[fd] = NULL; + fmgr.np1_flows_cep[flow->cep_id] = NULL; free(flow); free(buf.data); @@ -463,17 +465,17 @@ static int n_flow_dealloc(int fd) return ret; } -int fmgr_flow_alloc_resp(int fd, int response) +int fmgr_np1_alloc_resp(int fd, int response) { - struct n_flow * flow; + struct np1_flow * flow; flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; buffer_t buf; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - flow = get_n_flow_by_fd(fd); + flow = fmgr.np1_flows[fd]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } @@ -483,13 +485,13 @@ int fmgr_flow_alloc_resp(int fd, int response) buf.len = flow_alloc_msg__get_packed_size(&msg); if (buf.len == 0) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } buf.data = malloc(buf.len); if (buf.data == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } @@ -498,84 +500,76 @@ int fmgr_flow_alloc_resp(int fd, int response) if (response < 0) { frct_i_destroy(flow->cep_id, &buf); free(buf.data); - list_del(&flow->next); + fmgr.np1_flows[fd] = NULL; + fmgr.np1_flows_cep[flow->cep_id] = NULL; free(flow); } else { if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return -1; } - flow_set_add(fmgr.set, fd); + flow_set_add(fmgr.np1_set, fd); } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return 0; } -int fmgr_flow_dealloc(int fd) +int fmgr_np1_dealloc(int fd) { int ret; - pthread_mutex_lock(&fmgr.n_flows_lock); - ret = n_flow_dealloc(fd); - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + ret = np1_flow_dealloc(fd); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); return ret; } -int fmgr_frct_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, + buffer_t * buf) { - struct n_flow * flow; + struct np1_flow * flow; int ret = 0; int fd; flow_alloc_msg_t * msg; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_wrlock(&fmgr.np1_flows_lock); /* Depending on the message call the function in ipcp-dev.h */ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); if (msg == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to unpack flow alloc message"); return -1; } switch (msg->code) { case FLOW_ALLOC_CODE__FLOW_REQ: - flow = malloc(sizeof(*flow)); - if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - flow_alloc_msg__free_unpacked(msg, NULL); - return -1; - } - - flow->cep_id = cep_id; - flow->qos = msg->qos_cube; - fd = ipcp_flow_req_arr(getpid(), msg->dst_name, msg->src_ae_name); if (fd < 0) { - pthread_mutex_unlock(&fmgr.n_flows_lock); - free(flow); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("Failed to get fd for flow."); return -1; } - flow->fd = fd; - - INIT_LIST_HEAD(&flow->next); + if (add_np1_fd(fd, cep_id, msg->qos_cube)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + LOG_ERR("Failed to add np1 flow."); + return -1; + } - list_add(&flow->next, &fmgr.n_flows); break; case FLOW_ALLOC_CODE__FLOW_REPLY: - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; @@ -583,23 +577,24 @@ int fmgr_frct_post_buf(cep_id_t cep_id, ret = ipcp_flow_alloc_reply(flow->fd, msg->response); if (msg->response < 0) { - list_del(&flow->next); + fmgr.np1_flows[flow->fd] = NULL; + fmgr.np1_flows_cep[cep_id] = NULL; free(flow); } else { - flow_set_add(fmgr.set, flow->fd); + flow_set_add(fmgr.np1_set, flow->fd); } break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); LOG_ERR("No such flow in flow manager."); return -1; } - flow_set_del(fmgr.set, flow->fd); + flow_set_del(fmgr.np1_set, flow->fd); ret = flow_dealloc(flow->fd); break; @@ -609,34 +604,160 @@ int fmgr_frct_post_buf(cep_id_t cep_id, break; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); flow_alloc_msg__free_unpacked(msg, NULL); return ret; } -int fmgr_frct_post_sdu(cep_id_t cep_id, - struct shm_du_buff * sdb) +int fmgr_np1_post_sdu(cep_id_t cep_id, + struct shm_du_buff * sdb) { - struct n_flow * flow; + struct np1_flow * flow; - pthread_mutex_lock(&fmgr.n_flows_lock); + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = get_n_flow_by_cep_id(cep_id); + flow = fmgr.np1_flows_cep[cep_id]; if (flow == NULL) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to find N flow."); return -1; } if (ipcp_flow_write(flow->fd, sdb)) { - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); LOG_ERR("Failed to hand SDU to N flow."); return -1; } - pthread_mutex_unlock(&fmgr.n_flows_lock); + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + + return 0; +} + +int fmgr_nm1_mgmt_flow(char * dst_name) +{ + int fd; + int result; + char * ae_name; + + ae_name = strdup(MGMT_AE); + if (ae_name == NULL) + return -1; + + /* FIXME: Request retransmission. */ + fd = flow_alloc(dst_name, MGMT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s", dst_name); + free(ae_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d", + dst_name, result); + free(ae_name); + return -1; + } + + if (ribmgr_add_flow(fd)) { + LOG_ERR("Failed to hand file descriptor to RIB manager"); + flow_dealloc(fd); + free(ae_name); + return -1; + } + + /* FIXME: Pass correct QoS cube */ + if (add_nm1_fd(fd, ae_name, QOS_CUBE_BE)) { + LOG_ERR("Failed to add file descriptor to list."); + flow_dealloc(fd); + return -1; + } + + return 0; +} + +int fmgr_nm1_dt_flow(char * dst_name, + enum qos_cube qos) +{ + int fd; + int result; + char * ae_name; + + ae_name = strdup(DT_AE); + if (ae_name == NULL) + return -1; + + /* FIXME: Map qos cube on correct QoS. */ + fd = flow_alloc(dst_name, DT_AE, NULL); + if (fd < 0) { + LOG_ERR("Failed to allocate flow to %s", dst_name); + free(ae_name); + return -1; + } + + result = flow_alloc_res(fd); + if (result < 0) { + LOG_ERR("Result of flow allocation to %s is %d", + dst_name, result); + free(ae_name); + return -1; + } + + if (add_nm1_fd(fd, ae_name, qos)) { + LOG_ERR("Failed to add file descriptor to list."); + flow_dealloc(fd); + free(ae_name); + return -1; + } + + return 0; +} + +int fmgr_nm1_write_sdu(struct pci * pci, + struct shm_du_buff * sdb) +{ + if (pci == NULL || sdb == NULL) + return -1; + + if (shm_pci_ser(sdb, pci)) { + LOG_ERR("Failed to serialize PDU."); + ipcp_flow_del(sdb); + return -1; + } + + if (ipcp_flow_write(fmgr.fd, sdb)) { + LOG_ERR("Failed to write SDU to fd %d.", fmgr.fd); + ipcp_flow_del(sdb); + return -1; + } + + return 0; +} + +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf) +{ + buffer_t * buffer; + + if (pci == NULL || buf == NULL || buf->data == NULL) + return -1; + + buffer = shm_pci_ser_buf(buf, pci); + if (buffer == NULL) { + LOG_ERR("Failed to serialize buffer."); + free(buf->data); + return -1; + } + + if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) { + LOG_ERR("Failed to write buffer to fd."); + free(buffer); + return -1; + } + free(buffer); return 0; } diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h index 0f2cd045..f97cf858 100644 --- a/src/ipcpd/normal/fmgr.h +++ b/src/ipcpd/normal/fmgr.h @@ -37,29 +37,26 @@ int fmgr_init(); int fmgr_fini(); -/* N-flow ops */ -int fmgr_mgmt_flow(char * dst_name); - -int fmgr_dt_flow(char * dst_name, - enum qos_cube qos); - -/* N+1-flow ops, local */ -int fmgr_flow_alloc(int fd, - char * dst_ap_name, - char * src_ae_name, - enum qos_cube qos); - -int fmgr_flow_alloc_resp(int fd, - int response); - -int fmgr_flow_dealloc(int fd); - -/* N+1-flow ops, remote */ -int fmgr_frct_post_buf(cep_id_t id, - buffer_t * buf); - -/* SDU for N+1-flow */ -int fmgr_frct_post_sdu(cep_id_t id, +int fmgr_np1_alloc(int fd, + char * dst_ap_name, + char * src_ae_name, + enum qos_cube qos); +int fmgr_np1_alloc_resp(int fd, + int response); +int fmgr_np1_dealloc(int fd); + +int fmgr_np1_post_buf(cep_id_t id, + buffer_t * buf); +int fmgr_np1_post_sdu(cep_id_t id, + struct shm_du_buff * sdb); + +int fmgr_nm1_mgmt_flow(char * dst_name); +int fmgr_nm1_dt_flow(char * dst_name, + enum qos_cube qos); + +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb); +int fmgr_nm1_write_buf(struct pci * pci, + buffer_t * buf); #endif diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c index abbde779..9daf8755 100644 --- a/src/ipcpd/normal/frct.c +++ b/src/ipcpd/normal/frct.c @@ -32,8 +32,8 @@ #include <pthread.h> #include "frct.h" -#include "rmt.h" #include "fmgr.h" +#include "ribmgr.h" enum conn_state { CONN_PENDING = 0, @@ -51,8 +51,6 @@ struct frct_i { }; struct { - uint32_t address; - pthread_mutex_t instances_lock; struct frct_i ** instances; @@ -82,10 +80,9 @@ static int release_cep_id(int id) return ret; } -int frct_init(uint32_t address) +int frct_init() { int i; - frct.address = address; if (pthread_mutex_init(&frct.cep_ids_lock, NULL)) return -1; @@ -142,7 +139,7 @@ static struct frct_i * create_frct_i(uint32_t address, return instance; } -int frct_rmt_post_sdu(struct pci * pci, +int frct_nm1_post_sdu(struct pci * pci, struct shm_du_buff * sdb) { struct frct_i * instance; @@ -167,14 +164,14 @@ int frct_rmt_post_sdu(struct pci * pci, buf.len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); buf.data = shm_du_buff_head(sdb); - if (fmgr_frct_post_buf(id, &buf)) { + if (fmgr_np1_post_buf(id, &buf)) { LOG_ERR("Failed to hand buffer to FMGR."); free(pci); return -1; } } else { /* FIXME: Known cep-ids are delivered to FMGR (minimal DTP) */ - if (fmgr_frct_post_sdu(pci->dst_cep_id, sdb)) { + if (fmgr_np1_post_sdu(pci->dst_cep_id, sdb)) { LOG_ERR("Failed to hand SDU to FMGR."); free(pci); return -1; @@ -217,15 +214,15 @@ cep_id_t frct_i_create(uint32_t address, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = 0; pci.src_cep_id = id; pci.seqno = 0; pci.qos_id = cube; - if (rmt_frct_write_buf(&pci, buf)) { + if (fmgr_nm1_write_buf(&pci, buf)) { free(instance); - LOG_ERR("Failed to hand PDU to RMT."); + LOG_ERR("Failed to hand PDU to FMGR."); return INVALID_CEP_ID; } @@ -262,7 +259,7 @@ int frct_i_accept(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -270,7 +267,7 @@ int frct_i_accept(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); - if (rmt_frct_write_buf(&pci, buf)) + if (fmgr_nm1_write_buf(&pci, buf)) return -1; return 0; @@ -299,7 +296,7 @@ int frct_i_destroy(cep_id_t id, pci.pdu_type = PDU_TYPE_MGMT; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = 0; @@ -309,7 +306,7 @@ int frct_i_destroy(cep_id_t id, pthread_mutex_unlock(&frct.instances_lock); if (buf != NULL && buf->data != NULL) - if (rmt_frct_write_buf(&pci, buf)) + if (fmgr_nm1_write_buf(&pci, buf)) return -1; return 0; @@ -341,15 +338,15 @@ int frct_i_write_sdu(cep_id_t id, pci.pdu_type = PDU_TYPE_DTP; pci.dst_addr = instance->r_address; - pci.src_addr = frct.address; + pci.src_addr = ribmgr_address(); pci.dst_cep_id = instance->r_cep_id; pci.src_cep_id = instance->cep_id; pci.seqno = (instance->seqno)++; pci.qos_id = instance->cube; - if (rmt_frct_write_sdu(&pci, sdb)) { + if (fmgr_nm1_write_sdu(&pci, sdb)) { pthread_mutex_unlock(&frct.instances_lock); - LOG_ERR("Failed to hand SDU to RMT."); + LOG_ERR("Failed to hand SDU to FMGR."); return -1; } diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h index 2b86f5bd..b9e70d0f 100644 --- a/src/ipcpd/normal/frct.h +++ b/src/ipcpd/normal/frct.h @@ -30,13 +30,9 @@ struct frct_i; -int frct_init(uint32_t address); +int frct_init(); int frct_fini(); -/* Called by RMT upon receipt of a PDU for us */ -int frct_rmt_post_sdu(struct pci * pci, - struct shm_du_buff * sdb); - cep_id_t frct_i_create(uint32_t address, buffer_t * buf, enum qos_cube cube); @@ -51,4 +47,7 @@ int frct_i_destroy(cep_id_t id, int frct_i_write_sdu(cep_id_t id, struct shm_du_buff * sdb); +int frct_nm1_post_sdu(struct pci * pci, + struct shm_du_buff * sdb); + #endif diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index 4611408d..0339eaf4 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -109,7 +109,7 @@ static int normal_ipcp_enroll(char * dif_name) pthread_rwlock_unlock(&ipcpi.state_lock); - if (fmgr_mgmt_flow(dif_name)) { + if (fmgr_nm1_mgmt_flow(dif_name)) { LOG_ERR("Failed to establish management flow."); return -1; } @@ -163,9 +163,9 @@ static struct ipcp_ops normal_ops = { .ipcp_enroll = normal_ipcp_enroll, .ipcp_name_reg = normal_ipcp_name_reg, .ipcp_name_unreg = normal_ipcp_name_unreg, - .ipcp_flow_alloc = fmgr_flow_alloc, - .ipcp_flow_alloc_resp = fmgr_flow_alloc_resp, - .ipcp_flow_dealloc = fmgr_flow_dealloc + .ipcp_flow_alloc = fmgr_np1_alloc, + .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp, + .ipcp_flow_dealloc = fmgr_np1_dealloc }; int main(int argc, char * argv[]) diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index dd17f9bd..c69a59ce 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -39,7 +39,6 @@ #include "frct.h" #include "ipcp.h" #include "cdap_request.h" -#include "rmt.h" #include "static_info.pb-c.h" typedef StaticInfoMsg static_info_msg_t; @@ -242,7 +241,7 @@ int ribmgr_cdap_write(struct cdap * instance, rib.address = msg->address; - if (frct_init(rib.address)) { + if (frct_init()) { ipcp_set_state(IPCP_INIT); pthread_rwlock_unlock(&ipcpi.state_lock); cdap_send_reply(instance, invoke_id, -1, NULL, 0); @@ -251,16 +250,6 @@ int ribmgr_cdap_write(struct cdap * instance, return -1; } - if (rmt_init(rib.address)) { - ipcp_set_state(IPCP_INIT); - pthread_rwlock_unlock(&ipcpi.state_lock); - frct_fini(); - cdap_send_reply(instance, invoke_id, -1, NULL, 0); - static_info_msg__free_unpacked(msg, NULL); - LOG_ERR("Failed to init RMT"); - return -1; - } - static_info_msg__free_unpacked(msg, NULL); } else { ret = -1; @@ -540,17 +529,11 @@ int ribmgr_bootstrap(struct dif_config * conf) /* FIXME: Set correct address. */ rib.address = 0; - if (frct_init(rib.address)) { + if (frct_init()) { LOG_ERR("Failed to initialize FRCT."); return -1; } - if (rmt_init(rib.address)) { - LOG_ERR("Failed to initialize RMT."); - frct_fini(); - return -1; - } - LOG_DBG("Bootstrapped RIB Manager."); return 0; @@ -560,3 +543,8 @@ struct dt_const * ribmgr_dt_const() { return &(rib.dtc); } + +uint32_t ribmgr_address() +{ + return rib.address; +} diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h index f776f7eb..ed8bae03 100644 --- a/src/ipcpd/normal/ribmgr.h +++ b/src/ipcpd/normal/ribmgr.h @@ -35,6 +35,11 @@ int ribmgr_remove_flow(int fd); int ribmgr_bootstrap(struct dif_config * conf); +/* + * FIXME: Should we expose the RIB? + * Else we may end up with a lot of getters and setters + */ struct dt_const * ribmgr_dt_const(); +uint32_t ribmgr_address(); #endif diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c deleted file mode 100644 index fa4c7edd..00000000 --- a/src/ipcpd/normal/rmt.c +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The Relaying and Multiplexing task - * - * Sander Vrijders <[email protected]> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#define OUROBOROS_PREFIX "flow-manager" - -#include <ouroboros/config.h> -#include <ouroboros/logs.h> -#include <ouroboros/select.h> -#include <ouroboros/ipcp-dev.h> -#include <ouroboros/errno.h> -#include <ouroboros/dev.h> - -#include <stdlib.h> - -#include "rmt.h" -#include "config.h" -#include "frct.h" - -struct { - pthread_t sdu_reader; - struct flow_set * set; - uint32_t address; - - /* - * FIXME: Normally the PFF is held here, - * for now we keep 1 fd to forward a PDU on - */ - int fd; -} rmt; - -int rmt_init(uint32_t address) -{ - rmt.set = flow_set_create(); - if (rmt.set == NULL) - return -1; - - rmt.address = address; - - return 0; -} - -int rmt_fini() -{ - flow_set_destroy(rmt.set); - - return 0; -} - -void * rmt_sdu_reader(void * o) -{ - struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; - struct shm_du_buff * sdb; - struct pci * pci; - - while (true) { - int fd = flow_select(rmt.set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); - continue; - } - - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } - - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } - - if (pci->dst_addr != rmt.address) { - LOG_DBG("PDU needs to be forwarded."); - - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (frct_rmt_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - } - - return (void *) 0; -} - -int rmt_dt_flow(int fd, - enum qos_cube qos) -{ - struct flow_set * set = rmt.set; - if (set == NULL) - return -1; - - flow_set_add(set, fd); - - /* FIXME: This will be removed once we have a PFF */ - rmt.fd = fd; - - return 0; -} - -int rmt_frct_write_sdu(struct pci * pci, - struct shm_du_buff * sdb) -{ - if (shm_pci_ser(sdb, pci)) { - LOG_ERR("Failed to serialize PDU."); - ipcp_flow_del(sdb); - return -1; - } - - if (ipcp_flow_write(rmt.fd, sdb)) { - LOG_ERR("Failed to write SDU to fd %d.", rmt.fd); - ipcp_flow_del(sdb); - return -1; - } - - return 0; -} - -int rmt_frct_write_buf(struct pci * pci, - buffer_t * buf) -{ - buffer_t * buffer; - - if (pci == NULL || buf == NULL || buf->data == NULL) - return -1; - - buffer = shm_pci_ser_buf(buf, pci); - if (buffer == NULL) { - LOG_ERR("Failed to serialize buffer."); - free(buf->data); - return -1; - } - - if (flow_write(rmt.fd, buffer->data, buffer->len) == -1) { - LOG_ERR("Failed to write buffer to fd."); - free(buffer); - return -1; - } - - free(buffer); - return 0; -} diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h deleted file mode 100644 index 6ce7a7d7..00000000 --- a/src/ipcpd/normal/rmt.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * The Relaying and Multiplexing task - * - * Sander Vrijders <[email protected]> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCP_RMT_H -#define OUROBOROS_IPCP_RMT_H - -#include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/utils.h> - -#include "dt_const.h" -#include "shm_pci.h" - -int rmt_init(uint32_t address); -int rmt_fini(); - -int rmt_dt_flow(int fd, - enum qos_cube qos); - -/* Hand PDU to RMT, SDU from N+1 */ -int rmt_frct_write_sdu(struct pci * pci, - struct shm_du_buff * sdb); - -/* Hand PDU to RMT, SDU from N */ -int rmt_frct_write_buf(struct pci * pci, - buffer_t * buf); - -#endif |