diff options
author | Dimitri Staessens <[email protected]> | 2022-03-26 19:02:17 +0100 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2022-03-30 15:05:05 +0200 |
commit | 643c285c20abab5dadaa5c1929d978b725911b5d (patch) | |
tree | 7472dcac30c7707b9fb118db447b0f2c604cac30 /src/lib/dev.c | |
parent | 8a066315ef2d6baeeece08b94f86ad54e1ec6ee2 (diff) | |
download | ouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.tar.gz ouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.zip |
lib: Move timerwheel processing to its own thread
This is the first step moving away from scheduling the FRCT and flow
monitoring functions as part of the IPC calls (flow_read / flow_write
/ fevent) and towards the more scalable (and far less complicated)
implementation to take care of these functions in separate threads.
If a process creates the first flow that requires FRCT, it will spin
up a thread to process events on the timerwheel (retransmissions and
delayed ACKs). This single thread lives until the last flow with FRCT
is deallocated.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index a0f7398b..ac885711 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -147,6 +147,9 @@ struct { struct flow * flows; struct port * ports; + pthread_t tx; + size_t n_frcti; + pthread_rwlock_t lock; } ai; @@ -262,17 +265,39 @@ static void flow_clear(int fd) #include "crypt.c" #include "frct.c" +void * frct_tx(void * o) +{ + struct timespec tic = {0, TICTIME}; + + (void) o; + + while (true) { + timerwheel_move(); + + nanosleep(&tic, NULL); + } + + return (void *) 0; +} + static void flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); + if (ai.flows[fd].frcti != NULL) { + ai.n_frcti--; + if (ai.n_frcti == 0) { + pthread_cancel(ai.tx); + pthread_join(ai.tx, NULL); + } + frcti_destroy(ai.flows[fd].frcti); + } + if (ai.flows[fd].flow_id != -1) { port_destroy(&ai.ports[ai.flows[fd].flow_id]); bmp_release(ai.fds, fd); } - if (ai.flows[fd].frcti != NULL) - frcti_destroy(ai.flows[fd].frcti); if (ai.flows[fd].rx_rb != NULL) { shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); @@ -354,6 +379,11 @@ static int flow_init(int flow_id, flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl); if (flow->frcti == NULL) goto fail_frcti; + + ++ai.n_frcti; + if (ai.n_frcti == 1 && + pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0) + goto fail_tx_thread; } ai.ports[flow_id].fd = fd; @@ -364,6 +394,8 @@ static int flow_init(int flow_id, return fd; + fail_tx_thread: + frcti_destroy(flow->frcti); fail_frcti: crypt_fini(flow->ctx); fail_ctx: @@ -1182,8 +1214,6 @@ ssize_t flow_write(int fd, if (flow_keepalive(fd)) return -EFLOWPEER; - frcti_tick(flow->frcti); - ts_add(&tictime, &tic, &tictime); } idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); @@ -1300,8 +1330,6 @@ ssize_t flow_read(int fd, idx = flow_rx_sdb(flow, &sdb, block, &tictime); if (idx < 0) { - frcti_tick(flow->frcti); - if (idx != -ETIMEDOUT) return idx; @@ -1326,8 +1354,6 @@ ssize_t flow_read(int fd, sdb = shm_rdrbuff_get(ai.rdrb, idx); - frcti_tick(flow->frcti); - pthread_rwlock_unlock(&ai.lock); packet = shm_du_buff_head(sdb); @@ -1910,8 +1936,6 @@ int ipcp_flow_read(int fd, frcti_rcv(flow->frcti, *sdb); } - frcti_tick(flow->frcti); - pthread_rwlock_unlock(&ai.lock); return 0; |