diff options
author | dimitri staessens <[email protected]> | 2016-10-19 22:25:46 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-10-21 14:17:51 +0200 |
commit | f516b51169020ea1957010fbd1005d746f01b1d9 (patch) | |
tree | 03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd/shim-udp/main.c | |
parent | c79ab46894053312f80390bf13a52c238a7d4704 (diff) | |
download | ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip |
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per
process. This necessitated the development of a new method for the
asynchronous io call, which is now based on an event queue system for
scalability (fqueue). The ipcpd's and tools have been updated to this
API.
Diffstat (limited to 'src/ipcpd/shim-udp/main.c')
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 84 |
1 files changed, 61 insertions, 23 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@ #include <ouroboros/utils.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/errno.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct { struct sockaddr_in s_saddr; int s_fd; + flow_set_t * np1_flows; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct { pthread_mutex_t fd_set_lock; } udp_data; -static void udp_data_init() +static int udp_data_init() { int i; @@ -104,13 +108,21 @@ static void udp_data_init() FD_ZERO(&udp_data.flow_fd_s); + udp_data.np1_flows = flow_set_create(); + if (udp_data.np1_flows == NULL) + return -ENOMEM; + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); + + return 0; } static void udp_data_fini() { + flow_set_destroy(udp_data.np1_flows); + pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port) pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader() static void * ipcp_udp_sdu_loop(void * o) { + int fd; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; + struct shm_du_buff * sdb; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd; - struct shm_du_buff * sdb; + int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = ipcp_read_shim(&sdb); - if (fd < 0) + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); continue; + } - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } - fd = udp_data.fd_to_uf[fd].skfd; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + fd = udp_data.fd_to_uf[fd].skfd; + + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - LOG_ERR("Failed to send SDU."); + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) + LOG_ERR("Failed to send SDU."); - ipcp_flow_del(sdb); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd, udp_data.fd_to_uf[fd].skfd = skfd; udp_data.uf_to_fd[skfd] = fd; + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response) set_fd(skfd); + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd) { int skfd = -1; int remote_udp = -1; + struct timespec t = {0, 10000}; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); + flow_set_del(udp_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd) close(skfd); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - udp_data_init(); - if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } + if (udp_data_init() < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[]) pthread_join(udp_data.handler, NULL); pthread_join(udp_data.sdu_reader, NULL); - ap_fini(); - udp_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); |