Browse Source

Added files.

Steven Engler 5 years ago
parent
commit
d8d056ad52

+ 3 - 2
Makefile

@@ -1,6 +1,6 @@
 CC=gcc
-CFLAGS=-O3
-PYTHON_INC=/usr/include/python3.6
+CFLAGS=-O3 -std=c99 -D_BSD_SOURCE
+PYTHON_INC=/usr/include/python3.4
 
 PY_BIN_FILES:=$(patsubst src/%.py,bin/%.py,$(wildcard src/*.py))
 PY_DEV_FILES:=$(patsubst src/%.py,dev/%.py,$(wildcard src/*.py))
@@ -28,6 +28,7 @@ bin_dir:
 #######
 
 dev/%.py: src/%.py
+	rm -f $@
 	ln $< $@
 
 dev/%.so: src/%.c

+ 35 - 0
chutney.patch

@@ -0,0 +1,35 @@
+diff --git a/lib/chutney/TorNet.py b/lib/chutney/TorNet.py
+index bf26978..73f4e5f 100644
+--- a/lib/chutney/TorNet.py
++++ b/lib/chutney/TorNet.py
+@@ -682,7 +682,7 @@ class LocalNodeController(NodeController):
+     def waitOnLaunch(self):
+         """Check whether we can wait() for the tor process to launch"""
+         # TODO: is this the best place for this code?
+-        # RunAsDaemon default is 0
++        # RunAsDaemon default is 1
+         runAsDaemon = False
+         with open(self._getTorrcFname(), 'r') as f:
+             for line in f.readlines():
+@@ -691,7 +691,7 @@ class LocalNodeController(NodeController):
+                 if len(stline) > 0:
+                     splline = stline.split()
+                     # if the line has at least two tokens on it
+-                    if (len(splline) > 0 and
++                    if (len(splline) > 1 and
+                             splline[0].lower() == "RunAsDaemon".lower() and
+                             splline[1] == "1"):
+                         # use the RunAsDaemon value from the torrc
+@@ -874,9 +874,9 @@ class TorEnviron(chutney.Templating.Environ):
+         ocp_line = ('__OwningControllerProcess %d' % (cpid))
+         # if we want to leave the network running, or controlling_pid is 1
+         # (or invalid)
+-        if (os.environ.get('CHUTNEY_START_TIME', 0) < 0 or
+-            os.environ.get('CHUTNEY_BOOTSTRAP_TIME', 0) < 0 or
+-            os.environ.get('CHUTNEY_STOP_TIME', 0) < 0 or
++        if (int(os.environ.get('CHUTNEY_START_TIME', 0)) < 0 or
++            int(os.environ.get('CHUTNEY_BOOTSTRAP_TIME', 0)) < 0 or
++            int(os.environ.get('CHUTNEY_STOP_TIME', 0)) < 0 or
+             cpid <= 1):
+             return '#' + ocp_line
+         else:

+ 99 - 4
src/accelerated_functions.c

@@ -1,8 +1,11 @@
 #include <string.h>
+#include <sys/time.h>
 #include <sys/poll.h>
 #include <sys/socket.h>
 #include <Python.h>
 //
+#define USE_NEW_HISTORY
+//
 static PyObject *py_push_data(PyObject *self, PyObject *args);
 static char push_data_docstring[] =
     "Send data as quickly as possible into a socket.";
@@ -76,10 +79,28 @@ int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
 	return 0;
 }
 //
-int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr, double* time_last_ptr){
+#ifdef USE_NEW_HISTORY
+typedef struct {
+	unsigned long bytes;
+	double timestamp;
+} byte_delta;
+
+int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
+              double* time_last_ptr, byte_delta* deltas, size_t deltas_len,
+              size_t* deltas_elements_needed){
+#else
+int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
+              double* time_last_ptr, unsigned long* byte_counter,
+              size_t byte_counter_len, size_t* byte_counter_elements_needed,
+              unsigned long* byte_counter_start){
+#endif
 	long bytes_read = 0;
 	char* buffer = malloc(buffer_len);
-	struct timeval time_of_first_byte, time_of_last_byte;
+	struct timeval time_of_first_byte, time_of_last_byte, time_current, time_elapsed;
+#ifdef USE_NEW_HISTORY
+#else
+	struct timeval rounded_time_of_first_byte;
+#endif
 	//
 	struct pollfd poll_fds[1];
 	int num_poll_fds = 0;
@@ -88,6 +109,10 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_p
 		return -1;
 	}
 	//
+#ifdef USE_NEW_HISTORY
+	*deltas_elements_needed = 0;
+#endif
+	//
 	memset(poll_fds, 0, sizeof(poll_fds));
 	poll_fds[0].fd = socket;
 	poll_fds[0].events = POLLIN;
@@ -125,6 +150,28 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_p
 		//
 		if(n > 0 && bytes_read == 0){
 			gettimeofday(&time_of_first_byte, NULL);
+#ifdef USE_NEW_HISTORY
+#else
+			rounded_time_of_first_byte.tv_sec = time_of_first_byte.tv_sec;
+			rounded_time_of_first_byte.tv_usec = 0;
+#endif
+		}
+		//
+		if(n > 0){
+			gettimeofday(&time_current, NULL);
+#ifdef USE_NEW_HISTORY
+			if(*deltas_elements_needed < deltas_len){
+				deltas[*deltas_elements_needed].bytes = n;
+				deltas[*deltas_elements_needed].timestamp = time_current.tv_sec + time_current.tv_usec/(1000.0*1000.0);
+				*deltas_elements_needed += 1;
+			}
+#else
+			timersub(&time_current, &rounded_time_of_first_byte, &time_elapsed);
+			*byte_counter_elements_needed = time_elapsed.tv_sec+1;
+			if(time_elapsed.tv_sec < byte_counter_len){
+				byte_counter[time_elapsed.tv_sec] += n;
+			}
+#endif
 		}
 		//
 		bytes_read += n;
@@ -133,6 +180,10 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_p
 	gettimeofday(&time_of_last_byte, NULL);
 	*time_first_ptr = time_of_first_byte.tv_sec + time_of_first_byte.tv_usec/(1000.0*1000.0);
 	*time_last_ptr = time_of_last_byte.tv_sec + time_of_last_byte.tv_usec/(1000.0*1000.0);
+#ifdef USE_NEW_HISTORY
+#else
+	*byte_counter_start = rounded_time_of_first_byte.tv_sec;
+#endif
 	//
 	free(buffer);
 	return 0;
@@ -173,14 +224,58 @@ static PyObject *py_pull_data(PyObject *self, PyObject *args){
 	//
 	double time_of_first_byte = 0;
 	double time_of_last_byte = 0;
+#ifdef USE_NEW_HISTORY
+	byte_delta deltas[20000] = {0};
+	size_t deltas_elements_needed = 0;
+#else
+	unsigned long byte_counter[60*10] = {0}; // record 10 minutes of data
+	size_t byte_counter_elements_needed = 0;
+	unsigned long byte_counter_start = 0;
+#endif
 	int ret_val;
+	//
 	Py_BEGIN_ALLOW_THREADS
 	// GIL is unlocked, but don't do expensive operations in
 	// other threads or it might slow this one down
-	ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte);
+#ifdef USE_NEW_HISTORY
+	ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
+	                    deltas, sizeof(deltas)/sizeof(deltas[0]), &deltas_elements_needed);
+#else
+	ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
+	                    byte_counter, sizeof(byte_counter)/sizeof(byte_counter[0]),
+	                    &byte_counter_elements_needed, &byte_counter_start);
+#endif
 	Py_END_ALLOW_THREADS
 	//
-	PyObject* py_ret_val = Py_BuildValue("(idd)", ret_val, time_of_first_byte, time_of_last_byte);
+#ifdef USE_NEW_HISTORY
+	size_t deltas_elements_used = deltas_elements_needed;
+	if(deltas_elements_used > sizeof(deltas)/sizeof(deltas[0])){
+		deltas_elements_used = sizeof(deltas)/sizeof(deltas[0]);
+	}
+	//
+	PyObject* py_delta_bytes = PyList_New(deltas_elements_used);
+	PyObject* py_delta_timestamps = PyList_New(deltas_elements_used);
+	for(size_t i=0; i<deltas_elements_used; i++){
+		PyList_SetItem(py_delta_bytes, i, PyLong_FromLong(deltas[i].bytes));
+		PyList_SetItem(py_delta_timestamps, i, PyFloat_FromDouble(deltas[i].timestamp));
+	}
+#else
+	size_t byte_counter_elements_used = byte_counter_elements_needed;
+	if(byte_counter_elements_used > sizeof(byte_counter)/sizeof(byte_counter[0])){
+		byte_counter_elements_used = sizeof(byte_counter)/sizeof(byte_counter[0]);
+	}
+	//
+	PyObject* py_byte_counter = PyList_New(byte_counter_elements_used);
+	for(size_t i=0; i<byte_counter_elements_used; i++){
+		PyList_SetItem(py_byte_counter, i, PyLong_FromLong(byte_counter[i]));
+	}
+#endif
+	//
+#ifdef USE_NEW_HISTORY
+	PyObject* py_ret_val = Py_BuildValue("(idd{sNsN})", ret_val, time_of_first_byte, time_of_last_byte, "bytes", py_delta_bytes, "timestamps", py_delta_timestamps);
+#else
+	PyObject* py_ret_val = Py_BuildValue("(iddNi)", ret_val, time_of_first_byte, time_of_last_byte, py_byte_counter, byte_counter_start);
+#endif
 	//
 	return py_ret_val;
 }

+ 78 - 11
src/basic_protocols.py

@@ -77,6 +77,36 @@ class Protocol():
 		#
 	#
 #
+class FakeProxyProtocol(Protocol):
+	def __init__(self, socket, addr_port):
+		self.socket = socket
+		self.addr_port = addr_port
+		#
+		self.states = enum.Enum('PROXY_STATES', 'READY_TO_BEGIN CONNECTING_TO_PROXY DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.protocol_helper = None
+	#
+	def _run_iteration(self):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.protocol_helper = ProtocolHelper()
+			host, port = self.addr_port
+			addr = socket.inet_aton(host)[::-1]
+			self.protocol_helper.set_buffer(addr+struct.pack('!H', port))
+			self.state = self.states.CONNECTING_TO_PROXY
+		#
+		if self.state is self.states.CONNECTING_TO_PROXY:
+			if self.protocol_helper.send(self.socket):
+				self.protocol_helper = ProtocolHelper()
+				self.state = self.states.DONE
+			#
+		#
+		if self.state is self.states.DONE:
+			return True
+		#
+		return False
+	#
+#
 class ChainedProtocol(Protocol):
 	def __init__(self, protocols):
 		self.protocols = protocols
@@ -170,7 +200,7 @@ class Socks4Protocol(Protocol):
 	#
 #
 class PushDataProtocol(Protocol):
-	def __init__(self, socket, total_bytes, send_buffer_len=None, use_acceleration=None):
+	def __init__(self, socket, total_bytes, send_buffer_len=None, use_acceleration=None, push_start_cb=None, push_done_cb=None):
 		if send_buffer_len is None:
 			send_buffer_len = 1024*512
 		#
@@ -180,8 +210,10 @@ class PushDataProtocol(Protocol):
 		self.socket = socket
 		self.total_bytes = total_bytes
 		self.use_acceleration = use_acceleration
+		self.push_start_cb = push_start_cb
+		self.push_done_cb = push_done_cb
 		#
-		self.states = enum.Enum('PUSH_DATA_STATES', 'READY_TO_BEGIN SEND_INFO PUSH_DATA RECV_CONFIRMATION DONE')
+		self.states = enum.Enum('PUSH_DATA_STATES', 'READY_TO_BEGIN SEND_INFO START_CALLBACK PUSH_DATA RECV_CONFIRMATION DONE_CALLBACK DONE')
 		self.state = self.states.READY_TO_BEGIN
 		#
 		self.byte_buffer = os.urandom(send_buffer_len)
@@ -198,8 +230,14 @@ class PushDataProtocol(Protocol):
 		#
 		if self.state is self.states.SEND_INFO:
 			if self.protocol_helper.send(self.socket):
-				self.state = self.states.PUSH_DATA
+				self.state = self.states.START_CALLBACK
+			#
+		#
+		if self.state is self.states.START_CALLBACK:
+			if self.push_start_cb is not None:
+				self.push_start_cb()
 			#
+			self.state = self.states.PUSH_DATA
 		#
 		if self.state is self.states.PUSH_DATA:
 			if self.use_acceleration:
@@ -234,8 +272,14 @@ class PushDataProtocol(Protocol):
 				if response != b'RECEIVED':
 					raise ProtocolException('Did not receive the expected message: {}'.format(response))
 				#
-				self.state = self.states.DONE
+				self.state = self.states.DONE_CALLBACK
+			#
+		#
+		if self.state is self.states.DONE_CALLBACK:
+			if self.push_done_cb is not None:
+				self.push_done_cb()
 			#
+			self.state = self.states.DONE
 		#
 		if self.state is self.states.DONE:
 			return True
@@ -260,6 +304,9 @@ class PullDataProtocol(Protocol):
 		self.protocol_helper = None
 		self.time_of_first_byte = None
 		self.time_of_last_byte = None
+		#self.byte_counter = None
+		#self.byte_counter_start_time = None
+		self.deltas = None
 	#
 	def _run_iteration(self):
 		if self.state is self.states.READY_TO_BEGIN:
@@ -272,18 +319,28 @@ class PullDataProtocol(Protocol):
 				response = self.protocol_helper.get_buffer()
 				self.data_size = int.from_bytes(response[0:8], byteorder='big', signed=False)
 				self.recv_buffer_len = int.from_bytes(response[8:16], byteorder='big', signed=False)
+				assert(self.recv_buffer_len <= 10*1024*1024)
+				# don't use a buffer size larger than 10 MiB to avoid using up all memory
 				self.state = self.states.PULL_DATA
 			#
 		#
 		if self.state is self.states.PULL_DATA:
 			if self.use_acceleration:
-				(ret_val, time_of_first_byte, time_of_last_byte) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
+				#(ret_val, time_of_first_byte, time_of_last_byte, byte_counter, byte_counter_start_time) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
+				(ret_val, time_of_first_byte, time_of_last_byte, deltas) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
 				if ret_val < 0:
 					raise ProtocolException('Error while pulling data.')
 				#
+				#if sum(byte_counter) != self.data_size:
+				if sum(deltas['bytes']) != self.data_size:
+					logging.warning('Lost some history data ({} != {}).'.format(sum(deltas['bytes']), self.data_size))
+				#
 				self.bytes_read = self.data_size
 				self.time_of_first_byte = time_of_first_byte
 				self.time_of_last_byte = time_of_last_byte
+				#self.byte_counter = byte_counter
+				#self.byte_counter_start_time = byte_counter_start_time
+				self.deltas = deltas
 			else:
 				bytes_remaining = self.data_size-self.bytes_read
 				block_size = min(self.recv_buffer_len, bytes_remaining)
@@ -351,8 +408,12 @@ class SendDataProtocol(Protocol):
 		if self.state is self.states.SEND_INFO:
 			if self.protocol_helper.send(self.socket):
 				self.protocol_helper = ProtocolHelper()
-				self.protocol_helper.set_buffer(self.send_data)
-				self.state = self.states.SEND_DATA
+				if len(self.send_data) > 0:
+					self.protocol_helper.set_buffer(self.send_data)
+					self.state = self.states.SEND_DATA
+				else:
+					self.state = self.states.RECV_CONFIRMATION
+				#
 			#
 		#
 		if self.state is self.states.SEND_DATA:
@@ -399,7 +460,13 @@ class ReceiveDataProtocol(Protocol):
 				response = self.protocol_helper.get_buffer()
 				self.data_size = int.from_bytes(response, byteorder='big', signed=False)
 				self.protocol_helper = ProtocolHelper()
-				self.state = self.states.RECV_DATA
+				if self.data_size > 0:
+					self.state = self.states.RECV_DATA
+				else:
+					self.received_data = b''
+					self.protocol_helper.set_buffer(b'RECEIVED')
+					self.state = self.states.SEND_CONFIRMATION
+				#
 			#
 		#
 		if self.state is self.states.RECV_DATA:
@@ -523,11 +590,11 @@ if __name__ == '__main__':
 	#
 	if sys.argv[1] == 'client':
 		endpoint = ('127.0.0.1', 4747)
-		proxy = ('127.0.0.1', 9003)
-		#proxy = None
+		#proxy = ('127.0.0.1', 9003)
+		proxy = None
 		username = bytes([x for x in os.urandom(12) if x != 0])
 		#username = None
-		data_MB = 40
+		data_MB = 4000
 		#
 		client = SimpleClientConnectionProtocol(endpoint, data_MB*2**20, proxy=proxy, username=username)
 		client.run()

+ 167 - 0
src/correctness_tester.py

@@ -0,0 +1,167 @@
+#!/usr/bin/python3
+#
+import basic_protocols
+import logging
+import enum
+import time
+import socket
+#
+class ClientConnectionProtocol(basic_protocols.Protocol):
+	def __init__(self, endpoint, data, proxy=None, username=None):
+		self.endpoint = endpoint
+		self.data = data
+		self.proxy = proxy
+		self.username = username
+		#
+		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN CONNECT_TO_PROXY SEND_DATA DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.socket = socket.socket()
+		self.sub_protocol = None
+		#
+		if self.proxy is None:
+			logging.debug('Socket %d connecting to endpoint %r...', self.socket.fileno(), self.endpoint)
+			self.socket.connect(self.endpoint)
+		else:
+			logging.debug('Socket %d connecting to proxy %r...', self.socket.fileno(), self.proxy)
+			self.socket.connect(self.proxy)
+		#
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			if self.proxy is None:
+				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, self.data)
+				self.state = self.states.SEND_DATA
+			else:
+				#self.sub_protocol = basic_protocols.Socks4Protocol(self.socket, self.endpoint, username=self.username)
+				self.sub_protocol = basic_protocols.WeirdProtocol(self.socket, self.endpoint)
+				self.state = self.states.CONNECT_TO_PROXY
+			#
+		#
+		if self.state is self.states.CONNECT_TO_PROXY:
+			if self.sub_protocol.run(block=block):
+				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, self.data)
+				self.state = self.states.SEND_DATA
+			#
+		#
+		if self.state is self.states.SEND_DATA:
+			if self.sub_protocol.run(block=block):
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+class ServerConnectionProtocol(basic_protocols.Protocol):
+	def __init__(self, socket, conn_id, data_callback=None):
+		self.socket = socket
+		self.conn_id = conn_id
+		self.data_callback = data_callback
+		#
+		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_DATA DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.sub_protocol = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
+			self.state = self.states.RECV_DATA
+		#
+		if self.state is self.states.RECV_DATA:
+			if self.sub_protocol.run(block=block):
+				self.data_callback(self.conn_id, self.sub_protocol.received_data)
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+if __name__ == '__main__':
+	import sys
+	logging.basicConfig(level=logging.DEBUG)
+	#
+	import random
+	random.seed(10)
+	data_to_send = bytearray(random.getrandbits(8) for _ in range(1024*1024*100))
+	#
+	print('Generated bytes')
+	#
+	if sys.argv[1] == 'client':
+		import os
+		#
+		endpoint = ('127.0.0.1', 4747)
+		#endpoint = ('127.0.0.1', 8627)
+		#proxy = ('127.0.0.1', 9003+int(sys.argv[3])-1)
+		#proxy = ('127.0.0.1', 9003)
+		proxy = ('127.0.0.1', 12849)
+		#proxy = None
+		username = bytes([x for x in os.urandom(12) if x != 0])
+		#username = None
+		#
+		client = ClientConnectionProtocol(endpoint, data_to_send, proxy=proxy, username=username)
+		client.run()
+		#
+	elif sys.argv[1] == 'server':
+		import multiprocessing
+		import queue
+		#
+		endpoint = ('127.0.0.1', 4747)
+		processes = []
+		processes_map = {}
+		joinable_connections = multiprocessing.Queue()
+		conn_counter = [0]
+		group_queue = multiprocessing.Queue()
+		bw_queue = multiprocessing.Queue()
+		#
+		def data_callback(conn_id, data):
+			# check data here
+			print('Received {} MB'.format(len(data)/(1024**2)))
+			print('Data matches: {}'.format(data==data_to_send))
+		#
+		def start_server_conn(socket, conn_id):
+			server = ServerConnectionProtocol(socket, conn_id, data_callback=data_callback)
+			try:
+				server.run()
+			except KeyboardInterrupt:
+				socket.close()
+			finally:
+				joinable_connections.put(conn_id)
+			#
+		#
+		def accept_callback(socket):
+			conn_id = conn_counter[0]
+			conn_counter[0] += 1
+			#logging.debug('Adding connection %d', conn_id)
+			p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
+			processes.append(p)
+			processes_map[conn_id] = p
+			p.start()
+			socket.close()
+			# close this process' copy of the socket
+		#
+		l = basic_protocols.ServerListener(endpoint, accept_callback)
+		#
+		try:
+			while True:
+				l.accept()
+				try:
+					while True:
+						conn_id = joinable_connections.get(False)
+						p = processes_map[conn_id]
+						p.join()
+					#
+				except queue.Empty:
+					pass
+				#
+			#
+		except KeyboardInterrupt:
+			print()
+		#
+		for p in processes:
+			p.join()
+		#
+	#
+#

+ 141 - 0
src/log_system_usage.py

@@ -0,0 +1,141 @@
+#!/usr/bin/python3
+#
+import time
+import threading
+import json
+import gzip
+#
+PROC_STAT_HEADERS = ('user', 'nice', 'system', 'idle', 'iowait', 'irq', 'softirq', 'steal', 'guest', 'guest_nice')
+#
+def get_cpu_stats(path='/proc/stat'):
+	"""
+	Get CPU statistics from /proc/stat. Output is returned for the system and each CPU.
+
+	Ex:
+
+	{'system': {'user': 8696397, 'nice': 22431, ...},
+	 'cpus': {
+	     0: {'user': 4199206, 'nice': 11615, ...},
+	     1: {'user': 4199308, 'nice': 10642, ...}
+	 }
+	}
+	"""
+	#
+	with open(path, 'r') as f:
+		lines = f.readlines()
+	#
+	cpu_lines = [l for l in lines if l.startswith('cpu')]
+	stats = {'system':None, 'cpus':{}}
+	#
+	for l in cpu_lines:
+		l_split = l.split()
+		cpu_index = l_split[0][3:]
+		cpu_stats = {x[0]: int(x[1]) for x in zip(PROC_STAT_HEADERS, l_split[1:])}
+		#
+		if cpu_index == '':
+			stats['system'] = cpu_stats
+		else:
+			stats['cpus'][int(cpu_index)] = cpu_stats
+		#
+	#
+	return stats
+#
+def calculate_cpu_usage(initial, current):
+	"""
+	Calculation adapted from: https://stackoverflow.com/questions/23367857/accurate-calculation-of-cpu-usage-given-in-percentage-in-linux/
+	"""
+	#
+	initial_idle = initial['idle'] + initial['iowait']
+	current_idle = current['idle'] + current['iowait']
+	#
+	initial_non_idle = initial['user'] + initial['nice'] + initial['system'] + initial['irq'] + initial['softirq'] + initial['steal']
+	current_non_idle = current['user'] + current['nice'] + current['system'] + current['irq'] + current['softirq'] + current['steal']
+	#
+	initial_total = initial_idle + initial_non_idle
+	current_total = current_idle + current_non_idle
+	#
+	return (current_non_idle-initial_non_idle)/(current_total-initial_total)
+#
+def log_cpu_stats(path, interval, stop_event):
+	"""
+	Log the cpu stats to a gz compressed JSON file. Storing JSON
+	seems to only use about 10% more disk space than storing
+	bytes directly (4 bytes per value), so JSON is used for
+	simplicity.
+
+	path: file to save to
+	interval: how many seconds to wait before getting more data
+	stop_event: a threading.Event which stops the function
+	"""
+	#
+	stats = {'timestamps':[], 'stats':{'system':[], 'cpus':{}}}
+	while not stop_event.is_set():
+		stats['timestamps'].append(time.time())
+		#stats['stats'].append(get_cpu_stats())
+		current_stats = get_cpu_stats()
+		stats['stats']['system'].append(current_stats['system'])
+		for cpu in current_stats['cpus']:
+			if cpu not in stats['stats']['cpus']:
+				stats['stats']['cpus'][cpu] = []
+			#
+			stats['stats']['cpus'][cpu].append(current_stats['cpus'][cpu])
+		#
+		stop_event.wait(interval)
+	#
+	with gzip.GzipFile(path, 'w') as f:
+		# json.dump writes a string, but a gzip.GzipFile can only write bytes, so monkey-patch it
+		old_write = f.write
+		f.write = lambda s: old_write(s.encode('utf-8'))
+		json.dump(stats, f)
+	#
+#
+def load_cpu_stats(path):
+	with gzip.GzipFile(path, 'r') as f:
+		return json.load(f)
+	#
+#
+'''
+def log_cpu_stats(path, interval, stop_event):
+	path: file to save to
+	interval: how many seconds to wait before getting more data
+	stop_event: a threading.Event which stops the function
+	#
+	with gzip.GzipFile(path+'.2.gz', 'w') as f:
+		f.write(' '.join(PROC_STAT_HEADERS).encode('utf-8'))
+		f.write('\n\n'.encode('utf-8'))
+		#
+		while not stop_event.is_set():
+			f.write(str(time.time()).encode('utf-8'))
+			f.write('\n'.encode('utf-8'))
+			stats = get_cpu_stats()
+			f.write('cpu '.encode('utf-8'))
+			#f.write(' '.join([str(stats['system'][x]) for x in PROC_STAT_HEADERS]).encode('utf-8'))
+			f.write(b''.join([stats['system'][x].to_bytes(4, byteorder='big') for x in PROC_STAT_HEADERS]))
+			f.write('\n'.encode('utf-8'))
+			for cpu in stats['cpus']:
+				f.write('cpu{} '.format(cpu).encode('utf-8'))
+				#f.write(' '.join([str(stats['cpus'][cpu][x]) for x in PROC_STAT_HEADERS]).encode('utf-8'))
+				f.write(b''.join([stats['cpus'][cpu][x].to_bytes(4, byteorder='big') for x in PROC_STAT_HEADERS]))
+				f.write('\n'.encode('utf-8'))
+			#
+			f.write('\n'.encode('utf-8'))
+			time.sleep(interval)
+		#
+	#
+#
+'''
+if __name__ == '__main__':
+	stop_event = threading.Event()
+	t = threading.Thread(target=log_cpu_stats, args=('/tmp/cpu_stats.json.gz', 0.5, stop_event))
+	t.start()
+	#
+	try:
+		while True:
+			time.sleep(100)
+		#
+	except KeyboardInterrupt:
+		stop_event.set()
+		print()
+	#
+	t.join()
+#

+ 22 - 0
src/start_clients.sh

@@ -0,0 +1,22 @@
+#!/bin/bash
+
+START_TIME="$(python3 -c 'import time; print(int(time.time())+5)')"
+
+for i in {1..30}; do
+	echo "Starting client ${i}..."
+	python3 throughput_client.py 127.0.0.1 4373 --wait "${START_TIME}" &
+	pids[${i}]=$!
+	python3 -c "import time; time.sleep(0.01)"
+done
+
+trap stop_everything INT
+
+function stop_everything(){
+	for pid in ${pids[*]}; do
+		kill $pid
+	done
+}
+
+for pid in ${pids[*]}; do
+	wait $pid
+done

+ 23 - 0
src/stem_test.py

@@ -0,0 +1,23 @@
+import stem.descriptor.remote
+
+
+#try:
+#  for desc in stem.descriptor.remote.get_consensus():
+#    print("found relay %s (%s)" % (desc.nickname, desc.fingerprint))
+#except Exception as exc:
+#  print("Unable to retrieve the consensus: %s" % exc
+
+endpoint = ('127.0.0.1', 8080)
+
+try:
+  consensus = stem.descriptor.remote.get_consensus(
+    endpoints = (stem.DirPort('127.0.0.1', 7000),)
+  )
+  print('got consensus: {}'.format(consensus))
+  for desc in consensus:
+    print("found relay %s (%s) - %s" % (desc.nickname, desc.fingerprint, desc.exit_policy))
+
+  for x in [desc for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]:
+    print('%s' % (x.nickname))
+except Exception as exc:
+  print("Unable to retrieve the consensus: %s" % exc)

+ 237 - 0
src/stress_tester.py

@@ -0,0 +1,237 @@
+#!/usr/bin/python3
+#
+import throughput_protocols
+import basic_protocols
+import useful
+import time
+import os
+import argparse
+import logging
+import socket
+import random
+import multiprocessing
+import stem.control
+import stem.descriptor.remote
+import base64
+import binascii
+#
+logging.getLogger('stem').setLevel(logging.WARNING)
+#
+def start_client_process(protocol, id_num, finished_queue):
+	p = multiprocessing.Process(target=run_client, args=(protocol, id_num, finished_queue))
+	p.start()
+	return p
+#
+def run_client(protocol, id_num, finished_queue):
+	try:
+		print('Starting protocol (id: {})'.format(id_num))
+		protocol.run()
+		print('Done protocol (id: {})'.format(id_num))
+	finally:
+		finished_queue.put(id_num)
+	#
+#
+def parse_range(range_str):
+	return tuple(int(x) for x in range_str.split('-'))
+#
+def get_socks_ports(control_ports):
+	ports = []
+	#
+	for x in control_ports:
+		#print(x)
+		with stem.control.Controller.from_port(port=x) as controller:
+			controller.authenticate()
+			#
+			socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
+			#print(socks_addresses)
+			assert(len(socks_addresses) == 1)
+			assert(socks_addresses[0][0] == '127.0.0.1')
+			#
+			ports.append(socks_addresses[0][1])
+		#
+	#
+	return ports
+#
+if __name__ == '__main__':
+	logging.basicConfig(level=logging.DEBUG)
+	#
+	parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
+	parser.add_argument('ip', type=str, help='destination ip address')
+	parser.add_argument('port', type=int, help='destination port')
+	parser.add_argument('num_bytes', type=useful.parse_bytes,
+	                    help='number of bytes to send (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
+	parser.add_argument('proxy_control_port_range', type=parse_range, help='range of ports for the control ports')
+	#parser.add_argument('--proxy', type=str, help='proxy ip address and port', metavar=('ip','port'), nargs=2)
+	#parser.add_argument('--fake-proxy', action='store_true', help='connecting to a fake-tor proxy')
+	parser.add_argument('--wait', type=int,
+	                    help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
+	parser.add_argument('--buffer-len', type=useful.parse_bytes,
+	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
+	parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
+	args = parser.parse_args()
+	#
+	endpoint = (args.ip, args.port)
+	proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1))
+	#
+	streams_per_client = 3
+	#
+	socks_ports = get_socks_ports(proxy_control_ports)
+	#
+	try:
+		consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
+		#
+		relay_fingerprints = [desc.fingerprint for desc in consensus]
+		exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
+	except Exception as e:
+		raise Exception('Unable to retrieve the consensus') from e
+	#
+	print('Num socks ports: {}'.format(len(socks_ports)))
+	print('Num relays: {}'.format(len(relay_fingerprints)))
+	print('Num exits: {}'.format(len(exit_fingerprints)))
+	#
+	assert(len(relay_fingerprints) >= len(socks_ports)*3+1)
+	assert(len(exit_fingerprints) >= len(socks_ports)+1)
+	#
+	remaining_relays = list(relay_fingerprints)
+	#
+	target_relay = exit_fingerprints[0]
+	remaining_relays = list(set(remaining_relays)-set([target_relay]))
+	exit_relays = exit_fingerprints[1:1+len(socks_ports)]
+	remaining_relays = list(set(remaining_relays)-set(exit_fingerprints))
+	guard_relays = remaining_relays[:len(socks_ports)]
+	remaining_relays = list(set(remaining_relays)-set(guard_relays))
+	middle_relays = remaining_relays[:len(socks_ports)]
+	remaining_relays = list(set(remaining_relays)-set(middle_relays))
+	#
+	exit_relays = list(exit_relays)
+	guard_relays = list(guard_relays)
+	#
+	controllers = []
+	#
+	controller_circuits = {}
+	fraction_middle = 1
+	#
+	for x in range(len(proxy_control_ports)):
+		#with stem.control.Controller.from_port(port=x) as controller:
+		controller = stem.control.Controller.from_port(port=proxy_control_ports[x])
+		controller.authenticate()
+		#
+		controller_circuits[controller] = []
+		for y in range(streams_per_client):
+			if (x*streams_per_client+y)/(len(proxy_control_ports)*streams_per_client+y) < fraction_middle:
+				circuit = [random.choice(guard_relays), target_relay, random.choice(exit_relays)]
+			else:
+				circuit = [random.choice(guard_relays), random.choice(middle_relays), target_relay]
+			#
+			#circuit = [random.choice(guard_relays), random.choice(middle_relays), random.choice(exit_relays)]
+			#circuit = [middle_relay, random.choice(exit_relays), random.choice(guard_relays)]
+			#circuit = [random.choice(exit_relays), random.choice(guard_relays), middle_relay]
+			print('New circuit #{}'.format(y))
+			print(circuit)
+			circuit_id = controller.new_circuit(circuit, await_build=True)
+			controller_circuits[controller].append(circuit_id)
+		#
+		def attach_stream(stream, controller):
+			#print(stream)
+			#print(controller)
+			#print(circuit_id)
+			if stream.status == 'NEW' and stream.purpose == 'USER':
+				print('Attaching (num_remaining={})'.format(len(controller_circuits[controller])-1))
+				#controller.attach_stream(stream.id, circuit_id)
+				controller.attach_stream(stream.id, controller_circuits[controller][0])
+				controller_circuits[controller] = controller_circuits[controller][1:]
+			#
+		#
+		controller.add_event_listener(lambda x, controller=controller: attach_stream(x, controller), stem.control.EventType.STREAM)
+		controller.set_conf('__LeaveStreamsUnattached', '1')
+		controllers.append(controller)
+		#
+	#
+	processes = {}
+	process_counter = 0
+	finished_processes = multiprocessing.Queue()
+	#
+	for y in range(streams_per_client):
+		for x in socks_ports:
+			client_socket = socket.socket()
+			protocols = []
+			#
+			proxy_username = bytes([z for z in os.urandom(12) if z != 0])
+			proxy_endpoint = ('127.0.0.1', x)
+			#
+			logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint)
+			client_socket.connect(proxy_endpoint)
+			logging.debug('Socket %d connected', client_socket.fileno())
+			#
+			proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
+			protocols.append(proxy_protocol)
+			#
+			throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
+														 wait_until=args.wait,
+														 send_buffer_len=args.buffer_len,
+														 use_acceleration=(not args.no_accel))
+			protocols.append(throughput_protocol)
+			#
+			combined_protocol = basic_protocols.ChainedProtocol(protocols)
+			processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes)
+			process_counter += 1
+			client_socket.close()
+			#
+			time.sleep(0.01)
+		#
+	#
+	print('Starting in {:.2f} seconds'.format(args.wait-time.time()))
+	#
+	try:
+		while len(processes) > 0:
+			print('Waiting for processes ({} left)'.format(len(processes)))
+			p_id = finished_processes.get()
+			p = processes[p_id]
+			p.join()
+			processes.pop(p_id)
+		#
+	except KeyboardInterrupt as e:
+		print()
+		for p_id in processes:
+			processes[p_id].terminate()
+		#
+	#
+	print('Processes finished')
+	#
+	for c in controllers:
+		c.close()
+	#
+#
+
+
+
+# old code, keeping just in case
+
+	'''
+	with stem.control.Controller.from_port(port=proxy_control_ports[0]) as controller:
+		controller.authenticate()
+		#print(controller.get_version())
+		#print(stem.version.Requirement.GETINFO_MICRODESCRIPTORS)
+		#print(controller.get_version() >= stem.version.Requirement.GETINFO_MICRODESCRIPTORS)
+		#print('-------')
+		#print([x.exit_policy for x in controller.get_network_statuses()])
+		relay_fingerprints = list(set([desc.fingerprint for desc in controller.get_network_statuses()]))
+		#print(relay_fingerprints)
+		relay_digest_map = {desc.digest: desc.fingerprint for desc in controller.get_network_statuses()}
+		print(relay_digest_map)
+		relay_exit_digests = list(set([desc.digest for desc in controller.get_microdescriptors() if desc.exit_policy.can_exit_to(*endpoint)]))
+		#print(relay_exit_digests)
+		print([desc.microdescriptor_digest for desc in controller.get_microdescriptors() if desc.exit_policy.can_exit_to(*endpoint)])
+		print([binascii.hexlify(base64.b64decode(digest()+'===')).decode('utf-8').upper()[:40] for digest in relay_exit_digests])
+		relay_exits = list(set([relay_digest_map[binascii.hexlify(base64.b64decode(digest()+'===')).decode('utf-8').upper()[:40]] for digest in relay_exit_digests]))
+		#print(relay_exits)
+		#
+		#print(dir(list(controller.get_network_statuses())[0]))
+		#print(type(list(controller.get_network_statuses())[0]))
+		#print([desc for desc in controller.get_microdescriptors()])
+		#print([desc.exit_policy for desc in controller.get_microdescriptors()])
+		#print([desc.exit_policy.can_exit_to(*endpoint) for desc in controller.get_microdescriptors()])
+		#print([desc.fingerprint for desc in controller.get_microdescriptors()])
+		#print([desc.flags for desc in controller.get_microdescriptors()])
+	#
+	'''

+ 402 - 0
src/test_relay.py

@@ -0,0 +1,402 @@
+#!/usr/bin/python3
+#
+import throughput_protocols
+import basic_protocols
+import useful
+import time
+import os
+import argparse
+import logging
+import socket
+import random
+import multiprocessing
+import stem.control
+import stem.descriptor.remote
+import stem.process
+import base64
+import binascii
+#
+logging.getLogger('stem').setLevel(logging.WARNING)
+#
+def start_client_process(protocol, id_num, finished_queue):
+	p = multiprocessing.Process(target=run_client, args=(protocol, id_num, finished_queue))
+	p.start()
+	return p
+#
+def run_client(protocol, id_num, finished_queue):
+	had_error = False
+	try:
+		logging.info('Starting protocol (id: {})'.format(id_num))
+		protocol.run()
+		logging.info('Done protocol (id: {})'.format(id_num))
+	except:
+		had_error = True
+		logging.warning('Protocol error')
+		logging.exception('Protocol id: {} had an error'.format(id_num))
+	finally:
+		finished_queue.put((id_num, had_error))
+	#
+#
+def parse_range(range_str):
+	return tuple(int(x) for x in range_str.split('-'))
+#
+def get_socks_port(control_port):
+	with stem.control.Controller.from_port(port=control_port) as controller:
+		controller.authenticate()
+		#
+		socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
+		#logging.info(socks_addresses)
+		assert(len(socks_addresses) == 1)
+		assert(socks_addresses[0][0] == '127.0.0.1')
+		#
+		return socks_addresses[0][1]
+	#
+#
+def send_measureme(controller, circuit_id, measureme_id, hop):
+	response = controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop))
+	stem.response.convert('SINGLELINE', response)
+	#
+	if not response.is_ok():
+		if response.code in ('512', '552'):
+			if response.message.startswith('Unknown circuit '):
+				raise stem.InvalidArguments(response.code, response.message, [circuit_id])
+			#
+			raise stem.InvalidRequest(response.code, response.message)
+		else:
+			raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code)
+		#
+	#
+#
+def connect_and_send_measureme(control_port, circuit_id, measureme_id, hop):
+	with stem.control.Controller.from_port(port=control_port) as controller:
+		controller.authenticate()
+		send_measureme(controller, circuit_id, measureme_id, hop)
+	#
+#
+def send_all_measuremes(controller, circuit_id, measureme_id):
+	send_measureme(controller, circuit_id, measureme_id, 2)
+	send_measureme(controller, circuit_id, measureme_id, 0)
+#
+def push_start_measureme_cb(control_port, circuit_id, measureme_id, wait_until, hops):
+	logging.info('Sending measuremes to control port {}, then sleeping'.format(control_port))
+	with stem.control.Controller.from_port(port=control_port) as controller:
+		controller.authenticate()
+		for hop in hops:
+			send_measureme(controller, circuit_id, measureme_id, hop)
+		#
+	#
+	time.sleep(wait_until-time.time())
+#
+if __name__ == '__main__':
+	logging.basicConfig(level=logging.DEBUG)
+	#
+	parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
+	parser.add_argument('ip', type=str, help='destination ip address')
+	parser.add_argument('port', type=int, help='destination port')
+	parser.add_argument('num_bytes', type=useful.parse_bytes,
+	                    help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
+	parser.add_argument('num_clients', type=int, help='number of Tor clients to start', metavar='num-clients')
+	parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
+	#parser.add_argument('--wait', type=int,
+	#                    help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
+	parser.add_argument('--buffer-len', type=useful.parse_bytes,
+	                    help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
+	parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
+	parser.add_argument('--wait-range', type=int, default=0,
+	                    help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
+	parser.add_argument('--proxy-control-port-range', type=parse_range, help='range of ports for the control ports')
+	parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
+	args = parser.parse_args()
+	#
+	endpoint = (args.ip, args.port)
+	#
+	streams_per_client = args.num_streams_per_client
+	#
+	if args.num_clients > 0:
+		num_clients = args.num_clients
+		socks_port_start = 9551
+		control_port_start = 12001
+		target_relay = 'BCEDF6C193AA687AE471B8A22EBF6BC57C2D285E' # gurgle
+		other_relays = list(set(['BC630CBBB518BE7E9F4E09712AB0269E9DC7D626', '18CFB7BA07F13AEABF50A7148786DA68773B2498', 'B771AA877687F88E6F1CA5354756DF6C8A7B6B24', '204DFD2A2C6A0DC1FA0EACB495218E0B661704FD', 'F6740DEABFD5F62612FA025A5079EA72846B1F67', 'A21D24309FD17F57395CDB0B4A3B813AE73FBC5A', 'B204DE75B37064EF6A4C6BAF955C5724578D0B32', '874D84382C892F3F61CC9E106BF08843DE0B865A', '25990FC54D7268C914170A118EE4EE75025451DA', 'B872BA6804C8C6E140AE1897B44CF32B42FD2397', 'B143D439B72D239A419F8DCE07B8A8EB1B486FA7', 'DA4B488C2826DFBBD04D635DA1E71A2BA5B20747', 'D80EA21626BFAE8044E4037FE765252E157E3586']))
+		# bad relays = ['3F62F05E859D7F98B086F702A31F7714D566E49A', '8EE0534532EA31AA5172B1892F53B2F25C76EB02', '7DC52AE6667A30536BA2383CD102CFC24F20AD71']
+		# no longer exist = ['DBAD17D706E2B6D5D917C2077961750513BDF879']
+		#
+		logging.info('Getting consensus')
+		#
+		try:
+			consensus = stem.descriptor.remote.get_consensus()
+			#relay_fingerprints = [desc.fingerprint for desc in consensus]
+			logging.info([desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==target_relay])
+		except Exception as e:
+			raise Exception('Unable to retrieve the consensus') from e
+		#
+		logging.info('Starting tor processes')
+		#
+		clients = []
+		#
+		try:
+			for client_index in range(num_clients):
+				# start a tor client
+				#
+				socks_port = socks_port_start+client_index
+				control_port = control_port_start+client_index
+				#
+				tor_process = stem.process.launch_tor_with_config(
+					config = {
+						'Log': ['notice file /tmp/tor{}/log'.format(client_index), 'notice stdout'],
+						'TruncateLogFile': '1',
+						'MeasuremeLogFile': '/ramdisk/sengler/real/measureme-{}.log'.format(client_index),
+						'SafeLogging': 'relay',
+						'SocksPort': str(socks_port),
+						'ControlPort': str(control_port),
+						'DataDirectory': '/tmp/tor{}'.format(client_index),
+					}
+				)
+				clients.append({'socks_port':socks_port, 'control_port':control_port, 'process':tor_process})
+				logging.info('Started '+str(client_index))
+			#
+		except:
+			for c in clients:
+				if 'process' in c:
+					c['process'].kill()
+				#
+			#
+			raise
+		#
+	else:
+		proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1))
+		socks_ports = [get_socks_port(x) for x in proxy_control_ports]
+		#
+		logging.info('Getting consensus')
+		#
+		try:
+			consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
+			#
+			relay_fingerprints = [desc.fingerprint for desc in consensus]
+			exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
+		except Exception as e:
+			raise Exception('Unable to retrieve the consensus') from e
+		#
+		print('Num socks ports: {}'.format(len(socks_ports)))
+		print('Num relays: {}'.format(len(relay_fingerprints)))
+		print('Num exits: {}'.format(len(exit_fingerprints)))
+		#
+		assert(len(relay_fingerprints) >= 2)
+		assert(len(exit_fingerprints) == 1)
+		#
+		target_relay = exit_fingerprints[0]
+		other_relays = list(set(relay_fingerprints)-set(exit_fingerprints))
+		#
+		clients = []
+		#
+		for client_index in range(len(proxy_control_ports)):
+			socks_port = socks_ports[client_index]
+			control_port = proxy_control_ports[client_index]
+			#
+			clients.append({'socks_port':socks_port, 'control_port':control_port})
+		#
+	#
+	######################################3
+	#
+	controllers = []
+	#
+	for client_index in range(len(clients)):
+		# make connections to client control ports
+		#
+		connection = stem.control.Controller.from_port(port=clients[client_index]['control_port'])
+		connection.authenticate()
+		#
+		controllers.append({'connection':connection, 'id':client_index})
+	#
+	all_circuits_okay = True
+	#
+	for controller in controllers:
+		# for each client, override the circuits for new streams
+		#
+		controller['circuits_remaining'] = []
+		controller['circuit_ids'] = []
+		controller['circuit_verbose'] = []
+		#
+		logging.info('Setting up controller id={}'.format(controller['id']))
+		#
+		for y in range(streams_per_client):
+			circuit_id = None
+			#
+			while circuit_id is None:
+				#circuit = [random.choice(relay_fingerprints), target_relay]
+				first_relay = random.choice(other_relays)
+				exit_relay = target_relay
+				#circuit = [target_relay, exit_relay]
+				circuit = [first_relay, exit_relay]
+				#
+				#if [desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==exit_relay][0] is False:
+				#	logging.info('Relay {} can\'t exit!'.format(exit_relay))
+				#	all_circuits_okay = False
+				#
+				try:
+					circuit_id = controller['connection'].new_circuit(circuit, await_build=True)
+					logging.info('New circuit (id={}): {}'.format(circuit_id, circuit))
+				except stem.CircuitExtensionFailed:
+					logging.info('Failed circuit: {}'.format(circuit))
+					logging.warning('Circuit creation failed. Retrying...')
+				#
+			#
+			#try:
+			#	circuit_id = controller['connection'].new_circuit(circuit, await_build=True)
+			#except stem.CircuitExtensionFailed:
+			#	for c in controllers:
+			#		c['connection'].close()
+			#	#
+			#	for c in clients:
+			#		c['process'].kill()
+			#	#
+			#	raise
+			#
+			controller['circuits_remaining'].append(circuit_id)
+			controller['circuit_ids'].append(circuit_id)
+			controller['circuit_verbose'].append(circuit)
+			time.sleep(0.5)#1.5
+		#
+		def attach_stream(stream, controller):
+			try:
+				if stream.status == 'NEW':
+					# by default, let tor handle new streams
+					circuit_id = 0
+					#
+					if stream.purpose == 'USER':
+						# this is probably one of our streams (although not guaranteed)
+						circuit_id = controller['circuits_remaining'][0]
+						controller['circuits_remaining'] = controller['circuits_remaining'][1:]
+					#
+					try:
+						controller['connection'].attach_stream(stream.id, circuit_id)
+						#logging.info('Attaching to circuit {}'.format(circuit_id))
+					except stem.UnsatisfiableRequest:
+						if stream.purpose != 'USER':
+							# could not attach a non-user stream, so probably raised:
+							# stem.UnsatisfiableRequest: Connection is not managed by controller.
+							# therefore we should ignore this exception
+							pass
+						else:
+							raise
+						#
+					#
+				#
+			except:
+				logging.exception('Error while attaching the stream (controller_id={}, circuit_id={}).'.format(controller['id'], circuit_id))
+				raise
+			#
+		#
+		controller['connection'].add_event_listener(lambda x, controller=controller: attach_stream(x, controller),
+		                                            stem.control.EventType.STREAM)
+		controller['connection'].set_conf('__LeaveStreamsUnattached', '1')
+	#
+	'''
+	if not all_circuits_okay:
+		for c in clients:
+			if 'process' in c:
+				c['process'].kill()
+			#
+		#
+		raise Exception('Not all circuits can exit! Stopping...')
+	#
+	'''
+	processes = {}
+	circuits = {}
+	process_counter = 0
+	finished_processes = multiprocessing.Queue()
+	#
+	logging.info('Starting protocols')
+	#
+	wait_time = int(time.time()+30)
+	#
+	for stream_index in range(streams_per_client):
+		for client_index in range(len(clients)):
+			client_socket = socket.socket()
+			protocols = []
+			#
+			proxy_username = bytes([z for z in os.urandom(12) if z != 0])
+			proxy_endpoint = ('127.0.0.1', clients[client_index]['socks_port'])
+			#
+			logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint)
+			client_socket.connect(proxy_endpoint)
+			logging.debug('Socket %d connected', client_socket.fileno())
+			#
+			proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
+			protocols.append(proxy_protocol)
+			#
+			wait_until = wait_time+random.randint(0, args.wait_range)
+			#
+			if args.measureme:
+				control_port = clients[client_index]['control_port']
+				controller = controllers[client_index]['connection']
+				circuit_id = controllers[client_index]['circuit_ids'][stream_index]
+				measureme_id = stream_index*streams_per_client + client_index + 1
+				#print('Data: {}, {}'.format(circuit_id, measureme_id))
+				#measureme_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id: connect_and_send_measureme(control_port, circuit_id, measureme_id, 2)   <---- need to change this to also send to hop 0
+				#measureme_cb = lambda controller=controller, circuit_id=circuit_id, measureme_id=measureme_id: send_all_measuremes(controller, circuit_id, measureme_id)
+				#measureme_cb()
+				if args.num_clients==0:
+					# using Chutney
+					hops = [2, 1, 0]
+				else:
+					hops = [0]
+				#
+				start_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id, \
+				                  until=wait_until, hops=hops: \
+				                  push_start_measureme_cb(control_port, circuit_id, measureme_id, until, hops)
+				circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]+':'+str(measureme_id)).encode('utf-8')
+			else:
+				start_cb = lambda until=wait_until: time.sleep(until-time.time())
+				circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]).encode('utf-8')
+			#
+			throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
+#														 wait_until=wait_time+random.randint(0, args.wait_range),
+			                                             group_id=wait_time,
+			                                             custom_data=circuit_bytes,
+														 send_buffer_len=args.buffer_len,
+														 use_acceleration=(not args.no_accel),
+			                                             push_start_cb=start_cb)
+			protocols.append(throughput_protocol)
+			#
+			combined_protocol = basic_protocols.ChainedProtocol(protocols)
+			processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes)
+			circuits[process_counter] = controllers[client_index]['circuit_verbose'][stream_index]
+			process_counter += 1
+			client_socket.close()
+			#
+			time.sleep(0.01)
+		#
+	#
+	if wait_time is not None:
+		logging.info('Starting in {:.2f} seconds'.format(wait_time-time.time()))
+	#
+	try:
+		while len(processes) > 0:
+			logging.info('Waiting for processes ({} left)'.format(len(processes)))
+			(p_id, error) = finished_processes.get()
+			p = processes[p_id]
+			p.join()
+			processes.pop(p_id)
+			if error:
+				logging.info('Circuit with error: '+str(circuits[p_id]))
+			#
+			circuits.pop(p_id)
+		#
+	except KeyboardInterrupt as e:
+		print()
+		for p_id in processes:
+			processes[p_id].terminate()
+		#
+	#
+	logging.info('Processes finished')
+	#
+	for c in controllers:
+		c['connection'].close()
+	#
+	for c in clients:
+		if 'process' in c:
+			c['process'].terminate()
+		#
+	#
+#

+ 21 - 1
src/throughput_client.py

@@ -17,6 +17,7 @@ if __name__ == '__main__':
 	parser.add_argument('num_bytes', type=useful.parse_bytes,
 	                    help='number of bytes to send (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
 	parser.add_argument('--proxy', type=str, help='proxy ip address and port', metavar=('ip','port'), nargs=2)
+	parser.add_argument('--fake-proxy', action='store_true', help='connecting to a fake-tor proxy')
 	parser.add_argument('--wait', type=int,
 	                    help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
 	parser.add_argument('--buffer-len', type=useful.parse_bytes,
@@ -24,6 +25,15 @@ if __name__ == '__main__':
 	parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
 	args = parser.parse_args()
 	#
+	'''
+	endpoint = ('127.0.0.1', 4741)
+	#endpoint = ('127.0.0.1', 8627)
+	#proxy = ('127.0.0.1', 9003+int(sys.argv[3])-1)
+	proxy = ('127.0.0.1', int(sys.argv[3]))
+	#proxy = ('127.0.0.1', 9003)
+	#proxy = ('127.0.0.1', 12849)
+	#proxy = None
+	'''
 	#
 	endpoint = (args.ip, args.port)
 	client_socket = socket.socket()
@@ -32,7 +42,7 @@ if __name__ == '__main__':
 	if args.proxy is None:
 		logging.debug('Socket %d connecting to endpoint %r...', client_socket.fileno(), endpoint)
 		client_socket.connect(endpoint)
-	else:
+	elif not args.fake_proxy:
 		proxy_username = bytes([x for x in os.urandom(12) if x != 0])
 		proxy_endpoint = (args.proxy[0], int(args.proxy[1]))
 		#
@@ -41,9 +51,19 @@ if __name__ == '__main__':
 		#
 		proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
 		protocols.append(proxy_protocol)
+	elif args.fake_proxy:
+		proxy_endpoint = (args.proxy[0], int(args.proxy[1]))
+		#
+		logging.debug('Socket %d connecting to fake proxy %r...', client_socket.fileno(), proxy_endpoint)
+		client_socket.connect(proxy_endpoint)
+		#
+		proxy_protocol = basic_protocols.FakeProxyProtocol(client_socket, endpoint)
+		protocols.append(proxy_protocol)
 	#
+	group_id = int(args.wait*1000) if args.wait is not None else None
 	throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
 	                                             wait_until=args.wait,
+	                                             group_id=group_id,
 	                                             send_buffer_len=args.buffer_len,
 	                                             use_acceleration=(not args.no_accel))
 	protocols.append(throughput_protocol)

+ 38 - 9
src/throughput_protocols.py

@@ -7,19 +7,22 @@ import time
 import socket
 #
 class ClientProtocol(basic_protocols.Protocol):
-	def __init__(self, socket, total_bytes, wait_until=None, send_buffer_len=None, use_acceleration=None):
+	def __init__(self, socket, total_bytes, group_id=None, send_buffer_len=None, use_acceleration=None, custom_data=b'', push_start_cb=None, push_done_cb=None): #wait_until=None
 		self.socket = socket
 		self.total_bytes = total_bytes
-		self.wait_until = wait_until
+		#self.wait_until = wait_until
 		self.send_buffer_len = send_buffer_len
 		self.use_acceleration = use_acceleration
+		self.custom_data = custom_data
+		self.push_start_cb = push_start_cb
+		self.push_done_cb = push_done_cb
+		self.group_id = group_id if group_id is not None else 0
+		# a group id of 0 means no group
 		#
-		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN SEND_GROUP_ID WAIT PUSH_DATA DONE')
+		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN SEND_GROUP_ID SEND_CUSTOM_DATA PUSH_DATA DONE') #WAIT
 		self.state = self.states.READY_TO_BEGIN
 		#
 		self.sub_protocol = None
-		self.group_id = int(self.wait_until*1000) if self.wait_until is not None else 0
-		# a group id of 0 means no group
 	#
 	def _run_iteration(self):
 		if self.state is self.states.READY_TO_BEGIN:
@@ -29,9 +32,22 @@ class ClientProtocol(basic_protocols.Protocol):
 		#
 		if self.state is self.states.SEND_GROUP_ID:
 			if self.sub_protocol.run():
-				self.state = self.states.WAIT
+				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, self.custom_data)
+				self.state = self.states.SEND_CUSTOM_DATA
+			#
+		#
+		if self.state is self.states.SEND_CUSTOM_DATA:
+			if self.sub_protocol.run():
+				#self.state = self.states.WAIT
+				self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
+				                                                     send_buffer_len=self.send_buffer_len,
+				                                                     use_acceleration=self.use_acceleration,
+				                                                     push_start_cb=self.push_start_cb,
+				                                                     push_done_cb=self.push_done_cb)
+				self.state = self.states.PUSH_DATA
 			#
 		#
+		'''
 		if self.state is self.states.WAIT:
 			if self.wait_until is not None:
 				time.sleep(self.wait_until-time.time())
@@ -39,10 +55,13 @@ class ClientProtocol(basic_protocols.Protocol):
 			if self.wait_until is None or time.time() >= self.wait_until:
 				self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
 				                                                     send_buffer_len=self.send_buffer_len,
-				                                                     use_acceleration=self.use_acceleration)
+				                                                     use_acceleration=self.use_acceleration,
+				                                                     push_start_cb=self.push_start_cb,
+				                                                     push_done_cb=self.push_done_cb)
 				self.state = self.states.PUSH_DATA
 			#
 		#
+		'''
 		if self.state is self.states.PUSH_DATA:
 			if self.sub_protocol.run():
 				self.state = self.states.DONE
@@ -62,10 +81,11 @@ class ServerProtocol(basic_protocols.Protocol):
 		self.bandwidth_callback = bandwidth_callback
 		self.use_acceleration = use_acceleration
 		#
-		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_GROUP_ID PULL_DATA DONE')
+		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_GROUP_ID RECV_CUSTOM_DATA PULL_DATA DONE')
 		self.state = self.states.READY_TO_BEGIN
 		#
 		self.sub_protocol = None
+		self.custom_data = None
 	#
 	def _run_iteration(self):
 		if self.state is self.states.READY_TO_BEGIN:
@@ -80,6 +100,14 @@ class ServerProtocol(basic_protocols.Protocol):
 					group_id = None
 				#
 				self.group_id_callback(self.conn_id, group_id)
+				self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
+				self.state = self.states.RECV_CUSTOM_DATA
+			#
+		#
+		if self.state is self.states.RECV_CUSTOM_DATA:
+			if self.sub_protocol.run():
+				self.custom_data = self.sub_protocol.received_data
+				#
 				self.sub_protocol = basic_protocols.PullDataProtocol(self.socket, use_acceleration=self.use_acceleration)
 				self.state = self.states.PULL_DATA
 			#
@@ -87,7 +115,8 @@ class ServerProtocol(basic_protocols.Protocol):
 		if self.state is self.states.PULL_DATA:
 			if self.sub_protocol.run():
 				if self.bandwidth_callback:
-					self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate())
+					#self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate(), self.sub_protocol.byte_counter, self.sub_protocol.byte_counter_start_time)
+					self.bandwidth_callback(self.conn_id, self.custom_data, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate(), self.sub_protocol.deltas)
 				#
 				self.state = self.states.DONE
 			#

+ 159 - 5
src/throughput_server.py

@@ -4,10 +4,31 @@ import throughput_protocols
 import basic_protocols
 import os
 import multiprocessing
+import threading
 import queue
 import logging
 import argparse
 #
+def overlap_byte_counters(byte_counters):
+	start_time = None
+	finish_time = None
+	for x in byte_counters:
+		if start_time is None or x['start_time'] < start_time:
+			start_time = x['start_time']
+		#
+		if finish_time is None or x['start_time']+len(x['history']) > finish_time:
+			finish_time = x['start_time']+len(x['history'])
+		#
+	#
+	total_history = [0]*(finish_time-start_time)
+	#
+	for x in byte_counters:
+		for y in range(len(x['history'])):
+			total_history[(x['start_time']-start_time)+y] += x['history'][y]
+		#
+	#
+	return total_history
+#
 if __name__ == '__main__':
 	logging.basicConfig(level=logging.DEBUG)
 	#
@@ -25,18 +46,23 @@ if __name__ == '__main__':
 	processes = []
 	processes_map = {}
 	joinable_connections = multiprocessing.Queue()
+	joinable_connections_list = []
 	conn_counter = [0]
 	group_queue = multiprocessing.Queue()
+	group_queue_list = []
 	bw_queue = multiprocessing.Queue()
+	bw_queue_list = []
 	#
 	def group_id_callback(conn_id, group_id):
 		# put them in a queue to display later
 		#logging.debug('For conn %d Received group id: %d', conn_id, group_id)
 		group_queue.put({'conn_id':conn_id, 'group_id':group_id})
 	#
-	def bw_callback(conn_id, data_size, time_first_byte, time_last_byte, transfer_rate):
+	#def bw_callback(conn_id, data_size, time_first_byte, time_last_byte, transfer_rate, byte_counter, byte_counter_start_time):
+	def bw_callback(conn_id, custom_data, data_size, time_first_byte, time_last_byte, transfer_rate, deltas):
 		# put them in a queue to display later
-		bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate})
+		#bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate, 'byte_counter':byte_counter, 'byte_counter_start_time':byte_counter_start_time})
+		bw_queue.put({'conn_id':conn_id, 'custom_data':custom_data, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate, 'deltas':deltas})
 	#
 	def start_server_conn(socket, conn_id):
 		server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback,
@@ -47,6 +73,26 @@ if __name__ == '__main__':
 			socket.close()
 		finally:
 			joinable_connections.put(conn_id)
+			'''
+			while True:
+				# while we're waiting to join, we might get a KeyboardInterrupt,
+				# in which case we cannot let the process end since it will kill
+				# the queue threads, which may be waiting to push data to the pipe
+				try:
+					joinable_connections.close()
+					group_queue.close()
+					bw_queue.close()
+					#
+					group_queue.join_thread()
+					bw_queue.join_thread()
+					joinable_connections.join_thread()
+					#
+					break
+				except KeyboardInterrupt:
+					pass
+				#
+			#
+			'''
 		#
 	#
 	def accept_callback(socket):
@@ -60,11 +106,32 @@ if __name__ == '__main__':
 		socket.close()
 		# close this process' copy of the socket
 	#
+	def unqueue(q, l, print_len=False):
+		while True:
+			val = q.get()
+			if val is None:
+				break
+			#
+			l.append(val)
+			if print_len:
+				print('Queue length: {}'.format(len(l)), end='\r')
+			#
+		#
+	#
 	l = basic_protocols.ServerListener(endpoint, accept_callback)
 	#
+	t_joinable_connections = threading.Thread(target=unqueue, args=(joinable_connections, joinable_connections_list))
+	t_group_queue = threading.Thread(target=unqueue, args=(group_queue, group_queue_list))
+	t_bw_queue = threading.Thread(target=unqueue, args=(bw_queue, bw_queue_list, True))
+	#
+	t_joinable_connections.start()
+	t_group_queue.start()
+	t_bw_queue.start()
+	#
 	try:
 		while True:
 			l.accept()
+			'''
 			try:
 				while True:
 					conn_id = joinable_connections.get(False)
@@ -74,20 +141,56 @@ if __name__ == '__main__':
 			except queue.Empty:
 				pass
 			#
+			'''
 		#
 	except KeyboardInterrupt:
 		print()
 		#
+		try:
+			for p in processes:
+				p.join()
+			#
+		except KeyboardInterrupt:
+			pass
+		#
+		joinable_connections.put(None)
+		group_queue.put(None)
+		bw_queue.put(None)
+		t_joinable_connections.join()
+		t_group_queue.join()
+		t_bw_queue.join()
+		#
 		bw_values = {}
 		group_values = {}
 		#
+		'''
+		logging.info('BW queue length: {}'.format(bw_queue.qsize()))
+		logging.info('Group queue length: {}'.format(group_queue.qsize()))
+		#
+		temp_counter = 0
+		try:
+			while True:
+				bw_val = bw_queue.get(False)
+				bw_values[bw_val['conn_id']] = bw_val
+				temp_counter += 1
+			#
+		except queue.Empty:
+			pass
+		#
+		logging.info('temp counter: {}'.format(temp_counter))
+		import time
+		time.sleep(2)
 		try:
 			while True:
 				bw_val = bw_queue.get(False)
 				bw_values[bw_val['conn_id']] = bw_val
+				temp_counter += 1
 			#
 		except queue.Empty:
 			pass
+		#
+		logging.info('temp counter: {}'.format(temp_counter))
+		
 		#
 		try:
 			while True:
@@ -97,6 +200,21 @@ if __name__ == '__main__':
 		except queue.Empty:
 			pass
 		#
+		logging.info('bw_values length: {}'.format(len(bw_values)))
+		logging.info('group_values length: {}'.format(len(group_values)))
+		logging.info('group_values set: {}'.format(list(set([x['group_id'] for x in group_values.values()]))))
+		#
+		'''
+		#
+		#logging.info('BW list length: {}'.format(len(bw_queue_list)))
+		#logging.info('Group list length: {}'.format(len(group_queue_list)))
+		#
+		for x in bw_queue_list:
+			bw_values[x['conn_id']] = x
+		#
+		for x in group_queue_list:
+			group_values[x['conn_id']] = x
+		#
 		group_set = set([x['group_id'] for x in group_values.values()])
 		for group in group_set:
 			# doesn't handle group == None
@@ -108,9 +226,45 @@ if __name__ == '__main__':
 				total_transfer_rate = sum([x['data_size'] for x in in_group])/(max([x['time_of_last_byte'] for x in in_group])-min([x['time_of_first_byte'] for x in in_group]))
 				#
 				logging.info('Group size: %d', len(in_group))
-				logging.info('Avg Transferred (MB): %.4f', avg_data_size/(1024**2))
-				logging.info('Avg Transfer rate (MB/s): %.4f', avg_transfer_rate/(1024**2))
-				logging.info('Total Transfer rate (MB/s): %.4f', total_transfer_rate/(1024**2))
+				logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
+				logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
+				logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
+				#
+				'''
+				import math
+				histories = [{'start_time':x['byte_counter_start_time'], 'history':x['byte_counter']} for x in in_group]
+				total_history = overlap_byte_counters(histories)
+				#
+				logging.info('Max Transfer rate (MiB/s): %.4f', max(total_history)/(1024**2))
+				if sum(total_history) != sum([x['data_size'] for x in in_group]):
+					logging.warning('History doesn\'t add up ({} != {}).'.format(sum(total_history), sum([x['data_size'] for x in in_group])))
+				#
+				import json
+				with open('/tmp/group-{}.json'.format(group), 'w') as f:
+					json.dump({'id':group, 'history':total_history, 'individual_histories':histories, 'size':len(in_group), 'avg_transferred':avg_data_size,
+					           'avg_transfer_rate':avg_transfer_rate, 'total_transfer_rate':total_transfer_rate}, f)
+				#
+				'''
+				custom_data = [x['custom_data'].decode('utf-8') for x in in_group]
+				#
+				histories = [x['deltas'] for x in in_group]
+				combined_timestamps, combined_bytes = zip(*sorted(zip([x for y in histories for x in y['timestamps']],
+				                                                      [x for y in histories for x in y['bytes']])))
+				combined_history = {'bytes':combined_bytes, 'timestamps':combined_timestamps}
+				#combined_history = sorted([item for sublist in histories for item in sublist['deltas']], key=lambda x: x['timestamp'])
+				#
+				sum_history_bytes = sum(combined_history['bytes'])
+				sum_data_bytes = sum([x['data_size'] for x in in_group])
+				if sum_history_bytes != sum_data_bytes:
+					logging.warning('History doesn\'t add up ({} != {}).'.format(sum_history_bytes, sum_data_bytes))
+				#
+				import json
+				import gzip
+				with gzip.GzipFile('/tmp/group-{}.json.gz'.format(group), 'w') as f:
+					f.write(json.dumps({'id':group, 'history':combined_history, 'individual_histories':histories, 'size':len(in_group),
+					                    'avg_transferred':avg_data_size, 'avg_transfer_rate':avg_transfer_rate,
+					                    'total_transfer_rate':total_transfer_rate, 'custom_data':custom_data}, f).encode('utf-8'))
+				#
 			#
 		#
 	#

+ 258 - 0
tmp.patch

@@ -0,0 +1,258 @@
+diff --git a/src/basic_protocols.py b/src/basic_protocols.py
+index ba8b847..0dcecc8 100755
+--- a/src/basic_protocols.py
++++ b/src/basic_protocols.py
+@@ -123,26 +123,28 @@ class Socks4Protocol(Protocol):
+ 	#
+ #
+ class PushDataProtocol(Protocol):
+-	def __init__(self, socket, total_bytes, data_generator=None, send_max_bytes=1024*512, use_accelerated=True):
+-		if data_generator is None:
+-			data_generator = self._default_data_generator
++	def __init__(self, socket, total_bytes, send_buffer_len=None, use_acceleration=None):
++		if send_buffer_len is None:
++			send_buffer_len = 1024*512
++		#
++		if use_acceleration is None:
++			use_acceleration = True
+ 		#
+ 		self.socket = socket
+-		self.data_generator = data_generator
+ 		self.total_bytes = total_bytes
+-		self.send_max_bytes = send_max_bytes
+-		self.use_accelerated = use_accelerated
++		self.use_acceleration = use_acceleration
+ 		#
+ 		self.states = enum.Enum('PUSH_DATA_STATES', 'READY_TO_BEGIN SEND_INFO PUSH_DATA RECV_CONFIRMATION DONE')
+ 		self.state = self.states.READY_TO_BEGIN
+ 		#
++		self.byte_buffer = os.urandom(send_buffer_len)
+ 		self.bytes_written = 0
+ 		self.protocol_helper = None
+ 	#
+ 	def _run_iteration(self, block=True):
+ 		if self.state is self.states.READY_TO_BEGIN:
+ 			info = self.total_bytes.to_bytes(8, byteorder='big', signed=False)
+-			info += self.send_max_bytes.to_bytes(8, byteorder='big', signed=False)
++			info += len(self.byte_buffer).to_bytes(8, byteorder='big', signed=False)
+ 			self.protocol_helper = ProtocolHelper()
+ 			self.protocol_helper.set_buffer(info)
+ 			self.state = self.states.SEND_INFO
+@@ -153,24 +155,28 @@ class PushDataProtocol(Protocol):
+ 			#
+ 		#
+ 		if self.state is self.states.PUSH_DATA:
+-			max_block_size = self.send_max_bytes
+-			block_size = min(max_block_size, self.total_bytes-self.bytes_written)
+-			data = self.data_generator(self.bytes_written, block_size)
+-			#
+-			if self.use_accelerated:
++			if self.use_acceleration:
+ 				if not block:
+ 					logging.warning('Protocol set to non-blocking, but using the blocking accelerated function.')
+ 				#
+-				ret_val = accelerated_functions.push_data(self.socket.fileno(), self.total_bytes, data)
++				ret_val = accelerated_functions.push_data(self.socket.fileno(), self.total_bytes, self.byte_buffer)
+ 				if ret_val < 0:
+ 					raise ProtocolException('Error while pushing data.')
+ 				#
+ 				self.bytes_written = self.total_bytes
+ 			else:
++				bytes_remaining = self.total_bytes-self.bytes_written
++				data_size = min(len(self.byte_buffer), bytes_remaining)
++				if data_size != len(self.byte_buffer):
++					data = self.byte_buffer[:data_size]
++				else:
++					data = self.byte_buffer
++					# don't make a copy of the byte string each time if we don't need to
++				#
+ 				n = self.socket.send(data)
+ 				self.bytes_written += n
+ 			#
+-			if self.bytes_written >= self.total_bytes:
++			if self.bytes_written == self.total_bytes:
+ 				# finished sending the data
+ 				logging.debug('Finished sending the data (%d bytes).', self.bytes_written)
+ 				self.protocol_helper = ProtocolHelper()
+@@ -190,20 +196,20 @@ class PushDataProtocol(Protocol):
+ 		#
+ 		return False
+ 	#
+-	def _default_data_generator(self, index, bytes_needed):
+-		return os.urandom(bytes_needed)
+-	#
+ #
+ class PullDataProtocol(Protocol):
+-	def __init__(self, socket, use_accelerated=True):
++	def __init__(self, socket, use_acceleration=None):
++		if use_acceleration is None:
++			use_acceleration = True
++		#
+ 		self.socket = socket
+-		self.use_accelerated = use_accelerated
++		self.use_acceleration = use_acceleration
+ 		#
+ 		self.states = enum.Enum('PULL_DATA_STATES', 'READY_TO_BEGIN RECV_INFO PULL_DATA SEND_CONFIRMATION DONE')
+ 		self.state = self.states.READY_TO_BEGIN
+ 		#
+ 		self.data_size = None
+-		self.recv_max_bytes = None
++		self.recv_buffer_len = None
+ 		self.bytes_read = 0
+ 		self.protocol_helper = None
+ 		self._time_of_first_byte = None
+@@ -219,27 +225,28 @@ class PullDataProtocol(Protocol):
+ 			if self.protocol_helper.recv(self.socket, info_size):
+ 				response = self.protocol_helper.get_buffer()
+ 				self.data_size = int.from_bytes(response[0:8], byteorder='big', signed=False)
+-				self.recv_max_bytes = int.from_bytes(response[8:16], byteorder='big', signed=False)
++				self.recv_buffer_len = int.from_bytes(response[8:16], byteorder='big', signed=False)
+ 				self.state = self.states.PULL_DATA
+ 			#
+ 		#
+ 		if self.state is self.states.PULL_DATA:
+-			max_block_size = self.recv_max_bytes
+-			block_size = min(max_block_size, self.data_size-self.bytes_read)
+-			#
+-			if self.use_accelerated:
++			if self.use_acceleration:
+ 				if not block:
+ 					logging.warning('Protocol set to non-blocking, but using the blocking accelerated function.')
+ 				#
+-				(ret_val, elapsed_time) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, block_size)
++				(ret_val, elapsed_time) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
+ 				if ret_val < 0:
+ 					raise ProtocolException('Error while pulling data.')
+ 				#
+ 				self.bytes_read = self.data_size
+ 				self.elapsed_time = elapsed_time
+ 			else:
++				bytes_remaining = self.data_size-self.bytes_read
++				block_size = min(self.recv_buffer_len, bytes_remaining)
++				#
+ 				data = self.socket.recv(block_size)
+ 				self.bytes_read += len(data)
++				#
+ 				if self.bytes_read != 0 and self._time_of_first_byte is None:
+ 					self._time_of_first_byte = time.time()
+ 				#
+diff --git a/src/throughput_client.py b/src/throughput_client.py
+index d45dffe..0be8289 100644
+--- a/src/throughput_client.py
++++ b/src/throughput_client.py
+@@ -17,6 +17,8 @@ if __name__ == '__main__':
+ 	parser.add_argument('--proxy', type=str, help='proxy ip address and port', metavar=('ip','port'), nargs=2)
+ 	parser.add_argument('--wait', type=int,
+ 	                    help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
++	parser.add_argument('--buffer-len', type=useful.parse_bytes, help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
++	parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
+ 	args = parser.parse_args()
+ 	#
+ 	endpoint = (args.ip, args.port)
+@@ -27,7 +29,20 @@ if __name__ == '__main__':
+ 	#
+ 	username = bytes([x for x in os.urandom(12) if x != 0])
+ 	#username = None
++	'''
++	data_MB = 200 #20000
++	data_B = data_MB*2**20
+ 	#
+-	client = throughput_protocols.ClientProtocol(endpoint, args.num_bytes, proxy=proxy, username=username, wait_until=args.wait)
++	if len(sys.argv) > 2:
++		wait_until = int(sys.argv[2])
++	else:
++		wait_until = None
++	#
++	'''
++	#
++	client = throughput_protocols.ClientProtocol(endpoint, args.num_bytes, proxy=proxy,
++	                                             username=username, wait_until=args.wait,
++	                                             send_buffer_len=args.buffer_len,
++	                                             use_acceleration=(not args.no_accel))
+ 	client.run()
+ #
+diff --git a/src/throughput_protocols.py b/src/throughput_protocols.py
+index 5dec4b6..3eb3d60 100755
+--- a/src/throughput_protocols.py
++++ b/src/throughput_protocols.py
+@@ -7,13 +7,14 @@ import time
+ import socket
+ #
+ class ClientProtocol(basic_protocols.Protocol):
+-	def __init__(self, endpoint, total_bytes, data_generator=None, proxy=None, username=None, wait_until=None):
++	def __init__(self, endpoint, total_bytes, proxy=None, username=None, wait_until=None, send_buffer_len=None, use_acceleration=None):
+ 		self.endpoint = endpoint
+-		self.data_generator = data_generator
+ 		self.total_bytes = total_bytes
+ 		self.proxy = proxy
+ 		self.username = username
+ 		self.wait_until = wait_until
++		self.send_buffer_len = send_buffer_len
++		self.use_acceleration = use_acceleration
+ 		#
+ 		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN CONNECT_TO_PROXY SEND_GROUP_ID PUSH_DATA DONE')
+ 		self.state = self.states.READY_TO_BEGIN
+@@ -47,7 +48,6 @@ class ClientProtocol(basic_protocols.Protocol):
+ 				group_id_bytes = self.group_id.to_bytes(8, byteorder='big', signed=False)
+ 				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, group_id_bytes)
+ 				self.state = self.states.SEND_GROUP_ID
+-				#logging.debug('Sent group ID.')
+ 			#
+ 		#
+ 		if self.state is self.states.SEND_GROUP_ID:
+@@ -56,8 +56,8 @@ class ClientProtocol(basic_protocols.Protocol):
+ 			#
+ 			if (self.wait_until is None or time.time() >= self.wait_until) and self.sub_protocol.run(block=block):
+ 				self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
+-				                                                     data_generator=self.data_generator,
+-				                                                     send_max_bytes=1024*512)
++				                                                     send_buffer_len=self.send_buffer_len,
++				                                                     use_acceleration=self.use_acceleration)
+ 				self.state = self.states.PUSH_DATA
+ 			#
+ 		#
+@@ -71,11 +71,12 @@ class ClientProtocol(basic_protocols.Protocol):
+ 	#
+ #
+ class ServerProtocol(basic_protocols.Protocol):
+-	def __init__(self, socket, conn_id, group_id_callback=None, bandwidth_callback=None):
++	def __init__(self, socket, conn_id, group_id_callback=None, bandwidth_callback=None, use_acceleration=None):
+ 		self.socket = socket
+ 		self.conn_id = conn_id
+ 		self.group_id_callback = group_id_callback
+ 		self.bandwidth_callback = bandwidth_callback
++		self.use_acceleration = use_acceleration
+ 		#
+ 		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_GROUP_ID PULL_DATA DONE')
+ 		self.state = self.states.READY_TO_BEGIN
+@@ -95,7 +96,7 @@ class ServerProtocol(basic_protocols.Protocol):
+ 					group_id = None
+ 				#
+ 				self.group_id_callback(self.conn_id, group_id)
+-				self.sub_protocol = basic_protocols.PullDataProtocol(self.socket)
++				self.sub_protocol = basic_protocols.PullDataProtocol(self.socket, use_acceleration=self.use_acceleration)
+ 				self.state = self.states.PULL_DATA
+ 			#
+ 		#
+diff --git a/src/throughput_server.py b/src/throughput_server.py
+index a22ed8f..0217d14 100644
+--- a/src/throughput_server.py
++++ b/src/throughput_server.py
+@@ -13,6 +13,7 @@ if __name__ == '__main__':
+ 	#
+ 	parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
+ 	parser.add_argument('port', type=int, help='listen on port')
++	parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
+ 	args = parser.parse_args()
+ 	#
+ 	endpoint = ('127.0.0.1', args.port)
+@@ -34,7 +35,8 @@ if __name__ == '__main__':
+ 		bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'transfer_rate':transfer_rate})
+ 	#
+ 	def start_server_conn(socket, conn_id):
+-		server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback, bandwidth_callback=bw_callback)
++		server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback,
++		                                             bandwidth_callback=bw_callback, use_acceleration=(not args.no_accel))
+ 		try:
+ 			server.run()
+ 		except KeyboardInterrupt: