summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2018-07-03 15:51:16 +0200
committerDimitri Staessens <[email protected]>2018-07-04 16:16:41 +0200
commit301212fc967b51fd01e02c0dca8c9183db923b11 (patch)
tree4db76fb5977f0ecd34eb300a6375e1cf5d280497
parentbedee0b0aac97fb195288ec81837e192cbb7b27c (diff)
downloadouroboros-301212fc967b51fd01e02c0dca8c9183db923b11.tar.gz
ouroboros-301212fc967b51fd01e02c0dca8c9183db923b11.zip
ipcpd: React to flow events actively
This adds the infrastructure to actively react to flow up, down and deallocated events. Signed-off-by: Sander Vrijders <[email protected]> Signed-off-by: Dimitri Staessens <[email protected]>
-rw-r--r--src/ipcpd/normal/connmgr.c7
-rw-r--r--src/ipcpd/normal/connmgr.h21
-rw-r--r--src/ipcpd/normal/sdu_sched.c25
-rw-r--r--src/lib/dev.c20
4 files changed, 55 insertions, 18 deletions
diff --git a/src/ipcpd/normal/connmgr.c b/src/ipcpd/normal/connmgr.c
index 9530633b..bf07ebc4 100644
--- a/src/ipcpd/normal/connmgr.c
+++ b/src/ipcpd/normal/connmgr.c
@@ -193,7 +193,9 @@ static void handle_event(void * self,
(void) self;
- if (!(event == NOTIFY_DT_FLOW_UP || event == NOTIFY_DT_FLOW_DOWN))
+ if (!(event == NOTIFY_DT_FLOW_UP ||
+ event == NOTIFY_DT_FLOW_DOWN ||
+ event == NOTIFY_DT_FLOW_DEALLOC))
return;
if (get_conn_by_fd(*((int *) o), COMPID_DT, &conn))
@@ -206,6 +208,9 @@ static void handle_event(void * self,
case NOTIFY_DT_FLOW_DOWN:
notifier_event(NOTIFY_DT_CONN_DOWN, &conn);
break;
+ case NOTIFY_DT_FLOW_DEALLOC:
+ notifier_event(NOTIFY_DT_CONN_DEL, &conn);
+ break;
default:
break;
}
diff --git a/src/ipcpd/normal/connmgr.h b/src/ipcpd/normal/connmgr.h
index 510b8e4e..a7e8a6e0 100644
--- a/src/ipcpd/normal/connmgr.h
+++ b/src/ipcpd/normal/connmgr.h
@@ -28,16 +28,17 @@
#include "comp.h"
-#define NOTIFY_DT_CONN_ADD 0x00D0
-#define NOTIFY_DT_CONN_DEL 0x00D1
-#define NOTIFY_DT_CONN_QOS 0x00D2
-#define NOTIFY_DT_CONN_UP 0x00D3
-#define NOTIFY_DT_CONN_DOWN 0x00D4
-#define NOTIFY_DT_FLOW_UP 0x00D5
-#define NOTIFY_DT_FLOW_DOWN 0x00D6
-
-#define NOTIFY_MGMT_CONN_ADD 0x00F0
-#define NOTIFY_MGMT_CONN_DEL 0x00F1
+#define NOTIFY_DT_CONN_ADD 0x00D0
+#define NOTIFY_DT_CONN_DEL 0x00D1
+#define NOTIFY_DT_CONN_QOS 0x00D2
+#define NOTIFY_DT_CONN_UP 0x00D3
+#define NOTIFY_DT_CONN_DOWN 0x00D4
+#define NOTIFY_DT_FLOW_UP 0x00D5
+#define NOTIFY_DT_FLOW_DOWN 0x00D6
+#define NOTIFY_DT_FLOW_DEALLOC 0x00D7
+
+#define NOTIFY_MGMT_CONN_ADD 0x00F0
+#define NOTIFY_MGMT_CONN_DEL 0x00F1
int connmgr_init(void);
diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c
index 0ae22895..6ce18ed5 100644
--- a/src/ipcpd/normal/sdu_sched.c
+++ b/src/ipcpd/normal/sdu_sched.c
@@ -25,9 +25,11 @@
#include "config.h"
#include <ouroboros/errno.h>
+#include <ouroboros/notifier.h>
#include "ipcp.h"
#include "sdu_sched.h"
+#include "connmgr.h"
#include <assert.h>
#include <sched.h>
@@ -86,10 +88,25 @@ static void * sdu_reader(void * o)
continue;
while ((fd = fqueue_next(fq)) >= 0) {
- if (ipcp_flow_read(fd, &sdb))
- continue;
-
- sched->callback(fd, qc, sdb);
+ switch (fqueue_type(fq)) {
+ case FLOW_DEALLOC:
+ notifier_event(NOTIFY_DT_FLOW_DEALLOC, &fd);
+ break;
+ case FLOW_DOWN:
+ notifier_event(NOTIFY_DT_FLOW_DOWN, &fd);
+ break;
+ case FLOW_UP:
+ notifier_event(NOTIFY_DT_FLOW_UP, &fd);
+ break;
+ case FLOW_PKT:
+ if (ipcp_flow_read(fd, &sdb))
+ continue;
+
+ sched->callback(fd, qc, sdb);
+ break;
+ default:
+ break;
+ }
}
}
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 9eade797..dd908f78 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -252,8 +252,12 @@ static void flow_fini(int fd)
shm_rbuff_close(ai.flows[fd].tx_rb);
}
- if (ai.flows[fd].set != NULL)
+ if (ai.flows[fd].set != NULL) {
+ shm_flow_set_notify(ai.flows[fd].set,
+ ai.flows[fd].port_id,
+ FLOW_DEALLOC);
shm_flow_set_close(ai.flows[fd].set);
+ }
if (ai.flows[fd].frcti != NULL)
frcti_destroy(ai.flows[fd].frcti);
@@ -435,8 +439,6 @@ static void fini(void)
frct_fini();
- shm_flow_set_destroy(ai.fqset);
-
if (ai.prog != NULL)
free(ai.prog);
@@ -452,6 +454,8 @@ static void fini(void)
}
}
+ shm_flow_set_destroy(ai.fqset);
+
for (i = 0; i < SYS_MAX_FLOWS; ++i) {
pthread_mutex_destroy(&ai.ports[i].state_lock);
pthread_cond_destroy(&ai.ports[i].state_cond);
@@ -764,9 +768,15 @@ int fccntl(int fd,
if (flow->oflags & FLOWFDOWN) {
rx_acl |= ACL_FLOWDOWN;
tx_acl |= ACL_FLOWDOWN;
+ shm_flow_set_notify(flow->set,
+ flow->port_id,
+ FLOW_DOWN);
} else {
rx_acl &= ~ACL_FLOWDOWN;
tx_acl &= ~ACL_FLOWDOWN;
+ shm_flow_set_notify(flow->set,
+ flow->port_id,
+ FLOW_UP);
}
shm_rbuff_set_acl(flow->rx_rb, rx_acl);
@@ -1425,6 +1435,10 @@ void ipcp_flow_fini(int fd)
shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN);
+ shm_flow_set_notify(ai.flows[fd].set,
+ ai.flows[fd].port_id,
+ FLOW_DEALLOC);
+
rx_rb = ai.flows[fd].rx_rb;
pthread_rwlock_unlock(&ai.lock);