summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-05-20 17:22:04 +0200
committerdimitri staessens <[email protected]>2016-05-20 17:47:45 +0200
commitd32d7eae209f3ee09a690cc9adb6ea277e0d17aa (patch)
tree301199d037c9c74bd683a2937fb49fc12e692cf6
parent303034090a9e8da6b096c1e61553dacaf359f187 (diff)
downloadouroboros-d32d7eae209f3ee09a690cc9adb6ea277e0d17aa.tar.gz
ouroboros-d32d7eae209f3ee09a690cc9adb6ea277e0d17aa.zip
lib: allow parallel connections
dev.c: read now only reads an SDU if is is for the correct port_id shm_ap_rbuff: added a function peek() that returns the port_id of the tail.
-rw-r--r--include/ouroboros/shm_ap_rbuff.h1
-rw-r--r--src/lib/dev.c17
-rw-r--r--src/lib/shm_ap_rbuff.c24
3 files changed, 31 insertions, 11 deletions
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 956a9540..0ececf88 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -47,6 +47,7 @@ void shm_ap_rbuff_close(struct shm_ap_rbuff * rb);
void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
struct rb_entry * e);
+int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb);
struct rb_entry * shm_ap_rbuff_read();
#endif /* OUROBOROS_SHM_AP_RBUFF_H */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index c365a17b..fab37bbf 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -588,18 +588,21 @@ ssize_t flow_read(int fd, void * buf, size_t count)
rw_lock_rdlock(&_ap_instance->flows_lock);
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
- e = shm_ap_rbuff_read(_ap_instance->rb);
- } else {
-
- /* FIXME: this will throw away packets for other fd's */
- while (e == NULL ||
- e->port_id != _ap_instance->flows[fd].port_id) {
- e = shm_ap_rbuff_read(_ap_instance->rb);
+ if (shm_ap_rbuff_peek(_ap_instance->rb)
+ != _ap_instance->flows[fd].port_id) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ap_instance->data_lock);
+ return -1;
}
+ } else { /* block */
+ while (shm_ap_rbuff_peek(_ap_instance->rb)
+ != _ap_instance->flows[fd].port_id)
+ ;
}
rw_lock_unlock(&_ap_instance->flows_lock);
+ e = shm_ap_rbuff_read(_ap_instance->rb);
if (e == NULL) {
rw_lock_unlock(&_ap_instance->data_lock);
return -1;
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 71a7e733..18fedc88 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -219,8 +219,6 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
{
- struct rb_entry * pos;
-
if (rb == NULL || e == NULL)
return -1;
@@ -231,8 +229,7 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
return -1;
}
- pos = rb->shm_base + *rb->ptr_head;
- *pos = *e;
+ *(rb->shm_base + *rb->ptr_head) = *e;
*rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
pthread_mutex_unlock(rb->shm_mutex);
@@ -240,6 +237,25 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
return 0;
}
+
+int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb)
+{
+ int port_id = -1;
+
+ pthread_mutex_lock(rb->shm_mutex);
+
+ if (shm_rbuff_used(rb) == 0) {
+ pthread_mutex_unlock(rb->shm_mutex);
+ return -7; /* -EAGAIN */
+ }
+
+ port_id = (rb->shm_base + *rb->ptr_tail)->port_id;
+
+ pthread_mutex_unlock(rb->shm_mutex);
+
+ return port_id;
+}
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;