summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2023-03-14 12:50:22 +0100
committerSander Vrijders <[email protected]>2023-03-18 17:12:26 +0100
commitee196e9a00475a029018181f8d6a00106ee462ce (patch)
tree8cb703ebaed3f8b5d7b71bf263710f9ed5edb6a1 /src/irmd/main.c
parent975a3ad0c761f5603a5026a56d825ba0ccb591c9 (diff)
downloadouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.tar.gz
ouroboros-ee196e9a00475a029018181f8d6a00106ee462ce.zip
lib: Split flow_alloc from flow_join
Better to keep these separate during IRMd revision. Moves the qosspec default out of the protobuf message parsing. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c129
1 files changed, 104 insertions, 25 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index b2f83388..5ef9a82e 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1415,12 +1415,103 @@ static int flow_accept(pid_t pid,
return 0;
}
+static int flow_join(pid_t pid,
+ const char * dst,
+ qosspec_t qs,
+ struct timespec * timeo,
+ struct irm_flow * f_out)
+{
+ struct irm_flow * f;
+ struct ipcp_entry * ipcp;
+ int flow_id;
+ int state;
+ uint8_t * hash;
+
+ log_info("Allocating flow for %d to %s.", pid, dst);
+
+ ipcp = get_ipcp_entry_by_layer(dst);
+ if (ipcp == NULL) {
+ log_info("Layer %s unreachable.", dst);
+ return -1;
+ }
+
+ pthread_rwlock_wrlock(&irmd.flows_lock);
+
+ flow_id = bmp_allocate(irmd.flow_ids);
+ if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) {
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ log_err("Could not allocate flow_id.");
+ return -EBADF;
+ }
+
+ f = irm_flow_create(pid, ipcp->pid, flow_id, qs);
+ if (f == NULL) {
+ bmp_release(irmd.flow_ids, flow_id);
+ pthread_rwlock_unlock(&irmd.flows_lock);
+ log_err("Could not allocate flow_id.");
+ return -ENOMEM;
+ }
+
+ list_add(&f->next, &irmd.irm_flows);
+
+ pthread_rwlock_unlock(&irmd.flows_lock);
+
+ assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
+
+ hash = malloc(IPCP_HASH_LEN(ipcp));
+ if (hash == NULL)
+ /* sanitizer cleans this */
+ return -ENOMEM;
+
+ str_hash(ipcp->dir_hash_algo, hash, dst);
+
+ if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash,
+ IPCP_HASH_LEN(ipcp), qs)) {
+ irm_flow_set_state(f, FLOW_NULL);
+ /* sanitizer cleans this */
+ log_info("Flow_join failed.");
+ free(hash);
+ return -EAGAIN;
+ }
+
+ free(hash);
+
+ state = irm_flow_wait_state(f, FLOW_ALLOCATED, timeo);
+ if (state != FLOW_ALLOCATED) {
+ if (state == -ETIMEDOUT) {
+ log_dbg("Flow allocation timed out");
+ return -ETIMEDOUT;
+ }
+
+ log_info("Pending flow to %s torn down.", dst);
+ return -EPIPE;
+ }
+
+ pthread_rwlock_wrlock(&irmd.flows_lock);
+
+ assert(irm_flow_get_state(f) == FLOW_ALLOCATED);
+
+ f_out->flow_id = f->flow_id;
+ f_out->n_pid = f->n_pid;
+ f_out->n_1_pid = f->n_1_pid;
+ f_out->data = f->data; /* pass owner */
+ f_out->len = f->len;
+ f_out->mpl = f->mpl;
+ f->data = NULL;
+ f->len = 0;
+
+ pthread_rwlock_unlock(&irmd.flows_lock);
+
+ log_info("Flow on flow_id %d allocated.", flow_id);
+
+ return 0;
+}
+
static int flow_alloc(pid_t pid,
const char * dst,
qosspec_t qs,
struct timespec * timeo,
struct irm_flow * f_out,
- bool join,
const void * data,
size_t len)
{
@@ -1432,8 +1523,7 @@ static int flow_alloc(pid_t pid,
log_info("Allocating flow for %d to %s.", pid, dst);
- ipcp = join ? get_ipcp_entry_by_layer(dst)
- : get_ipcp_by_dst_name(dst, pid);
+ ipcp = get_ipcp_by_dst_name(dst, pid);
if (ipcp == NULL) {
log_info("Destination %s unreachable.", dst);
return -1;
@@ -1469,24 +1559,13 @@ static int flow_alloc(pid_t pid,
str_hash(ipcp->dir_hash_algo, hash, dst);
- if (join) {
- if (ipcp_flow_join(ipcp->pid, flow_id, pid, hash,
- IPCP_HASH_LEN(ipcp), qs)) {
- irm_flow_set_state(f, FLOW_NULL);
- /* sanitizer cleans this */
- log_info("Flow_join failed.");
- free(hash);
- return -EAGAIN;
- }
- } else {
- if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
- IPCP_HASH_LEN(ipcp), qs, data, len)) {
- irm_flow_set_state(f, FLOW_NULL);
- /* sanitizer cleans this */
- log_info("Flow_allocation failed.");
- free(hash);
- return -EAGAIN;
- }
+ if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash,
+ IPCP_HASH_LEN(ipcp), qs, data, len)) {
+ irm_flow_set_state(f, FLOW_NULL);
+ /* sanitizer cleans this */
+ log_info("Flow_allocation failed.");
+ free(hash);
+ return -EAGAIN;
}
free(hash);
@@ -2136,7 +2215,7 @@ static void * mainloop(void * o)
: msg->pk.data == NULL);
result = flow_alloc(msg->pid, msg->dst,
qos_spec_msg_to_s(msg->qosspec),
- timeo, &e, false, msg->pk.data,
+ timeo, &e, msg->pk.data,
msg->pk.len);
if (result == 0) {
ret_msg->has_flow_id = true;
@@ -2152,9 +2231,9 @@ static void * mainloop(void * o)
break;
case IRM_MSG_CODE__IRM_FLOW_JOIN:
assert(msg->pk.len == 0 && msg->pk.data == NULL);
- result = flow_alloc(msg->pid, msg->dst,
- qos_spec_msg_to_s(msg->qosspec),
- timeo, &e, true, NULL, 0);
+ result = flow_join(msg->pid, msg->dst,
+ qos_spec_msg_to_s(msg->qosspec),
+ timeo, &e);
if (result == 0) {
ret_msg->has_flow_id = true;
ret_msg->flow_id = e.flow_id;