/* * Ouroboros - Copyright (C) 2016 - 2017 * * The Common Distributed Application Protocol * * Sander Vrijders <sander.vrijders@intec.ugent.be> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * version 2.1 as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301 USA */ #include <ouroboros/config.h> #include <ouroboros/cdap.h> #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> #include <ouroboros/fcntl.h> #include <ouroboros/errno.h> #include "cdap_req.h" #include <stdlib.h> #include <pthread.h> #include <string.h> #include <assert.h> #include "cdap.pb-c.h" typedef Cdap cdap_t; typedef Opcode opcode_t; typedef int32_t invoke_id_t; #define INVALID_INVOKE_ID -1 #define IDS_SIZE 256 #define BUF_SIZE 2048 struct cdap { int fd; struct bmp * ids; pthread_mutex_t ids_lock; pthread_t reader; struct list_head sent; pthread_rwlock_t sent_lock; struct list_head rcvd; pthread_cond_t rcvd_cond; pthread_mutex_t rcvd_lock; }; struct cdap_rcvd { struct list_head next; invoke_id_t iid; enum cdap_opcode opcode; char * name; void * data; size_t len; uint32_t flags; }; static int next_invoke_id(struct cdap * instance) { int ret; assert(instance); pthread_mutex_lock(&instance->ids_lock); ret = bmp_allocate(instance->ids); if (!bmp_is_id_valid(instance->ids, ret)) ret = INVALID_INVOKE_ID; pthread_mutex_unlock(&instance->ids_lock); return ret; } static int release_invoke_id(struct cdap * instance, int id) { int ret; assert(instance); pthread_mutex_lock(&instance->ids_lock); ret = bmp_release(instance->ids, id); pthread_mutex_unlock(&instance->ids_lock); return ret; } #define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL) static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key) { struct list_head * p = NULL; struct cdap_req * req = NULL; assert(instance); assert(key >= 0); pthread_rwlock_rdlock(&instance->sent_lock); list_for_each(p, &instance->sent) { req = list_entry(p, struct cdap_req, next); if (req->key == key) { pthread_rwlock_unlock(&instance->sent_lock); return req; } } pthread_rwlock_unlock(&instance->sent_lock); return NULL; } static struct cdap_req * cdap_sent_add(struct cdap * instance, cdap_key_t key) { struct cdap_req * req; assert(instance); assert(key >= 0); assert(!cdap_sent_has_key(instance, key)); req = cdap_req_create(key); if (req == NULL) return NULL; pthread_rwlock_wrlock(&instance->sent_lock); list_add(&req->next, &instance->sent); pthread_rwlock_unlock(&instance->sent_lock); return req; } static void cdap_sent_del(struct cdap * instance, struct cdap_req * req) { assert(instance); assert(req); assert(cdap_sent_has_key(instance, req->key)); pthread_rwlock_wrlock(&instance->sent_lock); list_del(&req->next); pthread_rwlock_unlock(&instance->sent_lock); cdap_req_destroy(req); } static void cdap_sent_destroy(struct cdap * instance) { struct list_head * p = NULL; struct list_head * h = NULL; assert(instance); pthread_rwlock_wrlock(&instance->sent_lock); list_for_each_safe(p, h, &instance->sent) { struct cdap_req * req = list_entry(p, struct cdap_req, next); list_del(&req->next); cdap_req_destroy(req); } pthread_rwlock_unlock(&instance->sent_lock); } static void cdap_rcvd_destroy(struct cdap * instance) { struct list_head * p = NULL; struct list_head * h = NULL; assert(instance); pthread_mutex_lock(&instance->rcvd_lock); list_for_each_safe(p, h, &instance->sent) { struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next); list_del(&r->next); if (r->data != NULL) free(r->data); if (r->name != NULL) free(r->name); free(r); } pthread_mutex_unlock(&instance->rcvd_lock); } static void * sdu_reader(void * o) { struct cdap * instance = (struct cdap *) o; struct cdap_req * req; struct cdap_rcvd * rcvd; cdap_t * msg; uint8_t buf[BUF_SIZE]; ssize_t len; buffer_t data; while (true) { len = flow_read(instance->fd, buf, BUF_SIZE); if (len < 0) continue; msg = cdap__unpack(NULL, len, buf); if (msg == NULL) continue; if (msg->opcode != OPCODE__REPLY) { rcvd = malloc(sizeof(*rcvd)); if (rcvd == NULL) { cdap__free_unpacked(msg, NULL); continue; } switch (msg->opcode) { case OPCODE__START: rcvd->opcode = CDAP_START; break; case OPCODE__STOP: rcvd->opcode = CDAP_STOP; break; case OPCODE__READ: rcvd->opcode = CDAP_READ; break; case OPCODE__WRITE: rcvd->opcode = CDAP_WRITE; break; case OPCODE__CREATE: rcvd->opcode = CDAP_CREATE; break; case OPCODE__DELETE: rcvd->opcode = CDAP_DELETE; break; default: cdap__free_unpacked(msg, NULL); free(rcvd); continue; } rcvd->iid = msg->invoke_id; rcvd->flags = msg->flags; rcvd->name = strdup(msg->name); if (rcvd->name == NULL) { cdap__free_unpacked(msg, NULL); free(rcvd); continue; } if (msg->has_value) { rcvd->len = msg->value.len; rcvd->data = malloc(rcvd->len); if (rcvd->data == NULL) { cdap__free_unpacked(msg, NULL); free(rcvd); continue; } memcpy(rcvd->data, msg->value.data, rcvd->len); } else { rcvd->len = 0; rcvd->data = NULL; } pthread_mutex_lock(&instance->rcvd_lock); list_add(&rcvd->next, &instance->rcvd); pthread_cond_signal(&instance->rcvd_cond); pthread_mutex_unlock(&instance->rcvd_lock); } else { req = cdap_sent_get_by_key(instance, msg->invoke_id); if (req == NULL) continue; if (msg->has_value) { data.len = msg->value.len; data.data = malloc(data.len); if (data.data == NULL) { cdap__free_unpacked(msg, NULL); continue; } memcpy(data.data, msg->value.data, data.len); } else { data.len = 0; data.data = NULL; } cdap_req_respond(req, msg->result, data); } cdap__free_unpacked(msg, NULL); } return (void *) 0; } struct cdap * cdap_create(int fd) { struct cdap * instance = NULL; int flags; if (fd < 0) return NULL; flags = flow_get_flags(fd); if (flags & FLOW_O_NONBLOCK) return NULL; instance = malloc(sizeof(*instance)); if (instance == NULL) return NULL; if (pthread_mutex_init(&instance->ids_lock, NULL)) { free(instance); return NULL; } if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { pthread_mutex_destroy(&instance->ids_lock); free(instance); return NULL; } if (pthread_rwlock_init(&instance->sent_lock, NULL)) { pthread_mutex_destroy(&instance->rcvd_lock); pthread_mutex_destroy(&instance->ids_lock); free(instance); return NULL; } if (pthread_cond_init(&instance->rcvd_cond, NULL)) { pthread_rwlock_destroy(&instance->sent_lock); pthread_mutex_destroy(&instance->rcvd_lock); pthread_mutex_destroy(&instance->ids_lock); free(instance); return NULL; } instance->ids = bmp_create(IDS_SIZE, 0); if (instance->ids == NULL) { pthread_cond_destroy(&instance->rcvd_cond); pthread_rwlock_destroy(&instance->sent_lock); pthread_mutex_destroy(&instance->rcvd_lock); pthread_mutex_destroy(&instance->ids_lock); free(instance); return NULL; } list_head_init(&instance->sent); list_head_init(&instance->rcvd); instance->fd = fd; pthread_create(&instance->reader, NULL, sdu_reader, instance); return instance; } int cdap_destroy(struct cdap * instance) { if (instance == NULL) return 0; pthread_cancel(instance->reader); pthread_join(instance->reader, NULL); pthread_mutex_lock(&instance->ids_lock); bmp_destroy(instance->ids); pthread_mutex_unlock(&instance->ids_lock); pthread_mutex_destroy(&instance->ids_lock); cdap_sent_destroy(instance); pthread_rwlock_destroy(&instance->sent_lock); cdap_rcvd_destroy(instance); pthread_mutex_destroy(&instance->rcvd_lock); free(instance); return 0; } static int write_msg(struct cdap * instance, cdap_t * msg) { int ret; uint8_t * data; size_t len; assert(instance); assert(msg); len = cdap__get_packed_size(msg); if (len == 0) return -1; data = malloc(len); if (data == NULL) return -ENOMEM; cdap__pack(msg, data); ret = flow_write(instance->fd, data, len); free(data); return ret; } static cdap_key_t invoke_id_to_key(invoke_id_t iid) { if (iid == INVALID_INVOKE_ID) return INVALID_CDAP_KEY; return (cdap_key_t) iid; } static invoke_id_t key_to_invoke_id(cdap_key_t key) { if (key == INVALID_CDAP_KEY) return INVALID_INVOKE_ID; return (invoke_id_t) key; } cdap_key_t cdap_request_send(struct cdap * instance, enum cdap_opcode code, const char * name, const void * data, size_t len, uint32_t flags) { cdap_t msg = CDAP__INIT; struct cdap_req * req; invoke_id_t iid; cdap_key_t key; if (instance == NULL || name == NULL) return -EINVAL; iid = next_invoke_id(instance); if (iid == INVALID_INVOKE_ID) return INVALID_CDAP_KEY; switch (code) { case CDAP_READ: msg.opcode = OPCODE__READ; break; case CDAP_WRITE: msg.opcode = OPCODE__WRITE; break; case CDAP_CREATE: msg.opcode = OPCODE__CREATE; break; case CDAP_DELETE: msg.opcode = OPCODE__DELETE; break; case CDAP_START: msg.opcode = OPCODE__START; break; case CDAP_STOP: msg.opcode = OPCODE__STOP; break; default: release_invoke_id(instance, iid); return -EINVAL; } msg.name = (char *) name; msg.has_flags = true; msg.flags = flags; msg.invoke_id = iid; if (data != NULL) { msg.has_value = true; msg.value.data = (uint8_t *) data; msg.value.len = len; } key = invoke_id_to_key(iid); req = cdap_sent_add(instance, key); if (req == NULL) { release_invoke_id(instance, iid); return INVALID_CDAP_KEY; } if (write_msg(instance, &msg)) { cdap_sent_del(instance, req); release_invoke_id(instance, iid); return INVALID_CDAP_KEY; } return key; } int cdap_reply_wait(struct cdap * instance, cdap_key_t key, uint8_t ** data, size_t * len) { int ret; struct cdap_req * r; invoke_id_t iid = key_to_invoke_id(key); if (instance == NULL || iid == INVALID_INVOKE_ID) return -EINVAL; r = cdap_sent_get_by_key(instance, key); if (r == NULL) return -EINVAL; ret = cdap_req_wait(r); if (ret < 0) return ret; if (r->response) return r->response; assert(ret == 0); if (data != NULL) { *data = r->data.data; *len = r->data.len; } cdap_sent_del(instance, r); release_invoke_id(instance, iid); return 0; } cdap_key_t cdap_request_wait(struct cdap * instance, enum cdap_opcode * opcode, char ** name, uint8_t ** data, size_t * len, uint32_t * flags) { struct cdap_rcvd * rcvd; invoke_id_t iid; if (instance == NULL || opcode == NULL || name == NULL || data == NULL || len == NULL || flags == NULL) return -EINVAL; pthread_mutex_lock(&instance->rcvd_lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) &instance->rcvd_lock); while (list_is_empty(&instance->rcvd)) pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock); rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); list_del(&rcvd->next); pthread_cleanup_pop(true); *opcode = rcvd->opcode; *name = rcvd->name; *data = rcvd->data; *len = rcvd->len; *flags = rcvd->flags; iid = rcvd->iid; free(rcvd); return invoke_id_to_key(iid); } int cdap_reply_send(struct cdap * instance, cdap_key_t key, int result, const void * data, size_t len) { cdap_t msg = CDAP__INIT; invoke_id_t iid = key_to_invoke_id(key); if (instance == NULL) return -EINVAL; msg.opcode = OPCODE__REPLY; msg.invoke_id = iid; msg.has_result = true; msg.result = result; if (data != NULL) { msg.has_value = true; msg.value.data = (uint8_t *) data; msg.value.len = len; } return write_msg(instance, &msg); }