summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2018-06-05 13:39:40 +0200
committerSander Vrijders <[email protected]>2018-06-05 14:04:45 +0200
commit635ffc5c1c8da07f3f34218280d6a25a29c78bc7 (patch)
tree87abf5017a8367498ca676a21a941834296973ef
parent1ea6f8baa31d46088710ae02533678aa965528c2 (diff)
downloadouroboros-635ffc5c1c8da07f3f34218280d6a25a29c78bc7.tar.gz
ouroboros-635ffc5c1c8da07f3f34218280d6a25a29c78bc7.zip
lib: Simplify delta-t logic
This revises the delta-t implementation to align with Watson's timer specifications. FRCT will never deliver out-of-order packets. A raw flow (without delta-t state machine) will be able to provide such a service. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
-rw-r--r--doc/man/fccntl.32
-rw-r--r--include/ouroboros/fccntl.h3
-rw-r--r--src/lib/frct.c109
3 files changed, 56 insertions, 58 deletions
diff --git a/doc/man/fccntl.3 b/doc/man/fccntl.3
index bec506ec..543a6bb6 100644
--- a/doc/man/fccntl.3
+++ b/doc/man/fccntl.3
@@ -83,8 +83,6 @@ argument. Supported flags are:
\fIFRCTFERRCHCK\fR - enable checksum (CRC32).
-\fIFRCTFORDERING\fR - enable packet in-order delivery.
-
\fIFRCTFPARTIAL\fR - enable partial delivery.
.RE
diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h
index 5c3fc064..bc7c7206 100644
--- a/include/ouroboros/fccntl.h
+++ b/include/ouroboros/fccntl.h
@@ -48,8 +48,7 @@
#define FRCTFRESCNTRL 00000001 /* Feedback from receiver */
#define FRCTFRTX 00000002 /* Reliable flow */
#define FRCTFERRCHCK 00000004 /* Check for errors */
-#define FRCTFORDERING 00000010 /* Ordered delivery */
-#define FRCTFPARTIAL 00000020 /* Allow partial delivery */
+#define FRCTFPARTIAL 00000010 /* Allow partial delivery */
/* Flow operations */
#define FLOWSRCVTIMEO 00000001 /* Set read timeout */
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2eb79fb4..395fa2d8 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -56,7 +56,7 @@ struct frcti {
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
- size_t rq[RQ_SIZE];
+ ssize_t rq[RQ_SIZE];
struct timespec rtt;
@@ -107,20 +107,25 @@ static void frct_fini(void)
static struct frcti * frcti_create(int fd)
{
- struct frcti * frcti;
- time_t delta_t;
- ssize_t idx;
+ struct frcti * frcti;
+ time_t delta_t;
+ ssize_t idx;
+ struct timespec now;
frcti = malloc(sizeof(*frcti));
if (frcti == NULL)
goto fail_malloc;
+ memset(frcti, 0, sizeof(*frcti));
+
if (pthread_rwlock_init(&frcti->lock, NULL))
goto fail_lock;
for (idx = 0; idx < RQ_SIZE; ++idx)
frcti->rq[idx] = -1;
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
frcti->mpl = DELT_MPL;
frcti->a = DELT_A;
frcti->r = DELT_R;
@@ -128,23 +133,12 @@ static struct frcti * frcti_create(int fd)
delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000;
- frcti->snd_cr.drf = true;
frcti->snd_cr.conf = true;
-#ifdef OUROBOROS_CONFIG_DEBUG
- frcti->snd_cr.seqno = 0;
-#else
- random_buffer(&frcti->snd_cr.seqno, sizeof(frcti->snd_cr.seqno));
-#endif
- frcti->snd_cr.lwe = 0;
- frcti->snd_cr.rwe = 0;
- frcti->snd_cr.cflags = 0;
frcti->snd_cr.inact = 3 * delta_t + 1;
+ frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1);
- frcti->rcv_cr.drf = true;
- frcti->rcv_cr.lwe = 0;
- frcti->rcv_cr.rwe = 0;
- frcti->rcv_cr.cflags = 0;
frcti->rcv_cr.inact = 2 * delta_t + 1;
+ frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1);
return frcti;
@@ -226,10 +220,8 @@ static ssize_t __frcti_queued_pdu(struct frcti * frcti)
sdb = shm_rdrbuff_get(ai.rdrb, idx);
pci = (struct frct_pci *) shm_du_buff_head(sdb) - 1;
- if (pci->flags & FRCT_CFG) {
- assert(pci->flags & FRCT_DRF);
+ if (pci->flags & FRCT_CFG)
frcti->rcv_cr.cflags = pci->cflags;
- }
++frcti->rcv_cr.lwe;
frcti->rq[pos] = -1;
@@ -286,23 +278,8 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_wrlock(&frcti->lock);
- /* Check if sender is inactive. */
- if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact)
- snd_cr->drf = true;
-
pci->flags |= FRCT_DATA;
- /* Set the DRF in the first packet of a new run of SDUs. */
- if (snd_cr->drf) {
- pci->flags |= FRCT_DRF;
- if (snd_cr->conf) {
- pci->flags |= FRCT_CFG;
- pci->cflags = snd_cr->cflags;
- }
- }
-
- pci->seqno = hton32(snd_cr->seqno++);
-
if (snd_cr->cflags & FRCTFERRCHCK) {
uint8_t * tail = shm_du_buff_tail_alloc(sdb, FRCT_CRCLEN);
if (tail == NULL) {
@@ -315,9 +292,33 @@ static int __frcti_snd(struct frcti * frcti,
pci->flags |= FRCT_CRC;
}
- snd_cr->act = now.tv_sec;
+ /* Set DRF if there are no unacknowledged packets. */
+ if (snd_cr->seqno == snd_cr->lwe)
+ pci->flags |= FRCT_DRF;
+
+ if (snd_cr->conf) {
+ /* FIXME: This packet must be acked! */
+ pci->flags |= FRCT_CFG;
+ pci->cflags = snd_cr->cflags;
+ }
+
+ /* Choose a new sequence number if sender inactivity expired. */
+ if (now.tv_sec - snd_cr->act > snd_cr->inact) {
+ /* There are no unacknowledged packets. */
+ assert(snd_cr->seqno == snd_cr->lwe);
+#ifdef OUROBOROS_CONFIG_DEBUG
+ frcti->snd_cr.seqno = 0;
+#else
+ random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
+#endif
+ frcti->snd_cr.lwe = frcti->snd_cr.seqno;
+ }
+
+ pci->seqno = hton32(snd_cr->seqno++);
+ if (!(snd_cr->cflags & FRCTFRTX))
+ snd_cr->lwe++;
- snd_cr->drf = false;
+ snd_cr->act = now.tv_sec;
snd_cr->conf = false;
pthread_rwlock_unlock(&frcti->lock);
@@ -354,13 +355,10 @@ static int __frcti_rcv(struct frcti * frcti,
goto drop_packet;
}
- /* Check if receiver inactivity is true. */
- if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact)
- rcv_cr->drf = true;
-
seqno = ntoh32(pci->seqno);
- if (rcv_cr->drf) {
+ /* Check if receiver inactivity is true. */
+ if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
/* Inactive receiver, check for DRF. */
if (pci->flags & FRCT_DRF) /* New run. */
rcv_cr->lwe = seqno;
@@ -368,24 +366,27 @@ static int __frcti_rcv(struct frcti * frcti,
goto drop_packet;
}
- /* Queue the PDU if needed. */
- if (rcv_cr->cflags & FRCTFORDERING) {
- if (seqno < rcv_cr->lwe || seqno > rcv_cr->lwe + RQ_SIZE)
+ if (seqno == rcv_cr->lwe) {
+ ++rcv_cr->lwe;
+ /* Check for online reconfiguration. */
+ if (pci->flags & FRCT_CFG)
+ rcv_cr->cflags = pci->cflags;
+ } else { /* Out of order. */
+ if (seqno < rcv_cr->lwe) /* Duplicate. */
goto drop_packet;
- if (seqno == rcv_cr->lwe) {
- ++rcv_cr->lwe;
- /* Check for online reconfiguration. */
- if (pci->flags & FRCT_CFG) {
- assert(pci->flags & FRCT_DRF);
- rcv_cr->cflags = pci->cflags;
- }
+ if (rcv_cr->cflags & FRCTFRTX) {
+ size_t pos = seqno & (RQ_SIZE - 1);
+ if (seqno > rcv_cr->lwe + RQ_SIZE || /* Out of range. */
+ frcti->rq[pos] != -1) /* Duplicate in rq. */
+ goto drop_packet;
+ /* Queue. */
+ frcti->rq[pos] = idx;
} else {
- frcti->rq[seqno & (RQ_SIZE - 1)] = idx;
+ rcv_cr->lwe = seqno;
}
}
- rcv_cr->drf = false;
rcv_cr->act = now.tv_sec;
if (!(pci->flags & FRCT_DATA))