
Experiment code working.

Steven Engler, 5 years ago (commit 5de601ec50)
5 changed files with 518 additions and 292 deletions:
  1. src/chutney_manager.py (+8, -0)
  2. src/experiment.py (+165, -282)
  3. src/experiment_client.py (+338, -0)
  4. src/log_system_usage.py (+6, -9)
  5. src/throughput_server.py (+1, -1)

src/chutney_manager.py (+8, -0)

@@ -56,6 +56,14 @@ class Node:
 		#
 		return '{:03}{}'.format(index, self.options['tag'])
 	#
+	def guess_control_port(self, index):
+		"""
+	Guess the control port from the numbering scheme Chutney uses (8000 plus
+	the node index). There is no reliable way to query the actual value.
+		"""
+		#
+		return 8000+index
+	#
 	def _value_formatter(self, value):
 		if type(value) == str:
 			return "'{}'".format(value)

src/experiment.py (+165, -282)

@@ -1,262 +1,137 @@
 #!/usr/bin/python3
 #
-import stem.control
-import stem.descriptor.remote
-import stem.process
-import socket
+import argparse
+import shutil
 import logging
-import multiprocessing
-import queue
 import random
+import os
+import multiprocessing
+import threading
 import time
 import json
-import os
-#
-import basic_protocols
-import throughput_protocols
-import useful
+import gzip
+import pickle
+import tempfile
 #
-def get_socks_port(control_port):
-	with stem.control.Controller.from_port(port=control_port) as controller:
-		controller.authenticate()
-		#
-		socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
-		assert(len(socks_addresses) == 1)
-		assert(socks_addresses[0][0] == '127.0.0.1')
-		#
-		return socks_addresses[0][1]
-	#
-#
-def wait_then_sleep(event, duration):
-	event.wait()
-	time.sleep(duration)
-#
-def send_measureme(stem_controller, circuit_id, measureme_id, hop):
-	response = stem_controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop))
-	stem.response.convert('SINGLELINE', response)
-	#
-	if not response.is_ok():
-		if response.code in ('512', '552'):
-			if response.message.startswith('Unknown circuit '):
-				raise stem.InvalidArguments(response.code, response.message, [circuit_id])
-			#
-			raise stem.InvalidRequest(response.code, response.message)
-		else:
-			raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code)
-		#
-	#
-#
-def send_measureme_cells(control_address, circuit_id, measureme_id, hops):
-	logging.debug('Sending measuremes to control address {}, then sleeping'.format(control_address))
-	with stem.control.Controller.from_port(address=control_address[0], port=control_address[1]) as controller:
-		controller.authenticate()
-		for hop in hops:
-			send_measureme(controller, circuit_id, measureme_id, hop)
-		#
-	#
+import stem.control
+import stem.descriptor.remote
+import stem.process
 #
-def send_measureme_cells_and_wait(control_port, circuit_id, measureme_id, hops, wait_event, wait_offset):
-	send_measureme_cells(control_port, circuit_id, measureme_id, hops)
-	wait_then_sleep(wait_event, wait_offset)
+import numa
+import log_system_usage
+import chutney_manager
+import throughput_server
+import experiment_client
+import useful
 #
-def get_fingerprints(consensus):
-	"""
-	Get the fingerprints of all relays.
-	"""
+class DummyEnterExit:
+	def __enter__(self):
+		return self
 	#
-	return [desc.fingerprint for desc in consensus]
-#
-def get_exit_fingerprints(consensus, endpoint):
-	"""
-	Get the fingerprints of relays that can exit to the endpoint.
-	"""
+	def __exit__(self, exc_type, exc_val, exc_tb):
+		pass
 	#
-	return [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
 #
-class ExperimentController:
-	def __init__(self, control_address):
-		self.control_address = control_address
-		self.connection = None
-		self.circuits = {}
-		self.unassigned_circuit_ids = []
-		self.assigned_streams = {}
+def run_experiment():
+	server_address = ('127.0.0.1', 12353)
 	#
-	def connect(self):
-		self.connection = stem.control.Controller.from_port(address=self.control_address[0], port=self.control_address[1])
-		self.connection.authenticate()
-		#
-		self.connection.add_event_listener(self._attach_stream, stem.control.EventType.STREAM)
-		self.connection.set_conf('__LeaveStreamsUnattached', '1')
+	stop_event = multiprocessing.Event()
+	server = throughput_server.ThroughputServer(server_address, stop_event)
+	p = multiprocessing.Process(target=server.run)
+	p.start()
 	#
-	def disconnect(self):
-		#if len(self.unused_circuit_ids) > 0:
-		#	logging.warning('Closed stem controller before all circuits were used')
+	try:
+		stop_cpu_logging_event = threading.Event()
+		t = threading.Thread(target=log_system_usage.log_cpu_stats,
+							 args=(os.path.join(save_data_path, 'cpu_stats.pickle.gz'), 0.5, stop_cpu_logging_event))
+		t.start()
 		#
-		self.connection.close()
-	#
-	def assign_stream(self, from_address):
-		circuit_id = self.unassigned_circuit_ids.pop(0)
-		self.assigned_streams[from_address] = circuit_id
-		return circuit_id
-	#
-	def _attach_stream(self, stream):
 		try:
-			if stream.status == 'NEW':
-				# by default, let tor handle new streams
-				circuit_id = 0
+			logging.debug('Getting consensus')
+			try:
+				consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
+			except Exception as e:
+				raise Exception('Unable to retrieve the consensus') from e
+			#
+			fingerprints = experiment_client.get_fingerprints(consensus)
+			exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
+			non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
+			#
+			assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
+			assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+			#
+			circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+			#
+			proxy_addresses = []
+			for control_port in proxy_control_ports:
+				proxy = {}
+				proxy['control'] = ('127.0.0.1', control_port)
+				proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
+				proxy_addresses.append(proxy)
+			#
+			controllers = []
+			protocol_manager = experiment_client.ExperimentProtocolManager()
+			#
+			try:
+				for proxy_address in proxy_addresses:
+					controller = experiment_client.ExperimentController(proxy_address['control'])
+					controller.connect()
+					# the controller has to attach new streams to circuits, so the
+					# connection has to stay open until we're done creating streams
+					#
+					for _ in range(args.num_streams_per_client):
+						# make a circuit for each stream
+						controller.build_circuit(circuit_generator)
+						time.sleep(0.5)
+					#
+					controllers.append(controller)
 				#
-				if stream.purpose == 'USER':
-					# this is probably one of our streams (although not guaranteed)
-					circuit_id = self.assigned_streams[(stream.source_address, stream.source_port)]
+				start_event = multiprocessing.Event()
 				#
-				try:
-					self.connection.attach_stream(stream.id, circuit_id)
-					#logging.debug('Attaching to circuit {}'.format(circuit_id))
-				except stem.UnsatisfiableRequest:
-					if stream.purpose != 'USER':
-						# could not attach a non-user stream, so probably raised:
-						# stem.UnsatisfiableRequest: Connection is not managed by controller.
-						# therefore we should ignore this exception
-						pass
-					else:
-						raise
+				for stream_index in range(args.num_streams_per_client):
+					for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
+						if args.measureme:
+							measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
+						else:
+							measureme_id = None
+						#
+						wait_duration = random.randint(0, args.wait_range)
+						protocol = experiment_client.build_client_protocol(server_address, proxy_address['socks'],
+														 proxy_address['control'], controller, start_event,
+														 wait_duration=wait_duration, measureme_id=measureme_id,
+														 num_bytes=args.num_bytes, buffer_len=args.buffer_len)
+						protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
 					#
 				#
+				time.sleep(2)
+				start_event.set()
+				#
+				protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
+			finally:
+				for controller in controllers:
+					controller.disconnect()
+				#
+				protocol_manager.stop()
 			#
-		except:
-			logging.exception('Error while attaching the stream (control_port={}, circuit_id={}).'.format(self.control_port, circuit_id))
-			raise
-		#
-	#
-	def build_circuit(self, circuit_generator):
-		circuit_id = None
-		#
-		while circuit_id is None:
-			try:
-				circuit = circuit_generator()
-				circuit_id = self.connection.new_circuit(circuit, await_build=True)
-				logging.debug('New circuit (id={}): {}'.format(circuit_id, circuit))
-			except stem.CircuitExtensionFailed:
-				logging.debug('Failed circuit: {}'.format(circuit))
-				logging.warning('Circuit creation failed. Retrying...')
-			#
-		#
-		self.unassigned_circuit_ids.append(circuit_id)
-		self.circuits[circuit_id] = circuit
-	#
-#
-class ExperimentProtocol(basic_protocols.ChainedProtocol):
-	def __init__(self, socket, endpoint, num_bytes, custom_data=None, send_buffer_len=None, push_start_cb=None):
-		proxy_username = bytes([z for z in os.urandom(12) if z != 0])
-		proxy_protocol = basic_protocols.Socks4Protocol(socket, endpoint, username=proxy_username)
-		#
-		throughput_protocol = throughput_protocols.ClientProtocol(socket, num_bytes,
-		                                                          custom_data=custom_data,
-		                                                          send_buffer_len=send_buffer_len,
-		                                                          use_acceleration=True,
-		                                                          push_start_cb=push_start_cb)
-		#
-		super().__init__([proxy_protocol, throughput_protocol])
-	#
-#
-class ExperimentProtocolManager():
-	def __init__(self):
-		self.stopped = False
-		self.process_counter = 0
-		self.used_ids = []
-		self.running_processes = {}
-		self.global_finished_process_queue = multiprocessing.Queue()
-		self.local_finished_process_queue = queue.Queue()
-		self.queue_getter = useful.QueueGetter(self.global_finished_process_queue,
-		                                       self.local_finished_process_queue.put)
-	#
-	def _run_client(self, protocol, protocol_id):
-		had_error = False
-		try:
-			logging.debug('Starting protocol (id: {})'.format(protocol_id))
-			protocol.run()
-			logging.debug('Done protocol (id: {})'.format(protocol_id))
-		except:
-			had_error = True
-			logging.warning('Protocol error')
-			logging.exception('Protocol id: {} had an error'.format(protocol_id))
 		finally:
-			self.global_finished_process_queue.put((protocol_id, had_error))
-		#
-	#
-	def start_experiment_protocol(self, protocol, protocol_id=None):
-		if protocol_id is None:
-			protocol_id = self.process_counter
-		#
-		assert not self.stopped
-		assert protocol_id not in self.used_ids, 'Protocol ID already used'
-		#
-		p = multiprocessing.Process(target=self._run_client, args=(protocol, protocol_id))
-		self.running_processes[protocol_id] = p
-		self.used_ids.append(protocol_id)
+			stop_cpu_logging_event.set()
 		#
-		p.start()
-		self.process_counter += 1
-		#
-		#protocol.socket.close()
-	#
-	def wait(self, finished_protocol_cb=None):
-		while len(self.running_processes) > 0:
-			logging.debug('Waiting for processes ({} left)'.format(len(self.running_processes)))
-			#
-			(protocol_id, had_error) = self.local_finished_process_queue.get()
-			p = self.running_processes[protocol_id]
-			p.join()
-			self.running_processes.pop(protocol_id)
-			finished_protocol_cb(protocol_id, had_error)
-		#
-	#
-	def stop(self):
-		self.wait()
-		self.queue_getter.stop()
-		self.queue_getter.join()
-		self.stopped = True
-	#
-#
-def build_client_protocol(endpoint, socks_address, control_address, controller, wait_duration=0, measureme_id=None, num_bytes=None, buffer_len=None):
-	client_socket = socket.socket()
-	#
-	logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), socks_address)
-	client_socket.connect(socks_address)
-	logging.debug('Socket %d connected', client_socket.fileno())
-	#
-	custom_data = {}
+		t.join()
+	finally:
+		stop_event.set()
 	#
-	circuit_id = controller.assign_stream(client_socket.getsockname())
-	custom_data['circuit'] = (circuit_id, controller.circuits[circuit_id])
+	p.join()
 	#
-	if measureme_id is not None:
-		custom_data['measureme_id'] = measureme_id
-		#
-		hops = list(range(len(controller.circuits[circuit_id])+1))[::-1]
-		# send the measureme cells to the last relay first
-		start_cb = lambda control_address=control_address, circuit_id=circuit_id, measureme_id=measureme_id, \
-						  hops=hops, event=start_event, wait_duration=wait_duration: \
-						  send_measureme_cells_and_wait(control_address, circuit_id, measureme_id, hops, event, wait_duration)
-	else:
-		start_cb = lambda event=start_event, duration=wait_duration: wait_then_sleep(event, duration)
+	with gzip.GzipFile(os.path.join(save_data_path, 'server_results.pickle.gz'), 'wb') as f:
+		pickle.dump([x['results'] for x in server.results], f, protocol=4)
 	#
-	custom_data = json.dumps(custom_data).encode('utf-8')
-	protocol = ExperimentProtocol(client_socket, endpoint, args.num_bytes,
-								  custom_data=custom_data,
-								  send_buffer_len=args.buffer_len,
-								  push_start_cb=start_cb)
-	return protocol
 #
 if __name__ == '__main__':
-	import argparse
 	#
 	logging.basicConfig(level=logging.DEBUG)
 	logging.getLogger('stem').setLevel(logging.WARNING)
 	#
-	parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
-	parser.add_argument('ip', type=str, help='destination ip address')
-	parser.add_argument('port', type=int, help='destination port')
+	parser = argparse.ArgumentParser(description='Test the network throughput.')
 	parser.add_argument('num_bytes', type=useful.parse_bytes,
 	                    help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
 	parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
@@ -264,75 +139,83 @@ if __name__ == '__main__':
 	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
 	parser.add_argument('--wait-range', type=int, default=0,
 	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
-	parser.add_argument('--proxy-control-ports', type=useful.parse_range_list, help='range of ports for the control ports', metavar='control-ports')
 	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
+	parser.add_argument('--chutney-only', action='store_true', help='only start chutney')
+	parser.add_argument('--no-chutney', action='store_true', help='don\'t start chutney')
 	args = parser.parse_args()
 	#
-	endpoint = (args.ip, args.port)
+	chutney_path = '/home/sengler/code/measureme/chutney'
+	tor_path = '/home/sengler/code/measureme/tor'
 	#
-	logging.debug('Getting consensus')
-	try:
-		consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
-	except Exception as e:
-		raise Exception('Unable to retrieve the consensus') from e
-	#
-	fingerprints = get_fingerprints(consensus)
-	exit_fingerprints = get_exit_fingerprints(consensus, endpoint)
-	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
+	if not args.chutney_only:
+		save_data_path = os.path.join('/home/sengler/data/experiments', str(int(time.time())))
+		os.mkdir(save_data_path)
+		if not args.no_chutney:
+			measureme_log_dir = os.path.join('/ramdisk/sengler/chutney', str(int(time.time())))
+			os.mkdir(measureme_log_dir)
+		#
 	#
-	assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
-	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+	nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(2)] + \
+	        [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(2)] + \
+	        [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(1)] + \
+	        [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(2)]
 	#
-	circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+	numa_remaining = numa.get_numa_overview()
+	numa_sets = []
+	for node in nodes:
+		if not args.chutney_only and not args.no_chutney:
+			node.options['measureme_log_dir'] = measureme_log_dir
+		#
+		num_cpus = node.options['num_cpus']
+		if num_cpus%2 != 0:
+			num_cpus += 1
+		#
+		(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
+		node.options['numa_settings'] = (numa_node, processors)
+		numa_sets.append((numa_node, processors))
 	#
-	proxy_addresses = []
-	for control_port in args.proxy_control_ports:
-		proxy = {}
-		proxy['control'] = ('127.0.0.1', control_port)
-		proxy['socks'] = ('127.0.0.1', get_socks_port(control_port))
-		proxy_addresses.append(proxy)
+	unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
 	#
-	controllers = []
-	protocol_manager = ExperimentProtocolManager()
+	nicknames = [nodes[x].guess_nickname(x) for x in range(len(nodes))]
+	proxy_control_ports = [nodes[x].guess_control_port(x) for x in range(len(nodes)) if ('client', 1) in nodes[x].options.items()]
 	#
+	(fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
 	try:
-		for proxy_address in proxy_addresses:
-			controller = ExperimentController(proxy_address['control'])
-			controller.connect()
-			# the controller has to attach new streams to circuits, so the
-			# connection has to stay open until we're done creating streams
-			#
-			for _ in range(args.num_streams_per_client):
-				# make a circuit for each stream
-				controller.build_circuit(circuit_generator)
-				time.sleep(0.5)
-			#
-			controllers.append(controller)
-		#
-		start_event = multiprocessing.Event()
+		with os.fdopen(fd, mode='w') as f:
+			f.write(chutney_manager.create_chutney_config(nodes))
 		#
-		for stream_index in range(args.num_streams_per_client):
-			for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
-				if args.measureme:
-					measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
-				else:
-					measureme_id = None
+		if args.no_chutney:
+			run_experiment()
+		else:
+			try:
+				with chutney_manager.ChutneyNetwork(chutney_path, tor_path, tmp_network_file) as net:
+					if args.chutney_only:
+						try:
+							logging.info('Press Ctrl-C to stop.')
+							while True:
+								time.sleep(30)
+							#
+						except KeyboardInterrupt:
+							print('')
+						#
+					else:
+						fingerprints = []
+						for nick in nicknames:
+							fingerprints.append(chutney_manager.read_fingerprint(nick, chutney_path))
+						#
+						run_experiment()
+					#
+				#
+			finally:
+				if not args.chutney_only:
+					for f in os.listdir(measureme_log_dir):
+						shutil.move(os.path.join(measureme_log_dir, f), os.path.join(save_data_path, f))
+					#
+					shutil.rmtree(measureme_log_dir)
 				#
-				wait_duration = random.randint(0, args.wait_range)
-				protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'], controller,
-				                                 wait_duration=wait_duration, measureme_id=measureme_id,
-				                                 num_bytes=args.num_bytes, buffer_len=args.buffer_len)
-				protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
 			#
 		#
-		time.sleep(2)
-		start_event.set()
-		#
-		protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
 	finally:
-		for controller in controllers:
-			controller.disconnect()
-		#
-		protocol_manager.stop()
+		os.remove(tmp_network_file)
 	#
 #
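For reference, both the CPU stats and the per-stream results are now saved as gzipped pickles (protocol 4). A minimal sketch of reading the server results back after a run, assuming you point it at the timestamped output directory created above:

    import gzip
    import pickle

    # server_results.pickle.gz is written during the experiment run above into the
    # timestamped directory under /home/sengler/data/experiments
    with gzip.GzipFile('server_results.pickle.gz', 'rb') as f:
        results = pickle.load(f)
    print('{} stream results recorded'.format(len(results)))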

src/experiment_client.py (+338, -0)

@@ -0,0 +1,338 @@
+#!/usr/bin/python3
+#
+import stem.control
+import stem.descriptor.remote
+import stem.process
+import socket
+import logging
+import multiprocessing
+import queue
+import random
+import time
+import json
+import os
+#
+import basic_protocols
+import throughput_protocols
+import useful
+#
+def get_socks_port(control_port):
+	with stem.control.Controller.from_port(port=control_port) as controller:
+		controller.authenticate()
+		#
+		socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
+		assert(len(socks_addresses) == 1)
+		assert(socks_addresses[0][0] == '127.0.0.1')
+		#
+		return socks_addresses[0][1]
+	#
+#
+def wait_then_sleep(event, duration):
+	event.wait()
+	time.sleep(duration)
+#
+def send_measureme(stem_controller, circuit_id, measureme_id, hop):
+	response = stem_controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop))
+	stem.response.convert('SINGLELINE', response)
+	#
+	if not response.is_ok():
+		if response.code in ('512', '552'):
+			if response.message.startswith('Unknown circuit '):
+				raise stem.InvalidArguments(response.code, response.message, [circuit_id])
+			#
+			raise stem.InvalidRequest(response.code, response.message)
+		else:
+			raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code)
+		#
+	#
+#
+def send_measureme_cells(control_address, circuit_id, measureme_id, hops):
+	logging.debug('Sending measuremes to control address {}, then sleeping'.format(control_address))
+	with stem.control.Controller.from_port(address=control_address[0], port=control_address[1]) as controller:
+		controller.authenticate()
+		for hop in hops:
+			send_measureme(controller, circuit_id, measureme_id, hop)
+		#
+	#
+#
+def send_measureme_cells_and_wait(control_port, circuit_id, measureme_id, hops, wait_event, wait_offset):
+	send_measureme_cells(control_port, circuit_id, measureme_id, hops)
+	wait_then_sleep(wait_event, wait_offset)
+#
+def get_fingerprints(consensus):
+	"""
+	Get the fingerprints of all relays.
+	"""
+	#
+	return [desc.fingerprint for desc in consensus]
+#
+def get_exit_fingerprints(consensus, endpoint):
+	"""
+	Get the fingerprints of relays that can exit to the endpoint.
+	"""
+	#
+	return [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
+#
+class ExperimentController:
+	def __init__(self, control_address):
+		self.control_address = control_address
+		self.connection = None
+		self.circuits = {}
+		self.unassigned_circuit_ids = []
+		self.assigned_streams = {}
+	#
+	def connect(self):
+		self.connection = stem.control.Controller.from_port(address=self.control_address[0], port=self.control_address[1])
+		self.connection.authenticate()
+		#
+		self.connection.add_event_listener(self._attach_stream, stem.control.EventType.STREAM)
+		self.connection.set_conf('__LeaveStreamsUnattached', '1')
+	#
+	def disconnect(self):
+		#if len(self.unused_circuit_ids) > 0:
+		#	logging.warning('Closed stem controller before all circuits were used')
+		#
+		self.connection.close()
+	#
+	def assign_stream(self, from_address):
+		circuit_id = self.unassigned_circuit_ids.pop(0)
+		self.assigned_streams[from_address] = circuit_id
+		return circuit_id
+	#
+	def _attach_stream(self, stream):
+		try:
+			if stream.status == 'NEW':
+				# by default, let tor handle new streams
+				circuit_id = 0
+				#
+				if stream.purpose == 'USER':
+					# this is probably one of our streams (although not guaranteed)
+					circuit_id = self.assigned_streams[(stream.source_address, stream.source_port)]
+				#
+				try:
+					self.connection.attach_stream(stream.id, circuit_id)
+					#logging.debug('Attaching to circuit {}'.format(circuit_id))
+				except stem.UnsatisfiableRequest:
+					if stream.purpose != 'USER':
+						# could not attach a non-user stream, so probably raised:
+						# stem.UnsatisfiableRequest: Connection is not managed by controller.
+						# therefore we should ignore this exception
+						pass
+					else:
+						raise
+					#
+				#
+			#
+		except:
+			logging.exception('Error while attaching the stream (control_address={}, circuit_id={}).'.format(self.control_address, circuit_id))
+			raise
+		#
+	#
+	def build_circuit(self, circuit_generator):
+		circuit_id = None
+		#
+		while circuit_id is None:
+			try:
+				circuit = circuit_generator()
+				circuit_id = self.connection.new_circuit(circuit, await_build=True)
+				logging.debug('New circuit (id={}): {}'.format(circuit_id, circuit))
+			except stem.CircuitExtensionFailed:
+				logging.debug('Failed circuit: {}'.format(circuit))
+				logging.warning('Circuit creation failed. Retrying...')
+			#
+		#
+		self.unassigned_circuit_ids.append(circuit_id)
+		self.circuits[circuit_id] = circuit
+	#
+#
+class ExperimentProtocol(basic_protocols.ChainedProtocol):
+	def __init__(self, socket, endpoint, num_bytes, custom_data=None, send_buffer_len=None, push_start_cb=None):
+		proxy_username = bytes([z for z in os.urandom(12) if z != 0])
+		proxy_protocol = basic_protocols.Socks4Protocol(socket, endpoint, username=proxy_username)
+		#
+		throughput_protocol = throughput_protocols.ClientProtocol(socket, num_bytes,
+		                                                          custom_data=custom_data,
+		                                                          send_buffer_len=send_buffer_len,
+		                                                          use_acceleration=True,
+		                                                          push_start_cb=push_start_cb)
+		#
+		super().__init__([proxy_protocol, throughput_protocol])
+	#
+#
+class ExperimentProtocolManager():
+	def __init__(self):
+		self.stopped = False
+		self.process_counter = 0
+		self.used_ids = []
+		self.running_processes = {}
+		self.global_finished_process_queue = multiprocessing.Queue()
+		self.local_finished_process_queue = queue.Queue()
+		self.queue_getter = useful.QueueGetter(self.global_finished_process_queue,
+		                                       self.local_finished_process_queue.put)
+	#
+	def _run_client(self, protocol, protocol_id):
+		had_error = False
+		try:
+			logging.debug('Starting protocol (id: {})'.format(protocol_id))
+			protocol.run()
+			logging.debug('Done protocol (id: {})'.format(protocol_id))
+		except:
+			had_error = True
+			logging.warning('Protocol error')
+			logging.exception('Protocol id: {} had an error'.format(protocol_id))
+		finally:
+			self.global_finished_process_queue.put((protocol_id, had_error))
+		#
+	#
+	def start_experiment_protocol(self, protocol, protocol_id=None):
+		if protocol_id is None:
+			protocol_id = self.process_counter
+		#
+		assert not self.stopped
+		assert protocol_id not in self.used_ids, 'Protocol ID already used'
+		#
+		p = multiprocessing.Process(target=self._run_client, args=(protocol, protocol_id))
+		self.running_processes[protocol_id] = p
+		self.used_ids.append(protocol_id)
+		#
+		p.start()
+		self.process_counter += 1
+		#
+		#protocol.socket.close()
+	#
+	def wait(self, finished_protocol_cb=None):
+		while len(self.running_processes) > 0:
+			logging.debug('Waiting for processes ({} left)'.format(len(self.running_processes)))
+			#
+			(protocol_id, had_error) = self.local_finished_process_queue.get()
+			p = self.running_processes[protocol_id]
+			p.join()
+			self.running_processes.pop(protocol_id)
+			finished_protocol_cb(protocol_id, had_error)
+		#
+	#
+	def stop(self):
+		self.wait()
+		self.queue_getter.stop()
+		self.queue_getter.join()
+		self.stopped = True
+	#
+#
+def build_client_protocol(endpoint, socks_address, control_address, controller, start_event, wait_duration=0, measureme_id=None, num_bytes=None, buffer_len=None):
+	client_socket = socket.socket()
+	#
+	logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), socks_address)
+	client_socket.connect(socks_address)
+	logging.debug('Socket %d connected', client_socket.fileno())
+	#
+	custom_data = {}
+	#
+	circuit_id = controller.assign_stream(client_socket.getsockname())
+	custom_data['circuit'] = (circuit_id, controller.circuits[circuit_id])
+	#
+	if measureme_id is not None:
+		custom_data['measureme_id'] = measureme_id
+		#
+		hops = list(range(len(controller.circuits[circuit_id])+1))[::-1]
+		# send the measureme cells to the last relay first
+		start_cb = lambda control_address=control_address, circuit_id=circuit_id, measureme_id=measureme_id, \
+						  hops=hops, event=start_event, wait_duration=wait_duration: \
+						  send_measureme_cells_and_wait(control_address, circuit_id, measureme_id, hops, event, wait_duration)
+	else:
+		start_cb = lambda event=start_event, duration=wait_duration: wait_then_sleep(event, duration)
+	#
+	custom_data = json.dumps(custom_data).encode('utf-8')
+	protocol = ExperimentProtocol(client_socket, endpoint, num_bytes,
+								  custom_data=custom_data,
+								  send_buffer_len=buffer_len,
+								  push_start_cb=start_cb)
+	return protocol
+#
+if __name__ == '__main__':
+	import argparse
+	#
+	logging.basicConfig(level=logging.DEBUG)
+	logging.getLogger('stem').setLevel(logging.WARNING)
+	#
+	parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
+	parser.add_argument('ip', type=str, help='destination ip address')
+	parser.add_argument('port', type=int, help='destination port')
+	parser.add_argument('num_bytes', type=useful.parse_bytes,
+	                    help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
+	parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
+	parser.add_argument('--buffer-len', type=useful.parse_bytes,
+	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
+	parser.add_argument('--wait-range', type=int, default=0,
+	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
+	parser.add_argument('--proxy-control-ports', type=useful.parse_range_list, help='range of ports for the control ports', metavar='control-ports')
+	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
+	args = parser.parse_args()
+	#
+	endpoint = (args.ip, args.port)
+	#
+	logging.debug('Getting consensus')
+	try:
+		consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
+	except Exception as e:
+		raise Exception('Unable to retrieve the consensus') from e
+	#
+	fingerprints = get_fingerprints(consensus)
+	exit_fingerprints = get_exit_fingerprints(consensus, endpoint)
+	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
+	#
+	assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
+	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+	#
+	circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+	#
+	proxy_addresses = []
+	for control_port in args.proxy_control_ports:
+		proxy = {}
+		proxy['control'] = ('127.0.0.1', control_port)
+		proxy['socks'] = ('127.0.0.1', get_socks_port(control_port))
+		proxy_addresses.append(proxy)
+	#
+	controllers = []
+	protocol_manager = ExperimentProtocolManager()
+	#
+	try:
+		for proxy_address in proxy_addresses:
+			controller = ExperimentController(proxy_address['control'])
+			controller.connect()
+			# the controller has to attach new streams to circuits, so the
+			# connection has to stay open until we're done creating streams
+			#
+			for _ in range(args.num_streams_per_client):
+				# make a circuit for each stream
+				controller.build_circuit(circuit_generator)
+				time.sleep(0.5)
+			#
+			controllers.append(controller)
+		#
+		start_event = multiprocessing.Event()
+		#
+		for stream_index in range(args.num_streams_per_client):
+			for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
+				if args.measureme:
+					measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
+				else:
+					measureme_id = None
+				#
+				wait_duration = random.randint(0, args.wait_range)
+				protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'], controller, start_event,
+				                                 wait_duration=wait_duration, measureme_id=measureme_id,
+				                                 num_bytes=args.num_bytes, buffer_len=args.buffer_len)
+				protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
+			#
+		#
+		time.sleep(2)
+		start_event.set()
+		#
+		protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
+	finally:
+		for controller in controllers:
+			controller.disconnect()
+		#
+		protocol_manager.stop()
+	#
+#
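A small worked example of the hop list computed in build_client_protocol: for the two-relay circuits built in the __main__ block (one non-exit relay plus the exit), the arithmetic gives hops = [2, 1, 0], so measureme cells are sent starting from the furthest hop. This sketch is illustrative only; the fingerprints are placeholders:

    circuit = ['NON_EXIT_FINGERPRINT', 'EXIT_FINGERPRINT']  # hypothetical two-relay circuit
    hops = list(range(len(circuit) + 1))[::-1]
    assert hops == [2, 1, 0]  # last hop first, matching the comment in build_client_protocol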

src/log_system_usage.py (+6, -9)

@@ -2,7 +2,7 @@
 #
 import time
 import threading
-import json
+import pickle
 import gzip
 #
 PROC_STAT_HEADERS = ('user', 'nice', 'system', 'idle', 'iowait', 'irq', 'softirq', 'steal', 'guest', 'guest_nice')
@@ -82,16 +82,13 @@ def log_cpu_stats(path, interval, stop_event):
 		#
 		stop_event.wait(interval)
 	#
-	with gzip.GzipFile(path, 'w') as f:
-		# json.dump writes a string, but a gzip.GzipFile can only write bytes, so monkey-patch it
-		old_write = f.write
-		f.write = lambda s: old_write(s.encode('utf-8'))
-		json.dump(stats, f)
+	with gzip.GzipFile(path, 'wb') as f:
+		pickle.dump(stats, f, protocol=4)
 	#
 #
 def load_cpu_stats(path):
-	with gzip.GzipFile(path, 'r') as f:
-		return json.load(f)
+	with gzip.GzipFile(path, 'rb') as f:
+		return pickle.load(f)
 	#
 #
 '''
@@ -126,7 +123,7 @@ def log_cpu_stats(path, interval, stop_event):
 '''
 if __name__ == '__main__':
 	stop_event = threading.Event()
-	t = threading.Thread(target=log_cpu_stats, args=('/tmp/cpu_stats.json.gz', 0.5, stop_event))
+	t = threading.Thread(target=log_cpu_stats, args=('/tmp/cpu_stats.pickle.gz', 0.5, stop_event))
 	t.start()
 	#
 	try:
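Because the on-disk format changed from gzipped JSON to gzipped pickle, stats files written by the old code cannot be loaded with the new load_cpu_stats. A minimal usage sketch for the new format:

    from log_system_usage import load_cpu_stats

    # returns the same stats structure that log_cpu_stats accumulated before pickling
    stats = load_cpu_stats('/tmp/cpu_stats.pickle.gz')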

src/throughput_server.py (+1, -1)

@@ -103,7 +103,7 @@ if __name__ == '__main__':
 	else:
 		bind_to = ('0.0.0.0', args.port)
 	#
-	stop_event = multiprocessing.Event()
+	#stop_event = multiprocessing.Event()
 	server = ThroughputServer(bind_to, None)
 	try:
 		server.run()