diff options
author | Dimitri Staessens <[email protected]> | 2018-07-27 00:18:20 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2018-07-27 08:53:17 +0200 |
commit | bfc29ca20406ccd69363b0f9796987534318e7ae (patch) | |
tree | ac41f39e11cfe989daa10277f14a9597bb5cc3b8 /src/lib/shm_rdrbuff.c | |
parent | 1c7dcc2d37dc5a41379ca08b523bda58a51f11de (diff) | |
download | ouroboros-bfc29ca20406ccd69363b0f9796987534318e7ae.tar.gz ouroboros-bfc29ca20406ccd69363b0f9796987534318e7ae.zip |
lib: Support for rudimentary retransmission
This adds rudimentary support for sending and processing
acknowledgments and doing retransmission.
It replaces the generic timerwheel with a specific one for
retransmission. This is currently a fixed wheel allowing
retransmissions to be scheduled up to about 32 seconds into the
future. It currently has an 8ms resolution. This could be made
configurable in the future. Failures of the flow (i.e. rtx not
working) are indicated by the rxmwheel_move() function returning a fd.
This is currently not yet handled (maybe just setting the state of the
flow to FLOWDOWN is a better solution).
The shm_rdrbuff tracks the number of users of a du_buff. One user is
the full stack, each retransmission will increment the refs counter
(which effectively acts as a semaphore). The refs counter is
decremented when a packet is acked. The du_buff is only allowed to be
removed if there is only one user left (the "stack").
When a packet is retransmitted, it is copied in the rdrbuff. This is
to ensure integrity of the packet when multiple layers do
retransmission and it is passed down the stack again.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/shm_rdrbuff.c')
-rw-r--r-- | src/lib/shm_rdrbuff.c | 45 |
1 files changed, 29 insertions, 16 deletions
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 5ae2085d..182ad084 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -65,11 +65,6 @@ #define shm_rdrb_empty(rdrb) \ (*rdrb->tail == *rdrb->head) -enum shm_du_buff_flags { - SDB_VALID = 0, - SDB_NULL -}; - struct shm_du_buff { size_t size; #ifdef SHM_RDRB_MULTI_BLOCK @@ -77,7 +72,7 @@ struct shm_du_buff { #endif size_t du_head; size_t du_tail; - size_t flags; + size_t refs; size_t idx; }; @@ -96,11 +91,11 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) #ifdef SHM_RDRB_MULTI_BLOCK struct shm_du_buff * sdb; while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) + (sdb = get_tail_ptr(rdrb))->refs == 0) *rdrb->tail = (*rdrb->tail + sdb->blocks) & ((SHM_BUFFER_SIZE) - 1); #else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0) *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_cond_broadcast(rdrb->healthy); @@ -108,7 +103,7 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) static void sanitize(struct shm_rdrbuff * rdrb) { - get_head_ptr(rdrb)->flags = SDB_NULL; + --get_head_ptr(rdrb)->refs; garbage_collect(rdrb); pthread_mutex_consistent(rdrb->lock); } @@ -338,7 +333,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -347,7 +342,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -434,7 +429,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -443,7 +438,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -497,6 +492,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx) { + struct shm_du_buff * sdb; + assert(rdrb); assert(idx < (SHM_BUFFER_SIZE)); @@ -508,10 +505,13 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, #endif assert(!shm_rdrb_empty(rdrb)); - idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; + sdb = idx_to_du_buff_ptr(rdrb, idx); - if (idx == *rdrb->tail) - garbage_collect(rdrb); + if (sdb->refs == 1) { /* only stack needs it, can be removed */ + sdb->refs = 0; + if (idx == *rdrb->tail) + garbage_collect(rdrb); + } pthread_mutex_unlock(rdrb->lock); @@ -603,3 +603,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb, sdb->du_tail = sdb->du_head + len; } + +int shm_du_buff_wait_ack(struct shm_du_buff * sdb) +{ + __sync_add_and_fetch(&sdb->refs, 1); + + return 0; +} + +int shm_du_buff_ack(struct shm_du_buff * sdb) +{ + __sync_sub_and_fetch(&sdb->refs, 1); + return 0; +} |