summaryrefslogtreecommitdiff
path: root/src/ipcpd/normal
diff options
context:
space:
mode:
authorSander Vrijders <[email protected]>2017-01-04 11:35:18 +0100
committerSander Vrijders <[email protected]>2017-01-04 11:35:18 +0100
commita241f53af601a8b9462b600a20381228f0f8e2e0 (patch)
tree3c8f23666da58259f67bc253314fa1e6952111e6 /src/ipcpd/normal
parent004b586fd6b381fd1292aa276342163123be560a (diff)
parentf3fbf2c6093b293f995c4d784509577695e052b1 (diff)
downloadouroboros-a241f53af601a8b9462b600a20381228f0f8e2e0.tar.gz
ouroboros-a241f53af601a8b9462b600a20381228f0f8e2e0.zip
Merged in dstaesse/ouroboros/be-normal-refactor (pull request #331)
ipcpd: Refactor normal ipcp, initial commit
Diffstat (limited to 'src/ipcpd/normal')
-rw-r--r--src/ipcpd/normal/ae.h29
-rw-r--r--src/ipcpd/normal/dir.c49
-rw-r--r--src/ipcpd/normal/fmgr.c102
-rw-r--r--src/ipcpd/normal/fmgr.h12
-rw-r--r--src/ipcpd/normal/frct.c123
-rw-r--r--src/ipcpd/normal/main.c181
-rw-r--r--src/ipcpd/normal/ribmgr.c277
-rw-r--r--src/ipcpd/normal/ribmgr.h6
-rw-r--r--src/ipcpd/normal/ro.h6
9 files changed, 453 insertions, 332 deletions
diff --git a/src/ipcpd/normal/ae.h b/src/ipcpd/normal/ae.h
new file mode 100644
index 00000000..229ff4aa
--- /dev/null
+++ b/src/ipcpd/normal/ae.h
@@ -0,0 +1,29 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2017
+ *
+ * Application Entities for the normal IPC process
+ *
+ * Dimitri Staessens <[email protected]>
+ * Sander Vrijders <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef OUROBOROS_IPCPD_NORMAL_AE_H
+#define OUROBOROS_IPCPD_NORMAL_AE_H
+
+#define MGMT_AE "Management"
+#define DT_AE "Data transfer"
+
+#endif /* OUROBOROS_IPCPD_NORMAL_AE_H */
diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c
index 47fb1f6e..c5bb03dd 100644
--- a/src/ipcpd/normal/dir.c
+++ b/src/ipcpd/normal/dir.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* DIF directory
*
@@ -26,7 +26,6 @@
#include <ouroboros/errno.h>
#include "dir.h"
-#include "ipcp.h"
#include "ro.h"
#include "pathname.h"
#include "ribmgr.h"
@@ -95,41 +94,27 @@ int dir_name_reg(char * name)
char * path;
uint64_t * addr;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("IPCP is not in RUNNING state.");
- return -1;
- }
-
ro_attr_init(&attr);
attr.enrol_sync = true;
attr.recv_set = ALL_MEMBERS;
path = create_path(name);
- if (path == NULL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ if (path == NULL)
return -ENOMEM;
- }
addr = malloc(sizeof(*addr));
if (addr == NULL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
pathname_destroy(path);
return -ENOMEM;
}
*addr = ribmgr_address();
if (ro_create(path, &attr, (uint8_t *) addr, sizeof(*addr))) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
pathname_destroy(path);
LOG_ERR("Failed to create RIB object.");
return -1;
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
LOG_DBG("Registered %s.", name);
pathname_destroy(path);
@@ -140,29 +125,16 @@ int dir_name_unreg(char * name)
{
char * path;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("IPCP is not in RUNNING state.");
- return -1;
- }
-
path = create_path(name);
- if (path == NULL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
+ if (path == NULL)
return -ENOMEM;
- }
if (ro_delete(path)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
pathname_destroy(path);
LOG_ERR("No such RIB object exists.");
return -1;
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
pathname_destroy(path);
return 0;
@@ -176,18 +148,9 @@ int dir_name_query(char * name)
uint64_t addr;
struct dt_const * dtc;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- return -1;
- }
-
path = create_path(name);
- if (path == NULL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- return -1;
- }
+ if (path == NULL)
+ return -ENOMEM;
if (ro_exists(path)) {
if (ro_read(path, &ro_data) < 0) {
@@ -206,8 +169,6 @@ int dir_name_query(char * name)
ret = (addr == ribmgr_address()) ? -1 : 0;
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
pathname_destroy(path);
return ret;
diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c
index 4b24d5a1..64d9a5a9 100644
--- a/src/ipcpd/normal/fmgr.c
+++ b/src/ipcpd/normal/fmgr.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* Flow manager of the IPC Process
*
@@ -60,7 +60,6 @@ struct {
cep_id_t np1_fd_to_cep_id[AP_MAX_FLOWS];
int np1_cep_id_to_fd[IPCPD_MAX_CONNS];
- pthread_t nm1_flow_acceptor;
pthread_t nm1_sdu_reader;
pthread_t np1_sdu_reader;
@@ -68,65 +67,6 @@ struct {
int fd;
} fmgr;
-static void * fmgr_nm1_acceptor(void * o)
-{
- int fd;
- char * ae_name;
- qoscube_t cube;
- qosspec_t qs;
-
- (void) o;
-
- while (true) {
- if (ipcp_get_state() == IPCP_SHUTDOWN)
- return 0;
-
- fd = flow_accept(&ae_name, &qs);
- if (fd < 0) {
- LOG_WARN("Flow accept failed.");
- continue;
- }
-
- if (!(strcmp(ae_name, MGMT_AE) == 0 ||
- strcmp(ae_name, DT_AE) == 0)) {
- if (flow_alloc_resp(fd, -1))
- LOG_WARN("Failed to reply to flow allocation.");
- flow_dealloc(fd);
- continue;
- }
-
- if (flow_alloc_resp(fd, 0)) {
- LOG_WARN("Failed to reply to flow allocation.");
- flow_dealloc(fd);
- continue;
- }
-
- LOG_DBG("Accepted new flow allocation request for AE %s.",
- ae_name);
-
- if (strcmp(ae_name, MGMT_AE) == 0) {
- if (ribmgr_add_flow(fd)) {
- LOG_WARN("Failed to hand fd to RIB.");
- flow_dealloc(fd);
- continue;
- }
- } else {
- ipcp_flow_get_qoscube(fd, &cube);
- if (flow_set_add(fmgr.nm1_set[cube], fd)) {
- LOG_WARN("Failed to add fd.");
- flow_dealloc(fd);
- continue;
- }
- /* FIXME: Temporary, until we have a PFF */
- fmgr.fd = fd;
- }
-
- free(ae_name);
- }
-
- return (void *) 0;
-}
-
static void * fmgr_np1_sdu_reader(void * o)
{
struct shm_du_buff * sdb;
@@ -313,7 +253,6 @@ int fmgr_init()
}
}
- pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL);
pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL);
pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL);
@@ -325,11 +264,9 @@ int fmgr_fini()
int i;
int j;
- pthread_cancel(fmgr.nm1_flow_acceptor);
pthread_cancel(fmgr.np1_sdu_reader);
pthread_cancel(fmgr.nm1_sdu_reader);
- pthread_join(fmgr.nm1_flow_acceptor, NULL);
pthread_join(fmgr.np1_sdu_reader, NULL);
pthread_join(fmgr.nm1_sdu_reader, NULL);
@@ -360,16 +297,6 @@ int fmgr_np1_alloc(int fd,
uint8_t * ro_data;
uint64_t addr;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("IPCP is not enrolled yet.");
- return -1; /* -ENOTINIT */
- }
-
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
path = pathname_create(RO_DIR);
if (path == NULL)
return -1;
@@ -605,30 +532,21 @@ int fmgr_np1_post_sdu(cep_id_t cep_id, struct shm_du_buff * sdb)
return 0;
}
-int fmgr_nm1_mgmt_flow(char * dst_name)
+/* FIXME: do this in a topologymanager instance */
+int fmgr_nm1_add_flow(int fd)
{
- int fd;
- int result;
+ qoscube_t qos;
- /* FIXME: Request retransmission. */
- fd = flow_alloc(dst_name, MGMT_AE, NULL);
- if (fd < 0) {
- LOG_ERR("Failed to allocate flow to %s.", dst_name);
+ if (flow_alloc_resp(fd, 0) < 0) {
+ LOG_ERR("Could not respond to new flow.");
return -1;
}
- result = flow_alloc_res(fd);
- if (result < 0) {
- LOG_ERR("Result of flow allocation to %s is %d.",
- dst_name, result);
- return -1;
- }
+ ipcp_flow_get_qoscube(fd, &qos);
+ flow_set_add(fmgr.nm1_set[qos], fd);
- if (ribmgr_add_flow(fd)) {
- LOG_ERR("Failed to hand file descriptor to RIB manager.");
- flow_dealloc(fd);
- return -1;
- }
+ /* FIXME: Temporary, until we have a PFF */
+ fmgr.fd = fd;
return 0;
}
diff --git a/src/ipcpd/normal/fmgr.h b/src/ipcpd/normal/fmgr.h
index e17f3f55..85731081 100644
--- a/src/ipcpd/normal/fmgr.h
+++ b/src/ipcpd/normal/fmgr.h
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* Flow manager of the IPC Process
*
@@ -24,15 +24,9 @@
#include <ouroboros/shared.h>
-#include <unistd.h>
-#include <stdint.h>
-#include <sys/types.h>
-
+#include "ae.h"
#include "frct.h"
-#define MGMT_AE "Management"
-#define DT_AE "Data transfer"
-
int fmgr_init(void);
int fmgr_fini(void);
@@ -53,7 +47,7 @@ int fmgr_np1_post_buf(cep_id_t id,
int fmgr_np1_post_sdu(cep_id_t id,
struct shm_du_buff * sdb);
-int fmgr_nm1_mgmt_flow(char * dst_name);
+int fmgr_nm1_add_flow(int fd);
int fmgr_nm1_dt_flow(char * dst_name,
qoscube_t qos);
diff --git a/src/ipcpd/normal/frct.c b/src/ipcpd/normal/frct.c
index 33bd044b..6cd68f18 100644
--- a/src/ipcpd/normal/frct.c
+++ b/src/ipcpd/normal/frct.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* The Flow and Retransmission control component
*
@@ -40,21 +40,21 @@ enum conn_state {
};
struct frct_i {
- uint32_t cep_id;
- uint64_t r_address;
- uint32_t r_cep_id;
+ uint32_t cep_id;
+ uint64_t r_address;
+ uint32_t r_cep_id;
qoscube_t cube;
- uint64_t seqno;
+ uint64_t seqno;
enum conn_state state;
};
struct {
- pthread_mutex_t instances_lock;
struct frct_i ** instances;
+ pthread_mutex_t instances_lock;
- struct bmp * cep_ids;
- pthread_mutex_t cep_ids_lock;
+ struct bmp * cep_ids;
+ pthread_mutex_t cep_ids_lock;
} frct;
static cep_id_t next_cep_id(void)
@@ -62,9 +62,11 @@ static cep_id_t next_cep_id(void)
cep_id_t ret;
pthread_mutex_lock(&frct.cep_ids_lock);
+
ret = bmp_allocate(frct.cep_ids);
if (!bmp_is_id_valid(frct.cep_ids, ret))
ret = INVALID_CEP_ID;
+
pthread_mutex_unlock(&frct.cep_ids_lock);
return ret;
@@ -75,51 +77,48 @@ static int release_cep_id(cep_id_t id)
int ret;
pthread_mutex_lock(&frct.cep_ids_lock);
+
ret = bmp_release(frct.cep_ids, id);
+
pthread_mutex_unlock(&frct.cep_ids_lock);
return ret;
}
-int frct_init()
+static int init_cep_ids(void)
{
- int i;
-
if (pthread_mutex_init(&frct.cep_ids_lock, NULL))
return -1;
- if (pthread_mutex_init(&frct.instances_lock, NULL))
- return -1;
-
- frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS);
- if (frct.instances == NULL)
- return -1;
-
- for (i = 0; i < IRMD_MAX_FLOWS; i++)
- frct.instances[i] = NULL;
-
frct.cep_ids = bmp_create(IRMD_MAX_FLOWS, (INVALID_CEP_ID + 1));
if (frct.cep_ids == NULL) {
- free(frct.instances);
+ pthread_mutex_destroy(&frct.cep_ids_lock);
return -1;
}
return 0;
}
-int frct_fini()
+static int init_instances(void)
{
- pthread_mutex_lock(&frct.cep_ids_lock);
- bmp_destroy(frct.cep_ids);
- pthread_mutex_unlock(&frct.cep_ids_lock);
+ int i;
- free(frct.instances);
+ if (pthread_mutex_init(&frct.instances_lock, NULL))
+ return -1;
+
+ frct.instances = malloc(sizeof(*(frct.instances)) * IRMD_MAX_FLOWS);
+ if (frct.instances == NULL) {
+ pthread_mutex_destroy(&frct.instances_lock);
+ return -1;
+ }
+
+ for (i = 0; i < IRMD_MAX_FLOWS; i++)
+ frct.instances[i] = NULL;
return 0;
}
-static struct frct_i * create_frct_i(uint64_t address,
- cep_id_t r_cep_id)
+static struct frct_i * create_frct_i(uint64_t address, cep_id_t r_cep_id)
{
struct frct_i * instance;
cep_id_t id;
@@ -145,6 +144,60 @@ static struct frct_i * create_frct_i(uint64_t address,
return instance;
}
+static void destroy_frct_i(struct frct_i * instance)
+{
+ free(instance);
+}
+
+static void fini_cep_ids(void)
+{
+ pthread_mutex_lock(&frct.cep_ids_lock);
+
+ bmp_destroy(frct.cep_ids);
+
+ pthread_mutex_unlock(&frct.cep_ids_lock);
+
+ pthread_mutex_destroy(&frct.cep_ids_lock);
+}
+
+static void fini_instances(void)
+{
+ int i;
+
+ pthread_mutex_lock(&frct.instances_lock);
+
+ for (i = 0; i < IRMD_MAX_FLOWS; i++)
+ if (frct.instances[i] != NULL)
+ destroy_frct_i(frct.instances[i]);
+
+ pthread_mutex_unlock(&frct.instances_lock);
+
+ pthread_mutex_destroy(&frct.instances_lock);
+
+ free(frct.instances);
+}
+
+int frct_init()
+{
+ if (init_cep_ids())
+ return -1;
+
+ if (init_instances()) {
+ fini_cep_ids();
+ return -1;
+ }
+
+ return 0;
+}
+
+int frct_fini()
+{
+ fini_cep_ids();
+ fini_instances();
+
+ return 0;
+}
+
int frct_nm1_post_sdu(struct pci * pci,
struct shm_du_buff * sdb)
{
@@ -209,14 +262,6 @@ int frct_nm1_post_sdu(struct pci * pci,
return 0;
}
-/* Call under instances lock */
-static void destroy_frct_i(struct frct_i * instance)
-{
- release_cep_id(instance->cep_id);
- frct.instances[instance->cep_id] = NULL;
- free(instance);
-}
-
cep_id_t frct_i_create(uint64_t address,
buffer_t * buf,
qoscube_t cube)
@@ -328,7 +373,11 @@ int frct_i_destroy(cep_id_t id,
pci.seqno = 0;
pci.qos_id = instance->cube;
+ frct.instances[id] = NULL;
destroy_frct_i(instance);
+
+ release_cep_id(instance->cep_id);
+
pthread_mutex_unlock(&frct.instances_lock);
if (buf != NULL && buf->data != NULL)
diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c
index 34ba52da..f6a88f29 100644
--- a/src/ipcpd/normal/main.c
+++ b/src/ipcpd/normal/main.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* Normal IPC Process
*
@@ -59,7 +59,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(IPCP_SHUTDOWN);
+ if (ipcp_get_state() == IPCP_INIT)
+ ipcp_set_state(IPCP_NULL);
+
+ if (ipcp_get_state() == IPCP_OPERATIONAL)
+ ipcp_set_state(IPCP_SHUTDOWN);
pthread_rwlock_unlock(&ipcpi.state_lock);
}
@@ -68,12 +72,11 @@ void ipcp_sig_handler(int sig, siginfo_t * info, void * c)
}
}
-static int normal_ipcp_enroll(char * dif_name)
+static int normal_ipcp_enroll(char * dst_name)
{
- struct timespec timeout = {(ENROLL_TIMEOUT / 1000),
- (ENROLL_TIMEOUT % 1000) * MILLION};
+ int ret;
- pthread_rwlock_rdlock(&ipcpi.state_lock);
+ pthread_rwlock_wrlock(&ipcpi.state_lock);
if (ipcp_get_state() != IPCP_INIT) {
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -81,37 +84,62 @@ static int normal_ipcp_enroll(char * dif_name)
return -1; /* -ENOTINIT */
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
- if (fmgr_nm1_mgmt_flow(dif_name)) {
- LOG_ERR("Failed to establish management flow.");
+ if (ribmgr_init()) {
+ LOG_ERR("Failed to initialise RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- if (ribmgr_enrol()) {
- LOG_ERR("Failed to enrol IPCP.");
+ if (ribmgr_nm1_mgt_flow(dst_name)) {
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ LOG_ERR("Failed to establish management flow.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
return -1;
}
- if (ipcp_wait_state(IPCP_BOOTING, &timeout) == -ETIMEDOUT) {
- LOG_ERR("Enrollment timed out.");
+ ret = ribmgr_enrol();
+ if (ret < 0) {
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ if (ret == -ETIMEDOUT)
+ LOG_ERR("Enrollment timed out.");
+ else
+ LOG_ERR("Failed to enrol IPCP: %d.", ret);
return -1;
}
if (ribmgr_start_policies()) {
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start policies.");
return -1;
}
- pthread_rwlock_wrlock(&ipcpi.state_lock);
+ if (fmgr_init()) {
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Failed to start flow manager.");
+ return -1;
+ }
+
+ if (frct_init()) {
+ if (fmgr_fini())
+ LOG_WARN("Failed to finalize flow manager.");
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Failed to initialize FRCT.");
+ return -1;
+ }
+
ipcp_set_state(IPCP_OPERATIONAL);
+
pthread_rwlock_unlock(&ipcpi.state_lock);
/* FIXME: Remove once we obtain neighbors during enrollment */
- if (fmgr_nm1_dt_flow(dif_name, QOS_CUBE_BE)) {
+ if (fmgr_nm1_dt_flow(dst_name, QOS_CUBE_BE)) {
LOG_ERR("Failed to establish data transfer flow.");
return -1;
}
@@ -121,6 +149,11 @@ static int normal_ipcp_enroll(char * dif_name)
static int normal_ipcp_bootstrap(struct dif_config * conf)
{
+ if (conf == NULL || conf->type != THIS_TYPE) {
+ LOG_ERR("Bad DIF configuration.");
+ return -EINVAL;
+ }
+
pthread_rwlock_wrlock(&ipcpi.state_lock);
if (ipcp_get_state() != IPCP_INIT) {
@@ -129,26 +162,48 @@ static int normal_ipcp_bootstrap(struct dif_config * conf)
return -1; /* -ENOTINIT */
}
+ if (ribmgr_init()) {
+ LOG_ERR("Failed to initialise RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ return -1;
+ }
+
if (ribmgr_bootstrap(conf)) {
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to bootstrap RIB manager.");
return -1;
}
- ipcp_set_state(IPCP_BOOTING);
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
if (ribmgr_start_policies()) {
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- ipcp_set_state(IPCP_INIT);
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start policies.");
return -1;
}
- pthread_rwlock_wrlock(&ipcpi.state_lock);
+ if (fmgr_init()) {
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Failed to start flow manager.");
+ return -1;
+ }
+
+ if (frct_init()) {
+ if (fmgr_fini())
+ LOG_WARN("Failed to finalize flow manager.");
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_ERR("Failed to initialize FRCT.");
+ return -1;
+ }
ipcp_set_state(IPCP_OPERATIONAL);
+
ipcpi.data->dif_name = conf->dif_name;
pthread_rwlock_unlock(&ipcpi.state_lock);
@@ -169,10 +224,56 @@ static struct ipcp_ops normal_ops = {
.ipcp_flow_dealloc = fmgr_np1_dealloc
};
+static void * flow_acceptor(void * o)
+{
+ int fd;
+ char * ae_name;
+ qosspec_t qs;
+
+ (void) o;
+
+ while (true) {
+ pthread_rwlock_rdlock(&ipcpi.state_lock);
+
+ if (ipcp_get_state() != IPCP_OPERATIONAL) {
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+ LOG_INFO("Shutting down flow acceptor.");
+ return 0;
+ }
+
+ pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ fd = flow_accept(&ae_name, &qs);
+ if (fd < 0) {
+ LOG_WARN("Flow accept failed.");
+ continue;
+ }
+
+ LOG_DBG("New flow allocation request for AE %s.", ae_name);
+
+ if (strcmp(ae_name, MGMT_AE) == 0) {
+ ribmgr_add_nm1_flow(fd);
+ } else if (strcmp(ae_name, DT_AE) == 0) {
+ fmgr_nm1_add_flow(fd);
+ } else {
+ LOG_DBG("Flow allocation request for unknown AE %s.",
+ ae_name);
+ if (flow_alloc_resp(fd, -1))
+ LOG_WARN("Failed to reply to flow allocation.");
+ flow_dealloc(fd);
+ }
+
+ free(ae_name);
+ }
+
+ return (void *) 0;
+}
+
int main(int argc, char * argv[])
{
struct sigaction sig_act;
sigset_t sigset;
+ pthread_t acceptor;
if (ap_init(argv[0])) {
LOG_ERR("Failed to init AP");
@@ -215,16 +316,8 @@ int main(int argc, char * argv[])
pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
- if (ribmgr_init()) {
- fmgr_fini();
- ipcp_fini();
- close_logfile();
- exit(EXIT_FAILURE);
- }
-
if (ipcp_create_r(getpid())) {
LOG_ERR("Failed to notify IRMd we are initialized.");
- fmgr_fini();
ipcp_fini();
close_logfile();
exit(EXIT_FAILURE);
@@ -232,22 +325,24 @@ int main(int argc, char * argv[])
ipcp_wait_state(IPCP_OPERATIONAL, NULL);
- if (fmgr_init()) {
- ipcp_fini();
- close_logfile();
- exit(EXIT_FAILURE);
+ if (pthread_create(&acceptor, NULL, flow_acceptor, NULL)) {
+ LOG_ERR("Failed to create acceptor thread.");
+ ipcp_set_state(IPCP_SHUTDOWN);
}
ipcp_fini();
- if (fmgr_fini())
- LOG_ERR("Failed to finalize flow manager.");
+ if (ipcp_get_state() == IPCP_SHUTDOWN) {
+ pthread_cancel(acceptor);
+ pthread_join(acceptor, NULL);
- if (ribmgr_fini())
- LOG_ERR("Failed to finalize RIB manager.");
-
- if (frct_fini())
- LOG_ERR("Failed to finalize FRCT.");
+ if (frct_fini())
+ LOG_WARN("Failed to finalize FRCT.");
+ if (fmgr_fini())
+ LOG_WARN("Failed to finalize flow manager.");
+ if (ribmgr_fini())
+ LOG_WARN("Failed to finalize RIB manager.");
+ }
close_logfile();
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 3b4a5784..ab492e7f 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* RIB manager of the IPC Process
*
@@ -29,21 +29,21 @@
#include <ouroboros/ipcp-dev.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/errno.h>
-
-#include <stdlib.h>
-#include <pthread.h>
-#include <string.h>
-#include <errno.h>
+#include <ouroboros/dev.h>
#include "timerwheel.h"
#include "addr_auth.h"
#include "ribmgr.h"
#include "dt_const.h"
-#include "frct.h"
-#include "ipcp.h"
#include "ro.h"
#include "pathname.h"
#include "dir.h"
+#include "ae.h"
+
+#include <stdlib.h>
+#include <pthread.h>
+#include <string.h>
+#include <errno.h>
#include "static_info.pb-c.h"
typedef StaticInfoMsg static_info_msg_t;
@@ -73,7 +73,7 @@ struct rnode {
* as an index in a B-tree
*/
- /* If there are no children, this is a leaf */
+ /* If there are no children, this is a leaf. */
struct rnode * child;
struct rnode * sibling;
@@ -107,6 +107,14 @@ struct ro_id {
char * full_name;
};
+enum ribmgr_state {
+ RIBMGR_NULL,
+ RIBMGR_INIT,
+ RIBMGR_OPERATIONAL,
+ RIBMGR_SHUTDOWN
+};
+
+/* FIXME: Extract rib from ribmgr. */
struct {
struct rnode * root;
pthread_mutex_t ro_lock;
@@ -130,6 +138,10 @@ struct {
struct addr_auth * addr_auth;
enum pol_addr_auth addr_auth_type;
+
+ enum ribmgr_state state;
+ pthread_cond_t state_cond;
+ pthread_mutex_t state_lock;
} rib;
void ribmgr_ro_created(const char * name,
@@ -138,15 +150,11 @@ void ribmgr_ro_created(const char * name,
{
static_info_msg_t * stat_msg;
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_CONFIG &&
- strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) {
+ if (strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) {
LOG_DBG("Received static DIF information.");
stat_msg = static_info_msg__unpack(NULL, len, data);
if (stat_msg == NULL) {
- ipcp_set_state(IPCP_INIT);
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to unpack static info message.");
return;
}
@@ -161,17 +169,8 @@ void ribmgr_ro_created(const char * name,
rib.dtc.max_pdu_size = stat_msg->max_pdu_size;
rib.addr_auth_type = stat_msg->addr_auth_type;
- if (frct_init()) {
- ipcp_set_state(IPCP_INIT);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- static_info_msg__free_unpacked(stat_msg, NULL);
- LOG_ERR("Failed to init FRCT");
- return;
- }
-
static_info_msg__free_unpacked(stat_msg, NULL);
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
}
/* We only have a create operation for now. */
@@ -185,7 +184,6 @@ static struct rnode * find_rnode_by_name(const char * name)
{
char * str;
char * str1;
- char * saveptr;
char * token;
struct rnode * node;
@@ -195,8 +193,8 @@ static struct rnode * find_rnode_by_name(const char * name)
node = rib.root;
- for (str1 = str; ; str1 = NULL) {
- token = strtok_r(str1, PATH_DELIMITER, &saveptr);
+ for (str1 = str; node != NULL; str1 = NULL) {
+ token = strtok(str1, PATH_DELIMITER);
if (token == NULL)
break;
@@ -207,18 +205,13 @@ static struct rnode * find_rnode_by_name(const char * name)
break;
else
node = node->sibling;
-
- if (node == NULL) {
- free(str);
- return NULL;
- }
}
free(str);
return node;
}
-/* Call under RIB object lock */
+/* Call under RIB object lock. */
static int ro_msg_create(struct rnode * node,
ro_msg_t * msg)
{
@@ -334,7 +327,7 @@ static struct rnode * ribmgr_ro_create(const char * name,
node = rib.root;
- for (str1 = str; ; str1 = NULL) {
+ for (str1 = str; node != NULL; str1 = NULL) {
token = strtok_r(str1, PATH_DELIMITER, &saveptr);
if (token == NULL) {
LOG_ERR("RO already exists.");
@@ -356,9 +349,6 @@ static struct rnode * ribmgr_ro_create(const char * name,
sibling = true;
}
}
-
- if (node == NULL)
- break;
}
token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr);
@@ -554,9 +544,88 @@ int ribmgr_init()
return -1;
}
+ if (pthread_cond_init(&rib.state_cond, NULL)) {
+ LOG_ERR("Failed to init condvar.");
+ timerwheel_destroy(rib.wheel);
+ bmp_destroy(rib.sids);
+ pthread_rwlock_destroy(&rib.flows_lock);
+ pthread_mutex_destroy(&rib.ro_lock);
+ pthread_mutex_destroy(&rib.subs_lock);
+ pthread_mutex_destroy(&rib.ro_ids_lock);
+ free(rib.root);
+ return -1;
+ }
+
+ if (pthread_mutex_init(&rib.state_lock, NULL)) {
+ LOG_ERR("Failed to init mutex.");
+ pthread_cond_destroy(&rib.state_cond);
+ timerwheel_destroy(rib.wheel);
+ bmp_destroy(rib.sids);
+ pthread_rwlock_destroy(&rib.flows_lock);
+ pthread_mutex_destroy(&rib.ro_lock);
+ pthread_mutex_destroy(&rib.subs_lock);
+ pthread_mutex_destroy(&rib.ro_ids_lock);
+ free(rib.root);
+ return -1;
+ }
+
+ rib.state = RIBMGR_INIT;
+
return 0;
}
+static enum ribmgr_state ribmgr_get_state(void)
+{
+ enum ribmgr_state state;
+
+ pthread_mutex_lock(&rib.state_lock);
+
+ state = rib.state;
+
+ pthread_mutex_unlock(&rib.state_lock);
+
+ return state;
+}
+
+static void ribmgr_set_state(enum ribmgr_state state)
+{
+ pthread_mutex_lock(&rib.state_lock);
+
+ rib.state = state;
+
+ pthread_cond_broadcast(&rib.state_cond);
+
+ pthread_mutex_unlock(&rib.state_lock);
+}
+
+static int ribmgr_wait_state(enum ribmgr_state state,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret = 0;
+
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+
+ pthread_mutex_lock(&rib.state_lock);
+
+ while (rib.state != state
+ && rib.state != RIBMGR_SHUTDOWN
+ && rib.state != RIBMGR_NULL) {
+ if (timeout == NULL)
+ ret = -pthread_cond_wait(&rib.state_cond,
+ &rib.state_lock);
+ else
+ ret = -pthread_cond_timedwait(&rib.state_cond,
+ &rib.state_lock,
+ &abstime);
+ }
+
+ pthread_mutex_unlock(&rib.state_lock);
+
+ return ret;
+}
+
static void rtree_destroy(struct rnode * node)
{
if (node != NULL) {
@@ -574,7 +643,13 @@ int ribmgr_fini()
struct list_head * pos = NULL;
struct list_head * n = NULL;
+ pthread_mutex_lock(&rib.state_lock);
+ rib.state = RIBMGR_SHUTDOWN;
+ pthread_cond_broadcast(&rib.state_cond);
+ pthread_mutex_unlock(&rib.state_lock);
+
pthread_rwlock_wrlock(&rib.flows_lock);
+
list_for_each_safe(pos, n, &rib.flows) {
struct mgmt_flow * flow =
list_entry(pos, struct mgmt_flow, next);
@@ -583,6 +658,7 @@ int ribmgr_fini()
list_del(&flow->next);
free(flow);
}
+
pthread_rwlock_unlock(&rib.flows_lock);
ro_unsubscribe(rib.ribmgr_sid);
@@ -591,8 +667,10 @@ int ribmgr_fini()
addr_auth_destroy(rib.addr_auth);
pthread_mutex_lock(&rib.ro_lock);
+
rtree_destroy(rib.root->child);
free(rib.root);
+
pthread_mutex_unlock(&rib.ro_lock);
bmp_destroy(rib.sids);
@@ -603,6 +681,9 @@ int ribmgr_fini()
pthread_rwlock_destroy(&rib.flows_lock);
pthread_mutex_destroy(&rib.ro_ids_lock);
+ pthread_cond_destroy(&rib.state_cond);
+ pthread_mutex_destroy(&rib.state_lock);
+
return 0;
}
@@ -827,16 +908,7 @@ static int ribmgr_cdap_start(struct cdap * instance,
if (strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("New enrollment request.");
- pthread_rwlock_wrlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_OPERATIONAL) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("IPCP in wrong state.");
- return -1;
- }
-
if (cdap_reply_send(instance, key, 0, NULL, 0)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to enrollment request.");
return -1;
}
@@ -847,7 +919,6 @@ static int ribmgr_cdap_start(struct cdap * instance,
pthread_mutex_lock(&rib.ro_lock);
if (ribmgr_enrol_sync(instance, rib.root->child)) {
pthread_mutex_unlock(&rib.ro_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to sync part of the RIB.");
return -1;
}
@@ -859,18 +930,14 @@ static int ribmgr_cdap_start(struct cdap * instance,
key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,
NULL, 0, 0);
if (key < 0) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send stop of enrollment.");
return -1;
}
if (cdap_reply_wait(instance, key, NULL, NULL)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Remote failed to complete enrollment.");
return -1;
}
-
- pthread_rwlock_unlock(&ipcpi.state_lock);
} else {
LOG_WARN("Request to start unknown operation.");
if (cdap_reply_send(instance, key, -1, NULL, 0))
@@ -884,20 +951,18 @@ static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)
{
int ret = 0;
- pthread_rwlock_wrlock(&ipcpi.state_lock);
- if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {
+ if (strcmp(name, ENROLLMENT) == 0) {
LOG_DBG("Stop enrollment received.");
-
- ipcp_set_state(IPCP_BOOTING);
- } else
+ /* FIXME: don't use states to match start to stop. */
+ ribmgr_set_state(RIBMGR_OPERATIONAL);
+ } else {
ret = -1;
+ }
if (cdap_reply_send(instance, key, ret, NULL, 0)) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to send reply to stop request.");
return -1;
}
- pthread_rwlock_unlock(&ipcpi.state_lock);
return 0;
}
@@ -1068,7 +1133,7 @@ static void * cdap_req_handler(void * o)
return (void *) 0;
}
-int ribmgr_add_flow(int fd)
+static int ribmgr_add_flow(int fd)
{
struct cdap * instance = NULL;
struct mgmt_flow * flow;
@@ -1127,6 +1192,46 @@ int ribmgr_remove_flow(int fd)
return -1;
}
+/* FIXME: do this in a topologymanager instance */
+int ribmgr_add_nm1_flow(int fd)
+{
+ if (flow_alloc_resp(fd, 0) < 0) {
+ LOG_ERR("Could not respond to new flow.");
+ return -1;
+ }
+
+ return ribmgr_add_flow(fd);
+}
+
+int ribmgr_nm1_mgt_flow(char * dst_name)
+{
+ int fd;
+ int result;
+
+ /* FIXME: Request retransmission. */
+ fd = flow_alloc(dst_name, MGMT_AE, NULL);
+ if (fd < 0) {
+ LOG_ERR("Failed to allocate flow to %s.", dst_name);
+ return -1;
+ }
+
+ result = flow_alloc_res(fd);
+ if (result < 0) {
+ LOG_ERR("Result of flow allocation to %s is %d.",
+ dst_name, result);
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ if (ribmgr_add_flow(fd)) {
+ LOG_ERR("Failed to add file descriptor.");
+ flow_dealloc(fd);
+ return -1;
+ }
+
+ return fd;
+}
+
int ribmgr_bootstrap(struct dif_config * conf)
{
static_info_msg_t stat_info = STATIC_INFO_MSG__INIT;
@@ -1134,11 +1239,6 @@ int ribmgr_bootstrap(struct dif_config * conf)
size_t len = 0;
struct ro_attr attr;
- if (conf == NULL || conf->type != IPCP_NORMAL) {
- LOG_ERR("Bad DIF configuration.");
- return -EINVAL;
- }
-
ro_attr_init(&attr);
attr.enrol_sync = true;
@@ -1189,83 +1289,54 @@ int ribmgr_bootstrap(struct dif_config * conf)
return -1;
}
- if (frct_init()) {
- LOG_ERR("Failed to initialize FRCT.");
- dir_fini();
- ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);
- ribmgr_ro_delete(RIBMGR_PREFIX);
- return -1;
- }
-
LOG_DBG("Bootstrapped RIB Manager.");
return 0;
}
-int ribmgr_enrol(void)
+int ribmgr_enrol()
{
struct cdap * instance = NULL;
struct mgmt_flow * flow;
cdap_key_t key;
int ret;
-
- pthread_rwlock_wrlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_INIT) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("IPCP in wrong state.");
- return -1;
- }
-
- ipcp_set_state(IPCP_CONFIG);
+ struct timespec timeout = {(ENROLL_TIMEOUT / 1000),
+ (ENROLL_TIMEOUT % 1000) * MILLION};
pthread_rwlock_wrlock(&rib.flows_lock);
- if (list_empty(&rib.flows)) {
- ipcp_set_state(IPCP_INIT);
- pthread_rwlock_unlock(&rib.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("No flows in RIB.");
- return -1;
- }
+
+ assert(!list_empty(&rib.flows));
flow = list_first_entry((&rib.flows), struct mgmt_flow, next);
instance = flow->instance;
key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0);
if (key < 0) {
- ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&rib.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to start enrollment.");
return -1;
}
ret = cdap_reply_wait(instance, key, NULL, NULL);
if (ret) {
- ipcp_set_state(IPCP_INIT);
pthread_rwlock_unlock(&rib.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
LOG_ERR("Failed to enroll: %d.", ret);
return -1;
}
pthread_rwlock_unlock(&rib.flows_lock);
- pthread_rwlock_unlock(&ipcpi.state_lock);
+
+ if (ribmgr_wait_state(RIBMGR_OPERATIONAL, &timeout) == -ETIMEDOUT)
+ LOG_ERR("Enrollment of RIB timed out.");
+
+ if (ribmgr_get_state() != RIBMGR_OPERATIONAL)
+ return -1;
return 0;
}
int ribmgr_start_policies(void)
{
- pthread_rwlock_rdlock(&ipcpi.state_lock);
-
- if (ipcp_get_state() != IPCP_BOOTING) {
- pthread_rwlock_unlock(&ipcpi.state_lock);
- LOG_ERR("Cannot start policies in wrong state.");
- return -1;
- }
- pthread_rwlock_unlock(&ipcpi.state_lock);
-
rib.addr_auth = addr_auth_create(rib.addr_auth_type);
if (rib.addr_auth == NULL) {
LOG_ERR("Failed to create address authority.");
diff --git a/src/ipcpd/normal/ribmgr.h b/src/ipcpd/normal/ribmgr.h
index 954b7a3c..ddc98e6e 100644
--- a/src/ipcpd/normal/ribmgr.h
+++ b/src/ipcpd/normal/ribmgr.h
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* RIB manager of the IPC Process
*
@@ -31,9 +31,9 @@ int ribmgr_init(void);
int ribmgr_fini(void);
-int ribmgr_add_flow(int fd);
+int ribmgr_add_nm1_flow(int fd);
-int ribmgr_remove_flow(int fd);
+int ribmgr_nm1_mgt_flow(char * dst_name);
int ribmgr_bootstrap(struct dif_config * conf);
diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h
index 43e432c3..6fda2adf 100644
--- a/src/ipcpd/normal/ro.h
+++ b/src/ipcpd/normal/ro.h
@@ -1,5 +1,5 @@
/*
- * Ouroboros - Copyright (C) 2016
+ * Ouroboros - Copyright (C) 2016 - 2017
*
* RIB objects
*
@@ -22,6 +22,10 @@
#ifndef OUROBOROS_IPCPD_NORMAL_RO_H
#define OUROBOROS_IPCPD_NORMAL_RO_H
+#include <stdbool.h>
+#include <time.h>
+#include <stdint.h>
+
enum ro_recv_set {
NO_SYNC = 0,
NEIGHBORS,