summaryrefslogtreecommitdiff
path: root/src/lib/frct.c
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2020-09-20 13:04:52 +0200
committerSander Vrijders <[email protected]>2020-09-25 11:52:51 +0200
commit1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 (patch)
tree5193774eea2bb204d062cc47e178a3702d4790a4 /src/lib/frct.c
parent5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (diff)
downloadouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.tar.gz
ouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.zip
lib: Complete retransmission logic
This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/frct.c')
-rw-r--r--src/lib/frct.c244
1 files changed, 147 insertions, 97 deletions
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2bd126f4..c26910fa 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,7 +21,7 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL (60 * BILLION) /* ns */
+#define DELT_MPL (5 * BILLION) /* ns */
#define DELT_A (1 * BILLION) /* ns */
#define DELT_R (20 * BILLION) /* ns */
@@ -59,8 +59,6 @@ struct frcti {
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
- struct rxmwheel * rw;
-
ssize_t rq[RQ_SIZE];
pthread_rwlock_t lock;
};
@@ -86,7 +84,84 @@ struct frct_pci {
uint32_t ackno;
} __attribute__((packed));
-#include <rxmwheel.c>
+static bool before(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq1 - seq2) < 0;
+}
+
+static bool after(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq2 - seq1) < 0;
+}
+
+static void __send_ack(int fd,
+ int ackno)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct flow * f;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_ACK;
+ pci->ackno = hton32(ackno);
+
+ f = &ai.flows[fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+}
+
+static void frct_send_ack(struct frcti * frcti)
+{
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ int fd;
+
+ assert(frcti);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ ackno = frcti->rcv_cr.lwe;
+ fd = frcti->fd;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ if (diff > frcti->a || diff < DELT_ACK)
+ return;
+
+ __send_ack(fd, ackno);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
static struct frcti * frcti_create(int fd)
{
@@ -123,14 +198,10 @@ static struct frcti * frcti_create(int fd)
frcti->srtt = 0; /* Updated on first ACK */
frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
- frcti->rw = NULL;
if (ai.flows[fd].qs.loss == 0) {
- frcti->snd_cr.cflags |= FRCTFRTX;
+ frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
- frcti->rw = rxmwheel_create();
- if (frcti->rw == NULL)
- goto fail_rw;
}
frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
@@ -141,8 +212,6 @@ static struct frcti * frcti_create(int fd)
return frcti;
- fail_rw:
- pthread_rwlock_destroy(&frcti->lock);
fail_lock:
free(frcti);
fail_malloc:
@@ -151,24 +220,16 @@ static struct frcti * frcti_create(int fd)
static void frcti_destroy(struct frcti * frcti)
{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything we sent is acked.
- */
-
- if (frcti->rw != NULL)
- rxmwheel_destroy(frcti->rw);
-
pthread_rwlock_destroy(&frcti->lock);
free(frcti);
}
-static uint16_t frcti_getconf(struct frcti * frcti)
+static uint16_t frcti_getflags(struct frcti * frcti)
{
uint16_t ret;
- assert (frcti);
+ assert(frcti);
pthread_rwlock_rdlock(&frcti->lock);
@@ -179,6 +240,22 @@ static uint16_t frcti_getconf(struct frcti * frcti)
return ret;
}
+static void frcti_setflags(struct frcti * frcti,
+ uint16_t flags)
+{
+ flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */
+
+ assert(frcti);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */
+
+ frcti->snd_cr.cflags &= flags;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
#define frcti_queued_pdu(frcti) \
(frcti == NULL ? idx : __frcti_queued_pdu(frcti))
@@ -189,8 +266,10 @@ static uint16_t frcti_getconf(struct frcti * frcti)
(frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
#define frcti_tick(frcti) \
- (frcti == NULL ? 0 : __frcti_tick(frcti))
+ (frcti == NULL ? 0 : __frcti_tick())
+#define frcti_dealloc(frcti) \
+ (frcti == NULL ? 0 : __frcti_dealloc(frcti))
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -233,78 +312,41 @@ static ssize_t __frcti_pdu_ready(struct frcti * frcti)
return idx;
}
-static bool before(uint32_t seq1,
- uint32_t seq2)
-{
- return (int32_t)(seq1 - seq2) < 0;
-}
-
-static bool after(uint32_t seq1,
- uint32_t seq2)
-{
- return (int32_t)(seq2 - seq1) < 0;
-}
+#include <timerwheel.c>
-static void frct_send_ack(struct frcti * frcti)
+/*
+ * Send a final ACK for everything that has not been ACK'd.
+ * If the flow should be kept active for retransmission,
+ * the returned time will be negative.
+ */
+static time_t __frcti_dealloc(struct frcti * frcti)
{
- struct shm_du_buff * sdb;
- struct frct_pci * pci;
- ssize_t idx;
- struct timespec now;
- time_t diff;
- uint32_t ackno;
- struct flow * f;
+ struct timespec now;
+ time_t wait;
+ int ackno;
+ int fd = -1;
- assert(frcti);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_rdlock(&frcti->lock);
- if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
- pthread_rwlock_unlock(&frcti->lock);
- return;
- }
-
ackno = frcti->rcv_cr.lwe;
+ if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno)
+ fd = frcti->fd;
- pthread_rwlock_unlock(&frcti->lock);
+ wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
+ frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
+ if (frcti->snd_cr.cflags & FRCTFLINGER
+ && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
+ wait = -wait;
- diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
-
- if (diff > frcti->a)
- return;
-
- if (diff < DELT_ACK)
- return;
-
- /* Raw calls needed to bypass frcti. */
- idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
- if (idx < 0)
- return;
-
- pci = (struct frct_pci *) shm_du_buff_head(sdb);
- memset(pci, 0, sizeof(*pci));
-
- pci->flags = FRCT_ACK;
- pci->ackno = hton32(ackno);
-
- f = &ai.flows[frcti->fd];
-
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- pthread_rwlock_rdlock(&ai.lock);
- ipcp_sdb_release(sdb);
- return;
- }
-
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
- pthread_rwlock_wrlock(&frcti->lock);
+ pthread_rwlock_unlock(&frcti->lock);
- if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
- frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+ if (fd != -1)
+ __send_ack(fd, ackno);
- pthread_rwlock_unlock(&frcti->lock);
+ return wait;
}
static int __frcti_snd(struct frcti * frcti,
@@ -315,14 +357,14 @@ static int __frcti_snd(struct frcti * frcti,
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
+ bool rtx;
assert(frcti);
snd_cr = &frcti->snd_cr;
rcv_cr = &frcti->rcv_cr;
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ timerwheel_move();
pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
@@ -334,6 +376,8 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_wrlock(&frcti->lock);
+ rtx = snd_cr->cflags & FRCTFRTX;
+
pci->flags |= FRCT_DATA;
/* Set DRF if there are no unacknowledged packets. */
@@ -351,7 +395,7 @@ static int __frcti_snd(struct frcti * frcti,
seqno = snd_cr->seqno;
pci->seqno = hton32(seqno);
- if (!(snd_cr->cflags & FRCTFRTX)) {
+ if (!rtx) {
snd_cr->lwe++;
} else {
if (!frcti->probe) {
@@ -372,8 +416,8 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
- if (frcti->rw != NULL)
- rxmwheel_add(frcti->rw, frcti, seqno, sdb);
+ if (rtx)
+ timerwheel_rxm(frcti, seqno, sdb);
return 0;
}
@@ -387,12 +431,10 @@ static void rtt_estimator(struct frcti * frcti,
if (srtt == 0) { /* first measurement */
srtt = mrtt;
rttvar = mrtt >> 1;
-
} else {
time_t delta = mrtt - srtt;
srtt += (delta >> 3);
- rttvar -= rttvar >> 2;
- rttvar += ABS(delta) >> 2;
+ rttvar += (ABS(delta) - rttvar) >> 2;
}
frcti->srtt = MAX(1000U, srtt);
@@ -401,12 +443,9 @@ static void rtt_estimator(struct frcti * frcti,
frcti->srtt + (frcti->mdev << 1));
}
-static void __frcti_tick(struct frcti * frcti)
+static void __frcti_tick(void)
{
- if (frcti->rw != NULL) {
- rxmwheel_move(frcti->rw);
- frct_send_ack(frcti);
- }
+ timerwheel_move();
}
/* Always queues the next application packet on the RQ. */
@@ -420,6 +459,7 @@ static void __frcti_rcv(struct frcti * frcti,
struct frct_cr * rcv_cr;
uint32_t seqno;
uint32_t ackno;
+ int fd = -1;
assert(frcti);
@@ -456,8 +496,10 @@ static void __frcti_rcv(struct frcti * frcti,
if (!(pci->flags & FRCT_DATA))
goto drop_packet;
- if (before(seqno, rcv_cr->lwe))
+ if (before(seqno, rcv_cr->lwe)) {
+ rcv_cr->seqno = seqno;
goto drop_packet;
+ }
if (rcv_cr->cflags & FRCTFRTX) {
if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
@@ -465,6 +507,8 @@ static void __frcti_rcv(struct frcti * frcti,
if (frcti->rq[pos] != -1)
goto drop_packet; /* Duplicate in rq. */
+
+ fd = frcti->fd;
} else {
rcv_cr->lwe = seqno;
}
@@ -475,10 +519,16 @@ static void __frcti_rcv(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
+ if (fd != -1)
+ timerwheel_ack(fd, frcti);
+
return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
+
+ frct_send_ack(frcti);
+
shm_rdrbuff_remove(ai.rdrb, idx);
return;
}
@@ -492,7 +542,7 @@ int frcti_filter(struct fqueue * fq)
struct frcti * frcti;
struct shm_rbuff * rb;
- while(fq->next < fq->fqsize) {
+ while (fq->next < fq->fqsize) {
if (fq->fqueue[fq->next + 1] != FLOW_PKT)
return 1;