diff options
author | Sander Vrijders <[email protected]> | 2017-08-17 16:09:24 +0000 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-08-17 16:09:24 +0000 |
commit | eefae235dd7af96eef3dc4f82f706170c379d260 (patch) | |
tree | 3959a7206bfa3b5de2881d4404a2746a75aaefda /src/lib/dev.c | |
parent | c7cb10810c447579cb20a8bc99049baeeb8e2065 (diff) | |
parent | 4d9c4025222e19dac9a90cabe8bd886e47959ad6 (diff) | |
download | ouroboros-eefae235dd7af96eef3dc4f82f706170c379d260.tar.gz ouroboros-eefae235dd7af96eef3dc4f82f706170c379d260.zip |
Merged in sandervrijders/ouroboros/be-frct (pull request #555)
lib: Add basic FRCT mechanisms
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 440 |
1 files changed, 370 insertions, 70 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 9354855b..e81bf105 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -34,6 +34,8 @@ #include <ouroboros/utils.h> #include <ouroboros/fqueue.h> #include <ouroboros/qoscube.h> +#include <ouroboros/timerwheel.h> +#include <ouroboros/frct_pci.h> #include <stdlib.h> #include <string.h> @@ -41,8 +43,14 @@ #define BUF_SIZE 1500 +#define TW_ELEMENTS 6000 +#define TW_RESOLUTION 1 /* ms */ + +#define MPL 2000 /* ms */ + struct flow_set { size_t idx; + bool np1_set; }; struct fqueue { @@ -59,6 +67,26 @@ enum port_state { PORT_DESTROY }; +struct frcti { + bool used; + + struct tw_f * snd_inact; + bool snd_drf; + uint64_t snd_lwe; + uint64_t snd_rwe; + + struct tw_f * rcv_inact; + bool rcv_drf; + uint64_t rcv_lwe; + uint64_t rcv_rwe; + + bool resource_control; + bool reliable; + bool error_check; + bool ordered; + bool partial; +}; + struct port { int fd; @@ -89,10 +117,14 @@ struct { struct shm_rdrbuff * rdrb; struct shm_flow_set * fqset; + struct timerwheel * tw; + int tw_users; + struct bmp * fds; struct bmp * fqueues; struct flow * flows; struct port * ports; + struct frcti * frcti; pthread_rwlock_t lock; } ai; @@ -203,6 +235,242 @@ static int api_announce(char * ap_name) return ret; } +/* Call under flows lock */ +static int finalize_write(int fd, + size_t idx) +{ + if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) + return -ENOTALLOC; + + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + + return 0; +} + +static int frcti_init(int fd) +{ + ai.frcti[fd].used = true; + + ai.frcti[fd].snd_drf = true; + ai.frcti[fd].snd_lwe = 0; + ai.frcti[fd].snd_rwe = 0; + + ai.frcti[fd].rcv_drf = true; + ai.frcti[fd].rcv_lwe = 0; + ai.frcti[fd].rcv_rwe = 0; + + return 0; +} + +static void frcti_fini(int fd) +{ + struct frcti * frcti; + + frcti = &(ai.frcti[fd]); + + frcti->used = false; + + /* FIXME: We actually need to wait until these timers become NULL. */ + if (frcti->snd_inact != NULL) + timerwheel_stop(ai.tw, frcti->snd_inact); + + if (frcti->rcv_inact != NULL) + timerwheel_stop(ai.tw, frcti->rcv_inact); +} + +static int frcti_configure(int fd, + qosspec_t * qos) +{ + /* FIXME: Send configuration message here to other side. */ + + (void) fd; + (void) qos; + + return 0; +} + +static void frcti_snd_inactivity(void * arg) +{ + struct frcti * frcti; + + pthread_rwlock_wrlock(&ai.lock); + + frcti = (struct frcti * ) arg; + + frcti->snd_drf = true; + frcti->snd_inact = NULL; + + pthread_rwlock_unlock(&ai.lock); +} + +/* Called under flows lock */ +static int frcti_write(int fd, + struct shm_du_buff * sdb) +{ + struct frcti * frcti; + struct frct_pci pci; + + memset(&pci, 0, sizeof(pci)); + + frcti = &(ai.frcti[fd]); + + /* + * Set the DRF in the first packet of a new run of SDUs, + * otherwise simply recharge the timer. + */ + if (frcti->snd_drf) { + frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity, + frcti, 2 * MPL); + if (frcti->snd_inact == NULL) + return -1; + + pci.flags |= FLAG_DATA_RUN; + frcti->snd_drf = false; + } else { + if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL)) + return -1; + } + + pci.seqno = frcti->snd_lwe++; + pci.type |= PDU_TYPE_DATA; + + if (frct_pci_ser(sdb, &pci, frcti->error_check)) + return -1; + + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) + return -ENOTALLOC; + + return 0; +} + +static void frcti_rcv_inactivity(void * arg) +{ + struct frcti * frcti; + + pthread_rwlock_wrlock(&ai.lock); + + frcti = (struct frcti * ) arg; + + frcti->rcv_drf = true; + frcti->rcv_inact = NULL; + + pthread_rwlock_unlock(&ai.lock); +} + +static ssize_t frcti_read(int fd) +{ + ssize_t idx = -1; + struct timespec abstime; + struct frcti * frcti; + struct frct_pci pci; + struct shm_du_buff * sdb; + + pthread_rwlock_rdlock(&ai.lock); + + if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + pthread_rwlock_unlock(&ai.lock); + } else { + struct shm_rbuff * rb = ai.flows[fd].rx_rb; + bool timeo = ai.flows[fd].timesout; + struct timespec timeout = ai.flows[fd].rcv_timeo; + + pthread_rwlock_unlock(&ai.lock); + + if (timeo) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, &timeout, &abstime); + idx = shm_rbuff_read_b(rb, &abstime); + } else { + idx = shm_rbuff_read_b(rb, NULL); + } + } + + if (idx < 0) + return idx; + + pthread_rwlock_rdlock(&ai.lock); + + frcti = &(ai.frcti[fd]); + + sdb = shm_rdrbuff_get(ai.rdrb, idx); + + /* SDU may be corrupted. */ + if (frct_pci_des(sdb, &pci, frcti->error_check)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + /* We don't accept packets when there is no inactivity timer. */ + if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + /* + * If there is an inactivity timer and the DRF is set, + * reset the state of the connection. + */ + if (pci.flags & FLAG_DATA_RUN) { + frcti->rcv_drf = true; + if (frcti->rcv_inact != NULL) + timerwheel_stop(ai.tw, frcti->rcv_inact); + frcti->rcv_lwe = pci.seqno; + } + + /* + * Start receiver inactivity if this packet has the DRF, + * otherwise simply restart it. + */ + if (frcti->rcv_drf) { + frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity, + frcti, 3 * MPL); + if (frcti->rcv_inact == NULL) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + + frcti->rcv_drf = false; + } else { + if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + } + + pthread_rwlock_unlock(&ai.lock); + + return idx; +} + +static int frcti_event_wait(struct flow_set * set, + struct fqueue * fq, + const struct timespec * timeout) +{ + int ret; + + assert(set); + assert(fq); + assert(timeout); + + /* + * FIXME: Return the fq only if a data SDU + * for the application is available. + */ + + ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (ret == -ETIMEDOUT) { + fq->fqsize = 0; + return -ETIMEDOUT; + } + + return ret; +} + static void flow_clear(int fd) { assert(!(fd < 0)); @@ -230,6 +498,9 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); + if (ai.frcti[fd].used) + frcti_fini(fd); + flow_clear(fd); } @@ -316,8 +587,14 @@ int ouroboros_init(const char * ap_name) if (ai.flows == NULL) goto fail_flows; - for (i = 0; i < AP_MAX_FLOWS; ++i) + ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); + if (ai.frcti == NULL) + goto fail_frcti; + + for (i = 0; i < AP_MAX_FLOWS; ++i) { flow_clear(i); + frcti_fini(i); + } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); if (ai.ports == NULL) @@ -353,8 +630,15 @@ int ouroboros_init(const char * ap_name) if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; + ai.tw = timerwheel_create(TW_RESOLUTION, + TW_RESOLUTION * TW_ELEMENTS); + if (ai.tw == NULL) + goto fail_timerwheel; + return 0; + fail_timerwheel: + pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < IRMD_MAX_FLOWS; ++i) pthread_cond_destroy(&ai.ports[i].state_cond); @@ -366,11 +650,13 @@ int ouroboros_init(const char * ap_name) fail_ap_name: free(ai.ports); fail_ports: + free(ai.frcti); + fail_frcti: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); fail_rdrb: - shm_flow_set_destroy(ai.fqset); + shm_flow_set_destroy(ai.fqset); fail_fqset: bmp_destroy(ai.fqueues); fail_fqueues: @@ -409,6 +695,9 @@ void ouroboros_fini() shm_rdrbuff_close(ai.rdrb); + if (ai.tw != NULL) + timerwheel_destroy(ai.tw); + free(ai.flows); free(ai.ports); @@ -463,9 +752,15 @@ int flow_accept(qosspec_t * qs, if (fd < 0) return fd; + pthread_rwlock_wrlock(&ai.lock); + + frcti_init(fd); + if (qs != NULL) *qs = ai.flows[fd].spec; + pthread_rwlock_unlock(&ai.lock); + return fd; } @@ -505,7 +800,7 @@ int flow_alloc(const char * dst_name, return -EIRMD; } - if (recv_msg->result != 0) { + if (recv_msg->result != 0) { int res = recv_msg->result; irm_msg__free_unpacked(recv_msg, NULL); return res; @@ -520,6 +815,22 @@ int flow_alloc(const char * dst_name, irm_msg__free_unpacked(recv_msg, NULL); + if (fd < 0) + return fd; + + pthread_rwlock_wrlock(&ai.lock); + + frcti_init(fd); + + if (frcti_configure(fd, qs)) { + flow_fini(fd); + bmp_release(ai.fds, fd); + pthread_rwlock_unlock(&ai.lock); + return -1; + } + + pthread_rwlock_unlock(&ai.lock); + return fd; } @@ -720,34 +1031,31 @@ ssize_t flow_write(int fd, return idx; } - if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) { - shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } } else { /* blocking */ - struct shm_rdrbuff * rdrb = ai.rdrb; - struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb; - pthread_rwlock_unlock(&ai.lock); - assert(tx_rb); - - idx = shm_rdrbuff_write_b(rdrb, + idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, count); - if (shm_rbuff_write(tx_rb, idx) < 0) { - shm_rdrbuff_remove(rdrb, idx); - return -ENOTALLOC; - } - pthread_rwlock_rdlock(&ai.lock); } - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + if (!ai.frcti[fd].used) { + if (finalize_write(fd, idx)) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOTALLOC; + } + } else { + if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) { + pthread_rwlock_unlock(&ai.lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -1; + } + } pthread_rwlock_unlock(&ai.lock); @@ -772,21 +1080,12 @@ ssize_t flow_read(int fd, return -ENOTALLOC; } - if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) { - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - pthread_rwlock_unlock(&ai.lock); - } else { - struct shm_rbuff * rb = ai.flows[fd].rx_rb; - bool timeo = ai.flows[fd].timesout; - struct timespec timeout = ai.flows[fd].rcv_timeo; - - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_unlock(&ai.lock); - if (timeo) - idx = shm_rbuff_read_b(rb, &timeout); - else - idx = shm_rbuff_read_b(rb, NULL); - } + if (!ai.frcti[fd].used) + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + else + idx = frcti_read(fd); if (idx < 0) { assert(idx == -EAGAIN || idx == -ETIMEDOUT); @@ -823,6 +1122,8 @@ struct flow_set * flow_set_create() return NULL; } + set->np1_set = false; + pthread_rwlock_unlock(&ai.lock); return set; @@ -891,6 +1192,9 @@ int flow_set_add(struct flow_set * set, for (i = 0; i < sdus; i++) shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id); + if (ai.frcti[fd].used) + set->np1_set = true; + pthread_rwlock_unlock(&ai.lock); return ret; @@ -960,7 +1264,8 @@ int flow_event_wait(struct flow_set * set, struct fqueue * fq, const struct timespec * timeout) { - ssize_t ret; + ssize_t ret; + struct timespec abstime; if (set == NULL || fq == NULL) return -EINVAL; @@ -970,7 +1275,17 @@ int flow_event_wait(struct flow_set * set, assert(!fq->next); - ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout); + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + if (set->np1_set) + ret = frcti_event_wait(set, fq, &abstime); + else + ret = shm_flow_set_wait(ai.fqset, set->idx, + fq->fqueue, &abstime); + if (ret == -ETIMEDOUT) { fq->fqsize = 0; return -ETIMEDOUT; @@ -1132,9 +1447,8 @@ int ipcp_flow_read(int fd, { ssize_t idx = -1; int port_id = -1; - struct shm_rbuff * rb; - assert(fd >=0); + assert(fd >= 0); assert(sdb); pthread_rwlock_rdlock(&ai.lock); @@ -1144,11 +1458,13 @@ int ipcp_flow_read(int fd, return -ENOTALLOC; } - rb = ai.flows[fd].rx_rb; - pthread_rwlock_unlock(&ai.lock); - idx = shm_rbuff_read(rb); + if (!ai.frcti[fd].used) + idx = shm_rbuff_read(ai.flows[fd].rx_rb); + else + idx = frcti_read(fd); + if (idx < 0) return idx; @@ -1160,8 +1476,6 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - size_t idx; - if (sdb == NULL) return -EINVAL; @@ -1179,10 +1493,17 @@ int ipcp_flow_write(int fd, assert(ai.flows[fd].tx_rb); - idx = shm_du_buff_get_idx(sdb); - - shm_rbuff_write(ai.flows[fd].tx_rb, idx); - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + if (!ai.frcti[fd].used) { + if (finalize_write(fd, shm_du_buff_get_idx(sdb))) { + pthread_rwlock_unlock(&ai.lock); + return -ENOTALLOC; + } + } else { + if (frcti_write(fd, sdb)) { + pthread_rwlock_unlock(&ai.lock); + return -1; + } + } pthread_rwlock_unlock(&ai.lock); @@ -1274,32 +1595,11 @@ int local_flow_write(int fd, return -ENOTALLOC; } - shm_rbuff_write(ai.flows[fd].tx_rb, idx); - - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - - pthread_rwlock_unlock(&ai.lock); - - return 0; -} - -int ipcp_read_shim(int fd, - struct shm_du_buff ** sdb) -{ - ssize_t idx; - - pthread_rwlock_rdlock(&ai.lock); - - assert(ai.flows[fd].rx_rb); - - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - if (idx < 0) { + if (finalize_write(fd, idx)) { pthread_rwlock_unlock(&ai.lock); - return -EAGAIN; + return -ENOTALLOC; } - *sdb = shm_rdrbuff_get(ai.rdrb, idx); - pthread_rwlock_unlock(&ai.lock); return 0; |