From 4931526cf9b5e40294e043deab856f25bf56c7cf Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 3 Aug 2016 13:40:16 +0200 Subject: lib: Revise blocking I/O Blocking I/O now uses condition variables in the shared memory instead of busy waiting. Timeouts can be specified. This requires the size of the rbuffs and du_map to be the same, to guarantee that when the shm_du_map is not full, the ap_rbuffs can't be full either. Added the timeout option to the flow for future use. --- src/lib/shm_du_map.c | 210 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 149 insertions(+), 61 deletions(-) (limited to 'src/lib/shm_du_map.c') diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index b090bb74..a12ef223 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -40,7 +40,7 @@ #include -#define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) +#define SHM_BLOCKS_SIZE (SHM_BUFFER_SIZE * SHM_DU_BUFF_BLOCK_SIZE) #define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) @@ -59,9 +59,9 @@ #define block_ptr_to_idx(dum, sdb) \ (((uint8_t *)sdb - dum->shm_base) / SHM_DU_BUFF_BLOCK_SIZE) -#define shm_map_used(dum)((*dum->ptr_head + SHM_BLOCKS_IN_MAP - *dum->ptr_tail)\ - & (SHM_BLOCKS_IN_MAP - 1)) -#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_used(dum)((*dum->ptr_head + SHM_BUFFER_SIZE - *dum->ptr_tail)\ + & (SHM_BUFFER_SIZE - 1)) +#define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BUFFER_SIZE) #define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) @@ -79,7 +79,7 @@ struct shm_du_map { uint8_t * shm_base; /* start of blocks */ size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ - pthread_mutex_t * shm_mutex; /* lock all free space in shm */ + pthread_mutex_t * lock; /* lock all free space in shm */ size_t * choked; /* stale sdu detection */ pthread_cond_t * healthy; /* du map is healthy */ pthread_cond_t * full; /* run sanitizer when buffer full */ @@ -94,12 +94,12 @@ static void garbage_collect(struct shm_du_map * dum) while ((sdb = get_tail_ptr(dum))->dst_api == -1 && !shm_map_empty(dum)) *dum->ptr_tail = (*dum->ptr_tail + sdb->blocks) - & (SHM_BLOCKS_IN_MAP - 1); + & (SHM_BUFFER_SIZE - 1); #else while (get_tail_ptr(dum)->dst_api == -1 && !shm_map_empty(dum)) *dum->ptr_tail = - (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); + (*dum->ptr_tail + 1) & (SHM_BUFFER_SIZE - 1); #endif } @@ -114,9 +114,9 @@ static void clean_sdus(struct shm_du_map * dum, pid_t api, bool exit) if (buf->dst_api == api) buf->dst_api = -1; #ifdef SHM_DU_MAP_MULTI_BLOCK - idx = (idx + buf->blocks) & (SHM_BLOCKS_IN_MAP - 1); + idx = (idx + buf->blocks) & (SHM_BUFFER_SIZE - 1); #else - idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); + idx = (idx + 1) & (SHM_BUFFER_SIZE - 1); #endif } @@ -194,8 +194,8 @@ struct shm_du_map * shm_du_map_create() dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; - dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); + dum->choked = (size_t *) (dum->lock + 1); dum->healthy = (pthread_cond_t *) (dum->choked + 1); dum->full = dum->healthy + 1; dum->api = (pid_t *) (dum->full + 1); @@ -203,7 +203,7 @@ struct shm_du_map * shm_du_map_create() pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); - pthread_mutex_init(dum->shm_mutex, &mattr); + pthread_mutex_init(dum->lock, &mattr); pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); @@ -261,8 +261,8 @@ struct shm_du_map * shm_du_map_open() dum->ptr_head = (size_t *) ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; - dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->lock = (pthread_mutex_t *) (dum->ptr_tail + 1); + dum->choked = (size_t *) (dum->lock + 1); dum->healthy = (pthread_cond_t *) (dum->choked + 1); dum->full = dum->healthy + 1; dum->api = (pid_t *) (dum->full + 1); @@ -283,23 +283,23 @@ void * shm_du_map_sanitize(void * o) if (dum == NULL) return (void *) -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, - (void *) dum->shm_mutex); + (void *) dum->lock); while (true) { int ret = 0; struct timespec now; struct timespec dl; - if (pthread_cond_wait(dum->full, dum->shm_mutex) + if (pthread_cond_wait(dum->full, dum->lock) == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } *dum->choked = 1; @@ -321,14 +321,14 @@ void * shm_du_map_sanitize(void * o) ts_add(&now, &intv, &dl); while (*dum->choked) { ret = pthread_cond_timedwait(dum->healthy, - dum->shm_mutex, + dum->lock, &dl); if (!ret) continue; if (ret == EOWNERDEAD) { LOG_WARN("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } if (ret == ETIMEDOUT) { @@ -429,9 +429,9 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, return -1; } #endif - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } #ifdef SHM_DU_MAP_MULTI_BLOCK while (sz > 0) { @@ -439,15 +439,15 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, ++blocks; } - if (blocks + *dum->ptr_head > SHM_BLOCKS_IN_MAP - 1) - padblocks = SHM_BLOCKS_IN_MAP - *dum->ptr_head; + if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1) + padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; if (!shm_map_free(dum, (blocks + padblocks))) { #else if (!shm_map_free(dum, 1)) { #endif pthread_cond_signal(dum->full); - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return -1; } @@ -477,11 +477,99 @@ ssize_t shm_du_map_write(struct shm_du_map * dum, idx = *dum->ptr_head; #ifdef SHM_DU_MAP_MULTI_BLOCK - *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); #else - *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); #endif - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); + + return idx; +} + +ssize_t shm_du_map_write_b(struct shm_du_map * dum, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) +{ + struct shm_du_buff * sdb; + size_t size = headspace + len + tailspace; +#ifdef SHM_DU_MAP_MULTI_BLOCK + long blocks = 0; + long padblocks = 0; + int sz = size + sizeof *sdb; +#endif + uint8_t * write_pos; + ssize_t idx = -1; + + if (dum == NULL || data == NULL) { + LOG_DBGF("Bogus input, bugging out."); + return -1; + } + +#ifndef SHM_DU_MAP_MULTI_BLOCK + if (sz > SHM_DU_BUFF_BLOCK_SIZE) { + LOG_DBGF("Multi-block SDU's disabled. Dropping."); + return -1; + } +#endif + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->lock); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) dum->lock); + +#ifdef SHM_DU_MAP_MULTI_BLOCK + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + if (blocks + *dum->ptr_head > SHM_BUFFER_SIZE - 1) + padblocks = SHM_BUFFER_SIZE - *dum->ptr_head; + + while (!shm_map_free(dum, (blocks + padblocks))) { +#else + while (!shm_map_free(dum, 1)) { +#endif + pthread_cond_signal(dum->full); + pthread_cond_wait(dum->healthy, dum->lock); + } + +#ifdef SHM_DU_MAP_MULTI_BLOCK + if (padblocks) { + sdb = get_head_ptr(dum); + sdb->size = 0; + sdb->blocks = padblocks; + sdb->dst_api = -1; + sdb->du_head = 0; + sdb->du_tail = 0; + + *dum->ptr_head = 0; + } +#endif + sdb = get_head_ptr(dum); + sdb->size = size; + sdb->dst_api = dst_api; + sdb->du_head = headspace; + sdb->du_tail = sdb->du_head + len; +#ifdef SHM_DU_MAP_MULTI_BLOCK + sdb->blocks = blocks; +#endif + write_pos = ((uint8_t *) (sdb + 1)) + headspace; + + memcpy(write_pos, data, len); + + idx = *dum->ptr_head; +#ifdef SHM_DU_MAP_MULTI_BLOCK + *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1); +#else + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BUFFER_SIZE - 1); +#endif + pthread_cleanup_pop(true); return idx; } @@ -493,16 +581,16 @@ int shm_du_map_read(uint8_t ** dst, size_t len = 0; struct shm_du_buff * sdb; - if (idx > SHM_BLOCKS_IN_MAP) + if (idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return -1; } @@ -510,30 +598,30 @@ int shm_du_map_read(uint8_t ** dst, len = sdb->du_tail - sdb->du_head; *dst = ((uint8_t *) (sdb + 1)) + sdb->du_head; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return len; } int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) { - if (idx > SHM_BLOCKS_IN_MAP) + if (idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } if (shm_map_empty(dum)) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return -1; } idx_to_du_buff_ptr(dum, idx)->dst_api = -1; if (idx != *dum->ptr_tail) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } @@ -541,9 +629,9 @@ int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) *dum->choked = 0; - pthread_cond_signal(dum->healthy); + pthread_cond_broadcast(dum->healthy); - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } @@ -558,18 +646,18 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, if (dum == NULL) return NULL; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return NULL; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if ((long) (sdb->du_head - size) < 0) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Failed to allocate PCI headspace."); return NULL; } @@ -578,7 +666,7 @@ uint8_t * shm_du_buff_head_alloc(struct shm_du_map * dum, buf = (uint8_t *) (sdb + 1) + sdb->du_head; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return buf; } @@ -593,18 +681,18 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, if (dum == NULL) return NULL; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return NULL; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if (sdb->du_tail + size >= sdb->size) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Failed to allocate PCI tailspace."); return NULL; } @@ -613,7 +701,7 @@ uint8_t * shm_du_buff_tail_alloc(struct shm_du_map * dum, sdb->du_tail += size; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return buf; } @@ -627,25 +715,25 @@ int shm_du_buff_head_release(struct shm_du_map * dum, if (dum == NULL) return -1; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Tried to release beyond sdu boundary."); return -EOVERFLOW; } sdb->du_head += size; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } @@ -659,25 +747,25 @@ int shm_du_buff_tail_release(struct shm_du_map * dum, if (dum == NULL) return -1; - if (idx < 0 || idx > SHM_BLOCKS_IN_MAP) + if (idx < 0 || idx > SHM_BUFFER_SIZE) return -1; - if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + if (pthread_mutex_lock(dum->lock) == EOWNERDEAD) { LOG_DBGF("Recovering dead mutex."); - pthread_mutex_consistent(dum->shm_mutex); + pthread_mutex_consistent(dum->lock); } sdb = idx_to_du_buff_ptr(dum, idx); if (size > sdb->du_tail - sdb->du_head) { - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); LOG_DBGF("Tried to release beyond sdu boundary."); return -EOVERFLOW; } sdb->du_tail -= size; - pthread_mutex_unlock(dum->shm_mutex); + pthread_mutex_unlock(dum->lock); return 0; } -- cgit v1.2.3