diff options
author | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
commit | 5812dfb832e513dc455a0d48624bcad62334d457 (patch) | |
tree | 93a02e1b20f54bb869eadc856f201412c633315c /src/lib/dev.c | |
parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
parent | 021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff) | |
download | ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip |
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 344 |
1 files changed, 246 insertions, 98 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..c99e8cdb 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@ #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/sockets.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/instance_name.h> +#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/utils.h> #include <stdlib.h> +#include <string.h> -int ap_reg(char * ap_name, - char ** difs, - size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { + struct shm_ap_rbuff * rb; + uint32_t port_id; + uint32_t oflags; + + /* don't think this needs locking */ +}; + +struct ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name) { - irm_msg_t msg = IRM_MSG__INIT; + _ap_instance = malloc(sizeof(struct ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} +#endif + +int ap_reg(char ** difs, + size_t len) +{ + irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = bmp_allocate(_ap_instance->fds); - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } + if (_ap_instance == NULL) { + LOG_DBG("ap_init was not called"); + return -1; + } + msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + if (recv_msg->result < 0) + fd = -1; + irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int ap_unreg(char * ap_name, - char ** difs, - size_t difs_size) +int ap_unreg(char ** difs, + size_t len) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; int ret = -1; - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -102,38 +219,62 @@ int ap_unreg(char * ap_name, return ret; } -int flow_accept(int fd, - char * ap_name, - char * ae_name) +int flow_accept(int fd, + char ** ap_name, + char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int cli_fd = 0; - - if (ap_name == NULL) { - return -EINVAL; - } + int cfd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - cli_fd = recv_msg->fd; - ap_name = recv_msg->ap_name; - ae_name = recv_msg->ae_name; + + cfd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + *ap_name = strdup(recv_msg->ap_name); + if (*ap_name == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + if (ae_name != NULL) { + *ae_name = strdup(recv_msg->ae_name); + if (*ae_name == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + } + + _ap_instance->flows[cfd].port_id = recv_msg->port_id; + _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; + + irm_msg__free_unpacked(recv_msg, NULL); - return cli_fd; + + bmp_release(_ap_instance->fds, fd); + + return cfd; } int flow_alloc_resp(int fd, @@ -145,9 +286,9 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; msg.has_response = true; msg.response = response; @@ -155,7 +296,7 @@ int flow_alloc_resp(int fd, if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -167,41 +308,49 @@ int flow_alloc_resp(int fd, } int flow_alloc(char * dst_name, - char * src_ap_name, char * src_ae_name, - struct qos_spec * qos, - int oflags) + struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = -1; - if (dst_name == NULL || - src_ap_name == NULL) { + if (dst_name == NULL) return -EINVAL; - } if (src_ae_name == NULL) src_ae_name = UNKNOWN_AE; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = src_ap_name; + msg.ap_name = _ap_instance->api->name; + msg.has_pid = true; + msg.pid = _ap_instance->api->id; msg.ae_name = src_ae_name; - msg.has_oflags = true; - msg.oflags = oflags; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + fd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[fd].rb == NULL) { + bmp_release(_ap_instance->fds, fd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + _ap_instance->flows[fd].port_id = recv_msg->port_id; + _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + irm_msg__free_unpacked(recv_msg, NULL); + return fd; } @@ -211,17 +360,15 @@ int flow_alloc_res(int fd) irm_msg_t * recv_msg = NULL; int result = 0; - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -238,17 +385,15 @@ int flow_dealloc(int fd) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -259,47 +404,50 @@ int flow_dealloc(int fd) return ret; } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; - msg.oflags = oflags; + return -1; +} - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + if (index == -1) return -1; - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; } - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; + return 0; } -ssize_t flow_write(int fd, - void * buf, - size_t count) +ssize_t flow_read(int fd, void * buf, size_t count) { - LOG_MISSING; + struct rb_entry * e = NULL; + int n; + uint8_t * sdu; + /* FIXME: move this to a thread */ + while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) + e = shm_ap_rbuff_read(_ap_instance->rb); + + n = shm_du_map_read_sdu(&sdu, + _ap_instance->dum, + e->index); + if (n < 0) + return -1; - return -1; -} + memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, - void * buf, - size_t count) -{ - LOG_MISSING; + shm_release_du_buff(_ap_instance->dum, e->index); - return -1; + return n; } |