diff options
author | Marco Capitani <[email protected]> | 2017-04-05 17:26:35 +0200 |
---|---|---|
committer | Marco Capitani <[email protected]> | 2017-04-05 17:26:35 +0200 |
commit | afc508dbf2c34eedb93d46dc43d5ec284213cbcb (patch) | |
tree | e5056dacd8bc2449cab8e59a19b71eb312b9721d | |
parent | 0018610178892296c23ee2ca72bf05b5b41c23b9 (diff) | |
download | rumba-afc508dbf2c34eedb93d46dc43d5ec284213cbcb.tar.gz rumba-afc508dbf2c34eedb93d46dc43d5ec284213cbcb.zip |
Parallelization and splitting of the script
-rw-r--r-- | rumba/testbeds/qemu.py | 266 |
1 files changed, 189 insertions, 77 deletions
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 2f5491a..45e5cc0 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -17,69 +17,135 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301 USA -import multiprocessing +from Queue import Empty +from multiprocessing import Process, Queue, cpu_count +from os import geteuid import rumba.model as mod -from subprocess import Popen +from subprocess import Popen, check_call, CalledProcessError class Testbed(mod.Testbed): - def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="", use_vhost=True): + def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="", + use_vhost=True, qemu_out_folder=""): mod.Testbed.__init__(self, exp_name, username, password, proj_name) self.vms = {} self.shims = [] self.vm_img_path = vm_img_folder self.vhost = use_vhost + self.qemu_folder = qemu_out_folder + self.boot_processes = [] + + @staticmethod + def _run_command_chain(commands, results_queue, error_queue): + """ + Runs (sequentially) the command list. + + On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty. + + :type commands: list + :type results_queue: Queue + :type error_queue: Queue + :param commands: list of commands to execute + :param results_queue: Queue of results of parallel processes + :param error_queue: Queue of error(s) encountered + :return: None + """ + try: + for command in commands: + if not error_queue.empty(): + break + print('DEBUG: executing >> {}'.format(command)) + check_call(command.split()) + + results_queue.put("Command chain ran correctly.") + except CalledProcessError as e: + error_queue.put(str(e)) def create_experiment(self, experiment): """ :type experiment mod.Experiment - :rtype str :param experiment: The experiment running - :return the script to be run to create the experiment's vms """ + if geteuid() != 0: + print("ERROR: QEMU testbed requires root access: please use sudo.") + return # TODO I'd rather raise an appropriate error... + print("[QEMU testbed] swapping in") - command_str = "" # Building bridges and taps - + shim_processes = [] + r_queue = Queue() + e_queue = Queue() + print(experiment.dif_ordering) for shim in experiment.dif_ordering: + command_list = [] if not isinstance(shim, mod.ShimEthDIF): # Nothing to do here continue self.shims.append(shim) - command_str += 'sudo brctl addbr %(br)s\n' \ - 'sudo ip link set %(br)s up\n' \ - '\n' % {'br': shim.name} + command_list += ('sudo brctl addbr %(br)s\n' + 'sudo ip link set %(br)s up' + % {'br': shim.name} + ).split('\n') for node in shim.members: # type:mod.Node name = node.name vm = self.vms.setdefault(name, {'vm': node, 'ports': []}) port_id = len(vm['ports']) + 1 tap_id = '%s.%02x' % (name, port_id) - command_str += 'sudo ip tuntap add mode tap name %(tap)s\n' \ - 'sudo ip link set %(tap)s up\n' \ - 'sudo brctl addif %(br)s %(tap)s\n\n' \ - % {'tap': tap_id, 'br': shim.name} + command_list += ('sudo ip tuntap add mode tap name %(tap)s\n' + 'sudo ip link set %(tap)s up\n' + 'sudo brctl addif %(br)s %(tap)s' + % {'tap': tap_id, 'br': shim.name} + ).split('\n') if shim.link_speed > 0: speed = '%dmbit' % shim.link_speed # Rate limit the traffic transmitted on the TAP interface - command_str += 'sudo tc qdisc add dev %(tap)s handle 1: root ' \ - 'htb default 11\n' \ - 'sudo tc class add dev %(tap)s parent 1: classid ' \ - '1:1 htb rate 10gbit\n' \ - 'sudo tc class add dev %(tap)s parent 1:1 classid ' \ - '1:11 htb rate %(speed)s\n' \ - % {'tap': tap_id, 'speed': speed} + command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root ' + 'htb default 11\n' + 'sudo tc class add dev %(tap)s parent 1: classid ' + '1:1 htb rate 10gbit\n' + 'sudo tc class add dev %(tap)s parent 1:1 classid ' + '1:11 htb rate %(speed)s' + % {'tap': tap_id, 'speed': speed} + ).split('\n') vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id}) # TODO deal with Ip address (shim UDP DIF). + # Avoid stacking processes if one failed before. + if not e_queue.empty(): + break + # Launch commands asynchronously + process = Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue)) + shim_processes.append(process) + process.start() + + # Wait for all processes to be over. + total_processes = len(shim_processes) + max_waiting_time = 2 * total_processes + over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + # Check for errors + if not e_queue.empty(): + print('Testbed instantiation failed: {}'.format(str(e_queue.get()))) + return # TODO: again, I would prefer a specific exception + try: + # Check for results + result = r_queue.get(timeout=1) + if result == "Command chain ran correctly.": + over_processes += 1 + print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes)) + except Empty: + max_waiting_time -= 1 + # Building vms - boot_batch_size = max(1, multiprocessing.cpu_count() // 2) + boot_batch_size = max(1, cpu_count() // 2) booting_budget = boot_batch_size boot_backoff = 12 base_port = 2222 @@ -105,22 +171,24 @@ class Testbed(mod.Testbed): host_fwd_str = 'hostfwd=tcp::%(fwdp)s-:22' % vars_dict vars_dict['hostfwdstr'] = host_fwd_str - command_str += 'qemu-system-x86_64 ' + command = 'qemu-system-x86_64 ' # TODO manage non default images - command_str += '-kernel %(vmimgpath)s/bzImage ' \ - '-append "console=ttyS0" ' \ - '-initrd %(vmimgpath)s/rootfs.cpio ' % vars_dict - command_str += '-nographic ' \ - '-display none ' \ - '--enable-kvm ' \ - '-smp 1 ' \ - '-m %(memory)sM ' \ - '-device %(frontend)s,mac=%(mac)s,netdev=mgmt ' \ - '-netdev user,id=mgmt,%(hostfwdstr)s ' \ - '-vga std ' \ - '-pidfile rina-%(id)s.pid ' \ - '-serial file:%(vmname)s.log ' \ - % vars_dict + command += ('-kernel %(vmimgpath)s/bzImage ' + '-append "console=ttyS0" ' + '-initrd %(vmimgpath)s/rootfs.cpio ' + % vars_dict) + command += ('-nographic ' + '-display none ' + '--enable-kvm ' + '-smp 1 ' + '-m %(memory)sM ' + '-device %(frontend)s,mac=%(mac)s,netdev=mgmt ' + '-netdev user,id=mgmt,%(hostfwdstr)s ' + '-vga std ' + '-pidfile rina-%(id)s.pid ' + '-serial file:%(vmname)s.log ' + % vars_dict + ) del vars_dict @@ -129,63 +197,107 @@ class Testbed(mod.Testbed): mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id']) port['mac'] = mac - command_str += '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' \ - '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' \ - 'downscript=no%(vhost)s ' \ - % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'], - 'frontend': vm_frontend, - 'vhost': ',vhost=on' if self.vhost else ''} - - command_str += '&\n' + command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' + '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' + 'downscript=no%(vhost)s ' + % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'], + 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''} + ) booting_budget -= 1 if booting_budget <= 0: - command_str += 'sleep %s\n' % boot_backoff - booting_budget = boot_batch_size + pass # TODO manage the backoff time + # command_str += 'sleep %s\n' % boot_backoff + # booting_budget = boot_batch_size + + with open(self.qemu_folder + '/qemu_out{}'.format(vmid), 'w') as out_file: + print('DEBUG: executing >> {}'.format(command)) + self.boot_processes.append(Popen(command.split(), stdout=out_file)) + pass vmid += 1 - Popen([command_str], shell=True, executable="/bin/bash").wait() # TODO something more elegant maybe? - return command_str + return - def _make_down_script(self): + def __del__(self): """ :rtype str :return: The script to tear down the experiment """ - command_str = 'kill_qemu() {\n' \ - ' PIDFILE=$1\n' \ - ' PID=$(cat $PIDFILE)\n' \ - ' if [ -n $PID ]; then\n' \ - ' kill $PID\n' \ - ' while [ -n "$(ps -p $PID -o comm=)" ]; do\n' \ - ' sleep 1\n' \ - ' done\n' \ - ' fi\n' \ - '\n' \ - ' rm $PIDFILE\n' \ - '}\n\n' + # TERM qemu processes + for process in self.boot_processes: + process.terminate() - for vm_name, vm in self.vms.items(): - command_str += 'kill_qemu rina-%(id)s.pid\n' % {'id': vm['id']} - - command_str += '\n' + # Wait for them to shut down + for process in self.boot_processes: + process.wait() + port_processes = [] + error_queue = Queue() + results_queue = Queue() for vm_name, vm in self.vms.items(): for port in vm['ports']: tap = port['tap_id'] shim = port['shim'] - command_str += 'sudo brctl delif %(br)s %(tap)s\n' \ - 'sudo ip link set %(tap)s down\n' \ - 'sudo ip tuntap del mode tap name %(tap)s\n\n' \ - % {'tap': tap, 'br': shim.name} + commands = [] + + commands += ('sudo brctl delif %(br)s %(tap)s\n' + 'sudo ip link set %(tap)s down\n' + 'sudo ip tuntap del mode tap name %(tap)s' + % {'tap': tap, 'br': shim.name} + ).split('\n') + process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue)) + port_processes.append(process) + process.start() + + total_processes = len(port_processes) + max_waiting_time = 2 * total_processes + over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + # Check for errors + if not error_queue.empty(): + print('Failure while shutting down: {}'.format(str(error_queue.get()))) + over_processes += 1 + try: + # Check for results + result = results_queue.get(timeout=1) + if result == "Command chain ran correctly.": + over_processes += 1 + print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes)) + except Empty: + max_waiting_time -= 1 + + error_queue = Queue() + results_queue = Queue() + shim_processes = [] for shim in self.shims: - command_str += 'sudo ip link set %(br)s down\n' \ - 'sudo brctl delbr %(br)s\n' \ - '\n' % {'br': shim.name} - return command_str + commands = [] + commands += ('sudo ip link set %(br)s down\n' + 'sudo brctl delbr %(br)s' + % {'br': shim.name} + ).split('\n') + process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue)) + shim_processes.append(process) + process.start() - def __del__(self): - script = self._make_down_script() - Popen([script], shell=True, executable="/bin/bash") + total_processes = len(shim_processes) + max_waiting_time = 2 * total_processes + over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + # Check for errors + if not error_queue.empty(): + print('Failure while shutting down: {}'.format(str(error_queue.get()))) + over_processes += 1 + try: + # Check for results + result = results_queue.get(timeout=1) + if result == "Command chain ran correctly.": + over_processes += 1 + print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes)) + except Empty: + max_waiting_time -= 1 + + return |