summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-10-19 22:25:46 +0200
committerdimitri staessens <[email protected]>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/lib/dev.c
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c500
1 files changed, 329 insertions, 171 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 77c2d06a..f735e72b 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -3,7 +3,8 @@
*
* API for applications
*
- * Sander Vrijders <[email protected]>
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -26,20 +27,24 @@
#include <ouroboros/sockets.h>
#include <ouroboros/fcntl.h>
#include <ouroboros/bitmap.h>
+#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/shm_rbuff.h>
#include <ouroboros/utils.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
struct flow_set {
- bool dirty;
- bool b[IRMD_MAX_FLOWS]; /* working copy */
- bool s[IRMD_MAX_FLOWS]; /* safe copy */
- pthread_rwlock_t lock;
+ size_t idx;
+};
+
+struct fqueue {
+ int fqueue[SHM_BUFFER_SIZE]; /* safe copy from shm */
+ size_t fqsize;
+ size_t next;
};
enum port_state {
@@ -124,7 +129,9 @@ enum port_state port_wait_assign(struct port * p)
}
struct flow {
- struct shm_ap_rbuff * rb;
+ struct shm_rbuff * rx_rb;
+ struct shm_rbuff * tx_rb;
+ struct shm_flow_set * set;
int port_id;
int oflags;
@@ -139,10 +146,11 @@ struct {
pid_t api;
struct shm_rdrbuff * rdrb;
- struct shm_ap_rbuff * rb;
+ struct shm_flow_set * fqset;
pthread_rwlock_t data_lock;
struct bmp * fds;
+ struct bmp * fqueues;
struct flow * flows;
struct port * ports;
@@ -194,40 +202,52 @@ int ap_init(char * ap_name)
if (ai.fds == NULL)
return -ENOMEM;
- ai.rdrb = shm_rdrbuff_open();
- if (ai.rdrb == NULL) {
+ ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0);
+ if (ai.fqueues == NULL) {
+ bmp_destroy(ai.fds);
+ return -ENOMEM;
+ }
+
+ ai.fqset = shm_flow_set_create();
+ if (ai.fqset == NULL) {
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
- ai.rb = shm_ap_rbuff_create();
- if (ai.rb == NULL) {
- shm_rdrbuff_close(ai.rdrb);
+ ai.rdrb = shm_rdrbuff_open();
+ if (ai.rdrb == NULL) {
+ shm_flow_set_destroy(ai.fqset);
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
if (ai.flows == NULL) {
- shm_ap_rbuff_destroy(ai.rb);
shm_rdrbuff_close(ai.rdrb);
+ shm_flow_set_destroy(ai.fqset);
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- ai.flows[i].rb = NULL;
+ ai.flows[i].rx_rb = NULL;
+ ai.flows[i].tx_rb = NULL;
+ ai.flows[i].set = NULL;
ai.flows[i].port_id = -1;
- ai.flows[i].oflags = 0;
- ai.flows[i].api = -1;
+ ai.flows[i].oflags = 0;
+ ai.flows[i].api = -1;
ai.flows[i].timeout = NULL;
}
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
- if (ai.flows == NULL) {
+ if (ai.ports == NULL) {
free(ai.flows);
- shm_ap_rbuff_destroy(ai.rb);
shm_rdrbuff_close(ai.rdrb);
+ shm_flow_set_destroy(ai.fqset);
+ bmp_destroy(ai.fqueues);
bmp_destroy(ai.fds);
return -1;
}
@@ -253,16 +273,10 @@ void ap_fini()
pthread_rwlock_wrlock(&ai.data_lock);
- /* remove all remaining sdus */
- while ((i = shm_ap_rbuff_pop_idx(ai.rb)) >= 0)
- shm_rdrbuff_remove(ai.rdrb, i);
-
- if (ai.fds != NULL)
- bmp_destroy(ai.fds);
- if (ai.rb != NULL)
- shm_ap_rbuff_destroy(ai.rb);
- if (ai.rdrb != NULL)
- shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
+ bmp_destroy(ai.fqueues);
+ shm_flow_set_destroy(ai.fqset);
+ shm_rdrbuff_close(ai.rdrb);
if (ai.daf_name != NULL)
free(ai.daf_name);
@@ -270,8 +284,15 @@ void ap_fini()
pthread_rwlock_rdlock(&ai.flows_lock);
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (ai.flows[i].rb != NULL)
- shm_ap_rbuff_close(ai.flows[i].rb);
+ if (ai.flows[i].tx_rb != NULL) {
+ int idx;
+ while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ shm_rbuff_destroy(ai.flows[i].rx_rb);
+ shm_rbuff_close(ai.flows[i].tx_rb);
+ shm_flow_set_close(ai.flows[i].set);
+ }
+
if (ai.flows[i].timeout != NULL)
free(ai.flows[i].timeout);
}
@@ -328,8 +349,8 @@ int flow_accept(char ** ae_name)
return -1;
}
- ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (ai.flows[fd].rb == NULL) {
+ ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -337,10 +358,24 @@ int flow_accept(char ** ae_name)
return -1;
}
+ ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
+ if (ai.flows[fd].set == NULL) {
+ bmp_release(ai.fds, fd);
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- shm_ap_rbuff_close(ai.flows[fd].rb);
+ shm_rbuff_destroy(ai.flows[fd].tx_rb);
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ shm_flow_set_close(ai.flows[fd].set);
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -356,8 +391,6 @@ int flow_accept(char ** ae_name)
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
- shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id);
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -410,6 +443,17 @@ int flow_alloc_resp(int fd, int response)
ret = recv_msg->result;
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api,
+ ai.flows[fd].port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -461,8 +505,11 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
return -1;
}
- ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
- if (ai.flows[fd].rb == NULL) {
+ ai.flows[fd].port_id = recv_msg->port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = recv_msg->api;
+ ai.flows[fd].rx_rb = shm_rbuff_create(recv_msg->port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -470,9 +517,26 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
return -1;
}
- ai.flows[fd].port_id = recv_msg->port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
- ai.flows[fd].api = recv_msg->api;
+ ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, recv_msg->port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
+ if (ai.flows[fd].set == NULL) {
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
@@ -480,8 +544,6 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id);
-
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
@@ -548,7 +610,7 @@ int flow_dealloc(int fd)
return -ENOTALLOC;
}
- if (shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id) == -EBUSY) {
+ if (shm_rbuff_block(ai.flows[fd].rx_rb) == -EBUSY) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -EBUSY;
@@ -559,8 +621,10 @@ int flow_dealloc(int fd)
port_destroy(&ai.ports[msg.port_id]);
ai.flows[fd].port_id = -1;
- shm_ap_rbuff_close(ai.flows[fd].rb);
- ai.flows[fd].rb = NULL;
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ ai.flows[fd].rx_rb = NULL;
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ ai.flows[fd].tx_rb = NULL;
ai.flows[fd].oflags = 0;
ai.flows[fd].api = -1;
if (ai.flows[fd].timeout != NULL) {
@@ -604,9 +668,9 @@ int flow_cntl(int fd, int cmd, int oflags)
case FLOW_F_SETFL: /* SET FLOW FLAGS */
ai.flows[fd].oflags = oflags;
if (oflags & FLOW_O_WRONLY)
- shm_ap_rbuff_close_port(ai.rb, ai.flows[fd].port_id);
+ shm_rbuff_block(ai.flows[fd].rx_rb);
if (oflags & FLOW_O_RDWR)
- shm_ap_rbuff_open_port(ai.rb, ai.flows[fd].port_id);
+ shm_rbuff_unblock(ai.flows[fd].rx_rb);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return old;
@@ -620,7 +684,6 @@ int flow_cntl(int fd, int cmd, int oflags)
ssize_t flow_write(int fd, void * buf, size_t count)
{
ssize_t idx;
- struct rb_entry e;
if (buf == NULL)
return 0;
@@ -653,13 +716,10 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (idx < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return -idx;
+ return idx;
}
- e.index = idx;
- e.port_id = ai.flows[fd].port_id;
-
- if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
+ if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -667,7 +727,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
- pid_t api = ai.flows[fd].api;
+ pid_t api = ai.flows[fd].api;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -681,17 +741,16 @@ ssize_t flow_write(int fd, void * buf, size_t count)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- e.index = idx;
- e.port_id = ai.flows[fd].port_id;
-
- if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
- shm_rdrbuff_remove(ai.rdrb, e.index);
+ if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
}
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -717,15 +776,14 @@ ssize_t flow_read(int fd, void * buf, size_t count)
}
if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id);
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
pthread_rwlock_unlock(&ai.flows_lock);
} else {
- struct shm_ap_rbuff * rb = ai.rb;
- int port_id = ai.flows[fd].port_id;
- struct timespec * timeout = ai.flows[fd].timeout;
+ struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+ struct timespec * timeout = ai.flows[fd].timeout;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
+ idx = shm_rbuff_read_b(rb, timeout);
pthread_rwlock_rdlock(&ai.data_lock);
}
@@ -757,79 +815,163 @@ struct flow_set * flow_set_create()
if (set == NULL)
return NULL;
- if (pthread_rwlock_init(&set->lock, NULL)) {
+ assert(ai.fqueues);
+
+ set->idx = bmp_allocate(ai.fqueues);
+ if (!bmp_is_id_valid(ai.fqueues, set->idx)) {
free(set);
return NULL;
}
- memset(set->b, 0, IRMD_MAX_FLOWS);
- memset(set->s, 0, IRMD_MAX_FLOWS);
+ return set;
+}
- set->dirty = true;
+void flow_set_destroy(struct flow_set * set)
+{
+ if (set == NULL)
+ return;
- return set;
+ flow_set_zero(set);
+ bmp_release(ai.fqueues, set->idx);
+ free(set);
}
-void flow_set_zero(struct flow_set * set)
+struct fqueue * fqueue_create()
{
- pthread_rwlock_wrlock(&set->lock);
- memset(set->b, 0, IRMD_MAX_FLOWS);
- set->dirty = true;
- pthread_rwlock_unlock(&set->lock);
+ struct fqueue * fq = malloc(sizeof(*fq));
+ if (fq == NULL)
+ return NULL;
+
+ memset(fq->fqueue, -1, SHM_BUFFER_SIZE);
+ fq->fqsize = 0;
+ fq->next = 0;
+
+ return fq;
}
-void flow_set_add(struct flow_set * set, int fd)
+void fqueue_destroy(struct fqueue * fq)
{
- pthread_rwlock_wrlock(&set->lock);
- set->b[ai.flows[fd].port_id] = true;
- set->dirty = true;
- pthread_rwlock_unlock(&set->lock);
+ if (fq == NULL)
+ return
+ free(fq);
}
-void flow_set_del(struct flow_set * set, int fd)
+void flow_set_zero(struct flow_set * set)
{
- pthread_rwlock_wrlock(&set->lock);
- set->b[ai.flows[fd].port_id] = false;
- set->dirty = true;
- pthread_rwlock_unlock(&set->lock);
+ if (set == NULL)
+ return;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+
+ shm_flow_set_zero(ai.fqset, set->idx);
+
+ pthread_rwlock_unlock(&ai.data_lock);
}
-bool flow_set_has(struct flow_set * set, int fd)
+int flow_set_add(struct flow_set * set, int fd)
{
- bool ret;
- pthread_rwlock_rdlock(&set->lock);
- ret = set->b[ai.flows[fd].port_id];
- pthread_rwlock_unlock(&set->lock);
+ int ret;
+
+ if (set == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
return ret;
}
-void flow_set_destroy(struct flow_set * set)
+void flow_set_del(struct flow_set * set, int fd)
{
- pthread_rwlock_destroy(&set->lock);
- free(set);
+ if (set == NULL)
+ return;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
}
-static void flow_set_cpy(struct flow_set * set)
+bool flow_set_has(struct flow_set * set, int fd)
{
- pthread_rwlock_rdlock(&set->lock);
- if (set->dirty)
- memcpy(set->s, set->b, IRMD_MAX_FLOWS);
- set->dirty = false;
- pthread_rwlock_unlock(&set->lock);
+ bool ret = false;
+
+ if (set == NULL || fd < 0)
+ return false;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return false;
+ }
+
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
}
-int flow_select(struct flow_set * set, const struct timespec * timeout)
+int fqueue_next(struct fqueue * fq)
{
- int port_id;
- if (set == NULL) {
- port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout);
- } else {
- flow_set_cpy(set);
- port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout);
+ int fd;
+
+ if (fq == NULL)
+ return -EINVAL;
+
+ if (fq->next == fq->fqsize) {
+ fq->fqsize = 0;
+ fq->next = 0;
+ return -EPERM;
}
- if (port_id < 0)
- return port_id;
- return ai.ports[port_id].fd;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ fd = ai.ports[fq->fqueue[fq->next++]].fd;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int flow_event_wait(struct flow_set * set,
+ struct fqueue * fq,
+ const struct timespec * timeout)
+{
+ int ret;
+
+ if (set == NULL)
+ return -EINVAL;
+
+ if (fq->fqsize > 0)
+ return 0;
+
+ ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
+ if (ret == -ETIMEDOUT)
+ return -ETIMEDOUT;
+
+ if (ret < 0)
+ return ret;
+
+ fq->fqsize = ret;
+ fq->next = 0;
+
+ return 0;
}
/* ipcp-dev functions */
@@ -848,8 +990,8 @@ int np1_flow_alloc(pid_t n_api, int port_id)
return -1;
}
- ai.flows[fd].rb = shm_ap_rbuff_open(n_api);
- if (ai.flows[fd].rb == NULL) {
+ ai.flows[fd].rx_rb = shm_rbuff_create(port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -863,8 +1005,6 @@ int np1_flow_alloc(pid_t n_api, int port_id)
ai.ports[port_id].fd = fd;
port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
- shm_ap_rbuff_open_port(ai.rb, port_id);
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -890,7 +1030,6 @@ int np1_flow_dealloc(int port_id)
int np1_flow_resp(pid_t n_api, int port_id)
{
int fd;
- struct shm_ap_rbuff * rb;
port_wait_assign(&ai.ports[port_id]);
@@ -904,18 +1043,26 @@ int np1_flow_resp(pid_t n_api, int port_id)
return fd;
}
- rb = shm_ap_rbuff_open(n_api);
- if (rb == NULL) {
+ ai.flows[fd].tx_rb = shm_rbuff_open(n_api, port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
ai.flows[fd].port_id = -1;
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
port_destroy(&ai.ports[port_id]);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
- ai.flows[fd].rb = rb;
-
- shm_ap_rbuff_open_port(ai.rb, port_id);
+ ai.flows[fd].set = shm_flow_set_open(n_api);
+ if (ai.flows[fd].set == NULL) {
+ shm_rbuff_close(ai.flows[fd].tx_rb);
+ ai.flows[fd].port_id = -1;
+ shm_rbuff_destroy(ai.flows[fd].rx_rb);
+ port_destroy(&ai.ports[port_id]);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -929,9 +1076,9 @@ int ipcp_create_r(pid_t api)
irm_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.has_api = true;
- msg.api = api;
+ msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
+ msg.has_api = true;
+ msg.api = api;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
@@ -958,11 +1105,11 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
if (dst_name == NULL || src_ae_name == NULL)
return -EINVAL;
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_api = true;
- msg.api = api;
- msg.dst_name = dst_name;
- msg.ae_name = src_ae_name;
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_api = true;
+ msg.api = api;
+ msg.dst_name = dst_name;
+ msg.ae_name = src_ae_name;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_wrlock(&ai.flows_lock);
@@ -974,7 +1121,7 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
return -1; /* -ENOMOREFDS */
}
- ai.flows[fd].rb = NULL;
+ ai.flows[fd].tx_rb = NULL;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -996,8 +1143,16 @@ int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_wrlock(&ai.flows_lock);
+ ai.flows[fd].rx_rb = shm_rbuff_create(port_id);
+ if (ai.flows[fd].rx_rb == NULL) {
+ ai.flows[fd].port_id = -1;
+ port_destroy(&ai.ports[port_id]);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
ai.flows[fd].port_id = port_id;
- ai.flows[fd].rb = NULL;
ai.ports[port_id].fd = fd;
port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED);
@@ -1019,16 +1174,13 @@ int ipcp_flow_alloc_reply(int fd, int response)
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- msg.port_id = ai.flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
msg.has_response = true;
msg.response = response;
- if (response)
- shm_ap_rbuff_open_port(ai.rb, msg.port_id);
-
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -1039,6 +1191,26 @@ int ipcp_flow_alloc_reply(int fd, int response)
}
ret = recv_msg->result;
+
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ ai.flows[fd].tx_rb = shm_rbuff_open(ai.flows[fd].api,
+ ai.flows[fd].port_id);
+ if (ai.flows[fd].tx_rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].set = shm_flow_set_open(ai.flows[fd].api);
+ if (ai.flows[fd].set == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+
irm_msg__free_unpacked(recv_msg, NULL);
return ret;
@@ -1061,7 +1233,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_ap_rbuff_read_port(ai.rb, port_id);
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
if (idx < 0) {
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1081,7 +1253,7 @@ int ipcp_flow_read(int fd, struct shm_du_buff ** sdb)
int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
{
- struct rb_entry e;
+ ssize_t idx;
if (sdb == NULL)
return -EINVAL;
@@ -1095,16 +1267,16 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return -EPERM;
}
- if (ai.flows[fd].rb == NULL) {
+ if (ai.flows[fd].tx_rb == NULL) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -EPERM;
}
- e.index = shm_du_buff_get_idx(sdb);
- e.port_id = ai.flows[fd].port_id;
+ idx = shm_du_buff_get_idx(sdb);
- shm_ap_rbuff_write(ai.flows[fd].rb, &e);
+ shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1112,46 +1284,28 @@ int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
return 0;
}
-struct rb_entry * local_flow_read(int fd)
+ssize_t local_flow_read(int fd)
{
- int port_id;
- struct rb_entry * e = NULL;
-
- pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_rdlock(&ai.flows_lock);
-
- port_id = ai.flows[fd].port_id;
-
- pthread_rwlock_unlock(&ai.flows_lock);
- pthread_rwlock_unlock(&ai.data_lock);
-
- if (port_id != -1) {
- e = malloc(sizeof(*e));
- if (e == NULL)
- return NULL;
- e->index = shm_ap_rbuff_read_port(ai.rb, port_id);
- }
-
- return e;
+ return shm_rbuff_read(ai.flows[fd].rx_rb);
}
-int local_flow_write(int fd, struct rb_entry * e)
+int local_flow_write(int fd, ssize_t idx)
{
- if (e == NULL || fd < 0)
+ if (fd < 0)
return -EINVAL;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai.flows[fd].rb == NULL) {
+ if (ai.flows[fd].tx_rb == NULL) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
return -EPERM;
}
- e->port_id = ai.flows[fd].port_id;
+ shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- shm_ap_rbuff_write(ai.flows[fd].rb, e);
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1159,22 +1313,26 @@ int local_flow_write(int fd, struct rb_entry * e)
return 0;
}
-int ipcp_read_shim(struct shm_du_buff ** sdb)
+int ipcp_read_shim(int fd, struct shm_du_buff ** sdb)
{
- int fd;
- struct rb_entry * e = shm_ap_rbuff_read(ai.rb);
+ ssize_t idx;
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- fd = ai.ports[e->port_id].fd;
+ if (ai.flows[fd].rx_rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
- *sdb = shm_rdrbuff_get(ai.rdrb, e->index);
+ idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return fd;
+ return 0;
}
void ipcp_flow_del(struct shm_du_buff * sdb)