diff options
Diffstat (limited to 'src/lib/rxmwheel.c')
-rw-r--r-- | src/lib/rxmwheel.c | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c new file mode 100644 index 00000000..697c6a48 --- /dev/null +++ b/src/lib/rxmwheel.c @@ -0,0 +1,252 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2018 + * + * Timerwheel + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include <ouroboros/list.h> + +#define RXMQ_S 12 /* defines #slots */ +#define RXMQ_M 15 /* defines max delay */ +#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution */ +#define RXMQ_SLOTS (1 << RXMQ_S) +#define RXMQ_MAX (1 << RXMQ_M) /* ms */ + +/* Small inacurracy to avoid slow division by MILLION. */ +#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) +#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) + +struct rxm { + struct list_head next; + uint32_t seqno; + struct shm_du_buff * sdb; + uint8_t * head; + uint8_t * tail; + time_t t0; /* Time when original was sent (s). */ + size_t mul; /* RTO multiplier. */ + struct frcti * frcti; +}; + +struct { + struct list_head wheel[RXMQ_SLOTS]; + + size_t prv; /* Last processed slot. */ + pthread_mutex_t lock; +} rw; + +static void rxmwheel_fini(void) +{ + size_t i; + struct list_head * p; + struct list_head * h; + + for (i = 0; i < RXMQ_SLOTS; ++i) { + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * rxm = list_entry(p, struct rxm, next); + list_del(&rxm->next); + free(rxm); + } + } +} + +static int rxmwheel_init(void) +{ + struct timespec now; + size_t i; + + if (pthread_mutex_init(&rw.lock, NULL)) + return -1; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + /* Mark the previous timeslot as the last one processed. */ + rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + + for (i = 0; i < RXMQ_SLOTS; ++i) + list_head_init(&rw.wheel[i]); + + return 0; +} + +static void rxmwheel_clear(int fd) +{ + size_t i; + + /* FIXME: Add list element to avoid looping over full rxmwheel */ + pthread_mutex_lock(&rw.lock); + + for (i = 0; i < RXMQ_SLOTS; ++i) { + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * r = list_entry(p, struct rxm, next); + if (r->frcti->fd == fd) { + list_del(&r->next); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + } + } + } + + pthread_mutex_unlock(&rw.lock); +} + +/* Return fd on r-timer expiry. */ +static int rxmwheel_move(void) +{ + struct timespec now; + struct list_head * p; + struct list_head * h; + size_t slot; + size_t i; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + slot = ts_to_slot(now); + + pthread_mutex_lock(&rw.lock); + + for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) { + list_for_each_safe(p, h, &rw.wheel[i]) { + struct rxm * r; + struct frct_cr * snd_cr; + struct frct_cr * rcv_cr; + size_t rslot; + time_t newtime; + ssize_t idx; + struct shm_du_buff * sdb; + uint8_t * head; + struct flow * f; + + r = list_entry(p, struct rxm, next); + list_del(&r->next); + + snd_cr = &r->frcti->snd_cr; + rcv_cr = &r->frcti->rcv_cr; + /* Has been ack'd, remove. */ + if ((int) (r->seqno - snd_cr->lwe) <= 0) { + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + continue; + } + /* Check for r-timer expiry. */ + if (ts_to_ms(now) - r->t0 > r->frcti->r) { + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + return fd; + } + + /* Copy the payload, safe rtx in other layers. */ + if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { + /* FIXME: reschedule send? */ + int fd = r->frcti->fd; + pthread_mutex_unlock(&rw.lock); + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + free(r); + return fd; + } + + idx = shm_du_buff_get_idx(sdb); + + head = shm_du_buff_head(sdb); + memcpy(head, r->head, r->tail - r->head); + + /* Release the old copy */ + shm_du_buff_ack(r->sdb); + ipcp_sdb_release(r->sdb); + + /* Update ackno and make sure DRF is not set*/ + ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); + ((struct frct_pci *) head)->flags &= ~FRCT_DRF; + + f = &ai.flows[r->frcti->fd]; + + /* Retransmit the copy. */ + if (shm_rbuff_write(f->tx_rb, idx)) { + ipcp_sdb_release(sdb); + free(r); + /* FIXME: reschedule send? */ + continue; + } + + shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT); + + /* Reschedule. */ + shm_du_buff_wait_ack(sdb); + + r->head = head; + r->tail = shm_du_buff_tail(sdb); + r->sdb = sdb; + + newtime = ts_to_ms(now) + (snd_cr->rto << ++r->mul); + rslot = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.wheel[rslot]); + } + } + + rw.prv = slot; + + pthread_mutex_unlock(&rw.lock); + + return 0; +} + +static int rxmwheel_add(struct frcti * frcti, + uint32_t seqno, + struct shm_du_buff * sdb) +{ + struct timespec now; + struct rxm * r; + size_t slot; + + r = malloc(sizeof(*r)); + if (r == NULL) + return -ENOMEM; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + pthread_mutex_lock(&rw.lock); + + r->t0 = ts_to_ms(now); + r->mul = 0; + r->seqno = seqno; + r->sdb = sdb; + r->head = shm_du_buff_head(sdb); + r->tail = shm_du_buff_tail(sdb); + r->frcti = frcti; + + slot = ((r->t0 + frcti->snd_cr.rto) >> RXMQ_R) + & (RXMQ_SLOTS - 1); + + list_add_tail(&r->next, &rw.wheel[slot]); + + pthread_mutex_unlock(&rw.lock); + + shm_du_buff_wait_ack(sdb); + + return 0; +} |