diff options
author | dimitri staessens <[email protected]> | 2016-10-19 22:25:46 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-10-21 14:17:51 +0200 |
commit | f516b51169020ea1957010fbd1005d746f01b1d9 (patch) | |
tree | 03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd/normal/fmgr.c | |
parent | c79ab46894053312f80390bf13a52c238a7d4704 (diff) | |
download | ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip |
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per
process. This necessitated the development of a new method for the
asynchronous io call, which is now based on an event queue system for
scalability (fqueue). The ipcpd's and tools have been updated to this
API.
Diffstat (limited to 'src/ipcpd/normal/fmgr.c')
-rw-r--r-- | src/ipcpd/normal/fmgr.c | 143 |
1 files changed, 78 insertions, 65 deletions
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8c627641..2800dcb2 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@ #include <ouroboros/dev.h> #include <ouroboros/list.h> #include <ouroboros/ipcp-dev.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h> #include <ouroboros/errno.h> #include <stdlib.h> @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o) struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct np1_flow * flow; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.np1_set, &timeout); - if (fd == -ETIMEDOUT) + int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + + flow = fmgr.np1_flows[fd]; + if (flow == NULL) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } - if (frct_i_write_sdu(flow->cep_id, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); - continue; - } - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + } } return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o) struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct shm_du_buff * sdb; struct pci * pci; - + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.nm1_set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - if (pci->dst_addr != ribmgr_address()) { - LOG_DBG("PDU needs to be forwarded."); + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ ipcp_flow_del(sdb); free(pci); continue; } - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); ipcp_flow_del(sdb); free(pci); continue; } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - if (frct_nm1_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } } } |