Browse Source

Latest changes to support testing a single relay, and running Helgrind.

Steven Engler 4 years ago
parent
commit
0fdb096a61
5 changed files with 183 additions and 25 deletions
  1. 12 2
      Makefile
  2. 8 5
      src/chutney_manager.py
  3. 34 18
      src/experiment.py
  4. 21 0
      src/libevent.supp
  5. 108 0
      src/relay_working_experiment.py

+ 12 - 2
Makefile

@@ -5,8 +5,11 @@ PYTHON_INC=/usr/include/python3.6
 PY_BIN_FILES:=$(patsubst src/%.py,bin/%.py,$(wildcard src/*.py))
 PY_DEV_FILES:=$(patsubst src/%.py,dev/%.py,$(wildcard src/*.py))
 
-all: bin_dir $(PY_BIN_FILES) bin/accelerated_functions.so
-dev: dev_dir $(PY_DEV_FILES) dev/accelerated_functions.so
+VALGRIND_SUPPRESSION_BIN_FILES:=$(patsubst src/%.supp,bin/%.supp,$(wildcard src/*.supp))
+VALGRIND_SUPPRESSION_DEV_FILES:=$(patsubst src/%.supp,dev/%.supp,$(wildcard src/*.supp))
+
+all: bin_dir $(PY_BIN_FILES) bin/accelerated_functions.so $(VALGRIND_SUPPRESSION_BIN_FILES)
+dev: dev_dir $(PY_DEV_FILES) dev/accelerated_functions.so $(VALGRIND_SUPPRESSION_DEV_FILES)
 
 clean:
 	@if [ -d bin ]; then rm -r bin; fi
@@ -23,6 +26,9 @@ bin/%.so: src/%.c
 bin/%.py: src/%.py
 	@cp $< $@
 
+bin/%.supp: src/%.supp
+	@cp $< $@
+
 bin_dir:
 	@mkdir -p bin
 
@@ -35,5 +41,9 @@ dev/%.py: src/%.py
 	rm -f $@
 	ln $< $@
 
+dev/%.supp: src/%.supp
+	rm -f $@
+	ln $< $@
+
 dev_dir:
 	@mkdir -p dev

+ 8 - 5
src/chutney_manager.py

@@ -5,16 +5,19 @@ import logging
 import os
 import sys
 #
-def start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=None):
+def start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=None, verification_rounds=None):
 	args = [os.path.join(chutney_path, 'tools/test-network.sh'), '--chutney-path', chutney_path,
 	        '--tor-path', tor_path, '--stop-time', '-1', '--network', network_file]
 	if controlling_pid is not None:
 		args.extend(['--controlling-pid', str(controlling_pid)])
 	#
+	if verification_rounds is not None:
+		args.extend(['--rounds', str(verification_rounds)])
+	#
 	try:
-		subprocess.check_output(args, stderr=subprocess.STDOUT)
+		return subprocess.check_output(args, stderr=subprocess.STDOUT).decode(sys.stdout.encoding)
 	except subprocess.CalledProcessError as e:
-		#logging.error('Chutney error:\n' + e.output.decode(sys.stdout.encoding))
+		logging.error('Chutney error:\n' + e.output.decode(sys.stdout.encoding))
 		raise
 	#
 #
@@ -28,12 +31,12 @@ def stop_chutney_network(chutney_path, network_file):
 	#
 #
 class ChutneyNetwork:
-	def __init__(self, chutney_path, tor_path, network_file, controlling_pid=None):
+	def __init__(self, chutney_path, tor_path, network_file, controlling_pid=None, verification_rounds=None):
 		self.chutney_path = chutney_path
 		self.network_file = network_file
 		#
 		try:
-			start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=controlling_pid)
+			self.startup_output = start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=controlling_pid, verification_rounds=verification_rounds)
 		except:
 			self.stop()
 			raise

+ 34 - 18
src/experiment.py

@@ -34,8 +34,8 @@ class DummyEnterExit:
 #
 class Experiment:
 	def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
-	             num_clients, num_guards, num_authorities, num_exits,
-	             buffer_len=None, wait_range=None, measureme=False):
+	             num_clients, num_guards, num_authorities, num_exits, circuit_generator_builder,
+	             buffer_len=None, wait_range=None, measureme=False, test_network=True):
 		self.save_data_path = save_data_path
 		self.measureme_log_path = measureme_log_path
 		self.num_bytes = num_bytes
@@ -44,9 +44,11 @@ class Experiment:
 		self.num_guards = num_guards
 		self.num_authorities = num_authorities
 		self.num_exits = num_exits
+		self.circuit_generator_builder = circuit_generator_builder
 		self.buffer_len = buffer_len
 		self.wait_range = wait_range
 		self.measureme = measureme
+		self.test_network = test_network
 		#
 		self.chutney_path = '/home/sengler/code/measureme/chutney'
 		self.tor_path = '/home/sengler/code/measureme/tor'
@@ -121,17 +123,19 @@ class Experiment:
 			#
 			try:
 				chutney_network = None
-				num_attemtps = 0
+				num_attempts = 0
 				while chutney_network is None:
 					try:
-						num_attemtps += 1
-						chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file)
+						num_attempts += 1
+						verification_rounds = 1 if self.test_network else 0
+						chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file, verification_rounds=verification_rounds)
 					except KeyboardInterrupt:
 						raise
 					except:
 						logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))
 					#
 				#
+				logging.debug('Last 40 lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-40:]))
 				#with chutney_network as net:
 				with chutney_network:
 					nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
@@ -164,7 +168,14 @@ class Experiment:
 	def start_throughput_server(self, next_action=None):
 		stop_event = multiprocessing.Event()
 		server = throughput_server.ThroughputServer(self.server_address, stop_event)
-		p = multiprocessing.Process(target=server.run)
+		def server_run_wrapper():
+			try:
+				server.run()
+			except KeyboardInterrupt:
+				logging.info('Stopping server (KeyboardInterrupt)')
+			#
+		#
+		p = multiprocessing.Process(target=server_run_wrapper)
 		p.start()
 		#
 		try:
@@ -176,14 +187,16 @@ class Experiment:
 		#
 		p.join()
 		#
-		with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
-			pickle.dump([x['results'] for x in server.results], f, protocol=4)
+		if self.save_data_path is not None:
+			with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
+				pickle.dump([x['results'] for x in server.results], f, protocol=4)
+			#
 		#
 	#
 	def start_system_logging(self, next_action=None):
 		stop_cpu_logging_event = multiprocessing.Event()
 		p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
-		                            args=(os.path.join(save_data_path, 'cpu_stats.pickle.gz'), 0.1, stop_cpu_logging_event))
+		                            args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'), 0.1, stop_cpu_logging_event))
 		p.start()
 		#
 		try:
@@ -202,14 +215,7 @@ class Experiment:
 		except Exception as e:
 			raise Exception('Unable to retrieve the consensus') from e
 		#
-		fingerprints = experiment_client.get_fingerprints(consensus)
-		exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, self.server_address)
-		non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
-		#
-		assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
-		assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
-		#
-		circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+		circuit_generator = self.circuit_generator_builder(consensus, self.server_address)
 		#
 		proxy_addresses = []
 		for control_port in self.proxy_control_ports:
@@ -277,6 +283,16 @@ def wait_for_keyboard_interrupt():
 		print('')
 	#
 #
+def build_circuit_generator(consensus, server_address):
+	fingerprints = experiment_client.get_fingerprints(consensus)
+	exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
+	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
+	#
+	assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
+	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+	#
+	return lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+#
 '''
 if __name__ == '__main__':
 	#
@@ -376,7 +392,7 @@ if __name__ == '__main__':
 			os.mkdir(measureme_log_path)
 		#
 		experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
-		                        num_clients, num_guards, num_authorities, num_exits,
+		                        num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
 								args.buffer_len, args.wait_range, args.measureme)
 		#
 		if args.debugging == 'no-chutney':

+ 21 - 0
src/libevent.supp

@@ -0,0 +1,21 @@
+{
+   Libevent_add_event_debug_mode_too_late
+   Helgrind:Race
+   fun:event_debug_note_add_
+   fun:event_add_nolock_
+   ...
+}
+{
+   Libevent_del_event_debug_mode_too_late
+   Helgrind:Race
+   fun:event_debug_note_del_
+   fun:event_del_nolock_
+   ...
+}
+{
+   Libevent_setup_event_debug_mode_too_late
+   Helgrind:Race
+   fun:event_debug_note_setup_
+   fun:event_assign
+   ...
+}

+ 108 - 0
src/relay_working_experiment.py

@@ -0,0 +1,108 @@
+#!/usr/bin/python3
+#
+import argparse
+import shutil
+import logging
+import random
+import os
+import multiprocessing
+import threading
+import time
+import json
+import gzip
+import pickle
+import tempfile
+#
+import stem.control
+import stem.descriptor.remote
+import stem.process
+#
+import numa
+import log_system_usage
+import chutney_manager
+import throughput_server
+import experiment_client
+import experiment
+import useful
+#
+class CustomExperiment(experiment.Experiment):
+	def __init__(self, *args, **kwargs):
+		super().__init__(*args, **kwargs)
+		#
+		self.chutney_path = '/home/sengler/code/measureme/chutney'
+		self.tor_path = '/home/sengler/code/parallel/tor-single'
+	#
+	def configure_chutney(self):
+		#self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
+		#        [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
+		#        [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
+		#        [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
+		#
+		new_tor_path = '/home/sengler/code/parallel/tor-parallel/src/app/tor'
+		valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
+		#
+		self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
+		        [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
+		        [chutney_manager.Node(tag='target', tor=new_tor_path, valgrind_settings=valgrind_settings, relay=1, torrc='relay-non-exit.tmpl')] + \
+		        [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
+		        [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
+		#
+		self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
+		# TODO: ^^ improve this
+	#
+#
+def build_circuit_generator(consensus, server_address):
+	fingerprints = [desc.nickname for desc in consensus]
+	exit_fingerprints = [desc.nickname for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
+	#
+	target_fingerprint = [desc.nickname for desc in consensus if desc.nickname.endswith('target')][0]
+	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set([target_fingerprint]))
+	#
+	assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
+	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+	#
+	return lambda: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
+#
+if __name__ == '__main__':
+	#
+	logging.basicConfig(level=logging.DEBUG)
+	logging.getLogger('stem').setLevel(logging.WARNING)
+	#
+	parser = argparse.ArgumentParser(description='Test the network throughput.')
+	parser.add_argument('num_bytes', type=useful.parse_bytes,
+	                    help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
+	parser.add_argument('--buffer-len', type=useful.parse_bytes,
+	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
+	parser.add_argument('--wait-range', type=int, default=0,
+	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
+	args = parser.parse_args()
+	#
+	num_clients = 2
+	num_guards = 2 # number of relays (including guards)
+	num_authorities = 2 # will also act as a relay or guard
+	num_exits = 3 # will be used only as an exit
+	#
+	experiment_time = time.time()
+	#
+	save_data_path = None
+	measureme_log_path = None
+	measureme = False
+	#
+	start_time = time.time()
+	#
+	num_streams_per_client = 1
+	logging.info('Starting with {} streams per client'.format(num_streams_per_client))
+	#
+	experiment = CustomExperiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
+	                              num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
+	                              args.buffer_len, args.wait_range, measureme, test_network=False)
+	#
+	def sleep_then_run(duration, func):
+		logging.info('Sleeping for {} seconds before running {}'.format(duration, func))
+		time.sleep(duration)
+		return func()
+	#
+	experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))
+	#
+	logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
+#