diff options
author | Sander Vrijders <[email protected]> | 2016-06-16 17:54:48 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-06-16 17:54:48 +0200 |
commit | 1c62c4554cb74e48f676b9d19a49d26eadee72c7 (patch) | |
tree | 8ab671400559eaf5b9bc22deec9ed7370e17635d /src/irmd | |
parent | 4aa4e30c5b4f89da1bab5022424c55f5d8b3f580 (diff) | |
parent | 9513708ddf4eedc03c41a8d07f3dcb6e48f23c0c (diff) | |
download | ouroboros-1c62c4554cb74e48f676b9d19a49d26eadee72c7.tar.gz ouroboros-1c62c4554cb74e48f676b9d19a49d26eadee72c7.zip |
Merged in dstaesse/ouroboros/be-cleanup-flows (pull request #127)
irmd: clean up stale pending flows
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/main.c | 66 |
1 files changed, 60 insertions, 6 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index db96b6ed..c20d63db 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -37,6 +37,7 @@ #include <ouroboros/flow.h> #include <ouroboros/qos.h> #include <ouroboros/rw_lock.h> +#include <ouroboros/time_utils.h> #include "utils.h" @@ -50,13 +51,19 @@ #include <pthread.h> #ifndef IRMD_MAX_FLOWS - #define IRMD_MAX_FLOWS 4096 +#define IRMD_MAX_FLOWS 4096 #endif #ifndef IRMD_THREADPOOL_SIZE - #define IRMD_THREADPOOL_SIZE 3 +#define IRMD_THREADPOOL_SIZE 3 #endif +#ifndef IRMD_FLOW_TIMEOUT +#define IRMD_FLOW_TIMEOUT 5000 /* ms */ +#endif + +#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */ + struct ipcp_entry { struct list_head next; instance_name_t * api; @@ -98,6 +105,7 @@ struct port_map_entry { pthread_mutex_t res_lock; enum flow_state state; + struct timespec t0; }; struct irm { @@ -116,6 +124,8 @@ struct irm { pthread_t * threadpool; int sockfd; rw_lock_t state_lock; + + pthread_t cleanup_flows; } * instance = NULL; static struct port_map_entry * port_map_entry_create() @@ -139,6 +149,9 @@ static struct port_map_entry * port_map_entry_create() return NULL; } + e->t0.tv_sec = 0; + e->t0.tv_nsec = 0; + return e; } @@ -945,6 +958,8 @@ static struct port_map_entry * flow_alloc(pid_t pid, pme->n_pid = pid; pme->state = FLOW_PENDING; + if (clock_gettime(CLOCK_MONOTONIC, &pme->t0) < 0) + LOG_WARN("Failed to set timestamp."); rw_lock_rdlock(&instance->state_lock); rw_lock_rdlock(&instance->reg_lock); @@ -1314,6 +1329,8 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) } + pthread_cancel(instance->cleanup_flows); + rw_lock_unlock(&instance->state_lock); case SIGPIPE: @@ -1323,6 +1340,43 @@ void irmd_sig_handler(int sig, siginfo_t * info, void * c) } } +void * irm_flow_cleaner() +{ + struct timespec now; + struct list_head * pos = NULL; + struct list_head * n = NULL; + struct timespec timeout = {IRMD_CLEANUP_TIMER / BILLION, + IRMD_CLEANUP_TIMER % BILLION}; + + while (true) { + if(clock_gettime(CLOCK_MONOTONIC, &now) < 0) + LOG_WARN("Failed to get time."); + /* cleanup stale PENDING flows */ + rw_lock_rdlock(&instance->state_lock); + + list_for_each_safe(pos, n, &(instance->port_map)) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + pthread_mutex_lock(&e->res_lock); + + if (e->state == FLOW_PENDING && + ts_diff_ms(&e->t0, &now) > IRMD_FLOW_TIMEOUT) { + LOG_DBGF("Flow time exceeded on port ID %d.", + e->port_id); + e->state = FLOW_NULL; + pthread_cond_broadcast(&e->res_signal); + } + + pthread_mutex_unlock(&e->res_lock); + } + + rw_lock_unlock(&instance->state_lock); + + nanosleep(&timeout, NULL); + } +} + void * mainloop() { uint8_t buf[IRM_MSG_BUF_SIZE]; @@ -1579,10 +1633,8 @@ int main() if (instance == NULL) return 1; - /* - * FIXME: we need a main loop that delegates messages to subthreads in a - * way that avoids all possible deadlocks for local apps - */ + + pthread_create(&instance->cleanup_flows, NULL, irm_flow_cleaner, NULL); for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); @@ -1591,6 +1643,8 @@ int main() for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_join(instance->threadpool[t], NULL); + pthread_join(instance->cleanup_flows, NULL); + irm_destroy(instance); return 0; |