#!/usr/bin/python3
#
import argparse
import shutil
import logging
import random
import os
import multiprocessing
import threading
import time
import json
import gzip
import pickle
import tempfile
#
import stem.control
import stem.descriptor.remote
import stem.process
import numpy as np
#
import numa
import log_system_usage
import chutney_manager
import throughput_server
import experiment_client
import useful
#
class DummyEnterExit:
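    """No-op context manager: ``__enter__`` returns self and ``__exit__`` does
    nothing (presumably a stand-in where a ``with`` target is optional)."""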
    def __enter__(self):
        return self
    #
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
    #
#
class RepeatExperimentError(Exception):
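    """Raised when too few streams complete and the run should be repeated."""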
    pass
#
class Experiment:
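    """One throughput experiment against a local Chutney-managed Tor network.

    Configures the Chutney nodes on construction and, when ``save_data_path``
    is given, records the settings to ``experiment-settings.json``. The
    ``start_chutney``, ``start_throughput_server``, and ``start_system_logging``
    methods each take an optional ``next_action`` callback so they can be
    nested (network -> server -> system logging -> clients).
    """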
    def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
                 num_clients, num_guards, num_authorities, num_exits, circuit_generator_builder,
                 buffer_len=None, wait_range=None, measureme=False, test_network=True):
        self.save_data_path = save_data_path
        self.measureme_log_path = measureme_log_path
        self.num_bytes = num_bytes
        self.num_streams_per_client = num_streams_per_client
        self.num_clients = num_clients
        self.num_guards = num_guards
        self.num_authorities = num_authorities
        self.num_exits = num_exits
        self.circuit_generator_builder = circuit_generator_builder
        self.buffer_len = buffer_len
        self.wait_range = wait_range
        self.measureme = measureme
        self.test_network = test_network
        #
        self.chutney_path = '/home/sengler/code/measureme/chutney'
        self.tor_path = '/home/sengler/code/measureme/tor'
        self.server_address = ('127.0.0.1', 12353)
        #
        self.nodes = None
        self.proxy_control_ports = None
        #
        self.configure_chutney()
        #
        if save_data_path is not None:
            with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
                settings = {}
                settings['save_data_path'] = self.save_data_path
                settings['num_bytes'] = self.num_bytes
                settings['num_streams_per_client'] = self.num_streams_per_client
                settings['num_clients'] = self.num_clients
                settings['num_guards'] = self.num_guards
                settings['num_authorities'] = self.num_authorities
                settings['num_exits'] = self.num_exits
                settings['buffer_len'] = self.buffer_len
                settings['wait_range'] = self.wait_range
                settings['measureme'] = self.measureme
                settings['chutney_path'] = self.chutney_path
                settings['tor_path'] = self.tor_path
                settings['server_address'] = self.server_address
                #
                json.dump(settings, f)
            #
        #
    #
    def configure_chutney(self):
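        """Build the Chutney node list and pin each node to a NUMA node.

        Also records the control ports of the client nodes and, when saving
        data, pickles the NUMA assignments to ``numa_data.pickle.gz``.
        """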
        self.nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
                     [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
                     [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
                     [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(self.num_clients)]
        #
        for node in self.nodes:
            if self.measureme_log_path is not None:
                node.options['measureme_log_dir'] = self.measureme_log_path
            #
        #
        numa_remaining = numa.get_numa_overview()
        numa_sets = {}
        for (index, node) in enumerate(self.nodes):
            num_cpus = node.options['num_cpus']
            if num_cpus % 2 != 0:
                num_cpus += 1
            #
            (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
            node.options['numa_settings'] = (numa_node, processors)
            numa_sets[node.guess_nickname(index)] = (numa_node, processors)
        #
        #unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
        #
        #nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
        self.proxy_control_ports = [node.guess_control_port(index) for (index, node) in enumerate(self.nodes) if node.options.get('client') == 1]
        #
        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
                pickle.dump(numa_sets, f, protocol=4)
            #
        #
    #
    def start_chutney(self, next_action=None):
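        """Write the Chutney network file, start the network (up to five
        attempts), save the startup log and relay fingerprints, and run
        ``next_action`` (if any) while the network is up.
        """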
        #
        (fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
        try:
            with os.fdopen(fd, mode='w') as f:
                f.write(chutney_manager.create_chutney_config(self.nodes))
            #
            try:
                chutney_network = None
                num_attempts = 0
                start_time = time.time()
                while chutney_network is None:
                    try:
                        num_attempts += 1
                        verification_rounds = 1 if self.test_network else 0
                        chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file, verification_rounds=verification_rounds)
                    except KeyboardInterrupt:
                        raise
                    except:
                        logging.exception('The Chutney network failed to start (attempt {}). Trying again...'.format(num_attempts))
                        if num_attempts > 4:
                            logging.exception('Just kidding, we\'ve tried too many times.')
                            raise
                        #
                    #
                #
                num_lines_to_print = 50
                time_to_create_network = time.time()-start_time
                logging.debug('Chutney network started in {} seconds ({:.2f} minutes)'.format(round(time_to_create_network), time_to_create_network/60))
                logging.debug('Last '+str(num_lines_to_print)+' lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-num_lines_to_print:]))
                if self.save_data_path is not None:
                    with open(os.path.join(self.save_data_path, 'chutney-startup.log'), 'w') as f:
                        f.write(chutney_network.startup_output)
                    #
                #
                #with chutney_network as net:
                with chutney_network:
                    nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
                    fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path) for nick in nicknames}
                    #
                    if self.save_data_path is not None:
                        with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
                            pickle.dump(fingerprints, f, protocol=4)
                        #
                    #
                    if next_action is not None:
                        next_action()
                    #
                #
            finally:
                if self.measureme_log_path is not None:
                    for f in os.listdir(self.measureme_log_path):
                        shutil.move(os.path.join(self.measureme_log_path, f), os.path.join(self.save_data_path, f))
                    #
                    shutil.rmtree(self.measureme_log_path)
                #
            #
        finally:
            if self.save_data_path is not None:
                shutil.copyfile(tmp_network_file, os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))
            #
            os.remove(tmp_network_file)
        #
    #
    def start_throughput_server(self, next_action=None):
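        """Run a ThroughputServer in a separate process while ``next_action``
        (if any) executes; then collect the per-stream results, raise
        RepeatExperimentError if fewer than 95% of the expected streams
        completed, save the results, and log summary statistics.
        """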
        stop_event = multiprocessing.Event()
        server = throughput_server.ThroughputServer(self.server_address, stop_event)
        def server_run_wrapper():
            try:
                server.run()
            except KeyboardInterrupt:
                logging.info('Stopping server (KeyboardInterrupt)')
            #
        #
        p = multiprocessing.Process(target=server_run_wrapper)
        p.start()
        #
        try:
            if next_action is not None:
                next_action()
            #
        finally:
            stop_event.set()
        #
        p.join()
        #
        results = [x['results'] for x in server.results]
        results_brief = []
        #
        for r in results:
            to_add = {}
            to_add['first_byte'] = r['deltas']['timestamps'][0]
            to_add['last_byte'] = r['deltas']['timestamps'][-1]
            to_add['data_size'] = r['data_size']
            to_add['measured_data_size'] = int(np.sum(r['deltas']['bytes']))
            to_add['custom_data'] = json.loads(r['custom_data'].decode('utf-8'))
            to_add['time_started_push'] = r['time_started_push']
            results_brief.append(to_add)
        #
        num_expected_results = len(self.proxy_control_ports)*self.num_streams_per_client
        #
        threshold = 0.95
        if len(results)/num_expected_results < threshold:
            logging.warning('Less than {}% of streams completed: {}/{}'.format(round(threshold*100), len(results), num_expected_results))
            raise RepeatExperimentError
        #
        if self.save_data_path is not None:
            logging.info('Starting to save server results...')
            with open(os.path.join(self.save_data_path, 'server_results_brief.json'), 'w') as f:
                json.dump(results_brief, f)
            #
            with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
                pickle.dump(results, f, protocol=4)
            #
        #
        if len(results) > 0:
            avg_data_size = sum([x['data_size'] for x in results])/len(results)
            avg_transfer_rate = sum([x['transfer_rate'] for x in results])/len(results)
            time_of_first_byte = min([x['time_of_first_byte'] for x in results])
            time_of_last_byte = max([x['time_of_last_byte'] for x in results])
            total_transfer_rate = sum([x['data_size'] for x in results])/(time_of_last_byte-time_of_first_byte)
            #
            logging.info('Group size: %d/%d', len(results), num_expected_results)
            logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
            logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
            logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
        #
    #
    def start_system_logging(self, next_action=None):
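        """Log CPU usage to ``cpu_stats.pickle.gz`` in a separate process while
        ``next_action`` (if any) executes.
        """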
        stop_cpu_logging_event = multiprocessing.Event()
        p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
                                    args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'), 0.5, [], stop_cpu_logging_event))
        p.start()
        #
        try:
            if next_action is not None:
                next_action()
            #
        finally:
            stop_cpu_logging_event.set()
        #
        p.join()
    #
    def start_throughput_clients(self):
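        """Fetch the consensus (waiting until all relays appear), build a
        circuit generator from it, connect a controller to each client proxy,
        build one circuit per stream, and run all client protocols to
        completion.
        """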
        circuit_generator = None
        consensus_attempts_remaining = 10
        num_expecting_relays = len([node for node in self.nodes if node.options.get('client') != 1])
        while circuit_generator is None and consensus_attempts_remaining > 0:
            logging.debug('Getting consensus')
            try:
                consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 10000),)).run()
            except Exception as e:
                raise Exception('Unable to retrieve the consensus') from e
            #
            num_relays = len([1 for desc in consensus])
            logging.info('Got consensus with {}/{} descriptors'.format(num_relays, num_expecting_relays))
            #
            if num_relays != num_expecting_relays:
                logging.info('Not enough descriptors, trying again in 20 seconds...')
                time.sleep(20)
            else:
                try:
                    circuit_generator = self.circuit_generator_builder(consensus, self.server_address)
                except AssertionError:
                    logging.exception('Problem with the consensus, trying again in 10 seconds...')
                    time.sleep(10)
                #
            #
            consensus_attempts_remaining -= 1
        #
        assert circuit_generator is not None, 'Could not build the circuit generator'
        #
        proxy_addresses = []
        for control_port in self.proxy_control_ports:
            proxy = {}
            proxy['control'] = ('127.0.0.1', control_port)
            proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
            proxy_addresses.append(proxy)
        #
        controllers = []
        protocol_manager = experiment_client.ExperimentProtocolManager()
        #
        client_info = {}
        client_info['clients'] = []
        #
        circuit_counter = 0
        try:
            for proxy_address in proxy_addresses:
                controller = experiment_client.ExperimentController(proxy_address['control'])
                controller.connect()
                # the controller has to attach new streams to circuits, so the
                # connection has to stay open until we're done creating streams
                #
                for _ in range(self.num_streams_per_client):
                    # make a circuit for each stream
                    controller.build_circuit(circuit_generator, circuit_counter)
                    circuit_counter += 1
                    #time.sleep(0.05)
                #
                controllers.append(controller)
            #
            start_event = multiprocessing.Event()
            #
            #used_measureme_ids = set()
            measureme_id_counter = 1
            for stream_index in range(self.num_streams_per_client):
                for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
                    #if self.measureme:
                    #    measureme_id = stream_index*len(controllers) + controller_index + 1
                    #    assert len(set([measureme_id]) & used_measureme_ids) == 0, 'Sanity check: Attempting to use a previously-used measureme_id'
                    #    used_measureme_ids |= set([measureme_id])
                    #else:
                    #    measureme_id = None
                    measureme_id = measureme_id_counter
                    measureme_id_counter += 1
                    #
                    wait_duration = random.randint(0, self.wait_range)
                    protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'],
                                                                       proxy_address['control'], controller, start_event, self.measureme,
                                                                       wait_duration=wait_duration, measureme_id=measureme_id,
                                                                       num_bytes=self.num_bytes, buffer_len=self.buffer_len)
                    protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
                    #
                    client_info['clients'].append({'measureme_id': measureme_id, 'wait_duration': wait_duration})
                #
            #
            time.sleep(2)
            client_info['start_time'] = time.time()
            start_event.set()
            #
            # unfortunately mixing threads and processes can cause python to deadlock, and some client protocols
            # have been found to deadlock at about 1/1000 probability, so we must provide a timeout to kill
            # these deadlocked protocol processes
            # see: https://codewithoutrules.com/2018/09/04/python-multiprocessing/
            protocol_manager.wait(finished_protocol_cb=lambda protocol_id, had_error: logging.info('Finished {} (had_error={})'.format(protocol_id, had_error)), kill_timeout=20*60)
            logging.debug('Client protocols have finished')
        finally:
            for controller in controllers:
                controller.disconnect()
            #
            logging.debug('Protocol manager stopping...')
            protocol_manager.stop()
            logging.debug('Protocol manager has finished')
        #
        if self.save_data_path is not None:
            with gzip.GzipFile(os.path.join(self.save_data_path, 'client_info.pickle.gz'), 'wb') as f:
                pickle.dump(client_info, f, protocol=4)
            #
        #
    #
#
def wait_for_keyboard_interrupt():
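    """Block until the user presses Ctrl-C."""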
    try:
        logging.info('Press Ctrl-C to stop.')
        while True:
            time.sleep(30)
        #
    except KeyboardInterrupt:
        print('')
    #
#
def build_circuit_generator(consensus, server_address):
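    """Return a function that picks a two-hop circuit: a random non-exit relay
    followed by the single exit relay found in the consensus.
    """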
    fingerprints = experiment_client.get_fingerprints(consensus)
    exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
    non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
    #
    assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
    assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
    #
    return lambda gen_id=None: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
#
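# For example, with non-exit relays {A, B, C} and exit E in the consensus, the
# generator above returns paths such as [B, E] or [C, E]; ``gen_id`` is accepted
# but unused here.
#
# The block below is an earlier single-run version of __main__, kept as a
# triple-quoted string (presumably for reference); the active entry point
# follows it.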
'''
if __name__ == '__main__':
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    experiment_time = time.time()
    #
    if args.debugging != 'only-chutney':
        save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(save_data_path)
    else:
        save_data_path = None
    #
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
        os.mkdir(measureme_log_path)
    #
    experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, args.num_streams_per_client,
                            args.buffer_len, args.wait_range, args.measureme)
    #
    start_time = time.time()
    #
    if args.debugging == 'no-chutney':
        experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
    elif args.debugging == 'only-chutney':
        experiment.start_chutney(wait_for_keyboard_interrupt)
    else:
        experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
#
'''
if __name__ == '__main__':
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    num_clients = 4
    num_guards = 6
    num_authorities = 2  # the authorities also act as guards
    num_exits = 1
    #
    experiment_time = time.time()
    #
    if args.debugging != 'only-chutney':
        base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(base_save_data_path)
    else:
        base_save_data_path = None
    #
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
    #
    start_time = time.time()
    all_data_paths = []
    #
    #for num_streams_per_client in [1, 2, 4, 6, 8, 10, 12]:
    #for num_streams_per_client in [6,7,8]:
    for num_streams_per_client in [1, 2, 3, 4, 5]:
        logging.info('Starting with {} streams per client'.format(num_streams_per_client))
        save_data_path = None
        #
        if base_save_data_path is not None:
            save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
            all_data_paths.append(save_data_path)
            os.mkdir(save_data_path)
        #
        if measureme_log_path is not None:
            os.mkdir(measureme_log_path)
        #
        experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
                                num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
                                args.buffer_len, args.wait_range, args.measureme)
        #
        if args.debugging == 'no-chutney':
            experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
        elif args.debugging == 'only-chutney':
            experiment.start_chutney(wait_for_keyboard_interrupt)
        else:
            experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
        #
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
    #
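    # Post-processing: parse the measureme logs collected from each relay into
    # per-stream records and save them alongside the other experiment data.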
    import parse_measureme_logs
    for path in all_data_paths:
        logging.info('Parsing logs for {}'.format(path))
        measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]
        #
        logs = []
        for name in measureme_tor_logs:
            with open(name, 'r') as f:
                logs.append(parse_measureme_logs.read_log(f))
            #
        #
        streams = parse_measureme_logs.get_streams_from_logs(logs)
        #
        with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
            pickle.dump(streams, f, protocol=4)
        #
    #
#