author    Dimitri Staessens <[email protected]>  2022-02-23 21:14:26 +0100
committer Sander Vrijders <[email protected]>    2022-02-24 17:27:46 +0100
commit    9719dbe335af4c6add39d739f78a68040b62d8a3 (patch)
tree      071a8141e0dcb1544b441ccc985a691545588b45 /src/lib/dev.c
parent    65820fa84f2b16ee1c9291135a49a75437baeb4e (diff)
lib: Add initial flow liveness monitoring
This adds initial flow liveness monitoring, with a fixed timeout of 120s. I will make the timeout configurable at flow allocation later on (it needs to be communicated to the peer).

If one peer dies, or stops making IPC calls (flow_write/flow_read/fevent), it will stop sending keepalives, and the other peer's reads/writes will fail with -EFLOWDOWN after the timeout expires.

Packets without a payload (0-length packets) are interpreted as keepalive packets for the flow. They can be sent from any application, but they will not trigger a message read at the receiver side (a return value of 0 on flow_read indicates that a previous partial read completed at exactly the buffer size).

Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
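From the application's side, the mechanism is visible only through flow_read()/flow_write() return values. Below is a minimal sketch of a read loop under these semantics; read_until_down() is a hypothetical helper, and it assumes <ouroboros/dev.h> and <ouroboros/errno.h> provide flow_read() and EFLOWDOWN:

#include <ouroboros/dev.h>
#include <ouroboros/errno.h>   /* EFLOWDOWN (assumed location) */

#include <stdio.h>

/* Hypothetical read loop illustrating the new return values. */
static int read_until_down(int fd)
{
        char    buf[2048];
        ssize_t n;

        for (;;) {
                n = flow_read(fd, buf, sizeof(buf));
                if (n == -EFLOWDOWN) {
                        /* Peer sent nothing, not even keepalives,
                           for the full 120 s timeout. */
                        fprintf(stderr, "Flow %d is down.\n", fd);
                        return -1;
                }
                if (n < 0)
                        return (int) n;   /* some other error */
                if (n == 0)
                        continue;         /* a previous partial read ended
                                             exactly at the buffer size;
                                             keepalives never surface here */
                printf("got %zd bytes\n", n);
        }
}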
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--  src/lib/dev.c  182
1 file changed, 159 insertions(+), 23 deletions(-)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 0acc7455..4c21fcdf 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -68,6 +68,7 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
+#define FLOWTIMEO 120 /* seconds */
enum port_state {
PORT_NULL = 0,
@@ -102,6 +103,9 @@ struct flow {
pid_t pid;
+ struct timespec snd_act;
+ struct timespec rcv_act;
+
bool snd_timesout;
bool rcv_timesout;
struct timespec snd_timeo;
@@ -119,6 +123,8 @@ struct flow_set_entry {
struct flow_set {
size_t idx;
+ struct timespec chk; /* Last keepalive check */
+
struct list_head flows;
pthread_rwlock_t lock;
};
@@ -300,8 +306,11 @@ static int flow_init(int flow_id,
qosspec_t qs,
uint8_t * s)
{
- int fd;
- int err = -ENOMEM;
+ struct timespec now;
+ int fd;
+ int err = -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_wrlock(&ai.lock);
@@ -328,6 +337,8 @@ static int flow_init(int flow_id,
ai.flows[fd].pid = pid;
ai.flows[fd].part_idx = NO_PART;
ai.flows[fd].qs = qs;
+ ai.flows[fd].snd_act = now;
+ ai.flows[fd].rcv_act = now;
if (qs.cypher_s > 0) {
assert(s != NULL);
@@ -1033,6 +1044,43 @@ static int add_crc(struct shm_du_buff * sdb)
return 0;
}
+static void flow_send_keepalive(int fd)
+{
+ flow_write(fd, NULL, 0);
+}
+
+static int flow_keepalive(int fd)
+{
+ struct timespec now;
+ struct timespec s_act;
+ struct timespec r_act;
+ struct flow * flow;
+ int flow_id;
+
+ flow = &ai.flows[fd];
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ s_act = flow->snd_act;
+ r_act = flow->rcv_act;
+
+ flow_id = flow->flow_id;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) {
+ shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT);
+ return -EFLOWDOWN;
+ }
+
+ if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION)
+ flow_send_keepalive(fd);
+
+ return 0;
+}
+
ssize_t flow_write(int fd,
const void * buf,
size_t count)
@@ -1048,17 +1096,17 @@ ssize_t flow_write(int fd,
struct shm_du_buff * sdb;
uint8_t * ptr;
- if (buf == NULL)
+ if (buf == NULL && count != 0)
return 0;
- if (fd < 0 || fd > PROG_MAX_FLOWS)
+ if (fd < 0 || fd >= PROG_MAX_FLOWS)
return -EBADF;
flow = &ai.flows[fd];
clock_gettime(PTHREAD_COND_CLOCK, &abs);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -1091,6 +1139,9 @@ ssize_t flow_write(int fd,
if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)
return -ETIMEDOUT;
+ if (flow_keepalive(fd))
+ return -EFLOWDOWN;
+
frcti_tick(flow->frcti);
ts_add(&tictime, &tic, &tictime);
@@ -1101,11 +1152,20 @@ ssize_t flow_write(int fd,
if (idx < 0)
return idx;
- memcpy(ptr, buf, count);
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ flow->snd_act = abs;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ if (count > 0)
+ memcpy(ptr, buf, count);
pthread_rwlock_rdlock(&ai.lock);
- if (frcti_snd(flow->frcti, sdb) < 0)
+ if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)
goto enomem;
if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
@@ -1114,6 +1174,8 @@ ssize_t flow_write(int fd,
if (flow->qs.ber == 0 && add_crc(sdb) != 0)
goto enomem;
+ pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
+
if (flags & FLOWFWNOBLOCK)
ret = shm_rbuff_write(flow->tx_rb, idx);
else
@@ -1124,7 +1186,7 @@ ssize_t flow_write(int fd,
else
shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
- pthread_rwlock_unlock(&ai.lock);
+ pthread_cleanup_pop(true);
return ret < 0 ? (ssize_t) ret : (ssize_t) count;
@@ -1144,6 +1206,7 @@ ssize_t flow_read(int fd,
struct shm_rbuff * rb;
struct shm_du_buff * sdb;
struct timespec abs;
+ struct timespec now;
struct timespec tic = {0, TICTIME};
struct timespec tictime;
struct timespec * abstime = NULL;
@@ -1156,7 +1219,7 @@ ssize_t flow_read(int fd,
flow = &ai.flows[fd];
- clock_gettime(PTHREAD_COND_CLOCK, &abs);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_rdlock(&ai.lock);
@@ -1175,15 +1238,14 @@ ssize_t flow_read(int fd,
noblock = flow->oflags & FLOWFRNOBLOCK;
partrd = !(flow->oflags & FLOWFRNOPART);
- ts_add(&tic, &abs, &tictime);
+ ts_add(&now, &tic, &tictime);
if (ai.flows[fd].rcv_timesout) {
- ts_add(&abs, &flow->rcv_timeo, &abs);
+ ts_add(&now, &flow->rcv_timeo, &abs);
abstime = &abs;
}
idx = flow->part_idx;
-
if (idx < 0) {
while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -1200,20 +1262,28 @@ ssize_t flow_read(int fd,
&& ts_diff_ns(&tictime, &abs) <= 0)
return -ETIMEDOUT;
+ if (flow_keepalive(fd) < 0)
+ return -EFLOWDOWN;
+
ts_add(&tictime, &tic, &tictime);
- pthread_rwlock_rdlock(&ai.lock);
+
+ pthread_rwlock_wrlock(&ai.lock);
continue;
}
sdb = shm_rdrbuff_get(ai.rdrb, idx);
- if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
- pthread_rwlock_rdlock(&ai.lock);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
+ flow->rcv_act = tictime;
+
+ if ((flow->qs.ber == 0 && chk_crc(sdb) != 0) ||
+ shm_du_buff_head(sdb) == shm_du_buff_tail(sdb)) {
shm_rdrbuff_remove(ai.rdrb, idx);
+ idx = -EAGAIN;
continue;
}
- pthread_rwlock_rdlock(&ai.lock);
-
if (flow->qs.cypher_s > 0
&& crypt_decrypt(flow, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -1242,6 +1312,8 @@ ssize_t flow_read(int fd,
flow->part_idx = (partrd && n == (ssize_t) count) ?
DONE_PART : NO_PART;
+ flow->rcv_act = now;
+
pthread_rwlock_unlock(&ai.lock);
return n;
} else {
@@ -1251,6 +1323,9 @@ ssize_t flow_read(int fd,
shm_du_buff_head_release(sdb, n);
pthread_rwlock_wrlock(&ai.lock);
flow->part_idx = idx;
+
+ flow->rcv_act = now;
+
pthread_rwlock_unlock(&ai.lock);
return count;
} else {
@@ -1265,6 +1340,9 @@ ssize_t flow_read(int fd,
struct flow_set * fset_create()
{
struct flow_set * set;
+ struct timespec now;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
set = malloc(sizeof(*set));
if (set == NULL)
@@ -1281,6 +1359,8 @@ struct flow_set * fset_create()
if (!bmp_is_id_valid(ai.fqueues, set->idx))
goto fail_bmp_alloc;
+ set->chk = now;
+
pthread_rwlock_unlock(&ai.lock);
list_head_init(&set->flows);
@@ -1453,6 +1533,48 @@ bool fset_has(const struct flow_set * set,
return ret;
}
+static void fset_keepalive(struct flow_set * set)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ struct list_head copy;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&set->lock);
+
+ if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) {
+ pthread_rwlock_unlock(&set->lock);
+ return;
+ }
+
+ set->chk = now;
+
+ list_head_init(&copy);
+
+ list_for_each(p, &set->flows) {
+ struct flow_set_entry * c;
+ struct flow_set_entry * e;
+ e = list_entry(p, struct flow_set_entry, next);
+ c = malloc(sizeof(*c));
+ if (c == NULL)
+ continue;
+ c->fd = e->fd;
+ list_add_tail(&c->next, &copy);
+ }
+
+ pthread_rwlock_unlock(&set->lock);
+
+ list_for_each_safe(p, h, &copy) {
+ struct flow_set_entry * e;
+ e = list_entry(p, struct flow_set_entry, next);
+ flow_send_keepalive(e->fd);
+ list_del(&e->next);
+ free(e);
+ }
+}
+
int fqueue_next(struct fqueue * fq)
{
int fd;
@@ -1525,6 +1647,7 @@ ssize_t fevent(struct flow_set * set,
ts_add(t, &tic, t);
pthread_rwlock_rdlock(&ai.lock);
timerwheel_move();
+ fset_keepalive(set);
pthread_rwlock_unlock(&ai.lock);
continue;
}
@@ -1707,6 +1830,7 @@ int ipcp_flow_alloc_reply(int fd,
int ipcp_flow_read(int fd,
struct shm_du_buff ** sdb)
{
+ struct timespec now;
struct flow * flow;
struct shm_rbuff * rb;
ssize_t idx = -1;
@@ -1729,11 +1853,18 @@ int ipcp_flow_read(int fd,
if (idx < 0)
return idx;
- pthread_rwlock_rdlock(&ai.lock);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_rwlock_wrlock(&ai.lock);
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
- if (flow->qs.ber == 0 && chk_crc(*sdb) != 0)
+ if ((flow->qs.ber == 0 && chk_crc(*sdb) != 0) ||
+ (shm_du_buff_head(*sdb) == shm_du_buff_tail(*sdb))) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
continue;
+ }
+
+ flow->rcv_act = now;
frcti_rcv(flow->frcti, *sdb);
}
@@ -1750,16 +1881,19 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
- struct flow * flow;
- int ret;
- ssize_t idx;
+ struct timespec now;
+ struct flow * flow;
+ int ret;
+ ssize_t idx;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
flow = &ai.flows[fd];
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (flow->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -1792,6 +1926,8 @@ int ipcp_flow_write(int fd,
else
shm_rdrbuff_remove(ai.rdrb, idx);
+ flow->snd_act = now;
+
pthread_rwlock_unlock(&ai.lock);
assert(ret <= 0);
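Since fevent() now drives fset_keepalive(), a process that blocks in fevent() automatically sends keepalives for every idle flow in its set (one every FLOWTIMEO / 4, i.e. every 30 s). A sketch of a server loop under that assumption, using the pre-existing fset/fqueue calls from <ouroboros/fqueue.h>; serve() is a hypothetical name and error handling is elided:

#include <ouroboros/dev.h>
#include <ouroboros/errno.h>
#include <ouroboros/fqueue.h>

/* Hypothetical event loop: blocking in fevent() is enough to keep
   the flows in 'set' alive, since fevent() calls fset_keepalive(). */
static void serve(int fd)
{
        struct flow_set * set = fset_create();
        struct fqueue *   fq  = fqueue_create();
        char              buf[2048];
        int               e;

        fset_add(set, fd);

        for (;;) {
                if (fevent(set, fq, NULL) < 0)
                        break;
                while ((e = fqueue_next(fq)) >= 0) {
                        ssize_t n = flow_read(e, buf, sizeof(buf));
                        if (n == -EFLOWDOWN) {
                                fset_del(set, e);  /* peer timed out */
                                flow_dealloc(e);
                        }
                }
        }

        fqueue_destroy(fq);
        fset_destroy(set);
}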