author    Dimitri Staessens <[email protected]>  2020-06-06 09:48:56 +0200
committer Sander Vrijders <[email protected]>    2020-06-06 18:29:14 +0200
commit    5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (patch)
tree      0f99ee5bc51c8599d4771b2a48701dd2f26d0ac5 /src/lib/dev.c
parent    7a6bc98a1ea07991d8ff00a9b77be196bd9cef45 (diff)
lib: Allow pure acknowledgment packets in FRCT
This adds the logic to send a pure acknowledgment packet when there is
no data to send. This required an event filter for the fqueue, as these
non-data packets should not trigger application PKT events.

The default timeout is now 10ms, until we have FRCP tuning as part of
fccntl.

Karn's algorithm seems to be very unstable with low (sub-ms) RTT
estimates. Doubling the RTO (every RTO) still seems too slow to prevent
rtx storms when the measured RTT suddenly spikes by several orders of
magnitude. Simply assuming that the ACK'd packet is the last one
transmitted seems to be a lot more stable. It can lead to temporary
underestimation, but that is not a throughput killer in FRCP.

Most time units are changed to nanoseconds for faster computation.

Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
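As a rough illustration of the retransmission-timer heuristic described
above: the sketch below (plain C, nanosecond time units as in this
patch) samples the RTT against the time of the last transmission
instead of applying Karn's algorithm. The names (rtt_est, tx_update,
ack_update) are illustrative, not the actual FRCT internals; the
smoothing constants follow the usual RFC 6298 scheme.

#include <stdint.h>

/* Illustrative state only; not the actual FRCT structs. */
struct rtt_est {
        int64_t srtt;   /* smoothed RTT (ns)             */
        int64_t mdev;   /* RTT mean deviation (ns)       */
        int64_t rto;    /* retransmission timeout (ns)   */
        int64_t t_last; /* time of the last transmission */
};

#define RTO_MIN_NS (10 * 1000 * 1000LL) /* the new 10 ms default */

/* Record the send time on every transmission, including rtx. */
static void tx_update(struct rtt_est * e, int64_t now)
{
        e->t_last = now;
}

/*
 * On an ACK, assume the ACK'd packet is the last one transmitted
 * and sample the RTT against t_last. Karn's algorithm would skip
 * samples for retransmitted packets; with sub-ms RTT estimates
 * that proved very unstable. This can underestimate the RTT for
 * a while, which FRCP tolerates.
 */
static void ack_update(struct rtt_est * e, int64_t now)
{
        int64_t rtt = now - e->t_last;
        int64_t err;

        if (e->srtt == 0) {            /* first sample */
                e->srtt = rtt;
                e->mdev = rtt / 2;
        } else {                       /* RFC 6298-style smoothing */
                err      = rtt - e->srtt;
                e->srtt += err / 8;
                if (err < 0)
                        err = -err;
                e->mdev += (err - e->mdev) / 4;
        }

        e->rto = e->srtt + 4 * e->mdev;
        if (e->rto < RTO_MIN_NS)
                e->rto = RTO_MIN_NS;
}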
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--   src/lib/dev.c   71
1 file changed, 50 insertions(+), 21 deletions(-)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index efd08146..df616ead 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1017,18 +1017,22 @@ ssize_t flow_write(int fd,
memcpy(ptr, buf, count);
+ pthread_rwlock_wrlock(&ai.lock);
+
if (frcti_snd(flow->frcti, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- pthread_rwlock_wrlock(&ai.lock);
- if (flow->qs.cypher_s > 0)
+ if (flow->qs.cypher_s > 0) {
if (crypt_encrypt(flow, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
+ }
+
pthread_rwlock_unlock(&ai.lock);
if (flow->qs.ber == 0 && add_crc(sdb) != 0) {
@@ -1097,12 +1101,12 @@ ssize_t flow_read(int fd,
abstime = &abs;
}
- pthread_rwlock_unlock(&ai.lock);
-
idx = flow->part_idx;
+
if (idx < 0) {
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
+
idx = noblock ? shm_rbuff_read(rb) :
shm_rbuff_read_b(rb, abstime);
if (idx < 0)
@@ -1110,20 +1114,28 @@ ssize_t flow_read(int fd,
sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
+ pthread_rwlock_rdlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
continue;
}
+ pthread_rwlock_rdlock(&ai.lock);
+
if (flow->qs.cypher_s > 0
&& crypt_decrypt(flow, sdb) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
return -ENOMEM;
}
- idx = frcti_rcv(flow->frcti, sdb);
+ frcti_rcv(flow->frcti, sdb);
}
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
n = shm_rdrbuff_read(&packet, ai.rdrb, idx);
assert(n >= 0);
@@ -1144,7 +1156,9 @@ ssize_t flow_read(int fd,
memcpy(buf, packet, count);
sdb = shm_rdrbuff_get(ai.rdrb, idx);
shm_du_buff_head_release(sdb, n);
+ pthread_rwlock_wrlock(&ai.lock);
flow->part_idx = idx;
+ pthread_rwlock_unlock(&ai.lock);
return count;
} else {
shm_rdrbuff_remove(ai.rdrb, idx);
@@ -1295,6 +1309,11 @@ int fqueue_next(struct fqueue * fq)
pthread_rwlock_rdlock(&ai.lock);
+ if (fq->next != 0 && frcti_filter(fq) == 0) {
+ pthread_rwlock_unlock(&ai.lock);
+ return -EPERM;
+ }
+
fd = ai.ports[fq->fqueue[fq->next]].fd;
fq->next += 2;
@@ -1319,7 +1338,7 @@ ssize_t fevent(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeo)
{
- ssize_t ret;
+ ssize_t ret = 0;
struct timespec abstime;
struct timespec * t = NULL;
@@ -1335,18 +1354,22 @@ ssize_t fevent(struct flow_set * set,
t = &abstime;
}
- ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
- if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
- }
+ while (ret == 0) {
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
+ if (ret == -ETIMEDOUT) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
- fq->fqsize = ret << 1;
- fq->next = 0;
+ fq->fqsize = ret << 1;
+ fq->next = 0;
+
+ ret = frcti_filter(fq);
+ }
assert(ret);
- return ret;
+ return 1;
}
/* ipcp-dev functions. */
@@ -1509,7 +1532,7 @@ int ipcp_flow_read(int fd,
{
struct flow * flow;
struct shm_rbuff * rb;
- ssize_t idx;
+ ssize_t idx = -1;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
assert(sdb);
@@ -1522,20 +1545,26 @@ int ipcp_flow_read(int fd,
rb = flow->rx_rb;
- pthread_rwlock_unlock(&ai.lock);
+ while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
+ pthread_rwlock_unlock(&ai.lock);
- idx = frcti_queued_pdu(flow->frcti);
- while (idx < 0) {
idx = shm_rbuff_read(rb);
if (idx < 0)
return idx;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(*sdb) != 0)
continue;
- idx = frcti_rcv(flow->frcti, *sdb);
+ frcti_rcv(flow->frcti, *sdb);
}
+ frcti_tick(flow->frcti);
+
+ pthread_rwlock_unlock(&ai.lock);
+
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;
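
For context, a minimal sketch of the application-side event loop that
this filtering affects. It assumes the public fset/fqueue calls
(fset_create, fset_add, fqueue_create, fevent, fqueue_next, flow_read)
exposed by the library this file implements; treat the headers and
exact signatures as assumptions. With this change, a pure
acknowledgment arriving on a flow no longer makes fevent report a PKT
event for it.

#include <ouroboros/fqueue.h> /* assumed header for fset/fqueue API */
#include <ouroboros/dev.h>    /* assumed header for flow_read       */

#define BUF_LEN 2048

/* Hypothetical server loop over a set of flows. */
static void serve_flows(const int * fds, size_t n)
{
        struct flow_set * set = fset_create();
        struct fqueue *   fq  = fqueue_create();
        char              buf[BUF_LEN];
        size_t            i;
        int               fd;

        for (i = 0; i < n; ++i)
                fset_add(set, fds[i]);

        for (;;) {
                /* Blocks until a flow has an actual data packet;
                 * with this patch pure ACKs are filtered out and
                 * no longer wake the application. */
                if (fevent(set, fq, NULL) <= 0)
                        continue;

                /* fqueue_next() can now return -EPERM when the
                 * remaining queued events were all filtered. */
                while ((fd = fqueue_next(fq)) >= 0) {
                        ssize_t len = flow_read(fd, buf, BUF_LEN);
                        if (len >= 0) {
                                /* ... process len bytes from fd ... */
                        }
                }
        }
}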