diff options
author | Sander Vrijders <[email protected]> | 2016-10-04 15:23:54 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-10-04 15:23:54 +0200 |
commit | 1a7c0923206cfb98d43122621a585027c67040ea (patch) | |
tree | acd08f09f5a094e897020e97961b2847209df043 /src/irmd | |
parent | ecdf47b97abb8c5107846f4ef4a17bd62ba6dc82 (diff) | |
parent | c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (diff) | |
download | ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.tar.gz ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.zip |
Merged in dstaesse/ouroboros/be-unify (pull request #251)
lib, ipcp: Revise fast path and flow interfaces
Diffstat (limited to 'src/irmd')
-rw-r--r-- | src/irmd/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/irmd/ipcp.c | 402 | ||||
-rw-r--r-- | src/irmd/ipcp.h | 62 | ||||
-rw-r--r-- | src/irmd/irm_flow.c | 47 | ||||
-rw-r--r-- | src/irmd/irm_flow.h | 18 | ||||
-rw-r--r-- | src/irmd/main.c | 176 | ||||
-rw-r--r-- | src/irmd/utils.h | 3 |
7 files changed, 582 insertions, 127 deletions
diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt index 05919326..16b53414 100644 --- a/src/irmd/CMakeLists.txt +++ b/src/irmd/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCE_FILES # Add source files here api_table.c apn_table.c + ipcp.c irm_flow.c main.c registry.c diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c new file mode 100644 index 00000000..f79e6caf --- /dev/null +++ b/src/irmd/ipcp.c @@ -0,0 +1,402 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * The API to instruct IPCPs + * + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/config.h> +#include <ouroboros/errno.h> +#include <ouroboros/utils.h> +#include <ouroboros/sockets.h> + +#define OUROBOROS_PREFIX "irmd/ipcp" + +#include <ouroboros/logs.h> + +#include "ipcp.h" + +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <stdbool.h> +#include <pthread.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <sys/socket.h> +#include <sys/time.h> + +static void close_ptr(void * o) +{ + close(*(int *) o); +} + +ipcp_msg_t * send_recv_ipcp_msg(pid_t api, ipcp_msg_t * msg) +{ + int sockfd = 0; + buffer_t buf; + char * sock_path = NULL; + ssize_t count = 0; + ipcp_msg_t * recv_msg = NULL; + + struct timeval tv = {(SOCKET_TIMEOUT / 1000), + (SOCKET_TIMEOUT % 1000) * 1000}; + + sock_path = ipcp_sock_path(api); + if (sock_path == NULL) + return NULL; + + sockfd = client_socket_open(sock_path); + if (sockfd < 0) { + free(sock_path); + return NULL; + } + + if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, + (void *) &tv, sizeof(tv))) + LOG_WARN("Failed to set timeout on socket."); + + free(sock_path); + + buf.len = ipcp_msg__get_packed_size(msg); + if (buf.len == 0) { + close(sockfd); + return NULL; + } + + buf.data = malloc(IPCP_MSG_BUF_SIZE); + if (buf.data == NULL) { + close(sockfd); + return NULL; + } + + pthread_cleanup_push(close_ptr, (void *) &sockfd); + pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data); + + ipcp_msg__pack(msg, buf.data); + + if (write(sockfd, buf.data, buf.len) != -1) + count = read(sockfd, buf.data, IPCP_MSG_BUF_SIZE); + + if (count > 0) + recv_msg = ipcp_msg__unpack(NULL, count, buf.data); + + pthread_cleanup_pop(true); + pthread_cleanup_pop(true); + + return recv_msg; +} + +pid_t ipcp_create(enum ipcp_type ipcp_type) +{ + pid_t api = -1; + char irmd_api[10]; + size_t len = 0; + char * ipcp_dir = "/sbin/"; + char * full_name = NULL; + char * exec_name = NULL; + char * log_file = NULL; + + sprintf(irmd_api, "%u", getpid()); + + api = fork(); + if (api == -1) { + LOG_ERR("Failed to fork"); + return api; + } + + if (api != 0) { + return api; + } + + if (ipcp_type == IPCP_NORMAL) + exec_name = IPCP_NORMAL_EXEC; + else if (ipcp_type == IPCP_SHIM_UDP) + exec_name = IPCP_SHIM_UDP_EXEC; + else if (ipcp_type == IPCP_SHIM_ETH_LLC) + exec_name = IPCP_SHIM_ETH_LLC_EXEC; + else if (ipcp_type == IPCP_LOCAL) + exec_name = IPCP_LOCAL_EXEC; + else + exit(EXIT_FAILURE); + + len += strlen(INSTALL_PREFIX); + len += strlen(ipcp_dir); + len += strlen(exec_name); + len += 1; + + full_name = malloc(len + 1); + if (full_name == NULL) { + LOG_ERR("Failed to malloc"); + exit(EXIT_FAILURE); + } + + strcpy(full_name, INSTALL_PREFIX); + strcat(full_name, ipcp_dir); + strcat(full_name, exec_name); + full_name[len] = '\0'; + + if (logfile != NULL) { + log_file = malloc(20); + if (log_file == NULL) { + LOG_ERR("Failed to malloc."); + exit(EXIT_FAILURE); + } + sprintf(log_file, "ipcpd-%u.log", getpid()); + } + + /* log_file to be placed at the end */ + char * argv[] = {full_name, + irmd_api, + log_file, + 0}; + + char * envp[] = {0}; + + execve(argv[0], &argv[0], envp); + + LOG_DBG("%s", strerror(errno)); + LOG_ERR("Failed to load IPCP daemon"); + LOG_ERR("Make sure to run the installed version"); + free(full_name); + exit(EXIT_FAILURE); +} + +int ipcp_destroy(pid_t api) +{ + int status; + + if (kill(api, SIGTERM)) { + LOG_ERR("Failed to destroy IPCP"); + return -1; + } + + if (waitpid(api, &status, 0) < 0) { + LOG_ERR("Failed to destroy IPCP"); + return -1; + } + + return 0; +} + +int ipcp_bootstrap(pid_t api, + dif_config_msg_t * conf) +{ + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + if (conf == NULL) + return -EINVAL; + + msg.code = IPCP_MSG_CODE__IPCP_BOOTSTRAP; + msg.conf = conf; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_enroll(pid_t api, + char * dif_name) +{ + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + if (dif_name == NULL) + return -EINVAL; + + msg.code = IPCP_MSG_CODE__IPCP_ENROLL; + msg.dif_name = dif_name; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_name_reg(pid_t api, + char * name) +{ + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + if (name == NULL) + return -1; + + msg.code = IPCP_MSG_CODE__IPCP_NAME_REG; + msg.name = name; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_name_unreg(pid_t api, + char * name) +{ + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IPCP_MSG_CODE__IPCP_NAME_UNREG; + msg.name = name; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_alloc(pid_t api, + int port_id, + pid_t n_api, + char * dst_name, + char * src_ae_name, + enum qos_cube qos) +{ + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + if (dst_name == NULL || src_ae_name == NULL) + return -EINVAL; + + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_api = true; + msg.api = n_api; + msg.src_ae_name = src_ae_name; + msg.dst_name = dst_name; + msg.has_qos_cube = true; + msg.qos_cube = qos; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return -1; + + if (!recv_msg->has_result) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_alloc_resp(pid_t api, + int port_id, + pid_t n_api, + int response) +{ + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; + msg.has_port_id = true; + msg.port_id = port_id; + msg.has_api = true; + msg.api = n_api; + msg.has_response = true; + msg.response = response; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return -1; + + if (recv_msg->has_result == false) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return -1; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} + +int ipcp_flow_dealloc(pid_t api, + int port_id) +{ + + ipcp_msg_t msg = IPCP_MSG__INIT; + ipcp_msg_t * recv_msg = NULL; + int ret = -1; + + msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; + msg.has_port_id = true; + msg.port_id = port_id; + + recv_msg = send_recv_ipcp_msg(api, &msg); + if (recv_msg == NULL) + return 0; + + if (recv_msg->has_result == false) { + ipcp_msg__free_unpacked(recv_msg, NULL); + return 0; + } + + ret = recv_msg->result; + ipcp_msg__free_unpacked(recv_msg, NULL); + + return ret; +} diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h new file mode 100644 index 00000000..930695fa --- /dev/null +++ b/src/irmd/ipcp.h @@ -0,0 +1,62 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * The API for the IRM to instruct IPCPs + * + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <ouroboros/irm_config.h> +#include <ouroboros/sockets.h> +#include <ouroboros/shared.h> + +#include <sys/types.h> + +#ifndef OUROBOROS_IPCP_H +#define OUROBOROS_IPCP_H + +/* Returns the process id */ +pid_t ipcp_create(enum ipcp_type ipcp_type); + +int ipcp_destroy(pid_t api); + +int ipcp_enroll(pid_t api, + char * dif_name); + +int ipcp_bootstrap(pid_t api, + dif_config_msg_t * conf); + +int ipcp_name_reg(pid_t api, + char * name); +int ipcp_name_unreg(pid_t api, + char * name); + +int ipcp_flow_alloc(pid_t api, + int port_id, + pid_t n_api, + char * dst_name, + char * src_ae_name, + enum qos_cube qos); +int ipcp_flow_alloc_resp(pid_t api, + int port_id, + pid_t n_api, + int response); + +int ipcp_flow_dealloc(pid_t api, + int port_id); + +#endif /* OUROBOROS_IPCP_H */ diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index d9fe3fb3..b99c6f97 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -58,6 +58,11 @@ void irm_flow_destroy(struct irm_flow * f) { pthread_mutex_lock(&f->state_lock); + if (f->state == FLOW_DESTROY) { + pthread_mutex_unlock(&f->state_lock); + return; + } + if (f->state == FLOW_PENDING) f->state = FLOW_DESTROY; else @@ -75,3 +80,45 @@ void irm_flow_destroy(struct irm_flow * f) free(f); } + +enum flow_state irm_flow_get_state(struct irm_flow * f) +{ + enum flow_state state; + + pthread_mutex_lock(&f->state_lock); + + state = f->state; + + pthread_mutex_unlock(&f->state_lock); + + return state; +} + +void irm_flow_set_state(struct irm_flow * f, enum flow_state state) +{ + pthread_mutex_lock(&f->state_lock); + + f->state = state; + pthread_cond_broadcast(&f->state_cond); + + pthread_mutex_unlock(&f->state_lock); +} + +enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state) +{ + pthread_mutex_lock(&f->state_lock); + + while (!(f->state == state || f->state == FLOW_DESTROY)) + pthread_cond_wait(&f->state_cond, &f->state_lock); + + if (state == FLOW_DESTROY) { + f->state = FLOW_NULL; + pthread_cond_broadcast(&f->state_cond); + } + + state = f->state; + + pthread_mutex_unlock(&f->state_lock); + + return state; +} diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index b7e5a1be..db6598bf 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -24,12 +24,18 @@ #define OUROBOROS_IRMD_IRM_FLOW_H #include <ouroboros/list.h> -#include <ouroboros/shared.h> #include <sys/types.h> #include <pthread.h> #include <time.h> +enum flow_state { + FLOW_NULL = 0, + FLOW_PENDING, + FLOW_ALLOCATED, + FLOW_DESTROY +}; + struct irm_flow { struct list_head next; @@ -46,6 +52,16 @@ struct irm_flow { }; struct irm_flow * irm_flow_create(); + void irm_flow_destroy(struct irm_flow * f); +enum flow_state irm_flow_get_state(struct irm_flow * f); + + +void irm_flow_set_state(struct irm_flow * f, + enum flow_state state); + +enum flow_state irm_flow_wait_state(struct irm_flow * f, + enum flow_state state); + #endif /* OUROBOROS_IRMD_IRM_FLOW_H */ diff --git a/src/irmd/main.c b/src/irmd/main.c index cc9160bf..523741ef 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -21,14 +21,9 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define OUROBOROS_PREFIX "irmd" - #include <ouroboros/config.h> #include <ouroboros/errno.h> -#include <ouroboros/logs.h> #include <ouroboros/sockets.h> -#include <ouroboros/ipcp.h> -#include <ouroboros/nsm.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/irm_config.h> @@ -36,14 +31,19 @@ #include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/bitmap.h> -#include <ouroboros/flow.h> #include <ouroboros/qos.h> #include <ouroboros/time_utils.h> +#define OUROBOROS_PREFIX "irmd" + +#include <ouroboros/logs.h> + + #include "utils.h" #include "registry.h" #include "irm_flow.h" #include "api_table.h" +#include "ipcp.h" #include <sys/socket.h> #include <sys/un.h> @@ -60,10 +60,12 @@ struct ipcp_entry { struct list_head next; + char * name; pid_t api; enum ipcp_type type; char * dif_name; + pthread_cond_t init_cond; pthread_mutex_t init_lock; bool init; @@ -100,7 +102,7 @@ struct irm { pthread_t irm_sanitize; pthread_t shm_sanitize; -} * irmd = NULL; +} * irmd; static struct irm_flow * get_irm_flow(int port_id) { @@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->port_id == port_id) return e; } @@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api) list_for_each(pos, &irmd->irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->n_api == n_api) return e; } @@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name) return NULL; } - LOG_INFO("New instance (%d) of %s added.", api, e->apn); - + LOG_DBG("New instance (%d) of %s added.", api, e->apn); LOG_DBG("This instance accepts flows for:"); list_for_each(p, &e->names) { struct str_el * s = list_entry(p, struct str_el, next); @@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api, struct api_entry * e = NULL; int ret = -1; - pid_t f_n_1_api; - pid_t f_n_api; + pid_t api_n1; + pid_t api_n; pthread_rwlock_rdlock(&irmd->state_lock); @@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api, return -1; } - f_n_api = f->n_api; - f_n_1_api = f->n_1_api; - - if (!response) { - f->state = FLOW_ALLOCATED; - pthread_cond_signal(&f->state_cond); - } + api_n = f->n_api; + api_n1 = f->n_1_api; pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - ret = ipcp_flow_alloc_resp(f_n_1_api, - port_id, - f_n_api, - response); + ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response); + + if (!(response || ret)) + irm_flow_set_state(f, FLOW_ALLOCATED); + return ret; } @@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t api, { struct irm_flow * f; pid_t ipcp; + int port_id; /* FIXME: Map qos_spec to qos_cube */ @@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t api, f->n_api = api; f->state = FLOW_PENDING; + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) LOG_WARN("Failed to set timestamp."); @@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - f->port_id = bmp_allocate(irmd->port_ids); + port_id = f->port_id = bmp_allocate(irmd->port_ids); f->n_1_api = ipcp; list_add(&f->next, &irmd->irm_flows); @@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t api, pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - if (ipcp_flow_alloc(ipcp, - f->port_id, - f->n_api, - dst_name, - src_ae_name, - QOS_CUBE_BE) < 0) { + if (ipcp_flow_alloc(ipcp, port_id, api, + dst_name, src_ae_name, QOS_CUBE_BE) < 0) { pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->flows_lock); list_del(&f->next); bmp_release(irmd->port_ids, f->port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - free(f); + irm_flow_destroy(f); return NULL; } @@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id) f = get_irm_flow(port_id); if (f == NULL) { - LOG_ERR("Could not find port %d.", port_id); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + LOG_ERR("Could not find port %d.", port_id); return -1; } - if (f->state == FLOW_NULL) { - LOG_INFO("Port %d is deprecated.", port_id); + if (irm_flow_get_state(f) == FLOW_NULL) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); + LOG_INFO("Port %d is deprecated.", port_id); return -1; } - if (f->state == FLOW_ALLOCATED) { + if (irm_flow_get_state(f) == FLOW_ALLOCATED) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); return 0; @@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - pthread_mutex_lock(&f->state_lock); - - while (f->state == FLOW_PENDING) - pthread_cond_wait(&f->state_cond, &f->state_lock); - - pthread_mutex_unlock(&f->state_lock); - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - pthread_mutex_lock(&f->state_lock); - - if (f->state == FLOW_ALLOCATED) { - pthread_cond_broadcast(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); + if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) return 0; - } - - f->state = FLOW_NULL; - pthread_cond_broadcast(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); return -1; } -static int flow_dealloc(int port_id) +static int flow_dealloc(pid_t api, int port_id) { pid_t n_1_api; int ret = 0; @@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); - ret = ipcp_flow_dealloc(n_1_api, port_id); + if (api != n_1_api) + ret = ipcp_flow_dealloc(n_1_api, port_id); pthread_rwlock_unlock(&irmd->state_lock); @@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t api, struct pid_el * c_api; pid_t h_api = -1; + LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.", + api, dst_name, ae_name); + f = irm_flow_create(); if (f == NULL) { LOG_ERR("Failed to create irm_flow."); @@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t api, return f; } -static int flow_alloc_reply(int port_id, - int response) +static int flow_alloc_reply(int port_id, int response) { struct irm_flow * f; @@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id, return -1; } - pthread_mutex_lock(&f->state_lock); - if (!response) - f->state = FLOW_ALLOCATED; - + irm_flow_set_state(f, FLOW_ALLOCATED); else - f->state = FLOW_NULL; - - if (pthread_cond_signal(&f->state_cond)) - LOG_ERR("Failed to send signal."); - - pthread_mutex_unlock(&f->state_lock); + irm_flow_set_state(f, FLOW_NULL); pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); @@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id, return 0; } -static int flow_dealloc_ipcp(int port_id) -{ - struct irm_flow * f = NULL; - - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - - f = get_irm_flow(port_id); - if (f == NULL) { - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - return 0; - } - - list_del(&f->next); - - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - - irm_flow_destroy(f); - - return 0; -} - static void irm_destroy() { struct list_head * p; @@ -1729,46 +1671,35 @@ void * irm_sanitize() struct irm_flow * f = list_entry(p, struct irm_flow, next); - pthread_mutex_lock(&f->state_lock); - - if (f->state == FLOW_PENDING && - ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { + if (irm_flow_get_state(f) == FLOW_PENDING + && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { LOG_INFO("Pending port_id %d timed out.", f->port_id); - f->state = FLOW_NULL; - pthread_cond_signal(&f->state_cond); - pthread_mutex_unlock(&f->state_lock); + irm_flow_set_state(f, FLOW_NULL); continue; } - pthread_mutex_unlock(&f->state_lock); - if (kill(f->n_api, 0) < 0) { - struct shm_ap_rbuff * n_rb = - shm_ap_rbuff_open_s(f->n_api); + struct shm_ap_rbuff * rb = + shm_ap_rbuff_open(f->n_api); bmp_release(irmd->port_ids, f->port_id); - list_del(&f->next); LOG_INFO("AP-I %d gone, flow %d deallocated.", f->n_api, f->port_id); ipcp_flow_dealloc(f->n_1_api, f->port_id); - if (n_rb != NULL) - shm_ap_rbuff_destroy(n_rb); + if (rb != NULL) + shm_ap_rbuff_destroy(rb); irm_flow_destroy(f); continue; } if (kill(f->n_1_api, 0) < 0) { - struct shm_ap_rbuff * n_1_rb_s = - shm_ap_rbuff_open_s(f->n_1_api); - struct shm_ap_rbuff * n_1_rb_n = - shm_ap_rbuff_open_n(f->n_1_api); + struct shm_ap_rbuff * rb = + shm_ap_rbuff_open(f->n_1_api); list_del(&f->next); LOG_ERR("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); - if (n_1_rb_n != NULL) - shm_ap_rbuff_destroy(n_1_rb_n); - if (n_1_rb_s != NULL) - shm_ap_rbuff_destroy(n_1_rb_s); + if (rb != NULL) + shm_ap_rbuff_destroy(rb); irm_flow_destroy(f); } } @@ -1939,7 +1870,7 @@ void * mainloop() break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->port_id); + ret_msg.result = flow_dealloc(msg->api, msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->api, @@ -1950,7 +1881,6 @@ void * mainloop() ret_msg.result = -1; break; } - /* FIXME: badly timed dealloc may give SEGV */ ret_msg.has_port_id = true; ret_msg.port_id = e->port_id; ret_msg.has_api = true; @@ -1961,10 +1891,6 @@ void * mainloop() ret_msg.result = flow_alloc_reply(msg->port_id, msg->response); break; - case IRM_MSG_CODE__IPCP_FLOW_DEALLOC: - ret_msg.has_result = true; - ret_msg.result = flow_dealloc_ipcp(msg->port_id); - break; default: LOG_ERR("Don't know that message code."); break; diff --git a/src/irmd/utils.h b/src/irmd/utils.h index 37c745af..2fbc8ef2 100644 --- a/src/irmd/utils.h +++ b/src/irmd/utils.h @@ -40,7 +40,8 @@ struct pid_el { pid_t pid; }; -int wildcard_match(const char * pattern, const char * string); +int wildcard_match(const char * pattern, + const char * string); /* functions for copying and destroying arguments list */ char ** argvdup(char ** argv); |