diff options
author | Dimitri Staessens <[email protected]> | 2020-06-06 09:48:56 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2020-06-06 18:29:14 +0200 |
commit | 5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (patch) | |
tree | 0f99ee5bc51c8599d4771b2a48701dd2f26d0ac5 /src/lib/dev.c | |
parent | 7a6bc98a1ea07991d8ff00a9b77be196bd9cef45 (diff) | |
download | ouroboros-5f468ee5e02a0d63ed8ad7420ee1beda87e524d6.tar.gz ouroboros-5f468ee5e02a0d63ed8ad7420ee1beda87e524d6.zip |
lib: Allow pure acknowledgment packets in FRCT
This adds the logic to send a pure acknowledgment packet without any
data to send. This needed the event filter for the fqueue, as these
non-data packets should not trigger application PKT events. The
default timeout is now 10ms, until we have FRCP tuning as part of
fccntl.
Karn's algorithm seems to be very unstable with low (sub-ms) RTT
estimates. Doubling RTO (every RTO) seems still too slow to prevent
rtx storms when the measured rtt suddenly spikes several orders of
magnitude. Just assuming the ACK'd packet is the last one transmitted
seems to be a lot more stable. It can lead to temporary
underestimation, but this is not a throughput-killer in FRCP.
Changes most time units to nanoseconds for faster computation.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 71 |
1 files changed, 50 insertions, 21 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index efd08146..df616ead 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -1017,18 +1017,22 @@ ssize_t flow_write(int fd, memcpy(ptr, buf, count); + pthread_rwlock_wrlock(&ai.lock); + if (frcti_snd(flow->frcti, sdb) < 0) { + pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } - pthread_rwlock_wrlock(&ai.lock); - if (flow->qs.cypher_s > 0) + if (flow->qs.cypher_s > 0) { if (crypt_encrypt(flow, sdb) < 0) { pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } + } + pthread_rwlock_unlock(&ai.lock); if (flow->qs.ber == 0 && add_crc(sdb) != 0) { @@ -1097,12 +1101,12 @@ ssize_t flow_read(int fd, abstime = &abs; } - pthread_rwlock_unlock(&ai.lock); - idx = flow->part_idx; + if (idx < 0) { - idx = frcti_queued_pdu(flow->frcti); - while (idx < 0) { + while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { + pthread_rwlock_unlock(&ai.lock); + idx = noblock ? shm_rbuff_read(rb) : shm_rbuff_read_b(rb, abstime); if (idx < 0) @@ -1110,20 +1114,28 @@ ssize_t flow_read(int fd, sdb = shm_rdrbuff_get(ai.rdrb, idx); if (flow->qs.ber == 0 && chk_crc(sdb) != 0) { + pthread_rwlock_rdlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); continue; } + pthread_rwlock_rdlock(&ai.lock); + if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0) { + pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -ENOMEM; } - idx = frcti_rcv(flow->frcti, sdb); + frcti_rcv(flow->frcti, sdb); } } + frcti_tick(flow->frcti); + + pthread_rwlock_unlock(&ai.lock); + n = shm_rdrbuff_read(&packet, ai.rdrb, idx); assert(n >= 0); @@ -1144,7 +1156,9 @@ ssize_t flow_read(int fd, memcpy(buf, packet, count); sdb = shm_rdrbuff_get(ai.rdrb, idx); shm_du_buff_head_release(sdb, n); + pthread_rwlock_wrlock(&ai.lock); flow->part_idx = idx; + pthread_rwlock_unlock(&ai.lock); return count; } else { shm_rdrbuff_remove(ai.rdrb, idx); @@ -1295,6 +1309,11 @@ int fqueue_next(struct fqueue * fq) pthread_rwlock_rdlock(&ai.lock); + if (fq->next != 0 && frcti_filter(fq) == 0) { + pthread_rwlock_unlock(&ai.lock); + return -EPERM; + } + fd = ai.ports[fq->fqueue[fq->next]].fd; fq->next += 2; @@ -1319,7 +1338,7 @@ ssize_t fevent(struct flow_set * set, struct fqueue * fq, const struct timespec * timeo) { - ssize_t ret; + ssize_t ret = 0; struct timespec abstime; struct timespec * t = NULL; @@ -1335,18 +1354,22 @@ ssize_t fevent(struct flow_set * set, t = &abstime; } - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); - if (ret == -ETIMEDOUT) { - fq->fqsize = 0; - return -ETIMEDOUT; - } + while (ret == 0) { + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); + if (ret == -ETIMEDOUT) { + fq->fqsize = 0; + return -ETIMEDOUT; + } - fq->fqsize = ret << 1; - fq->next = 0; + fq->fqsize = ret << 1; + fq->next = 0; + + ret = frcti_filter(fq); + } assert(ret); - return ret; + return 1; } /* ipcp-dev functions. */ @@ -1509,7 +1532,7 @@ int ipcp_flow_read(int fd, { struct flow * flow; struct shm_rbuff * rb; - ssize_t idx; + ssize_t idx = -1; assert(fd >= 0 && fd < SYS_MAX_FLOWS); assert(sdb); @@ -1522,20 +1545,26 @@ int ipcp_flow_read(int fd, rb = flow->rx_rb; - pthread_rwlock_unlock(&ai.lock); + while ((idx = frcti_queued_pdu(flow->frcti)) < 0) { + pthread_rwlock_unlock(&ai.lock); - idx = frcti_queued_pdu(flow->frcti); - while (idx < 0) { idx = shm_rbuff_read(rb); if (idx < 0) return idx; + + pthread_rwlock_rdlock(&ai.lock); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); if (flow->qs.ber == 0 && chk_crc(*sdb) != 0) continue; - idx = frcti_rcv(flow->frcti, *sdb); + frcti_rcv(flow->frcti, *sdb); } + frcti_tick(flow->frcti); + + pthread_rwlock_unlock(&ai.lock); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); return 0; |