summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authordimitri staessens <[email protected]>2016-08-24 00:47:06 +0200
committerdimitri staessens <[email protected]>2016-08-24 00:47:06 +0200
commitf32db895e62152e1518fc5e184d19743d35e6cad (patch)
treeb32afcf153f4e6ac5880b5a986c3d6526d6d70c2 /src/ipcpd/normal
parentd8744f9b77a98183ca4ecc6e0be5ce9a6e92ede0 (diff)
parentbb0a01dbb52cb0a02ce684b6fef3ec85e6c1918a (diff)
downloadouroboros-f32db895e62152e1518fc5e184d19743d35e6cad.tar.gz
ouroboros-f32db895e62152e1518fc5e184d19743d35e6cad.zip
Merged in sandervrijders/ouroboros/be-normal-flow-alloc (pull request #219)
ipcpd: normal: Add initial steps for N+1 flow allocation
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/CMakeLists.txt10
-rw-r--r--src/ipcpd/normal/flow_alloc.proto13
-rw-r--r--src/ipcpd/normal/fmgr.c373
-rw-r--r--src/ipcpd/normal/fmgr.h12
-rw-r--r--src/ipcpd/normal/frct.c165
-rw-r--r--src/ipcpd/normal/frct.h20
-rw-r--r--src/ipcpd/normal/rmt.c52
-rw-r--r--src/ipcpd/normal/rmt.h34
8 files changed, 646 insertions, 33 deletions
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 1e291d30..555260f1 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -12,26 +12,30 @@ include_directories(${CURRENT_BINARY_PARENT_DIR})
include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_BINARY_DIR}/include)
-SET(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
+set(IPCP_NORMAL_TARGET ipcpd-normal CACHE STRING "IPCP_NORMAL_TARGET")
protobuf_generate_c(STATIC_INFO_SRCS STATIC_INFO_HDRS
static_info.proto)
+protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS
+ flow_alloc.proto)
+
set(SOURCE_FILES
# Add source files here
main.c
fmgr.c
frct.c
ribmgr.c
+ rmt.c
)
add_executable (ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES}
- ${STATIC_INFO_SRCS})
+ ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS})
target_link_libraries (ipcpd-normal LINK_PUBLIC ouroboros)
include(MacroAddCompileFlags)
if (CMAKE_BUILD_TYPE MATCHES Debug)
- MACRO_ADD_COMPILE_FLAGS(ipcpd-normal -DCONFIG_OUROBOROS_DEBUG)
+ macro_add_compile_flags(ipcpd-normal -DCONFIG_OUROBOROS_DEBUG)
endif (CMAKE_BUILD_TYPE MATCHES Debug)
install(TARGETS ipcpd-normal RUNTIME DESTINATION sbin)
diff --git a/src/ipcpd/normal/flow_alloc.proto b/src/ipcpd/normal/flow_alloc.proto
new file mode 100644
index 00000000..b1ca7cb8
--- /dev/null
+++ b/src/ipcpd/normal/flow_alloc.proto
@@ -0,0 +1,13 @@
+enum flow_alloc_code {
+ FLOW_REQ = 1;
+ FLOW_REPLY = 2;
+ FLOW_DEALLOC = 3;
+};
+
+message flow_alloc_msg {
+ required flow_alloc_code code = 1;
+ optional string dst_name = 2;
+ optional string src_ae_name = 3;
+ optional uint32 qos_cube = 4;
+ optional sint32 response = 5;
+};
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 70afff37..58ae1dc8 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -26,6 +26,7 @@
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/list.h>
+#include <ouroboros/ipcp.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -37,8 +38,19 @@
#include "frct.h"
#include "ipcp.h"
+#include "flow_alloc.pb-c.h"
+typedef FlowAllocMsg flow_alloc_msg_t;
+
extern struct ipcp * _ipcp;
+struct n_flow {
+ struct flow flow;
+ struct frct_i * frct_i;
+ enum qos_cube qos;
+
+ struct list_head next;
+};
+
struct n_1_flow {
int fd;
char * ae_name;
@@ -51,6 +63,9 @@ struct fmgr {
struct list_head n_1_flows;
pthread_mutex_t n_1_flows_lock;
+ struct list_head n_flows;
+ /* FIXME: Make this a read/write lock */
+ pthread_mutex_t n_flows_lock;
} * fmgr = NULL;
static int add_n_1_fd(int fd,
@@ -68,6 +83,8 @@ static int add_n_1_fd(int fd,
tmp->fd = fd;
tmp->ae_name = ae_name;
+ INIT_LIST_HEAD(&tmp->next);
+
pthread_mutex_lock(&fmgr->n_1_flows_lock);
list_add(&tmp->next, &fmgr->n_1_flows);
pthread_mutex_unlock(&fmgr->n_1_flows_lock);
@@ -125,7 +142,8 @@ static void * fmgr_listen(void * o)
}
if (strcmp(ae_name, DT_AE) == 0) {
- if (frct_dt_flow(fd)) {
+ /* FIXME: Pass correct QoS cube */
+ if (frct_dt_flow(fd, 0)) {
LOG_ERR("Failed to hand fd to FRCT.");
flow_dealloc(fd);
continue;
@@ -149,13 +167,12 @@ int fmgr_init()
return -1;
INIT_LIST_HEAD(&fmgr->n_1_flows);
+ INIT_LIST_HEAD(&fmgr->n_flows);
pthread_mutex_init(&fmgr->n_1_flows_lock, NULL);
+ pthread_mutex_init(&fmgr->n_flows_lock, NULL);
- pthread_create(&fmgr->listen_thread,
- NULL,
- fmgr_listen,
- NULL);
+ pthread_create(&fmgr->listen_thread, NULL, fmgr_listen, NULL);
return 0;
}
@@ -187,11 +204,17 @@ int fmgr_mgmt_flow(char * dst_name)
{
int fd;
int result;
+ char * ae_name;
+
+ ae_name = strdup(MGMT_AE);
+ if (ae_name == NULL)
+ return -1;
/* FIXME: Request retransmission. */
fd = flow_alloc(dst_name, MGMT_AE, NULL);
if (fd < 0) {
LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
return -1;
}
@@ -199,29 +222,98 @@ int fmgr_mgmt_flow(char * dst_name)
if (result < 0) {
LOG_ERR("Result of flow allocation to %s is %d",
dst_name, result);
+ free(ae_name);
return -1;
}
if (ribmgr_add_flow(fd)) {
LOG_ERR("Failed to hand file descriptor to RIB manager");
flow_dealloc(fd);
+ free(ae_name);
+ return -1;
+ }
+
+ if (add_n_1_fd(fd, ae_name)) {
+ LOG_ERR("Failed to add file descriptor to list.");
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+int fmgr_dt_flow(char * dst_name,
+ enum qos_cube qos)
+{
+ int fd;
+ int result;
+ char * ae_name;
+
+ ae_name = strdup(DT_AE);
+ if (ae_name == NULL)
+ return -1;
+
+ /* FIXME: Map qos cube on correct QoS. */
+ fd = flow_alloc(dst_name, DT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s", dst_name);
+ free(ae_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d",
+ dst_name, result);
+ free(ae_name);
+ return -1;
+ }
+
+ if (frct_dt_flow(fd, qos)) {
+ LOG_ERR("Failed to hand file descriptor to FRCT");
+ flow_dealloc(fd);
+ free(ae_name);
return -1;
}
- if (add_n_1_fd(fd, strdup(MGMT_AE))) {
+ if (add_n_1_fd(fd, ae_name)) {
LOG_ERR("Failed to add file descriptor to list.");
flow_dealloc(fd);
+ free(ae_name);
return -1;
}
return 0;
}
-int fmgr_dt_flow(char * dst_name)
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_port_id(int port_id)
{
- LOG_MISSING;
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->flow.port_id == port_id)
+ return e;
+ }
+
+ return NULL;
+}
+
+/* Call under n_flows lock */
+static struct n_flow * get_n_flow_by_frct_i(struct frct_i * frct_i)
+{
+ struct list_head * pos = NULL;
+
+ list_for_each(pos, &fmgr->n_flows) {
+ struct n_flow * e =
+ list_entry(pos, struct n_flow, next);
+ if (e->frct_i == frct_i)
+ return e;
+ }
- return -1;
+ return NULL;
}
int fmgr_flow_alloc(pid_t n_api,
@@ -230,23 +322,274 @@ int fmgr_flow_alloc(pid_t n_api,
char * src_ae_name,
enum qos_cube qos)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ struct frct_i * frct_i;
+ uint32_t address = 0;
+ buffer_t buf;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL)
+ return -1;
+
+ /* FIXME: Obtain correct address here from DIF NSM */
- return -1;
+ msg.code = FLOW_ALLOC_CODE__FLOW_REQ;
+ msg.dst_name = dst_ap_name;
+ msg.src_ae_name = src_ae_name;
+ msg.qos_cube = qos;
+ msg.has_qos_cube = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ free(flow);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ free(flow);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ frct_i = frct_i_create(address, &buf, qos);
+ if (frct_i == NULL) {
+ free(buf.data);
+ free(flow);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ free(buf.data);
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ return -1;
+ }
+
+ flow->flow.api = n_api;
+ flow->flow.port_id = port_id;
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = qos;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return 0;
+}
+
+/* Call under n_flows lock */
+static int n_flow_dealloc(int port_id)
+{
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ buffer_t buf;
+ int ret;
+
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL)
+ return -1;
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0)
+ return -1;
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL)
+ return -1;
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ if (flow->flow.rb != NULL)
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+
+ free(flow);
+ free(buf.data);
+
+ return ret;
}
int fmgr_flow_alloc_resp(pid_t n_api,
int port_id,
int response)
{
- LOG_MISSING;
+ struct n_flow * flow;
+ flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;
+ int ret;
+ buffer_t buf;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
- return -1;
+ flow = get_n_flow_by_port_id(port_id);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ if (flow->flow.state != FLOW_PENDING) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Flow is not pending.");
+ return -1;
+ }
+
+ msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;
+ msg.response = response;
+ msg.has_response = true;
+
+ buf.len = flow_alloc_msg__get_packed_size(&msg);
+ if (buf.len == 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ buf.data = malloc(buf.len);
+ if (buf.data == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return -1;
+ }
+
+ flow_alloc_msg__pack(&msg, buf.data);
+
+ if (response < 0) {
+ ret = frct_i_destroy(flow->frct_i, &buf);
+ free(buf.data);
+ list_del(&flow->next);
+ free(flow);
+ } else {
+ frct_i_accept(flow->frct_i, &buf);
+ flow->flow.state = FLOW_ALLOCATED;
+ flow->flow.api = n_api;
+
+ flow->flow.rb = shm_ap_rbuff_open(n_api);
+ if (flow->flow.rb == NULL) {
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ return ret;
+ }
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
}
int fmgr_flow_dealloc(int port_id)
{
- LOG_MISSING;
+ int ret;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+ ret = n_flow_dealloc(port_id);
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ return ret;
+}
+
+int fmgr_flow_alloc_msg(struct frct_i * frct_i,
+ buffer_t * buf)
+{
+ struct n_flow * flow;
+ int ret = 0;
+ int port_id;
+ flow_alloc_msg_t * msg;
+
+ pthread_mutex_lock(&fmgr->n_flows_lock);
+
+ /* Depending on what is in the message call the function in ipcp.h */
+
+ msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);
+ if (msg == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ LOG_ERR("Failed to unpack flow alloc message");
+ return -1;
+ }
+
+ switch (msg->code) {
+ case FLOW_ALLOC_CODE__FLOW_REQ:
+
+ flow = malloc(sizeof(*flow));
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ flow->flow.state = FLOW_PENDING;
+ flow->frct_i = frct_i;
+ flow->qos = msg->qos_cube;
+ flow->flow.rb = NULL;
+ flow->flow.api = 0;
+
+ port_id = ipcp_flow_req_arr(getpid(),
+ msg->dst_name,
+ msg->src_ae_name);
+ if (port_id < 0) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ free(flow);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("Failed to get port-id from IRMd.");
+ return -1;
+ }
+
+ flow->flow.port_id = port_id;
+
+ INIT_LIST_HEAD(&flow->next);
+
+ list_add(&flow->next, &fmgr->n_flows);
+ break;
+ case FLOW_ALLOC_CODE__FLOW_REPLY:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_alloc_reply(getpid(),
+ flow->flow.port_id,
+ msg->response);
+
+ if (msg->response < 0) {
+ shm_ap_rbuff_close(flow->flow.rb);
+ list_del(&flow->next);
+ free(flow);
+ }
+
+ break;
+ case FLOW_ALLOC_CODE__FLOW_DEALLOC:
+ flow = get_n_flow_by_frct_i(frct_i);
+ if (flow == NULL) {
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+ flow_alloc_msg__free_unpacked(msg, NULL);
+ LOG_ERR("No such flow in flow manager.");
+ return -1;
+ }
+
+ ret = ipcp_flow_dealloc(0, flow->flow.port_id);
+ break;
+ default:
+ LOG_ERR("Got an unknown flow allocation message.");
+ ret = -1;
+ break;
+ }
+
+ pthread_mutex_unlock(&fmgr->n_flows_lock);
+
+ flow_alloc_msg__free_unpacked(msg, NULL);
- return -1;
+ return ret;
}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index dc88bbdf..342410ca 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -29,6 +29,8 @@
#include <stdint.h>
#include <sys/types.h>
+#include "frct.h"
+
#define MGMT_AE "Management"
#define DT_AE "Data transfer"
@@ -37,9 +39,10 @@ int fmgr_fini();
/* N-flow ops */
int fmgr_mgmt_flow(char * dst_name);
-int fmgr_dt_flow(char * dst_name);
+int fmgr_dt_flow(char * dst_name,
+ enum qos_cube qos);
-/* N+1-flow ops */
+/* N+1-flow ops, local */
int fmgr_flow_alloc(pid_t n_api,
int port_id,
char * dst_ap_name,
@@ -52,4 +55,9 @@ int fmgr_flow_alloc_resp(pid_t n_api,
int fmgr_flow_dealloc(int port_id);
+/* N+1-flow ops, remote */
+int fmgr_flow_alloc_msg(struct frct_i * frct_i,
+ buffer_t * buf);
+
+
#endif
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index 2de9422d..7c2eba61 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -22,20 +22,67 @@
#define OUROBOROS_PREFIX "flow-rtx-control"
-#include <stdlib.h>
+#define IDS_SIZE 2048
+#include <ouroboros/config.h>
#include <ouroboros/logs.h>
+#include <ouroboros/bitmap.h>
+#include <ouroboros/list.h>
+
+#include <stdlib.h>
+#include <stdbool.h>
+#include <pthread.h>
#include "frct.h"
+
+enum conn_state {
+ CONN_PENDING = 0,
+ CONN_ESTABLISHED
+};
+
struct frct_i {
+ uint32_t cep_id;
+ uint32_t r_address;
+ uint32_t r_cep_id;
+
+ enum conn_state state;
+ struct list_head next;
};
struct frct {
struct dt_const * dtc;
uint32_t address;
+
+ struct list_head instances;
+ pthread_mutex_t instances_lock;
+
+ struct bmp * cep_ids;
+ pthread_mutex_t cep_ids_lock;
} * frct = NULL;
+static int next_cep_id()
+{
+ int ret;
+
+ pthread_mutex_lock(&frct->cep_ids_lock);
+ ret = bmp_allocate(frct->cep_ids);
+ pthread_mutex_unlock(&frct->cep_ids_lock);
+
+ return ret;
+}
+
+static int release_cep_id(int id)
+{
+ int ret;
+
+ pthread_mutex_lock(&frct->cep_ids_lock);
+ ret = bmp_release(frct->cep_ids, id);
+ pthread_mutex_unlock(&frct->cep_ids_lock);
+
+ return ret;
+}
+
int frct_init(struct dt_const * dtc, uint32_t address)
{
if (dtc == NULL)
@@ -48,35 +95,133 @@ int frct_init(struct dt_const * dtc, uint32_t address)
frct->dtc = dtc;
frct->address = address;
+ INIT_LIST_HEAD(&frct->instances);
+
+ if (pthread_mutex_init(&frct->cep_ids_lock, NULL)) {
+ free(frct);
+ return -1;
+ }
+
+ if (pthread_mutex_init(&frct->instances_lock, NULL)) {
+ free(frct);
+ return -1;
+ }
+
+ frct->cep_ids = bmp_create(IDS_SIZE, 0);
+ if (frct->cep_ids == NULL) {
+ free(frct);
+ return -1;
+ }
+
return 0;
}
int frct_fini()
{
- if (frct != NULL)
- free(frct);
+ pthread_mutex_lock(&frct->cep_ids_lock);
+ bmp_destroy(frct->cep_ids);
+ pthread_mutex_unlock(&frct->cep_ids_lock);
+ free(frct);
return 0;
}
-struct frct_i * frct_i_create(int port_id,
- enum qos_cube cube)
+int frct_dt_flow(int fd,
+ enum qos_cube qos)
{
LOG_MISSING;
- return NULL;
+ return -1;
}
-int frct_i_destroy(struct frct_i * instance)
+int frct_rmt_post()
{
LOG_MISSING;
return -1;
}
-int frct_dt_flow(int fd)
+/* Call under instances lock */
+static void destroy_frct_i(struct frct_i * instance)
{
- LOG_MISSING;
+ release_cep_id(instance->cep_id);
+ list_del(&instance->next);
+ free(instance);
+}
- return -1;
+struct frct_i * frct_i_create(uint32_t address,
+ buffer_t * buf,
+ enum qos_cube cube)
+{
+ struct frct_i * instance;
+
+ if (buf == NULL ||
+ buf->data == NULL)
+ return NULL;
+
+ instance = malloc(sizeof(*instance));
+ if (instance == NULL)
+ return NULL;
+
+ pthread_mutex_lock(&frct->instances_lock);
+
+ instance->r_address = address;
+ instance->cep_id = next_cep_id();
+ instance->state = CONN_PENDING;
+
+ INIT_LIST_HEAD(&instance->next);
+ list_add(&instance->next, &frct->instances);
+
+ pthread_mutex_unlock(&frct->instances_lock);
+
+ /* FIXME: Pack into FRCT header and hand SDU to RMT */
+
+ return instance;
+}
+
+int frct_i_accept(struct frct_i * instance,
+ buffer_t * buf)
+{
+ if (instance == NULL || buf == NULL || buf->data == NULL)
+ return -1;
+
+ pthread_mutex_lock(&frct->instances_lock);
+ if (instance->state != CONN_PENDING) {
+ pthread_mutex_unlock(&frct->instances_lock);
+ return -1;
+ }
+
+ instance->state = CONN_ESTABLISHED;
+ instance->cep_id = next_cep_id();
+
+ pthread_mutex_unlock(&frct->instances_lock);
+
+ /* FIXME: Pack into FRCT header and hand SDU to RMT */
+
+ return 0;
+}
+
+int frct_i_destroy(struct frct_i * instance,
+ buffer_t * buf)
+{
+ if (instance == NULL)
+ return -1;
+
+ pthread_mutex_lock(&frct->instances_lock);
+
+ if (!(instance->state == CONN_PENDING ||
+ instance->state == CONN_ESTABLISHED)) {
+ pthread_mutex_unlock(&frct->instances_lock);
+ return -1;
+ }
+
+ destroy_frct_i(instance);
+ pthread_mutex_unlock(&frct->instances_lock);
+
+ if (buf != NULL && buf->data != NULL) {
+
+ /* FIXME: Pack into FRCT header and hand SDU to RMT */
+ }
+
+ return 0;
}
diff --git a/src/ipcpd/normal/frct.h b/src/ipcpd/normal/frct.h
index 2e965d38..91b2dfc7 100644
--- a/src/ipcpd/normal/frct.h
+++ b/src/ipcpd/normal/frct.h
@@ -24,6 +24,7 @@
#define OUROBOROS_IPCP_FRCT_H
#include <ouroboros/shared.h>
+#include <ouroboros/common.h>
#include "dt_const.h"
@@ -33,10 +34,23 @@ int frct_init(struct dt_const * dtc,
uint32_t address);
int frct_fini();
-struct frct_i * frct_i_create(int port_id,
+int frct_dt_flow(int fd,
+ enum qos_cube qos);
+/*
+ * FIXME: Will take the index in the DU map,
+ * possibly cep-ids and address
+ */
+int frct_rmt_post();
+
+struct frct_i * frct_i_create(uint32_t address,
+ buffer_t * buf,
enum qos_cube cube);
-int frct_i_destroy(struct frct_i * instance);
+/* FIXME: Hand QoS cube here too? We received it in the flow alloc message. */
+int frct_i_accept(struct frct_i * instance,
+ buffer_t * buf);
+int frct_i_destroy(struct frct_i * instance,
+ buffer_t * buf);
-int frct_dt_flow(int fd);
+/* FIXME: Add read/write ops for frct instances */
#endif
diff --git a/src/ipcpd/normal/rmt.c b/src/ipcpd/normal/rmt.c
new file mode 100644
index 00000000..ee92c3e3
--- /dev/null
+++ b/src/ipcpd/normal/rmt.c
@@ -0,0 +1,52 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The Relaying and Multiplexing task
+ *
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define OUROBOROS_PREFIX "flow-manager"
+
+#include <ouroboros/config.h>
+#include <ouroboros/logs.h>
+
+#include "rmt.h"
+
+struct rmt {
+};
+
+int rmt_init(struct dt_const * dtc)
+{
+ LOG_MISSING;
+
+ return -1;
+}
+
+int rmt_fini()
+{
+ LOG_MISSING;
+
+ return -1;
+}
+
+int rmt_frct_post()
+{
+ LOG_MISSING;
+
+ return -1;
+}
diff --git a/src/ipcpd/normal/rmt.h b/src/ipcpd/normal/rmt.h
new file mode 100644
index 00000000..cdd86a0b
--- /dev/null
+++ b/src/ipcpd/normal/rmt.h
@@ -0,0 +1,34 @@
+/*
+ * Ouroboros - Copyright (C) 2016
+ *
+ * The Relaying and Multiplexing task
+ *
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCP_RMT_H
+#define OUROBOROS_IPCP_RMT_H
+
+#include "dt_const.h"
+
+int rmt_init(struct dt_const * dtc);
+int rmt_fini();
+
+/* FIXME: Will take index in DU map */
+int rmt_frct_post();
+
+#endif