diff options
author | Sander Vrijders <[email protected]> | 2016-05-20 08:58:49 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-05-20 08:58:49 +0200 |
commit | 303034090a9e8da6b096c1e61553dacaf359f187 (patch) | |
tree | 0272b1c35bd3dddaa380a8c8169767445b66a56b | |
parent | 4d348ef99bb3e3927be315ef1bdd1ae827c9a28c (diff) | |
parent | 129b15eea7a790bff0a83d1668b8d666fe0e6f35 (diff) | |
download | ouroboros-303034090a9e8da6b096c1e61553dacaf359f187.tar.gz ouroboros-303034090a9e8da6b096c1e61553dacaf359f187.zip |
Merged in dstaesse/ouroboros/be-udp-flow-alloc-same-port (pull request #100)
ipcpd: Full flow allocation for the shim UDP
-rw-r--r-- | src/ipcpd/ipcp-ops.h | 8 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 8 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/CMakeLists.txt | 14 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 771 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/shim_udp_messages.proto | 14 | ||||
-rw-r--r-- | src/irmd/main.c | 1 |
6 files changed, 558 insertions, 258 deletions
diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index a766c3ae..72c57595 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -38,14 +38,14 @@ struct ipcp_ops { size_t len); int (* ipcp_name_reg)(char * name); int (* ipcp_name_unreg)(char * name); - int (* ipcp_flow_alloc)(int port_id, - pid_t n_pid, + int (* ipcp_flow_alloc)(pid_t n_pid, + int port_id, char * dst_ap_name, char * src_ap_name, char * src_ae_name, enum qos_cube qos); - int (* ipcp_flow_alloc_resp)(int port_id, - pid_t n_pid, + int (* ipcp_flow_alloc_resp)(pid_t n_pid, + int port_id, int response); int (* ipcp_flow_dealloc)(int port_id); }; diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index 76d3620b..dd370005 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -200,8 +200,8 @@ void * ipcp_main_loop(void * o) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_alloc(msg->port_id, - msg->pid, + _ipcp->ops->ipcp_flow_alloc(msg->pid, + msg->port_id, msg->dst_name, msg->src_ap_name, msg->src_ae_name, @@ -214,8 +214,8 @@ void * ipcp_main_loop(void * o) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, - msg->pid, + _ipcp->ops->ipcp_flow_alloc_resp(msg->pid, + msg->port_id, msg->result); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: diff --git a/src/ipcpd/shim-udp/CMakeLists.txt b/src/ipcpd/shim-udp/CMakeLists.txt index e10a8ca6..b4d9ffbd 100644 --- a/src/ipcpd/shim-udp/CMakeLists.txt +++ b/src/ipcpd/shim-udp/CMakeLists.txt @@ -12,6 +12,11 @@ include_directories(${CURRENT_BINARY_PARENT_DIR}) include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_BINARY_DIR}/include) +find_package(ProtobufC REQUIRED) +include_directories(${PROTOBUF_INCLUDE_DIRS}) +protobuf_generate_c(SHIM_UDP_PROTO_SRCS SHIM_UDP_PROTO_HDRS + shim_udp_messages.proto) + # Find library needed for gethostbyname. include(CheckFunctionExists) CHECK_FUNCTION_EXISTS("gethostbyname" CMAKE_HAVE_GETHOSTBYNAME) @@ -64,9 +69,12 @@ set(SHIM_UDP_SOURCES # Add source files here ${CMAKE_CURRENT_SOURCE_DIR}/main.c) +install(FILES ${SHIM_UDP_PROTO_HDRS} DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}) + add_executable (ipcpd-shim-udp ${SHIM_UDP_SOURCES} ${IPCP_SOURCES} - "${CMAKE_CURRENT_BINARY_DIR}/shim_udp_config.h") -target_link_libraries (ipcpd-shim-udp LINK_PUBLIC ouroboros) + ${SHIM_UDP_PROTO_SRCS} "${CMAKE_CURRENT_BINARY_DIR}/shim_udp_config.h") +target_link_libraries (ipcpd-shim-udp LINK_PUBLIC ouroboros + ${PROTOBUF_C_LIBRARY}) # Enable DNS by default if (NOT DISABLE_DNS MATCHES True) @@ -81,4 +89,4 @@ endif (CMAKE_BUILD_TYPE MATCHES Debug) install(TARGETS ipcpd-shim-udp RUNTIME DESTINATION bin) # Enable once ipcp-shim-udp has tests -add_subdirectory(tests) +# add_subdirectory(tests) diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 0802583c..3ef783c4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -52,9 +52,14 @@ #include <sys/wait.h> #include <fcntl.h> +#include "shim_udp_messages.pb-c.h" + +typedef ShimUdpMsg shim_udp_msg_t; + #define THIS_TYPE IPCP_SHIM_UDP #define LISTEN_PORT htons(0x0D1F) #define SHIM_UDP_BUF_SIZE 256 +#define SHIM_UDP_MSG_SIZE 256 #define SHIM_UDP_MAX_SDU_SIZE 8980 #define DNS_TTL 86400 @@ -85,7 +90,6 @@ struct shim_ap_data { struct shm_du_map * dum; struct bmp * fds; struct shm_ap_rbuff * rb; - rw_lock_t data_lock; struct flow flows[AP_MAX_FLOWS]; rw_lock_t flows_lock; @@ -94,6 +98,9 @@ struct shim_ap_data { pthread_t sduloop; pthread_t handler; pthread_t sdu_reader; + + bool fd_set_sync; + pthread_mutex_t fd_set_lock; } * _ap_instance; static int shim_ap_init(char * ap_name) @@ -150,7 +157,7 @@ static int shim_ap_init(char * ap_name) } rw_lock_init(&_ap_instance->flows_lock); - + pthread_mutex_init(&_ap_instance->fd_set_lock, NULL); return 0; } @@ -209,27 +216,31 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) rw_lock_rdlock(&_ipcp->state_lock); + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return -1; /* -ENOTENROLLED */ + } + index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count); if (index == -1) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } e.index = index; - rw_lock_rdlock(&_ap_instance->flows_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); e.port_id = _ap_instance->flows[fd].port_id; if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { rw_lock_unlock(&_ap_instance->flows_lock); shm_release_du_buff(_ap_instance->dum, index); - rw_lock_unlock(&_ipcp->state_lock); return -EPIPE; } rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); return 0; @@ -239,13 +250,29 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) * end copy from dev.c */ +/* only call this under flows_lock */ +static int udp_port_to_fd(int udp_port) +{ + int i; + struct sockaddr_in f_saddr; + socklen_t len = sizeof(f_saddr); + + for (i = 0; i < AP_MAX_FLOWS; ++i) { + if (getsockname(i, (struct sockaddr *) &f_saddr, &len) < 0) + continue; + if (f_saddr.sin_port == udp_port) + return i; + } + + return -1; +} + struct ipcp_udp_data { /* keep ipcp_data first for polymorphism */ struct ipcp_data ipcp_data; uint32_t ip_addr; uint32_t dns_addr; - /* listen server */ struct sockaddr_in s_saddr; int s_fd; @@ -260,7 +287,7 @@ struct ipcp_udp_data * ipcp_udp_data_create() struct ipcp_data * data; enum ipcp_type ipcp_type; - udp_data = malloc(sizeof *udp_data); + udp_data = malloc(sizeof(*udp_data)); if (udp_data == NULL) { LOG_ERR("Failed to allocate."); return NULL; @@ -278,74 +305,272 @@ struct ipcp_udp_data * ipcp_udp_data_create() return udp_data; } -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +static int send_shim_udp_msg(shim_udp_msg_t * msg, + uint32_t dst_ip_addr) { - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - bool clean_threads = false; + buffer_t buf; + struct sockaddr_in r_saddr; + + memset((char *)&r_saddr, 0, sizeof(r_saddr)); + r_saddr.sin_family = AF_INET; + r_saddr.sin_addr.s_addr = dst_ip_addr; + r_saddr.sin_port = LISTEN_PORT; + + buf.size = shim_udp_msg__get_packed_size(msg); + if (buf.size == 0) { + return -1; + } + + buf.data = malloc(SHIM_UDP_MSG_SIZE); + if (buf.data == NULL) { + return -1; + } + + shim_udp_msg__pack(msg, buf.data); + + if (sendto(shim_data(_ipcp)->s_fd, + buf.data, + buf.size, + 0, + (struct sockaddr *) &r_saddr, + sizeof(r_saddr)) == -1) { + LOG_ERR("Failed to send message."); + free(buf.data); + return -1; + } + + free(buf.data); + + return 0; +} - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGHUP: - if (info->si_pid == irmd_pid || info->si_pid == 0) { - pthread_sigmask(SIG_BLOCK, &sigset, NULL); +static int ipcp_udp_port_alloc(uint32_t dst_ip_addr, + uint32_t src_udp_port, + char * dst_name, + char * src_ap_name, + char * src_ae_name) +{ + shim_udp_msg_t msg = SHIM_UDP_MSG__INIT; - LOG_DBG("Terminating by order of %d. Bye.", - info->si_pid); + msg.code = SHIM_UDP_MSG_CODE__FLOW_REQ; + msg.src_udp_port = src_udp_port; + msg.dst_name = dst_name; + msg.src_ap_name = src_ap_name; + msg.src_ae_name = src_ae_name; - rw_lock_wrlock(&_ipcp->state_lock); + return send_shim_udp_msg(&msg, dst_ip_addr); +} - if (_ipcp->state == IPCP_ENROLLED) { - clean_threads = true; - } +static int ipcp_udp_port_alloc_resp(uint32_t ip_addr, + uint16_t src_udp_port, + uint16_t dst_udp_port, + int response) +{ + shim_udp_msg_t msg = SHIM_UDP_MSG__INIT; - if (clean_threads) { - pthread_cancel(_ap_instance->handler); - pthread_cancel(_ap_instance->sdu_reader); - pthread_cancel(_ap_instance->sduloop); + msg.code = SHIM_UDP_MSG_CODE__FLOW_REPLY; + msg.src_udp_port = src_udp_port; + msg.has_dst_udp_port = true; + msg.dst_udp_port = dst_udp_port; + msg.has_response = true; + msg.response = response; - pthread_join(_ap_instance->sduloop, NULL); - pthread_join(_ap_instance->handler, NULL); - pthread_join(_ap_instance->sdu_reader, NULL); - } + return send_shim_udp_msg(&msg, ip_addr); +} - pthread_cancel(_ap_instance->mainloop); +static int ipcp_udp_port_req(struct sockaddr_in * c_saddr, + char * dst_name, + char * src_ap_name, + char * src_ae_name) +{ + int fd; + int port_id; - _ipcp->state = IPCP_SHUTDOWN; + struct sockaddr_in f_saddr; + socklen_t f_saddr_len = sizeof(f_saddr); + + LOG_DBGF("Port request arrived from UDP port %d", + ntohs(c_saddr->sin_port)); + + if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + LOG_ERR("Could not create UDP socket."); + return -1; + } + + memset((char *) &f_saddr, 0, sizeof(f_saddr)); + f_saddr.sin_family = AF_INET; + f_saddr.sin_addr.s_addr = local_ip; + + /* + * FIXME: we could have a port dedicated per registered AP + * Not that critical for UDP, but will be for LLC + */ + + f_saddr.sin_port = 0; + + if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { + LOG_ERR("Could not bind to socket."); + close(fd); + return -1; + } + + if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + LOG_ERR("Could not get address from fd."); + return -1; + } + + /* + * store the remote address in the file descriptor + * this avoids having to store the sockaddr_in in + * the flow structure + */ + + if (connect(fd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) { + LOG_ERR("Could not connect to remote UDP client."); + close(fd); + return -1; + } + + + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + + /* reply to IRM */ + port_id = ipcp_flow_req_arr(getpid(), + dst_name, + src_ap_name, + src_ae_name); + + if (port_id < 0) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_ERR("Could not get port id from IRMd"); + close(fd); + return -1; + } + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + LOG_DBGF("Pending allocation request, port_id %d, UDP port (%d, %d).", + port_id, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port)); + + return 0; +} + +static int ipcp_udp_port_alloc_reply(int src_udp_port, + int dst_udp_port, + int response) +{ + int fd = -1; + int ret = 0; + int port_id = -1; + + struct sockaddr_in t_saddr; + socklen_t t_saddr_len = sizeof(t_saddr); + + LOG_DBGF("Received reply for flow on udp port %d.", + ntohs(dst_udp_port)); + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_rdlock(&_ap_instance->flows_lock); + + fd = udp_port_to_fd(dst_udp_port); + if (fd == -1) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Unknown flow on UDP port %d.", dst_udp_port); + return -1; /* -EUNKNOWNFLOW */ + } + + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow on UDP port %d not pending.", dst_udp_port); + return -1; /* -EFLOWNOTPENDING */ + } + + port_id = _ap_instance->flows[fd].port_id; + + if (response) { + _ap_instance->flows[fd].port_id = -1; + _ap_instance->flows[fd].rb = NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + _ap_instance->flows[fd].state = FLOW_NULL; + } else { + /* get the original address with the LISTEN PORT */ + if (getpeername(fd, + (struct sockaddr *) &t_saddr, + &t_saddr_len) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return -1; + } - pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + /* connect to the flow udp port */ + t_saddr.sin_port = src_udp_port; + if (connect(fd, + (struct sockaddr *) &t_saddr, + sizeof(t_saddr)) < 0) { + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + close(fd); + return -1; } - default: - return; + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + } + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + + if ((ret = ipcp_flow_alloc_reply(getpid(), + port_id, + response)) < 0) { + return -1; /* -EPIPE */ } + + LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).", + ntohs(src_udp_port), ntohs(dst_udp_port)); + + return ret; + } static void * ipcp_udp_listener() { - char buf[SHIM_UDP_BUF_SIZE]; + uint8_t buf[SHIM_UDP_MSG_SIZE]; int n = 0; - struct sockaddr_in f_saddr; struct sockaddr_in c_saddr; - int sfd = shim_data(_ipcp)->s_fd; while (true) { - int fd; - int port_id; + int sfd = 0; + shim_udp_msg_t * msg = NULL; rw_lock_rdlock(&_ipcp->state_lock); - memset(&buf, 0, SHIM_UDP_BUF_SIZE); - n = sizeof c_saddr; - n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + return (void *) 1; /* -ENOTENROLLED */ + } + + sfd = shim_data(_ipcp)->s_fd; + + rw_lock_unlock(&_ipcp->state_lock); + + memset(&buf, 0, SHIM_UDP_MSG_SIZE); + n = sizeof(c_saddr); + n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); + if (n < 0) { - rw_lock_unlock(&_ipcp->state_lock); continue; } @@ -353,69 +578,36 @@ static void * ipcp_udp_listener() if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr, sizeof(c_saddr.sin_addr.s_addr), AF_INET) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); continue; } - fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - - memset((char *) &f_saddr, 0, sizeof f_saddr); - f_saddr.sin_family = AF_INET; - f_saddr.sin_addr.s_addr = local_ip; - - /* - * FIXME: we could have a port dedicated per registered AP - * Not that critical for UDP, but will be for LLC - */ - - f_saddr.sin_port = 0; - - /* - * store the remote address in the file descriptor - * this avoids having to store the sockaddr_in in - * the flow structure - */ - - if (connect(fd, - (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { - rw_lock_unlock(&_ipcp->state_lock); - close(fd); + msg = shim_udp_msg__unpack(NULL, n, buf); + if (msg == NULL) { continue; } - /* echo back the packet */ - if (send(fd, buf, strlen(buf), 0) < 0) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Failed to echo back the packet."); - close(fd); + switch (msg->code) { + case SHIM_UDP_MSG_CODE__FLOW_REQ: + c_saddr.sin_port = msg->src_udp_port; + ipcp_udp_port_req(&c_saddr, + msg->dst_name, + msg->src_ap_name, + msg->src_ae_name); + break; + case SHIM_UDP_MSG_CODE__FLOW_REPLY: + ipcp_udp_port_alloc_reply(msg->src_udp_port, + msg->dst_udp_port, + msg->response); + break; + default: + LOG_ERR("Unknown message received %d.", msg->code); + shim_udp_msg__free_unpacked(msg, NULL); continue; } - /* reply to IRM */ - rw_lock_wrlock(&_ap_instance->flows_lock); - - port_id = ipcp_flow_req_arr(getpid(), - buf, - UNKNOWN_AP, - UNKNOWN_AE); + c_saddr.sin_port = LISTEN_PORT; - if (port_id < 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not get port id from IRMd"); - close(fd); - continue; - } - - _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].rb = NULL; - _ap_instance->flows[fd].state = FLOW_PENDING; - - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - - LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", - port_id, fd); + shim_udp_msg__free_unpacked(msg, NULL); } return 0; @@ -436,16 +628,22 @@ static void * ipcp_udp_sdu_reader() if (_ipcp->state != IPCP_ENROLLED) { rw_lock_unlock(&_ipcp->state_lock); - return (void *) 0; + return (void *) 1; /* -ENOTENROLLED */ } rw_lock_rdlock(&_ap_instance->flows_lock); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + read_fds = shim_data(_ipcp)->flow_fd_s; + _ap_instance->fd_set_sync = false; + + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); continue; } @@ -455,7 +653,7 @@ static void * ipcp_udp_sdu_reader() flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); - n = sizeof r_saddr; + n = sizeof(r_saddr); if ((n = recvfrom(fd, buf, SHIM_UDP_MAX_SDU_SIZE, @@ -468,9 +666,6 @@ static void * ipcp_udp_sdu_reader() if (ipcp_udp_flow_write(fd, buf, n) < 0) LOG_ERR("Failed to write SDU."); } - - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); } return (void *) 0; @@ -479,6 +674,7 @@ static void * ipcp_udp_sdu_reader() /* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ static void * ipcp_udp_sdu_loop(void * o) { + while (true) { struct rb_entry * e; int fd; @@ -489,7 +685,7 @@ static void * ipcp_udp_sdu_loop(void * o) if (_ipcp->state != IPCP_ENROLLED) { rw_lock_unlock(&_ipcp->state_lock); - return (void *) 0; + return (void *) 1; /* -ENOTENROLLED */ } e = shm_ap_rbuff_read(_ap_instance->rb); @@ -502,7 +698,7 @@ static void * ipcp_udp_sdu_loop(void * o) len = shm_du_map_read_sdu((uint8_t **) &buf, _ap_instance->dum, e->index); - if (len == -1) { + if (len <= 0) { rw_lock_unlock(&_ipcp->state_lock); free(e); continue; @@ -512,16 +708,10 @@ static void * ipcp_udp_sdu_loop(void * o) fd = port_id_to_fd(e->port_id); - if (fd == -1) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); - free(e); - continue; - } + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); - if (len == 0) { - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); + if (fd == -1) { free(e); continue; } @@ -529,19 +719,57 @@ static void * ipcp_udp_sdu_loop(void * o) if (send(fd, buf, len, 0) < 0) LOG_ERR("Failed to send SDU."); - shm_release_du_buff(_ap_instance->dum, e->index); + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ap_instance->dum != NULL) + shm_release_du_buff(_ap_instance->dum, e->index); - rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); - - free(e); } return (void *) 1; } +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ + sigset_t sigset; + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGHUP: + if (info->si_pid == irmd_pid || info->si_pid == 0) { + LOG_DBG("Terminating by order of %d. Bye.", + info->si_pid); + + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state == IPCP_ENROLLED) { + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader); + pthread_cancel(_ap_instance->sduloop); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader, NULL); + } + + pthread_cancel(_ap_instance->mainloop); + + _ipcp->state = IPCP_SHUTDOWN; + + rw_lock_unlock(&_ipcp->state_lock); + } + default: + return; + } +} + static int ipcp_udp_bootstrap(struct dif_config * conf) { + struct sockaddr_in s_saddr; char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; int enable = 1; @@ -552,19 +780,10 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - rw_lock_wrlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_INIT) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("IPCP in wrong state."); - return -1; - } - if (inet_ntop(AF_INET, &conf->ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to convert IP address"); return -1; } @@ -574,7 +793,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) &conf->dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Failed to convert DNS address"); return -1; } @@ -587,7 +805,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Can't create socket."); return -1; } @@ -596,28 +813,39 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) SOL_SOCKET, SO_REUSEADDR, &enable, - sizeof(int)) < 0) { + sizeof(int)) < 0) LOG_WARN("Setsockopt(SO_REUSEADDR) failed."); - } - - shim_data(_ipcp)->s_fd = fd; - shim_data(_ipcp)->ip_addr = conf->ip_addr; - shim_data(_ipcp)->dns_addr = conf->dns_addr; + memset((char *) &s_saddr, 0, sizeof(s_saddr)); shim_data(_ipcp)->s_saddr.sin_family = AF_INET; shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr; shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT; if (bind(fd, (struct sockaddr *) &shim_data(_ipcp)->s_saddr, - sizeof shim_data(_ipcp)->s_saddr ) < 0) { - rw_lock_unlock(&_ipcp->state_lock); + sizeof(shim_data(_ipcp)->s_saddr)) < 0) { LOG_ERR("Couldn't bind to %s.", ipstr); + close(fd); return -1; } + rw_lock_wrlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_INIT) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_ERR("IPCP in wrong state."); + close(fd); + return -1; + } + + shim_data(_ipcp)->s_fd = fd; + shim_data(_ipcp)->ip_addr = conf->ip_addr; + shim_data(_ipcp)->dns_addr = conf->dns_addr; + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + _ipcp->state = IPCP_ENROLLED; + pthread_create(&_ap_instance->handler, NULL, ipcp_udp_listener, @@ -632,8 +860,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) ipcp_udp_sdu_loop, NULL); - _ipcp->state = IPCP_ENROLLED; - rw_lock_unlock(&_ipcp->state_lock); LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", @@ -803,18 +1029,19 @@ static int ipcp_udp_name_reg(char * name) /* register application with DNS server */ dns_addr = shim_data(_ipcp)->dns_addr; + + rw_lock_unlock(&_ipcp->state_lock); + if (dns_addr != 0) { ip_addr = shim_data(_ipcp)->ip_addr; if (inet_ntop(AF_INET, &ip_addr, ipstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } @@ -822,15 +1049,13 @@ static int ipcp_udp_name_reg(char * name) dnsstr, name, DNS_TTL, ipstr); if (ddns_send(cmd)) { + rw_lock_rdlock(&_ipcp->state_lock); ipcp_data_del_reg_entry(_ipcp->data, name); rw_lock_unlock(&_ipcp->state_lock); return -1; } } #endif - - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBG("Registered %s.", name); return 0; @@ -862,10 +1087,12 @@ static int ipcp_udp_name_unreg(char * name) } dns_addr = shim_data(_ipcp)->dns_addr; + + rw_lock_unlock(&_ipcp->state_lock); + if (dns_addr != 0) { if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN) == NULL) { - rw_lock_unlock(&_ipcp->state_lock); return -1; } sprintf(cmd, "server %s\nupdate delete %s A\nsend\nquit\n", @@ -875,50 +1102,41 @@ static int ipcp_udp_name_unreg(char * name) } #endif + rw_lock_rdlock(&_ipcp->state_lock); + ipcp_data_del_reg_entry(_ipcp->data, name); rw_lock_unlock(&_ipcp->state_lock); - LOG_DBG("Unregistered %s.", name); - return 0; } -static int ipcp_udp_flow_alloc(int port_id, - pid_t n_pid, +static int ipcp_udp_flow_alloc(pid_t n_pid, + int port_id, char * dst_name, char * src_ap_name, char * src_ae_name, enum qos_cube qos) { - struct sockaddr_in l_saddr; - struct sockaddr_in r_saddr; - struct sockaddr_in rf_saddr; + struct sockaddr_in r_saddr; /* server address */ + struct sockaddr_in f_saddr; /* flow */ + socklen_t f_saddr_len = sizeof(f_saddr); int fd; - int n; - char * recv_buf = NULL; struct hostent * h; uint32_t ip_addr = 0; + bool fd_wait = true; #ifdef CONFIG_OUROBOROS_ENABLE_DNS uint32_t dns_addr = 0; #endif struct shm_ap_rbuff * rb; + LOG_INFO("Allocating flow from %s to %s.", src_ap_name, dst_name); + if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; - - rw_lock_rdlock(&_ipcp->state_lock); - - if (_ipcp->state != IPCP_ENROLLED) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); - return -1; /* -ENOTENROLLED */ - } - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { - rw_lock_unlock(&_ipcp->state_lock); LOG_ERR("Name too long for this shim."); return -1; } @@ -926,27 +1144,46 @@ static int ipcp_udp_flow_alloc(int port_id, if (qos != QOS_CUBE_BE) LOG_DBGF("QoS requested. UDP/IP can't do that."); + rb = shm_ap_rbuff_open(n_pid); + if (rb == NULL) + return -1; /* -ENORBUFF */ + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ - memset((char *) &l_saddr, 0, sizeof l_saddr); - l_saddr.sin_family = AF_INET; - l_saddr.sin_addr.s_addr = local_ip; - l_saddr.sin_port = 0; + memset((char *) &f_saddr, 0, sizeof(f_saddr)); + f_saddr.sin_family = AF_INET; + f_saddr.sin_addr.s_addr = local_ip; + f_saddr.sin_port = 0; - if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { - rw_lock_unlock(&_ipcp->state_lock); + if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) { close(fd); return -1; } + if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) { + LOG_ERR("Could not get address from fd."); + close(fd); + return -1; + } + + rw_lock_rdlock(&_ipcp->state_lock); + + if (_ipcp->state != IPCP_ENROLLED) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Won't allocate flow with non-enrolled IPCP."); + close(fd); + return -1; /* -ENOTENROLLED */ + } + #ifdef CONFIG_OUROBOROS_ENABLE_DNS dns_addr = shim_data(_ipcp)->dns_addr; + rw_lock_unlock(&_ipcp->state_lock); + if (dns_addr != 0) { ip_addr = ddns_resolve(dst_name, dns_addr); if (ip_addr == 0) { - rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not resolve %s.", dst_name); close(fd); return -1; @@ -955,7 +1192,6 @@ static int ipcp_udp_flow_alloc(int port_id, #endif h = gethostbyname(dst_name); if (h == NULL) { - rw_lock_unlock(&_ipcp->state_lock); LOG_DBGF("Could not resolve %s.", dst_name); close(fd); return -1; @@ -966,97 +1202,86 @@ static int ipcp_udp_flow_alloc(int port_id, } #endif - memset((char *) &r_saddr, 0, sizeof r_saddr); + /* connect to server (store the remote IP address in the fd) */ + memset((char *) &r_saddr, 0, sizeof(r_saddr)); r_saddr.sin_family = AF_INET; r_saddr.sin_addr.s_addr = ip_addr; r_saddr.sin_port = LISTEN_PORT; - if (sendto(fd, dst_name, strlen(dst_name), 0, - (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Failed to send packet"); + if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) { close(fd); return -1; } - /* wait for the other shim IPCP to respond */ + rw_lock_unlock(&_ipcp->state_lock); - recv_buf = malloc(strlen(dst_name) + 1); - if (recv_buf == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Failed to malloc recv_buff."); - close(fd); - return -1; - } - n = sizeof(rf_saddr); - n = recvfrom(fd, - recv_buf, - strlen(dst_name), - 0, - (struct sockaddr *) &rf_saddr, - (unsigned *) &n); - - if (connect(fd, - (struct sockaddr *) &rf_saddr, - sizeof rf_saddr) - < 0) { - rw_lock_unlock(&_ipcp->state_lock); - close(fd); - free(recv_buf); - return -1; - } + LOG_DBGF("Pending flow with port_id %d on UDP port %d.", + port_id, ntohs(f_saddr.sin_port)); - if (memcmp(recv_buf, dst_name, strlen(dst_name))) - LOG_WARN("Incorrect echo from server"); + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); - free(recv_buf); + pthread_mutex_lock(&_ap_instance->fd_set_lock); - rb = shm_ap_rbuff_open(n_pid); - if (rb == NULL) { - rw_lock_unlock(&_ipcp->state_lock); - LOG_ERR("Could not open N + 1 ringbuffer."); - close(fd); - return -1; /* -ENORBUFF */ + _ap_instance->fd_set_sync = true; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + + while (fd_wait) { + sched_yield(); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + fd_wait = _ap_instance->fd_set_sync; + pthread_mutex_unlock(&_ap_instance->fd_set_lock); } - rw_lock_wrlock(&_ap_instance->flows_lock); _ap_instance->flows[fd].port_id = port_id; - _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].state = FLOW_PENDING; _ap_instance->flows[fd].rb = rb; - FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + if (ipcp_udp_port_alloc(ip_addr, + f_saddr.sin_port, + dst_name, + src_ap_name, + src_ae_name) < 0) { + LOG_DBGF("Port alloc returned -1."); + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); - /* tell IRMd that flow allocation "worked" */ + _ap_instance->flows[fd].port_id = -1; + _ap_instance->flows[fd].state = FLOW_NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + _ap_instance->flows[fd].rb = NULL; - if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { + rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); - shm_ap_rbuff_close(rb); - LOG_ERR("Failed to notify IRMd about flow allocation reply"); close(fd); return -1; } - rw_lock_unlock(&_ipcp->state_lock); - - LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd); - return fd; } -static int ipcp_udp_flow_alloc_resp(int port_id, - pid_t n_pid, +static int ipcp_udp_flow_alloc_resp(pid_t n_pid, + int port_id, int response) { struct shm_ap_rbuff * rb; - struct timespec wait = {0, 1000000}; int fd = -1; + struct sockaddr_in f_saddr; + struct sockaddr_in r_saddr; + socklen_t len = sizeof(r_saddr); + bool fd_wait = true; if (response) return 0; - rw_lock_unlock(&_ipcp->state_lock); + rw_lock_rdlock(&_ipcp->state_lock); /* awaken pending flow */ @@ -1081,23 +1306,71 @@ static int ipcp_udp_flow_alloc_resp(int port_id, if (rb == NULL) { LOG_ERR("Could not open N + 1 ringbuffer."); _ap_instance->flows[fd].state = FLOW_NULL; - _ap_instance->flows[fd].port_id = 0; + _ap_instance->flows[fd].port_id = -1; rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); return 0; } + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + if (getsockname(fd, (struct sockaddr *) &f_saddr, &len) < 0) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return 0; + }; + + if (getpeername(fd, (struct sockaddr *) &r_saddr, &len) < 0) { + rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d has no peer.", port_id); + return 0; + }; + + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + _ap_instance->flows[fd].state = FLOW_ALLOCATED; _ap_instance->flows[fd].rb = rb; + pthread_mutex_lock(&_ap_instance->fd_set_lock); + + _ap_instance->fd_set_sync = true; FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - nanosleep(&wait, NULL); + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + + while (fd_wait) { + sched_yield(); + pthread_mutex_lock(&_ap_instance->fd_set_lock); + fd_wait = _ap_instance->fd_set_sync; + pthread_mutex_unlock(&_ap_instance->fd_set_lock); + } rw_lock_unlock(&_ap_instance->flows_lock); rw_lock_unlock(&_ipcp->state_lock); - LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); + if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr, + f_saddr.sin_port, + r_saddr.sin_port, + response) < 0) { + rw_lock_rdlock(&_ipcp->state_lock); + rw_lock_wrlock(&_ap_instance->flows_lock); + + _ap_instance->flows[fd].state = FLOW_NULL; + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + _ap_instance->flows[fd].rb = NULL; + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&_ap_instance->flows_lock); + rw_lock_unlock(&_ipcp->state_lock); + + LOG_DBGF("Could not send response."); + return -1; + } + + LOG_DBGF("Accepted flow, port_id %d on UDP fd %d.", port_id, fd); return 0; } @@ -1106,7 +1379,8 @@ static int ipcp_udp_flow_dealloc(int port_id) { int fd = -1; struct shm_ap_rbuff * rb; - struct timespec wait = {0, 1000000}; + + LOG_DBGF("Deallocating flow with port_id %d.", port_id); rw_lock_rdlock(&_ipcp->state_lock); rw_lock_wrlock(&_ap_instance->flows_lock); @@ -1124,30 +1398,28 @@ static int ipcp_udp_flow_dealloc(int port_id) rb = _ap_instance->flows[fd].rb; _ap_instance->flows[fd].rb = NULL; + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + rw_lock_unlock(&_ap_instance->flows_lock); + if (rb != NULL) shm_ap_rbuff_close(rb); - FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); - - nanosleep(&wait, NULL); + rw_lock_unlock(&_ipcp->state_lock); close(fd); - rw_lock_unlock(&_ap_instance->flows_lock); - rw_lock_unlock(&_ipcp->state_lock); + LOG_DBGF("Flow with port_id %d deallocated.", port_id); return 0; } -static struct ipcp * ipcp_udp_create(char * ap_name) +static struct ipcp * ipcp_udp_create() { struct ipcp * i; struct ipcp_udp_data * data; struct ipcp_ops * ops; - if (shim_ap_init(ap_name) < 0) - return NULL; - i = ipcp_instance_create(); if (i == NULL) return NULL; @@ -1158,7 +1430,7 @@ static struct ipcp * ipcp_udp_create(char * ap_name) return NULL; } - ops = malloc(sizeof *ops); + ops = malloc(sizeof(*ops)); if (ops == NULL) { free(data); free(i); @@ -1202,11 +1474,14 @@ int main (int argc, char * argv[]) exit(1); } + if (shim_ap_init(argv[2]) < 0) + exit(1); + /* store the process id of the irmd */ irmd_pid = atoi(argv[1]); /* init sig_act */ - memset(&sig_act, 0, sizeof sig_act); + memset(&sig_act, 0, sizeof(sig_act)); /* install signal traps */ sig_act.sa_sigaction = &ipcp_sig_handler; @@ -1217,18 +1492,22 @@ int main (int argc, char * argv[]) sigaction(SIGHUP, &sig_act, NULL); sigaction(SIGPIPE, &sig_act, NULL); - _ipcp = ipcp_udp_create(argv[2]); + _ipcp = ipcp_udp_create(); if (_ipcp == NULL) { LOG_ERR("Won't."); exit(1); } + rw_lock_wrlock(&_ipcp->state_lock); + pthread_sigmask(SIG_BLOCK, &sigset, NULL); pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); + rw_lock_unlock(&_ipcp->state_lock); + pthread_join(_ap_instance->mainloop, NULL); shim_ap_fini(); diff --git a/src/ipcpd/shim-udp/shim_udp_messages.proto b/src/ipcpd/shim-udp/shim_udp_messages.proto new file mode 100644 index 00000000..1d054f1f --- /dev/null +++ b/src/ipcpd/shim-udp/shim_udp_messages.proto @@ -0,0 +1,14 @@ +enum shim_udp_msg_code { + FLOW_REQ = 1; + FLOW_REPLY = 2; +}; + +message shim_udp_msg { + required shim_udp_msg_code code = 1; + optional string dst_name = 2; + optional string src_ap_name = 3; + optional string src_ae_name = 4; + required sint32 src_udp_port = 5; + optional sint32 dst_udp_port = 6; + optional sint32 response = 7; +}; diff --git a/src/irmd/main.c b/src/irmd/main.c index 89263977..28f82751 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1047,7 +1047,6 @@ static int flow_alloc_res(int port_id) if (e->state == FLOW_ALLOCATED) { rw_lock_unlock(&instance->flows_lock); rw_lock_unlock(&instance->state_lock); - LOG_DBGF("Returning 0."); return 0; } if (e->state == FLOW_NULL) { |