summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-10-08 19:42:51 +0200
committerSander Vrijders <[email protected]>2016-10-11 16:15:33 +0200
commit69ef99bb2dc05337e8189acc42dc9122f4182ead (patch)
tree2808e55f80991cb9b59266c15726c982f3a4ca50 /src/ipcpd/normal/fmgr.c
parentdbf3ab8dfb2cbe8167c464e3cf6a9aa757bfff6a (diff)
downloadouroboros-69ef99bb2dc05337e8189acc42dc9122f4182ead.tar.gz
ouroboros-69ef99bb2dc05337e8189acc42dc9122f4182ead.zip
ipcpd: normal: First version of the fast path bootstrap
This is the first version of the fast path bootstrap in the normal IPCP. It sets up a connection with the other end, and creates the appropriate data structures. N+1 and N-1 SDUs are read and written and passed through the right components.
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c203
1 files changed, 150 insertions, 53 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index b6ec1984..25898661 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -27,6 +27,8 @@
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/select.h>
+#include <ouroboros/errno.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -37,33 +39,41 @@
#include "ribmgr.h"
#include "frct.h"
#include "ipcp.h"
+#include "rmt.h"
+#include "shm_pci.h"
+#include "config.h"
#include "flow_alloc.pb-c.h"
typedef FlowAllocMsg flow_alloc_msg_t;
struct n_flow {
- int fd;
- struct frct_i * frct_i;
+ int fd;
+ cep_id_t cep_id;
enum qos_cube qos;
struct list_head next;
};
struct n_1_flow {
- int fd;
- char * ae_name;
+ int fd;
+ char * ae_name;
struct list_head next;
};
struct {
- pthread_t listen_thread;
+ pthread_t n_1_flow_acceptor;
+ /* FIXME: Make this a table */
struct list_head n_1_flows;
pthread_mutex_t n_1_flows_lock;
+ /* FIXME: Make this a table */
struct list_head n_flows;
/* FIXME: Make this a read/write lock */
pthread_mutex_t n_flows_lock;
+
+ struct flow_set * set;
+ pthread_t n_reader;
} fmgr;
static int add_n_1_fd(int fd, char * ae_name)
@@ -89,9 +99,37 @@ static int add_n_1_fd(int fd, char * ae_name)
return 0;
}
-static void * fmgr_listen(void * o)
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_fd(int fd)
{
- int fd;
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
+ if (e->fd == fd)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_cep_id(cep_id_t cep_id)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
+ if (e->cep_id == cep_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+static void * fmgr_n_1_acceptor(void * o)
+{
+ int fd;
char * ae_name;
while (true) {
@@ -139,7 +177,7 @@ static void * fmgr_listen(void * o)
if (strcmp(ae_name, DT_AE) == 0) {
/* FIXME: Pass correct QoS cube */
- if (frct_dt_flow(fd, 0)) {
+ if (rmt_dt_flow(fd, 0)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
continue;
@@ -156,6 +194,49 @@ static void * fmgr_listen(void * o)
return (void *) 0;
}
+static void * fmgr_n_reader(void * o)
+{
+ struct shm_du_buff * sdb;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
+ struct n_flow * flow;
+
+ while (true) {
+ int fd = flow_select(fmgr.set, &timeout);
+ if (fd == -ETIMEDOUT)
+ continue;
+
+ if (fd < 0) {
+ LOG_ERR("Failed to get active fd.");
+ continue;
+ }
+
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
+
+ pthread_mutex_lock(&fmgr.n_flows_lock);
+ flow = get_n_flow_by_fd(fd);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to retrieve flow.");
+ continue;
+ }
+
+ if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to hand SDU to FRCT.");
+ continue;
+ }
+
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ }
+
+ return (void *) 0;
+}
+
int fmgr_init()
{
INIT_LIST_HEAD(&fmgr.n_1_flows);
@@ -164,7 +245,12 @@ int fmgr_init()
pthread_mutex_init(&fmgr.n_1_flows_lock, NULL);
pthread_mutex_init(&fmgr.n_flows_lock, NULL);
- pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL);
+ fmgr.set = flow_set_create();
+ if (fmgr.set == NULL)
+ return -1;
+
+ pthread_create(&fmgr.n_1_flow_acceptor, NULL, fmgr_n_1_acceptor, NULL);
+ pthread_create(&fmgr.n_reader, NULL, fmgr_n_reader, NULL);
return 0;
}
@@ -173,9 +259,11 @@ int fmgr_fini()
{
struct list_head * pos = NULL;
- pthread_cancel(fmgr.listen_thread);
+ pthread_cancel(fmgr.n_1_flow_acceptor);
+ pthread_cancel(fmgr.n_reader);
- pthread_join(fmgr.listen_thread, NULL);
+ pthread_join(fmgr.n_1_flow_acceptor, NULL);
+ pthread_join(fmgr.n_reader, NULL);
list_for_each(pos, &fmgr.n_1_flows) {
struct n_1_flow * e = list_entry(pos, struct n_1_flow, next);
@@ -188,6 +276,8 @@ int fmgr_fini()
pthread_mutex_destroy(&fmgr.n_1_flows_lock);
pthread_mutex_destroy(&fmgr.n_flows_lock);
+ flow_set_destroy(fmgr.set);
+
return 0;
}
@@ -259,7 +349,7 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
return -1;
}
- if (frct_dt_flow(fd, qos)) {
+ if (rmt_dt_flow(fd, qos)) {
LOG_ERR("Failed to hand file descriptor to FRCT");
flow_dealloc(fd);
free(ae_name);
@@ -276,41 +366,13 @@ int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
return 0;
}
-/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_fd(int fd)
-{
- struct list_head * pos = NULL;
-
- list_for_each(pos, &fmgr.n_flows) {
- struct n_flow * e = list_entry(pos, struct n_flow, next);
- if (e->fd == fd)
- return e;
- }
-
- return NULL;
-}
-
-/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
-{
- struct list_head * pos = NULL;
-
- list_for_each(pos, &fmgr.n_flows) {
- struct n_flow * e = list_entry(pos, struct n_flow, next);
- if (e->frct_i == frct_i)
- return e;
- }
-
- return NULL;
-}
-
int fmgr_flow_alloc(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos)
{
struct n_flow * flow;
- struct frct_i * frct_i;
+ cep_id_t cep_id;
uint32_t address = 0;
buffer_t buf;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
@@ -343,8 +405,8 @@ int fmgr_flow_alloc(int fd,
pthread_mutex_lock(&fmgr.n_flows_lock);
- frct_i = frct_i_create(address, &buf, qos);
- if (frct_i == NULL) {
+ cep_id = frct_i_create(address, &buf, qos);
+ if (cep_id == INVALID_CEP_ID) {
free(buf.data);
free(flow);
pthread_mutex_unlock(&fmgr.n_flows_lock);
@@ -354,7 +416,7 @@ int fmgr_flow_alloc(int fd,
free(buf.data);
flow->fd = fd;
- flow->frct_i = frct_i;
+ flow->cep_id = cep_id;
flow->qos = qos;
INIT_LIST_HEAD(&flow->next);
@@ -374,6 +436,8 @@ static int n_flow_dealloc(int fd)
buffer_t buf;
int ret;
+ flow_set_del(fmgr.set, fd);
+
flow = get_n_flow_by_fd(fd);
if (flow == NULL)
return -1;
@@ -390,7 +454,7 @@ static int n_flow_dealloc(int fd)
flow_alloc_msg__pack(&msg, buf.data);
- ret = frct_i_destroy(flow->frct_i, &buf);
+ ret = frct_i_destroy(flow->cep_id, &buf);
list_del(&flow->next);
free(flow);
@@ -432,13 +496,16 @@ int fmgr_flow_alloc_resp(int fd, int response)
flow_alloc_msg__pack(&msg, buf.data);
if (response < 0) {
- frct_i_destroy(flow->frct_i, &buf);
+ frct_i_destroy(flow->cep_id, &buf);
free(buf.data);
list_del(&flow->next);
free(flow);
- } else if (frct_i_accept(flow->frct_i, &buf)) {
- pthread_mutex_unlock(&fmgr.n_flows_lock);
- return -1;
+ } else {
+ if (frct_i_accept(flow->cep_id, &buf, flow->qos)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ return -1;
+ }
+ flow_set_add(fmgr.set, fd);
}
pthread_mutex_unlock(&fmgr.n_flows_lock);
@@ -457,7 +524,8 @@ int fmgr_flow_dealloc(int fd)
return ret;
}
-int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
+int fmgr_frct_post_buf(cep_id_t cep_id,
+ buffer_t * buf)
{
struct n_flow * flow;
int ret = 0;
@@ -484,7 +552,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
return -1;
}
- flow->frct_i = frct_i;
+ flow->cep_id = cep_id;
flow->qos = msg->qos_cube;
fd = ipcp_flow_req_arr(getpid(),
@@ -505,7 +573,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
list_add(&flow->next, &fmgr.n_flows);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
- flow = get_n_flow_by_frct_i(frct_i);
+ flow = get_n_flow_by_cep_id(cep_id);
if (flow == NULL) {
pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
@@ -517,11 +585,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
if (msg->response < 0) {
list_del(&flow->next);
free(flow);
+ } else {
+ flow_set_add(fmgr.set, flow->fd);
}
break;
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
- flow = get_n_flow_by_frct_i(frct_i);
+ flow = get_n_flow_by_cep_id(cep_id);
if (flow == NULL) {
pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
@@ -529,6 +599,8 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
return -1;
}
+ flow_set_del(fmgr.set, flow->fd);
+
ret = flow_dealloc(flow->fd);
break;
default:
@@ -543,3 +615,28 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
return ret;
}
+
+int fmgr_frct_post_sdu(cep_id_t cep_id,
+ struct shm_du_buff * sdb)
+{
+ struct n_flow * flow;
+
+ pthread_mutex_lock(&fmgr.n_flows_lock);
+
+ flow = get_n_flow_by_cep_id(cep_id);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ LOG_ERR("Failed to find N flow.");
+ return -1;
+ }
+
+ if (ipcp_flow_write(flow->fd, sdb)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ LOG_ERR("Failed to hand SDU to N flow.");
+ return -1;
+ }
+
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+
+ return 0;
+}