/* * Ouroboros - Copyright (C) 2016 * * The IPC Resource Manager * * Sander Vrijders <sander.vrijders@intec.ugent.be> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #define OUROBOROS_PREFIX "irmd" #include <ouroboros/config.h> #include <ouroboros/logs.h> #include <ouroboros/sockets.h> #include <ouroboros/irm.h> #include <ouroboros/ipcp.h> #include <ouroboros/da.h> #include <ouroboros/list.h> #include <ouroboros/instance_name.h> #include <ouroboros/utils.h> #include <ouroboros/dif_config.h> #include <ouroboros/shm_du_map.h> #include <ouroboros/bitmap.h> #include <ouroboros/flow.h> #include <ouroboros/qos.h> #include <sys/socket.h> #include <sys/un.h> #include <signal.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <limits.h> #include <pthread.h> /* FIXME: this smells like part of namespace management */ #define ALL_DIFS "*" #ifndef IRMD_MAX_FLOWS #define IRMD_MAX_FLOWS 4096 #endif #ifndef IRMD_THREADPOOL_SIZE #define IRMD_THREADPOOL_SIZE 3 #endif struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; }; /* currently supports only registering whatevercast groups of a single AP-I */ struct reg_name_entry { struct list_head next; /* generic whatevercast name */ char * name; /* FIXME: make a list resolve to AP-I instead */ instance_name_t * api; bool accept; char * req_ap_name; char * req_ae_name; int response; int flow_arrived; pthread_cond_t acc_signal; pthread_mutex_t acc_lock; }; /* keeps track of port_id's between N and N - 1 */ struct port_map_entry { struct list_head next; int port_id; pid_t n_pid; pid_t n_1_pid; pthread_cond_t res_signal; pthread_mutex_t res_lock; enum flow_state state; }; struct irm { /* FIXME: list of ipcps could be merged with registered names */ struct list_head ipcps; struct list_head reg_names; int sockfd; /* keep track of all flows in this processing system */ struct bmp * port_ids; /* maps port_ids to pid pair */ struct list_head port_map; struct shm_du_map * dum; pthread_t * threadpool; pthread_mutex_t r_lock; bool shutdown; pthread_mutex_t s_lock; } * instance = NULL; static struct port_map_entry * port_map_entry_create() { struct port_map_entry * e = malloc(sizeof(*e)); if (e == NULL) return NULL; e->n_pid = 0; e->n_1_pid = 0; e->port_id = 0; e->state = FLOW_NULL; if (pthread_cond_init(&e->res_signal, NULL)) { free(e); return NULL; } if (pthread_mutex_init(&e->res_lock, NULL)) { free(e); return NULL; } return e; } static struct port_map_entry * get_port_map_entry(int port_id) { struct list_head * pos = NULL; list_for_each(pos, &instance->port_map) { struct port_map_entry * e = list_entry(pos, struct port_map_entry, next); if (e->port_id == port_id) return e; } return NULL; } static struct port_map_entry * get_port_map_entry_n(pid_t n_pid) { struct list_head * pos = NULL; list_for_each(pos, &instance->port_map) { struct port_map_entry * e = list_entry(pos, struct port_map_entry, next); if (e->n_pid == n_pid) return e; } return NULL; } static struct ipcp_entry * ipcp_entry_create() { struct ipcp_entry * e = malloc(sizeof(*e)); if (e == NULL) return NULL; e->api = NULL; e->dif_name = NULL; INIT_LIST_HEAD(&e->next); return e; } static void ipcp_entry_destroy(struct ipcp_entry * e) { if (e == NULL) return; if (e->api != NULL) instance_name_destroy(e->api); if (e->dif_name != NULL) free(e->dif_name); free(e); } static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api) { struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { struct ipcp_entry * tmp = list_entry(pos, struct ipcp_entry, next); if (instance_name_cmp(api, tmp->api) == 0) return tmp; } return NULL; } static instance_name_t * get_ipcp_by_name(char * ap_name) { struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (strcmp(e->api->name, ap_name) == 0) return e->api; } return NULL; } static instance_name_t * get_ipcp_by_dif_name(char * dif_name) { struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (e->dif_name == NULL) continue; if (strcmp(dif_name, e->dif_name) == 0) return e->api; } return NULL; } /* FIXME: this just returns the first IPCP for now */ static instance_name_t * get_ipcp_by_dst_name(char * dst_name) { struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); return e->api; } return NULL; } static struct reg_name_entry * reg_name_entry_create() { struct reg_name_entry * e = malloc(sizeof(*e)); if (e == NULL) return NULL; e->name = NULL; e->api = NULL; e->accept = false; e->req_ap_name = NULL; e->req_ae_name = NULL; e->flow_arrived = -1; if (pthread_cond_init(&e->acc_signal, NULL)) { free(e); return NULL; } if (pthread_mutex_init(&e->acc_lock, NULL)) { free(e); return NULL; } INIT_LIST_HEAD(&e->next); return e; } static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e, char * name, instance_name_t * api) { if (e == NULL || name == NULL || api == NULL) return NULL; e->name = name; e->api = api; return e; } static int reg_name_entry_destroy(struct reg_name_entry * e) { if (e == NULL) return 0; if (e->accept) { pthread_mutex_lock(&e->acc_lock); e->flow_arrived = -2; pthread_mutex_unlock(&e->acc_lock); pthread_cond_broadcast(&e->acc_signal); sched_yield(); } free(e->name); instance_name_destroy(e->api); if (e->req_ap_name != NULL) free(e->req_ap_name); if (e->req_ae_name != NULL) free(e->req_ae_name); free(e); e = NULL; return 0; } static struct reg_name_entry * get_reg_name_entry_by_name(char * name) { struct list_head * pos = NULL; list_for_each(pos, &instance->reg_names) { struct reg_name_entry * e = list_entry(pos, struct reg_name_entry, next); if (strcmp(name, e->name) == 0) return e; } return NULL; } static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid) { struct list_head * pos = NULL; list_for_each(pos, &instance->reg_names) { struct reg_name_entry * e = list_entry(pos, struct reg_name_entry, next); if (e->api->id == pid) return e; } return NULL; } /* FIXME: add only name when we have NSM solved */ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) { struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) { e = reg_name_entry_create(); if (e == NULL) return -1; if (reg_name_entry_init(e, name, api) == NULL) { reg_name_entry_destroy(e); return -1; } list_add(&e->next, &instance->reg_names); return 0; } /* already exists, we don't have NSM yet */ return -1; } static int reg_name_entry_del_name(char * name) { struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) return 0; list_del(&e->next); reg_name_entry_destroy(e); return 0; } static pid_t create_ipcp(char * ap_name, enum ipcp_type ipcp_type) { pid_t pid; struct ipcp_entry * tmp = NULL; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { LOG_ERR("Failed to create IPCP."); return -1; } tmp = ipcp_entry_create(); if (tmp == NULL) return -1; INIT_LIST_HEAD(&tmp->next); tmp->api = instance_name_create(); if (tmp->api == NULL) { ipcp_entry_destroy(tmp); return -1; } if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) { instance_name_destroy(tmp->api); ipcp_entry_destroy(tmp); return -1; } tmp->dif_name = NULL; pthread_mutex_lock(&instance->r_lock); list_add(&tmp->next, &instance->ipcps); pthread_mutex_unlock(&instance->r_lock); LOG_INFO("Created IPCP %s-%d.", ap_name, pid); return pid; } static int destroy_ipcp(instance_name_t * api) { struct list_head * pos = NULL; struct list_head * n = NULL; pid_t pid = 0; if (api == NULL) return 0; if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { LOG_ERR("No such IPCP in the system."); return 0; } pid = api->id; if (ipcp_destroy(api->id)) LOG_ERR("Could not destroy IPCP."); list_for_each_safe(pos, n, &(instance->ipcps)) { struct ipcp_entry * tmp = list_entry(pos, struct ipcp_entry, next); if (instance_name_cmp(api, tmp->api) == 0) list_del(&tmp->next); ipcp_entry_destroy(tmp); } LOG_INFO("Destroyed IPCP %d.", pid); return 0; } static int bootstrap_ipcp(instance_name_t * api, dif_config_msg_t * conf) { struct ipcp_entry * entry = NULL; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); pthread_mutex_lock(&instance->r_lock); if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP in the system."); return -1; } entry = get_ipcp_entry_by_name(api); if (entry == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } pthread_mutex_unlock(&instance->r_lock); LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", api->name, api->id, conf->dif_name); return 0; } static int enroll_ipcp(instance_name_t * api, char * dif_name) { char * member = NULL; char ** n_1_difs = NULL; ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); pthread_mutex_lock(&instance->r_lock); entry = get_ipcp_entry_by_name(api); if (entry == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Failed to strdup."); return -1; } member = da_resolve_daf(dif_name); if (member == NULL) { LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; pthread_mutex_unlock(&instance->r_lock); return -1; } n_1_difs_size = da_resolve_dap(member, n_1_difs); if (n_1_difs_size < 1) { LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; pthread_mutex_unlock(&instance->r_lock); return -1; } pthread_mutex_unlock(&instance->r_lock); if (ipcp_enroll(api->id, member, n_1_difs[0])) { LOG_ERR("Could not enroll IPCP."); pthread_mutex_lock(&instance->r_lock); free(entry->dif_name); entry->dif_name = NULL; pthread_mutex_unlock(&instance->r_lock); return -1; } LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", api->name, api->id, dif_name); return 0; } static int reg_ipcp(instance_name_t * api, char ** difs, size_t difs_size) { pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); if (ipcp_reg(api->id, difs, difs_size)) { LOG_ERR("Could not register IPCP to N-1 DIF(s)."); return -1; } return 0; } static int unreg_ipcp(instance_name_t * api, char ** difs, size_t difs_size) { if (ipcp_unreg(api->id, difs, difs_size)) { LOG_ERR("Could not unregister IPCP from N-1 DIF(s)."); return -1; } return 0; } static int ap_reg(char * ap_name, pid_t ap_id, char ** difs, size_t len) { int i; int ret = 0; struct list_head * pos = NULL; struct reg_name_entry * rne = NULL; instance_name_t * api = NULL; instance_name_t * ipcpi = NULL; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); pthread_mutex_lock(&instance->r_lock); if (instance->ipcps.next == NULL) { pthread_mutex_unlock(&instance->r_lock); return -1; } api = instance_name_create(); if (api == NULL) { pthread_mutex_unlock(&instance->r_lock); return -1; } if (instance_name_init_from(api, ap_name, ap_id) == NULL) { pthread_mutex_unlock(&instance->r_lock); instance_name_destroy(api); return -1; } /* check if this ap_name is already registered */ rne = get_reg_name_entry_by_name(ap_name); if (rne != NULL) { instance_name_destroy(api); pthread_mutex_unlock(&instance->r_lock); return -1; /* can only register one instance for now */ } /* * for now, the whatevercast name is the same as the ap_name and * contains a single instance only */ if (strcmp(difs[0], ALL_DIFS) == 0) { list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (ipcp_name_reg(e->api->id, ap_name)) { LOG_ERR("Could not register %s in DIF %s.", api->name, e->dif_name); } else { ++ret; } } } else { for (i = 0; i < len; ++i) { ipcpi = get_ipcp_by_dif_name(difs[i]); if (ipcpi == NULL) { LOG_ERR("%s: No such DIF.", difs[i]); continue; } if (ipcp_name_reg(ipcpi->id, api->name)) { LOG_ERR("Could not register %s in DIF %s.", api->name, difs[i]); } else { ++ret; } } } if (ret == 0) { instance_name_destroy(api); pthread_mutex_unlock(&instance->r_lock); return -1; } /* for now, we register single instances */ ret = reg_name_entry_add_name_instance(strdup(ap_name), api); pthread_mutex_unlock(&instance->r_lock); return ret; } static int ap_unreg(char * ap_name, pid_t ap_id, char ** difs, size_t len) { int i; int ret = 0; struct reg_name_entry * rne = NULL; struct list_head * pos = NULL; pthread_mutex_lock(&instance->r_lock); /* check if ap_name is registered */ rne = get_reg_name_entry_by_id(ap_id); if (rne == NULL) { pthread_mutex_unlock(&instance->r_lock); return 0; /* no such id */ } if (strcmp(ap_name, rne->api->name)) { pthread_mutex_unlock(&instance->r_lock); return 0; } if (instance->ipcps.next == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No IPCPs in this system."); return 0; } if (strcmp(difs[0], ALL_DIFS) == 0) { list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = list_entry(pos, struct ipcp_entry, next); if (ipcp_name_unreg(e->api->id, rne->name)) { LOG_ERR("Could not unregister %s in DIF %s.", rne->name, e->dif_name); --ret; } } } else { for (i = 0; i < len; ++i) { if (ipcp_name_unreg(ap_id, rne->name)) { LOG_ERR("Could not unregister %s in DIF %s.", rne->name, difs[i]); --ret; } } } /* FIXME: check if name is not registered in any DIF before removing */ reg_name_entry_del_name(rne->name); pthread_mutex_unlock(&instance->r_lock); return ret; } static struct port_map_entry * flow_accept(pid_t pid, char ** ap_name, char ** ae_name) { struct port_map_entry * pme; struct reg_name_entry * rne = NULL; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return NULL; } pthread_mutex_unlock(&instance->s_lock); pthread_mutex_lock(&instance->r_lock); rne = get_reg_name_entry_by_id(pid); if (rne == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("Unregistered AP calling accept()."); return NULL; } if (rne->accept) { pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("This AP still has a pending accept()."); return NULL; } rne->accept = true; rne->flow_arrived = -1; pthread_mutex_unlock(&instance->r_lock); pthread_mutex_lock(&rne->acc_lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void*) &rne->acc_lock); while (rne->flow_arrived == -1) pthread_cond_wait(&rne->acc_signal, &rne->acc_lock); pthread_mutex_unlock(&rne->acc_lock); pthread_cleanup_pop(0); pthread_mutex_lock(&rne->acc_lock); /* ap with pending accept being unregistered */ if (rne->flow_arrived == -2 ) { pthread_mutex_unlock(&rne->acc_lock); return NULL; } pthread_mutex_unlock(&rne->acc_lock); pthread_mutex_lock(&instance->r_lock); pme = get_port_map_entry_n(pid); if (pme == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Port_id was not created yet."); return NULL; } *ap_name = rne->req_ap_name; if (ae_name != NULL) *ae_name = rne->req_ae_name; pthread_mutex_unlock(&instance->r_lock); return pme; } static int flow_alloc_resp(pid_t n_pid, int port_id, int response) { struct port_map_entry * pme = NULL; struct reg_name_entry * rne = NULL; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); pthread_mutex_lock(&instance->r_lock); rne = get_reg_name_entry_by_id(n_pid); if (rne == NULL) { pthread_mutex_unlock(&instance->r_lock); return -1; } /* FIXME: check all instances associated with the name */ if (!rne->accept) { pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No process listening for this name."); return -1; } /* * consider the flow as handled * once we can handle a list of AP-I's, remove it from the list */ rne->accept = false; rne->flow_arrived = -1; if (!response) { pme = get_port_map_entry(port_id); pme->state = FLOW_ALLOCATED; } pthread_mutex_unlock(&instance->r_lock); return ipcp_flow_alloc_resp(pme->n_1_pid, port_id, pme->n_pid, response); } static struct port_map_entry * flow_alloc(pid_t pid, char * dst_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos) { struct port_map_entry * pme; instance_name_t * ipcp; /* FIXME: Map qos_spec to qos_cube */ pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return NULL; } pthread_mutex_unlock(&instance->s_lock); pme = port_map_entry_create(); if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); return NULL; } pme->n_pid = pid; pme->state = FLOW_PENDING; pthread_mutex_lock(&instance->r_lock); pme->port_id = bmp_allocate(instance->port_ids); pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; list_add(&pme->next, &instance->port_map); ipcp = get_ipcp_by_dst_name(dst_name); pthread_mutex_unlock(&instance->r_lock); if (ipcp == NULL) { LOG_DBG("unknown ipcp"); return NULL; } if (ipcp_flow_alloc(ipcp->id, pme->port_id, pme->n_pid, dst_name, src_ap_name, src_ae_name, QOS_CUBE_BE) < 0) { pthread_mutex_lock(&instance->r_lock); list_del(&pme->next); pthread_mutex_unlock(&instance->r_lock); bmp_release(instance->port_ids, pme->port_id); free(pme); return NULL; } return pme; } static int flow_alloc_res(int port_id) { struct port_map_entry * e; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return -1; } pthread_mutex_unlock(&instance->s_lock); pthread_mutex_lock(&instance->r_lock); e = get_port_map_entry(port_id); if (e == NULL) { pthread_mutex_unlock(&instance->r_lock); return -1; } if (e->state == FLOW_ALLOCATED) { pthread_mutex_unlock(&instance->r_lock); return 0; } pthread_mutex_unlock(&instance->r_lock); while (true) { pthread_mutex_lock(&e->res_lock); pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void*) &e->res_lock); pthread_cond_wait(&e->res_signal, &e->res_lock); pthread_mutex_unlock(&e->res_lock); pthread_cleanup_pop(0); pthread_mutex_lock(&instance->r_lock); e = get_port_map_entry(port_id); if (e == NULL) { pthread_mutex_unlock(&instance->r_lock); return -1; } if (e->state == FLOW_ALLOCATED) { pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("Returning 0."); return 0; } if (e->state == FLOW_NULL) { list_del(&e->next); pthread_mutex_unlock(&instance->r_lock); free(e); return -1; } /* still pending, spurious wake */ pthread_mutex_unlock(&instance->r_lock); } pthread_mutex_unlock(&instance->r_lock); return 0; } static int flow_dealloc(int port_id) { pid_t n_1_pid; struct port_map_entry * e = NULL; pthread_mutex_lock(&instance->r_lock); e = get_port_map_entry(port_id); if (e == NULL) { pthread_mutex_unlock(&instance->r_lock); return 0; } n_1_pid = e->n_1_pid; list_del(&e->next); bmp_release(instance->port_ids, port_id); pthread_mutex_unlock(&instance->r_lock); free(e); return ipcp_flow_dealloc(n_1_pid, port_id); } static struct port_map_entry * flow_req_arr(pid_t pid, char * dst_name, char * ap_name, char * ae_name) { struct reg_name_entry * rne; struct port_map_entry * pme; pthread_mutex_lock(&instance->s_lock); if (instance->shutdown) { pthread_mutex_unlock(&instance->s_lock); return NULL; } pthread_mutex_unlock(&instance->s_lock); pme = malloc(sizeof(*pme)); if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); return NULL; } pme->state = FLOW_PENDING; pme->n_1_pid = pid; pthread_mutex_lock(&instance->r_lock); pme->port_id = bmp_allocate(instance->port_ids); rne = get_reg_name_entry_by_name(dst_name); if (rne == NULL) { pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("Destination name %s unknown.", dst_name); free(pme); return NULL; } pme->n_pid = rne->api->id; rne->req_ap_name = strdup(ap_name); rne->req_ae_name = strdup(ae_name); list_add(&pme->next, &instance->port_map); pthread_mutex_lock(&rne->acc_lock); rne->flow_arrived = 0; if (pthread_cond_signal(&rne->acc_signal)) LOG_ERR("Failed to send signal."); pthread_mutex_unlock(&rne->acc_lock); pthread_mutex_unlock(&instance->r_lock); return pme; } static int flow_alloc_reply(int port_id, int response) { struct port_map_entry * e; pthread_mutex_lock(&instance->r_lock); e = get_port_map_entry(port_id); if (e == NULL) { pthread_mutex_unlock(&instance->r_lock); return -1; } pthread_mutex_lock(&e->res_lock); if (!response) e->state = FLOW_ALLOCATED; else e->state = FLOW_NULL; if (pthread_cond_signal(&e->res_signal)) LOG_ERR("Failed to send signal."); pthread_mutex_unlock(&e->res_lock); pthread_mutex_unlock(&instance->r_lock); return 0; } static int flow_dealloc_ipcp(int port_id) { struct port_map_entry * e = NULL; pthread_mutex_lock(&instance->r_lock); e = get_port_map_entry(port_id); if (e == NULL) { pthread_mutex_unlock(&instance->r_lock); return 0; } list_del(&e->next); pthread_mutex_unlock(&instance->r_lock); free(e); return 0; } static void irm_destroy(struct irm * irm) { struct list_head * h; struct list_head * t; if (irm == NULL) return; pthread_mutex_lock(&irm->s_lock); instance->shutdown = true; pthread_mutex_unlock(&irm->s_lock); if (irm->threadpool != NULL) free(irm->threadpool); if (irm->port_ids != NULL) bmp_destroy(irm->port_ids); /* clear the lists */ list_for_each_safe(h, t, &irm->ipcps) { struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); destroy_ipcp(e->api); } list_for_each_safe(h, t, &irm->reg_names) { struct reg_name_entry * e = list_entry(h, struct reg_name_entry, next); list_del(&e->next); reg_name_entry_destroy(e); } list_for_each_safe(h, t, &irm->port_map) { struct port_map_entry * e = list_entry(h, struct port_map_entry, next); list_del(&e->next); free(e); } if (irm->dum != NULL) shm_du_map_destroy(irm->dum); close(irm->sockfd); free(irm); } void irmd_sig_handler(int sig, siginfo_t * info, void * c) { int i; switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: if (instance->threadpool != NULL) { for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) pthread_cancel(instance->threadpool[i]); } case SIGPIPE: LOG_DBG("Ignoring SIGPIPE."); default: return; } } void * mainloop() { uint8_t buf[IRM_MSG_BUF_SIZE]; while (true) { int cli_sockfd; irm_msg_t * msg; ssize_t count; instance_name_t api; buffer_t buffer; irm_msg_t ret_msg = IRM_MSG__INIT; struct port_map_entry * e = NULL; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; cli_sockfd = accept(instance->sockfd, 0, 0); if (cli_sockfd < 0) { LOG_ERR("Cannot accept new connection."); continue; } count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); if (count <= 0) { LOG_ERR("Failed to read from socket."); close(cli_sockfd); continue; } msg = irm_msg__unpack(NULL, count, buf); if (msg == NULL) { close(cli_sockfd); continue; } api.name = msg->ap_name; if (msg->has_api_id == true) api.id = msg->api_id; switch (msg->code) { case IRM_MSG_CODE__IRM_CREATE_IPCP: ret_msg.has_result = true; ret_msg.result = create_ipcp(msg->ap_name, msg->ipcp_type); break; case IRM_MSG_CODE__IRM_DESTROY_IPCP: ret_msg.has_result = true; ret_msg.result = destroy_ipcp(&api); break; case IRM_MSG_CODE__IRM_BOOTSTRAP_IPCP: ret_msg.has_result = true; ret_msg.result = bootstrap_ipcp(&api, msg->conf); break; case IRM_MSG_CODE__IRM_ENROLL_IPCP: ret_msg.has_result = true; ret_msg.result = enroll_ipcp(&api, msg->dif_name[0]); break; case IRM_MSG_CODE__IRM_REG_IPCP: ret_msg.has_result = true; ret_msg.result = reg_ipcp(&api, msg->dif_name, msg->n_dif_name); break; case IRM_MSG_CODE__IRM_UNREG_IPCP: ret_msg.has_result = true; ret_msg.result = unreg_ipcp(&api, msg->dif_name, msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_REG: ret_msg.has_result = true; ret_msg.result = ap_reg(msg->ap_name, msg->pid, msg->dif_name, msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_UNREG: ret_msg.has_result = true; ret_msg.result = ap_unreg(msg->ap_name, msg->pid, msg->dif_name, msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: e = flow_accept(msg->pid, &ret_msg.ap_name, &ret_msg.ae_name); if (e == NULL) break; ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_pid = true; ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: ret_msg.has_result = true; ret_msg.result = flow_alloc_resp(msg->pid, msg->port_id, msg->response); break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: e = flow_alloc(msg->pid, msg->dst_name, msg->ap_name, msg->ae_name, NULL); if (e == NULL) break; ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_pid = true; ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: ret_msg.has_result = true; ret_msg.result = flow_alloc_res(msg->port_id); break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; ret_msg.result = flow_dealloc(msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->pid, msg->dst_name, msg->ap_name, msg->ae_name); if (e == NULL) break; ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_pid = true; ret_msg.pid = e->n_pid; break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: ret_msg.has_result = true; ret_msg.result = flow_alloc_reply(msg->port_id, msg->response); break; case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: ret_msg.has_result = true; ret_msg.result = flow_dealloc_ipcp(msg->port_id); break; default: LOG_ERR("Don't know that message code."); break; } irm_msg__free_unpacked(msg, NULL); buffer.size = irm_msg__get_packed_size(&ret_msg); if (buffer.size == 0) { LOG_ERR("Failed to send reply message."); close(cli_sockfd); continue; } buffer.data = malloc(buffer.size); if (buffer.data == NULL) { close(cli_sockfd); continue; } irm_msg__pack(&ret_msg, buffer.data); if (write(cli_sockfd, buffer.data, buffer.size) == -1) { free(buffer.data); close(cli_sockfd); continue; } free(buffer.data); close(cli_sockfd); } } static struct irm * irm_create() { struct irm * i = malloc(sizeof(*i)); if (i == NULL) return NULL; if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) unlink("/dev/shm/" SHM_DU_MAP_FILENAME); i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); if (i->threadpool == NULL) { irm_destroy(i); return NULL; } if ((i->dum = shm_du_map_create()) == NULL) { irm_destroy(i); return NULL; } INIT_LIST_HEAD(&i->ipcps); INIT_LIST_HEAD(&i->reg_names); INIT_LIST_HEAD(&i->port_map); i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); if (i->port_ids == NULL) { irm_destroy(i); return NULL; } i->sockfd = server_socket_open(IRM_SOCK_PATH); if (i->sockfd < 0) { irm_destroy(i); return NULL; } if (pthread_mutex_init(&i->r_lock, NULL)) { irm_destroy(i); return NULL; } i->shutdown = false; if (pthread_mutex_init(&i->s_lock, NULL)) { irm_destroy(i); return NULL; } return i; } int main() { struct sigaction sig_act; int t = 0; /* init sig_act */ memset(&sig_act, 0, sizeof sig_act); /* install signal traps */ sig_act.sa_sigaction = &irmd_sig_handler; sig_act.sa_flags = SA_SIGINFO; if (sigaction(SIGINT, &sig_act, NULL) < 0) exit(1); if (sigaction(SIGTERM, &sig_act, NULL) < 0) exit(1); if (sigaction(SIGHUP, &sig_act, NULL) < 0) exit(1); if (sigaction(SIGPIPE, &sig_act, NULL) < 0) exit(1); instance = irm_create(); if (instance == NULL) return 1; /* * FIXME: we need a main loop that delegates messages to subthreads in a * way that avoids all possible deadlocks for local apps */ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); /* wait for (all of them) to return */ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) pthread_join(instance->threadpool[t], NULL); irm_destroy(instance); return 0; }