summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2017-03-21 16:21:49 +0100
committerSander Vrijders <[email protected]>2017-03-21 16:21:49 +0100
commitfef50c3db0e02f0052f1759d508045c44fc4146e (patch)
treefc73859827a5dfebf5022fad37e826d98ba4046f /src/ipcpd/normal
parent4b257b249ea91d1ee7e2341c563bac561911e8a6 (diff)
parentd4e80d41197b75d2c351659c7e8d4546270e677d (diff)
downloadouroboros-fef50c3db0e02f0052f1759d508045c44fc4146e.tar.gz
ouroboros-fef50c3db0e02f0052f1759d508045c44fc4146e.zip
Merge branch 'be' of bitbucket.org:ouroboros-rina/ouroboros into be
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt16
-rw-r--r--src/ipcpd/normal/addr_auth.c4
-rw-r--r--src/ipcpd/normal/addr_auth.h4
-rw-r--r--src/ipcpd/normal/ae.h4
-rw-r--r--src/ipcpd/normal/cdap_flow.c150
-rw-r--r--src/ipcpd/normal/cdap_flow.h49
-rw-r--r--src/ipcpd/normal/connmgr.c358
-rw-r--r--src/ipcpd/normal/connmgr.h57
-rw-r--r--src/ipcpd/normal/dir.c4
-rw-r--r--src/ipcpd/normal/dir.h3
-rw-r--r--src/ipcpd/normal/dt_const.h4
-rw-r--r--src/ipcpd/normal/enroll.c330
-rw-r--r--src/ipcpd/normal/enroll.h13
-rw-r--r--src/ipcpd/normal/flow_alloc.proto5
-rw-r--r--src/ipcpd/normal/fmgr.c307
-rw-r--r--src/ipcpd/normal/fmgr.h8
-rw-r--r--src/ipcpd/normal/frct.c27
-rw-r--r--src/ipcpd/normal/frct.h5
-rw-r--r--src/ipcpd/normal/fso.proto29
-rw-r--r--src/ipcpd/normal/gam.c288
-rw-r--r--src/ipcpd/normal/gam.h26
-rw-r--r--src/ipcpd/normal/graph.c277
-rw-r--r--src/ipcpd/normal/graph.h67
-rw-r--r--src/ipcpd/normal/main.c137
-rw-r--r--src/ipcpd/normal/neighbors.c213
-rw-r--r--src/ipcpd/normal/neighbors.h81
-rw-r--r--src/ipcpd/normal/pff.c10
-rw-r--r--src/ipcpd/normal/pff.h8
-rw-r--r--src/ipcpd/normal/pol-addr-auth-ops.h4
-rw-r--r--src/ipcpd/normal/pol-gam-ops.h18
-rw-r--r--src/ipcpd/normal/pol/complete.c201
-rw-r--r--src/ipcpd/normal/pol/complete.h33
-rw-r--r--src/ipcpd/normal/pol/flat.c4
-rw-r--r--src/ipcpd/normal/pol/flat.h3
-rw-r--r--src/ipcpd/normal/ribconfig.h2
-rw-r--r--src/ipcpd/normal/ribmgr.c96
-rw-r--r--src/ipcpd/normal/ribmgr.h6
-rw-r--r--src/ipcpd/normal/routing.c298
-rw-r--r--src/ipcpd/normal/routing.h45
-rw-r--r--src/ipcpd/normal/shm_pci.c4
-rw-r--r--src/ipcpd/normal/shm_pci.h4
41 files changed, 2130 insertions, 1072 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 7e10cc0d..06292c50 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -15,32 +15,36 @@ include_directories(${CMAKE_BINARY_DIR}/include)
set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS flow_alloc.proto)
+protobuf_generate_c(FSO_SRCS FSO_HDRS fso.proto)
set(SOURCE_FILES
# Add source files here
addr_auth.c
- cdap_flow.c
+ connmgr.c
dir.c
enroll.c
fmgr.c
frct.c
gam.c
+ graph.c
main.c
+ neighbors.c
pff.c
ribmgr.c
+ routing.c
shm_pci.c
# Add policies last
pol/complete.c
pol/flat.c
)
-add_executable (ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${FLOW_ALLOC_SRCS})
-target_link_libraries (ipcpd-normal LINK_PUBLIC ouroboros)
+add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
+ ${FLOW_ALLOC_SRCS} ${FSO_SRCS})
+target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros)
-include(MacroAddCompileFlags)
+include(AddCompileFlags)
if (CMAKE_BUILD_TYPE MATCHES Debug)
- macro_add_compile_flags(ipcpd-normal -DCONFIG_OUROBOROS_DEBUG)
+ add_compile_flags(ipcpd-normal -DCONFIG_OUROBOROS_DEBUG)
endif (CMAKE_BUILD_TYPE MATCHES Debug)
install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)
diff --git a/src/ipcpd/normal/addr_auth.c b/src/ipcpd/normal/addr_auth.c
index 8469e95e..5b3c6170 100644
--- a/src/ipcpd/normal/addr_auth.c
+++ b/src/ipcpd/normal/addr_auth.c
@@ -3,8 +3,8 @@
*
* Address authority
*
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/addr_auth.h b/src/ipcpd/normal/addr_auth.h
index b389fa90..fbe7d790 100644
--- a/src/ipcpd/normal/addr_auth.h
+++ b/src/ipcpd/normal/addr_auth.h
@@ -3,8 +3,8 @@
*
* Address authority
*
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/ae.h b/src/ipcpd/normal/ae.h
index 882625dd..aafef625 100644
--- a/src/ipcpd/normal/ae.h
+++ b/src/ipcpd/normal/ae.h
@@ -3,8 +3,8 @@
*
* Application Entities for the normal IPC process
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/cdap_flow.c b/src/ipcpd/normal/cdap_flow.c
deleted file mode 100644
index a94627c2..00000000
--- a/src/ipcpd/normal/cdap_flow.c
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Normal IPC Process - Authenticated CDAP Flow Allocator
- *
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define OUROBOROS_PREFIX "cdap-flow"
-
-#include <ouroboros/config.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/logs.h>
-
-#include "cdap_flow.h"
-
-#include <stdlib.h>
-#include <assert.h>
-
-static void cdap_flow_destroy(struct cdap_flow * flow)
-{
- assert(flow);
-
- if (flow->ci != NULL)
- cdap_destroy(flow->ci);
- if (flow->info != NULL) {
- cacep_info_fini(flow->info);
- free(flow->info);
- }
-
- free(flow);
-}
-
-struct cdap_flow * cdap_flow_arr(int fd,
- int resp,
- enum pol_cacep pc,
- const struct cacep_info * info)
-{
- struct cdap_flow * flow;
-
- if (flow_alloc_resp(fd, resp) < 0) {
- log_err("Could not respond to new flow.");
- return NULL;
- }
-
- if (resp)
- return NULL;
-
- flow = malloc(sizeof(*flow));
- if (flow == NULL) {
- log_err("Failed to malloc.");
- return NULL;
- }
-
- flow->fd = fd;
- flow->ci = NULL;
-
- flow->info = cacep_auth_wait(fd, pc, info);
- if (flow->info == NULL) {
- log_err("Other side failed to authenticate.");
- cdap_flow_destroy(flow);
- return NULL;
- }
-
- flow->ci = cdap_create(fd);
- if (flow->ci == NULL) {
- log_err("Failed to create CDAP instance.");
- cdap_flow_destroy(flow);
- return NULL;
- }
-
- return flow;
-}
-
-struct cdap_flow * cdap_flow_alloc(const char * dst_name,
- const char * ae_name,
- qosspec_t * qs,
- enum pol_cacep pc,
- const struct cacep_info * info)
-{
- struct cdap_flow * flow;
- int fd;
-
- log_dbg("Allocating flow to %s.", dst_name);
-
- if (dst_name == NULL || ae_name == NULL) {
- log_err("Not enough info to establish flow.");
- return NULL;
- }
-
- fd = flow_alloc(dst_name, ae_name, qs);
- if (fd < 0) {
- log_err("Failed to allocate flow to %s.", dst_name);
- return NULL;
- }
-
- if (flow_alloc_res(fd)) {
- log_err("Flow allocation to %s failed.", dst_name);
- return NULL;
- }
-
- flow = malloc(sizeof(*flow));
- if (flow == NULL) {
- log_err("Failed to malloc.");
- flow_dealloc(fd);
- return NULL;
- }
-
- flow->fd = fd;
- flow->ci = NULL;
-
- flow->info = cacep_auth(fd, pc, info);
- if (flow->info == NULL) {
- log_err("Failed to authenticate.");
- cdap_flow_dealloc(flow);
- return NULL;
- }
-
- flow->ci = cdap_create(fd);
- if (flow->ci == NULL) {
- log_err("Failed to create CDAP instance.");
- cdap_flow_dealloc(flow);
- return NULL;
- }
-
- return flow;
-}
-
-void cdap_flow_dealloc(struct cdap_flow * flow)
-{
- int fd = flow->fd;
-
- cdap_flow_destroy(flow);
-
- flow_dealloc(fd);
-}
diff --git a/src/ipcpd/normal/cdap_flow.h b/src/ipcpd/normal/cdap_flow.h
deleted file mode 100644
index c5ca2ab4..00000000
--- a/src/ipcpd/normal/cdap_flow.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Normal IPC Process - Authenticated CDAP Flow Allocator
- *
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_FLOW_H
-#define OUROBOROS_IPCPD_NORMAL_CDAP_FLOW_H
-
-#include <ouroboros/cacep.h>
-#include <ouroboros/cdap.h>
-#include <ouroboros/qos.h>
-
-struct cdap_flow {
- int fd;
- struct cdap * ci;
- struct cacep_info * info;
-};
-
-struct cdap_flow * cdap_flow_arr(int fd,
- int resp,
- enum pol_cacep pc,
- const struct cacep_info * info);
-
-struct cdap_flow * cdap_flow_alloc(const char * dst_name,
- const char * ae_name,
- qosspec_t * qs,
- enum pol_cacep pc,
- const struct cacep_info * info);
-
-void cdap_flow_dealloc(struct cdap_flow * flow);
-
-#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_FLOW_H */
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
new file mode 100644
index 00000000..b97d2b23
--- /dev/null
+++ b/src/ipcpd/normal/connmgr.c
@@ -0,0 +1,358 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Handles AE connections
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "normal-ipcp"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/cacep.h>
+#include <ouroboros/cdap.h>
+#include <ouroboros/errno.h>
+
+#include "ae.h"
+#include "connmgr.h"
+#include "enroll.h"
+#include "fmgr.h"
+#include "frct.h"
+#include "ipcp.h"
+#include "ribmgr.h"
+
+#include <pthread.h>
+#include <string.h>
+#include <stdlib.h>
+#include <assert.h>
+
+struct ae_conn {
+ struct list_head next;
+ struct conn conn;
+};
+
+struct ae {
+ struct list_head next;
+ struct conn_info info;
+
+ struct list_head conn_list;
+ pthread_cond_t conn_cond;
+ pthread_mutex_t conn_lock;
+};
+
+struct {
+ pthread_t acceptor;
+
+ struct list_head aes;
+ pthread_mutex_t aes_lock;
+} connmgr;
+
+static int add_ae_conn(struct ae * ae,
+ int fd,
+ qosspec_t qs,
+ struct conn_info * rcv_info)
+{
+ struct ae_conn * ae_conn = NULL;
+
+ ae_conn = malloc(sizeof(*ae_conn));
+ if (ae_conn == NULL) {
+ log_err("Not enough memory.");
+ return -1;
+ }
+
+ ae_conn->conn.conn_info = *rcv_info;
+ ae_conn->conn.flow_info.fd = fd;
+ ae_conn->conn.flow_info.qs = qs;
+
+ list_head_init(&ae_conn->next);
+
+ pthread_mutex_lock(&ae->conn_lock);
+ list_add(&ae_conn->next, &ae->conn_list);
+ pthread_cond_signal(&ae->conn_cond);
+ pthread_mutex_unlock(&ae->conn_lock);
+
+ return 0;
+}
+
+static struct ae * find_ae_by_name(char * name)
+{
+ struct list_head * p = NULL;
+
+ list_for_each(p, &connmgr.aes) {
+ struct ae * ae = list_entry(p, struct ae, next);
+ if (strcmp(ae->info.ae_name, name) == 0)
+ return ae;
+ }
+
+ return NULL;
+}
+
+static void * flow_acceptor(void * o)
+{
+ int fd;
+ qosspec_t qs;
+ struct conn_info rcv_info;
+ struct conn_info fail_info;
+ struct ae * ae = NULL;
+
+ (void) o;
+
+ memset(&fail_info, 0, sizeof(fail_info));
+
+ while (true) {
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ log_info("Shutting down flow acceptor.");
+ return 0;
+ }
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ fd = flow_accept(&qs);
+ if (fd < 0) {
+ if (fd != -EIRMD)
+ log_warn("Flow accept failed: %d", fd);
+ continue;
+ }
+
+ if (flow_alloc_resp(fd, 0)) {
+ log_err("Failed to respond to flow alloc request.");
+ continue;
+ }
+
+ if (cacep_rcv(fd, &rcv_info)) {
+ log_err("Error establishing application connection.");
+ flow_dealloc(fd);
+ continue;
+ }
+
+ pthread_mutex_lock(&connmgr.aes_lock);
+ ae = find_ae_by_name(rcv_info.ae_name);
+ pthread_mutex_unlock(&connmgr.aes_lock);
+
+ if (ae != NULL) {
+ if (cacep_snd(fd, &ae->info)) {
+ log_err("Failed to respond to req.");
+ flow_dealloc(fd);
+ continue;
+ }
+
+ if (add_ae_conn(ae, fd, qs, &rcv_info)) {
+ log_err("Failed to add ae conn.");
+ flow_dealloc(fd);
+ continue;
+ }
+ } else {
+ cacep_snd(fd, &fail_info);
+ flow_dealloc(fd);
+ }
+ }
+
+ return (void *) 0;
+}
+
+int connmgr_init(void)
+{
+ list_head_init(&connmgr.aes);
+
+ if (pthread_mutex_init(&connmgr.aes_lock, NULL))
+ return -1;
+
+ return 0;
+}
+
+int connmgr_start(void)
+{
+ pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL);
+
+ return 0;
+}
+
+void connmgr_stop(void)
+{
+ pthread_cancel(connmgr.acceptor);
+ pthread_join(connmgr.acceptor, NULL);
+}
+
+static void destroy_ae(struct ae * ae)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ pthread_mutex_lock(&ae->conn_lock);
+
+ list_for_each_safe(p, h, &ae->conn_list) {
+ struct ae_conn * e = list_entry(p, struct ae_conn, next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ pthread_mutex_unlock(&ae->conn_lock);
+
+ pthread_cond_destroy(&ae->conn_cond);
+ pthread_mutex_destroy(&ae->conn_lock);
+
+ free(ae);
+}
+
+void connmgr_fini(void)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ pthread_mutex_lock(&connmgr.aes_lock);
+
+ list_for_each_safe(p, n, &connmgr.aes) {
+ struct ae * e = list_entry(p, struct ae, next);
+ list_del(&e->next);
+ destroy_ae(e);
+ }
+
+ pthread_mutex_unlock(&connmgr.aes_lock);
+
+ pthread_mutex_destroy(&connmgr.aes_lock);
+}
+
+struct ae * connmgr_ae_create(struct conn_info info)
+{
+ struct ae * ae;
+
+ ae = malloc(sizeof(*ae));
+ if (ae == NULL)
+ return NULL;
+
+ list_head_init(&ae->next);
+ list_head_init(&ae->conn_list);
+
+ ae->info = info;
+
+ if (pthread_mutex_init(&ae->conn_lock, NULL)) {
+ free(ae);
+ return NULL;
+ }
+
+ if (pthread_cond_init(&ae->conn_cond, NULL)) {
+ pthread_mutex_destroy(&ae->conn_lock);
+ free(ae);
+ return NULL;
+ }
+
+ pthread_mutex_lock(&connmgr.aes_lock);
+ list_add(&ae->next, &connmgr.aes);
+ pthread_mutex_unlock(&connmgr.aes_lock);
+
+ return ae;
+}
+
+void connmgr_ae_destroy(struct ae * ae)
+{
+ assert(ae);
+
+ pthread_mutex_lock(&connmgr.aes_lock);
+
+ list_del(&ae->next);
+
+ destroy_ae(ae);
+
+ pthread_mutex_unlock(&connmgr.aes_lock);
+}
+
+int connmgr_alloc(struct ae * ae,
+ char * dst_name,
+ qosspec_t * qs,
+ struct conn * conn)
+{
+ assert(ae);
+ assert(dst_name);
+ assert(conn);
+
+ memset(&conn->conn_info, 0, sizeof(conn->conn_info));
+
+ conn->flow_info.fd = flow_alloc(dst_name, qs);
+ if (conn->flow_info.fd < 0) {
+ log_err("Failed to allocate flow to %s.", dst_name);
+ return -1;
+ }
+
+ if (qs != NULL)
+ conn->flow_info.qs = *qs;
+ else
+ memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs));
+
+ if (flow_alloc_res(conn->flow_info.fd)) {
+ log_err("Flow allocation to %s failed.", dst_name);
+ flow_dealloc(conn->flow_info.fd);
+ return -1;
+ }
+
+ if (cacep_snd(conn->flow_info.fd, &ae->info)) {
+ log_err("Failed to create application connection.");
+ flow_dealloc(conn->flow_info.fd);
+ return -1;
+ }
+
+ if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) {
+ log_err("Failed to connect to application.");
+ flow_dealloc(conn->flow_info.fd);
+ return -1;
+ }
+
+ if (strcmp(ae->info.protocol, conn->conn_info.protocol) ||
+ ae->info.pref_version != conn->conn_info.pref_version ||
+ ae->info.pref_syntax != conn->conn_info.pref_syntax) {
+ flow_dealloc(conn->flow_info.fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+int connmgr_wait(struct ae * ae,
+ struct conn * conn)
+{
+ struct ae_conn * ae_conn;
+
+ assert(ae);
+ assert(conn);
+
+ pthread_mutex_lock(&ae->conn_lock);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) &ae->conn_lock);
+
+ while (list_is_empty(&ae->conn_list))
+ pthread_cond_wait(&ae->conn_cond, &ae->conn_lock);
+
+ ae_conn = list_first_entry((&ae->conn_list), struct ae_conn, next);
+ if (ae_conn == NULL) {
+ pthread_mutex_unlock(&ae->conn_lock);
+ return -1;
+ }
+
+ *conn = ae_conn->conn;
+
+ list_del(&ae_conn->next);
+ free(ae_conn);
+
+ pthread_cleanup_pop(true);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h
new file mode 100644
index 00000000..c0356f6d
--- /dev/null
+++ b/src/ipcpd/normal/connmgr.h
@@ -0,0 +1,57 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Handles the different AP connections
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_CONNMGR_H
+#define OUROBOROS_IPCPD_NORMAL_CONNMGR_H
+
+#include <ouroboros/cacep.h>
+#include <ouroboros/qos.h>
+
+struct conn {
+ struct conn_info conn_info;
+ struct flow_info {
+ int fd;
+ qosspec_t qs;
+ } flow_info;
+};
+
+int connmgr_init(void);
+
+void connmgr_fini(void);
+
+int connmgr_start(void);
+
+void connmgr_stop(void);
+
+struct ae * connmgr_ae_create(struct conn_info info);
+
+void connmgr_ae_destroy(struct ae * ae);
+
+int connmgr_alloc(struct ae * ae,
+ char * dst_name,
+ qosspec_t * qs,
+ struct conn * conn);
+
+int connmgr_wait(struct ae * ae,
+ struct conn * conn);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_CONNMGR_H */
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 55d6e3f6..ae9793c6 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -3,8 +3,8 @@
*
* DIF directory
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h
index 925fc823..04e722f3 100644
--- a/src/ipcpd/normal/dir.h
+++ b/src/ipcpd/normal/dir.h
@@ -3,7 +3,8 @@
*
* DIF directory
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/dt_const.h b/src/ipcpd/normal/dt_const.h
index c94e9395..327f51b8 100644
--- a/src/ipcpd/normal/dt_const.h
+++ b/src/ipcpd/normal/dt_const.h
@@ -3,8 +3,8 @@
*
* Data Transfer Constants for the IPCP
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c
index ce6768fb..7e15be11 100644
--- a/src/ipcpd/normal/enroll.c
+++ b/src/ipcpd/normal/enroll.c
@@ -3,7 +3,8 @@
*
* Enrollment Task
*
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -22,6 +23,8 @@
#include <ouroboros/config.h>
#include <ouroboros/endian.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/cdap.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/logs.h>
@@ -29,29 +32,35 @@
#include <ouroboros/errno.h>
#include "ae.h"
-#include "cdap_flow.h"
+#include "connmgr.h"
#include "ribconfig.h"
#include <assert.h>
#include <stdlib.h>
#include <string.h>
+#include <pthread.h>
/* Symbolic, will return current time */
#define TIME_NAME "localtime"
#define TIME_PATH DLR TIME_NAME
#define ENROLL_WARN_TIME_OFFSET 20
-int enroll_handle(int fd)
+struct {
+ struct ae * ae;
+ pthread_t listener;
+} enroll;
+
+static void * enroll_handle(void * o)
{
- struct cdap_flow * flow;
- struct cacep_info info;
- cdap_key_t key;
- enum cdap_opcode oc;
- char * name;
- uint8_t * buf;
- uint8_t * data;
- ssize_t len;
- uint32_t flags;
+ struct cdap * cdap;
+ struct conn conn;
+ cdap_key_t key;
+ enum cdap_opcode oc;
+ char * name;
+ uint8_t * buf;
+ uint8_t * data;
+ ssize_t len;
+ uint32_t flags;
bool boot_r = false;
bool members_r = false;
@@ -61,105 +70,105 @@ int enroll_handle(int fd)
char * members_ro = MEMBERS_PATH;
char * dif_ro = DIF_PATH;
- cacep_info_init(&info);
-
- info.proto.protocol = strdup(CDAP_PROTO);
- if (info.proto.protocol == NULL) {
- cacep_info_fini(&info);
- return -ENOMEM;
- }
+ (void) o;
- info.proto.pref_version = 1;
- info.proto.pref_syntax = PROTO_GPB;
-
- flow = cdap_flow_arr(fd, 0, ANONYMOUS_AUTH, &info);
- if (flow == NULL) {
- log_err("Failed to auth enrollment request.");
- cacep_info_fini(&info);
- flow_dealloc(fd);
- return -1;
- }
-
- cacep_info_fini(&info);
-
- while (!(boot_r && members_r && dif_name_r)) {
- key = cdap_request_wait(flow->ci, &oc, &name, &data,
- (size_t *) &len , &flags);
- assert(key >= 0);
- assert(name);
-
- if (data != NULL) {
- free(data);
- log_warn("Received data with enrollment request.");
+ while (true) {
+ cdap = cdap_create();
+ if (cdap == NULL) {
+ log_err("Failed to instantiate CDAP.");
+ continue;
}
- if (oc != CDAP_READ) {
- log_warn("Invalid request.");
- cdap_reply_send(flow->ci, key, -1, NULL, 0);
- cdap_flow_dealloc(flow);
- free(name);
- return -1;
+ if (connmgr_wait(enroll.ae, &conn)) {
+ log_err("Failed to get next connection.");
+ cdap_destroy(cdap);
+ continue;
}
- if (strcmp(name, boot_ro) == 0) {
- boot_r = true;
- } else if (strcmp(name, members_ro) == 0) {
- members_r = true;
- } else if (strcmp(name, dif_ro) == 0) {
- dif_name_r = true;
- } else if (strcmp(name, TIME_PATH) == 0) {
- struct timespec t;
- uint64_t buf[2];
- clock_gettime(CLOCK_REALTIME, &t);
- buf[0] = hton64(t.tv_sec);
- buf[1] = hton64(t.tv_nsec);
- cdap_reply_send(flow->ci, key, 0, buf, sizeof(buf));
- free(name);
+ if (cdap_add_flow(cdap, conn.flow_info.fd)) {
+ log_warn("Failed to add flow to CDAP.");
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
continue;
- } else {
- log_warn("Illegal read: %s.", name);
- cdap_reply_send(flow->ci, key, -1, NULL, 0);
- cdap_flow_dealloc(flow);
- free(name);
- return -1;
}
- len = rib_pack(name, &buf, PACK_HASH_ROOT);
- if (len < 0) {
- log_err("Failed to pack %s.", name);
- cdap_reply_send(flow->ci, key, -1, NULL, 0);
- cdap_flow_dealloc(flow);
- free(name);
- return -1;
- }
+ while (!(boot_r && members_r && dif_name_r)) {
+ key = cdap_request_wait(cdap, &oc, &name, &data,
+ (size_t *) &len , &flags);
+ assert(key >= 0);
+ assert(name);
+
+ if (data != NULL) {
+ free(data);
+ log_warn("Received data with enroll request.");
+ }
+
+ if (oc != CDAP_READ) {
+ log_warn("Invalid request.");
+ cdap_reply_send(cdap, key, -1, NULL, 0);
+ free(name);
+ continue;
+ }
+
+ if (strcmp(name, boot_ro) == 0) {
+ boot_r = true;
+ } else if (strcmp(name, members_ro) == 0) {
+ members_r = true;
+ } else if (strcmp(name, dif_ro) == 0) {
+ dif_name_r = true;
+ } else if (strcmp(name, TIME_PATH) == 0) {
+ struct timespec t;
+ uint64_t buf[2];
+ clock_gettime(CLOCK_REALTIME, &t);
+ buf[0] = hton64(t.tv_sec);
+ buf[1] = hton64(t.tv_nsec);
+ cdap_reply_send(cdap, key, 0, buf, sizeof(buf));
+ free(name);
+ continue;
+ } else {
+ log_warn("Illegal read: %s.", name);
+ cdap_reply_send(cdap, key, -1, NULL, 0);
+ free(name);
+ continue;
+ }
+
+ len = rib_pack(name, &buf, PACK_HASH_ROOT);
+ if (len < 0) {
+ log_err("Failed to pack %s.", name);
+ cdap_reply_send(cdap, key, -1, NULL, 0);
+ free(name);
+ continue;
+ }
+
+ log_dbg("Packed %s (%zu bytes).", name, len);
- log_dbg("Packed %s (%zu bytes).", name, len);
+ free(name);
- free(name);
+ if (cdap_reply_send(cdap, key, 0, buf, len)) {
+ log_err("Failed to send CDAP reply.");
+ free(buf);
+ continue;
+ }
- if (cdap_reply_send(flow->ci, key, 0, buf, len)) {
- log_err("Failed to send CDAP reply.");
- cdap_flow_dealloc(flow);
- return -1;
+ free(buf);
}
- free(buf);
- }
-
- log_dbg("Sent boot info to new member.");
+ log_dbg("Sent boot info to new member.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ }
return 0;
}
int enroll_boot(char * dst_name)
{
- struct cdap_flow * flow;
- struct cacep_info info;
- cdap_key_t key;
- uint8_t * data;
- size_t len;
+ struct cdap * cdap;
+ cdap_key_t * key;
+ uint8_t * data;
+ size_t len;
+ struct conn conn;
struct timespec t0;
struct timespec rtt;
@@ -170,49 +179,52 @@ int enroll_boot(char * dst_name)
char * members_ro = MEMBERS_PATH;
char * dif_ro = DIF_PATH;
- cacep_info_init(&info);
-
- info.proto.protocol = strdup(CDAP_PROTO);
- if (info.proto.protocol == NULL) {
- cacep_info_fini(&info);
- return -ENOMEM;
+ cdap = cdap_create();
+ if (cdap == NULL) {
+ log_err("Failed to instantiate CDAP.");
+ return -1;
}
- info.proto.pref_version = 1;
- info.proto.pref_syntax = PROTO_GPB;
-
- flow = cdap_flow_alloc(dst_name, ENROLL_AE, NULL, ANONYMOUS_AUTH,
- &info);
- if (flow == NULL) {
- log_err("Failed to allocate flow for enrollment request.");
- cacep_info_fini(&info);
+ if (connmgr_alloc(enroll.ae, dst_name, NULL, &conn)) {
+ log_err("Failed to get connection.");
+ cdap_destroy(cdap);
return -1;
}
- cacep_info_fini(&info);
+ if (cdap_add_flow(cdap, conn.flow_info.fd)) {
+ log_warn("Failed to add flow to CDAP.");
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ return -1;
+ }
log_dbg("Getting boot information from %s.", dst_name);
clock_gettime(CLOCK_REALTIME, &t0);
- key = cdap_request_send(flow->ci, CDAP_READ, TIME_PATH, NULL, 0, 0);
- if (key < 0) {
+ key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0);
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ free(key);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
clock_gettime(CLOCK_REALTIME, &rtt);
delta_t = ts_diff_ms(&t0, &rtt);
- assert (len == 2 * sizeof (uint64_t));
+ assert(len == 2 * sizeof (uint64_t));
rtt.tv_sec = ntoh64(((uint64_t *) data)[0]);
rtt.tv_nsec = ntoh64(((uint64_t *) data)[1]);
@@ -222,82 +234,138 @@ int enroll_boot(char * dst_name)
free(data);
- key = cdap_request_send(flow->ci, CDAP_READ, boot_ro, NULL, 0, 0);
- if (key < 0) {
+ key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0);
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ free(key);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
log_dbg("Packed information received (%zu bytes).", len);
if (rib_unpack(data, len, UNPACK_CREATE)) {
log_warn("Error unpacking RIB data.");
rib_del(boot_ro);
free(data);
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
log_dbg("Packed information inserted into RIB.");
- key = cdap_request_send(flow->ci, CDAP_READ, members_ro, NULL, 0, 0);
- if (key < 0) {
+ key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0);
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ free(key);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
log_dbg("Packed information received (%zu bytes).", len);
if (rib_unpack(data, len, UNPACK_CREATE)) {
log_warn("Error unpacking RIB data.");
rib_del(boot_ro);
free(data);
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
log_dbg("Packed information inserted into RIB.");
- key = cdap_request_send(flow->ci, CDAP_READ, dif_ro, NULL, 0, 0);
- if (key < 0) {
+ key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0);
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(flow->ci, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
- cdap_flow_dealloc(flow);
+ free(key);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
log_dbg("Packed information received (%zu bytes).", len);
if (rib_unpack(data, len, UNPACK_CREATE)) {
log_warn("Error unpacking RIB data.");
rib_del(boot_ro);
free(data);
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
return -1;
}
log_dbg("Packed information inserted into RIB.");
- cdap_flow_dealloc(flow);
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+
+ return 0;
+}
+
+int enroll_init(void)
+{
+ struct conn_info info;
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.ae_name, ENROLL_AE);
+ strcpy(info.protocol, CDAP_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_GPB;
+
+ enroll.ae = connmgr_ae_create(info);
+ if (enroll.ae == NULL)
+ return -1;
return 0;
}
+
+void enroll_fini(void)
+{
+ connmgr_ae_destroy(enroll.ae);
+}
+
+int enroll_start(void)
+{
+ if (pthread_create(&enroll.listener, NULL, enroll_handle, NULL))
+ return -1;
+
+ return 0;
+}
+
+void enroll_stop(void)
+{
+ pthread_cancel(enroll.listener);
+ pthread_join(enroll.listener, NULL);
+}
diff --git a/src/ipcpd/normal/enroll.h b/src/ipcpd/normal/enroll.h
index 2980c380..05f950ba 100644
--- a/src/ipcpd/normal/enroll.h
+++ b/src/ipcpd/normal/enroll.h
@@ -3,7 +3,8 @@
*
* Enrollment Task
*
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -22,8 +23,14 @@
#ifndef OUROBOROS_IPCPD_NORMAL_ENROLL_H
#define OUROBOROS_IPCPD_NORMAL_ENROLL_H
-int enroll_handle(int fd);
+int enroll_init(void);
-int enroll_boot(char * dst_name);
+void enroll_fini(void);
+
+int enroll_start(void);
+
+void enroll_stop(void);
+
+int enroll_boot(char * dst_name);
#endif /* OUROBOROS_IPCPD_NORMAL_ENROLL_H */
diff --git a/src/ipcpd/normal/flow_alloc.proto b/src/ipcpd/normal/flow_alloc.proto
index 16e8be2c..3b08f047 100644
--- a/src/ipcpd/normal/flow_alloc.proto
+++ b/src/ipcpd/normal/flow_alloc.proto
@@ -31,7 +31,6 @@ enum flow_alloc_code {
message flow_alloc_msg {
required flow_alloc_code code = 1;
optional string dst_name = 2;
- optional string src_ae_name = 3;
- optional uint32 qoscube = 4;
- optional sint32 response = 5;
+ optional uint32 qoscube = 3;
+ optional sint32 response = 4;
};
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index c2b53abf..184baf82 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -3,7 +3,8 @@
*
* Flow manager of the IPC Process
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -31,12 +32,16 @@
#include <ouroboros/cacep.h>
#include <ouroboros/rib.h>
+#include "connmgr.h"
#include "fmgr.h"
#include "frct.h"
#include "ipcp.h"
#include "shm_pci.h"
-#include "gam.h"
#include "ribconfig.h"
+#include "pff.h"
+#include "neighbors.h"
+#include "gam.h"
+#include "routing.h"
#include <stdlib.h>
#include <stdbool.h>
@@ -48,19 +53,7 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
-struct nm1_flow {
- struct list_head next;
- int fd;
- qosspec_t qs;
- struct cacep_info * info;
-};
-
struct {
- flow_set_t * nm1_set[QOS_CUBE_MAX];
- fqueue_t * nm1_fqs[QOS_CUBE_MAX];
- struct list_head nm1_flows;
- pthread_rwlock_t nm1_flows_lock;
-
flow_set_t * np1_set[QOS_CUBE_MAX];
fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
@@ -69,15 +62,43 @@ struct {
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
pthread_t np1_sdu_reader;
+
+ flow_set_t * nm1_set[QOS_CUBE_MAX];
+ fqueue_t * nm1_fqs[QOS_CUBE_MAX];
pthread_t nm1_sdu_reader;
- pthread_t nm1_flow_wait;
- /* FIXME: Replace with PFF */
- int fd;
+ struct pff * pff[QOS_CUBE_MAX];
+ struct routing_i * routing[QOS_CUBE_MAX];
struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
} fmgr;
+static int fmgr_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ qoscube_t cube;
+
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_add(fmgr.nm1_set[cube], conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ ipcp_flow_get_qoscube(conn.flow_info.fd, &cube);
+ flow_set_del(fmgr.nm1_set[cube], conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
@@ -162,7 +183,7 @@ void * fmgr_nm1_sdu_reader(void * o)
shm_pci_des(sdb, &pci);
- if (pci.dst_addr != ipcpi.address) {
+ if (pci.dst_addr != ipcpi.dt_addr) {
log_dbg("PDU needs to be forwarded.");
if (pci.ttl == 0) {
@@ -171,12 +192,20 @@ void * fmgr_nm1_sdu_reader(void * o)
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- continue;
+ fd = pff_nhop(fmgr.pff[i], pci.dst_addr);
+ if (fd < 0) {
+ log_err("No next hop for %lu",
+ pci.dst_addr);
+ ipcp_flow_del(sdb);
+ continue;
+ }
+
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.",
+ fd);
+ ipcp_flow_del(sdb);
+ continue;
+ }
}
shm_pci_shrink(sdb);
@@ -192,49 +221,6 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
-static void * fmgr_nm1_flow_wait(void * o)
-{
- qoscube_t cube;
- struct cacep_info * info;
- int fd;
- qosspec_t qs;
- struct nm1_flow * flow;
-
- (void) o;
-
- while (true) {
- if (gam_flow_wait(fmgr.gam, &fd, &info, &qs)) {
- log_err("Failed to get next flow descriptor.");
- continue;
- }
-
- ipcp_flow_get_qoscube(fd, &cube);
- flow_set_add(fmgr.nm1_set[cube], fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
- flow = malloc(sizeof(*flow));
- if (flow == NULL) {
- free(info);
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- continue;
- }
-
- flow->info = info;
- flow->fd = fd;
- flow->qs = qs;
-
- list_head_init(&flow->next);
- list_add(&flow->next, &fmgr.nm1_flows);
-
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
- }
-
- return (void *) 0;
-}
-
static void fmgr_destroy_flows(void)
{
int i;
@@ -247,13 +233,29 @@ static void fmgr_destroy_flows(void)
}
}
-int fmgr_init(void)
+static void fmgr_destroy_routing(void)
{
- enum pol_cacep pc;
- enum pol_gam pg;
+ int i;
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ routing_i_destroy(fmgr.routing[i]);
+}
+static void fmgr_destroy_pff(void)
+{
int i;
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(fmgr.pff[i]);
+}
+
+int fmgr_init(void)
+{
+ enum pol_gam pg;
+ int i;
+ int j;
+ struct conn_info info;
+
for (i = 0; i < AP_MAX_FLOWS; ++i)
fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
@@ -289,78 +291,135 @@ int fmgr_init(void)
if (rib_read(BOOT_PATH "/dt/gam/type", &pg, sizeof(pg))
!= sizeof(pg)) {
log_err("Failed to read policy for ribmgr gam.");
+ fmgr_destroy_flows();
return -1;
}
- if (rib_read(BOOT_PATH "/dt/gam/cacep", &pc, sizeof(pc))
- != sizeof(pc)) {
- log_err("Failed to read CACEP policy for ribmgr gam.");
+ strcpy(info.ae_name, DT_AE);
+ strcpy(info.protocol, FRCT_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_FIXED;
+ info.addr = ipcpi.dt_addr;
+
+ fmgr.ae = connmgr_ae_create(info);
+ if (fmgr.ae == NULL) {
+ log_err("Failed to create AE struct.");
+ fmgr_destroy_flows();
return -1;
}
- /* FIXME: Implement cacep policies */
- (void) pc;
+ fmgr.nbs = nbs_create();
+ if (fmgr.nbs == NULL) {
+ log_err("Failed to create neighbors struct.");
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
- fmgr.gam = gam_create(pg, DT_AE);
- if (fmgr.gam == NULL) {
- log_err("Failed to create graph adjacency manager.");
+ if (routing_init(fmgr.nbs)) {
+ log_err("Failed to init routing.");
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ fmgr.nb_notifier.notify_call = fmgr_neighbor_event;
+ if (nbs_reg_notifier(fmgr.nbs, &fmgr.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ if (pthread_rwlock_init(&fmgr.np1_flows_lock, NULL)) {
+ gam_destroy(fmgr.gam);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
return -1;
}
- list_head_init(&fmgr.nm1_flows);
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fmgr.pff[i] = pff_create();
+ if (fmgr.pff[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ pff_destroy(fmgr.pff[j]);
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+
+ fmgr.routing[i] = routing_i_create(fmgr.pff[i]);
+ if (fmgr.routing[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ routing_i_destroy(fmgr.routing[j]);
+ fmgr_destroy_pff();
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
+ }
- pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
- pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
+ fmgr.gam = gam_create(pg, fmgr.nbs, fmgr.ae);
+ if (fmgr.gam == NULL) {
+ log_err("Failed to init dt graph adjacency manager.");
+ fmgr_destroy_routing();
+ fmgr_destroy_pff();
+ pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
+ routing_fini();
+ nbs_destroy(fmgr.nbs);
+ fmgr_destroy_flows();
+ connmgr_ae_destroy(fmgr.ae);
+ return -1;
+ }
pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
- pthread_create(&fmgr.nm1_flow_wait, NULL, fmgr_nm1_flow_wait, NULL);
return 0;
}
void fmgr_fini()
{
- struct list_head * pos = NULL;
- struct list_head * n = NULL;
- qoscube_t cube;
-
pthread_cancel(fmgr.np1_sdu_reader);
pthread_cancel(fmgr.nm1_sdu_reader);
- pthread_cancel(fmgr.nm1_flow_wait);
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
- pthread_join(fmgr.nm1_flow_wait, NULL);
+
+ nbs_unreg_notifier(fmgr.nbs, &fmgr.nb_notifier);
gam_destroy(fmgr.gam);
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
-
- list_for_each_safe(pos, n, &fmgr.nm1_flows) {
- struct nm1_flow * flow =
- list_entry(pos, struct nm1_flow, next);
- list_del(&flow->next);
- flow_dealloc(flow->fd);
- ipcp_flow_get_qoscube(flow->fd, &cube);
- flow_set_del(fmgr.nm1_set[cube], flow->fd);
- free(flow->info->name);
- free(flow->info);
- free(flow);
- }
+ fmgr_destroy_routing();
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
+ fmgr_destroy_pff();
- pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
- pthread_rwlock_destroy(&fmgr.np1_flows_lock);
+ routing_fini();
fmgr_destroy_flows();
+
+ connmgr_ae_destroy(fmgr.ae);
+
+ nbs_destroy(fmgr.nbs);
}
int fmgr_np1_alloc(int fd,
char * dst_ap_name,
- char * src_ae_name,
qoscube_t cube)
{
cep_id_t cep_id;
@@ -406,7 +465,6 @@ int fmgr_np1_alloc(int fd,
msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
msg.dst_name = dst_ap_name;
- msg.src_ae_name = src_ae_name;
msg.has_qoscube = true;
msg.qoscube = cube;
@@ -546,7 +604,6 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
case FLOW_ALLOC_CODE__FLOW_REQ:
fd = ipcp_flow_req_arr(getpid(),
msg->dst_name,
- msg->src_ae_name,
msg->qoscube);
if (fd < 0) {
flow_alloc_msg__free_unpacked(msg, NULL);
@@ -615,24 +672,20 @@ int fmgr_np1_post_sdu(cep_id_t cep_id,
return 0;
}
-int fmgr_nm1_flow_arr(int fd,
- qosspec_t qs)
-{
- assert(fmgr.gam);
-
- if (gam_flow_arr(fmgr.gam, fd, qs)) {
- log_err("Failed to hand to graph adjacency manager.");
- return -1;
- }
-
- return 0;
-}
-
int fmgr_nm1_write_sdu(struct pci * pci,
struct shm_du_buff * sdb)
{
+ int fd;
+
if (pci == NULL || sdb == NULL)
+ return -EINVAL;
+
+ fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ log_err("Could not get nhop for address %lu", pci->dst_addr);
+ ipcp_flow_del(sdb);
return -1;
+ }
if (shm_pci_ser(sdb, pci)) {
log_err("Failed to serialize PDU.");
@@ -640,8 +693,8 @@ int fmgr_nm1_write_sdu(struct pci * pci,
return -1;
}
- if (ipcp_flow_write(fmgr.fd, sdb)) {
- log_err("Failed to write SDU to fd %d.", fmgr.fd);
+ if (ipcp_flow_write(fd, sdb)) {
+ log_err("Failed to write SDU to fd %d.", fd);
ipcp_flow_del(sdb);
return -1;
}
@@ -653,9 +706,17 @@ int fmgr_nm1_write_buf(struct pci * pci,
buffer_t * buf)
{
buffer_t * buffer;
+ int fd;
if (pci == NULL || buf == NULL || buf->data == NULL)
+ return -EINVAL;
+
+ fd = pff_nhop(fmgr.pff[pci->qos_id], pci->dst_addr);
+ if (fd < 0) {
+ log_err("Could not get nhop for address %lu", pci->dst_addr);
+ free(buf->data);
return -1;
+ }
buffer = shm_pci_ser_buf(buf, pci);
if (buffer == NULL) {
@@ -664,7 +725,7 @@ int fmgr_nm1_write_buf(struct pci * pci,
return -1;
}
- if (flow_write(fmgr.fd, buffer->data, buffer->len) == -1) {
+ if (flow_write(fd, buffer->data, buffer->len) == -1) {
log_err("Failed to write buffer to fd.");
free(buffer);
return -1;
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index 3c61f55a..f5076eab 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -3,7 +3,8 @@
*
* Flow manager of the IPC Process
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -34,7 +35,6 @@ void fmgr_fini(void);
int fmgr_np1_alloc(int fd,
char * dst_ap_name,
- char * src_ae_name,
qoscube_t qos);
int fmgr_np1_alloc_resp(int fd,
@@ -54,8 +54,4 @@ int fmgr_nm1_write_sdu(struct pci * pci,
int fmgr_nm1_write_buf(struct pci * pci,
buffer_t * buf);
-int fmgr_nm1_flow_arr(int fd,
- qosspec_t qs);
-
-
#endif /* OUROBOROS_IPCPD_NORMAL_FMGR_H */
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index 915feaf8..62cbf9f7 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -3,7 +3,8 @@
*
* The Flow and Retransmission control component
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -198,12 +199,12 @@ int frct_fini()
return 0;
}
-int frct_nm1_post_sdu(struct pci * pci,
+int frct_nm1_post_sdu(struct pci * pci,
struct shm_du_buff * sdb)
{
struct frct_i * instance;
- buffer_t buf;
- cep_id_t id;
+ buffer_t buf;
+ cep_id_t id;
if (pci == NULL || sdb == NULL)
return -1;
@@ -267,8 +268,8 @@ cep_id_t frct_i_create(uint64_t address,
qoscube_t cube)
{
struct frct_i * instance;
- struct pci pci;
- cep_id_t id;
+ struct pci pci;
+ cep_id_t id;
if (buf == NULL || buf->data == NULL)
return INVALID_CEP_ID;
@@ -285,7 +286,7 @@ cep_id_t frct_i_create(uint64_t address,
pci.pdu_type = PDU_TYPE_MGMT;
pci.dst_addr = address;
- pci.src_addr = ipcpi.address;
+ pci.src_addr = ipcpi.dt_addr;
pci.dst_cep_id = 0;
pci.src_cep_id = id;
pci.seqno = 0;
@@ -304,7 +305,7 @@ int frct_i_accept(cep_id_t id,
buffer_t * buf,
qoscube_t cube)
{
- struct pci pci;
+ struct pci pci;
struct frct_i * instance;
if (buf == NULL || buf->data == NULL)
@@ -330,7 +331,7 @@ int frct_i_accept(cep_id_t id,
pci.pdu_type = PDU_TYPE_MGMT;
pci.dst_addr = instance->r_address;
- pci.src_addr = ipcpi.address;
+ pci.src_addr = ipcpi.dt_addr;
pci.dst_cep_id = instance->r_cep_id;
pci.src_cep_id = instance->cep_id;
pci.seqno = 0;
@@ -347,7 +348,7 @@ int frct_i_accept(cep_id_t id,
int frct_i_destroy(cep_id_t id,
buffer_t * buf)
{
- struct pci pci;
+ struct pci pci;
struct frct_i * instance;
pthread_mutex_lock(&frct.instances_lock);
@@ -367,7 +368,7 @@ int frct_i_destroy(cep_id_t id,
pci.pdu_type = PDU_TYPE_MGMT;
pci.dst_addr = instance->r_address;
- pci.src_addr = ipcpi.address;
+ pci.src_addr = ipcpi.dt_addr;
pci.dst_cep_id = instance->r_cep_id;
pci.src_cep_id = instance->cep_id;
pci.seqno = 0;
@@ -390,7 +391,7 @@ int frct_i_destroy(cep_id_t id,
int frct_i_write_sdu(cep_id_t id,
struct shm_du_buff * sdb)
{
- struct pci pci;
+ struct pci pci;
struct frct_i * instance;
if (sdb == NULL)
@@ -413,7 +414,7 @@ int frct_i_write_sdu(cep_id_t id,
pci.pdu_type = PDU_TYPE_DTP;
pci.dst_addr = instance->r_address;
- pci.src_addr = ipcpi.address;
+ pci.src_addr = ipcpi.dt_addr;
pci.dst_cep_id = instance->r_cep_id;
pci.src_cep_id = instance->cep_id;
pci.seqno = (instance->seqno)++;
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index 462b8cc3..a1dcb151 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -3,7 +3,8 @@
*
* The Flow and Retransmission control component
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -27,6 +28,8 @@
#include "shm_pci.h"
+#define FRCT_PROTO "FRCT"
+
struct frct_i;
int frct_init(void);
diff --git a/src/ipcpd/normal/fso.proto b/src/ipcpd/normal/fso.proto
new file mode 100644
index 00000000..32b281d6
--- /dev/null
+++ b/src/ipcpd/normal/fso.proto
@@ -0,0 +1,29 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow State Object message
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+syntax = "proto2";
+
+message fso {
+ required uint64 s_addr = 1;
+ required uint64 d_addr = 2;
+ /* Add QoS parameters of link here */
+};
diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c
index 791cf34e..643d83b0 100644
--- a/src/ipcpd/normal/gam.c
+++ b/src/ipcpd/normal/gam.c
@@ -1,10 +1,10 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Data transfer graph adjacency manager
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -20,7 +20,7 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "graph-adjacency-manager"
+#define OUROBOROS_PREFIX "dt-gam"
#include <ouroboros/config.h>
#include <ouroboros/cdap.h>
@@ -40,295 +40,43 @@
#include <pthread.h>
#include <string.h>
-struct ga {
- struct list_head next;
-
- qosspec_t qs;
- int fd;
- struct cacep_info * info;
-};
-
struct gam {
- struct list_head gas;
- pthread_mutex_t gas_lock;
- pthread_cond_t gas_cond;
-
- char * ae_name;
-
struct pol_gam_ops * ops;
void * ops_o;
};
struct gam * gam_create(enum pol_gam gam_type,
- const char * ae_name)
+ struct nbs * nbs,
+ struct ae * ae)
{
- struct gam * tmp;
+ struct gam * gam;
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL)
+ gam = malloc(sizeof(*gam));
+ if (gam == NULL)
return NULL;
switch (gam_type) {
case COMPLETE:
- tmp->ops = &complete_ops;
+ gam->ops = &complete_ops;
break;
default:
log_err("Unknown gam policy: %d.", gam_type);
- free(tmp);
return NULL;
}
- list_head_init(&tmp->gas);
-
- tmp->ae_name = strdup(ae_name);
- if (tmp->ae_name == NULL) {
- free(tmp);
+ gam->ops_o = gam->ops->create(nbs, ae);
+ if (gam->ops_o == NULL) {
+ free(gam);
return NULL;
}
- if (pthread_mutex_init(&tmp->gas_lock, NULL)) {
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- if (pthread_cond_init(&tmp->gas_cond, NULL)) {
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- tmp->ops_o = tmp->ops->create(tmp);
- if (tmp->ops_o == NULL) {
- pthread_cond_destroy(&tmp->gas_cond);
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- if (tmp->ops->start(tmp->ops_o)) {
- pthread_cond_destroy(&tmp->gas_cond);
- pthread_mutex_destroy(&tmp->gas_lock);
- free(tmp->ae_name);
- free(tmp);
- return NULL;
- }
-
- return tmp;
-}
-
-void gam_destroy(struct gam * instance)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
-
- assert(instance);
-
- instance->ops->stop(instance->ops_o);
-
- pthread_mutex_lock(&instance->gas_lock);
-
- list_for_each_safe(p, n, &instance->gas) {
- struct ga * e = list_entry(p, struct ga, next);
- list_del(&e->next);
- free(e->info->name);
- free(e->info);
- free(e);
- }
-
- pthread_mutex_unlock(&instance->gas_lock);
-
- pthread_mutex_destroy(&instance->gas_lock);
- pthread_cond_destroy(&instance->gas_cond);
-
- free(instance->ae_name);
- instance->ops->destroy(instance->ops_o);
- free(instance);
-}
-
-static int add_ga(struct gam * instance,
- int fd,
- qosspec_t qs,
- struct cacep_info * info)
-{
- struct ga * ga;
-
- ga = malloc(sizeof(*ga));
- if (ga == NULL)
- return -ENOMEM;
-
- ga->fd = fd;
- ga->info = info;
- ga->qs = qs;
-
- list_head_init(&ga->next);
-
- pthread_mutex_lock(&instance->gas_lock);
- list_add(&ga->next, &instance->gas);
- pthread_cond_signal(&instance->gas_cond);
- pthread_mutex_unlock(&instance->gas_lock);
-
- log_info("Added %s flow to %s.", instance->ae_name, info->name);
-
- return 0;
-}
-
-int gam_flow_arr(struct gam * instance,
- int fd,
- qosspec_t qs)
-{
- struct cacep_info * rcv_info;
- struct cacep_info snd_info;
-
- if (flow_alloc_resp(fd, instance->ops->accept_new_flow(instance->ops_o))
- < 0) {
- log_err("Could not respond to new flow.");
- return -1;
- }
-
- cacep_info_init(&snd_info);
- snd_info.proto.protocol = strdup(CDAP_PROTO);
- if (snd_info.proto.protocol == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- snd_info.proto.pref_version = 1;
- snd_info.proto.pref_syntax = PROTO_GPB;
- snd_info.addr = ipcpi.address;
- snd_info.name = strdup(ipcpi.name);
- if (snd_info.name == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- rcv_info = cacep_auth_wait(fd, SIMPLE_AUTH, &snd_info);
- if (rcv_info == NULL) {
- log_err("Other side failed to authenticate.");
- cacep_info_fini(&snd_info);
- return -1;
- }
-
- cacep_info_fini(&snd_info);
-
- if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) {
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return 0;
- }
-
- if (add_ga(instance, fd, qs, rcv_info)) {
- log_err("Failed to add ga to graph adjacency manager list.");
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return -1;
- }
-
- return 0;
-}
-
-int gam_flow_alloc(struct gam * instance,
- char * dst_name,
- qosspec_t qs)
-{
- struct cacep_info * rcv_info;
- struct cacep_info snd_info;
- int fd;
-
- log_dbg("Allocating flow to %s.", dst_name);
-
- fd = flow_alloc(dst_name, instance->ae_name, NULL);
- if (fd < 0) {
- log_err("Failed to allocate flow to %s.", dst_name);
- return -1;
- }
-
- if (flow_alloc_res(fd)) {
- log_err("Flow allocation to %s failed.", dst_name);
- flow_dealloc(fd);
- return -1;
- }
-
- cacep_info_init(&snd_info);
- snd_info.proto.protocol = strdup(CDAP_PROTO);
- if (snd_info.proto.protocol == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- snd_info.proto.pref_version = 1;
- snd_info.proto.pref_syntax = PROTO_GPB;
- snd_info.addr = ipcpi.address;
- snd_info.name = strdup(ipcpi.name);
- if (snd_info.name == NULL) {
- cacep_info_fini(&snd_info);
- return -ENOMEM;
- }
-
- rcv_info = cacep_auth(fd, SIMPLE_AUTH, &snd_info);
- if (rcv_info == NULL) {
- log_err("Other side failed to authenticate.");
- cacep_info_fini(&snd_info);
- return -1;
- }
-
- cacep_info_fini(&snd_info);
-
- if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) {
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return 0;
- }
-
- if (add_ga(instance, fd, qs, rcv_info)) {
- log_err("Failed to add GA to graph adjacency manager list.");
- flow_dealloc(fd);
- cacep_info_fini(rcv_info);
- free(rcv_info);
- return -1;
- }
-
- return 0;
+ return gam;
}
-int gam_flow_wait(struct gam * instance,
- int * fd,
- struct cacep_info ** info,
- qosspec_t * qs)
+void gam_destroy(struct gam * gam)
{
- struct ga * ga;
-
- assert(fd);
- assert(info);
- assert(qs);
-
- pthread_mutex_lock(&instance->gas_lock);
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &instance->gas_lock);
-
- while (list_is_empty(&instance->gas))
- pthread_cond_wait(&instance->gas_cond, &instance->gas_lock);
-
- ga = list_first_entry((&instance->gas), struct ga, next);
- if (ga == NULL) {
- pthread_mutex_unlock(&instance->gas_lock);
- return -1;
- }
-
- *fd = ga->fd;
- *info = ga->info;
- *qs = ga->qs;
-
- list_del(&ga->next);
- free(ga);
-
- pthread_cleanup_pop(true);
+ assert(gam);
- return 0;
+ gam->ops->destroy(gam->ops_o);
+ free(gam);
}
diff --git a/src/ipcpd/normal/gam.h b/src/ipcpd/normal/gam.h
index 50f83df9..4ae0b1b3 100644
--- a/src/ipcpd/normal/gam.h
+++ b/src/ipcpd/normal/gam.h
@@ -1,10 +1,10 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Data transfer graph adjacency manager
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -26,22 +26,12 @@
#include <ouroboros/cacep.h>
#include <ouroboros/irm_config.h>
-struct gam * gam_create(enum pol_gam gam_type,
- const char * ae_name);
-
-void gam_destroy(struct gam * instance);
+#include "neighbors.h"
-int gam_flow_arr(struct gam * instance,
- int fd,
- qosspec_t qs);
-
-int gam_flow_alloc(struct gam * instance,
- char * dst_name,
- qosspec_t qs);
+struct gam * gam_create(enum pol_gam gam_type,
+ struct nbs * nbs,
+ struct ae * ae);
-int gam_flow_wait(struct gam * instance,
- int * fd,
- struct cacep_info ** info,
- qosspec_t * qs);
+void gam_destroy(struct gam * gam);
#endif /* OUROBOROS_IPCPD_NORMAL_GAM_H */
diff --git a/src/ipcpd/normal/graph.c b/src/ipcpd/normal/graph.c
new file mode 100644
index 00000000..85bb3fe2
--- /dev/null
+++ b/src/ipcpd/normal/graph.c
@@ -0,0 +1,277 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Graph structure
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "graph"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+
+#include "graph.h"
+
+#include <assert.h>
+#include <pthread.h>
+#include <stdlib.h>
+
+static struct edge * find_edge_by_addr(struct vertex * vertex,
+ uint64_t dst_addr)
+{
+ struct list_head * p = NULL;
+
+ list_for_each(p, &vertex->edges) {
+ struct edge * e = list_entry(p, struct edge, next);
+ if (e->dst_addr == dst_addr)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct vertex * find_vertex_by_addr(struct graph * graph,
+ uint64_t addr)
+{
+ struct list_head * p = NULL;
+
+ list_for_each(p, &graph->vertices) {
+ struct vertex * e = list_entry(p, struct vertex, next);
+ if (e->addr == addr)
+ return e;
+ }
+
+ return NULL;
+}
+
+static int add_edge(struct vertex * vertex,
+ uint64_t dst_addr,
+ qosspec_t qs)
+{
+ struct edge * edge;
+
+ edge = malloc(sizeof(*edge));
+ if (edge == NULL)
+ return -ENOMEM;
+
+ list_head_init(&edge->next);
+ edge->dst_addr = dst_addr;
+ edge->qs = qs;
+
+ list_add(&edge->next, &vertex->edges);
+
+ return 0;
+}
+
+static void del_edge(struct edge * edge)
+{
+ list_del(&edge->next);
+ free(edge);
+}
+
+static int add_vertex(struct graph * graph,
+ uint64_t addr)
+{
+ struct vertex * vertex;
+ struct list_head * p;
+
+ vertex = malloc(sizeof(*vertex));
+ if (vertex == NULL)
+ return -1;
+
+ list_head_init(&vertex->next);
+ list_head_init(&vertex->edges);
+ vertex->addr = addr;
+
+ list_for_each(p, &graph->vertices) {
+ struct vertex * v = list_entry(p, struct vertex, next);
+ if (v->addr > addr)
+ break;
+ }
+
+ list_add_tail(&vertex->next, p);
+
+ graph->nr_vertices++;
+
+ return 0;
+}
+
+static void del_vertex(struct graph * graph,
+ struct vertex * vertex)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ list_del(&vertex->next);
+
+ list_for_each_safe(p, n, &vertex->edges) {
+ struct edge * e = list_entry(p, struct edge, next);
+ del_edge(e);
+ }
+
+ free(vertex);
+
+ graph->nr_vertices--;
+}
+
+struct graph * graph_create(void)
+{
+ struct graph * graph;
+
+ graph = malloc(sizeof(*graph));
+ if (graph == NULL)
+ return NULL;
+
+ if (pthread_mutex_init(&graph->lock, NULL)) {
+ free(graph);
+ return NULL;
+ }
+
+ graph->nr_vertices = 0;
+ list_head_init(&graph->vertices);
+
+ return graph;
+}
+
+void graph_destroy(struct graph * graph)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ assert(graph);
+
+ pthread_mutex_lock(&graph->lock);
+
+ list_for_each_safe(p, n, &graph->vertices) {
+ struct vertex * e = list_entry(p, struct vertex, next);
+ del_vertex(graph, e);
+ }
+
+ pthread_mutex_unlock(&graph->lock);
+
+ pthread_mutex_destroy(&graph->lock);
+}
+
+int graph_add_edge(struct graph * graph,
+ uint64_t s_addr,
+ uint64_t d_addr,
+ qosspec_t qs)
+{
+ struct vertex * v;
+ struct edge * e;
+
+ assert(graph);
+
+ pthread_mutex_lock(&graph->lock);
+
+ v = find_vertex_by_addr(graph, s_addr);
+ if (v == NULL) {
+ if (add_vertex(graph, s_addr)) {
+ pthread_mutex_unlock(&graph->lock);
+ return -ENOMEM;
+ }
+ }
+
+ e = find_edge_by_addr(v, d_addr);
+ if (e != NULL) {
+ pthread_mutex_unlock(&graph->lock);
+ log_err("Edge already exists.");
+ return -1;
+ }
+
+ if (add_edge(v, d_addr, qs)) {
+ pthread_mutex_unlock(&graph->lock);
+ log_err("Failed to add edge.");
+ return -1;
+ }
+
+ pthread_mutex_unlock(&graph->lock);
+
+ return 0;
+}
+
+int graph_update_edge(struct graph * graph,
+ uint64_t s_addr,
+ uint64_t d_addr,
+ qosspec_t qs)
+{
+ struct vertex * v;
+ struct edge * e;
+
+ assert(graph);
+
+ pthread_mutex_lock(&graph->lock);
+
+ v = find_vertex_by_addr(graph, s_addr);
+ if (v == NULL) {
+ pthread_mutex_unlock(&graph->lock);
+ log_err("No such vertex.");
+ return -1;
+ }
+
+ e = find_edge_by_addr(v, d_addr);
+ if (e == NULL) {
+ pthread_mutex_unlock(&graph->lock);
+ log_err("No such edge.");
+ return -1;
+ }
+
+ e->qs = qs;
+
+ pthread_mutex_unlock(&graph->lock);
+
+ return 0;
+}
+
+int graph_del_edge(struct graph * graph,
+ uint64_t s_addr,
+ uint64_t d_addr)
+{
+ struct vertex * v;
+ struct edge * e;
+
+ assert(graph);
+
+ pthread_mutex_lock(&graph->lock);
+
+ v = find_vertex_by_addr(graph, s_addr);
+ if (v == NULL) {
+ pthread_mutex_unlock(&graph->lock);
+ log_err("No such vertex.");
+ return -1;
+ }
+
+ e = find_edge_by_addr(v, d_addr);
+ if (e == NULL) {
+ pthread_mutex_unlock(&graph->lock);
+ log_err("No such edge.");
+ return -1;
+ }
+
+ del_edge(e);
+
+ /* Removing vertex if it was the last edge */
+ if (list_is_empty(&v->edges))
+ del_vertex(graph, v);
+
+ pthread_mutex_unlock(&graph->lock);
+
+ return 0;
+}
diff --git a/src/ipcpd/normal/graph.h b/src/ipcpd/normal/graph.h
new file mode 100644
index 00000000..9653efd7
--- /dev/null
+++ b/src/ipcpd/normal/graph.h
@@ -0,0 +1,67 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Graph structure
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_GRAPH_H
+#define OUROBOROS_IPCPD_NORMAL_GRAPH_H
+
+#include <ouroboros/list.h>
+#include <ouroboros/qos.h>
+
+#include <inttypes.h>
+
+struct edge {
+ struct list_head next;
+ uint64_t dst_addr;
+ qosspec_t qs;
+};
+
+struct vertex {
+ struct list_head next;
+ uint64_t addr;
+ struct list_head edges;
+};
+
+struct graph {
+ size_t nr_vertices;
+ struct list_head vertices;
+ pthread_mutex_t lock;
+};
+
+struct graph * graph_create(void);
+
+void graph_destroy(struct graph * graph);
+
+int graph_add_edge(struct graph * graph,
+ uint64_t s_addr,
+ uint64_t d_addr,
+ qosspec_t qs);
+
+int graph_update_edge(struct graph * graph,
+ uint64_t s_addr,
+ uint64_t d_addr,
+ qosspec_t qs);
+
+int graph_del_edge(struct graph * graph,
+ uint64_t s_addr,
+ uint64_t d_addr);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_GRAPH_H */
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 522daa3b..398d6ee3 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -3,8 +3,8 @@
*
* Normal IPC Process
*
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -24,7 +24,6 @@
#include <ouroboros/config.h>
#include <ouroboros/logs.h>
-#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/irm.h>
@@ -33,11 +32,10 @@
#include <ouroboros/errno.h>
#include "addr_auth.h"
-#include "ae.h"
+#include "connmgr.h"
#include "dir.h"
#include "enroll.h"
#include "fmgr.h"
-#include "frct.h"
#include "ipcp.h"
#include "ribconfig.h"
#include "ribmgr.h"
@@ -45,16 +43,11 @@
#include <stdbool.h>
#include <signal.h>
#include <stdlib.h>
-#include <pthread.h>
#include <string.h>
#include <assert.h>
#include <inttypes.h>
-#define THIS_TYPE IPCP_NORMAL
-
-struct {
- pthread_t acceptor;
-} normal;
+#define THIS_TYPE IPCP_NORMAL
void ipcp_sig_handler(int sig,
siginfo_t * info,
@@ -82,54 +75,6 @@ void ipcp_sig_handler(int sig,
}
}
-static void * flow_acceptor(void * o)
-{
- int fd;
- char * ae_name;
- qosspec_t qs;
-
- (void) o;
-
- while (true) {
- pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- log_info("Shutting down flow acceptor.");
- return 0;
- }
-
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
- fd = flow_accept(&ae_name, &qs);
- if (fd < 0) {
- if (fd != -EIRMD)
- log_warn("Flow accept failed: %d", fd);
- continue;
- }
-
- log_dbg("New flow allocation request for AE %s.", ae_name);
-
- if (strcmp(ae_name, ENROLL_AE) == 0) {
- enroll_handle(fd);
- } else if (strcmp(ae_name, MGMT_AE) == 0) {
- ribmgr_flow_arr(fd, qs);
- } else if (strcmp(ae_name, DT_AE) == 0) {
- fmgr_nm1_flow_arr(fd, qs);
- } else {
- log_dbg("Flow allocation request for unknown AE %s.",
- ae_name);
- if (flow_alloc_resp(fd, -1))
- log_warn("Failed to reply to flow allocation.");
- flow_dealloc(fd);
- }
-
- free(ae_name);
- }
-
- return (void *) 0;
-}
-
/*
* Boots the IPCP off information in the rib.
* Common function after bootstrap or enroll.
@@ -154,7 +99,7 @@ static int boot_components(void)
}
if (rib_add(MEMBERS_PATH, ipcpi.name)) {
- log_warn("Failed to add name to " MEMBERS_PATH);
+ log_err("Failed to add name to " MEMBERS_PATH);
return -1;
}
@@ -163,6 +108,7 @@ static int boot_components(void)
if (rib_read(BOOT_PATH "/addr_auth/type", &pa, sizeof(pa))
!= sizeof(pa)) {
log_err("Failed to read policy for address authority.");
+ connmgr_fini();
return -1;
}
@@ -171,14 +117,14 @@ static int boot_components(void)
return -1;
}
- ipcpi.address = addr_auth_address();
- if (ipcpi.address == 0) {
+ ipcpi.dt_addr = addr_auth_address();
+ if (ipcpi.dt_addr == 0) {
log_err("Failed to get a valid address.");
addr_auth_fini();
return -1;
}
- log_dbg("IPCP got address %" PRIu64 ".", ipcpi.address);
+ log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
log_dbg("Starting ribmgr.");
@@ -214,15 +160,26 @@ static int boot_components(void)
return -1;
}
+ if (enroll_start()) {
+ fmgr_fini();
+ dir_fini();
+ ribmgr_fini();
+ addr_auth_fini();
+ log_err("Failed to start enroll.");
+ return -1;
+ }
+
ipcp_set_state(IPCP_OPERATIONAL);
- if (pthread_create(&normal.acceptor, NULL, flow_acceptor, NULL)) {
+ if (connmgr_start()) {
ipcp_set_state(IPCP_INIT);
+ enroll_stop();
+ frct_fini();
fmgr_fini();
dir_fini();
ribmgr_fini();
addr_auth_fini();
- log_err("Failed to create acceptor thread.");
+ log_err("Failed to start AP connection manager.");
return -1;
}
@@ -231,8 +188,9 @@ static int boot_components(void)
void shutdown_components(void)
{
- pthread_cancel(normal.acceptor);
- pthread_join(normal.acceptor, NULL);
+ connmgr_stop();
+
+ enroll_stop();
frct_fini();
@@ -337,11 +295,6 @@ int normal_rib_init(void)
static int normal_ipcp_bootstrap(struct dif_config * conf)
{
- /* FIXME: get CACEP policies from conf */
- enum pol_cacep pol = SIMPLE_AUTH;
-
- (void) pol;
-
assert(conf);
assert(conf->type == THIS_TYPE);
@@ -389,12 +342,6 @@ static int normal_ipcp_bootstrap(struct dif_config * conf)
rib_write(BOOT_PATH "/rm/gam/type",
&conf->rm_gam_type,
sizeof(conf->rm_gam_type)) ||
- rib_write(BOOT_PATH "/rm/gam/cacep",
- &pol,
- sizeof(pol)) ||
- rib_write(BOOT_PATH "/dt/gam/cacep",
- &pol,
- sizeof(pol)) ||
rib_write(BOOT_PATH "/addr_auth/type",
&conf->addr_auth_type,
sizeof(conf->addr_auth_type))) {
@@ -422,9 +369,9 @@ static struct ipcp_ops normal_ops = {
.ipcp_name_reg = dir_name_reg,
.ipcp_name_unreg = dir_name_unreg,
.ipcp_name_query = dir_name_query,
- .ipcp_flow_alloc = NULL, /* fmgr_np1_alloc, */
- .ipcp_flow_alloc_resp = NULL, /* fmgr_np1_alloc_resp, */
- .ipcp_flow_dealloc = NULL, /* fmgr_np1_dealloc */
+ .ipcp_flow_alloc = fmgr_np1_alloc,
+ .ipcp_flow_alloc_resp = fmgr_np1_alloc_resp,
+ .ipcp_flow_dealloc = fmgr_np1_dealloc
};
int main(int argc,
@@ -471,11 +418,33 @@ int main(int argc,
exit(EXIT_FAILURE);
}
+
+ if (connmgr_init()) {
+ log_err("Failed to initialize connection manager.");
+ ipcp_create_r(getpid(), -1);
+ rib_fini();
+ irm_unbind_api(getpid(), ipcpi.name);
+ ipcp_fini();
+ exit(EXIT_FAILURE);
+ }
+
+ if (enroll_init()) {
+ log_err("Failed to initialize enroll component.");
+ ipcp_create_r(getpid(), -1);
+ connmgr_fini();
+ rib_fini();
+ irm_unbind_api(getpid(), ipcpi.name);
+ ipcp_fini();
+ exit(EXIT_FAILURE);
+ }
+
pthread_sigmask(SIG_BLOCK, &sigset, NULL);
if (ipcp_boot() < 0) {
log_err("Failed to boot IPCP.");
ipcp_create_r(getpid(), -1);
+ enroll_fini();
+ connmgr_fini();
rib_fini();
irm_unbind_api(getpid(), ipcpi.name);
ipcp_fini();
@@ -488,6 +457,8 @@ int main(int argc,
log_err("Failed to notify IRMd we are initialized.");
ipcp_set_state(IPCP_NULL);
ipcp_shutdown();
+ enroll_fini();
+ connmgr_fini();
rib_fini();
irm_unbind_api(getpid(), ipcpi.name);
ipcp_fini();
@@ -501,6 +472,10 @@ int main(int argc,
rib_fini();
+ enroll_fini();
+
+ connmgr_fini();
+
irm_unbind_api(getpid(), ipcpi.name);
ipcp_fini();
diff --git a/src/ipcpd/normal/neighbors.c b/src/ipcpd/normal/neighbors.c
new file mode 100644
index 00000000..1c399145
--- /dev/null
+++ b/src/ipcpd/normal/neighbors.c
@@ -0,0 +1,213 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data transfer neighbors
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "neighbors"
+
+#include <ouroboros/config.h>
+#include <ouroboros/shared.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+
+#include "neighbors.h"
+
+#include <stdlib.h>
+#include <assert.h>
+#include <inttypes.h>
+
+static void notify_listeners(enum nb_event event,
+ struct nb * nb,
+ struct nbs * nbs)
+{
+ struct list_head * p = NULL;
+
+ list_for_each(p, &nbs->notifiers) {
+ struct nb_notifier * e =
+ list_entry(p, struct nb_notifier, next);
+ if (e->notify_call(event, nb->conn))
+ log_err("Listener reported an error.");
+ }
+}
+
+struct nbs * nbs_create(void)
+{
+ struct nbs * nbs;
+
+ nbs = malloc(sizeof(*nbs));
+ if (nbs == NULL)
+ return NULL;
+
+ list_head_init(&nbs->list);
+ list_head_init(&nbs->notifiers);
+
+ if (pthread_mutex_init(&nbs->list_lock, NULL))
+ return NULL;
+
+ if (pthread_mutex_init(&nbs->notifiers_lock, NULL)) {
+ pthread_mutex_destroy(&nbs->list_lock);
+ return NULL;
+ }
+
+ return nbs;
+}
+
+void nbs_destroy(struct nbs * nbs)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ assert(nbs);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_for_each_safe(p, n, &nbs->list) {
+ struct nb * e = list_entry(p, struct nb, next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ pthread_mutex_destroy(&nbs->list_lock);
+ pthread_mutex_destroy(&nbs->notifiers_lock);
+}
+
+int nbs_add(struct nbs * nbs,
+ struct conn conn)
+{
+ struct nb * nb;
+
+ assert(nbs);
+
+ nb = malloc(sizeof(*nb));
+ if (nb == NULL)
+ return -ENOMEM;
+
+ nb->conn = conn;
+
+ list_head_init(&nb->next);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_add(&nb->next, &nbs->list);
+
+ notify_listeners(NEIGHBOR_ADDED, nb, nbs);
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ log_info("Added neighbor with address %" PRIu64 " to list.",
+ conn.conn_info.addr);
+
+ return 0;
+}
+
+int nbs_update_qos(struct nbs * nbs,
+ int fd,
+ qosspec_t qs)
+{
+ struct list_head * p = NULL;
+
+ assert(nbs);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_for_each(p, &nbs->list) {
+ struct nb * e = list_entry(p, struct nb, next);
+ if (e->conn.flow_info.fd == fd) {
+ e->conn.flow_info.qs = qs;
+
+ notify_listeners(NEIGHBOR_QOS_CHANGE, e, nbs);
+
+ pthread_mutex_unlock(&nbs->list_lock);
+ return 0;
+ }
+ }
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ return -1;
+}
+
+int nbs_del(struct nbs * nbs,
+ int fd)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ assert(nbs);
+
+ pthread_mutex_lock(&nbs->list_lock);
+
+ list_for_each_safe(p, n, &nbs->list) {
+ struct nb * e = list_entry(p, struct nb, next);
+ if (e->conn.flow_info.fd == fd) {
+ notify_listeners(NEIGHBOR_REMOVED, e, nbs);
+ list_del(&e->next);
+ free(e);
+ pthread_mutex_unlock(&nbs->list_lock);
+ return 0;
+ }
+ }
+
+ pthread_mutex_unlock(&nbs->list_lock);
+
+ return -1;
+}
+
+int nbs_reg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify)
+{
+ assert(nbs);
+ assert(notify);
+
+ pthread_mutex_lock(&nbs->notifiers_lock);
+
+ list_head_init(&notify->next);
+ list_add(&notify->next, &nbs->notifiers);
+
+ pthread_mutex_unlock(&nbs->notifiers_lock);
+
+ return 0;
+}
+
+int nbs_unreg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify)
+{
+ struct list_head * p = NULL;
+ struct list_head * n = NULL;
+
+ pthread_mutex_lock(&nbs->notifiers_lock);
+
+ list_for_each_safe(p, n, &nbs->notifiers) {
+ struct nb_notifier * e =
+ list_entry(p, struct nb_notifier, next);
+ if (e == notify) {
+ list_del(&e->next);
+ pthread_mutex_unlock(&nbs->notifiers_lock);
+ return 0;
+ }
+ }
+
+ pthread_mutex_unlock(&nbs->notifiers_lock);
+
+ return -1;
+}
diff --git a/src/ipcpd/normal/neighbors.h b/src/ipcpd/normal/neighbors.h
new file mode 100644
index 00000000..8714a9aa
--- /dev/null
+++ b/src/ipcpd/normal/neighbors.h
@@ -0,0 +1,81 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Data transfer neighbors
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H
+#define OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H
+
+#include <ouroboros/irm_config.h>
+#include <ouroboros/list.h>
+#include <ouroboros/qos.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/cacep.h>
+
+#include "connmgr.h"
+
+enum nb_event {
+ NEIGHBOR_ADDED,
+ NEIGHBOR_REMOVED,
+ NEIGHBOR_QOS_CHANGE
+};
+
+typedef int (* nb_notify_t)(enum nb_event event,
+ struct conn conn);
+
+struct nb_notifier {
+ struct list_head next;
+ nb_notify_t notify_call;
+};
+
+struct nb {
+ struct list_head next;
+ struct conn conn;
+};
+
+struct nbs {
+ struct list_head notifiers;
+ pthread_mutex_t notifiers_lock;
+
+ struct list_head list;
+ pthread_mutex_t list_lock;
+};
+
+struct nbs * nbs_create(void);
+
+void nbs_destroy(struct nbs * nbs);
+
+int nbs_add(struct nbs * nbs,
+ struct conn conn);
+
+int nbs_update_qos(struct nbs * nbs,
+ int fd,
+ qosspec_t qs);
+
+int nbs_del(struct nbs * nbs,
+ int fd);
+
+int nbs_reg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify);
+
+int nbs_unreg_notifier(struct nbs * nbs,
+ struct nb_notifier * notify);
+
+#endif
diff --git a/src/ipcpd/normal/pff.c b/src/ipcpd/normal/pff.c
index 2f7d554b..8cab7936 100644
--- a/src/ipcpd/normal/pff.c
+++ b/src/ipcpd/normal/pff.c
@@ -3,7 +3,8 @@
*
* PDU Forwarding Function
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -55,15 +56,16 @@ struct pff * pff_create(void)
return tmp;
}
-int pff_destroy(struct pff * instance)
+void pff_destroy(struct pff * instance)
{
assert(instance);
+ pthread_mutex_lock(&instance->lock);
htable_destroy(instance->table);
+ pthread_mutex_unlock(&instance->lock);
+
pthread_mutex_destroy(&instance->lock);
free(instance);
-
- return 0;
}
int pff_add(struct pff * instance, uint64_t addr, int fd)
diff --git a/src/ipcpd/normal/pff.h b/src/ipcpd/normal/pff.h
index b4a1400b..667c341e 100644
--- a/src/ipcpd/normal/pff.h
+++ b/src/ipcpd/normal/pff.h
@@ -3,8 +3,8 @@
*
* PDU Forwarding Function
*
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -25,8 +25,6 @@
#include <stdint.h>
-struct pff;
-
/*
* PFF will take a type in the future,
* to allow different policies.
@@ -34,7 +32,7 @@ struct pff;
*/
struct pff * pff_create(void);
-int pff_destroy(struct pff * instance);
+void pff_destroy(struct pff * instance);
int pff_add(struct pff * instance,
uint64_t addr,
diff --git a/src/ipcpd/normal/pol-addr-auth-ops.h b/src/ipcpd/normal/pol-addr-auth-ops.h
index 25952636..f0f473ef 100644
--- a/src/ipcpd/normal/pol-addr-auth-ops.h
+++ b/src/ipcpd/normal/pol-addr-auth-ops.h
@@ -3,8 +3,8 @@
*
* Address authority policy ops
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/pol-gam-ops.h b/src/ipcpd/normal/pol-gam-ops.h
index 0721136c..cfe9cbc3 100644
--- a/src/ipcpd/normal/pol-gam-ops.h
+++ b/src/ipcpd/normal/pol-gam-ops.h
@@ -3,8 +3,8 @@
*
* Graph adjacency manager policy ops
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -24,21 +24,13 @@
#define OUROBOROS_IPCPD_NORMAL_POL_GAM_OPS_H
#include <ouroboros/cacep.h>
+#include <ouroboros/qos.h>
struct pol_gam_ops {
- void * (* create)(struct gam * instance);
+ void * (* create)(struct nbs * nbs,
+ struct ae * ae);
void (* destroy)(void * o);
-
- int (* start)(void * o);
-
- int (* stop)(void * o);
-
- int (* accept_new_flow)(void * o);
-
- int (* accept_flow)(void * o,
- qosspec_t qs,
- const struct cacep_info * info);
};
#endif /* OUROBOROS_IPCPD_NORMAL_POL_GAM_OPS_H */
diff --git a/src/ipcpd/normal/pol/complete.c b/src/ipcpd/normal/pol/complete.c
index 5faa1ae8..1f3f6031 100644
--- a/src/ipcpd/normal/pol/complete.c
+++ b/src/ipcpd/normal/pol/complete.c
@@ -1,10 +1,10 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Sets up a complete graph
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -20,35 +20,54 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "complete-graph-adjacency-manager"
+#define OUROBOROS_PREFIX "complete"
#include <ouroboros/config.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/list.h>
-#include <ouroboros/qos.h>
+#include <ouroboros/shared.h>
#include <ouroboros/rib.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/cacep.h>
-#include "ipcp.h"
-#include "gam.h"
+#include "neighbors.h"
+#include "frct.h"
#include "ribconfig.h"
+#include "ipcp.h"
+#include "ae.h"
#include <string.h>
#include <stdlib.h>
#include <assert.h>
-struct neighbor {
- struct list_head next;
- char * neighbor;
+struct complete {
+ struct nbs * nbs;
+ struct ae * ae;
+ pthread_t allocator;
+ pthread_t listener;
};
-struct complete {
- struct list_head neighbors;
- pthread_mutex_t neighbors_lock;
+static void * listener(void * o)
+{
+ struct complete * complete;
+ struct conn conn;
- pthread_t allocator;
+ complete = (struct complete *) o;
- struct gam * gam;
-};
+ while (true) {
+ if (connmgr_wait(complete->ae, &conn)) {
+ log_err("Error while getting next connection.");
+ continue;
+ }
+
+ if (nbs_add(complete->nbs, conn)) {
+ log_err("Failed to add neighbor.");
+ continue;
+ }
+ }
+
+ return (void *) 0;
+}
static void * allocator(void * o)
{
@@ -56,19 +75,37 @@ static void * allocator(void * o)
ssize_t len;
char ** children;
ssize_t i;
- struct complete * complete = (struct complete *) o;
+ struct complete * complete;
+ struct conn conn;
- assert(complete);
- assert(complete->gam);
+ complete = (struct complete *) o;
qs.delay = 0;
qs.jitter = 0;
+ /* FIXME: implement QoS specs */
+ qs.cube = QOS_CUBE_BE;
+
/* FIXME: subscribe to members to keep the graph complete. */
len = rib_children("/" MEMBERS_NAME, &children);
for (i = 0; i < len; ++i) {
- if (strcmp(children[i], ipcpi.name) < 0)
- gam_flow_alloc(complete->gam, children[i], qs);
+ if (strcmp(children[i], ipcpi.name) < 0) {
+ if (connmgr_alloc(complete->ae,
+ children[i],
+ &qs,
+ &conn)) {
+ log_warn("Failed to get a conn to neighbor.");
+ free(children[i]);
+ continue;
+ }
+
+ if (nbs_add(complete->nbs, conn)) {
+ log_err("Failed to add neighbor.");
+ free(children[i]);
+ continue;
+ }
+
+ }
free(children[i]);
}
@@ -78,123 +115,41 @@ static void * allocator(void * o)
return (void *) 0;
}
-void * complete_create(struct gam * gam)
+void * complete_create(struct nbs * nbs,
+ struct ae * ae)
{
struct complete * complete;
- assert(gam);
-
complete = malloc(sizeof(*complete));
if (complete == NULL)
return NULL;
- list_head_init(&complete->neighbors);
- complete->gam = gam;
-
- if (pthread_mutex_init(&complete->neighbors_lock, NULL)) {
- free(complete);
- return NULL;
- }
-
- return (void *) complete;
-}
-
-int complete_start(void * o)
-{
- struct complete * complete = (struct complete *) o;
-
- assert(complete);
- assert(complete->gam);
+ complete->nbs = nbs;
+ complete->ae = ae;
if (pthread_create(&complete->allocator, NULL,
- allocator, (void *) complete)) {
- pthread_mutex_destroy(&complete->neighbors_lock);
- free(complete);
- return -1;
- }
+ allocator, (void *) complete))
+ return NULL;
- /* FIXME: Handle flooding of the flow allocator before detaching.*/
- pthread_join(complete->allocator, NULL);
+ if (pthread_create(&complete->listener, NULL,
+ listener, (void *) complete))
+ return NULL;
- return 0;
+ return complete;
}
-int complete_stop(void * o)
+void complete_destroy(void * ops_o)
{
- (void) o;
+ struct complete * complete;
- return 0;
-}
+ assert(ops_o);
-void complete_destroy(void * o)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
- struct complete * complete = (struct complete *) o;
-
- list_for_each_safe(p, n, &complete->neighbors) {
- struct neighbor * e = list_entry(p, struct neighbor, next);
- list_del(&e->next);
- free(e->neighbor);
- free(e);
- }
+ complete = (struct complete *) ops_o;
- pthread_mutex_destroy(&complete->neighbors_lock);
+ pthread_cancel(complete->allocator);
+ pthread_cancel(complete->listener);
+ pthread_join(complete->allocator, NULL);
+ pthread_join(complete->listener, NULL);
free(complete);
}
-
-int complete_accept_new_flow(void * o)
-{
- (void) o;
-
- return 0;
-}
-
-int complete_accept_flow(void * o,
- qosspec_t qs,
- const struct cacep_info * info)
-{
- struct list_head * pos = NULL;
- struct neighbor * n;
- struct complete * complete = (struct complete *) o;
-
- (void) qs;
-
- assert(complete);
-
- pthread_mutex_lock(&complete->neighbors_lock);
-
- list_for_each(pos, &complete->neighbors) {
- struct neighbor * e = list_entry(pos, struct neighbor, next);
- if (strcmp(e->neighbor, info->name) == 0) {
- pthread_mutex_unlock(&complete->neighbors_lock);
- return -1;
- }
-
- assert(complete);
- assert(&complete->neighbors_lock);
- assert(pos->nxt);
- }
-
- n = malloc(sizeof(*n));
- if (n == NULL) {
- pthread_mutex_unlock(&complete->neighbors_lock);
- return -1;
- }
-
- list_head_init(&n->next);
-
- n->neighbor = strdup(info->name);
- if (n->neighbor == NULL) {
- pthread_mutex_unlock(&complete->neighbors_lock);
- free(n);
- return -1;
- }
-
- list_add(&n->next, &complete->neighbors);
-
- pthread_mutex_unlock(&complete->neighbors_lock);
-
- return 0;
-}
diff --git a/src/ipcpd/normal/pol/complete.h b/src/ipcpd/normal/pol/complete.h
index 3f08c2e5..46a535c2 100644
--- a/src/ipcpd/normal/pol/complete.h
+++ b/src/ipcpd/normal/pol/complete.h
@@ -1,10 +1,10 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Graph adjacency manager for IPC Process components
+ * Sets up a complete graph
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -23,30 +23,19 @@
#ifndef OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H
#define OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H
-#include "gam.h"
-#include "pol-gam-ops.h"
-
-void * complete_create(struct gam * instance);
-
-void complete_destroy(void * o);
+#include <ouroboros/irm_config.h>
+#include <ouroboros/qos.h>
-int complete_start(void * o);
-
-int complete_stop(void * o);
+#include "pol-gam-ops.h"
-int complete_accept_new_flow(void * o);
+void * complete_create(struct nbs * nbs,
+ struct ae * ae);
-int complete_accept_flow(void * o,
- qosspec_t qs,
- const struct cacep_info * info);
+void complete_destroy(void * ops_o);
struct pol_gam_ops complete_ops = {
- .create = complete_create,
- .destroy = complete_destroy,
- .start = complete_start,
- .stop = complete_stop,
- .accept_new_flow = complete_accept_new_flow,
- .accept_flow = complete_accept_flow
+ .create = complete_create,
+ .destroy = complete_destroy
};
#endif /* OUROBOROS_IPCPD_NORMAL_POL_COMPLETE_H */
diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c
index aa0f6c7c..e709da7c 100644
--- a/src/ipcpd/normal/pol/flat.c
+++ b/src/ipcpd/normal/pol/flat.c
@@ -3,8 +3,8 @@
*
* Policy for flat addresses in a distributed way
*
- * Sander Vrijders <[email protected]>
- * Dimitri Staessens <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/pol/flat.h b/src/ipcpd/normal/pol/flat.h
index 85fe9281..d45a89cd 100644
--- a/src/ipcpd/normal/pol/flat.h
+++ b/src/ipcpd/normal/pol/flat.h
@@ -3,7 +3,8 @@
*
* Policy for flat addresses in a distributed way
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h
index 15b65ce2..5ecdaab3 100644
--- a/src/ipcpd/normal/ribconfig.h
+++ b/src/ipcpd/normal/ribconfig.h
@@ -31,9 +31,11 @@
#define MEMBERS_NAME "members"
#define DIF_NAME "dif_name"
#define DIR_NAME "directory"
+#define ROUTING_NAME "fsdb"
#define DIF_PATH DLR DIF_NAME
#define DIR_PATH DLR DIR_NAME
#define BOOT_PATH DLR BOOT_NAME
#define MEMBERS_PATH DLR MEMBERS_NAME
+#define ROUTING_PATH DLR ROUTING_NAME
#endif /* OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H */
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index f254bd50..ec465c6b 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -3,8 +3,8 @@
*
* RIB manager of the IPC Process
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -45,35 +45,70 @@
#include <assert.h>
struct {
- flow_set_t * fs;
- fqueue_t * fq;
- struct gam * gam;
+ flow_set_t * fs;
+ fqueue_t * fq;
+
+ struct gam * gam;
+ struct nbs * nbs;
+ struct ae * ae;
+
+ struct nb_notifier nb_notifier;
} ribmgr;
+static int ribmgr_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ /* We are only interested in neighbors being added and removed. */
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ flow_set_add(ribmgr.fs, conn.flow_info.fd);
+ break;
+ case NEIGHBOR_REMOVED:
+ flow_set_del(ribmgr.fs, conn.flow_info.fd);
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
int ribmgr_init(void)
{
- enum pol_cacep pc;
- enum pol_gam pg;
+ enum pol_gam pg;
+ struct conn_info info;
- if (rib_read(BOOT_PATH "/rm/gam/type", &pg, sizeof(pg))
- != sizeof(pg)) {
- log_err("Failed to read policy for ribmgr gam.");
+ strcpy(info.ae_name, MGMT_AE);
+ strcpy(info.protocol, CDAP_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_GPB;
+
+ ribmgr.nbs = nbs_create();
+ if (ribmgr.nbs == NULL) {
+ log_err("Failed to create neighbors.");
return -1;
}
- if (rib_read(BOOT_PATH "/rm/gam/cacep", &pc, sizeof(pc))
- != sizeof(pc)) {
- log_err("Failed to read CACEP policy for ribmgr gam.");
+ ribmgr.ae = connmgr_ae_create(info);
+ if (ribmgr.ae == NULL) {
+ log_err("Failed to create AE struct.");
+ nbs_destroy(ribmgr.nbs);
return -1;
}
- /* FIXME: Implement cacep policies */
- (void) pc;
+ if (rib_read(BOOT_PATH "/rm/gam/type", &pg, sizeof(pg))
+ != sizeof(pg)) {
+ log_err("Failed to read policy for ribmgr gam.");
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
+ return -1;
+ }
- ribmgr.gam = gam_create(pg, MGMT_AE);
+ ribmgr.gam = gam_create(pg, ribmgr.nbs, ribmgr.ae);
if (ribmgr.gam == NULL) {
log_err("Failed to create gam.");
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
@@ -81,6 +116,8 @@ int ribmgr_init(void)
if (ribmgr.fs == NULL) {
log_err("Failed to create flow set.");
gam_destroy(ribmgr.gam);
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
@@ -89,6 +126,19 @@ int ribmgr_init(void)
log_err("Failed to create fq.");
flow_set_destroy(ribmgr.fs);
gam_destroy(ribmgr.gam);
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
+ return -1;
+ }
+
+ ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event;
+ if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) {
+ log_err("Failed to register notifier.");
+ fqueue_destroy(ribmgr.fq);
+ flow_set_destroy(ribmgr.fs);
+ gam_destroy(ribmgr.gam);
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
return -1;
}
@@ -97,20 +147,12 @@ int ribmgr_init(void)
void ribmgr_fini(void)
{
+ nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier);
flow_set_destroy(ribmgr.fs);
fqueue_destroy(ribmgr.fq);
gam_destroy(ribmgr.gam);
-}
-
-int ribmgr_flow_arr(int fd,
- qosspec_t qs)
-{
- assert(ribmgr.gam);
-
- if (gam_flow_arr(ribmgr.gam, fd, qs))
- return -1;
-
- return 0;
+ connmgr_ae_destroy(ribmgr.ae);
+ nbs_destroy(ribmgr.nbs);
}
int ribmgr_disseminate(char * path,
diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h
index 12f407ab..8922688a 100644
--- a/src/ipcpd/normal/ribmgr.h
+++ b/src/ipcpd/normal/ribmgr.h
@@ -3,7 +3,8 @@
*
* RIB manager of the IPC Process
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
@@ -41,9 +42,6 @@ int ribmgr_init(void);
void ribmgr_fini(void);
-int ribmgr_flow_arr(int fd,
- qosspec_t qs);
-
int ribmgr_disseminate(char * path,
enum diss_target target,
enum diss_freq freq,
diff --git a/src/ipcpd/normal/routing.c b/src/ipcpd/normal/routing.c
new file mode 100644
index 00000000..745d1812
--- /dev/null
+++ b/src/ipcpd/normal/routing.c
@@ -0,0 +1,298 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Routing component of the IPCP
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "routing"
+
+#include <ouroboros/config.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/list.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/rib.h>
+#include <ouroboros/rqueue.h>
+
+#include "routing.h"
+#include "ribmgr.h"
+#include "ribconfig.h"
+#include "ipcp.h"
+#include "graph.h"
+
+#include <assert.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <string.h>
+
+#include "fso.pb-c.h"
+typedef Fso fso_t;
+
+#define BUF_SIZE 256
+
+struct routing_table_entry {
+ struct list_head next;
+ uint64_t dst;
+ uint64_t nhop;
+};
+
+struct routing_i {
+ struct pff * pff;
+};
+
+struct {
+ struct nbs * nbs;
+ struct nb_notifier nb_notifier;
+
+ struct graph * graph;
+
+ ro_set_t * set;
+ rqueue_t * queue;
+ pthread_t rib_listener;
+} routing;
+
+struct routing_i * routing_i_create(struct pff * pff)
+{
+ struct routing_i * tmp;
+
+ assert(pff);
+
+ tmp = malloc(sizeof(*tmp));
+ if (tmp == NULL)
+ return NULL;
+
+ tmp->pff = pff;
+
+ return tmp;
+}
+
+void routing_i_destroy(struct routing_i * instance)
+{
+ assert(instance);
+
+ free(instance);
+}
+
+static int routing_neighbor_event(enum nb_event event,
+ struct conn conn)
+{
+ char path[RIB_MAX_PATH_LEN + 1];
+ char fso_name[RIB_MAX_PATH_LEN + 1];
+ fso_t fso = FSO__INIT;
+ size_t len;
+ uint8_t * data;
+
+ sprintf(fso_name, "%" PRIx64 "-%" PRIx64,
+ ipcpi.dt_addr, conn.conn_info.addr);
+ rib_path_append(rib_path_append(path, ROUTING_PATH), fso_name);
+
+ switch (event) {
+ case NEIGHBOR_ADDED:
+ fso.s_addr = ipcpi.dt_addr;
+ fso.d_addr = conn.conn_info.addr;
+
+ len = fso__get_packed_size(&fso);
+ if (len == 0)
+ return -1;
+
+ data = malloc(len);
+ if (data == NULL)
+ return -1;
+
+ fso__pack(&fso, data);
+
+ if (rib_add(ROUTING_PATH, fso_name)) {
+ log_err("Failed to add FSO.");
+ free(data);
+ return -1;
+ }
+
+ if (rib_put(path, data, len)) {
+ log_err("Failed to put FSO in RIB.");
+ rib_del(path);
+ free(data);
+ return -1;
+ }
+
+ break;
+ case NEIGHBOR_REMOVED:
+ if (rib_del(path)) {
+ log_err("Failed to remove FSO.");
+ return -1;
+ }
+
+ break;
+ case NEIGHBOR_QOS_CHANGE:
+ log_info("Not currently supported.");
+ break;
+ default:
+ log_info("Unsupported event for routing.");
+ break;
+ }
+
+ return 0;
+}
+
+static int read_fso(char * path,
+ int32_t flag)
+{
+ ssize_t len;
+ uint8_t ro[BUF_SIZE];
+ fso_t * fso;
+ qosspec_t qs;
+
+ len = rib_read(path, ro, BUF_SIZE);
+ if (len < 0) {
+ log_err("Failed to read FSO.");
+ return -1;
+ }
+
+ fso = fso__unpack(NULL, len, ro);
+ if (fso == NULL) {
+ log_err("Failed to unpack.");
+ return -1;
+ }
+
+ if (flag & RO_CREATE) {
+ if (graph_add_edge(routing.graph,
+ fso->s_addr, fso->d_addr, qs)) {
+ log_err("Failed to add edge to graph.");
+ fso__free_unpacked(fso, NULL);
+ return -1;
+ }
+ } else if (flag & RO_MODIFY) {
+ if (graph_update_edge(routing.graph,
+ fso->s_addr, fso->d_addr, qs)) {
+ log_err("Failed to update edge of graph.");
+ fso__free_unpacked(fso, NULL);
+ return -1;
+ }
+ } else if (flag & RO_DELETE) {
+ if (graph_del_edge(routing.graph, fso->s_addr, fso->d_addr)) {
+ log_err("Failed to del edge of graph.");
+ fso__free_unpacked(fso, NULL);
+ return -1;
+ }
+ }
+
+ fso__free_unpacked(fso, NULL);
+
+ return 0;
+}
+
+static void * rib_listener(void * o)
+{
+ int32_t flag;
+ char path[RIB_MAX_PATH_LEN + 1];
+ char ** children;
+ ssize_t len;
+ int i;
+
+ (void) o;
+
+ if (ro_set_add(routing.set, ROUTING_PATH,
+ RO_MODIFY | RO_CREATE | RO_DELETE)) {
+ log_err("Failed to add to RO set");
+ return (void * ) -1;
+ }
+
+ len = rib_children(ROUTING_PATH, &children);
+ if (len < 0) {
+ log_err("Failed to retrieve children.");
+ return (void *) -1;
+ }
+
+ for (i = 0; i < len; i++) {
+ if (read_fso(children[i], RO_CREATE)) {
+ log_err("Failed to parse FSO.");
+ continue;
+ }
+ }
+
+ while (rib_event_wait(routing.set, routing.queue, NULL)) {
+ flag = rqueue_next(routing.queue, path);
+ if (flag < 0)
+ continue;
+
+ if (read_fso(children[i], flag)) {
+ log_err("Failed to parse FSO.");
+ continue;
+ }
+ }
+
+ return (void *) 0;
+}
+
+int routing_init(struct nbs * nbs)
+{
+ routing.graph = graph_create();
+ if (routing.graph == NULL)
+ return -1;
+
+ if (rib_add(RIB_ROOT, ROUTING_NAME)) {
+ graph_destroy(routing.graph);
+ return -1;
+ }
+
+ routing.nbs = nbs;
+
+ routing.nb_notifier.notify_call = routing_neighbor_event;
+ if (nbs_reg_notifier(routing.nbs, &routing.nb_notifier)) {
+ graph_destroy(routing.graph);
+ rib_del(ROUTING_PATH);
+ return -1;
+ }
+
+ routing.set = ro_set_create();
+ if (routing.set == NULL) {
+ nbs_unreg_notifier(routing.nbs, &routing.nb_notifier);
+ graph_destroy(routing.graph);
+ rib_del(ROUTING_PATH);
+ return -1;
+ }
+
+ routing.queue = rqueue_create();
+ if (routing.queue == NULL) {
+ ro_set_destroy(routing.set);
+ nbs_unreg_notifier(routing.nbs, &routing.nb_notifier);
+ graph_destroy(routing.graph);
+ rib_del(ROUTING_PATH);
+ return -1;
+ }
+
+ pthread_create(&routing.rib_listener, NULL, rib_listener, NULL);
+
+ return 0;
+}
+
+void routing_fini(void)
+{
+ pthread_cancel(routing.rib_listener);
+
+ pthread_join(routing.rib_listener, NULL);
+
+ rqueue_destroy(routing.queue);
+
+ ro_set_destroy(routing.set);
+
+ graph_destroy(routing.graph);
+
+ rib_del(ROUTING_PATH);
+
+ nbs_unreg_notifier(routing.nbs, &routing.nb_notifier);
+}
diff --git a/src/ipcpd/normal/routing.h b/src/ipcpd/normal/routing.h
new file mode 100644
index 00000000..0794ef28
--- /dev/null
+++ b/src/ipcpd/normal/routing.h
@@ -0,0 +1,45 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Routing component of the IPCP
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_ROUTING_H
+#define OUROBOROS_IPCPD_NORMAL_ROUTING_H
+
+#include <ouroboros/qos.h>
+
+#include "pff.h"
+#include "neighbors.h"
+
+#include <stdint.h>
+
+/*
+ * Routing will take a type in the future,
+ * to allow different policies.
+ */
+int routing_init(struct nbs * nbs);
+
+void routing_fini(void);
+
+struct routing_i * routing_i_create(struct pff * pff);
+
+void routing_i_destroy(struct routing_i * instance);
+
+#endif /* OUROBOROS_IPCPD_NORMAL_ROUTING_H */
diff --git a/src/ipcpd/normal/shm_pci.c b/src/ipcpd/normal/shm_pci.c
index 0807a24f..1170adff 100644
--- a/src/ipcpd/normal/shm_pci.c
+++ b/src/ipcpd/normal/shm_pci.c
@@ -3,8 +3,8 @@
*
* Protocol Control Information in Shared Memory Map
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
diff --git a/src/ipcpd/normal/shm_pci.h b/src/ipcpd/normal/shm_pci.h
index 17ce5cdd..0c54c883 100644
--- a/src/ipcpd/normal/shm_pci.h
+++ b/src/ipcpd/normal/shm_pci.h
@@ -3,8 +3,8 @@
*
* Protocol Control Information in Shared Memory Map
*
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as