summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-10-21 12:44:00 +0000
committerSander Vrijders <[email protected]>2016-10-21 12:44:00 +0000
commit482c44232d4deda3f89a7d85fbad99c1c64e80ec (patch)
treef3fb790d93da3cbe198b0f0c58d9c7513b0eff23 /src/ipcpd
parent680017a72c7a15b90f223bafcea80fd3e264e984 (diff)
parent02976060919566d1a217b818ca8f33297700d56d (diff)
downloadouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.tar.gz
ouroboros-482c44232d4deda3f89a7d85fbad99c1c64e80ec.zip
Merged in dstaesse/ouroboros/be-demux (pull request #267)
lib: Demultiplex the fast path
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/ipcp.c6
-rw-r--r--src/ipcpd/local/main.c68
-rw-r--r--src/ipcpd/normal/fmgr.c143
-rw-r--r--src/ipcpd/shim-eth-llc/main.c106
-rw-r--r--src/ipcpd/shim-udp/main.c84
5 files changed, 264 insertions, 143 deletions
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index a9f80ee7..f9246c7a 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o)
ret_msg.has_result = true;
ret_msg.result =
ipcpi.ops->ipcp_flow_alloc(fd,
- msg->dst_name,
- msg->src_ae_name,
- msg->qos_cube);
+ msg->dst_name,
+ msg->src_ae_name,
+ msg->qos_cube);
if (ret_msg.result < 0) {
LOG_DBG("Deallocate failed on port_id %d.",
msg->port_id);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 4e500a8a..68c9ae8c 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -25,7 +25,7 @@
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/fcntl.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/local-dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
@@ -39,6 +39,7 @@
#include <sys/wait.h>
#include <fcntl.h>
+#define EVENT_WAIT_TIMEOUT 100 /* us */
#define THIS_TYPE IPCP_LOCAL
/* global for trapping signal */
@@ -46,18 +47,25 @@ int irmd_api;
struct {
int in_out[IRMD_MAX_FLOWS];
+ flow_set_t * flows;
pthread_rwlock_t lock;
pthread_t sduloop;
} local_data;
-void local_data_init()
+int local_data_init()
{
int i;
for (i = 0; i < IRMD_MAX_FLOWS; ++i)
local_data.in_out[i] = -1;
+ local_data.flows = flow_set_create();
+ if (local_data.flows == NULL)
+ return -ENFILE;
+
pthread_rwlock_init(&local_data.lock, NULL);
+
+ return 0;
}
void local_data_fini()
@@ -67,11 +75,24 @@ void local_data_fini()
static void * ipcp_local_sdu_loop(void * o)
{
+ struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
+
while (true) {
int fd;
- struct rb_entry * e;
+ int ret;
+ ssize_t idx;
+
+ ret = flow_event_wait(local_data.flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = flow_select(NULL, NULL);
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
+ continue;
+ }
pthread_rwlock_rdlock(&ipcpi.state_lock);
@@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o)
pthread_rwlock_rdlock(&local_data.lock);
- e = local_flow_read(fd);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ idx = local_flow_read(fd);
- fd = local_data.in_out[fd];
+ fd = local_data.in_out[fd];
- if (fd != -1)
- local_flow_write(fd, e);
+ if (fd != -1)
+ local_flow_write(fd, idx);
+ }
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
-
- free(e);
}
- return (void *) 1;
+ return (void *) 0;
}
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
@@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name)
if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Failed to add %s to local registry.", name);
+ LOG_DBG("Failed to add %s to local registry.", name);
return -1;
}
@@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd,
if (ipcp_get_state() != IPCP_ENROLLED) {
pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Won't register with non-enrolled IPCP.");
+ LOG_DBG("Won't register with non-enrolled IPCP.");
return -1; /* -ENOTENROLLED */
}
pthread_rwlock_wrlock(&local_data.lock);
+ flow_set_add(local_data.flows, fd);
+
out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
local_data.in_out[fd] = out_fd;
@@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
return 0;
pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&local_data.lock);
out_fd = local_data.in_out[fd];
if (out_fd < 0) {
@@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)
return -1;
}
+ flow_set_add(local_data.flows, fd);
+
+ pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)
@@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd)
if (fd < 0)
return -EINVAL;
+ flow_set_del(local_data.flows, fd);
+
while (flow_dealloc(fd) == -EBUSY)
nanosleep(&t, NULL);
@@ -289,9 +318,14 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- local_data_init();
-
if (ap_init(NULL) < 0) {
+ LOG_ERR("Failed to init application.");
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
+ if (local_data_init() < 0) {
+ LOG_ERR("Failed to init local data.");
close_logfile();
exit(EXIT_FAILURE);
}
@@ -331,10 +365,10 @@ int main(int argc, char * argv[])
pthread_cancel(local_data.sduloop);
pthread_join(local_data.sduloop, NULL);
- ap_fini();
-
local_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index cb25072e..3da392c5 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -27,7 +27,7 @@
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
#include <ouroboros/ipcp-dev.h>
-#include <ouroboros/select.h>
+#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
#include <stdlib.h>
@@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o)
struct shm_du_buff * sdb;
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct np1_flow * flow;
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.np1_set, &timeout);
- if (fd == -ETIMEDOUT)
+ int ret = flow_event_wait(fmgr.np1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
- flow = fmgr.np1_flows[fd];
- if (flow == NULL) {
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to retrieve flow.");
- continue;
- }
+ pthread_rwlock_rdlock(&fmgr.np1_flows_lock);
+
+ flow = fmgr.np1_flows[fd];
+ if (flow == NULL) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to retrieve flow.");
+ continue;
+ }
+
+ if (frct_i_write_sdu(flow->cep_id, sdb)) {
+ pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ ipcp_flow_del(sdb);
+ LOG_ERR("Failed to hand SDU to FRCT.");
+ continue;
+ }
- if (frct_i_write_sdu(flow->cep_id, sdb)) {
pthread_rwlock_unlock(&fmgr.np1_flows_lock);
- ipcp_flow_del(sdb);
- LOG_ERR("Failed to hand SDU to FRCT.");
- continue;
- }
- pthread_rwlock_unlock(&fmgr.np1_flows_lock);
+ }
}
return (void *) 0;
@@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o)
struct timespec timeout = {0, FD_UPDATE_TIMEOUT};
struct shm_du_buff * sdb;
struct pci * pci;
-
+ int fd;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd = flow_select(fmgr.nm1_set, &timeout);
- if (fd == -ETIMEDOUT)
- continue;
-
- if (fd < 0) {
- LOG_ERR("Failed to get active fd.");
+ int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- }
- if (ipcp_flow_read(fd, &sdb)) {
- LOG_ERR("Failed to read SDU from fd %d.", fd);
+ if (ret < 0) {
+ LOG_ERR("Event error: %d.", ret);
continue;
}
- pci = shm_pci_des(sdb);
- if (pci == NULL) {
- LOG_ERR("Failed to get PCI.");
- ipcp_flow_del(sdb);
- continue;
- }
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Failed to read SDU from fd %d.", fd);
+ continue;
+ }
- if (pci->dst_addr != ribmgr_address()) {
- LOG_DBG("PDU needs to be forwarded.");
+ pci = shm_pci_des(sdb);
+ if (pci == NULL) {
+ LOG_ERR("Failed to get PCI.");
+ ipcp_flow_del(sdb);
+ continue;
+ }
- if (pci->ttl == 0) {
- LOG_DBG("TTL was zero.");
+ if (pci->dst_addr != ribmgr_address()) {
+ LOG_DBG("PDU needs to be forwarded.");
+
+ if (pci->ttl == 0) {
+ LOG_DBG("TTL was zero.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+
+ if (shm_pci_dec_ttl(sdb)) {
+ LOG_ERR("Failed to decrease TTL.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
+ /*
+ * FIXME: Dropping for now, since
+ * we don't have a PFF yet
+ */
ipcp_flow_del(sdb);
free(pci);
continue;
}
- if (shm_pci_dec_ttl(sdb)) {
- LOG_ERR("Failed to decrease TTL.");
+ if (shm_pci_shrink(sdb)) {
+ LOG_ERR("Failed to shrink PDU.");
ipcp_flow_del(sdb);
free(pci);
continue;
}
- /*
- * FIXME: Dropping for now, since
- * we don't have a PFF yet
- */
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
-
- if (shm_pci_shrink(sdb)) {
- LOG_ERR("Failed to shrink PDU.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
- }
- if (frct_nm1_post_sdu(pci, sdb)) {
- LOG_ERR("Failed to hand PDU to FRCT.");
- ipcp_flow_del(sdb);
- free(pci);
- continue;
+ if (frct_nm1_post_sdu(pci, sdb)) {
+ LOG_ERR("Failed to hand PDU to FRCT.");
+ ipcp_flow_del(sdb);
+ free(pci);
+ continue;
+ }
}
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index 399d3dc8..db258c8b 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -30,6 +30,8 @@
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/fqueue.h>
#define OUROBOROS_PREFIX "ipcpd/shim-eth-llc"
@@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;
#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \
+ SHIM_ETH_LLC_MAX_SDU_SIZE)
+#define EVENT_WAIT_TIMEOUT 100 /* us */
+
/* global for trapping signal */
int irmd_api;
@@ -110,6 +114,7 @@ struct {
uint8_t * tx_ring;
int tx_offset;
#endif
+ flow_set_t * np1_flows;
int * ef_to_fd;
struct ef * fd_to_ef;
pthread_rwlock_t flows_lock;
@@ -139,6 +144,14 @@ static int eth_llc_data_init()
return -ENOMEM;
}
+ eth_llc_data.np1_flows = flow_set_create();
+ if (eth_llc_data.np1_flows == NULL) {
+ bmp_destroy(eth_llc_data.saps);
+ free(eth_llc_data.ef_to_fd);
+ free(eth_llc_data.fd_to_ef);
+ return -ENOMEM;
+ }
+
for (i = 0; i < MAX_SAPS; ++i)
eth_llc_data.ef_to_fd[i] = -1;
@@ -156,6 +169,7 @@ static int eth_llc_data_init()
void eth_llc_data_fini()
{
bmp_destroy(eth_llc_data.saps);
+ flow_set_destroy(eth_llc_data.np1_flows);
free(eth_llc_data.fd_to_ef);
free(eth_llc_data.ef_to_fd);
pthread_rwlock_destroy(&eth_llc_data.flows_lock);
@@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr)
return 0;
}
- bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
-
pthread_rwlock_unlock(&eth_llc_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
-
- LOG_DBG("Flow with fd %d deallocated.", fd);
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
return 0;
}
static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)
{
- shim_eth_llc_msg_t * msg = NULL;
-
- msg = shim_eth_llc_msg__unpack(NULL, len, buf);
+ shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf);
if (msg == NULL) {
LOG_ERR("Failed to unpack.");
return -1;
@@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
static void * eth_llc_ipcp_sdu_writer(void * o)
{
+ int fd;
+ struct shm_du_buff * sdb;
+ uint8_t ssap;
+ uint8_t dsap;
+ uint8_t r_addr[MAC_SIZE];
+ struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000};
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
+
while (true) {
- int fd;
- struct shm_du_buff * sdb;
- uint8_t ssap;
- uint8_t dsap;
- uint8_t r_addr[MAC_SIZE];
-
- fd = ipcp_read_shim(&sdb);
- if (fd < 0)
+ int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
continue;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
+ continue;
+ }
- ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap);
- dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap);
- memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Bad read from fd %d.", fd);
+ continue;
+ }
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap);
+ dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap);
+ memcpy(r_addr,
+ eth_llc_data.fd_to_ef[fd].r_addr,
+ MAC_SIZE);
+
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- eth_llc_ipcp_send_frame(r_addr, dsap, ssap,
- shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb)
- - shm_du_buff_head(sdb));
- ipcp_flow_del(sdb);
+ eth_llc_ipcp_send_frame(r_addr, dsap, ssap,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb)
+ - shm_du_buff_head(sdb));
+ ipcp_flow_del(sdb);
+ }
}
return (void *) 1;
@@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd,
uint8_t ssap = 0;
uint8_t r_addr[MAC_SIZE];
- LOG_INFO("Allocating flow to %s.", dst_name);
+ LOG_DBG("Allocating flow to %s.", dst_name);
if (dst_name == NULL || src_ae_name == NULL)
return -1;
@@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd,
return -1;
}
+ flow_set_add(eth_llc_data.np1_flows, fd);
+
LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap);
return 0;
@@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
return -1;
}
+ flow_set_add(eth_llc_data.np1_flows, fd);
+
LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap);
return 0;
@@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
static int eth_llc_ipcp_flow_dealloc(int fd)
{
+ struct timespec t = {0, 10000};
+
uint8_t sap;
uint8_t r_sap;
uint8_t addr[MAC_SIZE];
int ret;
+ flow_set_del(eth_llc_data.np1_flows, fd);
+
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
@@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd)
if (ret < 0)
LOG_DBG("Could not notify remote.");
- flow_dealloc(fd);
-
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
@@ -1008,10 +1042,12 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- if (eth_llc_data_init() < 0)
+ if (ap_init(NULL) < 0) {
+ close_logfile();
exit(EXIT_FAILURE);
+ }
- if (ap_init(NULL) < 0) {
+ if (eth_llc_data_init() < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
@@ -1054,10 +1090,10 @@ int main(int argc, char * argv[])
pthread_join(eth_llc_data.sdu_writer, NULL);
pthread_join(eth_llc_data.sdu_reader, NULL);
- ap_fini();
-
eth_llc_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 7c109a8a..050623e4 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -27,6 +27,9 @@
#include <ouroboros/utils.h>
#include <ouroboros/dev.h>
#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/fqueue.h>
+#include <ouroboros/fcntl.h>
+#include <ouroboros/errno.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -75,6 +78,7 @@ struct {
struct sockaddr_in s_saddr;
int s_fd;
+ flow_set_t * np1_flows;
fd_set flow_fd_s;
/* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */
int uf_to_fd[FD_SETSIZE];
@@ -90,7 +94,7 @@ struct {
pthread_mutex_t fd_set_lock;
} udp_data;
-static void udp_data_init()
+static int udp_data_init()
{
int i;
@@ -104,13 +108,21 @@ static void udp_data_init()
FD_ZERO(&udp_data.flow_fd_s);
+ udp_data.np1_flows = flow_set_create();
+ if (udp_data.np1_flows == NULL)
+ return -ENOMEM;
+
pthread_rwlock_init(&udp_data.flows_lock, NULL);
pthread_cond_init(&udp_data.fd_set_cond, NULL);
pthread_mutex_init(&udp_data.fd_set_lock, NULL);
+
+ return 0;
}
static void udp_data_fini()
{
+ flow_set_destroy(udp_data.np1_flows);
+
pthread_rwlock_destroy(&udp_data.flows_lock);
pthread_mutex_destroy(&udp_data.fd_set_lock);
pthread_cond_destroy(&udp_data.fd_set_cond);
@@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port)
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
- flow_dealloc(fd);
+ flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);
close(skfd);
@@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader()
static void * ipcp_udp_sdu_loop(void * o)
{
+ int fd;
+ struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000};
+ struct shm_du_buff * sdb;
+ fqueue_t * fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) 1;
while (true) {
- int fd;
- struct shm_du_buff * sdb;
+ int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout);
+ if (ret == -ETIMEDOUT)
+ continue;
- fd = ipcp_read_shim(&sdb);
- if (fd < 0)
+ if (ret < 0) {
+ LOG_ERR("Event wait returned error code %d.", -ret);
continue;
+ }
- pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&udp_data.flows_lock);
+ while ((fd = fqueue_next(fq)) >= 0) {
+ if (ipcp_flow_read(fd, &sdb)) {
+ LOG_ERR("Bad read from fd %d.", fd);
+ continue;
+ }
- fd = udp_data.fd_to_uf[fd].skfd;
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&udp_data.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ fd = udp_data.fd_to_uf[fd].skfd;
+
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (send(fd,
- shm_du_buff_head(sdb),
- shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
- 0) < 0)
- LOG_ERR("Failed to send SDU.");
+ if (send(fd,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+ 0) < 0)
+ LOG_ERR("Failed to send SDU.");
- ipcp_flow_del(sdb);
+ ipcp_flow_del(sdb);
+ }
}
return (void *) 1;
@@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd,
udp_data.fd_to_uf[fd].skfd = skfd;
udp_data.uf_to_fd[skfd] = fd;
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)
set_fd(skfd);
+ flow_set_add(udp_data.np1_flows, fd);
+
pthread_rwlock_unlock(&udp_data.flows_lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd)
{
int skfd = -1;
int remote_udp = -1;
+ struct timespec t = {0, 10000};
struct sockaddr_in r_saddr;
socklen_t r_saddr_len = sizeof(r_saddr);
+ flow_set_del(udp_data.np1_flows, fd);
+
+ while (flow_dealloc(fd) == -EBUSY)
+ nanosleep(&t, NULL);
+
pthread_rwlock_rdlock(&ipcpi.state_lock);
pthread_rwlock_wrlock(&udp_data.flows_lock);
@@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd)
close(skfd);
- flow_dealloc(fd);
-
LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
@@ -1149,13 +1184,16 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- udp_data_init();
-
if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
+ if (udp_data_init() < 0) {
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
/* store the process id of the irmd */
irmd_api = atoi(argv[1]);
@@ -1196,10 +1234,10 @@ int main(int argc, char * argv[])
pthread_join(udp_data.handler, NULL);
pthread_join(udp_data.sdu_reader, NULL);
- ap_fini();
-
udp_data_fini();
+ ap_fini();
+
close_logfile();
exit(EXIT_SUCCESS);