#!/usr/bin/python3
"""Run Tor network throughput experiments against a remote 'target' relay.

Builds a local chutney network whose 'target' relay runs on a remote host,
drives throughput clients through it, and collects throughput and CPU-usage
logs for several tor builds, allocators (LD_PRELOAD), and numbers of
additional event loops.
"""
#
import argparse
import shutil
import logging
import random
import os
import subprocess
import multiprocessing
import threading
import time
import json
import gzip
import pickle
import tempfile
import collections
import gc
#
import stem.control
import stem.descriptor.remote
import stem.process
#
#import numa
import log_system_usage
import chutney_manager
import throughput_server
import experiment_client
import experiment
import useful
#


class CustomExperiment(experiment.Experiment):
    """An experiment that places one remotely-running 'target' relay in the
    chutney network and can log CPU usage on the remote host.

    Args beyond the base class:
        use_helgrind: run the target under valgrind/helgrind when True.
        target_tor: path to an alternative tor binary for the target, or None.
        target_ld_preload: LD_PRELOAD value for the target process, or None.
        num_additional_eventloops: passed through to the target's torrc options.
        remote_name: ssh host name of the remote machine, or None to run locally.
        remote_options: dict of per-host settings (local_ip, target_ip,
            target_ssh, target_chutney_net_dir, optional target_numa_settings,
            target_cpu_prof, has_sudo).
    """

    def __init__(self, use_helgrind, target_tor, target_ld_preload,
                 num_additional_eventloops, remote_name, remote_options,
                 *args, **kwargs):
        self.use_helgrind = use_helgrind
        self.target_tor = target_tor
        self.target_ld_preload = target_ld_preload
        self.num_additional_eventloops = num_additional_eventloops
        self.remote_name = remote_name
        self.remote_options = remote_options
        super().__init__(*args, **kwargs)
        #
        # NOTE(review): these override attributes presumably set by the base
        # class constructor -- confirm against experiment.Experiment
        self.chutney_path = '/root/code/working/chutney'
        self.tor_path = '/root/code/dev/tor-0.4.2.6-fixed-controller'

    def configure_chutney(self):
        """Build self.nodes (authorities, guards, the remote target relay,
        exits, and clients) and self.proxy_control_ports."""
        target_optional_args = {}
        if self.target_tor is not None:
            target_optional_args['tor'] = self.target_tor
        if self.use_helgrind:
            target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
        if self.target_ld_preload is not None:
            target_optional_args['add_environ_vars'] = {'LD_PRELOAD': self.target_ld_preload}
        if self.remote_options['target_ip'] is not None:
            target_optional_args['ip'] = self.remote_options['target_ip']
        if self.remote_options['target_ssh'] is not None:
            target_optional_args['remote_hostname'] = self.remote_options['target_ssh']
        if self.remote_options['target_chutney_net_dir'] is not None:
            target_optional_args['remote_net_dir'] = self.remote_options['target_chutney_net_dir']
        if self.remote_options.get('target_numa_settings', None) is not None:
            target_optional_args['numa_settings'] = self.remote_options['target_numa_settings']
        #
        # make sure it can process onion skins fast enough, and keep it
        # consistent between computers; tor actually uses one more worker
        # thread than what you ask for
        target_optional_args['num_cpus'] = 2
        target_optional_args['num_additional_eventloops'] = self.num_additional_eventloops
        # the voting interval is 40 seconds which puts an unrealistic workload
        # on the target, so we disable it
        target_optional_args['dircache'] = False
        # was an if/else on key presence; .get() with a default is equivalent
        target_cpu_prof = self.remote_options.get('target_cpu_prof', False)
        #
        target_daemon = False
        target_log_throughput = True
        target_logs = ['notice']
        other_logs = ['notice']
        #
        self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl',
                                           log_files=other_logs) for _ in range(self.num_authorities)] + \
                     [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl',
                                           log_files=other_logs) for _ in range(self.num_guards)] + \
                     [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl',
                                           log_throughput=target_log_throughput, daemon=target_daemon,
                                           log_files=target_logs, sandbox=0,
                                           google_cpu_profiler=target_cpu_prof, **target_optional_args)] + \
                     [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl',
                                           log_files=other_logs) for _ in range(self.num_exits)] + \
                     [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl',
                                           log_files=other_logs) for _ in range(self.num_clients)]
        #
        for node in self.nodes:
            if 'num_cpus' not in node.options:
                node.options['num_cpus'] = 2
            #
            if 'ip' not in node.options and self.remote_options['local_ip'] is not None:
                node.options['ip'] = self.remote_options['local_ip']
        #
        self.proxy_control_ports = [self.nodes[x].guess_control_port(x)
                                    for x in range(len(self.nodes))
                                    if ('client', 1) in self.nodes[x].options.items()]
        # TODO: ^^ improve this

    def start_remote_logging(self, next_action=None):
        """Start a CPU-usage logging script on the remote host, run
        next_action, then stop the script and fetch its data file.

        If there is no remote host (self.remote_name is None), just run
        next_action locally.  All remote cleanup is best-effort: failures are
        logged but not raised.
        """
        if self.remote_name is None:
            # running locally
            if next_action is not None:
                next_action()
            return
        #
        local_script_path = 'log_system_usage.py'
        remote_script_path = '/tmp/log_system_usage.py'
        remote_save_path = '/tmp/cpu-usage.pickle.gz'
        local_save_path = os.path.join(self.save_data_path, 'remote-cpu-usage.pickle.gz')
        #
        tor_pids = subprocess.check_output(['ssh', self.remote_name, 'pgrep tor']).decode('utf-8').split()
        logging.info('Logging the following pids on {}: {}'.format(self.remote_name, tor_pids))
        command = 'python3 {} --interval 0.2 --pids {} {}'.format(remote_script_path, ','.join(tor_pids), remote_save_path)
        #
        # bug fix: initialize p so the finally block cannot hit a NameError
        # when the initial scp fails before Popen is reached
        p = None
        try:
            subprocess.check_output(['scp', local_script_path,
                                     '{}:{}'.format(self.remote_name, remote_script_path)],
                                    stderr=subprocess.STDOUT)
            p = subprocess.Popen(['ssh', self.remote_name, command])
            #
            time.sleep(5)
            # wait a few seconds to make sure it doesn't exit immediately
            if p.poll() is not None:
                raise Exception('Remote CPU monitoring script exited immediately')
            #
            if self.remote_options.get('has_sudo', False) is True:
                try:
                    # need to set the niceness for the process group, not just
                    # the process, in order to also apply to threads
                    subprocess.check_output(['ssh', self.remote_name,
                                             'sudo renice -n -10 -g "$(pgrep --full --exact "{}")"'.format(command)],
                                            stderr=subprocess.STDOUT)
                except Exception:
                    logging.warning('Could not set the nice value for the remote python script, ignoring...')
            #
            if next_action is not None:
                next_action()
            # wait a few seconds so that we have extra data; this may be
            # useful if we need to do averaging
            time.sleep(5)
            #
            if p.poll() is not None:
                raise Exception('Remote CPU monitoring script exited before it was supposed to')
        finally:
            try:
                subprocess.check_output(['ssh', self.remote_name,
                                         'pkill --full --exact --signal sigint \'{}\''.format(command)],
                                        stderr=subprocess.STDOUT)
            except Exception:
                logging.warning('Could not kill remote python script')
            #
            if p is not None:
                try:
                    p.wait(timeout=30)
                except subprocess.TimeoutExpired:
                    p.kill()
                    logging.warning('Process did not end as expected, so sent a SIGKILL')
                except Exception:
                    logging.warning('Could not kill')
            #
            try:
                subprocess.check_output(['scp', '{}:{}'.format(self.remote_name, remote_save_path), local_save_path],
                                        stderr=subprocess.STDOUT)
            except Exception:
                logging.warning('Failed to get remote \'{}\' data file'.format(remote_save_path))
            #
            try:
                subprocess.check_output(['ssh', self.remote_name, 'rm', remote_save_path],
                                        stderr=subprocess.STDOUT)
            except Exception:
                logging.warning('Failed to delete remote \'{}\' data file'.format(remote_save_path))
            #
            try:
                subprocess.check_output(['ssh', self.remote_name, 'rm', remote_script_path],
                                        stderr=subprocess.STDOUT)
            except Exception:
                logging.warning('Failed to delete remote \'{}\' script file'.format(remote_script_path))


def build_circuit_generator(consensus, server_address):
    """Return a function mapping a generation id to a 3-hop circuit
    [guard, target, exit], chosen deterministically by nickname.

    Relays are identified by nickname here; authorities (nicknames ending in
    'a') and the target are excluded from the guard pool.
    """
    fingerprints = [desc.nickname for desc in consensus]
    exit_fingerprints = [desc.nickname for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
    authority_fingerprints = [desc.nickname for desc in consensus if desc.nickname.endswith('a')]
    #
    target_fingerprints = [desc.nickname for desc in consensus if desc.nickname.endswith('target')]
    assert len(target_fingerprints) >= 1, 'No target relay in the consensus'
    non_exit_fingerprints = list(set(fingerprints) - set(exit_fingerprints)
                                 - set(target_fingerprints) - set(authority_fingerprints))
    #
    assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
    assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
    #
    # sort to try to get reproducible behavior (set iteration order varies)
    non_exit_fingerprints = sorted(non_exit_fingerprints)
    target_fingerprints = sorted(target_fingerprints)
    exit_fingerprints = sorted(exit_fingerprints)
    #
    return lambda gen_id: [non_exit_fingerprints[gen_id % len(non_exit_fingerprints)],
                           target_fingerprints[gen_id % len(target_fingerprints)],
                           exit_fingerprints[gen_id % len(exit_fingerprints)]]


def existing_file(path):
    """argparse type-checker: accept only paths to existing files."""
    if not os.path.isfile(path):
        raise argparse.ArgumentTypeError('The file path is not valid')
    return path


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')',
                        metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')',
                        metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)',
                        metavar='time')
    parser.add_argument('--target-tor', type=existing_file, default=None,
                        help='use a different tor binary for the target', metavar='tor-path')
    parser.add_argument('--helgrind', action='store_true', help='log helgrind data')
    # NOTE(review): the positional num_bytes is parsed but each configuration
    # below supplies its own 'num_bytes' -- confirm the positional is intended
    args = parser.parse_args()
    #
    experiment_dir = '/results'
    #
    experiment_time = time.time()
    base_save_data_path = os.path.join(experiment_dir, str(int(experiment_time)))
    os.mkdir(base_save_data_path)
    #
    measureme_log_path = None
    measureme = False
    #
    start_time = time.time()
    #
    # tor builds under test: (binary path, LD_PRELOAD value or None)
    tcmalloc_ld_preload = '/root/build/lib/libtcmalloc.so'
    jemalloc_ld_preload = '/root/build/lib/libjemalloc.so'
    tors = collections.OrderedDict()
    tors['multi-tcmalloc'] = ('/root/code/working/tor/src/app/tor', tcmalloc_ld_preload)
    tors['multi-jemalloc'] = ('/root/code/working/tor/src/app/tor', jemalloc_ld_preload)
    tors['multi-none'] = ('/root/code/working/tor/src/app/tor', None)
    tors['vanilla-tcmalloc'] = ('/root/code/dev/tor-0.4.2.6-throughput-log/src/app/tor', tcmalloc_ld_preload)
    tors['vanilla-jemalloc'] = ('/root/code/dev/tor-0.4.2.6-throughput-log/src/app/tor', jemalloc_ld_preload)
    tors['vanilla-none'] = ('/root/code/dev/tor-0.4.2.6-throughput-log/src/app/tor', None)
    #
    configurations = {}
    configurations['broken'] = {'num_clients': 150,
                                'num_guards': 30,  # number of relays (including guards)
                                'num_authorities': 2,  # will also act as a relay or guard
                                'num_exits': 30,  # will be used only as an exit
                                'num_streams_per_client': 10,
                                'num_bytes': 20*(2**20)}
    configurations['full-server'] = {'num_clients': 150,
                                     'num_guards': 300,  # number of relays (including guards)
                                     'num_authorities': 3,  # will also act as a relay or guard
                                     'num_exits': 300,  # will be used only as an exit
                                     'num_streams_per_client': 10,
                                     'num_bytes': 10*(2**20)}
    configurations['small-server'] = {'num_clients': 100,
                                      'num_guards': 300,  # number of relays (including guards)
                                      'num_authorities': 3,  # will also act as a relay or guard
                                      'num_exits': 300,  # will be used only as an exit
                                      'num_streams_per_client': 6,
                                      'num_bytes': 5*(2**20)}
    configurations['tiny'] = {'num_clients': 10,
                              'num_guards': 10,  # number of relays (including guards)
                              'num_authorities': 3,  # will also act as a relay or guard
                              'num_exits': 10,  # will be used only as an exit
                              'num_streams_per_client': 1,
                              'num_bytes': 5*(2**20)}
    #
    remotes = collections.OrderedDict()
    remotes['clack1'] = {'local_ip': '192.168.1.203',
                         'target_ip': '192.168.1.102',
                         'target_ssh': '192.168.1.102',
                         'target_chutney_net_dir': '/tmp/chutney-net',
                         'target_numa_settings': (0, [0, 32, 2, 34, 4, 36])}
                         # 'target_cpu_prof': True}
    remotes['sengler-rpi'] = {'local_ip': '129.97.119.248',
                              'target_ip': '129.97.169.9',
                              'target_ssh': '129.97.169.9',
                              'target_chutney_net_dir': '/tmp/chutney-net',
                              'has_sudo': True}
    remotes['localhost'] = {'local_ip': '127.0.0.1',
                            'target_ip': '127.0.0.1',
                            'target_ssh': '127.0.0.1',
                            'target_chutney_net_dir': '/tmp/chutney-net'}
    #
    experiments = [('clack1', 'full-server'), ('sengler-rpi', 'small-server')]
    num_repetitions = 10
    nums_additional_eventloops_options = [0, 1, 2, 3]
    #
    try:
        for repeat in range(num_repetitions):
            for (remote_name, configuration_name) in experiments:
                remote_options = remotes[remote_name]
                configuration_options = configurations[configuration_name]
                #
                for (tor_name, (tor_path, tor_ld_preload)) in tors.items():
                    num_clients = configuration_options['num_clients']
                    num_guards = configuration_options['num_guards']
                    num_authorities = configuration_options['num_authorities']
                    num_exits = configuration_options['num_exits']
                    num_streams_per_client = configuration_options['num_streams_per_client']
                    num_bytes = configuration_options['num_bytes']
                    #
                    # only the multi-eventloop builds vary the eventloop count
                    nums_additional_eventloops = [0]
                    if tor_name.startswith('multi'):
                        nums_additional_eventloops = nums_additional_eventloops_options
                    #
                    for num_additional_eventloops in nums_additional_eventloops:
                        attempt = 0
                        while True:
                            attempt_str = '' if attempt == 0 else '_attempt-{}'.format(attempt)
                            save_data_path = os.path.join(base_save_data_path,
                                                          '{}_{}_{}_{}{}'.format(remote_name, tor_name,
                                                                                 num_additional_eventloops,
                                                                                 repeat, attempt_str))
                            os.mkdir(save_data_path)
                            logging.info('Starting on {} using {}-{} ({}, {}), repeat {}, attempt {}'.format(
                                remote_name, tor_name, num_additional_eventloops,
                                tor_path, tor_ld_preload, repeat, attempt))
                            #
                            exp = CustomExperiment(args.helgrind, tor_path, tor_ld_preload,
                                                   num_additional_eventloops, remote_name, remote_options,
                                                   save_data_path, measureme_log_path, num_bytes,
                                                   num_streams_per_client, num_clients, num_guards,
                                                   num_authorities, num_exits, build_circuit_generator,
                                                   args.buffer_len, args.wait_range, measureme,
                                                   test_network=False)
                            #
                            def sleep_then_run(duration, func):
                                logging.info('Sleeping for {} seconds before running \'{}\''.format(duration, func.__name__))
                                time.sleep(duration)
                                logging.info('Done sleeping')
                                return func()
                            #
                            try:
                                exp.start_chutney(lambda: exp.start_throughput_server(
                                    lambda: sleep_then_run(120, lambda: exp.start_system_logging(
                                        lambda: exp.start_remote_logging(exp.start_throughput_clients)))))
                            except (stem.Timeout, stem.CircuitExtensionFailed, experiment.RepeatExperimentError):
                                tries = 9
                                attempt += 1
                                if attempt < tries:
                                    logging.exception('Experiment run failed, trying again ({} tries remaining)'.format(tries - attempt))
                                    continue
                                else:
                                    raise
                            #
                            chutney_data_dir = os.getenv('CHUTNEY_DATA_DIR')
                            # explicit check instead of assert: asserts vanish under 'python -O'
                            if chutney_data_dir is None:
                                raise RuntimeError('CHUTNEY_DATA_DIR environment variable must be set')
                            #
                            # keep the node state for later analysis, minus caches and keys
                            os.system('rm -rf {}/nodes/*/diff-cache'.format(chutney_data_dir))
                            os.system('rm -rf {}/nodes/*/keys'.format(chutney_data_dir))
                            os.system('rm -f {}/nodes/*/cached-*'.format(chutney_data_dir))
                            os.system('rm -f {}/nodes/*/v3-status-votes'.format(chutney_data_dir))
                            shutil.copytree('{}/nodes'.format(chutney_data_dir),
                                            os.path.join(save_data_path, 'nodes'))
                            os.system("ps u | grep 'tor'")
                            os.system('rm -rf {}/*'.format(chutney_data_dir))
                            break
                        #
                        exp = None
                        # not sure if this is actually useful, but hopefully it
                        # reduces memory usage for when we need to fork
                        gc.collect()
    except KeyboardInterrupt:
        logging.info('Stopped (KeyboardInterrupt)')
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time() - start_time) / 60))
#