aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rumba/model.py53
-rw-r--r--rumba/prototypes/irati.py332
-rw-r--r--rumba/prototypes/irati_templates.py338
-rw-r--r--rumba/testbeds/qemu.py150
-rwxr-xr-xtools/democonf2rumba.py16
5 files changed, 815 insertions, 74 deletions
diff --git a/rumba/model.py b/rumba/model.py
index 613a6be..8e63822 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -20,7 +20,7 @@
# MA 02110-1301 USA
import abc
-import re
+
# Represents generic testbed info
#
@@ -65,7 +65,7 @@ class DIF:
return hash(self.name)
def __eq__(self, other):
- return other != None and self.name == other.name
+ return other is not None and self.name == other.name
def __neq__(self, other):
return not self == other
@@ -79,6 +79,7 @@ class DIF:
def get_ipcp_class(self):
return IPCP
+
# Shim over UDP
#
class ShimUDPDIF(DIF):
@@ -88,6 +89,7 @@ class ShimUDPDIF(DIF):
def get_ipcp_class(self):
return ShimUDPIPCP
+
# Shim over Ethernet
#
# @link_speed [int] Speed of the Ethernet network, in Mbps
@@ -102,6 +104,7 @@ class ShimEthDIF(DIF):
def get_ipcp_class(self):
return ShimEthIPCP
+
# Normal DIF
#
# @policies [dict] Policies of the normal DIF
@@ -125,6 +128,7 @@ class NormalDIF(DIF):
s += "\n Component %s has policy %s" % (comp, pol)
return s
+
# SSH Configuration
#
class SSHConfig:
@@ -132,6 +136,7 @@ class SSHConfig:
self.hostname = hostname
self.port = port
+
# A node in the experiment
#
# @difs: DIFs the node will have an IPCP in
@@ -164,7 +169,7 @@ class Node:
def _undeclared_dif(self, dif):
if dif not in self.difs:
- raise Exception("Invalid registration: node %s is not declared "\
+ raise Exception("Invalid registration: node %s is not declared "
"to be part of DIF %s" % (self.name, dif.name))
def _validate(self):
@@ -206,8 +211,8 @@ class Node:
s += " ]\n"
s += " Bindings: [ "
- s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) \
- for ap in self.bindings])
+ s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap])
+ for ap in self.bindings])
s += " ]\n"
return s
@@ -216,7 +221,7 @@ class Node:
return hash(self.name)
def __eq__(self, other):
- return other != None and self.name == other.name
+ return other is not None and self.name == other.name
def __neq__(self, other):
return not self == other
@@ -255,6 +260,7 @@ class Node:
del self.bindings[name]
self._validate()
+
# Base class representing an IPC Process to be created in the experiment
#
# @name [string]: IPCP name
@@ -277,28 +283,31 @@ class IPCP:
(self.name, self.dif.name,
' '.join([dif.name for dif in self.registrations]),
',bootstrapper' if self.dif_bootstrapper else ''
- )
+ )
def __hash__(self):
return hash((self.name, self.dif.name))
def __eq__(self, other):
- return other != None and self.name == other.name \
+ return other is not None and self.name == other.name \
and self.dif == other.dif
def __neq__(self, other):
return not self == other
+
class ShimEthIPCP(IPCP):
def __init__(self, name, node, dif, ifname=None):
IPCP.__init__(self, name, node, dif)
self.ifname = ifname
+
class ShimUDPIPCP(IPCP):
def __init__(self, name, node, dif):
IPCP.__init__(self, name, node, dif)
# TODO: add IP and port
+
# Base class for ARCFIRE experiments
#
# @name [string] Name of the experiment
@@ -312,7 +321,7 @@ class Experiment:
self.testbed = testbed
self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual'
self.dif_ordering = []
- self.enrollments = [] # a list of per-DIF lists of enrollments
+ self.enrollments = [] # a list of per-DIF lists of enrollments
# Generate missing information
self.generate()
@@ -360,8 +369,8 @@ class Experiment:
difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif])
del difsdeps_inc
- #print(difsdeps_adj)
- #print(difsdeps_inc_cnt)
+ # print(difsdeps_adj)
+ # print(difsdeps_inc_cnt)
# Run Kahn's algorithm to compute topological
# ordering on the DIFs graph.
@@ -380,12 +389,12 @@ class Experiment:
frontier.add(nxt)
difsdeps_adj[cur] = set()
- circular_set = [dif for dif in difsdeps_inc_cnt \
+ circular_set = [dif for dif in difsdeps_inc_cnt
if difsdeps_inc_cnt[dif] != 0]
if len(circular_set):
- raise Exception("Fatal error: The specified DIFs topology" \
- "has one or more" \
- "circular dependencies, involving the following" \
+ raise Exception("Fatal error: The specified DIFs topology"
+ "has one or more"
+ "circular dependencies, involving the following"
" DIFs: %s" % circular_set)
print("DIF topological ordering: %s" % self.dif_ordering)
@@ -406,8 +415,8 @@ class Experiment:
for node in self.nodes:
if dif in node.dif_registrations:
- dif_graphs[dif][node] = [] # init for later use
- if first is None: # pick any node for later use
+ dif_graphs[dif][node] = [] # init for later use
+ if first is None: # pick any node for later use
first = node
for lower_dif in node.dif_registrations[dif]:
if lower_dif not in neighsets:
@@ -468,11 +477,11 @@ class Experiment:
print("Enrollments:")
for el in self.enrollments:
for e in el:
- print(" [%s] %s --> %s through N-1-DIF %s" % \
- (e['dif'],
- e['enrollee'].name,
- e['enroller'].name,
- e['lower_dif']))
+ print(" [%s] %s --> %s through N-1-DIF %s" %
+ (e['dif'],
+ e['enrollee'].name,
+ e['enroller'].name,
+ e['lower_dif']))
def compute_ipcps(self):
# For each node, compute the required IPCP instances, and associated
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index e8766da..44eadb0 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -2,6 +2,7 @@
# Commands to setup and instruct IRATI
#
# Vincenzo Maffione <[email protected]>
+# Marco Capitani <[email protected]>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -17,32 +18,343 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA
+import copy
+import json
+
+import subprocess
+
+import os
import rumba.ssh_support as ssh
import rumba.model as mod
+import rumba.prototypes.irati_templates as irati_templates
+
# An experiment over the IRATI implementation
+from rumba import ssh_support
+
+
class Experiment(mod.Experiment):
+
+ @staticmethod
+ def real_sudo(s):
+ return 'sudo ' + s
+
+ @staticmethod
+ def fake_sudo(s):
+ return s
+
def __init__(self, testbed, nodes=None):
mod.Experiment.__init__(self, testbed, nodes)
+ self.manager = False
+
+ if self.testbed.username == 'root':
+ self.sudo = self.fake_sudo
+ else:
+ self.sudo = self.real_sudo
def setup(self):
- cmds = list()
+ """Installs IRATI on the vms."""
+ setup_irati = False
+ if setup_irati:
+ cmds = list()
- cmds.append("sudo apt-get update")
- cmds.append("sudo apt-get install g++ gcc "
- "protobuf-compiler libprotobuf-dev git --yes")
- cmds.append("sudo rm -rf ~/irati")
- cmds.append("cd && git clone https://github.com/IRATI/stack irati")
- cmds.append("cd ~/irati && sudo ./install-from-scratch")
- cmds.append("sudo nohup ipcm &> ipcm.log &")
+ cmds.append("sudo apt-get update")
+ cmds.append("sudo apt-get install g++ gcc "
+ "protobuf-compiler libprotobuf-dev git --yes")
+ cmds.append("sudo rm -rf ~/irati")
+ cmds.append("cd && git clone https://github.com/IRATI/stack irati")
+ cmds.append("cd ~/irati && sudo ./install-from-scratch")
+ cmds.append("sudo nohup ipcm &> ipcm.log &")
+ for node in self.nodes:
+ ssh.execute_commands(self.testbed, node.ssh_config,
+ cmds, time_out=None)
+
+ def bootstrap_network(self):
+ """Creates the network by enrolling and configuring the nodes"""
for node in self.nodes:
- ssh.execute_commands(self.testbed, node.ssh_config,
- cmds, time_out=None)
+ self.process_node(node)
+ self.enroll_nodes()
+
+ def run_experiment(self):
+ input('Press ENTER to quit.')
def run_prototype(self):
print("[IRATI experiment] start")
print("Setting up IRATI on the nodes...")
self.setup()
+ self.write_conf()
+ self.bootstrap_network()
+ self.run_experiment()
print("[IRATI experiment] end")
+
+ def process_node(self, node):
+ """
+ Installs the configuration and boots up rina on a node
+ :type node: mod.Node
+ :param node:
+ :return:
+ """
+ name = node.name
+ gen_files_conf = 'shimeth.%(name)s.*.dif da.map %(name)s.ipcm.conf' % {
+ 'name': name}
+ if any(node in dif.members for dif in self.dif_ordering):
+ gen_files_conf = ' '.join(
+ [gen_files_conf, 'normal.%(name)s.*.dif' % {'name': name}])
+ dir_path = os.path.dirname(os.path.abspath(__file__))
+ gen_files_bin = 'enroll.py'
+ gen_files_bin_full = os.path.join(dir_path, 'enroll.py')
+
+ ipcm_components = ['scripting', 'console']
+ if self.manager:
+ ipcm_components.append('mad')
+ ipcm_components = ', '.join(ipcm_components)
+
+ gen_files = ' '.join([gen_files_conf, gen_files_bin_full])
+
+ sshopts = ('-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'
+ ' -o IdentityFile=buildroot/irati_rsa')
+ format_args = {'name': name,
+ 'ssh': node.ssh_config.port,
+ 'username': self.testbed.username,
+ 'genfiles': gen_files,
+ 'genfilesconf': gen_files_conf,
+ 'genfilesbin': gen_files_bin,
+ 'sshopts': sshopts,
+ 'installpath': '/usr',
+ 'verb': 'DBG',
+ 'ipcmcomps': ipcm_components}
+ try:
+ print('DEBUG: executing >> '
+ 'scp %(sshopts)s -r -P %(ssh)s '
+ '%(genfiles)s %(username)s@localhost:'
+ % format_args)
+ subprocess.check_call(('scp %(sshopts)s -r -P %(ssh)s '
+ '%(genfiles)s %(username)s@localhost:'
+ % format_args), shell=True)
+ except subprocess.CalledProcessError as e:
+ raise Exception(str(e))
+
+ # TODO: review ssh opts through ssh support
+
+ cmds = [self.sudo('hostname %(name)s' % format_args),
+ self.sudo('chmod a+rw /dev/irati'),
+ self.sudo('mv %(genfilesconf)s /etc' % format_args),
+ self.sudo('mv %(genfilesbin)s /usr/bin') % format_args]
+
+ # TODO: is the port up on the VM at this point?
+
+ cmds += [self.sudo('modprobe rina-default-plugin'),
+ self.sudo('%(installpath)s/bin/ipcm -a \"%(ipcmcomps)s\" '
+ '-c /etc/%(name)s.ipcm.conf -l %(verb)s &> log &'
+ % format_args)]
+
+ print('DEBUG: sending node setup via ssh.')
+ # print('Credentials:')
+ # print(node.ssh_config.hostname, node.ssh_config.port,
+ # self.testbed.username, self.testbed.password)
+ ssh_support.execute_commands(self.testbed, node.ssh_config, cmds)
+
+ def enroll_nodes(self):
+ """Runs the enrollments one by one, respecting dependencies"""
+ for enrollment_list in self.enrollments:
+ for e in enrollment_list:
+ print(
+ 'I am going to enroll %s to DIF %s against neighbor %s,'
+ ' through lower DIF %s'
+ % (e['enrollee'],
+ e['dif'].name,
+ e['enroller'],
+ e['lower_dif'].name))
+
+ subprocess.check_call('sleep 2'. split()) # Important!
+
+ e_args = {'ldif': e['lower_dif'].name,
+ 'dif': e['dif'].name,
+ 'name': e['enrollee'].name,
+ 'o_name': e['enroller'].name}
+
+ cmd = self.sudo('enroll.py --lower-dif %(ldif)s --dif %(dif)s '
+ '--ipcm-conf /etc/%(name)s.ipcm.conf '
+ '--enrollee-name %(dif)s.%(name)s.IPCP '
+ '--enroller-name %(dif)s.%(o_name)s.IPCP'
+ % e_args)
+ print('DEBUG: sending enrollment operation via ssh.')
+ # print('Credentials:')
+ # print(e['enrollee'].ssh_config.hostname,
+ # e['enrollee'].ssh_config.port,
+ # self.testbed.username, self.testbed.password)
+ ssh_support.execute_command(self.testbed,
+ e['enrollee'].ssh_config,
+ cmd)
+
+ def write_conf(self):
+ """Write the configuration files"""
+ # Constants and initializations
+ ipcmconfs = dict()
+ difconfs = dict()
+ ipcp2shim_map = {}
+ node2id_map = {}
+ mgmt_dif_name = 'NMS'
+
+ # TODO: what format are the mappings registered in? Is this ok?
+ app_mappings = []
+ for node in self.nodes:
+ app_mappings += [{'name': app, 'dif': dif.name}
+ for app in node.registrations
+ for dif in node.registrations[app]]
+
+ # If some app directives were specified, use those to build da.map.
+ # Otherwise, assume the standard applications are to be mapped in
+ # the DIF with the highest rank.
+ if len(app_mappings) == 0:
+ if len(self.dif_ordering) > 0:
+ for adm in \
+ irati_templates.da_map_base["applicationToDIFMappings"]:
+ adm["difName"] = "%s" % (self.dif_ordering[-1],)
+ else:
+ irati_templates.da_map_base["applicationToDIFMappings"] = []
+ for apm in app_mappings:
+ irati_templates.da_map_base["applicationToDIFMappings"]\
+ .append({"encodedAppName": apm['name'],
+ "difName": "%s" % (apm['dif'])
+ })
+
+ # TODO ask: I guess this will need to be added,
+ # and in that case we should add it to the qemu plugin too...
+ # Where should we take it in input?
+
+ if self.manager:
+ # Add MAD/Manager configuration
+ irati_templates.ipcmconf_base["addons"] = {
+ "mad": {
+ "managerAppName": "",
+ "NMSDIFs": [{"DIF": "%s" % mgmt_dif_name}],
+ "managerConnections": [{
+ "managerAppName": "manager-1--",
+ "DIF": "%s" % mgmt_dif_name
+ }]
+ }
+ }
+
+ node_number = 1
+ for node in self.nodes: # type: mod.Node
+ node2id_map[node.name] = node_number
+ node_number += 1
+ ipcmconfs[node.name] = copy.deepcopy(irati_templates.ipcmconf_base)
+ if self.manager:
+ ipcmconfs[node.name]["addons"]["mad"]["managerAppName"] \
+ = "%s.mad-1--" % (node.name,)
+
+ for dif in self.dif_ordering: # type: mod.DIF
+ if isinstance(dif, mod.ShimEthDIF):
+ ipcp2shim_map.update({ipcp.name: dif for ipcp in dif.ipcps})
+ elif isinstance(dif, mod.NormalDIF):
+ difconfs[dif.name] = dict()
+ for node in dif.members:
+ difconfs[dif.name][node.name] = copy.deepcopy(
+ irati_templates.normal_dif_base
+ )
+
+ for node in self.nodes: # type: mod.Node
+ ipcmconf = ipcmconfs[node.name]
+
+ for ipcp in node.ipcps: # type: mod.ShimEthIPCP
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ shim = ipcp2shim_map[ipcp.name] # type: mod.ShimEthDIF
+ ipcmconf["ipcProcessesToCreate"].append({
+ "apName": "eth.%s.IPCP" % ipcp.name,
+ "apInstance": "1",
+ "difName": shim.name
+ })
+
+ template_file_name = 'shimeth.%s.%s.dif' \
+ % (node.name, shim.name)
+ ipcmconf["difConfigurations"].append({
+ "name": shim.name,
+ "template": template_file_name
+ })
+
+ fout = open(template_file_name, 'w')
+ fout.write(json.dumps(
+ {"difType": "shim-eth-vlan",
+ "configParameters": {
+ "interface-name": ipcp.ifname
+ }
+ },
+ indent=4, sort_keys=True))
+ fout.close()
+
+ # Run over dif_ordering array, to make sure each IPCM config has
+ # the correct ordering for the ipcProcessesToCreate list of operations.
+ # If we iterated over the difs map, the order would be randomic, and so
+ # some IPCP registrations in lower DIFs may fail.
+ # This would happen because at the moment of registration,
+ # it may be that the IPCP of the lower DIF has not been created yet.
+ shims = ipcp2shim_map.values()
+ for dif in self.dif_ordering: # type: mod.NormalDIF
+
+ if dif in shims:
+ # Shims are managed separately, in the previous loop
+ continue
+
+ for node in dif.members: # type: mod.Node
+ node_name = node.name
+ ipcmconf = ipcmconfs[node_name]
+
+ normal_ipcp = {"apName": "%s.%s.IPCP" % (dif.name, node_name),
+ "apInstance": "1",
+ "difName": "%s" % (dif.name,),
+ "difsToRegisterAt": []}
+
+ for lower_dif in node.dif_registrations[dif]: # type: mod.DIF
+ normal_ipcp["difsToRegisterAt"].append(lower_dif.name)
+
+ ipcmconf["ipcProcessesToCreate"].append(normal_ipcp)
+
+ ipcmconf["difConfigurations"].append({
+ "name": "%s" % (dif.name,),
+ "template": "normal.%s.%s.dif" % (node_name, dif.name,)
+ })
+
+ # Fill in the map of IPCP addresses.
+ # This could be moved at difconfs
+ for other_node in dif.members: # type: mod.Node
+ difconfs[dif.name][other_node.name] \
+ ["knownIPCProcessAddresses"].append({
+ "apName": "%s.%s.IPCP" % (dif.name, node_name),
+ "apInstance": "1",
+ "address": 16 + node2id_map[node_name]})
+ for path, ps in dif.policies.items():
+ # if policy['nodes'] == [] or vmname in policy['nodes']:
+ # TODO: manage per-node-policies
+ irati_templates.translate_policy(
+ difconfs[dif.name][node_name], path, ps, parms=[])
+
+ # Dump the DIF Allocator map
+ with open('da.map', 'w') as da_map_file:
+ json.dump(irati_templates.da_map_base,
+ da_map_file,
+ indent=4,
+ sort_keys=True)
+
+ for node in self.nodes:
+ # Dump the IPCM configuration files
+ with open('%s.ipcm.conf' % (node.name,), 'w') as node_file:
+ json.dump(ipcmconfs[node.name],
+ node_file,
+ indent=4,
+ sort_keys=True)
+
+ for dif in self.dif_ordering: # type: mod.DIF
+ dif_conf = difconfs.get(dif.name, None)
+ if dif_conf:
+ # Dump the normal DIF configuration files
+ for node in dif.members:
+ with open('normal.%s.%s.dif' % (node.name, dif.name), 'w') \
+ as dif_conf_file:
+ json.dump(dif_conf[node.name],
+ dif_conf_file,
+ indent=4,
+ sort_keys=True)
diff --git a/rumba/prototypes/irati_templates.py b/rumba/prototypes/irati_templates.py
new file mode 100644
index 0000000..0f3ef05
--- /dev/null
+++ b/rumba/prototypes/irati_templates.py
@@ -0,0 +1,338 @@
+# Environment setup for VMs. Standard linux approach
+env_dict = {'installpath': '/usr', 'varpath': ''}
+
+# Template for a IPCM configuration file
+ipcmconf_base = {
+ "configFileVersion": "1.4.1",
+ "localConfiguration": {
+ "installationPath": "%(installpath)s/bin" % env_dict,
+ "libraryPath": "%(installpath)s/lib" % env_dict,
+ "logPath": "%(varpath)s/var/log" % env_dict,
+ "consoleSocket": "%(varpath)s/var/run/ipcm-console.sock" % env_dict,
+ "pluginsPaths": [
+ "%(installpath)s/lib/rinad/ipcp" % env_dict,
+ "/lib/modules/4.1.33-irati/extra"
+ ]
+ },
+
+ "ipcProcessesToCreate": [],
+ "difConfigurations": [],
+}
+
+
+da_map_base = {
+ "applicationToDIFMappings": [
+ {
+ "encodedAppName": "rina.apps.echotime.server-1--",
+ "difName": "n.DIF"
+ },
+ {
+ "encodedAppName": "traffic.generator.server-1--",
+ "difName": "n.DIF"
+ }
+ ],
+}
+
+
+# Template for a normal DIF configuration file
+normal_dif_base = {
+ "difType" : "normal-ipc",
+ "dataTransferConstants" : {
+ "addressLength" : 2,
+ "cepIdLength" : 2,
+ "lengthLength" : 2,
+ "portIdLength" : 2,
+ "qosIdLength" : 2,
+ "rateLength" : 4,
+ "frameLength" : 4,
+ "sequenceNumberLength" : 4,
+ "ctrlSequenceNumberLength" : 4,
+ "maxPduSize" : 10000,
+ "maxPduLifetime" : 60000
+ },
+
+ "qosCubes" : [ {
+ "name" : "unreliablewithflowcontrol",
+ "id" : 1,
+ "partialDelivery" : False,
+ "orderedDelivery" : True,
+ "efcpPolicies" : {
+ "dtpPolicySet" : {
+ "name" : "default",
+ "version" : "0"
+ },
+ "initialATimer" : 0,
+ "dtcpPresent" : True,
+ "dtcpConfiguration" : {
+ "dtcpPolicySet" : {
+ "name" : "default",
+ "version" : "0"
+ },
+ "rtxControl" : False,
+ "flowControl" : True,
+ "flowControlConfig" : {
+ "rateBased" : False,
+ "windowBased" : True,
+ "windowBasedConfig" : {
+ "maxClosedWindowQueueLength" : 10,
+ "initialCredit" : 200
+ }
+ }
+ }
+ }
+ }, {
+ "name" : "reliablewithflowcontrol",
+ "id" : 2,
+ "partialDelivery" : False,
+ "orderedDelivery" : True,
+ "maxAllowableGap": 0,
+ "efcpPolicies" : {
+ "dtpPolicySet" : {
+ "name" : "default",
+ "version" : "0"
+ },
+ "initialATimer" : 0,
+ "dtcpPresent" : True,
+ "dtcpConfiguration" : {
+ "dtcpPolicySet" : {
+ "name" : "default",
+ "version" : "0"
+ },
+ "rtxControl" : True,
+ "rtxControlConfig" : {
+ "dataRxmsNmax" : 5,
+ "initialRtxTime" : 1000
+ },
+ "flowControl" : True,
+ "flowControlConfig" : {
+ "rateBased" : False,
+ "windowBased" : True,
+ "windowBasedConfig" : {
+ "maxClosedWindowQueueLength" : 10,
+ "initialCredit" : 200
+ }
+ }
+ }
+ }
+ } ],
+
+ "knownIPCProcessAddresses": [],
+
+ "addressPrefixes" : [ {
+ "addressPrefix" : 0,
+ "organization" : "N.Bourbaki"
+ }, {
+ "addressPrefix" : 16,
+ "organization" : "IRATI"
+ } ],
+
+ "rmtConfiguration" : {
+ "pffConfiguration" : {
+ "policySet" : {
+ "name" : "default",
+ "version" : "0"
+ }
+ },
+ "policySet" : {
+ "name" : "default",
+ "version" : "1"
+ }
+ },
+
+ "enrollmentTaskConfiguration" : {
+ "policySet" : {
+ "name" : "default",
+ "version" : "1",
+ "parameters" : [ {
+ "name" : "enrollTimeoutInMs",
+ "value" : "10000"
+ }, {
+ "name" : "watchdogPeriodInMs",
+ "value" : "30000"
+ }, {
+ "name" : "declaredDeadIntervalInMs",
+ "value" : "120000"
+ }, {
+ "name" : "neighborsEnrollerPeriodInMs",
+ "value" : "0"
+ }, {
+ "name" : "maxEnrollmentRetries",
+ "value" : "0"
+ } ]
+ }
+ },
+
+ "flowAllocatorConfiguration" : {
+ "policySet" : {
+ "name" : "default",
+ "version" : "1"
+ }
+ },
+
+ "namespaceManagerConfiguration" : {
+ "policySet" : {
+ "name" : "default",
+ "version" : "1"
+ }
+ },
+
+ "securityManagerConfiguration" : {
+ "policySet" : {
+ "name" : "default",
+ "version" : "1"
+ }
+ },
+
+ "resourceAllocatorConfiguration" : {
+ "pduftgConfiguration" : {
+ "policySet" : {
+ "name" : "default",
+ "version" : "0"
+ }
+ }
+ },
+
+ "routingConfiguration" : {
+ "policySet" : {
+ "name" : "link-state",
+ "version" : "1",
+ "parameters" : [ {
+ "name" : "objectMaximumAge",
+ "value" : "10000"
+ },{
+ "name" : "waitUntilReadCDAP",
+ "value" : "5001"
+ },{
+ "name" : "waitUntilError",
+ "value" : "5001"
+ },{
+ "name" : "waitUntilPDUFTComputation",
+ "value" : "103"
+ },{
+ "name" : "waitUntilFSODBPropagation",
+ "value" : "101"
+ },{
+ "name" : "waitUntilAgeIncrement",
+ "value" : "997"
+ },{
+ "name" : "routingAlgorithm",
+ "value" : "Dijkstra"
+ }]
+ }
+ }
+}
+
+
+def ps_set(d, k, v, parms):
+ if k not in d:
+ d[k] = {'name': '', 'version': '1'}
+
+ if d[k]["name"] == v and "parameters" in d[k]:
+ cur_names = [p["name"] for p in d[k]["parameters"]]
+ for p in parms:
+ name, value = p.split('=')
+ if name in cur_names:
+ for i in range(len(d[k]["parameters"])):
+ if d[k]["parameters"][i]["name"] == name:
+ d[k]["parameters"][i]["value"] = value
+ break
+ else:
+ d[k]["parameters"].append({ 'name': name, 'value': value })
+
+ elif len(parms) > 0:
+ d[k]["parameters"] = [ { 'name': p.split('=')[0], 'value': p.split('=')[1]} for p in parms ]
+
+ d[k]["name"] = v
+
+
+def dtp_ps_set(d, v, parms):
+ for i in range(len(d["qosCubes"])):
+ ps_set(d["qosCubes"][i]["efcpPolicies"], "dtpPolicySet", v, parms)
+
+
+def dtcp_ps_set(d, v, parms):
+ for i in range(len(d["qosCubes"])):
+ ps_set(d["qosCubes"][i]["efcpPolicies"]["dtcpConfiguration"], "dtcpPolicySet", v, parms)
+
+
+policy_translator = {
+ 'rmt.pff': lambda d, v, p: ps_set(d["rmtConfiguration"]["pffConfiguration"],
+ "policySet", v, p),
+ 'rmt': lambda d, v, p: ps_set(d["rmtConfiguration"], "policySet", v, p),
+ 'enrollment-task': lambda d, v, p: ps_set(d["enrollmentTaskConfiguration"],
+ "policySet", v, p),
+ 'flow-allocator': lambda d, v, p: ps_set(d["flowAllocatorConfiguration"],
+ "policySet", v, p),
+ 'namespace-manager': lambda d, v, p: ps_set(
+ d["namespaceManagerConfiguration"], "policySet", v, p),
+ 'security-manager': lambda d, v, p: ps_set(
+ d["securityManagerConfiguration"], "policySet", v, p),
+ 'routing': lambda d, v, p: ps_set(d["routingConfiguration"], "policySet", v, p),
+ 'resource-allocator.pduftg': lambda d, v, p: ps_set(
+ d["resourceAllocatorConfiguration"], "policySet", v, p),
+ 'efcp.*.dtcp': None,
+ 'efcp.*.dtp': None,
+}
+
+
+def is_security_path(path):
+ sp = path.split('.')
+ return (len(sp) == 3) and (sp[0] == 'security-manager') \
+ and (sp[1] in ['auth', 'encrypt', 'ttl', 'errorcheck'])
+
+
+# Do we know this path ?
+def policy_path_valid(path):
+ if path in policy_translator:
+ return True
+
+ # Try to validate security configuration
+ if is_security_path(path):
+ return True
+
+ return False
+
+
+def translate_security_path(d, path, ps, parms):
+ u1, component, profile = path.split('.')
+ if "authSDUProtProfiles" not in d["securityManagerConfiguration"]:
+ d["securityManagerConfiguration"]["authSDUProtProfiles"] = {}
+ d = d["securityManagerConfiguration"]["authSDUProtProfiles"]
+
+ tr = {'auth': 'authPolicy', 'encrypt': 'encryptPolicy',
+ 'ttl': 'TTLPolicy', 'errorcheck': 'ErrorCheckPolicy'}
+
+ if profile == 'default':
+ if profile not in d:
+ d["default"] = {}
+
+ ps_set(d["default"], tr[component], ps, parms)
+
+ else: # profile is the name of a DIF
+ if "specific" not in d:
+ d["specific"] = []
+ j = -1
+ for i in range(len(d["specific"])):
+ if d["specific"][i]["underlyingDIF"] == profile + ".DIF":
+ j = i
+ break
+
+ if j == -1: # We need to create an entry for the new DIF
+ d["specific"].append({"underlyingDIF" : profile + ".DIF"})
+
+ ps_set(d["specific"][j], tr[component], ps, parms)
+
+
+def translate_policy(difconf, path, ps, parms):
+ if path =='efcp.*.dtcp':
+ dtcp_ps_set(difconf, ps, parms)
+
+ elif path == 'efcp.*.dtp':
+ dtp_ps_set(difconf, ps, parms)
+
+ elif is_security_path(path):
+ translate_security_path(difconf, path, ps, parms)
+
+ else:
+ policy_translator[path](difconf, ps, parms)
+
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 3573554..d3e1698 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -2,6 +2,7 @@
# QEMU testbed for Rumba
#
# Vincenzo Maffione <[email protected]>
+# Marco Capitani <[email protected]>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -23,10 +24,12 @@ import subprocess
import os
import rumba.model as mod
+from rumba import ssh_support
class Testbed(mod.Testbed):
- def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="",
+ def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE",
+ password="root", username="root",
use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
@@ -34,15 +37,18 @@ class Testbed(mod.Testbed):
self.bzimage = bzimage
self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \
+ else qemu_logs_dir
self.boot_processes = []
@staticmethod
- def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False):
+ def _run_command_chain(commands, results_queue,
+ error_queue, ignore_errors=False):
"""
Runs (sequentially) the command list.
- On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+ On error, breaks and dumps it in error_queue, and interrupts
+ as soon as it is non-empty (unless ignore errors is True).
:type commands: list
:type results_queue: Queue
@@ -69,6 +75,41 @@ class Testbed(mod.Testbed):
else:
results_queue.put("Command chain ran with %d errors" % errors)
+ def recover_if_names(self, experiment):
+ for node in experiment.nodes:
+ for ipcp in node.ipcps:
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ shim_name, node_name = ipcp.name.split('.')
+ port_set = [x for x in self.vms[node_name]['ports']
+ if x['shim'].name == shim_name]
+ port = port_set[0]
+ port_id = port['port_id']
+ vm_id = self.vms[node_name]['id']
+ mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)
+ print('DEBUG: recovering ifname for port: '
+ + port['tap_id'] + '.')
+ output = ssh_support.return_commands(
+ self,
+ node.ssh_config,
+ ['mac2ifname ' + mac])
+ print('DEBUG: output is %s' % output)
+ if not hasattr(output, '__len__') or len(output) != 1:
+ raise Exception("Could not retrieve ifname for ipcp %s."
+ % ipcp.name)
+ ipcp.ifname = output[0]
+ args = {'vlan': int(port['shim'].name), 'port': ipcp.ifname}
+ cmds = ['ip link set %(port)s up'
+ % args,
+ 'ip link add link %(port)s name %(port)s.%(vlan)s '
+ 'type vlan id %(vlan)s'
+ % args,
+ 'ip link set %(port)s.%(vlan)s up'
+ % args]
+ ssh_support.execute_commands(self,
+ node.ssh_config,
+ cmds)
+
+
def swap_in(self, experiment):
"""
:type experiment mod.Experiment
@@ -88,11 +129,12 @@ class Testbed(mod.Testbed):
e_queue = multiprocessing.Queue()
print(experiment.dif_ordering)
for shim in experiment.dif_ordering:
- command_list = []
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
continue
self.shims.append(shim)
+ ipcps = shim.ipcps
+ command_list = []
command_list += ('sudo brctl addbr %(br)s\n'
'sudo ip link set %(br)s up'
% {'br': shim.name}
@@ -113,23 +155,39 @@ class Testbed(mod.Testbed):
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'sudo tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'sudo tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
-
- vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+ command_list += (
+ 'sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
+
+ vm['ports'].append({'tap_id': tap_id,
+ 'shim': shim,
+ 'port_id': port_id})
+ ipcp_set = [x for x in ipcps if x in node.ipcps]
+ if len(ipcp_set) > 1:
+ raise Exception("Error: more than one ipcp in common "
+ "between shim dif %s and node %s"
+ % (shim.name, node.name))
+ ipcp = ipcp_set[0] # type: mod.ShimEthIPCP
+ assert ipcp.name == '%s.%s' % (shim.name, node.name), \
+ 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \
+ % (shim.name, node.name, ipcp.name)
+ ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
# Avoid stacking processes if one failed before.
if not e_queue.empty():
break
# Launch commands asynchronously
- process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain,
+ args=(command_list,
+ r_queue,
+ e_queue))
shim_processes.append(process)
process.start()
@@ -149,7 +207,8 @@ class Testbed(mod.Testbed):
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -157,9 +216,9 @@ class Testbed(mod.Testbed):
boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
booting_budget = boot_batch_size
- boot_backoff = 12 # in seconds
+ boot_backoff = 12 # in seconds
base_port = 2222
- vm_memory = 164 # in megabytes
+ vm_memory = 164 # in megabytes
vm_frontend = 'virtio-net-pci'
vmid = 1
@@ -167,6 +226,7 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
name = node.name
vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
+ vm['id'] = vmid
fwdp = base_port + vmid
fwdc = fwdp + 10000
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99)
@@ -199,7 +259,6 @@ class Testbed(mod.Testbed):
'-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
'-netdev user,id=mgmt,%(hostfwdstr)s '
'-vga std '
- '-pidfile rina-%(id)s.pid '
'-serial file:%(vmname)s.log '
% vars_dict
)
@@ -211,23 +270,29 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
- 'downscript=no%(vhost)s '
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
- )
+ command += (
+ '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend,
+ 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping %s secs waiting for the VMs to boot' % boot_backoff)
+
+ print('Sleeping %s secs waiting '
+ 'for the VMs to boot' % boot_backoff)
+
time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\
+ as out_file:
print('DEBUG: executing >> %s' % command)
- self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
- pass
+ self.boot_processes.append(subprocess.Popen(command.split(),
+ stdout=out_file))
vmid += 1
@@ -238,6 +303,7 @@ class Testbed(mod.Testbed):
print('Sleeping %s secs waiting for the last VMs to boot' % tsleep)
time.sleep(tsleep)
+ self.recover_if_names(experiment)
def swap_out(self, experiment):
"""
@@ -267,9 +333,10 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
+ process = multiprocessing.Process(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True})
port_processes.append(process)
process.start()
@@ -280,14 +347,17 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print(
+ 'Failure while shutting down: %s' % str(error_queue.get())
+ )
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down port processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -302,7 +372,9 @@ class Testbed(mod.Testbed):
% {'br': shim.name}
).split('\n')
process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
+ args=(commands,
+ results_queue,
+ error_queue),
kwargs={'ignore_errors': True})
shim_processes.append(process)
process.start()
@@ -314,13 +386,15 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('Failure while shutting down: %s'
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down shim processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
diff --git a/tools/democonf2rumba.py b/tools/democonf2rumba.py
index cda112c..d9ea8e7 100755
--- a/tools/democonf2rumba.py
+++ b/tools/democonf2rumba.py
@@ -202,22 +202,30 @@ if __name__ == '__main__':
import rumba.testbeds.emulab as emulab
test_class = emulab.Testbed
testbed_args = {a.dest: getattr(args, a.dest)
- for a in emulab_p._actions if a.dest != 'help'}
+ for a in emulab_p._actions
+ if a.dest != 'help'
+ and getattr(args, a.dest) is not None}
elif args.testbed == 'jfed':
import rumba.testbeds.jfed as jfed
test_class = jfed.Testbed
testbed_args = {a.dest: getattr(args, a.dest)
- for a in jfed_p._actions if a.dest != 'help'}
+ for a in jfed_p._actions
+ if a.dest != 'help'
+ and getattr(args, a.dest) is not None}
elif args.testbed == 'qemu':
import rumba.testbeds.qemu as qemu
test_class = qemu.Testbed
testbed_args = {a.dest: getattr(args, a.dest)
- for a in qemu_p._actions if a.dest != 'help'}
+ for a in qemu_p._actions
+ if a.dest != 'help'
+ and getattr(args, a.dest) is not None}
elif args.testbed == 'fake':
import rumba.testbeds.faketestbed as fake
test_class = fake.Testbed
testbed_args = {a.dest: getattr(args, a.dest)
- for a in fake_p._actions if a.dest != 'help'}
+ for a in fake_p._actions
+ if a.dest != 'help'
+ and getattr(args, a.dest) is not None}
else:
if args.testbed is None:
print('Testbed type must be specified!')