diff options
author | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2016-05-08 16:34:19 +0200 |
commit | 5812dfb832e513dc455a0d48624bcad62334d457 (patch) | |
tree | 93a02e1b20f54bb869eadc856f201412c633315c /src/irmd/main.c | |
parent | de8f2015cbd015b1cced366cb12c054be62c23b1 (diff) | |
parent | 021af9e01ce6c6376534b33ef1a06ea4189028d4 (diff) | |
download | ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.tar.gz ouroboros-5812dfb832e513dc455a0d48624bcad62334d457.zip |
Merged in dstaesse/ouroboros/be-fast-path (pull request #65)
irmd: flow allocation and fast path
Diffstat (limited to 'src/irmd/main.c')
-rw-r--r-- | src/irmd/main.c | 764 |
1 files changed, 574 insertions, 190 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 67254feb..a6403612 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -34,6 +34,7 @@ #include <ouroboros/utils.h> #include <ouroboros/dif_config.h> #include <ouroboros/shm_du_map.h> +#include <ouroboros/bitmap.h> #include <sys/socket.h> #include <sys/un.h> @@ -42,41 +43,146 @@ #include <errno.h> #include <string.h> #include <limits.h> +#include <pthread.h> /* FIXME: this smells like part of namespace management */ #define ALL_DIFS "*" +#ifndef IRMD_MAX_FLOWS + #define IRMD_MAX_FLOWS 4096 +#endif + +#ifndef IRMD_THREADPOOL_SIZE + #define IRMD_THREADPOOL_SIZE 3 +#endif + + + +enum flow_state { + FLOW_NULL = 0, + FLOW_PENDING, + FLOW_ALLOCATED +}; + struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; + + pthread_mutex_t lock; }; -/* currently supports only registering whatevercast groups of a single AP */ +/* currently supports only registering whatevercast groups of a single AP-I */ struct reg_name_entry { struct list_head next; /* generic whatevercast name */ char * name; - /* FIXME: resolve name instead */ + /* FIXME: make a list resolve to AP-I instead */ instance_name_t * api; - uint32_t reg_ap_id; + + bool accept; + char * req_ap_name; + char * req_ae_name; + bool flow_arrived; + + pthread_mutex_t fa_lock; +}; + +/* keeps track of port_id's between N and N - 1 */ +struct port_map_entry { + struct list_head next; + + uint32_t port_id; + + pid_t n_pid; + pid_t n_1_pid; + + enum flow_state state; }; struct irm { - /* FIXME: list of ipcps can be merged with registered names */ + /* FIXME: list of ipcps could be merged with registered names */ struct list_head ipcps; struct list_head reg_names; + int sockfd; + + /* keep track of all flows in this processing system */ + struct bmp * port_ids; + + /* maps port_ids to pid pair */ + struct list_head port_map; + struct shm_du_map * dum; -}; -struct irm * instance = NULL; + pthread_t * threadpool; + + pthread_mutex_t lock; +} * instance = NULL; -static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) +static struct port_map_entry * get_port_map_entry(uint32_t port_id) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &instance->port_map) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + if (e->port_id == port_id) + return e; + } + + return NULL; +} + +static struct port_map_entry * get_port_map_entry_n(pid_t n_pid) +{ + struct list_head * pos = NULL; + + list_for_each(pos, &instance->port_map) { + struct port_map_entry * e = + list_entry(pos, struct port_map_entry, next); + + if (e->n_pid == n_pid) + return e; + } + + return NULL; +} + +static struct ipcp_entry * ipcp_entry_create() +{ + struct ipcp_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->api = NULL; + e->dif_name = NULL; + + INIT_LIST_HEAD(&e->next); + pthread_mutex_init(&e->lock, NULL); + + return e; +} + +static void ipcp_entry_destroy(struct ipcp_entry * e) +{ + if (e == NULL) + return; + + if (e->api != NULL) + instance_name_destroy(e->api); + + if (e->dif_name != NULL) + free(e->dif_name); + + free(e); +} + +static struct ipcp_entry * get_ipcp_entry_by_name(instance_name_t * api) { - struct ipcp_entry * tmp = NULL; struct list_head * pos = NULL; list_for_each(pos, &instance->ipcps) { @@ -87,7 +193,7 @@ static struct ipcp_entry * find_ipcp_entry_by_name(instance_name_t * api) return tmp; } - return tmp; + return NULL; } static instance_name_t * get_ipcp_by_name(char * ap_name) @@ -143,9 +249,14 @@ static struct reg_name_entry * reg_name_entry_create() if (e == NULL) return NULL; - e->reg_ap_id = rand() % INT_MAX; - e->name = NULL; + e->name = NULL; + e->api = NULL; + e->accept = false; + e->req_ap_name = NULL; + e->req_ae_name = NULL; + e->flow_arrived = false; + pthread_mutex_init(&e->fa_lock, NULL); INIT_LIST_HEAD(&e->next); return e; @@ -153,7 +264,7 @@ static struct reg_name_entry * reg_name_entry_create() static struct reg_name_entry * reg_name_entry_init(struct reg_name_entry * e, char * name, - instance_name_t * api) + instance_name_t * api) { if (e == NULL || name == NULL || api == NULL) return NULL; @@ -171,10 +282,18 @@ static int reg_name_entry_destroy(struct reg_name_entry * e) free(e->name); instance_name_destroy(e->api); + + if (e->req_ap_name != NULL) + free(e->req_ap_name); + if (e->req_ae_name != NULL) + free(e->req_ae_name); + + free(e); + return 0; } -static struct reg_name_entry * find_reg_name_entry_by_name(char * name) +static struct reg_name_entry * get_reg_name_entry_by_name(char * name) { struct list_head * pos = NULL; @@ -189,7 +308,7 @@ static struct reg_name_entry * find_reg_name_entry_by_name(char * name) return NULL; } -static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) +static struct reg_name_entry * get_reg_name_entry_by_id(pid_t pid) { struct list_head * pos = NULL; @@ -197,7 +316,7 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) struct reg_name_entry * e = list_entry(pos, struct reg_name_entry, next); - if (reg_ap_id == e->reg_ap_id) + if (e->api->id == pid) return e; } @@ -207,10 +326,17 @@ static struct reg_name_entry * find_reg_name_entry_by_id(uint32_t reg_ap_id) /* FIXME: add only name when we have NSM solved */ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) { - struct reg_name_entry * e = find_reg_name_entry_by_name(name); + struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) { e = reg_name_entry_create(); - e = reg_name_entry_init(e, name, api); + if (e == NULL) + return -1; + + if (reg_name_entry_init(e, name, api) == NULL) { + reg_name_entry_destroy(e); + return -1; + } + list_add(&e->next, &instance->reg_names); return 0; } @@ -221,7 +347,7 @@ static int reg_name_entry_add_name_instance(char * name, instance_name_t * api) static int reg_name_entry_del_name(char * name) { - struct reg_name_entry * e = find_reg_name_entry_by_name(name); + struct reg_name_entry * e = get_reg_name_entry_by_name(name); if (e == NULL) return 0; @@ -240,34 +366,38 @@ static pid_t create_ipcp(char * ap_name, pid = ipcp_create(ap_name, ipcp_type); if (pid == -1) { - LOG_ERR("Failed to create IPCP"); + LOG_ERR("Failed to create IPCP."); return -1; } - tmp = malloc(sizeof(*tmp)); - if (tmp == NULL) { + tmp = ipcp_entry_create(); + if (tmp == NULL) return -1; - } INIT_LIST_HEAD(&tmp->next); tmp->api = instance_name_create(); if (tmp->api == NULL) { - free(tmp); + ipcp_entry_destroy(tmp); return -1; } if(instance_name_init_from(tmp->api, ap_name, pid) == NULL) { instance_name_destroy(tmp->api); - free(tmp); + ipcp_entry_destroy(tmp); return -1; } tmp->dif_name = NULL; - LOG_DBG("Created IPC process with pid %d", pid); + pthread_mutex_lock(&instance->lock); list_add(&tmp->next, &instance->ipcps); + + pthread_mutex_unlock(&instance->lock); + + LOG_INFO("Created IPCP %s-%d ", ap_name, pid); + return pid; } @@ -276,18 +406,19 @@ static int destroy_ipcp(instance_name_t * api) struct list_head * pos = NULL; struct list_head * n = NULL; + if (api == NULL) + return 0; + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { LOG_ERR("No such IPCP in the system."); - return -1; + return 0; } - LOG_DBG("Destroying ipcp %s-%d", api->name, api->id); - if (ipcp_destroy(api->id)) - LOG_ERR("Could not destroy IPCP"); + LOG_ERR("Could not destroy IPCP."); list_for_each_safe(pos, n, &(instance->ipcps)) { struct ipcp_entry * tmp = @@ -295,8 +426,12 @@ static int destroy_ipcp(instance_name_t * api) if (instance_name_cmp(api, tmp->api) == 0) list_del(&tmp->next); + + ipcp_entry_destroy(tmp); } + LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); + return 0; } @@ -313,25 +448,28 @@ static int bootstrap_ipcp(instance_name_t * api, return -1; } - entry = find_ipcp_entry_by_name(api); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - LOG_ERR("No such IPCP"); + LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { - LOG_ERR("Failed to strdup"); + LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { - LOG_ERR("Could not bootstrap IPCP"); + LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", + api->name, api->id, conf->dif_name); + return 0; } @@ -343,21 +481,21 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; - entry = find_ipcp_entry_by_name(api); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { - LOG_ERR("No such IPCP"); + LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { - LOG_ERR("Failed to strdup"); + LOG_ERR("Failed to strdup."); return -1; } member = da_resolve_daf(dif_name); if (member == NULL) { - LOG_ERR("Could not find a member of that DIF"); + LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; return -1; @@ -365,19 +503,22 @@ static int enroll_ipcp(instance_name_t * api, n_1_difs_size = da_resolve_dap(member, n_1_difs); if (n_1_difs_size < 1) { - LOG_ERR("Could not find N-1 DIFs"); + LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; return -1; } if (ipcp_enroll(entry->api->id, member, n_1_difs[0])) { - LOG_ERR("Could not enroll IPCP"); + LOG_ERR("Could not enroll IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", + api->name, api->id, dif_name); + return 0; } @@ -386,7 +527,7 @@ static int reg_ipcp(instance_name_t * api, size_t difs_size) { if (ipcp_reg(api->id, difs, difs_size)) { - LOG_ERR("Could not register IPCP to N-1 DIF(s)"); + LOG_ERR("Could not register IPCP to N-1 DIF(s)."); return -1; } @@ -399,24 +540,23 @@ static int unreg_ipcp(instance_name_t * api, { if (ipcp_unreg(api->id, difs, difs_size)) { - LOG_ERR("Could not unregister IPCP from N-1 DIF(s)"); + LOG_ERR("Could not unregister IPCP from N-1 DIF(s)."); return -1; } return 0; } -static int ap_unreg_id(uint32_t reg_ap_id, - pid_t pid, +static int ap_unreg_id(pid_t pid, char ** difs, size_t len) { int i; int ret = 0; - struct reg_name_entry * rne = NULL; - struct list_head * pos = NULL; + struct reg_name_entry * rne = NULL; + struct list_head * pos = NULL; - rne = find_reg_name_entry_by_id(reg_ap_id); + rne = get_reg_name_entry_by_id(pid); if (rne == NULL) return 0; /* no such id */ @@ -458,7 +598,6 @@ static int ap_reg(char * ap_name, { int i; int ret = 0; - int reg_ap_id = 0; struct list_head * pos = NULL; struct reg_name_entry * rne = NULL; @@ -466,18 +605,18 @@ static int ap_reg(char * ap_name, instance_name_t * ipcpi = NULL; if (instance->ipcps.next == NULL) - LOG_ERR("No IPCPs in this system."); + return -1; /* check if this ap_name is already registered */ - rne = find_reg_name_entry_by_name(ap_name); + rne = get_reg_name_entry_by_name(ap_name); if (rne != NULL) return -1; /* can only register one instance for now */ - rne = reg_name_entry_create(); - if (rne == NULL) + api = instance_name_create(); + if (api == NULL) { return -1; + } - api = instance_name_create(); if (instance_name_init_from(api, ap_name, ap_id) == NULL) { instance_name_destroy(api); return -1; @@ -488,12 +627,6 @@ static int ap_reg(char * ap_name, * contains a single instance only */ - if (reg_name_entry_init(rne, strdup(ap_name), api) == NULL) { - reg_name_entry_destroy(rne); - instance_name_destroy(api); - return -1; - } - if (strcmp(difs[0], ALL_DIFS) == 0) { list_for_each(pos, &instance->ipcps) { struct ipcp_entry * e = @@ -528,11 +661,10 @@ static int ap_reg(char * ap_name, return -1; } /* for now, we register single instances */ - reg_name_entry_add_name_instance(strdup(ap_name), - instance_name_dup(api)); - instance_name_destroy(api); + ret = reg_name_entry_add_name_instance(strdup(ap_name), + api); - return reg_ap_id; + return ret; } static int ap_unreg(char * ap_name, @@ -542,149 +674,304 @@ static int ap_unreg(char * ap_name, { struct reg_name_entry * tmp = NULL; - instance_name_t * api = instance_name_create(); - if (api == NULL) - return -1; - - if (instance_name_init_from(api, ap_name, ap_id) == NULL) { - instance_name_destroy(api); - return -1; - } - /* check if ap_name is registered */ - tmp = find_reg_name_entry_by_name(api->name); - if (tmp == NULL) { - instance_name_destroy(api); + tmp = get_reg_name_entry_by_id(ap_id); + if (tmp == NULL) return 0; - } else { - return ap_unreg_id(tmp->reg_ap_id, api->id, difs, len); - } -} + if (strcmp(ap_name, tmp->api->name)) + return 0; -static int flow_accept(int fd, - pid_t pid, - char * ap_name, - char * ae_name) -{ - return -1; + return ap_unreg_id(ap_id, difs, len); } -static int flow_alloc_resp(int fd, - int result) +static struct port_map_entry * flow_accept(pid_t pid, + char ** ap_name, + char ** ae_name) { - return -1; + bool arrived = false; + + struct timespec ts = {0, 100000}; + + struct port_map_entry * pme; + struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); + if (rne == NULL) { + LOG_DBGF("Unregistered AP calling accept()."); + return NULL; + } + + if (rne->accept) { + LOG_DBGF("This AP still has a pending accept()."); + return NULL; + } + + rne->accept = true; + + /* FIXME: wait for a thread that runs select() on flow_arrived */ + while (!arrived) { + /* FIXME: this needs locking */ + rne = get_reg_name_entry_by_id(pid); + if (rne == NULL) + return NULL; + arrived = rne->flow_arrived; + nanosleep(&ts, NULL); + } + + pme = get_port_map_entry_n(pid); + if (pme == NULL) { + LOG_ERR("Port_id was not created yet."); + return NULL; + } + + pthread_mutex_lock(&rne->fa_lock); + *ap_name = rne->req_ap_name; + if (ae_name != NULL) + *ae_name = rne->req_ae_name; + pthread_mutex_unlock(&rne->fa_lock); + + return pme; } -static int flow_alloc(char * dst_name, - char * src_ap_name, - char * src_ae_name, - struct qos_spec * qos, - int oflags) +static int flow_alloc_resp(pid_t n_pid, + uint32_t port_id, + int response) { - int port_id = 0; - pid_t pid = get_ipcp_by_dst_name(dst_name)->id; + struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); + struct port_map_entry * pme = get_port_map_entry(port_id); - LOG_DBG("flow alloc received from %s-%s to %s.", - src_ap_name, src_ae_name, dst_name); + if (rne == NULL || pme == NULL) + return -1; - return ipcp_flow_alloc(pid, - port_id, - dst_name, - src_ap_name, - src_ae_name, - qos); + /* FIXME: check all instances associated with the name */ + if (!rne->accept) { + LOG_ERR("No process listening for this name."); + return -1; + } + + /* + * consider the flow as handled + * once we can handle a list of AP-I's, remove it from the list + */ + + rne->flow_arrived = false; + rne->accept = false; + + if (!response) + pme->state = FLOW_ALLOCATED; + + return ipcp_flow_alloc_resp(pme->n_1_pid, + port_id, + pme->n_pid, + response); } -static int flow_alloc_res(int fd) +static struct port_map_entry * flow_alloc(pid_t pid, + char * dst_name, + char * src_ap_name, + char * src_ae_name, + struct qos_spec * qos) { + struct port_map_entry * e = malloc(sizeof(*e)); + if (e == NULL) { + LOG_ERR("Failed malloc of port_map_entry."); + return NULL; + } - return -1; + e->port_id = bmp_allocate(instance->port_ids); + e->n_pid = pid; + e->state = FLOW_PENDING; + e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + + list_add(&e->next, &instance->port_map); + + if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, + e->port_id, + e->n_pid, + dst_name, + src_ap_name, + src_ae_name, + qos) < 0) { + list_del(&e->next); + bmp_release(instance->port_ids, e->port_id); + free(e); + return NULL; + } + + return e; } -static int flow_dealloc(int fd) +static int flow_alloc_res(uint32_t port_id) { - return -1; + bool allocated = false; + struct port_map_entry * e; + struct timespec ts = {0,100000}; + + while (!allocated) { + /* FIXME: this needs locking */ + e = get_port_map_entry(port_id); + if (e == NULL) { + LOG_DBGF("Could not locate port_id %u", port_id); + return -1; + } + if (e->state == FLOW_ALLOCATED) + allocated = true; + nanosleep(&ts, NULL); + } + + return 0; } -static int flow_cntl(int fd, - int oflags) +static int flow_dealloc(uint32_t port_id) { - return -1; + pid_t n_1_pid; + + struct port_map_entry * e = get_port_map_entry(port_id); + if (e == NULL) + return 0; + + n_1_pid = e->n_1_pid; + + list_del(&e->next); + free(e); + + return ipcp_flow_dealloc(n_1_pid, port_id); } -static int flow_req_arr(char * dst_name, - char * ap_name, - char * ae_name) +static struct port_map_entry * flow_req_arr(pid_t pid, + char * dst_name, + char * ap_name, + char * ae_name) { - return -1; + struct reg_name_entry * rne; + struct port_map_entry * pme; + + rne = get_reg_name_entry_by_name(dst_name); + if (rne == NULL) { + LOG_DBGF("Destination name %s unknown.", dst_name); + return NULL; + } + + pme = malloc(sizeof(*pme)); + if (pme == NULL) { + LOG_ERR("Failed malloc of port_map_entry."); + return NULL; + } + + pme->port_id = bmp_allocate(instance->port_ids); + pme->n_pid = rne->api->id; + pme->state = FLOW_PENDING; + pme->n_1_pid = pid; + + list_add(&pme->next, &instance->port_map); + + pthread_mutex_lock(&rne->fa_lock); + + rne->req_ap_name = strdup(ap_name); + rne->req_ae_name = strdup(ae_name); + + rne->flow_arrived = true; + + pthread_mutex_unlock(&rne->fa_lock); + + return pme; } static int flow_alloc_reply(uint32_t port_id, - int result) + int response) { - return -1; + struct port_map_entry * e; + + /* FIXME: do this under lock */ + if (!response) { + e = get_port_map_entry(port_id); + if (e == NULL) + return -1; + e->state = FLOW_ALLOCATED; + } + + /* FIXME: does this need to be propagated to the IPCP? */ + + return 0; } static int flow_dealloc_ipcp(uint32_t port_id) { - return -1; + struct port_map_entry * e = get_port_map_entry(port_id); + if (e == NULL) + return 0; + + list_del(&e->next); + free(e); + + return 0; +} + +static void irm_destroy(struct irm * irm) +{ + struct list_head * h; + struct list_head * t; + + if (irm == NULL) + return; + + if (irm->threadpool != NULL) + free(irm->threadpool); + + if (irm->port_ids != NULL) + bmp_destroy(irm->port_ids); + /* clear the lists */ + list_for_each_safe(h, t, &irm->ipcps) { + struct ipcp_entry * e = list_entry(h, struct ipcp_entry, next); + destroy_ipcp(e->api); + } + + list_for_each_safe(h, t, &irm->reg_names) { + struct reg_name_entry * e = list_entry(h, + struct reg_name_entry, + next); + char * difs [1] = {ALL_DIFS}; + ap_unreg_id(e->api->id, difs, 1); + } + + list_for_each_safe(h, t, &irm->port_map) { + struct port_map_entry * e = list_entry(h, + struct port_map_entry, + next); + list_del(&e->next); + free(e); + } + + if (irm->dum != NULL) + shm_du_map_destroy(irm->dum); + + close(irm->sockfd); + free(irm); } void irmd_sig_handler(int sig, siginfo_t * info, void * c) { + int i; + switch(sig) { case SIGINT: case SIGTERM: case SIGHUP: - shm_du_map_close(instance->dum); - free(instance); - exit(0); + if (instance->threadpool != NULL) { + for (i = 0; i < IRMD_THREADPOOL_SIZE; i++) + pthread_cancel(instance->threadpool[i]); + } + + case SIGPIPE: + LOG_DBG("Ignoring SIGPIPE."); default: return; } } -int main() +void * mainloop() { - int sockfd; uint8_t buf[IRM_MSG_BUF_SIZE]; - struct sigaction sig_act; - - /* init sig_act */ - memset(&sig_act, 0, sizeof sig_act); - - /* install signal traps */ - sig_act.sa_sigaction = &irmd_sig_handler; - sig_act.sa_flags = SA_SIGINFO; - - sigaction(SIGINT, &sig_act, NULL); - sigaction(SIGTERM, &sig_act, NULL); - sigaction(SIGHUP, &sig_act, NULL); - - if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) - unlink("/dev/shm/" SHM_DU_MAP_FILENAME); - - instance = malloc(sizeof(*instance)); - if (instance == NULL) - return -1; - - if ((instance->dum = shm_du_map_create()) == NULL) { - free(instance); - return -1; - } - - INIT_LIST_HEAD(&instance->ipcps); - INIT_LIST_HEAD(&instance->reg_names); - - sockfd = server_socket_open(IRM_SOCK_PATH); - if (sockfd < 0) { - shm_du_map_close(instance->dum); - free(instance); - return -1; - } - while (true) { int cli_sockfd; irm_msg_t * msg; @@ -692,18 +979,19 @@ int main() instance_name_t api; buffer_t buffer; irm_msg_t ret_msg = IRM_MSG__INIT; + struct port_map_entry * e = NULL; ret_msg.code = IRM_MSG_CODE__IRM_REPLY; - cli_sockfd = accept(sockfd, 0, 0); + cli_sockfd = accept(instance->sockfd, 0, 0); if (cli_sockfd < 0) { - LOG_ERR("Cannot accept new connection"); + LOG_ERR("Cannot accept new connection."); continue; } count = read(cli_sockfd, buf, IRM_MSG_BUF_SIZE); if (count <= 0) { - LOG_ERR("Failed to read from socket"); + LOG_ERR("Failed to read from socket."); close(cli_sockfd); continue; } @@ -750,11 +1038,11 @@ int main() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_REG: - ret_msg.has_fd = true; - ret_msg.fd = ap_reg(msg->ap_name, - msg->pid, - msg->dif_name, - msg->n_dif_name); + ret_msg.has_result = true; + ret_msg.result = ap_reg(msg->ap_name, + msg->pid, + msg->dif_name, + msg->n_dif_name); break; case IRM_MSG_CODE__IRM_AP_UNREG: ret_msg.has_result = true; @@ -764,43 +1052,57 @@ int main() msg->n_dif_name); break; case IRM_MSG_CODE__IRM_FLOW_ACCEPT: - ret_msg.has_fd = true; - ret_msg.fd = flow_accept(msg->fd, - msg->pid, - ret_msg.ap_name, - ret_msg.ae_name); + e = flow_accept(msg->pid, + &ret_msg.ap_name, + &ret_msg.ae_name); + if (e == NULL) + break; + + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP: ret_msg.has_result = true; - ret_msg.result = flow_alloc_resp(msg->fd, - msg->result); + ret_msg.result = flow_alloc_resp(msg->pid, + msg->port_id, + msg->response); break; case IRM_MSG_CODE__IRM_FLOW_ALLOC: - ret_msg.has_fd = true; - ret_msg.fd = flow_alloc(msg->dst_name, - msg->ap_name, - msg->ae_name, - NULL, - msg->oflags); + e = flow_alloc(msg->pid, + msg->dst_name, + msg->ap_name, + msg->ae_name, + NULL); + if (e == NULL) + break; + + ret_msg.has_port_id = true; + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_1_pid; break; case IRM_MSG_CODE__IRM_FLOW_ALLOC_RES: - ret_msg.has_response = true; - ret_msg.response = flow_alloc_res(msg->fd); - break; - case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_dealloc(msg->fd); + ret_msg.result = flow_alloc_res(msg->port_id); break; - case IRM_MSG_CODE__IRM_FLOW_CONTROL: + case IRM_MSG_CODE__IRM_FLOW_DEALLOC: ret_msg.has_result = true; - ret_msg.result = flow_cntl(msg->fd, - msg->oflags); + ret_msg.result = flow_dealloc(msg->port_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: + e = flow_req_arr(msg->pid, + msg->dst_name, + msg->ap_name, + msg->ae_name); + if (e == NULL) + break; + ret_msg.has_port_id = true; - ret_msg.port_id = flow_req_arr(msg->dst_name, - msg->ap_name, - msg->ae_name); + ret_msg.port_id = e->port_id; + ret_msg.has_pid = true; + ret_msg.pid = e->n_pid; break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: ret_msg.has_result = true; @@ -812,7 +1114,7 @@ int main() ret_msg.result = flow_dealloc_ipcp(msg->port_id); break; default: - LOG_ERR("Don't know that message code"); + LOG_ERR("Don't know that message code."); break; } @@ -820,7 +1122,7 @@ int main() buffer.size = irm_msg__get_packed_size(&ret_msg); if (buffer.size == 0) { - LOG_ERR("Failed to send reply message"); + LOG_ERR("Failed to send reply message."); close(cli_sockfd); continue; } @@ -842,6 +1144,88 @@ int main() free(buffer.data); close(cli_sockfd); } +} + +static struct irm * irm_create() +{ + struct irm * i = malloc(sizeof(*i)); + if (i == NULL) + return NULL; + + if (access("/dev/shm/" SHM_DU_MAP_FILENAME, F_OK) != -1) + unlink("/dev/shm/" SHM_DU_MAP_FILENAME); + + i->threadpool = malloc(sizeof(pthread_t) * IRMD_THREADPOOL_SIZE); + if (i->threadpool == NULL) { + irm_destroy(i); + return NULL; + } + + if ((i->dum = shm_du_map_create()) == NULL) { + irm_destroy(i); + return NULL; + } + + INIT_LIST_HEAD(&i->ipcps); + INIT_LIST_HEAD(&i->reg_names); + INIT_LIST_HEAD(&i->port_map); + + i->port_ids = bmp_create(IRMD_MAX_FLOWS, 0); + if (i->port_ids == NULL) { + irm_destroy(i); + return NULL; + } + + i->sockfd = server_socket_open(IRM_SOCK_PATH); + if (i->sockfd < 0) { + irm_destroy(i); + return NULL; + } + + pthread_mutex_init(&i->lock, NULL); + + return i; +} + +int main() +{ + struct sigaction sig_act; + + int t = 0; + + /* init sig_act */ + memset(&sig_act, 0, sizeof sig_act); + + /* install signal traps */ + sig_act.sa_sigaction = &irmd_sig_handler; + sig_act.sa_flags = SA_SIGINFO; + + if (sigaction(SIGINT, &sig_act, NULL) < 0) + exit(1); + if (sigaction(SIGTERM, &sig_act, NULL) < 0) + exit(1); + if (sigaction(SIGHUP, &sig_act, NULL) < 0) + exit(1); + if (sigaction(SIGPIPE, &sig_act, NULL) < 0) + exit(1); + + instance = irm_create(); + if (instance == NULL) + return 1; + + /* + * FIXME: we need a main loop that delegates messages to subthreads in a + * way that avoids all possible deadlocks for local apps + */ + + for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) + pthread_create(&instance->threadpool[t], NULL, mainloop, NULL); + + /* wait for (all of them) to return */ + for (t = 0; t < IRMD_THREADPOOL_SIZE; ++t) + pthread_join(instance->threadpool[t], NULL); + + irm_destroy(instance); return 0; } |