summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-02-12 16:15:46 +0100
committerdimitri staessens <[email protected]>2017-02-12 22:19:50 +0100
commit98a15feabb6a14e52a54a09dfed58d55e0f99884 (patch)
tree0a67e3eb076558b6408d5744d74808e756e0639d /src
parent2ee140ec27335ca50e813080ee0e85e4ab86af37 (diff)
downloadouroboros-98a15feabb6a14e52a54a09dfed58d55e0f99884.tar.gz
ouroboros-98a15feabb6a14e52a54a09dfed58d55e0f99884.zip
irmd: Allow time for AP to call flow_accept()
When there is a burst of successive flow allocations for a certain name, each such request will block a thread in the IRMD for IRMD_REQ_ARR_TIMEOUT ms to allow the application some time to respond. This refactors some parts of the IRMd.
Diffstat (limited to 'src')
-rw-r--r--src/irmd/api_table.c15
-rw-r--r--src/irmd/main.c45
-rw-r--r--src/irmd/registry.c129
-rw-r--r--src/irmd/registry.h11
4 files changed, 153 insertions, 47 deletions
diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c
index 7619fcf6..df300cea 100644
--- a/src/irmd/api_table.c
+++ b/src/irmd/api_table.c
@@ -159,16 +159,15 @@ int api_entry_sleep(struct api_entry * e)
e->state = API_SLEEP;
- while (e->state == API_SLEEP) {
- if ((ret = -pthread_cond_timedwait(&e->state_cond,
- &e->state_lock,
- &dl)) == -ETIMEDOUT) {
- break;
- }
- }
+ while (e->state == API_SLEEP && ret != -ETIMEDOUT)
+ ret = -pthread_cond_timedwait(&e->state_cond,
+ &e->state_lock,
+ &dl);
- if (e->state == API_DESTROY)
+ if (e->state == API_DESTROY) {
+ reg_entry_del_api(e->re, e->api);
ret = -1;
+ }
e->state = API_INIT;
pthread_cond_broadcast(&e->state_cond);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 2454a9ca..aa4614c1 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -970,7 +970,6 @@ static struct irm_flow * flow_accept(pid_t api,
log_err("Unknown instance %d calling accept.", api);
return NULL;
}
-
log_dbg("New instance (%d) of %s added.", api, e->apn);
log_dbg("This instance accepts flows for:");
list_for_each(p, &e->names) {
@@ -996,6 +995,7 @@ static struct irm_flow * flow_accept(pid_t api,
pthread_rwlock_rdlock(&irmd->state_lock);
if (irmd->state != IRMD_RUNNING) {
+ reg_entry_set_state(re, REG_NAME_NULL);
pthread_rwlock_unlock(&irmd->state_lock);
return NULL;
}
@@ -1331,11 +1331,14 @@ static struct irm_flow * flow_req_arr(pid_t api,
pid_t h_api = -1;
int port_id = -1;
+ struct timespec wt = {IRMD_REQ_ARR_TIMEOUT % 1000,
+ (IRMD_REQ_ARR_TIMEOUT % 1000) * MILLION};
+
log_dbg("Flow req arrived from IPCP %d for %s on AE %s.",
api, dst_name, ae_name);
pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->reg_lock);
+ pthread_rwlock_rdlock(&irmd->reg_lock);
re = registry_get_entry(&irmd->registry, dst_name);
if (re == NULL) {
@@ -1345,6 +1348,18 @@ static struct irm_flow * flow_req_arr(pid_t api,
return NULL;
}
+ pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
+
+ /* Give the AP a bit of slop time to call accept */
+ if (reg_entry_leave_state(re, REG_NAME_IDLE, &wt) == -1) {
+ log_err("No APs for %s.", dst_name);
+ return NULL;
+ }
+
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->reg_lock);
+
switch (reg_entry_get_state(re)) {
case REG_NAME_IDLE:
pthread_rwlock_unlock(&irmd->reg_lock);
@@ -1378,17 +1393,12 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- reg_entry_leave_state(re, REG_NAME_AUTO_EXEC);
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_rdlock(&irmd->reg_lock);
-
- if (reg_entry_get_state(re) == REG_NAME_DESTROY) {
- reg_entry_set_state(re, REG_NAME_NULL);
+ if (reg_entry_leave_state(re, REG_NAME_AUTO_EXEC, NULL)) {
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return NULL;
}
+
case REG_NAME_FLOW_ACCEPT:
h_api = reg_entry_get_api(re);
if (h_api == -1) {
@@ -1453,7 +1463,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- reg_entry_leave_state(re, REG_NAME_FLOW_ARRIVED);
+ reg_entry_leave_state(re, REG_NAME_FLOW_ARRIVED, NULL);
return f;
}
@@ -1518,10 +1528,16 @@ static void irm_destroy(void)
list_del(&e->next);
ipcp_destroy(e->api);
clear_spawned_api(e->api);
+ registry_del_api(&irmd->registry, e->api);
ipcp_entry_destroy(e);
}
- registry_destroy(&irmd->registry);
+ list_for_each_safe(p, h, &irmd->api_table) {
+ struct api_entry * e = list_entry(p, struct api_entry, next);
+ list_del(&e->next);
+ registry_del_api(&irmd->registry, e->api);
+ api_entry_destroy(e);
+ }
list_for_each_safe(p, h, &irmd->spawned_apis) {
struct pid_el * e = list_entry(p, struct pid_el, next);
@@ -1531,6 +1547,7 @@ static void irm_destroy(void)
else if (waitpid(e->pid, &status, 0) < 0)
log_dbg("Error waiting for %d to exit.", e->pid);
list_del(&e->next);
+ registry_del_api(&irmd->registry, e->pid);
free(e);
}
@@ -1540,11 +1557,7 @@ static void irm_destroy(void)
apn_entry_destroy(e);
}
- list_for_each_safe(p, h, &irmd->api_table) {
- struct api_entry * e = list_entry(p, struct api_entry, next);
- list_del(&e->next);
- api_entry_destroy(e);
- }
+ registry_destroy(&irmd->registry);
pthread_rwlock_unlock(&irmd->reg_lock);
diff --git a/src/irmd/registry.c b/src/irmd/registry.c
index d22c1be3..985ecda0 100644
--- a/src/irmd/registry.c
+++ b/src/irmd/registry.c
@@ -25,6 +25,7 @@
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
#include <ouroboros/irm_config.h>
+#include <ouroboros/time_utils.h>
#include "registry.h"
#include "utils.h"
@@ -60,6 +61,8 @@ static struct reg_entry * reg_entry_create(void)
static struct reg_entry * reg_entry_init(struct reg_entry * e,
char * name)
{
+ pthread_condattr_t cattr;
+
if (e == NULL || name == NULL)
return NULL;
@@ -70,7 +73,13 @@ static struct reg_entry * reg_entry_init(struct reg_entry * e,
e->name = name;
- if (pthread_cond_init(&e->state_cond, NULL))
+ if (pthread_condattr_init(&cattr))
+ return NULL;
+
+#ifdef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&e->state_cond, &cattr))
return NULL;
if (pthread_mutex_init(&e->state_lock, NULL))
@@ -91,9 +100,21 @@ static void reg_entry_destroy(struct reg_entry * e)
pthread_mutex_lock(&e->state_lock);
- e->state = REG_NAME_DESTROY;
+ if (e->state == REG_NAME_DESTROY) {
+ pthread_mutex_unlock(&e->state_lock);
+ return;
+ }
+
+ if (e->state != REG_NAME_FLOW_ACCEPT)
+ e->state = REG_NAME_NULL;
+ else
+ e->state = REG_NAME_DESTROY;
pthread_cond_broadcast(&e->state_cond);
+
+ while (e->state != REG_NAME_NULL)
+ pthread_cond_wait(&e->state_cond, &e->state_lock);
+
pthread_mutex_unlock(&e->state_lock);
pthread_cond_destroy(&e->state_cond);
@@ -102,12 +123,6 @@ static void reg_entry_destroy(struct reg_entry * e)
if (e->name != NULL)
free(e->name);
- list_for_each_safe(p, h, &e->reg_apis) {
- struct pid_el * i = list_entry(p, struct pid_el, next);
- list_del(&i->next);
- free(i);
- }
-
list_for_each_safe(p, h, &e->reg_apns) {
struct str_el * a = list_entry(p, struct str_el, next);
list_del(&a->next);
@@ -171,7 +186,8 @@ static void reg_entry_del_local_from_dif(struct reg_entry * e,
}
}
-static bool reg_entry_has_apn(struct reg_entry * e, char * apn)
+static bool reg_entry_has_apn(struct reg_entry * e,
+ char * apn)
{
struct list_head * p;
@@ -184,7 +200,8 @@ static bool reg_entry_has_apn(struct reg_entry * e, char * apn)
return false;
}
-int reg_entry_add_apn(struct reg_entry * e, struct apn_entry * a)
+int reg_entry_add_apn(struct reg_entry * e,
+ struct apn_entry * a)
{
struct str_el * n;
@@ -215,7 +232,8 @@ int reg_entry_add_apn(struct reg_entry * e, struct apn_entry * a)
return 0;
}
-void reg_entry_del_apn(struct reg_entry * e, char * apn)
+void reg_entry_del_apn(struct reg_entry * e,
+ char * apn)
{
struct list_head * p = NULL;
struct list_head * h = NULL;
@@ -244,7 +262,8 @@ char * reg_entry_get_apn(struct reg_entry * e)
return list_first_entry(&e->reg_apns, struct str_el, next)->str;
}
-static bool reg_entry_has_api(struct reg_entry * e, pid_t api)
+static bool reg_entry_has_api(struct reg_entry * e,
+ pid_t api)
{
struct list_head * p;
@@ -257,7 +276,8 @@ static bool reg_entry_has_api(struct reg_entry * e, pid_t api)
return false;
}
-int reg_entry_add_api(struct reg_entry * e, pid_t api)
+int reg_entry_add_api(struct reg_entry * e,
+ pid_t api)
{
struct pid_el * i;
@@ -288,7 +308,7 @@ int reg_entry_add_api(struct reg_entry * e, pid_t api)
e->state == REG_NAME_AUTO_ACCEPT ||
e->state == REG_NAME_AUTO_EXEC) {
e->state = REG_NAME_FLOW_ACCEPT;
- pthread_cond_signal(&e->state_cond);
+ pthread_cond_broadcast(&e->state_cond);
}
pthread_mutex_unlock(&e->state_lock);
@@ -298,6 +318,12 @@ int reg_entry_add_api(struct reg_entry * e, pid_t api)
static void reg_entry_check_state(struct reg_entry * e)
{
+ if (e->state == REG_NAME_DESTROY) {
+ e->state = REG_NAME_NULL;
+ pthread_cond_broadcast(&e->state_cond);
+ return;
+ }
+
if (list_is_empty(&e->reg_apis)) {
if (!list_is_empty(&e->reg_apns))
e->state = REG_NAME_AUTO_ACCEPT;
@@ -319,7 +345,8 @@ void reg_entry_del_pid_el(struct reg_entry * e,
reg_entry_check_state(e);
}
-void reg_entry_del_api(struct reg_entry * e, pid_t api)
+void reg_entry_del_api(struct reg_entry * e,
+ pid_t api)
{
struct list_head * p;
struct list_head * h;
@@ -365,7 +392,8 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e)
return state;
}
-int reg_entry_set_state(struct reg_entry * e, enum reg_name_state state)
+int reg_entry_set_state(struct reg_entry * e,
+ enum reg_name_state state)
{
if (state == REG_NAME_DESTROY)
return -EPERM;
@@ -380,19 +408,80 @@ int reg_entry_set_state(struct reg_entry * e, enum reg_name_state state)
return 0;
}
-int reg_entry_leave_state(struct reg_entry * e, enum reg_name_state state)
+int reg_entry_leave_state(struct reg_entry * e,
+ enum reg_name_state state,
+ struct timespec * timeout)
{
+ struct timespec abstime;
+ int ret = 0;
+
if (e == NULL || state == REG_NAME_DESTROY)
return -EINVAL;
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
pthread_mutex_lock(&e->state_lock);
- while (e->state == state)
- pthread_cond_wait(&e->state_cond, &e->state_lock);
+ while (e->state == state && ret != -ETIMEDOUT)
+ if (timeout)
+ ret = -pthread_cond_timedwait(&e->state_cond,
+ &e->state_lock,
+ timeout);
+ else
+ ret = -pthread_cond_wait(&e->state_cond,
+ &e->state_lock);
+
+ if (e->state == REG_NAME_DESTROY) {
+ ret = -1;
+ e->state = REG_NAME_NULL;
+ pthread_cond_broadcast(&e->state_cond);
+ }
pthread_mutex_unlock(&e->state_lock);
- return 0;
+ return ret;
+}
+
+int reg_entry_wait_state(struct reg_entry * e,
+ enum reg_name_state state,
+ struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret = 0;
+
+ if (e == NULL || state == REG_NAME_DESTROY)
+ return -EINVAL;
+
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_mutex_lock(&e->state_lock);
+
+ while (e->state != state &&
+ e->state != REG_NAME_DESTROY &&
+ ret != -ETIMEDOUT)
+ if (timeout)
+ ret = -pthread_cond_timedwait(&e->state_cond,
+ &e->state_lock,
+ timeout);
+ else
+ ret = -pthread_cond_wait(&e->state_cond,
+ &e->state_lock);
+
+ if (e->state == REG_NAME_DESTROY) {
+ ret = -1;
+ e->state = REG_NAME_NULL;
+ pthread_cond_broadcast(&e->state_cond);
+ }
+
+ pthread_mutex_unlock(&e->state_lock);
+
+ return ret;
}
struct reg_entry * registry_get_entry(struct list_head * registry,
diff --git a/src/irmd/registry.h b/src/irmd/registry.h
index 7713e278..67e4da40 100644
--- a/src/irmd/registry.h
+++ b/src/irmd/registry.h
@@ -91,11 +91,16 @@ pid_t reg_entry_get_api(struct reg_entry * e);
enum reg_name_state reg_entry_get_state(struct reg_entry * e);
-int reg_entry_set_state(struct reg_entry * e,
+int reg_entry_set_state(struct reg_entry * e,
enum reg_name_state state);
-int reg_entry_leave_state(struct reg_entry * e,
- enum reg_name_state state);
+int reg_entry_leave_state(struct reg_entry * e,
+ enum reg_name_state state,
+ struct timespec * timeout);
+
+int reg_entry_wait_state(struct reg_entry * e,
+ enum reg_name_state state,
+ struct timespec * timeout);
struct reg_entry * registry_add_name(struct list_head * registry,
char * name);