# experiment.py
  1. #!/usr/bin/python3
  2. #
  3. import argparse
  4. import shutil
  5. import logging
  6. import random
  7. import os
  8. import multiprocessing
  9. import threading
  10. import time
  11. import json
  12. import gzip
  13. import pickle
  14. import tempfile
  15. #
  16. import stem.control
  17. import stem.descriptor.remote
  18. import stem.process
  19. #
  20. import numa
  21. import log_system_usage
  22. import chutney_manager
  23. import throughput_server
  24. import experiment_client
  25. import useful
  26. #
  27. class DummyEnterExit:
  28. def __enter__(self):
  29. return self
  30. #
  31. def __exit__(self, exc_type, exc_val, exc_tb):
  32. pass
  33. #
  34. #
  35. class Experiment:
  36. def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
  37. num_clients, num_guards, num_authorities, num_exits,
  38. buffer_len=None, wait_range=None, measureme=False):
  39. self.save_data_path = save_data_path
  40. self.measureme_log_path = measureme_log_path
  41. self.num_bytes = num_bytes
  42. self.num_streams_per_client = num_streams_per_client
  43. self.num_clients = num_clients
  44. self.num_guards = num_guards
  45. self.num_authorities = num_authorities
  46. self.num_exits = num_exits
  47. self.buffer_len = buffer_len
  48. self.wait_range = wait_range
  49. self.measureme = measureme
  50. #
  51. self.chutney_path = '/home/sengler/code/measureme/chutney'
  52. self.tor_path = '/home/sengler/code/measureme/tor'
  53. self.server_address = ('127.0.0.1', 12353)
  54. #
  55. self.nodes = None
  56. self.proxy_control_ports = None
  57. #
  58. self.configure_chutney()
  59. #
  60. if save_data_path is not None:
  61. with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
  62. settings = {}
  63. settings['save_data_path'] = self.save_data_path
  64. settings['num_bytes'] = self.num_bytes
  65. settings['num_streams_per_client'] = self.num_streams_per_client
  66. settings['num_clients'] = self.num_clients
  67. settings['num_guards'] = self.num_guards
  68. settings['num_authorities'] = self.num_authorities
  69. settings['num_exits'] = self.num_exits
  70. settings['buffer_len'] = self.buffer_len
  71. settings['wait_range'] = self.wait_range
  72. settings['measureme'] = self.measureme
  73. settings['chutney_path'] = self.chutney_path
  74. settings['tor_path'] = self.tor_path
  75. settings['server_address'] = self.server_address
  76. #
  77. json.dump(settings, f)
  78. #
  79. #
  80. #
  81. def configure_chutney(self):
  82. self.nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
  83. [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
  84. [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
  85. [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(self.num_clients)]
  86. #
  87. for node in self.nodes:
  88. if self.measureme_log_path is not None:
  89. node.options['measureme_log_dir'] = measureme_log_path
  90. #
  91. #
  92. numa_remaining = numa.get_numa_overview()
  93. numa_sets = {}
  94. for (node, index) in zip(self.nodes, range(len(self.nodes))):
  95. num_cpus = node.options['num_cpus']
  96. if num_cpus%2 != 0:
  97. num_cpus += 1
  98. #
  99. (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
  100. node.options['numa_settings'] = (numa_node, processors)
  101. numa_sets[node.guess_nickname(index)] = (numa_node, processors)
  102. #
  103. #unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
  104. #
  105. #nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
  106. self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
  107. # TODO: ^^ improve this
  108. #
  109. if self.save_data_path is not None:
  110. with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
  111. pickle.dump(numa_sets, f, protocol=4)
  112. #
  113. #
  114. #
  115. def start_chutney(self, next_action=None):
  116. #
  117. (fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
  118. try:
  119. with os.fdopen(fd, mode='w') as f:
  120. f.write(chutney_manager.create_chutney_config(self.nodes))
  121. #
  122. try:
  123. chutney_network = None
  124. num_attemtps = 0
  125. while chutney_network is None:
  126. try:
  127. num_attemtps += 1
  128. chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file)
  129. except KeyboardInterrupt:
  130. raise
  131. except:
  132. logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))
  133. #
  134. #
  135. #with chutney_network as net:
  136. with chutney_network:
  137. nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
  138. fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path) for nick in nicknames}
  139. #
  140. if self.save_data_path is not None:
  141. with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
  142. pickle.dump(fingerprints, f, protocol=4)
  143. #
  144. #
  145. if next_action is not None:
  146. next_action()
  147. #
  148. #
  149. finally:
  150. if self.measureme_log_path is not None:
  151. for f in os.listdir(self.measureme_log_path):
  152. shutil.move(os.path.join(self.measureme_log_path, f), os.path.join(self.save_data_path, f))
  153. #
  154. shutil.rmtree(self.measureme_log_path)
  155. #
  156. #
  157. finally:
  158. if self.save_data_path is not None:
  159. shutil.copyfile(tmp_network_file, os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))
  160. #
  161. os.remove(tmp_network_file)
  162. #
  163. #
  164. def start_throughput_server(self, next_action=None):
  165. stop_event = multiprocessing.Event()
  166. server = throughput_server.ThroughputServer(self.server_address, stop_event)
  167. p = multiprocessing.Process(target=server.run)
  168. p.start()
  169. #
  170. try:
  171. if next_action is not None:
  172. next_action()
  173. #
  174. finally:
  175. stop_event.set()
  176. #
  177. p.join()
  178. #
  179. with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
  180. pickle.dump([x['results'] for x in server.results], f, protocol=4)
  181. #
  182. #
  183. def start_system_logging(self, next_action=None):
  184. stop_cpu_logging_event = multiprocessing.Event()
  185. p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
  186. args=(os.path.join(save_data_path, 'cpu_stats.pickle.gz'), 0.1, stop_cpu_logging_event))
  187. p.start()
  188. #
  189. try:
  190. if next_action is not None:
  191. next_action()
  192. #
  193. finally:
  194. stop_cpu_logging_event.set()
  195. #
  196. p.join()
  197. #
  198. def start_throughput_clients(self):
  199. logging.debug('Getting consensus')
  200. try:
  201. consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
  202. except Exception as e:
  203. raise Exception('Unable to retrieve the consensus') from e
  204. #
  205. fingerprints = experiment_client.get_fingerprints(consensus)
  206. exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, self.server_address)
  207. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
  208. #
  209. assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
  210. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  211. #
  212. circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
  213. #
  214. proxy_addresses = []
  215. for control_port in self.proxy_control_ports:
  216. proxy = {}
  217. proxy['control'] = ('127.0.0.1', control_port)
  218. proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
  219. proxy_addresses.append(proxy)
  220. #
  221. controllers = []
  222. protocol_manager = experiment_client.ExperimentProtocolManager()
  223. #
  224. try:
  225. for proxy_address in proxy_addresses:
  226. controller = experiment_client.ExperimentController(proxy_address['control'])
  227. controller.connect()
  228. # the controller has to attach new streams to circuits, so the
  229. # connection has to stay open until we're done creating streams
  230. #
  231. for _ in range(self.num_streams_per_client):
  232. # make a circuit for each stream
  233. controller.build_circuit(circuit_generator)
  234. time.sleep(0.5)
  235. #
  236. controllers.append(controller)
  237. #
  238. start_event = multiprocessing.Event()
  239. #
  240. used_measureme_ids = set()
  241. for stream_index in range(self.num_streams_per_client):
  242. for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
  243. if self.measureme:
  244. measureme_id = stream_index*len(controllers) + controller_index + 1
  245. assert len(set([measureme_id]) & used_measureme_ids) == 0, 'Sanity check: Attempting to use a previously-used measureme_id'
  246. used_measureme_ids |= set([measureme_id])
  247. else:
  248. measureme_id = None
  249. #
  250. wait_duration = random.randint(0, self.wait_range)
  251. protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'],
  252. proxy_address['control'], controller, start_event,
  253. wait_duration=wait_duration, measureme_id=measureme_id,
  254. num_bytes=self.num_bytes, buffer_len=self.buffer_len)
  255. protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
  256. #
  257. #
  258. time.sleep(2)
  259. start_event.set()
  260. #
  261. protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
  262. finally:
  263. for controller in controllers:
  264. controller.disconnect()
  265. #
  266. protocol_manager.stop()
  267. #
  268. #
  269. #
  270. def wait_for_keyboard_interrupt():
  271. try:
  272. logging.info('Press Ctrl-C to stop.')
  273. while True:
  274. time.sleep(30)
  275. #
  276. except KeyboardInterrupt:
  277. print('')
  278. #
  279. #
# NOTE(review): dead code — the previous single-experiment __main__ kept alive
# as an unused string literal below; consider deleting it outright.
'''
if __name__ == '__main__':
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    experiment_time = time.time()
    #
    if args.debugging != 'only-chutney':
        save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(save_data_path)
    else:
        save_data_path = None
    #
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
        os.mkdir(measureme_log_path)
    #
    experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, args.num_streams_per_client,
                            args.buffer_len, args.wait_range, args.measureme)
    #
    start_time = time.time()
    #
    if args.debugging == 'no-chutney':
        experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
    elif args.debugging == 'only-chutney':
        experiment.start_chutney(wait_for_keyboard_interrupt)
    else:
        experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
    #
'''
if __name__ == '__main__':
    # Run a sweep of experiments with increasing stream counts, then
    # post-process the measureme logs of every run.
    logging.basicConfig(level=logging.DEBUG)
    # stem's DEBUG output is very noisy; cap it at warnings
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
    args = parser.parse_args()
    #
    # fixed network topology for every run in the sweep
    num_clients = 4
    num_guards = 6
    num_authorities = 2 # will also act as a guard
    num_exits = 1
    #
    experiment_time = time.time()
    #
    # 'only-chutney' runs produce no data, so skip creating the output dir
    if args.debugging != 'only-chutney':
        base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
        os.mkdir(base_save_data_path)
    else:
        base_save_data_path = None
    #
    # measureme logs go to a ramdisk, and only for full (non-debugging) runs;
    # the directory itself is created per iteration below
    if args.debugging is not None:
        measureme_log_path = None
    else:
        measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
    #
    start_time = time.time()
    all_data_paths = []
    #
    #for num_streams_per_client in [1, 2, 4, 6, 8, 10, 12]:
    #for num_streams_per_client in [6,7,8]:
    for num_streams_per_client in [1, 2, 3, 4, 5]:
        logging.info('Starting with {} streams per client'.format(num_streams_per_client))
        save_data_path = None
        #
        if base_save_data_path is not None:
            # one subdirectory per total stream count, e.g. 'streams-0004'
            save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
            all_data_paths.append(save_data_path)
            os.mkdir(save_data_path)
        #
        if measureme_log_path is not None:
            # recreated each iteration; Experiment.start_chutney removes it
            # again when the run finishes
            os.mkdir(measureme_log_path)
        #
        experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
                                num_clients, num_guards, num_authorities, num_exits,
                                args.buffer_len, args.wait_range, args.measureme)
        #
        if args.debugging == 'no-chutney':
            # assumes a Tor network is already running outside this script
            experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
        elif args.debugging == 'only-chutney':
            # just bring the network up and hold it until Ctrl-C
            experiment.start_chutney(wait_for_keyboard_interrupt)
        else:
            # full run: chutney -> server -> system logging -> clients
            experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
    #
    # post-process: parse the measureme logs collected into each run's data
    # directory and pickle the extracted stream data alongside them
    import parse_measureme_logs
    for path in all_data_paths:
        logging.info('Parsing logs for {}'.format(path))
        measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]
        #
        logs = []
        for name in measureme_tor_logs:
            with open(name, 'r') as f:
                logs.append(parse_measureme_logs.read_log(f))
        #
        streams = parse_measureme_logs.get_streams_from_logs(logs)
        #
        with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
            pickle.dump(streams, f, protocol=4)
  409. #