summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2022-03-27 18:18:52 +0200
committerSander Vrijders <[email protected]>2022-03-30 15:05:06 +0200
commitaaa9537b332ff09dde6af852fd9a95e64dea5dda (patch)
treebbf3f2db95c2dee34374431fd23fc6b0bb84e5dd /src/lib/dev.c
parent02b3893b1ec392f1b3ca030a03267c31eb1dc290 (diff)
downloadouroboros-aaa9537b332ff09dde6af852fd9a95e64dea5dda.tar.gz
ouroboros-aaa9537b332ff09dde6af852fd9a95e64dea5dda.zip
lib: Move flow monitoring to its own thread
This adds a monitoring thread to handle flow keepalive management in the application and removes the thread interruptions to schedule FRCT calls within the regular IPC calls. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c304
1 files changed, 130 insertions, 174 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index b3e9c69e..5a57aa08 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -123,9 +123,6 @@ struct flow_set_entry {
struct flow_set {
size_t idx;
- struct timespec chk; /* Last keepalive check. */
- uint32_t min; /* Minimum keepalive time in set. */
-
struct list_head flows;
pthread_rwlock_t lock;
};
@@ -148,6 +145,11 @@ struct {
struct flow * flows;
struct port * ports;
+ pthread_t mon;
+ int min_timeo;
+ int min_fd;
+ int max_fd;
+
pthread_t tx;
size_t n_frcti;
@@ -255,14 +257,6 @@ static int proc_announce(char * prog)
return ret;
}
-static void flow_clear(int fd)
-{
- memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
-
- ai.flows[fd].flow_id = -1;
- ai.flows[fd].pid = -1;
-}
-
#include "crypt.c"
#include "frct.c"
@@ -281,6 +275,90 @@ void * frct_tx(void * o)
return (void *) 0;
}
+static void flow_send_keepalive(int fd)
+{
+ flow_write(fd, NULL, 0);
+}
+
+static void flow_keepalive(int fd)
+{
+ struct timespec now;
+ struct timespec s_act;
+ struct timespec r_act;
+ struct flow * flow;
+ int flow_id;
+ uint32_t timeo;
+ struct shm_rbuff * rb;
+ uint32_t acl;
+
+ flow = &ai.flows[fd];
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->flow_id < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return;
+ }
+
+ s_act = flow->snd_act;
+ r_act = flow->rcv_act;
+
+ flow_id = flow->flow_id;
+ timeo = flow->qs.timeout;
+
+ rb = flow->rx_rb;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ acl = shm_rbuff_get_acl(rb);
+ if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
+ return;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWPEER);
+ shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+ return;
+ }
+
+ if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2)
+ flow_send_keepalive(fd);
+}
+
+void * monitor(void * o)
+{
+ struct timespec tic = {0, TICTIME};
+
+ (void) o;
+
+ while (true) {
+ int i;
+ int min;
+ int max;
+
+ pthread_rwlock_rdlock(&ai.lock);
+ min = ai.min_fd;
+ max = ai.max_fd;
+ pthread_rwlock_unlock(&ai.lock);
+
+ for (i = min; i <= max; ++i)
+ flow_keepalive(i);
+
+ nanosleep(&tic, NULL);
+ }
+
+ return (void *) 0;
+}
+
+static void flow_clear(int fd)
+{
+ memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
+
+ ai.flows[fd].flow_id = -1;
+ ai.flows[fd].pid = -1;
+}
+
static void flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
@@ -299,7 +377,6 @@ static void flow_fini(int fd)
bmp_release(ai.fds, fd);
}
-
if (ai.flows[fd].rx_rb != NULL) {
shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -321,6 +398,12 @@ static void flow_fini(int fd)
crypt_fini(ai.flows[fd].ctx);
flow_clear(fd);
+
+ while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd)
+ --ai.max_fd;
+
+ while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd)
+ ++ai.min_fd;
}
static int flow_init(int flow_id,
@@ -344,6 +427,12 @@ static int flow_init(int flow_id,
goto fail_fds;
}
+ if (fd > ai.max_fd)
+ ai.max_fd = fd;
+
+ if (fd < ai.min_fd)
+ ai.min_fd = fd;
+
flow = &ai.flows[fd];
flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
@@ -449,6 +538,9 @@ static void init(int argc,
if (ai.fds == NULL)
goto fail_fds;
+ ai.min_fd = PROG_RES_FDS;
+ ai.max_fd = PROG_RES_FDS;
+
ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
if (ai.fqueues == NULL)
goto fail_fqueues;
@@ -508,12 +600,17 @@ static void init(int argc,
goto fail_rib_init;
}
#endif
+ if (pthread_create(&ai.mon, NULL, monitor, NULL) < 0)
+ goto fail_monitor;
+
return;
+ fail_monitor:
#if defined PROC_FLOW_STATS
+ rib_fini();
fail_rib_init:
- timerwheel_fini();
#endif
+ timerwheel_fini();
fail_timerwheel:
shm_flow_set_close(ai.fqset);
fail_fqset:
@@ -550,6 +647,9 @@ static void fini(void)
if (ai.fds == NULL)
return;
+ pthread_cancel(ai.mon);
+ pthread_join(ai.mon, NULL);
+
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
@@ -671,7 +771,7 @@ int flow_accept(qosspec_t * qs,
if (fd < 0)
return fd;
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (qs != NULL)
*qs = ai.flows[fd].qs;
@@ -1058,48 +1158,6 @@ static int add_crc(struct shm_du_buff * sdb)
return 0;
}
-static void flow_send_keepalive(int fd)
-{
- flow_write(fd, NULL, 0);
-}
-
-static int flow_keepalive(int fd)
-{
- struct timespec now;
- struct timespec s_act;
- struct timespec r_act;
- struct flow * flow;
- int flow_id;
- uint32_t timeo;
-
- flow = &ai.flows[fd];
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- pthread_rwlock_rdlock(&ai.lock);
-
- s_act = flow->snd_act;
- r_act = flow->rcv_act;
-
- flow_id = flow->flow_id;
- timeo = flow->qs.timeout;
-
- pthread_rwlock_unlock(&ai.lock);
-
- if (timeo == 0)
- return 0;
-
- if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
- shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
- return -EFLOWPEER;
- }
-
- if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2)
- flow_send_keepalive(fd);
-
- return 0;
-}
-
static int flow_tx_sdb(struct flow * flow,
struct shm_du_buff * sdb,
bool block,
@@ -1164,8 +1222,6 @@ ssize_t flow_write(int fd,
int flags;
struct timespec abs;
struct timespec * abstime = NULL;
- struct timespec tic = {0, TICTIME};
- struct timespec tictime;
struct shm_du_buff * sdb;
uint8_t * ptr;
@@ -1186,9 +1242,7 @@ ssize_t flow_write(int fd,
return -ENOTALLOC;
}
- ts_add(&tic, &abs, &tictime);
-
- if (ai.flows[fd].snd_timesout) {
+ if (flow->snd_timesout) {
ts_add(&abs, &flow->snd_timeo, &abs);
abstime = &abs;
}
@@ -1205,17 +1259,9 @@ ssize_t flow_write(int fd,
return -EAGAIN;
idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
} else {
- while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
- if (ret != -ETIMEDOUT)
+ while ((ret = frcti_window_wait(flow->frcti, abstime)) < 0) {
+ if (ret < 0)
return ret;
-
- if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)
- return -ETIMEDOUT;
-
- if (flow_keepalive(fd))
- return -EFLOWPEER;
-
- ts_add(&tictime, &tic, &tictime);
}
idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
}
@@ -1226,7 +1272,7 @@ ssize_t flow_write(int fd,
if (count > 0)
memcpy(ptr, buf, count);
- ret = flow_tx_sdb(flow, sdb, flags & FLOWFWNOBLOCK, abstime);
+ ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime);
return ret < 0 ? (ssize_t) ret : (ssize_t) count;
}
@@ -1259,8 +1305,6 @@ static ssize_t flow_rx_sdb(struct flow * flow,
if (idx < 0)
return idx;
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&ai.lock);
@@ -1269,6 +1313,7 @@ static ssize_t flow_rx_sdb(struct flow * flow,
pthread_rwlock_unlock(&ai.lock);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (invalid_pkt(flow, *sdb)) {
shm_rdrbuff_remove(ai.rdrb, idx);
return -EAGAIN;
@@ -1287,8 +1332,6 @@ ssize_t flow_read(int fd,
struct shm_du_buff * sdb;
struct timespec abs;
struct timespec now;
- struct timespec tic = {0, TICTIME};
- struct timespec tictime;
struct timespec * abstime = NULL;
struct flow * flow;
bool block;
@@ -1317,8 +1360,6 @@ ssize_t flow_read(int fd,
block = !(flow->oflags & FLOWFRNOBLOCK);
partrd = !(flow->oflags & FLOWFRNOPART);
- ts_add(&now, &tic, &tictime);
-
if (flow->rcv_timesout) {
ts_add(&now, &flow->rcv_timeo, &abs);
abstime = &abs;
@@ -1329,19 +1370,12 @@ ssize_t flow_read(int fd,
while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
pthread_rwlock_unlock(&ai.lock);
- idx = flow_rx_sdb(flow, &sdb, block, &tictime);
+ idx = flow_rx_sdb(flow, &sdb, block, abstime);
if (idx < 0) {
- if (idx != -ETIMEDOUT && idx != -EAGAIN)
+ if (block && idx != -EAGAIN)
+ return idx;
+ if (!block)
return idx;
-
- if (abstime != NULL
- && ts_diff_ns(&tictime, &abs) <= 0)
- return -ETIMEDOUT;
-
- if (flow_keepalive(fd) < 0)
- return -EFLOWPEER;
-
- ts_add(&tictime, &tic, &tictime);
pthread_rwlock_rdlock(&ai.lock);
continue;
@@ -1399,9 +1433,6 @@ ssize_t flow_read(int fd,
struct flow_set * fset_create()
{
struct flow_set * set;
- struct timespec now;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
set = malloc(sizeof(*set));
if (set == NULL)
@@ -1418,9 +1449,6 @@ struct flow_set * fset_create()
if (!bmp_is_id_valid(ai.fqueues, set->idx))
goto fail_bmp_alloc;
- set->chk = now;
- set->min = UINT32_MAX;
-
pthread_rwlock_unlock(&ai.lock);
list_head_init(&set->flows);
@@ -1525,9 +1553,6 @@ int fset_add(struct flow_set * set,
pthread_rwlock_wrlock(&set->lock);
- if (flow->qs.timeout != 0 && flow->qs.timeout < set->min)
- set->min = flow->qs.timeout;
-
list_add_tail(&fse->next, &set->flows);
pthread_rwlock_unlock(&set->lock);
@@ -1551,15 +1576,12 @@ void fset_del(struct flow_set * set,
struct list_head * p;
struct list_head * h;
struct flow * flow;
- uint32_t min;
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return;
flow = &ai.flows[fd];
- min = UINT32_MAX;
-
pthread_rwlock_rdlock(&ai.lock);
if (flow->flow_id >= 0)
@@ -1573,14 +1595,10 @@ void fset_del(struct flow_set * set,
if (e->fd == fd) {
list_del(&e->next);
free(e);
- } else {
- if (flow->qs.timeout != 0 && flow->qs.timeout < min)
- min = flow->qs.timeout;
+ break;
}
}
- set->min = min;
-
pthread_rwlock_unlock(&set->lock);
pthread_rwlock_unlock(&ai.lock);
@@ -1608,48 +1626,6 @@ bool fset_has(const struct flow_set * set,
return ret;
}
-static void fset_keepalive(struct flow_set * set)
-{
- struct timespec now;
- struct list_head * p;
- struct list_head * h;
- struct list_head copy;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- pthread_rwlock_wrlock(&set->lock);
-
- if (ts_diff_ns(&now, &set->chk) < set->min >> 2) {
- pthread_rwlock_unlock(&set->lock);
- return;
- }
-
- set->chk = now;
-
- list_head_init(&copy);
-
- list_for_each(p, &set->flows) {
- struct flow_set_entry * c;
- struct flow_set_entry * e;
- e = list_entry(p, struct flow_set_entry, next);
- c = malloc(sizeof(*c));
- if (c == NULL)
- continue;
- c->fd = e->fd;
- list_add_tail(&c->next, &copy);
- }
-
- pthread_rwlock_unlock(&set->lock);
-
- list_for_each_safe(p, h, &copy) {
- struct flow_set_entry * e;
- e = list_entry(p, struct flow_set_entry, next);
- flow_send_keepalive(e->fd);
- list_del(&e->next);
- free(e);
- }
-}
-
int fqueue_next(struct fqueue * fq)
{
int fd;
@@ -1692,8 +1668,6 @@ ssize_t fevent(struct flow_set * set,
const struct timespec * timeo)
{
ssize_t ret = 0;
- struct timespec tic = {0, TICTIME};
- struct timespec tictime;
struct timespec abs;
struct timespec * t = NULL;
@@ -1705,27 +1679,15 @@ ssize_t fevent(struct flow_set * set,
clock_gettime(PTHREAD_COND_CLOCK, &abs);
- ts_add(&tic, &abs, &tictime);
- t = &tictime;
-
- if (timeo != NULL)
+ if (timeo != NULL) {
ts_add(&abs, timeo, &abs);
+ t = &abs;
+ }
while (ret == 0) {
ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT) {
- if (timeo != NULL && ts_diff_ns(t, &abs) < 0) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
- ret = 0;
- ts_add(t, &tic, t);
- pthread_rwlock_rdlock(&ai.lock);
- timerwheel_move();
- fset_keepalive(set);
- pthread_rwlock_unlock(&ai.lock);
- continue;
- }
+ if (ret == -ETIMEDOUT)
+ return -ETIMEDOUT;
fq->fqsize = ret << 1;
fq->next = 0;
@@ -1924,14 +1886,8 @@ int ipcp_flow_read(int fd,
pthread_rwlock_unlock(&ai.lock);
idx = flow_rx_sdb(flow, sdb, false, NULL);
- if (idx < 0) {
- if (idx == -EAGAIN) {
- pthread_rwlock_rdlock(&ai.lock);
- continue;
- }
-
+ if (idx < 0)
return idx;
- }
pthread_rwlock_rdlock(&ai.lock);