summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-09-18 06:27:43 +0200
committerdimitri staessens <[email protected]>2016-10-04 15:16:00 +0200
commitc96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (patch)
treeacd08f09f5a094e897020e97961b2847209df043 /src
parent2e561a634ae3e747b293a4e05eaf44726968dc1a (diff)
downloadouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.tar.gz
ouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.zip
lib, ipcp: Revise fast path and flow interfaces
IPCPs can now use ap_init() to initialize the memory. All flows are accessed using flow descriptors, this greatly simplifies IPCP development. Reverts the fast path to a single ap_rbuff per process. Splits lib/ipcp into irmd/ipcp and lib/ipcp-dev. Adds a lib/shim-dev holding tailored functions for shims. Moves the buffer_t to utils.h. Fixes the shim-eth-llc length field. Removes the flow from shared.h. Fixes #4 Fixes #5
Diffstat (limited to 'src')
-rw-r--r--src/ipcpd/flow.h39
-rw-r--r--src/ipcpd/ipcp-data.h41
-rw-r--r--src/ipcpd/ipcp-ops.h21
-rw-r--r--src/ipcpd/ipcp.c167
-rw-r--r--src/ipcpd/ipcp.h19
-rw-r--r--src/ipcpd/local/main.c494
-rw-r--r--src/ipcpd/normal/fmgr.c202
-rw-r--r--src/ipcpd/normal/fmgr.h12
-rw-r--r--src/ipcpd/normal/frct.h2
-rw-r--r--src/ipcpd/normal/main.c172
-rw-r--r--src/ipcpd/normal/ribmgr.c210
-rw-r--r--src/ipcpd/shim-eth-llc/main.c978
-rw-r--r--src/ipcpd/shim-udp/main.c980
-rw-r--r--src/irmd/CMakeLists.txt1
-rw-r--r--src/irmd/ipcp.c (renamed from src/lib/ipcp.c)128
-rw-r--r--src/irmd/ipcp.h62
-rw-r--r--src/irmd/irm_flow.c47
-rw-r--r--src/irmd/irm_flow.h18
-rw-r--r--src/irmd/main.c176
-rw-r--r--src/irmd/utils.h3
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/cdap.c1
-rw-r--r--src/lib/dev.c845
-rw-r--r--src/lib/irm.c2
-rw-r--r--src/lib/irmd_messages.proto3
-rw-r--r--src/lib/shm_ap_rbuff.c50
-rw-r--r--src/lib/shm_rdrbuff.c46
-rw-r--r--src/lib/sockets.c6
-rw-r--r--src/tools/cbr/cbr_server.c7
-rw-r--r--src/tools/irm/irm.c1
-rw-r--r--src/tools/irm/irm_utils.c1
-rw-r--r--src/tools/oping/oping_client.c3
32 files changed, 1975 insertions, 2763 deletions
diff --git a/src/ipcpd/flow.h b/src/ipcpd/flow.h
deleted file mode 100644
index 01226c1e..00000000
--- a/src/ipcpd/flow.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016
- *
- * Flows
- *
- * Dimitri Staessens <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#ifndef OUROBOROS_IPCP_FLOW_H
-#define OUROBOROS_IPCP_FLOW_H
-
-#include <ouroboros/list.h>
-#include <ouroboros/shm_ap_rbuff.h>
-
-#include <stdint.h>
-
-struct flow {
- int port_id;
- struct shm_ap_rbuff * rb;
- enum flow_state state;
-
- pid_t api;
-};
-
-#endif /* OUROBOROS_FLOW_H */
diff --git a/src/ipcpd/ipcp-data.h b/src/ipcpd/ipcp-data.h
index 36245eea..4971dbb5 100644
--- a/src/ipcpd/ipcp-data.h
+++ b/src/ipcpd/ipcp-data.h
@@ -30,7 +30,6 @@
#include <pthread.h>
#include "ipcp-ops.h"
-#include "flow.h"
struct ipcp_data {
enum ipcp_type type;
@@ -46,24 +45,32 @@ struct ipcp_data {
};
struct ipcp_data * ipcp_data_create();
+
struct ipcp_data * ipcp_data_init(struct ipcp_data * dst,
enum ipcp_type ipcp_type);
+
void ipcp_data_destroy(struct ipcp_data * data);
-int ipcp_data_add_reg_entry(struct ipcp_data * data,
- char * name);
-int ipcp_data_del_reg_entry(struct ipcp_data * data,
- const char * name);
-int ipcp_data_add_dir_entry(struct ipcp_data * data,
- char * ap_name,
- uint64_t addr);
-int ipcp_data_del_dir_entry(struct ipcp_data * data,
- const char * ap_name,
- uint64_t addr);
-bool ipcp_data_is_in_registry(struct ipcp_data * data,
- const char * name);
-bool ipcp_data_is_in_directory(struct ipcp_data * data,
- const char * ap_name);
-uint64_t ipcp_data_get_addr(struct ipcp_data * data,
- const char * ap_name);
+int ipcp_data_add_reg_entry(struct ipcp_data * data,
+ char * name);
+
+int ipcp_data_del_reg_entry(struct ipcp_data * data,
+ const char * name);
+
+int ipcp_data_add_dir_entry(struct ipcp_data * data,
+ char * ap_name,
+ uint64_t addr);
+
+int ipcp_data_del_dir_entry(struct ipcp_data * data,
+ const char * ap_name,
+ uint64_t addr);
+
+bool ipcp_data_is_in_registry(struct ipcp_data * data,
+ const char * name);
+
+bool ipcp_data_is_in_directory(struct ipcp_data * data,
+ const char * ap_name);
+
+uint64_t ipcp_data_get_addr(struct ipcp_data * data,
+ const char * ap_name);
#endif /* IPCPD_IPCP_DATA_H */
diff --git a/src/ipcpd/ipcp-ops.h b/src/ipcpd/ipcp-ops.h
index e43c2c38..815cda09 100644
--- a/src/ipcpd/ipcp-ops.h
+++ b/src/ipcpd/ipcp-ops.h
@@ -25,23 +25,26 @@
#define IPCPD_IPCP_OPS_H
#include <ouroboros/irm_config.h>
-#include <ouroboros/common.h>
-#include <sys/types.h>
+#include <ouroboros/shared.h>
struct ipcp_ops {
int (* ipcp_bootstrap)(struct dif_config * conf);
+
int (* ipcp_enroll)(char * dif_name);
- int (* ipcp_name_reg)(char * name);
+
+ int (* ipcp_name_reg)(char * name);
+
int (* ipcp_name_unreg)(char * name);
- int (* ipcp_flow_alloc)(pid_t n_api,
- int port_id,
+
+ int (* ipcp_flow_alloc)(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos);
- int (* ipcp_flow_alloc_resp)(pid_t n_api,
- int port_id,
- int response);
- int (* ipcp_flow_dealloc)(int port_id);
+
+ int (* ipcp_flow_alloc_resp)(int fd,
+ int response);
+
+ int (* ipcp_flow_dealloc)(int fd);
};
#endif /* IPCPD_IPCP_OPS_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index ec5ab927..db72b88d 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -21,8 +21,12 @@
*/
#include <ouroboros/config.h>
-#include <ouroboros/ipcp.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/utils.h>
+#include <ouroboros/sockets.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/dev.h>
+#include <ouroboros/np1_flow.h>
#define OUROBOROS_PREFIX "ipcpd/ipcp"
#include <ouroboros/logs.h>
@@ -32,62 +36,68 @@
#include <stdlib.h>
#include "ipcp.h"
-struct ipcp * ipcp_instance_create()
+int ipcp_init(enum ipcp_type type, struct ipcp_ops * ops)
{
pthread_condattr_t cattr;
- struct ipcp * i = malloc(sizeof *i);
- if (i == NULL)
- return NULL;
+ ipcpi.irmd_fd = -1;
+ ipcpi.state = IPCP_INIT;
- i->data = NULL;
- i->ops = NULL;
- i->irmd_fd = -1;
- i->state = IPCP_INIT;
+ ipcpi.ops = ops;
- pthread_rwlock_init(&i->state_lock, NULL);
- pthread_mutex_init(&i->state_mtx, NULL);
+ ipcpi.data = ipcp_data_create();
+ if (ipcpi.data == NULL)
+ return -ENOMEM;
+
+ ipcp_data_init(ipcpi.data, type);
+
+ pthread_rwlock_init(&ipcpi.state_lock, NULL);
+ pthread_mutex_init(&ipcpi.state_mtx, NULL);
pthread_condattr_init(&cattr);
#ifndef __APPLE__
pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
#endif
- pthread_cond_init(&i->state_cond, &cattr);
+ pthread_cond_init(&ipcpi.state_cond, &cattr);
- return i;
+ pthread_create(&ipcpi.mainloop, NULL, ipcp_main_loop, NULL);
+
+ return 0;
}
-void ipcp_set_state(struct ipcp * ipcp,
- enum ipcp_state state)
+void ipcp_fini()
{
- if (ipcp == NULL)
- return;
+ pthread_join(ipcpi.mainloop, NULL);
- pthread_mutex_lock(&ipcp->state_mtx);
+ ipcp_data_destroy(ipcpi.data);
+ pthread_cond_destroy(&ipcpi.state_cond);
+ pthread_mutex_destroy(&ipcpi.state_mtx);
+ pthread_rwlock_destroy(&ipcpi.state_lock);
+}
+
+void ipcp_set_state(enum ipcp_state state)
+{
+ pthread_mutex_lock(&ipcpi.state_mtx);
- ipcp->state = state;
+ ipcpi.state = state;
- pthread_cond_broadcast(&ipcp->state_cond);
- pthread_mutex_unlock(&ipcp->state_mtx);
+ pthread_cond_broadcast(&ipcpi.state_cond);
+ pthread_mutex_unlock(&ipcpi.state_mtx);
}
-enum ipcp_state ipcp_get_state(struct ipcp * ipcp)
+enum ipcp_state ipcp_get_state()
{
enum ipcp_state state;
- if (ipcp == NULL)
- return IPCP_NULL;
+ pthread_mutex_lock(&ipcpi.state_mtx);
- pthread_mutex_lock(&ipcp->state_mtx);
+ state = ipcpi.state;
- state = ipcp->state;
-
- pthread_mutex_unlock(&ipcp->state_mtx);
+ pthread_mutex_unlock(&ipcpi.state_mtx);
return state;
}
-int ipcp_wait_state(struct ipcp * ipcp,
- enum ipcp_state state,
+int ipcp_wait_state(enum ipcp_state state,
const struct timespec * timeout)
{
struct timespec abstime;
@@ -95,24 +105,24 @@ int ipcp_wait_state(struct ipcp * ipcp,
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
ts_add(&abstime, timeout, &abstime);
- pthread_mutex_lock(&ipcp->state_mtx);
+ pthread_mutex_lock(&ipcpi.state_mtx);
- while (ipcp->state != state && ipcp->state != IPCP_SHUTDOWN) {
+ while (ipcpi.state != state && ipcpi.state != IPCP_SHUTDOWN) {
int ret;
if (timeout == NULL)
- ret = pthread_cond_wait(&ipcp->state_cond,
- &ipcp->state_mtx);
+ ret = pthread_cond_wait(&ipcpi.state_cond,
+ &ipcpi.state_mtx);
else
- ret = pthread_cond_timedwait(&ipcp->state_cond,
- &ipcp->state_mtx,
+ ret = pthread_cond_timedwait(&ipcpi.state_cond,
+ &ipcpi.state_mtx,
&abstime);
if (ret) {
- pthread_mutex_unlock(&ipcp->state_mtx);
+ pthread_mutex_unlock(&ipcpi.state_mtx);
return -ret;
}
}
- pthread_mutex_unlock(&ipcp->state_mtx);
+ pthread_mutex_unlock(&ipcpi.state_mtx);
return 0;
}
@@ -161,7 +171,6 @@ void * ipcp_main_loop(void * o)
int lsockfd;
int sockfd;
uint8_t buf[IPCP_MSG_BUF_SIZE];
- struct ipcp * _ipcp = (struct ipcp *) o;
ipcp_msg_t * msg;
ssize_t count;
@@ -180,12 +189,6 @@ void * ipcp_main_loop(void * o)
struct timeval ltv = {(SOCKET_TIMEOUT / 1000),
(SOCKET_TIMEOUT % 1000) * 1000};
-
- if (_ipcp == NULL) {
- LOG_ERR("Invalid ipcp struct.");
- return (void *) 1;
- }
-
sock_path = ipcp_sock_path(getpid());
if (sock_path == NULL)
return (void *) 1;
@@ -202,13 +205,15 @@ void * ipcp_main_loop(void * o)
LOG_WARN("Failed to set timeout on socket.");
while (true) {
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ int fd = -1;
+
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ if (ipcp_get_state() == IPCP_SHUTDOWN) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
break;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
@@ -235,7 +240,7 @@ void * ipcp_main_loop(void * o)
switch (msg->code) {
case IPCP_MSG_CODE__IPCP_BOOTSTRAP:
- if (_ipcp->ops->ipcp_bootstrap == NULL) {
+ if (ipcpi.ops->ipcp_bootstrap == NULL) {
LOG_ERR("Bootstrap unsupported.");
break;
}
@@ -267,72 +272,102 @@ void * ipcp_main_loop(void * o)
conf.if_name = conf_msg->if_name;
ret_msg.has_result = true;
- ret_msg.result = _ipcp->ops->ipcp_bootstrap(&conf);
+ ret_msg.result = ipcpi.ops->ipcp_bootstrap(&conf);
if (ret_msg.result < 0)
free(conf.dif_name);
break;
case IPCP_MSG_CODE__IPCP_ENROLL:
- if (_ipcp->ops->ipcp_enroll == NULL) {
+ if (ipcpi.ops->ipcp_enroll == NULL) {
LOG_ERR("Enroll unsupported.");
break;
}
ret_msg.has_result = true;
- ret_msg.result = _ipcp->ops->ipcp_enroll(msg->dif_name);
+ ret_msg.result = ipcpi.ops->ipcp_enroll(msg->dif_name);
break;
case IPCP_MSG_CODE__IPCP_NAME_REG:
- if (_ipcp->ops->ipcp_name_reg == NULL) {
+ if (ipcpi.ops->ipcp_name_reg == NULL) {
LOG_ERR("Ap_reg unsupported.");
break;
}
msg_name_dup = strdup(msg->name);
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_name_reg(msg_name_dup);
+ ipcpi.ops->ipcp_name_reg(msg_name_dup);
if (ret_msg.result < 0)
free(msg_name_dup);
break;
case IPCP_MSG_CODE__IPCP_NAME_UNREG:
- if (_ipcp->ops->ipcp_name_unreg == NULL) {
+ if (ipcpi.ops->ipcp_name_unreg == NULL) {
LOG_ERR("Ap_unreg unsupported.");
break;
}
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_name_unreg(msg->name);
+ ipcpi.ops->ipcp_name_unreg(msg->name);
break;
case IPCP_MSG_CODE__IPCP_FLOW_ALLOC:
- if (_ipcp->ops->ipcp_flow_alloc == NULL) {
+ if (ipcpi.ops->ipcp_flow_alloc == NULL) {
LOG_ERR("Flow_alloc unsupported.");
break;
}
+ fd = np1_flow_alloc(msg->api, msg->port_id);
+ if (fd < 0) {
+ LOG_ERR("Could not get fd for flow.");
+ ret_msg.has_result = true;
+ ret_msg.result = -1;
+ break;
+ }
+
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_flow_alloc(msg->api,
- msg->port_id,
+ ipcpi.ops->ipcp_flow_alloc(fd,
msg->dst_name,
msg->src_ae_name,
msg->qos_cube);
+ if (ret_msg.result < 0) {
+ LOG_DBG("Deallocating failed flow on port_id %d.",
+ msg->port_id);
+ flow_dealloc(fd);
+ }
break;
case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP:
- if (_ipcp->ops->ipcp_flow_alloc_resp == NULL) {
+ if (ipcpi.ops->ipcp_flow_alloc_resp == NULL) {
LOG_ERR("Flow_alloc_resp unsupported.");
break;
}
+
+ if (!msg->response) {
+ fd = np1_flow_resp(msg->api, msg->port_id);
+ if (fd < 0) {
+ LOG_ERR("Could not get fd for flow.");
+ ret_msg.has_result = true;
+ ret_msg.result = -1;
+ break;
+ }
+ }
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_flow_alloc_resp(msg->api,
- msg->port_id,
- msg->result);
+ ipcpi.ops->ipcp_flow_alloc_resp(fd,
+ msg->response);
break;
case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC:
- if (_ipcp->ops->ipcp_flow_dealloc == NULL) {
+ if (ipcpi.ops->ipcp_flow_dealloc == NULL) {
LOG_ERR("Flow_dealloc unsupported.");
break;
}
+
+ fd = np1_flow_dealloc(msg->port_id);
+ if (fd < 0) {
+ LOG_ERR("Could not get fd for flow.");
+ ret_msg.has_result = true;
+ ret_msg.result = -1;
+ break;
+ }
+
ret_msg.has_result = true;
ret_msg.result =
- _ipcp->ops->ipcp_flow_dealloc(msg->port_id);
+ ipcpi.ops->ipcp_flow_dealloc(fd);
break;
default:
LOG_ERR("Don't know that message code");
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index edaea0fd..87c0c5d1 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -24,7 +24,6 @@
#define IPCPD_IPCP_H
#include <ouroboros/config.h>
-#include <ouroboros/shared.h>
#include <pthread.h>
#include <time.h>
@@ -50,23 +49,23 @@ struct ipcp {
pthread_rwlock_t state_lock;
pthread_mutex_t state_mtx;
pthread_cond_t state_cond;
-};
-struct ipcp * ipcp_instance_create();
+ pthread_t mainloop;
+} ipcpi;
+
+int ipcp_init();
-void ipcp_set_state(struct ipcp * ipcp,
- enum ipcp_state state);
+void ipcp_fini();
-enum ipcp_state ipcp_get_state(struct ipcp * ipcp);
+void ipcp_set_state(enum ipcp_state state);
-int ipcp_wait_state(struct ipcp * ipcp,
- enum ipcp_state state,
+enum ipcp_state ipcp_get_state();
+
+int ipcp_wait_state(enum ipcp_state state,
const struct timespec * timeout);
void * ipcp_main_loop(void * o);
-void * ipcp_sdu_loop(void * o);
-
int ipcp_parse_arg(int argc,
char * argv[]);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index c0809429..1ccec0c0 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -22,17 +22,10 @@
#include <ouroboros/config.h>
#include "ipcp.h"
-#include "flow.h"
#include <ouroboros/errno.h>
-#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/list.h>
-#include <ouroboros/utils.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/irm_config.h>
-#include <ouroboros/bitmap.h>
-#include <ouroboros/shared.h>
#include <ouroboros/dev.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/local-dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
#include <ouroboros/logs.h>
@@ -46,176 +39,51 @@
#define THIS_TYPE IPCP_LOCAL
-#define shim_data(type) ((struct ipcp_local_data *) type->data)
-
/* global for trapping signal */
int irmd_api;
-/* this IPCP's data */
-#ifdef MAKE_CHECK
-extern struct ipcp * _ipcp; /* defined in test */
-#else
-struct ipcp * _ipcp;
-#endif
-
-/*
- * copied from ouroboros/dev. The shim needs access to the internals
- * because it doesn't follow all steps necessary steps to get
- * the info
- */
-
-/* the shim needs access to these internals */
-struct shim_ap_data {
- pid_t api;
- struct shm_rdrbuff * rdrb;
- struct bmp * fds;
- struct shm_ap_rbuff * rb;
-
- int in_out[AP_MAX_FLOWS];
+struct {
+ int in_out[IRMD_MAX_FLOWS];
- struct flow flows[AP_MAX_FLOWS];
- pthread_rwlock_t flows_lock;
-
- pthread_t mainloop;
+ pthread_rwlock_t lock;
pthread_t sduloop;
+} local_data;
-} * _ap_instance;
-
-static int shim_ap_init()
+void local_data_init()
{
int i;
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ local_data.in_out[i] = -1;
- _ap_instance = malloc(sizeof(struct shim_ap_data));
- if (_ap_instance == NULL) {
- return -1;
- }
-
- _ap_instance->api = getpid();
-
- _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (_ap_instance->fds == NULL) {
- free(_ap_instance);
- return -1;
- }
-
- _ap_instance->rdrb = shm_rdrbuff_open();
- if (_ap_instance->rdrb == NULL) {
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
- return -1;
- }
-
- _ap_instance->rb = shm_ap_rbuff_create_n();
- if (_ap_instance->rb == NULL) {
- shm_rdrbuff_close(_ap_instance->rdrb);
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
- return -1;
- }
-
- for (i = 0; i < AP_MAX_FLOWS; i ++) {
- _ap_instance->flows[i].rb = NULL;
- _ap_instance->flows[i].port_id = -1;
- _ap_instance->flows[i].state = FLOW_NULL;
- _ap_instance->in_out[i] = -1;
- }
-
- pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
-
- return 0;
-}
-
-void shim_ap_fini()
-{
- int i = 0;
-
- if (_ap_instance == NULL)
- return;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- if (_ipcp->state != IPCP_SHUTDOWN)
- LOG_WARN("Cleaning up AP while not in shutdown.");
-
- if (_ap_instance->fds != NULL)
- bmp_destroy(_ap_instance->fds);
-
- /* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
- shm_rdrbuff_remove(_ap_instance->rdrb, i);
-
- if (_ap_instance->rdrb != NULL)
- shm_rdrbuff_close(_ap_instance->rdrb);
- if (_ap_instance->rb != NULL)
- shm_ap_rbuff_destroy(_ap_instance->rb);
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- for (i = 0; i < AP_MAX_FLOWS; i ++)
- if (_ap_instance->flows[i].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[i].rb);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- free(_ap_instance);
+ pthread_rwlock_init(&local_data.lock, NULL);
}
-/* only call this under flows_lock */
-static int port_id_to_fd(int port_id)
+void local_data_fini()
{
- int i;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (_ap_instance->flows[i].port_id == port_id
- && _ap_instance->flows[i].state != FLOW_NULL)
- return i;
- }
-
- return -1;
+ pthread_rwlock_destroy(&local_data.lock);
}
-/*
- * end copy from dev.c
- */
-
-/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
static void * ipcp_local_sdu_loop(void * o)
{
while (true) {
- struct rb_entry * e;
- int fd;
-
- e = shm_ap_rbuff_read(_ap_instance->rb);
- if (e == NULL) {
- continue;
- }
+ struct rb_entry e;
+ int fd = local_flow_read(&e);
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (_ipcp->state != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return (void *) 1; /* -ENOTENROLLED */
}
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
- fd = _ap_instance->in_out[port_id_to_fd(e->port_id)];
- if (fd == -1) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- free(e);
- continue;
- }
-
- e->port_id = _ap_instance->flows[fd].port_id;
-
- while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0)
- ;
+ pthread_rwlock_rdlock(&local_data.lock);
+ fd = local_data.in_out[fd];
+ pthread_rwlock_unlock(&local_data.lock);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (fd != -1)
+ local_flow_write(fd, &e);
- free(e);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
return (void *) 1;
@@ -223,10 +91,6 @@ static void * ipcp_local_sdu_loop(void * o)
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
- sigset_t sigset;
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGINT);
-
switch(sig) {
case SIGINT:
case SIGTERM:
@@ -236,11 +100,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
LOG_DBG("IPCP %d terminating by order of %d. Bye.",
getpid(), info->si_pid);
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(_ipcp, IPCP_SHUTDOWN);
+ ipcp_set_state(IPCP_SHUTDOWN);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
default:
return;
@@ -254,307 +118,154 @@ static int ipcp_local_bootstrap(struct dif_config * conf)
return -1;
}
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_INIT) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_INIT) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("IPCP in wrong state.");
return -1;
}
- ipcp_set_state(_ipcp, IPCP_ENROLLED);
+ ipcp_set_state(IPCP_ENROLLED);
- pthread_create(&_ap_instance->sduloop,
- NULL,
- ipcp_local_sdu_loop,
- NULL);
+ pthread_create(&local_data.sduloop, NULL, ipcp_local_sdu_loop, NULL);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Bootstrapped local IPCP with api %d.",
- getpid());
+ LOG_INFO("Bootstrapped local IPCP with api %d.", getpid());
return 0;
}
static int ipcp_local_name_reg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBGF("Failed to add %s to local registry.", name);
return -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Registered %s.", name);
+ LOG_INFO("Registered %s.", name);
return 0;
}
static int ipcp_local_name_unreg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- ipcp_data_del_reg_entry(_ipcp->data, name);
+ ipcp_data_del_reg_entry(ipcpi.data, name);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ LOG_INFO("Unregistered %s.", name);
return 0;
}
-static int ipcp_local_flow_alloc(pid_t n_api,
- int port_id,
+static int ipcp_local_flow_alloc(int fd,
char * dst_name,
char * src_ae_name,
enum qos_cube qos)
{
- int in_fd = -1;
int out_fd = -1;
- struct shm_ap_rbuff * rb;
-
- LOG_INFO("Allocating flow to %s.", dst_name);
+ LOG_DBG("Allocating flow to %s on fd %d.", dst_name, fd);
if (dst_name == NULL || src_ae_name == NULL)
return -1;
/* This ipcpd has all QoS */
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBGF("Won't register with non-enrolled IPCP.");
return -1; /* -ENOTENROLLED */
}
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1; /* -ENORBUFF */
- }
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_wrlock(&local_data.lock);
- in_fd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, in_fd)) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -EMFILE;
- }
+ out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
- _ap_instance->flows[in_fd].port_id = port_id;
- _ap_instance->flows[in_fd].state = FLOW_PENDING;
- _ap_instance->flows[in_fd].rb = rb;
+ local_data.in_out[fd] = out_fd;
+ local_data.in_out[out_fd] = fd;
- LOG_DBGF("Pending local flow with port_id %d.", port_id);
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- /* reply to IRM */
- port_id = ipcp_flow_req_arr(getpid(),
- dst_name,
- src_ae_name);
-
- if (port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_ERR("Could not get port id from IRMd");
- /* shm_ap_rbuff_close(n_api); */
- return -1;
- }
-
- out_fd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, out_fd)) {
- /* shm_ap_rbuff_close(n_api); */
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1; /* -ENOMOREFDS */
- }
-
- _ap_instance->flows[out_fd].port_id = port_id;
- _ap_instance->flows[out_fd].rb = NULL;
- _ap_instance->flows[out_fd].state = FLOW_PENDING;
-
- _ap_instance->in_out[in_fd] = out_fd;
- _ap_instance->in_out[out_fd] = in_fd;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- LOG_DBGF("Pending local allocation request, port_id %d.", port_id);
+ LOG_INFO("Pending local allocation request on fd %d.", fd);
return 0;
}
-static int ipcp_local_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
+static int ipcp_local_flow_alloc_resp(int fd, int response)
{
- struct shm_ap_rbuff * rb;
- int in_fd = -1;
int out_fd = -1;
int ret = -1;
+ LOG_DBG("Received response for fd %d: %d.", fd, response);
+
if (response)
return 0;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- /* awaken pending flow */
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- in_fd = port_id_to_fd(port_id);
- if (in_fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Could not find flow with port_id %d.", port_id);
- return -1;
- }
-
- if (_ap_instance->flows[in_fd].state != FLOW_PENDING) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Flow was not pending.");
- return -1;
- }
-
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL) {
- LOG_ERR("Could not open N + 1 ringbuffer.");
- _ap_instance->flows[in_fd].state = FLOW_NULL;
- _ap_instance->flows[in_fd].port_id = -1;
- _ap_instance->in_out[in_fd] = -1;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1;
- }
-
- _ap_instance->flows[in_fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[in_fd].rb = rb;
-
- LOG_DBGF("Accepted flow, port_id %d on fd %d.", port_id, in_fd);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- out_fd = _ap_instance->in_out[in_fd];
+ out_fd = local_data.in_out[fd];
if (out_fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("No pending local flow with port_id %d.", port_id);
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- if (_ap_instance->flows[out_fd].state != FLOW_PENDING) {
- /* FIXME: clean up other end */
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Flow was not pending.");
- return -1;
- }
-
- _ap_instance->flows[out_fd].state = FLOW_ALLOCATED;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if ((ret = ipcp_flow_alloc_reply(getpid(),
- _ap_instance->flows[out_fd].port_id,
- response)) < 0) {
- return -1; /* -EPIPE */
- }
+ if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)
+ return -1;
- LOG_INFO("Flow allocation completed, port_ids (%d, %d).",
- _ap_instance->flows[out_fd].port_id,
- _ap_instance->flows[in_fd].port_id);
+ LOG_INFO("Flow allocation completed, fds (%d, %d).", out_fd, fd);
return ret;
}
-static int ipcp_local_flow_dealloc(int port_id)
+static int ipcp_local_flow_dealloc(int fd)
{
- int fd = -1;
- struct shm_ap_rbuff * rb;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- fd = port_id_to_fd(port_id);
- if (fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Could not find flow with port_id %d.", port_id);
- return 0;
- }
-
- bmp_release(_ap_instance->fds, fd);
-
- if (_ap_instance->in_out[fd] != -1)
- _ap_instance->in_out[_ap_instance->in_out[fd]] = -1;
-
- _ap_instance->in_out[fd] = -1;
-
- _ap_instance->flows[fd].state = FLOW_NULL;
- _ap_instance->flows[fd].port_id = -1;
- rb = _ap_instance->flows[fd].rb;
- _ap_instance->flows[fd].rb = NULL;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
-
- if (rb != NULL)
- shm_ap_rbuff_close(rb);
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- LOG_DBGF("Flow with port_id %d deallocated.", port_id);
-
- return 0;
-}
-
-static struct ipcp * ipcp_local_create()
-{
- struct ipcp * i;
- struct ipcp_ops * ops;
+ int out_fd = -1;
- i = ipcp_instance_create();
- if (i == NULL)
- return NULL;
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&local_data.lock);
- i->data = ipcp_data_create();
- if (i->data == NULL) {
- free(i);
- return NULL;
- }
+ out_fd = local_data.in_out[fd];
- if (ipcp_data_init(i->data, THIS_TYPE) == NULL) {
- free(i->data);
- free(i);
- return NULL;
+ if (out_fd != -1) {
+ local_data.in_out[out_fd] = -1;
+ flow_dealloc(out_fd);
}
- ops = malloc(sizeof(*ops));
- if (ops == NULL) {
- free(i->data);
- free(i);
- return NULL;
- }
+ local_data.in_out[fd] = -1;
- ops->ipcp_bootstrap = ipcp_local_bootstrap;
- ops->ipcp_enroll = NULL; /* shim */
- ops->ipcp_name_reg = ipcp_local_name_reg;
- ops->ipcp_name_unreg = ipcp_local_name_unreg;
- ops->ipcp_flow_alloc = ipcp_local_flow_alloc;
- ops->ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp;
- ops->ipcp_flow_dealloc = ipcp_local_flow_dealloc;
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- i->ops = ops;
+ LOG_INFO("Flow with fd %d deallocated.", fd);
- i->state = IPCP_INIT;
-
- return i;
+ return 0;
}
-#ifndef MAKE_CHECK
+static struct ipcp_ops local_ops = {
+ .ipcp_bootstrap = ipcp_local_bootstrap,
+ .ipcp_enroll = NULL, /* shim */
+ .ipcp_name_reg = ipcp_local_name_reg,
+ .ipcp_name_unreg = ipcp_local_name_unreg,
+ .ipcp_flow_alloc = ipcp_local_flow_alloc,
+ .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp,
+ .ipcp_flow_dealloc = ipcp_local_flow_dealloc
+};
int main(int argc, char * argv[])
{
@@ -571,7 +282,9 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- if (shim_ap_init() < 0) {
+ local_data_init();
+
+ if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
@@ -591,17 +304,13 @@ int main(int argc, char * argv[])
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- _ipcp = ipcp_local_create();
- if (_ipcp == NULL) {
- LOG_ERR("Failed to create IPCP.");
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
+ if (ipcp_init(THIS_TYPE, &local_ops) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
- pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
- pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
-
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
if (ipcp_create_r(getpid())) {
@@ -610,21 +319,16 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- pthread_join(_ap_instance->mainloop, NULL);
-
- pthread_cancel(_ap_instance->sduloop);
- pthread_join(_ap_instance->sduloop, NULL);
+ ipcp_fini();
- shim_ap_fini();
+ pthread_cancel(local_data.sduloop);
+ pthread_join(local_data.sduloop, NULL);
- ipcp_data_destroy(_ipcp->data);
+ ap_fini();
- free(_ipcp->ops);
- free(_ipcp);
+ local_data_fini();
close_logfile();
exit(EXIT_SUCCESS);
}
-
-#endif /* MAKE_CHECK */
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 79b1bb4b..b6ec1984 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -26,7 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
-#include <ouroboros/ipcp.h>
+#include <ouroboros/ipcp-dev.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -41,10 +41,8 @@
#include "flow_alloc.pb-c.h"
typedef FlowAllocMsg flow_alloc_msg_t;
-extern struct ipcp * _ipcp;
-
struct n_flow {
- struct flow flow;
+ int fd;
struct frct_i * frct_i;
enum qos_cube qos;
@@ -57,7 +55,7 @@ struct n_1_flow {
struct list_head next;
};
-struct fmgr {
+struct {
pthread_t listen_thread;
struct list_head n_1_flows;
@@ -66,10 +64,9 @@ struct fmgr {
struct list_head n_flows;
/* FIXME: Make this a read/write lock */
pthread_mutex_t n_flows_lock;
-} * fmgr = NULL;
+} fmgr;
-static int add_n_1_fd(int fd,
- char * ae_name)
+static int add_n_1_fd(int fd, char * ae_name)
{
struct n_1_flow * tmp;
@@ -85,9 +82,9 @@ static int add_n_1_fd(int fd,
INIT_LIST_HEAD(&tmp->next);
- pthread_mutex_lock(&fmgr->n_1_flows_lock);
- list_add(&tmp->next, &fmgr->n_1_flows);
- pthread_mutex_unlock(&fmgr->n_1_flows_lock);
+ pthread_mutex_lock(&fmgr.n_1_flows_lock);
+ list_add(&tmp->next, &fmgr.n_1_flows);
+ pthread_mutex_unlock(&fmgr.n_1_flows_lock);
return 0;
}
@@ -98,16 +95,16 @@ static void * fmgr_listen(void * o)
char * ae_name;
while (true) {
- ipcp_wait_state(_ipcp, IPCP_ENROLLED, NULL);
+ ipcp_wait_state(IPCP_ENROLLED, NULL);
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) == IPCP_SHUTDOWN) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() == IPCP_SHUTDOWN) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
fd = flow_accept(&ae_name);
if (fd < 0) {
@@ -161,17 +158,13 @@ static void * fmgr_listen(void * o)
int fmgr_init()
{
- fmgr = malloc(sizeof(*fmgr));
- if (fmgr == NULL)
- return -1;
+ INIT_LIST_HEAD(&fmgr.n_1_flows);
+ INIT_LIST_HEAD(&fmgr.n_flows);
- INIT_LIST_HEAD(&fmgr->n_1_flows);
- INIT_LIST_HEAD(&fmgr->n_flows);
+ pthread_mutex_init(&fmgr.n_1_flows_lock, NULL);
+ pthread_mutex_init(&fmgr.n_flows_lock, NULL);
- pthread_mutex_init(&fmgr->n_1_flows_lock, NULL);
- pthread_mutex_init(&fmgr->n_flows_lock, NULL);
-
- pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL);
+ pthread_create(&fmgr.listen_thread, NULL, fmgr_listen, NULL);
return 0;
}
@@ -180,23 +173,20 @@ int fmgr_fini()
{
struct list_head * pos = NULL;
- pthread_cancel(fmgr->listen_thread);
+ pthread_cancel(fmgr.listen_thread);
- pthread_join(fmgr->listen_thread, NULL);
+ pthread_join(fmgr.listen_thread, NULL);
- list_for_each(pos, &fmgr->n_1_flows) {
- struct n_1_flow * e =
- list_entry(pos, struct n_1_flow, next);
+ list_for_each(pos, &fmgr.n_1_flows) {
+ struct n_1_flow * e = list_entry(pos, struct n_1_flow, next);
if (e->ae_name != NULL)
free(e->ae_name);
if (ribmgr_remove_flow(e->fd))
LOG_ERR("Failed to remove management flow.");
}
- pthread_mutex_destroy(&fmgr->n_1_flows_lock);
- pthread_mutex_destroy(&fmgr->n_flows_lock);
-
- free(fmgr);
+ pthread_mutex_destroy(&fmgr.n_1_flows_lock);
+ pthread_mutex_destroy(&fmgr.n_flows_lock);
return 0;
}
@@ -243,8 +233,7 @@ int fmgr_mgmt_flow(char * dst_name)
return 0;
}
-int fmgr_dt_flow(char * dst_name,
- enum qos_cube qos)
+int fmgr_dt_flow(char * dst_name, enum qos_cube qos)
{
int fd;
int result;
@@ -288,14 +277,13 @@ int fmgr_dt_flow(char * dst_name,
}
/* Call under n_flows lock */
-static struct n_flow * get_n_flow_by_port_id(int port_id)
+static struct n_flow * get_n_flow_by_fd(int fd)
{
struct list_head * pos = NULL;
- list_for_each(pos, &fmgr->n_flows) {
- struct n_flow * e =
- list_entry(pos, struct n_flow, next);
- if (e->flow.port_id == port_id)
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
+ if (e->fd == fd)
return e;
}
@@ -307,9 +295,8 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
{
struct list_head * pos = NULL;
- list_for_each(pos, &fmgr->n_flows) {
- struct n_flow * e =
- list_entry(pos, struct n_flow, next);
+ list_for_each(pos, &fmgr.n_flows) {
+ struct n_flow * e = list_entry(pos, struct n_flow, next);
if (e->frct_i == frct_i)
return e;
}
@@ -317,8 +304,7 @@ static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
return NULL;
}
-int fmgr_flow_alloc(pid_t n_api,
- int port_id,
+int fmgr_flow_alloc(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos)
@@ -355,49 +341,40 @@ int fmgr_flow_alloc(pid_t n_api,
flow_alloc_msg__pack(&msg, buf.data);
- pthread_mutex_lock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
frct_i = frct_i_create(address, &buf, qos);
if (frct_i == NULL) {
free(buf.data);
free(flow);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
free(buf.data);
- flow->flow.rb = shm_ap_rbuff_open_s(n_api);
- if (flow->flow.rb == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- free(flow);
- return -1;
- }
-
- flow->flow.api = n_api;
- flow->flow.port_id = port_id;
- flow->flow.state = FLOW_PENDING;
+ flow->fd = fd;
flow->frct_i = frct_i;
- flow->qos = qos;
+ flow->qos = qos;
INIT_LIST_HEAD(&flow->next);
- list_add(&flow->next, &fmgr->n_flows);
+ list_add(&flow->next, &fmgr.n_flows);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return 0;
}
/* Call under n_flows lock */
-static int n_flow_dealloc(int port_id)
+static int n_flow_dealloc(int fd)
{
struct n_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
int ret;
- flow = get_n_flow_by_port_id(port_id);
+ flow = get_n_flow_by_fd(fd);
if (flow == NULL)
return -1;
@@ -414,8 +391,6 @@ static int n_flow_dealloc(int port_id)
flow_alloc_msg__pack(&msg, buf.data);
ret = frct_i_destroy(flow->frct_i, &buf);
- if (flow->flow.rb != NULL)
- shm_ap_rbuff_close(flow->flow.rb);
list_del(&flow->next);
free(flow);
@@ -424,25 +399,17 @@ static int n_flow_dealloc(int port_id)
return ret;
}
-int fmgr_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
+int fmgr_flow_alloc_resp(int fd, int response)
{
struct n_flow * flow;
flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
buffer_t buf;
- pthread_mutex_lock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
- flow = get_n_flow_by_port_id(port_id);
+ flow = get_n_flow_by_fd(fd);
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- return -1;
- }
-
- if (flow->flow.state != FLOW_PENDING) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- LOG_ERR("Flow is not pending.");
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
@@ -452,13 +419,13 @@ int fmgr_flow_alloc_resp(pid_t n_api,
buf.len = flow_alloc_msg__get_packed_size(&msg);
if (buf.len == 0) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
buf.data = malloc(buf.len);
if (buf.data == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return -1;
}
@@ -469,106 +436,85 @@ int fmgr_flow_alloc_resp(pid_t n_api,
free(buf.data);
list_del(&flow->next);
free(flow);
- } else {
- if (frct_i_accept(flow->frct_i, &buf)) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- return -1;
- }
-
- flow->flow.state = FLOW_ALLOCATED;
- flow->flow.api = n_api;
-
- flow->flow.rb = shm_ap_rbuff_open_s(n_api);
- if (flow->flow.rb == NULL) {
- n_flow_dealloc(port_id);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
- return -1;
- }
+ } else if (frct_i_accept(flow->frct_i, &buf)) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
+ return -1;
}
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return 0;
}
-int fmgr_flow_dealloc(int port_id)
+int fmgr_flow_dealloc(int fd)
{
int ret;
- pthread_mutex_lock(&fmgr->n_flows_lock);
- ret = n_flow_dealloc(port_id);
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
+ ret = n_flow_dealloc(fd);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
return ret;
}
-int fmgr_flow_alloc_msg(struct frct_i * frct_i,
- buffer_t * buf)
+int fmgr_flow_alloc_msg(struct frct_i * frct_i, buffer_t * buf)
{
struct n_flow * flow;
int ret = 0;
- int port_id;
+ int fd;
flow_alloc_msg_t * msg;
- pthread_mutex_lock(&fmgr->n_flows_lock);
+ pthread_mutex_lock(&fmgr.n_flows_lock);
- /* Depending on what is in the message call the function in ipcp.h */
+ /* Depending on the message call the function in ipcp-dev.h */
msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
if (msg == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
LOG_ERR("Failed to unpack flow alloc message");
return -1;
}
switch (msg->code) {
case FLOW_ALLOC_CODE__FLOW_REQ:
-
flow = malloc(sizeof(*flow));
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
return -1;
}
- flow->flow.state = FLOW_PENDING;
flow->frct_i = frct_i;
flow->qos = msg->qos_cube;
- flow->flow.rb = NULL;
- flow->flow.api = 0;
-
- port_id = ipcp_flow_req_arr(getpid(),
- msg->dst_name,
- msg->src_ae_name);
- if (port_id < 0) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ fd = ipcp_flow_req_arr(getpid(),
+ msg->dst_name,
+ msg->src_ae_name);
+ if (fd < 0) {
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
free(flow);
flow_alloc_msg__free_unpacked(msg, NULL);
- LOG_ERR("Failed to get port-id from IRMd.");
+ LOG_ERR("Failed to get fd for flow.");
return -1;
}
- flow->flow.port_id = port_id;
+ flow->fd = fd;
INIT_LIST_HEAD(&flow->next);
- list_add(&flow->next, &fmgr->n_flows);
+ list_add(&flow->next, &fmgr.n_flows);
break;
case FLOW_ALLOC_CODE__FLOW_REPLY:
flow = get_n_flow_by_frct_i(frct_i);
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("No such flow in flow manager.");
return -1;
}
- ret = ipcp_flow_alloc_reply(getpid(),
- flow->flow.port_id,
- msg->response);
-
+ ret = ipcp_flow_alloc_reply(flow->fd, msg->response);
if (msg->response < 0) {
- shm_ap_rbuff_close(flow->flow.rb);
list_del(&flow->next);
free(flow);
}
@@ -577,13 +523,13 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i,
case FLOW_ALLOC_CODE__FLOW_DEALLOC:
flow = get_n_flow_by_frct_i(frct_i);
if (flow == NULL) {
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
LOG_ERR("No such flow in flow manager.");
return -1;
}
- ret = irm_flow_dealloc(flow->flow.port_id);
+ ret = flow_dealloc(flow->fd);
break;
default:
LOG_ERR("Got an unknown flow allocation message.");
@@ -591,7 +537,7 @@ int fmgr_flow_alloc_msg(struct frct_i * frct_i,
break;
}
- pthread_mutex_unlock(&fmgr->n_flows_lock);
+ pthread_mutex_unlock(&fmgr.n_flows_lock);
flow_alloc_msg__free_unpacked(msg, NULL);
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index 342410ca..7e3ef5f4 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -35,25 +35,25 @@
#define DT_AE "Data transfer"
int fmgr_init();
+
int fmgr_fini();
/* N-flow ops */
int fmgr_mgmt_flow(char * dst_name);
+
int fmgr_dt_flow(char * dst_name,
enum qos_cube qos);
/* N+1-flow ops, local */
-int fmgr_flow_alloc(pid_t n_api,
- int port_id,
+int fmgr_flow_alloc(int fd,
char * dst_ap_name,
char * src_ae_name,
enum qos_cube qos);
-int fmgr_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response);
+int fmgr_flow_alloc_resp(int fd,
+ int response);
-int fmgr_flow_dealloc(int port_id);
+int fmgr_flow_dealloc(int fd);
/* N+1-flow ops, remote */
int fmgr_flow_alloc_msg(struct frct_i * frct_i,
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index 09873445..0ee87004 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -24,7 +24,7 @@
#define OUROBOROS_IPCP_FRCT_H
#include <ouroboros/shared.h>
-#include <ouroboros/common.h>
+#include <ouroboros/utils.h>
#include "dt_const.h"
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 082973f4..4611408d 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -24,10 +24,8 @@
#include <ouroboros/config.h>
#include <ouroboros/logs.h>
-#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/dev.h>
-#include <ouroboros/ipcp.h>
+#include <ouroboros/ipcp-dev.h>
#include <ouroboros/time_utils.h>
#include <stdbool.h>
@@ -47,26 +45,8 @@
/* global for trapping signal */
int irmd_api;
-struct ipcp * _ipcp;
-
-#define normal_data(type) ((struct normal_ipcp_data *) type->data)
-
-struct normal_ipcp_data {
- /* Keep ipcp_data first for polymorphism. */
- struct ipcp_data ipcp_data;
-
- struct shm_rdrbuff * rdrb;
- struct shm_ap_rbuff * rb;
-
- pthread_t mainloop;
-};
-
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
- sigset_t sigset;
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGINT);
-
switch(sig) {
case SIGINT:
case SIGTERM:
@@ -75,11 +55,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
LOG_DBG("IPCP %d terminating by order of %d. Bye.",
getpid(), info->si_pid);
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(_ipcp, IPCP_SHUTDOWN);
+ ipcp_set_state(IPCP_SHUTDOWN);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
default:
return;
@@ -88,15 +68,15 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
static int normal_ipcp_name_reg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Registered %s.", name);
@@ -105,11 +85,11 @@ static int normal_ipcp_name_reg(char * name)
static int normal_ipcp_name_unreg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- ipcp_data_del_reg_entry(_ipcp->data, name);
+ ipcp_data_del_reg_entry(ipcpi.data, name);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
@@ -119,59 +99,59 @@ static int normal_ipcp_enroll(char * dif_name)
struct timespec timeout = {(ENROLL_TIMEOUT / 1000),
(ENROLL_TIMEOUT % 1000) * MILLION};
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_INIT) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_INIT) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Won't enroll an IPCP that is not in INIT.");
return -1; /* -ENOTINIT */
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (fmgr_mgmt_flow(dif_name)) {
LOG_ERR("Failed to establish management flow.");
return -1;
}
- if (ipcp_wait_state(_ipcp, IPCP_ENROLLED, &timeout) == -ETIMEDOUT) {
+ if (ipcp_wait_state(IPCP_ENROLLED, &timeout) == -ETIMEDOUT) {
LOG_ERR("Enrollment timed out.");
return -1;
}
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
static int normal_ipcp_bootstrap(struct dif_config * conf)
{
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_INIT) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_INIT) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Won't bootstrap an IPCP that is not in INIT.");
return -1; /* -ENOTINIT */
}
if (ribmgr_bootstrap(conf)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to bootstrap RIB manager.");
return -1;
}
- ipcp_set_state(_ipcp, IPCP_ENROLLED);
+ ipcp_set_state(IPCP_ENROLLED);
- _ipcp->data->dif_name = conf->dif_name;
+ ipcpi.data->dif_name = conf->dif_name;
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Bootstrapped in DIF %s.", conf->dif_name);
@@ -188,67 +168,6 @@ static struct ipcp_ops normal_ops = {
.ipcp_flow_dealloc = fmgr_flow_dealloc
};
-struct normal_ipcp_data * normal_ipcp_data_create()
-{
- struct normal_ipcp_data * normal_data;
- enum ipcp_type ipcp_type;
-
- normal_data = malloc(sizeof(*normal_data));
- if (normal_data == NULL) {
- LOG_ERR("Failed to allocate.");
- return NULL;
- }
-
- ipcp_type = THIS_TYPE;
- if (ipcp_data_init((struct ipcp_data *) normal_data,
- ipcp_type) == NULL) {
- free(normal_data);
- return NULL;
- }
-
- normal_data->rdrb = shm_rdrbuff_open();
- if (normal_data->rdrb == NULL) {
- free(normal_data);
- return NULL;
- }
-
- normal_data->rb = shm_ap_rbuff_create_n();
- if (normal_data->rb == NULL) {
- shm_rdrbuff_close(normal_data->rdrb);
- free(normal_data);
- return NULL;
- }
-
- return normal_data;
-}
-
-
-void normal_ipcp_data_destroy()
-{
- int idx = 0;
-
- if (_ipcp == NULL)
- return;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN)
- LOG_WARN("Cleaning up while not in shutdown.");
-
- /* remove all remaining sdus */
- while ((idx = shm_ap_rbuff_peek_idx(normal_data(_ipcp)->rb)) >= 0)
- shm_rdrbuff_remove(normal_data(_ipcp)->rdrb, idx);
-
- if (normal_data(_ipcp)->rdrb != NULL)
- shm_rdrbuff_close(normal_data(_ipcp)->rdrb);
- if (normal_data(_ipcp)->rb != NULL)
- shm_ap_rbuff_close(normal_data(_ipcp)->rb);
-
- ipcp_data_destroy(_ipcp->data);
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
-}
-
int main(int argc, char * argv[])
{
struct sigaction sig_act;
@@ -285,56 +204,38 @@ int main(int argc, char * argv[])
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- _ipcp = ipcp_instance_create();
- if (_ipcp == NULL) {
- LOG_ERR("Failed to create instance.");
- close_logfile();
- exit(EXIT_FAILURE);
- }
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- _ipcp->data = (struct ipcp_data *) normal_ipcp_data_create();
- if (_ipcp->data == NULL) {
- LOG_ERR("Failed to create instance data.");
- free(_ipcp);
+ if (ipcp_init(THIS_TYPE, &normal_ops) < 0) {
+ LOG_ERR("Failed to create instance.");
close_logfile();
exit(EXIT_FAILURE);
}
- _ipcp->ops = &normal_ops;
- _ipcp->state = IPCP_INIT;
+ pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
if (fmgr_init()) {
- normal_ipcp_data_destroy();
- free(_ipcp);
+ ipcp_fini();
close_logfile();
exit(EXIT_FAILURE);
}
if (ribmgr_init()) {
- normal_ipcp_data_destroy();
fmgr_fini();
- free(_ipcp);
+ ipcp_fini();
close_logfile();
exit(EXIT_FAILURE);
}
- pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
- pthread_create(&normal_data(_ipcp)->mainloop, NULL,
- ipcp_main_loop, _ipcp);
-
- pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-
if (ipcp_create_r(getpid())) {
LOG_ERR("Failed to notify IRMd we are initialized.");
- normal_ipcp_data_destroy();
fmgr_fini();
- free(_ipcp);
+ ipcp_fini();
close_logfile();
exit(EXIT_FAILURE);
}
- pthread_join(normal_data(_ipcp)->mainloop, NULL);
+ ipcp_fini();
if (fmgr_fini())
LOG_ERR("Failed to finalize flow manager.");
@@ -345,10 +246,9 @@ int main(int argc, char * argv[])
if (frct_fini())
LOG_ERR("Failed to finalize FRCT.");
- normal_ipcp_data_destroy();
- free(_ipcp);
close_logfile();
ap_fini();
+
exit(EXIT_SUCCESS);
}
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 9733abc9..99d156f5 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -27,6 +27,7 @@
#include <ouroboros/cdap.h>
#include <ouroboros/list.h>
#include <ouroboros/time_utils.h>
+#include <ouroboros/ipcp-dev.h>
#include <stdlib.h>
#include <pthread.h>
@@ -45,15 +46,13 @@ typedef StaticInfoMsg static_info_msg_t;
#define ENROLLMENT "enrollment"
#define STATIC_INFO "static DIF information"
-extern struct ipcp * _ipcp;
-
struct mgmt_flow {
struct cdap * instance;
int fd;
struct list_head next;
};
-struct rib {
+struct {
struct dt_const dtc;
uint32_t address;
@@ -63,7 +62,7 @@ struct rib {
struct list_head cdap_reqs;
pthread_mutex_t cdap_reqs_lock;
-} * rib = NULL;
+} rib;
/* Call while holding cdap_reqs_lock */
/* FIXME: better not to call blocking functions under any lock */
@@ -84,13 +83,13 @@ int cdap_result_wait(struct cdap * instance,
return -1;
}
- list_add(&req->next, &rib->cdap_reqs);
+ list_add(&req->next, &rib.cdap_reqs);
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
ret = cdap_request_wait(req);
- pthread_mutex_lock(&rib->cdap_reqs_lock);
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
if (ret == -1) /* should only be on ipcp shutdown */
LOG_DBG("Waiting CDAP request destroyed.");
@@ -112,22 +111,16 @@ int cdap_result_wait(struct cdap * instance,
int ribmgr_init()
{
- rib = malloc(sizeof(*rib));
- if (rib == NULL)
- return -1;
+ INIT_LIST_HEAD(&rib.flows);
+ INIT_LIST_HEAD(&rib.cdap_reqs);
- INIT_LIST_HEAD(&rib->flows);
- INIT_LIST_HEAD(&rib->cdap_reqs);
-
- if (pthread_rwlock_init(&rib->flows_lock, NULL)) {
+ if (pthread_rwlock_init(&rib.flows_lock, NULL)) {
LOG_ERR("Failed to initialize rwlock.");
- free(rib);
return -1;
}
- if (pthread_mutex_init(&rib->cdap_reqs_lock, NULL)) {
+ if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) {
LOG_ERR("Failed to initialize mutex.");
- free(rib);
return -1;
}
@@ -139,19 +132,18 @@ int ribmgr_fini()
struct list_head * pos = NULL;
struct list_head * n = NULL;
- pthread_mutex_lock(&rib->cdap_reqs_lock);
- list_for_each_safe(pos, n, &rib->cdap_reqs) {
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
+ list_for_each_safe(pos, n, &rib.cdap_reqs) {
struct cdap_request * req =
list_entry(pos, struct cdap_request, next);
-
free(req->name);
list_del(&req->next);
free(req);
}
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
- pthread_rwlock_wrlock(&rib->flows_lock);
- list_for_each_safe(pos, n, &rib->flows) {
+ pthread_rwlock_wrlock(&rib.flows_lock);
+ list_for_each_safe(pos, n, &rib.flows) {
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
if (cdap_destroy(flow->instance))
@@ -159,9 +151,10 @@ int ribmgr_fini()
list_del(&flow->next);
free(flow);
}
- pthread_rwlock_unlock(&rib->flows_lock);
+ pthread_rwlock_unlock(&rib.flows_lock);
- free(rib);
+ pthread_mutex_destroy(&rib.cdap_reqs_lock);
+ pthread_rwlock_destroy(&rib.flows_lock);
return 0;
}
@@ -174,9 +167,9 @@ int ribmgr_cdap_reply(struct cdap * instance,
{
struct list_head * pos, * n = NULL;
- pthread_mutex_lock(&rib->cdap_reqs_lock);
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
- list_for_each_safe(pos, n, &rib->cdap_reqs) {
+ list_for_each_safe(pos, n, &rib.cdap_reqs) {
struct cdap_request * req =
list_entry(pos, struct cdap_request, next);
if (req->instance == instance &&
@@ -191,15 +184,15 @@ int ribmgr_cdap_reply(struct cdap * instance,
"executed succesfully",
req->code, req->name);
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
/* FIXME: In case of a read, update values here */
cdap_request_respond(req, result);
- pthread_mutex_lock(&rib->cdap_reqs_lock);
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
}
}
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
return 0;
}
@@ -223,34 +216,34 @@ int ribmgr_cdap_write(struct cdap * instance,
static_info_msg_t * msg;
int ret = 0;
- pthread_rwlock_wrlock(&_ipcp->state_lock);
- if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL &&
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+ if (ipcp_get_state() == IPCP_PENDING_ENROLL &&
strcmp(name, STATIC_INFO) == 0) {
LOG_DBG("Received static DIF information.");
msg = static_info_msg__unpack(NULL, len, data);
if (msg == NULL) {
- ipcp_set_state(_ipcp, IPCP_INIT);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ ipcp_set_state(IPCP_INIT);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
cdap_send_reply(instance, invoke_id, -1, NULL, 0);
LOG_ERR("Failed to unpack static info message.");
return -1;
}
- rib->dtc.addr_size = msg->addr_size;
- rib->dtc.cep_id_size = msg->cep_id_size;
- rib->dtc.pdu_length_size = msg->pdu_length_size;
- rib->dtc.seqno_size = msg->seqno_size;
- rib->dtc.has_ttl = msg->has_ttl;
- rib->dtc.has_chk = msg->has_chk;
- rib->dtc.min_pdu_size = msg->min_pdu_size;
- rib->dtc.max_pdu_size = msg->max_pdu_size;
+ rib.dtc.addr_size = msg->addr_size;
+ rib.dtc.cep_id_size = msg->cep_id_size;
+ rib.dtc.pdu_length_size = msg->pdu_length_size;
+ rib.dtc.seqno_size = msg->seqno_size;
+ rib.dtc.has_ttl = msg->has_ttl;
+ rib.dtc.has_chk = msg->has_chk;
+ rib.dtc.min_pdu_size = msg->min_pdu_size;
+ rib.dtc.max_pdu_size = msg->max_pdu_size;
- rib->address = msg->address;
+ rib.address = msg->address;
- if (frct_init(&rib->dtc, rib->address)) {
- ipcp_set_state(_ipcp, IPCP_INIT);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (frct_init(&rib.dtc, rib.address)) {
+ ipcp_set_state(IPCP_INIT);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
cdap_send_reply(instance, invoke_id, -1, NULL, 0);
static_info_msg__free_unpacked(msg, NULL);
LOG_ERR("Failed to init FRCT");
@@ -262,7 +255,7 @@ int ribmgr_cdap_write(struct cdap * instance,
ret = -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
LOG_ERR("Failed to send reply to write request.");
@@ -303,39 +296,39 @@ int ribmgr_cdap_start(struct cdap * instance,
size_t len = 0;
int iid = 0;
- pthread_rwlock_wrlock(&_ipcp->state_lock);
- if (ipcp_get_state(_ipcp) == IPCP_ENROLLED &&
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+ if (ipcp_get_state() == IPCP_ENROLLED &&
strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("New enrollment request.");
if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to enrollment request.");
return -1;
}
- stat_info.addr_size = rib->dtc.addr_size;
- stat_info.cep_id_size = rib->dtc.cep_id_size;
- stat_info.pdu_length_size = rib->dtc.pdu_length_size;
- stat_info.seqno_size = rib->dtc.seqno_size;
- stat_info.has_ttl = rib->dtc.has_ttl;
- stat_info.has_chk = rib->dtc.has_chk;
- stat_info.min_pdu_size = rib->dtc.min_pdu_size;
- stat_info.max_pdu_size = rib->dtc.max_pdu_size;
+ stat_info.addr_size = rib.dtc.addr_size;
+ stat_info.cep_id_size = rib.dtc.cep_id_size;
+ stat_info.pdu_length_size = rib.dtc.pdu_length_size;
+ stat_info.seqno_size = rib.dtc.seqno_size;
+ stat_info.has_ttl = rib.dtc.has_ttl;
+ stat_info.has_chk = rib.dtc.has_chk;
+ stat_info.min_pdu_size = rib.dtc.min_pdu_size;
+ stat_info.max_pdu_size = rib.dtc.max_pdu_size;
/* FIXME: Hand out an address. */
stat_info.address = 0;
len = static_info_msg__get_packed_size(&stat_info);
if (len == 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to get size of static information.");
return -1;
}
data = malloc(len);
if (data == NULL) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to allocate memory.");
return -1;
}
@@ -344,59 +337,59 @@ int ribmgr_cdap_start(struct cdap * instance,
LOG_DBGF("Sending static info...");
- pthread_mutex_lock(&rib->cdap_reqs_lock);
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
iid = cdap_send_write(instance, STATIC_INFO, data, len, 0);
if (iid < 0) {
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
free(data);
LOG_ERR("Failed to send static information.");
return -1;
}
if (cdap_result_wait(instance, WRITE, STATIC_INFO, iid)) {
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
free(data);
LOG_ERR("Remote did not receive static information.");
return -1;
}
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
/* FIXME: Send neighbors here. */
LOG_DBGF("Sending stop enrollment...");
- pthread_mutex_lock(&rib->cdap_reqs_lock);
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
iid = cdap_send_stop(instance, ENROLLMENT);
if (iid < 0) {
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
free(data);
LOG_ERR("Failed to send stop of enrollment.");
return -1;
}
if (cdap_result_wait(instance, STOP, ENROLLMENT, iid)) {
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
free(data);
LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
free(data);
} else {
if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to start request.");
return -1;
}
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
@@ -407,21 +400,21 @@ int ribmgr_cdap_stop(struct cdap * instance,
{
int ret = 0;
- pthread_rwlock_wrlock(&_ipcp->state_lock);
- if (ipcp_get_state(_ipcp) == IPCP_PENDING_ENROLL &&
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+ if (ipcp_get_state() == IPCP_PENDING_ENROLL &&
strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("Stop enrollment received.");
- ipcp_set_state(_ipcp, IPCP_ENROLLED);
+ ipcp_set_state(IPCP_ENROLLED);
} else
ret = -1;
if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to stop request.");
return -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
@@ -457,19 +450,18 @@ int ribmgr_add_flow(int fd)
flow->instance = instance;
flow->fd = fd;
- pthread_rwlock_wrlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&rib->flows_lock);
- if (list_empty(&rib->flows) &&
- ipcp_get_state(_ipcp) == IPCP_INIT) {
- ipcp_set_state(_ipcp, IPCP_PENDING_ENROLL);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&rib.flows_lock);
+ if (list_empty(&rib.flows) && ipcp_get_state() == IPCP_INIT) {
+ ipcp_set_state(IPCP_PENDING_ENROLL);
- pthread_mutex_lock(&rib->cdap_reqs_lock);
+ pthread_mutex_lock(&rib.cdap_reqs_lock);
iid = cdap_send_start(instance,
ENROLLMENT);
if (iid < 0) {
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
- pthread_rwlock_unlock(&rib->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_rwlock_unlock(&rib.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start enrollment.");
cdap_destroy(instance);
free(flow);
@@ -477,20 +469,20 @@ int ribmgr_add_flow(int fd)
}
if (cdap_result_wait(instance, START, ENROLLMENT, iid)) {
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
- pthread_rwlock_unlock(&rib->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
+ pthread_rwlock_unlock(&rib.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start enrollment.");
cdap_destroy(instance);
free(flow);
return -1;
}
- pthread_mutex_unlock(&rib->cdap_reqs_lock);
+ pthread_mutex_unlock(&rib.cdap_reqs_lock);
}
- list_add(&flow->next, &rib->flows);
- pthread_rwlock_unlock(&rib->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ list_add(&flow->next, &rib.flows);
+ pthread_rwlock_unlock(&rib.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
@@ -499,20 +491,20 @@ int ribmgr_remove_flow(int fd)
{
struct list_head * pos, * n = NULL;
- pthread_rwlock_wrlock(&rib->flows_lock);
- list_for_each_safe(pos, n, &rib->flows) {
+ pthread_rwlock_wrlock(&rib.flows_lock);
+ list_for_each_safe(pos, n, &rib.flows) {
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
if (flow->fd == fd) {
if (cdap_destroy(flow->instance))
LOG_ERR("Failed to destroy CDAP instance.");
list_del(&flow->next);
- pthread_rwlock_unlock(&rib->flows_lock);
+ pthread_rwlock_unlock(&rib.flows_lock);
free(flow);
return 0;
}
}
- pthread_rwlock_unlock(&rib->flows_lock);
+ pthread_rwlock_unlock(&rib.flows_lock);
return -1;
}
@@ -525,19 +517,19 @@ int ribmgr_bootstrap(struct dif_config * conf)
return -1;
}
- rib->dtc.addr_size = conf->addr_size;
- rib->dtc.cep_id_size = conf->cep_id_size;
- rib->dtc.pdu_length_size = conf->pdu_length_size;
- rib->dtc.seqno_size = conf->seqno_size;
- rib->dtc.has_ttl = conf->has_ttl;
- rib->dtc.has_chk = conf->has_chk;
- rib->dtc.min_pdu_size = conf->min_pdu_size;
- rib->dtc.max_pdu_size = conf->max_pdu_size;
+ rib.dtc.addr_size = conf->addr_size;
+ rib.dtc.cep_id_size = conf->cep_id_size;
+ rib.dtc.pdu_length_size = conf->pdu_length_size;
+ rib.dtc.seqno_size = conf->seqno_size;
+ rib.dtc.has_ttl = conf->has_ttl;
+ rib.dtc.has_chk = conf->has_chk;
+ rib.dtc.min_pdu_size = conf->min_pdu_size;
+ rib.dtc.max_pdu_size = conf->max_pdu_size;
/* FIXME: Set correct address. */
- rib->address = 0;
+ rib.address = 0;
- if (frct_init(&rib->dtc, rib->address)) {
+ if (frct_init(&rib.dtc, rib.address)) {
LOG_ERR("Failed to initialize FRCT.");
return -1;
}
diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c
index d74984cc..2cf46e51 100644
--- a/src/ipcpd/shim-eth-llc/main.c
+++ b/src/ipcpd/shim-eth-llc/main.c
@@ -24,24 +24,19 @@
#define _DEFAULT_SOURCE
-#include "ipcp.h"
-#include "flow.h"
#include <ouroboros/errno.h>
-#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/irm_config.h>
-#include <ouroboros/sockets.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/flow.h>
#include <ouroboros/dev.h>
+#include <ouroboros/ipcp-dev.h>
#define OUROBOROS_PREFIX "ipcpd/shim-eth-llc"
#include <ouroboros/logs.h>
+#include "ipcp.h"
+
#include <net/if.h>
#include <signal.h>
#include <stdlib.h>
@@ -79,18 +74,12 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;
#define LLC_HEADER_SIZE 3
#define MAX_SAPS 64
#define ETH_HEADER_SIZE (2 * MAC_SIZE + 2)
-#define ETH_FRAME_SIZE (SHIM_ETH_LLC_MAX_SDU_SIZE + ETH_HEADER_SIZE + \
- LLC_HEADER_SIZE + 2)
+#define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \
+ + SHIM_ETH_LLC_MAX_SDU_SIZE)
/* global for trapping signal */
int irmd_api;
-struct ipcp * _ipcp;
-
-#define shim_data(type) ((struct eth_llc_ipcp_data *) type->data)
-
-#define ipcp_flow(index) ((struct flow *) &(shim_data(_ipcp)->flows[index]))
-
struct eth_llc_frame {
uint8_t dst_hwaddr[MAC_SIZE];
uint8_t src_hwaddr[MAC_SIZE];
@@ -98,196 +87,78 @@ struct eth_llc_frame {
uint8_t dsap;
uint8_t ssap;
uint8_t cf;
- uint8_t size[2];
uint8_t payload;
};
-struct eth_llc_flow {
- struct flow flow;
- uint8_t sap;
- uint8_t r_sap;
- uint8_t r_addr[MAC_SIZE];
+struct ef {
+ int8_t sap;
+ int8_t r_sap;
+ uint8_t r_addr[MAC_SIZE];
};
-struct eth_llc_ipcp_data {
- /* Keep ipcp_data first for polymorphism. */
- struct ipcp_data ipcp_data;
-
+struct {
#ifdef __FreeBSD__
- struct sockaddr_dl device;
+ struct sockaddr_dl device;
#else
- struct sockaddr_ll device;
+ struct sockaddr_ll device;
#endif
- int s_fd;
-
- struct bmp * indices;
- struct bmp * saps;
+ int s_fd;
- struct shm_rdrbuff * rdrb;
- struct shm_ap_rbuff * rb;
-
- uint8_t * rx_ring;
- uint8_t * tx_ring;
- int tx_offset;
-
- struct eth_llc_flow flows[AP_MAX_FLOWS];
- pthread_rwlock_t flows_lock;
+ struct bmp * saps;
+#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
+ uint8_t * rx_ring;
+ uint8_t * tx_ring;
+ int tx_offset;
+#endif
+ int * ef_to_fd;
+ struct ef * fd_to_ef;
+ pthread_rwlock_t flows_lock;
- pthread_t mainloop;
- pthread_t sdu_writer;
- pthread_t sdu_reader;
-};
+ pthread_t sdu_writer;
+ pthread_t sdu_reader;
+} eth_llc_data;
-struct eth_llc_ipcp_data * eth_llc_ipcp_data_create()
+static int eth_llc_data_init()
{
- struct eth_llc_ipcp_data * eth_llc_data;
- enum ipcp_type ipcp_type;
-
- eth_llc_data = malloc(sizeof(*eth_llc_data));
- if (eth_llc_data == NULL) {
- LOG_ERR("Failed to allocate.");
- return NULL;
- }
-
- ipcp_type = THIS_TYPE;
- if (ipcp_data_init((struct ipcp_data *) eth_llc_data,
- ipcp_type) == NULL) {
- free(eth_llc_data);
- return NULL;
- }
-
- eth_llc_data->rdrb = shm_rdrbuff_open();
- if (eth_llc_data->rdrb == NULL) {
- free(eth_llc_data);
- return NULL;
- }
-
- eth_llc_data->rb = shm_ap_rbuff_create_n();
- if (eth_llc_data->rb == NULL) {
- shm_rdrbuff_close(eth_llc_data->rdrb);
- free(eth_llc_data);
- return NULL;
- }
+ int i;
- eth_llc_data->indices = bmp_create(AP_MAX_FLOWS, 0);
- if (eth_llc_data->indices == NULL) {
- shm_ap_rbuff_destroy(eth_llc_data->rb);
- shm_rdrbuff_close(eth_llc_data->rdrb);
- free(eth_llc_data);
- return NULL;
- }
+ eth_llc_data.fd_to_ef = malloc(sizeof(struct ef) * IRMD_MAX_FLOWS);
+ if (eth_llc_data.fd_to_ef == NULL)
+ return -ENOMEM;
- eth_llc_data->saps = bmp_create(MAX_SAPS, 2);
- if (eth_llc_data->indices == NULL) {
- bmp_destroy(eth_llc_data->indices);
- shm_ap_rbuff_destroy(eth_llc_data->rb);
- shm_rdrbuff_close(eth_llc_data->rdrb);
- free(eth_llc_data);
- return NULL;
+ eth_llc_data.ef_to_fd = malloc(sizeof(struct ef) * MAX_SAPS);
+ if (eth_llc_data.ef_to_fd == NULL) {
+ free(eth_llc_data.fd_to_ef);
+ return -ENOMEM;
}
- pthread_rwlock_init(&eth_llc_data->flows_lock, NULL);
-
- return eth_llc_data;
-}
-
-void eth_llc_ipcp_data_destroy()
-{
- int i = 0;
-
- if (_ipcp == NULL)
- return;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- if (ipcp_get_state(_ipcp) != IPCP_SHUTDOWN)
- LOG_WARN("Cleaning up while not in shutdown.");
-
- /* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(shim_data(_ipcp)->rb)) >= 0)
- shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, i);
-
- if (shim_data(_ipcp)->rdrb != NULL)
- shm_rdrbuff_close(shim_data(_ipcp)->rdrb);
- if (shim_data(_ipcp)->rb != NULL)
- shm_ap_rbuff_destroy(shim_data(_ipcp)->rb);
- if (shim_data(_ipcp)->indices != NULL)
- bmp_destroy(shim_data(_ipcp)->indices);
- if (shim_data(_ipcp)->saps != NULL)
- bmp_destroy(shim_data(_ipcp)->saps);
-
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
-
- for (i = 0; i < AP_MAX_FLOWS; i ++)
- if (ipcp_flow(i)->rb != NULL)
- shm_ap_rbuff_close(ipcp_flow(i)->rb);
-
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- ipcp_data_destroy(_ipcp->data);
-}
-
-/* only call this under flows_lock */
-static int port_id_to_index(int port_id)
-{
- int i;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (ipcp_flow(i)->port_id == port_id
- && ipcp_flow(i)->state != FLOW_NULL)
- return i;
+ eth_llc_data.saps = bmp_create(MAX_SAPS, 2);
+ if (eth_llc_data.saps == NULL) {
+ free(eth_llc_data.ef_to_fd);
+ free(eth_llc_data.fd_to_ef);
+ return -ENOMEM;
}
- return -1;
-}
+ for (i = 0; i < MAX_SAPS; ++i)
+ eth_llc_data.ef_to_fd[i] = -1;
-/* only call this under flows_lock */
-static int addr_and_saps_to_index(const uint8_t * r_addr,
- uint8_t r_sap,
- uint8_t sap)
-{
- int i = 0;
-
- for (i = 0; i < AP_MAX_FLOWS; i++) {
- if (ipcp_flow(i)->state == FLOW_ALLOCATED &&
- shim_data(_ipcp)->flows[i].r_sap == r_sap &&
- shim_data(_ipcp)->flows[i].sap == sap &&
- !memcmp(shim_data(_ipcp)->flows[i].r_addr,
- r_addr,
- MAC_SIZE)) {
- return i;
- }
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ eth_llc_data.fd_to_ef[i].sap = -1;
+ eth_llc_data.fd_to_ef[i].r_sap = -1;
+ memset(&eth_llc_data.fd_to_ef[i].r_addr, 0, MAC_SIZE);
}
- return -1;
-}
+ pthread_rwlock_init(&eth_llc_data.flows_lock, NULL);
-/* only call this under flows_lock */
-static int sap_to_index(uint8_t sap)
-{
- int i = 0;
-
- for (i = 0; i < AP_MAX_FLOWS; i++) {
- if (shim_data(_ipcp)->flows[i].sap == sap) {
- return i;
- }
- }
-
- return -1;
+ return 0;
}
-/* only call this under flows_lock */
-static void destroy_ipcp_flow(int index)
+void eth_llc_data_fini()
{
- ipcp_flow(index)->port_id = -1;
- if (ipcp_flow(index)->rb != NULL)
- shm_ap_rbuff_close(ipcp_flow(index)->rb);
- ipcp_flow(index)->rb = NULL;
- ipcp_flow(index)->state = FLOW_NULL;
- bmp_release(shim_data(_ipcp)->indices, index);
- bmp_release(shim_data(_ipcp)->saps,
- shim_data(_ipcp)->flows[index].sap);
+ bmp_destroy(eth_llc_data.saps);
+ free(eth_llc_data.fd_to_ef);
+ free(eth_llc_data.ef_to_fd);
+ pthread_rwlock_destroy(&eth_llc_data.flows_lock);
}
static uint8_t reverse_bits(uint8_t b)
@@ -299,7 +170,7 @@ static uint8_t reverse_bits(uint8_t b)
return b;
}
-static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE],
+static int eth_llc_ipcp_send_frame(uint8_t * dst_addr,
uint8_t dsap,
uint8_t ssap,
uint8_t * payload,
@@ -307,24 +178,16 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE],
{
int frame_len = 0;
uint8_t cf = 0x03;
- int fd;
-
- uint16_t size;
uint16_t length;
-
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
struct pollfd pfd;
struct tpacket_hdr * header;
uint8_t * frame;
#else
uint8_t frame[SHIM_ETH_LLC_MAX_SDU_SIZE];
-#ifdef __FreeBSD__
- struct sockaddr_dl device;
-#else
- struct sockaddr_ll device;
-#endif
#endif
struct eth_llc_frame * llc_frame;
+
if (payload == NULL) {
LOG_ERR("Payload was NULL.");
return -1;
@@ -333,79 +196,75 @@ static int eth_llc_ipcp_send_frame(uint8_t dst_addr[MAC_SIZE],
if (len > SHIM_ETH_LLC_MAX_SDU_SIZE)
return -1;
- fd = (shim_data(_ipcp))->s_fd;
-
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- header = (void *) shim_data(_ipcp)->tx_ring +
- (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE);
+ header = (void *) (eth_llc_data.tx_ring +
+ eth_llc_data.tx_offset * SHM_RDRB_BLOCK_SIZE);
while (header->tp_status != TP_STATUS_AVAILABLE) {
- pfd.fd = fd;
+ pfd.fd = eth_llc_data.s_fd;
pfd.revents = 0;
pfd.events = POLLIN | POLLRDNORM | POLLERR;
if (poll(&pfd, 1, -1) <= 0) {
- LOG_ERR("Failed to poll: %s.", strerror(errno));
+ LOG_ERR("Failed to poll.");
continue;
}
- header = (void *) shim_data(_ipcp)->tx_ring +
- (shim_data(_ipcp)->tx_offset * SHM_RDRB_BLOCK_SIZE);
+ header = (void *) (eth_llc_data.tx_ring
+ + eth_llc_data.tx_offset
+ * SHM_RDRB_BLOCK_SIZE);
}
- frame = (void *) header + TPACKET_HDRLEN - sizeof(struct sockaddr_ll);
+ frame = (uint8_t *) header
+ + TPACKET_HDRLEN - sizeof(struct sockaddr_ll);
#endif
-
llc_frame = (struct eth_llc_frame *) frame;
- memcpy(&llc_frame->dst_hwaddr, dst_addr, MAC_SIZE);
- memcpy(&llc_frame->src_hwaddr,
+ memcpy(llc_frame->dst_hwaddr, dst_addr, MAC_SIZE);
+ memcpy(llc_frame->src_hwaddr,
#ifdef __FreeBSD__
- LLADDR(&shim_data(_ipcp)->device),
+ LLADDR(&eth_llc_data.device),
#else
- shim_data(_ipcp)->device.sll_addr,
+ eth_llc_data.device.sll_addr,
#endif
MAC_SIZE);
- length = htons(LLC_HEADER_SIZE + sizeof(size) + len);
+ length = htons(LLC_HEADER_SIZE + len);
memcpy(&llc_frame->length, &length, sizeof(length));
llc_frame->dsap = dsap;
llc_frame->ssap = ssap;
- llc_frame->cf = cf;
- /* write the payload length, can't trust the driver */
- size = htons(len);
- memcpy(&llc_frame->size, &size, sizeof(size));
+ llc_frame->cf = cf;
memcpy(&llc_frame->payload, payload, len);
- frame_len = ETH_HEADER_SIZE + LLC_HEADER_SIZE + sizeof(uint16_t) + len;
+ frame_len = ETH_HEADER_SIZE + LLC_HEADER_SIZE + len;
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
header->tp_len = frame_len;
header->tp_status = TP_STATUS_SEND_REQUEST;
- if (send(fd, NULL, 0, MSG_DONTWAIT) < 0) {
+ if (send(eth_llc_data.s_fd, NULL, 0, MSG_DONTWAIT) < 0) {
LOG_ERR("Failed to write frame into TX_RING.");
return -1;
}
- shim_data(_ipcp)->tx_offset =
- (shim_data(_ipcp)->tx_offset + 1)
- & (SHM_BUFFER_SIZE -1);
+ eth_llc_data.tx_offset =
+ (eth_llc_data.tx_offset + 1) & (SHM_BUFFER_SIZE - 1);
#else
- device = (shim_data(_ipcp))->device;
-
- if (sendto(fd, frame, frame_len, 0,
- (struct sockaddr *) &device, sizeof(device)) <= 0) {
+ if (sendto(eth_llc_data.s_fd,
+ frame,
+ frame_len,
+ 0,
+ (struct sockaddr *) &eth_llc_data.device,
+ sizeof(eth_llc_data.device)) <= 0) {
LOG_ERR("Failed to send message.");
return -1;
}
#endif
-
return 0;
}
static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg,
- uint8_t dst_addr[MAC_SIZE])
+ uint8_t * dst_addr)
{
size_t len;
uint8_t * buf;
@@ -423,6 +282,7 @@ static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg,
if (eth_llc_ipcp_send_frame(dst_addr, reverse_bits(MGMT_SAP),
reverse_bits(MGMT_SAP), buf, len)) {
LOG_ERR("Failed to send management frame.");
+ free(buf);
return -1;
}
@@ -431,10 +291,10 @@ static int eth_llc_ipcp_send_mgmt_frame(shim_eth_llc_msg_t * msg,
return 0;
}
-static int eth_llc_ipcp_port_alloc(uint8_t dst_addr[MAC_SIZE],
- uint8_t ssap,
- char * dst_name,
- char * src_ae_name)
+static int eth_llc_ipcp_sap_alloc(uint8_t * dst_addr,
+ uint8_t ssap,
+ char * dst_name,
+ char * src_ae_name)
{
shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
@@ -446,10 +306,10 @@ static int eth_llc_ipcp_port_alloc(uint8_t dst_addr[MAC_SIZE],
return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
}
-static int eth_llc_ipcp_port_alloc_resp(uint8_t dst_addr[MAC_SIZE],
- uint8_t ssap,
- uint8_t dsap,
- int response)
+static int eth_llc_ipcp_sap_alloc_resp(uint8_t * dst_addr,
+ uint8_t ssap,
+ uint8_t dsap,
+ int response)
{
shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
@@ -463,8 +323,7 @@ static int eth_llc_ipcp_port_alloc_resp(uint8_t dst_addr[MAC_SIZE],
return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
}
-static int eth_llc_ipcp_port_dealloc(uint8_t dst_addr[MAC_SIZE],
- uint8_t ssap)
+static int eth_llc_ipcp_sap_dealloc(uint8_t * dst_addr, uint8_t ssap)
{
shim_eth_llc_msg_t msg = SHIM_ETH_LLC_MSG__INIT;
@@ -474,142 +333,102 @@ static int eth_llc_ipcp_port_dealloc(uint8_t dst_addr[MAC_SIZE],
return eth_llc_ipcp_send_mgmt_frame(&msg, dst_addr);
}
-static int eth_llc_ipcp_port_req(uint8_t r_sap,
- uint8_t r_addr[MAC_SIZE],
- char * dst_name,
- char * src_ae_name)
+static int eth_llc_ipcp_sap_req(uint8_t r_sap,
+ uint8_t * r_addr,
+ char * dst_name,
+ char * src_ae_name)
{
- int port_id;
- ssize_t index = 0;
- int i;
+ int fd;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
-
- index = bmp_allocate(shim_data(_ipcp)->indices);
- if (index < 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- LOG_ERR("Out of free indices.");
- return -1;
- }
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
/* reply to IRM */
- port_id = ipcp_flow_req_arr(getpid(),
- dst_name,
- src_ae_name);
-
- if (port_id < 0) {
- bmp_release(shim_data(_ipcp)->indices, index);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- LOG_ERR("Could not get port id from IRMd.");
+ fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
+ if (fd < 0) {
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Could not get new flow from IRMd.");
return -1;
}
- ipcp_flow(index)->port_id = port_id;
- ipcp_flow(index)->rb = NULL;
- ipcp_flow(index)->state = FLOW_PENDING;
- shim_data(_ipcp)->flows[index].r_sap = r_sap;
- for (i = 0; i < MAC_SIZE; i++) {
- shim_data(_ipcp)->flows[index].r_addr[i] = r_addr[i];
- }
+ eth_llc_data.fd_to_ef[fd].r_sap = r_sap;
+ memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("New flow request, port_id %d, remote SAP %d.", port_id, r_sap);
+ LOG_DBG("New flow request, fd %d, remote SAP %d.", fd, r_sap);
return 0;
}
-static int eth_llc_ipcp_port_alloc_reply(uint8_t ssap,
- uint8_t r_addr[MAC_SIZE],
- int dsap,
- int response)
+static int eth_llc_ipcp_sap_alloc_reply(uint8_t ssap,
+ uint8_t * r_addr,
+ int dsap,
+ int response)
{
- int index = -1;
int ret = 0;
- int port_id = -1;
- int i;
+ int fd = -1;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(& eth_llc_data.flows_lock);
- index = sap_to_index(ssap);
- if (index < 0) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ fd = eth_llc_data.ef_to_fd[dsap];
+ if (fd < 0) {
+ pthread_rwlock_unlock(& eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("No flow found with that SAP.");
return -1; /* -EFLOWNOTFOUND */
}
- if (ipcp_flow(index)->state != FLOW_PENDING) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1; /* -EFLOWNOTPENDING */
- }
-
- port_id = ipcp_flow(index)->port_id;
-
if (response) {
- destroy_ipcp_flow(index);
+ bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
} else {
- ipcp_flow(index)->state = FLOW_ALLOCATED;
- shim_data(_ipcp)->flows[index].r_sap = dsap;
- for (i = 0; i < MAC_SIZE; i++) {
- shim_data(_ipcp)->flows[index].r_addr[i] = r_addr[i];
- }
+ eth_llc_data.fd_to_ef[fd].r_sap = ssap;
+ memcpy(eth_llc_data.fd_to_ef[fd].r_addr, r_addr, MAC_SIZE);
}
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Flow reply, port_id %d, remote SAP %d.", port_id, dsap);
+ LOG_DBG("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap);
- if ((ret = ipcp_flow_alloc_reply(getpid(),
- port_id,
- response)) < 0) {
- return -1; /* -EPIPE */
- }
+ if ((ret = ipcp_flow_alloc_reply(fd, response)) < 0)
+ return -1;
return ret;
}
-static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap,
- uint8_t r_addr[MAC_SIZE])
+static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr)
{
- int port_id = -1;
- int i = 0;
+ int fd = -1;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
- i = sap_to_index(ssap);
- if (i < 0) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ fd = eth_llc_data.ef_to_fd[ssap];
+ if (fd < 0) {
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("No flow found for remote deallocation request.");
return 0;
}
- port_id = ipcp_flow(i)->port_id;
- destroy_ipcp_flow(i);
+ bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- irm_flow_dealloc(port_id);
+ flow_dealloc(fd);
- LOG_DBG("Flow with port_id %d deallocated.", port_id);
+ LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
}
-static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,
- size_t len,
- uint8_t r_addr[MAC_SIZE])
+static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)
{
shim_eth_llc_msg_t * msg = NULL;
@@ -621,27 +440,24 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,
switch (msg->code) {
case SHIM_ETH_LLC_MSG_CODE__FLOW_REQ:
- if (ipcp_data_is_in_registry(_ipcp->data,
- msg->dst_name)) {
- eth_llc_ipcp_port_req(msg->ssap,
- r_addr,
- msg->dst_name,
- msg->src_ae_name);
+ if (ipcp_data_is_in_registry(ipcpi.data, msg->dst_name)) {
+ eth_llc_ipcp_sap_req(msg->ssap,
+ r_addr,
+ msg->dst_name,
+ msg->src_ae_name);
}
break;
case SHIM_ETH_LLC_MSG_CODE__FLOW_REPLY:
- eth_llc_ipcp_port_alloc_reply(msg->ssap,
- r_addr,
- msg->dsap,
- msg->response);
+ eth_llc_ipcp_sap_alloc_reply(msg->ssap,
+ r_addr,
+ msg->dsap,
+ msg->response);
break;
case SHIM_ETH_LLC_MSG_CODE__FLOW_DEALLOC:
- eth_llc_ipcp_flow_dealloc_req(msg->ssap,
- r_addr);
+ eth_llc_ipcp_flow_dealloc_req(msg->ssap, r_addr);
break;
default:
- LOG_ERR("Unknown message received %d.",
- msg->code);
+ LOG_ERR("Unknown message received %d.", msg->code);
shim_eth_llc_msg__free_unpacked(msg, NULL);
return -1;
}
@@ -652,15 +468,11 @@ static int eth_llc_ipcp_mgmt_frame(uint8_t * buf,
static void * eth_llc_ipcp_sdu_reader(void * o)
{
- ssize_t index;
- struct rb_entry e;
uint8_t br_addr[MAC_SIZE];
+ uint16_t length;
uint8_t dsap;
uint8_t ssap;
- int i = 0;
- struct eth_llc_frame * llc_frame;
- uint16_t size;
-
+ int fd;
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
struct pollfd pfd;
int offset = 0;
@@ -670,43 +482,43 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
uint8_t buf[ETH_FRAME_SIZE];
int frame_len = 0;
#endif
+ struct eth_llc_frame * llc_frame;
memset(br_addr, 0xff, MAC_SIZE * sizeof(uint8_t));
while (true) {
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- header = (void *) shim_data(_ipcp)->rx_ring +
- (offset * SHM_RDRB_BLOCK_SIZE);
+ header = (void *) (eth_llc_data.rx_ring +
+ offset * SHM_RDRB_BLOCK_SIZE);
while (!(header->tp_status & TP_STATUS_USER)) {
- pfd.fd = shim_data(_ipcp)->s_fd;
+ pfd.fd = eth_llc_data.s_fd;
pfd.revents = 0;
pfd.events = POLLIN | POLLRDNORM | POLLERR;
if (poll(&pfd, 1, -1) <= 0) {
- LOG_ERR("Failed to poll: %s.", strerror(errno));
+ LOG_ERR("Failed to poll.");
continue;
}
- header = (void *) shim_data(_ipcp)->rx_ring +
- (offset * SHM_RDRB_BLOCK_SIZE);
+ header = (void *) (eth_llc_data.rx_ring +
+ offset * SHM_RDRB_BLOCK_SIZE);
}
- buf = (void * ) header + header->tp_mac;
+ buf = (uint8_t * ) header + header->tp_mac;
#else
- frame_len = recv(shim_data(_ipcp)->s_fd, buf,
+ frame_len = recv(eth_llc_data.s_fd, buf,
SHIM_ETH_LLC_MAX_SDU_SIZE, 0);
if (frame_len < 0) {
- LOG_ERR("Failed to recv frame.");
+ LOG_ERR("Failed to receive frame.");
continue;
}
#endif
-
llc_frame = (struct eth_llc_frame *) buf;
#ifdef __FreeBSD__
- if (memcmp(LLADDR(&shim_data(_ipcp)->device),
+ if (memcmp(LLADDR(&eth_llc_data.device),
#else
- if (memcmp(shim_data(_ipcp)->device.sll_addr,
+ if (memcmp(eth_llc_data.device.sll_addr,
#endif
&llc_frame->dst_hwaddr,
MAC_SIZE) &&
@@ -721,46 +533,29 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
dsap = reverse_bits(llc_frame->dsap);
ssap = reverse_bits(llc_frame->ssap);
- memcpy(&size, &llc_frame->size, sizeof(size));
+ memcpy(&length, &llc_frame->length, sizeof(length));
+ length = ntohs(length) - LLC_HEADER_SIZE;
if (ssap == MGMT_SAP && dsap == MGMT_SAP) {
eth_llc_ipcp_mgmt_frame(&llc_frame->payload,
- ntohs(size),
+ length,
llc_frame->src_hwaddr);
} else {
- pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock);
-
- i = addr_and_saps_to_index(llc_frame->src_hwaddr,
- ssap,
- dsap);
- if (i < 0) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->
- flows_lock);
+ pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
+
+ fd = eth_llc_data.ef_to_fd[dsap];
+ if (fd < 0) {
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- offset = (offset + 1)
- & (SHM_BUFFER_SIZE - 1);
+ offset = (offset + 1) & (SHM_BUFFER_SIZE - 1);
header->tp_status = TP_STATUS_KERNEL;
#endif
continue;
}
- while ((index =
- shm_rdrbuff_write(shim_data(_ipcp)->rdrb,
- ipcp_flow(i)->api,
- 0,
- 0,
- &llc_frame->payload,
- ntohs(size)))
- < 0)
- ;
-
- e.index = index;
- e.port_id = ipcp_flow(i)->port_id;
-
- while (shm_ap_rbuff_write(ipcp_flow(i)->rb, &e) < 0)
- ;
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
+ flow_write(fd, &llc_frame->payload, length);
}
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
offset = (offset + 1) & (SHM_BUFFER_SIZE -1);
@@ -774,51 +569,34 @@ static void * eth_llc_ipcp_sdu_reader(void * o)
static void * eth_llc_ipcp_sdu_writer(void * o)
{
while (true) {
- struct rb_entry * e;
- int i;
- int len = 0;
- uint8_t * buf;
+ int fd;
+ struct shm_du_buff * sdb;
uint8_t ssap;
uint8_t dsap;
+ uint8_t r_addr[MAC_SIZE];
- e = shm_ap_rbuff_read(shim_data(_ipcp)->rb);
- if (e == NULL)
- continue;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- len = shm_rdrbuff_read((uint8_t **) &buf,
- shim_data(_ipcp)->rdrb,
- e->index);
- if (len <= 0) {
- free(e);
- LOG_ERR("Length of du map read was %d.", len);
- continue;
- }
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&shim_data(_ipcp)->flows_lock);
-
- i = port_id_to_index(e->port_id);
- if (i < 0) {
- free(e);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
+ fd = ipcp_flow_read(&sdb);
+ if (fd < 0) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
continue;
}
- ssap = reverse_bits(shim_data(_ipcp)->flows[i].sap);
- dsap = reverse_bits(shim_data(_ipcp)->flows[i].r_sap);
-
- eth_llc_ipcp_send_frame(shim_data(_ipcp)->flows[i].r_addr,
- dsap, ssap, buf, len);
-
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
+ pthread_rwlock_rdlock(&eth_llc_data.flows_lock);
- if (shim_data(_ipcp)->rdrb != NULL)
- shm_rdrbuff_remove(shim_data(_ipcp)->rdrb, e->index);
+ ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap);
+ dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap);
+ memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- free(e);
+ eth_llc_ipcp_send_frame(r_addr, dsap, ssap,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb)
+ - shm_du_buff_head(sdb));
+ ipcp_flow_del(sdb);
}
return (void *) 1;
@@ -826,10 +604,6 @@ static void * eth_llc_ipcp_sdu_writer(void * o)
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
- sigset_t sigset;
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGINT);
-
switch(sig) {
case SIGINT:
case SIGTERM:
@@ -838,12 +612,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
LOG_DBG("IPCP %d terminating by order of %d. Bye.",
getpid(), info->si_pid);
- pthread_rwlock_wrlock(&_ipcp->state_lock);
-
- ipcp_set_state(_ipcp, IPCP_SHUTDOWN);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ ipcp_set_state(IPCP_SHUTDOWN);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
default:
return;
@@ -852,7 +625,7 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
{
- int fd = -1;
+ int skfd = -1;
struct ifreq ifr;
int idx;
#ifdef __FreeBSD__
@@ -882,17 +655,10 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
memset(&ifr, 0, sizeof(ifr));
- fd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (fd < 0) {
- LOG_ERR("Failed to open socket.");
- return -1;
- }
-
memcpy(ifr.ifr_name, conf->if_name, strlen(conf->if_name));
#ifdef __FreeBSD__
if (getifaddrs(&ifaddr) < 0) {
- close(fd);
LOG_ERR("Could not get interfaces.");
return -1;
}
@@ -900,8 +666,7 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
for (ifa = ifaddr, idx = 0; ifa != NULL; ifa = ifa->ifa_next, ++idx) {
if (strcmp(ifa->ifa_name, conf->if_name))
continue;
- LOG_DBGF("Interface %s found.", conf->if_name);
-
+ LOG_DBG("Interface %s found.", conf->if_name);
memcpy(&ifr.ifr_addr, ifa->ifa_addr, sizeof(*ifa->ifa_addr));
break;
}
@@ -913,30 +678,33 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
}
freeifaddrs(ifaddr);
-
#else
- if (ioctl(fd, SIOCGIFHWADDR, &ifr)) {
- close(fd);
- LOG_ERR("Failed to ioctl: %s.", strerror(errno));
+ skfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (skfd < 0) {
+ LOG_ERR("Failed to open socket.");
+ return -1;
+ }
+
+ if (ioctl(skfd, SIOCGIFHWADDR, &ifr)) {
+ LOG_ERR("Failed to ioctl.");
+ close(skfd);
return -1;
}
+ close(skfd);
+
idx = if_nametoindex(conf->if_name);
if (idx == 0) {
LOG_ERR("Failed to retrieve interface index.");
+ close(skfd);
return -1;
}
#endif
-
- close(fd);
-
memset(&(device), 0, sizeof(device));
#ifdef __FreeBSD__
device.sdl_index = idx;
device.sdl_family = AF_LINK;
- memcpy(LLADDR(&device),
- ifr.ifr_addr.sa_data,
- MAC_SIZE * sizeof (uint8_t));
+ memcpy(LLADDR(&device), ifr.ifr_addr.sa_data, MAC_SIZE);
device.sdl_alen = MAC_SIZE;
/* TODO: replace socket calls with bpf for BSD */
LOG_MISSING;
@@ -944,23 +712,21 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
#else
device.sll_ifindex = idx;
device.sll_family = AF_PACKET;
- memcpy(device.sll_addr,
- ifr.ifr_hwaddr.sa_data,
- MAC_SIZE * sizeof (uint8_t));
+ memcpy(device.sll_addr, ifr.ifr_hwaddr.sa_data, MAC_SIZE);
device.sll_halen = MAC_SIZE;
device.sll_protocol = htons(ETH_P_ALL);
- fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_802_2));
+ skfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_802_2));
#endif
- if (fd < 0) {
- LOG_ERR("Failed to create socket: %s.", strerror(errno));
+ if (skfd < 0) {
+ LOG_ERR("Failed to create socket.");
return -1;
}
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
if (SHIM_ETH_LLC_MAX_SDU_SIZE > SHM_RDRB_BLOCK_SIZE) {
LOG_ERR("Max SDU size is bigger than DU map block size.");
- close(fd);
+ close(skfd);
return -1;
}
@@ -969,68 +735,68 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
req.tp_block_nr = SHM_BUFFER_SIZE;
req.tp_frame_nr = SHM_BUFFER_SIZE;
- if (setsockopt(fd, SOL_PACKET, PACKET_RX_RING,
+ if (setsockopt(skfd, SOL_PACKET, PACKET_RX_RING,
(void *) &req, sizeof(req))) {
LOG_ERR("Failed to set sockopt PACKET_RX_RING");
- close(fd);
+ close(skfd);
return -1;
}
- if (setsockopt(fd, SOL_PACKET, PACKET_TX_RING,
+ if (setsockopt(skfd, SOL_PACKET, PACKET_TX_RING,
(void *) &req, sizeof(req))) {
LOG_ERR("Failed to set sockopt PACKET_TX_RING");
- close(fd);
+ close(skfd);
return -1;
}
#endif
-
- if (bind(fd,(struct sockaddr *) &device, sizeof(device))) {
+ if (bind(skfd, (struct sockaddr *) &device, sizeof(device))) {
LOG_ERR("Failed to bind socket to interface");
- close(fd);
+ close(skfd);
return -1;
}
#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
- shim_data(_ipcp)->rx_ring = mmap(NULL,
- 2 * SHM_RDRB_BLOCK_SIZE
- * SHM_BUFFER_SIZE,
- PROT_READ | PROT_WRITE, MAP_SHARED,
- fd, 0);
- if (shim_data(_ipcp)->rx_ring == NULL) {
+ eth_llc_data.rx_ring = mmap(NULL, 2 * SHM_RDRB_BLOCK_SIZE
+ * SHM_BUFFER_SIZE,
+ PROT_READ | PROT_WRITE, MAP_SHARED,
+ skfd, 0);
+ if (eth_llc_data.rx_ring == NULL) {
LOG_ERR("Failed to mmap");
- close(fd);
+ close(skfd);
return -1;
}
- shim_data(_ipcp)->tx_ring = shim_data(_ipcp)->rx_ring
- + (SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE);
+ eth_llc_data.tx_ring = eth_llc_data.rx_ring
+ + SHM_RDRB_BLOCK_SIZE * SHM_BUFFER_SIZE;
#endif
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_INIT) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_INIT) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("IPCP in wrong state.");
- close(fd);
+ close(skfd);
return -1;
}
- shim_data(_ipcp)->s_fd = fd;
- shim_data(_ipcp)->device = device;
- shim_data(_ipcp)->tx_offset = 0;
+ eth_llc_data.s_fd = skfd;
+ eth_llc_data.device = device;
+#if defined(PACKET_RX_RING) && defined(PACKET_TX_RING)
+ eth_llc_data.tx_offset = 0;
+#endif
- ipcp_set_state(_ipcp, IPCP_ENROLLED);
+ ipcp_set_state(IPCP_ENROLLED);
- pthread_create(&shim_data(_ipcp)->sdu_reader,
+ pthread_create(&eth_llc_data.sdu_reader,
NULL,
eth_llc_ipcp_sdu_reader,
NULL);
- pthread_create(&shim_data(_ipcp)->sdu_writer,
+ pthread_create(&eth_llc_data.sdu_writer,
NULL,
eth_llc_ipcp_sdu_writer,
NULL);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Bootstrapped shim IPCP over Ethernet with LLC with api %d.",
getpid());
@@ -1040,15 +806,15 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)
static int eth_llc_ipcp_name_reg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Registered %s.", name);
@@ -1057,25 +823,22 @@ static int eth_llc_ipcp_name_reg(char * name)
static int eth_llc_ipcp_name_unreg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- ipcp_data_del_reg_entry(_ipcp->data, name);
+ ipcp_data_del_reg_entry(ipcpi.data, name);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
-static int eth_llc_ipcp_flow_alloc(pid_t n_api,
- int port_id,
+static int eth_llc_ipcp_flow_alloc(int fd,
char * dst_name,
char * src_ae_name,
enum qos_cube qos)
{
- struct shm_ap_rbuff * rb;
uint8_t ssap = 0;
uint8_t r_addr[MAC_SIZE];
- int index = 0;
LOG_INFO("Allocating flow to %s.", dst_name);
@@ -1083,182 +846,122 @@ static int eth_llc_ipcp_flow_alloc(pid_t n_api,
return -1;
if (qos != QOS_CUBE_BE)
- LOG_DBGF("QoS requested. Ethernet LLC can't do that. For now.");
-
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL)
- return -1; /* -ENORBUFF */
+ LOG_DBG("QoS requested. Ethernet LLC can't do that. For now.");
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- shm_ap_rbuff_close(rb);
- LOG_DBGF("Won't allocate flow with non-enrolled IPCP.");
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_DBG("Won't allocate flow with non-enrolled IPCP.");
return -1; /* -ENOTENROLLED */
}
- index = bmp_allocate(shim_data(_ipcp)->indices);
- if (index < 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- shm_ap_rbuff_close(rb);
- return -1;
- }
-
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
- ssap = bmp_allocate(shim_data(_ipcp)->saps);
+ ssap = bmp_allocate(eth_llc_data.saps);
if (ssap < 0) {
- shm_ap_rbuff_close(rb);
- bmp_release(shim_data(_ipcp)->indices, index);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- ipcp_flow(index)->port_id = port_id;
- ipcp_flow(index)->state = FLOW_PENDING;
- ipcp_flow(index)->rb = rb;
- shim_data(_ipcp)->flows[index].sap = ssap;
-
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- memset(r_addr, 0xff, MAC_SIZE * sizeof(uint8_t));
-
- if (eth_llc_ipcp_port_alloc(r_addr, ssap,
- dst_name,
- src_ae_name) < 0) {
- LOG_DBGF("Port alloc returned -1.");
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
- destroy_ipcp_flow(index);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ eth_llc_data.fd_to_ef[fd].sap = ssap;
+ eth_llc_data.ef_to_fd[ssap] = fd;
+
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ memset(r_addr, 0xff, MAC_SIZE);
+
+ if (eth_llc_ipcp_sap_alloc(r_addr, ssap, dst_name, src_ae_name) < 0) {
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
+ bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
+ eth_llc_data.fd_to_ef[fd].sap = -1;
+ eth_llc_data.ef_to_fd[ssap] = -1;
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- LOG_DBG("Pending flow with port_id %d on SAP %d.",
- port_id, ssap);
+ LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap);
- return index;
+ return 0;
}
-static int eth_llc_ipcp_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
+static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)
{
- struct shm_ap_rbuff * rb;
- int index = -1;
uint8_t ssap = 0;
+ uint8_t r_sap = 0;
+ uint8_t r_addr[MAC_SIZE];
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
-
- index = port_id_to_index(port_id);
- if (index < 0) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Could not find flow with port_id %d.", port_id);
- return -1;
- }
-
- if (ipcp_flow(index)->state != FLOW_PENDING) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Flow was not pending.");
- return -1;
- }
-
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL) {
- LOG_ERR("Could not open N + 1 ringbuffer.");
- ipcp_flow(index)->state = FLOW_NULL;
- ipcp_flow(index)->port_id = -1;
- bmp_release(shim_data(_ipcp)->indices, index);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1;
- }
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
- ssap = bmp_allocate(shim_data(_ipcp)->saps);
+ ssap = bmp_allocate(eth_llc_data.saps);
if (ssap < 0) {
- ipcp_flow(index)->state = FLOW_NULL;
- ipcp_flow(index)->port_id = -1;
- shm_ap_rbuff_close(ipcp_flow(index)->rb);
- bmp_release(shim_data(_ipcp)->indices, index);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- ipcp_flow(index)->state = FLOW_ALLOCATED;
- ipcp_flow(index)->rb = rb;
- shim_data(_ipcp)->flows[index].sap = ssap;
-
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ eth_llc_data.fd_to_ef[fd].sap = ssap;
+ memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
+ r_sap = eth_llc_data.fd_to_ef[fd].r_sap;
+ eth_llc_data.ef_to_fd[ssap] = fd;
- if (eth_llc_ipcp_port_alloc_resp(shim_data(_ipcp)->flows[index].r_addr,
- shim_data(_ipcp)->flows[index].r_sap,
- ssap,
- response) < 0) {
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
- destroy_ipcp_flow(index);
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBGF("Could not send response.");
+ if (eth_llc_ipcp_sap_alloc_resp(r_addr, ssap, r_sap, response) < 0) {
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
+ bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- LOG_DBG("Accepted flow, port_id %d, SAP %d.", port_id, ssap);
+ LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap);
return 0;
}
-static int eth_llc_ipcp_flow_dealloc(int port_id)
+static int eth_llc_ipcp_flow_dealloc(int fd)
{
- int index = -1;
uint8_t sap;
uint8_t addr[MAC_SIZE];
- int i;
int ret;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&shim_data(_ipcp)->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&eth_llc_data.flows_lock);
- index = port_id_to_index(port_id);
- if (index < 0) {
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return 0;
- }
+ sap = eth_llc_data.fd_to_ef[fd].r_sap;
+ memcpy(addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE);
- sap = shim_data(_ipcp)->flows[index].r_sap;
- for (i = 0; i < MAC_SIZE; i++) {
- addr[i] = shim_data(_ipcp)->flows[index].r_addr[i];
- }
+ bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap);
+ eth_llc_data.fd_to_ef[fd].sap = -1;
+ eth_llc_data.fd_to_ef[fd].r_sap = -1;
+ memset(&eth_llc_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE);
- destroy_ipcp_flow(index);
+ eth_llc_data.ef_to_fd[sap] = -1;
- pthread_rwlock_unlock(&shim_data(_ipcp)->flows_lock);
+ pthread_rwlock_unlock(&eth_llc_data.flows_lock);
- ret = eth_llc_ipcp_port_dealloc(addr, sap);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ ret = eth_llc_ipcp_sap_dealloc(addr, sap);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (ret < 0)
- LOG_DBGF("Could not notify remote.");
+ LOG_DBG("Could not notify remote.");
- LOG_DBG("Flow with port_id %d deallocated.", port_id);
+ LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
}
static struct ipcp_ops eth_llc_ops = {
.ipcp_bootstrap = eth_llc_ipcp_bootstrap,
- .ipcp_enroll = NULL, /* shim */
+ .ipcp_enroll = NULL,
.ipcp_name_reg = eth_llc_ipcp_name_reg,
.ipcp_name_unreg = eth_llc_ipcp_name_unreg,
.ipcp_flow_alloc = eth_llc_ipcp_flow_alloc,
@@ -1270,7 +973,6 @@ int main(int argc, char * argv[])
{
struct sigaction sig_act;
sigset_t sigset;
- int i = 0;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
@@ -1283,6 +985,14 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
+ if (eth_llc_data_init() < 0)
+ exit(EXIT_FAILURE);
+
+ if (ap_init(NULL) < 0) {
+ close_logfile();
+ exit(EXIT_FAILURE);
+ }
+
/* store the process id of the irmd */
irmd_api = atoi(argv[1]);
@@ -1298,35 +1008,13 @@ int main(int argc, char * argv[])
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- _ipcp = ipcp_instance_create();
- if (_ipcp == NULL) {
- LOG_ERR("Failed to create instance.");
- close_logfile();
- exit(EXIT_FAILURE);
- }
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
- _ipcp->data = (struct ipcp_data *) eth_llc_ipcp_data_create();
- if (_ipcp->data == NULL) {
- LOG_ERR("Failed to create instance data.");
- free(_ipcp);
+ if (ipcp_init(THIS_TYPE, &eth_llc_ops) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
- for (i = 0; i < AP_MAX_FLOWS; i++) {
- ipcp_flow(i)->rb = NULL;
- ipcp_flow(i)->port_id = -1;
- ipcp_flow(i)->state = FLOW_NULL;
- }
-
- _ipcp->ops = &eth_llc_ops;
- _ipcp->state = IPCP_INIT;
-
- pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
- pthread_create(&shim_data(_ipcp)->mainloop, NULL,
- ipcp_main_loop, _ipcp);
-
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
if (ipcp_create_r(getpid())) {
@@ -1335,17 +1023,17 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- pthread_join(shim_data(_ipcp)->mainloop, NULL);
+ ipcp_fini();
- pthread_cancel(shim_data(_ipcp)->sdu_reader);
- pthread_cancel(shim_data(_ipcp)->sdu_writer);
+ pthread_cancel(eth_llc_data.sdu_reader);
+ pthread_cancel(eth_llc_data.sdu_writer);
- pthread_join(shim_data(_ipcp)->sdu_writer, NULL);
- pthread_join(shim_data(_ipcp)->sdu_reader, NULL);
+ pthread_join(eth_llc_data.sdu_writer, NULL);
+ pthread_join(eth_llc_data.sdu_reader, NULL);
- eth_llc_ipcp_data_destroy();
+ ap_fini();
- free(_ipcp);
+ eth_llc_data_fini();
close_logfile();
diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c
index c35bd244..8c31e11a 100644
--- a/src/ipcpd/shim-udp/main.c
+++ b/src/ipcpd/shim-udp/main.c
@@ -22,18 +22,11 @@
#include <ouroboros/config.h>
#include "ipcp.h"
-#include "flow.h"
#include "shim_udp_config.h"
-#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/irm_config.h>
-#include <ouroboros/sockets.h>
-#include <ouroboros/bitmap.h>
-#include <ouroboros/flow.h>
#include <ouroboros/dev.h>
+#include <ouroboros/ipcp-dev.h>
#define OUROBOROS_PREFIX "ipcpd/shim-udp"
@@ -63,268 +56,93 @@ typedef ShimUdpMsg shim_udp_msg_t;
#define DNS_TTL 86400
#define FD_UPDATE_TIMEOUT 100 /* microseconds */
-#define shim_data(type) ((struct ipcp_udp_data *) type->data)
+#define local_ip (udp_data.s_saddr.sin_addr.s_addr)
-#define local_ip (((struct ipcp_udp_data *) \
- _ipcp->data)->s_saddr.sin_addr.s_addr)
+#define UDP_MAX_PORTS 0xFFFF
/* global for trapping signal */
int irmd_api;
-/* this IPCP's data */
-#ifdef MAKE_CHECK
-extern struct ipcp * _ipcp; /* defined in test */
-#else
-struct ipcp * _ipcp;
-#endif
-
-/*
- * copied from ouroboros/dev. The shim needs access to the internals
- * because it doesn't follow all steps necessary steps to get
- * the info
- */
-
-/* the shim needs access to these internals */
-struct shim_ap_data {
- pid_t api;
- struct shm_rdrbuff * rdrb;
- struct bmp * fds;
- struct shm_ap_rbuff * rb;
-
- struct flow flows[AP_MAX_FLOWS];
- pthread_rwlock_t flows_lock;
-
- pthread_t mainloop;
- pthread_t sduloop;
- pthread_t handler;
- pthread_t sdu_reader;
-
- bool fd_set_mod;
- pthread_cond_t fd_set_cond;
- pthread_mutex_t fd_set_lock;
-} * _ap_instance;
-
-static int shim_ap_init()
-{
- int i;
-
- _ap_instance = malloc(sizeof(struct shim_ap_data));
- if (_ap_instance == NULL) {
- return -1;
- }
-
- _ap_instance->api = getpid();
-
- _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (_ap_instance->fds == NULL) {
- free(_ap_instance);
- return -1;
- }
-
- _ap_instance->rdrb = shm_rdrbuff_open();
- if (_ap_instance->rdrb == NULL) {
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
- return -1;
- }
-
- _ap_instance->rb = shm_ap_rbuff_create_n();
- if (_ap_instance->rb == NULL) {
- shm_rdrbuff_close(_ap_instance->rdrb);
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
- return -1;
- }
-
- for (i = 0; i < AP_MAX_FLOWS; i ++) {
- _ap_instance->flows[i].rb = NULL;
- _ap_instance->flows[i].port_id = -1;
- _ap_instance->flows[i].state = FLOW_NULL;
- }
-
- pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
- pthread_cond_init(&_ap_instance->fd_set_cond, NULL);
- pthread_mutex_init(&_ap_instance->fd_set_lock, NULL);
-
- return 0;
-}
-
-void shim_ap_fini()
-{
- int i = 0;
-
- if (_ap_instance == NULL)
- return;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- if (_ipcp->state != IPCP_SHUTDOWN)
- LOG_WARN("Cleaning up AP while not in shutdown.");
-
- if (_ap_instance->fds != NULL)
- bmp_destroy(_ap_instance->fds);
-
- /* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
- shm_rdrbuff_remove(_ap_instance->rdrb, i);
-
- if (_ap_instance->rdrb != NULL)
- shm_rdrbuff_close(_ap_instance->rdrb);
- if (_ap_instance->rb != NULL)
- shm_ap_rbuff_destroy(_ap_instance->rb);
+struct uf {
+ int udp;
+ int skfd;
+};
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+struct {
+ uint32_t ip_addr;
+ uint32_t dns_addr;
+ /* listen server */
+ struct sockaddr_in s_saddr;
+ int s_fd;
- for (i = 0; i < AP_MAX_FLOWS; i ++)
- if (_ap_instance->flows[i].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ fd_set flow_fd_s;
+ /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */
+ int uf_to_fd[FD_SETSIZE];
+ struct uf fd_to_uf[IRMD_MAX_FLOWS];
+ pthread_rwlock_t flows_lock;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_t sduloop;
+ pthread_t handler;
+ pthread_t sdu_reader;
- free(_ap_instance);
-}
+ bool fd_set_mod;
+ pthread_cond_t fd_set_cond;
+ pthread_mutex_t fd_set_lock;
+} udp_data;
-/* only call this under flows_lock */
-static int port_id_to_fd(int port_id)
+static void udp_data_init()
{
int i;
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (_ap_instance->flows[i].port_id == port_id
- && _ap_instance->flows[i].state != FLOW_NULL)
- return i;
- }
+ for (i = 0; i < FD_SETSIZE; ++i)
+ udp_data.uf_to_fd[i] = -1;
- return -1;
-}
-
-static ssize_t ipcp_udp_flow_write(int fd, void * buf, size_t count)
-{
- ssize_t index;
- struct rb_entry e;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
-
- index = shm_rdrbuff_write_b(_ap_instance->rdrb,
- _ap_instance->flows[fd].api,
- 0,
- 0,
- (uint8_t *) buf,
- count);
- if (index < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1;
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ udp_data.fd_to_uf[i].skfd = -1;
+ udp_data.fd_to_uf[i].udp = -1;
}
- e.index = index;
- e.port_id = _ap_instance->flows[fd].port_id;
-
- shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e);
+ FD_ZERO(&udp_data.flow_fd_s);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- return 0;
+ pthread_rwlock_init(&udp_data.flows_lock, NULL);
+ pthread_cond_init(&udp_data.fd_set_cond, NULL);
+ pthread_mutex_init(&udp_data.fd_set_lock, NULL);
}
-/*
- * end copy from dev.c
- */
-
-/* only call this under flows_lock */
-static int udp_port_to_fd(int udp_port)
+static void udp_data_fini()
{
- int i;
-
- struct sockaddr_in f_saddr;
- socklen_t len = sizeof(f_saddr);
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (_ap_instance->flows[i].state == FLOW_NULL)
- continue;
-
- if (getsockname(i, (struct sockaddr *) &f_saddr, &len) < 0)
- continue;
-
- if (f_saddr.sin_port == udp_port)
- return i;
- }
-
- return -1;
-}
-
-struct ipcp_udp_data {
- /* keep ipcp_data first for polymorphism */
- struct ipcp_data ipcp_data;
-
- uint32_t ip_addr;
- uint32_t dns_addr;
- /* listen server */
- struct sockaddr_in s_saddr;
- int s_fd;
-
- /* only modify under _ap_instance->flows_lock */
- fd_set flow_fd_s;
-};
-
-struct ipcp_udp_data * ipcp_udp_data_create()
-{
- struct ipcp_udp_data * udp_data;
- struct ipcp_data * data;
- enum ipcp_type ipcp_type;
-
- udp_data = malloc(sizeof(*udp_data));
- if (udp_data == NULL) {
- LOG_ERR("Failed to allocate.");
- return NULL;
- }
-
- ipcp_type = THIS_TYPE;
- data = (struct ipcp_data *) udp_data;
- if (ipcp_data_init(data, ipcp_type) == NULL) {
- free(udp_data);
- return NULL;
- }
-
- FD_ZERO(&udp_data->flow_fd_s);
-
- return udp_data;
+ pthread_rwlock_destroy(&udp_data.flows_lock);
+ pthread_mutex_destroy(&udp_data.fd_set_lock);
+ pthread_cond_destroy(&udp_data.fd_set_cond);
}
static void set_fd(int fd)
{
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
+ pthread_mutex_lock(&udp_data.fd_set_lock);
- _ap_instance->fd_set_mod = true;
- FD_SET(fd, &shim_data(_ipcp)->flow_fd_s);
+ udp_data.fd_set_mod = true;
+ FD_SET(fd, &udp_data.flow_fd_s);
- while (_ap_instance->fd_set_mod)
- pthread_cond_wait(&_ap_instance->fd_set_cond,
- &_ap_instance->fd_set_lock);
+ while (udp_data.fd_set_mod)
+ pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock);
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+ pthread_mutex_unlock(&udp_data.fd_set_lock);
}
static void clr_fd(int fd)
{
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
+ pthread_mutex_lock(&udp_data.fd_set_lock);
- _ap_instance->fd_set_mod = true;
- FD_CLR(fd, &shim_data(_ipcp)->flow_fd_s);
+ udp_data.fd_set_mod = true;
+ FD_CLR(fd, &udp_data.flow_fd_s);
- while (_ap_instance->fd_set_mod)
- pthread_cond_wait(&_ap_instance->fd_set_cond,
- &_ap_instance->fd_set_lock);
+ while (udp_data.fd_set_mod)
+ pthread_cond_wait(&udp_data.fd_set_cond, &udp_data.fd_set_lock);
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
+ pthread_mutex_unlock(&udp_data.fd_set_lock);
}
-
-static int send_shim_udp_msg(shim_udp_msg_t * msg,
- uint32_t dst_ip_addr)
+static int send_shim_udp_msg(shim_udp_msg_t * msg, uint32_t dst_ip_addr)
{
buffer_t buf;
struct sockaddr_in r_saddr;
@@ -340,13 +158,12 @@ static int send_shim_udp_msg(shim_udp_msg_t * msg,
}
buf.data = malloc(SHIM_UDP_MSG_SIZE);
- if (buf.data == NULL) {
+ if (buf.data == NULL)
return -1;
- }
shim_udp_msg__pack(msg, buf.data);
- if (sendto(shim_data(_ipcp)->s_fd,
+ if (sendto(udp_data.s_fd,
buf.data,
buf.len,
0,
@@ -409,8 +226,8 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
char * dst_name,
char * src_ae_name)
{
- int fd;
- int port_id;
+ int skfd;
+ int fd;
struct sockaddr_in f_saddr;
socklen_t f_saddr_len = sizeof(f_saddr);
@@ -418,7 +235,7 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
LOG_DBG("Port request arrived from UDP port %d",
ntohs(c_saddr->sin_port));
- if ((fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ if ((skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
LOG_ERR("Could not create UDP socket.");
return -1;
}
@@ -426,73 +243,72 @@ static int ipcp_udp_port_req(struct sockaddr_in * c_saddr,
memset((char *) &f_saddr, 0, sizeof(f_saddr));
f_saddr.sin_family = AF_INET;
f_saddr.sin_addr.s_addr = local_ip;
-
- /*
- * FIXME: we could have a port dedicated per registered AP
- * Not that critical for UDP, but will be for LLC
- */
-
f_saddr.sin_port = 0;
- if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {
+ if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {
LOG_ERR("Could not bind to socket.");
- close(fd);
+ close(skfd);
return -1;
}
- if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {
+ if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {
LOG_ERR("Could not get address from fd.");
return -1;
}
- /*
- * store the remote address in the file descriptor
- * this avoids having to store the sockaddr_in in
- * the flow structure
- */
-
- if (connect(fd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) {
+ /* connect stores the remote address in the file descriptor */
+ if (connect(skfd, (struct sockaddr *) c_saddr, sizeof(*c_saddr)) < 0) {
LOG_ERR("Could not connect to remote UDP client.");
- close(fd);
+ close(skfd);
return -1;
}
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
/* reply to IRM */
- port_id = ipcp_flow_req_arr(getpid(),
- dst_name,
- src_ae_name);
-
- if (port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_ERR("Could not get port id from IRMd");
- close(fd);
+ fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
+ if (fd < 0) {
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Could not get new flow from IRMd.");
+ close(skfd);
return -1;
}
- _ap_instance->flows[fd].port_id = port_id;
- _ap_instance->flows[fd].rb = NULL;
- _ap_instance->flows[fd].state = FLOW_PENDING;
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
+
+ udp_data.uf_to_fd[skfd] = fd;
+ udp_data.fd_to_uf[fd].skfd = skfd;
+ udp_data.fd_to_uf[fd].udp = f_saddr.sin_port;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Pending allocation request, port_id %d, UDP port (%d, %d).",
- port_id, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port));
+ LOG_DBG("Pending allocation request, fd %d, UDP port (%d, %d).",
+ fd, ntohs(f_saddr.sin_port), ntohs(c_saddr->sin_port));
return 0;
}
+/* returns the n flow descriptor */
+static int udp_port_to_fd(int udp_port)
+{
+ int i;
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ if (udp_data.fd_to_uf[i].udp == udp_port)
+ return i;
+
+ return -1;
+}
+
static int ipcp_udp_port_alloc_reply(int src_udp_port,
int dst_udp_port,
int response)
{
- int fd = -1;
- int ret = 0;
- int port_id = -1;
+ int fd = -1;
+ int ret = 0;
+ int skfd = -1;
struct sockaddr_in t_saddr;
socklen_t t_saddr_len = sizeof(t_saddr);
@@ -500,117 +316,82 @@ static int ipcp_udp_port_alloc_reply(int src_udp_port,
LOG_DBG("Received reply for flow on udp port %d.",
ntohs(dst_udp_port));
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
fd = udp_port_to_fd(dst_udp_port);
- if (fd == -1) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Unknown flow on UDP port %d.", ntohs(dst_udp_port));
- return -1; /* -EUNKNOWNFLOW */
- }
+ skfd = udp_data.fd_to_uf[fd].skfd;
- if (_ap_instance->flows[fd].state != FLOW_PENDING) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Flow on UDP port %d not pending.",
- ntohs(dst_udp_port));
- return -1; /* -EFLOWNOTPENDING */
- }
-
- port_id = _ap_instance->flows[fd].port_id;
-
- if (response) {
- _ap_instance->flows[fd].port_id = -1;
- _ap_instance->flows[fd].rb = NULL;
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
- _ap_instance->flows[fd].state = FLOW_NULL;
- } else {
- /* get the original address with the LISTEN PORT */
- if (getpeername(fd,
- (struct sockaddr *) &t_saddr,
- &t_saddr_len) < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Flow with port_id %d has no peer.", port_id);
- return -1;
- }
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- /* connect to the flow udp port */
- t_saddr.sin_port = src_udp_port;
+ /* get the original address with the LISTEN PORT */
+ if (getpeername(skfd, (struct sockaddr *) &t_saddr, &t_saddr_len) < 0) {
+ LOG_DBG("Flow with fd %d has no peer.", fd);
+ return -1;
+ }
- if (connect(fd,
- (struct sockaddr *) &t_saddr,
- sizeof(t_saddr)) < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- close(fd);
- return -1;
- }
+ /* connect to the flow udp port */
+ t_saddr.sin_port = src_udp_port;
- _ap_instance->flows[fd].state = FLOW_ALLOCATED;
+ if (connect(skfd, (struct sockaddr *) &t_saddr, sizeof(t_saddr)) < 0) {
+ close(skfd);
+ return -1;
}
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
+ set_fd(skfd);
- if ((ret = ipcp_flow_alloc_reply(getpid(),
- port_id,
- response)) < 0) {
- return -1; /* -EPIPE */
- }
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ if (ipcp_flow_alloc_reply(fd, response) < 0)
+ return -1;
- LOG_INFO("Flow allocation completed, UDP ports: (%d, %d).",
+ LOG_DBG("Flow allocation completed, UDP ports: (%d, %d).",
ntohs(dst_udp_port), ntohs(src_udp_port));
return ret;
-
}
static int ipcp_udp_flow_dealloc_req(int udp_port)
{
- int fd = -1;
- int port_id = -1;
-
- struct shm_ap_rbuff * rb;
+ int skfd = -1;
+ int fd = -1;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
fd = udp_port_to_fd(udp_port);
if (fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Could not find flow on UDP port %d.",
ntohs(udp_port));
return 0;
}
- clr_fd(fd);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ skfd = udp_data.fd_to_uf[fd].skfd;
- _ap_instance->flows[fd].state = FLOW_NULL;
- port_id = _ap_instance->flows[fd].port_id;
- _ap_instance->flows[fd].port_id = -1;
- rb = _ap_instance->flows[fd].rb;
- _ap_instance->flows[fd].rb = NULL;
+ udp_data.uf_to_fd[skfd] = -1;
+ udp_data.fd_to_uf[fd].udp = -1;
+ udp_data.fd_to_uf[fd].skfd = -1;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- if (rb != NULL)
- shm_ap_rbuff_close(rb);
+ clr_fd(skfd);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- irm_flow_dealloc(port_id);
+ flow_dealloc(fd);
- close(fd);
+ close(skfd);
- LOG_DBG("Flow with port_id %d deallocated.", port_id);
+ LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
}
@@ -619,39 +400,28 @@ static void * ipcp_udp_listener()
{
uint8_t buf[SHIM_UDP_MSG_SIZE];
int n = 0;
-
struct sockaddr_in c_saddr;
+ int sfd = udp_data.s_fd;
while (true) {
- int sfd = 0;
shim_udp_msg_t * msg = NULL;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- sfd = shim_data(_ipcp)->s_fd;
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
memset(&buf, 0, SHIM_UDP_MSG_SIZE);
n = sizeof(c_saddr);
n = recvfrom(sfd, buf, SHIM_UDP_MSG_SIZE, 0,
(struct sockaddr *) &c_saddr, (unsigned *) &n);
-
- if (n < 0) {
+ if (n < 0)
continue;
- }
/* flow alloc request from other host */
if (gethostbyaddr((const char *) &c_saddr.sin_addr.s_addr,
sizeof(c_saddr.sin_addr.s_addr), AF_INET)
- == NULL) {
+ == NULL)
continue;
- }
msg = shim_udp_msg__unpack(NULL, n, buf);
- if (msg == NULL) {
+ if (msg == NULL)
continue;
- }
switch (msg->code) {
case SHIM_UDP_MSG_CODE__FLOW_REQ:
@@ -685,103 +455,80 @@ static void * ipcp_udp_listener()
static void * ipcp_udp_sdu_reader()
{
int n;
+ int skfd;
int fd;
+ /* FIXME: avoid this copy */
char buf[SHIM_UDP_MAX_SDU_SIZE];
struct sockaddr_in r_saddr;
fd_set read_fds;
int flags;
+ struct timeval tv = {0, FD_UPDATE_TIMEOUT};
while (true) {
- struct timeval tv = {0, FD_UPDATE_TIMEOUT};
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
- pthread_mutex_lock(&_ap_instance->fd_set_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
+ pthread_mutex_lock(&udp_data.fd_set_lock);
- read_fds = shim_data(_ipcp)->flow_fd_s;
- _ap_instance->fd_set_mod = false;
- pthread_cond_broadcast(&_ap_instance->fd_set_cond);
+ read_fds = udp_data.flow_fd_s;
+ udp_data.fd_set_mod = false;
+ pthread_cond_broadcast(&udp_data.fd_set_cond);
- pthread_mutex_unlock(&_ap_instance->fd_set_lock);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_mutex_unlock(&udp_data.fd_set_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0) {
+ if (select(FD_SETSIZE, &read_fds, NULL, NULL, &tv) <= 0)
continue;
- }
- for (fd = 0; fd < FD_SETSIZE; ++fd) {
- if (!FD_ISSET(fd, &read_fds))
+ for (skfd = 0; skfd < FD_SETSIZE; ++skfd) {
+ if (!FD_ISSET(skfd, &read_fds))
continue;
- flags = fcntl(fd, F_GETFL, 0);
- fcntl(fd, F_SETFL, flags | O_NONBLOCK);
-
+ flags = fcntl(skfd, F_GETFL, 0);
+ fcntl(skfd, F_SETFL, flags | O_NONBLOCK);
+ fd = udp_data.uf_to_fd[skfd];
n = sizeof(r_saddr);
- if ((n = recvfrom(fd,
- buf,
+ if ((n = recvfrom(skfd,
+ &buf,
SHIM_UDP_MAX_SDU_SIZE,
0,
(struct sockaddr *) &r_saddr,
(unsigned *) &n)) <= 0)
continue;
- /* send the sdu to the correct port_id */
- ipcp_udp_flow_write(fd, buf, n);
+ /* send the sdu to the correct fd */
+ flow_write(fd, buf, n);
}
}
return (void *) 0;
}
-/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
static void * ipcp_udp_sdu_loop(void * o)
{
while (true) {
- struct rb_entry * e;
int fd;
- int len = 0;
- char * buf;
+ struct shm_du_buff * sdb;
- e = shm_ap_rbuff_read(_ap_instance->rb);
- if (e == NULL) {
+ fd = ipcp_flow_read(&sdb);
+ if (fd < 0)
continue;
- }
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- len = shm_rdrbuff_read((uint8_t **) &buf,
- _ap_instance->rdrb,
- e->index);
- if (len <= 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- free(e);
- continue;
- }
-
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ fd = udp_data.fd_to_uf[fd].skfd;
- fd = port_id_to_fd(e->port_id);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- if (fd == -1) {
- free(e);
- continue;
- }
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (send(fd, buf, len, 0) < 0)
+ if (send(fd,
+ shm_du_buff_head(sdb),
+ shm_du_buff_tail(sdb) - shm_du_buff_head(sdb),
+ 0) < 0)
LOG_ERR("Failed to send SDU.");
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- if (_ap_instance->rdrb != NULL)
- shm_rdrbuff_remove(_ap_instance->rdrb, e->index);
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- free(e);
+ ipcp_flow_del(sdb);
}
return (void *) 1;
@@ -789,23 +536,16 @@ static void * ipcp_udp_sdu_loop(void * o)
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
- sigset_t sigset;
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGINT);
-
switch(sig) {
case SIGINT:
case SIGTERM:
case SIGHUP:
if (info->si_pid == irmd_api) {
- LOG_DBG("IPCP %d terminating by order of %d. Bye.",
- getpid(), info->si_pid);
-
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(_ipcp, IPCP_SHUTDOWN);
+ ipcp_set_state(IPCP_SHUTDOWN);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
default:
return;
@@ -865,54 +605,52 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)
LOG_WARN("Failed to set SO_REUSEADDR.");
memset((char *) &s_saddr, 0, sizeof(s_saddr));
- shim_data(_ipcp)->s_saddr.sin_family = AF_INET;
- shim_data(_ipcp)->s_saddr.sin_addr.s_addr = conf->ip_addr;
- shim_data(_ipcp)->s_saddr.sin_port = LISTEN_PORT;
+ udp_data.s_saddr.sin_family = AF_INET;
+ udp_data.s_saddr.sin_addr.s_addr = conf->ip_addr;
+ udp_data.s_saddr.sin_port = LISTEN_PORT;
if (bind(fd,
- (struct sockaddr *) &shim_data(_ipcp)->s_saddr,
- sizeof(shim_data(_ipcp)->s_saddr)) < 0) {
+ (struct sockaddr *) &udp_data.s_saddr,
+ sizeof(udp_data.s_saddr)) < 0) {
LOG_ERR("Couldn't bind to %s.", ipstr);
close(fd);
return -1;
}
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_INIT) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_INIT) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("IPCP in wrong state.");
close(fd);
return -1;
}
- shim_data(_ipcp)->s_fd = fd;
- shim_data(_ipcp)->ip_addr = conf->ip_addr;
- shim_data(_ipcp)->dns_addr = conf->dns_addr;
+ udp_data.s_fd = fd;
+ udp_data.ip_addr = conf->ip_addr;
+ udp_data.dns_addr = conf->dns_addr;
- FD_CLR(shim_data(_ipcp)->s_fd, &shim_data(_ipcp)->flow_fd_s);
+ FD_CLR(udp_data.s_fd, &udp_data.flow_fd_s);
- ipcp_set_state(_ipcp, IPCP_ENROLLED);
+ ipcp_set_state(IPCP_ENROLLED);
- pthread_create(&_ap_instance->handler,
+ pthread_create(&udp_data.handler,
NULL,
ipcp_udp_listener,
NULL);
- pthread_create(&_ap_instance->sdu_reader,
+ pthread_create(&udp_data.sdu_reader,
NULL,
ipcp_udp_sdu_reader,
NULL);
- pthread_create(&_ap_instance->sduloop,
+ pthread_create(&udp_data.sduloop,
NULL,
ipcp_udp_sdu_loop,
NULL);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- LOG_DBG("Bootstrapped shim IPCP over UDP with api %d.",
- getpid());
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_DBG("Bootstrapped shim IPCP over UDP with api %d.", getpid());
LOG_DBG("Bound to IP address %s.", ipstr);
LOG_DBG("DNS server address is %s.", dnsstr);
@@ -1059,10 +797,10 @@ static int ipcp_udp_name_reg(char * name)
return -1;
}
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to add %s to local registry.", name);
return -1;
}
@@ -1070,12 +808,12 @@ static int ipcp_udp_name_reg(char * name)
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
/* register application with DNS server */
- dns_addr = shim_data(_ipcp)->dns_addr;
+ dns_addr = udp_data.dns_addr;
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (dns_addr != 0) {
- ip_addr = shim_data(_ipcp)->ip_addr;
+ ip_addr = udp_data.ip_addr;
if (inet_ntop(AF_INET, &ip_addr,
ipstr, INET_ADDRSTRLEN) == NULL) {
@@ -1091,14 +829,14 @@ static int ipcp_udp_name_reg(char * name)
dnsstr, name, DNS_TTL, ipstr);
if (ddns_send(cmd)) {
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- ipcp_data_del_reg_entry(_ipcp->data, name);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ ipcp_data_del_reg_entry(ipcpi.data, name);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
}
#else
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
#endif
LOG_DBG("Registered %s.", name);
@@ -1122,11 +860,11 @@ static int ipcp_udp_name_unreg(char * name)
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
/* unregister application with DNS server */
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- dns_addr = shim_data(_ipcp)->dns_addr;
+ dns_addr = udp_data.dns_addr;
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (dns_addr != 0) {
if (inet_ntop(AF_INET, &dns_addr, dnsstr, INET_ADDRSTRLEN)
@@ -1140,17 +878,16 @@ static int ipcp_udp_name_unreg(char * name)
}
#endif
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- ipcp_data_del_reg_entry(_ipcp->data, name);
+ ipcp_data_del_reg_entry(ipcpi.data, name);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
-static int ipcp_udp_flow_alloc(pid_t n_api,
- int port_id,
+static int ipcp_udp_flow_alloc(int fd,
char * dst_name,
char * src_ae_name,
enum qos_cube qos)
@@ -1158,15 +895,13 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
struct sockaddr_in r_saddr; /* server address */
struct sockaddr_in f_saddr; /* flow */
socklen_t f_saddr_len = sizeof(f_saddr);
- int fd;
+ int skfd;
struct hostent * h;
uint32_t ip_addr = 0;
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
uint32_t dns_addr = 0;
#endif
- struct shm_ap_rbuff * rb;
-
- LOG_INFO("Allocating flow to %s.", dst_name);
+ LOG_DBG("Allocating flow to %s.", dst_name);
if (dst_name == NULL || src_ae_name == NULL)
return -1;
@@ -1179,11 +914,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
if (qos != QOS_CUBE_BE)
LOG_DBG("QoS requested. UDP/IP can't do that.");
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL)
- return -1; /* -ENORBUFF */
-
- fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ skfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
/* this socket is for the flow */
memset((char *) &f_saddr, 0, sizeof(f_saddr));
@@ -1191,31 +922,31 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
f_saddr.sin_addr.s_addr = local_ip;
f_saddr.sin_port = 0;
- if (bind(fd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {
- close(fd);
+ if (bind(skfd, (struct sockaddr *) &f_saddr, sizeof(f_saddr)) < 0) {
+ close(skfd);
return -1;
}
- if (getsockname(fd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {
+ if (getsockname(skfd, (struct sockaddr *) &f_saddr, &f_saddr_len) < 0) {
LOG_ERR("Could not get address from fd.");
- close(fd);
+ close(skfd);
return -1;
}
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Won't allocate flow with non-enrolled IPCP.");
- close(fd);
+ close(skfd);
return -1; /* -ENOTENROLLED */
}
#ifdef CONFIG_OUROBOROS_ENABLE_DNS
- dns_addr = shim_data(_ipcp)->dns_addr;
+ dns_addr = udp_data.dns_addr;
if (dns_addr != 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
ip_addr = ddns_resolve(dst_name, dns_addr);
if (ip_addr == 0) {
@@ -1224,11 +955,11 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
return -1;
}
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBG("Won't allocate flow with non-enrolled IPCP.");
- close(fd);
+ close(skfd);
return -1; /* -ENOTENROLLED */
}
} else {
@@ -1236,7 +967,7 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
h = gethostbyname(dst_name);
if (h == NULL) {
LOG_DBG("Could not resolve %s.", dst_name);
- close(fd);
+ close(skfd);
return -1;
}
@@ -1251,60 +982,46 @@ static int ipcp_udp_flow_alloc(pid_t n_api,
r_saddr.sin_addr.s_addr = ip_addr;
r_saddr.sin_port = LISTEN_PORT;
- if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
- close(fd);
+ if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
+ close(skfd);
return -1;
}
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- _ap_instance->flows[fd].port_id = port_id;
- _ap_instance->flows[fd].state = FLOW_PENDING;
- _ap_instance->flows[fd].rb = rb;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
- set_fd(fd);
+ udp_data.fd_to_uf[fd].udp = f_saddr.sin_port;
+ udp_data.fd_to_uf[fd].skfd = skfd;
+ udp_data.uf_to_fd[skfd] = fd;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (ipcp_udp_port_alloc(ip_addr,
f_saddr.sin_port,
dst_name,
src_ae_name) < 0) {
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
-
- clr_fd(fd);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
- _ap_instance->flows[fd].port_id = -1;
- _ap_instance->flows[fd].state = FLOW_NULL;
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
- _ap_instance->flows[fd].rb = NULL;
+ udp_data.fd_to_uf[fd].udp = -1;
+ udp_data.fd_to_uf[fd].skfd = -1;
+ udp_data.uf_to_fd[skfd] = -1;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- close(fd);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ close(skfd);
return -1;
}
- LOG_DBG("Flow pending on port_id %d.", port_id);
+ LOG_DBG("Flow pending on fd %d, UDP port %d.",
+ fd, ntohs(f_saddr.sin_port));
return fd;
}
-static int ipcp_udp_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
+static int ipcp_udp_flow_alloc_resp(int fd, int response)
{
- struct shm_ap_rbuff * rb;
- int fd = -1;
+ int skfd = -1;
struct sockaddr_in f_saddr;
struct sockaddr_in r_saddr;
socklen_t len = sizeof(r_saddr);
@@ -1312,148 +1029,95 @@ static int ipcp_udp_flow_alloc_resp(pid_t n_api,
if (response)
return 0;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- /* awaken pending flow */
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
- fd = port_id_to_fd(port_id);
- if (fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Could not find flow with port_id %d.", port_id);
- return -1;
- }
+ skfd = udp_data.fd_to_uf[fd].skfd;
- if (_ap_instance->flows[fd].state != FLOW_PENDING) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Flow was not pending.");
+ if (getsockname(skfd, (struct sockaddr *) &f_saddr, &len) < 0) {
+ LOG_DBG("Socket with fd %d has no address.", skfd);
return -1;
}
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL) {
- LOG_ERR("Could not open N + 1 ringbuffer.");
- _ap_instance->flows[fd].state = FLOW_NULL;
- _ap_instance->flows[fd].port_id = -1;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (getpeername(skfd, (struct sockaddr *) &r_saddr, &len) < 0) {
+ LOG_DBG("Socket with fd %d has no peer.", skfd);
return -1;
}
- if (getsockname(fd, (struct sockaddr *) &f_saddr, &len) < 0) {
- LOG_DBG("Flow with port_id %d has no socket.", port_id);
- return -1;
- }
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- if (getpeername(fd, (struct sockaddr *) &r_saddr, &len) < 0) {
- LOG_DBG("Flow with port_id %d has no peer.", port_id);
- return -1;
- }
+ set_fd(skfd);
- _ap_instance->flows[fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[fd].rb = rb;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
-
- set_fd(fd);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
if (ipcp_udp_port_alloc_resp(r_saddr.sin_addr.s_addr,
f_saddr.sin_port,
r_saddr.sin_port,
response) < 0) {
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
-
- clr_fd(fd);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- _ap_instance->flows[fd].state = FLOW_NULL;
- shm_ap_rbuff_close(_ap_instance->flows[fd].rb);
- _ap_instance->flows[fd].rb = NULL;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
+ clr_fd(skfd);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- LOG_DBG("Accepted flow, port_id %d on UDP fd %d.", port_id, fd);
+ LOG_DBG("Accepted flow, fd %d on UDP port %d.",
+ fd, ntohs(f_saddr.sin_port));
return 0;
}
-static int ipcp_udp_flow_dealloc(int port_id)
+static int ipcp_udp_flow_dealloc(int fd)
{
- int fd = -1;
+ int skfd = -1;
int remote_udp = -1;
- struct shm_ap_rbuff * rb;
struct sockaddr_in r_saddr;
socklen_t r_saddr_len = sizeof(r_saddr);
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&udp_data.flows_lock);
- fd = port_id_to_fd(port_id);
- if (fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Could not find flow with port_id %d.", port_id);
- return 0;
- }
+ skfd = udp_data.fd_to_uf[fd].skfd;
- clr_fd(fd);
+ udp_data.uf_to_fd[skfd] = -1;
+ udp_data.fd_to_uf[fd].udp = -1;
+ udp_data.fd_to_uf[fd].skfd = -1;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_rdlock(&udp_data.flows_lock);
- _ap_instance->flows[fd].state = FLOW_NULL;
- _ap_instance->flows[fd].port_id = -1;
- rb = _ap_instance->flows[fd].rb;
- _ap_instance->flows[fd].rb = NULL;
+ clr_fd(skfd);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
+ pthread_rwlock_unlock(&udp_data.flows_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if (rb != NULL)
- shm_ap_rbuff_close(rb);
-
- if (getpeername(fd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBG("Flow with port_id %d has no peer.", port_id);
- close(fd);
+ if (getpeername(skfd, (struct sockaddr *) &r_saddr, &r_saddr_len) < 0) {
+ LOG_DBG("Socket with fd %d has no peer.", skfd);
+ close(skfd);
return 0;
}
remote_udp = r_saddr.sin_port;
r_saddr.sin_port = LISTEN_PORT;
- if (connect(fd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- close(fd);
+ if (connect(skfd, (struct sockaddr *) &r_saddr, sizeof(r_saddr)) < 0) {
+ close(skfd);
return 0 ;
}
- if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr,
- remote_udp) < 0) {
+ if (ipcp_udp_port_dealloc(r_saddr.sin_addr.s_addr, remote_udp) < 0) {
LOG_DBG("Could not notify remote.");
- pthread_rwlock_unlock(&_ipcp->state_lock);
- close(fd);
+ close(skfd);
return 0;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- close(fd);
+ close(skfd);
- LOG_DBG("Flow with port_id %d deallocated.", port_id);
+ LOG_DBG("Flow with fd %d deallocated.", fd);
return 0;
}
@@ -1468,31 +1132,6 @@ static struct ipcp_ops udp_ops = {
.ipcp_flow_dealloc = ipcp_udp_flow_dealloc
};
-static struct ipcp * ipcp_udp_create()
-{
- struct ipcp * i;
- struct ipcp_udp_data * data;
-
- i = ipcp_instance_create();
- if (i == NULL)
- return NULL;
-
- data = ipcp_udp_data_create();
- if (data == NULL) {
- free(i);
- return NULL;
- }
-
- i->data = (struct ipcp_data *) data;
- i->ops = &udp_ops;
-
- i->state = IPCP_INIT;
-
- return i;
-}
-
-#ifndef MAKE_CHECK
-
int main(int argc, char * argv[])
{
struct sigaction sig_act;
@@ -1508,7 +1147,9 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- if (shim_ap_init() < 0) {
+ udp_data_init();
+
+ if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
@@ -1528,17 +1169,13 @@ int main(int argc, char * argv[])
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- _ipcp = ipcp_udp_create();
- if (_ipcp == NULL) {
- LOG_ERR("Failed to create IPCP.");
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
+ if (ipcp_init(THIS_TYPE, &udp_ops) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
- pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
- pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
-
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
if (ipcp_create_r(getpid())) {
@@ -1547,24 +1184,21 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- pthread_join(_ap_instance->mainloop, NULL);
+ ipcp_fini();
- pthread_cancel(_ap_instance->handler);
- pthread_cancel(_ap_instance->sdu_reader);
- pthread_cancel(_ap_instance->sduloop);
+ pthread_cancel(udp_data.handler);
+ pthread_cancel(udp_data.sdu_reader);
+ pthread_cancel(udp_data.sduloop);
- pthread_join(_ap_instance->sduloop, NULL);
- pthread_join(_ap_instance->handler, NULL);
- pthread_join(_ap_instance->sdu_reader, NULL);
+ pthread_join(udp_data.sduloop, NULL);
+ pthread_join(udp_data.handler, NULL);
+ pthread_join(udp_data.sdu_reader, NULL);
- shim_ap_fini();
+ ap_fini();
- ipcp_data_destroy(_ipcp->data);
- free(_ipcp);
+ udp_data_fini();
close_logfile();
exit(EXIT_SUCCESS);
}
-
-#endif /* MAKE_CHECK */
diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt
index 05919326..16b53414 100644
--- a/src/irmd/CMakeLists.txt
+++ b/src/irmd/CMakeLists.txt
@@ -8,6 +8,7 @@ set(SOURCE_FILES
# Add source files here
api_table.c
apn_table.c
+ ipcp.c
irm_flow.c
main.c
registry.c
diff --git a/src/lib/ipcp.c b/src/irmd/ipcp.c
index 01741121..f79e6caf 100644
--- a/src/lib/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -20,16 +20,17 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "lib-ipcp"
-
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/common.h>
-#include <ouroboros/logs.h>
#include <ouroboros/utils.h>
#include <ouroboros/sockets.h>
+#define OUROBOROS_PREFIX "irmd/ipcp"
+
+#include <ouroboros/logs.h>
+
+#include "ipcp.h"
+
#include <stdlib.h>
#include <string.h>
#include <signal.h>
@@ -42,11 +43,10 @@
static void close_ptr(void * o)
{
- close(*((int *) o));
+ close(*(int *) o);
}
-static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
- ipcp_msg_t * msg)
+ipcp_msg_t * send_recv_ipcp_msg(pid_t api, ipcp_msg_t * msg)
{
int sockfd = 0;
buffer_t buf;
@@ -177,31 +177,6 @@ pid_t ipcp_create(enum ipcp_type ipcp_type)
exit(EXIT_FAILURE);
}
-int ipcp_create_r(pid_t api)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.has_api = true;
- msg.api = api;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
int ipcp_destroy(pid_t api)
{
int status;
@@ -399,68 +374,6 @@ int ipcp_flow_alloc_resp(pid_t api,
return ret;
}
-int ipcp_flow_req_arr(pid_t api,
- char * dst_name,
- char * src_ae_name)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int port_id = -1;
-
- if (dst_name == NULL || src_ae_name == NULL)
- return -EINVAL;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_api = true;
- msg.api = api;
- msg.dst_name = dst_name;
- msg.ae_name = src_ae_name;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -1;
-
- if (!recv_msg->has_port_id) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- port_id = recv_msg->port_id;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return port_id;
-}
-
-int ipcp_flow_alloc_reply(pid_t api,
- int port_id,
- int response)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.port_id = port_id;
- msg.has_port_id = true;
- msg.response = response;
- msg.has_response = true;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-
int ipcp_flow_dealloc(pid_t api,
int port_id)
{
@@ -487,28 +400,3 @@ int ipcp_flow_dealloc(pid_t api,
return ret;
}
-
-int irm_flow_dealloc(int port_id)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_port_id = true;
- msg.port_id = port_id;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return 0;
-
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return 0;
- }
-
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
new file mode 100644
index 00000000..930695fa
--- /dev/null
+++ b/src/irmd/ipcp.h
@@ -0,0 +1,62 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The API for the IRM to instruct IPCPs
+ *
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include <ouroboros/irm_config.h>
+#include <ouroboros/sockets.h>
+#include <ouroboros/shared.h>
+
+#include <sys/types.h>
+
+#ifndef OUROBOROS_IPCP_H
+#define OUROBOROS_IPCP_H
+
+/* Returns the process id */
+pid_t ipcp_create(enum ipcp_type ipcp_type);
+
+int ipcp_destroy(pid_t api);
+
+int ipcp_enroll(pid_t api,
+ char * dif_name);
+
+int ipcp_bootstrap(pid_t api,
+ dif_config_msg_t * conf);
+
+int ipcp_name_reg(pid_t api,
+ char * name);
+int ipcp_name_unreg(pid_t api,
+ char * name);
+
+int ipcp_flow_alloc(pid_t api,
+ int port_id,
+ pid_t n_api,
+ char * dst_name,
+ char * src_ae_name,
+ enum qos_cube qos);
+int ipcp_flow_alloc_resp(pid_t api,
+ int port_id,
+ pid_t n_api,
+ int response);
+
+int ipcp_flow_dealloc(pid_t api,
+ int port_id);
+
+#endif /* OUROBOROS_IPCP_H */
diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c
index d9fe3fb3..b99c6f97 100644
--- a/src/irmd/irm_flow.c
+++ b/src/irmd/irm_flow.c
@@ -58,6 +58,11 @@ void irm_flow_destroy(struct irm_flow * f)
{
pthread_mutex_lock(&f->state_lock);
+ if (f->state == FLOW_DESTROY) {
+ pthread_mutex_unlock(&f->state_lock);
+ return;
+ }
+
if (f->state == FLOW_PENDING)
f->state = FLOW_DESTROY;
else
@@ -75,3 +80,45 @@ void irm_flow_destroy(struct irm_flow * f)
free(f);
}
+
+enum flow_state irm_flow_get_state(struct irm_flow * f)
+{
+ enum flow_state state;
+
+ pthread_mutex_lock(&f->state_lock);
+
+ state = f->state;
+
+ pthread_mutex_unlock(&f->state_lock);
+
+ return state;
+}
+
+void irm_flow_set_state(struct irm_flow * f, enum flow_state state)
+{
+ pthread_mutex_lock(&f->state_lock);
+
+ f->state = state;
+ pthread_cond_broadcast(&f->state_cond);
+
+ pthread_mutex_unlock(&f->state_lock);
+}
+
+enum flow_state irm_flow_wait_state(struct irm_flow * f, enum flow_state state)
+{
+ pthread_mutex_lock(&f->state_lock);
+
+ while (!(f->state == state || f->state == FLOW_DESTROY))
+ pthread_cond_wait(&f->state_cond, &f->state_lock);
+
+ if (state == FLOW_DESTROY) {
+ f->state = FLOW_NULL;
+ pthread_cond_broadcast(&f->state_cond);
+ }
+
+ state = f->state;
+
+ pthread_mutex_unlock(&f->state_lock);
+
+ return state;
+}
diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h
index b7e5a1be..db6598bf 100644
--- a/src/irmd/irm_flow.h
+++ b/src/irmd/irm_flow.h
@@ -24,12 +24,18 @@
#define OUROBOROS_IRMD_IRM_FLOW_H
#include <ouroboros/list.h>
-#include <ouroboros/shared.h>
#include <sys/types.h>
#include <pthread.h>
#include <time.h>
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_PENDING,
+ FLOW_ALLOCATED,
+ FLOW_DESTROY
+};
+
struct irm_flow {
struct list_head next;
@@ -46,6 +52,16 @@ struct irm_flow {
};
struct irm_flow * irm_flow_create();
+
void irm_flow_destroy(struct irm_flow * f);
+enum flow_state irm_flow_get_state(struct irm_flow * f);
+
+
+void irm_flow_set_state(struct irm_flow * f,
+ enum flow_state state);
+
+enum flow_state irm_flow_wait_state(struct irm_flow * f,
+ enum flow_state state);
+
#endif /* OUROBOROS_IRMD_IRM_FLOW_H */
diff --git a/src/irmd/main.c b/src/irmd/main.c
index cc9160bf..523741ef 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -21,14 +21,9 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "irmd"
-
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
-#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/nsm.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/irm_config.h>
@@ -36,14 +31,19 @@
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
#include <ouroboros/time_utils.h>
+#define OUROBOROS_PREFIX "irmd"
+
+#include <ouroboros/logs.h>
+
+
#include "utils.h"
#include "registry.h"
#include "irm_flow.h"
#include "api_table.h"
+#include "ipcp.h"
#include <sys/socket.h>
#include <sys/un.h>
@@ -60,10 +60,12 @@
struct ipcp_entry {
struct list_head next;
+
char * name;
pid_t api;
enum ipcp_type type;
char * dif_name;
+
pthread_cond_t init_cond;
pthread_mutex_t init_lock;
bool init;
@@ -100,7 +102,7 @@ struct irm {
pthread_t irm_sanitize;
pthread_t shm_sanitize;
-} * irmd = NULL;
+} * irmd;
static struct irm_flow * get_irm_flow(int port_id)
{
@@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-
if (e->port_id == port_id)
return e;
}
@@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-
if (e->n_api == n_api)
return e;
}
@@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)
return NULL;
}
- LOG_INFO("New instance (%d) of %s added.", api, e->apn);
-
+ LOG_DBG("New instance (%d) of %s added.", api, e->apn);
LOG_DBG("This instance accepts flows for:");
list_for_each(p, &e->names) {
struct str_el * s = list_entry(p, struct str_el, next);
@@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api,
struct api_entry * e = NULL;
int ret = -1;
- pid_t f_n_1_api;
- pid_t f_n_api;
+ pid_t api_n1;
+ pid_t api_n;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api,
return -1;
}
- f_n_api = f->n_api;
- f_n_1_api = f->n_1_api;
-
- if (!response) {
- f->state = FLOW_ALLOCATED;
- pthread_cond_signal(&f->state_cond);
- }
+ api_n = f->n_api;
+ api_n1 = f->n_1_api;
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- ret = ipcp_flow_alloc_resp(f_n_1_api,
- port_id,
- f_n_api,
- response);
+ ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response);
+
+ if (!(response || ret))
+ irm_flow_set_state(f, FLOW_ALLOCATED);
+
return ret;
}
@@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t api,
{
struct irm_flow * f;
pid_t ipcp;
+ int port_id;
/* FIXME: Map qos_spec to qos_cube */
@@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t api,
f->n_api = api;
f->state = FLOW_PENDING;
+
if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
LOG_WARN("Failed to set timestamp.");
@@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- f->port_id = bmp_allocate(irmd->port_ids);
+ port_id = f->port_id = bmp_allocate(irmd->port_ids);
f->n_1_api = ipcp;
list_add(&f->next, &irmd->irm_flows);
@@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- if (ipcp_flow_alloc(ipcp,
- f->port_id,
- f->n_api,
- dst_name,
- src_ae_name,
- QOS_CUBE_BE) < 0) {
+ if (ipcp_flow_alloc(ipcp, port_id, api,
+ dst_name, src_ae_name, QOS_CUBE_BE) < 0) {
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
list_del(&f->next);
bmp_release(irmd->port_ids, f->port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- free(f);
+ irm_flow_destroy(f);
return NULL;
}
@@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id)
f = get_irm_flow(port_id);
if (f == NULL) {
- LOG_ERR("Could not find port %d.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_ERR("Could not find port %d.", port_id);
return -1;
}
- if (f->state == FLOW_NULL) {
- LOG_INFO("Port %d is deprecated.", port_id);
+ if (irm_flow_get_state(f) == FLOW_NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_INFO("Port %d is deprecated.", port_id);
return -1;
}
- if (f->state == FLOW_ALLOCATED) {
+ if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
@@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- pthread_mutex_lock(&f->state_lock);
-
- while (f->state == FLOW_PENDING)
- pthread_cond_wait(&f->state_cond, &f->state_lock);
-
- pthread_mutex_unlock(&f->state_lock);
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
- pthread_mutex_lock(&f->state_lock);
-
- if (f->state == FLOW_ALLOCATED) {
- pthread_cond_broadcast(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
+ if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED)
return 0;
- }
-
- f->state = FLOW_NULL;
- pthread_cond_broadcast(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
-static int flow_dealloc(int port_id)
+static int flow_dealloc(pid_t api, int port_id)
{
pid_t n_1_api;
int ret = 0;
@@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
- ret = ipcp_flow_dealloc(n_1_api, port_id);
+ if (api != n_1_api)
+ ret = ipcp_flow_dealloc(n_1_api, port_id);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t api,
struct pid_el * c_api;
pid_t h_api = -1;
+ LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",
+ api, dst_name, ae_name);
+
f = irm_flow_create();
if (f == NULL) {
LOG_ERR("Failed to create irm_flow.");
@@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
return f;
}
-static int flow_alloc_reply(int port_id,
- int response)
+static int flow_alloc_reply(int port_id, int response)
{
struct irm_flow * f;
@@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id,
return -1;
}
- pthread_mutex_lock(&f->state_lock);
-
if (!response)
- f->state = FLOW_ALLOCATED;
-
+ irm_flow_set_state(f, FLOW_ALLOCATED);
else
- f->state = FLOW_NULL;
-
- if (pthread_cond_signal(&f->state_cond))
- LOG_ERR("Failed to send signal.");
-
- pthread_mutex_unlock(&f->state_lock);
+ irm_flow_set_state(f, FLOW_NULL);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id,
return 0;
}
-static int flow_dealloc_ipcp(int port_id)
-{
- struct irm_flow * f = NULL;
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
-
- f = get_irm_flow(port_id);
- if (f == NULL) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- return 0;
- }
-
- list_del(&f->next);
-
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
-
- irm_flow_destroy(f);
-
- return 0;
-}
-
static void irm_destroy()
{
struct list_head * p;
@@ -1729,46 +1671,35 @@ void * irm_sanitize()
struct irm_flow * f =
list_entry(p, struct irm_flow, next);
- pthread_mutex_lock(&f->state_lock);
-
- if (f->state == FLOW_PENDING &&
- ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
+ if (irm_flow_get_state(f) == FLOW_PENDING
+ && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
LOG_INFO("Pending port_id %d timed out.",
f->port_id);
- f->state = FLOW_NULL;
- pthread_cond_signal(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
+ irm_flow_set_state(f, FLOW_NULL);
continue;
}
- pthread_mutex_unlock(&f->state_lock);
-
if (kill(f->n_api, 0) < 0) {
- struct shm_ap_rbuff * n_rb =
- shm_ap_rbuff_open_s(f->n_api);
+ struct shm_ap_rbuff * rb =
+ shm_ap_rbuff_open(f->n_api);
bmp_release(irmd->port_ids, f->port_id);
-
list_del(&f->next);
LOG_INFO("AP-I %d gone, flow %d deallocated.",
f->n_api, f->port_id);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
- if (n_rb != NULL)
- shm_ap_rbuff_destroy(n_rb);
+ if (rb != NULL)
+ shm_ap_rbuff_destroy(rb);
irm_flow_destroy(f);
continue;
}
if (kill(f->n_1_api, 0) < 0) {
- struct shm_ap_rbuff * n_1_rb_s =
- shm_ap_rbuff_open_s(f->n_1_api);
- struct shm_ap_rbuff * n_1_rb_n =
- shm_ap_rbuff_open_n(f->n_1_api);
+ struct shm_ap_rbuff * rb =
+ shm_ap_rbuff_open(f->n_1_api);
list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
- if (n_1_rb_n != NULL)
- shm_ap_rbuff_destroy(n_1_rb_n);
- if (n_1_rb_s != NULL)
- shm_ap_rbuff_destroy(n_1_rb_s);
+ if (rb != NULL)
+ shm_ap_rbuff_destroy(rb);
irm_flow_destroy(f);
}
}
@@ -1939,7 +1870,7 @@ void * mainloop()
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->port_id);
+ ret_msg.result = flow_dealloc(msg->api, msg->port_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
e = flow_req_arr(msg->api,
@@ -1950,7 +1881,6 @@ void * mainloop()
ret_msg.result = -1;
break;
}
- /* FIXME: badly timed dealloc may give SEGV */
ret_msg.has_port_id = true;
ret_msg.port_id = e->port_id;
ret_msg.has_api = true;
@@ -1961,10 +1891,6 @@ void * mainloop()
ret_msg.result = flow_alloc_reply(msg->port_id,
msg->response);
break;
- case IRM_MSG_CODE__IPCP_FLOW_DEALLOC:
- ret_msg.has_result = true;
- ret_msg.result = flow_dealloc_ipcp(msg->port_id);
- break;
default:
LOG_ERR("Don't know that message code.");
break;
diff --git a/src/irmd/utils.h b/src/irmd/utils.h
index 37c745af..2fbc8ef2 100644
--- a/src/irmd/utils.h
+++ b/src/irmd/utils.h
@@ -40,7 +40,8 @@ struct pid_el {
pid_t pid;
};
-int wildcard_match(const char * pattern, const char * string);
+int wildcard_match(const char * pattern,
+ const char * string);
/* functions for copying and destroying arguments list */
char ** argvdup(char ** argv);
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 14e7051a..b94d0eea 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -30,7 +30,6 @@ set(SOURCE_FILES
bitmap.c
cdap.c
dev.c
- ipcp.c
irm.c
list.c
lockfile.c
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 8b1b3bc6..92a05221 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -24,6 +24,7 @@
#include <ouroboros/cdap.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
#include <stdlib.h>
#include <pthread.h>
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 391563da..178ee287 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -24,6 +24,7 @@
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/fcntl.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
@@ -41,6 +42,87 @@ struct flow_set {
pthread_rwlock_t lock;
};
+enum port_state {
+ PORT_NULL = 0,
+ PORT_ID_PENDING,
+ PORT_ID_ASSIGNED,
+ PORT_DESTROY
+};
+
+struct port {
+ int fd;
+
+ enum port_state state;
+ pthread_mutex_t state_lock;
+ pthread_cond_t state_cond;
+};
+
+static void port_destroy(struct port * p)
+{
+ pthread_mutex_lock(&p->state_lock);
+
+ if (p->state == PORT_DESTROY) {
+ pthread_mutex_unlock(&p->state_lock);
+ return;
+ }
+
+ if (p->state == PORT_ID_PENDING)
+ p->state = PORT_DESTROY;
+ else
+ p->state = PORT_NULL;
+
+ pthread_cond_signal(&p->state_cond);
+
+ while (p->state != PORT_NULL)
+ pthread_cond_wait(&p->state_cond, &p->state_lock);
+
+ p->fd = -1;
+ p->state = PORT_ID_PENDING;
+
+ pthread_mutex_unlock(&p->state_lock);
+}
+
+static void port_set_state(struct port * p, enum port_state state)
+{
+ pthread_mutex_lock(&p->state_lock);
+
+ if (p->state == PORT_DESTROY) {
+ pthread_mutex_unlock(&p->state_lock);
+ return;
+ }
+
+ p->state = state;
+ pthread_cond_broadcast(&p->state_cond);
+
+ pthread_mutex_unlock(&p->state_lock);
+}
+
+enum port_state port_wait_assign(struct port * p)
+{
+ enum port_state state;
+
+ pthread_mutex_lock(&p->state_lock);
+
+ if (p->state != PORT_ID_PENDING) {
+ pthread_mutex_unlock(&p->state_lock);
+ return -1;
+ }
+
+ while (!(p->state == PORT_ID_ASSIGNED || p->state == PORT_DESTROY))
+ pthread_cond_wait(&p->state_cond, &p->state_lock);
+
+ if (p->state == PORT_DESTROY) {
+ p->state = PORT_NULL;
+ pthread_cond_broadcast(&p->state_cond);
+ }
+
+ state = p->state;
+
+ pthread_mutex_unlock(&p->state_lock);
+
+ return state;
+}
+
struct flow {
struct shm_ap_rbuff * rb;
int port_id;
@@ -48,24 +130,24 @@ struct flow {
pid_t api;
- struct timespec * timeout;
+ struct timespec timeout;
};
-struct ap_instance {
+struct {
char * ap_name;
char * daf_name;
pid_t api;
struct shm_rdrbuff * rdrb;
- struct bmp * fds;
struct shm_ap_rbuff * rb;
pthread_rwlock_t data_lock;
- struct flow flows[AP_MAX_FLOWS];
- int ports[AP_MAX_FLOWS];
+ struct bmp * fds;
+ struct flow * flows;
+ struct port * ports;
pthread_rwlock_t flows_lock;
-} * ai;
+} ai;
static int api_announce(char * ap_name)
{
@@ -76,12 +158,12 @@ static int api_announce(char * ap_name)
msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE;
msg.has_api = true;
- pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
- msg.api = ai->api;
+ msg.api = ai.api;
msg.ap_name = ap_name;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -104,47 +186,61 @@ int ap_init(char * ap_name)
ap_name = path_strip(ap_name);
- ai = malloc(sizeof(*ai));
- if (ai == NULL) {
- return -ENOMEM;
- }
-
- ai->api = getpid();
- ai->ap_name = ap_name;
- ai->daf_name = NULL;
+ ai.api = getpid();
+ ai.ap_name = ap_name;
+ ai.daf_name = NULL;
- ai->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (ai->fds == NULL) {
- free(ai);
+ ai.fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (ai.fds == NULL)
return -ENOMEM;
+
+ ai.rdrb = shm_rdrbuff_open();
+ if (ai.rdrb == NULL) {
+ bmp_destroy(ai.fds);
+ return -1;
}
- ai->rdrb = shm_rdrbuff_open();
- if (ai->rdrb == NULL) {
- bmp_destroy(ai->fds);
- free(ai);
+ ai.rb = shm_ap_rbuff_create();
+ if (ai.rb == NULL) {
+ shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
return -1;
}
- ai->rb = shm_ap_rbuff_create_s();
- if (ai->rb == NULL) {
- shm_rdrbuff_close(ai->rdrb);
- bmp_destroy(ai->fds);
- free(ai);
+ ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
+ if (ai.flows == NULL) {
+ shm_ap_rbuff_destroy(ai.rb);
+ shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
return -1;
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- ai->flows[i].rb = NULL;
- ai->flows[i].port_id = -1;
- ai->flows[i].oflags = 0;
- ai->flows[i].api = -1;
- ai->flows[i].timeout = NULL;
- ai->ports[i] = -1;
+ ai.flows[i].rb = NULL;
+ ai.flows[i].port_id = -1;
+ ai.flows[i].oflags = 0;
+ ai.flows[i].api = -1;
+ ai.flows[i].timeout.tv_sec = 0;
+ ai.flows[i].timeout.tv_nsec = 0;
}
- pthread_rwlock_init(&ai->flows_lock, NULL);
- pthread_rwlock_init(&ai->data_lock, NULL);
+ ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
+ if (ai.flows == NULL) {
+ free(ai.flows);
+ shm_ap_rbuff_destroy(ai.rb);
+ shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
+ return -1;
+ }
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ ai.ports[i].state = PORT_ID_PENDING;
+ pthread_mutex_init(&ai.ports[i].state_lock, NULL);
+ pthread_cond_init(&ai.ports[i].state_cond, NULL);
+ }
+
+ pthread_rwlock_init(&ai.flows_lock, NULL);
+ pthread_rwlock_init(&ai.data_lock, NULL);
if (ap_name != NULL)
return api_announce(ap_name);
@@ -152,46 +248,49 @@ int ap_init(char * ap_name)
return 0;
}
-void ap_fini(void)
+void ap_fini()
{
int i = 0;
- if (ai == NULL)
- return;
-
- pthread_rwlock_wrlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai.data_lock);
/* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0)
- shm_rdrbuff_remove(ai->rdrb, i);
+ while ((i = shm_ap_rbuff_peek_idx(ai.rb)) >= 0)
+ shm_rdrbuff_remove(ai.rdrb, i);
- if (ai->fds != NULL)
- bmp_destroy(ai->fds);
- if (ai->rb != NULL)
- shm_ap_rbuff_destroy(ai->rb);
- if (ai->rdrb != NULL)
- shm_rdrbuff_close(ai->rdrb);
+ if (ai.fds != NULL)
+ bmp_destroy(ai.fds);
+ if (ai.rb != NULL)
+ shm_ap_rbuff_destroy(ai.rb);
+ if (ai.rdrb != NULL)
+ shm_rdrbuff_close(ai.rdrb);
- if (ai->daf_name != NULL)
- free(ai->daf_name);
+ if (ai.daf_name != NULL)
+ free(ai.daf_name);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (ai->flows[i].rb != NULL)
- shm_ap_rbuff_close(ai->flows[i].rb);
- ai->ports[ai->flows[i].port_id] = -1;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (ai.flows[i].rb != NULL)
+ shm_ap_rbuff_close(ai.flows[i].rb);
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ ai.ports[i].state = PORT_NULL;
+ pthread_mutex_destroy(&ai.ports[i].state_lock);
+ pthread_cond_destroy(&ai.ports[i].state_cond);
}
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ free(ai.flows);
+ free(ai.ports);
- pthread_rwlock_destroy(&ai->flows_lock);
- pthread_rwlock_destroy(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
- free(ai);
+ pthread_rwlock_destroy(&ai.flows_lock);
+ pthread_rwlock_destroy(&ai.data_lock);
}
+
int flow_accept(char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
@@ -201,11 +300,11 @@ int flow_accept(char ** ae_name)
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
- pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
- msg.api = ai->api;
+ msg.api = ai.api;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL)
@@ -216,22 +315,22 @@ int flow_accept(char ** ae_name)
return -1;
}
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- fd = bmp_allocate(ai->fds);
- if (!bmp_is_id_valid(ai->fds, fd)) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
- if (ai->flows[fd].rb == NULL) {
- bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
+ if (ai.flows[fd].rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -239,31 +338,31 @@ int flow_accept(char ** ae_name)
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- shm_ap_rbuff_close(ai->flows[fd].rb);
- bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ shm_ap_rbuff_close(ai.flows[fd].rb);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -ENOMEM;
}
}
- ai->flows[fd].port_id = recv_msg->port_id;
- ai->flows[fd].oflags = FLOW_O_DEFAULT;
- ai->flows[fd].api = recv_msg->api;
+ ai.flows[fd].port_id = recv_msg->port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = recv_msg->api;
- ai->ports[recv_msg->port_id] = fd;
+ ai.ports[recv_msg->port_id].fd = fd;
+ ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
}
-int flow_alloc_resp(int fd,
- int response)
+int flow_alloc_resp(int fd, int response)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -274,49 +373,47 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_api = true;
- msg.api = ai->api;
+ msg.api = ai.api;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- msg.port_id = ai->flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
- pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
msg.has_response = true;
msg.response = response;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return ret;
}
-int flow_alloc(char * dst_name,
- char * src_ae_name,
- struct qos_spec * qos)
+int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -333,11 +430,11 @@ int flow_alloc(char * dst_name,
msg.ae_name = src_ae_name;
msg.has_api = true;
- pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
- msg.api = ai->api;
+ msg.api = ai.api;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -349,34 +446,35 @@ int flow_alloc(char * dst_name,
return -1;
}
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- fd = bmp_allocate(ai->fds);
- if (!bmp_is_id_valid(ai->fds, fd)) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
- if (ai->flows[fd].rb == NULL) {
- bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
+ if (ai.flows[fd].rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai->flows[fd].port_id = recv_msg->port_id;
- ai->flows[fd].oflags = FLOW_O_DEFAULT;
- ai->flows[fd].api = recv_msg->api;
+ ai.flows[fd].port_id = recv_msg->port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = recv_msg->api;
- ai->ports[recv_msg->port_id] = fd;
+ ai.ports[recv_msg->port_id].fd = fd;
+ ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -395,19 +493,19 @@ int flow_alloc_res(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- msg.port_id = ai->flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL) {
@@ -437,43 +535,43 @@ int flow_dealloc(int fd)
msg.has_api = true;
msg.api = getpid();
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- msg.port_id = ai->flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
- ai->ports[msg.port_id] = -1;
+ port_destroy(&ai.ports[msg.port_id]);
- ai->flows[fd].port_id = -1;
- shm_ap_rbuff_close(ai->flows[fd].rb);
- ai->flows[fd].rb = NULL;
- ai->flows[fd].api = -1;
+ ai.flows[fd].port_id = -1;
+ shm_ap_rbuff_close(ai.flows[fd].rb);
+ ai.flows[fd].rb = NULL;
+ ai.flows[fd].api = -1;
- bmp_release(ai->fds, fd);
+ bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -487,30 +585,30 @@ int flow_cntl(int fd, int cmd, int oflags)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- old = ai->flows[fd].oflags;
+ old = ai.flows[fd].oflags;
switch (cmd) {
case FLOW_F_GETFL: /* GET FLOW FLAGS */
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
- ai->flows[fd].oflags = oflags;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ ai.flows[fd].oflags = oflags;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return old;
default:
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return FLOW_O_INVALID; /* unknown command */
}
}
@@ -526,62 +624,62 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_rdrbuff_write(ai->rdrb,
- ai->flows[fd].api,
+ if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_rdrbuff_write(ai.rdrb,
+ ai.flows[fd].api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
- (uint8_t *) buf,
+ buf,
count);
if (idx == -1) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -EAGAIN;
}
e.index = idx;
- e.port_id = ai->flows[fd].port_id;
+ e.port_id = ai.flows[fd].port_id;
- if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) {
- shm_rdrbuff_remove(ai->rdrb, idx);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
} else { /* blocking */
- struct shm_rdrbuff * rdrb = ai->rdrb;
- pid_t api = ai->flows[fd].api;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ struct shm_rdrbuff * rdrb = ai.rdrb;
+ pid_t api = ai.flows[fd].api;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
idx = shm_rdrbuff_write_b(rdrb,
- api,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- (uint8_t *) buf,
- count);
+ api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ buf,
+ count);
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
e.index = idx;
- e.port_id = ai->flows[fd].port_id;
+ e.port_id = ai.flows[fd].port_id;
- while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0)
+ while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0)
;
}
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return 0;
}
@@ -595,47 +693,44 @@ ssize_t flow_read(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_ap_rbuff_read_port(ai->rb,
- ai->flows[fd].port_id);
- pthread_rwlock_unlock(&ai->flows_lock);
+ if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id);
+ pthread_rwlock_unlock(&ai.flows_lock);
} else {
- struct shm_ap_rbuff * rb = ai->rb;
- int port_id = ai->flows[fd].port_id;
- struct timespec * timeout = ai->flows[fd].timeout;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
-
- idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
-
- pthread_rwlock_rdlock(&ai->data_lock);
+ struct shm_ap_rbuff * rb = ai.rb;
+ int port_id = ai.flows[fd].port_id;
+ struct timespec timeout = ai.flows[fd].timeout;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout);
+ pthread_rwlock_rdlock(&ai.data_lock);
}
if (idx < 0) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -EAGAIN;
}
- n = shm_rdrbuff_read(&sdu, ai->rdrb, idx);
+ n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
if (n < 0) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
memcpy(buf, sdu, MIN(n, count));
- shm_rdrbuff_remove(ai->rdrb, idx);
+ shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return n;
}
@@ -671,7 +766,7 @@ void flow_set_zero(struct flow_set * set)
void flow_set_add(struct flow_set * set, int fd)
{
pthread_rwlock_wrlock(&set->lock);
- set->b[ai->flows[fd].port_id] = true;
+ set->b[ai.flows[fd].port_id] = true;
set->dirty = true;
pthread_rwlock_unlock(&set->lock);
}
@@ -679,7 +774,7 @@ void flow_set_add(struct flow_set * set, int fd)
void flow_set_del(struct flow_set * set, int fd)
{
pthread_rwlock_wrlock(&set->lock);
- set->b[ai->flows[fd].port_id] = false;
+ set->b[ai.flows[fd].port_id] = false;
set->dirty = true;
pthread_rwlock_unlock(&set->lock);
}
@@ -688,7 +783,7 @@ bool flow_set_has(struct flow_set * set, int fd)
{
bool ret;
pthread_rwlock_rdlock(&set->lock);
- ret = set->b[ai->flows[fd].port_id];
+ ret = set->b[ai.flows[fd].port_id];
pthread_rwlock_unlock(&set->lock);
return ret;
}
@@ -712,12 +807,324 @@ int flow_select(struct flow_set * set, const struct timespec * timeout)
{
int port_id;
if (set == NULL) {
- port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout);
+ port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout);
} else {
flow_set_cpy(set);
- port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout);
+ port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout);
}
if (port_id < 0)
return port_id;
- return ai->ports[port_id];
+ return ai.ports[port_id].fd;
+}
+
+/* ipcp-dev functions */
+
+int np1_flow_alloc(pid_t n_api, int port_id)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].rb = shm_ap_rbuff_open(n_api);
+ if (ai.flows[fd].rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = n_api;
+
+ ai.ports[port_id].fd = fd;
+ port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int np1_flow_dealloc(int port_id)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = ai.ports[port_id].fd;
+ if (fd < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return fd;
+ }
+
+ ai.flows[fd].port_id = -1;
+ shm_ap_rbuff_close(ai.flows[fd].rb);
+ ai.flows[fd].rb = NULL;
+ ai.flows[fd].api = -1;
+
+ bmp_release(ai.fds, fd);
+
+ port_destroy(&ai.ports[port_id]);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+
+int np1_flow_resp(pid_t n_api, int port_id)
+{
+ int fd;
+ struct shm_ap_rbuff * rb;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ port_wait_assign(&ai.ports[port_id]);
+
+ fd = ai.ports[port_id].fd;
+ if (fd < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return fd;
+ }
+
+ rb = shm_ap_rbuff_open(n_api);
+ if (rb == NULL) {
+ ai.flows[fd].port_id = -1;
+ port_destroy(&ai.ports[port_id]);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].rb = rb;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int ipcp_create_r(pid_t api)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
+ msg.has_api = true;
+ msg.api = api;
+
+ recv_msg = send_recv_irm_msg(&msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int port_id = -1;
+ int fd = -1;
+
+ if (dst_name == NULL || src_ae_name == NULL)
+ return -EINVAL;
+
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_api = true;
+ msg.api = api;
+ msg.dst_name = dst_name;
+ msg.ae_name = src_ae_name;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1; /* -ENOMOREFDS */
+ }
+
+ ai.flows[fd].rb = NULL;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ recv_msg = send_recv_irm_msg(&msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (!recv_msg->has_port_id) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ port_id = recv_msg->port_id;
+ irm_msg__free_unpacked(recv_msg, NULL);
+ if (port_id < 0)
+ return -1;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].rb = NULL;
+
+ ai.ports[port_id].fd = fd;
+ port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int ipcp_flow_alloc_reply(int fd, int response)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
+ msg.has_port_id = true;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+ msg.port_id = ai.flows[fd].port_id;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ msg.has_response = true;
+ msg.response = response;
+
+ recv_msg = send_recv_irm_msg(&msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_read(struct shm_du_buff ** sdb)
+{
+ int fd;
+ struct rb_entry * e;
+
+ e = shm_ap_rbuff_read(ai.rb);
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ fd = ai.ports[e->port_id].fd;
+
+ *sdb = shm_rdrbuff_get(ai.rdrb, e->index);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
+{
+ struct rb_entry e;
+
+ if (sdb == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
+
+ e.index = shm_du_buff_get_idx(sdb);
+ e.port_id = ai.flows[fd].port_id;
+
+ shm_ap_rbuff_write(ai.flows[fd].rb, &e);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
+int local_flow_read(struct rb_entry * e)
+{
+ int fd;
+
+ *e = *(shm_ap_rbuff_read(ai.rb));
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ fd = ai.ports[e->port_id].fd;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int local_flow_write(int fd, struct rb_entry * e)
+{
+ if (e == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
+
+ e->port_id = ai.flows[fd].port_id;
+
+ shm_ap_rbuff_write(ai.flows[fd].rb, e);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
+void ipcp_flow_del(struct shm_du_buff * sdb)
+{
+ shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
}
diff --git a/src/lib/irm.c b/src/lib/irm.c
index fce11ba5..c4c6395b 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -25,7 +25,7 @@
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
#include <ouroboros/irm.h>
-#include <ouroboros/common.h>
+#include <ouroboros/utils.h>
#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index 7a634201..61c27d01 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -43,8 +43,7 @@ enum irm_msg_code {
IRM_FLOW_DEALLOC = 18;
IPCP_FLOW_REQ_ARR = 19;
IPCP_FLOW_ALLOC_REPLY = 20;
- IPCP_FLOW_DEALLOC = 21;
- IRM_REPLY = 22;
+ IRM_REPLY = 21;
};
message irm_msg {
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index d9e332fe..184a1bf2 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -21,14 +21,14 @@
*/
#include <ouroboros/config.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
#define OUROBOROS_PREFIX "shm_ap_rbuff"
#include <ouroboros/logs.h>
-#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/lockfile.h>
-#include <ouroboros/time_utils.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +41,6 @@
#include <sys/stat.h>
#define FN_MAX_CHARS 255
-#define NORTH false
-#define SOUTH true
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
@@ -63,11 +61,10 @@ struct shm_ap_rbuff {
pthread_cond_t * add; /* SDU arrived */
pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
- bool dir; /* direction, false = N */
int fd;
};
-static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
+struct shm_ap_rbuff * shm_ap_rbuff_create()
{
struct shm_ap_rbuff * rb;
int shm_fd;
@@ -77,10 +74,7 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
char fn[FN_MAX_CHARS];
mode_t mask;
- if (dir == SOUTH)
- sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid());
- else
- sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid());
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -157,22 +151,18 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
rb->fd = shm_fd;
rb->api = getpid();
- rb->dir = dir;
return rb;
}
-static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
+struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
char fn[FN_MAX_CHARS];
- if (dir == SOUTH)
- sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api);
- else
- sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api);
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -215,31 +205,10 @@ static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
rb->fd = shm_fd;
rb->api = api;
- rb->dir = dir;
return rb;
}
-struct shm_ap_rbuff * shm_ap_rbuff_create_n()
-{
- return shm_ap_rbuff_create(NORTH);
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_create_s()
-{
- return shm_ap_rbuff_create(SOUTH);
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api)
-{
- return shm_ap_rbuff_open(api, NORTH);
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api)
-{
- return shm_ap_rbuff_open(api, SOUTH);
-}
-
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
{
if (rb == NULL) {
@@ -285,10 +254,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
- if (rb->dir == SOUTH)
- sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api);
- else
- sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api);
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index bf5c7f16..fb58a4d6 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -24,7 +24,6 @@
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/time_utils.h>
#include <pthread.h>
@@ -35,6 +34,7 @@
#include <string.h>
#include <signal.h>
#include <sys/stat.h>
+#include <stdbool.h>
#define OUROBOROS_PREFIX "shm_rdrbuff"
@@ -76,6 +76,7 @@ struct shm_du_buff {
size_t du_head;
size_t du_tail;
pid_t dst_api;
+ size_t idx;
};
struct shm_rdrbuff {
@@ -458,7 +459,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#endif
int sz = size + sizeof *sdb;
uint8_t * write_pos;
- ssize_t idx = -1;
if (rdrb == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -505,6 +505,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
sdb->dst_api = -1;
sdb->du_head = 0;
sdb->du_tail = 0;
+ sdb->idx = *rdrb->ptr_head;
*rdrb->ptr_head = 0;
}
@@ -521,7 +522,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
memcpy(write_pos, data, len);
- idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->ptr_head;
#ifdef SHM_RDRB_MULTI_BLOCK
*rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
#else
@@ -529,7 +530,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#endif
pthread_mutex_unlock(rdrb->lock);
- return idx;
+ return sdb->idx;
}
ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
@@ -547,7 +548,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
#endif
int sz = size + sizeof *sdb;
uint8_t * write_pos;
- ssize_t idx = -1;
if (rdrb == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -596,6 +596,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
sdb->dst_api = -1;
sdb->du_head = 0;
sdb->du_tail = 0;
+ sdb->idx = *rdrb->ptr_head;
*rdrb->ptr_head = 0;
}
@@ -612,7 +613,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
memcpy(write_pos, data, len);
- idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->ptr_head;
#ifdef SHM_RDRB_MULTI_BLOCK
*rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
#else
@@ -620,7 +621,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
#endif
pthread_cleanup_pop(true);
- return idx;
+ return sdb->idx;
}
int shm_rdrbuff_read(uint8_t ** dst,
@@ -654,6 +655,32 @@ int shm_rdrbuff_read(uint8_t ** dst,
return len;
}
+struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx)
+{
+ struct shm_du_buff * sdb;
+
+ if (idx > SHM_BUFFER_SIZE)
+ return NULL;
+#ifdef __APPLE__
+ pthread_mutex_lock(rdrb->lock);
+#else
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+#endif
+ if (shm_rdrb_empty(rdrb)) {
+ pthread_mutex_unlock(rdrb->lock);
+ return NULL;
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return sdb;
+}
+
int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)
{
if (idx > SHM_BUFFER_SIZE)
@@ -688,6 +715,11 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)
return 0;
}
+size_t shm_du_buff_get_idx(struct shm_du_buff * sdb)
+{
+ return sdb->idx;
+}
+
uint8_t * shm_du_buff_head(struct shm_du_buff * sdb)
{
if (sdb == NULL)
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 751c61b2..408e79e7 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -25,7 +25,6 @@
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
-#include <ouroboros/common.h>
#include <ouroboros/sockets.h>
#include <ouroboros/utils.h>
@@ -102,13 +101,12 @@ int server_socket_open(char * file_name)
return sockfd;
}
-void close_ptr(void * o)
+static void close_ptr(void * o)
{
close(*(int *) o);
}
-static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg,
- bool timed)
+static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)
{
int sockfd;
buffer_t buf;
diff --git a/src/tools/cbr/cbr_server.c b/src/tools/cbr/cbr_server.c
index 8eff4a4c..c5664d8b 100644
--- a/src/tools/cbr/cbr_server.c
+++ b/src/tools/cbr/cbr_server.c
@@ -21,6 +21,10 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+#include <ouroboros/dev.h>
+#include <ouroboros/time_utils.h>
+#include <ouroboros/fcntl.h>
+
#include <stdbool.h>
#ifdef __FreeBSD__
@@ -32,9 +36,6 @@
#include <stdlib.h>
#include <pthread.h>
-#include <ouroboros/dev.h>
-#include <ouroboros/time_utils.h>
-
#define THREADS_SIZE 10
pthread_t listen_thread;
diff --git a/src/tools/irm/irm.c b/src/tools/irm/irm.c
index c260feb9..a674c7ba 100644
--- a/src/tools/irm/irm.c
+++ b/src/tools/irm/irm.c
@@ -20,7 +20,6 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#include <ouroboros/common.h>
#include <ouroboros/irm.h>
#include <stdio.h>
#include <string.h>
diff --git a/src/tools/irm/irm_utils.c b/src/tools/irm/irm_utils.c
index feb8ac98..41a1e811 100644
--- a/src/tools/irm/irm_utils.c
+++ b/src/tools/irm/irm_utils.c
@@ -23,7 +23,6 @@
#include <string.h>
#include <stdbool.h>
#include <stdlib.h>
-#include <ouroboros/common.h>
#include "irm_utils.h"
diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c
index 3a254984..47b40118 100644
--- a/src/tools/oping/oping_client.c
+++ b/src/tools/oping/oping_client.c
@@ -22,7 +22,7 @@
*/
#include <ouroboros/dev.h>
-#include <ouroboros/errno.h>
+#include <ouroboros/fcntl.h>
#include <ouroboros/time_utils.h>
#ifdef __FreeBSD__
@@ -34,6 +34,7 @@
#include <sys/time.h>
#include <arpa/inet.h>
#include <math.h>
+#include <errno.h>
#include <float.h>
void shutdown_client(int signo, siginfo_t * info, void * c)