summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal/fmgr.c
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-10-19 22:25:46 +0200
committerdimitri staessens <[email protected]>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd/normal/fmgr.c
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r--src/ipcpd/normal/fmgr.c143
1 files changed, 78 insertions, 65 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 8c627641..2800dcb2 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -27,7 +27,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
#include <stdlib.h>
@@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct np1_flow * flow;
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.np1_set, &timeout);
- if (fd == -ETIMEDOUT)
+ int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
+
+ flow = fmgr.np1_flows[fd];
+ if (flow == NULL) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to retrieve flow.");
+ continue;
+ }
+
+ if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to hand SDU to FRCT.");
+ continue;
+ }
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
- continue;
- }
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ }
}
return (void *) 0;
@@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o)
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct shm_du_buff * sdb;
struct pci * pci;
-
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.nm1_set, &timeout);
- if (fd == -ETIMEDOUT)
- continue;
-
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- }
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- pci = shm_pci_des(sdb);
- if (pci == NULL) {
- LOG_ERR("Failed to get PCI.");
- ipcp_flow_del(sdb);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- if (pci->dst_addr != ribmgr_address()) {
- LOG_DBG("PDU needs to be forwarded.");
+ pci = shm_pci_des(sdb);
+ if (pci == NULL) {
+ LOG_ERR("Failed to get PCI.");
+ ipcp_flow_del(sdb);
+ continue;
+ }
- if (pci->ttl == 0) {
- LOG_DBG("TTL was zero.");
+ if (pci->dst_addr != ribmgr_address()) {
+ LOG_DBG("PDU needs to be forwarded.");
+
+ if (pci->ttl == 0) {
+ LOG_DBG("TTL was zero.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (shm_pci_dec_ttl(sdb)) {
+ LOG_ERR("Failed to decrease TTL.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ /*
+ * FIXME: Dropping for now, since
+ * we don't have a PFF yet
+ */
ipcp_flow_del(sdb);
free(pci);
continue;
}
- if (shm_pci_dec_ttl(sdb)) {
- LOG_ERR("Failed to decrease TTL.");
+ if (shm_pci_shrink(sdb)) {
+ LOG_ERR("Failed to shrink PDU.");
ipcp_flow_del(sdb);
free(pci);
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
-
- if (shm_pci_shrink(sdb)) {
- LOG_ERR("Failed to shrink PDU.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
- if (frct_nm1_post_sdu(pci, sdb)) {
- LOG_ERR("Failed to hand PDU to FRCT.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
+ if (frct_nm1_post_sdu(pci, sdb)) {
+ LOG_ERR("Failed to hand PDU to FRCT.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
}
}