From 71f10f5efab37f3df3d909d324cff2e098d21c85 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Fri, 7 Oct 2016 15:25:22 +0200 Subject: lib, dev: Add asynchronous deallocation Flow deallocation from the application will immediately return (void call). The IRMd will not send a reply message. --- include/ouroboros/sockets.h | 2 ++ src/ipcpd/ipcp.c | 11 +++++++---- src/irmd/main.c | 45 ++++++++++++++++++--------------------------- src/lib/dev.c | 38 ++++++++++++++------------------------ src/lib/sockets.c | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 55 deletions(-) diff --git a/include/ouroboros/sockets.h b/include/ouroboros/sockets.h index aef4259e..3885ffb2 100644 --- a/include/ouroboros/sockets.h +++ b/include/ouroboros/sockets.h @@ -54,4 +54,6 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg); irm_msg_t * send_recv_irm_msg_b(irm_msg_t * msg); +void send_irm_msg(irm_msg_t * msg); + #endif diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index db72b88d..a9f80ee7 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -313,7 +313,8 @@ void * ipcp_main_loop(void * o) } fd = np1_flow_alloc(msg->api, msg->port_id); if (fd < 0) { - LOG_ERR("Could not get fd for flow."); + LOG_ERR("Could not get fd for port_id. %d", + msg->port_id); ret_msg.has_result = true; ret_msg.result = -1; break; @@ -326,7 +327,7 @@ void * ipcp_main_loop(void * o) msg->src_ae_name, msg->qos_cube); if (ret_msg.result < 0) { - LOG_DBG("Deallocating failed flow on port_id %d.", + LOG_DBG("Deallocate failed on port_id %d.", msg->port_id); flow_dealloc(fd); } @@ -340,7 +341,8 @@ void * ipcp_main_loop(void * o) if (!msg->response) { fd = np1_flow_resp(msg->api, msg->port_id); if (fd < 0) { - LOG_ERR("Could not get fd for flow."); + LOG_ERR("Could not get fd for port_id %d.", + msg->port_id); ret_msg.has_result = true; ret_msg.result = -1; break; @@ -359,7 +361,8 @@ void * ipcp_main_loop(void * o) fd = np1_flow_dealloc(msg->port_id); if (fd < 0) { - LOG_ERR("Could not get fd for flow."); + LOG_ERR("Could not deallocate port_id %d.", + msg->port_id); ret_msg.has_result = true; ret_msg.result = -1; break; diff --git a/src/irmd/main.c b/src/irmd/main.c index 523741ef..24a49c49 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -587,8 +587,6 @@ static int bind_api(pid_t api, if (name == NULL) return -EINVAL; - LOG_DBG("BIND_API called %d, %s", api, name); - pthread_rwlock_rdlock(&irmd->state_lock); if (irmd->state != IRMD_RUNNING) { @@ -1231,29 +1229,32 @@ static int flow_alloc_res(int port_id) static int flow_dealloc(pid_t api, int port_id) { - pid_t n_1_api; + pid_t n_1_api = -1; int ret = 0; struct irm_flow * f = NULL; pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - bmp_release(irmd->port_ids, port_id); f = get_irm_flow(port_id); if (f == NULL) { + bmp_release(irmd->port_ids, port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; } - n_1_api = f->n_1_api; + if (api == f->n_api) { + bmp_release(irmd->port_ids, port_id); + n_1_api = f->n_1_api; + } list_del(&f->next); pthread_rwlock_unlock(&irmd->flows_lock); - if (api != n_1_api) + if (n_1_api != -1) ret = ipcp_flow_dealloc(n_1_api, port_id); pthread_rwlock_unlock(&irmd->state_lock); @@ -1772,8 +1773,7 @@ void * mainloop() break; case IRM_MSG_CODE__IRM_BOOTSTRAP_IPCP: ret_msg.has_result = true; - ret_msg.result = bootstrap_ipcp(msg->api, - msg->conf); + ret_msg.result = bootstrap_ipcp(msg->api, msg->conf); break; case IRM_MSG_CODE__IRM_ENROLL_IPCP: ret_msg.has_result = true; @@ -1790,27 +1790,22 @@ void * mainloop() break; case IRM_MSG_CODE__IRM_UNBIND_AP: ret_msg.has_result = true; - ret_msg.result = unbind_ap(msg->ap_name, - msg->dst_name); + ret_msg.result = unbind_ap(msg->ap_name, msg->dst_name); break; case IRM_MSG_CODE__IRM_API_ANNOUNCE: ret_msg.has_result = true; - ret_msg.result = api_announce(msg->api, - msg->ap_name); + ret_msg.result = api_announce(msg->api, msg->ap_name); break; case IRM_MSG_CODE__IRM_BIND_API: ret_msg.has_result = true; - ret_msg.result = bind_api(msg->api, - msg->dst_name); + ret_msg.result = bind_api(msg->api, msg->dst_name); break; case IRM_MSG_CODE__IRM_UNBIND_API: ret_msg.has_result = true; - ret_msg.result = unbind_api(msg->api, - msg->dst_name); + ret_msg.result = unbind_api(msg->api, msg->dst_name); break; case IRM_MSG_CODE__IRM_LIST_IPCPS: - ret_msg.n_apis = list_ipcps(msg->dst_name, - &apis); + ret_msg.n_apis = list_ipcps(msg->dst_name, &apis); ret_msg.apis = apis; ret_msg.has_result = true; break; @@ -1827,15 +1822,12 @@ void * mainloop() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - e = flow_accept(msg->api, - &ret_msg.ae_name); - + e = flow_accept(msg->api, &ret_msg.ae_name); if (e == NULL) { ret_msg.has_result = true; ret_msg.result = -1; break; } - ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_api = true; @@ -1857,8 +1849,6 @@ void * mainloop() ret_msg.result = -1; break; } - - /* FIXME: badly timed dealloc may give SEGV */ ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_api = true; @@ -1869,9 +1859,10 @@ void * mainloop() ret_msg.result = flow_alloc_res(msg->port_id); break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->api, msg->port_id); - break; + flow_dealloc(msg->api, msg->port_id); + irm_msg__free_unpacked(msg, NULL); + close(cli_sockfd); + continue; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->api, msg->dst_name, diff --git a/src/lib/dev.c b/src/lib/dev.c index 8556d6e2..d36764ed 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -130,7 +130,7 @@ struct flow { pid_t api; - struct timespec timeout; + struct timespec * timeout; }; struct { @@ -220,8 +220,7 @@ int ap_init(char * ap_name) ai.flows[i].port_id = -1; ai.flows[i].oflags = 0; ai.flows[i].api = -1; - ai.flows[i].timeout.tv_sec = 0; - ai.flows[i].timeout.tv_nsec = 0; + ai.flows[i].timeout = NULL; } ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS); @@ -270,9 +269,12 @@ void ap_fini() pthread_rwlock_rdlock(&ai.flows_lock); - for (i = 0; i < AP_MAX_FLOWS; ++i) + for (i = 0; i < AP_MAX_FLOWS; ++i) { if (ai.flows[i].rb != NULL) shm_ap_rbuff_close(ai.flows[i].rb); + if (ai.flows[i].timeout != NULL) + free(ai.flows[i].timeout); + } for (i = 0; i < IRMD_MAX_FLOWS; ++i) { ai.ports[i].state = PORT_NULL; @@ -527,8 +529,6 @@ int flow_alloc_res(int fd) int flow_dealloc(int fd) { irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg = NULL; - int ret = -1; msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; msg.has_port_id = true; @@ -552,30 +552,20 @@ int flow_dealloc(int fd) shm_ap_rbuff_close(ai.flows[fd].rb); ai.flows[fd].rb = NULL; ai.flows[fd].api = -1; + if (ai.flows[fd].timeout != NULL) { + free(ai.flows[fd].timeout); + ai.flows[fd].timeout = NULL; + } bmp_release(ai.fds, fd); pthread_rwlock_unlock(&ai.flows_lock); - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) { - pthread_rwlock_unlock(&ai.data_lock); - return -1; - } - - if (!recv_msg->has_result) { - pthread_rwlock_unlock(&ai.data_lock); - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } - - ret = recv_msg->result; + send_irm_msg(&msg); pthread_rwlock_unlock(&ai.data_lock); - irm_msg__free_unpacked(recv_msg, NULL); - - return ret; + return 0; } int flow_cntl(int fd, int cmd, int oflags) @@ -708,10 +698,10 @@ ssize_t flow_read(int fd, void * buf, size_t count) } else { struct shm_ap_rbuff * rb = ai.rb; int port_id = ai.flows[fd].port_id; - struct timespec timeout = ai.flows[fd].timeout; + struct timespec * timeout = ai.flows[fd].timeout; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout); + idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout); pthread_rwlock_rdlock(&ai.data_lock); } diff --git a/src/lib/sockets.c b/src/lib/sockets.c index 408e79e7..c8375c22 100644 --- a/src/lib/sockets.c +++ b/src/lib/sockets.c @@ -154,6 +154,39 @@ static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed) return recv_msg; } +void send_irm_msg(irm_msg_t * msg) +{ + int sockfd; + buffer_t buf; + + sockfd = client_socket_open(IRM_SOCK_PATH); + if (sockfd < 0) + return; + + buf.len = irm_msg__get_packed_size(msg); + if (buf.len == 0) { + close(sockfd); + return; + } + + buf.data = malloc(buf.len); + if (buf.data == NULL) { + close(sockfd); + return; + } + + pthread_cleanup_push(close_ptr, &sockfd); + pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); + + irm_msg__pack(msg, buf.data); + + if (write(sockfd, buf.data, buf.len) < 0) + return; + + pthread_cleanup_pop(true); + pthread_cleanup_pop(true); +} + irm_msg_t * send_recv_irm_msg(irm_msg_t * msg) { return send_recv_irm_msg_timed(msg, true); } -- cgit v1.2.3