/*
 * Ouroboros - Copyright (C) 2016
 *
 * Local IPC process
 *
 *    Dimitri Staessens <dimitri.staessens@intec.ugent.be>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <ouroboros/config.h>
#include "ipcp.h"
#include "flow.h"
#include <ouroboros/shm_du_map.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/ipcp.h>
#include <ouroboros/dif_config.h>
#include <ouroboros/sockets.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/common.h>
#include <ouroboros/dev.h>

#define OUROBOROS_PREFIX "ipcpd/local"

#include <ouroboros/logs.h>

#include <errno.h>
#include <string.h>
#include <signal.h>
#include <stdbool.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>

#define THIS_TYPE IPCP_LOCAL

#define shim_data(type) ((struct ipcp_local_data *) (type)->data)

/* global for trapping signal */
int irmd_pid;

/* this IPCP's data */
#ifdef MAKE_CHECK
extern struct ipcp * _ipcp; /* defined in test */
#else
struct ipcp * _ipcp;
#endif

/*
 * copied from ouroboros/dev. The shim needs access to the internals
 * because it doesn't follow all necessary steps to get
 * the info
 */

/* the shim needs access to these internals */
struct shim_ap_data {
        instance_name_t *     api;
        struct shm_du_map *   dum;
        struct bmp *          fds;   /* allocator for indices into flows[] */
        struct shm_ap_rbuff * rb;    /* this IPCP's own incoming ring buffer */

        /* in_out[fd] is the fd of the other end of the flow, or -1 */
        int                   in_out[AP_MAX_FLOWS];

        struct flow           flows[AP_MAX_FLOWS];
        pthread_rwlock_t      flows_lock; /* guards fds, in_out and flows */

        pthread_t             mainloop;
        pthread_t             sduloop;

} * _ap_instance;

/*
 * Allocate and initialise the global _ap_instance for AP name ap_name.
 *
 * Returns 0 on success. On failure everything acquired so far is
 * released, _ap_instance is reset to NULL and -1 is returned.
 */
static int shim_ap_init(char * ap_name)
{
        int i;

        _ap_instance = malloc(sizeof(*_ap_instance));
        if (_ap_instance == NULL)
                return -1;

        _ap_instance->api = instance_name_create();
        if (_ap_instance->api == NULL)
                goto fail_api;

        if (instance_name_init_from(_ap_instance->api,
                                    ap_name,
                                    getpid()) == NULL)
                goto fail_fds;

        _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
        if (_ap_instance->fds == NULL)
                goto fail_fds;

        _ap_instance->dum = shm_du_map_open();
        if (_ap_instance->dum == NULL)
                goto fail_dum;

        _ap_instance->rb = shm_ap_rbuff_create();
        if (_ap_instance->rb == NULL)
                goto fail_rb;

        if (pthread_rwlock_init(&_ap_instance->flows_lock, NULL) != 0)
                goto fail_lock;

        /* mark every flow slot as unused */
        for (i = 0; i < AP_MAX_FLOWS; ++i) {
                _ap_instance->flows[i].rb      = NULL;
                _ap_instance->flows[i].port_id = -1;
                _ap_instance->flows[i].state   = FLOW_NULL;
                _ap_instance->in_out[i]        = -1;
        }

        return 0;

 fail_lock:
        shm_ap_rbuff_destroy(_ap_instance->rb);
 fail_rb:
        shm_du_map_close(_ap_instance->dum);
 fail_dum:
        bmp_destroy(_ap_instance->fds);
 fail_fds:
        instance_name_destroy(_ap_instance->api);
 fail_api:
        free(_ap_instance);
        _ap_instance = NULL;

        return -1;
}

/*
 * Tear down the global _ap_instance (no-op when it is NULL).
 *
 * Runs under a write lock on _ipcp->state_lock and warns when the IPCP
 * is not in the IPCP_SHUTDOWN state. Ring buffers still attached to
 * flows are closed before the instance itself is freed.
 */
void shim_ap_fini(void)
{
        int i = 0;

        if (_ap_instance == NULL)
                return;

        pthread_rwlock_wrlock(&_ipcp->state_lock);

        if (_ipcp->state != IPCP_SHUTDOWN)
                LOG_WARN("Cleaning up AP while not in shutdown.");

        if (_ap_instance->api != NULL)
                instance_name_destroy(_ap_instance->api);
        if (_ap_instance->fds != NULL)
                bmp_destroy(_ap_instance->fds);
        if (_ap_instance->dum != NULL)
                shm_du_map_close(_ap_instance->dum);
        if (_ap_instance->rb != NULL)
                shm_ap_rbuff_destroy(_ap_instance->rb);

        pthread_rwlock_wrlock(&_ap_instance->flows_lock);

        for (i = 0; i < AP_MAX_FLOWS; ++i)
                if (_ap_instance->flows[i].rb != NULL)
                        shm_ap_rbuff_close(_ap_instance->flows[i].rb);

        pthread_rwlock_unlock(&_ap_instance->flows_lock);
        pthread_rwlock_unlock(&_ipcp->state_lock);

        /* the lock lives inside the instance, release it before the free */
        pthread_rwlock_destroy(&_ap_instance->flows_lock);

        free(_ap_instance);
        _ap_instance = NULL;
}

/*
 * Map a port_id onto the index of its slot in flows[].
 * Returns -1 when no slot in a non-FLOW_NULL state carries port_id.
 *
 * only call this under flows_lock
 */
static int port_id_to_fd(int port_id)
{
        int i;

        for (i = 0; i < AP_MAX_FLOWS; ++i) {
                if (_ap_instance->flows[i].port_id == port_id
                    && _ap_instance->flows[i].state != FLOW_NULL)
                        return i;
        }

        return -1;
}

/*
 * end copy from dev.c
 */

/*
 * Return flow slot fd to the unused state: release its id in the
 * bitmap, unlink it from its peer in in_out[] and reset the slot.
 *
 * Returns the ring buffer that was attached to the slot (may be NULL);
 * the caller closes it.
 *
 * only call this under a write lock on flows_lock
 */
static struct shm_ap_rbuff * shim_flow_release(int fd)
{
        struct shm_ap_rbuff * rb;
        int                   peer;

        bmp_release(_ap_instance->fds, fd);

        peer = _ap_instance->in_out[fd];
        if (peer != -1)
                _ap_instance->in_out[peer] = -1;

        _ap_instance->in_out[fd]        = -1;
        _ap_instance->flows[fd].state   = FLOW_NULL;
        _ap_instance->flows[fd].port_id = -1;

        rb = _ap_instance->flows[fd].rb;
        _ap_instance->flows[fd].rb = NULL;

        return rb;
}

/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */

/*
 * Thread body: forward every entry read from this IPCP's ring buffer to
 * the ring buffer of the flow's other end, rewriting the port_id to the
 * peer's. Entries for which no deliverable peer exists are dropped.
 *
 * Returns (void *) 1 as soon as the IPCP is found not to be enrolled.
 */
static void * ipcp_local_sdu_loop(void * o)
{
        (void) o;

        while (true) {
                struct rb_entry * e;
                int               in_fd;
                int               fd;

                e = shm_ap_rbuff_read(_ap_instance->rb);
                if (e == NULL)
                        continue;

                pthread_rwlock_rdlock(&_ipcp->state_lock);

                if (_ipcp->state != IPCP_ENROLLED) {
                        pthread_rwlock_unlock(&_ipcp->state_lock);
                        /* not forwarded, so still ours to release */
                        free(e);
                        return (void *) 1; /* -ENOTENROLLED */
                }

                pthread_rwlock_rdlock(&_ap_instance->flows_lock);

                /* an unknown port_id must not be used as an array index */
                in_fd = port_id_to_fd(e->port_id);
                fd = in_fd < 0 ? -1 : _ap_instance->in_out[in_fd];

                /* a pending peer has no ring buffer attached yet */
                if (fd == -1 || _ap_instance->flows[fd].rb == NULL) {
                        pthread_rwlock_unlock(&_ap_instance->flows_lock);
                        pthread_rwlock_unlock(&_ipcp->state_lock);
                        free(e);
                        continue;
                }

                e->port_id = _ap_instance->flows[fd].port_id;

                /*
                 * NOTE(review): e is not released after a successful
                 * write; confirm whether shm_ap_rbuff_write() takes
                 * ownership of the entry or copies it.
                 */
                while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0)
                        ;

                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
        }

        return (void *) 1;
}

/*
 * Signal handler for SIGINT, SIGTERM, SIGHUP and SIGQUIT.
 *
 * Only signals sent by the IRMd (irmd_pid) are acted upon: the IPCP is
 * moved to IPCP_SHUTDOWN, the SDU loop is cancelled and joined when the
 * IPCP was enrolled, and the main loop is cancelled. Any other signal
 * or sender is ignored.
 *
 * NOTE(review): logging and rwlock operations are not on the POSIX list
 * of async-signal-safe functions.
 */
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
        (void) c;

        switch (sig) {
        case SIGINT:
        case SIGTERM:
        case SIGHUP:
        case SIGQUIT:
                if (info->si_pid == irmd_pid) {
                        bool clean_threads = false;

                        LOG_DBG("Terminating by order of %d. Bye.",
                                info->si_pid);

                        pthread_rwlock_wrlock(&_ipcp->state_lock);

                        /* the SDU loop only exists once bootstrapped */
                        if (_ipcp->state == IPCP_ENROLLED)
                                clean_threads = true;

                        _ipcp->state = IPCP_SHUTDOWN;

                        pthread_rwlock_unlock(&_ipcp->state_lock);

                        if (clean_threads) {
                                pthread_cancel(_ap_instance->sduloop);
                                pthread_join(_ap_instance->sduloop, NULL);
                        }

                        pthread_cancel(_ap_instance->mainloop);
                }
                break;
        default:
                break;
        }
}

/*
 * Bootstrap the IPCP: start the SDU loop and move from IPCP_INIT to
 * IPCP_ENROLLED.
 *
 * Returns 0 on success, -1 when conf is for another IPCP type, the
 * IPCP is not in IPCP_INIT, or the SDU loop thread cannot be started.
 */
static int ipcp_local_bootstrap(struct dif_config * conf)
{
        if (conf->type != THIS_TYPE) {
                LOG_ERR("Config doesn't match IPCP type.");
                return -1;
        }

        /* take the lock before looking at the state, not after */
        pthread_rwlock_wrlock(&_ipcp->state_lock);

        if (_ipcp->state != IPCP_INIT) {
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_ERR("IPCP in wrong state.");
                return -1;
        }

        if (pthread_create(&_ap_instance->sduloop,
                           NULL,
                           ipcp_local_sdu_loop,
                           NULL) != 0) {
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_ERR("Could not start SDU loop.");
                return -1;
        }

        _ipcp->state = IPCP_ENROLLED;

        pthread_rwlock_unlock(&_ipcp->state_lock);

        LOG_DBG("Bootstrapped local IPCP with pid %d.", getpid());

        return 0;
}

/*
 * Add name to the local registry.
 *
 * Returns 0 on success, -1 when the IPCP is not enrolled or the entry
 * cannot be added. The state lock is held from the state check through
 * the registry update.
 */
static int ipcp_local_name_reg(char * name)
{
        pthread_rwlock_rdlock(&_ipcp->state_lock);

        if (_ipcp->state != IPCP_ENROLLED) {
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Won't register with non-enrolled IPCP.");
                return -1; /* -ENOTENROLLED */
        }

        if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Failed to add %s to local registry.", name);
                return -1;
        }

        pthread_rwlock_unlock(&_ipcp->state_lock);

        LOG_DBG("Registered %s.", name);

        return 0;
}

/* Remove name from the local registry. Always returns 0. */
static int ipcp_local_name_unreg(char * name)
{
        pthread_rwlock_rdlock(&_ipcp->state_lock);

        ipcp_data_del_reg_entry(_ipcp->data, name);

        pthread_rwlock_unlock(&_ipcp->state_lock);

        return 0;
}

/*
 * Handle a flow allocation request from the process with pid n_pid.
 *
 * A slot is reserved for the requesting side (port_id, with n_pid's
 * ring buffer attached), the request is announced to the IRMd, and a
 * second slot is reserved for the port_id the IRMd hands back. Both
 * slots are left in FLOW_PENDING and linked through in_out[].
 *
 * Returns 0 on success, -EMFILE when no slot is free for the requesting
 * side and -1 on any other failure. On failure the ring buffer is
 * closed again and no slot stays reserved.
 */
static int ipcp_local_flow_alloc(pid_t         n_pid,
                                 int           port_id,
                                 char *        dst_name,
                                 char *        src_ae_name,
                                 enum qos_cube qos)
{
        int                   in_fd  = -1;
        int                   out_fd = -1;
        struct shm_ap_rbuff * rb;

        /* This ipcpd has all QoS */
        (void) qos;

        /* validate before dst_name is handed to the logger */
        if (dst_name == NULL || src_ae_name == NULL)
                return -1;

        LOG_INFO("Allocating flow to %s.", dst_name);

        pthread_rwlock_rdlock(&_ipcp->state_lock);

        if (_ipcp->state != IPCP_ENROLLED) {
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Won't allocate flow with non-enrolled IPCP.");
                return -1; /* -ENOTENROLLED */
        }

        rb = shm_ap_rbuff_open(n_pid);
        if (rb == NULL) {
                pthread_rwlock_unlock(&_ipcp->state_lock);
                return -1; /* -ENORBUFF */
        }

        pthread_rwlock_wrlock(&_ap_instance->flows_lock);

        in_fd = bmp_allocate(_ap_instance->fds);
        if (!bmp_is_id_valid(_ap_instance->fds, in_fd)) {
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                shm_ap_rbuff_close(rb);
                return -EMFILE;
        }

        _ap_instance->flows[in_fd].port_id = port_id;
        _ap_instance->flows[in_fd].state   = FLOW_PENDING;
        _ap_instance->flows[in_fd].rb      = rb;

        LOG_DBGF("Pending local flow with port_id %d.", port_id);

        /* reply to IRM */
        port_id = ipcp_flow_req_arr(getpid(),
                                    dst_name,
                                    src_ae_name);

        if (port_id < 0) {
                /* undo the reservation made for the requesting side */
                rb = shim_flow_release(in_fd);
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_ERR("Could not get port id from IRMd");
                if (rb != NULL)
                        shm_ap_rbuff_close(rb);
                return -1;
        }

        out_fd = bmp_allocate(_ap_instance->fds);
        if (!bmp_is_id_valid(_ap_instance->fds, out_fd)) {
                /* undo the reservation made for the requesting side */
                rb = shim_flow_release(in_fd);
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                if (rb != NULL)
                        shm_ap_rbuff_close(rb);
                return -1; /* -ENOMOREFDS */
        }

        /* the ring buffer of this side is attached on the response */
        _ap_instance->flows[out_fd].port_id = port_id;
        _ap_instance->flows[out_fd].rb      = NULL;
        _ap_instance->flows[out_fd].state   = FLOW_PENDING;

        _ap_instance->in_out[in_fd]  = out_fd;
        _ap_instance->in_out[out_fd] = in_fd;

        pthread_rwlock_unlock(&_ap_instance->flows_lock);
        pthread_rwlock_unlock(&_ipcp->state_lock);

        LOG_DBGF("Pending local allocation request, port_id %d.", port_id);

        return 0;
}

/*
 * Handle the response of the process with pid n_pid to the pending
 * flow with port_id.
 *
 * A non-zero response returns 0 without touching any state. Otherwise
 * both ends of the flow must be FLOW_PENDING; n_pid's ring buffer is
 * attached, both ends become FLOW_ALLOCATED and the reply is passed on
 * for the port_id of the other end.
 *
 * Returns the result of ipcp_flow_alloc_reply() on success, -1 on
 * failure.
 */
static int ipcp_local_flow_alloc_resp(pid_t n_pid,
                                      int   port_id,
                                      int   response)
{
        struct shm_ap_rbuff * rb;
        int                   in_fd       = -1;
        int                   out_fd      = -1;
        int                   out_port_id = -1;
        int                   ret         = -1;

        if (response)
                return 0;

        pthread_rwlock_rdlock(&_ipcp->state_lock);

        /* awaken pending flow */

        pthread_rwlock_wrlock(&_ap_instance->flows_lock);

        in_fd = port_id_to_fd(port_id);
        if (in_fd < 0) {
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Could not find flow with port_id %d.", port_id);
                return -1;
        }

        if (_ap_instance->flows[in_fd].state != FLOW_PENDING) {
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Flow was not pending.");
                return -1;
        }

        /* check the other end before anything is committed */
        out_fd = _ap_instance->in_out[in_fd];
        if (out_fd < 0) {
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("No pending local flow with port_id %d.", port_id);
                return -1;
        }

        if (_ap_instance->flows[out_fd].state != FLOW_PENDING) {
                /* FIXME: clean up other end */
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Flow was not pending.");
                return -1;
        }

        rb = shm_ap_rbuff_open(n_pid);
        if (rb == NULL) {
                LOG_ERR("Could not open N + 1 ringbuffer.");
                /* give the slot back and unlink it from its peer */
                rb = shim_flow_release(in_fd);
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                if (rb != NULL)
                        shm_ap_rbuff_close(rb);
                return -1;
        }

        _ap_instance->flows[in_fd].state = FLOW_ALLOCATED;
        _ap_instance->flows[in_fd].rb    = rb;

        LOG_DBGF("Accepted flow, port_id %d on fd %d.", port_id, in_fd);

        _ap_instance->flows[out_fd].state = FLOW_ALLOCATED;

        /* remember the peer's port_id, flows[] is off limits once unlocked */
        out_port_id = _ap_instance->flows[out_fd].port_id;

        pthread_rwlock_unlock(&_ap_instance->flows_lock);
        pthread_rwlock_unlock(&_ipcp->state_lock);

        ret = ipcp_flow_alloc_reply(getpid(), out_port_id, response);
        if (ret < 0)
                return -1; /* -EPIPE */

        LOG_INFO("Flow allocation completed, port_ids (%d, %d).",
                 out_port_id, port_id);

        return ret;
}

/*
 * Deallocate the flow with port_id: free its slot, unlink it from the
 * other end and close its ring buffer.
 *
 * Always returns 0, also when no flow with port_id is known.
 */
static int ipcp_local_flow_dealloc(int port_id)
{
        int                   fd = -1;
        struct shm_ap_rbuff * rb;

        pthread_rwlock_rdlock(&_ipcp->state_lock);
        pthread_rwlock_wrlock(&_ap_instance->flows_lock);

        fd = port_id_to_fd(port_id);
        if (fd < 0) {
                pthread_rwlock_unlock(&_ap_instance->flows_lock);
                pthread_rwlock_unlock(&_ipcp->state_lock);
                LOG_DBGF("Could not find flow with port_id %d.", port_id);
                return 0;
        }

        rb = shim_flow_release(fd);

        pthread_rwlock_unlock(&_ap_instance->flows_lock);

        /* closed outside flows_lock */
        if (rb != NULL)
                shm_ap_rbuff_close(rb);

        pthread_rwlock_unlock(&_ipcp->state_lock);

        LOG_DBGF("Flow with port_id %d deallocated.", port_id);

        return 0;
}

/*
 * Create the ipcp instance for the local IPCP, with its data and its
 * operations table, in state IPCP_INIT.
 *
 * Returns NULL when any step fails.
 */
static struct ipcp * ipcp_local_create(void)
{
        struct ipcp *     i;
        struct ipcp_ops * ops;

        i = ipcp_instance_create();
        if (i == NULL)
                return NULL;

        i->data = ipcp_data_create();
        if (i->data == NULL) {
                free(i);
                return NULL;
        }

        if (ipcp_data_init(i->data, THIS_TYPE) == NULL) {
                free(i->data);
                free(i);
                return NULL;
        }

        ops = malloc(sizeof(*ops));
        if (ops == NULL) {
                free(i->data);
                free(i);
                return NULL;
        }

        ops->ipcp_bootstrap       = ipcp_local_bootstrap;
        ops->ipcp_enroll          = NULL;                       /* shim */
        ops->ipcp_reg             = NULL;                       /* shim */
        ops->ipcp_unreg           = NULL;                       /* shim */
        ops->ipcp_name_reg        = ipcp_local_name_reg;
        ops->ipcp_name_unreg      = ipcp_local_name_unreg;
        ops->ipcp_flow_alloc      = ipcp_local_flow_alloc;
        ops->ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp;
        ops->ipcp_flow_dealloc    = ipcp_local_flow_dealloc;

        i->ops = ops;

        i->state = IPCP_INIT;

        return i;
}

#ifndef MAKE_CHECK

/*
 * Entry point: set up the AP instance, install the signal traps, create
 * the IPCP and run its main loop until that thread ends.
 */
int main (int argc, char * argv[])
{
        /* argument 1: pid of irmd ? */
        /* argument 2: ap name */
        struct sigaction sig_act;
        sigset_t         sigset;

        sigemptyset(&sigset);
        sigaddset(&sigset, SIGINT);
        sigaddset(&sigset, SIGQUIT);
        sigaddset(&sigset, SIGHUP);
        sigaddset(&sigset, SIGPIPE);

        if (ipcp_arg_check(argc, argv)) {
                LOG_ERR("Wrong arguments.");
                exit(1);
        }

        if (shim_ap_init(argv[2]) < 0)
                exit(1);

        /* store the process id of the irmd */
        irmd_pid = atoi(argv[1]);

        /* init sig_act */
        memset(&sig_act, 0, sizeof(sig_act));

        /* install signal traps */
        sig_act.sa_sigaction = &ipcp_sig_handler;
        sig_act.sa_flags     = SA_SIGINFO;

        sigaction(SIGINT,  &sig_act, NULL);
        sigaction(SIGTERM, &sig_act, NULL);
        sigaction(SIGHUP,  &sig_act, NULL);
        sigaction(SIGPIPE, &sig_act, NULL);

        _ipcp = ipcp_local_create();
        if (_ipcp == NULL) {
                LOG_ERR("Won't.");
                exit(1);
        }

        pthread_rwlock_wrlock(&_ipcp->state_lock);

        /* the main loop is created with the signals in sigset blocked */
        pthread_sigmask(SIG_BLOCK, &sigset, NULL);

        pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);

        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);

        pthread_rwlock_unlock(&_ipcp->state_lock);

        pthread_join(_ap_instance->mainloop, NULL);

        shim_ap_fini();

        free(_ipcp->data);
        free(_ipcp->ops);
        free(_ipcp);

        exit(EXIT_SUCCESS);
}

#endif /* MAKE_CHECK */