summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/dev.c719
-rw-r--r--src/lib/frct.c320
-rw-r--r--src/lib/frct_pci.c63
-rw-r--r--src/lib/frct_pci.h67
-rw-r--r--src/lib/rq.c8
-rw-r--r--src/lib/rq.h47
-rw-r--r--src/lib/tests/rq_test.c2
7 files changed, 683 insertions, 543 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 28a99bc4..ff22cca6 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -38,22 +38,19 @@
#include <ouroboros/fqueue.h>
#include <ouroboros/qoscube.h>
#include <ouroboros/timerwheel.h>
-#include <ouroboros/frct_pci.h>
-#include <ouroboros/rq.h>
+
+#include "frct_pci.h"
+#include "rq.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
+#include <stdbool.h>
+#include <sys/types.h>
#define BUF_SIZE 1500
-#define TW_ELEMENTS 6000
-#define TW_RESOLUTION 1 /* ms */
-
-#define MPL 2000 /* ms */
-#define RQ_SIZE 20
-
#ifndef CLOCK_REALTIME_COARSE
#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
#endif
@@ -76,26 +73,6 @@ enum port_state {
PORT_DESTROY
};
-struct frcti {
- bool used;
-
- struct timespec last_snd;
- bool snd_drf;
- uint64_t snd_lwe;
- uint64_t snd_rwe;
-
- struct timespec last_rcv;
- bool rcv_drf;
- uint64_t rcv_lwe;
- uint64_t rcv_rwe;
-
- uint16_t conf_flags;
-
- struct rq * rq;
-
- pthread_rwlock_t lock;
-};
-
struct port {
int fd;
@@ -119,6 +96,8 @@ struct flow {
bool rcv_timesout;
struct timespec snd_timeo;
struct timespec rcv_timeo;
+
+ struct frcti * frcti;
};
struct {
@@ -132,13 +111,15 @@ struct {
struct bmp * fds;
struct bmp * fqueues;
+
struct flow * flows;
struct port * ports;
- struct frcti * frcti;
pthread_rwlock_t lock;
} ai;
+#include "frct.c"
+
static void port_destroy(struct port * p)
{
pthread_mutex_lock(&p->state_lock);
@@ -185,12 +166,8 @@ static enum port_state port_wait_assign(int port_id)
enum port_state state;
struct port * p;
- pthread_rwlock_rdlock(&ai.lock);
-
p = &ai.ports[port_id];
- pthread_rwlock_unlock(&ai.lock);
-
pthread_mutex_lock(&p->state_lock);
if (p->state == PORT_ID_ASSIGNED) {
@@ -245,275 +222,8 @@ static int api_announce(char * ap_name)
return ret;
}
-/* Call under flows lock. */
-static int finalize_write(int fd,
- size_t idx)
-{
- int ret;
-
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
- if (ret < 0)
- return ret;
-
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
-
- return ret;
-}
-
-static int frcti_init(int fd)
-{
- struct frcti * frcti;
-
- frcti = &(ai.frcti[fd]);
-
- frcti->used = true;
-
- frcti->snd_drf = true;
- frcti->snd_lwe = 0;
- frcti->snd_rwe = 0;
-
- frcti->rcv_drf = true;
- frcti->rcv_lwe = 0;
- frcti->rcv_rwe = 0;
-
- frcti->conf_flags = 0;
-
- frcti->rq = rq_create(RQ_SIZE);
- if (frcti->rq == NULL)
- return -1;
-
- return 0;
-}
-
-static void frcti_clear(int fd)
-{
- ai.frcti[fd].used = false;
-}
-
-static void frcti_fini(int fd)
-{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything is acked.
- */
-
- frcti_clear(fd);
-
- rq_destroy(ai.frcti[fd].rq);
-}
-
-static int frcti_send(int fd,
- struct frct_pci * pci,
- struct shm_du_buff * sdb)
-{
- struct timespec now = {0, 0};
- struct frcti * frcti;
- int ret;
-
- frcti = &(ai.frcti[fd]);
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- /* Check if sender inactivity is true. */
- if (!frcti->snd_drf && ts_diff_ms(&now, &frcti->last_snd) > 2 * MPL)
- frcti->snd_drf = true;
-
- /* Set the DRF in the first packet of a new run of SDUs. */
- if (frcti->snd_drf) {
- pci->flags |= FLAG_DATA_RUN;
- frcti->snd_drf = false;
- }
-
- frcti->last_snd = now;
-
- pci->seqno = frcti->snd_lwe++;
-
- if (frct_pci_ser(sdb, pci, frcti->conf_flags & FRCTFERRCHCK)) {
- pthread_rwlock_unlock(&frcti->lock);
- return -1;
- }
-
- ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
- if (ret < 0) {
- pthread_rwlock_unlock(&frcti->lock);
- return ret;
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- return 0;
-}
-
-
-static int frcti_configure(int fd,
- uint16_t flags)
-{
- struct frcti * frcti;
- struct frct_pci pci;
- struct shm_du_buff * sdb;
-
- frcti = &(ai.frcti[fd]);
-
- memset(&pci, 0, sizeof(pci));
-
- if (ipcp_sdb_reserve(&sdb, 0))
- return -1;
-
- pci.conf_flags = flags;
-
- /* Always set the DRF on a configure message. */
- pci.flags |= FLAG_DATA_RUN;
- pci.type |= PDU_TYPE_CONFIG;
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- frcti->conf_flags = pci.conf_flags;
-
- pthread_rwlock_unlock(&frcti->lock);
-
- if (frcti_send(fd, &pci, sdb)) {
- shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
- return -1;
- }
-
- return 0;
-}
-
-static int frcti_write(int fd,
- struct shm_du_buff * sdb)
-{
- struct frct_pci pci;
-
- memset(&pci, 0, sizeof(pci));
-
- pci.type |= PDU_TYPE_DATA;
-
- return frcti_send(fd, &pci, sdb);
-}
-
-static ssize_t frcti_read(int fd)
-{
- ssize_t idx;
- struct frcti * frcti;
- struct frct_pci pci;
- struct shm_du_buff * sdb;
- uint64_t seqno;
- bool nxt_pdu = true;
-
- frcti = &(ai.frcti[fd]);
-
- /* See if we already have the next PDU */
- pthread_rwlock_wrlock(&frcti->lock);
-
- if (!rq_is_empty(frcti->rq)) {
- seqno = rq_peek(frcti->rq);
- if (seqno == frcti->rcv_lwe) {
- frcti->rcv_lwe++;
- idx = rq_pop(frcti->rq);
- pthread_rwlock_unlock(&frcti->lock);
- return idx;
- }
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- do {
- struct timespec now;
- struct timespec abs;
- struct timespec * abstime = NULL;
- struct shm_rbuff * rb;
- bool noblock;
-
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
-
- pthread_rwlock_rdlock(&ai.lock);
-
- noblock = ai.flows[fd].oflags & FLOWFRNOBLOCK;
- rb = ai.flows[fd].rx_rb;
-
- if (ai.flows[fd].rcv_timesout) {
- ts_add(&now, &ai.flows[fd].rcv_timeo, &abs);
- abstime = &abs;
- }
-
- pthread_rwlock_unlock(&ai.lock);
-
- if (noblock) {
- idx = shm_rbuff_read(rb);
- } else {
- idx = shm_rbuff_read_b(rb, abstime);
- clock_gettime(CLOCK_REALTIME_COARSE, &now);
- }
-
- if (idx < 0)
- return idx;
-
- sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
- pthread_rwlock_wrlock(&frcti->lock);
-
- /* SDU may be corrupted. */
- if (frct_pci_des(sdb, &pci, frcti->conf_flags & FRCTFERRCHCK)) {
- pthread_rwlock_unlock(&frcti->lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return -EAGAIN;
- }
-
- /* Check if receiver inactivity is true. */
- if (!frcti->rcv_drf &&
- ts_diff_ms(&now, &frcti->last_rcv) > 3 * MPL)
- frcti->rcv_drf = true;
-
- /* When there is receiver inactivity queue the packet. */
- if (frcti->rcv_drf && !(pci.flags & FLAG_DATA_RUN)) {
- if (rq_push(frcti->rq, pci.seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&frcti->lock);
- return -EAGAIN;
- }
-
- /* If the DRF is set, reset the state of the connection. */
- if (pci.flags & FLAG_DATA_RUN)
- frcti->rcv_lwe = pci.seqno;
-
- if (pci.type & PDU_TYPE_CONFIG)
- frcti->conf_flags = pci.conf_flags;
-
- if (frcti->rcv_drf)
- frcti->rcv_drf = false;
-
- frcti->last_rcv = now;
-
- nxt_pdu = true;
-
- if (!(pci.type & PDU_TYPE_DATA)) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- nxt_pdu = false;
- }
-
- if (frcti->conf_flags & FRCTFORDERING) {
- if (pci.seqno != frcti->rcv_lwe) {
- if (rq_push(frcti->rq, pci.seqno, idx))
- shm_rdrbuff_remove(ai.rdrb, idx);
- nxt_pdu = false;
- } else {
- frcti->rcv_lwe++;
- }
- }
-
- pthread_rwlock_unlock(&frcti->lock);
-
- } while (!nxt_pdu);
-
- return idx;
-}
-
static void flow_clear(int fd)
{
- assert(!(fd < 0));
-
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
ai.flows[fd].port_id = -1;
@@ -525,8 +235,10 @@ static void flow_fini(int fd)
{
assert(!(fd < 0));
- if (ai.flows[fd].port_id != -1)
+ if (ai.flows[fd].port_id != -1) {
port_destroy(&ai.ports[ai.flows[fd].port_id]);
+ bmp_release(ai.fds, fd);
+ }
if (ai.flows[fd].rx_rb != NULL)
shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -537,8 +249,8 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL)
shm_flow_set_close(ai.flows[fd].set);
- if (ai.frcti[fd].used)
- frcti_fini(fd);
+ if (ai.flows[fd].frcti != NULL)
+ frcti_destroy(ai.flows[fd].frcti);
flow_clear(fd);
}
@@ -548,37 +260,27 @@ static int flow_init(int port_id,
qoscube_t qc)
{
int fd;
+ int err = -ENOMEM;
pthread_rwlock_wrlock(&ai.lock);
fd = bmp_allocate(ai.fds);
if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.lock);
- return -EBADF;
+ err = -EBADF;
+ goto fail_fds;
}
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
- if (ai.flows[fd].rx_rb == NULL) {
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].rx_rb == NULL)
+ goto fail;
ai.flows[fd].tx_rb = shm_rbuff_open(api, port_id);
- if (ai.flows[fd].tx_rb == NULL) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].tx_rb == NULL)
+ goto fail;
ai.flows[fd].set = shm_flow_set_open(api);
- if (ai.flows[fd].set == NULL) {
- flow_fini(fd);
- bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.lock);
- return -ENOMEM;
- }
+ if (ai.flows[fd].set == NULL)
+ goto fail;
ai.flows[fd].port_id = port_id;
ai.flows[fd].oflags = FLOWFDEFAULT;
@@ -593,6 +295,12 @@ static int flow_init(int port_id,
pthread_rwlock_unlock(&ai.lock);
return fd;
+
+ fail:
+ flow_fini(fd);
+ fail_fds:
+ pthread_rwlock_unlock(&ai.lock);
+ return err;
}
static bool check_python(char * str)
@@ -611,7 +319,6 @@ __attribute__((constructor)) static void init(int argc,
{
const char * ap_name = argv[0];
int i;
- int j;
(void) argc;
(void) envp;
@@ -643,20 +350,8 @@ __attribute__((constructor)) static void init(int argc,
if (ai.flows == NULL)
goto fail_flows;
- ai.frcti = malloc(sizeof(*ai.frcti) * AP_MAX_FLOWS);
- if (ai.frcti == NULL)
- goto fail_frcti;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
flow_clear(i);
- frcti_clear(i);
-
- if (pthread_rwlock_init(&ai.frcti[i].lock, NULL)) {
- for (j = i - 1; j >= 0 ; j--)
- pthread_rwlock_destroy(&ai.frcti[j].lock);
- goto fail_frct_lock;
- }
- }
ai.ports = malloc(sizeof(*ai.ports) * SYS_MAX_FLOWS);
if (ai.ports == NULL)
@@ -690,13 +385,12 @@ __attribute__((constructor)) static void init(int argc,
if (pthread_rwlock_init(&ai.lock, NULL))
goto fail_lock;
- ai.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
- if (ai.tw == NULL)
- goto fail_timerwheel;
+ if (frct_init())
+ goto fail_frct;
return;
- fail_timerwheel:
+ fail_frct:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -709,11 +403,6 @@ __attribute__((constructor)) static void init(int argc,
fail_ap_name:
free(ai.ports);
fail_ports:
- for (i = 0; i < AP_MAX_FLOWS; ++i)
- pthread_rwlock_destroy(&ai.frcti[i].lock);
- fail_frct_lock:
- free(ai.frcti);
- fail_frcti:
free(ai.flows);
fail_flows:
shm_rdrbuff_close(ai.rdrb);
@@ -737,15 +426,14 @@ __attribute__((destructor)) static void fini(void)
if (ai.fds == NULL)
return;
- bmp_destroy(ai.fds);
- bmp_destroy(ai.fqueues);
+ frct_fini();
shm_flow_set_destroy(ai.fqset);
if (ai.ap_name != NULL)
free(ai.ap_name);
- pthread_rwlock_rdlock(&ai.lock);
+ pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (ai.flows[i].port_id != -1) {
@@ -754,8 +442,6 @@ __attribute__((destructor)) static void fini(void)
shm_rdrbuff_remove(ai.rdrb, idx);
flow_fini(i);
}
-
- pthread_rwlock_destroy(&ai.frcti[i].lock);
}
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
@@ -770,7 +456,9 @@ __attribute__((destructor)) static void fini(void)
free(ai.flows);
free(ai.ports);
- free(ai.frcti);
+
+ bmp_destroy(ai.fds);
+ bmp_destroy(ai.fqueues);
pthread_rwlock_unlock(&ai.lock);
@@ -825,7 +513,16 @@ int flow_accept(qosspec_t * qs,
pthread_rwlock_wrlock(&ai.lock);
- frcti_init(fd);
+ /* FIXME: check if FRCT is needed based on qc? */
+
+ assert(ai.flows[fd].frcti == NULL);
+
+ ai.flows[fd].frcti = frcti_create(fd);
+ if (ai.flows[fd].frcti == NULL) {
+ flow_fini(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOMEM;
+ }
if (qs != NULL)
*qs = ai.flows[fd].spec;
@@ -891,7 +588,15 @@ int flow_alloc(const char * dst_name,
pthread_rwlock_wrlock(&ai.lock);
- frcti_init(fd);
+ /* FIXME: check if FRCT is needed based on qc? */
+ assert(ai.flows[fd].frcti == NULL);
+
+ ai.flows[fd].frcti = frcti_create(fd);
+ if (ai.flows[fd].frcti == NULL) {
+ flow_fini(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ return -ENOMEM;
+ }
pthread_rwlock_unlock(&ai.lock);
@@ -913,7 +618,7 @@ int flow_dealloc(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(!(ai.flows[fd].port_id < 0));
+ assert(ai.flows[fd].port_id >= 0);
msg.port_id = ai.flows[fd].port_id;
@@ -933,7 +638,6 @@ int flow_dealloc(int fd)
pthread_rwlock_wrlock(&ai.lock);
flow_fini(fd);
- bmp_release(ai.fds, fd);
pthread_rwlock_unlock(&ai.lock);
@@ -944,6 +648,7 @@ int fccntl(int fd,
int cmd,
...)
{
+ uint16_t sflags;
uint32_t * fflags;
uint16_t * cflags;
va_list l;
@@ -951,15 +656,18 @@ int fccntl(int fd,
qosspec_t * qs;
uint32_t rx_acl;
uint32_t tx_acl;
+ struct flow * flow;
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
va_start(l, cmd);
pthread_rwlock_wrlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -969,57 +677,57 @@ int fccntl(int fd,
case FLOWSSNDTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL) {
- ai.flows[fd].snd_timesout = false;
+ flow->snd_timesout = false;
} else {
- ai.flows[fd].snd_timesout = true;
- ai.flows[fd].snd_timeo = *timeo;
+ flow->snd_timesout = true;
+ flow->snd_timeo = *timeo;
}
break;
case FLOWGSNDTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL)
goto einval;
- if (!ai.flows[fd].snd_timesout)
+ if (!flow->snd_timesout)
goto eperm;
- *timeo = ai.flows[fd].snd_timeo;
+ *timeo = flow->snd_timeo;
break;
case FLOWSRCVTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL) {
- ai.flows[fd].rcv_timesout = false;
+ flow->rcv_timesout = false;
} else {
- ai.flows[fd].rcv_timesout = true;
- ai.flows[fd].rcv_timeo = *timeo;
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = *timeo;
}
break;
case FLOWGRCVTIMEO:
timeo = va_arg(l, struct timespec *);
if (timeo == NULL)
goto einval;
- if (!ai.flows[fd].rcv_timesout)
+ if (!flow->rcv_timesout)
goto eperm;
- *timeo = ai.flows[fd].snd_timeo;
+ *timeo = flow->snd_timeo;
break;
case FLOWGQOSSPEC:
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = ai.flows[fd].spec;
+ *qs = flow->spec;
break;
case FLOWSFLAGS:
- ai.flows[fd].oflags = va_arg(l, uint32_t);
- rx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
- tx_acl = shm_rbuff_get_acl(ai.flows[fd].rx_rb);
+ flow->oflags = va_arg(l, uint32_t);
+ rx_acl = shm_rbuff_get_acl(flow->rx_rb);
+ tx_acl = shm_rbuff_get_acl(flow->rx_rb);
/*
* Making our own flow write only means making the
* the other side of the flow read only.
*/
- if (ai.flows[fd].oflags & FLOWFWRONLY)
+ if (flow->oflags & FLOWFWRONLY)
rx_acl |= ACL_RDONLY;
- if (ai.flows[fd].oflags & FLOWFRDWR)
+ if (flow->oflags & FLOWFRDWR)
rx_acl |= ACL_RDWR;
- if (ai.flows[fd].oflags & FLOWFDOWN) {
+ if (flow->oflags & FLOWFDOWN) {
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
} else {
@@ -1027,26 +735,28 @@ int fccntl(int fd,
tx_acl &= ~ACL_FLOWDOWN;
}
- shm_rbuff_set_acl(ai.flows[fd].rx_rb, rx_acl);
- shm_rbuff_set_acl(ai.flows[fd].tx_rb, tx_acl);
+ shm_rbuff_set_acl(flow->rx_rb, rx_acl);
+ shm_rbuff_set_acl(flow->tx_rb, tx_acl);
break;
case FLOWGFLAGS:
fflags = va_arg(l, uint32_t *);
if (fflags == NULL)
goto einval;
- *fflags = ai.flows[fd].oflags;
+ *fflags = flow->oflags;
break;
case FRCTSFLAGS:
- ai.frcti[fd].conf_flags = (uint16_t) va_arg(l, int);
+ sflags = (uint16_t) va_arg(l, int);
+ if (flow->frcti == NULL || frcti_setconf(flow->frcti, sflags))
+ goto eperm;
break;
case FRCTGFLAGS:
cflags = (uint16_t *) va_arg(l, int *);
if (cflags == NULL)
goto einval;
- *cflags = ai.frcti[fd].conf_flags;
- if (frcti_configure(fd, ai.frcti[fd].conf_flags))
+ if (flow->frcti == NULL)
goto eperm;
+ *cflags = frcti_getconf(flow->frcti);
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1075,8 +785,10 @@ ssize_t flow_write(int fd,
const void * buf,
size_t count)
{
- ssize_t idx;
- int ret;
+ struct flow * flow;
+ ssize_t idx;
+ int ret;
+ int flags;
if (buf == NULL)
return 0;
@@ -1084,104 +796,110 @@ ssize_t flow_write(int fd,
if (fd < 0 || fd > AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
- pthread_rwlock_unlock(&ai.lock);
- return -EPERM;
- }
+ flags = flow->oflags;
- if (ai.flows[fd].oflags & FLOWFWNOBLOCK) {
- idx = shm_rdrbuff_write(ai.rdrb,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- buf,
- count);
- if (idx < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return idx;
- }
+ pthread_rwlock_unlock(&ai.lock);
- } else { /* Blocking. */
- pthread_rwlock_unlock(&ai.lock);
+ if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
+ return -EPERM;
+ if (flags & FLOWFWNOBLOCK)
+ idx = shm_rdrbuff_write(ai.rdrb,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ buf,
+ count);
+ else /* Blocking. */
idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
buf,
count);
- if (idx < 0)
- return idx;
+ if (idx < 0)
+ return idx;
- pthread_rwlock_rdlock(&ai.lock);
+ if (frcti_snd(flow->frcti, shm_rdrbuff_get(ai.rdrb, idx)) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -ENOMEM;
}
- if (!ai.frcti[fd].used) {
- ret = finalize_write(fd, idx);
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- shm_rdrbuff_remove(ai.rdrb, idx);
- return ret;
- }
+ pthread_rwlock_rdlock(&ai.lock);
- pthread_rwlock_unlock(&ai.lock);
- } else {
- pthread_rwlock_unlock(&ai.lock);
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret < 0)
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ else
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- ret = frcti_write(fd, shm_rdrbuff_get(ai.rdrb, idx));
- if (ret < 0) {
- shm_rdrbuff_remove(ai.rdrb, idx);
- return ret;
- }
- }
+ pthread_rwlock_unlock(&ai.lock);
- return 0;
+ assert(ret <= 0);
+
+ return ret;
}
ssize_t flow_read(int fd,
void * buf,
size_t count)
{
- ssize_t idx;
- ssize_t n;
- uint8_t * sdu;
- bool used;
- struct shm_rbuff * rb;
+ ssize_t idx;
+ ssize_t n;
+ uint8_t * sdu;
+ struct shm_rbuff * rb;
+ struct shm_du_buff * sdb;
+ struct timespec now;
+ struct timespec abs;
+ struct timespec * abstime = NULL;
+ struct flow * flow;
+ bool noblock;
if (fd < 0 || fd > AP_MAX_FLOWS)
return -EBADF;
+ flow = &ai.flows[fd];
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- used = ai.frcti[fd].used;
- rb = ai.flows[fd].rx_rb;
+ rb = flow->rx_rb;
+ noblock = flow->oflags & FLOWFRNOBLOCK;
- pthread_rwlock_unlock(&ai.lock);
+ if (ai.flows[fd].rcv_timesout) {
+ ts_add(&now, &flow->rcv_timeo, &abs);
+ abstime = &abs;
+ }
- if (!used)
- idx = shm_rbuff_read(rb);
- else
- idx = frcti_read(fd);
+ pthread_rwlock_unlock(&ai.lock);
+ idx = frcti_queued_pdu(flow->frcti);
if (idx < 0) {
- assert(idx == -EAGAIN || idx == -ETIMEDOUT ||
- idx == -EFLOWDOWN);
- return idx;
+ do {
+ idx = noblock ? shm_rbuff_read(rb) :
+ shm_rbuff_read_b(rb, abstime);
+ if (idx < 0)
+ return idx;
+ sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, sdb) != 0);
}
n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
- if (n < 0)
- return -1;
+
+ assert(n >= 0);
memcpy(buf, sdu, MIN((size_t) n, count));
@@ -1432,7 +1150,7 @@ int ipcp_create_r(pid_t api,
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1509,7 +1227,7 @@ int ipcp_flow_alloc_reply(int fd,
if (recv_msg == NULL)
return -EIRMD;
- if (recv_msg->has_result == false) {
+ if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -1524,30 +1242,37 @@ int ipcp_flow_alloc_reply(int fd,
int ipcp_flow_read(int fd,
struct shm_du_buff ** sdb)
{
- ssize_t idx = -1;
- int port_id = -1;
+ struct flow * flow;
+ struct shm_rbuff * rb;
+ ssize_t idx;
assert(fd >= 0);
assert(sdb);
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if ((port_id = ai.flows[fd].port_id) < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(flow->port_id >= 0);
- pthread_rwlock_unlock(&ai.lock);
+ rb = flow->rx_rb;
- if (!ai.frcti[fd].used)
- idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- else
- idx = frcti_read(fd);
+ pthread_rwlock_unlock(&ai.lock);
- if (idx < 0)
- return idx;
+ if (flow->frcti != NULL) {
+ idx = frcti_queued_pdu(flow->frcti);
+ if (idx >= 0) {
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ return 0;
+ }
+ }
- *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ do {
+ idx = shm_rbuff_read(rb);
+ if (idx < 0)
+ return idx;
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+ } while (frcti_rcv(flow->frcti, *sdb) != 0);
return 0;
}
@@ -1555,53 +1280,49 @@ int ipcp_flow_read(int fd,
int ipcp_flow_write(int fd,
struct shm_du_buff * sdb)
{
- int ret;
+ struct flow * flow;
+ int ret;
+ ssize_t idx;
- if (sdb == NULL)
- return -EINVAL;
+ assert(sdb);
+
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(flow->port_id >= 0);
- if ((ai.flows[fd].oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+ if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
pthread_rwlock_unlock(&ai.lock);
return -EPERM;
}
- assert(ai.flows[fd].tx_rb);
+ assert(flow->tx_rb);
- if (!ai.frcti[fd].used) {
- ret = finalize_write(fd, shm_du_buff_get_idx(sdb));
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return ret;
- }
+ idx = shm_du_buff_get_idx(sdb);
+ if (frcti_snd(flow->frcti, sdb) < 0) {
pthread_rwlock_unlock(&ai.lock);
- } else {
- pthread_rwlock_unlock(&ai.lock);
-
- ret = frcti_write(fd, sdb);
- if (ret < 0)
- return ret;
+ return -ENOMEM;
}
- return 0;
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret == 0)
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ assert(ret <= 0);
+
+ return ret;
}
int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
size_t len)
{
- struct shm_rdrbuff * rdrb;
- ssize_t idx;
-
- rdrb = ai.rdrb;
+ ssize_t idx;
- idx = shm_rdrbuff_write_b(rdrb,
+ idx = shm_rdrbuff_write_b(ai.rdrb,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
NULL,
@@ -1610,15 +1331,22 @@ int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
if (idx < 0)
return -1;
- *sdb = shm_rdrbuff_get(rdrb, idx);
+ *sdb = shm_rdrbuff_get(ai.rdrb, idx);
return 0;
}
+void ipcp_sdb_release(struct shm_du_buff * sdb)
+{
+ shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+}
+
void ipcp_flow_fini(int fd)
{
struct shm_rbuff * rx_rb;
+ assert(fd >= 0);
+
fccntl(fd, FLOWSFLAGS, FLOWFWRONLY);
pthread_rwlock_rdlock(&ai.lock);
@@ -1633,15 +1361,12 @@ void ipcp_flow_fini(int fd)
int ipcp_flow_get_qoscube(int fd,
qoscube_t * cube)
{
- if (fd < 0 || fd > AP_MAX_FLOWS || cube == NULL)
- return -EINVAL;
+ assert(fd >= 0);
+ assert(cube);
- pthread_rwlock_wrlock(&ai.lock);
+ pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return -ENOTALLOC;
- }
+ assert(ai.flows[fd].port_id >= 0);
*cube = ai.flows[fd].cube;
@@ -1670,28 +1395,20 @@ int local_flow_write(int fd,
{
int ret;
- if (fd < 0)
- return -EINVAL;
+ assert(fd >= 0);
pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.lock);
+ pthread_rwlock_rdlock(&ai.lock);
return -ENOTALLOC;
}
- ret = finalize_write(fd, idx);
- if (ret < 0) {
- pthread_rwlock_unlock(&ai.lock);
- return ret;
- }
+ ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ if (ret == 0)
+ shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
pthread_rwlock_unlock(&ai.lock);
- return 0;
-}
-
-void ipcp_sdb_release(struct shm_du_buff * sdb)
-{
- shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
+ return ret;
}
diff --git a/src/lib/frct.c b/src/lib/frct.c
new file mode 100644
index 00000000..abebb2ff
--- /dev/null
+++ b/src/lib/frct.c
@@ -0,0 +1,320 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Flow and Retransmission Control
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+/* Default Delta-t parameters */
+#define DELT_MPL 60000 /* ms */
+#define DELT_A 0 /* ms */
+#define DELT_R 2000 /* ms */
+
+#define RQ_SIZE 20
+
+#define TW_ELEMENTS 6000
+#define TW_RESOLUTION 1 /* ms */
+
+struct frct_cr {
+ bool drf;
+ uint64_t lwe;
+ uint64_t rwe;
+
+ bool conf;
+ uint16_t cflags;
+
+ time_t act;
+ time_t inact;
+};
+
+struct frcti {
+ int fd;
+
+ time_t mpl;
+ time_t a;
+ time_t r;
+
+ struct frct_cr snd_cr;
+ struct frct_cr rcv_cr;
+
+ struct rq * rq;
+
+ struct timespec rtt;
+
+ pthread_rwlock_t lock;
+};
+
+struct {
+ struct timerwheel * tw;
+} frct;
+
+static int frct_init(void)
+{
+ frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
+ if (frct.tw == NULL)
+ return -1;
+
+ return 0;
+}
+
+static void frct_fini(void)
+{
+ assert(frct.tw);
+
+ timerwheel_destroy(frct.tw);
+}
+
+static struct frcti * frcti_create(int fd)
+{
+ struct frcti * frcti;
+ time_t delta_t;
+
+ frcti = malloc(sizeof(*frcti));
+ if (frcti == NULL)
+ goto fail_malloc;
+
+ if (pthread_rwlock_init(&frcti->lock, NULL))
+ goto fail_lock;
+
+ frcti->rq = rq_create(RQ_SIZE);
+ if (frcti->rq == NULL)
+ goto fail_rq;
+
+ frcti->mpl = DELT_MPL;
+ frcti->a = DELT_A;
+ frcti->r = DELT_R;
+ frcti->fd = fd;
+
+ delta_t = (frcti->mpl + frcti->a + frcti->r) / 1000;
+
+ frcti->snd_cr.drf = true;
+ frcti->snd_cr.conf = true;
+ frcti->snd_cr.lwe = 0;
+ frcti->snd_cr.rwe = 0;
+ frcti->snd_cr.cflags = 0;
+ frcti->snd_cr.inact = 2 * delta_t + 1;
+
+ frcti->rcv_cr.drf = true;
+ frcti->rcv_cr.lwe = 0;
+ frcti->rcv_cr.rwe = 0;
+ frcti->rcv_cr.cflags = 0;
+ frcti->rcv_cr.inact = 3 * delta_t + 1;
+
+ return frcti;
+
+ fail_rq:
+ pthread_rwlock_destroy(&frcti->lock);
+ fail_lock:
+ free(frcti);
+ fail_malloc:
+ return NULL;
+}
+
+static void frcti_destroy(struct frcti * frcti)
+{
+ /*
+ * FIXME: In case of reliable transmission we should
+ * make sure everything is acked.
+ */
+
+ pthread_rwlock_destroy(&frcti->lock);
+
+ rq_destroy(frcti->rq);
+ free(frcti);
+}
+
+static int frcti_setconf(struct frcti * frcti,
+ uint16_t flags)
+{
+ assert(frcti);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (frcti->snd_cr.cflags != flags) {
+ frcti->snd_cr.cflags = flags;
+ frcti->snd_cr.conf = true;
+ frcti->snd_cr.drf = true;
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return 0;
+}
+
+static uint16_t frcti_getconf(struct frcti * frcti)
+{
+ uint16_t ret;
+
+ assert (frcti);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ ret = frcti->snd_cr.cflags;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return ret;
+}
+
+#define frcti_queued_pdu(frcti) \
+ (frcti == NULL ? -1 : __frcti_queued_pdu(frcti))
+
+#define frcti_snd(frcti, sdb) \
+ (frcti == NULL ? 0 : __frcti_snd(frcti, sdb))
+
+#define frcti_rcv(frcti, sdb) \
+ (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
+
+static ssize_t __frcti_queued_pdu(struct frcti * frcti)
+{
+ ssize_t idx = -1;
+
+ assert(frcti);
+
+ /* See if we already have the next PDU. */
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (!rq_is_empty(frcti->rq)) {
+ if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) {
+ ++frcti->rcv_cr.lwe;
+ idx = rq_pop(frcti->rq);
+ }
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return idx;
+}
+
+static int __frcti_snd(struct frcti * frcti,
+ struct shm_du_buff * sdb)
+{
+ struct frct_pci pci;
+ struct timespec now;
+ struct frct_cr * snd_cr;
+
+ if (frcti == NULL)
+ return 0;
+
+ snd_cr = &frcti->snd_cr;
+
+ memset(&pci, 0, sizeof(pci));
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pci.type |= PDU_TYPE_DATA;
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ /* Check if sender is inactive. */
+ if (!snd_cr->drf && now.tv_sec - snd_cr->act > snd_cr->inact)
+ snd_cr->drf = true;
+
+ /* Set the DRF in the first packet of a new run of SDUs. */
+ if (snd_cr->drf) {
+ pci.flags |= FLAG_DATA_RUN;
+ if (snd_cr->conf) {
+ pci.type |= PDU_TYPE_CONFIG;
+ pci.cflags = snd_cr->cflags;
+ }
+ }
+
+ pci.seqno = snd_cr->lwe++;
+
+ if (frct_pci_ser(sdb, &pci, snd_cr->cflags & FRCTFERRCHCK)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return -1;
+ }
+
+ snd_cr->act = now.tv_sec;
+
+ snd_cr->drf = false;
+ snd_cr->conf = false;
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return 0;
+}
+
+/* Returns 0 when idx contains an SDU for the application. */
+static int __frcti_rcv(struct frcti * frcti,
+ struct shm_du_buff * sdb)
+{
+ ssize_t idx;
+ struct frct_pci pci;
+ struct timespec now;
+ struct frct_cr * rcv_cr;
+
+ assert(frcti);
+
+ rcv_cr = &frcti->rcv_cr;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ /* SDU may be corrupted. */
+ if (frct_pci_des(sdb, &pci, rcv_cr->cflags & FRCTFERRCHCK)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -EAGAIN;
+ }
+
+ /* Check if receiver inactivity is true. */
+ if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact)
+ rcv_cr->drf = true;
+
+ /* When there is receiver inactivity and no DRF, drop the SDU. */
+ if (rcv_cr->drf && !(pci.flags & FLAG_DATA_RUN)) {
+ pthread_rwlock_unlock(&frcti->lock);
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ return -EAGAIN;
+ }
+
+ /* If the DRF is set, reset the state of the connection. */
+ if (pci.flags & FLAG_DATA_RUN) {
+ rcv_cr->lwe = pci.seqno;
+ if (pci.type & PDU_TYPE_CONFIG)
+ rcv_cr->cflags = pci.cflags;
+ }
+
+ if (rcv_cr->drf)
+ rcv_cr->drf = false;
+
+ rcv_cr->act = now.tv_sec;
+
+ if (!(pci.type & PDU_TYPE_DATA))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+
+ if (rcv_cr->cflags & FRCTFORDERING) {
+ if (pci.seqno != frcti->rcv_cr.lwe) {
+ if (rq_push(frcti->rq, pci.seqno, idx))
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ pthread_rwlock_unlock(&frcti->lock);
+ return -EAGAIN;
+ } else {
+ ++rcv_cr->lwe;
+ }
+ }
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ return 0;
+}
diff --git a/src/lib/frct_pci.c b/src/lib/frct_pci.c
index e44554f2..509cc8e2 100644
--- a/src/lib/frct_pci.c
+++ b/src/lib/frct_pci.c
@@ -20,29 +20,23 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#include <ouroboros/frct_pci.h>
#include <ouroboros/hash.h>
#include <ouroboros/errno.h>
+#include "frct_pci.h"
+
#include <assert.h>
#include <string.h>
-#define TYPE_SIZE 1
-#define SEQNO_SIZE 8
-#define FLAGS_SIZE 1
-#define CONF_FLAGS_SIZE sizeof(((struct frct_pci *) NULL)->conf_flags)
-#define BASE_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE
-#define CONFIG_SIZE CONF_FLAGS_SIZE
-
-static size_t get_head_len(struct frct_pci * pci)
-{
- size_t len = BASE_SIZE;
+#define TYPE_SIZE 1
+#define FLAGS_SIZE 1
+#define SEQNO_SIZE 8
+#define CONF_FLAGS_SIZE 2
- if (pci->type & PDU_TYPE_CONFIG)
- len += CONFIG_SIZE;
+#define BASE_SIZE TYPE_SIZE + FLAGS_SIZE + SEQNO_SIZE
- return len;
-}
+#define head_len(pci) (pci->type & PDU_TYPE_CONFIG ? \
+ BASE_SIZE + CONF_FLAGS_SIZE : BASE_SIZE)
int frct_pci_ser(struct shm_du_buff * sdb,
struct frct_pci * pci,
@@ -50,15 +44,12 @@ int frct_pci_ser(struct shm_du_buff * sdb,
{
uint8_t * head;
uint8_t * tail;
- size_t len;
size_t offset = 0;
assert(sdb);
assert(pci);
- len = get_head_len(pci);
-
- head = shm_du_buff_head_alloc(sdb, len);
+ head = shm_du_buff_head_alloc(sdb, head_len(pci));
if (head == NULL)
return -EPERM;
@@ -70,14 +61,14 @@ int frct_pci_ser(struct shm_du_buff * sdb,
offset += SEQNO_SIZE;
if (pci->type & PDU_TYPE_CONFIG) {
- memcpy(head + offset, &pci->conf_flags, CONF_FLAGS_SIZE);
+ memcpy(head + offset, &pci->cflags, CONF_FLAGS_SIZE);
/* offset += CONF_FLAGS_SIZE; */
}
if (error_check) {
tail = shm_du_buff_tail_alloc(sdb, hash_len(HASH_CRC32));
if (tail == NULL) {
- shm_du_buff_head_release(sdb, len);
+ shm_du_buff_head_release(sdb, head_len(pci));
return -EPERM;
}
@@ -103,23 +94,8 @@ int frct_pci_des(struct shm_du_buff * sdb,
head = shm_du_buff_head(sdb);
- /* Depending on the type a different deserialization. */
- memcpy(&pci->type, head, TYPE_SIZE);
- offset += TYPE_SIZE;
- memcpy(&pci->flags, head + offset, FLAGS_SIZE);
- offset += FLAGS_SIZE;
- memcpy(&pci->seqno, head + offset, SEQNO_SIZE);
- offset += SEQNO_SIZE;
-
- if (pci->type & PDU_TYPE_CONFIG) {
- memcpy(&pci->conf_flags, head + offset, CONF_FLAGS_SIZE);
- /* offset += CONF_FLAGS_SIZE; */
- }
-
if (error_check) {
tail = shm_du_buff_tail(sdb);
- if (tail == NULL)
- return -EPERM;
mem_hash(HASH_CRC32, &crc, head,
tail - head - hash_len(HASH_CRC32));
@@ -134,7 +110,20 @@ int frct_pci_des(struct shm_du_buff * sdb,
shm_du_buff_tail_release(sdb, hash_len(HASH_CRC32));
}
- shm_du_buff_head_release(sdb, get_head_len(pci));
+ /* Depending on the type a different deserialization. */
+ memcpy(&pci->type, head, TYPE_SIZE);
+ offset += TYPE_SIZE;
+ memcpy(&pci->flags, head + offset, FLAGS_SIZE);
+ offset += FLAGS_SIZE;
+ memcpy(&pci->seqno, head + offset, SEQNO_SIZE);
+ offset += SEQNO_SIZE;
+
+ if (pci->type & PDU_TYPE_CONFIG) {
+ memcpy(&pci->cflags, head + offset, CONF_FLAGS_SIZE);
+ /* offset += CONF_FLAGS_SIZE; */
+ }
+
+ shm_du_buff_head_release(sdb, head_len(pci));
return 0;
}
diff --git a/src/lib/frct_pci.h b/src/lib/frct_pci.h
new file mode 100644
index 00000000..fbbfd354
--- /dev/null
+++ b/src/lib/frct_pci.h
@@ -0,0 +1,67 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Protocol Control Information of FRCT
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_FRCT_PCI_H
+#define OUROBOROS_LIB_FRCT_PCI_H
+
+#include <ouroboros/shm_du_buff.h>
+
+#include <stdint.h>
+#include <stdbool.h>
+
+struct frct_pci {
+ /* Present in every PDU. */
+ uint8_t type;
+ uint8_t flags;
+ uint64_t seqno;
+
+ /* Present in config PDU. */
+ uint16_t cflags;
+
+ /* Present in flow control PDU. */
+ uint64_t lwe;
+ uint64_t rwe;
+};
+
+enum pdu_types {
+ PDU_TYPE_DATA = 0x01,
+ PDU_TYPE_ACK = 0x02,
+ PDU_TYPE_FC = 0x04,
+ PDU_TYPE_ACK_AND_FC = (PDU_TYPE_ACK | PDU_TYPE_FC),
+ PDU_TYPE_RENDEZ_VOUS = 0x08,
+ PDU_TYPE_CONFIG = 0x10
+};
+
+enum data_flags {
+ FLAG_DATA_RUN = 0x01,
+ FLAG_MORE_FRAGMENTS = 0x02
+};
+
+int frct_pci_ser(struct shm_du_buff * sdb,
+ struct frct_pci * pci,
+ bool error_check);
+
+int frct_pci_des(struct shm_du_buff * sdb,
+ struct frct_pci * pci,
+ bool error_check);
+
+#endif /* OUROBOROS_LIB_FRCT_PCI_H */
diff --git a/src/lib/rq.c b/src/lib/rq.c
index bd0594b5..ba425236 100644
--- a/src/lib/rq.c
+++ b/src/lib/rq.c
@@ -20,7 +20,7 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#include <ouroboros/rq.h>
+#include "rq.h"
#include <assert.h>
@@ -77,11 +77,11 @@ int rq_push(struct rq * rq,
return -1;
i = ++rq->n_items;
- j = i / 2;
+ j = i >> 1;
while (i > 1 && rq->items[j].seqno > seqno) {
rq->items[i] = rq->items[j];
i = j;
- j = j / 2;
+ j >>= 1;
}
rq->items[i].seqno = seqno;
@@ -121,7 +121,7 @@ size_t rq_pop(struct rq * rq)
i = 1;
while (true) {
k = i;
- j = 2 * i;
+ j = i << 1;
if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno)
k = j;
diff --git a/src/lib/rq.h b/src/lib/rq.h
new file mode 100644
index 00000000..7c024c11
--- /dev/null
+++ b/src/lib/rq.h
@@ -0,0 +1,47 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Reordering queue
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_RQ_H
+#define OUROBOROS_LIB_RQ_H
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdbool.h>
+
+struct rq * rq_create(int size);
+
+void rq_destroy(struct rq * rq);
+
+int rq_push(struct rq * rq,
+ uint64_t seqno,
+ size_t idx);
+
+uint64_t rq_peek(struct rq * rq);
+
+bool rq_is_empty(struct rq * rq);
+
+size_t rq_pop(struct rq * rq);
+
+bool rq_has(struct rq * rq,
+ uint64_t seqno);
+
+#endif /* OUROBOROS_LIB_RQ_H */
diff --git a/src/lib/tests/rq_test.c b/src/lib/tests/rq_test.c
index e2d0f435..7b57cf30 100644
--- a/src/lib/tests/rq_test.c
+++ b/src/lib/tests/rq_test.c
@@ -20,7 +20,7 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#include <ouroboros/rq.h>
+#include "rq.h"
#include <stdio.h>