Browse Source

Better measurements for start of streams

Steven Engler 4 years ago
parent
commit
8149148bab
3 changed files with 32 additions and 14 deletions
  1. 2 0
      src/basic_protocols.py
  2. 2 1
      src/experiment.py
  3. 28 13
      src/throughput_protocols.py

+ 2 - 0
src/basic_protocols.py

@@ -225,6 +225,7 @@ class PushDataProtocol(Protocol):
 		#
 		self.byte_buffer = os.urandom(send_buffer_len)
 		self.bytes_written = 0
+		self.time_started_push = None
 		self.protocol_helper = None
 	#
 	def _run_iteration(self):
@@ -245,6 +246,7 @@ class PushDataProtocol(Protocol):
 				self.push_start_cb()
 			#
 			self.state = self.states.PUSH_DATA
+			self.time_started_push = time.time()
 		#
 		if self.state is self.states.PUSH_DATA:
 			if self.use_acceleration:

+ 2 - 1
src/experiment.py

@@ -211,6 +211,7 @@ class Experiment:
 			to_add['data_size'] = r['data_size']
 			to_add['measured_data_size'] = int(np.sum(r['deltas']['bytes']))
 			to_add['custom_data'] = json.loads(r['custom_data'].decode('utf-8'))
+			to_add['time_started_push'] = r['time_started_push']
 			results_brief.append(to_add)
 		#
 		num_expected_results = len(self.proxy_control_ports)*self.num_streams_per_client
@@ -341,8 +342,8 @@ class Experiment:
 				#
 			#
 			time.sleep(2)
-			start_event.set()
 			client_info['start_time'] = time.time()
+			start_event.set()
 			#
 			# unfortunately mixing threads and processes can cause python to deadlock, and some client protocols
 			# have been found to deadlock at about 1/1000 probability, so we must provide a timeout to kill

+ 28 - 13
src/throughput_protocols.py

@@ -5,6 +5,7 @@ import logging
 import enum
 import time
 import socket
+import struct
 #
 class ClientProtocol(basic_protocols.Protocol):
 	def __init__(self, socket, total_bytes, send_buffer_len=None, use_acceleration=None, custom_data=b'', push_start_cb=None, push_done_cb=None):
@@ -16,7 +17,7 @@ class ClientProtocol(basic_protocols.Protocol):
 		self.push_start_cb = push_start_cb
 		self.push_done_cb = push_done_cb
 		#
-		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN SEND_CUSTOM_DATA PUSH_DATA DONE')
+		self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN SEND_CUSTOM_DATA PUSH_DATA SEND_RESULTS DONE')
 		self.state = self.states.READY_TO_BEGIN
 		#
 		self.sub_protocol = None
@@ -37,6 +38,13 @@ class ClientProtocol(basic_protocols.Protocol):
 			#
 		#
 		if self.state is self.states.PUSH_DATA:
+			if self.sub_protocol.run():
+				self.sub_protocol = basic_protocols.SendDataProtocol(self.socket,
+				                                                     struct.pack('d', self.sub_protocol.time_started_push))
+				self.state = self.states.SEND_RESULTS
+			#
+		#
+		if self.state is self.states.SEND_RESULTS:
 			if self.sub_protocol.run():
 				self.state = self.states.DONE
 			#
@@ -53,11 +61,11 @@ class ServerProtocol(basic_protocols.Protocol):
 		self.results_callback = results_callback
 		self.use_acceleration = use_acceleration
 		#
-		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_CUSTOM_DATA PULL_DATA DONE')
+		self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_CUSTOM_DATA PULL_DATA RECV_RESULTS DONE')
 		self.state = self.states.READY_TO_BEGIN
 		#
 		self.sub_protocol = None
-		self.custom_data = None
+		self.results = {}
 	#
 	def _run_iteration(self):
 		if self.state is self.states.READY_TO_BEGIN:
@@ -66,7 +74,7 @@ class ServerProtocol(basic_protocols.Protocol):
 		#
 		if self.state is self.states.RECV_CUSTOM_DATA:
 			if self.sub_protocol.run():
-				self.custom_data = self.sub_protocol.received_data
+				self.results['custom_data'] = self.sub_protocol.received_data
 				#
 				self.sub_protocol = basic_protocols.PullDataProtocol(self.socket, use_acceleration=self.use_acceleration)
 				self.state = self.states.PULL_DATA
@@ -74,16 +82,23 @@ class ServerProtocol(basic_protocols.Protocol):
 		#
 		if self.state is self.states.PULL_DATA:
 			if self.sub_protocol.run():
+				self.results['data_size'] = self.sub_protocol.data_size
+				self.results['time_of_first_byte'] = self.sub_protocol.time_of_first_byte
+				self.results['time_of_last_byte'] = self.sub_protocol.time_of_last_byte
+				self.results['transfer_rate'] = self.sub_protocol.calc_transfer_rate()
+				self.results['deltas'] = self.sub_protocol.deltas
+				#
+				self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
+				self.state = self.states.RECV_RESULTS
+			#
+		#
+		if self.state is self.states.RECV_RESULTS:
+			if self.sub_protocol.run():
+				time_started_push = struct.unpack('d', self.sub_protocol.received_data)[0]
+				self.results['time_started_push'] = time_started_push
+				#
 				if self.results_callback:
-					results = {}
-					results['custom_data'] = self.custom_data
-					results['data_size'] = self.sub_protocol.data_size
-					results['time_of_first_byte'] = self.sub_protocol.time_of_first_byte
-					results['time_of_last_byte'] = self.sub_protocol.time_of_last_byte
-					results['transfer_rate'] = self.sub_protocol.calc_transfer_rate()
-					results['deltas'] = self.sub_protocol.deltas
-					#
-					self.results_callback(results)
+					self.results_callback(self.results)
 				#
 				self.state = self.states.DONE
 			#