summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-12-12 13:24:17 +0100
committerdimitri staessens <[email protected]>2016-12-12 15:10:30 +0100
commitf8c14e0246a6c9cb5e8ff47869b5968abb63f010 (patch)
treed91c005451a74822516669f3f7cc3ade34971abb /src/lib
parentb731adbf7b6fa16490f7abf94e2662d82d76cce0 (diff)
downloadouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.tar.gz
ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.zip
src, tools: Set/get timeout and get qos for flows
Receiver timeouts can now be set on a flow using the flow_set_timeout function. Specifying NULL disables the timeout. The flow_get_timeout function gets the value for the timeout. This commit also deprecates fcntl in favor of flow_get_flags and flow_set_flags functions. struct qos_spec is typedef'd as a qosspec_t. The tools and cdap.c are updated to use the new API. Fixes a bug in operf client where the client's writer thread wouldn't cancel on SIGINT.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/cdap.c2
-rw-r--r--src/lib/dev.c190
2 files changed, 155 insertions, 37 deletions
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index d06a7d39..df79be54 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -203,7 +203,7 @@ struct cdap * cdap_create(struct cdap_ops * ops,
ops->cdap_request == NULL)
return NULL;
- flags = flow_cntl(fd, FLOW_F_GETFL, 0);
+ flags = flow_get_flags(fd);
if (flags & FLOW_O_NONBLOCK)
return NULL;
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1c0d73a1..bad56129 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -138,7 +138,8 @@ struct flow {
pid_t api;
- struct timespec * timeout;
+ bool timesout;
+ struct timespec rcv_timeo;
};
struct {
@@ -213,11 +214,7 @@ static void reset_flow(int fd)
ai.flows[fd].oflags = 0;
ai.flows[fd].api = -1;
-
- if (ai.flows[fd].timeout != NULL) {
- free(ai.flows[fd].timeout);
- ai.flows[fd].timeout = NULL;
- }
+ ai.flows[fd].timesout = false;
}
int ap_init(char * ap_name)
@@ -265,13 +262,13 @@ int ap_init(char * ap_name)
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- ai.flows[i].rx_rb = NULL;
- ai.flows[i].tx_rb = NULL;
- ai.flows[i].set = NULL;
- ai.flows[i].port_id = -1;
- ai.flows[i].oflags = 0;
- ai.flows[i].api = -1;
- ai.flows[i].timeout = NULL;
+ ai.flows[i].rx_rb = NULL;
+ ai.flows[i].tx_rb = NULL;
+ ai.flows[i].set = NULL;
+ ai.flows[i].port_id = -1;
+ ai.flows[i].oflags = 0;
+ ai.flows[i].api = -1;
+ ai.flows[i].timesout = false;
}
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
@@ -341,7 +338,7 @@ void ap_fini()
pthread_rwlock_destroy(&ai.data_lock);
}
-int flow_accept(char ** ae_name, struct qos_spec * qos)
+int flow_accept(char ** ae_name, qosspec_t * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -490,7 +487,7 @@ int flow_alloc_resp(int fd, int response)
return ret;
}
-int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
+int flow_alloc(char * dst_name, char * src_ae_name, qosspec_t * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -680,7 +677,7 @@ int flow_dealloc(int fd)
return 0;
}
-int flow_cntl(int fd, int cmd, int oflags)
+int flow_set_flags(int fd, int flags)
{
int old;
@@ -698,25 +695,115 @@ int flow_cntl(int fd, int cmd, int oflags)
old = ai.flows[fd].oflags;
- switch (cmd) {
- case FLOW_F_GETFL: /* GET FLOW FLAGS */
+ ai.flows[fd].oflags = flags;
+ if (flags & FLOW_O_WRONLY)
+ shm_rbuff_block(ai.flows[fd].rx_rb);
+ if (flags & FLOW_O_RDWR)
+ shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return old;
+}
+
+int flow_get_flags(int fd)
+{
+ int old;
+
+ if (fd < 0 || fd >= AP_MAX_FLOWS)
+ return -EBADF;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -ENOTALLOC;
+ }
+
+ old = ai.flows[fd].oflags;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return old;
+}
+
+int flow_get_timeout(int fd, struct timespec * timeo)
+{
+ int ret = 0;
+
+ if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return old;
- case FLOW_F_SETFL: /* SET FLOW FLAGS */
- ai.flows[fd].oflags = oflags;
- if (oflags & FLOW_O_WRONLY)
- shm_rbuff_block(ai.flows[fd].rx_rb);
- if (oflags & FLOW_O_RDWR)
- shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ return -ENOTALLOC;
+ }
+
+ if (ai.flows[fd].timesout)
+ *timeo = ai.flows[fd].rcv_timeo;
+ else
+ ret = -EPERM;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
+}
+
+int flow_set_timeout(int fd, struct timespec * timeo)
+{
+ if (fd < 0 || fd >= AP_MAX_FLOWS)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return old;
- default:
+ return -ENOTALLOC;
+ }
+
+ if (timeo == NULL) {
+ ai.flows[fd].timesout = false;
+ } else {
+ ai.flows[fd].timesout = true;
+ ai.flows[fd].rcv_timeo = *timeo;
+ }
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
+int flow_get_qosspec(int fd, qosspec_t * spec)
+{
+ if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return FLOW_O_INVALID; /* unknown command */
+ return -ENOTALLOC;
}
+
+ /* FIXME: map cube to spec */
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
}
ssize_t flow_write(int fd, void * buf, size_t count)
@@ -764,7 +851,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
- struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
+ struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -816,19 +903,28 @@ ssize_t flow_read(int fd, void * buf, size_t count)
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
pthread_rwlock_unlock(&ai.flows_lock);
} else {
- struct shm_rbuff * rb = ai.flows[fd].rx_rb;
- struct timespec * timeout = ai.flows[fd].timeout;
+ struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+ bool timeo = ai.flows[fd].timesout;
+ struct timespec timeout = ai.flows[fd].rcv_timeo;
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_rbuff_read_b(rb, timeout);
+
+ if (timeo)
+ idx = shm_rbuff_read_b(rb, &timeout);
+ else
+ idx = shm_rbuff_read_b(rb, NULL);
+
pthread_rwlock_rdlock(&ai.data_lock);
}
- if (idx < 0) {
+ if (idx == -ETIMEDOUT) {
pthread_rwlock_unlock(&ai.data_lock);
- return -EAGAIN;
+ return -ETIMEDOUT;
}
+ assert(idx >= 0);
+
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
if (n < 0) {
pthread_rwlock_unlock(&ai.data_lock);
@@ -844,7 +940,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return n;
}
-/* select functions */
+/* fqueue functions */
struct flow_set * flow_set_create()
{
@@ -1328,7 +1424,7 @@ int ipcp_flow_fini(int fd)
{
struct shm_rbuff * rb;
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+ flow_set_flags(fd, FLOW_O_WRONLY);
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1343,6 +1439,28 @@ int ipcp_flow_fini(int fd)
return 0;
}
+int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube)
+{
+ if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -ENOTALLOC;
+ }
+
+ *cube = ai.flows[fd].qos;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
ssize_t local_flow_read(int fd)
{
ssize_t ret;