Browse Source

Added original code.

Steven Engler 5 years ago
parent
commit
0124fc1a56
2 changed files with 710 additions and 0 deletions
  1. 220 0
      src/bandwidth_tester.py
  2. 490 0
      src/basic_protocols.py

+ 220 - 0
src/bandwidth_tester.py

@@ -0,0 +1,220 @@
+#!/usr/bin/python3
+#
+import basic_protocols
+import logging
+import enum
+import time
+import socket
+#
+class ClientConnectionProtocol(basic_protocols.Protocol):
+	def __init__(self, endpoint, total_bytes, data_generator=None, proxy=None, username=None, wait_until=None):
+		self.endpoint = endpoint
+		self.data_generator = data_generator
+		self.total_bytes = total_bytes
+		self.proxy = proxy
+		self.username = username
+		self.wait_until = wait_until
+		#
+		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN CONNECT_TO_PROXY SEND_GROUP_ID PUSH_DATA DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.socket = socket.socket()
+		self.sub_protocol = None
+		self.group_id = int(self.wait_until*1000) if self.wait_until is not None else 0
+		# a group id of 0 means no group
+		#
+		if self.proxy is None:
+			logging.debug('Socket %d connecting to endpoint %r...', self.socket.fileno(), self.endpoint)
+			self.socket.connect(self.endpoint)
+		else:
+			logging.debug('Socket %d connecting to proxy %r...', self.socket.fileno(), self.proxy)
+			self.socket.connect(self.proxy)
+		#
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			if self.proxy is None:
+				group_id_bytes = self.group_id.to_bytes(8, byteorder='big', signed=False)
+				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, group_id_bytes)
+				self.state = self.states.SEND_GROUP_ID
+			else:
+				self.sub_protocol = basic_protocols.Socks4Protocol(self.socket, self.endpoint, username=self.username)
+				self.state = self.states.CONNECT_TO_PROXY
+			#
+		#
+		if self.state is self.states.CONNECT_TO_PROXY:
+			if self.sub_protocol.run(block=block):
+				group_id_bytes = self.group_id.to_bytes(8, byteorder='big', signed=False)
+				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, group_id_bytes)
+				self.state = self.states.SEND_GROUP_ID
+				#logging.debug('Sent group ID.')
+			#
+		#
+		if self.state is self.states.SEND_GROUP_ID:
+			if block:
+				time.sleep(self.wait_until-time.time())
+			#
+			if time.time() >= self.wait_until and self.sub_protocol.run(block=block):
+				self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
+				                                                     data_generator=self.data_generator,
+				                                                     send_max_bytes=1024*512)
+				self.state = self.states.PUSH_DATA
+			#
+		#
+		if self.state is self.states.PUSH_DATA:
+			if self.sub_protocol.run(block=block):
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+class ServerConnectionProtocol(basic_protocols.Protocol):
+	def __init__(self, socket, conn_id, group_id_callback=None, bandwidth_callback=None):
+		self.socket = socket
+		self.conn_id = conn_id
+		self.group_id_callback = group_id_callback
+		self.bandwidth_callback = bandwidth_callback
+		#
+		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_GROUP_ID PULL_DATA DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.sub_protocol = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
+			self.state = self.states.RECV_GROUP_ID
+		#
+		if self.state is self.states.RECV_GROUP_ID:
+			if self.sub_protocol.run(block=block):
+				group_id = int.from_bytes(self.sub_protocol.received_data, byteorder='big', signed=False)
+				if group_id == 0:
+					# a group of 0 means no group
+					group_id = None
+				#
+				self.group_id_callback(self.conn_id, group_id)
+				self.sub_protocol = basic_protocols.PullDataProtocolWithMetrics(self.socket)
+				self.state = self.states.PULL_DATA
+			#
+		#
+		if self.state is self.states.PULL_DATA:
+			if self.sub_protocol.run(block=block):
+				self.state = self.states.DONE
+				if self.bandwidth_callback:
+					self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.calc_transfer_rate())
+				#
+				return True
+			#
+		#
+		return False
+	#
+#
+if __name__ == '__main__':
+	import sys
+	logging.basicConfig(level=logging.DEBUG)
+	#
+	if sys.argv[1] == 'client':
+		import os
+		#
+		endpoint = ('127.0.0.1', 4747)
+		#endpoint = ('127.0.0.1', 8627)
+		#proxy = ('127.0.0.1', 9003+int(sys.argv[3])-1)
+		#proxy = ('127.0.0.1', 9003)
+		proxy = None
+		username = bytes([x for x in os.urandom(12) if x != 0])
+		#username = None
+		data_MB = 500000
+		#
+		if len(sys.argv) > 2:
+			wait_until = int(sys.argv[2])
+		else:
+			wait_until = None
+		#
+		client = ClientConnectionProtocol(endpoint, data_MB*2**20, proxy=proxy, username=username, wait_until=wait_until)
+		client.run()
+		#
+	elif sys.argv[1] == 'server':
+		import multiprocessing
+		import queue
+		#
+		endpoint = ('127.0.0.1', 4747)
+		processes = []
+		conn_counter = [0]
+		group_queue = multiprocessing.Queue()
+		bw_queue = multiprocessing.Queue()
+		#
+		def group_id_callback(conn_id, group_id):
+			# put them in a queue to display later
+			#logging.debug('For conn %d Received group id: %d', conn_id, group_id)
+			group_queue.put({'conn_id':conn_id, 'group_id':group_id})
+		#
+		def bw_callback(conn_id, data_size, transfer_rate):
+			# put them in a queue to display later
+			bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'transfer_rate':transfer_rate})
+		#
+		def start_server_conn(socket, conn_id):
+			server = ServerConnectionProtocol(socket, conn_id, group_id_callback=group_id_callback, bandwidth_callback=bw_callback)
+			try:
+				server.run()
+			except KeyboardInterrupt:
+				socket.close()
+			#
+		#
+		def accept_callback(socket):
+			conn_id = conn_counter[0]
+			conn_counter[0] += 1
+			#logging.debug('Adding connection %d', conn_id)
+			p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
+			processes.append(p)
+			p.start()
+		#
+		l = basic_protocols.ServerListener(endpoint, accept_callback)
+		#
+		try:
+			while True:
+				l.accept()
+			#
+		except KeyboardInterrupt:
+			print()
+			#
+			bw_values = {}
+			group_values = {}
+			#
+			try:
+				while True:
+					bw_val = bw_queue.get(False)
+					bw_values[bw_val['conn_id']] = bw_val
+				#
+			except queue.Empty:
+				pass
+			#
+			try:
+				while True:
+					group_val = group_queue.get(False)
+					group_values[group_val['conn_id']] = group_val
+				#
+			except queue.Empty:
+				pass
+			#
+			group_set = set([x['group_id'] for x in group_values.values()])
+			for group in group_set:
+				# doesn't handle group == None
+				conns_in_group = [x[0] for x in group_values.items() if x[1]['group_id'] == group]
+				in_group = [x for x in bw_values.values() if x['conn_id'] in conns_in_group]
+				if len(in_group) > 0:
+					avg_data_size = sum([x['data_size'] for x in in_group])/len(in_group)
+					avg_transfer_rate = sum([x['transfer_rate'] for x in in_group])/len(in_group)
+					#
+					logging.info('Group size: %d', len(in_group))
+					logging.info('Avg Transferred (MB): %.4f', avg_data_size/(1024**2))
+					logging.info('Avg Transfer rate (MB/s): %.4f', avg_transfer_rate/(1024**2))
+				#
+			#
+		#
+		for p in processes:
+			p.join()
+		#
+	#
+#

+ 490 - 0
src/basic_protocols.py

@@ -0,0 +1,490 @@
+#!/usr/bin/python3
+#
+import socket
+import struct
+import logging
+import time
+import enum
+import select
+import os
+#
+class ProtocolException(Exception):
+    pass
+#
+class ProtocolHelper():
+	def __init__(self):
+		self._buffer = b''
+	#
+	def set_buffer(self, data):
+		self._buffer = data
+	#
+	def get_buffer(self):
+		return self._buffer
+	#
+	def recv(self, socket, num_bytes):
+		data = socket.recv(num_bytes-len(self._buffer))
+		self._buffer += data
+		if len(self._buffer) == num_bytes:
+			return True
+		#
+		return False
+	#
+	def send(self, socket):
+		n = socket.send(self._buffer)
+		self._buffer = self._buffer[n:]
+		if len(self._buffer) == 0:
+			return True
+		#
+		return False
+	#
+#
+class Protocol():
+	def _run_iteration(self, block=True):
+		pass
+	#
+	def run(self, block=True):
+		while True:
+			finished = self._run_iteration(block=block)
+			#
+			if finished:
+				# protocol is done
+				return True
+			elif not block:
+				# not done the protocol yet, but don't block
+				return False
+			#
+		#
+	#
+#
+class Socks4Protocol(Protocol):
+	def __init__(self, socket, addr_port, username=None):
+		self.socket = socket
+		self.addr_port = addr_port
+		self.username = username
+		#
+		self.states = enum.Enum('SOCKS_4_STATES', 'READY_TO_BEGIN CONNECTING_TO_PROXY WAITING_FOR_PROXY DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.protocol_helper = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.protocol_helper = ProtocolHelper()
+			self.protocol_helper.set_buffer(self.socks_cmd(self.addr_port, self.username))
+			self.state = self.states.CONNECTING_TO_PROXY
+		#
+		if self.state is self.states.CONNECTING_TO_PROXY:
+			if self.protocol_helper.send(self.socket):
+				self.protocol_helper = ProtocolHelper()
+				self.state = self.states.WAITING_FOR_PROXY
+				#logging.debug('Waiting for reply from proxy')
+			#
+		#
+		if self.state is self.states.WAITING_FOR_PROXY:
+			response_size = 8
+			if self.protocol_helper.recv(self.socket, response_size):
+				response = self.protocol_helper.get_buffer()
+				if response[1] != 0x5a:
+					raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
+				#
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+	def socks_cmd(self, addr_port, username=None):
+		socks_version = 4
+		command = 1
+		dnsname = b''
+		host, port = addr_port
+		#
+		try:
+			username = bytes(username, 'utf8')
+		except TypeError:
+			pass
+		#
+		if username is None:
+			username = b''
+		elif b'\x00' in username:
+			raise ProtocolException('Username cannot contain a NUL character.')
+		#
+		username = username+b'\x00'
+		#
+		try:
+			addr = socket.inet_aton(host)
+		except socket.error:
+			addr = b'\x00\x00\x00\x01'
+			dnsname = bytes(host, 'utf8')+b'\x00'
+		#
+		return struct.pack('!BBH', socks_version, command, port) + addr + username + dnsname
+	#
+#
+class PushDataProtocol(Protocol):
+	def __init__(self, socket, total_bytes, data_generator=None, send_max_bytes=1024*512):
+		if data_generator is None:
+			data_generator = self._default_data_generator
+		#
+		self.socket = socket
+		self.data_generator = data_generator
+		self.total_bytes = total_bytes
+		self.send_max_bytes = send_max_bytes
+		#
+		self.states = enum.Enum('PUSH_DATA_STATES', 'READY_TO_BEGIN SEND_INFO PUSH_DATA RECV_CONFIRMATION DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.bytes_written = 0
+		self.protocol_helper = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			info = self.total_bytes.to_bytes(8, byteorder='big', signed=False)
+			info += self.send_max_bytes.to_bytes(8, byteorder='big', signed=False)
+			self.protocol_helper = ProtocolHelper()
+			self.protocol_helper.set_buffer(info)
+			self.state = self.states.SEND_INFO
+		#
+		if self.state is self.states.SEND_INFO:
+			if self.protocol_helper.send(self.socket):
+				self.state = self.states.PUSH_DATA
+			#
+		#
+		if self.state is self.states.PUSH_DATA:
+			max_block_size = self.send_max_bytes
+			bytes_needed = min(max_block_size, self.total_bytes-self.bytes_written)
+			data = self.data_generator(self.bytes_written, bytes_needed)
+			n = self.socket.send(data)
+			self.bytes_written += n
+			if self.bytes_written >= self.total_bytes:
+				# finished sending the data
+				logging.debug('Finished sending the data (%d bytes).', self.bytes_written)
+				self.protocol_helper = ProtocolHelper()
+				self.state = self.states.RECV_CONFIRMATION
+			#
+		#
+		if self.state is self.states.RECV_CONFIRMATION:
+			response_size = 8
+			if self.protocol_helper.recv(self.socket, response_size):
+				response = self.protocol_helper.get_buffer()
+				if response != b'RECEIVED':
+					raise ProtocolException('Did not receive the expected message: {}'.format(response))
+				#
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+	def _default_data_generator(self, index, bytes_needed):
+		return b'0'*bytes_needed
+	#
+#
+class PullDataProtocol(Protocol):
+	def __init__(self, socket):
+		self.socket = socket
+		#
+		self.states = enum.Enum('PULL_DATA_STATES', 'READY_TO_BEGIN RECV_INFO PULL_DATA SEND_CONFIRMATION DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.data_size = None
+		self.recv_max_bytes = None
+		self.bytes_read = 0
+		self.protocol_helper = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.protocol_helper = ProtocolHelper()
+			self.state = self.states.RECV_INFO
+		#
+		if self.state is self.states.RECV_INFO:
+			info_size = 16
+			if self.protocol_helper.recv(self.socket, info_size):
+				response = self.protocol_helper.get_buffer()
+				self.data_size = int.from_bytes(response[0:8], byteorder='big', signed=False)
+				self.recv_max_bytes = int.from_bytes(response[8:16], byteorder='big', signed=False)
+				self.state = self.states.PULL_DATA
+			#
+		#
+		if self.state is self.states.PULL_DATA:
+			max_block_size = self.recv_max_bytes
+			bytes_needed = min(max_block_size, self.data_size-self.bytes_read)	
+			data = self.socket.recv(bytes_needed)
+			self.bytes_read += len(data)
+			#logging.debug('Read %d bytes', self.bytes_read)
+			if self.bytes_read == self.data_size:
+				# finished receiving the data
+				logging.debug('Finished receiving the data.')
+				self.protocol_helper = ProtocolHelper()
+				self.protocol_helper.set_buffer(b'RECEIVED')
+				self.state = self.states.SEND_CONFIRMATION
+			#
+		#
+		if self.state is self.states.SEND_CONFIRMATION:
+			if self.protocol_helper.send(self.socket):
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+class PullDataProtocolWithMetrics(PullDataProtocol):
+	def __init__(self, *args, **kwargs):
+		super().__init__(*args, **kwargs)
+		#
+		self.time_of_first_byte = None
+		self.time_of_last_byte = None
+	#
+	def _run_iteration(self, *args, **kwargs):
+		data = super()._run_iteration(*args, **kwargs)
+		#
+		if self.bytes_read != 0 and self.time_of_first_byte is None:
+			self.time_of_first_byte = time.time()
+		#
+		if self.bytes_read == self.data_size and self.time_of_last_byte is None:
+			self.time_of_last_byte = time.time()
+		#
+		return data
+	#
+	def calc_transfer_rate(self):
+		""" Returns bytes/s. """
+		assert self.data_size is not None and self.time_of_first_byte is not None and self.time_of_last_byte is not None
+		return self.data_size/(self.time_of_last_byte-self.time_of_first_byte)
+	#
+#
+class SendDataProtocol(Protocol):
+	def __init__(self, socket, data):
+		self.socket = socket
+		self.send_data = data
+		#
+		self.states = enum.Enum('SEND_DATA_STATES', 'READY_TO_BEGIN SEND_INFO SEND_DATA RECV_CONFIRMATION DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.protocol_helper = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			info_size = 20
+			info = len(self.send_data).to_bytes(info_size, byteorder='big', signed=False)
+			self.protocol_helper = ProtocolHelper()
+			self.protocol_helper.set_buffer(info)
+			self.state = self.states.SEND_INFO
+		#
+		if self.state is self.states.SEND_INFO:
+			if self.protocol_helper.send(self.socket):
+				self.protocol_helper = ProtocolHelper()
+				self.protocol_helper.set_buffer(self.send_data)
+				self.state = self.states.SEND_DATA
+			#
+		#
+		if self.state is self.states.SEND_DATA:
+			if self.protocol_helper.send(self.socket):
+				self.protocol_helper = ProtocolHelper()
+				self.state = self.states.RECV_CONFIRMATION
+			#
+		#
+		if self.state is self.states.RECV_CONFIRMATION:
+			response_size = 8
+			if self.protocol_helper.recv(self.socket, response_size):
+				response = self.protocol_helper.get_buffer()
+				if response != b'RECEIVED':
+					raise ProtocolException('Did not receive the expected message: {}'.format(response))
+				#
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+class ReceiveDataProtocol(Protocol):
+	def __init__(self, socket):
+		self.socket = socket
+		#
+		self.states = enum.Enum('RECV_DATA_STATES', 'READY_TO_BEGIN RECV_INFO RECV_DATA SEND_CONFIRMATION DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.protocol_helper = None
+		self.data_size = None
+		self.received_data = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.protocol_helper = ProtocolHelper()
+			self.state = self.states.RECV_INFO
+		#
+		if self.state is self.states.RECV_INFO:
+			info_size = 20
+			if self.protocol_helper.recv(self.socket, info_size):
+				response = self.protocol_helper.get_buffer()
+				self.data_size = int.from_bytes(response, byteorder='big', signed=False)
+				self.protocol_helper = ProtocolHelper()
+				self.state = self.states.RECV_DATA
+			#
+		#
+		if self.state is self.states.RECV_DATA:
+			if self.protocol_helper.recv(self.socket, self.data_size):
+				response = self.protocol_helper.get_buffer()
+				self.received_data = response
+				self.protocol_helper = ProtocolHelper()
+				self.protocol_helper.set_buffer(b'RECEIVED')
+				self.state = self.states.SEND_CONFIRMATION
+			#
+		#
+		if self.state is self.states.SEND_CONFIRMATION:
+			if self.protocol_helper.send(self.socket):
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+class ServerListener():
+	"A TCP listener, binding, listening and accepting new connections."
+	def __init__(self, endpoint, accept_callback):
+		self.callback = accept_callback
+		#
+		self.s = socket.socket()
+		self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+		self.s.bind(endpoint)
+		self.s.listen(0)
+	#
+	def accept(self):
+		newsock, endpoint = self.s.accept()
+		logging.debug("New client from %s:%d (fd=%d)",
+					  endpoint[0], endpoint[1], newsock.fileno())
+		self.callback(newsock)
+	#
+#
+class SimpleClientConnectionProtocol(Protocol):
+	def __init__(self, endpoint, total_bytes, data_generator=None, proxy=None, username=None):
+		self.endpoint = endpoint
+		self.data_generator = data_generator
+		self.total_bytes = total_bytes
+		self.proxy = proxy
+		self.username = username
+		#
+		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN CONNECT_TO_PROXY  PUSH_DATA DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.socket = socket.socket()
+		self.sub_protocol = None
+		#
+		if self.proxy is None:
+			logging.debug('Socket %d connecting to endpoint %r...', self.socket.fileno(), self.endpoint)
+			self.socket.connect(self.endpoint)
+		else:
+			logging.debug('Socket %d connecting to proxy %r...', self.socket.fileno(), self.proxy)
+			self.socket.connect(self.proxy)
+		#
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			if self.proxy is None:
+				self.sub_protocol = PushDataProtocol(self.socket, self.total_bytes, self.data_generator)
+				self.state = self.states.PUSH_DATA
+			else:
+				self.sub_protocol = Socks4Protocol(self.socket, self.endpoint, username=self.username)
+				self.state = self.states.CONNECT_TO_PROXY
+			#
+		#
+		if self.state is self.states.CONNECT_TO_PROXY:
+			if self.sub_protocol.run(block=block):
+				self.sub_protocol = PushDataProtocol(self.socket, self.total_bytes, self.data_generator)
+				self.state = self.states.PUSH_DATA
+			#
+		#
+		if self.state is self.states.PUSH_DATA:
+			if self.sub_protocol.run(block=block):
+				self.state = self.states.DONE
+				return True
+			#
+		#
+		return False
+	#
+#
+class SimpleServerConnectionProtocol(Protocol):
+	def __init__(self, socket, conn_id, bandwidth_callback=None):
+		self.socket = socket
+		self.conn_id = conn_id
+		self.bandwidth_callback = bandwidth_callback
+		#
+		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN PULL_DATA DONE')
+		self.state = self.states.READY_TO_BEGIN
+		#
+		self.sub_protocol = None
+	#
+	def _run_iteration(self, block=True):
+		if self.state is self.states.READY_TO_BEGIN:
+			self.sub_protocol = PullDataProtocolWithMetrics(self.socket)
+			self.state = self.states.PULL_DATA
+		#
+		if self.state is self.states.PULL_DATA:
+			if self.sub_protocol.run(block=block):
+				self.state = self.states.DONE
+				if self.bandwidth_callback:
+					self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.calc_transfer_rate())
+				#
+				return True
+			#
+		#
+		return False
+	#
+#
+if __name__ == '__main__':
+	import sys
+	logging.basicConfig(level=logging.DEBUG)
+	#
+	if sys.argv[1] == 'client':
+		endpoint = ('127.0.0.1', 4747)
+		proxy = ('127.0.0.1', 9003)
+		#proxy = None
+		username = bytes([x for x in os.urandom(12) if x != 0])
+		#username = None
+		data_MB = 40
+		#
+		client = SimpleClientConnectionProtocol(endpoint, data_MB*2**20, proxy=proxy, username=username)
+		client.run()
+	elif sys.argv[1] == 'server':
+		import multiprocessing
+		import queue
+		#
+		endpoint = ('127.0.0.1', 4747)
+		processes = []
+		conn_counter = [0]
+		#
+		def bw_callback(conn_id, data_size, transfer_rate):
+			logging.info('Avg Transferred (MB): %.4f', data_size/(1024**2))
+			logging.info('Avg Transfer rate (MB/s): %.4f', transfer_rate/(1024**2))
+		#
+		def start_server_conn(socket, conn_id):
+			server = SimpleServerConnectionProtocol(socket, conn_id, bandwidth_callback=bw_callback)
+			try:
+				server.run()
+			except KeyboardInterrupt:
+				socket.close()
+			#
+		#
+		def accept_callback(socket):
+			conn_id = conn_counter[0]
+			conn_counter[0] += 1
+			#
+			p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
+			processes.append(p)
+			p.start()
+		#
+		l = ServerListener(endpoint, accept_callback)
+		#
+		try:
+			while True:
+				l.accept()
+			#
+		except KeyboardInterrupt:
+			print()
+		#
+		for p in processes:
+			p.join()
+		#
+	#
+#