summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2017-09-19 14:53:11 +0000
committerdimitri staessens <[email protected]>2017-09-19 14:53:11 +0000
commit115431af51795dfd583e24a051a7749c58a900b3 (patch)
treea5817c5bd030b8a07713dcaa7dde95edbd0392d2 /src/lib
parent669a8d4edfcc0fb2a7cd6f93e0ad2d0de820371a (diff)
parent541b1c5eeb5fe9f9aaa4aa6487852e5c59761139 (diff)
downloadouroboros-115431af51795dfd583e24a051a7749c58a900b3.tar.gz
ouroboros-115431af51795dfd583e24a051a7749c58a900b3.zip
Merged in sandervrijders/ouroboros/be-flow-down (pull request #596)
ipcpd, lib: Add flow down events
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt2
-rw-r--r--src/lib/dev.c86
-rw-r--r--src/lib/shm_rbuff_ll.c30
-rw-r--r--src/lib/shm_rbuff_pthr.c52
4 files changed, 113 insertions, 57 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 26cecb44..29ca4db2 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -163,6 +163,8 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL
"Name for the main POSIX shared memory buffer")
set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING
"SDU buffer block size, multiple of pagesize for performance")
+set(SHM_RBUFF_LOCKLESS 1 CACHE BOOL
+ "Enable shared memory rbuff lockless support")
set(SOURCE_FILES
# Add source files here
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 14ee31f4..b945968d 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -46,10 +46,10 @@
#include <stdio.h>
#include <stdarg.h>
-#define BUF_SIZE 1500
+#define BUF_SIZE 1500
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
+#define TW_ELEMENTS 6000
+#define TW_RESOLUTION 1 /* ms */
#define MPL 2000 /* ms */
#define RQ_SIZE 20
@@ -249,12 +249,15 @@ static int api_announce(char * ap_name)
static int finalize_write(int fd,
size_t idx)
{
- if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0)
- return -ENOTALLOC;
+ int ret;
+
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret < 0)
+ return ret;
shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- return 0;
+ return ret;
}
static int frcti_init(int fd)
@@ -305,6 +308,7 @@ static int frcti_send(int fd,
{
struct timespec now = {0, 0};
struct frcti * frcti;
+ int ret;
frcti = &(ai.frcti[fd]);
@@ -331,9 +335,10 @@ static int frcti_send(int fd,
return -1;
}
- if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
+ if (ret < 0) {
pthread_rwlock_unlock(&frcti->lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&frcti->lock);
@@ -871,12 +876,6 @@ int flow_alloc(const char * dst_name,
pthread_rwlock_unlock(&ai.lock);
- if (frcti_configure(fd, FRCTFORDERING | FRCTFERRCHCK)) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- return -1;
- }
-
return fd;
}
@@ -931,6 +930,8 @@ int fccntl(int fd,
va_list l;
struct timespec * timeo;
qosspec_t * qs;
+ uint32_t rx_acl;
+ uint32_t tx_acl;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
@@ -988,10 +989,28 @@ int fccntl(int fd,
break;
case FLOWSFLAGS:
ai.flows[fd].oflags = va_arg(l, uint32_t);
+ rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ /*
+ * Making our own flow write only means making the
+ * the other side of the flow read only.
+ */
if (ai.flows[fd].oflags & FLOWFWRONLY)
- shm_rbuff_block(ai.flows[fd].rx_rb);
+ rx_acl |= ACL_RDONLY;
if (ai.flows[fd].oflags & FLOWFRDWR)
- shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ rx_acl |= ACL_RDWR;
+
+ if (ai.flows[fd].oflags & FLOWFDOWN) {
+ rx_acl |= ACL_FLOWDOWN;
+ tx_acl |= ACL_FLOWDOWN;
+ } else {
+ rx_acl &= ~ACL_FLOWDOWN;
+ tx_acl &= ~ACL_FLOWDOWN;
+ }
+
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);
+
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
@@ -1007,6 +1026,8 @@ int fccntl(int fd,
if (cflags == NULL)
goto einval;
*cflags = ai.frcti[fd].conf_flags;
+ if (frcti_configure(fd, ai.frcti[fd].conf_flags))
+ goto eperm;
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1036,6 +1057,7 @@ ssize_t flow_write(int fd,
size_t count)
{
ssize_t idx;
+ int ret;
if (buf == NULL)
return 0;
@@ -1079,19 +1101,21 @@ ssize_t flow_write(int fd,
}
if (!ai.frcti[fd].used) {
- if (finalize_write(fd, idx)) {
+ ret = finalize_write(fd, idx);
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
} else {
pthread_rwlock_unlock(&ai.lock);
- if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) {
+ ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
+ if (ret < 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
- return -1;
+ return ret;
}
}
@@ -1129,7 +1153,8 @@ ssize_t flow_read(int fd,
idx = frcti_read(fd);
if (idx < 0) {
- assert(idx == -EAGAIN || idx == -ETIMEDOUT);
+ assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
+ idx == -EFLOWDOWN);
return idx;
}
@@ -1509,6 +1534,8 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
+ int ret;
+
if (sdb == NULL)
return -EINVAL;
@@ -1527,17 +1554,19 @@ int ipcp_flow_write(int fd,
assert(ai.flows[fd].tx_rb);
if (!ai.frcti[fd].used) {
- if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
} else {
pthread_rwlock_unlock(&ai.lock);
- if (frcti_write(fd, sdb))
- return -1;
+ ret = frcti_write(fd, sdb);
+ if (ret < 0)
+ return ret;
}
return 0;
@@ -1618,6 +1647,8 @@ ssize_t local_flow_read(int fd)
int local_flow_write(int fd,
size_t idx)
{
+ int ret;
+
if (fd < 0)
return -EINVAL;
@@ -1628,9 +1659,10 @@ int local_flow_write(int fd,
return -ENOTALLOC;
}
- if (finalize_write(fd, idx)) {
+ ret = finalize_write(fd, idx);
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index ec0199c0..b77c4374 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -141,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -227,14 +226,17 @@ int shm_rbuff_write(struct shm_rbuff * rb,
{
size_t ohead;
size_t nhead;
-
- bool was_empty = false;
+ bool was_empty = false;
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
- if (__sync_fetch_and_add(rb->acl, 0)) /* CLOSED */
- return -ENOTALLOC;
+ if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) {
+ if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+ else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY)
+ return -ENOTALLOC;
+ }
if (!shm_rbuff_free(rb))
return -EAGAIN;
@@ -266,7 +268,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
assert(rb);
if (shm_rbuff_empty(rb))
- return -EAGAIN;
+ return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ?
+ -EFLOWDOWN : -EAGAIN;
ntail = RB_TAIL;
@@ -327,26 +330,25 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_OPEN, RB_CLOSED);
+ __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_CLOSED, RB_OPEN);
+ return __sync_fetch_and_add(rb->acl, 0);
}
void shm_rbuff_fini(struct shm_rbuff * rb)
{
assert(rb);
- assert(__sync_fetch_and_add(rb->acl, 0) == RB_CLOSED);
-
if (shm_rbuff_empty(rb))
return;
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index 9567762f..fb183d8f 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)
free(rb);
}
-int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
+int shm_rbuff_write(struct shm_rbuff * rb,
+ size_t idx)
{
+ int ret = 0;
+
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
@@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- if (*rb->acl == RB_CLOSED) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
+
+ if (*rb->acl != ACL_RDWR) {
+ if (*rb->acl & ACL_FLOWDOWN)
+ ret = -EFLOWDOWN;
+ else if (*rb->acl & ACL_RDONLY)
+ ret = -ENOTALLOC;
+ goto err;
}
if (!shm_rbuff_free(rb)) {
- pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ ret = -EAGAIN;
+ goto err;
}
if (shm_rbuff_empty(rb))
@@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
pthread_mutex_unlock(rb->lock);
return 0;
+ err:
+ pthread_mutex_unlock(rb->lock);
+ return ret;
}
ssize_t shm_rbuff_read(struct shm_rbuff * rb)
@@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
if (shm_rbuff_empty(rb)) {
+ ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN;
pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ return ret;
}
ret = *tail_el_ptr(rb);
@@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
+ if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) {
+ pthread_mutex_unlock(rb->lock);
+ return -EFLOWDOWN;
+ }
+
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
@@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
@@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_CLOSED;
+ *rb->acl = (size_t) flags;
pthread_mutex_unlock(rb->lock);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
+ uint32_t flags;
+
assert(rb);
#ifndef HAVE_ROBUST_MUTEX
@@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_OPEN;
+ flags = (uint32_t) *rb->acl;
pthread_mutex_unlock(rb->lock);
+
+ return flags;
}
void shm_rbuff_fini(struct shm_rbuff * rb)
@@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- assert(*rb->acl == RB_CLOSED);
-
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);