summaryrefslogtreecommitdiff
path: root/src/ipcpd/shim-udp
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-10-21 12:44:00 +0000
committerSander Vrijders <[email protected]>2016-10-21 12:44:00 +0000
commit482c44232d4deda3f89a7d85fbad99c1c64e80ec (patch)
treef3fb790d93da3cbe198b0f0c58d9c7513b0eff23 /src/ipcpd/shim-udp
parent680017a72c7a15b90f223bafcea80fd3e264e984 (diff)
parent02976060919566d1a217b818ca8f33297700d56d (diff)
downloadouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.tar.gz
ouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.zip
Merged in dstaesse/ouroboros/be-demux (pull request #267)
lib: Demultiplex the fast path
Diffstat (limited to 'src/ipcpd/shim-udp')
-rw-r--r--src/ipcpd/shim-udp/main.c84
1 files changed, 61 insertions, 23 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 7c109a8a..050623e4 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -27,6 +27,9 @@
#include <ouroboros/utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -75,6 +78,7 @@ struct {
struct sockaddr_in s_saddr;
int s_fd;
+ flow_set_t * np1_flows;
fd_set flow_fd_s;
/* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */
int uf_to_fd[FD_SETSIZE];
@@ -90,7 +94,7 @@ struct {
pthread_mutex_t fd_set_lock;
} udp_data;
-static void udp_data_init()
+static int udp_data_init()
{
int i;
@@ -104,13 +108,21 @@ static void udp_data_init()
FD_ZERO(&udp_data.flow_fd_s);
+ udp_data.np1_flows = flow_set_create();
+ if (udp_data.np1_flows == NULL)
+ return -ENOMEM;
+
pthread_rwlock_init(&udp_data.flows_lock, NULL);
pthread_cond_init(&udp_data.fd_set_cond, NULL);
pthread_mutex_init(&udp_data.fd_set_lock, NULL);
+
+ return 0;
}
static void udp_data_fini()
{
+ flow_set_destroy(udp_data.np1_flows);
+
pthread_rwlock_destroy(&udp_data.flows_lock);
pthread_mutex_destroy(&udp_data.fd_set_lock);
pthread_cond_destroy(&udp_data.fd_set_cond);
@@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port)
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
close(skfd);
@@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader()
static void * ipcp_udp_sdu_loop(void * o)
{
+ int fd;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000};
+ struct shm_du_buff * sdb;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd;
- struct shm_du_buff * sdb;
+ int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = ipcp_read_shim(&sdb);
- if (fd < 0)
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
continue;
+ }
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Bad read from fd %d.", fd);
+ continue;
+ }
- fd = udp_data.fd_to_uf[fd].skfd;
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ fd = udp_data.fd_to_uf[fd].skfd;
+
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (send(fd,
- shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- 0) < 0)
- LOG_ERR("Failed to send SDU.");
+ if (send(fd,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+ 0) < 0)
+ LOG_ERR("Failed to send SDU.");
- ipcp_flow_del(sdb);
+ ipcp_flow_del(sdb);
+ }
}
return (void *) 1;
@@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd,
udp_data.fd_to_uf[fd].skfd = skfd;
udp_data.uf_to_fd[skfd] = fd;
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)
set_fd(skfd);
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd)
{
int skfd = -1;
int remote_udp = -1;
+ struct timespec t = {0, 10000};
struct sockaddr_in r_saddr;
socklen_t r_saddr_len = sizeof(r_saddr);
+ flow_set_del(udp_data.np1_flows, fd);
+
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);
@@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd)
close(skfd);
- flow_dealloc(fd);
-
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
@@ -1149,13 +1184,16 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- udp_data_init();
-
if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
+ if (udp_data_init() < 0) {
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
/* store the process id of the irmd */
irmd_api = atoi(argv[1]);
@@ -1196,10 +1234,10 @@ int main(int argc, char * argv[])
pthread_join(udp_data.handler, NULL);
pthread_join(udp_data.sdu_reader, NULL);
- ap_fini();
-
udp_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);