diff options
author | Sander Vrijders <[email protected]> | 2016-10-21 12:44:00 +0000 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-10-21 12:44:00 +0000 |
commit | 482c44232d4deda3f89a7d85fbad99c1c64e80ec (patch) | |
tree | f3fb790d93da3cbe198b0f0c58d9c7513b0eff23 /src/ipcpd/shim-udp | |
parent | 680017a72c7a15b90f223bafcea80fd3e264e984 (diff) | |
parent | 02976060919566d1a217b818ca8f33297700d56d (diff) | |
download | ouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.tar.gz ouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.zip |
Merged in dstaesse/ouroboros/be-demux (pull request #267)
lib: Demultiplex the fast path
Diffstat (limited to 'src/ipcpd/shim-udp')
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 84 |
1 files changed, 61 insertions, 23 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@ #include <ouroboros/utils.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/errno.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct { struct sockaddr_in s_saddr; int s_fd; + flow_set_t * np1_flows; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct { pthread_mutex_t fd_set_lock; } udp_data; -static void udp_data_init() +static int udp_data_init() { int i; @@ -104,13 +108,21 @@ static void udp_data_init() FD_ZERO(&udp_data.flow_fd_s); + udp_data.np1_flows = flow_set_create(); + if (udp_data.np1_flows == NULL) + return -ENOMEM; + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); + + return 0; } static void udp_data_fini() { + flow_set_destroy(udp_data.np1_flows); + pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port) pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader() static void * ipcp_udp_sdu_loop(void * o) { + int fd; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; + struct shm_du_buff * sdb; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd; - struct shm_du_buff * sdb; + int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = ipcp_read_shim(&sdb); - if (fd < 0) + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); continue; + } - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } - fd = udp_data.fd_to_uf[fd].skfd; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + fd = udp_data.fd_to_uf[fd].skfd; + + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - LOG_ERR("Failed to send SDU."); + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) + LOG_ERR("Failed to send SDU."); - ipcp_flow_del(sdb); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd, udp_data.fd_to_uf[fd].skfd = skfd; udp_data.uf_to_fd[skfd] = fd; + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response) set_fd(skfd); + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd) { int skfd = -1; int remote_udp = -1; + struct timespec t = {0, 10000}; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); + flow_set_del(udp_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd) close(skfd); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - udp_data_init(); - if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } + if (udp_data_init() < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[]) pthread_join(udp_data.handler, NULL); pthread_join(udp_data.sdu_reader, NULL); - ap_fini(); - udp_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); |