diff options
author | dimitri staessens <[email protected]> | 2016-06-30 23:14:14 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-07-02 19:11:12 +0200 |
commit | 79475a4742bc28e1737044f2300bcb601e6e10bf (patch) | |
tree | cd79dba391c0ded80125836069d8187a22f7e5f5 /src/lib/shm_du_map.c | |
parent | d85f211d53a0cb35a756d0c44a2b28807eff4e5d (diff) | |
download | ouroboros-79475a4742bc28e1737044f2300bcb601e6e10bf.tar.gz ouroboros-79475a4742bc28e1737044f2300bcb601e6e10bf.zip |
lib: robust locking in shared memory and crash recovery
This PR enhances the shared memory providing recovery if a process
crashes. It adds a SHM_DU_TIMEOUT_MICROS variable, setting an
expiration time for SDU's when shared memory is full. If an
application doesn't read a blocking SDU within this time, the shared
memory will be cleansed of all SDU's for this application and the
application's rbuff will be cleared.
Some refactoring of the API's. Fixed wrong pthread checks in IRMd.
Fixes #13
Fixes #14
Diffstat (limited to 'src/lib/shm_du_map.c')
-rw-r--r-- | src/lib/shm_du_map.c | 257 |
1 files changed, 194 insertions, 63 deletions
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c index 2a316265..24adac1a 100644 --- a/src/lib/shm_du_map.c +++ b/src/lib/shm_du_map.c @@ -23,11 +23,14 @@ #include <ouroboros/config.h> #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/time_utils.h> #include <pthread.h> #include <sys/mman.h> #include <fcntl.h> #include <stdlib.h> #include <string.h> +#include <signal.h> #include <sys/stat.h> #define OUROBOROS_PREFIX "shm_du_map" @@ -35,8 +38,8 @@ #include <ouroboros/logs.h> #define SHM_BLOCKS_SIZE (SHM_BLOCKS_IN_MAP * SHM_DU_BUFF_BLOCK_SIZE) -#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof (size_t) \ - + sizeof(pthread_mutex_t) + sizeof(pthread_cond_t) \ +#define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 3 * sizeof (size_t) \ + + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) #define get_head_ptr(dum) \ @@ -57,6 +60,8 @@ & (SHM_BLOCKS_IN_MAP - 1)) #define shm_map_free(dum, i)(shm_map_used(dum) + i < SHM_BLOCKS_IN_MAP) +#define shm_map_empty(dum) (*dum->ptr_tail == *dum->ptr_head) + #define sdu_size(dum, idx) (idx_to_du_buff_ptr(dum, idx)->du_tail - \ idx_to_du_buff_ptr(dum, idx)->du_head) @@ -66,7 +71,7 @@ struct shm_du_buff { size_t size; size_t du_head; size_t du_tail; - size_t garbage; + pid_t dst_api; }; struct shm_du_map { @@ -74,11 +79,80 @@ struct shm_du_map { size_t * ptr_head; /* start of ringbuffer head */ size_t * ptr_tail; /* start of ringbuffer tail */ pthread_mutex_t * shm_mutex; /* lock all free space in shm */ - pthread_cond_t * sanitize; /* run sanitizer when buffer full */ + size_t * choked; /* stale sdu detection */ + pthread_cond_t * healthy; /* du map is healthy */ + pthread_cond_t * full; /* run sanitizer when buffer full */ pid_t * api; /* api of the irmd owner */ int fd; }; +static void garbage_collect(struct shm_du_map * dum) +{ +#ifndef SHM_MAP_SINGLE_BLOCK + long sz; + long blocks; +#endif + while (get_tail_ptr(dum)->dst_api == 0 && + !shm_map_empty(dum)) { +#ifndef SHM_MAP_SINGLE_BLOCK + blocks = 0; + sz = get_tail_ptr(dum)->size + + (long) sizeof(struct shm_du_buff); + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + *dum->ptr_tail = + (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); + +#else + *dum->ptr_tail = + (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif + } +} + +static void clean_sdus(struct shm_du_map * dum, pid_t api) +{ + size_t idx = *dum->ptr_tail; + struct shm_du_buff * buf; +#ifndef SHM_DU_MAP_SINGLE_BLOCK + long sz; + long blocks = 0; +#endif + + while (idx != *dum->ptr_head) { + buf = idx_to_du_buff_ptr(dum, idx); + if (buf->dst_api == api) + buf->dst_api = 0; + +#ifndef SHM_DU_MAP_SINGLE_BLOCK + blocks = 0; + sz = get_tail_ptr(dum)->size + (long) sizeof(struct shm_du_buff); + while (sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + ++blocks; + } + + idx = (idx + blocks) & (SHM_BLOCKS_IN_MAP - 1); +#else + idx = (idx + 1) & (SHM_BLOCKS_IN_MAP - 1); +#endif + } + + garbage_collect(dum); + + if (kill(api, 0) == 0) { + struct shm_ap_rbuff * rb; + rb = shm_ap_rbuff_open(api); + shm_ap_rbuff_reset(rb); + shm_ap_rbuff_close(rb); + } + + *dum->choked = 0; +} + struct shm_du_map * shm_du_map_create() { struct shm_du_map * dum; @@ -140,8 +214,10 @@ struct shm_du_map * shm_du_map_create() ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); - dum->api = (pid_t *) (dum->sanitize + 1); + dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->healthy = (pthread_cond_t *) (dum->choked + 1); + dum->full = dum->healthy + 1; + dum->api = (pid_t *) (dum->full + 1); pthread_mutexattr_init(&mattr); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); @@ -150,11 +226,14 @@ struct shm_du_map * shm_du_map_create() pthread_condattr_init(&cattr); pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(dum->sanitize, &cattr); + pthread_cond_init(dum->full, &cattr); + pthread_cond_init(dum->healthy, &cattr); *dum->ptr_head = 0; *dum->ptr_tail = 0; + *dum->choked = 0; + *dum->api = getpid(); dum->fd = shm_fd; @@ -202,8 +281,10 @@ struct shm_du_map * shm_du_map_open() ((uint8_t *) dum->shm_base + SHM_BLOCKS_SIZE); dum->ptr_tail = dum->ptr_head + 1; dum->shm_mutex = (pthread_mutex_t *) (dum->ptr_tail + 1); - dum->sanitize = (pthread_cond_t *) (dum->shm_mutex + 1); - dum->api = (pid_t *) (dum->sanitize + 1); + dum->choked = (size_t *) (dum->shm_mutex + 1); + dum->healthy = (pthread_cond_t *) (dum->choked + 1); + dum->full = dum->healthy + 1; + dum->api = (pid_t *) (dum->full + 1); dum->fd = shm_fd; @@ -212,12 +293,73 @@ struct shm_du_map * shm_du_map_open() pid_t shm_du_map_owner(struct shm_du_map * dum) { + if (dum == NULL) + return 0; + return *dum->api; } void * shm_du_map_sanitize(void * o) { - LOG_MISSING; + struct shm_du_map * dum = (struct shm_du_map *) o; + struct timespec intv + = {SHM_DU_TIMEOUT_MICROS / MILLION, + (SHM_DU_TIMEOUT_MICROS % MILLION) * 1000}; + + pid_t api; + + if (dum == NULL) + return (void *) -1; + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, + (void *) dum->shm_mutex); + + while (true) { + int ret = 0; + if (pthread_cond_wait(dum->full, dum->shm_mutex) + == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + *dum->choked = 1; + + api = get_tail_ptr(dum)->dst_api; + + if (kill(api, 0) == 0) { + struct timespec now; + struct timespec dl; + clock_gettime(CLOCK_REALTIME, &now); + ts_add(&now, &intv, &dl); + while (*dum->choked) { + ret = pthread_cond_timedwait(dum->healthy, + dum->shm_mutex, + &dl); + if (!ret) + continue; + + if (ret == EOWNERDEAD) { + LOG_WARN("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } + + if (ret == ETIMEDOUT) { + LOG_DBGF("SDU timed out."); + clean_sdus(dum, api); + } + } + } else { + LOG_DBGF("Dead process %d left stale sdu. sg %d", api,ret); + clean_sdus(dum, api); + } + } + + pthread_cleanup_pop(true); + return (void *) 0; } @@ -256,21 +398,23 @@ void shm_du_map_destroy(struct shm_du_map * dum) free(dum); } -ssize_t shm_create_du_buff(struct shm_du_map * dum, - size_t size, - size_t headspace, - uint8_t * data, - size_t len) +ssize_t shm_du_map_write(struct shm_du_map * dum, + pid_t dst_api, + size_t headspace, + size_t tailspace, + uint8_t * data, + size_t len) { struct shm_du_buff * sdb; #ifndef SHM_MAP_SINGLE_BLOCK long blocks = 0; - int sz = size + sizeof *sdb; - int sz2 = headspace + len + sizeof *sdb; + size_t size = headspace + len + tailspace; + int sz = headspace + len + sizeof *sdb; + int sz2 = sz + tailspace; size_t copy_len; #endif uint8_t * write_pos; - ssize_t index; + ssize_t index = -1; if (dum == NULL || data == NULL) { LOG_DBGF("Bogus input, bugging out."); @@ -294,36 +438,35 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, return -1; } - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } #ifndef SHM_MAP_SINGLE_BLOCK - while (sz > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; + while (sz2 > 0) { sz2 -= SHM_DU_BUFF_BLOCK_SIZE; - if (sz2 < 0 && sz > 0) { + sz -= SHM_DU_BUFF_BLOCK_SIZE; + if (sz < 0 && sz2 > 0) { pthread_mutex_unlock(dum->shm_mutex); - LOG_DBG("Can't handle this packet now"); - return -1; + LOG_DBG("Can't handle this packet now."); + return -EAGAIN; } ++blocks; } if (!shm_map_free(dum, blocks)) { - pthread_mutex_unlock(dum->shm_mutex); - pthread_cond_signal(dum->sanitize); - return -1; - } #else if (!shm_map_free(dum, 1)) { +#endif + pthread_cond_signal(dum->full); pthread_mutex_unlock(dum->shm_mutex); - ptrhead_cond_signal(dum->sanitize); return -1; } -#endif sdb = get_head_ptr(dum); sdb->size = size; - sdb->garbage = 0; + sdb->dst_api = dst_api; sdb->du_head = headspace; sdb->du_tail = sdb->du_head + len; @@ -333,7 +476,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE - headspace - sizeof *sdb); while (blocks > 0) { memcpy(write_pos, data, copy_len); - *(dum->ptr_head) = (*dum->ptr_head + 1) + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); len -= copy_len; copy_len = MIN(len, SHM_DU_BUFF_BLOCK_SIZE); @@ -346,7 +489,7 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, #else memcpy(write_pos, data, len); index = *dum->ptr_head; - *(dum->ptr_head) = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); + *dum->ptr_head = (*dum->ptr_head + 1) & (SHM_BLOCKS_IN_MAP - 1); #endif pthread_mutex_unlock(dum->shm_mutex); @@ -354,18 +497,21 @@ ssize_t shm_create_du_buff(struct shm_du_map * dum, } /* FIXME: this cannot handle packets stretching beyond the ringbuffer border */ -int shm_du_map_read_sdu(uint8_t ** dst, - struct shm_du_map * dum, - ssize_t idx) +int shm_du_map_read(uint8_t ** dst, + struct shm_du_map * dum, + ssize_t idx) { size_t len = 0; if (idx > SHM_BLOCKS_IN_MAP) return -1; - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } - if (*dum->ptr_head == *dum->ptr_tail) { + if (shm_map_empty(dum)) { pthread_mutex_unlock(dum->shm_mutex); return -1; } @@ -380,47 +526,32 @@ int shm_du_map_read_sdu(uint8_t ** dst, return len; } -int shm_release_du_buff(struct shm_du_map * dum, ssize_t idx) +int shm_du_map_remove(struct shm_du_map * dum, ssize_t idx) { -#ifndef SHM_MAP_SINGLE_BLOCK - long sz; - long blocks = 0; -#endif if (idx > SHM_BLOCKS_IN_MAP) return -1; - pthread_mutex_lock(dum->shm_mutex); + if (pthread_mutex_lock(dum->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(dum->shm_mutex); + } - if (*dum->ptr_head == *dum->ptr_tail) { + if (shm_map_empty(dum)) { pthread_mutex_unlock(dum->shm_mutex); return -1; } - idx_to_du_buff_ptr(dum, idx)->garbage = 1; + idx_to_du_buff_ptr(dum, idx)->dst_api = 0; if (idx != *dum->ptr_tail) { pthread_mutex_unlock(dum->shm_mutex); return 0; } - while (get_tail_ptr(dum)->garbage == 1 && - *dum->ptr_tail != *dum->ptr_head) { -#ifndef SHM_MAP_SINGLE_BLOCK - sz = get_tail_ptr(dum)->size; - while (sz + (long) sizeof(struct shm_du_buff) > 0) { - sz -= SHM_DU_BUFF_BLOCK_SIZE; - ++blocks; - } + garbage_collect(dum); - *(dum->ptr_tail) = - (*dum->ptr_tail + blocks) & (SHM_BLOCKS_IN_MAP - 1); - - blocks = 0; -#else - *(dum->ptr_tail) = - (*dum->ptr_tail + 1) & (SHM_BLOCKS_IN_MAP - 1); -#endif - } + *dum->choked = 0; + pthread_cond_signal(dum->healthy); pthread_mutex_unlock(dum->shm_mutex); |