diff options
author | Dimitri Staessens <[email protected]> | 2022-03-27 18:18:52 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2022-03-30 15:05:06 +0200 |
commit | aaa9537b332ff09dde6af852fd9a95e64dea5dda (patch) | |
tree | bbf3f2db95c2dee34374431fd23fc6b0bb84e5dd | |
parent | 02b3893b1ec392f1b3ca030a03267c31eb1dc290 (diff) | |
download | ouroboros-aaa9537b332ff09dde6af852fd9a95e64dea5dda.tar.gz ouroboros-aaa9537b332ff09dde6af852fd9a95e64dea5dda.zip |
lib: Move flow monitoring to its own thread
This adds a monitoring thread to handle flow keepalive management in
the application and removes the thread interruptions to schedule FRCT
calls within the regular IPC calls.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
-rw-r--r-- | src/lib/dev.c | 304 |
1 files changed, 130 insertions, 174 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index b3e9c69e..5a57aa08 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -123,9 +123,6 @@ struct flow_set_entry { struct flow_set { size_t idx; - struct timespec chk; /* Last keepalive check. */ - uint32_t min; /* Minimum keepalive time in set. */ - struct list_head flows; pthread_rwlock_t lock; }; @@ -148,6 +145,11 @@ struct { struct flow * flows; struct port * ports; + pthread_t mon; + int min_timeo; + int min_fd; + int max_fd; + pthread_t tx; size_t n_frcti; @@ -255,14 +257,6 @@ static int proc_announce(char * prog) return ret; } -static void flow_clear(int fd) -{ - memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - - ai.flows[fd].flow_id = -1; - ai.flows[fd].pid = -1; -} - #include "crypt.c" #include "frct.c" @@ -281,6 +275,90 @@ void * frct_tx(void * o) return (void *) 0; } +static void flow_send_keepalive(int fd) +{ + flow_write(fd, NULL, 0); +} + +static void flow_keepalive(int fd) +{ + struct timespec now; + struct timespec s_act; + struct timespec r_act; + struct flow * flow; + int flow_id; + uint32_t timeo; + struct shm_rbuff * rb; + uint32_t acl; + + flow = &ai.flows[fd]; + + pthread_rwlock_rdlock(&ai.lock); + + if (flow->flow_id < 0) { + pthread_rwlock_unlock(&ai.lock); + return; + } + + s_act = flow->snd_act; + r_act = flow->rcv_act; + + flow_id = flow->flow_id; + timeo = flow->qs.timeout; + + rb = flow->rx_rb; + + pthread_rwlock_unlock(&ai.lock); + + acl = shm_rbuff_get_acl(rb); + if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) + return; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { + shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWPEER); + shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); + return; + } + + if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) + flow_send_keepalive(fd); +} + +void * monitor(void * o) +{ + struct timespec tic = {0, TICTIME}; + + (void) o; + + while (true) { + int i; + int min; + int max; + + pthread_rwlock_rdlock(&ai.lock); + min = ai.min_fd; + max = ai.max_fd; + pthread_rwlock_unlock(&ai.lock); + + for (i = min; i <= max; ++i) + flow_keepalive(i); + + nanosleep(&tic, NULL); + } + + return (void *) 0; +} + +static void flow_clear(int fd) +{ + memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); + + ai.flows[fd].flow_id = -1; + ai.flows[fd].pid = -1; +} + static void flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); @@ -299,7 +377,6 @@ static void flow_fini(int fd) bmp_release(ai.fds, fd); } - if (ai.flows[fd].rx_rb != NULL) { shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN); shm_rbuff_close(ai.flows[fd].rx_rb); @@ -321,6 +398,12 @@ static void flow_fini(int fd) crypt_fini(ai.flows[fd].ctx); flow_clear(fd); + + while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd) + --ai.max_fd; + + while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd) + ++ai.min_fd; } static int flow_init(int flow_id, @@ -344,6 +427,12 @@ static int flow_init(int flow_id, goto fail_fds; } + if (fd > ai.max_fd) + ai.max_fd = fd; + + if (fd < ai.min_fd) + ai.min_fd = fd; + flow = &ai.flows[fd]; flow->rx_rb = shm_rbuff_open(getpid(), flow_id); @@ -449,6 +538,9 @@ static void init(int argc, if (ai.fds == NULL) goto fail_fds; + ai.min_fd = PROG_RES_FDS; + ai.max_fd = PROG_RES_FDS; + ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0); if (ai.fqueues == NULL) goto fail_fqueues; @@ -508,12 +600,17 @@ static void init(int argc, goto fail_rib_init; } #endif + if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0) + goto fail_monitor; + return; + fail_monitor: #if defined PROC_FLOW_STATS + rib_fini(); fail_rib_init: - timerwheel_fini(); #endif + timerwheel_fini(); fail_timerwheel: shm_flow_set_close(ai.fqset); fail_fqset: @@ -550,6 +647,9 @@ static void fini(void) if (ai.fds == NULL) return; + pthread_cancel(ai.mon); + pthread_join(ai.mon, NULL); + pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { @@ -671,7 +771,7 @@ int flow_accept(qosspec_t * qs, if (fd < 0) return fd; - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); if (qs != NULL) *qs = ai.flows[fd].qs; @@ -1058,48 +1158,6 @@ static int add_crc(struct shm_du_buff * sdb) return 0; } -static void flow_send_keepalive(int fd) -{ - flow_write(fd, NULL, 0); -} - -static int flow_keepalive(int fd) -{ - struct timespec now; - struct timespec s_act; - struct timespec r_act; - struct flow * flow; - int flow_id; - uint32_t timeo; - - flow = &ai.flows[fd]; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_rdlock(&ai.lock); - - s_act = flow->snd_act; - r_act = flow->rcv_act; - - flow_id = flow->flow_id; - timeo = flow->qs.timeout; - - pthread_rwlock_unlock(&ai.lock); - - if (timeo == 0) - return 0; - - if (ts_diff_ns(&r_act, &now) > timeo * MILLION) { - shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER); - return -EFLOWPEER; - } - - if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2) - flow_send_keepalive(fd); - - return 0; -} - static int flow_tx_sdb(struct flow * flow, struct shm_du_buff * sdb, bool block, @@ -1164,8 +1222,6 @@ ssize_t flow_write(int fd, int flags; struct timespec abs; struct timespec * abstime = NULL; - struct timespec tic = {0, TICTIME}; - struct timespec tictime; struct shm_du_buff * sdb; uint8_t * ptr; @@ -1186,9 +1242,7 @@ ssize_t flow_write(int fd, return -ENOTALLOC; } - ts_add(&tic, &abs, &tictime); - - if (ai.flows[fd].snd_timesout) { + if (flow->snd_timesout) { ts_add(&abs, &flow->snd_timeo, &abs); abstime = &abs; } @@ -1205,17 +1259,9 @@ ssize_t flow_write(int fd, return -EAGAIN; idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb); } else { - while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) { - if (ret != -ETIMEDOUT) + while ((ret = frcti_window_wait(flow->frcti, abstime)) < 0) { + if (ret < 0) return ret; - - if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0) - return -ETIMEDOUT; - - if (flow_keepalive(fd)) - return -EFLOWPEER; - - ts_add(&tictime, &tic, &tictime); } idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime); } @@ -1226,7 +1272,7 @@ ssize_t flow_write(int fd, if (count > 0) memcpy(ptr, buf, count); - ret = flow_tx_sdb(flow, sdb, flags & FLOWFWNOBLOCK, abstime); + ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime); return ret < 0 ? (ssize_t) ret : (ssize_t) count; } @@ -1259,8 +1305,6 @@ static ssize_t flow_rx_sdb(struct flow * flow, if (idx < 0) return idx; - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&ai.lock); @@ -1269,6 +1313,7 @@ static ssize_t flow_rx_sdb(struct flow * flow, pthread_rwlock_unlock(&ai.lock); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); if (invalid_pkt(flow, *sdb)) { shm_rdrbuff_remove(ai.rdrb, idx); return -EAGAIN; @@ -1287,8 +1332,6 @@ ssize_t flow_read(int fd, struct shm_du_buff * sdb; struct timespec abs; struct timespec now; - struct timespec tic = {0, TICTIME}; - struct timespec tictime; struct timespec * abstime = NULL; struct flow * flow; bool block; @@ -1317,8 +1360,6 @@ ssize_t flow_read(int fd, block = !(flow->oflags & FLOWFRNOBLOCK); partrd = !(flow->oflags & FLOWFRNOPART); - ts_add(&now, &tic, &tictime); - if (flow->rcv_timesout) { ts_add(&now, &flow->rcv_timeo, &abs); abstime = &abs; @@ -1329,19 +1370,12 @@ ssize_t flow_read(int fd, while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { pthread_rwlock_unlock(&ai.lock); - idx = flow_rx_sdb(flow, &sdb, block, &tictime); + idx = flow_rx_sdb(flow, &sdb, block, abstime); if (idx < 0) { - if (idx != -ETIMEDOUT && idx != -EAGAIN) + if (block && idx != -EAGAIN) + return idx; + if (!block) return idx; - - if (abstime != NULL - && ts_diff_ns(&tictime, &abs) <= 0) - return -ETIMEDOUT; - - if (flow_keepalive(fd) < 0) - return -EFLOWPEER; - - ts_add(&tictime, &tic, &tictime); pthread_rwlock_rdlock(&ai.lock); continue; @@ -1399,9 +1433,6 @@ ssize_t flow_read(int fd, struct flow_set * fset_create() { struct flow_set * set; - struct timespec now; - - clock_gettime(PTHREAD_COND_CLOCK, &now); set = malloc(sizeof(*set)); if (set == NULL) @@ -1418,9 +1449,6 @@ struct flow_set * fset_create() if (!bmp_is_id_valid(ai.fqueues, set->idx)) goto fail_bmp_alloc; - set->chk = now; - set->min = UINT32_MAX; - pthread_rwlock_unlock(&ai.lock); list_head_init(&set->flows); @@ -1525,9 +1553,6 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&set->lock); - if (flow->qs.timeout != 0 && flow->qs.timeout < set->min) - set->min = flow->qs.timeout; - list_add_tail(&fse->next, &set->flows); pthread_rwlock_unlock(&set->lock); @@ -1551,15 +1576,12 @@ void fset_del(struct flow_set * set, struct list_head * p; struct list_head * h; struct flow * flow; - uint32_t min; if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS) return; flow = &ai.flows[fd]; - min = UINT32_MAX; - pthread_rwlock_rdlock(&ai.lock); if (flow->flow_id >= 0) @@ -1573,14 +1595,10 @@ void fset_del(struct flow_set * set, if (e->fd == fd) { list_del(&e->next); free(e); - } else { - if (flow->qs.timeout != 0 && flow->qs.timeout < min) - min = flow->qs.timeout; + break; } } - set->min = min; - pthread_rwlock_unlock(&set->lock); pthread_rwlock_unlock(&ai.lock); @@ -1608,48 +1626,6 @@ bool fset_has(const struct flow_set * set, return ret; } -static void fset_keepalive(struct flow_set * set) -{ - struct timespec now; - struct list_head * p; - struct list_head * h; - struct list_head copy; - - clock_gettime(PTHREAD_COND_CLOCK, &now); - - pthread_rwlock_wrlock(&set->lock); - - if (ts_diff_ns(&now, &set->chk) < set->min >> 2) { - pthread_rwlock_unlock(&set->lock); - return; - } - - set->chk = now; - - list_head_init(©); - - list_for_each(p, &set->flows) { - struct flow_set_entry * c; - struct flow_set_entry * e; - e = list_entry(p, struct flow_set_entry, next); - c = malloc(sizeof(*c)); - if (c == NULL) - continue; - c->fd = e->fd; - list_add_tail(&c->next, ©); - } - - pthread_rwlock_unlock(&set->lock); - - list_for_each_safe(p, h, ©) { - struct flow_set_entry * e; - e = list_entry(p, struct flow_set_entry, next); - flow_send_keepalive(e->fd); - list_del(&e->next); - free(e); - } -} - int fqueue_next(struct fqueue * fq) { int fd; @@ -1692,8 +1668,6 @@ ssize_t fevent(struct flow_set * set, const struct timespec * timeo) { ssize_t ret = 0; - struct timespec tic = {0, TICTIME}; - struct timespec tictime; struct timespec abs; struct timespec * t = NULL; @@ -1705,27 +1679,15 @@ ssize_t fevent(struct flow_set * set, clock_gettime(PTHREAD_COND_CLOCK, &abs); - ts_add(&tic, &abs, &tictime); - t = &tictime; - - if (timeo != NULL) + if (timeo != NULL) { ts_add(&abs, timeo, &abs); + t = &abs; + } while (ret == 0) { ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); - if (ret == -ETIMEDOUT) { - if (timeo != NULL && ts_diff_ns(t, &abs) < 0) { - fq->fqsize = 0; - return -ETIMEDOUT; - } - ret = 0; - ts_add(t, &tic, t); - pthread_rwlock_rdlock(&ai.lock); - timerwheel_move(); - fset_keepalive(set); - pthread_rwlock_unlock(&ai.lock); - continue; - } + if (ret == -ETIMEDOUT) + return -ETIMEDOUT; fq->fqsize = ret << 1; fq->next = 0; @@ -1924,14 +1886,8 @@ int ipcp_flow_read(int fd, pthread_rwlock_unlock(&ai.lock); idx = flow_rx_sdb(flow, sdb, false, NULL); - if (idx < 0) { - if (idx == -EAGAIN) { - pthread_rwlock_rdlock(&ai.lock); - continue; - } - + if (idx < 0) return idx; - } pthread_rwlock_rdlock(&ai.lock); |