diff options
author | dimitri staessens <[email protected]> | 2016-06-30 23:14:14 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-07-02 19:11:12 +0200 |
commit | 79475a4742bc28e1737044f2300bcb601e6e10bf (patch) | |
tree | cd79dba391c0ded80125836069d8187a22f7e5f5 /src/lib/shm_ap_rbuff.c | |
parent | d85f211d53a0cb35a756d0c44a2b28807eff4e5d (diff) | |
download | ouroboros-79475a4742bc28e1737044f2300bcb601e6e10bf.tar.gz ouroboros-79475a4742bc28e1737044f2300bcb601e6e10bf.zip |
lib: robust locking in shared memory and crash recovery
This PR enhances the shared memory providing recovery if a process
crashes. It adds a SHM_DU_TIMEOUT_MICROS variable, setting an
expiration time for SDU's when shared memory is full. If an
application doesn't read a blocking SDU within this time, the shared
memory will be cleansed of all SDU's for this application and the
application's rbuff will be cleared.
Some refactoring of the API's. Fixed wrong pthread checks in IRMd.
Fixes #13
Fixes #14
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 70 |
1 files changed, 63 insertions, 7 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 69e96c40..f54627b7 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -26,6 +26,7 @@ #include <ouroboros/logs.h> #include <ouroboros/shm_ap_rbuff.h> +#include <ouroboros/shm_du_map.h> #include <pthread.h> #include <sys/mman.h> @@ -34,7 +35,6 @@ #include <string.h> #include <stdint.h> #include <unistd.h> -#include <stdbool.h> #include <errno.h> #include <sys/stat.h> @@ -127,6 +127,7 @@ struct shm_ap_rbuff * shm_ap_rbuff_create() rb->work = (pthread_cond_t *) (rb->shm_mutex + 1); pthread_mutexattr_init(&mattr); + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(rb->shm_mutex, &mattr); @@ -213,6 +214,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb) void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) { char fn[25]; + struct shm_du_map * dum = NULL; if (rb == NULL) { LOG_DBGF("Bogus input. Bugging out."); @@ -220,8 +222,17 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb) } if (rb->api != getpid()) { - LOG_ERR("Tried to destroy other AP's rbuff."); - return; + dum = shm_du_map_open(); + if (shm_du_map_owner(dum) == getpid()) { + LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.", + rb->api, getpid()); + shm_du_map_close(dum); + } else { + LOG_ERR("AP-I %d tried to destroy rbuff owned by %d.", + getpid(), rb->api); + shm_du_map_close(dum); + return; + } } if (close(rb->fd) < 0) @@ -243,12 +254,16 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e) if (rb == NULL || e == NULL) return -1; - pthread_mutex_lock(rb->shm_mutex); + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } if (!shm_rbuff_free(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } + if (shm_rbuff_empty(rb)) pthread_cond_broadcast(rb->work); @@ -269,10 +284,21 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->shm_mutex); - pthread_mutex_lock(rb->shm_mutex); + + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } + + while (tail_el_ptr->port_id < 0) + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); while(shm_rbuff_empty(rb)) - pthread_cond_wait(rb->work, rb->shm_mutex); + if (pthread_cond_wait(rb->work, rb->shm_mutex) + == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } e = malloc(sizeof(*e)); if (e == NULL) { @@ -293,13 +319,19 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) { ssize_t idx = -1; - pthread_mutex_lock(rb->shm_mutex); + if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) { + LOG_DBGF("Recovering dead mutex."); + pthread_mutex_consistent(rb->shm_mutex); + } if (shm_rbuff_empty(rb)) { pthread_mutex_unlock(rb->shm_mutex); return -1; } + while (tail_el_ptr->port_id < 0) + *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); + if (tail_el_ptr->port_id != port_id) { pthread_mutex_unlock(rb->shm_mutex); return -1; @@ -313,3 +345,27 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } + +pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb) +{ + pid_t api = 0; + if (rb == NULL) + return 0; + + pthread_mutex_lock(rb->shm_mutex); + api = rb->api; + pthread_mutex_unlock(rb->shm_mutex); + + return api; +} + +void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb) +{ + if (rb == NULL) + return; + + pthread_mutex_lock(rb->shm_mutex); + *rb->ptr_tail = 0; + *rb->ptr_head = 0; + pthread_mutex_unlock(rb->shm_mutex); +} |