summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2020-12-01 19:19:04 +0100
committerSander Vrijders <[email protected]>2020-12-02 19:21:29 +0100
commit8e1c0e62feb4832dca2b53e51ab0e1cb8f48e5b1 (patch)
treece90e18277c38e7ee592d441ae4de197081c6476
parentaef6bdb1eadf8779173145710306ea5b6d81b8ec (diff)
downloadouroboros-8e1c0e62feb4832dca2b53e51ab0e1cb8f48e5b1.tar.gz
ouroboros-8e1c0e62feb4832dca2b53e51ab0e1cb8f48e5b1.zip
ipcpd: Add congestion avoidance policies
This adds congestion avoidance policies to the unicast IPCP. The default policy is a multi-bit explicit congestion avoidance algorithm based on data-center TCP congestion avoidance (DCTCP) to relay information about the maximum queue depth that packets experienced to the receiver. There's also a "nop" policy to disable congestion avoidance for testing and benchmarking purposes. The (initial) API for congestion avoidance policies is: void * (* ctx_create)(void); void (* ctx_destroy)(void * ctx); These calls create / and or destroy a context for congestion control for a specific flow. Thread-safety of the context is the responsability of the flow allocator (operations on the ctx should be performed under a lock). ca_wnd_t (* ctx_update_snd)(void * ctx, size_t len); This is the sender call to update the context, and should be called for every packet that is sent on the flow. The len parameter in this API is the packet length, which allows calculating the bandwidth. It returns an opaque union type that is used for the call to check/wait if the congestion window is open or closed (and allowing to release locks before waiting). bool (* ctx_update_rcv)(void * ctx, size_t len, uint8_t ecn, uint16_t * ece); This is the call to update the flow congestion context on the receiver side. It should be called for every received packet. It gets the ecn value from the packet and its length, and returns the ECE (explicit congestion experienced) value to be sent to the sender in case of congestion. The boolean returned signals whether or not a congestion update needs to be sent. void (* ctx_update_ece)(void * ctx, uint16_t ece); This is the call for the sending side top update the context when it receives an ECE update from the receiver. void (* wnd_wait)(ca_wnd_t wnd); This is a (blocking) call that waits for the congestion window to clear. It should be stateless (to avoid waiting under locks). This may change later on if passing the context is needed for different algorithms. uint8_t (* calc_ecn)(int fd, size_t len); This is the call that intermediate IPCPs(routers) should use to update the ECN field on passing packets. The multi-bit ECN policy bases the value for the ECN field on the depth of the rbuff queue packets will be sent on. I created another call to grab the queue depth as fccntl is write-locking the application. We can further optimize this to avoid most locking on the rbuff. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
-rw-r--r--doc/man/ouroboros.813
-rw-r--r--include/ouroboros/ipcp-dev.h42
-rw-r--r--include/ouroboros/ipcp.h32
-rw-r--r--src/ipcpd/ipcp.c4
-rw-r--r--src/ipcpd/unicast/CMakeLists.txt3
-rw-r--r--src/ipcpd/unicast/ca.c99
-rw-r--r--src/ipcpd/unicast/ca.h61
-rw-r--r--src/ipcpd/unicast/dt.c43
-rw-r--r--src/ipcpd/unicast/fa.c222
-rw-r--r--src/ipcpd/unicast/fa.h4
-rw-r--r--src/ipcpd/unicast/main.c10
-rw-r--r--src/ipcpd/unicast/pol-ca-ops.h50
-rw-r--r--src/ipcpd/unicast/pol/ca-mb-ecn.c216
-rw-r--r--src/ipcpd/unicast/pol/ca-mb-ecn.h50
-rw-r--r--src/ipcpd/unicast/pol/ca-nop.c93
-rw-r--r--src/ipcpd/unicast/pol/ca-nop.h50
-rw-r--r--src/ipcpd/unicast/pol/link_state.c6
-rw-r--r--src/lib/dev.c15
-rw-r--r--src/lib/ipcp_config.proto13
-rw-r--r--src/lib/irm.c2
-rw-r--r--src/tools/irm/irm_ipcp_bootstrap.c69
21 files changed, 957 insertions, 140 deletions
diff --git a/doc/man/ouroboros.8 b/doc/man/ouroboros.8
index 5a09df8d..adb652bf 100644
--- a/doc/man/ouroboros.8
+++ b/doc/man/ouroboros.8
@@ -2,7 +2,7 @@
.\" Dimitri Staessens <[email protected]>
.\" Sander Vrijders <[email protected]>
-.TH OUROBOROS 8 2018-03-10 Ouroboros "Ouroboros User Manual"
+.TH OUROBOROS 8 2020-11-29 Ouroboros "Ouroboros User Manual"
.SH NAME
@@ -258,6 +258,17 @@ ecmp: equal-cost multipath routing.
.br
default: link_state.
.PP
+[congestion \fIpolicy\fR] specifies the congestion avoidance policy.
+.br
+\fIpolicy\fR:
+.RS 4
+none: no congestion avoidance.
+.br
+mb-ecn: Multi-bit explicit congestion notification and avoidance.
+.RE
+.br
+default: mb-ecn.
+.PP
[hash \fIpolicy\fR] specifies the hash function used for the directory.
.br
\fIpolicy\fR: SHA3_224, SHA3_256, SHA3_384, SHA3_512.
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index d60e0b45..b57cec0d 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -26,33 +26,35 @@
#ifndef OUROBOROS_IPCP_DEV_H
#define OUROBOROS_IPCP_DEV_H
-int ipcp_create_r(int result);
+int ipcp_create_r(int result);
-int ipcp_flow_req_arr(const uint8_t * dst,
- size_t len,
- qosspec_t qs,
- const void * data,
- size_t dlen);
+int ipcp_flow_req_arr(const uint8_t * dst,
+ size_t len,
+ qosspec_t qs,
+ const void * data,
+ size_t dlen);
-int ipcp_flow_alloc_reply(int fd,
- int response,
- const void * data,
- size_t len);
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ const void * data,
+ size_t len);
-int ipcp_flow_read(int fd,
- struct shm_du_buff ** sdb);
+int ipcp_flow_read(int fd,
+ struct shm_du_buff ** sdb);
-int ipcp_flow_write(int fd,
- struct shm_du_buff * sdb);
+int ipcp_flow_write(int fd,
+ struct shm_du_buff * sdb);
-int ipcp_flow_fini(int fd);
+int ipcp_flow_fini(int fd);
-int ipcp_flow_get_qoscube(int fd,
- qoscube_t * cube);
+int ipcp_flow_get_qoscube(int fd,
+ qoscube_t * cube);
-int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
- size_t len);
+size_t ipcp_flow_queued(int fd);
-void ipcp_sdb_release(struct shm_du_buff * sdb);
+int ipcp_sdb_reserve(struct shm_du_buff ** sdb,
+ size_t len);
+
+void ipcp_sdb_release(struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCP_DEV_H */
diff --git a/include/ouroboros/ipcp.h b/include/ouroboros/ipcp.h
index 6494d9b7..86dfd2da 100644
--- a/include/ouroboros/ipcp.h
+++ b/include/ouroboros/ipcp.h
@@ -55,6 +55,11 @@ enum pol_routing {
ROUTING_LINK_STATE_ECMP
};
+enum pol_cong_avoid {
+ CA_NONE = 0,
+ CA_MB_ECN
+};
+
enum pol_dir_hash {
DIR_HASH_SHA3_224 = 0,
DIR_HASH_SHA3_256,
@@ -70,29 +75,30 @@ struct layer_info {
/* Structure to configure the first IPCP */
struct ipcp_config {
- struct layer_info layer_info;
+ struct layer_info layer_info;
- enum ipcp_type type;
+ enum ipcp_type type;
/* Unicast */
- uint8_t addr_size;
- uint8_t eid_size;
- uint8_t max_ttl;
+ uint8_t addr_size;
+ uint8_t eid_size;
+ uint8_t max_ttl;
- enum pol_addr_auth addr_auth_type;
- enum pol_routing routing_type;
+ enum pol_addr_auth addr_auth_type;
+ enum pol_routing routing_type;
+ enum pol_cong_avoid cong_avoid;
/* UDP */
- uint32_t ip_addr;
- uint32_t dns_addr;
- uint16_t clt_port;
- uint16_t srv_port;
+ uint32_t ip_addr;
+ uint32_t dns_addr;
+ uint16_t clt_port;
+ uint16_t srv_port;
/* Ethernet */
- char * dev;
+ char * dev;
/* Ethernet DIX */
- uint16_t ethertype;
+ uint16_t ethertype;
};
#endif /* OUROBOROS_IPCP_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 95d2f783..c8b5f848 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -241,6 +241,7 @@ static void * mainloop(void * o)
conf.max_ttl = conf_msg->max_ttl;
conf.addr_auth_type = conf_msg->addr_auth_type;
conf.routing_type = conf_msg->routing_type;
+ conf.cong_avoid = conf_msg->cong_avoid;
break;
case IPCP_ETH_DIX:
conf.ethertype = conf_msg->ethertype;
@@ -261,7 +262,8 @@ static void * mainloop(void * o)
layer_info.dir_hash_algo = HASH_SHA3_256;
break;
default:
- log_err("Unknown IPCP type: %d.", conf_msg->ipcp_type);
+ log_err("Unknown IPCP type: %d.",
+ conf_msg->ipcp_type);
}
/* UDP and broadcast use fixed hash algorithm. */
diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt
index c0c55519..035ee5f4 100644
--- a/src/ipcpd/unicast/CMakeLists.txt
+++ b/src/ipcpd/unicast/CMakeLists.txt
@@ -33,6 +33,7 @@ endif ()
set(SOURCE_FILES
# Add source files here
addr_auth.c
+ ca.c
connmgr.c
dht.c
dir.c
@@ -51,6 +52,8 @@ set(SOURCE_FILES
pol/simple_pff.c
pol/alternate_pff.c
pol/multipath_pff.c
+ pol/ca-mb-ecn.c
+ pol/ca-nop.c
)
add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES}
diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c
new file mode 100644
index 00000000..f93d0504
--- /dev/null
+++ b/src/ipcpd/unicast/ca.c
@@ -0,0 +1,99 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion Avoidance
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define OUROBOROS_PREFIX "ca"
+
+#include <ouroboros/logs.h>
+
+#include "ca.h"
+#include "pol-ca-ops.h"
+#include "pol/ca-mb-ecn.h"
+#include "pol/ca-nop.h"
+
+struct {
+ struct pol_ca_ops * ops;
+} ca;
+
+int ca_init(enum pol_cong_avoid pol)
+{
+ switch(pol) {
+ case CA_NONE:
+ log_dbg("Disabling congestion control.");
+ ca.ops = &nop_ca_ops;
+ break;
+ case CA_MB_ECN:
+ log_dbg("Using multi-bit ECN.");
+ ca.ops = &mb_ecn_ca_ops;
+ break;
+ default:
+ return -1;
+ }
+
+ return 0;
+}
+
+
+void ca_fini(void)
+{
+ ca.ops = NULL;
+}
+
+void * ca_ctx_create(void)
+{
+ return ca.ops->ctx_create();
+}
+
+void ca_ctx_destroy(void * ctx)
+{
+ return ca.ops->ctx_destroy(ctx);
+}
+
+ca_wnd_t ca_ctx_update_snd(void * ctx,
+ size_t len)
+{
+ return ca.ops->ctx_update_snd(ctx, len);
+}
+
+bool ca_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece)
+{
+ return ca.ops->ctx_update_rcv(ctx, len, ecn, ece);
+}
+
+void ca_ctx_update_ece(void * ctx,
+ uint16_t ece)
+{
+ return ca.ops->ctx_update_ece(ctx, ece);
+}
+
+void ca_wnd_wait(ca_wnd_t wnd)
+{
+ return ca.ops->wnd_wait(wnd);
+}
+
+uint8_t ca_calc_ecn(int fd,
+ size_t len)
+{
+ return ca.ops->calc_ecn(fd, len);
+}
diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h
new file mode 100644
index 00000000..5cf6199c
--- /dev/null
+++ b/src/ipcpd/unicast/ca.h
@@ -0,0 +1,61 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion avoidance
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_H
+#define OUROBOROS_IPCPD_UNICAST_CA_H
+
+#include <ouroboros/ipcp.h>
+
+#include <stdbool.h>
+#include <sys/types.h>
+
+typedef union {
+ time_t wait;
+} ca_wnd_t;
+
+int ca_init(enum pol_cong_avoid ca);
+
+void ca_fini(void);
+
+
+/* OPS */
+void * ca_ctx_create(void);
+
+void ca_ctx_destroy(void * ctx);
+
+ca_wnd_t ca_ctx_update_snd(void * ctx,
+ size_t len);
+
+bool ca_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+void ca_ctx_update_ece(void * ctx,
+ uint16_t ece);
+
+void ca_wnd_wait(ca_wnd_t wnd);
+
+uint8_t ca_calc_ecn(int fd,
+ size_t len);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_H */
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 7db766a5..53accba3 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -41,6 +41,7 @@
#include <ouroboros/fccntl.h>
#endif
+#include "ca.h"
#include "connmgr.h"
#include "ipcp.h"
#include "dt.h"
@@ -115,16 +116,12 @@ static void dt_pci_ser(uint8_t * head,
}
-static void dt_pci_des(struct shm_du_buff * sdb,
- struct dt_pci * dt_pci)
+static void dt_pci_des(uint8_t * head,
+ struct dt_pci * dt_pci)
{
- uint8_t * head;
-
- assert(sdb);
+ assert(head);
assert(dt_pci);
- head = shm_du_buff_head(sdb);
-
/* Decrease TTL */
--*(head + dt_pci_info.ttl_o);
@@ -226,7 +223,6 @@ static int dt_stat_read(const char * path,
"Queued packets (rx): %20zu\n"
"Queued packets (tx): %20zu\n\n",
tmstr, addrstr, rxqlen, txqlen);
-
for (i = 0; i < QOS_CUBE_MAX; ++i) {
sprintf(str,
"Qos cube %3d:\n"
@@ -434,13 +430,14 @@ static void packet_handler(int fd,
struct dt_pci dt_pci;
int ret;
int ofd;
-#ifndef IPCP_FLOW_STATS
- (void) fd;
-#else
+ uint8_t * head;
size_t len;
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#ifndef IPCP_FLOW_STATS
+ (void) fd;
+#else
pthread_mutex_lock(&dt.stat[fd].lock);
++dt.stat[fd].rcv_pkt[qc];
@@ -449,7 +446,10 @@ static void packet_handler(int fd,
pthread_mutex_unlock(&dt.stat[fd].lock);
#endif
memset(&dt_pci, 0, sizeof(dt_pci));
- dt_pci_des(sdb, &dt_pci);
+
+ head = shm_du_buff_head(sdb);
+
+ dt_pci_des(head, &dt_pci);
if (dt_pci.dst_addr != ipcpi.dt_addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
@@ -481,6 +481,8 @@ static void packet_handler(int fd,
return;
}
+ *(head + dt_pci_info.ecn_o) |= ca_calc_ecn(ofd, len);
+
ret = ipcp_flow_write(ofd, sdb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", ofd);
@@ -508,6 +510,9 @@ static void packet_handler(int fd,
} else {
dt_pci_shrink(sdb);
if (dt_pci.eid >= PROG_RES_FDS) {
+ uint8_t ecn = *(head + dt_pci_info.ecn_o);
+ fa_ecn_update(dt_pci.eid, ecn, len);
+
if (ipcp_flow_write(dt_pci.eid, sdb)) {
ipcp_sdb_release(sdb);
#ifdef IPCP_FLOW_STATS
@@ -792,15 +797,15 @@ int dt_write_packet(uint64_t dst_addr,
int fd;
int ret;
uint8_t * head;
-#ifdef IPCP_FLOW_STATS
size_t len;
-#endif
+
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
-#ifdef IPCP_FLOW_STATS
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#ifdef IPCP_FLOW_STATS
+
pthread_mutex_lock(&dt.stat[np1_fd].lock);
++dt.stat[np1_fd].lcl_r_pkt[qc];
@@ -829,15 +834,15 @@ int dt_write_packet(uint64_t dst_addr,
goto fail_write;
}
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
dt_pci.eid = np1_fd;
- dt_pci.ecn = 0;
+ dt_pci.ecn = ca_calc_ecn(fd, len);
dt_pci_ser(head, &dt_pci);
-#ifdef IPCP_FLOW_STATS
- len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-#endif
+
ret = ipcp_flow_write(fd, sdb);
if (ret < 0) {
log_dbg("Failed to write packet to fd %d.", fd);
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index e154d785..8f268a9d 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -42,6 +42,7 @@
#include "psched.h"
#include "ipcp.h"
#include "dt.h"
+#include "ca.h"
#include <pthread.h>
#include <stdlib.h>
@@ -49,9 +50,10 @@
#define TIMEOUT 10000 /* nanoseconds */
-#define FLOW_REQ 0
-#define FLOW_REPLY 1
-#define MSGBUFSZ 2048
+#define FLOW_REQ 0
+#define FLOW_REPLY 1
+#define FLOW_UPDATE 2
+#define MSGBUFSZ 2048
struct fa_msg {
uint64_t s_addr;
@@ -59,6 +61,7 @@ struct fa_msg {
uint32_t s_eid;
uint8_t code;
int8_t response;
+ uint16_t ece;
/* QoS parameters from spec, aligned */
uint8_t availability;
uint8_t in_order;
@@ -75,10 +78,16 @@ struct cmd {
struct shm_du_buff * sdb;
};
+struct fa_flow {
+ int r_eid; /* remote endpoint id */
+ uint64_t r_addr; /* remote address */
+ void * ctx; /* congestion avoidance context */
+};
+
struct {
pthread_rwlock_t flows_lock;
- int r_eid[PROG_MAX_FLOWS];
- uint64_t r_addr[PROG_MAX_FLOWS];
+ struct fa_flow flows[PROG_MAX_FLOWS];
+
int fd;
struct list_head cmds;
@@ -93,22 +102,56 @@ static void packet_handler(int fd,
qoscube_t qc,
struct shm_du_buff * sdb)
{
- pthread_rwlock_rdlock(&fa.flows_lock);
+ struct fa_flow * flow;
+ uint64_t r_addr;
+ uint32_t r_eid;
+ ca_wnd_t wnd;
+ size_t len;
- if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) {
- pthread_rwlock_unlock(&fa.flows_lock);
+ flow = &fa.flows[fd];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+ wnd = ca_ctx_update_snd(flow->ctx, len);
+
+ r_addr = flow->r_addr;
+ r_eid = flow->r_eid;
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ ca_wnd_wait(wnd);
+
+ if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
log_warn("Failed to forward packet.");
return;
}
+}
- pthread_rwlock_unlock(&fa.flows_lock);
+static int fa_flow_init(struct fa_flow * flow)
+{
+ memset(flow, 0, sizeof(*flow));
+
+ flow->r_eid = -1;
+ flow->r_addr = INVALID_ADDR;
+
+ flow->ctx = ca_ctx_create();
+ if (flow->ctx == NULL)
+ return -1;
+
+ return 0;
}
-static void destroy_conn(int fd)
+static void fa_flow_fini(struct fa_flow * flow)
{
- fa.r_eid[fd] = -1;
- fa.r_addr[fd] = INVALID_ADDR;
+ ca_ctx_destroy(flow->ctx);
+
+ memset(flow, 0, sizeof(*flow));
+
+ flow->r_eid = -1;
+ flow->r_addr = INVALID_ADDR;
}
static void fa_post_packet(void * comp,
@@ -145,14 +188,15 @@ static void * fa_handle_packet(void * o)
(void) o;
while (true) {
- struct timespec abstime;
- int fd;
- uint8_t buf[MSGBUFSZ];
- struct fa_msg * msg;
- qosspec_t qs;
- struct cmd * cmd;
- size_t len;
- size_t msg_len;
+ struct timespec abstime;
+ int fd;
+ uint8_t buf[MSGBUFSZ];
+ struct fa_msg * msg;
+ qosspec_t qs;
+ struct cmd * cmd;
+ size_t len;
+ size_t msg_len;
+ struct fa_flow * flow;
pthread_mutex_lock(&fa.mtx);
@@ -232,10 +276,14 @@ static void * fa_handle_packet(void * o)
continue;
}
+ flow = &fa.flows[fd];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- fa.r_eid[fd] = ntoh32(msg->s_eid);
- fa.r_addr[fd] = ntoh64(msg->s_addr);
+ fa_flow_init(flow);
+
+ flow->r_eid = ntoh32(msg->s_eid);
+ flow->r_addr = ntoh64(msg->s_addr);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -248,19 +296,32 @@ static void * fa_handle_packet(void * o)
case FLOW_REPLY:
assert(len >= sizeof(*msg));
+ flow = &fa.flows[ntoh32(msg->r_eid)];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+ flow->r_eid = ntoh32(msg->s_eid);
+
+ if (msg->response < 0)
+ fa_flow_fini(flow);
+ else
+ psched_add(fa.psched, ntoh32(msg->r_eid));
+
+ pthread_rwlock_unlock(&fa.flows_lock);
ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
msg->response,
buf + sizeof(*msg),
len - sizeof(*msg));
+ break;
+ case FLOW_UPDATE:
+ assert(len >= sizeof(*msg));
- if (msg->response < 0)
- destroy_conn(ntoh32(msg->r_eid));
- else
- psched_add(fa.psched, ntoh32(msg->r_eid));
+ flow = &fa.flows[ntoh32(msg->r_eid)];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
pthread_rwlock_unlock(&fa.flows_lock);
@@ -275,10 +336,6 @@ static void * fa_handle_packet(void * o)
int fa_init(void)
{
pthread_condattr_t cattr;
- int i;
-
- for (i = 0; i < PROG_MAX_FLOWS; ++i)
- destroy_conn(i);
if (pthread_rwlock_init(&fa.flows_lock, NULL))
goto fail_rwlock;
@@ -383,9 +440,10 @@ int fa_alloc(int fd,
size_t dlen)
{
struct fa_msg * msg;
- uint64_t addr;
struct shm_du_buff * sdb;
- qoscube_t qc;
+ struct fa_flow * flow;
+ uint64_t addr;
+ qoscube_t qc = QOS_CUBE_BE;
size_t len;
addr = dir_query(dst);
@@ -397,7 +455,9 @@ int fa_alloc(int fd,
if (ipcp_sdb_reserve(&sdb, len + dlen))
return -1;
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ memset(msg, 0, sizeof(*msg));
+
msg->code = FLOW_REQ;
msg->s_eid = hton32(fd);
msg->s_addr = hton64(ipcpi.dt_addr);
@@ -413,17 +473,17 @@ int fa_alloc(int fd,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
- qc = qos_spec_to_cube(qs);
-
if (dt_write_packet(addr, qc, fa.fd, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
+ flow = &fa.flows[fd];
+
pthread_rwlock_wrlock(&fa.flows_lock);
- assert(fa.r_eid[fd] == -1);
- fa.r_addr[fd] = addr;
+ fa_flow_init(flow);
+ flow->r_addr = addr;
pthread_rwlock_unlock(&fa.flows_lock);
@@ -439,10 +499,13 @@ int fa_alloc_resp(int fd,
struct timespec abstime;
struct fa_msg * msg;
struct shm_du_buff * sdb;
- qoscube_t qc;
+ struct fa_flow * flow;
+ qoscube_t qc = QOS_CUBE_BE;
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ flow = &fa.flows[fd];
+
pthread_mutex_lock(&ipcpi.alloc_lock);
while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
@@ -463,33 +526,31 @@ int fa_alloc_resp(int fd,
pthread_mutex_unlock(&ipcpi.alloc_lock);
if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
- destroy_conn(fd);
+ fa_flow_fini(flow);
return -1;
}
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+ memset(msg, 0, sizeof(*msg));
+
pthread_rwlock_wrlock(&fa.flows_lock);
- msg = (struct fa_msg *) shm_du_buff_head(sdb);
msg->code = FLOW_REPLY;
- msg->r_eid = hton32(fa.r_eid[fd]);
+ msg->r_eid = hton32(flow->r_eid);
msg->s_eid = hton32(fd);
msg->response = response;
memcpy(msg + 1, data, len);
if (response < 0) {
- destroy_conn(fd);
+ fa_flow_fini(flow);
ipcp_sdb_release(sdb);
} else {
psched_add(fa.psched, fd);
}
- ipcp_flow_get_qoscube(fd, &qc);
-
- assert(qc >= 0 && qc < QOS_CUBE_MAX);
-
- if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) {
- destroy_conn(fd);
+ if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
return -1;
@@ -505,11 +566,11 @@ int fa_dealloc(int fd)
if (ipcp_flow_fini(fd) < 0)
return 0;
- pthread_rwlock_wrlock(&fa.flows_lock);
-
psched_del(fa.psched, fd);
- destroy_conn(fd);
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ fa_flow_fini(&fa.flows[fd]);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -517,3 +578,60 @@ int fa_dealloc(int fd)
return 0;
}
+
+static int fa_update_remote(int fd,
+ uint16_t ece)
+{
+ struct fa_msg * msg;
+ struct shm_du_buff * sdb;
+ qoscube_t qc = QOS_CUBE_BE;
+ struct fa_flow * flow;
+
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
+ return -1;
+ }
+
+ msg = (struct fa_msg *) shm_du_buff_head(sdb);
+
+ memset(msg, 0, sizeof(*msg));
+
+ flow = &fa.flows[fd];
+
+ pthread_rwlock_rdlock(&fa.flows_lock);
+
+ msg->code = FLOW_UPDATE;
+ msg->r_eid = hton32(flow->r_eid);
+ msg->ece = hton16(ece);
+
+ if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
+ return -1;
+ }
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+
+ return 0;
+}
+
+void fa_ecn_update(int eid,
+ uint8_t ecn,
+ size_t len)
+{
+ struct fa_flow * flow;
+ bool update;
+ uint16_t ece;
+
+ flow = &fa.flows[eid];
+
+ pthread_rwlock_wrlock(&fa.flows_lock);
+
+ update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);
+
+ pthread_rwlock_unlock(&fa.flows_lock);
+
+ if (update)
+ fa_update_remote(eid, ece);
+
+}
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index 12a10a0c..daba2a51 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,4 +47,8 @@ int fa_alloc_resp(int fd,
int fa_dealloc(int fd);
+void fa_ecn_update(int eid,
+ uint8_t ecn,
+ size_t len);
+
#endif /* OUROBOROS_IPCPD_UNICAST_FA_H */
diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c
index 0ab37d25..1b2cc14e 100644
--- a/src/ipcpd/unicast/main.c
+++ b/src/ipcpd/unicast/main.c
@@ -39,6 +39,7 @@
#include <ouroboros/time_utils.h>
#include "addr_auth.h"
+#include "ca.h"
#include "connmgr.h"
#include "dir.h"
#include "dt.h"
@@ -83,6 +84,11 @@ static int initialize_components(const struct ipcp_config * conf)
log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
+ if (ca_init(conf->cong_avoid)) {
+ log_err("Failed to initialize congestion avoidance.");
+ goto fail_ca;
+ }
+
if (dt_init(conf->routing_type,
conf->addr_size,
conf->eid_size,
@@ -110,6 +116,8 @@ static int initialize_components(const struct ipcp_config * conf)
fail_fa:
dt_fini();
fail_dt:
+ ca_fini();
+ fail_ca:
addr_auth_fini();
fail_addr_auth:
free(ipcpi.layer_name);
@@ -125,6 +133,8 @@ static void finalize_components(void)
dt_fini();
+ ca_fini();
+
addr_auth_fini();
free(ipcpi.layer_name);
diff --git a/src/ipcpd/unicast/pol-ca-ops.h b/src/ipcpd/unicast/pol-ca-ops.h
new file mode 100644
index 00000000..3cb8a9d2
--- /dev/null
+++ b/src/ipcpd/unicast/pol-ca-ops.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion avoidance policy ops
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H
+#define OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H
+
+#include "ca.h"
+
+struct pol_ca_ops {
+ void * (* ctx_create)(void);
+
+ void (* ctx_destroy)(void * ctx);
+
+ ca_wnd_t (* ctx_update_snd)(void * ctx,
+ size_t len);
+
+ bool (* ctx_update_rcv)(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+ void (* ctx_update_ece)(void * ctx,
+ uint16_t ece);
+
+ void (* wnd_wait)(ca_wnd_t wnd);
+
+ uint8_t (* calc_ecn)(int fd,
+ size_t len);
+};
+
+#endif /* OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H */
diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.c b/src/ipcpd/unicast/pol/ca-mb-ecn.c
new file mode 100644
index 00000000..2de8f8e7
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-mb-ecn.c
@@ -0,0 +1,216 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Multi-bit ECN Congestion Avoidance
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200809L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/time_utils.h>
+
+#include "ca-mb-ecn.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+/* congestion avoidance constants */
+#define CA_SHFT 5
+#define CA_WND (1 << CA_SHFT)
+#define CA_UPD (1 << (CA_SHFT - 3))
+#define CA_SLOT 18
+#define CA_AI 20000
+#define ECN_Q_SHFT 5
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+
+struct mb_ecn_ctx {
+ uint16_t rx_ece; /* level of congestion (upstream) */
+ size_t rx_ctr; /* receiver side packet counter */
+
+ uint16_t tx_ece; /* level of congestion (downstream) */
+ size_t tx_ctr; /* sender side packet counter */
+ size_t tx_aps; /* average packet size */
+ time_t tx_wnd; /* tgt time to send packets (ns) */
+ bool tx_cav; /* Congestion avoidance */
+ size_t tx_slot;
+
+ struct timespec t_sent; /* last sent packet */
+};
+
+struct pol_ca_ops mb_ecn_ca_ops = {
+ .ctx_create = mb_ecn_ctx_create,
+ .ctx_destroy = mb_ecn_ctx_destroy,
+ .ctx_update_snd = mb_ecn_ctx_update_snd,
+ .ctx_update_rcv = mb_ecn_ctx_update_rcv,
+ .ctx_update_ece = mb_ecn_ctx_update_ece,
+ .wnd_wait = mb_ecn_wnd_wait,
+ .calc_ecn = mb_ecn_calc_ecn
+};
+
+void * mb_ecn_ctx_create(void)
+{
+
+ struct mb_ecn_ctx * ctx;
+
+ ctx = malloc(sizeof(*ctx));
+ if (ctx == NULL)
+ return NULL;
+
+ memset(ctx, 0, sizeof(*ctx));
+
+ return (void *) ctx;
+}
+
+void mb_ecn_ctx_destroy(void * ctx)
+{
+ free(ctx);
+}
+
+ca_wnd_t mb_ecn_ctx_update_snd(void * _ctx,
+ size_t len)
+{
+ struct timespec now;
+ size_t slot;
+ time_t gap;
+ ca_wnd_t wnd;
+
+ struct mb_ecn_ctx * ctx = _ctx;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ if (ctx->tx_wnd == 0) { /* 10 ms initial window estimate */
+ ctx->tx_wnd = 10 * MILLION;
+ gap = ctx->tx_wnd >> CA_SHFT;
+ ctx->tx_aps = len >> CA_SHFT;
+ ctx->tx_slot = ts_to_ns(now) >> CA_SLOT;
+ } else {
+ gap = ts_diff_ns(&ctx->t_sent, &now);
+ ctx->tx_aps -= ctx->tx_aps >> CA_SHFT;
+ ctx->tx_aps += len;
+ }
+
+ ctx->t_sent = now;
+
+ slot = ts_to_ns(now) >> CA_SLOT;
+
+ ctx->tx_ctr++;
+
+ if (slot - ctx->tx_slot > 0) {
+ ctx->tx_slot = slot;
+
+ if (ctx->tx_ctr > CA_WND)
+ ctx->tx_ece = 0;
+
+ /* Slow start */
+ if (!ctx->tx_cav) {
+ ctx->tx_wnd >>= 1;
+ /* Multiplicative Decrease */
+ } else if (ctx->tx_ece) { /* MD */
+ ctx->tx_wnd += (ctx->tx_wnd * ctx->tx_ece)
+ >> (CA_SHFT + 8);
+ /* Additive Increase */
+ } else {
+ size_t bw = ctx->tx_aps * BILLION / ctx->tx_wnd;
+ bw += CA_AI;
+ ctx->tx_wnd = ctx->tx_aps * BILLION / bw;
+ }
+ }
+
+ wnd.wait = (ctx->tx_wnd >> CA_SHFT) - gap;
+
+ return wnd;
+}
+
+void mb_ecn_wnd_wait(ca_wnd_t wnd)
+{
+ if (wnd.wait > 0) {
+ struct timespec s = {0, 0};
+ if (wnd.wait > BILLION) /* Don't care throttling < 1pps */
+ s.tv_sec = 1;
+ else
+ s.tv_nsec = wnd.wait;
+
+ nanosleep(&s, NULL);
+ }
+}
+
+bool mb_ecn_ctx_update_rcv(void * _ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece)
+{
+ struct mb_ecn_ctx* ctx = _ctx;
+ bool update;
+
+ (void) len;
+
+ if ((ctx->rx_ece | ecn) == 0)
+ return false;
+
+ if (ecn == 0) {
+ /* end of congestion */
+ ctx->rx_ece >>= 2;
+ update = ctx->rx_ece == 0;
+ } else {
+ if (ctx->rx_ece == 0) {
+ /* start of congestion */
+ ctx->rx_ece = ecn;
+ ctx->rx_ctr = 0;
+ update = true;
+ } else {
+ /* congestion update */
+ ctx->rx_ece -= ctx->rx_ece >> CA_SHFT;
+ ctx->rx_ece += ecn;
+ update = (ctx->rx_ctr++ & (CA_UPD - 1)) == true;
+ }
+ }
+
+ *ece = ctx->rx_ece;
+
+ return update;
+}
+
+
+void mb_ecn_ctx_update_ece(void * _ctx,
+ uint16_t ece)
+{
+ struct mb_ecn_ctx* ctx = _ctx;
+
+ ctx->tx_ece = ece;
+ ctx->tx_ctr = 0;
+ ctx->tx_cav = true;
+}
+
+uint8_t mb_ecn_calc_ecn(int fd,
+ size_t len)
+{
+ size_t q;
+
+ (void) len;
+
+ q = ipcp_flow_queued(fd);
+
+ return (uint8_t) (q >> ECN_Q_SHFT);
+}
diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.h b/src/ipcpd/unicast/pol/ca-mb-ecn.h
new file mode 100644
index 00000000..456b9b13
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-mb-ecn.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Multi-bit ECN Congestion Avoidance
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H
+#define OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H
+
+#include "pol-ca-ops.h"
+
+void * mb_ecn_ctx_create(void);
+
+void mb_ecn_ctx_destroy(void * ctx);
+
+ca_wnd_t mb_ecn_ctx_update_snd(void * ctx,
+ size_t len);
+
+bool mb_ecn_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+void mb_ecn_ctx_update_ece(void * ctx,
+ uint16_t ece);
+
+void mb_ecn_wnd_wait(ca_wnd_t wnd);
+
+uint8_t mb_ecn_calc_ecn(int fd,
+ size_t len);
+
+extern struct pol_ca_ops mb_ecn_ca_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H */
diff --git a/src/ipcpd/unicast/pol/ca-nop.c b/src/ipcpd/unicast/pol/ca-nop.c
new file mode 100644
index 00000000..d0d89a2e
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-nop.c
@@ -0,0 +1,93 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Dummy Congestion Avoidance
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include "ca-nop.h"
+
+#include <string.h>
+
+struct pol_ca_ops nop_ca_ops = {
+ .ctx_create = nop_ctx_create,
+ .ctx_destroy = nop_ctx_destroy,
+ .ctx_update_snd = nop_ctx_update_snd,
+ .ctx_update_rcv = nop_ctx_update_rcv,
+ .ctx_update_ece = nop_ctx_update_ece,
+ .wnd_wait = nop_wnd_wait,
+ .calc_ecn = nop_calc_ecn
+};
+
+void * nop_ctx_create(void)
+{
+ return (void *) 1;
+}
+
+void nop_ctx_destroy(void * ctx)
+{
+ (void) ctx;
+}
+
+ca_wnd_t nop_ctx_update_snd(void * ctx,
+ size_t len)
+{
+ ca_wnd_t wnd;
+
+ (void) ctx;
+ (void) len;
+
+ memset(&wnd, 0, sizeof(wnd));
+
+ return wnd;
+}
+
+void nop_wnd_wait(ca_wnd_t wnd)
+{
+ (void) wnd;
+}
+
+bool nop_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece)
+{
+ (void) ctx;
+ (void) len;
+ (void) ecn;
+ (void) ece;
+
+ return false;
+}
+
+void nop_ctx_update_ece(void * ctx,
+ uint16_t ece)
+{
+ (void) ctx;
+ (void) ece;
+}
+
+
+uint8_t nop_calc_ecn(int fd,
+ size_t len)
+{
+ (void) fd;
+ (void) len;
+
+ return 0;
+}
diff --git a/src/ipcpd/unicast/pol/ca-nop.h b/src/ipcpd/unicast/pol/ca-nop.h
new file mode 100644
index 00000000..baf649d8
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-nop.h
@@ -0,0 +1,50 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Dummy Congestion Avoidance
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_NOP_H
+#define OUROBOROS_IPCPD_UNICAST_CA_NOP_H
+
+#include "pol-ca-ops.h"
+
+void * nop_ctx_create(void);
+
+void nop_ctx_destroy(void * ctx);
+
+ca_wnd_t nop_ctx_update_snd(void * ctx,
+ size_t len);
+
+bool nop_ctx_update_rcv(void * ctx,
+ size_t len,
+ uint8_t ecn,
+ uint16_t * ece);
+
+void nop_ctx_update_ece(void * ctx,
+ uint16_t ece);
+
+void nop_wnd_wait(ca_wnd_t wnd);
+
+uint8_t nop_calc_ecn(int fd,
+ size_t len);
+
+extern struct pol_ca_ops nop_ca_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_NOP_H */
diff --git a/src/ipcpd/unicast/pol/link_state.c b/src/ipcpd/unicast/pol/link_state.c
index d9482876..ca8a7c50 100644
--- a/src/ipcpd/unicast/pol/link_state.c
+++ b/src/ipcpd/unicast/pol/link_state.c
@@ -812,8 +812,12 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
pthread_rwlock_rdlock(&ls.db_lock);
+
+ pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock,
+ (void *) &ls.db_lock);
+
send_lsm(ipcpi.dt_addr, c->conn_info.addr, 0);
- pthread_rwlock_unlock(&ls.db_lock);
+ pthread_cleanup_pop(true);
if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT))
log_dbg("Failed to add neighbor to LSDB.");
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 4b78c1db..a6be762b 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1768,6 +1768,21 @@ int ipcp_flow_get_qoscube(int fd,
return 0;
}
+size_t ipcp_flow_queued(int fd)
+{
+ size_t q;
+
+ pthread_rwlock_rdlock(&ai.lock);
+
+ assert(ai.flows[fd].flow_id >= 0);
+
+ q = shm_rbuff_queued(ai.flows[fd].tx_rb);
+
+ pthread_rwlock_unlock(&ai.lock);
+
+ return q;
+}
+
ssize_t local_flow_read(int fd)
{
ssize_t ret;
diff --git a/src/lib/ipcp_config.proto b/src/lib/ipcp_config.proto
index 23c65e94..7bf5329e 100644
--- a/src/lib/ipcp_config.proto
+++ b/src/lib/ipcp_config.proto
@@ -36,15 +36,16 @@ message ipcp_config_msg {
optional uint32 max_ttl = 5;
optional uint32 addr_auth_type = 6;
optional uint32 routing_type = 7;
+ optional uint32 cong_avoid = 8;
// Config for UDP
- optional uint32 ip_addr = 8;
- optional uint32 dns_addr = 9;
- optional uint32 clt_port = 10;
- optional uint32 srv_port = 11;
+ optional uint32 ip_addr = 9;
+ optional uint32 dns_addr = 10;
+ optional uint32 clt_port = 11;
+ optional uint32 srv_port = 12;
// Config for the Ethernet
- optional string dev = 12;
+ optional string dev = 13;
// Config for DIX Ethernet
- optional uint32 ethertype = 13;
+ optional uint32 ethertype = 14;
}
enum enroll_code {
diff --git a/src/lib/irm.c b/src/lib/irm.c
index 08dffb6c..42ad74fa 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -132,6 +132,8 @@ int irm_bootstrap_ipcp(pid_t pid,
config.addr_auth_type = conf->addr_auth_type;
config.has_routing_type = true;
config.routing_type = conf->routing_type;
+ config.has_cong_avoid = true;
+ config.cong_avoid = conf->cong_avoid;
break;
case IPCP_UDP:
config.has_ip_addr = true;
diff --git a/src/tools/irm/irm_ipcp_bootstrap.c b/src/tools/irm/irm_ipcp_bootstrap.c
index 84b6759a..ba57a506 100644
--- a/src/tools/irm/irm_ipcp_bootstrap.c
+++ b/src/tools/irm/irm_ipcp_bootstrap.c
@@ -50,7 +50,7 @@
#include <sys/socket.h>
#endif
-#define UNICAST "unicast"
+#define UNICAST "unicast"
#define BROADCAST "broadcast"
#define UDP "udp"
#define ETH_LLC "eth-llc"
@@ -70,6 +70,7 @@
#define DEFAULT_TTL 60
#define DEFAULT_ADDR_AUTH ADDR_AUTH_FLAT_RANDOM
#define DEFAULT_ROUTING ROUTING_LINK_STATE
+#define DEFAULT_CONG_AVOID CA_MB_ECN
#define DEFAULT_HASH_ALGO DIR_HASH_SHA3_256
#define DEFAULT_ETHERTYPE 0xA000
#define DEFAULT_CLIENT_PORT 0x0000 /* random port */
@@ -79,6 +80,8 @@
#define LINK_STATE_ROUTING "link_state"
#define LINK_STATE_LFA_ROUTING "lfa"
#define LINK_STATE_ECM_ROUTING "ecmp"
+#define NONE_CA "none"
+#define MB_ECN_CA "mb-ecn"
static void usage(void)
{
@@ -95,11 +98,13 @@ static void usage(void)
" [ttl (max time-to-live value, default: %d)]\n"
" [addr_auth <ADDRESS_POLICY> (default: %s)]\n"
" [routing <ROUTING_POLICY> (default: %s)]\n"
+ " [congestion <CONG_POLICY> (default: %s)]\n"
" [hash [ALGORITHM] (default: %s)]\n"
" [autobind]\n"
- "where ADDRESS_POLICY = {"FLAT_RANDOM_ADDR_AUTH"}\n"
- " ROUTING_POLICY = {"LINK_STATE_ROUTING " "
+ "where ADDRESS_POLICY = {" FLAT_RANDOM_ADDR_AUTH "}\n"
+ " ROUTING_POLICY = {" LINK_STATE_ROUTING " "
LINK_STATE_LFA_ROUTING " " LINK_STATE_ECM_ROUTING "}\n"
+ " CONG_POLICY = {" NONE_CA " " MB_ECN_CA "}\n"
" ALGORITHM = {" SHA3_224 " " SHA3_256 " "
SHA3_384 " " SHA3_512 "}\n\n"
"if TYPE == " UDP "\n"
@@ -130,7 +135,7 @@ static void usage(void)
"if TYPE == " BROADCAST "\n"
" [autobind]\n\n",
DEFAULT_ADDR_SIZE, DEFAULT_EID_SIZE, DEFAULT_TTL,
- FLAT_RANDOM_ADDR_AUTH, LINK_STATE_ROUTING,
+ FLAT_RANDOM_ADDR_AUTH, LINK_STATE_ROUTING, MB_ECN_CA,
SHA3_256, DEFAULT_SERVER_PORT, SHA3_256, 0xA000, SHA3_256,
SHA3_256, SHA3_256);
}
@@ -138,29 +143,30 @@ static void usage(void)
int do_bootstrap_ipcp(int argc,
char ** argv)
{
- char * ipcp = NULL;
- pid_t pid = -1;
- struct ipcp_config conf;
- uint8_t addr_size = DEFAULT_ADDR_SIZE;
- uint8_t eid_size = DEFAULT_EID_SIZE;
- uint8_t max_ttl = DEFAULT_TTL;
- enum pol_addr_auth addr_auth_type = DEFAULT_ADDR_AUTH;
- enum pol_routing routing_type = DEFAULT_ROUTING;
- enum pol_dir_hash hash_algo = DEFAULT_HASH_ALGO;
- uint32_t ip_addr = 0;
- uint32_t dns_addr = DEFAULT_DDNS;
- char * ipcp_type = NULL;
- enum ipcp_type type = IPCP_INVALID;
- char * layer = NULL;
- char * dev = NULL;
- uint16_t ethertype = DEFAULT_ETHERTYPE;
- struct ipcp_info * ipcps;
- ssize_t len = 0;
- int i = 0;
- bool autobind = false;
- int cargs;
- int cport = DEFAULT_CLIENT_PORT;
- int sport = DEFAULT_SERVER_PORT;
+ char * ipcp = NULL;
+ pid_t pid = -1;
+ struct ipcp_config conf;
+ uint8_t addr_size = DEFAULT_ADDR_SIZE;
+ uint8_t eid_size = DEFAULT_EID_SIZE;
+ uint8_t max_ttl = DEFAULT_TTL;
+ enum pol_addr_auth addr_auth_type = DEFAULT_ADDR_AUTH;
+ enum pol_routing routing_type = DEFAULT_ROUTING;
+ enum pol_dir_hash hash_algo = DEFAULT_HASH_ALGO;
+ enum pol_cong_avoid cong_avoid = DEFAULT_CONG_AVOID;
+ uint32_t ip_addr = 0;
+ uint32_t dns_addr = DEFAULT_DDNS;
+ char * ipcp_type = NULL;
+ enum ipcp_type type = IPCP_INVALID;
+ char * layer = NULL;
+ char * dev = NULL;
+ uint16_t ethertype = DEFAULT_ETHERTYPE;
+ struct ipcp_info * ipcps;
+ ssize_t len = 0;
+ int i = 0;
+ bool autobind = false;
+ int cargs;
+ int cport = DEFAULT_CLIENT_PORT;
+ int sport = DEFAULT_SERVER_PORT;
while (argc > 0) {
cargs = 2;
@@ -230,6 +236,14 @@ int do_bootstrap_ipcp(int argc,
routing_type = ROUTING_LINK_STATE_ECMP;
else
goto unknown_param;
+ } else if (matches(*argv, "congestion") == 0) {
+ if (strcmp(NONE_CA, *(argv + 1)) == 0)
+ cong_avoid = CA_NONE;
+ else if (strcmp(MB_ECN_CA,
+ *(argv + 1)) == 0)
+ cong_avoid = CA_MB_ECN;
+ else
+ goto unknown_param;
} else {
printf("Unknown option: \"%s\".\n", *argv);
return -1;
@@ -315,6 +329,7 @@ int do_bootstrap_ipcp(int argc,
conf.max_ttl = max_ttl;
conf.addr_auth_type = addr_auth_type;
conf.routing_type = routing_type;
+ conf.cong_avoid = cong_avoid;
break;
case IPCP_UDP:
if (ip_addr == 0)