diff options
author | Dimitri Staessens <[email protected]> | 2019-07-25 12:50:46 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2019-07-29 19:36:45 +0200 |
commit | dae15c284248d49079ad5f8a3d8ff30e217f419e (patch) | |
tree | ea7942e940396c0c78304fef8b43fb25c5aebba8 /src/ipcpd/normal/dt.c | |
parent | c9232acef855b51d1bc199a68c03c0695ac11192 (diff) | |
download | ouroboros-dae15c284248d49079ad5f8a3d8ff30e217f419e.tar.gz ouroboros-dae15c284248d49079ad5f8a3d8ff30e217f419e.zip |
build: Refactor normal to unicast
This completes the renaming of the normal IPCP to the unicast IPCP in
the sources, to get everything consistent with the documentation.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/ipcpd/normal/dt.c')
-rw-r--r-- | src/ipcpd/normal/dt.c | 913 |
1 files changed, 0 insertions, 913 deletions
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c deleted file mode 100644 index 2fd3c060..00000000 --- a/src/ipcpd/normal/dt.c +++ /dev/null @@ -1,913 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - 2019 - * - * Data Transfer Component - * - * Dimitri Staessens <[email protected]> - * Sander Vrijders <[email protected]> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 as - * published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., http://www.fsf.org/about/contact/. - */ - -#if defined(__linux__) || defined(__CYGWIN__) -#define _DEFAULT_SOURCE -#else -#define _POSIX_C_SOURCE 200112L -#endif - -#include "config.h" - -#define DT "dt" -#define OUROBOROS_PREFIX DT - -#include <ouroboros/bitmap.h> -#include <ouroboros/errno.h> -#include <ouroboros/logs.h> -#include <ouroboros/dev.h> -#include <ouroboros/notifier.h> -#include <ouroboros/rib.h> -#ifdef IPCP_FLOW_STATS -#include <ouroboros/fccntl.h> -#endif - -#include "connmgr.h" -#include "ipcp.h" -#include "dt.h" -#include "pff.h" -#include "routing.h" -#include "psched.h" -#include "comp.h" -#include "fa.h" - -#include <stdlib.h> -#include <stdbool.h> -#include <pthread.h> -#include <string.h> -#include <inttypes.h> -#include <assert.h> - -#define QOS_BLOCK_LEN 672 -#define STAT_FILE_LEN (189 + QOS_BLOCK_LEN * QOS_CUBE_MAX) - -#ifndef CLOCK_REALTIME_COARSE -#define CLOCK_REALTIME_COARSE CLOCK_REALTIME -#endif - -struct comp_info { - void (* post_packet)(void * comp, struct shm_du_buff * sdb); - void * comp; - char * name; -}; - -/* Fixed field lengths */ -#define TTL_LEN 1 -#define QOS_LEN 1 -#define ECN_LEN 1 - -struct dt_pci { - uint64_t dst_addr; - qoscube_t qc; - uint8_t ttl; - uint8_t ecn; - uint32_t eid; -}; - -struct { - uint8_t addr_size; - uint8_t eid_size; - size_t head_size; - - /* Offsets */ - size_t qc_o; - size_t ttl_o; - size_t ecn_o; - size_t eid_o; - - /* Initial TTL value */ - uint8_t max_ttl; -} dt_pci_info; - -static int dt_pci_ser(struct shm_du_buff * sdb, - struct dt_pci * dt_pci) -{ - uint8_t * head; - uint8_t ttl = dt_pci_info.max_ttl; - - assert(sdb); - assert(dt_pci); - - head = shm_du_buff_head_alloc(sdb, dt_pci_info.head_size); - if (head == NULL) - return -EPERM; - - /* FIXME: Add check and operations for Big Endian machines. */ - memcpy(head, &dt_pci->dst_addr, dt_pci_info.addr_size); - memcpy(head + dt_pci_info.qc_o, &dt_pci->qc, QOS_LEN); - memcpy(head + dt_pci_info.ttl_o, &ttl, TTL_LEN); - memcpy(head + dt_pci_info.ecn_o, &dt_pci->ecn, ECN_LEN); - memcpy(head + dt_pci_info.eid_o, &dt_pci->eid, dt_pci_info.eid_size); - - return 0; -} - -static void dt_pci_des(struct shm_du_buff * sdb, - struct dt_pci * dt_pci) -{ - uint8_t * head; - - assert(sdb); - assert(dt_pci); - - head = shm_du_buff_head(sdb); - - /* Decrease TTL */ - --*(head + dt_pci_info.ttl_o); - - /* FIXME: Add check and operations for Big Endian machines. */ - memcpy(&dt_pci->dst_addr, head, dt_pci_info.addr_size); - memcpy(&dt_pci->qc, head + dt_pci_info.qc_o, QOS_LEN); - memcpy(&dt_pci->ttl, head + dt_pci_info.ttl_o, TTL_LEN); - memcpy(&dt_pci->ecn, head + dt_pci_info.ecn_o, ECN_LEN); - memcpy(&dt_pci->eid, head + dt_pci_info.eid_o, dt_pci_info.eid_size); -} - -static void dt_pci_shrink(struct shm_du_buff * sdb) -{ - assert(sdb); - - shm_du_buff_head_release(sdb, dt_pci_info.head_size); -} - -struct { - struct psched * psched; - - struct pff * pff[QOS_CUBE_MAX]; - struct routing_i * routing[QOS_CUBE_MAX]; -#ifdef IPCP_FLOW_STATS - struct { - time_t stamp; - uint64_t addr; - size_t snd_pkt[QOS_CUBE_MAX]; - size_t rcv_pkt[QOS_CUBE_MAX]; - size_t snd_bytes[QOS_CUBE_MAX]; - size_t rcv_bytes[QOS_CUBE_MAX]; - size_t lcl_r_pkt[QOS_CUBE_MAX]; - size_t lcl_r_bytes[QOS_CUBE_MAX]; - size_t lcl_w_pkt[QOS_CUBE_MAX]; - size_t lcl_w_bytes[QOS_CUBE_MAX]; - size_t r_drp_pkt[QOS_CUBE_MAX]; - size_t r_drp_bytes[QOS_CUBE_MAX]; - size_t w_drp_pkt[QOS_CUBE_MAX]; - size_t w_drp_bytes[QOS_CUBE_MAX]; - size_t f_nhp_pkt[QOS_CUBE_MAX]; - size_t f_nhp_bytes[QOS_CUBE_MAX]; - pthread_mutex_t lock; - } stat[PROG_MAX_FLOWS]; - - size_t n_flows; -#endif - struct bmp * res_fds; - struct comp_info comps[PROG_RES_FDS]; - pthread_rwlock_t lock; - - pthread_t listener; -} dt; - -static int dt_stat_read(const char * path, - char * buf, - size_t len) -{ -#ifdef IPCP_FLOW_STATS - int fd; - int i; - char str[QOS_BLOCK_LEN + 1]; - char addrstr[20]; - char tmstr[20]; - size_t rxqlen = 0; - size_t txqlen = 0; - struct tm * tm; - - /* NOTE: we may need stronger checks. */ - fd = atoi(path); - - if (len < STAT_FILE_LEN) - return 0; - - buf[0] = '\0'; - - pthread_mutex_lock(&dt.stat[fd].lock); - - if (dt.stat[fd].stamp == 0) { - pthread_mutex_unlock(&dt.stat[fd].lock); - return 0; - } - - if (dt.stat[fd].addr == ipcpi.dt_addr) - sprintf(addrstr, "%s", dt.comps[fd].name); - else - sprintf(addrstr, "%" PRIu64, dt.stat[fd].addr); - - tm = localtime(&dt.stat[fd].stamp); - strftime(tmstr, sizeof(tmstr), "%F %T", tm); - - if (fd >= PROG_RES_FDS) { - fccntl(fd, FLOWGRXQLEN, &rxqlen); - fccntl(fd, FLOWGTXQLEN, &txqlen); - } - - sprintf(buf, - "Flow established at: %20s\n" - "Endpoint address: %20s\n" - "Queued packets (rx): %20zu\n" - "Queued packets (tx): %20zu\n\n", - tmstr, addrstr, rxqlen, txqlen); - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - sprintf(str, - "Qos cube %3d:\n" - " sent (packets): %20zu\n" - " sent (bytes): %20zu\n" - " rcvd (packets): %20zu\n" - " rcvd (bytes): %20zu\n" - " local sent (packets): %20zu\n" - " local sent (bytes): %20zu\n" - " local rcvd (packets): %20zu\n" - " local rcvd (bytes): %20zu\n" - " dropped ttl (packets): %20zu\n" - " dropped ttl (bytes): %20zu\n" - " failed writes (packets): %20zu\n" - " failed writes (bytes): %20zu\n" - " failed nhop (packets): %20zu\n" - " failed nhop (bytes): %20zu\n", - i, - dt.stat[fd].snd_pkt[i], - dt.stat[fd].snd_bytes[i], - dt.stat[fd].rcv_pkt[i], - dt.stat[fd].rcv_bytes[i], - dt.stat[fd].lcl_w_pkt[i], - dt.stat[fd].lcl_w_bytes[i], - dt.stat[fd].lcl_r_pkt[i], - dt.stat[fd].lcl_r_bytes[i], - dt.stat[fd].r_drp_pkt[i], - dt.stat[fd].r_drp_bytes[i], - dt.stat[fd].w_drp_pkt[i], - dt.stat[fd].w_drp_bytes[i], - dt.stat[fd].f_nhp_pkt[i], - dt.stat[fd].f_nhp_bytes[i] - ); - strcat(buf, str); - } - - pthread_mutex_unlock(&dt.stat[fd].lock); - - return STAT_FILE_LEN; -#else - (void) path; - (void) buf; - (void) len; - return 0; -#endif -} - -static int dt_stat_readdir(char *** buf) -{ -#ifdef IPCP_FLOW_STATS - char entry[RIB_PATH_LEN + 1]; - size_t i; - int idx = 0; - - pthread_rwlock_rdlock(&dt.lock); - - if (dt.n_flows < 1) { - pthread_rwlock_unlock(&dt.lock); - return 0; - } - - *buf = malloc(sizeof(**buf) * dt.n_flows); - if (*buf == NULL) { - pthread_rwlock_unlock(&dt.lock); - return -ENOMEM; - } - - for (i = 0; i < PROG_MAX_FLOWS; ++i) { - pthread_mutex_lock(&dt.stat[i].lock); - - if (dt.stat[i].stamp == 0) { - pthread_mutex_unlock(&dt.stat[i].lock); - /* Optimization: skip unused res_fds. */ - if (i < PROG_RES_FDS) - i = PROG_RES_FDS; - continue; - } - - sprintf(entry, "%zu", i); - - (*buf)[idx] = malloc(strlen(entry) + 1); - if ((*buf)[idx] == NULL) { - while (idx-- > 0) - free((*buf)[idx]); - free(buf); - pthread_mutex_unlock(&dt.stat[i].lock); - pthread_rwlock_unlock(&dt.lock); - return -ENOMEM; - } - - strcpy((*buf)[idx++], entry); - - pthread_mutex_unlock(&dt.stat[i].lock); - } - - pthread_rwlock_unlock(&dt.lock); - - assert((size_t) idx == dt.n_flows); - - return idx; -#else - (void) buf; - return 0; -#endif -} - -static int dt_stat_getattr(const char * path, - struct stat * st) -{ -#ifdef IPCP_FLOW_STATS - int fd; - - fd = atoi(path); - - st->st_mode = S_IFREG | 0755; - st->st_nlink = 1; - st->st_uid = getuid(); - st->st_gid = getgid(); - - pthread_mutex_lock(&dt.stat[fd].lock); - - if (dt.stat[fd].stamp != -1) { - st->st_size = STAT_FILE_LEN; - st->st_mtime = dt.stat[fd].stamp; - } else { - st->st_size = 0; - st->st_mtime = 0; - } - - pthread_mutex_unlock(&dt.stat[fd].lock); -#else - (void) path; - (void) st; -#endif - return 0; -} - -static struct rib_ops r_ops = { - .read = dt_stat_read, - .readdir = dt_stat_readdir, - .getattr = dt_stat_getattr -}; - -#ifdef IPCP_FLOW_STATS - -static void stat_used(int fd, - uint64_t addr) -{ - struct timespec now; - - clock_gettime(CLOCK_REALTIME_COARSE, &now); - - pthread_mutex_lock(&dt.stat[fd].lock); - - memset(&dt.stat[fd], 0, sizeof(dt.stat[fd])); - - dt.stat[fd].stamp = (addr != INVALID_ADDR) ? now.tv_sec : 0; - dt.stat[fd].addr = addr; - - pthread_mutex_unlock(&dt.stat[fd].lock); - - pthread_rwlock_wrlock(&dt.lock); - - (addr != INVALID_ADDR) ? ++dt.n_flows : --dt.n_flows; - - pthread_rwlock_unlock(&dt.lock); -} -#endif - -static void handle_event(void * self, - int event, - const void * o) -{ - struct conn * c; - - (void) self; - - c = (struct conn *) o; - - switch (event) { - case NOTIFY_DT_CONN_ADD: -#ifdef IPCP_FLOW_STATS - stat_used(c->flow_info.fd, c->conn_info.addr); -#endif - psched_add(dt.psched, c->flow_info.fd); - log_dbg("Added fd %d to packet scheduler.", c->flow_info.fd); - break; - case NOTIFY_DT_CONN_DEL: -#ifdef IPCP_FLOW_STATS - stat_used(c->flow_info.fd, INVALID_ADDR); -#endif - psched_del(dt.psched, c->flow_info.fd); - log_dbg("Removed fd %d from " - "packet scheduler.", c->flow_info.fd); - break; - default: - break; - } -} - -static void packet_handler(int fd, - qoscube_t qc, - struct shm_du_buff * sdb) -{ - struct dt_pci dt_pci; - int ret; - int ofd; -#ifdef IPCP_FLOW_STATS - size_t len; -#else - (void) fd; -#endif - -#ifdef IPCP_FLOW_STATS - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); -#endif - memset(&dt_pci, 0, sizeof(dt_pci)); - dt_pci_des(sdb, &dt_pci); - if (dt_pci.dst_addr != ipcpi.dt_addr) { - if (dt_pci.ttl == 0) { - log_dbg("TTL was zero."); - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - ++dt.stat[fd].r_drp_pkt[qc]; - dt.stat[fd].r_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); -#endif - return; - } - - /* FIXME: Use qoscube from PCI instead of incoming flow. */ - ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); - if (ofd < 0) { - log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr); - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - ++dt.stat[fd].f_nhp_pkt[qc]; - dt.stat[fd].f_nhp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); -#endif - return; - } - - ret = ipcp_flow_write(ofd, sdb); - if (ret < 0) { - log_dbg("Failed to write packet to fd %d.", ofd); - if (ret == -EFLOWDOWN) - notifier_event(NOTIFY_DT_FLOW_DOWN, &ofd); - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); - pthread_mutex_lock(&dt.stat[ofd].lock); - - ++dt.stat[ofd].w_drp_pkt[qc]; - dt.stat[ofd].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[ofd].lock); -#endif - return; - } -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); - pthread_mutex_lock(&dt.stat[ofd].lock); - - ++dt.stat[ofd].snd_pkt[qc]; - dt.stat[ofd].snd_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[ofd].lock); -#endif - } else { - dt_pci_shrink(sdb); - if (dt_pci.eid >= PROG_RES_FDS) { - if (ipcp_flow_write(dt_pci.eid, sdb)) { - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - pthread_mutex_unlock(&dt.stat[fd].lock); - - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; - dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - - } -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].rcv_pkt[qc]; - dt.stat[dt_pci.eid].rcv_bytes[qc] += len; - ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; - dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - return; - } - - if (dt.comps[dt_pci.eid].post_packet == NULL) { - log_err("No registered component on eid %d.", - dt_pci.eid); - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; - dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - return; - } -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[fd].lock); - - ++dt.stat[fd].rcv_pkt[qc]; - dt.stat[fd].rcv_bytes[qc] += len; - ++dt.stat[fd].lcl_r_pkt[qc]; - dt.stat[fd].lcl_r_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].snd_pkt[qc]; - dt.stat[dt_pci.eid].snd_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - dt.comps[dt_pci.eid].post_packet(dt.comps[dt_pci.eid].comp, - sdb); - } -} - -static void * dt_conn_handle(void * o) -{ - struct conn conn; - - (void) o; - - while (true) { - if (connmgr_wait(COMPID_DT, &conn)) { - log_err("Failed to get next DT connection."); - continue; - } - - /* NOTE: connection acceptance policy could be here. */ - - notifier_event(NOTIFY_DT_CONN_ADD, &conn); - } - - return 0; -} - -int dt_init(enum pol_routing pr, - enum pol_pff pp, - uint8_t addr_size, - uint8_t eid_size, - uint8_t max_ttl) -{ - int i; - int j; - char dtstr[256]; - struct conn_info info; - - memset(&info, 0, sizeof(info)); - - strcpy(info.comp_name, DT_COMP); - strcpy(info.protocol, DT_PROTO); - info.pref_version = 1; - info.pref_syntax = PROTO_FIXED; - info.addr = ipcpi.dt_addr; - - dt_pci_info.addr_size = addr_size; - dt_pci_info.eid_size = eid_size; - dt_pci_info.max_ttl = max_ttl; - - dt_pci_info.qc_o = dt_pci_info.addr_size; - dt_pci_info.ttl_o = dt_pci_info.qc_o + QOS_LEN; - dt_pci_info.ecn_o = dt_pci_info.ttl_o + TTL_LEN; - dt_pci_info.eid_o = dt_pci_info.ecn_o + ECN_LEN; - dt_pci_info.head_size = dt_pci_info.eid_o + dt_pci_info.eid_size; - - if (notifier_reg(handle_event, NULL)) { - log_err("Failed to register with notifier."); - goto fail_notifier_reg; - } - - if (connmgr_comp_init(COMPID_DT, &info)) { - log_err("Failed to register with connmgr."); - goto fail_connmgr_comp_init; - } - - if (routing_init(pr)) { - log_err("Failed to init routing."); - goto fail_routing; - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - dt.pff[i] = pff_create(pp); - if (dt.pff[i] == NULL) { - log_err("Failed to create a PFF."); - for (j = 0; j < i; ++j) - pff_destroy(dt.pff[j]); - goto fail_pff; - } - } - - for (i = 0; i < QOS_CUBE_MAX; ++i) { - dt.routing[i] = routing_i_create(dt.pff[i]); - if (dt.routing[i] == NULL) { - for (j = 0; j < i; ++j) - routing_i_destroy(dt.routing[j]); - goto fail_routing_i; - } - } - - if (pthread_rwlock_init(&dt.lock, NULL)) { - log_err("Failed to init rwlock."); - goto fail_rwlock_init; - } - - dt.res_fds = bmp_create(PROG_RES_FDS, 0); - if (dt.res_fds == NULL) - goto fail_res_fds; -#ifdef IPCP_FLOW_STATS - memset(dt.stat, 0, sizeof(dt.stat)); - - for (i = 0; i < PROG_MAX_FLOWS; ++i) - if (pthread_mutex_init(&dt.stat[i].lock, NULL)) { - for (j = 0; j < i; ++j) - pthread_mutex_destroy(&dt.stat[j].lock); - goto fail_stat_lock; - } - - dt.n_flows = 0; -#endif - sprintf(dtstr, "%s.%" PRIu64, DT, ipcpi.dt_addr); - if (rib_reg(dtstr, &r_ops)) - goto fail_rib_reg; - - return 0; - - fail_rib_reg: -#ifdef IPCP_FLOW_STATS - for (i = 0; i < PROG_MAX_FLOWS; ++i) - pthread_mutex_destroy(&dt.stat[i].lock); - fail_stat_lock: -#endif - bmp_destroy(dt.res_fds); - fail_res_fds: - pthread_rwlock_destroy(&dt.lock); - fail_rwlock_init: - for (j = 0; j < QOS_CUBE_MAX; ++j) - routing_i_destroy(dt.routing[j]); - fail_routing_i: - for (i = 0; i < QOS_CUBE_MAX; ++i) - pff_destroy(dt.pff[i]); - fail_pff: - routing_fini(); - fail_routing: - connmgr_comp_fini(COMPID_DT); - fail_connmgr_comp_init: - notifier_unreg(&handle_event); - fail_notifier_reg: - return -1; -} - -void dt_fini(void) -{ - int i; - - rib_unreg(DT); -#ifdef IPCP_FLOW_STATS - for (i = 0; i < PROG_MAX_FLOWS; ++i) - pthread_mutex_destroy(&dt.stat[i].lock); -#endif - bmp_destroy(dt.res_fds); - - pthread_rwlock_destroy(&dt.lock); - - for (i = 0; i < QOS_CUBE_MAX; ++i) - routing_i_destroy(dt.routing[i]); - - for (i = 0; i < QOS_CUBE_MAX; ++i) - pff_destroy(dt.pff[i]); - - routing_fini(); - - connmgr_comp_fini(COMPID_DT); - - notifier_unreg(&handle_event); -} - -int dt_start(void) -{ - dt.psched = psched_create(packet_handler); - if (dt.psched == NULL) { - log_err("Failed to create N-1 packet scheduler."); - return -1; - } - - if (pthread_create(&dt.listener, NULL, dt_conn_handle, NULL)) { - log_err("Failed to create listener thread."); - psched_destroy(dt.psched); - return -1; - } - - return 0; -} - -void dt_stop(void) -{ - pthread_cancel(dt.listener); - pthread_join(dt.listener, NULL); - psched_destroy(dt.psched); -} - -int dt_reg_comp(void * comp, - void (* func)(void * func, struct shm_du_buff *), - char * name) -{ - int res_fd; - - assert(func); - - pthread_rwlock_wrlock(&dt.lock); - - res_fd = bmp_allocate(dt.res_fds); - if (!bmp_is_id_valid(dt.res_fds, res_fd)) { - log_warn("Reserved fds depleted."); - pthread_rwlock_unlock(&dt.lock); - return -EBADF; - } - - assert(dt.comps[res_fd].post_packet == NULL); - assert(dt.comps[res_fd].comp == NULL); - assert(dt.comps[res_fd].name == NULL); - - dt.comps[res_fd].post_packet = func; - dt.comps[res_fd].comp = comp; - dt.comps[res_fd].name = name; - - pthread_rwlock_unlock(&dt.lock); -#ifdef IPCP_FLOW_STATS - stat_used(res_fd, ipcpi.dt_addr); -#endif - return res_fd; -} - -int dt_write_packet(uint64_t dst_addr, - qoscube_t qc, - int np1_fd, - struct shm_du_buff * sdb) -{ - int fd; - struct dt_pci dt_pci; - int ret; -#ifdef IPCP_FLOW_STATS - size_t len; -#endif - assert(sdb); - assert(dst_addr != ipcpi.dt_addr); - - fd = pff_nhop(dt.pff[qc], dst_addr); - if (fd < 0) { - log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); -#ifdef IPCP_FLOW_STATS - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); - - pthread_mutex_lock(&dt.stat[np1_fd].lock); - - ++dt.stat[np1_fd].lcl_r_pkt[qc]; - dt.stat[np1_fd].lcl_r_bytes[qc] += len; - ++dt.stat[np1_fd].f_nhp_pkt[qc]; - dt.stat[np1_fd].f_nhp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[np1_fd].lock); -#endif - return -1; - } - - dt_pci.dst_addr = dst_addr; - dt_pci.qc = qc; - dt_pci.eid = np1_fd; - dt_pci.ecn = 0; - - if (dt_pci_ser(sdb, &dt_pci)) { - log_dbg("Failed to serialize PDU."); -#ifdef IPCP_FLOW_STATS - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); -#endif - goto fail_write; - } -#ifdef IPCP_FLOW_STATS - len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); -#endif - ret = ipcp_flow_write(fd, sdb); - if (ret < 0) { - log_dbg("Failed to write packet to fd %d.", fd); - if (ret == -EFLOWDOWN) - notifier_event(NOTIFY_DT_FLOW_DOWN, &fd); - goto fail_write; - } -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[np1_fd].lock); - - ++dt.stat[np1_fd].lcl_r_pkt[qc]; - dt.stat[np1_fd].lcl_r_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[np1_fd].lock); - pthread_mutex_lock(&dt.stat[fd].lock); - - if (dt_pci.eid < PROG_RES_FDS) { - ++dt.stat[fd].lcl_w_pkt[qc]; - dt.stat[fd].lcl_w_bytes[qc] += len; - } - ++dt.stat[fd].snd_pkt[qc]; - dt.stat[fd].snd_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); -#endif - return 0; - - fail_write: -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[np1_fd].lock); - - ++dt.stat[np1_fd].lcl_w_pkt[qc]; - dt.stat[np1_fd].lcl_w_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[np1_fd].lock); - pthread_mutex_lock(&dt.stat[fd].lock); - - if (dt_pci.eid < PROG_RES_FDS) { - ++dt.stat[fd].lcl_w_pkt[qc]; - dt.stat[fd].lcl_w_bytes[qc] += len; - } - ++dt.stat[fd].w_drp_pkt[qc]; - dt.stat[fd].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[fd].lock); -#endif - return -1; -} |