123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- #!/usr/bin/python3
- #
- import argparse
- import shutil
- import logging
- import random
- import os
- import multiprocessing
- import threading
- import time
- import json
- import gzip
- import pickle
- import tempfile
- #
- import stem.control
- import stem.descriptor.remote
- import stem.process
- import numpy as np
- #
- import numa
- import log_system_usage
- import chutney_manager
- import throughput_server
- import experiment_client
- import useful
- #
class DummyEnterExit:
    """
    A context manager that does nothing.

    Entering yields the instance itself and leaving performs no cleanup, so it
    can stand in wherever a 'with' statement needs an object but no resource
    has to be managed.
    """
    #
    def __enter__(self):
        """Return this same object as the value bound by 'with ... as'."""
        return self
    #
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing; the falsy return value lets any exception propagate."""
        return None
    #
#
- #
- #
class RepeatExperimentError(Exception):
    """Raised when too few streams completed and the experiment should be run again."""
#
- #
class Experiment:
    """
    One throughput measurement over a locally launched Chutney Tor network.

    The 'start_*' methods are designed to be nested through their 'next_action'
    callbacks: each one sets up a resource (the Chutney network, the throughput
    server, the CPU usage logger), runs 'next_action' while the resource is
    alive, and tears the resource down afterwards.  'start_throughput_clients'
    is the innermost step that actually pushes data through the network.
    """
    #
    def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
                 num_clients, num_guards, num_authorities, num_exits, circuit_generator_builder,
                 buffer_len=None, wait_range=None, measureme=False, test_network=True):
        """
        Store the settings, lay out the Chutney nodes and record the settings on disk.

        save_data_path: existing directory that receives all output files, or
            None to save nothing.
        measureme_log_path: directory handed to every node as its
            'measureme_log_dir' option, or None.  When the network shuts down
            the directory is removed (its files are first moved into
            'save_data_path' if that is set).
        num_bytes: forwarded to every client protocol as 'num_bytes'.
        num_streams_per_client: number of circuits/streams created per client.
        num_clients, num_guards, num_authorities, num_exits: how many Chutney
            nodes of each kind to create.
        circuit_generator_builder: callable taking '(consensus, server_address)'
            and returning the circuit generator given to the controllers.  An
            AssertionError raised by it is treated as "consensus not usable
            yet" and retried.
        buffer_len: forwarded to every client protocol as 'buffer_len'.
        wait_range: inclusive upper bound of the random per-stream start delay
            (None is treated as 0).  Presumably in seconds -- TODO confirm
            against experiment_client.
        measureme: forwarded to every client protocol.
        test_network: when true the Chutney network is started with one
            verification round, otherwise with none.
        """
        self.save_data_path = save_data_path
        self.measureme_log_path = measureme_log_path
        self.num_bytes = num_bytes
        self.num_streams_per_client = num_streams_per_client
        self.num_clients = num_clients
        self.num_guards = num_guards
        self.num_authorities = num_authorities
        self.num_exits = num_exits
        self.circuit_generator_builder = circuit_generator_builder
        self.buffer_len = buffer_len
        self.wait_range = wait_range
        self.measureme = measureme
        self.test_network = test_network
        #
        # machine-specific locations and the address the throughput server binds to
        self.chutney_path = '/home/sengler/code/measureme/chutney'
        self.tor_path = '/home/sengler/code/measureme/tor'
        self.server_address = ('', 12353)
        #
        # both are filled in by configure_chutney()
        self.nodes = None
        self.proxy_control_ports = None
        #
        self.configure_chutney()
        #
        if save_data_path is not None:
            settings = {
                'save_data_path': self.save_data_path,
                'num_bytes': self.num_bytes,
                'num_streams_per_client': self.num_streams_per_client,
                'num_clients': self.num_clients,
                'num_guards': self.num_guards,
                'num_authorities': self.num_authorities,
                'num_exits': self.num_exits,
                'buffer_len': self.buffer_len,
                'wait_range': self.wait_range,
                'measureme': self.measureme,
                'chutney_path': self.chutney_path,
                'tor_path': self.tor_path,
                'server_address': self.server_address,
            }
            with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
                json.dump(settings, f)
            #
        #
    #
    @staticmethod
    def _is_client(node):
        """Return True if the Chutney node's options mark it as a client ('client' == 1)."""
        return ('client', 1) in node.options.items()
    #
    @staticmethod
    def _brief_result(result):
        """Condense one full server result into the small JSON-serializable summary."""
        deltas = result['deltas']
        return {
            'first_byte': deltas['timestamps'][0],
            'last_byte': deltas['timestamps'][-1],
            'data_size': result['data_size'],
            'measured_data_size': int(np.sum(deltas['bytes'])),
            'custom_data': json.loads(result['custom_data'].decode('utf-8')),
            'time_started_push': result['time_started_push'],
        }
    #
    def configure_chutney(self):
        """
        Build the node list, assign processors to every node and find the client control ports.

        Sets 'self.nodes' (authorities, then guards, then exits, then clients)
        and 'self.proxy_control_ports'.  When 'save_data_path' is set, the
        nickname -> (numa node, processors) assignment is written to
        'numa_data.pickle.gz'.
        """
        self.nodes = (
            [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)]
            + [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)]
            + [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(self.num_exits)]
            + [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(self.num_clients)]
        )
        #
        if self.measureme_log_path is not None:
            for node in self.nodes:
                # use the attribute; a bare 'measureme_log_path' only resolves
                # when a global of that name happens to exist
                node.options['measureme_log_dir'] = self.measureme_log_path
            #
        #
        numa_remaining = numa.get_numa_overview()
        numa_sets = {}
        for (index, node) in enumerate(self.nodes):
            num_cpus = node.options['num_cpus']
            if num_cpus % 2 != 0:
                # request an even number of processors (presumably so a node
                # always gets whole physical cores -- TODO confirm with numa_scheduler)
                num_cpus += 1
            #
            (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
            node.options['numa_settings'] = (numa_node, processors)
            numa_sets[node.guess_nickname(index)] = (numa_node, processors)
        #
        # TODO: improve how the clients' control ports are discovered
        self.proxy_control_ports = [node.guess_control_port(index)
                                    for (index, node) in enumerate(self.nodes)
                                    if self._is_client(node)]
        #
        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
                pickle.dump(numa_sets, f, protocol=4)
            #
        #
    #
    def _launch_chutney_network(self, network_file):
        """Start the Chutney network, retrying failed starts; the fifth failure is re-raised."""
        verification_rounds = 1 if self.test_network else 0
        num_attempts = 0
        while True:
            num_attempts += 1
            try:
                return chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, network_file,
                                                      verification_rounds=verification_rounds)
            except Exception:
                # KeyboardInterrupt/SystemExit are not Exceptions and pass straight through
                logging.exception('The Chutney network failed to start (attempt {}). Trying again...'.format(num_attempts))
                if num_attempts > 4:
                    logging.exception('Just kidding, we\'ve tried too many times.')
                    raise
                #
            #
        #
    #
    def start_chutney(self, next_action=None):
        """
        Run 'next_action' while a Chutney network built from 'self.nodes' is up.

        The network description is written to a temporary file which is copied
        into 'save_data_path' (if set) and deleted afterwards.  The Chutney
        startup output and the relay fingerprints are saved as well when
        'save_data_path' is set.  On the way out the measureme log directory is
        emptied into 'save_data_path' and removed.

        Raises whatever the last failed network start raised once the retry
        limit is reached, and anything raised by 'next_action'.
        """
        (fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
        try:
            with os.fdopen(fd, mode='w') as f:
                f.write(chutney_manager.create_chutney_config(self.nodes))
            #
            try:
                start_time = time.time()
                chutney_network = self._launch_chutney_network(tmp_network_file)
                #
                num_lines_to_print = 50
                time_to_create_network = time.time()-start_time
                logging.debug('Chutney network started in {} seconds ({:.2f} minutes)'.format(round(time_to_create_network), time_to_create_network/60))
                logging.debug('Last '+str(num_lines_to_print)+' lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-num_lines_to_print:]))
                if self.save_data_path is not None:
                    with open(os.path.join(self.save_data_path, 'chutney-startup.log'), 'w') as f:
                        f.write(chutney_network.startup_output)
                    #
                #
                with chutney_network:
                    nicknames = [node.guess_nickname(index) for (index, node) in enumerate(self.nodes)]
                    fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path) for nick in nicknames}
                    #
                    if self.save_data_path is not None:
                        with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
                            pickle.dump(fingerprints, f, protocol=4)
                        #
                    #
                    if next_action is not None:
                        next_action()
                    #
                #
            finally:
                if self.measureme_log_path is not None:
                    # only keep the logs when there is somewhere to keep them;
                    # joining onto a None path here would hide the real error
                    if self.save_data_path is not None:
                        for name in os.listdir(self.measureme_log_path):
                            shutil.move(os.path.join(self.measureme_log_path, name), os.path.join(self.save_data_path, name))
                        #
                    #
                    shutil.rmtree(self.measureme_log_path)
                #
            #
        finally:
            if self.save_data_path is not None:
                shutil.copyfile(tmp_network_file, os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))
            #
            os.remove(tmp_network_file)
        #
    #
    def start_throughput_server(self, next_action=None):
        """
        Run 'next_action' while the throughput server is listening, then collect its results.

        The server runs in a separate process and is stopped and joined once
        'next_action' returns or raises.  Afterwards the results are condensed,
        saved (when 'save_data_path' is set) and summarized in the log.

        Raises RepeatExperimentError when fewer than 95% of the expected
        streams (clients * streams per client) produced a result.
        """
        stop_event = multiprocessing.Event()
        server = throughput_server.ThroughputServer(self.server_address, stop_event)
        def server_run_wrapper():
            try:
                server.run()
            except KeyboardInterrupt:
                logging.info('Stopping server (KeyboardInterrupt)')
            #
        #
        p = multiprocessing.Process(target=server_run_wrapper)
        p.start()
        #
        try:
            if next_action is not None:
                next_action()
            #
        finally:
            # always reap the server process, also when next_action failed
            stop_event.set()
            p.join()
        #
        results = [x['results'] for x in server.results]
        results_brief = [self._brief_result(r) for r in results]
        #
        num_expected_results = len(self.proxy_control_ports)*self.num_streams_per_client
        #
        threshold = 0.95
        # with nothing expected there is no completion ratio to check
        if num_expected_results > 0 and len(results)/num_expected_results < threshold:
            logging.warning('Less than {}% of streams completed: {}/{}'.format(round(threshold*100), len(results), num_expected_results))
            raise RepeatExperimentError
        #
        if self.save_data_path is not None:
            logging.info('Starting to save server results...')
            with open(os.path.join(self.save_data_path, 'server_results_brief.json'), 'w') as f:
                json.dump(results_brief, f)
            #
            with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
                pickle.dump(results, f, protocol=4)
            #
        #
        if len(results) > 0:
            total_data_size = sum(x['data_size'] for x in results)
            avg_data_size = total_data_size/len(results)
            avg_transfer_rate = sum(x['transfer_rate'] for x in results)/len(results)
            time_of_first_byte = min(x['time_of_first_byte'] for x in results)
            time_of_last_byte = max(x['time_of_last_byte'] for x in results)
            transfer_duration = time_of_last_byte-time_of_first_byte
            #
            logging.info('Group size: %d/%d', len(results), num_expected_results)
            logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
            logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
            if transfer_duration > 0:
                # a zero-length transfer window has no meaningful aggregate rate
                total_transfer_rate = total_data_size/transfer_duration
                logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
            #
        #
    #
    def start_system_logging(self, next_action=None):
        """
        Run 'next_action' while a separate process logs CPU statistics.

        The statistics go to 'cpu_stats.pickle.gz' inside 'save_data_path'.
        Without a 'save_data_path' there is nowhere to write them, so
        'next_action' is simply run without logging.
        """
        if self.save_data_path is None:
            if next_action is not None:
                next_action()
            #
            return
        #
        stop_cpu_logging_event = multiprocessing.Event()
        p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
                                    args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'), 0.5, [], stop_cpu_logging_event))
        p.start()
        #
        try:
            if next_action is not None:
                next_action()
            #
        finally:
            # always stop and reap the logger, also when next_action failed
            stop_cpu_logging_event.set()
            p.join()
        #
    #
    def start_throughput_clients(self):
        """
        Build the circuits, start one client protocol per stream and wait for all of them.

        The consensus is fetched (up to 10 times) until it lists every
        non-client node and 'circuit_generator_builder' accepts it.  Each client
        gets 'num_streams_per_client' circuits and protocols; all protocols are
        released together through a shared start event.  When 'save_data_path'
        is set, the per-stream measureme ids and wait durations plus the start
        time are written to 'client_info.pickle.gz'.

        Raises Exception if the consensus cannot be downloaded and
        AssertionError if no circuit generator could be built.
        """
        circuit_generator = None
        consensus_attempts_remaining = 10
        num_expecting_relays = sum(1 for node in self.nodes if not self._is_client(node))
        while circuit_generator is None and consensus_attempts_remaining > 0:
            logging.debug('Getting consensus')
            try:
                # NOTE(review): port 10000 is presumably a directory port of the
                # local Chutney network -- confirm against the Chutney config
                consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('', 10000),)).run()
            except Exception as e:
                raise Exception('Unable to retrieve the consensus') from e
            #
            num_relays = len([1 for desc in consensus])
            logging.info('Got consensus with {}/{} descriptors'.format(num_relays, num_expecting_relays))
            #
            if num_relays != num_expecting_relays:
                logging.info('Not enough descriptors, trying again in 20 seconds...')
                time.sleep(20)
            else:
                try:
                    circuit_generator = self.circuit_generator_builder(consensus, self.server_address)
                except AssertionError:
                    logging.exception('Problem with the consensus, trying again in 10 seconds...')
                    time.sleep(10)
                #
            #
            consensus_attempts_remaining -= 1
        #
        if circuit_generator is None:
            # raised explicitly so the check survives 'python -O'
            raise AssertionError('Could not build the circuit generator')
        #
        proxy_addresses = [{'control': ('', control_port),
                            'socks': ('', experiment_client.get_socks_port(control_port))}
                           for control_port in self.proxy_control_ports]
        #
        controllers = []
        protocol_manager = experiment_client.ExperimentProtocolManager()
        #
        client_info = {}
        client_info['clients'] = []
        #
        # None (the default) means "no random delay"
        max_wait = self.wait_range if self.wait_range is not None else 0
        #
        circuit_counter = 0
        try:
            for proxy_address in proxy_addresses:
                controller = experiment_client.ExperimentController(proxy_address['control'])
                controller.connect()
                # remember the controller as soon as it is connected so that it
                # is disconnected below even if building a circuit fails
                controllers.append(controller)
                # the controller has to attach new streams to circuits, so the
                # connection has to stay open until we're done creating streams
                #
                for _ in range(self.num_streams_per_client):
                    # make a circuit for each stream
                    controller.build_circuit(circuit_generator, circuit_counter)
                    circuit_counter += 1
                #
            #
            start_event = multiprocessing.Event()
            #
            # ids are handed out sequentially, starting at 1, one per stream
            measureme_id_counter = 1
            for _ in range(self.num_streams_per_client):
                for (proxy_address, controller) in zip(proxy_addresses, controllers):
                    measureme_id = measureme_id_counter
                    measureme_id_counter += 1
                    #
                    wait_duration = random.randint(0, max_wait)
                    protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'],
                                                                       proxy_address['control'], controller, start_event, self.measureme,
                                                                       wait_duration=wait_duration, measureme_id=measureme_id,
                                                                       num_bytes=self.num_bytes, buffer_len=self.buffer_len)
                    protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
                    #
                    client_info['clients'].append({'measureme_id':measureme_id, 'wait_duration':wait_duration})
                #
            #
            time.sleep(2)
            client_info['start_time'] = time.time()
            start_event.set()
            #
            # unfortunately mixing threads and processes can cause python to deadlock, and some client protocols
            # have been found to deadlock at about 1/1000 probability, so we must provide a timeout to kill
            # these deadlocked protocol processes
            # see: https://codewithoutrules.com/2018/09/04/python-multiprocessing/
            protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)), kill_timeout=20*60)
            logging.debug('Client protocols have finished')
        finally:
            for controller in controllers:
                controller.disconnect()
            #
            logging.debug('Protocol manager stopping...')
            protocol_manager.stop()
            logging.debug('Protocol manager has finished')
        #
        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'client_info.pickle.gz'), 'wb') as f:
                pickle.dump(client_info, f, protocol=4)
            #
        #
    #
#
- #
- #
- #
- #
def wait_for_keyboard_interrupt():
    """
    Block the caller until Ctrl-C is pressed, then return normally.

    The KeyboardInterrupt is swallowed here, so code after the call (such as
    cleanup in the caller) continues to run.
    """
    try:
        logging.info('Press Ctrl-C to stop.')
        # nothing else ends this loop; it just sleeps in 30 second slices
        while True:
            time.sleep(30)
        #
    except KeyboardInterrupt:
        # emit a bare newline (presumably so later output starts on a fresh line)
        print('')
    #
#
- #
- #
def build_circuit_generator(consensus, server_address):
    """
    Return a circuit generator producing two-hop paths: a random non-exit relay, then the exit.

    consensus: the consensus descriptors handed to the experiment_client helpers.
    server_address: address passed to experiment_client.get_exit_fingerprints
        (presumably used to pick exits that can reach the server -- TODO confirm).

    Returns a callable taking an optional, unused 'gen_id' and returning
    '[non_exit_fingerprint, exit_fingerprint]'.

    Raises AssertionError if there is not exactly one exit relay or no non-exit
    relay.  The checks are explicit raises rather than 'assert' statements so
    they are still performed under 'python -O'.
    """
    fingerprints = experiment_client.get_fingerprints(consensus)
    exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
    non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
    #
    if len(exit_fingerprints) != 1:
        raise AssertionError('Need exactly one exit relay')
    #
    if len(non_exit_fingerprints) < 1:
        raise AssertionError('Need at least one non-exit relay')
    #
    return lambda gen_id=None: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
#
- #
- '''
- if __name__ == '__main__':
- #
- logging.basicConfig(level=logging.DEBUG)
- logging.getLogger('stem').setLevel(logging.WARNING)
- #
- parser = argparse.ArgumentParser(description='Test the network throughput.')
- parser.add_argument('num_bytes', type=useful.parse_bytes,
- help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
- parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
- parser.add_argument('--buffer-len', type=useful.parse_bytes,
- help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
- parser.add_argument('--wait-range', type=int, default=0,
- help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
- parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
- parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
- args = parser.parse_args()
- #
- experiment_time = time.time()
- #
- if args.debugging != 'only-chutney':
- save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
- os.mkdir(save_data_path)
- else:
- save_data_path = None
- #
- if args.debugging is not None:
- measureme_log_path = None
- else:
- measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
- os.mkdir(measureme_log_path)
- #
- experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, args.num_streams_per_client,
- args.buffer_len, args.wait_range, args.measureme)
- #
- start_time = time.time()
- #
- if args.debugging == 'no-chutney':
- experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
- elif args.debugging == 'only-chutney':
- experiment.start_chutney(wait_for_keyboard_interrupt)
- else:
- experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
- #
- logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
- #
- '''
if __name__ == '__main__':
    # Script entry point: run the experiment once per stream count and then
    # convert the collected measureme logs into per-run pickle files.
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    # fixed network layout used for every run
    num_clients = 4
    num_guards = 6
    num_authorities = 2 # will also act as a guard
    num_exits = 1
    #
    experiment_time = time.time()
    #
    # results are only kept when more than just the Chutney network is run
    base_save_data_path = None
    if args.debugging != 'only-chutney':
        base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(base_save_data_path)
    #
    # measureme logs are only collected for a full (non-debugging) run
    measureme_log_path = None
    if args.debugging is None:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
    #
    start_time = time.time()
    all_data_paths = []
    #
    # other sweeps used previously: [1, 2, 4, 6, 8, 10, 12] and [6, 7, 8]
    for num_streams_per_client in [1, 2, 3, 4, 5]:
        logging.info('Starting with {} streams per client'.format(num_streams_per_client))
        save_data_path = None
        #
        if base_save_data_path is not None:
            # one sub-directory per run, named after the total number of streams
            save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
            all_data_paths.append(save_data_path)
            os.mkdir(save_data_path)
        #
        if measureme_log_path is not None:
            # created again for every run
            os.mkdir(measureme_log_path)
        #
        experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
                                num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
                                args.buffer_len, args.wait_range, args.measureme)
        #
        def run_server_and_clients():
            # server -> system logging -> clients, nested via the next_action callbacks
            experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
        #
        if args.debugging == 'only-chutney':
            experiment.start_chutney(wait_for_keyboard_interrupt)
        elif args.debugging == 'no-chutney':
            run_server_and_clients()
        else:
            experiment.start_chutney(run_server_and_clients)
        #
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
    #
    import parse_measureme_logs
    for path in all_data_paths:
        logging.info('Parsing logs for {}'.format(path))
        measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]
        #
        logs = []
        for name in measureme_tor_logs:
            with open(name, 'r') as f:
                logs.append(parse_measureme_logs.read_log(f))
            #
        #
        streams = parse_measureme_logs.get_streams_from_logs(logs)
        #
        with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
            pickle.dump(streams, f, protocol=4)
        #
    #
#
- #
- #
- #