summaryrefslogtreecommitdiff
path: root/src/lib/shm_ap_rbuff.c
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-10-11 13:39:43 +0200
committerdimitri staessens <[email protected]>2016-10-11 13:45:02 +0200
commitc69dd846c5aa2bed4db16961c5774a20cea7f828 (patch)
tree7e9eb9f82c4fab120abc11c4b539d4beb4c19982 /src/lib/shm_ap_rbuff.c
parent99356adf207e0fe81a34ee1acfd8cacc3d2860c7 (diff)
downloadouroboros-c69dd846c5aa2bed4db16961c5774a20cea7f828.tar.gz
ouroboros-c69dd846c5aa2bed4db16961c5774a20cea7f828.zip
lib: Track SDUs in the fast path
This will allow to finalize deallocating flows until all SDUs have been processed. Read and write calls will now block when a flow was deallocated. Replaces NULL checks in the fast path with asserts.
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r--src/lib/shm_ap_rbuff.c169
1 files changed, 121 insertions, 48 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index acbc81a6..ede0b7f7 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -39,28 +39,33 @@
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
+#include <assert.h>
#define FN_MAX_CHARS 255
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ + IRMD_MAX_FLOWS * sizeof(int8_t) \
+ + IRMD_MAX_FLOWS * sizeof (ssize_t) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
+ 2 * sizeof (pthread_cond_t))
-#define shm_rbuff_used(rb)((*rb->ptr_head + SHM_BUFFER_SIZE - *rb->ptr_tail) \
+#define shm_rbuff_used(rb)((*rb->head + SHM_BUFFER_SIZE - *rb->tail) \
& (SHM_BUFFER_SIZE - 1))
#define shm_rbuff_free(rb)(shm_rbuff_used(rb) + 1 < SHM_BUFFER_SIZE)
-#define shm_rbuff_empty(rb) (*rb->ptr_head == *rb->ptr_tail)
-#define head_el_ptr(rb) (rb->shm_base + *rb->ptr_head)
-#define tail_el_ptr(rb) (rb->shm_base + *rb->ptr_tail)
+#define shm_rbuff_empty(rb) (*rb->head == *rb->tail)
+#define head_el_ptr(rb) (rb->shm_base + *rb->head)
+#define tail_el_ptr(rb) (rb->shm_base + *rb->tail)
struct shm_ap_rbuff {
- struct rb_entry * shm_base; /* start of entry */
- size_t * ptr_head; /* start of ringbuffer head */
- size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * lock; /* lock all free space in shm */
- pthread_cond_t * add; /* SDU arrived */
- pthread_cond_t * del; /* SDU removed */
- pid_t api; /* api to which this rb belongs */
+ struct rb_entry * shm_base; /* start of entry */
+ size_t * head; /* start of ringbuffer head */
+ size_t * tail; /* start of ringbuffer tail */
+ int8_t * acl; /* start of port_id access table */
+ ssize_t * cntrs; /* start of port_id counters */
+ pthread_mutex_t * lock; /* lock all free space in shm */
+ pthread_cond_t * add; /* SDU arrived */
+ pthread_cond_t * del; /* SDU removed */
+ pid_t api; /* api to which this rb belongs */
int fd;
};
@@ -73,6 +78,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
pthread_condattr_t cattr;
char fn[FN_MAX_CHARS];
mode_t mask;
+ int i;
sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
@@ -125,9 +131,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
}
rb->shm_base = shm_base;
- rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
- rb->ptr_tail = rb->ptr_head + 1;
- rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1);
+ rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
+ rb->tail = rb->head + 1;
+ rb->acl = (int8_t *) (rb->tail + 1);
+ rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS);
+ rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS);
rb->add = (pthread_cond_t *) (rb->lock + 1);
rb->del = rb->add + 1;
@@ -143,11 +151,16 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ rb->cntrs[i] = 0;
+ rb->acl[i] = -1;
+ }
+
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->ptr_head = 0;
- *rb->ptr_tail = 0;
+ *rb->head = 0;
+ *rb->tail = 0;
rb->fd = shm_fd;
rb->api = getpid();
@@ -197,9 +210,11 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
}
rb->shm_base = shm_base;
- rb->ptr_head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
- rb->ptr_tail = rb->ptr_head + 1;
- rb->lock = (pthread_mutex_t *) (rb->ptr_tail + 1);
+ rb->head = (size_t *) (rb->shm_base + SHM_BUFFER_SIZE);
+ rb->tail = rb->head + 1;
+ rb->acl = (int8_t *) (rb->tail + 1);
+ rb->cntrs = (ssize_t *) (rb->acl + IRMD_MAX_FLOWS);
+ rb->lock = (pthread_mutex_t *) (rb->cntrs + IRMD_MAX_FLOWS);
rb->add = (pthread_cond_t *) (rb->lock + 1);
rb->del = rb->add + 1;
@@ -211,10 +226,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
{
- if (rb == NULL) {
- LOG_DBG("Bogus input. Bugging out.");
- return;
- }
+ assert(rb);
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
@@ -225,15 +237,56 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
free(rb);
}
+void shm_ap_rbuff_open_port(struct shm_ap_rbuff * rb, int port_id)
+{
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+
+#ifdef OUROBOROS_CONFIG_DEBUG
+ if (!rb->acl[port_id])
+ LOG_DBG("Trying to open open port.");
+#endif
+ rb->acl[port_id] = 0; /* open */
+
+ pthread_mutex_unlock(rb->lock);
+}
+
+void shm_ap_rbuff_close_port(struct shm_ap_rbuff * rb, int port_id)
+{
+
+ assert(rb);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+#ifdef OUROBOROS_CONFIG_DEBUG
+ if (rb->acl[port_id])
+ LOG_DBG("Trying to close closed port.");
+#endif
+ rb->acl[port_id] = -1;
+
+ pthread_mutex_unlock(rb->lock);
+}
+
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
{
char fn[25];
struct lockfile * lf = NULL;
- if (rb == NULL) {
- LOG_DBG("Bogus input. Bugging out.");
- return;
- }
+ assert(rb);
if (rb->api != getpid()) {
lf = lockfile_open();
@@ -267,8 +320,8 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
{
- if (rb == NULL || e == NULL)
- return -1;
+ assert(rb);
+ assert(e);
#ifdef __APPLE__
pthread_mutex_lock(rb->lock);
@@ -278,6 +331,11 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
pthread_mutex_consistent(rb->lock);
}
#endif
+ if (rb->acl[e->port_id]) {
+ pthread_mutex_unlock(rb->lock);
+ return -ENOTALLOC;
+ }
+
if (!shm_rbuff_free(rb)) {
pthread_mutex_unlock(rb->lock);
return -1;
@@ -287,7 +345,9 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
pthread_cond_broadcast(rb->add);
*head_el_ptr(rb) = *e;
- *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1);
+ *rb->head = (*rb->head + 1) & (SHM_BUFFER_SIZE -1);
+
+ ++rb->cntrs[e->port_id];
pthread_mutex_unlock(rb->lock);
@@ -298,8 +358,8 @@ int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb)
{
int ret = 0;
- if (rb == NULL)
- return -EINVAL;
+ assert(rb);
+
#ifdef __APPLE__
pthread_mutex_lock(rb->lock);
#else
@@ -314,8 +374,8 @@ int shm_ap_rbuff_pop_idx(struct shm_ap_rbuff * rb)
}
ret = tail_el_ptr(rb)->index;
-
- *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_BUFFER_SIZE -1);
+ --rb->cntrs[tail_el_ptr(rb)->port_id];
+ *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
pthread_mutex_unlock(rb->lock);
@@ -328,8 +388,7 @@ static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
struct timespec abstime;
int ret = 0;
- if (rb == NULL)
- return -EINVAL;
+ assert(rb);
if (timeout != NULL) {
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
@@ -380,6 +439,8 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
struct timespec abstime;
int ret;
+ assert(rb);
+
if (set == NULL)
return shm_ap_rbuff_peek_b_all(rb, timeout);
@@ -453,12 +514,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;
- if (rb == NULL)
- return NULL;
+ assert(rb);
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
-
#ifdef __APPLE__
pthread_mutex_lock(rb->lock);
#else
@@ -478,8 +537,9 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
#endif
e = malloc(sizeof(*e));
if (e != NULL) {
- *e = *(rb->shm_base + *rb->ptr_tail);
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+ *e = *(rb->shm_base + *rb->tail);
+ --rb->cntrs[e->port_id];
+ *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
}
pthread_cleanup_pop(true);
@@ -499,14 +559,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
pthread_mutex_consistent(rb->lock);
}
#endif
+ if (rb->acl[port_id]) {
+ pthread_mutex_unlock(rb->lock);
+ return -ENOTALLOC;
+ }
+
if (shm_rbuff_empty(rb) || tail_el_ptr(rb)->port_id != port_id) {
pthread_mutex_unlock(rb->lock);
return -1;
}
idx = tail_el_ptr(rb)->index;
-
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+ --rb->cntrs[port_id];
+ *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
pthread_cond_broadcast(rb->del);
pthread_mutex_unlock(rb->lock);
@@ -522,6 +587,8 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
int ret = 0;
ssize_t idx = -1;
+ assert(rb);
+
#ifdef __APPLE__
pthread_mutex_lock(rb->lock);
#else
@@ -530,7 +597,13 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
pthread_mutex_consistent(rb->lock);
}
#endif
+ if (rb->acl[port_id]) {
+ pthread_mutex_unlock(rb->lock);
+ return -ENOTALLOC;
+ }
+
if (timeout != NULL) {
+ idx = -ETIMEDOUT;
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
ts_add(&abstime, timeout, &abstime);
}
@@ -577,7 +650,8 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
if (ret != ETIMEDOUT) {
idx = tail_el_ptr(rb)->index;
- *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_BUFFER_SIZE -1);
+ --rb->cntrs[port_id];
+ *rb->tail = (*rb->tail + 1) & (SHM_BUFFER_SIZE -1);
pthread_cond_broadcast(rb->del);
}
@@ -589,11 +663,10 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb)
{
- if (rb == NULL)
- return;
+ assert(rb);
pthread_mutex_lock(rb->lock);
- *rb->ptr_tail = 0;
- *rb->ptr_head = 0;
+ *rb->tail = 0;
+ *rb->head = 0;
pthread_mutex_unlock(rb->lock);
}