diff options
author | Dimitri Staessens <[email protected]> | 2022-03-26 19:02:17 +0100 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2022-03-30 15:05:05 +0200 |
commit | 643c285c20abab5dadaa5c1929d978b725911b5d (patch) | |
tree | 7472dcac30c7707b9fb118db447b0f2c604cac30 | |
parent | 8a066315ef2d6baeeece08b94f86ad54e1ec6ee2 (diff) | |
download | ouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.tar.gz ouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.zip |
lib: Move timerwheel processing to its own thread
This is the first step moving away from scheduling the FRCT and flow
monitoring functions as part of the IPC calls (flow_read / flow_write
/ fevent) and towards the more scalable (and far less complicated)
implementation to take care of these functions in separate threads.
If a process creates the first flow that requires FRCT, it will spin
up a thread to process events on the timerwheel (retransmissions and
delayed ACKs). This single thread lives until the last flow with FRCT
is deallocated.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
-rw-r--r-- | src/lib/dev.c | 44 | ||||
-rw-r--r-- | src/lib/frct.c | 8 | ||||
-rw-r--r-- | src/lib/timerwheel.c | 9 |
3 files changed, 34 insertions, 27 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index a0f7398b..ac885711 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -147,6 +147,9 @@ struct { struct flow * flows; struct port * ports; + pthread_t tx; + size_t n_frcti; + pthread_rwlock_t lock; } ai; @@ -262,17 +265,39 @@ static void flow_clear(int fd) #include "crypt.c" #include "frct.c" +void * frct_tx(void * o) +{ + struct timespec tic = {0, TICTIME}; + + (void) o; + + while (true) { + timerwheel_move(); + + nanosleep(&tic, NULL); + } + + return (void *) 0; +} + static void flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); + if (ai.flows[fd].frcti != NULL) { + ai.n_frcti--; + if (ai.n_frcti == 0) { + pthread_cancel(ai.tx); + pthread_join(ai.tx, NULL); + } + frcti_destroy(ai.flows[fd].frcti); + } + if (ai.flows[fd].flow_id != -1) { port_destroy(&ai.ports[ai.flows[fd].flow_id]); bmp_release(ai.fds, fd); } - if (ai.flows[fd].frcti != NULL) - frcti_destroy(ai.flows[fd].frcti); if (ai.flows[fd].rx_rb != NULL) { shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); @@ -354,6 +379,11 @@ static int flow_init(int flow_id, flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl); if (flow->frcti == NULL) goto fail_frcti; + + ++ai.n_frcti; + if (ai.n_frcti == 1 && + pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0) + goto fail_tx_thread; } ai.ports[flow_id].fd = fd; @@ -364,6 +394,8 @@ static int flow_init(int flow_id, return fd; + fail_tx_thread: + frcti_destroy(flow->frcti); fail_frcti: crypt_fini(flow->ctx); fail_ctx: @@ -1182,8 +1214,6 @@ ssize_t flow_write(int fd, if (flow_keepalive(fd)) return -EFLOWPEER; - frcti_tick(flow->frcti); - ts_add(&tictime, &tic, &tictime); } idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); @@ -1300,8 +1330,6 @@ ssize_t flow_read(int fd, idx = flow_rx_sdb(flow, &sdb, block, &tictime); if (idx < 0) { - frcti_tick(flow->frcti); - if (idx != -ETIMEDOUT) return idx; @@ -1326,8 +1354,6 @@ ssize_t flow_read(int fd, sdb = shm_rdrbuff_get(ai.rdrb, idx); - frcti_tick(flow->frcti); - pthread_rwlock_unlock(&ai.lock); packet = shm_du_buff_head(sdb); @@ -1910,8 +1936,6 @@ int ipcp_flow_read(int fd, frcti_rcv(flow->frcti, *sdb); } - frcti_tick(flow->frcti); - pthread_rwlock_unlock(&ai.lock); return 0; diff --git a/src/lib/frct.c b/src/lib/frct.c index a93a1006..2d31e6f2 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -467,9 +467,6 @@ static void frcti_setflags(struct frcti * frcti, #define frcti_rcv(frcti, sdb) \ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) -#define frcti_tick(frcti) \ - (frcti == NULL ? 0 : __frcti_tick()) - #define frcti_dealloc(frcti) \ (frcti == NULL ? 0 : __frcti_dealloc(frcti)) @@ -769,11 +766,6 @@ static void rtt_estimator(struct frcti * frcti, frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 2)); } -static void __frcti_tick(void) -{ - timerwheel_move(); -} - /* Always queues the next application packet on the RQ. */ static void __frcti_rcv(struct frcti * frcti, struct shm_du_buff * sdb) diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c index c3be08e0..cfdf2c9f 100644 --- a/src/lib/timerwheel.c +++ b/src/lib/timerwheel.c @@ -62,8 +62,6 @@ struct { size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */ size_t prv_ack; /* Last processed ack slot. */ pthread_mutex_t lock; - - bool in_use; } rw; static void timerwheel_fini(void) @@ -141,9 +139,6 @@ static void timerwheel_move(void) size_t i; size_t j; - if (!__sync_bool_compare_and_swap(&rw.in_use, true, true)) - return; - pthread_mutex_lock(&rw.lock); pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock); @@ -383,8 +378,6 @@ static int timerwheel_rxm(struct frcti * frcti, #endif pthread_mutex_unlock(&rw.lock); - __sync_bool_compare_and_swap(&rw.in_use, false, true); - return 0; } @@ -426,7 +419,5 @@ static int timerwheel_ack(int fd, pthread_mutex_unlock(&rw.lock); - __sync_bool_compare_and_swap(&rw.in_use, false, true); - return 0; } |