summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-12-12 15:20:07 +0100
committerSander Vrijders <[email protected]>2016-12-12 15:20:07 +0100
commitfc8d30f2d6e9f3e463aff81a1630ff56f9463a22 (patch)
treed91c005451a74822516669f3f7cc3ade34971abb
parented6ac5db8474edabe83f0cdcbe7f258f0859ea41 (diff)
parentf8c14e0246a6c9cb5e8ff47869b5968abb63f010 (diff)
downloadouroboros-fc8d30f2d6e9f3e463aff81a1630ff56f9463a22.tar.gz
ouroboros-fc8d30f2d6e9f3e463aff81a1630ff56f9463a22.zip
Merged in dstaesse/ouroboros/be-timeout (pull request #323)
Be timeout
-rw-r--r--include/ouroboros/dev.h10
-rw-r--r--include/ouroboros/fcntl.h19
-rw-r--r--include/ouroboros/ipcp-dev.h7
-rw-r--r--include/ouroboros/qos.h4
-rw-r--r--src/ipcpd/normal/fmgr.c6
-rw-r--r--src/irmd/main.c10
-rw-r--r--src/lib/cdap.c2
-rw-r--r--src/lib/dev.c190
-rw-r--r--src/lib/shm_rbuff.c18
-rw-r--r--src/lib/shm_rdrbuff.c8
-rw-r--r--src/tools/cbr/cbr_server.c4
-rw-r--r--src/tools/echo/echo_server.c2
-rw-r--r--src/tools/operf/operf.c3
-rw-r--r--src/tools/operf/operf_client.c68
-rw-r--r--src/tools/operf/operf_server.c2
-rw-r--r--src/tools/oping/oping.c3
-rw-r--r--src/tools/oping/oping_client.c115
-rw-r--r--src/tools/oping/oping_server.c4
18 files changed, 261 insertions, 214 deletions
diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index f2e42d03..47665ca4 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -35,8 +35,8 @@ int ap_init(char * ap_name);
void ap_fini(void);
/* Returns flow descriptor (> 0), client AE name and qos spec. */
-int flow_accept(char ** ae_name,
- struct qos_spec * qos);
+int flow_accept(char ** ae_name,
+ qosspec_t * qos);
int flow_alloc_resp(int fd,
int response);
@@ -45,9 +45,9 @@ int flow_alloc_resp(int fd,
* Returns flow descriptor (> 0).
* On returning, qos will contain the actual supplied QoS.
*/
-int flow_alloc(char * dst_name,
- char * src_ae_name,
- struct qos_spec * qos);
+int flow_alloc(char * dst_name,
+ char * src_ae_name,
+ qosspec_t * qos);
int flow_alloc_res(int fd);
diff --git a/include/ouroboros/fcntl.h b/include/ouroboros/fcntl.h
index ccb45996..3ad3ccac 100644
--- a/include/ouroboros/fcntl.h
+++ b/include/ouroboros/fcntl.h
@@ -23,6 +23,8 @@
#ifndef OUROBOROS_FCNTL_H
#define OUROBOROS_FCNTL_H
+#include <sys/time.h>
+
/* same values as fcntl.h */
#define FLOW_O_RDONLY 00000000
#define FLOW_O_WRONLY 00000001
@@ -34,11 +36,18 @@
#define FLOW_O_INVALID (FLOW_O_WRONLY | FLOW_O_RDWR)
-#define FLOW_F_GETFL 00000001
-#define FLOW_F_SETFL 00000002
+int flow_set_flags(int fd,
+ int flags);
+
+int flow_get_flags(int fd);
+
+int flow_set_timeout(int fd,
+ struct timespec * to);
+
+int flow_get_timeout(int fd,
+ struct timespec * to);
-int flow_cntl(int fd,
- int cmd,
- int oflags);
+int flow_get_qosspec(int fd,
+ qosspec_t * spec);
#endif /* OUROBOROS_FCNTL_H */
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index ee7c83ad..19a66762 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -21,10 +21,6 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#include <unistd.h>
-#include <time.h>
-
-#include <ouroboros/qos.h>
#include <ouroboros/shm_rdrbuff.h>
#ifndef OUROBOROS_IPCP_DEV_H
@@ -49,4 +45,7 @@ void ipcp_flow_fini(int fd);
void ipcp_flow_del(struct shm_du_buff * sdb);
+int ipcp_flow_get_qoscube(int fd,
+ enum qos_cube * cube);
+
#endif /* OUROBOROS_IPCP_DEV_H */
diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h
index 8f573b7d..047e6288 100644
--- a/include/ouroboros/qos.h
+++ b/include/ouroboros/qos.h
@@ -26,9 +26,9 @@
#include <stdint.h>
/* FIXME: may need revision */
-struct qos_spec {
+typedef struct qos_spec {
uint32_t delay;
uint32_t jitter;
-};
+} qosspec_t;
#endif
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 41785ae4..8e416aa4 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -123,9 +123,9 @@ static int add_np1_fd(int fd,
static void * fmgr_nm1_acceptor(void * o)
{
- int fd;
- char * ae_name;
- struct qos_spec qs;
+ int fd;
+ char * ae_name;
+ qosspec_t qs;
(void) o;
diff --git a/src/irmd/main.c b/src/irmd/main.c
index dc0c26f2..6a6aedf8 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1125,16 +1125,16 @@ static int flow_alloc_resp(pid_t n_api,
return ret;
}
-static struct irm_flow * flow_alloc(pid_t api,
- char * dst_name,
- char * src_ae_name,
- struct qos_spec * qos)
+static struct irm_flow * flow_alloc(pid_t api,
+ char * dst_name,
+ char * src_ae_name,
+ qosspec_t * qos)
{
struct irm_flow * f;
pid_t ipcp;
int port_id;
- /* FIXME: Map qos_spec to qos_cube */
+ /* FIXME: Map qosspec to qos_cube */
(void) qos;
pthread_rwlock_rdlock(&irmd->state_lock);
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index d06a7d39..df79be54 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -203,7 +203,7 @@ struct cdap * cdap_create(struct cdap_ops * ops,
ops->cdap_request == NULL)
return NULL;
- flags = flow_cntl(fd, FLOW_F_GETFL, 0);
+ flags = flow_get_flags(fd);
if (flags & FLOW_O_NONBLOCK)
return NULL;
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 1c0d73a1..bad56129 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -138,7 +138,8 @@ struct flow {
pid_t api;
- struct timespec * timeout;
+ bool timesout;
+ struct timespec rcv_timeo;
};
struct {
@@ -213,11 +214,7 @@ static void reset_flow(int fd)
ai.flows[fd].oflags = 0;
ai.flows[fd].api = -1;
-
- if (ai.flows[fd].timeout != NULL) {
- free(ai.flows[fd].timeout);
- ai.flows[fd].timeout = NULL;
- }
+ ai.flows[fd].timesout = false;
}
int ap_init(char * ap_name)
@@ -265,13 +262,13 @@ int ap_init(char * ap_name)
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- ai.flows[i].rx_rb = NULL;
- ai.flows[i].tx_rb = NULL;
- ai.flows[i].set = NULL;
- ai.flows[i].port_id = -1;
- ai.flows[i].oflags = 0;
- ai.flows[i].api = -1;
- ai.flows[i].timeout = NULL;
+ ai.flows[i].rx_rb = NULL;
+ ai.flows[i].tx_rb = NULL;
+ ai.flows[i].set = NULL;
+ ai.flows[i].port_id = -1;
+ ai.flows[i].oflags = 0;
+ ai.flows[i].api = -1;
+ ai.flows[i].timesout = false;
}
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
@@ -341,7 +338,7 @@ void ap_fini()
pthread_rwlock_destroy(&ai.data_lock);
}
-int flow_accept(char ** ae_name, struct qos_spec * qos)
+int flow_accept(char ** ae_name, qosspec_t * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -490,7 +487,7 @@ int flow_alloc_resp(int fd, int response)
return ret;
}
-int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
+int flow_alloc(char * dst_name, char * src_ae_name, qosspec_t * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -680,7 +677,7 @@ int flow_dealloc(int fd)
return 0;
}
-int flow_cntl(int fd, int cmd, int oflags)
+int flow_set_flags(int fd, int flags)
{
int old;
@@ -698,25 +695,115 @@ int flow_cntl(int fd, int cmd, int oflags)
old = ai.flows[fd].oflags;
- switch (cmd) {
- case FLOW_F_GETFL: /* GET FLOW FLAGS */
+ ai.flows[fd].oflags = flags;
+ if (flags & FLOW_O_WRONLY)
+ shm_rbuff_block(ai.flows[fd].rx_rb);
+ if (flags & FLOW_O_RDWR)
+ shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return old;
+}
+
+int flow_get_flags(int fd)
+{
+ int old;
+
+ if (fd < 0 || fd >= AP_MAX_FLOWS)
+ return -EBADF;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -ENOTALLOC;
+ }
+
+ old = ai.flows[fd].oflags;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return old;
+}
+
+int flow_get_timeout(int fd, struct timespec * timeo)
+{
+ int ret = 0;
+
+ if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return old;
- case FLOW_F_SETFL: /* SET FLOW FLAGS */
- ai.flows[fd].oflags = oflags;
- if (oflags & FLOW_O_WRONLY)
- shm_rbuff_block(ai.flows[fd].rx_rb);
- if (oflags & FLOW_O_RDWR)
- shm_rbuff_unblock(ai.flows[fd].rx_rb);
+ return -ENOTALLOC;
+ }
+
+ if (ai.flows[fd].timesout)
+ *timeo = ai.flows[fd].rcv_timeo;
+ else
+ ret = -EPERM;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return ret;
+}
+
+int flow_set_timeout(int fd, struct timespec * timeo)
+{
+ if (fd < 0 || fd >= AP_MAX_FLOWS)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return old;
- default:
+ return -ENOTALLOC;
+ }
+
+ if (timeo == NULL) {
+ ai.flows[fd].timesout = false;
+ } else {
+ ai.flows[fd].timesout = true;
+ ai.flows[fd].rcv_timeo = *timeo;
+ }
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
+int flow_get_qosspec(int fd, qosspec_t * spec)
+{
+ if (fd < 0 || fd >= AP_MAX_FLOWS || spec == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return FLOW_O_INVALID; /* unknown command */
+ return -ENOTALLOC;
}
+
+ /* FIXME: map cube to spec */
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
}
ssize_t flow_write(int fd, void * buf, size_t count)
@@ -764,7 +851,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
}
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
- struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
+ struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -816,19 +903,28 @@ ssize_t flow_read(int fd, void * buf, size_t count)
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
pthread_rwlock_unlock(&ai.flows_lock);
} else {
- struct shm_rbuff * rb = ai.flows[fd].rx_rb;
- struct timespec * timeout = ai.flows[fd].timeout;
+ struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+ bool timeo = ai.flows[fd].timesout;
+ struct timespec timeout = ai.flows[fd].rcv_timeo;
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- idx = shm_rbuff_read_b(rb, timeout);
+
+ if (timeo)
+ idx = shm_rbuff_read_b(rb, &timeout);
+ else
+ idx = shm_rbuff_read_b(rb, NULL);
+
pthread_rwlock_rdlock(&ai.data_lock);
}
- if (idx < 0) {
+ if (idx == -ETIMEDOUT) {
pthread_rwlock_unlock(&ai.data_lock);
- return -EAGAIN;
+ return -ETIMEDOUT;
}
+ assert(idx >= 0);
+
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
if (n < 0) {
pthread_rwlock_unlock(&ai.data_lock);
@@ -844,7 +940,7 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return n;
}
-/* select functions */
+/* fqueue functions */
struct flow_set * flow_set_create()
{
@@ -1328,7 +1424,7 @@ int ipcp_flow_fini(int fd)
{
struct shm_rbuff * rb;
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
+ flow_set_flags(fd, FLOW_O_WRONLY);
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
@@ -1343,6 +1439,28 @@ int ipcp_flow_fini(int fd)
return 0;
}
+int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube)
+{
+ if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -ENOTALLOC;
+ }
+
+ *cube = ai.flows[fd].qos;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
ssize_t local_flow_read(int fd)
{
ssize_t ret;
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 29a62f62..cc64fa09 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -287,7 +287,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
const struct timespec * timeout)
{
struct timespec abstime;
- int ret = 0;
ssize_t idx = -1;
assert(rb);
@@ -299,7 +298,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
pthread_mutex_consistent(rb->lock);
#endif
if (timeout != NULL) {
- idx = -ETIMEDOUT;
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
ts_add(&abstime, timeout, &abstime);
}
@@ -307,21 +305,17 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
- while (shm_rbuff_empty(rb) && (ret != ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) {
if (timeout != NULL)
- ret = pthread_cond_timedwait(rb->add,
- rb->lock,
- &abstime);
+ idx = -pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
else
- ret = pthread_cond_wait(rb->add, rb->lock);
+ idx = -pthread_cond_wait(rb->add, rb->lock);
#ifndef __APPLE__
- if (ret == EOWNERDEAD)
+ if (idx == -EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- if (ret == ETIMEDOUT) {
- idx = -ETIMEDOUT;
- break;
- }
}
if (idx != -ETIMEDOUT) {
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 3ad8a470..ae661be4 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -398,10 +398,10 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
}
ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
- size_t headspace,
- size_t tailspace,
- uint8_t * data,
- size_t len)
+ size_t headspace,
+ size_t tailspace,
+ uint8_t * data,
+ size_t len)
{
struct shm_du_buff * sdb;
size_t size = headspace + len + tailspace;
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 104c5e9e..64055cfb 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -85,7 +85,7 @@ static void handle_flow(int fd)
alive = iv_start;
ts_add(&iv_start, &intv, &iv_end);
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);
+ flow_set_flags(fd, FLOW_O_NONBLOCK);
while (!stop) {
clock_gettime(CLOCK_REALTIME, &now);
@@ -157,7 +157,7 @@ static void * listener(void * o)
{
int client_fd = 0;
int response = 0;
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c
index 09575364..c369d3e6 100644
--- a/src/tools/echo/echo_server.c
+++ b/src/tools/echo/echo_server.c
@@ -40,7 +40,7 @@ int server_main(void)
int client_fd = 0;
char buf[BUF_SIZE];
ssize_t count = 0;
- struct qos_spec qs;
+ qosspec_t qs;
printf("Starting the server.\n");
diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c
index bc7ade3a..1716a598 100644
--- a/src/tools/operf/operf.c
+++ b/src/tools/operf/operf.c
@@ -47,9 +47,6 @@ struct c {
unsigned long sent;
unsigned long rcvd;
- flow_set_t * flows;
- fqueue_t * fq;
-
pthread_t reader_pt;
pthread_t writer_pt;
} client;
diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c
index 902a7b41..44f25893 100644
--- a/src/tools/operf/operf_client.c
+++ b/src/tools/operf/operf_client.c
@@ -22,6 +22,7 @@
*/
#include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
#include <ouroboros/time_utils.h>
#ifdef __FreeBSD__
@@ -45,6 +46,7 @@ static void busy_wait_until(const struct timespec * deadline)
while (now.tv_sec == deadline->tv_sec
&& now.tv_nsec < deadline->tv_nsec)
clock_gettime(CLOCK_REALTIME, &now);
+ pthread_testcancel();
}
void shutdown_client(int signo, siginfo_t * info, void * c)
@@ -68,23 +70,20 @@ void * reader(void * o)
struct timespec timeout = {2, 0};
char buf[OPERF_BUF_SIZE];
- int fd = 0;
+ int fd = *((int *) o);
int msg_len = 0;
- (void) o;
+ flow_set_timeout(fd, &timeout);
- /* FIXME: use flow timeout option once we have it */
- while (flow_event_wait(client.flows, client.fq, &timeout) != -ETIMEDOUT)
- while ((fd = fqueue_next(client.fq)) >= 0) {
- msg_len = flow_read(fd, buf, OPERF_BUF_SIZE);
- if (msg_len != client.size) {
- printf("Invalid message on fd %d.\n", fd);
- continue;
- }
-
- ++client.rcvd;
+ while ((msg_len = flow_read(fd, buf, OPERF_BUF_SIZE)) != -ETIMEDOUT) {
+ if (msg_len != client.size) {
+ printf("Invalid message on fd %d.\n", fd);
+ continue;
}
+ ++client.rcvd;
+ }
+
return (void *) 0;
}
@@ -160,33 +159,6 @@ void * writer(void * o)
return (void *) 0;
}
-static int client_init(void)
-{
- client.flows = flow_set_create();
- if (client.flows == NULL)
- return -ENOMEM;
-
- client.fq = fqueue_create();
- if (client.fq == NULL) {
- flow_set_destroy(client.flows);
- return -ENOMEM;
- }
-
- client.sent = 0;
- client.rcvd = 0;
-
- return 0;
-}
-
-void client_fini(void)
-{
- if (client.flows != NULL)
- flow_set_destroy(client.flows);
-
- if (client.fq != NULL)
- fqueue_destroy(client.fq);
-}
-
int client_main(void)
{
struct sigaction sig_act;
@@ -208,32 +180,24 @@ int client_main(void)
return -1;
}
- if (client_init()) {
- printf("Failed to initialize client.\n");
- return -1;
- }
+ client.sent = 0;
+ client.rcvd = 0;
fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
printf("Failed to allocate flow.\n");
return -1;
}
- flow_set_add(client.flows, fd);
-
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
- flow_set_del(client.flows, fd);
flow_dealloc(fd);
- client_fini();
return -1;
}
clock_gettime(CLOCK_REALTIME, &tic);
- pthread_create(&client.reader_pt, NULL, reader, NULL);
+ pthread_create(&client.reader_pt, NULL, reader, &fd);
pthread_create(&client.writer_pt, NULL, writer, &fd);
pthread_join(client.writer_pt, NULL);
@@ -253,11 +217,7 @@ int client_main(void)
(client.rcvd * client.size * 8)
/ (double) ts_diff_us(&tic, &toc));
- flow_set_del(client.flows, fd);
-
flow_dealloc(fd);
- client_fini();
-
return 0;
}
diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c
index 4eb93879..340103d2 100644
--- a/src/tools/operf/operf_server.c
+++ b/src/tools/operf/operf_server.c
@@ -102,7 +102,7 @@ void * accept_thread(void * o)
{
int fd = 0;
struct timespec now = {0, 0};
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 98d12a7b..224b182b 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -54,9 +54,6 @@ struct c {
double rtt_avg;
double rtt_m2;
- flow_set_t * flows;
- fqueue_t * fq;
-
/* needs locking */
struct timespec * times;
pthread_mutex_t lock;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index b30ba5f4..c439cf46 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -60,56 +60,54 @@ void * reader(void * o)
char buf[OPING_BUF_SIZE];
struct oping_msg * msg = (struct oping_msg *) buf;
- int fd = 0;
+ int fd = *((int *) o);
int msg_len = 0;
double ms = 0;
double d = 0;
- (void) o;
-
- /* FIXME: use flow timeout option once we have it */
- while (client.rcvd != client.count
- && (flow_event_wait(client.flows, client.fq, &timeout)
- != -ETIMEDOUT)) {
- while ((fd = fqueue_next(client.fq)) >= 0) {
- msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
- if (msg_len < 0)
- continue;
-
- if (ntohl(msg->type) != ECHO_REPLY) {
- printf("Invalid message on fd %d.\n", fd);
- continue;
- }
-
- if (ntohl(msg->id) >= client.count) {
- printf("Invalid id.\n");
- continue;
- }
-
- ++client.rcvd;
-
- clock_gettime(CLOCK_REALTIME, &now);
-
- pthread_mutex_lock(&client.lock);
- ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
- / 1000.0;
- pthread_mutex_unlock(&client.lock);
-
- printf("%d bytes from %s: seq=%d time=%.3f ms\n",
- msg_len,
- client.s_apn,
- ntohl(msg->id),
- ms);
-
- if (ms < client.rtt_min)
- client.rtt_min = ms;
- if (ms > client.rtt_max)
- client.rtt_max = ms;
-
- d = (ms - client.rtt_avg);
- client.rtt_avg += d / (float) client.rcvd;
- client.rtt_m2 += d * (ms - client.rtt_avg);
+ flow_set_timeout(fd, &timeout);
+
+ while (client.rcvd != client.count) {
+ msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
+ if (msg_len == -ETIMEDOUT)
+ break;
+
+ if (msg_len < 0)
+ continue;
+
+ if (ntohl(msg->type) != ECHO_REPLY) {
+ printf("Invalid message on fd %d.\n", fd);
+ continue;
}
+
+ if (ntohl(msg->id) >= client.count) {
+ printf("Invalid id.\n");
+ continue;
+ }
+
+ ++client.rcvd;
+
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ pthread_mutex_lock(&client.lock);
+ ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
+ / 1000.0;
+ pthread_mutex_unlock(&client.lock);
+
+ printf("%d bytes from %s: seq=%d time=%.3f ms\n",
+ msg_len,
+ client.s_apn,
+ ntohl(msg->id),
+ ms);
+
+ if (ms < client.rtt_min)
+ client.rtt_min = ms;
+ if (ms > client.rtt_max)
+ client.rtt_max = ms;
+
+ d = (ms - client.rtt_avg);
+ client.rtt_avg += d / (float) client.rcvd;
+ client.rtt_m2 += d * (ms - client.rtt_avg);
}
return (void *) 0;
@@ -164,20 +162,8 @@ void * writer(void * o)
static int client_init(void)
{
- client.flows = flow_set_create();
- if (client.flows == NULL)
- return -ENOMEM;
-
- client.fq = fqueue_create();
- if (client.fq == NULL) {
- flow_set_destroy(client.flows);
- return -ENOMEM;
- }
-
client.times = malloc(sizeof(struct timespec) * client.count);
if (client.times == NULL) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
pthread_mutex_unlock(&client.lock);
return -ENOMEM;
}
@@ -197,12 +183,6 @@ static int client_init(void)
void client_fini(void)
{
- if (client.flows != NULL)
- flow_set_destroy(client.flows);
-
- if (client.fq != NULL)
- fqueue_destroy(client.fq);
-
if (client.times != NULL)
free(client.times);
}
@@ -235,17 +215,12 @@ int client_main(void)
fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
printf("Failed to allocate flow.\n");
return -1;
}
- flow_set_add(client.flows, fd);
-
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
- flow_set_del(client.flows, fd);
flow_dealloc(fd);
client_fini();
return -1;
@@ -255,7 +230,7 @@ int client_main(void)
clock_gettime(CLOCK_REALTIME, &tic);
- pthread_create(&client.reader_pt, NULL, reader, NULL);
+ pthread_create(&client.reader_pt, NULL, reader, &fd);
pthread_create(&client.writer_pt, NULL, writer, &fd);
pthread_join(client.writer_pt, NULL);
@@ -283,8 +258,6 @@ int client_main(void)
printf("NaN ms\n");
}
- flow_set_del(client.flows, fd);
-
flow_dealloc(fd);
client_fini();
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 8d7ab1db..63fca567 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -115,7 +115,7 @@ void * accept_thread(void * o)
{
int fd = 0;
struct timespec now = {0, 0};
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
@@ -143,7 +143,7 @@ void * accept_thread(void * o)
server.times[fd] = now;
pthread_mutex_unlock(&server.lock);
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR);
+ flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR);
}
return (void *) 0;