diff options
author | dimitri staessens <[email protected]> | 2016-05-14 16:39:27 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-05-14 21:34:04 +0200 |
commit | 037fec33cda726d0078e23798f462ad273153dd5 (patch) | |
tree | 25c9ef679a0aaa93e5f01f2a68512d8eaf76f3e7 /src | |
parent | c56a4ed3b865b4b240c6f01809c935b7b86d160b (diff) | |
download | ouroboros-037fec33cda726d0078e23798f462ad273153dd5.tar.gz ouroboros-037fec33cda726d0078e23798f462ad273153dd5.zip |
ipcpd: shim-udp: complete locking
Added necessary locks for the shim-udp. This PR also improves thread
management, the main thread now starts a mainloop thread, which spawns
sdu handler threads when it the IPCP is enrolled. If the IPCP exits
the enrolled state, the sdu loop is cancelled.
Diffstat (limited to 'src')
-rw-r--r-- | src/ipcpd/ipcp.c | 34 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 6 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 455 | ||||
-rw-r--r-- | src/lib/shm_ap_rbuff.c | 9 |
4 files changed, 390 insertions, 114 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 13632a80..060178bf 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -29,6 +29,22 @@ #define OUROBOROS_PREFIX "ipcpd/ipcp" #include <ouroboros/logs.h> +struct ipcp * ipcp_instance_create() +{ + struct ipcp * i = malloc(sizeof *i); + if (i == NULL) + return NULL; + + i->data = NULL; + i->ops = NULL; + i->irmd_fd = -1; + i->state = IPCP_INIT; + + rw_lock_init(&i->state_lock); + + return i; +} + int ipcp_arg_check(int argc, char * argv[]) { if (argc != 3) @@ -52,25 +68,33 @@ void * ipcp_main_loop(void * o) uint8_t buf[IPCP_MSG_BUF_SIZE]; struct ipcp * _ipcp = (struct ipcp *) o; - ipcp_msg_t * msg; - ssize_t count; - buffer_t buffer; - ipcp_msg_t ret_msg = IPCP_MSG__INIT; + ipcp_msg_t * msg; + ssize_t count; + buffer_t buffer; + ipcp_msg_t ret_msg = IPCP_MSG__INIT; dif_config_msg_t * conf_msg; struct dif_config conf; + char * sock_path; + if (_ipcp == NULL) { LOG_ERR("Invalid ipcp struct."); return (void *) 1; } - sockfd = server_socket_open(ipcp_sock_path(getpid())); + sock_path = ipcp_sock_path(getpid()); + if (sock_path == NULL) + return (void *) 1; + + sockfd = server_socket_open(sock_path); if (sockfd < 0) { LOG_ERR("Could not open server socket."); return (void *) 1; } + free(sock_path); + while (true) { ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY; diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 393af994..c9002d4d 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -23,6 +23,8 @@ #ifndef IPCPD_IPCP_H #define IPCPD_IPCP_H +#include <ouroboros/rw_lock.h> + #include "ipcp-ops.h" #include "ipcp-data.h" @@ -38,11 +40,13 @@ enum ipcp_state { struct ipcp { struct ipcp_data * data; struct ipcp_ops * ops; + int irmd_fd; enum ipcp_state state; - int irmd_fd; + rw_lock_t state_lock; }; +struct ipcp * ipcp_instance_create(); void * ipcp_main_loop(void * o); void * ipcp_sdu_loop(void * o); int ipcp_arg_check(int argc, char * argv[]); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 14a698ee..3296540e 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -33,6 +33,7 @@ #include <ouroboros/sockets.h> #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> +#include <ouroboros/rw_lock.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -81,18 +82,25 @@ struct shim_ap_data { instance_name_t * api; struct shm_du_map * dum; struct bmp * fds; - struct shm_ap_rbuff * rb; + rw_lock_t data_lock; + struct flow flows[AP_MAX_FLOWS]; + rw_lock_t flows_lock; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader; + + rw_lock_t thread_lock; - pthread_t mainloop; - pthread_t sduloop; - pthread_t handler; - pthread_t sdu_reader; } * _ap_instance; static int shim_ap_init(char * ap_name) { + int i; + _ap_instance = malloc(sizeof(struct shim_ap_data)); if (_ap_instance == NULL) { return -1; @@ -130,11 +138,22 @@ static int shim_ap_init(char * ap_name) _ap_instance->rb = shm_ap_rbuff_create(); if (_ap_instance->rb == NULL) { instance_name_destroy(_ap_instance->api); + shm_du_map_close(_ap_instance->dum); bmp_destroy(_ap_instance->fds); free(_ap_instance); return -1; } + for (i = 0; i < AP_MAX_FLOWS; i ++) { + _ap_instance->flows[i].rb = NULL; + _ap_instance->flows[i].port_id = -1; + _ap_instance->flows[i].state = FLOW_NULL; + } + + rw_lock_init(&_ap_instance->flows_lock); + rw_lock_init(&_ap_instance->thread_lock); + rw_lock_init(&_ap_instance->data_lock); + return 0; } @@ -144,6 +163,9 @@ void shim_ap_fini() if (_ap_instance == NULL) return; + + rw_lock_wrlock(&_ap_instance->data_lock); + if (_ap_instance->api != NULL) instance_name_destroy(_ap_instance->api); if (_ap_instance->fds != NULL) @@ -152,41 +174,76 @@ void shim_ap_fini() shm_du_map_close(_ap_instance->dum); if (_ap_instance->rb != NULL) shm_ap_rbuff_destroy(_ap_instance->rb); + + rw_lock_wrlock(&_ap_instance->flows_lock); + for (i = 0; i < AP_MAX_FLOWS; i ++) if (_ap_instance->flows[i].rb != NULL) shm_ap_rbuff_close(_ap_instance->flows[i].rb); + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + free(_ap_instance); } static int port_id_to_fd(int port_id) { int i; - for (i = 0; i < AP_MAX_FLOWS; ++i) + + rw_lock_rdlock(&_ap_instance->flows_lock); + + for (i = 0; i < AP_MAX_FLOWS; ++i) { if (_ap_instance->flows[i].port_id == port_id - && _ap_instance->flows[i].state != FLOW_NULL) + && _ap_instance->flows[i].state != FLOW_NULL) { + + rw_lock_unlock(&_ap_instance->flows_lock); + return i; + } + } + + rw_lock_unlock(&_ap_instance->flows_lock); + return -1; } static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) { /* the AP chooses the amount of headspace and tailspace */ - size_t index = shm_create_du_buff(_ap_instance->dum, - count, - 0, - buf, - count); - struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; - - if (index == -1) + size_t index; + struct rb_entry e; + + rw_lock_rdlock(&_ap_instance->data_lock); + + index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); + + if (index == -1) { + rw_lock_unlock(&_ap_instance->data_lock); return -1; + } + + e.index = index; + + rw_lock_rdlock(&_ap_instance->flows_lock); + + e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + shm_release_du_buff(_ap_instance->dum, index); + + rw_lock_unlock(&_ap_instance->data_lock); + return -EPIPE; } + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_unlock(&_ap_instance->data_lock); + return 0; } @@ -206,8 +263,7 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - - pthread_mutex_t lock; + rw_lock_t fd_lock; }; struct ipcp_udp_data * ipcp_udp_data_create() @@ -229,6 +285,8 @@ struct ipcp_udp_data * ipcp_udp_data_create() return NULL; } + rw_lock_init(&udp_data->fd_lock); + FD_ZERO(&udp_data->flow_fd_s); return udp_data; @@ -236,21 +294,49 @@ struct ipcp_udp_data * ipcp_udp_data_create() void ipcp_sig_handler(int sig, siginfo_t * info, void * c) { + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + bool clean_threads = false; + switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: if (info->si_pid == irmd_pid || info->si_pid == 0) { + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); + + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state == IPCP_ENROLLED) { + clean_threads = true; + } + + _ipcp->state = IPCP_SHUTDOWN; + + rw_lock_unlock(&_ipcp->state_lock); + + if (clean_threads) { + rw_lock_wrlock(&_ap_instance->thread_lock); + + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader); + pthread_cancel(_ap_instance->sduloop); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader, NULL); + + rw_lock_unlock(&_ap_instance->thread_lock); + } + pthread_cancel(_ap_instance->mainloop); - pthread_cancel(_ap_instance->handler); - pthread_cancel(_ap_instance->sdu_reader); - pthread_cancel(_ap_instance->sduloop); - /* FIXME: should be called after join */ - shim_ap_fini(); - exit(0); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + } default: return; @@ -268,6 +354,7 @@ static void * ipcp_udp_listener() while (true) { int fd; + int port_id; memset(&buf, 0, SHIM_UDP_BUF_SIZE); n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, @@ -315,21 +402,27 @@ static void * ipcp_udp_listener() /* reply to IRM */ - _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), - buf, - UNKNOWN_AP, - UNKNOWN_AE); - if (_ap_instance->flows[fd].port_id < 0) { + port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + + if (port_id < 0) { LOG_ERR("Could not get port id from IRMd"); close(fd); continue; } - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].state = FLOW_PENDING; + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; + + rw_lock_unlock(&_ap_instance->flows_lock); LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", - _ap_instance->flows[fd].port_id, fd); + port_id, fd); } return 0; @@ -340,12 +433,26 @@ static void * ipcp_udp_sdu_reader() int n; int fd; char buf[SHIM_UDP_MAX_SDU_SIZE]; - struct timeval tv = {0, 10}; + struct timeval tv = {0, 1000}; struct sockaddr_in r_saddr; fd_set read_fds; while (true) { + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 0; + } + + rw_lock_unlock(&_ipcp->state_lock); + + rw_lock_rdlock(&shim_data(_ipcp)->fd_lock); + read_fds = shim_data(_ipcp)->flow_fd_s; + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) continue; @@ -369,22 +476,96 @@ static void * ipcp_udp_sdu_reader() return (void *) 0; } +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +static void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e; + int fd; + int len = 0; + char * buf; + + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 0; + } + + rw_lock_unlock(&_ipcp->state_lock); + + rw_lock_rdlock(&_ap_instance->data_lock); + + e = shm_ap_rbuff_read(_ap_instance->rb); + + if (e == NULL) { + rw_lock_unlock(&_ap_instance->data_lock); + continue; + } + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + if (len == 0) { + rw_lock_unlock(&_ap_instance->data_lock); + free(e); + continue; + } + + rw_lock_unlock(&_ap_instance->data_lock); + + send(fd, buf, len, 0); + + rw_lock_rdlock(&_ap_instance->data_lock); + + shm_release_du_buff(_ap_instance->dum, e->index); + + rw_lock_unlock(&_ap_instance->data_lock); + + free(e); + } + + return (void *) 1; +} + static int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - int enable = 1; + int enable = 1; + int fd = -1; if (conf->type != THIS_TYPE) { LOG_ERR("Config doesn't match IPCP type."); return -1; } + rw_lock_wrlock(&_ipcp->state_lock); + if (_ipcp->state != IPCP_INIT) { + rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("IPCP in wrong state."); return -1; } + _ipcp->state = IPCP_BOOTSTRAPPING; + + rw_lock_unlock(&_ipcp->state_lock); + if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, @@ -408,37 +589,36 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) strcpy(dnsstr, "not set"); } - shim_data(_ipcp)->ip_addr = conf->ip_addr; - shim_data(_ipcp)->dns_addr = conf->dns_addr; - /* UDP listen server */ - - if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - LOG_DBGF("Can't create socket."); + if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { + LOG_ERR("Can't create socket."); return -1; } - if (setsockopt(shim_data(_ipcp)->s_fd, + if (setsockopt(fd, SOL_SOCKET, - SO_REUSEADDR, - &enable, + SO_REUSEADDR, + &enable, sizeof(int)) < 0) { - LOG_DBGF("Setsockopt(SO_REUSEADDR) failed."); + LOG_WARN("Setsockopt(SO_REUSEADDR) failed."); } + shim_data(_ipcp)->s_fd = fd; + shim_data(_ipcp)->ip_addr = conf->ip_addr; + shim_data(_ipcp)->dns_addr = conf->dns_addr; + shim_data(_ipcp)->s_saddr.sin_family = AF_INET; shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT; - if (bind(shim_data(_ipcp)->s_fd, + if (bind(fd, (struct sockaddr *) &shim_data(_ipcp)->s_saddr, sizeof shim_data(_ipcp)->s_saddr ) < 0) { LOG_ERR("Couldn't bind to %s.", ipstr); return -1; } - FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_wrlock(&_ap_instance->thread_lock); pthread_create(&_ap_instance->handler, NULL, @@ -449,8 +629,25 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) ipcp_udp_sdu_reader, NULL); + pthread_create(&_ap_instance->sduloop, + NULL, + ipcp_udp_sdu_loop, + NULL); + + rw_lock_unlock(&_ap_instance->thread_lock); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + + rw_lock_wrlock(&_ipcp->state_lock); + _ipcp->state = IPCP_ENROLLED; + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", getpid()); @@ -595,16 +792,21 @@ static int ipcp_udp_name_reg(char * name) uint32_t ip_addr; #endif - if (_ipcp->state != IPCP_ENROLLED) { - LOG_DBGF("Won't register with non-enrolled IPCP."); - return -1; - } - if (strlen(name) > 24) { LOG_ERR("DNS names cannot be longer than 24 chars."); return -1; } + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't register with non-enrolled IPCP."); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + if (ipcp_data_add_reg_entry(_ipcp->data, name)) { LOG_ERR("Failed to add %s to local registry.", name); return -1; @@ -659,6 +861,16 @@ static int ipcp_udp_name_unreg(char * name) #ifdef CONFIG_OUROBOROS_ENABLE_DNS /* unregister application with DNS server */ + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("IPCP is not enrolled"); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + dns_addr = shim_data(_ipcp)->dns_addr; if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) @@ -697,10 +909,21 @@ static int ipcp_udp_flow_alloc(int port_id, #ifdef CONFIG_OUROBOROS_ENABLE_DNS uint32_t dns_addr = 0; #endif + struct shm_ap_rbuff * rb; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); + return -1; /* -ENOTENROLLED */ + } + + rw_lock_unlock(&_ipcp->state_lock); + if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -726,6 +949,7 @@ static int ipcp_udp_flow_alloc(int port_id, #ifdef CONFIG_OUROBOROS_ENABLE_DNS dns_addr = shim_data(_ipcp)->dns_addr; + if (dns_addr != 0) { ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { @@ -789,35 +1013,45 @@ static int ipcp_udp_flow_alloc(int port_id, free(recv_buf); - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); - if (_ap_instance->flows[fd].rb == NULL) { + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); close(fd); - return -1; + return -1; /* -ENORBUFF */ } + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = rb; + + rw_lock_unlock(&_ap_instance->flows_lock); /* tell IRMd that flow allocation "worked" */ if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { + shm_ap_rbuff_close(rb); LOG_ERR("Failed to notify IRMd about flow allocation reply"); close(fd); - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd); return fd; } static int ipcp_udp_flow_alloc_resp(int port_id, - pid_t n_pid, - int response) + pid_t n_pid, + int response) { + struct shm_ap_rbuff * rb; int fd = port_id_to_fd(port_id); if (fd < 0) { LOG_DBGF("Could not find flow with port_id %d.", port_id); @@ -829,22 +1063,44 @@ static int ipcp_udp_flow_alloc_resp(int port_id, /* awaken pending flow */ + rw_lock_rdlock(&_ap_instance->flows_lock); + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + rw_lock_unlock(&_ap_instance->flows_lock); + LOG_DBGF("Flow was not pending."); return -1; } - _ap_instance->flows[fd].state = FLOW_ALLOCATED; - _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); - if (_ap_instance->flows[fd].rb == NULL) { + rw_lock_unlock(&_ap_instance->flows_lock); + + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); + + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; + + rw_lock_unlock(&_ap_instance->flows_lock); + return 0; } + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = rb; + + rw_lock_unlock(&_ap_instance->flows_lock); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); return 0; @@ -853,18 +1109,33 @@ static int ipcp_udp_flow_alloc_resp(int port_id, static int ipcp_udp_flow_dealloc(int port_id) { int fd = port_id_to_fd(port_id); + struct shm_ap_rbuff * rb; + if (fd < 0) { LOG_DBGF("Could not find flow with port_id %d.", port_id); return 0; } + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_NULL; _ap_instance->flows[fd].port_id = 0; - if (_ap_instance->flows[fd].rb != NULL) - shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + rb = _ap_instance->flows[fd].rb; + _ap_instance->flows[fd].rb = NULL; + + rw_lock_unlock(&_ap_instance->flows_lock); + + if (rb != NULL) + shm_ap_rbuff_close(rb); + + rw_lock_wrlock(&shim_data(_ipcp)->fd_lock); FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&shim_data(_ipcp)->fd_lock); + close(fd); + return 0; } @@ -877,7 +1148,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name) if (shim_ap_init(ap_name) < 0) return NULL; - i = malloc(sizeof *i); + i = ipcp_instance_create(); if (i == NULL) return NULL; @@ -914,45 +1185,17 @@ static struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK -/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ -/* FIXME: stop eating the CPU */ -static void * ipcp_udp_sdu_loop(void * o) -{ - while (true) { - struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); - int fd; - int len = 0; - char * buf; - if (e == NULL) - continue; - - len = shm_du_map_read_sdu((uint8_t **) &buf, - _ap_instance->dum, - e->index); - if (len == -1) - continue; - - fd = port_id_to_fd(e->port_id); - - if (fd == -1) - continue; - - if (len == 0) - continue; - - send(fd, buf, len, 0); - - shm_release_du_buff(_ap_instance->dum, e->index); - } - - return (void *) 1; -} - int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ /* argument 2: ap name */ struct sigaction sig_act; + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGQUIT); + sigaddset(&sigset, SIGHUP); + sigaddset(&sigset, SIGPIPE); if (ipcp_arg_check(argc, argv)) { LOG_ERR("Wrong arguments."); @@ -980,13 +1223,19 @@ int main (int argc, char * argv[]) exit(1); } + pthread_sigmask(SIG_BLOCK, &sigset, NULL); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); - pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); - pthread_join(_ap_instance->sduloop, NULL); + pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + pthread_join(_ap_instance->mainloop, NULL); - pthread_join(_ap_instance->handler, NULL); - pthread_join(_ap_instance->sdu_reader, NULL); + + shim_ap_fini(); + + free(_ipcp->data); + free(_ipcp->ops); + free(_ipcp); exit(0); } diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6c977cbb..6c04ccc5 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -245,18 +245,17 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) if (rb == NULL) return NULL; - e = malloc(sizeof(*e)); - if (e == NULL) - return NULL; - pthread_mutex_lock(rb->shm_mutex); if (shm_rbuff_used(rb) == 0) { pthread_mutex_unlock(rb->shm_mutex); - free(e); return NULL; } + e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + *e = *(rb->shm_base + *rb->ptr_tail); *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1); |