summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <[email protected]>2018-05-14 09:20:44 +0200
committerSander Vrijders <[email protected]>2018-05-14 11:40:09 +0200
commit38cfdc212c623a46038f005b0c1604c3fdaf3762 (patch)
tree57d9c0860f014afc9cb10de022d3e72fe4fa8fbc
parenteaf14819c8cdab3c5ae1d678b0a12977f8b2d9e1 (diff)
downloadouroboros-38cfdc212c623a46038f005b0c1604c3fdaf3762.tar.gz
ouroboros-38cfdc212c623a46038f005b0c1604c3fdaf3762.zip
lib: Add event types to fqueue
The event type of the current event in the fqueue can now be requested using the fqueue_type() command. Currently events for packets (FLOW_PKT), flows (FLOW_UP, FLOW_DOWN) and allocation (FLOW_ALLOC, FLOW_DEALLOC) are specified. The implementation only tracks FLOW_PKT at this point. Signed-off-by: Dimitri Staessens <[email protected]> Signed-off-by: Sander Vrijders <[email protected]>
-rw-r--r--doc/man/fqueue.331
-rw-r--r--include/ouroboros/fqueue.h40
-rw-r--r--include/ouroboros/shm_flow_set.h3
-rw-r--r--src/lib/dev.c52
-rw-r--r--src/lib/shm_flow_set.c38
5 files changed, 108 insertions, 56 deletions
diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3
index 00c28d4c..abd21cfe 100644
--- a/doc/man/fqueue.3
+++ b/doc/man/fqueue.3
@@ -19,6 +19,8 @@ on flows
\fBint fqueue_next(fqueue_t * \fIfq\fB);
+\fBint fqueue_type(fqueue_t * \fIfq\fB);
+
\fBint fevent(fset_t * \fIset\fB, fqueue_t * \fIfq\fB,
const struct timespec * \fItimeo\fB);
@@ -36,6 +38,22 @@ an \fBfqueue_t\fR \fIfq\fR.
The \fBfqueue_next\fR() function retrieves the next event (a \fIflow
descriptor\fR) that is ready within the event queue \fIfq\fR.
+The \fBfqueue_type\fR() function retrieves the type for the current
+event on the fd that was returned by \fBfqueue_next\fR(). Event types
+are:
+.RS 4
+FLOW_PKT: A new packet arrived on this flow and is ready for reading.
+
+FLOW_UP: The flow is now marked UP and ready for read/write.
+
+FLOW_DOWN: The flow is now marked DOWN and cannot be written to.
+
+FLOW_ALLOC: A pending flow is now allocated.
+
+FLOW_DEALLOC: The flow is deallocated by the other (N+1 or N-1)
+process.
+.RE
+
The \fBfevent\fR() function retrieves all events that occured on any
\fIflow descriptor\fR within \fIset\fR and returns them in the event
queue \fBfq\fR. If a \fBstruct timespec *\fI timeo\fR can be provided,
@@ -50,7 +68,14 @@ On success, \fBfqueue_create\fR() returns a pointer to an
\fBfqueue_destroy\fR() has no return value.
-On success, \fBfevent\fR() returns the number of events that occured in \fIset\fR.
+On success, \fBfevent\fR() returns the number of events that occured
+in \fIset\fR.
+
+On success, \fBfqueue_next\fR() returns the next file descriptor for
+which an event occurred.
+
+On success, \fBfqueue_type\fR() returns the event type for the last
+event returned by \fBfqueue_next\fR().
.SH ERRORS
@@ -62,10 +87,10 @@ were available to create the \fBfqueue_t\fR.
.B -EINVAL
An invalid argument was passed (\fIfq\fR or \fIset\fR was \fINULL\fR).
-In addition, \fBfqueue_next\fR() can return
+In addition, \fBfqueue_next\fR() or \fBqueue_type\fR() can return
.B -EPERM
-No more fds available in \fIfq\fR.
+No more fds available or no current event in \fIfq\fR.
and \fBfevent\fR() can return
diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h
index 1b102669..8a5dc988 100644
--- a/include/ouroboros/fqueue.h
+++ b/include/ouroboros/fqueue.h
@@ -28,6 +28,14 @@
#include <stdbool.h>
#include <time.h>
+enum fqtype {
+ FLOW_PKT = 0,
+ FLOW_DOWN,
+ FLOW_UP,
+ FLOW_ALLOC,
+ FLOW_DEALLOC
+};
+
struct flow_set;
struct fqueue;
@@ -37,30 +45,32 @@ typedef struct fqueue fqueue_t;
__BEGIN_DECLS
-fset_t * fset_create(void);
+fset_t * fset_create(void);
+
+void fset_destroy(fset_t * set);
-void fset_destroy(fset_t * set);
+fqueue_t * fqueue_create(void);
-fqueue_t * fqueue_create(void);
+void fqueue_destroy(struct fqueue * fq);
-void fqueue_destroy(struct fqueue * fq);
+void fset_zero(fset_t * set);
-void fset_zero(fset_t * set);
+int fset_add(fset_t * set,
+ int fd);
-int fset_add(fset_t * set,
- int fd);
+bool fset_has(const fset_t * set,
+ int fd);
-bool fset_has(const fset_t * set,
- int fd);
+void fset_del(fset_t * set,
+ int fd);
-void fset_del(fset_t * set,
- int fd);
+int fqueue_next(fqueue_t * fq);
-int fqueue_next(fqueue_t * fq);
+enum fqtype fqueue_type(fqueue_t * fq);
-int fevent(fset_t * set,
- fqueue_t * fq,
- const struct timespec * timeo);
+int fevent(fset_t * set,
+ fqueue_t * fq,
+ const struct timespec * timeo);
__END_DECLS
diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h
index 76849137..ebf63af5 100644
--- a/include/ouroboros/shm_flow_set.h
+++ b/include/ouroboros/shm_flow_set.h
@@ -53,7 +53,8 @@ void shm_flow_set_del(struct shm_flow_set * shm_set,
int port_id);
void shm_flow_set_notify(struct shm_flow_set * set,
- int port_id);
+ int port_id,
+ int event);
ssize_t shm_flow_set_wait(const struct shm_flow_set * shm_set,
size_t idx,
diff --git a/src/lib/dev.c b/src/lib/dev.c
index edcf56ed..3d854c2a 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -65,7 +65,7 @@ struct flow_set {
};
struct fqueue {
- int fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
+ int fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */
size_t fqsize;
size_t next;
};
@@ -875,7 +875,7 @@ ssize_t flow_write(int fd,
if (ret < 0)
shm_rdrbuff_remove(ai.rdrb, idx);
else
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set)
int fset_add(struct flow_set * set,
int fd)
{
- int ret;
+ int ret;
size_t sdus;
size_t i;
@@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set,
sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
for (i = 0; i < sdus; i++)
- shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
+ shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq)
if (fq == NULL)
return -EINVAL;
- if (fq->fqsize == 0)
+ if (fq->fqsize == 0 || fq->next == fq->fqsize)
return -EPERM;
pthread_rwlock_rdlock(&ai.lock);
- fd = ai.ports[fq->fqueue[fq->next++]].fd;
+ fd = ai.ports[fq->fqueue[fq->next]].fd;
- pthread_rwlock_unlock(&ai.lock);
+ fq->next += 2;
- if (fq->next == fq->fqsize) {
- fq->fqsize = 0;
- fq->next = 0;
- }
+ pthread_rwlock_unlock(&ai.lock);
return fd;
}
+enum fqtype fqueue_type(struct fqueue * fq)
+{
+ if (fq == NULL)
+ return -EINVAL;
+
+ if (fq->fqsize == 0 || fq->next == 0)
+ return -EPERM;
+
+ return fq->fqueue[fq->next - 1];
+}
+
int fevent(struct flow_set * set,
struct fqueue * fq,
const struct timespec * timeo)
@@ -1130,11 +1138,9 @@ int fevent(struct flow_set * set,
if (set == NULL || fq == NULL)
return -EINVAL;
- if (fq->fqsize > 0)
+ if (fq->fqsize > 0 && fq->next != fq->fqsize)
return fq->fqsize;
- assert(!fq->next);
-
if (timeo != NULL) {
clock_gettime(PTHREAD_COND_CLOCK, &abstime);
ts_add(&abstime, timeo, &abstime);
@@ -1147,7 +1153,8 @@ int fevent(struct flow_set * set,
return -ETIMEDOUT;
}
- fq->fqsize = ret;
+ fq->fqsize = ret << 1;
+ fq->next = 0;
assert(ret);
@@ -1365,9 +1372,9 @@ int ipcp_flow_write(int fd,
return -ENOMEM;
}
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret == 0)
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
@@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd)
int local_flow_write(int fd,
size_t idx)
{
- int ret;
+ struct flow * flow;
+ int ret;
assert(fd >= 0);
+ flow = &ai.flows[fd];
+
pthread_rwlock_rdlock(&ai.lock);
- if (ai.flows[fd].port_id < 0) {
+ if (flow->port_id < 0) {
pthread_rwlock_unlock(&ai.lock);
return -ENOTALLOC;
}
- ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+ ret = shm_rbuff_write(flow->tx_rb, idx);
if (ret == 0)
- shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+ shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
pthread_rwlock_unlock(&ai.lock);
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index d2107fc3..bb9e3caa 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -27,7 +27,6 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/shm_flow_set.h>
-#include <ouroboros/fqueue.h>
#include <ouroboros/errno.h>
#include <pthread.h>
@@ -54,24 +53,29 @@
#define FN_MAX_CHARS 255
-#define FQUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(int))
+#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct portevent))
#define SHM_FLOW_SET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \
+ PROG_MAX_FQUEUES * sizeof(size_t) \
+ PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \
- + PROG_MAX_FQUEUES * FQUEUESIZE \
+ + PROG_MAX_FQUEUES * QUEUESIZE \
+ sizeof(pthread_mutex_t))
#define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx)
+struct portevent {
+ int port_id;
+ int event;
+};
+
struct shm_flow_set {
- ssize_t * mtable;
- size_t * heads;
- pthread_cond_t * conds;
- int * fqueues;
- pthread_mutex_t * lock;
+ ssize_t * mtable;
+ size_t * heads;
+ pthread_cond_t * conds;
+ struct portevent * fqueues;
+ pthread_mutex_t * lock;
- pid_t pid;
+ pid_t pid;
};
struct shm_flow_set * shm_flow_set_create()
@@ -125,7 +129,7 @@ struct shm_flow_set * shm_flow_set_create()
set->mtable = shm_base;
set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
- set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES);
+ set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);
set->lock = (pthread_mutex_t *)
(set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
@@ -191,10 +195,9 @@ struct shm_flow_set * shm_flow_set_open(pid_t pid)
set->mtable = shm_base;
set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
- set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES);
+ set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);
set->lock = (pthread_mutex_t *)
(set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
-
set->pid = pid;
return set;
@@ -316,7 +319,8 @@ int shm_flow_set_has(struct shm_flow_set * set,
}
void shm_flow_set_notify(struct shm_flow_set * set,
- int port_id)
+ int port_id,
+ int event)
{
assert(set);
assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS);
@@ -328,8 +332,10 @@ void shm_flow_set_notify(struct shm_flow_set * set,
return;
}
- *(fqueue_ptr(set, set->mtable[port_id]) +
- (set->heads[set->mtable[port_id]])++) = port_id;
+ (fqueue_ptr(set, set->mtable[port_id]) +
+ (set->heads[set->mtable[port_id]]))->port_id = port_id;
+ (fqueue_ptr(set, set->mtable[port_id]) +
+ (set->heads[set->mtable[port_id]])++)->event = event;
pthread_cond_signal(&set->conds[set->mtable[port_id]]);
@@ -380,7 +386,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
if (ret != -ETIMEDOUT) {
memcpy(fqueue,
fqueue_ptr(set, idx),
- set->heads[idx] * sizeof(int));
+ set->heads[idx] * sizeof(struct portevent));
ret = set->heads[idx];
set->heads[idx] = 0;
}