/*
 * Ouroboros - Copyright (C) 2016 - 2017
 *
 * API for applications
 *
 *    Dimitri Staessens <dimitri.staessens@ugent.be>
 *    Sander Vrijders   <sander.vrijders@ugent.be>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., http://www.fsf.org/about/contact/.
 */

#define _POSIX_C_SOURCE 200809L

#include "config.h"

#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/local-dev.h>
#include <ouroboros/sockets.h>
#include <ouroboros/fccntl.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_rbuff.h>
#include <ouroboros/utils.h>
#include <ouroboros/fqueue.h>
#include <ouroboros/qoscube.h>
#include <ouroboros/timerwheel.h>
#include <ouroboros/frct_pci.h>
#include <ouroboros/rq.h>

#include <assert.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define BUF_SIZE       1500

#define TW_ELEMENTS    6000
#define TW_RESOLUTION  1   /* ms */

#define MPL            2000 /* ms */
#define RQ_SIZE        20

#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif

/* Handle to a set of flows an application can wait on (select-like). */
struct flow_set {
        size_t   idx;                   /* Slot allocated from ai.fqueues. */
};

/* Snapshot of events harvested from the shared-memory flow set. */
struct fqueue {
        int    fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
        size_t fqsize;                  /* Number of valid entries. */
        size_t next;                    /* Next entry to hand out. */
};

/* Life cycle of a port_id slot; transitions guarded by port state_lock. */
enum port_state {
        PORT_NULL = 0,                  /* Ack state used during teardown. */
        PORT_INIT,                      /* Slot ready for use. */
        PORT_ID_PENDING,                /* A thread is waiting for assignment. */
        PORT_ID_ASSIGNED,               /* fd bound to this port_id. */
        PORT_DESTROY                    /* Teardown requested while pending. */
};

/* Per-fd FRCT (retransmission/ordering) state; fields guarded by lock. */
struct frcti {
        bool             used;          /* Instance active for this fd. */

        struct timespec  last_snd;      /* Time of last send. */
        bool             snd_drf;       /* Send Data Run Flag pending. */
        uint64_t         snd_lwe;       /* Next sequence number to send. */
        uint64_t         snd_rwe;       /* Send window edge (unused yet?). */

        struct timespec  last_rcv;      /* Time of last receive. */
        bool             rcv_drf;       /* Expecting a new run of SDUs. */
        uint64_t         rcv_lwe;       /* Next expected sequence number. */
        uint64_t         rcv_rwe;       /* Receive window edge (unused yet?). */

        uint16_t         conf_flags;    /* Negotiated FRCT config flags. */

        struct rq *      rq;            /* Out-of-order resequencing queue. */

        pthread_rwlock_t lock;
};

/* Mapping of a system-wide port_id to a local fd, with sync state. */
struct port {
        int             fd;

        enum port_state state;
        pthread_mutex_t state_lock;
        pthread_cond_t  state_cond;
};

/* Per-fd flow state. */
struct flow {
        struct shm_rbuff *    rx_rb;    /* Our receive ring buffer. */
        struct shm_rbuff *    tx_rb;    /* Peer process's receive ring. */
        struct shm_flow_set * set;      /* Peer's flow set, for notify. */
        int                   port_id;  /* -1 when the slot is free. */
        int                   oflags;   /* FLOWF* flags. */
        qoscube_t             cube;
        qosspec_t             spec;

        pid_t                 api;      /* Peer process id. */

        bool                  snd_timesout;
        bool                  rcv_timesout;
        struct timespec       snd_timeo;
        struct timespec       rcv_timeo;
};

/* Per-process application instance state, set up by ouroboros_init(). */
struct {
        char *                ap_name;
        pid_t                 api;      /* Our own process id. */

        struct shm_rdrbuff *  rdrb;     /* Shared SDU buffer. */
        struct shm_flow_set * fqset;    /* Our own flow set. */

        struct timerwheel *   tw;

        struct bmp *          fds;      /* fd allocator. */
        struct bmp *          fqueues;  /* flow_set index allocator. */
        struct flow *         flows;    /* Indexed by fd. */
        struct port *         ports;    /* Indexed by port_id. */
        struct frcti *        frcti;    /* Indexed by fd. */

        pthread_rwlock_t      lock;     /* Guards the tables above. */
} ai;

/*
 * Reset a port slot back to PORT_INIT, synchronizing with any thread
 * still blocked in port_wait_assign().  If an assignment is pending,
 * move to PORT_DESTROY and wait for the waiter to acknowledge by
 * setting PORT_NULL before reinitializing the slot.
 */
static void port_destroy(struct port * p)
{
        pthread_mutex_lock(&p->state_lock);

        if (p->state == PORT_DESTROY) {
                /* Another thread is already tearing this port down. */
                pthread_mutex_unlock(&p->state_lock);
                return;
        }

        if (p->state == PORT_ID_PENDING)
                p->state = PORT_DESTROY;
        else
                p->state = PORT_NULL;

        pthread_cond_signal(&p->state_cond);

        /* Wait until the pending waiter (if any) has seen the destroy. */
        while (p->state != PORT_NULL)
                pthread_cond_wait(&p->state_cond, &p->state_lock);

        p->fd = -1;
        p->state = PORT_INIT;

        pthread_mutex_unlock(&p->state_lock);
}

/*
 * Move a port to a new state and wake all waiters.  A port that is
 * being destroyed (PORT_DESTROY) is left untouched.
 */
static void port_set_state(struct port *   p,
                           enum port_state state)
{
        pthread_mutex_lock(&p->state_lock);

        if (p->state != PORT_DESTROY) {
                p->state = state;
                pthread_cond_broadcast(&p->state_cond);
        }

        pthread_mutex_unlock(&p->state_lock);
}

/*
 * Block until the given port_id is assigned an fd (PORT_ID_ASSIGNED)
 * or destroyed.  Acknowledges a concurrent port_destroy() by moving
 * PORT_DESTROY to PORT_NULL.  Returns the resulting state.
 */
static enum port_state port_wait_assign(int port_id)
{
        enum port_state state;
        struct port *   p;

        pthread_rwlock_rdlock(&ai.lock);

        p = &ai.ports[port_id];

        pthread_rwlock_unlock(&ai.lock);

        pthread_mutex_lock(&p->state_lock);

        if (p->state == PORT_ID_ASSIGNED) {
                pthread_mutex_unlock(&p->state_lock);
                return PORT_ID_ASSIGNED;
        }

        if (p->state == PORT_INIT)
                p->state = PORT_ID_PENDING;

        while (p->state == PORT_ID_PENDING)
                pthread_cond_wait(&p->state_cond, &p->state_lock);

        if (p->state == PORT_DESTROY) {
                /* Acknowledge the destroyer waiting in port_destroy(). */
                p->state = PORT_NULL;
                pthread_cond_broadcast(&p->state_cond);
        }

        state = p->state;

        assert(state != PORT_INIT);

        pthread_mutex_unlock(&p->state_lock);

        return state;
}

/*
 * Announce this process's AP name to the IRMd.  Returns 0 on success,
 * -EIRMD if the IRMd cannot be reached, or the IRMd's error code.
 */
static int api_announce(char * ap_name)
{
        irm_msg_t   msg      = IRM_MSG__INIT;
        irm_msg_t * recv_msg;
        int         ret;

        msg.code    = IRM_MSG_CODE__IRM_API_ANNOUNCE;
        msg.has_api = true;
        msg.api     = ai.api;
        msg.ap_name = ap_name;

        recv_msg = send_recv_irm_msg(&msg);
        if (recv_msg == NULL)
                return -EIRMD;

        /* A reply without a result field counts as failure (-1). */
        ret = recv_msg->has_result ? recv_msg->result : -1;

        irm_msg__free_unpacked(recv_msg, NULL);

        return ret;
}

/* Call under flows lock. */
static int finalize_write(int    fd,
                          size_t idx)
{
        int ret;

        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
        if (ret < 0)
                return ret;

        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);

        return ret;
}

/*
 * Initialize the FRCT instance for an fd.  Returns 0 on success, -1 on
 * failure to allocate the resequencing queue.
 *
 * BUG fix: 'used' was set to true before rq_create() could fail, so a
 * failed init left an active-looking instance with a NULL rq; a later
 * flow_fini() would then call frcti_fini()/rq_destroy() on it.  Mark
 * the instance used only once initialization fully succeeded.
 */
static int frcti_init(int fd)
{
        struct frcti * frcti;

        frcti = &(ai.frcti[fd]);

        frcti->snd_drf = true;
        frcti->snd_lwe = 0;
        frcti->snd_rwe = 0;

        frcti->rcv_drf = true;
        frcti->rcv_lwe = 0;
        frcti->rcv_rwe = 0;

        frcti->conf_flags = 0;

        frcti->rq = rq_create(RQ_SIZE);
        if (frcti->rq == NULL) {
                frcti->used = false;
                return -1;
        }

        frcti->used = true;

        return 0;
}

static void frcti_clear(int fd)
{
        ai.frcti[fd].used = false;
}

static void frcti_fini(int fd)
{
        /*
         * FIXME: In case of reliable transmission we should
         * make sure everything is acked.
         */

        frcti_clear(fd);

        rq_destroy(ai.frcti[fd].rq);
}

/*
 * Serialize an SDU's FRCT PCI and commit it to the tx ring buffer.
 * Stamps the next send sequence number and sets the Data Run Flag on
 * the first packet after sender inactivity (> 2 * MPL).  Returns 0 on
 * success, -1 on serialization failure, or finalize_write()'s error.
 * NOTE: finalize_write() must be called under the flows lock; here the
 * caller is expected to not hold ai.lock unless finalize_write allows it.
 */
static int frcti_send(int                  fd,
                      struct frct_pci *    pci,
                      struct shm_du_buff * sdb)
{
        struct timespec now = {0, 0};
        struct frcti *  frcti;
        int             ret;

        frcti = &(ai.frcti[fd]);

        clock_gettime(CLOCK_REALTIME_COARSE, &now);

        pthread_rwlock_wrlock(&frcti->lock);

        /* Check if sender inactivity is true. */
        if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL)
                frcti->snd_drf = true;

        /* Set the DRF in the first packet of a new run of SDUs. */
        if (frcti->snd_drf) {
                pci->flags |= FLAG_DATA_RUN;
                frcti->snd_drf = false;
        }

        frcti->last_snd = now;

        pci->seqno = frcti->snd_lwe++;

        if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) {
                pthread_rwlock_unlock(&frcti->lock);
                return -1;
        }

        ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
        if (ret < 0) {
                pthread_rwlock_unlock(&frcti->lock);
                return ret;
        }

        pthread_rwlock_unlock(&frcti->lock);

        return 0;
}


/*
 * Send a CONFIG PDU carrying the new FRCT flags to the peer and record
 * them locally.  Returns 0 on success, -1 on failure (the reserved
 * buffer is released on a failed send).
 */
static int frcti_configure(int      fd,
                           uint16_t flags)
{
        struct frcti *       frcti;
        struct frct_pci      pci;
        struct shm_du_buff * sdb;

        frcti = &(ai.frcti[fd]);

        memset(&pci, 0, sizeof(pci));

        /* Zero-length SDU: the config travels entirely in the PCI. */
        if (ipcp_sdb_reserve(&sdb, 0))
                return -1;

        pci.conf_flags = flags;

        /* Always set the DRF on a configure message. */
        pci.flags |= FLAG_DATA_RUN;
        pci.type |= PDU_TYPE_CONFIG;

        pthread_rwlock_wrlock(&frcti->lock);

        frcti->conf_flags = pci.conf_flags;

        pthread_rwlock_unlock(&frcti->lock);

        if (frcti_send(fd, &pci, sdb)) {
                shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
                return -1;
        }

        return 0;
}

/* Send a DATA PDU over an FRCT flow; wraps frcti_send(). */
static int frcti_write(int                  fd,
                       struct shm_du_buff * sdb)
{
        struct frct_pci pci;

        memset(&pci, 0, sizeof(pci));
        pci.type = PDU_TYPE_DATA;

        return frcti_send(fd, &pci, sdb);
}

/*
 * Read the next in-order SDU index from an FRCT flow.  First serves
 * from the resequencing queue if the expected seqno is there, then
 * reads from the rx ring buffer, deserializing and validating the PCI.
 * Returns a non-negative rdrbuff index, or a negative error
 * (-EAGAIN when the PDU was queued/dropped, or the rbuff read error).
 */
static ssize_t frcti_read(int fd)
{
        ssize_t              idx;
        struct frcti *       frcti;
        struct frct_pci      pci;
        struct shm_du_buff * sdb;
        uint64_t             seqno;
        bool                 nxt_pdu = true;

        frcti = &(ai.frcti[fd]);

        /* See if we already have the next PDU */
        pthread_rwlock_wrlock(&frcti->lock);

        if (!rq_is_empty(frcti->rq)) {
                seqno = rq_peek(frcti->rq);
                if (seqno == frcti->rcv_lwe) {
                        frcti->rcv_lwe++;
                        idx = rq_pop(frcti->rq);
                        pthread_rwlock_unlock(&frcti->lock);
                        return idx;
                }
        }

        pthread_rwlock_unlock(&frcti->lock);

        do {
                struct timespec    now;
                struct timespec    abs;
                struct timespec *  abstime = NULL;
                struct shm_rbuff * rb;
                bool               noblock;

                clock_gettime(CLOCK_REALTIME_COARSE, &now);

                pthread_rwlock_rdlock(&ai.lock);

                noblock = ai.flows[fd].oflags & FLOWFNONBLOCK;
                rb      = ai.flows[fd].rx_rb;

                /* abstime stays NULL (block forever) without a timeout. */
                if (ai.flows[fd].rcv_timesout) {
                        ts_add(&now, &ai.flows[fd].rcv_timeo, &abs);
                        abstime = &abs;
                }

                pthread_rwlock_unlock(&ai.lock);

                if (noblock) {
                        idx = shm_rbuff_read(rb);
                } else {
                        idx = shm_rbuff_read_b(rb, abstime);
                        /* Refresh 'now': we may have slept a while. */
                        clock_gettime(CLOCK_REALTIME_COARSE, &now);
                }

                if (idx < 0)
                        return idx;

                sdb = shm_rdrbuff_get(ai.rdrb, idx);

                pthread_rwlock_wrlock(&frcti->lock);

                /* SDU may be corrupted. */
                if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) {
                        pthread_rwlock_unlock(&frcti->lock);
                        shm_rdrbuff_remove(ai.rdrb, idx);
                        return -EAGAIN;
                }

                /* Check if receiver inactivity is true. */
                if (!frcti->rcv_drf &&
                    ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
                        frcti->rcv_drf = true;

                /* When there is receiver inactivity queue the packet. */
                if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
                        if (rq_push(frcti->rq, pci.seqno, idx))
                                shm_rdrbuff_remove(ai.rdrb, idx);
                        pthread_rwlock_unlock(&frcti->lock);
                        return -EAGAIN;
                }

                /* If the DRF is set, reset the state of the connection. */
                if (pci.flags & FLAG_DATA_RUN)
                        frcti->rcv_lwe = pci.seqno;

                if (pci.type & PDU_TYPE_CONFIG)
                        frcti->conf_flags = pci.conf_flags;

                if (frcti->rcv_drf)
                        frcti->rcv_drf = false;

                frcti->last_rcv = now;

                nxt_pdu = true;

                /*
                 * NOTE(review): for a non-DATA PDU the idx is removed
                 * here, but the ordering branch below may still
                 * rq_push() that same (now stale) idx when the seqno is
                 * out of order — a later rq_pop() would then hand out a
                 * released buffer slot.  Verify whether non-DATA PDUs
                 * should skip the ordering branch entirely.
                 */
                if (!(pci.type & PDU_TYPE_DATA)) {
                        shm_rdrbuff_remove(ai.rdrb, idx);
                        nxt_pdu = false;
                }

                if (frcti->conf_flags & FRCTFORDERING) {
                        if (pci.seqno != frcti->rcv_lwe) {
                                if (rq_push(frcti->rq, pci.seqno, idx))
                                        shm_rdrbuff_remove(ai.rdrb, idx);
                                nxt_pdu = false;
                        } else {
                                frcti->rcv_lwe++;
                        }
                }

                pthread_rwlock_unlock(&frcti->lock);

        } while (!nxt_pdu);

        return idx;
}

static void flow_clear(int fd)
{
        assert(!(fd < 0));

        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));

        ai.flows[fd].port_id  = -1;
        ai.flows[fd].api      = -1;
        ai.flows[fd].cube     = QOS_CUBE_BE;
}

static void flow_fini(int fd)
{
        assert(!(fd < 0));

        if (ai.flows[fd].port_id != -1)
                port_destroy(&ai.ports[ai.flows[fd].port_id]);

        if (ai.flows[fd].rx_rb != NULL)
                shm_rbuff_close(ai.flows[fd].rx_rb);

        if (ai.flows[fd].tx_rb != NULL)
                shm_rbuff_close(ai.flows[fd].tx_rb);

        if (ai.flows[fd].set != NULL)
                shm_flow_set_close(ai.flows[fd].set);

        if (ai.frcti[fd].used)
                frcti_fini(fd);

        flow_clear(fd);
}

/*
 * Allocate an fd and bind it to a port_id: open the rx/tx ring buffers
 * and the peer's flow set, fill in the flow table entry and mark the
 * port assigned.  Returns the new fd, -EBADF when no fd is available,
 * or -ENOMEM when a shared-memory object cannot be opened.  The error
 * paths rely on flow_fini() closing only the non-NULL resources.
 */
static int flow_init(int       port_id,
                     pid_t     api,
                     qoscube_t qc)
{
        int fd;

        pthread_rwlock_wrlock(&ai.lock);

        fd = bmp_allocate(ai.fds);
        if (!bmp_is_id_valid(ai.fds, fd)) {
                pthread_rwlock_unlock(&ai.lock);
                return -EBADF;
        }

        /* Our receive ring lives under our own pid. */
        ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
        if (ai.flows[fd].rx_rb == NULL) {
                bmp_release(ai.fds, fd);
                pthread_rwlock_unlock(&ai.lock);
                return -ENOMEM;
        }

        /* The tx ring is the peer's receive ring. */
        ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id);
        if (ai.flows[fd].tx_rb == NULL) {
                flow_fini(fd);
                bmp_release(ai.fds, fd);
                pthread_rwlock_unlock(&ai.lock);
                return -ENOMEM;
        }

        ai.flows[fd].set = shm_flow_set_open(api);
        if (ai.flows[fd].set == NULL) {
                flow_fini(fd);
                bmp_release(ai.fds, fd);
                pthread_rwlock_unlock(&ai.lock);
                return -ENOMEM;
        }

        ai.flows[fd].port_id = port_id;
        ai.flows[fd].oflags  = FLOWFDEFAULT;
        ai.flows[fd].api     = api;
        ai.flows[fd].cube    = qc;
        ai.flows[fd].spec    = qos_cube_to_spec(qc);

        ai.ports[port_id].fd = fd;

        /* Wake any thread blocked in port_wait_assign(). */
        port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);

        pthread_rwlock_unlock(&ai.lock);

        return fd;
}

/*
 * Initialize the per-process application state: fd and fqueue bitmaps,
 * the shared rdrbuff and flow set, the flow/frcti/port tables with
 * their locks, the timerwheel, and (if ap_name is given) announce the
 * name to the IRMd.  Returns 0 on success or a negative error code;
 * on failure everything already acquired is released in reverse order.
 */
int ouroboros_init(const char * ap_name)
{
        int i;
        int j;
        int ret = -ENOMEM;

        assert(ai.ap_name == NULL);

        ai.api = getpid();

        /* Reserve the first AP_RES_FDS fd values. */
        ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);
        if (ai.fds == NULL)
                goto fail_fds;

        ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0);
        if (ai.fqueues == NULL)
                goto fail_fqueues;

        ai.fqset = shm_flow_set_create();
        if (ai.fqset == NULL)
                goto fail_fqset;

        /* The rdrbuff is created by the IRMd; failing here means no IRMd. */
        ai.rdrb = shm_rdrbuff_open();
        if (ai.rdrb == NULL) {
                ret = -EIRMD;
                goto fail_rdrb;
        }

        ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
        if (ai.flows == NULL)
                goto fail_flows;

        ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS);
        if (ai.frcti == NULL)
                goto fail_frcti;

        for (i = 0; i < AP_MAX_FLOWS; ++i) {
                flow_clear(i);
                frcti_clear(i);

                if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) {
                        for (j = i - 1; j >= 0 ; j--)
                                pthread_rwlock_destroy(&ai.frcti[j].lock);
                        goto fail_frct_lock;
                }
        }

        ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
        if (ai.ports == NULL)
                goto fail_ports;

        if (ap_name != NULL) {
                ai.ap_name = strdup(path_strip((char *) ap_name));
                if (ai.ap_name == NULL)
                        goto fail_ap_name;

                if (api_announce((char *) ai.ap_name)) {
                        ret = -EIRMD;
                        goto fail_announce;
                }
        }

        /*
         * NOTE(review): if state_lock init fails at index i, the
         * state_conds initialized for indices 0..i-1 are never
         * destroyed (the jump to fail_announce skips their cleanup).
         * Verify whether this leak matters for the target platforms.
         */
        for (i = 0; i < SYS_MAX_FLOWS; ++i) {
                ai.ports[i].state = PORT_INIT;
                if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) {
                        int j;
                        for (j = 0; j < i; ++j)
                                pthread_mutex_destroy(&ai.ports[j].state_lock);
                        goto fail_announce;
                }
                if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) {
                        int j;
                        for (j = 0; j < i; ++j)
                                pthread_cond_destroy(&ai.ports[j].state_cond);
                        goto fail_state_cond;
                }
        }

        if (pthread_rwlock_init(&ai.lock, NULL))
                goto fail_lock;

        ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
        if (ai.tw == NULL)
                goto fail_timerwheel;

        return 0;

 fail_timerwheel:
        pthread_rwlock_destroy(&ai.lock);
 fail_lock:
        for (i = 0; i < SYS_MAX_FLOWS; ++i)
                pthread_cond_destroy(&ai.ports[i].state_cond);
 fail_state_cond:
        for (i = 0; i < SYS_MAX_FLOWS; ++i)
                pthread_mutex_destroy(&ai.ports[i].state_lock);
 fail_announce:
        free(ai.ap_name);
 fail_ap_name:
        free(ai.ports);
 fail_ports:
        for (i = 0; i < AP_MAX_FLOWS; ++i)
                pthread_rwlock_destroy(&ai.frcti[i].lock);
 fail_frct_lock:
        free(ai.frcti);
 fail_frcti:
        free(ai.flows);
 fail_flows:
        shm_rdrbuff_close(ai.rdrb);
 fail_rdrb:
        shm_flow_set_destroy(ai.fqset);
 fail_fqset:
        bmp_destroy(ai.fqueues);
 fail_fqueues:
        bmp_destroy(ai.fds);
 fail_fds:
        return ret;
}

/*
 * Tear down the per-process application state set up by
 * ouroboros_init(): drain and close all open flows, destroy the
 * per-fd/per-port locks, release the shared-memory objects and free
 * the tables.  Assumes no other threads are using the API any more.
 */
void ouroboros_fini()
{
        int i = 0;

        bmp_destroy(ai.fds);
        bmp_destroy(ai.fqueues);

        shm_flow_set_destroy(ai.fqset);

        if (ai.ap_name != NULL)
                free(ai.ap_name);

        pthread_rwlock_rdlock(&ai.lock);

        for (i = 0; i < AP_MAX_FLOWS; ++i) {
                if (ai.flows[i].port_id != -1) {
                        ssize_t idx;
                        /* Drain unread SDUs so the rdrbuff slots free up. */
                        while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
                                shm_rdrbuff_remove(ai.rdrb, idx);
                        flow_fini(i);
                }

                pthread_rwlock_destroy(&ai.frcti[i].lock);
        }

        for (i = 0; i < SYS_MAX_FLOWS; ++i) {
                pthread_mutex_destroy(&ai.ports[i].state_lock);
                pthread_cond_destroy(&ai.ports[i].state_cond);
        }

        shm_rdrbuff_close(ai.rdrb);

        if (ai.tw != NULL)
                timerwheel_destroy(ai.tw);

        free(ai.flows);
        free(ai.ports);
        free(ai.frcti);

        pthread_rwlock_unlock(&ai.lock);

        pthread_rwlock_destroy(&ai.lock);
}

/*
 * Accept an incoming flow via the IRMd.  On success returns the new fd
 * and, if qs is non-NULL, fills in the flow's QoS spec.  Returns a
 * negative error code on failure.
 *
 * BUG fix: the return value of frcti_init() was ignored; an allocation
 * failure left a half-initialized FRCT instance bound to a live fd.
 * Now the flow is torn down and the fd released on failure.
 */
int flow_accept(qosspec_t *             qs,
                const struct timespec * timeo)
{
        irm_msg_t   msg      = IRM_MSG__INIT;
        irm_msg_t * recv_msg = NULL;
        int         fd       = -1;

        msg.code    = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
        msg.has_api = true;
        msg.api     = ai.api;

        if (timeo != NULL) {
                msg.has_timeo_sec  = true;
                msg.has_timeo_nsec = true;
                msg.timeo_sec      = timeo->tv_sec;
                msg.timeo_nsec     = timeo->tv_nsec;
        }

        recv_msg = send_recv_irm_msg(&msg);
        if (recv_msg == NULL)
                return -EIRMD;

        if (!recv_msg->has_result) {
                irm_msg__free_unpacked(recv_msg, NULL);
                return -EIRMD;
        }

        if (recv_msg->result != 0) {
                int res = recv_msg->result;
                irm_msg__free_unpacked(recv_msg, NULL);
                return res;
        }

        if (!recv_msg->has_api || !recv_msg->has_port_id ||
            !recv_msg->has_qoscube) {
                irm_msg__free_unpacked(recv_msg, NULL);
                return -EIRMD;
        }

        fd = flow_init(recv_msg->port_id, recv_msg->api, recv_msg->qoscube);

        irm_msg__free_unpacked(recv_msg, NULL);

        if (fd < 0)
                return fd;

        pthread_rwlock_wrlock(&ai.lock);

        if (frcti_init(fd) < 0) {
                /* Ensure flow_fini() won't touch a NULL rq. */
                frcti_clear(fd);
                flow_fini(fd);
                bmp_release(ai.fds, fd);
                pthread_rwlock_unlock(&ai.lock);
                return -ENOMEM;
        }

        if (qs != NULL)
                *qs = ai.flows[fd].spec;

        pthread_rwlock_unlock(&ai.lock);

        return fd;
}

/*
 * Allocate a flow to dst_name with the requested QoS via the IRMd.
 * Returns the new fd on success or a negative error code.
 *
 * BUG fix: the return value of frcti_init() was ignored; an allocation
 * failure left a half-initialized FRCT instance bound to a live fd.
 * Now the flow is torn down and the fd released on failure.
 */
int flow_alloc(const char *            dst_name,
               qosspec_t *             qs,
               const struct timespec * timeo)
{
        irm_msg_t   msg      = IRM_MSG__INIT;
        irm_msg_t * recv_msg = NULL;
        qoscube_t   qc       = QOS_CUBE_BE;
        int         fd;

        msg.code        = IRM_MSG_CODE__IRM_FLOW_ALLOC;
        msg.dst_name    = (char *) dst_name;
        msg.has_api     = true;
        msg.has_qoscube = true;
        msg.api         = ai.api;

        if (qs != NULL)
                qc = qos_spec_to_cube(*qs);

        msg.qoscube = qc;

        if (timeo != NULL) {
                msg.has_timeo_sec  = true;
                msg.has_timeo_nsec = true;
                msg.timeo_sec      = timeo->tv_sec;
                msg.timeo_nsec     = timeo->tv_nsec;
        }

        recv_msg = send_recv_irm_msg(&msg);
        if (recv_msg == NULL)
                return -EIRMD;

        if (!recv_msg->has_result) {
                irm_msg__free_unpacked(recv_msg, NULL);
                return -EIRMD;
        }

        if (recv_msg->result != 0) {
                int res = recv_msg->result;
                irm_msg__free_unpacked(recv_msg, NULL);
                return res;
        }

        if (!recv_msg->has_api || !recv_msg->has_port_id) {
                irm_msg__free_unpacked(recv_msg, NULL);
                return -EIRMD;
        }

        fd = flow_init(recv_msg->port_id, recv_msg->api, qc);

        irm_msg__free_unpacked(recv_msg, NULL);

        if (fd < 0)
                return fd;

        pthread_rwlock_wrlock(&ai.lock);

        if (frcti_init(fd) < 0) {
                /* Ensure flow_fini() won't touch a NULL rq. */
                frcti_clear(fd);
                flow_fini(fd);
                bmp_release(ai.fds, fd);
                pthread_rwlock_unlock(&ai.lock);
                return -ENOMEM;
        }

        pthread_rwlock_unlock(&ai.lock);

        return fd;
}

int flow_dealloc(int fd)
{
        irm_msg_t msg = IRM_MSG__INIT;
        irm_msg_t * recv_msg = NULL;

        if (fd < 0)
                return -EINVAL;

        msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
        msg.has_port_id  = true;
        msg.has_api      = true;
        msg.api          = ai.api;

        pthread_rwlock_rdlock(&ai.lock);

        assert(!(ai.flows[fd].port_id < 0));

        msg.port_id = ai.flows[fd].port_id;

        pthread_rwlock_unlock(&ai.lock);

        recv_msg = send_recv_irm_msg(&msg);
        if (recv_msg == NULL)
                return -EIRMD;

        if (!recv_msg->has_result) {
                irm_msg__free_unpacked(recv_msg, NULL);
                return -EIRMD;
        }

        irm_msg__free_unpacked(recv_msg, NULL);

        pthread_rwlock_wrlock(&ai.lock);

        flow_fini(fd);
        bmp_release(ai.fds, fd);

        pthread_rwlock_unlock(&ai.lock);

        return 0;
}

/*
 * Flow control: get/set timeouts, flags, QoS spec and FRCT config for
 * an fd.  Returns 0 on success or a negative error code.
 *
 * Fixes:
 *  - FLOWGRCVTIMEO returned snd_timeo instead of rcv_timeo.
 *  - FLOWSFLAGS read the tx ACL from the rx ring buffer.
 *  - FRCTSFLAGS/FRCTGFLAGS logic was swapped: the getter sent a config
 *    PDU to the peer while the setter only changed the local flags.
 *    Now the setter configures (and notifies) and the getter only reads.
 */
int fccntl(int fd,
           int cmd,
           ...)
{
        uint32_t *        fflags;
        uint16_t *        cflags;
        va_list           l;
        struct timespec * timeo;
        qosspec_t *       qs;
        uint32_t          rx_acl;
        uint32_t          tx_acl;

        if (fd < 0 || fd >= AP_MAX_FLOWS)
                return -EBADF;

        va_start(l, cmd);

        pthread_rwlock_wrlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                va_end(l);
                return -ENOTALLOC;
        }

        switch(cmd) {
        case FLOWSSNDTIMEO:
                /* NULL timeo disables the send timeout. */
                timeo = va_arg(l, struct timespec *);
                if (timeo == NULL) {
                        ai.flows[fd].snd_timesout = false;
                } else {
                        ai.flows[fd].snd_timesout = true;
                        ai.flows[fd].snd_timeo    = *timeo;
                }
                break;
        case FLOWGSNDTIMEO:
                timeo = va_arg(l, struct timespec *);
                if (timeo == NULL)
                        goto einval;
                if (!ai.flows[fd].snd_timesout)
                        goto eperm;
                *timeo = ai.flows[fd].snd_timeo;
                break;
        case FLOWSRCVTIMEO:
                /* NULL timeo disables the receive timeout. */
                timeo = va_arg(l, struct timespec *);
                if (timeo == NULL) {
                        ai.flows[fd].rcv_timesout = false;
                } else {
                        ai.flows[fd].rcv_timesout = true;
                        ai.flows[fd].rcv_timeo    = *timeo;
                }
                break;
        case FLOWGRCVTIMEO:
                timeo = va_arg(l, struct timespec *);
                if (timeo == NULL)
                        goto einval;
                if (!ai.flows[fd].rcv_timesout)
                        goto eperm;
                /* BUG fix: used to copy snd_timeo here. */
                *timeo = ai.flows[fd].rcv_timeo;
                break;
        case FLOWGQOSSPEC:
                qs = va_arg(l, qosspec_t *);
                if (qs == NULL)
                        goto einval;
                *qs = ai.flows[fd].spec;
                break;
        case FLOWSFLAGS:
                ai.flows[fd].oflags = va_arg(l, uint32_t);
                rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
                /* BUG fix: tx_acl was read from rx_rb. */
                tx_acl = shm_rbuff_get_acl(ai.flows[fd].tx_rb);
                /*
                 * Making our own flow write only means making the
                 * the other side of the flow read only.
                 */
                if (ai.flows[fd].oflags & FLOWFWRONLY)
                        rx_acl |= ACL_RDONLY;
                if (ai.flows[fd].oflags & FLOWFRDWR)
                        rx_acl |= ACL_RDWR;

                if (ai.flows[fd].oflags & FLOWFDOWN) {
                        rx_acl |= ACL_FLOWDOWN;
                        tx_acl |= ACL_FLOWDOWN;
                } else {
                        rx_acl &= ~ACL_FLOWDOWN;
                        tx_acl &= ~ACL_FLOWDOWN;
                }

                shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
                shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);

                break;
        case FLOWGFLAGS:
                fflags = va_arg(l, uint32_t *);
                if (fflags == NULL)
                        goto einval;
                *fflags = ai.flows[fd].oflags;
                break;
        case FRCTSFLAGS:
                /* Setting FRCT flags sends a config PDU to the peer. */
                if (frcti_configure(fd, (uint16_t) va_arg(l, int)))
                        goto eperm;
                break;
        case FRCTGFLAGS:
                cflags = (uint16_t *) va_arg(l, int *);
                if (cflags == NULL)
                        goto einval;
                *cflags = ai.frcti[fd].conf_flags;
                break;
        default:
                pthread_rwlock_unlock(&ai.lock);
                va_end(l);
                return -ENOTSUP;

        };

        pthread_rwlock_unlock(&ai.lock);

        va_end(l);

        return 0;

 einval:
        pthread_rwlock_unlock(&ai.lock);
        va_end(l);
        return -EINVAL;
 eperm:
        pthread_rwlock_unlock(&ai.lock);
        va_end(l);
        return -EPERM;
}

/*
 * Write an SDU of count bytes to a flow.  Returns 0 on success or a
 * negative error code.  A NULL buf is a no-op returning 0.
 *
 * Fixes: the bound check used 'fd > AP_MAX_FLOWS', letting
 * fd == AP_MAX_FLOWS index one past the end of the flow table; and the
 * result of the blocking shm_rdrbuff_write_b() was used unchecked.
 */
ssize_t flow_write(int          fd,
                   const void * buf,
                   size_t       count)
{
        ssize_t idx;
        int     ret;

        if (buf == NULL)
                return 0;

        if (fd < 0 || fd >= AP_MAX_FLOWS)
                return -EBADF;

        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return -ENOTALLOC;
        }

        if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
                pthread_rwlock_unlock(&ai.lock);
                return -EPERM;
        }

        if (ai.flows[fd].oflags & FLOWFNONBLOCK) {
                idx = shm_rdrbuff_write(ai.rdrb,
                                        DU_BUFF_HEADSPACE,
                                        DU_BUFF_TAILSPACE,
                                        buf,
                                        count);
        } else { /* Blocking. */
                pthread_rwlock_unlock(&ai.lock);

                idx = shm_rdrbuff_write_b(ai.rdrb,
                                          DU_BUFF_HEADSPACE,
                                          DU_BUFF_TAILSPACE,
                                          buf,
                                          count);

                pthread_rwlock_rdlock(&ai.lock);
        }

        if (idx < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return idx;
        }

        if (!ai.frcti[fd].used) {
                /* Plain flow: commit the index to the tx ring. */
                ret = finalize_write(fd, idx);
                pthread_rwlock_unlock(&ai.lock);
                if (ret < 0) {
                        shm_rdrbuff_remove(ai.rdrb, idx);
                        return ret;
                }
        } else {
                pthread_rwlock_unlock(&ai.lock);

                /* FRCT flow: serialize the PCI and send. */
                ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
                if (ret < 0) {
                        shm_rdrbuff_remove(ai.rdrb, idx);
                        return ret;
                }
        }

        return 0;
}

/*
 * Read a single SDU from a flow into buf, copying at most count bytes.
 * Returns the full SDU length (possibly larger than count, indicating
 * truncation) or a negative error code.
 *
 * BUG fix: the bound check used 'fd > AP_MAX_FLOWS', letting
 * fd == AP_MAX_FLOWS index one past the end of the flow table.
 */
ssize_t flow_read(int    fd,
                  void * buf,
                  size_t count)
{
        ssize_t            idx;
        ssize_t            n;
        uint8_t *          sdu;
        bool               used;
        struct shm_rbuff * rb;

        if (fd < 0 || fd >= AP_MAX_FLOWS)
                return -EBADF;

        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return -ENOTALLOC;
        }

        used = ai.frcti[fd].used;
        rb   = ai.flows[fd].rx_rb;

        pthread_rwlock_unlock(&ai.lock);

        /* FRCT flows go through the ordering/resequencing logic. */
        if (!used)
                idx = shm_rbuff_read(rb);
        else
                idx = frcti_read(fd);

        if (idx < 0) {
                assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
                       idx == -EFLOWDOWN);
                return idx;
        }

        n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
        if (n < 0)
                return -1;

        memcpy(buf, sdu, MIN((size_t) n, count));

        shm_rdrbuff_remove(ai.rdrb, idx);

        return n;
}

/* fqueue functions. */

/*
 * Allocate a new flow set backed by a shared fqueue id.
 *
 * Returns the set, or NULL on allocation failure or id exhaustion.
 * (void) prototype: the old () form declared unspecified arguments.
 */
struct flow_set * fset_create(void)
{
        struct flow_set * set;

        set = malloc(sizeof(*set));
        if (set == NULL)
                return NULL;

        assert(ai.fqueues);

        pthread_rwlock_wrlock(&ai.lock);

        set->idx = bmp_allocate(ai.fqueues);
        if (!bmp_is_id_valid(ai.fqueues, set->idx)) {
                pthread_rwlock_unlock(&ai.lock);
                free(set);
                return NULL;
        }

        pthread_rwlock_unlock(&ai.lock);

        return set;
}

/* Release a flow set: clear pending events, return the id, free it.
 * A NULL set is a no-op. */
void fset_destroy(struct flow_set * set)
{
        if (set == NULL)
                return;

        /* Drop any events still queued on this set. */
        fset_zero(set);

        pthread_rwlock_wrlock(&ai.lock);
        bmp_release(ai.fqueues, set->idx);
        pthread_rwlock_unlock(&ai.lock);

        free(set);
}

/*
 * Allocate an empty fqueue with all slots marked invalid.
 *
 * Returns the queue, or NULL on allocation failure.
 * (void) prototype: the old () form declared unspecified arguments.
 */
struct fqueue * fqueue_create(void)
{
        struct fqueue * fq = malloc(sizeof(*fq));
        if (fq == NULL)
                return NULL;

        /* 0xFF in every byte reads back as -1 in each int slot;
         * sizeof the array itself avoids a hand-computed size. */
        memset(fq->fqueue, -1, sizeof(fq->fqueue));
        fq->fqsize = 0;
        fq->next   = 0;

        return fq;
}

/* Free an fqueue; free(NULL) is a no-op, so no guard is needed. */
void fqueue_destroy(struct fqueue * fq)
{
        free(fq);
}

/* Discard all pending events on a flow set; NULL set is a no-op. */
void fset_zero(struct flow_set * set)
{
        if (set != NULL)
                shm_flow_set_zero(ai.fqset, set->idx);
}

/*
 * Add flow fd to a flow set.
 *
 * Returns the result of shm_flow_set_add(), or -EINVAL on bad args.
 */
int fset_add(struct flow_set * set,
             int               fd)
{
        int    ret;
        size_t sdus;
        size_t i;

        /* fd == AP_MAX_FLOWS would index one past ai.flows; the old
         * check used '>' and allowed it. */
        if (set == NULL || fd < 0 || fd >= AP_MAX_FLOWS)
                return -EINVAL;

        pthread_rwlock_wrlock(&ai.lock);

        ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);

        /* Raise an event for every SDU already queued on the flow so
         * a subsequent fevent() does not miss them. */
        sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
        for (i = 0; i < sdus; i++)
                shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);

        pthread_rwlock_unlock(&ai.lock);

        return ret;
}

/* Remove flow fd from a flow set; silently ignores bad arguments
 * and unallocated flows. */
void fset_del(struct flow_set * set,
              int               fd)
{
        /* fd == AP_MAX_FLOWS would index past ai.flows; the old check
         * used '>' and allowed it. */
        if (set == NULL || fd < 0 || fd >= AP_MAX_FLOWS)
                return;

        pthread_rwlock_wrlock(&ai.lock);

        if (ai.flows[fd].port_id >= 0)
                shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);

        pthread_rwlock_unlock(&ai.lock);
}

bool fset_has(const struct flow_set * set,
              int                     fd)
{
        bool ret = false;

        if (set == NULL || fd < 0)
                return false;

        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return false;
        }

        ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);

        pthread_rwlock_unlock(&ai.lock);

        return ret;
}

/* Pop the next event from an fqueue and map it to its fd.
 * Returns -EINVAL for a NULL queue, -EPERM when it is empty. */
int fqueue_next(struct fqueue * fq)
{
        int fd;
        int port;

        if (fq == NULL)
                return -EINVAL;

        if (fq->fqsize == 0)
                return -EPERM;

        /* fq is caller-local; only the ports table needs the lock. */
        port = fq->fqueue[fq->next++];

        pthread_rwlock_rdlock(&ai.lock);
        fd = ai.ports[port].fd;
        pthread_rwlock_unlock(&ai.lock);

        /* Queue fully drained: reset it for the next fevent(). */
        if (fq->next == fq->fqsize)
                fq->next = fq->fqsize = 0;

        return fd;
}

/*
 * Wait for events on any flow in the set, with an optional relative
 * timeout, filling fq with the ports that have activity.
 *
 * Returns the number of events (> 0), -EINVAL on bad arguments,
 * -ETIMEDOUT on timeout, or another negative error from the wait.
 */
int fevent(struct flow_set *       set,
           struct fqueue *         fq,
           const struct timespec * timeo)
{
        ssize_t           ret;
        struct timespec   abstime;
        struct timespec * t = NULL;

        if (set == NULL || fq == NULL)
                return -EINVAL;

        /* Events from a previous wait not yet consumed. */
        if (fq->fqsize > 0)
                return fq->fqsize;

        assert(!fq->next);

        if (timeo != NULL) {
                /* Convert the relative timeout to an absolute time. */
                clock_gettime(PTHREAD_COND_CLOCK, &abstime);
                ts_add(&abstime, timeo, &abstime);
                t = &abstime;
        }

        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
        if (ret < 0) {
                /* Previously only -ETIMEDOUT was handled here; any
                 * other negative error was assigned to the unsigned
                 * fqsize below, corrupting the queue state. */
                fq->fqsize = 0;
                return ret;
        }

        assert(ret);

        fq->fqsize = ret;

        return ret;
}

/* ipcp-dev functions. */

/* Set up the local state for an N + 1 flow; returns the new fd
 * from flow_init() or its negative error. */
int np1_flow_alloc(pid_t     n_api,
                   int       port_id,
                   qoscube_t qc)
{
        int fd;

        fd = flow_init(port_id, n_api, qc);

        return fd;
}

/* Look up the fd bound to an N + 1 port_id (deallocation path). */
int np1_flow_dealloc(int port_id)
{
        int fd = -1;

        pthread_rwlock_rdlock(&ai.lock);
        fd = ai.ports[port_id].fd;
        pthread_rwlock_unlock(&ai.lock);

        return fd;
}

/* Wait until port_id is assigned, then return its fd, or -1 if the
 * assignment did not complete. */
int np1_flow_resp(int port_id)
{
        int fd = -1;

        if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)
                return -1;

        pthread_rwlock_rdlock(&ai.lock);
        fd = ai.ports[port_id].fd;
        pthread_rwlock_unlock(&ai.lock);

        return fd;
}

/* Report the result of an IPCP creation back to the IRMd.
 * Returns the IRMd's result, -EIRMD if it cannot be reached, or -1
 * if the reply carried no result. */
int ipcp_create_r(pid_t api,
                  int   result)
{
        irm_msg_t   msg = IRM_MSG__INIT;
        irm_msg_t * reply;
        int         ret;

        msg.code       = IRM_MSG_CODE__IPCP_CREATE_R;
        msg.has_api    = true;
        msg.api        = api;
        msg.has_result = true;
        msg.result     = result;

        reply = send_recv_irm_msg(&msg);
        if (reply == NULL)
                return -EIRMD;

        ret = reply->has_result ? reply->result : -1;

        irm_msg__free_unpacked(reply, NULL);

        return ret;
}

/* Announce an arrived flow request (hash dst) to the IRMd and
 * initialize the local flow state for the assigned port.
 * Returns the new fd, -EINVAL for a NULL dst, -EIRMD when the IRMd
 * is unreachable, or -1 on a bad or failed reply. */
int ipcp_flow_req_arr(pid_t           api,
                      const uint8_t * dst,
                      size_t          len,
                      qoscube_t       qc)
{
        irm_msg_t   msg = IRM_MSG__INIT;
        irm_msg_t * reply;
        int         fd;

        if (dst == NULL)
                return -EINVAL;

        msg.code        = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
        msg.has_api     = true;
        msg.api         = api;
        msg.has_hash    = true;
        msg.hash.len    = len;
        msg.hash.data   = (uint8_t *) dst;
        msg.has_qoscube = true;
        msg.qoscube     = qc;

        reply = send_recv_irm_msg(&msg);
        if (reply == NULL)
                return -EIRMD;

        /* Reject replies missing fields or reporting failure. */
        if (!reply->has_port_id || !reply->has_api ||
            (reply->has_result && reply->result)) {
                irm_msg__free_unpacked(reply, NULL);
                return -1;
        }

        fd = flow_init(reply->port_id, reply->api, qc);

        irm_msg__free_unpacked(reply, NULL);

        return fd;
}

int ipcp_flow_alloc_reply(int fd,
                          int response)
{
        irm_msg_t msg = IRM_MSG__INIT;
        irm_msg_t * recv_msg = NULL;
        int ret = -1;

        msg.code         = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
        msg.has_port_id  = true;

        pthread_rwlock_rdlock(&ai.lock);

        msg.port_id = ai.flows[fd].port_id;

        pthread_rwlock_unlock(&ai.lock);

        msg.has_response = true;
        msg.response     = response;

        recv_msg = send_recv_irm_msg(&msg);
        if (recv_msg == NULL)
                return -EIRMD;

        if (recv_msg->has_result == false) {
                irm_msg__free_unpacked(recv_msg, NULL);
                return -1;
        }

        ret = recv_msg->result;

        irm_msg__free_unpacked(recv_msg, NULL);

        return ret;
}

/*
 * Read the next SDU on flow fd as a du_buff, without copying.
 *
 * Returns 0 and sets *sdb on success, -ENOTALLOC if the flow is not
 * allocated, or a negative error from the underlying read.
 */
int ipcp_flow_read(int                   fd,
                   struct shm_du_buff ** sdb)
{
        ssize_t            idx;
        bool               used;
        struct shm_rbuff * rb;

        assert(fd >= 0);
        assert(sdb);

        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return -ENOTALLOC;
        }

        /* Snapshot under the lock: the old code read frcti[fd].used
         * and flows[fd].rx_rb after unlocking, racing concurrent
         * deallocation (flow_read already snapshots this way). */
        used = ai.frcti[fd].used;
        rb   = ai.flows[fd].rx_rb;

        pthread_rwlock_unlock(&ai.lock);

        if (!used)
                idx = shm_rbuff_read(rb);
        else
                idx = frcti_read(fd);

        if (idx < 0)
                return idx;

        *sdb = shm_rdrbuff_get(ai.rdrb, idx);

        return 0;
}

/*
 * Write an SDU that already resides in the rdrbuff (sdb) on flow fd.
 *
 * Returns 0 on success, -EINVAL for a NULL sdb, -ENOTALLOC if the
 * flow is not allocated, -EPERM if the flow is read-only, or a
 * negative error from finalize_write()/frcti_write().
 *
 * NOTE(review): on failure the sdb is not removed from the rdrbuff
 * here - presumably the caller retains ownership; confirm callers.
 */
int ipcp_flow_write(int                  fd,
                    struct shm_du_buff * sdb)
{
        int ret;

        if (sdb == NULL)
                return -EINVAL;

        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return -ENOTALLOC;
        }

        /* Refuse to write on a flow opened read-only. */
        if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
                pthread_rwlock_unlock(&ai.lock);
                return -EPERM;
        }

        assert(ai.flows[fd].tx_rb);

        if (!ai.frcti[fd].used) {
                /* Raw flow: hand the buffer index to the tx ring
                 * while still holding the lock. */
                ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
                if (ret < 0) {
                        pthread_rwlock_unlock(&ai.lock);
                        return ret;
                }

                pthread_rwlock_unlock(&ai.lock);
        } else {
                /* FRCT flow: drop the lock first, frcti_write() may
                 * block. */
                pthread_rwlock_unlock(&ai.lock);

                ret = frcti_write(fd, sdb);
                if (ret < 0)
                        return ret;
        }

        return 0;
}

/* Reserve a du_buff of len bytes in the rdrbuff (blocking write).
 * Returns 0 and sets *sdb on success, -1 on failure. */
int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
                     size_t                len)
{
        ssize_t idx;

        /* Block until headspace + len + tailspace fits. */
        idx = shm_rdrbuff_write_b(ai.rdrb,
                                  DU_BUFF_HEADSPACE,
                                  DU_BUFF_TAILSPACE,
                                  NULL,
                                  len);
        if (idx < 0)
                return -1;

        *sdb = shm_rdrbuff_get(ai.rdrb, idx);

        return 0;
}

/* Shut down the receive side of flow fd: mark it write-only, then
 * finalize its rx ring buffer. */
void ipcp_flow_fini(int fd)
{
        struct shm_rbuff * rb;

        fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);

        pthread_rwlock_rdlock(&ai.lock);
        rb = ai.flows[fd].rx_rb;
        pthread_rwlock_unlock(&ai.lock);

        shm_rbuff_fini(rb);
}

/*
 * Fetch the QoS cube of flow fd into *cube.
 *
 * Returns 0 on success, -EINVAL on bad arguments, -ENOTALLOC if the
 * flow is not allocated.
 */
int ipcp_flow_get_qoscube(int         fd,
                          qoscube_t * cube)
{
        /* '>=': fd == AP_MAX_FLOWS would index past ai.flows. */
        if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
                return -EINVAL;

        /* Pure read: a read lock suffices (was a write lock). */
        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return -ENOTALLOC;
        }

        *cube = ai.flows[fd].cube;

        pthread_rwlock_unlock(&ai.lock);

        return 0;
}

/* Pop the next buffer index from the rx ring of flow fd.
 * Returns the index, or a negative error from shm_rbuff_read(). */
ssize_t local_flow_read(int fd)
{
        ssize_t idx;

        assert(fd >= 0);

        pthread_rwlock_rdlock(&ai.lock);
        idx = shm_rbuff_read(ai.flows[fd].rx_rb);
        pthread_rwlock_unlock(&ai.lock);

        return idx;
}

/* Push buffer index idx onto the tx ring of flow fd.
 * Returns 0 on success, -EINVAL for a bad fd, -ENOTALLOC if the
 * flow is not allocated, or the error from finalize_write(). */
int local_flow_write(int    fd,
                     size_t idx)
{
        int err;

        if (fd < 0)
                return -EINVAL;

        pthread_rwlock_rdlock(&ai.lock);

        if (ai.flows[fd].port_id < 0) {
                pthread_rwlock_unlock(&ai.lock);
                return -ENOTALLOC;
        }

        err = finalize_write(fd, idx);

        pthread_rwlock_unlock(&ai.lock);

        return err < 0 ? err : 0;
}

/* Return a du_buff to the rdrbuff; a NULL sdb is a no-op (the old
 * code dereferenced it via shm_du_buff_get_idx). */
void ipcp_sdb_release(struct shm_du_buff * sdb)
{
        if (sdb == NULL)
                return;

        shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
}