summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-05-08 16:34:19 +0200
committerSander Vrijders <[email protected]>2016-05-08 16:34:19 +0200
commit5812dfb832e513dc455a0d48624bcad62334d457 (patch)
tree93a02e1b20f54bb869eadc856f201412c633315c /src/ipcpd
parentde8f2015cbd015b1cced366cb12c054be62c23b1 (diff)
parent021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff)
downloadouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz
ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/flow.c38
-rw-r--r--src/ipcpd/flow.h12
-rw-r--r--src/ipcpd/ipcp-data.c104
-rw-r--r--src/ipcpd/ipcp-data.h16
-rw-r--r--src/ipcpd/ipcp-ops.h9
-rw-r--r--src/ipcpd/ipcp.c20
-rw-r--r--src/ipcpd/ipcp.h3
-rw-r--r--src/ipcpd/shim-udp/main.c524
-rw-r--r--src/ipcpd/shim-udp/tests/shim_udp_test.c12
9 files changed, 408 insertions, 330 deletions
diff --git a/src/ipcpd/flow.c b/src/ipcpd/flow.c
index c436733b..ae8f848c 100644
--- a/src/ipcpd/flow.c
+++ b/src/ipcpd/flow.c
@@ -27,7 +27,7 @@
#include <ouroboros/logs.h>
-flow_t * flow_create(int32_t port_id)
+flow_t * flow_create(uint32_t port_id)
{
flow_t * flow = malloc(sizeof *flow);
if (flow == NULL) {
@@ -38,8 +38,7 @@ flow_t * flow_create(int32_t port_id)
INIT_LIST_HEAD(&flow->list);
flow->port_id = port_id;
- flow->oflags = FLOW_O_DEFAULT;
- flow->state = FLOW_NULL;
+ flow->state = FLOW_NULL;
pthread_mutex_init(&flow->lock, NULL);
@@ -52,36 +51,3 @@ void flow_destroy(flow_t * flow)
return;
free(flow);
}
-
-int flow_set_opts(flow_t * flow, uint16_t opts)
-{
- if (flow == NULL) {
- LOG_DBGF("Non-existing flow.");
- return -1;
- }
-
- pthread_mutex_lock(&flow->lock);
-
- if ((opts & FLOW_O_ACCMODE) == FLOW_O_ACCMODE) {
- flow->oflags = FLOW_O_DEFAULT;
- pthread_mutex_unlock(&flow->lock);
- LOG_WARN("Invalid flow options. Setting default.");
- return -1;
- }
-
- flow->oflags = opts;
-
- pthread_mutex_unlock(&flow->lock);
-
- return 0;
-}
-
-uint16_t flow_get_opts(const flow_t * flow)
-{
- if (flow == NULL) {
- LOG_DBGF("Non-existing flow.");
- return FLOW_O_INVALID;
- }
-
- return flow->oflags;
-}
diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h
index 000de5ad..0a3e90d1 100644
--- a/src/ipcpd/flow.h
+++ b/src/ipcpd/flow.h
@@ -25,6 +25,7 @@
#include <ouroboros/common.h>
#include <ouroboros/list.h>
+#include <ouroboros/shm_ap_rbuff.h>
#include <pthread.h>
/* same values as fcntl.h */
@@ -47,17 +48,14 @@ enum flow_state {
typedef struct flow {
struct list_head list;
- int32_t port_id;
- uint16_t oflags;
- enum flow_state state;
+ uint32_t port_id;
+ struct shm_ap_rbuff * rb;
+ enum flow_state state;
pthread_mutex_t lock;
} flow_t;
-flow_t * flow_create(int32_t port_id);
+flow_t * flow_create(uint32_t port_id);
void flow_destroy(flow_t * flow);
-int flow_set_opts(flow_t * flow, uint16_t opts);
-uint16_t flow_get_opts(const flow_t * flow);
-
#endif /* OUROBOROS_FLOW_H */
diff --git a/src/ipcpd/ipcp-data.c b/src/ipcpd/ipcp-data.c
index 72407a53..76fc4bcd 100644
--- a/src/ipcpd/ipcp-data.c
+++ b/src/ipcpd/ipcp-data.c
@@ -96,46 +96,26 @@ struct ipcp_data * ipcp_data_create()
if (data == NULL)
return NULL;
- data->iname = NULL;
data->type = 0;
- data->dum = NULL;
return data;
}
struct ipcp_data * ipcp_data_init(struct ipcp_data * dst,
- const char * ipcp_name,
enum ipcp_type ipcp_type)
{
if (dst == NULL)
return NULL;
- dst->iname = instance_name_create();
- if (dst->iname == NULL)
- return NULL;
-
- if(instance_name_init_from(dst->iname, ipcp_name, getpid()) == NULL) {
- instance_name_destroy(dst->iname);
- return NULL;
- }
-
dst->type = ipcp_type;
- dst->dum = shm_du_map_open();
- if (dst->dum == NULL) {
- instance_name_destroy(dst->iname);
- return NULL;
- }
-
/* init the lists */
INIT_LIST_HEAD(&dst->registry);
- INIT_LIST_HEAD(&dst->flows);
INIT_LIST_HEAD(&dst->directory);
/* init the mutexes */
pthread_mutex_init(&dst->reg_lock, NULL);
pthread_mutex_init(&dst->dir_lock, NULL);
- pthread_mutex_init(&dst->flow_lock, NULL);
return dst;
}
@@ -156,42 +136,22 @@ static void clear_directory(struct ipcp_data * data)
dir_entry_destroy(list_entry(h, struct dir_entry, list));
}
-static void clear_flows(struct ipcp_data * data)
-{
- struct list_head * h;
- struct list_head * t;
- list_for_each_safe(h, t, &data->flows)
- flow_destroy(list_entry(h, flow_t, list));
-
-}
-
void ipcp_data_destroy(struct ipcp_data * data)
{
if (data == NULL)
return;
- /* FIXME: finish all pending operations here */
-
- if (data->iname != NULL)
- instance_name_destroy(data->iname);
- data->iname = NULL;
-
- if (data->dum != NULL)
- shm_du_map_close(data->dum);
- data->dum = NULL;
+ /* FIXME: finish all pending operations here and cancel all threads */
pthread_mutex_lock(&data->reg_lock);
pthread_mutex_lock(&data->dir_lock);
- pthread_mutex_lock(&data->flow_lock);
/* clear the lists */
clear_registry(data);
clear_directory(data);
- clear_flows(data);
/*
* no need to unlock, just free the entire thing
- * pthread_mutex_unlock(&data->flow_lock);
* pthread_mutex_unlock(&data->dir_lock);
* pthread_mutex_unlock(&data->reg_lock);
*/
@@ -380,65 +340,3 @@ uint64_t ipcp_data_get_addr(struct ipcp_data * data,
return addr;
}
-
-flow_t * ipcp_data_find_flow(struct ipcp_data * data,
- uint32_t port_id)
-{
- struct list_head * h;
- list_for_each(h, &data->flows) {
- flow_t * f = list_entry(h, flow_t, list);
- if (f->port_id == port_id)
- return f;
- }
-
- return NULL;
-}
-
-bool ipcp_data_has_flow(struct ipcp_data * data,
- uint32_t port_id)
-{
- return ipcp_data_find_flow(data, port_id) != NULL;
-}
-
-int ipcp_data_add_flow(struct ipcp_data * data,
- flow_t * flow)
-{
- if (data == NULL || flow == NULL)
- return -1;
-
- pthread_mutex_lock(&data->flow_lock);
-
- if (ipcp_data_has_flow(data, flow->port_id)) {
- pthread_mutex_unlock(&data->flow_lock);
- return -2;
- }
-
- list_add(&flow->list,&data->flows);
-
- pthread_mutex_unlock(&data->flow_lock);
-
- return 0;
-}
-
-int ipcp_data_del_flow(struct ipcp_data * data,
- uint32_t port_id)
-{
- flow_t * f;
-
- if (data == NULL)
- return -1;
-
- pthread_mutex_lock(&data->flow_lock);
-
- f = ipcp_data_find_flow(data, port_id);
- if (f == NULL)
- return -1;
-
- list_del(&f->list);
-
- free(f);
-
- pthread_mutex_unlock(&data->flow_lock);
-
- return 0;
-}
diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h
index 1dea8c3c..2e86ba11 100644
--- a/src/ipcpd/ipcp-data.h
+++ b/src/ipcpd/ipcp-data.h
@@ -34,17 +34,11 @@
#include "flow.h"
struct ipcp_data {
- instance_name_t * iname;
enum ipcp_type type;
- struct shm_du_map * dum;
-
struct list_head registry;
pthread_mutex_t reg_lock;
- struct list_head flows;
- pthread_mutex_t flow_lock;
-
struct list_head directory;
pthread_mutex_t dir_lock;
@@ -53,7 +47,6 @@ struct ipcp_data {
struct ipcp_data * ipcp_data_create();
struct ipcp_data * ipcp_data_init(struct ipcp_data * dst,
- const char * ipcp_name,
enum ipcp_type ipcp_type);
void ipcp_data_destroy(struct ipcp_data * data);
@@ -73,13 +66,4 @@ bool ipcp_data_is_in_directory(struct ipcp_data * data,
const char * ap_name);
uint64_t ipcp_data_get_addr(struct ipcp_data * data,
const char * ap_name);
-bool ipcp_data_has_flow(struct ipcp_data * data,
- uint32_t port_id);
-flow_t * ipcp_data_find_flow(struct ipcp_data * data,
- uint32_t port_id);
-int ipcp_data_add_flow(struct ipcp_data * data,
- flow_t * flow);
-int ipcp_data_del_flow(struct ipcp_data * data,
- uint32_t port_id);
-
#endif /* IPCPD_IPCP_DATA_H */
diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h
index 2ccb2e59..91b6cac9 100644
--- a/src/ipcpd/ipcp-ops.h
+++ b/src/ipcpd/ipcp-ops.h
@@ -39,20 +39,15 @@ struct ipcp_ops {
int (* ipcp_name_reg)(char * name);
int (* ipcp_name_unreg)(char * name);
int (* ipcp_flow_alloc)(uint32_t port_id,
+ pid_t n_pid,
char * dst_ap_name,
char * src_ap_name,
char * src_ae_name,
struct qos_spec * qos);
int (* ipcp_flow_alloc_resp)(uint32_t port_id,
+ pid_t n_pid,
int response);
int (* ipcp_flow_dealloc)(uint32_t port_id);
-
- /* FIXME: let's see how this will work with the shm_du_map */
- int (* ipcp_du_write)(uint32_t port_id,
- size_t map_index);
-
- int (* ipcp_du_read)(uint32_t port_id,
- size_t map_index);
};
#endif /* IPCPD_IPCP_OPS_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index d6f373cd..13632a80 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -45,11 +45,12 @@ int ipcp_arg_check(int argc, char * argv[])
return 0;
}
-int ipcp_main_loop(struct ipcp * _ipcp)
+void * ipcp_main_loop(void * o)
{
int lsockfd;
int sockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
+ struct ipcp * _ipcp = (struct ipcp *) o;
ipcp_msg_t * msg;
ssize_t count;
@@ -61,13 +62,13 @@ int ipcp_main_loop(struct ipcp * _ipcp)
if (_ipcp == NULL) {
LOG_ERR("Invalid ipcp struct.");
- return 1;
+ return (void *) 1;
}
sockfd = server_socket_open(ipcp_sock_path(getpid()));
if (sockfd < 0) {
LOG_ERR("Could not open server socket.");
- return 1;
+ return (void *) 1;
}
while (true) {
@@ -113,7 +114,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)
conf.max_pdu_size = conf_msg->max_pdu_size;
}
if (conf_msg->ipcp_type == IPCP_SHIM_UDP) {
- conf.ip_addr = conf_msg->ip_addr;
+ conf.ip_addr = conf_msg->ip_addr;
conf.dns_addr = conf_msg->dns_addr;
}
@@ -149,7 +150,8 @@ int ipcp_main_loop(struct ipcp * _ipcp)
}
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_unreg(msg->dif_names, msg->len);
+ _ipcp->ops->ipcp_unreg(msg->dif_names,
+ msg->len);
break;
case IPCP_MSG_CODE__IPCP_NAME_REG:
if (_ipcp->ops->ipcp_name_reg == NULL) {
@@ -172,9 +174,10 @@ int ipcp_main_loop(struct ipcp * _ipcp)
LOG_ERR("Flow_alloc unsupported.");
break;
}
- ret_msg.has_fd = true;
- ret_msg.fd =
+ ret_msg.has_result = true;
+ ret_msg.result =
_ipcp->ops->ipcp_flow_alloc(msg->port_id,
+ msg->pid,
msg->dst_name,
msg->src_ap_name,
msg->src_ae_name,
@@ -188,6 +191,7 @@ int ipcp_main_loop(struct ipcp * _ipcp)
ret_msg.has_result = true;
ret_msg.result =
_ipcp->ops->ipcp_flow_alloc_resp(msg->port_id,
+ msg->pid,
msg->result);
break;
case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC:
@@ -231,5 +235,5 @@ int ipcp_main_loop(struct ipcp * _ipcp)
close(lsockfd);
}
- return 0;
+ return NULL;
}
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 9decac8b..393af994 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -43,7 +43,8 @@ struct ipcp {
int irmd_fd;
};
-int ipcp_main_loop();
+void * ipcp_main_loop(void * o);
+void * ipcp_sdu_loop(void * o);
int ipcp_arg_check(int argc, char * argv[]);
#endif
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index 460fe9e3..1f7bb12f 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -24,12 +24,13 @@
#include "ipcp.h"
#include "flow.h"
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/ipcp.h>
#include <ouroboros/dif_config.h>
#include <ouroboros/sockets.h>
-#include <ouroboros/dev.h>
+#include <ouroboros/bitmap.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -67,6 +68,144 @@ extern struct ipcp * _ipcp; /* defined in test */
struct ipcp * _ipcp;
#endif
+/*
+ * copied from ouroboros/dev. The shim needs access to the internals
+ * because it doesn't follow all steps necessary steps to get
+ * the info
+ */
+
+#define UNKNOWN_AP "__UNKNOWN_AP__"
+#define UNKNOWN_AE "__UNKNOWN_AE__"
+
+#define AP_MAX_FLOWS 256
+
+#ifndef DU_BUFF_HEADSPACE
+ #define DU_BUFF_HEADSPACE 128
+#endif
+
+#ifndef DU_BUFF_TAILSPACE
+ #define DU_BUFF_TAILSPACE 0
+#endif
+
+/* the shim needs access to these internals */
+struct shim_ap_data {
+ instance_name_t * api;
+ struct shm_du_map * dum;
+ struct bmp * fds;
+
+ struct shm_ap_rbuff * rb;
+ struct flow flows[AP_MAX_FLOWS];
+
+ pthread_t mainloop;
+ pthread_t sduloop;
+ pthread_t handler;
+ pthread_t sdu_reader[2];
+ int ping_pong;
+} * _ap_instance;
+
+int shim_ap_init(char * ap_name)
+{
+ _ap_instance = malloc(sizeof(struct shim_ap_data));
+ if (_ap_instance == NULL) {
+ return -1;
+ }
+
+ _ap_instance->api = instance_name_create();
+ if (_ap_instance->api == NULL) {
+ free(_ap_instance);
+ return -1;
+ }
+
+ if (instance_name_init_from(_ap_instance->api,
+ ap_name,
+ getpid()) == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (_ap_instance->fds == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->dum = shm_du_map_open();
+ if (_ap_instance->dum == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ _ap_instance->rb = shm_ap_rbuff_create();
+ if (_ap_instance->rb == NULL) {
+ instance_name_destroy(_ap_instance->api);
+ bmp_destroy(_ap_instance->fds);
+ free(_ap_instance);
+ return -1;
+ }
+
+ return 0;
+}
+
+void shim_ap_fini()
+{
+ int i = 0;
+
+ if (_ap_instance == NULL)
+ return;
+ if (_ap_instance->api != NULL)
+ instance_name_destroy(_ap_instance->api);
+ if (_ap_instance->fds != NULL)
+ bmp_destroy(_ap_instance->fds);
+ if (_ap_instance->dum != NULL)
+ shm_du_map_close(_ap_instance->dum);
+ if (_ap_instance->rb != NULL)
+ shm_ap_rbuff_destroy(_ap_instance->rb);
+ for (i = 0; i < AP_MAX_FLOWS; i ++)
+ if (_ap_instance->flows[i].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+
+ free(_ap_instance);
+}
+
+static int port_id_to_fd(uint32_t port_id)
+{
+ int i;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (_ap_instance->flows[i].port_id == port_id
+ && _ap_instance->flows[i].state != FLOW_NULL)
+ return i;
+ return -1;
+}
+
+static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
+{
+ /* the AP chooses the amount of headspace and tailspace */
+ size_t index = shm_create_du_buff(_ap_instance->dum,
+ count,
+ 0,
+ buf,
+ count);
+ struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
+
+ if (index == -1)
+ return -1;
+
+ if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
+ shm_release_du_buff(_ap_instance->dum, index);
+ return -EPIPE;
+ }
+
+ return 0;
+}
+
+/*
+ * end copy from dev.c
+ */
+
struct ipcp_udp_data {
/* keep ipcp_data first for polymorphism */
struct ipcp_data ipcp_data;
@@ -79,39 +218,15 @@ struct ipcp_udp_data {
int s_fd;
fd_set flow_fd_s;
- flow_t * fd_to_flow_ptr[FD_SETSIZE];
- pthread_mutex_t lock;
+ pthread_mutex_t lock;
};
-struct udp_flow {
- /* keep flow first for polymorphism */
- flow_t flow;
- int fd;
-};
-
-void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
-{
- switch(sig) {
- case SIGINT:
- case SIGTERM:
- case SIGHUP:
- LOG_DBG("Terminating by order of %d. Bye.", info->si_pid);
- if (info->si_pid == irmd_pid) {
- /* shm_du_map_close(_ipcp->data->dum); */
- exit(0);
- }
- default:
- return;
- }
-}
-
-struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name)
+struct ipcp_udp_data * ipcp_udp_data_create()
{
struct ipcp_udp_data * udp_data;
struct ipcp_data * data;
enum ipcp_type ipcp_type;
- int n;
udp_data = malloc(sizeof *udp_data);
if (udp_data == NULL) {
@@ -121,18 +236,52 @@ struct ipcp_udp_data * ipcp_udp_data_create(char * ap_name)
ipcp_type = THIS_TYPE;
data = (struct ipcp_data *) udp_data;
- if (ipcp_data_init(data, ap_name, ipcp_type) == NULL) {
+ if (ipcp_data_init(data, ipcp_type) == NULL) {
free(udp_data);
return NULL;
}
FD_ZERO(&udp_data->flow_fd_s);
- for (n = 0; n < FD_SETSIZE; ++n)
- udp_data->fd_to_flow_ptr[n] = NULL;
return udp_data;
}
+void ipcp_udp_data_destroy(struct ipcp_udp_data * data)
+{
+ if (data == NULL)
+ return;
+
+ ipcp_data_destroy((struct ipcp_data *) data);
+}
+
+void ipcp_udp_destroy(struct ipcp * ipcp)
+{
+ ipcp_udp_data_destroy((struct ipcp_udp_data *) ipcp->data);
+ shim_ap_fini();
+ free(ipcp);
+}
+
+void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
+{
+ switch(sig) {
+ case SIGINT:
+ case SIGTERM:
+ case SIGHUP:
+ if (info->si_pid == irmd_pid || info->si_pid == 0) {
+ LOG_DBG("Terminating by order of %d. Bye.",
+ info->si_pid);
+ pthread_cancel(_ap_instance->mainloop);
+ pthread_cancel(_ap_instance->handler);
+ pthread_cancel(_ap_instance->sdu_reader[0]);
+ pthread_cancel(_ap_instance->sdu_reader[1]);
+ pthread_cancel(_ap_instance->sduloop);
+ exit(0);
+ }
+ default:
+ return;
+ }
+}
+
static void * ipcp_udp_listener()
{
char buf[SHIM_UDP_BUF_SIZE];
@@ -141,10 +290,10 @@ static void * ipcp_udp_listener()
struct sockaddr_in f_saddr;
struct sockaddr_in c_saddr;
struct hostent * hostp;
- struct udp_flow * flow;
int sfd = shim_data(_ipcp)->s_fd;
while (true) {
+ int fd;
n = sizeof c_saddr;
n = recvfrom(sfd, buf, SHIM_UDP_BUF_SIZE, 0,
(struct sockaddr *) &c_saddr, (unsigned *) &n);
@@ -157,16 +306,7 @@ static void * ipcp_udp_listener()
if (hostp == NULL)
continue;
- /* create a new socket for the server */
- flow = malloc(sizeof *flow);
- if (flow == NULL)
- continue;
-
- flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if (flow->fd == -1) {
- free(flow);
- continue;
- }
+ fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
memset((char *) &f_saddr, 0, sizeof f_saddr);
f_saddr.sin_family = AF_INET;
@@ -185,36 +325,33 @@ static void * ipcp_udp_listener()
* the flow structure
*/
- if (connect(flow->fd,
+ if (connect(fd,
(struct sockaddr *) &c_saddr, sizeof c_saddr) < 0) {
- close(flow->fd);
- free(flow);
+ close(fd);
continue;
}
+ /* echo back the packet */
+ while(send(fd, buf, strlen(buf), 0) < 0)
+ ;
+
/* reply to IRM */
- flow->flow.port_id = ipcp_flow_req_arr(getpid(), buf,
- UNKNOWN_AP, "");
- if (flow->flow.port_id < 0) {
+ _ap_instance->flows[fd].port_id = ipcp_flow_req_arr(getpid(),
+ buf,
+ UNKNOWN_AP,
+ UNKNOWN_AE);
+ if (_ap_instance->flows[fd].port_id < 0) {
LOG_ERR("Could not get port id from IRMd");
- close(flow->fd);
- free(flow);
+ close(fd);
continue;
}
- flow->flow.oflags = FLOW_O_DEFAULT;
- flow->flow.state = FLOW_PENDING;
-
- if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) {
- LOG_DBGF("Could not add flow.");
- close(flow->fd);
- free(flow);
- continue;
- }
+ _ap_instance->flows[fd].rb = NULL;
+ _ap_instance->flows[fd].state = FLOW_PENDING;
- FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s);
- shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow;
+ LOG_DBG("Pending allocation request, port_id %u, UDP fd %d.",
+ _ap_instance->flows[fd].port_id, fd);
}
return 0;
@@ -229,8 +366,6 @@ static void * ipcp_udp_sdu_reader()
struct sockaddr_in r_saddr;
while (true) {
- flow_t * flow;
-
if (select(FD_SETSIZE,
&shim_data(_ipcp)->flow_fd_s,
NULL, NULL, NULL)
@@ -249,18 +384,8 @@ static void * ipcp_udp_sdu_reader()
(struct sockaddr *) &r_saddr,
(unsigned *) &n);
- flow = shim_data(_ipcp)->fd_to_flow_ptr[fd];
- if (flow->state == FLOW_PENDING) {
- if (connect(fd,
- (struct sockaddr *) &r_saddr,
- sizeof r_saddr)
- < 0)
- continue;
- flow->state = FLOW_ALLOCATED;
- }
-
/* send the sdu to the correct port_id */
- LOG_MISSING;
+ ipcp_udp_flow_write(fd, buf, n);
}
}
@@ -271,8 +396,6 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
{
char ipstr[INET_ADDRSTRLEN];
char dnsstr[INET_ADDRSTRLEN];
- pthread_t handler;
- pthread_t sdu_reader;
int enable = 1;
if (conf->type != THIS_TYPE) {
@@ -296,7 +419,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
dnsstr,
INET_ADDRSTRLEN);
else
- strcpy(dnsstr, "not set.\n");
+ strcpy(dnsstr, "not set");
shim_data(_ipcp)->ip_addr = conf->ip_addr;
shim_data(_ipcp)->dns_addr = conf->dns_addr;
@@ -304,7 +427,7 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
/* UDP listen server */
if ((shim_data(_ipcp)->s_fd =
- socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
+ socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
LOG_DBGF("Can't create socket.");
return -1;
}
@@ -328,13 +451,28 @@ int ipcp_udp_bootstrap(struct dif_config * conf)
return -1;
}
- pthread_create(&handler, NULL, ipcp_udp_listener, NULL);
- pthread_create(&sdu_reader, NULL, ipcp_udp_sdu_reader, NULL);
+ FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+
+ pthread_create(&_ap_instance->handler,
+ NULL,
+ ipcp_udp_listener,
+ NULL);
+ pthread_create(&_ap_instance->sdu_reader[0],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+
+ pthread_create(&_ap_instance->sdu_reader[1],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+
+ _ap_instance->ping_pong = 0;
_ipcp->state = IPCP_ENROLLED;
- LOG_DBG("Bootstrapped shim IPCP over UDP %s-%d.",
- _ipcp->data->iname->name, _ipcp->data->iname->id);
+ LOG_DBG("Bootstrapped shim IPCP over UDP with pid %d.",
+ getpid());
LOG_DBG("Bound to IP address %s.", ipstr);
LOG_DBG("DNS server address is %s.", dnsstr);
@@ -464,23 +602,25 @@ int ipcp_udp_name_unreg(char * name)
}
int ipcp_udp_flow_alloc(uint32_t port_id,
+ pid_t n_pid,
char * dst_name,
char * src_ap_name,
char * src_ae_name,
struct qos_spec * qos)
{
- struct udp_flow * flow = NULL;
struct sockaddr_in l_saddr;
struct sockaddr_in r_saddr;
+ struct sockaddr_in rf_saddr;
+ int fd;
+ int n;
+
+ char * recv_buf = NULL;
struct hostent * h;
if (dst_name == NULL || src_ap_name == NULL || src_ae_name == NULL)
return -1;
- LOG_DBG("Received flow allocation request from %s to %s.",
- src_ap_name, dst_name);
-
if (strlen(dst_name) > 255
|| strlen(src_ap_name) > 255
|| strlen(src_ae_name) > 255) {
@@ -491,15 +631,7 @@ int ipcp_udp_flow_alloc(uint32_t port_id,
if (qos != NULL)
LOG_DBGF("QoS requested. UDP/IP can't do that.");
- flow = malloc(sizeof *flow);
- if (flow == NULL)
- return -1;
-
- flow->fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- if (flow->fd == -1) {
- free(flow);
- return -1;
- }
+ fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
/* this socket is for the flow */
memset((char *) &l_saddr, 0, sizeof l_saddr);
@@ -507,108 +639,161 @@ int ipcp_udp_flow_alloc(uint32_t port_id,
l_saddr.sin_addr.s_addr = local_ip;
l_saddr.sin_port = 0;
- if (bind(flow->fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) {
- char ipstr[INET_ADDRSTRLEN];
- inet_ntop(AF_INET,
- &l_saddr.sin_addr.s_addr,
- ipstr,
- INET_ADDRSTRLEN);
- close(flow->fd);
- free(flow);
+ if (bind(fd, (struct sockaddr *) &l_saddr, sizeof l_saddr) < 0) {
+ close(fd);
return -1;
}
h = gethostbyname(dst_name);
if (h == NULL) {
LOG_DBGF("Could not resolve %s.", dst_name);
- close(flow->fd);
- free(flow);
+ close(fd);
return -1;
}
-
memset((char *) &r_saddr, 0, sizeof r_saddr);
r_saddr.sin_family = AF_INET;
- r_saddr.sin_addr.s_addr = (uint32_t) *(h->h_addr_list[0]);
+ r_saddr.sin_addr.s_addr = *((uint32_t *) (h->h_addr_list[0]));
r_saddr.sin_port = LISTEN_PORT;
+
/* at least try to get the packet on the wire */
- while (sendto(flow->fd, dst_name, strlen(dst_name), 0,
+ while (sendto(fd, dst_name, strlen(dst_name), 0,
(struct sockaddr *) &r_saddr, sizeof r_saddr) < 0) {
}
- flow->flow.port_id = port_id;
- flow->flow.oflags = FLOW_O_DEFAULT;
- flow->flow.state = FLOW_PENDING;
-
- /* add flow to the list */
+ /* wait for the other shim IPCP to respond */
- pthread_mutex_lock(&_ipcp->data->flow_lock);
+ recv_buf = malloc(strlen(dst_name) + 1);
+ n = sizeof(rf_saddr);
+ n = recvfrom(fd,
+ recv_buf,
+ strlen(dst_name) + 1,
+ 0,
+ (struct sockaddr *) &rf_saddr,
+ (unsigned *) &n);
- if(ipcp_data_add_flow(_ipcp->data, (flow_t *) flow)) {
- LOG_DBGF("Could not add flow.");
- pthread_mutex_unlock(&_ipcp->data->flow_lock);
- close(flow->fd);
- free(flow);
+ if (connect(fd,
+ (struct sockaddr *) &rf_saddr,
+ sizeof rf_saddr)
+ < 0) {
+ free(recv_buf);
return -1;
}
- pthread_mutex_unlock(&_ipcp->data->flow_lock);
+ if (!strcmp(recv_buf, dst_name))
+ LOG_WARN("Incorrect echo from server");
+
+ free(recv_buf);
+
+ _ap_instance->flows[fd].port_id = port_id;
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ LOG_ERR("Could not open N + 1 ringbuffer.");
+ close(fd);
+ }
/* tell IRMd that flow allocation "worked" */
- if (ipcp_flow_alloc_reply(getpid(), flow->flow.port_id, 0)) {
+ if (ipcp_flow_alloc_reply(getpid(), port_id, 0)) {
LOG_ERR("Failed to notify IRMd about flow allocation reply");
- close(flow->fd);
- ipcp_data_del_flow(_ipcp->data, flow->flow.port_id);
+ close(fd);
+ shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
return -1;
}
- FD_SET(flow->fd, &shim_data(_ipcp)->flow_fd_s);
- shim_data(_ipcp)->fd_to_flow_ptr[flow->fd] = &flow->flow;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
- return 0;
+ pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]);
+ pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+ _ap_instance->ping_pong = !_ap_instance->ping_pong;
+
+ LOG_DBG("Allocated flow with port_id %u on UDP fd %d.", port_id, fd);
+
+ return fd;
}
int ipcp_udp_flow_alloc_resp(uint32_t port_id,
+ pid_t n_pid,
int response)
{
- struct udp_flow * flow =
- (struct udp_flow *) ipcp_data_find_flow(_ipcp->data, port_id);
- if (flow == NULL) {
- return -1;
+ int fd = port_id_to_fd(port_id);
+ if (fd < 0) {
+ LOG_DBGF("Could not find flow with port_id %u.", port_id);
+ return 0;
}
- if (response) {
- ipcp_data_del_flow(_ipcp->data, port_id);
+ if (response)
return 0;
- }
/* awaken pending flow */
- if (flow->flow.state != FLOW_PENDING)
+ if (_ap_instance->flows[fd].state != FLOW_PENDING) {
+ LOG_DBGF("Flow was not pending.");
return -1;
+ }
+
+ _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ _ap_instance->flows[fd].rb = shm_ap_rbuff_open(n_pid);
+ if (_ap_instance->flows[fd].rb == NULL) {
+ LOG_ERR("Could not open N + 1 ringbuffer.");
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ _ap_instance->flows[fd].port_id = 0;
+ return 0;
+ }
- flow->flow.state = FLOW_ALLOCATED;
+ FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ pthread_cancel(_ap_instance->sdu_reader[_ap_instance->ping_pong]);
+ pthread_create(&_ap_instance->sdu_reader[_ap_instance->ping_pong],
+ NULL,
+ ipcp_udp_sdu_reader,
+ NULL);
+ _ap_instance->ping_pong = !_ap_instance->ping_pong;
+
+ LOG_DBG("Accepted flow, port_id %u on UDP fd %d.", port_id, fd);
return 0;
}
int ipcp_udp_flow_dealloc(uint32_t port_id)
{
- return 0;
-}
+ int fd = port_id_to_fd(port_id);
+ if (fd < 0) {
+ LOG_DBGF("Could not find flow with port_id %u.", port_id);
+ return 0;
+ }
-int ipcp_udp_du_write(uint32_t port_id,
- size_t map_index)
-{
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ _ap_instance->flows[fd].port_id = 0;
+ if (_ap_instance->flows[fd].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+
+ FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
return 0;
}
-int ipcp_udp_du_read(uint32_t port_id,
- size_t map_index)
+/* FIXME: may be crap, didn't think this one through */
+int ipcp_udp_flow_dealloc_arr(uint32_t port_id)
{
- return 0;
+ int fd = port_id_to_fd(port_id);
+ if (fd < 0) {
+ LOG_DBGF("Could not find flow with port_id %u.", port_id);
+ return 0;
+ }
+
+ _ap_instance->flows[fd].state = FLOW_NULL;
+ _ap_instance->flows[fd].port_id = 0;
+ if (_ap_instance->flows[fd].rb != NULL)
+ shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
+
+ FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+
+ return ipcp_flow_dealloc(0, port_id);
}
struct ipcp * ipcp_udp_create(char * ap_name)
@@ -617,11 +802,14 @@ struct ipcp * ipcp_udp_create(char * ap_name)
struct ipcp_udp_data * data;
struct ipcp_ops * ops;
+ if (shim_ap_init(ap_name) < 0)
+ return NULL;
+
i = malloc(sizeof *i);
if (i == NULL)
return NULL;
- data = ipcp_udp_data_create(ap_name);
+ data = ipcp_udp_data_create();
if (data == NULL) {
free(i);
return NULL;
@@ -643,8 +831,6 @@ struct ipcp * ipcp_udp_create(char * ap_name)
ops->ipcp_flow_alloc = ipcp_udp_flow_alloc;
ops->ipcp_flow_alloc_resp = ipcp_udp_flow_alloc_resp;
ops->ipcp_flow_dealloc = ipcp_udp_flow_dealloc;
- ops->ipcp_du_read = ipcp_udp_du_read;
- ops->ipcp_du_write = ipcp_udp_du_write;
i->data = (struct ipcp_data *) data;
i->ops = ops;
@@ -656,6 +842,40 @@ struct ipcp * ipcp_udp_create(char * ap_name)
#ifndef MAKE_CHECK
+/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
+/* FIXME: stop eating the CPU */
+void * ipcp_udp_sdu_loop(void * o)
+{
+ while (true) {
+ struct rb_entry * e = shm_ap_rbuff_read(_ap_instance->rb);
+ int fd;
+ int len = 0;
+ char * buf;
+ if (e == NULL)
+ continue;
+
+ len = shm_du_map_read_sdu((uint8_t **) &buf,
+ _ap_instance->dum,
+ e->index);
+ if (len == -1)
+ continue;
+
+ fd = port_id_to_fd(e->port_id);
+
+ if (fd == -1)
+ continue;
+
+ if (len == 0)
+ continue;
+
+ send(fd, buf, len, 0);
+
+ shm_release_du_buff(_ap_instance->dum, e->index);
+ }
+
+ return (void *) 1;
+}
+
int main (int argc, char * argv[])
{
/* argument 1: pid of irmd ? */
@@ -680,6 +900,7 @@ int main (int argc, char * argv[])
sigaction(SIGINT, &sig_act, NULL);
sigaction(SIGTERM, &sig_act, NULL);
sigaction(SIGHUP, &sig_act, NULL);
+ sigaction(SIGPIPE, &sig_act, NULL);
_ipcp = ipcp_udp_create(argv[2]);
if (_ipcp == NULL) {
@@ -687,7 +908,18 @@ int main (int argc, char * argv[])
exit(1);
}
- ipcp_main_loop(_ipcp);
+ pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
+ pthread_create(&_ap_instance->sduloop, NULL, ipcp_udp_sdu_loop, NULL);
+
+ pthread_join(_ap_instance->sduloop, NULL);
+ pthread_join(_ap_instance->mainloop, NULL);
+ pthread_join(_ap_instance->handler, NULL);
+ pthread_join(_ap_instance->sdu_reader[0], NULL);
+ pthread_join(_ap_instance->sdu_reader[1], NULL);
+
+ ipcp_udp_destroy(_ipcp);
+
+ shim_ap_fini();
exit(0);
}
diff --git a/src/ipcpd/shim-udp/tests/shim_udp_test.c b/src/ipcpd/shim-udp/tests/shim_udp_test.c
index 036f5877..e5e8b32d 100644
--- a/src/ipcpd/shim-udp/tests/shim_udp_test.c
+++ b/src/ipcpd/shim-udp/tests/shim_udp_test.c
@@ -59,7 +59,7 @@ int shim_udp_test(int argc, char ** argv)
_ipcp = ipcp_udp_create(ipcp_name);
if (_ipcp == NULL) {
LOG_ERR("Could not instantiate shim IPCP.");
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
@@ -69,13 +69,13 @@ int shim_udp_test(int argc, char ** argv)
if (ipcp_udp_name_reg("bogus name")) {
LOG_ERR("Failed to register application.");
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
if (ipcp_udp_name_unreg("bogus name")) {
LOG_ERR("Failed to unregister application.");
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
@@ -83,7 +83,7 @@ int shim_udp_test(int argc, char ** argv)
sprintf(bogus, "bogus name %4d", i);
if (ipcp_udp_name_reg(bogus)) {
LOG_ERR("Failed to register application %s.", bogus);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
}
@@ -92,12 +92,12 @@ int shim_udp_test(int argc, char ** argv)
sprintf(bogus, "bogus name %4d", i);
if(ipcp_udp_name_unreg(bogus)) {
LOG_ERR("Failed to unregister application %s.", bogus);
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(1);
}
}
- shm_du_map_close(dum);
+ shm_du_map_destroy(dum);
exit(0);
}