summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-12-24 12:54:16 +0100
committerSander Vrijders <[email protected]>2016-12-24 12:54:16 +0100
commit7348a7d587adc3cb1bc11ef7513ef7b03bc40349 (patch)
tree54c4406c135621d44fd9468c12a6b26e3e5d4f74 /src/lib
parent55eaed508c9f68b350f29cf2c4e96be4e57b0b37 (diff)
parent8910bd28e2b6269f0900c8215352ab5d177a3b54 (diff)
downloadouroboros-7348a7d587adc3cb1bc11ef7513ef7b03bc40349.tar.gz
ouroboros-7348a7d587adc3cb1bc11ef7513ef7b03bc40349.zip
Merged in dstaesse/ouroboros/be-normal (pull request #325)
Be normal
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/cdap.c531
-rw-r--r--src/lib/cdap_req.c151
-rw-r--r--src/lib/cdap_req.h69
-rw-r--r--src/lib/dev.c8
5 files changed, 618 insertions, 142 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 9eaca9fd..22971806 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -29,6 +29,7 @@ set(SOURCE_FILES
# Add source files here
bitmap.c
cdap.c
+ cdap_req.c
dev.c
hashtable.c
irm.c
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index df79be54..dee8f88c 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -25,39 +25,65 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
+
+#include "cdap_req.h"
#include <stdlib.h>
#include <pthread.h>
+#include <string.h>
+#include <assert.h>
#include "cdap.pb-c.h"
typedef Cdap cdap_t;
typedef Opcode opcode_t;
+typedef int32_t invoke_id_t;
+
+#define INVALID_INVOKE_ID -1
#define IDS_SIZE 256
#define BUF_SIZE 2048
struct cdap {
- int fd;
- struct bmp * ids;
- pthread_mutex_t ids_lock;
- pthread_t reader;
- struct cdap_ops * ops;
+ int fd;
+
+ struct bmp * ids;
+ pthread_mutex_t ids_lock;
+
+ pthread_t reader;
+
+ struct list_head sent;
+ pthread_rwlock_t sent_lock;
+
+ struct list_head rcvd;
+ pthread_cond_t rcvd_cond;
+ pthread_mutex_t rcvd_lock;
};
-struct cdap_info {
- pthread_t thread;
- struct cdap * instance;
- cdap_t * msg;
+struct cdap_rcvd {
+ struct list_head next;
+
+ invoke_id_t iid;
+
+ enum cdap_opcode opcode;
+ char * name;
+ uint8_t * data;
+ size_t len;
+ uint32_t flags;
};
static int next_invoke_id(struct cdap * instance)
{
int ret;
+ assert(instance);
+
pthread_mutex_lock(&instance->ids_lock);
+
ret = bmp_allocate(instance->ids);
if (!bmp_is_id_valid(instance->ids, ret))
- ret = -1; /* INVALID_INVOKE_ID */
+ ret = INVALID_INVOKE_ID;
+
pthread_mutex_unlock(&instance->ids_lock);
return ret;
@@ -67,140 +93,225 @@ static int release_invoke_id(struct cdap * instance, int id)
{
int ret;
+ assert(instance);
+
pthread_mutex_lock(&instance->ids_lock);
+
ret = bmp_release(instance->ids, id);
+
pthread_mutex_unlock(&instance->ids_lock);
return ret;
}
-static void * handle_cdap_msg(void * o)
+#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL)
+
+struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key)
{
- struct cdap_info * info = (struct cdap_info *) o;
- struct cdap * instance = info->instance;
- cdap_t * msg = info->msg;
-
- switch (msg->opcode) {
- case OPCODE__READ:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_READ,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__WRITE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_WRITE,
- msg->name,
- msg->value.data,
- msg->value.len,
- msg->flags);
- break;
- case OPCODE__CREATE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_CREATE,
- msg->name,
- msg->value.data,
- msg->value.len, 0);
- break;
- case OPCODE__DELETE:
- if (msg->name != NULL &&
- msg->has_value)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_DELETE,
- msg->name,
- msg->value.data,
- msg->value.len, 0);
- break;
- case OPCODE__START:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_START,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__STOP:
- if (msg->name != NULL)
- instance->ops->cdap_request(instance,
- msg->invoke_id,
- CDAP_STOP,
- msg->name,
- NULL, 0, 0);
- break;
- case OPCODE__REPLY:
- instance->ops->cdap_reply(instance,
- msg->invoke_id,
- msg->result,
- msg->value.data,
- msg->value.len);
- release_invoke_id(instance, msg->invoke_id);
- break;
- default:
- break;
+ struct list_head * p = NULL;
+ struct cdap_req * req = NULL;
+
+ assert(instance);
+ assert(key >= 0);
+
+ pthread_rwlock_rdlock(&instance->sent_lock);
+
+ list_for_each(p, &instance->sent) {
+ req = list_entry(p, struct cdap_req, next);
+ if (req->key == key) {
+ pthread_rwlock_unlock(&instance->sent_lock);
+ return req;
+ }
}
- free(info);
- cdap__free_unpacked(msg, NULL);
+ pthread_rwlock_unlock(&instance->sent_lock);
- return (void *) 0;
+ return NULL;
+}
+
+static int cdap_sent_add(struct cdap * instance, struct cdap_req * req)
+{
+ assert (instance);
+ assert (req);
+
+ if (cdap_sent_has_key(instance, req->key))
+ return -EPERM;
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_add(&req->next, &instance->sent);
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+
+ return 0;
+}
+
+static void cdap_sent_del(struct cdap * instance, struct cdap_req * req)
+{
+ assert(instance);
+ assert(req);
+
+ assert(cdap_sent_has_key(instance, req->key));
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_del(&req->next);
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+}
+
+static void cdap_sent_destroy(struct cdap * instance)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ assert(instance);
+
+ pthread_rwlock_wrlock(&instance->sent_lock);
+
+ list_for_each_safe(p, h, &instance->sent) {
+ struct cdap_req * req = list_entry(p, struct cdap_req, next);
+ list_del(&req->next);
+ cdap_req_destroy(req);
+ }
+
+ pthread_rwlock_unlock(&instance->sent_lock);
+}
+
+static void cdap_rcvd_destroy(struct cdap * instance)
+{
+ struct list_head * p = NULL;
+ struct list_head * h = NULL;
+
+ assert(instance);
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_for_each_safe(p, h, &instance->sent) {
+ struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next);
+ list_del(&r->next);
+ if (r->data != NULL)
+ free(r->data);
+ if (r->name != NULL)
+ free(r->name);
+ free(r);
+ }
+
+ pthread_mutex_unlock(&instance->rcvd_lock);
}
static void * sdu_reader(void * o)
{
struct cdap * instance = (struct cdap *) o;
+ struct cdap_req * req;
+ struct cdap_rcvd * rcvd;
cdap_t * msg;
uint8_t buf[BUF_SIZE];
ssize_t len;
- struct cdap_info * cdap_info;
+ buffer_t data;
while (true) {
len = flow_read(instance->fd, buf, BUF_SIZE);
if (len < 0)
- return (void *) -1;
+ continue;
msg = cdap__unpack(NULL, len, buf);
if (msg == NULL)
continue;
- cdap_info = malloc(sizeof(*cdap_info));
- if (cdap_info == NULL) {
- cdap__free_unpacked(msg, NULL);
- continue;
+ if (msg->opcode != OPCODE__REPLY) {
+ rcvd = malloc(sizeof(*rcvd));
+ if (rcvd == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
+ }
+
+ switch (msg->opcode) {
+ case OPCODE__START:
+ rcvd->opcode = CDAP_START;
+ break;
+ case OPCODE__STOP:
+ rcvd->opcode = CDAP_STOP;
+ break;
+ case OPCODE__READ:
+ rcvd->opcode = CDAP_READ;
+ break;
+ case OPCODE__WRITE:
+ rcvd->opcode = CDAP_WRITE;
+ break;
+ case OPCODE__CREATE:
+ rcvd->opcode = CDAP_CREATE;
+ break;
+ case OPCODE__DELETE:
+ rcvd->opcode = CDAP_DELETE;
+ break;
+ default:
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+ rcvd->iid = msg->invoke_id;
+ rcvd->flags = msg->flags;
+ rcvd->name = strdup(msg->name);
+ if (rcvd->name == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+
+ if (msg->has_value) {
+ rcvd->len = msg->value.len;
+ rcvd->data = malloc(rcvd->len);
+ if (rcvd->data == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ free(rcvd);
+ continue;
+ }
+ memcpy(rcvd->data, msg->value.data, rcvd->len);
+ } else {
+ rcvd->len = 0;
+ rcvd->data = NULL;
+ }
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ list_add(&rcvd->next, &instance->rcvd);
+
+ pthread_cond_signal(&instance->rcvd_cond);
+ pthread_mutex_unlock(&instance->rcvd_lock);
+ } else {
+ req = cdap_sent_get_by_key(instance, msg->invoke_id);
+ if (req == NULL)
+ continue;
+
+ if (msg->has_value) {
+ data.len = msg->value.len;
+ data.data = malloc(data.len);
+ if (data.data == NULL) {
+ cdap__free_unpacked(msg, NULL);
+ continue;
+ }
+ memcpy(data.data, msg->value.data, data.len);
+ } else {
+ data.len = 0;
+ data.data = NULL;
+ }
+
+ cdap_req_respond(req, msg->result, data);
}
- cdap_info->instance = instance;
- cdap_info->msg = msg;
-
- pthread_create(&cdap_info->thread,
- NULL,
- handle_cdap_msg,
- (void *) cdap_info);
-
- pthread_detach(cdap_info->thread);
-
+ cdap__free_unpacked(msg, NULL);
}
return (void *) 0;
}
-struct cdap * cdap_create(struct cdap_ops * ops,
- int fd)
+struct cdap * cdap_create(int fd)
{
struct cdap * instance = NULL;
int flags;
- if (ops == NULL || fd < 0 ||
- ops->cdap_reply == NULL ||
- ops->cdap_request == NULL)
+ if (fd < 0)
return NULL;
flags = flow_get_flags(fd);
@@ -216,19 +327,43 @@ struct cdap * cdap_create(struct cdap_ops * ops,
return NULL;
}
- instance->ops = ops;
- instance->fd = fd;
+ if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
+
+ if (pthread_cond_init(&instance->rcvd_cond, NULL)) {
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ free(instance);
+ return NULL;
+ }
instance->ids = bmp_create(IDS_SIZE, 0);
if (instance->ids == NULL) {
+ pthread_cond_destroy(&instance->rcvd_cond);
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
free(instance);
return NULL;
}
- pthread_create(&instance->reader,
- NULL,
- sdu_reader,
- (void *) instance);
+ INIT_LIST_HEAD(&instance->sent);
+ INIT_LIST_HEAD(&instance->rcvd);
+
+ instance->fd = fd;
+
+ pthread_create(&instance->reader, NULL, sdu_reader, instance);
return instance;
}
@@ -247,27 +382,37 @@ int cdap_destroy(struct cdap * instance)
pthread_mutex_unlock(&instance->ids_lock);
- flow_dealloc(instance->fd);
+ pthread_mutex_destroy(&instance->ids_lock);
+
+ cdap_sent_destroy(instance);
+
+ pthread_rwlock_destroy(&instance->sent_lock);
+
+ cdap_rcvd_destroy(instance);
+
+ pthread_mutex_destroy(&instance->rcvd_lock);
free(instance);
return 0;
}
-static int write_msg(struct cdap * instance,
- cdap_t * msg)
+static int write_msg(struct cdap * instance, cdap_t * msg)
{
int ret;
uint8_t * data;
size_t len;
+ assert(instance);
+ assert(msg);
+
len = cdap__get_packed_size(msg);
if (len == 0)
return -1;
- data = malloc(BUF_SIZE);
+ data = malloc(len);
if (data == NULL)
- return -1;
+ return -ENOMEM;
cdap__pack(msg, data);
@@ -278,22 +423,41 @@ static int write_msg(struct cdap * instance,
return ret;
}
-int cdap_send_request(struct cdap * instance,
- enum cdap_opcode code,
- char * name,
- uint8_t * data,
- size_t len,
- uint32_t flags)
+static cdap_key_t invoke_id_to_key(invoke_id_t iid)
+{
+ if (iid == INVALID_INVOKE_ID)
+ return INVALID_CDAP_KEY;
+
+ return (cdap_key_t) iid;
+}
+
+static invoke_id_t key_to_invoke_id(cdap_key_t key)
+{
+ if (key == INVALID_CDAP_KEY)
+ return INVALID_INVOKE_ID;
+
+ return (invoke_id_t) key;
+}
+
+cdap_key_t cdap_request_send(struct cdap * instance,
+ enum cdap_opcode code,
+ char * name,
+ uint8_t * data,
+ size_t len,
+ uint32_t flags)
{
- int id;
cdap_t msg = CDAP__INIT;
+ struct cdap_req * req;
+ invoke_id_t iid;
+ cdap_key_t key;
if (instance == NULL || name == NULL)
- return -1;
+ return -EINVAL;
- id = next_invoke_id(instance);
- if (!bmp_is_id_valid(instance->ids, id))
- return -1;
+
+ iid = next_invoke_id(instance);
+ if (iid == INVALID_INVOKE_ID)
+ return INVALID_CDAP_KEY;
switch (code) {
case CDAP_READ:
@@ -315,39 +479,132 @@ int cdap_send_request(struct cdap * instance,
msg.opcode = OPCODE__STOP;
break;
default:
- release_invoke_id(instance, id);
- return -1;
+ release_invoke_id(instance, iid);
+ return -EINVAL;
}
msg.name = name;
msg.has_flags = true;
msg.flags = flags;
- msg.invoke_id = id;
+ msg.invoke_id = iid;
if (data != NULL) {
msg.has_value = true;
msg.value.data = data;
msg.value.len = len;
}
- if (write_msg(instance, &msg))
- return -1;
+ key = invoke_id_to_key(iid);
+
+ req = cdap_req_create(key);
+ if (req == NULL)
+ return INVALID_CDAP_KEY;
+
+ if (cdap_sent_add(instance, req)) {
+ cdap_req_destroy(req);
+ return INVALID_CDAP_KEY;
+ }
+
+ if (write_msg(instance, &msg)) {
+ cdap_sent_del(instance, req);
+ cdap_req_destroy(req);
+ return INVALID_CDAP_KEY;
+ }
+
+ return key;
+}
+
+int cdap_reply_wait(struct cdap * instance,
+ cdap_key_t key,
+ uint8_t ** data,
+ size_t * len)
+{
+ int ret;
+ struct cdap_req * r;
+ invoke_id_t iid = key_to_invoke_id(key);
+
+ if (instance == NULL || iid == INVALID_INVOKE_ID)
+ return -EINVAL;
+
+ r = cdap_sent_get_by_key(instance, key);
+ if (r == NULL)
+ return -EINVAL;
+
+ ret = cdap_req_wait(r);
+ if (ret < 0)
+ return ret;
+
+ if (r->response)
+ return r->response;
+
+ assert(ret == 0);
+
+ if (data != NULL) {
+ *data = r->data.data;
+ *len = r->data.len;
+ }
+
+ cdap_sent_del(instance, r);
- return id;
+ release_invoke_id(instance, iid);
+
+ return 0;
}
-int cdap_send_reply(struct cdap * instance,
- int invoke_id,
+cdap_key_t cdap_request_wait(struct cdap * instance,
+ enum cdap_opcode * opcode,
+ char ** name,
+ uint8_t ** data,
+ size_t * len,
+ uint32_t * flags)
+{
+ struct cdap_rcvd * rcvd;
+ invoke_id_t iid;
+
+ if (instance == NULL || opcode == NULL || name == NULL || data == NULL
+ || len == NULL || flags == NULL)
+ return -EINVAL;
+
+ pthread_mutex_lock(&instance->rcvd_lock);
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) &instance->rcvd_lock);
+
+ while (list_empty(&instance->rcvd))
+ pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock);
+
+ rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next);
+
+ list_del(&rcvd->next);
+
+ pthread_cleanup_pop(true);
+
+ *opcode = rcvd->opcode;
+ *name = rcvd->name;
+ *data = rcvd->data;
+ *len = rcvd->len;
+ *flags = rcvd->flags;
+
+ iid = rcvd->iid;
+
+ free(rcvd);
+
+ return invoke_id_to_key(iid);
+}
+
+int cdap_reply_send(struct cdap * instance,
+ cdap_key_t key,
int result,
uint8_t * data,
size_t len)
{
cdap_t msg = CDAP__INIT;
+ invoke_id_t iid = key_to_invoke_id(key);
if (instance == NULL)
- return -1;
+ return -EINVAL;
msg.opcode = OPCODE__REPLY;
- msg.invoke_id = invoke_id;
+ msg.invoke_id = iid;
msg.has_result = true;
msg.result = result;
diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c
new file mode 100644
index 00000000..02fa0846
--- /dev/null
+++ b/src/lib/cdap_req.c
@@ -0,0 +1,151 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * CDAP - CDAP request management
+ *
+ * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/config.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/errno.h>
+
+#include "cdap_req.h"
+
+#include <stdlib.h>
+#include <assert.h>
+
+struct cdap_req * cdap_req_create(cdap_key_t key)
+{
+ struct cdap_req * creq = malloc(sizeof(*creq));
+ pthread_condattr_t cattr;
+
+ if (creq == NULL)
+ return NULL;
+
+ creq->key = key;
+ creq->state = REQ_INIT;
+
+ creq->response = -1;
+ creq->data.data = NULL;
+ creq->data.len = 0;
+
+ pthread_condattr_init(&cattr);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ pthread_cond_init(&creq->cond, &cattr);
+ pthread_mutex_init(&creq->lock, NULL);
+
+ INIT_LIST_HEAD(&creq->next);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &creq->birth);
+
+ return creq;
+}
+
+void cdap_req_destroy(struct cdap_req * creq)
+{
+ assert(creq);
+
+ pthread_mutex_lock(&creq->lock);
+
+ if (creq->state == REQ_DESTROY) {
+ pthread_mutex_unlock(&creq->lock);
+ return;
+ }
+
+ if (creq->state == REQ_INIT)
+ creq->state = REQ_DONE;
+
+ if (creq->state == REQ_PENDING) {
+ creq->state = REQ_DESTROY;
+ pthread_cond_broadcast(&creq->cond);
+ }
+
+ while (creq->state != REQ_DONE)
+ pthread_cond_wait(&creq->cond, &creq->lock);
+
+ pthread_mutex_unlock(&creq->lock);
+
+ pthread_cond_destroy(&creq->cond);
+ pthread_mutex_destroy(&creq->lock);
+
+ free(creq);
+}
+
+int cdap_req_wait(struct cdap_req * creq)
+{
+ struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000),
+ (CDAP_REPLY_TIMEOUT % 1000) * MILLION};
+ struct timespec abstime;
+ int ret = -1;
+
+ assert(creq);
+
+ ts_add(&creq->birth, &timeout, &abstime);
+
+ pthread_mutex_lock(&creq->lock);
+
+ if (creq->state != REQ_INIT) {
+ pthread_mutex_unlock(&creq->lock);
+ return -EINVAL;
+ }
+
+ creq->state = REQ_PENDING;
+
+ while (creq->state == REQ_PENDING) {
+ if ((ret = -pthread_cond_timedwait(&creq->cond,
+ &creq->lock,
+ &abstime)) == -ETIMEDOUT)
+ break;
+ }
+
+ if (creq->state == REQ_DESTROY)
+ ret = -1;
+
+ creq->state = REQ_DONE;
+ pthread_cond_broadcast(&creq->cond);
+
+ pthread_mutex_unlock(&creq->lock);
+
+ return ret;
+}
+
+void cdap_req_respond(struct cdap_req * creq, int response, buffer_t data)
+{
+ assert(creq);
+
+ pthread_mutex_lock(&creq->lock);
+
+ if (creq->state != REQ_PENDING) {
+ pthread_mutex_unlock(&creq->lock);
+ return;
+ }
+
+ creq->state = REQ_RESPONSE;
+ creq->response = response;
+ creq->data = data;
+
+ pthread_cond_broadcast(&creq->cond);
+
+ while (creq->state == REQ_RESPONSE)
+ pthread_cond_wait(&creq->cond, &creq->lock);
+
+ pthread_mutex_unlock(&creq->lock);
+}
diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h
new file mode 100644
index 00000000..714744ab
--- /dev/null
+++ b/src/lib/cdap_req.h
@@ -0,0 +1,69 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * CDAP - CDAP request management
+ *
+ * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_CDAP_REQ_H
+#define OUROBOROS_CDAP_REQ_H
+
+#include <ouroboros/config.h>
+#include <ouroboros/cdap.h>
+#include <ouroboros/list.h>
+#include <ouroboros/utils.h>
+
+#include <pthread.h>
+
+enum creq_state {
+ REQ_INIT = 0,
+ REQ_PENDING,
+ REQ_RESPONSE,
+ REQ_DONE,
+ REQ_DESTROY
+};
+
+struct cdap_req {
+ struct list_head next;
+
+ struct timespec birth;
+
+ cdap_key_t key;
+
+ int response;
+ buffer_t data;
+
+ enum creq_state state;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+};
+
+struct cdap_req * cdap_req_create(cdap_key_t key);
+
+void cdap_req_destroy(struct cdap_req * creq);
+
+int cdap_req_wait(struct cdap_req * creq);
+
+void cdap_req_respond(struct cdap_req * creq,
+ int response,
+ buffer_t data);
+
+enum creq_state cdap_req_get_state(struct cdap_req * creq);
+
+#endif /* OUROBOROS_CDAP_REQ_H */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index bad56129..20976375 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -134,7 +134,7 @@ struct flow {
struct shm_flow_set * set;
int port_id;
int oflags;
- enum qos_cube qos;
+ qoscube_t qos;
pid_t api;
@@ -654,9 +654,8 @@ int flow_dealloc(int fd)
pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
- if (recv_msg == NULL) {
+ if (recv_msg == NULL)
return -1;
- }
if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
@@ -1435,11 +1434,10 @@ int ipcp_flow_fini(int fd)
pthread_rwlock_unlock(&ai.data_lock);
shm_rbuff_fini(rb);
-
return 0;
}
-int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube)
+int ipcp_flow_get_qoscube(int fd, qoscube_t * cube)
{
if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
return -EINVAL;