Browse Source

Latest changes before starting new work (relay testing changes).

Steven Engler 4 years ago
parent
commit
28d4c95712
6 changed files with 440 additions and 162 deletions
  1. 8 3
      src/chutney_manager.py
  2. 324 136
      src/experiment.py
  3. 7 0
      src/log_system_usage.py
  4. 11 3
      src/parse_measureme_logs.py
  5. 88 19
      src/plot_streams.py
  6. 2 1
      src/test_relay.py

+ 8 - 3
src/chutney_manager.py

@@ -14,7 +14,7 @@ def start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=
 	try:
 		subprocess.check_output(args, stderr=subprocess.STDOUT)
 	except subprocess.CalledProcessError as e:
-		logging.error('Chutney error:\n' + e.output.decode(sys.stdout.encoding))
+		#logging.error('Chutney error:\n' + e.output.decode(sys.stdout.encoding))
 		raise
 	#
 #
@@ -32,7 +32,12 @@ class ChutneyNetwork:
 		self.chutney_path = chutney_path
 		self.network_file = network_file
 		#
-		start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=controlling_pid)
+		try:
+			start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=controlling_pid)
+		except:
+			self.stop()
+			raise
+		#
 	#
 	def stop(self):
 		stop_chutney_network(self.chutney_path, self.network_file)
@@ -95,7 +100,7 @@ def create_chutney_config(nodes):
 		return None
 	#
 	config = ''
-	config += 'NODES = [{}]\n'.format(', \n'.join([str(node) for node in nodes]))
+	config += 'NODES = [{}]\n'.format(',\n'.join([str(node) for node in nodes]))
 	config += '\n'
 	config += 'ConfigureNodes(NODES)'
 	#

+ 324 - 136
src/experiment.py

@@ -32,100 +32,252 @@ class DummyEnterExit:
 		pass
 	#
 #
-def asdasd():
-	server_address = ('127.0.0.1', 12353)
-	#
-	stop_event = multiprocessing.Event()
-	server = throughput_server.ThroughputServer(server_address, stop_event)
-	p = multiprocessing.Process(target=server.run)
-	p.start()
-	#
-	try:
-		stop_cpu_logging_event = threading.Event()
-		t = threading.Thread(target=log_system_usage.log_cpu_stats,
-							 args=(os.path.join(save_data_path, 'cpu_stats.pickle.gz'), 0.5, stop_cpu_logging_event))
-		t.start()
+class Experiment:
+	def __init__(self, save_data_path, measureme_log_path, num_bytes, num_streams_per_client,
+	             num_clients, num_guards, num_authorities, num_exits,
+	             buffer_len=None, wait_range=None, measureme=False):
+		self.save_data_path = save_data_path
+		self.measureme_log_path = measureme_log_path
+		self.num_bytes = num_bytes
+		self.num_streams_per_client = num_streams_per_client
+		self.num_clients = num_clients
+		self.num_guards = num_guards
+		self.num_authorities = num_authorities
+		self.num_exits = num_exits
+		self.buffer_len = buffer_len
+		self.wait_range = wait_range
+		self.measureme = measureme
 		#
-		try:
-			logging.debug('Getting consensus')
-			try:
-				consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
-			except Exception as e:
-				raise Exception('Unable to retrieve the consensus') from e
-			#
-			fingerprints = experiment_client.get_fingerprints(consensus)
-			exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, server_address)
-			non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
+		self.chutney_path = '/home/sengler/code/measureme/chutney'
+		self.tor_path = '/home/sengler/code/measureme/tor'
+		self.server_address = ('127.0.0.1', 12353)
+		#
+		self.nodes = None
+		self.proxy_control_ports = None
+		#
+		self.configure_chutney()
+		#
+		if save_data_path is not None:
+			with open(os.path.join(save_data_path, 'experiment-settings.json'), 'w') as f:
+				settings = {}
+				settings['save_data_path'] = self.save_data_path
+				settings['num_bytes'] = self.num_bytes
+				settings['num_streams_per_client'] = self.num_streams_per_client
+				settings['num_clients'] = self.num_clients
+				settings['num_guards'] = self.num_guards
+				settings['num_authorities'] = self.num_authorities
+				settings['num_exits'] = self.num_exits
+				settings['buffer_len'] = self.buffer_len
+				settings['wait_range'] = self.wait_range
+				settings['measureme'] = self.measureme
+				settings['chutney_path'] = self.chutney_path
+				settings['tor_path'] = self.tor_path
+				settings['server_address'] = self.server_address
+				#
+				json.dump(settings, f)
 			#
-			assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
-			assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+		#
+	#
+	def configure_chutney(self):
+		self.nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
+		        [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(self.num_guards)] + \
+		        [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(self.num_exits)] + \
+		        [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(self.num_clients)]
+		#
+		for node in self.nodes:
+			if self.measureme_log_path is not None:
+				node.options['measureme_log_dir'] = self.measureme_log_path
 			#
-			circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+		#
+		numa_remaining = numa.get_numa_overview()
+		numa_sets = {}
+		for (node, index) in zip(self.nodes, range(len(self.nodes))):
+			num_cpus = node.options['num_cpus']
+			if num_cpus%2 != 0:
+				num_cpus += 1
 			#
-			proxy_addresses = []
-			for control_port in proxy_control_ports:
-				proxy = {}
-				proxy['control'] = ('127.0.0.1', control_port)
-				proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
-				proxy_addresses.append(proxy)
+			(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
+			node.options['numa_settings'] = (numa_node, processors)
+			numa_sets[node.guess_nickname(index)] = (numa_node, processors)
+		#
+		#unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
+		#
+		#nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
+		self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
+		# TODO: ^^ improve this
+		#
+		if self.save_data_path is not None:
+			with gzip.GzipFile(os.path.join(self.save_data_path, 'numa_data.pickle.gz'), 'wb') as f:
+				pickle.dump(numa_sets, f, protocol=4)
 			#
-			controllers = []
-			protocol_manager = experiment_client.ExperimentProtocolManager()
+		#
+	#
+	def start_chutney(self, next_action=None):
+		#
+		(fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
+		try:
+			with os.fdopen(fd, mode='w') as f:
+				f.write(chutney_manager.create_chutney_config(self.nodes))
 			#
 			try:
-				for proxy_address in proxy_addresses:
-					controller = experiment_client.ExperimentController(proxy_address['control'])
-					controller.connect()
-					# the controller has to attach new streams to circuits, so the
-					# connection has to stay open until we're done creating streams
+				chutney_network = None
+				num_attempts = 0
+				while chutney_network is None:
+					try:
+						num_attempts += 1
+						chutney_network = chutney_manager.ChutneyNetwork(self.chutney_path, self.tor_path, tmp_network_file)
+					except KeyboardInterrupt:
+						raise
+					except:
+						logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))
 					#
-					for _ in range(args.num_streams_per_client):
-						# make a circuit for each stream
-						controller.build_circuit(circuit_generator)
-						time.sleep(0.5)
-					#
-					controllers.append(controller)
-				#
-				start_event = multiprocessing.Event()
 				#
-				for stream_index in range(args.num_streams_per_client):
-					for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
-						if args.measureme:
-							measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
-						else:
-							measureme_id = None
+				#with chutney_network as net:
+				with chutney_network:
+					nicknames = [self.nodes[x].guess_nickname(x) for x in range(len(self.nodes))]
+					fingerprints = {nick: chutney_manager.read_fingerprint(nick, self.chutney_path) for nick in nicknames}
+					#
+					if self.save_data_path is not None:
+						with gzip.GzipFile(os.path.join(self.save_data_path, 'fingerprints.pickle.gz'), 'wb') as f:
+							pickle.dump(fingerprints, f, protocol=4)
 						#
-						wait_duration = random.randint(0, args.wait_range)
-						protocol = experiment_client.build_client_protocol(server_address, proxy_address['socks'],
-														 proxy_address['control'], controller, start_event,
-														 wait_duration=wait_duration, measureme_id=measureme_id,
-														 num_bytes=args.num_bytes, buffer_len=args.buffer_len)
-						protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
+					#
+					if next_action is not None:
+						next_action()
 					#
 				#
-				time.sleep(2)
-				start_event.set()
-				#
-				protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
 			finally:
-				for controller in controllers:
-					controller.disconnect()
+				if self.measureme_log_path is not None:
+					for f in os.listdir(self.measureme_log_path):
+						shutil.move(os.path.join(self.measureme_log_path, f), os.path.join(self.save_data_path, f))
+					#
+					shutil.rmtree(self.measureme_log_path)
 				#
-				protocol_manager.stop()
+			#
+		finally:
+			if self.save_data_path is not None:
+				shutil.copyfile(tmp_network_file, os.path.join(self.save_data_path, os.path.basename(tmp_network_file)))
+			#
+			os.remove(tmp_network_file)
+		#
+	#
+	def start_throughput_server(self, next_action=None):
+		stop_event = multiprocessing.Event()
+		server = throughput_server.ThroughputServer(self.server_address, stop_event)
+		p = multiprocessing.Process(target=server.run)
+		p.start()
+		#
+		try:
+			if next_action is not None:
+				next_action()
+			#
+		finally:
+			stop_event.set()
+		#
+		p.join()
+		#
+		with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
+			pickle.dump([x['results'] for x in server.results], f, protocol=4)
+		#
+	#
+	def start_system_logging(self, next_action=None):
+		stop_cpu_logging_event = multiprocessing.Event()
+		p = multiprocessing.Process(target=log_system_usage.log_cpu_stats,
+		                            args=(os.path.join(self.save_data_path, 'cpu_stats.pickle.gz'), 0.1, stop_cpu_logging_event))
+		p.start()
+		#
+		try:
+			if next_action is not None:
+				next_action()
 			#
 		finally:
 			stop_cpu_logging_event.set()
 		#
-		t.join()
-	finally:
-		stop_event.set()
+		p.join()
 	#
-	p.join()
+	def start_throughput_clients(self):
+		logging.debug('Getting consensus')
+		try:
+			consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
+		except Exception as e:
+			raise Exception('Unable to retrieve the consensus') from e
+		#
+		fingerprints = experiment_client.get_fingerprints(consensus)
+		exit_fingerprints = experiment_client.get_exit_fingerprints(consensus, self.server_address)
+		non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
+		#
+		assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
+		assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
+		#
+		circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
+		#
+		proxy_addresses = []
+		for control_port in self.proxy_control_ports:
+			proxy = {}
+			proxy['control'] = ('127.0.0.1', control_port)
+			proxy['socks'] = ('127.0.0.1', experiment_client.get_socks_port(control_port))
+			proxy_addresses.append(proxy)
+		#
+		controllers = []
+		protocol_manager = experiment_client.ExperimentProtocolManager()
+		#
+		try:
+			for proxy_address in proxy_addresses:
+				controller = experiment_client.ExperimentController(proxy_address['control'])
+				controller.connect()
+				# the controller has to attach new streams to circuits, so the
+				# connection has to stay open until we're done creating streams
+				#
+				for _ in range(self.num_streams_per_client):
+					# make a circuit for each stream
+					controller.build_circuit(circuit_generator)
+					time.sleep(0.5)
+				#
+				controllers.append(controller)
+			#
+			start_event = multiprocessing.Event()
+			#
+			used_measureme_ids = set()
+			for stream_index in range(self.num_streams_per_client):
+				for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
+					if self.measureme:
+						measureme_id = stream_index*len(controllers) + controller_index + 1
+						assert len(set([measureme_id]) & used_measureme_ids) == 0, 'Sanity check: Attempting to use a previously-used measureme_id'
+						used_measureme_ids |= set([measureme_id])
+					else:
+						measureme_id = None
+					#
+					wait_duration = random.randint(0, self.wait_range)
+					protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'],
+													 proxy_address['control'], controller, start_event,
+													 wait_duration=wait_duration, measureme_id=measureme_id,
+													 num_bytes=self.num_bytes, buffer_len=self.buffer_len)
+					protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
+				#
+			#
+			time.sleep(2)
+			start_event.set()
+			#
+			protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
+		finally:
+			for controller in controllers:
+				controller.disconnect()
+			#
+			protocol_manager.stop()
+		#
 	#
-	with gzip.GzipFile(os.path.join(save_data_path, 'server_results.pickle.gz'), 'wb') as f:
-		pickle.dump([x['results'] for x in server.results], f, protocol=4)
+#
+def wait_for_keyboard_interrupt():
+	try:
+		logging.info('Press Ctrl-C to stop.')
+		while True:
+			time.sleep(30)
+		#
+	except KeyboardInterrupt:
+		print('')
 	#
 #
+'''
 if __name__ == '__main__':
 	#
 	logging.basicConfig(level=logging.DEBUG)
@@ -140,82 +292,118 @@ if __name__ == '__main__':
 	parser.add_argument('--wait-range', type=int, default=0,
 	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
 	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
-	parser.add_argument('--chutney-only', action='store_true', help='only start chutney')
-	parser.add_argument('--no-chutney', action='store_true', help='don\'t start chutney')
+	parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
 	args = parser.parse_args()
 	#
-	chutney_path = '/home/sengler/code/measureme/chutney'
-	tor_path = '/home/sengler/code/measureme/tor'
+	experiment_time = time.time()
 	#
-	if not args.chutney_only:
-		save_data_path = os.path.join('/home/sengler/data/experiments', str(int(time.time())))
+	if args.debugging != 'only-chutney':
+		save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
 		os.mkdir(save_data_path)
-		if not args.no_chutney:
-			measureme_log_dir = os.path.join('/ramdisk/sengler/chutney', str(int(time.time())))
-			os.mkdir(measureme_log_dir)
-		#
+	else:
+		save_data_path = None
 	#
-	nodes = [chutney_manager.Node(tag='a', relay=1, num_cpus=2, authority=1, torrc='authority.tmpl') for _ in range(2)] + \
-	        [chutney_manager.Node(tag='r', relay=1, num_cpus=2, torrc='relay-non-exit.tmpl') for _ in range(2)] + \
-	        [chutney_manager.Node(tag='e', exit=1, num_cpus=2, torrc='relay.tmpl') for _ in range(1)] + \
-	        [chutney_manager.Node(tag='c', client=1, num_cpus=1, torrc='client.tmpl') for _ in range(2)]
+	if args.debugging is not None:
+		measureme_log_path = None
+	else:
+		measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
+		os.mkdir(measureme_log_path)
 	#
-	numa_remaining = numa.get_numa_overview()
-	numa_sets = []
-	for node in nodes:
-		if not args.chutney_only and not args.no_chutney:
-			node.options['measureme_log_dir'] = measureme_log_dir
-		#
-		num_cpus = node.options['num_cpus']
-		if num_cpus%2 != 0:
-			num_cpus += 1
-		#
-		(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
-		node.options['numa_settings'] = (numa_node, processors)
-		numa_sets.append((numa_node, processors))
+	experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, args.num_streams_per_client,
+	                        args.buffer_len, args.wait_range, args.measureme)
 	#
-	unused_processors = useful.generate_range_list([z for node in numa_remaining for y in numa_remaining[node]['physical_cores'] for z in y])
+	start_time = time.time()
 	#
-	nicknames = [nodes[x].guess_nickname(x) for x in range(len(nodes))]
-	proxy_control_ports = [nodes[x].guess_control_port(x) for x in range(len(nodes)) if ('client', 1) in nodes[x].options.items()]
+	if args.debugging == 'no-chutney':
+		experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
+	elif args.debugging == 'only-chutney':
+		experiment.start_chutney(wait_for_keyboard_interrupt)
+	else:
+		experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
 	#
-	(fd, tmp_network_file) = tempfile.mkstemp(prefix='chutney-network-')
-	try:
-		with os.fdopen(fd, mode='w') as f:
-			f.write(chutney_manager.create_chutney_config(nodes))
+	logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
+#
+'''
+if __name__ == '__main__':
+	#
+	logging.basicConfig(level=logging.DEBUG)
+	logging.getLogger('stem').setLevel(logging.WARNING)
+	#
+	parser = argparse.ArgumentParser(description='Test the network throughput.')
+	parser.add_argument('num_bytes', type=useful.parse_bytes,
+	                    help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
+	parser.add_argument('--buffer-len', type=useful.parse_bytes,
+	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
+	parser.add_argument('--wait-range', type=int, default=0,
+	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
+	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
+	parser.add_argument('--debugging', choices=['only-chutney','no-chutney'], help='debugging options')
+	args = parser.parse_args()
+	#
+	num_clients = 4
+	num_guards = 6
+	num_authorities = 2 # will also act as a guard
+	num_exits = 1
+	#
+	experiment_time = time.time()
+	#
+	if args.debugging != 'only-chutney':
+		base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
+		os.mkdir(base_save_data_path)
+	else:
+		base_save_data_path = None
+	#
+	if args.debugging is not None:
+		measureme_log_path = None
+	else:
+		measureme_log_path = os.path.join('/ramdisk/sengler/chutney', str(int(experiment_time)))
+	#
+	start_time = time.time()
+	all_data_paths = []
+	#
+	#for num_streams_per_client in [1, 2, 4, 6, 8, 10, 12]:
+	for num_streams_per_client in [1, 2, 3, 4, 5]:
+	#for num_streams_per_client in [6,7,8]:
+		logging.info('Starting with {} streams per client'.format(num_streams_per_client))
+		save_data_path = None
+		#
+		if base_save_data_path is not None:
+			save_data_path = os.path.join(base_save_data_path, 'streams-{:04d}'.format(num_streams_per_client*num_clients))
+			all_data_paths.append(save_data_path)
+			os.mkdir(save_data_path)
+		#
+		if measureme_log_path is not None:
+			os.mkdir(measureme_log_path)
 		#
-		if args.no_chutney:
-			asdasd()
+		experiment = Experiment(save_data_path, measureme_log_path, args.num_bytes, num_streams_per_client,
+		                        num_clients, num_guards, num_authorities, num_exits,
+								args.buffer_len, args.wait_range, args.measureme)
+		#
+		if args.debugging == 'no-chutney':
+			experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients))
+		elif args.debugging == 'only-chutney':
+			experiment.start_chutney(wait_for_keyboard_interrupt)
 		else:
-			try:
-				with chutney_manager.ChutneyNetwork(chutney_path, tor_path, tmp_network_file) as net:
-					if args.chutney_only:
-						try:
-							logging.info('Press Ctrl-C to stop.')
-							while True:
-								time.sleep(30)
-							#
-						except KeyboardInterrupt:
-							print('')
-						#
-					else:
-						fingerprints = []
-						for nick in nicknames:
-							fingerprints.append(chutney_manager.read_fingerprint(nick, chutney_path))
-						#
-						asdasd()
-					#
-				#
-			finally:
-				if not args.chutney_only:
-					for f in os.listdir(measureme_log_dir):
-						shutil.move(os.path.join(measureme_log_dir, f), os.path.join(save_data_path, f))
-					#
-					shutil.rmtree(measureme_log_dir)
-				#
+			experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: experiment.start_system_logging(experiment.start_throughput_clients)))
+		#
+	#
+	logging.info('Total time: {:.2f} minutes'.format((time.time()-start_time)/60))
+	#
+	import parse_measureme_logs
+	for path in all_data_paths:
+		logging.info('Parsing logs for {}'.format(path))
+		measureme_tor_logs = [os.path.join(path, f) for f in os.listdir(path) if f.startswith('measureme-')]
+		#
+		logs = []
+		for name in measureme_tor_logs:
+			with open(name, 'r') as f:
+				logs.append(parse_measureme_logs.read_log(f))
 			#
 		#
-	finally:
-		os.remove(tmp_network_file)
+		streams = parse_measureme_logs.get_streams_from_logs(logs)
+		#
+		with gzip.GzipFile(os.path.join(path, 'measureme-data.pickle.gz'), 'wb') as f:
+			pickle.dump(streams, f, protocol=4)
+		#
 	#
 #

+ 7 - 0
src/log_system_usage.py

@@ -56,6 +56,13 @@ def calculate_cpu_usage(initial, current):
 	#
 	return (current_non_idle-initial_non_idle)/(current_total-initial_total)
 #
+def calculate_cpu_usage_continuous(stats):
+	cpu_usages = []
+	for i in range(len(stats)-1):
+		cpu_usages.append(calculate_cpu_usage(stats[i], stats[i+1]))
+	#
+	return cpu_usages
+#
 def log_cpu_stats(path, interval, stop_event):
 	"""
 	Log the cpu stats to a gz compressed JSON file. Storing JSON

+ 11 - 3
src/parse_measureme_logs.py

@@ -248,6 +248,16 @@ def get_streams_from_logs(logs):
 	relays = [log for log in logs if log['fingerprint'] is not None]
 	#relays_by_fingerprint = {log['fingerprint']: log for log in logs if log['fingerprint'] is not None}
 	#
+	measureme_ids = [proxy['measureme_id'].keys() for proxy in proxies]
+	for x in range(len(proxies)):
+		for y in range(x+1, len(proxies)):
+			# make sure that two different proxies do not use the same measureme_id
+			overlapping_ids = set(measureme_ids[x]) & set(measureme_ids[y])
+			if len(overlapping_ids) > 0:
+				raise Exception('Two proxies ({} and {}) have circuits with the same measureme_id ({}).'.format(x, y, overlapping_ids))
+			#
+		#
+	#
 	linked_relay_cells = {}
 	#
 	for proxy in proxies:
@@ -258,8 +268,6 @@ def get_streams_from_logs(logs):
 			#linked_relay_cells[(logs.index(relay), logs.index(proxy))] = reverse_links(links)
 		#
 	#
-	#print(linked_relay_cells)
-	#exit(1)
 	print('Done proxies')
 	for x in range(len(relays)):
 		for y in range(x+1, len(relays)):
@@ -390,7 +398,7 @@ if __name__ == '__main__':
 	#
 	logs = []
 	#
-	save_to = 'processed-data.pickle.gz'
+	save_to = 'measureme-data.pickle.gz'
 	if os.path.isfile(save_to):
 		okay_to_overwrite = input('Output file \'{}\' already exists. Would you like to overwrite it? [y/n]: '.format(save_to)).strip()
 		okay_to_overwrite = (okay_to_overwrite.lower() == 'y')

+ 88 - 19
src/plot_streams.py

@@ -1,11 +1,14 @@
 #!/usr/bin/env python3
 #
 import gzip
+import sys
 import pickle
 import matplotlib.pylab as plt
 import numpy as np
 #
-def plot_cells(stream, plot_widget, time_offset=None, clickable=False, label=None, color=None):
+import log_system_usage
+#
+def plot_cells(stream, ax, time_offset=None, clickable=False, label=None, color=None):
 	num_bytes = np.cumsum(stream['length'])
 	timestamps = np.asarray(stream['timestamp'])
 	#
@@ -28,7 +31,7 @@ def onresize(event):
 	#
 	event.canvas.figure.subplots_adjust(left=w_padding/w+0.01, right=1-((w_padding/2)/w), top=1-(h_padding/h), bottom=h_padding/h+0.01)
 #
-def onscroll(event):
+def zoom_axes_cb(event):
 	scale = 2**(-event.step)
 	ax = event.inaxes
 	mouse_x = event.xdata
@@ -41,7 +44,10 @@ def onscroll(event):
 	#
 	event.canvas.draw()
 #
-def onpick(event, lines):
+def pick_measureme_line_cb(event, lines, target_axes=None):
+	if target_axes != None and event.mouseevent.inaxes not in target_axes:
+		return
+	#
 	this_line = event.artist
 	if event.mouseevent.button == 1 and not event.mouseevent.dblclick and event.mouseevent.key is None:
 		# if the mouse is single-clicked and no keyboard key is held down
@@ -57,12 +63,15 @@ def onpick(event, lines):
 					for_legend.append(line)
 				#
 			#
-			event.mouseevent.inaxes.legend(for_legend, [x.get_label() for x in for_legend], loc='upper right')
+			event.mouseevent.inaxes.legend(for_legend, [x.get_label() for x in for_legend], loc='lower right')
 			event.canvas.draw_idle()
 		#
 	#
 #
-def onclick(event, lines):
+def reset_measureme_lines_cb(event, lines, target_axes=None):
+	if target_axes != None and event.inaxes not in target_axes:
+		return
+	#
 	if event.button == 1 and event.dblclick and event.key is None:
 		# if the mouse is double-clicked and no keyboard key is held down
 		for (line, info) in lines.items():
@@ -72,11 +81,14 @@ def onclick(event, lines):
 				line.set_visible(False)
 			#
 		#
-		event.inaxes.get_legend().remove()
+		legend = event.inaxes.get_legend()
+		if legend != None:
+			legend.remove()
+		#
 		event.canvas.draw_idle()
 	#
 #
-def onmotion(event, pan_settings):
+def mouse_pan_cb(event, pan_settings):
 	if event.inaxes is not None and event.key == 'control' and event.button == 1:
 		ax = event.inaxes
 		pixel_to_data = ax.transData.inverted()
@@ -94,7 +106,10 @@ def onmotion(event, pan_settings):
 	pan_settings['start_x'] = event.x
 	pan_settings['start_y'] = event.y
 #
-def onkeypress(event, lines):
+def subsample_plot_cb(event, lines, target_axes=None):
+	if target_axes != None and event.inaxes not in target_axes:
+		return
+	#
 	if event.key == 'control':
 		num_points = 0
 		range_x = event.inaxes.xaxis.get_view_interval()
@@ -126,7 +141,10 @@ def onkeypress(event, lines):
 		event.canvas.draw_idle()
 	#
 #
-def onkeyrelease(event, lines):
+def undo_subsampling_cb(event, lines, target_axes=None):
+	if target_axes != None and event.inaxes not in target_axes:
+		return
+	#
 	if event.key == 'control':
 		for (line, info) in lines.items():
 			line.set_xdata(line.orig_x)
@@ -140,10 +158,21 @@ def get_complimentary_color(color_index):
 	return (color_index+1 if color_index%2==0 else color_index-1)
 #
 if __name__ == '__main__':
-	with gzip.GzipFile('processed-data.pickle.gz', 'rb') as f:
+	with gzip.GzipFile(sys.argv[1], 'rb') as f:
 		streams = pickle.load(f)
 	#
-	fig, ax = plt.subplots()#constrained_layout=True
+	with gzip.GzipFile(sys.argv[2], 'rb') as f:
+		system_usage = pickle.load(f)
+		system_usage['timestamps'] = np.array(system_usage['timestamps'])
+	#
+	with gzip.GzipFile(sys.argv[3], 'rb') as f:
+		numa_data = pickle.load(f)
+	#
+	with gzip.GzipFile(sys.argv[4], 'rb') as f:
+		fingerprints = pickle.load(f)
+	#
+	#fig, ax = plt.subplots()#constrained_layout=True
+	fig, (ax, ax_cpu_usage) = plt.subplots(2, 1, sharex=True)
 	#
 	start_time = min([hop[t]['timestamp'][0] for m in streams for s in streams[m] for d in streams[m][s]
 	                  for hop in streams[m][s][d] for t in ('received','sent')])
@@ -180,7 +209,9 @@ if __name__ == '__main__':
 					is_main_plot = (hop_index == len(streams[measureme_id][stream_id][direction])-1 and transmission == 'sent')
 					direction_label = direction_shortforms[direction]
 					transmission_label = transmission_shortforms[transmission]
-					label = 'hop={}, {:.4}, {:.4}, mid={}, sid={}'.format(hop_index, direction_label, transmission_label, measureme_id, stream_id)
+					fingerprint = str(streams[measureme_id][stream_id][direction][hop_index]['fingerprint'])
+					label = 'hop={}, {:.4}, {:.4}, mid={}, sid={}, fprnt={:.6s}'.format(hop_index, direction_label, transmission_label,
+					                                                               measureme_id, stream_id, fingerprint)
 					line = plot_cells(data, ax, time_offset=start_time, clickable=is_main_plot, label=label, color=colormap(color_index))
 					if not is_main_plot:
 						line.set_visible(False)
@@ -191,19 +222,57 @@ if __name__ == '__main__':
 			#
 		#
 	#
+	guard_counter = {}
+	for measureme_id in streams:
+		for stream_id in streams[measureme_id]:
+			direction = 'forward'
+			fingerprint = streams[measureme_id][stream_id][direction][1]['fingerprint']
+			if fingerprint not in guard_counter:
+				guard_counter[fingerprint] = 0
+			#
+			guard_counter[fingerprint] += 1
+		#
+	#
+	system_usage_timestamps = (system_usage['timestamps'][1:]+system_usage['timestamps'][:-1])/2 - start_time
+	cpu_usages = {int(cpu): np.array(log_system_usage.calculate_cpu_usage_continuous(system_usage['stats']['cpus'][cpu])) for cpu in system_usage['stats']['cpus']}
+	tor_cpu_usages = {nick: np.sum([cpu_usages[cpu] for cpu in numa_data[nick][1]], axis=0) for nick in numa_data}
+	#
+	mask = system_usage_timestamps > 0
+	system_usage_timestamps = system_usage_timestamps[mask]
+	tor_cpu_usages = {nick: tor_cpu_usages[nick][mask] for nick in tor_cpu_usages}
+	#
+	for nick in sorted(fingerprints.keys()):
+		#if fingerprints[nick] != None:
+		# is not an OP
+		guard_circuit_count = 0
+		if fingerprints[nick] in guard_counter:
+			guard_circuit_count = guard_counter[fingerprints[nick]]
+		elif fingerprints[nick] is None:
+			guard_circuit_count = '?'
+		#
+		ax_cpu_usage.plot(system_usage_timestamps, tor_cpu_usages[nick]*100, label='{} / {:.6s} (guard for {} circuits)'.format(nick, str(fingerprints[nick]),
+		                                                                                                              guard_circuit_count))
+		#
+	#
+	ax_cpu_usage.set_xlabel('Time (s)')
+	ax_cpu_usage.set_ylabel('CPU Usage (%)')
+	ax_cpu_usage.legend()
+	ax_cpu_usage.grid(linestyle=':')
+	#
 	ax.set_xlabel('Time (s)')
 	ax.set_ylabel('Data (MiB)')
-	ax.set_title('Test')
+	ax.set_title(sys.argv[1])
+	ax.grid(linestyle=':')
 	fig.tight_layout(pad=0)
 	#ax.set_ylim(0, None)
 	#
 	fig.canvas.mpl_connect('resize_event', onresize)
-	fig.canvas.mpl_connect('scroll_event', onscroll)
-	fig.canvas.mpl_connect('pick_event', lambda event,lines=lines: onpick(event, lines))
-	fig.canvas.mpl_connect('button_press_event', lambda event,lines=lines: onclick(event, lines))
-	fig.canvas.mpl_connect('motion_notify_event', lambda event,pan_settings={'start_x':0,'start_y':0}: onmotion(event, pan_settings))
-	fig.canvas.mpl_connect('key_press_event', lambda event,lines=lines: onkeypress(event, lines))
-	fig.canvas.mpl_connect('key_release_event', lambda event,lines=lines: onkeyrelease(event, lines))
+	fig.canvas.mpl_connect('scroll_event', zoom_axes_cb)
+	fig.canvas.mpl_connect('pick_event', lambda event,lines=lines,axes=[ax]: pick_measureme_line_cb(event, lines, target_axes=axes))
+	fig.canvas.mpl_connect('button_press_event', lambda event,lines=lines,axes=[ax]: reset_measureme_lines_cb(event, lines, target_axes=axes))
+	fig.canvas.mpl_connect('motion_notify_event', lambda event,pan_settings={'start_x':0,'start_y':0}: mouse_pan_cb(event, pan_settings))
+	fig.canvas.mpl_connect('key_press_event', lambda event,lines=lines,axes=[ax]: subsample_plot_cb(event, lines, target_axes=axes))
+	fig.canvas.mpl_connect('key_release_event', lambda event,lines=lines,axes=[ax]: undo_subsampling_cb(event, lines, target_axes=axes))
 	#
 	plt.show(fig)
 #

+ 2 - 1
src/test_relay.py

@@ -331,7 +331,8 @@ if __name__ == '__main__':
 				control_port = clients[client_index]['control_port']
 				controller = controllers[client_index]['connection']
 				circuit_id = controllers[client_index]['circuit_ids'][stream_index]
-				measureme_id = stream_index*streams_per_client + client_index + 1
+				measureme_id = stream_index*streams_per_client + client_index + 1 # this is not calculated correctly, leaving as-is for now
+				raise Exception('The above line is broken (measureme_id not set properly)!!!')
 				#print('Data: {}, {}'.format(circuit_id, measureme_id))
 				#measureme_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id: connect_and_send_measureme(control_port, circuit_id, measureme_id, 2)   <---- need to change this to also send to hop 0
 				#measureme_cb = lambda controller=controller, circuit_id=circuit_id, measureme_id=measureme_id: send_all_measuremes(controller, circuit_id, measureme_id)