summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2017-08-22 14:58:12 +0200
committerSander Vrijders <[email protected]>2017-08-22 15:00:56 +0200
commit514791e5c6ded690aaf6dc43709dd02bc6a2ff6a (patch)
treeb751a94678188772c5772261549fe772a7872427
parent911a0ba062f7fd9debfebd330e94ff7de6ac1101 (diff)
downloadouroboros-514791e5c6ded690aaf6dc43709dd02bc6a2ff6a.tar.gz
ouroboros-514791e5c6ded690aaf6dc43709dd02bc6a2ff6a.zip
lib: Make timerwheel a passive component
This turns the timerwheel into a passive component since it is used by application using the library. The user of the timerwheel now has to call timerwheel_move to advance the timerwheel.
-rw-r--r--include/ouroboros/timerwheel.h2
-rw-r--r--src/lib/dev.c21
-rw-r--r--src/lib/tests/timerwheel_test.c6
-rw-r--r--src/lib/timerwheel.c158
4 files changed, 25 insertions, 162 deletions
diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h
index e259c855..b0c9ee29 100644
--- a/include/ouroboros/timerwheel.h
+++ b/include/ouroboros/timerwheel.h
@@ -42,4 +42,6 @@ int timerwheel_restart(struct timerwheel * tw,
void timerwheel_stop(struct timerwheel * tw,
struct tw_f * f);
+void timerwheel_move(struct timerwheel * tw);
+
#endif /* OUROBOROS_LIB_TIMERWHEEL_H */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1018f556..52a56097 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -83,11 +83,7 @@ struct frcti {
uint64_t rcv_lwe;
uint64_t rcv_rwe;
- bool resource_control;
- bool reliable;
- bool error_check;
- bool ordered;
- bool partial;
+ uint8_t conf_flags;
};
struct port {
@@ -121,7 +117,6 @@ struct {
struct shm_flow_set * fqset;
struct timerwheel * tw;
- int tw_users;
struct bmp * fds;
struct bmp * fqueues;
@@ -317,6 +312,12 @@ static int frcti_write(int fd,
frcti = &(ai.frcti[fd]);
+ pthread_rwlock_unlock(&ai.lock);
+
+ timerwheel_move(ai.tw);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
/*
* Set the DRF in the first packet of a new run of SDUs,
* otherwise simply recharge the timer.
@@ -337,7 +338,7 @@ static int frcti_write(int fd,
pci.seqno = frcti->snd_lwe++;
pci.type |= PDU_TYPE_DATA;
- if (frct_pci_ser(sdb, &pci, frcti->error_check))
+ if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK))
return -1;
if (finalize_write(fd, shm_du_buff_get_idx(sdb)))
@@ -368,6 +369,8 @@ static ssize_t frcti_read(int fd)
struct frct_pci pci;
struct shm_du_buff * sdb;
+ timerwheel_move(ai.tw);
+
pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
@@ -399,7 +402,7 @@ static ssize_t frcti_read(int fd)
sdb = shm_rdrbuff_get(ai.rdrb, idx);
/* SDU may be corrupted. */
- if (frct_pci_des(sdb, &pci, frcti->error_check)) {
+ if (frct_pci_des(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -1;
@@ -460,6 +463,8 @@ static int frcti_event_wait(struct flow_set * set,
assert(fq);
assert(timeout);
+ timerwheel_move(ai.tw);
+
/*
* FIXME: Return the fq only if a data SDU
* for the application is available.
diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c
index d9ca164e..d7478487 100644
--- a/src/lib/tests/timerwheel_test.c
+++ b/src/lib/tests/timerwheel_test.c
@@ -91,14 +91,12 @@ int timerwheel_test(int argc, char ** argv)
nanosleep(&wait, NULL);
- /* On some systems and VMs, the scheduler may be too slow. */
- if (total != check_total)
- nanosleep(&wait, NULL);
+ timerwheel_move(tw);
timerwheel_destroy(tw);
if (total != check_total) {
- printf("Totals do not match.\n");
+ printf("Totals do not match: %d and %d.\n", total, check_total);
return -1;
}
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 76f0ab32..2952c5d3 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -39,12 +39,6 @@
#define tw_free(tw) (tw_used(tw) + 1 < tw->elements)
#define tw_empty(tw) (tw->head == tw->tail)
-enum tw_state {
- TW_NULL = 0,
- TW_RUNNING,
- TW_DESTROY
-};
-
struct tw_f {
struct list_head next;
void (* func)(void *);
@@ -63,19 +57,10 @@ struct timerwheel {
size_t pos;
- struct list_head wq;
-
- pthread_cond_t work;
pthread_mutex_t lock;
- int resolution;
+ time_t resolution;
unsigned int elements;
-
- enum tw_state state;
- pthread_mutex_t s_lock;
-
- pthread_t ticker;
- pthread_t worker;
};
static void tw_el_fini(struct tw_el * e)
@@ -89,72 +74,8 @@ static void tw_el_fini(struct tw_el * e)
}
}
-static enum tw_state tw_get_state(struct timerwheel * tw)
-{
- enum tw_state state;
-
- assert(tw);
-
- pthread_mutex_lock(&tw->s_lock);
-
- state = tw->state;
-
- pthread_mutex_unlock(&tw->s_lock);
-
- return state;
-}
-
-static void tw_set_state(struct timerwheel * tw, enum tw_state state)
-{
- assert(tw);
- assert(state != TW_NULL);
-
- pthread_mutex_lock(&tw->s_lock);
-
- tw->state = state;
-
- pthread_mutex_unlock(&tw->s_lock);
-}
-
-static void * worker(void * o)
-{
- struct list_head * p;
- struct list_head * h;
-
- struct timerwheel * tw = (struct timerwheel *) o;
- struct timespec dl;
- struct timespec now;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- ts_add(&now, &tw->intv, &dl);
-
- pthread_mutex_lock(&tw->lock);
-
- while (tw_get_state(tw) == TW_RUNNING) {
- if (pthread_cond_timedwait(&tw->work, &tw->lock, &dl)
- == ETIMEDOUT)
- ts_add(&dl, &tw->intv, &dl);
-
- list_for_each_safe(p, h, &tw->wq) {
- struct tw_f * f = list_entry(p, struct tw_f, next);
- list_del(&f->next);
- pthread_mutex_unlock(&tw->lock);
- f->func(f->arg);
- free(f);
-
- pthread_mutex_lock(&tw->lock);
- }
- }
-
- pthread_mutex_unlock(&tw->lock);
-
- return (void *) o;
-}
-
-static void * movement(void * o)
+void timerwheel_move(struct timerwheel * tw)
{
- struct timerwheel * tw = (struct timerwheel *) o;
struct timespec now = {0, 0};
long ms = tw->resolution * tw->elements;
struct timespec total = {ms / 1000,
@@ -162,21 +83,16 @@ static void * movement(void * o)
struct list_head * p;
struct list_head * h;
- while (tw_get_state(tw) == TW_RUNNING) {
- clock_gettime(CLOCK_MONOTONIC, &now);
-
- pthread_mutex_lock(&tw->lock);
+ clock_gettime(CLOCK_MONOTONIC, &now);
- if (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) < 0) {
- pthread_mutex_unlock(&tw->lock);
- nanosleep(&tw->intv, NULL);
- continue;
- }
+ pthread_mutex_lock(&tw->lock);
+ while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) {
list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) {
struct tw_f * f = list_entry(p, struct tw_f, next);
list_del(&f->next);
- list_add(&f->next, &tw->wq);
+ f->func(f->arg);
+ free(f);
}
ts_add(&tw->wheel[tw->pos].expiry,
@@ -184,13 +100,9 @@ static void * movement(void * o)
&tw->wheel[tw->pos].expiry);
tw->pos = (tw->pos + 1) & (tw->elements - 1);
-
- pthread_cond_signal(&tw->work);
-
- pthread_mutex_unlock(&tw->lock);
}
- return (void *) 0;
+ pthread_mutex_unlock(&tw->lock);
}
struct timerwheel * timerwheel_create(time_t resolution,
@@ -203,8 +115,6 @@ struct timerwheel * timerwheel_create(time_t resolution,
struct timerwheel * tw;
- pthread_condattr_t cattr;
-
assert(resolution != 0);
tw = malloc(sizeof(*tw));
@@ -228,25 +138,10 @@ struct timerwheel * timerwheel_create(time_t resolution,
tw->intv.tv_sec = (tw->resolution / FRAC) / 1000;
tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION;
- list_head_init(&tw->wq);
-
if (pthread_mutex_init(&tw->lock, NULL))
goto fail_lock_init;
- if (pthread_mutex_init(&tw->s_lock, NULL))
- goto fail_s_lock_init;
-
- if (pthread_condattr_init(&cattr))
- goto fail_cond_init;
-
-#ifndef __APPLE__
- pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
-#endif
- if (pthread_cond_init(&tw->work, &cattr))
- goto fail_cond_init;
-
tw->pos = 0;
- tw->state = TW_RUNNING;
clock_gettime(CLOCK_MONOTONIC, &now);
now.tv_nsec -= (now.tv_nsec % MILLION);
@@ -257,24 +152,8 @@ struct timerwheel * timerwheel_create(time_t resolution,
ts_add(&now, &res_ts, &now);
}
- if (pthread_create(&tw->worker, NULL, worker, (void *) tw))
- goto fail_worker_create;
-
- if (pthread_create(&tw->ticker, NULL, movement, (void *) tw)) {
- tw_set_state(tw, TW_DESTROY);
- goto fail_ticker_create;
- }
-
return tw;
- fail_ticker_create:
- pthread_join(tw->worker, NULL);
- fail_worker_create:
- pthread_cond_destroy(&tw->work);
- fail_cond_init:
- pthread_mutex_destroy(&tw->s_lock);
- fail_s_lock_init:
- pthread_mutex_destroy(&tw->lock);
fail_lock_init:
free(tw->wheel);
fail_wheel_malloc:
@@ -286,31 +165,10 @@ void timerwheel_destroy(struct timerwheel * tw)
{
unsigned long i;
- struct list_head * p;
- struct list_head * h;
-
- tw_set_state(tw, TW_DESTROY);
-
- pthread_join(tw->ticker, NULL);
- pthread_join(tw->worker, NULL);
-
for (i = 0; i < tw->elements; ++i)
tw_el_fini(&tw->wheel[i]);
- pthread_mutex_lock(&tw->lock);
-
- list_for_each_safe(p, h, &tw->wq) {
- struct tw_f * f = list_entry(p, struct tw_f, next);
- list_del(&f->next);
- free(f);
- }
-
- pthread_mutex_unlock(&tw->lock);
-
- pthread_cond_destroy(&tw->work);
pthread_mutex_destroy(&tw->lock);
- pthread_mutex_destroy(&tw->s_lock);
-
free(tw->wheel);
free(tw);
}