diff options
author | Sander Vrijders <[email protected]> | 2017-04-27 19:13:29 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2017-04-28 13:08:17 +0200 |
commit | 9177b0f3f72203cb6e18ee59c98b531a698d7f19 (patch) | |
tree | 524e72cf30f94613df32f06d5ec7bb9041fd11dc /src/ipcpd/normal/fa.c | |
parent | 1f8f2ebe3bb385593755b69bd264ff5f831a22ae (diff) | |
download | ouroboros-9177b0f3f72203cb6e18ee59c98b531a698d7f19.tar.gz ouroboros-9177b0f3f72203cb6e18ee59c98b531a698d7f19.zip |
ipcpd: normal: Split connection establishment
Connection establishment was done at the same time as flow
allocation. This splits it more cleanly, and allows to re-use the DT
AE for other purposes.
Diffstat (limited to 'src/ipcpd/normal/fa.c')
-rw-r--r-- | src/ipcpd/normal/fa.c | 271 |
1 files changed, 162 insertions, 109 deletions
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index 131100db..b116c842 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -28,11 +28,14 @@ #include <ouroboros/rib.h> #include <ouroboros/errno.h> #include <ouroboros/dev.h> +#include <ouroboros/ipcp-dev.h> +#include "dt_pci.h" #include "fa.h" #include "sdu_sched.h" #include "ipcp.h" #include "ribconfig.h" +#include "dt.h" #include <pthread.h> #include <stdlib.h> @@ -108,20 +111,45 @@ void fa_stop(void) sdu_sched_destroy(fa.sdu_sched); } +static struct shm_du_buff * create_fa_sdb(flow_alloc_msg_t * msg) +{ + struct shm_du_buff * sdb; + size_t len; + + len = flow_alloc_msg__get_packed_size(msg); + if (len == 0) + return NULL; + + if (ipcp_sdb_reserve(&sdb, len)) + return NULL; + + flow_alloc_msg__pack(msg, shm_du_buff_head(sdb)); + + return sdb; +} + +static void destroy_conn(int fd, + cep_id_t cep_id) +{ + fa.fd_to_cep_id[fd] = INVALID_CEP_ID; + fa.cep_id_to_fd[cep_id] = -1; + frct_i_destroy(cep_id); +} + int fa_alloc(int fd, const uint8_t * dst, qoscube_t qc) { - cep_id_t cep_id; - buffer_t buf; - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - char path[RIB_MAX_PATH_LEN + 1]; - uint64_t addr; - ssize_t ch; - ssize_t i; - char ** children; - char hashstr[ipcp_dir_hash_strlen() + 1]; - char * dst_ipcp = NULL; + cep_id_t cep_id; + flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; + char path[RIB_MAX_PATH_LEN + 1]; + uint64_t addr; + ssize_t ch; + ssize_t i; + char ** children; + char hashstr[ipcp_dir_hash_strlen() + 1]; + char * dst_ipcp = NULL; + struct shm_du_buff * sdb; ipcp_hash_str(hashstr, dst); @@ -156,34 +184,36 @@ int fa_alloc(int fd, if (rib_read(path, &addr, sizeof(addr)) < 0) return -1; - msg.code = FLOW_ALLOC_CODE__FLOW_REQ; - msg.has_hash = true; - msg.hash.len = ipcp_dir_hash_len(); - msg.hash.data = (uint8_t *) dst; - msg.has_qoscube = true; - msg.qoscube = qc; - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) + cep_id = frct_i_create(addr, qc); + if (cep_id == INVALID_CEP_ID) return -1; - buf.data = malloc(buf.len); - if (buf.data == NULL) + msg.code = FLOW_ALLOC_CODE__FLOW_REQ; + msg.has_hash = true; + msg.hash.len = ipcp_dir_hash_len(); + msg.hash.data = (uint8_t *) dst; + msg.has_qc = true; + msg.qc = qc; + msg.has_s_cep_id = true; + msg.s_cep_id = cep_id; + msg.has_s_addr = true; + msg.s_addr = ipcpi.dt_addr; + + sdb = create_fa_sdb(&msg); + if (sdb == NULL) { + frct_i_destroy(cep_id); return -1; - - flow_alloc_msg__pack(&msg, buf.data); + } pthread_rwlock_wrlock(&fa.flows_lock); - cep_id = frct_i_create(addr, &buf, qc); - if (cep_id == INVALID_CEP_ID) { + if (dt_write_sdu(addr, qc, PDU_TYPE_FA, sdb)) { + frct_i_destroy(cep_id); pthread_rwlock_unlock(&fa.flows_lock); - free(buf.data); + ipcp_sdb_release(sdb); return -1; } - free(buf.data); - fa.fd_to_cep_id[fd] = cep_id; fa.cep_id_to_fd[cep_id] = fd; @@ -192,47 +222,13 @@ int fa_alloc(int fd, return 0; } -/* Call under flows lock */ -static int fa_flow_dealloc(int fd) -{ - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - int ret; - - sdu_sched_del(fa.sdu_sched, fd); - - msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; - - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -ENOMEM; - - flow_alloc_msg__pack(&msg, buf.data); - - ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf); - - fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1; - fa.fd_to_cep_id[fd] = INVALID_CEP_ID; - - free(buf.data); - - return ret; -} - int fa_alloc_resp(int fd, int response) { - struct timespec ts = {0, TIMEOUT * 1000}; - flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; - buffer_t buf; - - msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; - msg.response = response; - msg.has_response = true; + struct timespec ts = {0, TIMEOUT * 1000}; + flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; + struct shm_du_buff * sdb; + qoscube_t qc; pthread_mutex_lock(&ipcpi.alloc_lock); @@ -251,66 +247,102 @@ int fa_alloc_resp(int fd, pthread_mutex_unlock(&ipcpi.alloc_lock); - buf.len = flow_alloc_msg__get_packed_size(&msg); - if (buf.len == 0) - return -1; - - buf.data = malloc(buf.len); - if (buf.data == NULL) - return -ENOMEM; + pthread_rwlock_wrlock(&fa.flows_lock); - flow_alloc_msg__pack(&msg, buf.data); + msg.code = FLOW_ALLOC_CODE__FLOW_REPLY; + msg.has_cep_id = true; + msg.cep_id = frct_i_get_id(fa.fd_to_cep_id[fd]); + msg.s_cep_id = fa.fd_to_cep_id[fd]; + msg.has_s_cep_id = true; + msg.response = response; + msg.has_response = true; - pthread_rwlock_wrlock(&fa.flows_lock); + sdb = create_fa_sdb(&msg); + if (sdb == NULL) { + destroy_conn(fd, fa.fd_to_cep_id[fd]); + pthread_rwlock_unlock(&fa.flows_lock); + return -1; + } if (response < 0) { - frct_i_destroy(fa.fd_to_cep_id[fd], &buf); - free(buf.data); - fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] - = INVALID_CEP_ID; - fa.fd_to_cep_id[fd] = -1; + destroy_conn(fd, fa.fd_to_cep_id[fd]); + ipcp_sdb_release(sdb); } else { - qoscube_t qc; - ipcp_flow_get_qoscube(fd, &qc); - if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) { - pthread_rwlock_unlock(&fa.flows_lock); - free(buf.data); - return -1; - } sdu_sched_add(fa.sdu_sched, fd); } - pthread_rwlock_unlock(&fa.flows_lock); + ipcp_flow_get_qoscube(fd, &qc); - free(buf.data); + assert(qc >= 0 && qc < QOS_CUBE_MAX); + + if (dt_write_sdu(frct_i_get_addr(fa.fd_to_cep_id[fd]), + qc, + PDU_TYPE_FA, + sdb)) { + destroy_conn(fd, fa.fd_to_cep_id[fd]); + ipcp_sdb_release(sdb); + return -1; + } + + pthread_rwlock_unlock(&fa.flows_lock); return 0; } int fa_dealloc(int fd) { - int ret; + flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT; + struct shm_du_buff * sdb; + qoscube_t qc; pthread_rwlock_wrlock(&fa.flows_lock); - ret = fa_flow_dealloc(fd); + sdu_sched_del(fa.sdu_sched, fd); + + destroy_conn(fd, fa.fd_to_cep_id[fd]); + + msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; + msg.has_cep_id = true; + msg.cep_id = frct_i_get_id(fa.fd_to_cep_id[fd]); + + sdb = create_fa_sdb(&msg); + if (sdb == NULL) { + pthread_rwlock_unlock(&fa.flows_lock); + return -1; + } pthread_rwlock_unlock(&fa.flows_lock); - return ret; + ipcp_flow_get_qoscube(fd, &qc); + + assert(qc >= 0 && qc < QOS_CUBE_MAX); + + if (dt_write_sdu(frct_i_get_addr(fa.fd_to_cep_id[fd]), + qc, + PDU_TYPE_FA, + sdb)) { + ipcp_sdb_release(sdb); + return -1; + } + + return 0; } -int fa_post_buf(cep_id_t cep_id, - buffer_t * buf) +int fa_post_sdu(struct shm_du_buff * sdb) { struct timespec ts = {0, TIMEOUT * 1000}; - int ret = 0; int fd; flow_alloc_msg_t * msg; + cep_id_t cep_id; + + assert(sdb); /* Depending on the message call the function in ipcp-dev.h */ - msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data); + msg = flow_alloc_msg__unpack(NULL, + shm_du_buff_tail(sdb) - + shm_du_buff_head(sdb), + shm_du_buff_head(sdb)); if (msg == NULL) { log_err("Failed to unpack flow alloc message"); return -1; @@ -323,6 +355,7 @@ int fa_post_buf(cep_id_t cep_id, if (!msg->has_hash) { log_err("Bad flow request."); pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); return -1; } @@ -335,17 +368,33 @@ int fa_post_buf(cep_id_t cep_id, if (ipcp_get_state() != IPCP_OPERATIONAL) { log_dbg("Won't allocate over non-operational IPCP."); pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); return -1; } assert(ipcpi.alloc_id == -1); + cep_id = frct_i_create(msg->s_addr, msg->qc); + if (cep_id == INVALID_CEP_ID) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + flow_alloc_msg__free_unpacked(msg, NULL); + return -1; + } + + if (frct_i_set_id(cep_id, msg->s_cep_id)) { + pthread_mutex_unlock(&ipcpi.alloc_lock); + frct_i_destroy(cep_id); + flow_alloc_msg__free_unpacked(msg, NULL); + return -1; + } + fd = ipcp_flow_req_arr(getpid(), msg->hash.data, ipcp_dir_hash_len(), - msg->qoscube); + msg->qc); if (fd < 0) { pthread_mutex_unlock(&ipcpi.alloc_lock); + frct_i_destroy(cep_id); flow_alloc_msg__free_unpacked(msg, NULL); log_err("Failed to get fd for flow."); return -1; @@ -367,39 +416,43 @@ int fa_post_buf(cep_id_t cep_id, case FLOW_ALLOC_CODE__FLOW_REPLY: pthread_rwlock_wrlock(&fa.flows_lock); - fd = fa.cep_id_to_fd[cep_id]; - ret = ipcp_flow_alloc_reply(fd, msg->response); + fd = fa.cep_id_to_fd[msg->cep_id]; + ipcp_flow_alloc_reply(fd, msg->response); if (msg->response < 0) { - fa.fd_to_cep_id[fd] = INVALID_CEP_ID; - fa.cep_id_to_fd[cep_id] = -1; + destroy_conn(fd, msg->cep_id); } else { - sdu_sched_add(fa.sdu_sched, fa.cep_id_to_fd[cep_id]); + frct_i_set_id(msg->cep_id, msg->s_cep_id); + sdu_sched_add(fa.sdu_sched, + fa.cep_id_to_fd[msg->cep_id]); } pthread_rwlock_unlock(&fa.flows_lock); break; case FLOW_ALLOC_CODE__FLOW_DEALLOC: - fd = fa.cep_id_to_fd[cep_id]; + fd = fa.cep_id_to_fd[msg->cep_id]; sdu_sched_del(fa.sdu_sched, fd); - ret = flow_dealloc(fd); + flow_dealloc(fd); break; default: log_err("Got an unknown flow allocation message."); - ret = -1; - break; + flow_alloc_msg__free_unpacked(msg, NULL); + return -1; } flow_alloc_msg__free_unpacked(msg, NULL); + ipcp_sdb_release(sdb); - return ret; + return 0; } -int fa_post_sdu(cep_id_t cep_id, - struct shm_du_buff * sdb) +int fa_post_sdu_user(cep_id_t cep_id, + struct shm_du_buff * sdb) { int fd; + assert(sdb); + pthread_rwlock_rdlock(&fa.flows_lock); fd = fa.cep_id_to_fd[cep_id]; |