Browse Source

temp commit, changes before supporting multiple servers

Steven Engler 4 years ago
parent
commit
5f041240b5
6 changed files with 196 additions and 42 deletions
  1. 2 1
      Makefile
  2. 5 1
      src/accelerated_functions.c
  3. 21 4
      src/experiment.py
  4. 66 16
      src/experiment_client.py
  5. 98 20
      src/relay_working_experiment.py
  6. 4 0
      src/throughput_server.py

+ 2 - 1
Makefile

@@ -1,6 +1,7 @@
 CC=gcc
 CFLAGS=-O3 -std=c99 -D_DEFAULT_SOURCE
-PYTHON_INC=/usr/include/python3.6
+#PYTHON_INC=/usr/include/python3.6
+PYTHON_INC=/usr/include/python3.5
 
 PY_BIN_FILES:=$(patsubst src/%.py,bin/%.py,$(wildcard src/*.py))
 PY_DEV_FILES:=$(patsubst src/%.py,dev/%.py,$(wildcard src/*.py))

+ 5 - 1
src/accelerated_functions.c

@@ -126,7 +126,7 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_p
 			free(buffer);
 			return -1;
 		}else if(rc == 0){
-			printf("Here2\n");
+			printf("Call to poll() timed out (bytes_read=%ld)\n", bytes_read);
 			free(buffer);
 			return -1;
 		}
@@ -146,6 +146,10 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_p
 			printf("Here4\n");
 			free(buffer);
 			return -1;
+		}else if(n == 0){
+			// the socket was closed gracefully, but before we finished reading all the data
+			free(buffer);
+			return -1;
 		}
 		//
 		if(n > 0 && bytes_read == 0){

+ 21 - 4
src/experiment.py

@@ -135,7 +135,8 @@ class Experiment:
 						logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))
 					#
 				#
-				logging.debug('Last 40 lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-40:]))
+				num_lines_to_print = 200
+				logging.debug('Last '+str(num_lines_to_print)+' lines of Chutney output:\n'+'\n'.join(chutney_network.startup_output.split('\n')[-num_lines_to_print:]))
 				#with chutney_network as net:
 				with chutney_network:
 					nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
@@ -187,10 +188,24 @@ class Experiment:
 		#
 		p.join()
 		#
+		results = [x['results'] for x in server.results]
+		#
 		if self.save_data_path is not None:
 			with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
-				pickle.dump([x['results'] for x in server.results], f, protocol=4)
+				pickle.dump(results, f, protocol=4)
+			#
+		#
+		if len(results) > 0:
+			avg_data_size = sum([x['data_size'] for x in results])/len(results)
+			avg_transfer_rate = sum([x['transfer_rate'] for x in results])/len(results)
+			time_of_first_byte = min([x['time_of_first_byte'] for x in results])
+			time_of_last_byte = max([x['time_of_last_byte'] for x in results])
+			total_transfer_rate = sum([x['data_size'] for x in results])/(time_of_last_byte-time_of_first_byte)
 			#
+			logging.info('Group size: %d/%d', len(results), len(self.proxy_control_ports)*self.num_streams_per_client)
+			logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
+			logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
+			logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
 		#
 	#
 	def start_system_logging(self, next_action=None):
@@ -227,6 +242,7 @@ class Experiment:
 		controllers = []
 		protocol_manager = experiment_client.ExperimentProtocolManager()
 		#
+		circuit_counter = 0
 		try:
 			for proxy_address in proxy_addresses:
 				controller = experiment_client.ExperimentController(proxy_address['control'])
@@ -236,7 +252,8 @@ class Experiment:
 				#
 				for _ in range(self.num_streams_per_client):
 					# make a circuit for each stream
-					controller.build_circuit(circuit_generator)
+					controller.build_circuit(circuit_generator, circuit_counter)
+					circuit_counter += 1
 					time.sleep(0.5)
 				#
 				controllers.append(controller)
@@ -291,7 +308,7 @@ def build_circuit_generator(consensus, server_address):
 	assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
 	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
 	#
-	return lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+	return lambda gen_id=None: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
 #
 '''
 if __name__ == '__main__':

+ 66 - 16
src/experiment_client.py

@@ -11,6 +11,7 @@ import random
 import time
 import json
 import os
+import datetime
 #
 import basic_protocols
 import throughput_protocols
@@ -95,6 +96,10 @@ class ExperimentController:
 		self.connection.close()
 	#
 	def assign_stream(self, from_address):
+		"""
+		Should run this function before starting the protocol, and therefore before telling
+		the SOCKS proxy where we're connecting to and before the stream is created.
+		"""
 		circuit_id = self.unassigned_circuit_ids.pop(0)
 		self.assigned_streams[from_address] = circuit_id
 		return circuit_id
@@ -112,6 +117,13 @@ class ExperimentController:
 				try:
 					self.connection.attach_stream(stream.id, circuit_id)
 					#logging.debug('Attaching to circuit {}'.format(circuit_id))
+				except stem.InvalidRequest:
+					if stream.purpose != 'USER':
+						# could not attach a non-user stream, ignoring
+						pass
+					else:
+						raise
+					#
 				except stem.UnsatisfiableRequest:
 					if stream.purpose != 'USER':
 						# could not attach a non-user stream, so probably raised:
@@ -124,21 +136,32 @@ class ExperimentController:
 				#
 			#
 		except:
-			logging.exception('Error while attaching the stream (control_port={}, circuit_id={}).'.format(self.control_port, circuit_id))
+			logging.exception('Error while attaching the stream.')
 			raise
 		#
 	#
-	def build_circuit(self, circuit_generator):
+	def build_circuit(self, circuit_generator, gen_id):
 		circuit_id = None
 		#
 		while circuit_id is None:
 			try:
-				circuit = circuit_generator()
+				circuit = circuit_generator(gen_id)
 				circuit_id = self.connection.new_circuit(circuit, await_build=True)
-				logging.debug('New circuit (id={}): {}'.format(circuit_id, circuit))
-			except stem.CircuitExtensionFailed:
+				logging.debug('New circuit (id={}, controller={}): {}'.format(circuit_id, self.control_address, circuit))
+			except stem.CircuitExtensionFailed as e:
+				wait_seconds = 1
+				logging.debug('Failed circuit: {}'.format(circuit))
+				logging.warning('Circuit creation failed (CircuitExtensionFailed: {}). Retrying in {} second{}...'.format(str(e),
+				                                                                                                          wait_seconds,
+				                                                                                                          's' if wait_seconds != 1 else ''))
+				time.sleep(wait_seconds)
+			except stem.InvalidRequest as e:
+				wait_seconds = 15
 				logging.debug('Failed circuit: {}'.format(circuit))
-				logging.warning('Circuit creation failed. Retrying...')
+				logging.warning('Circuit creation failed (InvalidRequest: {}). Retrying in {} second{}...'.format(str(e),
+				                                                                                                  wait_seconds,
+				                                                                                                  's' if wait_seconds != 1 else ''))
+				time.sleep(wait_seconds)
 			#
 		#
 		self.unassigned_circuit_ids.append(circuit_id)
@@ -173,15 +196,21 @@ class ExperimentProtocolManager():
 	def _run_client(self, protocol, protocol_id):
 		had_error = False
 		try:
-			logging.debug('Starting protocol (id: {})'.format(protocol_id))
+			logging.debug('Starting client protocol (id: {})'.format(protocol_id))
 			protocol.run()
-			logging.debug('Done protocol (id: {})'.format(protocol_id))
+			logging.debug('Done client protocol (id: {})'.format(protocol_id))
+		except KeyboardInterrupt:
+			had_error = True
+			logging.info('Client protocol id: {} stopped (KeyboardInterrupt)'.format(protocol_id))
 		except:
 			had_error = True
-			logging.warning('Protocol error')
-			logging.exception('Protocol id: {} had an error'.format(protocol_id))
+			logging.warning('Client protocol error')
+			logging.exception('Client protocol id: {} had an error ({})'.format(protocol_id, datetime.datetime.now().time()))
 		finally:
 			self.global_finished_process_queue.put((protocol_id, had_error))
+			if had_error:
+				logging.warning('Client protocol with error successfully added self to global queue')
+			#
 		#
 	#
 	def start_experiment_protocol(self, protocol, protocol_id=None):
@@ -200,19 +229,40 @@ class ExperimentProtocolManager():
 		#
 		#protocol.socket.close()
 	#
-	def wait(self, finished_protocol_cb=None):
+	def wait(self, finished_protocol_cb=None, kill_timeout=None):
+		timed_out = False
+		#
 		while len(self.running_processes) > 0:
 			logging.debug('Waiting for processes ({} left)'.format(len(self.running_processes)))
 			#
-			(protocol_id, had_error) = self.local_finished_process_queue.get()
-			p = self.running_processes[protocol_id]
+			if not timed_out:
+				try:
+					(protocol_id, had_error) = self.local_finished_process_queue.get(timeout=kill_timeout)
+					p = self.running_processes[protocol_id]
+				except queue.Empty:
+					if kill_timeout is None:
+						raise
+					#
+					logging.warning('Timed out waiting for processes to finish, will terminate remaining processes')
+					timed_out = True
+				#
+			#
+			if timed_out:
+				(protocol_id, p) = next(iter(self.running_processes.items()))
+				# just get any process and kill it
+				had_error = True
+				p.terminate()
+				logging.debug('Terminated protocol {}'.format(protocol_id))
+			#
 			p.join()
 			self.running_processes.pop(protocol_id)
-			finished_protocol_cb(protocol_id, had_error)
+			if finished_protocol_cb is not None:
+				finished_protocol_cb(protocol_id, had_error)
+			#
 		#
 	#
 	def stop(self):
-		self.wait()
+		self.wait(kill_timeout=1.5)
 		self.queue_getter.stop()
 		self.queue_getter.join()
 		self.stopped = True
@@ -283,7 +333,7 @@ if __name__ == '__main__':
 	assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
 	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
 	#
-	circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+	circuit_generator = lambda gen_id=None: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
 	#
 	proxy_addresses = []
 	for control_port in args.proxy_control_ports:

+ 98 - 20
src/relay_working_experiment.py

@@ -26,11 +26,13 @@ import experiment
 import useful
 #
 class CustomExperiment(experiment.Experiment):
-	def __init__(self, *args, **kwargs):
+	def __init__(self, use_helgrind, target_tor, *args, **kwargs):
+		self.use_helgrind = use_helgrind
+		self.target_tor = target_tor
 		super().__init__(*args, **kwargs)
 		#
-		self.chutney_path = '/home/sengler/code/measureme/chutney'
-		self.tor_path = '/home/sengler/code/parallel/tor-single'
+		self.chutney_path = '/home/sengler/code/working/chutney'
+		self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
 	#
 	def configure_chutney(self):
 		#self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
@@ -38,14 +40,58 @@ class CustomExperiment(experiment.Experiment):
 		#        [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
 		#        [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
 		#
-		new_tor_path = '/home/sengler/code/parallel/tor-parallel/src/app/tor'
-		valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
+		#target_tor_path = '/home/sengler/code/working/tor/src/app/tor'
+		#target_tor_path = '/home/sengler/code/releases/tor-0.4.2.5/src/app/tor'
+
+		local_ip = '172.19.156.16'
+		target_ip = '172.19.156.136'
+		target_hostname = 'cluck2'
+
+		target_optional_args = {}
+		if self.target_tor is not None:
+			target_optional_args['tor'] = self.target_tor
+		if self.use_helgrind:
+			target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
+
+		#target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libprofiler.so.0'}
+		target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
+		target_optional_args['ip'] = target_ip
+		target_optional_args['remote_hostname'] = target_hostname
+		target_cpu_prof = False #True
+		target_daemon = False
+		logs = ['notice']
+		#if self.use_helgrind:
+		#	valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
+		#else:
+		#	valgrind_settings = None
 		#
-		self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
-		        [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
-		        [chutney_manager.Node(tag='target', tor=new_tor_path, valgrind_settings=valgrind_settings, relay=1, torrc='relay-non-exit.tmpl')] + \
-		        [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
-		        [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl') for _ in range(self.num_clients)]
+
+		self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=logs) for _ in range(self.num_authorities)] + \
+		    [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=logs) for _ in range(self.num_guards)] + \
+		    [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl',
+		      daemon=target_daemon, log_files=logs, sandbox=0, google_cpu_profiler=target_cpu_prof, **target_optional_args)] + \
+		    [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=logs) for _ in range(self.num_exits)] + \
+		    [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=logs) for _ in range(self.num_clients)]
+		#
+		for node in self.nodes:
+			if not 'num_cpus' in node.options:
+				node.options['num_cpus'] = 2
+			#
+			if not 'ip' in node.options:
+				node.options['ip'] = local_ip
+			#
+		#
+		numa_remaining = numa.get_numa_overview()
+		for (node, index) in zip(self.nodes, range(len(self.nodes))):
+			num_cpus = node.options['num_cpus']
+			if num_cpus%2 != 0:
+				num_cpus += 1
+			#
+			if node.options['tag'] == 'target':
+				num_cpus = max(num_cpus, 6)
+			#
+			(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
+			node.options['numa_settings'] = (numa_node, processors)
 		#
 		self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
 		# TODO: ^^ improve this
@@ -61,7 +107,26 @@ def build_circuit_generator(consensus, server_address):
 	assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
 	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
 	#
-	return lambda: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
+	#return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
+	return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprint, exit_fingerprints[gen_id%len(exit_fingerprints)]]
+	'''
+	fingerprints = [desc.fingerprint for desc in consensus]
+	exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
+	#
+	target_fingerprint = [desc.fingerprint for desc in consensus if desc.nickname.endswith('target')][0]
+	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set([target_fingerprint]))
+	#
+	assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
+	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+	#
+	#return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
+	return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprint, exit_fingerprints[gen_id%len(exit_fingerprints)]]
+	'''
+#
+def existing_file(path):
+	if not os.path.isfile(path):
+		raise argparse.ArgumentTypeError('The file path is not valid')
+	return path
 #
 if __name__ == '__main__':
 	#
@@ -75,12 +140,20 @@ if __name__ == '__main__':
 	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
 	parser.add_argument('--wait-range', type=int, default=0,
 	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
+	parser.add_argument('--target-tor', type=existing_file, default=None,
+	                    help='use a different tor binary for the target', metavar='tor-path')
+	parser.add_argument('--helgrind', action='store_true',
+	                    help='log helgrind data')
 	args = parser.parse_args()
 	#
-	num_clients = 2
-	num_guards = 2 # number of relays (including guards)
+	#num_clients = 4
+	#num_guards = 6 # number of relays (including guards)
+	#num_authorities = 2 # will also act as a relay or guard
+	#num_exits = 8 # will be used only as an exit
+	num_clients = 12
+	num_guards = 14 # number of relays (including guards)
 	num_authorities = 2 # will also act as a relay or guard
-	num_exits = 3 # will be used only as an exit
+	num_exits = 16 # will be used only as an exit
 	#
 	experiment_time = time.time()
 	#
@@ -90,19 +163,24 @@ if __name__ == '__main__':
 	#
 	start_time = time.time()
 	#
-	num_streams_per_client = 1
+	#num_streams_per_client = 1
+	num_streams_per_client = 6
 	logging.info('Starting with {} streams per client'.format(num_streams_per_client))
 	#
-	experiment = CustomExperiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
-	                              num_clients, num_guards, num_authorities, num_exits, build_circuit_generator,
-	                              args.buffer_len, args.wait_range, measureme, test_network=False)
+	experiment = CustomExperiment(args.helgrind, args.target_tor, save_data_path, measureme_log_path, args.num_bytes,
+	                              num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
+	                              build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)
 	#
 	def sleep_then_run(duration, func):
-		logging.info('Sleeping for {} seconds before running {}'.format(duration, func))
+		logging.info('Sleeping for {} seconds before running \'{}\''.format(duration, func.__name__))
 		time.sleep(duration)
+		logging.info('Done sleeping')
 		return func()
 	#
-	experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))
+	try:
+		experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))
+	except KeyboardInterrupt:
+		logging.info('Stopped (KeyboardInterrupt)')
 	#
 	logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
 #

+ 4 - 0
src/throughput_server.py

@@ -48,6 +48,10 @@ class ThroughputServer:
 		                                               use_acceleration=True)
 		try:
 			protocol.run()
+		except KeyboardInterrupt:
+			logging.info('Server protocol id: {} stopped (KeyboardInterrupt)'.format(conn_id))
+		except:
+			logging.exception('Server protocol id: {} had an error'.format(conn_id))
 		finally:
 			socket.close()
 		#