diff options
author | Dimitri Staessens <[email protected]> | 2024-02-23 09:29:47 +0100 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2024-02-23 16:41:37 +0100 |
commit | e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f (patch) | |
tree | ad959d95f8fb1f6d4744c57c9027bf182bc3190b /src/lib/dev.c | |
parent | dcefa07624926da23a559eedc3f7361ac36e8312 (diff) | |
download | ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.tar.gz ouroboros-e6c2d4c9c6b8b12bbcf7bc8bd494b3ba56133e1f.zip |
lib: Revise app flow allocation
This revises the application flow allocator to use the flow_info
struct/message between the components. Revises the messaging to move
the use protocol buffers to its own source (serdes-irm).
Adds a timeout to the IRMd flow allocator to make sure flow
allocations don't hang forever (this was previously taken care of by
the sanitize thread).
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r-- | src/lib/dev.c | 665 |
1 files changed, 269 insertions, 396 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c index 9e37978c..a7f20e88 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -42,9 +42,9 @@ #include <ouroboros/fccntl.h> #include <ouroboros/bitmap.h> #include <ouroboros/np1_flow.h> -#include <ouroboros/protobuf.h> #include <ouroboros/pthread.h> #include <ouroboros/random.h> +#include <ouroboros/serdes-irm.h> #include <ouroboros/shm_flow_set.h> #include <ouroboros/shm_rdrbuff.h> #include <ouroboros/shm_rbuff.h> @@ -57,6 +57,7 @@ #ifdef HAVE_LIBGCRYPT #include <gcrypt.h> #endif +#include <assert.h> #include <stdlib.h> #include <string.h> #include <stdio.h> @@ -79,6 +80,7 @@ /* map flow_ids to flow descriptors; track state of the flow */ struct fmap { int fd; + /* TODO: use actual flow state */ enum flow_state state; }; @@ -88,12 +90,13 @@ struct fmap { struct flow { struct list_head next; + struct flow_info info; + struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; struct shm_flow_set * set; - int flow_id; + uint16_t oflags; - qosspec_t qs; ssize_t part_idx; struct crypt_info crypt; @@ -221,53 +224,32 @@ static enum flow_state flow_wait_assign(int flow_id) return state; } -static int proc_announce(char * prog) +static int proc_announce(const char * prog) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret = -1; - - msg.code = IRM_MSG_CODE__IRM_PROC_ANNOUNCE; - msg.has_pid = true; - msg.pid = getpid(); - msg.prog = prog; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result || (ret = recv_msg->result)) { - irm_msg__free_unpacked(recv_msg, NULL); - return ret; - } + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; - irm_msg__free_unpacked(recv_msg, NULL); + if (proc_announce__irm_req_ser(&msg, prog) < 0) + return -ENOMEM; - return ret; + err = send_recv_msg(&msg); + if (err < 0) + return err; + + return irm__irm_result_des(&msg); } +/* IRMd will clean up the mess if this fails */ static void proc_exit(void) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret = -1; - - msg.code = IRM_MSG_CODE__IRM_PROC_EXIT; - msg.has_pid = true; - msg.pid = getpid(); + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) + if (proc_exit__irm_req_ser(&msg) < 0) return; - if (!recv_msg->has_result || (ret = recv_msg->result)) { - irm_msg__free_unpacked(recv_msg, NULL); - return; - } - - irm_msg__free_unpacked(recv_msg, NULL); - - return; + send_recv_msg(&msg); } #include "frct.c" @@ -305,7 +287,7 @@ static void flow_send_keepalive(struct flow * flow, if (shm_rbuff_write(flow->tx_rb, idx)) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); } @@ -323,8 +305,8 @@ static void _flow_keepalive(struct flow * flow) s_act = flow->snd_act; r_act = flow->rcv_act; - flow_id = flow->flow_id; - timeo = flow->qs.timeout; + flow_id = flow->info.id; + timeo = flow->info.qs.timeout; acl = shm_rbuff_get_acl(flow->rx_rb); if (timeo == 0 || acl & (ACL_FLOWPEER | ACL_FLOWDOWN)) @@ -400,10 +382,10 @@ static void flow_clear(int fd) { memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - ai.flows[fd].flow_id = -1; + ai.flows[fd].info.id = -1; } -static void flow_fini(int fd) +static void __flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); @@ -414,13 +396,13 @@ static void flow_fini(int fd) pthread_join(ai.tx, NULL); } - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); frcti_destroy(ai.flows[fd].frcti); } - if (ai.flows[fd].flow_id != -1) { - flow_destroy(&ai.id_to_fd[ai.flows[fd].flow_id]); + if (ai.flows[fd].info.id != -1) { + flow_destroy(&ai.id_to_fd[ai.flows[fd].info.id]); bmp_release(ai.fds, fd); } @@ -436,7 +418,7 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) { shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].flow_id, + ai.flows[fd].info.id, FLOW_DEALLOC); shm_flow_set_close(ai.flows[fd].set); } @@ -448,11 +430,17 @@ static void flow_fini(int fd) flow_clear(fd); } -static int flow_init(int flow_id, - pid_t pid, - qosspec_t qs, - uint8_t * s, - time_t mpl) +static void flow_fini(int fd) +{ + pthread_rwlock_wrlock(&ai.lock); + + __flow_fini(fd); + + pthread_rwlock_unlock(&ai.lock); +} + +static int flow_init(struct flow_info * info, + buffer_t * sk) { struct timespec now; struct flow * flow; @@ -471,43 +459,43 @@ static int flow_init(int flow_id, flow = &ai.flows[fd]; - flow->rx_rb = shm_rbuff_open(getpid(), flow_id); + flow->info = *info; + + flow->rx_rb = shm_rbuff_open(info->n_pid, info->id); if (flow->rx_rb == NULL) goto fail_rx_rb; - flow->tx_rb = shm_rbuff_open(pid, flow_id); + flow->tx_rb = shm_rbuff_open(info->n_1_pid, info->id); if (flow->tx_rb == NULL) goto fail_tx_rb; - flow->set = shm_flow_set_open(pid); + flow->set = shm_flow_set_open(info->n_1_pid); if (flow->set == NULL) goto fail_set; - flow->flow_id = flow_id; flow->oflags = FLOWFDEFAULT; flow->part_idx = NO_PART; - flow->qs = qs; flow->snd_act = now; flow->rcv_act = now; - flow->crypt.flags = qs.cypher_s; /* TODO: remove cypher_s from qos */ + flow->crypt.flags = info->qs.cypher_s; /* TODO: move cypher_s */ - if (flow->crypt.flags > 0 && s != NULL) /* static analyzer s != NULL */ - memcpy(flow->crypt.key, s ,SYMMKEYSZ); - else - memset(flow->crypt.key, 0, SYMMKEYSZ); + memset(flow->crypt.key, 0, SYMMKEYSZ); + + if (flow->crypt.flags > 0 && sk!= NULL && sk->data != NULL) + memcpy(flow->crypt.key, sk->data , sk->len); if (crypt_init(&flow->crypt) < 0) goto fail_crypt; assert(flow->frcti == NULL); - if (flow->qs.in_order != 0) { - flow->frcti = frcti_create(fd, DELT_A, DELT_R, mpl); + if (info->qs.in_order != 0) { + flow->frcti = frcti_create(fd, DELT_A, DELT_R, info->mpl); if (flow->frcti == NULL) goto fail_frcti; - if (shm_flow_set_add(ai.fqset, 0, flow_id)) + if (shm_flow_set_add(ai.fqset, 0, info->id)) goto fail_flow_set_add; ++ai.n_frcti; @@ -518,16 +506,16 @@ static int flow_init(int flow_id, list_add_tail(&flow->next, &ai.flow_list); - ai.id_to_fd[flow_id].fd = fd; + ai.id_to_fd[info->id].fd = fd; - flow_set_state(&ai.id_to_fd[flow_id], FLOW_ALLOCATED); + flow_set_state(&ai.id_to_fd[info->id], FLOW_ALLOCATED); pthread_rwlock_unlock(&ai.lock); return fd; fail_tx_thread: - shm_flow_set_del(ai.fqset, 0, flow_id); + shm_flow_set_del(ai.fqset, 0, info->id); fail_flow_set_add: frcti_destroy(flow->frcti); fail_frcti: @@ -722,12 +710,12 @@ static void fini(void) pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { - if (ai.flows[i].flow_id != -1) { + if (ai.flows[i].info.id != -1) { ssize_t idx; shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); - flow_fini(i); + __flow_fini(i); } } @@ -774,142 +762,94 @@ __attribute__((section(FINI_SECTION))) __typeof__(fini) * __fini = fini; int flow_accept(qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - int err = -EIRMD; - uint8_t * symmkey; - - msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT; - msg.has_pid = true; - msg.pid = getpid(); + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + buffer_t sk; + int fd; + int err; - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - goto fail_recv; - - if (!recv_msg->has_result) - goto fail_msg; - - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_msg; - } +#ifdef QOS_DISABLE_CRC + if (qs != NULL) + qs->ber = 1; +#endif + memset(&flow, 0, sizeof(flow)); - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl || recv_msg->qosspec == NULL) - goto fail_msg; + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - symmkey = recv_msg->has_symmkey ? recv_msg->symmkey.data : NULL; + if (flow_accept__irm_req_ser(&msg, &flow, timeo)) + return -ENOMEM; - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qos_spec_msg_to_s(recv_msg->qosspec), - symmkey, - recv_msg->mpl); + err = send_recv_msg(&msg); + if (err < 0) + return err; - irm_msg__free_unpacked(recv_msg, NULL); + err = flow__irm_result_des(&msg, &flow, &sk); + if (err < 0) + return err; - if (fd < 0) - return fd; + fd = flow_init(&flow, &sk); - pthread_rwlock_rdlock(&ai.lock); + freebuf(sk); if (qs != NULL) - *qs = ai.flows[fd].qs; - - pthread_rwlock_unlock(&ai.lock); + *qs = flow.qs; return fd; - - fail_msg: - irm_msg__free_unpacked(recv_msg, NULL); - fail_recv: - return err; } int flow_alloc(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - int err = -EIRMD; + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + buffer_t sk; /* symmetric key */ + int fd; + int err; #ifdef QOS_DISABLE_CRC if (qs != NULL) qs->ber = 1; #endif - msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.pid = getpid(); - msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); - - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } - recv_msg = send_recv_irm_msg(&msg); - qosspec_msg__free_unpacked(msg.qosspec, NULL); - if (recv_msg == NULL) - goto fail_send_recv; + memset(&flow, 0, sizeof(flow)); - if (!recv_msg->has_result) - goto fail_result; + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_result; - } + if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) + return -ENOMEM; - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl) - goto fail_result; + err = send_recv_msg(&msg); + if (err < 0) + return err; - if ((qs != NULL && qs->cypher_s != 0) && - (!recv_msg->has_symmkey || recv_msg->symmkey.len != SYMMKEYSZ)) { - err = -ECRYPT; - goto fail_result; - } + err = flow__irm_result_des(&msg, &flow, &sk); + if (err < 0) + return err; - /* TODO: Make sure qosspec is set in msg */ - if (qs != NULL && recv_msg->qosspec != NULL) - *qs = qos_spec_msg_to_s(recv_msg->qosspec); + fd = flow_init(&flow, &sk); - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qs == NULL ? qos_raw : *qs, recv_msg->symmkey.data, - recv_msg->mpl); + freebuf(sk); - irm_msg__free_unpacked(recv_msg, NULL); + if (qs != NULL) + *qs = flow.qs; return fd; - - fail_result: - irm_msg__free_unpacked(recv_msg, NULL); - fail_send_recv: - return err; } int flow_join(const char * dst, qosspec_t * qs, const struct timespec * timeo) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - uint8_t s[SYMMKEYSZ]; - int fd; - int err = -EIRMD; + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int fd; + int err; #ifdef QOS_DISABLE_CRC if (qs != NULL) @@ -918,184 +858,145 @@ int flow_join(const char * dst, if (qs != NULL && qs->cypher_s > 0) return -ENOTSUP; /* TODO: Encrypted broadcast */ - memset(s, 0, SYMMKEYSZ); + memset(&flow, 0, sizeof(flow)); - msg.code = IRM_MSG_CODE__IRM_FLOW_JOIN; - msg.dst = (char *) dst; - msg.has_pid = true; - msg.pid = getpid(); - msg.qosspec = qos_spec_s_to_msg(qs == NULL ? &qos_raw : qs); + flow.n_pid = getpid(); + flow.qs = qs == NULL ? qos_raw : *qs; - if (timeo != NULL) { - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_sec = timeo->tv_sec; - msg.timeo_nsec = timeo->tv_nsec; - } + if (flow_alloc__irm_req_ser(&msg, &flow, dst, timeo)) + return -ENOMEM; - recv_msg = send_recv_irm_msg(&msg); - qosspec_msg__free_unpacked(msg.qosspec, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - if (recv_msg == NULL) - goto fail_send; + err = flow__irm_result_des(&msg, &flow, NULL); + if (err < 0) + return err; - if (!recv_msg->has_result) - goto fail_result; + fd = flow_init(&flow, NULL); - if (recv_msg->result != 0) { - err = recv_msg->result; - goto fail_result; - } - - if (!recv_msg->has_pid || !recv_msg->has_flow_id || - !recv_msg->has_mpl) - goto fail_result; - - fd = flow_init(recv_msg->flow_id, recv_msg->pid, - qs == NULL ? qos_raw : *qs, s, - recv_msg->mpl); - - irm_msg__free_unpacked(recv_msg, NULL); + if (qs != NULL) + *qs = flow.qs; return fd; - - fail_result: - irm_msg__free_unpacked(recv_msg, NULL); - fail_send: - return err; } +#define PKT_BUF_LEN 2048 int flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - uint8_t buf[128]; - struct timespec tic = TIMESPEC_INIT_NS(TICTIME); - struct flow * f; - time_t timeo; + struct flow_info info; + uint8_t pkt[PKT_BUF_LEN]; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + struct timespec tic = TIMESPEC_INIT_NS(TICTIME); + struct timespec timeo = TIMESPEC_INIT_S(0); + struct flow * flow; + int err; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_flow_id = true; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_timeo_sec = true; - msg.has_timeo_nsec = true; - msg.timeo_nsec = 0; + memset(&info, 0, sizeof(flow)); - f = &ai.flows[fd]; + flow = &ai.flows[fd]; pthread_rwlock_rdlock(&ai.lock); - if (f->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - msg.flow_id = f->flow_id; - - f->oflags = FLOWFDEFAULT | FLOWFRNOPART; + flow->oflags = FLOWFDEFAULT | FLOWFRNOPART; - f->rcv_timesout = true; - f->rcv_timeo = tic; + flow->rcv_timesout = true; + flow->rcv_timeo = tic; pthread_rwlock_unlock(&ai.lock); - flow_read(fd, buf, 128); + flow_read(fd, buf, SOCK_BUF_SIZE); pthread_rwlock_rdlock(&ai.lock); - timeo = frcti_dealloc(f->frcti); - while (timeo < 0) { /* keep the flow active for rtx */ + timeo.tv_sec = frcti_dealloc(flow->frcti); + while (timeo.tv_sec < 0) { /* keep the flow active for rtx */ ssize_t ret; pthread_rwlock_unlock(&ai.lock); - ret = flow_read(fd, buf, 128); + ret = flow_read(fd, pkt, PKT_BUF_LEN); pthread_rwlock_rdlock(&ai.lock); - timeo = frcti_dealloc(f->frcti); + timeo.tv_sec = frcti_dealloc(flow->frcti); - if (ret == -EFLOWDOWN && timeo < 0) - timeo = -timeo; + if (ret == -EFLOWDOWN && timeo.tv_sec < 0) + timeo.tv_sec = -timeo.tv_sec; } - msg.timeo_sec = timeo; - pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock); - shm_rbuff_fini(ai.flows[fd].tx_rb); + shm_rbuff_fini(flow->tx_rb); pthread_cleanup_pop(true); - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; + info.id = flow->info.id; + info.n_pid = getpid(); - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -EIRMD; - } + if (flow_dealloc__irm_req_ser(&msg, &info, &timeo) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - pthread_rwlock_wrlock(&ai.lock); + err = irm__irm_result_des(&msg); flow_fini(fd); - pthread_rwlock_unlock(&ai.lock); - - return 0; + return err; } int ipcp_flow_dealloc(int fd) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - struct flow * f; + struct flow_info info; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + struct flow * flow; + int err; if (fd < 0 || fd >= SYS_MAX_FLOWS ) return -EINVAL; - msg.code = IRM_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_flow_id = true; + flow = &ai.flows[fd]; - f = &ai.flows[fd]; + memset(&info, 0, sizeof(flow)); pthread_rwlock_rdlock(&ai.lock); - if (f->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } - msg.flow_id = f->flow_id; + info.id = flow->info.id; + info.n_1_pid = flow->info.n_1_pid; pthread_rwlock_unlock(&ai.lock); - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -EIRMD; - } + if (ipcp_flow_dealloc__irm_req_ser(&msg, &info) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - pthread_rwlock_wrlock(&ai.lock); + err = irm__irm_result_des(&msg); flow_fini(fd); - pthread_rwlock_unlock(&ai.lock); - - return 0; + return err; } int fccntl(int fd, @@ -1122,7 +1023,7 @@ int fccntl(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -1167,7 +1068,7 @@ int fccntl(int fd, qs = va_arg(l, qosspec_t *); if (qs == NULL) goto einval; - *qs = flow->qs; + *qs = flow->info.qs; break; case FLOWGRXQLEN: qlen = va_arg(l, size_t *); @@ -1194,13 +1095,13 @@ int fccntl(int fd, rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->flow_id, + flow->info.id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->flow_id, + flow->info.id, FLOW_UP); } @@ -1302,7 +1203,7 @@ static int flow_tx_sdb(struct flow * flow, if (crypt_encrypt(&flow->crypt, sdb) < 0) goto enomem; - if (flow->qs.ber == 0 && add_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && add_crc(sdb) != 0) goto enomem; } @@ -1316,7 +1217,7 @@ static int flow_tx_sdb(struct flow * flow, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); pthread_cleanup_pop(true); @@ -1353,7 +1254,7 @@ ssize_t flow_write(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1398,7 +1299,7 @@ static bool invalid_pkt(struct flow * flow, if (shm_du_buff_len(sdb) == 0) return true; - if (flow->qs.ber == 0 && chk_crc(sdb) != 0) + if (flow->info.qs.ber == 0 && chk_crc(sdb) != 0) return true; if (crypt_decrypt(&flow->crypt, sdb) < 0) @@ -1461,7 +1362,7 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1627,20 +1528,20 @@ int fset_add(struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { ret = -EINVAL; goto fail; } if (flow->frcti != NULL) - shm_flow_set_del(ai.fqset, 0, ai.flows[fd].flow_id); + shm_flow_set_del(ai.fqset, 0, ai.flows[fd].info.id); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].info.id); if (ret < 0) goto fail; if (shm_rbuff_queued(ai.flows[fd].rx_rb)) - shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT); + shm_flow_set_notify(ai.fqset, ai.flows[fd].info.id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1663,11 +1564,11 @@ void fset_del(struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, flow->flow_id); + if (flow->info.id >= 0) + shm_flow_set_del(ai.fqset, set->idx, flow->info.id); if (flow->frcti != NULL) - shm_flow_set_add(ai.fqset, 0, ai.flows[fd].flow_id); + shm_flow_set_add(ai.fqset, 0, ai.flows[fd].info.id); pthread_rwlock_unlock(&ai.lock); } @@ -1682,12 +1583,12 @@ bool fset_has(const struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id < 0) { + if (ai.flows[fd].info.id < 0) { pthread_rwlock_unlock(&ai.lock); return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1); + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].info.id) == 1); pthread_rwlock_unlock(&ai.lock); @@ -1828,10 +1729,20 @@ ssize_t fevent(struct flow_set * set, /* ipcp-dev functions. */ -int np1_flow_alloc(pid_t n_pid, - int flow_id) +int np1_flow_alloc(pid_t n_pid, + int flow_id) { - return flow_init(flow_id, n_pid, qos_np1, NULL, 0); + struct flow_info flow; + + memset(&flow, 0, sizeof(flow)); + + flow.id = flow_id; + flow.n_pid = getpid(); + flow.qs = qos_np1; + flow.mpl = 0; + flow.n_1_pid = n_pid; /* This "flow" is upside-down! */ + + return flow_init(&flow, NULL); } int np1_flow_dealloc(int flow_id, @@ -1874,123 +1785,85 @@ int np1_flow_resp(int flow_id) int ipcp_create_r(const struct ipcp_info * info) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; - msg.code = IRM_MSG_CODE__IPCP_CREATE_R; - msg.ipcp_info = ipcp_info_s_to_msg(info); + if (ipcp_create_r__irm_req_ser(&msg,info) < 0) + return -ENOMEM; - recv_msg = send_recv_irm_msg(&msg); + err = send_recv_msg(&msg); + if (err < 0) + return err; - ipcp_info_msg__free_unpacked(msg.ipcp_info, NULL); + return irm__irm_result_des(&msg); +} - if (recv_msg == NULL) - return -EIRMD; +int ipcp_flow_req_arr(const buffer_t * dst, + qosspec_t qs, + time_t mpl, + const buffer_t * data) +{ + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + memset(&flow, 0, sizeof(flow)); - ret = recv_msg->result; - irm_msg__free_unpacked(recv_msg, NULL); + assert(dst != NULL && dst->len != 0 && dst->data != NULL); - return ret; -} + flow.n_1_pid = getpid(); + flow.qs = qs; + flow.mpl = mpl; -int ipcp_flow_req_arr(const uint8_t * dst, - size_t len, - qosspec_t qs, - time_t mpl, - const void * data, - size_t dlen) -{ - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int fd; - - assert(dst != NULL); - - msg.code = IRM_MSG_CODE__IPCP_FLOW_REQ_ARR; - msg.has_pid = true; - msg.pid = getpid(); - msg.has_hash = true; - msg.hash.len = len; - msg.hash.data = (uint8_t *) dst; - msg.qosspec = qos_spec_s_to_msg(&qs); - msg.has_mpl = true; - msg.mpl = mpl; - msg.has_pk = true; - msg.pk.data = (uint8_t *) data; - msg.pk.len = dlen; - - recv_msg = send_recv_irm_msg(&msg); - qosspec_msg__free_unpacked(msg.qosspec, NULL); - - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_flow_id || !recv_msg->has_pid) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + if (ipcp_flow_req_arr__irm_req_ser(&msg, dst, &flow, data) < 0) + return -ENOMEM; - if (recv_msg->has_result && recv_msg->result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + err = send_recv_msg(&msg); + if (err < 0) + return err; - fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0); + err = flow__irm_result_des(&msg, &flow, NULL); + if (err < 0) + return err; - irm_msg__free_unpacked(recv_msg, NULL); + /* inverted for np1_flow */ + flow.n_1_pid = flow.n_pid; + flow.n_pid = getpid(); + flow.mpl = 0; - return fd; + return flow_init(&flow, NULL); } -int ipcp_flow_alloc_reply(int fd, - int response, - time_t mpl, - const void * data, - size_t len) +int ipcp_flow_alloc_reply(int fd, + int response, + time_t mpl, + const buffer_t * data) { - irm_msg_t msg = IRM_MSG__INIT; - irm_msg_t * recv_msg; - int ret; + struct flow_info flow; + uint8_t buf[SOCK_BUF_SIZE]; + buffer_t msg = {buf, SOCK_BUF_SIZE}; + int err; assert(fd >= 0 && fd < SYS_MAX_FLOWS); - msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.has_flow_id = true; - msg.has_pk = true; - msg.pk.data = (uint8_t *) data; - msg.pk.len = (uint32_t) len; - msg.has_mpl = true; - msg.mpl = mpl; - pthread_rwlock_rdlock(&ai.lock); - msg.flow_id = ai.flows[fd].flow_id; + flow.id = ai.flows[fd].info.id; pthread_rwlock_unlock(&ai.lock); - msg.has_response = true; - msg.response = response; - - recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) - return -EIRMD; - - if (!recv_msg->has_result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; - } + flow.mpl = mpl; - ret = recv_msg->result; + if (ipcp_flow_alloc_reply__irm_msg_ser(&msg, &flow, response, data) < 0) + return -ENOMEM; - irm_msg__free_unpacked(recv_msg, NULL); + err = send_recv_msg(&msg); + if (err < 0) + return err; - return ret; + return irm__irm_result_des(&msg); } int ipcp_flow_read(int fd, @@ -2006,7 +1879,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->flow_id >= 0); + assert(flow->info.id >= 0); while (frcti_queued_pdu(flow->frcti) < 0) { pthread_rwlock_unlock(&ai.lock); @@ -2038,7 +1911,7 @@ int ipcp_flow_write(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -2066,7 +1939,7 @@ int np1_flow_read(int fd, flow = &ai.flows[fd]; - assert(flow->flow_id >= 0); + assert(flow->info.id >= 0); pthread_rwlock_rdlock(&ai.lock); @@ -2097,7 +1970,7 @@ int np1_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -2115,7 +1988,7 @@ int np1_flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); return ret; } @@ -2139,7 +2012,7 @@ int ipcp_flow_fini(int fd) pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].flow_id < 0) { + if (ai.flows[fd].info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -1; } @@ -2148,7 +2021,7 @@ int ipcp_flow_fini(int fd) shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].flow_id, + ai.flows[fd].info.id, FLOW_DEALLOC); rx_rb = ai.flows[fd].rx_rb; @@ -2169,9 +2042,9 @@ int ipcp_flow_get_qoscube(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].flow_id >= 0); + assert(ai.flows[fd].info.id >= 0); - *cube = qos_spec_to_cube(ai.flows[fd].qs); + *cube = qos_spec_to_cube(ai.flows[fd].info.qs); pthread_rwlock_unlock(&ai.lock); @@ -2184,7 +2057,7 @@ size_t ipcp_flow_queued(int fd) pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].flow_id >= 0); + assert(ai.flows[fd].info.id >= 0); q = shm_rbuff_queued(ai.flows[fd].tx_rb); @@ -2220,14 +2093,14 @@ int local_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->flow_id < 0) { + if (flow->info.id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL); if (ret == 0) - shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->info.id, FLOW_PKT); else shm_rdrbuff_remove(ai.rdrb, idx); |