summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-10-04 15:23:54 +0200
committerSander Vrijders <[email protected]>2016-10-04 15:23:54 +0200
commit1a7c0923206cfb98d43122621a585027c67040ea (patch)
treeacd08f09f5a094e897020e97961b2847209df043 /src/irmd/main.c
parentecdf47b97abb8c5107846f4ef4a17bd62ba6dc82 (diff)
parentc96efb13edfaf9b2f2c626bd2a5d5d5afd38155f (diff)
downloadouroboros-1a7c0923206cfb98d43122621a585027c67040ea.tar.gz
ouroboros-1a7c0923206cfb98d43122621a585027c67040ea.zip
Merged in dstaesse/ouroboros/be-unify (pull request #251)
lib, ipcp: Revise fast path and flow interfaces
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c176
1 files changed, 51 insertions, 125 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index cc9160bf..523741ef 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -21,14 +21,9 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#define OUROBOROS_PREFIX "irmd"
-
#include <ouroboros/config.h>
#include <ouroboros/errno.h>
-#include <ouroboros/logs.h>
#include <ouroboros/sockets.h>
-#include <ouroboros/ipcp.h>
-#include <ouroboros/nsm.h>
#include <ouroboros/list.h>
#include <ouroboros/utils.h>
#include <ouroboros/irm_config.h>
@@ -36,14 +31,19 @@
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/bitmap.h>
-#include <ouroboros/flow.h>
#include <ouroboros/qos.h>
#include <ouroboros/time_utils.h>
+#define OUROBOROS_PREFIX "irmd"
+
+#include <ouroboros/logs.h>
+
+
#include "utils.h"
#include "registry.h"
#include "irm_flow.h"
#include "api_table.h"
+#include "ipcp.h"
#include <sys/socket.h>
#include <sys/un.h>
@@ -60,10 +60,12 @@
struct ipcp_entry {
struct list_head next;
+
char * name;
pid_t api;
enum ipcp_type type;
char * dif_name;
+
pthread_cond_t init_cond;
pthread_mutex_t init_lock;
bool init;
@@ -100,7 +102,7 @@ struct irm {
pthread_t irm_sanitize;
pthread_t shm_sanitize;
-} * irmd = NULL;
+} * irmd;
static struct irm_flow * get_irm_flow(int port_id)
{
@@ -108,7 +110,6 @@ static struct irm_flow * get_irm_flow(int port_id)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-
if (e->port_id == port_id)
return e;
}
@@ -122,7 +123,6 @@ static struct irm_flow * get_irm_flow_n(pid_t n_api)
list_for_each(pos, &irmd->irm_flows) {
struct irm_flow * e = list_entry(pos, struct irm_flow, next);
-
if (e->n_api == n_api)
return e;
}
@@ -965,8 +965,7 @@ static struct irm_flow * flow_accept(pid_t api, char ** dst_ae_name)
return NULL;
}
- LOG_INFO("New instance (%d) of %s added.", api, e->apn);
-
+ LOG_DBG("New instance (%d) of %s added.", api, e->apn);
LOG_DBG("This instance accepts flows for:");
list_for_each(p, &e->names) {
struct str_el * s = list_entry(p, struct str_el, next);
@@ -1053,8 +1052,8 @@ static int flow_alloc_resp(pid_t n_api,
struct api_entry * e = NULL;
int ret = -1;
- pid_t f_n_1_api;
- pid_t f_n_api;
+ pid_t api_n1;
+ pid_t api_n;
pthread_rwlock_rdlock(&irmd->state_lock);
@@ -1107,21 +1106,17 @@ static int flow_alloc_resp(pid_t n_api,
return -1;
}
- f_n_api = f->n_api;
- f_n_1_api = f->n_1_api;
-
- if (!response) {
- f->state = FLOW_ALLOCATED;
- pthread_cond_signal(&f->state_cond);
- }
+ api_n = f->n_api;
+ api_n1 = f->n_1_api;
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- ret = ipcp_flow_alloc_resp(f_n_1_api,
- port_id,
- f_n_api,
- response);
+ ret = ipcp_flow_alloc_resp(api_n1, port_id, api_n, response);
+
+ if (!(response || ret))
+ irm_flow_set_state(f, FLOW_ALLOCATED);
+
return ret;
}
@@ -1132,6 +1127,7 @@ static struct irm_flow * flow_alloc(pid_t api,
{
struct irm_flow * f;
pid_t ipcp;
+ int port_id;
/* FIXME: Map qos_spec to qos_cube */
@@ -1151,6 +1147,7 @@ static struct irm_flow * flow_alloc(pid_t api,
f->n_api = api;
f->state = FLOW_PENDING;
+
if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0)
LOG_WARN("Failed to set timestamp.");
@@ -1167,7 +1164,7 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->reg_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
- f->port_id = bmp_allocate(irmd->port_ids);
+ port_id = f->port_id = bmp_allocate(irmd->port_ids);
f->n_1_api = ipcp;
list_add(&f->next, &irmd->irm_flows);
@@ -1175,19 +1172,15 @@ static struct irm_flow * flow_alloc(pid_t api,
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- if (ipcp_flow_alloc(ipcp,
- f->port_id,
- f->n_api,
- dst_name,
- src_ae_name,
- QOS_CUBE_BE) < 0) {
+ if (ipcp_flow_alloc(ipcp, port_id, api,
+ dst_name, src_ae_name, QOS_CUBE_BE) < 0) {
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->flows_lock);
list_del(&f->next);
bmp_release(irmd->port_ids, f->port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- free(f);
+ irm_flow_destroy(f);
return NULL;
}
@@ -1208,20 +1201,20 @@ static int flow_alloc_res(int port_id)
f = get_irm_flow(port_id);
if (f == NULL) {
- LOG_ERR("Could not find port %d.", port_id);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_ERR("Could not find port %d.", port_id);
return -1;
}
- if (f->state == FLOW_NULL) {
- LOG_INFO("Port %d is deprecated.", port_id);
+ if (irm_flow_get_state(f) == FLOW_NULL) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
+ LOG_INFO("Port %d is deprecated.", port_id);
return -1;
}
- if (f->state == FLOW_ALLOCATED) {
+ if (irm_flow_get_state(f) == FLOW_ALLOCATED) {
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
return 0;
@@ -1230,35 +1223,13 @@ static int flow_alloc_res(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
- pthread_mutex_lock(&f->state_lock);
-
- while (f->state == FLOW_PENDING)
- pthread_cond_wait(&f->state_cond, &f->state_lock);
-
- pthread_mutex_unlock(&f->state_lock);
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
- pthread_mutex_lock(&f->state_lock);
-
- if (f->state == FLOW_ALLOCATED) {
- pthread_cond_broadcast(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
+ if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED)
return 0;
- }
-
- f->state = FLOW_NULL;
- pthread_cond_broadcast(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
return -1;
}
-static int flow_dealloc(int port_id)
+static int flow_dealloc(pid_t api, int port_id)
{
pid_t n_1_api;
int ret = 0;
@@ -1282,7 +1253,8 @@ static int flow_dealloc(int port_id)
pthread_rwlock_unlock(&irmd->flows_lock);
- ret = ipcp_flow_dealloc(n_1_api, port_id);
+ if (api != n_1_api)
+ ret = ipcp_flow_dealloc(n_1_api, port_id);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1340,6 +1312,9 @@ static struct irm_flow * flow_req_arr(pid_t api,
struct pid_el * c_api;
pid_t h_api = -1;
+ LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",
+ api, dst_name, ae_name);
+
f = irm_flow_create();
if (f == NULL) {
LOG_ERR("Failed to create irm_flow.");
@@ -1490,8 +1465,7 @@ static struct irm_flow * flow_req_arr(pid_t api,
return f;
}
-static int flow_alloc_reply(int port_id,
- int response)
+static int flow_alloc_reply(int port_id, int response)
{
struct irm_flow * f;
@@ -1505,18 +1479,10 @@ static int flow_alloc_reply(int port_id,
return -1;
}
- pthread_mutex_lock(&f->state_lock);
-
if (!response)
- f->state = FLOW_ALLOCATED;
-
+ irm_flow_set_state(f, FLOW_ALLOCATED);
else
- f->state = FLOW_NULL;
-
- if (pthread_cond_signal(&f->state_cond))
- LOG_ERR("Failed to send signal.");
-
- pthread_mutex_unlock(&f->state_lock);
+ irm_flow_set_state(f, FLOW_NULL);
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
@@ -1524,30 +1490,6 @@ static int flow_alloc_reply(int port_id,
return 0;
}
-static int flow_dealloc_ipcp(int port_id)
-{
- struct irm_flow * f = NULL;
-
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
-
- f = get_irm_flow(port_id);
- if (f == NULL) {
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- return 0;
- }
-
- list_del(&f->next);
-
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
-
- irm_flow_destroy(f);
-
- return 0;
-}
-
static void irm_destroy()
{
struct list_head * p;
@@ -1729,46 +1671,35 @@ void * irm_sanitize()
struct irm_flow * f =
list_entry(p, struct irm_flow, next);
- pthread_mutex_lock(&f->state_lock);
-
- if (f->state == FLOW_PENDING &&
- ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
+ if (irm_flow_get_state(f) == FLOW_PENDING
+ && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
LOG_INFO("Pending port_id %d timed out.",
f->port_id);
- f->state = FLOW_NULL;
- pthread_cond_signal(&f->state_cond);
- pthread_mutex_unlock(&f->state_lock);
+ irm_flow_set_state(f, FLOW_NULL);
continue;
}
- pthread_mutex_unlock(&f->state_lock);
-
if (kill(f->n_api, 0) < 0) {
- struct shm_ap_rbuff * n_rb =
- shm_ap_rbuff_open_s(f->n_api);
+ struct shm_ap_rbuff * rb =
+ shm_ap_rbuff_open(f->n_api);
bmp_release(irmd->port_ids, f->port_id);
-
list_del(&f->next);
LOG_INFO("AP-I %d gone, flow %d deallocated.",
f->n_api, f->port_id);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
- if (n_rb != NULL)
- shm_ap_rbuff_destroy(n_rb);
+ if (rb != NULL)
+ shm_ap_rbuff_destroy(rb);
irm_flow_destroy(f);
continue;
}
if (kill(f->n_1_api, 0) < 0) {
- struct shm_ap_rbuff * n_1_rb_s =
- shm_ap_rbuff_open_s(f->n_1_api);
- struct shm_ap_rbuff * n_1_rb_n =
- shm_ap_rbuff_open_n(f->n_1_api);
+ struct shm_ap_rbuff * rb =
+ shm_ap_rbuff_open(f->n_1_api);
list_del(&f->next);
LOG_ERR("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
- if (n_1_rb_n != NULL)
- shm_ap_rbuff_destroy(n_1_rb_n);
- if (n_1_rb_s != NULL)
- shm_ap_rbuff_destroy(n_1_rb_s);
+ if (rb != NULL)
+ shm_ap_rbuff_destroy(rb);
irm_flow_destroy(f);
}
}
@@ -1939,7 +1870,7 @@ void * mainloop()
break;
case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->port_id);
+ ret_msg.result = flow_dealloc(msg->api, msg->port_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
e = flow_req_arr(msg->api,
@@ -1950,7 +1881,6 @@ void * mainloop()
ret_msg.result = -1;
break;
}
- /* FIXME: badly timed dealloc may give SEGV */
ret_msg.has_port_id = true;
ret_msg.port_id = e->port_id;
ret_msg.has_api = true;
@@ -1961,10 +1891,6 @@ void * mainloop()
ret_msg.result = flow_alloc_reply(msg->port_id,
msg->response);
break;
- case IRM_MSG_CODE__IPCP_FLOW_DEALLOC:
- ret_msg.has_result = true;
- ret_msg.result = flow_dealloc_ipcp(msg->port_id);
- break;
default:
LOG_ERR("Don't know that message code.");
break;