relay_working_experiment.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #!/usr/bin/python3
  2. #
  3. import argparse
  4. import shutil
  5. import logging
  6. import random
  7. import os
  8. import multiprocessing
  9. import threading
  10. import time
  11. import json
  12. import gzip
  13. import pickle
  14. import tempfile
  15. #
  16. import stem.control
  17. import stem.descriptor.remote
  18. import stem.process
  19. #
  20. import numa
  21. import log_system_usage
  22. import chutney_manager
  23. import throughput_server
  24. import experiment_client
  25. import experiment
  26. import useful
  27. #
  28. class CustomExperiment(experiment.Experiment):
  29. def __init__(self, *args, **kwargs):
  30. super().__init__(*args, **kwargs)
  31. #
  32. self.chutney_path = '/home/sengler/code/measureme/chutney'
  33. self.tor_path = '/home/sengler/code/parallel/tor-single'
  34. #
  35. def configure_chutney(self):
  36. #self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
  37. # [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
  38. # [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
  39. # [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
  40. #
  41. new_tor_path = '/home/sengler/code/parallel/tor-parallel/src/app/tor'
  42. valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
  43. #
  44. self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
  45. [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
  46. [chutney_manager.Node(tag='target', tor=new_tor_path, valgrind_settings=valgrind_settings, relay=1, torrc='relay-non-exit.tmpl')] + \
  47. [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
  48. [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
  49. #
  50. self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
  51. # TODO: ^^ improve this
  52. #
  53. #
  54. def build_circuit_generator(consensus, server_address):
  55. fingerprints = [desc.nickname for desc in consensus]
  56. exit_fingerprints = [desc.nickname for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
  57. #
  58. target_fingerprint = [desc.nickname for desc in consensus if desc.nickname.endswith('target')][0]
  59. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set([target_fingerprint]))
  60. #
  61. assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
  62. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  63. #
  64. return lambda: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
  65. #
  66. if __name__ == '__main__':
  67. #
  68. logging.basicConfig(level=logging.DEBUG)
  69. logging.getLogger('stem').setLevel(logging.WARNING)
  70. #
  71. parser = argparse.ArgumentParser(description='Test the network throughput.')
  72. parser.add_argument('num_bytes', type=useful.parse_bytes,
  73. help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
  74. parser.add_argument('--buffer-len', type=useful.parse_bytes,
  75. help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
  76. parser.add_argument('--wait-range', type=int, default=0,
  77. help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
  78. args = parser.parse_args()
  79. #
  80. num_clients = 2
  81. num_guards = 2 # number of relays (including guards)
  82. num_authorities = 2 # will also act as a relay or guard
  83. num_exits = 3 # will be used only as an exit
  84. #
  85. experiment_time = time.time()
  86. #
  87. save_data_path = None
  88. measureme_log_path = None
  89. measureme = False
  90. #
  91. start_time = time.time()
  92. #
  93. num_streams_per_client = 1
  94. logging.info('Starting with {} streams per client'.format(num_streams_per_client))
  95. #
  96. experiment = CustomExperiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
  97. num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
  98. args.buffer_len, args.wait_range, measureme, test_network=False)
  99. #
  100. def sleep_then_run(duration, func):
  101. logging.info('Sleeping for {} seconds before running {}'.format(duration, func))
  102. time.sleep(duration)
  103. return func()
  104. #
  105. experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))
  106. #
  107. logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
  108. #