author     Sander Vrijders <[email protected]>   2017-04-21 11:21:44 +0200
committer  Sander Vrijders <[email protected]>   2017-04-21 12:46:37 +0200
commit     a9d71381a84886007625958b9daea6b2d4a50563 (patch)
tree       67b8576e9747d7815c7eed7170f49a10e5a4e0e0 /src/ipcpd/normal/fa.c
parent     4bfd6c07281847405e127e9588376fcf20d07a7e (diff)
ipcpd: normal: Split flow manager into DT and FA
This splits the flow manager into the Data Transfer AE, which is in charge of routing SDUs, and the Flow Allocator AE, which handles flow allocations.
Diffstat (limited to 'src/ipcpd/normal/fa.c')
-rw-r--r--   src/ipcpd/normal/fa.c | 438
1 file changed, 438 insertions(+), 0 deletions(-)
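
As orientation before the diff itself: the sketch below is not part of the commit, but illustrates how an IPCP might drive the Flow Allocator interface that this new fa.c exposes (fa_init/fa_start for bring-up, fa_alloc/fa_dealloc for a flow, fa_stop/fa_fini for teardown). The fa_example wrapper and its fd, dst_hash and qc arguments are hypothetical inputs, shown only to make the call order concrete.

#include "fa.h"

/* Illustrative lifecycle sketch (not part of this commit). */
static int fa_example(int fd, const uint8_t * dst_hash, qoscube_t qc)
{
        if (fa_init() < 0)            /* set up flow tables and lock   */
                return -1;

        if (fa_start() < 0) {         /* start the SDU scheduler       */
                fa_fini();
                return -1;
        }

        /* Request a flow to the IPCP that registered dst_hash. */
        if (fa_alloc(fd, dst_hash, qc) < 0) {
                fa_stop();
                fa_fini();
                return -1;
        }

        fa_dealloc(fd);               /* release the flow when done    */

        fa_stop();                    /* stop the scheduler            */
        fa_fini();                    /* tear down tables and lock     */

        return 0;
}
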
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
new file mode 100644
index 00000000..be1080b1
--- /dev/null
+++ b/src/ipcpd/normal/fa.c
@@ -0,0 +1,438 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow allocator of the IPC Process
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "flow-allocator"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+
+#include "fa.h"
+#include "sdu_sched.h"
+#include "ipcp.h"
+#include "ribconfig.h"
+
+#include <assert.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
+#define TIMEOUT 10000 /* microseconds */
+
+struct {
+ pthread_rwlock_t flows_lock;
+ cep_id_t fd_to_cep_id[AP_MAX_FLOWS];
+ int cep_id_to_fd[IPCPD_MAX_CONNS];
+
+ flow_set_t * set[QOS_CUBE_MAX];
+ struct sdu_sched * sdu_sched;
+} fa;
+
+static int sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ (void) qc;
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ if (frct_i_write_sdu(fa.fd_to_cep_id[fd], sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_flow_del(sdb);
+ log_warn("Failed to hand SDU to FRCT.");
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+int fa_init(void)
+{
+ int i;
+
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ fa.fd_to_cep_id[i] = INVALID_CEP_ID;
+
+ for (i = 0; i < IPCPD_MAX_CONNS; ++i)
+ fa.cep_id_to_fd[i] = -1;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fa.set[i] = flow_set_create();
+ if (fa.set[i] == NULL)
+ goto fail_flows;
+ }
+
+ if (pthread_rwlock_init(&fa.flows_lock, NULL))
+ goto fail_flows;
+
+ return 0;
+fail_flows:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(fa.set[i]);
+
+ return -1;
+}
+
+void fa_fini(void)
+{
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ flow_set_destroy(fa.set[i]);
+
+ pthread_rwlock_destroy(&fa.flows_lock);
+}
+
+int fa_start(void)
+{
+ fa.sdu_sched = sdu_sched_create(fa.set, sdu_handler);
+ if (fa.sdu_sched == NULL) {
+ log_err("Failed to create SDU scheduler.");
+ return -1;
+ }
+
+ return 0;
+}
+
+void fa_stop(void)
+{
+ sdu_sched_destroy(fa.sdu_sched);
+}
+
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qoscube_t qc)
+{
+ cep_id_t cep_id;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ char path[RIB_MAX_PATH_LEN + 1];
+ uint64_t addr;
+ ssize_t ch;
+ ssize_t i;
+ char ** children;
+ char hashstr[ipcp_dir_hash_strlen() + 1];
+ char * dst_ipcp = NULL;
+
+ ipcp_hash_str(hashstr, dst);
+
+ assert(strlen(hashstr) + strlen(DIR_PATH) + 1
+ < RIB_MAX_PATH_LEN);
+
+ strcpy(path, DIR_PATH);
+
+ rib_path_append(path, hashstr);
+
+ ch = rib_children(path, &children);
+ if (ch <= 0)
+ return -1;
+
+ for (i = 0; i < ch; ++i)
+ if (dst_ipcp == NULL && strcmp(children[i], ipcpi.name) != 0)
+ dst_ipcp = children[i];
+ else
+ free(children[i]);
+
+ free(children);
+
+ if (dst_ipcp == NULL)
+ return -1;
+
+ strcpy(path, MEMBERS_PATH);
+
+ rib_path_append(path, dst_ipcp);
+
+ free(dst_ipcp);
+
+ if (rib_read(path, &addr, sizeof(addr)) < 0)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.has_hash = true;
+ msg.hash.len = ipcp_dir_hash_len();
+ msg.hash.data = (uint8_t *) dst;
+ msg.has_qoscube = true;
+ msg.qoscube = qc;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ cep_id = frct_i_create(addr, &buf, qc);
+ if (cep_id == INVALID_CEP_ID) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ free(buf.data);
+ return -1;
+ }
+
+ free(buf.data);
+
+ fa.fd_to_cep_id[fd] = cep_id;
+ fa.cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}
+
+/* Call under flows lock */
+static int fa_flow_dealloc(int fd)
+{
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+ qoscube_t qc;
+
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_del(fa.set[qc], fd);
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -ENOMEM;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(fa.fd_to_cep_id[fd], &buf);
+
+ fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1;
+ fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+
+ free(buf.data);
+
+ return ret;
+}
+
+int fa_alloc_resp(int fd,
+ int response)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+ while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ return -1;
+ }
+
+ ipcpi.alloc_id = -1;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -ENOMEM;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+        if (response < 0) {
+                frct_i_destroy(fa.fd_to_cep_id[fd], &buf);
+                fa.cep_id_to_fd[fa.fd_to_cep_id[fd]] = -1;
+                fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+ } else {
+ qoscube_t qc;
+ ipcp_flow_get_qoscube(fd, &qc);
+ if (frct_i_accept(fa.fd_to_cep_id[fd], &buf, qc)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ free(buf.data);
+ return -1;
+ }
+ flow_set_add(fa.set[qc], fd);
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ free(buf.data);
+
+ return 0;
+}
+
+int fa_dealloc(int fd)
+{
+ int ret;
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ret = fa_flow_dealloc(fd);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return ret;
+}
+
+int fa_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
+{
+ struct timespec ts = {0, TIMEOUT * 1000};
+ int ret = 0;
+ int fd;
+ flow_alloc_msg_t * msg;
+ qoscube_t qc;
+
+        /* Depending on the message code, call the matching function in ipcp-dev.h. */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ log_err("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+ pthread_mutex_lock(&ipcpi.alloc_lock);
+
+                if (!msg->has_hash) {
+                        log_err("Bad flow request.");
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+                        flow_alloc_msg__free_unpacked(msg, NULL);
+                        return -1;
+                }
+
+ while (ipcpi.alloc_id != -1 &&
+ ipcp_get_state() == IPCP_OPERATIONAL)
+ pthread_cond_timedwait(&ipcpi.alloc_cond,
+ &ipcpi.alloc_lock,
+ &ts);
+
+                if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                        log_dbg("Won't allocate over non-operational IPCP.");
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+                        flow_alloc_msg__free_unpacked(msg, NULL);
+                        return -1;
+                }
+
+ assert(ipcpi.alloc_id == -1);
+
+ fd = ipcp_flow_req_arr(getpid(),
+ msg->hash.data,
+ ipcp_dir_hash_len(),
+ msg->qoscube);
+ if (fd < 0) {
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ log_err("Failed to get fd for flow.");
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa.fd_to_cep_id[fd] = cep_id;
+ fa.cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ipcpi.alloc_id = fd;
+ pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+ pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fd = fa.cep_id_to_fd[cep_id];
+ ret = ipcp_flow_alloc_reply(fd, msg->response);
+ if (msg->response < 0) {
+ fa.fd_to_cep_id[fd] = INVALID_CEP_ID;
+ fa.cep_id_to_fd[cep_id] = -1;
+ } else {
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_add(fa.set[qc],
+ fa.cep_id_to_fd[cep_id]);
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ fd = fa.cep_id_to_fd[cep_id];
+ ipcp_flow_get_qoscube(fd, &qc);
+ flow_set_del(fa.set[qc], fd);
+ ret = flow_dealloc(fd);
+ break;
+ default:
+ log_err("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
+
+ return ret;
+}
+
+int fa_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ fd = fa.cep_id_to_fd[cep_id];
+ if (ipcp_flow_write(fd, sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ log_err("Failed to hand SDU to N flow.");
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ return 0;
+}