summaryrefslogtreecommitdiff
path: root/src/tools
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-12-12 13:24:17 +0100
committerdimitri staessens <[email protected]>2016-12-12 15:10:30 +0100
commitf8c14e0246a6c9cb5e8ff47869b5968abb63f010 (patch)
treed91c005451a74822516669f3f7cc3ade34971abb /src/tools
parentb731adbf7b6fa16490f7abf94e2662d82d76cce0 (diff)
downloadouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.tar.gz
ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.zip
src, tools: Set/get timeout and get qos for flows
Receiver timeouts can now be set on a flow using the flow_set_timeout function. Specifying NULL disables the timeout. The flow_get_timeout function gets the value for the timeout. This commit also deprecates fcntl in favor of flow_get_flags and flow_set_flags functions. struct qos_spec is typedef'd as a qosspec_t. The tools and cdap.c are updated to use the new API. Fixes a bug in operf client where the client's writer thread wouldn't cancel on SIGINT.
Diffstat (limited to 'src/tools')
-rw-r--r--src/tools/cbr/cbr_server.c4
-rw-r--r--src/tools/echo/echo_server.c2
-rw-r--r--src/tools/operf/operf.c3
-rw-r--r--src/tools/operf/operf_client.c68
-rw-r--r--src/tools/operf/operf_server.c2
-rw-r--r--src/tools/oping/oping.c3
-rw-r--r--src/tools/oping/oping_client.c115
-rw-r--r--src/tools/oping/oping_server.c4
8 files changed, 64 insertions, 137 deletions
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 104c5e9e..64055cfb 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -85,7 +85,7 @@ static void handle_flow(int fd)
alive = iv_start;
ts_add(&iv_start, &intv, &iv_end);
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);
+ flow_set_flags(fd, FLOW_O_NONBLOCK);
while (!stop) {
clock_gettime(CLOCK_REALTIME, &now);
@@ -157,7 +157,7 @@ static void * listener(void * o)
{
int client_fd = 0;
int response = 0;
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
diff --git a/src/tools/echo/echo_server.c b/src/tools/echo/echo_server.c
index 09575364..c369d3e6 100644
--- a/src/tools/echo/echo_server.c
+++ b/src/tools/echo/echo_server.c
@@ -40,7 +40,7 @@ int server_main(void)
int client_fd = 0;
char buf[BUF_SIZE];
ssize_t count = 0;
- struct qos_spec qs;
+ qosspec_t qs;
printf("Starting the server.\n");
diff --git a/src/tools/operf/operf.c b/src/tools/operf/operf.c
index bc7ade3a..1716a598 100644
--- a/src/tools/operf/operf.c
+++ b/src/tools/operf/operf.c
@@ -47,9 +47,6 @@ struct c {
unsigned long sent;
unsigned long rcvd;
- flow_set_t * flows;
- fqueue_t * fq;
-
pthread_t reader_pt;
pthread_t writer_pt;
} client;
diff --git a/src/tools/operf/operf_client.c b/src/tools/operf/operf_client.c
index 902a7b41..44f25893 100644
--- a/src/tools/operf/operf_client.c
+++ b/src/tools/operf/operf_client.c
@@ -22,6 +22,7 @@
*/
#include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
#include <ouroboros/time_utils.h>
#ifdef __FreeBSD__
@@ -45,6 +46,7 @@ static void busy_wait_until(const struct timespec * deadline)
while (now.tv_sec == deadline->tv_sec
&& now.tv_nsec < deadline->tv_nsec)
clock_gettime(CLOCK_REALTIME, &now);
+ pthread_testcancel();
}
void shutdown_client(int signo, siginfo_t * info, void * c)
@@ -68,23 +70,20 @@ void * reader(void * o)
struct timespec timeout = {2, 0};
char buf[OPERF_BUF_SIZE];
- int fd = 0;
+ int fd = *((int *) o);
int msg_len = 0;
- (void) o;
+ flow_set_timeout(fd, &timeout);
- /* FIXME: use flow timeout option once we have it */
- while (flow_event_wait(client.flows, client.fq, &timeout) != -ETIMEDOUT)
- while ((fd = fqueue_next(client.fq)) >= 0) {
- msg_len = flow_read(fd, buf, OPERF_BUF_SIZE);
- if (msg_len != client.size) {
- printf("Invalid message on fd %d.\n", fd);
- continue;
- }
-
- ++client.rcvd;
+ while ((msg_len = flow_read(fd, buf, OPERF_BUF_SIZE)) != -ETIMEDOUT) {
+ if (msg_len != client.size) {
+ printf("Invalid message on fd %d.\n", fd);
+ continue;
}
+ ++client.rcvd;
+ }
+
return (void *) 0;
}
@@ -160,33 +159,6 @@ void * writer(void * o)
return (void *) 0;
}
-static int client_init(void)
-{
- client.flows = flow_set_create();
- if (client.flows == NULL)
- return -ENOMEM;
-
- client.fq = fqueue_create();
- if (client.fq == NULL) {
- flow_set_destroy(client.flows);
- return -ENOMEM;
- }
-
- client.sent = 0;
- client.rcvd = 0;
-
- return 0;
-}
-
-void client_fini(void)
-{
- if (client.flows != NULL)
- flow_set_destroy(client.flows);
-
- if (client.fq != NULL)
- fqueue_destroy(client.fq);
-}
-
int client_main(void)
{
struct sigaction sig_act;
@@ -208,32 +180,24 @@ int client_main(void)
return -1;
}
- if (client_init()) {
- printf("Failed to initialize client.\n");
- return -1;
- }
+ client.sent = 0;
+ client.rcvd = 0;
fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
printf("Failed to allocate flow.\n");
return -1;
}
- flow_set_add(client.flows, fd);
-
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
- flow_set_del(client.flows, fd);
flow_dealloc(fd);
- client_fini();
return -1;
}
clock_gettime(CLOCK_REALTIME, &tic);
- pthread_create(&client.reader_pt, NULL, reader, NULL);
+ pthread_create(&client.reader_pt, NULL, reader, &fd);
pthread_create(&client.writer_pt, NULL, writer, &fd);
pthread_join(client.writer_pt, NULL);
@@ -253,11 +217,7 @@ int client_main(void)
(client.rcvd * client.size * 8)
/ (double) ts_diff_us(&tic, &toc));
- flow_set_del(client.flows, fd);
-
flow_dealloc(fd);
- client_fini();
-
return 0;
}
diff --git a/src/tools/operf/operf_server.c b/src/tools/operf/operf_server.c
index 4eb93879..340103d2 100644
--- a/src/tools/operf/operf_server.c
+++ b/src/tools/operf/operf_server.c
@@ -102,7 +102,7 @@ void * accept_thread(void * o)
{
int fd = 0;
struct timespec now = {0, 0};
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 98d12a7b..224b182b 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -54,9 +54,6 @@ struct c {
double rtt_avg;
double rtt_m2;
- flow_set_t * flows;
- fqueue_t * fq;
-
/* needs locking */
struct timespec * times;
pthread_mutex_t lock;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index b30ba5f4..c439cf46 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -60,56 +60,54 @@ void * reader(void * o)
char buf[OPING_BUF_SIZE];
struct oping_msg * msg = (struct oping_msg *) buf;
- int fd = 0;
+ int fd = *((int *) o);
int msg_len = 0;
double ms = 0;
double d = 0;
- (void) o;
-
- /* FIXME: use flow timeout option once we have it */
- while (client.rcvd != client.count
- && (flow_event_wait(client.flows, client.fq, &timeout)
- != -ETIMEDOUT)) {
- while ((fd = fqueue_next(client.fq)) >= 0) {
- msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
- if (msg_len < 0)
- continue;
-
- if (ntohl(msg->type) != ECHO_REPLY) {
- printf("Invalid message on fd %d.\n", fd);
- continue;
- }
-
- if (ntohl(msg->id) >= client.count) {
- printf("Invalid id.\n");
- continue;
- }
-
- ++client.rcvd;
-
- clock_gettime(CLOCK_REALTIME, &now);
-
- pthread_mutex_lock(&client.lock);
- ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
- / 1000.0;
- pthread_mutex_unlock(&client.lock);
-
- printf("%d bytes from %s: seq=%d time=%.3f ms\n",
- msg_len,
- client.s_apn,
- ntohl(msg->id),
- ms);
-
- if (ms < client.rtt_min)
- client.rtt_min = ms;
- if (ms > client.rtt_max)
- client.rtt_max = ms;
-
- d = (ms - client.rtt_avg);
- client.rtt_avg += d / (float) client.rcvd;
- client.rtt_m2 += d * (ms - client.rtt_avg);
+ flow_set_timeout(fd, &timeout);
+
+ while (client.rcvd != client.count) {
+ msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
+ if (msg_len == -ETIMEDOUT)
+ break;
+
+ if (msg_len < 0)
+ continue;
+
+ if (ntohl(msg->type) != ECHO_REPLY) {
+ printf("Invalid message on fd %d.\n", fd);
+ continue;
}
+
+ if (ntohl(msg->id) >= client.count) {
+ printf("Invalid id.\n");
+ continue;
+ }
+
+ ++client.rcvd;
+
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ pthread_mutex_lock(&client.lock);
+ ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
+ / 1000.0;
+ pthread_mutex_unlock(&client.lock);
+
+ printf("%d bytes from %s: seq=%d time=%.3f ms\n",
+ msg_len,
+ client.s_apn,
+ ntohl(msg->id),
+ ms);
+
+ if (ms < client.rtt_min)
+ client.rtt_min = ms;
+ if (ms > client.rtt_max)
+ client.rtt_max = ms;
+
+ d = (ms - client.rtt_avg);
+ client.rtt_avg += d / (float) client.rcvd;
+ client.rtt_m2 += d * (ms - client.rtt_avg);
}
return (void *) 0;
@@ -164,20 +162,8 @@ void * writer(void * o)
static int client_init(void)
{
- client.flows = flow_set_create();
- if (client.flows == NULL)
- return -ENOMEM;
-
- client.fq = fqueue_create();
- if (client.fq == NULL) {
- flow_set_destroy(client.flows);
- return -ENOMEM;
- }
-
client.times = malloc(sizeof(struct timespec) * client.count);
if (client.times == NULL) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
pthread_mutex_unlock(&client.lock);
return -ENOMEM;
}
@@ -197,12 +183,6 @@ static int client_init(void)
void client_fini(void)
{
- if (client.flows != NULL)
- flow_set_destroy(client.flows);
-
- if (client.fq != NULL)
- fqueue_destroy(client.fq);
-
if (client.times != NULL)
free(client.times);
}
@@ -235,17 +215,12 @@ int client_main(void)
fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
printf("Failed to allocate flow.\n");
return -1;
}
- flow_set_add(client.flows, fd);
-
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
- flow_set_del(client.flows, fd);
flow_dealloc(fd);
client_fini();
return -1;
@@ -255,7 +230,7 @@ int client_main(void)
clock_gettime(CLOCK_REALTIME, &tic);
- pthread_create(&client.reader_pt, NULL, reader, NULL);
+ pthread_create(&client.reader_pt, NULL, reader, &fd);
pthread_create(&client.writer_pt, NULL, writer, &fd);
pthread_join(client.writer_pt, NULL);
@@ -283,8 +258,6 @@ int client_main(void)
printf("NaN ms\n");
}
- flow_set_del(client.flows, fd);
-
flow_dealloc(fd);
client_fini();
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 8d7ab1db..63fca567 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -115,7 +115,7 @@ void * accept_thread(void * o)
{
int fd = 0;
struct timespec now = {0, 0};
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
@@ -143,7 +143,7 @@ void * accept_thread(void * o)
server.times[fd] = now;
pthread_mutex_unlock(&server.lock);
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR);
+ flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR);
}
return (void *) 0;