summaryrefslogtreecommitdiff
path: root/src/ipcpd/local
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-10-04 15:23:54 +0200
committerSander Vrijders <[email protected]>2016-10-04 15:23:54 +0200
commit1a7c0923206cfb98d43122621a585027c67040ea (patch)
treeacd08f09f5a094e897020e97961b2847209df043 /src/ipcpd/local
parentecdf47b97abb8c5107846f4ef4a17bd62ba6dc82 (diff)
parentc96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (diff)
downloadouroboros-1a7c0923206cfb98d43122621a585027c67040ea.tar.gz
ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.zip
Merged in dstaesse/ouroboros/be-unify (pull request #251)
lib, ipcp: Revise fast path and flow interfaces
Diffstat (limited to 'src/ipcpd/local')
-rw-r--r--src/ipcpd/local/main.c494
1 files changed, 99 insertions, 395 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index c0809429..1ccec0c0 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -22,17 +22,10 @@
#include <ouroboros/config.h>
#include "ipcp.h"
-#include "flow.h"
#include <ouroboros/errno.h>
-#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/list.h>
-#include <ouroboros/utils.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/irm_config.h>
-#include <ouroboros/bitmap.h>
-#include <ouroboros/shared.h>
#include <ouroboros/dev.h>
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/local-dev.h>
#define OUROBOROS_PREFIX "ipcpd/local"
#include <ouroboros/logs.h>
@@ -46,176 +39,51 @@
#define THIS_TYPE IPCP_LOCAL
-#define shim_data(type) ((struct ipcp_local_data *) type->data)
-
/* global for trapping signal */
int irmd_api;
-/* this IPCP's data */
-#ifdef MAKE_CHECK
-extern struct ipcp * _ipcp; /* defined in test */
-#else
-struct ipcp * _ipcp;
-#endif
-
-/*
- * copied from ouroboros/dev. The shim needs access to the internals
- * because it doesn't follow all steps necessary steps to get
- * the info
- */
-
-/* the shim needs access to these internals */
-struct shim_ap_data {
- pid_t api;
- struct shm_rdrbuff * rdrb;
- struct bmp * fds;
- struct shm_ap_rbuff * rb;
-
- int in_out[AP_MAX_FLOWS];
+struct {
+ int in_out[IRMD_MAX_FLOWS];
- struct flow flows[AP_MAX_FLOWS];
- pthread_rwlock_t flows_lock;
-
- pthread_t mainloop;
+ pthread_rwlock_t lock;
pthread_t sduloop;
+} local_data;
-} * _ap_instance;
-
-static int shim_ap_init()
+void local_data_init()
{
int i;
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ local_data.in_out[i] = -1;
- _ap_instance = malloc(sizeof(struct shim_ap_data));
- if (_ap_instance == NULL) {
- return -1;
- }
-
- _ap_instance->api = getpid();
-
- _ap_instance->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (_ap_instance->fds == NULL) {
- free(_ap_instance);
- return -1;
- }
-
- _ap_instance->rdrb = shm_rdrbuff_open();
- if (_ap_instance->rdrb == NULL) {
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
- return -1;
- }
-
- _ap_instance->rb = shm_ap_rbuff_create_n();
- if (_ap_instance->rb == NULL) {
- shm_rdrbuff_close(_ap_instance->rdrb);
- bmp_destroy(_ap_instance->fds);
- free(_ap_instance);
- return -1;
- }
-
- for (i = 0; i < AP_MAX_FLOWS; i ++) {
- _ap_instance->flows[i].rb = NULL;
- _ap_instance->flows[i].port_id = -1;
- _ap_instance->flows[i].state = FLOW_NULL;
- _ap_instance->in_out[i] = -1;
- }
-
- pthread_rwlock_init(&_ap_instance->flows_lock, NULL);
-
- return 0;
-}
-
-void shim_ap_fini()
-{
- int i = 0;
-
- if (_ap_instance == NULL)
- return;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- if (_ipcp->state != IPCP_SHUTDOWN)
- LOG_WARN("Cleaning up AP while not in shutdown.");
-
- if (_ap_instance->fds != NULL)
- bmp_destroy(_ap_instance->fds);
-
- /* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(_ap_instance->rb)) >= 0)
- shm_rdrbuff_remove(_ap_instance->rdrb, i);
-
- if (_ap_instance->rdrb != NULL)
- shm_rdrbuff_close(_ap_instance->rdrb);
- if (_ap_instance->rb != NULL)
- shm_ap_rbuff_destroy(_ap_instance->rb);
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- for (i = 0; i < AP_MAX_FLOWS; i ++)
- if (_ap_instance->flows[i].rb != NULL)
- shm_ap_rbuff_close(_ap_instance->flows[i].rb);
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- free(_ap_instance);
+ pthread_rwlock_init(&local_data.lock, NULL);
}
-/* only call this under flows_lock */
-static int port_id_to_fd(int port_id)
+void local_data_fini()
{
- int i;
-
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (_ap_instance->flows[i].port_id == port_id
- && _ap_instance->flows[i].state != FLOW_NULL)
- return i;
- }
-
- return -1;
+ pthread_rwlock_destroy(&local_data.lock);
}
-/*
- * end copy from dev.c
- */
-
-/* FIXME: if we move _ap_instance to dev.h, we can reuse it everywhere */
static void * ipcp_local_sdu_loop(void * o)
{
while (true) {
- struct rb_entry * e;
- int fd;
-
- e = shm_ap_rbuff_read(_ap_instance->rb);
- if (e == NULL) {
- continue;
- }
+ struct rb_entry e;
+ int fd = local_flow_read(&e);
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (_ipcp->state != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return (void *) 1; /* -ENOTENROLLED */
}
- pthread_rwlock_rdlock(&_ap_instance->flows_lock);
- fd = _ap_instance->in_out[port_id_to_fd(e->port_id)];
- if (fd == -1) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- free(e);
- continue;
- }
-
- e->port_id = _ap_instance->flows[fd].port_id;
-
- while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, e) < 0)
- ;
+ pthread_rwlock_rdlock(&local_data.lock);
+ fd = local_data.in_out[fd];
+ pthread_rwlock_unlock(&local_data.lock);
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (fd != -1)
+ local_flow_write(fd, &e);
- free(e);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
return (void *) 1;
@@ -223,10 +91,6 @@ static void * ipcp_local_sdu_loop(void * o)
void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
{
- sigset_t sigset;
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGINT);
-
switch(sig) {
case SIGINT:
case SIGTERM:
@@ -236,11 +100,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
LOG_DBG("IPCP %d terminating by order of %d. Bye.",
getpid(), info->si_pid);
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(_ipcp, IPCP_SHUTDOWN);
+ ipcp_set_state(IPCP_SHUTDOWN);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
}
default:
return;
@@ -254,307 +118,154 @@ static int ipcp_local_bootstrap(struct dif_config * conf)
return -1;
}
- pthread_rwlock_wrlock(&_ipcp->state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_INIT) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_INIT) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("IPCP in wrong state.");
return -1;
}
- ipcp_set_state(_ipcp, IPCP_ENROLLED);
+ ipcp_set_state(IPCP_ENROLLED);
- pthread_create(&_ap_instance->sduloop,
- NULL,
- ipcp_local_sdu_loop,
- NULL);
+ pthread_create(&local_data.sduloop, NULL, ipcp_local_sdu_loop, NULL);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Bootstrapped local IPCP with api %d.",
- getpid());
+ LOG_INFO("Bootstrapped local IPCP with api %d.", getpid());
return 0;
}
static int ipcp_local_name_reg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_data_add_reg_entry(_ipcp->data, name)) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_data_add_reg_entry(ipcpi.data, name)) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBGF("Failed to add %s to local registry.", name);
return -1;
}
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_DBG("Registered %s.", name);
+ LOG_INFO("Registered %s.", name);
return 0;
}
static int ipcp_local_name_unreg(char * name)
{
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- ipcp_data_del_reg_entry(_ipcp->data, name);
+ ipcp_data_del_reg_entry(ipcpi.data, name);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ LOG_INFO("Unregistered %s.", name);
return 0;
}
-static int ipcp_local_flow_alloc(pid_t n_api,
- int port_id,
+static int ipcp_local_flow_alloc(int fd,
char * dst_name,
char * src_ae_name,
enum qos_cube qos)
{
- int in_fd = -1;
int out_fd = -1;
- struct shm_ap_rbuff * rb;
-
- LOG_INFO("Allocating flow to %s.", dst_name);
+ LOG_DBG("Allocating flow to %s on fd %d.", dst_name, fd);
if (dst_name == NULL || src_ae_name == NULL)
return -1;
/* This ipcpd has all QoS */
- pthread_rwlock_rdlock(&_ipcp->state_lock);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- if (ipcp_get_state(_ipcp) != IPCP_ENROLLED) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ if (ipcp_get_state() != IPCP_ENROLLED) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_DBGF("Won't register with non-enrolled IPCP.");
return -1; /* -ENOTENROLLED */
}
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL) {
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1; /* -ENORBUFF */
- }
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
+ pthread_rwlock_wrlock(&local_data.lock);
- in_fd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, in_fd)) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -EMFILE;
- }
+ out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);
- _ap_instance->flows[in_fd].port_id = port_id;
- _ap_instance->flows[in_fd].state = FLOW_PENDING;
- _ap_instance->flows[in_fd].rb = rb;
+ local_data.in_out[fd] = out_fd;
+ local_data.in_out[out_fd] = fd;
- LOG_DBGF("Pending local flow with port_id %d.", port_id);
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- /* reply to IRM */
- port_id = ipcp_flow_req_arr(getpid(),
- dst_name,
- src_ae_name);
-
- if (port_id < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_ERR("Could not get port id from IRMd");
- /* shm_ap_rbuff_close(n_api); */
- return -1;
- }
-
- out_fd = bmp_allocate(_ap_instance->fds);
- if (!bmp_is_id_valid(_ap_instance->fds, out_fd)) {
- /* shm_ap_rbuff_close(n_api); */
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1; /* -ENOMOREFDS */
- }
-
- _ap_instance->flows[out_fd].port_id = port_id;
- _ap_instance->flows[out_fd].rb = NULL;
- _ap_instance->flows[out_fd].state = FLOW_PENDING;
-
- _ap_instance->in_out[in_fd] = out_fd;
- _ap_instance->in_out[out_fd] = in_fd;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- LOG_DBGF("Pending local allocation request, port_id %d.", port_id);
+ LOG_INFO("Pending local allocation request on fd %d.", fd);
return 0;
}
-static int ipcp_local_flow_alloc_resp(pid_t n_api,
- int port_id,
- int response)
+static int ipcp_local_flow_alloc_resp(int fd, int response)
{
- struct shm_ap_rbuff * rb;
- int in_fd = -1;
int out_fd = -1;
int ret = -1;
+ LOG_DBG("Received response for fd %d: %d.", fd, response);
+
if (response)
return 0;
- pthread_rwlock_rdlock(&_ipcp->state_lock);
-
- /* awaken pending flow */
-
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- in_fd = port_id_to_fd(port_id);
- if (in_fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Could not find flow with port_id %d.", port_id);
- return -1;
- }
-
- if (_ap_instance->flows[in_fd].state != FLOW_PENDING) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Flow was not pending.");
- return -1;
- }
-
- rb = shm_ap_rbuff_open_s(n_api);
- if (rb == NULL) {
- LOG_ERR("Could not open N + 1 ringbuffer.");
- _ap_instance->flows[in_fd].state = FLOW_NULL;
- _ap_instance->flows[in_fd].port_id = -1;
- _ap_instance->in_out[in_fd] = -1;
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- return -1;
- }
-
- _ap_instance->flows[in_fd].state = FLOW_ALLOCATED;
- _ap_instance->flows[in_fd].rb = rb;
-
- LOG_DBGF("Accepted flow, port_id %d on fd %d.", port_id, in_fd);
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
- out_fd = _ap_instance->in_out[in_fd];
+ out_fd = local_data.in_out[fd];
if (out_fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("No pending local flow with port_id %d.", port_id);
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- if (_ap_instance->flows[out_fd].state != FLOW_PENDING) {
- /* FIXME: clean up other end */
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Flow was not pending.");
- return -1;
- }
-
- _ap_instance->flows[out_fd].state = FLOW_ALLOCATED;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- if ((ret = ipcp_flow_alloc_reply(getpid(),
- _ap_instance->flows[out_fd].port_id,
- response)) < 0) {
- return -1; /* -EPIPE */
- }
+ if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)
+ return -1;
- LOG_INFO("Flow allocation completed, port_ids (%d, %d).",
- _ap_instance->flows[out_fd].port_id,
- _ap_instance->flows[in_fd].port_id);
+ LOG_INFO("Flow allocation completed, fds (%d, %d).", out_fd, fd);
return ret;
}
-static int ipcp_local_flow_dealloc(int port_id)
+static int ipcp_local_flow_dealloc(int fd)
{
- int fd = -1;
- struct shm_ap_rbuff * rb;
-
- pthread_rwlock_rdlock(&_ipcp->state_lock);
- pthread_rwlock_wrlock(&_ap_instance->flows_lock);
-
- fd = port_id_to_fd(port_id);
- if (fd < 0) {
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
- pthread_rwlock_unlock(&_ipcp->state_lock);
- LOG_DBGF("Could not find flow with port_id %d.", port_id);
- return 0;
- }
-
- bmp_release(_ap_instance->fds, fd);
-
- if (_ap_instance->in_out[fd] != -1)
- _ap_instance->in_out[_ap_instance->in_out[fd]] = -1;
-
- _ap_instance->in_out[fd] = -1;
-
- _ap_instance->flows[fd].state = FLOW_NULL;
- _ap_instance->flows[fd].port_id = -1;
- rb = _ap_instance->flows[fd].rb;
- _ap_instance->flows[fd].rb = NULL;
-
- pthread_rwlock_unlock(&_ap_instance->flows_lock);
-
- if (rb != NULL)
- shm_ap_rbuff_close(rb);
-
- pthread_rwlock_unlock(&_ipcp->state_lock);
-
- LOG_DBGF("Flow with port_id %d deallocated.", port_id);
-
- return 0;
-}
-
-static struct ipcp * ipcp_local_create()
-{
- struct ipcp * i;
- struct ipcp_ops * ops;
+ int out_fd = -1;
- i = ipcp_instance_create();
- if (i == NULL)
- return NULL;
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&local_data.lock);
- i->data = ipcp_data_create();
- if (i->data == NULL) {
- free(i);
- return NULL;
- }
+ out_fd = local_data.in_out[fd];
- if (ipcp_data_init(i->data, THIS_TYPE) == NULL) {
- free(i->data);
- free(i);
- return NULL;
+ if (out_fd != -1) {
+ local_data.in_out[out_fd] = -1;
+ flow_dealloc(out_fd);
}
- ops = malloc(sizeof(*ops));
- if (ops == NULL) {
- free(i->data);
- free(i);
- return NULL;
- }
+ local_data.in_out[fd] = -1;
- ops->ipcp_bootstrap = ipcp_local_bootstrap;
- ops->ipcp_enroll = NULL; /* shim */
- ops->ipcp_name_reg = ipcp_local_name_reg;
- ops->ipcp_name_unreg = ipcp_local_name_unreg;
- ops->ipcp_flow_alloc = ipcp_local_flow_alloc;
- ops->ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp;
- ops->ipcp_flow_dealloc = ipcp_local_flow_dealloc;
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
- i->ops = ops;
+ LOG_INFO("Flow with fd %d deallocated.", fd);
- i->state = IPCP_INIT;
-
- return i;
+ return 0;
}
-#ifndef MAKE_CHECK
+static struct ipcp_ops local_ops = {
+ .ipcp_bootstrap = ipcp_local_bootstrap,
+ .ipcp_enroll = NULL, /* shim */
+ .ipcp_name_reg = ipcp_local_name_reg,
+ .ipcp_name_unreg = ipcp_local_name_unreg,
+ .ipcp_flow_alloc = ipcp_local_flow_alloc,
+ .ipcp_flow_alloc_resp = ipcp_local_flow_alloc_resp,
+ .ipcp_flow_dealloc = ipcp_local_flow_dealloc
+};
int main(int argc, char * argv[])
{
@@ -571,7 +282,9 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- if (shim_ap_init() < 0) {
+ local_data_init();
+
+ if (ap_init(NULL) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
@@ -591,17 +304,13 @@ int main(int argc, char * argv[])
sigaction(SIGHUP, &sig_act, NULL);
sigaction(SIGPIPE, &sig_act, NULL);
- _ipcp = ipcp_local_create();
- if (_ipcp == NULL) {
- LOG_ERR("Failed to create IPCP.");
+ pthread_sigmask(SIG_BLOCK, &sigset, NULL);
+
+ if (ipcp_init(THIS_TYPE, &local_ops) < 0) {
close_logfile();
exit(EXIT_FAILURE);
}
- pthread_sigmask(SIG_BLOCK, &sigset, NULL);
-
- pthread_create(&_ap_instance->mainloop, NULL, ipcp_main_loop, _ipcp);
-
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
if (ipcp_create_r(getpid())) {
@@ -610,21 +319,16 @@ int main(int argc, char * argv[])
exit(EXIT_FAILURE);
}
- pthread_join(_ap_instance->mainloop, NULL);
-
- pthread_cancel(_ap_instance->sduloop);
- pthread_join(_ap_instance->sduloop, NULL);
+ ipcp_fini();
- shim_ap_fini();
+ pthread_cancel(local_data.sduloop);
+ pthread_join(local_data.sduloop, NULL);
- ipcp_data_destroy(_ipcp->data);
+ ap_fini();
- free(_ipcp->ops);
- free(_ipcp);
+ local_data_fini();
close_logfile();
exit(EXIT_SUCCESS);
}
-
-#endif /* MAKE_CHECK */