summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-09-09 13:50:47 +0200
committerDimitri Staessens <[email protected]>2017-09-12 08:33:26 -0600
commit45c6615484ffe347654c34decb72ff1ef9bde0f3 (patch)
treef912e0eef256371f61b87a5a78e7604d9b623194
parent7c69c0f6b25a199bb3632eea66ccb7de1db06ccc (diff)
downloadouroboros-45c6615484ffe347654c34decb72ff1ef9bde0f3.tar.gz
ouroboros-45c6615484ffe347654c34decb72ff1ef9bde0f3.zip
ipcpd: Revise internals of normal IPCP
This removes the RIB as a datastructure and CDAP as the protocol between IPCPs. CDAP, the rib and related sources are deprecated. The link-state protocol policy is udpated to use its own protocol based on a simple broadcast strategy along a tree. The neighbors struct is deprecated and moved to the library as a generic notifier component.
-rw-r--r--include/ouroboros/CMakeLists.txt2
-rw-r--r--include/ouroboros/cdap.h89
-rw-r--r--include/ouroboros/notifier.h (renamed from src/lib/ro.proto)27
-rw-r--r--include/ouroboros/nsm.h50
-rw-r--r--include/ouroboros/rib.h77
-rw-r--r--include/ouroboros/rqueue.h71
-rw-r--r--include/ouroboros/wrap/ouroboros.i4
-rw-r--r--src/ipcpd/normal/CMakeLists.txt6
-rw-r--r--src/ipcpd/normal/connmgr.c34
-rw-r--r--src/ipcpd/normal/connmgr.h11
-rw-r--r--src/ipcpd/normal/dht.c14
-rw-r--r--src/ipcpd/normal/dir.c2
-rw-r--r--src/ipcpd/normal/dt.c94
-rw-r--r--src/ipcpd/normal/dt_pci.c2
-rw-r--r--src/ipcpd/normal/enroll.c6
-rw-r--r--src/ipcpd/normal/fa.c12
-rw-r--r--src/ipcpd/normal/main.c40
-rw-r--r--src/ipcpd/normal/neighbors.c239
-rw-r--r--src/ipcpd/normal/neighbors.h84
-rw-r--r--src/ipcpd/normal/pol-routing-ops.h2
-rw-r--r--src/ipcpd/normal/pol/flat.c2
-rw-r--r--src/ipcpd/normal/pol/link_state.c620
-rw-r--r--src/ipcpd/normal/pol/link_state.h12
-rw-r--r--src/ipcpd/normal/pol/link_state.proto (renamed from src/ipcpd/normal/pol/fso.proto)8
-rw-r--r--src/ipcpd/normal/ribconfig.h33
-rw-r--r--src/ipcpd/normal/ribmgr.c423
-rw-r--r--src/ipcpd/normal/ribmgr.h54
-rw-r--r--src/ipcpd/normal/routing.c13
-rw-r--r--src/ipcpd/normal/routing.h4
-rw-r--r--src/ipcpd/normal/sdu_sched.c13
-rw-r--r--src/ipcpd/normal/sdu_sched.h8
-rw-r--r--src/lib/CMakeLists.txt11
-rw-r--r--src/lib/cdap.c868
-rw-r--r--src/lib/cdap.proto32
-rw-r--r--src/lib/cdap_req.c208
-rw-r--r--src/lib/cdap_req.h74
-rw-r--r--src/lib/hashtable.c3
-rw-r--r--src/lib/notifier.c128
-rw-r--r--src/lib/rib.c1431
-rw-r--r--src/lib/tests/CMakeLists.txt1
-rw-r--r--src/lib/tests/rib_test.c289
41 files changed, 719 insertions, 4382 deletions
diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt
index b6edac53..e94d5c6c 100644
--- a/include/ouroboros/CMakeLists.txt
+++ b/include/ouroboros/CMakeLists.txt
@@ -1,6 +1,5 @@
set(HEADER_FILES
cacep.h
- cdap.h
cdefs.h
dev.h
errno.h
@@ -8,7 +7,6 @@ set(HEADER_FILES
fqueue.h
ipcp.h
irm.h
- nsm.h
proto.h
qos.h)
diff --git a/include/ouroboros/cdap.h b/include/ouroboros/cdap.h
deleted file mode 100644
index 46ebca4b..00000000
--- a/include/ouroboros/cdap.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * The Common Distributed Application Protocol
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_CDAP_H
-#define OUROBOROS_CDAP_H
-
-#include <ouroboros/cdefs.h>
-
-#include <stdbool.h>
-#include <stdint.h>
-#include <unistd.h>
-
-#define F_SYNC 0x0001
-
-#define INVALID_CDAP_KEY -1
-#define CDAP_PROTO "CDAP"
-
-enum cdap_opcode {
- CDAP_READ = 0,
- CDAP_WRITE,
- CDAP_START,
- CDAP_STOP,
- CDAP_CREATE,
- CDAP_DELETE
-};
-
-struct cdap;
-
-typedef int32_t cdap_key_t;
-
-__BEGIN_DECLS
-
-struct cdap * cdap_create(void);
-
-int cdap_destroy(struct cdap * instance);
-
-int cdap_add_flow(struct cdap * instance,
- int fd);
-
-int cdap_del_flow(struct cdap * instance,
- int fd);
-
-cdap_key_t * cdap_request_send(struct cdap * instance,
- enum cdap_opcode code,
- const char * name,
- const void * data,
- size_t len,
- uint32_t flags);
-
-int cdap_reply_wait(struct cdap * instance,
- cdap_key_t key,
- uint8_t ** data,
- size_t * len);
-
-cdap_key_t cdap_request_wait(struct cdap * instance,
- enum cdap_opcode * opcode,
- char ** name,
- uint8_t ** data,
- size_t * len,
- uint32_t * flags);
-
-int cdap_reply_send(struct cdap * instance,
- cdap_key_t key,
- int result,
- const void * data,
- size_t len);
-
-__END_DECLS
-
-#endif /* OUROBOROS_CDAP_H */
diff --git a/src/lib/ro.proto b/include/ouroboros/notifier.h
index 8c547f14..7a70f95f 100644
--- a/src/lib/ro.proto
+++ b/include/ouroboros/notifier.h
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * RIB object message
+ * Notifier event system using callbacks
*
* Dimitri Staessens <[email protected]>
* Sander Vrijders <[email protected]>
@@ -20,12 +20,21 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-syntax = "proto2";
+#ifndef OUROBOROS_LIB_NOTIFIER_H
+#define OUROBOROS_LIB_NOTIFIER_H
-message ro_msg {
- required string name = 1;
- optional string parent = 2;
- optional bytes data = 3;
- optional bytes hash = 4;
- repeated ro_msg children = 5;
-} \ No newline at end of file
+typedef void (* notifier_fn_t)(int event,
+ const void * o);
+
+int notifier_init(void);
+
+void notifier_fini(void);
+
+void notifier_event(int event,
+ const void * o);
+
+int notifier_reg(notifier_fn_t callback);
+
+void notifier_unreg(notifier_fn_t callback);
+
+#endif /* OUROBOROS_LIB_NOTIFIER_H */
diff --git a/include/ouroboros/nsm.h b/include/ouroboros/nsm.h
deleted file mode 100644
index d89a2ee4..00000000
--- a/include/ouroboros/nsm.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * The API to instruct the global Namespace Manager
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_NSM_H
-#define OUROBOROS_NSM_H
-
-#include <ouroboros/cdefs.h>
-
-#include <stdint.h>
-#include <unistd.h>
-
-__BEGIN_DECLS
-
-int nsm_reg(char * name,
- char ** dafs,
- size_t dafs_size);
-
-int nsm_unreg(char * name,
- char ** dafs,
- size_t dafs_size);
-
-/*
- * dafs is an out parameter
- * The amount of DAFs is returned
- */
-ssize_t nsm_resolve(char * name,
- char ** dafs);
-
-__END_DECLS
-
-#endif
diff --git a/include/ouroboros/rib.h b/include/ouroboros/rib.h
deleted file mode 100644
index 281a4f20..00000000
--- a/include/ouroboros/rib.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Resource Information Base
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_LIB_RIB_H
-#define OUROBOROS_LIB_RIB_H
-
-#include <sys/types.h>
-#include <stdint.h>
-#include <stdbool.h>
-
-#define RIB_ROOT ""
-
-#define PACK_HASH_ROOT 0x0001
-#define PACK_HASH_ALL 0x0002
-
-#define UNPACK_CREATE 0x0001
-
-int rib_init(void);
-
-void rib_fini(void);
-
-int rib_add(const char * parent,
- const char * name);
-
-int rib_del(char * path);
-
-ssize_t rib_read(const char * path,
- void * data,
- size_t len);
-
-int rib_write(const char * path,
- const void * data,
- size_t len);
-
-int rib_put(const char * path,
- void * data,
- size_t len);
-
-bool rib_has(const char * path);
-
-ssize_t rib_children(const char * path,
- char *** children);
-
-char * rib_path_append(char * path,
- const char * name);
-
-char * rib_name_gen(void * data,
- size_t len);
-
-ssize_t rib_pack(const char * path,
- uint8_t ** buf,
- uint32_t flags);
-
-int rib_unpack(uint8_t * packed,
- size_t len,
- uint32_t flags);
-
-#endif /* OUROBOROS_LIB_RIB_H */
diff --git a/include/ouroboros/rqueue.h b/include/ouroboros/rqueue.h
deleted file mode 100644
index 601a4ab6..00000000
--- a/include/ouroboros/rqueue.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * RIB event queues
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_RQUEUE_H
-#define OUROBOROS_RQUEUE_H
-
-#include <stdbool.h>
-#include <stdint.h>
-#include <time.h>
-
-#define RO_READ 0x00000001
-#define RO_MODIFY 0x00000002
-#define RO_CREATE 0x00000004
-#define RO_DELETE 0x00000008
-#define RO_START 0x00000010
-#define RO_STOP 0x00000020
-
-#define RO_NO_OPS 0x00000000
-#define RO_ALL_OPS 0xFFFFFFFF
-
-struct ro_set;
-
-struct rqueue;
-
-typedef struct ro_set ro_set_t;
-typedef struct rqueue rqueue_t;
-
-ro_set_t * ro_set_create(void);
-
-void ro_set_destroy(ro_set_t * set);
-
-rqueue_t * rqueue_create(void);
-
-int rqueue_destroy(struct rqueue * rq);
-
-int ro_set_zero(ro_set_t * set);
-
-int ro_set_add(ro_set_t * set,
- const char * path,
- int32_t flags);
-
-int ro_set_del(ro_set_t * set,
- const char * path);
-
-int32_t rqueue_next(rqueue_t * rq,
- char * path);
-
-int rib_event_wait(ro_set_t * set,
- rqueue_t * rq,
- const struct timespec * timeout);
-
-#endif /* OUROBOROS_RQUEUE_H */
diff --git a/include/ouroboros/wrap/ouroboros.i b/include/ouroboros/wrap/ouroboros.i
index ebda2453..db5e09f2 100644
--- a/include/ouroboros/wrap/ouroboros.i
+++ b/include/ouroboros/wrap/ouroboros.i
@@ -23,14 +23,12 @@
%{
#include "ouroboros/cdefs.h"
#include "ouroboros/cacep.h"
-#include "ouroboros/cdap.h"
#include "ouroboros/dev.h"
#include "ouroboros/errno.h"
#include "ouroboros/fccntl.h"
#include "ouroboros/fqueue.h"
#include "ouroboros/irm.h"
#include "ouroboros/ipcp.h"
-#include "ouroboros/nsm.h"
#include "ouroboros/qos.h"
%}
@@ -38,12 +36,10 @@ typedef int pid_t;
%include "ouroboros/cdefs.h"
%include "ouroboros/cacep.h"
-%include "ouroboros/cdap.h"
%include "ouroboros/dev.h"
%include "ouroboros/errno.h"
%include "ouroboros/fccntl.h"
%include "ouroboros/fqueue.h"
%include "ouroboros/irm.h"
%include "ouroboros/ipcp.h"
-%include "ouroboros/nsm.h"
%include "ouroboros/qos.h"
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index aebc6c35..e5fc33da 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -20,7 +20,7 @@ protobuf_generate_c(ENROLL_PROTO_SRCS ENROLL_PROTO_HDRS enroll.proto
protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)
# Add GPB sources of policies last
-protobuf_generate_c(FSO_SRCS FSO_HDRS pol/fso.proto)
+protobuf_generate_c(LS_PROTO_SRCS LS_PROTO_HDRS pol/link_state.proto)
math(EXPR PFT_EXPR "1 << 12")
set(PFT_SIZE ${PFT_EXPR} CACHE STRING
@@ -37,9 +37,7 @@ set(SOURCE_FILES
enroll.c
fa.c
main.c
- neighbors.c
pff.c
- ribmgr.c
routing.c
sdu_sched.c
# Add policies last
@@ -49,7 +47,7 @@ set(SOURCE_FILES
)
add_executable(ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${FLOW_ALLOC_SRCS} ${FSO_SRCS} ${KAD_PROTO_SRCS} ${ENROLL_PROTO_SRCS})
+ ${FLOW_ALLOC_SRCS} ${LS_PROTO_SRCS} ${KAD_PROTO_SRCS} ${ENROLL_PROTO_SRCS})
target_link_libraries(ipcpd-normal LINK_PUBLIC ouroboros)
include(AddCompileFlags)
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index 9feac0f6..8d3da709 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -26,16 +26,15 @@
#include <ouroboros/dev.h>
#include <ouroboros/cacep.h>
-#include <ouroboros/cdap.h>
#include <ouroboros/errno.h>
#include <ouroboros/list.h>
#include <ouroboros/logs.h>
+#include <ouroboros/notifier.h>
#include "ae.h"
#include "connmgr.h"
#include "enroll.h"
#include "ipcp.h"
-#include "ribmgr.h"
#include <pthread.h>
#include <string.h>
@@ -198,8 +197,7 @@ void connmgr_stop(void)
}
int connmgr_ae_init(enum ae_id id,
- const struct conn_info * info,
- struct nbs * nbs)
+ const struct conn_info * info)
{
struct ae * ae;
@@ -220,8 +218,6 @@ int connmgr_ae_init(enum ae_id id,
memcpy(&connmgr.aes[id].info, info, sizeof(connmgr.aes[id].info));
- connmgr.aes[id].nbs = nbs;
-
return 0;
}
@@ -258,8 +254,6 @@ void connmgr_ae_fini(enum ae_id id)
pthread_mutex_destroy(&ae->lock);
memset(&connmgr.aes[id].info, 0, sizeof(connmgr.aes[id].info));
-
- connmgr.aes[id].nbs = NULL;
}
int connmgr_ipcp_connect(const char * dst,
@@ -394,8 +388,16 @@ int connmgr_alloc(enum ae_id id,
return -1;
}
- if (connmgr.aes[id].nbs != NULL)
- nbs_add(connmgr.aes[id].nbs, *conn);
+ switch (id) {
+ case AEID_DT:
+ notifier_event(NOTIFY_DT_CONN_ADD, conn);
+ break;
+ case AEID_MGMT:
+ notifier_event(NOTIFY_MGMT_CONN_ADD, conn);
+ break;
+ default:
+ break;
+ }
return 0;
}
@@ -403,8 +405,16 @@ int connmgr_alloc(enum ae_id id,
int connmgr_dealloc(enum ae_id id,
struct conn * conn)
{
- if (connmgr.aes[id].nbs != NULL)
- nbs_del(connmgr.aes[id].nbs, conn->flow_info.fd);
+ switch (id) {
+ case AEID_DT:
+ notifier_event(NOTIFY_DT_CONN_DEL, conn);
+ break;
+ case AEID_MGMT:
+ notifier_event(NOTIFY_MGMT_CONN_DEL, conn);
+ break;
+ default:
+ break;
+ }
return flow_dealloc(conn->flow_info.fd);
}
diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h
index 379877e6..ca5288ae 100644
--- a/src/ipcpd/normal/connmgr.h
+++ b/src/ipcpd/normal/connmgr.h
@@ -27,7 +27,13 @@
#include <ouroboros/qos.h>
#include "ae.h"
-#include "neighbors.h"
+
+#define NOTIFY_DT_CONN_ADD 0x00D0
+#define NOTIFY_DT_CONN_DEL 0x00D1
+#define NOTIFY_DT_CONN_QOS 0x00D2
+
+#define NOTIFY_MGMT_CONN_ADD 0x00F0
+#define NOTIFY_MGMT_CONN_DEL 0x00F1
int connmgr_init(void);
@@ -38,8 +44,7 @@ int connmgr_start(void);
void connmgr_stop(void);
int connmgr_ae_init(enum ae_id id,
- const struct conn_info * info,
- struct nbs * nbs);
+ const struct conn_info * info);
void connmgr_ae_fini(enum ae_id id);
diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c
index d139cb91..b1ba44a8 100644
--- a/src/ipcpd/normal/dht.c
+++ b/src/ipcpd/normal/dht.c
@@ -328,9 +328,6 @@ static void kad_req_destroy(struct kad_req * req)
{
assert(req);
- if (req->key != NULL)
- free(req->key);
-
pthread_mutex_lock(&req->lock);
switch (req->state) {
@@ -351,7 +348,7 @@ static void kad_req_destroy(struct kad_req * req)
break;
}
- while (req->state != REQ_NULL)
+ while (req->state != REQ_NULL && req->state != REQ_DONE)
pthread_cond_wait(&req->cond, &req->lock);
pthread_mutex_unlock(&req->lock);
@@ -359,6 +356,9 @@ static void kad_req_destroy(struct kad_req * req)
pthread_cond_destroy(&req->cond);
pthread_mutex_destroy(&req->lock);
+ if (req->key != NULL)
+ free(req->key);
+
free(req);
}
@@ -391,7 +391,7 @@ static int kad_req_wait(struct kad_req * req,
case REQ_PENDING: /* ETIMEDOUT */
case REQ_RESPONSE:
req->state = REQ_DONE;
- pthread_cond_signal(&req->cond);
+ pthread_cond_broadcast(&req->cond);
break;
default:
break;
@@ -1859,7 +1859,7 @@ static void * work(void * o)
if (now.tv_sec > v->t_exp) {
list_del(&v->next);
val_destroy(v);
- }
+ }
if (now.tv_sec > v->t_rep) {
kad_publish(dht, e->key, v->addr,
@@ -2018,7 +2018,7 @@ static void kad_handle_response(struct dht * dht,
case KAD_FIND_VALUE:
case KAD_FIND_NODE:
if (dht_get_state(dht) != DHT_RUNNING)
- return;
+ break;
kad_handle_find_resp(dht, req, msg);
break;
default:
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index d2cda4f9..6d04c66a 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -27,13 +27,11 @@
#include <ouroboros/endian.h>
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
-#include <ouroboros/rib.h>
#include <ouroboros/utils.h>
#include "dir.h"
#include "dht.h"
#include "ipcp.h"
-#include "ribconfig.h"
#include <stdlib.h>
#include <string.h>
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 282f6bee..2df17163 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -29,19 +29,17 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
-#include <ouroboros/rib.h>
#include <ouroboros/dev.h>
+#include <ouroboros/notifier.h>
#include "connmgr.h"
#include "ipcp.h"
#include "dt.h"
#include "dt_pci.h"
#include "pff.h"
-#include "neighbors.h"
#include "routing.h"
#include "sdu_sched.h"
#include "ae.h"
-#include "ribconfig.h"
#include "fa.h"
#include <stdlib.h>
@@ -66,36 +64,33 @@ struct {
struct ae_info aes[AP_RES_FDS];
pthread_rwlock_t lock;
- struct nbs * nbs;
-
- struct nb_notifier nb_notifier;
-
pthread_t listener;
} dt;
-static int dt_neighbor_event(enum nb_event event,
- struct conn conn)
+static void handle_event(int event,
+ const void * o)
{
- /* We are only interested in neighbors being added and removed. */
+ struct conn * c;
+
+ c = (struct conn *) o;
+
switch (event) {
- case NEIGHBOR_ADDED:
- sdu_sched_add(dt.sdu_sched, conn.flow_info.fd);
- log_dbg("Added fd %d to SDU scheduler.", conn.flow_info.fd);
+ case NOTIFY_DT_CONN_ADD:
+ sdu_sched_add(dt.sdu_sched, c->flow_info.fd);
+ log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd);
break;
- case NEIGHBOR_REMOVED:
- sdu_sched_del(dt.sdu_sched, conn.flow_info.fd);
- log_dbg("Removed fd %d from SDU scheduler.", conn.flow_info.fd);
+ case NOTIFY_DT_CONN_DEL:
+ sdu_sched_del(dt.sdu_sched, c->flow_info.fd);
+ log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd);
break;
default:
break;
}
-
- return 0;
}
-static int sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
+static void sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
{
struct dt_pci dt_pci;
@@ -107,45 +102,38 @@ static int sdu_handler(int fd,
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
ipcp_sdb_release(sdb);
- return 0;
+ return;
}
fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
if (fd < 0) {
log_err("No next hop for %" PRIu64, dt_pci.dst_addr);
ipcp_sdb_release(sdb);
- return -1;
+ return;
}
if (ipcp_flow_write(fd, sdb)) {
log_err("Failed to write SDU to fd %d.", fd);
ipcp_sdb_release(sdb);
- return -1;
+ return;
}
} else {
dt_pci_shrink(sdb);
if (dt_pci.fd > AP_RES_FDS) {
- if (ipcp_flow_write(dt_pci.fd, sdb)) {
+ if (ipcp_flow_write(dt_pci.fd, sdb))
ipcp_sdb_release(sdb);
- return -1;
- }
- return 0;
+ return;
}
if (dt.aes[dt_pci.fd].post_sdu == NULL) {
log_err("No registered AE on fd %d.", dt_pci.fd);
ipcp_sdb_release(sdb);
- return -EPERM;
+ return;
}
dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb);
-
- return 0;
}
-
- /* silence compiler */
- return 0;
}
static void * dt_conn_handle(void * o)
@@ -160,11 +148,9 @@ static void * dt_conn_handle(void * o)
continue;
}
- log_dbg("Got new connection.");
-
/* NOTE: connection acceptance policy could be here. */
- nbs_add(dt.nbs, conn);
+ notifier_event(NOTIFY_DT_CONN_ADD, &conn);
}
return 0;
@@ -192,24 +178,17 @@ int dt_init(enum pol_routing pr,
goto fail_pci_init;
}
- dt.nbs = nbs_create();
- if (dt.nbs == NULL) {
- log_err("Failed to create neighbors struct.");
- goto fail_nbs;
+ if (notifier_reg(handle_event)) {
+ log_err("Failed to register with notifier.");
+ goto fail_notifier_reg;
}
- dt.nb_notifier.notify_call = dt_neighbor_event;
- if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {
- log_err("Failed to register notifier.");
- goto fail_nbs_notifier;
- }
-
- if (connmgr_ae_init(AEID_DT, &info, dt.nbs)) {
+ if (connmgr_ae_init(AEID_DT, &info)) {
log_err("Failed to register with connmgr.");
goto fail_connmgr_ae_init;
}
- if (routing_init(pr, dt.nbs)) {
+ if (routing_init(pr)) {
log_err("Failed to init routing.");
goto fail_routing;
}
@@ -249,20 +228,17 @@ int dt_init(enum pol_routing pr,
for (j = 0; j < QOS_CUBE_MAX; ++j)
routing_i_destroy(dt.routing[j]);
fail_routing_i:
- connmgr_ae_fini(AEID_DT);
- fail_connmgr_ae_init:
for (i = 0; i < QOS_CUBE_MAX; ++i)
pff_destroy(dt.pff[i]);
fail_pff:
routing_fini();
fail_routing:
- nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
- fail_nbs_notifier:
- nbs_destroy(dt.nbs);
- fail_nbs:
+ connmgr_ae_fini(AEID_DT);
+ fail_connmgr_ae_init:
+ notifier_unreg(&handle_event);
+ fail_notifier_reg:
dt_pci_fini();
fail_pci_init:
- connmgr_ae_fini(AEID_DT);
return -1;
}
@@ -282,11 +258,11 @@ void dt_fini(void)
routing_fini();
- nbs_unreg_notifier(dt.nbs, &dt.nb_notifier);
+ connmgr_ae_fini(AEID_DT);
- nbs_destroy(dt.nbs);
+ notifier_unreg(&handle_event);
- connmgr_ae_fini(AEID_DT);
+ dt_pci_fini();
}
int dt_start(void)
diff --git a/src/ipcpd/normal/dt_pci.c b/src/ipcpd/normal/dt_pci.c
index 5704a09a..4684265d 100644
--- a/src/ipcpd/normal/dt_pci.c
+++ b/src/ipcpd/normal/dt_pci.c
@@ -21,10 +21,8 @@
*/
#include <ouroboros/errno.h>
-#include <ouroboros/rib.h>
#include "dt_pci.h"
-#include "ribconfig.h"
#include <stdlib.h>
#include <string.h>
diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c
index ad229f40..d14c62ac 100644
--- a/src/ipcpd/normal/enroll.c
+++ b/src/ipcpd/normal/enroll.c
@@ -29,14 +29,12 @@
#include <ouroboros/time_utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/logs.h>
-#include <ouroboros/rib.h>
#include <ouroboros/errno.h>
#include <ouroboros/sockets.h>
#include "connmgr.h"
#include "enroll.h"
#include "ipcp.h"
-#include "ribconfig.h"
#include <assert.h>
#include <stdlib.h>
@@ -270,6 +268,8 @@ static void * enroll_handle(void * o)
else
log_dbg("Neigbor reported failed enrollment.");
+ enroll_msg__free_unpacked(msg, NULL);
+
connmgr_dealloc(AEID_ENROLL, &conn);
}
@@ -339,7 +339,7 @@ int enroll_init(void)
info.pref_syntax = PROTO_GPB;
info.addr = 0;
- if (connmgr_ae_init(AEID_ENROLL, &info, NULL)) {
+ if (connmgr_ae_init(AEID_ENROLL, &info)) {
log_err("Failed to register with connmgr.");
return -1;
}
diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 682dc5c6..e684abd2 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -28,7 +28,6 @@
#include <ouroboros/logs.h>
#include <ouroboros/fqueue.h>
-#include <ouroboros/rib.h>
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
@@ -38,7 +37,6 @@
#include "fa.h"
#include "sdu_sched.h"
#include "ipcp.h"
-#include "ribconfig.h"
#include "dt.h"
#include <pthread.h>
@@ -59,9 +57,9 @@ struct {
struct sdu_sched * sdu_sched;
} fa;
-static int sdu_handler(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb)
+static void sdu_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
{
pthread_rwlock_rdlock(&fa.flows_lock);
@@ -69,12 +67,10 @@ static int sdu_handler(int fd,
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
log_warn("Failed to forward SDU.");
- return -1;
+ return;
}
pthread_rwlock_unlock(&fa.flows_lock);
-
- return 0;
}
static void destroy_conn(int fd)
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 22b6e718..2b35a04a 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -31,9 +31,9 @@
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/irm.h>
-#include <ouroboros/rib.h>
#include <ouroboros/hash.h>
#include <ouroboros/errno.h>
+#include <ouroboros/notifier.h>
#include "addr_auth.h"
#include "connmgr.h"
@@ -42,8 +42,6 @@
#include "fa.h"
#include "dt.h"
#include "ipcp.h"
-#include "ribconfig.h"
-#include "ribmgr.h"
#include <stdbool.h>
#include <signal.h>
@@ -56,11 +54,6 @@
static int initialize_components(const struct ipcp_config * conf)
{
- if (rib_init()) {
- log_err("Failed to initialize RIB.");
- goto fail_rib_init;
- }
-
ipcpi.dif_name = strdup(conf->dif_info.dif_name);
if (ipcpi.dif_name == NULL) {
log_err("Failed to set DIF name.");
@@ -85,11 +78,6 @@ static int initialize_components(const struct ipcp_config * conf)
log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
- if (ribmgr_init()) {
- log_err("Failed to initialize RIB manager.");
- goto fail_ribmgr;
- }
-
if (dt_init(conf->routing_type,
conf->addr_size,
conf->fd_size,
@@ -117,14 +105,10 @@ static int initialize_components(const struct ipcp_config * conf)
fail_fa:
dt_fini();
fail_dt:
- ribmgr_fini();
- fail_ribmgr:
addr_auth_fini();
fail_addr_auth:
free(ipcpi.dif_name);
fail_dif_name:
- rib_fini();
- fail_rib_init:
return -1;
}
@@ -136,13 +120,9 @@ static void finalize_components(void)
dt_fini();
- ribmgr_fini();
-
addr_auth_fini();
free(ipcpi.dif_name);
-
- rib_fini();
}
static int start_components(void)
@@ -151,11 +131,6 @@ static int start_components(void)
ipcp_set_state(IPCP_OPERATIONAL);
- if (ribmgr_start()) {
- log_err("Failed to start RIB manager.");
- goto fail_ribmgr_start;
- }
-
if (fa_start()) {
log_err("Failed to start flow allocator.");
goto fail_fa_start;
@@ -178,8 +153,6 @@ static int start_components(void)
fail_enroll_start:
fa_stop();
fail_fa_start:
- ribmgr_stop();
- fail_ribmgr_start:
ipcp_set_state(IPCP_INIT);
return -1;
}
@@ -195,8 +168,6 @@ static void stop_components(void)
fa_stop();
- ribmgr_stop();
-
ipcp_set_state(IPCP_INIT);
}
@@ -377,6 +348,11 @@ int main(int argc,
goto fail_enroll_init;
}
+ if (notifier_init()) {
+ log_err("Failed to initialize notifier component.");
+ goto fail_notifier_init;
+ }
+
if (ipcp_boot() < 0) {
log_err("Failed to boot IPCP.");
goto fail_boot;
@@ -396,6 +372,8 @@ int main(int argc,
finalize_components();
}
+ notifier_fini();
+
enroll_fini();
connmgr_fini();
@@ -409,6 +387,8 @@ int main(int argc,
fail_create_r:
ipcp_shutdown();
fail_boot:
+ notifier_fini();
+ fail_notifier_init:
enroll_fini();
fail_enroll_init:
connmgr_fini();
diff --git a/src/ipcpd/normal/neighbors.c b/src/ipcpd/normal/neighbors.c
deleted file mode 100644
index c32e9aa2..00000000
--- a/src/ipcpd/normal/neighbors.c
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Neighbors
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 199309L
-
-#define OUROBOROS_PREFIX "neighbors"
-
-#include <ouroboros/qoscube.h>
-#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/logs.h>
-
-#include "neighbors.h"
-
-#include <stdlib.h>
-#include <assert.h>
-#include <inttypes.h>
-
-static void notify_listeners(enum nb_event event,
- struct nb * nb,
- struct nbs * nbs)
-{
- struct list_head * p = NULL;
-
- pthread_mutex_lock(&nbs->notifiers_lock);
-
- list_for_each(p, &nbs->notifiers) {
- struct nb_notifier * e =
- list_entry(p, struct nb_notifier, next);
- if (e->notify_call(event, nb->conn))
- log_err("Listener reported an error.");
- }
-
- pthread_mutex_unlock(&nbs->notifiers_lock);
-}
-
-struct nbs * nbs_create(void)
-{
- struct nbs * nbs;
-
- nbs = malloc(sizeof(*nbs));
- if (nbs == NULL)
- return NULL;
-
- list_head_init(&nbs->list);
- list_head_init(&nbs->notifiers);
-
- if (pthread_mutex_init(&nbs->list_lock, NULL))
- return NULL;
-
- if (pthread_mutex_init(&nbs->notifiers_lock, NULL)) {
- pthread_mutex_destroy(&nbs->list_lock);
- return NULL;
- }
-
- return nbs;
-}
-
-void nbs_destroy(struct nbs * nbs)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
-
- assert(nbs);
-
- pthread_mutex_lock(&nbs->list_lock);
-
- list_for_each_safe(p, n, &nbs->list) {
- struct nb * e = list_entry(p, struct nb, next);
- list_del(&e->next);
- free(e);
- }
-
- pthread_mutex_unlock(&nbs->list_lock);
-
- pthread_mutex_destroy(&nbs->list_lock);
- pthread_mutex_destroy(&nbs->notifiers_lock);
-
- free(nbs);
-}
-
-int nbs_add(struct nbs * nbs,
- struct conn conn)
-{
- struct nb * nb;
-
- assert(nbs);
-
- nb = malloc(sizeof(*nb));
- if (nb == NULL)
- return -ENOMEM;
-
- nb->conn = conn;
-
- pthread_mutex_lock(&nbs->list_lock);
-
- list_add(&nb->next, &nbs->list);
-
- notify_listeners(NEIGHBOR_ADDED, nb, nbs);
-
- pthread_mutex_unlock(&nbs->list_lock);
-
- log_info("Added neighbor with fd %d and address %" PRIu64 " to list.",
- conn.flow_info.fd, conn.conn_info.addr);
-
- return 0;
-}
-
-int nbs_update_qos(struct nbs * nbs,
- int fd,
- qosspec_t qs)
-{
- struct list_head * p = NULL;
-
- assert(nbs);
-
- pthread_mutex_lock(&nbs->list_lock);
-
- list_for_each(p, &nbs->list) {
- struct nb * e = list_entry(p, struct nb, next);
- if (e->conn.flow_info.fd == fd) {
- e->conn.flow_info.qs = qs;
-
- notify_listeners(NEIGHBOR_QOS_CHANGE, e, nbs);
-
- pthread_mutex_unlock(&nbs->list_lock);
- return 0;
- }
- }
-
- pthread_mutex_unlock(&nbs->list_lock);
-
- return -1;
-}
-
-int nbs_del(struct nbs * nbs,
- int fd)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
-
- assert(nbs);
-
- pthread_mutex_lock(&nbs->list_lock);
-
- list_for_each_safe(p, n, &nbs->list) {
- struct nb * e = list_entry(p, struct nb, next);
- if (e->conn.flow_info.fd == fd) {
- notify_listeners(NEIGHBOR_REMOVED, e, nbs);
- list_del(&e->next);
- free(e);
- pthread_mutex_unlock(&nbs->list_lock);
- return 0;
- }
- }
-
- pthread_mutex_unlock(&nbs->list_lock);
-
- return -1;
-}
-
-bool nbs_has(struct nbs * nbs,
- uint64_t addr)
-{
- struct list_head * p = NULL;
-
- assert(nbs);
-
- pthread_mutex_lock(&nbs->list_lock);
-
- list_for_each(p, &nbs->list) {
- struct nb * e = list_entry(p, struct nb, next);
- if (e->conn.conn_info.addr == addr) {
- pthread_mutex_unlock(&nbs->list_lock);
- return true;
- }
- }
-
- pthread_mutex_unlock(&nbs->list_lock);
-
- return false;
-}
-
-int nbs_reg_notifier(struct nbs * nbs,
- struct nb_notifier * notify)
-{
- assert(nbs);
- assert(notify);
-
- pthread_mutex_lock(&nbs->notifiers_lock);
-
- list_add(&notify->next, &nbs->notifiers);
-
- pthread_mutex_unlock(&nbs->notifiers_lock);
-
- return 0;
-}
-
-int nbs_unreg_notifier(struct nbs * nbs,
- struct nb_notifier * notify)
-{
- struct list_head * p = NULL;
- struct list_head * n = NULL;
-
- pthread_mutex_lock(&nbs->notifiers_lock);
-
- list_for_each_safe(p, n, &nbs->notifiers) {
- struct nb_notifier * e =
- list_entry(p, struct nb_notifier, next);
- if (e == notify) {
- list_del(&e->next);
- pthread_mutex_unlock(&nbs->notifiers_lock);
- return 0;
- }
- }
-
- pthread_mutex_unlock(&nbs->notifiers_lock);
-
- return -1;
-}
diff --git a/src/ipcpd/normal/neighbors.h b/src/ipcpd/normal/neighbors.h
deleted file mode 100644
index 9c5a6e50..00000000
--- a/src/ipcpd/normal/neighbors.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Neighbors
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H
-#define OUROBOROS_IPCPD_NORMAL_NEIGHBORS_H
-
-#include <ouroboros/ipcp.h>
-#include <ouroboros/list.h>
-#include <ouroboros/qos.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/cacep.h>
-
-#include "ae.h"
-
-enum nb_event {
- NEIGHBOR_ADDED,
- NEIGHBOR_REMOVED,
- NEIGHBOR_QOS_CHANGE
-};
-
-typedef int (* nb_notify_t)(enum nb_event event,
- struct conn conn);
-
-struct nb {
- struct list_head next;
- struct conn conn;
-};
-
-struct nb_notifier {
- struct list_head next;
- nb_notify_t notify_call;
-};
-
-struct nbs {
- struct list_head notifiers;
- pthread_mutex_t notifiers_lock;
-
- struct list_head list;
- pthread_mutex_t list_lock;
-};
-
-struct nbs * nbs_create(void);
-
-void nbs_destroy(struct nbs * nbs);
-
-int nbs_add(struct nbs * nbs,
- struct conn conn);
-
-int nbs_update_qos(struct nbs * nbs,
- int fd,
- qosspec_t qs);
-
-int nbs_del(struct nbs * nbs,
- int fd);
-
-bool nbs_has(struct nbs * nbs,
- uint64_t addr);
-
-int nbs_reg_notifier(struct nbs * nbs,
- struct nb_notifier * notify);
-
-int nbs_unreg_notifier(struct nbs * nbs,
- struct nb_notifier * notify);
-
-#endif
diff --git a/src/ipcpd/normal/pol-routing-ops.h b/src/ipcpd/normal/pol-routing-ops.h
index 0fec10fc..9804d5ad 100644
--- a/src/ipcpd/normal/pol-routing-ops.h
+++ b/src/ipcpd/normal/pol-routing-ops.h
@@ -26,7 +26,7 @@
#include "pff.h"
struct pol_routing_ops {
- int (* init)(struct nbs * nbs);
+ int (* init)(void);
void (* fini)(void);
diff --git a/src/ipcpd/normal/pol/flat.c b/src/ipcpd/normal/pol/flat.c
index 7a5a785e..0c4648c5 100644
--- a/src/ipcpd/normal/pol/flat.c
+++ b/src/ipcpd/normal/pol/flat.c
@@ -27,11 +27,9 @@
#include <ouroboros/logs.h>
#include <ouroboros/errno.h>
#include <ouroboros/time_utils.h>
-#include <ouroboros/rib.h>
#include <ouroboros/utils.h>
#include "ipcp.h"
-#include "ribconfig.h"
#include <time.h>
#include <stdlib.h>
diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c
index 512ced7f..7df09bce 100644
--- a/src/ipcpd/normal/pol/link_state.c
+++ b/src/ipcpd/normal/pol/link_state.c
@@ -27,14 +27,16 @@
#include <ouroboros/errno.h>
#include <ouroboros/list.h>
#include <ouroboros/logs.h>
-#include <ouroboros/rib.h>
-#include <ouroboros/rqueue.h>
+#include <ouroboros/utils.h>
+#include <ouroboros/notifier.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/fqueue.h>
-#include "ribmgr.h"
-#include "ribconfig.h"
+#include "ae.h"
+#include "connmgr.h"
#include "graph.h"
-#include "neighbors.h"
#include "ipcp.h"
+#include "link_state.h"
#include "pff.h"
#include <assert.h>
@@ -43,39 +45,230 @@
#include <string.h>
#include <pthread.h>
-#include "fso.pb-c.h"
-typedef Fso fso_t;
+#include "link_state.pb-c.h"
+typedef LinkStateMsg link_state_msg_t;
-#define BUF_SIZE 256
-#define RECALC_TIME 4
+#define RECALC_TIME 4
+#define LS_UPDATE_TIME 15
+#define LS_TIMEO 60
+#define LSA_MAX_LEN 128
+
+#ifndef CLOCK_REALTIME_COARSE
+#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
+#endif
struct routing_i {
struct pff * pff;
pthread_t calculator;
};
+/* TODO: link weight support. */
+struct adjacency {
+ struct list_head next;
+
+ uint64_t dst;
+ uint64_t src;
+
+ time_t stamp;
+};
+
+enum nb_type {
+ NB_DT = 0,
+ NB_MGMT
+};
+
+struct nb {
+ struct list_head next;
+
+ uint64_t addr;
+ int fd;
+ enum nb_type type;
+};
+
struct {
- struct nbs * nbs;
- struct nb_notifier nb_notifier;
+ struct list_head nbs;
+ fset_t * mgmt_set;
- struct graph * graph;
+ struct list_head db;
- ro_set_t * set;
- rqueue_t * queue;
- pthread_t rib_listener;
-} link_state;
+ pthread_rwlock_t db_lock;
-/* Take under neighbors lock */
-static int addr_to_fd(uint64_t addr)
+ struct graph * graph;
+
+ pthread_t lsupdate;
+ pthread_t lsreader;
+ pthread_t listener;
+} ls;
+
+struct pol_routing_ops link_state_ops = {
+ .init = link_state_init,
+ .fini = link_state_fini,
+ .routing_i_create = link_state_routing_i_create,
+ .routing_i_destroy = link_state_routing_i_destroy
+};
+
+static int lsdb_add_nb(uint64_t addr,
+ int fd,
+ enum nb_type type)
{
- struct list_head * p = NULL;
+ struct list_head * p;
+ struct nb * nb;
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each(p, &ls.nbs) {
+ struct nb * el = list_entry(p, struct nb, next);
+ if (el->addr == addr && el->type == type) {
+ log_dbg("Already know %s neighbor %" PRIu64 ".",
+ type == NB_DT ? "dt" : "mgmt", addr);
+ if (el->fd != fd) {
+ log_warn("Existing neighbor assigned new fd.");
+ el->fd = fd;
+ }
+ pthread_rwlock_unlock(&ls.db_lock);
+ return -EPERM;
+ }
- list_for_each(p, &link_state.nbs->list) {
- struct nb * e = list_entry(p, struct nb, next);
- if (e->conn.conn_info.addr == addr)
- return e->conn.flow_info.fd;
+ if (addr > el->addr)
+ break;
}
+ nb = malloc(sizeof(*nb));
+ if (nb == NULL) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return -ENOMEM;
+ }
+
+ nb->addr = addr;
+ nb->fd = fd;
+ nb->type = type;
+
+ list_add_tail(&nb->next, p);
+
+ log_dbg("Type %s neighbor %" PRIu64 " added.",
+ nb->type == NB_DT ? "dt" : "mgmt", addr);
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return 0;
+}
+
+static int lsdb_del_nb(uint64_t addr,
+ int fd)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->addr == addr && nb->fd == fd) {
+ list_del(&nb->next);
+ pthread_rwlock_unlock(&ls.db_lock);
+ log_dbg("Type %s neighbor %" PRIu64 " deleted.",
+ nb->type == NB_DT ? "dt" : "mgmt", addr);
+ free(nb);
+ return 0;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return -EPERM;
+}
+
+static int lsdb_add_link(uint64_t src,
+ uint64_t dst,
+ qosspec_t * qs)
+{
+ struct list_head * p;
+ struct adjacency * adj;
+ struct timespec now;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each(p, &ls.db) {
+ struct adjacency * a = list_entry(p, struct adjacency, next);
+ if (a->dst == dst && a->src == src) {
+ a->stamp = now.tv_sec;
+ pthread_rwlock_unlock(&ls.db_lock);
+ return 0;
+ }
+
+ if (a->dst > dst || (a->dst == dst && a->src > src))
+ break;
+ }
+
+ adj = malloc(sizeof(*adj));
+ if (adj == NULL) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return -ENOMEM;
+ }
+
+ adj->dst = dst;
+ adj->src = src;
+ adj->stamp = now.tv_sec;
+
+ list_add_tail(&adj->next, p);
+
+ if (graph_update_edge(ls.graph, src, dst, *qs))
+ log_warn("Failed to add edge to graph.");
+
+ log_dbg("Added %" PRIu64 " - %" PRIu64" to lsdb.", adj->src, adj->dst);
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return 0;
+}
+
+static int lsdb_del_link(uint64_t src,
+ uint64_t dst)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.db) {
+ struct adjacency * a = list_entry(p, struct adjacency, next);
+ if (a->dst == dst && a->src == src) {
+ list_del(&a->next);
+ if (graph_del_edge(ls.graph, src, dst))
+ log_warn("Failed to delete edge from graph.");
+
+ log_dbg("Removed %" PRIu64 " - %" PRIu64" from lsdb.",
+ a->src, a->dst);
+
+ pthread_rwlock_unlock(&ls.db_lock);
+ free(a);
+ return 0;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
+ return -EPERM;
+}
+
+static int nbr_to_fd(uint64_t addr)
+{
+ struct list_head * p;
+
+ pthread_rwlock_rdlock(&ls.db_lock);
+
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->addr == addr && nb->type == NB_DT) {
+ pthread_rwlock_unlock(&ls.db_lock);
+ return nb->fd;
+ }
+ }
+
+ pthread_rwlock_unlock(&ls.db_lock);
+
return -1;
}
@@ -91,20 +284,19 @@ static void * calculate_pff(void * o)
while (true) {
table = NULL;
- n_table = graph_routing_table(link_state.graph,
+ n_table = graph_routing_table(ls.graph,
ipcpi.dt_addr, &table);
if (n_table < 0) {
sleep(RECALC_TIME);
continue;
}
- pthread_mutex_lock(&link_state.nbs->list_lock);
pff_lock(instance->pff);
pff_flush(instance->pff);
for (i = 0; i < n_table; i++) {
- fd = addr_to_fd(table[i]->nhop);
+ fd = nbr_to_fd(table[i]->nhop);
if (fd == -1)
continue;
@@ -112,7 +304,6 @@ static void * calculate_pff(void * o)
}
pff_unlock(instance->pff);
- pthread_mutex_unlock(&link_state.nbs->list_lock);
freepp(struct routing_table, table, n_table);
sleep(RECALC_TIME);
@@ -121,154 +312,209 @@ static void * calculate_pff(void * o)
return (void *) 0;
}
-static int link_state_neighbor_event(enum nb_event event,
- struct conn conn)
+static void send_lsa(uint64_t dst,
+ uint64_t src)
{
- char path[RIB_MAX_PATH_LEN + 1];
- char fso_name[RIB_MAX_PATH_LEN + 1];
- fso_t fso = FSO__INIT;
- size_t len;
- uint8_t * data;
+ uint8_t buf[LSA_MAX_LEN];
+ link_state_msg_t lsa = LINK_STATE_MSG__INIT;
+ size_t len;
+ struct list_head * p;
- path[0] = '\0';
- sprintf(fso_name, "%" PRIu64 "-%" PRIu64,
- ipcpi.dt_addr, conn.conn_info.addr);
- rib_path_append(rib_path_append(path, ROUTING_NAME), fso_name);
+ lsa.d_addr = dst;
+ lsa.s_addr = src;
- switch (event) {
- case NEIGHBOR_ADDED:
- fso.s_addr = ipcpi.dt_addr;
- fso.d_addr = conn.conn_info.addr;
+ len = link_state_msg__get_packed_size(&lsa);
- len = fso__get_packed_size(&fso);
- if (len == 0)
- return -1;
+ assert(len <= LSA_MAX_LEN);
- data = malloc(len);
- if (data == NULL)
- return -1;
+ link_state_msg__pack(&lsa, buf);
- fso__pack(&fso, data);
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->type == NB_MGMT)
+ flow_write(nb->fd, buf, len);
+ }
+}
- if (rib_add(ROUTING_PATH, fso_name)) {
- log_err("Failed to add FSO.");
- free(data);
- return -1;
- }
+static void * lsupdate(void * o)
+{
+ struct list_head * p;
+ struct list_head * h;
+ struct timespec now;
- if (rib_put(path, data, len)) {
- log_err("Failed to put FSO in RIB.");
- rib_del(path);
- free(data);
- return -1;
- }
+ (void) o;
- log_dbg("Added %s to RIB.", path);
+ while (true) {
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_rdlock(&ls.db_lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock,
+ (void *) &ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.db) {
+ struct adjacency * adj;
+ adj = list_entry(p, struct adjacency, next);
+ if (now.tv_sec - adj->stamp > LS_TIMEO) {
+ list_del(&adj->next);
+ log_dbg("%" PRIu64 " - %" PRIu64" timed out.",
+ adj->src, adj->dst);
+ if (graph_del_edge(ls.graph, adj->src,
+ adj->dst))
+ log_dbg("Failed to delete edge.");
+ free(adj);
+ continue;
+ }
- break;
- case NEIGHBOR_REMOVED:
- if (rib_del(path)) {
- log_err("Failed to remove FSO.");
- return -1;
+ if (adj->src == ipcpi.dt_addr) {
+ send_lsa(adj->src, adj->dst);
+ adj->stamp = now.tv_sec;
+ }
}
- log_dbg("Removed %s from RIB.", path);
+ pthread_cleanup_pop(true);
- break;
- case NEIGHBOR_QOS_CHANGE:
- log_info("Not currently supported.");
- break;
- default:
- log_info("Unsupported event for routing.");
- break;
+ sleep(LS_UPDATE_TIME);
}
- return 0;
+ return (void *) 0;
}
-static int read_fso(char * path,
- int32_t flag)
+static void * ls_conn_handle(void * o)
{
- ssize_t len;
- uint8_t ro[BUF_SIZE];
- fso_t * fso;
- qosspec_t qs;
+ struct conn conn;
- memset(&qs, 0, sizeof(qs));
+ (void) o;
- len = rib_read(path, ro, BUF_SIZE);
- if (len < 0) {
- log_err("Failed to read FSO.");
- return -1;
- }
+ while (true) {
+ if (connmgr_wait(AEID_MGMT, &conn)) {
+ log_err("Failed to get next MGMT connection.");
+ continue;
+ }
- fso = fso__unpack(NULL, len, ro);
- if (fso == NULL) {
- log_err("Failed to unpack.");
- return -1;
- }
+ /* NOTE: connection acceptance policy could be here. */
- if (flag & RO_MODIFY) {
- if (graph_update_edge(link_state.graph,
- fso->s_addr, fso->d_addr, qs)) {
- fso__free_unpacked(fso, NULL);
- return -1;
- }
- } else if (flag & RO_DELETE) {
- if (graph_del_edge(link_state.graph, fso->s_addr, fso->d_addr)) {
- fso__free_unpacked(fso, NULL);
- return -1;
- }
+ notifier_event(NOTIFY_MGMT_CONN_ADD, &conn);
}
- fso__free_unpacked(fso, NULL);
-
return 0;
}
-static void * rib_listener(void * o)
+
+static void forward_lsm(uint8_t * buf,
+ size_t len,
+ int in_fd)
{
- int32_t flag;
- char path[RIB_MAX_PATH_LEN + 1];
- char ** children;
- ssize_t len;
- int i;
+ struct list_head * p;
- (void) o;
+ pthread_rwlock_rdlock(&ls.db_lock);
- if (ro_set_add(link_state.set, ROUTING_PATH, RO_MODIFY | RO_DELETE)) {
- log_err("Failed to add to RO set");
- return (void * ) -1;
+ list_for_each(p, &ls.nbs) {
+ struct nb * nb = list_entry(p, struct nb, next);
+ if (nb->type == NB_MGMT && nb->fd != in_fd)
+ flow_write(nb->fd, buf, len);
}
- len = rib_children(ROUTING_PATH, &children);
- if (len < 0) {
- log_err("Failed to retrieve children.");
+ pthread_rwlock_unlock(&ls.db_lock);
+}
+
+static void * lsreader(void * o)
+{
+ fqueue_t * fq;
+ int ret;
+ uint8_t buf[LSA_MAX_LEN];
+ size_t len;
+ int fd;
+ qosspec_t qs;
+
+ (void) o;
+
+ memset(&qs, 0, sizeof(qs));
+
+ fq = fqueue_create();
+ if (fq == NULL)
return (void *) -1;
- }
- for (i = 0; i < len; i++) {
- if (read_fso(children[i], RO_CREATE)) {
- log_err("Failed to parse FSO.");
+ pthread_cleanup_push((void (*) (void *)) fqueue_destroy,
+ (void *) fq);
+
+ while (true) {
+ ret = fevent(ls.mgmt_set, fq, NULL);
+ if (ret < 0) {
+ log_warn("Event error: %d.", ret);
continue;
}
- }
- while (rib_event_wait(link_state.set, link_state.queue, NULL) == 0) {
- path[0] = '\0';
- flag = rqueue_next(link_state.queue, path);
- if (flag < 0)
- continue;
+ while ((fd = fqueue_next(fq)) >= 0) {
+ link_state_msg_t * msg;
+ len = flow_read(fd, buf, LSA_MAX_LEN);
+ if (len <= 0)
+ continue;
- if (read_fso(path, flag)) {
- log_err("Failed to parse FSO.");
- continue;
+ msg = link_state_msg__unpack(NULL, len, buf);
+ if (msg == NULL) {
+ log_dbg("Failed to unpack link state message.");
+ continue;
+ }
+
+ lsdb_add_link(msg->s_addr, msg->d_addr, &qs);
+
+ link_state_msg__free_unpacked(msg, NULL);
+
+ forward_lsm(buf, len, fd);
}
}
+ pthread_cleanup_pop(true);
+
return (void *) 0;
}
+static void handle_event(int event,
+ const void * o)
+{
+ /* FIXME: Apply correct QoS on graph */
+ struct conn * c;
+ qosspec_t qs;
+
+ c = (struct conn *) o;
+
+ memset(&qs, 0, sizeof(qs));
+
+ switch (event) {
+ case NOTIFY_DT_CONN_ADD:
+ if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT))
+ log_dbg("Failed to add neighbor to LSDB.");
+
+ if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs))
+ log_dbg("Failed to add adjacency to LSDB.");
+ break;
+ case NOTIFY_DT_CONN_DEL:
+ if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd))
+ log_dbg("Failed to delete neighbor from LSDB.");
+
+ if (lsdb_del_link(ipcpi.dt_addr, c->conn_info.addr))
+ log_dbg("Local link was not in LSDB.");
+ break;
+ case NOTIFY_DT_CONN_QOS:
+ log_dbg("QoS changes currently unsupported.");
+ break;
+ case NOTIFY_MGMT_CONN_ADD:
+ fset_add(ls.mgmt_set, c->flow_info.fd);
+ if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_MGMT))
+ log_warn("Failed to add mgmt neighbor to LSDB.");
+ break;
+ case NOTIFY_MGMT_CONN_DEL:
+ fset_del(ls.mgmt_set, c->flow_info.fd);
+ if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd))
+ log_warn("Failed to add mgmt neighbor to LSDB.");
+ break;
+ default:
+ log_info("Unknown routing event.");
+ break;
+ }
+}
+
struct routing_i * link_state_routing_i_create(struct pff * pff)
{
struct routing_i * tmp;
@@ -281,7 +527,10 @@ struct routing_i * link_state_routing_i_create(struct pff * pff)
tmp->pff = pff;
- pthread_create(&tmp->calculator, NULL, calculate_pff, (void *) tmp);
+ if (pthread_create(&tmp->calculator, NULL, calculate_pff, tmp)) {
+ free(tmp);
+ return NULL;
+ }
return tmp;
}
@@ -297,61 +546,100 @@ void link_state_routing_i_destroy(struct routing_i * instance)
free(instance);
}
-int link_state_init(struct nbs * nbs)
+int link_state_init(void)
{
- link_state.graph = graph_create();
- if (link_state.graph == NULL)
+ struct conn_info info;
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.ae_name, LS_AE);
+ strcpy(info.protocol, LS_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_GPB;
+ info.addr = ipcpi.dt_addr;
+
+ ls.graph = graph_create();
+ if (ls.graph == NULL)
goto fail_graph;
- if (rib_add(RIB_ROOT, ROUTING_NAME))
- goto fail_rib_add;
+ if (notifier_reg(handle_event))
+ goto fail_notifier_reg;
+
+ if (pthread_rwlock_init(&ls.db_lock, NULL))
+ goto fail_db_lock_init;
+
+ if (connmgr_ae_init(AEID_MGMT, &info))
+ goto fail_connmgr_ae_init;
- link_state.nbs = nbs;
+ ls.mgmt_set = fset_create();
+ if (ls.mgmt_set == NULL)
+ goto fail_fset_create;
- link_state.nb_notifier.notify_call = link_state_neighbor_event;
- if (nbs_reg_notifier(link_state.nbs, &link_state.nb_notifier))
- goto fail_nbs_reg_notifier;
+ list_head_init(&ls.db);
+ list_head_init(&ls.nbs);
- link_state.set = ro_set_create();
- if (link_state.set == NULL)
- goto fail_ro_set_create;
+ if (pthread_create(&ls.lsupdate, NULL, lsupdate, NULL))
+ goto fail_pthread_create_lsupdate;
- link_state.queue = rqueue_create();
- if (link_state.queue == NULL)
- goto fail_rqueue_create;
+ if (pthread_create(&ls.lsreader, NULL, lsreader, NULL))
+ goto fail_pthread_create_lsreader;
- if (pthread_create(&link_state.rib_listener, NULL, rib_listener, NULL))
- goto fail_listener_create;
+ if (pthread_create(&ls.listener, NULL, ls_conn_handle, NULL))
+ goto fail_pthread_create_listener;
return 0;
- fail_listener_create:
- ro_set_destroy(link_state.set);
- fail_rqueue_create:
- ro_set_destroy(link_state.set);
- fail_ro_set_create:
- nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier);
- fail_nbs_reg_notifier:
- rib_del(ROUTING_PATH);
- fail_rib_add:
- graph_destroy(link_state.graph);
+ fail_pthread_create_listener:
+ pthread_cancel(ls.lsreader);
+ pthread_join(ls.lsreader, NULL);
+ fail_pthread_create_lsreader:
+ pthread_cancel(ls.lsupdate);
+ pthread_join(ls.lsupdate, NULL);
+ fail_pthread_create_lsupdate:
+ fset_destroy(ls.mgmt_set);
+ fail_fset_create:
+ connmgr_ae_fini(AEID_MGMT);
+ fail_connmgr_ae_init:
+ pthread_rwlock_destroy(&ls.db_lock);
+ fail_db_lock_init:
+ notifier_unreg(handle_event);
+ fail_notifier_reg:
+ graph_destroy(ls.graph);
fail_graph:
return -1;
}
void link_state_fini(void)
{
- pthread_cancel(link_state.rib_listener);
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_cancel(ls.listener);
+ pthread_join(ls.listener, NULL);
+
+ pthread_cancel(ls.lsreader);
+ pthread_join(ls.lsreader, NULL);
- pthread_join(link_state.rib_listener, NULL);
+ pthread_cancel(ls.lsupdate);
+ pthread_join(ls.lsupdate, NULL);
- rqueue_destroy(link_state.queue);
+ fset_destroy(ls.mgmt_set);
- ro_set_destroy(link_state.set);
+ connmgr_ae_fini(AEID_MGMT);
+
+ graph_destroy(ls.graph);
+
+ pthread_rwlock_wrlock(&ls.db_lock);
+
+ list_for_each_safe(p, h, &ls.db) {
+ struct adjacency * a = list_entry(p, struct adjacency, next);
+ list_del(&a->next);
+ free(a);
+ }
- graph_destroy(link_state.graph);
+ pthread_rwlock_unlock(&ls.db_lock);
- rib_del(ROUTING_PATH);
+ pthread_rwlock_destroy(&ls.db_lock);
- nbs_unreg_notifier(link_state.nbs, &link_state.nb_notifier);
+ notifier_unreg(handle_event);
}
diff --git a/src/ipcpd/normal/pol/link_state.h b/src/ipcpd/normal/pol/link_state.h
index 9b96bcab..58f90d91 100644
--- a/src/ipcpd/normal/pol/link_state.h
+++ b/src/ipcpd/normal/pol/link_state.h
@@ -23,9 +23,12 @@
#ifndef OUROBOROS_IPCPD_NORMAL_POL_LINK_STATE_H
#define OUROBOROS_IPCPD_NORMAL_POL_LINK_STATE_H
+#define LS_AE "Management"
+#define LS_PROTO "LSP"
+
#include "pol-routing-ops.h"
-int link_state_init(struct nbs * nbs);
+int link_state_init(void);
void link_state_fini(void);
@@ -33,11 +36,6 @@ struct routing_i * link_state_routing_i_create(struct pff * pff);
void link_state_routing_i_destroy(struct routing_i * instance);
-struct pol_routing_ops link_state_ops = {
- .init = link_state_init,
- .fini = link_state_fini,
- .routing_i_create = link_state_routing_i_create,
- .routing_i_destroy = link_state_routing_i_destroy
-};
+struct pol_routing_ops link_state_ops;
#endif /* OUROBOROS_IPCPD_NORMAL_POL_LINK_STATE_H */
diff --git a/src/ipcpd/normal/pol/fso.proto b/src/ipcpd/normal/pol/link_state.proto
index 27a78efd..4e2280b0 100644
--- a/src/ipcpd/normal/pol/fso.proto
+++ b/src/ipcpd/normal/pol/link_state.proto
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2017
*
- * Flow State Object message
+ * Link State message
*
* Dimitri Staessens <[email protected]>
* Sander Vrijders <[email protected]>
@@ -22,8 +22,8 @@
syntax = "proto2";
-message fso {
- required uint64 s_addr = 1;
- required uint64 d_addr = 2;
+message link_state_msg {
+ required uint64 d_addr = 1;
+ required uint64 s_addr = 2;
/* Add QoS parameters of link here */
};
diff --git a/src/ipcpd/normal/ribconfig.h b/src/ipcpd/normal/ribconfig.h
deleted file mode 100644
index f6d10133..00000000
--- a/src/ipcpd/normal/ribconfig.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Normal IPC Process - RIB configuration
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H
-#define OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H
-
-/* RIB configuration for normal */
-#define RIB_MAX_PATH_LEN 256
-
-#define DLR "/"
-#define ROUTING_NAME "fsdb"
-#define ROUTING_PATH DLR ROUTING_NAME
-
-#endif /* OUROBOROS_IPCPD_NORMAL_RIB_CONFIG_H */
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
deleted file mode 100644
index a5e7d6ce..00000000
--- a/src/ipcpd/normal/ribmgr.c
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * RIB manager of the IPC Process
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 200112L
-
-#define OUROBOROS_PREFIX "rib-manager"
-
-#include <ouroboros/logs.h>
-#include <ouroboros/cdap.h>
-#include <ouroboros/list.h>
-#include <ouroboros/time_utils.h>
-#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/rib.h>
-
-#include "ae.h"
-#include "connmgr.h"
-#include "ipcp.h"
-#include "neighbors.h"
-#include "ribconfig.h"
-#include "ribmgr.h"
-
-#include <stdlib.h>
-#include <pthread.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-
-#define MGMT_AE "Management"
-#define RIB_SYNC_TIMEOUT 1
-
-enum ribmgr_state {
- RIBMGR_NULL = 0,
- RIBMGR_INIT,
- RIBMGR_OPERATIONAL,
- RIBMGR_SHUTDOWN
-};
-
-struct {
- struct cdap * cdap;
-
- pthread_t reader;
- pthread_t sync;
-
- struct nbs * nbs;
- struct ae * ae;
-
- struct nb_notifier nb_notifier;
-
- pthread_rwlock_t state_lock;
- enum ribmgr_state state;
-} ribmgr;
-
-static int ribmgr_neighbor_event(enum nb_event event,
- struct conn conn)
-{
- switch (event) {
- case NEIGHBOR_ADDED:
- cdap_add_flow(ribmgr.cdap, conn.flow_info.fd);
- break;
- case NEIGHBOR_REMOVED:
- cdap_del_flow(ribmgr.cdap, conn.flow_info.fd);
- break;
- default:
- /* Don't care about other events */
- break;
- }
-
- return 0;
-}
-
-static enum ribmgr_state ribmgr_get_state(void)
-{
- enum ribmgr_state state;
-
- pthread_rwlock_rdlock(&ribmgr.state_lock);
-
- state = ribmgr.state;
-
- pthread_rwlock_unlock(&ribmgr.state_lock);
-
- return state;
-}
-
-static void ribmgr_set_state(enum ribmgr_state state)
-{
- pthread_rwlock_wrlock(&ribmgr.state_lock);
-
- ribmgr.state = state;
-
- pthread_rwlock_unlock(&ribmgr.state_lock);
-}
-
-static void * reader(void * o)
-{
- cdap_key_t key;
- enum cdap_opcode oc;
- char * name;
- uint8_t * data;
- size_t len;
- ssize_t slen;
- uint32_t flags;
- uint8_t * buf;
- int rval;
-
- (void) o;
-
- while (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
- key = cdap_request_wait(ribmgr.cdap, &oc, &name, &data,
- (size_t *) &len , &flags);
- assert(key != -EINVAL);
-
- if (key == INVALID_CDAP_KEY) {
- log_warn("Bad CDAP request.");
- continue;
- }
-
- assert(name);
- assert(strlen(name));
-
- switch (oc) {
- case CDAP_READ:
- assert(len == 0);
- slen = rib_pack(name, &buf, PACK_HASH_ROOT);
- if (slen < 0) {
- log_err("Failed to pack %s.", name);
- cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
- free(name);
- continue;
- }
-
- log_dbg("Packed %s (%zu bytes).", name, slen);
-
- free(name);
-
- if (cdap_reply_send(ribmgr.cdap, key, 0, buf, slen)) {
- log_err("Failed to send CDAP reply.");
- free(buf);
- continue;
- }
-
- free(buf);
- break;
- case CDAP_WRITE:
- assert(len);
- assert(data);
-
- rval = rib_unpack(data, len, 0);
- switch(rval) {
- case 0:
- break;
- case -EFAULT:
- log_warn("Hash mismatch, not in sync.");
- free(data);
- break;
- default:
- log_warn("Error unpacking %s.", name);
- cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
- free(name);
- free(data);
- continue;
- }
-
- free(name);
-
- if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
- log_err("Failed to send CDAP reply.");
- continue;
- }
- break;
- case CDAP_CREATE:
- assert(len);
- assert(data);
-
- rval = rib_unpack(data, len, UNPACK_CREATE);
- switch(rval) {
- case 0:
- break;
- case -EFAULT:
- log_warn("Hash mismatch, not yet in sync.");
- free(data);
- break;
- default:
- log_warn("Error unpacking %s.", name);
- cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
- free(name);
- free(data);
- continue;
- }
-
- free(name);
-
- if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
- log_err("Failed to send CDAP reply.");
- continue;
- }
- break;
- case CDAP_DELETE:
- assert(len == 0);
- if (rib_del(name)) {
- log_warn("Failed deleting %s.", name);
- cdap_reply_send(ribmgr.cdap, key, -1, NULL, 0);
- }
-
- free(name);
-
- if (cdap_reply_send(ribmgr.cdap, key, 0, NULL, 0)) {
- log_err("Failed to send CDAP reply.");
- continue;
- }
- break;
- case CDAP_START:
- case CDAP_STOP:
- log_warn("Unsupported CDAP command.");
- if (len)
- free(data);
- break;
- default:
- log_err("Bad CDAP command.");
- if (len)
- free(data);
- break;
- }
- }
-
- return (void *) 0;
-}
-
-char path[RIB_MAX_PATH_LEN + 1];
-
-static void path_reset(void) {
- path[strlen(RIB_ROOT)] = '\0';
- assert(strcmp(path, RIB_ROOT) == 0);
-}
-
-static int ribmgr_sync(const char * path)
-{
- uint8_t * buf;
- ssize_t len;
- cdap_key_t * keys;
-
- len = rib_pack(path, &buf, PACK_HASH_ALL);
- if (len < 0) {
- log_warn("Failed to pack %s.", path);
- return -1;
- }
-
- keys = cdap_request_send(ribmgr.cdap, CDAP_CREATE, path, buf, len, 0);
- if (keys != NULL) {
- cdap_key_t * key = keys;
- while (*key != INVALID_CDAP_KEY)
- cdap_reply_wait(ribmgr.cdap, *(key++), NULL, NULL);
- free(keys);
- }
-
- free(buf);
-
- return 0;
-}
-
-/* FIXME: Temporary thread, syncs rib with neighbors every second */
-static void * sync_rib(void *o)
-{
- char ** children;
- ssize_t ch;
-
- (void) o;
-
- strcpy(path, RIB_ROOT);
-
- while (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
- sleep(RIB_SYNC_TIMEOUT);
-
- ch = rib_children(RIB_ROOT, &children);
- if (ch <= 0)
- continue;
-
- while (ch > 0) {
- path_reset();
-
- rib_path_append(path, children[--ch]);
- free(children[ch]);
-
- /* Sync fsdb */
- if (strcmp(path, ROUTING_PATH) == 0)
- ribmgr_sync(path);
- }
-
- free(children);
- }
-
- return (void *) 0;
-}
-
-int ribmgr_init(void)
-{
- struct conn_info info;
-
- memset(&info, 0, sizeof(info));
-
- strcpy(info.ae_name, MGMT_AE);
- strcpy(info.protocol, CDAP_PROTO);
- info.pref_version = 1;
- info.pref_syntax = PROTO_GPB;
- info.addr = 0;
-
- ribmgr.nbs = nbs_create();
- if (ribmgr.nbs == NULL) {
- log_err("Failed to create neighbors.");
- goto fail_nbs_create;
- }
-
- if (connmgr_ae_init(AEID_MGMT, &info, ribmgr.nbs)) {
- log_err("Failed to register with connmgr.");
- goto fail_connmgr_ae_init;
- };
-
- ribmgr.cdap = cdap_create();
- if (ribmgr.cdap == NULL) {
- log_err("Failed to create CDAP instance.");
- goto fail_cdap_create;
- }
-
- ribmgr.nb_notifier.notify_call = ribmgr_neighbor_event;
- if (nbs_reg_notifier(ribmgr.nbs, &ribmgr.nb_notifier)) {
- log_err("Failed to register notifier.");
- goto fail_nbs_reg_notifier;
- }
-
- if (pthread_rwlock_init(&ribmgr.state_lock, NULL)) {
- log_err("Failed to init rwlock.");
- goto fail_rwlock_init;
- }
-
- ribmgr.state = RIBMGR_INIT;
-
- return 0;
-
- fail_rwlock_init:
- nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier);
- fail_nbs_reg_notifier:
- cdap_destroy(ribmgr.cdap);
- fail_cdap_create:
- connmgr_ae_fini(AEID_MGMT);
- fail_connmgr_ae_init:
- nbs_destroy(ribmgr.nbs);
- fail_nbs_create:
- return -1;
-}
-
-void ribmgr_fini(void)
-{
- if (ribmgr_get_state() == RIBMGR_SHUTDOWN) {
- pthread_join(ribmgr.reader, NULL);
- pthread_join(ribmgr.sync, NULL);
- }
-
- nbs_unreg_notifier(ribmgr.nbs, &ribmgr.nb_notifier);
- cdap_destroy(ribmgr.cdap);
- nbs_destroy(ribmgr.nbs);
-
- connmgr_ae_fini(AEID_MGMT);
-}
-
-int ribmgr_start(void)
-{
- ribmgr_set_state(RIBMGR_OPERATIONAL);
-
- if (pthread_create(&ribmgr.sync, NULL, sync_rib, NULL)) {
- ribmgr_set_state(RIBMGR_NULL);
- return -1;
- }
-
- if (pthread_create(&ribmgr.reader, NULL, reader, NULL)) {
- ribmgr_set_state(RIBMGR_SHUTDOWN);
- pthread_cancel(ribmgr.reader);
- return -1;
- }
-
- return 0;
-}
-
-void ribmgr_stop(void)
-{
- if (ribmgr_get_state() == RIBMGR_OPERATIONAL) {
- ribmgr_set_state(RIBMGR_SHUTDOWN);
- pthread_cancel(ribmgr.reader);
- }
-}
-
-int ribmgr_disseminate(char * path,
- enum diss_target target,
- enum diss_freq freq,
- size_t delay)
-{
- (void) path;
- (void) target;
- (void) freq;
- (void) delay;
-
- return 0;
-}
diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h
deleted file mode 100644
index 20f87548..00000000
--- a/src/ipcpd/normal/ribmgr.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * RIB manager of the IPC Process
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_IPCPD_NORMAL_RIBMGR_H
-#define OUROBOROS_IPCPD_NORMAL_RIBMGR_H
-
-#include <ouroboros/ipcp.h>
-#include <ouroboros/utils.h>
-#include <ouroboros/qos.h>
-
-enum diss_target {
- NONE = 0,
- NEIGHBORS,
- ALL_MEMBERS
-};
-
-enum diss_freq {
- SINGLE = 0,
- PERIODIC
-};
-
-int ribmgr_init(void);
-
-void ribmgr_fini(void);
-
-int ribmgr_start(void);
-
-void ribmgr_stop(void);
-
-int ribmgr_disseminate(char * path,
- enum diss_target target,
- enum diss_freq freq,
- size_t delay);
-
-#endif /* OUROBOROS_IPCPD_NORMAL_RIBMGR_H */
diff --git a/src/ipcpd/normal/routing.c b/src/ipcpd/normal/routing.c
index 04e6fd76..47ce3518 100644
--- a/src/ipcpd/normal/routing.c
+++ b/src/ipcpd/normal/routing.c
@@ -22,29 +22,24 @@
#define _POSIX_C_SOURCE 200112L
-#define OUROBOROS_PREFIX "routing"
-
-#include <ouroboros/logs.h>
+#include <ouroboros/errno.h>
#include "routing.h"
#include "pol/link_state.h"
-
struct pol_routing_ops * r_ops;
-int routing_init(enum pol_routing pr,
- struct nbs * nbs)
+int routing_init(enum pol_routing pr)
{
switch (pr) {
case LINK_STATE:
r_ops = &link_state_ops;
break;
default:
- log_err("Unknown routing type.");
- return -1;
+ return -ENOTSUP;
}
- return r_ops->init(nbs);
+ return r_ops->init();
}
struct routing_i * routing_i_create(struct pff * pff)
diff --git a/src/ipcpd/normal/routing.h b/src/ipcpd/normal/routing.h
index 0ef11020..6c8cae76 100644
--- a/src/ipcpd/normal/routing.h
+++ b/src/ipcpd/normal/routing.h
@@ -27,12 +27,10 @@
#include <ouroboros/qos.h>
#include "pff.h"
-#include "neighbors.h"
#include <stdint.h>
-int routing_init(enum pol_routing pr,
- struct nbs * nbs);
+int routing_init(enum pol_routing pr);
void routing_fini(void);
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
index c7e799e2..7a82a874 100644
--- a/src/ipcpd/normal/sdu_sched.c
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -38,9 +38,9 @@
#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */
struct sdu_sched {
- fset_t * set[QOS_CUBE_MAX];
- next_sdu_t callback;
- pthread_t sdu_readers[IPCP_SCHED_THREADS];
+ fset_t * set[QOS_CUBE_MAX];
+ next_sdu_fn_t callback;
+ pthread_t sdu_readers[IPCP_SCHED_THREADS];
};
static void cleanup_reader(void * o)
@@ -95,10 +95,7 @@ static void * sdu_reader(void * o)
continue;
}
- if (sched->callback(fd, i, sdb)) {
- log_warn("Callback reported an error.");
- continue;
- }
+ sched->callback(fd, i, sdb);
}
}
@@ -107,7 +104,7 @@ static void * sdu_reader(void * o)
return (void *) 0;
}
-struct sdu_sched * sdu_sched_create(next_sdu_t callback)
+struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback)
{
struct sdu_sched * sdu_sched;
int i;
diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h
index 05371452..733f5648 100644
--- a/src/ipcpd/normal/sdu_sched.h
+++ b/src/ipcpd/normal/sdu_sched.h
@@ -26,11 +26,11 @@
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/fqueue.h>
-typedef int (* next_sdu_t)(int fd,
- qoscube_t qc,
- struct shm_du_buff * sdb);
+typedef void (* next_sdu_fn_t)(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb);
-struct sdu_sched * sdu_sched_create(next_sdu_t callback);
+struct sdu_sched * sdu_sched_create(next_sdu_fn_t callback);
void sdu_sched_destroy(struct sdu_sched * sdu_sched);
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index eeb7966b..f126a52a 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -8,8 +8,6 @@ protobuf_generate_c(IRM_PROTO_SRCS IRM_PROTO_HDRS irmd_messages.proto)
protobuf_generate_c(IPCP_PROTO_SRCS IPCP_PROTO_HDRS ipcpd_messages.proto)
protobuf_generate_c(DIF_CONFIG_PROTO_SRCS DIF_CONFIG_PROTO_HDRS
ipcp_config.proto)
-protobuf_generate_c(CDAP_PROTO_SRCS CDAP_PROTO_HDRS cdap.proto)
-protobuf_generate_c(RO_PROTO_SRCS RO_PROTO_HDRS ro.proto)
protobuf_generate_c(CACEP_PROTO_SRCS CACEP_PROTO_HDRS cacep.proto)
if (NOT APPLE)
@@ -134,8 +132,6 @@ else ()
endif ()
set(SOCKET_TIMEOUT 1000 CACHE STRING
"Default timeout for responses from IPCPs (ms)")
-set(CDAP_REPLY_TIMEOUT 6000 CACHE STRING
- "Timeout for CDAP to wait for reply")
set(SHM_PREFIX "ouroboros" CACHE STRING
"String to prepend to POSIX shared memory filenames")
set(SHM_RBUFF_PREFIX "/${SHM_PREFIX}.rbuff." CACHE INTERNAL
@@ -154,8 +150,6 @@ set(SOURCE_FILES
bitmap.c
btree.c
cacep.c
- cdap.c
- cdap_req.c
crc32.c
dev.c
frct_pci.c
@@ -166,10 +160,10 @@ set(SOURCE_FILES
lockfile.c
logs.c
md5.c
+ notifier.c
qos.c
qoscube.c
random.c
- rib.c
sha3.c
shm_flow_set.c
shm_rbuff.c
@@ -185,8 +179,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in"
"${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY)
add_library(ouroboros SHARED ${SOURCE_FILES} ${IRM_PROTO_SRCS}
- ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CDAP_PROTO_SRCS}
- ${CACEP_PROTO_SRCS} ${RO_PROTO_SRCS})
+ ${IPCP_PROTO_SRCS} ${DIF_CONFIG_PROTO_SRCS} ${CACEP_PROTO_SRCS})
include(AddCompileFlags)
if (CMAKE_BUILD_TYPE MATCHES Debug)
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
deleted file mode 100644
index d9cb2036..00000000
--- a/src/lib/cdap.c
+++ /dev/null
@@ -1,868 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * The Common Distributed Application Protocol
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 200809L
-
-#include <ouroboros/cdap.h>
-#include <ouroboros/bitmap.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/fqueue.h>
-#include <ouroboros/errno.h>
-
-#include "cdap_req.h"
-
-#include <stdlib.h>
-#include <pthread.h>
-#include <string.h>
-#include <assert.h>
-
-#include "cdap.pb-c.h"
-typedef Cdap cdap_t;
-
-#define CDAP_REPLY (CDAP_DELETE + 1)
-
-#define INVALID_ID -1
-#define IDS_SIZE 2048
-#define BUF_SIZE 2048
-
-struct fd_el {
- struct list_head next;
-
- int fd;
-};
-
-struct cdap {
- fset_t * set;
- fqueue_t * fq;
-
- bool proc;
- pthread_mutex_t mtx;
- pthread_cond_t cond;
-
- size_t n_flows;
- struct list_head flows;
- pthread_rwlock_t flows_lock;
-
- struct bmp * ids;
- pthread_mutex_t ids_lock;
-
- struct list_head sent;
- pthread_rwlock_t sent_lock;
-
- struct list_head rcvd;
- pthread_cond_t rcvd_cond;
- pthread_mutex_t rcvd_lock;
-
- pthread_t reader;
-};
-
-struct cdap_rcvd {
- struct list_head next;
-
- int fd;
- bool proc;
-
- invoke_id_t iid;
- cdap_key_t key;
-
- enum cdap_opcode opcode;
- char * name;
- void * data;
- size_t len;
- uint32_t flags;
-};
-
-static int next_id(struct cdap * instance)
-{
- int ret;
-
- assert(instance);
-
- pthread_mutex_lock(&instance->ids_lock);
-
- ret = bmp_allocate(instance->ids);
- if (!bmp_is_id_valid(instance->ids, ret))
- ret = INVALID_ID;
-
- pthread_mutex_unlock(&instance->ids_lock);
-
- return ret;
-}
-
-static int release_id(struct cdap * instance,
- int32_t id)
-{
- int ret;
-
- assert(instance);
-
- pthread_mutex_lock(&instance->ids_lock);
-
- ret = bmp_release(instance->ids, id);
-
- pthread_mutex_unlock(&instance->ids_lock);
-
- return ret;
-}
-
-#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL)
-
-static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
- cdap_key_t key)
-{
- struct list_head * p = NULL;
- struct cdap_req * req = NULL;
-
- assert(instance);
-
- pthread_rwlock_rdlock(&instance->sent_lock);
-
- list_for_each(p, &instance->sent) {
- req = list_entry(p, struct cdap_req, next);
- if (req->key == key) {
- pthread_rwlock_unlock(&instance->sent_lock);
- return req;
- }
- }
-
- pthread_rwlock_unlock(&instance->sent_lock);
-
- return NULL;
-}
-
-static struct cdap_req * cdap_sent_get_by_iid(struct cdap * instance,
- invoke_id_t iid)
-{
- struct list_head * p = NULL;
- struct cdap_req * req = NULL;
-
- assert(instance);
-
- pthread_rwlock_rdlock(&instance->sent_lock);
-
- list_for_each(p, &instance->sent) {
- req = list_entry(p, struct cdap_req, next);
- if (req->iid == iid) {
- pthread_rwlock_unlock(&instance->sent_lock);
- return req;
- }
- }
-
- pthread_rwlock_unlock(&instance->sent_lock);
-
- return NULL;
-}
-
-static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance,
- cdap_key_t key)
-{
- struct list_head * p = NULL;
- struct list_head * h = NULL;
- struct cdap_rcvd * rcvd = NULL;
-
- assert(instance);
-
- pthread_mutex_lock(&instance->rcvd_lock);
-
- list_for_each_safe(p, h, &instance->rcvd) {
- rcvd = list_entry(p, struct cdap_rcvd, next);
- if (rcvd->key == key) {
- list_del(&rcvd->next);
- pthread_mutex_unlock(&instance->rcvd_lock);
- return rcvd;
- }
- }
-
- pthread_mutex_unlock(&instance->rcvd_lock);
-
- assert(false);
-
- return NULL;
-}
-
-static struct cdap_req * cdap_sent_add(struct cdap * instance,
- int fd,
- invoke_id_t iid,
- cdap_key_t key)
-{
- struct cdap_req * req;
-
- assert(instance);
- assert(!cdap_sent_has_key(instance, key));
-
- req = cdap_req_create(fd, iid, key);
- if (req == NULL)
- return NULL;
-
- pthread_rwlock_wrlock(&instance->sent_lock);
-
- list_add(&req->next, &instance->sent);
-
- pthread_rwlock_unlock(&instance->sent_lock);
-
- return req;
-}
-
-static void cdap_sent_del(struct cdap * instance,
- struct cdap_req * req)
-{
- assert(instance);
- assert(req);
-
- assert(cdap_sent_has_key(instance, req->key));
-
- pthread_rwlock_wrlock(&instance->sent_lock);
-
- list_del(&req->next);
-
- pthread_rwlock_unlock(&instance->sent_lock);
-
- cdap_req_destroy(req);
-}
-
-static void cdap_sent_destroy(struct cdap * instance)
-{
- struct list_head * p = NULL;
- struct list_head * h = NULL;
-
- assert(instance);
-
- pthread_rwlock_wrlock(&instance->sent_lock);
-
- list_for_each_safe(p, h, &instance->sent) {
- struct cdap_req * req = list_entry(p, struct cdap_req, next);
- list_del(&req->next);
- cdap_req_cancel(req);
- cdap_req_destroy(req);
- }
-
- pthread_rwlock_unlock(&instance->sent_lock);
-}
-
-static void cdap_rcvd_destroy(struct cdap * instance)
-{
- struct list_head * p = NULL;
- struct list_head * h = NULL;
-
- assert(instance);
-
- pthread_mutex_lock(&instance->rcvd_lock);
-
- list_for_each_safe(p, h, &instance->rcvd) {
- struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next);
- list_del(&r->next);
- if (r->data != NULL)
- free(r->data);
- if (r->name != NULL)
- free(r->name);
- free(r);
- }
-
- pthread_cond_broadcast(&instance->rcvd_cond);
-
- pthread_mutex_unlock(&instance->rcvd_lock);
-}
-
-static void set_proc(struct cdap * instance,
- bool status)
-{
- pthread_mutex_lock(&instance->mtx);
-
- instance->proc = status;
- pthread_cond_signal(&instance->cond);
-
- pthread_mutex_unlock(&instance->mtx);
-}
-
-static void * sdu_reader(void * o)
-{
- struct cdap * instance = (struct cdap *) o;
- struct cdap_req * req;
- struct cdap_rcvd * rcvd;
- cdap_t * msg;
- uint8_t buf[BUF_SIZE];
- ssize_t len;
- buffer_t data;
-
- while (fevent(instance->set, instance->fq, NULL)) {
- int fd;
- set_proc(instance, true);
- fd = fqueue_next(instance->fq);
- len = flow_read(fd, buf, BUF_SIZE);
- if (len < 0) {
- set_proc(instance, false);
- continue;
- }
-
- msg = cdap__unpack(NULL, len, buf);
- if (msg == NULL) {
- set_proc(instance, false);
- continue;
- }
-
- if (msg->opcode != CDAP_REPLY) {
- rcvd = malloc(sizeof(*rcvd));
- if (rcvd == NULL) {
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- continue;
- }
-
- assert(msg->name);
-
- rcvd->opcode = msg->opcode;
- rcvd->fd = fd;
- rcvd->iid = msg->invoke_id;
- rcvd->key = next_id(instance);
- if (rcvd->key == INVALID_ID) {
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- free(rcvd);
- continue;
- }
-
- rcvd->flags = msg->flags;
- rcvd->proc = false;
- rcvd->name = strdup(msg->name);
- if (rcvd->name == NULL) {
- release_id(instance, rcvd->key);
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- free(rcvd);
- continue;
- }
-
- if (msg->has_value) {
- rcvd->len = msg->value.len;
- rcvd->data = malloc(rcvd->len);
- if (rcvd->data == NULL) {
- release_id(instance, rcvd->key);
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- free(rcvd->name);
- free(rcvd);
- continue;
- }
- memcpy(rcvd->data, msg->value.data, rcvd->len);
- } else {
- rcvd->len = 0;
- rcvd->data = NULL;
- }
-
- pthread_mutex_lock(&instance->rcvd_lock);
-
- list_add(&rcvd->next, &instance->rcvd);
-
- pthread_cond_signal(&instance->rcvd_cond);
- pthread_mutex_unlock(&instance->rcvd_lock);
- } else {
- req = cdap_sent_get_by_iid(instance, msg->invoke_id);
- if (req == NULL) {
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- continue;
- }
-
- if (msg->has_value) {
- data.len = msg->value.len;
- data.data = malloc(data.len);
- if (data.data == NULL) {
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- continue;
- }
- memcpy(data.data, msg->value.data, data.len);
- } else {
- data.len = 0;
- data.data = NULL;
- }
-
- cdap_req_respond(req, msg->result, data);
- }
-
- cdap__free_unpacked(msg, NULL);
- set_proc(instance, false);
- }
-
- return (void *) 0;
-}
-
-struct cdap * cdap_create()
-{
- struct cdap * instance = NULL;
-
- instance = malloc(sizeof(*instance));
- if (instance == NULL)
- goto fail_malloc;
-
- if (pthread_rwlock_init(&instance->flows_lock, NULL))
- goto fail_flows_lock;
-
- if (pthread_mutex_init(&instance->ids_lock, NULL))
- goto fail_ids_lock;
-
- if (pthread_mutex_init(&instance->rcvd_lock, NULL))
- goto fail_rcvd_lock;
-
- if (pthread_rwlock_init(&instance->sent_lock, NULL))
- goto fail_sent_lock;
-
- if (pthread_cond_init(&instance->rcvd_cond, NULL))
- goto fail_rcvd_cond;
-
- if (pthread_mutex_init(&instance->mtx, NULL))
- goto fail_mtx;
-
- if (pthread_cond_init(&instance->cond, NULL))
- goto fail_cond;
-
- instance->ids = bmp_create(IDS_SIZE, 0);
- if (instance->ids == NULL)
- goto fail_bmp_create;
-
- instance->set = fset_create();
- if (instance->set == NULL)
- goto fail_set_create;
-
- instance->fq = fqueue_create();
- if (instance->fq == NULL)
- goto fail_fqueue_create;
-
- instance->n_flows = 0;
- instance->proc = false;
-
- list_head_init(&instance->flows);
- list_head_init(&instance->sent);
- list_head_init(&instance->rcvd);
-
- if (pthread_create(&instance->reader, NULL, sdu_reader, instance))
- goto fail_pthread_create;
-
- return instance;
-
- fail_pthread_create:
- fqueue_destroy(instance->fq);
- fail_fqueue_create:
- fset_destroy(instance->set);
- fail_set_create:
- bmp_destroy(instance->ids);
- fail_bmp_create:
- pthread_cond_destroy(&instance->cond);
- fail_cond:
- pthread_mutex_destroy(&instance->mtx);
- fail_mtx:
- pthread_cond_destroy(&instance->rcvd_cond);
- fail_rcvd_cond:
- pthread_rwlock_destroy(&instance->sent_lock);
- fail_sent_lock:
- pthread_mutex_destroy(&instance->rcvd_lock);
- fail_rcvd_lock:
- pthread_mutex_destroy(&instance->ids_lock);
- fail_ids_lock:
- pthread_rwlock_destroy(&instance->flows_lock);
- fail_flows_lock:
- free(instance);
- fail_malloc:
- return NULL;
-}
-
-int cdap_destroy(struct cdap * instance)
-{
- struct list_head * p;
- struct list_head * h;
-
- if (instance == NULL)
- return 0;
-
- pthread_cancel(instance->reader);
- pthread_join(instance->reader, NULL);
-
- fqueue_destroy(instance->fq);
-
- fset_destroy(instance->set);
-
- pthread_cond_destroy(&instance->cond);
- pthread_mutex_destroy(&instance->mtx);
-
- pthread_rwlock_wrlock(&instance->flows_lock);
-
- list_for_each_safe(p,h, &instance->flows) {
- struct fd_el * e = list_entry(p, struct fd_el, next);
- list_del(&e->next);
- free(e);
- }
-
- pthread_rwlock_unlock(&instance->flows_lock);
-
- pthread_rwlock_destroy(&instance->flows_lock);
-
- pthread_mutex_lock(&instance->ids_lock);
-
- bmp_destroy(instance->ids);
-
- pthread_mutex_unlock(&instance->ids_lock);
-
- pthread_mutex_destroy(&instance->ids_lock);
-
- cdap_sent_destroy(instance);
-
- pthread_rwlock_destroy(&instance->sent_lock);
-
- cdap_rcvd_destroy(instance);
-
- pthread_mutex_destroy(&instance->rcvd_lock);
-
- free(instance);
-
- return 0;
-}
-
-int cdap_add_flow(struct cdap * instance,
- int fd)
-{
- struct fd_el * e;
-
- if (fd < 0)
- return -EINVAL;
-
- e = malloc(sizeof(*e));
- if (e == NULL)
- return -ENOMEM;
-
- e->fd = fd;
-
- pthread_rwlock_wrlock(&instance->flows_lock);
-
- if (fset_add(instance->set, fd)) {
- pthread_rwlock_unlock(&instance->flows_lock);
- free(e);
- return -1;
- }
-
- list_add(&e->next, &instance->flows);
-
- ++instance->n_flows;
-
- pthread_rwlock_unlock(&instance->flows_lock);
-
- return 0;
-}
-
-int cdap_del_flow(struct cdap * instance,
- int fd)
-{
- struct list_head * p;
- struct list_head * h;
-
- if (fd < 0)
- return -EINVAL;
-
- pthread_rwlock_wrlock(&instance->flows_lock);
-
- fset_del(instance->set, fd);
-
- list_for_each_safe(p, h, &instance->flows) {
- struct fd_el * e = list_entry(p, struct fd_el, next);
- if (e->fd == fd) {
- list_del(&e->next);
- free(e);
- break;
- }
- }
-
- --instance->n_flows;
-
- pthread_rwlock_unlock(&instance->flows_lock);
-
- pthread_mutex_lock(&instance->mtx);
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &instance->mtx);
-
- while (instance->proc)
- pthread_cond_wait(&instance->cond, &instance->mtx);
-
- pthread_cleanup_pop(true);
-
- return 0;
-}
-
-static int write_msg(int fd,
- cdap_t * msg)
-{
- uint8_t * data;
- size_t len;
-
- assert(msg);
-
- len = cdap__get_packed_size(msg);
- if (len == 0)
- return -1;
-
- data = malloc(len);
- if (data == NULL)
- return -ENOMEM;
-
- cdap__pack(msg, data);
-
- if (flow_write(fd, data, len)) {
- free(data);
- return -1;
- }
-
- free(data);
-
- return 0;
-}
-
-cdap_key_t * cdap_request_send(struct cdap * instance,
- enum cdap_opcode code,
- const char * name,
- const void * data,
- size_t len,
- uint32_t flags)
-{
- cdap_key_t * keys;
- cdap_key_t * key;
- cdap_t msg = CDAP__INIT;
- struct list_head * p;
- int ret;
-
- if (instance == NULL || name == NULL || code > CDAP_DELETE)
- return NULL;
-
- pthread_rwlock_rdlock(&instance->flows_lock);
-
- keys = malloc(sizeof(*keys) * (instance->n_flows + 1));
- if (keys == NULL) {
- pthread_rwlock_unlock(&instance->flows_lock);
- return NULL;
- }
-
- memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1));
-
- key = keys;
-
- cdap__init(&msg);
-
- msg.opcode = code;
- msg.name = (char *) name;
- msg.has_flags = true;
- msg.flags = flags;
-
- if (data != NULL) {
- msg.has_value = true;
- msg.value.data = (uint8_t *) data;
- msg.value.len = len;
- }
-
- list_for_each(p, &instance->flows) {
- struct cdap_req * req;
- invoke_id_t iid;
- struct fd_el * e;
-
- iid = next_id(instance);
- if (iid == INVALID_ID) {
- pthread_rwlock_unlock(&instance->flows_lock);
- return keys;
- }
-
- msg.invoke_id = iid;
-
- e = list_entry(p, struct fd_el, next);
-
- *key = next_id(instance);
- if (*key == INVALID_ID) {
- release_id(instance, iid);
- pthread_rwlock_unlock(&instance->flows_lock);
- return keys;
- }
-
- req = cdap_sent_add(instance, e->fd, iid, *key);
- if (req == NULL) {
- release_id(instance, *key);
- release_id(instance, iid);
- pthread_rwlock_unlock(&instance->flows_lock);
- *key = INVALID_CDAP_KEY;
- return keys;
- }
-
- ret = write_msg(e->fd, &msg);
- if (ret == -ENOMEM) {
- cdap_sent_del(instance, req);
- release_id(instance, *key);
- release_id(instance, iid);
- pthread_rwlock_unlock(&instance->flows_lock);
- *key = INVALID_CDAP_KEY;
- return keys;
- }
-
- if (ret < 0) {
- cdap_sent_del(instance, req);
- release_id(instance, *key);
- release_id(instance, iid);
- pthread_rwlock_unlock(&instance->flows_lock);
- *key = INVALID_CDAP_KEY;
- return keys;
- }
-
- ++key;
- }
-
- pthread_rwlock_unlock(&instance->flows_lock);
-
- return keys;
-}
-
-int cdap_reply_wait(struct cdap * instance,
- cdap_key_t key,
- uint8_t ** data,
- size_t * len)
-{
- int ret;
- struct cdap_req * r;
- invoke_id_t iid;
-
- if (instance == NULL || (data != NULL && len == NULL))
- return -EINVAL;
-
- r = cdap_sent_get_by_key(instance, key);
- if (r == NULL)
- return -EINVAL;
-
- iid = r->iid;
-
- ret = cdap_req_wait(r);
- if (ret < 0) {
- cdap_sent_del(instance, r);
- release_id(instance, iid);
- release_id(instance, key);
- return ret;
- }
-
- assert(ret == 0);
-
- if (data != NULL) {
- *data = r->data.data;
- *len = r->data.len;
- }
-
- ret = r->response;
-
- cdap_sent_del(instance, r);
- release_id(instance, iid);
- release_id(instance, key);
-
- return ret;
-}
-
-cdap_key_t cdap_request_wait(struct cdap * instance,
- enum cdap_opcode * opcode,
- char ** name,
- uint8_t ** data,
- size_t * len,
- uint32_t * flags)
-{
- struct cdap_rcvd * rcv = NULL;
-
- if (instance == NULL || opcode == NULL || name == NULL || data == NULL
- || len == NULL || flags == NULL)
- return -EINVAL;
-
- pthread_mutex_lock(&instance->rcvd_lock);
-
- pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
- (void *) &instance->rcvd_lock);
-
- while (rcv == NULL) {
- while (list_is_empty(&instance->rcvd))
- pthread_cond_wait(&instance->rcvd_cond,
- &instance->rcvd_lock);
-
- rcv = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
- if (rcv->proc) {
- rcv = NULL;
- pthread_cond_wait(&instance->rcvd_cond,
- &instance->rcvd_lock);
- }
- }
-
- assert(rcv->proc == false);
-
- rcv->proc = true;
- list_del(&rcv->next);
- list_add_tail(&rcv->next, &instance->rcvd);
-
- pthread_cleanup_pop(true);
-
- *opcode = rcv->opcode;
- *name = rcv->name;
- *data = rcv->data;
- *len = rcv->len;
- *flags = rcv->flags;
-
- rcv->name = NULL;
- rcv->data = NULL;
-
- return rcv->key;
-}
-
-int cdap_reply_send(struct cdap * instance,
- cdap_key_t key,
- int result,
- const void * data,
- size_t len)
-{
- int fd;
- cdap_t msg = CDAP__INIT;
- struct cdap_rcvd * rcvd;
-
- if (instance == NULL)
- return -EINVAL;
-
- rcvd = cdap_rcvd_get_by_key(instance, key);
- if (rcvd == NULL)
- return -1;
-
- msg.opcode = CDAP_REPLY;
- msg.invoke_id = rcvd->iid;
- msg.has_result = true;
- msg.result = result;
-
- if (data != NULL) {
- msg.has_value = true;
- msg.value.data = (uint8_t *) data;
- msg.value.len = len;
- }
-
- fd = rcvd->fd;
-
- release_id(instance, rcvd->key);
-
- assert(rcvd->data == NULL);
- assert(rcvd->name == NULL);
- assert(rcvd->proc);
-
- free(rcvd);
-
- return write_msg(fd, &msg);
-}
diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto
deleted file mode 100644
index 29effc9a..00000000
--- a/src/lib/cdap.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016- 2017
- *
- * CDAP message
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-syntax = "proto2";
-
-message cdap {
- required uint32 opcode = 1;
- required uint32 invoke_id = 2;
- optional uint32 flags = 3;
- optional string name = 4;
- optional bytes value = 5;
- optional int32 result = 6;
-}
diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c
deleted file mode 100644
index a9b85525..00000000
--- a/src/lib/cdap_req.c
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * CDAP - CDAP request management
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 200809L
-
-#include "config.h"
-
-#include <ouroboros/time_utils.h>
-#include <ouroboros/errno.h>
-
-#include "cdap_req.h"
-
-#include <stdlib.h>
-#include <assert.h>
-
-struct cdap_req * cdap_req_create(int fd,
- invoke_id_t iid,
- cdap_key_t key)
-{
- struct cdap_req * creq = malloc(sizeof(*creq));
- pthread_condattr_t cattr;
-
- if (creq == NULL)
- return NULL;
-
- creq->fd = fd;
- creq->iid = iid;
- creq->key = key;
- creq->state = REQ_INIT;
- creq->response = -1;
- creq->data.data = NULL;
- creq->data.len = 0;
-
- if (pthread_mutex_init(&creq->lock, NULL)) {
- free(creq);
- return NULL;
- }
-
- pthread_condattr_init(&cattr);
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&creq->cond, &cattr)) {
- pthread_condattr_destroy(&cattr);
- pthread_mutex_destroy(&creq->lock);
- free(creq);
- return NULL;
- }
-
- pthread_condattr_destroy(&cattr);
-
- list_head_init(&creq->next);
-
- clock_gettime(PTHREAD_COND_CLOCK, &creq->birth);
-
- return creq;
-}
-
-void cdap_req_destroy(struct cdap_req * creq)
-{
- assert(creq);
-
- pthread_mutex_lock(&creq->lock);
-
- switch(creq->state) {
- case REQ_DESTROY:
- pthread_mutex_unlock(&creq->lock);
- return;
- case REQ_INIT:
- creq->state = REQ_NULL;
- pthread_cond_broadcast(&creq->cond);
- break;
- case REQ_INIT_PENDING:
- case REQ_PENDING:
- case REQ_RESPONSE:
- creq->state = REQ_DESTROY;
- pthread_cond_broadcast(&creq->cond);
- break;
- default:
- break;
- }
-
- while (creq->state != REQ_NULL)
- pthread_cond_wait(&creq->cond, &creq->lock);
-
- pthread_mutex_unlock(&creq->lock);
-
- pthread_cond_destroy(&creq->cond);
- pthread_mutex_destroy(&creq->lock);
-
- free(creq);
-}
-
-int cdap_req_wait(struct cdap_req * creq)
-{
- struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000),
- (CDAP_REPLY_TIMEOUT % 1000) * MILLION};
- struct timespec abstime;
- int ret = -1;
-
- assert(creq);
-
- ts_add(&creq->birth, &timeout, &abstime);
-
- pthread_mutex_lock(&creq->lock);
-
- if (creq->state != REQ_INIT) {
- pthread_mutex_unlock(&creq->lock);
- return -EINVAL;
- }
-
- creq->state = REQ_PENDING;
- pthread_cond_broadcast(&creq->cond);
-
- while (creq->state == REQ_PENDING && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&creq->cond,
- &creq->lock,
- &abstime);
-
- switch(creq->state) {
- case REQ_DESTROY:
- ret = -1;
- /* FALLTHRU */
- case REQ_PENDING:
- creq->state = REQ_NULL;
- pthread_cond_broadcast(&creq->cond);
- break;
- case REQ_RESPONSE:
- creq->state = REQ_DONE;
- pthread_cond_broadcast(&creq->cond);
- break;
- default:
- assert(false);
- break;
- }
-
- pthread_mutex_unlock(&creq->lock);
-
- return ret;
-}
-
-void cdap_req_respond(struct cdap_req * creq,
- int response,
- buffer_t data)
-{
- assert(creq);
-
- pthread_mutex_lock(&creq->lock);
-
- if (creq->state == REQ_INIT)
- creq->state = REQ_INIT_PENDING;
-
- while (creq->state == REQ_INIT_PENDING)
- pthread_cond_wait(&creq->cond, &creq->lock);
-
- if (creq->state != REQ_PENDING) {
- creq->state = REQ_NULL;
- pthread_cond_broadcast(&creq->cond);
- pthread_mutex_unlock(&creq->lock);
- return;
- }
-
- creq->state = REQ_RESPONSE;
- creq->response = response;
- creq->data = data;
-
- pthread_cond_broadcast(&creq->cond);
-
- while (creq->state == REQ_RESPONSE)
- pthread_cond_wait(&creq->cond, &creq->lock);
-
- creq->state = REQ_NULL;
- pthread_cond_broadcast(&creq->cond);
-
- pthread_mutex_unlock(&creq->lock);
-}
-
-
-void cdap_req_cancel(struct cdap_req * creq)
-{
- assert(creq);
-
- pthread_mutex_lock(&creq->lock);
-
- creq->state = REQ_NULL;
- pthread_cond_broadcast(&creq->cond);
-
- pthread_mutex_unlock(&creq->lock);
-}
diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h
deleted file mode 100644
index 4c9cd15b..00000000
--- a/src/lib/cdap_req.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * CDAP - CDAP request management
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_CDAP_REQ_H
-#define OUROBOROS_CDAP_REQ_H
-
-#include <ouroboros/cdap.h>
-#include <ouroboros/list.h>
-#include <ouroboros/utils.h>
-
-#include <pthread.h>
-
-typedef cdap_key_t invoke_id_t;
-
-enum creq_state {
- REQ_NULL = 0,
- REQ_INIT,
- REQ_INIT_PENDING,
- REQ_PENDING,
- REQ_RESPONSE,
- REQ_DONE,
- REQ_DESTROY
-};
-
-struct cdap_req {
- struct list_head next;
-
- int fd;
- struct timespec birth;
- cdap_key_t key;
- invoke_id_t iid;
-
- int response;
- buffer_t data;
-
- enum creq_state state;
- pthread_cond_t cond;
- pthread_mutex_t lock;
-};
-
-struct cdap_req * cdap_req_create(int fd,
- cdap_key_t key,
- invoke_id_t iid);
-
-void cdap_req_destroy(struct cdap_req * creq);
-
-int cdap_req_wait(struct cdap_req * creq);
-
-void cdap_req_respond(struct cdap_req * creq,
- int response,
- buffer_t data);
-
-void cdap_req_cancel(struct cdap_req * creq);
-
-#endif /* OUROBOROS_CDAP_REQ_H */
diff --git a/src/lib/hashtable.c b/src/lib/hashtable.c
index 75cdee84..2aa248ba 100644
--- a/src/lib/hashtable.c
+++ b/src/lib/hashtable.c
@@ -38,7 +38,8 @@ struct htable {
uint64_t buckets_size;
};
-struct htable * htable_create(uint64_t buckets, bool hash_key)
+struct htable * htable_create(uint64_t buckets,
+ bool hash_key)
{
struct htable * tmp;
unsigned int i;
diff --git a/src/lib/notifier.c b/src/lib/notifier.c
new file mode 100644
index 00000000..cfd383d4
--- /dev/null
+++ b/src/lib/notifier.c
@@ -0,0 +1,128 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Notifier event system using callbacks
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/errno.h>
+#include <ouroboros/notifier.h>
+#include <ouroboros/list.h>
+
+#include <pthread.h>
+#include <stdlib.h>
+
+struct listener {
+ struct list_head next;
+ notifier_fn_t callback;
+};
+
+struct {
+ struct list_head listeners;
+ pthread_mutex_t lock;
+} notifier;
+
+int notifier_init(void)
+{
+ if (pthread_mutex_init(&notifier.lock, NULL))
+ return -1;
+
+ list_head_init(&notifier.listeners);
+
+ return 0;
+}
+
+void notifier_fini(void)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_mutex_lock(&notifier.lock);
+
+ list_for_each_safe(p, h, &notifier.listeners) {
+ struct listener * l = list_entry(p, struct listener, next);
+ list_del(&l->next);
+ free(l);
+ }
+
+ pthread_mutex_unlock(&notifier.lock);
+
+ pthread_mutex_destroy(&notifier.lock);
+}
+
+void notifier_event(int event,
+ const void * o)
+{
+ struct list_head * p;
+
+ pthread_mutex_lock(&notifier.lock);
+
+ list_for_each(p, &notifier.listeners)
+ list_entry(p, struct listener, next)->callback(event, o);
+
+ pthread_mutex_unlock(&notifier.lock);
+}
+
+int notifier_reg(notifier_fn_t callback)
+{
+ struct listener * l;
+ struct list_head * p;
+
+ pthread_mutex_lock(&notifier.lock);
+
+ list_for_each(p, &notifier.listeners) {
+ struct listener * l = list_entry(p, struct listener, next);
+ if (l->callback == callback) {
+ pthread_mutex_unlock(&notifier.lock);
+ return -EPERM;
+ }
+ }
+
+ l = malloc(sizeof(*l));
+ if (l == NULL) {
+ pthread_mutex_unlock(&notifier.lock);
+ return -ENOMEM;
+ }
+
+ l->callback = callback;
+
+ list_add(&l->next, &notifier.listeners);
+
+ pthread_mutex_unlock(&notifier.lock);
+
+ return 0;
+}
+
+void notifier_unreg(notifier_fn_t callback)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_mutex_lock(&notifier.lock);
+
+ list_for_each_safe(p, h, &notifier.listeners) {
+ struct listener * l = list_entry(p, struct listener, next);
+ if (l->callback == callback) {
+ list_del(&l->next);
+ free(l);
+ break;
+ }
+ }
+
+ pthread_mutex_unlock(&notifier.lock);
+}
diff --git a/src/lib/rib.c b/src/lib/rib.c
deleted file mode 100644
index 9e45a302..00000000
--- a/src/lib/rib.c
+++ /dev/null
@@ -1,1431 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Resource Information Base
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 200809L
-
-#include "config.h"
-
-#include <ouroboros/errno.h>
-#include <ouroboros/list.h>
-#include <ouroboros/rib.h>
-#include <ouroboros/rqueue.h>
-#include <ouroboros/bitmap.h>
-#include <ouroboros/crc32.h>
-#include <ouroboros/time_utils.h>
-#include <ouroboros/sha3.h>
-#include <ouroboros/btree.h>
-
-#include "ro.pb-c.h"
-typedef RoMsg ro_msg_t;
-
-#include <pthread.h>
-#include <string.h>
-#include <assert.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#define RIB_PATH_DLR "/"
-#define RIB_BTREE_ORDER 64
-#define GEN_NAME_SIZE 8
-
-struct revent {
- struct list_head next;
-
- char * path;
- int32_t flags;
-};
-
-struct rqueue {
- struct list_head events;
-};
-
-struct ro_set {
- uint32_t sid;
-};
-
-struct rn_ptr {
- struct list_head next;
-
- struct rnode * node;
-};
-
-struct rib_sub {
- struct list_head next;
-
- uint32_t sid;
-
- struct list_head rnodes;
-
- struct list_head events;
-
- pthread_cond_t cond;
- pthread_mutex_t lock;
-};
-
-struct rn_sub {
- struct list_head next;
-
- struct rib_sub * sub;
- int32_t flags;
-};
-
-struct rnode {
- char * path;
- char * name;
-
- uint8_t * data;
- size_t len;
-
- uint8_t sha3[SHA3_256_HASH_LEN];
-
- struct rnode * parent;
-
- size_t chlen;
- struct list_head children;
-
- struct list_head subs;
-};
-
-struct child {
- struct list_head next;
-
- struct rnode * node;
-};
-
-struct rib {
- struct rnode * root;
-
- struct btree * idx;
-
- pthread_rwlock_t lock;
-
- struct bmp * sids;
-
- struct list_head subs;
-
- pthread_rwlock_t s_lock;
-} rib;
-
-static void rnode_hash(struct rnode * node)
-{
- struct sha3_ctx ctx;
- struct list_head * p;
-
- assert(node);
- assert(node->path);
- assert(node->name);
-
- rhash_sha3_256_init(&ctx);
-
- rhash_sha3_update(&ctx, (uint8_t *) node->path, strlen(node->path));
-
- if (node->data != NULL)
- rhash_sha3_update(&ctx, node->data, node->len);
-
- list_for_each(p, &node->children) {
- struct child * c = list_entry(p, struct child, next);
- rhash_sha3_update(&ctx, c->node->sha3, SHA3_256_HASH_LEN);
- }
-
- rhash_sha3_final(&ctx, node->sha3);
-}
-
-static void branch_hash(struct rnode * node)
-{
- assert(node);
-
- do {
- rnode_hash(node);
- node = node->parent;
- } while (node != NULL);
-}
-
-static struct revent * revent_dup(struct revent * ev)
-{
- struct revent * re;
-
- assert(ev);
- assert(ev->path);
-
- re = malloc(sizeof(*re));
- if (re == NULL)
- return NULL;
-
- re->path = strdup(ev->path);
- if (re->path == NULL) {
- free(re);
- return NULL;
- }
-
- re->flags = ev->flags;
-
- list_head_init(&re->next);
-
- return re;
-}
-
-/* defined below but needed here */
-static void rib_sub_del_rnode(struct rib_sub * sub,
- struct rnode * node);
-
-static void rnode_notify_subs(struct rnode * node,
- struct rnode * ch,
- struct revent * ev)
-{
- struct list_head * p;
-
- assert(node);
-
- list_for_each(p, &node->subs) {
- struct rn_sub * s = list_entry(p, struct rn_sub, next);
- if (s->flags & ev->flags) {
- struct revent * e = revent_dup(ev);
- if (e == NULL)
- continue;
-
- pthread_mutex_lock(&s->sub->lock);
- list_add_tail(&e->next, &s->sub->events);
- pthread_cond_signal(&s->sub->cond);
- pthread_mutex_unlock(&s->sub->lock);
- }
-
- if (ev->flags & RO_DELETE)
- rib_sub_del_rnode(s->sub, ch);
- }
-}
-
-static int rnode_throw_event(struct rnode * node,
- int32_t flags)
-{
- struct revent * ev = malloc(sizeof(*ev));
- struct rnode * rn = node;
-
- assert(node);
- assert(node->path);
-
- if (ev == NULL)
- return -ENOMEM;
-
- list_head_init(&ev->next);
-
- ev->path = strdup(node->path);
- if (ev->path == NULL) {
- free(ev);
- return -ENOMEM;
- }
-
- ev->flags = flags;
-
- do {
- rnode_notify_subs(rn, node, ev);
- rn = rn->parent;
- } while (rn != NULL);
-
- free(ev->path);
- free(ev);
-
- return 0;
-}
-
-static int rnode_add_child(struct rnode * node,
- struct rnode * child)
-{
- struct child * c;
- struct list_head * p;
- struct child * n;
-
- assert(node);
- assert(child);
-
- c = malloc(sizeof(*c));
- if (c == NULL)
- return -ENOMEM;
-
- c->node = child;
-
- list_for_each(p, &node->children) {
- n = list_entry(p, struct child, next);
- if (strcmp(n->node->name, child->name) > 0)
- break;
- }
-
- list_add_tail(&c->next, p);
-
- ++node->chlen;
-
- return 0;
-}
-
-static void rnode_remove_child(struct rnode * node,
- struct rnode * child)
-{
- struct list_head * p;
- struct list_head * h;
-
- assert(node);
- assert(child);
-
- list_for_each_safe(p, h, &node->children) {
- struct child * c = list_entry(p, struct child, next);
- if (c->node == child) {
- list_del(&c->next);
- free(c);
- --node->chlen;
- return;
- }
- }
-}
-
-static struct rnode * rnode_create(struct rnode * parent,
- const char * name)
-{
- struct rnode * node;
- char * parent_path;
-
- uint32_t crc = 0;
-
- assert(name);
-
- node = malloc(sizeof(*node));
- if (node == NULL)
- return NULL;
-
- list_head_init(&node->children);
- list_head_init(&node->subs);
-
- if (parent == NULL)
- parent_path = "";
- else
- parent_path = parent->path;
-
- node->path = malloc(strlen(parent_path)
- + strlen(RIB_PATH_DLR)
- + strlen(name)
- + 1);
- if (node->path == NULL) {
- free(node);
- return NULL;
- }
-
- strcpy(node->path, parent_path);
- node->name = node->path + strlen(parent_path);
- if (parent != NULL) {
- strcpy(node->name, RIB_PATH_DLR);
- node->name += strlen(RIB_PATH_DLR);
- }
-
- strcpy(node->name, name);
-
- if (parent != NULL) {
- if (rnode_add_child(parent, node)) {
- free(node->path);
- free(node);
- return NULL;
- }
- }
-
- node->data = NULL;
- node->len = 0;
-
- node->parent = parent;
-
- node->chlen = 0;
-
- crc32(&crc, node->path, strlen(node->path));
- btree_insert(rib.idx, crc, node);
-
- branch_hash(node);
- rnode_throw_event(node, RO_CREATE);
-
- return node;
-}
-
-static void destroy_rnode(struct rnode * node)
-{
- struct list_head * p;
- struct list_head * h;
-
- uint32_t crc = 0;
-
- assert(node);
-
- if (node != rib.root) {
- rnode_remove_child(node->parent, node);
- branch_hash(node->parent);
- }
-
- if (node->parent != NULL)
- rnode_throw_event(node->parent, RO_DELETE);
-
- list_for_each_safe(p, h, &node->subs) {
- struct rn_sub * s = list_entry(p, struct rn_sub, next);
- list_del(&s->next);
- free(s);
- }
-
- crc32(&crc, node->path, strlen(node->path));
- btree_remove(rib.idx, crc);
-
- free(node->path);
- if (node->data != NULL)
- free(node->data);
-
- free(node);
-}
-
-static void destroy_rtree(struct rnode * node)
-{
- struct list_head * p;
- struct list_head * h;
-
- assert(node);
-
- list_for_each_safe(p, h, &node->children) {
- struct child * c = list_entry(p, struct child, next);
- destroy_rtree(c->node);
- }
-
- destroy_rnode(node);
-}
-
-static void rnode_update(struct rnode * node,
- uint8_t * data,
- size_t len)
-{
- assert(node);
- assert(!(data == NULL && len != 0));
- assert(!(data != NULL && len == 0));
-
- if (node->data != NULL)
- free(node->data);
-
- node->data = data;
- node->len = len;
-
- rnode_throw_event(node, RO_MODIFY);
-
- branch_hash(node);
-}
-
-static struct rn_sub * rnode_get_sub(struct rnode * node,
- struct rib_sub * sub)
-{
- struct list_head * p;
-
- list_for_each(p, &node->subs) {
- struct rn_sub * r = list_entry(p, struct rn_sub, next);
- if (r->sub == sub)
- return r;
- }
-
- return NULL;
-}
-
-static int rnode_add_sub(struct rnode * node,
- struct rib_sub * sub,
- int32_t flags)
-{
- struct rn_sub * rs;
-
- assert(node);
- assert(sub);
-
- rs = rnode_get_sub(node, sub);
- if (rs != NULL)
- return -EPERM;
-
- rs = malloc(sizeof(*rs));
- if (rs == NULL)
- return -ENOMEM;
-
- rs->sub = sub;
- rs->flags = flags;
-
- list_add(&rs->next, &node->subs);
-
- return 0;
-}
-
-static int rnode_del_sub(struct rnode * node,
- struct rib_sub * sub)
-{
- struct rn_sub * rs;
-
- assert(node);
- assert(sub);
-
- rs = rnode_get_sub(node, sub);
- if (rs == NULL)
- return 0;
-
- list_del(&rs->next);
- free(rs);
-
- return 0;
-}
-
-static struct rnode * find_rnode_by_path(const char * path)
-{
- uint32_t crc = 0;
-
- if (strcmp(path, RIB_ROOT) == 0)
- return rib.root;
-
- crc32(&crc, path, strlen(path));
-
- return (struct rnode *) btree_search(rib.idx, crc);
-}
-
-int rib_init(void)
-{
- if (rib.root != NULL)
- return -EPERM;
-
- rib.idx = btree_create(RIB_BTREE_ORDER);
- if (rib.idx == NULL) {
- destroy_rtree(rib.root);
- rib.root = NULL;
- return -1;
- }
-
- rib.root = rnode_create(NULL, "");
- if (rib.root == NULL)
- return -ENOMEM;
-
- rib.sids = bmp_create(32, 1);
- if (rib.sids == NULL) {
- btree_destroy(rib.idx);
- destroy_rtree(rib.root);
- rib.root = NULL;
- return -1;
- }
-
- if (pthread_rwlock_init(&rib.lock, NULL)) {
- bmp_destroy(rib.sids);
- btree_destroy(rib.idx);
- destroy_rtree(rib.root);
- rib.root = NULL;
- return -1;
- }
-
- if (pthread_rwlock_init(&rib.s_lock, NULL)) {
- pthread_rwlock_destroy(&rib.lock);
- bmp_destroy(rib.sids);
- btree_destroy(rib.idx);
- destroy_rtree(rib.root);
- rib.root = NULL;
- return -1;
- }
-
- list_head_init(&rib.subs);
-
- assert(rib.root);
-
- return 0;
-}
-
-void rib_fini(void)
-{
- if (rib.root == NULL)
- return;
-
- bmp_destroy(rib.sids);
-
- destroy_rtree(rib.root);
- rib.root = NULL;
-
- btree_destroy(rib.idx);
-
- pthread_rwlock_destroy(&rib.lock);
-}
-
-int rib_add(const char * path,
- const char * name)
-{
- struct rnode * parent;
- struct rnode * node;
-
- if (name == NULL)
- return -EINVAL;
-
- pthread_rwlock_wrlock(&rib.lock);
-
- parent = find_rnode_by_path(path);
- if (parent == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EPERM;
- }
-
- node = rnode_create(parent, name);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -ENOMEM;
- }
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-int rib_del(char * path)
-{
- struct rnode * node;
-
- if (path == NULL)
- return -EINVAL;
-
- pthread_rwlock_wrlock(&rib.lock);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EINVAL;
- }
-
- destroy_rtree(node);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-ssize_t rib_read(const char * path,
- void * data,
- size_t len)
-{
- struct rnode * node;
- ssize_t rlen;
-
- if (path == NULL || data == NULL)
- return -EINVAL;
-
- pthread_rwlock_rdlock(&rib.lock);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EPERM;
- }
-
- if (len < node->len) {
- pthread_rwlock_unlock(&rib.lock);
- return -EFBIG;
- }
-
- if (node->data == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return 0;
- }
-
- assert(node->len > 0);
-
- memcpy(data, node->data, node->len);
- rlen = node->len;
-
- rnode_throw_event(node, RO_READ);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return rlen;
-}
-
-int rib_write(const char * path,
- const void * data,
- size_t len)
-{
- struct rnode * node;
-
- uint8_t * cdata;
-
- if (path == NULL || data == NULL || len == 0)
- return -EINVAL;
-
- cdata = malloc(len);
- if (cdata == NULL)
- return -ENOMEM;
-
- memcpy(cdata, data, len);
-
- pthread_rwlock_rdlock(&rib.lock);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- free(cdata);
- return -1;
- }
-
- rnode_update(node, cdata, len);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-int rib_put(const char * path,
- void * data,
- size_t len)
-{
- struct rnode * node;
-
- if (path == NULL)
- return -EINVAL;
-
- pthread_rwlock_rdlock(&rib.lock);
-
- node = find_rnode_by_path(path);
- if (node != NULL)
- rnode_update(node, (uint8_t *) data, len);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-bool rib_has(const char * path)
-{
- struct rnode * node;
-
- if (path == NULL)
- return -EINVAL;
-
- pthread_rwlock_rdlock(&rib.lock);
-
- node = find_rnode_by_path(path);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return node != NULL;
-}
-
-ssize_t rib_children(const char * path,
- char *** children)
-{
- struct list_head * p;
-
- struct rnode * node;
-
- ssize_t i = 0;
-
- if (path == NULL)
- return -EINVAL;
-
- pthread_rwlock_rdlock(&rib.lock);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EPERM;
- }
-
- if (children == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- assert((ssize_t) node->chlen >= 0);
- return (ssize_t) node->chlen;
- }
-
- if (node->chlen == 0) {
- pthread_rwlock_unlock(&rib.lock);
- *children = NULL;
- return 0;
- }
-
- *children = malloc(sizeof(**children) * node->chlen);
- if (*children == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -ENOMEM;
- }
-
- list_for_each(p, &node->children) {
- struct child * c = list_entry(p, struct child, next);
- (*children)[i] = strdup(c->node->name);
- if ((*children)[i] == NULL) {
- ssize_t j;
- pthread_rwlock_unlock(&rib.lock);
- for (j = 0; j < i; ++j)
- free((*children)[j]);
- free(*children);
- return -ENOMEM;
- }
- ++i;
- }
-
- assert(i > 0);
- assert((size_t) i == node->chlen);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return i;
-}
-
-static struct rib_sub * rib_get_sub(uint32_t sid)
-{
- struct list_head * p;
- struct list_head * h;
-
- list_for_each_safe(p, h, &rib.subs) {
- struct rib_sub * r = list_entry(p, struct rib_sub, next);
- if (r->sid == sid)
- return r;
- }
-
- return NULL;
-}
-
-static struct rib_sub * rib_sub_create(uint32_t sid)
-{
- pthread_condattr_t cattr;
- struct rib_sub * sub = malloc(sizeof(*sub));
- if (sub == NULL)
- return NULL;
-
- if (pthread_condattr_init(&cattr)) {
- free(sub);
- return NULL;
- }
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&sub->cond, &cattr)) {
- free(sub);
- return NULL;
- }
-
- if (pthread_mutex_init(&sub->lock, NULL)) {
- pthread_cond_destroy(&sub->cond);
- free(sub);
- return NULL;
- }
-
- list_head_init(&sub->rnodes);
- list_head_init(&sub->events);
-
- sub->sid = sid;
-
- return sub;
-}
-
-static void rib_sub_zero(struct rib_sub * sub)
-{
- struct list_head * p;
- struct list_head * h;
-
- assert(sub);
-
- list_for_each_safe(p, h, &sub->rnodes) {
- struct rn_ptr * r = list_entry(p, struct rn_ptr, next);
- assert(r->node);
- rnode_del_sub(r->node, sub);
- list_del(&r->next);
- free(r);
- }
-
- list_for_each_safe(p, h, &sub->events) {
- struct revent * r = list_entry(p, struct revent, next);
- list_del(&r->next);
- assert(r->path);
- free(r->path);
- free(r);
- }
-}
-
-static struct rn_ptr * rib_sub_get_rn_ptr(struct rib_sub * sub,
- struct rnode * node)
-{
- struct list_head * p;
-
- list_for_each(p, &sub->rnodes) {
- struct rn_ptr * r = list_entry(p, struct rn_ptr, next);
- assert(r->node);
- if (r->node == node)
- return r;
- }
-
- return NULL;
-}
-
-static int rib_sub_add_rnode(struct rib_sub * sub,
- struct rnode * node)
-{
- struct rn_ptr * rn;
-
- assert(sub);
- assert(node);
-
- if (rib_sub_get_rn_ptr(sub, node) != NULL)
- return 0;
-
- rn = malloc(sizeof(*rn));
- if (rn == NULL)
- return -ENOMEM;
-
- rn->node = node;
-
- list_add(&rn->next, &sub->rnodes);
-
- return 0;
-}
-
-static void rib_sub_del_rnode(struct rib_sub * sub,
- struct rnode * node)
-{
- struct rn_ptr * rn;
-
- assert(sub);
- assert(node);
-
- rn = rib_sub_get_rn_ptr(sub, node);
- if (rn == NULL)
- return;
-
- list_del(&rn->next);
-
- free(rn);
-}
-
-static void rib_sub_destroy(struct rib_sub * sub)
-{
- assert(sub);
-
- rib_sub_zero(sub);
-
- free(sub);
-}
-
-/* Event calls from rqueue.h. */
-ro_set_t * ro_set_create(void)
-{
- ro_set_t * set;
- struct rib_sub * sub;
-
- set = malloc(sizeof(*set));
- if (set == NULL)
- return NULL;
-
- pthread_rwlock_wrlock(&rib.s_lock);
-
- set->sid = bmp_allocate(rib.sids);
- if (!bmp_is_id_valid(rib.sids, set->sid)) {
- pthread_rwlock_unlock(&rib.s_lock);
- free(set);
- return NULL;
- }
-
- pthread_rwlock_unlock(&rib.s_lock);
-
- pthread_rwlock_wrlock(&rib.lock);
-
- sub = rib_sub_create(set->sid);
- if (sub == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- free(set);
- return NULL;
- }
-
- list_add(&sub->next, &rib.subs);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return set;
-}
-
-void ro_set_destroy(ro_set_t * set)
-{
- struct rib_sub * sub = NULL;
-
- struct list_head * p;
- struct list_head * h;
-
- pthread_rwlock_wrlock(&rib.lock);
-
- list_for_each_safe(p, h, &rib.subs) {
- struct rib_sub * r = list_entry(p, struct rib_sub, next);
- if (r->sid == set->sid) {
- sub = r;
- break;
- }
- }
-
- if (sub != NULL)
- rib_sub_destroy(sub);
-
- pthread_rwlock_unlock(&rib.lock);
-
- pthread_rwlock_wrlock(&rib.s_lock);
-
- bmp_release(rib.sids, set->sid);
-
- pthread_rwlock_unlock(&rib.s_lock);
-
- free(set);
-}
-
-rqueue_t * rqueue_create(void)
-{
- rqueue_t * rq = malloc(sizeof(*rq));
- if (rq == NULL)
- return NULL;
-
- list_head_init(&rq->events);
-
- return rq;
-}
-
-int rqueue_destroy(struct rqueue * rq)
-{
- struct list_head * p;
- struct list_head * h;
-
- list_for_each_safe(p, h, &rq->events) {
- struct revent * e = list_entry(p, struct revent, next);
- list_del(&e->next);
- free(e->path);
- free(e);
- }
-
- free(rq);
-
- return 0;
-}
-
-int ro_set_zero(ro_set_t * set)
-{
- struct rib_sub * sub;
-
- if (set == NULL)
- return -EINVAL;
-
- pthread_rwlock_wrlock(&rib.lock);
-
- sub = rib_get_sub(set->sid);
-
- assert(sub);
-
- rib_sub_zero(sub);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-int ro_set_add(ro_set_t * set,
- const char * path,
- int32_t flags)
-{
- struct rib_sub * sub;
- struct rnode * node;
-
- if (set == NULL)
- return -EINVAL;
-
- pthread_rwlock_wrlock(&rib.lock);
-
- sub = rib_get_sub(set->sid);
-
- assert(sub);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -1;
- }
-
- if (rnode_add_sub(node, sub, flags)) {
- pthread_rwlock_unlock(&rib.lock);
- return -ENOMEM;
- }
-
- if (rib_sub_add_rnode(sub, node)) {
- pthread_rwlock_unlock(&rib.lock);
- return -ENOMEM;
- }
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-int ro_set_del(ro_set_t * set,
- const char * path)
-{
- struct rib_sub * sub;
- struct rnode * node;
-
- if (set == NULL)
- return -EINVAL;
-
- pthread_rwlock_wrlock(&rib.lock);
-
- sub = rib_get_sub(set->sid);
-
- assert(sub);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -1;
- }
-
- rnode_del_sub(node, sub);
-
- rib_sub_del_rnode(sub, node);
-
- pthread_rwlock_unlock(&rib.lock);
-
- return 0;
-}
-
-int32_t rqueue_next(rqueue_t * rq,
- char * path)
-{
- struct revent * ev;
- int32_t ret;
-
- if (list_is_empty(&rq->events))
- return -1;
-
- ev = list_first_entry(&rq->events, struct revent, next);
- list_del(&ev->next);
-
- strcpy(path, ev->path);
- ret = ev->flags;
-
- free(ev->path);
- free(ev);
-
- return ret;
-}
-
-int rib_event_wait(ro_set_t * set,
- rqueue_t * rq,
- const struct timespec * timeout)
-{
- struct rib_sub * sub;
- struct timespec abstime;
- struct revent * ev;
-
- ssize_t ret = 0;
-
- if (set == NULL || rq == NULL)
- return -EINVAL;
-
- if (!list_is_empty(&rq->events))
- return 0;
-
- if (timeout != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeout, &abstime);
- }
-
- pthread_rwlock_rdlock(&rib.lock);
-
- sub = rib_get_sub(set->sid);
-
- assert(sub);
-
- pthread_rwlock_unlock(&rib.lock);
-
- pthread_mutex_lock(&sub->lock);
-
- pthread_cleanup_push((void(*)(void *)) pthread_mutex_unlock,
- (void *) &sub->lock);
-
- while (list_is_empty(&sub->events) && ret != -ETIMEDOUT) {
- if (timeout != NULL)
- ret = -pthread_cond_timedwait(&sub->cond,
- &sub->lock,
- &abstime);
- else
- ret = -pthread_cond_wait(&sub->cond, &sub->lock);
- }
-
- pthread_cleanup_pop(true);
-
- pthread_rwlock_wrlock(&rib.lock);
-
- if (ret != -ETIMEDOUT) {
- ev = list_first_entry(&sub->events, struct revent, next);
- list_move(&ev->next, &rq->events);
- }
-
- pthread_rwlock_unlock(&rib.lock);
-
- return ret;
-}
-
-/* Path name management. */
-char * rib_path_append(char * path,
- const char * name)
-{
- char * pos;
-
- if (path == NULL || name == NULL || strstr(name, RIB_PATH_DLR))
- return NULL;
-
- pos = path + strlen(path);
- memcpy(pos++, RIB_PATH_DLR, 1);
- strcpy(pos, name);
-
- return path;
-}
-
-char * rib_name_gen(void * data,
- size_t len)
-{
- uint32_t crc = 0;
- char * name;
-
- if (data == NULL || len == 0)
- return NULL;
-
- name= malloc(GEN_NAME_SIZE + 1);
- if (name == NULL)
- return NULL;
-
- crc32(&crc, data, len);
-
- sprintf(name, "%08x", crc);
-
- return name;
-}
-
-static ro_msg_t * rnode_pack(struct rnode * node,
- uint32_t flags,
- bool root)
-{
- ro_msg_t * msg;
-
- assert(node);
-
- if (node->parent == NULL)
- return NULL;
-
- msg = malloc(sizeof(*msg));
- if (msg == NULL)
- return NULL;
-
- ro_msg__init(msg);
-
- msg->name = node->name;
- if (root) {
- assert(node->parent->path);
- msg->parent = node->parent->path;
- }
-
- if ((root && (flags & PACK_HASH_ROOT)) ||
- (flags & PACK_HASH_ALL)) {
- msg->has_hash = true;
- msg->hash.data = node->sha3;
- msg->hash.len = SHA3_256_HASH_LEN;
- }
-
- if (node->data != NULL) {
- msg->has_data = true;
- msg->data.data = node->data;
- msg->data.len = node->len;
- }
-
- if (node->chlen > 0) {
- int n = 0;
- struct list_head * p;
- ro_msg_t ** msgs = malloc(sizeof(*msgs) * node->chlen);
- if (msgs == NULL) {
- free(msg);
- return NULL;
- }
-
- msg->n_children = node->chlen;
-
- list_for_each(p, &node->children) {
- struct child * c = list_entry(p, struct child, next);
- msgs[n] = rnode_pack(c->node, flags, false);
- if (msgs[n] == NULL) {
- int i;
- for (i = 0; i < n; ++i)
- free(msgs[i]);
- free(msgs);
- free(msg);
- return NULL;
- }
- ++n;
- }
- msg->children = msgs;
- }
-
- return msg;
-}
-
-static void free_ro_msg(ro_msg_t * msg)
-{
- size_t n = 0;
- while (n < msg->n_children)
- free_ro_msg(msg->children[n++]);
-
- free(msg->children);
- free(msg);
-}
-
-ssize_t rib_pack(const char * path,
- uint8_t ** buf,
- uint32_t flags)
-{
- struct rnode * node;
- ro_msg_t * msg;
- ssize_t len;
-
- if (path == NULL)
- return -EINVAL;
-
- pthread_rwlock_rdlock(&rib.lock);
-
- node = find_rnode_by_path(path);
- if (node == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EPERM;
- }
-
- msg = rnode_pack(node, flags, true);
- if (msg == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EPERM;
- }
-
- len = ro_msg__get_packed_size(msg);
- if (len == 0) {
- pthread_rwlock_unlock(&rib.lock);
- free_ro_msg(msg);
- return 0;
- }
-
- *buf = malloc(len);
- if (*buf == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- free_ro_msg(msg);
- return -ENOMEM;
- }
-
- ro_msg__pack(msg, *buf);
-
- pthread_rwlock_unlock(&rib.lock);
-
- free_ro_msg(msg);
-
- return len;
-}
-
-static struct rnode * rnode_get_child(struct rnode * node,
- const char * name)
-{
- struct list_head * p;
-
- list_for_each(p, &node->children) {
- struct child * c = list_entry(p, struct child, next);
- if (strcmp(c->node->name, name) == 0)
- return c->node;
- }
-
- return NULL;
-}
-
-static int rnode_unpack(ro_msg_t * msg,
- struct rnode * parent,
- uint32_t flags)
-{
- struct rnode * node;
-
- size_t i;
-
- assert(msg);
- assert(parent);
-
- node = rnode_get_child(parent, msg->name);
- if (node == NULL) {
- if (flags & UNPACK_CREATE)
- node = rnode_create(parent, msg->name);
- else
- return -EPERM;
- }
-
- if (node == NULL)
- return -ENOMEM;
-
- /* Unpack in reverse order for faster insertion */
- i = msg->n_children;
- while (i > 0)
- rnode_unpack(msg->children[--i], node, flags);
-
- if (msg->has_data) {
- uint8_t * data = malloc(msg->data.len);
- if (data == NULL)
- return -ENOMEM;
-
- memcpy(data, msg->data.data, msg->data.len);
- rnode_update(node, data, msg->data.len);
- }
-
- return 0;
-}
-
-int rib_unpack(uint8_t * packed,
- size_t len,
- uint32_t flags)
-{
- ro_msg_t * msg;
- struct rnode * root;
- int ret;
-
- if (packed == NULL)
- return -EINVAL;
-
- msg = ro_msg__unpack(NULL, len, packed);
- if (msg == NULL)
- return -EPERM;
-
- assert(msg->parent);
-
- pthread_rwlock_wrlock(&rib.lock);
-
- root = find_rnode_by_path(msg->parent);
- if (root == NULL) {
- pthread_rwlock_unlock(&rib.lock);
- return -EPERM;
- }
-
- ret = rnode_unpack(msg, root, flags);
-
- if (ret == 0 && msg->has_hash) {
- root = rnode_get_child(root, msg->name);
- if (memcmp(msg->hash.data, root->sha3, SHA3_256_HASH_LEN)) {
- ro_msg__free_unpacked(msg, NULL);
- pthread_rwlock_unlock(&rib.lock);
- return -EFAULT;
- }
- }
-
- pthread_rwlock_unlock(&rib.lock);
-
- ro_msg__free_unpacked(msg, NULL);
-
- free(packed);
-
- return ret;
-}
diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt
index 0223262a..a93bf321 100644
--- a/src/lib/tests/CMakeLists.txt
+++ b/src/lib/tests/CMakeLists.txt
@@ -14,7 +14,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
crc32_test.c
hashtable_test.c
md5_test.c
- rib_test.c
sha3_test.c
time_utils_test.c
${TIMERWHEEL_TEST}
diff --git a/src/lib/tests/rib_test.c b/src/lib/tests/rib_test.c
deleted file mode 100644
index 6a2446b9..00000000
--- a/src/lib/tests/rib_test.c
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2017
- *
- * Test of the RIB
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 199309L
-
-#include <ouroboros/time_utils.h>
-#include <ouroboros/rib.h>
-#include <ouroboros/rqueue.h>
-#include <ouroboros/errno.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#define RIB_MAX_PATH_LEN 256
-
-int rib_test(int argc,
- char ** argv)
-{
- uint64_t * address;
-
- size_t addr_size = 8;
- size_t addr_chk;
-
- char * addr_name;
-
- ro_set_t * set;
- rqueue_t * rq;
-
- int ret;
-
- char tmp[RIB_MAX_PATH_LEN];
-
- char ** kids;
- ssize_t ch;
-
- uint8_t * buf;
- ssize_t buf_len;
-
- struct timespec t = {0, 100 * MILLION};
-
- (void) argc;
- (void) argv;
-
- address = malloc(sizeof(*address));
- if (address == NULL)
- return -ENOMEM;
-
- if (rib_init()) {
- printf("Failed to initialize rib.\n");
- return -1;
- }
-
- rib_fini();
-
- if (rib_init()) {
- printf("Failed to re-initialize rib.\n");
- return -1;
- }
-
- if (rib_add(RIB_ROOT, "static_info")) {
- printf("Failed to add element to rib.\n");
- rib_fini();
- return -1;
- }
-
- ch = rib_children("/static_info", &kids);
- if (ch != 0) {
- printf("Wrong number of children returned.\n");
- rib_fini();
- while (ch > 0)
- free(kids[--ch]);
- free(kids);
- return -1;
- }
-
- if (!rib_has("/static_info")) {
- printf("Failed to find added element.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_add(RIB_ROOT, "dynamic_info")) {
- printf("Failed to add element to rib.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_add("/static_info", "addr_size")) {
- printf("Failed to add sub-element to rib.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_write("/static_info/addr_size",
- &addr_size, sizeof(addr_size))) {
- printf("Failed to add sub-element to rib.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_add("/static_info", "addresses")) {
- printf("Failed to add sub-element to rib.\n");
- rib_fini();
- return -1;
- }
-
- if (!rib_has("/static_info/addr_size")) {
- printf("Failed to find added subelement.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_read("/static_info/addr_size",
- &addr_chk, sizeof(addr_chk))
- != sizeof(addr_chk)) {
- printf("Failed to read added element.\n");
- rib_fini();
- return -1;
- }
-
- ch = rib_children("/static_info", &kids);
- if (ch != 2) {
- printf("Wrong number of children returned.\n");
- rib_fini();
- return -1;
- }
-
- while (ch > 0)
- free(kids[--ch]);
- free(kids);
-
- if (addr_chk != addr_size) {
- printf("Failed to verify added element contents.\n");
- rib_fini();
- return -1;
- }
-
- addr_size = 16;
-
- if (rib_write("/static_info/addr_size",
- &addr_size, sizeof(addr_size))) {
- printf("Failed to write into added element.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_read("/static_info/addr_size",
- &addr_chk, sizeof(addr_chk))
- != sizeof(addr_chk)) {
- printf("Failed to verify added element update size.\n");
- rib_fini();
- return -1;
- }
-
- if (addr_chk != addr_size) {
- printf("Failed to verify added element update size.\n");
- rib_fini();
- return -1;
- }
-
- addr_name = rib_name_gen(address, sizeof(*address));
- if (addr_name == NULL) {
- printf("Failed to create a name.\n");
- rib_fini();
- return -1;
- }
-
- strcpy(tmp, "/dynamic_info");
-
- if (rib_add(tmp, addr_name)) {
- free(addr_name);
- printf("Failed to add address.\n");
- rib_fini();
- return -1;
- }
-
- rib_path_append(tmp, addr_name);
-
- if (rib_put(tmp, address, sizeof(*address))) {
- free(addr_name);
- printf("Failed to add address.\n");
- rib_fini();
- return -1;
- }
-
- free(addr_name);
-
- buf_len = rib_pack("/static_info", &buf, PACK_HASH_ALL);
- if (buf_len < 0) {
- printf("Failed pack.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_del("/static_info")) {
- printf("Failed to delete.\n");
- rib_fini();
- return -1;
- }
-
- if (rib_unpack(buf, buf_len, UNPACK_CREATE)) {
- printf("Failed to unpack.\n");
- rib_fini();
- return -1;
- }
-
- if (!rib_has("/static_info")) {
- printf("Failed to find unpacked element.\n");
- rib_fini();
- return -1;
- }
-
- ch = rib_children("/static_info", &kids);
- if (ch != 2) {
- printf("Wrong number of children returned.\n");
- rib_fini();
- return -1;
- }
-
- while (ch > 0)
- free(kids[--ch]);
- free(kids);
-
- set = ro_set_create();
- if (set == NULL) {
- printf("Failed to create ro_set.\n");
- rib_fini();
- return -1;
- }
-
- rq = rqueue_create();
- if (rq == NULL) {
- printf("Failed to create rqueue.\n");
- ro_set_destroy(set);
- rib_fini();
- return -1;
- }
-
- if (ro_set_add(set, "/static_info", RO_ALL_OPS)) {
- printf("Failed to add to rqueue.\n");
- ro_set_destroy(set);
- rqueue_destroy(rq);
- rib_fini();
- return -1;
- }
-
- ret = rib_event_wait(set, rq, &t);
- if (ret != -ETIMEDOUT) {
- printf("Wait failed to timeout: %d.\n", ret);
- ro_set_destroy(set);
- rqueue_destroy(rq);
- rib_fini();
- return -1;
- }
-
- if (rib_del("/static_info")) {
- printf("Failed to delete rib subtree.\n");
- rib_fini();
- return -1;
- }
-
- ro_set_destroy(set);
-
- rqueue_destroy(rq);
-
- rib_fini();
-
- return 0;
-}