diff options
author | dimitri staessens <[email protected]> | 2017-07-16 11:13:02 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-07-16 11:28:26 +0200 |
commit | d1070fcdf36b32a7cdeefc0ca802a1a8973de827 (patch) | |
tree | 00c3d70d7a483e32a9d14e6d8f31dbb132baf229 /src | |
parent | f02ce7519ed794f22f701f3c1562dfb63bd49d72 (diff) | |
download | ouroboros-d1070fcdf36b32a7cdeefc0ca802a1a8973de827.tar.gz ouroboros-d1070fcdf36b32a7cdeefc0ca802a1a8973de827.zip |
lib: Wait for fqueue loop at cdap_del_flow
The enrollment calls dealloc immediately after cdap_del_flow(), but
the CDAP instance may still have that fd in its fqueue loop.
cdap_del_flow will now wait for an fqueue loop to end before
returning, to make sure the flow is not needed anymore.
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/cdap.c | 142 |
1 files changed, 80 insertions, 62 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c index f0db2419..8ebfbec1 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -55,6 +55,10 @@ struct cdap { flow_set_t * set; fqueue_t * fq; + bool proc; + pthread_mutex_t mtx; + pthread_cond_t cond; + size_t n_flows; struct list_head flows; pthread_rwlock_t flows_lock; @@ -279,6 +283,17 @@ static void cdap_rcvd_destroy(struct cdap * instance) pthread_mutex_unlock(&instance->rcvd_lock); } +static void set_proc(struct cdap * instance, + bool status) +{ + pthread_mutex_lock(&instance->mtx); + + instance->proc = status; + pthread_cond_signal(&instance->cond); + + pthread_mutex_unlock(&instance->mtx); +} + static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; @@ -290,7 +305,9 @@ static void * sdu_reader(void * o) buffer_t data; while (flow_event_wait(instance->set, instance->fq, NULL)) { - int fd = fqueue_next(instance->fq); + int fd; + set_proc(instance, true); + fd = fqueue_next(instance->fq); len = flow_read(fd, buf, BUF_SIZE); if (len < 0) continue; @@ -374,6 +391,7 @@ static void * sdu_reader(void * o) } cdap__free_unpacked(msg, NULL); + set_proc(instance, false); } return (void *) 0; @@ -385,87 +403,77 @@ struct cdap * cdap_create() instance = malloc(sizeof(*instance)); if (instance == NULL) - return NULL; + goto fail_malloc; - if (pthread_rwlock_init(&instance->flows_lock, NULL)) { - free(instance); - return NULL; - } + if (pthread_rwlock_init(&instance->flows_lock, NULL)) + goto fail_flows_lock; - if (pthread_mutex_init(&instance->ids_lock, NULL)) { - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - return NULL; - } + if (pthread_mutex_init(&instance->ids_lock, NULL)) + goto fail_ids_lock; - if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { - pthread_mutex_destroy(&instance->ids_lock); - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - return NULL; - } + if (pthread_mutex_init(&instance->rcvd_lock, NULL)) + goto fail_rcvd_lock; - if (pthread_rwlock_init(&instance->sent_lock, NULL)) { - pthread_mutex_destroy(&instance->rcvd_lock); - pthread_mutex_destroy(&instance->ids_lock); - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - return NULL; - } + if (pthread_rwlock_init(&instance->sent_lock, NULL)) + goto fail_sent_lock; - if (pthread_cond_init(&instance->rcvd_cond, NULL)) { - pthread_rwlock_destroy(&instance->sent_lock); - pthread_mutex_destroy(&instance->rcvd_lock); - pthread_mutex_destroy(&instance->ids_lock); - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - return NULL; - } + if (pthread_cond_init(&instance->rcvd_cond, NULL)) + goto fail_rcvd_cond; + + if (pthread_mutex_init(&instance->mtx, NULL)) + goto fail_mtx; + + if (pthread_cond_init(&instance->cond, NULL)) + goto fail_cond; instance->ids = bmp_create(IDS_SIZE, 0); - if (instance->ids == NULL) { - pthread_cond_destroy(&instance->rcvd_cond); - pthread_rwlock_destroy(&instance->sent_lock); - pthread_mutex_destroy(&instance->rcvd_lock); - pthread_mutex_destroy(&instance->ids_lock); - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - return NULL; - } + if (instance->ids == NULL) + goto fail_bmp_create; instance->set = flow_set_create(); - if (instance->set == NULL) { - bmp_destroy(instance->ids); - pthread_cond_destroy(&instance->rcvd_cond); - pthread_rwlock_destroy(&instance->sent_lock); - pthread_mutex_destroy(&instance->rcvd_lock); - pthread_mutex_destroy(&instance->ids_lock); - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - return NULL; - } + if (instance->set == NULL) + goto fail_set_create; instance->fq = fqueue_create(); - if (instance->fq == NULL) { - flow_set_destroy(instance->set); - bmp_destroy(instance->ids); - pthread_cond_destroy(&instance->rcvd_cond); - pthread_rwlock_destroy(&instance->sent_lock); - pthread_mutex_destroy(&instance->rcvd_lock); - pthread_mutex_destroy(&instance->ids_lock); - pthread_rwlock_destroy(&instance->flows_lock); - free(instance); - } + if (instance->fq == NULL) + goto fail_fqueue_create; instance->n_flows = 0; + instance->proc = false; list_head_init(&instance->flows); list_head_init(&instance->sent); list_head_init(&instance->rcvd); - pthread_create(&instance->reader, NULL, sdu_reader, instance); + if (pthread_create(&instance->reader, NULL, sdu_reader, instance)) + goto fail_pthread_create; return instance; + + fail_pthread_create: + fqueue_destroy(instance->fq); + fail_fqueue_create: + flow_set_destroy(instance->set); + fail_set_create: + bmp_destroy(instance->ids); + fail_bmp_create: + pthread_cond_destroy(&instance->cond); + fail_cond: + pthread_mutex_destroy(&instance->mtx); + fail_mtx: + pthread_cond_destroy(&instance->rcvd_cond); + fail_rcvd_cond: + pthread_rwlock_destroy(&instance->sent_lock); + fail_sent_lock: + pthread_mutex_destroy(&instance->rcvd_lock); + fail_rcvd_lock: + pthread_mutex_destroy(&instance->ids_lock); + fail_ids_lock: + pthread_rwlock_destroy(&instance->flows_lock); + fail_flows_lock: + free(instance); + fail_malloc: + return NULL; } int cdap_destroy(struct cdap * instance) @@ -483,6 +491,9 @@ int cdap_destroy(struct cdap * instance) flow_set_destroy(instance->set); + pthread_cond_destroy(&instance->cond); + pthread_mutex_destroy(&instance->mtx); + pthread_rwlock_wrlock(&instance->flows_lock); list_for_each_safe(p,h, &instance->flows) { @@ -557,8 +568,15 @@ int cdap_del_flow(struct cdap * instance, pthread_rwlock_wrlock(&instance->flows_lock); + pthread_mutex_lock(&instance->mtx); + + while (instance->proc) + pthread_cond_wait(&instance->cond, &instance->mtx); + flow_set_del(instance->set, fd); + pthread_mutex_unlock(&instance->mtx); + list_for_each_safe(p, h, &instance->flows) { struct fd_el * e = list_entry(p, struct fd_el, next); if (e->fd == fd) { |