summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2024-02-23 09:29:47 +0100
committerSander Vrijders <[email protected]>2024-02-23 16:41:37 +0100
commite6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f (patch)
treead959d95f8fb1f6d4744c57c9027bf182bc3190b
parentdcefa07624926da23a559eedc3f7361ac36e8312 (diff)
downloadouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.tar.gz
ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.zip
lib: Revise app flow allocation
This revises the application flow allocator to use the flow_info struct/message between the components. Revises the messaging to move the use protocol buffers to its own source (serdes-irm). Adds a timeout to the IRMd flow allocator to make sure flow allocations don't hang forever (this was previously taken care of by the sanitize thread). Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
-rw-r--r--include/ouroboros/ipcp-dev.h26
-rw-r--r--include/ouroboros/irm.h4
-rw-r--r--include/ouroboros/protobuf.h46
-rw-r--r--include/ouroboros/pthread.h2
-rw-r--r--include/ouroboros/serdes-irm.h79
-rw-r--r--include/ouroboros/sockets.h.in16
-rw-r--r--src/ipcpd/broadcast/main.c3
-rw-r--r--src/ipcpd/eth/eth.c111
-rw-r--r--src/ipcpd/ipcp.c53
-rw-r--r--src/ipcpd/ipcp.h25
-rw-r--r--src/ipcpd/local/main.c20
-rw-r--r--src/ipcpd/udp/main.c68
-rw-r--r--src/ipcpd/unicast/fa.c50
-rw-r--r--src/ipcpd/unicast/fa.h18
-rw-r--r--src/irmd/CMakeLists.txt9
-rw-r--r--src/irmd/config.h.in45
-rw-r--r--src/irmd/ipcp.c12
-rw-r--r--src/irmd/main.c105
-rw-r--r--src/irmd/reg/reg.c2
-rw-r--r--src/lib/CMakeLists.txt7
-rw-r--r--src/lib/config.h.in1
-rw-r--r--src/lib/dev.c665
-rw-r--r--src/lib/frct.c4
-rw-r--r--src/lib/pb/ipcp.proto3
-rw-r--r--src/lib/pb/ipcp_config.proto5
-rw-r--r--src/lib/pb/irm.proto55
-rw-r--r--src/lib/pb/model.proto (renamed from src/lib/pb/qos.proto)28
-rw-r--r--src/lib/protobuf.c78
-rw-r--r--src/lib/serdes-irm.c478
-rw-r--r--src/lib/serdes-oep.c1
-rw-r--r--src/lib/sockets.c27
-rw-r--r--src/lib/timerwheel.c10
32 files changed, 1298 insertions, 758 deletions
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index 5ee78905..378d724a 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -20,27 +20,25 @@
* Foundation, Inc., http://www.fsf.org/about/contact/.
*/
-#include <ouroboros/shm_rdrbuff.h>
-#include <ouroboros/qoscube.h>
-#include <ouroboros/ipcp.h>
-
#ifndef OUROBOROS_LIB_IPCP_DEV_H
#define OUROBOROS_LIB_IPCP_DEV_H
+#include <ouroboros/ipcp.h>
+#include <ouroboros/qoscube.h>
+#include <ouroboros/shm_rdrbuff.h>
+#include <ouroboros/utils.h>
+
int ipcp_create_r(const struct ipcp_info * info);
-int ipcp_flow_req_arr(const uint8_t * dst,
- size_t len,
+int ipcp_flow_req_arr(const buffer_t * dst,
qosspec_t qs,
time_t mpl,
- const void * data,
- size_t dlen);
-
-int ipcp_flow_alloc_reply(int fd,
- int response,
- time_t mpl,
- const void * data,
- size_t len);
+ const buffer_t * data);
+
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ time_t mpl,
+ const buffer_t * data);
int ipcp_flow_read(int fd,
struct shm_du_buff ** sdb);
diff --git a/include/ouroboros/irm.h b/include/ouroboros/irm.h
index b27343e8..0105f88e 100644
--- a/include/ouroboros/irm.h
+++ b/include/ouroboros/irm.h
@@ -30,10 +30,6 @@
#include <sys/types.h>
-/* Name binding options. */
-#define BIND_AUTO 0x01
-#define NAME_SIZE 255
-
struct ipcp_list_info {
pid_t pid;
enum ipcp_type type;
diff --git a/include/ouroboros/protobuf.h b/include/ouroboros/protobuf.h
index eb292721..9d38afb1 100644
--- a/include/ouroboros/protobuf.h
+++ b/include/ouroboros/protobuf.h
@@ -23,44 +23,56 @@
#ifndef OUROBOROS_LIB_PROTOBUF_H
#define OUROBOROS_LIB_PROTOBUF_H
+#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
#include <ouroboros/ipcp.h>
#include <ouroboros/irm.h>
+#include <ouroboros/serdes-irm.h>
#include <ouroboros/serdes-oep.h>
#include "ipcp_config.pb-c.h"
typedef IpcpConfigMsg ipcp_config_msg_t;
-typedef LayerInfoMsg layer_info_msg_t;
-typedef DtConfigMsg dt_config_msg_t;
-typedef EthConfigMsg eth_config_msg_t;
-typedef UdpConfigMsg udp_config_msg_t;
-typedef UniConfigMsg uni_config_msg_t;
+typedef DtConfigMsg dt_config_msg_t;
+typedef EthConfigMsg eth_config_msg_t;
+typedef UdpConfigMsg udp_config_msg_t;
+typedef UniConfigMsg uni_config_msg_t;
#include "ipcp.pb-c.h"
-typedef IpcpMsg ipcp_msg_t;
+typedef IpcpMsg ipcp_msg_t;
#include "irm.pb-c.h"
-typedef IpcpInfoMsg ipcp_info_msg_t;
-typedef IpcpListMsg ipcp_list_msg_t;
-typedef NameInfoMsg name_info_msg_t;
+typedef IrmMsg irm_msg_t;
+typedef TimespecMsg timespec_msg_t;
+typedef IpcpInfoMsg ipcp_info_msg_t;
+typedef IpcpListMsg ipcp_list_msg_t;
-#include "qos.pb-c.h"
-typedef QosspecMsg qosspec_msg_t;
+#include "model.pb-c.h"
+typedef FlowInfoMsg flow_info_msg_t;
+typedef LayerInfoMsg layer_info_msg_t;
+typedef NameInfoMsg name_info_msg_t;
+typedef QosspecMsg qosspec_msg_t;
#include "enroll.pb-c.h"
-typedef EnrollReqMsg enroll_req_msg_t;
-typedef EnrollRespMsg enroll_resp_msg_t;
-typedef EnrollAckMsg enroll_ack_msg_t;
+typedef EnrollReqMsg enroll_req_msg_t;
+typedef EnrollRespMsg enroll_resp_msg_t;
+typedef EnrollAckMsg enroll_ack_msg_t;
/* IPCP configuration */
+timespec_msg_t * timespec_s_to_msg(const struct timespec * s);
+
+struct timespec timespec_msg_to_s(timespec_msg_t * msg);
+
+flow_info_msg_t * flow_info_s_to_msg(const struct flow_info * s);
+
+struct flow_info flow_info_msg_to_s(const flow_info_msg_t * msg);
layer_info_msg_t * layer_info_s_to_msg(const struct layer_info * s);
struct layer_info layer_info_msg_to_s(const layer_info_msg_t * msg);
-ipcp_info_msg_t * ipcp_info_s_to_msg(const struct ipcp_info * s);
+ipcp_info_msg_t * ipcp_info_s_to_msg(const struct ipcp_info * s);
-struct ipcp_info ipcp_info_msg_to_s(const ipcp_info_msg_t * msg);
+struct ipcp_info ipcp_info_msg_to_s(const ipcp_info_msg_t * msg);
dt_config_msg_t * dt_config_s_to_msg(const struct dt_config * s);
@@ -102,4 +114,4 @@ enroll_ack_msg_t * enroll_ack_s_to_msg(const struct enroll_ack * s);
struct enroll_ack enroll_ack_msg_to_s(const enroll_ack_msg_t * msg);
-#endif /* OUROBOROS_LIB_PROTOBUF_H */ \ No newline at end of file
+#endif /* OUROBOROS_LIB_PROTOBUF_H */
diff --git a/include/ouroboros/pthread.h b/include/ouroboros/pthread.h
index df9213c0..7044cb5e 100644
--- a/include/ouroboros/pthread.h
+++ b/include/ouroboros/pthread.h
@@ -35,11 +35,13 @@ static int __attribute__((unused)) __timedwait(pthread_cond_t * cond,
return pthread_cond_timedwait(cond, mtx, abstime);
}
+#if defined (_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L
/* various cleanup functions for pthread_cleanup_push */
static void __attribute__((unused)) __cleanup_rwlock_unlock(void * rwlock)
{
pthread_rwlock_unlock((pthread_rwlock_t *) rwlock);
}
+#endif
static void __attribute__((unused)) __cleanup_mutex_unlock(void * mutex)
{
diff --git a/include/ouroboros/serdes-irm.h b/include/ouroboros/serdes-irm.h
new file mode 100644
index 00000000..1d041541
--- /dev/null
+++ b/include/ouroboros/serdes-irm.h
@@ -0,0 +1,79 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Ouroboros IRM Protocol - serialization/deserialization
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_SERDES_IRM_H
+#define OUROBOROS_LIB_SERDES_IRM_H
+
+#include <ouroboros/flow.h>
+#include <ouroboros/ipcp.h>
+#include <ouroboros/time.h>
+#include <ouroboros/utils.h>
+
+#include <inttypes.h>
+
+int flow_alloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo);
+
+int flow_join__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo);
+
+int flow_accept__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const struct timespec * timeo);
+
+int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf,
+ const buffer_t * dst,
+ const struct flow_info * flow,
+ const buffer_t * data);
+
+int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ int response,
+ const buffer_t * data);
+
+/* response to alloc / join / accept / flow_req_arr */
+int flow__irm_result_des(buffer_t * buf,
+ struct flow_info * flow,
+ buffer_t * sk);
+
+int flow_dealloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const struct timespec * timeo);
+
+int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * info);
+
+int ipcp_create_r__irm_req_ser(buffer_t * buf,
+ const struct ipcp_info * ipcp);
+
+int proc_announce__irm_req_ser(buffer_t * buf,
+ const char * prog);
+
+int proc_exit__irm_req_ser(buffer_t * buf);
+
+int irm__irm_result_des(buffer_t * buf);
+
+#endif /* OUROBOROS_LIB_SERDES_IRM_H*/
diff --git a/include/ouroboros/sockets.h.in b/include/ouroboros/sockets.h.in
index cd198781..095674a9 100644
--- a/include/ouroboros/sockets.h.in
+++ b/include/ouroboros/sockets.h.in
@@ -23,18 +23,17 @@
#ifndef OUROBOROS_LIB_SOCKETS_H
#define OUROBOROS_LIB_SOCKETS_H
-#include <sys/types.h>
+#include <ouroboros/protobuf.h>
-#include "irm.pb-c.h"
-typedef IrmMsg irm_msg_t;
+#include <sys/types.h>
-#define SOCK_PATH "/var/run/ouroboros/"
-#define SOCK_PATH_SUFFIX ".sock"
+#define SOCK_PATH "/var/run/ouroboros/"
+#define SOCK_PATH_SUFFIX ".sock"
-#define IRM_SOCK_PATH SOCK_PATH "irm" SOCK_PATH_SUFFIX
+#define IRM_SOCK_PATH SOCK_PATH "irm" SOCK_PATH_SUFFIX
#define IPCP_SOCK_PATH_PREFIX SOCK_PATH "ipcp"
-#define SOCK_BUF_SIZE @SOCK_BUF_SIZE@
+#define SOCK_BUF_SIZE @SOCK_BUF_SIZE@
/* Returns the full socket path of an IPCP */
char * ipcp_sock_path(pid_t pid);
@@ -43,8 +42,9 @@ int server_socket_open(char * file_name);
int client_socket_open(char * file_name);
-irm_msg_t * send_recv_irm_msg(irm_msg_t * msg);
+int send_recv_msg(buffer_t * buf);
+irm_msg_t * send_recv_irm_msg(irm_msg_t * msg);
/* cleanup socket when cancelling thread */
void __cleanup_close_ptr(void * o);
diff --git a/src/ipcpd/broadcast/main.c b/src/ipcpd/broadcast/main.c
index d2cbed6f..f51fc629 100644
--- a/src/ipcpd/broadcast/main.c
+++ b/src/ipcpd/broadcast/main.c
@@ -212,6 +212,7 @@ static int broadcast_ipcp_join(int fd,
{
struct conn conn;
time_t mpl = IPCP_BROADCAST_MPL;
+ buffer_t data = {NULL, 0};
(void) qs;
@@ -226,7 +227,7 @@ static int broadcast_ipcp_join(int fd,
notifier_event(NOTIFY_DT_CONN_ADD, &conn);
- ipcp_flow_alloc_reply(fd, 0, mpl, NULL, 0);
+ ipcp_flow_alloc_reply(fd, 0, mpl, &data);
return 0;
}
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index c0aaf711..ea6e0f1c 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -455,16 +455,15 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr,
return 0;
}
-static int eth_ipcp_alloc(const uint8_t * dst_addr,
+static int eth_ipcp_alloc(const uint8_t * dst_addr,
#if defined(BUILD_ETH_DIX)
- uint16_t eid,
+ uint16_t eid,
#elif defined(BUILD_ETH_LLC)
- uint8_t ssap,
+ uint8_t ssap,
#endif
- const uint8_t * hash,
- qosspec_t qs,
- const void * data,
- size_t dlen)
+ const uint8_t * hash,
+ qosspec_t qs,
+ const buffer_t * data)
{
uint8_t * buf;
struct mgmt_msg * msg;
@@ -473,7 +472,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
len = sizeof(*msg) + ipcp_dir_hash_len();
- buf = malloc(len + ETH_HEADER_TOT_SIZE + dlen);
+ buf = malloc(len + ETH_HEADER_TOT_SIZE + data->len);
if (buf == NULL)
return -1;
@@ -496,8 +495,8 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, hash, ipcp_dir_hash_len());
- if (dlen > 0)
- memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen);
+ if (data->len > 0)
+ memcpy(buf + len + ETH_HEADER_TOT_SIZE, data->data, data->len);
ret = eth_ipcp_send_frame(dst_addr,
#if defined(BUILD_ETH_DIX)
@@ -506,28 +505,27 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
reverse_bits(MGMT_SAP),
reverse_bits(MGMT_SAP),
#endif
- buf, len + dlen);
+ buf, len + data->len);
free(buf);
return ret;
}
-static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
+static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
#if defined(BUILD_ETH_DIX)
- uint16_t seid,
- uint16_t deid,
+ uint16_t seid,
+ uint16_t deid,
#elif defined(BUILD_ETH_LLC)
- uint8_t ssap,
- uint8_t dsap,
+ uint8_t ssap,
+ uint8_t dsap,
#endif
- int response,
- const void * data,
- size_t len)
+ int response,
+ const buffer_t * data)
{
struct mgmt_msg * msg;
uint8_t * buf;
- buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + len);
+ buf = malloc(sizeof(*msg) + ETH_HEADER_TOT_SIZE + data->len);
if (buf == NULL)
return -1;
@@ -543,8 +541,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
#endif
msg->response = response;
- if (len > 0)
- memcpy(msg + 1, data, len);
+ if (data->len > 0)
+ memcpy(msg + 1, data->data, data->len);
if (eth_ipcp_send_frame(dst_addr,
#if defined(BUILD_ETH_DIX)
@@ -553,7 +551,7 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
reverse_bits(MGMT_SAP),
reverse_bits(MGMT_SAP),
#endif
- buf, sizeof(*msg) + len)) {
+ buf, sizeof(*msg) + data->len)) {
free(buf);
return -1;
}
@@ -563,20 +561,19 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
return 0;
}
-static int eth_ipcp_req(uint8_t * r_addr,
+static int eth_ipcp_req(uint8_t * r_addr,
#if defined(BUILD_ETH_DIX)
- uint16_t r_eid,
+ uint16_t r_eid,
#elif defined(BUILD_ETH_LLC)
- uint8_t r_sap,
+ uint8_t r_sap,
#endif
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len)
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data)
{
int fd;
- fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data, len);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_ETH_MPL, data);
if (fd < 0) {
log_err("Could not get new flow from IRMd.");
return -1;
@@ -600,17 +597,16 @@ static int eth_ipcp_req(uint8_t * r_addr,
return 0;
}
-static int eth_ipcp_alloc_reply(uint8_t * r_addr,
+static int eth_ipcp_alloc_reply(uint8_t * r_addr,
#if defined(BUILD_ETH_DIX)
- uint16_t seid,
- uint16_t deid,
+ uint16_t seid,
+ uint16_t deid,
#elif defined(BUILD_ETH_LLC)
- uint8_t ssap,
- int dsap,
+ uint8_t ssap,
+ int dsap,
#endif
- int response,
- const void * data,
- size_t len)
+ int response,
+ const buffer_t * data)
{
int ret = 0;
int fd = -1;
@@ -649,7 +645,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr,
#elif defined(BUILD_ETH_LLC)
log_dbg("Flow reply, fd %d, SSAP %d, DSAP %d.", fd, ssap, dsap);
#endif
- if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data, len)) < 0) {
+ if ((ret = ipcp_flow_alloc_reply(fd, response, mpl, data)) < 0) {
log_err("Failed to reply to flow allocation.");
return -1;
}
@@ -716,6 +712,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
struct mgmt_msg * msg;
size_t msg_len;
qosspec_t qs;
+ buffer_t data;
msg = (struct mgmt_msg *) buf;
@@ -735,6 +732,9 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
qs.cypher_s = ntoh16(msg->cypher_s);
qs.timeout = ntoh32(msg->timeout);
+ data.data = (uint8_t *) buf + msg_len;
+ data.len = len - msg_len;
+
if (shim_data_reg_has(eth_data.shim_data,
buf + sizeof(*msg))) {
eth_ipcp_req(r_addr,
@@ -745,13 +745,15 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
#endif
buf + sizeof(*msg),
qs,
- buf + msg_len,
- len - msg_len);
+ &data);
}
break;
case FLOW_REPLY:
assert(len >= sizeof(*msg));
+ data.data = (uint8_t *) buf + sizeof(*msg);
+ data.len = len - sizeof(*msg);
+
eth_ipcp_alloc_reply(r_addr,
#if defined(BUILD_ETH_DIX)
ntohs(msg->seid),
@@ -761,8 +763,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
msg->dsap,
#endif
msg->response,
- buf + sizeof(*msg),
- len - sizeof(*msg));
+ &data);
break;
case NAME_QUERY_REQ:
eth_ipcp_name_query_req(buf + sizeof(*msg), r_addr);
@@ -1589,11 +1590,10 @@ static int eth_ipcp_query(const uint8_t * hash)
return ret;
}
-static int eth_ipcp_flow_alloc(int fd,
- const uint8_t * hash,
- qosspec_t qs,
- const void * data,
- size_t len)
+static int eth_ipcp_flow_alloc(int fd,
+ const uint8_t * hash,
+ qosspec_t qs,
+ const buffer_t * data)
{
#ifdef BUILD_ETH_LLC
uint8_t ssap = 0;
@@ -1634,8 +1634,7 @@ static int eth_ipcp_flow_alloc(int fd,
#endif
hash,
qs,
- data,
- len) < 0) {
+ data) < 0) {
#ifdef BUILD_ETH_LLC
pthread_rwlock_wrlock(&eth_data.flows_lock);
bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap);
@@ -1654,10 +1653,9 @@ static int eth_ipcp_flow_alloc(int fd,
return 0;
}
-static int eth_ipcp_flow_alloc_resp(int fd,
- int response,
- const void * data,
- size_t len)
+static int eth_ipcp_flow_alloc_resp(int fd,
+ int response,
+ const buffer_t * data)
{
#if defined(BUILD_ETH_DIX)
uint16_t r_eid;
@@ -1698,8 +1696,7 @@ static int eth_ipcp_flow_alloc_resp(int fd,
ssap, r_sap,
#endif
response,
- data,
- len) < 0) {
+ data) < 0) {
#ifdef BUILD_ETH_LLC
pthread_rwlock_wrlock(&eth_data.flows_lock);
bmp_release(eth_data.saps, eth_data.fd_to_ef[fd].sap);
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 0215cdaa..966c4920 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -261,15 +261,18 @@ static void * acceptloop(void * o)
return (void *) 0;
}
-int ipcp_wait_flow_req_arr(const uint8_t * dst,
- qosspec_t qs,
- time_t mpl,
- const void * data,
- size_t len)
+int ipcp_wait_flow_req_arr(const uint8_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data)
{
struct timespec ts = TIMESPEC_INIT_MS(ALLOC_TIMEOUT);
struct timespec abstime;
int fd;
+ buffer_t hash;
+
+ hash.data = (uint8_t *) dst;
+ hash.len = ipcp_dir_hash_len();
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
@@ -290,7 +293,7 @@ int ipcp_wait_flow_req_arr(const uint8_t * dst,
assert(ipcpi.alloc_id == -1);
- fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, mpl, data, len);
+ fd = ipcp_flow_req_arr(&hash, qs, mpl, data);
if (fd < 0) {
pthread_mutex_unlock(&ipcpi.alloc_lock);
log_err("Failed to get fd for flow.");
@@ -492,13 +495,12 @@ static void do_query(const uint8_t * hash,
ret_msg->result = ipcpi.ops->ipcp_query(hash);
}
-static void do_flow_alloc(pid_t pid,
- int flow_id,
- uint8_t * dst,
- qosspec_t qs,
- void * data,
- size_t len,
- ipcp_msg_t * ret_msg)
+static void do_flow_alloc(pid_t pid,
+ int flow_id,
+ uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data,
+ ipcp_msg_t * ret_msg)
{
int fd;
@@ -525,7 +527,7 @@ static void do_flow_alloc(pid_t pid,
goto finish;
}
- ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data, len);
+ ret_msg->result = ipcpi.ops->ipcp_flow_alloc(fd, dst, qs, data);
finish:
log_info("Finished allocating flow %d to " HASH_FMT32 ": %d.",
flow_id, HASH_VAL32(dst), ret_msg->result);
@@ -566,11 +568,10 @@ static void do_flow_join(pid_t pid,
log_info("Finished joining layer " HASH_FMT32 ".", HASH_VAL32(dst));
}
-static void do_flow_alloc_resp(int resp,
- int flow_id,
- const void * data,
- size_t len,
- ipcp_msg_t * ret_msg)
+static void do_flow_alloc_resp(int resp,
+ int flow_id,
+ const buffer_t * data,
+ ipcp_msg_t * ret_msg)
{
int fd = -1;
@@ -597,7 +598,7 @@ static void do_flow_alloc_resp(int resp,
}
}
- ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data, len);
+ ret_msg->result = ipcpi.ops->ipcp_flow_alloc_resp(fd, resp, data);
finish:
log_info("Finished responding to allocation request: %d",
ret_msg->result);
@@ -648,6 +649,7 @@ static void * mainloop(void * o)
ipcp_msg_t ret_msg = IPCP_MSG__INIT;
qosspec_t qs;
struct cmd * cmd;
+ buffer_t data;
ret_msg.code = IPCP_MSG_CODE__IPCP_REPLY;
@@ -710,11 +712,12 @@ static void * mainloop(void * o)
assert(msg->hash.len == ipcp_dir_hash_len());
assert(msg->pk.len > 0 ? msg->pk.data != NULL
: msg->pk.data == NULL);
+ data.len = msg->pk.len;
+ data.data = msg->pk.data;
qs = qos_spec_msg_to_s(msg->qosspec);
do_flow_alloc(msg->pid, msg->flow_id,
msg->hash.data, qs,
- msg->pk.data, msg->pk.len,
- &ret_msg);
+ &data, &ret_msg);
break;
case IPCP_MSG_CODE__IPCP_FLOW_JOIN:
assert(msg->hash.len == ipcp_dir_hash_len());
@@ -725,10 +728,10 @@ static void * mainloop(void * o)
case IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP:
assert(msg->pk.len > 0 ? msg->pk.data != NULL
: msg->pk.data == NULL);
-
+ data.len = msg->pk.len;
+ data.data = msg->pk.data;
do_flow_alloc_resp(msg->response, msg->flow_id,
- msg->pk.data, msg->pk.len,
- &ret_msg);
+ &data, &ret_msg);
break;
case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC:
do_flow_dealloc(msg->flow_id, msg->timeo_sec, &ret_msg);
diff --git a/src/ipcpd/ipcp.h b/src/ipcpd/ipcp.h
index 1ce07c57..aab490c7 100644
--- a/src/ipcpd/ipcp.h
+++ b/src/ipcpd/ipcp.h
@@ -53,20 +53,18 @@ struct ipcp_ops {
int (* ipcp_query)(const uint8_t * hash);
- int (* ipcp_flow_alloc)(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len);
+ int (* ipcp_flow_alloc)(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data);
int (* ipcp_flow_join)(int fd,
const uint8_t * dst,
qosspec_t qs);
- int (* ipcp_flow_alloc_resp)(int fd,
- int response,
- const void * data,
- size_t len);
+ int (* ipcp_flow_alloc_resp)(int fd,
+ int response,
+ const buffer_t * data);
int (* ipcp_flow_dealloc)(int fd);
};
@@ -129,11 +127,10 @@ int ipcp_parse_arg(int argc,
char * argv[]);
/* Helper functions to handle races during flow allocation */
-int ipcp_wait_flow_req_arr(const uint8_t * dst,
- qosspec_t qs,
- time_t mpl,
- const void * data,
- size_t len);
+int ipcp_wait_flow_req_arr(const uint8_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data);
int ipcp_wait_flow_resp(const int fd);
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index dd2c7209..5a53dec5 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -186,11 +186,10 @@ static int local_ipcp_query(const uint8_t * hash)
return ret;
}
-static int local_ipcp_flow_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len)
+static int local_ipcp_flow_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data)
{
int out_fd = -1;
@@ -198,7 +197,7 @@ static int local_ipcp_flow_alloc(int fd,
HASH_VAL32(dst), fd);
assert(dst);
- out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data, len);
+ out_fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_LOCAL_MPL, data);
if (out_fd < 0) {
log_dbg("Flow allocation failed: %d", out_fd);
return -1;
@@ -218,10 +217,9 @@ static int local_ipcp_flow_alloc(int fd,
return 0;
}
-static int local_ipcp_flow_alloc_resp(int fd,
- int response,
- const void * data,
- size_t len)
+static int local_ipcp_flow_alloc_resp(int fd,
+ int response,
+ const buffer_t * data)
{
int out_fd;
time_t mpl = IPCP_LOCAL_MPL;
@@ -249,7 +247,7 @@ static int local_ipcp_flow_alloc_resp(int fd,
fset_add(local_data.flows, fd);
- if (ipcp_flow_alloc_reply(out_fd, response, mpl, data, len) < 0)
+ if (ipcp_flow_alloc_reply(out_fd, response, mpl, data) < 0)
return -1;
log_info("Flow allocation completed, fds (%d, %d).", out_fd, fd);
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index 909ca0a5..2e8d84ce 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -203,18 +203,17 @@ static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr,
uint32_t s_eid,
const uint8_t * dst,
qosspec_t qs,
- const void * data,
- size_t dlen)
+ const buffer_t * data)
{
uint8_t * buf;
struct mgmt_msg * msg;
size_t len;
- assert(dlen > 0 ? data != NULL : data == NULL);
+ assert(data->len > 0 ? data->data != NULL : data->data == NULL);
len = sizeof(*msg) + ipcp_dir_hash_len();
- buf = malloc(len + dlen);
+ buf = malloc(len + data->len);
if (buf == NULL)
return -1;
@@ -233,10 +232,10 @@ static int udp_ipcp_port_alloc(const struct sockaddr_in * r_saddr,
msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
- if (dlen > 0)
- memcpy(buf + len, data, dlen);
+ if (data->len > 0)
+ memcpy(buf + len, data->data, data->len);
- if (sendto(udp_data.s_fd, msg, len + dlen,
+ if (sendto(udp_data.s_fd, msg, len + data->len,
SENDTO_FLAGS,
(const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0) {
free(buf);
@@ -252,12 +251,11 @@ static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr,
uint32_t s_eid,
uint32_t d_eid,
int8_t response,
- const void * data,
- size_t len)
+ const buffer_t * data)
{
struct mgmt_msg * msg;
- msg = malloc(sizeof(*msg) + len);
+ msg = malloc(sizeof(*msg) + data->len);
if (msg == NULL)
return -1;
@@ -267,10 +265,10 @@ static int udp_ipcp_port_alloc_resp(const struct sockaddr_in * r_saddr,
msg->d_eid = hton32(d_eid);
msg->response = response;
- if (len > 0)
- memcpy(msg + 1, data, len);
+ if (data->len > 0)
+ memcpy(msg + 1, data->data, data->len);
- if (sendto(udp_data.s_fd, msg, sizeof(*msg) + len,
+ if (sendto(udp_data.s_fd, msg, sizeof(*msg) + data->len,
SENDTO_FLAGS,
(const struct sockaddr *) r_saddr, sizeof(*r_saddr)) < 0 ) {
free(msg);
@@ -286,12 +284,11 @@ static int udp_ipcp_port_req(struct sockaddr_in * c_saddr,
int d_eid,
const uint8_t * dst,
qosspec_t qs,
- const void * data,
- size_t len)
+ const buffer_t * data)
{
int fd;
- fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data, len);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UDP_MPL, data);
if (fd < 0) {
log_err("Could not get new flow from IRMd.");
return -1;
@@ -314,8 +311,7 @@ static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr,
uint32_t s_eid,
uint32_t d_eid,
int8_t response,
- const void * data,
- size_t len)
+ const buffer_t * data)
{
time_t mpl = IPCP_UDP_MPL;
@@ -333,7 +329,7 @@ static int udp_ipcp_port_alloc_reply(const struct sockaddr_in * saddr,
pthread_rwlock_unlock(&udp_data.flows_lock);
- if (ipcp_flow_alloc_reply(s_eid, response, mpl, data, len) < 0) {
+ if (ipcp_flow_alloc_reply(s_eid, response, mpl, data) < 0) {
log_err("Failed to reply to flow allocation.");
return -1;
}
@@ -351,6 +347,7 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf,
struct mgmt_msg * msg;
size_t msg_len;
qosspec_t qs;
+ buffer_t data;
msg = (struct mgmt_msg *) buf;
@@ -360,6 +357,10 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf,
assert(len >= msg_len);
+ data.len = len - msg_len;
+ data.data = (uint8_t *) buf + msg_len;
+
+
qs.delay = ntoh32(msg->delay);
qs.bandwidth = ntoh64(msg->bandwidth);
qs.availability = msg->availability;
@@ -372,17 +373,18 @@ static int udp_ipcp_mgmt_frame(const uint8_t * buf,
return udp_ipcp_port_req(&c_saddr, ntoh32(msg->s_eid),
(uint8_t *) (msg + 1), qs,
- buf + msg_len,
- len - msg_len);
+ &data);
case FLOW_REPLY:
assert(len >= sizeof(*msg));
+ data.len = len - sizeof(*msg);
+ data.data = (uint8_t *) buf + sizeof(*msg);
+
return udp_ipcp_port_alloc_reply(&c_saddr,
ntoh32(msg->s_eid),
ntoh32(msg->d_eid),
msg->response,
- buf + sizeof(*msg),
- len - sizeof(*msg));
+ &data);
default:
log_err("Unknown message received %d.", msg->code);
return -1;
@@ -983,11 +985,10 @@ static int udp_ipcp_query(const uint8_t * hash)
return 0;
}
-static int udp_ipcp_flow_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len)
+static int udp_ipcp_flow_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data)
{
struct sockaddr_in r_saddr; /* Server address */
uint32_t ip_addr = 0;
@@ -1017,7 +1018,7 @@ static int udp_ipcp_flow_alloc(int fd,
r_saddr.sin_addr.s_addr = ip_addr;
r_saddr.sin_port = udp_data.s_saddr.sin_port;
- if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data, len) < 0) {
+ if (udp_ipcp_port_alloc(&r_saddr, fd, dst, qs, data) < 0) {
log_err("Could not allocate port.");
return -1;
}
@@ -1034,10 +1035,9 @@ static int udp_ipcp_flow_alloc(int fd,
return 0;
}
-static int udp_ipcp_flow_alloc_resp(int fd,
- int resp,
- const void * data,
- size_t len)
+static int udp_ipcp_flow_alloc_resp(int fd,
+ int resp,
+ const buffer_t * data)
{
struct sockaddr_in saddr;
int d_eid;
@@ -1054,7 +1054,7 @@ static int udp_ipcp_flow_alloc_resp(int fd,
pthread_rwlock_unlock(&udp_data.flows_lock);
- if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data, len) < 0) {
+ if (udp_ipcp_port_alloc_resp(&saddr, d_eid, fd, resp, data) < 0) {
fset_del(udp_data.np1_flows, fd);
log_err("Failed to respond to flow request.");
return -1;
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index cea9483e..3631fd7b 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -478,8 +478,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qosspec_t qs;
struct fa_flow * flow;
uint8_t * dst;
- uint8_t * data; /* Piggbacked data on flow alloc request. */
- size_t dlen; /* Length of piggybacked data. */
+ buffer_t data; /* Piggbacked data on flow alloc request. */
msg_len = sizeof(*msg) + ipcp_dir_hash_len();
if (len < msg_len) {
@@ -487,9 +486,9 @@ static int fa_handle_flow_req(struct fa_msg * msg,
return -EPERM;
}
- dst = (uint8_t *)(msg + 1);
- data = (uint8_t *) msg + msg_len;
- dlen = len - msg_len;
+ dst = (uint8_t *)(msg + 1);
+ data.data = (uint8_t *) msg + msg_len;
+ data.len = len - msg_len;
qs.delay = ntoh32(msg->delay);
qs.bandwidth = ntoh64(msg->bandwidth);
@@ -501,7 +500,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
qs.cypher_s = ntoh16(msg->cypher_s);
qs.timeout = ntoh32(msg->timeout);
- fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, data, dlen);
+ fd = ipcp_wait_flow_req_arr(dst, qs, IPCP_UNICAST_MPL, &data);
if (fd < 0)
return fd;
@@ -525,14 +524,13 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
{
int fd;
struct fa_flow * flow;
- uint8_t * data; /* Piggbacked data on flow alloc request. */
- size_t dlen; /* Length of piggybacked data. */
+ buffer_t data; /* Piggbacked data on flow alloc request. */
time_t mpl = IPCP_UNICAST_MPL;
assert(len >= sizeof(*msg));
- data = (uint8_t *) msg + sizeof(*msg);
- dlen = len - sizeof(*msg);
+ data.data = (uint8_t *) msg + sizeof(*msg);
+ data.len = len - sizeof(*msg);
pthread_rwlock_wrlock(&fa.flows_lock);
@@ -555,7 +553,7 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
pthread_rwlock_unlock(&fa.flows_lock);
- if (ipcp_flow_alloc_reply(fd, msg->response, mpl, data, dlen) < 0) {
+ if (ipcp_flow_alloc_reply(fd, msg->response, mpl, &data) < 0) {
log_err("Failed to reply for flow allocation on fd %d.", fd);
return -EIRMD;
}
@@ -738,11 +736,10 @@ void fa_stop(void)
psched_destroy(fa.psched);
}
-int fa_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t dlen)
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data)
{
struct fa_msg * msg;
struct shm_du_buff * sdb;
@@ -758,7 +755,7 @@ int fa_alloc(int fd,
len = sizeof(*msg) + ipcp_dir_hash_len();
- if (ipcp_sdb_reserve(&sdb, len + dlen))
+ if (ipcp_sdb_reserve(&sdb, len + data->len))
return -1;
msg = (struct fa_msg *) shm_du_buff_head(sdb);
@@ -780,8 +777,8 @@ int fa_alloc(int fd,
msg->timeout = hton32(qs.timeout);
memcpy(msg + 1, dst, ipcp_dir_hash_len());
- if (dlen > 0)
- memcpy(shm_du_buff_head(sdb) + len, data, dlen);
+ if (data->len > 0)
+ memcpy(shm_du_buff_head(sdb) + len, data->data, data->len);
if (dt_write_packet(addr, qc, fa.eid, sdb)) {
log_err("Failed to send flow allocation request packet.");
@@ -802,10 +799,9 @@ int fa_alloc(int fd,
return 0;
}
-int fa_alloc_resp(int fd,
- int response,
- const void * data,
- size_t len)
+int fa_alloc_resp(int fd,
+ int response,
+ const buffer_t * data)
{
struct fa_msg * msg;
struct shm_du_buff * sdb;
@@ -819,9 +815,9 @@ int fa_alloc_resp(int fd,
goto fail_alloc_resp;
}
- if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
+ if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + data->len)) {
log_err("Failed to reserve sdb (%zu bytes).",
- sizeof(*msg) + len);
+ sizeof(*msg) + data->len);
goto fail_reserve;
}
@@ -830,8 +826,8 @@ int fa_alloc_resp(int fd,
msg->code = FLOW_REPLY;
msg->response = response;
- if (len > 0)
- memcpy(msg + 1, data, len);
+ if (data->len > 0)
+ memcpy(msg + 1, data->data, data->len);
pthread_rwlock_rdlock(&fa.flows_lock);
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index 6d559a22..1e716966 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -34,16 +34,14 @@ int fa_start(void);
void fa_stop(void);
-int fa_alloc(int fd,
- const uint8_t * dst,
- qosspec_t qs,
- const void * data,
- size_t len);
-
-int fa_alloc_resp(int fd,
- int response,
- const void * data,
- size_t len);
+int fa_alloc(int fd,
+ const uint8_t * dst,
+ qosspec_t qs,
+ const buffer_t * data);
+
+int fa_alloc_resp(int fd,
+ int response,
+ const buffer_t * data);
int fa_dealloc(int fd);
diff --git a/src/irmd/CMakeLists.txt b/src/irmd/CMakeLists.txt
index 3a5be324..7eaa0ce6 100644
--- a/src/irmd/CMakeLists.txt
+++ b/src/irmd/CMakeLists.txt
@@ -40,18 +40,19 @@ endif ()
set(IRMD_REQ_ARR_TIMEOUT 1000 CACHE STRING
"Timeout for an application to respond to a new flow (ms)")
-set(IRMD_FLOW_TIMEOUT 5000 CACHE STRING
- "Timeout for a flow allocation response (ms)")
+
set(BOOTSTRAP_TIMEOUT 5000 CACHE STRING
"Timeout for an IPCP to bootstrap (ms)")
set(ENROLL_TIMEOUT 60000 CACHE STRING
"Timeout for an IPCP to enroll (ms)")
-set(REG_TIMEOUT 10000 CACHE STRING
+set(REG_TIMEOUT 60000 CACHE STRING
"Timeout for registering a name (ms)")
-set(QUERY_TIMEOUT 3000 CACHE STRING
+set(QUERY_TIMEOUT 60000 CACHE STRING
"Timeout to query a name with an IPCP (ms)")
set(CONNECT_TIMEOUT 60000 CACHE STRING
"Timeout to connect an IPCP to another IPCP (ms)")
+set(FLOW_ALLOC_TIMEOUT 5000 CACHE STRING
+ "Timeout for a flow allocation response (ms)")
set(IRMD_MIN_THREADS 8 CACHE STRING
"Minimum number of worker threads in the IRMd")
set(IRMD_ADD_THREADS 8 CACHE STRING
diff --git a/src/irmd/config.h.in b/src/irmd/config.h.in
index b25053f7..fa1156b9 100644
--- a/src/irmd/config.h.in
+++ b/src/irmd/config.h.in
@@ -21,38 +21,39 @@
*/
-#define IPCP_UDP_EXEC "@IPCP_UDP_TARGET@"
-#define IPCP_ETH_LLC_EXEC "@IPCP_ETH_LLC_TARGET@"
-#define IPCP_ETH_DIX_EXEC "@IPCP_ETH_DIX_TARGET@"
-#define IPCP_UNICAST_EXEC "@IPCP_UNICAST_TARGET@"
-#define IPCP_BROADCAST_EXEC "@IPCP_BROADCAST_TARGET@"
-#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@"
+#define IPCP_UDP_EXEC "@IPCP_UDP_TARGET@"
+#define IPCP_ETH_LLC_EXEC "@IPCP_ETH_LLC_TARGET@"
+#define IPCP_ETH_DIX_EXEC "@IPCP_ETH_DIX_TARGET@"
+#define IPCP_UNICAST_EXEC "@IPCP_UNICAST_TARGET@"
+#define IPCP_BROADCAST_EXEC "@IPCP_BROADCAST_TARGET@"
+#define IPCP_LOCAL_EXEC "@IPCP_LOCAL_TARGET@"
-#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@"
-#define INSTALL_SBINDIR "@CMAKE_INSTALL_SBINDIR@"
+#define INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@"
+#define INSTALL_SBINDIR "@CMAKE_INSTALL_SBINDIR@"
-#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@
+#define PTHREAD_COND_CLOCK @PTHREAD_COND_CLOCK@
-#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@
+#define SOCKET_TIMEOUT @SOCKET_TIMEOUT@
-#define IRMD_REQ_ARR_TIMEOUT @IRMD_REQ_ARR_TIMEOUT@
-#define IRMD_FLOW_TIMEOUT @IRMD_FLOW_TIMEOUT@
+#define IRMD_REQ_ARR_TIMEOUT @IRMD_REQ_ARR_TIMEOUT@
-#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@
-#define ENROLL_TIMEOUT @ENROLL_TIMEOUT@
-#define REG_TIMEOUT @REG_TIMEOUT@
-#define QUERY_TIMEOUT @QUERY_TIMEOUT@
-#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@
+#define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@
+#define FLOW_DEALLOC_TIMEOUT @FLOW_DEALLOC_TIMEOUT@
-#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@
+#define BOOTSTRAP_TIMEOUT @BOOTSTRAP_TIMEOUT@
+#define ENROLL_TIMEOUT @ENROLL_TIMEOUT@
+#define REG_TIMEOUT @REG_TIMEOUT@
+#define QUERY_TIMEOUT @QUERY_TIMEOUT@
+#define CONNECT_TIMEOUT @CONNECT_TIMEOUT@
-#define IRMD_MIN_THREADS @IRMD_MIN_THREADS@
-#define IRMD_ADD_THREADS @IRMD_ADD_THREADS@
+#define SYS_MAX_FLOWS @SYS_MAX_FLOWS@
+#define IRMD_MIN_THREADS @IRMD_MIN_THREADS@
+#define IRMD_ADD_THREADS @IRMD_ADD_THREADS@
#cmakedefine HAVE_FUSE
#ifdef HAVE_FUSE
-#define FUSE_PREFIX "@FUSE_PREFIX@"
+#define FUSE_PREFIX "@FUSE_PREFIX@"
#endif
#cmakedefine HAVE_TOML
@@ -61,7 +62,7 @@
#define OUROBOROS_CONFIG_FILE "@OUROBOROS_CONFIG_FILE@"
#endif
-#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@
+#define IRMD_PKILL_TIMEOUT @IRMD_PKILL_TIMEOUT@
#cmakedefine IRMD_KILL_ALL_PROCESSES
#cmakedefine HAVE_LIBGCRYPT
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 3253a8f3..c8055aa1 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -58,6 +58,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid,
struct timeval tv;
struct timespec tic;
struct timespec toc;
+ bool dealloc = false;
if (kill(pid, 0) < 0)
return NULL;
@@ -101,6 +102,15 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid,
tv.tv_sec = CONNECT_TIMEOUT / 1000;
tv.tv_usec = (CONNECT_TIMEOUT % 1000) * 1000;
break;
+ case IPCP_MSG_CODE__IPCP_FLOW_ALLOC:
+ tv.tv_sec = FLOW_ALLOC_TIMEOUT / 1000;
+ tv.tv_usec = (FLOW_ALLOC_TIMEOUT % 1000) * 1000;
+ break;
+ case IPCP_MSG_CODE__IPCP_FLOW_DEALLOC:
+ dealloc = true;
+ tv.tv_sec = 0; /* FIX DEALLOC: don't wait for dealloc */
+ tv.tv_usec = 500;
+ break;
default:
tv.tv_sec = SOCKET_TIMEOUT / 1000;
tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000;
@@ -127,7 +137,7 @@ ipcp_msg_t * send_recv_ipcp_msg(pid_t pid,
if (len > 0)
recv_msg = ipcp_msg__unpack(NULL, len, buf);
else {
- if (errno == EAGAIN) {
+ if (errno == EAGAIN && !dealloc) {
int diff = ts_diff_ms(&tic, &toc);
log_warn("IPCP command timed out after %d ms.", diff);
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 2cbe8ed4..32f41ab2 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1234,14 +1234,14 @@ static int flow_alloc_reply(struct flow_info * flow,
}
static int flow_dealloc(struct flow_info * flow,
- time_t timeo)
+ struct timespec * ts)
{
log_info("Deallocating flow %d for process %d.",
flow->id, flow->n_pid);
reg_dealloc_flow(flow);
- if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, timeo) < 0) {
+ if (ipcp_flow_dealloc(flow->n_1_pid, flow->id, ts->tv_sec) < 0) {
log_err("Failed to request dealloc from %d.", flow->n_1_pid);
return -EIPCP;
}
@@ -1324,14 +1324,27 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)
struct flow_info flow;
struct proc_info proc;
struct name_info name;
- struct timespec * abstime = NULL;
- struct timespec ts;
+ struct timespec * abstime;
+ struct timespec max = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT);
+ struct timespec now;
+ struct timespec ts = TIMESPEC_INIT_S(0); /* static analysis */
int res;
irm_msg_t * ret_msg;
buffer_t data;
memset(&flow, 0, sizeof(flow));
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+ if (msg->timeo != NULL) {
+ ts = timespec_msg_to_s(msg->timeo);
+ ts_add(&ts, &now, &ts);
+ abstime = &ts;
+ } else {
+ ts_add(&max, &now, &max);
+ abstime = NULL;
+ }
+
ret_msg = malloc(sizeof(*ret_msg));
if (ret_msg == NULL) {
log_err("Failed to malloc return msg.");
@@ -1342,20 +1355,6 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)
ret_msg->code = IRM_MSG_CODE__IRM_REPLY;
- if (msg->has_timeo_sec) {
- struct timespec now;
-
- clock_gettime(PTHREAD_COND_CLOCK, &now);
- assert(msg->has_timeo_nsec);
-
- ts.tv_sec = msg->timeo_sec;
- ts.tv_nsec = msg->timeo_nsec;
-
- ts_add(&ts, &now, &ts);
-
- abstime = &ts;
- }
-
pthread_cleanup_push(free_msg, ret_msg);
switch (msg->code) {
@@ -1430,20 +1429,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
data.len = msg->pk.len;
data.data = msg->pk.data;
+ msg->has_pk = false;
assert(data.len > 0 ? data.data != NULL : data.data == NULL);
- flow.n_pid = msg->pid;
- flow.qs = qos_raw;
+ flow = flow_info_msg_to_s(msg->flow_info);
res = flow_accept(&flow, &data, abstime);
if (res == 0) {
- qosspec_msg_t * qs_msg;
- qs_msg = qos_spec_s_to_msg(&flow.qs);
- ret_msg->has_flow_id = true;
- ret_msg->flow_id = flow.id;
- ret_msg->has_pid = true;
- ret_msg->pid = flow.n_1_pid;
- ret_msg->has_mpl = true;
- ret_msg->qosspec = qs_msg;
- ret_msg->mpl = flow.mpl;
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
ret_msg->has_symmkey = data.len != 0;
ret_msg->symmkey.data = data.data;
ret_msg->symmkey.len = data.len;
@@ -1453,17 +1444,12 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)
data.len = msg->pk.len;
data.data = msg->pk.data;
msg->has_pk = false;
- flow.n_pid = msg->pid;
- flow.qs = qos_spec_msg_to_s(msg->qosspec);
assert(data.len > 0 ? data.data != NULL : data.data == NULL);
+ flow = flow_info_msg_to_s(msg->flow_info);
+ abstime = abstime == NULL ? &max : abstime;
res = flow_alloc(&flow, msg->dst, &data, abstime);
if (res == 0) {
- ret_msg->has_flow_id = true;
- ret_msg->flow_id = flow.id;
- ret_msg->has_pid = true;
- ret_msg->pid = flow.n_1_pid;
- ret_msg->has_mpl = true;
- ret_msg->mpl = flow.mpl;
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
ret_msg->has_symmkey = data.len != 0;
ret_msg->symmkey.data = data.data;
ret_msg->symmkey.len = data.len;
@@ -1471,46 +1457,38 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)
break;
case IRM_MSG_CODE__IRM_FLOW_JOIN:
assert(msg->pk.len == 0 && msg->pk.data == NULL);
- flow.qs = qos_spec_msg_to_s(msg->qosspec);
+ flow = flow_info_msg_to_s(msg->flow_info);
+ abstime = abstime == NULL ? &max : abstime;
res = flow_join(&flow, msg->dst, abstime);
+ if (res == 0)
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
- flow.n_pid = msg->pid;
- flow.id = msg->flow_id;
- res = flow_dealloc(&flow, msg->timeo_sec);
+ flow = flow_info_msg_to_s(msg->flow_info);
+ res = flow_dealloc(&flow, &ts);
break;
case IRM_MSG_CODE__IPCP_FLOW_DEALLOC:
- flow.n_1_pid = msg->pid;
- flow.id = msg->flow_id;
+ flow = flow_info_msg_to_s(msg->flow_info);
res = flow_dealloc_resp(&flow);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
data.len = msg->pk.len;
data.data = msg->pk.data;
- msg->has_pk = false; /* pass data */
- msg->pk.data = NULL;
+ msg->pk.data = NULL; /* pass data */
msg->pk.len = 0;
assert(data.len > 0 ? data.data != NULL : data.data == NULL);
- flow.n_1_pid = msg->pid;
- flow.mpl = msg->mpl;
- flow.qs = qos_spec_msg_to_s(msg->qosspec);
+ flow = flow_info_msg_to_s(msg->flow_info);
res = flow_req_arr(&flow, msg->hash.data, &data);
- if (res == 0) {
- ret_msg->has_flow_id = true;
- ret_msg->flow_id = flow.id;
- ret_msg->has_pid = true;
- ret_msg->pid = flow.n_pid;
- }
+ if (res == 0)
+ ret_msg->flow_info = flow_info_s_to_msg(&flow);
break;
case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:
data.len = msg->pk.len;
data.data = msg->pk.data;
- msg->has_pk = false; /* pass data */
- msg->pk.data = NULL;
+ msg->pk.data = NULL; /* pass data */
msg->pk.len = 0;
assert(data.len > 0 ? data.data != NULL : data.data == NULL);
- flow.id = msg->flow_id;
- flow.mpl = msg->mpl;
+ flow = flow_info_msg_to_s(msg->flow_info);
res = flow_alloc_reply(&flow, msg->response, &data);
break;
default:
@@ -1522,7 +1500,10 @@ static irm_msg_t * do_command_msg(irm_msg_t * msg)
pthread_cleanup_pop(false);
ret_msg->has_result = true;
- ret_msg->result = res;
+ if (abstime == &max && res == -ETIMEDOUT)
+ ret_msg->result = -EPERM; /* No timeout requested */
+ else
+ ret_msg->result = res;
return ret_msg;
}
@@ -1664,8 +1645,6 @@ static void destroy_mount(char * mnt)
{
struct stat st;
- log_dbg("Destroying mountpoint %s.", mnt);
-
if (stat(mnt, &st) == -1){
switch(errno) {
case ENOENT:
@@ -1719,7 +1698,7 @@ static void cleanup_pid(pid_t pid)
void * irm_sanitize(void * o)
{
pid_t pid;
- struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 20);
+ struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 20);
(void) o;
@@ -2003,7 +1982,7 @@ static void * kill_dash_nine(void * o)
{
time_t slept = 0;
#ifdef IRMD_KILL_ALL_PROCESSES
- struct timespec ts = TIMESPEC_INIT_MS(IRMD_FLOW_TIMEOUT / 19);
+ struct timespec ts = TIMESPEC_INIT_MS(FLOW_ALLOC_TIMEOUT / 19);
#endif
(void) o;
diff --git a/src/irmd/reg/reg.c b/src/irmd/reg/reg.c
index f486c1cc..731e44b6 100644
--- a/src/irmd/reg/reg.c
+++ b/src/irmd/reg/reg.c
@@ -1490,7 +1490,7 @@ int reg_get_exec(enum hash_algo algo,
exec = __reg_get_exec(algo, hash);
if (exec == NULL) {
- ret = 0;
+ ret = -EPERM;
goto finish;
}
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 4a4684b0..b0bdb5ce 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -4,8 +4,8 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_BINARY_DIR}/include)
-protobuf_generate_c(QOSSPEC_PROTO_SRCS QOSSPEC_PROTO_HDRS
- pb/qos.proto)
+protobuf_generate_c(MODEL_PROTO_SRCS MODEL_PROTO_HDRS
+ pb/model.proto)
protobuf_generate_c(IPCP_CONFIG_PROTO_SRCS IPCP_CONFIG_PROTO_HDRS
pb/ipcp_config.proto)
protobuf_generate_c(ENROLL_PROTO_SRCS ENROLL_PROTO_HDRS
@@ -264,6 +264,7 @@ set(SOURCE_FILES_COMMON
qoscube.c
random.c
rib.c
+ serdes-irm.c
serdes-oep.c
sha3.c
shm_flow_set.c
@@ -278,7 +279,7 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/config.h.in"
"${CMAKE_CURRENT_BINARY_DIR}/config.h" @ONLY)
add_library(ouroboros-common SHARED ${SOURCE_FILES_COMMON} ${IRM_PROTO_SRCS}
- ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${QOSSPEC_PROTO_SRCS}
+ ${IPCP_PROTO_SRCS} ${IPCP_CONFIG_PROTO_SRCS} ${MODEL_PROTO_SRCS}
${ENROLL_PROTO_SRCS})
add_library(ouroboros-dev SHARED ${SOURCE_FILES_DEV} ${CACEP_PROTO_SRCS})
diff --git a/src/lib/config.h.in b/src/lib/config.h.in
index d1eb9a54..604038b4 100644
--- a/src/lib/config.h.in
+++ b/src/lib/config.h.in
@@ -42,6 +42,7 @@
#define SHM_RDRB_BLOCK_SIZE @SHM_RDRB_BLOCK_SIZE@
#define SHM_BUFFER_SIZE @SHM_BUFFER_SIZE@
#define SHM_RBUFF_SIZE @SHM_RBUFF_SIZE@
+#define FLOW_ALLOC_TIMEOUT @FLOW_ALLOC_TIMEOUT@
#if defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__))
/* Avoid a bug in robust mutex implementation of glibc 2.25 */
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9e37978c..a7f20e88 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -42,9 +42,9 @@
#include <ouroboros/fccntl.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
-#include <ouroboros/protobuf.h>
#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
+#include <ouroboros/serdes-irm.h>
#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_rbuff.h>
@@ -57,6 +57,7 @@
#ifdef HAVE_LIBGCRYPT
#include <gcrypt.h>
#endif
+#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -79,6 +80,7 @@
/* map flow_ids to flow descriptors; track state of the flow */
struct fmap {
int fd;
+ /* TODO: use actual flow state */
enum flow_state state;
};
@@ -88,12 +90,13 @@ struct fmap {
struct flow {
struct list_head next;
+ struct flow_info info;
+
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
- int flow_id;
+
uint16_t oflags;
- qosspec_t qs;
ssize_t part_idx;
struct crypt_info crypt;
@@ -221,53 +224,32 @@ static enum flow_state flow_wait_assign(int flow_id)
return state;
}
-static int proc_announce(char * prog)
+static int proc_announce(const char * prog)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.prog = prog;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return ret;
- }
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (proc_announce__irm_req_ser(&msg, prog) < 0)
+ return -ENOMEM;
- return ret;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ return irm__irm_result_des(&msg);
}
+/* IRMd will clean up the mess if this fails */
static void proc_exit(void)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_EXIT;
- msg.has_pid = true;
- msg.pid = getpid();
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ if (proc_exit__irm_req_ser(&msg) < 0)
return;
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return;
- }
-
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return;
+ send_recv_msg(&msg);
}
#include "frct.c"
@@ -305,7 +287,7 @@ static void flow_send_keepalive(struct flow * flow,
if (shm_rbuff_write(flow->tx_rb, idx))
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
}
@@ -323,8 +305,8 @@ static void _flow_keepalive(struct flow * flow)
s_act = flow->snd_act;
r_act = flow->rcv_act;
- flow_id = flow->flow_id;
- timeo = flow->qs.timeout;
+ flow_id = flow->info.id;
+ timeo = flow->info.qs.timeout;
acl = shm_rbuff_get_acl(flow->rx_rb);
if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
@@ -400,10 +382,10 @@ static void flow_clear(int fd)
{
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
- ai.flows[fd].flow_id = -1;
+ ai.flows[fd].info.id = -1;
}
-static void flow_fini(int fd)
+static void __flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
@@ -414,13 +396,13 @@ static void flow_fini(int fd)
pthread_join(ai.tx, NULL);
}
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
frcti_destroy(ai.flows[fd].frcti);
}
- if (ai.flows[fd].flow_id != -1) {
- flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]);
+ if (ai.flows[fd].info.id != -1) {
+ flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
bmp_release(ai.fds, fd);
}
@@ -436,7 +418,7 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL) {
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
}
@@ -448,11 +430,17 @@ static void flow_fini(int fd)
flow_clear(fd);
}
-static int flow_init(int flow_id,
- pid_t pid,
- qosspec_t qs,
- uint8_t * s,
- time_t mpl)
+static void flow_fini(int fd)
+{
+ pthread_rwlock_wrlock(&ai.lock);
+
+ __flow_fini(fd);
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static int flow_init(struct flow_info * info,
+ buffer_t * sk)
{
struct timespec now;
struct flow * flow;
@@ -471,43 +459,43 @@ static int flow_init(int flow_id,
flow = &ai.flows[fd];
- flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
+ flow->info = *info;
+
+ flow->rx_rb = shm_rbuff_open(info->n_pid, info->id);
if (flow->rx_rb == NULL)
goto fail_rx_rb;
- flow->tx_rb = shm_rbuff_open(pid, flow_id);
+ flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id);
if (flow->tx_rb == NULL)
goto fail_tx_rb;
- flow->set = shm_flow_set_open(pid);
+ flow->set = shm_flow_set_open(info->n_1_pid);
if (flow->set == NULL)
goto fail_set;
- flow->flow_id = flow_id;
flow->oflags = FLOWFDEFAULT;
flow->part_idx = NO_PART;
- flow->qs = qs;
flow->snd_act = now;
flow->rcv_act = now;
- flow->crypt.flags = qs.cypher_s; /* TODO: remove cypher_s from qos */
+ flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */
- if (flow->crypt.flags > 0 && s != NULL) /* static analyzer s != NULL */
- memcpy(flow->crypt.key, s ,SYMMKEYSZ);
- else
- memset(flow->crypt.key, 0, SYMMKEYSZ);
+ memset(flow->crypt.key, 0, SYMMKEYSZ);
+
+ if (flow->crypt.flags > 0 && sk!= NULL && sk->data != NULL)
+ memcpy(flow->crypt.key, sk->data , sk->len);
if (crypt_init(&flow->crypt) < 0)
goto fail_crypt;
assert(flow->frcti == NULL);
- if (flow->qs.in_order != 0) {
- flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl);
+ if (info->qs.in_order != 0) {
+ flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
if (flow->frcti == NULL)
goto fail_frcti;
- if (shm_flow_set_add(ai.fqset, 0, flow_id))
+ if (shm_flow_set_add(ai.fqset, 0, info->id))
goto fail_flow_set_add;
++ai.n_frcti;
@@ -518,16 +506,16 @@ static int flow_init(int flow_id,
list_add_tail(&flow->next, &ai.flow_list);
- ai.id_to_fd[flow_id].fd = fd;
+ ai.id_to_fd[info->id].fd = fd;
- flow_set_state(&ai.id_to_fd[flow_id], FLOW_ALLOCATED);
+ flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
pthread_rwlock_unlock(&ai.lock);
return fd;
fail_tx_thread:
- shm_flow_set_del(ai.fqset, 0, flow_id);
+ shm_flow_set_del(ai.fqset, 0, info->id);
fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
@@ -722,12 +710,12 @@ static void fini(void)
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].flow_id != -1) {
+ if (ai.flows[i].info.id != -1) {
ssize_t idx;
shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- flow_fini(i);
+ __flow_fini(i);
}
}
@@ -774,142 +762,94 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini;
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- int err = -EIRMD;
- uint8_t * symmkey;
-
- msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
- msg.has_pid = true;
- msg.pid = getpid();
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk;
+ int fd;
+ int err;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- goto fail_recv;
-
- if (!recv_msg->has_result)
- goto fail_msg;
-
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_msg;
- }
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ memset(&flow, 0, sizeof(flow));
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl || recv_msg->qosspec == NULL)
- goto fail_msg;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- symmkey = recv_msg->has_symmkey ? recv_msg->symmkey.data : NULL;
+ if (flow_accept__irm_req_ser(&msg, &flow, timeo))
+ return -ENOMEM;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qos_spec_msg_to_s(recv_msg->qosspec),
- symmkey,
- recv_msg->mpl);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- if (fd < 0)
- return fd;
+ fd = flow_init(&flow, &sk);
- pthread_rwlock_rdlock(&ai.lock);
+ freebuf(sk);
if (qs != NULL)
- *qs = ai.flows[fd].qs;
-
- pthread_rwlock_unlock(&ai.lock);
+ *qs = flow.qs;
return fd;
-
- fail_msg:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_recv:
- return err;
}
int flow_alloc(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- int err = -EIRMD;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk; /* symmetric key */
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
qs->ber = 1;
#endif
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
-
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
- if (recv_msg == NULL)
- goto fail_send_recv;
+ memset(&flow, 0, sizeof(flow));
- if (!recv_msg->has_result)
- goto fail_result;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if ((qs != NULL && qs->cypher_s != 0) &&
- (!recv_msg->has_symmkey || recv_msg->symmkey.len != SYMMKEYSZ)) {
- err = -ECRYPT;
- goto fail_result;
- }
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- /* TODO: Make sure qosspec is set in msg */
- if (qs != NULL && recv_msg->qosspec != NULL)
- *qs = qos_spec_msg_to_s(recv_msg->qosspec);
+ fd = flow_init(&flow, &sk);
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, recv_msg->symmkey.data,
- recv_msg->mpl);
+ freebuf(sk);
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send_recv:
- return err;
}
int flow_join(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- uint8_t s[SYMMKEYSZ];
- int fd;
- int err = -EIRMD;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
@@ -918,184 +858,145 @@ int flow_join(const char * dst,
if (qs != NULL && qs->cypher_s > 0)
return -ENOTSUP; /* TODO: Encrypted broadcast */
- memset(s, 0, SYMMKEYSZ);
+ memset(&flow, 0, sizeof(flow));
- msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if (recv_msg == NULL)
- goto fail_send;
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- if (!recv_msg->has_result)
- goto fail_result;
+ fd = flow_init(&flow, NULL);
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
-
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
-
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, s,
- recv_msg->mpl);
-
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send:
- return err;
}
+#define PKT_BUF_LEN 2048
int flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- uint8_t buf[128];
- struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
- struct flow * f;
- time_t timeo;
+ struct flow_info info;
+ uint8_t pkt[PKT_BUF_LEN];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+ struct timespec timeo = TIMESPEC_INIT_S(0);
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_nsec = 0;
+ memset(&info, 0, sizeof(flow));
- f = &ai.flows[fd];
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
-
- f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+ flow->oflags = FLOWFDEFAULT | FLOWFRNOPART;
- f->rcv_timesout = true;
- f->rcv_timeo = tic;
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = tic;
pthread_rwlock_unlock(&ai.lock);
- flow_read(fd, buf, 128);
+ flow_read(fd, buf, SOCK_BUF_SIZE);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
- while (timeo < 0) { /* keep the flow active for rtx */
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
+ while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
ssize_t ret;
pthread_rwlock_unlock(&ai.lock);
- ret = flow_read(fd, buf, 128);
+ ret = flow_read(fd, pkt, PKT_BUF_LEN);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
- if (ret == -EFLOWDOWN && timeo < 0)
- timeo = -timeo;
+ if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
+ timeo.tv_sec = -timeo.tv_sec;
}
- msg.timeo_sec = timeo;
-
pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
- shm_rbuff_fini(ai.flows[fd].tx_rb);
+ shm_rbuff_fini(flow->tx_rb);
pthread_cleanup_pop(true);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
+ info.id = flow->info.id;
+ info.n_pid = getpid();
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int ipcp_flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- struct flow * f;
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_flow_id = true;
+ flow = &ai.flows[fd];
- f = &ai.flows[fd];
+ memset(&info, 0, sizeof(flow));
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
+ info.id = flow->info.id;
+ info.n_1_pid = flow->info.n_1_pid;
pthread_rwlock_unlock(&ai.lock);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int fccntl(int fd,
@@ -1122,7 +1023,7 @@ int fccntl(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -1167,7 +1068,7 @@ int fccntl(int fd,
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = flow->qs;
+ *qs = flow->info.qs;
break;
case FLOWGRXQLEN:
qlen = va_arg(l, size_t *);
@@ -1194,13 +1095,13 @@ int fccntl(int fd,
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_UP);
}
@@ -1302,7 +1203,7 @@ static int flow_tx_sdb(struct flow * flow,
if (crypt_encrypt(&flow->crypt, sdb) < 0)
goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && add_crc(sdb) != 0)
goto enomem;
}
@@ -1316,7 +1217,7 @@ static int flow_tx_sdb(struct flow * flow,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_cleanup_pop(true);
@@ -1353,7 +1254,7 @@ ssize_t flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1398,7 +1299,7 @@ static bool invalid_pkt(struct flow * flow,
if (shm_du_buff_len(sdb) == 0)
return true;
- if (flow->qs.ber == 0 && chk_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0)
return true;
if (crypt_decrypt(&flow->crypt, sdb) < 0)
@@ -1461,7 +1362,7 @@ ssize_t flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1627,20 +1528,20 @@ int fset_add(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
ret = -EINVAL;
goto fail;
}
if (flow->frcti != NULL)
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
- ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
if (ret < 0)
goto fail;
if (shm_rbuff_queued(ai.flows[fd].rx_rb))
- shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT);
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1663,11 +1564,11 @@ void fset_del(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
+ if (flow->info.id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->info.id);
if (flow->frcti != NULL)
- shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
pthread_rwlock_unlock(&ai.lock);
}
@@ -1682,12 +1583,12 @@ bool fset_has(const struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return false;
}
- ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1);
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
pthread_rwlock_unlock(&ai.lock);
@@ -1828,10 +1729,20 @@ ssize_t fevent(struct flow_set * set,
/* ipcp-dev functions. */
-int np1_flow_alloc(pid_t n_pid,
- int flow_id)
+int np1_flow_alloc(pid_t n_pid,
+ int flow_id)
{
- return flow_init(flow_id, n_pid, qos_np1, NULL, 0);
+ struct flow_info flow;
+
+ memset(&flow, 0, sizeof(flow));
+
+ flow.id = flow_id;
+ flow.n_pid = getpid();
+ flow.qs = qos_np1;
+ flow.mpl = 0;
+ flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+
+ return flow_init(&flow, NULL);
}
int np1_flow_dealloc(int flow_id,
@@ -1874,123 +1785,85 @@ int np1_flow_resp(int flow_id)
int ipcp_create_r(const struct ipcp_info * info)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.ipcp_info = ipcp_info_s_to_msg(info);
+ if (ipcp_create_r__irm_req_ser(&msg,info) < 0)
+ return -ENOMEM;
- recv_msg = send_recv_irm_msg(&msg);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- ipcp_info_msg__free_unpacked(msg.ipcp_info, NULL);
+ return irm__irm_result_des(&msg);
+}
- if (recv_msg == NULL)
- return -EIRMD;
+int ipcp_flow_req_arr(const buffer_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data)
+{
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ memset(&flow, 0, sizeof(flow));
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
+ assert(dst != NULL && dst->len != 0 && dst->data != NULL);
- return ret;
-}
+ flow.n_1_pid = getpid();
+ flow.qs = qs;
+ flow.mpl = mpl;
-int ipcp_flow_req_arr(const uint8_t * dst,
- size_t len,
- qosspec_t qs,
- time_t mpl,
- const void * data,
- size_t dlen)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
-
- assert(dst != NULL);
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_hash = true;
- msg.hash.len = len;
- msg.hash.data = (uint8_t *) dst;
- msg.qosspec = qos_spec_s_to_msg(&qs);
- msg.has_mpl = true;
- msg.mpl = mpl;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = dlen;
-
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
-
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_flow_id || !recv_msg->has_pid) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
+ return -ENOMEM;
- if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ /* inverted for np1_flow */
+ flow.n_1_pid = flow.n_pid;
+ flow.n_pid = getpid();
+ flow.mpl = 0;
- return fd;
+ return flow_init(&flow, NULL);
}
-int ipcp_flow_alloc_reply(int fd,
- int response,
- time_t mpl,
- const void * data,
- size_t len)
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.has_flow_id = true;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = (uint32_t) len;
- msg.has_mpl = true;
- msg.mpl = mpl;
-
pthread_rwlock_rdlock(&ai.lock);
- msg.flow_id = ai.flows[fd].flow_id;
+ flow.id = ai.flows[fd].info.id;
pthread_rwlock_unlock(&ai.lock);
- msg.has_response = true;
- msg.response = response;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ flow.mpl = mpl;
- ret = recv_msg->result;
+ if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- return ret;
+ return irm__irm_result_des(&msg);
}
int ipcp_flow_read(int fd,
@@ -2006,7 +1879,7 @@ int ipcp_flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
while (frcti_queued_pdu(flow->frcti) < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -2038,7 +1911,7 @@ int ipcp_flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2066,7 +1939,7 @@ int np1_flow_read(int fd,
flow = &ai.flows[fd];
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
pthread_rwlock_rdlock(&ai.lock);
@@ -2097,7 +1970,7 @@ int np1_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2115,7 +1988,7 @@ int np1_flow_write(int fd,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
return ret;
}
@@ -2139,7 +2012,7 @@ int ipcp_flow_fini(int fd)
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -1;
}
@@ -2148,7 +2021,7 @@ int ipcp_flow_fini(int fd)
shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
rx_rb = ai.flows[fd].rx_rb;
@@ -2169,9 +2042,9 @@ int ipcp_flow_get_qoscube(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
- *cube = qos_spec_to_cube(ai.flows[fd].qs);
+ *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
pthread_rwlock_unlock(&ai.lock);
@@ -2184,7 +2057,7 @@ size_t ipcp_flow_queued(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
q = shm_rbuff_queued(ai.flows[fd].tx_rb);
@@ -2220,14 +2093,14 @@ int local_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
if (ret == 0)
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
else
shm_rdrbuff_remove(ai.rdrb, idx);
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 604e8a62..c6fef35c 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -271,7 +271,7 @@ static void __send_frct_pkt(int fd,
#endif
goto fail;
- shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+ shm_flow_set_notify(f->set, f->info.id, FLOW_PKT);
return;
@@ -398,7 +398,7 @@ static struct frcti * frcti_create(int fd,
frcti->n_out = 0;
frcti->n_rqo = 0;
#endif
- if (ai.flows[fd].qs.loss == 0) {
+ if (ai.flows[fd].info.qs.loss == 0) {
frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
frcti->rcv_cr.cflags |= FRCTFRTX;
}
diff --git a/src/lib/pb/ipcp.proto b/src/lib/pb/ipcp.proto
index 71bf90b8..c2c7f48b 100644
--- a/src/lib/pb/ipcp.proto
+++ b/src/lib/pb/ipcp.proto
@@ -23,7 +23,8 @@
syntax = "proto2";
import "ipcp_config.proto";
-import "qos.proto";
+import "model.proto";
+
enum ipcp_msg_code {
IPCP_BOOTSTRAP = 1;
diff --git a/src/lib/pb/ipcp_config.proto b/src/lib/pb/ipcp_config.proto
index ca4d55aa..28528b0c 100644
--- a/src/lib/pb/ipcp_config.proto
+++ b/src/lib/pb/ipcp_config.proto
@@ -22,10 +22,7 @@
syntax = "proto2";
-message layer_info_msg {
- required string name = 1;
- required uint32 dir_hash_algo = 2;
-}
+import "model.proto";
message dt_config_msg {
required uint32 addr_size = 1;
diff --git a/src/lib/pb/irm.proto b/src/lib/pb/irm.proto
index c962e5e5..da3bd982 100644
--- a/src/lib/pb/irm.proto
+++ b/src/lib/pb/irm.proto
@@ -23,7 +23,7 @@
syntax = "proto2";
import "ipcp_config.proto";
-import "qos.proto";
+import "model.proto";
enum irm_msg_code {
IRM_CREATE_IPCP = 1;
@@ -55,11 +55,9 @@ enum irm_msg_code {
IRM_REPLY = 27;
}
-message ipcp_info_msg {
- required uint32 type = 1;
- required string name = 2;
- required uint32 pid = 3;
- required uint32 state = 4;
+message timespec_msg {
+ required uint64 tv_sec = 1;
+ required uint32 tv_nsec = 2;
}
message ipcp_list_msg {
@@ -70,33 +68,30 @@ message ipcp_list_msg {
required uint32 hash_algo = 5;
}
-message name_info_msg {
- required string name = 1;
- required uint32 pol_lb = 2;
-}
-
message irm_msg {
required irm_msg_code code = 1;
optional string prog = 2;
optional sint32 pid = 3;
optional string name = 4;
- optional ipcp_info_msg ipcp_info = 5;
- optional string layer = 6;
- repeated string exec = 7;
- optional sint32 response = 8;
- optional string dst = 9;
- optional bytes hash = 10;
- optional sint32 flow_id = 11;
- optional qosspec_msg qosspec = 12;
- optional ipcp_config_msg conf = 13;
- optional uint32 opts = 14;
- repeated ipcp_list_msg ipcps = 15;
- repeated name_info_msg names = 16;
- optional uint32 timeo_sec = 17;
- optional uint32 timeo_nsec = 18;
- optional sint32 mpl = 19;
- optional string comp = 20;
- optional bytes pk = 21; /* piggyback */
- optional bytes symmkey = 22;
- optional sint32 result = 23;
+ optional flow_info_msg flow_info = 5;
+ optional ipcp_info_msg ipcp_info = 6;
+ optional string layer = 7;
+ repeated string exec = 8;
+ optional sint32 response = 9;
+ optional string dst = 10;
+ optional bytes hash = 11;
+ optional sint32 flow_id = 12;
+ optional qosspec_msg qosspec = 13;
+ optional ipcp_config_msg conf = 14;
+ optional uint32 opts = 15;
+ repeated ipcp_list_msg ipcps = 16;
+ repeated name_info_msg names = 17;
+ optional timespec_msg timeo = 18;
+ optional sint32 mpl = 20;
+ optional string comp = 21;
+ optional bytes pk = 22; /* piggyback */
+ optional bytes symmkey = 23;
+ optional uint32 timeo_sec = 24;
+ optional uint32 timeo_nsec = 25;
+ optional sint32 result = 26;
}
diff --git a/src/lib/pb/qos.proto b/src/lib/pb/model.proto
index 64f5a285..f1e401f9 100644
--- a/src/lib/pb/qos.proto
+++ b/src/lib/pb/model.proto
@@ -1,7 +1,7 @@
/*
* Ouroboros - Copyright (C) 2016 - 2024
*
- * QoS specification message
+ * Model description messages
*
* Dimitri Staessens <[email protected]>
* Sander Vrijders <[email protected]>
@@ -33,3 +33,29 @@ message qosspec_msg {
required uint32 cypher_s = 8; /* Crypto strength in bits. */
required uint32 timeout = 9; /* Timeout in ms. */
}
+
+message flow_info_msg {
+ required uint32 id = 1;
+ required uint32 n_pid = 2;
+ required uint32 n_1_pid = 3;
+ required uint32 mpl = 4;
+ required uint32 state = 5;
+ required qosspec_msg qos = 6;
+}
+
+message name_info_msg {
+ required string name = 1;
+ required uint32 pol_lb = 2;
+}
+
+message layer_info_msg {
+ required string name = 1;
+ required uint32 dir_hash_algo = 2;
+}
+
+message ipcp_info_msg {
+ required uint32 type = 1;
+ required string name = 2;
+ required uint32 pid = 3;
+ required uint32 state = 4;
+}
diff --git a/src/lib/protobuf.c b/src/lib/protobuf.c
index 2135d57e..b586168c 100644
--- a/src/lib/protobuf.c
+++ b/src/lib/protobuf.c
@@ -28,6 +28,84 @@
#include <string.h>
#include <time.h>
+timespec_msg_t * timespec_s_to_msg(const struct timespec * s)
+{
+ timespec_msg_t * msg;
+
+ assert(s != NULL);
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ timespec_msg__init(msg);
+
+ msg->tv_sec = s->tv_sec;
+ msg->tv_nsec = s->tv_nsec;
+
+ return msg;
+
+ fail_malloc:
+ return NULL;
+}
+
+struct timespec timespec_msg_to_s(timespec_msg_t * msg)
+{
+ struct timespec s;
+
+ assert(msg != NULL);
+
+ s.tv_sec = msg->tv_sec;
+ s.tv_nsec = msg->tv_nsec;
+
+ return s;
+}
+
+flow_info_msg_t * flow_info_s_to_msg(const struct flow_info * s)
+{
+ flow_info_msg_t * msg;
+
+ assert(s != NULL);
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ flow_info_msg__init(msg);
+
+ msg->id = s->id;
+ msg->n_pid = s->n_pid;
+ msg->n_1_pid = s->n_1_pid;
+ msg->mpl = s->mpl;
+ msg->state = s->state;
+ msg->qos = qos_spec_s_to_msg(&s->qs);
+ if (msg->qos == NULL)
+ goto fail_msg;
+
+ return msg;
+
+ fail_msg:
+ flow_info_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return NULL;
+}
+
+struct flow_info flow_info_msg_to_s(const flow_info_msg_t * msg)
+{
+ struct flow_info s;
+
+ assert(msg != NULL);
+
+ s.id = msg->id;
+ s.n_pid = msg->n_pid;
+ s.n_1_pid = msg->n_1_pid;
+ s.mpl = msg->mpl;
+ s.state = msg->state;
+ s.qs = qos_spec_msg_to_s(msg->qos);
+
+ return s;
+}
+
layer_info_msg_t * layer_info_s_to_msg(const struct layer_info * s)
{
layer_info_msg_t * msg;
diff --git a/src/lib/serdes-irm.c b/src/lib/serdes-irm.c
new file mode 100644
index 00000000..c4ba3053
--- /dev/null
+++ b/src/lib/serdes-irm.c
@@ -0,0 +1,478 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Ouroboros IRM Protocol - serialization/deserialization
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/serdes-irm.h>
+#include <ouroboros/protobuf.h>
+
+#include <stdlib.h>
+#include <string.h>
+
+int flow_accept__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const struct timespec * timeo)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->timeo = timeo == NULL ? NULL : timespec_s_to_msg(timeo);
+ if (timeo != NULL && msg->timeo == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+static int __flow_alloc_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo,
+ int msg_code)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = msg_code;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->dst = strdup(dst);
+ if (msg->dst == NULL)
+ goto fail_msg;
+
+ msg->timeo = timeo == NULL ? NULL : timespec_s_to_msg(timeo);
+ if (timeo != NULL && msg->timeo == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int flow_alloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo)
+{
+ return __flow_alloc_ser(buf, flow, dst, timeo,
+ IRM_MSG_CODE__IRM_FLOW_ALLOC);
+}
+
+int flow_join__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const char * dst,
+ const struct timespec * timeo)
+{
+ return __flow_alloc_ser(buf, flow, dst, timeo,
+ IRM_MSG_CODE__IRM_FLOW_JOIN);
+}
+
+int flow__irm_result_des(buffer_t * buf,
+ struct flow_info * flow,
+ buffer_t * sk)
+{
+ irm_msg_t * msg;
+ int err;
+
+ if (sk != NULL)
+ sk->data = NULL;
+
+ msg = irm_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ err = -EIRMD;
+ goto fail_msg;
+ }
+
+ if (!msg->has_result) {
+ err = -EIRMD;
+ goto fail;
+ }
+
+ if (msg->result < 0) {
+ err = msg->result;
+ goto fail;
+ }
+
+ if (msg->flow_info == NULL) {
+ err = -EBADF;
+ goto fail;
+ }
+
+ *flow = flow_info_msg_to_s(msg->flow_info);
+
+ if (flow->qs.cypher_s > 0 && sk != NULL) {
+ if (msg->symmkey.data == NULL || msg->symmkey.len == 0) {
+ err = -ECRYPT;
+ goto fail;
+ }
+
+ sk->len = msg->symmkey.len;
+ sk->data = msg->symmkey.data;
+
+ msg->symmkey.data = NULL;
+ msg->symmkey.len = 0;
+ }
+
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return err;
+}
+
+int flow_dealloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ const struct timespec * timeo)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->timeo = timespec_s_to_msg(timeo);
+ if (msg->timeo == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int ipcp_flow_dealloc__irm_req_ser(buffer_t * buf,
+ const struct flow_info * flow)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+
+int ipcp_create_r__irm_req_ser(buffer_t * buf,
+ const struct ipcp_info * ipcp)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_CREATE_R;
+ msg->ipcp_info = ipcp_info_s_to_msg(ipcp);
+ if (msg->ipcp_info == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int proc_announce__irm_req_ser(buffer_t * buf,
+ const char * prog)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
+ msg->has_pid = true;
+ msg->pid = getpid();
+ msg->prog = strdup(prog);
+ if (msg->prog == NULL)
+ goto fail_msg;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int proc_exit__irm_req_ser(buffer_t * buf)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IRM_PROC_EXIT;
+ msg->has_pid = true;
+ msg->pid = getpid();
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int ipcp_flow_req_arr__irm_req_ser(buffer_t * buf,
+ const buffer_t * dst,
+ const struct flow_info * flow,
+ const buffer_t * data)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->has_hash = true;
+ msg->hash.len = dst->len;
+ msg->hash.data = dst->data;
+ msg->has_pk = true;
+ msg->pk.len = data->len;
+ msg->pk.data = data->data;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+
+ /* Don't free * dst or data! */
+ msg->hash.len = 0;
+ msg->hash.data = NULL;
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int ipcp_flow_alloc_reply__irm_msg_ser(buffer_t * buf,
+ const struct flow_info * flow,
+ int response,
+ const buffer_t * data)
+{
+ irm_msg_t * msg;
+ size_t len;
+
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto fail_malloc;
+
+ irm_msg__init(msg);
+
+ msg->code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
+ msg->flow_info = flow_info_s_to_msg(flow);
+ if (msg->flow_info == NULL)
+ goto fail_msg;
+
+ msg->has_pk = true;
+ msg->pk.len = data->len;
+ msg->pk.data = data->data;
+ msg->has_response = true;
+ msg->response = response;
+
+ len = irm_msg__get_packed_size(msg);
+ if (len == 0 || len > buf->len)
+ goto fail_msg;
+
+ buf->len = len;
+
+ irm_msg__pack(msg, buf->data);
+
+ /* Don't free * data! */
+ msg->pk.len = 0;
+ msg->pk.data = NULL;
+
+ irm_msg__free_unpacked(msg, NULL);
+
+ return 0;
+ fail_msg:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_malloc:
+ return -ENOMEM;
+}
+
+int irm__irm_result_des(buffer_t * buf)
+{
+ irm_msg_t * msg;
+ int err;
+
+ msg = irm_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ err = -EIRMD;
+ goto fail_msg;
+ }
+
+ if (!msg->has_result) {
+ err = -EIRMD;
+ goto fail;
+ }
+
+ err = msg->result;
+ fail:
+ irm_msg__free_unpacked(msg, NULL);
+ fail_msg:
+ return err;
+}
diff --git a/src/lib/serdes-oep.c b/src/lib/serdes-oep.c
index f92011c5..8a836b3b 100644
--- a/src/lib/serdes-oep.c
+++ b/src/lib/serdes-oep.c
@@ -25,7 +25,6 @@
#include <ouroboros/protobuf.h>
#include <ouroboros/serdes-oep.h>
-
ssize_t enroll_req_ser(const struct enroll_req * req,
buffer_t buf)
{
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 9c5b7a51..13219db0 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -21,6 +21,7 @@
*/
#include <ouroboros/errno.h>
+#include <ouroboros/pthread.h>
#include <ouroboros/sockets.h>
#include <ouroboros/utils.h>
@@ -29,7 +30,6 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
-#include <pthread.h>
#include <stdbool.h>
/* Apple doesn't support SEQPACKET. */
@@ -118,7 +118,8 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
pthread_cleanup_push(__cleanup_close_ptr, &sockfd);
- if (write(sockfd, buf, len) != -1)
+ len = write(sockfd, buf, len);
+ if (len >= 0)
len = read(sockfd, buf, SOCK_BUF_SIZE);
pthread_cleanup_pop(true);
@@ -131,6 +132,28 @@ irm_msg_t * send_recv_irm_msg(irm_msg_t * msg)
return NULL;
}
+int send_recv_msg(buffer_t * msg)
+{
+ int sockfd;
+ ssize_t len = 0;
+
+ sockfd = client_socket_open(IRM_SOCK_PATH);
+ if (sockfd < 0)
+ return -1;
+
+ pthread_cleanup_push(__cleanup_close_ptr, &sockfd);
+
+ len = write(sockfd, msg->data, msg->len);
+ if (len >= 0)
+ len = read(sockfd, msg->data, SOCK_BUF_SIZE);
+
+ pthread_cleanup_pop(true);
+
+ msg->len = (size_t) len;
+
+ return len < 0 ? -1 : 0;
+}
+
char * ipcp_sock_path(pid_t pid)
{
char * full_name = NULL;
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 40dfbb19..96f4ac47 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -178,7 +178,7 @@ static void timerwheel_move(void)
shm_du_buff_ack(r->sdb);
#endif
if (f->frcti == NULL
- || f->flow_id != r->flow_id)
+ || f->info.id != r->flow_id)
goto cleanup;
pthread_rwlock_rdlock(&r->frcti->lock);
@@ -249,7 +249,7 @@ static void timerwheel_move(void)
if (shm_rbuff_write(f->tx_rb, idx) < 0)
#endif
goto flow_down;
- shm_flow_set_notify(f->set, f->flow_id,
+ shm_flow_set_notify(f->set, f->info.id,
FLOW_PKT);
reschedule:
list_add(&r->next, &rw.rxms[lvl][rslot]);
@@ -292,7 +292,7 @@ static void timerwheel_move(void)
rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
- if (f->flow_id == a->flow_id && f->frcti != NULL)
+ if (f->info.id == a->flow_id && f->frcti != NULL)
send_frct_pkt(a->frcti);
free(a);
@@ -341,7 +341,7 @@ static int timerwheel_rxm(struct frcti * frcti,
slot = r->t0 >> RXMQ_RES;
r->fd = frcti->fd;
- r->flow_id = ai.flows[r->fd].flow_id;
+ r->flow_id = ai.flows[r->fd].info.id;
pthread_rwlock_unlock(&r->frcti->lock);
@@ -394,7 +394,7 @@ static int timerwheel_delayed_ack(int fd,
a->fd = fd;
a->frcti = frcti;
- a->flow_id = ai.flows[fd].flow_id;
+ a->flow_id = ai.flows[fd].info.id;
pthread_mutex_lock(&rw.lock);