From b802b25ddfe6f1b6ecabe3ba70e3dac2e99e7a50 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Thu, 4 Oct 2018 18:06:32 +0200 Subject: lib: Pass qosspec at flow allocation The flow allocator now passes the full qos specification to the endpoint, instead of just a cube. This is a more flexible architecture, as it makes QoS cubes internal to the layers. Adds endianness transforms for the flow allocator protocol in the normal IPCP. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/lib/irmd_messages.proto | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/lib/irmd_messages.proto') diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 16dfe828..2ed2ec37 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -23,6 +23,7 @@ syntax = "proto2"; import "ipcp_config.proto"; +import "qosspec.proto"; enum irm_msg_code { IRM_CREATE_IPCP = 1; @@ -67,7 +68,7 @@ message irm_msg { optional string dst = 9; optional bytes hash = 10; optional sint32 port_id = 11; - optional sint32 qoscube = 12; + optional qosspec_msg qosspec = 12; optional ipcp_config_msg conf = 13; optional uint32 opts = 14; repeated ipcp_info_msg ipcps = 15; -- cgit v1.2.3 From 04384c0eab88615902025023bb3e1339ea0f9d1a Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Fri, 5 Oct 2018 11:24:55 +0200 Subject: lib: Rename port_id to flow_id Renames port_id to flow_id according to updated nomenclature. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- doc/man/flow_alloc.3 | 4 +- doc/man/ouroboros-tutorial.7 | 18 +++--- include/ouroboros/np1_flow.h | 6 +- include/ouroboros/shm_flow_set.h | 8 +-- include/ouroboros/shm_rbuff.h | 4 +- src/ipcpd/ipcp.c | 16 ++--- src/irmd/ipcp.c | 18 +++--- src/irmd/ipcp.h | 6 +- src/irmd/irm_flow.c | 8 +-- src/irmd/irm_flow.h | 4 +- src/irmd/main.c | 122 +++++++++++++++++++-------------------- src/lib/dev.c | 102 ++++++++++++++++---------------- src/lib/ipcpd_messages.proto | 2 +- src/lib/irmd_messages.proto | 2 +- src/lib/shm_flow_set.c | 40 ++++++------- src/lib/shm_rbuff.c | 18 +++--- src/lib/shm_rbuff_ll.c | 2 +- src/lib/shm_rbuff_pthr.c | 2 +- 18 files changed, 191 insertions(+), 191 deletions(-) (limited to 'src/lib/irmd_messages.proto') diff --git a/doc/man/flow_alloc.3 b/doc/man/flow_alloc.3 index dda0c877..8a21eda8 100644 --- a/doc/man/flow_alloc.3 +++ b/doc/man/flow_alloc.3 @@ -2,7 +2,7 @@ .\" Dimitri Staessens .\" Sander Vrijders -.TH FLOW_ALLOC 3 2017-12-02 Ouroboros "Ouroboros Programmer's Manual" +.TH FLOW_ALLOC 3 2018-10-05 Ouroboros "Ouroboros Programmer's Manual" .SH NAME @@ -80,7 +80,7 @@ Failed to contact an IRMd instance. \fBflow_accept\fR() and \fBflow_alloc\fR() can also return .B -EBADF -No more flow desciptors or port_ids available. +No more flow desciptors or flow_ids available. .B -ENOMEM Not enough system memory resources available to allocate the flow. diff --git a/doc/man/ouroboros-tutorial.7 b/doc/man/ouroboros-tutorial.7 index 76fa0068..b7a22208 100644 --- a/doc/man/ouroboros-tutorial.7 +++ b/doc/man/ouroboros-tutorial.7 @@ -2,7 +2,7 @@ .\" Dimitri Staessens .\" Sander Vrijders -.TH OUROBOROS-TUTORIAL 7 2017-12-02 Ouroboros "Ouroboros User Manual" +.TH OUROBOROS-TUTORIAL 7 2018-10-05 Ouroboros "Ouroboros User Manual" .SH NAME @@ -114,25 +114,25 @@ rtt min/avg/max/mdev = 0.304/0.392/0.475/0.086 ms .RE That's all there is to it! The IRMd should log the flow -allocation. There are two endpoints of the flow (port_id's 0 and 1), +allocation. There are two endpoints of the flow (flow_id's 0 and 1), one for the server (1) and one for the client (0). After the flow -request, a new port_id is created at the server side (port_id 1) and -then a previously pending flow (on port_id 0) is allocated following +request, a new flow_id is created at the server side (flow_id 1) and +then a previously pending flow (on flow_id 0) is allocated following the response from the server. When the communication is done, the flow is deallocated and the -resources (port_id's 0 and 1) are released. +resources (flow_id's 0 and 1) are released. .RS 4 ==23918== irmd(II): Flow request arrived for my.oping.server. .br -==23918== irmd(II): Flow on port_id 1 allocated. +==23918== irmd(II): Flow on flow_id 1 allocated. .br -==23918== irmd(II): Flow on port_id 0 allocated. +==23918== irmd(II): Flow on flow_id 0 allocated. .br -==23918== irmd(II): Completed deallocation of port_id 0 by process 23932. +==23918== irmd(II): Completed deallocation of flow_id 0 by process 23932. .br -==23918== irmd(II): Completed deallocation of port_id 1 by process 23932. +==23918== irmd(II): Completed deallocation of flow_id 1 by process 23932. .RE .SH TERMINOLOGY diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h index 3435c24a..3a2bbd12 100644 --- a/include/ouroboros/np1_flow.h +++ b/include/ouroboros/np1_flow.h @@ -28,11 +28,11 @@ #include int np1_flow_alloc(pid_t n_pid, - int port_id, + int flow_id, qosspec_t qs); -int np1_flow_resp(int port_id); +int np1_flow_resp(int flow_id); -int np1_flow_dealloc(int port_id); +int np1_flow_dealloc(int flow_id); #endif /* OUROBOROS_NP1_FLOW_H */ diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h index ebf63af5..45d372a0 100644 --- a/include/ouroboros/shm_flow_set.h +++ b/include/ouroboros/shm_flow_set.h @@ -42,18 +42,18 @@ void shm_flow_set_zero(struct shm_flow_set * shm_set, int shm_flow_set_add(struct shm_flow_set * shm_set, size_t idx, - int port_id); + int flow_id); int shm_flow_set_has(struct shm_flow_set * shm_set, size_t idx, - int port_id); + int flow_id); void shm_flow_set_del(struct shm_flow_set * shm_set, size_t idx, - int port_id); + int flow_id); void shm_flow_set_notify(struct shm_flow_set * set, - int port_id, + int flow_id, int event); ssize_t shm_flow_set_wait(const struct shm_flow_set * shm_set, diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index 223f6bf4..447e081e 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -35,10 +35,10 @@ struct shm_rbuff; struct shm_rbuff * shm_rbuff_create(pid_t pid, - int port_id); + int flow_id); struct shm_rbuff * shm_rbuff_open(pid_t pid, - int port_id); + int flow_id); void shm_rbuff_close(struct shm_rbuff * rb); diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index e415bbd9..f8df5640 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -432,11 +432,11 @@ static void * mainloop(void * o) qs = msg_to_spec(msg->qosspec); fd = np1_flow_alloc(msg->pid, - msg->port_id, + msg->flow_id, qs); if (fd < 0) { - log_err("Failed allocating fd on port_id %d.", - msg->port_id); + log_err("Failed allocating fd on flow_id %d.", + msg->flow_id); ret_msg.result = -1; break; } @@ -461,10 +461,10 @@ static void * mainloop(void * o) } if (!msg->response) { - fd = np1_flow_resp(msg->port_id); + fd = np1_flow_resp(msg->flow_id); if (fd < 0) { log_warn("Port_id %d is not known.", - msg->port_id); + msg->flow_id); ret_msg.result = -1; break; } @@ -488,10 +488,10 @@ static void * mainloop(void * o) break; } - fd = np1_flow_dealloc(msg->port_id); + fd = np1_flow_dealloc(msg->flow_id); if (fd < 0) { - log_warn("Could not deallocate port_id %d.", - msg->port_id); + log_warn("Could not deallocate flow_id %d.", + msg->flow_id); ret_msg.result = -1; break; } diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 0bdf674b..20aee79f 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -429,7 +429,7 @@ int ipcp_query(pid_t pid, } int ipcp_flow_alloc(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, const uint8_t * dst, size_t len, @@ -443,8 +443,8 @@ int ipcp_flow_alloc(pid_t pid, assert(dst); msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC; - msg.has_port_id = true; - msg.port_id = port_id; + msg.has_flow_id = true; + msg.flow_id = flow_id; msg.has_pid = true; msg.pid = n_pid; msg.has_hash = true; @@ -469,7 +469,7 @@ int ipcp_flow_alloc(pid_t pid, } int ipcp_flow_alloc_resp(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, int response) { @@ -478,8 +478,8 @@ int ipcp_flow_alloc_resp(pid_t pid, int ret = -1; msg.code = IPCP_MSG_CODE__IPCP_FLOW_ALLOC_RESP; - msg.has_port_id = true; - msg.port_id = port_id; + msg.has_flow_id = true; + msg.flow_id = flow_id; msg.has_pid = true; msg.pid = n_pid; msg.has_response = true; @@ -501,15 +501,15 @@ int ipcp_flow_alloc_resp(pid_t pid, } int ipcp_flow_dealloc(pid_t pid, - int port_id) + int flow_id) { ipcp_msg_t msg = IPCP_MSG__INIT; ipcp_msg_t * recv_msg = NULL; int ret = -1; msg.code = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC; - msg.has_port_id = true; - msg.port_id = port_id; + msg.has_flow_id = true; + msg.flow_id = flow_id; recv_msg = send_recv_ipcp_msg(pid, &msg); if (recv_msg == NULL) diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 28396333..5d096788 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -63,18 +63,18 @@ int ipcp_query(pid_t pid, size_t len); int ipcp_flow_alloc(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, const uint8_t * dst, size_t len, qosspec_t qs); int ipcp_flow_alloc_resp(pid_t pid, - int port_id, + int flow_id, pid_t n_pid, int response); int ipcp_flow_dealloc(pid_t pid, - int port_id); + int flow_id); #endif /* OUROBOROS_IRMD_IPCP_H */ diff --git a/src/irmd/irm_flow.c b/src/irmd/irm_flow.c index a5a9f28c..a0889f09 100644 --- a/src/irmd/irm_flow.c +++ b/src/irmd/irm_flow.c @@ -38,7 +38,7 @@ struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, - int port_id, + int flow_id, qosspec_t qs) { pthread_condattr_t cattr; @@ -60,16 +60,16 @@ struct irm_flow * irm_flow_create(pid_t n_pid, f->n_pid = n_pid; f->n_1_pid = n_1_pid; - f->port_id = port_id; + f->flow_id = flow_id; f->qs = qs; - f->n_rb = shm_rbuff_create(n_pid, port_id); + f->n_rb = shm_rbuff_create(n_pid, flow_id); if (f->n_rb == NULL) { log_err("Could not create ringbuffer for process %d.", n_pid); goto fail_n_rbuff; } - f->n_1_rb = shm_rbuff_create(n_1_pid, port_id); + f->n_1_rb = shm_rbuff_create(n_1_pid, flow_id); if (f->n_1_rb == NULL) { log_err("Could not create ringbuffer for process %d.", n_1_pid); goto fail_n_1_rbuff; diff --git a/src/irmd/irm_flow.h b/src/irmd/irm_flow.h index f4de8187..1cd2e662 100644 --- a/src/irmd/irm_flow.h +++ b/src/irmd/irm_flow.h @@ -42,7 +42,7 @@ enum flow_state { struct irm_flow { struct list_head next; - int port_id; + int flow_id; pid_t n_pid; pid_t n_1_pid; @@ -61,7 +61,7 @@ struct irm_flow { struct irm_flow * irm_flow_create(pid_t n_pid, pid_t n_1_pid, - int port_id, + int flow_id, qosspec_t qs); void irm_flow_destroy(struct irm_flow * f); diff --git a/src/irmd/main.c b/src/irmd/main.c index 427e09d1..78fcf7b5 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -118,7 +118,7 @@ struct { struct list_head spawned_pids; /* child processes */ pthread_rwlock_t reg_lock; /* lock for registration info */ - struct bmp * port_ids; /* port_ids for flows */ + struct bmp * flow_ids; /* flow_ids for flows */ struct list_head irm_flows; /* flow information */ pthread_rwlock_t flows_lock; /* lock for flows */ @@ -174,13 +174,13 @@ static void clear_irm_flow(struct irm_flow * f) { shm_rdrbuff_remove(irmd.rdrb, idx); } -static struct irm_flow * get_irm_flow(int port_id) +static struct irm_flow * get_irm_flow(int flow_id) { struct list_head * pos = NULL; list_for_each(pos, &irmd.irm_flows) { struct irm_flow * e = list_entry(pos, struct irm_flow, next); - if (e->port_id == port_id) + if (e->flow_id == flow_id) return e; } @@ -1152,7 +1152,7 @@ static int flow_accept(pid_t pid, pid_t pid_n1; pid_t pid_n; - int port_id; + int flow_id; int ret; pthread_rwlock_wrlock(&irmd.reg_lock); @@ -1198,7 +1198,7 @@ static int flow_accept(pid_t pid, pid_n = f->n_pid; pid_n1 = f->n_1_pid; - port_id = f->port_id; + flow_id = f->flow_id; pthread_rwlock_unlock(&irmd.flows_lock); pthread_rwlock_rdlock(&irmd.reg_lock); @@ -1208,9 +1208,9 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); - bmp_release(irmd.port_ids, f->port_id); + bmp_release(irmd.flow_ids, f->flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_alloc_resp(pid_n1, port_id, pid_n, -1); + ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1); clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); @@ -1228,9 +1228,9 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); - bmp_release(irmd.port_ids, f->port_id); + bmp_release(irmd.flow_ids, f->flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_alloc_resp(pid_n1, port_id, pid_n, -1); + ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, -1); clear_irm_flow(f); irm_flow_set_state(f, FLOW_NULL); irm_flow_destroy(f); @@ -1242,7 +1242,7 @@ static int flow_accept(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); - if (ipcp_flow_alloc_resp(pid_n1, port_id, pid_n, 0)) { + if (ipcp_flow_alloc_resp(pid_n1, flow_id, pid_n, 0)) { pthread_rwlock_wrlock(&irmd.flows_lock); list_del(&f->next); pthread_rwlock_unlock(&irmd.flows_lock); @@ -1255,7 +1255,7 @@ static int flow_accept(pid_t pid, irm_flow_set_state(f, FLOW_ALLOCATED); - log_info("Flow on port_id %d allocated.", f->port_id); + log_info("Flow on flow_id %d allocated.", f->flow_id); *fl = f; @@ -1270,7 +1270,7 @@ static int flow_alloc(pid_t pid, { struct irm_flow * f; struct ipcp_entry * ipcp; - int port_id; + int flow_id; int state; uint8_t * hash; @@ -1281,18 +1281,18 @@ static int flow_alloc(pid_t pid, } pthread_rwlock_wrlock(&irmd.flows_lock); - port_id = bmp_allocate(irmd.port_ids); - if (!bmp_is_id_valid(irmd.port_ids, port_id)) { + flow_id = bmp_allocate(irmd.flow_ids); + if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { pthread_rwlock_unlock(&irmd.flows_lock); - log_err("Could not allocate port_id."); + log_err("Could not allocate flow_id."); return -EBADF; } - f = irm_flow_create(pid, ipcp->pid, port_id, qs); + f = irm_flow_create(pid, ipcp->pid, flow_id, qs); if (f == NULL) { - bmp_release(irmd.port_ids, port_id); + bmp_release(irmd.flow_ids, flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - log_err("Could not allocate port_id."); + log_err("Could not allocate flow_id."); return -ENOMEM; } @@ -1309,7 +1309,7 @@ static int flow_alloc(pid_t pid, str_hash(ipcp->dir_hash_algo, hash, dst); - if (ipcp_flow_alloc(ipcp->pid, port_id, pid, hash, + if (ipcp_flow_alloc(ipcp->pid, flow_id, pid, hash, IPCP_HASH_LEN(ipcp), qs)) { /* sanitizer cleans this */ log_info("Flow_allocation failed."); @@ -1334,13 +1334,13 @@ static int flow_alloc(pid_t pid, *e = f; - log_info("Flow on port_id %d allocated.", port_id); + log_info("Flow on flow_id %d allocated.", flow_id); return 0; } static int flow_dealloc(pid_t pid, - int port_id) + int flow_id) { pid_t n_1_pid = -1; int ret = 0; @@ -1349,10 +1349,10 @@ static int flow_dealloc(pid_t pid, pthread_rwlock_wrlock(&irmd.flows_lock); - f = get_irm_flow(port_id); + f = get_irm_flow(flow_id); if (f == NULL) { pthread_rwlock_unlock(&irmd.flows_lock); - log_dbg("Deallocate unknown port %d by %d.", port_id, pid); + log_dbg("Deallocate unknown port %d by %d.", flow_id, pid); return 0; } @@ -1374,19 +1374,19 @@ static int flow_dealloc(pid_t pid, irm_flow_set_state(f, FLOW_NULL); clear_irm_flow(f); irm_flow_destroy(f); - bmp_release(irmd.port_ids, port_id); - log_info("Completed deallocation of port_id %d by process %d.", - port_id, pid); + bmp_release(irmd.flow_ids, flow_id); + log_info("Completed deallocation of flow_id %d by process %d.", + flow_id, pid); } else { irm_flow_set_state(f, FLOW_DEALLOC_PENDING); - log_dbg("Partial deallocation of port_id %d by process %d.", - port_id, pid); + log_dbg("Partial deallocation of flow_id %d by process %d.", + flow_id, pid); } pthread_rwlock_unlock(&irmd.flows_lock); if (n_1_pid != -1) - ret = ipcp_flow_dealloc(n_1_pid, port_id); + ret = ipcp_flow_dealloc(n_1_pid, flow_id); return ret; } @@ -1428,7 +1428,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, struct pid_el * c_pid; struct ipcp_entry * ipcp; pid_t h_pid = -1; - int port_id = -1; + int flow_id = -1; struct timespec wt = {IRMD_REQ_ARR_TIMEOUT / 1000, (IRMD_REQ_ARR_TIMEOUT % 1000) * MILLION}; @@ -1515,17 +1515,17 @@ static struct irm_flow * flow_req_arr(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); - port_id = bmp_allocate(irmd.port_ids); - if (!bmp_is_id_valid(irmd.port_ids, port_id)) { + flow_id = bmp_allocate(irmd.flow_ids); + if (!bmp_is_id_valid(irmd.flow_ids, flow_id)) { pthread_rwlock_unlock(&irmd.flows_lock); return NULL; } - f = irm_flow_create(h_pid, pid, port_id, qs); + f = irm_flow_create(h_pid, pid, flow_id, qs); if (f == NULL) { - bmp_release(irmd.port_ids, port_id); + bmp_release(irmd.flow_ids, flow_id); pthread_rwlock_unlock(&irmd.flows_lock); - log_err("Could not allocate port_id."); + log_err("Could not allocate flow_id."); return NULL; } @@ -1541,7 +1541,7 @@ static struct irm_flow * flow_req_arr(pid_t pid, pthread_rwlock_unlock(&irmd.reg_lock); pthread_rwlock_wrlock(&irmd.flows_lock); clear_irm_flow(f); - bmp_release(irmd.port_ids, f->port_id); + bmp_release(irmd.flow_ids, f->flow_id); list_del(&f->next); pthread_rwlock_unlock(&irmd.flows_lock); log_err("Could not get process table entry for %d.", h_pid); @@ -1558,14 +1558,14 @@ static struct irm_flow * flow_req_arr(pid_t pid, return f; } -static int flow_alloc_reply(int port_id, +static int flow_alloc_reply(int flow_id, int response) { struct irm_flow * f; pthread_rwlock_rdlock(&irmd.flows_lock); - f = get_irm_flow(port_id); + f = get_irm_flow(flow_id); if (f == NULL) { pthread_rwlock_unlock(&irmd.flows_lock); return -1; @@ -1631,8 +1631,8 @@ static void irm_fini(void) pthread_rwlock_wrlock(&irmd.flows_lock); - if (irmd.port_ids != NULL) - bmp_destroy(irmd.port_ids); + if (irmd.flow_ids != NULL) + bmp_destroy(irmd.flow_ids); list_for_each_safe(p, h, &irmd.irm_flows) { struct irm_flow * f = list_entry(p, struct irm_flow, next); @@ -1759,14 +1759,14 @@ void * irm_sanitize(void * o) list_for_each_safe(p, h, &irmd.irm_flows) { int ipcpi; - int port_id; + int flow_id; struct irm_flow * f = list_entry(p, struct irm_flow, next); if (irm_flow_get_state(f) == FLOW_ALLOC_PENDING && ts_diff_ms(&f->t0, &now) > IRMD_FLOW_TIMEOUT) { - log_dbg("Pending port_id %d timed out.", - f->port_id); + log_dbg("Pending flow_id %d timed out.", + f->flow_id); f->n_pid = -1; irm_flow_set_state(f, FLOW_DEALLOC_PENDING); continue; @@ -1776,16 +1776,16 @@ void * irm_sanitize(void * o) struct shm_flow_set * set; log_dbg("Process %d gone, deallocating " "flow %d.", - f->n_pid, f->port_id); + f->n_pid, f->flow_id); set = shm_flow_set_open(f->n_pid); if (set != NULL) shm_flow_set_destroy(set); f->n_pid = -1; irm_flow_set_state(f, FLOW_DEALLOC_PENDING); ipcpi = f->n_1_pid; - port_id = f->port_id; + flow_id = f->flow_id; pthread_rwlock_unlock(&irmd.flows_lock); - ipcp_flow_dealloc(ipcpi, port_id); + ipcp_flow_dealloc(ipcpi, flow_id); pthread_rwlock_wrlock(&irmd.flows_lock); continue; } @@ -1793,7 +1793,7 @@ void * irm_sanitize(void * o) if (kill(f->n_1_pid, 0) < 0) { struct shm_flow_set * set; log_err("IPCP %d gone, flow %d removed.", - f->n_1_pid, f->port_id); + f->n_1_pid, f->flow_id); set = shm_flow_set_open(f->n_pid); if (set != NULL) shm_flow_set_destroy(set); @@ -1994,8 +1994,8 @@ static void * mainloop(void * o) result = flow_accept(msg->pid, timeo, &e); if (result == 0) { qosspec_msg_t qs_msg; - ret_msg->has_port_id = true; - ret_msg->port_id = e->port_id; + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; qs_msg = spec_to_msg(&e->qs); @@ -2007,14 +2007,14 @@ static void * mainloop(void * o) msg_to_spec(msg->qosspec), timeo, &e); if (result == 0) { - ret_msg->has_port_id = true; - ret_msg->port_id = e->port_id; + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_1_pid; } break; case IRM_MSG_CODE__IRM_FLOW_DEALLOC: - result = flow_dealloc(msg->pid, msg->port_id); + result = flow_dealloc(msg->pid, msg->flow_id); break; case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR: e = flow_req_arr(msg->pid, @@ -2022,14 +2022,14 @@ static void * mainloop(void * o) msg_to_spec(msg->qosspec)); result = (e == NULL ? -1 : 0); if (result == 0) { - ret_msg->has_port_id = true; - ret_msg->port_id = e->port_id; + ret_msg->has_flow_id = true; + ret_msg->flow_id = e->flow_id; ret_msg->has_pid = true; ret_msg->pid = e->n_pid; } break; case IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY: - result = flow_alloc_reply(msg->port_id, msg->response); + result = flow_alloc_reply(msg->flow_id, msg->response); break; default: log_err("Don't know that message code."); @@ -2143,10 +2143,10 @@ static int irm_init(void) list_head_init(&irmd.irm_flows); list_head_init(&irmd.cmds); - irmd.port_ids = bmp_create(SYS_MAX_FLOWS, 0); - if (irmd.port_ids == NULL) { - log_err("Failed to create port_ids bitmap."); - goto fail_port_ids; + irmd.flow_ids = bmp_create(SYS_MAX_FLOWS, 0); + if (irmd.flow_ids == NULL) { + log_err("Failed to create flow_ids bitmap."); + goto fail_flow_ids; } if ((irmd.lf = lockfile_create()) == NULL) { @@ -2235,8 +2235,8 @@ static int irm_init(void) fail_stat: lockfile_destroy(irmd.lf); fail_lockfile: - bmp_destroy(irmd.port_ids); - fail_port_ids: + bmp_destroy(irmd.flow_ids); + fail_flow_ids: pthread_cond_destroy(&irmd.cmd_cond); fail_cmd_cond: pthread_mutex_destroy(&irmd.cmd_lock); diff --git a/src/lib/dev.c b/src/lib/dev.c index 00dcf991..117ce49d 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -91,7 +91,7 @@ struct flow { struct shm_rbuff * rx_rb; struct shm_rbuff * tx_rb; struct shm_flow_set * set; - int port_id; + int flow_id; int oflags; qosspec_t spec; ssize_t part_idx; @@ -167,12 +167,12 @@ static void port_set_state(struct port * p, pthread_mutex_unlock(&p->state_lock); } -static enum port_state port_wait_assign(int port_id) +static enum port_state port_wait_assign(int flow_id) { enum port_state state; struct port * p; - p = &ai.ports[port_id]; + p = &ai.ports[flow_id]; pthread_mutex_lock(&p->state_lock); @@ -231,7 +231,7 @@ static void flow_clear(int fd) { memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); - ai.flows[fd].port_id = -1; + ai.flows[fd].flow_id = -1; ai.flows[fd].pid = -1; } @@ -239,8 +239,8 @@ static void flow_fini(int fd) { assert(fd >= 0 && fd < SYS_MAX_FLOWS); - if (ai.flows[fd].port_id != -1) { - port_destroy(&ai.ports[ai.flows[fd].port_id]); + if (ai.flows[fd].flow_id != -1) { + port_destroy(&ai.ports[ai.flows[fd].flow_id]); bmp_release(ai.fds, fd); } @@ -256,7 +256,7 @@ static void flow_fini(int fd) if (ai.flows[fd].set != NULL) { shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].port_id, + ai.flows[fd].flow_id, FLOW_DEALLOC); shm_flow_set_close(ai.flows[fd].set); } @@ -267,7 +267,7 @@ static void flow_fini(int fd) flow_clear(fd); } -static int flow_init(int port_id, +static int flow_init(int flow_id, pid_t pid, qosspec_t qs) { @@ -282,11 +282,11 @@ static int flow_init(int port_id, goto fail_fds; } - ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, port_id); + ai.flows[fd].rx_rb = shm_rbuff_open(ai.pid, flow_id); if (ai.flows[fd].rx_rb == NULL) goto fail; - ai.flows[fd].tx_rb = shm_rbuff_open(pid, port_id); + ai.flows[fd].tx_rb = shm_rbuff_open(pid, flow_id); if (ai.flows[fd].tx_rb == NULL) goto fail; @@ -294,15 +294,15 @@ static int flow_init(int port_id, if (ai.flows[fd].set == NULL) goto fail; - ai.flows[fd].port_id = port_id; + ai.flows[fd].flow_id = flow_id; ai.flows[fd].oflags = FLOWFDEFAULT; ai.flows[fd].pid = pid; ai.flows[fd].part_idx = NO_PART; ai.flows[fd].spec = qs; - ai.ports[port_id].fd = fd; + ai.ports[flow_id].fd = fd; - port_set_state(&ai.ports[port_id], PORT_ID_ASSIGNED); + port_set_state(&ai.ports[flow_id], PORT_ID_ASSIGNED); pthread_rwlock_unlock(&ai.lock); @@ -446,7 +446,7 @@ static void fini(void) pthread_rwlock_wrlock(&ai.lock); for (i = 0; i < PROG_MAX_FLOWS; ++i) { - if (ai.flows[i].port_id != -1) { + if (ai.flows[i].flow_id != -1) { ssize_t idx; shm_rbuff_set_acl(ai.flows[i].rx_rb, ACL_FLOWDOWN); while ((idx = shm_rbuff_read(ai.flows[i].rx_rb)) >= 0) @@ -522,13 +522,13 @@ int flow_accept(qosspec_t * qs, return res; } - if (!recv_msg->has_pid || !recv_msg->has_port_id || + if (!recv_msg->has_pid || !recv_msg->has_flow_id || recv_msg->qosspec == NULL) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, + fd = flow_init(recv_msg->flow_id, recv_msg->pid, msg_to_spec(recv_msg->qosspec)); irm_msg__free_unpacked(recv_msg, NULL); @@ -595,12 +595,12 @@ int flow_alloc(const char * dst, return res; } - if (!recv_msg->has_pid || !recv_msg->has_port_id) { + if (!recv_msg->has_pid || !recv_msg->has_flow_id) { irm_msg__free_unpacked(recv_msg, NULL); return -EIRMD; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, + fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs == NULL ? qos_raw : *qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -635,15 +635,15 @@ int flow_dealloc(int fd) return -EINVAL; msg.code = IRM_MSG_CODE__IRM_FLOW_DEALLOC; - msg.has_port_id = true; + msg.has_flow_id = true; msg.has_pid = true; msg.pid = ai.pid; pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].port_id >= 0); + assert(ai.flows[fd].flow_id >= 0); - msg.port_id = ai.flows[fd].port_id; + msg.flow_id = ai.flows[fd].flow_id; pthread_rwlock_unlock(&ai.lock); @@ -690,7 +690,7 @@ int fccntl(int fd, pthread_rwlock_wrlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); va_end(l); return -ENOTALLOC; @@ -762,13 +762,13 @@ int fccntl(int fd, rx_acl |= ACL_FLOWDOWN; tx_acl |= ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->port_id, + flow->flow_id, FLOW_DOWN); } else { rx_acl &= ~ACL_FLOWDOWN; tx_acl &= ~ACL_FLOWDOWN; shm_flow_set_notify(flow->set, - flow->port_id, + flow->flow_id, FLOW_UP); } @@ -836,7 +836,7 @@ ssize_t flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -880,7 +880,7 @@ ssize_t flow_write(int fd, if (ret < 0) shm_rdrbuff_remove(ai.rdrb, idx); else - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -918,7 +918,7 @@ ssize_t flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } @@ -1050,11 +1050,11 @@ int fset_add(struct flow_set * set, pthread_rwlock_wrlock(&ai.lock); - ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].port_id); + ret = shm_flow_set_add(ai.fqset, set->idx, ai.flows[fd].flow_id); packets = shm_rbuff_queued(ai.flows[fd].rx_rb); for (i = 0; i < packets; i++) - shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT); + shm_flow_set_notify(ai.fqset, ai.flows[fd].flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1069,8 +1069,8 @@ void fset_del(struct flow_set * set, pthread_rwlock_wrlock(&ai.lock); - if (ai.flows[fd].port_id >= 0) - shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].port_id); + if (ai.flows[fd].flow_id >= 0) + shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id); pthread_rwlock_unlock(&ai.lock); } @@ -1085,12 +1085,12 @@ bool fset_has(const struct flow_set * set, pthread_rwlock_rdlock(&ai.lock); - if (ai.flows[fd].port_id < 0) { + if (ai.flows[fd].flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return false; } - ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].port_id) == 1); + ret = (shm_flow_set_has(ai.fqset, set->idx, ai.flows[fd].flow_id) == 1); pthread_rwlock_unlock(&ai.lock); @@ -1166,35 +1166,35 @@ int fevent(struct flow_set * set, /* ipcp-dev functions. */ int np1_flow_alloc(pid_t n_pid, - int port_id, + int flow_id, qosspec_t qs) { - return flow_init(port_id, n_pid, qs); + return flow_init(flow_id, n_pid, qs); } -int np1_flow_dealloc(int port_id) +int np1_flow_dealloc(int flow_id) { int fd; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[port_id].fd; + fd = ai.ports[flow_id].fd; pthread_rwlock_unlock(&ai.lock); return fd; } -int np1_flow_resp(int port_id) +int np1_flow_resp(int flow_id) { int fd; - if (port_wait_assign(port_id) != PORT_ID_ASSIGNED) + if (port_wait_assign(flow_id) != PORT_ID_ASSIGNED) return -1; pthread_rwlock_rdlock(&ai.lock); - fd = ai.ports[port_id].fd; + fd = ai.ports[flow_id].fd; pthread_rwlock_unlock(&ai.lock); @@ -1254,7 +1254,7 @@ int ipcp_flow_req_arr(pid_t pid, if (recv_msg == NULL) return -EIRMD; - if (!recv_msg->has_port_id || !recv_msg->has_pid) { + if (!recv_msg->has_flow_id || !recv_msg->has_pid) { irm_msg__free_unpacked(recv_msg, NULL); return -1; } @@ -1264,7 +1264,7 @@ int ipcp_flow_req_arr(pid_t pid, return -1; } - fd = flow_init(recv_msg->port_id, recv_msg->pid, qs); + fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs); irm_msg__free_unpacked(recv_msg, NULL); @@ -1281,11 +1281,11 @@ int ipcp_flow_alloc_reply(int fd, assert(fd >= 0 && fd < SYS_MAX_FLOWS); msg.code = IRM_MSG_CODE__IPCP_FLOW_ALLOC_REPLY; - msg.has_port_id = true; + msg.has_flow_id = true; pthread_rwlock_rdlock(&ai.lock); - msg.port_id = ai.flows[fd].port_id; + msg.flow_id = ai.flows[fd].flow_id; pthread_rwlock_unlock(&ai.lock); @@ -1322,7 +1322,7 @@ int ipcp_flow_read(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->port_id >= 0); + assert(flow->flow_id >= 0); rb = flow->rx_rb; @@ -1360,7 +1360,7 @@ int ipcp_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(flow->port_id >= 0); + assert(flow->flow_id >= 0); if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) { pthread_rwlock_unlock(&ai.lock); @@ -1378,7 +1378,7 @@ int ipcp_flow_write(int fd, ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); @@ -1424,7 +1424,7 @@ void ipcp_flow_fini(int fd) shm_rbuff_set_acl(ai.flows[fd].tx_rb, ACL_FLOWDOWN); shm_flow_set_notify(ai.flows[fd].set, - ai.flows[fd].port_id, + ai.flows[fd].flow_id, FLOW_DEALLOC); rx_rb = ai.flows[fd].rx_rb; @@ -1444,7 +1444,7 @@ int ipcp_flow_get_qoscube(int fd, pthread_rwlock_rdlock(&ai.lock); - assert(ai.flows[fd].port_id >= 0); + assert(ai.flows[fd].flow_id >= 0); *cube = qos_spec_to_cube(ai.flows[fd].spec); @@ -1480,14 +1480,14 @@ int local_flow_write(int fd, pthread_rwlock_rdlock(&ai.lock); - if (flow->port_id < 0) { + if (flow->flow_id < 0) { pthread_rwlock_unlock(&ai.lock); return -ENOTALLOC; } ret = shm_rbuff_write(flow->tx_rb, idx); if (ret == 0) - shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT); + shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT); pthread_rwlock_unlock(&ai.lock); diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto index 48198a5b..ae1014ac 100644 --- a/src/lib/ipcpd_messages.proto +++ b/src/lib/ipcpd_messages.proto @@ -42,7 +42,7 @@ enum ipcp_msg_code { message ipcp_msg { required ipcp_msg_code code = 1; optional bytes hash = 2; - optional int32 port_id = 3; + optional int32 flow_id = 3; optional string dst = 4; optional qosspec_msg qosspec = 5; optional ipcp_config_msg conf = 6; diff --git a/src/lib/irmd_messages.proto b/src/lib/irmd_messages.proto index 2ed2ec37..351b4a8e 100644 --- a/src/lib/irmd_messages.proto +++ b/src/lib/irmd_messages.proto @@ -67,7 +67,7 @@ message irm_msg { optional sint32 response = 8; optional string dst = 9; optional bytes hash = 10; - optional sint32 port_id = 11; + optional sint32 flow_id = 11; optional qosspec_msg qosspec = 12; optional ipcp_config_msg conf = 13; optional uint32 opts = 14; diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c index 008e4a0d..1c94c599 100644 --- a/src/lib/shm_flow_set.c +++ b/src/lib/shm_flow_set.c @@ -64,7 +64,7 @@ #define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx) struct portevent { - int port_id; + int flow_id; int event; }; @@ -268,20 +268,20 @@ void shm_flow_set_zero(struct shm_flow_set * set, int shm_flow_set_add(struct shm_flow_set * set, size_t idx, - int port_id) + int flow_id) { assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); assert(idx < PROG_MAX_FQUEUES); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] != -1) { + if (set->mtable[flow_id] != -1) { pthread_mutex_unlock(set->lock); return -EPERM; } - set->mtable[port_id] = idx; + set->mtable[flow_id] = idx; pthread_mutex_unlock(set->lock); @@ -290,33 +290,33 @@ int shm_flow_set_add(struct shm_flow_set * set, void shm_flow_set_del(struct shm_flow_set * set, size_t idx, - int port_id) + int flow_id) { assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); assert(idx < PROG_MAX_FQUEUES); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] == (ssize_t) idx) - set->mtable[port_id] = -1; + if (set->mtable[flow_id] == (ssize_t) idx) + set->mtable[flow_id] = -1; pthread_mutex_unlock(set->lock); } int shm_flow_set_has(struct shm_flow_set * set, size_t idx, - int port_id) + int flow_id) { int ret = 0; assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); assert(idx < PROG_MAX_FQUEUES); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] == (ssize_t) idx) + if (set->mtable[flow_id] == (ssize_t) idx) ret = 1; pthread_mutex_unlock(set->lock); @@ -325,25 +325,25 @@ int shm_flow_set_has(struct shm_flow_set * set, } void shm_flow_set_notify(struct shm_flow_set * set, - int port_id, + int flow_id, int event) { assert(set); - assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); pthread_mutex_lock(set->lock); - if (set->mtable[port_id] == -1) { + if (set->mtable[flow_id] == -1) { pthread_mutex_unlock(set->lock); return; } - (fqueue_ptr(set, set->mtable[port_id]) + - (set->heads[set->mtable[port_id]]))->port_id = port_id; - (fqueue_ptr(set, set->mtable[port_id]) + - (set->heads[set->mtable[port_id]])++)->event = event; + (fqueue_ptr(set, set->mtable[flow_id]) + + (set->heads[set->mtable[flow_id]]))->flow_id = flow_id; + (fqueue_ptr(set, set->mtable[flow_id]) + + (set->heads[set->mtable[flow_id]])++)->event = event; - pthread_cond_signal(&set->conds[set->mtable[port_id]]); + pthread_cond_signal(&set->conds[set->mtable[flow_id]]); pthread_mutex_unlock(set->lock); } diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c index 5319e89c..a6eab699 100644 --- a/src/lib/shm_rbuff.c +++ b/src/lib/shm_rbuff.c @@ -66,7 +66,7 @@ struct shm_rbuff { pthread_cond_t * add; /* packet arrived */ pthread_cond_t * del; /* packet removed */ pid_t pid; /* pid of the owner */ - int port_id; /* port_id of the flow */ + int flow_id; /* flow_id of the flow */ }; void shm_rbuff_close(struct shm_rbuff * rb) @@ -81,7 +81,7 @@ void shm_rbuff_close(struct shm_rbuff * rb) #define MM_FLAGS (PROT_READ | PROT_WRITE) struct shm_rbuff * rbuff_create(pid_t pid, - int port_id, + int flow_id, int flags) { struct shm_rbuff * rb; @@ -89,7 +89,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, ssize_t * shm_base; char fn[FN_MAX_CHARS]; - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", pid, port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", pid, flow_id); rb = malloc(sizeof(*rb)); if (rb == NULL) @@ -116,7 +116,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, rb->add = (pthread_cond_t *) (rb->lock + 1); rb->del = rb->add + 1; rb->pid = pid; - rb->port_id = port_id; + rb->flow_id = flow_id; return rb; @@ -131,7 +131,7 @@ struct shm_rbuff * rbuff_create(pid_t pid, } struct shm_rbuff * shm_rbuff_create(pid_t pid, - int port_id) + int flow_id) { struct shm_rbuff * rb; pthread_mutexattr_t mattr; @@ -140,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, mask = umask(0); - rb = rbuff_create(pid, port_id, O_CREAT | O_EXCL | O_RDWR); + rb = rbuff_create(pid, flow_id, O_CREAT | O_EXCL | O_RDWR); umask(mask); @@ -175,7 +175,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, *rb->tail = 0; rb->pid = pid; - rb->port_id = port_id; + rb->flow_id = flow_id; pthread_mutexattr_destroy(&mattr); pthread_condattr_destroy(&cattr); @@ -197,9 +197,9 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid, } struct shm_rbuff * shm_rbuff_open(pid_t pid, - int port_id) + int flow_id) { - return rbuff_create(pid, port_id, O_RDWR); + return rbuff_create(pid, flow_id, O_RDWR); } #if (defined(SHM_RBUFF_LOCKLESS) && \ diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 43eac7f6..0fc9ae7b 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -29,7 +29,7 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) assert(rb); - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); shm_rbuff_close(rb); diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index c74503ff..51d801f6 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -33,7 +33,7 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) pthread_mutex_unlock(rb->lock); #endif - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); shm_rbuff_close(rb); -- cgit v1.2.3