author    dimitri staessens <[email protected]>    2016-10-11 13:39:43 +0200
committer dimitri staessens <[email protected]>    2016-10-11 13:45:02 +0200
commit    c69dd846c5aa2bed4db16961c5774a20cea7f828 (patch)
tree      7e9eb9f82c4fab120abc11c4b539d4beb4c19982 /src/lib/dev.c
parent    99356adf207e0fe81a34ee1acfd8cacc3d2860c7 (diff)
lib: Track SDUs in the fast path
This will allow flow deallocation to be finalized only once all SDUs have been processed. Read and write calls will now block when a flow has been deallocated. Replaces NULL checks in the fast path with asserts.
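The commit wires shm_ap_rbuff_open_port()/shm_ap_rbuff_close_port() into every place a port is set up or torn down, so the ring buffer can tell whether SDUs are still in flight for a port. Below is a minimal sketch of that bookkeeping idea, assuming a per-port counter guarded by a mutex; the names (struct port_track, port_put_sdu, ...) are hypothetical illustrations, not the actual shm_ap_rbuff implementation.

    /*
     * Sketch only: a per-port open flag plus an SDU counter lets
     * deallocation wait until the fast path has drained the port.
     */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stddef.h>

    struct port_track {
            pthread_mutex_t lock;
            pthread_cond_t  drained;
            bool            open;    /* set on open_port, cleared on close_port */
            size_t          n_sdus;  /* SDUs currently in the ring buffer */
    };

    static int port_put_sdu(struct port_track * p)   /* writer side */
    {
            int ret = 0;

            pthread_mutex_lock(&p->lock);
            if (!p->open)
                    ret = -1;        /* writer learns the port was closed */
            else
                    ++p->n_sdus;
            pthread_mutex_unlock(&p->lock);

            return ret;
    }

    static void port_get_sdu(struct port_track * p)  /* reader side */
    {
            pthread_mutex_lock(&p->lock);
            if (p->n_sdus > 0 && --p->n_sdus == 0)
                    pthread_cond_broadcast(&p->drained);
            pthread_mutex_unlock(&p->lock);
    }

    static void port_close(struct port_track * p)    /* dealloc side */
    {
            pthread_mutex_lock(&p->lock);
            p->open = false;
            while (p->n_sdus > 0)    /* block until all SDUs are read */
                    pthread_cond_wait(&p->drained, &p->lock);
            pthread_mutex_unlock(&p->lock);
    }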
Diffstat (limited to 'src/lib/dev.c')
-rw-r--r--   src/lib/dev.c   29
1 file changed, 24 insertions, 5 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index e20d23d4..c1f769ad 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -356,6 +356,8 @@ int flow_accept(char ** ae_name)
ai.ports[recv_msg->port_id].fd = fd;
ai.ports[recv_msg->port_id].state = PORT_ID_ASSIGNED;
+ shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -478,6 +480,8 @@ int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos)
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
+ shm_ap_rbuff_open_port(ai.rb, recv_msg->port_id);
+
irm_msg__free_unpacked(recv_msg, NULL);
return fd;
@@ -559,6 +563,8 @@ int flow_dealloc(int fd)
bmp_release(ai.fds, fd);
+ shm_ap_rbuff_close_port(ai.rb, msg.port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
send_irm_msg(&msg);
@@ -630,10 +636,10 @@ ssize_t flow_write(int fd, void * buf, size_t count)
DU_BUFF_TAILSPACE,
buf,
count);
- if (idx == -1) {
+ if (idx < 0) {
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return -EAGAIN;
+ return -idx;
}
e.index = idx;
@@ -643,7 +649,7 @@ ssize_t flow_write(int fd, void * buf, size_t count)
shm_rdrbuff_remove(ai.rdrb, idx);
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
- return -1;
+ return -ENOTALLOC;
}
} else { /* blocking */
struct shm_rdrbuff * rdrb = ai.rdrb;
@@ -664,8 +670,12 @@ ssize_t flow_write(int fd, void * buf, size_t count)
e.index = idx;
e.port_id = ai.flows[fd].port_id;
- while (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0)
- ;
+ if (shm_ap_rbuff_write(ai.flows[fd].rb, &e) < 0) {
+ shm_rdrbuff_remove(ai.rdrb, e.index);
+ pthread_rwlock_unlock(&ai.flows_lock);
+ pthread_rwlock_unlock(&ai.data_lock);
+ return -ENOTALLOC;
+ }
}
pthread_rwlock_unlock(&ai.flows_lock);
@@ -838,6 +848,8 @@ int np1_flow_alloc(pid_t n_api, int port_id)
ai.ports[port_id].fd = fd;
port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED);
+ shm_ap_rbuff_open_port(ai.rb, port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -867,6 +879,8 @@ int np1_flow_dealloc(int port_id)
port_destroy(&ai.ports[port_id]);
+ shm_ap_rbuff_close_port(ai.rb, port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -902,6 +916,8 @@ int np1_flow_resp(pid_t n_api, int port_id)
ai.flows[fd].rb = rb;
+ shm_ap_rbuff_open_port(ai.rb, port_id);
+
pthread_rwlock_unlock(&ai.flows_lock);
pthread_rwlock_unlock(&ai.data_lock);
@@ -1011,6 +1027,9 @@ int ipcp_flow_alloc_reply(int fd, int response)
msg.has_response = true;
msg.response = response;
+ if (response)
+ shm_ap_rbuff_open_port(ai.rb, msg.port_id);
+
recv_msg = send_recv_irm_msg(&msg);
if (recv_msg == NULL)
return -1;
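For callers, the flow_write() hunks above change the error reporting: a failed ring-buffer write no longer spins or returns a bare -1, but propagates a negative error code such as -ENOTALLOC once the flow has been deallocated. The following caller-side sketch is illustrative only; it assumes the flow_write() prototype shown in the diff and that ENOTALLOC is exposed through the library's headers (the include paths below are assumptions).

    #include <errno.h>
    #include <string.h>

    #include <ouroboros/dev.h>    /* assumed header declaring flow_write() */
    #include <ouroboros/errno.h>  /* assumed location of ENOTALLOC */

    /* Illustrative only: hand one message to the fast path and react
     * to the new error semantics of flow_write(). */
    static int send_msg(int fd, const char * msg)
    {
            ssize_t ret = flow_write(fd, (void *) msg, strlen(msg));

            if (ret == -ENOTALLOC)
                    return -1;    /* flow was deallocated; stop using this fd */

            if (ret < 0)
                    return 0;     /* transient error, e.g. -EAGAIN; caller may retry */

            return 0;             /* SDU accepted by the fast path */
    }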