author    | Sander Vrijders <[email protected]> | 2017-08-17 16:56:00 +0200
committer | Sander Vrijders <[email protected]> | 2017-08-17 18:02:30 +0200
commit    | 4d9c4025222e19dac9a90cabe8bd886e47959ad6 (patch)
tree      | 3959a7206bfa3b5de2881d4404a2746a75aaefda /src/lib/dev.c
parent    | c7cb10810c447579cb20a8bc99049baeeb8e2065 (diff)
download  | ouroboros-4d9c4025222e19dac9a90cabe8bd886e47959ad6.tar.gz
          | ouroboros-4d9c4025222e19dac9a90cabe8bd886e47959ad6.zip
lib: Add basic FRCT mechanisms
This adds the basic FRCT mechanisms to the library. Upon flow
allocation or accept, an FRCT instance is now created and used when
reading from or writing to the flow. The timerwheel has been
refactored to allow recharging and removing timers and is now part of
the library. The first SDU sent over the connection has the DRF set,
which initializes the connection. Sender and receiver inactivity
timers are added.
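
To make the mechanism concrete, the sketch below models the sender side described above: the first SDU of a run carries the DRF and arms the sender inactivity timer (2 * MPL), and every subsequent SDU merely recharges that timer and takes the next sequence number. The struct timer, timer_arm, timer_recharge and prepare_sdu names are stand-ins invented for this sketch only; the patch itself implements the logic in frcti_write() with the library's timerwheel and struct frcti, as shown in the diff below.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MPL 2000 /* ms, as in the patch */

/* Stub timer standing in for the library's timerwheel. */
struct timer {
        bool armed;
        int  delay_ms;
};

static void timer_arm(struct timer * t, int delay_ms)
{
        t->armed    = true;
        t->delay_ms = delay_ms;
}

static void timer_recharge(struct timer * t, int delay_ms)
{
        t->delay_ms = delay_ms;
}

/* Sender-side state, mirroring the snd_* fields of struct frcti. */
struct snd_state {
        bool         drf;   /* next SDU starts a new run        */
        uint64_t     lwe;   /* next sequence number to hand out */
        struct timer inact; /* sender inactivity timer          */
};

/*
 * Per outgoing SDU: set the DRF and arm the inactivity timer on the
 * first SDU of a run, otherwise just recharge the timer. Returns
 * whether the DRF must be set in the PCI of this SDU.
 */
static bool prepare_sdu(struct snd_state * s, uint64_t * seqno)
{
        bool set_drf = false;

        if (s->drf) {
                timer_arm(&s->inact, 2 * MPL);
                set_drf = true;
                s->drf  = false;
        } else {
                timer_recharge(&s->inact, 2 * MPL);
        }

        *seqno = s->lwe++;

        return set_drf;
}

int main(void)
{
        struct snd_state s = { .drf = true, .lwe = 0 };
        uint64_t seqno;
        int i;

        for (i = 0; i < 3; ++i) {
                bool drf = prepare_sdu(&s, &seqno);
                printf("SDU %d: seqno %llu, DRF %s\n", i,
                       (unsigned long long) seqno, drf ? "set" : "clear");
        }

        return 0;
}
```

When the sender inactivity timer expires without further traffic, the patch's frcti_snd_inactivity() callback sets snd_drf again, so the next SDU starts a fresh run; the receiver mirrors this with its own inactivity timer armed at 3 * MPL.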
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 440
1 file changed, 370 insertions, 70 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9354855b..e81bf105 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -34,6 +34,8 @@
 #include <ouroboros/utils.h>
 #include <ouroboros/fqueue.h>
 #include <ouroboros/qoscube.h>
+#include <ouroboros/timerwheel.h>
+#include <ouroboros/frct_pci.h>
 
 #include <stdlib.h>
 #include <string.h>
@@ -41,8 +43,14 @@
 #define BUF_SIZE 1500
 
+#define TW_ELEMENTS   6000
+#define TW_RESOLUTION 1    /* ms */
+
+#define MPL 2000 /* ms */
+
 struct flow_set {
         size_t idx;
+        bool np1_set;
 };
 
 struct fqueue {
@@ -59,6 +67,26 @@ enum port_state {
         PORT_DESTROY
 };
 
+struct frcti {
+        bool used;
+
+        struct tw_f * snd_inact;
+        bool snd_drf;
+        uint64_t snd_lwe;
+        uint64_t snd_rwe;
+
+        struct tw_f * rcv_inact;
+        bool rcv_drf;
+        uint64_t rcv_lwe;
+        uint64_t rcv_rwe;
+
+        bool resource_control;
+        bool reliable;
+        bool error_check;
+        bool ordered;
+        bool partial;
+};
+
 struct port {
         int fd;
@@ -89,10 +117,14 @@ struct {
         struct shm_rdrbuff * rdrb;
         struct shm_flow_set * fqset;
+        struct timerwheel * tw;
+        int tw_users;
+
         struct bmp * fds;
         struct bmp * fqueues;
         struct flow * flows;
         struct port * ports;
+        struct frcti * frcti;
 
         pthread_rwlock_t lock;
 } ai;
@@ -203,6 +235,242 @@ static int api_announce(char * ap_name)
         return ret;
 }
 
+/* Call under flows lock */
+static int finalize_write(int fd,
+                          size_t idx)
+{
+        if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0)
+                return -ENOTALLOC;
+
+        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
+        return 0;
+}
+
+static int frcti_init(int fd)
+{
+        ai.frcti[fd].used = true;
+
+        ai.frcti[fd].snd_drf = true;
+        ai.frcti[fd].snd_lwe = 0;
+        ai.frcti[fd].snd_rwe = 0;
+
+        ai.frcti[fd].rcv_drf = true;
+        ai.frcti[fd].rcv_lwe = 0;
+        ai.frcti[fd].rcv_rwe = 0;
+
+        return 0;
+}
+
+static void frcti_fini(int fd)
+{
+        struct frcti * frcti;
+
+        frcti = &(ai.frcti[fd]);
+
+        frcti->used = false;
+
+        /* FIXME: We actually need to wait until these timers become NULL. */
+        if (frcti->snd_inact != NULL)
+                timerwheel_stop(ai.tw, frcti->snd_inact);
+
+        if (frcti->rcv_inact != NULL)
+                timerwheel_stop(ai.tw, frcti->rcv_inact);
+}
+
+static int frcti_configure(int fd,
+                           qosspec_t * qos)
+{
+        /* FIXME: Send configuration message here to other side. */
+
+        (void) fd;
+        (void) qos;
+
+        return 0;
+}
+
+static void frcti_snd_inactivity(void * arg)
+{
+        struct frcti * frcti;
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        frcti = (struct frcti *) arg;
+
+        frcti->snd_drf = true;
+        frcti->snd_inact = NULL;
+
+        pthread_rwlock_unlock(&ai.lock);
+}
+
+/* Called under flows lock */
+static int frcti_write(int fd,
+                       struct shm_du_buff * sdb)
+{
+        struct frcti * frcti;
+        struct frct_pci pci;
+
+        memset(&pci, 0, sizeof(pci));
+
+        frcti = &(ai.frcti[fd]);
+
+        /*
+         * Set the DRF in the first packet of a new run of SDUs,
+         * otherwise simply recharge the timer.
+         */
+        if (frcti->snd_drf) {
+                frcti->snd_inact = timerwheel_start(ai.tw, frcti_snd_inactivity,
+                                                    frcti, 2 * MPL);
+                if (frcti->snd_inact == NULL)
+                        return -1;
+
+                pci.flags |= FLAG_DATA_RUN;
+                frcti->snd_drf = false;
+        } else {
+                if (timerwheel_restart(ai.tw, frcti->snd_inact, 2 * MPL))
+                        return -1;
+        }
+
+        pci.seqno = frcti->snd_lwe++;
+        pci.type |= PDU_TYPE_DATA;
+
+        if (frct_pci_ser(sdb, &pci, frcti->error_check))
+                return -1;
+
+        if (finalize_write(fd, shm_du_buff_get_idx(sdb)))
+                return -ENOTALLOC;
+
+        return 0;
+}
+
+static void frcti_rcv_inactivity(void * arg)
+{
+        struct frcti * frcti;
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        frcti = (struct frcti *) arg;
+
+        frcti->rcv_drf = true;
+        frcti->rcv_inact = NULL;
+
+        pthread_rwlock_unlock(&ai.lock);
+}
+
+static ssize_t frcti_read(int fd)
+{
+        ssize_t idx = -1;
+        struct timespec abstime;
+        struct frcti * frcti;
+        struct frct_pci pci;
+        struct shm_du_buff * sdb;
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+                idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+                pthread_rwlock_unlock(&ai.lock);
+        } else {
+                struct shm_rbuff * rb = ai.flows[fd].rx_rb;
+                bool timeo = ai.flows[fd].timesout;
+                struct timespec timeout = ai.flows[fd].rcv_timeo;
+
+                pthread_rwlock_unlock(&ai.lock);
+
+                if (timeo) {
+                        clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+                        ts_add(&abstime, &timeout, &abstime);
+                        idx = shm_rbuff_read_b(rb, &abstime);
+                } else {
+                        idx = shm_rbuff_read_b(rb, NULL);
+                }
+        }
+
+        if (idx < 0)
+                return idx;
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        frcti = &(ai.frcti[fd]);
+
+        sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+        /* SDU may be corrupted. */
+        if (frct_pci_des(sdb, &pci, frcti->error_check)) {
+                pthread_rwlock_unlock(&ai.lock);
+                shm_rdrbuff_remove(ai.rdrb, idx);
+                return -1;
+        }
+
+        /* We don't accept packets when there is no inactivity timer. */
+        if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
+                pthread_rwlock_unlock(&ai.lock);
+                shm_rdrbuff_remove(ai.rdrb, idx);
+                return -1;
+        }
+
+        /*
+         * If there is an inactivity timer and the DRF is set,
+         * reset the state of the connection.
+         */
+        if (pci.flags & FLAG_DATA_RUN) {
+                frcti->rcv_drf = true;
+                if (frcti->rcv_inact != NULL)
+                        timerwheel_stop(ai.tw, frcti->rcv_inact);
+                frcti->rcv_lwe = pci.seqno;
+        }
+
+        /*
+         * Start receiver inactivity if this packet has the DRF,
+         * otherwise simply restart it.
+         */
+        if (frcti->rcv_drf) {
+                frcti->rcv_inact = timerwheel_start(ai.tw, frcti_rcv_inactivity,
+                                                    frcti, 3 * MPL);
+                if (frcti->rcv_inact == NULL) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        shm_rdrbuff_remove(ai.rdrb, idx);
+                        return -1;
+                }
+
+                frcti->rcv_drf = false;
+        } else {
+                if (timerwheel_restart(ai.tw, frcti->rcv_inact, 3 * MPL)) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        shm_rdrbuff_remove(ai.rdrb, idx);
+                        return -1;
+                }
+        }
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        return idx;
+}
+
+static int frcti_event_wait(struct flow_set * set,
+                            struct fqueue * fq,
+                            const struct timespec * timeout)
+{
+        int ret;
+
+        assert(set);
+        assert(fq);
+        assert(timeout);
+
+        /*
+         * FIXME: Return the fq only if a data SDU
+         * for the application is available.
+         */
+
+        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
+        if (ret == -ETIMEDOUT) {
+                fq->fqsize = 0;
+                return -ETIMEDOUT;
+        }
+
+        return ret;
+}
+
 static void flow_clear(int fd)
 {
         assert(!(fd < 0));
@@ -230,6 +498,9 @@ static void flow_fini(int fd)
         if (ai.flows[fd].set != NULL)
                 shm_flow_set_close(ai.flows[fd].set);
 
+        if (ai.frcti[fd].used)
+                frcti_fini(fd);
+
         flow_clear(fd);
 }
@@ -316,8 +587,14 @@ int ouroboros_init(const char * ap_name)
         if (ai.flows == NULL)
                 goto fail_flows;
 
-        for (i = 0; i < AP_MAX_FLOWS; ++i)
+        ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS);
+        if (ai.frcti == NULL)
+                goto fail_frcti;
+
+        for (i = 0; i < AP_MAX_FLOWS; ++i) {
                 flow_clear(i);
+                frcti_fini(i);
+        }
 
         ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
         if (ai.ports == NULL)
@@ -353,8 +630,15 @@ int ouroboros_init(const char * ap_name)
         if (pthread_rwlock_init(&ai.lock, NULL))
                 goto fail_lock;
 
+        ai.tw = timerwheel_create(TW_RESOLUTION,
+                                  TW_RESOLUTION * TW_ELEMENTS);
+        if (ai.tw == NULL)
+                goto fail_timerwheel;
+
         return 0;
 
+ fail_timerwheel:
+        pthread_rwlock_destroy(&ai.lock);
  fail_lock:
         for (i = 0; i < IRMD_MAX_FLOWS; ++i)
                 pthread_cond_destroy(&ai.ports[i].state_cond);
@@ -366,11 +650,13 @@ int ouroboros_init(const char * ap_name)
  fail_ap_name:
         free(ai.ports);
  fail_ports:
+        free(ai.frcti);
+ fail_frcti:
         free(ai.flows);
  fail_flows:
         shm_rdrbuff_close(ai.rdrb);
  fail_rdrb:
-       shm_flow_set_destroy(ai.fqset);
+        shm_flow_set_destroy(ai.fqset);
  fail_fqset:
         bmp_destroy(ai.fqueues);
  fail_fqueues:
@@ -409,6 +695,9 @@ void ouroboros_fini()
 
         shm_rdrbuff_close(ai.rdrb);
 
+        if (ai.tw != NULL)
+                timerwheel_destroy(ai.tw);
+
         free(ai.flows);
 
         free(ai.ports);
@@ -463,9 +752,15 @@ int flow_accept(qosspec_t * qs,
         if (fd < 0)
                 return fd;
 
+        pthread_rwlock_wrlock(&ai.lock);
+
+        frcti_init(fd);
+
         if (qs != NULL)
                 *qs = ai.flows[fd].spec;
 
+        pthread_rwlock_unlock(&ai.lock);
+
         return fd;
 }
@@ -505,7 +800,7 @@ int flow_alloc(const char * dst_name,
                 return -EIRMD;
         }
 
-        if (recv_msg->result != 0) {
+        if (recv_msg->result != 0) {
                 int res = recv_msg->result;
                 irm_msg__free_unpacked(recv_msg, NULL);
                 return res;
@@ -520,6 +815,22 @@ int flow_alloc(const char * dst_name,
 
         irm_msg__free_unpacked(recv_msg, NULL);
 
+        if (fd < 0)
+                return fd;
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        frcti_init(fd);
+
+        if (frcti_configure(fd, qs)) {
+                flow_fini(fd);
+                bmp_release(ai.fds, fd);
+                pthread_rwlock_unlock(&ai.lock);
+                return -1;
+        }
+
+        pthread_rwlock_unlock(&ai.lock);
+
         return fd;
 }
@@ -720,34 +1031,31 @@ ssize_t flow_write(int fd,
                         return idx;
                 }
 
-                if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
-                        shm_rdrbuff_remove(ai.rdrb, idx);
-                        pthread_rwlock_unlock(&ai.lock);
-                        return -ENOTALLOC;
-                }
         } else { /* blocking */
-                struct shm_rdrbuff * rdrb = ai.rdrb;
-                struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
-
                 pthread_rwlock_unlock(&ai.lock);
 
-                assert(tx_rb);
-
-                idx = shm_rdrbuff_write_b(rdrb,
+                idx = shm_rdrbuff_write_b(ai.rdrb,
                                           DU_BUFF_HEADSPACE,
                                           DU_BUFF_TAILSPACE,
                                           buf,
                                           count);
-                if (shm_rbuff_write(tx_rb, idx) < 0) {
-                        shm_rdrbuff_remove(rdrb, idx);
-                        return -ENOTALLOC;
-                }
-
                 pthread_rwlock_rdlock(&ai.lock);
         }
 
-        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+        if (!ai.frcti[fd].used) {
+                if (finalize_write(fd, idx)) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        shm_rdrbuff_remove(ai.rdrb, idx);
+                        return -ENOTALLOC;
+                }
+        } else {
+                if (frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx))) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        shm_rdrbuff_remove(ai.rdrb, idx);
+                        return -1;
+                }
+        }
 
         pthread_rwlock_unlock(&ai.lock);
@@ -772,21 +1080,12 @@ ssize_t flow_read(int fd,
                 return -ENOTALLOC;
         }
 
-        if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
-                idx = shm_rbuff_read(ai.flows[fd].rx_rb);
-                pthread_rwlock_unlock(&ai.lock);
-        } else {
-                struct shm_rbuff * rb = ai.flows[fd].rx_rb;
-                bool timeo = ai.flows[fd].timesout;
-                struct timespec timeout = ai.flows[fd].rcv_timeo;
-
-                pthread_rwlock_unlock(&ai.lock);
+        pthread_rwlock_unlock(&ai.lock);
 
-                if (timeo)
-                        idx = shm_rbuff_read_b(rb, &timeout);
-                else
-                        idx = shm_rbuff_read_b(rb, NULL);
-        }
+        if (!ai.frcti[fd].used)
+                idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+        else
+                idx = frcti_read(fd);
 
         if (idx < 0) {
                 assert(idx == -EAGAIN || idx == -ETIMEDOUT);
@@ -823,6 +1122,8 @@ struct flow_set * flow_set_create()
                 return NULL;
         }
 
+        set->np1_set = false;
+
         pthread_rwlock_unlock(&ai.lock);
 
         return set;
@@ -891,6 +1192,9 @@ int flow_set_add(struct flow_set * set,
         for (i = 0; i < sdus; i++)
                 shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
 
+        if (ai.frcti[fd].used)
+                set->np1_set = true;
+
         pthread_rwlock_unlock(&ai.lock);
 
         return ret;
@@ -960,7 +1264,8 @@ int flow_event_wait(struct flow_set * set,
                     struct fqueue * fq,
                     const struct timespec * timeout)
 {
-        ssize_t ret;
+        ssize_t         ret;
+        struct timespec abstime;
 
         if (set == NULL || fq == NULL)
                 return -EINVAL;
@@ -970,7 +1275,17 @@ int flow_event_wait(struct flow_set * set,
 
         assert(!fq->next);
 
-        ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, timeout);
+        if (timeout != NULL) {
+                clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+                ts_add(&abstime, timeout, &abstime);
+        }
+
+        if (set->np1_set)
+                ret = frcti_event_wait(set, fq, &abstime);
+        else
+                ret = shm_flow_set_wait(ai.fqset, set->idx,
+                                        fq->fqueue, &abstime);
+
         if (ret == -ETIMEDOUT) {
                 fq->fqsize = 0;
                 return -ETIMEDOUT;
         }
@@ -1132,9 +1447,8 @@ int ipcp_flow_read(int fd,
 {
         ssize_t idx = -1;
         int port_id = -1;
-        struct shm_rbuff * rb;
 
-        assert(fd >=0);
+        assert(fd >= 0);
         assert(sdb);
 
         pthread_rwlock_rdlock(&ai.lock);
@@ -1144,11 +1458,13 @@ int ipcp_flow_read(int fd,
                 return -ENOTALLOC;
         }
 
-        rb = ai.flows[fd].rx_rb;
-
         pthread_rwlock_unlock(&ai.lock);
 
-        idx = shm_rbuff_read(rb);
+        if (!ai.frcti[fd].used)
+                idx = shm_rbuff_read(ai.flows[fd].rx_rb);
+        else
+                idx = frcti_read(fd);
+
         if (idx < 0)
                 return idx;
@@ -1160,8 +1476,6 @@ int ipcp_flow_read(int fd,
 
 int ipcp_flow_write(int fd,
                     struct shm_du_buff * sdb)
 {
-        size_t idx;
-
         if (sdb == NULL)
                 return -EINVAL;
@@ -1179,10 +1493,17 @@ int ipcp_flow_write(int fd,
 
         assert(ai.flows[fd].tx_rb);
 
-        idx = shm_du_buff_get_idx(sdb);
-
-        shm_rbuff_write(ai.flows[fd].tx_rb, idx);
-        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+        if (!ai.frcti[fd].used) {
+                if (finalize_write(fd, shm_du_buff_get_idx(sdb))) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        return -ENOTALLOC;
+                }
+        } else {
+                if (frcti_write(fd, sdb)) {
+                        pthread_rwlock_unlock(&ai.lock);
+                        return -1;
+                }
+        }
 
         pthread_rwlock_unlock(&ai.lock);
@@ -1274,32 +1595,11 @@ int local_flow_write(int fd,
                 return -ENOTALLOC;
         }
 
-        shm_rbuff_write(ai.flows[fd].tx_rb, idx);
-
-        shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
-        pthread_rwlock_unlock(&ai.lock);
-
-        return 0;
-}
-
-int ipcp_read_shim(int fd,
-                   struct shm_du_buff ** sdb)
-{
-        ssize_t idx;
-
-        pthread_rwlock_rdlock(&ai.lock);
-
-        assert(ai.flows[fd].rx_rb);
-
-        idx = shm_rbuff_read(ai.flows[fd].rx_rb);
-        if (idx < 0) {
+        if (finalize_write(fd, idx)) {
                 pthread_rwlock_unlock(&ai.lock);
-                return -EAGAIN;
+                return -ENOTALLOC;
         }
 
-        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
         pthread_rwlock_unlock(&ai.lock);
 
         return 0;