summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-05-14 21:10:08 +0200
committerdimitri staessens <[email protected]>2016-05-14 21:10:08 +0200
commite82a5c416673e1eb2fbafb1deaaa8ac07971215e (patch)
tree57ace5cb356ad7a764670b594912cc310e140bdf
parentaf4097f51b7bd4f37212c2d49f0596779d79c36e (diff)
downloadouroboros-e82a5c416673e1eb2fbafb1deaaa8ac07971215e.tar.gz
ouroboros-e82a5c416673e1eb2fbafb1deaaa8ac07971215e.zip
lib: dev.c: added locking
Locking is required for multi-threaded applications. Flows are locked separately. Read/Write locks are used for concurrent reads.
-rw-r--r--src/lib/dev.c212
-rw-r--r--src/lib/shm_ap_rbuff.c4
2 files changed, 179 insertions, 37 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index ae27a05f..440f40f9 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -31,6 +31,7 @@
#include <ouroboros/shm_du_map.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/utils.h>
+#include <ouroboros/rw_lock.h>
#include <stdlib.h>
#include <string.h>
@@ -47,9 +48,11 @@ struct ap_data {
instance_name_t * api;
struct shm_du_map * dum;
struct bmp * fds;
-
struct shm_ap_rbuff * rb;
+ rw_lock_t data_lock;
+
struct flow flows[AP_MAX_FLOWS];
+ rw_lock_t flows_lock;
} * _ap_instance;
int ap_init(char * ap_name)
@@ -92,14 +95,19 @@ int ap_init(char * ap_name)
_ap_instance->rb = shm_ap_rbuff_create();
if (_ap_instance->rb == NULL) {
instance_name_destroy(_ap_instance->api);
+ shm_du_map_close(_ap_instance->dum);
bmp_destroy(_ap_instance->fds);
free(_ap_instance);
return -1;
}
- for (i = 0; i < AP_MAX_FLOWS; ++i)
+ for (i = 0; i < AP_MAX_FLOWS; ++i) {
_ap_instance->flows[i].rb = NULL;
+ _ap_instance->flows[i].port_id = -1;
+ }
+ rw_lock_init(&_ap_instance->flows_lock);
+ rw_lock_init(&_ap_instance->data_lock);
return 0;
}
@@ -110,6 +118,9 @@ void ap_fini(void)
if (_ap_instance == NULL)
return;
+
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
if (_ap_instance->api != NULL)
instance_name_destroy(_ap_instance->api);
if (_ap_instance->fds != NULL)
@@ -122,6 +133,8 @@ void ap_fini(void)
if (_ap_instance->flows[i].rb != NULL)
shm_ap_rbuff_close(_ap_instance->flows[i].rb);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
free(_ap_instance);
}
@@ -142,7 +155,7 @@ int ap_reg(char ** difs,
{
irm_msg_t msg = IRM_MSG__INIT;
irm_msg_t * recv_msg = NULL;
- int fd = bmp_allocate(_ap_instance->fds);
+ int fd = -1;
if (difs == NULL ||
len == 0 ||
@@ -157,11 +170,16 @@ int ap_reg(char ** difs,
msg.code = IRM_MSG_CODE__IRM_AP_REG;
msg.has_pid = true;
- msg.pid = _ap_instance->api->id;
- msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
msg.n_dif_name = len;
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -176,6 +194,12 @@ int ap_reg(char ** difs,
irm_msg__free_unpacked(recv_msg, NULL);
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
+ fd = bmp_allocate(_ap_instance->fds);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return fd;
}
@@ -194,11 +218,16 @@ int ap_unreg(char ** difs,
msg.code = IRM_MSG_CODE__IRM_AP_UNREG;
msg.has_pid = true;
- msg.pid = _ap_instance->api->id;
- msg.ap_name = _ap_instance->api->name;
msg.dif_name = difs;
msg.n_dif_name = len;
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ msg.pid = _ap_instance->api->id;
+ msg.ap_name = _ap_instance->api->name;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -224,8 +253,13 @@ int flow_accept(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ACCEPT;
msg.has_pid = true;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.pid = _ap_instance->api->id;
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -235,18 +269,8 @@ int flow_accept(int fd,
return -1;
}
- cfd = bmp_allocate(_ap_instance->fds);
-
- _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
- if (_ap_instance->flows[cfd].rb == NULL) {
- bmp_release(_ap_instance->fds, cfd);
- irm_msg__free_unpacked(recv_msg, NULL);
- return -1;
- }
-
*ap_name = strdup(recv_msg->ap_name);
if (*ap_name == NULL) {
- bmp_release(_ap_instance->fds, cfd);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -254,21 +278,46 @@ int flow_accept(int fd,
if (ae_name != NULL) {
*ae_name = strdup(recv_msg->ae_name);
if (*ae_name == NULL) {
- bmp_release(_ap_instance->fds, cfd);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
}
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
+ cfd = bmp_allocate(_ap_instance->fds);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ _ap_instance->flows[cfd].rb = shm_ap_rbuff_open(recv_msg->pid);
+ if (_ap_instance->flows[cfd].rb == NULL) {
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
+ bmp_release(_ap_instance->fds, cfd);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ irm_msg__free_unpacked(recv_msg, NULL);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ return -1;
+ }
+
_ap_instance->flows[cfd].port_id = recv_msg->port_id;
_ap_instance->flows[cfd].oflags = FLOW_O_DEFAULT;
-
+ rw_lock_unlock(&_ap_instance->flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
bmp_release(_ap_instance->fds, fd);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return cfd;
}
@@ -281,9 +330,21 @@ int flow_alloc_resp(int fd,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RESP;
msg.has_pid = true;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.pid = _ap_instance->api->id;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
msg.has_port_id = true;
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
msg.port_id = _ap_instance->flows[fd].port_id;
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
msg.has_response = true;
msg.response = response;
@@ -318,10 +379,15 @@ int flow_alloc(char * dst_name,
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC;
msg.dst_name = dst_name;
- msg.ap_name = _ap_instance->api->name;
+ msg.ae_name = src_ae_name;
msg.has_pid = true;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.pid = _ap_instance->api->id;
- msg.ae_name = src_ae_name;
+ msg.ap_name = _ap_instance->api->name;
+
+ rw_lock_unlock(&_ap_instance->data_lock);
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
@@ -332,11 +398,23 @@ int flow_alloc(char * dst_name,
return -1;
}
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
fd = bmp_allocate(_ap_instance->fds);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
_ap_instance->flows[fd].rb = shm_ap_rbuff_open(recv_msg->pid);
if (_ap_instance->flows[fd].rb == NULL) {
+ rw_lock_wrlock(&_ap_instance->data_lock);
+
bmp_release(_ap_instance->fds, fd);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
irm_msg__free_unpacked(recv_msg, NULL);
return -1;
}
@@ -344,6 +422,8 @@ int flow_alloc(char * dst_name,
_ap_instance->flows[fd].port_id = recv_msg->port_id;
_ap_instance->flows[fd].oflags = FLOW_O_DEFAULT;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
@@ -357,8 +437,13 @@ int flow_alloc_res(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_ALLOC_RES;
msg.has_port_id = true;
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
msg.port_id = _ap_instance->flows[fd].port_id;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -382,8 +467,14 @@ int flow_dealloc(int fd)
msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
msg.has_port_id = true;
+
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
msg.port_id = _ap_instance->flows[fd].port_id;
+ rw_lock_unlock(&_ap_instance->data_lock);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
@@ -401,42 +492,77 @@ int flow_dealloc(int fd)
int flow_cntl(int fd, int cmd, int oflags)
{
- int old = _ap_instance->flows[fd].oflags;
+ int old;
+
+ rw_lock_wrlock(&_ap_instance->flows_lock);
+
+ old = _ap_instance->flows[fd].oflags;
+
switch (cmd) {
case FLOW_F_GETFL: /* GET FLOW FLAGS */
- return _ap_instance->flows[fd].oflags;
+ rw_lock_unlock(&_ap_instance->flows_lock);
+ return old;
case FLOW_F_SETFL: /* SET FLOW FLAGS */
_ap_instance->flows[fd].oflags = oflags;
+ rw_lock_unlock(&_ap_instance->flows_lock);
return old;
default:
+ rw_lock_unlock(&_ap_instance->flows_lock);
return FLOW_O_INVALID; /* unknown command */
}
}
ssize_t flow_write(int fd, void * buf, size_t count)
{
- size_t index = shm_create_du_buff(_ap_instance->dum,
- count + DU_BUFF_HEADSPACE +
- DU_BUFF_TAILSPACE,
- DU_BUFF_HEADSPACE,
- (uint8_t *) buf,
- count);
- struct rb_entry e = {index, _ap_instance->flows[fd].port_id};
- if (index == -1)
+ size_t index;
+ struct rb_entry e;
+
+ if (buf == NULL)
+ return 0;
+
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ index = shm_create_du_buff(_ap_instance->dum,
+ count + DU_BUFF_HEADSPACE +
+ DU_BUFF_TAILSPACE,
+ DU_BUFF_HEADSPACE,
+ (uint8_t *) buf,
+ count);
+ if (index == -1) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
+ e.index = index;
+ e.port_id = _ap_instance->flows[fd].port_id;
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
if (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0) {
shm_release_du_buff(_ap_instance->dum, index);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return -EPIPE;
}
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return 0;
} else {
while (shm_ap_rbuff_write(_ap_instance->flows[fd].rb, &e) < 0)
- ;
+ LOG_DBGF("Couldn't write to rbuff.");
}
+ rw_lock_unlock(&_ap_instance->data_lock);
+
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
return 0;
}
@@ -446,27 +572,41 @@ ssize_t flow_read(int fd, void * buf, size_t count)
int n;
uint8_t * sdu;
+ rw_lock_rdlock(&_ap_instance->data_lock);
+
+ rw_lock_rdlock(&_ap_instance->flows_lock);
+
if (_ap_instance->flows[fd].oflags & FLOW_O_NONBLOCK) {
e = shm_ap_rbuff_read(_ap_instance->rb);
} else {
- /* FIXME: move this to a thread */
+
+ /* FIXME: this will throw away packets for other fd's */
while (e == NULL ||
- e->port_id != _ap_instance->flows[fd].port_id)
+ e->port_id != _ap_instance->flows[fd].port_id) {
e = shm_ap_rbuff_read(_ap_instance->rb);
+ }
}
- if (e == NULL)
+ rw_lock_unlock(&_ap_instance->flows_lock);
+
+ if (e == NULL) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
n = shm_du_map_read_sdu(&sdu,
_ap_instance->dum,
e->index);
- if (n < 0)
+ if (n < 0) {
+ rw_lock_unlock(&_ap_instance->data_lock);
return -1;
+ }
memcpy(buf, sdu, MIN(n, count));
shm_release_du_buff(_ap_instance->dum, e->index);
+ rw_lock_unlock(&_ap_instance->data_lock);
+
return n;
}
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6c04ccc5..da6f0e33 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -253,8 +253,10 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
}
e = malloc(sizeof(*e));
- if (e == NULL)
+ if (e == NULL) {
+ pthread_mutex_unlock(rb->shm_mutex);
return NULL;
+ }
*e = *(rb->shm_base + *rb->ptr_tail);