summaryrefslogtreecommitdiff
path: root/src/ipcpd/shim-udp
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-05-16 00:42:00 +0200
committerSander Vrijders <[email protected]>2016-05-16 00:42:00 +0200
commitbd270e67f1b4cb9cdbd7099ba7a6a458acc49169 (patch)
tree04f1110e5d411d00cf4e9ca0651b7bd2573a482b /src/ipcpd/shim-udp
parent3313a0b3f37584224686b769a77977e40018d5a3 (diff)
parent80a4808418694fcd9d96dc714a984e157c90a6ab (diff)
downloadouroboros-bd270e67f1b4cb9cdbd7099ba7a6a458acc49169.tar.gz
ouroboros-bd270e67f1b4cb9cdbd7099ba7a6a458acc49169.zip
Merged in dstaesse/ouroboros/be-udp-locks (pull request #92)
ipcpd: shim-udp: Revised locking
Diffstat (limited to 'src/ipcpd/shim-udp')
-rw-r--r--src/ipcpd/shim-udp/main.c195
1 files changed, 94 insertions, 101 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 74fa0d2b..e3f7fcdd 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -50,6 +50,7 @@
#include <stdlib.h>
#include <pthread.h>
#include <sys/wait.h>
+#include <fcntl.h>
#define THIS_TYPE IPCP_SHIM_UDP
#define LISTEN_PORT htons(0x0D1F)
@@ -93,9 +94,6 @@ struct shim_ap_data {
pthread_t sduloop;
pthread_t handler;
pthread_t sdu_reader;
-
- rw_lock_t thread_lock;
-
} * _ap_instance;
static int shim_ap_init(char * ap_name)
@@ -152,8 +150,6 @@ static int shim_ap_init(char * ap_name)
}
rw_lock_init(&_ap_instance->flows_lock);
- rw_lock_init(&_ap_instance->thread_lock);
- rw_lock_init(&_ap_instance->data_lock);
return 0;
}
@@ -165,7 +161,10 @@ void shim_ap_fini()
if (_ap_instance == NULL)
return;
- rw_lock_wrlock(&_ap_instance->data_lock);
+ rw_lock_wrlock(&_ipcp->state_lock);
+
+ if (_ipcp->state != IPCP_SHUTDOWN)
+ LOG_WARN("Cleaning up AP while not in shutdown.");
if (_ap_instance->api != NULL)
instance_name_destroy(_ap_instance->api);
@@ -184,9 +183,9 @@ void shim_ap_fini()
rw_lock_unlock(&_ap_instance->flows_lock);
- rw_lock_unlock(&_ap_instance->data_lock);
-
free(_ap_instance);
+
+ rw_lock_unlock(&_ipcp->state_lock);
}
/* only call this under flows_lock */
@@ -209,12 +208,12 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
size_t index;
struct rb_entry e;
- rw_lock_rdlock(&_ap_instance->data_lock);
+ rw_lock_rdlock(&_ipcp->state_lock);
index = shm_create_du_buff(_ap_instance->dum, count, 0, buf, count);
if (index == -1) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
@@ -226,17 +225,13 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
shm_release_du_buff(_ap_instance->dum, index);
-
- rw_lock_unlock(&_ap_instance->data_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
return -EPIPE;
}
rw_lock_unlock(&_ap_instance->flows_lock);
-
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
return 0;
}
@@ -256,8 +251,8 @@ struct ipcp_udp_data {
struct sockaddr_in s_saddr;
int s_fd;
+ /* only modify under _ap_instance->flows_lock */
fd_set flow_fd_s;
- rw_lock_t fd_lock;
};
struct ipcp_udp_data * ipcp_udp_data_create()
@@ -279,8 +274,6 @@ struct ipcp_udp_data * ipcp_udp_data_create()
return NULL;
}
- rw_lock_init(&udp_data->fd_lock);
-
FD_ZERO(&udp_data->flow_fd_s);
return udp_data;
@@ -309,13 +302,7 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
clean_threads = true;
}
- _ipcp->state = IPCP_SHUTDOWN;
-
- rw_lock_unlock(&_ipcp->state_lock);
-
if (clean_threads) {
- rw_lock_wrlock(&_ap_instance->thread_lock);
-
pthread_cancel(_ap_instance->handler);
pthread_cancel(_ap_instance->sdu_reader);
pthread_cancel(_ap_instance->sduloop);
@@ -323,12 +310,14 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
pthread_join(_ap_instance->sduloop, NULL);
pthread_join(_ap_instance->handler, NULL);
pthread_join(_ap_instance->sdu_reader, NULL);
-
- rw_lock_unlock(&_ap_instance->thread_lock);
}
pthread_cancel(_ap_instance->mainloop);
+ _ipcp->state = IPCP_SHUTDOWN;
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}
@@ -349,18 +338,25 @@ static void * ipcp_udp_listener()
while (true) {
int fd;
int port_id;
+
+ rw_lock_rdlock(&_ipcp->state_lock);
+
memset(&buf, 0, SHIM_UDP_BUF_SIZE);
n = sizeof c_saddr;
n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
(struct sockaddr *) &c_saddr, (unsigned *) &n);
- if (n < 0)
+ if (n < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
+ }
/* flow alloc request from other host */
if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr,
sizeof(c_saddr.sin_addr.s_addr), AF_INET)
- == NULL)
+ == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
+ }
fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
@@ -383,12 +379,14 @@ static void * ipcp_udp_listener()
if (connect(fd,
(struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
close(fd);
continue;
}
/* echo back the packet */
if (send(fd, buf, strlen(buf), 0) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to echo back the packet.");
close(fd);
continue;
@@ -403,9 +401,10 @@ static void * ipcp_udp_listener()
UNKNOWN_AE);
if (port_id < 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Could not get port id from IRMd");
close(fd);
- rw_lock_unlock(&_ap_instance->flows_lock);
continue;
}
@@ -414,6 +413,7 @@ static void * ipcp_udp_listener()
_ap_instance->flows[fd].state = FLOW_PENDING;
rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
port_id, fd);
@@ -427,9 +427,10 @@ static void * ipcp_udp_sdu_reader()
int n;
int fd;
char buf[SHIM_UDP_MAX_SDU_SIZE];
- struct timeval tv = {0, 1000};
+ struct timeval tv = {0, 100};
struct sockaddr_in r_saddr;
fd_set read_fds;
+ int flags;
while (true) {
rw_lock_rdlock(&_ipcp->state_lock);
@@ -439,20 +440,21 @@ static void * ipcp_udp_sdu_reader()
return (void *) 0;
}
- rw_lock_unlock(&_ipcp->state_lock);
-
- rw_lock_rdlock(&shim_data(_ipcp)->fd_lock);
+ rw_lock_rdlock(&_ap_instance->flows_lock);
read_fds = shim_data(_ipcp)->flow_fd_s;
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
-
- if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
+ if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
+ }
for (fd = 0; fd < FD_SETSIZE; ++fd) {
if (!FD_ISSET(fd, &read_fds))
continue;
+ flags = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
n = sizeof r_saddr;
if ((n = recvfrom(fd,
@@ -467,6 +469,9 @@ static void * ipcp_udp_sdu_reader()
if (ipcp_udp_flow_write(fd, buf, n) < 0)
LOG_ERR("Failed to write SDU.");
}
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
}
return (void *) 0;
@@ -488,14 +493,10 @@ static void * ipcp_udp_sdu_loop(void * o)
return (void *) 0;
}
- rw_lock_unlock(&_ipcp->state_lock);
-
- rw_lock_rdlock(&_ap_instance->data_lock);
-
e = shm_ap_rbuff_read(_ap_instance->rb);
if (e == NULL) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
continue;
}
@@ -503,7 +504,7 @@ static void * ipcp_udp_sdu_loop(void * o)
_ap_instance->dum,
e->index);
if (len == -1) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
continue;
}
@@ -512,30 +513,27 @@ static void * ipcp_udp_sdu_loop(void * o)
fd = port_id_to_fd(e->port_id);
- rw_lock_unlock(&_ap_instance->flows_lock);
-
if (fd == -1) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
continue;
}
if (len == 0) {
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
continue;
}
- rw_lock_unlock(&_ap_instance->data_lock);
-
if (send(fd, buf, len, 0) < 0)
LOG_ERR("Failed to send SDU.");
- rw_lock_rdlock(&_ap_instance->data_lock);
-
shm_release_du_buff(_ap_instance->dum, e->index);
- rw_lock_unlock(&_ap_instance->data_lock);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
free(e);
}
@@ -563,14 +561,11 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
return -1;
}
- _ipcp->state = IPCP_BOOTSTRAPPING;
-
- rw_lock_unlock(&_ipcp->state_lock);
-
if (inet_ntop(AF_INET,
&conf->ip_addr,
ipstr,
INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to convert IP address");
return -1;
}
@@ -580,6 +575,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
&conf->dns_addr,
dnsstr,
INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to convert DNS address");
return -1;
}
@@ -592,6 +588,7 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
/* UDP listen server */
if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Can't create socket.");
return -1;
}
@@ -615,11 +612,12 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
if (bind(fd,
(struct sockaddr *) &shim_data(_ipcp)->s_saddr,
sizeof shim_data(_ipcp)->s_saddr ) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Couldn't bind to %s.", ipstr);
return -1;
}
- rw_lock_wrlock(&_ap_instance->thread_lock);
+ FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
pthread_create(&_ap_instance->handler,
NULL,
@@ -635,16 +633,6 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
ipcp_udp_sdu_loop,
NULL);
- rw_lock_unlock(&_ap_instance->thread_lock);
-
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
- FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
-
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
-
- rw_lock_wrlock(&_ipcp->state_lock);
-
_ipcp->state = IPCP_ENROLLED;
rw_lock_unlock(&_ipcp->state_lock);
@@ -806,9 +794,8 @@ static int ipcp_udp_name_reg(char * name)
return -1; /* -ENOTENROLLED */
}
- rw_lock_unlock(&_ipcp->state_lock);
-
if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
}
@@ -822,11 +809,13 @@ static int ipcp_udp_name_reg(char * name)
if (inet_ntop(AF_INET, &ip_addr,
ipstr, INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
if (inet_ntop(AF_INET, &dns_addr,
dnsstr, INET_ADDRSTRLEN) == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
@@ -835,11 +824,14 @@ static int ipcp_udp_name_reg(char * name)
if (ddns_send(cmd)) {
ipcp_data_del_reg_entry(_ipcp->data, name);
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
}
#endif
+ rw_lock_unlock(&_ipcp->state_lock);
+
LOG_DBG("Registered %s.", name);
return 0;
@@ -870,12 +862,11 @@ static int ipcp_udp_name_unreg(char * name)
return -1; /* -ENOTENROLLED */
}
- rw_lock_unlock(&_ipcp->state_lock);
-
dns_addr = shim_data(_ipcp)->dns_addr;
if (dns_addr != 0) {
if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN)
== NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
return -1;
}
sprintf(cmd, "server %s\nupdate delete %s A\nsend\nquit\n",
@@ -887,6 +878,8 @@ static int ipcp_udp_name_unreg(char * name)
ipcp_data_del_reg_entry(_ipcp->data, name);
+ rw_lock_unlock(&_ipcp->state_lock);
+
LOG_DBG("Unregistered %s.", name);
return 0;
@@ -923,11 +916,10 @@ static int ipcp_udp_flow_alloc(int port_id,
return -1; /* -ENOTENROLLED */
}
- rw_lock_unlock(&_ipcp->state_lock);
-
if (strlen(dst_name) > 255
|| strlen(src_ap_name) > 255
|| strlen(src_ae_name) > 255) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Name too long for this shim.");
return -1;
}
@@ -944,6 +936,7 @@ static int ipcp_udp_flow_alloc(int port_id,
l_saddr.sin_port = 0;
if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
close(fd);
return -1;
}
@@ -954,6 +947,7 @@ static int ipcp_udp_flow_alloc(int port_id,
if (dns_addr != 0) {
ip_addr = ddns_resolve(dst_name, dns_addr);
if (ip_addr == 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not resolve %s.", dst_name);
close(fd);
return -1;
@@ -962,6 +956,7 @@ static int ipcp_udp_flow_alloc(int port_id,
#endif
h = gethostbyname(dst_name);
if (h == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not resolve %s.", dst_name);
close(fd);
return -1;
@@ -979,6 +974,7 @@ static int ipcp_udp_flow_alloc(int port_id,
if (sendto(fd, dst_name, strlen(dst_name), 0,
(struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to send packet");
close(fd);
return -1;
@@ -988,6 +984,7 @@ static int ipcp_udp_flow_alloc(int port_id,
recv_buf = malloc(strlen(dst_name) + 1);
if (recv_buf == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Failed to malloc recv_buff.");
close(fd);
return -1;
@@ -1004,6 +1001,7 @@ static int ipcp_udp_flow_alloc(int port_id,
(struct sockaddr *) &rf_saddr,
sizeof rf_saddr)
< 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
close(fd);
free(recv_buf);
return -1;
@@ -1016,6 +1014,7 @@ static int ipcp_udp_flow_alloc(int port_id,
rb = shm_ap_rbuff_open(n_pid);
if (rb == NULL) {
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_ERR("Could not open N + 1 ringbuffer.");
close(fd);
return -1; /* -ENORBUFF */
@@ -1026,22 +1025,21 @@ static int ipcp_udp_flow_alloc(int port_id,
_ap_instance->flows[fd].state = FLOW_ALLOCATED;
_ap_instance->flows[fd].rb = rb;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+
rw_lock_unlock(&_ap_instance->flows_lock);
/* tell IRMd that flow allocation "worked" */
if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
+ rw_lock_unlock(&_ipcp->state_lock);
shm_ap_rbuff_close(rb);
LOG_ERR("Failed to notify IRMd about flow allocation reply");
close(fd);
return -1;
}
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
- FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
-
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBG("Allocated flow with port_id %d on UDP fd %d.", port_id, fd);
@@ -1053,58 +1051,52 @@ static int ipcp_udp_flow_alloc_resp(int port_id,
int response)
{
struct shm_ap_rbuff * rb;
+ struct timespec wait = {0, 1000000};
int fd = -1;
if (response)
return 0;
+ rw_lock_unlock(&_ipcp->state_lock);
+
/* awaken pending flow */
- rw_lock_rdlock(&_ap_instance->flows_lock);
+ rw_lock_wrlock(&_ap_instance->flows_lock);
fd = port_id_to_fd(port_id);
if (fd < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not find flow with port_id %d.", port_id);
return 0;
}
if (_ap_instance->flows[fd].state != FLOW_PENDING) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Flow was not pending.");
return -1;
}
- rw_lock_unlock(&_ap_instance->flows_lock);
-
rb = shm_ap_rbuff_open(n_pid);
if (rb == NULL) {
LOG_ERR("Could not open N + 1 ringbuffer.");
-
- rw_lock_wrlock(&_ap_instance->flows_lock);
-
_ap_instance->flows[fd].state = FLOW_NULL;
_ap_instance->flows[fd].port_id = 0;
-
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
return 0;
}
- rw_lock_wrlock(&_ap_instance->flows_lock);
-
_ap_instance->flows[fd].state = FLOW_ALLOCATED;
_ap_instance->flows[fd].rb = rb;
- rw_lock_unlock(&_ap_instance->flows_lock);
-
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+ nanosleep(&wait, NULL);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);
@@ -1115,13 +1107,15 @@ static int ipcp_udp_flow_dealloc(int port_id)
{
int fd = -1;
struct shm_ap_rbuff * rb;
+ struct timespec wait = {0, 1000000};
+ rw_lock_rdlock(&_ipcp->state_lock);
rw_lock_wrlock(&_ap_instance->flows_lock);
fd = port_id_to_fd(port_id);
if (fd < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
-
+ rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not find flow with port_id %d.", port_id);
return 0;
}
@@ -1131,19 +1125,18 @@ static int ipcp_udp_flow_dealloc(int port_id)
rb = _ap_instance->flows[fd].rb;
_ap_instance->flows[fd].rb = NULL;
- rw_lock_unlock(&_ap_instance->flows_lock);
-
if (rb != NULL)
shm_ap_rbuff_close(rb);
- rw_lock_wrlock(&shim_data(_ipcp)->fd_lock);
-
FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
- rw_lock_unlock(&shim_data(_ipcp)->fd_lock);
+ nanosleep(&wait, NULL);
close(fd);
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
+
return 0;
}