diff options
author | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
commit | 5812dfb832e513dc455a0d48624bcad62334d457 (patch) | |
tree | 93a02e1b20f54bb869eadc856f201412c633315c /src/lib | |
parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
parent | 021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff) | |
download | ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip |
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/lib/bitmap.c | 19 | ||||
-rw-r--r-- | src/lib/dev.c | 344 | ||||
-rw-r--r-- | src/lib/ipcp.c | 65 | ||||
-rw-r--r-- | src/lib/ipcpd_messages.proto | 6 | ||||
-rw-r--r-- | src/lib/irmd_messages.proto | 26 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 268 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 143 | ||||
-rw-r--r-- | src/lib/tests/shm_du_map_test.c | 53 |
9 files changed, 708 insertions, 217 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 4922e07c..53a7b354 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES ipcp.c irm.c list.c + shm_ap_rbuff.c shm_du_map.c sockets.c utils.c diff --git a/src/lib/bitmap.c b/src/lib/bitmap.c index 8aabb4f4..e84145b2 100644 --- a/src/lib/bitmap.c +++ b/src/lib/bitmap.c @@ -108,12 +108,14 @@ struct bmp * bmp_create(size_t bits, ssize_t offset) return NULL; tmp = malloc(sizeof(*tmp)); - if (!tmp) + if (tmp == NULL) return NULL; - tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(*(tmp->bitmap))); - if (!tmp->bitmap) + tmp->bitmap = malloc(BITS_TO_LONGS(bits) * sizeof(unsigned long)); + if (tmp->bitmap == NULL) { + free(tmp); return NULL; + } tmp->size = bits; tmp->offset = offset; @@ -140,7 +142,8 @@ int bmp_destroy(struct bmp * b) static ssize_t bad_id(struct bmp * b) { - assert(b); + if (b == NULL) + return -1; return b->offset - 1; } @@ -149,8 +152,8 @@ ssize_t bmp_allocate(struct bmp * b) { ssize_t id; - if (!b) - return bad_id(b); + if (b == NULL) + return -1; id = (ssize_t) find_next_zero_bit(b->bitmap, b->size); @@ -177,7 +180,7 @@ static bool is_id_valid(struct bmp * b, bool bmp_is_id_valid(struct bmp * b, ssize_t id) { - if (!b) + if (b == NULL) return false; return is_id_valid(b, id); @@ -188,7 +191,7 @@ int bmp_release(struct bmp * b, { ssize_t rid; - if (!b) + if (b == NULL) return -1; if (!is_id_valid(b, id)) diff --git a/src/lib/dev.c b/src/lib/dev.c index 6d8411c5..c99e8cdb 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,73 +25,190 @@ #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/sockets.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/instance_name.h> +#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/utils.h> #include <stdlib.h> +#include <string.h> -int ap_reg(char * ap_name, - char ** difs, - size_t difs_size) +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +struct flow { + struct shm_ap_rbuff * rb; + uint32_t port_id; + uint32_t oflags; + + /* don't think this needs locking */ +}; + +struct ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; +} * _ap_instance; + + +int ap_init(char * ap_name) { - irm_msg_t msg = IRM_MSG__INIT; + _ap_instance = malloc(sizeof(struct ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +#if 0 +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} +#endif + +int ap_reg(char ** difs, + size_t len) +{ + irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = bmp_allocate(_ap_instance->fds); - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } + if (_ap_instance == NULL) { + LOG_DBG("ap_init was not called"); + return -1; + } + msg.code = IRM_MSG_CODE__IRM_AP_REG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + if (recv_msg->result < 0) + fd = -1; + irm_msg__free_unpacked(recv_msg, NULL); return fd; } -int ap_unreg(char * ap_name, - char ** difs, - size_t difs_size) +int ap_unreg(char ** difs, + size_t len) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; int ret = -1; - if (ap_name == NULL || - difs == NULL || - difs_size == 0 || + if (difs == NULL || + len == 0 || difs[0] == NULL) { return -EINVAL; } msg.code = IRM_MSG_CODE__IRM_AP_UNREG; msg.has_pid = true; - msg.pid = getpid(); - msg.ap_name = ap_name; + msg.pid = _ap_instance->api->id; + msg.ap_name = _ap_instance->api->name; msg.dif_name = difs; - msg.n_dif_name = difs_size; + msg.n_dif_name = len; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -102,38 +219,62 @@ int ap_unreg(char * ap_name, return ret; } -int flow_accept(int fd, - char * ap_name, - char * ae_name) +int flow_accept(int fd, + char ** ap_name, + char ** ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int cli_fd = 0; - - if (ap_name == NULL) { - return -EINVAL; - } + int cfd = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - cli_fd = recv_msg->fd; - ap_name = recv_msg->ap_name; - ae_name = recv_msg->ae_name; + + cfd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[cfd].rb == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + *ap_name = strdup(recv_msg->ap_name); + if (*ap_name == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + if (ae_name != NULL) { + *ae_name = strdup(recv_msg->ae_name); + if (*ae_name == NULL) { + bmp_release(_ap_instance->fds, cfd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + } + + _ap_instance->flows[cfd].port_id = recv_msg->port_id; + _ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT; + + irm_msg__free_unpacked(recv_msg, NULL); - return cli_fd; + + bmp_release(_ap_instance->fds, fd); + + return cfd; } int flow_alloc_resp(int fd, @@ -145,9 +286,9 @@ int flow_alloc_resp(int fd, msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP; msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.pid = _ap_instance->api->id; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; msg.has_response = true; msg.response = response; @@ -155,7 +296,7 @@ int flow_alloc_resp(int fd, if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -167,41 +308,49 @@ int flow_alloc_resp(int fd, } int flow_alloc(char * dst_name, - char * src_ap_name, char * src_ae_name, - struct qos_spec * qos, - int oflags) + struct qos_spec * qos) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = 0; + int fd = -1; - if (dst_name == NULL || - src_ap_name == NULL) { + if (dst_name == NULL) return -EINVAL; - } if (src_ae_name == NULL) src_ae_name = UNKNOWN_AE; msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; msg.dst_name = dst_name; - msg.ap_name = src_ap_name; + msg.ap_name = _ap_instance->api->name; + msg.has_pid = true; + msg.pid = _ap_instance->api->id; msg.ae_name = src_ae_name; - msg.has_oflags = true; - msg.oflags = oflags; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_pid || !recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + fd = bmp_allocate(_ap_instance->fds); + + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid); + if (_ap_instance->flows[fd].rb == NULL) { + bmp_release(_ap_instance->fds, fd); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + _ap_instance->flows[fd].port_id = recv_msg->port_id; + _ap_instance->flows[fd].oflags = FLOW_O_DEFAULT; + irm_msg__free_unpacked(recv_msg, NULL); + return fd; } @@ -211,17 +360,15 @@ int flow_alloc_res(int fd) irm_msg_t * recv_msg = NULL; int result = 0; - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -238,17 +385,15 @@ int flow_dealloc(int fd) irm_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; + msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; + msg.has_port_id = true; + msg.port_id = _ap_instance->flows[fd].port_id; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -259,47 +404,50 @@ int flow_dealloc(int fd) return ret; } -int flow_cntl(int fd, int oflags) +int flow_cntl(int fd, int cmd, int oflags) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; - - msg.has_pid = true; - msg.pid = getpid(); - msg.has_fd = true; - msg.fd = fd; - msg.oflags = oflags; + return -1; +} - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) +ssize_t flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count + DU_BUFF_HEADSPACE + + DU_BUFF_TAILSPACE, + DU_BUFF_HEADSPACE, + (uint8_t *) buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + if (index == -1) return -1; - if (recv_msg->has_result == false) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; } - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; + return 0; } -ssize_t flow_write(int fd, - void * buf, - size_t count) +ssize_t flow_read(int fd, void * buf, size_t count) { - LOG_MISSING; + struct rb_entry * e = NULL; + int n; + uint8_t * sdu; + /* FIXME: move this to a thread */ + while (e == NULL || e->port_id != _ap_instance->flows[fd].port_id) + e = shm_ap_rbuff_read(_ap_instance->rb); + + n = shm_du_map_read_sdu(&sdu, + _ap_instance->dum, + e->index); + if (n < 0) + return -1; - return -1; -} + memcpy(buf, sdu, MIN(n, count)); -ssize_t flow_read(int fd, - void * buf, - size_t count) -{ - LOG_MISSING; + shm_release_du_buff(_ap_instance->dum, e->index); - return -1; + return n; } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 387572b3..75676915 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -121,6 +121,8 @@ pid_t ipcp_create(char * ipcp_name, return pid; } + /* clear fd table */ + if (ipcp_type == IPCP_NORMAL) exec_name = IPCP_NORMAL_EXEC; else if (ipcp_type == IPCP_SHIM_UDP) @@ -286,13 +288,8 @@ int ipcp_enroll(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_ENROLL; - msg.member_name = malloc(sizeof(*(msg.member_name))); - if (msg.member_name == NULL) { - LOG_ERR("Failed to malloc."); - return -1; - } - msg.n_1_dif = n_1_dif; msg.member_name = member_name; + msg.n_1_dif = n_1_dif; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) { @@ -323,8 +320,8 @@ int ipcp_name_reg(pid_t pid, if (name == NULL) return -1; - msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; - msg.name = name; + msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; + msg.name = name; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -368,6 +365,7 @@ int ipcp_name_unreg(pid_t pid, int ipcp_flow_alloc(pid_t pid, uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, @@ -381,17 +379,19 @@ int ipcp_flow_alloc(pid_t pid, return -EINVAL; msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; msg.src_ap_name = src_ap_name; msg.src_ae_name = src_ae_name; msg.dst_name = dst_name; - msg.port_id = port_id; - msg.has_port_id = true; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { ipcp_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -404,17 +404,20 @@ int ipcp_flow_alloc(pid_t pid, int ipcp_flow_alloc_resp(pid_t pid, uint32_t port_id, - int result) + pid_t n_pid, + int response) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; - msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; - msg.has_result = true; - msg.result = result; + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_pid = true; + msg.pid = n_pid; + msg.has_response = true; + msg.response = response; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) @@ -431,38 +434,38 @@ int ipcp_flow_alloc_resp(pid_t pid, return ret; } -int ipcp_flow_req_arr(pid_t pid, - char * dst_name, - char * src_ap_name, - char * src_ae_name) +int ipcp_flow_req_arr(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name) { irm_msg_t msg = IRM_MSG__INIT; irm_msg_t * recv_msg = NULL; - int fd = -1; + int port_id = -1; if (src_ap_name == NULL || src_ae_name == NULL) return -EINVAL; msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; + msg.has_pid = true; + msg.pid = pid; msg.dst_name = dst_name; msg.ap_name = src_ap_name; msg.ae_name = src_ae_name; - msg.pid = pid; - msg.has_pid = true; recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) return -1; - if (recv_msg->has_fd == false) { + if (!recv_msg->has_port_id) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } - fd = recv_msg->fd; + port_id = recv_msg->port_id; irm_msg__free_unpacked(recv_msg, NULL); - return fd; + return port_id; } int ipcp_flow_alloc_reply(pid_t pid, @@ -509,11 +512,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { ipcp_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; @@ -531,11 +534,11 @@ int ipcp_flow_dealloc(pid_t pid, recv_msg = send_recv_irm_msg(&msg); if (recv_msg == NULL) - return -1; + return 0; if (recv_msg->has_result == false) { irm_msg__free_unpacked(recv_msg, NULL); - return -1; + return 0; } ret = recv_msg->result; diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index da4bb469..daca011d 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -25,6 +25,8 @@ message ipcp_msg { optional string src_ap_name = 9; optional string src_ae_name = 10; optional dif_config_msg conf = 11; - optional int32 result = 12; - optional int32 fd = 13; + optional int32 fd = 12; + optional int32 pid = 13; + optional int32 response = 14; + optional int32 result = 15; }; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 89e2c882..c336614e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -36,13 +36,10 @@ enum irm_msg_code { IRM_FLOW_ALLOC = 11; IRM_FLOW_ALLOC_RES = 12; IRM_FLOW_DEALLOC = 13; - IRM_FLOW_CONTROL = 14; - IRM_FLOW_WRITE = 15; - IRM_FLOW_READ = 16; - IPCP_FLOW_REQ_ARR = 17; - IPCP_FLOW_ALLOC_REPLY = 18; - IPCP_FLOW_DEALLOC = 19; - IRM_REPLY = 20; + IPCP_FLOW_REQ_ARR = 14; + IPCP_FLOW_ALLOC_REPLY = 15; + IPCP_FLOW_DEALLOC = 16; + IRM_REPLY = 17; }; message irm_msg { @@ -52,12 +49,11 @@ message irm_msg { optional uint32 api_id = 3; optional uint32 ipcp_type = 5; repeated string dif_name = 6; - optional int32 fd = 7; - optional int32 response = 8; - optional int32 oflags = 9; - optional string dst_name = 10; - optional uint32 port_id = 11; - optional int32 pid = 12; - optional dif_config_msg conf = 13; - optional int32 result = 14; + optional int32 response = 7; + optional string dst_name = 8; + optional uint32 port_id = 9; + optional int32 pid = 10; + optional dif_config_msg conf = 11; + optional int32 cfd = 12; + optional int32 result = 13; }; diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c new file mode 100644 index 00000000..0a41dfb3 --- /dev/null +++ b/src/lib/shm_ap_rbuff.c @@ -0,0 +1,268 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Ring buffer for application processes + * + * Dimitri Staessens <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/shm_ap_rbuff.h> +#define OUROBOROS_PREFIX "shm_ap_rbuff" + +#include <ouroboros/logs.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <unistd.h> +#include <stdbool.h> +#include <errno.h> + +#define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry) \ + + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)) + +#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_RBUFF_SIZE - *rb->ptr_tail)\ + & (SHM_RBUFF_SIZE - 1)) +#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_RBUFF_SIZE) + +struct shm_ap_rbuff { + struct rb_entry * shm_base; /* start of entry */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pid_t pid; /* pid to which this rb belongs */ + int fd; +}; + +struct shm_ap_rbuff * shm_ap_rbuff_create() +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + pthread_mutexattr_t attr; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid()); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed creating ring buffer."); + free(rb); + return NULL; + } + + if (lseek(shm_fd, SHM_RBUFF_FILE_SIZE - 1, SEEK_SET) < 0) { + LOG_DBGF("Failed to extend ringbuffer."); + free(rb); + return NULL; + } + + if (write(shm_fd, "", 1) != 1) { + LOG_DBGF("Failed to finalise extension of ringbuffer."); + free(rb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(rb->shm_mutex, &attr); + + *rb->ptr_head = 0; + *rb->ptr_tail = 0; + + rb->fd = shm_fd; + rb->pid = getpid(); + + return rb; +} + +struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t pid) +{ + struct shm_ap_rbuff * rb; + int shm_fd; + struct rb_entry * shm_base; + char fn[25]; + + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", pid); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(fn, O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed opening shared memory %s.", fn); + return NULL; + } + + shm_base = mmap(NULL, + SHM_RBUFF_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBGF("Failed to close invalid shm."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + + free(rb); + return NULL; + } + + rb->shm_base = shm_base; + rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE); + rb->ptr_tail = (size_t *) + ((uint8_t *) rb->ptr_head + sizeof(size_t)); + rb->shm_mutex = (pthread_mutex_t *) + ((uint8_t *) rb->ptr_tail + sizeof(size_t)); + + rb->fd = shm_fd; + rb->pid = pid; + + return rb; +} +void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(rb); +} + +void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) +{ + char fn[25]; + + + if (rb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (rb->pid != getpid()) { + LOG_ERR("Tried to destroy other AP's rbuff."); + return; + } + + sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->pid); + + if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + if (shm_unlink(fn) == -1) + LOG_DBGF("Failed to unlink shm."); + + free(rb); +} + +int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) +{ + struct rb_entry * pos; + + if (rb == NULL || e == NULL) + return -1; + + pthread_mutex_lock(rb->shm_mutex); + + if (!shm_rbuff_free(rb)) { + pthread_mutex_unlock(rb->shm_mutex); + return -1; + } + + pos = rb->shm_base + *rb->ptr_head; + *pos = *e; + *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return 0; +} +struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) +{ + struct rb_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + if (rb == NULL) + return NULL; + + pthread_mutex_lock(rb->shm_mutex); + + if (shm_rbuff_used(rb) == 0) { + pthread_mutex_unlock(rb->shm_mutex); + return NULL; + } + + *e = *(rb->shm_base + *rb->ptr_tail); + + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + + pthread_mutex_unlock(rb->shm_mutex); + + return e; +} diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index dfccca6a..56062c9d 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -45,6 +45,9 @@ ((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \ SHM_DU_BUFF_BLOCK_SIZE))) +#define idx_to_du_buff_ptr(dum, idx) \ + ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) + #define block_ptr_to_idx(dum, sdb) \ (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) @@ -52,27 +55,31 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ + idx_to_du_buff_ptr(dum, idx)->du_head) + #define MIN(a,b)(a < b ? a : b) struct shm_du_buff { - size_t size; - size_t du_head; - size_t du_tail; + size_t size; + size_t du_head; + size_t du_tail; + size_t garbage; }; struct shm_du_map { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - int fd; + uint8_t * shm_base; /* start of blocks */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + int fd; }; struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; pthread_mutexattr_t attr; dum = malloc(sizeof *dum); @@ -141,7 +148,13 @@ struct shm_du_map * shm_du_map_open() { struct shm_du_map * dum; int shm_fd; - uint8_t * shm_base; + uint8_t * shm_base; + + dum = malloc(sizeof *dum); + if (dum == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); if (shm_fd == -1) { @@ -166,12 +179,6 @@ struct shm_du_map * shm_du_map_open() return NULL; } - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - dum->shm_base = shm_base; dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); @@ -195,38 +202,52 @@ void shm_du_map_close(struct shm_du_map * dum) if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) LOG_DBGF("Couldn't unmap shared memory."); + free(dum); +} + +void shm_du_map_destroy(struct shm_du_map * dum) +{ + if (dum == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) LOG_DBGF("Failed to unlink shm."); free(dum); } -struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +int shm_create_du_buff(struct shm_du_map * dum, + size_t size, + size_t headspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; long blocks = 0; int sz = size + sizeof *sdb; int sz2 = headspace + len + sizeof *sdb; - uint8_t * write_pos; + uint8_t * write_pos; size_t copy_len; + size_t index; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); - return NULL; + return -1; } if (headspace >= size) { LOG_DBGF("Index out of bounds."); - return NULL; + return -1; } if (headspace + len > size) { LOG_DBGF("Buffer too small for data."); - return NULL; + return -1; } pthread_mutex_lock(dum->shm_mutex); @@ -237,20 +258,20 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, if (sz2 < 0 && sz > 0) { pthread_mutex_unlock(dum->shm_mutex); LOG_DBG("Can't handle this packet now"); - return NULL; + return -1; } ++blocks; } if (!shm_map_free(dum, blocks)) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBGF("Allocation failed, Out of Memory."); - return NULL; + return -1; } sdb = get_head_ptr(dum); sdb->size = size; + sdb->garbage = 0; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -267,32 +288,76 @@ struct shm_du_buff * shm_create_du_buff(struct shm_du_map * dum, --blocks; } + index = *dum->ptr_head - 1; + pthread_mutex_unlock(dum->shm_mutex); - return sdb; + return index; } -int shm_release_du_buff(struct shm_du_map * dum) +/* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ +int shm_du_map_read_sdu(uint8_t ** dst, + struct shm_du_map * dum, + size_t idx) +{ + size_t len = 0; + + if (idx > SHM_BLOCKS_IN_MAP) + return -1; + + pthread_mutex_lock(dum->shm_mutex); + + if (*dum->ptr_head == *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return -1; + } + + *dst = ((uint8_t *) idx_to_du_buff_ptr(dum, idx)) + + sizeof(struct shm_du_buff) + + idx_to_du_buff_ptr(dum, idx)->du_head; + len = sdu_size(dum, idx); + + pthread_mutex_unlock(dum->shm_mutex); + + return len; +} + +int shm_release_du_buff(struct shm_du_map * dum, size_t idx) { long sz; long blocks = 0; + + /* FIXME: this is crap for the test */ + if (idx > SHM_BLOCKS_IN_MAP) + idx = *dum->ptr_tail; + pthread_mutex_lock(dum->shm_mutex); if (*dum->ptr_head == *dum->ptr_tail) { - LOG_DBGF("Attempt to free empty ringbuffer. Nothing to do."); pthread_mutex_unlock(dum->shm_mutex); return -1; } - sz = get_tail_ptr(dum)->size; + idx_to_du_buff_ptr(dum, idx)->garbage = 1; - while (sz + (long) sizeof (struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; + if (idx != *dum->ptr_tail) { + pthread_mutex_unlock(dum->shm_mutex); + return 0; + } + + while (get_tail_ptr(dum)->garbage == 1) { + sz = get_tail_ptr(dum)->size; + + while (sz + (long) sizeof (struct shm_du_buff) > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *(dum->ptr_tail) = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); } - *(dum->ptr_tail) = (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); pthread_mutex_unlock(dum->shm_mutex); return 0; @@ -317,7 +382,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, } uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -335,7 +400,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_buff * sdb, } int shm_du_buff_head_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -353,7 +418,7 @@ int shm_du_buff_head_release(struct shm_du_buff * sdb, } int shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { if (sdb == NULL) { LOG_DBGF("Bogus input, bugging out."); diff --git a/src/lib/tests/shm_du_map_test.c b/src/lib/tests/shm_du_map_test.c index 85a82e4d..55938a62 100644 --- a/src/lib/tests/shm_du_map_test.c +++ b/src/lib/tests/shm_du_map_test.c @@ -32,7 +32,7 @@ #include <ouroboros/logs.h> -#define SIZE_OF_DU_BUFF 24 +#define SIZE_OF_DU_BUFF 32 #define TEST_BUFF_SIZE (SHM_DU_BUFF_BLOCK_SIZE - SIZE_OF_DU_BUFF) #define MAX(a,b) (a > b ? a : b) @@ -44,7 +44,7 @@ void * produce() { struct shm_du_map * dum; long test_buf_size = 0; - uint8_t * test_values; + uint8_t * test_values; int headspace; int tailspace; long i; @@ -66,9 +66,8 @@ void * produce() test_values[i] = 170; clock_gettime(CLOCK_MONOTONIC, &starttime); - for (i = 1; i < SHM_BLOCKS_IN_MAP; i++) { - struct shm_du_buff * sdb; - size_t len; + for (i = 1; i < 16 * SHM_BLOCKS_IN_MAP; i++) { + size_t len; test_buf_size = TEST_BUFF_SIZE; @@ -77,21 +76,19 @@ void * produce() len = test_buf_size - (headspace + tailspace); - sdb = shm_create_du_buff(dum, - test_buf_size, - headspace, - test_values, - len); - - if (sdb != NULL) { - bytes_written += len; - } - else { - sync = -2; - break; + if (shm_create_du_buff(dum, + test_buf_size, + headspace, + test_values, + len) < 0) { + continue; } + + bytes_written += len; } + sync = -2; + clock_gettime(CLOCK_MONOTONIC, &stoptime); elapsed =(stoptime.tv_sec + stoptime.tv_nsec / 1000000000.0) - (starttime.tv_sec + starttime.tv_nsec / 1000000000.0); @@ -104,13 +101,14 @@ void * produce() sync = -1; + shm_du_map_close(dum); + return 0; } void * consume() { struct shm_du_map * dum; - struct timespec ts; ts.tv_sec = 0; @@ -123,10 +121,15 @@ void * consume() return (void *)-1; } - while (!sync) { - while (!shm_release_du_buff(dum)); - nanosleep(&ts, NULL); + while (true) { + shm_release_du_buff(dum, 1823429173941); + if (sync) + break; } + nanosleep(&ts, NULL); + + + shm_du_map_close(dum); return 0; } @@ -149,7 +152,7 @@ int shm_du_map_test(int argc, char ** argv) return -1; } - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -165,7 +168,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_create(&consumer, NULL, consume, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); @@ -173,6 +176,8 @@ int shm_du_map_test(int argc, char ** argv) LOG_INFO("starting concurrency test."); + sync = 0; + dum = shm_du_map_create(); res1 = (int) pthread_create(&producer, NULL, produce, NULL); @@ -181,7 +186,7 @@ int shm_du_map_test(int argc, char ** argv) pthread_join(producer, NULL); pthread_join(consumer, NULL); - shm_du_map_close(dum); + shm_du_map_destroy(dum); LOG_INFO("done."); |