diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib/dev.c | 719 | ||||
-rw-r--r-- | src/lib/frct.c | 320 | ||||
-rw-r--r-- | src/lib/frct_pci.c | 63 | ||||
-rw-r--r-- | src/lib/frct_pci.h | 67 | ||||
-rw-r--r-- | src/lib/rq.c | 8 | ||||
-rw-r--r-- | src/lib/rq.h | 47 | ||||
-rw-r--r-- | src/lib/tests/rq_test.c | 2 |
7 files changed, 683 insertions, 543 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 28a99bc4..ff22cca6 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -38,22 +38,19 @@ #include <ouroboros/fqueue.h> #include <ouroboros/qoscube.h> #include <ouroboros/timerwheel.h> -#include <ouroboros/frct_pci.h> -#include <ouroboros/rq.h> + +#include "frct_pci.h" +#include "rq.h" #include <stdlib.h> #include <string.h> #include <stdio.h> #include <stdarg.h> +#include <stdbool.h> +#include <sys/types.h> #define BUF_SIZE 1500 -#define TW_ELEMENTS 6000 -#define TW_RESOLUTION 1 /* ms */ - -#define MPL 2000 /* ms */ -#define RQ_SIZE 20 - #ifndef CLOCK_REALTIME_COARSE #define CLOCK_REALTIME_COARSE CLOCK_REALTIME #endif @@ -76,26 +73,6 @@ enum port_state { PORT_DESTROY }; -struct frcti { - bool used; - - struct timespec last_snd; - bool snd_drf; - uint64_t snd_lwe; - uint64_t snd_rwe; - - struct timespec last_rcv; - bool rcv_drf; - uint64_t rcv_lwe; - uint64_t rcv_rwe; - - uint16_t conf_flags; - - struct rq * rq; - - pthread_rwlock_t lock; -}; - struct port { int fd; @@ -119,6 +96,8 @@ struct flow { bool rcv_timesout; struct timespec snd_timeo; struct timespec rcv_timeo; + + struct frcti * frcti; }; struct { @@ -132,13 +111,15 @@ struct { struct bmp * fds; struct bmp * fqueues; + struct flow * flows; struct port * ports; - struct frcti * frcti; pthread_rwlock_t lock; } ai; +#include "frct.c" + static void port_destroy(struct port * p) { pthread_mutex_lock(&p->state_lock); @@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id) enum port_state state; struct port * p; - pthread_rwlock_rdlock(&ai.lock); - p = &ai.ports[port_id]; - pthread_rwlock_unlock(&ai.lock); - pthread_mutex_lock(&p->state_lock); if (p->state == PORT_ID_ASSIGNED) { @@ -245,275 +222,8 @@ static int api_announce(char * ap_name) return ret; } -/* Call under flows lock. */ -static int finalize_write(int fd, - size_t idx) -{ - int ret; - - ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); - if (ret < 0) - return ret; - - shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - - return ret; -} - -static int frcti_init(int fd) -{ - struct frcti * frcti; - - frcti = &(ai.frcti[fd]); - - frcti->used = true; - - frcti->snd_drf = true; - frcti->snd_lwe = 0; - frcti->snd_rwe = 0; - - frcti->rcv_drf = true; - frcti->rcv_lwe = 0; - frcti->rcv_rwe = 0; - - frcti->conf_flags = 0; - - frcti->rq = rq_create(RQ_SIZE); - if (frcti->rq == NULL) - return -1; - - return 0; -} - -static void frcti_clear(int fd) -{ - ai.frcti[fd].used = false; -} - -static void frcti_fini(int fd) -{ - /* - * FIXME: In case of reliable transmission we should - * make sure everything is acked. - */ - - frcti_clear(fd); - - rq_destroy(ai.frcti[fd].rq); -} - -static int frcti_send(int fd, - struct frct_pci * pci, - struct shm_du_buff * sdb) -{ - struct timespec now = {0, 0}; - struct frcti * frcti; - int ret; - - frcti = &(ai.frcti[fd]); - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_wrlock(&frcti->lock); - - /* Check if sender inactivity is true. */ - if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL) - frcti->snd_drf = true; - - /* Set the DRF in the first packet of a new run of SDUs. */ - if (frcti->snd_drf) { - pci->flags |= FLAG_DATA_RUN; - frcti->snd_drf = false; - } - - frcti->last_snd = now; - - pci->seqno = frcti->snd_lwe++; - - if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) { - pthread_rwlock_unlock(&frcti->lock); - return -1; - } - - ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); - if (ret < 0) { - pthread_rwlock_unlock(&frcti->lock); - return ret; - } - - pthread_rwlock_unlock(&frcti->lock); - - return 0; -} - - -static int frcti_configure(int fd, - uint16_t flags) -{ - struct frcti * frcti; - struct frct_pci pci; - struct shm_du_buff * sdb; - - frcti = &(ai.frcti[fd]); - - memset(&pci, 0, sizeof(pci)); - - if (ipcp_sdb_reserve(&sdb, 0)) - return -1; - - pci.conf_flags = flags; - - /* Always set the DRF on a configure message. */ - pci.flags |= FLAG_DATA_RUN; - pci.type |= PDU_TYPE_CONFIG; - - pthread_rwlock_wrlock(&frcti->lock); - - frcti->conf_flags = pci.conf_flags; - - pthread_rwlock_unlock(&frcti->lock); - - if (frcti_send(fd, &pci, sdb)) { - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); - return -1; - } - - return 0; -} - -static int frcti_write(int fd, - struct shm_du_buff * sdb) -{ - struct frct_pci pci; - - memset(&pci, 0, sizeof(pci)); - - pci.type |= PDU_TYPE_DATA; - - return frcti_send(fd, &pci, sdb); -} - -static ssize_t frcti_read(int fd) -{ - ssize_t idx; - struct frcti * frcti; - struct frct_pci pci; - struct shm_du_buff * sdb; - uint64_t seqno; - bool nxt_pdu = true; - - frcti = &(ai.frcti[fd]); - - /* See if we already have the next PDU */ - pthread_rwlock_wrlock(&frcti->lock); - - if (!rq_is_empty(frcti->rq)) { - seqno = rq_peek(frcti->rq); - if (seqno == frcti->rcv_lwe) { - frcti->rcv_lwe++; - idx = rq_pop(frcti->rq); - pthread_rwlock_unlock(&frcti->lock); - return idx; - } - } - - pthread_rwlock_unlock(&frcti->lock); - - do { - struct timespec now; - struct timespec abs; - struct timespec * abstime = NULL; - struct shm_rbuff * rb; - bool noblock; - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_rwlock_rdlock(&ai.lock); - - noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK; - rb = ai.flows[fd].rx_rb; - - if (ai.flows[fd].rcv_timesout) { - ts_add(&now, &ai.flows[fd].rcv_timeo, &abs); - abstime = &abs; - } - - pthread_rwlock_unlock(&ai.lock); - - if (noblock) { - idx = shm_rbuff_read(rb); - } else { - idx = shm_rbuff_read_b(rb, abstime); - clock_gettime(CLOCK_REALTIME_COARSE, &now); - } - - if (idx < 0) - return idx; - - sdb = shm_rdrbuff_get(ai.rdrb, idx); - - pthread_rwlock_wrlock(&frcti->lock); - - /* SDU may be corrupted. */ - if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) { - pthread_rwlock_unlock(&frcti->lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return -EAGAIN; - } - - /* Check if receiver inactivity is true. */ - if (!frcti->rcv_drf && - ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL) - frcti->rcv_drf = true; - - /* When there is receiver inactivity queue the packet. */ - if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) { - if (rq_push(frcti->rq, pci.seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - pthread_rwlock_unlock(&frcti->lock); - return -EAGAIN; - } - - /* If the DRF is set, reset the state of the connection. */ - if (pci.flags & FLAG_DATA_RUN) - frcti->rcv_lwe = pci.seqno; - - if (pci.type & PDU_TYPE_CONFIG) - frcti->conf_flags = pci.conf_flags; - - if (frcti->rcv_drf) - frcti->rcv_drf = false; - - frcti->last_rcv = now; - - nxt_pdu = true; - - if (!(pci.type & PDU_TYPE_DATA)) { - shm_rdrbuff_remove(ai.rdrb, idx); - nxt_pdu = false; - } - - if (frcti->conf_flags & FRCTFORDERING) { - if (pci.seqno != frcti->rcv_lwe) { - if (rq_push(frcti->rq, pci.seqno, idx)) - shm_rdrbuff_remove(ai.rdrb, idx); - nxt_pdu = false; - } else { - frcti->rcv_lwe++; - } - } - - pthread_rwlock_unlock(&frcti->lock); - - } while (!nxt_pdu); - - return idx; -} - static void flow_clear(int fd) { - assert(!(fd < 0)); - memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); ai.flows[fd].port_id = -1; @@ -525,8 +235,10 @@ static void flow_fini(int fd) { assert(!(fd < 0)); - if (ai.flows[fd].port_id != -1) + if (ai.flows[fd].port_id != -1) { port_destroy(&ai.ports[ai.flows[fd].port_id]); + bmp_release(ai.fds, fd); + } if (ai.flows[fd].rx_rb != NULL) shm_rbuff_close(ai.flows[fd].rx_rb); @@ -537,8 +249,8 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) shm_flow_set_close(ai.flows[fd].set); - if (ai.frcti[fd].used) - frcti_fini(fd); + if (ai.flows[fd].frcti != NULL) + frcti_destroy(ai.flows[fd].frcti); flow_clear(fd); } @@ -548,37 +260,27 @@ static int flow_init(int port_id, qoscube_t qc) { int fd; + int err = -ENOMEM; pthread_rwlock_wrlock(&ai.lock); fd = bmp_allocate(ai.fds); if (!bmp_is_id_valid(ai.fds, fd)) { - pthread_rwlock_unlock(&ai.lock); - return -EBADF; + err = -EBADF; + goto fail_fds; } ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); - if (ai.flows[fd].rx_rb == NULL) { - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].rx_rb == NULL) + goto fail; ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id); - if (ai.flows[fd].tx_rb == NULL) { - flow_fini(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].tx_rb == NULL) + goto fail; ai.flows[fd].set = shm_flow_set_open(api); - if (ai.flows[fd].set == NULL) { - flow_fini(fd); - bmp_release(ai.fds, fd); - pthread_rwlock_unlock(&ai.lock); - return -ENOMEM; - } + if (ai.flows[fd].set == NULL) + goto fail; ai.flows[fd].port_id = port_id; ai.flows[fd].oflags = FLOWFDEFAULT; @@ -593,6 +295,12 @@ static int flow_init(int port_id, pthread_rwlock_unlock(&ai.lock); return fd; + + fail: + flow_fini(fd); + fail_fds: + pthread_rwlock_unlock(&ai.lock); + return err; } static bool check_python(char * str) @@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int argc, { const char * ap_name = argv[0]; int i; - int j; (void) argc; (void) envp; @@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int argc, if (ai.flows == NULL) goto fail_flows; - ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS); - if (ai.frcti == NULL) - goto fail_frcti; - - for (i = 0; i < AP_MAX_FLOWS; ++i) { + for (i = 0; i < AP_MAX_FLOWS; ++i) flow_clear(i); - frcti_clear(i); - - if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) { - for (j = i - 1; j >= 0 ; j--) - pthread_rwlock_destroy(&ai.frcti[j].lock); - goto fail_frct_lock; - } - } ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS); if (ai.ports == NULL) @@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int argc, if (pthread_rwlock_init(&ai.lock, NULL)) goto fail_lock; - ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); - if (ai.tw == NULL) - goto fail_timerwheel; + if (frct_init()) + goto fail_frct; return; - fail_timerwheel: + fail_frct: pthread_rwlock_destroy(&ai.lock); fail_lock: for (i = 0; i < SYS_MAX_FLOWS; ++i) @@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int argc, fail_ap_name: free(ai.ports); fail_ports: - for (i = 0; i < AP_MAX_FLOWS; ++i) - pthread_rwlock_destroy(&ai.frcti[i].lock); - fail_frct_lock: - free(ai.frcti); - fail_frcti: free(ai.flows); fail_flows: shm_rdrbuff_close(ai.rdrb); @@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void) if (ai.fds == NULL) return; - bmp_destroy(ai.fds); - bmp_destroy(ai.fqueues); + frct_fini(); shm_flow_set_destroy(ai.fqset); if (ai.ap_name != NULL) free(ai.ap_name); - pthread_rwlock_rdlock(&ai.lock); + pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < AP_MAX_FLOWS; ++i) { if (ai.flows[i].port_id != -1) { @@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void) shm_rdrbuff_remove(ai.rdrb, idx); flow_fini(i); } - - pthread_rwlock_destroy(&ai.frcti[i].lock); } for (i = 0; i < SYS_MAX_FLOWS; ++i) { @@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void) free(ai.flows); free(ai.ports); - free(ai.frcti); + + bmp_destroy(ai.fds); + bmp_destroy(ai.fqueues); pthread_rwlock_unlock(&ai.lock); @@ -825,7 +513,16 @@ int flow_accept(qosspec_t * qs, pthread_rwlock_wrlock(&ai.lock); - frcti_init(fd); + /* FIXME: check if FRCT is needed based on qc? */ + + assert(ai.flows[fd].frcti == NULL); + + ai.flows[fd].frcti = frcti_create(fd); + if (ai.flows[fd].frcti == NULL) { + flow_fini(fd); + pthread_rwlock_unlock(&ai.lock); + return -ENOMEM; + } if (qs != NULL) *qs = ai.flows[fd].spec; @@ -891,7 +588,15 @@ int flow_alloc(const char * dst_name, pthread_rwlock_wrlock(&ai.lock); - frcti_init(fd); + /* FIXME: check if FRCT is needed based on qc? */ + assert(ai.flows[fd].frcti == NULL); + + ai.flows[fd].frcti = frcti_create(fd); + if (ai.flows[fd].frcti == NULL) { + flow_fini(fd); + pthread_rwlock_unlock(&ai.lock); + return -ENOMEM; + } pthread_rwlock_unlock(&ai.lock); @@ -913,7 +618,7 @@ int flow_dealloc(int fd) pthread_rwlock_rdlock(&ai.lock); - assert(!(ai.flows[fd].port_id < 0)); + assert(ai.flows[fd].port_id >= 0); msg.port_id = ai.flows[fd].port_id; @@ -933,7 +638,6 @@ int flow_dealloc(int fd) pthread_rwlock_wrlock(&ai.lock); flow_fini(fd); - bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.lock); @@ -944,6 +648,7 @@ int fccntl(int fd, int cmd, ...) { + uint16_t sflags; uint32_t * fflags; uint16_t * cflags; va_list l; @@ -951,15 +656,18 @@ int fccntl(int fd, qosspec_t * qs; uint32_t rx_acl; uint32_t tx_acl; + struct flow * flow; if (fd < 0 || fd >= AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + va_start(l, cmd); pthread_rwlock_wrlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -969,57 +677,57 @@ int fccntl(int fd, case FLOWSSNDTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) { - ai.flows[fd].snd_timesout = false; + flow->snd_timesout = false; } else { - ai.flows[fd].snd_timesout = true; - ai.flows[fd].snd_timeo = *timeo; + flow->snd_timesout = true; + flow->snd_timeo = *timeo; } break; case FLOWGSNDTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) goto einval; - if (!ai.flows[fd].snd_timesout) + if (!flow->snd_timesout) goto eperm; - *timeo = ai.flows[fd].snd_timeo; + *timeo = flow->snd_timeo; break; case FLOWSRCVTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) { - ai.flows[fd].rcv_timesout = false; + flow->rcv_timesout = false; } else { - ai.flows[fd].rcv_timesout = true; - ai.flows[fd].rcv_timeo = *timeo; + flow->rcv_timesout = true; + flow->rcv_timeo = *timeo; } break; case FLOWGRCVTIMEO: timeo = va_arg(l, struct timespec *); if (timeo == NULL) goto einval; - if (!ai.flows[fd].rcv_timesout) + if (!flow->rcv_timesout) goto eperm; - *timeo = ai.flows[fd].snd_timeo; + *timeo = flow->snd_timeo; break; case FLOWGQOSSPEC: qs = va_arg(l, qosspec_t *); if (qs == NULL) goto einval; - *qs = ai.flows[fd].spec; + *qs = flow->spec; break; case FLOWSFLAGS: - ai.flows[fd].oflags = va_arg(l, uint32_t); - rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); - tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb); + flow->oflags = va_arg(l, uint32_t); + rx_acl = shm_rbuff_get_acl(flow->rx_rb); + tx_acl = shm_rbuff_get_acl(flow->rx_rb); /* * Making our own flow write only means making the * the other side of the flow read only. */ - if (ai.flows[fd].oflags & FLOWFWRONLY) + if (flow->oflags & FLOWFWRONLY) rx_acl |= ACL_RDONLY; - if (ai.flows[fd].oflags & FLOWFRDWR) + if (flow->oflags & FLOWFRDWR) rx_acl |= ACL_RDWR; - if (ai.flows[fd].oflags & FLOWFDOWN) { + if (flow->oflags & FLOWFDOWN) { rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; } else { @@ -1027,26 +735,28 @@ int fccntl(int fd, tx_acl &= ~ACL_FLOWDOWN; } - shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl); - shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl); + shm_rbuff_set_acl(flow->rx_rb, rx_acl); + shm_rbuff_set_acl(flow->tx_rb, tx_acl); break; case FLOWGFLAGS: fflags = va_arg(l, uint32_t *); if (fflags == NULL) goto einval; - *fflags = ai.flows[fd].oflags; + *fflags = flow->oflags; break; case FRCTSFLAGS: - ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int); + sflags = (uint16_t) va_arg(l, int); + if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags)) + goto eperm; break; case FRCTGFLAGS: cflags = (uint16_t *) va_arg(l, int *); if (cflags == NULL) goto einval; - *cflags = ai.frcti[fd].conf_flags; - if (frcti_configure(fd, ai.frcti[fd].conf_flags)) + if (flow->frcti == NULL) goto eperm; + *cflags = frcti_getconf(flow->frcti); break; default: pthread_rwlock_unlock(&ai.lock); @@ -1075,8 +785,10 @@ ssize_t flow_write(int fd, const void * buf, size_t count) { - ssize_t idx; - int ret; + struct flow * flow; + ssize_t idx; + int ret; + int flags; if (buf == NULL) return 0; @@ -1084,104 +796,110 @@ ssize_t flow_write(int fd, if (fd < 0 || fd > AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { - pthread_rwlock_unlock(&ai.lock); - return -EPERM; - } + flags = flow->oflags; - if (ai.flows[fd].oflags & FLOWFWNOBLOCK) { - idx = shm_rdrbuff_write(ai.rdrb, - DU_BUFF_HEADSPACE, - DU_BUFF_TAILSPACE, - buf, - count); - if (idx < 0) { - pthread_rwlock_unlock(&ai.lock); - return idx; - } + pthread_rwlock_unlock(&ai.lock); - } else { /* Blocking. */ - pthread_rwlock_unlock(&ai.lock); + if ((flags & FLOWFACCMODE) == FLOWFRDONLY) + return -EPERM; + if (flags & FLOWFWNOBLOCK) + idx = shm_rdrbuff_write(ai.rdrb, + DU_BUFF_HEADSPACE, + DU_BUFF_TAILSPACE, + buf, + count); + else /* Blocking. */ idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, buf, count); - if (idx < 0) - return idx; + if (idx < 0) + return idx; - pthread_rwlock_rdlock(&ai.lock); + if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) { + shm_rdrbuff_remove(ai.rdrb, idx); + return -ENOMEM; } - if (!ai.frcti[fd].used) { - ret = finalize_write(fd, idx); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - shm_rdrbuff_remove(ai.rdrb, idx); - return ret; - } + pthread_rwlock_rdlock(&ai.lock); - pthread_rwlock_unlock(&ai.lock); - } else { - pthread_rwlock_unlock(&ai.lock); + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret < 0) + shm_rdrbuff_remove(ai.rdrb, idx); + else + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); - ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx)); - if (ret < 0) { - shm_rdrbuff_remove(ai.rdrb, idx); - return ret; - } - } + pthread_rwlock_unlock(&ai.lock); - return 0; + assert(ret <= 0); + + return ret; } ssize_t flow_read(int fd, void * buf, size_t count) { - ssize_t idx; - ssize_t n; - uint8_t * sdu; - bool used; - struct shm_rbuff * rb; + ssize_t idx; + ssize_t n; + uint8_t * sdu; + struct shm_rbuff * rb; + struct shm_du_buff * sdb; + struct timespec now; + struct timespec abs; + struct timespec * abstime = NULL; + struct flow * flow; + bool noblock; if (fd < 0 || fd > AP_MAX_FLOWS) return -EBADF; + flow = &ai.flows[fd]; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (flow->port_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - used = ai.frcti[fd].used; - rb = ai.flows[fd].rx_rb; + rb = flow->rx_rb; + noblock = flow->oflags & FLOWFRNOBLOCK; - pthread_rwlock_unlock(&ai.lock); + if (ai.flows[fd].rcv_timesout) { + ts_add(&now, &flow->rcv_timeo, &abs); + abstime = &abs; + } - if (!used) - idx = shm_rbuff_read(rb); - else - idx = frcti_read(fd); + pthread_rwlock_unlock(&ai.lock); + idx = frcti_queued_pdu(flow->frcti); if (idx < 0) { - assert(idx == -EAGAIN || idx == -ETIMEDOUT || - idx == -EFLOWDOWN); - return idx; + do { + idx = noblock ? shm_rbuff_read(rb) : + shm_rbuff_read_b(rb, abstime); + if (idx < 0) + return idx; + sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, sdb) != 0); } n = shm_rdrbuff_read(&sdu, ai.rdrb, idx); - if (n < 0) - return -1; + + assert(n >= 0); memcpy(buf, sdu, MIN((size_t) n, count)); @@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api, if (recv_msg == NULL) return -EIRMD; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd, if (recv_msg == NULL) return -EIRMD; - if (recv_msg->has_result == false) { + if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd, int ipcp_flow_read(int fd, struct shm_du_buff ** sdb) { - ssize_t idx = -1; - int port_id = -1; + struct flow * flow; + struct shm_rbuff * rb; + ssize_t idx; assert(fd >= 0); assert(sdb); + flow = &ai.flows[fd]; + pthread_rwlock_rdlock(&ai.lock); - if ((port_id = ai.flows[fd].port_id) < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(flow->port_id >= 0); - pthread_rwlock_unlock(&ai.lock); + rb = flow->rx_rb; - if (!ai.frcti[fd].used) - idx = shm_rbuff_read(ai.flows[fd].rx_rb); - else - idx = frcti_read(fd); + pthread_rwlock_unlock(&ai.lock); - if (idx < 0) - return idx; + if (flow->frcti != NULL) { + idx = frcti_queued_pdu(flow->frcti); + if (idx >= 0) { + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + return 0; + } + } - *sdb = shm_rdrbuff_get(ai.rdrb, idx); + do { + idx = shm_rbuff_read(rb); + if (idx < 0) + return idx; + *sdb = shm_rdrbuff_get(ai.rdrb, idx); + } while (frcti_rcv(flow->frcti, *sdb) != 0); return 0; } @@ -1555,53 +1280,49 @@ int ipcp_flow_read(int fd, int ipcp_flow_write(int fd, struct shm_du_buff * sdb) { - int ret; + struct flow * flow; + int ret; + ssize_t idx; - if (sdb == NULL) - return -EINVAL; + assert(sdb); + + flow = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(flow->port_id >= 0); - if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) { + if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { pthread_rwlock_unlock(&ai.lock); return -EPERM; } - assert(ai.flows[fd].tx_rb); + assert(flow->tx_rb); - if (!ai.frcti[fd].used) { - ret = finalize_write(fd, shm_du_buff_get_idx(sdb)); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - return ret; - } + idx = shm_du_buff_get_idx(sdb); + if (frcti_snd(flow->frcti, sdb) < 0) { pthread_rwlock_unlock(&ai.lock); - } else { - pthread_rwlock_unlock(&ai.lock); - - ret = frcti_write(fd, sdb); - if (ret < 0) - return ret; + return -ENOMEM; } - return 0; + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret == 0) + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); + + pthread_rwlock_unlock(&ai.lock); + + assert(ret <= 0); + + return ret; } int ipcp_sdb_reserve(struct shm_du_buff ** sdb, size_t len) { - struct shm_rdrbuff * rdrb; - ssize_t idx; - - rdrb = ai.rdrb; + ssize_t idx; - idx = shm_rdrbuff_write_b(rdrb, + idx = shm_rdrbuff_write_b(ai.rdrb, DU_BUFF_HEADSPACE, DU_BUFF_TAILSPACE, NULL, @@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb, if (idx < 0) return -1; - *sdb = shm_rdrbuff_get(rdrb, idx); + *sdb = shm_rdrbuff_get(ai.rdrb, idx); return 0; } +void ipcp_sdb_release(struct shm_du_buff * sdb) +{ + shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); +} + void ipcp_flow_fini(int fd) { struct shm_rbuff * rx_rb; + assert(fd >= 0); + fccntl(fd, FLOWSFLAGS, FLOWFWRONLY); pthread_rwlock_rdlock(&ai.lock); @@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd) int ipcp_flow_get_qoscube(int fd, qoscube_t * cube) { - if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL) - return -EINVAL; + assert(fd >= 0); + assert(cube); - pthread_rwlock_wrlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); - return -ENOTALLOC; - } + assert(ai.flows[fd].port_id >= 0); *cube = ai.flows[fd].cube; @@ -1670,28 +1395,20 @@ int local_flow_write(int fd, { int ret; - if (fd < 0) - return -EINVAL; + assert(fd >= 0); pthread_rwlock_rdlock(&ai.lock); if (ai.flows[fd].port_id < 0) { - pthread_rwlock_unlock(&ai.lock); + pthread_rwlock_rdlock(&ai.lock); return -ENOTALLOC; } - ret = finalize_write(fd, idx); - if (ret < 0) { - pthread_rwlock_unlock(&ai.lock); - return ret; - } + ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx); + if (ret == 0) + shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id); pthread_rwlock_unlock(&ai.lock); - return 0; -} - -void ipcp_sdb_release(struct shm_du_buff * sdb) -{ - shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb)); + return ret; } diff --git a/src/lib/frct.c b/src/lib/frct.c new file mode 100644 index 00000000..abebb2ff --- /dev/null +++ b/src/lib/frct.c @@ -0,0 +1,320 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Flow and Retransmission Control + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +/* Default Delta-t parameters */ +#define DELT_MPL 60000 /* ms */ +#define DELT_A 0 /* ms */ +#define DELT_R 2000 /* ms */ + +#define RQ_SIZE 20 + +#define TW_ELEMENTS 6000 +#define TW_RESOLUTION 1 /* ms */ + +struct frct_cr { + bool drf; + uint64_t lwe; + uint64_t rwe; + + bool conf; + uint16_t cflags; + + time_t act; + time_t inact; +}; + +struct frcti { + int fd; + + time_t mpl; + time_t a; + time_t r; + + struct frct_cr snd_cr; + struct frct_cr rcv_cr; + + struct rq * rq; + + struct timespec rtt; + + pthread_rwlock_t lock; +}; + +struct { + struct timerwheel * tw; +} frct; + +static int frct_init(void) +{ + frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS); + if (frct.tw == NULL) + return -1; + + return 0; +} + +static void frct_fini(void) +{ + assert(frct.tw); + + timerwheel_destroy(frct.tw); +} + +static struct frcti * frcti_create(int fd) +{ + struct frcti * frcti; + time_t delta_t; + + frcti = malloc(sizeof(*frcti)); + if (frcti == NULL) + goto fail_malloc; + + if (pthread_rwlock_init(&frcti->lock, NULL)) + goto fail_lock; + + frcti->rq = rq_create(RQ_SIZE); + if (frcti->rq == NULL) + goto fail_rq; + + frcti->mpl = DELT_MPL; + frcti->a = DELT_A; + frcti->r = DELT_R; + frcti->fd = fd; + + delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000; + + frcti->snd_cr.drf = true; + frcti->snd_cr.conf = true; + frcti->snd_cr.lwe = 0; + frcti->snd_cr.rwe = 0; + frcti->snd_cr.cflags = 0; + frcti->snd_cr.inact = 2 * delta_t + 1; + + frcti->rcv_cr.drf = true; + frcti->rcv_cr.lwe = 0; + frcti->rcv_cr.rwe = 0; + frcti->rcv_cr.cflags = 0; + frcti->rcv_cr.inact = 3 * delta_t + 1; + + return frcti; + + fail_rq: + pthread_rwlock_destroy(&frcti->lock); + fail_lock: + free(frcti); + fail_malloc: + return NULL; +} + +static void frcti_destroy(struct frcti * frcti) +{ + /* + * FIXME: In case of reliable transmission we should + * make sure everything is acked. + */ + + pthread_rwlock_destroy(&frcti->lock); + + rq_destroy(frcti->rq); + free(frcti); +} + +static int frcti_setconf(struct frcti * frcti, + uint16_t flags) +{ + assert(frcti); + + pthread_rwlock_wrlock(&frcti->lock); + + if (frcti->snd_cr.cflags != flags) { + frcti->snd_cr.cflags = flags; + frcti->snd_cr.conf = true; + frcti->snd_cr.drf = true; + } + + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} + +static uint16_t frcti_getconf(struct frcti * frcti) +{ + uint16_t ret; + + assert (frcti); + + pthread_rwlock_rdlock(&frcti->lock); + + ret = frcti->snd_cr.cflags; + + pthread_rwlock_unlock(&frcti->lock); + + return ret; +} + +#define frcti_queued_pdu(frcti) \ + (frcti == NULL ? -1 : __frcti_queued_pdu(frcti)) + +#define frcti_snd(frcti, sdb) \ + (frcti == NULL ? 0 : __frcti_snd(frcti, sdb)) + +#define frcti_rcv(frcti, sdb) \ + (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb)) + +static ssize_t __frcti_queued_pdu(struct frcti * frcti) +{ + ssize_t idx = -1; + + assert(frcti); + + /* See if we already have the next PDU. */ + pthread_rwlock_wrlock(&frcti->lock); + + if (!rq_is_empty(frcti->rq)) { + if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) { + ++frcti->rcv_cr.lwe; + idx = rq_pop(frcti->rq); + } + } + + pthread_rwlock_unlock(&frcti->lock); + + return idx; +} + +static int __frcti_snd(struct frcti * frcti, + struct shm_du_buff * sdb) +{ + struct frct_pci pci; + struct timespec now; + struct frct_cr * snd_cr; + + if (frcti == NULL) + return 0; + + snd_cr = &frcti->snd_cr; + + memset(&pci, 0, sizeof(pci)); + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pci.type |= PDU_TYPE_DATA; + + pthread_rwlock_wrlock(&frcti->lock); + + /* Check if sender is inactive. */ + if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact) + snd_cr->drf = true; + + /* Set the DRF in the first packet of a new run of SDUs. */ + if (snd_cr->drf) { + pci.flags |= FLAG_DATA_RUN; + if (snd_cr->conf) { + pci.type |= PDU_TYPE_CONFIG; + pci.cflags = snd_cr->cflags; + } + } + + pci.seqno = snd_cr->lwe++; + + if (frct_pci_ser(sdb, &pci, snd_cr->cflags & FRCTFERRCHCK)) { + pthread_rwlock_unlock(&frcti->lock); + return -1; + } + + snd_cr->act = now.tv_sec; + + snd_cr->drf = false; + snd_cr->conf = false; + + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} + +/* Returns 0 when idx contains an SDU for the application. */ +static int __frcti_rcv(struct frcti * frcti, + struct shm_du_buff * sdb) +{ + ssize_t idx; + struct frct_pci pci; + struct timespec now; + struct frct_cr * rcv_cr; + + assert(frcti); + + rcv_cr = &frcti->rcv_cr; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_rwlock_wrlock(&frcti->lock); + + idx = shm_du_buff_get_idx(sdb); + + /* SDU may be corrupted. */ + if (frct_pci_des(sdb, &pci, rcv_cr->cflags & FRCTFERRCHCK)) { + pthread_rwlock_unlock(&frcti->lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -EAGAIN; + } + + /* Check if receiver inactivity is true. */ + if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact) + rcv_cr->drf = true; + + /* When there is receiver inactivity and no DRF, drop the SDU. */ + if (rcv_cr->drf && !(pci.flags & FLAG_DATA_RUN)) { + pthread_rwlock_unlock(&frcti->lock); + shm_rdrbuff_remove(ai.rdrb, idx); + return -EAGAIN; + } + + /* If the DRF is set, reset the state of the connection. */ + if (pci.flags & FLAG_DATA_RUN) { + rcv_cr->lwe = pci.seqno; + if (pci.type & PDU_TYPE_CONFIG) + rcv_cr->cflags = pci.cflags; + } + + if (rcv_cr->drf) + rcv_cr->drf = false; + + rcv_cr->act = now.tv_sec; + + if (!(pci.type & PDU_TYPE_DATA)) + shm_rdrbuff_remove(ai.rdrb, idx); + + if (rcv_cr->cflags & FRCTFORDERING) { + if (pci.seqno != frcti->rcv_cr.lwe) { + if (rq_push(frcti->rq, pci.seqno, idx)) + shm_rdrbuff_remove(ai.rdrb, idx); + pthread_rwlock_unlock(&frcti->lock); + return -EAGAIN; + } else { + ++rcv_cr->lwe; + } + } + + pthread_rwlock_unlock(&frcti->lock); + + return 0; +} diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c index e44554f2..509cc8e2 100644 --- a/src/lib/frct_pci.c +++ b/src/lib/frct_pci.c @@ -20,29 +20,23 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include <ouroboros/frct_pci.h> #include <ouroboros/hash.h> #include <ouroboros/errno.h> +#include "frct_pci.h" + #include <assert.h> #include <string.h> -#define TYPE_SIZE 1 -#define SEQNO_SIZE 8 -#define FLAGS_SIZE 1 -#define CONF_FLAGS_SIZE sizeof(((struct frct_pci *) NULL)->conf_flags) -#define BASE_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE -#define CONFIG_SIZE CONF_FLAGS_SIZE - -static size_t get_head_len(struct frct_pci * pci) -{ - size_t len = BASE_SIZE; +#define TYPE_SIZE 1 +#define FLAGS_SIZE 1 +#define SEQNO_SIZE 8 +#define CONF_FLAGS_SIZE 2 - if (pci->type & PDU_TYPE_CONFIG) - len += CONFIG_SIZE; +#define BASE_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE - return len; -} +#define head_len(pci) (pci->type & PDU_TYPE_CONFIG ? \ + BASE_SIZE + CONF_FLAGS_SIZE : BASE_SIZE) int frct_pci_ser(struct shm_du_buff * sdb, struct frct_pci * pci, @@ -50,15 +44,12 @@ int frct_pci_ser(struct shm_du_buff * sdb, { uint8_t * head; uint8_t * tail; - size_t len; size_t offset = 0; assert(sdb); assert(pci); - len = get_head_len(pci); - - head = shm_du_buff_head_alloc(sdb, len); + head = shm_du_buff_head_alloc(sdb, head_len(pci)); if (head == NULL) return -EPERM; @@ -70,14 +61,14 @@ int frct_pci_ser(struct shm_du_buff * sdb, offset += SEQNO_SIZE; if (pci->type & PDU_TYPE_CONFIG) { - memcpy(head + offset, &pci->conf_flags, CONF_FLAGS_SIZE); + memcpy(head + offset, &pci->cflags, CONF_FLAGS_SIZE); /* offset += CONF_FLAGS_SIZE; */ } if (error_check) { tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32)); if (tail == NULL) { - shm_du_buff_head_release(sdb, len); + shm_du_buff_head_release(sdb, head_len(pci)); return -EPERM; } @@ -103,23 +94,8 @@ int frct_pci_des(struct shm_du_buff * sdb, head = shm_du_buff_head(sdb); - /* Depending on the type a different deserialization. */ - memcpy(&pci->type, head, TYPE_SIZE); - offset += TYPE_SIZE; - memcpy(&pci->flags, head + offset, FLAGS_SIZE); - offset += FLAGS_SIZE; - memcpy(&pci->seqno, head + offset, SEQNO_SIZE); - offset += SEQNO_SIZE; - - if (pci->type & PDU_TYPE_CONFIG) { - memcpy(&pci->conf_flags, head + offset, CONF_FLAGS_SIZE); - /* offset += CONF_FLAGS_SIZE; */ - } - if (error_check) { tail = shm_du_buff_tail(sdb); - if (tail == NULL) - return -EPERM; mem_hash(HASH_CRC32, &crc, head, tail - head - hash_len(HASH_CRC32)); @@ -134,7 +110,20 @@ int frct_pci_des(struct shm_du_buff * sdb, shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32)); } - shm_du_buff_head_release(sdb, get_head_len(pci)); + /* Depending on the type a different deserialization. */ + memcpy(&pci->type, head, TYPE_SIZE); + offset += TYPE_SIZE; + memcpy(&pci->flags, head + offset, FLAGS_SIZE); + offset += FLAGS_SIZE; + memcpy(&pci->seqno, head + offset, SEQNO_SIZE); + offset += SEQNO_SIZE; + + if (pci->type & PDU_TYPE_CONFIG) { + memcpy(&pci->cflags, head + offset, CONF_FLAGS_SIZE); + /* offset += CONF_FLAGS_SIZE; */ + } + + shm_du_buff_head_release(sdb, head_len(pci)); return 0; } diff --git a/src/lib/frct_pci.h b/src/lib/frct_pci.h new file mode 100644 index 00000000..fbbfd354 --- /dev/null +++ b/src/lib/frct_pci.h @@ -0,0 +1,67 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Protocol Control Information of FRCT + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_FRCT_PCI_H +#define OUROBOROS_LIB_FRCT_PCI_H + +#include <ouroboros/shm_du_buff.h> + +#include <stdint.h> +#include <stdbool.h> + +struct frct_pci { + /* Present in every PDU. */ + uint8_t type; + uint8_t flags; + uint64_t seqno; + + /* Present in config PDU. */ + uint16_t cflags; + + /* Present in flow control PDU. */ + uint64_t lwe; + uint64_t rwe; +}; + +enum pdu_types { + PDU_TYPE_DATA = 0x01, + PDU_TYPE_ACK = 0x02, + PDU_TYPE_FC = 0x04, + PDU_TYPE_ACK_AND_FC = (PDU_TYPE_ACK | PDU_TYPE_FC), + PDU_TYPE_RENDEZ_VOUS = 0x08, + PDU_TYPE_CONFIG = 0x10 +}; + +enum data_flags { + FLAG_DATA_RUN = 0x01, + FLAG_MORE_FRAGMENTS = 0x02 +}; + +int frct_pci_ser(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check); + +int frct_pci_des(struct shm_du_buff * sdb, + struct frct_pci * pci, + bool error_check); + +#endif /* OUROBOROS_LIB_FRCT_PCI_H */ diff --git a/src/lib/rq.c b/src/lib/rq.c index bd0594b5..ba425236 100644 --- a/src/lib/rq.c +++ b/src/lib/rq.c @@ -20,7 +20,7 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include <ouroboros/rq.h> +#include "rq.h" #include <assert.h> @@ -77,11 +77,11 @@ int rq_push(struct rq * rq, return -1; i = ++rq->n_items; - j = i / 2; + j = i >> 1; while (i > 1 && rq->items[j].seqno > seqno) { rq->items[i] = rq->items[j]; i = j; - j = j / 2; + j >>= 1; } rq->items[i].seqno = seqno; @@ -121,7 +121,7 @@ size_t rq_pop(struct rq * rq) i = 1; while (true) { k = i; - j = 2 * i; + j = i << 1; if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno) k = j; diff --git a/src/lib/rq.h b/src/lib/rq.h new file mode 100644 index 00000000..7c024c11 --- /dev/null +++ b/src/lib/rq.h @@ -0,0 +1,47 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * Reordering queue + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_RQ_H +#define OUROBOROS_LIB_RQ_H + +#include <stdint.h> +#include <stdlib.h> +#include <stdbool.h> + +struct rq * rq_create(int size); + +void rq_destroy(struct rq * rq); + +int rq_push(struct rq * rq, + uint64_t seqno, + size_t idx); + +uint64_t rq_peek(struct rq * rq); + +bool rq_is_empty(struct rq * rq); + +size_t rq_pop(struct rq * rq); + +bool rq_has(struct rq * rq, + uint64_t seqno); + +#endif /* OUROBOROS_LIB_RQ_H */ diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c index e2d0f435..7b57cf30 100644 --- a/src/lib/tests/rq_test.c +++ b/src/lib/tests/rq_test.c @@ -20,7 +20,7 @@ * Foundation, Inc., http://www.fsf.org/about/contact/. */ -#include <ouroboros/rq.h> +#include "rq.h" #include <stdio.h> |