summaryrefslogtreecommitdiff
path: root/src/lib/rxmwheel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r--src/lib/rxmwheel.c251
1 files changed, 251 insertions, 0 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
new file mode 100644
index 00000000..69151801
--- /dev/null
+++ b/src/lib/rxmwheel.c
@@ -0,0 +1,251 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2018
+ *
+ * Timerwheel
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/list.h>
+
+#define RXMQ_S 12 /* defines #slots */
+#define RXMQ_M 15 /* defines max delay */
+#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */
+#define RXMQ_SLOTS (1 << RXMQ_S)
+#define RXMQ_MAX (1 << RXMQ_M) /* ms */
+
+/* Small inacurracy to avoid slow division by MILLION. */
+#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
+#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
+
+struct rxm {
+ struct list_head next;
+ uint32_t seqno;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
+ uint8_t * tail;
+ time_t t0; /* Time when original was sent (s). */
+ size_t mul; /* RTO multiplier. */
+ struct frcti * frcti;
+};
+
+struct {
+ struct list_head wheel[RXMQ_SLOTS];
+
+ size_t prv; /* Last processed slot. */
+ pthread_mutex_t lock;
+} rw;
+
+static void rxmwheel_fini(void)
+{
+ size_t i;
+ struct list_head * p;
+ struct list_head * h;
+
+ for (i = 0; i < RXMQ_SLOTS; ++i) {
+ list_for_each_safe(p, h, &rw.wheel[i]) {
+ struct rxm * rxm = list_entry(p, struct rxm, next);
+ list_del(&rxm->next);
+ free(rxm);
+ }
+ }
+}
+
+static int rxmwheel_init(void)
+{
+ struct timespec now;
+ size_t i;
+
+ if (pthread_mutex_init(&rw.lock, NULL))
+ return -1;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ /* Mark the previous timeslot as the last one processed. */
+ rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
+
+ for (i = 0; i < RXMQ_SLOTS; ++i)
+ list_head_init(&rw.wheel[i]);
+
+ return 0;
+}
+
+static void rxmwheel_clear(int fd)
+{
+ size_t i;
+
+ /* FIXME: Add list element to avoid looping over full rxmwheel */
+ pthread_mutex_lock(&rw.lock);
+
+ for (i = 0; i < RXMQ_SLOTS; ++i) {
+ struct list_head * p;
+ struct list_head * h;
+
+ list_for_each_safe(p, h, &rw.wheel[i]) {
+ struct rxm * r = list_entry(p, struct rxm, next);
+ if (r->frcti->fd == fd) {
+ list_del(&r->next);
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&rw.lock);
+}
+
+/* Return fd on r-timer expiry. */
+static int rxmwheel_move(void)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ size_t slot;
+ size_t i;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ slot = ts_to_slot(now);
+
+ pthread_mutex_lock(&rw.lock);
+
+ for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) {
+ list_for_each_safe(p, h, &rw.wheel[i]) {
+ struct rxm * r;
+ struct frct_cr * snd_cr;
+ struct frct_cr * rcv_cr;
+ size_t rslot;
+ time_t newtime;
+ ssize_t idx;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
+ struct flow * f;
+
+ r = list_entry(p, struct rxm, next);
+ list_del(&r->next);
+
+ snd_cr = &r->frcti->snd_cr;
+ rcv_cr = &r->frcti->rcv_cr;
+ /* Has been ack'd, remove. */
+ if ((int) (r->seqno - snd_cr->lwe) <= 0) {
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ continue;
+ }
+ /* Check for r-timer expiry. */
+ if (ts_to_ms(now) - r->t0 > r->frcti->r) {
+ int fd = r->frcti->fd;
+ pthread_mutex_unlock(&rw.lock);
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ return fd;
+ }
+
+ /* Copy the payload, safe rtx in other layers. */
+ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
+ /* FIXME: reschedule send? */
+ int fd = r->frcti->fd;
+ pthread_mutex_unlock(&rw.lock);
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ return fd;
+ }
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ head = shm_du_buff_head(sdb);
+ memcpy(head, r->head, r->tail - r->head);
+
+ /* Release the old copy */
+ shm_du_buff_ack(r->sdb);
+ ipcp_sdb_release(r->sdb);
+
+ /* Update ackno and make sure DRF is not set*/
+ ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe);
+ ((struct frct_pci *) head)->flags &= ~FRCT_DRF;
+
+ f = &ai.flows[r->frcti->fd];
+
+ /* Retransmit the copy. */
+ if (shm_rbuff_write(f->tx_rb, idx)) {
+ ipcp_sdb_release(sdb);
+ free(r);
+ /* FIXME: reschedule send? */
+ continue;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+
+ /* Reschedule. */
+ shm_du_buff_wait_ack(sdb);
+
+ r->head = head;
+ r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+
+ newtime = ts_to_ms(now) + (f->frcti->rto << ++r->mul);
+ rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);
+
+ list_add_tail(&r->next, &rw.wheel[rslot]);
+ }
+ }
+
+ rw.prv = slot;
+
+ pthread_mutex_unlock(&rw.lock);
+
+ return 0;
+}
+
+static int rxmwheel_add(struct frcti * frcti,
+ uint32_t seqno,
+ struct shm_du_buff * sdb)
+{
+ struct timespec now;
+ struct rxm * r;
+ size_t slot;
+
+ r = malloc(sizeof(*r));
+ if (r == NULL)
+ return -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ pthread_mutex_lock(&rw.lock);
+
+ r->t0 = ts_to_ms(now);
+ r->mul = 0;
+ r->seqno = seqno;
+ r->sdb = sdb;
+ r->head = shm_du_buff_head(sdb);
+ r->tail = shm_du_buff_tail(sdb);
+ r->frcti = frcti;
+
+ slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1);
+
+ list_add_tail(&r->next, &rw.wheel[slot]);
+
+ pthread_mutex_unlock(&rw.lock);
+
+ shm_du_buff_wait_ack(sdb);
+
+ return 0;
+}