123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- #!/usr/bin/python3
- #
- import argparse
- import shutil
- import logging
- import random
- import os
- import subprocess
- import multiprocessing
- import threading
- import time
- import json
- import gzip
- import pickle
- import tempfile
- import collections
- import gc
- #
- import stem.control
- import stem.descriptor.remote
- import stem.process
- #
- import numa
- import log_system_usage
- import chutney_manager
- import throughput_server
- import experiment_client
- import experiment
- import useful
- #
- #remote_name = 'sengler-rpi'
- #remote_name = 'cluck2'
- #remote_name = None
- #
class CustomExperiment(experiment.Experiment):
    """Throughput experiment with a dedicated 'target' relay, optionally on a remote host.

    configure_chutney() lays out a chutney network of authorities, non-exit
    relays, one 'target' relay, exits and clients; start_remote_logging()
    records CPU usage of the tor processes on the remote host (if any) while
    the rest of the experiment runs. Everything else is inherited from
    experiment.Experiment.
    """

    def __init__(self, use_helgrind, target_tor, num_additional_eventloops, remote_name, *args, **kwargs):
        """Store this subclass's options, then initialize the base experiment.

        Args:
            use_helgrind: if true, the target relay gets helgrind valgrind settings.
            target_tor: path of the tor binary for the target relay, or None to
                leave the node's default.
            num_additional_eventloops: value for the target node's
                'num_additional_eventloops' option.
            remote_name: 'cluck2', 'sengler-rpi', or None to run everything locally.
            *args, **kwargs: forwarded unchanged to experiment.Experiment.__init__.
        """
        # Assigned before super().__init__() so they already exist if the base
        # constructor uses them (NOTE(review): base class not visible here).
        self.use_helgrind = use_helgrind
        self.target_tor = target_tor
        self.num_additional_eventloops = num_additional_eventloops
        self.remote_name = remote_name
        super().__init__(*args, **kwargs)

        # Assigned after super().__init__(), so these take precedence over any
        # value the base constructor sets.
        self.chutney_path = '/home/sengler/code/working/chutney'
        #self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
        self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller'
        #self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller-kist-changes'
        #self.tor_path = '/home/sengler/code/working/tor'

    def configure_chutney(self):
        """Build self.nodes (the chutney network) and self.proxy_control_ports.

        Only the 'target' relay receives the experiment-specific options (tor
        binary, helgrind, remote host/IP/directory, event loops, throughput
        logging). Every node without explicit values gets num_cpus=2 and, when
        running remotely, the local IP address.

        Raises:
            ValueError: if self.remote_name is not one of the known hosts.
        """
        if self.remote_name == 'cluck2':
            local_ip = '172.19.156.16'
            target_ip = '172.19.156.136'
            #local_ip = '129.97.119.196'
            #target_ip = '129.97.119.226'
            target_hostname = 'cluck2'
            target_dir = '/tmp/chutney-net'
        elif self.remote_name == 'sengler-rpi':
            local_ip = '129.97.119.196'
            target_ip = '129.97.169.9'
            target_hostname = 'sengler-rpi'
            target_dir = '/tmp/chutney-net'
        elif self.remote_name is None:
            local_ip = None
            target_ip = None
            target_hostname = None
            target_dir = None
        else:
            # ValueError subclasses Exception, so existing 'except Exception' callers still work
            raise ValueError('hostname not known: {!r}'.format(self.remote_name))

        target_optional_args = {}
        if self.target_tor is not None:
            target_optional_args['tor'] = self.target_tor
        if self.use_helgrind:
            target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
        #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libprofiler.so.0'}
        #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
        #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/home/sengler/build/lib/libtcmalloc_and_profiler.so'}
        #target_optional_args['add_environ_vars'] = {'EVENT_NOEPOLL': '', 'EVENT_SHOW_METHOD': ''}
        if target_ip is not None:
            target_optional_args['ip'] = target_ip
        if target_hostname is not None:
            target_optional_args['remote_hostname'] = target_hostname
        if target_dir is not None:
            target_optional_args['remote_net_dir'] = target_dir
        # make sure it can process onion skins fast enough, and keep it consistent
        # between computers (tor actually uses one more worker thread than what you ask for)
        target_optional_args['num_cpus'] = 2
        target_optional_args['num_additional_eventloops'] = self.num_additional_eventloops
        # the voting interval is 40 seconds which puts an unrealistic workload on
        # the target, so we disable it
        target_optional_args['dircache'] = False

        target_cpu_prof = False  #True
        target_daemon = False
        target_log_throughput = True
        target_logs = ['notice']
        #other_logs = ['info', 'notice']
        other_logs = ['notice']

        self.nodes = (
            [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=other_logs)
             for _ in range(self.num_authorities)]
            + [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=other_logs)
               for _ in range(self.num_guards)]
            + [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl',
                                    log_throughput=target_log_throughput, daemon=target_daemon,
                                    log_files=target_logs, sandbox=0,
                                    google_cpu_profiler=target_cpu_prof, **target_optional_args)]
            + [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=other_logs)
               for _ in range(self.num_exits)]
            + [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=other_logs)
               for _ in range(self.num_clients)]
        )

        # fill in defaults for every node that did not set them explicitly
        for node in self.nodes:
            if 'num_cpus' not in node.options:
                node.options['num_cpus'] = 2
            if 'ip' not in node.options and local_ip is not None:
                node.options['ip'] = local_ip

        # control ports of the client nodes, indexed by their position in self.nodes
        # TODO: improve this
        self.proxy_control_ports = [node.guess_control_port(index)
                                    for (index, node) in enumerate(self.nodes)
                                    if ('client', 1) in node.options.items()]

    @staticmethod
    def _run_best_effort(cmd, failure_message):
        """Run cmd with subprocess.check_output; log a warning instead of raising if it fails.

        Returns:
            True if the command succeeded, False otherwise.
        """
        try:
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except (subprocess.CalledProcessError, OSError) as err:
            logging.warning('{} ({})'.format(failure_message, err))
            return False
        return True

    def start_remote_logging(self, next_action=None):
        """Record CPU usage of the remote tor processes while next_action runs.

        When running locally (self.remote_name is None) this only calls
        next_action. Otherwise it copies log_system_usage.py to the remote host,
        starts it over ssh for every pid reported by `pgrep tor`, calls
        next_action, then stops the script, copies its output to
        <save_data_path>/remote-cpu-usage.pickle.gz and deletes the remote
        files. Cleanup steps log a warning on failure instead of raising.

        Args:
            next_action: optional zero-argument callable run while logging.

        Raises:
            subprocess.CalledProcessError: if the remote pids cannot be listed or
                the script cannot be copied to the remote host.
            RuntimeError: if the remote script exits before it is stopped.
        """
        if self.remote_name is None:
            # running locally
            if next_action is not None:
                next_action()
            return

        local_script_path = 'log_system_usage.py'  # NOTE(review): relative to the current working directory
        remote_script_path = '/tmp/log_system_usage.py'
        remote_save_path = '/tmp/cpu-usage.pickle.gz'
        local_save_path = os.path.join(self.save_data_path, 'remote-cpu-usage.pickle.gz')

        # NOTE(review): pgrep matches 'tor' as a pattern against process names,
        # so any process whose name contains 'tor' is included
        tor_pids = subprocess.check_output(['ssh', self.remote_name, 'pgrep tor']).decode('utf-8').split()
        logging.info('Logging the following pids on {}: {}'.format(self.remote_name, tor_pids))
        command = 'python3 {} --interval 0.2 --pids {} {}'.format(remote_script_path, ','.join(tor_pids), remote_save_path)

        p = None  # stays None if the script could not be copied, so cleanup can skip it
        try:
            subprocess.check_output(['scp', local_script_path, '{}:{}'.format(self.remote_name, remote_script_path)], stderr=subprocess.STDOUT)
            p = subprocess.Popen(['ssh', self.remote_name, command])

            # wait a few seconds to make sure it doesn't exit immediately
            time.sleep(5)
            if p.poll() is not None:
                raise RuntimeError('Remote CPU monitoring script exited immediately')

            # need to set the niceness for the process group, not just the process,
            # in order to also apply to threads
            self._run_best_effort(['ssh', self.remote_name, 'sudo renice -n -10 -g "$(pgrep --full --exact "{}")"'.format(command)],
                                  'Could not set the nice value for the remote python script, ignoring...')

            if next_action is not None:
                next_action()
            # wait a few seconds so that we have extra data;
            # this may be useful if we need to do averaging
            time.sleep(5)

            if p.poll() is not None:
                raise RuntimeError('Remote CPU monitoring script exited before it was supposed to')
        finally:
            self._run_best_effort(['ssh', self.remote_name, 'pkill --full --exact --signal sigint \'{}\''.format(command)],
                                  'Could not kill remote python script')

            if p is not None:
                try:
                    p.wait(timeout=30)
                except subprocess.TimeoutExpired:
                    p.kill()
                    p.wait()  # reap the killed local ssh client
                    logging.warning('Process did not end as expected, so sent a SIGKILL')

            self._run_best_effort(['scp', '{}:{}'.format(self.remote_name, remote_save_path), local_save_path],
                                  'Failed to get remote \'{}\' data file'.format(remote_save_path))
            self._run_best_effort(['ssh', self.remote_name, 'rm', remote_save_path],
                                  'Failed to delete remote \'{}\' data file'.format(remote_save_path))
            self._run_best_effort(['ssh', self.remote_name, 'rm', remote_script_path],
                                  'Failed to delete remote \'{}\' script file'.format(remote_script_path))
- #
def build_circuit_generator(consensus, server_address):
    """Return a function mapping a generator id to a three-hop circuit path.

    Relays are identified by *nickname*, not fingerprint:
    - nicknames ending in 'target' are target relays;
    - nicknames ending in 'a' are treated as authorities;
    - relays whose exit policy allows ``server_address`` are exits;
    - every remaining relay is a first-hop candidate.

    Args:
        consensus: iterable of router status entries exposing ``nickname`` and
            ``exit_policy.can_exit_to(address, port)``. It is traversed exactly
            once, so one-shot iterables such as generators are accepted.
        server_address: (address, port) that the circuits must be able to exit to.

    Returns:
        A callable ``f(gen_id)`` returning ``[first_hop, target, exit]``. Each
        position cycles through its sorted candidate list with ``gen_id`` modulo
        the list length, so paths are reproducible across runs.

    Raises:
        AssertionError: if there is no target relay, no exit relay, or no
            non-exit relay available as a first hop.
    """
    all_nicknames = set()
    exit_nicknames = set()
    authority_nicknames = set()
    target_nicknames = set()
    for desc in consensus:
        nickname = desc.nickname
        all_nicknames.add(nickname)
        if desc.exit_policy.can_exit_to(*server_address):
            exit_nicknames.add(nickname)
        if nickname.endswith('a'):
            authority_nicknames.add(nickname)
        if nickname.endswith('target'):
            target_nicknames.add(nickname)

    assert len(target_nicknames) >= 1, 'No target relay in the consensus'
    first_hops = sorted(all_nicknames - exit_nicknames - target_nicknames - authority_nicknames)
    target_hops = sorted(target_nicknames)
    exit_hops = sorted(exit_nicknames)
    # sorted lists give reproducible behavior

    assert len(exit_hops) >= 1, 'Need at least one exit relay'
    assert len(first_hops) >= 1, 'Need at least one non-exit relay'

    def circuit_for(gen_id):
        """Return the [first_hop, target, exit] nicknames for generator gen_id."""
        return [first_hops[gen_id % len(first_hops)],
                target_hops[gen_id % len(target_hops)],
                exit_hops[gen_id % len(exit_hops)]]

    return circuit_for
- #
def existing_file(path):
    """Argparse ``type=`` converter that accepts only paths of existing regular files.

    Returns the path unchanged when it names a regular file. Otherwise it raises
    argparse.ArgumentTypeError, which argparse reports as a usage error.
    """
    if os.path.isfile(path):
        return path
    raise argparse.ArgumentTypeError('The file path is not valid')
- #
if __name__ == '__main__':
    # Script entry point: run every (repeat, host, tor build, event-loop count)
    # combination, retrying a failed run a limited number of times, and store
    # each run's data under a per-run directory of a timestamped base directory.
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)

    size_suffix_help = "(can also end with 'B', 'KiB', 'MiB', or 'GiB')"
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection ' + size_suffix_help,
                        metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers ' + size_suffix_help,
                        metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help="add a random wait time to each connection so that they don't all start at the same time (default is 0)",
                        metavar='time')
    parser.add_argument('--target-tor', type=existing_file, default=None,
                        help='use a different tor binary for the target', metavar='tor-path')
    parser.add_argument('--helgrind', action='store_true',
                        help='log helgrind data')
    args = parser.parse_args()

    experiment_time = time.time()
    #base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
    base_save_data_path = os.path.join('/var/ssd-raid/sengler/data/experiments', str(int(experiment_time)))
    os.mkdir(base_save_data_path)

    measureme_log_path = None
    measureme = False

    start_time = time.time()

    tors = collections.OrderedDict([
        ('working', '/home/sengler/code/working/tor/src/app/tor'),
        ('working-without', '/home/sengler/code/working/tor-without-tcmalloc/src/app/tor'),
        ('dev-with', '/home/sengler/code/dev/tor-throughput-log-0.4.2.6-with-tcmalloc/src/app/tor'),
        ('dev-without', '/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'),
    ])
    hosts = ['sengler-rpi', 'cluck2']
    #hosts = ['cluck2']
    #hosts = ['sengler-rpi']
    num_repetitions = 15
    nums_additional_eventloops_options = [0, 1, 2, 3]
    #nums_additional_eventloops_options = [3, 2, 1, 0]

    # Per-host network size:
    # (num_clients, num_guards, num_authorities, num_exits, num_streams_per_client, num_bytes)
    # num_guards counts relays (including guards); authorities also act as relays
    # or guards; exits are used only as exits.
    network_sizes = {
        'cluck2': (150, 30, 2, 30, 10, 20*(2**20)),
        'sengler-rpi': (100, 300, 3, 300, 6, 5*(2**20)),
        None: (10, 10, 2, 12, 5, 20*(2**20)),
    }
    # builds whose event-loop count is varied; all others only run with 0
    multi_eventloop_tors = ('working', 'working-without')
    max_tries = 9
    chutney_net_dir = '/run/user/3271/chutney-net'

    def sleep_then_run(duration, func):
        logging.info("Sleeping for {} seconds before running '{}'".format(duration, func.__name__))
        time.sleep(duration)
        logging.info('Done sleeping')
        return func()

    try:
        for repeat in range(num_repetitions):
            for host in hosts:
                for (tor_name, tor_path) in tors.items():
                    if host not in network_sizes:
                        raise Exception('host not known')
                    (num_clients, num_guards, num_authorities, num_exits,
                     num_streams_per_client, num_bytes) = network_sizes[host]

                    if tor_name in multi_eventloop_tors:
                        eventloop_counts = nums_additional_eventloops_options
                    else:
                        eventloop_counts = [0]

                    for num_additional_eventloops in eventloop_counts:
                        attempt = 0
                        while True:
                            attempt_suffix = '_attempt-{}'.format(attempt) if attempt else ''
                            run_name = '{}_{}_{}_{}{}'.format(host, tor_name, num_additional_eventloops, repeat, attempt_suffix)
                            save_data_path = os.path.join(base_save_data_path, run_name)
                            os.mkdir(save_data_path)
                            logging.info('Starting on {} using {}-{} ({}), repeat {}, attempt {}'.format(
                                host, tor_name, num_additional_eventloops, tor_path, repeat, attempt))

                            exp = CustomExperiment(args.helgrind, tor_path, num_additional_eventloops, host, save_data_path,
                                                   measureme_log_path, num_bytes,
                                                   num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
                                                   build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)

                            try:
                                exp.start_chutney(
                                    lambda: exp.start_throughput_server(
                                        lambda: sleep_then_run(120,
                                            lambda: exp.start_system_logging(
                                                lambda: exp.start_remote_logging(exp.start_throughput_clients)))))
                            except (stem.Timeout, stem.CircuitExtensionFailed, experiment.RepeatExperimentError):
                                attempt += 1
                                if attempt >= max_tries:
                                    raise
                                logging.exception('Experiment run failed, trying again ({} tries remaining)'.format(max_tries-attempt))
                                continue

                            shutil.copytree(os.path.join(chutney_net_dir, 'nodes'), os.path.join(save_data_path, 'nodes'))
                            os.system("ps u | grep 'tor'")
                            os.system('rm -rf {}/*'.format(chutney_net_dir))
                            break

                        # drop the finished experiment, hopefully reducing memory
                        # usage for when we need to fork
                        exp = None
                        gc.collect()
    except KeyboardInterrupt:
        logging.info('Stopped (KeyboardInterrupt)')

    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
- #
|