summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2017-09-14 13:43:09 +0200
committerSander Vrijders <[email protected]>2017-09-15 11:30:40 +0200
commit26d4a6072cbf59708071dac8393c88ddacd69a37 (patch)
tree40d3dd4c9e955528e770325f5e59b84c61316d2c /src/lib
parent07a2d315205fccf42e891e2bb46db44401b29153 (diff)
downloadouroboros-26d4a6072cbf59708071dac8393c88ddacd69a37.tar.gz
ouroboros-26d4a6072cbf59708071dac8393c88ddacd69a37.zip
lib: Add reordering queue to FRCT
This adds a reordering queue to FRCT so that SDUs can be delivered in-order when requested.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/dev.c70
-rw-r--r--src/lib/rq.c157
-rw-r--r--src/lib/tests/CMakeLists.txt1
-rw-r--r--src/lib/tests/rq_test.c115
5 files changed, 331 insertions, 13 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 5a09c52e..fd7ece83 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -163,6 +163,7 @@ set(SOURCE_FILES
qos.c
qoscube.c
random.c
+ rq.c
sha3.c
shm_flow_set.c
shm_rbuff.c
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 47fec48d..14ee31f4 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -39,6 +39,7 @@
#include <ouroboros/qoscube.h>
#include <ouroboros/timerwheel.h>
#include <ouroboros/frct_pci.h>
+#include <ouroboros/rq.h>
#include <stdlib.h>
#include <string.h>
@@ -50,7 +51,8 @@
#define TW_ELEMENTS 6000
#define TW_RESOLUTION 1 /* ms */
-#define MPL 2000 /* ms */
+#define MPL 2000 /* ms */
+#define RQ_SIZE 20
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
@@ -89,6 +91,8 @@ struct frcti {
uint16_t conf_flags;
+ struct rq * rq;
+
pthread_rwlock_t lock;
};
@@ -269,6 +273,12 @@ static int frcti_init(int fd)
frcti->rcv_lwe = 0;
frcti->rcv_rwe = 0;
+ frcti->conf_flags = 0;
+
+ frcti->rq = rq_create(RQ_SIZE);
+ if (frcti->rq == NULL)
+ return -1;
+
return 0;
}
@@ -285,6 +295,8 @@ static void frcti_fini(int fd)
*/
frcti_clear(fd);
+
+ rq_destroy(ai.frcti[fd].rq);
}
static int frcti_send(int fd,
@@ -382,6 +394,25 @@ static ssize_t frcti_read(int fd)
struct frcti * frcti;
struct frct_pci pci;
struct shm_du_buff * sdb;
+ uint64_t seqno;
+ bool nxt_pdu = true;
+
+ frcti = &(ai.frcti[fd]);
+
+ /* See if we already have the next PDU */
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (!rq_is_empty(frcti->rq)) {
+ seqno = rq_peek(frcti->rq);
+ if (seqno == frcti->rcv_lwe) {
+ frcti->rcv_lwe++;
+ idx = rq_pop(frcti->rq);
+ pthread_rwlock_unlock(&frcti->lock);
+ return idx;
+ }
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
do {
struct timespec now;
@@ -390,7 +421,7 @@ static ssize_t frcti_read(int fd)
struct shm_rbuff * rb;
bool noblock;
- clock_gettime(PTHREAD_COND_CLOCK, &now);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
pthread_rwlock_rdlock(&ai.lock);
@@ -404,18 +435,16 @@ static ssize_t frcti_read(int fd)
pthread_rwlock_unlock(&ai.lock);
- if (noblock)
+ if (noblock) {
idx = shm_rbuff_read(rb);
- else
+ } else {
idx = shm_rbuff_read_b(rb, abstime);
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+ }
if (idx < 0)
return idx;
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- frcti = &(ai.frcti[fd]);
-
sdb = shm_rdrbuff_get(ai.rdrb, idx);
pthread_rwlock_wrlock(&frcti->lock);
@@ -432,10 +461,11 @@ static ssize_t frcti_read(int fd)
ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
frcti->rcv_drf = true;
- /* We don't accept packets when there is receiver inactivity. */
+ /* When there is receiver inactivity queue the packet. */
if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
+ if (rq_push(frcti->rq, pci.seqno, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&frcti->lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
return -EAGAIN;
}
@@ -451,12 +481,26 @@ static ssize_t frcti_read(int fd)
frcti->last_rcv = now;
- pthread_rwlock_unlock(&frcti->lock);
+ nxt_pdu = true;
- if (!(pci.type & PDU_TYPE_DATA))
+ if (!(pci.type & PDU_TYPE_DATA)) {
shm_rdrbuff_remove(ai.rdrb, idx);
+ nxt_pdu = false;
+ }
+
+ if (frcti->conf_flags & FRCTFORDERING) {
+ if (pci.seqno != frcti->rcv_lwe) {
+ if (rq_push(frcti->rq, pci.seqno, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ nxt_pdu = false;
+ } else {
+ frcti->rcv_lwe++;
+ }
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
- } while (!(pci.type & PDU_TYPE_DATA));
+ } while (!nxt_pdu);
return idx;
}
diff --git a/src/lib/rq.c b/src/lib/rq.c
new file mode 100644
index 00000000..bd0594b5
--- /dev/null
+++ b/src/lib/rq.c
@@ -0,0 +1,157 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Reordering queue
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/rq.h>
+
+#include <assert.h>
+
+struct pdu {
+ uint64_t seqno;
+ size_t idx;
+};
+
+struct rq {
+ struct pdu * items;
+ int n_items;
+ int size;
+};
+
+struct rq * rq_create(int size)
+{
+ struct rq * rq;
+
+ rq = malloc(sizeof(*rq));
+ if (rq == NULL)
+ return NULL;
+
+ rq->items = malloc(sizeof(struct pdu) * (size + 1));
+ if (rq->items == NULL) {
+ free(rq);
+ return NULL;
+ }
+
+ rq->size = size;
+ rq->n_items = 0;
+
+ return rq;
+}
+
+void rq_destroy(struct rq * rq)
+{
+ assert(rq);
+
+ free(rq->items);
+ free(rq);
+}
+
+int rq_push(struct rq * rq,
+ uint64_t seqno,
+ size_t idx)
+{
+ int i;
+ int j;
+
+ assert(rq);
+
+ /* Queue is full. */
+ if (rq->n_items == rq->size)
+ return -1;
+
+ i = ++rq->n_items;
+ j = i / 2;
+ while (i > 1 && rq->items[j].seqno > seqno) {
+ rq->items[i] = rq->items[j];
+ i = j;
+ j = j / 2;
+ }
+
+ rq->items[i].seqno = seqno;
+ rq->items[i].idx = idx;
+
+ return 0;
+}
+
+uint64_t rq_peek(struct rq * rq)
+{
+ assert(rq);
+
+ return rq->items[1].seqno;
+}
+
+bool rq_is_empty(struct rq * rq)
+{
+ assert(rq);
+
+ return (rq->n_items == 0);
+}
+
+size_t rq_pop(struct rq * rq)
+{
+ size_t idx;
+ int i;
+ int j;
+ int k;
+
+ assert(rq);
+
+ idx = rq->items[1].idx;
+
+ rq->items[1] = rq->items[rq->n_items];
+ rq->n_items--;
+
+ i = 1;
+ while (true) {
+ k = i;
+ j = 2 * i;
+
+ if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno)
+ k = j;
+
+ if (j + 1 <= rq->n_items &&
+ rq->items[j + 1].seqno < rq->items[k].seqno)
+ k = j + 1;
+
+ if (k == i)
+ break;
+
+ rq->items[i] = rq->items[k];
+ i = k;
+ }
+
+ rq->items[i] = rq->items[rq->n_items + 1];
+
+ return idx;
+}
+
+bool rq_has(struct rq * rq,
+ uint64_t seqno)
+{
+ int i;
+
+ assert(rq);
+
+ for (i = 1; i <= rq->n_items; i++)
+ if (rq->items[i].seqno == seqno)
+ return true;
+
+ return false;
+}
diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt
index a93bf321..0edd4a42 100644
--- a/src/lib/tests/CMakeLists.txt
+++ b/src/lib/tests/CMakeLists.txt
@@ -14,6 +14,7 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
crc32_test.c
hashtable_test.c
md5_test.c
+ rq_test.c
sha3_test.c
time_utils_test.c
${TIMERWHEEL_TEST}
diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c
new file mode 100644
index 00000000..e2d0f435
--- /dev/null
+++ b/src/lib/tests/rq_test.c
@@ -0,0 +1,115 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Reordering queue test
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/rq.h>
+
+#include <stdio.h>
+
+#define Q_SIZE 5
+
+int rq_test(int argc,
+ char ** argv)
+{
+ struct rq * q;
+ int i;
+
+ (void) argc;
+ (void) argv;
+
+ q = rq_create(Q_SIZE);
+ if (q == NULL) {
+ printf("Failed to create.\n");
+ return -1;
+ }
+
+ if (rq_push(q, 1, 1)) {
+ printf("Failed to insert.\n");
+ return -1;
+ }
+
+ if (!rq_has(q, 1)) {
+ printf("Inserted item not present.\n");
+ return -1;
+ }
+
+ if (rq_peek(q) != 1) {
+ printf("Inserted item not present.\n");
+ return -1;
+ }
+
+ if (rq_pop(q) != 1) {
+ printf("Bad pop.\n");
+ return -1;
+ }
+
+ if (rq_push(q, 3, 5)) {
+ printf("Failed to insert.\n");
+ return -1;
+ }
+
+ if (rq_push(q, 1, 3)) {
+ printf("Failed to insert.\n");
+ return -1;
+ }
+
+ if (rq_push(q, 2, 7)) {
+ printf("Failed to insert.\n");
+ return -1;
+ }
+
+ if (!rq_has(q, 3)) {
+ printf("Inserted item not present.\n");
+ return -1;
+ }
+
+ if (rq_has(q, 4)) {
+ printf("Item present that was not inserted.\n");
+ return -1;
+ }
+
+ if (rq_peek(q) != 1) {
+ printf("Inserted item not present.\n");
+ return -1;
+ }
+
+ if (rq_pop(q) != 3) {
+ printf("Bad pop.\n");
+ return -1;
+ }
+
+ if (rq_peek(q) != 2) {
+ printf("Inserted item not present.\n");
+ return -1;
+ }
+
+ if (rq_pop(q) != 7) {
+ printf("Bad pop.\n");
+ return -1;
+ }
+
+ for (i = 0; i < Q_SIZE + 1; i++)
+ rq_push(q, i, i);
+
+ rq_destroy(q);
+
+ return 0;
+}