summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2020-09-20 13:04:52 +0200
committerSander Vrijders <[email protected]>2020-09-25 11:52:51 +0200
commit1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 (patch)
tree5193774eea2bb204d062cc47e178a3702d4790a4 /src
parent5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (diff)
downloadouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.tar.gz
ouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.zip
lib: Complete retransmission logic
This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src')
-rw-r--r--src/irmd/ipcp.c13
-rw-r--r--src/irmd/ipcp.h5
-rw-r--r--src/irmd/main.c20
-rw-r--r--src/lib/dev.c133
-rw-r--r--src/lib/frct.c244
-rw-r--r--src/lib/ipcpd_messages.proto3
-rw-r--r--src/lib/rxmwheel.c261
-rw-r--r--src/lib/timerwheel.c409
8 files changed, 690 insertions, 398 deletions
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 78408185..cbd9ee15 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -543,16 +543,19 @@ int ipcp_flow_alloc_resp(pid_t pid,
return ret;
}
-int ipcp_flow_dealloc(pid_t pid,
- int flow_id)
+int ipcp_flow_dealloc(pid_t pid,
+ int flow_id,
+ time_t timeo)
{
ipcp_msg_t msg = IPCP_MSG__INIT;
ipcp_msg_t * recv_msg = NULL;
int ret = -1;
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.flow_id = flow_id;
+ msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
+ msg.has_flow_id = true;
+ msg.flow_id = flow_id;
+ msg.has_timeo_sec = true;
+ msg.timeo_sec = timeo;
recv_msg = send_recv_ipcp_msg(pid, &msg);
if (recv_msg == NULL)
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
index ae00792b..652316ba 100644
--- a/src/irmd/ipcp.h
+++ b/src/irmd/ipcp.h
@@ -85,7 +85,8 @@ int ipcp_flow_alloc_resp(pid_t pid,
const void * data,
size_t len);
-int ipcp_flow_dealloc(pid_t pid,
- int flow_id);
+int ipcp_flow_dealloc(pid_t pid,
+ int flow_id,
+ time_t timeo);
#endif /* OUROBOROS_IRMD_IPCP_H */
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 3709a3e5..3a0ad544 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -68,10 +68,11 @@
#endif
#define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */
-#define SHM_SAN_HOLDOFF 1000 /* ms */
-#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo)
-#define IB_LEN SOCK_BUF_SIZE
-#define BIND_TIMEOUT 10 /* ms */
+#define SHM_SAN_HOLDOFF 1000 /* ms */
+#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo)
+#define IB_LEN SOCK_BUF_SIZE
+#define BIND_TIMEOUT 10 /* ms */
+#define DEALLOC_TIME 300 /* s */
enum init_state {
IPCP_NULL = 0,
@@ -1475,7 +1476,8 @@ static int flow_alloc(pid_t pid,
}
static int flow_dealloc(pid_t pid,
- int flow_id)
+ int flow_id,
+ time_t timeo)
{
pid_t n_1_pid = -1;
int ret = 0;
@@ -1521,7 +1523,7 @@ static int flow_dealloc(pid_t pid,
pthread_rwlock_unlock(&irmd.flows_lock);
if (n_1_pid != -1)
- ret = ipcp_flow_dealloc(n_1_pid, flow_id);
+ ret = ipcp_flow_dealloc(n_1_pid, flow_id, timeo);
return ret;
}
@@ -1927,7 +1929,7 @@ void * irm_sanitize(void * o)
ipcpi = f->n_1_pid;
flow_id = f->flow_id;
pthread_rwlock_unlock(&irmd.flows_lock);
- ipcp_flow_dealloc(ipcpi, flow_id);
+ ipcp_flow_dealloc(ipcpi, flow_id, DEALLOC_TIME);
pthread_rwlock_wrlock(&irmd.flows_lock);
continue;
}
@@ -2190,7 +2192,9 @@ static void * mainloop(void * o)
}
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
- result = flow_dealloc(msg->pid, msg->flow_id);
+ result = flow_dealloc(msg->pid,
+ msg->flow_id,
+ msg->timeo_sec);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
assert(msg->pk.len > 0 ? msg->pk.data != NULL
diff --git a/src/lib/dev.c b/src/lib/dev.c
index df616ead..8d7d7934 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -63,6 +63,7 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
+#define TICTIME 1000000 /* ns */
struct flow_set {
size_t idx;
@@ -255,6 +256,9 @@ static void flow_fini(int fd)
bmp_release(ai.fds, fd);
}
+ if (ai.flows[fd].frcti != NULL)
+ frcti_destroy(ai.flows[fd].frcti);
+
if (ai.flows[fd].rx_rb != NULL) {
shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -272,9 +276,6 @@ static void flow_fini(int fd)
shm_flow_set_close(ai.flows[fd].set);
}
- if (ai.flows[fd].frcti != NULL)
- frcti_destroy(ai.flows[fd].frcti);
-
if (ai.flows[fd].ctx != NULL)
crypt_fini(ai.flows[fd].ctx);
@@ -433,8 +434,13 @@ static void init(int argc,
if (ai.fqset == NULL)
goto fail_fqset;
+ if (timerwheel_init() < 0)
+ goto fail_timerwheel;
+
return;
+ fail_timerwheel:
+ shm_flow_set_close(ai.fqset);
fail_fqset:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
@@ -491,6 +497,8 @@ static void fini(void)
pthread_cond_destroy(&ai.ports[i].state_cond);
}
+ timerwheel_fini();
+
shm_rdrbuff_close(ai.rdrb);
free(ai.flows);
@@ -747,25 +755,59 @@ int flow_join(const char * dst,
int flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg;
+ struct flow * f;
+ time_t timeo;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.has_pid = true;
- msg.pid = ai.pid;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg.has_flow_id = true;
+ msg.has_pid = true;
+ msg.pid = ai.pid;
+ msg.has_timeo_sec = true;
+ msg.has_timeo_nsec = true;
+ msg.timeo_nsec = 0;
+
+ f = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (f->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = ai.flows[fd].flow_id;
+ msg.flow_id = f->flow_id;
+
+ timeo = frcti_dealloc(f->frcti);
+ while (timeo < 0) { /* keep the flow active for rtx */
+ ssize_t ret;
+ uint8_t buf[128];
+
+ f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+
+ f->rcv_timesout = true;
+ f->rcv_timeo.tv_sec = -timeo;
+ f->rcv_timeo.tv_nsec = 0;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ ret = flow_read(fd, buf, 128);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ timeo = frcti_dealloc(f->frcti);
+
+ if (ret == -ETIMEDOUT && timeo < 0)
+ timeo = -timeo;
+ }
+
+ msg.timeo_sec = timeo;
+
+ shm_rbuff_fini(ai.flows[fd].tx_rb);
pthread_rwlock_unlock(&ai.lock);
@@ -904,13 +946,21 @@ int fccntl(int fd,
goto einval;
*fflags = flow->oflags;
break;
+ case FRCTSFLAGS:
+ cflags = va_arg(l, uint16_t *);
+ if (cflags == NULL)
+ goto einval;
+ if (flow->frcti == NULL)
+ goto eperm;
+ frcti_setflags(flow->frcti, *cflags);
+ break;
case FRCTGFLAGS:
cflags = (uint16_t *) va_arg(l, int *);
if (cflags == NULL)
goto einval;
if (flow->frcti == NULL)
goto eperm;
- *cflags = frcti_getconf(flow->frcti);
+ *cflags = frcti_getflags(flow->frcti);
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1067,6 +1117,8 @@ ssize_t flow_read(int fd,
struct shm_rbuff * rb;
struct shm_du_buff * sdb;
struct timespec abs;
+ struct timespec tic = {0, TICTIME};
+ struct timespec tictime;
struct timespec * abstime = NULL;
struct flow * flow;
bool noblock;
@@ -1096,6 +1148,8 @@ ssize_t flow_read(int fd,
noblock = flow->oflags & FLOWFRNOBLOCK;
partrd = !(flow->oflags & FLOWFRNOPART);
+ ts_add(&tic, &abs, &tictime);
+
if (ai.flows[fd].rcv_timesout) {
ts_add(&abs, &flow->rcv_timeo, &abs);
abstime = &abs;
@@ -1108,9 +1162,21 @@ ssize_t flow_read(int fd,
pthread_rwlock_unlock(&ai.lock);
idx = noblock ? shm_rbuff_read(rb) :
- shm_rbuff_read_b(rb, abstime);
- if (idx < 0)
- return idx;
+ shm_rbuff_read_b(rb, &tictime);
+ if (idx < 0) {
+ frcti_tick(flow->frcti);
+
+ if (idx != -ETIMEDOUT)
+ return idx;
+
+ if (abstime != NULL
+ && ts_diff_ns(&tictime, &abs) < 0)
+ return -ETIMEDOUT;
+
+ ts_add(&tictime, &tic, &tictime);
+ pthread_rwlock_rdlock(&ai.lock);
+ continue;
+ }
sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
@@ -1339,7 +1405,9 @@ ssize_t fevent(struct flow_set * set,
const struct timespec * timeo)
{
ssize_t ret = 0;
- struct timespec abstime;
+ struct timespec tic = {0, TICTIME};
+ struct timespec tictime;
+ struct timespec abs;
struct timespec * t = NULL;
if (set == NULL || fq == NULL)
@@ -1348,17 +1416,25 @@ ssize_t fevent(struct flow_set * set,
if (fq->fqsize > 0 && fq->next != fq->fqsize)
return fq->fqsize;
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeo, &abstime);
- t = &abstime;
- }
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ ts_add(&tic, &abs, &tictime);
+ t = &tictime;
+
+ if (timeo != NULL)
+ ts_add(&abs, timeo, &abs);
while (ret == 0) {
ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
+ if (timeo != NULL && ts_diff_ns(t, &abs) < 0) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
+ ret = 0;
+ ts_add(t, &tic, t);
+ timerwheel_move();
+ continue;
}
fq->fqsize = ret << 1;
@@ -1382,10 +1458,19 @@ int np1_flow_alloc(pid_t n_pid,
return flow_init(flow_id, n_pid, qs, NULL);
}
-int np1_flow_dealloc(int flow_id)
+int np1_flow_dealloc(int flow_id,
+ time_t timeo)
{
int fd;
+ /*
+ * TODO: Don't pass timeo to the IPCP but wait in IRMd.
+ * This will need async ops, waiting until we bootstrap
+ * the IRMd over ouroboros.
+ */
+
+ sleep(timeo);
+
pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[flow_id].fd;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2bd126f4..c26910fa 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,7 +21,7 @@
*/
/* Default Delta-t parameters */
-#define DELT_MPL (60 * BILLION) /* ns */
+#define DELT_MPL (5 * BILLION) /* ns */
#define DELT_A (1 * BILLION) /* ns */
#define DELT_R (20 * BILLION) /* ns */
@@ -59,8 +59,6 @@ struct frcti {
struct frct_cr snd_cr;
struct frct_cr rcv_cr;
- struct rxmwheel * rw;
-
ssize_t rq[RQ_SIZE];
pthread_rwlock_t lock;
};
@@ -86,7 +84,84 @@ struct frct_pci {
uint32_t ackno;
} __attribute__((packed));
-#include <rxmwheel.c>
+static bool before(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq1 - seq2) < 0;
+}
+
+static bool after(uint32_t seq1,
+ uint32_t seq2)
+{
+ return (int32_t)(seq2 - seq1) < 0;
+}
+
+static void __send_ack(int fd,
+ int ackno)
+{
+ struct shm_du_buff * sdb;
+ struct frct_pci * pci;
+ ssize_t idx;
+ struct flow * f;
+
+ /* Raw calls needed to bypass frcti. */
+ idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+ if (idx < 0)
+ return;
+
+ pci = (struct frct_pci *) shm_du_buff_head(sdb);
+ memset(pci, 0, sizeof(*pci));
+
+ pci->flags = FRCT_ACK;
+ pci->ackno = hton32(ackno);
+
+ f = &ai.flows[fd];
+
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ ipcp_sdb_release(sdb);
+ return;
+ }
+
+ shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+}
+
+static void frct_send_ack(struct frcti * frcti)
+{
+ struct timespec now;
+ time_t diff;
+ uint32_t ackno;
+ int fd;
+
+ assert(frcti);
+
+ pthread_rwlock_rdlock(&frcti->lock);
+
+ if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+ pthread_rwlock_unlock(&frcti->lock);
+ return;
+ }
+
+ ackno = frcti->rcv_cr.lwe;
+ fd = frcti->fd;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+
+ pthread_rwlock_unlock(&frcti->lock);
+
+ if (diff > frcti->a || diff < DELT_ACK)
+ return;
+
+ __send_ack(fd, ackno);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
+ frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
static struct frcti * frcti_create(int fd)
{
@@ -123,14 +198,10 @@ static struct frcti * frcti_create(int fd)
frcti->srtt = 0; /* Updated on first ACK */
frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
frcti->rto = 20 * MILLION; /* Initial rxm will be after 20 ms */
- frcti->rw = NULL;
if (ai.flows[fd].qs.loss == 0) {
- frcti->snd_cr.cflags |= FRCTFRTX;
+ frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
- frcti->rw = rxmwheel_create();
- if (frcti->rw == NULL)
- goto fail_rw;
}
frcti->snd_cr.inact = (3 * mpl + a + r) / BILLION + 1; /* s */
@@ -141,8 +212,6 @@ static struct frcti * frcti_create(int fd)
return frcti;
- fail_rw:
- pthread_rwlock_destroy(&frcti->lock);
fail_lock:
free(frcti);
fail_malloc:
@@ -151,24 +220,16 @@ static struct frcti * frcti_create(int fd)
static void frcti_destroy(struct frcti * frcti)
{
- /*
- * FIXME: In case of reliable transmission we should
- * make sure everything we sent is acked.
- */
-
- if (frcti->rw != NULL)
- rxmwheel_destroy(frcti->rw);
-
pthread_rwlock_destroy(&frcti->lock);
free(frcti);
}
-static uint16_t frcti_getconf(struct frcti * frcti)
+static uint16_t frcti_getflags(struct frcti * frcti)
{
uint16_t ret;
- assert (frcti);
+ assert(frcti);
pthread_rwlock_rdlock(&frcti->lock);
@@ -179,6 +240,22 @@ static uint16_t frcti_getconf(struct frcti * frcti)
return ret;
}
+static void frcti_setflags(struct frcti * frcti,
+ uint16_t flags)
+{
+ flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */
+
+ assert(frcti);
+
+ pthread_rwlock_wrlock(&frcti->lock);
+
+ frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */
+
+ frcti->snd_cr.cflags &= flags;
+
+ pthread_rwlock_unlock(&frcti->lock);
+}
+
#define frcti_queued_pdu(frcti) \
(frcti == NULL ? idx : __frcti_queued_pdu(frcti))
@@ -189,8 +266,10 @@ static uint16_t frcti_getconf(struct frcti * frcti)
(frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
#define frcti_tick(frcti) \
- (frcti == NULL ? 0 : __frcti_tick(frcti))
+ (frcti == NULL ? 0 : __frcti_tick())
+#define frcti_dealloc(frcti) \
+ (frcti == NULL ? 0 : __frcti_dealloc(frcti))
static ssize_t __frcti_queued_pdu(struct frcti * frcti)
{
@@ -233,78 +312,41 @@ static ssize_t __frcti_pdu_ready(struct frcti * frcti)
return idx;
}
-static bool before(uint32_t seq1,
- uint32_t seq2)
-{
- return (int32_t)(seq1 - seq2) < 0;
-}
-
-static bool after(uint32_t seq1,
- uint32_t seq2)
-{
- return (int32_t)(seq2 - seq1) < 0;
-}
+#include <timerwheel.c>
-static void frct_send_ack(struct frcti * frcti)
+/*
+ * Send a final ACK for everything that has not been ACK'd.
+ * If the flow should be kept active for retransmission,
+ * the returned time will be negative.
+ */
+static time_t __frcti_dealloc(struct frcti * frcti)
{
- struct shm_du_buff * sdb;
- struct frct_pci * pci;
- ssize_t idx;
- struct timespec now;
- time_t diff;
- uint32_t ackno;
- struct flow * f;
+ struct timespec now;
+ time_t wait;
+ int ackno;
+ int fd = -1;
- assert(frcti);
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
pthread_rwlock_rdlock(&frcti->lock);
- if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
- pthread_rwlock_unlock(&frcti->lock);
- return;
- }
-
ackno = frcti->rcv_cr.lwe;
+ if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno)
+ fd = frcti->fd;
- pthread_rwlock_unlock(&frcti->lock);
+ wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
+ frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);
- clock_gettime(PTHREAD_COND_CLOCK, &now);
+ if (frcti->snd_cr.cflags & FRCTFLINGER
+ && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
+ wait = -wait;
- diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
-
- if (diff > frcti->a)
- return;
-
- if (diff < DELT_ACK)
- return;
-
- /* Raw calls needed to bypass frcti. */
- idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
- if (idx < 0)
- return;
-
- pci = (struct frct_pci *) shm_du_buff_head(sdb);
- memset(pci, 0, sizeof(*pci));
-
- pci->flags = FRCT_ACK;
- pci->ackno = hton32(ackno);
-
- f = &ai.flows[frcti->fd];
-
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- pthread_rwlock_rdlock(&ai.lock);
- ipcp_sdb_release(sdb);
- return;
- }
-
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
- pthread_rwlock_wrlock(&frcti->lock);
+ pthread_rwlock_unlock(&frcti->lock);
- if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
- frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+ if (fd != -1)
+ __send_ack(fd, ackno);
- pthread_rwlock_unlock(&frcti->lock);
+ return wait;
}
static int __frcti_snd(struct frcti * frcti,
@@ -315,14 +357,14 @@ static int __frcti_snd(struct frcti * frcti,
struct frct_cr * snd_cr;
struct frct_cr * rcv_cr;
uint32_t seqno;
+ bool rtx;
assert(frcti);
snd_cr = &frcti->snd_cr;
rcv_cr = &frcti->rcv_cr;
- if (frcti->rw != NULL)
- rxmwheel_move(frcti->rw);
+ timerwheel_move();
pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
if (pci == NULL)
@@ -334,6 +376,8 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_wrlock(&frcti->lock);
+ rtx = snd_cr->cflags & FRCTFRTX;
+
pci->flags |= FRCT_DATA;
/* Set DRF if there are no unacknowledged packets. */
@@ -351,7 +395,7 @@ static int __frcti_snd(struct frcti * frcti,
seqno = snd_cr->seqno;
pci->seqno = hton32(seqno);
- if (!(snd_cr->cflags & FRCTFRTX)) {
+ if (!rtx) {
snd_cr->lwe++;
} else {
if (!frcti->probe) {
@@ -372,8 +416,8 @@ static int __frcti_snd(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
- if (frcti->rw != NULL)
- rxmwheel_add(frcti->rw, frcti, seqno, sdb);
+ if (rtx)
+ timerwheel_rxm(frcti, seqno, sdb);
return 0;
}
@@ -387,12 +431,10 @@ static void rtt_estimator(struct frcti * frcti,
if (srtt == 0) { /* first measurement */
srtt = mrtt;
rttvar = mrtt >> 1;
-
} else {
time_t delta = mrtt - srtt;
srtt += (delta >> 3);
- rttvar -= rttvar >> 2;
- rttvar += ABS(delta) >> 2;
+ rttvar += (ABS(delta) - rttvar) >> 2;
}
frcti->srtt = MAX(1000U, srtt);
@@ -401,12 +443,9 @@ static void rtt_estimator(struct frcti * frcti,
frcti->srtt + (frcti->mdev << 1));
}
-static void __frcti_tick(struct frcti * frcti)
+static void __frcti_tick(void)
{
- if (frcti->rw != NULL) {
- rxmwheel_move(frcti->rw);
- frct_send_ack(frcti);
- }
+ timerwheel_move();
}
/* Always queues the next application packet on the RQ. */
@@ -420,6 +459,7 @@ static void __frcti_rcv(struct frcti * frcti,
struct frct_cr * rcv_cr;
uint32_t seqno;
uint32_t ackno;
+ int fd = -1;
assert(frcti);
@@ -456,8 +496,10 @@ static void __frcti_rcv(struct frcti * frcti,
if (!(pci->flags & FRCT_DATA))
goto drop_packet;
- if (before(seqno, rcv_cr->lwe))
+ if (before(seqno, rcv_cr->lwe)) {
+ rcv_cr->seqno = seqno;
goto drop_packet;
+ }
if (rcv_cr->cflags & FRCTFRTX) {
if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
@@ -465,6 +507,8 @@ static void __frcti_rcv(struct frcti * frcti,
if (frcti->rq[pos] != -1)
goto drop_packet; /* Duplicate in rq. */
+
+ fd = frcti->fd;
} else {
rcv_cr->lwe = seqno;
}
@@ -475,10 +519,16 @@ static void __frcti_rcv(struct frcti * frcti,
pthread_rwlock_unlock(&frcti->lock);
+ if (fd != -1)
+ timerwheel_ack(fd, frcti);
+
return;
drop_packet:
pthread_rwlock_unlock(&frcti->lock);
+
+ frct_send_ack(frcti);
+
shm_rdrbuff_remove(ai.rdrb, idx);
return;
}
@@ -492,7 +542,7 @@ int frcti_filter(struct fqueue * fq)
struct frcti * frcti;
struct shm_rbuff * rb;
- while(fq->next < fq->fqsize) {
+ while (fq->next < fq->fqsize) {
if (fq->fqueue[fq->next + 1] != FLOW_PKT)
return 1;
diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto
index b0efe9ab..809117b8 100644
--- a/src/lib/ipcpd_messages.proto
+++ b/src/lib/ipcpd_messages.proto
@@ -52,5 +52,6 @@ message ipcp_msg {
optional layer_info_msg layer_info = 9;
optional int32 response = 10;
optional string comp = 11;
- optional int32 result = 12;
+ optional uint32 timeo_sec = 12;
+ optional int32 result = 13;
};
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
deleted file mode 100644
index 0572c7b7..00000000
--- a/src/lib/rxmwheel.c
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2020
- *
- * Timerwheel
- *
- * Dimitri Staessens <[email protected]>
- * Sander Vrijders <[email protected]>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include <ouroboros/list.h>
-
-#define RXMQ_S 14 /* defines #slots */
-#define RXMQ_M 34 /* defines max delay (ns) */
-#define RXMQ_R (RXMQ_M - RXMQ_S) /* defines resolution (ns) */
-#define RXMQ_SLOTS (1 << RXMQ_S)
-#define RXMQ_MAX (1 << RXMQ_M) /* us */
-
-/* Overflow limits range to about 6 hours. */
-#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
-#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
-
-struct rxm {
- struct list_head next;
- uint32_t seqno;
- struct shm_du_buff * sdb;
- uint8_t * head;
- uint8_t * tail;
- time_t t0; /* Time when original was sent (us). */
- size_t mul; /* RTO multiplier. */
- struct frcti * frcti;
-};
-
-struct rxmwheel {
- struct list_head wheel[RXMQ_SLOTS];
-
- size_t prv; /* Last processed slot. */
- pthread_mutex_t lock;
-};
-
-static void rxmwheel_destroy(struct rxmwheel * rw)
-{
- size_t i;
- struct list_head * p;
- struct list_head * h;
-
- pthread_mutex_destroy(&rw->lock);
-
- for (i = 0; i < RXMQ_SLOTS; ++i) {
- list_for_each_safe(p, h, &rw->wheel[i]) {
- struct rxm * rxm = list_entry(p, struct rxm, next);
- list_del(&rxm->next);
- shm_du_buff_ack(rxm->sdb);
- ipcp_sdb_release(rxm->sdb);
- free(rxm);
- }
- }
-}
-
-static struct rxmwheel * rxmwheel_create(void)
-{
- struct rxmwheel * rw;
- struct timespec now;
- size_t i;
-
- rw = malloc(sizeof(*rw));
- if (rw == NULL)
- return NULL;
-
- if (pthread_mutex_init(&rw->lock, NULL)) {
- free(rw);
- return NULL;
- }
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- /* Mark the previous timeslot as the last one processed. */
- rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
-
- for (i = 0; i < RXMQ_SLOTS; ++i)
- list_head_init(&rw->wheel[i]);
-
- return rw;
-}
-
-static void rxmwheel_move(struct rxmwheel * rw)
-{
- struct timespec now;
- struct list_head * p;
- struct list_head * h;
- size_t slot;
- size_t i;
-
- pthread_mutex_lock(&rw->lock);
-
- pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
- (void *) &rw->lock);
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- slot = ts_to_slot(now);
-
- i = rw->prv;
-
- if (slot < i)
- slot += RXMQ_SLOTS;
-
- while (i++ < slot) {
- list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
- struct rxm * r;
- struct frct_cr * snd_cr;
- struct frct_cr * rcv_cr;
- size_t rslot;
- ssize_t idx;
- struct shm_du_buff * sdb;
- uint8_t * head;
- struct flow * f;
- int fd;
- uint32_t snd_lwe;
- uint32_t rcv_lwe;
- time_t rto;
-
- r = list_entry(p, struct rxm, next);
-
- list_del(&r->next);
-
- snd_cr = &r->frcti->snd_cr;
- rcv_cr = &r->frcti->rcv_cr;
- fd = r->frcti->fd;
- f = &ai.flows[fd];
-
- shm_du_buff_ack(r->sdb);
-
- pthread_rwlock_wrlock(&r->frcti->lock);
-
- snd_lwe = snd_cr->lwe;
- rcv_lwe = rcv_cr->lwe;
- rto = r->frcti->rto;
- /* Assume last RTX is the one that's ACK'd. */
- if (r->frcti->probe
- && (r->frcti->rttseq + 1) == r->seqno)
- r->frcti->t_probe = now;
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- /* Has been ack'd, remove. */
- if ((int) (r->seqno - snd_lwe) < 0) {
- ipcp_sdb_release(r->sdb);
- free(r);
- continue;
- }
-
- /* Check for r-timer expiry. */
- if (ts_to_ns(now) - r->t0 > r->frcti->r) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- continue;
- }
-
- /* Copy the payload, safe rtx in other layers. */
- if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
- ipcp_sdb_release(r->sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- continue;
- }
-
- idx = shm_du_buff_get_idx(sdb);
-
- head = shm_du_buff_head(sdb);
- memcpy(head, r->head, r->tail - r->head);
-
- ipcp_sdb_release(r->sdb);
-
- ((struct frct_pci *) head)->ackno = hton32(rcv_lwe);
-
- /* Retransmit the copy. */
- if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
- ipcp_sdb_release(sdb);
- free(r);
- shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
- shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
- continue;
- }
-
- /* Reschedule. */
- shm_du_buff_wait_ack(sdb);
-
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
- r->head = head;
- r->tail = shm_du_buff_tail(sdb);
- r->sdb = sdb;
-
- /* Schedule at least in the next time slot */
- rslot = (slot + MAX((rto >> RXMQ_R), 1))
- & (RXMQ_SLOTS - 1);
-
- list_add_tail(&r->next, &rw->wheel[rslot]);
- }
- }
-
- rw->prv = slot & (RXMQ_SLOTS - 1);
-
- pthread_cleanup_pop(true);
-}
-
-static int rxmwheel_add(struct rxmwheel * rw,
- struct frcti * frcti,
- uint32_t seqno,
- struct shm_du_buff * sdb)
-{
- struct timespec now;
- struct rxm * r;
- size_t slot;
-
- r = malloc(sizeof(*r));
- if (r == NULL)
- return -ENOMEM;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
-
- r->t0 = ts_to_ns(now);
- r->mul = 0;
- r->seqno = seqno;
- r->sdb = sdb;
- r->head = shm_du_buff_head(sdb);
- r->tail = shm_du_buff_tail(sdb);
- r->frcti = frcti;
-
- pthread_rwlock_rdlock(&r->frcti->lock);
-
- slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
-
- pthread_rwlock_unlock(&r->frcti->lock);
-
- pthread_mutex_lock(&rw->lock);
-
- list_add_tail(&r->next, &rw->wheel[slot]);
-
- shm_du_buff_wait_ack(sdb);
-
- pthread_mutex_unlock(&rw->lock);
-
- return 0;
-}
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
new file mode 100644
index 00000000..33fcbc1c
--- /dev/null
+++ b/src/lib/timerwheel.c
@@ -0,0 +1,409 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Timerwheel
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/list.h>
+
+#define RXMQ_SLOTS (1 << 8) /* #slots / level. */
+#define RXMQ_LVLS 3 /* #levels, bump for DTN. */
+#define RXMQ_BUMP 4 /* factor to bump lvl. */
+#define RXMQ_RES 20 /* res (ns) of lowest lvl. */
+
+#define ACKQ_SLOTS (1 << 7) /* #slots for delayed ACK. */
+#define ACKQ_RES 20 /* resolution for dACK. */
+
+/* Overflow limits range to about 6 hours. */
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
+#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES)
+
+struct rxm {
+ struct list_head next;
+ uint32_t seqno;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
+ uint8_t * tail;
+ time_t t0; /* Time when original was sent (ns). */
+ size_t mul; /* RTO multiplier. */
+ struct frcti * frcti;
+ int fd;
+ int flow_id; /* Prevent rtx when fd reused. */
+};
+
+struct ack {
+ struct list_head next;
+ struct frcti * frcti;
+ int fd;
+ int flow_id;
+};
+
+struct {
+ /*
+ * At a 1 ms min resolution, every level bumps the
+ * resolution by a factor of 16.
+ */
+ struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS];
+
+ struct list_head acks[ACKQ_SLOTS];
+ bool map[ACKQ_SLOTS][PROG_MAX_FLOWS];
+
+ size_t prv_rxm; /* Last processed rxm slot at lvl 0. */
+ size_t prv_ack; /* Last processed ack slot. */
+ pthread_mutex_t lock;
+} rw;
+
+static void timerwheel_fini(void)
+{
+ size_t i;
+ size_t j;
+ struct list_head * p;
+ struct list_head * h;
+
+ pthread_mutex_lock(&rw.lock);
+
+ for (i = 0; i < RXMQ_LVLS; ++i) {
+ for (j = 0; j < RXMQ_SLOTS; j++) {
+ list_for_each_safe(p, h, &rw.rxms[i][j]) {
+ struct rxm * rxm;
+ rxm = list_entry(p, struct rxm, next);
+ list_del(&rxm->next);
+ shm_du_buff_ack(rxm->sdb);
+ ipcp_sdb_release(rxm->sdb);
+ free(rxm);
+ }
+ }
+ }
+
+ for (i = 0; i < ACKQ_SLOTS; ++i) {
+ list_for_each_safe(p, h, &rw.acks[i]) {
+ struct ack * a = list_entry(p, struct ack, next);
+ list_del(&a->next);
+ free(a);
+ }
+ }
+
+ pthread_mutex_unlock(&rw.lock);
+
+ pthread_mutex_destroy(&rw.lock);
+}
+
+static int timerwheel_init(void)
+{
+ struct timespec now;
+ size_t i;
+ size_t j;
+
+ if (pthread_mutex_init(&rw.lock, NULL))
+ return -1;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1);
+ for (i = 0; i < RXMQ_LVLS; ++i) {
+ for (j = 0; j < RXMQ_SLOTS; ++j)
+ list_head_init(&rw.rxms[i][j]);
+ }
+
+ rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1);
+ for (i = 0; i < ACKQ_SLOTS; ++i)
+ list_head_init(&rw.acks[i]);
+
+ return 0;
+}
+
+static void timerwheel_move(void)
+{
+ struct timespec now;
+ struct list_head * p;
+ struct list_head * h;
+ size_t rxm_slot;
+ size_t ack_slot;
+ size_t i;
+ size_t j;
+
+ pthread_mutex_lock(&rw.lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
+ (void *) &rw.lock);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ rxm_slot = ts_to_ns(now) >> RXMQ_RES;
+ j = rw.prv_rxm;
+ rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1);
+
+ for (i = 0; i < RXMQ_LVLS; ++i) {
+ size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
+ if (j_max_slot < j)
+ j_max_slot += RXMQ_SLOTS;
+
+ while (j++ < j_max_slot) {
+ list_for_each_safe(p,
+ h,
+ &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
+ struct rxm * r;
+ struct frct_cr * snd_cr;
+ struct frct_cr * rcv_cr;
+ size_t rslot;
+ ssize_t idx;
+ struct shm_du_buff * sdb;
+ uint8_t * head;
+ struct flow * f;
+ uint32_t snd_lwe;
+ uint32_t rcv_lwe;
+ time_t rto;
+
+ r = list_entry(p, struct rxm, next);
+
+ list_del(&r->next);
+
+ snd_cr = &r->frcti->snd_cr;
+ rcv_cr = &r->frcti->rcv_cr;
+ f = &ai.flows[r->fd];
+
+ shm_du_buff_ack(r->sdb);
+
+ if (f->frcti == NULL
+ || f->flow_id != r->flow_id) {
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ continue;
+ }
+
+ pthread_rwlock_wrlock(&r->frcti->lock);
+
+ snd_lwe = snd_cr->lwe;
+ rcv_lwe = rcv_cr->lwe;
+ rto = r->frcti->rto;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ /* Has been ack'd, remove. */
+ if ((int) (r->seqno - snd_lwe) < 0) {
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ continue;
+ }
+
+ /* Check for r-timer expiry. */
+ if (ts_to_ns(now) - r->t0 > r->frcti->r) {
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ shm_rbuff_set_acl(f->rx_rb,
+ ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb,
+ ACL_FLOWDOWN);
+ continue;
+ }
+
+ if (r->frcti->probe
+ && (r->frcti->rttseq + 1) == r->seqno)
+ r->frcti->probe = false;
+
+ /* Copy the data; keeps the rtx safe for other layers. */
+ if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
+ ipcp_sdb_release(r->sdb);
+ free(r);
+ shm_rbuff_set_acl(f->rx_rb,
+ ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb,
+ ACL_FLOWDOWN);
+ continue;
+ }
+
+ idx = shm_du_buff_get_idx(sdb);
+
+ head = shm_du_buff_head(sdb);
+ memcpy(head, r->head, r->tail - r->head);
+
+ ipcp_sdb_release(r->sdb);
+
+ ((struct frct_pci *) head)->ackno =
+ hton32(rcv_lwe);
+
+ /* Retransmit the copy. */
+ if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+ ipcp_sdb_release(sdb);
+ free(r);
+ shm_rbuff_set_acl(f->rx_rb,
+ ACL_FLOWDOWN);
+ shm_rbuff_set_acl(f->tx_rb,
+ ACL_FLOWDOWN);
+ continue;
+ }
+
+ /* Reschedule. */
+ shm_du_buff_wait_ack(sdb);
+
+ shm_flow_set_notify(f->set,
+ f->flow_id,
+ FLOW_PKT);
+
+ r->head = head;
+ r->tail = shm_du_buff_tail(sdb);
+ r->sdb = sdb;
+ r->mul++;
+
+ /* Schedule at least in the next time slot. */
+ rslot = (rxm_slot
+ + MAX(((rto * r->mul) >> RXMQ_RES), 1))
+ & (RXMQ_SLOTS - 1);
+
+ list_add_tail(&r->next, &rw.rxms[i][rslot]);
+ }
+ }
+ /* Move up a level in the wheel. */
+ rxm_slot >>= RXMQ_BUMP;
+ }
+
+ ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
+
+ j = rw.prv_ack;
+
+ if (ack_slot < j)
+ ack_slot += ACKQ_SLOTS;
+
+ while (j++ < ack_slot) {
+ list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) {
+ struct ack * a;
+ struct flow * f;
+
+ a = list_entry(p, struct ack, next);
+
+ list_del(&a->next);
+
+ f = &ai.flows[a->fd];
+
+ rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
+
+ if (f->flow_id == a->flow_id && f->frcti != NULL)
+ frct_send_ack(a->frcti);
+
+ free(a);
+
+ }
+ }
+
+ rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1);
+
+ pthread_cleanup_pop(true);
+}
+
+static int timerwheel_rxm(struct frcti * frcti,
+ uint32_t seqno,
+ struct shm_du_buff * sdb)
+{
+ struct timespec now;
+ struct rxm * r;
+ size_t slot;
+ size_t lvl = 0;
+ time_t rto_slot;
+
+ r = malloc(sizeof(*r));
+ if (r == NULL)
+ return -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ r->t0 = ts_to_ns(now);
+ r->mul = 0;
+ r->seqno = seqno;
+ r->sdb = sdb;
+ r->head = shm_du_buff_head(sdb);
+ r->tail = shm_du_buff_tail(sdb);
+ r->frcti = frcti;
+
+ pthread_rwlock_rdlock(&r->frcti->lock);
+
+ rto_slot = frcti->rto >> RXMQ_RES;
+ slot = r->t0 >> RXMQ_RES;
+
+ r->fd = frcti->fd;
+ r->flow_id = ai.flows[r->fd].flow_id;
+
+ pthread_rwlock_unlock(&r->frcti->lock);
+
+ while (rto_slot >= RXMQ_SLOTS) {
+ ++lvl;
+ rto_slot >>= RXMQ_BUMP;
+ slot >>= RXMQ_BUMP;
+ }
+
+ if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
+ free(r);
+ return -EPERM;
+ }
+
+ slot = (slot + rto_slot) & (RXMQ_SLOTS - 1);
+
+ pthread_mutex_lock(&rw.lock);
+
+ list_add_tail(&r->next, &rw.rxms[lvl][slot]);
+
+ shm_du_buff_wait_ack(sdb);
+
+ pthread_mutex_unlock(&rw.lock);
+
+ return 0;
+}
+
+static int timerwheel_ack(int fd,
+ struct frcti * frcti)
+{
+ struct timespec now;
+ struct ack * a;
+ size_t slot;
+
+ a = malloc(sizeof(*a));
+ if (a == NULL)
+ return -ENOMEM;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ slot = DELT_ACK >> ACKQ_RES;
+ if (slot >= ACKQ_SLOTS) { /* Out of timerwheel range. */
+ free(a);
+ return -EPERM;
+ }
+
+ slot = (((ts_to_ns(now) + DELT_ACK) >> ACKQ_RES) + 1)
+ & (ACKQ_SLOTS - 1);
+
+ a->fd = fd;
+ a->frcti = frcti;
+ a->flow_id = ai.flows[fd].flow_id;
+
+ pthread_mutex_lock(&rw.lock);
+
+ if (rw.map[slot][fd]) {
+ pthread_mutex_unlock(&rw.lock);
+ free(a);
+ return 0;
+ }
+
+ rw.map[slot][fd] = true;
+
+ list_add_tail(&a->next, &rw.acks[slot]);
+
+ pthread_mutex_unlock(&rw.lock);
+
+ return 0;
+}