summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2022-03-26 19:02:17 +0100
committerSander Vrijders <[email protected]>2022-03-30 15:05:05 +0200
commit643c285c20abab5dadaa5c1929d978b725911b5d (patch)
tree7472dcac30c7707b9fb118db447b0f2c604cac30 /src
parent8a066315ef2d6baeeece08b94f86ad54e1ec6ee2 (diff)
downloadouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.tar.gz
ouroboros-643c285c20abab5dadaa5c1929d978b725911b5d.zip
lib: Move timerwheel processing to its own thread
This is the first step moving away from scheduling the FRCT and flow monitoring functions as part of the IPC calls (flow_read / flow_write / fevent) and towards the more scalable (and far less complicated) implementation to take care of these functions in separate threads. If a process creates the first flow that requires FRCT, it will spin up a thread to process events on the timerwheel (retransmissions and delayed ACKs). This single thread lives until the last flow with FRCT is deallocated. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src')
-rw-r--r--src/lib/dev.c44
-rw-r--r--src/lib/frct.c8
-rw-r--r--src/lib/timerwheel.c9
3 files changed, 34 insertions, 27 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index a0f7398b..ac885711 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -147,6 +147,9 @@ struct {
struct flow * flows;
struct port * ports;
+ pthread_t tx;
+ size_t n_frcti;
+
pthread_rwlock_t lock;
} ai;
@@ -262,17 +265,39 @@ static void flow_clear(int fd)
#include "crypt.c"
#include "frct.c"
+void * frct_tx(void * o)
+{
+ struct timespec tic = {0, TICTIME};
+
+ (void) o;
+
+ while (true) {
+ timerwheel_move();
+
+ nanosleep(&tic, NULL);
+ }
+
+ return (void *) 0;
+}
+
static void flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+ if (ai.flows[fd].frcti != NULL) {
+ ai.n_frcti--;
+ if (ai.n_frcti == 0) {
+ pthread_cancel(ai.tx);
+ pthread_join(ai.tx, NULL);
+ }
+ frcti_destroy(ai.flows[fd].frcti);
+ }
+
if (ai.flows[fd].flow_id != -1) {
port_destroy(&ai.ports[ai.flows[fd].flow_id]);
bmp_release(ai.fds, fd);
}
- if (ai.flows[fd].frcti != NULL)
- frcti_destroy(ai.flows[fd].frcti);
if (ai.flows[fd].rx_rb != NULL) {
shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
@@ -354,6 +379,11 @@ static int flow_init(int flow_id,
flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl);
if (flow->frcti == NULL)
goto fail_frcti;
+
+ ++ai.n_frcti;
+ if (ai.n_frcti == 1 &&
+ pthread_create(&ai.tx, NULL, frct_tx, NULL) < 0)
+ goto fail_tx_thread;
}
ai.ports[flow_id].fd = fd;
@@ -364,6 +394,8 @@ static int flow_init(int flow_id,
return fd;
+ fail_tx_thread:
+ frcti_destroy(flow->frcti);
fail_frcti:
crypt_fini(flow->ctx);
fail_ctx:
@@ -1182,8 +1214,6 @@ ssize_t flow_write(int fd,
if (flow_keepalive(fd))
return -EFLOWPEER;
- frcti_tick(flow->frcti);
-
ts_add(&tictime, &tic, &tictime);
}
idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
@@ -1300,8 +1330,6 @@ ssize_t flow_read(int fd,
idx = flow_rx_sdb(flow, &sdb, block, &tictime);
if (idx < 0) {
- frcti_tick(flow->frcti);
-
if (idx != -ETIMEDOUT)
return idx;
@@ -1326,8 +1354,6 @@ ssize_t flow_read(int fd,
sdb = shm_rdrbuff_get(ai.rdrb, idx);
- frcti_tick(flow->frcti);
-
pthread_rwlock_unlock(&ai.lock);
packet = shm_du_buff_head(sdb);
@@ -1910,8 +1936,6 @@ int ipcp_flow_read(int fd,
frcti_rcv(flow->frcti, *sdb);
}
- frcti_tick(flow->frcti);
-
pthread_rwlock_unlock(&ai.lock);
return 0;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index a93a1006..2d31e6f2 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -467,9 +467,6 @@ static void frcti_setflags(struct frcti * frcti,
#define frcti_rcv(frcti, sdb) \
(frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
-#define frcti_tick(frcti) \
- (frcti == NULL ? 0 : __frcti_tick())
-
#define frcti_dealloc(frcti) \
(frcti == NULL ? 0 : __frcti_dealloc(frcti))
@@ -769,11 +766,6 @@ static void rtt_estimator(struct frcti * frcti,
frcti->rto = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 2));
}
-static void __frcti_tick(void)
-{
- timerwheel_move();
-}
-
/* Always queues the next application packet on the RQ. */
static void __frcti_rcv(struct frcti * frcti,
struct shm_du_buff * sdb)
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index c3be08e0..cfdf2c9f 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -62,8 +62,6 @@ struct {
size_t prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
size_t prv_ack; /* Last processed ack slot. */
pthread_mutex_t lock;
-
- bool in_use;
} rw;
static void timerwheel_fini(void)
@@ -141,9 +139,6 @@ static void timerwheel_move(void)
size_t i;
size_t j;
- if (!__sync_bool_compare_and_swap(&rw.in_use, true, true))
- return;
-
pthread_mutex_lock(&rw.lock);
pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock);
@@ -383,8 +378,6 @@ static int timerwheel_rxm(struct frcti * frcti,
#endif
pthread_mutex_unlock(&rw.lock);
- __sync_bool_compare_and_swap(&rw.in_use, false, true);
-
return 0;
}
@@ -426,7 +419,5 @@ static int timerwheel_ack(int fd,
pthread_mutex_unlock(&rw.lock);
- __sync_bool_compare_and_swap(&rw.in_use, false, true);
-
return 0;
}