diff options
author | dimitri staessens <[email protected]> | 2017-08-25 17:19:17 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-08-28 15:24:16 +0200 |
commit | 176698e8c2fd7ab8007b8074515d6144e7177d8e (patch) | |
tree | 6fa097b57aaafe3143b1f17e528479aa0624a005 /src/ipcpd/normal/connmgr.c | |
parent | a4bff697871c8cc7252d029d77b180e41e821f7e (diff) | |
download | ouroboros-176698e8c2fd7ab8007b8074515d6144e7177d8e.tar.gz ouroboros-176698e8c2fd7ab8007b8074515d6144e7177d8e.zip |
ipcpd: Deprecate gam as autonomous component
The graph adjacency manager has been deprecated in favor of providing
an external interface into the connectivity manager so that
adjacencies can be controlled from the command line, user scripts or
user applications.
The gam and its associated policies were removed from the normal IPCP
and the IRM configuration tools. The "/members" part of the RIB was
deprecated. Removal of the gam means that initial connectivity based
on changes in the RIB can't be provided, so some changes were
required throughout the normal IPCP.
The enrollment procedure was revised to establish its own
connectivity. First, it gets boot information from a peer by
establishing a connection to the remote enrollment component and
downloading the IPCP configuratoin. This is now done using its own
protocol buffers message in anticipation of deprecation of the RIB and
CDAP for communication within a DIF.
After the boot information is downloaded, it establishes a data
transfer flow for enrolling the directory (DHT). After the DHT has
enrolled, it signals the peer to that enrollment is done, and the data
transfer connection is torn down.
Signaling connections is done via the nbs struct, which is now passed
to the connmgr, which enables control of the connectivity graph from
external sources.
Diffstat (limited to 'src/ipcpd/normal/connmgr.c')
-rw-r--r-- | src/ipcpd/normal/connmgr.c | 295 |
1 files changed, 141 insertions, 154 deletions
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c index a83d71c3..b6e5e31a 100644 --- a/src/ipcpd/normal/connmgr.c +++ b/src/ipcpd/normal/connmgr.c @@ -22,7 +22,7 @@ #define _POSIX_C_SOURCE 200112L -#define OUROBOROS_PREFIX "normal-ipcp" +#define OUROBOROS_PREFIX "Connection manager" #include <ouroboros/dev.h> #include <ouroboros/cacep.h> @@ -42,57 +42,50 @@ #include <stdlib.h> #include <assert.h> +enum connmgr_state { + CONNMGR_NULL = 0, + CONNMGR_INIT, + CONNMGR_RUNNING +}; + struct ae_conn { struct list_head next; struct conn conn; }; struct ae { - struct list_head next; + struct nbs * nbs; struct conn_info info; - struct list_head conn_list; - pthread_cond_t conn_cond; - pthread_mutex_t conn_lock; + struct list_head conns; + pthread_cond_t cond; + pthread_mutex_t lock; }; struct { - pthread_t acceptor; + struct ae aes[AEID_MAX]; + enum connmgr_state state; - struct list_head aes; - pthread_rwlock_t aes_lock; + pthread_t acceptor; } connmgr; - -static int get_info_by_name(const char * name, - struct conn_info * info) +static int get_id_by_name(const char * name) { - struct list_head * p; + enum ae_id i; - pthread_rwlock_rdlock(&connmgr.aes_lock); - - list_for_each(p, &connmgr.aes) { - struct ae * ae = list_entry(p, struct ae, next); - if (strcmp(ae->info.ae_name, name) == 0) { - memcpy(info, &ae->info, sizeof(*info)); - pthread_rwlock_unlock(&connmgr.aes_lock); - return 0; - } - } - - pthread_rwlock_unlock(&connmgr.aes_lock); + for (i = 0; i < AEID_MAX; ++i) + if (strcmp(name, connmgr.aes[i].info.ae_name) == 0) + return i; return -1; } -static int add_ae_conn(const char * name, +static int add_ae_conn(enum ae_id id, int fd, qosspec_t qs, struct conn_info * rcv_info) { struct ae_conn * ae_conn; - struct list_head * p; - struct ae * ae = NULL; ae_conn = malloc(sizeof(*ae_conn)); if (ae_conn == NULL) { @@ -106,28 +99,12 @@ static int add_ae_conn(const char * name, list_head_init(&ae_conn->next); - pthread_rwlock_wrlock(&connmgr.aes_lock); + pthread_mutex_lock(&connmgr.aes[id].lock); - list_for_each(p, &connmgr.aes) { - ae = list_entry(p, struct ae, next); - if (strcmp(ae->info.ae_name, name) == 0) - break; - } + list_add(&ae_conn->next, &connmgr.aes[id].conns); + pthread_cond_signal(&connmgr.aes[id].cond); - /* Check if entry was removed during allocation. */ - if (ae == NULL || strcmp(ae->info.ae_name, name) != 0) { - pthread_rwlock_unlock(&connmgr.aes_lock); - return -1; - } - - pthread_mutex_lock(&ae->conn_lock); - - list_add(&ae_conn->next, &ae->conn_list); - pthread_cond_signal(&ae->conn_cond); - - pthread_mutex_unlock(&ae->conn_lock); - - pthread_rwlock_unlock(&connmgr.aes_lock); + pthread_mutex_unlock(&connmgr.aes[id].lock); return 0; } @@ -136,7 +113,6 @@ static void * flow_acceptor(void * o) { int fd; qosspec_t qs; - struct conn_info snd_info; struct conn_info rcv_info; struct conn_info fail_info; @@ -145,10 +121,7 @@ static void * flow_acceptor(void * o) memset(&fail_info, 0, sizeof(fail_info)); while (true) { - if (ipcp_get_state() != IPCP_OPERATIONAL) { - log_info("Shutting down flow acceptor."); - return 0; - } + int id; fd = flow_accept(&qs, NULL); if (fd < 0) { @@ -158,26 +131,30 @@ static void * flow_acceptor(void * o) } if (cacep_rcv(fd, &rcv_info)) { - log_err("Error establishing application connection."); + log_dbg("Error establishing application connection."); flow_dealloc(fd); continue; } - if (get_info_by_name(rcv_info.ae_name, &snd_info)) { - log_err("Failed to get info for %s.", rcv_info.ae_name); + id = get_id_by_name(rcv_info.ae_name); + if (id < 0) { + log_dbg("Connection request for unknown AE %s.", + rcv_info.ae_name); cacep_snd(fd, &fail_info); flow_dealloc(fd); continue; } - if (cacep_snd(fd, &snd_info)) { - log_err("Failed to respond to request."); + assert(id < AEID_MAX); + + if (cacep_snd(fd, &connmgr.aes[id].info)) { + log_dbg("Failed to respond to request."); flow_dealloc(fd); continue; } - if (add_ae_conn(rcv_info.ae_name, fd, qs, &rcv_info)) { - log_err("Failed to add new connection."); + if (add_ae_conn(id, fd, qs, &rcv_info)) { + log_dbg("Failed to add new connection."); flow_dealloc(fd); continue; } @@ -188,130 +165,108 @@ static void * flow_acceptor(void * o) int connmgr_init(void) { - if (pthread_rwlock_init(&connmgr.aes_lock, NULL)) - return -1; - - list_head_init(&connmgr.aes); + connmgr.state = CONNMGR_INIT; return 0; } +void connmgr_fini(void) +{ + int i; + + if (connmgr.state == CONNMGR_RUNNING) + pthread_join(connmgr.acceptor, NULL); + + for (i = 0; i < AEID_MAX; ++i) + connmgr_ae_fini(i); +} + int connmgr_start(void) { if (pthread_create(&connmgr.acceptor, NULL, flow_acceptor, NULL)) return -1; + connmgr.state = CONNMGR_RUNNING; + return 0; } void connmgr_stop(void) { - pthread_cancel(connmgr.acceptor); + if (connmgr.state == CONNMGR_RUNNING) + pthread_cancel(connmgr.acceptor); } -static void destroy_ae(struct ae * ae) +int connmgr_ae_init(enum ae_id id, + const struct conn_info * info, + struct nbs * nbs) { - struct list_head * p; - struct list_head * h; - - pthread_mutex_lock(&ae->conn_lock); - - list_for_each_safe(p, h, &ae->conn_list) { - struct ae_conn * e = list_entry(p, struct ae_conn, next); - list_del(&e->next); - free(e); - } - - pthread_mutex_unlock(&ae->conn_lock); + struct ae * ae; - pthread_cond_destroy(&ae->conn_cond); - pthread_mutex_destroy(&ae->conn_lock); + assert(id >= 0 && id < AEID_MAX); - free(ae); -} + ae = connmgr.aes + id; -void connmgr_fini(void) -{ - struct list_head * p; - struct list_head * h; + if (pthread_mutex_init(&ae->lock, NULL)) { + return -1; + } - pthread_join(connmgr.acceptor, NULL); + if (pthread_cond_init(&ae->cond, NULL)) { + pthread_mutex_destroy(&ae->lock); + return -1; + } - pthread_rwlock_wrlock(&connmgr.aes_lock); + list_head_init(&ae->conns); - list_for_each_safe(p, h, &connmgr.aes) { - struct ae * e = list_entry(p, struct ae, next); - list_del(&e->next); - destroy_ae(e); - } + memcpy(&connmgr.aes[id].info, info, sizeof(connmgr.aes[id].info)); - pthread_rwlock_unlock(&connmgr.aes_lock); + connmgr.aes[id].nbs = nbs; - pthread_rwlock_destroy(&connmgr.aes_lock); + return 0; } -struct ae * connmgr_ae_create(struct conn_info info) +void connmgr_ae_fini(enum ae_id id) { - struct ae * ae; - - ae = malloc(sizeof(*ae)); - if (ae == NULL) - goto fail_malloc; - - list_head_init(&ae->next); - list_head_init(&ae->conn_list); - - ae->info = info; - - if (pthread_mutex_init(&ae->conn_lock, NULL)) - goto fail_mutex_init; - - if (pthread_cond_init(&ae->conn_cond, NULL)) - goto fail_cond_init; - - pthread_rwlock_wrlock(&connmgr.aes_lock); + struct list_head * p; + struct list_head * h; + struct ae * ae; - list_add(&ae->next, &connmgr.aes); + assert(id >= 0 && id < AEID_MAX); - pthread_rwlock_unlock(&connmgr.aes_lock); + if (strlen(connmgr.aes[id].info.ae_name) == 0) + return; - return ae; + ae = connmgr.aes + id; - fail_cond_init: - pthread_mutex_destroy(&ae->conn_lock); - fail_mutex_init: - free(ae); - fail_malloc: - return NULL; -} + pthread_mutex_lock(&ae->lock); -void connmgr_ae_destroy(struct ae * ae) -{ - assert(ae); + list_for_each_safe(p, h, &ae->conns) { + struct ae_conn * e = list_entry(p, struct ae_conn, next); + list_del(&e->next); + free(e); + } - pthread_rwlock_wrlock(&connmgr.aes_lock); + pthread_mutex_unlock(&ae->lock); - list_del(&ae->next); + pthread_cond_destroy(&ae->cond); + pthread_mutex_destroy(&ae->lock); - pthread_rwlock_unlock(&connmgr.aes_lock); + memset(&connmgr.aes[id].info, 0, sizeof(connmgr.aes[id].info)); - destroy_ae(ae); + connmgr.aes[id].nbs = NULL; } -int connmgr_alloc(struct ae * ae, - const char * dst_name, +int connmgr_alloc(enum ae_id id, + const char * dst, qosspec_t * qs, struct conn * conn) { - assert(ae); - assert(dst_name); - assert(conn); + assert(id >= 0 && id < AEID_MAX); + assert(dst); - memset(&conn->conn_info, 0, sizeof(conn->conn_info)); - - conn->flow_info.fd = flow_alloc(dst_name, qs, NULL); + conn->flow_info.fd = flow_alloc(dst, qs, NULL); if (conn->flow_info.fd < 0) { - log_err("Failed to allocate flow to %s.", dst_name); + log_dbg("Failed to allocate flow to %s.", dst); return -1; } @@ -320,49 +275,81 @@ int connmgr_alloc(struct ae * ae, else memset(&conn->flow_info.qs, 0, sizeof(conn->flow_info.qs)); - if (cacep_snd(conn->flow_info.fd, &ae->info)) { - log_err("Failed to create application connection."); + log_dbg("Sending cacep info for protocol %s to fd %d.", + connmgr.aes[id].info.protocol, conn->flow_info.fd); + + if (cacep_snd(conn->flow_info.fd, &connmgr.aes[id].info)) { + log_dbg("Failed to create application connection."); flow_dealloc(conn->flow_info.fd); return -1; } if (cacep_rcv(conn->flow_info.fd, &conn->conn_info)) { - log_err("Failed to connect to application."); + log_dbg("Failed to connect to application."); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (strcmp(connmgr.aes[id].info.protocol, conn->conn_info.protocol)) { + log_dbg("Unknown protocol (requested %s, got %s).", + connmgr.aes[id].info.protocol, + conn->conn_info.protocol); + flow_dealloc(conn->flow_info.fd); + return -1; + } + + if (connmgr.aes[id].info.pref_version != conn->conn_info.pref_version) { + log_dbg("Unknown protocol version."); flow_dealloc(conn->flow_info.fd); return -1; } - if (strcmp(ae->info.protocol, conn->conn_info.protocol) || - ae->info.pref_version != conn->conn_info.pref_version || - ae->info.pref_syntax != conn->conn_info.pref_syntax) { + if (connmgr.aes[id].info.pref_syntax != conn->conn_info.pref_syntax) { + log_dbg("Unknown protocol syntax."); flow_dealloc(conn->flow_info.fd); return -1; } + if (connmgr.aes[id].nbs != NULL) + nbs_add(connmgr.aes[id].nbs, *conn); + return 0; } -int connmgr_wait(struct ae * ae, +int connmgr_dealloc(enum ae_id id, + struct conn * conn) +{ + if (connmgr.aes[id].nbs != NULL) + nbs_del(connmgr.aes[id].nbs, conn->flow_info.fd); + + return flow_dealloc(conn->flow_info.fd); +} + + +int connmgr_wait(enum ae_id id, struct conn * conn) { struct ae_conn * ae_conn; + struct ae * ae; - assert(ae); + assert(id >= 0 && id < AEID_MAX); assert(conn); - pthread_mutex_lock(&ae->conn_lock); + ae = connmgr.aes + id; + + pthread_mutex_lock(&ae->lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &ae->conn_lock); + (void *) &ae->lock); - while (list_is_empty(&ae->conn_list)) - pthread_cond_wait(&ae->conn_cond, &ae->conn_lock); + while (list_is_empty(&ae->conns)) + pthread_cond_wait(&ae->cond, &ae->lock); pthread_cleanup_pop(false); - ae_conn = list_first_entry((&ae->conn_list), struct ae_conn, next); + ae_conn = list_first_entry((&ae->conns), struct ae_conn, next); if (ae_conn == NULL) { - pthread_mutex_unlock(&ae->conn_lock); + pthread_mutex_unlock(&ae->lock); return -1; } @@ -371,7 +358,7 @@ int connmgr_wait(struct ae * ae, list_del(&ae_conn->next); free(ae_conn); - pthread_mutex_unlock(&ae->conn_lock); + pthread_mutex_unlock(&ae->lock); return 0; } |