summaryrefslogtreecommitdiff
path: root/src/ipcpd/unicast/dt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd/unicast/dt.c')
-rw-r--r--src/ipcpd/unicast/dt.c913
1 files changed, 913 insertions, 0 deletions
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
new file mode 100644
index 00000000..2fd3c060
--- /dev/null
+++ b/src/ipcpd/unicast/dt.c
@@ -0,0 +1,913 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2019
+ *
+ * Data Transfer Component
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+
+#define DT "dt"
+#define OUROBOROS_PREFIX DT
+
+#include <ouroboros/bitmap.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/logs.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/notifier.h>
+#include <ouroboros/rib.h>
+#ifdef IPCP_FLOW_STATS
+#include <ouroboros/fccntl.h>
+#endif
+
+#include "connmgr.h"
+#include "ipcp.h"
+#include "dt.h"
+#include "pff.h"
+#include "routing.h"
+#include "psched.h"
+#include "comp.h"
+#include "fa.h"
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <string.h>
+#include <inttypes.h>
+#include <assert.h>
+
+#define QOS_BLOCK_LEN 672
+#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX)
+
+#ifndef CLOCK_REALTIME_COARSE
+#define CLOCK_REALTIME_COARSE CLOCK_REALTIME
+#endif
+
+struct comp_info {
+ void (* post_packet)(void * comp, struct shm_du_buff * sdb);
+ void * comp;
+ char * name;
+};
+
+/* Fixed field lengths */
+#define TTL_LEN 1
+#define QOS_LEN 1
+#define ECN_LEN 1
+
+struct dt_pci {
+ uint64_t dst_addr;
+ qoscube_t qc;
+ uint8_t ttl;
+ uint8_t ecn;
+ uint32_t eid;
+};
+
+struct {
+ uint8_t addr_size;
+ uint8_t eid_size;
+ size_t head_size;
+
+ /* Offsets */
+ size_t qc_o;
+ size_t ttl_o;
+ size_t ecn_o;
+ size_t eid_o;
+
+ /* Initial TTL value */
+ uint8_t max_ttl;
+} dt_pci_info;
+
+static int dt_pci_ser(struct shm_du_buff * sdb,
+ struct dt_pci * dt_pci)
+{
+ uint8_t * head;
+ uint8_t ttl = dt_pci_info.max_ttl;
+
+ assert(sdb);
+ assert(dt_pci);
+
+ head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size);
+ if (head == NULL)
+ return -EPERM;
+
+ /* FIXME: Add check and operations for Big Endian machines. */
+ memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size);
+ memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN);
+ memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN);
+ memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN);
+ memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size);
+
+ return 0;
+}
+
+static void dt_pci_des(struct shm_du_buff * sdb,
+ struct dt_pci * dt_pci)
+{
+ uint8_t * head;
+
+ assert(sdb);
+ assert(dt_pci);
+
+ head = shm_du_buff_head(sdb);
+
+ /* Decrease TTL */
+ --*(head + dt_pci_info.ttl_o);
+
+ /* FIXME: Add check and operations for Big Endian machines. */
+ memcpy(&dt_pci->dst_addr, head, dt_pci_info.addr_size);
+ memcpy(&dt_pci->qc, head + dt_pci_info.qc_o, QOS_LEN);
+ memcpy(&dt_pci->ttl, head + dt_pci_info.ttl_o, TTL_LEN);
+ memcpy(&dt_pci->ecn, head + dt_pci_info.ecn_o, ECN_LEN);
+ memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size);
+}
+
+static void dt_pci_shrink(struct shm_du_buff * sdb)
+{
+ assert(sdb);
+
+ shm_du_buff_head_release(sdb, dt_pci_info.head_size);
+}
+
+struct {
+ struct psched * psched;
+
+ struct pff * pff[QOS_CUBE_MAX];
+ struct routing_i * routing[QOS_CUBE_MAX];
+#ifdef IPCP_FLOW_STATS
+ struct {
+ time_t stamp;
+ uint64_t addr;
+ size_t snd_pkt[QOS_CUBE_MAX];
+ size_t rcv_pkt[QOS_CUBE_MAX];
+ size_t snd_bytes[QOS_CUBE_MAX];
+ size_t rcv_bytes[QOS_CUBE_MAX];
+ size_t lcl_r_pkt[QOS_CUBE_MAX];
+ size_t lcl_r_bytes[QOS_CUBE_MAX];
+ size_t lcl_w_pkt[QOS_CUBE_MAX];
+ size_t lcl_w_bytes[QOS_CUBE_MAX];
+ size_t r_drp_pkt[QOS_CUBE_MAX];
+ size_t r_drp_bytes[QOS_CUBE_MAX];
+ size_t w_drp_pkt[QOS_CUBE_MAX];
+ size_t w_drp_bytes[QOS_CUBE_MAX];
+ size_t f_nhp_pkt[QOS_CUBE_MAX];
+ size_t f_nhp_bytes[QOS_CUBE_MAX];
+ pthread_mutex_t lock;
+ } stat[PROG_MAX_FLOWS];
+
+ size_t n_flows;
+#endif
+ struct bmp * res_fds;
+ struct comp_info comps[PROG_RES_FDS];
+ pthread_rwlock_t lock;
+
+ pthread_t listener;
+} dt;
+
+static int dt_stat_read(const char * path,
+ char * buf,
+ size_t len)
+{
+#ifdef IPCP_FLOW_STATS
+ int fd;
+ int i;
+ char str[QOS_BLOCK_LEN + 1];
+ char addrstr[20];
+ char tmstr[20];
+ size_t rxqlen = 0;
+ size_t txqlen = 0;
+ struct tm * tm;
+
+ /* NOTE: we may need stronger checks. */
+ fd = atoi(path);
+
+ if (len < STAT_FILE_LEN)
+ return 0;
+
+ buf[0] = '\0';
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt.stat[fd].stamp == 0) {
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ return 0;
+ }
+
+ if (dt.stat[fd].addr == ipcpi.dt_addr)
+ sprintf(addrstr, "%s", dt.comps[fd].name);
+ else
+ sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr);
+
+ tm = localtime(&dt.stat[fd].stamp);
+ strftime(tmstr, sizeof(tmstr), "%F %T", tm);
+
+ if (fd >= PROG_RES_FDS) {
+ fccntl(fd, FLOWGRXQLEN, &rxqlen);
+ fccntl(fd, FLOWGTXQLEN, &txqlen);
+ }
+
+ sprintf(buf,
+ "Flow established at: %20s\n"
+ "Endpoint address: %20s\n"
+ "Queued packets (rx): %20zu\n"
+ "Queued packets (tx): %20zu\n\n",
+ tmstr, addrstr, rxqlen, txqlen);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ sprintf(str,
+ "Qos cube %3d:\n"
+ " sent (packets): %20zu\n"
+ " sent (bytes): %20zu\n"
+ " rcvd (packets): %20zu\n"
+ " rcvd (bytes): %20zu\n"
+ " local sent (packets): %20zu\n"
+ " local sent (bytes): %20zu\n"
+ " local rcvd (packets): %20zu\n"
+ " local rcvd (bytes): %20zu\n"
+ " dropped ttl (packets): %20zu\n"
+ " dropped ttl (bytes): %20zu\n"
+ " failed writes (packets): %20zu\n"
+ " failed writes (bytes): %20zu\n"
+ " failed nhop (packets): %20zu\n"
+ " failed nhop (bytes): %20zu\n",
+ i,
+ dt.stat[fd].snd_pkt[i],
+ dt.stat[fd].snd_bytes[i],
+ dt.stat[fd].rcv_pkt[i],
+ dt.stat[fd].rcv_bytes[i],
+ dt.stat[fd].lcl_w_pkt[i],
+ dt.stat[fd].lcl_w_bytes[i],
+ dt.stat[fd].lcl_r_pkt[i],
+ dt.stat[fd].lcl_r_bytes[i],
+ dt.stat[fd].r_drp_pkt[i],
+ dt.stat[fd].r_drp_bytes[i],
+ dt.stat[fd].w_drp_pkt[i],
+ dt.stat[fd].w_drp_bytes[i],
+ dt.stat[fd].f_nhp_pkt[i],
+ dt.stat[fd].f_nhp_bytes[i]
+ );
+ strcat(buf, str);
+ }
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ return STAT_FILE_LEN;
+#else
+ (void) path;
+ (void) buf;
+ (void) len;
+ return 0;
+#endif
+}
+
+static int dt_stat_readdir(char *** buf)
+{
+#ifdef IPCP_FLOW_STATS
+ char entry[RIB_PATH_LEN + 1];
+ size_t i;
+ int idx = 0;
+
+ pthread_rwlock_rdlock(&dt.lock);
+
+ if (dt.n_flows < 1) {
+ pthread_rwlock_unlock(&dt.lock);
+ return 0;
+ }
+
+ *buf = malloc(sizeof(**buf) * dt.n_flows);
+ if (*buf == NULL) {
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
+ }
+
+ for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ pthread_mutex_lock(&dt.stat[i].lock);
+
+ if (dt.stat[i].stamp == 0) {
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ /* Optimization: skip unused res_fds. */
+ if (i < PROG_RES_FDS)
+ i = PROG_RES_FDS;
+ continue;
+ }
+
+ sprintf(entry, "%zu", i);
+
+ (*buf)[idx] = malloc(strlen(entry) + 1);
+ if ((*buf)[idx] == NULL) {
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(buf);
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
+ }
+
+ strcpy((*buf)[idx++], entry);
+
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ }
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ assert((size_t) idx == dt.n_flows);
+
+ return idx;
+#else
+ (void) buf;
+ return 0;
+#endif
+}
+
+static int dt_stat_getattr(const char * path,
+ struct stat * st)
+{
+#ifdef IPCP_FLOW_STATS
+ int fd;
+
+ fd = atoi(path);
+
+ st->st_mode = S_IFREG | 0755;
+ st->st_nlink = 1;
+ st->st_uid = getuid();
+ st->st_gid = getgid();
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt.stat[fd].stamp != -1) {
+ st->st_size = STAT_FILE_LEN;
+ st->st_mtime = dt.stat[fd].stamp;
+ } else {
+ st->st_size = 0;
+ st->st_mtime = 0;
+ }
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#else
+ (void) path;
+ (void) st;
+#endif
+ return 0;
+}
+
+static struct rib_ops r_ops = {
+ .read = dt_stat_read,
+ .readdir = dt_stat_readdir,
+ .getattr = dt_stat_getattr
+};
+
+#ifdef IPCP_FLOW_STATS
+
+static void stat_used(int fd,
+ uint64_t addr)
+{
+ struct timespec now;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ memset(&dt.stat[fd], 0, sizeof(dt.stat[fd]));
+
+ dt.stat[fd].stamp = (addr != INVALID_ADDR) ? now.tv_sec : 0;
+ dt.stat[fd].addr = addr;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ (addr != INVALID_ADDR) ? ++dt.n_flows : --dt.n_flows;
+
+ pthread_rwlock_unlock(&dt.lock);
+}
+#endif
+
+static void handle_event(void * self,
+ int event,
+ const void * o)
+{
+ struct conn * c;
+
+ (void) self;
+
+ c = (struct conn *) o;
+
+ switch (event) {
+ case NOTIFY_DT_CONN_ADD:
+#ifdef IPCP_FLOW_STATS
+ stat_used(c->flow_info.fd, c->conn_info.addr);
+#endif
+ psched_add(dt.psched, c->flow_info.fd);
+ log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd);
+ break;
+ case NOTIFY_DT_CONN_DEL:
+#ifdef IPCP_FLOW_STATS
+ stat_used(c->flow_info.fd, INVALID_ADDR);
+#endif
+ psched_del(dt.psched, c->flow_info.fd);
+ log_dbg("Removed fd %d from "
+ "packet scheduler.", c->flow_info.fd);
+ break;
+ default:
+ break;
+ }
+}
+
+static void packet_handler(int fd,
+ qoscube_t qc,
+ struct shm_du_buff * sdb)
+{
+ struct dt_pci dt_pci;
+ int ret;
+ int ofd;
+#ifdef IPCP_FLOW_STATS
+ size_t len;
+#else
+ (void) fd;
+#endif
+
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
+ memset(&dt_pci, 0, sizeof(dt_pci));
+ dt_pci_des(sdb, &dt_pci);
+ if (dt_pci.dst_addr != ipcpi.dt_addr) {
+ if (dt_pci.ttl == 0) {
+ log_dbg("TTL was zero.");
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].r_drp_pkt[qc];
+ dt.stat[fd].r_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
+ return;
+ }
+
+ /* FIXME: Use qoscube from PCI instead of incoming flow. */
+ ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
+ if (ofd < 0) {
+ log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].f_nhp_pkt[qc];
+ dt.stat[fd].f_nhp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
+ return;
+ }
+
+ ret = ipcp_flow_write(ofd, sdb);
+ if (ret < 0) {
+ log_dbg("Failed to write packet to fd %d.", ofd);
+ if (ret == -EFLOWDOWN)
+ notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd);
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[ofd].lock);
+
+ ++dt.stat[ofd].w_drp_pkt[qc];
+ dt.stat[ofd].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[ofd].lock);
+#endif
+ return;
+ }
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[ofd].lock);
+
+ ++dt.stat[ofd].snd_pkt[qc];
+ dt.stat[ofd].snd_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[ofd].lock);
+#endif
+ } else {
+ dt_pci_shrink(sdb);
+ if (dt_pci.eid >= PROG_RES_FDS) {
+ if (ipcp_flow_write(dt_pci.eid, sdb)) {
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+ ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
+ dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
+
+ }
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].rcv_pkt[qc];
+ dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
+ ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
+ return;
+ }
+
+ if (dt.comps[dt_pci.eid].post_packet == NULL) {
+ log_err("No registered component on eid %d.",
+ dt_pci.eid);
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
+ dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
+ return;
+ }
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].lcl_r_pkt[qc];
+ dt.stat[fd].lcl_r_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].snd_pkt[qc];
+ dt.stat[dt_pci.eid].snd_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
+ dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp,
+ sdb);
+ }
+}
+
+static void * dt_conn_handle(void * o)
+{
+ struct conn conn;
+
+ (void) o;
+
+ while (true) {
+ if (connmgr_wait(COMPID_DT, &conn)) {
+ log_err("Failed to get next DT connection.");
+ continue;
+ }
+
+ /* NOTE: connection acceptance policy could be here. */
+
+ notifier_event(NOTIFY_DT_CONN_ADD, &conn);
+ }
+
+ return 0;
+}
+
+int dt_init(enum pol_routing pr,
+ enum pol_pff pp,
+ uint8_t addr_size,
+ uint8_t eid_size,
+ uint8_t max_ttl)
+{
+ int i;
+ int j;
+ char dtstr[256];
+ struct conn_info info;
+
+ memset(&info, 0, sizeof(info));
+
+ strcpy(info.comp_name, DT_COMP);
+ strcpy(info.protocol, DT_PROTO);
+ info.pref_version = 1;
+ info.pref_syntax = PROTO_FIXED;
+ info.addr = ipcpi.dt_addr;
+
+ dt_pci_info.addr_size = addr_size;
+ dt_pci_info.eid_size = eid_size;
+ dt_pci_info.max_ttl = max_ttl;
+
+ dt_pci_info.qc_o = dt_pci_info.addr_size;
+ dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN;
+ dt_pci_info.ecn_o = dt_pci_info.ttl_o + TTL_LEN;
+ dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN;
+ dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size;
+
+ if (notifier_reg(handle_event, NULL)) {
+ log_err("Failed to register with notifier.");
+ goto fail_notifier_reg;
+ }
+
+ if (connmgr_comp_init(COMPID_DT, &info)) {
+ log_err("Failed to register with connmgr.");
+ goto fail_connmgr_comp_init;
+ }
+
+ if (routing_init(pr)) {
+ log_err("Failed to init routing.");
+ goto fail_routing;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.pff[i] = pff_create(pp);
+ if (dt.pff[i] == NULL) {
+ log_err("Failed to create a PFF.");
+ for (j = 0; j < i; ++j)
+ pff_destroy(dt.pff[j]);
+ goto fail_pff;
+ }
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ dt.routing[i] = routing_i_create(dt.pff[i]);
+ if (dt.routing[i] == NULL) {
+ for (j = 0; j < i; ++j)
+ routing_i_destroy(dt.routing[j]);
+ goto fail_routing_i;
+ }
+ }
+
+ if (pthread_rwlock_init(&dt.lock, NULL)) {
+ log_err("Failed to init rwlock.");
+ goto fail_rwlock_init;
+ }
+
+ dt.res_fds = bmp_create(PROG_RES_FDS, 0);
+ if (dt.res_fds == NULL)
+ goto fail_res_fds;
+#ifdef IPCP_FLOW_STATS
+ memset(dt.stat, 0, sizeof(dt.stat));
+
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ if (pthread_mutex_init(&dt.stat[i].lock, NULL)) {
+ for (j = 0; j < i; ++j)
+ pthread_mutex_destroy(&dt.stat[j].lock);
+ goto fail_stat_lock;
+ }
+
+ dt.n_flows = 0;
+#endif
+ sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr);
+ if (rib_reg(dtstr, &r_ops))
+ goto fail_rib_reg;
+
+ return 0;
+
+ fail_rib_reg:
+#ifdef IPCP_FLOW_STATS
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&dt.stat[i].lock);
+ fail_stat_lock:
+#endif
+ bmp_destroy(dt.res_fds);
+ fail_res_fds:
+ pthread_rwlock_destroy(&dt.lock);
+ fail_rwlock_init:
+ for (j = 0; j < QOS_CUBE_MAX; ++j)
+ routing_i_destroy(dt.routing[j]);
+ fail_routing_i:
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(dt.pff[i]);
+ fail_pff:
+ routing_fini();
+ fail_routing:
+ connmgr_comp_fini(COMPID_DT);
+ fail_connmgr_comp_init:
+ notifier_unreg(&handle_event);
+ fail_notifier_reg:
+ return -1;
+}
+
+void dt_fini(void)
+{
+ int i;
+
+ rib_unreg(DT);
+#ifdef IPCP_FLOW_STATS
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&dt.stat[i].lock);
+#endif
+ bmp_destroy(dt.res_fds);
+
+ pthread_rwlock_destroy(&dt.lock);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ routing_i_destroy(dt.routing[i]);
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i)
+ pff_destroy(dt.pff[i]);
+
+ routing_fini();
+
+ connmgr_comp_fini(COMPID_DT);
+
+ notifier_unreg(&handle_event);
+}
+
+int dt_start(void)
+{
+ dt.psched = psched_create(packet_handler);
+ if (dt.psched == NULL) {
+ log_err("Failed to create N-1 packet scheduler.");
+ return -1;
+ }
+
+ if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) {
+ log_err("Failed to create listener thread.");
+ psched_destroy(dt.psched);
+ return -1;
+ }
+
+ return 0;
+}
+
+void dt_stop(void)
+{
+ pthread_cancel(dt.listener);
+ pthread_join(dt.listener, NULL);
+ psched_destroy(dt.psched);
+}
+
+int dt_reg_comp(void * comp,
+ void (* func)(void * func, struct shm_du_buff *),
+ char * name)
+{
+ int res_fd;
+
+ assert(func);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ res_fd = bmp_allocate(dt.res_fds);
+ if (!bmp_is_id_valid(dt.res_fds, res_fd)) {
+ log_warn("Reserved fds depleted.");
+ pthread_rwlock_unlock(&dt.lock);
+ return -EBADF;
+ }
+
+ assert(dt.comps[res_fd].post_packet == NULL);
+ assert(dt.comps[res_fd].comp == NULL);
+ assert(dt.comps[res_fd].name == NULL);
+
+ dt.comps[res_fd].post_packet = func;
+ dt.comps[res_fd].comp = comp;
+ dt.comps[res_fd].name = name;
+
+ pthread_rwlock_unlock(&dt.lock);
+#ifdef IPCP_FLOW_STATS
+ stat_used(res_fd, ipcpi.dt_addr);
+#endif
+ return res_fd;
+}
+
+int dt_write_packet(uint64_t dst_addr,
+ qoscube_t qc,
+ int np1_fd,
+ struct shm_du_buff * sdb)
+{
+ int fd;
+ struct dt_pci dt_pci;
+ int ret;
+#ifdef IPCP_FLOW_STATS
+ size_t len;
+#endif
+ assert(sdb);
+ assert(dst_addr != ipcpi.dt_addr);
+
+ fd = pff_nhop(dt.pff[qc], dst_addr);
+ if (fd < 0) {
+ log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+ pthread_mutex_lock(&dt.stat[np1_fd].lock);
+
+ ++dt.stat[np1_fd].lcl_r_pkt[qc];
+ dt.stat[np1_fd].lcl_r_bytes[qc] += len;
+ ++dt.stat[np1_fd].f_nhp_pkt[qc];
+ dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+#endif
+ return -1;
+ }
+
+ dt_pci.dst_addr = dst_addr;
+ dt_pci.qc = qc;
+ dt_pci.eid = np1_fd;
+ dt_pci.ecn = 0;
+
+ if (dt_pci_ser(sdb, &dt_pci)) {
+ log_dbg("Failed to serialize PDU.");
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
+ goto fail_write;
+ }
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
+ ret = ipcp_flow_write(fd, sdb);
+ if (ret < 0) {
+ log_dbg("Failed to write packet to fd %d.", fd);
+ if (ret == -EFLOWDOWN)
+ notifier_event(NOTIFY_DT_FLOW_DOWN, &fd);
+ goto fail_write;
+ }
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[np1_fd].lock);
+
+ ++dt.stat[np1_fd].lcl_r_pkt[qc];
+ dt.stat[np1_fd].lcl_r_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt_pci.eid < PROG_RES_FDS) {
+ ++dt.stat[fd].lcl_w_pkt[qc];
+ dt.stat[fd].lcl_w_bytes[qc] += len;
+ }
+ ++dt.stat[fd].snd_pkt[qc];
+ dt.stat[fd].snd_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
+ return 0;
+
+ fail_write:
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[np1_fd].lock);
+
+ ++dt.stat[np1_fd].lcl_w_pkt[qc];
+ dt.stat[np1_fd].lcl_w_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt_pci.eid < PROG_RES_FDS) {
+ ++dt.stat[fd].lcl_w_pkt[qc];
+ dt.stat[fd].lcl_w_bytes[qc] += len;
+ }
+ ++dt.stat[fd].w_drp_pkt[qc];
+ dt.stat[fd].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
+ return -1;
+}