diff options
Diffstat (limited to 'src/ipcpd/normal/gam.c')
-rw-r--r-- | src/ipcpd/normal/gam.c | 296 |
1 files changed, 17 insertions, 279 deletions
diff --git a/src/ipcpd/normal/gam.c b/src/ipcpd/normal/gam.c index 212cfd83..cb4e662f 100644 --- a/src/ipcpd/normal/gam.c +++ b/src/ipcpd/normal/gam.c @@ -1,7 +1,7 @@ /* * Ouroboros - Copyright (C) 2016 - 2017 * - * Graph adjacency manager for IPC Process components + * Data transfer graph adjacency manager * * Dimitri Staessens <[email protected]> * Sander Vrijders <[email protected]> @@ -20,7 +20,7 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "graph-adjacency-manager" +#define OUROBOROS_PREFIX "dt-gam" #include <ouroboros/config.h> #include <ouroboros/cdap.h> @@ -40,305 +40,43 @@ #include <pthread.h> #include <string.h> -struct ga { - struct list_head next; - - qosspec_t qs; - int fd; - struct conn_info * info; -}; - struct gam { - struct list_head gas; - pthread_mutex_t gas_lock; - pthread_cond_t gas_cond; - struct pol_gam_ops * ops; void * ops_o; }; -struct gam * gam_create(enum pol_gam gam_type) +struct gam * gam_create(enum pol_gam gam_type, + struct nbs * nbs, + struct ae * ae) { - struct gam * tmp; + struct gam * gam; - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) + gam = malloc(sizeof(*gam)); + if (gam == NULL) return NULL; switch (gam_type) { case COMPLETE: - tmp->ops = &complete_ops; + gam->ops = &complete_ops; break; default: log_err("Unknown gam policy: %d.", gam_type); - free(tmp); - return NULL; - } - - list_head_init(&tmp->gas); - - if (pthread_mutex_init(&tmp->gas_lock, NULL)) { - free(tmp); - return NULL; - } - - if (pthread_cond_init(&tmp->gas_cond, NULL)) { - pthread_mutex_destroy(&tmp->gas_lock); - free(tmp); return NULL; } - tmp->ops_o = tmp->ops->create(tmp); - if (tmp->ops_o == NULL) { - pthread_cond_destroy(&tmp->gas_cond); - pthread_mutex_destroy(&tmp->gas_lock); - free(tmp); + gam->ops_o = gam->ops->create(nbs, ae); + if (gam->ops_o == NULL) { + free(gam); return NULL; } - if (tmp->ops->start(tmp->ops_o)) { - pthread_cond_destroy(&tmp->gas_cond); - pthread_mutex_destroy(&tmp->gas_lock); - free(tmp); - return NULL; - } - - return tmp; -} - -void gam_destroy(struct gam * instance) -{ - struct list_head * p = NULL; - struct list_head * n = NULL; - - assert(instance); - - instance->ops->stop(instance->ops_o); - - pthread_mutex_lock(&instance->gas_lock); - - list_for_each_safe(p, n, &instance->gas) { - struct ga * e = list_entry(p, struct ga, next); - list_del(&e->next); - free(e->info); - free(e); - } - - pthread_mutex_unlock(&instance->gas_lock); - - pthread_mutex_destroy(&instance->gas_lock); - pthread_cond_destroy(&instance->gas_cond); - - instance->ops->destroy(instance->ops_o); - free(instance); -} - -static int add_ga(struct gam * instance, - int fd, - qosspec_t qs, - struct conn_info * info) -{ - struct ga * ga; - - ga = malloc(sizeof(*ga)); - if (ga == NULL) - return -ENOMEM; - - ga->fd = fd; - ga->info = info; - ga->qs = qs; - - list_head_init(&ga->next); - - pthread_mutex_lock(&instance->gas_lock); - list_add(&ga->next, &instance->gas); - pthread_cond_signal(&instance->gas_cond); - pthread_mutex_unlock(&instance->gas_lock); - - log_info("Added flow."); - - return 0; -} - -int gam_flow_arr(struct gam * instance, - int fd, - qosspec_t qs) -{ - struct conn_info * rcv_info; - struct conn_info snd_info; - - if (flow_alloc_resp(fd, instance->ops->accept_new_flow(instance->ops_o)) - < 0) { - log_err("Could not respond to new flow."); - return -1; - } - - rcv_info = malloc(sizeof(*rcv_info)); - if (rcv_info == NULL) - return -ENOMEM; - - memset(&snd_info, 0, sizeof(snd_info)); - memset(rcv_info, 0, sizeof(*rcv_info)); - - /* FIXME: send correct AE */ - strcpy(snd_info.ae_name, "FIXME:CORRECT_AE"); - strcpy(snd_info.protocol, CDAP_PROTO); - snd_info.pref_version = 1; - snd_info.pref_syntax = PROTO_GPB; - snd_info.addr = ipcpi.dt_addr; - - if (cacep_rcv(fd, rcv_info)) { - log_err("Error establishing application connection."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (cacep_snd(fd, &snd_info)) { - log_err("Failed to respond to application connection request."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (strcmp(snd_info.ae_name, rcv_info->ae_name)) { - log_err("Received connection for wrong AE."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (strcmp(snd_info.protocol, rcv_info->protocol) || - snd_info.pref_version != rcv_info->pref_version || - snd_info.pref_syntax != rcv_info->pref_syntax) { - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) { - flow_dealloc(fd); - free(rcv_info); - return 0; - } - - if (add_ga(instance, fd, qs, rcv_info)) { - log_err("Failed to add ga to graph adjacency manager list."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - return 0; + return gam; } -int gam_flow_alloc(struct gam * instance, - char * dst_name, - qosspec_t qs) +void gam_destroy(struct gam * gam) { - struct conn_info * rcv_info; - struct conn_info snd_info; - int fd; - - log_dbg("Allocating flow to %s.", dst_name); - - rcv_info = malloc(sizeof(*rcv_info)); - if (rcv_info == NULL) - return -ENOMEM; - - fd = flow_alloc(dst_name, NULL); - if (fd < 0) { - log_err("Failed to allocate flow to %s.", dst_name); - return -1; - } - - if (flow_alloc_res(fd)) { - log_err("Flow allocation to %s failed.", dst_name); - flow_dealloc(fd); - return -1; - } - - memset(&snd_info, 0, sizeof(snd_info)); - memset(rcv_info, 0, sizeof(*rcv_info)); - - /* FIXME: send correct AE */ - strcpy(snd_info.ae_name, "FIXME:CORRECT_AE"); - strcpy(snd_info.protocol, CDAP_PROTO); - snd_info.pref_version = 1; - snd_info.pref_syntax = PROTO_GPB; - snd_info.addr = ipcpi.dt_addr; - - if (cacep_snd(fd, &snd_info)) { - log_err("Failed to create application connection."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (cacep_rcv(fd, rcv_info)) { - log_err("Failed to connect to application."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (strcmp(snd_info.protocol, rcv_info->protocol) || - snd_info.pref_version != rcv_info->pref_version || - snd_info.pref_syntax != rcv_info->pref_syntax) { - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - if (instance->ops->accept_flow(instance->ops_o, qs, rcv_info)) { - flow_dealloc(fd); - free(rcv_info); - return 0; - } - - if (add_ga(instance, fd, qs, rcv_info)) { - log_err("Failed to add GA to graph adjacency manager list."); - flow_dealloc(fd); - free(rcv_info); - return -1; - } - - return 0; -} - -int gam_flow_wait(struct gam * instance, - int * fd, - struct conn_info ** info, - qosspec_t * qs) -{ - struct ga * ga; - - assert(fd); - assert(info); - assert(qs); - - pthread_mutex_lock(&instance->gas_lock); - - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) &instance->gas_lock); - - while (list_is_empty(&instance->gas)) - pthread_cond_wait(&instance->gas_cond, &instance->gas_lock); - - ga = list_first_entry((&instance->gas), struct ga, next); - if (ga == NULL) { - pthread_mutex_unlock(&instance->gas_lock); - return -1; - } - - *fd = ga->fd; - *info = ga->info; - *qs = ga->qs; - - list_del(&ga->next); - free(ga); - - pthread_cleanup_pop(true); + assert(gam); - return 0; + gam->ops->destroy(gam->ops_o); + free(gam); } |