summaryrefslogtreecommitdiff
path: root/src/ipcpd/local
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-10-19 22:25:46 +0200
committerdimitri staessens <[email protected]>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd/local
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/ipcpd/local')
-rw-r--r--src/ipcpd/local/main.c68
1 files changed, 51 insertions, 17 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 4e500a8a..68c9ae8c 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -25,7 +25,7 @@
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/fcntl.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/local-dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
@@ -39,6 +39,7 @@
#include <sys/wait.h>
#include <fcntl.h>
+#define EVENT_WAIT_TIMEOUT 100 /* us */
#define THIS_TYPE IPCP_LOCAL
/* global for trapping signal */
@@ -46,18 +47,25 @@ int irmd_api;
struct {
int in_out[IRMD_MAX_FLOWS];
+ flow_set_t * flows;
pthread_rwlock_t lock;
pthread_t sduloop;
} local_data;
-void local_data_init()
+int local_data_init()
{
int i;
for (i = 0; i < IRMD_MAX_FLOWS; ++i)
local_data.in_out[i] = -1;
+ local_data.flows = flow_set_create();
+ if (local_data.flows == NULL)
+ return -ENFILE;
+
pthread_rwlock_init(&local_data.lock, NULL);
+
+ return 0;
}
void local_data_fini()
@@ -67,11 +75,24 @@ void local_data_fini()
static void * ipcp_local_sdu_loop(void * o)
{
+ struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
+
while (true) {
int fd;
- struct rb_entry * e;
+ int ret;
+ ssize_t idx;
+
+ ret = flow_event_wait(local_data.flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = flow_select(NULL, NULL);
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
+ continue;
+ }
pthread_rwlock_rdlock(&ipcpi.state_lock);
@@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o)
pthread_rwlock_rdlock(&local_data.lock);
- e = local_flow_read(fd);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ idx = local_flow_read(fd);
- fd = local_data.in_out[fd];
+ fd = local_data.in_out[fd];
- if (fd != -1)
- local_flow_write(fd, e);
+ if (fd != -1)
+ local_flow_write(fd, idx);
+ }
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
-
- free(e);
}
- return (void *) 1;
+ return (void *) 0;
}
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
@@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name)
if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Failed to add %s to local registry.", name);
+ LOG_DBG("Failed to add %s to local registry.", name);
return -1;
}
@@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd,
if (ipcp_get_state() != IPCP_ENROLLED) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Won't register with non-enrolled IPCP.");
+ LOG_DBG("Won't register with non-enrolled IPCP.");
return -1; /* -ENOTENROLLED */
}
pthread_rwlock_wrlock(&local_data.lock);
+ flow_set_add(local_data.flows, fd);
+
out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
local_data.in_out[fd] = out_fd;
@@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
return 0;
pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&local_data.lock);
out_fd = local_data.in_out[fd];
if (out_fd < 0) {
@@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
return -1;
}
+ flow_set_add(local_data.flows, fd);
+
+ pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)
@@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd)
if (fd < 0)
return -EINVAL;
+ flow_set_del(local_data.flows, fd);
+
while (flow_dealloc(fd) == -EBUSY)
nanosleep(&t, NULL);
@@ -289,9 +318,14 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- local_data_init();
-
if (ap_init(NULL) < 0) {
+ LOG_ERR("Failed to init application.");
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
+ if (local_data_init() < 0) {
+ LOG_ERR("Failed to init local data.");
close_logfile();
exit(EXIT_FAILURE);
}
@@ -331,10 +365,10 @@ int main(int argc, char * argv[])
pthread_cancel(local_data.sduloop);
pthread_join(local_data.sduloop, NULL);
- ap_fini();
-
local_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);