#!/usr/bin/python3
#
import argparse
import shutil
import logging
import random
import os
import multiprocessing
import threading
import time
import json
import gzip
import pickle
import tempfile
#
import stem.control
import stem.descriptor.remote
import stem.process
#
import numa
import log_system_usage
import chutney_manager
import throughput_server
import experiment_client
import useful
#
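# No-op context manager: __enter__ returns self and __exit__ does nothing.
# It is not used elsewhere in this file; presumably it serves as a drop-in
# replacement where an optional 'with' target is needed.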
class DummyEnterExit:
	def __enter__(self):
		return self
	#
	def __exit__(self, exc_type, exc_val, exc_tb):
		pass
	#
#
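# Experiment encapsulates one throughput-measurement run: it builds a chutney
# network description (authorities, guards, exits, and clients) and provides
# methods to start the chutney network, the throughput server, system-usage
# logging, and the client protocols. Each start_* method accepts a next_action
# callback so the stages can be nested from the entry point at the bottom of
# this file.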
class Experiment:
	def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
			num_clients, num_guards, num_authorities, num_exits, circuit_generator_builder,
			buffer_len=None, wait_range=None, measureme=False, test_network=True):
		self.save_data_path = save_data_path
		self.measureme_log_path = measureme_log_path
		self.num_bytes = num_bytes
		self.num_streams_per_client = num_streams_per_client
		self.num_clients = num_clients
		self.num_guards = num_guards
		self.num_authorities = num_authorities
		self.num_exits = num_exits
		self.circuit_generator_builder = circuit_generator_builder
		self.buffer_len = buffer_len
		self.wait_range = wait_range
		self.measureme = measureme
		self.test_network = test_network
		#
		self.chutney_path = '/home/sengler/code/measureme/chutney'
		self.tor_path = '/home/sengler/code/measureme/tor'
		self.server_address = ('127.0.0.1', 12353)
		#
		self.nodes = None
		self.proxy_control_ports = None
		#
		self.configure_chutney()
		#
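		# Record the experiment parameters alongside the collected data so each
		# run is self-describing.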
		if save_data_path is not None:
			with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
				settings = {}
				settings['save_data_path'] = self.save_data_path
				settings['num_bytes'] = self.num_bytes
				settings['num_streams_per_client'] = self.num_streams_per_client
				settings['num_clients'] = self.num_clients
				settings['num_guards'] = self.num_guards
				settings['num_authorities'] = self.num_authorities
				settings['num_exits'] = self.num_exits
				settings['buffer_len'] = self.buffer_len
				settings['wait_range'] = self.wait_range
				settings['measureme'] = self.measureme
				settings['chutney_path'] = self.chutney_path
				settings['tor_path'] = self.tor_path
				settings['server_address'] = self.server_address
				#
				json.dump(settings, f)
			#
		#
	#
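	# Build the list of chutney nodes and assign each one to a NUMA node. The
	# numa_scheduler call is assumed to reserve 'num_cpus' processors (rounded
	# up to an even number, presumably to keep hyperthread pairs together) from
	# the remaining NUMA capacity. The control ports of the client nodes are
	# saved for use by start_throughput_clients().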
	def configure_chutney(self):
		self.nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
				[chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
				[chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
				[chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(self.num_clients)]
		#
		for node in self.nodes:
			if self.measureme_log_path is not None:
				node.options['measureme_log_dir'] = self.measureme_log_path
			#
		#
		numa_remaining = numa.get_numa_overview()
		numa_sets = {}
		for (index, node) in enumerate(self.nodes):
			num_cpus = node.options['num_cpus']
			if num_cpus%2 != 0:
				num_cpus += 1
			#
			(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
			node.options['numa_settings'] = (numa_node, processors)
			numa_sets[node.guess_nickname(index)] = (numa_node, processors)
		#
		#unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
		#
		#nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
		self.proxy_control_ports = [node.guess_control_port(index) for (index, node) in enumerate(self.nodes) if node.options.get('client') == 1]
		#
		if self.save_data_path is not None:
			with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
				pickle.dump(numa_sets, f, protocol=4)
			#
		#
	#
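	# Write the chutney network description to a temporary file and start the
	# network, retrying until it comes up. Once it is running, the relay
	# fingerprints are saved and next_action() is run inside the network's
	# context manager. Any measureme logs written by the tor processes are moved
	# into save_data_path afterwards, and the temporary network file is copied
	# there as well before being removed.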
	def start_chutney(self, next_action=None):
		#
		(fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
		try:
			with os.fdopen(fd, mode='w') as f:
				f.write(chutney_manager.create_chutney_config(self.nodes))
			#
			try:
				chutney_network = None
				num_attempts = 0
				while chutney_network is None:
					try:
						num_attempts += 1
						verification_rounds = 1 if self.test_network else 0
						chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file, verification_rounds=verification_rounds)
					except KeyboardInterrupt:
						raise
					except:
						logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))
					#
				#
				logging.debug('Last 40 lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-40:]))
				#with chutney_network as net:
				with chutney_network:
					nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
					fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path) for nick in nicknames}
					#
					if self.save_data_path is not None:
						with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
							pickle.dump(fingerprints, f, protocol=4)
						#
					#
					if next_action is not None:
						next_action()
					#
				#
			finally:
				if self.measureme_log_path is not None:
					for f in os.listdir(self.measureme_log_path):
						shutil.move(os.path.join(self.measureme_log_path, f), os.path.join(self.save_data_path, f))
					#
					shutil.rmtree(self.measureme_log_path)
				#
			#
		finally:
			if self.save_data_path is not None:
				shutil.copyfile(tmp_network_file, os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))
			#
			os.remove(tmp_network_file)
		#
	#
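	# Run the ThroughputServer in a child process while next_action() executes,
	# then signal it to stop and save the per-connection results. Note that
	# server.results is read from the parent process, so it is assumed to be a
	# multiprocessing-aware (shared) structure.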
	def start_throughput_server(self, next_action=None):
		stop_event = multiprocessing.Event()
		server = throughput_server.ThroughputServer(self.server_address, stop_event)
		def server_run_wrapper():
			try:
				server.run()
			except KeyboardInterrupt:
				logging.info('Stopping server (KeyboardInterrupt)')
			#
		#
		p = multiprocessing.Process(target=server_run_wrapper)
		p.start()
		#
		try:
			if next_action is not None:
				next_action()
			#
		finally:
			stop_event.set()
		#
		p.join()
		#
		if self.save_data_path is not None:
			with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
				pickle.dump([x['results'] for x in server.results], f, protocol=4)
			#
		#
	#
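	# Sample system CPU usage in a child process for the duration of
	# next_action(), writing the samples to cpu_stats.pickle.gz (every 0.1 s,
	# assuming the second argument of log_cpu_stats is the sampling interval).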
	def start_system_logging(self, next_action=None):
		stop_cpu_logging_event = multiprocessing.Event()
		p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
				args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'), 0.1, stop_cpu_logging_event))
		p.start()
		#
		try:
			if next_action is not None:
				next_action()
			#
		finally:
			stop_cpu_logging_event.set()
		#
		p.join()
	#
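	# Fetch the test network's consensus from the local directory port
	# (127.0.0.1:7000, presumably one of the chutney authorities), build one
	# circuit per stream on every client via its control port, and then launch
	# all client protocols, released together by a shared start event.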
	def start_throughput_clients(self):
		logging.debug('Getting consensus')
		try:
			consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
		except Exception as e:
			raise Exception('Unable to retrieve the consensus') from e
		#
		circuit_generator = self.circuit_generator_builder(consensus, self.server_address)
		#
		proxy_addresses = []
		for control_port in self.proxy_control_ports:
			proxy = {}
			proxy['control'] = ('127.0.0.1', control_port)
			proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
			proxy_addresses.append(proxy)
		#
		controllers = []
		protocol_manager = experiment_client.ExperimentProtocolManager()
		#
		try:
			for proxy_address in proxy_addresses:
				controller = experiment_client.ExperimentController(proxy_address['control'])
				controller.connect()
				# the controller has to attach new streams to circuits, so the
				# connection has to stay open until we're done creating streams
				#
				for _ in range(self.num_streams_per_client):
					# make a circuit for each stream
					controller.build_circuit(circuit_generator)
					time.sleep(0.5)
				#
				controllers.append(controller)
			#
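			# Launch the streams interleaved across clients (stream 0 on every
			# client, then stream 1, ...). Each protocol gets a random
			# wait_duration of up to wait_range and, if enabled, a unique
			# measureme id; the shared start_event releases them roughly
			# simultaneously.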
			start_event = multiprocessing.Event()
			#
			used_measureme_ids = set()
			for stream_index in range(self.num_streams_per_client):
				for (controller_index, (proxy_address, controller)) in enumerate(zip(proxy_addresses, controllers)):
					if self.measureme:
						measureme_id = stream_index*len(controllers) + controller_index + 1
						assert measureme_id not in used_measureme_ids, 'Sanity check: Attempting to use a previously-used measureme_id'
						used_measureme_ids.add(measureme_id)
					else:
						measureme_id = None
					#
					wait_duration = random.randint(0, self.wait_range)
					protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'],
							proxy_address['control'], controller, start_event,
							wait_duration=wait_duration, measureme_id=measureme_id,
							num_bytes=self.num_bytes, buffer_len=self.buffer_len)
					protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
				#
			#
			time.sleep(2)
			start_event.set()
			#
			protocol_manager.wait(finished_protocol_cb=lambda protocol_id, had_error: logging.info('Finished {} (had_error={})'.format(protocol_id, had_error)))
		finally:
			for controller in controllers:
				controller.disconnect()
			#
			protocol_manager.stop()
		#
	#
#
def wait_for_keyboard_interrupt():
	try:
		logging.info('Press Ctrl-C to stop.')
		while True:
			time.sleep(30)
		#
	except KeyboardInterrupt:
		print('')
	#
#
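# Return a function that generates two-hop circuits: a randomly chosen non-exit
# relay followed by the single exit relay that can reach the server address.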
def build_circuit_generator(consensus, server_address):
	fingerprints = experiment_client.get_fingerprints(consensus)
	exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
	#
	assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
	#
	return lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
#
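# The block below appears to be an earlier, commented-out version of the entry
# point (a single run, no sweep over streams-per-client); it is kept for
# reference and is never executed.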
'''
if __name__ == '__main__':
	#
	logging.basicConfig(level=logging.DEBUG)
	logging.getLogger('stem').setLevel(logging.WARNING)
	#
	parser = argparse.ArgumentParser(description='Test the network throughput.')
	parser.add_argument('num_bytes', type=useful.parse_bytes,
			help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
	parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
	parser.add_argument('--buffer-len', type=useful.parse_bytes,
			help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
	parser.add_argument('--wait-range', type=int, default=0,
			help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
	parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
	args = parser.parse_args()
	#
	experiment_time = time.time()
	#
	if args.debugging != 'only-chutney':
		save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
		os.mkdir(save_data_path)
	else:
		save_data_path = None
	#
	if args.debugging is not None:
		measureme_log_path = None
	else:
		measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
		os.mkdir(measureme_log_path)
	#
	experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, args.num_streams_per_client,
			args.buffer_len, args.wait_range, args.measureme)
	#
	start_time = time.time()
	#
	if args.debugging == 'no-chutney':
		experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
	elif args.debugging == 'only-chutney':
		experiment.start_chutney(wait_for_keyboard_interrupt)
	else:
		experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
	#
	logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
	#
'''
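# Entry point: parse the command-line arguments, then sweep over several
# streams-per-client settings. Each iteration builds a fresh Experiment (and,
# unless --debugging is used, a fresh chutney network) and stores its results
# under its own streams-NNNN directory. The measureme logs are parsed into
# per-stream data at the end.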
if __name__ == '__main__':
	#
	logging.basicConfig(level=logging.DEBUG)
	logging.getLogger('stem').setLevel(logging.WARNING)
	#
	parser = argparse.ArgumentParser(description='Test the network throughput.')
	parser.add_argument('num_bytes', type=useful.parse_bytes,
			help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
	parser.add_argument('--buffer-len', type=useful.parse_bytes,
			help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
	parser.add_argument('--wait-range', type=int, default=0,
			help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
	parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
	args = parser.parse_args()
	#
	num_clients = 4
	num_guards = 6
	num_authorities = 2 # the authorities also act as guards
	num_exits = 1
	#
	experiment_time = time.time()
	#
	if args.debugging != 'only-chutney':
		base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
		os.mkdir(base_save_data_path)
	else:
		base_save_data_path = None
	#
	if args.debugging is not None:
		measureme_log_path = None
	else:
		measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
	#
	start_time = time.time()
	all_data_paths = []
	#
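	# One full experiment per streams-per-client value; each run's directory
	# name encodes the total number of streams (streams per client x clients).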
	#for num_streams_per_client in [1, 2, 4, 6, 8, 10, 12]:
	#for num_streams_per_client in [6,7,8]:
	for num_streams_per_client in [1, 2, 3, 4, 5]:
		logging.info('Starting with {} streams per client'.format(num_streams_per_client))
		save_data_path = None
		#
		if base_save_data_path is not None:
			save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
			all_data_paths.append(save_data_path)
			os.mkdir(save_data_path)
		#
		if measureme_log_path is not None:
			os.mkdir(measureme_log_path)
		#
		experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
				num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
				args.buffer_len, args.wait_range, args.measureme)
		#
		if args.debugging == 'no-chutney':
			experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
		elif args.debugging == 'only-chutney':
			experiment.start_chutney(wait_for_keyboard_interrupt)
		else:
			experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
		#
	#
	logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
	#
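	# After all runs have finished, convert the raw measureme logs from each
	# run's directory into per-stream records and store them compressed.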
	import parse_measureme_logs
	for path in all_data_paths:
		logging.info('Parsing logs for {}'.format(path))
		measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]
		#
		logs = []
		for name in measureme_tor_logs:
			with open(name, 'r') as f:
				logs.append(parse_measureme_logs.read_log(f))
			#
		#
		streams = parse_measureme_logs.get_streams_from_logs(logs)
		#
		with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
			pickle.dump(streams, f, protocol=4)
		#
	#
#