/*
 * Ouroboros - Copyright (C) 2016 - 2017
 *
 * The Common Distributed Application Protocol
 *
 *    Dimitri Staessens <dimitri.staessens@ugent.be>
 *    Sander Vrijders   <sander.vrijders@ugent.be>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * version 2.1 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301 USA
 */

#include <ouroboros/config.h>
#include <ouroboros/cdap.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
#include <ouroboros/fqueue.h>
#include <ouroboros/fcntl.h>
#include <ouroboros/errno.h>

#include "cdap_req.h"

#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>

#include "cdap.pb-c.h"
typedef Cdap cdap_t;

#define CDAP_REPLY (CDAP_DELETE + 1)

#define INVALID_ID -1
#define IDS_SIZE 2048
#define BUF_SIZE 2048

struct fd_el {
        struct list_head next;

        int              fd;
};

struct cdap {
        flow_set_t *     set;
        fqueue_t *       fq;

        size_t           n_flows;
        struct list_head flows;
        pthread_rwlock_t flows_lock;

        struct bmp *     ids;
        pthread_mutex_t  ids_lock;

        struct list_head sent;
        pthread_rwlock_t sent_lock;

        struct list_head rcvd;
        pthread_cond_t   rcvd_cond;
        pthread_mutex_t  rcvd_lock;

        pthread_t        reader;
};

struct cdap_rcvd {
        struct list_head next;

        int              fd;
        bool             proc;

        invoke_id_t      iid;
        cdap_key_t       key;

        enum cdap_opcode opcode;
        char *           name;
        void *           data;
        size_t           len;
        uint32_t         flags;
};

static int next_id(struct cdap * instance)
{
        int ret;

        assert(instance);

        pthread_mutex_lock(&instance->ids_lock);

        ret = bmp_allocate(instance->ids);
        if (!bmp_is_id_valid(instance->ids, ret))
                ret = INVALID_ID;

        pthread_mutex_unlock(&instance->ids_lock);

        return ret;
}

static int release_id(struct cdap * instance,
                      int32_t       id)
{
        int ret;

        assert(instance);

        pthread_mutex_lock(&instance->ids_lock);

        ret = bmp_release(instance->ids, id);

        pthread_mutex_unlock(&instance->ids_lock);

        return ret;
}

#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL)

static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
                                              cdap_key_t    key)
{
        struct list_head * p = NULL;
        struct cdap_req *  req = NULL;

        assert(instance);

        pthread_rwlock_rdlock(&instance->sent_lock);

        list_for_each(p, &instance->sent) {
                req = list_entry(p, struct cdap_req, next);
                if (req->key == key) {
                        pthread_rwlock_unlock(&instance->sent_lock);
                        return req;
                }
        }

        pthread_rwlock_unlock(&instance->sent_lock);

        return NULL;
}

static struct cdap_req * cdap_sent_get_by_iid(struct cdap * instance,
                                              invoke_id_t   iid)
{
        struct list_head * p = NULL;
        struct cdap_req *  req = NULL;

        assert(instance);

        pthread_rwlock_rdlock(&instance->sent_lock);

        list_for_each(p, &instance->sent) {
                req = list_entry(p, struct cdap_req, next);
                if (req->iid == iid) {
                        pthread_rwlock_unlock(&instance->sent_lock);
                        return req;
                }
        }

        pthread_rwlock_unlock(&instance->sent_lock);

        return NULL;
}

static struct cdap_rcvd * cdap_rcvd_get_by_key(struct cdap * instance,
                                               cdap_key_t    key)
{
        struct list_head * p = NULL;
        struct list_head * h = NULL;
        struct cdap_rcvd * rcvd = NULL;

        assert(instance);

        pthread_mutex_lock(&instance->rcvd_lock);

        list_for_each_safe(p, h, &instance->rcvd) {
                rcvd = list_entry(p, struct cdap_rcvd, next);
                if (rcvd->key == key) {
                        pthread_mutex_unlock(&instance->rcvd_lock);
                        list_del(&rcvd->next);
                        return rcvd;
                }
        }

        pthread_mutex_unlock(&instance->rcvd_lock);

        assert(false);

        return NULL;
}

static struct cdap_req * cdap_sent_add(struct cdap * instance,
                                       int           fd,
                                       invoke_id_t   iid,
                                       cdap_key_t    key)
{
        struct cdap_req * req;

        assert(instance);
        assert(!cdap_sent_has_key(instance, key));

        req = cdap_req_create(fd, iid, key);
        if (req == NULL)
                return NULL;

        pthread_rwlock_wrlock(&instance->sent_lock);

        list_add(&req->next, &instance->sent);

        pthread_rwlock_unlock(&instance->sent_lock);

        return req;
}

static void cdap_sent_del(struct cdap *     instance,
                          struct cdap_req * req)
{
        assert(instance);
        assert(req);

        assert(cdap_sent_has_key(instance, req->key));

        pthread_rwlock_wrlock(&instance->sent_lock);

        list_del(&req->next);

        pthread_rwlock_unlock(&instance->sent_lock);

        cdap_req_destroy(req);
}

static void cdap_sent_destroy(struct cdap * instance)
{
        struct list_head * p = NULL;
        struct list_head * h = NULL;

        assert(instance);

        pthread_rwlock_wrlock(&instance->sent_lock);

        list_for_each_safe(p, h, &instance->sent) {
                struct cdap_req * req = list_entry(p, struct cdap_req, next);
                list_del(&req->next);
                cdap_req_cancel(req);
                cdap_req_destroy(req);
        }

        pthread_rwlock_unlock(&instance->sent_lock);
}

static void cdap_rcvd_destroy(struct cdap * instance)
{
        struct list_head * p = NULL;
        struct list_head * h = NULL;

        assert(instance);

        pthread_mutex_lock(&instance->rcvd_lock);

        list_for_each_safe(p, h, &instance->sent) {
                struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next);
                list_del(&r->next);
                if (r->data != NULL)
                        free(r->data);
                if (r->name != NULL)
                        free(r->name);
                free(r);
        }

        pthread_cond_broadcast(&instance->rcvd_cond);

        pthread_mutex_unlock(&instance->rcvd_lock);
}

static void * sdu_reader(void * o)
{
        struct cdap * instance = (struct cdap *) o;
        struct cdap_req * req;
        struct cdap_rcvd * rcvd;
        cdap_t * msg;
        uint8_t buf[BUF_SIZE];
        ssize_t len;
        buffer_t data;

        while (flow_event_wait(instance->set, instance->fq, NULL)) {
                int fd = fqueue_next(instance->fq);
                len = flow_read(fd, buf, BUF_SIZE);
                if (len < 0)
                        continue;

                msg = cdap__unpack(NULL, len, buf);
                if (msg == NULL)
                        continue;

                if (msg->opcode != CDAP_REPLY) {
                        rcvd = malloc(sizeof(*rcvd));
                        if (rcvd == NULL) {
                                cdap__free_unpacked(msg, NULL);
                                continue;
                        }

                        assert(msg->name);

                        rcvd->opcode = msg->opcode;
                        rcvd->fd     = fd;
                        rcvd->iid    = msg->invoke_id;
                        rcvd->key    = next_id(instance);
                        if (rcvd->key == INVALID_ID) {
                                cdap__free_unpacked(msg, NULL);
                                free(rcvd);
                                continue;
                        }

                        rcvd->flags  = msg->flags;
                        rcvd->proc   = false;
                        rcvd->name   = strdup(msg->name);
                        if (rcvd->name == NULL) {
                                release_id(instance, rcvd->key);
                                cdap__free_unpacked(msg, NULL);
                                free(rcvd);
                                continue;
                        }

                        if (msg->has_value) {
                                rcvd->len = msg->value.len;
                                rcvd->data = malloc(rcvd->len);
                                if (rcvd->data == NULL) {
                                        release_id(instance, rcvd->key);
                                        cdap__free_unpacked(msg, NULL);
                                        free(rcvd->name);
                                        free(rcvd);
                                        continue;
                                }
                                memcpy(rcvd->data, msg->value.data, rcvd->len);
                        } else {
                                rcvd->len = 0;
                                rcvd->data = NULL;
                        }

                        pthread_mutex_lock(&instance->rcvd_lock);

                        list_add(&rcvd->next, &instance->rcvd);

                        pthread_cond_signal(&instance->rcvd_cond);
                        pthread_mutex_unlock(&instance->rcvd_lock);
                } else  {
                        req = cdap_sent_get_by_iid(instance, msg->invoke_id);
                        if (req == NULL) {
                                cdap__free_unpacked(msg, NULL);
                                continue;
                        }

                        if (msg->has_value) {
                                data.len = msg->value.len;
                                data.data = malloc(data.len);
                                if (data.data == NULL) {
                                        cdap__free_unpacked(msg, NULL);
                                        continue;
                                }
                                memcpy(data.data, msg->value.data, data.len);
                        } else {
                                data.len = 0;
                                data.data = NULL;
                        }

                        cdap_req_respond(req, msg->result, data);
                }

                cdap__free_unpacked(msg, NULL);
        }

        return (void *) 0;
}

struct cdap * cdap_create()
{
        struct cdap * instance = NULL;

        instance = malloc(sizeof(*instance));
        if (instance == NULL)
                return NULL;

        if (pthread_rwlock_init(&instance->flows_lock, NULL)) {
                free(instance);
                return NULL;
        }

        if (pthread_mutex_init(&instance->ids_lock, NULL)) {
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
                return NULL;
        }

        if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
                pthread_mutex_destroy(&instance->ids_lock);
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
                return NULL;
        }

        if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
                pthread_mutex_destroy(&instance->rcvd_lock);
                pthread_mutex_destroy(&instance->ids_lock);
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
                return NULL;
        }

        if (pthread_cond_init(&instance->rcvd_cond, NULL)) {
                pthread_rwlock_destroy(&instance->sent_lock);
                pthread_mutex_destroy(&instance->rcvd_lock);
                pthread_mutex_destroy(&instance->ids_lock);
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
                return NULL;
        }

        instance->ids = bmp_create(IDS_SIZE, 0);
        if (instance->ids == NULL) {
                pthread_cond_destroy(&instance->rcvd_cond);
                pthread_rwlock_destroy(&instance->sent_lock);
                pthread_mutex_destroy(&instance->rcvd_lock);
                pthread_mutex_destroy(&instance->ids_lock);
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
                return NULL;
        }

        instance->set = flow_set_create();
        if (instance->set == NULL) {
                bmp_destroy(instance->ids);
                pthread_cond_destroy(&instance->rcvd_cond);
                pthread_rwlock_destroy(&instance->sent_lock);
                pthread_mutex_destroy(&instance->rcvd_lock);
                pthread_mutex_destroy(&instance->ids_lock);
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
                return NULL;
        }

        instance->fq = fqueue_create();
        if (instance->fq == NULL) {
                flow_set_destroy(instance->set);
                bmp_destroy(instance->ids);
                pthread_cond_destroy(&instance->rcvd_cond);
                pthread_rwlock_destroy(&instance->sent_lock);
                pthread_mutex_destroy(&instance->rcvd_lock);
                pthread_mutex_destroy(&instance->ids_lock);
                pthread_rwlock_destroy(&instance->flows_lock);
                free(instance);
        }

        instance->n_flows = 0;

        list_head_init(&instance->flows);
        list_head_init(&instance->sent);
        list_head_init(&instance->rcvd);

        pthread_create(&instance->reader, NULL, sdu_reader, instance);

        return instance;
}

int cdap_destroy(struct cdap * instance)
{
        struct list_head * p;
        struct list_head * h;

        if (instance == NULL)
                return 0;

        pthread_cancel(instance->reader);
        pthread_join(instance->reader, NULL);

        fqueue_destroy(instance->fq);

        flow_set_destroy(instance->set);

        pthread_rwlock_wrlock(&instance->flows_lock);

        list_for_each_safe(p,h, &instance->flows) {
                struct fd_el * e = list_entry(p, struct fd_el, next);
                list_del(&e->next);
                free(e);
        }

        pthread_rwlock_unlock(&instance->flows_lock);

        pthread_rwlock_destroy(&instance->flows_lock);

        pthread_mutex_lock(&instance->ids_lock);

        bmp_destroy(instance->ids);

        pthread_mutex_unlock(&instance->ids_lock);

        pthread_mutex_destroy(&instance->ids_lock);

        cdap_sent_destroy(instance);

        pthread_rwlock_destroy(&instance->sent_lock);

        cdap_rcvd_destroy(instance);

        pthread_mutex_destroy(&instance->rcvd_lock);

        free(instance);

        return 0;
}

int cdap_add_flow(struct cdap * instance,
                  int           fd)
{
        struct fd_el * e;

        if (fd < 0)
                return -EINVAL;

        e = malloc(sizeof(*e));
        if (e == NULL)
                return -ENOMEM;

        e->fd = fd;

        pthread_rwlock_wrlock(&instance->flows_lock);

        if (flow_set_add(instance->set, fd)) {
                pthread_rwlock_unlock(&instance->flows_lock);
                return -1;
        }

        list_add(&e->next, &instance->flows);

        ++instance->n_flows;

        pthread_rwlock_unlock(&instance->flows_lock);

        return 0;
}

int cdap_del_flow(struct cdap * instance,
                  int           fd)
{
        struct list_head * p;
        struct list_head * h;

        if (fd < 0)
                return -EINVAL;

        pthread_rwlock_wrlock(&instance->flows_lock);

        flow_set_del(instance->set, fd);

        list_for_each_safe(p, h, &instance->flows) {
                struct fd_el * e = list_entry(p, struct fd_el, next);
                if (e->fd == fd) {
                        list_del(&e->next);
                        free(e);
                        break;
                }
        }

        --instance->n_flows;

        pthread_rwlock_unlock(&instance->flows_lock);

        return 0;
}

static int write_msg(int           fd,
                     cdap_t *      msg)
{
        uint8_t * data;
        size_t len;

        assert(msg);

        len = cdap__get_packed_size(msg);
        if (len == 0)
                return -1;

        data = malloc(len);
        if (data == NULL)
                return -ENOMEM;

        cdap__pack(msg, data);

        if (flow_write(fd, data, len)) {
                free(data);
                return -1;
        }

        free(data);

        return 0;
}

cdap_key_t * cdap_request_send(struct cdap *    instance,
                               enum cdap_opcode code,
                               const char *     name,
                               const void *     data,
                               size_t           len,
                               uint32_t         flags)
{
        cdap_key_t *       keys;
        cdap_key_t *       key;
        cdap_t             msg = CDAP__INIT;
        struct list_head * p;
        int                ret;

        if (instance == NULL || name == NULL || code > CDAP_DELETE)
                return NULL;

        pthread_rwlock_rdlock(&instance->flows_lock);

        keys = malloc(sizeof(*keys) * (instance->n_flows + 1));
        if (keys == NULL)
                return NULL;

        memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1));

        key = keys;

        cdap__init(&msg);

        msg.opcode = code;
        msg.name = (char *) name;
        msg.has_flags = true;
        msg.flags = flags;

        if (data != NULL) {
                msg.has_value = true;
                msg.value.data = (uint8_t *) data;
                msg.value.len = len;
        }

        list_for_each(p, &instance->flows) {
                struct cdap_req * req;
                invoke_id_t iid;
                struct fd_el * e;

                iid = next_id(instance);
                if (iid == INVALID_ID) {
                        pthread_rwlock_unlock(&instance->flows_lock);
                        return keys;
                }

                msg.invoke_id = iid;

                e = list_entry(p, struct fd_el, next);

                *key = next_id(instance);
                if (*key == INVALID_ID) {
                        pthread_rwlock_unlock(&instance->flows_lock);
                        release_id(instance, iid);
                        return keys;
                }

                req = cdap_sent_add(instance, e->fd, iid, *key);
                if (req == NULL) {
                        pthread_rwlock_unlock(&instance->flows_lock);
                        release_id(instance, *key);
                        release_id(instance, iid);
                        return keys;
                }

                ret = write_msg(e->fd, &msg);
                if (ret == -ENOMEM) {
                        pthread_rwlock_unlock(&instance->flows_lock);
                        cdap_sent_del(instance, req);
                        release_id(instance, *key);
                        release_id(instance, iid);
                        return keys;
                }

                if (ret < 0) {
                        cdap_sent_del(instance, req);
                        release_id(instance, *key);
                        release_id(instance, iid);
                }

                ++key;
        }

        pthread_rwlock_unlock(&instance->flows_lock);

        return keys;
}

int cdap_reply_wait(struct cdap * instance,
                    cdap_key_t    key,
                    uint8_t **    data,
                    size_t *      len)
{
        int ret;
        struct cdap_req * r;
        invoke_id_t iid;

        if (instance == NULL || (data != NULL && len == NULL))
                return -EINVAL;

        r = cdap_sent_get_by_key(instance, key);
        if (r == NULL)
                return -EINVAL;

        iid = r->iid;

        ret = cdap_req_wait(r);
        if (ret < 0) {
                cdap_sent_del(instance, r);
                release_id(instance, iid);
                return ret;
        }

        assert(ret == 0);

        if (data != NULL) {
                *data = r->data.data;
                *len  = r->data.len;
        }

        ret = r->response;

        cdap_sent_del(instance, r);
        release_id(instance, iid);

        return ret;
}

cdap_key_t cdap_request_wait(struct cdap *      instance,
                             enum cdap_opcode * opcode,
                             char **            name,
                             uint8_t **         data,
                             size_t *           len,
                             uint32_t *         flags)
{
        struct cdap_rcvd * rcv = NULL;

        if (instance == NULL || opcode == NULL || name == NULL || data == NULL
            || len == NULL || flags == NULL)
                return -EINVAL;

        pthread_mutex_lock(&instance->rcvd_lock);

        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
                             (void *) &instance->rcvd_lock);

        while (rcv == NULL) {
                while (list_is_empty(&instance->rcvd))
                        pthread_cond_wait(&instance->rcvd_cond,
                                          &instance->rcvd_lock);

                rcv = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
                if (rcv->proc) {
                        rcv = NULL;
                        pthread_cond_wait(&instance->rcvd_cond,
                                          &instance->rcvd_lock);
                }
        }

        rcv->proc = true;
        list_del(&rcv->next);
        list_add_tail(&rcv->next, &instance->rcvd);

        pthread_cleanup_pop(true);

        *opcode = rcv->opcode;
        *name   = rcv->name;
        *data   = rcv->data;
        *len    = rcv->len;
        *flags  = rcv->flags;

        rcv->name = NULL;
        rcv->data = NULL;

        return rcv->key;
}

int cdap_reply_send(struct cdap * instance,
                    cdap_key_t    key,
                    int           result,
                    const void *  data,
                    size_t        len)
{
        int                fd;
        cdap_t             msg  = CDAP__INIT;
        struct cdap_rcvd * rcvd;

        if (instance == NULL)
                return -EINVAL;

        rcvd = cdap_rcvd_get_by_key(instance, key);
        if (rcvd == NULL)
                return -1;

        msg.opcode = CDAP_REPLY;
        msg.invoke_id = rcvd->iid;
        msg.has_result = true;
        msg.result = result;

        if (data != NULL) {
                msg.has_value = true;
                msg.value.data = (uint8_t *) data;
                msg.value.len = len;
        }

        fd = rcvd->fd;

        release_id(instance, rcvd->key);

        assert(rcvd->data == NULL);
        assert(rcvd->name == NULL);
        assert(rcvd->proc);

        free(rcvd);

        return write_msg(fd, &msg);
}