summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-10-19 22:25:46 +0200
committerdimitri staessens <[email protected]>2016-10-21 14:17:51 +0200
commitf516b51169020ea1957010fbd1005d746f01b1d9 (patch)
tree03d19b0dfb6eab68f8ee5a3ecac5300c7bef2f4b /src/ipcpd
parentc79ab46894053312f80390bf13a52c238a7d4704 (diff)
downloadouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.tar.gz
ouroboros-f516b51169020ea1957010fbd1005d746f01b1d9.zip
lib: Demultiplex the fast path
The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API.
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/ipcp.c6
-rw-r--r--src/ipcpd/local/main.c68
-rw-r--r--src/ipcpd/normal/fmgr.c143
-rw-r--r--src/ipcpd/shim-eth-llc/main.c106
-rw-r--r--src/ipcpd/shim-udp/main.c84
5 files changed, 264 insertions, 143 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index a9f80ee7..f9246c7a 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o)
ret_msg.has_result = true;
ret_msg.result =
ipcpi.ops->ipcp_flow_alloc(fd,
- msg->dst_name,
- msg->src_ae_name,
- msg->qos_cube);
+ msg->dst_name,
+ msg->src_ae_name,
+ msg->qos_cube);
if (ret_msg.result < 0) {
LOG_DBG("Deallocate failed on port_id %d.",
msg->port_id);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 4e500a8a..68c9ae8c 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -25,7 +25,7 @@
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/fcntl.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/local-dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
@@ -39,6 +39,7 @@
#include <sys/wait.h>
#include <fcntl.h>
+#define EVENT_WAIT_TIMEOUT 100 /* us */
#define THIS_TYPE IPCP_LOCAL
/* global for trapping signal */
@@ -46,18 +47,25 @@ int irmd_api;
struct {
int in_out[IRMD_MAX_FLOWS];
+ flow_set_t * flows;
pthread_rwlock_t lock;
pthread_t sduloop;
} local_data;
-void local_data_init()
+int local_data_init()
{
int i;
for (i = 0; i < IRMD_MAX_FLOWS; ++i)
local_data.in_out[i] = -1;
+ local_data.flows = flow_set_create();
+ if (local_data.flows == NULL)
+ return -ENFILE;
+
pthread_rwlock_init(&local_data.lock, NULL);
+
+ return 0;
}
void local_data_fini()
@@ -67,11 +75,24 @@ void local_data_fini()
static void * ipcp_local_sdu_loop(void * o)
{
+ struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
+
while (true) {
int fd;
- struct rb_entry * e;
+ int ret;
+ ssize_t idx;
+
+ ret = flow_event_wait(local_data.flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = flow_select(NULL, NULL);
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
+ continue;
+ }
pthread_rwlock_rdlock(&ipcpi.state_lock);
@@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o)
pthread_rwlock_rdlock(&local_data.lock);
- e = local_flow_read(fd);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ idx = local_flow_read(fd);
- fd = local_data.in_out[fd];
+ fd = local_data.in_out[fd];
- if (fd != -1)
- local_flow_write(fd, e);
+ if (fd != -1)
+ local_flow_write(fd, idx);
+ }
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
-
- free(e);
}
- return (void *) 1;
+ return (void *) 0;
}
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
@@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name)
if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Failed to add %s to local registry.", name);
+ LOG_DBG("Failed to add %s to local registry.", name);
return -1;
}
@@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd,
if (ipcp_get_state() != IPCP_ENROLLED) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Won't register with non-enrolled IPCP.");
+ LOG_DBG("Won't register with non-enrolled IPCP.");
return -1; /* -ENOTENROLLED */
}
pthread_rwlock_wrlock(&local_data.lock);
+ flow_set_add(local_data.flows, fd);
+
out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
local_data.in_out[fd] = out_fd;
@@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
return 0;
pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&local_data.lock);
out_fd = local_data.in_out[fd];
if (out_fd < 0) {
@@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
return -1;
}
+ flow_set_add(local_data.flows, fd);
+
+ pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)
@@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd)
if (fd < 0)
return -EINVAL;
+ flow_set_del(local_data.flows, fd);
+
while (flow_dealloc(fd) == -EBUSY)
nanosleep(&t, NULL);
@@ -289,9 +318,14 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- local_data_init();
-
if (ap_init(NULL) < 0) {
+ LOG_ERR("Failed to init application.");
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
+ if (local_data_init() < 0) {
+ LOG_ERR("Failed to init local data.");
close_logfile();
exit(EXIT_FAILURE);
}
@@ -331,10 +365,10 @@ int main(int argc, char * argv[])
pthread_cancel(local_data.sduloop);
pthread_join(local_data.sduloop, NULL);
- ap_fini();
-
local_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 8c627641..2800dcb2 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -27,7 +27,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
#include <stdlib.h>
@@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct np1_flow * flow;
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.np1_set, &timeout);
- if (fd == -ETIMEDOUT)
+ int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
+
+ flow = fmgr.np1_flows[fd];
+ if (flow == NULL) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to retrieve flow.");
+ continue;
+ }
+
+ if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to hand SDU to FRCT.");
+ continue;
+ }
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
- continue;
- }
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ }
}
return (void *) 0;
@@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o)
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct shm_du_buff * sdb;
struct pci * pci;
-
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.nm1_set, &timeout);
- if (fd == -ETIMEDOUT)
- continue;
-
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- }
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- pci = shm_pci_des(sdb);
- if (pci == NULL) {
- LOG_ERR("Failed to get PCI.");
- ipcp_flow_del(sdb);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- if (pci->dst_addr != ribmgr_address()) {
- LOG_DBG("PDU needs to be forwarded.");
+ pci = shm_pci_des(sdb);
+ if (pci == NULL) {
+ LOG_ERR("Failed to get PCI.");
+ ipcp_flow_del(sdb);
+ continue;
+ }
- if (pci->ttl == 0) {
- LOG_DBG("TTL was zero.");
+ if (pci->dst_addr != ribmgr_address()) {
+ LOG_DBG("PDU needs to be forwarded.");
+
+ if (pci->ttl == 0) {
+ LOG_DBG("TTL was zero.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (shm_pci_dec_ttl(sdb)) {
+ LOG_ERR("Failed to decrease TTL.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ /*
+ * FIXME: Dropping for now, since
+ * we don't have a PFF yet
+ */
ipcp_flow_del(sdb);
free(pci);
continue;
}
- if (shm_pci_dec_ttl(sdb)) {
- LOG_ERR("Failed to decrease TTL.");
+ if (shm_pci_shrink(sdb)) {
+ LOG_ERR("Failed to shrink PDU.");
ipcp_flow_del(sdb);
free(pci);
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
-
- if (shm_pci_shrink(sdb)) {
- LOG_ERR("Failed to shrink PDU.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
- if (frct_nm1_post_sdu(pci, sdb)) {
- LOG_ERR("Failed to hand PDU to FRCT.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
+ if (frct_nm1_post_sdu(pci, sdb)) {
+ LOG_ERR("Failed to hand PDU to FRCT.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
}
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 399d3dc8..db258c8b 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -30,6 +30,8 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/fqueue.h>
#define OUROBOROS_PREFIX "ipcpd/shim-eth-llc"
@@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \
+ SHIM_ETH_LLC_MAX_SDU_SIZE)
+#define EVENT_WAIT_TIMEOUT 100 /* us */
+
/* global for trapping signal */
int irmd_api;
@@ -110,6 +114,7 @@ struct {
uint8_t * tx_ring;
int tx_offset;
#endif
+ flow_set_t * np1_flows;
int * ef_to_fd;
struct ef * fd_to_ef;
pthread_rwlock_t flows_lock;
@@ -139,6 +144,14 @@ static int eth_llc_data_init()
return -ENOMEM;
}
+ eth_llc_data.np1_flows = flow_set_create();
+ if (eth_llc_data.np1_flows == NULL) {
+ bmp_destroy(eth_llc_data.saps);
+ free(eth_llc_data.ef_to_fd);
+ free(eth_llc_data.fd_to_ef);
+ return -ENOMEM;
+ }
+
for (i = 0; i < MAX_SAPS; ++i)
eth_llc_data.ef_to_fd[i] = -1;
@@ -156,6 +169,7 @@ static int eth_llc_data_init()
void eth_llc_data_fini()
{
bmp_destroy(eth_llc_data.saps);
+ flow_set_destroy(eth_llc_data.np1_flows);
free(eth_llc_data.fd_to_ef);
free(eth_llc_data.ef_to_fd);
pthread_rwlock_destroy(&eth_llc_data.flows_lock);
@@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr)
return 0;
}
- bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
-
pthread_rwlock_unlock(&eth_llc_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
-
- LOG_DBG("Flow with fd %d deallocated.", fd);
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
return 0;
}
static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)
{
- shim_eth_llc_msg_t * msg = NULL;
-
- msg = shim_eth_llc_msg__unpack(NULL, len, buf);
+ shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf);
if (msg == NULL) {
LOG_ERR("Failed to unpack.");
return -1;
@@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
static void * eth_llc_ipcp_sdu_writer(void * o)
{
+ int fd;
+ struct shm_du_buff * sdb;
+ uint8_t ssap;
+ uint8_t dsap;
+ uint8_t r_addr[MAC_SIZE];
+ struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
+
while (true) {
- int fd;
- struct shm_du_buff * sdb;
- uint8_t ssap;
- uint8_t dsap;
- uint8_t r_addr[MAC_SIZE];
-
- fd = ipcp_read_shim(&sdb);
- if (fd < 0)
+ int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
+ continue;
+ }
- ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap);
- dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap);
- memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Bad read from fd %d.", fd);
+ continue;
+ }
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap);
+ dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap);
+ memcpy(r_addr,
+ eth_llc_data.fd_to_ef[fd].r_addr,
+ MAC_SIZE);
+
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- eth_llc_ipcp_send_frame(r_addr, dsap, ssap,
- shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb)
- - shm_du_buff_head(sdb));
- ipcp_flow_del(sdb);
+ eth_llc_ipcp_send_frame(r_addr, dsap, ssap,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb)
+ - shm_du_buff_head(sdb));
+ ipcp_flow_del(sdb);
+ }
}
return (void *) 1;
@@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd,
uint8_t ssap = 0;
uint8_t r_addr[MAC_SIZE];
- LOG_INFO("Allocating flow to %s.", dst_name);
+ LOG_DBG("Allocating flow to %s.", dst_name);
if (dst_name == NULL || src_ae_name == NULL)
return -1;
@@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd,
return -1;
}
+ flow_set_add(eth_llc_data.np1_flows, fd);
+
LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap);
return 0;
@@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
return -1;
}
+ flow_set_add(eth_llc_data.np1_flows, fd);
+
LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap);
return 0;
@@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
static int eth_llc_ipcp_flow_dealloc(int fd)
{
+ struct timespec t = {0, 10000};
+
uint8_t sap;
uint8_t r_sap;
uint8_t addr[MAC_SIZE];
int ret;
+ flow_set_del(eth_llc_data.np1_flows, fd);
+
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
@@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd)
if (ret < 0)
LOG_DBG("Could not notify remote.");
- flow_dealloc(fd);
-
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
@@ -1008,10 +1042,12 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- if (eth_llc_data_init() < 0)
+ if (ap_init(NULL) < 0) {
+ close_logfile();
exit(EXIT_FAILURE);
+ }
- if (ap_init(NULL) < 0) {
+ if (eth_llc_data_init() < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
@@ -1054,10 +1090,10 @@ int main(int argc, char * argv[])
pthread_join(eth_llc_data.sdu_writer, NULL);
pthread_join(eth_llc_data.sdu_reader, NULL);
- ap_fini();
-
eth_llc_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 7c109a8a..050623e4 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -27,6 +27,9 @@
#include <ouroboros/utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -75,6 +78,7 @@ struct {
struct sockaddr_in s_saddr;
int s_fd;
+ flow_set_t * np1_flows;
fd_set flow_fd_s;
/* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */
int uf_to_fd[FD_SETSIZE];
@@ -90,7 +94,7 @@ struct {
pthread_mutex_t fd_set_lock;
} udp_data;
-static void udp_data_init()
+static int udp_data_init()
{
int i;
@@ -104,13 +108,21 @@ static void udp_data_init()
FD_ZERO(&udp_data.flow_fd_s);
+ udp_data.np1_flows = flow_set_create();
+ if (udp_data.np1_flows == NULL)
+ return -ENOMEM;
+
pthread_rwlock_init(&udp_data.flows_lock, NULL);
pthread_cond_init(&udp_data.fd_set_cond, NULL);
pthread_mutex_init(&udp_data.fd_set_lock, NULL);
+
+ return 0;
}
static void udp_data_fini()
{
+ flow_set_destroy(udp_data.np1_flows);
+
pthread_rwlock_destroy(&udp_data.flows_lock);
pthread_mutex_destroy(&udp_data.fd_set_lock);
pthread_cond_destroy(&udp_data.fd_set_cond);
@@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port)
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
close(skfd);
@@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader()
static void * ipcp_udp_sdu_loop(void * o)
{
+ int fd;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000};
+ struct shm_du_buff * sdb;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd;
- struct shm_du_buff * sdb;
+ int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = ipcp_read_shim(&sdb);
- if (fd < 0)
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
continue;
+ }
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Bad read from fd %d.", fd);
+ continue;
+ }
- fd = udp_data.fd_to_uf[fd].skfd;
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ fd = udp_data.fd_to_uf[fd].skfd;
+
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (send(fd,
- shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- 0) < 0)
- LOG_ERR("Failed to send SDU.");
+ if (send(fd,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+ 0) < 0)
+ LOG_ERR("Failed to send SDU.");
- ipcp_flow_del(sdb);
+ ipcp_flow_del(sdb);
+ }
}
return (void *) 1;
@@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd,
udp_data.fd_to_uf[fd].skfd = skfd;
udp_data.uf_to_fd[skfd] = fd;
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)
set_fd(skfd);
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd)
{
int skfd = -1;
int remote_udp = -1;
+ struct timespec t = {0, 10000};
struct sockaddr_in r_saddr;
socklen_t r_saddr_len = sizeof(r_saddr);
+ flow_set_del(udp_data.np1_flows, fd);
+
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);
@@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd)
close(skfd);
- flow_dealloc(fd);
-
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
@@ -1149,13 +1184,16 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- udp_data_init();
-
if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
+ if (udp_data_init() < 0) {
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
/* store the process id of the irmd */
irmd_api = atoi(argv[1]);
@@ -1196,10 +1234,10 @@ int main(int argc, char * argv[])
pthread_join(udp_data.handler, NULL);
pthread_join(udp_data.sdu_reader, NULL);
- ap_fini();
-
udp_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);