diff options
author | dimitri staessens <[email protected]> | 2017-09-30 17:58:18 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-09-30 17:58:18 +0200 |
commit | 9405ad97e20686f74c06bcbac9523a8b4f10272e (patch) | |
tree | a0489929634ee7588de3ad77a6a1166ce11508e2 /src/ipcpd | |
parent | 5e974395fadc5e1922f200855c14ca0538ba50dc (diff) | |
download | ouroboros-9405ad97e20686f74c06bcbac9523a8b4f10272e.tar.gz ouroboros-9405ad97e20686f74c06bcbac9523a8b4f10272e.zip |
lib: Cancel tpm threads instead of marking exit
This makes the threadpool use pthread_cancel instead of setting an
exit flag that threadpool managed threads check periodically. This
drastically reduces CPU consumption in the irmd when running a lot of
applications. It requires cancellation handlers in the ipcp and irmd
to be implemented to ensure safe cancellation during operation and
shutdown.
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/ipcp.c | 59 | ||||
-rw-r--r-- | src/ipcpd/normal/dht.c | 113 |
2 files changed, 94 insertions, 78 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 513c638a..9f4d9eea 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -159,6 +159,16 @@ static void * acceptloop(void * o) return (void *) 0; } +static void close_ptr(void * o) +{ + close(*((int *) o)); +} + +static void free_msg(void * o) +{ + ipcp_msg__free_unpacked((ipcp_msg_t *) o, NULL); +} + static void * mainloop(void * o) { int sfd; @@ -167,14 +177,10 @@ static void * mainloop(void * o) struct dif_info info; ipcp_config_msg_t * conf_msg; ipcp_msg_t * msg; - struct timespec dl; - struct timespec to = {(IPCP_ACCEPT_TIMEOUT / 1000), - (IPCP_ACCEPT_TIMEOUT % 1000) * MILLION}; (void) o; while (true) { - int ret = 0; ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_info_msg_t dif_info = DIF_INFO_MSG__INIT; int fd = -1; @@ -182,27 +188,18 @@ static void * mainloop(void * o) ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; - clock_gettime(PTHREAD_COND_CLOCK, &dl); - ts_add(&dl, &to, &dl); - pthread_mutex_lock(&ipcpi.cmd_lock); - while (list_is_empty(&ipcpi.cmds) && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&ipcpi.cmd_cond, - &ipcpi.cmd_lock, - &dl); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &ipcpi.cmd_lock); - if (ret == -ETIMEDOUT) { - pthread_mutex_unlock(&ipcpi.cmd_lock); - if (tpm_check(ipcpi.tpm)) - break; - continue; - } + while (list_is_empty(&ipcpi.cmds)) + pthread_cond_wait(&ipcpi.cmd_cond, &ipcpi.cmd_lock); cmd = list_last_entry(&ipcpi.cmds, struct cmd, next); list_del(&cmd->next); - pthread_mutex_unlock(&ipcpi.cmd_lock); + pthread_cleanup_pop(true); msg = ipcp_msg__unpack(NULL, cmd->len, cmd->cbuf); sfd = cmd->fd; @@ -216,6 +213,9 @@ static void * mainloop(void * o) tpm_dec(ipcpi.tpm); + pthread_cleanup_push(close_ptr, &sfd); + pthread_cleanup_push(free_msg, msg); + switch (msg->code) { case IPCP_MSG_CODE__IPCP_BOOTSTRAP: ret_msg.has_result = true; @@ -482,7 +482,8 @@ static void * mainloop(void * o) break; } - ipcp_msg__free_unpacked(msg, NULL); + pthread_cleanup_pop(true); + pthread_cleanup_pop(false); buffer.len = ipcp_msg__get_packed_size(&ret_msg); if (buffer.len == 0) { @@ -502,22 +503,17 @@ static void * mainloop(void * o) ipcp_msg__pack(&ret_msg, buffer.data); - if (write(sfd, buffer.data, buffer.len) == -1) { - log_err("Failed to send reply message"); - free(buffer.data); - close(sfd); - tpm_inc(ipcpi.tpm); - continue; - } + pthread_cleanup_push(close_ptr, &sfd); + + if (write(sfd, buffer.data, buffer.len) == -1) + log_warn("Failed to send reply message"); free(buffer.data); - close(sfd); + pthread_cleanup_pop(true); tpm_inc(ipcpi.tpm); } - tpm_exit(ipcpi.tpm); - return (void *) 0; } @@ -778,6 +774,9 @@ int ipcp_wait_state(enum ipcp_state state, pthread_mutex_lock(&ipcpi.state_mtx); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &ipcpi.state_mtx); + while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN && ipcpi.state != IPCP_NULL @@ -791,7 +790,7 @@ int ipcp_wait_state(enum ipcp_state state, &abstime); } - pthread_mutex_unlock(&ipcpi.state_mtx); + pthread_cleanup_pop(true); return ret; } diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index b06c4480..7ca555ab 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -288,13 +288,16 @@ static int dht_wait_running(struct dht * dht) pthread_mutex_lock(&dht->mtx); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &dht->mtx); + while (dht->state == DHT_JOINING) pthread_cond_wait(&dht->cond, &dht->mtx); if (dht->state != DHT_RUNNING) ret = -1; - pthread_mutex_unlock(&dht->mtx); + pthread_cleanup_pop(true); return ret; } @@ -379,6 +382,21 @@ static void kad_req_create(struct dht * dht, pthread_rwlock_unlock(&dht->lock); } +static void cancel_req_destroy(void * o) +{ + struct kad_req * req = (struct kad_req *) o; + + pthread_mutex_unlock(&req->lock); + + pthread_cond_destroy(&req->cond); + pthread_mutex_destroy(&req->lock); + + if (req->key != NULL) + free(req->key); + + free(req); +} + static void kad_req_destroy(struct kad_req * req) { assert(req); @@ -403,18 +421,12 @@ static void kad_req_destroy(struct kad_req * req) break; } + pthread_cleanup_push(cancel_req_destroy, req); + while (req->state != REQ_NULL && req->state != REQ_DONE) pthread_cond_wait(&req->cond, &req->lock); - pthread_mutex_unlock(&req->lock); - - pthread_cond_destroy(&req->cond); - pthread_mutex_destroy(&req->lock); - - if (req->key != NULL) - free(req->key); - - free(req); + pthread_cleanup_pop(true); } static int kad_req_wait(struct kad_req * req, @@ -434,6 +446,9 @@ static int kad_req_wait(struct kad_req * req, req->state = REQ_PENDING; + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &req->lock); + while (req->state == REQ_PENDING && ret != -ETIMEDOUT) ret = -pthread_cond_timedwait(&req->cond, &req->lock, &abs); @@ -452,7 +467,7 @@ static int kad_req_wait(struct kad_req * req, break; } - pthread_mutex_unlock(&req->lock); + pthread_cleanup_pop(true); return ret; } @@ -683,11 +698,34 @@ static void lookup_add_out(struct lookup * lu, pthread_mutex_unlock(&lu->lock); } -static void lookup_destroy(struct lookup * lu) +static void cancel_lookup_destroy(void * o) { + struct lookup * lu; struct list_head * p; struct list_head * h; + lu = (struct lookup *) o; + + if (lu->key != NULL) + free(lu->key); + if (lu->addrs != NULL) + free(lu->addrs); + + list_for_each_safe(p, h, &lu->contacts) { + struct contact * c = list_entry(p, struct contact, next); + list_del(&c->next); + contact_destroy(c); + } + + pthread_mutex_unlock(&lu->lock); + + pthread_mutex_destroy(&lu->lock); + + free(lu); +} + +static void lookup_destroy(struct lookup * lu) +{ assert(lu); pthread_mutex_lock(&lu->lock); @@ -711,25 +749,12 @@ static void lookup_destroy(struct lookup * lu) break; } + pthread_cleanup_push(cancel_lookup_destroy, lu); + while (lu->state != LU_NULL) pthread_cond_wait(&lu->cond, &lu->lock); - if (lu->key != NULL) - free(lu->key); - if (lu->addrs != NULL) - free(lu->addrs); - - list_for_each_safe(p, h, &lu->contacts) { - struct contact * c = list_entry(p, struct contact, next); - list_del(&c->next); - contact_destroy(c); - } - - pthread_mutex_unlock(&lu->lock); - - pthread_mutex_destroy(&lu->lock); - - free(lu); + pthread_cleanup_pop(true); } static void lookup_update(struct dht * dht, @@ -765,12 +790,17 @@ static void lookup_update(struct dht * dht, return; } + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &lu->lock); + while (lu->state == LU_INIT) { pthread_rwlock_unlock(&dht->lock); pthread_cond_wait(&lu->cond, &lu->lock); pthread_rwlock_rdlock(&dht->lock); } + pthread_cleanup_pop(false); + /* BUG: this should not be allowed since it's use-after-free. */ if (lu->state == LU_DESTROY || lu->state == LU_NULL) { log_warn("Use-after-free. Update aborted to avoid worse."); @@ -2302,10 +2332,8 @@ uint64_t dht_query(struct dht * dht, static void * dht_handle_sdu(void * o) { - struct dht * dht = (struct dht *) o; - struct timespec dl; - struct timespec to = {(HANDLE_TIMEO / 1000), - (HANDLE_TIMEO % 1000) * MILLION}; + struct dht * dht = (struct dht *) o; + assert(dht); while (true) { @@ -2318,28 +2346,19 @@ static void * dht_handle_sdu(void * o) size_t b; size_t t_expire; struct cmd * cmd; - int ret = 0; - - clock_gettime(CLOCK_REALTIME_COARSE, &dl); - ts_add(&dl, &to, &dl); pthread_mutex_lock(&dht->mtx); - while (list_is_empty(&dht->cmds) && ret != -ETIMEDOUT) - ret = -pthread_cond_timedwait(&dht->cond, - &dht->mtx, &dl); + pthread_cleanup_push((void *)(void *) pthread_mutex_unlock, + &dht->mtx); - if (ret == -ETIMEDOUT) { - pthread_mutex_unlock(&dht->mtx); - if (tpm_check(dht->tpm)) - break; - continue; - } + while (list_is_empty(&dht->cmds)) + pthread_cond_wait(&dht->cond, &dht->mtx); cmd = list_last_entry(&dht->cmds, struct cmd, next); list_del(&cmd->next); - pthread_mutex_unlock(&dht->mtx); + pthread_cleanup_pop(true); i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb); @@ -2487,8 +2506,6 @@ static void * dht_handle_sdu(void * o) tpm_inc(dht->tpm); } - tpm_exit(dht->tpm); - return (void *) 0; } |