From 8dc6e2ad8a9ab356be2d746dd680a4113fb0bcbc Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Mon, 28 Nov 2016 15:24:26 +0100 Subject: ipcpd: normal: Add timerwheel to RIB manager The RIB manager now keeps track of ROs it has already received. The identification of a RO it knows is kept in a struct ro id. For the deletion of these RO ids, a timerwheel to the RIB manager. It also deletes ROs if they have a specified timeout. --- src/ipcpd/normal/ribmgr.c | 300 +++++++++++++++++++++++++++++++++------------- 1 file changed, 219 insertions(+), 81 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 49971eda..524c5a39 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -36,6 +36,7 @@ #include #include +#include "timerwheel.h" #include "addr_auth.h" #include "ribmgr.h" #include "dt_const.h" @@ -50,13 +51,16 @@ typedef StaticInfoMsg static_info_msg_t; #include "ro.pb-c.h" typedef RoMsg ro_msg_t; -#define SUBS_SIZE 25 +#define SUBS_SIZE 25 +#define WHEEL_RESOLUTION 1000 /* ms */ +#define WHEEL_DELAY 3600000 /* ms */ +#define RO_ID_TIMEOUT 1000 /* ms */ -#define ENROLLMENT "enrollment" +#define ENROLLMENT "enrollment" -#define RIBMGR_PREFIX "/ribmgr" -#define STAT_INFO "/statinfo" -#define PATH_DELIMITER "/" +#define RIBMGR_PREFIX "/ribmgr" +#define STAT_INFO "/statinfo" +#define PATH_DELIMITER "/" /* RIB objects */ struct rnode { @@ -92,26 +96,37 @@ struct ro_sub { struct list_head next; }; +struct ro_id { + uint64_t seqno; + char * full_name; + struct list_head next; +}; + struct { - struct rnode * root; - pthread_mutex_t ro_lock; + struct rnode * root; + pthread_mutex_t ro_lock; - struct list_head subs; - struct bmp * sids; - pthread_mutex_t subs_lock; - int ribmgr_sid; + struct list_head subs; + struct bmp * sids; + pthread_mutex_t subs_lock; + int ribmgr_sid; - struct dt_const dtc; + struct dt_const dtc; - uint64_t address; + uint64_t address; - struct list_head flows; - pthread_rwlock_t flows_lock; + struct timerwheel * wheel; - struct list_head cdap_reqs; - pthread_mutex_t cdap_reqs_lock; + struct list_head ro_ids; + pthread_mutex_t ro_ids_lock; - struct addr_auth * addr_auth; + struct list_head flows; + pthread_rwlock_t flows_lock; + + struct list_head cdap_reqs; + pthread_mutex_t cdap_reqs_lock; + + struct addr_auth * addr_auth; } rib; int ribmgr_ro_created(const char * name, @@ -229,6 +244,81 @@ static int ro_msg_create(struct rnode * node, return 0; } +static int ribmgr_ro_delete(const char * name) +{ + char * str; + char * str1; + char * saveptr; + char * token; + struct rnode * node; + struct rnode * prev; + bool sibling = false; + + str = strdup(name); + if (str == NULL) + return -1; + + node = rib.root; + prev = NULL; + + for (str1 = str; ; str1 = NULL) { + token = strtok_r(str1, PATH_DELIMITER, &saveptr); + if (token == NULL) + break; + + prev = node; + node = node->child; + sibling = false; + + while (node != NULL) { + if (strcmp(node->name, token) == 0) { + break; + } else { + prev = node; + node = node->sibling; + sibling = true; + } + } + + if (node == NULL) { + free(str); + return -1; + } + } + + if (node == rib.root) { + LOG_ERR("Won't remove root."); + free(str); + return -1; + } + + free(node->name); + free(node->full_name); + if (node->data != NULL) + free(node->data); + + if (sibling) + prev->sibling = node->sibling; + else + prev->child = node->sibling; + + free(node); + free(str); + + LOG_DBG("Deleted RO with name %s.", name); + + return 0; +} + +static void ro_delete_timer(void * o) +{ + char * name = (char *) o; + + if (ribmgr_ro_delete(name)) { + LOG_ERR("Failed to delete %s.", name); + } +} + static struct rnode * ribmgr_ro_create(const char * name, struct ro_props * props, uint8_t * data, @@ -243,6 +333,7 @@ static struct rnode * ribmgr_ro_create(const char * name, struct rnode * new; struct rnode * prev; bool sibling; + int timeout; str = strdup(name); if (str == NULL) @@ -318,71 +409,20 @@ static struct rnode * ribmgr_ro_create(const char * name, new->child = NULL; new->sibling = NULL; - return new; -} - -static int ribmgr_ro_delete(const char * name) -{ - char * str; - char * str1; - char * saveptr; - char * token; - struct rnode * node; - struct rnode * prev; - bool sibling = false; - - str = strdup(name); - if (str == NULL) - return -1; - - node = rib.root; - prev = NULL; - - for (str1 = str; ; str1 = NULL) { - token = strtok_r(str1, PATH_DELIMITER, &saveptr); - if (token == NULL) - break; - - prev = node; - node = node->child; - sibling = false; - - while (node != NULL) { - if (strcmp(node->name, token) == 0) { - break; - } else { - prev = node; - node = node->sibling; - sibling = true; - } - } + LOG_DBG("Created RO with name %s.", name); - if (node == NULL) { - free(str); - return -1; + if (!(props->expiry.tv_sec == 0 && + props->expiry.tv_nsec == 0)) { + timeout = props->expiry.tv_sec * 1000 + + props->expiry.tv_nsec * MILLION; + if (timerwheel_add(rib.wheel, ro_delete_timer, + new->full_name, strlen(new->full_name), + timeout)) { + LOG_ERR("Failed to add deletion timer of RO."); } } - if (node == rib.root) { - LOG_ERR("Won't remove root."); - free(str); - return -1; - } - - free(node->name); - free(node->full_name); - if (node->data != NULL) - free(node->data); - - if (sibling) - prev->sibling = node->sibling; - else - prev->child = node->sibling; - - free(node); - free(str); - - return 0; + return new; } static struct rnode * ribmgr_ro_write(const char * name, @@ -399,7 +439,8 @@ static struct rnode * ribmgr_ro_write(const char * name, node->data = data; node->len = len; - node->seqno++; + + LOG_DBG("Updated RO with name %s.", name); return node; } @@ -494,6 +535,7 @@ int ribmgr_init() INIT_LIST_HEAD(&rib.flows); INIT_LIST_HEAD(&rib.cdap_reqs); INIT_LIST_HEAD(&rib.subs); + INIT_LIST_HEAD(&rib.ro_ids); rib.root = malloc(sizeof(*(rib.root))); if (rib.root == NULL) @@ -533,6 +575,16 @@ int ribmgr_init() return -1; } + if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) { + LOG_ERR("Failed to initialize mutex."); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.cdap_reqs_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + free(rib.root); + return -1; + } + rib.sids = bmp_create(SUBS_SIZE, 0); if (rib.sids == NULL) { LOG_ERR("Failed to create bitmap."); @@ -540,6 +592,20 @@ int ribmgr_init() pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); + free(rib.root); + return -1; + } + + rib.wheel = timerwheel_create(WHEEL_RESOLUTION, WHEEL_DELAY); + if (rib.wheel == NULL) { + LOG_ERR("Failed to create timerwheel."); + bmp_destroy(rib.sids); + pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.cdap_reqs_lock); + pthread_mutex_destroy(&rib.ro_lock); + pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); free(rib.root); return -1; } @@ -547,11 +613,13 @@ int ribmgr_init() rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops); if (rib.ribmgr_sid < 0) { LOG_ERR("Failed to subscribe."); + timerwheel_destroy(rib.wheel); bmp_destroy(rib.sids); pthread_rwlock_destroy(&rib.flows_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_mutex_destroy(&rib.subs_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); free(rib.root); return -1; } @@ -608,11 +676,13 @@ int ribmgr_fini() pthread_mutex_unlock(&rib.ro_lock); bmp_destroy(rib.sids); + timerwheel_destroy(rib.wheel); pthread_mutex_destroy(&rib.subs_lock); pthread_mutex_destroy(&rib.cdap_reqs_lock); pthread_mutex_destroy(&rib.ro_lock); pthread_rwlock_destroy(&rib.flows_lock); + pthread_mutex_destroy(&rib.ro_ids_lock); return 0; } @@ -810,6 +880,7 @@ static int ribmgr_cdap_write(struct cdap * instance, cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } + node->seqno = msg->seqno; pthread_mutex_lock(&rib.subs_lock); @@ -859,6 +930,8 @@ static int ribmgr_enrol_sync(struct cdap * instance, return -1; } + LOG_DBG("Syncing RO with name %s.", node->full_name); + if (write_ro_msg(instance, &msg, node->full_name, CDAP_CREATE)) { LOG_ERR("Failed to send RO msg."); @@ -961,6 +1034,51 @@ static int ribmgr_cdap_stop(struct cdap * instance, return 0; } +static void ro_id_delete(void * o) +{ + struct ro_id * ro_id = *((struct ro_id **) o); + + pthread_mutex_lock(&rib.ro_ids_lock); + list_del(&ro_id->next); + free(ro_id->full_name); + free(ro_id); + pthread_mutex_unlock(&rib.ro_ids_lock); +} + +static int ro_id_create(char * name, + ro_msg_t * msg) +{ + struct ro_id * tmp; + + tmp = malloc(sizeof(*tmp)); + if (tmp == NULL) + return -ENOMEM; + + tmp->seqno = msg->seqno; + tmp->full_name = strdup(name); + INIT_LIST_HEAD(&tmp->next); + + if (tmp->full_name == NULL) { + free(tmp); + return -ENOMEM; + } + + pthread_mutex_lock(&rib.ro_ids_lock); + list_add(&tmp->next, &rib.ro_ids); + + if (timerwheel_add(rib.wheel, ro_id_delete, + &tmp, sizeof(tmp), RO_ID_TIMEOUT)) { + LOG_ERR("Failed to add item to timerwheel."); + pthread_mutex_unlock(&rib.ro_ids_lock); + free(tmp->full_name); + free(tmp); + return -1; + } + pthread_mutex_unlock(&rib.ro_ids_lock); + + return 0; +} + static int ribmgr_cdap_request(struct cdap * instance, int invoke_id, enum cdap_opcode opcode, @@ -989,7 +1107,20 @@ static int ribmgr_cdap_request(struct cdap * instance, return -1; } - /* FIXME: Check if we already received this */ + pthread_mutex_lock(&rib.ro_ids_lock); + list_for_each(p, &rib.ro_ids) { + struct ro_id * e = list_entry(p, struct ro_id, next); + + if (strcmp(e->full_name, name) == 0 && + e->seqno == msg->seqno) { + pthread_mutex_unlock(&rib.ro_ids_lock); + ro_msg__free_unpacked(msg, NULL); + cdap_send_reply(instance, invoke_id, 0, NULL, 0); + LOG_DBG("Already received this RO."); + return 0; + } + } + pthread_mutex_unlock(&rib.ro_ids_lock); if (opcode == CDAP_CREATE) { ret = ribmgr_cdap_create(instance, @@ -1008,10 +1139,16 @@ static int ribmgr_cdap_request(struct cdap * instance, name); } else { LOG_INFO("Unsupported opcode received."); + ro_msg__free_unpacked(msg, NULL); cdap_send_reply(instance, invoke_id, -1, NULL, 0); return -1; } + if (ro_id_create(name, msg)) { + LOG_ERR("Failed to create RO id."); + return -1; + } + if (msg->recv_set == ALL_MEMBERS) { pthread_rwlock_rdlock(&rib.flows_lock); list_for_each(p, &rib.flows) { @@ -1364,6 +1501,7 @@ int ro_write(const char * name, LOG_ERR("Failed to create RO."); return -1; } + node->seqno++; if (node->props->recv_set == NO_SYNC) { pthread_mutex_unlock(&rib.ro_lock); -- cgit v1.2.3