diff options
author | Dimitri Staessens <[email protected]> | 2022-02-23 21:14:26 +0100 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2022-02-24 17:27:46 +0100 |
commit | 9719dbe335af4c6add39d739f78a68040b62d8a3 (patch) | |
tree | 071a8141e0dcb1544b441ccc985a691545588b45 /src/lib/dev.c | |
parent | 65820fa84f2b16ee1c9291135a49a75437baeb4e (diff) | |
download | ouroboros-9719dbe335af4c6add39d739f78a68040b62d8a3.tar.gz ouroboros-9719dbe335af4c6add39d739f78a68040b62d8a3.zip |
lib: Add initial flow liveness monitoring
This adds flow liveness monitoring for flows, with a fixed timeout of
120s. I will make it configurable at flow allocation later on (timeout
needs to be communicated to the peer). If one peer dies, or doesn't
call any IPC calls (flow_write/flow_read/fevent) it will stop sending
keepalives and the other peer's read/writes will error on an
-EFLOWDOWN after the timeout expires.
Packets without a payload (0 length packets) are interpreted as
keepalive packets for the flow. They can be sent from any application,
but they will not trigger a message read at the receiver side (0 as a
return value on flow_read indicates a previous partial read has
completed at exactly the buffer size).
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 182 |
1 files changed, 159 insertions, 23 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 0acc7455..4c21fcdf 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -68,6 +68,7 @@ #define SECMEMSZ 16384 #define SYMMKEYSZ 32 #define MSGBUFSZ 2048 +#define FLOWTIMEO 120 /* seconds */ enum port_state { PORT_NULL = 0, @@ -102,6 +103,9 @@ struct flow { pid_t pid; + struct timespec snd_act; + struct timespec rcv_act; + bool snd_timesout; bool rcv_timesout; struct timespec snd_timeo; @@ -119,6 +123,8 @@ struct flow_set_entry { struct flow_set { size_t idx; + struct timespec chk; /* Last keepalive check */ + struct list_head flows; pthread_rwlock_t lock; }; @@ -300,8 +306,11 @@ static int flow_init(int flow_id, qosspec_t qs, uint8_t * s) { - int fd; - int err = -ENOMEM; + struct timespec now; + int fd; + int err = -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_wrlock(&ai.lock); @@ -328,6 +337,8 @@ static int flow_init(int flow_id, ai.flows[fd].pid = pid; ai.flows[fd].part_idx = NO_PART; ai.flows[fd].qs = qs; + ai.flows[fd].snd_act = now; + ai.flows[fd].rcv_act = now; if (qs.cypher_s > 0) { assert(s != NULL); @@ -1033,6 +1044,43 @@ static int add_crc(struct shm_du_buff * sdb) return 0; } +static void flow_send_keepalive(int fd) +{ + flow_write(fd, NULL, 0); +} + +static int flow_keepalive(int fd) +{ + struct timespec now; + struct timespec s_act; + struct timespec r_act; + struct flow * flow; + int flow_id; + + flow = &ai.flows[fd]; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_rdlock(&ai.lock); + + s_act = flow->snd_act; + r_act = flow->rcv_act; + + flow_id = flow->flow_id; + + pthread_rwlock_unlock(&ai.lock); + + if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) { + shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT); + return -EFLOWDOWN; + } + + if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION) + flow_send_keepalive(fd); + + return 0; +} + ssize_t flow_write(int fd, const void * buf, size_t count) @@ -1048,17 +1096,17 @@ ssize_t flow_write(int fd, struct shm_du_buff * sdb; uint8_t * ptr; - if (buf == NULL) + if (buf == NULL && count != 0) return 0; - if (fd < 0 || fd > PROG_MAX_FLOWS) + if (fd < 0 || fd >= PROG_MAX_FLOWS) return -EBADF; flow = &ai.flows[fd]; clock_gettime(PTHREAD_COND_CLOCK, &abs); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); @@ -1091,6 +1139,9 @@ ssize_t flow_write(int fd, if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0) return -ETIMEDOUT; + if (flow_keepalive(fd)) + return -EFLOWDOWN; + frcti_tick(flow->frcti); ts_add(&tictime, &tic, &tictime); @@ -1101,11 +1152,20 @@ ssize_t flow_write(int fd, if (idx < 0) return idx; - memcpy(ptr, buf, count); + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + pthread_rwlock_wrlock(&ai.lock); + + flow->snd_act = abs; + + pthread_rwlock_unlock(&ai.lock); + + if (count > 0) + memcpy(ptr, buf, count); pthread_rwlock_rdlock(&ai.lock); - if (frcti_snd(flow->frcti, sdb) < 0) + if (count != 0 && frcti_snd(flow->frcti, sdb) < 0) goto enomem; if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0) @@ -1114,6 +1174,8 @@ ssize_t flow_write(int fd, if (flow->qs.ber == 0 && add_crc(sdb) != 0) goto enomem; + pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); + if (flags & FLOWFWNOBLOCK) ret = shm_rbuff_write(flow->tx_rb, idx); else @@ -1124,7 +1186,7 @@ ssize_t flow_write(int fd, else shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); - pthread_rwlock_unlock(&ai.lock); + pthread_cleanup_pop(true); return ret < 0 ? (ssize_t) ret : (ssize_t) count; @@ -1144,6 +1206,7 @@ ssize_t flow_read(int fd, struct shm_rbuff * rb; struct shm_du_buff * sdb; struct timespec abs; + struct timespec now; struct timespec tic = {0, TICTIME}; struct timespec tictime; struct timespec * abstime = NULL; @@ -1156,7 +1219,7 @@ ssize_t flow_read(int fd, flow = &ai.flows[fd]; - clock_gettime(PTHREAD_COND_CLOCK, &abs); + clock_gettime(PTHREAD_COND_CLOCK, &now); pthread_rwlock_rdlock(&ai.lock); @@ -1175,15 +1238,14 @@ ssize_t flow_read(int fd, noblock = flow->oflags & FLOWFRNOBLOCK; partrd = !(flow->oflags & FLOWFRNOPART); - ts_add(&tic, &abs, &tictime); + ts_add(&now, &tic, &tictime); if (ai.flows[fd].rcv_timesout) { - ts_add(&abs, &flow->rcv_timeo, &abs); + ts_add(&now, &flow->rcv_timeo, &abs); abstime = &abs; } idx = flow->part_idx; - if (idx < 0) { while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { pthread_rwlock_unlock(&ai.lock); @@ -1200,20 +1262,28 @@ ssize_t flow_read(int fd, && ts_diff_ns(&tictime, &abs) <= 0) return -ETIMEDOUT; + if (flow_keepalive(fd) < 0) + return -EFLOWDOWN; + ts_add(&tictime, &tic, &tictime); - pthread_rwlock_rdlock(&ai.lock); + + pthread_rwlock_wrlock(&ai.lock); continue; } sdb = shm_rdrbuff_get(ai.rdrb, idx); - if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { - pthread_rwlock_rdlock(&ai.lock); + + pthread_rwlock_wrlock(&ai.lock); + + flow->rcv_act = tictime; + + if ((flow->qs.ber == 0 && chk_crc(sdb) != 0) || + shm_du_buff_head(sdb) == shm_du_buff_tail(sdb)) { shm_rdrbuff_remove(ai.rdrb, idx); + idx = -EAGAIN; continue; } - pthread_rwlock_rdlock(&ai.lock); - if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0) { pthread_rwlock_unlock(&ai.lock); @@ -1242,6 +1312,8 @@ ssize_t flow_read(int fd, flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; + flow->rcv_act = now; + pthread_rwlock_unlock(&ai.lock); return n; } else { @@ -1251,6 +1323,9 @@ ssize_t flow_read(int fd, shm_du_buff_head_release(sdb, n); pthread_rwlock_wrlock(&ai.lock); flow->part_idx = idx; + + flow->rcv_act = now; + pthread_rwlock_unlock(&ai.lock); return count; } else { @@ -1265,6 +1340,9 @@ ssize_t flow_read(int fd, struct flow_set * fset_create() { struct flow_set * set; + struct timespec now; + + clock_gettime(PTHREAD_COND_CLOCK, &now); set = malloc(sizeof(*set)); if (set == NULL) @@ -1281,6 +1359,8 @@ struct flow_set * fset_create() if (!bmp_is_id_valid(ai.fqueues, set->idx)) goto fail_bmp_alloc; + set->chk = now; + pthread_rwlock_unlock(&ai.lock); list_head_init(&set->flows); @@ -1453,6 +1533,48 @@ bool fset_has(const struct flow_set * set, return ret; } +static void fset_keepalive(struct flow_set * set) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + struct list_head copy; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&set->lock); + + if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) { + pthread_rwlock_unlock(&set->lock); + return; + } + + set->chk = now; + + list_head_init(©); + + list_for_each(p, &set->flows) { + struct flow_set_entry * c; + struct flow_set_entry * e; + e = list_entry(p, struct flow_set_entry, next); + c = malloc(sizeof(*c)); + if (c == NULL) + continue; + c->fd = e->fd; + list_add_tail(&c->next, ©); + } + + pthread_rwlock_unlock(&set->lock); + + list_for_each_safe(p, h, ©) { + struct flow_set_entry * e; + e = list_entry(p, struct flow_set_entry, next); + flow_send_keepalive(e->fd); + list_del(&e->next); + free(e); + } +} + int fqueue_next(struct fqueue * fq) { int fd; @@ -1525,6 +1647,7 @@ ssize_t fevent(struct flow_set * set, ts_add(t, &tic, t); pthread_rwlock_rdlock(&ai.lock); timerwheel_move(); + fset_keepalive(set); pthread_rwlock_unlock(&ai.lock); continue; } @@ -1707,6 +1830,7 @@ int ipcp_flow_alloc_reply(int fd, int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) { + struct timespec now; struct flow * flow; struct shm_rbuff * rb; ssize_t idx = -1; @@ -1729,11 +1853,18 @@ int ipcp_flow_read(int fd, if (idx < 0) return idx; - pthread_rwlock_rdlock(&ai.lock); + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_rwlock_wrlock(&ai.lock); *sdb = shm_rdrbuff_get(ai.rdrb, idx); - if (flow->qs.ber == 0 && chk_crc(*sdb) != 0) + if ((flow->qs.ber == 0 && chk_crc(*sdb) != 0) || + (shm_du_buff_head(*sdb) == shm_du_buff_tail(*sdb))) { + shm_rdrbuff_remove(ai.rdrb, idx); continue; + } + + flow->rcv_act = now; frcti_rcv(flow->frcti, *sdb); } @@ -1750,16 +1881,19 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - struct flow * flow; - int ret; - ssize_t idx; + struct timespec now; + struct flow * flow; + int ret; + ssize_t idx; assert(fd >= 0 && fd < SYS_MAX_FLOWS); assert(sdb); + clock_gettime(PTHREAD_COND_CLOCK, &now); + flow = &ai.flows[fd]; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); @@ -1792,6 +1926,8 @@ int ipcp_flow_write(int fd, else shm_rdrbuff_remove(ai.rdrb, idx); + flow->snd_act = now; + pthread_rwlock_unlock(&ai.lock); assert(ret <= 0); |