summaryrefslogtreecommitdiff
path: root/src/ipcpd/eth
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2018-10-03 01:13:43 +0200
committerSander Vrijders <[email protected]>2018-10-03 09:14:57 +0200
commite00181b492d573ecd0621f55d9ad24f134c09d4c (patch)
treef7c1b1f1df892bd3766060419d34cd1363a6eedd /src/ipcpd/eth
parentee73b781c1e13daee67e149f1828d7166e5ea627 (diff)
downloadouroboros-e00181b492d573ecd0621f55d9ad24f134c09d4c.tar.gz
ouroboros-e00181b492d573ecd0621f55d9ad24f134c09d4c.zip
ipcpd: Add multithreading to Ethernet IPCP
This adds multiple reader and writer threads, configurabe via cmake with IPCP_ETH_RD_THR and IPCP_ETH_WR_THR. Improves ethernet IPCP throughput, which looks to be limited by the raw socket calls. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/ipcpd/eth')
-rw-r--r--src/ipcpd/eth/CMakeLists.txt5
-rw-r--r--src/ipcpd/eth/eth.c87
2 files changed, 61 insertions, 31 deletions
diff --git a/src/ipcpd/eth/CMakeLists.txt b/src/ipcpd/eth/CMakeLists.txt
index 6b8d1a77..7bad6ac0 100644
--- a/src/ipcpd/eth/CMakeLists.txt
+++ b/src/ipcpd/eth/CMakeLists.txt
@@ -77,6 +77,11 @@ endif ()
if (HAVE_ETH)
message(STATUS "Supported raw packet API found, building eth-llc and eth-dix")
+ set(IPCP_ETH_RD_THR 3 CACHE STRING
+ "Number of reader threads in Ethernet IPCP")
+ set(IPCP_ETH_WR_THR 3 CACHE STRING
+ "Number of writer threads in Ethernet IPCP")
+
set(ETH_LLC_SOURCES
# Add source files here
${CMAKE_CURRENT_SOURCE_DIR}/llc.c
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 04debfd1..af34f68e 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -215,11 +215,10 @@ struct {
#endif
struct ef * fd_to_ef;
fset_t * np1_flows;
- fqueue_t * fq;
pthread_rwlock_t flows_lock;
- pthread_t sdu_writer;
- pthread_t sdu_reader;
+ pthread_t sdu_writer[IPCP_ETH_WR_THR];
+ pthread_t sdu_reader[IPCP_ETH_RD_THR];
#ifdef __linux__
pthread_t if_monitor;
@@ -260,10 +259,6 @@ static int eth_data_init(void)
if (eth_data.np1_flows == NULL)
goto fail_np1_flows;
- eth_data.fq = fqueue_create();
- if (eth_data.fq == NULL)
- goto fail_fq;
-
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
#if defined(BUILD_ETH_DIX)
eth_data.fd_to_ef[i].r_eid = -1;
@@ -311,8 +306,6 @@ static int eth_data_init(void)
fail_flows_lock:
shim_data_destroy(eth_data.shim_data);
fail_shim_data:
- fqueue_destroy(eth_data.fq);
- fail_fq:
fset_destroy(eth_data.np1_flows);
fail_np1_flows:
#ifdef BUILD_ETH_LLC
@@ -339,7 +332,6 @@ static void eth_data_fini(void)
pthread_mutex_destroy(&eth_data.mgmt_lock);
pthread_rwlock_destroy(&eth_data.flows_lock);
shim_data_destroy(eth_data.shim_data);
- fqueue_destroy(eth_data.fq);
fset_destroy(eth_data.np1_flows);
#ifdef BUILD_ETH_LLC
bmp_destroy(eth_data.saps);
@@ -964,6 +956,11 @@ static void * eth_ipcp_sdu_reader(void * o)
return (void *) 0;
}
+static void cleanup_writer(void * o)
+{
+ fqueue_destroy((fqueue_t *) o);
+}
+
static void * eth_ipcp_sdu_writer(void * o)
{
int fd;
@@ -977,15 +974,22 @@ static void * eth_ipcp_sdu_writer(void * o)
#endif
uint8_t r_addr[MAC_SIZE];
+ fqueue_t * fq;
+
+ fq = fqueue_create();
+ if (fq == NULL)
+ return (void *) -1;
+
(void) o;
+ pthread_cleanup_push(cleanup_writer, fq);
+
ipcp_lock_to_core();
while (true) {
- fevent(eth_data.np1_flows, eth_data.fq, NULL);
-
+ fevent(eth_data.np1_flows, fq, NULL);
pthread_rwlock_rdlock(&eth_data.flows_lock);
- while ((fd = fqueue_next(eth_data.fq)) >= 0) {
+ while ((fd = fqueue_next(fq)) >= 0) {
if (ipcp_flow_read(fd, &sdb)) {
log_dbg("Bad read from fd %d.", fd);
continue;
@@ -1021,6 +1025,8 @@ static void * eth_ipcp_sdu_writer(void * o)
pthread_rwlock_unlock(&eth_data.flows_lock);
}
+ pthread_cleanup_pop(true);
+
return (void *) 1;
}
@@ -1382,20 +1388,24 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
goto fail_mgmt_handler;
}
- if (pthread_create(&eth_data.sdu_reader,
- NULL,
- eth_ipcp_sdu_reader,
- NULL)) {
- ipcp_set_state(IPCP_INIT);
- goto fail_sdu_reader;
+ for (idx = 0; idx < IPCP_ETH_RD_THR; ++idx) {
+ if (pthread_create(&eth_data.sdu_reader[idx],
+ NULL,
+ eth_ipcp_sdu_reader,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_sdu_reader;
+ }
}
- if (pthread_create(&eth_data.sdu_writer,
- NULL,
- eth_ipcp_sdu_writer,
- NULL)) {
- ipcp_set_state(IPCP_INIT);
- goto fail_sdu_writer;
+ for (idx = 0; idx < IPCP_ETH_WR_THR; ++idx) {
+ if (pthread_create(&eth_data.sdu_writer[idx],
+ NULL,
+ eth_ipcp_sdu_writer,
+ NULL)) {
+ ipcp_set_state(IPCP_INIT);
+ goto fail_sdu_writer;
+ }
}
#if defined(BUILD_ETH_DIX)
@@ -1409,9 +1419,16 @@ static int eth_ipcp_bootstrap(const struct ipcp_config * conf)
return 0;
fail_sdu_writer:
- pthread_cancel(eth_data.sdu_reader);
- pthread_join(eth_data.sdu_reader, NULL);
+ while (idx > 0) {
+ pthread_cancel(eth_data.sdu_writer[--idx]);
+ pthread_join(eth_data.sdu_writer[idx], NULL);
+ }
+ idx = IPCP_ETH_RD_THR;
fail_sdu_reader:
+ while (idx > 0) {
+ pthread_cancel(eth_data.sdu_reader[--idx]);
+ pthread_join(eth_data.sdu_reader[idx], NULL);
+ }
pthread_cancel(eth_data.mgmt_handler);
pthread_join(eth_data.mgmt_handler, NULL);
fail_mgmt_handler:
@@ -1697,6 +1714,8 @@ static struct ipcp_ops eth_ops = {
int main(int argc,
char * argv[])
{
+ int i;
+
if (ipcp_init(argc, argv, &eth_ops) < 0)
goto fail_init;
@@ -1723,14 +1742,20 @@ int main(int argc,
ipcp_shutdown();
if (ipcp_get_state() == IPCP_SHUTDOWN) {
- pthread_cancel(eth_data.sdu_writer);
- pthread_cancel(eth_data.sdu_reader);
+ for (i = 0; i < IPCP_ETH_WR_THR; ++i)
+ pthread_cancel(eth_data.sdu_writer[i]);
+ for (i = 0; i < IPCP_ETH_RD_THR; ++i)
+ pthread_cancel(eth_data.sdu_reader[i]);
+
pthread_cancel(eth_data.mgmt_handler);
#ifdef __linux__
pthread_cancel(eth_data.if_monitor);
#endif
- pthread_join(eth_data.sdu_writer, NULL);
- pthread_join(eth_data.sdu_reader, NULL);
+ for (i = 0; i < IPCP_ETH_WR_THR; ++i)
+ pthread_join(eth_data.sdu_writer[i], NULL);
+ for (i = 0; i < IPCP_ETH_RD_THR; ++i)
+ pthread_join(eth_data.sdu_reader[i], NULL);
+
pthread_join(eth_data.mgmt_handler, NULL);
#ifdef __linux__
pthread_join(eth_data.if_monitor, NULL);