diff options
author | dimitri staessens <[email protected]> | 2016-12-12 13:24:17 +0100 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-12-12 15:10:30 +0100 |
commit | f8c14e0246a6c9cb5e8ff47869b5968abb63f010 (patch) | |
tree | d91c005451a74822516669f3f7cc3ade34971abb /src/tools/oping | |
parent | b731adbf7b6fa16490f7abf94e2662d82d76cce0 (diff) | |
download | ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.tar.gz ouroboros-f8c14e0246a6c9cb5e8ff47869b5968abb63f010.zip |
src, tools: Set/get timeout and get qos for flows
Receiver timeouts can now be set on a flow using the flow_set_timeout
function. Specifying NULL disables the timeout. The flow_get_timeout
function gets the value for the timeout.
This commit also deprecates fcntl in favor of flow_get_flags and
flow_set_flags functions.
struct qos_spec is typedef'd as a qosspec_t.
The tools and cdap.c are updated to use the new API.
Fixes a bug in operf client where the client's writer thread wouldn't
cancel on SIGINT.
Diffstat (limited to 'src/tools/oping')
-rw-r--r-- | src/tools/oping/oping.c | 3 | ||||
-rw-r--r-- | src/tools/oping/oping_client.c | 115 | ||||
-rw-r--r-- | src/tools/oping/oping_server.c | 4 |
3 files changed, 46 insertions, 76 deletions
diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 98d12a7b..224b182b 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -54,9 +54,6 @@ struct c { double rtt_avg; double rtt_m2; - flow_set_t * flows; - fqueue_t * fq; - /* needs locking */ struct timespec * times; pthread_mutex_t lock; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index b30ba5f4..c439cf46 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -60,56 +60,54 @@ void * reader(void * o) char buf[OPING_BUF_SIZE]; struct oping_msg * msg = (struct oping_msg *) buf; - int fd = 0; + int fd = *((int *) o); int msg_len = 0; double ms = 0; double d = 0; - (void) o; - - /* FIXME: use flow timeout option once we have it */ - while (client.rcvd != client.count - && (flow_event_wait(client.flows, client.fq, &timeout) - != -ETIMEDOUT)) { - while ((fd = fqueue_next(client.fq)) >= 0) { - msg_len = flow_read(fd, buf, OPING_BUF_SIZE); - if (msg_len < 0) - continue; - - if (ntohl(msg->type) != ECHO_REPLY) { - printf("Invalid message on fd %d.\n", fd); - continue; - } - - if (ntohl(msg->id) >= client.count) { - printf("Invalid id.\n"); - continue; - } - - ++client.rcvd; - - clock_gettime(CLOCK_REALTIME, &now); - - pthread_mutex_lock(&client.lock); - ms = ts_diff_us(&client.times[ntohl(msg->id)], &now) - / 1000.0; - pthread_mutex_unlock(&client.lock); - - printf("%d bytes from %s: seq=%d time=%.3f ms\n", - msg_len, - client.s_apn, - ntohl(msg->id), - ms); - - if (ms < client.rtt_min) - client.rtt_min = ms; - if (ms > client.rtt_max) - client.rtt_max = ms; - - d = (ms - client.rtt_avg); - client.rtt_avg += d / (float) client.rcvd; - client.rtt_m2 += d * (ms - client.rtt_avg); + flow_set_timeout(fd, &timeout); + + while (client.rcvd != client.count) { + msg_len = flow_read(fd, buf, OPING_BUF_SIZE); + if (msg_len == -ETIMEDOUT) + break; + + if (msg_len < 0) + continue; + + if (ntohl(msg->type) != ECHO_REPLY) { + printf("Invalid message on fd %d.\n", fd); + continue; } + + if (ntohl(msg->id) >= client.count) { + printf("Invalid id.\n"); + continue; + } + + ++client.rcvd; + + clock_gettime(CLOCK_REALTIME, &now); + + pthread_mutex_lock(&client.lock); + ms = ts_diff_us(&client.times[ntohl(msg->id)], &now) + / 1000.0; + pthread_mutex_unlock(&client.lock); + + printf("%d bytes from %s: seq=%d time=%.3f ms\n", + msg_len, + client.s_apn, + ntohl(msg->id), + ms); + + if (ms < client.rtt_min) + client.rtt_min = ms; + if (ms > client.rtt_max) + client.rtt_max = ms; + + d = (ms - client.rtt_avg); + client.rtt_avg += d / (float) client.rcvd; + client.rtt_m2 += d * (ms - client.rtt_avg); } return (void *) 0; @@ -164,20 +162,8 @@ void * writer(void * o) static int client_init(void) { - client.flows = flow_set_create(); - if (client.flows == NULL) - return -ENOMEM; - - client.fq = fqueue_create(); - if (client.fq == NULL) { - flow_set_destroy(client.flows); - return -ENOMEM; - } - client.times = malloc(sizeof(struct timespec) * client.count); if (client.times == NULL) { - flow_set_destroy(client.flows); - fqueue_destroy(client.fq); pthread_mutex_unlock(&client.lock); return -ENOMEM; } @@ -197,12 +183,6 @@ static int client_init(void) void client_fini(void) { - if (client.flows != NULL) - flow_set_destroy(client.flows); - - if (client.fq != NULL) - fqueue_destroy(client.fq); - if (client.times != NULL) free(client.times); } @@ -235,17 +215,12 @@ int client_main(void) fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { - flow_set_destroy(client.flows); - fqueue_destroy(client.fq); printf("Failed to allocate flow.\n"); return -1; } - flow_set_add(client.flows, fd); - if (flow_alloc_res(fd)) { printf("Flow allocation refused.\n"); - flow_set_del(client.flows, fd); flow_dealloc(fd); client_fini(); return -1; @@ -255,7 +230,7 @@ int client_main(void) clock_gettime(CLOCK_REALTIME, &tic); - pthread_create(&client.reader_pt, NULL, reader, NULL); + pthread_create(&client.reader_pt, NULL, reader, &fd); pthread_create(&client.writer_pt, NULL, writer, &fd); pthread_join(client.writer_pt, NULL); @@ -283,8 +258,6 @@ int client_main(void) printf("NaN ms\n"); } - flow_set_del(client.flows, fd); - flow_dealloc(fd); client_fini(); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 8d7ab1db..63fca567 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -115,7 +115,7 @@ void * accept_thread(void * o) { int fd = 0; struct timespec now = {0, 0}; - struct qos_spec qs; + qosspec_t qs; (void) o; @@ -143,7 +143,7 @@ void * accept_thread(void * o) server.times[fd] = now; pthread_mutex_unlock(&server.lock); - flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK | FLOW_O_RDWR); + flow_set_flags(fd, FLOW_O_NONBLOCK | FLOW_O_RDWR); } return (void *) 0; |