summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-05-21 17:09:37 +0200
committerdimitri staessens <[email protected]>2016-05-21 17:25:26 +0200
commit546de0e99ce1b9a9de70ccc092a60778f99b4358 (patch)
tree85093330ad23e241c370eb5f7d466e9f5855691b
parentd53fd8ef0228f287568cc51b87733780591308fc (diff)
downloadouroboros-546de0e99ce1b9a9de70ccc092a60778f99b4358.tar.gz
ouroboros-546de0e99ce1b9a9de70ccc092a60778f99b4358.zip
lib, irmd, ipcpd: Flow deallocation over shim UDP
The shim UDP now supports deallocating a flow end-to-end. Contains some stability fixes for flow allocation and some missing close() calls in lib/sockets.
-rw-r--r--src/ipcpd/shim-udp/main.c200
-rw-r--r--src/ipcpd/shim-udp/shim_udp_messages.proto5
-rw-r--r--src/irmd/main.c37
-rw-r--r--src/lib/dev.c26
-rw-r--r--src/lib/sockets.c3
5 files changed, 210 insertions, 61 deletions
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 1284a0e5..362e03d5 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -158,6 +158,7 @@ static int shim_ap_init(char * ap_name)
rw_lock_init(&_ap_instance->flows_lock);
pthread_mutex_init(&_ap_instance->fd_set_lock, NULL);
+
return 0;
}
@@ -228,7 +229,7 @@ static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
e.index = index;
- rw_lock_wrlock(&_ap_instance->flows_lock);
+ rw_lock_rdlock(&_ap_instance->flows_lock);
e.port_id = _ap_instance->flows[fd].port_id;
@@ -304,6 +305,45 @@ struct ipcp_udp_data * ipcp_udp_data_create()
return udp_data;
}
+static void set_fd(int fd)
+{
+ bool fd_wait = true;
+
+ pthread_mutex_lock(&_ap_instance->fd_set_lock);
+
+ _ap_instance->fd_set_sync = true;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+
+ while (fd_wait) {
+ sched_yield();
+ pthread_mutex_lock(&_ap_instance->fd_set_lock);
+ fd_wait = _ap_instance->fd_set_sync;
+ pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+ }
+}
+
+static void clr_fd(int fd)
+{
+ bool fd_wait = true;
+
+ pthread_mutex_lock(&_ap_instance->fd_set_lock);
+
+ _ap_instance->fd_set_sync = true;
+ FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+
+ while (fd_wait) {
+ sched_yield();
+ pthread_mutex_lock(&_ap_instance->fd_set_lock);
+ fd_wait = _ap_instance->fd_set_sync;
+ pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+ }
+}
+
+
static int send_shim_udp_msg(shim_udp_msg_t * msg,
uint32_t dst_ip_addr)
{
@@ -360,7 +400,7 @@ static int ipcp_udp_port_alloc(uint32_t dst_ip_addr,
return send_shim_udp_msg(&msg, dst_ip_addr);
}
-static int ipcp_udp_port_alloc_resp(uint32_t ip_addr,
+static int ipcp_udp_port_alloc_resp(uint32_t dst_ip_addr,
uint16_t src_udp_port,
uint16_t dst_udp_port,
int response)
@@ -374,7 +414,18 @@ static int ipcp_udp_port_alloc_resp(uint32_t ip_addr,
msg.has_response = true;
msg.response = response;
- return send_shim_udp_msg(&msg, ip_addr);
+ return send_shim_udp_msg(&msg, dst_ip_addr);
+}
+
+static int ipcp_udp_port_dealloc(uint32_t dst_ip_addr,
+ uint16_t src_udp_port)
+{
+ shim_udp_msg_t msg = SHIM_UDP_MSG__INIT;
+
+ msg.code = SHIM_UDP_MSG_CODE__FLOW_DEALLOC;
+ msg.src_udp_port = src_udp_port;
+
+ return send_shim_udp_msg(&msg, dst_ip_addr);
}
static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
@@ -464,9 +515,9 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port,
int dst_udp_port,
int response)
{
- int fd = -1;
- int ret = 0;
- int port_id = -1;
+ int fd = -1;
+ int ret = 0;
+ int port_id = -1;
struct sockaddr_in t_saddr;
socklen_t t_saddr_len = sizeof(t_saddr);
@@ -536,12 +587,57 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port,
}
LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).",
- ntohs(src_udp_port), ntohs(dst_udp_port));
+ ntohs(dst_udp_port), ntohs(src_udp_port));
return ret;
}
+static int ipcp_udp_flow_dealloc_req(int udp_port)
+{
+ int fd = -1;
+ struct shm_ap_rbuff * rb;
+ int port_id = -1;
+
+ LOG_DBGF("Remote notified flow deallocation on port %d.",
+ ntohs(udp_port));
+
+ rw_lock_rdlock(&_ipcp->state_lock);
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ fd = udp_port_to_fd(udp_port);
+ if (fd < 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("Could not find flow on UDP port %d.",
+ ntohs(udp_port));
+ return 0;
+ }
+
+ clr_fd(fd);
+
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ port_id = _ap_instance->flows[fd].port_id;
+ _ap_instance->flows[fd].port_id = -1;
+ rb = _ap_instance->flows[fd].rb;
+ _ap_instance->flows[fd].rb = NULL;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ if (rb != NULL)
+ shm_ap_rbuff_close(rb);
+
+ rw_lock_unlock(&_ipcp->state_lock);
+
+ ipcp_flow_dealloc(0, port_id);
+
+ close(fd);
+
+ LOG_DBGF("Flow on UDP port %d deallocated.", ntohs(udp_port));
+
+ return 0;
+}
+
static void * ipcp_udp_listener()
{
uint8_t buf[SHIM_UDP_MSG_SIZE];
@@ -598,6 +694,9 @@ static void * ipcp_udp_listener()
msg->dst_udp_port,
msg->response);
break;
+ case SHIM_UDP_MSG_CODE__FLOW_DEALLOC:
+ ipcp_udp_flow_dealloc_req(msg->src_udp_port);
+ break;
default:
LOG_ERR("Unknown message received %d.", msg->code);
shim_udp_msg__free_unpacked(msg, NULL);
@@ -1123,7 +1222,6 @@ static int ipcp_udp_flow_alloc(pid_t n_pid,
int fd;
struct hostent * h;
uint32_t ip_addr = 0;
- bool fd_wait = true;
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
uint32_t dns_addr = 0;
#endif
@@ -1220,19 +1318,7 @@ static int ipcp_udp_flow_alloc(pid_t n_pid,
rw_lock_rdlock(&_ipcp->state_lock);
rw_lock_wrlock(&_ap_instance->flows_lock);
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
-
- _ap_instance->fd_set_sync = true;
- FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
-
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
-
- while (fd_wait) {
- sched_yield();
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
- fd_wait = _ap_instance->fd_set_sync;
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
- }
+ set_fd(fd);
_ap_instance->flows[fd].port_id = port_id;
_ap_instance->flows[fd].state = FLOW_PENDING;
@@ -1250,7 +1336,7 @@ static int ipcp_udp_flow_alloc(pid_t n_pid,
rw_lock_rdlock(&_ipcp->state_lock);
rw_lock_wrlock(&_ap_instance->flows_lock);
- FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+ clr_fd(fd);
_ap_instance->flows[fd].port_id = -1;
_ap_instance->flows[fd].state = FLOW_NULL;
@@ -1275,7 +1361,6 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,
struct sockaddr_in f_saddr;
struct sockaddr_in r_saddr;
socklen_t len = sizeof(r_saddr);
- bool fd_wait = true;
if (response)
return 0;
@@ -1291,7 +1376,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,
rw_lock_unlock(&_ap_instance->flows_lock);
rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Could not find flow with port_id %d.", port_id);
- return 0;
+ return -1;
}
if (_ap_instance->flows[fd].state != FLOW_PENDING) {
@@ -1308,23 +1393,21 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,
_ap_instance->flows[fd].port_id = -1;
rw_lock_unlock(&_ap_instance->flows_lock);
rw_lock_unlock(&_ipcp->state_lock);
- return 0;
+ return -1;
}
rw_lock_unlock(&_ap_instance->flows_lock);
rw_lock_unlock(&_ipcp->state_lock);
if (getsockname(fd, (struct sockaddr *) &f_saddr, &len) < 0) {
- rw_lock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Flow with port_id %d has no peer.", port_id);
- return 0;
- };
+ LOG_DBGF("Flow with port_id %d has no socket.", port_id);
+ return -1;
+ }
if (getpeername(fd, (struct sockaddr *) &r_saddr, &len) < 0) {
- rw_lock_unlock(&_ipcp->state_lock);
LOG_DBGF("Flow with port_id %d has no peer.", port_id);
- return 0;
- };
+ return -1;
+ }
rw_lock_rdlock(&_ipcp->state_lock);
rw_lock_wrlock(&_ap_instance->flows_lock);
@@ -1332,19 +1415,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,
_ap_instance->flows[fd].state = FLOW_ALLOCATED;
_ap_instance->flows[fd].rb = rb;
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
-
- _ap_instance->fd_set_sync = true;
- FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
-
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
-
- while (fd_wait) {
- sched_yield();
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
- fd_wait = _ap_instance->fd_set_sync;
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
- }
+ set_fd(fd);
rw_lock_unlock(&_ap_instance->flows_lock);
rw_lock_unlock(&_ipcp->state_lock);
@@ -1360,7 +1431,7 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,
shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
_ap_instance->flows[fd].rb = NULL;
- FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+ clr_fd(fd);
rw_lock_unlock(&_ap_instance->flows_lock);
rw_lock_unlock(&_ipcp->state_lock);
@@ -1377,13 +1448,16 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_pid,
static int ipcp_udp_flow_dealloc(int port_id)
{
int fd = -1;
+ int remote_udp = -1;
struct shm_ap_rbuff * rb;
-
- LOG_DBGF("Deallocating flow with port_id %d.", port_id);
+ struct sockaddr_in r_saddr;
+ socklen_t r_saddr_len = sizeof(r_saddr);
rw_lock_rdlock(&_ipcp->state_lock);
rw_lock_wrlock(&_ap_instance->flows_lock);
+ LOG_DBGF("Grabbed lock for deallocating flow.");
+
fd = port_id_to_fd(port_id);
if (fd < 0) {
rw_lock_unlock(&_ap_instance->flows_lock);
@@ -1393,17 +1467,43 @@ static int ipcp_udp_flow_dealloc(int port_id)
}
_ap_instance->flows[fd].state = FLOW_NULL;
- _ap_instance->flows[fd].port_id = 0;
+ _ap_instance->flows[fd].port_id = -1;
rb = _ap_instance->flows[fd].rb;
_ap_instance->flows[fd].rb = NULL;
- FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+ clr_fd(fd);
rw_lock_unlock(&_ap_instance->flows_lock);
+ LOG_DBGF("Deallocated flow with port_id %d, fd %d.", port_id, fd);
+
if (rb != NULL)
shm_ap_rbuff_close(rb);
+ if (getpeername(fd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ LOG_DBGF("Flow with port_id %d has no peer.", port_id);
+ close(fd);
+ return 0 ;
+ }
+
+ remote_udp = r_saddr.sin_port;
+ r_saddr.sin_port = LISTEN_PORT;
+
+ if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
+ rw_lock_unlock(&_ipcp->state_lock);
+ close(fd);
+ return 0 ;
+ }
+
+ if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr,
+ remote_udp) < 0) {
+ LOG_DBGF("Could not notify remote.");
+ rw_lock_unlock(&_ipcp->state_lock);
+ close(fd);
+ return 0;
+ }
+
rw_lock_unlock(&_ipcp->state_lock);
close(fd);
diff --git a/src/ipcpd/shim-udp/shim_udp_messages.proto b/src/ipcpd/shim-udp/shim_udp_messages.proto
index 1d054f1f..a15fc18c 100644
--- a/src/ipcpd/shim-udp/shim_udp_messages.proto
+++ b/src/ipcpd/shim-udp/shim_udp_messages.proto
@@ -1,6 +1,7 @@
enum shim_udp_msg_code {
- FLOW_REQ = 1;
- FLOW_REPLY = 2;
+ FLOW_REQ = 1;
+ FLOW_REPLY = 2;
+ FLOW_DEALLOC = 3;
};
message shim_udp_msg {
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 28f82751..f1993960 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -318,11 +318,6 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)
free(e->name);
instance_name_destroy(e->api);
- if (e->req_ap_name != NULL)
- free(e->req_ap_name);
- if (e->req_ae_name != NULL)
- free(e->req_ae_name);
-
free(e);
e = NULL;
@@ -389,7 +384,6 @@ static int reg_name_entry_del_name(char * name)
return 0;
list_del(&e->next);
-
reg_name_entry_destroy(e);
return 0;
@@ -821,6 +815,7 @@ static struct port_map_entry * flow_accept(pid_t pid,
LOG_DBGF("Unregistered AP calling accept().");
return NULL;
}
+
if (rne->accept) {
rw_lock_unlock(&instance->reg_lock);
rw_lock_unlock(&instance->state_lock);
@@ -1016,6 +1011,12 @@ static int flow_alloc_res(int port_id)
return -1;
}
+ if (e->state == FLOW_NULL) {
+ rw_lock_unlock(&instance->flows_lock);
+ rw_lock_unlock(&instance->state_lock);
+ return -1;
+ }
+
if (e->state == FLOW_ALLOCATED) {
rw_lock_unlock(&instance->flows_lock);
rw_lock_unlock(&instance->state_lock);
@@ -1105,6 +1106,7 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
{
struct reg_name_entry * rne;
struct port_map_entry * pme;
+ bool acc_wait = true;
pme = malloc(sizeof(*pme));
if (pme == NULL) {
@@ -1133,13 +1135,19 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
pme->n_pid = rne->api->id;
- rne->req_ap_name = strdup(ap_name);
- rne->req_ae_name = strdup(ae_name);
-
list_add(&pme->next, &instance->port_map);
pthread_mutex_lock(&rne->acc_lock);
+ rne->req_ap_name = ap_name;
+ rne->req_ae_name = ae_name;
+
+ if (rne->accept == false) {
+ pthread_mutex_unlock(&rne->acc_lock);
+ LOG_WARN("This AP is not accepting flow allocations.");
+ return NULL;
+ }
+
rne->flow_arrived = 0;
pthread_mutex_unlock(&rne->acc_lock);
@@ -1147,6 +1155,13 @@ static struct port_map_entry * flow_req_arr(pid_t pid,
if (pthread_cond_signal(&rne->acc_signal))
LOG_ERR("Failed to send signal.");
+ while (acc_wait) {
+ sched_yield();
+ pthread_mutex_lock(&rne->acc_lock);
+ acc_wait = (rne->flow_arrived != -1);
+ pthread_mutex_unlock(&rne->acc_lock);
+ }
+
rw_lock_unlock(&instance->flows_lock);
rw_lock_unlock(&instance->reg_lock);
rw_lock_unlock(&instance->state_lock);
@@ -1202,6 +1217,8 @@ static int flow_dealloc_ipcp(int port_id)
return 0;
}
+ bmp_release(instance->port_ids, port_id);
+
list_del(&e->next);
rw_lock_unlock(&instance->flows_lock);
@@ -1245,8 +1262,10 @@ static void irm_destroy(struct irm * irm)
struct port_map_entry * e = list_entry(h,
struct port_map_entry,
next);
+
list_del(&e->next);
free(e);
+
}
if (irm->dum != NULL)
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 63cf92f7..95fca1ec 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -154,6 +154,26 @@ static int port_id_to_fd(int port_id)
}
#endif
+static void clean_fds()
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
+ if (flow_alloc_res(i) < 0) {
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+ if (_ap_instance->flows[i].port_id < 0) {
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ continue;
+ }
+
+ bmp_release(_ap_instance->fds, i);
+ _ap_instance->flows[i].port_id = -1;
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ _ap_instance->flows[i].rb = NULL;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ }
+ }
+}
+
int ap_reg(char ** difs,
size_t len)
{
@@ -202,6 +222,7 @@ int ap_reg(char ** difs,
rw_lock_wrlock(&_ap_instance->flows_lock);
fd = bmp_allocate(_ap_instance->fds);
+ _ap_instance->flows[fd].port_id = -1;
rw_lock_unlock(&_ap_instance->flows_lock);
rw_lock_unlock(&_ap_instance->data_lock);
@@ -268,6 +289,8 @@ int flow_accept(int fd,
msg.pid = _ap_instance->api->id;
+ clean_fds();
+
rw_lock_unlock(&_ap_instance->data_lock);
recv_msg = send_recv_irm_msg(&msg);
@@ -389,6 +412,8 @@ int flow_alloc(char * dst_name,
msg.pid = _ap_instance->api->id;
msg.ap_name = _ap_instance->api->name;
+ clean_fds();
+
rw_lock_unlock(&_ap_instance->data_lock);
recv_msg = send_recv_irm_msg(&msg);
@@ -402,6 +427,7 @@ int flow_alloc(char * dst_name,
}
rw_lock_rdlock(&_ap_instance->data_lock);
+
rw_lock_wrlock(&_ap_instance->flows_lock);
fd = bmp_allocate(_ap_instance->fds);
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 4f777805..6c51e916 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -50,6 +50,7 @@ int client_socket_open(char * file_name)
(struct sockaddr *) &serv_addr,
sizeof(serv_addr))) {
LOG_ERR("Failed to connect to daemon");
+ close(sockfd);
return -1;
}
@@ -83,11 +84,13 @@ int server_socket_open(char * file_name)
(struct sockaddr *) &serv_addr,
sizeof(serv_addr))) {
LOG_ERR("Failed to bind socket");
+ close(sockfd);
return -1;
}
if (listen(sockfd, 0)) {
LOG_ERR("Failed to listen to socket");
+ close(sockfd);
return -1;
}