123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- #!/usr/bin/python3
- #
- import argparse
- import shutil
- import logging
- import random
- import os
- import multiprocessing
- import threading
- import time
- import json
- import gzip
- import pickle
- import tempfile
- #
- import stem.control
- import stem.descriptor.remote
- import stem.process
- #
- import numa
- import log_system_usage
- import chutney_manager
- import throughput_server
- import experiment_client
- import experiment
- import useful
- #
class CustomExperiment(experiment.Experiment):
    """Experiment variant whose node list includes one extra 'target' relay.

    The target node is created with its own tor binary path and a set of
    valgrind (helgrind) settings; every other node uses the stock templates.
    """
    #
    def __init__(self, *args, **kwargs):
        """Pass all arguments to the base class, then set the tool paths."""
        super().__init__(*args, **kwargs)
        #
        self.chutney_path = '/home/sengler/code/measureme/chutney'
        self.tor_path = '/home/sengler/code/parallel/tor-single'
    #
    def configure_chutney(self):
        """Fill in ``self.nodes`` and ``self.proxy_control_ports``.

        Nodes are created in this order: authorities, guards, the single
        target relay, exits, clients. A node's position in that list is the
        index handed to ``guess_control_port``.
        """
        target_tor_binary = '/home/sengler/code/parallel/tor-parallel/src/app/tor'
        helgrind_args = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
        #
        authorities = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl')
                       for _ in range(self.num_authorities)]
        guards = [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl')
                  for _ in range(self.num_guards)]
        # Exactly one target relay; it is the only node given a tor path and valgrind settings.
        target = [chutney_manager.Node(tag='target', tor=target_tor_binary, valgrind_settings=helgrind_args,
                                       relay=1, torrc='relay-non-exit.tmpl')]
        exits = [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl')
                 for _ in range(self.num_exits)]
        clients = [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl')
                   for _ in range(self.num_clients)]
        #
        self.nodes = authorities + guards + target + exits + clients
        #
        # Collect a control port for every node that was created with client=1.
        self.proxy_control_ports = [node.guess_control_port(index)
                                    for index, node in enumerate(self.nodes)
                                    if ('client', 1) in node.options.items()]
        # TODO: ^^ improve this
- #
- #
def build_circuit_generator(consensus, server_address):
    """Build a factory for three-hop paths that always pass through the target relay.

    Args:
        consensus: Iterable of relay descriptors. Each one must expose a
            ``nickname`` string and an ``exit_policy`` object with a
            ``can_exit_to`` method. The iterable is consumed exactly once,
            so a one-shot iterator is accepted.
        server_address: Sequence unpacked as the positional arguments of
            ``exit_policy.can_exit_to`` (presumably ``(address, port)`` -
            confirm against the caller).

    Returns:
        A zero-argument callable. Each call returns
        ``[non_exit_nickname, target_nickname, exit_nickname]`` where the
        first and last entries are picked with ``random.choice``.

    Raises:
        IndexError: No relay has a nickname ending in ``'target'``.
        AssertionError: There is no exit relay, or no relay that is neither
            an exit nor the target.
    """
    # Materialise once: the descriptors are scanned several times below, and a
    # one-shot iterator would otherwise be exhausted after the first scan.
    descriptors = list(consensus)
    #
    nicknames = [desc.nickname for desc in descriptors]
    exit_nicknames = [desc.nickname for desc in descriptors if desc.exit_policy.can_exit_to(*server_address)]
    #
    target_candidates = [name for name in nicknames if name.endswith('target')]
    if not target_candidates:
        # Same exception type as the bare `[...][0]` lookup would raise, but with an explanation.
        raise IndexError("No relay with a nickname ending in 'target' was found in the consensus")
    # If several relays match, the first one in consensus order is used.
    target_nickname = target_candidates[0]
    # Only the first hop excludes the target; the exit list is left as computed above.
    non_exit_nicknames = list(set(nicknames) - set(exit_nicknames) - {target_nickname})
    #
    assert len(exit_nicknames) >= 1, 'Need at least one exit relay'
    assert len(non_exit_nicknames) >= 1, 'Need at least one non-exit relay'
    #
    return lambda: [random.choice(non_exit_nicknames), target_nickname, random.choice(exit_nicknames)]
- #
if __name__ == '__main__':
    # Script entry point: parse the command line, build a CustomExperiment with
    # a fixed small topology, run it, and log the total wall-clock time.
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    args = parser.parse_args()
    #
    num_clients = 2
    num_guards = 2 # number of relays (including guards)
    num_authorities = 2 # will also act as a relay or guard
    num_exits = 3 # will be used only as an exit
    #
    # No data is saved and measureme logging is disabled for this run.
    save_data_path = None
    measureme_log_path = None
    measureme = False
    #
    start_time = time.time()
    #
    num_streams_per_client = 1
    logging.info('Starting with {} streams per client'.format(num_streams_per_client))
    #
    # Deliberately not named `experiment`: that would rebind the imported
    # `experiment` module at module level.
    custom_experiment = CustomExperiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
                                         num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
                                         args.buffer_len, args.wait_range, measureme, test_network=False)
    #
    def sleep_then_run(duration, func):
        """Log, sleep for `duration` seconds, then call `func` and return its result."""
        logging.info('Sleeping for {} seconds before running {}'.format(duration, func))
        time.sleep(duration)
        return func()
    #
    # Nested callbacks: start chutney, then the throughput server, then (after
    # a 20 second pause) the throughput clients.
    custom_experiment.start_chutney(
        lambda: custom_experiment.start_throughput_server(
            lambda: sleep_then_run(20, custom_experiment.start_throughput_clients)))
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
- #
|