diff options
author | dimitri staessens <[email protected]> | 2017-04-04 02:43:10 +0200 |
---|---|---|
committer | dimitri staessens <[email protected]> | 2017-04-04 09:27:02 +0200 |
commit | f48008cdd28bf31e6f0646b1bb3786f0dc0aede0 (patch) | |
tree | b68ec1c7f6ed81169cfd42f4cac1af6fe0bb65c6 | |
parent | 0f30eaa3d4dd573f9af30a9fd0c5d22bad63c560 (diff) | |
download | ouroboros-f48008cdd28bf31e6f0646b1bb3786f0dc0aede0.tar.gz ouroboros-f48008cdd28bf31e6f0646b1bb3786f0dc0aede0.zip |
lib, irmd, ipcpd: Stabilize flow allocation
-rw-r--r-- | src/ipcpd/local/main.c | 78 | ||||
-rw-r--r-- | src/irmd/api_table.c | 3 | ||||
-rw-r--r-- | src/irmd/main.c | 69 | ||||
-rw-r--r-- | src/irmd/registry.c | 61 | ||||
-rw-r--r-- | src/irmd/registry.h | 1 | ||||
-rw-r--r-- | src/lib/dev.c | 79 |
6 files changed, 186 insertions, 105 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 718c8d7e..bb7f8325 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -85,7 +85,7 @@ static void * ipcp_local_sdu_loop(void * o) (void) o; - while (flow_event_wait(local_data.flows, local_data.fq, &timeout)) { + while (true) { int fd; ssize_t idx; @@ -96,9 +96,14 @@ static void * ipcp_local_sdu_loop(void * o) return (void *) 1; /* -ENOTENROLLED */ } - pthread_rwlock_rdlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + + flow_event_wait(local_data.flows, local_data.fq, &timeout); while ((fd = fqueue_next(local_data.fq)) >= 0) { + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&local_data.lock); + idx = local_flow_read(fd); assert(idx < (SHM_BUFFER_SIZE)); @@ -107,10 +112,11 @@ static void * ipcp_local_sdu_loop(void * o) if (fd != -1) local_flow_write(fd, idx); + + pthread_rwlock_unlock(&ipcpi.state_lock); + pthread_rwlock_unlock(&local_data.lock); } - pthread_rwlock_unlock(&local_data.lock); - pthread_rwlock_unlock(&ipcpi.state_lock); } return (void *) 0; @@ -229,12 +235,6 @@ static int ipcp_local_flow_alloc(int fd, assert(dst_name); - out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube); - if (out_fd < 0) { - log_dbg("Flow allocation failed."); - return -1; - } - pthread_rwlock_rdlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_OPERATIONAL) { @@ -243,16 +243,37 @@ static int ipcp_local_flow_alloc(int fd, return -1; /* -ENOTENROLLED */ } + /* + * This function needs to return completely before + * flow_resp. Taking the wrlock on the data is the simplest + * way to achieve this. + */ + pthread_rwlock_wrlock(&local_data.lock); - local_data.in_out[fd] = out_fd; - local_data.in_out[out_fd] = fd; + out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube); + if (out_fd < 0) { + pthread_rwlock_unlock(&ipcpi.state_lock); + log_dbg("Flow allocation failed: %d", out_fd); + return -1; + } - flow_set_add(local_data.flows, fd); + /* + * The idea of the port_wait_assign in dev.c was to do the + * above synchronisation. But if the lock is not taken, the + * resp() function may be called before a lock would be taken + * here. This shim will be deprecated, but ideally the sync is + * fixed in ipcp.c. + */ + + local_data.in_out[fd] = out_fd; + local_data.in_out[out_fd] = fd; pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); + flow_set_add(local_data.flows, fd); + log_info("Pending local allocation request on fd %d.", fd); return 0; @@ -264,24 +285,30 @@ static int ipcp_local_flow_alloc_resp(int fd, int out_fd = -1; int ret = -1; - if (response) - return 0; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&local_data.lock); + pthread_rwlock_wrlock(&local_data.lock); + + if (response) { + if (local_data.in_out[fd] != -1) + local_data.in_out[local_data.in_out[fd]] = fd; + local_data.in_out[fd] = -1; + pthread_rwlock_unlock(&local_data.lock); + pthread_rwlock_unlock(&ipcpi.state_lock); + return 0; + } out_fd = local_data.in_out[fd]; - if (out_fd < 0) { + if (out_fd == -1) { pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); return -1; } - flow_set_add(local_data.flows, fd); - pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); + flow_set_add(local_data.flows, fd); + if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) return -1; @@ -297,24 +324,17 @@ static int ipcp_local_flow_dealloc(int fd) ipcp_flow_fini(fd); pthread_rwlock_rdlock(&ipcpi.state_lock); - - if (ipcp_get_state() != IPCP_OPERATIONAL) { - pthread_rwlock_unlock(&ipcpi.state_lock); - log_dbg("Won't register with non-enrolled IPCP."); - return -1; /* -ENOTENROLLED */ - } - pthread_rwlock_wrlock(&local_data.lock); flow_set_del(local_data.flows, fd); local_data.in_out[fd] = -1; - flow_dealloc(fd); - pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); + flow_dealloc(fd); + log_info("Flow with fd %d deallocated.", fd); return 0; diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c index 1c655004..5ff0fcf6 100644 --- a/src/irmd/api_table.c +++ b/src/irmd/api_table.c @@ -185,7 +185,8 @@ int api_entry_sleep(struct api_entry * e) &dl); if (e->state == API_DESTROY) { - reg_entry_del_api(e->re, e->api); + if (e->re != NULL) + reg_entry_del_api(e->re, e->api); ret = -1; } diff --git a/src/irmd/main.c b/src/irmd/main.c index 19d27bf9..e6647285 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1063,16 +1063,21 @@ static struct irm_flow * flow_accept(pid_t api) api_n1 = f->n_1_api; port_id = f->port_id; - log_info("Flow on port_id %d allocated.", f->port_id); - pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_rdlock(&irmd->reg_lock); e = api_table_get(&irmd->api_table, api); if (e == NULL) { pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); + list_del(&f->next); + bmp_release(irmd->port_ids, f->port_id); + pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); + clear_irm_flow(f); + irm_flow_set_state(f, FLOW_NULL); + irm_flow_destroy(f); log_dbg("Process gone while accepting flow."); return NULL; } @@ -1085,8 +1090,15 @@ static struct irm_flow * flow_accept(pid_t api) if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) { pthread_rwlock_unlock(&irmd->reg_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); + list_del(&f->next); + bmp_release(irmd->port_ids, f->port_id); + pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1); + clear_irm_flow(f); + irm_flow_set_state(f, FLOW_NULL); + irm_flow_destroy(f); log_err("Entry in wrong state."); return NULL; } @@ -1097,12 +1109,22 @@ static struct irm_flow * flow_accept(pid_t api) pthread_rwlock_unlock(&irmd->state_lock); if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) { - log_dbg("Failed to respond to alloc."); + pthread_rwlock_rdlock(&irmd->state_lock); + pthread_rwlock_wrlock(&irmd->flows_lock); + list_del(&f->next); + pthread_rwlock_unlock(&irmd->flows_lock); + pthread_rwlock_unlock(&irmd->state_lock); + log_dbg("Failed to respond to alloc. Port_id invalidated."); + clear_irm_flow(f); + irm_flow_set_state(f, FLOW_NULL); + irm_flow_destroy(f); return NULL; } irm_flow_set_state(f, FLOW_ALLOCATED); + log_info("Flow on port_id %d allocated.", f->port_id); + return f; } @@ -1157,17 +1179,9 @@ static struct irm_flow * flow_alloc(pid_t api, assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING); - if (ipcp_flow_alloc(ipcp, port_id, api, - dst_name, cube) < 0) { - pthread_rwlock_rdlock(&irmd->state_lock); - pthread_rwlock_wrlock(&irmd->flows_lock); - list_del(&f->next); - clear_irm_flow(f); - bmp_release(irmd->port_ids, f->port_id); - pthread_rwlock_unlock(&irmd->flows_lock); - pthread_rwlock_unlock(&irmd->state_lock); - irm_flow_set_state(f, FLOW_NULL); - irm_flow_destroy(f); + if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) { + /* sanitizer cleans this */ + log_info("Failed to respond to alloc."); return NULL; } @@ -1351,7 +1365,6 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_rdlock(&irmd->state_lock); pthread_rwlock_wrlock(&irmd->reg_lock); - case REG_NAME_FLOW_ACCEPT: h_api = reg_entry_get_api(re); if (h_api == -1) { @@ -1691,19 +1704,17 @@ void * irm_sanitize(void * o) if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { - list_del(&f->next); log_dbg("Pending port_id %d timed out.", f->port_id); - clear_irm_flow(f); + f->n_1_api = -1; + irm_flow_set_state(f, FLOW_DEALLOC_PENDING); ipcp_flow_dealloc(f->n_1_api, f->port_id); - bmp_release(irmd->port_ids, f->port_id); - irm_flow_destroy(f); continue; } if (kill(f->n_api, 0) < 0) { struct shm_flow_set * set; - log_dbg("AP-I %d gone, flow %d deallocated.", + log_dbg("AP-I %d gone, deallocating flow %d.", f->n_api, f->port_id); set = shm_flow_set_open(f->n_api); if (set != NULL) @@ -1711,22 +1722,18 @@ void * irm_sanitize(void * o) f->n_api = -1; irm_flow_set_state(f, FLOW_DEALLOC_PENDING); ipcp_flow_dealloc(f->n_1_api, f->port_id); - clear_irm_flow(f); continue; } if (kill(f->n_1_api, 0) < 0) { struct shm_flow_set * set; - list_del(&f->next); log_err("IPCP %d gone, flow %d removed.", f->n_1_api, f->port_id); set = shm_flow_set_open(f->n_api); if (set != NULL) shm_flow_set_destroy(set); - - clear_irm_flow(f); - bmp_release(irmd->port_ids, f->port_id); - irm_flow_destroy(f); + f->n_1_api = -1; + irm_flow_set_state(f, FLOW_DEALLOC_PENDING); } } @@ -2087,6 +2094,8 @@ static int irm_create(void) if (irmd == NULL) return -ENOMEM; + memset(irmd, 0, sizeof(*irmd)); + memset(&st, 0, sizeof(st)); irmd->state = IRMD_NULL; @@ -2158,7 +2167,10 @@ static int irm_create(void) if ((irmd->lf = lockfile_create()) == NULL) { if ((irmd->lf = lockfile_open()) == NULL) { log_err("Lockfile error."); - irm_destroy(); + free(irmd->threadpool); + bmp_destroy(irmd->thread_ids); + bmp_destroy(irmd->port_ids); + free(irmd); return -1; } @@ -2172,6 +2184,9 @@ static int irm_create(void) log_info("IRMd already running (%d), exiting.", lockfile_owner(irmd->lf)); lockfile_close(irmd->lf); + free(irmd->threadpool); + bmp_destroy(irmd->thread_ids); + bmp_destroy(irmd->port_ids); free(irmd); return -1; } diff --git a/src/irmd/registry.c b/src/irmd/registry.c index 2043ca46..53be77cd 100644 --- a/src/irmd/registry.c +++ b/src/irmd/registry.c @@ -37,6 +37,7 @@ #include <signal.h> #include <unistd.h> #include <limits.h> +#include <assert.h> struct reg_dif { struct list_head next; @@ -121,6 +122,12 @@ static void reg_entry_destroy(struct reg_entry * e) if (e->name != NULL) free(e->name); + list_for_each_safe(p, h, &e->reg_apis) { + struct pid_el * pe = list_entry(p, struct pid_el, next); + list_del(&pe->next); + free(pe); + } + list_for_each_safe(p, h, &e->reg_apns) { struct str_el * a = list_entry(p, struct str_el, next); list_del(&a->next); @@ -224,9 +231,13 @@ int reg_entry_add_apn(struct reg_entry * e, list_add(&n->next, &e->reg_apns); + pthread_mutex_lock(&e->state_lock); + if (e->state == REG_NAME_IDLE) e->state = REG_NAME_AUTO_ACCEPT; + pthread_mutex_unlock(&e->state_lock); + return 0; } @@ -245,11 +256,14 @@ void reg_entry_del_apn(struct reg_entry * e, } } + pthread_mutex_lock(&e->state_lock); + if (e->state == REG_NAME_AUTO_ACCEPT && list_is_empty(&e->reg_apns)) { e->state = REG_NAME_IDLE; pthread_cond_broadcast(&e->state_cond); } + pthread_mutex_unlock(&e->state_lock); } char * reg_entry_get_apn(struct reg_entry * e) @@ -279,27 +293,29 @@ int reg_entry_add_api(struct reg_entry * e, { struct pid_el * i; - if (e == NULL) - return -EINVAL; + assert(e); if (reg_entry_has_api(e, api)) { log_dbg("Instance already registered with this name."); return -EPERM; } + pthread_mutex_lock(&e->state_lock); + if (e->state == REG_NAME_NULL) { + pthread_mutex_unlock(&e->state_lock); log_dbg("Tried to add instance in NULL state."); return -EPERM; } i = malloc(sizeof(*i)); - if (i == NULL) + if (i == NULL) { + pthread_mutex_unlock(&e->state_lock); return -ENOMEM; + } i->pid = api; - pthread_mutex_lock(&e->state_lock); - list_add(&i->next, &e->reg_apis); if (e->state == REG_NAME_IDLE || @@ -316,6 +332,8 @@ int reg_entry_add_api(struct reg_entry * e, static void reg_entry_check_state(struct reg_entry * e) { + assert(e); + if (e->state == REG_NAME_DESTROY) { e->state = REG_NAME_NULL; pthread_cond_broadcast(&e->state_cond); @@ -337,6 +355,9 @@ static void reg_entry_check_state(struct reg_entry * e) void reg_entry_del_pid_el(struct reg_entry * e, struct pid_el * p) { + assert(e); + assert(p); + list_del(&p->next); free(p); @@ -349,6 +370,8 @@ void reg_entry_del_api(struct reg_entry * e, struct list_head * p; struct list_head * h; + assert(e); + if (e == NULL) return; @@ -378,8 +401,7 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e) { enum reg_name_state state; - if (e == NULL) - return REG_NAME_NULL; + assert(e); pthread_mutex_lock(&e->state_lock); @@ -393,8 +415,7 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e) int reg_entry_set_state(struct reg_entry * e, enum reg_name_state state) { - if (state == REG_NAME_DESTROY) - return -EPERM; + assert(state != REG_NAME_DESTROY); pthread_mutex_lock(&e->state_lock); @@ -413,8 +434,8 @@ int reg_entry_leave_state(struct reg_entry * e, struct timespec abstime; int ret = 0; - if (e == NULL || state == REG_NAME_DESTROY) - return -EINVAL; + assert(e); + assert(state != REG_NAME_DESTROY); if (timeout != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -450,8 +471,8 @@ int reg_entry_wait_state(struct reg_entry * e, struct timespec abstime; int ret = 0; - if (e == NULL || state == REG_NAME_DESTROY) - return -EINVAL; + assert(e); + assert(state != REG_NAME_DESTROY); if (timeout != NULL) { clock_gettime(PTHREAD_COND_CLOCK, &abstime); @@ -487,6 +508,8 @@ struct reg_entry * registry_get_entry(struct list_head * registry, { struct list_head * p = NULL; + assert(registry); + list_for_each(p, registry) { struct reg_entry * e = list_entry(p, struct reg_entry, next); if (!wildcard_match(name, e->name)) @@ -501,8 +524,8 @@ struct reg_entry * registry_add_name(struct list_head * registry, { struct reg_entry * e = NULL; - if (name == NULL) - return NULL; + assert(registry); + assert(name); if (registry_has_name(registry, name)) { log_dbg("Name %s already registered.", name); @@ -545,12 +568,13 @@ void registry_del_api(struct list_head * registry, { struct list_head * p; - if ( api == -1) - return; + assert(registry); + assert(api > 0); list_for_each(p, registry) { struct reg_entry * e = list_entry(p, struct reg_entry, next); pthread_mutex_lock(&e->state_lock); + assert(e); reg_entry_del_api(e, api); pthread_mutex_unlock(&e->state_lock); } @@ -586,8 +610,7 @@ void registry_destroy(struct list_head * registry) struct list_head * p = NULL; struct list_head * h = NULL; - if (registry == NULL) - return; + assert(registry); list_for_each_safe(p, h, registry) { struct reg_entry * e = list_entry(p, struct reg_entry, next); diff --git a/src/irmd/registry.h b/src/irmd/registry.h index 2c766732..08e78019 100644 --- a/src/irmd/registry.h +++ b/src/irmd/registry.h @@ -63,7 +63,6 @@ struct reg_entry { struct list_head reg_apis; enum reg_name_state state; - qoscube_t qos; pthread_cond_t state_cond; pthread_mutex_t state_lock; }; diff --git a/src/lib/dev.c b/src/lib/dev.c index 5acbada2..c063fd47 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -49,6 +49,7 @@ struct fqueue { enum port_state { PORT_NULL = 0, + PORT_INIT, PORT_ID_PENDING, PORT_ID_ASSIGNED, PORT_DESTROY @@ -82,7 +83,7 @@ static void port_destroy(struct port * p) pthread_cond_wait(&p->state_cond, &p->state_lock); p->fd = -1; - p->state = PORT_ID_PENDING; + p->state = PORT_INIT; pthread_mutex_unlock(&p->state_lock); } @@ -109,11 +110,14 @@ static enum port_state port_wait_assign(struct port * p) pthread_mutex_lock(&p->state_lock); - if (p->state != PORT_ID_PENDING) { + if (p->state == PORT_ID_ASSIGNED) { pthread_mutex_unlock(&p->state_lock); - return -1; + return PORT_ID_ASSIGNED; } + if(p->state == PORT_INIT) + p->state = PORT_ID_PENDING; + while (p->state == PORT_ID_PENDING) pthread_cond_wait(&p->state_cond, &p->state_lock); @@ -124,6 +128,8 @@ static enum port_state port_wait_assign(struct port * p) state = p->state; + assert(state != PORT_INIT); + pthread_mutex_unlock(&p->state_lock); return state; @@ -237,7 +243,6 @@ static void reset_flow(int fd) shm_flow_set_close(ai.flows[fd].set); init_flow(fd); - } int ap_init(const char * ap_name) @@ -319,7 +324,7 @@ int ap_init(const char * ap_name) } for (i = 0; i < IRMD_MAX_FLOWS; ++i) { - ai.ports[i].state = PORT_ID_PENDING; + ai.ports[i].state = PORT_INIT; pthread_mutex_init(&ai.ports[i].state_lock, NULL); pthread_cond_init(&ai.ports[i].state_cond, NULL); } @@ -354,8 +359,7 @@ void ap_fini() ssize_t idx; while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) shm_rdrbuff_remove(ai.rdrb, idx); - port_set_state(&ai.ports[ai.flows[i].port_id], - PORT_NULL); + port_destroy(&ai.ports[ai.flows[i].port_id]); reset_flow(i); } } @@ -459,6 +463,8 @@ int flow_accept(qosspec_t * qs, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; + assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT); + if (qs != NULL) fill_qosspec(qs, ai.flows[fd].cube); @@ -555,6 +561,8 @@ int flow_alloc(const char * dst_name, ai.flows[fd].api = recv_msg->api; ai.flows[fd].cube = recv_msg->qoscube; + assert(ai.ports[recv_msg->port_id].state == PORT_INIT); + ai.ports[recv_msg->port_id].fd = fd; ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED; @@ -600,6 +608,7 @@ int flow_dealloc(int fd) if (!recv_msg->has_result) { irm_msg__free_unpacked(recv_msg, NULL); + assert(false); return -1; } @@ -1113,8 +1122,8 @@ int np1_flow_alloc(pid_t n_api, ai.flows[fd].oflags = FLOW_O_DEFAULT; ai.flows[fd].api = n_api; - ai.ports[port_id].fd = fd; - port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + ai.ports[port_id].fd = fd; + ai.ports[port_id].state = PORT_ID_ASSIGNED; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1127,7 +1136,7 @@ int np1_flow_dealloc(int port_id) int fd; pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); fd = ai.ports[port_id].fd; @@ -1141,10 +1150,11 @@ int np1_flow_resp(int port_id) { int fd; - port_wait_assign(&ai.ports[port_id]); + if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED) + return -1; pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_wrlock(&ai.flows_lock); + pthread_rwlock_rdlock(&ai.flows_lock); fd = ai.ports[port_id].fd; @@ -1211,66 +1221,78 @@ int ipcp_flow_req_arr(pid_t api, return -1; /* -ENOMOREFDS */ } - ai.flows[fd].tx_rb = NULL; - pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); recv_msg = send_recv_irm_msg(&msg); - if (recv_msg == NULL) + + pthread_rwlock_rdlock(&ai.data_lock); + pthread_rwlock_wrlock(&ai.flows_lock); + + if (recv_msg == NULL) { + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); return -EIRMD; + } if (!recv_msg->has_port_id || !recv_msg->has_api) { + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } if (recv_msg->has_result && recv_msg->result) { - irm_msg__free_unpacked(recv_msg, NULL); - return -1; + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); + return -1; } port_id = recv_msg->port_id; if (port_id < 0) { + ai.ports[fd].state = PORT_INIT; + pthread_rwlock_unlock(&ai.flows_lock); + pthread_rwlock_unlock(&ai.data_lock); irm_msg__free_unpacked(recv_msg, NULL); return -1; } - pthread_rwlock_rdlock(&ai.data_lock); - pthread_rwlock_wrlock(&ai.flows_lock); - ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id); if (ai.flows[fd].rx_rb == NULL) { - irm_msg__free_unpacked(recv_msg, NULL); reset_flow(fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id); if (ai.flows[fd].tx_rb == NULL) { - irm_msg__free_unpacked(recv_msg, NULL); reset_flow(fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].set = shm_flow_set_open(recv_msg->api); if (ai.flows[fd].set == NULL) { - irm_msg__free_unpacked(recv_msg, NULL); reset_flow(fd); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); + irm_msg__free_unpacked(recv_msg, NULL); return -1; } ai.flows[fd].port_id = port_id; - ai.flows[fd].oflags = FLOW_O_DEFAULT; + ai.flows[fd].oflags = FLOW_O_DEFAULT; ai.ports[port_id].fd = fd; - port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED); + port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); @@ -1390,19 +1412,20 @@ int ipcp_flow_write(int fd, int ipcp_flow_fini(int fd) { - struct shm_rbuff * rb; + struct shm_rbuff * rx_rb; flow_set_flags(fd, FLOW_O_WRONLY); pthread_rwlock_rdlock(&ai.data_lock); pthread_rwlock_rdlock(&ai.flows_lock); - rb = ai.flows[fd].rx_rb; + rx_rb = ai.flows[fd].rx_rb; pthread_rwlock_unlock(&ai.flows_lock); pthread_rwlock_unlock(&ai.data_lock); - shm_rbuff_fini(rb); + shm_rbuff_fini(rx_rb); + return 0; } |