Browse Source

Added total throughput calculation when using multiple clients.

Steven Engler 5 years ago
parent
commit
b57dd27683
4 changed files with 23 additions and 18 deletions
  1. 7 5
      src/accelerated_functions.c
  2. 11 10
      src/basic_protocols.py
  3. 1 1
      src/throughput_protocols.py
  4. 4 2
      src/throughput_server.py

+ 7 - 5
src/accelerated_functions.c

@@ -76,7 +76,7 @@ int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
 	return 0;
 }
 //
-int pull_data(int socket, long bytes_total, int buffer_len, double* time_ptr){
+int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr, double* time_last_ptr){
 	long bytes_read = 0;
 	char* buffer = malloc(buffer_len);
 	struct timeval time_of_first_byte, time_of_last_byte;
@@ -131,7 +131,8 @@ int pull_data(int socket, long bytes_total, int buffer_len, double* time_ptr){
 	}
 	//
 	gettimeofday(&time_of_last_byte, NULL);
-	*time_ptr = (time_of_last_byte.tv_sec-time_of_first_byte.tv_sec) + (time_of_last_byte.tv_usec-time_of_first_byte.tv_usec)/(1000.0*1000.0);
+	*time_first_ptr = time_of_first_byte.tv_sec + time_of_first_byte.tv_usec/(1000.0*1000.0);
+	*time_last_ptr = time_of_last_byte.tv_sec + time_of_last_byte.tv_usec/(1000.0*1000.0);
 	//
 	free(buffer);
 	return 0;
@@ -170,15 +171,16 @@ static PyObject *py_pull_data(PyObject *self, PyObject *args){
 		return NULL;
 	}
 	//
-	double elapsed_time = 0;
+	double time_of_first_byte = 0;
+	double time_of_last_byte = 0;
 	int ret_val;
 	Py_BEGIN_ALLOW_THREADS
 	// GIL is unlocked, but don't do expensive operations in
 	// other threads or it might slow this one down
-	ret_val = pull_data(socket, bytes_total, buffer_len, &elapsed_time);
+	ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte);
 	Py_END_ALLOW_THREADS
 	//
-	PyObject* py_ret_val = Py_BuildValue("(id)", ret_val, elapsed_time);
+	PyObject* py_ret_val = Py_BuildValue("(idd)", ret_val, time_of_first_byte, time_of_last_byte);
 	//
 	return py_ret_val;
 }

+ 11 - 10
src/basic_protocols.py

@@ -258,8 +258,8 @@ class PullDataProtocol(Protocol):
 		self.recv_buffer_len = None
 		self.bytes_read = 0
 		self.protocol_helper = None
-		self._time_of_first_byte = None
-		self.elapsed_time = None
+		self.time_of_first_byte = None
+		self.time_of_last_byte = None
 	#
 	def _run_iteration(self):
 		if self.state is self.states.READY_TO_BEGIN:
@@ -277,12 +277,13 @@ class PullDataProtocol(Protocol):
 		#
 		if self.state is self.states.PULL_DATA:
 			if self.use_acceleration:
-				(ret_val, elapsed_time) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
+				(ret_val, time_of_first_byte, time_of_last_byte) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
 				if ret_val < 0:
 					raise ProtocolException('Error while pulling data.')
 				#
 				self.bytes_read = self.data_size
-				self.elapsed_time = elapsed_time
+				self.time_of_first_byte = time_of_first_byte
+				self.time_of_last_byte = time_of_last_byte
 			else:
 				bytes_remaining = self.data_size-self.bytes_read
 				block_size = min(self.recv_buffer_len, bytes_remaining)
@@ -294,11 +295,11 @@ class PullDataProtocol(Protocol):
 				#
 				self.bytes_read += len(data)
 				#
-				if self.bytes_read != 0 and self._time_of_first_byte is None:
-					self._time_of_first_byte = time.time()
+				if self.bytes_read != 0 and self.time_of_first_byte is None:
+					self.time_of_first_byte = time.time()
 				#
-				if self.bytes_read == self.data_size and self.elapsed_time is None:
-					self.elapsed_time = time.time()-self._time_of_first_byte
+				if self.bytes_read == self.data_size and self.time_of_last_byte is None:
+					self.time_of_last_byte = time.time()
 				#
 			#
 			if self.bytes_read == self.data_size:
@@ -321,8 +322,8 @@ class PullDataProtocol(Protocol):
 	#
 	def calc_transfer_rate(self):
 		""" Returns bytes/s. """
-		assert self.data_size is not None and self.elapsed_time is not None
-		return self.data_size/self.elapsed_time
+		assert self.data_size is not None and self.time_of_first_byte is not None and self.time_of_last_byte is not None
+		return self.data_size/(self.time_of_last_byte-self.time_of_first_byte)
 	#
 #
 class SendDataProtocol(Protocol):

+ 1 - 1
src/throughput_protocols.py

@@ -87,7 +87,7 @@ class ServerProtocol(basic_protocols.Protocol):
 		if self.state is self.states.PULL_DATA:
 			if self.sub_protocol.run():
 				if self.bandwidth_callback:
-					self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.calc_transfer_rate())
+					self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate())
 				#
 				self.state = self.states.DONE
 			#

+ 4 - 2
src/throughput_server.py

@@ -30,9 +30,9 @@ if __name__ == '__main__':
 		#logging.debug('For conn %d Received group id: %d', conn_id, group_id)
 		group_queue.put({'conn_id':conn_id, 'group_id':group_id})
 	#
-	def bw_callback(conn_id, data_size, transfer_rate):
+	def bw_callback(conn_id, data_size, time_first_byte, time_last_byte, transfer_rate):
 		# put them in a queue to display later
-		bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'transfer_rate':transfer_rate})
+		bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate})
 	#
 	def start_server_conn(socket, conn_id):
 		server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback,
@@ -101,10 +101,12 @@ if __name__ == '__main__':
 			if len(in_group) > 0:
 				avg_data_size = sum([x['data_size'] for x in in_group])/len(in_group)
 				avg_transfer_rate = sum([x['transfer_rate'] for x in in_group])/len(in_group)
+				total_transfer_rate = sum([x['data_size'] for x in in_group])/(max([x['time_of_last_byte'] for x in in_group])-min([x['time_of_first_byte'] for x in in_group]))
 				#
 				logging.info('Group size: %d', len(in_group))
 				logging.info('Avg Transferred (MB): %.4f', avg_data_size/(1024**2))
 				logging.info('Avg Transfer rate (MB/s): %.4f', avg_transfer_rate/(1024**2))
+				logging.info('Total Transfer rate (MB/s): %.4f', total_transfer_rate/(1024**2))
 			#
 		#
 	#