summaryrefslogtreecommitdiff
path: root/src/tools/oping
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-12-12 13:24:17 +0100
committerdimitri staessens <[email protected]>2016-12-12 15:10:30 +0100
commitf8c14e0246a6c9cb5e8ff47869b5968abb63f010 (patch)
treed91c005451a74822516669f3f7cc3ade34971abb /src/tools/oping
parentb731adbf7b6fa16490f7abf94e2662d82d76cce0 (diff)
downloadouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.tar.gz
ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.zip
src, tools: Set/get timeout and get qos for flows
Receiver timeouts can now be set on a flow using the flow_set_timeout function. Specifying NULL disables the timeout. The flow_get_timeout function gets the value for the timeout. This commit also deprecates fcntl in favor of flow_get_flags and flow_set_flags functions. struct qos_spec is typedef'd as a qosspec_t. The tools and cdap.c are updated to use the new API. Fixes a bug in operf client where the client's writer thread wouldn't cancel on SIGINT.
Diffstat (limited to 'src/tools/oping')
-rw-r--r--src/tools/oping/oping.c3
-rw-r--r--src/tools/oping/oping_client.c115
-rw-r--r--src/tools/oping/oping_server.c4
3 files changed, 46 insertions, 76 deletions
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c
index 98d12a7b..224b182b 100644
--- a/src/tools/oping/oping.c
+++ b/src/tools/oping/oping.c
@@ -54,9 +54,6 @@ struct c {
double rtt_avg;
double rtt_m2;
- flow_set_t * flows;
- fqueue_t * fq;
-
/* needs locking */
struct timespec * times;
pthread_mutex_t lock;
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index b30ba5f4..c439cf46 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -60,56 +60,54 @@ void * reader(void * o)
char buf[OPING_BUF_SIZE];
struct oping_msg * msg = (struct oping_msg *) buf;
- int fd = 0;
+ int fd = *((int *) o);
int msg_len = 0;
double ms = 0;
double d = 0;
- (void) o;
-
- /* FIXME: use flow timeout option once we have it */
- while (client.rcvd != client.count
- && (flow_event_wait(client.flows, client.fq, &timeout)
- != -ETIMEDOUT)) {
- while ((fd = fqueue_next(client.fq)) >= 0) {
- msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
- if (msg_len < 0)
- continue;
-
- if (ntohl(msg->type) != ECHO_REPLY) {
- printf("Invalid message on fd %d.\n", fd);
- continue;
- }
-
- if (ntohl(msg->id) >= client.count) {
- printf("Invalid id.\n");
- continue;
- }
-
- ++client.rcvd;
-
- clock_gettime(CLOCK_REALTIME, &now);
-
- pthread_mutex_lock(&client.lock);
- ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
- / 1000.0;
- pthread_mutex_unlock(&client.lock);
-
- printf("%d bytes from %s: seq=%d time=%.3f ms\n",
- msg_len,
- client.s_apn,
- ntohl(msg->id),
- ms);
-
- if (ms < client.rtt_min)
- client.rtt_min = ms;
- if (ms > client.rtt_max)
- client.rtt_max = ms;
-
- d = (ms - client.rtt_avg);
- client.rtt_avg += d / (float) client.rcvd;
- client.rtt_m2 += d * (ms - client.rtt_avg);
+ flow_set_timeout(fd, &timeout);
+
+ while (client.rcvd != client.count) {
+ msg_len = flow_read(fd, buf, OPING_BUF_SIZE);
+ if (msg_len == -ETIMEDOUT)
+ break;
+
+ if (msg_len < 0)
+ continue;
+
+ if (ntohl(msg->type) != ECHO_REPLY) {
+ printf("Invalid message on fd %d.\n", fd);
+ continue;
}
+
+ if (ntohl(msg->id) >= client.count) {
+ printf("Invalid id.\n");
+ continue;
+ }
+
+ ++client.rcvd;
+
+ clock_gettime(CLOCK_REALTIME, &now);
+
+ pthread_mutex_lock(&client.lock);
+ ms = ts_diff_us(&client.times[ntohl(msg->id)], &now)
+ / 1000.0;
+ pthread_mutex_unlock(&client.lock);
+
+ printf("%d bytes from %s: seq=%d time=%.3f ms\n",
+ msg_len,
+ client.s_apn,
+ ntohl(msg->id),
+ ms);
+
+ if (ms < client.rtt_min)
+ client.rtt_min = ms;
+ if (ms > client.rtt_max)
+ client.rtt_max = ms;
+
+ d = (ms - client.rtt_avg);
+ client.rtt_avg += d / (float) client.rcvd;
+ client.rtt_m2 += d * (ms - client.rtt_avg);
}
return (void *) 0;
@@ -164,20 +162,8 @@ void * writer(void * o)
static int client_init(void)
{
- client.flows = flow_set_create();
- if (client.flows == NULL)
- return -ENOMEM;
-
- client.fq = fqueue_create();
- if (client.fq == NULL) {
- flow_set_destroy(client.flows);
- return -ENOMEM;
- }
-
client.times = malloc(sizeof(struct timespec) * client.count);
if (client.times == NULL) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
pthread_mutex_unlock(&client.lock);
return -ENOMEM;
}
@@ -197,12 +183,6 @@ static int client_init(void)
void client_fini(void)
{
- if (client.flows != NULL)
- flow_set_destroy(client.flows);
-
- if (client.fq != NULL)
- fqueue_destroy(client.fq);
-
if (client.times != NULL)
free(client.times);
}
@@ -235,17 +215,12 @@ int client_main(void)
fd = flow_alloc(client.s_apn, NULL, NULL);
if (fd < 0) {
- flow_set_destroy(client.flows);
- fqueue_destroy(client.fq);
printf("Failed to allocate flow.\n");
return -1;
}
- flow_set_add(client.flows, fd);
-
if (flow_alloc_res(fd)) {
printf("Flow allocation refused.\n");
- flow_set_del(client.flows, fd);
flow_dealloc(fd);
client_fini();
return -1;
@@ -255,7 +230,7 @@ int client_main(void)
clock_gettime(CLOCK_REALTIME, &tic);
- pthread_create(&client.reader_pt, NULL, reader, NULL);
+ pthread_create(&client.reader_pt, NULL, reader, &fd);
pthread_create(&client.writer_pt, NULL, writer, &fd);
pthread_join(client.writer_pt, NULL);
@@ -283,8 +258,6 @@ int client_main(void)
printf("NaN ms\n");
}
- flow_set_del(client.flows, fd);
-
flow_dealloc(fd);
client_fini();
diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c
index 8d7ab1db..63fca567 100644
--- a/src/tools/oping/oping_server.c
+++ b/src/tools/oping/oping_server.c
@@ -115,7 +115,7 @@ void * accept_thread(void * o)
{
int fd = 0;
struct timespec now = {0, 0};
- struct qos_spec qs;
+ qosspec_t qs;
(void) o;
@@ -143,7 +143,7 @@ void * accept_thread(void * o)
server.times[fd] = now;
pthread_mutex_unlock(&server.lock);
- flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR);
+ flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR);
}
return (void *) 0;