123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- #!/usr/bin/python3
- #
- import argparse
- import shutil
- import logging
- import random
- import os
- import multiprocessing
- import threading
- import time
- import json
- import gzip
- import pickle
- import tempfile
- #
- import stem.control
- import stem.descriptor.remote
- import stem.process
- #
- import numa
- import log_system_usage
- import chutney_manager
- import throughput_server
- import experiment_client
- import useful
- #
class DummyEnterExit:
	"""Context manager that does nothing.

	Stands in where an optional context manager is required by the code
	shape: entering hands back the instance itself, and exiting never
	suppresses an exception.
	"""
	def __enter__(self):
		# Return the instance so `with DummyEnterExit() as x:` binds x.
		return self
	#
	def __exit__(self, exc_type, exc_val, exc_tb):
		# Returning None (falsy) lets any in-flight exception propagate.
		return None
- #
- #
def asdasd():
	"""Run one throughput experiment against an already-running Tor test network.

	NOTE(review): the name is an obvious placeholder; renaming requires
	updating both call sites in the __main__ block below.

	Relies on module-level globals defined in the __main__ block:
	``save_data_path``, ``proxy_control_ports`` and ``args`` -- it is not
	callable in isolation.  Starts the throughput server and a CPU-usage
	logger, builds one two-hop circuit per client stream, launches all
	client protocols simultaneously, then tears everything down (in the
	reverse order of setup, via nested try/finally) and writes the
	server-side results to ``save_data_path``.
	"""
	# The local endpoint the exit relay's streams connect back to.
	server_address = ('127.0.0.1', 12353)
	#
	stop_event = multiprocessing.Event()
	server = throughput_server.ThroughputServer(server_address, stop_event)
	p = multiprocessing.Process(target=server.run)
	p.start()
	#
	try:
		# Sample CPU usage every 0.5 s for the lifetime of the experiment.
		stop_cpu_logging_event = threading.Event()
		t = threading.Thread(target=log_system_usage.log_cpu_stats,
			args=(os.path.join(save_data_path, 'cpu_stats.pickle.gz'), 0.5, stop_cpu_logging_event))
		t.start()
		#
		try:
			logging.debug('Getting consensus')
			try:
				# DirPort 7000 is presumably one of the chutney directory
				# authorities -- TODO confirm against the chutney templates.
				consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
			except Exception as e:
				raise Exception('Unable to retrieve the consensus') from e
			#
			fingerprints = experiment_client.get_fingerprints(consensus)
			exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
			non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
			#
			# The experiment needs a single fixed exit and at least one
			# non-exit relay to pick middles from.
			assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
			assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
			#
			# Two-hop circuits: a random non-exit relay, then the one exit.
			circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
			#
			proxy_addresses = []
			for control_port in proxy_control_ports:
				proxy = {}
				proxy['control'] = ('127.0.0.1', control_port)
				proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
				proxy_addresses.append(proxy)
			#
			controllers = []
			protocol_manager = experiment_client.ExperimentProtocolManager()
			#
			try:
				for proxy_address in proxy_addresses:
					controller = experiment_client.ExperimentController(proxy_address['control'])
					controller.connect()
					# the controller has to attach new streams to circuits, so the
					# connection has to stay open until we're done creating streams
					#
					for _ in range(args.num_streams_per_client):
						# make a circuit for each stream
						controller.build_circuit(circuit_generator)
						time.sleep(0.5)
					#
					controllers.append(controller)
				#
				start_event = multiprocessing.Event()
				#
				for stream_index in range(args.num_streams_per_client):
					for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
						if args.measureme:
							# NOTE(review): uniqueness of this id assumes
							# controller_index < num_streams_per_client; with more
							# clients than streams-per-client, two protocols could
							# share an id -- confirm this is intended.
							measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
						else:
							measureme_id = None
						#
						# Stagger connection starts by a random delay in
						# [0, wait_range] to avoid a thundering herd.
						wait_duration = random.randint(0, args.wait_range)
						protocol = experiment_client.build_client_protocol(server_address, proxy_address['socks'],
							proxy_address['control'], controller, start_event,
							wait_duration=wait_duration, measureme_id=measureme_id,
							num_bytes=args.num_bytes, buffer_len=args.buffer_len)
						protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
					#
				#
				time.sleep(2)
				# Release all waiting protocols at (approximately) the same moment.
				start_event.set()
				#
				protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
			finally:
				for controller in controllers:
					controller.disconnect()
				#
				protocol_manager.stop()
			#
		finally:
			# Stop and join the CPU-stats logger thread.
			stop_cpu_logging_event.set()
			#
			t.join()
	finally:
		# Stop and join the throughput-server process, then persist its
		# per-stream results even if the experiment errored part-way.
		stop_event.set()
		#
		p.join()
		#
		with gzip.GzipFile(os.path.join(save_data_path, 'server_results.pickle.gz'), 'wb') as f:
			pickle.dump([x['results'] for x in server.results], f, protocol=4)
- #
- #
if __name__ == '__main__':
	#
	logging.basicConfig(level=logging.DEBUG)
	# stem is very chatty at DEBUG; keep it to warnings only.
	logging.getLogger('stem').setLevel(logging.WARNING)
	#
	parser = argparse.ArgumentParser(description='Test the network throughput.')
	parser.add_argument('num_bytes', type=useful.parse_bytes,
		help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
	parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
	parser.add_argument('--buffer-len', type=useful.parse_bytes,
		help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
	parser.add_argument('--wait-range', type=int, default=0,
		help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
	parser.add_argument('--chutney-only', action='store_true', help='only start chutney')
	parser.add_argument('--no-chutney', action='store_true', help='don\'t start chutney')
	args = parser.parse_args()
	#
	# NOTE(review): hard-coded per-user paths; consider making these
	# configurable before sharing this script.
	chutney_path = '/home/sengler/code/measureme/chutney'
	tor_path = '/home/sengler/code/measureme/tor'
	#
	# save_data_path is only defined when an experiment will actually run;
	# measureme_log_dir only when chutney will be started.  asdasd() reads
	# save_data_path as a global.
	if not args.chutney_only:
		save_data_path = os.path.join('/home/sengler/data/experiments', str(int(time.time())))
		os.mkdir(save_data_path)
	if not args.no_chutney:
		measureme_log_dir = os.path.join('/ramdisk/sengler/chutney', str(int(time.time())))
		os.mkdir(measureme_log_dir)
	#
	#
	# Network layout: 2 authorities, 2 non-exit relays, 1 exit, 2 clients.
	nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(2)] + \
		[chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(2)] + \
		[chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(1)] + \
		[chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(2)]
	#
	# Pin each node to processors on one NUMA node.
	numa_remaining = numa.get_numa_overview()
	numa_sets = []
	for node in nodes:
		if not args.chutney_only and not args.no_chutney:
			node.options['measureme_log_dir'] = measureme_log_dir
		#
		# Round the CPU request up to an even number -- presumably so
		# allocations stay aligned to physical-core (hyperthread) pairs;
		# confirm against chutney_manager.numa_scheduler.
		num_cpus = node.options['num_cpus']
		if num_cpus%2 != 0:
			num_cpus += 1
		#
		(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
		node.options['numa_settings'] = (numa_node, processors)
		numa_sets.append((numa_node, processors))
	#
	# NOTE(review): numa_sets and unused_processors are computed but not
	# used below -- possibly left over from logging that was removed.
	unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
	#
	nicknames = [nodes[x].guess_nickname(x) for x in range(len(nodes))]
	# Control ports of the client nodes only; asdasd() reads this global.
	proxy_control_ports = [nodes[x].guess_control_port(x) for x in range(len(nodes)) if ('client', 1) in nodes[x].options.items()]
	#
	# Write the generated chutney network definition to a temp file.
	(fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
	try:
		with os.fdopen(fd, mode='w') as f:
			f.write(chutney_manager.create_chutney_config(nodes))
		#
		if args.no_chutney:
			# Assume a network is already running; just run the experiment.
			asdasd()
		else:
			try:
				with chutney_manager.ChutneyNetwork(chutney_path, tor_path, tmp_network_file) as net:
					if args.chutney_only:
						# Keep the network up until the user interrupts.
						try:
							logging.info('Press Ctrl-C to stop.')
							while True:
								time.sleep(30)
							#
						except KeyboardInterrupt:
							print('')
						#
					else:
						fingerprints = []
						for nick in nicknames:
							fingerprints.append(chutney_manager.read_fingerprint(nick, chutney_path))
						#
						asdasd()
					#
				#
			finally:
				# Move the relays' measureme logs into the experiment's data
				# directory, then delete the ramdisk staging directory.
				# NOTE(review): under --chutney-only this is skipped, so the
				# measureme_log_dir created above is never cleaned up --
				# confirm whether that leak is intentional.
				if not args.chutney_only:
					for f in os.listdir(measureme_log_dir):
						shutil.move(os.path.join(measureme_log_dir, f), os.path.join(save_data_path, f))
					#
					shutil.rmtree(measureme_log_dir)
				#
			#
		#
	finally:
		os.remove(tmp_network_file)
- #
|