summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-10-26 14:59:43 +0200
committerSander Vrijders <[email protected]>2016-10-26 15:10:28 +0200
commit188aba280f7c5b80b868cb1527fce9d45702a196 (patch)
tree5a0852a4832cc853d45e96f0087e7724c8b205a1
parentcc64e52dee3559128293a17a669e94acb48f9309 (diff)
downloadouroboros-188aba280f7c5b80b868cb1527fce9d45702a196.tar.gz
ouroboros-188aba280f7c5b80b868cb1527fce9d45702a196.zip
ipcpd: Add threadpool for main loop
This adds a threadpool for the main loop of the IPCPs. Before there was a single thread handling each request, which could result in starvation since performing name queries at the same time as enrolling a normal IPCP was impossible.
-rw-r--r--include/ouroboros/config.h.in1
-rw-r--r--src/ipcpd/ipcp.c76
-rw-r--r--src/ipcpd/ipcp.h4
-rw-r--r--src/lib/dev.c3
4 files changed, 51 insertions, 33 deletions
diff --git a/include/ouroboros/config.h.in b/include/ouroboros/config.h.in
index 6ffcb97f..122899f3 100644
--- a/include/ouroboros/config.h.in
+++ b/include/ouroboros/config.h.in
@@ -48,6 +48,7 @@
#define SHM_FLOW_SET_PREFIX "/ouroboros.sets."
#define IRMD_MAX_FLOWS 4096
#define IRMD_THREADPOOL_SIZE 5
+#define IPCPD_THREADPOOL_SIZE 3
#define LOG_DIR "/@LOG_DIR@/"
#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
/* Timeout values */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 90fb94ef..694db7cf 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -41,15 +41,45 @@
int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops)
{
pthread_condattr_t cattr;
+ int t;
+
+ struct timeval tv = {(IPCP_ACCEPT_TIMEOUT / 1000),
+ (IPCP_ACCEPT_TIMEOUT % 1000) * 1000};
ipcpi.irmd_fd = -1;
ipcpi.state = IPCP_INIT;
+ ipcpi.threadpool = malloc(sizeof(pthread_t) * IPCPD_THREADPOOL_SIZE);
+ if (ipcpi.threadpool == NULL) {
+ return -ENOMEM;
+ }
+
+ ipcpi.sock_path = ipcp_sock_path(getpid());
+ if (ipcpi.sock_path == NULL) {
+ free(ipcpi.threadpool);
+ return -1;
+ }
+
+ ipcpi.sockfd = server_socket_open(ipcpi.sock_path);
+ if (ipcpi.sockfd < 0) {
+ LOG_ERR("Could not open server socket.");
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
+ return -1;
+ }
+
+ if (setsockopt(ipcpi.sockfd, SOL_SOCKET, SO_RCVTIMEO,
+ (void *) &tv, sizeof(tv)))
+ LOG_WARN("Failed to set timeout on socket.");
+
ipcpi.ops = ops;
ipcpi.data = ipcp_data_create();
- if (ipcpi.data == NULL)
+ if (ipcpi.data == NULL) {
+ free(ipcpi.threadpool);
+ free(ipcpi.sock_path);
return -ENOMEM;
+ }
ipcp_data_init(ipcpi.data, type);
@@ -61,14 +91,26 @@ int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops)
#endif
pthread_cond_init(&ipcpi.state_cond, &cattr);
- pthread_create(&ipcpi.mainloop, NULL, ipcp_main_loop, NULL);
+ for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t)
+ pthread_create(&ipcpi.threadpool[t], NULL,
+ ipcp_main_loop, NULL);
return 0;
}
void ipcp_fini()
{
- pthread_join(ipcpi.mainloop, NULL);
+ int t;
+
+ for (t = 0; t < IPCPD_THREADPOOL_SIZE; ++t)
+ pthread_join(ipcpi.threadpool[t], NULL);
+
+ close(ipcpi.sockfd);
+ if (unlink(ipcpi.sock_path))
+ LOG_DBG("Could not unlink %s.", ipcpi.sock_path);
+
+ free(ipcpi.sock_path);
+ free(ipcpi.threadpool);
ipcp_data_destroy(ipcpi.data);
pthread_cond_destroy(&ipcpi.state_cond);
@@ -171,7 +213,6 @@ int ipcp_parse_arg(int argc, char * argv[])
void * ipcp_main_loop(void * o)
{
int lsockfd;
- int sockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
ipcp_msg_t * msg;
@@ -182,32 +223,13 @@ void * ipcp_main_loop(void * o)
dif_config_msg_t * conf_msg;
struct dif_config conf;
- char * sock_path;
char * msg_name_dup;
- struct timeval tv = {(IPCP_ACCEPT_TIMEOUT / 1000),
- (IPCP_ACCEPT_TIMEOUT % 1000) * 1000};
-
struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
(void) o;
- sock_path = ipcp_sock_path(getpid());
- if (sock_path == NULL)
- return (void *) 1;
-
- sockfd = server_socket_open(sock_path);
- if (sockfd < 0) {
- LOG_ERR("Could not open server socket.");
- free(sock_path);
- return (void *) 1;
- }
-
- if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
- (void *) &tv, sizeof(tv)))
- LOG_WARN("Failed to set timeout on socket.");
-
while (true) {
int fd = -1;
@@ -221,7 +243,7 @@ void * ipcp_main_loop(void * o)
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
- lsockfd = accept(sockfd, 0, 0);
+ lsockfd = accept(ipcpi.sockfd, 0, 0);
if (lsockfd < 0)
continue;
@@ -416,11 +438,5 @@ void * ipcp_main_loop(void * o)
close(lsockfd);
}
- close(sockfd);
- if (unlink(sock_path))
- LOG_DBG("Could not unlink %s.", sock_path);
-
- free(sock_path);
-
return (void *) 0;
}
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 18a5bdab..c89fe438 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -50,7 +50,9 @@ struct ipcp {
pthread_mutex_t state_mtx;
pthread_cond_t state_cond;
- pthread_t mainloop;
+ int sockfd;
+ char * sock_path;
+ pthread_t * threadpool;
} ipcpi;
int ipcp_init(enum ipcp_type type,
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 94fbd394..55ee7572 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -489,9 +489,8 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL) {
+ if (recv_msg == NULL)
return -1;
- }
if (!recv_msg->has_api || !recv_msg->has_port_id) {
irm_msg__free_unpacked(recv_msg, NULL);