summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-04-04 02:43:10 +0200
committerdimitri staessens <[email protected]>2017-04-04 09:27:02 +0200
commitf48008cdd28bf31e6f0646b1bb3786f0dc0aede0 (patch)
treeb68ec1c7f6ed81169cfd42f4cac1af6fe0bb65c6
parent0f30eaa3d4dd573f9af30a9fd0c5d22bad63c560 (diff)
downloadouroboros-f48008cdd28bf31e6f0646b1bb3786f0dc0aede0.tar.gz
ouroboros-f48008cdd28bf31e6f0646b1bb3786f0dc0aede0.zip
lib, irmd, ipcpd: Stabilize flow allocation
-rw-r--r--src/ipcpd/local/main.c78
-rw-r--r--src/irmd/api_table.c3
-rw-r--r--src/irmd/main.c69
-rw-r--r--src/irmd/registry.c61
-rw-r--r--src/irmd/registry.h1
-rw-r--r--src/lib/dev.c79
6 files changed, 186 insertions, 105 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c
index 718c8d7e..bb7f8325 100644
--- a/src/ipcpd/local/main.c
+++ b/src/ipcpd/local/main.c
@@ -85,7 +85,7 @@ static void * ipcp_local_sdu_loop(void * o)
(void) o;
- while (flow_event_wait(local_data.flows, local_data.fq, &timeout)) {
+ while (true) {
int fd;
ssize_t idx;
@@ -96,9 +96,14 @@ static void * ipcp_local_sdu_loop(void * o)
return (void *) 1; /* -ENOTENROLLED */
}
- pthread_rwlock_rdlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ flow_event_wait(local_data.flows, local_data.fq, &timeout);
while ((fd = fqueue_next(local_data.fq)) >= 0) {
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_rdlock(&local_data.lock);
+
idx = local_flow_read(fd);
assert(idx < (SHM_BUFFER_SIZE));
@@ -107,10 +112,11 @@ static void * ipcp_local_sdu_loop(void * o)
if (fd != -1)
local_flow_write(fd, idx);
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ pthread_rwlock_unlock(&local_data.lock);
}
- pthread_rwlock_unlock(&local_data.lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
}
return (void *) 0;
@@ -229,12 +235,6 @@ static int ipcp_local_flow_alloc(int fd,
assert(dst_name);
- out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube);
- if (out_fd < 0) {
- log_dbg("Flow allocation failed.");
- return -1;
- }
-
pthread_rwlock_rdlock(&ipcpi.state_lock);
if (ipcp_get_state() != IPCP_OPERATIONAL) {
@@ -243,16 +243,37 @@ static int ipcp_local_flow_alloc(int fd,
return -1; /* -ENOTENROLLED */
}
+ /*
+ * This function needs to return completely before
+ * flow_resp. Taking the wrlock on the data is the simplest
+ * way to achieve this.
+ */
+
pthread_rwlock_wrlock(&local_data.lock);
- local_data.in_out[fd] = out_fd;
- local_data.in_out[out_fd] = fd;
+ out_fd = ipcp_flow_req_arr(getpid(), dst_name, cube);
+ if (out_fd < 0) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ log_dbg("Flow allocation failed: %d", out_fd);
+ return -1;
+ }
- flow_set_add(local_data.flows, fd);
+ /*
+ * The idea of the port_wait_assign in dev.c was to do the
+ * above synchronisation. But if the lock is not taken, the
+ * resp() function may be called before a lock would be taken
+ * here. This shim will be deprecated, but ideally the sync is
+ * fixed in ipcp.c.
+ */
+
+ local_data.in_out[fd] = out_fd;
+ local_data.in_out[out_fd] = fd;
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ flow_set_add(local_data.flows, fd);
+
log_info("Pending local allocation request on fd %d.", fd);
return 0;
@@ -264,24 +285,30 @@ static int ipcp_local_flow_alloc_resp(int fd,
int out_fd = -1;
int ret = -1;
- if (response)
- return 0;
-
pthread_rwlock_rdlock(&ipcpi.state_lock);
- pthread_rwlock_rdlock(&local_data.lock);
+ pthread_rwlock_wrlock(&local_data.lock);
+
+ if (response) {
+ if (local_data.in_out[fd] != -1)
+ local_data.in_out[local_data.in_out[fd]] = fd;
+ local_data.in_out[fd] = -1;
+ pthread_rwlock_unlock(&local_data.lock);
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ return 0;
+ }
out_fd = local_data.in_out[fd];
- if (out_fd < 0) {
+ if (out_fd == -1) {
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- flow_set_add(local_data.flows, fd);
-
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ flow_set_add(local_data.flows, fd);
+
if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0)
return -1;
@@ -297,24 +324,17 @@ static int ipcp_local_flow_dealloc(int fd)
ipcp_flow_fini(fd);
pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- log_dbg("Won't register with non-enrolled IPCP.");
- return -1; /* -ENOTENROLLED */
- }
-
pthread_rwlock_wrlock(&local_data.lock);
flow_set_del(local_data.flows, fd);
local_data.in_out[fd] = -1;
- flow_dealloc(fd);
-
pthread_rwlock_unlock(&local_data.lock);
pthread_rwlock_unlock(&ipcpi.state_lock);
+ flow_dealloc(fd);
+
log_info("Flow with fd %d deallocated.", fd);
return 0;
diff --git a/src/irmd/api_table.c b/src/irmd/api_table.c
index 1c655004..5ff0fcf6 100644
--- a/src/irmd/api_table.c
+++ b/src/irmd/api_table.c
@@ -185,7 +185,8 @@ int api_entry_sleep(struct api_entry * e)
&dl);
if (e->state == API_DESTROY) {
- reg_entry_del_api(e->re, e->api);
+ if (e->re != NULL)
+ reg_entry_del_api(e->re, e->api);
ret = -1;
}
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 19d27bf9..e6647285 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1063,16 +1063,21 @@ static struct irm_flow * flow_accept(pid_t api)
api_n1 = f->n_1_api;
port_id = f->port_id;
- log_info("Flow on port_id %d allocated.", f->port_id);
-
pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_rdlock(&irmd->reg_lock);
e = api_table_get(&irmd->api_table, api);
if (e == NULL) {
pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+ list_del(&f->next);
+ bmp_release(irmd->port_ids, f->port_id);
+ pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1);
+ clear_irm_flow(f);
+ irm_flow_set_state(f, FLOW_NULL);
+ irm_flow_destroy(f);
log_dbg("Process gone while accepting flow.");
return NULL;
}
@@ -1085,8 +1090,15 @@ static struct irm_flow * flow_accept(pid_t api)
if (reg_entry_get_state(re) != REG_NAME_FLOW_ARRIVED) {
pthread_rwlock_unlock(&irmd->reg_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+ list_del(&f->next);
+ bmp_release(irmd->port_ids, f->port_id);
+ pthread_rwlock_unlock(&irmd->flows_lock);
pthread_rwlock_unlock(&irmd->state_lock);
ipcp_flow_alloc_resp(api_n1, port_id, api_n, -1);
+ clear_irm_flow(f);
+ irm_flow_set_state(f, FLOW_NULL);
+ irm_flow_destroy(f);
log_err("Entry in wrong state.");
return NULL;
}
@@ -1097,12 +1109,22 @@ static struct irm_flow * flow_accept(pid_t api)
pthread_rwlock_unlock(&irmd->state_lock);
if (ipcp_flow_alloc_resp(api_n1, port_id, api_n, 0)) {
- log_dbg("Failed to respond to alloc.");
+ pthread_rwlock_rdlock(&irmd->state_lock);
+ pthread_rwlock_wrlock(&irmd->flows_lock);
+ list_del(&f->next);
+ pthread_rwlock_unlock(&irmd->flows_lock);
+ pthread_rwlock_unlock(&irmd->state_lock);
+ log_dbg("Failed to respond to alloc. Port_id invalidated.");
+ clear_irm_flow(f);
+ irm_flow_set_state(f, FLOW_NULL);
+ irm_flow_destroy(f);
return NULL;
}
irm_flow_set_state(f, FLOW_ALLOCATED);
+ log_info("Flow on port_id %d allocated.", f->port_id);
+
return f;
}
@@ -1157,17 +1179,9 @@ static struct irm_flow * flow_alloc(pid_t api,
assert(irm_flow_get_state(f) == FLOW_ALLOC_PENDING);
- if (ipcp_flow_alloc(ipcp, port_id, api,
- dst_name, cube) < 0) {
- pthread_rwlock_rdlock(&irmd->state_lock);
- pthread_rwlock_wrlock(&irmd->flows_lock);
- list_del(&f->next);
- clear_irm_flow(f);
- bmp_release(irmd->port_ids, f->port_id);
- pthread_rwlock_unlock(&irmd->flows_lock);
- pthread_rwlock_unlock(&irmd->state_lock);
- irm_flow_set_state(f, FLOW_NULL);
- irm_flow_destroy(f);
+ if (ipcp_flow_alloc(ipcp, port_id, api, dst_name, cube) < 0) {
+ /* sanitizer cleans this */
+ log_info("Failed to respond to alloc.");
return NULL;
}
@@ -1351,7 +1365,6 @@ static struct irm_flow * flow_req_arr(pid_t api,
pthread_rwlock_rdlock(&irmd->state_lock);
pthread_rwlock_wrlock(&irmd->reg_lock);
-
case REG_NAME_FLOW_ACCEPT:
h_api = reg_entry_get_api(re);
if (h_api == -1) {
@@ -1691,19 +1704,17 @@ void * irm_sanitize(void * o)
if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING
&& ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) {
- list_del(&f->next);
log_dbg("Pending port_id %d timed out.",
f->port_id);
- clear_irm_flow(f);
+ f->n_1_api = -1;
+ irm_flow_set_state(f, FLOW_DEALLOC_PENDING);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
- bmp_release(irmd->port_ids, f->port_id);
- irm_flow_destroy(f);
continue;
}
if (kill(f->n_api, 0) < 0) {
struct shm_flow_set * set;
- log_dbg("AP-I %d gone, flow %d deallocated.",
+ log_dbg("AP-I %d gone, deallocating flow %d.",
f->n_api, f->port_id);
set = shm_flow_set_open(f->n_api);
if (set != NULL)
@@ -1711,22 +1722,18 @@ void * irm_sanitize(void * o)
f->n_api = -1;
irm_flow_set_state(f, FLOW_DEALLOC_PENDING);
ipcp_flow_dealloc(f->n_1_api, f->port_id);
- clear_irm_flow(f);
continue;
}
if (kill(f->n_1_api, 0) < 0) {
struct shm_flow_set * set;
- list_del(&f->next);
log_err("IPCP %d gone, flow %d removed.",
f->n_1_api, f->port_id);
set = shm_flow_set_open(f->n_api);
if (set != NULL)
shm_flow_set_destroy(set);
-
- clear_irm_flow(f);
- bmp_release(irmd->port_ids, f->port_id);
- irm_flow_destroy(f);
+ f->n_1_api = -1;
+ irm_flow_set_state(f, FLOW_DEALLOC_PENDING);
}
}
@@ -2087,6 +2094,8 @@ static int irm_create(void)
if (irmd == NULL)
return -ENOMEM;
+ memset(irmd, 0, sizeof(*irmd));
+
memset(&st, 0, sizeof(st));
irmd->state = IRMD_NULL;
@@ -2158,7 +2167,10 @@ static int irm_create(void)
if ((irmd->lf = lockfile_create()) == NULL) {
if ((irmd->lf = lockfile_open()) == NULL) {
log_err("Lockfile error.");
- irm_destroy();
+ free(irmd->threadpool);
+ bmp_destroy(irmd->thread_ids);
+ bmp_destroy(irmd->port_ids);
+ free(irmd);
return -1;
}
@@ -2172,6 +2184,9 @@ static int irm_create(void)
log_info("IRMd already running (%d), exiting.",
lockfile_owner(irmd->lf));
lockfile_close(irmd->lf);
+ free(irmd->threadpool);
+ bmp_destroy(irmd->thread_ids);
+ bmp_destroy(irmd->port_ids);
free(irmd);
return -1;
}
diff --git a/src/irmd/registry.c b/src/irmd/registry.c
index 2043ca46..53be77cd 100644
--- a/src/irmd/registry.c
+++ b/src/irmd/registry.c
@@ -37,6 +37,7 @@
#include <signal.h>
#include <unistd.h>
#include <limits.h>
+#include <assert.h>
struct reg_dif {
struct list_head next;
@@ -121,6 +122,12 @@ static void reg_entry_destroy(struct reg_entry * e)
if (e->name != NULL)
free(e->name);
+ list_for_each_safe(p, h, &e->reg_apis) {
+ struct pid_el * pe = list_entry(p, struct pid_el, next);
+ list_del(&pe->next);
+ free(pe);
+ }
+
list_for_each_safe(p, h, &e->reg_apns) {
struct str_el * a = list_entry(p, struct str_el, next);
list_del(&a->next);
@@ -224,9 +231,13 @@ int reg_entry_add_apn(struct reg_entry * e,
list_add(&n->next, &e->reg_apns);
+ pthread_mutex_lock(&e->state_lock);
+
if (e->state == REG_NAME_IDLE)
e->state = REG_NAME_AUTO_ACCEPT;
+ pthread_mutex_unlock(&e->state_lock);
+
return 0;
}
@@ -245,11 +256,14 @@ void reg_entry_del_apn(struct reg_entry * e,
}
}
+ pthread_mutex_lock(&e->state_lock);
+
if (e->state == REG_NAME_AUTO_ACCEPT && list_is_empty(&e->reg_apns)) {
e->state = REG_NAME_IDLE;
pthread_cond_broadcast(&e->state_cond);
}
+ pthread_mutex_unlock(&e->state_lock);
}
char * reg_entry_get_apn(struct reg_entry * e)
@@ -279,27 +293,29 @@ int reg_entry_add_api(struct reg_entry * e,
{
struct pid_el * i;
- if (e == NULL)
- return -EINVAL;
+ assert(e);
if (reg_entry_has_api(e, api)) {
log_dbg("Instance already registered with this name.");
return -EPERM;
}
+ pthread_mutex_lock(&e->state_lock);
+
if (e->state == REG_NAME_NULL) {
+ pthread_mutex_unlock(&e->state_lock);
log_dbg("Tried to add instance in NULL state.");
return -EPERM;
}
i = malloc(sizeof(*i));
- if (i == NULL)
+ if (i == NULL) {
+ pthread_mutex_unlock(&e->state_lock);
return -ENOMEM;
+ }
i->pid = api;
- pthread_mutex_lock(&e->state_lock);
-
list_add(&i->next, &e->reg_apis);
if (e->state == REG_NAME_IDLE ||
@@ -316,6 +332,8 @@ int reg_entry_add_api(struct reg_entry * e,
static void reg_entry_check_state(struct reg_entry * e)
{
+ assert(e);
+
if (e->state == REG_NAME_DESTROY) {
e->state = REG_NAME_NULL;
pthread_cond_broadcast(&e->state_cond);
@@ -337,6 +355,9 @@ static void reg_entry_check_state(struct reg_entry * e)
void reg_entry_del_pid_el(struct reg_entry * e,
struct pid_el * p)
{
+ assert(e);
+ assert(p);
+
list_del(&p->next);
free(p);
@@ -349,6 +370,8 @@ void reg_entry_del_api(struct reg_entry * e,
struct list_head * p;
struct list_head * h;
+ assert(e);
+
if (e == NULL)
return;
@@ -378,8 +401,7 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e)
{
enum reg_name_state state;
- if (e == NULL)
- return REG_NAME_NULL;
+ assert(e);
pthread_mutex_lock(&e->state_lock);
@@ -393,8 +415,7 @@ enum reg_name_state reg_entry_get_state(struct reg_entry * e)
int reg_entry_set_state(struct reg_entry * e,
enum reg_name_state state)
{
- if (state == REG_NAME_DESTROY)
- return -EPERM;
+ assert(state != REG_NAME_DESTROY);
pthread_mutex_lock(&e->state_lock);
@@ -413,8 +434,8 @@ int reg_entry_leave_state(struct reg_entry * e,
struct timespec abstime;
int ret = 0;
- if (e == NULL || state == REG_NAME_DESTROY)
- return -EINVAL;
+ assert(e);
+ assert(state != REG_NAME_DESTROY);
if (timeout != NULL) {
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
@@ -450,8 +471,8 @@ int reg_entry_wait_state(struct reg_entry * e,
struct timespec abstime;
int ret = 0;
- if (e == NULL || state == REG_NAME_DESTROY)
- return -EINVAL;
+ assert(e);
+ assert(state != REG_NAME_DESTROY);
if (timeout != NULL) {
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
@@ -487,6 +508,8 @@ struct reg_entry * registry_get_entry(struct list_head * registry,
{
struct list_head * p = NULL;
+ assert(registry);
+
list_for_each(p, registry) {
struct reg_entry * e = list_entry(p, struct reg_entry, next);
if (!wildcard_match(name, e->name))
@@ -501,8 +524,8 @@ struct reg_entry * registry_add_name(struct list_head * registry,
{
struct reg_entry * e = NULL;
- if (name == NULL)
- return NULL;
+ assert(registry);
+ assert(name);
if (registry_has_name(registry, name)) {
log_dbg("Name %s already registered.", name);
@@ -545,12 +568,13 @@ void registry_del_api(struct list_head * registry,
{
struct list_head * p;
- if ( api == -1)
- return;
+ assert(registry);
+ assert(api > 0);
list_for_each(p, registry) {
struct reg_entry * e = list_entry(p, struct reg_entry, next);
pthread_mutex_lock(&e->state_lock);
+ assert(e);
reg_entry_del_api(e, api);
pthread_mutex_unlock(&e->state_lock);
}
@@ -586,8 +610,7 @@ void registry_destroy(struct list_head * registry)
struct list_head * p = NULL;
struct list_head * h = NULL;
- if (registry == NULL)
- return;
+ assert(registry);
list_for_each_safe(p, h, registry) {
struct reg_entry * e = list_entry(p, struct reg_entry, next);
diff --git a/src/irmd/registry.h b/src/irmd/registry.h
index 2c766732..08e78019 100644
--- a/src/irmd/registry.h
+++ b/src/irmd/registry.h
@@ -63,7 +63,6 @@ struct reg_entry {
struct list_head reg_apis;
enum reg_name_state state;
- qoscube_t qos;
pthread_cond_t state_cond;
pthread_mutex_t state_lock;
};
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 5acbada2..c063fd47 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -49,6 +49,7 @@ struct fqueue {
enum port_state {
PORT_NULL = 0,
+ PORT_INIT,
PORT_ID_PENDING,
PORT_ID_ASSIGNED,
PORT_DESTROY
@@ -82,7 +83,7 @@ static void port_destroy(struct port * p)
pthread_cond_wait(&p->state_cond, &p->state_lock);
p->fd = -1;
- p->state = PORT_ID_PENDING;
+ p->state = PORT_INIT;
pthread_mutex_unlock(&p->state_lock);
}
@@ -109,11 +110,14 @@ static enum port_state port_wait_assign(struct port * p)
pthread_mutex_lock(&p->state_lock);
- if (p->state != PORT_ID_PENDING) {
+ if (p->state == PORT_ID_ASSIGNED) {
pthread_mutex_unlock(&p->state_lock);
- return -1;
+ return PORT_ID_ASSIGNED;
}
+ if(p->state == PORT_INIT)
+ p->state = PORT_ID_PENDING;
+
while (p->state == PORT_ID_PENDING)
pthread_cond_wait(&p->state_cond, &p->state_lock);
@@ -124,6 +128,8 @@ static enum port_state port_wait_assign(struct port * p)
state = p->state;
+ assert(state != PORT_INIT);
+
pthread_mutex_unlock(&p->state_lock);
return state;
@@ -237,7 +243,6 @@ static void reset_flow(int fd)
shm_flow_set_close(ai.flows[fd].set);
init_flow(fd);
-
}
int ap_init(const char * ap_name)
@@ -319,7 +324,7 @@ int ap_init(const char * ap_name)
}
for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
- ai.ports[i].state = PORT_ID_PENDING;
+ ai.ports[i].state = PORT_INIT;
pthread_mutex_init(&ai.ports[i].state_lock, NULL);
pthread_cond_init(&ai.ports[i].state_cond, NULL);
}
@@ -354,8 +359,7 @@ void ap_fini()
ssize_t idx;
while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0)
shm_rdrbuff_remove(ai.rdrb, idx);
- port_set_state(&ai.ports[ai.flows[i].port_id],
- PORT_NULL);
+ port_destroy(&ai.ports[ai.flows[i].port_id]);
reset_flow(i);
}
}
@@ -459,6 +463,8 @@ int flow_accept(qosspec_t * qs,
ai.flows[fd].api = recv_msg->api;
ai.flows[fd].cube = recv_msg->qoscube;
+ assert(ai.ports[ai.flows[fd].port_id].state == PORT_INIT);
+
if (qs != NULL)
fill_qosspec(qs, ai.flows[fd].cube);
@@ -555,6 +561,8 @@ int flow_alloc(const char * dst_name,
ai.flows[fd].api = recv_msg->api;
ai.flows[fd].cube = recv_msg->qoscube;
+ assert(ai.ports[recv_msg->port_id].state == PORT_INIT);
+
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
@@ -600,6 +608,7 @@ int flow_dealloc(int fd)
if (!recv_msg->has_result) {
irm_msg__free_unpacked(recv_msg, NULL);
+ assert(false);
return -1;
}
@@ -1113,8 +1122,8 @@ int np1_flow_alloc(pid_t n_api,
ai.flows[fd].oflags = FLOW_O_DEFAULT;
ai.flows[fd].api = n_api;
- ai.ports[port_id].fd = fd;
- port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+ ai.ports[port_id].fd = fd;
+ ai.ports[port_id].state = PORT_ID_ASSIGNED;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1127,7 +1136,7 @@ int np1_flow_dealloc(int port_id)
int fd;
pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
fd = ai.ports[port_id].fd;
@@ -1141,10 +1150,11 @@ int np1_flow_resp(int port_id)
{
int fd;
- port_wait_assign(&ai.ports[port_id]);
+ if (port_wait_assign(&ai.ports[port_id]) != PORT_ID_ASSIGNED)
+ return -1;
pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.flows_lock);
fd = ai.ports[port_id].fd;
@@ -1211,66 +1221,78 @@ int ipcp_flow_req_arr(pid_t api,
return -1; /* -ENOMOREFDS */
}
- ai.flows[fd].tx_rb = NULL;
-
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
recv_msg = send_recv_irm_msg(&msg);
- if (recv_msg == NULL)
+
+ pthread_rwlock_rdlock(&ai.data_lock);
+ pthread_rwlock_wrlock(&ai.flows_lock);
+
+ if (recv_msg == NULL) {
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
return -EIRMD;
+ }
if (!recv_msg->has_port_id || !recv_msg->has_api) {
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
if (recv_msg->has_result && recv_msg->result) {
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
+ return -1;
}
port_id = recv_msg->port_id;
if (port_id < 0) {
+ ai.ports[fd].state = PORT_INIT;
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
- pthread_rwlock_rdlock(&ai.data_lock);
- pthread_rwlock_wrlock(&ai.flows_lock);
-
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
if (ai.flows[fd].rx_rb == NULL) {
- irm_msg__free_unpacked(recv_msg, NULL);
reset_flow(fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ai.flows[fd].tx_rb = shm_rbuff_open(recv_msg->api, port_id);
if (ai.flows[fd].tx_rb == NULL) {
- irm_msg__free_unpacked(recv_msg, NULL);
reset_flow(fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ai.flows[fd].set = shm_flow_set_open(recv_msg->api);
if (ai.flows[fd].set == NULL) {
- irm_msg__free_unpacked(recv_msg, NULL);
reset_flow(fd);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
ai.flows[fd].port_id = port_id;
- ai.flows[fd].oflags = FLOW_O_DEFAULT;
+ ai.flows[fd].oflags = FLOW_O_DEFAULT;
ai.ports[port_id].fd = fd;
- port_set_state(&(ai.ports[port_id]), PORT_ID_ASSIGNED);
+ port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1390,19 +1412,20 @@ int ipcp_flow_write(int fd,
int ipcp_flow_fini(int fd)
{
- struct shm_rbuff * rb;
+ struct shm_rbuff * rx_rb;
flow_set_flags(fd, FLOW_O_WRONLY);
pthread_rwlock_rdlock(&ai.data_lock);
pthread_rwlock_rdlock(&ai.flows_lock);
- rb = ai.flows[fd].rx_rb;
+ rx_rb = ai.flows[fd].rx_rb;
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- shm_rbuff_fini(rb);
+ shm_rbuff_fini(rx_rb);
+
return 0;
}