summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2022-02-25 17:34:29 +0100
committerSander Vrijders <[email protected]>2022-03-03 12:00:54 +0100
commitf5d642a06f9c1a58197313b32f6b213a152e446f (patch)
tree19ec9813f2d83fae986ff2bddbf5511c5b7662da /src
parentdb5e9bf4f884097ec919aa40b02d8eafab05cfa8 (diff)
downloadouroboros-f5d642a06f9c1a58197313b32f6b213a152e446f.tar.gz
ouroboros-f5d642a06f9c1a58197313b32f6b213a152e446f.zip
lib: Make flow liveness timeout configurable
The qosspec_t now has a timeout value that sets the timeout value of the flow. Flows with a peer that has timed out will now return -EFLOWPEER on flow_read() or flow_write(). Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/eth/eth.c3
-rw-r--r--src/ipcpd/udp/main.c4
-rw-r--r--src/ipcpd/unicast/fa.c7
-rw-r--r--src/irmd/main.c2
-rw-r--r--src/lib/dev.c66
-rw-r--r--src/lib/qosspec.proto17
-rw-r--r--src/lib/sockets.c2
7 files changed, 70 insertions, 31 deletions
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index e22dd7bc..8b34d303 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -164,6 +164,7 @@ struct mgmt_msg {
uint32_t ber;
uint32_t max_gap;
uint32_t delay;
+ uint32_t timeout;
uint16_t cypher_s;
uint8_t in_order;
#if defined (BUILD_ETH_DIX)
@@ -492,6 +493,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, hash, ipcp_dir_hash_len());
memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen);
@@ -753,6 +755,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
qs.in_order = msg->in_order;
qs.max_gap = ntoh32(msg->max_gap);
qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
if (shim_data_reg_has(eth_data.shim_data,
buf + sizeof(*msg))) {
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index b9f97e74..5c57e6b8 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -98,7 +98,9 @@ struct mgmt_msg {
uint32_t loss;
uint32_t ber;
uint32_t max_gap;
+ uint32_t timeout;
uint16_t cypher_s;
+
} __attribute__((packed));
struct mgmt_frame {
@@ -217,6 +219,7 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * r_saddr,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(buf + len, data, dlen);
@@ -375,6 +378,7 @@ static int ipcp_udp_mgmt_frame(const uint8_t * buf,
qs.in_order = msg->in_order;
qs.max_gap = ntoh32(msg->max_gap);
qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid),
(uint8_t *) (msg + 1), qs,
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index dcc79031..d59b9760 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -72,14 +72,15 @@ struct fa_msg {
int8_t response;
uint16_t ece;
/* QoS parameters from spec, aligned */
- uint8_t availability;
- uint8_t in_order;
uint32_t delay;
uint64_t bandwidth;
uint32_t loss;
uint32_t ber;
uint32_t max_gap;
+ uint32_t timeout;
uint16_t cypher_s;
+ uint8_t availability;
+ uint8_t in_order;
} __attribute__((packed));
struct cmd {
@@ -569,6 +570,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qs.in_order = msg->in_order;
qs.max_gap = ntoh32(msg->max_gap);
qs.cypher_s = ntoh16(msg->cypher_s);
+ qs.timeout = ntoh32(msg->timeout);
fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
if (fd < 0)
@@ -840,6 +842,7 @@ int fa_alloc(int fd,
msg->in_order = qs.in_order;
msg->max_gap = hton32(qs.max_gap);
msg->cypher_s = hton16(qs.cypher_s);
+ msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index f83e8e1e..a3acc78a 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -648,7 +648,7 @@ static int connect_ipcp(pid_t pid,
log_dbg("Connecting %s to %s.", component, dst);
if (ipcp_connect(pid, dst, component, qs)) {
- log_err("Could not connect IPCP.");
+ log_err("Could not connect IPCP %d to %s.", pid, dst);
return -EPERM;
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 4c21fcdf..5c57a538 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -68,7 +68,6 @@
#define SECMEMSZ 16384
#define SYMMKEYSZ 32
#define MSGBUFSZ 2048
-#define FLOWTIMEO 120 /* seconds */
enum port_state {
PORT_NULL = 0,
@@ -123,7 +122,8 @@ struct flow_set_entry {
struct flow_set {
size_t idx;
- struct timespec chk; /* Last keepalive check */
+ struct timespec chk; /* Last keepalive check. */
+ uint32_t min; /* Minimum keepalive time in set. */
struct list_head flows;
pthread_rwlock_t lock;
@@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd)
struct timespec r_act;
struct flow * flow;
int flow_id;
+ uint32_t timeo;
flow = &ai.flows[fd];
@@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd)
r_act = flow->rcv_act;
flow_id = flow->flow_id;
+ timeo = flow->qs.timeout;
pthread_rwlock_unlock(&ai.lock);
- if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) {
- shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT);
- return -EFLOWDOWN;
+ if (timeo == 0)
+ return 0;
+
+ if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+ shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+ return -EFLOWPEER;
}
- if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION)
+ if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION)
flow_send_keepalive(fd);
return 0;
@@ -1140,7 +1145,7 @@ ssize_t flow_write(int fd,
return -ETIMEDOUT;
if (flow_keepalive(fd))
- return -EFLOWDOWN;
+ return -EFLOWPEER;
frcti_tick(flow->frcti);
@@ -1165,14 +1170,16 @@ ssize_t flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)
- goto enomem;
+ if (count > 0) {
+ if (frcti_snd(flow->frcti, sdb) < 0)
+ goto enomem;
- if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
- goto enomem;
+ if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
+ goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
- goto enomem;
+ if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ goto enomem;
+ }
pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
@@ -1263,7 +1270,7 @@ ssize_t flow_read(int fd,
return -ETIMEDOUT;
if (flow_keepalive(fd) < 0)
- return -EFLOWDOWN;
+ return -EFLOWPEER;
ts_add(&tictime, &tic, &tictime);
@@ -1360,6 +1367,7 @@ struct flow_set * fset_create()
goto fail_bmp_alloc;
set->chk = now;
+ set->min = UINT32_MAX;
pthread_rwlock_unlock(&ai.lock);
@@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set)
int fset_add(struct flow_set * set,
int fd)
{
+ struct flow * flow;
struct flow_set_entry * fse;
int ret;
size_t packets;
@@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set,
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return -EINVAL;
+ flow = &ai.flows[fd];
+
fse = malloc(sizeof(*fse));
if (fse == NULL)
return -ENOMEM;
- pthread_rwlock_wrlock(&ai.lock);
+ fse->fd = fd;
- if (ai.flows[fd].flow_id < 0) {
+ pthread_rwlock_rdlock(&ai.lock);
+
+ if (flow->flow_id < 0) {
ret = -EINVAL;
goto fail;
}
@@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set,
pthread_rwlock_wrlock(&set->lock);
+ if (flow->qs.timeout != 0 && flow->qs.timeout < set->min)
+ set->min = flow->qs.timeout;
+
list_add_tail(&fse->next, &set->flows);
pthread_rwlock_unlock(&set->lock);
@@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set,
{
struct list_head * p;
struct list_head * h;
+ struct flow * flow;
+ uint32_t min;
if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
return;
+ flow = &ai.flows[fd];
+
+ min = UINT32_MAX;
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ if (flow->flow_id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
pthread_rwlock_wrlock(&set->lock);
@@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set,
if (e->fd == fd) {
list_del(&e->next);
free(e);
- break;
+ } else {
+ if (flow->qs.timeout != 0 && flow->qs.timeout < min)
+ min = flow->qs.timeout;
}
}
+ set->min = min;
+
pthread_rwlock_unlock(&set->lock);
pthread_rwlock_unlock(&ai.lock);
@@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set)
pthread_rwlock_wrlock(&set->lock);
- if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) {
+ if (ts_diff_ns(&now, &set->chk) < set->min >> 2) {
pthread_rwlock_unlock(&set->lock);
return;
}
diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto
index 8a355363..3ceedd87 100644
--- a/src/lib/qosspec.proto
+++ b/src/lib/qosspec.proto
@@ -23,12 +23,13 @@
syntax = "proto2";
message qosspec_msg {
- required uint32 delay = 1; /* In ms */
- required uint64 bandwidth = 2; /* In bits/s */
- required uint32 availability = 3; /* Class of 9s */
- required uint32 loss = 4; /* Packet loss */
- required uint32 ber = 5; /* Bit error rate, ppb */
- required uint32 in_order = 6; /* In-order delivery */
- required uint32 max_gap = 7; /* In ms */
- required uint32 cypher_s = 8; /* Crypto strength in bits */
+ required uint32 delay = 1; /* In ms. */
+ required uint64 bandwidth = 2; /* In bits/s. */
+ required uint32 availability = 3; /* Class of 9s. */
+ required uint32 loss = 4; /* Packet loss. */
+ required uint32 ber = 5; /* Bit error rate, ppb. */
+ required uint32 in_order = 6; /* In-order delivery. */
+ required uint32 max_gap = 7; /* In ms. */
+ required uint32 cypher_s = 8; /* Crypto strength in bits. */
+ required uint32 timeout = 9; /* Timeout in ms. */
};
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 8179d2b3..48e95121 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs)
msg.in_order = spec.in_order;
msg.max_gap = spec.max_gap;
msg.cypher_s = spec.cypher_s;
+ msg.timeout = spec.timeout;
return msg;
}
@@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg)
spec.in_order = msg->in_order;
spec.max_gap = msg->max_gap;
spec.cypher_s = msg->cypher_s;
+ spec.timeout = msg->timeout;
return spec;
}