diff options
Diffstat (limited to 'src/ipcpd/unicast')
-rw-r--r-- | src/ipcpd/unicast/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/ipcpd/unicast/ca.c | 99 | ||||
-rw-r--r-- | src/ipcpd/unicast/ca.h | 61 | ||||
-rw-r--r-- | src/ipcpd/unicast/dt.c | 43 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.c | 222 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.h | 4 | ||||
-rw-r--r-- | src/ipcpd/unicast/main.c | 10 | ||||
-rw-r--r-- | src/ipcpd/unicast/pol-ca-ops.h | 50 | ||||
-rw-r--r-- | src/ipcpd/unicast/pol/ca-mb-ecn.c | 216 | ||||
-rw-r--r-- | src/ipcpd/unicast/pol/ca-mb-ecn.h | 50 | ||||
-rw-r--r-- | src/ipcpd/unicast/pol/ca-nop.c | 93 | ||||
-rw-r--r-- | src/ipcpd/unicast/pol/ca-nop.h | 50 | ||||
-rw-r--r-- | src/ipcpd/unicast/pol/link_state.c | 6 |
13 files changed, 835 insertions, 72 deletions
diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt index c0c55519..035ee5f4 100644 --- a/src/ipcpd/unicast/CMakeLists.txt +++ b/src/ipcpd/unicast/CMakeLists.txt @@ -33,6 +33,7 @@ endif () set(SOURCE_FILES # Add source files here addr_auth.c + ca.c connmgr.c dht.c dir.c @@ -51,6 +52,8 @@ set(SOURCE_FILES pol/simple_pff.c pol/alternate_pff.c pol/multipath_pff.c + pol/ca-mb-ecn.c + pol/ca-nop.c ) add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES} diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c new file mode 100644 index 00000000..f93d0504 --- /dev/null +++ b/src/ipcpd/unicast/ca.c @@ -0,0 +1,99 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Congestion Avoidance + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define OUROBOROS_PREFIX "ca" + +#include <ouroboros/logs.h> + +#include "ca.h" +#include "pol-ca-ops.h" +#include "pol/ca-mb-ecn.h" +#include "pol/ca-nop.h" + +struct { + struct pol_ca_ops * ops; +} ca; + +int ca_init(enum pol_cong_avoid pol) +{ + switch(pol) { + case CA_NONE: + log_dbg("Disabling congestion control."); + ca.ops = &nop_ca_ops; + break; + case CA_MB_ECN: + log_dbg("Using multi-bit ECN."); + ca.ops = &mb_ecn_ca_ops; + break; + default: + return -1; + } + + return 0; +} + + +void ca_fini(void) +{ + ca.ops = NULL; +} + +void * ca_ctx_create(void) +{ + return ca.ops->ctx_create(); +} + +void ca_ctx_destroy(void * ctx) +{ + return ca.ops->ctx_destroy(ctx); +} + +ca_wnd_t ca_ctx_update_snd(void * ctx, + size_t len) +{ + return ca.ops->ctx_update_snd(ctx, len); +} + +bool ca_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece) +{ + return ca.ops->ctx_update_rcv(ctx, len, ecn, ece); +} + +void ca_ctx_update_ece(void * ctx, + uint16_t ece) +{ + return ca.ops->ctx_update_ece(ctx, ece); +} + +void ca_wnd_wait(ca_wnd_t wnd) +{ + return ca.ops->wnd_wait(wnd); +} + +uint8_t ca_calc_ecn(int fd, + size_t len) +{ + return ca.ops->calc_ecn(fd, len); +} diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h new file mode 100644 index 00000000..5cf6199c --- /dev/null +++ b/src/ipcpd/unicast/ca.h @@ -0,0 +1,61 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Congestion avoidance + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_H +#define OUROBOROS_IPCPD_UNICAST_CA_H + +#include <ouroboros/ipcp.h> + +#include <stdbool.h> +#include <sys/types.h> + +typedef union { + time_t wait; +} ca_wnd_t; + +int ca_init(enum pol_cong_avoid ca); + +void ca_fini(void); + + +/* OPS */ +void * ca_ctx_create(void); + +void ca_ctx_destroy(void * ctx); + +ca_wnd_t ca_ctx_update_snd(void * ctx, + size_t len); + +bool ca_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + +void ca_ctx_update_ece(void * ctx, + uint16_t ece); + +void ca_wnd_wait(ca_wnd_t wnd); + +uint8_t ca_calc_ecn(int fd, + size_t len); + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_H */ diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 7db766a5..53accba3 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -41,6 +41,7 @@ #include <ouroboros/fccntl.h> #endif +#include "ca.h" #include "connmgr.h" #include "ipcp.h" #include "dt.h" @@ -115,16 +116,12 @@ static void dt_pci_ser(uint8_t * head, } -static void dt_pci_des(struct shm_du_buff * sdb, - struct dt_pci * dt_pci) +static void dt_pci_des(uint8_t * head, + struct dt_pci * dt_pci) { - uint8_t * head; - - assert(sdb); + assert(head); assert(dt_pci); - head = shm_du_buff_head(sdb); - /* Decrease TTL */ --*(head + dt_pci_info.ttl_o); @@ -226,7 +223,6 @@ static int dt_stat_read(const char * path, "Queued packets (rx): %20zu\n" "Queued packets (tx): %20zu\n\n", tmstr, addrstr, rxqlen, txqlen); - for (i = 0; i < QOS_CUBE_MAX; ++i) { sprintf(str, "Qos cube %3d:\n" @@ -434,13 +430,14 @@ static void packet_handler(int fd, struct dt_pci dt_pci; int ret; int ofd; -#ifndef IPCP_FLOW_STATS - (void) fd; -#else + uint8_t * head; size_t len; len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#ifndef IPCP_FLOW_STATS + (void) fd; +#else pthread_mutex_lock(&dt.stat[fd].lock); ++dt.stat[fd].rcv_pkt[qc]; @@ -449,7 +446,10 @@ static void packet_handler(int fd, pthread_mutex_unlock(&dt.stat[fd].lock); #endif memset(&dt_pci, 0, sizeof(dt_pci)); - dt_pci_des(sdb, &dt_pci); + + head = shm_du_buff_head(sdb); + + dt_pci_des(head, &dt_pci); if (dt_pci.dst_addr != ipcpi.dt_addr) { if (dt_pci.ttl == 0) { log_dbg("TTL was zero."); @@ -481,6 +481,8 @@ static void packet_handler(int fd, return; } + *(head + dt_pci_info.ecn_o) |= ca_calc_ecn(ofd, len); + ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { log_dbg("Failed to write packet to fd %d.", ofd); @@ -508,6 +510,9 @@ static void packet_handler(int fd, } else { dt_pci_shrink(sdb); if (dt_pci.eid >= PROG_RES_FDS) { + uint8_t ecn = *(head + dt_pci_info.ecn_o); + fa_ecn_update(dt_pci.eid, ecn, len); + if (ipcp_flow_write(dt_pci.eid, sdb)) { ipcp_sdb_release(sdb); #ifdef IPCP_FLOW_STATS @@ -792,15 +797,15 @@ int dt_write_packet(uint64_t dst_addr, int fd; int ret; uint8_t * head; -#ifdef IPCP_FLOW_STATS size_t len; -#endif + assert(sdb); assert(dst_addr != ipcpi.dt_addr); -#ifdef IPCP_FLOW_STATS len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[np1_fd].lock); ++dt.stat[np1_fd].lcl_r_pkt[qc]; @@ -829,15 +834,15 @@ int dt_write_packet(uint64_t dst_addr, goto fail_write; } + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + dt_pci.dst_addr = dst_addr; dt_pci.qc = qc; dt_pci.eid = np1_fd; - dt_pci.ecn = 0; + dt_pci.ecn = ca_calc_ecn(fd, len); dt_pci_ser(head, &dt_pci); -#ifdef IPCP_FLOW_STATS - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); -#endif + ret = ipcp_flow_write(fd, sdb); if (ret < 0) { log_dbg("Failed to write packet to fd %d.", fd); diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index e154d785..8f268a9d 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -42,6 +42,7 @@ #include "psched.h" #include "ipcp.h" #include "dt.h" +#include "ca.h" #include <pthread.h> #include <stdlib.h> @@ -49,9 +50,10 @@ #define TIMEOUT 10000 /* nanoseconds */ -#define FLOW_REQ 0 -#define FLOW_REPLY 1 -#define MSGBUFSZ 2048 +#define FLOW_REQ 0 +#define FLOW_REPLY 1 +#define FLOW_UPDATE 2 +#define MSGBUFSZ 2048 struct fa_msg { uint64_t s_addr; @@ -59,6 +61,7 @@ struct fa_msg { uint32_t s_eid; uint8_t code; int8_t response; + uint16_t ece; /* QoS parameters from spec, aligned */ uint8_t availability; uint8_t in_order; @@ -75,10 +78,16 @@ struct cmd { struct shm_du_buff * sdb; }; +struct fa_flow { + int r_eid; /* remote endpoint id */ + uint64_t r_addr; /* remote address */ + void * ctx; /* congestion avoidance context */ +}; + struct { pthread_rwlock_t flows_lock; - int r_eid[PROG_MAX_FLOWS]; - uint64_t r_addr[PROG_MAX_FLOWS]; + struct fa_flow flows[PROG_MAX_FLOWS]; + int fd; struct list_head cmds; @@ -93,22 +102,56 @@ static void packet_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) { - pthread_rwlock_rdlock(&fa.flows_lock); + struct fa_flow * flow; + uint64_t r_addr; + uint32_t r_eid; + ca_wnd_t wnd; + size_t len; - if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) { - pthread_rwlock_unlock(&fa.flows_lock); + flow = &fa.flows[fd]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + + wnd = ca_ctx_update_snd(flow->ctx, len); + + r_addr = flow->r_addr; + r_eid = flow->r_eid; + + pthread_rwlock_unlock(&fa.flows_lock); + + ca_wnd_wait(wnd); + + if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); log_warn("Failed to forward packet."); return; } +} - pthread_rwlock_unlock(&fa.flows_lock); +static int fa_flow_init(struct fa_flow * flow) +{ + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->r_addr = INVALID_ADDR; + + flow->ctx = ca_ctx_create(); + if (flow->ctx == NULL) + return -1; + + return 0; } -static void destroy_conn(int fd) +static void fa_flow_fini(struct fa_flow * flow) { - fa.r_eid[fd] = -1; - fa.r_addr[fd] = INVALID_ADDR; + ca_ctx_destroy(flow->ctx); + + memset(flow, 0, sizeof(*flow)); + + flow->r_eid = -1; + flow->r_addr = INVALID_ADDR; } static void fa_post_packet(void * comp, @@ -145,14 +188,15 @@ static void * fa_handle_packet(void * o) (void) o; while (true) { - struct timespec abstime; - int fd; - uint8_t buf[MSGBUFSZ]; - struct fa_msg * msg; - qosspec_t qs; - struct cmd * cmd; - size_t len; - size_t msg_len; + struct timespec abstime; + int fd; + uint8_t buf[MSGBUFSZ]; + struct fa_msg * msg; + qosspec_t qs; + struct cmd * cmd; + size_t len; + size_t msg_len; + struct fa_flow * flow; pthread_mutex_lock(&fa.mtx); @@ -232,10 +276,14 @@ static void * fa_handle_packet(void * o) continue; } + flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[fd] = ntoh32(msg->s_eid); - fa.r_addr[fd] = ntoh64(msg->s_addr); + fa_flow_init(flow); + + flow->r_eid = ntoh32(msg->s_eid); + flow->r_addr = ntoh64(msg->s_addr); pthread_rwlock_unlock(&fa.flows_lock); @@ -248,19 +296,32 @@ static void * fa_handle_packet(void * o) case FLOW_REPLY: assert(len >= sizeof(*msg)); + flow = &fa.flows[ntoh32(msg->r_eid)]; + pthread_rwlock_wrlock(&fa.flows_lock); - fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid); + flow->r_eid = ntoh32(msg->s_eid); + + if (msg->response < 0) + fa_flow_fini(flow); + else + psched_add(fa.psched, ntoh32(msg->r_eid)); + + pthread_rwlock_unlock(&fa.flows_lock); ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response, buf + sizeof(*msg), len - sizeof(*msg)); + break; + case FLOW_UPDATE: + assert(len >= sizeof(*msg)); - if (msg->response < 0) - destroy_conn(ntoh32(msg->r_eid)); - else - psched_add(fa.psched, ntoh32(msg->r_eid)); + flow = &fa.flows[ntoh32(msg->r_eid)]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece)); pthread_rwlock_unlock(&fa.flows_lock); @@ -275,10 +336,6 @@ static void * fa_handle_packet(void * o) int fa_init(void) { pthread_condattr_t cattr; - int i; - - for (i = 0; i < PROG_MAX_FLOWS; ++i) - destroy_conn(i); if (pthread_rwlock_init(&fa.flows_lock, NULL)) goto fail_rwlock; @@ -383,9 +440,10 @@ int fa_alloc(int fd, size_t dlen) { struct fa_msg * msg; - uint64_t addr; struct shm_du_buff * sdb; - qoscube_t qc; + struct fa_flow * flow; + uint64_t addr; + qoscube_t qc = QOS_CUBE_BE; size_t len; addr = dir_query(dst); @@ -397,7 +455,9 @@ int fa_alloc(int fd, if (ipcp_sdb_reserve(&sdb, len + dlen)) return -1; - msg = (struct fa_msg *) shm_du_buff_head(sdb); + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + msg->code = FLOW_REQ; msg->s_eid = hton32(fd); msg->s_addr = hton64(ipcpi.dt_addr); @@ -413,17 +473,17 @@ int fa_alloc(int fd, memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(shm_du_buff_head(sdb) + len, data, dlen); - qc = qos_spec_to_cube(qs); - if (dt_write_packet(addr, qc, fa.fd, sdb)) { ipcp_sdb_release(sdb); return -1; } + flow = &fa.flows[fd]; + pthread_rwlock_wrlock(&fa.flows_lock); - assert(fa.r_eid[fd] == -1); - fa.r_addr[fd] = addr; + fa_flow_init(flow); + flow->r_addr = addr; pthread_rwlock_unlock(&fa.flows_lock); @@ -439,10 +499,13 @@ int fa_alloc_resp(int fd, struct timespec abstime; struct fa_msg * msg; struct shm_du_buff * sdb; - qoscube_t qc; + struct fa_flow * flow; + qoscube_t qc = QOS_CUBE_BE; clock_gettime(PTHREAD_COND_CLOCK, &abstime); + flow = &fa.flows[fd]; + pthread_mutex_lock(&ipcpi.alloc_lock); while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) { @@ -463,33 +526,31 @@ int fa_alloc_resp(int fd, pthread_mutex_unlock(&ipcpi.alloc_lock); if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) { - destroy_conn(fd); + fa_flow_fini(flow); return -1; } + msg = (struct fa_msg *) shm_du_buff_head(sdb); + memset(msg, 0, sizeof(*msg)); + pthread_rwlock_wrlock(&fa.flows_lock); - msg = (struct fa_msg *) shm_du_buff_head(sdb); msg->code = FLOW_REPLY; - msg->r_eid = hton32(fa.r_eid[fd]); + msg->r_eid = hton32(flow->r_eid); msg->s_eid = hton32(fd); msg->response = response; memcpy(msg + 1, data, len); if (response < 0) { - destroy_conn(fd); + fa_flow_fini(flow); ipcp_sdb_release(sdb); } else { psched_add(fa.psched, fd); } - ipcp_flow_get_qoscube(fd, &qc); - - assert(qc >= 0 && qc < QOS_CUBE_MAX); - - if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) { - destroy_conn(fd); + if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { + fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); return -1; @@ -505,11 +566,11 @@ int fa_dealloc(int fd) if (ipcp_flow_fini(fd) < 0) return 0; - pthread_rwlock_wrlock(&fa.flows_lock); - psched_del(fa.psched, fd); - destroy_conn(fd); + pthread_rwlock_wrlock(&fa.flows_lock); + + fa_flow_fini(&fa.flows[fd]); pthread_rwlock_unlock(&fa.flows_lock); @@ -517,3 +578,60 @@ int fa_dealloc(int fd) return 0; } + +static int fa_update_remote(int fd, + uint16_t ece) +{ + struct fa_msg * msg; + struct shm_du_buff * sdb; + qoscube_t qc = QOS_CUBE_BE; + struct fa_flow * flow; + + if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { + return -1; + } + + msg = (struct fa_msg *) shm_du_buff_head(sdb); + + memset(msg, 0, sizeof(*msg)); + + flow = &fa.flows[fd]; + + pthread_rwlock_rdlock(&fa.flows_lock); + + msg->code = FLOW_UPDATE; + msg->r_eid = hton32(flow->r_eid); + msg->ece = hton16(ece); + + if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { + pthread_rwlock_unlock(&fa.flows_lock); + ipcp_sdb_release(sdb); + return -1; + } + + pthread_rwlock_unlock(&fa.flows_lock); + + + return 0; +} + +void fa_ecn_update(int eid, + uint8_t ecn, + size_t len) +{ + struct fa_flow * flow; + bool update; + uint16_t ece; + + flow = &fa.flows[eid]; + + pthread_rwlock_wrlock(&fa.flows_lock); + + update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece); + + pthread_rwlock_unlock(&fa.flows_lock); + + if (update) + fa_update_remote(eid, ece); + +} diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index 12a10a0c..daba2a51 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -47,4 +47,8 @@ int fa_alloc_resp(int fd, int fa_dealloc(int fd); +void fa_ecn_update(int eid, + uint8_t ecn, + size_t len); + #endif /* OUROBOROS_IPCPD_UNICAST_FA_H */ diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c index 0ab37d25..1b2cc14e 100644 --- a/src/ipcpd/unicast/main.c +++ b/src/ipcpd/unicast/main.c @@ -39,6 +39,7 @@ #include <ouroboros/time_utils.h> #include "addr_auth.h" +#include "ca.h" #include "connmgr.h" #include "dir.h" #include "dt.h" @@ -83,6 +84,11 @@ static int initialize_components(const struct ipcp_config * conf) log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr); + if (ca_init(conf->cong_avoid)) { + log_err("Failed to initialize congestion avoidance."); + goto fail_ca; + } + if (dt_init(conf->routing_type, conf->addr_size, conf->eid_size, @@ -110,6 +116,8 @@ static int initialize_components(const struct ipcp_config * conf) fail_fa: dt_fini(); fail_dt: + ca_fini(); + fail_ca: addr_auth_fini(); fail_addr_auth: free(ipcpi.layer_name); @@ -125,6 +133,8 @@ static void finalize_components(void) dt_fini(); + ca_fini(); + addr_auth_fini(); free(ipcpi.layer_name); diff --git a/src/ipcpd/unicast/pol-ca-ops.h b/src/ipcpd/unicast/pol-ca-ops.h new file mode 100644 index 00000000..3cb8a9d2 --- /dev/null +++ b/src/ipcpd/unicast/pol-ca-ops.h @@ -0,0 +1,50 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Congestion avoidance policy ops + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H +#define OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H + +#include "ca.h" + +struct pol_ca_ops { + void * (* ctx_create)(void); + + void (* ctx_destroy)(void * ctx); + + ca_wnd_t (* ctx_update_snd)(void * ctx, + size_t len); + + bool (* ctx_update_rcv)(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + + void (* ctx_update_ece)(void * ctx, + uint16_t ece); + + void (* wnd_wait)(ca_wnd_t wnd); + + uint8_t (* calc_ecn)(int fd, + size_t len); +}; + +#endif /* OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H */ diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.c b/src/ipcpd/unicast/pol/ca-mb-ecn.c new file mode 100644 index 00000000..2de8f8e7 --- /dev/null +++ b/src/ipcpd/unicast/pol/ca-mb-ecn.c @@ -0,0 +1,216 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Multi-bit ECN Congestion Avoidance + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/time_utils.h> + +#include "ca-mb-ecn.h" + +#include <stdlib.h> +#include <string.h> + +/* congestion avoidance constants */ +#define CA_SHFT 5 +#define CA_WND (1 << CA_SHFT) +#define CA_UPD (1 << (CA_SHFT - 3)) +#define CA_SLOT 18 +#define CA_AI 20000 +#define ECN_Q_SHFT 5 +#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec) + +struct mb_ecn_ctx { + uint16_t rx_ece; /* level of congestion (upstream) */ + size_t rx_ctr; /* receiver side packet counter */ + + uint16_t tx_ece; /* level of congestion (downstream) */ + size_t tx_ctr; /* sender side packet counter */ + size_t tx_aps; /* average packet size */ + time_t tx_wnd; /* tgt time to send packets (ns) */ + bool tx_cav; /* Congestion avoidance */ + size_t tx_slot; + + struct timespec t_sent; /* last sent packet */ +}; + +struct pol_ca_ops mb_ecn_ca_ops = { + .ctx_create = mb_ecn_ctx_create, + .ctx_destroy = mb_ecn_ctx_destroy, + .ctx_update_snd = mb_ecn_ctx_update_snd, + .ctx_update_rcv = mb_ecn_ctx_update_rcv, + .ctx_update_ece = mb_ecn_ctx_update_ece, + .wnd_wait = mb_ecn_wnd_wait, + .calc_ecn = mb_ecn_calc_ecn +}; + +void * mb_ecn_ctx_create(void) +{ + + struct mb_ecn_ctx * ctx; + + ctx = malloc(sizeof(*ctx)); + if (ctx == NULL) + return NULL; + + memset(ctx, 0, sizeof(*ctx)); + + return (void *) ctx; +} + +void mb_ecn_ctx_destroy(void * ctx) +{ + free(ctx); +} + +ca_wnd_t mb_ecn_ctx_update_snd(void * _ctx, + size_t len) +{ + struct timespec now; + size_t slot; + time_t gap; + ca_wnd_t wnd; + + struct mb_ecn_ctx * ctx = _ctx; + + clock_gettime(PTHREAD_COND_CLOCK, &now); + + if (ctx->tx_wnd == 0) { /* 10 ms initial window estimate */ + ctx->tx_wnd = 10 * MILLION; + gap = ctx->tx_wnd >> CA_SHFT; + ctx->tx_aps = len >> CA_SHFT; + ctx->tx_slot = ts_to_ns(now) >> CA_SLOT; + } else { + gap = ts_diff_ns(&ctx->t_sent, &now); + ctx->tx_aps -= ctx->tx_aps >> CA_SHFT; + ctx->tx_aps += len; + } + + ctx->t_sent = now; + + slot = ts_to_ns(now) >> CA_SLOT; + + ctx->tx_ctr++; + + if (slot - ctx->tx_slot > 0) { + ctx->tx_slot = slot; + + if (ctx->tx_ctr > CA_WND) + ctx->tx_ece = 0; + + /* Slow start */ + if (!ctx->tx_cav) { + ctx->tx_wnd >>= 1; + /* Multiplicative Decrease */ + } else if (ctx->tx_ece) { /* MD */ + ctx->tx_wnd += (ctx->tx_wnd * ctx->tx_ece) + >> (CA_SHFT + 8); + /* Additive Increase */ + } else { + size_t bw = ctx->tx_aps * BILLION / ctx->tx_wnd; + bw += CA_AI; + ctx->tx_wnd = ctx->tx_aps * BILLION / bw; + } + } + + wnd.wait = (ctx->tx_wnd >> CA_SHFT) - gap; + + return wnd; +} + +void mb_ecn_wnd_wait(ca_wnd_t wnd) +{ + if (wnd.wait > 0) { + struct timespec s = {0, 0}; + if (wnd.wait > BILLION) /* Don't care throttling < 1pps */ + s.tv_sec = 1; + else + s.tv_nsec = wnd.wait; + + nanosleep(&s, NULL); + } +} + +bool mb_ecn_ctx_update_rcv(void * _ctx, + size_t len, + uint8_t ecn, + uint16_t * ece) +{ + struct mb_ecn_ctx* ctx = _ctx; + bool update; + + (void) len; + + if ((ctx->rx_ece | ecn) == 0) + return false; + + if (ecn == 0) { + /* end of congestion */ + ctx->rx_ece >>= 2; + update = ctx->rx_ece == 0; + } else { + if (ctx->rx_ece == 0) { + /* start of congestion */ + ctx->rx_ece = ecn; + ctx->rx_ctr = 0; + update = true; + } else { + /* congestion update */ + ctx->rx_ece -= ctx->rx_ece >> CA_SHFT; + ctx->rx_ece += ecn; + update = (ctx->rx_ctr++ & (CA_UPD - 1)) == true; + } + } + + *ece = ctx->rx_ece; + + return update; +} + + +void mb_ecn_ctx_update_ece(void * _ctx, + uint16_t ece) +{ + struct mb_ecn_ctx* ctx = _ctx; + + ctx->tx_ece = ece; + ctx->tx_ctr = 0; + ctx->tx_cav = true; +} + +uint8_t mb_ecn_calc_ecn(int fd, + size_t len) +{ + size_t q; + + (void) len; + + q = ipcp_flow_queued(fd); + + return (uint8_t) (q >> ECN_Q_SHFT); +} diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.h b/src/ipcpd/unicast/pol/ca-mb-ecn.h new file mode 100644 index 00000000..456b9b13 --- /dev/null +++ b/src/ipcpd/unicast/pol/ca-mb-ecn.h @@ -0,0 +1,50 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Multi-bit ECN Congestion Avoidance + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H +#define OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H + +#include "pol-ca-ops.h" + +void * mb_ecn_ctx_create(void); + +void mb_ecn_ctx_destroy(void * ctx); + +ca_wnd_t mb_ecn_ctx_update_snd(void * ctx, + size_t len); + +bool mb_ecn_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + +void mb_ecn_ctx_update_ece(void * ctx, + uint16_t ece); + +void mb_ecn_wnd_wait(ca_wnd_t wnd); + +uint8_t mb_ecn_calc_ecn(int fd, + size_t len); + +extern struct pol_ca_ops mb_ecn_ca_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H */ diff --git a/src/ipcpd/unicast/pol/ca-nop.c b/src/ipcpd/unicast/pol/ca-nop.c new file mode 100644 index 00000000..d0d89a2e --- /dev/null +++ b/src/ipcpd/unicast/pol/ca-nop.c @@ -0,0 +1,93 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Dummy Congestion Avoidance + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#include "ca-nop.h" + +#include <string.h> + +struct pol_ca_ops nop_ca_ops = { + .ctx_create = nop_ctx_create, + .ctx_destroy = nop_ctx_destroy, + .ctx_update_snd = nop_ctx_update_snd, + .ctx_update_rcv = nop_ctx_update_rcv, + .ctx_update_ece = nop_ctx_update_ece, + .wnd_wait = nop_wnd_wait, + .calc_ecn = nop_calc_ecn +}; + +void * nop_ctx_create(void) +{ + return (void *) 1; +} + +void nop_ctx_destroy(void * ctx) +{ + (void) ctx; +} + +ca_wnd_t nop_ctx_update_snd(void * ctx, + size_t len) +{ + ca_wnd_t wnd; + + (void) ctx; + (void) len; + + memset(&wnd, 0, sizeof(wnd)); + + return wnd; +} + +void nop_wnd_wait(ca_wnd_t wnd) +{ + (void) wnd; +} + +bool nop_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece) +{ + (void) ctx; + (void) len; + (void) ecn; + (void) ece; + + return false; +} + +void nop_ctx_update_ece(void * ctx, + uint16_t ece) +{ + (void) ctx; + (void) ece; +} + + +uint8_t nop_calc_ecn(int fd, + size_t len) +{ + (void) fd; + (void) len; + + return 0; +} diff --git a/src/ipcpd/unicast/pol/ca-nop.h b/src/ipcpd/unicast/pol/ca-nop.h new file mode 100644 index 00000000..baf649d8 --- /dev/null +++ b/src/ipcpd/unicast/pol/ca-nop.h @@ -0,0 +1,50 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * Dummy Congestion Avoidance + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IPCPD_UNICAST_CA_NOP_H +#define OUROBOROS_IPCPD_UNICAST_CA_NOP_H + +#include "pol-ca-ops.h" + +void * nop_ctx_create(void); + +void nop_ctx_destroy(void * ctx); + +ca_wnd_t nop_ctx_update_snd(void * ctx, + size_t len); + +bool nop_ctx_update_rcv(void * ctx, + size_t len, + uint8_t ecn, + uint16_t * ece); + +void nop_ctx_update_ece(void * ctx, + uint16_t ece); + +void nop_wnd_wait(ca_wnd_t wnd); + +uint8_t nop_calc_ecn(int fd, + size_t len); + +extern struct pol_ca_ops nop_ca_ops; + +#endif /* OUROBOROS_IPCPD_UNICAST_CA_NOP_H */ diff --git a/src/ipcpd/unicast/pol/link_state.c b/src/ipcpd/unicast/pol/link_state.c index d9482876..ca8a7c50 100644 --- a/src/ipcpd/unicast/pol/link_state.c +++ b/src/ipcpd/unicast/pol/link_state.c @@ -812,8 +812,12 @@ static void handle_event(void * self, switch (event) { case NOTIFY_DT_CONN_ADD: pthread_rwlock_rdlock(&ls.db_lock); + + pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock, + (void *) &ls.db_lock); + send_lsm(ipcpi.dt_addr, c->conn_info.addr, 0); - pthread_rwlock_unlock(&ls.db_lock); + pthread_cleanup_pop(true); if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT)) log_dbg("Failed to add neighbor to LSDB."); |