diff options
author | Dimitri Staessens <[email protected]> | 2023-03-18 21:02:10 +0100 |
---|---|---|
committer | Sander Vrijders <[email protected]> | 2023-03-21 18:12:28 +0100 |
commit | 54156a3d9a2a7f87591e5efd37a8fe6f708b933f (patch) | |
tree | 8df5ecafc1fd199adb05ea8f4573103c38c63848 /src/irmd/reg | |
parent | 9ce5f6b0b1ae281bfa82df31a92026d946366286 (diff) | |
download | ouroboros-54156a3d9a2a7f87591e5efd37a8fe6f708b933f.tar.gz ouroboros-54156a3d9a2a7f87591e5efd37a8fe6f708b933f.zip |
irmd: Move registry objects to their own sources
Rename internal data structures so it's clear that they are the IRMd
representation of these objects for management purposes.
Split functionality for these objects off and and move them to their
own source files.
Rename internal functions of the IRMd to reflect this, with some small
refactoring.
Signed-off-by: Dimitri Staessens <[email protected]>
Signed-off-by: Sander Vrijders <[email protected]>
Diffstat (limited to 'src/irmd/reg')
-rw-r--r-- | src/irmd/reg/flow.c | 222 | ||||
-rw-r--r-- | src/irmd/reg/flow.h | 83 | ||||
-rw-r--r-- | src/irmd/reg/ipcp.c | 152 | ||||
-rw-r--r-- | src/irmd/reg/ipcp.h | 58 | ||||
-rw-r--r-- | src/irmd/reg/name.c | 451 | ||||
-rw-r--r-- | src/irmd/reg/name.h | 103 | ||||
-rw-r--r-- | src/irmd/reg/proc.c | 265 | ||||
-rw-r--r-- | src/irmd/reg/proc.h | 75 | ||||
-rw-r--r-- | src/irmd/reg/prog.c | 174 | ||||
-rw-r--r-- | src/irmd/reg/prog.h | 52 |
10 files changed, 1635 insertions, 0 deletions
diff --git a/src/irmd/reg/flow.c b/src/irmd/reg/flow.c new file mode 100644 index 00000000..30b9c504 --- /dev/null +++ b/src/irmd/reg/flow.c @@ -0,0 +1,222 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200112L + +#include "config.h" + +#define OUROBOROS_PREFIX "reg-flow" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "flow.h" + +#include <assert.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_flow * reg_flow_create(pid_t n_pid, + pid_t n_1_pid, + int flow_id, + qosspec_t qs) +{ + pthread_condattr_t cattr; + struct reg_flow * f; + + f = malloc(sizeof(*f)); + if (f == NULL) + goto fail_malloc; + + memset(f, 0, sizeof(*f)); + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&f->cond, &cattr)) + goto fail_cond; + + if (pthread_mutex_init(&f->mtx, NULL)) + goto fail_mutex; + + f->n_rb = shm_rbuff_create(n_pid, flow_id); + if (f->n_rb == NULL) { + log_err("Could not create ringbuffer for process %d.", n_pid); + goto fail_n_rbuff; + } + + f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); + if (f->n_1_rb == NULL) { + log_err("Could not create ringbuffer for process %d.", n_1_pid); + goto fail_n_1_rbuff; + } + + if (clock_gettime(CLOCK_MONOTONIC, &f->t0) < 0) + log_warn("Failed to set timestamp."); + + pthread_condattr_destroy(&cattr); + + f->n_pid = n_pid; + f->n_1_pid = n_1_pid; + f->flow_id = flow_id; + f->qs = qs; + + f->state = FLOW_ALLOC_PENDING; + + return f; + + fail_n_1_rbuff: + shm_rbuff_destroy(f->n_rb); + fail_n_rbuff: + pthread_mutex_destroy(&f->mtx); + fail_mutex: + pthread_cond_destroy(&f->cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + free(f); + fail_malloc: + return NULL; +} + +static void cancel_irm_destroy(void * o) +{ + struct reg_flow * f = (struct reg_flow *) o; + + pthread_mutex_unlock(&f->mtx); + + pthread_cond_destroy(&f->cond); + pthread_mutex_destroy(&f->mtx); + + shm_rbuff_destroy(f->n_rb); + shm_rbuff_destroy(f->n_1_rb); + + free(f); +} + +void reg_flow_destroy(struct reg_flow * f) +{ + assert(f); + + pthread_mutex_lock(&f->mtx); + + assert(f->data.len == 0); + + if (f->state == FLOW_DESTROY) { + pthread_mutex_unlock(&f->mtx); + return; + } + + if (f->state == FLOW_ALLOC_PENDING) + f->state = FLOW_DESTROY; + else + f->state = FLOW_NULL; + + pthread_cond_broadcast(&f->cond); + + pthread_cleanup_push(cancel_irm_destroy, f); + + while (f->state != FLOW_NULL) + pthread_cond_wait(&f->cond, &f->mtx); + + pthread_cleanup_pop(true); +} + +enum flow_state reg_flow_get_state(struct reg_flow * f) +{ + enum flow_state state; + + assert(f); + + pthread_mutex_lock(&f->mtx); + + state = f->state; + + pthread_mutex_unlock(&f->mtx); + + return state; +} + +void reg_flow_set_state(struct reg_flow * f, + enum flow_state state) +{ + assert(f); + assert(state != FLOW_DESTROY); + + pthread_mutex_lock(&f->mtx); + + f->state = state; + pthread_cond_broadcast(&f->cond); + + pthread_mutex_unlock(&f->mtx); +} + +int reg_flow_wait_state(struct reg_flow * f, + enum flow_state state, + struct timespec * dl) +{ + int ret = 0; + int s; + + assert(f); + assert(state != FLOW_NULL); + assert(state != FLOW_DESTROY); + assert(state != FLOW_DEALLOC_PENDING); + + pthread_mutex_lock(&f->mtx); + + assert(f->state != FLOW_NULL); + + pthread_cleanup_push(__cleanup_mutex_unlock, &f->mtx); + + while (!(f->state == state || + f->state == FLOW_DESTROY || + f->state == FLOW_DEALLOC_PENDING) && + ret != -ETIMEDOUT) { + if (dl != NULL) + ret = -pthread_cond_timedwait(&f->cond, + &f->mtx, + dl); + else + ret = -pthread_cond_wait(&f->cond, + &f->mtx); + } + + if (f->state == FLOW_DESTROY || + f->state == FLOW_DEALLOC_PENDING || + ret == -ETIMEDOUT) { + f->state = FLOW_NULL; + pthread_cond_broadcast(&f->cond); + } + + s = f->state; + + pthread_cleanup_pop(true); + + return ret ? ret : s; +} diff --git a/src/irmd/reg/flow.h b/src/irmd/reg/flow.h new file mode 100644 index 00000000..9af15032 --- /dev/null +++ b/src/irmd/reg/flow.h @@ -0,0 +1,83 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Flows + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_FLOW_H +#define OUROBOROS_IRMD_REG_FLOW_H + +#include <ouroboros/list.h> +#include <ouroboros/qos.h> +#include <ouroboros/shm_rbuff.h> +#include <ouroboros/utils.h> + +#include <sys/types.h> +#include <pthread.h> +#include <time.h> + +enum flow_state { + FLOW_NULL = 0, + FLOW_ALLOC_PENDING, + FLOW_ALLOC_REQ_PENDING, + FLOW_ALLOCATED, + FLOW_DEALLOC_PENDING, + FLOW_DESTROY +}; + +struct reg_flow { + struct list_head next; + + int flow_id; + + pid_t n_pid; + pid_t n_1_pid; + + qosspec_t qs; + time_t mpl; + buffer_t data; + + struct shm_rbuff * n_rb; + struct shm_rbuff * n_1_rb; + + struct timespec t0; + + enum flow_state state; + pthread_cond_t cond; + pthread_mutex_t mtx; +}; + +struct reg_flow * reg_flow_create(pid_t n_pid, + pid_t n_1_pid, + int flow_id, + qosspec_t qs); + +void reg_flow_destroy(struct reg_flow * f); + +enum flow_state reg_flow_get_state(struct reg_flow * f); + + +void reg_flow_set_state(struct reg_flow * f, + enum flow_state state); + +int reg_flow_wait_state(struct reg_flow * f, + enum flow_state state, + struct timespec * timeo); + +#endif /* OUROBOROS_IRMD_REG_FLOW_H */ diff --git a/src/irmd/reg/ipcp.c b/src/irmd/reg/ipcp.c new file mode 100644 index 00000000..62505871 --- /dev/null +++ b/src/irmd/reg/ipcp.c @@ -0,0 +1,152 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/pthread.h> +#include <ouroboros/time_utils.h> + +#include "ipcp.h" + +#include <assert.h> +#include <signal.h> +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> + +struct reg_ipcp * reg_ipcp_create(const char * name, + enum ipcp_type type) +{ + struct reg_ipcp * ipcp; + pthread_condattr_t cattr; + + ipcp = malloc(sizeof(*ipcp)); + if (ipcp == NULL) + goto fail_malloc; + + if (pthread_mutex_init(&ipcp->mtx, NULL)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&ipcp->cond, &cattr)) + goto fail_cond; + + ipcp->name = strdup(name); + if (ipcp->name == NULL) + goto fail_name; + + pthread_condattr_destroy(&cattr); + + ipcp->layer = NULL; + ipcp->type = type; + ipcp->state = IPCP_BOOT; + + list_head_init(&ipcp->next); + + return ipcp; + + fail_name: + pthread_cond_destroy(&ipcp->cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&ipcp->mtx); + fail_mutex: + free(ipcp); + fail_malloc: + return NULL; +} + +void reg_ipcp_destroy(struct reg_ipcp * ipcp) +{ + assert(ipcp); + + pthread_mutex_lock(&ipcp->mtx); + + while (ipcp->state == IPCP_BOOT) + pthread_cond_wait(&ipcp->cond, &ipcp->mtx); + + free(ipcp->layer); + free(ipcp->name); + + pthread_mutex_unlock(&ipcp->mtx); + + pthread_cond_destroy(&ipcp->cond); + pthread_mutex_destroy(&ipcp->mtx); + + free(ipcp); +} + +void reg_ipcp_set_state(struct reg_ipcp * ipcp, + enum ipcp_state state) +{ + pthread_mutex_lock(&ipcp->mtx); + + ipcp->state = state; + pthread_cond_broadcast(&ipcp->cond); + + pthread_mutex_unlock(&ipcp->mtx); +} + +int reg_ipcp_wait_boot(struct reg_ipcp * ipcp) +{ + int ret = 0; + struct timespec dl; + struct timespec to = {SOCKET_TIMEOUT / 1000, + (SOCKET_TIMEOUT % 1000) * MILLION}; + + clock_gettime(PTHREAD_COND_CLOCK, &dl); + ts_add(&dl, &to, &dl); + + pthread_mutex_lock(&ipcp->mtx); + + while (ipcp->state == IPCP_BOOT && ret != ETIMEDOUT) + ret = pthread_cond_timedwait(&ipcp->cond, &ipcp->mtx, &dl); + + if (ret == ETIMEDOUT) { + kill(ipcp->pid, SIGTERM); + ipcp->state = IPCP_NULL; + pthread_cond_signal(&ipcp->cond); + } + + if (ipcp->state != IPCP_LIVE) { + pthread_mutex_unlock(&ipcp->mtx); + return -1; + } + + pthread_mutex_unlock(&ipcp->mtx); + + return 0; +} diff --git a/src/irmd/reg/ipcp.h b/src/irmd/reg/ipcp.h new file mode 100644 index 00000000..8ad334cf --- /dev/null +++ b/src/irmd/reg/ipcp.h @@ -0,0 +1,58 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - IPCPs + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_IPCP_H +#define OUROBOROS_IRMD_REG_IPCP_H + +#include <ouroboros/list.h> + +enum ipcp_state { + IPCP_NULL = 0, + IPCP_BOOT, + IPCP_LIVE +}; + +struct reg_ipcp { + struct list_head next; + + char * name; + pid_t pid; + enum ipcp_type type; + enum hash_algo dir_hash_algo; + char * layer; + + enum ipcp_state state; + pthread_cond_t cond; + pthread_mutex_t mtx; +}; + +struct reg_ipcp * reg_ipcp_create(const char * name, + enum ipcp_type type); + +void reg_ipcp_destroy(struct reg_ipcp * i); + +void reg_ipcp_set_state(struct reg_ipcp * i, + enum ipcp_state state); + +int reg_ipcp_wait_boot(struct reg_ipcp * i); + +#endif /* OUROBOROS_IRMD_REG_IPCP_H */
\ No newline at end of file diff --git a/src/irmd/reg/name.c b/src/irmd/reg/name.c new file mode 100644 index 00000000..7e13e888 --- /dev/null +++ b/src/irmd/reg/name.c @@ -0,0 +1,451 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include "config.h" + +#define OUROBOROS_PREFIX "reg_name" + +#include <ouroboros/errno.h> +#include <ouroboros/logs.h> +#include <ouroboros/time_utils.h> +#include <ouroboros/pthread.h> + +#include "name.h" +#include "utils.h" + +#include <stdlib.h> +#include <stdbool.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_name * reg_name_create(const char * name, + enum pol_balance lb) +{ + pthread_condattr_t cattr; + struct reg_name * n; + + assert(name != NULL); + + n = malloc(sizeof(*n)); + if (n == NULL) + goto fail_malloc; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&n->cond, &cattr)) + goto fail_cond; + + if (pthread_mutex_init(&n->mtx, NULL)) + goto fail_mutex; + + n->name = strdup(name); + if (n->name == NULL) + goto fail_name; + + pthread_condattr_destroy(&cattr); + + list_head_init(&n->next); + list_head_init(&n->reg_progs); + list_head_init(&n->reg_pids); + + n->pol_lb = lb; + n->state = NAME_IDLE; + + return n; + + fail_name: + pthread_mutex_destroy(&n->mtx); + fail_mutex: + pthread_cond_destroy(&n->cond); + fail_cond: + pthread_condattr_destroy(&cattr); + fail_cattr: + free(n); + fail_malloc: + return NULL; +} + +static void cancel_reg_name_destroy(void * o) +{ + struct reg_name * name; + struct list_head * p; + struct list_head * h; + + name = (struct reg_name *) o; + + pthread_mutex_unlock(&name->mtx); + + pthread_cond_destroy(&name->cond); + pthread_mutex_destroy(&name->mtx); + + if (name->name != NULL) + free(name->name); + + list_for_each_safe(p, h, &name->reg_pids) { + struct pid_el * pe = list_entry(p, struct pid_el, next); + list_del(&pe->next); + free(pe); + } + + list_for_each_safe(p, h, &name->reg_progs) { + struct str_el * se = list_entry(p, struct str_el, next); + list_del(&se->next); + free(se->str); + free(se); + } + + free(name); +} + +void reg_name_destroy(struct reg_name * name) +{ + if (name == NULL) + return; + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_DESTROY) { + pthread_mutex_unlock(&name->mtx); + return; + } + + if (name->state != NAME_FLOW_ACCEPT) + name->state = NAME_NULL; + else + name->state = NAME_DESTROY; + + pthread_cond_broadcast(&name->cond); + + pthread_cleanup_push(cancel_reg_name_destroy, name); + + while (name->state != NAME_NULL) + pthread_cond_wait(&name->cond, &name->mtx); + + pthread_cleanup_pop(true); +} + +static bool reg_name_has_prog(struct reg_name * name, + const char * prog) +{ + struct list_head * p; + + list_for_each(p, &name->reg_progs) { + struct str_el * name = list_entry(p, struct str_el, next); + if (!strcmp(name->str, prog)) + return true; + } + + return false; +} + +int reg_name_add_prog(struct reg_name * name, + struct reg_prog * a) +{ + struct str_el * n; + + if (reg_name_has_prog(name, a->prog)) { + log_warn("Program %s already accepting flows for %s.", + a->prog, name->name); + return 0; + } + + if (!(a->flags & BIND_AUTO)) { + log_dbg("Program %s cannot be auto-instantiated.", a->prog); + return 0; + } + + n = malloc(sizeof(*n)); + if (n == NULL) + return -ENOMEM; + + n->str = strdup(a->prog); + if (n->str == NULL) { + free(n); + return -ENOMEM; + } + + list_add(&n->next, &name->reg_progs); + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_IDLE) + name->state = NAME_AUTO_ACCEPT; + + pthread_mutex_unlock(&name->mtx); + + return 0; +} + +void reg_name_del_prog(struct reg_name * name, + const char * prog) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &name->reg_progs) { + struct str_el * se = list_entry(p, struct str_el, next); + if (strcmp(prog, se->str) == 0) { + list_del(&se->next); + free(se->str); + free(se); + } + } + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_AUTO_ACCEPT && list_is_empty(&name->reg_progs)) { + name->state = NAME_IDLE; + pthread_cond_broadcast(&name->cond); + } + + pthread_mutex_unlock(&name->mtx); +} + +char * reg_name_get_prog(struct reg_name * name) +{ + if (!list_is_empty(&name->reg_pids) || list_is_empty(&name->reg_progs)) + return NULL; + + return list_first_entry(&name->reg_progs, struct str_el, next)->str; +} + +static bool reg_name_has_pid(struct reg_name * name, + pid_t pid) +{ + struct list_head * p; + + list_for_each(p, &name->reg_progs) { + struct pid_el * name = list_entry(p, struct pid_el, next); + if (name->pid == pid) + return true; + } + + return false; +} + +int reg_name_add_pid(struct reg_name * name, + pid_t pid) +{ + struct pid_el * i; + + assert(name); + + if (reg_name_has_pid(name, pid)) { + log_dbg("Process already registered with this name."); + return -EPERM; + } + + pthread_mutex_lock(&name->mtx); + + if (name->state == NAME_NULL) { + pthread_mutex_unlock(&name->mtx); + log_dbg("Tried to add instance in NULL state."); + return -EPERM; + } + + i = malloc(sizeof(*i)); + if (i == NULL) { + pthread_mutex_unlock(&name->mtx); + return -ENOMEM; + } + + i->pid = pid; + + /* load balancing policy assigns queue order for this process. */ + switch(name->pol_lb) { + case LB_RR: /* Round robin policy. */ + list_add_tail(&i->next, &name->reg_pids); + break; + case LB_SPILL: /* Keep accepting flows on the current process */ + list_add(&i->next, &name->reg_pids); + break; + default: + free(i); + assert(false); + }; + + if (name->state == NAME_IDLE || + name->state == NAME_AUTO_ACCEPT || + name->state == NAME_AUTO_EXEC) { + name->state = NAME_FLOW_ACCEPT; + pthread_cond_broadcast(&name->cond); + } + + pthread_mutex_unlock(&name->mtx); + + return 0; +} + +void reg_name_set_policy(struct reg_name * name, + enum pol_balance lb) +{ + name->pol_lb = lb; +} + +static void reg_name_check_state(struct reg_name * name) +{ + assert(name); + + if (name->state == NAME_DESTROY) { + name->state = NAME_NULL; + pthread_cond_broadcast(&name->cond); + return; + } + + if (list_is_empty(&name->reg_pids)) { + if (!list_is_empty(&name->reg_progs)) + name->state = NAME_AUTO_ACCEPT; + else + name->state = NAME_IDLE; + } else { + name->state = NAME_FLOW_ACCEPT; + } + + pthread_cond_broadcast(&name->cond); +} + +void reg_name_del_pid_el(struct reg_name * name, + struct pid_el * p) +{ + assert(name); + assert(p); + + list_del(&p->next); + free(p); + + reg_name_check_state(name); +} + +void reg_name_del_pid(struct reg_name * name, + pid_t pid) +{ + struct list_head * p; + struct list_head * h; + + assert(name); + + if (name == NULL) + return; + + list_for_each_safe(p, h, &name->reg_pids) { + struct pid_el * a = list_entry(p, struct pid_el, next); + if (a->pid == pid) { + list_del(&a->next); + free(a); + } + } + + reg_name_check_state(name); +} + +pid_t reg_name_get_pid(struct reg_name * name) +{ + if (name == NULL) + return -1; + + if (list_is_empty(&name->reg_pids)) + return -1; + + return list_first_entry(&name->reg_pids, struct pid_el, next)->pid; +} + +enum name_state reg_name_get_state(struct reg_name * name) +{ + enum name_state state; + + assert(name); + + pthread_mutex_lock(&name->mtx); + + state = name->state; + + pthread_mutex_unlock(&name->mtx); + + return state; +} + +int reg_name_set_state(struct reg_name * name, + enum name_state state) +{ + assert(state != NAME_DESTROY); + + pthread_mutex_lock(&name->mtx); + + name->state = state; + pthread_cond_broadcast(&name->cond); + + pthread_mutex_unlock(&name->mtx); + + return 0; +} + +int reg_name_leave_state(struct reg_name * name, + enum name_state state, + struct timespec * timeout) +{ + struct timespec abstime; + int ret = 0; + + assert(name); + assert(state != NAME_DESTROY); + + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_mutex_lock(&name->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, &name->mtx); + + while (name->state == state && ret != -ETIMEDOUT) + if (timeout) + ret = -pthread_cond_timedwait(&name->cond, + &name->mtx, + &abstime); + else + ret = -pthread_cond_wait(&name->cond, + &name->mtx); + + if (name->state == NAME_DESTROY) { + ret = -1; + name->state = NAME_NULL; + pthread_cond_broadcast(&name->cond); + } + + pthread_cleanup_pop(true); + + return ret; +} diff --git a/src/irmd/reg/name.h b/src/irmd/reg/name.h new file mode 100644 index 00000000..0731782c --- /dev/null +++ b/src/irmd/reg/name.h @@ -0,0 +1,103 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Names + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_NAME_H +#define OUROBOROS_IRMD_REG_NAME_H + +#include <ouroboros/hash.h> +#include <ouroboros/ipcp.h> +#include <ouroboros/list.h> +#include <ouroboros/irm.h> + +#include "proc.h" +#include "prog.h" + +#include <stdint.h> +#include <stdbool.h> +#include <pthread.h> +#include <string.h> +#include <sys/types.h> + +enum name_state { + NAME_NULL = 0, + NAME_IDLE, + NAME_AUTO_ACCEPT, + NAME_AUTO_EXEC, + NAME_FLOW_ACCEPT, + NAME_FLOW_ARRIVED, + NAME_DESTROY +}; + +/* An entry in the registry */ +struct reg_name { + struct list_head next; + char * name; + + /* Policies for this name. */ + enum pol_balance pol_lb; /* Load balance incoming flows. */ + /* Programs that can be instantiated by the irmd. */ + struct list_head reg_progs; + /* Processes that are listening for this name. */ + struct list_head reg_pids; + + enum name_state state; + pthread_cond_t cond; + pthread_mutex_t mtx; +}; + +struct reg_name * reg_name_create(const char * name, + enum pol_balance lb); + +void reg_name_destroy(struct reg_name * n); + +int reg_name_add_prog(struct reg_name * n, + struct reg_prog * p); + +void reg_name_del_prog(struct reg_name * n, + const char * prog); + +char * reg_name_get_prog(struct reg_name * n); + +int reg_name_add_pid(struct reg_name * n, + pid_t pid); + +void reg_name_del_pid(struct reg_name * n, + pid_t pid); + +void reg_name_del_pid_el(struct reg_name * n, + struct pid_el * p); + +pid_t reg_name_get_pid(struct reg_name * n); + +void reg_name_set_policy(struct reg_name * n, + enum pol_balance lb); + +enum name_state reg_name_get_state(struct reg_name * n); + +int reg_name_set_state(struct reg_name * n, + enum name_state state); + +int reg_name_leave_state(struct reg_name * n, + enum name_state state, + struct timespec * timeout); + +#endif /* OUROBOROS_IRMD_REG_NAME_H */ diff --git a/src/irmd/reg/proc.c b/src/irmd/reg/proc.c new file mode 100644 index 00000000..1aae789d --- /dev/null +++ b/src/irmd/reg/proc.c @@ -0,0 +1,265 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" + +#include <ouroboros/list.h> +#include <ouroboros/errno.h> +#include <ouroboros/time_utils.h> + +#include "proc.h" +#include "name.h" + +#include <stdlib.h> +#include <unistd.h> +#include <limits.h> +#include <assert.h> + +struct reg_proc * reg_proc_create(pid_t pid, + const char * prog) +{ + struct reg_proc * proc; + pthread_condattr_t cattr; + + assert(prog); + + proc = malloc(sizeof(*proc)); + if (proc == NULL) + goto fail_malloc; + + if (pthread_condattr_init(&cattr)) + goto fail_condattr; + +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + + if (pthread_mutex_init(&proc->lock, NULL)) + goto fail_mutex; + + if (pthread_cond_init(&proc->cond, &cattr)) + goto fail_cond; + + proc->set = shm_flow_set_create(pid); + if (proc->set == NULL) + goto fail_set; + + proc->prog = strdup(prog); + if(proc->prog == NULL) + goto fail_prog; + + list_head_init(&proc->next); + list_head_init(&proc->names); + + proc->pid = pid; + proc->name = NULL; + proc->state = PROC_INIT; + + return proc; + + fail_prog: + shm_flow_set_destroy(proc->set); + fail_set: + pthread_cond_destroy(&proc->cond);; + fail_cond: + pthread_mutex_destroy(&proc->lock); + fail_mutex: + pthread_condattr_destroy(&cattr); + fail_condattr: + free(proc); + fail_malloc: + return NULL; +} + +static void cancel_reg_proc(void * o) +{ + struct reg_proc * proc = (struct reg_proc *) o; + + proc->state = PROC_NULL; + + pthread_mutex_unlock(&proc->lock); +} + +void reg_proc_destroy(struct reg_proc * proc) +{ + struct list_head * p; + struct list_head * h; + + assert(proc); + + pthread_mutex_lock(&proc->lock); + + if (proc->state == PROC_DESTROY) { + pthread_mutex_unlock(&proc->lock); + return; + } + + if (proc->state == PROC_SLEEP) + proc->state = PROC_DESTROY; + + pthread_cond_signal(&proc->cond); + + pthread_cleanup_push(cancel_reg_proc, proc); + + while (proc->state != PROC_INIT) + pthread_cond_wait(&proc->cond, &proc->lock); + + pthread_cleanup_pop(false); + + pthread_mutex_unlock(&proc->lock); + + shm_flow_set_destroy(proc->set); + + pthread_cond_destroy(&proc->cond); + pthread_mutex_destroy(&proc->lock); + + list_for_each_safe(p, h, &proc->names) { + struct str_el * n = list_entry(p, struct str_el, next); + list_del(&n->next); + if (n->str != NULL) + free(n->str); + free(n); + } + + free(proc->prog); + free(proc); +} + +int reg_proc_add_name(struct reg_proc * proc, + const char * name) +{ + struct str_el * s; + + assert(proc); + assert(name); + + s = malloc(sizeof(*s)); + if (s == NULL) + goto fail_malloc; + + s->str = strdup(name); + if (s->str == NULL) + goto fail_name; + + list_add(&s->next, &proc->names); + + return 0; + + fail_name: + free(s); + fail_malloc: + return -ENOMEM; +} + +void reg_proc_del_name(struct reg_proc * proc, + const char * name) +{ + struct list_head * p = NULL; + struct list_head * h = NULL; + + assert(proc); + assert(name); + + list_for_each_safe(p, h, &proc->names) { + struct str_el * s = list_entry(p, struct str_el, next); + if (!strcmp(name, s->str)) { + list_del(&s->next); + free(s->str); + free(s); + } + } +} + +int reg_proc_sleep(struct reg_proc * proc, + struct timespec * dl) +{ + + int ret = 0; + + assert(proc); + + pthread_mutex_lock(&proc->lock); + + if (proc->state != PROC_WAKE && proc->state != PROC_DESTROY) + proc->state = PROC_SLEEP; + + pthread_cleanup_push(cancel_reg_proc, proc); + + while (proc->state == PROC_SLEEP && ret != -ETIMEDOUT) + if (dl != NULL) + ret = -pthread_cond_timedwait(&proc->cond, + &proc->lock, dl); + else + ret = -pthread_cond_wait(&proc->cond, &proc->lock); + + pthread_cleanup_pop(false); + + if (proc->state == PROC_DESTROY) { + if (proc->name != NULL) + reg_name_del_pid(proc->name, proc->pid); + ret = -1; + } + + proc->state = PROC_INIT; + + pthread_cond_broadcast(&proc->cond); + pthread_mutex_unlock(&proc->lock); + + return ret; +} + +void reg_proc_wake(struct reg_proc * proc, + struct reg_name * name) +{ + assert(proc); + assert(name); + + pthread_mutex_lock(&proc->lock); + + if (proc->state != PROC_SLEEP) { + pthread_mutex_unlock(&proc->lock); + return; + } + + proc->state = PROC_WAKE; + proc->name = name; + + pthread_cond_broadcast(&proc->cond); + + pthread_cleanup_push(cancel_reg_proc, proc); + + while (proc->state == PROC_WAKE) + pthread_cond_wait(&proc->cond, &proc->lock); + + pthread_cleanup_pop(false); + + if (proc->state == PROC_DESTROY) + proc->state = PROC_INIT; + + pthread_mutex_unlock(&proc->lock); +} diff --git a/src/irmd/reg/proc.h b/src/irmd/reg/proc.h new file mode 100644 index 00000000..fb11f34f --- /dev/null +++ b/src/irmd/reg/proc.h @@ -0,0 +1,75 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Processes + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROC_H +#define OUROBOROS_IRMD_REG_PROC_H + +#include <ouroboros/shm_flow_set.h> + +#include "utils.h" + +#include <unistd.h> +#include <pthread.h> + +enum proc_state { + PROC_NULL = 0, + PROC_INIT, + PROC_SLEEP, + PROC_WAKE, + PROC_DESTROY +}; + +struct reg_proc { + struct list_head next; + pid_t pid; + char * prog; /* program instantiated */ + struct list_head names; /* names for which process accepts flows */ + struct shm_flow_set * set; + + struct reg_name * name; /* name for which a flow arrived */ + + /* The process will block on this */ + enum proc_state state; + pthread_cond_t cond; + pthread_mutex_t lock; +}; + +struct reg_proc * reg_proc_create(pid_t proc, + const char * prog); + +void reg_proc_destroy(struct reg_proc * proc); + +int reg_proc_sleep(struct reg_proc * proc, + struct timespec * timeo); + +void reg_proc_wake(struct reg_proc * proc, + struct reg_name * name); + +void reg_proc_cancel(struct reg_proc * proc); + +int reg_proc_add_name(struct reg_proc * proc, + const char * name); + +void reg_proc_del_name(struct reg_proc * proc, + const char * name); + +#endif /* OUROBOROS_IRMD_REG_PROC_H */ diff --git a/src/irmd/reg/prog.c b/src/irmd/reg/prog.c new file mode 100644 index 00000000..e3fd6105 --- /dev/null +++ b/src/irmd/reg/prog.c @@ -0,0 +1,174 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200809L +#endif + +#include <ouroboros/errno.h> +#include <ouroboros/irm.h> +#include <ouroboros/utils.h> + +#include "prog.h" +#include "utils.h" + +#include <assert.h> +#include <stdlib.h> +#include <string.h> + + +static char ** create_argv(const char * prog, + size_t argc, + char ** argv) +{ + char ** argv2; + size_t i; + + argv2 = malloc((argc + 2) * sizeof(*argv2)); /* prog + args + NULL */ + if (argv2 == 0) + goto fail_malloc; + + argv2[0] = strdup(prog); + if (argv2[0] == NULL) + goto fail_prog; + + for (i = 1; i <= argc; ++i) { + argv2[i] = strdup(argv[i - 1]); + if (argv2[i] == NULL) + goto fail_arg; + } + + argv2[argc + 1] = NULL; + + return argv2; + + fail_arg: + argvfree(argv2); + fail_prog: + free(argv2); + fail_malloc: + return NULL; +} + +struct reg_prog * reg_prog_create(const char * prog, + uint32_t flags, + int argc, + char ** argv) +{ + struct reg_prog * p; + + assert(prog); + + p = malloc(sizeof(*p)); + if (p == NULL) + goto fail_malloc; + + memset(p, 0, sizeof(*p)); + + p->prog = strdup(path_strip(prog)); + if (p->prog == NULL) + goto fail_prog; + + if (argc > 0 && flags & BIND_AUTO) { + p->argv = create_argv(prog, argc, argv); + if (p->argv == NULL) + goto fail_argv; + } + + list_head_init(&p->next); + list_head_init(&p->names); + + p->flags = flags; + + return p; + + fail_argv: + free(p->prog); + fail_prog: + free(p); + fail_malloc: + return NULL; +} + +void reg_prog_destroy(struct reg_prog * prog) +{ + struct list_head * p; + struct list_head * h; + + if (prog == NULL) + return; + + list_for_each_safe(p, h, &prog->names) { + struct str_el * s = list_entry(p, struct str_el, next); + list_del(&s->next); + free(s->str); + free(s); + } + + argvfree(prog->argv); + free(prog->prog); + free(prog); +} + +int reg_prog_add_name(struct reg_prog * prog, + const char * name) +{ + struct str_el * s; + + if (prog == NULL || name == NULL) + return -EINVAL; + + s = malloc(sizeof(*s)); + if (s == NULL) + goto fail_malloc; + + s->str = strdup(name); + if(s->str == NULL) + goto fail_name; + + list_add(&s->next, &prog->names); + + return 0; + + fail_name: + free(s); + fail_malloc: + return -ENOMEM; +} + +void reg_prog_del_name(struct reg_prog * prog, + const char * name) +{ + struct list_head * p; + struct list_head * h; + + list_for_each_safe(p, h, &prog->names) { + struct str_el * s = list_entry(p, struct str_el, next); + if (!strcmp(name, s->str)) { + list_del(&s->next); + free(s->str); + free(s); + } + } +} diff --git a/src/irmd/reg/prog.h b/src/irmd/reg/prog.h new file mode 100644 index 00000000..c45f2d8c --- /dev/null +++ b/src/irmd/reg/prog.h @@ -0,0 +1,52 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2023 + * + * The IPC Resource Manager - Registry - Programs + * + * Dimitri Staessens <[email protected]> + * Sander Vrijders <[email protected]> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_IRMD_REG_PROG_H +#define OUROBOROS_IRMD_REG_PROG_H + +#include <ouroboros/list.h> + +#include <unistd.h> +#include <stdint.h> + +struct reg_prog { + struct list_head next; + char * prog; /* name of binary */ + uint32_t flags; + char ** argv; + struct list_head names; /* names that all instances will listen for */ +}; + +struct reg_prog * reg_prog_create(const char * prog, + uint32_t flags, + int argc, + char ** argv); + +void reg_prog_destroy(struct reg_prog * prog); + +int reg_prog_add_name(struct reg_prog * prog, + const char * name); + +void reg_prog_del_name(struct reg_prog * prog, + const char * name); + +#endif /* OUROBOROS_IRMD_REG_PROG_H */ |