123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- #!/usr/bin/python3
- #
- import argparse
- import shutil
- import logging
- import random
- import os
- import multiprocessing
- import threading
- import time
- import json
- import gzip
- import pickle
- import tempfile
- #
- import stem.control
- import stem.descriptor.remote
- import stem.process
- #
- import numa
- import log_system_usage
- import chutney_manager
- import throughput_server
- import experiment_client
- import experiment
- import useful
- #
class CustomExperiment(experiment.Experiment):
    """Experiment whose chutney network contains one dedicated 'target' relay.

    Besides the authorities, relays, exits and clients, one extra relay
    tagged ``'target'`` is created. It gets its own IP address and a
    ``remote_hostname`` option, and can optionally use a different tor
    binary and/or helgrind valgrind settings.
    """
    #
    def __init__(self, use_helgrind, target_tor, *args, **kwargs):
        """Store the target-relay options and initialise the base experiment.

        Args:
            use_helgrind: If true, ``configure_chutney`` gives the target
                node helgrind ``valgrind_settings``.
            target_tor: Value passed to the target node as its ``tor``
                option, or ``None`` to leave that option unset.
            *args: Forwarded unchanged to ``experiment.Experiment``.
            **kwargs: Forwarded unchanged to ``experiment.Experiment``.
        """
        self.use_helgrind = use_helgrind
        self.target_tor = target_tor
        super().__init__(*args, **kwargs)
        #
        # Assigned after the base initialiser has run, as in the original code.
        self.chutney_path = '/home/sengler/code/working/chutney'
        self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
    #
    def configure_chutney(self):
        """Build ``self.nodes`` and ``self.proxy_control_ports``.

        Reads ``self.num_authorities``, ``self.num_guards``,
        ``self.num_exits`` and ``self.num_clients``, which are not assigned
        in this class (presumably set by the base class -- confirm there).
        """
        local_ip = '172.19.156.16'
        target_ip = '172.19.156.136'
        target_hostname = 'cluck2'
        target_cpu_prof = False
        target_daemon = False
        logs = ['notice']
        #
        # Options that apply to the target relay only.
        target_optional_args = {}
        if self.target_tor is not None:
            target_optional_args['tor'] = self.target_tor
        if self.use_helgrind:
            target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
        # NOTE(review): the preload, IP and hostname are treated as
        # unconditional (not part of the helgrind branch) -- confirm against
        # the original file, whose indentation was not available.
        # An earlier revision preloaded '/usr/lib/libprofiler.so.0' instead.
        target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
        target_optional_args['ip'] = target_ip
        target_optional_args['remote_hostname'] = target_hostname
        #
        # The groups are created in the same order in which they are
        # concatenated, so each node's position in self.nodes is unchanged.
        authorities = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=logs)
                       for _ in range(self.num_authorities)]
        guards = [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=logs)
                  for _ in range(self.num_guards)]
        target = [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl',
                                       daemon=target_daemon, log_files=logs, sandbox=0,
                                       google_cpu_profiler=target_cpu_prof, **target_optional_args)]
        exits = [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=logs)
                 for _ in range(self.num_exits)]
        clients = [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=logs)
                   for _ in range(self.num_clients)]
        self.nodes = authorities + guards + target + exits + clients
        #
        # Fill in defaults for every node that did not set them explicitly.
        for node in self.nodes:
            if 'num_cpus' not in node.options:
                node.options['num_cpus'] = 2
            #
            if 'ip' not in node.options:
                node.options['ip'] = local_ip
            #
        #
        # Assign each node a NUMA node and processors from the remaining pool.
        numa_remaining = numa.get_numa_overview()
        for node in self.nodes:
            num_cpus = node.options['num_cpus']
            if num_cpus % 2 != 0:
                # Round the request up to an even number of CPUs.
                num_cpus += 1
            #
            if node.options['tag'] == 'target':
                # The target relay always requests at least 6 CPUs.
                num_cpus = max(num_cpus, 6)
            #
            (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
            node.options['numa_settings'] = (numa_node, processors)
        #
        # TODO: improve this -- the control port of each client node is
        # guessed from the node's position in self.nodes.
        self.proxy_control_ports = [node.guess_control_port(index)
                                    for (index, node) in enumerate(self.nodes)
                                    if ('client', 1) in node.options.items()]
- #
- #
def build_circuit_generator(consensus, server_address):
    """Return a function that maps a generator id to a three-hop relay path.

    Every path has the form ``[non-exit relay, target relay, exit relay]``
    and is expressed in relay nicknames. (A disabled earlier variant of this
    function used ``desc.fingerprint`` instead of ``desc.nickname``.)

    Args:
        consensus: Iterable of descriptors exposing ``nickname`` and
            ``exit_policy.can_exit_to(...)``. It is materialised once, so a
            one-shot iterator is accepted.
        server_address: Sequence unpacked into
            ``exit_policy.can_exit_to(*server_address)``.

    Returns:
        A callable ``f(gen_id)`` returning a list of three nicknames. The
        non-exit and exit hops are chosen round-robin by ``gen_id``, in
        consensus order, so the mapping is reproducible between runs.

    Raises:
        IndexError: If no nickname in the consensus ends with ``'target'``.
        AssertionError: If there is no exit relay or no non-exit relay.
    """
    descriptors = list(consensus)
    #
    nicknames = [desc.nickname for desc in descriptors]
    exit_nicknames = [desc.nickname for desc in descriptors
                      if desc.exit_policy.can_exit_to(*server_address)]
    #
    target_nickname = next((name for name in nicknames if name.endswith('target')), None)
    if target_nickname is None:
        raise IndexError('No relay with a nickname ending in \'target\' was found in the consensus')
    #
    # Keep consensus order (and drop duplicates) rather than going through a
    # set, whose iteration order for strings changes from run to run.
    excluded = set(exit_nicknames)
    excluded.add(target_nickname)
    non_exit_nicknames = list(dict.fromkeys(name for name in nicknames if name not in excluded))
    #
    assert len(exit_nicknames) >= 1, 'Need at least one exit relay'
    assert len(non_exit_nicknames) >= 1, 'Need at least one non-exit relay'
    #
    def circuit_for(gen_id):
        """Return the [non-exit, target, exit] path for this generator id."""
        return [non_exit_nicknames[gen_id % len(non_exit_nicknames)],
                target_nickname,
                exit_nicknames[gen_id % len(exit_nicknames)]]
    #
    return circuit_for
- #
def existing_file(path):
    """Validate that ``path`` names an existing regular file.

    Intended for use as an argparse ``type=`` callable.

    Args:
        path: File system path given on the command line.

    Returns:
        ``path`` unchanged when ``os.path.isfile(path)`` is true.

    Raises:
        argparse.ArgumentTypeError: If ``path`` is not an existing file. The
            message includes the rejected path.
    """
    if not os.path.isfile(path):
        raise argparse.ArgumentTypeError('The file path is not valid: {}'.format(path))
    return path
- #
def sleep_then_run(duration, func):
    """Sleep for ``duration`` seconds, then call ``func()`` and return its result."""
    logging.info('Sleeping for %s seconds before running \'%s\'', duration, func.__name__)
    time.sleep(duration)
    logging.info('Done sleeping')
    return func()
#
def main():
    """Parse the command line, build a ``CustomExperiment`` and run it."""
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--target-tor', type=existing_file, default=None,
                        help='use a different tor binary for the target', metavar='tor-path')
    parser.add_argument('--helgrind', action='store_true',
                        help='log helgrind data')
    args = parser.parse_args()
    #
    # Network size. A smaller configuration used previously was
    # 4 clients, 6 guards, 2 authorities and 8 exits.
    num_clients = 12
    num_guards = 14  # number of relays (including guards)
    num_authorities = 2  # will also act as a relay or guard
    num_exits = 16  # will be used only as an exit
    #
    # Passed straight through to the experiment; nothing is saved or
    # measured with these values.
    save_data_path = None
    measureme_log_path = None
    measureme = False
    #
    start_time = time.time()
    #
    num_streams_per_client = 6
    logging.info('Starting with %s streams per client', num_streams_per_client)
    #
    # Named so that it does not rebind the imported `experiment` module.
    throughput_experiment = CustomExperiment(args.helgrind, args.target_tor, save_data_path, measureme_log_path, args.num_bytes,
                                             num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
                                             build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)
    #
    def run_clients_after_delay():
        # Wait 20 seconds before starting the throughput clients.
        return sleep_then_run(20, throughput_experiment.start_throughput_clients)
    #
    def run_server_then_clients():
        return throughput_experiment.start_throughput_server(run_clients_after_delay)
    #
    try:
        # Each start_* call receives the next stage as a callback
        # (presumably invoked once that component is running -- confirm).
        throughput_experiment.start_chutney(run_server_then_clients)
    except KeyboardInterrupt:
        logging.info('Stopped (KeyboardInterrupt)')
    #
    logging.info('Total time: %.2f minutes', (time.time()-start_time)/60)
#
if __name__ == '__main__':
    main()
- #
|