path: root/src/lib/dev.c
author	Dimitri Staessens <[email protected]>	2024-02-23 09:29:47 +0100
committer	Sander Vrijders <[email protected]>	2024-02-23 16:41:37 +0100
commit	e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f (patch)
tree	ad959d95f8fb1f6d4744c57c9027bf182bc3190b /src/lib/dev.c
parent	dcefa07624926da23a559eedc3f7361ac36e8312 (diff)
download	ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.tar.gz
	ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.zip
lib: Revise app flow allocation
This revises the application flow allocator to use the flow_info struct/message between the components. It also revises the messaging to move the use of protocol buffers into its own source (serdes-irm), and adds a timeout to the IRMd flow allocator to make sure flow allocations don't hang forever (this was previously taken care of by the sanitize thread).

Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--	src/lib/dev.c	665
1 file changed, 269 insertions(+), 396 deletions(-)
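
For reference, every client call in dev.c now follows the same request/response pattern introduced by this patch: serialize the request into a fixed-size stack buffer with a serdes-irm helper, round-trip that buffer to the IRMd with send_recv_msg(), and deserialize the reply in place. Below is a minimal sketch of that pattern, modelled on the new flow_alloc() in the diff (alloc_example is a hypothetical wrapper name; the types and helpers are the ones the patch itself uses, and NULL is passed here for the allocation timeout):

static int alloc_example(const char * dst,
                         qosspec_t *  qs)
{
        struct flow_info flow;
        uint8_t          buf[SOCK_BUF_SIZE];
        buffer_t         msg = {buf, SOCK_BUF_SIZE};
        buffer_t         sk;  /* symmetric key returned by the IRMd */
        int              fd;
        int              err;

        memset(&flow, 0, sizeof(flow));

        flow.n_pid = getpid();
        flow.qs    = qs == NULL ? qos_raw : *qs;

        /* Serialize the request into the stack buffer. */
        if (flow_alloc__irm_req_ser(&msg, &flow, dst, NULL) < 0)
                return -ENOMEM;

        /* Round-trip the buffer to the IRMd over its UNIX socket. */
        err = send_recv_msg(&msg);
        if (err < 0)
                return err;

        /* Deserialize the reply in place; fills in flow and sk. */
        err = flow__irm_result_des(&msg, &flow, &sk);
        if (err < 0)
                return err;

        /* Set up the rbuffs, crypto and FRCT state for the new flow. */
        fd = flow_init(&flow, &sk);

        freebuf(sk);

        if (qs != NULL)
                *qs = flow.qs;

        return fd;
}
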
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9e37978c..a7f20e88 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -42,9 +42,9 @@
#include <ouroboros/fccntl.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/np1_flow.h>
-#include <ouroboros/protobuf.h>
#include <ouroboros/pthread.h>
#include <ouroboros/random.h>
+#include <ouroboros/serdes-irm.h>
#include <ouroboros/shm_flow_set.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_rbuff.h>
@@ -57,6 +57,7 @@
#ifdef HAVE_LIBGCRYPT
#include <gcrypt.h>
#endif
+#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -79,6 +80,7 @@
/* map flow_ids to flow descriptors; track state of the flow */
struct fmap {
int fd;
+ /* TODO: use actual flow state */
enum flow_state state;
};
@@ -88,12 +90,13 @@ struct fmap {
struct flow {
struct list_head next;
+ struct flow_info info;
+
struct shm_rbuff * rx_rb;
struct shm_rbuff * tx_rb;
struct shm_flow_set * set;
- int flow_id;
+
uint16_t oflags;
- qosspec_t qs;
ssize_t part_idx;
struct crypt_info crypt;
@@ -221,53 +224,32 @@ static enum flow_state flow_wait_assign(int flow_id)
return state;
}
-static int proc_announce(char * prog)
+static int proc_announce(const char * prog)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.prog = prog;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return ret;
- }
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (proc_announce__irm_req_ser(&msg, prog) < 0)
+ return -ENOMEM;
- return ret;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
+
+ return irm__irm_result_des(&msg);
}
+/* IRMd will clean up the mess if this fails */
static void proc_exit(void)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret = -1;
-
- msg.code = IRM_MSG_CODE__IRM_PROC_EXIT;
- msg.has_pid = true;
- msg.pid = getpid();
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+ if (proc_exit__irm_req_ser(&msg) < 0)
return;
- if (!recv_msg->has_result || (ret = recv_msg->result)) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return;
- }
-
- irm_msg__free_unpacked(recv_msg, NULL);
-
- return;
+ send_recv_msg(&msg);
}
#include "frct.c"
@@ -305,7 +287,7 @@ static void flow_send_keepalive(struct flow * flow,
if (shm_rbuff_write(flow->tx_rb, idx))
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
}
@@ -323,8 +305,8 @@ static void _flow_keepalive(struct flow * flow)
s_act = flow->snd_act;
r_act = flow->rcv_act;
- flow_id = flow->flow_id;
- timeo = flow->qs.timeout;
+ flow_id = flow->info.id;
+ timeo = flow->info.qs.timeout;
acl = shm_rbuff_get_acl(flow->rx_rb);
if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN))
@@ -400,10 +382,10 @@ static void flow_clear(int fd)
{
memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
- ai.flows[fd].flow_id = -1;
+ ai.flows[fd].info.id = -1;
}
-static void flow_fini(int fd)
+static void __flow_fini(int fd)
{
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
@@ -414,13 +396,13 @@ static void flow_fini(int fd)
pthread_join(ai.tx, NULL);
}
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
frcti_destroy(ai.flows[fd].frcti);
}
- if (ai.flows[fd].flow_id != -1) {
- flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]);
+ if (ai.flows[fd].info.id != -1) {
+ flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]);
bmp_release(ai.fds, fd);
}
@@ -436,7 +418,7 @@ static void flow_fini(int fd)
if (ai.flows[fd].set != NULL) {
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
}
@@ -448,11 +430,17 @@ static void flow_fini(int fd)
flow_clear(fd);
}
-static int flow_init(int flow_id,
- pid_t pid,
- qosspec_t qs,
- uint8_t * s,
- time_t mpl)
+static void flow_fini(int fd)
+{
+ pthread_rwlock_wrlock(&ai.lock);
+
+ __flow_fini(fd);
+
+ pthread_rwlock_unlock(&ai.lock);
+}
+
+static int flow_init(struct flow_info * info,
+ buffer_t * sk)
{
struct timespec now;
struct flow * flow;
@@ -471,43 +459,43 @@ static int flow_init(int flow_id,
flow = &ai.flows[fd];
- flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
+ flow->info = *info;
+
+ flow->rx_rb = shm_rbuff_open(info->n_pid, info->id);
if (flow->rx_rb == NULL)
goto fail_rx_rb;
- flow->tx_rb = shm_rbuff_open(pid, flow_id);
+ flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id);
if (flow->tx_rb == NULL)
goto fail_tx_rb;
- flow->set = shm_flow_set_open(pid);
+ flow->set = shm_flow_set_open(info->n_1_pid);
if (flow->set == NULL)
goto fail_set;
- flow->flow_id = flow_id;
flow->oflags = FLOWFDEFAULT;
flow->part_idx = NO_PART;
- flow->qs = qs;
flow->snd_act = now;
flow->rcv_act = now;
- flow->crypt.flags = qs.cypher_s; /* TODO: remove cypher_s from qos */
+ flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */
- if (flow->crypt.flags > 0 && s != NULL) /* static analyzer s != NULL */
- memcpy(flow->crypt.key, s ,SYMMKEYSZ);
- else
- memset(flow->crypt.key, 0, SYMMKEYSZ);
+ memset(flow->crypt.key, 0, SYMMKEYSZ);
+
+ if (flow->crypt.flags > 0 && sk!= NULL && sk->data != NULL)
+ memcpy(flow->crypt.key, sk->data , sk->len);
if (crypt_init(&flow->crypt) < 0)
goto fail_crypt;
assert(flow->frcti == NULL);
- if (flow->qs.in_order != 0) {
- flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl);
+ if (info->qs.in_order != 0) {
+ flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl);
if (flow->frcti == NULL)
goto fail_frcti;
- if (shm_flow_set_add(ai.fqset, 0, flow_id))
+ if (shm_flow_set_add(ai.fqset, 0, info->id))
goto fail_flow_set_add;
++ai.n_frcti;
@@ -518,16 +506,16 @@ static int flow_init(int flow_id,
list_add_tail(&flow->next, &ai.flow_list);
- ai.id_to_fd[flow_id].fd = fd;
+ ai.id_to_fd[info->id].fd = fd;
- flow_set_state(&ai.id_to_fd[flow_id], FLOW_ALLOCATED);
+ flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED);
pthread_rwlock_unlock(&ai.lock);
return fd;
fail_tx_thread:
- shm_flow_set_del(ai.fqset, 0, flow_id);
+ shm_flow_set_del(ai.fqset, 0, info->id);
fail_flow_set_add:
frcti_destroy(flow->frcti);
fail_frcti:
@@ -722,12 +710,12 @@ static void fini(void)
pthread_rwlock_wrlock(&ai.lock);
for (i = 0; i < PROG_MAX_FLOWS; ++i) {
- if (ai.flows[i].flow_id != -1) {
+ if (ai.flows[i].info.id != -1) {
ssize_t idx;
shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN);
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- flow_fini(i);
+ __flow_fini(i);
}
}
@@ -774,142 +762,94 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini;
int flow_accept(qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- int err = -EIRMD;
- uint8_t * symmkey;
-
- msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
- msg.has_pid = true;
- msg.pid = getpid();
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk;
+ int fd;
+ int err;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- goto fail_recv;
-
- if (!recv_msg->has_result)
- goto fail_msg;
-
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_msg;
- }
+#ifdef QOS_DISABLE_CRC
+ if (qs != NULL)
+ qs->ber = 1;
+#endif
+ memset(&flow, 0, sizeof(flow));
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl || recv_msg->qosspec == NULL)
- goto fail_msg;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- symmkey = recv_msg->has_symmkey ? recv_msg->symmkey.data : NULL;
+ if (flow_accept__irm_req_ser(&msg, &flow, timeo))
+ return -ENOMEM;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qos_spec_msg_to_s(recv_msg->qosspec),
- symmkey,
- recv_msg->mpl);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- if (fd < 0)
- return fd;
+ fd = flow_init(&flow, &sk);
- pthread_rwlock_rdlock(&ai.lock);
+ freebuf(sk);
if (qs != NULL)
- *qs = ai.flows[fd].qs;
-
- pthread_rwlock_unlock(&ai.lock);
+ *qs = flow.qs;
return fd;
-
- fail_msg:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_recv:
- return err;
}
int flow_alloc(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
- int err = -EIRMD;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ buffer_t sk; /* symmetric key */
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
qs->ber = 1;
#endif
- msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
-
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
- if (recv_msg == NULL)
- goto fail_send_recv;
+ memset(&flow, 0, sizeof(flow));
- if (!recv_msg->has_result)
- goto fail_result;
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if ((qs != NULL && qs->cypher_s != 0) &&
- (!recv_msg->has_symmkey || recv_msg->symmkey.len != SYMMKEYSZ)) {
- err = -ECRYPT;
- goto fail_result;
- }
+ err = flow__irm_result_des(&msg, &flow, &sk);
+ if (err < 0)
+ return err;
- /* TODO: Make sure qosspec is set in msg */
- if (qs != NULL && recv_msg->qosspec != NULL)
- *qs = qos_spec_msg_to_s(recv_msg->qosspec);
+ fd = flow_init(&flow, &sk);
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, recv_msg->symmkey.data,
- recv_msg->mpl);
+ freebuf(sk);
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send_recv:
- return err;
}
int flow_join(const char * dst,
qosspec_t * qs,
const struct timespec * timeo)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- uint8_t s[SYMMKEYSZ];
- int fd;
- int err = -EIRMD;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int fd;
+ int err;
#ifdef QOS_DISABLE_CRC
if (qs != NULL)
@@ -918,184 +858,145 @@ int flow_join(const char * dst,
if (qs != NULL && qs->cypher_s > 0)
return -ENOTSUP; /* TODO: Encrypted broadcast */
- memset(s, 0, SYMMKEYSZ);
+ memset(&flow, 0, sizeof(flow));
- msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN;
- msg.dst = (char *) dst;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs);
+ flow.n_pid = getpid();
+ flow.qs = qs == NULL ? qos_raw : *qs;
- if (timeo != NULL) {
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_sec = timeo->tv_sec;
- msg.timeo_nsec = timeo->tv_nsec;
- }
+ if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo))
+ return -ENOMEM;
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- if (recv_msg == NULL)
- goto fail_send;
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- if (!recv_msg->has_result)
- goto fail_result;
+ fd = flow_init(&flow, NULL);
- if (recv_msg->result != 0) {
- err = recv_msg->result;
- goto fail_result;
- }
-
- if (!recv_msg->has_pid || !recv_msg->has_flow_id ||
- !recv_msg->has_mpl)
- goto fail_result;
-
- fd = flow_init(recv_msg->flow_id, recv_msg->pid,
- qs == NULL ? qos_raw : *qs, s,
- recv_msg->mpl);
-
- irm_msg__free_unpacked(recv_msg, NULL);
+ if (qs != NULL)
+ *qs = flow.qs;
return fd;
-
- fail_result:
- irm_msg__free_unpacked(recv_msg, NULL);
- fail_send:
- return err;
}
+#define PKT_BUF_LEN 2048
int flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- uint8_t buf[128];
- struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
- struct flow * f;
- time_t timeo;
+ struct flow_info info;
+ uint8_t pkt[PKT_BUF_LEN];
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct timespec tic = TIMESPEC_INIT_NS(TICTIME);
+ struct timespec timeo = TIMESPEC_INIT_S(0);
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
- msg.has_flow_id = true;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_timeo_sec = true;
- msg.has_timeo_nsec = true;
- msg.timeo_nsec = 0;
+ memset(&info, 0, sizeof(flow));
- f = &ai.flows[fd];
+ flow = &ai.flows[fd];
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
-
- f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+ flow->oflags = FLOWFDEFAULT | FLOWFRNOPART;
- f->rcv_timesout = true;
- f->rcv_timeo = tic;
+ flow->rcv_timesout = true;
+ flow->rcv_timeo = tic;
pthread_rwlock_unlock(&ai.lock);
- flow_read(fd, buf, 128);
+ flow_read(fd, buf, SOCK_BUF_SIZE);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
- while (timeo < 0) { /* keep the flow active for rtx */
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
+ while (timeo.tv_sec < 0) { /* keep the flow active for rtx */
ssize_t ret;
pthread_rwlock_unlock(&ai.lock);
- ret = flow_read(fd, buf, 128);
+ ret = flow_read(fd, pkt, PKT_BUF_LEN);
pthread_rwlock_rdlock(&ai.lock);
- timeo = frcti_dealloc(f->frcti);
+ timeo.tv_sec = frcti_dealloc(flow->frcti);
- if (ret == -EFLOWDOWN && timeo < 0)
- timeo = -timeo;
+ if (ret == -EFLOWDOWN && timeo.tv_sec < 0)
+ timeo.tv_sec = -timeo.tv_sec;
}
- msg.timeo_sec = timeo;
-
pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
- shm_rbuff_fini(ai.flows[fd].tx_rb);
+ shm_rbuff_fini(flow->tx_rb);
pthread_cleanup_pop(true);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
+ info.id = flow->info.id;
+ info.n_pid = getpid();
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int ipcp_flow_dealloc(int fd)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- struct flow * f;
+ struct flow_info info;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ struct flow * flow;
+ int err;
if (fd < 0 || fd >= SYS_MAX_FLOWS )
return -EINVAL;
- msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_flow_id = true;
+ flow = &ai.flows[fd];
- f = &ai.flows[fd];
+ memset(&info, 0, sizeof(flow));
pthread_rwlock_rdlock(&ai.lock);
- if (f->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- msg.flow_id = f->flow_id;
+ info.id = flow->info.id;
+ info.n_1_pid = flow->info.n_1_pid;
pthread_rwlock_unlock(&ai.lock);
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -EIRMD;
- }
+ if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- pthread_rwlock_wrlock(&ai.lock);
+ err = irm__irm_result_des(&msg);
flow_fini(fd);
- pthread_rwlock_unlock(&ai.lock);
-
- return 0;
+ return err;
}
int fccntl(int fd,
@@ -1122,7 +1023,7 @@ int fccntl(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
va_end(l);
return -ENOTALLOC;
@@ -1167,7 +1068,7 @@ int fccntl(int fd,
qs = va_arg(l, qosspec_t *);
if (qs == NULL)
goto einval;
- *qs = flow->qs;
+ *qs = flow->info.qs;
break;
case FLOWGRXQLEN:
qlen = va_arg(l, size_t *);
@@ -1194,13 +1095,13 @@ int fccntl(int fd,
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
shm_flow_set_notify(flow->set,
- flow->flow_id,
+ flow->info.id,
FLOW_UP);
}
@@ -1302,7 +1203,7 @@ static int flow_tx_sdb(struct flow * flow,
if (crypt_encrypt(&flow->crypt, sdb) < 0)
goto enomem;
- if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && add_crc(sdb) != 0)
goto enomem;
}
@@ -1316,7 +1217,7 @@ static int flow_tx_sdb(struct flow * flow,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
pthread_cleanup_pop(true);
@@ -1353,7 +1254,7 @@ ssize_t flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1398,7 +1299,7 @@ static bool invalid_pkt(struct flow * flow,
if (shm_du_buff_len(sdb) == 0)
return true;
- if (flow->qs.ber == 0 && chk_crc(sdb) != 0)
+ if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0)
return true;
if (crypt_decrypt(&flow->crypt, sdb) < 0)
@@ -1461,7 +1362,7 @@ ssize_t flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1627,20 +1528,20 @@ int fset_add(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
ret = -EINVAL;
goto fail;
}
if (flow->frcti != NULL)
- shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id);
- ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id);
+ ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id);
if (ret < 0)
goto fail;
if (shm_rbuff_queued(ai.flows[fd].rx_rb))
- shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT);
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1663,11 +1564,11 @@ void fset_del(struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id >= 0)
- shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
+ if (flow->info.id >= 0)
+ shm_flow_set_del(ai.fqset, set->idx, flow->info.id);
if (flow->frcti != NULL)
- shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id);
+ shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id);
pthread_rwlock_unlock(&ai.lock);
}
@@ -1682,12 +1583,12 @@ bool fset_has(const struct flow_set * set,
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return false;
}
- ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1);
+ ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1);
pthread_rwlock_unlock(&ai.lock);
@@ -1828,10 +1729,20 @@ ssize_t fevent(struct flow_set * set,
/* ipcp-dev functions. */
-int np1_flow_alloc(pid_t n_pid,
- int flow_id)
+int np1_flow_alloc(pid_t n_pid,
+ int flow_id)
{
- return flow_init(flow_id, n_pid, qos_np1, NULL, 0);
+ struct flow_info flow;
+
+ memset(&flow, 0, sizeof(flow));
+
+ flow.id = flow_id;
+ flow.n_pid = getpid();
+ flow.qs = qos_np1;
+ flow.mpl = 0;
+ flow.n_1_pid = n_pid; /* This "flow" is upside-down! */
+
+ return flow_init(&flow, NULL);
}
int np1_flow_dealloc(int flow_id,
@@ -1874,123 +1785,85 @@ int np1_flow_resp(int flow_id)
int ipcp_create_r(const struct ipcp_info * info)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- msg.code = IRM_MSG_CODE__IPCP_CREATE_R;
- msg.ipcp_info = ipcp_info_s_to_msg(info);
+ if (ipcp_create_r__irm_req_ser(&msg,info) < 0)
+ return -ENOMEM;
- recv_msg = send_recv_irm_msg(&msg);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- ipcp_info_msg__free_unpacked(msg.ipcp_info, NULL);
+ return irm__irm_result_des(&msg);
+}
- if (recv_msg == NULL)
- return -EIRMD;
+int ipcp_flow_req_arr(const buffer_t * dst,
+ qosspec_t qs,
+ time_t mpl,
+ const buffer_t * data)
+{
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ memset(&flow, 0, sizeof(flow));
- ret = recv_msg->result;
- irm_msg__free_unpacked(recv_msg, NULL);
+ assert(dst != NULL && dst->len != 0 && dst->data != NULL);
- return ret;
-}
+ flow.n_1_pid = getpid();
+ flow.qs = qs;
+ flow.mpl = mpl;
-int ipcp_flow_req_arr(const uint8_t * dst,
- size_t len,
- qosspec_t qs,
- time_t mpl,
- const void * data,
- size_t dlen)
-{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int fd;
-
- assert(dst != NULL);
-
- msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR;
- msg.has_pid = true;
- msg.pid = getpid();
- msg.has_hash = true;
- msg.hash.len = len;
- msg.hash.data = (uint8_t *) dst;
- msg.qosspec = qos_spec_s_to_msg(&qs);
- msg.has_mpl = true;
- msg.mpl = mpl;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = dlen;
-
- recv_msg = send_recv_irm_msg(&msg);
- qosspec_msg__free_unpacked(msg.qosspec, NULL);
-
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_flow_id || !recv_msg->has_pid) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0)
+ return -ENOMEM;
- if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);
+ err = flow__irm_result_des(&msg, &flow, NULL);
+ if (err < 0)
+ return err;
- irm_msg__free_unpacked(recv_msg, NULL);
+ /* inverted for np1_flow */
+ flow.n_1_pid = flow.n_pid;
+ flow.n_pid = getpid();
+ flow.mpl = 0;
- return fd;
+ return flow_init(&flow, NULL);
}
-int ipcp_flow_alloc_reply(int fd,
- int response,
- time_t mpl,
- const void * data,
- size_t len)
+int ipcp_flow_alloc_reply(int fd,
+ int response,
+ time_t mpl,
+ const buffer_t * data)
{
- irm_msg_t msg = IRM_MSG__INIT;
- irm_msg_t * recv_msg;
- int ret;
+ struct flow_info flow;
+ uint8_t buf[SOCK_BUF_SIZE];
+ buffer_t msg = {buf, SOCK_BUF_SIZE};
+ int err;
assert(fd >= 0 && fd < SYS_MAX_FLOWS);
- msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
- msg.has_flow_id = true;
- msg.has_pk = true;
- msg.pk.data = (uint8_t *) data;
- msg.pk.len = (uint32_t) len;
- msg.has_mpl = true;
- msg.mpl = mpl;
-
pthread_rwlock_rdlock(&ai.lock);
- msg.flow_id = ai.flows[fd].flow_id;
+ flow.id = ai.flows[fd].info.id;
pthread_rwlock_unlock(&ai.lock);
- msg.has_response = true;
- msg.response = response;
-
- recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
- return -EIRMD;
-
- if (!recv_msg->has_result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
+ flow.mpl = mpl;
- ret = recv_msg->result;
+ if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0)
+ return -ENOMEM;
- irm_msg__free_unpacked(recv_msg, NULL);
+ err = send_recv_msg(&msg);
+ if (err < 0)
+ return err;
- return ret;
+ return irm__irm_result_des(&msg);
}
int ipcp_flow_read(int fd,
@@ -2006,7 +1879,7 @@ int ipcp_flow_read(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
while (frcti_queued_pdu(flow->frcti) < 0) {
pthread_rwlock_unlock(&ai.lock);
@@ -2038,7 +1911,7 @@ int ipcp_flow_write(int fd,
pthread_rwlock_wrlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2066,7 +1939,7 @@ int np1_flow_read(int fd,
flow = &ai.flows[fd];
- assert(flow->flow_id >= 0);
+ assert(flow->info.id >= 0);
pthread_rwlock_rdlock(&ai.lock);
@@ -2097,7 +1970,7 @@ int np1_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -2115,7 +1988,7 @@ int np1_flow_write(int fd,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
return ret;
}
@@ -2139,7 +2012,7 @@ int ipcp_flow_fini(int fd)
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].flow_id < 0) {
+ if (ai.flows[fd].info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -1;
}
@@ -2148,7 +2021,7 @@ int ipcp_flow_fini(int fd)
shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
shm_flow_set_notify(ai.flows[fd].set,
- ai.flows[fd].flow_id,
+ ai.flows[fd].info.id,
FLOW_DEALLOC);
rx_rb = ai.flows[fd].rx_rb;
@@ -2169,9 +2042,9 @@ int ipcp_flow_get_qoscube(int fd,
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
- *cube = qos_spec_to_cube(ai.flows[fd].qs);
+ *cube = qos_spec_to_cube(ai.flows[fd].info.qs);
pthread_rwlock_unlock(&ai.lock);
@@ -2184,7 +2057,7 @@ size_t ipcp_flow_queued(int fd)
pthread_rwlock_rdlock(&ai.lock);
- assert(ai.flows[fd].flow_id >= 0);
+ assert(ai.flows[fd].info.id >= 0);
q = shm_rbuff_queued(ai.flows[fd].tx_rb);
@@ -2220,14 +2093,14 @@ int local_flow_write(int fd,
pthread_rwlock_rdlock(&ai.lock);
- if (flow->flow_id < 0) {
+ if (flow->info.id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
if (ret == 0)
- shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
+ shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT);
else
shm_rdrbuff_remove(ai.rdrb, idx);