diff options
author | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
commit | 5812dfb832e513dc455a0d48624bcad62334d457 (patch) | |
tree | 93a02e1b20f54bb869eadc856f201412c633315c /src/ipcpd | |
parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
parent | 021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff) | |
download | ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip |
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/flow.c | 38 | ||||
-rw-r--r-- | src/ipcpd/flow.h | 12 | ||||
-rw-r--r-- | src/ipcpd/ipcp-data.c | 104 | ||||
-rw-r--r-- | src/ipcpd/ipcp-data.h | 16 | ||||
-rw-r--r-- | src/ipcpd/ipcp-ops.h | 9 | ||||
-rw-r--r-- | src/ipcpd/ipcp.c | 20 | ||||
-rw-r--r-- | src/ipcpd/ipcp.h | 3 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 524 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/tests/shim_udp_test.c | 12 |
9 files changed, 408 insertions, 330 deletions
diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c index c436733b..ae8f848c 100644 --- a/src/ipcpd/flow.c +++ b/src/ipcpd/flow.c @@ -27,7 +27,7 @@ #include <ouroboros/logs.h> -flow_t * flow_create(int32_t port_id) +flow_t * flow_create(uint32_t port_id) { flow_t * flow = malloc(sizeof *flow); if (flow == NULL) { @@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id) INIT_LIST_HEAD(&flow->list); flow->port_id = port_id; - flow->oflags = FLOW_O_DEFAULT; - flow->state = FLOW_NULL; + flow->state = FLOW_NULL; pthread_mutex_init(&flow->lock, NULL); @@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow) return; free(flow); } - -int flow_set_opts(flow_t * flow, uint16_t opts) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return -1; - } - - pthread_mutex_lock(&flow->lock); - - if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) { - flow->oflags = FLOW_O_DEFAULT; - pthread_mutex_unlock(&flow->lock); - LOG_WARN("Invalid flow options. Setting default."); - return -1; - } - - flow->oflags = opts; - - pthread_mutex_unlock(&flow->lock); - - return 0; -} - -uint16_t flow_get_opts(const flow_t * flow) -{ - if (flow == NULL) { - LOG_DBGF("Non-existing flow."); - return FLOW_O_INVALID; - } - - return flow->oflags; -} diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h index 000de5ad..0a3e90d1 100644 --- a/src/ipcpd/flow.h +++ b/src/ipcpd/flow.h @@ -25,6 +25,7 @@ #include <ouroboros/common.h> #include <ouroboros/list.h> +#include <ouroboros/shm_ap_rbuff.h> #include <pthread.h> /* same values as fcntl.h */ @@ -47,17 +48,14 @@ enum flow_state { typedef struct flow { struct list_head list; - int32_t port_id; - uint16_t oflags; - enum flow_state state; + uint32_t port_id; + struct shm_ap_rbuff * rb; + enum flow_state state; pthread_mutex_t lock; } flow_t; -flow_t * flow_create(int32_t port_id); +flow_t * flow_create(uint32_t port_id); void flow_destroy(flow_t * flow); -int flow_set_opts(flow_t * flow, uint16_t opts); -uint16_t flow_get_opts(const flow_t * flow); - #endif /* OUROBOROS_FLOW_H */ diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c index 72407a53..76fc4bcd 100644 --- a/src/ipcpd/ipcp-data.c +++ b/src/ipcpd/ipcp-data.c @@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create() if (data == NULL) return NULL; - data->iname = NULL; data->type = 0; - data->dum = NULL; return data; } struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type) { if (dst == NULL) return NULL; - dst->iname = instance_name_create(); - if (dst->iname == NULL) - return NULL; - - if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - dst->type = ipcp_type; - dst->dum = shm_du_map_open(); - if (dst->dum == NULL) { - instance_name_destroy(dst->iname); - return NULL; - } - /* init the lists */ INIT_LIST_HEAD(&dst->registry); - INIT_LIST_HEAD(&dst->flows); INIT_LIST_HEAD(&dst->directory); /* init the mutexes */ pthread_mutex_init(&dst->reg_lock, NULL); pthread_mutex_init(&dst->dir_lock, NULL); - pthread_mutex_init(&dst->flow_lock, NULL); return dst; } @@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data) dir_entry_destroy(list_entry(h, struct dir_entry, list)); } -static void clear_flows(struct ipcp_data * data) -{ - struct list_head * h; - struct list_head * t; - list_for_each_safe(h, t, &data->flows) - flow_destroy(list_entry(h, flow_t, list)); - -} - void ipcp_data_destroy(struct ipcp_data * data) { if (data == NULL) return; - /* FIXME: finish all pending operations here */ - - if (data->iname != NULL) - instance_name_destroy(data->iname); - data->iname = NULL; - - if (data->dum != NULL) - shm_du_map_close(data->dum); - data->dum = NULL; + /* FIXME: finish all pending operations here and cancel all threads */ pthread_mutex_lock(&data->reg_lock); pthread_mutex_lock(&data->dir_lock); - pthread_mutex_lock(&data->flow_lock); /* clear the lists */ clear_registry(data); clear_directory(data); - clear_flows(data); /* * no need to unlock, just free the entire thing - * pthread_mutex_unlock(&data->flow_lock); * pthread_mutex_unlock(&data->dir_lock); * pthread_mutex_unlock(&data->reg_lock); */ @@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data, return addr; } - -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id) -{ - struct list_head * h; - list_for_each(h, &data->flows) { - flow_t * f = list_entry(h, flow_t, list); - if (f->port_id == port_id) - return f; - } - - return NULL; -} - -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id) -{ - return ipcp_data_find_flow(data, port_id) != NULL; -} - -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow) -{ - if (data == NULL || flow == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - if (ipcp_data_has_flow(data, flow->port_id)) { - pthread_mutex_unlock(&data->flow_lock); - return -2; - } - - list_add(&flow->list,&data->flows); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} - -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id) -{ - flow_t * f; - - if (data == NULL) - return -1; - - pthread_mutex_lock(&data->flow_lock); - - f = ipcp_data_find_flow(data, port_id); - if (f == NULL) - return -1; - - list_del(&f->list); - - free(f); - - pthread_mutex_unlock(&data->flow_lock); - - return 0; -} diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h index 1dea8c3c..2e86ba11 100644 --- a/src/ipcpd/ipcp-data.h +++ b/src/ipcpd/ipcp-data.h @@ -34,17 +34,11 @@ #include "flow.h" struct ipcp_data { - instance_name_t * iname; enum ipcp_type type; - struct shm_du_map * dum; - struct list_head registry; pthread_mutex_t reg_lock; - struct list_head flows; - pthread_mutex_t flow_lock; - struct list_head directory; pthread_mutex_t dir_lock; @@ -53,7 +47,6 @@ struct ipcp_data { struct ipcp_data * ipcp_data_create(); struct ipcp_data * ipcp_data_init(struct ipcp_data * dst, - const char * ipcp_name, enum ipcp_type ipcp_type); void ipcp_data_destroy(struct ipcp_data * data); @@ -73,13 +66,4 @@ bool ipcp_data_is_in_directory(struct ipcp_data * data, const char * ap_name); uint64_t ipcp_data_get_addr(struct ipcp_data * data, const char * ap_name); -bool ipcp_data_has_flow(struct ipcp_data * data, - uint32_t port_id); -flow_t * ipcp_data_find_flow(struct ipcp_data * data, - uint32_t port_id); -int ipcp_data_add_flow(struct ipcp_data * data, - flow_t * flow); -int ipcp_data_del_flow(struct ipcp_data * data, - uint32_t port_id); - #endif /* IPCPD_IPCP_DATA_H */ diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h index 2ccb2e59..91b6cac9 100644 --- a/src/ipcpd/ipcp-ops.h +++ b/src/ipcpd/ipcp-ops.h @@ -39,20 +39,15 @@ struct ipcp_ops { int (* ipcp_name_reg)(char * name); int (* ipcp_name_unreg)(char * name); int (* ipcp_flow_alloc)(uint32_t port_id, + pid_t n_pid, char * dst_ap_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos); int (* ipcp_flow_alloc_resp)(uint32_t port_id, + pid_t n_pid, int response); int (* ipcp_flow_dealloc)(uint32_t port_id); - - /* FIXME: let's see how this will work with the shm_du_map */ - int (* ipcp_du_write)(uint32_t port_id, - size_t map_index); - - int (* ipcp_du_read)(uint32_t port_id, - size_t map_index); }; #endif /* IPCPD_IPCP_OPS_H */ diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index d6f373cd..13632a80 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[]) return 0; } -int ipcp_main_loop(struct ipcp * _ipcp) +void * ipcp_main_loop(void * o) { int lsockfd; int sockfd; uint8_t buf[IPCP_MSG_BUF_SIZE]; + struct ipcp * _ipcp = (struct ipcp *) o; ipcp_msg_t * msg; ssize_t count; @@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp) if (_ipcp == NULL) { LOG_ERR("Invalid ipcp struct."); - return 1; + return (void *) 1; } sockfd = server_socket_open(ipcp_sock_path(getpid())); if (sockfd < 0) { LOG_ERR("Could not open server socket."); - return 1; + return (void *) 1; } while (true) { @@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) conf.max_pdu_size = conf_msg->max_pdu_size; } if (conf_msg->ipcp_type == IPCP_SHIM_UDP) { - conf.ip_addr = conf_msg->ip_addr; + conf.ip_addr = conf_msg->ip_addr; conf.dns_addr = conf_msg->dns_addr; } @@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp) } ret_msg.has_result = true; ret_msg.result = - _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len); + _ipcp->ops->ipcp_unreg(msg->dif_names, + msg->len); break; case IPCP_MSG_CODE__IPCP_NAME_REG: if (_ipcp->ops->ipcp_name_reg == NULL) { @@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp) LOG_ERR("Flow_alloc unsupported."); break; } - ret_msg.has_fd = true; - ret_msg.fd = + ret_msg.has_result = true; + ret_msg.result = _ipcp->ops->ipcp_flow_alloc(msg->port_id, + msg->pid, msg->dst_name, msg->src_ap_name, msg->src_ae_name, @@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp) ret_msg.has_result = true; ret_msg.result = _ipcp->ops->ipcp_flow_alloc_resp(msg->port_id, + msg->pid, msg->result); break; case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC: @@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp) close(lsockfd); } - return 0; + return NULL; } diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h index 9decac8b..393af994 100644 --- a/src/ipcpd/ipcp.h +++ b/src/ipcpd/ipcp.h @@ -43,7 +43,8 @@ struct ipcp { int irmd_fd; }; -int ipcp_main_loop(); +void * ipcp_main_loop(void * o); +void * ipcp_sdu_loop(void * o); int ipcp_arg_check(int argc, char * argv[]); #endif diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 460fe9e3..1f7bb12f 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -24,12 +24,13 @@ #include "ipcp.h" #include "flow.h" #include <ouroboros/shm_du_map.h> +#include <ouroboros/shm_ap_rbuff.h> #include <ouroboros/list.h> #include <ouroboros/utils.h> #include <ouroboros/ipcp.h> #include <ouroboros/dif_config.h> #include <ouroboros/sockets.h> -#include <ouroboros/dev.h> +#include <ouroboros/bitmap.h> #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */ struct ipcp * _ipcp; #endif +/* + * copied from ouroboros/dev. The shim needs access to the internals + * because it doesn't follow all steps necessary steps to get + * the info + */ + +#define UNKNOWN_AP "__UNKNOWN_AP__" +#define UNKNOWN_AE "__UNKNOWN_AE__" + +#define AP_MAX_FLOWS 256 + +#ifndef DU_BUFF_HEADSPACE + #define DU_BUFF_HEADSPACE 128 +#endif + +#ifndef DU_BUFF_TAILSPACE + #define DU_BUFF_TAILSPACE 0 +#endif + +/* the shim needs access to these internals */ +struct shim_ap_data { + instance_name_t * api; + struct shm_du_map * dum; + struct bmp * fds; + + struct shm_ap_rbuff * rb; + struct flow flows[AP_MAX_FLOWS]; + + pthread_t mainloop; + pthread_t sduloop; + pthread_t handler; + pthread_t sdu_reader[2]; + int ping_pong; +} * _ap_instance; + +int shim_ap_init(char * ap_name) +{ + _ap_instance = malloc(sizeof(struct shim_ap_data)); + if (_ap_instance == NULL) { + return -1; + } + + _ap_instance->api = instance_name_create(); + if (_ap_instance->api == NULL) { + free(_ap_instance); + return -1; + } + + if (instance_name_init_from(_ap_instance->api, + ap_name, + getpid()) == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0); + if (_ap_instance->fds == NULL) { + instance_name_destroy(_ap_instance->api); + free(_ap_instance); + return -1; + } + + _ap_instance->dum = shm_du_map_open(); + if (_ap_instance->dum == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + _ap_instance->rb = shm_ap_rbuff_create(); + if (_ap_instance->rb == NULL) { + instance_name_destroy(_ap_instance->api); + bmp_destroy(_ap_instance->fds); + free(_ap_instance); + return -1; + } + + return 0; +} + +void shim_ap_fini() +{ + int i = 0; + + if (_ap_instance == NULL) + return; + if (_ap_instance->api != NULL) + instance_name_destroy(_ap_instance->api); + if (_ap_instance->fds != NULL) + bmp_destroy(_ap_instance->fds); + if (_ap_instance->dum != NULL) + shm_du_map_close(_ap_instance->dum); + if (_ap_instance->rb != NULL) + shm_ap_rbuff_destroy(_ap_instance->rb); + for (i = 0; i < AP_MAX_FLOWS; i ++) + if (_ap_instance->flows[i].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[i].rb); + + free(_ap_instance); +} + +static int port_id_to_fd(uint32_t port_id) +{ + int i; + for (i = 0; i < AP_MAX_FLOWS; ++i) + if (_ap_instance->flows[i].port_id == port_id + && _ap_instance->flows[i].state != FLOW_NULL) + return i; + return -1; +} + +static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count) +{ + /* the AP chooses the amount of headspace and tailspace */ + size_t index = shm_create_du_buff(_ap_instance->dum, + count, + 0, + buf, + count); + struct rb_entry e = {index, _ap_instance->flows[fd].port_id}; + + if (index == -1) + return -1; + + if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) { + shm_release_du_buff(_ap_instance->dum, index); + return -EPIPE; + } + + return 0; +} + +/* + * end copy from dev.c + */ + struct ipcp_udp_data { /* keep ipcp_data first for polymorphism */ struct ipcp_data ipcp_data; @@ -79,39 +218,15 @@ struct ipcp_udp_data { int s_fd; fd_set flow_fd_s; - flow_t * fd_to_flow_ptr[FD_SETSIZE]; - pthread_mutex_t lock; + pthread_mutex_t lock; }; -struct udp_flow { - /* keep flow first for polymorphism */ - flow_t flow; - int fd; -}; - -void ipcp_sig_handler(int sig, siginfo_t * info, void * c) -{ - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGHUP: - LOG_DBG("Terminating by order of %d. Bye.", info->si_pid); - if (info->si_pid == irmd_pid) { - /* shm_du_map_close(_ipcp->data->dum); */ - exit(0); - } - default: - return; - } -} - -struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) +struct ipcp_udp_data * ipcp_udp_data_create() { struct ipcp_udp_data * udp_data; struct ipcp_data * data; enum ipcp_type ipcp_type; - int n; udp_data = malloc(sizeof *udp_data); if (udp_data == NULL) { @@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name) ipcp_type = THIS_TYPE; data = (struct ipcp_data *) udp_data; - if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) { + if (ipcp_data_init(data, ipcp_type) == NULL) { free(udp_data); return NULL; } FD_ZERO(&udp_data->flow_fd_s); - for (n = 0; n < FD_SETSIZE; ++n) - udp_data->fd_to_flow_ptr[n] = NULL; return udp_data; } +void ipcp_udp_data_destroy(struct ipcp_udp_data * data) +{ + if (data == NULL) + return; + + ipcp_data_destroy((struct ipcp_data *) data); +} + +void ipcp_udp_destroy(struct ipcp * ipcp) +{ + ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data); + shim_ap_fini(); + free(ipcp); +} + +void ipcp_sig_handler(int sig, siginfo_t * info, void * c) +{ + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGHUP: + if (info->si_pid == irmd_pid || info->si_pid == 0) { + LOG_DBG("Terminating by order of %d. Bye.", + info->si_pid); + pthread_cancel(_ap_instance->mainloop); + pthread_cancel(_ap_instance->handler); + pthread_cancel(_ap_instance->sdu_reader[0]); + pthread_cancel(_ap_instance->sdu_reader[1]); + pthread_cancel(_ap_instance->sduloop); + exit(0); + } + default: + return; + } +} + static void * ipcp_udp_listener() { char buf[SHIM_UDP_BUF_SIZE]; @@ -141,10 +290,10 @@ static void * ipcp_udp_listener() struct sockaddr_in f_saddr; struct sockaddr_in c_saddr; struct hostent * hostp; - struct udp_flow * flow; int sfd = shim_data(_ipcp)->s_fd; while (true) { + int fd; n = sizeof c_saddr; n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0, (struct sockaddr *) &c_saddr, (unsigned *) &n); @@ -157,16 +306,7 @@ static void * ipcp_udp_listener() if (hostp == NULL) continue; - /* create a new socket for the server */ - flow = malloc(sizeof *flow); - if (flow == NULL) - continue; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - continue; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); memset((char *) &f_saddr, 0, sizeof f_saddr); f_saddr.sin_family = AF_INET; @@ -185,36 +325,33 @@ static void * ipcp_udp_listener() * the flow structure */ - if (connect(flow->fd, + if (connect(fd, (struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) { - close(flow->fd); - free(flow); + close(fd); continue; } + /* echo back the packet */ + while(send(fd, buf, strlen(buf), 0) < 0) + ; + /* reply to IRM */ - flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf, - UNKNOWN_AP, ""); - if (flow->flow.port_id < 0) { + _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(), + buf, + UNKNOWN_AP, + UNKNOWN_AE); + if (_ap_instance->flows[fd].port_id < 0) { LOG_ERR("Could not get port id from IRMd"); - close(flow->fd); - free(flow); + close(fd); continue; } - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - close(flow->fd); - free(flow); - continue; - } + _ap_instance->flows[fd].rb = NULL; + _ap_instance->flows[fd].state = FLOW_PENDING; - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.", + _ap_instance->flows[fd].port_id, fd); } return 0; @@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader() struct sockaddr_in r_saddr; while (true) { - flow_t * flow; - if (select(FD_SETSIZE, &shim_data(_ipcp)->flow_fd_s, NULL, NULL, NULL) @@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader() (struct sockaddr *) &r_saddr, (unsigned *) &n); - flow = shim_data(_ipcp)->fd_to_flow_ptr[fd]; - if (flow->state == FLOW_PENDING) { - if (connect(fd, - (struct sockaddr *) &r_saddr, - sizeof r_saddr) - < 0) - continue; - flow->state = FLOW_ALLOCATED; - } - /* send the sdu to the correct port_id */ - LOG_MISSING; + ipcp_udp_flow_write(fd, buf, n); } } @@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf) { char ipstr[INET_ADDRSTRLEN]; char dnsstr[INET_ADDRSTRLEN]; - pthread_t handler; - pthread_t sdu_reader; int enable = 1; if (conf->type != THIS_TYPE) { @@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) dnsstr, INET_ADDRSTRLEN); else - strcpy(dnsstr, "not set.\n"); + strcpy(dnsstr, "not set"); shim_data(_ipcp)->ip_addr = conf->ip_addr; shim_data(_ipcp)->dns_addr = conf->dns_addr; @@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf) /* UDP listen server */ if ((shim_data(_ipcp)->s_fd = - socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { + socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { LOG_DBGF("Can't create socket."); return -1; } @@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf) return -1; } - pthread_create(&handler, NULL, ipcp_udp_listener, NULL); - pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL); + FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_create(&_ap_instance->handler, + NULL, + ipcp_udp_listener, + NULL); + pthread_create(&_ap_instance->sdu_reader[0], + NULL, + ipcp_udp_sdu_reader, + NULL); + + pthread_create(&_ap_instance->sdu_reader[1], + NULL, + ipcp_udp_sdu_reader, + NULL); + + _ap_instance->ping_pong = 0; _ipcp->state = IPCP_ENROLLED; - LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.", - _ipcp->data->iname->name, _ipcp->data->iname->id); + LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.", + getpid()); LOG_DBG("Bound to IP address %s.", ipstr); LOG_DBG("DNS server address is %s.", dnsstr); @@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name) } int ipcp_udp_flow_alloc(uint32_t port_id, + pid_t n_pid, char * dst_name, char * src_ap_name, char * src_ae_name, struct qos_spec * qos) { - struct udp_flow * flow = NULL; struct sockaddr_in l_saddr; struct sockaddr_in r_saddr; + struct sockaddr_in rf_saddr; + int fd; + int n; + + char * recv_buf = NULL; struct hostent * h; if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL) return -1; - LOG_DBG("Received flow allocation request from %s to %s.", - src_ap_name, dst_name); - if (strlen(dst_name) > 255 || strlen(src_ap_name) > 255 || strlen(src_ae_name) > 255) { @@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t port_id, if (qos != NULL) LOG_DBGF("QoS requested. UDP/IP can't do that."); - flow = malloc(sizeof *flow); - if (flow == NULL) - return -1; - - flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (flow->fd == -1) { - free(flow); - return -1; - } + fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); /* this socket is for the flow */ memset((char *) &l_saddr, 0, sizeof l_saddr); @@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t port_id, l_saddr.sin_addr.s_addr = local_ip; l_saddr.sin_port = 0; - if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { - char ipstr[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, - &l_saddr.sin_addr.s_addr, - ipstr, - INET_ADDRSTRLEN); - close(flow->fd); - free(flow); + if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) { + close(fd); return -1; } h = gethostbyname(dst_name); if (h == NULL) { LOG_DBGF("Could not resolve %s.", dst_name); - close(flow->fd); - free(flow); + close(fd); return -1; } - memset((char *) &r_saddr, 0, sizeof r_saddr); r_saddr.sin_family = AF_INET; - r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]); + r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0])); r_saddr.sin_port = LISTEN_PORT; + /* at least try to get the packet on the wire */ - while (sendto(flow->fd, dst_name, strlen(dst_name), 0, + while (sendto(fd, dst_name, strlen(dst_name), 0, (struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) { } - flow->flow.port_id = port_id; - flow->flow.oflags = FLOW_O_DEFAULT; - flow->flow.state = FLOW_PENDING; - - /* add flow to the list */ + /* wait for the other shim IPCP to respond */ - pthread_mutex_lock(&_ipcp->data->flow_lock); + recv_buf = malloc(strlen(dst_name) + 1); + n = sizeof(rf_saddr); + n = recvfrom(fd, + recv_buf, + strlen(dst_name) + 1, + 0, + (struct sockaddr *) &rf_saddr, + (unsigned *) &n); - if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) { - LOG_DBGF("Could not add flow."); - pthread_mutex_unlock(&_ipcp->data->flow_lock); - close(flow->fd); - free(flow); + if (connect(fd, + (struct sockaddr *) &rf_saddr, + sizeof rf_saddr) + < 0) { + free(recv_buf); return -1; } - pthread_mutex_unlock(&_ipcp->data->flow_lock); + if (!strcmp(recv_buf, dst_name)) + LOG_WARN("Incorrect echo from server"); + + free(recv_buf); + + _ap_instance->flows[fd].port_id = port_id; + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + close(fd); + } /* tell IRMd that flow allocation "worked" */ - if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) { + if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) { LOG_ERR("Failed to notify IRMd about flow allocation reply"); - close(flow->fd); - ipcp_data_del_flow(_ipcp->data, flow->flow.port_id); + close(fd); + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); return -1; } - FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s); - shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); - return 0; + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd); + + return fd; } int ipcp_udp_flow_alloc_resp(uint32_t port_id, + pid_t n_pid, int response) { - struct udp_flow * flow = - (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id); - if (flow == NULL) { - return -1; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; } - if (response) { - ipcp_data_del_flow(_ipcp->data, port_id); + if (response) return 0; - } /* awaken pending flow */ - if (flow->flow.state != FLOW_PENDING) + if (_ap_instance->flows[fd].state != FLOW_PENDING) { + LOG_DBGF("Flow was not pending."); return -1; + } + + _ap_instance->flows[fd].state = FLOW_ALLOCATED; + _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid); + if (_ap_instance->flows[fd].rb == NULL) { + LOG_ERR("Could not open N + 1 ringbuffer."); + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + return 0; + } - flow->flow.state = FLOW_ALLOCATED; + FD_SET(fd, &shim_data(_ipcp)->flow_fd_s); + + pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]); + pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong], + NULL, + ipcp_udp_sdu_reader, + NULL); + _ap_instance->ping_pong = !_ap_instance->ping_pong; + + LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd); return 0; } int ipcp_udp_flow_dealloc(uint32_t port_id) { - return 0; -} + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } -int ipcp_udp_du_write(uint32_t port_id, - size_t map_index) -{ + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); return 0; } -int ipcp_udp_du_read(uint32_t port_id, - size_t map_index) +/* FIXME: may be crap, didn't think this one through */ +int ipcp_udp_flow_dealloc_arr(uint32_t port_id) { - return 0; + int fd = port_id_to_fd(port_id); + if (fd < 0) { + LOG_DBGF("Could not find flow with port_id %u.", port_id); + return 0; + } + + _ap_instance->flows[fd].state = FLOW_NULL; + _ap_instance->flows[fd].port_id = 0; + if (_ap_instance->flows[fd].rb != NULL) + shm_ap_rbuff_close(_ap_instance->flows[fd].rb); + + FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s); + + return ipcp_flow_dealloc(0, port_id); } struct ipcp * ipcp_udp_create(char * ap_name) @@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name) struct ipcp_udp_data * data; struct ipcp_ops * ops; + if (shim_ap_init(ap_name) < 0) + return NULL; + i = malloc(sizeof *i); if (i == NULL) return NULL; - data = ipcp_udp_data_create(ap_name); + data = ipcp_udp_data_create(); if (data == NULL) { free(i); return NULL; @@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name) ops->ipcp_flow_alloc = ipcp_udp_flow_alloc; ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp; ops->ipcp_flow_dealloc = ipcp_udp_flow_dealloc; - ops->ipcp_du_read = ipcp_udp_du_read; - ops->ipcp_du_write = ipcp_udp_du_write; i->data = (struct ipcp_data *) data; i->ops = ops; @@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name) #ifndef MAKE_CHECK +/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */ +/* FIXME: stop eating the CPU */ +void * ipcp_udp_sdu_loop(void * o) +{ + while (true) { + struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb); + int fd; + int len = 0; + char * buf; + if (e == NULL) + continue; + + len = shm_du_map_read_sdu((uint8_t **) &buf, + _ap_instance->dum, + e->index); + if (len == -1) + continue; + + fd = port_id_to_fd(e->port_id); + + if (fd == -1) + continue; + + if (len == 0) + continue; + + send(fd, buf, len, 0); + + shm_release_du_buff(_ap_instance->dum, e->index); + } + + return (void *) 1; +} + int main (int argc, char * argv[]) { /* argument 1: pid of irmd ? */ @@ -680,6 +900,7 @@ int main (int argc, char * argv[]) sigaction(SIGINT, &sig_act, NULL); sigaction(SIGTERM, &sig_act, NULL); sigaction(SIGHUP, &sig_act, NULL); + sigaction(SIGPIPE, &sig_act, NULL); _ipcp = ipcp_udp_create(argv[2]); if (_ipcp == NULL) { @@ -687,7 +908,18 @@ int main (int argc, char * argv[]) exit(1); } - ipcp_main_loop(_ipcp); + pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp); + pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL); + + pthread_join(_ap_instance->sduloop, NULL); + pthread_join(_ap_instance->mainloop, NULL); + pthread_join(_ap_instance->handler, NULL); + pthread_join(_ap_instance->sdu_reader[0], NULL); + pthread_join(_ap_instance->sdu_reader[1], NULL); + + ipcp_udp_destroy(_ipcp); + + shim_ap_fini(); exit(0); } diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c index 036f5877..e5e8b32d 100644 --- a/src/ipcpd/shim-udp/tests/shim_udp_test.c +++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c @@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv) _ipcp = ipcp_udp_create(ipcp_name); if (_ipcp == NULL) { LOG_ERR("Could not instantiate shim IPCP."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv) if (ipcp_udp_name_reg("bogus name")) { LOG_ERR("Failed to register application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } if (ipcp_udp_name_unreg("bogus name")) { LOG_ERR("Failed to unregister application."); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } @@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if (ipcp_udp_name_reg(bogus)) { LOG_ERR("Failed to register application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } @@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv) sprintf(bogus, "bogus name %4d", i); if(ipcp_udp_name_unreg(bogus)) { LOG_ERR("Failed to unregister application %s.", bogus); - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(1); } } - shm_du_map_close(dum); + shm_du_map_destroy(dum); exit(0); } |