summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2017-09-18 19:06:15 +0200
committerSander Vrijders <[email protected]>2017-09-19 16:52:54 +0200
commit541b1c5eeb5fe9f9aaa4aa6487852e5c59761139 (patch)
treea5817c5bd030b8a07713dcaa7dde95edbd0392d2 /src
parent669a8d4edfcc0fb2a7cd6f93e0ad2d0de820371a (diff)
downloadouroboros-541b1c5eeb5fe9f9aaa4aa6487852e5c59761139.tar.gz
ouroboros-541b1c5eeb5fe9f9aaa4aa6487852e5c59761139.zip
ipcpd, lib: Add flow down events
This adds the flow down event to Ouroboros. In the shim-eth-llc, a netlink socket is opened which listens to device up/down events. For each event the flow is then adjusted with fccntl to notify the user the flow is down or back up again. In the normal IPCP an event is thrown if a write reports that the flow is down.
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/local/main.c18
-rw-r--r--src/ipcpd/normal/connmgr.h14
-rw-r--r--src/ipcpd/normal/dt.c12
-rw-r--r--src/ipcpd/shim-eth-llc/main.c207
-rw-r--r--src/lib/CMakeLists.txt2
-rw-r--r--src/lib/dev.c86
-rw-r--r--src/lib/shm_rbuff_ll.c30
-rw-r--r--src/lib/shm_rbuff_pthr.c52
8 files changed, 316 insertions, 105 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index fb4e312b..7f44a2c6 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -99,30 +99,28 @@ static void * ipcp_local_sdu_loop(void * o)
(void) o;
- while (true) {
- int fd;
+ while (ipcp_get_state() == IPCP_OPERATIONAL) {
+ int fd;
ssize_t idx;
- if (ipcp_get_state() != IPCP_OPERATIONAL)
- return (void *) 1; /* -ENOTENROLLED */
-
fevent(local_data.flows, local_data.fq, &timeout);
while ((fd = fqueue_next(local_data.fq)) >= 0) {
- pthread_rwlock_rdlock(&local_data.lock);
-
idx = local_flow_read(fd);
+ if (idx < 0)
+ continue;
assert(idx < (SHM_BUFFER_SIZE));
+ pthread_rwlock_rdlock(&local_data.lock);
+
fd = local_data.in_out[fd];
+ pthread_rwlock_unlock(&local_data.lock);
+
if (fd != -1)
local_flow_write(fd, idx);
-
- pthread_rwlock_unlock(&local_data.lock);
}
-
}
return (void *) 0;
diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h
index ca5288ae..a8edee7d 100644
--- a/src/ipcpd/normal/connmgr.h
+++ b/src/ipcpd/normal/connmgr.h
@@ -28,12 +28,14 @@
#include "ae.h"
-#define NOTIFY_DT_CONN_ADD 0x00D0
-#define NOTIFY_DT_CONN_DEL 0x00D1
-#define NOTIFY_DT_CONN_QOS 0x00D2
-
-#define NOTIFY_MGMT_CONN_ADD 0x00F0
-#define NOTIFY_MGMT_CONN_DEL 0x00F1
+#define NOTIFY_DT_CONN_ADD 0x00D0
+#define NOTIFY_DT_CONN_DEL 0x00D1
+#define NOTIFY_DT_CONN_QOS 0x00D2
+#define NOTIFY_DT_CONN_DOWN 0x00D3
+
+#define NOTIFY_MGMT_CONN_ADD 0x00F0
+#define NOTIFY_MGMT_CONN_DEL 0x00F1
+#define NOTIFY_MGMT_CONN_DOWN 0x00F2
int connmgr_init(void);
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 2df17163..56cb5a61 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -93,6 +93,7 @@ static void sdu_handler(int fd,
struct shm_du_buff * sdb)
{
struct dt_pci dt_pci;
+ int ret;
memset(&dt_pci, 0, sizeof(dt_pci));
@@ -112,8 +113,11 @@ static void sdu_handler(int fd,
return;
}
- if (ipcp_flow_write(fd, sdb)) {
+ ret = ipcp_flow_write(fd, sdb);
+ if (ret < 0) {
log_err("Failed to write SDU to fd %d.", fd);
+ if (ret == -EFLOWDOWN)
+ notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
ipcp_sdb_release(sdb);
return;
}
@@ -323,6 +327,7 @@ int dt_write_sdu(uint64_t dst_addr,
{
int fd;
struct dt_pci dt_pci;
+ int ret;
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
@@ -342,8 +347,11 @@ int dt_write_sdu(uint64_t dst_addr,
return -1;
}
- if (ipcp_flow_write(fd, sdb)) {
+ ret = ipcp_flow_write(fd, sdb);
+ if (ret < 0) {
log_err("Failed to write SDU to fd %d.", fd);
+ if (ret == -EFLOWDOWN)
+ notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
return -1;
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 20734d6e..36d101e5 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -43,6 +43,7 @@
#include <ouroboros/fqueue.h>
#include <ouroboros/logs.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/fccntl.h>
#include "ipcp.h"
#include "shim-data.h"
@@ -64,6 +65,8 @@
#ifdef __linux__
#include <linux/if_packet.h>
#include <linux/if_ether.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
#endif
#ifdef __FreeBSD__
@@ -154,6 +157,10 @@ struct {
pthread_t sdu_writer;
pthread_t sdu_reader;
+#ifdef __linux__
+ pthread_t if_monitor;
+#endif
+
/* Handle mgmt frames in a different thread */
pthread_t mgmt_handler;
pthread_mutex_t mgmt_lock;
@@ -771,6 +778,114 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
return (void *) 1;
}
+#ifdef __linux__
+static int open_netlink_socket(void)
+{
+ struct sockaddr_nl sa;
+ int fd;
+
+ memset(&sa, 0, sizeof(sa));
+ sa.nl_family = AF_NETLINK;
+ sa.nl_pid = getpid();
+ sa.nl_groups = RTMGRP_LINK;
+
+ fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
+ if (fd < 0)
+ return -1;
+
+ if (bind(fd, (struct sockaddr *) &sa, sizeof(sa))) {
+ close(fd);
+ return -1;
+ }
+
+ return fd;
+}
+
+static void change_flows_state(bool up)
+{
+ int i;
+ uint32_t flags;
+
+ pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
+
+ for (i = 0; i < MAX_SAPS; i++) {
+ if (eth_llc_data.ef_to_fd[i] != -1) {
+ fccntl(i, FLOWGFLAGS, &flags);
+ if (up)
+ fccntl(eth_llc_data.ef_to_fd[i],
+ FLOWSFLAGS, flags & ~FLOWFDOWN);
+ else
+ fccntl(eth_llc_data.ef_to_fd[i],
+ FLOWSFLAGS, flags | FLOWFDOWN);
+ }
+ }
+
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+}
+
+static void * eth_llc_ipcp_if_monitor(void * o)
+{
+ int fd;
+ int status;
+ char buf[4096];
+ struct iovec iov = {buf, sizeof(buf)};
+ struct sockaddr_nl snl;
+ struct msghdr msg = {(void *) &snl, sizeof(snl),
+ &iov, 1, NULL, 0, 0};
+ struct nlmsghdr * h;
+ struct ifinfomsg * ifi;
+
+ (void ) o;
+
+ fd = open_netlink_socket();
+ if (fd < 0) {
+ log_err("Failed to open socket.");
+ return (void *) -1;
+ }
+
+ while (ipcp_get_state() == IPCP_OPERATIONAL) {
+ status = recvmsg(fd, &msg, 0);
+ if (status < 0)
+ continue;
+
+ for (h = (struct nlmsghdr *) buf;
+ NLMSG_OK(h, (unsigned int) status);
+ h = NLMSG_NEXT(h, status)) {
+
+ /* Finish reading */
+ if (h->nlmsg_type == NLMSG_DONE)
+ break;
+
+ /* Message is some kind of error */
+ if (h->nlmsg_type == NLMSG_ERROR)
+ continue;
+
+ /* Only interested in link up/down */
+ if (h->nlmsg_type != RTM_NEWLINK)
+ continue;
+
+ ifi = NLMSG_DATA(h);
+
+ /* Not our interface */
+ if (ifi->ifi_index != eth_llc_data.device.sll_ifindex)
+ continue;
+
+ if (ifi->ifi_flags & IFF_UP) {
+ change_flows_state(true);
+ log_dbg("Interface up.");
+ } else {
+ change_flows_state(false);
+ log_dbg("Interface down.");
+ }
+ }
+ }
+
+ close(fd);
+
+ return (void *) 0;
+}
+#endif
+
#if defined (HAVE_BPF) && !defined(HAVE_NETMAP)
static int open_bpf_device(void)
{
@@ -844,13 +959,13 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)
break;
}
+ freeifaddrs(ifaddr);
+
if (ifa == NULL) {
log_err("Interface not found.");
- freeifaddrs(ifaddr);
return -1;
}
- freeifaddrs(ifaddr);
#elif defined(__linux__)
skfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (skfd < 0) {
@@ -904,38 +1019,32 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)
if (BPF_BLEN < blen) {
log_err("BPF buffer too small (is: %ld must be: %d).",
BPF_BLEN, blen);
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSETIF, &ifr) < 0) {
log_err("Failed to set interface.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSHDRCMPLT, &enable) < 0) {
log_err("Failed to set BIOCSHDRCMPLT.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSSEESENT, &disable) < 0) {
log_err("Failed to set BIOCSSEESENT.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCSRTIMEOUT, &tv) < 0) {
log_err("Failed to set BIOCSRTIMEOUT.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
if (ioctl(eth_llc_data.bpf, BIOCIMMEDIATE, &enable) < 0) {
log_err("Failed to set BIOCIMMEDIATE.");
- close(eth_llc_data.bpf);
- return -1;
+ goto fail_device;
}
log_info("Using Berkeley Packet Filter.");
@@ -958,39 +1067,74 @@ static int eth_llc_ipcp_bootstrap(const struct ipcp_config * conf)
if (bind(eth_llc_data.s_fd, (struct sockaddr *) &eth_llc_data.device,
sizeof(eth_llc_data.device))) {
log_err("Failed to bind socket to interface");
- close(eth_llc_data.s_fd);
- return -1;
+ goto fail_device;
}
if (setsockopt(eth_llc_data.s_fd, SOL_SOCKET, SO_RCVTIMEO,
&tv, sizeof(tv))) {
log_err("Failed to set socket timeout: %s.", strerror(errno));
- close(eth_llc_data.s_fd);
- return -1;
+ goto fail_device;
}
#endif /* HAVE_NETMAP */
ipcp_set_state(IPCP_OPERATIONAL);
- pthread_create(&eth_llc_data.mgmt_handler,
- NULL,
- eth_llc_ipcp_mgmt_handler,
- NULL);
+#ifdef __linux__
+ if (pthread_create(&eth_llc_data.if_monitor,
+ NULL,
+ eth_llc_ipcp_if_monitor,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_device;
+ }
+#endif
+
+ if (pthread_create(&eth_llc_data.mgmt_handler,
+ NULL,
+ eth_llc_ipcp_mgmt_handler,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_mgmt_handler;
+ }
- pthread_create(&eth_llc_data.sdu_reader,
- NULL,
- eth_llc_ipcp_sdu_reader,
- NULL);
+ if (pthread_create(&eth_llc_data.sdu_reader,
+ NULL,
+ eth_llc_ipcp_sdu_reader,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_sdu_reader;
+ }
- pthread_create(&eth_llc_data.sdu_writer,
- NULL,
- eth_llc_ipcp_sdu_writer,
- NULL);
+ if (pthread_create(&eth_llc_data.sdu_writer,
+ NULL,
+ eth_llc_ipcp_sdu_writer,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_sdu_writer;
+ }
log_dbg("Bootstrapped shim IPCP over Ethernet with LLC with api %d.",
getpid());
return 0;
+
+ fail_sdu_writer:
+ pthread_join(eth_llc_data.sdu_reader, NULL);
+ fail_sdu_reader:
+ pthread_join(eth_llc_data.mgmt_handler, NULL);
+ fail_mgmt_handler:
+#ifdef __linux__
+ pthread_join(eth_llc_data.if_monitor, NULL);
+#endif
+ fail_device:
+#if defined(HAVE_NETMAP)
+ nm_close(eth_llc_data.nmd);
+#elif defined(HAVE_BPF)
+ close(eth_llc_data.bpf);
+#elif defined(HAVE_RAW_SOCKETS)
+ close(eth_llc_data.s_fd);
+#endif
+ return -1;
}
static int eth_llc_ipcp_reg(const uint8_t * hash)
@@ -1228,6 +1372,9 @@ int main(int argc,
pthread_join(eth_llc_data.sdu_writer, NULL);
pthread_join(eth_llc_data.sdu_reader, NULL);
pthread_join(eth_llc_data.mgmt_handler, NULL);
+#ifdef __linux__
+ pthread_join(eth_llc_data.if_monitor, NULL);
+#endif
}
eth_llc_data_fini();
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 26cecb44..29ca4db2 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -163,6 +163,8 @@ set(SHM_RDRB_NAME "/${SHM_PREFIX}.rdrb" CACHE INTERNAL
"Name for the main POSIX shared memory buffer")
set(SHM_RDRB_BLOCK_SIZE "sysconf(_SC_PAGESIZE)" CACHE STRING
"SDU buffer block size, multiple of pagesize for performance")
+set(SHM_RBUFF_LOCKLESS 1 CACHE BOOL
+ "Enable shared memory rbuff lockless support")
set(SOURCE_FILES
# Add source files here
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 14ee31f4..b945968d 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -46,10 +46,10 @@
#include <stdio.h>
#include <stdarg.h>
-#define BUF_SIZE 1500
+#define BUF_SIZE 1500
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
+#define TW_ELEMENTS 6000
+#define TW_RESOLUTION 1 /* ms */
#define MPL 2000 /* ms */
#define RQ_SIZE 20
@@ -249,12 +249,15 @@ static int api_announce(char * ap_name)
static int finalize_write(int fd,
size_t idx)
{
- if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0)
- return -ENOTALLOC;
+ int ret;
+
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret < 0)
+ return ret;
shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- return 0;
+ return ret;
}
static int frcti_init(int fd)
@@ -305,6 +308,7 @@ static int frcti_send(int fd,
{
struct timespec now = {0, 0};
struct frcti * frcti;
+ int ret;
frcti = &(ai.frcti[fd]);
@@ -331,9 +335,10 @@ static int frcti_send(int fd,
return -1;
}
- if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
+ if (ret < 0) {
pthread_rwlock_unlock(&frcti->lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&frcti->lock);
@@ -871,12 +876,6 @@ int flow_alloc(const char * dst_name,
pthread_rwlock_unlock(&ai.lock);
- if (frcti_configure(fd, FRCTFORDERING | FRCTFERRCHCK)) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- return -1;
- }
-
return fd;
}
@@ -931,6 +930,8 @@ int fccntl(int fd,
va_list l;
struct timespec * timeo;
qosspec_t * qs;
+ uint32_t rx_acl;
+ uint32_t tx_acl;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
@@ -988,10 +989,28 @@ int fccntl(int fd,
break;
case FLOWSFLAGS:
ai.flows[fd].oflags = va_arg(l, uint32_t);
+ rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ /*
+ * Making our own flow write only means making the
+ * the other side of the flow read only.
+ */
if (ai.flows[fd].oflags & FLOWFWRONLY)
- shm_rbuff_block(ai.flows[fd].rx_rb);
+ rx_acl |= ACL_RDONLY;
if (ai.flows[fd].oflags & FLOWFRDWR)
- shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ rx_acl |= ACL_RDWR;
+
+ if (ai.flows[fd].oflags & FLOWFDOWN) {
+ rx_acl |= ACL_FLOWDOWN;
+ tx_acl |= ACL_FLOWDOWN;
+ } else {
+ rx_acl &= ~ACL_FLOWDOWN;
+ tx_acl &= ~ACL_FLOWDOWN;
+ }
+
+ shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
+ shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);
+
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
@@ -1007,6 +1026,8 @@ int fccntl(int fd,
if (cflags == NULL)
goto einval;
*cflags = ai.frcti[fd].conf_flags;
+ if (frcti_configure(fd, ai.frcti[fd].conf_flags))
+ goto eperm;
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1036,6 +1057,7 @@ ssize_t flow_write(int fd,
size_t count)
{
ssize_t idx;
+ int ret;
if (buf == NULL)
return 0;
@@ -1079,19 +1101,21 @@ ssize_t flow_write(int fd,
}
if (!ai.frcti[fd].used) {
- if (finalize_write(fd, idx)) {
+ ret = finalize_write(fd, idx);
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
shm_rdrbuff_remove(ai.rdrb, idx);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
} else {
pthread_rwlock_unlock(&ai.lock);
- if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) {
+ ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
+ if (ret < 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
- return -1;
+ return ret;
}
}
@@ -1129,7 +1153,8 @@ ssize_t flow_read(int fd,
idx = frcti_read(fd);
if (idx < 0) {
- assert(idx == -EAGAIN || idx == -ETIMEDOUT);
+ assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
+ idx == -EFLOWDOWN);
return idx;
}
@@ -1509,6 +1534,8 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
+ int ret;
+
if (sdb == NULL)
return -EINVAL;
@@ -1527,17 +1554,19 @@ int ipcp_flow_write(int fd,
assert(ai.flows[fd].tx_rb);
if (!ai.frcti[fd].used) {
- if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+ ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
} else {
pthread_rwlock_unlock(&ai.lock);
- if (frcti_write(fd, sdb))
- return -1;
+ ret = frcti_write(fd, sdb);
+ if (ret < 0)
+ return ret;
}
return 0;
@@ -1618,6 +1647,8 @@ ssize_t local_flow_read(int fd)
int local_flow_write(int fd,
size_t idx)
{
+ int ret;
+
if (fd < 0)
return -EINVAL;
@@ -1628,9 +1659,10 @@ int local_flow_write(int fd,
return -ENOTALLOC;
}
- if (finalize_write(fd, idx)) {
+ ret = finalize_write(fd, idx);
+ if (ret < 0) {
pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
+ return ret;
}
pthread_rwlock_unlock(&ai.lock);
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index ec0199c0..b77c4374 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -141,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -227,14 +226,17 @@ int shm_rbuff_write(struct shm_rbuff * rb,
{
size_t ohead;
size_t nhead;
-
- bool was_empty = false;
+ bool was_empty = false;
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
- if (__sync_fetch_and_add(rb->acl, 0)) /* CLOSED */
- return -ENOTALLOC;
+ if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) {
+ if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+ else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY)
+ return -ENOTALLOC;
+ }
if (!shm_rbuff_free(rb))
return -EAGAIN;
@@ -266,7 +268,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
assert(rb);
if (shm_rbuff_empty(rb))
- return -EAGAIN;
+ return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ?
+ -EFLOWDOWN : -EAGAIN;
ntail = RB_TAIL;
@@ -327,26 +330,25 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_OPEN, RB_CLOSED);
+ __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_CLOSED, RB_OPEN);
+ return __sync_fetch_and_add(rb->acl, 0);
}
void shm_rbuff_fini(struct shm_rbuff * rb)
{
assert(rb);
- assert(__sync_fetch_and_add(rb->acl, 0) == RB_CLOSED);
-
if (shm_rbuff_empty(rb))
return;
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index 9567762f..fb183d8f 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)
free(rb);
}
-int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
+int shm_rbuff_write(struct shm_rbuff * rb,
+ size_t idx)
{
+ int ret = 0;
+
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
@@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- if (*rb->acl == RB_CLOSED) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
+
+ if (*rb->acl != ACL_RDWR) {
+ if (*rb->acl & ACL_FLOWDOWN)
+ ret = -EFLOWDOWN;
+ else if (*rb->acl & ACL_RDONLY)
+ ret = -ENOTALLOC;
+ goto err;
}
if (!shm_rbuff_free(rb)) {
- pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ ret = -EAGAIN;
+ goto err;
}
if (shm_rbuff_empty(rb))
@@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
pthread_mutex_unlock(rb->lock);
return 0;
+ err:
+ pthread_mutex_unlock(rb->lock);
+ return ret;
}
ssize_t shm_rbuff_read(struct shm_rbuff * rb)
@@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
if (shm_rbuff_empty(rb)) {
+ ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN;
pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ return ret;
}
ret = *tail_el_ptr(rb);
@@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
+ if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) {
+ pthread_mutex_unlock(rb->lock);
+ return -EFLOWDOWN;
+ }
+
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
@@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
@@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_CLOSED;
+ *rb->acl = (size_t) flags;
pthread_mutex_unlock(rb->lock);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
+ uint32_t flags;
+
assert(rb);
#ifndef HAVE_ROBUST_MUTEX
@@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_OPEN;
+ flags = (uint32_t) *rb->acl;
pthread_mutex_unlock(rb->lock);
+
+ return flags;
}
void shm_rbuff_fini(struct shm_rbuff * rb)
@@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- assert(*rb->acl == RB_CLOSED);
-
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);