diff options
author | dimitri staessens <[email protected]> | 2016-10-19 22:25:46 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-10-21 14:17:51 +0200 |
commit | f516b51169020ea1957010fbd1005d746f01b1d9 (patch) | |
tree | 03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd/shim-eth-llc | |
parent | c79ab46894053312f80390bf13a52c238a7d4704 (diff) | |
download | ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip |
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per
process. This necessitated the development of a new method for the
asynchronous io call, which is now based on an event queue system for
scalability (fqueue). The ipcpd's and tools have been updated to this
API.
Diffstat (limited to 'src/ipcpd/shim-eth-llc')
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 106 |
1 files changed, 71 insertions, 35 deletions
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@ #include <ouroboros/bitmap.h> #include <ouroboros/dev.h> #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/fqueue.h> #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ + /* global for trapping signal */ int irmd_api; @@ -110,6 +114,7 @@ struct { uint8_t * tx_ring; int tx_offset; #endif + flow_set_t * np1_flows; int * ef_to_fd; struct ef * fd_to_ef; pthread_rwlock_t flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init() return -ENOMEM; } + eth_llc_data.np1_flows = flow_set_create(); + if (eth_llc_data.np1_flows == NULL) { + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; + } + for (i = 0; i < MAX_SAPS; ++i) eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init() void eth_llc_data_fini() { bmp_destroy(eth_llc_data.saps); + flow_set_destroy(eth_llc_data.np1_flows); free(eth_llc_data.fd_to_ef); free(eth_llc_data.ef_to_fd); pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr) return 0; } - bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); - pthread_rwlock_unlock(ð_llc_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - - LOG_DBG("Flow with fd %d deallocated.", fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); return 0; } static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr) { - shim_eth_llc_msg_t * msg = NULL; - - msg = shim_eth_llc_msg__unpack(NULL, len, buf); + shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf); if (msg == NULL) { LOG_ERR("Failed to unpack."); return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { + int fd; + struct shm_du_buff * sdb; + uint8_t ssap; + uint8_t dsap; + uint8_t r_addr[MAC_SIZE]; + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { - int fd; - struct shm_du_buff * sdb; - uint8_t ssap; - uint8_t dsap; - uint8_t r_addr[MAC_SIZE]; - - fd = ipcp_read_shim(&sdb); - if (fd < 0) + int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(ð_llc_data.flows_lock); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } - ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); - dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); - memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); + dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); + memcpy(r_addr, + eth_llc_data.fd_to_ef[fd].r_addr, + MAC_SIZE); + + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - eth_llc_ipcp_send_frame(r_addr, dsap, ssap, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - - shm_du_buff_head(sdb)); - ipcp_flow_del(sdb); + eth_llc_ipcp_send_frame(r_addr, dsap, ssap, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) + - shm_du_buff_head(sdb)); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd, uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s.", dst_name); if (dst_name == NULL || src_ae_name == NULL) return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd, return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap); return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) static int eth_llc_ipcp_flow_dealloc(int fd) { + struct timespec t = {0, 10000}; + uint8_t sap; uint8_t r_sap; uint8_t addr[MAC_SIZE]; int ret; + flow_set_del(eth_llc_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd) if (ret < 0) LOG_DBG("Could not notify remote."); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (eth_llc_data_init() < 0) + if (ap_init(NULL) < 0) { + close_logfile(); exit(EXIT_FAILURE); + } - if (ap_init(NULL) < 0) { + if (eth_llc_data_init() < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[]) pthread_join(eth_llc_data.sdu_writer, NULL); pthread_join(eth_llc_data.sdu_reader, NULL); - ap_fini(); - eth_llc_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); |