summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-05-15 22:42:47 +0200
committerdimitri staessens <[email protected]>2016-05-15 22:42:47 +0200
commit3f5b31d49139968a84c42d5d3067d06edefa3aae (patch)
treef860fa136bf8fcf4ee89881ac41bd39f6c7cd311
parent8f79d80e7fe7f52f310edddc73589f4f71457747 (diff)
downloadouroboros-3f5b31d49139968a84c42d5d3067d06edefa3aae.tar.gz
ouroboros-3f5b31d49139968a84c42d5d3067d06edefa3aae.zip
ipcpd: shim-udp: Revised locking
Simplified locking to take only two locks: the first lock guards the state of the ipcp. This lock must be held for writing on bootstrap and closing, and held for reading during all other operations. The second lock guards operations on flows, and must be held for writing during allocation and deallocation, and held for reading when sending sdu's. After adding a fd to FD_SET, the shim will wait for 1 ms to ensure that the FD is added to the select call.
-rw-r--r--src/ipcpd/ipcp.h2
-rw-r--r--src/ipcpd/shim-udp/main.c191
2 files changed, 97 insertions, 96 deletions
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index c9002d4d..70da0675 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -30,8 +30,6 @@
enum ipcp_state {
IPCP_INIT = 0,
- IPCP_ENROLLING,
- IPCP_BOOTSTRAPPING,
IPCP_ENROLLED,
IPCP_DISCONNECTED,
IPCP_SHUTDOWN
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 74fa0d2b..300a5748 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -50,6 +50,7 @@
#include <stdlib.h>
#include <pthread.h>
#include <sys/wait.h>
+#include <fcntl.h>
#define THIS_TYPE IPCP_SHIM_UDP
#define LISTEN_PORT htons(0x0D1F)
@@ -152,8 +153,6 @@ static int shim_ap_init(char * ap_name)
}
rw_lock_init(&_ap_instance->flows_lock);
- rw_lock_init(&_ap_instance->thread_lock);
- rw_lock_init(&_ap_instance->data_lock);
return 0;
}
@@ -165,7 +164,10 @@ void shim_ap_fini()
if (_ap_instance == NULL)
return;
- rw_lock_wrlock(&_ap_instance->data_lock);
+ rw_lock_wrlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_SHUTDOWN)
+ LOG_WARN("Cleaning up AP while not in shutdown.");
if (_ap_instance->api != NULL)
instance_name_destroy(_ap_instance->api);
@@ -184,9 +186,9 @@ void shim_ap_fini()
rw_lock_unlock(&_ap_instance->flows_lock);
- rw_lock_unlock(&_ap_instance->data_lock);
-
free(_ap_instance);
+
+ rw_lock_unlock(&_ipcp->state_lock);
}
/* only call this under flows_lock */
@@ -209,12 +211,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
size_t index;
struct rb_entry e;
- rw_lock_rdlock(&_ap_instance->data_lock);
+ rw_lock_rdlock(&_ipcp->state_lock);
index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count);
if (index == -1) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
@@ -226,17 +228,13 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
shm_release_du_buff(_ap_instance->dum, index);
-
- rw_lock_unlock(&_ap_instance->data_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
return -EPIPE;
}
rw_lock_unlock(&_ap_instance->flows_lock);
-
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
return 0;
}
@@ -256,8 +254,8 @@ struct ipcp_udp_data {
struct sockaddr_in s_saddr;
int s_fd;
+ /* only modify under _ap_instance->flows_lock */
fd_set flow_fd_s;
- rw_lock_t fd_lock;
};
struct ipcp_udp_data * ipcp_udp_data_create()
@@ -279,8 +277,6 @@ struct ipcp_udp_data * ipcp_udp_data_create()
return NULL;
}
- rw_lock_init(&udp_data->fd_lock);
-
FD_ZERO(&udp_data->flow_fd_s);
return udp_data;
@@ -309,10 +305,6 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
clean_threads = true;
}
- _ipcp->state = IPCP_SHUTDOWN;
-
- rw_lock_unlock(&_ipcp->state_lock);
-
if (clean_threads) {
rw_lock_wrlock(&_ap_instance->thread_lock);
@@ -329,6 +321,10 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
pthread_cancel(_ap_instance->mainloop);
+ _ipcp->state = IPCP_SHUTDOWN;
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}
@@ -349,18 +345,25 @@ static void * ipcp_udp_listener()
while (true) {
int fd;
int port_id;
+
+ rw_lock_rdlock(&_ipcp->state_lock);
+
memset(&buf, 0, SHIM_UDP_BUF_SIZE);
n = sizeof c_saddr;
n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
(struct sockaddr *) &c_saddr, (unsigned *) &n);
- if (n < 0)
+ if (n < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
+ }
/* flow alloc request from other host */
if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr,
sizeof(c_saddr.sin_addr.s_addr), AF_INET)
- == NULL)
+ == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
+ }
fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
@@ -383,12 +386,14 @@ static void * ipcp_udp_listener()
if (connect(fd,
(struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
close(fd);
continue;
}
/* echo back the packet */
if (send(fd, buf, strlen(buf), 0) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to echo back the packet.");
close(fd);
continue;
@@ -403,9 +408,10 @@ static void * ipcp_udp_listener()
UNKNOWN_AE);
if (port_id < 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Could not get port id from IRMd");
close(fd);
- rw_lock_unlock(&_ap_instance->flows_lock);
continue;
}
@@ -414,6 +420,7 @@ static void * ipcp_udp_listener()
_ap_instance->flows[fd].state = FLOW_PENDING;
rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
port_id, fd);
@@ -427,9 +434,10 @@ static void * ipcp_udp_sdu_reader()
int n;
int fd;
char buf[SHIM_UDP_MAX_SDU_SIZE];
- struct timeval tv = {0, 1000};
+ struct timeval tv = {0, 750};
struct sockaddr_in r_saddr;
fd_set read_fds;
+ int flags;
while (true) {
rw_lock_rdlock(&_ipcp->state_lock);
@@ -439,20 +447,21 @@ static void * ipcp_udp_sdu_reader()
return (void *) 0;
}
- rw_lock_unlock(&_ipcp->state_lock);
-
- rw_lock_rdlock(&shim_data(_ipcp)->fd_lock);
+ rw_lock_rdlock(&_ap_instance->flows_lock);
read_fds = shim_data(_ipcp)->flow_fd_s;
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
-
- if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
+ if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
+ }
for (fd = 0; fd < FD_SETSIZE; ++fd) {
if (!FD_ISSET(fd, &read_fds))
continue;
+ flags = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
n = sizeof r_saddr;
if ((n = recvfrom(fd,
@@ -467,6 +476,9 @@ static void * ipcp_udp_sdu_reader()
if (ipcp_udp_flow_write(fd, buf, n) < 0)
LOG_ERR("Failed to write SDU.");
}
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
}
return (void *) 0;
@@ -488,14 +500,10 @@ static void * ipcp_udp_sdu_loop(void * o)
return (void *) 0;
}
- rw_lock_unlock(&_ipcp->state_lock);
-
- rw_lock_rdlock(&_ap_instance->data_lock);
-
e = shm_ap_rbuff_read(_ap_instance->rb);
if (e == NULL) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_rdlock(&_ipcp->state_lock);
continue;
}
@@ -503,7 +511,7 @@ static void * ipcp_udp_sdu_loop(void * o)
_ap_instance->dum,
e->index);
if (len == -1) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_rdlock(&_ipcp->state_lock);
free(e);
continue;
}
@@ -512,30 +520,27 @@ static void * ipcp_udp_sdu_loop(void * o)
fd = port_id_to_fd(e->port_id);
- rw_lock_unlock(&_ap_instance->flows_lock);
-
if (fd == -1) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
continue;
}
if (len == 0) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
continue;
}
- rw_lock_unlock(&_ap_instance->data_lock);
-
if (send(fd, buf, len, 0) < 0)
LOG_ERR("Failed to send SDU.");
- rw_lock_rdlock(&_ap_instance->data_lock);
-
shm_release_du_buff(_ap_instance->dum, e->index);
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
}
@@ -563,14 +568,11 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
return -1;
}
- _ipcp->state = IPCP_BOOTSTRAPPING;
-
- rw_lock_unlock(&_ipcp->state_lock);
-
if (inet_ntop(AF_INET,
&conf->ip_addr,
ipstr,
INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to convert IP address");
return -1;
}
@@ -580,6 +582,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
&conf->dns_addr,
dnsstr,
INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to convert DNS address");
return -1;
}
@@ -592,6 +595,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
/* UDP listen server */
if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Can't create socket.");
return -1;
}
@@ -615,11 +619,12 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
if (bind(fd,
(struct sockaddr *) &shim_data(_ipcp)->s_saddr,
sizeof shim_data(_ipcp)->s_saddr ) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Couldn't bind to %s.", ipstr);
return -1;
}
- rw_lock_wrlock(&_ap_instance->thread_lock);
+ FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
pthread_create(&_ap_instance->handler,
NULL,
@@ -635,16 +640,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
ipcp_udp_sdu_loop,
NULL);
- rw_lock_unlock(&_ap_instance->thread_lock);
-
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
- FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
-
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
-
- rw_lock_wrlock(&_ipcp->state_lock);
-
_ipcp->state = IPCP_ENROLLED;
rw_lock_unlock(&_ipcp->state_lock);
@@ -806,9 +801,8 @@ static int ipcp_udp_name_reg(char * name)
return -1; /* -ENOTENROLLED */
}
- rw_lock_unlock(&_ipcp->state_lock);
-
if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
}
@@ -822,11 +816,13 @@ static int ipcp_udp_name_reg(char * name)
if (inet_ntop(AF_INET, &ip_addr,
ipstr, INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
if (inet_ntop(AF_INET, &dns_addr,
dnsstr, INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
@@ -835,11 +831,14 @@ static int ipcp_udp_name_reg(char * name)
if (ddns_send(cmd)) {
ipcp_data_del_reg_entry(_ipcp->data, name);
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
}
#endif
+ rw_lock_unlock(&_ipcp->state_lock);
+
LOG_DBG("Registered %s.", name);
return 0;
@@ -870,12 +869,11 @@ static int ipcp_udp_name_unreg(char * name)
return -1; /* -ENOTENROLLED */
}
- rw_lock_unlock(&_ipcp->state_lock);
-
dns_addr = shim_data(_ipcp)->dns_addr;
if (dns_addr != 0) {
if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN)
== NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
sprintf(cmd, "server %s\nupdate delete %s A\nsend\nquit\n",
@@ -887,6 +885,8 @@ static int ipcp_udp_name_unreg(char * name)
ipcp_data_del_reg_entry(_ipcp->data, name);
+ rw_lock_unlock(&_ipcp->state_lock);
+
LOG_DBG("Unregistered %s.", name);
return 0;
@@ -911,6 +911,7 @@ static int ipcp_udp_flow_alloc(int port_id,
uint32_t dns_addr = 0;
#endif
struct shm_ap_rbuff * rb;
+ struct timespec wait = {0, 1000000};
if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)
return -1;
@@ -923,11 +924,10 @@ static int ipcp_udp_flow_alloc(int port_id,
return -1; /* -ENOTENROLLED */
}
- rw_lock_unlock(&_ipcp->state_lock);
-
if (strlen(dst_name) > 255
|| strlen(src_ap_name) > 255
|| strlen(src_ae_name) > 255) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Name too long for this shim.");
return -1;
}
@@ -944,6 +944,7 @@ static int ipcp_udp_flow_alloc(int port_id,
l_saddr.sin_port = 0;
if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
close(fd);
return -1;
}
@@ -954,6 +955,7 @@ static int ipcp_udp_flow_alloc(int port_id,
if (dns_addr != 0) {
ip_addr = ddns_resolve(dst_name, dns_addr);
if (ip_addr == 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not resolve %s.", dst_name);
close(fd);
return -1;
@@ -962,6 +964,7 @@ static int ipcp_udp_flow_alloc(int port_id,
#endif
h = gethostbyname(dst_name);
if (h == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not resolve %s.", dst_name);
close(fd);
return -1;
@@ -979,6 +982,7 @@ static int ipcp_udp_flow_alloc(int port_id,
if (sendto(fd, dst_name, strlen(dst_name), 0,
(struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to send packet");
close(fd);
return -1;
@@ -988,6 +992,7 @@ static int ipcp_udp_flow_alloc(int port_id,
recv_buf = malloc(strlen(dst_name) + 1);
if (recv_buf == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to malloc recv_buff.");
close(fd);
return -1;
@@ -1004,6 +1009,7 @@ static int ipcp_udp_flow_alloc(int port_id,
(struct sockaddr *) &rf_saddr,
sizeof rf_saddr)
< 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
close(fd);
free(recv_buf);
return -1;
@@ -1016,6 +1022,7 @@ static int ipcp_udp_flow_alloc(int port_id,
rb = shm_ap_rbuff_open(n_pid);
if (rb == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Could not open N + 1 ringbuffer.");
close(fd);
return -1; /* -ENORBUFF */
@@ -1026,22 +1033,23 @@ static int ipcp_udp_flow_alloc(int port_id,
_ap_instance->flows[fd].state = FLOW_ALLOCATED;
_ap_instance->flows[fd].rb = rb;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ nanosleep(&wait, NULL);
+
rw_lock_unlock(&_ap_instance->flows_lock);
/* tell IRMd that flow allocation "worked" */
if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
+ rw_lock_unlock(&_ipcp->state_lock);
shm_ap_rbuff_close(rb);
LOG_ERR("Failed to notify IRMd about flow allocation reply");
close(fd);
return -1;
}
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
- FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
-
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd);
@@ -1053,58 +1061,52 @@ static int ipcp_udp_flow_alloc_resp(int port_id,
int response)
{
struct shm_ap_rbuff * rb;
+ struct timespec wait = {0, 1000000};
int fd = -1;
if (response)
return 0;
+ rw_lock_unlock(&_ipcp->state_lock);
+
/* awaken pending flow */
- rw_lock_rdlock(&_ap_instance->flows_lock);
+ rw_lock_wrlock(&_ap_instance->flows_lock);
fd = port_id_to_fd(port_id);
if (fd < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not find flow with port_id %d.", port_id);
return 0;
}
if (_ap_instance->flows[fd].state != FLOW_PENDING) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Flow was not pending.");
return -1;
}
- rw_lock_unlock(&_ap_instance->flows_lock);
-
rb = shm_ap_rbuff_open(n_pid);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
-
- rw_lock_wrlock(&_ap_instance->flows_lock);
-
_ap_instance->flows[fd].state = FLOW_NULL;
_ap_instance->flows[fd].port_id = 0;
-
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
return 0;
}
- rw_lock_wrlock(&_ap_instance->flows_lock);
-
_ap_instance->flows[fd].state = FLOW_ALLOCATED;
_ap_instance->flows[fd].rb = rb;
- rw_lock_unlock(&_ap_instance->flows_lock);
-
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+ nanosleep(&wait, NULL);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);
@@ -1115,13 +1117,15 @@ static int ipcp_udp_flow_dealloc(int port_id)
{
int fd = -1;
struct shm_ap_rbuff * rb;
+ struct timespec wait = {0, 1000000};
+ rw_lock_rdlock(&_ipcp->state_lock);
rw_lock_wrlock(&_ap_instance->flows_lock);
fd = port_id_to_fd(port_id);
if (fd < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not find flow with port_id %d.", port_id);
return 0;
}
@@ -1131,19 +1135,18 @@ static int ipcp_udp_flow_dealloc(int port_id)
rb = _ap_instance->flows[fd].rb;
_ap_instance->flows[fd].rb = NULL;
- rw_lock_unlock(&_ap_instance->flows_lock);
-
if (rb != NULL)
shm_ap_rbuff_close(rb);
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+ nanosleep(&wait, NULL);
close(fd);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
+
return 0;
}