summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-03-03 15:41:11 +0100
committerdimitri staessens <[email protected]>2017-03-03 15:54:44 +0100
commita688b8a38d7eb9f42406eeb611717db737b0d257 (patch)
tree68a2bf098171299d2c30f9537c89b862edb0f996 /src
parentd753cab1897e323b59923e2b0b11f550b087351c (diff)
downloadouroboros-a688b8a38d7eb9f42406eeb611717db737b0d257.tar.gz
ouroboros-a688b8a38d7eb9f42406eeb611717db737b0d257.zip
lib: Manage multiple flows with a single CDAP instance
You can now add multiple flows to a CDAP instance. This will simplify sending messages to different peers (e.g. for syncing the RIB). A request will now return an array of keys terminated by CDAP_KEY_INVALID. Removes the enum from the CDAP proto file to just take the opcode as an integer.
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/normal/enroll.c48
-rw-r--r--src/lib/cdap.c341
-rw-r--r--src/lib/cdap.proto12
-rw-r--r--src/lib/cdap_req.c9
-rw-r--r--src/lib/cdap_req.h5
5 files changed, 279 insertions, 136 deletions
diff --git a/src/ipcpd/normal/enroll.c b/src/ipcpd/normal/enroll.c
index 680cfbba..4e510038 100644
--- a/src/ipcpd/normal/enroll.c
+++ b/src/ipcpd/normal/enroll.c
@@ -78,13 +78,20 @@ static void * enroll_handle(void * o)
continue;
}
- cdap = cdap_create(conn.flow_info.fd);
+ cdap = cdap_create();
if (cdap == NULL) {
log_err("Failed to instantiate CDAP.");
flow_dealloc(conn.flow_info.fd);
continue;
}
+ if (cdap_add_flow(cdap, conn.flow_info.fd)) {
+ log_warn("Failed to add flow to CDAP.");
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ continue;
+ }
+
while (!(boot_r && members_r && dif_name_r)) {
key = cdap_request_wait(cdap, &oc, &name, &data,
(size_t *) &len , &flags);
@@ -167,7 +174,7 @@ static void * enroll_handle(void * o)
int enroll_boot(char * dst_name)
{
struct cdap * cdap;
- cdap_key_t key;
+ cdap_key_t * key;
uint8_t * data;
size_t len;
struct conn conn;
@@ -186,31 +193,41 @@ int enroll_boot(char * dst_name)
return -1;
}
- cdap = cdap_create(conn.flow_info.fd);
+ cdap = cdap_create();
if (cdap == NULL) {
log_err("Failed to instantiate CDAP.");
return -1;
}
+ if (cdap_add_flow(cdap, conn.flow_info.fd)) {
+ log_warn("Failed to add flow to CDAP.");
+ cdap_destroy(cdap);
+ flow_dealloc(conn.flow_info.fd);
+ return -1;
+ }
+
log_dbg("Getting boot information from %s.", dst_name);
clock_gettime(CLOCK_REALTIME, &t0);
key = cdap_request_send(cdap, CDAP_READ, TIME_PATH, NULL, 0, 0);
- if (key < 0) {
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(cdap, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
+ free(key);
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
clock_gettime(CLOCK_REALTIME, &rtt);
delta_t = ts_diff_ms(&t0, &rtt);
@@ -226,20 +243,23 @@ int enroll_boot(char * dst_name)
free(data);
key = cdap_request_send(cdap, CDAP_READ, boot_ro, NULL, 0, 0);
- if (key < 0) {
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(cdap, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
+ free(key);
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
log_dbg("Packed information received (%zu bytes).", len);
if (rib_unpack(data, len, UNPACK_CREATE)) {
@@ -254,20 +274,23 @@ int enroll_boot(char * dst_name)
log_dbg("Packed information inserted into RIB.");
key = cdap_request_send(cdap, CDAP_READ, members_ro, NULL, 0, 0);
- if (key < 0) {
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(cdap, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
+ free(key);
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
log_dbg("Packed information received (%zu bytes).", len);
if (rib_unpack(data, len, UNPACK_CREATE)) {
@@ -282,20 +305,23 @@ int enroll_boot(char * dst_name)
log_dbg("Packed information inserted into RIB.");
key = cdap_request_send(cdap, CDAP_READ, dif_ro, NULL, 0, 0);
- if (key < 0) {
+ if (key == NULL) {
log_err("Failed to send CDAP request.");
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
- if (cdap_reply_wait(cdap, key, &data, &len)) {
+ if (cdap_reply_wait(cdap, key[0], &data, &len)) {
log_err("Failed to get CDAP reply.");
+ free(key);
cdap_destroy(cdap);
flow_dealloc(conn.flow_info.fd);
return -1;
}
+ free(key);
+
log_dbg("Packed information received (%zu bytes).", len);
if (rib_unpack(data, len, UNPACK_CREATE)) {
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index ba4a2a21..86554dd2 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -25,6 +25,7 @@
#include <ouroboros/cdap.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/fcntl.h>
#include <ouroboros/errno.h>
@@ -37,28 +38,39 @@
#include "cdap.pb-c.h"
typedef Cdap cdap_t;
-typedef Opcode opcode_t;
typedef int32_t invoke_id_t;
+#define CDAP_REPLY (CDAP_DELETE + 1)
+
#define INVALID_INVOKE_ID -1
#define IDS_SIZE 256
#define BUF_SIZE 2048
-struct cdap {
+struct fd_el {
+ struct list_head next;
+
int fd;
+};
+
+struct cdap {
+ flow_set_t * set;
+
+ size_t n_flows;
+ struct list_head flows;
+ pthread_rwlock_t flows_lock;
struct bmp * ids;
pthread_mutex_t ids_lock;
- pthread_t reader;
-
struct list_head sent;
pthread_rwlock_t sent_lock;
struct list_head rcvd;
pthread_cond_t rcvd_cond;
pthread_mutex_t rcvd_lock;
+
+ pthread_t reader;
};
struct cdap_rcvd {
@@ -133,6 +145,7 @@ static struct cdap_req * cdap_sent_get_by_key(struct cdap * instance,
}
static struct cdap_req * cdap_sent_add(struct cdap * instance,
+ int fd,
cdap_key_t key)
{
struct cdap_req * req;
@@ -141,7 +154,7 @@ static struct cdap_req * cdap_sent_add(struct cdap * instance,
assert(key >= 0);
assert(!cdap_sent_has_key(instance, key));
- req = cdap_req_create(key);
+ req = cdap_req_create(fd, key);
if (req == NULL)
return NULL;
@@ -220,9 +233,14 @@ static void * sdu_reader(void * o)
uint8_t buf[BUF_SIZE];
ssize_t len;
buffer_t data;
+ fqueue_t * fq;
- while (true) {
- len = flow_read(instance->fd, buf, BUF_SIZE);
+ fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) -1;
+
+ while (flow_event_wait(instance->set, fq, NULL)) {
+ len = flow_read(fqueue_next(fq), buf, BUF_SIZE);
if (len < 0)
continue;
@@ -230,41 +248,17 @@ static void * sdu_reader(void * o)
if (msg == NULL)
continue;
- if (msg->opcode != OPCODE__REPLY) {
+ if (msg->opcode != CDAP_REPLY) {
rcvd = malloc(sizeof(*rcvd));
if (rcvd == NULL) {
cdap__free_unpacked(msg, NULL);
continue;
}
- switch (msg->opcode) {
- case OPCODE__START:
- rcvd->opcode = CDAP_START;
- break;
- case OPCODE__STOP:
- rcvd->opcode = CDAP_STOP;
- break;
- case OPCODE__READ:
- rcvd->opcode = CDAP_READ;
- break;
- case OPCODE__WRITE:
- rcvd->opcode = CDAP_WRITE;
- break;
- case OPCODE__CREATE:
- rcvd->opcode = CDAP_CREATE;
- break;
- case OPCODE__DELETE:
- rcvd->opcode = CDAP_DELETE;
- break;
- default:
- cdap__free_unpacked(msg, NULL);
- free(rcvd);
- continue;
- }
-
- rcvd->iid = msg->invoke_id;
- rcvd->flags = msg->flags;
- rcvd->name = strdup(msg->name);
+ rcvd->opcode = msg->opcode;
+ rcvd->iid = msg->invoke_id;
+ rcvd->flags = msg->flags;
+ rcvd->name = strdup(msg->name);
if (rcvd->name == NULL) {
cdap__free_unpacked(msg, NULL);
free(rcvd);
@@ -303,7 +297,7 @@ static void * sdu_reader(void * o)
cdap__free_unpacked(msg, NULL);
continue;
}
- memcpy(data.data, msg->value.data, data.len);
+ memcpy(data.data, msg->value.data Iata.len);
} else {
data.len = 0;
data.data = NULL;
@@ -311,36 +305,32 @@ static void * sdu_reader(void * o)
cdap_req_respond(req, msg->result, data);
}
-
- cdap__free_unpacked(msg, NULL);
}
-
return (void *) 0;
}
-struct cdap * cdap_create(int fd)
+struct cdap * cdap_create()
{
struct cdap * instance = NULL;
- int flags;
-
- if (fd < 0)
- return NULL;
-
- flags = flow_get_flags(fd);
- if (flags & FLOW_O_NONBLOCK)
- return NULL;
instance = malloc(sizeof(*instance));
if (instance == NULL)
return NULL;
+ if (pthread_rwlock_init(&instance->flows_lock, NULL)) {
+ free(instance);
+ return NULL;
+ }
+
if (pthread_mutex_init(&instance->ids_lock, NULL)) {
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
if (pthread_mutex_init(&instance->rcvd_lock, NULL)) {
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
@@ -348,6 +338,7 @@ struct cdap * cdap_create(int fd)
if (pthread_rwlock_init(&instance->sent_lock, NULL)) {
pthread_mutex_destroy(&instance->rcvd_lock);
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
@@ -356,6 +347,7 @@ struct cdap * cdap_create(int fd)
pthread_rwlock_destroy(&instance->sent_lock);
pthread_mutex_destroy(&instance->rcvd_lock);
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
@@ -366,15 +358,29 @@ struct cdap * cdap_create(int fd)
pthread_rwlock_destroy(&instance->sent_lock);
pthread_mutex_destroy(&instance->rcvd_lock);
pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
free(instance);
return NULL;
}
+ instance->set = flow_set_create();
+ if (instance->set == NULL) {
+ bmp_destroy(instance->ids);
+ pthread_cond_destroy(&instance->rcvd_cond);
+ pthread_rwlock_destroy(&instance->sent_lock);
+ pthread_mutex_destroy(&instance->rcvd_lock);
+ pthread_mutex_destroy(&instance->ids_lock);
+ pthread_rwlock_destroy(&instance->flows_lock);
+ free(instance);
+ return NULL;
+ }
+
+ instance->n_flows = 0;
+
+ list_head_init(&instance->flows);
list_head_init(&instance->sent);
list_head_init(&instance->rcvd);
- instance->fd = fd;
-
pthread_create(&instance->reader, NULL, sdu_reader, instance);
return instance;
@@ -382,12 +388,29 @@ struct cdap * cdap_create(int fd)
int cdap_destroy(struct cdap * instance)
{
+ struct list_head * p;
+ struct list_head * h;
+
if (instance == NULL)
return 0;
pthread_cancel(instance->reader);
pthread_join(instance->reader, NULL);
+ flow_set_destroy(instance->set);
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ list_for_each_safe(p,h, &instance->flows) {
+ struct fd_el * e = list_entry(p, struct fd_el, next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ pthread_rwlock_destroy(&instance->flows_lock);
+
pthread_mutex_lock(&instance->ids_lock);
bmp_destroy(instance->ids);
@@ -409,14 +432,71 @@ int cdap_destroy(struct cdap * instance)
return 0;
}
-static int write_msg(struct cdap * instance,
+int cdap_add_flow(struct cdap * instance,
+ int fd)
+{
+ struct fd_el * e;
+
+ if (fd < 0)
+ return -EINVAL;
+
+ e = malloc(sizeof(*e));
+ if (e == NULL)
+ return -ENOMEM;
+
+ e->fd = fd;
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ if (flow_set_add(instance->set, fd)) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ return -1;
+ }
+
+ list_add(&e->next, &instance->flows);
+
+ ++instance->n_flows;
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ return 0;
+}
+
+int cdap_del_flow(struct cdap * instance,
+ int fd)
+{
+ struct list_head * p;
+ struct list_head * h;
+
+ if (fd < 0)
+ return -EINVAL;
+
+ pthread_rwlock_wrlock(&instance->flows_lock);
+
+ flow_set_del(instance->set, fd);
+
+ list_for_each_safe(p, h, &instance->flows) {
+ struct fd_el * e = list_entry(p, struct fd_el, next);
+ if (e->fd == fd) {
+ list_del(&e->next);
+ free(e);
+ break;
+ }
+ }
+
+ --instance->n_flows;
+
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ return 0;
+}
+
+static int write_msg(int fd,
cdap_t * msg)
{
- int ret;
uint8_t * data;
size_t len;
- assert(instance);
assert(msg);
len = cdap__get_packed_size(msg);
@@ -429,11 +509,14 @@ static int write_msg(struct cdap * instance,
cdap__pack(msg, data);
- ret = flow_write(instance->fd, data, len);
+ if (flow_write(fd, data, len)) {
+ free(data);
+ return -1;
+ }
free(data);
- return ret;
+ return 0;
}
static cdap_key_t invoke_id_to_key(invoke_id_t iid)
@@ -452,75 +535,114 @@ static invoke_id_t key_to_invoke_id(cdap_key_t key)
return (invoke_id_t) key;
}
-cdap_key_t cdap_request_send(struct cdap * instance,
- enum cdap_opcode code,
- const char * name,
- const void * data,
- size_t len,
- uint32_t flags)
+cdap_key_t * cdap_request_send(struct cdap * instance,
+ enum cdap_opcode code,
+ const char * name,
+ const void * data,
+ size_t len,
+ uint32_t flags)
{
- cdap_t msg = CDAP__INIT;
- struct cdap_req * req;
- invoke_id_t iid;
- cdap_key_t key;
+ cdap_key_t * keys;
+ cdap_key_t * key;
+ cdap_t msg = CDAP__INIT;
+ struct list_head * p;
+ int ret;
- if (instance == NULL || name == NULL)
- return -EINVAL;
+ if (instance == NULL || name == NULL || code > CDAP_DELETE)
+ return NULL;
+ pthread_rwlock_rdlock(&instance->flows_lock);
- iid = next_invoke_id(instance);
- if (iid == INVALID_INVOKE_ID)
- return INVALID_CDAP_KEY;
+ keys = malloc(sizeof(*keys) * (instance->n_flows + 1));
+ if (keys == NULL)
+ return NULL;
- switch (code) {
- case CDAP_READ:
- msg.opcode = OPCODE__READ;
- break;
- case CDAP_WRITE:
- msg.opcode = OPCODE__WRITE;
- break;
- case CDAP_CREATE:
- msg.opcode = OPCODE__CREATE;
- break;
- case CDAP_DELETE:
- msg.opcode = OPCODE__DELETE;
- break;
- case CDAP_START:
- msg.opcode = OPCODE__START;
- break;
- case CDAP_STOP:
- msg.opcode = OPCODE__STOP;
- break;
- default:
- release_invoke_id(instance, iid);
- return -EINVAL;
- }
+ memset(keys, INVALID_CDAP_KEY, sizeof(*keys) * (instance->n_flows + 1));
+ key = keys;
+
+ msg.opcode = code;
msg.name = (char *) name;
msg.has_flags = true;
msg.flags = flags;
- msg.invoke_id = iid;
+
if (data != NULL) {
msg.has_value = true;
msg.value.data = (uint8_t *) data;
msg.value.len = len;
}
- key = invoke_id_to_key(iid);
+ list_for_each(p, &instance->flows) {
+ struct cdap_req * req;
+ invoke_id_t iid;
+ struct fd_el * e;
+ cdap__init(&msg);
+
+ iid = next_invoke_id(instance);
+ if (iid == INVALID_INVOKE_ID) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ while(key > keys) {
+ struct cdap_req * r =
+ cdap_sent_get_by_key(instance,
+ *(--key));
+ cdap_sent_del(instance, r);
+ cdap_req_destroy(r);
+ }
- req = cdap_sent_add(instance, key);
- if (req == NULL) {
- release_invoke_id(instance, iid);
- return INVALID_CDAP_KEY;
- }
+ free(keys);
+ return NULL;
+ }
- if (write_msg(instance, &msg)) {
- cdap_sent_del(instance, req);
- release_invoke_id(instance, iid);
- return INVALID_CDAP_KEY;
+ msg.invoke_id = iid;
+
+ *key = invoke_id_to_key(iid);
+
+ e = list_entry(p, struct fd_el, next);
+
+ req = cdap_sent_add(instance, e->fd, *key);
+ if (req == NULL) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ while(key > keys) {
+ struct cdap_req * r =
+ cdap_sent_get_by_key(instance,
+ *(--key));
+ release_invoke_id(instance, iid);
+ cdap_sent_del(instance, r);
+ release_invoke_id(instance,
+ key_to_invoke_id(r->key));
+ cdap_req_destroy(r);
+ }
+ free(keys);
+ return NULL;
+ }
+
+ ret = write_msg(e->fd, &msg);
+ if (ret == -ENOMEM) {
+ pthread_rwlock_unlock(&instance->flows_lock);
+ while(key >= keys) {
+ struct cdap_req * r =
+ cdap_sent_get_by_key(instance, *key);
+ cdap_sent_del(instance, r);
+ release_invoke_id(instance,
+ key_to_invoke_id(r->key));
+ cdap_req_destroy(r);
+ }
+
+ free(keys);
+ return NULL;
+ }
+
+ if (ret < 0) {
+ release_invoke_id(instance, iid);
+ cdap_sent_del(instance, req);
+ }
+
+ ++key;
}
- return key;
+ pthread_rwlock_unlock(&instance->flows_lock);
+
+ return keys;
}
int cdap_reply_wait(struct cdap * instance,
@@ -609,11 +731,14 @@ int cdap_reply_send(struct cdap * instance,
{
cdap_t msg = CDAP__INIT;
invoke_id_t iid = key_to_invoke_id(key);
+ struct cdap_req * req = cdap_sent_get_by_key(instance, key);
+ if (req == NULL)
+ return -EINVAL;
if (instance == NULL)
return -EINVAL;
- msg.opcode = OPCODE__REPLY;
+ msg.opcode = CDAP_REPLY;
msg.invoke_id = iid;
msg.has_result = true;
msg.result = result;
@@ -624,5 +749,5 @@ int cdap_reply_send(struct cdap * instance,
msg.value.len = len;
}
- return write_msg(instance, &msg);
+ return write_msg(req->fd, &msg);
}
diff --git a/src/lib/cdap.proto b/src/lib/cdap.proto
index 5fde1658..120b2c97 100644
--- a/src/lib/cdap.proto
+++ b/src/lib/cdap.proto
@@ -23,18 +23,8 @@
syntax = "proto2";
-enum opcode {
- CREATE = 1;
- DELETE = 2;
- READ = 3;
- WRITE = 4;
- START = 5;
- STOP = 6;
- REPLY = 7;
-}
-
message cdap {
- required opcode opcode = 1;
+ required uint32 opcode = 1;
required uint32 invoke_id = 2;
optional uint32 flags = 3;
optional string name = 4;
diff --git a/src/lib/cdap_req.c b/src/lib/cdap_req.c
index a0348a14..b60e73ad 100644
--- a/src/lib/cdap_req.c
+++ b/src/lib/cdap_req.c
@@ -30,7 +30,8 @@
#include <stdlib.h>
#include <assert.h>
-struct cdap_req * cdap_req_create(cdap_key_t key)
+struct cdap_req * cdap_req_create(int fd,
+ cdap_key_t key)
{
struct cdap_req * creq = malloc(sizeof(*creq));
pthread_condattr_t cattr;
@@ -38,10 +39,10 @@ struct cdap_req * cdap_req_create(cdap_key_t key)
if (creq == NULL)
return NULL;
- creq->key = key;
+ creq->fd = fd;
+ creq->key = key;
creq->state = REQ_INIT;
-
- creq->response = -1;
+ creq->response = -1;
creq->data.data = NULL;
creq->data.len = 0;
diff --git a/src/lib/cdap_req.h b/src/lib/cdap_req.h
index 9023357d..fe8e3613 100644
--- a/src/lib/cdap_req.h
+++ b/src/lib/cdap_req.h
@@ -43,8 +43,8 @@ enum creq_state {
struct cdap_req {
struct list_head next;
+ int fd;
struct timespec birth;
-
cdap_key_t key;
int response;
@@ -55,7 +55,8 @@ struct cdap_req {
pthread_mutex_t lock;
};
-struct cdap_req * cdap_req_create(cdap_key_t key);
+struct cdap_req * cdap_req_create(int fd,
+ cdap_key_t key);
void cdap_req_destroy(struct cdap_req * creq);