summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-09-18 06:27:43 +0200
committerdimitri staessens <[email protected]>2016-10-04 15:16:00 +0200
commitc96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (patch)
treeacd08f09f5a094e897020e97961b2847209df043 /src/lib
parent2e561a634ae3e747b293a4e05eaf44726968dc1a (diff)
downloadouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.tar.gz
ouroboros-c96efb13edfaf9b2f2c626bd2a5d5d5afd38155f.zip
lib, ipcp: Revise fast path and flow interfaces
IPCPs can now use ap_init() to initialize the memory. All flows are accessed using flow descriptors, this greatly simplifies IPCP development. Reverts the fast path to a single ap_rbuff per process. Splits lib/ipcp into irmd/ipcp and lib/ipcp-dev. Adds a lib/shim-dev holding tailored functions for shims. Moves the buffer_t to utils.h. Fixes the shim-eth-llc length field. Removes the flow from shared.h. Fixes #4 Fixes #5
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/CMakeLists.txt1
-rw-r--r--src/lib/cdap.c1
-rw-r--r--src/lib/dev.c845
-rw-r--r--src/lib/ipcp.c514
-rw-r--r--src/lib/irm.c2
-rw-r--r--src/lib/irmd_messages.proto3
-rw-r--r--src/lib/shm_ap_rbuff.c50
-rw-r--r--src/lib/shm_rdrbuff.c46
-rw-r--r--src/lib/sockets.c6
9 files changed, 678 insertions, 790 deletions
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 14e7051a..b94d0eea 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -30,7 +30,6 @@ set(SOURCE_FILES
bitmap.c
cdap.c
dev.c
- ipcp.c
irm.c
list.c
lockfile.c
diff --git a/src/lib/cdap.c b/src/lib/cdap.c
index 8b1b3bc6..92a05221 100644
--- a/src/lib/cdap.c
+++ b/src/lib/cdap.c
@@ -24,6 +24,7 @@
#include <ouroboros/cdap.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/dev.h>
+#include <ouroboros/fcntl.h>
#include <stdlib.h>
#include <pthread.h>
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 391563da..178ee287 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -24,6 +24,7 @@
#include <ouroboros/errno.h>
#include <ouroboros/dev.h>
#include <ouroboros/sockets.h>
+#include <ouroboros/fcntl.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
@@ -41,6 +42,87 @@ struct flow_set {
pthread_rwlock_t lock;
};
+enum port_state {
+ PORT_NULL = 0,
+ PORT_ID_PENDING,
+ PORT_ID_ASSIGNED,
+ PORT_DESTROY
+};
+
+struct port {
+ int fd;
+
+ enum port_state state;
+ pthread_mutex_t state_lock;
+ pthread_cond_t state_cond;
+};
+
+static void port_destroy(struct port * p)
+{
+ pthread_mutex_lock(&p->state_lock);
+
+ if (p->state == PORT_DESTROY) {
+ pthread_mutex_unlock(&p->state_lock);
+ return;
+ }
+
+ if (p->state == PORT_ID_PENDING)
+ p->state = PORT_DESTROY;
+ else
+ p->state = PORT_NULL;
+
+ pthread_cond_signal(&p->state_cond);
+
+ while (p->state != PORT_NULL)
+ pthread_cond_wait(&p->state_cond, &p->state_lock);
+
+ p->fd = -1;
+ p->state = PORT_ID_PENDING;
+
+ pthread_mutex_unlock(&p->state_lock);
+}
+
+static void port_set_state(struct port * p, enum port_state state)
+{
+ pthread_mutex_lock(&p->state_lock);
+
+ if (p->state == PORT_DESTROY) {
+ pthread_mutex_unlock(&p->state_lock);
+ return;
+ }
+
+ p->state = state;
+ pthread_cond_broadcast(&p->state_cond);
+
+ pthread_mutex_unlock(&p->state_lock);
+}
+
+enum port_state port_wait_assign(struct port * p)
+{
+ enum port_state state;
+
+ pthread_mutex_lock(&p->state_lock);
+
+ if (p->state != PORT_ID_PENDING) {
+ pthread_mutex_unlock(&p->state_lock);
+ return -1;
+ }
+
+ while (!(p->state == PORT_ID_ASSIGNED || p->state == PORT_DESTROY))
+ pthread_cond_wait(&p->state_cond, &p->state_lock);
+
+ if (p->state == PORT_DESTROY) {
+ p->state = PORT_NULL;
+ pthread_cond_broadcast(&p->state_cond);
+ }
+
+ state = p->state;
+
+ pthread_mutex_unlock(&p->state_lock);
+
+ return state;
+}
+
struct flow {
struct shm_ap_rbuff * rb;
int port_id;
@@ -48,24 +130,24 @@ struct flow {
pid_t api;
- struct timespec * timeout;
+ struct timespec timeout;
};
-struct ap_instance {
+struct {
char * ap_name;
char * daf_name;
pid_t api;
struct shm_rdrbuff * rdrb;
- struct bmp * fds;
struct shm_ap_rbuff * rb;
pthread_rwlock_t data_lock;
- struct flow flows[AP_MAX_FLOWS];
- int ports[AP_MAX_FLOWS];
+ struct bmp * fds;
+ struct flow * flows;
+ struct port * ports;
pthread_rwlock_t flows_lock;
-} * ai;
+} ai;
static int api_announce(char * ap_name)
{
@@ -76,12 +158,12 @@ static int api_announce(char * ap_name)
msg.code = IRM_MSG_CODE__IRM_API_ANNOUNCE;
msg.has_api = true;
- pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
- msg.api = ai->api;
+ msg.api = ai.api;
msg.ap_name = ap_name;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -104,47 +186,61 @@ int ap_init(char * ap_name)
ap_name = path_strip(ap_name);
- ai = malloc(sizeof(*ai));
- if (ai == NULL) {
- return -ENOMEM;
- }
-
- ai->api = getpid();
- ai->ap_name = ap_name;
- ai->daf_name = NULL;
+ ai.api = getpid();
+ ai.ap_name = ap_name;
+ ai.daf_name = NULL;
- ai->fds = bmp_create(AP_MAX_FLOWS, 0);
- if (ai->fds == NULL) {
- free(ai);
+ ai.fds = bmp_create(AP_MAX_FLOWS, 0);
+ if (ai.fds == NULL)
return -ENOMEM;
+
+ ai.rdrb = shm_rdrbuff_open();
+ if (ai.rdrb == NULL) {
+ bmp_destroy(ai.fds);
+ return -1;
}
- ai->rdrb = shm_rdrbuff_open();
- if (ai->rdrb == NULL) {
- bmp_destroy(ai->fds);
- free(ai);
+ ai.rb = shm_ap_rbuff_create();
+ if (ai.rb == NULL) {
+ shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
return -1;
}
- ai->rb = shm_ap_rbuff_create_s();
- if (ai->rb == NULL) {
- shm_rdrbuff_close(ai->rdrb);
- bmp_destroy(ai->fds);
- free(ai);
+ ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
+ if (ai.flows == NULL) {
+ shm_ap_rbuff_destroy(ai.rb);
+ shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
return -1;
}
for (i = 0; i < AP_MAX_FLOWS; ++i) {
- ai->flows[i].rb = NULL;
- ai->flows[i].port_id = -1;
- ai->flows[i].oflags = 0;
- ai->flows[i].api = -1;
- ai->flows[i].timeout = NULL;
- ai->ports[i] = -1;
+ ai.flows[i].rb = NULL;
+ ai.flows[i].port_id = -1;
+ ai.flows[i].oflags = 0;
+ ai.flows[i].api = -1;
+ ai.flows[i].timeout.tv_sec = 0;
+ ai.flows[i].timeout.tv_nsec = 0;
}
- pthread_rwlock_init(&ai->flows_lock, NULL);
- pthread_rwlock_init(&ai->data_lock, NULL);
+ ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
+ if (ai.flows == NULL) {
+ free(ai.flows);
+ shm_ap_rbuff_destroy(ai.rb);
+ shm_rdrbuff_close(ai.rdrb);
+ bmp_destroy(ai.fds);
+ return -1;
+ }
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ ai.ports[i].state = PORT_ID_PENDING;
+ pthread_mutex_init(&ai.ports[i].state_lock, NULL);
+ pthread_cond_init(&ai.ports[i].state_cond, NULL);
+ }
+
+ pthread_rwlock_init(&ai.flows_lock, NULL);
+ pthread_rwlock_init(&ai.data_lock, NULL);
if (ap_name != NULL)
return api_announce(ap_name);
@@ -152,46 +248,49 @@ int ap_init(char * ap_name)
return 0;
}
-void ap_fini(void)
+void ap_fini()
{
int i = 0;
- if (ai == NULL)
- return;
-
- pthread_rwlock_wrlock(&ai->data_lock);
+ pthread_rwlock_wrlock(&ai.data_lock);
/* remove all remaining sdus */
- while ((i = shm_ap_rbuff_peek_idx(ai->rb)) >= 0)
- shm_rdrbuff_remove(ai->rdrb, i);
+ while ((i = shm_ap_rbuff_peek_idx(ai.rb)) >= 0)
+ shm_rdrbuff_remove(ai.rdrb, i);
- if (ai->fds != NULL)
- bmp_destroy(ai->fds);
- if (ai->rb != NULL)
- shm_ap_rbuff_destroy(ai->rb);
- if (ai->rdrb != NULL)
- shm_rdrbuff_close(ai->rdrb);
+ if (ai.fds != NULL)
+ bmp_destroy(ai.fds);
+ if (ai.rb != NULL)
+ shm_ap_rbuff_destroy(ai.rb);
+ if (ai.rdrb != NULL)
+ shm_rdrbuff_close(ai.rdrb);
- if (ai->daf_name != NULL)
- free(ai->daf_name);
+ if (ai.daf_name != NULL)
+ free(ai.daf_name);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- for (i = 0; i < AP_MAX_FLOWS; ++i) {
- if (ai->flows[i].rb != NULL)
- shm_ap_rbuff_close(ai->flows[i].rb);
- ai->ports[ai->flows[i].port_id] = -1;
+ for (i = 0; i < AP_MAX_FLOWS; ++i)
+ if (ai.flows[i].rb != NULL)
+ shm_ap_rbuff_close(ai.flows[i].rb);
+
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
+ ai.ports[i].state = PORT_NULL;
+ pthread_mutex_destroy(&ai.ports[i].state_lock);
+ pthread_cond_destroy(&ai.ports[i].state_cond);
}
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ free(ai.flows);
+ free(ai.ports);
- pthread_rwlock_destroy(&ai->flows_lock);
- pthread_rwlock_destroy(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
- free(ai);
+ pthread_rwlock_destroy(&ai.flows_lock);
+ pthread_rwlock_destroy(&ai.data_lock);
}
+
int flow_accept(char ** ae_name)
{
irm_msg_t msg = IRM_MSG__INIT;
@@ -201,11 +300,11 @@ int flow_accept(char ** ae_name)
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_api = true;
- pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
- msg.api = ai->api;
+ msg.api = ai.api;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL)
@@ -216,22 +315,22 @@ int flow_accept(char ** ae_name)
return -1;
}
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- fd = bmp_allocate(ai->fds);
- if (!bmp_is_id_valid(ai->fds, fd)) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
- if (ai->flows[fd].rb == NULL) {
- bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
+ if (ai.flows[fd].rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -239,31 +338,31 @@ int flow_accept(char ** ae_name)
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- shm_ap_rbuff_close(ai->flows[fd].rb);
- bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ shm_ap_rbuff_close(ai.flows[fd].rb);
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -ENOMEM;
}
}
- ai->flows[fd].port_id = recv_msg->port_id;
- ai->flows[fd].oflags = FLOW_O_DEFAULT;
- ai->flows[fd].api = recv_msg->api;
+ ai.flows[fd].port_id = recv_msg->port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = recv_msg->api;
- ai->ports[recv_msg->port_id] = fd;
+ ai.ports[recv_msg->port_id].fd = fd;
+ ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
}
-int flow_alloc_resp(int fd,
- int response)
+int flow_alloc_resp(int fd, int response)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -274,49 +373,47 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_api = true;
- msg.api = ai->api;
+ msg.api = ai.api;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- msg.port_id = ai->flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
- pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
msg.has_response = true;
msg.response = response;
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return ret;
}
-int flow_alloc(char * dst_name,
- char * src_ae_name,
- struct qos_spec * qos)
+int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
@@ -333,11 +430,11 @@ int flow_alloc(char * dst_name,
msg.ae_name = src_ae_name;
msg.has_api = true;
- pthread_rwlock_rdlock(&ai->data_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
- msg.api = ai->api;
+ msg.api = ai.api;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
@@ -349,34 +446,35 @@ int flow_alloc(char * dst_name,
return -1;
}
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- fd = bmp_allocate(ai->fds);
- if (!bmp_is_id_valid(ai->fds, fd)) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai->flows[fd].rb = shm_ap_rbuff_open_n(recv_msg->api);
- if (ai->flows[fd].rb == NULL) {
- bmp_release(ai->fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ ai.flows[fd].rb = shm_ap_rbuff_open(recv_msg->api);
+ if (ai.flows[fd].rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- ai->flows[fd].port_id = recv_msg->port_id;
- ai->flows[fd].oflags = FLOW_O_DEFAULT;
- ai->flows[fd].api = recv_msg->api;
+ ai.flows[fd].port_id = recv_msg->port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = recv_msg->api;
- ai->ports[recv_msg->port_id] = fd;
+ ai.ports[recv_msg->port_id].fd = fd;
+ ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -395,19 +493,19 @@ int flow_alloc_res(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- msg.port_id = ai->flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg_b(&msg);
if (recv_msg == NULL) {
@@ -437,43 +535,43 @@ int flow_dealloc(int fd)
msg.has_api = true;
msg.api = getpid();
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- msg.port_id = ai->flows[fd].port_id;
+ msg.port_id = ai.flows[fd].port_id;
- ai->ports[msg.port_id] = -1;
+ port_destroy(&ai.ports[msg.port_id]);
- ai->flows[fd].port_id = -1;
- shm_ap_rbuff_close(ai->flows[fd].rb);
- ai->flows[fd].rb = NULL;
- ai->flows[fd].api = -1;
+ ai.flows[fd].port_id = -1;
+ shm_ap_rbuff_close(ai.flows[fd].rb);
+ ai.flows[fd].rb = NULL;
+ ai.flows[fd].api = -1;
- bmp_release(ai->fds, fd);
+ bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai->flows_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
if (!recv_msg->has_result) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ret = recv_msg->result;
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
@@ -487,30 +585,30 @@ int flow_cntl(int fd, int cmd, int oflags)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_wrlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- old = ai->flows[fd].oflags;
+ old = ai.flows[fd].oflags;
switch (cmd) {
case FLOW_F_GETFL: /* GET FLOW FLAGS */
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
- ai->flows[fd].oflags = oflags;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ ai.flows[fd].oflags = oflags;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return old;
default:
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return FLOW_O_INVALID; /* unknown command */
}
}
@@ -526,62 +624,62 @@ ssize_t flow_write(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_rdrbuff_write(ai->rdrb,
- ai->flows[fd].api,
+ if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_rdrbuff_write(ai.rdrb,
+ ai.flows[fd].api,
DU_BUFF_HEADSPACE,
DU_BUFF_TAILSPACE,
- (uint8_t *) buf,
+ buf,
count);
if (idx == -1) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -EAGAIN;
}
e.index = idx;
- e.port_id = ai->flows[fd].port_id;
+ e.port_id = ai.flows[fd].port_id;
- if (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0) {
- shm_rdrbuff_remove(ai->rdrb, idx);
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, idx);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
} else { /* blocking */
- struct shm_rdrbuff * rdrb = ai->rdrb;
- pid_t api = ai->flows[fd].api;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ struct shm_rdrbuff * rdrb = ai.rdrb;
+ pid_t api = ai.flows[fd].api;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
idx = shm_rdrbuff_write_b(rdrb,
- api,
- DU_BUFF_HEADSPACE,
- DU_BUFF_TAILSPACE,
- (uint8_t *) buf,
- count);
+ api,
+ DU_BUFF_HEADSPACE,
+ DU_BUFF_TAILSPACE,
+ buf,
+ count);
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
e.index = idx;
- e.port_id = ai->flows[fd].port_id;
+ e.port_id = ai.flows[fd].port_id;
- while (shm_ap_rbuff_write(ai->flows[fd].rb, &e) < 0)
+ while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0)
;
}
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return 0;
}
@@ -595,47 +693,44 @@ ssize_t flow_read(int fd, void * buf, size_t count)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai->data_lock);
- pthread_rwlock_rdlock(&ai->flows_lock);
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
- if (ai->flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
+ if (ai.flows[fd].port_id < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -ENOTALLOC;
}
- if (ai->flows[fd].oflags & FLOW_O_NONBLOCK) {
- idx = shm_ap_rbuff_read_port(ai->rb,
- ai->flows[fd].port_id);
- pthread_rwlock_unlock(&ai->flows_lock);
+ if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
+ idx = shm_ap_rbuff_read_port(ai.rb, ai.flows[fd].port_id);
+ pthread_rwlock_unlock(&ai.flows_lock);
} else {
- struct shm_ap_rbuff * rb = ai->rb;
- int port_id = ai->flows[fd].port_id;
- struct timespec * timeout = ai->flows[fd].timeout;
- pthread_rwlock_unlock(&ai->flows_lock);
- pthread_rwlock_unlock(&ai->data_lock);
-
- idx = shm_ap_rbuff_read_port_b(rb, port_id, timeout);
-
- pthread_rwlock_rdlock(&ai->data_lock);
+ struct shm_ap_rbuff * rb = ai.rb;
+ int port_id = ai.flows[fd].port_id;
+ struct timespec timeout = ai.flows[fd].timeout;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ idx = shm_ap_rbuff_read_port_b(rb, port_id, &timeout);
+ pthread_rwlock_rdlock(&ai.data_lock);
}
if (idx < 0) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -EAGAIN;
}
- n = shm_rdrbuff_read(&sdu, ai->rdrb, idx);
+ n = shm_rdrbuff_read(&sdu, ai.rdrb, idx);
if (n < 0) {
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -1;
}
memcpy(buf, sdu, MIN(n, count));
- shm_rdrbuff_remove(ai->rdrb, idx);
+ shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&ai->data_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return n;
}
@@ -671,7 +766,7 @@ void flow_set_zero(struct flow_set * set)
void flow_set_add(struct flow_set * set, int fd)
{
pthread_rwlock_wrlock(&set->lock);
- set->b[ai->flows[fd].port_id] = true;
+ set->b[ai.flows[fd].port_id] = true;
set->dirty = true;
pthread_rwlock_unlock(&set->lock);
}
@@ -679,7 +774,7 @@ void flow_set_add(struct flow_set * set, int fd)
void flow_set_del(struct flow_set * set, int fd)
{
pthread_rwlock_wrlock(&set->lock);
- set->b[ai->flows[fd].port_id] = false;
+ set->b[ai.flows[fd].port_id] = false;
set->dirty = true;
pthread_rwlock_unlock(&set->lock);
}
@@ -688,7 +783,7 @@ bool flow_set_has(struct flow_set * set, int fd)
{
bool ret;
pthread_rwlock_rdlock(&set->lock);
- ret = set->b[ai->flows[fd].port_id];
+ ret = set->b[ai.flows[fd].port_id];
pthread_rwlock_unlock(&set->lock);
return ret;
}
@@ -712,12 +807,324 @@ int flow_select(struct flow_set * set, const struct timespec * timeout)
{
int port_id;
if (set == NULL) {
- port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout);
+ port_id = shm_ap_rbuff_peek_b(ai.rb, NULL, timeout);
} else {
flow_set_cpy(set);
- port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) set->s, timeout);
+ port_id = shm_ap_rbuff_peek_b(ai.rb, (bool *) set->s, timeout);
}
if (port_id < 0)
return port_id;
- return ai->ports[port_id];
+ return ai.ports[port_id].fd;
+}
+
+/* ipcp-dev functions */
+
+int np1_flow_alloc(pid_t n_api, int port_id)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].rb = shm_ap_rbuff_open(n_api);
+ if (ai.flows[fd].rb == NULL) {
+ bmp_release(ai.fds, fd);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].api = n_api;
+
+ ai.ports[port_id].fd = fd;
+ port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int np1_flow_dealloc(int port_id)
+{
+ int fd;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = ai.ports[port_id].fd;
+ if (fd < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return fd;
+ }
+
+ ai.flows[fd].port_id = -1;
+ shm_ap_rbuff_close(ai.flows[fd].rb);
+ ai.flows[fd].rb = NULL;
+ ai.flows[fd].api = -1;
+
+ bmp_release(ai.fds, fd);
+
+ port_destroy(&ai.ports[port_id]);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+
+int np1_flow_resp(pid_t n_api, int port_id)
+{
+ int fd;
+ struct shm_ap_rbuff * rb;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ port_wait_assign(&ai.ports[port_id]);
+
+ fd = ai.ports[port_id].fd;
+ if (fd < 0) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return fd;
+ }
+
+ rb = shm_ap_rbuff_open(n_api);
+ if (rb == NULL) {
+ ai.flows[fd].port_id = -1;
+ port_destroy(&ai.ports[port_id]);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1;
+ }
+
+ ai.flows[fd].rb = rb;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int ipcp_create_r(pid_t api)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
+ msg.has_api = true;
+ msg.api = api;
+
+ recv_msg = send_recv_irm_msg(&msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_req_arr(pid_t api, char * dst_name, char * src_ae_name)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int port_id = -1;
+ int fd = -1;
+
+ if (dst_name == NULL || src_ae_name == NULL)
+ return -EINVAL;
+
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg.has_api = true;
+ msg.api = api;
+ msg.dst_name = dst_name;
+ msg.ae_name = src_ae_name;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ fd = bmp_allocate(ai.fds);
+ if (!bmp_is_id_valid(ai.fds, fd)) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -1; /* -ENOMOREFDS */
+ }
+
+ ai.flows[fd].rb = NULL;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ recv_msg = send_recv_irm_msg(&msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (!recv_msg->has_port_id) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ port_id = recv_msg->port_id;
+ irm_msg__free_unpacked(recv_msg, NULL);
+ if (port_id < 0)
+ return -1;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ ai.flows[fd].port_id = port_id;
+ ai.flows[fd].rb = NULL;
+
+ ai.ports[port_id].fd = fd;
+ port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int ipcp_flow_alloc_reply(int fd, int response)
+{
+ irm_msg_t msg = IRM_MSG__INIT;
+ irm_msg_t * recv_msg = NULL;
+ int ret = -1;
+
+ msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
+ msg.has_port_id = true;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+ msg.port_id = ai.flows[fd].port_id;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ msg.has_response = true;
+ msg.response = response;
+
+ recv_msg = send_recv_irm_msg(&msg);
+ if (recv_msg == NULL)
+ return -1;
+
+ if (recv_msg->has_result == false) {
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
+ }
+
+ ret = recv_msg->result;
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ return ret;
+}
+
+int ipcp_flow_read(struct shm_du_buff ** sdb)
+{
+ int fd;
+ struct rb_entry * e;
+
+ e = shm_ap_rbuff_read(ai.rb);
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ fd = ai.ports[e->port_id].fd;
+
+ *sdb = shm_rdrbuff_get(ai.rdrb, e->index);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int ipcp_flow_write(int fd, struct shm_du_buff * sdb)
+{
+ struct rb_entry e;
+
+ if (sdb == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
+
+ e.index = shm_du_buff_get_idx(sdb);
+ e.port_id = ai.flows[fd].port_id;
+
+ shm_ap_rbuff_write(ai.flows[fd].rb, &e);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
+int local_flow_read(struct rb_entry * e)
+{
+ int fd;
+
+ *e = *(shm_ap_rbuff_read(ai.rb));
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ fd = ai.ports[e->port_id].fd;
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return fd;
+}
+
+int local_flow_write(int fd, struct rb_entry * e)
+{
+ if (e == NULL)
+ return -EINVAL;
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
+
+ if (ai.flows[fd].rb == NULL) {
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -EPERM;
+ }
+
+ e->port_id = ai.flows[fd].port_id;
+
+ shm_ap_rbuff_write(ai.flows[fd].rb, e);
+
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+
+ return 0;
+}
+
+void ipcp_flow_del(struct shm_du_buff * sdb)
+{
+ shm_rdrbuff_remove(ai.rdrb, shm_du_buff_get_idx(sdb));
}
diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c
deleted file mode 100644
index 01741121..00000000
--- a/src/lib/ipcp.c
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016
- *
- * The API to instruct IPCPs
- *
- * Sander Vrijders <[email protected]>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#define OUROBOROS_PREFIX "lib-ipcp"
-
-#include <ouroboros/config.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/common.h>
-#include <ouroboros/logs.h>
-#include <ouroboros/utils.h>
-#include <ouroboros/sockets.h>
-
-#include <stdlib.h>
-#include <string.h>
-#include <signal.h>
-#include <stdbool.h>
-#include <pthread.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/socket.h>
-#include <sys/time.h>
-
-static void close_ptr(void * o)
-{
- close(*((int *) o));
-}
-
-static ipcp_msg_t * send_recv_ipcp_msg(pid_t api,
- ipcp_msg_t * msg)
-{
- int sockfd = 0;
- buffer_t buf;
- char * sock_path = NULL;
- ssize_t count = 0;
- ipcp_msg_t * recv_msg = NULL;
-
- struct timeval tv = {(SOCKET_TIMEOUT / 1000),
- (SOCKET_TIMEOUT % 1000) * 1000};
-
- sock_path = ipcp_sock_path(api);
- if (sock_path == NULL)
- return NULL;
-
- sockfd = client_socket_open(sock_path);
- if (sockfd < 0) {
- free(sock_path);
- return NULL;
- }
-
- if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
- (void *) &tv, sizeof(tv)))
- LOG_WARN("Failed to set timeout on socket.");
-
- free(sock_path);
-
- buf.len = ipcp_msg__get_packed_size(msg);
- if (buf.len == 0) {
- close(sockfd);
- return NULL;
- }
-
- buf.data = malloc(IPCP_MSG_BUF_SIZE);
- if (buf.data == NULL) {
- close(sockfd);
- return NULL;
- }
-
- pthread_cleanup_push(close_ptr, (void *) &sockfd);
- pthread_cleanup_push((void (*)(void *)) free, (void *) buf.data);
-
- ipcp_msg__pack(msg, buf.data);
-
- if (write(sockfd, buf.data, buf.len) != -1)
- count = read(sockfd, buf.data, IPCP_MSG_BUF_SIZE);
-
- if (count > 0)
- recv_msg = ipcp_msg__unpack(NULL, count, buf.data);
-
- pthread_cleanup_pop(true);
- pthread_cleanup_pop(true);
-
- return recv_msg;
-}
-
-pid_t ipcp_create(enum ipcp_type ipcp_type)
-{
- pid_t api = -1;
- char irmd_api[10];
- size_t len = 0;
- char * ipcp_dir = "/sbin/";
- char * full_name = NULL;
- char * exec_name = NULL;
- char * log_file = NULL;
-
- sprintf(irmd_api, "%u", getpid());
-
- api = fork();
- if (api == -1) {
- LOG_ERR("Failed to fork");
- return api;
- }
-
- if (api != 0) {
- return api;
- }
-
- if (ipcp_type == IPCP_NORMAL)
- exec_name = IPCP_NORMAL_EXEC;
- else if (ipcp_type == IPCP_SHIM_UDP)
- exec_name = IPCP_SHIM_UDP_EXEC;
- else if (ipcp_type == IPCP_SHIM_ETH_LLC)
- exec_name = IPCP_SHIM_ETH_LLC_EXEC;
- else if (ipcp_type == IPCP_LOCAL)
- exec_name = IPCP_LOCAL_EXEC;
- else
- exit(EXIT_FAILURE);
-
- len += strlen(INSTALL_PREFIX);
- len += strlen(ipcp_dir);
- len += strlen(exec_name);
- len += 1;
-
- full_name = malloc(len + 1);
- if (full_name == NULL) {
- LOG_ERR("Failed to malloc");
- exit(EXIT_FAILURE);
- }
-
- strcpy(full_name, INSTALL_PREFIX);
- strcat(full_name, ipcp_dir);
- strcat(full_name, exec_name);
- full_name[len] = '\0';
-
- if (logfile != NULL) {
- log_file = malloc(20);
- if (log_file == NULL) {
- LOG_ERR("Failed to malloc.");
- exit(EXIT_FAILURE);
- }
- sprintf(log_file, "ipcpd-%u.log", getpid());
- }
-
- /* log_file to be placed at the end */
- char * argv[] = {full_name,
- irmd_api,
- log_file,
- 0};
-
- char * envp[] = {0};
-
- execve(argv[0], &argv[0], envp);
-
- LOG_DBG("%s", strerror(errno));
- LOG_ERR("Failed to load IPCP daemon");
- LOG_ERR("Make sure to run the installed version");
- free(full_name);
- exit(EXIT_FAILURE);
-}
-
-int ipcp_create_r(pid_t api)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.has_api = true;
- msg.api = api;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_destroy(pid_t api)
-{
- int status;
-
- if (kill(api, SIGTERM)) {
- LOG_ERR("Failed to destroy IPCP");
- return -1;
- }
-
- if (waitpid(api, &status, 0) < 0) {
- LOG_ERR("Failed to destroy IPCP");
- return -1;
- }
-
- return 0;
-}
-
-int ipcp_bootstrap(pid_t api,
- dif_config_msg_t * conf)
-{
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- if (conf == NULL)
- return -EINVAL;
-
- msg.code = IPCP_MSG_CODE__IPCP_BOOTSTRAP;
- msg.conf = conf;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_enroll(pid_t api,
- char * dif_name)
-{
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- if (dif_name == NULL)
- return -EINVAL;
-
- msg.code = IPCP_MSG_CODE__IPCP_ENROLL;
- msg.dif_name = dif_name;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_name_reg(pid_t api,
- char * name)
-{
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- if (name == NULL)
- return -1;
-
- msg.code = IPCP_MSG_CODE__IPCP_NAME_REG;
- msg.name = name;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_name_unreg(pid_t api,
- char * name)
-{
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IPCP_MSG_CODE__IPCP_NAME_UNREG;
- msg.name = name;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_flow_alloc(pid_t api,
- int port_id,
- pid_t n_api,
- char * dst_name,
- char * src_ae_name,
- enum qos_cube qos)
-{
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- if (dst_name == NULL || src_ae_name == NULL)
- return -EINVAL;
-
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC;
- msg.has_port_id = true;
- msg.port_id = port_id;
- msg.has_api = true;
- msg.api = n_api;
- msg.src_ae_name = src_ae_name;
- msg.dst_name = dst_name;
- msg.has_qos_cube = true;
- msg.qos_cube = qos;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return -1;
-
- if (!recv_msg->has_result) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_flow_alloc_resp(pid_t api,
- int port_id,
- pid_t n_api,
- int response)
-{
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP;
- msg.has_port_id = true;
- msg.port_id = port_id;
- msg.has_api = true;
- msg.api = n_api;
- msg.has_response = true;
- msg.response = response;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int ipcp_flow_req_arr(pid_t api,
- char * dst_name,
- char * src_ae_name)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int port_id = -1;
-
- if (dst_name == NULL || src_ae_name == NULL)
- return -EINVAL;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_api = true;
- msg.api = api;
- msg.dst_name = dst_name;
- msg.ae_name = src_ae_name;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -1;
-
- if (!recv_msg->has_port_id) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- port_id = recv_msg->port_id;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return port_id;
-}
-
-int ipcp_flow_alloc_reply(pid_t api,
- int port_id,
- int response)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.port_id = port_id;
- msg.has_port_id = true;
- msg.response = response;
- msg.has_response = true;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -1;
-
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-
-int ipcp_flow_dealloc(pid_t api,
- int port_id)
-{
-
- ipcp_msg_t msg = IPCP_MSG__INIT;
- ipcp_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_port_id = true;
- msg.port_id = port_id;
-
- recv_msg = send_recv_ipcp_msg(api, &msg);
- if (recv_msg == NULL)
- return 0;
-
- if (recv_msg->has_result == false) {
- ipcp_msg__free_unpacked(recv_msg, NULL);
- return 0;
- }
-
- ret = recv_msg->result;
- ipcp_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
-
-int irm_flow_dealloc(int port_id)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg = NULL;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_port_id = true;
- msg.port_id = port_id;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return 0;
-
- if (recv_msg->has_result == false) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return 0;
- }
-
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return ret;
-}
diff --git a/src/lib/irm.c b/src/lib/irm.c
index fce11ba5..c4c6395b 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -25,7 +25,7 @@
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
#include <ouroboros/irm.h>
-#include <ouroboros/common.h>
+#include <ouroboros/utils.h>
#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto
index 7a634201..61c27d01 100644
--- a/src/lib/irmd_messages.proto
+++ b/src/lib/irmd_messages.proto
@@ -43,8 +43,7 @@ enum irm_msg_code {
IRM_FLOW_DEALLOC = 18;
IPCP_FLOW_REQ_ARR = 19;
IPCP_FLOW_ALLOC_REPLY = 20;
- IPCP_FLOW_DEALLOC = 21;
- IRM_REPLY = 22;
+ IRM_REPLY = 21;
};
message irm_msg {
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index d9e332fe..184a1bf2 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -21,14 +21,14 @@
*/
#include <ouroboros/config.h>
+#include <ouroboros/shm_ap_rbuff.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
#define OUROBOROS_PREFIX "shm_ap_rbuff"
#include <ouroboros/logs.h>
-#include <ouroboros/shm_ap_rbuff.h>
-#include <ouroboros/lockfile.h>
-#include <ouroboros/time_utils.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +41,6 @@
#include <sys/stat.h>
#define FN_MAX_CHARS 255
-#define NORTH false
-#define SOUTH true
#define SHM_RBUFF_FILE_SIZE (SHM_BUFFER_SIZE * sizeof(struct rb_entry) \
+ 2 * sizeof(size_t) + sizeof(pthread_mutex_t) \
@@ -63,11 +61,10 @@ struct shm_ap_rbuff {
pthread_cond_t * add; /* SDU arrived */
pthread_cond_t * del; /* SDU removed */
pid_t api; /* api to which this rb belongs */
- bool dir; /* direction, false = N */
int fd;
};
-static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
+struct shm_ap_rbuff * shm_ap_rbuff_create()
{
struct shm_ap_rbuff * rb;
int shm_fd;
@@ -77,10 +74,7 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
char fn[FN_MAX_CHARS];
mode_t mask;
- if (dir == SOUTH)
- sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", getpid());
- else
- sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", getpid());
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", getpid());
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -157,22 +151,18 @@ static struct shm_ap_rbuff * shm_ap_rbuff_create(bool dir)
rb->fd = shm_fd;
rb->api = getpid();
- rb->dir = dir;
return rb;
}
-static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
+struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
{
struct shm_ap_rbuff * rb;
int shm_fd;
struct rb_entry * shm_base;
char fn[FN_MAX_CHARS];
- if (dir == SOUTH)
- sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", api);
- else
- sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", api);
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", api);
rb = malloc(sizeof(*rb));
if (rb == NULL) {
@@ -215,31 +205,10 @@ static struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api, bool dir)
rb->fd = shm_fd;
rb->api = api;
- rb->dir = dir;
return rb;
}
-struct shm_ap_rbuff * shm_ap_rbuff_create_n()
-{
- return shm_ap_rbuff_create(NORTH);
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_create_s()
-{
- return shm_ap_rbuff_create(SOUTH);
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_open_n(pid_t api)
-{
- return shm_ap_rbuff_open(api, NORTH);
-}
-
-struct shm_ap_rbuff * shm_ap_rbuff_open_s(pid_t api)
-{
- return shm_ap_rbuff_open(api, SOUTH);
-}
-
void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
{
if (rb == NULL) {
@@ -285,10 +254,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
if (close(rb->fd) < 0)
LOG_DBG("Couldn't close shared memory.");
- if (rb->dir == SOUTH)
- sprintf(fn, SHM_AP_RBUFF_PREFIX "south.%d", rb->api);
- else
- sprintf(fn, SHM_AP_RBUFF_PREFIX "north.%d", rb->api);
+ sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
LOG_DBG("Couldn't unmap shared memory.");
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index bf5c7f16..fb58a4d6 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -24,7 +24,6 @@
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/time_utils.h>
#include <pthread.h>
@@ -35,6 +34,7 @@
#include <string.h>
#include <signal.h>
#include <sys/stat.h>
+#include <stdbool.h>
#define OUROBOROS_PREFIX "shm_rdrbuff"
@@ -76,6 +76,7 @@ struct shm_du_buff {
size_t du_head;
size_t du_tail;
pid_t dst_api;
+ size_t idx;
};
struct shm_rdrbuff {
@@ -458,7 +459,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#endif
int sz = size + sizeof *sdb;
uint8_t * write_pos;
- ssize_t idx = -1;
if (rdrb == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -505,6 +505,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
sdb->dst_api = -1;
sdb->du_head = 0;
sdb->du_tail = 0;
+ sdb->idx = *rdrb->ptr_head;
*rdrb->ptr_head = 0;
}
@@ -521,7 +522,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
memcpy(write_pos, data, len);
- idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->ptr_head;
#ifdef SHM_RDRB_MULTI_BLOCK
*rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
#else
@@ -529,7 +530,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
#endif
pthread_mutex_unlock(rdrb->lock);
- return idx;
+ return sdb->idx;
}
ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
@@ -547,7 +548,6 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
#endif
int sz = size + sizeof *sdb;
uint8_t * write_pos;
- ssize_t idx = -1;
if (rdrb == NULL || data == NULL) {
LOG_DBGF("Bogus input, bugging out.");
@@ -596,6 +596,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
sdb->dst_api = -1;
sdb->du_head = 0;
sdb->du_tail = 0;
+ sdb->idx = *rdrb->ptr_head;
*rdrb->ptr_head = 0;
}
@@ -612,7 +613,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
memcpy(write_pos, data, len);
- idx = *rdrb->ptr_head;
+ sdb->idx = *rdrb->ptr_head;
#ifdef SHM_RDRB_MULTI_BLOCK
*rdrb->ptr_head = (*rdrb->ptr_head + blocks) & (SHM_BUFFER_SIZE - 1);
#else
@@ -620,7 +621,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb,
#endif
pthread_cleanup_pop(true);
- return idx;
+ return sdb->idx;
}
int shm_rdrbuff_read(uint8_t ** dst,
@@ -654,6 +655,32 @@ int shm_rdrbuff_read(uint8_t ** dst,
return len;
}
+struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, ssize_t idx)
+{
+ struct shm_du_buff * sdb;
+
+ if (idx > SHM_BUFFER_SIZE)
+ return NULL;
+#ifdef __APPLE__
+ pthread_mutex_lock(rdrb->lock);
+#else
+ if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) {
+ LOG_DBGF("Recovering dead mutex.");
+ pthread_mutex_consistent(rdrb->lock);
+ }
+#endif
+ if (shm_rdrb_empty(rdrb)) {
+ pthread_mutex_unlock(rdrb->lock);
+ return NULL;
+ }
+
+ sdb = idx_to_du_buff_ptr(rdrb, idx);
+
+ pthread_mutex_unlock(rdrb->lock);
+
+ return sdb;
+}
+
int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)
{
if (idx > SHM_BUFFER_SIZE)
@@ -688,6 +715,11 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, ssize_t idx)
return 0;
}
+size_t shm_du_buff_get_idx(struct shm_du_buff * sdb)
+{
+ return sdb->idx;
+}
+
uint8_t * shm_du_buff_head(struct shm_du_buff * sdb)
{
if (sdb == NULL)
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 751c61b2..408e79e7 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -25,7 +25,6 @@
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
-#include <ouroboros/common.h>
#include <ouroboros/sockets.h>
#include <ouroboros/utils.h>
@@ -102,13 +101,12 @@ int server_socket_open(char * file_name)
return sockfd;
}
-void close_ptr(void * o)
+static void close_ptr(void * o)
{
close(*(int *) o);
}
-static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg,
- bool timed)
+static irm_msg_t * send_recv_irm_msg_timed(irm_msg_t * msg, bool timed)
{
int sockfd;
buffer_t buf;