summaryrefslogtreecommitdiff
path: root/src/irmd
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-09-30 17:58:18 +0200
committerdimitri staessens <[email protected]>2017-09-30 17:58:18 +0200
commit9405ad97e20686f74c06bcbac9523a8b4f10272e (patch)
treea0489929634ee7588de3ad77a6a1166ce11508e2 /src/irmd
parent5e974395fadc5e1922f200855c14ca0538ba50dc (diff)
downloadouroboros-9405ad97e20686f74c06bcbac9523a8b4f10272e.tar.gz
ouroboros-9405ad97e20686f74c06bcbac9523a8b4f10272e.zip
lib: Cancel tpm threads instead of marking exit
This makes the threadpool use pthread_cancel instead of setting an exit flag that threadpool managed threads check periodically. This drastically reduces CPU consumption in the irmd when running a lot of applications. It requires cancellation handlers in the ipcp and irmd to be implemented to ensure safe cancellation during operation and shutdown.
Diffstat (limited to 'src/irmd')
-rw-r--r--src/irmd/api_table.c91
-rw-r--r--src/irmd/api_table.h10
-rw-r--r--src/irmd/irm_flow.c32
-rw-r--r--src/irmd/main.c86
-rw-r--r--src/irmd/registry.c59
5 files changed, 146 insertions, 132 deletions
diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c
index df56dd02..a244f3a2 100644
--- a/src/irmd/api_table.c
+++ b/src/irmd/api_table.c
@@ -36,8 +36,6 @@
#include <limits.h>
#include <assert.h>
-#define ENTRY_SLEEP_TIMEOUT 10 /* ms */
-
struct api_entry * api_entry_create(pid_t api,
char * apn)
{
@@ -70,14 +68,14 @@ struct api_entry * api_entry_create(pid_t api,
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- if (pthread_mutex_init(&e->state_lock, NULL)) {
+ if (pthread_mutex_init(&e->lock, NULL)) {
free(e);
return NULL;
}
- if (pthread_cond_init(&e->state_cond, &cattr)) {
- pthread_mutex_destroy(&e->state_lock);
+ if (pthread_cond_init(&e->cond, &cattr)) {
+ pthread_mutex_destroy(&e->lock);
free(e);
return NULL;
}
@@ -85,6 +83,15 @@ struct api_entry * api_entry_create(pid_t api,
return e;
}
+void cancel_api_entry(void * o)
+{
+ struct api_entry * e = (struct api_entry *) o;
+
+ e->state = API_NULL;
+
+ pthread_mutex_unlock(&e->lock);
+}
+
void api_entry_destroy(struct api_entry * e)
{
struct list_head * p;
@@ -92,25 +99,29 @@ void api_entry_destroy(struct api_entry * e)
assert(e);
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
if (e->state == API_DESTROY) {
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
return;
}
if (e->state == API_SLEEP)
e->state = API_DESTROY;
- pthread_cond_signal(&e->state_cond);
+ pthread_cond_signal(&e->cond);
+
+ pthread_cleanup_push(cancel_api_entry, e);
while (e->state != API_INIT)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ pthread_cond_wait(&e->cond, &e->lock);
+
+ pthread_cleanup_pop(false);
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
- pthread_cond_destroy(&e->state_cond);
- pthread_mutex_destroy(&e->state_lock);
+ pthread_cond_destroy(&e->cond);
+ pthread_mutex_destroy(&e->lock);
if (e->apn != NULL)
free(e->apn);
@@ -164,39 +175,34 @@ void api_entry_del_name(struct api_entry * e,
}
}
-void api_entry_cancel(struct api_entry * e)
+int api_entry_sleep(struct api_entry * e,
+ struct timespec * timeo)
{
- pthread_mutex_lock(&e->state_lock);
-
- e->state = API_INIT;
- pthread_cond_broadcast(&e->state_cond);
-
- pthread_mutex_unlock(&e->state_lock);
-}
-
-int api_entry_sleep(struct api_entry * e)
-{
- struct timespec timeout = {(ENTRY_SLEEP_TIMEOUT / 1000),
- (ENTRY_SLEEP_TIMEOUT % 1000) * MILLION};
- struct timespec now;
struct timespec dl;
int ret = 0;
assert(e);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- ts_add(&now, &timeout, &dl);
+ if (timeo != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &dl);
+ ts_add(&dl, timeo, &dl);
+ }
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
if (e->state != API_WAKE && e->state != API_DESTROY)
e->state = API_SLEEP;
+ pthread_cleanup_push(cancel_api_entry, e);
+
while (e->state == API_SLEEP && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&e->state_cond,
- &e->state_lock,
- &dl);
+ if (timeo)
+ ret = -pthread_cond_timedwait(&e->cond, &e->lock, &dl);
+ else
+ ret = -pthread_cond_wait(&e->cond, &e->lock);
+
+ pthread_cleanup_pop(false);
if (e->state == API_DESTROY) {
if (e->re != NULL)
@@ -204,11 +210,10 @@ int api_entry_sleep(struct api_entry * e)
ret = -1;
}
- if (ret != -ETIMEDOUT)
- e->state = API_INIT;
+ e->state = API_INIT;
- pthread_cond_broadcast(&e->state_cond);
- pthread_mutex_unlock(&e->state_lock);
+ pthread_cond_broadcast(&e->cond);
+ pthread_mutex_unlock(&e->lock);
return ret;
}
@@ -219,25 +224,29 @@ void api_entry_wake(struct api_entry * e,
assert(e);
assert(re);
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
if (e->state != API_SLEEP) {
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
return;
}
e->state = API_WAKE;
e->re = re;
- pthread_cond_broadcast(&e->state_cond);
+ pthread_cond_broadcast(&e->cond);
+
+ pthread_cleanup_push(cancel_api_entry, e);
while (e->state == API_WAKE)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ pthread_cond_wait(&e->cond, &e->lock);
+
+ pthread_cleanup_pop(false);
if (e->state == API_DESTROY)
e->state = API_INIT;
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
}
int api_table_add(struct list_head * api_table,
diff --git a/src/irmd/api_table.h b/src/irmd/api_table.h
index d2ac7723..1fb2e285 100644
--- a/src/irmd/api_table.h
+++ b/src/irmd/api_table.h
@@ -23,6 +23,7 @@
#ifndef OUROBOROS_IRMD_API_TABLE_H
#define OUROBOROS_IRMD_API_TABLE_H
+#include "time.h"
#include "utils.h"
#include <unistd.h>
@@ -45,10 +46,10 @@ struct api_entry {
struct reg_entry * re; /* reg_entry for which a flow arrived */
- /* the api will block on this */
+ /* The process will block on this */
enum api_state state;
- pthread_cond_t state_cond;
- pthread_mutex_t state_lock;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
};
struct api_entry * api_entry_create(pid_t api,
@@ -56,7 +57,8 @@ struct api_entry * api_entry_create(pid_t api,
void api_entry_destroy(struct api_entry * e);
-int api_entry_sleep(struct api_entry * e);
+int api_entry_sleep(struct api_entry * e,
+ struct timespec * timeo);
void api_entry_wake(struct api_entry * e,
struct reg_entry * re);
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index e335ef48..991644c9 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -88,6 +88,21 @@ struct irm_flow * irm_flow_create(pid_t n_api,
return f;
}
+static void cancel_irm_destroy(void * o)
+{
+ struct irm_flow * f = (struct irm_flow *) o;
+
+ pthread_mutex_unlock(&f->state_lock);
+
+ pthread_cond_destroy(&f->state_cond);
+ pthread_mutex_destroy(&f->state_lock);
+
+ shm_rbuff_destroy(f->n_rb);
+ shm_rbuff_destroy(f->n_1_rb);
+
+ free(f);
+}
+
void irm_flow_destroy(struct irm_flow * f)
{
assert(f);
@@ -106,18 +121,12 @@ void irm_flow_destroy(struct irm_flow * f)
pthread_cond_signal(&f->state_cond);
+ pthread_cleanup_push(cancel_irm_destroy, f);
+
while (f->state != FLOW_NULL)
pthread_cond_wait(&f->state_cond, &f->state_lock);
- pthread_mutex_unlock(&f->state_lock);
-
- pthread_cond_destroy(&f->state_cond);
- pthread_mutex_destroy(&f->state_lock);
-
- shm_rbuff_destroy(f->n_rb);
- shm_rbuff_destroy(f->n_1_rb);
-
- free(f);
+ pthread_cleanup_pop(true);
}
enum flow_state irm_flow_get_state(struct irm_flow * f)
@@ -172,6 +181,9 @@ int irm_flow_wait_state(struct irm_flow * f,
assert(f->state != FLOW_NULL);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &f->state_lock);
+
while (!(f->state == state ||
f->state == FLOW_DESTROY ||
f->state == FLOW_DEALLOC_PENDING) &&
@@ -194,7 +206,7 @@ int irm_flow_wait_state(struct irm_flow * f,
s = f->state;
- pthread_mutex_unlock(&f->state_lock);
+ pthread_cleanup_pop(true);
return ret ? ret : s;
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 3fceadb6..64e4d459 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1111,19 +1111,11 @@ static int flow_accept(pid_t api,
struct reg_entry * re = NULL;
struct list_head * p = NULL;
- struct timespec dl;
- struct timespec now;
-
pid_t api_n1;
pid_t api_n;
int port_id;
int ret;
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- ts_add(&now, timeo, &dl);
- }
-
pthread_rwlock_wrlock(&irmd.reg_lock);
e = api_table_get(&irmd.api_table, api);
@@ -1147,28 +1139,12 @@ static int flow_accept(pid_t api,
pthread_rwlock_unlock(&irmd.reg_lock);
- while (true) {
- if (timeo != NULL && ts_diff_ns(&now, &dl) < 0) {
- log_dbg("Accept timed out.");
- return -ETIMEDOUT;
- }
-
- if (irmd_get_state() != IRMD_RUNNING)
- return -EIRMD;
+ ret = api_entry_sleep(e, timeo);
+ if (ret == -ETIMEDOUT)
+ return -ETIMEDOUT;
- ret = api_entry_sleep(e);
- if (ret == -ETIMEDOUT) {
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- api_entry_cancel(e);
- continue;
- }
-
- if (ret == -1)
- return -EPIPE;
-
- if (ret == 0)
- break;
- }
+ if (ret == -1)
+ return -EPIPE;
if (irmd_get_state() != IRMD_RUNNING) {
reg_entry_set_state(re, REG_NAME_NULL);
@@ -1206,11 +1182,11 @@ static int flow_accept(pid_t api,
return -EPERM;
}
- pthread_mutex_lock(&e->state_lock);
+ pthread_mutex_lock(&e->lock);
re = e->re;
- pthread_mutex_unlock(&e->state_lock);
+ pthread_mutex_unlock(&e->lock);
if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
pthread_rwlock_unlock(&irmd.reg_lock);
@@ -1915,19 +1891,25 @@ static void * acceptloop(void * o)
return (void *) 0;
}
-void * mainloop(void * o)
+static void close_ptr(void * o)
+{
+ close(*((int *) o));
+}
+
+static void free_msg(void * o)
+{
+ irm_msg__free_unpacked((irm_msg_t *) o, NULL);
+}
+
+static void * mainloop(void * o)
{
int sfd;
irm_msg_t * msg;
buffer_t buffer;
- struct timespec dl;
- struct timespec to = {(IRMD_ACCEPT_TIMEOUT / 1000),
- (IRMD_ACCEPT_TIMEOUT % 1000) * MILLION};
(void) o;
while (true) {
- int ret = 0;
irm_msg_t ret_msg = IRM_MSG__INIT;
struct irm_flow * e = NULL;
pid_t * apis = NULL;
@@ -1937,27 +1919,18 @@ void * mainloop(void * o)
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
- clock_gettime(PTHREAD_COND_CLOCK, &dl);
- ts_add(&dl, &to, &dl);
-
pthread_mutex_lock(&irmd.cmd_lock);
- while (list_is_empty(&irmd.cmds) && ret != -ETIMEDOUT)
- ret = -pthread_cond_timedwait(&irmd.cmd_cond,
- &irmd.cmd_lock,
- &dl);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &irmd.cmd_lock);
- if (ret == -ETIMEDOUT) {
- pthread_mutex_unlock(&irmd.cmd_lock);
- if (tpm_check(irmd.tpm))
- break;
- continue;
- }
+ while (list_is_empty(&irmd.cmds))
+ pthread_cond_wait(&irmd.cmd_cond, &irmd.cmd_lock);
cmd = list_last_entry(&irmd.cmds, struct cmd, next);
list_del(&cmd->next);
- pthread_mutex_unlock(&irmd.cmd_lock);
+ pthread_cleanup_pop(true);
msg = irm_msg__unpack(NULL, cmd->len, cmd->cbuf);
sfd = cmd->fd;
@@ -1979,6 +1952,9 @@ void * mainloop(void * o)
timeo = &ts;
}
+ pthread_cleanup_push(close_ptr, &sfd);
+ pthread_cleanup_push(free_msg, msg);
+
switch (msg->code) {
case IRM_MSG_CODE__IRM_CREATE_IPCP:
ret_msg.has_result = true;
@@ -2106,7 +2082,8 @@ void * mainloop(void * o)
break;
}
- irm_msg__free_unpacked(msg, NULL);
+ pthread_cleanup_pop(true);
+ pthread_cleanup_pop(false);
if (ret_msg.result == -EPIPE || !ret_msg.has_result) {
close(sfd);
@@ -2138,18 +2115,19 @@ void * mainloop(void * o)
if (apis != NULL)
free(apis);
+ pthread_cleanup_push(close_ptr, &sfd);
+
if (write(sfd, buffer.data, buffer.len) == -1)
if (ret_msg.result != -EIRMD)
log_warn("Failed to send reply message.");
free(buffer.data);
- close(sfd);
+
+ pthread_cleanup_pop(true);
tpm_inc(irmd.tpm);
}
- tpm_exit(irmd.tpm);
-
return (void *) 0;
}
diff --git a/src/irmd/registry.c b/src/irmd/registry.c
index c7e7b52d..f863af6a 100644
--- a/src/irmd/registry.c
+++ b/src/irmd/registry.c
@@ -92,30 +92,13 @@ static int reg_entry_init(struct reg_entry * e,
return 0;
}
-static void reg_entry_destroy(struct reg_entry * e)
+static void cancel_reg_entry_destroy(void * o)
{
- struct list_head * p = NULL;
- struct list_head * h = NULL;
-
- if (e == NULL)
- return;
-
- pthread_mutex_lock(&e->state_lock);
-
- if (e->state == REG_NAME_DESTROY) {
- pthread_mutex_unlock(&e->state_lock);
- return;
- }
-
- if (e->state != REG_NAME_FLOW_ACCEPT)
- e->state = REG_NAME_NULL;
- else
- e->state = REG_NAME_DESTROY;
-
- pthread_cond_broadcast(&e->state_cond);
+ struct reg_entry * e;
+ struct list_head * p;
+ struct list_head * h;
- while (e->state != REG_NAME_NULL)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ e = (struct reg_entry *) o;
pthread_mutex_unlock(&e->state_lock);
@@ -148,6 +131,33 @@ static void reg_entry_destroy(struct reg_entry * e)
free(e);
}
+static void reg_entry_destroy(struct reg_entry * e)
+{
+ if (e == NULL)
+ return;
+
+ pthread_mutex_lock(&e->state_lock);
+
+ if (e->state == REG_NAME_DESTROY) {
+ pthread_mutex_unlock(&e->state_lock);
+ return;
+ }
+
+ if (e->state != REG_NAME_FLOW_ACCEPT)
+ e->state = REG_NAME_NULL;
+ else
+ e->state = REG_NAME_DESTROY;
+
+ pthread_cond_broadcast(&e->state_cond);
+
+ pthread_cleanup_push(cancel_reg_entry_destroy, e);
+
+ while (e->state != REG_NAME_NULL)
+ pthread_cond_wait(&e->state_cond, &e->state_lock);
+
+ pthread_cleanup_pop(true);
+}
+
static bool reg_entry_is_local_in_dif(struct reg_entry * e,
const char * dif_name)
{
@@ -459,6 +469,9 @@ int reg_entry_leave_state(struct reg_entry * e,
pthread_mutex_lock(&e->state_lock);
+ pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+ &e->state_lock);
+
while (e->state == state && ret != -ETIMEDOUT)
if (timeout)
ret = -pthread_cond_timedwait(&e->state_cond,
@@ -474,7 +487,7 @@ int reg_entry_leave_state(struct reg_entry * e,
pthread_cond_broadcast(&e->state_cond);
}
- pthread_mutex_unlock(&e->state_lock);
+ pthread_cleanup_pop(true);
return ret;
}