summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-05-08 16:34:19 +0200
committerSander Vrijders <[email protected]>2016-05-08 16:34:19 +0200
commit5812dfb832e513dc455a0d48624bcad62334d457 (patch)
tree93a02e1b20f54bb869eadc856f201412c633315c /src/lib/dev.c
parentde8f2015cbd015b1cced366cb12c054be62c23b1 (diff)
parent021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff)
downloadouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz
ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c344
1 files changed, 246 insertions, 98 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 6d8411c5..c99e8cdb 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -25,73 +25,190 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/instance_name.h>
+#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/utils.h>
#include <stdlib.h>
+#include <string.h>
-int ap_reg(char * ap_name,
- char ** difs,
- size_t difs_size)
+#define AP_MAX_FLOWS 256
+
+#ifndef DU_BUFF_HEADSPACE
+ #define DU_BUFF_HEADSPACE 128
+#endif
+
+#ifndef DU_BUFF_TAILSPACE
+ #define DU_BUFF_TAILSPACE 0
+#endif
+
+struct flow {
+ struct shm_ap_rbuff * rb;
+ uint32_t port_id;
+ uint32_t oflags;
+
+ /* don't think this needs locking */
+};
+
+struct ap_data {
+ instance_name_t * api;
+ struct shm_du_map * dum;
+ struct bmp * fds;
+
+ struct shm_ap_rbuff * rb;
+ struct flow flows[AP_MAX_FLOWS];
+} * _ap_instance;
+
+
+int ap_init(char * ap_name)
{
- irm_msg_t msg = IRM_MSG__INIT;
+ _ap_instance = malloc(sizeof(struct ap_data));
+ if (_ap_instance == NULL) {
+ return -1;
+ }
+
+ _ap_instance->api = instance_name_create();
+ if (_ap_instance->api == NULL) {
+ free(_ap_instance);
+ return -1;
+ }
+
+ if (instance_name_init_from(_ap_instance->api,
+ ap_name,
+ getpid()) == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (_ap_instance->fds == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->dum = shm_du_map_open();
+ if (_ap_instance->dum == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->rb = shm_ap_rbuff_create();
+ if (_ap_instance->rb == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ return 0;
+}
+
+void ap_fini()
+{
+ int i = 0;
+
+ if (_ap_instance == NULL)
+ return;
+ if (_ap_instance->api != NULL)
+ instance_name_destroy(_ap_instance->api);
+ if (_ap_instance->fds != NULL)
+ bmp_destroy(_ap_instance->fds);
+ if (_ap_instance->dum != NULL)
+ shm_du_map_close(_ap_instance->dum);
+ if (_ap_instance->rb != NULL)
+ shm_ap_rbuff_destroy(_ap_instance->rb);
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+
+ free(_ap_instance);
+}
+
+#if 0
+static int port_id_to_fd(uint32_t port_id)
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].port_id == port_id
+ && _ap_instance->flows[i].state != FLOW_NULL)
+ return i;
+ return -1;
+}
+#endif
+
+int ap_reg(char ** difs,
+ size_t len)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = bmp_allocate(_ap_instance->fds);
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
+ if (_ap_instance == NULL) {
+ LOG_DBG("ap_init was not called");
+ return -1;
+ }
+
msg.code = IRM_MSG_CODE__IRM_AP_REG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ if (recv_msg->result < 0)
+ fd = -1;
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
}
-int ap_unreg(char * ap_name,
- char ** difs,
- size_t difs_size)
+int ap_unreg(char ** difs,
+ size_t len)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
int ret = -1;
- if (ap_name == NULL ||
- difs == NULL ||
- difs_size == 0 ||
+ if (difs == NULL ||
+ len == 0 ||
difs[0] == NULL) {
return -EINVAL;
}
msg.code = IRM_MSG_CODE__IRM_AP_UNREG;
msg.has_pid = true;
- msg.pid = getpid();
- msg.ap_name = ap_name;
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
- msg.n_dif_name = difs_size;
+ msg.n_dif_name = len;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -102,38 +219,62 @@ int ap_unreg(char * ap_name,
return ret;
}
-int flow_accept(int fd,
- char * ap_name,
- char * ae_name)
+int flow_accept(int fd,
+ char ** ap_name,
+ char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int cli_fd = 0;
-
- if (ap_name == NULL) {
- return -EINVAL;
- }
+ int cfd = -1;
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- cli_fd = recv_msg->fd;
- ap_name = recv_msg->ap_name;
- ae_name = recv_msg->ae_name;
+
+ cfd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[cfd].rb == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ *ap_name = strdup(recv_msg->ap_name);
+ if (*ap_name == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ if (ae_name != NULL) {
+ *ae_name = strdup(recv_msg->ae_name);
+ if (*ae_name == NULL) {
+ bmp_release(_ap_instance->fds, cfd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+ }
+
+ _ap_instance->flows[cfd].port_id = recv_msg->port_id;
+ _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
+
+
irm_msg__free_unpacked(recv_msg, NULL);
- return cli_fd;
+
+ bmp_release(_ap_instance->fds, fd);
+
+ return cfd;
}
int flow_alloc_resp(int fd,
@@ -145,9 +286,9 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.pid = _ap_instance->api->id;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
msg.has_response = true;
msg.response = response;
@@ -155,7 +296,7 @@ int flow_alloc_resp(int fd,
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -167,41 +308,49 @@ int flow_alloc_resp(int fd,
}
int flow_alloc(char * dst_name,
- char * src_ap_name,
char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+ struct qos_spec * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = 0;
+ int fd = -1;
- if (dst_name == NULL ||
- src_ap_name == NULL) {
+ if (dst_name == NULL)
return -EINVAL;
- }
if (src_ae_name == NULL)
src_ae_name = UNKNOWN_AE;
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = dst_name;
- msg.ap_name = src_ap_name;
+ msg.ap_name = _ap_instance->api->name;
+ msg.has_pid = true;
+ msg.pid = _ap_instance->api->id;
msg.ae_name = src_ae_name;
- msg.has_oflags = true;
- msg.oflags = oflags;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_fd == false) {
+ if (!recv_msg->has_pid || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- fd = recv_msg->fd;
+ fd = bmp_allocate(_ap_instance->fds);
+
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ bmp_release(_ap_instance->fds, fd);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ _ap_instance->flows[fd].port_id = recv_msg->port_id;
+ _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+
irm_msg__free_unpacked(recv_msg, NULL);
+
return fd;
}
@@ -211,17 +360,15 @@ int flow_alloc_res(int fd)
irm_msg_t * recv_msg = NULL;
int result = 0;
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -238,17 +385,15 @@ int flow_dealloc(int fd)
irm_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg.has_port_id = true;
+ msg.port_id = _ap_instance->flows[fd].port_id;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -259,47 +404,50 @@ int flow_dealloc(int fd)
return ret;
}
-int flow_cntl(int fd, int oflags)
+int flow_cntl(int fd, int cmd, int oflags)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_fd = true;
- msg.fd = fd;
- msg.oflags = oflags;
+ return -1;
+}
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ssize_t flow_write(int fd, void * buf, size_t count)
+{
+ /* the AP chooses the amount of headspace and tailspace */
+ size_t index = shm_create_du_buff(_ap_instance->dum,
+ count + DU_BUFF_HEADSPACE +
+ DU_BUFF_TAILSPACE,
+ DU_BUFF_HEADSPACE,
+ (uint8_t *) buf,
+ count);
+ struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
+ if (index == -1)
return -1;
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ shm_release_du_buff(_ap_instance->dum, index);
+ return -EPIPE;
}
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
+ return 0;
}
-ssize_t flow_write(int fd,
- void * buf,
- size_t count)
+ssize_t flow_read(int fd, void * buf, size_t count)
{
- LOG_MISSING;
+ struct rb_entry * e = NULL;
+ int n;
+ uint8_t * sdu;
+ /* FIXME: move this to a thread */
+ while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id)
+ e = shm_ap_rbuff_read(_ap_instance->rb);
+
+ n = shm_du_map_read_sdu(&sdu,
+ _ap_instance->dum,
+ e->index);
+ if (n < 0)
+ return -1;
- return -1;
-}
+ memcpy(buf, sdu, MIN(n, count));
-ssize_t flow_read(int fd,
- void * buf,
- size_t count)
-{
- LOG_MISSING;
+ shm_release_du_buff(_ap_instance->dum, e->index);
- return -1;
+ return n;
}