diff options
author | dimitri staessens <[email protected]> | 2016-05-10 11:32:59 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2016-05-10 11:32:59 +0200 |
commit | 440b4d33d71b19d0057e50ac61fa0b3127738479 (patch) | |
tree | 42bb8a3c784a128115b0a3a1e662c101fe30e5c6 /src | |
parent | f3b9cc299d729f21d34ed5a7c6b8d06f7b50020e (diff) | |
download | ouroboros-440b4d33d71b19d0057e50ac61fa0b3127738479.tar.gz ouroboros-440b4d33d71b19d0057e50ac61fa0b3127738479.zip |
irmd: introduced locking
This commit adds locking to the IRMd with a single global lock.
It also fixes some issues in cleaning up the daemon.
Diffstat (limited to 'src')
-rw-r--r-- | src/irmd/main.c | 422 | ||||
-rw-r--r-- | src/lib/dev.c | 5 | ||||
-rw-r--r-- | src/lib/ipcp.c | 2 |
3 files changed, 287 insertions, 142 deletions
diff --git a/src/irmd/main.c b/src/irmd/main.c index 8cf03400..2be5e1b6 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -53,7 +53,7 @@ #endif #ifndef IRMD_THREADPOOL_SIZE - #define IRMD_THREADPOOL_SIZE 3 + #define IRMD_THREADPOOL_SIZE 5 #endif enum flow_state { @@ -66,8 +66,6 @@ struct ipcp_entry { struct list_head next; instance_name_t * api; char * dif_name; - - pthread_mutex_t lock; }; /* currently supports only registering whatevercast groups of a single AP-I */ @@ -84,9 +82,9 @@ struct reg_name_entry { char * req_ap_name; char * req_ae_name; int response; + bool flow_arrived; - pthread_cond_t flow_arrived; - pthread_mutex_t fa_lock; + pthread_cond_t acc_signal; }; /* keeps track of port_id's between N and N - 1 */ @@ -98,6 +96,8 @@ struct port_map_entry { pid_t n_pid; pid_t n_1_pid; + pthread_cond_t res_signal; + enum flow_state state; }; @@ -118,9 +118,28 @@ struct irm { pthread_t * threadpool; - pthread_mutex_t lock; + pthread_mutex_t r_lock; } * instance = NULL; +static struct port_map_entry * port_map_entry_create() +{ + struct port_map_entry * e = malloc(sizeof(*e)); + if (e == NULL) + return NULL; + + e->n_pid = 0; + e->n_1_pid = 0; + e->port_id = 0; + e->state = FLOW_NULL; + + if (pthread_cond_init(&e->res_signal, NULL)) { + free(e); + return NULL; + } + + return e; +} + static struct port_map_entry * get_port_map_entry(int port_id) { struct list_head * pos = NULL; @@ -161,7 +180,6 @@ static struct ipcp_entry * ipcp_entry_create() e->dif_name = NULL; INIT_LIST_HEAD(&e->next); - pthread_mutex_init(&e->lock, NULL); return e; } @@ -251,13 +269,14 @@ static struct reg_name_entry * reg_name_entry_create() e->name = NULL; e->api = NULL; e->accept = false; - /* pending response */ - e->response = 1; e->req_ap_name = NULL; e->req_ae_name = NULL; + e->flow_arrived = false; - pthread_mutex_init(&e->fa_lock, NULL); - pthread_cond_init(&e->flow_arrived, NULL); + if (pthread_cond_init(&e->acc_signal, NULL)) { + free(e); + return NULL; + } INIT_LIST_HEAD(&e->next); @@ -290,8 +309,6 @@ static int reg_name_entry_destroy(struct reg_name_entry * e) if (e->req_ae_name != NULL) free(e->req_ae_name); - pthread_mutex_destroy(&e->fa_lock); - free(e); return 0; @@ -394,13 +411,13 @@ static pid_t create_ipcp(char * ap_name, tmp->dif_name = NULL; - pthread_mutex_lock(&instance->lock); + pthread_mutex_lock(&instance->r_lock); list_add(&tmp->next, &instance->ipcps); - pthread_mutex_unlock(&instance->lock); + pthread_mutex_unlock(&instance->r_lock); - LOG_INFO("Created IPCP %s-%d ", ap_name, pid); + LOG_INFO("Created IPCP %s-%d.", ap_name, pid); return pid; } @@ -409,6 +426,8 @@ static int destroy_ipcp(instance_name_t * api) { struct list_head * pos = NULL; struct list_head * n = NULL; + pid_t pid = 0; + if (api == NULL) return 0; @@ -421,6 +440,7 @@ static int destroy_ipcp(instance_name_t * api) return 0; } + pid =api->id; if (ipcp_destroy(api->id)) LOG_ERR("Could not destroy IPCP."); @@ -434,7 +454,7 @@ static int destroy_ipcp(instance_name_t * api) ipcp_entry_destroy(tmp); } - LOG_INFO("Destroyed IPCP %s-%d.", api->name, api->id); + LOG_INFO("Destroyed IPCP %d.", pid); return 0; } @@ -444,33 +464,41 @@ static int bootstrap_ipcp(instance_name_t * api, { struct ipcp_entry * entry = NULL; + pthread_mutex_lock(&instance->r_lock); + if (api->id == 0) api = get_ipcp_by_name(api->name); if (api == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP in the system."); return -1; } entry = get_ipcp_entry_by_name(api); if (entry == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(conf->dif_name); if (entry->dif_name == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Failed to strdup."); return -1; } if (ipcp_bootstrap(entry->api->id, conf)) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Could not bootstrap IPCP."); free(entry->dif_name); entry->dif_name = NULL; return -1; } + pthread_mutex_unlock(&instance->r_lock); + LOG_INFO("Bootstrapped IPCP %s-%d in DIF %s.", api->name, api->id, conf->dif_name); @@ -485,14 +513,18 @@ static int enroll_ipcp(instance_name_t * api, ssize_t n_1_difs_size = 0; struct ipcp_entry * entry = NULL; + pthread_mutex_lock(&instance->r_lock); + entry = get_ipcp_entry_by_name(api); if (entry == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No such IPCP."); return -1; } entry->dif_name = strdup(dif_name); if (entry->dif_name == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Failed to strdup."); return -1; } @@ -502,6 +534,7 @@ static int enroll_ipcp(instance_name_t * api, LOG_ERR("Could not find a member of that DIF."); free(entry->dif_name); entry->dif_name = NULL; + pthread_mutex_unlock(&instance->r_lock); return -1; } @@ -510,6 +543,7 @@ static int enroll_ipcp(instance_name_t * api, LOG_ERR("Could not find N-1 DIFs."); free(entry->dif_name); entry->dif_name = NULL; + pthread_mutex_unlock(&instance->r_lock); return -1; } @@ -517,9 +551,12 @@ static int enroll_ipcp(instance_name_t * api, LOG_ERR("Could not enroll IPCP."); free(entry->dif_name); entry->dif_name = NULL; + pthread_mutex_unlock(&instance->r_lock); return -1; } + pthread_mutex_unlock(&instance->r_lock); + LOG_INFO("Enrolled IPCP %s-%d in DIF %s.", api->name, api->id, dif_name); @@ -551,50 +588,6 @@ static int unreg_ipcp(instance_name_t * api, return 0; } -static int ap_unreg_id(pid_t pid, - char ** difs, - size_t len) -{ - int i; - int ret = 0; - struct reg_name_entry * rne = NULL; - struct list_head * pos = NULL; - - rne = get_reg_name_entry_by_id(pid); - if (rne == NULL) - return 0; /* no such id */ - - if (instance->ipcps.next == NULL) { - LOG_ERR("No IPCPs in this system."); - return 0; - } - - if (strcmp(difs[0], ALL_DIFS) == 0) { - list_for_each(pos, &instance->ipcps) { - struct ipcp_entry * e = - list_entry(pos, struct ipcp_entry, next); - - if (ipcp_name_unreg(e->api->id, rne->name)) { - LOG_ERR("Could not unregister %s in DIF %s.", - rne->name, e->dif_name); - --ret; - } - } - } else { - for (i = 0; i < len; ++i) { - if (ipcp_name_unreg(pid, rne->name)) { - LOG_ERR("Could not unregister %s in DIF %s.", - rne->name, difs[i]); - --ret; - } - } - } - - reg_name_entry_del_name(rne->name); - - return ret; -} - static int ap_reg(char * ap_name, pid_t ap_id, char ** difs, @@ -608,24 +601,35 @@ static int ap_reg(char * ap_name, instance_name_t * api = NULL; instance_name_t * ipcpi = NULL; - if (instance->ipcps.next == NULL) - return -1; - /* check if this ap_name is already registered */ - rne = get_reg_name_entry_by_name(ap_name); - if (rne != NULL) - return -1; /* can only register one instance for now */ + pthread_mutex_lock(&instance->r_lock); + + if (instance->ipcps.next == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return -1; + } api = instance_name_create(); if (api == NULL) { + pthread_mutex_unlock(&instance->r_lock); return -1; } if (instance_name_init_from(api, ap_name, ap_id) == NULL) { + pthread_mutex_unlock(&instance->r_lock); instance_name_destroy(api); return -1; } + /* check if this ap_name is already registered */ + + rne = get_reg_name_entry_by_name(ap_name); + if (rne != NULL) { + instance_name_destroy(api); + pthread_mutex_unlock(&instance->r_lock); + return -1; /* can only register one instance for now */ + } + /* * for now, the whatevercast name is the same as the ap_name and * contains a single instance only @@ -662,12 +666,15 @@ static int ap_reg(char * ap_name, if (ret == 0) { instance_name_destroy(api); + pthread_mutex_unlock(&instance->r_lock); return -1; } /* for now, we register single instances */ ret = reg_name_entry_add_name_instance(strdup(ap_name), api); + pthread_mutex_unlock(&instance->r_lock); + return ret; } @@ -676,17 +683,57 @@ static int ap_unreg(char * ap_name, char ** difs, size_t len) { - struct reg_name_entry * tmp = NULL; + int i; + int ret = 0; + struct reg_name_entry * rne = NULL; + struct list_head * pos = NULL; + + pthread_mutex_lock(&instance->r_lock); /* check if ap_name is registered */ - tmp = get_reg_name_entry_by_id(ap_id); - if (tmp == NULL) + rne = get_reg_name_entry_by_id(ap_id); + if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return 0; /* no such id */ + } + + if (strcmp(ap_name, rne->api->name)) { + pthread_mutex_unlock(&instance->r_lock); return 0; + } - if (strcmp(ap_name, tmp->api->name)) + if (instance->ipcps.next == NULL) { + pthread_mutex_unlock(&instance->r_lock); + LOG_ERR("No IPCPs in this system."); return 0; + } + + if (strcmp(difs[0], ALL_DIFS) == 0) { + list_for_each(pos, &instance->ipcps) { + struct ipcp_entry * e = + list_entry(pos, struct ipcp_entry, next); + + if (ipcp_name_unreg(e->api->id, rne->name)) { + LOG_ERR("Could not unregister %s in DIF %s.", + rne->name, e->dif_name); + --ret; + } + } + } else { + for (i = 0; i < len; ++i) { + if (ipcp_name_unreg(ap_id, rne->name)) { + LOG_ERR("Could not unregister %s in DIF %s.", + rne->name, difs[i]); + --ret; + } + } + } - return ap_unreg_id(ap_id, difs, len); + reg_name_entry_del_name(rne->name); + + pthread_mutex_unlock(&instance->r_lock); + + return ret; } static struct port_map_entry * flow_accept(pid_t pid, @@ -694,40 +741,40 @@ static struct port_map_entry * flow_accept(pid_t pid, char ** ae_name) { struct port_map_entry * pme; - struct reg_name_entry * rne = get_reg_name_entry_by_id(pid); + struct reg_name_entry * rne = NULL; + + pthread_mutex_lock(&instance->r_lock); + + rne = get_reg_name_entry_by_id(pid); if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("Unregistered AP calling accept()."); return NULL; } - if (rne->accept) { + pthread_mutex_unlock(&instance->r_lock); LOG_DBGF("This AP still has a pending accept()."); return NULL; } - rne->accept = true; + rne->accept = true; + rne->flow_arrived = false; - pthread_mutex_lock(&rne->fa_lock); - - pthread_cond_wait(&rne->flow_arrived, &rne->fa_lock); - - if (rne->response == -1) { - list_del(&rne->next); - reg_name_entry_destroy(rne); - return NULL; - } + while (!rne->flow_arrived) + pthread_cond_wait(&rne->acc_signal, &instance->r_lock); pme = get_port_map_entry_n(pid); if (pme == NULL) { + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("Port_id was not created yet."); - pthread_mutex_unlock(&rne->fa_lock); return NULL; } *ap_name = rne->req_ap_name; if (ae_name != NULL) *ae_name = rne->req_ae_name; - pthread_mutex_unlock(&rne->fa_lock); + + pthread_mutex_unlock(&instance->r_lock); return pme; } @@ -737,14 +784,19 @@ static int flow_alloc_resp(pid_t n_pid, int response) { struct port_map_entry * pme = NULL; - struct reg_name_entry * rne = get_reg_name_entry_by_id(n_pid); - if (rne == NULL) + struct reg_name_entry * rne = NULL; + + pthread_mutex_lock(&instance->r_lock); + + rne = get_reg_name_entry_by_id(n_pid); + if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); return -1; + } - pthread_mutex_lock(&rne->fa_lock); /* FIXME: check all instances associated with the name */ if (!rne->accept) { - pthread_mutex_unlock(&rne->fa_lock); + pthread_mutex_unlock(&instance->r_lock); LOG_ERR("No process listening for this name."); return -1; } @@ -754,15 +806,15 @@ static int flow_alloc_resp(pid_t n_pid, * once we can handle a list of AP-I's, remove it from the list */ - rne->accept = false; - rne->response = response; + rne->accept = false; + rne->flow_arrived = false; if (!response) { pme = get_port_map_entry(port_id); pme->state = FLOW_ALLOCATED; } - pthread_mutex_unlock(&rne->fa_lock); + pthread_mutex_unlock(&instance->r_lock); return ipcp_flow_alloc_resp(pme->n_1_pid, port_id, @@ -776,53 +828,96 @@ static struct port_map_entry * flow_alloc(pid_t pid, char * src_ae_name, struct qos_spec * qos) { - struct port_map_entry * e = malloc(sizeof(*e)); - if (e == NULL) { + struct port_map_entry * pme; + instance_name_t * ipcp; + + pme = port_map_entry_create(); + if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); return NULL; } - e->port_id = bmp_allocate(instance->port_ids); - e->n_pid = pid; - e->state = FLOW_PENDING; - e->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + pme->port_id = bmp_allocate(instance->port_ids); + pme->n_pid = pid; + pme->state = FLOW_PENDING; + pme->n_1_pid = get_ipcp_by_dst_name(dst_name)->id; + + pthread_mutex_lock(&instance->r_lock); + + list_add(&pme->next, &instance->port_map); + + ipcp = get_ipcp_by_dst_name(dst_name); - list_add(&e->next, &instance->port_map); + pthread_mutex_unlock(&instance->r_lock); - if (ipcp_flow_alloc(get_ipcp_by_dst_name(dst_name)->id, - e->port_id, - e->n_pid, + if (ipcp == NULL) { + LOG_DBG("unknown ipcp"); + return NULL; + } + + if (ipcp_flow_alloc(ipcp->id, + pme->port_id, + pme->n_pid, dst_name, src_ap_name, src_ae_name, qos) < 0) { - list_del(&e->next); - bmp_release(instance->port_ids, e->port_id); - free(e); + pthread_mutex_lock(&instance->r_lock); + + list_del(&pme->next); + + pthread_mutex_unlock(&instance->r_lock); + + bmp_release(instance->port_ids, pme->port_id); + free(pme); + return NULL; } - return e; + return pme; } static int flow_alloc_res(int port_id) { - bool allocated = false; struct port_map_entry * e; - struct timespec ts = {0,100000}; - while (!allocated) { - /* FIXME: this needs locking */ + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return -1; + } + + if (e->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&instance->r_lock); + return 0; + } + + while (true) { + pthread_cond_wait(&e->res_signal, &instance->r_lock); e = get_port_map_entry(port_id); if (e == NULL) { - LOG_DBGF("Could not locate port_id %d", port_id); + pthread_mutex_unlock(&instance->r_lock); return -1; } - if (e->state == FLOW_ALLOCATED) - allocated = true; - nanosleep(&ts, NULL); + if (e->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&instance->r_lock); + LOG_DBGF("Returning 0."); + return 0; + } + if (e->state == FLOW_NULL) { + list_del(&e->next); + pthread_mutex_unlock(&instance->r_lock); + free(e); + return -1; + + } + /* still pending, spurious wake */ } + pthread_mutex_unlock(&instance->r_lock); + return 0; } @@ -830,13 +925,22 @@ static int flow_dealloc(int port_id) { pid_t n_1_pid; - struct port_map_entry * e = get_port_map_entry(port_id); - if (e == NULL) + struct port_map_entry * e = NULL; + + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); return 0; + } n_1_pid = e->n_1_pid; list_del(&e->next); + + pthread_mutex_unlock(&instance->r_lock); + free(e); return ipcp_flow_dealloc(n_1_pid, port_id); @@ -850,12 +954,6 @@ static struct port_map_entry * flow_req_arr(pid_t pid, struct reg_name_entry * rne; struct port_map_entry * pme; - rne = get_reg_name_entry_by_name(dst_name); - if (rne == NULL) { - LOG_DBGF("Destination name %s unknown.", dst_name); - return NULL; - } - pme = malloc(sizeof(*pme)); if (pme == NULL) { LOG_ERR("Failed malloc of port_map_entry."); @@ -863,19 +961,32 @@ static struct port_map_entry * flow_req_arr(pid_t pid, } pme->port_id = bmp_allocate(instance->port_ids); - pme->n_pid = rne->api->id; pme->state = FLOW_PENDING; pme->n_1_pid = pid; - list_add(&pme->next, &instance->port_map); + pthread_mutex_lock(&instance->r_lock); - pthread_mutex_lock(&rne->fa_lock); + rne = get_reg_name_entry_by_name(dst_name); + if (rne == NULL) { + pthread_mutex_unlock(&instance->r_lock); + LOG_DBGF("Destination name %s unknown.", dst_name); + free(pme); + return NULL; + } + + pme->n_pid = rne->api->id; + + list_add(&pme->next, &instance->port_map); rne->req_ap_name = strdup(ap_name); rne->req_ae_name = strdup(ae_name); - pthread_mutex_unlock(&rne->fa_lock); - pthread_cond_signal(&rne->flow_arrived); + rne->flow_arrived = true; + + pthread_mutex_unlock(&instance->r_lock); + + if (pthread_cond_signal(&rne->acc_signal)) + LOG_ERR("Failed to send signal."); return pme; } @@ -885,26 +996,48 @@ static int flow_alloc_reply(int port_id, { struct port_map_entry * e; - /* FIXME: do this under lock */ - if (!response) { - e = get_port_map_entry(port_id); - if (e == NULL) - return -1; - e->state = FLOW_ALLOCATED; + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); + return -1; + } + + if (e->state == FLOW_ALLOCATED) { + pthread_mutex_unlock(&instance->r_lock); + return 0; } - /* FIXME: does this need to be propagated to the IPCP? */ + if (!response) + e->state = FLOW_ALLOCATED; + + else + e->state = FLOW_NULL; + + pthread_mutex_unlock(&instance->r_lock); + if (pthread_cond_signal(&e->res_signal)) + LOG_ERR("Failed to send signal."); return 0; } static int flow_dealloc_ipcp(int port_id) { - struct port_map_entry * e = get_port_map_entry(port_id); - if (e == NULL) + struct port_map_entry * e = NULL; + + pthread_mutex_lock(&instance->r_lock); + + e = get_port_map_entry(port_id); + if (e == NULL) { + pthread_mutex_unlock(&instance->r_lock); return 0; + } list_del(&e->next); + + pthread_mutex_unlock(&instance->r_lock); + free(e); return 0; @@ -933,8 +1066,8 @@ static void irm_destroy(struct irm * irm) struct reg_name_entry * e = list_entry(h, struct reg_name_entry, next); - char * difs [1] = {ALL_DIFS}; - ap_unreg_id(e->api->id, difs, 1); + list_del(&e->next); + free(e); } list_for_each_safe(h, t, &irm->port_map) { @@ -1059,6 +1192,7 @@ void * mainloop() e = flow_accept(msg->pid, &ret_msg.ap_name, &ret_msg.ae_name); + if (e == NULL) break; @@ -1186,7 +1320,15 @@ static struct irm * irm_create() return NULL; } - pthread_mutex_init(&i->lock, NULL); + if (pthread_mutex_init(&i->r_lock, NULL)) { + irm_destroy(i); + return NULL; + } + + if (pthread_mutex_init(&i->r_lock, NULL)) { + irm_destroy(i); + return NULL; + } return i; } diff --git a/src/lib/dev.c b/src/lib/dev.c index d574363b..c1cfe043 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -64,6 +64,7 @@ struct ap_data { int ap_init(char * ap_name) { + int i = 0; _ap_instance = malloc(sizeof(struct ap_data)); if (_ap_instance == NULL) { return -1; @@ -106,6 +107,10 @@ int ap_init(char * ap_name) return -1; } + for (i = 0; i < AP_MAX_FLOWS; ++i) + _ap_instance->flows[i].rb = NULL; + + return 0; } diff --git a/src/lib/ipcp.c b/src/lib/ipcp.c index 843582b9..1f1e5c99 100644 --- a/src/lib/ipcp.c +++ b/src/lib/ipcp.c @@ -148,8 +148,6 @@ pid_t ipcp_create(char * ipcp_name, strcat(full_name, exec_name); full_name[len] = '\0'; - LOG_DBG("Full name is %s", full_name); - char * argv[] = {full_name, irmd_pid, ipcp_name, |