summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/ouroboros/cdap.h46
-rw-r--r--include/ouroboros/config.h.in1
-rw-r--r--include/ouroboros/ipcp-dev.h4
-rw-r--r--include/ouroboros/logs.h7
-rw-r--r--include/ouroboros/shared.h6
-rw-r--r--src/ipcpd/local/main.c8
-rw-r--r--src/ipcpd/normal/CMakeLists.txt1
-rw-r--r--src/ipcpd/normal/fmgr.c390
-rw-r--r--src/ipcpd/normal/ribmgr.c501
-rw-r--r--src/ipcpd/shim-eth-llc/main.c8
-rw-r--r--src/ipcpd/shim-udp/main.c8
-rw-r--r--src/irmd/ipcp.c12
-rw-r--r--src/irmd/ipcp.h15
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/cdap.c531
-rw-r--r--src/lib/cdap_req.c (renamed from src/ipcpd/normal/cdap_request.c)56
-rw-r--r--src/lib/cdap_req.h (renamed from src/ipcpd/normal/cdap_request.h)37
-rw-r--r--src/lib/dev.c8
18 files changed, 849 insertions, 791 deletions
diff --git a/include/ouroboros/cdap.h b/include/ouroboros/cdap.h
index 7312fb6f..89d598f9 100644
--- a/include/ouroboros/cdap.h
+++ b/include/ouroboros/cdap.h
@@ -30,6 +30,8 @@
#define F_SYNC 0x0001
+#define INVALID_CDAP_KEY -1
+
enum cdap_opcode {
CDAP_READ = 0,
CDAP_WRITE,
@@ -41,40 +43,36 @@ enum cdap_opcode {
struct cdap;
-/* Callback functions that work on the application's RIB */
-struct cdap_ops {
- int (* cdap_request)(struct cdap * instance,
- int invoke_id,
- enum cdap_opcode opcode,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags);
-
- int (* cdap_reply)(struct cdap * instance,
- int invoke_id,
- int result,
- uint8_t * data,
- size_t len);
-};
+typedef int32_t cdap_key_t;
/* Assumes flow is blocking */
-struct cdap * cdap_create(struct cdap_ops * ops,
- int fd);
+struct cdap * cdap_create(int fd);
+
int cdap_destroy(struct cdap * instance);
-/* Returns a positive invoke-id on success to be used in the callback */
-int cdap_send_request(struct cdap * instance,
+cdap_key_t cdap_request_send(struct cdap * instance,
enum cdap_opcode code,
char * name,
uint8_t * data,
size_t len,
uint32_t flags);
-/* Can only be called following a callback function */
-int cdap_send_reply(struct cdap * instance,
- int invoke_id,
+int cdap_reply_wait(struct cdap * instance,
+ cdap_key_t key,
+ uint8_t ** data,
+ size_t * len);
+
+cdap_key_t cdap_request_wait(struct cdap * instance,
+ enum cdap_opcode * opcode,
+ char ** name,
+ uint8_t ** data,
+ size_t * len,
+ uint32_t * flags);
+
+int cdap_reply_send(struct cdap * instance,
+ cdap_key_t key,
int result,
uint8_t * data,
size_t len);
-#endif
+
+#endif /* OUROBOROS_CDAP_H */
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 2417a5c0..4208c6d2 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -49,6 +49,7 @@
#define IRMD_MAX_FLOWS 4096
#define IRMD_THREADPOOL_SIZE 5
#define IPCPD_THREADPOOL_SIZE 3
+#define IPCPD_MAX_CONNS IRMD_MAX_FLOWS
#define LOG_DIR "/@LOG_DIR@/"
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
#define PFT_SIZE 1 << 12
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index 19a66762..fd1e7478 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -45,7 +45,7 @@ void ipcp_flow_fini(int fd);
void ipcp_flow_del(struct shm_du_buff * sdb);
-int ipcp_flow_get_qoscube(int fd,
- enum qos_cube * cube);
+int ipcp_flow_get_qoscube(int fd,
+ qoscube_t * cube);
#endif /* OUROBOROS_IPCP_DEV_H */
diff --git a/include/ouroboros/logs.h b/include/ouroboros/logs.h
index 56eac068..6a16aca6 100644
--- a/include/ouroboros/logs.h
+++ b/include/ouroboros/logs.h
@@ -24,6 +24,7 @@
#ifndef OUROBOROS_LOGS_H
#define OUROBOROS_LOGS_H
+#include <unistd.h>
#include <stdio.h>
#ifndef OUROBOROS_PREFIX
@@ -55,8 +56,10 @@ extern FILE * logfile;
FMT ANSI_COLOR_RESET "\n", ##ARGS); \
fflush(logfile); \
} else { \
- printf(CLR OUROBOROS_PREFIX "(" LVL "): " \
- FMT ANSI_COLOR_RESET "\n", ##ARGS); \
+ printf(CLR "==%05d== " \
+ OUROBOROS_PREFIX "(" LVL "): " \
+ FMT ANSI_COLOR_RESET "\n", getpid(), \
+ ##ARGS); \
} \
} while (0)
diff --git a/include/ouroboros/shared.h b/include/ouroboros/shared.h
index c38b1bde..7b281cc2 100644
--- a/include/ouroboros/shared.h
+++ b/include/ouroboros/shared.h
@@ -24,10 +24,10 @@
#define OUROBOROS_SHARED_H
/* FIXME: To be decided which QoS cubes we support */
-enum qos_cube {
+typedef enum qos_cube {
QOS_CUBE_BE = 0,
QOS_CUBE_VIDEO,
- QOS_MAX
-};
+ QOS_CUBE_MAX
+} qoscube_t;
#endif /* OUROBOROS_SHARED_H */
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 30d2d2bd..122beafb 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -217,10 +217,10 @@ static int ipcp_local_name_query(char * name)
return ret;
}
-static int ipcp_local_flow_alloc(int fd,
- char * dst_name,
- char * src_ae_name,
- enum qos_cube qos)
+static int ipcp_local_flow_alloc(int fd,
+ char * dst_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
int out_fd = -1;
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 67a7953b..5f85dd89 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -25,7 +25,6 @@ protobuf_generate_c(RO_SRCS RO_HDRS ro.proto)
set(SOURCE_FILES
# Add source files here
addr_auth.c
- cdap_request.c
crc32.c
dir.c
fmgr.c
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 8e416aa4..6684db7c 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t;
#define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */
-struct np1_flow {
- int fd;
- cep_id_t cep_id;
- enum qos_cube qos;
-};
-
-struct nm1_flow {
- int fd;
- enum qos_cube qos;
-};
-
struct {
- pthread_t nm1_flow_acceptor;
- struct nm1_flow ** nm1_flows;
+ flow_set_t * nm1_set[QOS_CUBE_MAX];
+ fqueue_t * nm1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t nm1_flows_lock;
- flow_set_t * nm1_set;
- pthread_t nm1_sdu_reader;
- struct np1_flow ** np1_flows;
- struct np1_flow ** np1_flows_cep;
+ flow_set_t * np1_set[QOS_CUBE_MAX];
+ fqueue_t * np1_fqs[QOS_CUBE_MAX];
pthread_rwlock_t np1_flows_lock;
- flow_set_t * np1_set;
+
+ cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
+ int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
+
+ pthread_t nm1_flow_acceptor;
+ pthread_t nm1_sdu_reader;
pthread_t np1_sdu_reader;
/* FIXME: Replace with PFF */
int fd;
} fmgr;
-static int add_nm1_fd(int fd,
- enum qos_cube qos)
-{
- struct nm1_flow * tmp;
-
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL)
- return -1;
-
- tmp->fd = fd;
- tmp->qos = qos;
-
- pthread_rwlock_wrlock(&fmgr.nm1_flows_lock);
- fmgr.nm1_flows[fd] = tmp;
- pthread_rwlock_unlock(&fmgr.nm1_flows_lock);
-
- flow_set_add(fmgr.nm1_set, fd);
-
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
-
- return 0;
-}
-
-static int add_np1_fd(int fd,
- cep_id_t cep_id,
- enum qos_cube qos)
-{
- struct np1_flow * flow;
-
- flow = malloc(sizeof(*flow));
- if (flow == NULL)
- return -1;
-
- flow->cep_id = cep_id;
- flow->qos = qos;
- flow->fd = fd;
-
- fmgr.np1_flows[fd] = flow;
- fmgr.np1_flows_cep[cep_id] = flow;
-
- return 0;
-}
-
static void * fmgr_nm1_acceptor(void * o)
{
int fd;
char * ae_name;
+ qoscube_t cube;
qosspec_t qs;
(void) o;
@@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o)
if (!(strcmp(ae_name, MGMT_AE) == 0 ||
strcmp(ae_name, DT_AE) == 0)) {
if (flow_alloc_resp(fd, -1))
- LOG_ERR("Failed to reply to flow allocation.");
+ LOG_WARN("Failed to reply to flow allocation.");
flow_dealloc(fd);
continue;
}
if (flow_alloc_resp(fd, 0)) {
- LOG_ERR("Failed to reply to flow allocation.");
+ LOG_WARN("Failed to reply to flow allocation.");
flow_dealloc(fd);
continue;
}
@@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o)
if (strcmp(ae_name, MGMT_AE) == 0) {
if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand fd to RIB.");
+ LOG_WARN("Failed to hand fd to RIB.");
flow_dealloc(fd);
continue;
}
} else {
- /* FIXME: Pass correct QoS cube */
- if (add_nm1_fd(fd, QOS_CUBE_BE)) {
- LOG_ERR("Failed to add fd to list.");
+ ipcp_flow_get_qoscube(fd, &cube);
+ if (flow_set_add(fmgr.nm1_set[cube], fd)) {
+ LOG_WARN("Failed to add fd.");
flow_dealloc(fd);
continue;
}
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
}
free(ae_name);
@@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
- struct np1_flow * flow;
int fd;
- fqueue_t * fq = fqueue_create();
- if (fq == NULL)
- return (void *) 1;
+
+ int i = 0;
+ int ret;
(void) o;
while (true) {
- int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(fmgr.np1_set[i],
+ fmgr.np1_fqs[i],
+ &timeout);
if (ret == -ETIMEDOUT)
continue;
if (ret < 0) {
- LOG_ERR("Event error: %d.", ret);
+ LOG_WARN("Event error: %d.", ret);
continue;
}
- while ((fd = fqueue_next(fq)) >= 0) {
+ while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ LOG_WARN("Failed to read SDU from fd %d.", fd);
continue;
}
pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
-
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
+ LOG_WARN("Failed to hand SDU to FRCT.");
continue;
}
@@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct pci * pci;
int fd;
- fqueue_t * fq = fqueue_create();
- if (fq == NULL)
- return (void *) 1;
+ int i = 0;
+ int ret;
(void) o;
while (true) {
- int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ /* FIXME: replace with scheduling policy call */
+ i = (i + 1) % QOS_CUBE_MAX;
+
+ ret = flow_event_wait(fmgr.nm1_set[i],
+ fmgr.nm1_fqs[i],
+ &timeout);
if (ret == -ETIMEDOUT)
continue;
@@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o)
continue;
}
- while ((fd = fqueue_next(fq)) >= 0) {
+ while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
LOG_ERR("Failed to read SDU from fd %d.", fd);
continue;
@@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o)
return (void *) 0;
}
-int fmgr_init()
+static void fmgr_destroy_flows(void)
{
int i;
- fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS);
- if (fmgr.nm1_flows == NULL)
- return -1;
-
- fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS);
- if (fmgr.np1_flows == NULL) {
- free(fmgr.nm1_flows);
- return -1;
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ flow_set_destroy(fmgr.nm1_set[i]);
+ flow_set_destroy(fmgr.np1_set[i]);
+ fqueue_destroy(fmgr.nm1_fqs[i]);
+ fqueue_destroy(fmgr.np1_fqs[i]);
}
+}
- fmgr.np1_flows_cep =
- malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS);
- if (fmgr.np1_flows_cep == NULL) {
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
- }
+int fmgr_init()
+{
+ int i;
- for (i = 0; i < IRMD_MAX_FLOWS; i++) {
- fmgr.nm1_flows[i] = NULL;
- fmgr.np1_flows[i] = NULL;
- fmgr.np1_flows_cep[i] = NULL;
- }
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID;
+
+ for (i = 0; i < IPCPD_MAX_CONNS; ++i)
+ fmgr.np1_cep_id_to_fd[i] = -1;
pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);
pthread_rwlock_init(&fmgr.np1_flows_lock, NULL);
- fmgr.np1_set = flow_set_create();
- if (fmgr.np1_set == NULL) {
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
- }
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ fmgr.np1_set[i] = flow_set_create();
+ if (fmgr.np1_set == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
- fmgr.nm1_set = flow_set_create();
- if (fmgr.nm1_set == NULL) {
- flow_set_destroy(fmgr.np1_set);
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
- return -1;
+ fmgr.np1_fqs[i] = fqueue_create();
+ if (fmgr.np1_fqs[i] == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ fmgr.nm1_set[i] = flow_set_create();
+ if (fmgr.nm1_set == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
+
+ fmgr.nm1_fqs[i] = fqueue_create();
+ if (fmgr.nm1_fqs[i] == NULL) {
+ fmgr_destroy_flows();
+ return -1;
+ }
}
pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL);
@@ -378,6 +332,7 @@ int fmgr_init()
int fmgr_fini()
{
int i;
+ int j;
pthread_cancel(fmgr.nm1_flow_acceptor);
pthread_cancel(fmgr.np1_sdu_reader);
@@ -387,29 +342,25 @@ int fmgr_fini()
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
- for (i = 0; i < IRMD_MAX_FLOWS; i++) {
- if (fmgr.nm1_flows[i] == NULL)
- continue;
- flow_dealloc(fmgr.nm1_flows[i]->fd);
- free(fmgr.nm1_flows[i]);
- }
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ if (flow_set_has(fmgr.nm1_set[j], i)) {
+ flow_dealloc(i);
+ flow_set_del(fmgr.nm1_set[j], i);
+ }
pthread_rwlock_destroy(&fmgr.nm1_flows_lock);
pthread_rwlock_destroy(&fmgr.np1_flows_lock);
- flow_set_destroy(fmgr.nm1_set);
- flow_set_destroy(fmgr.np1_set);
- free(fmgr.np1_flows_cep);
- free(fmgr.np1_flows);
- free(fmgr.nm1_flows);
+ fmgr_destroy_flows();
return 0;
}
-int fmgr_np1_alloc(int fd,
- char * dst_ap_name,
- char * src_ae_name,
- enum qos_cube qos)
+int fmgr_np1_alloc(int fd,
+ char * dst_ap_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
cep_id_t cep_id;
buffer_t buf;
@@ -478,10 +429,8 @@ int fmgr_np1_alloc(int fd,
free(ro_data);
- if (add_np1_fd(fd, cep_id, qos)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
+ fmgr.np1_fd_to_cep_id[fd] = cep_id;
+ fmgr.np1_cep_id_to_fd[cep_id] = fd;
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
@@ -491,16 +440,13 @@ int fmgr_np1_alloc(int fd,
/* Call under np1_flows lock */
static int np1_flow_dealloc(int fd)
{
- struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
int ret;
+ qoscube_t cube;
- flow_set_del(fmgr.np1_set, fd);
-
- flow = fmgr.np1_flows[fd];
- if (flow == NULL)
- return -1;
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_del(fmgr.np1_set[cube], fd);
msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
@@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd)
buf.data = malloc(buf.len);
if (buf.data == NULL)
- return -1;
+ return -ENOMEM;
flow_alloc_msg__pack(&msg, buf.data);
- ret = frct_i_destroy(flow->cep_id, &buf);
+ ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
- fmgr.np1_flows[fd] = NULL;
- fmgr.np1_flows_cep[flow->cep_id] = NULL;
+ fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID;
+ fmgr.np1_fd_to_cep_id[fd] = -1;
- free(flow);
free(buf.data);
return ret;
@@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd)
int fmgr_np1_alloc_resp(int fd, int response)
{
- struct np1_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
-
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
-
msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
msg.response = response;
msg.has_response = true;
buf.len = flow_alloc_msg__get_packed_size(&msg);
- if (buf.len == 0) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ if (buf.len == 0)
return -1;
- }
buf.data = malloc(buf.len);
- if (buf.data == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- return -1;
- }
+ if (buf.data == NULL)
+ return -ENOMEM;
flow_alloc_msg__pack(&msg, buf.data);
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
if (response < 0) {
- frct_i_destroy(flow->cep_id, &buf);
+ frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);
free(buf.data);
- fmgr.np1_flows[fd] = NULL;
- fmgr.np1_flows_cep[flow->cep_id] = NULL;
- free(flow);
+ fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]]
+ = INVALID_CEP_ID;
+ fmgr.np1_fd_to_cep_id[fd] = -1;
} else {
- if (frct_i_accept(flow->cep_id, &buf, flow->qos)) {
+ qoscube_t cube;
+ ipcp_flow_get_qoscube(fd, &cube);
+ if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return -1;
}
- flow_set_add(fmgr.np1_set, fd);
+ flow_set_add(fmgr.np1_set[cube], fd);
}
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
@@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd)
int ret;
pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
ret = np1_flow_dealloc(fd);
+
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
return ret;
}
-int fmgr_np1_post_buf(cep_id_t cep_id,
- buffer_t * buf)
+int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)
{
- struct np1_flow * flow;
int ret = 0;
int fd;
flow_alloc_msg_t * msg;
-
- pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+ qoscube_t cube;
/* Depending on the message call the function in ipcp-dev.h */
msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
if (msg == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to unpack flow alloc message");
return -1;
}
@@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
msg->dst_name,
msg->src_ae_name);
if (fd < 0) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("Failed to get fd for flow.");
return -1;
}
- if (add_np1_fd(fd, cep_id, msg->qos_cube)) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to add np1 flow.");
- return -1;
- }
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
+
+ fmgr.np1_fd_to_cep_id[fd] = cep_id;
+ fmgr.np1_cep_id_to_fd[cep_id] = fd;
+
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("No such flow in flow manager.");
- return -1;
- }
+ pthread_rwlock_wrlock(&fmgr.np1_flows_lock);
- ret = ipcp_flow_alloc_reply(flow->fd, msg->response);
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ ret = ipcp_flow_alloc_reply(fd, msg->response);
if (msg->response < 0) {
- fmgr.np1_flows[flow->fd] = NULL;
- fmgr.np1_flows_cep[cep_id] = NULL;
- free(flow);
+ fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID;
+ fmgr.np1_cep_id_to_fd[cep_id] = -1;
} else {
- flow_set_add(fmgr.np1_set, flow->fd);
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_add(fmgr.np1_set[cube],
+ fmgr.np1_cep_id_to_fd[cep_id]);
}
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+
break;
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("No such flow in flow manager.");
- return -1;
- }
-
- flow_set_del(fmgr.np1_set, flow->fd);
-
- ret = flow_dealloc(flow->fd);
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ ipcp_flow_get_qoscube(fd, &cube);
+ flow_set_del(fmgr.np1_set[cube], fd);
+ ret = flow_dealloc(fd);
break;
default:
LOG_ERR("Got an unknown flow allocation message.");
@@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t cep_id,
int fmgr_np1_post_sdu(cep_id_t cep_id,
struct shm_du_buff * sdb)
{
- struct np1_flow * flow;
+ int fd;
pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows_cep[cep_id];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- LOG_ERR("Failed to find N flow.");
- return -1;
- }
-
- if (ipcp_flow_write(flow->fd, sdb)) {
+ fd = fmgr.np1_cep_id_to_fd[cep_id];
+ if (ipcp_flow_write(fd, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
LOG_ERR("Failed to hand SDU to N flow.");
return -1;
@@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name)
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
return -1;
}
result = flow_alloc_res(fd);
if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
+ LOG_ERR("Result of flow allocation to %s is %d.",
dst_name, result);
return -1;
}
if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand file descriptor to RIB manager");
+ LOG_ERR("Failed to hand file descriptor to RIB manager.");
flow_dealloc(fd);
return -1;
}
@@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name)
return 0;
}
-int fmgr_nm1_dt_flow(char * dst_name,
- enum qos_cube qos)
+int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos)
{
int fd;
int result;
@@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name,
/* FIXME: Map qos cube on correct QoS. */
fd = flow_alloc(dst_name, DT_AE, NULL);
if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s", dst_name);
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
return -1;
}
result = flow_alloc_res(fd);
if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d",
- dst_name, result);
+ LOG_ERR("Allocate flow to %s result %d.", dst_name, result);
return -1;
}
- if (add_nm1_fd(fd, qos)) {
- LOG_ERR("Failed to add file descriptor to list.");
- flow_dealloc(fd);
- return -1;
- }
+ flow_set_add(fmgr.nm1_set[qos], fd);
+
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
return 0;
}
-int fmgr_nm1_write_sdu(struct pci * pci,
- struct shm_du_buff * sdb)
+int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)
{
if (pci == NULL || sdb == NULL)
return -1;
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index b0738a0c..1e9bcc18 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -42,7 +42,6 @@
#include "dt_const.h"
#include "frct.h"
#include "ipcp.h"
-#include "cdap_request.h"
#include "ro.h"
#include "path.h"
#include "dir.h"
@@ -85,22 +84,28 @@ struct rnode {
};
struct mgmt_flow {
+ struct list_head next;
+
struct cdap * instance;
int fd;
- struct list_head next;
+
+ pthread_t handler;
};
struct ro_sub {
+ struct list_head next;
+
int sid;
+
char * name;
struct ro_sub_ops * ops;
- struct list_head next;
};
struct ro_id {
+ struct list_head next;
+
uint64_t seqno;
char * full_name;
- struct list_head next;
};
struct {
@@ -124,9 +129,6 @@ struct {
struct list_head flows;
pthread_rwlock_t flows_lock;
- struct list_head cdap_reqs;
- pthread_mutex_t cdap_reqs_lock;
-
struct addr_auth * addr_auth;
enum pol_addr_auth addr_auth_type;
} rib;
@@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name,
pthread_rwlock_unlock(&ipcpi.state_lock);
}
-/* We only have a create operation for now */
+/* We only have a create operation for now. */
static struct ro_sub_ops ribmgr_sub_ops = {
.ro_created = ribmgr_ro_created,
.ro_updated = NULL,
@@ -303,9 +305,12 @@ static void ro_delete_timer(void * o)
{
char * name = (char *) o;
- if (ribmgr_ro_delete(name)) {
+ pthread_mutex_lock(&rib.ro_lock);
+
+ if (ribmgr_ro_delete(name))
LOG_ERR("Failed to delete %s.", name);
- }
+
+ pthread_mutex_unlock(&rib.ro_lock);
}
static struct rnode * ribmgr_ro_create(const char * name,
@@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char * name,
node = node->child;
sibling = false;
- /* Search horizontally */
+ /* Search horizontally. */
while (node != NULL) {
if (strcmp(node->name, token) == 0) {
break;
@@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char * name,
LOG_DBG("Created RO with name %s.", name);
- if (!(attr.expiry.tv_sec == 0 &&
- attr.expiry.tv_nsec == 0)) {
+ if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) {
timeout = attr.expiry.tv_sec * 1000 +
attr.expiry.tv_nsec / MILLION;
- if (timerwheel_add(rib.wheel, ro_delete_timer,
- new->full_name, strlen(new->full_name) + 1,
- timeout)) {
+ if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name,
+ strlen(new->full_name) + 1, timeout))
LOG_ERR("Failed to add deletion timer of RO.");
- }
}
return new;
@@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name,
return node;
}
-/* Call while holding cdap_reqs_lock */
-/* FIXME: better not to call blocking functions under any lock */
-int cdap_result_wait(struct cdap * instance,
- enum cdap_opcode code,
- char * name,
- int invoke_id)
-{
- struct cdap_request * req;
- int ret;
- char * name_dup = strdup(name);
- if (name_dup == NULL)
- return -1;
-
- req = cdap_request_create(code, name_dup, invoke_id, instance);
- if (req == NULL) {
- free(name_dup);
- return -1;
- }
-
- list_add(&req->next, &rib.cdap_reqs);
-
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- ret = cdap_request_wait(req);
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- if (ret == -1) /* should only be on ipcp shutdown */
- LOG_DBG("Waiting CDAP request destroyed.");
-
- if (ret == -ETIMEDOUT)
- LOG_ERR("CDAP Request timed out.");
-
- if (ret)
- LOG_DBG("Unknown error code: %d.", ret);
-
- if (!ret)
- ret = req->result;
-
- list_del(&req->next);
- cdap_request_destroy(req);
-
- return ret;
-}
-
static int write_ro_msg(struct cdap * neighbor,
ro_msg_t * msg,
char * name,
@@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap * neighbor,
{
uint8_t * data;
size_t len;
- int iid = 0;
+ cdap_key_t key;
+ int ret;
len = ro_msg__get_packed_size(msg);
if (len == 0)
@@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap * neighbor,
ro_msg__pack(msg, data);
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- iid = cdap_send_request(neighbor, code,
- name, data, len, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ key = cdap_request_send(neighbor, code, name, data, len, 0);
+ if (key < 0) {
+ LOG_ERR("Failed to send CDAP request.");
free(data);
return -1;
}
- if (cdap_result_wait(neighbor, code, name, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- free(data);
- LOG_ERR("Remote did not receive RIB object.");
+ free(data);
+
+ ret = cdap_reply_wait(neighbor, key, NULL, NULL);
+ if (ret < 0) {
+ LOG_ERR("CDAP command with code %d and name %s failed: %d.",
+ code, name, ret);
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
- free(data);
return 0;
}
@@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap * neighbor,
int ribmgr_init()
{
INIT_LIST_HEAD(&rib.flows);
- INIT_LIST_HEAD(&rib.cdap_reqs);
INIT_LIST_HEAD(&rib.subs);
INIT_LIST_HEAD(&rib.ro_ids);
@@ -540,17 +495,9 @@ int ribmgr_init()
return -1;
}
- if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) {
- LOG_ERR("Failed to initialize mutex.");
- pthread_rwlock_destroy(&rib.flows_lock);
- free(rib.root);
- return -1;
- }
-
if (pthread_mutex_init(&rib.ro_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
free(rib.root);
return -1;
}
@@ -558,7 +505,6 @@ int ribmgr_init()
if (pthread_mutex_init(&rib.subs_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
free(rib.root);
return -1;
@@ -567,7 +513,6 @@ int ribmgr_init()
if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
free(rib.root);
@@ -578,7 +523,6 @@ int ribmgr_init()
if (rib.sids == NULL) {
LOG_ERR("Failed to create bitmap.");
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -591,7 +535,6 @@ int ribmgr_init()
LOG_ERR("Failed to create timerwheel.");
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -605,7 +548,6 @@ int ribmgr_init()
timerwheel_destroy(rib.wheel);
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -633,16 +575,6 @@ int ribmgr_fini()
struct list_head * pos = NULL;
struct list_head * n = NULL;
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- list_for_each_safe(pos, n, &rib.cdap_reqs) {
- struct cdap_request * req =
- list_entry(pos, struct cdap_request, next);
- free(req->name);
- list_del(&req->next);
- free(req);
- }
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
pthread_rwlock_wrlock(&rib.flows_lock);
list_for_each_safe(pos, n, &rib.flows) {
struct mgmt_flow * flow =
@@ -668,7 +600,6 @@ int ribmgr_fini()
timerwheel_destroy(rib.wheel);
pthread_mutex_destroy(&rib.subs_lock);
- pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_rwlock_destroy(&rib.flows_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
@@ -676,49 +607,8 @@ int ribmgr_fini()
return 0;
}
-static int ribmgr_cdap_reply(struct cdap * instance,
- int invoke_id,
- int result,
- uint8_t * data,
- size_t len)
-{
- struct list_head * pos, * n = NULL;
-
- /* We never perform reads on other RIBs */
- (void) data;
- (void) len;
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- list_for_each_safe(pos, n, &rib.cdap_reqs) {
- struct cdap_request * req =
- list_entry(pos, struct cdap_request, next);
- if (req->instance == instance &&
- req->invoke_id == invoke_id &&
- req->state == REQ_PENDING) {
- if (result != 0)
- LOG_ERR("CDAP command with code %d and name %s "
- "failed with error %d",
- req->code, req->name, result);
- else
- LOG_DBG("CDAP command with code %d and name %s "
- "executed succesfully",
- req->code, req->name);
-
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- cdap_request_respond(req, result);
-
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- }
- }
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
-
- return 0;
-}
-
static int ribmgr_cdap_create(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name,
ro_msg_t * msg)
{
@@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance,
struct ro_attr attr;
struct rnode * node;
+ assert(instance);
+
ro_attr_init(&attr);
attr.expiry.tv_sec = msg->sec;
attr.expiry.tv_nsec = msg->nsec;
@@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
ro_data = malloc(msg->value.len);
if (ro_data == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
memcpy(ro_data, msg->value.data, msg->value.len);
@@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
node = ribmgr_ro_create(name, attr, ro_data, msg->value.len);
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
free(ro_data);
return -1;
}
@@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to create request.");
return -1;
}
@@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance,
}
static int ribmgr_cdap_delete(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name)
{
struct list_head * p = NULL;
@@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
if (ribmgr_ro_delete(name)) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
@@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ if (cdap_reply_send(instance, key, 0, NULL, 0)) {
LOG_ERR("Failed to send reply to create request.");
return -1;
}
@@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,
}
static int ribmgr_cdap_write(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name,
ro_msg_t * msg,
uint32_t flags)
@@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
ro_data = malloc(msg->value.len);
if (ro_data == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
memcpy(ro_data, msg->value.data, msg->value.len);
@@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
if (node == NULL) {
pthread_mutex_unlock(&rib.ro_lock);
free(ro_data);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
+ cdap_reply_send(instance, key, -1, NULL, 0);
return -1;
}
node->seqno = msg->seqno;
@@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
pthread_mutex_unlock(&rib.subs_lock);
pthread_mutex_unlock(&rib.ro_lock);
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to write request.");
return -1;
}
@@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
return 0;
}
-static int ribmgr_enrol_sync(struct cdap * instance,
- struct rnode * node)
+static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node)
{
int ret = 0;
@@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance,
}
static int ribmgr_cdap_start(struct cdap * instance,
- int invoke_id,
+ cdap_key_t key,
char * name)
{
- int iid = 0;
-
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_OPERATIONAL &&
- strcmp(name, ENROLLMENT) == 0) {
+ if (strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("New enrollment request.");
- if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("IPCP in wrong state.");
+ return -1;
+ }
+
+ if (cdap_reply_send(instance, key, 0, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to enrollment request.");
return -1;
}
- /* Loop through rtree and send correct objects */
- LOG_DBGF("Sending ROs that need to be sent on enrolment...");
+ /* Loop through rtree and send correct objects. */
+ LOG_DBG("Sending ROs that need to be sent on enrolment...");
pthread_mutex_lock(&rib.ro_lock);
if (ribmgr_enrol_sync(instance, rib.root->child)) {
@@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance,
LOG_ERR("Failed to sync part of the RIB.");
return -1;
}
+
pthread_mutex_unlock(&rib.ro_lock);
LOG_DBGF("Sending stop enrollment...");
- pthread_mutex_lock(&rib.cdap_reqs_lock);
-
- iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT,
+ key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,
NULL, 0, 0);
- if (iid < 0) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ if (key < 0) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send stop of enrollment.");
return -1;
}
- if (cdap_result_wait(instance, CDAP_STOP,
- ENROLLMENT, iid)) {
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ if (cdap_reply_wait(instance, key, NULL, NULL)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
} else {
- if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to send reply to start request.");
- return -1;
- }
+ LOG_WARN("Request to start unknown operation.");
+ if (cdap_reply_send(instance, key, -1, NULL, 0))
+ LOG_ERR("Failed to send negative reply.");
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
-static int ribmgr_cdap_stop(struct cdap * instance,
- int invoke_id,
- char * name)
+static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)
{
int ret = 0;
pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_CONFIG &&
- strcmp(name, ENROLLMENT) == 0) {
+ if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("Stop enrollment received.");
ipcp_set_state(IPCP_BOOTING);
} else
ret = -1;
- if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
+ if (cdap_reply_send(instance, key, ret, NULL, 0)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to stop request.");
return -1;
@@ -1028,8 +914,7 @@ static void ro_id_delete(void * o)
pthread_mutex_unlock(&rib.ro_ids_lock);
}
-static int ro_id_create(char * name,
- ro_msg_t * msg)
+static int ro_id_create(char * name, ro_msg_t * msg)
{
struct ro_id * tmp;
@@ -1062,105 +947,113 @@ static int ro_id_create(char * name,
return 0;
}
-static int ribmgr_cdap_request(struct cdap * instance,
- int invoke_id,
- enum cdap_opcode opcode,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags)
+static void * cdap_req_handler(void * o)
{
+ struct cdap * instance = (struct cdap *) o;
+ enum cdap_opcode opcode;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ uint32_t flags;
ro_msg_t * msg;
- int ret = -1;
struct list_head * p = NULL;
- if (opcode == CDAP_START)
- return ribmgr_cdap_start(instance,
- invoke_id,
- name);
- else if (opcode == CDAP_STOP)
- return ribmgr_cdap_stop(instance,
- invoke_id,
- name);
-
- msg = ro_msg__unpack(NULL, len, data);
- if (msg == NULL) {
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- LOG_ERR("Failed to unpack RO message");
- return -1;
- }
+ assert(instance);
- pthread_mutex_lock(&rib.ro_ids_lock);
- list_for_each(p, &rib.ro_ids) {
- struct ro_id * e = list_entry(p, struct ro_id, next);
+ while (true) {
+ cdap_key_t key = cdap_request_wait(instance,
+ &opcode,
+ &name,
+ &data,
+ &len,
+ &flags);
+ assert(key >= 0);
- if (strcmp(e->full_name, name) == 0 &&
- e->seqno == msg->seqno) {
- pthread_mutex_unlock(&rib.ro_ids_lock);
- ro_msg__free_unpacked(msg, NULL);
- cdap_send_reply(instance, invoke_id, 0, NULL, 0);
- LOG_DBG("Already received this RO.");
- return 0;
+ if (opcode == CDAP_START) {
+ if (ribmgr_cdap_start(instance, key, name))
+ LOG_WARN("CDAP start failed.");
+ continue;
+ }
+ else if (opcode == CDAP_STOP) {
+ if (ribmgr_cdap_stop(instance, key, name))
+ LOG_WARN("CDAP stop failed.");
+ continue;
}
- }
- pthread_mutex_unlock(&rib.ro_ids_lock);
-
- if (opcode == CDAP_CREATE) {
- ret = ribmgr_cdap_create(instance,
- invoke_id,
- name,
- msg);
- } else if (opcode == CDAP_WRITE) {
- ret = ribmgr_cdap_write(instance,
- invoke_id,
- name, msg,
- flags);
-
- } else if (opcode == CDAP_DELETE) {
- ret = ribmgr_cdap_delete(instance,
- invoke_id,
- name);
- } else {
- LOG_INFO("Unsupported opcode received.");
- ro_msg__free_unpacked(msg, NULL);
- cdap_send_reply(instance, invoke_id, -1, NULL, 0);
- return -1;
- }
- if (ro_id_create(name, msg)) {
- LOG_ERR("Failed to create RO id.");
- return -1;
- }
+ msg = ro_msg__unpack(NULL, len, data);
+ if (msg == NULL) {
+ cdap_reply_send(instance, key, -1, NULL, 0);
+ LOG_WARN("Failed to unpack RO message");
+ continue;
+ }
- if (msg->recv_set == ALL_MEMBERS) {
- pthread_rwlock_rdlock(&rib.flows_lock);
- list_for_each(p, &rib.flows) {
- struct mgmt_flow * e =
- list_entry(p, struct mgmt_flow, next);
+ pthread_mutex_lock(&rib.ro_ids_lock);
+ list_for_each(p, &rib.ro_ids) {
+ struct ro_id * e = list_entry(p, struct ro_id, next);
- /* Don't send it back */
- if (e->instance == instance)
+ if (strcmp(e->full_name, name) == 0 &&
+ e->seqno == msg->seqno) {
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_reply_send(instance, key, 0, NULL, 0);
+ LOG_DBG("Already received this RO.");
continue;
+ }
+ }
+ pthread_mutex_unlock(&rib.ro_ids_lock);
- if (write_ro_msg(e->instance, msg, name, opcode)) {
- LOG_ERR("Failed to send to a neighbor.");
- pthread_rwlock_unlock(&rib.flows_lock);
+ if (opcode == CDAP_CREATE) {
+ if (ribmgr_cdap_create(instance, key, name, msg)) {
+ LOG_WARN("CDAP create failed.");
ro_msg__free_unpacked(msg, NULL);
- return -1;
+ continue;
}
+ } else if (opcode == CDAP_WRITE) {
+ if (ribmgr_cdap_write(instance, key, name,
+ msg, flags)) {
+ LOG_WARN("CDAP write failed.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
+ } else if (opcode == CDAP_DELETE) {
+ if (ribmgr_cdap_delete(instance, key, name)) {
+ LOG_WARN("CDAP delete failed.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
+ } else {
+ LOG_INFO("Unsupported opcode received.");
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_reply_send(instance, key, -1, NULL, 0);
+ continue;
}
- pthread_rwlock_unlock(&rib.flows_lock);
- }
- ro_msg__free_unpacked(msg, NULL);
+ if (ro_id_create(name, msg)) {
+ LOG_WARN("Failed to create RO id.");
+ ro_msg__free_unpacked(msg, NULL);
+ continue;
+ }
- return ret;
-}
+ if (msg->recv_set == ALL_MEMBERS) {
+ pthread_rwlock_rdlock(&rib.flows_lock);
+ list_for_each(p, &rib.flows) {
+ struct mgmt_flow * e =
+ list_entry(p, struct mgmt_flow, next);
-static struct cdap_ops ribmgr_cdap_ops = {
- .cdap_reply = ribmgr_cdap_reply,
- .cdap_request = ribmgr_cdap_request
-};
+ /* Don't send it back. */
+ if (e->instance == instance)
+ continue;
+
+ if (write_ro_msg(e->instance, msg,
+ name, opcode))
+ LOG_WARN("Failed to send to neighbor.");
+ }
+ pthread_rwlock_unlock(&rib.flows_lock);
+ }
+
+ ro_msg__free_unpacked(msg, NULL);
+ }
+}
int ribmgr_add_flow(int fd)
{
@@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd)
flow = malloc(sizeof(*flow));
if (flow == NULL)
- return -1;
+ return -ENOMEM;
- instance = cdap_create(&ribmgr_cdap_ops, fd);
+ instance = cdap_create(fd);
if (instance == NULL) {
LOG_ERR("Failed to create CDAP instance");
free(flow);
@@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd)
flow->instance = instance;
flow->fd = fd;
+ if (pthread_create(&flow->handler, NULL,
+ cdap_req_handler, instance)) {
+ LOG_ERR("Failed to start handler thread for mgt flow.");
+ free(flow);
+ return -1;
+ }
+
pthread_rwlock_wrlock(&rib.flows_lock);
+
list_add(&flow->next, &rib.flows);
+
pthread_rwlock_unlock(&rib.flows_lock);
return 0;
@@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd)
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
if (flow->fd == fd) {
+ pthread_cancel(flow->handler);
if (cdap_destroy(flow->instance))
LOG_ERR("Failed to destroy CDAP instance.");
list_del(&flow->next);
@@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf)
size_t len = 0;
struct ro_attr attr;
- if (conf == NULL ||
- conf->type != IPCP_NORMAL) {
+ if (conf == NULL || conf->type != IPCP_NORMAL) {
LOG_ERR("Bad DIF configuration.");
- return -1;
+ return -EINVAL;
}
ro_attr_init(&attr);
@@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
len = static_info_msg__get_packed_size(&stat_info);
if (len == 0) {
LOG_ERR("Failed to get size of static information.");
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
data = malloc(len);
if (data == NULL) {
LOG_ERR("Failed to allocate memory.");
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
attr, data, len) == NULL) {
LOG_ERR("Failed to create static info RO.");
free(data);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
if (dir_init()) {
LOG_ERR("Failed to init directory");
ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
LOG_ERR("Failed to initialize FRCT.");
dir_fini();
ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- addr_auth_destroy(rib.addr_auth);
ribmgr_ro_delete(RIBMGR_PREFIX);
return -1;
}
@@ -1296,12 +1193,14 @@ int ribmgr_enrol(void)
{
struct cdap * instance = NULL;
struct mgmt_flow * flow;
- int iid = 0;
+ cdap_key_t key;
+ int ret;
pthread_rwlock_wrlock(&ipcpi.state_lock);
if (ipcp_get_state() != IPCP_INIT) {
pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("IPCP in wrong state.");
return -1;
}
@@ -1312,36 +1211,31 @@ int ribmgr_enrol(void)
ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("No flows in RIB.");
return -1;
}
- flow = list_entry((&rib.flows)->next, struct mgmt_flow, next);
+ flow = list_first_entry((&rib.flows), struct mgmt_flow, next);
instance = flow->instance;
- pthread_mutex_lock(&rib.cdap_reqs_lock);
- iid = cdap_send_request(instance,
- CDAP_START,
- ENROLLMENT,
- NULL, 0, 0);
- if (iid < 0) {
+ key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0);
+ if (key < 0) {
ipcp_set_state(IPCP_INIT);
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start enrollment.");
return -1;
}
- if (cdap_result_wait(instance, CDAP_START,
- ENROLLMENT, iid)) {
+ ret = cdap_reply_wait(instance, key, NULL, NULL);
+ if (ret) {
ipcp_set_state(IPCP_INIT);
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Failed to start enrollment.");
+ LOG_ERR("Failed to enroll: %d.", ret);
return -1;
}
- pthread_mutex_unlock(&rib.cdap_reqs_lock);
+
pthread_rwlock_unlock(&rib.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1351,9 +1245,10 @@ int ribmgr_enrol(void)
int ribmgr_start_policies(void)
{
pthread_rwlock_rdlock(&ipcpi.state_lock);
+
if (ipcp_get_state() != IPCP_BOOTING) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Cannot start policies in wrong state");
+ LOG_ERR("Cannot start policies in wrong state.");
return -1;
}
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void)
}
rib.address = rib.addr_auth->address();
- LOG_DBG("IPCP has address %lu", (unsigned long) rib.address);
+ LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address);
return 0;
}
@@ -1380,22 +1275,21 @@ uint64_t ribmgr_address()
return rib.address;
}
-static int send_neighbors_ro(char * name,
- ro_msg_t * msg,
- enum cdap_opcode code)
+static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code)
{
struct list_head * p = NULL;
pthread_rwlock_rdlock(&rib.flows_lock);
+
list_for_each(p, &rib.flows) {
struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next);
-
if (write_ro_msg(e->instance, msg, name, code)) {
- LOG_ERR("Failed to send to a neighbor.");
pthread_rwlock_unlock(&rib.flows_lock);
+ LOG_ERR("Failed to send to a neighbor.");
return -1;
}
}
+
pthread_rwlock_unlock(&rib.flows_lock);
return 0;
@@ -1499,9 +1393,7 @@ int ro_delete(const char * name)
return 0;
}
-int ro_write(const char * name,
- uint8_t * data,
- size_t len)
+int ro_write(const char * name, uint8_t * data, size_t len)
{
struct rnode * node;
ro_msg_t msg = RO_MSG__INIT;
@@ -1541,8 +1433,7 @@ int ro_write(const char * name,
return 0;
}
-ssize_t ro_read(const char * name,
- uint8_t ** data)
+ssize_t ro_read(const char * name, uint8_t ** data)
{
struct rnode * node;
ssize_t len;
@@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name,
return len;
}
-ssize_t ro_children(const char * name,
- char *** children)
+ssize_t ro_children(const char * name, char *** children)
{
struct rnode * node;
struct rnode * child;
@@ -1640,8 +1530,7 @@ bool ro_exists(const char * name)
return found;
}
-int ro_subscribe(const char * name,
- struct ro_sub_ops * ops)
+int ro_subscribe(const char * name, struct ro_sub_ops * ops)
{
struct ro_sub * sub;
int sid;
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index c7e17ff6..fd25dcd9 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -932,10 +932,10 @@ static int eth_llc_ipcp_name_query(char * name)
return ret;
}
-static int eth_llc_ipcp_flow_alloc(int fd,
- char * dst_name,
- char * src_ae_name,
- enum qos_cube qos)
+static int eth_llc_ipcp_flow_alloc(int fd,
+ char * dst_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
uint8_t ssap = 0;
uint8_t r_addr[MAC_SIZE];
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index fd321780..ea408914 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -945,10 +945,10 @@ static int ipcp_udp_name_query(char * name)
return 0;
}
-static int ipcp_udp_flow_alloc(int fd,
- char * dst_name,
- char * src_ae_name,
- enum qos_cube qos)
+static int ipcp_udp_flow_alloc(int fd,
+ char * dst_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
struct sockaddr_in r_saddr; /* server address */
struct sockaddr_in f_saddr; /* flow */
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 29327df4..aa5e4f8a 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -326,12 +326,12 @@ int ipcp_name_query(pid_t api,
return ret;
}
-int ipcp_flow_alloc(pid_t api,
- int port_id,
- pid_t n_api,
- char * dst_name,
- char * src_ae_name,
- enum qos_cube qos)
+int ipcp_flow_alloc(pid_t api,
+ int port_id,
+ pid_t n_api,
+ char * dst_name,
+ char * src_ae_name,
+ qoscube_t qos)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
index 67b14ece..eb278e5b 100644
--- a/src/irmd/ipcp.h
+++ b/src/irmd/ipcp.h
@@ -42,17 +42,20 @@ int ipcp_bootstrap(pid_t api,
int ipcp_name_reg(pid_t api,
char * name);
+
int ipcp_name_unreg(pid_t api,
char * name);
+
int ipcp_name_query(pid_t api,
char * name);
-int ipcp_flow_alloc(pid_t api,
- int port_id,
- pid_t n_api,
- char * dst_name,
- char * src_ae_name,
- enum qos_cube qos);
+int ipcp_flow_alloc(pid_t api,
+ int port_id,
+ pid_t n_api,
+ char * dst_name,
+ char * src_ae_name,
+ qoscube_t qos);
+
int ipcp_flow_alloc_resp(pid_t api,
int port_id,
pid_t n_api,
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 9eaca9fd..22971806 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -29,6 +29,7 @@ set(SOURCE_FILES
# Add source files here
bitmap.c
cdap.c
+ cdap_req.c
dev.c
hashtable.c
irm.c
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index df79be54..dee8f88c 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -25,39 +25,65 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
+
+#include "cdap_req.h"
#include <stdlib.h>
#include <pthread.h>
+#include <string.h>
+#include <assert.h>
#include "cdap.pb-c.h"
typedef Cdap cdap_t;
typedef Opcode opcode_t;
+typedef int32_t invoke_id_t;
+
+#define INVALID_INVOKE_ID -1
#define IDS_SIZE 256
#define BUF_SIZE 2048
struct cdap {
- int fd;
- struct bmp * ids;
- pthread_mutex_t ids_lock;
- pthread_t reader;
- struct cdap_ops * ops;
+ int fd;
+
+ struct bmp * ids;
+ pthread_mutex_t ids_lock;
+
+ pthread_t reader;
+
+ struct list_head sent;
+ pthread_rwlock_t sent_lock;
+
+ struct list_head rcvd;
+ pthread_cond_t rcvd_cond;
+ pthread_mutex_t rcvd_lock;
};
-struct cdap_info {
- pthread_t thread;
- struct cdap * instance;
- cdap_t * msg;
+struct cdap_rcvd {
+ struct list_head next;
+
+ invoke_id_t iid;
+
+ enum cdap_opcode opcode;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ uint32_t flags;
};
static int next_invoke_id(struct cdap * instance)
{
int ret;
+ assert(instance);
+
pthread_mutex_lock(&instance->ids_lock);
+
ret = bmp_allocate(instance->ids);
if (!bmp_is_id_valid(instance->ids, ret))
- ret = -1; /* INVALID_INVOKE_ID */
+ ret = INVALID_INVOKE_ID;
+
pthread_mutex_unlock(&instance->ids_lock);
return ret;
@@ -67,140 +93,225 @@ static int release_invoke_id(struct cdap * instance, int id)
{
int ret;
+ assert(instance);
+
pthread_mutex_lock(&instance->ids_lock);
+
ret = bmp_release(instance->ids, id);
+
pthread_mutex_unlock(&instance->ids_lock);
return ret;
}
-static void * handle_cdap_msg(void * o)
+#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL)
+
+struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key)
{
- struct cdap_info * info = (struct cdap_info *) o;
- struct cdap * instance = info->instance;
- cdap_t * msg = info->msg;
-
- switch (msg->opcode) {
- case OPCODE__READ:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_READ,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__WRITE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_WRITE,
- msg->name,
- msg->value.data,
- msg->value.len,
- msg->flags);
- break;
- case OPCODE__CREATE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_CREATE,
- msg->name,
- msg->value.data,
- msg->value.len, 0);
- break;
- case OPCODE__DELETE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_DELETE,
- msg->name,
- msg->value.data,
- msg->value.len, 0);
- break;
- case OPCODE__START:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_START,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__STOP:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_STOP,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__REPLY:
- instance->ops->cdap_reply(instance,
- msg->invoke_id,
- msg->result,
- msg->value.data,
- msg->value.len);
- release_invoke_id(instance, msg->invoke_id);
- break;
- default:
- break;
+ struct list_head * p = NULL;
+ struct cdap_req * req = NULL;
+
+ assert(instance);
+ assert(key >= 0);
+
+ pthread_rwlock_rdlock(&instance->sent_lock);
+
+ list_for_each(p, &instance->sent) {
+ req = list_entry(p, struct cdap_req, next);
+ if (req->key == key) {
+ pthread_rwlock_unlock(&instance->sent_lock);
+ return req;
+ }
}
- free(info);
- cdap__free_unpacked(msg, NULL);
+ pthread_rwlock_unlock(&instance->sent_lock);
- return (void *) 0;
+ return NULL;
+}
+
+static int cdap_sent_add(struct cdap * instance, struct cdap_req * req)
+{
+ assert (instance);
+ assert (req);
+
+ if (cdap_sent_has_key(instance, req->key))
+ return -EPERM;
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_add(&req->next, &instance->sent);
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+
+ return 0;
+}
+
+static void cdap_sent_del(struct cdap * instance, struct cdap_req * req)
+{
+ assert(instance);
+ assert(req);
+
+ assert(cdap_sent_has_key(instance, req->key));
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_del(&req->next);
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+}
+
+static void cdap_sent_destroy(struct cdap * instance)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ assert(instance);
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_for_each_safe(p, h, &instance->sent) {
+ struct cdap_req * req = list_entry(p, struct cdap_req, next);
+ list_del(&req->next);
+ cdap_req_destroy(req);
+ }
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+}
+
+static void cdap_rcvd_destroy(struct cdap * instance)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ assert(instance);
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_for_each_safe(p, h, &instance->sent) {
+ struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next);
+ list_del(&r->next);
+ if (r->data != NULL)
+ free(r->data);
+ if (r->name != NULL)
+ free(r->name);
+ free(r);
+ }
+
+ pthread_mutex_unlock(&instance->rcvd_lock);
}
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
+ struct cdap_req * req;
+ struct cdap_rcvd * rcvd;
cdap_t * msg;
uint8_t buf[BUF_SIZE];
ssize_t len;
- struct cdap_info * cdap_info;
+ buffer_t data;
while (true) {
len = flow_read(instance->fd, buf, BUF_SIZE);
if (len < 0)
- return (void *) -1;
+ continue;
msg = cdap__unpack(NULL, len, buf);
if (msg == NULL)
continue;
- cdap_info = malloc(sizeof(*cdap_info));
- if (cdap_info == NULL) {
- cdap__free_unpacked(msg, NULL);
- continue;
+ if (msg->opcode != OPCODE__REPLY) {
+ rcvd = malloc(sizeof(*rcvd));
+ if (rcvd == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
+ }
+
+ switch (msg->opcode) {
+ case OPCODE__START:
+ rcvd->opcode = CDAP_START;
+ break;
+ case OPCODE__STOP:
+ rcvd->opcode = CDAP_STOP;
+ break;
+ case OPCODE__READ:
+ rcvd->opcode = CDAP_READ;
+ break;
+ case OPCODE__WRITE:
+ rcvd->opcode = CDAP_WRITE;
+ break;
+ case OPCODE__CREATE:
+ rcvd->opcode = CDAP_CREATE;
+ break;
+ case OPCODE__DELETE:
+ rcvd->opcode = CDAP_DELETE;
+ break;
+ default:
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+ rcvd->iid = msg->invoke_id;
+ rcvd->flags = msg->flags;
+ rcvd->name = strdup(msg->name);
+ if (rcvd->name == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+
+ if (msg->has_value) {
+ rcvd->len = msg->value.len;
+ rcvd->data = malloc(rcvd->len);
+ if (rcvd->data == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+ memcpy(rcvd->data, msg->value.data, rcvd->len);
+ } else {
+ rcvd->len = 0;
+ rcvd->data = NULL;
+ }
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_add(&rcvd->next, &instance->rcvd);
+
+ pthread_cond_signal(&instance->rcvd_cond);
+ pthread_mutex_unlock(&instance->rcvd_lock);
+ } else {
+ req = cdap_sent_get_by_key(instance, msg->invoke_id);
+ if (req == NULL)
+ continue;
+
+ if (msg->has_value) {
+ data.len = msg->value.len;
+ data.data = malloc(data.len);
+ if (data.data == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
+ }
+ memcpy(data.data, msg->value.data, data.len);
+ } else {
+ data.len = 0;
+ data.data = NULL;
+ }
+
+ cdap_req_respond(req, msg->result, data);
}
- cdap_info->instance = instance;
- cdap_info->msg = msg;
-
- pthread_create(&cdap_info->thread,
- NULL,
- handle_cdap_msg,
- (void *) cdap_info);
-
- pthread_detach(cdap_info->thread);
-
+ cdap__free_unpacked(msg, NULL);
}
return (void *) 0;
}
-struct cdap * cdap_create(struct cdap_ops * ops,
- int fd)
+struct cdap * cdap_create(int fd)
{
struct cdap * instance = NULL;
int flags;
- if (ops == NULL || fd < 0 ||
- ops->cdap_reply == NULL ||
- ops->cdap_request == NULL)
+ if (fd < 0)
return NULL;
flags = flow_get_flags(fd);
@@ -216,19 +327,43 @@ struct cdap * cdap_create(struct cdap_ops * ops,
return NULL;
}
- instance->ops = ops;
- instance->fd = fd;
+ if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_cond_init(&instance->rcvd_cond, NULL)) {
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
instance->ids = bmp_create(IDS_SIZE, 0);
if (instance->ids == NULL) {
+ pthread_cond_destroy(&instance->rcvd_cond);
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
free(instance);
return NULL;
}
- pthread_create(&instance->reader,
- NULL,
- sdu_reader,
- (void *) instance);
+ INIT_LIST_HEAD(&instance->sent);
+ INIT_LIST_HEAD(&instance->rcvd);
+
+ instance->fd = fd;
+
+ pthread_create(&instance->reader, NULL, sdu_reader, instance);
return instance;
}
@@ -247,27 +382,37 @@ int cdap_destroy(struct cdap * instance)
pthread_mutex_unlock(&instance->ids_lock);
- flow_dealloc(instance->fd);
+ pthread_mutex_destroy(&instance->ids_lock);
+
+ cdap_sent_destroy(instance);
+
+ pthread_rwlock_destroy(&instance->sent_lock);
+
+ cdap_rcvd_destroy(instance);
+
+ pthread_mutex_destroy(&instance->rcvd_lock);
free(instance);
return 0;
}
-static int write_msg(struct cdap * instance,
- cdap_t * msg)
+static int write_msg(struct cdap * instance, cdap_t * msg)
{
int ret;
uint8_t * data;
size_t len;
+ assert(instance);
+ assert(msg);
+
len = cdap__get_packed_size(msg);
if (len == 0)
return -1;
- data = malloc(BUF_SIZE);
+ data = malloc(len);
if (data == NULL)
- return -1;
+ return -ENOMEM;
cdap__pack(msg, data);
@@ -278,22 +423,41 @@ static int write_msg(struct cdap * instance,
return ret;
}
-int cdap_send_request(struct cdap * instance,
- enum cdap_opcode code,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags)
+static cdap_key_t invoke_id_to_key(invoke_id_t iid)
+{
+ if (iid == INVALID_INVOKE_ID)
+ return INVALID_CDAP_KEY;
+
+ return (cdap_key_t) iid;
+}
+
+static invoke_id_t key_to_invoke_id(cdap_key_t key)
+{
+ if (key == INVALID_CDAP_KEY)
+ return INVALID_INVOKE_ID;
+
+ return (invoke_id_t) key;
+}
+
+cdap_key_t cdap_request_send(struct cdap * instance,
+ enum cdap_opcode code,
+ char * name,
+ uint8_t * data,
+ size_t len,
+ uint32_t flags)
{
- int id;
cdap_t msg = CDAP__INIT;
+ struct cdap_req * req;
+ invoke_id_t iid;
+ cdap_key_t key;
if (instance == NULL || name == NULL)
- return -1;
+ return -EINVAL;
- id = next_invoke_id(instance);
- if (!bmp_is_id_valid(instance->ids, id))
- return -1;
+
+ iid = next_invoke_id(instance);
+ if (iid == INVALID_INVOKE_ID)
+ return INVALID_CDAP_KEY;
switch (code) {
case CDAP_READ:
@@ -315,39 +479,132 @@ int cdap_send_request(struct cdap * instance,
msg.opcode = OPCODE__STOP;
break;
default:
- release_invoke_id(instance, id);
- return -1;
+ release_invoke_id(instance, iid);
+ return -EINVAL;
}
msg.name = name;
msg.has_flags = true;
msg.flags = flags;
- msg.invoke_id = id;
+ msg.invoke_id = iid;
if (data != NULL) {
msg.has_value = true;
msg.value.data = data;
msg.value.len = len;
}
- if (write_msg(instance, &msg))
- return -1;
+ key = invoke_id_to_key(iid);
+
+ req = cdap_req_create(key);
+ if (req == NULL)
+ return INVALID_CDAP_KEY;
+
+ if (cdap_sent_add(instance, req)) {
+ cdap_req_destroy(req);
+ return INVALID_CDAP_KEY;
+ }
+
+ if (write_msg(instance, &msg)) {
+ cdap_sent_del(instance, req);
+ cdap_req_destroy(req);
+ return INVALID_CDAP_KEY;
+ }
+
+ return key;
+}
+
+int cdap_reply_wait(struct cdap * instance,
+ cdap_key_t key,
+ uint8_t ** data,
+ size_t * len)
+{
+ int ret;
+ struct cdap_req * r;
+ invoke_id_t iid = key_to_invoke_id(key);
+
+ if (instance == NULL || iid == INVALID_INVOKE_ID)
+ return -EINVAL;
+
+ r = cdap_sent_get_by_key(instance, key);
+ if (r == NULL)
+ return -EINVAL;
+
+ ret = cdap_req_wait(r);
+ if (ret < 0)
+ return ret;
+
+ if (r->response)
+ return r->response;
+
+ assert(ret == 0);
+
+ if (data != NULL) {
+ *data = r->data.data;
+ *len = r->data.len;
+ }
+
+ cdap_sent_del(instance, r);
- return id;
+ release_invoke_id(instance, iid);
+
+ return 0;
}
-int cdap_send_reply(struct cdap * instance,
- int invoke_id,
+cdap_key_t cdap_request_wait(struct cdap * instance,
+ enum cdap_opcode * opcode,
+ char ** name,
+ uint8_t ** data,
+ size_t * len,
+ uint32_t * flags)
+{
+ struct cdap_rcvd * rcvd;
+ invoke_id_t iid;
+
+ if (instance == NULL || opcode == NULL || name == NULL || data == NULL
+ || len == NULL || flags == NULL)
+ return -EINVAL;
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) &instance->rcvd_lock);
+
+ while (list_empty(&instance->rcvd))
+ pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock);
+
+ rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
+
+ list_del(&rcvd->next);
+
+ pthread_cleanup_pop(true);
+
+ *opcode = rcvd->opcode;
+ *name = rcvd->name;
+ *data = rcvd->data;
+ *len = rcvd->len;
+ *flags = rcvd->flags;
+
+ iid = rcvd->iid;
+
+ free(rcvd);
+
+ return invoke_id_to_key(iid);
+}
+
+int cdap_reply_send(struct cdap * instance,
+ cdap_key_t key,
int result,
uint8_t * data,
size_t len)
{
cdap_t msg = CDAP__INIT;
+ invoke_id_t iid = key_to_invoke_id(key);
if (instance == NULL)
- return -1;
+ return -EINVAL;
msg.opcode = OPCODE__REPLY;
- msg.invoke_id = invoke_id;
+ msg.invoke_id = iid;
msg.has_result = true;
msg.result = result;
diff --git a/src/ipcpd/normal/cdap_request.c b/src/lib/cdap_req.c
index 8409b508..02fa0846 100644
--- a/src/ipcpd/normal/cdap_request.c
+++ b/src/lib/cdap_req.c
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016
*
- * Normal IPCP - RIB Manager - CDAP request
+ * CDAP - CDAP request management
*
* Sander Vrijders <[email protected]>
* Dimitri Staessens <[email protected]>
@@ -25,27 +25,25 @@
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
-#include "cdap_request.h"
+#include "cdap_req.h"
#include <stdlib.h>
+#include <assert.h>
-struct cdap_request * cdap_request_create(enum cdap_opcode code,
- char * name,
- int invoke_id,
- struct cdap * instance)
+struct cdap_req * cdap_req_create(cdap_key_t key)
{
- struct cdap_request * creq = malloc(sizeof(*creq));
+ struct cdap_req * creq = malloc(sizeof(*creq));
pthread_condattr_t cattr;
if (creq == NULL)
return NULL;
- creq->code = code;
- creq->name = name;
- creq->invoke_id = invoke_id;
- creq->instance = instance;
- creq->state = REQ_INIT;
- creq->result = -1;
+ creq->key = key;
+ creq->state = REQ_INIT;
+
+ creq->response = -1;
+ creq->data.data = NULL;
+ creq->data.len = 0;
pthread_condattr_init(&cattr);
#ifndef __APPLE__
@@ -56,13 +54,14 @@ struct cdap_request * cdap_request_create(enum cdap_opcode code,
INIT_LIST_HEAD(&creq->next);
+ clock_gettime(PTHREAD_COND_CLOCK, &creq->birth);
+
return creq;
}
-void cdap_request_destroy(struct cdap_request * creq)
+void cdap_req_destroy(struct cdap_req * creq)
{
- if (creq == NULL)
- return;
+ assert(creq);
pthread_mutex_lock(&creq->lock);
@@ -87,24 +86,19 @@ void cdap_request_destroy(struct cdap_request * creq)
pthread_cond_destroy(&creq->cond);
pthread_mutex_destroy(&creq->lock);
- if (creq->name != NULL)
- free(creq->name);
-
free(creq);
}
-int cdap_request_wait(struct cdap_request * creq)
+int cdap_req_wait(struct cdap_req * creq)
{
struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000),
(CDAP_REPLY_TIMEOUT % 1000) * MILLION};
struct timespec abstime;
int ret = -1;
- if (creq == NULL)
- return -EINVAL;
+ assert(creq);
- clock_gettime(CLOCK_REALTIME, &abstime);
- ts_add(&abstime, &timeout, &abstime);
+ ts_add(&creq->birth, &timeout, &abstime);
pthread_mutex_lock(&creq->lock);
@@ -118,9 +112,8 @@ int cdap_request_wait(struct cdap_request * creq)
while (creq->state == REQ_PENDING) {
if ((ret = -pthread_cond_timedwait(&creq->cond,
&creq->lock,
- &abstime)) == -ETIMEDOUT) {
+ &abstime)) == -ETIMEDOUT)
break;
- }
}
if (creq->state == REQ_DESTROY)
@@ -134,10 +127,9 @@ int cdap_request_wait(struct cdap_request * creq)
return ret;
}
-void cdap_request_respond(struct cdap_request * creq, int response)
+void cdap_req_respond(struct cdap_req * creq, int response, buffer_t data)
{
- if (creq == NULL)
- return;
+ assert(creq);
pthread_mutex_lock(&creq->lock);
@@ -146,8 +138,10 @@ void cdap_request_respond(struct cdap_request * creq, int response)
return;
}
- creq->state = REQ_RESPONSE;
- creq->result = response;
+ creq->state = REQ_RESPONSE;
+ creq->response = response;
+ creq->data = data;
+
pthread_cond_broadcast(&creq->cond);
while (creq->state == REQ_RESPONSE)
diff --git a/src/ipcpd/normal/cdap_request.h b/src/lib/cdap_req.h
index 9cccfda5..714744ab 100644
--- a/src/ipcpd/normal/cdap_request.h
+++ b/src/lib/cdap_req.h
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016
*
- * Normal IPCP - RIB Manager - CDAP request
+ * CDAP - CDAP request management
*
* Sander Vrijders <[email protected]>
* Dimitri Staessens <[email protected]>
@@ -21,12 +21,13 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H
-#define OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H
+#ifndef OUROBOROS_CDAP_REQ_H
+#define OUROBOROS_CDAP_REQ_H
#include <ouroboros/config.h>
#include <ouroboros/cdap.h>
#include <ouroboros/list.h>
+#include <ouroboros/utils.h>
#include <pthread.h>
@@ -38,31 +39,31 @@ enum creq_state {
REQ_DESTROY
};
-struct cdap_request {
+struct cdap_req {
struct list_head next;
- enum cdap_opcode code;
- char * name;
- int invoke_id;
- struct cdap * instance;
+ struct timespec birth;
- int result;
+ cdap_key_t key;
+
+ int response;
+ buffer_t data;
enum creq_state state;
pthread_cond_t cond;
pthread_mutex_t lock;
};
-struct cdap_request * cdap_request_create(enum cdap_opcode code,
- char * name,
- int invoke_id,
- struct cdap * instance);
+struct cdap_req * cdap_req_create(cdap_key_t key);
+
+void cdap_req_destroy(struct cdap_req * creq);
-void cdap_request_destroy(struct cdap_request * creq);
+int cdap_req_wait(struct cdap_req * creq);
-int cdap_request_wait(struct cdap_request * creq);
+void cdap_req_respond(struct cdap_req * creq,
+ int response,
+ buffer_t data);
-void cdap_request_respond(struct cdap_request * creq,
- int response);
+enum creq_state cdap_req_get_state(struct cdap_req * creq);
-#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H */
+#endif /* OUROBOROS_CDAP_REQ_H */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index bad56129..20976375 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -134,7 +134,7 @@ struct flow {
struct shm_flow_set * set;
int port_id;
int oflags;
- enum qos_cube qos;
+ qoscube_t qos;
pid_t api;
@@ -654,9 +654,8 @@ int flow_dealloc(int fd)
pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
- if (recv_msg == NULL) {
+ if (recv_msg == NULL)
return -1;
- }
if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
@@ -1435,11 +1434,10 @@ int ipcp_flow_fini(int fd)
pthread_rwlock_unlock(&ai.data_lock);
shm_rbuff_fini(rb);
-
return 0;
}
-int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube)
+int ipcp_flow_get_qoscube(int fd, qoscube_t * cube)
{
if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
return -EINVAL;