relay_working_experiment.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. #!/usr/bin/python3
  2. #
  3. import argparse
  4. import shutil
  5. import logging
  6. import random
  7. import os
  8. import subprocess
  9. import multiprocessing
  10. import threading
  11. import time
  12. import json
  13. import gzip
  14. import pickle
  15. import tempfile
  16. #
  17. import stem.control
  18. import stem.descriptor.remote
  19. import stem.process
  20. #
  21. import numa
  22. import log_system_usage
  23. import chutney_manager
  24. import throughput_server
  25. import experiment_client
  26. import experiment
  27. import useful
  28. #
  29. #remote_name = 'sengler-rpi'
  30. #remote_name = 'cluck2'
  31. #remote_name = None
  32. #
  33. class CustomExperiment(experiment.Experiment):
  34. def __init__(self, use_helgrind, target_tor, num_additional_eventloops, remote_name, *args, **kwargs):
  35. self.use_helgrind = use_helgrind
  36. self.target_tor = target_tor
  37. self.num_additional_eventloops = num_additional_eventloops
  38. self.remote_name = remote_name
  39. super().__init__(*args, **kwargs)
  40. #
  41. self.chutney_path = '/home/sengler/code/working/chutney'
  42. #self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
  43. self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller'
  44. #self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller-kist-changes'
  45. #self.tor_path = '/home/sengler/code/working/tor'
  46. #
  47. def configure_chutney(self):
  48. #self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
  49. # [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
  50. # [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
  51. # [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
  52. #
  53. #target_tor_path = '/home/sengler/code/working/tor/src/app/tor'
  54. #target_tor_path = '/home/sengler/code/releases/tor-0.4.2.5/src/app/tor'
  55. if self.remote_name == 'cluck2':
  56. local_ip = '172.19.156.16'
  57. target_ip = '172.19.156.136'
  58. #local_ip = '129.97.119.196'
  59. #target_ip = '129.97.119.226'
  60. target_hostname = 'cluck2'
  61. elif self.remote_name == 'sengler-rpi':
  62. local_ip = '129.97.119.196'
  63. target_ip = '129.97.169.9'
  64. target_hostname = 'sengler-rpi'
  65. elif self.remote_name is None:
  66. local_ip = None
  67. target_ip = None
  68. target_hostname = None
  69. else:
  70. raise Exception('hostname not known')
  71. #
  72. target_optional_args = {}
  73. if self.target_tor is not None:
  74. target_optional_args['tor'] = self.target_tor
  75. if self.use_helgrind:
  76. target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
  77. #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libprofiler.so.0'}
  78. #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
  79. #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/home/sengler/build/lib/libtcmalloc_and_profiler.so'}
  80. #target_optional_args['add_environ_vars'] = {'EVENT_NOEPOLL': '', 'EVENT_SHOW_METHOD': ''}
  81. if target_ip is not None:
  82. target_optional_args['ip'] = target_ip
  83. if target_hostname is not None:
  84. target_optional_args['remote_hostname'] = target_hostname
  85. target_optional_args['num_cpus'] = 4 # make sure it can process onion skins fast enough, and keep it consistent between computers
  86. target_optional_args['num_additional_eventloops'] = self.num_additional_eventloops
  87. target_cpu_prof = False #True
  88. target_daemon = False
  89. target_log_throughput = True
  90. target_logs = ['notice']
  91. #other_logs = ['info', 'notice']
  92. other_logs = ['notice']
  93. #if self.use_helgrind:
  94. # valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
  95. #else:
  96. # valgrind_settings = None
  97. #
  98. self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=other_logs) for _ in range(self.num_authorities)] + \
  99. [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=other_logs) for _ in range(self.num_guards)] + \
  100. [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl', log_throughput=target_log_throughput,
  101. daemon=target_daemon, log_files=target_logs, sandbox=0, google_cpu_profiler=target_cpu_prof, **target_optional_args)] + \
  102. [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=other_logs) for _ in range(self.num_exits)] + \
  103. [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=other_logs) for _ in range(self.num_clients)]
  104. #
  105. for node in self.nodes:
  106. if not 'num_cpus' in node.options:
  107. node.options['num_cpus'] = 2
  108. #
  109. if not 'ip' in node.options and local_ip is not None:
  110. node.options['ip'] = local_ip
  111. #
  112. #
  113. #numa_remaining = numa.get_numa_overview()
  114. #for (node, index) in zip(self.nodes, range(len(self.nodes))):
  115. # num_cpus = node.options['num_cpus']
  116. # if num_cpus%2 != 0:
  117. # num_cpus += 1
  118. # #
  119. # if node.options['tag'] == 'target':
  120. # num_cpus = max(num_cpus, 6)
  121. # #
  122. # #if node.options['tag'] != 'target':
  123. # # (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
  124. # # node.options['numa_settings'] = (numa_node, processors)
  125. # #
  126. ##
  127. self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
  128. # TODO: ^^ improve this
  129. #
  130. def start_remote_logging(self, next_action=None):
  131. if self.remote_name is None:
  132. # running locally
  133. if next_action is not None:
  134. next_action()
  135. #
  136. return
  137. #
  138. local_script_path = 'log_system_usage.py'
  139. remote_script_path = '/tmp/log_system_usage.py'
  140. remote_save_path = '/tmp/cpu-usage.pickle.gz'
  141. local_save_path = os.path.join(self.save_data_path, 'remote-cpu-usage.pickle.gz')
  142. command = 'python3 {} 0.1 {}'.format(remote_script_path, remote_save_path)
  143. #
  144. try:
  145. subprocess.check_output(['scp', local_script_path, '{}:{}'.format(self.remote_name, remote_script_path)], stderr=subprocess.STDOUT)
  146. p = subprocess.Popen(['ssh', self.remote_name, command])
  147. #
  148. time.sleep(5)
  149. # wait a few seconds to make sure it doesn't exit immediately
  150. if p.poll() != None:
  151. raise Exception('Remote CPU monitoring script exited immediately')
  152. #
  153. if next_action is not None:
  154. next_action()
  155. #
  156. if p.poll() != None:
  157. raise Exception('Remote CPU monitoring script exited before it was supposed to')
  158. #
  159. finally:
  160. try:
  161. subprocess.check_output(['ssh', self.remote_name, 'pkill --full --signal sigint \'{}\''.format(command)], stderr=subprocess.STDOUT)
  162. except:
  163. logging.warn('Could not kill remote python script')
  164. #
  165. try:
  166. p.wait(timeout=30)
  167. except subprocess.TimeoutExpired:
  168. p.kill()
  169. logging.warn('Process did not end as expected, so sent a SIGKILL')
  170. except:
  171. logging.warn('Could not kill')
  172. #
  173. try:
  174. subprocess.check_output(['scp', '{}:{}'.format(self.remote_name, remote_save_path), local_save_path], stderr=subprocess.STDOUT)
  175. except:
  176. logging.warn('Failed to get remote \'{}\' data file'.format(remote_save_path))
  177. #
  178. try:
  179. subprocess.check_output(['ssh', self.remote_name, 'rm', remote_save_path], stderr=subprocess.STDOUT)
  180. except:
  181. logging.warn('Failed to delete remote \'{}\' data file'.format(remote_save_path))
  182. #
  183. try:
  184. subprocess.check_output(['ssh', self.remote_name, 'rm', remote_script_path], stderr=subprocess.STDOUT)
  185. except:
  186. logging.warn('Failed to delete remote \'{}\' script file'.format(remote_script_path))
  187. #
  188. #
  189. #
  190. #
  191. def build_circuit_generator(consensus, server_address):
  192. fingerprints = [desc.nickname for desc in consensus]
  193. exit_fingerprints = [desc.nickname for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
  194. #
  195. target_fingerprints = [desc.nickname for desc in consensus if desc.nickname.endswith('target')]
  196. assert len(target_fingerprints) >= 1, 'No target relay in the consensus'
  197. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set(target_fingerprints))
  198. #
  199. assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
  200. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  201. #
  202. non_exit_fingerprints = sorted(non_exit_fingerprints)
  203. target_fingerprints = sorted(target_fingerprints)
  204. exit_fingerprints = sorted(exit_fingerprints)
  205. # try to get reproducible behavior
  206. #
  207. #return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
  208. return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprints[gen_id%len(target_fingerprints)], exit_fingerprints[gen_id%len(exit_fingerprints)]]
  209. '''
  210. fingerprints = [desc.fingerprint for desc in consensus]
  211. exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
  212. #
  213. target_fingerprint = [desc.fingerprint for desc in consensus if desc.nickname.endswith('target')][0]
  214. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set([target_fingerprint]))
  215. #
  216. assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
  217. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  218. #
  219. #return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
  220. return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprint, exit_fingerprints[gen_id%len(exit_fingerprints)]]
  221. '''
  222. #
  223. def existing_file(path):
  224. if not os.path.isfile(path):
  225. raise argparse.ArgumentTypeError('The file path is not valid')
  226. return path
  227. #
  228. if __name__ == '__main__':
  229. #
  230. logging.basicConfig(level=logging.DEBUG)
  231. logging.getLogger('stem').setLevel(logging.WARNING)
  232. #
  233. parser = argparse.ArgumentParser(description='Test the network throughput.')
  234. parser.add_argument('num_bytes', type=useful.parse_bytes,
  235. help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
  236. parser.add_argument('--buffer-len', type=useful.parse_bytes,
  237. help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
  238. parser.add_argument('--wait-range', type=int, default=0,
  239. help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
  240. parser.add_argument('--target-tor', type=existing_file, default=None,
  241. help='use a different tor binary for the target', metavar='tor-path')
  242. parser.add_argument('--helgrind', action='store_true',
  243. help='log helgrind data')
  244. args = parser.parse_args()
  245. #
  246. experiment_time = time.time()
  247. #base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
  248. base_save_data_path = os.path.join('/var/ssd-raid/sengler/data/experiments', str(int(experiment_time)))
  249. os.mkdir(base_save_data_path)
  250. #
  251. measureme_log_path = None
  252. measureme = False
  253. #
  254. start_time = time.time()
  255. #
  256. tors = {'working':'/home/sengler/code/working/tor/src/app/tor', 'working-without':'/home/sengler/code/working/tor-without-tcmalloc/src/app/tor', 'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor', 'dev-with':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-with-tcmalloc/src/app/tor'}
  257. hosts = ['sengler-rpi', 'cluck2']
  258. ###hosts = ['cluck2']
  259. ###hosts = ['sengler-rpi']
  260. num_repetitions = 15
  261. nums_additional_eventloops_options = [0, 1, 2, 3]
  262. #
  263. #tors = {'working':'/home/sengler/code/working/tor/src/app/tor', 'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'}
  264. #hosts = ['cluck2']
  265. #num_repetitions = 1
  266. #nums_additional_eventloops_options = [0, 1, 2, 3]
  267. #
  268. #tors = {'dev-debug-stall':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-debug-stall/src/app/tor'}
  269. #tors = {'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-test-kist-changes/src/app/tor'}
  270. #tors = {'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'}
  271. #hosts = ['cluck2']
  272. #num_repetitions = 5
  273. #nums_additional_eventloops_options = [3, 2, 1, 0]
  274. #
  275. try:
  276. for repeat in range(num_repetitions):
  277. for host in hosts:
  278. for (tor_name, tor_path) in tors.items():
  279. #num_clients = 4
  280. #num_guards = 6 # number of relays (including guards)
  281. #num_authorities = 2 # will also act as a relay or guard
  282. #num_exits = 8 # will be used only as an exit
  283. #num_streams_per_client = 1
  284. #if True:
  285. # num_clients = 4
  286. # num_guards = 10 # number of relays (including guards)
  287. # num_authorities = 2 # will also act as a relay or guard
  288. # num_exits = 12 # will be used only as an exit
  289. # num_streams_per_client = 3
  290. # num_bytes = 20*(2**20)
  291. if host == 'cluck2':
  292. num_clients = 150
  293. num_guards = 58 # number of relays (including guards)
  294. num_authorities = 2 # will also act as a relay or guard
  295. num_exits = 60 # will be used only as an exit
  296. num_streams_per_client = 10
  297. num_bytes = 20*(2**20)
  298. elif host == 'sengler-rpi':
  299. num_clients = 30
  300. num_guards = 58 # number of relays (including guards)
  301. num_authorities = 2 # will also act as a relay or guard
  302. num_exits = 60 # will be used only as an exit
  303. num_streams_per_client = 8
  304. num_bytes = 10*(2**20)
  305. elif host is None:
  306. num_clients = 10
  307. num_guards = 10 # number of relays (including guards)
  308. num_authorities = 2 # will also act as a relay or guard
  309. num_exits = 12 # will be used only as an exit
  310. num_streams_per_client = 5
  311. num_bytes = 20*(2**20)
  312. else:
  313. raise Exception('host not known')
  314. #
  315. nums_additional_eventloops = [0]
  316. if tor_name == 'working' or tor_name == 'working-without':
  317. nums_additional_eventloops = nums_additional_eventloops_options
  318. #
  319. for num_additional_eventloops in nums_additional_eventloops:
  320. attempt = 0
  321. while True:
  322. attempt_str = '' if attempt == 0 else '_attempt-{}'.format(attempt)
  323. save_data_path = os.path.join(base_save_data_path, '{}_{}_{}_{}{}'.format(host, tor_name, num_additional_eventloops, repeat, attempt_str))
  324. os.mkdir(save_data_path)
  325. logging.info('Starting on {} using {}-{} ({}), repeat {}, attempt {}'.format(host, tor_name, num_additional_eventloops, tor_path, repeat, attempt))
  326. #
  327. #experiment = CustomExperiment(args.helgrind, args.target_tor, save_data_path, measureme_log_path, args.num_bytes,
  328. experiment = CustomExperiment(args.helgrind, tor_path, num_additional_eventloops, host, save_data_path,
  329. measureme_log_path, num_bytes,
  330. num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
  331. build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)
  332. #
  333. def sleep_then_run(duration, func):
  334. logging.info('Sleeping for {} seconds before running \'{}\''.format(duration, func.__name__))
  335. time.sleep(duration)
  336. logging.info('Done sleeping')
  337. return func()
  338. #
  339. #import subprocess
  340. #p = subprocess.Popen(['ssh', '-t', 'sengler-rpi', 'python3 /tmp/log_system_usage.py /tmp/usage.gz'])
  341. #
  342. try:
  343. experiment.start_system_logging(lambda: experiment.start_remote_logging(lambda: experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))))
  344. except (stem.Timeout, stem.CircuitExtensionFailed):
  345. tries = 5
  346. attempt += 1
  347. if attempt < tries:
  348. logging.exception('Experiment run failed, trying again ({} tries remaining)'.format(tries-attempt))
  349. continue
  350. else:
  351. raise
  352. #
  353. #
  354. shutil.copytree('/tmp/chutney-net/nodes', os.path.join(save_data_path, 'nodes'))
  355. os.system("ps u | grep 'tor'")
  356. os.system("rm -rf /tmp/chutney-net/*")
  357. break
  358. #
  359. #
  360. #
  361. #
  362. #
  363. except KeyboardInterrupt:
  364. logging.info('Stopped (KeyboardInterrupt)')
  365. #
  366. logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
  367. #