summaryrefslogtreecommitdiff
path: root/src/lib/dev.c
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2017-08-13 11:29:46 +0200
committerdimitri staessens <[email protected]>2017-08-13 11:29:46 +0200
commit65f5aa30dc5cb6a284785835b0b14e5b19dfaa9a (patch)
tree4660fa2f05279cf5388ca2b4eb07f67d5c452ea1 /src/lib/dev.c
parent9b7f03ae5e1ac92c5a33979067c31a46468c35e1 (diff)
downloadouroboros-65f5aa30dc5cb6a284785835b0b14e5b19dfaa9a.tar.gz
ouroboros-65f5aa30dc5cb6a284785835b0b14e5b19dfaa9a.zip
lib: Fix data race on fqueues bitmap
This locks the process when allocating and destroying flow_sets. The flows_lock has been renamed to lock. Refactors and fixes a memleak in ouroboros_init.
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--src/lib/dev.c269
1 files changed, 143 insertions, 126 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index c8e43778..daa41a23 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -95,7 +95,7 @@ struct {
struct flow * flows;
struct port * ports;
- pthread_rwlock_t flows_lock;
+ pthread_rwlock_t lock;
} ai;
static void port_destroy(struct port * p)
@@ -144,11 +144,11 @@ static enum port_state port_wait_assign(int port_id)
enum port_state state;
struct port * p;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
p = &ai.ports[port_id];
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
pthread_mutex_lock(&p->state_lock);
@@ -240,18 +240,18 @@ static int flow_init(int port_id,
{
int fd;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
fd = bmp_allocate(ai.fds);
if (!bmp_is_id_valid(ai.fds, fd)) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -EBADF;
}
ai.flows[fd].rx_rb = shm_rbuff_open(ai.api, port_id);
if (ai.flows[fd].rx_rb == NULL) {
bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOMEM;
}
@@ -259,7 +259,7 @@ static int flow_init(int port_id,
if (ai.flows[fd].tx_rb == NULL) {
flow_fini(fd);
bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOMEM;
}
@@ -267,7 +267,7 @@ static int flow_init(int port_id,
if (ai.flows[fd].set == NULL) {
flow_fini(fd);
bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOMEM;
}
@@ -281,14 +281,15 @@ static int flow_init(int port_id,
port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return fd;
}
int ouroboros_init(const char * ap_name)
{
- int i = 0;
+ int i = 0;
+ int ret = -ENOMEM;
assert(ai.ap_name == NULL);
@@ -296,82 +297,87 @@ int ouroboros_init(const char * ap_name)
ai.fds = bmp_create(AP_MAX_FLOWS - AP_RES_FDS, AP_RES_FDS);
if (ai.fds == NULL)
- return -ENOMEM;
+ goto fail_fds;
ai.fqueues = bmp_create(AP_MAX_FQUEUES, 0);
- if (ai.fqueues == NULL) {
- bmp_destroy(ai.fds);
- return -ENOMEM;
- }
+ if (ai.fqueues == NULL)
+ goto fail_fqueues;
ai.fqset = shm_flow_set_create();
- if (ai.fqset == NULL) {
- bmp_destroy(ai.fqueues);
- bmp_destroy(ai.fds);
- return -ENOMEM;
- }
+ if (ai.fqset == NULL)
+ goto fail_fqset;
ai.rdrb = shm_rdrbuff_open();
if (ai.rdrb == NULL) {
- shm_flow_set_destroy(ai.fqset);
- bmp_destroy(ai.fqueues);
- bmp_destroy(ai.fds);
- return -EIRMD;
+ ret = -EIRMD;
+ goto fail_rdrb;
}
ai.flows = malloc(sizeof(*ai.flows) * AP_MAX_FLOWS);
- if (ai.flows == NULL) {
- shm_rdrbuff_close(ai.rdrb);
- shm_flow_set_destroy(ai.fqset);
- bmp_destroy(ai.fqueues);
- bmp_destroy(ai.fds);
- return -ENOMEM;
- }
+ if (ai.flows == NULL)
+ goto fail_flows;
for (i = 0; i < AP_MAX_FLOWS; ++i)
flow_clear(i);
ai.ports = malloc(sizeof(*ai.ports) * IRMD_MAX_FLOWS);
- if (ai.ports == NULL) {
- free(ai.flows);
- shm_rdrbuff_close(ai.rdrb);
- shm_flow_set_destroy(ai.fqset);
- bmp_destroy(ai.fqueues);
- bmp_destroy(ai.fds);
- return -ENOMEM;
- }
+ if (ai.ports == NULL)
+ goto fail_ports;
if (ap_name != NULL) {
ai.ap_name = strdup(path_strip((char *) ap_name));
- if (ai.ap_name == NULL) {
- free(ai.flows);
- shm_rdrbuff_close(ai.rdrb);
- shm_flow_set_destroy(ai.fqset);
- bmp_destroy(ai.fqueues);
- bmp_destroy(ai.fds);
- return -ENOMEM;
- }
+ if (ai.ap_name == NULL)
+ goto fail_ap_name;
if (api_announce((char *) ai.ap_name)) {
- free(ai.ap_name);
- free(ai.flows);
- shm_rdrbuff_close(ai.rdrb);
- shm_flow_set_destroy(ai.fqset);
- bmp_destroy(ai.fqueues);
- bmp_destroy(ai.fds);
- return -EIRMD;
+ ret = -EIRMD;
+ goto fail_announce;
}
}
for (i = 0; i < IRMD_MAX_FLOWS; ++i) {
ai.ports[i].state = PORT_INIT;
- pthread_mutex_init(&ai.ports[i].state_lock, NULL);
- pthread_cond_init(&ai.ports[i].state_cond, NULL);
+ if (pthread_mutex_init(&ai.ports[i].state_lock, NULL)) {
+ int j;
+ for (j = 0; j < i; ++j)
+ pthread_mutex_destroy(&ai.ports[j].state_lock);
+ goto fail_announce;
+ }
+ if (pthread_cond_init(&ai.ports[i].state_cond, NULL)) {
+ int j;
+ for (j = 0; j < i; ++j)
+ pthread_cond_destroy(&ai.ports[j].state_cond);
+ goto fail_state_cond;
+ }
}
- pthread_rwlock_init(&ai.flows_lock, NULL);
+ if (pthread_rwlock_init(&ai.lock, NULL))
+ goto fail_lock;
return 0;
+
+ fail_lock:
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ pthread_cond_destroy(&ai.ports[i].state_cond);
+ fail_state_cond:
+ for (i = 0; i < IRMD_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&ai.ports[i].state_lock);
+ fail_announce:
+ free(ai.ap_name);
+ fail_ap_name:
+ free(ai.ports);
+ fail_ports:
+ free(ai.flows);
+ fail_flows:
+ shm_rdrbuff_close(ai.rdrb);
+ fail_rdrb:
+ shm_flow_set_destroy(ai.fqset);
+ fail_fqset:
+ bmp_destroy(ai.fqueues);
+ fail_fqueues:
+ bmp_destroy(ai.fds);
+ fail_fds:
+ return ret;
}
void ouroboros_fini()
@@ -386,7 +392,7 @@ void ouroboros_fini()
if (ai.ap_name != NULL)
free(ai.ap_name);
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
for (i = 0; i < AP_MAX_FLOWS; ++i) {
if (ai.flows[i].port_id != -1) {
@@ -407,9 +413,9 @@ void ouroboros_fini()
free(ai.flows);
free(ai.ports);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
- pthread_rwlock_destroy(&ai.flows_lock);
+ pthread_rwlock_destroy(&ai.lock);
}
int flow_accept(qosspec_t * qs,
@@ -531,13 +537,13 @@ int flow_dealloc(int fd)
msg.has_api = true;
msg.api = ai.api;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
assert(!(ai.flows[fd].port_id < 0));
msg.port_id = ai.flows[fd].port_id;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
@@ -551,12 +557,12 @@ int flow_dealloc(int fd)
irm_msg__free_unpacked(recv_msg, NULL);
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
flow_fini(fd);
bmp_release(ai.fds, fd);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -569,10 +575,10 @@ int flow_set_flags(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -584,7 +590,7 @@ int flow_set_flags(int fd,
if (flags & FLOW_O_RDWR)
shm_rbuff_unblock(ai.flows[fd].rx_rb);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return old;
}
@@ -596,16 +602,16 @@ int flow_get_flags(int fd)
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
old = ai.flows[fd].oflags;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return old;
}
@@ -618,10 +624,10 @@ int flow_get_timeout(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS || timeo == NULL)
return -EINVAL;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -630,7 +636,7 @@ int flow_get_timeout(int fd,
else
ret = -EPERM;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return ret;
}
@@ -641,10 +647,10 @@ int flow_set_timeout(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EINVAL;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -655,7 +661,7 @@ int flow_set_timeout(int fd,
ai.flows[fd].rcv_timeo = *timeo;
}
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -666,16 +672,16 @@ int flow_get_qosspec(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS || qs == NULL)
return -EINVAL;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
*qs = ai.flows[fd].spec;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -692,15 +698,15 @@ ssize_t flow_write(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
if ((ai.flows[fd].oflags & FLOW_O_ACCMODE) == FLOW_O_RDONLY) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -EPERM;
}
@@ -711,20 +717,20 @@ ssize_t flow_write(int fd,
buf,
count);
if (idx < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return idx;
}
if (shm_rbuff_write(ai.flows[fd].tx_rb, idx) < 0) {
shm_rdrbuff_remove(ai.rdrb, idx);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
struct shm_rbuff * tx_rb = ai.flows[fd].tx_rb;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
assert(tx_rb);
@@ -739,12 +745,12 @@ ssize_t flow_write(int fd,
return -ENOTALLOC;
}
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
}
shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -760,22 +766,22 @@ ssize_t flow_read(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS)
return -EBADF;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
if (ai.flows[fd].oflags & FLOW_O_NONBLOCK) {
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
} else {
struct shm_rbuff * rb = ai.flows[fd].rx_rb;
bool timeo = ai.flows[fd].timesout;
struct timespec timeout = ai.flows[fd].rcv_timeo;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
if (timeo)
idx = shm_rbuff_read_b(rb, &timeout);
@@ -809,12 +815,17 @@ struct flow_set * flow_set_create()
assert(ai.fqueues);
+ pthread_rwlock_wrlock(&ai.lock);
+
set->idx = bmp_allocate(ai.fqueues);
if (!bmp_is_id_valid(ai.fqueues, set->idx)) {
+ pthread_rwlock_unlock(&ai.lock);
free(set);
return NULL;
}
+ pthread_rwlock_unlock(&ai.lock);
+
return set;
}
@@ -824,7 +835,13 @@ void flow_set_destroy(struct flow_set * set)
return;
flow_set_zero(set);
+
+ pthread_rwlock_wrlock(&ai.lock);
+
bmp_release(ai.fqueues, set->idx);
+
+ pthread_rwlock_unlock(&ai.lock);
+
free(set);
}
@@ -867,7 +884,7 @@ int flow_set_add(struct flow_set * set,
if (set == NULL)
return -EINVAL;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id);
@@ -875,7 +892,7 @@ int flow_set_add(struct flow_set * set,
for (i = 0; i < sdus; i++)
shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return ret;
}
@@ -886,12 +903,12 @@ void flow_set_del(struct flow_set * set,
if (set == NULL)
return;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id >= 0)
shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
}
bool flow_set_has(const struct flow_set * set,
@@ -902,16 +919,16 @@ bool flow_set_has(const struct flow_set * set,
if (set == NULL || fd < 0)
return false;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return false;
}
ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return ret;
}
@@ -926,11 +943,11 @@ int fqueue_next(struct fqueue * fq)
if (fq->fqsize == 0)
return -EPERM;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[fq->fqueue[fq->next++]].fd;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
if (fq->next == fq->fqsize) {
fq->fqsize = 0;
@@ -980,11 +997,11 @@ int np1_flow_dealloc(int port_id)
{
int fd;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[port_id].fd;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return fd;
}
@@ -996,11 +1013,11 @@ int np1_flow_resp(int port_id)
if (port_wait_assign(port_id) != PORT_ID_ASSIGNED)
return -1;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
fd = ai.ports[port_id].fd;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return fd;
}
@@ -1086,11 +1103,11 @@ int ipcp_flow_alloc_reply(int fd,
msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY;
msg.has_port_id = true;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
msg.port_id = ai.flows[fd].port_id;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
msg.has_response = true;
msg.response = response;
@@ -1121,16 +1138,16 @@ int ipcp_flow_read(int fd,
assert(fd >=0);
assert(sdb);
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if ((port_id = ai.flows[fd].port_id) < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
rb = ai.flows[fd].rx_rb;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
idx = shm_rbuff_read(rb);
if (idx < 0)
@@ -1149,15 +1166,15 @@ int ipcp_flow_write(int fd,
if (sdb == NULL)
return -EINVAL;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
if ((ai.flows[fd].oflags & FLOW_O_ACCMODE) == FLOW_O_RDONLY) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -EPERM;
}
@@ -1168,7 +1185,7 @@ int ipcp_flow_write(int fd,
shm_rbuff_write(ai.flows[fd].tx_rb, idx);
shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -1201,11 +1218,11 @@ void ipcp_flow_fini(int fd)
flow_set_flags(fd, FLOW_O_WRONLY);
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
rx_rb = ai.flows[fd].rx_rb;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
shm_rbuff_fini(rx_rb);
}
@@ -1216,16 +1233,16 @@ int ipcp_flow_get_qoscube(int fd,
if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)
return -EINVAL;
- pthread_rwlock_wrlock(&ai.flows_lock);
+ pthread_rwlock_wrlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
*cube = ai.flows[fd].cube;
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -1236,11 +1253,11 @@ ssize_t local_flow_read(int fd)
assert(fd >= 0);
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
ret = shm_rbuff_read(ai.flows[fd].rx_rb);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return ret;
}
@@ -1251,10 +1268,10 @@ int local_flow_write(int fd,
if (fd < 0)
return -EINVAL;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
if (ai.flows[fd].port_id < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
@@ -1262,7 +1279,7 @@ int local_flow_write(int fd,
shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}
@@ -1272,19 +1289,19 @@ int ipcp_read_shim(int fd,
{
ssize_t idx;
- pthread_rwlock_rdlock(&ai.flows_lock);
+ pthread_rwlock_rdlock(&ai.lock);
assert(ai.flows[fd].rx_rb);
idx = shm_rbuff_read(ai.flows[fd].rx_rb);
if (idx < 0) {
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return -EAGAIN;
}
*sdb = shm_rdrbuff_get(ai.rdrb, idx);
- pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.lock);
return 0;
}