
Improved experiment scripts and support multiple servers

Steven Engler 4 years ago
parent
commit
eb32c1ff1d

+ 3 - 3   src/accelerated_functions.c

@@ -52,7 +52,7 @@ int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
 	num_poll_fds++;
 	//
 	while(bytes_written < bytes_total){
-		int rc = poll(poll_fds, num_poll_fds, 1*60*1000);
+		int rc = poll(poll_fds, num_poll_fds, 15*60*1000);
 		//
 		if(rc < 0){
 			return -1;
@@ -119,7 +119,7 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_p
 	num_poll_fds++;
 	//
 	while(bytes_read < bytes_total){
-		int rc = poll(poll_fds, num_poll_fds, 1*60*1000);
+		int rc = poll(poll_fds, num_poll_fds, 15*60*1000);
 		//
 		if(rc < 0){
 			printf("Here1\n");
@@ -229,7 +229,7 @@ static PyObject *py_pull_data(PyObject *self, PyObject *args){
 	double time_of_first_byte = 0;
 	double time_of_last_byte = 0;
 #ifdef USE_NEW_HISTORY
-	byte_delta deltas[20000] = {0};
+	byte_delta deltas[100000] = {0}; // 100000*16 B = ~1.5 MiB
 	size_t deltas_elements_needed = 0;
 #else
 	unsigned long byte_counter[60*10] = {0}; // record 10 minutes of data

+ 24 - 6   src/basic_protocols.py

@@ -76,6 +76,13 @@ class Protocol():
 			#
 		#
 	#
+	def get_desc(self):
+		"""
+		This function can be overridden.
+		"""
+		#
+		return None
+	#
 #
 class FakeProxyProtocol(Protocol):
 	def __init__(self, socket, addr_port):
@@ -495,14 +502,25 @@ class ServerListener():
 		#
 		self.s = socket.socket()
 		self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+		self.s.setblocking(0)
 		self.s.bind(bind_endpoint)
-		self.s.listen(0)
+		self.s.listen(1000)
 	#
-	def accept(self):
-		newsock, endpoint = self.s.accept()
-		logging.debug("New client from %s:%d (fd=%d)",
-					  endpoint[0], endpoint[1], newsock.fileno())
-		self.callback(newsock)
+	def accept(self, block=True):
+		if block:
+			(readable, _, _) = select.select([self.s], [], [])
+		else:
+			readable = [self.s]
+		#
+		try:
+			(newsock, endpoint) = self.s.accept()
+			logging.debug("New client from %s:%d (fd=%d)",
+						  endpoint[0], endpoint[1], newsock.fileno())
+			self.callback(newsock)
+			return True
+		except BlockingIOError:
+			return False
+		#
 	#
 	def stop(self):
 		self.s.shutdown(socket.SHUT_RDWR)
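
The ServerListener change above makes the listening socket non-blocking, uses select() for the blocking case, and treats BlockingIOError as "no connection ready". A minimal sketch of that pattern on its own (the port and names are illustrative, not from this repository):

import select
import socket

listener = socket.socket()
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.setblocking(False)   # accept() now raises BlockingIOError instead of waiting
listener.bind(('127.0.0.1', 12345))
listener.listen(1000)         # large backlog, matching the diff

def try_accept(block=True):
    if block:
        # wait until a connection is pending on the listening socket
        select.select([listener], [], [])
    try:
        conn, endpoint = listener.accept()
        print('new client from %s:%d (fd=%d)' % (endpoint[0], endpoint[1], conn.fileno()))
        conn.close()
        return True
    except BlockingIOError:
        # the pending connection disappeared, or block=False and none was ready
        return False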

+ 23 - 4   src/chutney_manager.py

@@ -4,6 +4,7 @@ import subprocess
 import logging
 import os
 import sys
+import time
 #
 def start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=None, verification_rounds=None):
 	args = [os.path.join(chutney_path, 'tools/test-network.sh'), '--chutney-path', chutney_path,
@@ -21,10 +22,24 @@ def start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=
 		raise
 	#
 #
-def stop_chutney_network(chutney_path, network_file):
+def stop_chutney_network(chutney_path, tor_path, network_file):
+	env = os.environ.copy()
+	if 'CHUTNEY_TOR' not in env:
+		env['CHUTNEY_TOR'] = os.path.join(tor_path, 'src/app/tor')
+	#
 	args = [os.path.join(chutney_path, 'chutney'), 'stop', network_file]
 	try:
-		subprocess.check_output(args, stderr=subprocess.STDOUT)
+		subprocess.check_output(args, stderr=subprocess.STDOUT, env=env)
+	except subprocess.CalledProcessError as e:
+		logging.error('Chutney error:\n' + e.output.decode(sys.stdout.encoding))
+		raise
+	#
+	time.sleep(5)
+	# chutney crashes sometimes and the error message gets cut off, so maybe this will help
+	#
+	args = [os.path.join(chutney_path, 'chutney'), 'get_remote_files', network_file]
+	try:
+		subprocess.check_output(args, stderr=subprocess.STDOUT, env=env)
 	except subprocess.CalledProcessError as e:
 		logging.error('Chutney error:\n' + e.output.decode(sys.stdout.encoding))
 		raise
@@ -33,17 +48,21 @@ def stop_chutney_network(chutney_path, network_file):
 class ChutneyNetwork:
 	def __init__(self, chutney_path, tor_path, network_file, controlling_pid=None, verification_rounds=None):
 		self.chutney_path = chutney_path
+		self.tor_path = tor_path
 		self.network_file = network_file
 		#
 		try:
 			self.startup_output = start_chutney_network(chutney_path, tor_path, network_file, controlling_pid=controlling_pid, verification_rounds=verification_rounds)
 		except:
-			self.stop()
+			try:
+				self.stop()
+			except:
+				logging.exception('Could not stop the Chutney network')
 			raise
 		#
 	#
 	def stop(self):
-		stop_chutney_network(self.chutney_path, self.network_file)
+		stop_chutney_network(self.chutney_path, self.tor_path, self.network_file)
 	#
 	def __enter__(self):
 		return self
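
ChutneyNetwork now keeps tor_path so that stop() can hand it to stop_chutney_network(), which uses it to set CHUTNEY_TOR and to run 'chutney get_remote_files' after stopping. A hedged usage sketch with placeholder paths, relying only on the methods visible in this diff:

import chutney_manager

# placeholder paths and network file; adjust to the local checkout
net = chutney_manager.ChutneyNetwork('/path/to/chutney', '/path/to/tor', 'networks/basic')
try:
    pass  # run the experiment against the live Chutney network here
finally:
    net.stop()  # stops the network, then fetches the remote files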

+ 43 - 0   src/data_helpers.py

@@ -0,0 +1,43 @@
+import numpy as np
+import gzip
+import pickle
+import json
+#
+def read_server_results(filename):
+	with gzip.GzipFile(filename, 'rb') as f:
+		results = pickle.load(f)
+		for x in range(len(results)):
+			results[x]['deltas']['bytes'] = np.array(results[x]['deltas']['bytes'])
+			results[x]['deltas']['timestamps'] = np.array(results[x]['deltas']['timestamps'])
+			results[x]['custom_data'] = json.loads(results[x]['custom_data'].decode('utf-8'))
+			if np.sum(results[x]['deltas']['bytes']) != results[x]['data_size']:
+				print('Note: missing some data')
+			#
+		#
+		return results
+	#
+#
+def read_client_info(filename):
+	with gzip.GzipFile(filename, 'rb') as f:
+		return pickle.load(f)
+	#
+#
+def read_relay_throughput(filename):
+	with open(filename, 'r') as f:
+		header = f.readline()
+		data_unprocessed = [[float(x) for x in line.split(',')] for line in f.readlines()]
+		data_unprocessed = np.array(data_unprocessed)
+	#
+	data = {}
+	data['timestamps'] = data_unprocessed[:,0]
+	data['threads'] = data_unprocessed[:,1:]
+	#
+	return (header, data)
+#
+def normalize_relay_throughput(data):
+	time_deltas = data['timestamps'][1:]-data['timestamps'][:-1]
+	normalized_data = {}
+	normalized_data['timestamps'] = np.copy(data['timestamps'][:-1])
+	normalized_data['threads'] = data['threads'][:-1,:]/(time_deltas[:,None])
+	return normalized_data
+#
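
A short usage sketch for the new helpers (the file name is a placeholder): read_relay_throughput() expects one header line followed by rows of comma-separated floats, and normalize_relay_throughput() divides each interval's byte counts by the interval length. The 0.5 factor follows plot_relay_throughput.py further down, which halves the per-thread sum because the columns count both sent and received bytes:

import numpy as np
import data_helpers

header, data = data_helpers.read_relay_throughput('relay_throughput.log')  # placeholder name
norm = data_helpers.normalize_relay_throughput(data)

# aggregate throughput in MiB/s per interval (halved: columns alternate sent/recv)
total_mib_s = 0.5 * np.sum(norm['threads'], axis=1) / 2**20
print('peak throughput: {:.2f} MiB/s'.format(total_mib_s.max()))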

+ 302 - 0   src/experiment-errors

@@ -0,0 +1,302 @@
+DEBUG:root:Starting client protocol (id: 55, desc: ('127.0.0.1', 9118) -> 16: ['test003r', 'test030target', 'test048e'])
+DEBUG:root:Starting client protocol (id: 5, desc: ('127.0.0.1', 9068) -> 17: ['test006r', 'test030target', 'test047e'])
+DEBUG:root:Starting client protocol (id: 7, desc: ('127.0.0.1', 9070) -> 18: ['test003r', 'test030target', 'test048e'])
+DEBUG:root:Starting client protocol (id: 15, desc: ('127.0.0.1', 9078) -> 17: ['test024r', 'test030target', 'test059e'])
+
+
+DEBUG:root:Circuit 17 (CONTROLLER, controller=('127.0.0.1', 8068)) CLOSED: FINISHED; None
+
+
+
+ERROR:root:Client protocol id: 55 had an error (07:23:07.419891)
+ERROR:root:Client protocol id: 5 had an error (07:24:56.158961)
+ERROR:root:Client protocol id: 7 had an error (07:24:56.194669)
+ERROR:root:Client protocol id: 15 had an error (07:24:57.395246)
+
+
+
+DEBUG:root:Stream 74 (None, controller=('127.0.0.1', 8119)) FAILED: TIMEOUT; None
+Starting client protocol
+Socket 2565 connected
+
+
+ERROR:root:Client protocol id: 55 had an error (07:23:07.419891)
+
+
+
+Feb 12 07:23:07.184 [info] connection_edge_process_relay_cell_not_open(): 'connected' received for circid 2355205080 streamid 34331 after 8 seconds.
+Feb 12 07:23:07.184 [info] exit circ (length 3): $5AFB76E7105B036D4A5297712ED84F2B4D1D7AF0(open) $C0DE6D7DDCDB12B59FF33E123BA7AB3CA58D7CEA(open) $3A73C6B8BDC470EFBC0BF55D56B59001006E1483(open)
+Feb 12 07:23:07.416 [info] handle_relay_cell_command(): 34: end cell (connection reset) for stream 34331. Removing stream.
+
+
+Feb 12 07:23:40.710 [info] connection_edge_process_relay_cell_not_open(): 'connected' received for circid 2818801185 streamid 24523 after 3 seconds.
+Feb 12 07:23:40.710 [info] exit circ (length 3): $E4E39348FCB5CC9494CFA3FBD6386630B64662A6(open) $C0DE6D7DDCDB12B59FF33E123BA7AB3CA58D7CEA(open) $335338CD4C5E48852859E29D8371FACB4A2155DE(open)
+Feb 12 07:25:27.298 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 16 to 172.19.156.16:5021 (9AB7F01CA3AA2749B2115C49B5CE4596B9169E1B) after 6060 ms. Delta 3ms
+Feb 12 07:25:27.395 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 13 to 172.19.156.16:5009 (9B20E5578F75081B63326B7C8A762DBE081C8256) after 5716 ms. Delta 1ms
+Feb 12 07:25:28.742 [info] handle_relay_cell_command(): 37: end cell (connection reset) for stream 24523. Removing stream.
+Feb 12 07:25:29.558 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 8 to 172.19.156.16:5027 (03813EFF1620E9F45CAB94808011FA966C2A8869) after 6872 ms. Delta 1ms
+Feb 12 07:25:29.940 [info] connection_handle_listener_read(): New SOCKS connection opened from 127.0.0.1
+
+
+
+ERROR:root:Client protocol id: 514 had an error (07:25:28.743687)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 245, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/throughput_protocols.py", line 30, in _run_iteration
+    if self.sub_protocol.run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 434, in _run_iteration
+    if self.protocol_helper.recv(self.socket, response_size):
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 39, in recv
+    raise ProtocolException('The socket was closed.')
+basic_protocols.ProtocolException: The socket was closed.
+WARNING:root:Client protocol with error successfully added self to global queue
+
+ERROR:root:Client protocol id: 55 had an error (07:23:07.419891)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 245, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/throughput_protocols.py", line 30, in _run_iteration
+    if self.sub_protocol.run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 434, in _run_iteration
+    if self.protocol_helper.recv(self.socket, response_size):
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 39, in recv
+    raise ProtocolException('The socket was closed.')
+basic_protocols.ProtocolException: The socket was closed.
+WARNING:root:Client protocol with error successfully added self to global queue
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+DEBUG:root:Starting client protocol (id: 1379, desc: ('127.0.0.1', 9092) -> 26: ['test002r', 'test030target', 'test043e'])
+
+ERROR:root:Client protocol id: 1379 had an error (07:27:50.439308)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 245, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 172, in _run_iteration
+    raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+basic_protocols.ProtocolException: Could not connect to SOCKS proxy, msg: 5b
+
+Feb 12 07:27:50.386 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 12 07:27:50.414 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 12 07:27:50.414 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 12 07:27:50.438 [notice] Tried for 120 seconds to get a connection to 127.0.0.1:12353. Giving up. (waiting for controller)
+Feb 12 07:27:50.440 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 12 07:27:50.440 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 12 07:27:50.466 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+
+
+
+
+
+
+DEBUG:root:Starting client protocol (id: 545, desc: ('127.0.0.1', 9158) -> 20: ['test000a', 'test030target', 'test054e'])
+
+ERROR:root:Client protocol id: 545 had an error (07:25:41.147103)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 245, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 172, in _run_iteration
+    raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+basic_protocols.ProtocolException: Could not connect to SOCKS proxy, msg: 5b
+
+Feb 12 07:25:38.260 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 9 to 172.19.156.16:5017 (A1986143EF2C1F7EF93742DDBB0AD5119EC33156) after 5924 ms. Delta 0ms
+Feb 12 07:25:38.755 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 5 to 172.19.156.16:5009 (9B20E5578F75081B63326B7C8A762DBE081C8256) after 6728 ms. Delta 3ms
+Feb 12 07:25:41.146 [notice] Tried for 120 seconds to get a connection to 127.0.0.1:12353. Giving up. (waiting for controller)
+Feb 12 07:25:41.666 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 8 to 172.19.156.16:5006 (296F6955D60EBE2EEF910CED0C4F1F00D178951D) after 6848 ms. Delta 3ms
+Feb 12 07:25:41.846 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 12 to 172.19.156.16:5010 (EB9D945D3E554868A8E5F15B375D129D20449E77) after 6028 ms. Delta 4ms
+Feb 12 07:25:43.166 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 11 to 172.19.156.16:5000 (CC455FEC9679E29B17762238281AB07727FEBF47) after 6412 ms. Delta 3ms
+
+
+
+
+
+See: connection_ap_expire_beginning()
+
+
+
+
+
+
+Feb 12 22:28:22.429 [info] connection_handle_listener_read(): New SOCKS connection opened from 127.0.0.1.
+Feb 12 22:28:22.638 [info] connection_edge_process_inbuf(): data from edge while in 'waiting for controller' state. Leaving it on buffer.
+Feb 12 22:28:22.658 [info] rep_hist_note_used_port(): New port prediction added. Will continue predictive circ building for 3394 more seconds.
+Feb 12 22:28:22.658 [info] link_apconn_to_circ(): Looks like completed circuit to hidden service doesn't allow optimistic data for connection to 127.0.0.1
+Feb 12 22:28:22.658 [info] connection_ap_handshake_send_begin(): Sending relay cell 0 on circ 4294765208 to begin stream 5930.
+Feb 12 22:28:22.658 [info] connection_ap_handshake_send_begin(): Address/port sent, ap socket 40, n_circ_id 4294765208
+
+
+Feb 12 22:28:51.633 [info] circuit_mark_for_close_(): Circuit 4294765208 (id: 24) marked for close at src/core/or/circuituse.c:1507 (orig reason: 9, new reason: 0)
+
+DEBUG:root:Starting client protocol (id: 844, desc: ('127.0.0.1', 9157) -> 24: ['test028r', 'test030target', 'test036e'])
+
+ERROR:root:Client protocol id: 844 had an error (22:30:22.999643)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 246, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 172, in _run_iteration
+    raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+basic_protocols.ProtocolException: Could not connect to SOCKS proxy, msg: 5b
+
+
+
+
+
+DEBUG:root:Starting client protocol (id: 1245, desc: ('127.0.0.1', 9108) -> 24: ['test014r', 'test030target', 'test045e'])
+
+ERROR:root:Client protocol id: 1245 had an error (23:19:45.882900)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 246, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 172, in _run_iteration
+    raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+basic_protocols.ProtocolException: Could not connect to SOCKS proxy, msg: 5b
+
+Feb 12 23:17:45.684 [info] connection_ap_handshake_send_begin(): Sending relay cell 0 on circ 2533565025 to begin stream 56942.
+Feb 12 23:17:45.684 [info] connection_ap_handshake_send_begin(): Address/port sent, ap socket 15, n_circ_id 2533565025
+
+Feb 12 23:17:55.194 [info] update_consensus_networkstatus_downloads(): Launching microdesc standard networkstatus consensus download.
+Feb 12 23:17:55.194 [info] select_primary_guard_for_circuit(): Selected primary guard test006r ($C296AF35EAD984D5419333340E1DE30AE71D8E48) for circuit.
+Feb 12 23:17:55.194 [info] connection_ap_make_link(): Making internal direct tunnel to 172.19.156.16:5006 ...
+Feb 12 23:17:55.194 [info] connection_ap_make_link(): ... application connection created and linked.
+Feb 12 23:17:55.194 [info] directory_send_command(): Downloading consensus from 172.19.156.16:5006 using /tor/status-vote/current/consensus-microdesc/58BF94+BAF767.z
+Feb 12 23:17:55.194 [warn] connection_ap_expire_beginning(): Bug: circuit->purpose == CIRCUIT_PURPOSE_C_GENERAL failed. The purpose on the circuit was Circuit made by controller; it was in state open, path_state new. (on Tor 0.4.2.6 971a6beff5a53434)
+Feb 12 23:17:55.194 [info] connection_ap_expire_beginning(): We tried for 10 seconds to connect to '127.0.0.1' using exit $514B92EF502BA1FD644BBDEEDF35E2CC8F2EF5AA~test045e at 172.19.156.16. Retrying on a new circuit
+
+Feb 12 23:18:18.285 [info] circuit_expire_old_circuits_clientside(): Closing n_circ_id 2533565025 (dirty 633 sec ago, purpose 22)
+Feb 12 23:18:18.286 [info] circuit_mark_for_close_(): Circuit 2533565025 (id: 24) marked for close at src/core/or/circuituse.c:1507 (orig reason: 9, new reason: 0)
+
+Feb 12 23:19:45.881 [notice] Tried for 120 seconds to get a connection to 127.0.0.1:12353. Giving up. (waiting for controller)
+
+
+
+
+
+
+DEBUG:root:Starting client protocol (id: 336, desc: ('127.0.0.1', 9099) -> 18: ['test002r', 'test030target', 'test058e'])
+
+ERROR:root:Client protocol id: 336 had an error (01:05:33.670825)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 246, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 172, in _run_iteration
+    raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+basic_protocols.ProtocolException: Could not connect to SOCKS proxy, msg: 5b
+
+Feb 13 01:03:23.054 [info] connection_handle_listener_read(): New SOCKS connection opened from 127.0.0.1.
+Feb 13 01:03:23.132 [info] connection_edge_process_inbuf(): data from edge while in 'waiting for controller' state. Leaving it on buffer.
+Feb 13 01:03:23.146 [info] channelpadding_send_padding_cell_for_callback(): Sending netflow keepalive on 8 to 172.19.156.16:5015 (0ADE7252BDDA535282B306B0A82CBCC9FEDE2C50) after 5108 ms. Delta 2ms
+Feb 13 01:03:23.152 [info] rep_hist_note_used_port(): New port prediction added. Will continue predictive circ building for 1998 more seconds.
+Feb 13 01:03:23.152 [info] link_apconn_to_circ(): Looks like completed circuit to hidden service doesn't allow optimistic data for connection to 127.0.0.1
+Feb 13 01:03:23.152 [info] connection_ap_handshake_send_begin(): Sending relay cell 0 on circ 3126016833 to begin stream 24424.
+Feb 13 01:03:23.152 [info] connection_ap_handshake_send_begin(): Address/port sent, ap socket 36, n_circ_id 3126016833
+
+Feb 13 01:05:32.666 [info] connection_ap_expire_beginning(): Controller circuit has tried for 129 seconds to connect to '127.0.0.1' using exit $56D109A57085FEAF4B7EBC789C30E13EECEDBCC8~test058e at 17
+2.19.156.16. But not giving up!
+Feb 13 01:05:33.478 [info] connection_ap_process_end_not_open(): Address '127.0.0.1' refused due to 'misc error'. Considering retrying.
+Feb 13 01:05:33.478 [info] client_dns_incr_failures(): Address 127.0.0.1 now has 1 resolve failures.
+Feb 13 01:05:33.478 [info] exit circ (length 3): $610EC312C40076330EBB4C72522995C03D8C442A(open) $B8F98F5966E41AAB5DCDEED7ECEF637644ACF11C(open) $56D109A57085FEAF4B7EBC789C30E13EECEDBCC8(open)
+Feb 13 01:05:33.670 [notice] Tried for 130 seconds to get a connection to 127.0.0.1:12353. Giving up. (waiting for controller)
+
+Feb 13 01:06:01.782 [info] circuit_expire_old_circuits_clientside(): Closing n_circ_id 3126016833 (dirty 758 sec ago, purpose 22)
+Feb 13 01:06:01.782 [info] circuit_mark_for_close_(): Circuit 3126016833 (id: 18) marked for close at src/core/or/circuituse.c:1507 (orig reason: 9, new reason: 0)
+
+
+
+
+
+
+DEBUG:root:Starting client protocol (id: 1084, desc: ('127.0.0.1', 9097) -> 23: ['test021r', 'test030target', 'test045e'])
+
+ERROR:root:Client protocol id: 1084 had an error (22:13:40.143767)
+Traceback (most recent call last):
+  File "/home/sengler/code/working/tor-benchmarking/dev/experiment_client.py", line 257, in _run_client
+    protocol.run()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 130, in _run_iteration
+    if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 71, in run
+    finished = self._run_iteration()
+  File "/home/sengler/code/working/tor-benchmarking/dev/basic_protocols.py", line 172, in _run_iteration
+    raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+basic_protocols.ProtocolException: Could not connect to SOCKS proxy, msg: 5b
+
+
+Feb 15 22:11:40.074 [info] connection_handle_listener_read(): New SOCKS connection opened from 127.0.0.1.
+Feb 15 22:11:40.288 [info] connection_edge_process_inbuf(): data from edge while in 'waiting for controller' state. Leaving it on buffer.
+Feb 15 22:11:40.308 [info] rep_hist_note_used_port(): New port prediction added. Will continue predictive circ building for 3043 more seconds.
+Feb 15 22:11:40.308 [info] link_apconn_to_circ(): Looks like completed circuit to hidden service doesn't allow optimistic data for connection to 127.0.0.1
+Feb 15 22:11:40.308 [info] connection_ap_handshake_send_begin(): Sending relay cell 0 on circ 3202562382 to begin stream 29645.
+Feb 15 22:11:40.308 [info] connection_ap_handshake_send_begin(): Address/port sent, ap socket 4, n_circ_id 3202562382
+
+Feb 15 22:12:16.809 [info] circuit_expire_old_circuits_clientside(): Closing n_circ_id 3202562382 (dirty 636 sec ago, purpose 22)
+Feb 15 22:12:16.809 [info] circuit_mark_for_close_(): Circuit 3202562382 (id: 23) marked for close at src/core/or/circuituse.c:1507 (orig reason: 9, new reason: 0)
+Feb 15 22:12:16.810 [info] circuit_free_(): Circuit 0 (id: 23) has been freed.
+
+Feb 15 22:13:40.141 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 15 22:13:40.141 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 15 22:13:40.142 [notice] Tried for 120 seconds to get a connection to 127.0.0.1:12353. Giving up. (waiting for controller)
+Feb 15 22:13:40.147 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.
+Feb 15 22:13:40.147 [info] connection_edge_package_raw_inbuf(): called with package_window 0. Skipping.

+ 37 - 11   src/experiment.py

@@ -132,7 +132,11 @@ class Experiment:
 					except KeyboardInterrupt:
 						raise
 					except:
-						logging.exception('The Chutney network failed to start (attempt {})'.format(num_attempts))
+						logging.exception('The Chutney network failed to start (attempt {}). Trying again...'.format(num_attempts))
+						if num_attempts > 4:
+							logging.exception('Just kidding, we\'ve tried too many times.')
+							raise
+						#
 					#
 				#
 				num_lines_to_print = 200
@@ -191,6 +195,7 @@ class Experiment:
 		results = [x['results'] for x in server.results]
 		#
 		if self.save_data_path is not None:
+			logging.info('Starting to save server results...')
 			with gzip.GzipFile(os.path.join(self.save_data_path, 'server_results.pickle.gz'), 'wb') as f:
 				pickle.dump(results, f, protocol=4)
 			#
@@ -242,6 +247,9 @@ class Experiment:
 		controllers = []
 		protocol_manager = experiment_client.ExperimentProtocolManager()
 		#
+		client_info = {}
+		client_info['clients'] = []
+		#
 		circuit_counter = 0
 		try:
 			for proxy_address in proxy_addresses:
@@ -254,39 +262,57 @@ class Experiment:
 					# make a circuit for each stream
 					controller.build_circuit(circuit_generator, circuit_counter)
 					circuit_counter += 1
-					time.sleep(0.5)
+					#time.sleep(0.05)
 				#
 				controllers.append(controller)
 			#
 			start_event = multiprocessing.Event()
 			#
-			used_measureme_ids = set()
+			#used_measureme_ids = set()
+			measureme_id_counter = 1
 			for stream_index in range(self.num_streams_per_client):
 				for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
-					if self.measureme:
-						measureme_id = stream_index*len(controllers) + controller_index + 1
-						assert len(set([measureme_id]) & used_measureme_ids) == 0, 'Sanity check: Attempting to use a previously-used measureme_id'
-						used_measureme_ids |= set([measureme_id])
-					else:
-						measureme_id = None
+					#if self.measureme:
+					#	measureme_id = stream_index*len(controllers) + controller_index + 1
+					#	assert len(set([measureme_id]) & used_measureme_ids) == 0, 'Sanity check: Attempting to use a previously-used measureme_id'
+					#	used_measureme_ids |= set([measureme_id])
+					#else:
+					#	measureme_id = None
+					measureme_id = measureme_id_counter
+					measureme_id_counter += 1
 					#
 					wait_duration = random.randint(0, self.wait_range)
 					protocol = experiment_client.build_client_protocol(self.server_address, proxy_address['socks'],
-													 proxy_address['control'], controller, start_event,
+													 proxy_address['control'], controller, start_event, self.measureme,
 													 wait_duration=wait_duration, measureme_id=measureme_id,
 													 num_bytes=self.num_bytes, buffer_len=self.buffer_len)
 					protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
+					#
+					client_info['clients'].append({'measureme_id':measureme_id, 'wait_duration':wait_duration})
 				#
 			#
 			time.sleep(2)
 			start_event.set()
+			client_info['start_time'] = time.time()
 			#
-			protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
+			# unfortunately mixing threads and processes can cause python to deadlock, and some client protocols
+			# have been found to deadlock at about 1/1000 probability, so we must provide a timeout to kill
+			# these deadlocked protocol processes
+			# see: https://codewithoutrules.com/2018/09/04/python-multiprocessing/
+			protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)), kill_timeout=20*60)
+			logging.debug('Client protocols have finished')
 		finally:
 			for controller in controllers:
 				controller.disconnect()
 			#
+			logging.debug('Protocol manager stopping...')
 			protocol_manager.stop()
+			logging.debug('Protocol manager has finished')
+		#
+		if self.save_data_path is not None:
+			with gzip.GzipFile(os.path.join(self.save_data_path, 'client_info.pickle.gz'), 'wb') as f:
+				pickle.dump(client_info, f, protocol=4)
+			#
 		#
 	#
 #
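
The kill_timeout passed to protocol_manager.wait() above exists because of the thread/process deadlock described in the comment. Stripped of the protocol-manager bookkeeping, the underlying pattern is a bounded join followed by terminate() (a generic sketch, not this repository's implementation):

import multiprocessing
import time

def worker():
    time.sleep(3600)  # stand-in for a client protocol that never returns

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join(timeout=20 * 60)  # analogous to kill_timeout=20*60 in the diff
    if p.is_alive():
        p.terminate()        # kill the (presumed) deadlocked process
        p.join()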

+ 176 - 48   src/experiment_client.py

@@ -86,8 +86,11 @@ class ExperimentController:
 		self.connection = stem.control.Controller.from_port(address=self.control_address[0], port=self.control_address[1])
 		self.connection.authenticate()
 		#
-		self.connection.add_event_listener(self._attach_stream, stem.control.EventType.STREAM)
+		self.connection.add_event_listener(self.stream_event, stem.control.EventType.STREAM)
+		self.connection.add_event_listener(self.circuit_event, stem.control.EventType.CIRC)
 		self.connection.set_conf('__LeaveStreamsUnattached', '1')
+		#self.connection.set_conf('__DisablePredictedCircuits', '1')
+		# we still need to generate circuits for things like directory fetches
 	#
 	def disconnect(self):
 		#if len(self.unused_circuit_ids) > 0:
@@ -104,53 +107,80 @@ class ExperimentController:
 		self.assigned_streams[from_address] = circuit_id
 		return circuit_id
 	#
-	def _attach_stream(self, stream):
+	def stream_event(self, stream):
 		try:
 			if stream.status == 'NEW':
 				# by default, let tor handle new streams
 				circuit_id = 0
 				#
 				if stream.purpose == 'USER':
+					# NOTE: we used to try to attach all streams (including non-user streams,
+					# which we attached to circuit 0), but Stem was found to hang sometimes
+					# when attaching DIR_FETCH streams, so now we only attach user streams
+					# and let tor take care of other streams
+					#
 					# this is probably one of our streams (although not guaranteed)
 					circuit_id = self.assigned_streams[(stream.source_address, stream.source_port)]
-				#
-				try:
-					self.connection.attach_stream(stream.id, circuit_id)
-					#logging.debug('Attaching to circuit {}'.format(circuit_id))
-				except stem.InvalidRequest:
-					if stream.purpose != 'USER':
-						# could not attach a non-user stream, ignoring
-						pass
-					else:
-						raise
 					#
-				except stem.UnsatisfiableRequest:
-					if stream.purpose != 'USER':
-						# could not attach a non-user stream, so probably raised:
-						# stem.UnsatisfiableRequest: Connection is not managed by controller.
-						# therefore we should ignore this exception
-						pass
-					else:
-						raise
+					try:
+						logging.debug('Attaching to circuit {}'.format(circuit_id))
+						self.connection.attach_stream(stream.id, circuit_id)
+						logging.debug('Attached to circuit {}'.format(circuit_id))
+					except stem.InvalidRequest:
+						if stream.purpose != 'USER':
+							# could not attach a non-user stream, ignoring
+							pass
+						else:
+							raise
+						#
+					except stem.UnsatisfiableRequest:
+						if stream.purpose != 'USER':
+							# could not attach a non-user stream, so probably raised:
+							# stem.UnsatisfiableRequest: Connection is not managed by controller.
+							# therefore we should ignore this exception
+							pass
+						else:
+							raise
+						#
+					except stem.SocketClosed:
+						logging.debug('Stream {} ({}, controller={}) {}: socket closed while attaching'.format(stream.id,
+									  stream.purpose, self.control_address, stream.status))
+						raise 
 					#
 				#
 			#
+			if stream.status == 'DETACHED' or stream.status == 'FAILED':
+				logging.debug('Stream {} ({}, controller={}) {}: {}; {}'.format(stream.id, stream.purpose, self.control_address,
+				              stream.status, stream.reason, stream.remote_reason))
+			#
 		except:
 			logging.exception('Error while attaching the stream.')
 			raise
 		#
 	#
+	def circuit_event(self, circuit):
+		if circuit.purpose == 'CONTROLLER' and (circuit.status == 'FAILED' or circuit.status == 'CLOSED'):
+			logging.debug('Circuit {} ({}, controller={}) {}: {}; {}'.format(circuit.id, circuit.purpose, self.control_address,
+			              circuit.status, circuit.reason, circuit.remote_reason))
+		#
+	#
 	def build_circuit(self, circuit_generator, gen_id):
 		circuit_id = None
+		tries_remaining = 5
 		#
-		while circuit_id is None:
+		while circuit_id is None and tries_remaining > 0:
 			try:
 				circuit = circuit_generator(gen_id)
-				circuit_id = self.connection.new_circuit(circuit, await_build=True)
-				logging.debug('New circuit (id={}, controller={}): {}'.format(circuit_id, self.control_address, circuit))
+				tries_remaining -= 1
+				circuit_id = self.connection.new_circuit(circuit, await_build=True, purpose='controller', timeout=10)
+				logging.debug('New circuit (circ_id={}, controller={}): {}'.format(circuit_id, self.control_address, circuit))
 			except stem.CircuitExtensionFailed as e:
 				wait_seconds = 1
 				logging.debug('Failed circuit: {}'.format(circuit))
+				if tries_remaining == 0:
+					logging.warning('Tried too many times')
+					raise
+				#
 				logging.warning('Circuit creation failed (CircuitExtensionFailed: {}). Retrying in {} second{}...'.format(str(e),
 				                                                                                                          wait_seconds,
 				                                                                                                          's' if wait_seconds != 1 else ''))
@@ -158,10 +188,25 @@ class ExperimentController:
 			except stem.InvalidRequest as e:
 				wait_seconds = 15
 				logging.debug('Failed circuit: {}'.format(circuit))
+				if tries_remaining == 0:
+					logging.warning('Tried too many times')
+					raise
+				#
 				logging.warning('Circuit creation failed (InvalidRequest: {}). Retrying in {} second{}...'.format(str(e),
 				                                                                                                  wait_seconds,
 				                                                                                                  's' if wait_seconds != 1 else ''))
 				time.sleep(wait_seconds)
+			except stem.Timeout as e:
+				wait_seconds = 5
+				logging.debug('Failed circuit: {}'.format(circuit))
+				if tries_remaining == 0:
+					logging.warning('Tried too many times')
+					raise
+				#
+				logging.warning('Circuit creation timed out (Timeout: {}). Retrying in {} second{}...'.format(str(e),
+				                                                                                                  wait_seconds,
+				                                                                                                  's' if wait_seconds != 1 else ''))
+				time.sleep(wait_seconds)
 			#
 		#
 		self.unassigned_circuit_ids.append(circuit_id)
@@ -169,10 +214,13 @@ class ExperimentController:
 	#
 #
 class ExperimentProtocol(basic_protocols.ChainedProtocol):
-	def __init__(self, socket, endpoint, num_bytes, custom_data=None, send_buffer_len=None, push_start_cb=None):
+	def __init__(self, socket, endpoint, num_bytes, circuit_info, custom_data=None, send_buffer_len=None, push_start_cb=None):
 		proxy_username = bytes([z for z in os.urandom(12) if z != 0])
 		proxy_protocol = basic_protocols.Socks4Protocol(socket, endpoint, username=proxy_username)
 		#
+		self.proxy_info = socket.getpeername()
+		self.circuit_info = circuit_info
+		#
 		throughput_protocol = throughput_protocols.ClientProtocol(socket, num_bytes,
 		                                                          custom_data=custom_data,
 		                                                          send_buffer_len=send_buffer_len,
@@ -181,6 +229,13 @@ class ExperimentProtocol(basic_protocols.ChainedProtocol):
 		#
 		super().__init__([proxy_protocol, throughput_protocol])
 	#
+	def get_desc(self):
+		super_desc = super().get_desc()
+		if super_desc is not None:
+			return '{} -> {} - {}'.format(self.proxy_info, self.circuit_info, super_desc)
+		else:
+			return '{} -> {}'.format(self.proxy_info, self.circuit_info)
+	#
 #
 class ExperimentProtocolManager():
 	def __init__(self):
@@ -188,6 +243,7 @@ class ExperimentProtocolManager():
 		self.process_counter = 0
 		self.used_ids = []
 		self.running_processes = {}
+		self.checked_in = multiprocessing.Manager().dict()
 		self.global_finished_process_queue = multiprocessing.Queue()
 		self.local_finished_process_queue = queue.Queue()
 		self.queue_getter = useful.QueueGetter(self.global_finished_process_queue,
@@ -196,7 +252,8 @@ class ExperimentProtocolManager():
 	def _run_client(self, protocol, protocol_id):
 		had_error = False
 		try:
-			logging.debug('Starting client protocol (id: {})'.format(protocol_id))
+			logging.debug('Starting client protocol (id: {}, desc: {})'.format(protocol_id, protocol.get_desc()))
+			self.checked_in[protocol_id] = True
 			protocol.run()
 			logging.debug('Done client protocol (id: {})'.format(protocol_id))
 		except KeyboardInterrupt:
@@ -220,8 +277,11 @@ class ExperimentProtocolManager():
 		assert not self.stopped
 		assert protocol_id not in self.used_ids, 'Protocol ID already used'
 		#
+		#logging.debug('Launching client protocol (id: {})'.format(protocol_id))
 		p = multiprocessing.Process(target=self._run_client, args=(protocol, protocol_id))
 		self.running_processes[protocol_id] = p
+		self.checked_in[protocol_id] = False
+		# because of Python multiprocessing bugs, the process may deadlock when it starts
 		self.used_ids.append(protocol_id)
 		#
 		p.start()
@@ -229,46 +289,110 @@ class ExperimentProtocolManager():
 		#
 		#protocol.socket.close()
 	#
-	def wait(self, finished_protocol_cb=None, kill_timeout=None):
-		timed_out = False
+	def _get_not_checked_in(self):
+		temp = self.checked_in.copy()
+		return [x for x in temp if not temp[x]]
+	#
+	#def _count_checked_in(self):
+	#	temp = self.checked_in.copy()
+	#	only_checked_in = [True for x in temp if temp is True]
+	#	return (len(only_checked_in), len(self.checked_in))
+	#
+	def _get_dead_processes(self):
+		dead_processes = []
+		for (protocol_id, p) in self.running_processes.items():
+			if not p.is_alive():
+				dead_processes.append((protocol_id, p))
+			#
+		#
+		return dead_processes
+	#
+	def _cleanup_process(self, p, protocol_id, had_error, finished_protocol_cb):
+		p.join()
+		self.running_processes.pop(protocol_id)
+		if finished_protocol_cb is not None:
+			finished_protocol_cb(protocol_id, had_error)
+		#
+	#
+	def _wait(self, timeout=None, finished_protocol_cb=None):
+		return_on_timeout = True if timeout is not None else False
+		timeout = timeout if timeout is not None else 10
+		last_waiting_message = None
 		#
 		while len(self.running_processes) > 0:
-			logging.debug('Waiting for processes ({} left)'.format(len(self.running_processes)))
+			dead_processes = self._get_dead_processes()
 			#
-			if not timed_out:
+			while len(self.running_processes) > 0:
+				#checked_in_count = self._count_checked_in()
+				#not_checked_in = checked_in_count[1]-checked_in_count[0]
+				not_checked_in = self._get_not_checked_in()
+				#
+				if last_waiting_message is None or last_waiting_message != len(self.running_processes):
+					logging.debug('Waiting for processes ({} left, {} not checked in)'.format(len(self.running_processes),
+					                                                                          len(not_checked_in)))
+					last_waiting_message = len(self.running_processes)
+				#
+				if len(self.running_processes) <= len(not_checked_in):
+					running_not_checked_in = [protocol_id for protocol_id in self.running_processes if protocol_id in not_checked_in]
+					if len(self.running_processes) == len(running_not_checked_in):
+						logging.debug('The remaining processes have not checked in, so stopping the wait')
+						return
+					#
+				#
 				try:
-					(protocol_id, had_error) = self.local_finished_process_queue.get(timeout=kill_timeout)
+					(protocol_id, had_error) = self.local_finished_process_queue.get(timeout=timeout)
 					p = self.running_processes[protocol_id]
+					self._cleanup_process(p, protocol_id, had_error, finished_protocol_cb)
+					if (protocol_id, p) in dead_processes:
+						dead_processes.remove((protocol_id, p))
+					#
+					logging.debug('Completed protocol (id: {}, checked_in={})'.format(protocol_id,
+					                                                                  self.checked_in[protocol_id]))
 				except queue.Empty:
-					if kill_timeout is None:
-						raise
+					if return_on_timeout:
+						return
+					else:
+						break
+					#
+					#if kill_timeout is not None:
+					#	logging.warning('Timed out waiting for processes to finish, will terminate remaining processes')
+					#	kill_remaining = True
 					#
-					logging.warning('Timed out waiting for processes to finish, will terminate remaining processes')
-					timed_out = True
 				#
 			#
-			if timed_out:
-				(protocol_id, p) = next(iter(self.running_processes.items()))
-				# just get any process and kill it
-				had_error = True
-				p.terminate()
-				logging.debug('Terminated protocol {}'.format(protocol_id))
+			for (protocol_id, p) in dead_processes:
+				# these processes were dead but didn't add themselves to the finished queue
+				logging.debug('Found a dead process (id: {})'.format(protocol_id))
+				self._cleanup_process(p, protocol_id, True, finished_protocol_cb)
 			#
-			p.join()
-			self.running_processes.pop(protocol_id)
-			if finished_protocol_cb is not None:
-				finished_protocol_cb(protocol_id, had_error)
+		#
+	#
+	def wait(self, finished_protocol_cb=None, kill_timeout=None):
+		self._wait(kill_timeout, finished_protocol_cb)
+		#
+		if len(self.running_processes) > 0:
+			logging.warning('Timed out ({} seconds) waiting for processes to finish, will terminate remaining processes'.format(kill_timeout))
+		#
+		while len(self.running_processes) > 0:
+			(protocol_id, p) = next(iter(self.running_processes.items()))
+			# just get any process and kill it
+			was_alive = p.is_alive()
+			p.terminate()
+			logging.debug('Terminated protocol (id: {}, was_dead={}, checked_in={})'.format(protocol_id,
+			                                                                                (not was_alive),
+			                                                                                self.checked_in[protocol_id]))
 			#
+			self._cleanup_process(p, protocol_id, True, finished_protocol_cb)
 		#
 	#
 	def stop(self):
 		self.wait(kill_timeout=1.5)
 		self.queue_getter.stop()
-		self.queue_getter.join()
+		self.queue_getter.join(timeout=10)
 		self.stopped = True
 	#
 #
-def build_client_protocol(endpoint, socks_address, control_address, controller, start_event, wait_duration=0, measureme_id=None, num_bytes=None, buffer_len=None):
+def build_client_protocol(endpoint, socks_address, control_address, controller, start_event, send_measureme, wait_duration=0, measureme_id=None, num_bytes=None, buffer_len=None):
 	client_socket = socket.socket()
 	#
 	logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), socks_address)
@@ -282,7 +406,9 @@ def build_client_protocol(endpoint, socks_address, control_address, controller,
 	#
 	if measureme_id is not None:
 		custom_data['measureme_id'] = measureme_id
-		#
+	#
+	if send_measureme:
+		assert measureme_id != None
 		hops = list(range(len(controller.circuits[circuit_id])+1))[::-1]
 		# send the measureme cells to the last relay first
 		start_cb = lambda control_address=control_address, circuit_id=circuit_id, measureme_id=measureme_id, \
@@ -293,6 +419,7 @@ def build_client_protocol(endpoint, socks_address, control_address, controller,
 	#
 	custom_data = json.dumps(custom_data).encode('utf-8')
 	protocol = ExperimentProtocol(client_socket, endpoint, num_bytes,
+	                              '{}: {}'.format(circuit_id, controller.circuits[circuit_id]),
 								  custom_data=custom_data,
 								  send_buffer_len=buffer_len,
 								  push_start_cb=start_cb)
@@ -369,7 +496,8 @@ if __name__ == '__main__':
 					measureme_id = None
 				#
 				wait_duration = random.randint(0, args.wait_range)
-				protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'], controller, start_event,
+				protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'],
+				                                 controller, start_event, args.measureme,
 				                                 wait_duration=wait_duration, measureme_id=measureme_id,
 				                                 num_bytes=args.num_bytes, buffer_len=args.buffer_len)
 				protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
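
build_circuit() above now caps the number of attempts and handles stem.Timeout alongside CircuitExtensionFailed and InvalidRequest. Reduced to its essentials, the retry skeleton looks roughly like this (a sketch, not the ExperimentController method; the per-exception wait times from the diff are collapsed into one value):

import time
import stem

def build_circuit_with_retries(connection, path, tries=5, wait_seconds=5):
    # 'connection' is a stem.control.Controller; purpose='controller' and
    # timeout=10 mirror the new_circuit() call in the diff
    for attempt in range(tries):
        try:
            return connection.new_circuit(path, await_build=True,
                                          purpose='controller', timeout=10)
        except (stem.CircuitExtensionFailed, stem.InvalidRequest, stem.Timeout):
            if attempt == tries - 1:
                raise
            time.sleep(wait_seconds)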

+ 21 - 2   src/log_system_usage.py

@@ -2,6 +2,8 @@
 #
 import time
 import threading
+import subprocess
+import sys
 import pickle
 import gzip
 #
@@ -63,6 +65,17 @@ def calculate_cpu_usage_continuous(stats):
 	#
 	return cpu_usages
 #
+def get_running_processes():
+	lines = subprocess.check_output(['ps', '-a', '-x', '-o', 'pid,state,args', '--no-headers']).decode('utf-8').split('\n')
+	lines = [line.strip() for line in lines]
+	lines = [line.split(' ', 2) for line in lines if len(line) != 0]
+	#
+	data = []
+	for line in lines:
+		data.append({'pid':int(line[0]), 'state':line[1], 'args':line[2]})
+	#
+	return data
+#
 def log_cpu_stats(path, interval, stop_event):
 	"""
 	Log the cpu stats to a gz compressed JSON file. Storing JSON
@@ -75,7 +88,7 @@ def log_cpu_stats(path, interval, stop_event):
 	stop_event: a threading.Event which stops the function
 	"""
 	#
-	stats = {'timestamps':[], 'stats':{'system':[], 'cpus':{}}}
+	stats = {'timestamps':[], 'stats':{'system':[], 'cpus':{}}, 'processes':[]}
 	while not stop_event.is_set():
 		stats['timestamps'].append(time.time())
 		#stats['stats'].append(get_cpu_stats())
@@ -87,6 +100,7 @@ def log_cpu_stats(path, interval, stop_event):
 			#
 			stats['stats']['cpus'][cpu].append(current_stats['cpus'][cpu])
 		#
+		stats['processes'].append(get_running_processes())
 		stop_event.wait(interval)
 	#
 	with gzip.GzipFile(path, 'wb') as f:
@@ -130,7 +144,12 @@ def log_cpu_stats(path, interval, stop_event):
 '''
 if __name__ == '__main__':
 	stop_event = threading.Event()
-	t = threading.Thread(target=log_cpu_stats, args=('/tmp/cpu_stats.pickle.gz', 0.5, stop_event))
+	#
+	assert len(sys.argv) == 3
+	interval = float(sys.argv[1])
+	file_name = sys.argv[2]
+	#
+	t = threading.Thread(target=log_cpu_stats, args=(file_name, interval, stop_event))
 	t.start()
 	#
 	try:

+ 21 - 0   src/old-network-settings

@@ -0,0 +1,21 @@
+                    if host == 'cluck2':
+                        num_clients = 150
+                        num_guards = 28 # number of relays (including guards)
+                        num_authorities = 2 # will also act as a relay or guard
+                        num_exits = 32 # will be used only as an exit
+                        num_streams_per_client = 10
+                        num_bytes = 20*(2**20)
+                    elif host == 'sengler-rpi':
+                        num_clients = 24
+                        num_guards = 28 # number of relays (including guards)
+                        num_authorities = 2 # will also act as a relay or guard
+                        num_exits = 32 # will be used only as an exit
+                        num_streams_per_client = 8
+                        num_bytes = 10*(2**20)
+                    elif host is None:
+                        num_clients = 10
+                        num_guards = 10 # number of relays (including guards)
+                        num_authorities = 2 # will also act as a relay or guard
+                        num_exits = 12 # will be used only as an exit
+                        num_streams_per_client = 5
+                        num_bytes = 20*(2**20)

+ 162 - 0   src/plot_combined.py

@@ -0,0 +1,162 @@
+import sys
+import os
+import time
+#
+import numpy as np
+import matplotlib.pylab as plt
+#
+import data_helpers
+import log_system_usage
+#
+if __name__ == '__main__':
+	initial_time = time.time()
+	#
+	title = sys.argv[1]
+	(_, throughput_data) = data_helpers.read_relay_throughput(sys.argv[2])
+	client_data = data_helpers.read_client_info(sys.argv[3])
+	server_data = data_helpers.read_server_results(sys.argv[4])
+	try:
+		host_system_usage = log_system_usage.load_cpu_stats(sys.argv[5])
+		remote_system_usage = log_system_usage.load_cpu_stats(sys.argv[6])
+	except FileNotFoundError:
+		print('The system usage logs weren\'t found, so skipping them...')
+		plot_cpu_usage = False
+	else:
+		plot_cpu_usage = True
+	#
+	print('Loaded Files: {}'.format(time.time()-initial_time))
+	#
+	norm_throughput = data_helpers.normalize_relay_throughput(throughput_data)
+	start_time = client_data['start_time']
+	last_byte_times = np.array([x['time_of_last_byte'] for x in server_data])
+	end_time = np.max(last_byte_times)
+	#
+	if plot_cpu_usage:
+		host_system_usage['timestamps'] = np.array(host_system_usage['timestamps'])
+		host_cpu_usage = {int(cpu): np.array(log_system_usage.calculate_cpu_usage_continuous(host_system_usage['stats']['cpus'][cpu])) for cpu in host_system_usage['stats']['cpus']}
+		remote_system_usage['timestamps'] = np.array(remote_system_usage['timestamps'])
+		remote_cpu_usage = {int(cpu): np.array(log_system_usage.calculate_cpu_usage_continuous(remote_system_usage['stats']['cpus'][cpu])) for cpu in remote_system_usage['stats']['cpus']}
+		#
+		plot_processes = ('processes' in remote_system_usage)
+		plot_processes = False
+		#
+		if plot_processes:
+			target_tor_proc_states = [[y['state'] for y in x if 'target/torrc' in y['args']] for x in remote_system_usage['processes']]
+			host_tor_proc_states = [[y['state'] for y in x if '/torrc' in y['args'] and 'target/torrc' not in y['args']] for x in host_system_usage['processes']]
+			host_all_proc_states = [[y['state'] for y in x] for x in host_system_usage['processes']]
+			#print(remote_system_usage['processes'][700])
+			any_host_tor_proc_states_D = ['D' in x for x in host_tor_proc_states]
+			any_host_proc_states_D = ['D' in x for x in host_all_proc_states]
+			#any_proc_states_D = ['D' in x for x in [[y['state'] for y in z] for z in remote_system_usage['processes']]]
+			assert set([len(x) for x in target_tor_proc_states]) == {0,1}
+			for x in target_tor_proc_states:
+				if len(x) == 0:
+					x.append(None)
+				#
+			#
+			print(set([y['args'] for x in host_system_usage['processes'] for y in x if y['state'] == 'D']))
+			#
+			target_tor_proc_states = [x[0] for x in target_tor_proc_states]
+		#
+	#
+	throughput_start_index = np.argmax(norm_throughput['timestamps'] > start_time)-5
+	throughput_end_index = np.argmax(norm_throughput['timestamps'] > end_time)+5
+	if plot_cpu_usage:
+		host_cpu_start_index = np.argmax(host_system_usage['timestamps'] > start_time)-20
+		host_cpu_end_index = np.argmax(host_system_usage['timestamps'] > end_time)+20
+		remote_cpu_start_index = np.argmax(remote_system_usage['timestamps'] > start_time)-20
+		remote_cpu_end_index = np.argmax(remote_system_usage['timestamps'] > end_time)+20
+	#
+	#start_time = 0
+	#
+	print('Processed Data: {}'.format(time.time()-initial_time))
+	#
+	fig, (ax1, ax2, ax3) = plt.subplots(3, sharex=True, figsize=(20,13))
+	fig.suptitle('{}\n\n{}'.format(title, os.path.basename(sys.argv[2])))
+	#
+	ax1_colors = plt.get_cmap('tab20').colors[0:2]
+	for x in range(2):
+		ax1.step(norm_throughput['timestamps'][throughput_start_index:throughput_end_index]-start_time,
+		         0.5*np.sum(norm_throughput['threads'][throughput_start_index:throughput_end_index,x::2],
+		         axis=1)/2**20, where='post', color=ax1_colors[x])
+	#for x in range(int(norm_throughput['threads'].shape[1]/2)):
+	#	ax1.step(norm_throughput['timestamps'][throughput_start_index:throughput_end_index]-start_time,
+	#	         0.5*np.sum(norm_throughput['threads'][throughput_start_index:throughput_end_index,x*2:x*2+2],
+	#	         axis=1)/2**20, where='post', color=ax1_colors[0])
+	#
+	ax1.step(norm_throughput['timestamps'][throughput_start_index:throughput_end_index]-start_time,
+	         0.5*np.sum(norm_throughput['threads'][throughput_start_index:throughput_end_index,:],
+	         axis=1)/2**20, where='post', color='grey')
+	ax1.set_ylabel('Throughput (MiB/s)', color=ax1_colors[0])
+	#
+	ax1_twin = ax1.twinx()
+	ax1_twin_color = plt.get_cmap('tab20').colors[4]
+	ax1_twin.plot(np.sort(last_byte_times)-start_time, np.arange(len(last_byte_times)), color=ax1_twin_color)
+	#ax1_twin.set_ylim([0, None])
+	ax1_twin.set_ylabel('Number of completed streams', color=ax1_twin_color)
+	#
+	print('Finished plotting ax1: {}'.format(time.time()-initial_time))
+	#
+	colormap = plt.get_cmap('tab20').colors #'tab10'
+	assigned_colors = []
+	#
+	for transfer in server_data:
+		color_selector = transfer['custom_data']['circuit'][1][-1]
+		if color_selector in assigned_colors:
+			color_index = assigned_colors.index(color_selector)
+		else:
+			color_index = len(assigned_colors)
+			assigned_colors.append(color_selector)
+		#
+		#bins = np.arange(start_time, transfer['deltas']['timestamps'][-1], 0.1)
+		#binned_indexes = np.digitize(transfer['deltas']['timestamps'], bins)
+		#binned_deltas = np.zeros(bins.shape)
+		#for x in range(len(binned_indexes)):
+		#	binned_deltas[binned_indexes[x]-1] += transfer['deltas']['bytes'][x]
+		#
+		#zeros = (binned_deltas == 0).nonzero()[0]
+		#bins = np.delete(bins, zeros)
+		#binned_deltas = np.delete(binned_deltas, zeros)
+		#ax2.step(bins-start_time, np.cumsum(binned_deltas), color=colormap[color_index%len(colormap)], where='post')
+		ax2.step(transfer['deltas']['timestamps']-start_time, np.cumsum(transfer['deltas']['bytes']),
+		         color=colormap[color_index%len(colormap)], where='post')
+	#
+	ax2.set_ylabel('Bytes')
+	ax2_twin = ax2.twinx()
+	ax2_twin.set_ylim([x/(2**20) for x in ax2.get_ylim()])
+	ax2_twin.set_ylabel('MiB')
+	#
+	print('Finished plotting ax2: {}'.format(time.time()-initial_time))
+	#
+	if plot_cpu_usage:
+		for cpu in remote_cpu_usage:
+			ax3.step(remote_system_usage['timestamps'][remote_cpu_start_index:remote_cpu_end_index]-start_time,
+			         100*remote_cpu_usage[cpu][remote_cpu_start_index:remote_cpu_end_index],
+			         label='CPU {}'.format(cpu))
+		#
+		if plot_processes:
+			vals = list(set(target_tor_proc_states))
+			if None in vals:
+				vals.remove(None)
+			vals = sorted(vals) + [None]
+			print(vals)
+			ax3.step(remote_system_usage['timestamps'][remote_cpu_start_index:remote_cpu_end_index]-start_time,
+			         [10*vals.index(x)+120 for x in target_tor_proc_states[remote_cpu_start_index:remote_cpu_end_index]])
+			ax3.step(host_system_usage['timestamps'][host_cpu_start_index:host_cpu_end_index]-start_time,
+			         [int(x)*20+160 for x in any_host_proc_states_D[host_cpu_start_index:host_cpu_end_index]])
+		#
+	#
+	import matplotlib
+	ax3.grid()
+	ax3.xaxis.set_major_formatter(matplotlib.ticker.FormatStrFormatter('%.3f'))
+	ax3.set_xlabel('Time (s)')
+	ax3.set_ylabel('Per-Core CPU Usage (%)')
+	ax3.legend()
+	#
+	print('Finished plotting ax3: {}'.format(time.time()-initial_time))
+	#
+	fig.tight_layout()
+	plt.subplots_adjust(top=0.92)
+	output_filename = title.lower().replace(' ','-').replace('/','').replace('(','').replace(')','').replace(',','')
+	#plt.savefig(os.path.join('/tmp', output_filename))
+	plt.show()
+#
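
A sketch of the 0.1 s re-binning idea that is left commented out above, in case it is revisited; this assumes timestamps and byte_deltas are 1-D numpy arrays shaped like transfer['deltas']['timestamps'] and transfer['deltas']['bytes'], and the helper name is only illustrative:

import numpy as np

def bin_deltas(timestamps, byte_deltas, start_time, bin_width=0.1):
	# bin edges covering the whole transfer, then sum the byte deltas landing in each bin
	edges = np.arange(start_time, timestamps[-1] + bin_width, bin_width)
	indexes = np.digitize(timestamps, edges) - 1
	binned = np.zeros(len(edges))
	np.add.at(binned, indexes, byte_deltas)
	# drop empty bins, as the commented-out code above does
	keep = binned.nonzero()[0]
	return (edges[keep], binned[keep])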

+ 53 - 0
src/plot_relay_throughput.py

@@ -0,0 +1,53 @@
+import matplotlib.pylab as plt
+import numpy as np
+import math
+import sys
+#
+import data_helpers
+#
+#filenames = ['relay_throughput.log', 'multithreaded-throughput-rpi.log']
+#filenames = ['cluck2_working_0_0.log', 'original-throughput.log']
+filenames = sys.argv[1:]
+#
+selected_data = {}
+for name in filenames:
+	(_, data) = data_helpers.read_relay_throughput(name)
+	#
+	totals = np.sum(data['threads'], axis=1)
+	# indices of the first and last samples where total throughput exceeds 5 MiB/s, padded by 10 samples
+	start_index = np.where(totals/2**20 > 5)[0][0]-10
+	end_index = np.where(totals/2**20 > 5)[0][-1]+10
+	#
+	selected_data[name] = {}
+	selected_data[name]['timestamps'] = data['timestamps'][start_index:end_index]
+	selected_data[name]['timestamps'] -= selected_data[name]['timestamps'][0]
+	selected_data[name]['threads'] = data['threads'][start_index:end_index,:]
+	#
+	normalized_data = data_helpers.normalize_relay_throughput(data)
+	selected_data[name]['timestamps_normalized'] = normalized_data['timestamps'][start_index:end_index]
+	selected_data[name]['threads_normalized'] = normalized_data['threads'][start_index:end_index,:]
+	#
+	time_deltas = selected_data[name]['timestamps'][1:]-selected_data[name]['timestamps'][:-1]
+	num_timesteps = int(round(30/np.mean(time_deltas)))
+	max_throughput = np.max(np.convolve(0.5*np.sum(selected_data[name]['threads_normalized'], axis=1)/2**20, np.ones(num_timesteps)/num_timesteps, 'valid'))
+	# we divide by 2 since the throughput is the average of the sent and received bytes
+	print('{}: {:.2f} MiB/s'.format(name, max_throughput))
+#
+for name in selected_data:
+	plt.step(selected_data[name]['timestamps_normalized'], 0.5*np.sum(selected_data[name]['threads_normalized'], axis=1)/2**20, label=name, where='post')
+	for thread in range(int(selected_data[name]['threads'].shape[1]/2)):
+		for (ind, direction) in zip(range(2), ['sent', 'recv']):
+			label = '{} - {} - {}'.format(name, thread, direction)
+			#plt.plot(selected_data[name]['timestamps_normalized'], selected_data[name]['threads_normalized'][:,2*thread+ind]/2**20, label=label)
+			#plt.plot(selected_data[name]['timestamps_normalized'], np.cumsum(selected_data[name]['threads_normalized'][:,2*thread+ind])/2**20, label=label)
+		#
+	#
+	for (ind, direction) in zip(range(2), ['sent', 'recv']):
+		label = '{} - {}'.format(name, direction)
+		#plt.plot(selected_data[name]['timestamps_normalized'], np.sum(selected_data[name]['threads_normalized'][:,ind::2], axis=1)/2**20, label=label)
+		#plt.plot(selected_data[name]['timestamps_normalized'], np.cumsum(np.sum(selected_data[name]['threads_normalized'][:,ind::2], axis=1))/2**20, label=label)
+	#
+#
+plt.xlabel('Time (s)')
+plt.ylabel('Throughput (MiB/s)')
+plt.legend()
+plt.show()
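
The moving-average peak computed above could be pulled out into a helper for reuse; a minimal sketch, assuming timestamps and threads are the normalized arrays produced by data_helpers.normalize_relay_throughput() and the 30-second window matches the script above (the function name is illustrative):

import numpy as np

def peak_throughput_mib(timestamps, threads, window_seconds=30):
	# total throughput in MiB/s, halved because sent and received bytes are both counted
	total = 0.5*np.sum(threads, axis=1)/2**20
	# number of samples in one averaging window, based on the mean sampling interval
	num_timesteps = max(1, int(round(window_seconds/np.mean(np.diff(timestamps)))))
	smoothed = np.convolve(total, np.ones(num_timesteps)/num_timesteps, 'valid')
	return np.max(smoothed)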

+ 40 - 0
src/plot_server_results.py

@@ -0,0 +1,40 @@
+import sys
+import numpy as np
+import matplotlib.pylab as plt
+import pdb
+#
+import data_helpers
+#
+if __name__ == '__main__':
+	transfers = data_helpers.read_server_results(sys.argv[1])
+	clients = data_helpers.read_client_info(sys.argv[2])
+	#
+	#approx_start_time = np.min([x['time_of_first_byte'] for x in transfers])
+	approx_start_time = clients['start_time']
+	#
+	colormap = plt.get_cmap('tab20').colors #'tab10'
+	assigned_colors = []
+	#
+	for transfer in transfers:
+		if transfer['custom_data']['circuit'][1][-1] in assigned_colors:
+			color_index = assigned_colors.index(transfer['custom_data']['circuit'][1][-1])
+		else:
+			color_index = len(assigned_colors)
+			assigned_colors.append(transfer['custom_data']['circuit'][1][-1])
+		#
+		plt.step(transfer['deltas']['timestamps']-approx_start_time, np.cumsum(transfer['deltas']['bytes']),
+		         color=colormap[color_index%len(colormap)], where='post')
+	#
+	ax1 = plt.gca()
+	ax1.set_ylabel('Bytes')
+	ax2 = ax1.twinx()
+	ax2.set_ylim([x/(2**20) for x in ax1.get_ylim()])
+	ax2.set_ylabel('MiB')
+	plt.show()
+	#
+	last_byte_times = np.array([x['time_of_last_byte'] for x in transfers])
+	plt.plot(np.sort(last_byte_times)-approx_start_time, np.arange(len(last_byte_times)))
+	plt.xlim([0, None])
+	plt.ylim([0, None])
+	plt.show()
+#
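
The assigned_colors list above just hands out tab20 colors in first-seen order; a defaultdict does the same thing with less bookkeeping. A minimal sketch, keyed the same way as above (transfer['custom_data']['circuit'][1][-1]); the color_of name is illustrative:

from collections import defaultdict
import matplotlib.pylab as plt

colormap = plt.get_cmap('tab20').colors
# the first time a key is seen, the factory returns the next unused color;
# later lookups return the same color for that key
color_of = defaultdict(lambda: colormap[len(color_of) % len(colormap)])
# usage: plt.step(..., color=color_of[transfer['custom_data']['circuit'][1][-1]], where='post')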

+ 238 - 52
src/relay_working_experiment.py

@@ -5,6 +5,7 @@ import shutil
 import logging
 import random
 import os
+import subprocess
 import multiprocessing
 import threading
 import time
@@ -25,14 +26,23 @@ import experiment_client
 import experiment
 import useful
 #
+#remote_name = 'sengler-rpi'
+#remote_name = 'cluck2'
+#remote_name = None
+#
 class CustomExperiment(experiment.Experiment):
-	def __init__(self, use_helgrind, target_tor, *args, **kwargs):
+	def __init__(self, use_helgrind, target_tor, num_additional_eventloops, remote_name, *args, **kwargs):
 		self.use_helgrind = use_helgrind
 		self.target_tor = target_tor
+		self.num_additional_eventloops = num_additional_eventloops
+		self.remote_name = remote_name
 		super().__init__(*args, **kwargs)
 		#
 		self.chutney_path = '/home/sengler/code/working/chutney'
-		self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
+		#self.tor_path = '/home/sengler/code/releases/tor-0.4.2.5'
+		self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller'
+		#self.tor_path = '/home/sengler/code/dev/tor-0.4.2.6-fixed-controller-kist-changes'
+		#self.tor_path = '/home/sengler/code/working/tor'
 	#
 	def configure_chutney(self):
 		#self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl') for _ in range(self.num_authorities)] + \
@@ -43,9 +53,23 @@ class CustomExperiment(experiment.Experiment):
 		#target_tor_path = '/home/sengler/code/working/tor/src/app/tor'
 		#target_tor_path = '/home/sengler/code/releases/tor-0.4.2.5/src/app/tor'
 
-		local_ip = '172.19.156.16'
-		target_ip = '172.19.156.136'
-		target_hostname = 'cluck2'
+		if self.remote_name == 'cluck2':
+			local_ip = '172.19.156.16'
+			target_ip = '172.19.156.136'
+			#local_ip = '129.97.119.196'
+			#target_ip = '129.97.119.226'
+			target_hostname = 'cluck2'
+		elif self.remote_name == 'sengler-rpi':
+			local_ip = '129.97.119.196'
+			target_ip = '129.97.169.9'
+			target_hostname = 'sengler-rpi'
+		elif self.remote_name is None:
+			local_ip = None
+			target_ip = None
+			target_hostname = None
+		else:
+			raise Exception('hostname not known')
+		#
 
 		target_optional_args = {}
 		if self.target_tor is not None:
@@ -54,61 +78,139 @@ class CustomExperiment(experiment.Experiment):
 			target_optional_args['valgrind_settings'] = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
 
 		#target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libprofiler.so.0'}
-		target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
-		target_optional_args['ip'] = target_ip
-		target_optional_args['remote_hostname'] = target_hostname
+		#target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/usr/lib/libtcmalloc_and_profiler.so.4'}
+		#target_optional_args['add_environ_vars'] = {'LD_PRELOAD': '/home/sengler/build/lib/libtcmalloc_and_profiler.so'}
+		#target_optional_args['add_environ_vars'] = {'EVENT_NOEPOLL': '', 'EVENT_SHOW_METHOD': ''}
+		if target_ip is not None:
+			target_optional_args['ip'] = target_ip
+		if target_hostname is not None:
+			target_optional_args['remote_hostname'] = target_hostname
+
+		target_optional_args['num_cpus'] = 4 # make sure it can process onion skins fast enough, and keep it consistent between computers
+		target_optional_args['num_additional_eventloops'] = self.num_additional_eventloops
 		target_cpu_prof = False #True
 		target_daemon = False
-		logs = ['notice']
+		target_log_throughput = True
+		target_logs = ['notice']
+		#other_logs = ['info', 'notice']
+		other_logs = ['notice']
 		#if self.use_helgrind:
 		#	valgrind_settings = ['--tool=helgrind', '-v', '--suppressions=libevent.supp', '--read-var-info=yes']
 		#else:
 		#	valgrind_settings = None
 		#
 
-		self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=logs) for _ in range(self.num_authorities)] + \
-		    [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=logs) for _ in range(self.num_guards)] + \
-		    [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl',
-		      daemon=target_daemon, log_files=logs, sandbox=0, google_cpu_profiler=target_cpu_prof, **target_optional_args)] + \
-		    [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=logs) for _ in range(self.num_exits)] + \
-		    [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=logs) for _ in range(self.num_clients)]
+		self.nodes = [chutney_manager.Node(tag='a', relay=1, authority=1, torrc='authority.tmpl', log_files=other_logs) for _ in range(self.num_authorities)] + \
+		    [chutney_manager.Node(tag='r', relay=1, torrc='relay-non-exit.tmpl', log_files=other_logs) for _ in range(self.num_guards)] + \
+		    [chutney_manager.Node(tag='target', relay=1, torrc='relay-non-exit.tmpl', log_throughput=target_log_throughput,
+		      daemon=target_daemon, log_files=target_logs, sandbox=0, google_cpu_profiler=target_cpu_prof, **target_optional_args)] + \
+		    [chutney_manager.Node(tag='e', exit=1, torrc='relay.tmpl', log_files=other_logs) for _ in range(self.num_exits)] + \
+		    [chutney_manager.Node(tag='c', client=1, torrc='client.tmpl', log_files=other_logs) for _ in range(self.num_clients)]
 		#
 		for node in self.nodes:
 			if not 'num_cpus' in node.options:
 				node.options['num_cpus'] = 2
 			#
-			if not 'ip' in node.options:
+			if not 'ip' in node.options and local_ip is not None:
 				node.options['ip'] = local_ip
 			#
 		#
-		numa_remaining = numa.get_numa_overview()
-		for (node, index) in zip(self.nodes, range(len(self.nodes))):
-			num_cpus = node.options['num_cpus']
-			if num_cpus%2 != 0:
-				num_cpus += 1
+		#numa_remaining = numa.get_numa_overview()
+		#for (node, index) in zip(self.nodes, range(len(self.nodes))):
+		#	num_cpus = node.options['num_cpus']
+		#	if num_cpus%2 != 0:
+		#		num_cpus += 1
+		#	#
+		#	if node.options['tag'] == 'target':
+		#		num_cpus = max(num_cpus, 6)
+		#	#
+		#	#if node.options['tag'] != 'target':
+		#	#	(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
+		#	#	node.options['numa_settings'] = (numa_node, processors)
+		#	#
+		##
+		self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
+		# TODO: ^^ improve this
+	#
+	def start_remote_logging(self, next_action=None):
+		if self.remote_name is None:
+			# running locally
+			if next_action is not None:
+				next_action()
+			#
+			return
+		#
+		local_script_path = 'log_system_usage.py'
+		remote_script_path = '/tmp/log_system_usage.py'
+		remote_save_path = '/tmp/cpu-usage.pickle.gz'
+		local_save_path = os.path.join(self.save_data_path, 'remote-cpu-usage.pickle.gz')
+		command = 'python3 {} 0.1 {}'.format(remote_script_path, remote_save_path)
+		#
+		try:
+			subprocess.check_output(['scp', local_script_path, '{}:{}'.format(self.remote_name, remote_script_path)], stderr=subprocess.STDOUT)
+			p = subprocess.Popen(['ssh', self.remote_name, command])
+			#
+			time.sleep(5)
+			# wait a few seconds to make sure it doesn't exit immediately
+			if p.poll() is not None:
+				raise Exception('Remote CPU monitoring script exited immediately')
+			#
+			if next_action is not None:
+				next_action()
+			#
+			if p.poll() is not None:
+				raise Exception('Remote CPU monitoring script exited before it was supposed to')
 			#
-			if node.options['tag'] == 'target':
-				num_cpus = max(num_cpus, 6)
+		finally:
+			try:
+				subprocess.check_output(['ssh', self.remote_name, 'pkill --full --signal sigint \'{}\''.format(command)], stderr=subprocess.STDOUT)
+			except:
+				logging.warning('Could not kill remote python script')
+			#
+			try:
+				p.wait(timeout=30)
+			except subprocess.TimeoutExpired:
+				p.kill()
+				logging.warning('Process did not end as expected, so it was sent SIGKILL')
+			except:
+				logging.warning('Could not kill the remote logging process')
+			#
+			try:
+				subprocess.check_output(['scp', '{}:{}'.format(self.remote_name, remote_save_path), local_save_path], stderr=subprocess.STDOUT)
+			except:
+				logging.warning('Failed to get remote \'{}\' data file'.format(remote_save_path))
+			#
+			try:
+				subprocess.check_output(['ssh', self.remote_name, 'rm', remote_save_path], stderr=subprocess.STDOUT)
+			except:
+				logging.warning('Failed to delete remote \'{}\' data file'.format(remote_save_path))
+			#
+			try:
+				subprocess.check_output(['ssh', self.remote_name, 'rm', remote_script_path], stderr=subprocess.STDOUT)
+			except:
+				logging.warning('Failed to delete remote \'{}\' script file'.format(remote_script_path))
 			#
-			(numa_node, processors) = chutney_manager.numa_scheduler(num_cpus, numa_remaining)
-			node.options['numa_settings'] = (numa_node, processors)
 		#
-		self.proxy_control_ports = [self.nodes[x].guess_control_port(x) for x in range(len(self.nodes)) if ('client', 1) in self.nodes[x].options.items()]
-		# TODO: ^^ improve this
 	#
 #
 def build_circuit_generator(consensus, server_address):
 	fingerprints = [desc.nickname for desc in consensus]
 	exit_fingerprints = [desc.nickname for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
 	#
-	target_fingerprint = [desc.nickname for desc in consensus if desc.nickname.endswith('target')][0]
-	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set([target_fingerprint]))
+	target_fingerprints = [desc.nickname for desc in consensus if desc.nickname.endswith('target')]
+	assert len(target_fingerprints) >= 1, 'No target relay in the consensus'
+	non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)-set(target_fingerprints))
 	#
 	assert len(exit_fingerprints) >= 1, 'Need at least one exit relay'
 	assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
 	#
+	non_exit_fingerprints = sorted(non_exit_fingerprints)
+	target_fingerprints = sorted(target_fingerprints)
+	exit_fingerprints = sorted(exit_fingerprints)
+	# try to get reproducible behavior
+	#
 	#return lambda gen_id=None: [random.choice(non_exit_fingerprints), target_fingerprint, random.choice(exit_fingerprints)]
-	return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprint, exit_fingerprints[gen_id%len(exit_fingerprints)]]
+	return lambda gen_id: [non_exit_fingerprints[gen_id%len(non_exit_fingerprints)], target_fingerprints[gen_id%len(target_fingerprints)], exit_fingerprints[gen_id%len(exit_fingerprints)]]
 	'''
 	fingerprints = [desc.fingerprint for desc in consensus]
 	exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*server_address)]
@@ -146,39 +248,123 @@ if __name__ == '__main__':
 	                    help='log helgrind data')
 	args = parser.parse_args()
 	#
-	#num_clients = 4
-	#num_guards = 6 # number of relays (including guards)
-	#num_authorities = 2 # will also act as a relay or guard
-	#num_exits = 8 # will be used only as an exit
-	num_clients = 12
-	num_guards = 14 # number of relays (including guards)
-	num_authorities = 2 # will also act as a relay or guard
-	num_exits = 16 # will be used only as an exit
-	#
 	experiment_time = time.time()
+	#base_save_data_path = os.path.join('/home/sengler/data/experiments', str(int(experiment_time)))
+	base_save_data_path = os.path.join('/var/ssd-raid/sengler/data/experiments', str(int(experiment_time)))
+	os.mkdir(base_save_data_path)
 	#
-	save_data_path = None
 	measureme_log_path = None
 	measureme = False
 	#
 	start_time = time.time()
 	#
-	#num_streams_per_client = 1
-	num_streams_per_client = 6
-	logging.info('Starting with {} streams per client'.format(num_streams_per_client))
+	tors = {'working': '/home/sengler/code/working/tor/src/app/tor',
+	        'working-without': '/home/sengler/code/working/tor-without-tcmalloc/src/app/tor',
+	        'dev-without': '/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor',
+	        'dev-with': '/home/sengler/code/dev/tor-throughput-log-0.4.2.6-with-tcmalloc/src/app/tor'}
+	hosts = ['sengler-rpi', 'cluck2']
+	###hosts = ['cluck2']
+	###hosts = ['sengler-rpi']
+	num_repetitions = 15
+	nums_additional_eventloops_options = [0, 1, 2, 3]
 	#
-	experiment = CustomExperiment(args.helgrind, args.target_tor, save_data_path, measureme_log_path, args.num_bytes,
-	                              num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
-	                              build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)
+	#tors = {'working':'/home/sengler/code/working/tor/src/app/tor', 'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'}
+	#hosts = ['cluck2']
+	#num_repetitions = 1
+	#nums_additional_eventloops_options = [0, 1, 2, 3]
 	#
-	def sleep_then_run(duration, func):
-		logging.info('Sleeping for {} seconds before running \'{}\''.format(duration, func.__name__))
-		time.sleep(duration)
-		logging.info('Done sleeping')
-		return func()
+	#tors = {'dev-debug-stall':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-debug-stall/src/app/tor'}
+	#tors = {'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-test-kist-changes/src/app/tor'}
+	#tors = {'dev-without':'/home/sengler/code/dev/tor-throughput-log-0.4.2.6-without-tcmalloc/src/app/tor'}
+	#hosts = ['cluck2']
+	#num_repetitions = 5
+	#nums_additional_eventloops_options = [3, 2, 1, 0]
 	#
 	try:
-		experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))
+		for repeat in range(num_repetitions):
+			for host in hosts:
+				for (tor_name, tor_path) in tors.items():
+					#num_clients = 4
+					#num_guards = 6 # number of relays (including guards)
+					#num_authorities = 2 # will also act as a relay or guard
+					#num_exits = 8 # will be used only as an exit
+					#num_streams_per_client = 1
+					#if True:
+					#	num_clients = 4
+					#	num_guards = 10 # number of relays (including guards)
+					#	num_authorities = 2 # will also act as a relay or guard
+					#	num_exits = 12 # will be used only as an exit
+					#	num_streams_per_client = 3
+					#	num_bytes = 20*(2**20)
+					if host == 'cluck2':
+						num_clients = 150
+						num_guards = 58 # number of relays (including guards)
+						num_authorities = 2 # will also act as a relay or guard
+						num_exits = 60 # will be used only as an exit
+						num_streams_per_client = 10
+						num_bytes = 20*(2**20)
+					elif host == 'sengler-rpi':
+						num_clients = 30
+						num_guards = 58 # number of relays (including guards)
+						num_authorities = 2 # will also act as a relay or guard
+						num_exits = 60 # will be used only as an exit
+						num_streams_per_client = 8
+						num_bytes = 10*(2**20)
+					elif host is None:
+						num_clients = 10
+						num_guards = 10 # number of relays (including guards)
+						num_authorities = 2 # will also act as a relay or guard
+						num_exits = 12 # will be used only as an exit
+						num_streams_per_client = 5
+						num_bytes = 20*(2**20)
+					else:
+						raise Exception('host not known')
+					#
+					nums_additional_eventloops = [0]
+					if tor_name == 'working' or tor_name == 'working-without':
+						nums_additional_eventloops = nums_additional_eventloops_options
+					#
+					for num_additional_eventloops in nums_additional_eventloops:
+						attempt = 0
+						while True:
+							attempt_str = '' if attempt == 0 else '_attempt-{}'.format(attempt)
+							save_data_path = os.path.join(base_save_data_path, '{}_{}_{}_{}{}'.format(host, tor_name, num_additional_eventloops, repeat, attempt_str))
+							os.mkdir(save_data_path)
+							logging.info('Starting on {} using {}-{} ({}), repeat {}, attempt {}'.format(host, tor_name, num_additional_eventloops, tor_path, repeat, attempt))
+							#
+							#experiment = CustomExperiment(args.helgrind, args.target_tor, save_data_path, measureme_log_path, args.num_bytes,
+							experiment = CustomExperiment(args.helgrind, tor_path, num_additional_eventloops, host, save_data_path,
+							                              measureme_log_path, num_bytes,
+							                              num_streams_per_client, num_clients, num_guards, num_authorities, num_exits,
+							                              build_circuit_generator, args.buffer_len, args.wait_range, measureme, test_network=False)
+							#
+							def sleep_then_run(duration, func):
+								logging.info('Sleeping for {} seconds before running \'{}\''.format(duration, func.__name__))
+								time.sleep(duration)
+								logging.info('Done sleeping')
+								return func()
+							#
+							#import subprocess
+							#p = subprocess.Popen(['ssh', '-t', 'sengler-rpi', 'python3 /tmp/log_system_usage.py /tmp/usage.gz'])
+							#
+							try:
+								experiment.start_system_logging(lambda: experiment.start_remote_logging(lambda: experiment.start_chutney(lambda: experiment.start_throughput_server(lambda: sleep_then_run(20, experiment.start_throughput_clients)))))
+							except (stem.Timeout, stem.CircuitExtensionFailed):
+								tries = 5
+								attempt += 1
+								if attempt < tries:
+									logging.exception('Experiment run failed, trying again ({} tries remaining)'.format(tries-attempt))
+									continue
+								else:
+									raise
+								#
+							#
+							shutil.copytree('/tmp/chutney-net/nodes', os.path.join(save_data_path, 'nodes'))
+							os.system("ps u | grep 'tor'")
+							os.system("rm -rf /tmp/chutney-net/*")
+							break
+						#
+					#
+				#
+			#
+		#
 	except KeyboardInterrupt:
 		logging.info('Stopped (KeyboardInterrupt)')
 	#
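
The retry loop above (re-run on stem.Timeout or stem.CircuitExtensionFailed, up to five attempts) could also be factored into a helper; a minimal sketch, where run_once and max_tries are placeholder names rather than anything from this script:

import logging
import stem

def run_with_retries(run_once, max_tries=5):
	for attempt in range(max_tries):
		try:
			return run_once()
		except (stem.Timeout, stem.CircuitExtensionFailed):
			if attempt+1 < max_tries:
				logging.exception('Experiment run failed, trying again (%d tries remaining)', max_tries-attempt-1)
			else:
				raise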

+ 17 - 1
src/throughput_server.py

@@ -17,6 +17,7 @@ class ThroughputServer:
 		#
 		self.server_listener = basic_protocols.ServerListener(bind_endpoint, self._accept_callback)
 		#
+		self.accepted_pending = []
 		self.processes = []
 		self.process_counter = 0
 		#
@@ -34,7 +35,11 @@ class ThroughputServer:
 	def _accept_callback(self, socket):
 		conn_id = self.process_counter
 		self.process_counter += 1
+		logging.info('Server protocol id: {} accepted'.format(conn_id))
 		#
+		self.accepted_pending.append((socket, conn_id))
+	#
+	def _start_new_process(self, socket, conn_id):
 		p = multiprocessing.Process(target=self._start_server_conn, args=(socket, conn_id))
 		self.processes.append(p)
 		p.start()
@@ -47,12 +52,14 @@ class ThroughputServer:
 		protocol = throughput_protocols.ServerProtocol(socket, results_callback=results_callback,
 		                                               use_acceleration=True)
 		try:
+			logging.info('Server protocol id: {} starting'.format(conn_id))
 			protocol.run()
 		except KeyboardInterrupt:
 			logging.info('Server protocol id: {} stopped (KeyboardInterrupt)'.format(conn_id))
 		except:
 			logging.exception('Server protocol id: {} had an error'.format(conn_id))
 		finally:
+			logging.info('Server protocol id: {} done'.format(conn_id))
 			socket.close()
 		#
 	#
@@ -66,7 +73,16 @@ class ThroughputServer:
 	def run(self):
 		try:
 			while True:
-				self.server_listener.accept()
+				block = True
+				if len(self.accepted_pending) > 0:
+					block = False
+				#
+				accepted_something = self.server_listener.accept(block)
+				if not accepted_something and len(self.accepted_pending) > 0:
+					# didn't accept anything and we have things to do
+					(socket, conn_id) = self.accepted_pending.pop(0)
+					self._start_new_process(socket, conn_id)
+				#
 			#
 		except OSError as e:
 			if e.errno == 22 and self.stop_event is not None and self.stop_event.is_set():
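
The intent of the accept loop above is to block in accept() only when there is no backlog of accepted sockets waiting for a worker process; a minimal sketch of that loop, assuming listener.accept(block) returns True when it handed a new connection to the callback and False otherwise, and that pending holds the (socket, conn_id) tuples queued by that callback (all names illustrative):

def serve(listener, pending, start_worker):
	while True:
		# only block in accept() when there is nothing queued
		accepted = listener.accept(len(pending) == 0)
		if not accepted and pending:
			(sock, conn_id) = pending.pop(0)
			start_worker(sock, conn_id)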

+ 5 - 2
src/useful.py

@@ -75,7 +75,10 @@ class QueueGetter:
 	def stop(self):
 		self.queue.put(None)
 	#
-	def join(self):
-		self.t.join()
+	def join(self, timeout=None):
+		self.t.join(timeout=timeout)
+		if self.t.is_alive():
+			raise TimeoutError('Could not join QueueGetter thread')
+		#
 	#
 #
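
With join() now raising TimeoutError when the worker thread is still alive, callers can decide how to react; a minimal usage sketch, where qg stands in for an existing useful.QueueGetter and the 60-second timeout is arbitrary:

import logging

qg.stop()
try:
	qg.join(timeout=60)
except TimeoutError:
	logging.warning('QueueGetter thread did not finish within 60 seconds; its results may be incomplete')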