diff options
author | Dimitri Staessens <[email protected]> | 2020-05-01 18:23:58 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2020-05-02 11:34:28 +0200 |
commit | 25d1721e7dc9fa15c8a7c5513f30e636e9bda397 (patch) | |
tree | 9a012ab53513ffc78bf122e448045cc6084c13a4 /src/lib | |
parent | 6415d0f683dbe5f20d4d00c74bf75a795753f444 (diff) | |
download | ouroboros-25d1721e7dc9fa15c8a7c5513f30e636e9bda397.tar.gz ouroboros-25d1721e7dc9fa15c8a7c5513f30e636e9bda397.zip |
lib: Create an rxmwheel per flow
The single retransmission wheel caused locking headaches as the calls
for different flows could block on the same rxmwheel. This stabilizes
the stack, but if the rdrbuff gets full there can now be big delays.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/dev.c | 21 | ||||
-rw-r--r-- | src/lib/frct.c | 86 | ||||
-rw-r--r-- | src/lib/rxmwheel.c | 159 | ||||
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 11 |
4 files changed, 149 insertions, 128 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 80d7e9ad..e8989a48 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -429,9 +429,6 @@ static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - if (rxmwheel_init()) - goto fail_rxmwheel; - ai.fqset = shm_flow_set_open(getpid()); if (ai.fqset == NULL) goto fail_fqset; @@ -439,8 +436,6 @@ static void init(int argc, return; fail_fqset: - rxmwheel_fini(); - fail_rxmwheel: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -474,8 +469,6 @@ static void fini(void) if (ai.fds == NULL) return; - rxmwheel_fini(); - if (ai.prog != NULL) free(ai.prog); @@ -1080,15 +1073,16 @@ ssize_t flow_read(int fd, flow = &ai.flows[fd]; + clock_gettime(PTHREAD_COND_CLOCK, &abs); + + pthread_rwlock_rdlock(&ai.lock); + if (flow->part_idx == DONE_PART) { + pthread_rwlock_unlock(&ai.lock); flow->part_idx = NO_PART; return 0; } - clock_gettime(PTHREAD_COND_CLOCK, &abs); - - pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; @@ -1141,8 +1135,13 @@ ssize_t flow_read(int fd, if (n <= (ssize_t) count) { memcpy(buf, packet, n); shm_rdrbuff_remove(ai.rdrb, idx); + + pthread_rwlock_wrlock(&ai.lock); + flow->part_idx = (partrd && n == (ssize_t) count) ? DONE_PART : NO_PART; + + pthread_rwlock_unlock(&ai.lock); return n; } else { if (partrd) { diff --git a/src/lib/frct.c b/src/lib/frct.c index 0e9d64c7..3c180128 100644 --- a/src/lib/frct.c +++ b/src/lib/frct.c @@ -21,15 +21,12 @@ */ /* Default Delta-t parameters */ -#define DELT_MPL 60000 /* ms */ -#define DELT_A 3000 /* ms */ -#define DELT_R 20000 /* ms */ +#define DELT_MPL (60 * MILLION) /* us */ +#define DELT_A (1 * MILLION) /* us */ +#define DELT_R (20 * MILLION) /* us */ #define RQ_SIZE 1024 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - #define FRCT_PCILEN (sizeof(struct frct_pci)) struct frct_cr { @@ -44,24 +41,26 @@ struct frct_cr { }; struct frcti { - int fd; + int fd; + + time_t mpl; + time_t a; + time_t r; - time_t mpl; - time_t a; - time_t r; + time_t srtt_us; /* smoothed rtt */ + time_t mdev_us; /* mdev */ + time_t rto; /* retransmission timeout */ + uint32_t rttseq; + struct timespec t_probe; /* probe time */ + bool probe; /* probe active */ - time_t srtt_us; /* smoothed rtt */ - time_t mdev_us; /* mdev */ - time_t rto; /* retransmission timeout */ - uint32_t rttseq; - struct timespec t_probe; /* probe time */ - bool probe; /* probe active */ + struct frct_cr snd_cr; + struct frct_cr rcv_cr; - struct frct_cr snd_cr; - struct frct_cr rcv_cr; + struct rxmwheel * rw; - ssize_t rq[RQ_SIZE]; - pthread_rwlock_t lock; + ssize_t rq[RQ_SIZE]; + pthread_rwlock_t lock; }; enum frct_flags { @@ -111,28 +110,35 @@ static struct frcti * frcti_create(int fd) frcti->r = DELT_R; frcti->fd = fd; - delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + delta_t = frcti->mpl + frcti->a + frcti->r; - frcti->snd_cr.inact = 3 * delta_t; - frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); + frcti->snd_cr.inact = 3 * delta_t / MILLION; /* s */ + frcti->snd_cr.act = now.tv_sec - (frcti->snd_cr.inact + 1); - frcti->rttseq = 0; - frcti->probe = false; + frcti->rttseq = 0; + frcti->probe = false; - frcti->srtt_us = 0; /* updated on first ACK */ - frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ - frcti->rto = 20000; /* initial rxm will be after 20 ms */ + frcti->srtt_us = 0; /* updated on first ACK */ + frcti->mdev_us = 10000; /* initial rxm will be after 20 ms */ + frcti->rto = 20000; /* initial rxm will be after 20 ms */ + frcti->rw = NULL; if (ai.flows[fd].qs.loss == 0) { frcti->snd_cr.cflags |= FRCTFRTX; frcti->rcv_cr.cflags |= FRCTFRTX; + frcti->rw = rxmwheel_create(); + if (frcti->rw == NULL) + goto fail_rw; } - frcti->rcv_cr.inact = 2 * delta_t; - frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + frcti->rcv_cr.inact = 2 * delta_t / MILLION; /* s */ + frcti->rcv_cr.act = now.tv_sec - (frcti->rcv_cr.inact + 1); + return frcti; + fail_rw: + pthread_rwlock_destroy(&frcti->lock); fail_lock: free(frcti); fail_malloc: @@ -146,7 +152,8 @@ static void frcti_destroy(struct frcti * frcti) * make sure everything we sent is acked. */ - rxmwheel_clear(frcti->fd); + if (frcti->rw != NULL) + rxmwheel_destroy(frcti->rw); pthread_rwlock_destroy(&frcti->lock); @@ -229,13 +236,15 @@ static int __frcti_snd(struct frcti * frcti, struct timespec now; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; + uint32_t seqno; assert(frcti); snd_cr = &frcti->snd_cr; rcv_cr = &frcti->rcv_cr; - rxmwheel_move(); + if (frcti->rw != NULL) + rxmwheel_move(frcti->rw); pci = frcti_alloc_head(sdb); if (pci == NULL) @@ -259,7 +268,9 @@ static int __frcti_snd(struct frcti * frcti, frcti->snd_cr.lwe = snd_cr->seqno - 1; } - pci->seqno = hton32(snd_cr->seqno); + seqno = snd_cr->seqno; + pci->seqno = hton32(seqno); + if (!(snd_cr->cflags & FRCTFRTX)) { snd_cr->lwe++; } else { @@ -269,8 +280,6 @@ static int __frcti_snd(struct frcti * frcti, frcti->probe = true; } - rxmwheel_add(frcti, snd_cr->seqno, sdb); - if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) { pci->flags |= FRCT_ACK; pci->ackno = hton32(rcv_cr->lwe); @@ -282,6 +291,9 @@ static int __frcti_snd(struct frcti * frcti, pthread_rwlock_unlock(&frcti->lock); + if (frcti->rw != NULL) + rxmwheel_add(frcti->rw, frcti, seqno, sdb); + return 0; } @@ -384,13 +396,13 @@ static int __frcti_rcv(struct frcti * frcti, if (ret == 0 && !(pci->flags & FRCT_DATA)) shm_rdrbuff_remove(ai.rdrb, idx); - rxmwheel_move(); + if (frcti->rw != NULL) + rxmwheel_move(frcti->rw); return ret; drop_packet: pthread_rwlock_unlock(&frcti->lock); shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; } diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c index 28cd78de..dbdd9377 100644 --- a/src/lib/rxmwheel.c +++ b/src/lib/rxmwheel.c @@ -29,7 +29,6 @@ #define RXMQ_MAX (1 << RXMQ_M) /* us */ /* Small inacurracy to avoid slow division by MILLION. */ -#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20)) #define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10)) #define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1)) @@ -39,26 +38,28 @@ struct rxm { struct shm_du_buff * sdb; uint8_t * head; uint8_t * tail; - time_t t0; /* Time when original was sent (s). */ + time_t t0; /* Time when original was sent (us). */ size_t mul; /* RTO multiplier. */ struct frcti * frcti; }; -struct { +struct rxmwheel { struct list_head wheel[RXMQ_SLOTS]; size_t prv; /* Last processed slot. */ pthread_mutex_t lock; -} rw; +}; -static void rxmwheel_fini(void) +static void rxmwheel_destroy(struct rxmwheel * rw) { size_t i; struct list_head * p; struct list_head * h; + pthread_mutex_destroy(&rw->lock); + for (i = 0; i < RXMQ_SLOTS; ++i) { - list_for_each_safe(p, h, &rw.wheel[i]) { + list_for_each_safe(p, h, &rw->wheel[i]) { struct rxm * rxm = list_entry(p, struct rxm, next); list_del(&rxm->next); shm_du_buff_ack(rxm->sdb); @@ -68,66 +69,49 @@ static void rxmwheel_fini(void) } } -static int rxmwheel_init(void) +static struct rxmwheel * rxmwheel_create(void) { - struct timespec now; - size_t i; + struct rxmwheel * rw; + struct timespec now; + size_t i; - if (pthread_mutex_init(&rw.lock, NULL)) - return -1; + rw = malloc(sizeof(*rw)); + if (rw == NULL) + return NULL; + + if (pthread_mutex_init(&rw->lock, NULL)) { + free(rw); + return NULL; + } clock_gettime(PTHREAD_COND_CLOCK, &now); /* Mark the previous timeslot as the last one processed. */ - rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); + rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1); for (i = 0; i < RXMQ_SLOTS; ++i) - list_head_init(&rw.wheel[i]); - - return 0; -} - -static void rxmwheel_clear(int fd) -{ - size_t i; + list_head_init(&rw->wheel[i]); - /* FIXME: Add list element to avoid looping over full rxmwheel. */ - pthread_mutex_lock(&rw.lock); - - for (i = 0; i < RXMQ_SLOTS; ++i) { - struct list_head * p; - struct list_head * h; - - list_for_each_safe(p, h, &rw.wheel[i]) { - struct rxm * r = list_entry(p, struct rxm, next); - if (r->frcti->fd == fd) { - list_del(&r->next); - shm_du_buff_ack(r->sdb); - ipcp_sdb_release(r->sdb); - free(r); - } - } - } - - pthread_mutex_unlock(&rw.lock); + return rw; } static void check_probe(struct frcti * frcti, uint32_t seqno) { - /* disable rtt probe if this packet */ + /* Disable rtt probe on retransmitted packet! */ - /* TODO: This should be locked, but lock reversal! */ + pthread_rwlock_wrlock(&frcti->lock); if (frcti->probe && ((frcti->rttseq + 1) == seqno)) { /* Backoff to avoid never updating rtt */ frcti->srtt_us += frcti->mdev_us; frcti->probe = false; } + + pthread_rwlock_unlock(&frcti->lock); } -/* Return fd on r-timer expiry. */ -static int rxmwheel_move(void) +static void rxmwheel_move(struct rxmwheel * rw) { struct timespec now; struct list_head * p; @@ -135,19 +119,22 @@ static int rxmwheel_move(void) size_t slot; size_t i; - pthread_mutex_lock(&rw.lock); + pthread_mutex_lock(&rw->lock); + + pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, + (void *) &rw->lock); clock_gettime(PTHREAD_COND_CLOCK, &now); slot = ts_to_slot(now); - i = rw.prv; + i = rw->prv; if (slot < i) slot += RXMQ_SLOTS; while (i++ < slot) { - list_for_each_safe(p, h, &rw.wheel[i & (RXMQ_SLOTS - 1)]) { + list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) { struct rxm * r; struct frct_cr * snd_cr; struct frct_cr * rcv_cr; @@ -156,42 +143,55 @@ static int rxmwheel_move(void) struct shm_du_buff * sdb; uint8_t * head; struct flow * f; + int fd; + uint32_t snd_lwe; + uint32_t rcv_lwe; + time_t rto; r = list_entry(p, struct rxm, next); + list_del(&r->next); snd_cr = &r->frcti->snd_cr; rcv_cr = &r->frcti->rcv_cr; + fd = r->frcti->fd; + f = &ai.flows[fd]; shm_du_buff_ack(r->sdb); + pthread_rwlock_rdlock(&r->frcti->lock); + + snd_lwe = snd_cr->lwe; + rcv_lwe = rcv_cr->lwe; + rto = r->frcti->rto; + + pthread_rwlock_unlock(&r->frcti->lock); + /* Has been ack'd, remove. */ - if ((int) (r->seqno - snd_cr->lwe) < 0) { + if ((int) (r->seqno - snd_lwe) < 0) { ipcp_sdb_release(r->sdb); free(r); continue; } - /* Disable using this seqno as rto probe. */ - check_probe(r->frcti, r->seqno); - /* Check for r-timer expiry. */ - if (ts_to_ms(now) - r->t0 > r->frcti->r) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); + if (ts_to_us(now) - r->t0 > r->frcti->r) { ipcp_sdb_release(r->sdb); free(r); - return fd; + shm_rbuff_set_acl(ai.flows[fd].rx_rb, + ACL_FLOWDOWN); + shm_rbuff_set_acl(ai.flows[fd].tx_rb, + ACL_FLOWDOWN); + continue; } /* Copy the payload, safe rtx in other layers. */ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) { - /* FIXME: reschedule send instead of failing? */ - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); ipcp_sdb_release(r->sdb); free(r); - return fd; + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + continue; } idx = shm_du_buff_get_idx(sdb); @@ -202,20 +202,20 @@ static int rxmwheel_move(void) /* Release the old copy. */ ipcp_sdb_release(r->sdb); + /* Disable using this seqno as rto probe. */ + check_probe(r->frcti, r->seqno); + /* Update ackno and make sure DRF is not set. */ - ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe); + ((struct frct_pci *) head)->ackno = ntoh32(rcv_lwe); ((struct frct_pci *) head)->flags &= ~FRCT_DRF; - f = &ai.flows[r->frcti->fd]; - - /* Retransmit the copy. FIXME: cancel flow */ - if (shm_rbuff_write(f->tx_rb, idx)) { - int fd = r->frcti->fd; - pthread_mutex_unlock(&rw.lock); + /* Retransmit the copy. */ + if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) { ipcp_sdb_release(sdb); free(r); - /* FIXME: reschedule send? */ - return fd; + shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN); + shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN); + continue; } /* Reschedule. */ @@ -228,21 +228,20 @@ static int rxmwheel_move(void) r->sdb = sdb; /* Schedule at least in the next time slot */ - rslot = (slot + MAX((f->frcti->rto >> RXMQ_R), 1)) + rslot = (slot + MAX(rto >> RXMQ_R, 1)) & (RXMQ_SLOTS - 1); - list_add_tail(&r->next, &rw.wheel[rslot]); + list_add_tail(&r->next, &rw->wheel[rslot]); } } - rw.prv = slot & (RXMQ_SLOTS - 1); - - pthread_mutex_unlock(&rw.lock); + rw->prv = slot & (RXMQ_SLOTS - 1); - return 0; + pthread_cleanup_pop(true); } -static int rxmwheel_add(struct frcti * frcti, +static int rxmwheel_add(struct rxmwheel * rw, + struct frcti * frcti, uint32_t seqno, struct shm_du_buff * sdb) { @@ -254,8 +253,6 @@ static int rxmwheel_add(struct frcti * frcti, if (r == NULL) return -ENOMEM; - pthread_mutex_lock(&rw.lock); - clock_gettime(PTHREAD_COND_CLOCK, &now); r->t0 = ts_to_us(now); @@ -266,13 +263,19 @@ static int rxmwheel_add(struct frcti * frcti, r->tail = shm_du_buff_tail(sdb); r->frcti = frcti; + pthread_rwlock_rdlock(&r->frcti->lock); + slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1); - list_add_tail(&r->next, &rw.wheel[slot]); + pthread_rwlock_unlock(&r->frcti->lock); + + pthread_mutex_lock(&rw->lock); + + list_add_tail(&r->next, &rw->wheel[slot]); shm_du_buff_wait_ack(sdb); - pthread_mutex_unlock(&rw.lock); + pthread_mutex_unlock(&rw->lock); return 0; } diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 00ffd583..91eb8b5f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -109,7 +109,9 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); - while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { + while (!shm_rbuff_free(rb) + && ret != -ETIMEDOUT + && !(*rb->acl & ACL_FLOWDOWN)) { if (abstime != NULL) ret = -pthread_cond_timedwait(rb->del, rb->lock, @@ -187,7 +189,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); - while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { + while (shm_rbuff_empty(rb) + && (idx != -ETIMEDOUT) + && !(*rb->acl & ACL_FLOWDOWN)) { if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, @@ -224,6 +228,9 @@ void shm_rbuff_set_acl(struct shm_rbuff * rb, #endif *rb->acl = (size_t) flags; + pthread_cond_broadcast(rb->del); + pthread_cond_broadcast(rb->add); + pthread_mutex_unlock(rb->lock); } |