#!/usr/bin/python3
#
import argparse
import shutil
import logging
import random
import os
import multiprocessing
import threading
import time
import json
import gzip
import pickle
import tempfile
#
import stem.control
import stem.descriptor.remote
import stem.process
import numpy as np
#
import numa
import log_system_usage
import chutney_manager
import throughput_server
import experiment_client
import useful
#
class DummyEnterExit:
    def __enter__(self):
        return self
    #
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
    #
#
class RepeatExperimentError(Exception):
    pass
#
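# DummyEnterExit above is a no-op context manager. A minimal usage sketch
# (illustrative only; nothing in this script calls it): it lets a `with` block
# stay unchanged when the real resource is optional.
def _example_optional_context(real_context=None):
    with (real_context if real_context is not None else DummyEnterExit()):
        pass  # the body runs identically whether or not a real context was supplied
#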
class Experiment:
    def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client, num_clients, num_guards, num_authorities, num_exits, circuit_generator_builder, buffer_len=None, wait_range=None, measureme=False, test_network=True):
        self.save_data_path = save_data_path
        self.measureme_log_path = measureme_log_path
        self.num_bytes = num_bytes
        self.num_streams_per_client = num_streams_per_client
        self.num_clients = num_clients
        self.num_guards = num_guards
        self.num_authorities = num_authorities
        self.num_exits = num_exits
        self.circuit_generator_builder = circuit_generator_builder
        self.buffer_len = buffer_len
        self.wait_range = wait_range
        self.measureme = measureme
        self.test_network = test_network
        #
        self.chutney_path = '/home/sengler/code/measureme/chutney'
        self.tor_path = '/home/sengler/code/measureme/tor'
        self.server_address = ('127.0.0.1', 12353)
        #
        self.nodes = None
        self.proxy_control_ports = None
        #
        self.configure_chutney()
        #
        if save_data_path is not None:
            with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
                settings = {}
                settings['save_data_path'] = self.save_data_path
                settings['num_bytes'] = self.num_bytes
                settings['num_streams_per_client'] = self.num_streams_per_client
                settings['num_clients'] = self.num_clients
                settings['num_guards'] = self.num_guards
                settings['num_authorities'] = self.num_authorities
                settings['num_exits'] = self.num_exits
                settings['buffer_len'] = self.buffer_len
                settings['wait_range'] = self.wait_range
                settings['measureme'] = self.measureme
                settings['chutney_path'] = self.chutney_path
                settings['tor_path'] = self.tor_path
                settings['server_address'] = self.server_address
                #
                json.dump(settings, f)
            #
        #
    #
    def configure_chutney(self):
        self.nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
                     [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
                     [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
                     [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(self.num_clients)]
        #
        if self.measureme_log_path is not None:
            for node in self.nodes:
                node.options['measureme_log_dir'] = self.measureme_log_path
            #
        #
        numa_remaining = numa.get_numa_overview()
        numa_sets = {}
        for (index, node) in enumerate(self.nodes):
            # round an odd CPU count up so we allocate whole physical cores
            num_cpus = node.options['num_cpus']
            if num_cpus%2 != 0:
                num_cpus += 1
            #
            (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
            node.options['numa_settings'] = (numa_node, processors)
            numa_sets[node.guess_nickname(index)] = (numa_node, processors)
        #
        #unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
        #
        #nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
        self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
        # TODO: ^^ improve this
        #
        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
                pickle.dump(numa_sets, f, protocol=4)
            #
        #
    #
    def start_chutney(self, next_action=None):
        #
        (fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
        try:
            with os.fdopen(fd, mode='w') as f:
                f.write(chutney_manager.create_chutney_config(self.nodes))
            #
            try:
                chutney_network = None
                num_attempts = 0
                start_time = time.time()
                while chutney_network is None:
                    try:
                        num_attempts += 1
                        verification_rounds = 1 if self.test_network else 0
                        chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file, verification_rounds=verification_rounds)
                    except KeyboardInterrupt:
                        raise
                    except Exception:
                        logging.exception('The Chutney network failed to start (attempt {}). Trying again...'.format(num_attempts))
                        if num_attempts > 4:
                            logging.exception('Just kidding, we\'ve tried too many times.')
                            raise
                        #
                    #
                #
                num_lines_to_print = 50
                time_to_create_network = time.time()-start_time
                logging.debug('Chutney network started in {} seconds ({:.2f} minutes)'.format(round(time_to_create_network), time_to_create_network/60))
                logging.debug('Last '+str(num_lines_to_print)+' lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-num_lines_to_print:]))
                if self.save_data_path is not None:
                    with open(os.path.join(self.save_data_path, 'chutney-startup.log'), 'w') as f:
                        f.write(chutney_network.startup_output)
                    #
                #
                #with chutney_network as net:
                with chutney_network:
                    nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
                    fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path) for nick in nicknames}
                    #
                    if self.save_data_path is not None:
                        with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
                            pickle.dump(fingerprints, f, protocol=4)
                        #
                    #
                    if next_action is not None:
                        next_action()
                    #
                #
            finally:
                if self.measureme_log_path is not None:
                    for f in os.listdir(self.measureme_log_path):
                        shutil.move(os.path.join(self.measureme_log_path, f), os.path.join(self.save_data_path, f))
                    #
                    shutil.rmtree(self.measureme_log_path)
                #
            #
        finally:
            if self.save_data_path is not None:
                shutil.copyfile(tmp_network_file, os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))
            #
            os.remove(tmp_network_file)
        #
    #
    def start_throughput_server(self, next_action=None):
        stop_event = multiprocessing.Event()
        server = throughput_server.ThroughputServer(self.server_address, stop_event)
        def server_run_wrapper():
            try:
                server.run()
            except KeyboardInterrupt:
                logging.info('Stopping server (KeyboardInterrupt)')
            #
        #
        p = multiprocessing.Process(target=server_run_wrapper)
        p.start()
        #
        try:
            if next_action is not None:
                next_action()
            #
        finally:
            stop_event.set()
        #
        p.join()
        #
        results = [x['results'] for x in server.results]
        results_brief = []
        #
        for r in results:
            to_add = {}
            to_add['first_byte'] = r['deltas']['timestamps'][0]
            to_add['last_byte'] = r['deltas']['timestamps'][-1]
            to_add['data_size'] = r['data_size']
            to_add['measured_data_size'] = int(np.sum(r['deltas']['bytes']))
            to_add['custom_data'] = json.loads(r['custom_data'].decode('utf-8'))
            to_add['time_started_push'] = r['time_started_push']
            results_brief.append(to_add)
        #
        num_expected_results = len(self.proxy_control_ports)*self.num_streams_per_client
        #
        threshold = 0.95
        if len(results)/num_expected_results < threshold:
            logging.warning('Less than {}% of streams completed: {}/{}'.format(round(threshold*100), len(results), num_expected_results))
            raise RepeatExperimentError
        #
        if self.save_data_path is not None:
            logging.info('Starting to save server results...')
            with open(os.path.join(self.save_data_path, 'server_results_brief.json'), 'w') as f:
                json.dump(results_brief, f)
            #
            with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
                pickle.dump(results, f, protocol=4)
            #
        #
        if len(results) > 0:
            avg_data_size = sum([x['data_size'] for x in results])/len(results)
            avg_transfer_rate = sum([x['transfer_rate'] for x in results])/len(results)
            time_of_first_byte = min([x['time_of_first_byte'] for x in results])
            time_of_last_byte = max([x['time_of_last_byte'] for x in results])
            total_transfer_rate = sum([x['data_size'] for x in results])/(time_of_last_byte-time_of_first_byte)
            #
            logging.info('Group size: %d/%d', len(results), num_expected_results)
            logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
            logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
            logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
        #
    #
    def start_system_logging(self, next_action=None):
        stop_cpu_logging_event = multiprocessing.Event()
        p = multiprocessing.Process(target=log_system_usage.log_cpu_stats, args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'), 0.5, [], stop_cpu_logging_event))
        p.start()
        #
        try:
            if next_action is not None:
                next_action()
            #
        finally:
            stop_cpu_logging_event.set()
        #
        p.join()
    #
    def start_throughput_clients(self):
        circuit_generator = None
        consensus_attempts_remaining = 10
        num_expecting_relays = len([x for x in range(len(self.nodes)) if ('client', 1) not in self.nodes[x].options.items()])
        while circuit_generator is None and consensus_attempts_remaining > 0:
            logging.debug('Getting consensus')
            try:
                consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 10000),)).run()
            except Exception as e:
                raise Exception('Unable to retrieve the consensus') from e
            #
            num_relays = len([1 for desc in consensus])
            logging.info('Got consensus with {}/{} descriptors'.format(num_relays, num_expecting_relays))
            #
            if num_relays != num_expecting_relays:
                logging.info('Not enough descriptors, trying again in 20 seconds...')
                time.sleep(20)
            else:
                try:
                    circuit_generator = self.circuit_generator_builder(consensus, self.server_address)
                except AssertionError:
                    logging.exception('Problem with the consensus, trying again in 10 seconds...')
                    time.sleep(10)
                #
            #
            consensus_attempts_remaining -= 1
        #
        assert circuit_generator is not None, 'Could not build the circuit generator'
        #
        proxy_addresses = []
        for control_port in self.proxy_control_ports:
            proxy = {}
            proxy['control'] = ('127.0.0.1', control_port)
            proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
            proxy_addresses.append(proxy)
        #
        controllers = []
        protocol_manager = experiment_client.ExperimentProtocolManager()
        #
        client_info = {}
        client_info['clients'] = []
        #
        circuit_counter = 0
        try:
            for proxy_address in proxy_addresses:
                controller = experiment_client.ExperimentController(proxy_address['control'])
                controller.connect()
                # the controller has to attach new streams to circuits, so the
                # connection has to stay open until we're done creating streams
                #
                for _ in range(self.num_streams_per_client):
                    # make a circuit for each stream
                    controller.build_circuit(circuit_generator, circuit_counter)
                    circuit_counter += 1
                    #time.sleep(0.05)
                #
                controllers.append(controller)
            #
            start_event = multiprocessing.Event()
            #
            #used_measureme_ids = set()
            measureme_id_counter = 1
            for stream_index in range(self.num_streams_per_client):
                for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
                    #if self.measureme:
                    #	measureme_id = stream_index*len(controllers) + controller_index + 1
                    #	assert len(set([measureme_id]) & used_measureme_ids) == 0, 'Sanity check: Attempting to use a previously-used measureme_id'
                    #	used_measureme_ids |= set([measureme_id])
                    #else:
                    #	measureme_id = None
                    measureme_id = measureme_id_counter
                    measureme_id_counter += 1
                    #
                    wait_duration = random.randint(0, self.wait_range)
                    protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'], proxy_address['control'], controller, start_event, self.measureme, wait_duration=wait_duration, measureme_id=measureme_id, num_bytes=self.num_bytes, buffer_len=self.buffer_len)
                    protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
                    #
                    client_info['clients'].append({'measureme_id': measureme_id, 'wait_duration': wait_duration})
                #
            #
            time.sleep(2)
            client_info['start_time'] = time.time()
            start_event.set()
            #
            # unfortunately mixing threads and processes can cause python to deadlock, and some client protocols
            # have been found to deadlock at about 1/1000 probability, so we must provide a timeout to kill
            # these deadlocked protocol processes
            # see: https://codewithoutrules.com/2018/09/04/python-multiprocessing/
            protocol_manager.wait(finished_protocol_cb=lambda protocol_id, had_error: logging.info('Finished {} (had_error={})'.format(protocol_id, had_error)), kill_timeout=20*60)
            logging.debug('Client protocols have finished')
        finally:
            for controller in controllers:
                controller.disconnect()
            #
            logging.debug('Protocol manager stopping...')
            protocol_manager.stop()
            logging.debug('Protocol manager has finished')
        #
        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'client_info.pickle.gz'), 'wb') as f:
                pickle.dump(client_info, f, protocol=4)
            #
        #
    #
#
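# The Experiment.start_* methods compose through their next_action callbacks:
# each performs its setup, invokes next_action() inside a try/finally, and then
# runs its own cleanup, so teardown happens in reverse order even on error.
# A sketch of the full chain (the same nesting is used in __main__ below):
def _example_run_all(experiment):
    experiment.start_chutney(
        lambda: experiment.start_throughput_server(
            lambda: experiment.start_system_logging(
                experiment.start_throughput_clients)))
#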
def wait_for_keyboard_interrupt():
    try:
        logging.info('Press Ctrl-C to stop.')
        while True:
            time.sleep(30)
        #
    except KeyboardInterrupt:
        print('')
    #
#
def build_circuit_generator(consensus, server_address):
    fingerprints = experiment_client.get_fingerprints(consensus)
    exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
    non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
    #
    assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
    assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
    #
    return lambda gen_id=None: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
#
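# A hypothetical three-hop variant of build_circuit_generator, included only as
# a sketch (it is not used by this script). It assumes the same experiment_client
# helpers as above and requires at least two non-exit relays.
def build_three_hop_circuit_generator(consensus, server_address):
    fingerprints = experiment_client.get_fingerprints(consensus)
    exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
    non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
    #
    assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
    assert len(non_exit_fingerprints) >= 2, 'Need at least two non-exit relays'
    #
    # pick a distinct guard and middle per circuit, keeping the single exit last
    return lambda gen_id=None: random.sample(non_exit_fingerprints, 2) + [exit_fingerprints[0]]
#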
'''
if __name__ == '__main__':
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes, help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
    parser.add_argument('--buffer-len', type=useful.parse_bytes, help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0, help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    experiment_time = time.time()
    #
    if args.debugging != 'only-chutney':
        save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(save_data_path)
    else:
        save_data_path = None
    #
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
        os.mkdir(measureme_log_path)
    #
    experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, args.num_streams_per_client, args.buffer_len, args.wait_range, args.measureme)
    #
    start_time = time.time()
    #
    if args.debugging == 'no-chutney':
        experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
    elif args.debugging == 'only-chutney':
        experiment.start_chutney(wait_for_keyboard_interrupt)
    else:
        experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
#
'''
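# Hedged sketch: turning the per-stream fields saved in server_results_brief.json
# (written by start_throughput_server() above) into an aggregate transfer rate
# offline. Only the file name and field names come from this script; the helper
# itself is illustrative and not called anywhere.
def summarize_brief_results(save_data_path):
    with open(os.path.join(save_data_path, 'server_results_brief.json'), 'r') as f:
        brief = json.load(f)
    #
    if len(brief) == 0:
        return None
    #
    first_byte = min(x['first_byte'] for x in brief)
    last_byte = max(x['last_byte'] for x in brief)
    total_bytes = sum(x['measured_data_size'] for x in brief)
    return total_bytes/(last_byte-first_byte)  # bytes per second across all streams
#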
if __name__ == '__main__':
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes, help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes, help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0, help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    num_clients = 4
    num_guards = 6
    num_authorities = 2 # will also act as a guard
    num_exits = 1
    #
    experiment_time = time.time()
    #
    if args.debugging != 'only-chutney':
        base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(base_save_data_path)
    else:
        base_save_data_path = None
    #
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
    #
    start_time = time.time()
    all_data_paths = []
    #
    #for num_streams_per_client in [1, 2, 4, 6, 8, 10, 12]:
    for num_streams_per_client in [1, 2, 3, 4, 5]:
    #for num_streams_per_client in [6,7,8]:
        logging.info('Starting with {} streams per client'.format(num_streams_per_client))
        save_data_path = None
        #
        if base_save_data_path is not None:
            save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
            all_data_paths.append(save_data_path)
            os.mkdir(save_data_path)
        #
        if measureme_log_path is not None:
            os.mkdir(measureme_log_path)
        #
        experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client, num_clients, num_guards, num_authorities, num_exits, build_circuit_generator, args.buffer_len, args.wait_range, args.measureme)
        #
        if args.debugging == 'no-chutney':
            experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
        elif args.debugging == 'only-chutney':
            experiment.start_chutney(wait_for_keyboard_interrupt)
        else:
            experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
        #
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
    #
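    # Note: start_throughput_server() raises RepeatExperimentError when fewer than
    # 95% of streams complete. This script currently lets that propagate; a sketch
    # of a retry wrapper a caller could use instead (run_experiment_once is a
    # hypothetical name, illustrative only):
    #
    #   for attempt in range(3):
    #       try:
    #           run_experiment_once()
    #           break
    #       except RepeatExperimentError:
    #           logging.warning('Too few streams completed, repeating experiment')
    #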
    import parse_measureme_logs
    for path in all_data_paths:
        logging.info('Parsing logs for {}'.format(path))
        measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]
        #
        logs = []
        for name in measureme_tor_logs:
            with open(name, 'r') as f:
                logs.append(parse_measureme_logs.read_log(f))
            #
        #
        streams = parse_measureme_logs.get_streams_from_logs(logs)
        #
        with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
            pickle.dump(streams, f, protocol=4)
        #
    #
#
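# Hedged sketch for offline analysis (illustrative helper, not used by this
# script): every *.pickle.gz artifact written above (numa_data, fingerprints,
# server_results, client_info, measureme-data) was dumped with pickle protocol 4
# inside a gzip stream, so it can be read back like this:
def load_pickled_artifact(path):
    with gzip.GzipFile(path, 'rb') as f:
        return pickle.load(f)
#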