From e03dedf0a4c40ceeb063f95777bc99628a980ec9 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Mon, 12 Mar 2018 17:27:10 +0100 Subject: lib: Allow partial read This implements partial read of packets if the buffer supplied to flow_read() is smaller than the packet in the buffer. If the number of bytes returned by flow_read equals the size of the buffer, the next read() will deliver the next bytes of the packet (or 0 if the packet was exactly the size of the buffer on the previous read). Implements #7. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/dev.c | 59 +++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/lib/dev.c b/src/lib/dev.c index 3564c293..115cd565 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -52,12 +52,14 @@ #include #include -#define BUF_SIZE 1500 - #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif +/* Partial read information. */ +#define NO_PART -1 +#define DONE_PART -2 + struct flow_set { size_t idx; }; @@ -92,6 +94,7 @@ struct flow { int oflags; qoscube_t cube; qosspec_t spec; + ssize_t part_idx; pid_t pid; @@ -285,11 +288,12 @@ static int flow_init(int port_id, if (ai.flows[fd].set == NULL) goto fail; - ai.flows[fd].port_id = port_id; - ai.flows[fd].oflags = FLOWFDEFAULT; - ai.flows[fd].pid = pid; - ai.flows[fd].cube = qc; - ai.flows[fd].spec = qos_cube_to_spec(qc); + ai.flows[fd].port_id = port_id; + ai.flows[fd].oflags = FLOWFDEFAULT; + ai.flows[fd].pid = pid; + ai.flows[fd].cube = qc; + ai.flows[fd].spec = qos_cube_to_spec(qc); + ai.flows[fd].part_idx = NO_PART; ai.ports[port_id].fd = fd; @@ -899,6 +903,11 @@ ssize_t flow_read(int fd, flow = &ai.flows[fd]; + if (flow->part_idx == DONE_PART) { + flow->part_idx = NO_PART; + return 0; + } + clock_gettime(PTHREAD_COND_CLOCK, &abs); pthread_rwlock_rdlock(&ai.lock); @@ -918,26 +927,36 @@ ssize_t flow_read(int fd, pthread_rwlock_unlock(&ai.lock); - idx = frcti_queued_pdu(flow->frcti); + idx = flow->part_idx; if (idx < 0) { - do { - idx = noblock ? shm_rbuff_read(rb) : - shm_rbuff_read_b(rb, abstime); - if (idx < 0) - return idx; - sdb = shm_rdrbuff_get(ai.rdrb, idx); - } while (frcti_rcv(flow->frcti, sdb) != 0); + idx = frcti_queued_pdu(flow->frcti); + if (idx < 0) { + do { + idx = noblock ? shm_rbuff_read(rb) : + shm_rbuff_read_b(rb, abstime); + if (idx < 0) + return idx; + sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, sdb) != 0); + } } n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); assert(n >= 0); - memcpy(buf, sdu, MIN((size_t) n, count)); - - shm_rdrbuff_remove(ai.rdrb, idx); - - return n; + if (n <= (ssize_t) count) { + memcpy(buf, sdu, n); + shm_rdrbuff_remove(ai.rdrb, idx); + flow->part_idx = (n == (ssize_t) count) ? DONE_PART : NO_PART; + return n; + } else { + memcpy(buf, sdu, count); + sdb = shm_rdrbuff_get(ai.rdrb, idx); + shm_du_buff_head_release(sdb, n); + flow->part_idx = idx; + return count; + } } /* fqueue functions. */ -- cgit v1.2.3