/*
 * Ouroboros - Copyright (C) 2016 - 2023
 *
 * Timerwheel
 *
 *    Dimitri Staessens <dimitri@ouroboros.rocks>
 *    Sander Vrijders   <sander@ouroboros.rocks>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., http://www.fsf.org/about/contact/.
 */

#include <ouroboros/list.h>

/* Overflow limits range to about 6 hours. */
/* Parenthesize the macro argument so expressions expand correctly. */
#define ts_to_ns(ts) ((ts).tv_sec * BILLION + (ts).tv_nsec)
#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES)

/*
 * Bookkeeping for one sent packet awaiting acknowledgment; queued on
 * the retransmission wheel (rw.rxms) until ack'd, retransmitted, or
 * abandoned.
 */
struct rxm {
        struct list_head     next;    /* Link in an rw.rxms slot list.    */
        uint32_t             seqno;   /* FRCT sequence number of the pkt. */
#ifndef RXM_BUFFER_ON_HEAP
        struct shm_du_buff * sdb;     /* Shared buffer holding the pkt.   */
#endif
        struct frct_pci *    pkt;     /* Packet header/payload start.     */
        size_t               len;     /* Packet length in bytes.          */
        time_t               t0;      /* Time original was sent (ns, see
                                         timerwheel_rxm).                 */
        struct frcti *       frcti;   /* Owning FRCT instance.            */
        int                  fd;      /* Flow descriptor when scheduled.  */
        int                  flow_id; /* Prevent rtx when fd reused.      */
};

/* A delayed acknowledgment pending on the ack wheel (rw.acks). */
struct ack {
        struct list_head next;    /* Link in an rw.acks slot list.    */
        struct frcti *   frcti;   /* FRCT instance to send ack for.   */
        int              fd;      /* Flow descriptor when scheduled.  */
        int              flow_id; /* Prevent ack when fd reused.      */
};

/* Process-wide timerwheel state; all fields are protected by lock. */
struct {
        /*
         * At a 1 ms min resolution, every level bumps the
         * resolution by a factor of 16.
         */
        struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS];

        /* Single-level wheel for delayed acknowledgments. */
        struct list_head acks[ACKQ_SLOTS];
        /* True when a flow already has an ack queued in that slot. */
        bool             map[ACKQ_SLOTS][PROG_MAX_FLOWS];

        size_t           prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
        size_t           prv_ack;            /* Last processed ack slot.  */
        pthread_mutex_t  lock;
} rw;

/*
 * Tear down the timerwheel: release every queued retransmission and
 * delayed ack, then destroy the lock.
 */
static void timerwheel_fini(void)
{
        size_t             lvl;
        size_t             slot;
        struct list_head * pos;
        struct list_head * tmp;

        pthread_mutex_lock(&rw.lock);

        /* Drain all retransmission slots on every level. */
        for (lvl = 0; lvl < RXMQ_LVLS; ++lvl) {
                for (slot = 0; slot < RXMQ_SLOTS; ++slot) {
                        list_for_each_safe(pos, tmp, &rw.rxms[lvl][slot]) {
                                struct rxm * rxm;
                                rxm = list_entry(pos, struct rxm, next);
                                list_del(&rxm->next);
#ifdef RXM_BUFFER_ON_HEAP
                                free(rxm->pkt);
#else
                                /* Balance wait_ack, then give back buffer. */
                                shm_du_buff_ack(rxm->sdb);
                                ipcp_sdb_release(rxm->sdb);
#endif
                                free(rxm);
                        }
                }
        }

        /* Drain all pending delayed acknowledgments. */
        for (slot = 0; slot < ACKQ_SLOTS; ++slot) {
                list_for_each_safe(pos, tmp, &rw.acks[slot]) {
                        struct ack * ack = list_entry(pos, struct ack, next);
                        list_del(&ack->next);
                        free(ack);
                }
        }

        pthread_mutex_unlock(&rw.lock);

        pthread_mutex_destroy(&rw.lock);
}

/*
 * Initialize the timerwheel: create the lock, empty all slot lists,
 * and seed the last-processed slot markers from the current time.
 *
 * Returns 0 on success, -1 if the mutex cannot be created.
 */
static int timerwheel_init(void)
{
        struct timespec now;
        size_t          lvl;
        size_t          slot;

        if (pthread_mutex_init(&rw.lock, NULL))
                return -1;

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        for (lvl = 0; lvl < RXMQ_LVLS; ++lvl) {
                /* Each level runs RXMQ_BUMP bits coarser than the last. */
                rw.prv_rxm[lvl] = ((ts_to_rxm_slot(now) - 1)
                                   >> (RXMQ_BUMP * lvl)) & (RXMQ_SLOTS - 1);
                for (slot = 0; slot < RXMQ_SLOTS; ++slot)
                        list_head_init(&rw.rxms[lvl][slot]);
        }

        rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1);
        for (slot = 0; slot < ACKQ_SLOTS; ++slot)
                list_head_init(&rw.acks[slot]);

        return 0;
}

/*
 * Advance both wheels to the current time: visit every rxm and ack
 * slot that elapsed since the previous call.  Expired rxms are
 * retransmitted and rescheduled (or the flow taken down); due delayed
 * acks are sent.  Runs under rw.lock with a cancellation cleanup
 * handler so the calling thread can be cancelled safely.
 */
static void timerwheel_move(void)
{
        struct timespec    now;
        struct list_head * p;
        struct list_head * h;
        size_t             rxm_slot;
        size_t             ack_slot;
        size_t             i;
        size_t             j;

        pthread_mutex_lock(&rw.lock);

        pthread_cleanup_push(__cleanup_mutex_unlock, &rw.lock);

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        rxm_slot = ts_to_rxm_slot(now);

        for (i = 0; i < RXMQ_LVLS; ++i) {
                size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
                j = rw.prv_rxm[i];
                /* Unwrap so the while loop below can walk linearly. */
                if (j_max_slot < j)
                        j_max_slot += RXMQ_SLOTS;
                while (j++ < j_max_slot) {
                        list_for_each_safe(p, h,
                                           &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
                                struct rxm *         r;
                                struct frct_cr *     snd_cr;
                                struct frct_cr *     rcv_cr;
                                size_t               slot;
                                size_t               rslot;
                                ssize_t              idx;
                                struct shm_du_buff * sdb;
                                struct frct_pci *    pci;
                                struct flow *        f;
                                uint32_t             snd_lwe;
                                uint32_t             rcv_lwe;
                                size_t               lvl = 0;

                                r = list_entry(p, struct rxm, next);

                                list_del(&r->next);

                                snd_cr = &r->frcti->snd_cr;
                                rcv_cr = &r->frcti->rcv_cr;
                                f      = &ai.flows[r->fd];
#ifndef RXM_BUFFER_ON_HEAP
                                /* Balance the wait_ack set at schedule. */
                                shm_du_buff_ack(r->sdb);
#endif
                                /* Flow gone or fd reused: drop this rxm. */
                                if (f->frcti == NULL
                                    || f->flow_id != r->flow_id)
                                        goto cleanup;

                                pthread_rwlock_rdlock(&r->frcti->lock);

                                snd_lwe = snd_cr->lwe;
                                rcv_lwe = rcv_cr->lwe;

                                pthread_rwlock_unlock(&r->frcti->lock);

                                /* Has been ack'd, remove. */
                                if (before(r->seqno, snd_lwe))
                                        goto cleanup;

                                /* Check for r-timer expiry. */
                                if (ts_to_ns(now) - r->t0 > r->frcti->r)
                                        goto flow_down;

                                pthread_rwlock_wrlock(&r->frcti->lock);

                                /* RTT probe timed out: back off the RTO. */
                                if (r->seqno == r->frcti->rttseq) {
                                        r->frcti->rto +=
                                                r->frcti->rto >> RTO_DIV;
                                        r->frcti->probe = false;
                                }
#ifdef PROC_FLOW_STATS
                                r->frcti->n_rtx++;
#endif
                                rslot = r->frcti->rto >> RXMQ_RES;

                                pthread_rwlock_unlock(&r->frcti->lock);

                                /* Schedule at least in the next time slot. */
                                slot = ts_to_ns(now) >> RXMQ_RES;

                                /* Find the level whose span fits the RTO. */
                                while (rslot >= RXMQ_SLOTS) {
                                        ++lvl;
                                        rslot >>= RXMQ_BUMP;
                                        slot >>= RXMQ_BUMP;
                                }

                                if (lvl >= RXMQ_LVLS) /* Can't reschedule */
                                        goto flow_down;

                                rslot = (rslot + slot + 1) & (RXMQ_SLOTS - 1);
#ifdef RXM_BLOCKING
                                if (ipcp_sdb_reserve(&sdb, r->len) < 0)
#else
                                if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL,
                                                      &sdb) < 0)
#endif
                                        goto reschedule; /* rdrbuff full */

                                pci = (struct frct_pci *) shm_du_buff_head(sdb);
                                memcpy(pci, r->pkt, r->len);
#ifndef RXM_BUFFER_ON_HEAP
                                /* Track the fresh copy from now on. */
                                ipcp_sdb_release(r->sdb);
                                r->sdb = sdb;
                                r->pkt = pci;
                                shm_du_buff_wait_ack(sdb);
#endif
                                idx = shm_du_buff_get_idx(sdb);

                                /* Retransmit the copy. */
                                pci->ackno = hton32(rcv_lwe);
#ifdef RXM_BLOCKING
                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
#else
                                if (shm_rbuff_write(f->tx_rb, idx) < 0)
#endif
                                        /*
                                         * NOTE(review): in heap mode the
                                         * reserved sdb looks leaked on this
                                         * path — confirm who releases it.
                                         */
                                        goto flow_down;
                                shm_flow_set_notify(f->set, f->flow_id,
                                                    FLOW_PKT);
                         reschedule:
                                list_add(&r->next, &rw.rxms[lvl][rslot]);
                                continue;

                         flow_down:
                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
                         cleanup:
#ifdef RXM_BUFFER_ON_HEAP
                                free(r->pkt);
#else
                                ipcp_sdb_release(r->sdb);
#endif
                                free(r);
                        }
                }
                rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
                /* Move up a level in the wheel. */
                rxm_slot >>= RXMQ_BUMP;
        }

        ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;

        j = rw.prv_ack;

        /* Unwrap, as for the rxm slots above. */
        if (ack_slot < j)
                ack_slot += ACKQ_SLOTS;

        while (j++ < ack_slot) {
                list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) {
                        struct ack *  a;
                        struct flow * f;

                        a = list_entry(p, struct ack, next);

                        list_del(&a->next);

                        f = &ai.flows[a->fd];

                        rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;

                        /* Only ack when the fd was not reused meanwhile. */
                        if (f->flow_id == a->flow_id && f->frcti != NULL)
                                send_frct_pkt(a->frcti);

                        free(a);
                }
        }

        rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1);

        pthread_cleanup_pop(true);
}

static int timerwheel_rxm(struct frcti *       frcti,
                          uint32_t             seqno,
                          struct shm_du_buff * sdb)
{
        struct timespec now;
        struct rxm *    r;
        size_t          slot;
        size_t          lvl = 0;
        time_t          rto_slot;

        r = malloc(sizeof(*r));
        if (r == NULL)
                return -ENOMEM;

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        r->t0    = ts_to_ns(now);
        r->seqno = seqno;
        r->frcti = frcti;
        r->len  = shm_du_buff_len(sdb);
#ifdef RXM_BUFFER_ON_HEAP
        r->pkt = malloc(r->len);
        if (r->pkt == NULL) {
                free(r);
                return -ENOMEM;
        }
        memcpy(r->pkt, shm_du_buff_head(sdb), r->len);
#else
        r->sdb = sdb;
        r->pkt = (struct frct_pci *) shm_du_buff_head(sdb);
#endif
        pthread_rwlock_rdlock(&r->frcti->lock);

        rto_slot = frcti->rto >> RXMQ_RES;
        slot     = r->t0 >> RXMQ_RES;

        r->fd      = frcti->fd;
        r->flow_id = ai.flows[r->fd].flow_id;

        pthread_rwlock_unlock(&r->frcti->lock);

        while (rto_slot >= RXMQ_SLOTS) {
                ++lvl;
                rto_slot >>= RXMQ_BUMP;
                slot >>= RXMQ_BUMP;
        }

        if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
#ifdef RXM_BUFFER_ON_HEAP
                free(r->pkt);
#endif
                free(r);
                return -EPERM;
        }

        slot = (slot + rto_slot + 1) & (RXMQ_SLOTS - 1);

        pthread_mutex_lock(&rw.lock);

        list_add_tail(&r->next, &rw.rxms[lvl][slot]);
#ifndef RXM_BUFFER_ON_HEAP
        shm_du_buff_wait_ack(sdb);
#endif
        pthread_mutex_unlock(&rw.lock);

        return 0;
}

static int timerwheel_delayed_ack(int            fd,
                                  struct frcti * frcti)
{
        struct timespec now;
        struct ack *    a;
        size_t          slot;

        a = malloc(sizeof(*a));
        if (a == NULL)
                return -ENOMEM;

        clock_gettime(PTHREAD_COND_CLOCK, &now);

        pthread_rwlock_rdlock(&frcti->lock);

        slot = (((ts_to_ns(now) + (TICTIME << 1)) >> ACKQ_RES) + 1)
                & (ACKQ_SLOTS - 1);

        pthread_rwlock_unlock(&frcti->lock);

        a->fd    = fd;
        a->frcti = frcti;
        a->flow_id = ai.flows[fd].flow_id;

        pthread_mutex_lock(&rw.lock);

        if (rw.map[slot][fd]) {
                pthread_mutex_unlock(&rw.lock);
                free(a);
                return 0;
        }

        rw.map[slot][fd] = true;

        list_add_tail(&a->next, &rw.acks[slot]);

        pthread_mutex_unlock(&rw.lock);

        return 0;
}