summaryrefslogtreecommitdiff
path: root/src/lib/shm_du_map.c
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-08-03 13:40:16 +0200
committerdimitri staessens <[email protected]>2016-08-03 13:40:16 +0200
commit4931526cf9b5e40294e043deab856f25bf56c7cf (patch)
treefff3eeadb6eb04edee21340ecdcdfc13da3115b4 /src/lib/shm_du_map.c
parentca494922f3815077efbcd28da3748df38c8a6961 (diff)
downloadouroboros-4931526cf9b5e40294e043deab856f25bf56c7cf.tar.gz
ouroboros-4931526cf9b5e40294e043deab856f25bf56c7cf.zip
lib: Revise blocking I/O
Blocking I/O now uses condition variables in the shared memory instead of busy waiting. Timeouts can be specified. This requires the size of the rbuffs and du_map to be the same, to guarantee that when the shm_du_map is not full, the ap_rbuffs can't be full either. Added the timeout option to the flow for future use.
Diffstat (limited to 'src/lib/shm_du_map.c')
-rw-r--r--src/lib/shm_du_map.c210
1 files changed, 149 insertions, 61 deletions
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index b090bb74..a12ef223 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -40,7 +40,7 @@
#include <ouroboros/logs.h>
-#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE)
+#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE)
#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \
+ sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \
+ sizeof(pid_t))
@@ -59,9 +59,9 @@
#define block_ptr_to_idx(dum, sdb) \
(((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE)
-#define shm_map_used(dum)((*dum->ptr_head + SHM_BLOCKS_IN_MAP - *dum->ptr_tail)\
- & (SHM_BLOCKS_IN_MAP - 1))
-#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP)
+#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\
+ & (SHM_BUFFER_SIZE - 1))
+#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE)
#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head)
@@ -79,7 +79,7 @@ struct shm_du_map {
uint8_t * shm_base; /* start of blocks */
size_t * ptr_head; /* start of ringbuffer head */
size_t * ptr_tail; /* start of ringbuffer tail */
- pthread_mutex_t * shm_mutex; /* lock all free space in shm */
+ pthread_mutex_t * lock; /* lock all free space in shm */
size_t * choked; /* stale sdu detection */
pthread_cond_t * healthy; /* du map is healthy */
pthread_cond_t * full; /* run sanitizer when buffer full */
@@ -94,12 +94,12 @@ static void garbage_collect(struct shm_du_map * dum)
while ((sdb = get_tail_ptr(dum))->dst_api == -1 &&
!shm_map_empty(dum))
*dum->ptr_tail = (*dum->ptr_tail + sdb->blocks)
- & (SHM_BLOCKS_IN_MAP - 1);
+ & (SHM_BUFFER_SIZE - 1);
#else
while (get_tail_ptr(dum)->dst_api == -1 &&
!shm_map_empty(dum))
*dum->ptr_tail =
- (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1);
#endif
}
@@ -114,9 +114,9 @@ static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit)
if (buf->dst_api == api)
buf->dst_api = -1;
#ifdef SHM_DU_MAP_MULTI_BLOCK
- idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1);
+ idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1);
#else
- idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ idx = (idx + 1) & (SHM_BUFFER_SIZE - 1);
#endif
}
@@ -194,8 +194,8 @@ struct shm_du_map * shm_du_map_create()
dum->ptr_head = (size_t *)
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
- dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1);
+ dum->choked = (size_t *) (dum->lock + 1);
dum->healthy = (pthread_cond_t *) (dum->choked + 1);
dum->full = dum->healthy + 1;
dum->api = (pid_t *) (dum->full + 1);
@@ -203,7 +203,7 @@ struct shm_du_map * shm_du_map_create()
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
- pthread_mutex_init(dum->shm_mutex, &mattr);
+ pthread_mutex_init(dum->lock, &mattr);
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
@@ -261,8 +261,8 @@ struct shm_du_map * shm_du_map_open()
dum->ptr_head = (size_t *)
((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE);
dum->ptr_tail = dum->ptr_head + 1;
- dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1);
- dum->choked = (size_t *) (dum->shm_mutex + 1);
+ dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1);
+ dum->choked = (size_t *) (dum->lock + 1);
dum->healthy = (pthread_cond_t *) (dum->choked + 1);
dum->full = dum->healthy + 1;
dum->api = (pid_t *) (dum->full + 1);
@@ -283,23 +283,23 @@ void * shm_du_map_sanitize(void * o)
if (dum == NULL)
return (void *) -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock,
- (void *) dum->shm_mutex);
+ (void *) dum->lock);
while (true) {
int ret = 0;
struct timespec now;
struct timespec dl;
- if (pthread_cond_wait(dum->full, dum->shm_mutex)
+ if (pthread_cond_wait(dum->full, dum->lock)
== EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
*dum->choked = 1;
@@ -321,14 +321,14 @@ void * shm_du_map_sanitize(void * o)
ts_add(&now, &intv, &dl);
while (*dum->choked) {
ret = pthread_cond_timedwait(dum->healthy,
- dum->shm_mutex,
+ dum->lock,
&dl);
if (!ret)
continue;
if (ret == EOWNERDEAD) {
LOG_WARN("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
if (ret == ETIMEDOUT) {
@@ -429,9 +429,9 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
return -1;
}
#endif
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
#ifdef SHM_DU_MAP_MULTI_BLOCK
while (sz > 0) {
@@ -439,15 +439,15 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
++blocks;
}
- if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1)
- padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head;
+ if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1)
+ padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;
if (!shm_map_free(dum, (blocks + padblocks))) {
#else
if (!shm_map_free(dum, 1)) {
#endif
pthread_cond_signal(dum->full);
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return -1;
}
@@ -477,11 +477,99 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
idx = *dum->ptr_head;
#ifdef SHM_DU_MAP_MULTI_BLOCK
- *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
#else
- *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1);
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
#endif
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
+
+ return idx;
+}
+
+ssize_t shm_du_map_write_b(struct shm_du_map * dum,
+ pid_t dst_api,
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
+{
+ struct shm_du_buff * sdb;
+ size_t size = headspace + len + tailspace;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ long blocks = 0;
+ long padblocks = 0;
+ int sz = size + sizeof *sdb;
+#endif
+ uint8_t * write_pos;
+ ssize_t idx = -1;
+
+ if (dum == NULL || data == NULL) {
+ LOG_DBGF("Bogus input, bugging out.");
+ return -1;
+ }
+
+#ifndef SHM_DU_MAP_MULTI_BLOCK
+ if (sz > SHM_DU_BUFF_BLOCK_SIZE) {
+ LOG_DBGF("Multi-block SDU's disabled. Dropping.");
+ return -1;
+ }
+#endif
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(dum->lock);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) dum->lock);
+
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ while (sz > 0) {
+ sz -= SHM_DU_BUFF_BLOCK_SIZE;
+ ++blocks;
+ }
+
+ if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1)
+ padblocks = SHM_BUFFER_SIZE - *dum->ptr_head;
+
+ while (!shm_map_free(dum, (blocks + padblocks))) {
+#else
+ while (!shm_map_free(dum, 1)) {
+#endif
+ pthread_cond_signal(dum->full);
+ pthread_cond_wait(dum->healthy, dum->lock);
+ }
+
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ if (padblocks) {
+ sdb = get_head_ptr(dum);
+ sdb->size = 0;
+ sdb->blocks = padblocks;
+ sdb->dst_api = -1;
+ sdb->du_head = 0;
+ sdb->du_tail = 0;
+
+ *dum->ptr_head = 0;
+ }
+#endif
+ sdb = get_head_ptr(dum);
+ sdb->size = size;
+ sdb->dst_api = dst_api;
+ sdb->du_head = headspace;
+ sdb->du_tail = sdb->du_head + len;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ sdb->blocks = blocks;
+#endif
+ write_pos = ((uint8_t *) (sdb + 1)) + headspace;
+
+ memcpy(write_pos, data, len);
+
+ idx = *dum->ptr_head;
+#ifdef SHM_DU_MAP_MULTI_BLOCK
+ *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
+#else
+ *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1);
+#endif
+ pthread_cleanup_pop(true);
return idx;
}
@@ -493,16 +581,16 @@ int shm_du_map_read(uint8_t ** dst,
size_t len = 0;
struct shm_du_buff * sdb;
- if (idx > SHM_BLOCKS_IN_MAP)
+ if (idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
if (shm_map_empty(dum)) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return -1;
}
@@ -510,30 +598,30 @@ int shm_du_map_read(uint8_t ** dst,
len = sdb->du_tail - sdb->du_head;
*dst = ((uint8_t *) (sdb + 1)) + sdb->du_head;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return len;
}
int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
{
- if (idx > SHM_BLOCKS_IN_MAP)
+ if (idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
if (shm_map_empty(dum)) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return -1;
}
idx_to_du_buff_ptr(dum, idx)->dst_api = -1;
if (idx != *dum->ptr_tail) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}
@@ -541,9 +629,9 @@ int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx)
*dum->choked = 0;
- pthread_cond_signal(dum->healthy);
+ pthread_cond_broadcast(dum->healthy);
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}
@@ -558,18 +646,18 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,
if (dum == NULL)
return NULL;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return NULL;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if ((long) (sdb->du_head - size) < 0) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Failed to allocate PCI headspace.");
return NULL;
}
@@ -578,7 +666,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum,
buf = (uint8_t *) (sdb + 1) + sdb->du_head;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return buf;
}
@@ -593,18 +681,18 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,
if (dum == NULL)
return NULL;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return NULL;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if (sdb->du_tail + size >= sdb->size) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Failed to allocate PCI tailspace.");
return NULL;
}
@@ -613,7 +701,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum,
sdb->du_tail += size;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return buf;
}
@@ -627,25 +715,25 @@ int shm_du_buff_head_release(struct shm_du_map * dum,
if (dum == NULL)
return -1;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if (size > sdb->du_tail - sdb->du_head) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Tried to release beyond sdu boundary.");
return -EOVERFLOW;
}
sdb->du_head += size;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}
@@ -659,25 +747,25 @@ int shm_du_buff_tail_release(struct shm_du_map * dum,
if (dum == NULL)
return -1;
- if (idx < 0 || idx > SHM_BLOCKS_IN_MAP)
+ if (idx < 0 || idx > SHM_BUFFER_SIZE)
return -1;
- if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) {
+ if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) {
LOG_DBGF("Recovering dead mutex.");
- pthread_mutex_consistent(dum->shm_mutex);
+ pthread_mutex_consistent(dum->lock);
}
sdb = idx_to_du_buff_ptr(dum, idx);
if (size > sdb->du_tail - sdb->du_head) {
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
LOG_DBGF("Tried to release beyond sdu boundary.");
return -EOVERFLOW;
}
sdb->du_tail -= size;
- pthread_mutex_unlock(dum->shm_mutex);
+ pthread_mutex_unlock(dum->lock);
return 0;
}