diff options
author | dimitri staessens <[email protected]> | 2016-08-29 19:49:39 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-08-29 20:32:54 +0200 |
commit | 2cc89f6da424ab503af563e0cc92dda43b8f8432 (patch) | |
tree | 303d3d61717d4d3018b8025a9825ff799da01c08 /src/lib | |
parent | caeefb4d96331d24b38e845c99d0517913a71671 (diff) | |
download | ouroboros-2cc89f6da424ab503af563e0cc92dda43b8f8432.tar.gz ouroboros-2cc89f6da424ab503af563e0cc92dda43b8f8432.zip |
lib: Refactor shm_du_map to shm_rdrbuff
The shm_du_map is renamed to shm_rdrbuff to reflect the Random
Deletion Ringbuffer used in the implementation. The close_on_exit call
is removed and SDUs are cleaned up by the application in the ap_fini()
call. This required a non-blocking peek() operation in the shm_ap_rbuff.
Some initial implementation for future support of qos cubes has been
added to the shm_rdrbuff.
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/lib/dev.c | 34 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 28 | ||||
-rw-r--r-- | src/lib/shm_du_map.c | 767 | ||||
-rw-r--r-- | src/lib/shm_rdrbuff.c | 804 |
5 files changed, 850 insertions, 785 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 5e16c7e2..8c058dd8 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -33,7 +33,7 @@ set(SOURCE_FILES logs.c nsm.c shm_ap_rbuff.c - shm_du_map.c + shm_rdrbuff.c sockets.c time_utils.c utils.c diff --git a/src/lib/dev.c b/src/lib/dev.c index 3a5fc8e0..17c473ed 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -25,7 +25,7 @@ #include <ouroboros/dev.h> #include <ouroboros/sockets.h> #include <ouroboros/bitmap.h> -#include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/utils.h> @@ -45,7 +45,7 @@ struct flow { struct ap_data { char * ap_name; pid_t api; - struct shm_du_map * dum; + struct shm_rdrbuff * rdrb; struct bmp * fds; struct shm_ap_rbuff * rb; pthread_rwlock_t data_lock; @@ -105,8 +105,8 @@ int ap_init(char * ap_name) return -ENOMEM; } - _ap_instance->dum = shm_du_map_open(); - if (_ap_instance->dum == NULL) { + _ap_instance->rdrb = shm_rdrbuff_open(); + if (_ap_instance->rdrb == NULL) { bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -114,7 +114,7 @@ int ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { - shm_du_map_close(_ap_instance->dum); + shm_rdrbuff_close(_ap_instance->rdrb); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; @@ -146,12 +146,16 @@ void ap_fini(void) pthread_rwlock_wrlock(&_ap_instance->data_lock); + /* remove all remaining sdus */ + while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0) + shm_rdrbuff_remove(_ap_instance->rdrb, i); + if (_ap_instance->fds != NULL) bmp_destroy(_ap_instance->fds); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); - if (_ap_instance->dum != NULL) - shm_du_map_close_on_exit(_ap_instance->dum); + if (_ap_instance->rdrb != NULL) + shm_rdrbuff_close(_ap_instance->rdrb); pthread_rwlock_rdlock(&_ap_instance->flows_lock); @@ -515,7 +519,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) } if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_du_map_write(_ap_instance->dum, + idx = shm_rdrbuff_write(_ap_instance->rdrb, _ap_instance->flows[fd].api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, @@ -531,18 +535,18 @@ ssize_t flow_write(int fd, void * buf, size_t count) e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { - shm_du_map_remove(_ap_instance->dum, idx); + shm_rdrbuff_remove(_ap_instance->rdrb, idx); pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; } } else { /* blocking */ - struct shm_du_map * dum = _ap_instance->dum; - pid_t api = _ap_instance->flows[fd].api; + struct shm_rdrbuff * rdrb = _ap_instance->rdrb; + pid_t api = _ap_instance->flows[fd].api; pthread_rwlock_unlock(&_ap_instance->flows_lock); pthread_rwlock_unlock(&_ap_instance->data_lock); - idx = shm_du_map_write_b(dum, + idx = shm_rdrbuff_write_b(rdrb, api, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, @@ -567,7 +571,7 @@ ssize_t flow_write(int fd, void * buf, size_t count) int flow_select(const struct timespec * timeout) { - int port_id = shm_ap_rbuff_peek(_ap_instance->rb, timeout); + int port_id = shm_ap_rbuff_peek_b(_ap_instance->rb, timeout); if (port_id < 0) return port_id; return port_id_to_fd(port_id); @@ -612,7 +616,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) return -EAGAIN; } - n = shm_du_map_read(&sdu, _ap_instance->dum, idx); + n = shm_rdrbuff_read(&sdu, _ap_instance->rdrb, idx); if (n < 0) { pthread_rwlock_unlock(&_ap_instance->data_lock); return -1; @@ -620,7 +624,7 @@ ssize_t flow_read(int fd, void * buf, size_t count) memcpy(buf, sdu, MIN(n, count)); - shm_du_map_remove(_ap_instance->dum, idx); + shm_rdrbuff_remove(_ap_instance->rdrb, idx); pthread_rwlock_unlock(&_ap_instance->data_lock); diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 4ca29636..f21b1e86 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -285,8 +285,32 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) return 0; } -int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb, - const struct timespec * timeout) +int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) +{ + int ret = 0; + + if (rb == NULL) + return -EINVAL; + + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } + + if (shm_rbuff_empty(rb)) { + pthread_mutex_unlock(rb->lock); + return -1; + } + + ret = (rb->shm_base + *rb->ptr_tail)->index; + + pthread_mutex_unlock(rb->lock); + + return ret; +} + +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + const struct timespec * timeout) { struct timespec abstime; int ret = 0; diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c deleted file mode 100644 index 9ca282b9..00000000 --- a/src/lib/shm_du_map.c +++ /dev/null @@ -1,767 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Shared memory map for data units - * - * Dimitri Staessens <[email protected]> - * Sander Vrijders <[email protected]> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/errno.h> -#include <ouroboros/shm_du_map.h> -#include <ouroboros/shm_ap_rbuff.h> -#include <ouroboros/time_utils.h> - -#include <pthread.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <unistd.h> -#include <stdlib.h> -#include <string.h> -#include <signal.h> -#include <sys/stat.h> - -#define OUROBOROS_PREFIX "shm_du_map" - -#include <ouroboros/logs.h> - -#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ - + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ - + sizeof(pid_t)) - -#define get_head_ptr(dum) \ -((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_head * \ - SHM_DU_BUFF_BLOCK_SIZE))) - -#define get_tail_ptr(dum) \ -((struct shm_du_buff *)(dum->shm_base + (*dum->ptr_tail * \ - SHM_DU_BUFF_BLOCK_SIZE))) - -#define idx_to_du_buff_ptr(dum, idx) \ - ((struct shm_du_buff *)(dum->shm_base + (idx * SHM_DU_BUFF_BLOCK_SIZE))) - -#define block_ptr_to_idx(dum, sdb) \ - (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) - -#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\ - & (SHM_BUFFER_SIZE - 1)) -#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE) - -#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) - -struct shm_du_buff { - size_t size; -#ifdef SHM_DU_MAP_MULTI_BLOCK - size_t blocks; -#endif - size_t du_head; - size_t du_tail; - pid_t dst_api; -}; - -struct shm_du_map { - uint8_t * shm_base; /* start of blocks */ - size_t * ptr_head; /* start of ringbuffer head */ - size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * lock; /* lock all free space in shm */ - size_t * choked; /* stale sdu detection */ - pthread_cond_t * healthy; /* du map is healthy */ - pthread_cond_t * full; /* run sanitizer when buffer full */ - pid_t * api; /* api of the irmd owner */ - int fd; -}; - -static void garbage_collect(struct shm_du_map * dum) -{ -#ifdef SHM_DU_MAP_MULTI_BLOCK - struct shm_du_buff * sdb; - while (!shm_map_empty(dum) && (sdb = get_tail_ptr(dum))->dst_api == -1) - *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) - & (SHM_BUFFER_SIZE - 1); -#else - while (!shm_map_empty(dum) && get_tail_ptr(dum)->dst_api == -1) - *dum->ptr_tail = - (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); - -#endif -} - -static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit) -{ - size_t idx = *dum->ptr_tail; - struct shm_du_buff * buf; - - while (idx != *dum->ptr_head) { - buf = idx_to_du_buff_ptr(dum, idx); - if (buf->dst_api == api) - buf->dst_api = -1; -#ifdef SHM_DU_MAP_MULTI_BLOCK - idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); -#else - idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); -#endif - } - - garbage_collect(dum); - - if (!exit && kill(api, 0) == 0) { - struct shm_ap_rbuff * rb; - rb = shm_ap_rbuff_open(api); - if (rb != NULL) { - shm_ap_rbuff_reset(rb); - shm_ap_rbuff_close(rb); - } - } - - *dum->choked = 0; -} - -struct shm_du_map * shm_du_map_create() -{ - struct shm_du_map * dum; - int shm_fd; - uint8_t * shm_base; - pthread_mutexattr_t mattr; - pthread_condattr_t cattr; - - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_CREAT | O_EXCL | O_RDWR, 0666); - if (shm_fd == -1) { - LOG_DBGF("Failed creating shared memory map."); - free(dum); - return NULL; - } - - if (fchmod(shm_fd, 0666)) { - LOG_DBGF("Failed to chmod shared memory map."); - free(dum); - return NULL; - } - - if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { - LOG_DBGF("Failed to extend shared memory map."); - free(dum); - return NULL; - } - - if (write(shm_fd, "", 1) != 1) { - LOG_DBGF("Failed to finalise extension of shared memory map."); - free(dum); - return NULL; - } - - shm_base = mmap(NULL, - SHM_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - - if (shm_base == MAP_FAILED) { - LOG_DBGF("Failed to map shared memory."); - if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) - LOG_DBGF("Failed to remove invalid shm."); - - free(dum); - return NULL; - } - - dum->shm_base = shm_base; - dum->ptr_head = (size_t *) - ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); - dum->ptr_tail = dum->ptr_head + 1; - dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->lock + 1); - dum->healthy = (pthread_cond_t *) (dum->choked + 1); - dum->full = dum->healthy + 1; - dum->api = (pid_t *) (dum->full + 1); - - pthread_mutexattr_init(&mattr); - pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); - pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); - pthread_mutex_init(dum->lock, &mattr); - - pthread_condattr_init(&cattr); - pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(dum->full, &cattr); - pthread_cond_init(dum->healthy, &cattr); - - *dum->ptr_head = 0; - *dum->ptr_tail = 0; - - *dum->choked = 0; - - *dum->api = getpid(); - - dum->fd = shm_fd; - - return dum; -} - -struct shm_du_map * shm_du_map_open() -{ - struct shm_du_map * dum; - int shm_fd; - uint8_t * shm_base; - - dum = malloc(sizeof *dum); - if (dum == NULL) { - LOG_DBGF("Could not allocate struct."); - return NULL; - } - - shm_fd = shm_open(SHM_DU_MAP_FILENAME, O_RDWR, 0666); - if (shm_fd < 0) { - LOG_DBGF("Failed opening shared memory."); - free(dum); - return NULL; - } - - shm_base = mmap(NULL, - SHM_FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - shm_fd, - 0); - if (shm_base == MAP_FAILED) { - LOG_DBGF("Failed to map shared memory."); - if (close(shm_fd) == -1) - LOG_DBGF("Failed to close invalid shm."); - if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) - LOG_DBGF("Failed to unlink invalid shm."); - free(dum); - return NULL; - } - - dum->shm_base = shm_base; - dum->ptr_head = (size_t *) - ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); - dum->ptr_tail = dum->ptr_head + 1; - dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->lock + 1); - dum->healthy = (pthread_cond_t *) (dum->choked + 1); - dum->full = dum->healthy + 1; - dum->api = (pid_t *) (dum->full + 1); - - dum->fd = shm_fd; - - return dum; -} - -void * shm_du_map_sanitize(void * o) -{ - struct shm_du_map * dum = (struct shm_du_map *) o; - struct timespec intv - = {SHM_DU_TIMEOUT_MICROS / MILLION, - (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; - - pid_t api; - - if (dum == NULL) - return (void *) -1; - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - (void *) dum->lock); - - while (true) { - int ret = 0; - struct timespec now; - struct timespec dl; - - if (pthread_cond_wait(dum->full, dum->lock) == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - *dum->choked = 1; - - garbage_collect(dum); - - if (shm_map_empty(dum)) - continue; - - api = get_tail_ptr(dum)->dst_api; - - if (kill(api, 0)) { - LOG_DBGF("Dead process %d left stale sdu.", api); - clean_sdus(dum, api, false); - continue; - } - - clock_gettime(CLOCK_REALTIME, &now); - ts_add(&now, &intv, &dl); - while (*dum->choked) { - ret = pthread_cond_timedwait(dum->healthy, - dum->lock, - &dl); - if (!ret) - continue; - - if (ret == EOWNERDEAD) { - LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - if (ret == ETIMEDOUT) { - LOG_DBGF("SDU timed out (dst: %d).", api); - clean_sdus(dum, api, false); - } - } - } - - pthread_cleanup_pop(true); - - return (void *) 0; -} - -void shm_du_map_close_on_exit(struct shm_du_map * dum) -{ - if (dum == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } - - clean_sdus(dum, getpid(), true); - - if (close(dum->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - - if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) - LOG_DBGF("Couldn't unmap shared memory."); - - free(dum); -} - -void shm_du_map_close(struct shm_du_map * dum) -{ - if (dum == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } - - if (close(dum->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - - if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) - LOG_DBGF("Couldn't unmap shared memory."); - - free(dum); -} - -void shm_du_map_destroy(struct shm_du_map * dum) -{ - if (dum == NULL) { - LOG_DBGF("Bogus input. Bugging out."); - return; - } - - if (getpid() != *dum->api && kill(*dum->api, 0) == 0) { - LOG_DBGF("Only IRMd can destroy %s.", SHM_DU_MAP_FILENAME); - return; - } - - if (close(dum->fd) < 0) - LOG_DBGF("Couldn't close shared memory."); - - if (munmap(dum->shm_base, SHM_FILE_SIZE) == -1) - LOG_DBGF("Couldn't unmap shared memory."); - - if (shm_unlink(SHM_DU_MAP_FILENAME) == -1) - LOG_DBGF("Failed to unlink shm."); - - free(dum); -} - -ssize_t shm_du_map_write(struct shm_du_map * dum, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t len) -{ - struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; -#ifdef SHM_DU_MAP_MULTI_BLOCK - long blocks = 0; - long padblocks = 0; -#endif - int sz = size + sizeof *sdb; - uint8_t * write_pos; - ssize_t idx = -1; - - if (dum == NULL || data == NULL) { - LOG_DBGF("Bogus input, bugging out."); - return -1; - } - -#ifndef SHM_DU_MAP_MULTI_BLOCK - if (sz > SHM_DU_BUFF_BLOCK_SIZE) { - LOG_DBGF("Multi-block SDU's disabled. Dropping."); - return -1; - } -#endif - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } -#ifdef SHM_DU_MAP_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } - - if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE) - padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; - - if (!shm_map_free(dum, (blocks + padblocks))) { -#else - if (!shm_map_free(dum, 1)) { -#endif - pthread_cond_signal(dum->full); - pthread_mutex_unlock(dum->lock); - return -1; - } - -#ifdef SHM_DU_MAP_MULTI_BLOCK - if (padblocks) { - sdb = get_head_ptr(dum); - sdb->size = 0; - sdb->blocks = padblocks; - sdb->dst_api = -1; - sdb->du_head = 0; - sdb->du_tail = 0; - - *dum->ptr_head = 0; - } -#endif - sdb = get_head_ptr(dum); - sdb->size = size; - sdb->dst_api = dst_api; - sdb->du_head = headspace; - sdb->du_tail = sdb->du_head + len; -#ifdef SHM_DU_MAP_MULTI_BLOCK - sdb->blocks = blocks; -#endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); - - idx = *dum->ptr_head; -#ifdef SHM_DU_MAP_MULTI_BLOCK - *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); -#else - *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); -#endif - pthread_mutex_unlock(dum->lock); - - return idx; -} - -ssize_t shm_du_map_write_b(struct shm_du_map * dum, - pid_t dst_api, - size_t headspace, - size_t tailspace, - uint8_t * data, - size_t len) -{ - struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; -#ifdef SHM_DU_MAP_MULTI_BLOCK - long blocks = 0; - long padblocks = 0; -#endif - int sz = size + sizeof *sdb; - uint8_t * write_pos; - ssize_t idx = -1; - - if (dum == NULL || data == NULL) { - LOG_DBGF("Bogus input, bugging out."); - return -1; - } - -#ifndef SHM_DU_MAP_MULTI_BLOCK - if (sz > SHM_DU_BUFF_BLOCK_SIZE) { - LOG_DBGF("Multi-block SDU's disabled. Dropping."); - return -1; - } -#endif - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) dum->lock); - -#ifdef SHM_DU_MAP_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } - - if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE) - padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; - - while (!shm_map_free(dum, (blocks + padblocks))) { -#else - while (!shm_map_free(dum, 1)) { -#endif - pthread_cond_signal(dum->full); - pthread_cond_wait(dum->healthy, dum->lock); - } - -#ifdef SHM_DU_MAP_MULTI_BLOCK - if (padblocks) { - sdb = get_head_ptr(dum); - sdb->size = 0; - sdb->blocks = padblocks; - sdb->dst_api = -1; - sdb->du_head = 0; - sdb->du_tail = 0; - - *dum->ptr_head = 0; - } -#endif - sdb = get_head_ptr(dum); - sdb->size = size; - sdb->dst_api = dst_api; - sdb->du_head = headspace; - sdb->du_tail = sdb->du_head + len; -#ifdef SHM_DU_MAP_MULTI_BLOCK - sdb->blocks = blocks; -#endif - write_pos = ((uint8_t *) (sdb + 1)) + headspace; - - memcpy(write_pos, data, len); - - idx = *dum->ptr_head; -#ifdef SHM_DU_MAP_MULTI_BLOCK - *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); -#else - *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); -#endif - pthread_cleanup_pop(true); - - return idx; -} - -int shm_du_map_read(uint8_t ** dst, - struct shm_du_map * dum, - ssize_t idx) -{ - size_t len = 0; - struct shm_du_buff * sdb; - - if (idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->lock); - return -1; - } - - sdb = idx_to_du_buff_ptr(dum, idx); - len = sdb->du_tail - sdb->du_head; - *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; - - pthread_mutex_unlock(dum->lock); - - return len; -} - -int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) -{ - if (idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->lock); - return -1; - } - - idx_to_du_buff_ptr(dum, idx)->dst_api = -1; - - if (idx != *dum->ptr_tail) { - pthread_mutex_unlock(dum->lock); - return 0; - } - - garbage_collect(dum); - - *dum->choked = 0; - - pthread_cond_broadcast(dum->healthy); - - pthread_mutex_unlock(dum->lock); - - return 0; -} - -uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - uint8_t * buf; - - if (dum == NULL) - return NULL; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return NULL; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if ((long) (sdb->du_head - size) < 0) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Failed to allocate PCI headspace."); - return NULL; - } - - sdb->du_head -= size; - - buf = (uint8_t *) (sdb + 1) + sdb->du_head; - - pthread_mutex_unlock(dum->lock); - - return buf; -} - -uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - uint8_t * buf; - - if (dum == NULL) - return NULL; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return NULL; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if (sdb->du_tail + size >= sdb->size) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Failed to allocate PCI tailspace."); - return NULL; - } - - buf = (uint8_t *) (sdb + 1) + sdb->du_tail; - - sdb->du_tail += size; - - pthread_mutex_unlock(dum->lock); - - return buf; -} - -int shm_du_buff_head_release(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - - if (dum == NULL) - return -1; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Tried to release beyond sdu boundary."); - return -EOVERFLOW; - } - - sdb->du_head += size; - - pthread_mutex_unlock(dum->lock); - - return 0; -} - -int shm_du_buff_tail_release(struct shm_du_map * dum, - int idx, - ssize_t size) -{ - struct shm_du_buff * sdb; - - if (dum == NULL) - return -1; - - if (idx < 0 || idx > SHM_BUFFER_SIZE) - return -1; - - if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { - LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->lock); - } - - sdb = idx_to_du_buff_ptr(dum, idx); - - if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->lock); - LOG_DBGF("Tried to release beyond sdu boundary."); - return -EOVERFLOW; - } - - sdb->du_tail -= size; - - pthread_mutex_unlock(dum->lock); - - return 0; -} diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c new file mode 100644 index 00000000..d42dbea7 --- /dev/null +++ b/src/lib/shm_rdrbuff.c @@ -0,0 +1,804 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * Random Deletion Ring Buffer for Data Units + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/errno.h> +#include <ouroboros/shm_rdrbuff.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h> + +#include <pthread.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <sys/stat.h> + +#define OUROBOROS_PREFIX "shm_rdrbuff" + +#include <ouroboros/logs.h> + +#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_RDRB_BLOCK_SIZE) +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + + sizeof(pid_t)) + +#define get_head_ptr(rdrb) \ + ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_head * \ + SHM_RDRB_BLOCK_SIZE))) + +#define get_tail_ptr(rdrb) \ + ((struct shm_du_buff *)(rdrb->shm_base + (*rdrb->ptr_tail * \ + SHM_RDRB_BLOCK_SIZE))) + +#define idx_to_du_buff_ptr(rdrb, idx) \ + ((struct shm_du_buff *)(rdrb->shm_base + (idx * SHM_RDRB_BLOCK_SIZE))) + +#define block_ptr_to_idx(rdrb, sdb) \ + (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE) + +#define shm_rdrb_used(rdrb) \ + ((*rdrb->ptr_head + SHM_BUFFER_SIZE - *rdrb->ptr_tail) \ + & (SHM_BUFFER_SIZE - 1)) +#define shm_rdrb_free(rdrb, i) \ + (shm_rdrb_used(rdrb) + i < SHM_BUFFER_SIZE) + +#define shm_rdrb_empty(rdrb) \ + (*rdrb->ptr_tail == *rdrb->ptr_head) + +struct shm_du_buff { + size_t size; +#ifdef SHM_DU_MAP_MULTI_BLOCK + size_t blocks; +#endif + size_t du_head; + size_t du_tail; + pid_t dst_api; +}; + +struct shm_rdrbuff { + uint8_t * shm_base; /* start of blocks */ + size_t * ptr_head; /* start of ringbuffer head */ + size_t * ptr_tail; /* start of ringbuffer tail */ + pthread_mutex_t * lock; /* lock all free space in shm */ + size_t * choked; /* stale sdu detection */ + pthread_cond_t * healthy; /* du map is healthy */ + pthread_cond_t * full; /* run sanitizer when buffer full */ + pid_t * api; /* api of the irmd owner */ + enum qos_cube qos; /* qos id which this buffer serves */ + int fd; +}; + +static void garbage_collect(struct shm_rdrbuff * rdrb) +{ +#ifdef SHM_RDRBUFF_MULTI_BLOCK + struct shm_du_buff * sdb; + while (!shm_rdrb_empty(rdrb) && + (sdb = get_tail_ptr(rdrb))->dst_api == -1) + *rdrb->ptr_tail = (*rdrb->ptr_tail + sdb->blocks) + & (SHM_BUFFER_SIZE - 1); +#else + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->dst_api == -1) + *rdrb->ptr_tail = + (*rdrb->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); + +#endif +} + +static void clean_sdus(struct shm_rdrbuff * rdrb, pid_t api) +{ + size_t idx = *rdrb->ptr_tail; + struct shm_du_buff * buf; + + while (idx != *rdrb->ptr_head) { + buf = idx_to_du_buff_ptr(rdrb, idx); + if (buf->dst_api == api) + buf->dst_api = -1; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); +#else + idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); +#endif + } + + garbage_collect(rdrb); + + *rdrb->choked = 0; +} + +static char * rdrb_filename(enum qos_cube qos) +{ + int chars = 0; + char * str; + int qm = QOS_MAX; + + do { + qm /= 10; + ++chars; + } while (qm > 0); + + str = malloc(strlen(SHM_RDRB_PREFIX) + chars + 2); + if (str == NULL) { + LOG_ERR("Failed to create shm_rdrbuff: Out of Memory."); + return NULL; + } + + sprintf(str, "%s.%d", SHM_RDRB_PREFIX, (int) qos); + + return str; +} + +/* FIXME: create a ringbuffer for each qos cube in the system */ +struct shm_rdrbuff * shm_rdrbuff_create() +{ + struct shm_rdrbuff * rdrb; + int shm_fd; + uint8_t * shm_base; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + enum qos_cube qos = QOS_CUBE_BE; + char * shm_rdrb_fn = rdrb_filename(qos); + if (shm_rdrb_fn == NULL) { + LOG_ERR("Could not create rdrbuff. Out of Memory"); + return NULL; + } + + rdrb = malloc(sizeof *rdrb); + if (rdrb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(shm_rdrb_fn, O_CREAT | O_EXCL | O_RDWR, 0666); + if (shm_fd == -1) { + LOG_DBGF("Failed creating shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + if (fchmod(shm_fd, 0666)) { + LOG_DBGF("Failed to chmod shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + if (ftruncate(shm_fd, SHM_FILE_SIZE - 1) < 0) { + LOG_DBGF("Failed to extend shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + if (write(shm_fd, "", 1) != 1) { + LOG_DBGF("Failed to finalise extension of shared memory map."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (shm_unlink(shm_rdrb_fn) == -1) + LOG_DBGF("Failed to remove invalid shm."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + rdrb->shm_base = shm_base; + rdrb->ptr_head = (size_t *) + ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->ptr_tail = rdrb->ptr_head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); + rdrb->choked = (size_t *) (rdrb->lock + 1); + rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); + rdrb->full = rdrb->healthy + 1; + rdrb->api = (pid_t *) (rdrb->full + 1); + + pthread_mutexattr_init(&mattr); + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); + pthread_mutex_init(rdrb->lock, &mattr); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); + pthread_cond_init(rdrb->full, &cattr); + pthread_cond_init(rdrb->healthy, &cattr); + + *rdrb->ptr_head = 0; + *rdrb->ptr_tail = 0; + + *rdrb->choked = 0; + + *rdrb->api = getpid(); + + rdrb->qos = qos; + rdrb->fd = shm_fd; + + free(shm_rdrb_fn); + + return rdrb; +} + +/* FIXME: open a ringbuffer for each qos cube in the system */ +struct shm_rdrbuff * shm_rdrbuff_open() +{ + struct shm_rdrbuff * rdrb; + int shm_fd; + uint8_t * shm_base; + + enum qos_cube qos = QOS_CUBE_BE; + char * shm_rdrb_fn = rdrb_filename(qos); + if (shm_rdrb_fn == NULL) { + LOG_ERR("Could not create rdrbuff. Out of Memory"); + return NULL; + } + + rdrb = malloc(sizeof *rdrb); + if (rdrb == NULL) { + LOG_DBGF("Could not allocate struct."); + return NULL; + } + + shm_fd = shm_open(shm_rdrb_fn, O_RDWR, 0666); + if (shm_fd < 0) { + LOG_DBGF("Failed opening shared memory."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + shm_base = mmap(NULL, + SHM_FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + shm_fd, + 0); + if (shm_base == MAP_FAILED) { + LOG_DBGF("Failed to map shared memory."); + if (close(shm_fd) == -1) + LOG_DBG("Failed to close invalid shm."); + if (shm_unlink(shm_rdrb_fn) == -1) + LOG_DBG("Failed to unlink invalid shm."); + free(shm_rdrb_fn); + free(rdrb); + return NULL; + } + + rdrb->shm_base = shm_base; + rdrb->ptr_head = (size_t *) + ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); + rdrb->ptr_tail = rdrb->ptr_head + 1; + rdrb->lock = (pthread_mutex_t *) (rdrb->ptr_tail + 1); + rdrb->choked = (size_t *) (rdrb->lock + 1); + rdrb->healthy = (pthread_cond_t *) (rdrb->choked + 1); + rdrb->full = rdrb->healthy + 1; + rdrb->api = (pid_t *) (rdrb->full + 1); + + rdrb->qos = qos; + rdrb->fd = shm_fd; + + free(shm_rdrb_fn); + + return rdrb; +} + +void * shm_rdrbuff_sanitize(void * o) +{ + struct shm_rdrbuff * rdrb = (struct shm_rdrbuff *) o; + struct timespec intv + = {SHM_DU_TIMEOUT_MICROS / MILLION, + (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + + pid_t api; + + if (rdrb == NULL) + return (void *) -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + (void *) rdrb->lock); + + while (true) { + int ret = 0; + struct timespec now; + struct timespec dl; + + if (pthread_cond_wait(rdrb->full, rdrb->lock) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + *rdrb->choked = 1; + + garbage_collect(rdrb); + + if (shm_rdrb_empty(rdrb)) + continue; + + api = get_tail_ptr(rdrb)->dst_api; + + if (kill(api, 0)) { + LOG_DBGF("Dead process %d left stale sdu.", api); + clean_sdus(rdrb, api); + continue; + } + + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &dl); + while (*rdrb->choked) { + ret = pthread_cond_timedwait(rdrb->healthy, + rdrb->lock, + &dl); + if (!ret) + continue; + + if (ret == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + if (ret == ETIMEDOUT) { + LOG_DBGF("SDU timed out (dst: %d).", api); + clean_sdus(rdrb, api); + } + } + } + + pthread_cleanup_pop(true); + + return (void *) 0; +} + +void shm_rdrbuff_close(struct shm_rdrbuff * rdrb) +{ + if (rdrb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (close(rdrb->fd) < 0) + LOG_DBGF("Couldn't close shared memory."); + + if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBGF("Couldn't unmap shared memory."); + + free(rdrb); +} + +void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) +{ + char * shm_rdrb_fn; + + if (rdrb == NULL) { + LOG_DBGF("Bogus input. Bugging out."); + return; + } + + if (getpid() != *rdrb->api && kill(*rdrb->api, 0) == 0) { + LOG_DBG("Process %d tried to destroy active rdrb.", getpid()); + return; + } + + if (close(rdrb->fd) < 0) + LOG_DBG("Couldn't close shared memory."); + + if (munmap(rdrb->shm_base, SHM_FILE_SIZE) == -1) + LOG_DBG("Couldn't unmap shared memory."); + + shm_rdrb_fn = rdrb_filename(rdrb->qos); + if (shm_rdrb_fn == NULL) { + LOG_ERR("Could not create rdrbuff. Out of Memory"); + return; + } + + if (shm_unlink(shm_rdrb_fn) == -1) + LOG_DBG("Failed to unlink shm."); + + free(rdrb); + free(shm_rdrb_fn); +} + +ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) +{ + struct shm_du_buff * sdb; + size_t size = headspace + len + tailspace; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; +#endif + int sz = size + sizeof *sdb; + uint8_t * write_pos; + ssize_t idx = -1; + + if (rdrb == NULL || data == NULL) { + LOG_DBGF("Bogus input, bugging out."); + return -1; + } + +#ifndef SHM_RDRBUFF_MULTI_BLOCK + if (sz > SHM_RDRB_BLOCK_SIZE) { + LOG_DBGF("Multi-block SDU's disabled. Dropping."); + return -1; + } +#endif + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } +#ifdef SHM_RDRBUFF_MULTI_BLOCK + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } + + if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE) + padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head; + + if (!shm_rdrb_free(rdrb, (blocks + padblocks))) { +#else + if (!shm_rdrb_free(rdrb, 1)) { +#endif + pthread_cond_signal(rdrb->full); + pthread_mutex_unlock(rdrb->lock); + return -1; + } + +#ifdef SHM_RDRBUFF_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(rdrb); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = -1; + sdb->du_head = 0; + sdb->du_tail = 0; + + *rdrb->ptr_head = 0; + } +#endif + sdb = get_head_ptr(rdrb); + sdb->size = size; + sdb->dst_api = dst_api; + sdb->du_head = headspace; + sdb->du_tail = sdb->du_head + len; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + sdb->blocks = blocks; +#endif + write_pos = ((uint8_t *) (sdb + 1)) + headspace; + + memcpy(write_pos, data, len); + + idx = *rdrb->ptr_head; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else + *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif + pthread_mutex_unlock(rdrb->lock); + + return idx; +} + +ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) +{ + struct shm_du_buff * sdb; + size_t size = headspace + len + tailspace; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; +#endif + int sz = size + sizeof *sdb; + uint8_t * write_pos; + ssize_t idx = -1; + + if (rdrb == NULL || data == NULL) { + LOG_DBGF("Bogus input, bugging out."); + return -1; + } + +#ifndef SHM_RDRBUFF_MULTI_BLOCK + if (sz > SHM_RDRB_BLOCK_SIZE) { + LOG_DBGF("Multi-block SDU's disabled. Dropping."); + return -1; + } +#endif + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rdrb->lock); + +#ifdef SHM_RDRBUFF_MULTI_BLOCK + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } + + if (blocks + *rdrb->ptr_head > SHM_BUFFER_SIZE) + padblocks = SHM_BUFFER_SIZE - *rdrb->ptr_head; + + while (!shm_rdrb_free(rdrb, (blocks + padblocks))) { +#else + while (!shm_rdrb_free(rdrb, 1)) { +#endif + pthread_cond_signal(rdrb->full); + pthread_cond_wait(rdrb->healthy, rdrb->lock); + } + +#ifdef SHM_RDRBUFF_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(rdrb); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = -1; + sdb->du_head = 0; + sdb->du_tail = 0; + + *rdrb->ptr_head = 0; + } +#endif + sdb = get_head_ptr(rdrb); + sdb->size = size; + sdb->dst_api = dst_api; + sdb->du_head = headspace; + sdb->du_tail = sdb->du_head + len; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + sdb->blocks = blocks; +#endif + write_pos = ((uint8_t *) (sdb + 1)) + headspace; + + memcpy(write_pos, data, len); + + idx = *rdrb->ptr_head; +#ifdef SHM_RDRBUFF_MULTI_BLOCK + *rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else + *rdrb->ptr_head = (*rdrb->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif + pthread_cleanup_pop(true); + + return idx; +} + +int shm_rdrbuff_read(uint8_t ** dst, + struct shm_rdrbuff * rdrb, + ssize_t idx) +{ + size_t len = 0; + struct shm_du_buff * sdb; + + if (idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + if (shm_rdrb_empty(rdrb)) { + pthread_mutex_unlock(rdrb->lock); + return -1; + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + len = sdb->du_tail - sdb->du_head; + *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; + + pthread_mutex_unlock(rdrb->lock); + + return len; +} + +int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx) +{ + if (idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + if (shm_rdrb_empty(rdrb)) { + pthread_mutex_unlock(rdrb->lock); + return -1; + } + + idx_to_du_buff_ptr(rdrb, idx)->dst_api = -1; + + if (idx != *rdrb->ptr_tail) { + pthread_mutex_unlock(rdrb->lock); + return 0; + } + + garbage_collect(rdrb); + + *rdrb->choked = 0; + + pthread_cond_broadcast(rdrb->healthy); + + pthread_mutex_unlock(rdrb->lock); + + return 0; +} + +uint8_t * shm_du_buff_head_alloc(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + uint8_t * buf; + + if (rdrb == NULL) + return NULL; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return NULL; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if ((long) (sdb->du_head - size) < 0) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Failed to allocate PCI headspace."); + return NULL; + } + + sdb->du_head -= size; + + buf = (uint8_t *) (sdb + 1) + sdb->du_head; + + pthread_mutex_unlock(rdrb->lock); + + return buf; +} + +uint8_t * shm_du_buff_tail_alloc(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + uint8_t * buf; + + if (rdrb == NULL) + return NULL; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return NULL; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if (sdb->du_tail + size >= sdb->size) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Failed to allocate PCI tailspace."); + return NULL; + } + + buf = (uint8_t *) (sdb + 1) + sdb->du_tail; + + sdb->du_tail += size; + + pthread_mutex_unlock(rdrb->lock); + + return buf; +} + +int shm_du_buff_head_release(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + + if (rdrb == NULL) + return -1; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if (size > sdb->du_tail - sdb->du_head) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Tried to release beyond sdu boundary."); + return -EOVERFLOW; + } + + sdb->du_head += size; + + pthread_mutex_unlock(rdrb->lock); + + return 0; +} + +int shm_du_buff_tail_release(struct shm_rdrbuff * rdrb, + int idx, + ssize_t size) +{ + struct shm_du_buff * sdb; + + if (rdrb == NULL) + return -1; + + if (idx < 0 || idx > SHM_BUFFER_SIZE) + return -1; + + if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rdrb->lock); + } + + sdb = idx_to_du_buff_ptr(rdrb, idx); + + if (size > sdb->du_tail - sdb->du_head) { + pthread_mutex_unlock(rdrb->lock); + LOG_DBGF("Tried to release beyond sdu boundary."); + return -EOVERFLOW; + } + + sdb->du_tail -= size; + + pthread_mutex_unlock(rdrb->lock); + + return 0; +} |