summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2020-09-20 13:04:52 +0200
committerSander Vrijders <[email protected]>2020-09-25 11:52:51 +0200
commit1e3a9e464cbb2f02c057e9f63c1f270ff27530f4 (patch)
tree5193774eea2bb204d062cc47e178a3702d4790a4 /src/lib/dev.c
parent5f468ee5e02a0d63ed8ad7420ee1beda87e524d6 (diff)
downloadouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.tar.gz
ouroboros-1e3a9e464cbb2f02c057e9f63c1f270ff27530f4.zip
lib: Complete retransmission logic
This completes the retransmission (automated repeat-request, ARQ) logic, sending (delayed) ACK messages when needed. On deallocation, flows will ACK try to retransmit any remaining unacknowledged messages (unless the FRCTFLINGER flag is turned off; this is on by default). Applications can safely shut down as soon as everything is ACK'd (i.e. the current Delta-t run is done). The activity timeout is now passed to the IPCP for it to sleep before completing deallocation (and releasing the flow_id). That should be moved to the IRMd in due time. The timerwheel is revised to be multi-level to reduce memory consumption. The resolution bumps by a factor of 1 << RXMQ_BUMP (16) and each level has RXMQ_SLOTS (1 << 8) slots. The lowest level has a resolution of (1 << RXMQ_RES) (20) ns, which is roughly a millisecond. Currently, 3 levels are defined, so the largest delay we can schedule at each level is: Level 0: 256ms Level 1: 4s Level 2: about a minute. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c133
1 files changed, 109 insertions, 24 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index df616ead..8d7d7934 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -63,6 +63,7 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
+#define TICTIME 1000000 /* ns */
struct flow_set {
size_t idx;
@@ -255,6 +256,9 @@ static void flow_fini(int fd)
bmp_release(ai.fds, fd);
}
+ if (ai.flows[fd].frcti != NULL)
+ frcti_destroy(ai.flows[fd].frcti);
+
if (ai.flows[fd].rx_rb != NULL) {
shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -272,9 +276,6 @@ static void flow_fini(int fd)
shm_flow_set_close(ai.flows[fd].set);
}
- if (ai.flows[fd].frcti != NULL)
- frcti_destroy(ai.flows[fd].frcti);
-
if (ai.flows[fd].ctx != NULL)
crypt_fini(ai.flows[fd].ctx);
@@ -433,8 +434,13 @@ static void init(int argc,
if (ai.fqset == NULL)
goto fail_fqset;
+ if (timerwheel_init() < 0)
+ goto fail_timerwheel;
+
return;
+ fail_timerwheel:
+ shm_flow_set_close(ai.fqset);
fail_fqset:
pthread_rwlock_destroy(&ai.lock);
fail_lock:
@@ -491,6 +497,8 @@ static void fini(void)
pthread_cond_destroy(&ai.ports[i].state_cond);
}
+ timerwheel_fini();
+
shm_rdrbuff_close(ai.rdrb);
free(ai.flows);
@@ -747,25 +755,59 @@ int flow_join(const char * dst,
int flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg;
+ struct flow * f;
+ time_t timeo;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.has_pid = true;
- msg.pid = ai.pid;
+ msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg.has_flow_id = true;
+ msg.has_pid = true;
+ msg.pid = ai.pid;
+ msg.has_timeo_sec = true;
+ msg.has_timeo_nsec = true;
+ msg.timeo_nsec = 0;
+
+ f = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (f->flow_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = ai.flows[fd].flow_id;
+ msg.flow_id = f->flow_id;
+
+ timeo = frcti_dealloc(f->frcti);
+ while (timeo < 0) { /* keep the flow active for rtx */
+ ssize_t ret;
+ uint8_t buf[128];
+
+ f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+
+ f->rcv_timesout = true;
+ f->rcv_timeo.tv_sec = -timeo;
+ f->rcv_timeo.tv_nsec = 0;
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ ret = flow_read(fd, buf, 128);
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ timeo = frcti_dealloc(f->frcti);
+
+ if (ret == -ETIMEDOUT && timeo < 0)
+ timeo = -timeo;
+ }
+
+ msg.timeo_sec = timeo;
+
+ shm_rbuff_fini(ai.flows[fd].tx_rb);
pthread_rwlock_unlock(&ai.lock);
@@ -904,13 +946,21 @@ int fccntl(int fd,
goto einval;
*fflags = flow->oflags;
break;
+ case FRCTSFLAGS:
+ cflags = va_arg(l, uint16_t *);
+ if (cflags == NULL)
+ goto einval;
+ if (flow->frcti == NULL)
+ goto eperm;
+ frcti_setflags(flow->frcti, *cflags);
+ break;
case FRCTGFLAGS:
cflags = (uint16_t *) va_arg(l, int *);
if (cflags == NULL)
goto einval;
if (flow->frcti == NULL)
goto eperm;
- *cflags = frcti_getconf(flow->frcti);
+ *cflags = frcti_getflags(flow->frcti);
break;
default:
pthread_rwlock_unlock(&ai.lock);
@@ -1067,6 +1117,8 @@ ssize_t flow_read(int fd,
struct shm_rbuff * rb;
struct shm_du_buff * sdb;
struct timespec abs;
+ struct timespec tic = {0, TICTIME};
+ struct timespec tictime;
struct timespec * abstime = NULL;
struct flow * flow;
bool noblock;
@@ -1096,6 +1148,8 @@ ssize_t flow_read(int fd,
noblock = flow->oflags & FLOWFRNOBLOCK;
partrd = !(flow->oflags & FLOWFRNOPART);
+ ts_add(&tic, &abs, &tictime);
+
if (ai.flows[fd].rcv_timesout) {
ts_add(&abs, &flow->rcv_timeo, &abs);
abstime = &abs;
@@ -1108,9 +1162,21 @@ ssize_t flow_read(int fd,
pthread_rwlock_unlock(&ai.lock);
idx = noblock ? shm_rbuff_read(rb) :
- shm_rbuff_read_b(rb, abstime);
- if (idx < 0)
- return idx;
+ shm_rbuff_read_b(rb, &tictime);
+ if (idx < 0) {
+ frcti_tick(flow->frcti);
+
+ if (idx != -ETIMEDOUT)
+ return idx;
+
+ if (abstime != NULL
+ && ts_diff_ns(&tictime, &abs) < 0)
+ return -ETIMEDOUT;
+
+ ts_add(&tictime, &tic, &tictime);
+ pthread_rwlock_rdlock(&ai.lock);
+ continue;
+ }
sdb = shm_rdrbuff_get(ai.rdrb, idx);
if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
@@ -1339,7 +1405,9 @@ ssize_t fevent(struct flow_set * set,
const struct timespec * timeo)
{
ssize_t ret = 0;
- struct timespec abstime;
+ struct timespec tic = {0, TICTIME};
+ struct timespec tictime;
+ struct timespec abs;
struct timespec * t = NULL;
if (set == NULL || fq == NULL)
@@ -1348,17 +1416,25 @@ ssize_t fevent(struct flow_set * set,
if (fq->fqsize > 0 && fq->next != fq->fqsize)
return fq->fqsize;
- if (timeo != NULL) {
- clock_gettime(PTHREAD_COND_CLOCK, &abstime);
- ts_add(&abstime, timeo, &abstime);
- t = &abstime;
- }
+ clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+ ts_add(&tic, &abs, &tictime);
+ t = &tictime;
+
+ if (timeo != NULL)
+ ts_add(&abs, timeo, &abs);
while (ret == 0) {
ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
if (ret == -ETIMEDOUT) {
- fq->fqsize = 0;
- return -ETIMEDOUT;
+ if (timeo != NULL && ts_diff_ns(t, &abs) < 0) {
+ fq->fqsize = 0;
+ return -ETIMEDOUT;
+ }
+ ret = 0;
+ ts_add(t, &tic, t);
+ timerwheel_move();
+ continue;
}
fq->fqsize = ret << 1;
@@ -1382,10 +1458,19 @@ int np1_flow_alloc(pid_t n_pid,
return flow_init(flow_id, n_pid, qs, NULL);
}
-int np1_flow_dealloc(int flow_id)
+int np1_flow_dealloc(int flow_id,
+ time_t timeo)
{
int fd;
+ /*
+ * TODO: Don't pass timeo to the IPCP but wait in IRMd.
+ * This will need async ops, waiting until we bootstrap
+ * the IRMd over ouroboros.
+ */
+
+ sleep(timeo);
+
pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[flow_id].fd;