From b6a04b551d64531452089b869f9fa56f7e545e4d Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 23 Aug 2017 16:03:46 +0200 Subject: lib: Make sender and receiver inactivity simple checks This makes the sender and receiver inactivity timers into simple checks that compare when the last SDU was sent to the current time to set the receiver or sender inactivity. --- src/lib/dev.c | 221 ++++++++++++++++++---------------------------------------- 1 file changed, 68 insertions(+), 153 deletions(-) (limited to 'src/lib/dev.c') diff --git a/src/lib/dev.c b/src/lib/dev.c index b6c6087f..43543af3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -51,9 +51,12 @@ #define MPL 2000 /* ms */ +#ifndef CLOCK_REALTIME_COARSE +#define CLOCK_REALTIME_COARSE CLOCK_REALTIME +#endif + struct flow_set { - size_t idx; - bool np1_set; + size_t idx; }; struct fqueue { @@ -71,19 +74,19 @@ enum port_state { }; struct frcti { - bool used; + bool used; - struct tw_f * snd_inact; - bool snd_drf; - uint64_t snd_lwe; - uint64_t snd_rwe; + struct timespec last_snd; + bool snd_drf; + uint64_t snd_lwe; + uint64_t snd_rwe; - struct tw_f * rcv_inact; - bool rcv_drf; - uint64_t rcv_lwe; - uint64_t rcv_rwe; + struct timespec last_rcv; + bool rcv_drf; + uint64_t rcv_lwe; + uint64_t rcv_rwe; - uint8_t conf_flags; + uint8_t conf_flags; }; struct port { @@ -268,27 +271,15 @@ static int frcti_init(int fd) static void frcti_clear(int fd) { - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - frcti->used = false; - frcti->snd_inact = NULL; - frcti->rcv_inact = NULL; + ai.frcti[fd].used = false; } static void frcti_fini(int fd) { - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - /* FIXME: We actually need to wait until these timers become NULL. */ - if (frcti->snd_inact != NULL) - timerwheel_stop(ai.tw, frcti->snd_inact); - - if (frcti->rcv_inact != NULL) - timerwheel_stop(ai.tw, frcti->rcv_inact); + /* + * FIXME: In case of reliable transmission we should + * make sure everything is acked. + */ frcti_clear(fd); } @@ -304,78 +295,49 @@ static int frcti_configure(int fd, return 0; } -static void frcti_snd_inactivity(void * arg) -{ - struct frcti * frcti; - - pthread_rwlock_wrlock(&ai.lock); - - frcti = (struct frcti * ) arg; - - frcti->snd_drf = true; - frcti->snd_inact = NULL; - - pthread_rwlock_unlock(&ai.lock); -} - -/* Called under flows lock */ static int frcti_write(int fd, struct shm_du_buff * sdb) { struct frcti * frcti; struct frct_pci pci; + struct timespec now = {0, 0}; memset(&pci, 0, sizeof(pci)); frcti = &(ai.frcti[fd]); - pthread_rwlock_unlock(&ai.lock); + clock_gettime(CLOCK_REALTIME_COARSE, &now); - timerwheel_move(ai.tw); + pthread_rwlock_wrlock(&ai.lock); - pthread_rwlock_rdlock(&ai.lock); + /* Check if sender inactivity is true. */ + if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL) + frcti->snd_drf = true; - /* - * Set the DRF in the first packet of a new run of SDUs, - * otherwise simply recharge the timer. - */ + /* Set the DRF in the first packet of a new run of SDUs. */ if (frcti->snd_drf) { - frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity, - frcti, 2 * MPL); - if (frcti->snd_inact == NULL) - return -1; - pci.flags |= FLAG_DATA_RUN; frcti->snd_drf = false; - } else { - if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL)) - return -1; } + frcti->last_snd = now; + pci.seqno = frcti->snd_lwe++; pci.type |= PDU_TYPE_DATA; - if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) + if (frct_pci_ser(sdb, &pci, frcti->conf_flags & CONF_ERROR_CHECK)) { + pthread_rwlock_unlock(&ai.lock); return -1; + } - if (finalize_write(fd, shm_du_buff_get_idx(sdb))) + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; - - return 0; -} - -static void frcti_rcv_inactivity(void * arg) -{ - struct frcti * frcti; - - pthread_rwlock_wrlock(&ai.lock); - - frcti = (struct frcti * ) arg; - - frcti->rcv_drf = true; - frcti->rcv_inact = NULL; + } pthread_rwlock_unlock(&ai.lock); + + return 0; } static ssize_t frcti_read(int fd) @@ -385,8 +347,7 @@ static ssize_t frcti_read(int fd) struct frcti * frcti; struct frct_pci pci; struct shm_du_buff * sdb; - - timerwheel_move(ai.tw); + struct timespec now = {0, 0}; pthread_rwlock_rdlock(&ai.lock); @@ -412,6 +373,8 @@ static ssize_t frcti_read(int fd) if (idx < 0) return idx; + clock_gettime(CLOCK_REALTIME_COARSE, &now); + pthread_rwlock_rdlock(&ai.lock); frcti = &(ai.frcti[fd]); @@ -425,7 +388,11 @@ static ssize_t frcti_read(int fd) return -EAGAIN; } - /* We don't accept packets when there is no inactivity timer. */ + /* Check if receiver inactivity is true. */ + if (!frcti->rcv_drf && ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) + frcti->rcv_drf = true; + + /* We don't accept packets when there is receiver inactivity. */ if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); @@ -433,68 +400,22 @@ static ssize_t frcti_read(int fd) } /* - * If there is an inactivity timer and the DRF is set, + * If there is no receiver inactivity and the DRF is set, * reset the state of the connection. */ - if (pci.flags & FLAG_DATA_RUN) { - frcti->rcv_drf = true; - if (frcti->rcv_inact != NULL) - timerwheel_stop(ai.tw, frcti->rcv_inact); + if (pci.flags & FLAG_DATA_RUN) frcti->rcv_lwe = pci.seqno; - } - - /* - * Start receiver inactivity if this packet has the DRF, - * otherwise simply restart it. - */ - if (frcti->rcv_drf) { - frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity, - frcti, 3 * MPL); - if (frcti->rcv_inact == NULL) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } + if (frcti->rcv_drf) frcti->rcv_drf = false; - } else { - if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } - } + + frcti->last_rcv = now; pthread_rwlock_unlock(&ai.lock); return idx; } -static int frcti_event_wait(struct flow_set * set, - struct fqueue * fq, - const struct timespec * timeout) -{ - int ret; - - assert(set); - assert(fq); - - timerwheel_move(ai.tw); - - /* - * FIXME: Return the fq only if a data SDU - * for the application is available. - */ - - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); - if (ret == -ETIMEDOUT) { - fq->fqsize = 0; - return -ETIMEDOUT; - } - - return ret; -} - static void flow_clear(int fd) { assert(!(fd < 0)); @@ -1073,16 +994,17 @@ ssize_t flow_write(int fd, shm_rdrbuff_remove(ai.rdrb, idx); return -ENOTALLOC; } + + pthread_rwlock_unlock(&ai.lock); } else { + pthread_rwlock_unlock(&ai.lock); + if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { - pthread_rwlock_unlock(&ai.lock); shm_rdrbuff_remove(ai.rdrb, idx); return -1; } } - pthread_rwlock_unlock(&ai.lock); - return 0; } @@ -1090,9 +1012,10 @@ ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx = -1; - ssize_t n; + ssize_t idx = -1; + ssize_t n; uint8_t * sdu; + bool used; if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; @@ -1104,9 +1027,11 @@ ssize_t flow_read(int fd, return -ENOTALLOC; } + used = ai.frcti[fd].used; + pthread_rwlock_unlock(&ai.lock); - if (!ai.frcti[fd].used) + if (!used) idx = shm_rbuff_read(ai.flows[fd].rx_rb); else idx = frcti_read(fd); @@ -1146,8 +1071,6 @@ struct flow_set * flow_set_create() return NULL; } - set->np1_set = false; - pthread_rwlock_unlock(&ai.lock); return set; @@ -1208,7 +1131,7 @@ int flow_set_add(struct flow_set * set, if (set == NULL) return -EINVAL; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); @@ -1216,9 +1139,6 @@ int flow_set_add(struct flow_set * set, for (i = 0; i < sdus; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); - if (ai.frcti[fd].used) - set->np1_set = true; - pthread_rwlock_unlock(&ai.lock); return ret; @@ -1230,7 +1150,7 @@ void flow_set_del(struct flow_set * set, if (set == NULL) return; - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); if (ai.flows[fd].port_id >= 0) shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); @@ -1306,12 +1226,7 @@ int flow_event_wait(struct flow_set * set, t = &abstime; } - if (set->np1_set) - ret = frcti_event_wait(set, fq, t); - else - ret = shm_flow_set_wait(ai.fqset, set->idx, - fq->fqueue, t); - + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t); if (ret == -ETIMEDOUT) { fq->fqsize = 0; return -ETIMEDOUT; @@ -1524,15 +1439,15 @@ int ipcp_flow_write(int fd, pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } + + pthread_rwlock_unlock(&ai.lock); } else { - if (frcti_write(fd, sdb)) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&ai.lock); + + if (frcti_write(fd, sdb)) return -1; - } } - pthread_rwlock_unlock(&ai.lock); - return 0; } -- cgit v1.2.3