summaryrefslogtreecommitdiff
path: root/src/irmd/main.c
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2016-05-08 16:34:19 +0200
committerSander Vrijders <[email protected]>2016-05-08 16:34:19 +0200
commit5812dfb832e513dc455a0d48624bcad62334d457 (patch)
tree93a02e1b20f54bb869eadc856f201412c633315c /src/irmd/main.c
parentde8f2015cbd015b1cced366cb12c054be62c23b1 (diff)
parent021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff)
downloadouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz
ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r--src/irmd/main.c764
1 files changed, 574 insertions, 190 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 67254feb..a6403612 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -34,6 +34,7 @@
#include <ouroboros/utils.h>
#include <ouroboros/dif_config.h>
#include <ouroboros/shm_du_map.h>
+#include <ouroboros/bitmap.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -42,41 +43,146 @@
#include <errno.h>
#include <string.h>
#include <limits.h>
+#include <pthread.h>
/* FIXME: this smells like part of namespace management */
#define ALL_DIFS "*"
+#ifndef IRMD_MAX_FLOWS
+ #define IRMD_MAX_FLOWS 4096
+#endif
+
+#ifndef IRMD_THREADPOOL_SIZE
+ #define IRMD_THREADPOOL_SIZE 3
+#endif
+
+
+
+enum flow_state {
+ FLOW_NULL = 0,
+ FLOW_PENDING,
+ FLOW_ALLOCATED
+};
+
struct ipcp_entry {
struct list_head next;
instance_name_t * api;
char * dif_name;
+
+ pthread_mutex_t lock;
};
-/* currently supports only registering whatevercast groups of a single AP */
+/* currently supports only registering whatevercast groups of a single AP-I */
struct reg_name_entry {
struct list_head next;
/* generic whatevercast name */
char * name;
- /* FIXME: resolve name instead */
+ /* FIXME: make a list resolve to AP-I instead */
instance_name_t * api;
- uint32_t reg_ap_id;
+
+ bool accept;
+ char * req_ap_name;
+ char * req_ae_name;
+ bool flow_arrived;
+
+ pthread_mutex_t fa_lock;
+};
+
+/* keeps track of port_id's between N and N - 1 */
+struct port_map_entry {
+ struct list_head next;
+
+ uint32_t port_id;
+
+ pid_t n_pid;
+ pid_t n_1_pid;
+
+ enum flow_state state;
};
struct irm {
- /* FIXME: list of ipcps can be merged with registered names */
+ /* FIXME: list of ipcps could be merged with registered names */
struct list_head ipcps;
struct list_head reg_names;
+ int sockfd;
+
+ /* keep track of all flows in this processing system */
+ struct bmp * port_ids;
+
+ /* maps port_ids to pid pair */
+ struct list_head port_map;
+
struct shm_du_map * dum;
-};
-struct irm * instance = NULL;
+ pthread_t * threadpool;
+
+ pthread_mutex_t lock;
+} * instance = NULL;
-static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)
+static struct port_map_entry * get_port_map_entry(uint32_t port_id)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &instance->port_map) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ if (e->port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct port_map_entry * get_port_map_entry_n(pid_t n_pid)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &instance->port_map) {
+ struct port_map_entry * e =
+ list_entry(pos, struct port_map_entry, next);
+
+ if (e->n_pid == n_pid)
+ return e;
+ }
+
+ return NULL;
+}
+
+static struct ipcp_entry * ipcp_entry_create()
+{
+ struct ipcp_entry * e = malloc(sizeof(*e));
+ if (e == NULL)
+ return NULL;
+
+ e->api = NULL;
+ e->dif_name = NULL;
+
+ INIT_LIST_HEAD(&e->next);
+ pthread_mutex_init(&e->lock, NULL);
+
+ return e;
+}
+
+static void ipcp_entry_destroy(struct ipcp_entry * e)
+{
+ if (e == NULL)
+ return;
+
+ if (e->api != NULL)
+ instance_name_destroy(e->api);
+
+ if (e->dif_name != NULL)
+ free(e->dif_name);
+
+ free(e);
+}
+
+static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api)
{
- struct ipcp_entry * tmp = NULL;
struct list_head * pos = NULL;
list_for_each(pos, &instance->ipcps) {
@@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api)
return tmp;
}
- return tmp;
+ return NULL;
}
static instance_name_t * get_ipcp_by_name(char * ap_name)
@@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create()
if (e == NULL)
return NULL;
- e->reg_ap_id = rand() % INT_MAX;
- e->name = NULL;
+ e->name = NULL;
+ e->api = NULL;
+ e->accept = false;
+ e->req_ap_name = NULL;
+ e->req_ae_name = NULL;
+ e->flow_arrived = false;
+ pthread_mutex_init(&e->fa_lock, NULL);
INIT_LIST_HEAD(&e->next);
return e;
@@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create()
static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e,
char * name,
- instance_name_t * api)
+ instance_name_t * api)
{
if (e == NULL || name == NULL || api == NULL)
return NULL;
@@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e)
free(e->name);
instance_name_destroy(e->api);
+
+ if (e->req_ap_name != NULL)
+ free(e->req_ap_name);
+ if (e->req_ae_name != NULL)
+ free(e->req_ae_name);
+
+ free(e);
+
return 0;
}
-static struct reg_name_entry * find_reg_name_entry_by_name(char * name)
+static struct reg_name_entry * get_reg_name_entry_by_name(char * name)
{
struct list_head * pos = NULL;
@@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name)
return NULL;
}
-static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
+static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid)
{
struct list_head * pos = NULL;
@@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
struct reg_name_entry * e =
list_entry(pos, struct reg_name_entry, next);
- if (reg_ap_id == e->reg_ap_id)
+ if (e->api->id == pid)
return e;
}
@@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id)
/* FIXME: add only name when we have NSM solved */
static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)
{
- struct reg_name_entry * e = find_reg_name_entry_by_name(name);
+ struct reg_name_entry * e = get_reg_name_entry_by_name(name);
if (e == NULL) {
e = reg_name_entry_create();
- e = reg_name_entry_init(e, name, api);
+ if (e == NULL)
+ return -1;
+
+ if (reg_name_entry_init(e, name, api) == NULL) {
+ reg_name_entry_destroy(e);
+ return -1;
+ }
+
list_add(&e->next, &instance->reg_names);
return 0;
}
@@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api)
static int reg_name_entry_del_name(char * name)
{
- struct reg_name_entry * e = find_reg_name_entry_by_name(name);
+ struct reg_name_entry * e = get_reg_name_entry_by_name(name);
if (e == NULL)
return 0;
@@ -240,34 +366,38 @@ static pid_t create_ipcp(char * ap_name,
pid = ipcp_create(ap_name, ipcp_type);
if (pid == -1) {
- LOG_ERR("Failed to create IPCP");
+ LOG_ERR("Failed to create IPCP.");
return -1;
}
- tmp = malloc(sizeof(*tmp));
- if (tmp == NULL) {
+ tmp = ipcp_entry_create();
+ if (tmp == NULL)
return -1;
- }
INIT_LIST_HEAD(&tmp->next);
tmp->api = instance_name_create();
if (tmp->api == NULL) {
- free(tmp);
+ ipcp_entry_destroy(tmp);
return -1;
}
if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) {
instance_name_destroy(tmp->api);
- free(tmp);
+ ipcp_entry_destroy(tmp);
return -1;
}
tmp->dif_name = NULL;
- LOG_DBG("Created IPC process with pid %d", pid);
+ pthread_mutex_lock(&instance->lock);
list_add(&tmp->next, &instance->ipcps);
+
+ pthread_mutex_unlock(&instance->lock);
+
+ LOG_INFO("Created IPCP %s-%d ", ap_name, pid);
+
return pid;
}
@@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api)
struct list_head * pos = NULL;
struct list_head * n = NULL;
+ if (api == NULL)
+ return 0;
+
if (api->id == 0)
api = get_ipcp_by_name(api->name);
if (api == NULL) {
LOG_ERR("No such IPCP in the system.");
- return -1;
+ return 0;
}
- LOG_DBG("Destroying ipcp %s-%d", api->name, api->id);
-
if (ipcp_destroy(api->id))
- LOG_ERR("Could not destroy IPCP");
+ LOG_ERR("Could not destroy IPCP.");
list_for_each_safe(pos, n, &(instance->ipcps)) {
struct ipcp_entry * tmp =
@@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api)
if (instance_name_cmp(api, tmp->api) == 0)
list_del(&tmp->next);
+
+ ipcp_entry_destroy(tmp);
}
+ LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id);
+
return 0;
}
@@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t * api,
return -1;
}
- entry = find_ipcp_entry_by_name(api);
+ entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- LOG_ERR("No such IPCP");
+ LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(conf->dif_name);
if (entry->dif_name == NULL) {
- LOG_ERR("Failed to strdup");
+ LOG_ERR("Failed to strdup.");
return -1;
}
if (ipcp_bootstrap(entry->api->id, conf)) {
- LOG_ERR("Could not bootstrap IPCP");
+ LOG_ERR("Could not bootstrap IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.",
+ api->name, api->id, conf->dif_name);
+
return 0;
}
@@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t * api,
ssize_t n_1_difs_size = 0;
struct ipcp_entry * entry = NULL;
- entry = find_ipcp_entry_by_name(api);
+ entry = get_ipcp_entry_by_name(api);
if (entry == NULL) {
- LOG_ERR("No such IPCP");
+ LOG_ERR("No such IPCP.");
return -1;
}
entry->dif_name = strdup(dif_name);
if (entry->dif_name == NULL) {
- LOG_ERR("Failed to strdup");
+ LOG_ERR("Failed to strdup.");
return -1;
}
member = da_resolve_daf(dif_name);
if (member == NULL) {
- LOG_ERR("Could not find a member of that DIF");
+ LOG_ERR("Could not find a member of that DIF.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
@@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t * api,
n_1_difs_size = da_resolve_dap(member, n_1_difs);
if (n_1_difs_size < 1) {
- LOG_ERR("Could not find N-1 DIFs");
+ LOG_ERR("Could not find N-1 DIFs.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) {
- LOG_ERR("Could not enroll IPCP");
+ LOG_ERR("Could not enroll IPCP.");
free(entry->dif_name);
entry->dif_name = NULL;
return -1;
}
+ LOG_INFO("Enrolled IPCP %s-%d in DIF %s.",
+ api->name, api->id, dif_name);
+
return 0;
}
@@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api,
size_t difs_size)
{
if (ipcp_reg(api->id, difs, difs_size)) {
- LOG_ERR("Could not register IPCP to N-1 DIF(s)");
+ LOG_ERR("Could not register IPCP to N-1 DIF(s).");
return -1;
}
@@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t * api,
{
if (ipcp_unreg(api->id, difs, difs_size)) {
- LOG_ERR("Could not unregister IPCP from N-1 DIF(s)");
+ LOG_ERR("Could not unregister IPCP from N-1 DIF(s).");
return -1;
}
return 0;
}
-static int ap_unreg_id(uint32_t reg_ap_id,
- pid_t pid,
+static int ap_unreg_id(pid_t pid,
char ** difs,
size_t len)
{
int i;
int ret = 0;
- struct reg_name_entry * rne = NULL;
- struct list_head * pos = NULL;
+ struct reg_name_entry * rne = NULL;
+ struct list_head * pos = NULL;
- rne = find_reg_name_entry_by_id(reg_ap_id);
+ rne = get_reg_name_entry_by_id(pid);
if (rne == NULL)
return 0; /* no such id */
@@ -458,7 +598,6 @@ static int ap_reg(char * ap_name,
{
int i;
int ret = 0;
- int reg_ap_id = 0;
struct list_head * pos = NULL;
struct reg_name_entry * rne = NULL;
@@ -466,18 +605,18 @@ static int ap_reg(char * ap_name,
instance_name_t * ipcpi = NULL;
if (instance->ipcps.next == NULL)
- LOG_ERR("No IPCPs in this system.");
+ return -1;
/* check if this ap_name is already registered */
- rne = find_reg_name_entry_by_name(ap_name);
+ rne = get_reg_name_entry_by_name(ap_name);
if (rne != NULL)
return -1; /* can only register one instance for now */
- rne = reg_name_entry_create();
- if (rne == NULL)
+ api = instance_name_create();
+ if (api == NULL) {
return -1;
+ }
- api = instance_name_create();
if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
instance_name_destroy(api);
return -1;
@@ -488,12 +627,6 @@ static int ap_reg(char * ap_name,
* contains a single instance only
*/
- if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) {
- reg_name_entry_destroy(rne);
- instance_name_destroy(api);
- return -1;
- }
-
if (strcmp(difs[0], ALL_DIFS) == 0) {
list_for_each(pos, &instance->ipcps) {
struct ipcp_entry * e =
@@ -528,11 +661,10 @@ static int ap_reg(char * ap_name,
return -1;
}
/* for now, we register single instances */
- reg_name_entry_add_name_instance(strdup(ap_name),
- instance_name_dup(api));
- instance_name_destroy(api);
+ ret = reg_name_entry_add_name_instance(strdup(ap_name),
+ api);
- return reg_ap_id;
+ return ret;
}
static int ap_unreg(char * ap_name,
@@ -542,149 +674,304 @@ static int ap_unreg(char * ap_name,
{
struct reg_name_entry * tmp = NULL;
- instance_name_t * api = instance_name_create();
- if (api == NULL)
- return -1;
-
- if (instance_name_init_from(api, ap_name, ap_id) == NULL) {
- instance_name_destroy(api);
- return -1;
- }
-
/* check if ap_name is registered */
- tmp = find_reg_name_entry_by_name(api->name);
- if (tmp == NULL) {
- instance_name_destroy(api);
+ tmp = get_reg_name_entry_by_id(ap_id);
+ if (tmp == NULL)
return 0;
- } else {
- return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len);
- }
-}
+ if (strcmp(ap_name, tmp->api->name))
+ return 0;
-static int flow_accept(int fd,
- pid_t pid,
- char * ap_name,
- char * ae_name)
-{
- return -1;
+ return ap_unreg_id(ap_id, difs, len);
}
-static int flow_alloc_resp(int fd,
- int result)
+static struct port_map_entry * flow_accept(pid_t pid,
+ char ** ap_name,
+ char ** ae_name)
{
- return -1;
+ bool arrived = false;
+
+ struct timespec ts = {0, 100000};
+
+ struct port_map_entry * pme;
+ struct reg_name_entry * rne = get_reg_name_entry_by_id(pid);
+ if (rne == NULL) {
+ LOG_DBGF("Unregistered AP calling accept().");
+ return NULL;
+ }
+
+ if (rne->accept) {
+ LOG_DBGF("This AP still has a pending accept().");
+ return NULL;
+ }
+
+ rne->accept = true;
+
+ /* FIXME: wait for a thread that runs select() on flow_arrived */
+ while (!arrived) {
+ /* FIXME: this needs locking */
+ rne = get_reg_name_entry_by_id(pid);
+ if (rne == NULL)
+ return NULL;
+ arrived = rne->flow_arrived;
+ nanosleep(&ts, NULL);
+ }
+
+ pme = get_port_map_entry_n(pid);
+ if (pme == NULL) {
+ LOG_ERR("Port_id was not created yet.");
+ return NULL;
+ }
+
+ pthread_mutex_lock(&rne->fa_lock);
+ *ap_name = rne->req_ap_name;
+ if (ae_name != NULL)
+ *ae_name = rne->req_ae_name;
+ pthread_mutex_unlock(&rne->fa_lock);
+
+ return pme;
}
-static int flow_alloc(char * dst_name,
- char * src_ap_name,
- char * src_ae_name,
- struct qos_spec * qos,
- int oflags)
+static int flow_alloc_resp(pid_t n_pid,
+ uint32_t port_id,
+ int response)
{
- int port_id = 0;
- pid_t pid = get_ipcp_by_dst_name(dst_name)->id;
+ struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid);
+ struct port_map_entry * pme = get_port_map_entry(port_id);
- LOG_DBG("flow alloc received from %s-%s to %s.",
- src_ap_name, src_ae_name, dst_name);
+ if (rne == NULL || pme == NULL)
+ return -1;
- return ipcp_flow_alloc(pid,
- port_id,
- dst_name,
- src_ap_name,
- src_ae_name,
- qos);
+ /* FIXME: check all instances associated with the name */
+ if (!rne->accept) {
+ LOG_ERR("No process listening for this name.");
+ return -1;
+ }
+
+ /*
+ * consider the flow as handled
+ * once we can handle a list of AP-I's, remove it from the list
+ */
+
+ rne->flow_arrived = false;
+ rne->accept = false;
+
+ if (!response)
+ pme->state = FLOW_ALLOCATED;
+
+ return ipcp_flow_alloc_resp(pme->n_1_pid,
+ port_id,
+ pme->n_pid,
+ response);
}
-static int flow_alloc_res(int fd)
+static struct port_map_entry * flow_alloc(pid_t pid,
+ char * dst_name,
+ char * src_ap_name,
+ char * src_ae_name,
+ struct qos_spec * qos)
{
+ struct port_map_entry * e = malloc(sizeof(*e));
+ if (e == NULL) {
+ LOG_ERR("Failed malloc of port_map_entry.");
+ return NULL;
+ }
- return -1;
+ e->port_id = bmp_allocate(instance->port_ids);
+ e->n_pid = pid;
+ e->state = FLOW_PENDING;
+ e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id;
+
+ list_add(&e->next, &instance->port_map);
+
+ if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id,
+ e->port_id,
+ e->n_pid,
+ dst_name,
+ src_ap_name,
+ src_ae_name,
+ qos) < 0) {
+ list_del(&e->next);
+ bmp_release(instance->port_ids, e->port_id);
+ free(e);
+ return NULL;
+ }
+
+ return e;
}
-static int flow_dealloc(int fd)
+static int flow_alloc_res(uint32_t port_id)
{
- return -1;
+ bool allocated = false;
+ struct port_map_entry * e;
+ struct timespec ts = {0,100000};
+
+ while (!allocated) {
+ /* FIXME: this needs locking */
+ e = get_port_map_entry(port_id);
+ if (e == NULL) {
+ LOG_DBGF("Could not locate port_id %u", port_id);
+ return -1;
+ }
+ if (e->state == FLOW_ALLOCATED)
+ allocated = true;
+ nanosleep(&ts, NULL);
+ }
+
+ return 0;
}
-static int flow_cntl(int fd,
- int oflags)
+static int flow_dealloc(uint32_t port_id)
{
- return -1;
+ pid_t n_1_pid;
+
+ struct port_map_entry * e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return 0;
+
+ n_1_pid = e->n_1_pid;
+
+ list_del(&e->next);
+ free(e);
+
+ return ipcp_flow_dealloc(n_1_pid, port_id);
}
-static int flow_req_arr(char * dst_name,
- char * ap_name,
- char * ae_name)
+static struct port_map_entry * flow_req_arr(pid_t pid,
+ char * dst_name,
+ char * ap_name,
+ char * ae_name)
{
- return -1;
+ struct reg_name_entry * rne;
+ struct port_map_entry * pme;
+
+ rne = get_reg_name_entry_by_name(dst_name);
+ if (rne == NULL) {
+ LOG_DBGF("Destination name %s unknown.", dst_name);
+ return NULL;
+ }
+
+ pme = malloc(sizeof(*pme));
+ if (pme == NULL) {
+ LOG_ERR("Failed malloc of port_map_entry.");
+ return NULL;
+ }
+
+ pme->port_id = bmp_allocate(instance->port_ids);
+ pme->n_pid = rne->api->id;
+ pme->state = FLOW_PENDING;
+ pme->n_1_pid = pid;
+
+ list_add(&pme->next, &instance->port_map);
+
+ pthread_mutex_lock(&rne->fa_lock);
+
+ rne->req_ap_name = strdup(ap_name);
+ rne->req_ae_name = strdup(ae_name);
+
+ rne->flow_arrived = true;
+
+ pthread_mutex_unlock(&rne->fa_lock);
+
+ return pme;
}
static int flow_alloc_reply(uint32_t port_id,
- int result)
+ int response)
{
- return -1;
+ struct port_map_entry * e;
+
+ /* FIXME: do this under lock */
+ if (!response) {
+ e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return -1;
+ e->state = FLOW_ALLOCATED;
+ }
+
+ /* FIXME: does this need to be propagated to the IPCP? */
+
+ return 0;
}
static int flow_dealloc_ipcp(uint32_t port_id)
{
- return -1;
+ struct port_map_entry * e = get_port_map_entry(port_id);
+ if (e == NULL)
+ return 0;
+
+ list_del(&e->next);
+ free(e);
+
+ return 0;
+}
+
+static void irm_destroy(struct irm * irm)
+{
+ struct list_head * h;
+ struct list_head * t;
+
+ if (irm == NULL)
+ return;
+
+ if (irm->threadpool != NULL)
+ free(irm->threadpool);
+
+ if (irm->port_ids != NULL)
+ bmp_destroy(irm->port_ids);
+ /* clear the lists */
+ list_for_each_safe(h, t, &irm->ipcps) {
+ struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next);
+ destroy_ipcp(e->api);
+ }
+
+ list_for_each_safe(h, t, &irm->reg_names) {
+ struct reg_name_entry * e = list_entry(h,
+ struct reg_name_entry,
+ next);
+ char * difs [1] = {ALL_DIFS};
+ ap_unreg_id(e->api->id, difs, 1);
+ }
+
+ list_for_each_safe(h, t, &irm->port_map) {
+ struct port_map_entry * e = list_entry(h,
+ struct port_map_entry,
+ next);
+ list_del(&e->next);
+ free(e);
+ }
+
+ if (irm->dum != NULL)
+ shm_du_map_destroy(irm->dum);
+
+ close(irm->sockfd);
+ free(irm);
}
void irmd_sig_handler(int sig, siginfo_t * info, void * c)
{
+ int i;
+
switch(sig) {
case SIGINT:
case SIGTERM:
case SIGHUP:
- shm_du_map_close(instance->dum);
- free(instance);
- exit(0);
+ if (instance->threadpool != NULL) {
+ for (i = 0; i < IRMD_THREADPOOL_SIZE; i++)
+ pthread_cancel(instance->threadpool[i]);
+ }
+
+ case SIGPIPE:
+ LOG_DBG("Ignoring SIGPIPE.");
default:
return;
}
}
-int main()
+void * mainloop()
{
- int sockfd;
uint8_t buf[IRM_MSG_BUF_SIZE];
- struct sigaction sig_act;
-
- /* init sig_act */
- memset(&sig_act, 0, sizeof sig_act);
-
- /* install signal traps */
- sig_act.sa_sigaction = &irmd_sig_handler;
- sig_act.sa_flags = SA_SIGINFO;
-
- sigaction(SIGINT, &sig_act, NULL);
- sigaction(SIGTERM, &sig_act, NULL);
- sigaction(SIGHUP, &sig_act, NULL);
-
- if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1)
- unlink("/dev/shm/" SHM_DU_MAP_FILENAME);
-
- instance = malloc(sizeof(*instance));
- if (instance == NULL)
- return -1;
-
- if ((instance->dum = shm_du_map_create()) == NULL) {
- free(instance);
- return -1;
- }
-
- INIT_LIST_HEAD(&instance->ipcps);
- INIT_LIST_HEAD(&instance->reg_names);
-
- sockfd = server_socket_open(IRM_SOCK_PATH);
- if (sockfd < 0) {
- shm_du_map_close(instance->dum);
- free(instance);
- return -1;
- }
-
while (true) {
int cli_sockfd;
irm_msg_t * msg;
@@ -692,18 +979,19 @@ int main()
instance_name_t api;
buffer_t buffer;
irm_msg_t ret_msg = IRM_MSG__INIT;
+ struct port_map_entry * e = NULL;
ret_msg.code = IRM_MSG_CODE__IRM_REPLY;
- cli_sockfd = accept(sockfd, 0, 0);
+ cli_sockfd = accept(instance->sockfd, 0, 0);
if (cli_sockfd < 0) {
- LOG_ERR("Cannot accept new connection");
+ LOG_ERR("Cannot accept new connection.");
continue;
}
count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE);
if (count <= 0) {
- LOG_ERR("Failed to read from socket");
+ LOG_ERR("Failed to read from socket.");
close(cli_sockfd);
continue;
}
@@ -750,11 +1038,11 @@ int main()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_AP_REG:
- ret_msg.has_fd = true;
- ret_msg.fd = ap_reg(msg->ap_name,
- msg->pid,
- msg->dif_name,
- msg->n_dif_name);
+ ret_msg.has_result = true;
+ ret_msg.result = ap_reg(msg->ap_name,
+ msg->pid,
+ msg->dif_name,
+ msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_AP_UNREG:
ret_msg.has_result = true;
@@ -764,43 +1052,57 @@ int main()
msg->n_dif_name);
break;
case IRM_MSG_CODE__IRM_FLOW_ACCEPT:
- ret_msg.has_fd = true;
- ret_msg.fd = flow_accept(msg->fd,
- msg->pid,
- ret_msg.ap_name,
- ret_msg.ae_name);
+ e = flow_accept(msg->pid,
+ &ret_msg.ap_name,
+ &ret_msg.ae_name);
+ if (e == NULL)
+ break;
+
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_1_pid;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP:
ret_msg.has_result = true;
- ret_msg.result = flow_alloc_resp(msg->fd,
- msg->result);
+ ret_msg.result = flow_alloc_resp(msg->pid,
+ msg->port_id,
+ msg->response);
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC:
- ret_msg.has_fd = true;
- ret_msg.fd = flow_alloc(msg->dst_name,
- msg->ap_name,
- msg->ae_name,
- NULL,
- msg->oflags);
+ e = flow_alloc(msg->pid,
+ msg->dst_name,
+ msg->ap_name,
+ msg->ae_name,
+ NULL);
+ if (e == NULL)
+ break;
+
+ ret_msg.has_port_id = true;
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_1_pid;
break;
case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES:
- ret_msg.has_response = true;
- ret_msg.response = flow_alloc_res(msg->fd);
- break;
- case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_dealloc(msg->fd);
+ ret_msg.result = flow_alloc_res(msg->port_id);
break;
- case IRM_MSG_CODE__IRM_FLOW_CONTROL:
+ case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
ret_msg.has_result = true;
- ret_msg.result = flow_cntl(msg->fd,
- msg->oflags);
+ ret_msg.result = flow_dealloc(msg->port_id);
break;
case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
+ e = flow_req_arr(msg->pid,
+ msg->dst_name,
+ msg->ap_name,
+ msg->ae_name);
+ if (e == NULL)
+ break;
+
ret_msg.has_port_id = true;
- ret_msg.port_id = flow_req_arr(msg->dst_name,
- msg->ap_name,
- msg->ae_name);
+ ret_msg.port_id = e->port_id;
+ ret_msg.has_pid = true;
+ ret_msg.pid = e->n_pid;
break;
case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY:
ret_msg.has_result = true;
@@ -812,7 +1114,7 @@ int main()
ret_msg.result = flow_dealloc_ipcp(msg->port_id);
break;
default:
- LOG_ERR("Don't know that message code");
+ LOG_ERR("Don't know that message code.");
break;
}
@@ -820,7 +1122,7 @@ int main()
buffer.size = irm_msg__get_packed_size(&ret_msg);
if (buffer.size == 0) {
- LOG_ERR("Failed to send reply message");
+ LOG_ERR("Failed to send reply message.");
close(cli_sockfd);
continue;
}
@@ -842,6 +1144,88 @@ int main()
free(buffer.data);
close(cli_sockfd);
}
+}
+
+static struct irm * irm_create()
+{
+ struct irm * i = malloc(sizeof(*i));
+ if (i == NULL)
+ return NULL;
+
+ if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1)
+ unlink("/dev/shm/" SHM_DU_MAP_FILENAME);
+
+ i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE);
+ if (i->threadpool == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ if ((i->dum = shm_du_map_create()) == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&i->ipcps);
+ INIT_LIST_HEAD(&i->reg_names);
+ INIT_LIST_HEAD(&i->port_map);
+
+ i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0);
+ if (i->port_ids == NULL) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ i->sockfd = server_socket_open(IRM_SOCK_PATH);
+ if (i->sockfd < 0) {
+ irm_destroy(i);
+ return NULL;
+ }
+
+ pthread_mutex_init(&i->lock, NULL);
+
+ return i;
+}
+
+int main()
+{
+ struct sigaction sig_act;
+
+ int t = 0;
+
+ /* init sig_act */
+ memset(&sig_act, 0, sizeof sig_act);
+
+ /* install signal traps */
+ sig_act.sa_sigaction = &irmd_sig_handler;
+ sig_act.sa_flags = SA_SIGINFO;
+
+ if (sigaction(SIGINT, &sig_act, NULL) < 0)
+ exit(1);
+ if (sigaction(SIGTERM, &sig_act, NULL) < 0)
+ exit(1);
+ if (sigaction(SIGHUP, &sig_act, NULL) < 0)
+ exit(1);
+ if (sigaction(SIGPIPE, &sig_act, NULL) < 0)
+ exit(1);
+
+ instance = irm_create();
+ if (instance == NULL)
+ return 1;
+
+ /*
+ * FIXME: we need a main loop that delegates messages to subthreads in a
+ * way that avoids all possible deadlocks for local apps
+ */
+
+ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
+ pthread_create(&instance->threadpool[t], NULL, mainloop, NULL);
+
+ /* wait for (all of them) to return */
+ for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t)
+ pthread_join(instance->threadpool[t], NULL);
+
+ irm_destroy(instance);
return 0;
}