relay_working_experiment.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. #!/usr/bin/python3
  2. #
  3. import argparse
  4. import shutil
  5. import logging
  6. import random
  7. import os
  8. import subprocess
  9. import multiprocessing
  10. import threading
  11. import time
  12. import json
  13. import gzip
  14. import pickle
  15. import tempfile
  16. import collections
  17. import gc
  18. #
  19. import stem.control
  20. import stem.descriptor.remote
  21. import stem.process
  22. #
  23. import numa
  24. import log_system_usage
  25. import chutney_manager
  26. import throughput_server
  27. import experiment_client
  28. import experiment
  29. import useful
  30. #
  31. #remote_name = 'sengler-rpi'
  32. #remote_name = 'cluck2'
  33. #remote_name = None
  34. #
  35. class CustomExperiment(experiment.Experiment):
  36. def __init__(self, use_helgrind, target_tor, num_additional_eventloops, remote_name, *args, **kwargs):
  37. self.use_helgrind = use_helgrind
  38. self.target_tor = target_tor
  39. self.num_additional_eventloops = num_additional_eventloops
  40. self.remote_name = remote_name
  41. super().__init__(*args, **kwargs)
  42. #
  43. self.chutney_path = '/home/sengler/code/working/chutney'
  44. #self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
  45. self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller'
  46. #self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller-kist-changes'
  47. #self.tor_path = '/home/sengler/code/working/tor'
  48. #
  49. def configure_chutney(self):
  50. #self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
  51. # [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
  52. # [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
  53. # [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
  54. #
  55. #target_tor_path = '/home/sengler/code/working/tor/src/app/tor'
  56. #target_tor_path = '/home/sengler/code/releases/tor-0.4.2.5/src/app/tor'
  57. if self.remote_name == 'cluck2':
  58. local_ip = '172.19.156.16'
  59. target_ip = '172.19.156.136'
  60. #local_ip = '129.97.119.196'
  61. #target_ip = '129.97.119.226'
  62. target_hostname = 'cluck2'
  63. target_dir = '/tmp/chutney-net'
  64. elif self.remote_name == 'sengler-rpi':
  65. local_ip = '129.97.119.196'
  66. target_ip = '129.97.169.9'
  67. target_hostname = 'sengler-rpi'
  68. target_dir = '/tmp/chutney-net'
  69. elif self.remote_name is None:
  70. local_ip = None
  71. target_ip = None
  72. target_hostname = None
  73. target_dir = None
  74. else:
  75. raise Exception('hostname not known')
  76. #
  77. target_optional_args = {}
  78. if self.target_tor is not None:
  79. target_optional_args['tor'] = self.target_tor
  80. if self.use_helgrind:
  81. target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
  82. #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libprofiler.so.0'}
  83. #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
  84. #target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/home/sengler/build/lib/libtcmalloc_and_profiler.so'}
  85. #target_optional_args['add_environ_vars'] = {'EVENT_NOEPOLL': '', 'EVENT_SHOW_METHOD': ''}
  86. if target_ip is not None:
  87. target_optional_args['ip'] = target_ip
  88. if target_hostname is not None:
  89. target_optional_args['remote_hostname'] = target_hostname
  90. if target_dir is not None:
  91. target_optional_args['remote_net_dir'] = target_dir
  92. target_optional_args['num_cpus'] = 2 # make sure it can process onion skins fast enough, and keep it consistent between computers
  93. # tor actually uses one more worker thread than what you ask for
  94. target_optional_args['num_additional_eventloops'] = self.num_additional_eventloops
  95. target_optional_args['dircache'] = False
  96. # the voting interval is 40 seconds which puts an unrealistic workload on the target, so we disable it
  97. target_cpu_prof = False #True
  98. target_daemon = False
  99. target_log_throughput = True
  100. target_logs = ['notice']
  101. #other_logs = ['info', 'notice']
  102. other_logs = ['notice']
  103. #if self.use_helgrind:
  104. # valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
  105. #else:
  106. # valgrind_settings = None
  107. #
  108. self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=other_logs) for _ in range(self.num_authorities)] + \
  109. [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=other_logs) for _ in range(self.num_guards)] + \
  110. [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl', log_throughput=target_log_throughput,
  111. daemon=target_daemon, log_files=target_logs, sandbox=0, google_cpu_profiler=target_cpu_prof, **target_optional_args)] + \
  112. [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=other_logs) for _ in range(self.num_exits)] + \
  113. [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=other_logs) for _ in range(self.num_clients)]
  114. #
  115. for node in self.nodes:
  116. if not 'num_cpus' in node.options:
  117. node.options['num_cpus'] = 2
  118. #
  119. if not 'ip' in node.options and local_ip is not None:
  120. node.options['ip'] = local_ip
  121. #
  122. #
  123. #numa_remaining = numa.get_numa_overview()
  124. #for (node, index) in zip(self.nodes, range(len(self.nodes))):
  125. # num_cpus = node.options['num_cpus']
  126. # if num_cpus%2 != 0:
  127. # num_cpus += 1
  128. # #
  129. # if node.options['tag'] == 'target':
  130. # num_cpus = max(num_cpus, 6)
  131. # #
  132. # #if node.options['tag'] != 'target':
  133. # # (numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
  134. # # node.options['numa_settings'] = (numa_node, processors)
  135. # #
  136. ##
  137. self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
  138. # TODO: ^^ improve this
  139. #
  140. def start_remote_logging(self, next_action=None):
  141. if self.remote_name is None:
  142. # running locally
  143. if next_action is not None:
  144. next_action()
  145. #
  146. return
  147. #
  148. local_script_path = 'log_system_usage.py'
  149. remote_script_path = '/tmp/log_system_usage.py'
  150. remote_save_path = '/tmp/cpu-usage.pickle.gz'
  151. local_save_path = os.path.join(self.save_data_path, 'remote-cpu-usage.pickle.gz')
  152. #
  153. tor_pids = subprocess.check_output(['ssh', self.remote_name, 'pgrep tor']).decode('utf-8').split()
  154. tor_pids = [pid for pid in tor_pids]
  155. logging.info('Logging the following pids on {}: {}'.format(self.remote_name, tor_pids))
  156. command = 'python3 {} --interval 0.2 --pids {} {}'.format(remote_script_path, ','.join(tor_pids), remote_save_path)
  157. #
  158. try:
  159. subprocess.check_output(['scp', local_script_path, '{}:{}'.format(self.remote_name, remote_script_path)], stderr=subprocess.STDOUT)
  160. p = subprocess.Popen(['ssh', self.remote_name, command])
  161. #
  162. time.sleep(5)
  163. # wait a few seconds to make sure it doesn't exit immediately
  164. if p.poll() != None:
  165. raise Exception('Remote CPU monitoring script exited immediately')
  166. #
  167. try:
  168. subprocess.check_output(['ssh', self.remote_name, 'sudo renice -n -10 -g "$(pgrep --full --exact "{}")"'.format(command)], stderr=subprocess.STDOUT)
  169. # need to set the niceness for the process group, not just the process in order to also apply to threads
  170. except:
  171. logging.warn('Could not set the nice value for the remote python script, ignoring...')
  172. #
  173. if next_action is not None:
  174. next_action()
  175. time.sleep(5)
  176. # wait a few seconds so that we have extra data
  177. # this may be useful if we need to do averaging
  178. #
  179. if p.poll() != None:
  180. raise Exception('Remote CPU monitoring script exited before it was supposed to')
  181. #
  182. finally:
  183. try:
  184. subprocess.check_output(['ssh', self.remote_name, 'pkill --full --exact --signal sigint \'{}\''.format(command)], stderr=subprocess.STDOUT)
  185. except:
  186. logging.warn('Could not kill remote python script')
  187. #
  188. try:
  189. p.wait(timeout=30)
  190. except subprocess.TimeoutExpired:
  191. p.kill()
  192. logging.warn('Process did not end as expected, so sent a SIGKILL')
  193. except:
  194. logging.warn('Could not kill')
  195. #
  196. try:
  197. subprocess.check_output(['scp', '{}:{}'.format(self.remote_name, remote_save_path), local_save_path], stderr=subprocess.STDOUT)
  198. except:
  199. logging.warn('Failed to get remote \'{}\' data file'.format(remote_save_path))
  200. #
  201. try:
  202. subprocess.check_output(['ssh', self.remote_name, 'rm', remote_save_path], stderr=subprocess.STDOUT)
  203. except:
  204. logging.warn('Failed to delete remote \'{}\' data file'.format(remote_save_path))
  205. #
  206. try:
  207. subprocess.check_output(['ssh', self.remote_name, 'rm', remote_script_path], stderr=subprocess.STDOUT)
  208. except:
  209. logging.warn('Failed to delete remote \'{}\' script file'.format(remote_script_path))
  210. #
  211. #
  212. #
  213. #
  214. def build_circuit_generator(consensus, server_address):
  215. fingerprints = [desc.nickname for desc in consensus]
  216. exit_fingerprints = [desc.nickname for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
  217. authority_fingerprints = [desc.nickname for desc in consensus if desc.nickname.endswith('a')]
  218. #
  219. target_fingerprints = [desc.nickname for desc in consensus if desc.nickname.endswith('target')]
  220. assert len(target_fingerprints) >= 1, 'No target relay in the consensus'
  221. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set(target_fingerprints)-set(authority_fingerprints))
  222. #
  223. assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
  224. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  225. #
  226. non_exit_fingerprints = sorted(non_exit_fingerprints)
  227. target_fingerprints = sorted(target_fingerprints)
  228. exit_fingerprints = sorted(exit_fingerprints)
  229. # try to get reproducible behavior
  230. #
  231. #return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
  232. return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprints[gen_id%len(target_fingerprints)], exit_fingerprints[gen_id%len(exit_fingerprints)]]
  233. '''
  234. fingerprints = [desc.fingerprint for desc in consensus]
  235. exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
  236. #
  237. target_fingerprint = [desc.fingerprint for desc in consensus if desc.nickname.endswith('target')][0]
  238. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set([target_fingerprint]))
  239. #
  240. assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
  241. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  242. #
  243. #return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
  244. return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprint, exit_fingerprints[gen_id%len(exit_fingerprints)]]
  245. '''
  246. #
  247. def existing_file(path):
  248. if not os.path.isfile(path):
  249. raise argparse.ArgumentTypeError('The file path is not valid')
  250. return path
  251. #
if __name__ == '__main__':
    # Experiment driver: for every (repeat, host, tor build, number of additional
    # event loops) combination, build a fresh chutney network with a CustomExperiment,
    # run the throughput test, and store the results in its own directory.
    #
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput.')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--target-tor', type=existing_file, default=None,
                        help='use a different tor binary for the target', metavar='tor-path')
    parser.add_argument('--helgrind', action='store_true',
                        help='log helgrind data')
    args = parser.parse_args()
    # NOTE(review): args.num_bytes and args.target_tor are parsed but not used below;
    # the per-host num_bytes and the tor_path from `tors` are used instead.
    #
    experiment_time = time.time()
    #base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
    # One results directory per invocation, named by the integer Unix timestamp
    # (os.mkdir raises if it already exists).
    base_save_data_path = os.path.join('/var/ssd-raid/sengler/data/experiments', str(int(experiment_time)))
    os.mkdir(base_save_data_path)
    #
    measureme_log_path = None
    measureme = False
    #
    start_time = time.time()
    #
    #tors = {'working':'/home/sengler/code/working/tor/src/app/tor', 'working-without':'/home/sengler/code/working/tor-without-tcmalloc/src/app/tor', 'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor', 'dev-with':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-with-tcmalloc/src/app/tor'}
    # Tor builds to compare; OrderedDict keeps the run order fixed.
    tors = collections.OrderedDict()
    tors['working'] = '/home/sengler/code/working/tor/src/app/tor'
    tors['working-without'] = '/home/sengler/code/working/tor-without-tcmalloc/src/app/tor'
    tors['dev-with'] = '/home/sengler/code/dev/tor-throughput-log-0.4.2.6-with-tcmalloc/src/app/tor'
    tors['dev-without'] = '/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'
    hosts = ['sengler-rpi', 'cluck2']
    ###hosts = ['cluck2']
    ###hosts = ['sengler-rpi']
    num_repetitions = 15
    nums_additional_eventloops_options = [0, 1, 2, 3]
    #nums_additional_eventloops_options = [3, 2, 1, 0]
    #
    # Alternative (smaller) configurations, kept for quick switching:
    #tors = {'working':'/home/sengler/code/working/tor/src/app/tor', 'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'}
    #hosts = ['cluck2']
    #num_repetitions = 1
    #nums_additional_eventloops_options = [0, 1, 2, 3]
    #
    #tors = {'dev-debug-stall':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-debug-stall/src/app/tor'}
    #tors = {'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-test-kist-changes/src/app/tor'}
    #tors = {'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'}
    #hosts = ['cluck2']
    #num_repetitions = 5
    #nums_additional_eventloops_options = [3, 2, 1, 0]
    #
    try:
        for repeat in range(num_repetitions):
            for host in hosts:
                for (tor_name, tor_path) in tors.items():
                    #num_clients = 4
                    #num_guards = 6 # number of relays (including guards)
                    #num_authorities = 2 # will also act as a relay or guard
                    #num_exits = 8 # will be used only as an exit
                    #num_streams_per_client = 1
                    #if True:
                    #    num_clients = 4
                    #    num_guards = 10 # number of relays (including guards)
                    #    num_authorities = 2 # will also act as a relay or guard
                    #    num_exits = 12 # will be used only as an exit
                    #    num_streams_per_client = 3
                    #    num_bytes = 20*(2**20)
                    # Network size and per-connection transfer size for each target host.
                    if host == 'cluck2':
                        num_clients = 150
                        num_guards = 30 # number of relays (including guards)
                        num_authorities = 2 # will also act as a relay or guard
                        num_exits = 30 # will be used only as an exit
                        num_streams_per_client = 10
                        num_bytes = 20*(2**20)
                    elif host == 'sengler-rpi':
                        num_clients = 100
                        num_guards = 300 # number of relays (including guards)
                        num_authorities = 3 # will also act as a relay or guard
                        num_exits = 300 # will be used only as an exit
                        num_streams_per_client = 6
                        num_bytes = 5*(2**20)
                    elif host is None:
                        num_clients = 10
                        num_guards = 10 # number of relays (including guards)
                        num_authorities = 2 # will also act as a relay or guard
                        num_exits = 12 # will be used only as an exit
                        num_streams_per_client = 5
                        num_bytes = 20*(2**20)
                    else:
                        raise Exception('host not known')
                    #
                    # Only the 'working' builds are swept over the additional-event-loop
                    # counts; every other build runs with 0.
                    nums_additional_eventloops = [0]
                    if tor_name == 'working' or tor_name == 'working-without':
                        nums_additional_eventloops = nums_additional_eventloops_options
                    #
                    for num_additional_eventloops in nums_additional_eventloops:
                        attempt = 0
                        # Retry loop: `continue` after a recoverable failure, `break` on success.
                        # Each attempt gets its own directory, e.g. '<host>_<tor>_<loops>_<repeat>_attempt-1'.
                        while True:
                            attempt_str = '' if attempt == 0 else '_attempt-{}'.format(attempt)
                            save_data_path = os.path.join(base_save_data_path, '{}_{}_{}_{}{}'.format(host, tor_name, num_additional_eventloops, repeat, attempt_str))
                            os.mkdir(save_data_path)
                            logging.info('Starting on {} using {}-{} ({}), repeat {}, attempt {}'.format(host, tor_name, num_additional_eventloops, tor_path, repeat, attempt))
                            #
                            #exp = CustomExperiment(args.helgrind, args.target_tor, save_data_path, measureme_log_path, args.num_bytes,
                            exp = CustomExperiment(args.helgrind, tor_path, num_additional_eventloops, host, save_data_path,
                                                   measureme_log_path, num_bytes,
                                                   num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
                                                   build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)
                            #
                            def sleep_then_run(duration, func):
                                # Log, sleep `duration` seconds, then return func()'s result.
                                logging.info('Sleeping for {} seconds before running \'{}\''.format(duration, func.__name__))
                                time.sleep(duration)
                                logging.info('Done sleeping')
                                return func()
                            #
                            #import subprocess
                            #p = subprocess.Popen(['ssh', '-t', 'sengler-rpi', 'python3 /tmp/log_system_usage.py /tmp/usage.gz'])
                            #
                            try:
                                # Nested callbacks: start_chutney -> start_throughput_server -> sleep 120 s
                                # -> start_system_logging -> start_remote_logging -> start_throughput_clients.
                                # NOTE(review): presumably each start_* runs its callback while its own
                                # component is active and tears it down afterwards — confirm in experiment.py.
                                exp.start_chutney(lambda: exp.start_throughput_server(lambda: sleep_then_run(120, lambda: exp.start_system_logging(lambda: exp.start_remote_logging(exp.start_throughput_clients)))))
                            except (stem.Timeout, stem.CircuitExtensionFailed, experiment.RepeatExperimentError):
                                # At most 9 attempts in total (attempt 0..8); the 9th failure is re-raised.
                                tries = 9
                                attempt += 1
                                if attempt < tries:
                                    logging.exception('Experiment run failed, trying again ({} tries remaining)'.format(tries-attempt))
                                    continue
                                else:
                                    raise
                            #
                            # Success: strip caches, keys and votes from the chutney node directories,
                            # copy the rest next to the results, show leftover tor processes, then wipe the net dir.
                            # NOTE(review): /run/user/3271 is hard-coded (presumably the author's UID) — confirm.
                            os.system('rm -rf /run/user/3271/chutney-net/nodes/*/diff-cache')
                            os.system('rm -rf /run/user/3271/chutney-net/nodes/*/keys')
                            os.system('rm -f /run/user/3271/chutney-net/nodes/*/cached-*')
                            os.system('rm -f /run/user/3271/chutney-net/nodes/*/v3-status-votes')
                            shutil.copytree('/run/user/3271/chutney-net/nodes', os.path.join(save_data_path, 'nodes'))
                            os.system("ps u | grep 'tor'")
                            os.system('rm -rf /run/user/3271/chutney-net/*')
                            break
                        #
                        exp = None
                        gc.collect()
                        # not sure if this is actually useful, but hopefully it reduces memory usage for when we need to fork
    except KeyboardInterrupt:
        # Ctrl-C stops the remaining runs but still reports the total time below.
        logging.info('Stopped (KeyboardInterrupt)')
    #
    logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
    #