#!/usr/bin/python3
"""Run Tor throughput experiments on a locally started Chutney network.

When executed as a script, a series of experiments is run with an increasing
number of streams per client, and the measureme logs collected for each
experiment are parsed afterwards.
"""

import argparse
import shutil
import logging
import random
import os
import multiprocessing
import threading
import time
import json
import gzip
import pickle
import tempfile

import stem.control
import stem.descriptor.remote
import stem.process

import numa
import log_system_usage
import chutney_manager
import throughput_server
import experiment_client
import useful


class DummyEnterExit:
    """A context manager that does nothing on entry or exit."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class Experiment:
    """A single throughput experiment and the steps needed to run it.

    The ``start_*`` methods each set something up, call an optional
    ``next_action`` callable, and then tear down / save their results, so
    they can be nested by passing one as the ``next_action`` of another.
    """

    def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
                 num_clients, num_guards, num_authorities, num_exits, circuit_generator_builder,
                 buffer_len=None, wait_range=None, measureme=False, test_network=True):
        """Store the settings, configure the Chutney nodes, and save the settings.

        Args:
            save_data_path: Directory in which results are written, or None
                to not save any data.
            measureme_log_path: Directory the nodes are told to use as their
                'measureme_log_dir' option, or None to leave it unset.
            num_bytes: Passed to each client protocol as ``num_bytes``.
            num_streams_per_client: Number of circuits/streams per client.
            num_clients: Number of client nodes.
            num_guards: Number of non-exit relay nodes.
            num_authorities: Number of authority nodes.
            num_exits: Number of exit nodes.
            circuit_generator_builder: Callable taking the consensus and the
                server address, returning the circuit generator that is
                passed to ``controller.build_circuit()``.
            buffer_len: Passed to each client protocol as ``buffer_len``.
            wait_range: Upper bound (inclusive) of the random per-stream wait
                duration. None is treated as 0.
            measureme: Whether to assign a measureme id to each stream.
            test_network: Whether Chutney is started with one verification
                round (True) or none (False).
        """
        self.save_data_path = save_data_path
        self.measureme_log_path = measureme_log_path
        self.num_bytes = num_bytes
        self.num_streams_per_client = num_streams_per_client
        self.num_clients = num_clients
        self.num_guards = num_guards
        self.num_authorities = num_authorities
        self.num_exits = num_exits
        self.circuit_generator_builder = circuit_generator_builder
        self.buffer_len = buffer_len
        self.wait_range = wait_range
        self.measureme = measureme
        self.test_network = test_network

        self.chutney_path = '/home/sengler/code/measureme/chutney'
        self.tor_path = '/home/sengler/code/measureme/tor'
        self.server_address = ('127.0.0.1', 12353)

        self.nodes = None
        self.proxy_control_ports = None

        self.configure_chutney()

        if save_data_path is not None:
            settings = {
                'save_data_path': self.save_data_path,
                'num_bytes': self.num_bytes,
                'num_streams_per_client': self.num_streams_per_client,
                'num_clients': self.num_clients,
                'num_guards': self.num_guards,
                'num_authorities': self.num_authorities,
                'num_exits': self.num_exits,
                'buffer_len': self.buffer_len,
                'wait_range': self.wait_range,
                'measureme': self.measureme,
                'chutney_path': self.chutney_path,
                'tor_path': self.tor_path,
                'server_address': self.server_address,
            }
            with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
                json.dump(settings, f)

    def configure_chutney(self):
        """Build the node list, assign NUMA settings, and record client control ports.

        Sets ``self.nodes`` and ``self.proxy_control_ports``. If a save path
        is configured, the NUMA assignment of every node is written to
        'numa_data.pickle.gz'.
        """
        self.nodes = (
            [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl')
             for _ in range(self.num_authorities)]
            + [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl')
               for _ in range(self.num_guards)]
            + [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl')
               for _ in range(self.num_exits)]
            + [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl')
               for _ in range(self.num_clients)]
        )

        if self.measureme_log_path is not None:
            for node in self.nodes:
                # use the instance attribute; a bare 'measureme_log_path' only
                # resolves when a global of that name happens to exist
                node.options['measureme_log_dir'] = self.measureme_log_path

        numa_remaining = numa.get_numa_overview()
        numa_sets = {}
        for (index, node) in enumerate(self.nodes):
            num_cpus = node.options['num_cpus']
            if num_cpus % 2 != 0:
                # odd CPU counts are rounded up to the next even number
                num_cpus += 1

            (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
            node.options['numa_settings'] = (numa_node, processors)
            numa_sets[node.guess_nickname(index)] = (numa_node, processors)

        self.proxy_control_ports = [node.guess_control_port(index)
                                    for (index, node) in enumerate(self.nodes)
                                    if ('client', 1) in node.options.items()]
        # TODO: ^^ improve this

        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
                pickle.dump(numa_sets, f, protocol=4)

    def start_chutney(self, next_action=None):
        """Start the Chutney network, run ``next_action`` while it is up, then clean up.

        Startup is retried until it succeeds. While the network is running,
        the node fingerprints are saved (if a save path is configured) and
        ``next_action`` is called. Afterwards any measureme logs are moved to
        the save path and the measureme log directory is removed. The
        temporary Chutney network file is copied to the save path (if
        configured) and deleted.
        """
        (fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
        try:
            with os.fdopen(fd, mode='w') as f:
                f.write(chutney_manager.create_chutney_config(self.nodes))

            try:
                verification_rounds = 1 if self.test_network else 0
                chutney_network = None
                num_attempts = 0
                while chutney_network is None:
                    num_attempts += 1
                    try:
                        chutney_network = chutney_manager.ChutneyNetwork(
                            self.chutney_path, self.tor_path, tmp_network_file,
                            verification_rounds=verification_rounds)
                    except Exception:
                        # KeyboardInterrupt and SystemExit are not subclasses of
                        # Exception, so they still abort the retry loop
                        logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))

                logging.debug('Last 40 lines of Chutney output:\n'
                              + '\n'.join(chutney_network.startup_output.split('\n')[-40:]))

                with chutney_network:
                    nicknames = [node.guess_nickname(index) for (index, node) in enumerate(self.nodes)]
                    fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path)
                                    for nick in nicknames}

                    if self.save_data_path is not None:
                        with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
                            pickle.dump(fingerprints, f, protocol=4)

                    if next_action is not None:
                        next_action()
            finally:
                if self.measureme_log_path is not None:
                    if self.save_data_path is not None:
                        for name in os.listdir(self.measureme_log_path):
                            shutil.move(os.path.join(self.measureme_log_path, name),
                                        os.path.join(self.save_data_path, name))
                    else:
                        # there is nowhere to move the logs to; joining a path
                        # with None would raise and hide the original error
                        logging.warning('No save path is set, so the measureme logs will be discarded')

                    shutil.rmtree(self.measureme_log_path)
        finally:
            if self.save_data_path is not None:
                shutil.copyfile(tmp_network_file,
                                os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))

            os.remove(tmp_network_file)

    def start_throughput_server(self, next_action=None):
        """Run the throughput server in a child process around ``next_action``.

        The server's stop event is set once ``next_action`` returns or
        raises. After the process has been joined, the server results are
        written to 'server_results.pickle.gz' if a save path is configured.
        """
        stop_event = multiprocessing.Event()
        server = throughput_server.ThroughputServer(self.server_address, stop_event)

        def server_run_wrapper():
            try:
                server.run()
            except KeyboardInterrupt:
                logging.info('Stopping server (KeyboardInterrupt)')

        p = multiprocessing.Process(target=server_run_wrapper)
        p.start()

        try:
            if next_action is not None:
                next_action()
        finally:
            stop_event.set()

        p.join()

        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
                pickle.dump([x['results'] for x in server.results], f, protocol=4)

    def start_system_logging(self, next_action=None):
        """Log CPU statistics in a child process around ``next_action``.

        The statistics are written to 'cpu_stats.pickle.gz' in the save path.
        If no save path is configured there is nowhere to write them, so
        ``next_action`` is run without any logging.
        """
        if self.save_data_path is None:
            if next_action is not None:
                next_action()
            return

        stop_cpu_logging_event = multiprocessing.Event()
        p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
                                    args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'),
                                          0.1, stop_cpu_logging_event))
        p.start()

        try:
            if next_action is not None:
                next_action()
        finally:
            stop_cpu_logging_event.set()

        p.join()

    def start_throughput_clients(self):
        """Build circuits on every client proxy and run the client protocols.

        Fetches the consensus, builds ``num_streams_per_client`` circuits on
        each client's controller, starts one protocol per stream and client,
        and waits for all of them to finish. Controllers are disconnected and
        the protocol manager is stopped afterwards, even on failure.

        Raises:
            Exception: If the consensus cannot be retrieved.
        """
        logging.debug('Getting consensus')
        try:
            consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
        except Exception as e:
            raise Exception('Unable to retrieve the consensus') from e

        circuit_generator = self.circuit_generator_builder(consensus, self.server_address)

        proxy_addresses = [{'control': ('127.0.0.1', control_port),
                            'socks': ('127.0.0.1', experiment_client.get_socks_port(control_port))}
                           for control_port in self.proxy_control_ports]

        # the constructor default for 'wait_range' is None, which randint() rejects
        wait_range = 0 if self.wait_range is None else self.wait_range

        controllers = []
        protocol_manager = experiment_client.ExperimentProtocolManager()

        try:
            for proxy_address in proxy_addresses:
                controller = experiment_client.ExperimentController(proxy_address['control'])
                controller.connect()
                # register the controller as soon as it is connected so that it
                # is disconnected even if building a circuit fails below
                controllers.append(controller)
                # the controller has to attach new streams to circuits, so the
                # connection has to stay open until we're done creating streams

                for _ in range(self.num_streams_per_client):
                    # make a circuit for each stream
                    controller.build_circuit(circuit_generator)
                    time.sleep(0.5)

            start_event = multiprocessing.Event()

            used_measureme_ids = set()
            for stream_index in range(self.num_streams_per_client):
                for (controller_index, (proxy_address, controller)) in enumerate(zip(proxy_addresses, controllers)):
                    if self.measureme:
                        measureme_id = stream_index*len(controllers) + controller_index + 1
                        assert measureme_id not in used_measureme_ids, \
                            'Sanity check: Attempting to use a previously-used measureme_id'
                        used_measureme_ids.add(measureme_id)
                    else:
                        measureme_id = None

                    wait_duration = random.randint(0, wait_range)
                    protocol = experiment_client.build_client_protocol(
                        self.server_address, proxy_address['socks'], proxy_address['control'],
                        controller, start_event, wait_duration=wait_duration,
                        measureme_id=measureme_id, num_bytes=self.num_bytes,
                        buffer_len=self.buffer_len)
                    protocol_manager.start_experiment_protocol(protocol, protocol_id=None)

            time.sleep(2)
            start_event.set()

            protocol_manager.wait(finished_protocol_cb=lambda protocol_id, had_error:
                                  logging.info('Finished {} (had_error={})'.format(protocol_id, had_error)))
        finally:
            try:
                for controller in controllers:
                    controller.disconnect()
            finally:
                # stop the manager even if a controller fails to disconnect
                protocol_manager.stop()


def wait_for_keyboard_interrupt():
    """Block until the user presses Ctrl-C."""
    try:
        logging.info('Press Ctrl-C to stop.')
        while True:
            time.sleep(30)
    except KeyboardInterrupt:
        print('')


def build_circuit_generator(consensus, server_address):
    """Return a callable producing [random non-exit fingerprint, exit fingerprint].

    Requires exactly one exit fingerprint and at least one non-exit
    fingerprint in the consensus; otherwise an AssertionError is raised.
    """
    fingerprints = experiment_client.get_fingerprints(consensus)
    exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
    non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))

    assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
    assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'

    return lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)

    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()

    num_clients = 4
    num_guards = 6
    num_authorities = 2 # will also act as a guard
    num_exits = 1

    experiment_time = time.time()

    # nothing is saved when only the Chutney network is being debugged
    if args.debugging != 'only-chutney':
        base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(base_save_data_path)
    else:
        base_save_data_path = None

    # measureme logs are only collected when no debugging option is given
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))

    start_time = time.time()
    all_data_paths = []

    for num_streams_per_client in [1, 2, 3, 4, 5]:
        logging.info('Starting with {} streams per client'.format(num_streams_per_client))
        save_data_path = None

        if base_save_data_path is not None:
            save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
            all_data_paths.append(save_data_path)
            os.mkdir(save_data_path)

        if measureme_log_path is not None:
            # start_chutney() removes this directory when it finishes, so it
            # is created again for every experiment
            os.mkdir(measureme_log_path)

        experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
                                num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
                                args.buffer_len, args.wait_range, args.measureme)

        if args.debugging == 'no-chutney':
            experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
        elif args.debugging == 'only-chutney':
            experiment.start_chutney(wait_for_keyboard_interrupt)
        else:
            experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))

    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))

    import parse_measureme_logs
    for path in all_data_paths:
        logging.info('Parsing logs for {}'.format(path))
        measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]

        logs = []
        for name in measureme_tor_logs:
            with open(name, 'r') as f:
                logs.append(parse_measureme_logs.read_log(f))

        streams = parse_measureme_logs.get_streams_from_logs(logs)

        with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
            pickle.dump(streams, f, protocol=4)