  1. #!/usr/bin/python3
  2. #
  3. import argparse
  4. import shutil
  5. import logging
  6. import random
  7. import os
  8. import multiprocessing
  9. import threading
  10. import time
  11. import json
  12. import gzip
  13. import pickle
  14. import tempfile
  15. #
  16. import stem.control
  17. import stem.descriptor.remote
  18. import stem.process
  19. #
  20. import numa
  21. import log_system_usage
  22. import chutney_manager
  23. import throughput_server
  24. import experiment_client
  25. import useful
  26. #
  27. class DummyEnterExit:
  28. def __enter__(self):
  29. return self
  30. #
  31. def __exit__(self, exc_type, exc_val, exc_tb):
  32. pass
  33. #
  34. #
def asdasd():
    """Run one throughput experiment against an already-running Tor test network.

    Starts the local throughput server in a child process, samples CPU usage in
    a background thread, fetches the consensus from the local directory port,
    builds one two-hop circuit per client stream, runs all client protocols in
    lockstep, and finally pickles the server's per-connection results to disk.

    NOTE(review): placeholder name — presumably meant to be renamed to
    something like run_experiment(); both call sites in __main__ would need
    updating too.

    NOTE(review): reads module-level globals assigned in the __main__ block:
    save_data_path, proxy_control_ports, and args. With --chutney-only and
    --no-chutney passed together, save_data_path is never assigned and this
    function would raise NameError — confirm that flag combination is invalid.
    """
    # Address of the local throughput server the Tor clients connect to.
    server_address = ('127.0.0.1', 12353)
    #
    stop_event = multiprocessing.Event()
    server = throughput_server.ThroughputServer(server_address, stop_event)
    p = multiprocessing.Process(target=server.run)
    p.start()
    #
    try:
        # Sample CPU statistics every 0.5 s for the duration of the experiment.
        stop_cpu_logging_event = threading.Event()
        t = threading.Thread(target=log_system_usage.log_cpu_stats,
            args=(os.path.join(save_data_path, 'cpu_stats.pickle.gz'), 0.5, stop_cpu_logging_event))
        t.start()
        #
        try:
            logging.debug('Getting consensus')
            try:
                # DirPort 7000 is presumably the first chutney authority — confirm
                # against the generated chutney network config.
                consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
            except Exception as e:
                raise Exception('Unable to retrieve the consensus') from e
            #
            fingerprints = experiment_client.get_fingerprints(consensus)
            exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
            non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
            #
            # The experiment design requires a single exit so every stream leaves
            # the network through the same relay.
            assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
            assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
            #
            # Two-hop circuit: one random non-exit relay plus the fixed exit.
            circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
            #
            # One (control, socks) address pair per Tor client proxy.
            proxy_addresses = []
            for control_port in proxy_control_ports:
                proxy = {}
                proxy['control'] = ('127.0.0.1', control_port)
                proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
                proxy_addresses.append(proxy)
            #
            controllers = []
            protocol_manager = experiment_client.ExperimentProtocolManager()
            #
            try:
                for proxy_address in proxy_addresses:
                    controller = experiment_client.ExperimentController(proxy_address['control'])
                    controller.connect()
                    # the controller has to attach new streams to circuits, so the
                    # connection has to stay open until we're done creating streams
                    #
                    for _ in range(args.num_streams_per_client):
                        # make a circuit for each stream
                        controller.build_circuit(circuit_generator)
                        time.sleep(0.5)
                    #
                    controllers.append(controller)
                #
                # All client protocols block on this event so transfers start together.
                start_event = multiprocessing.Event()
                #
                for stream_index in range(args.num_streams_per_client):
                    for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
                        if args.measureme:
                            # NOTE(review): the stride here is num_streams_per_client,
                            # not len(controllers) — ids can collide when the number of
                            # clients exceeds num_streams_per_client; confirm intended.
                            measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
                        else:
                            measureme_id = None
                        #
                        # Random per-connection start offset (0 disables it).
                        wait_duration = random.randint(0, args.wait_range)
                        protocol = experiment_client.build_client_protocol(server_address, proxy_address['socks'],
                            proxy_address['control'], controller, start_event,
                            wait_duration=wait_duration, measureme_id=measureme_id,
                            num_bytes=args.num_bytes, buffer_len=args.buffer_len)
                        protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
                    #
                #
                # Give the protocols a moment to reach the barrier, then release them.
                time.sleep(2)
                start_event.set()
                #
                protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
            finally:
                # Controllers stay connected until all streams are attached; tear
                # them down even if the experiment failed part-way.
                for controller in controllers:
                    controller.disconnect()
                #
                protocol_manager.stop()
            #
        finally:
            # Always stop the CPU-stats logger thread.
            stop_cpu_logging_event.set()
            #
            t.join()
    finally:
        # Always shut down the throughput-server process.
        stop_event.set()
        #
        p.join()
    #
    # Persist the per-connection results collected by the server process.
    with gzip.GzipFile(os.path.join(save_data_path, 'server_results.pickle.gz'), 'wb') as f:
        pickle.dump([x['results'] for x in server.results], f, protocol=4)
  127. #
  128. #
if __name__ == '__main__':
    #
    logging.basicConfig(level=logging.DEBUG)
    # stem is very chatty at DEBUG; keep its logger at WARNING.
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--chutney-only', action='store_true', help='only start chutney')
    parser.add_argument('--no-chutney', action='store_true', help='don\'t start chutney')
    args = parser.parse_args()
    #
    # NOTE(review): hard-coded user-specific paths — consider CLI options or env vars.
    chutney_path = '/home/sengler/code/measureme/chutney'
    tor_path = '/home/sengler/code/measureme/tor'
    #
    # Results directory is only needed when an experiment actually runs.
    # NOTE(review): if --chutney-only and --no-chutney are both given, asdasd()
    # is called below without save_data_path being set (NameError) — the two
    # flags are presumably meant to be mutually exclusive; confirm.
    if not args.chutney_only:
        save_data_path = os.path.join('/home/sengler/data/experiments', str(int(time.time())))
        os.mkdir(save_data_path)
    if not args.no_chutney:
        # measureme logs are written to a ramdisk while the network runs and are
        # moved into save_data_path afterwards (see the finally block below).
        measureme_log_dir = os.path.join('/ramdisk/sengler/chutney', str(int(time.time())))
        os.mkdir(measureme_log_dir)
    #
    #
    # Test-network layout: 2 authorities, 2 non-exit relays, 1 exit, 2 clients.
    nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(2)] + \
            [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(2)] + \
            [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(1)] + \
            [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(2)]
    #
    # Assign each node a NUMA node and processor set.
    numa_remaining = numa.get_numa_overview()
    numa_sets = []
    for node in nodes:
        if not args.chutney_only and not args.no_chutney:
            node.options['measureme_log_dir'] = measureme_log_dir
        #
        # Round the CPU count up to even — presumably so a node always gets whole
        # physical cores (two hyperthreads each); confirm against numa_scheduler.
        num_cpus = node.options['num_cpus']
        if num_cpus%2 != 0:
            num_cpus += 1
        #
        (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
        node.options['numa_settings'] = (numa_node, processors)
        numa_sets.append((numa_node, processors))
    #
    # Processors left over after scheduling (currently informational only).
    unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
    #
    nicknames = [nodes[x].guess_nickname(x) for x in range(len(nodes))]
    # Control ports of the client nodes only; consumed by asdasd() as a global.
    proxy_control_ports = [nodes[x].guess_control_port(x) for x in range(len(nodes)) if ('client', 1) in nodes[x].options.items()]
    #
    # Write the generated chutney network description to a temporary file.
    (fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
    try:
        with os.fdopen(fd, mode='w') as f:
            f.write(chutney_manager.create_chutney_config(nodes))
        #
        if args.no_chutney:
            # Network is assumed to be running already; just run the experiment.
            asdasd()
        else:
            try:
                with chutney_manager.ChutneyNetwork(chutney_path, tor_path, tmp_network_file) as net:
                    if args.chutney_only:
                        # Keep the network up until the user interrupts.
                        try:
                            logging.info('Press Ctrl-C to stop.')
                            while True:
                                time.sleep(30)
                            #
                        except KeyboardInterrupt:
                            print('')
                        #
                    else:
                        fingerprints = []
                        for nick in nicknames:
                            fingerprints.append(chutney_manager.read_fingerprint(nick, chutney_path))
                        #
                        asdasd()
                    #
                #
            finally:
                # Move measureme logs off the ramdisk into the results directory.
                # NOTE(review): loop variable f shadows the file handle opened above.
                if not args.chutney_only:
                    for f in os.listdir(measureme_log_dir):
                        shutil.move(os.path.join(measureme_log_dir, f), os.path.join(save_data_path, f))
                    #
                    shutil.rmtree(measureme_log_dir)
                #
            #
        #
    finally:
        # Remove the temporary chutney network file in all cases.
        os.remove(tmp_network_file)
    #
    #