throughput_server.new.py

#!/usr/bin/python3
#
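# Throughput test server: accepts client connections, handles each connection in
# its own process, and collects the per-connection throughput results reported
# back by the worker processes.
#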
import throughput_protocols
import basic_protocols
import os
import errno
import multiprocessing
import threading
import queue
import logging
import argparse
#
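# QueueGetter drains a multiprocessing.Queue on a background thread, appending each
# item to a plain list so results can be read without blocking the accept loop.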
class QueueGetter:
    def __init__(self, queue, unqueued):
        self.queue = queue
        self.unqueued = unqueued
        #
        self.t = threading.Thread(target=self._unqueue)
        self.t.start()
    #
    def _unqueue(self):
        while True:
            val = self.queue.get()
            if val is None:
                break
            #
            self.unqueued.append(val)
        #
    #
    def stop(self):
        self.queue.put(None)
    #
    def join(self):
        self.t.join()
    #
#
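# ThroughputServer accepts connections on bind_endpoint and hands each one to a
# separate worker process; per-connection results come back over a shared
# multiprocessing.Queue. An optional stop_event can be used to shut down the listener.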
class ThroughputServer:
    def __init__(self, bind_endpoint, stop_event=None):
        self.bind = bind_endpoint
        self.stop_event = stop_event
        #
        self.server_listener = basic_protocols.ServerListener(bind_endpoint, self._accept_callback)
        #
        self.processes = []
        self.process_counter = 0
        #
        self.results_queue = multiprocessing.Queue()
        self.results = []
        self.results_getter = QueueGetter(self.results_queue, self.results)
        #
        if self.stop_event is not None:
            self.event_thread = threading.Thread(target=self._wait_for_event, args=(self.stop_event, self._stop))
            self.event_thread.start()
        else:
            self.event_thread = None
        #
    #
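    # Called in the listening process for each accepted connection: start a worker
    # process for the socket, then drop this process' reference to it.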
    def _accept_callback(self, socket):
        conn_id = self.process_counter
        self.process_counter += 1
        #
        p = multiprocessing.Process(target=self._start_server_conn, args=(socket, conn_id))
        self.processes.append(p)
        p.start()
        #
        # close this process' copy of the socket
        socket.close()
    #
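    # Entry point of the worker process: runs the server side of the throughput
    # protocol and forwards any results to the parent through the results queue.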
    def _start_server_conn(self, socket, conn_id):
        results_callback = lambda results: self.results_queue.put({'conn_id': conn_id, 'results': results})
        protocol = throughput_protocols.ServerProtocol(socket, results_callback=results_callback,
                                                       use_acceleration=True)
        try:
            protocol.run()
        finally:
            socket.close()
        #
    #
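    # Used by the optional stop_event thread: wait for the event, then stop the
    # listener so the accept loop in run() exits.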
    def _wait_for_event(self, event, callback):
        event.wait()
        callback()
    #
    def _stop(self):
        self.server_listener.stop()
    #
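    # Accept connections until the listener is stopped or interrupted; afterwards
    # wait for all worker processes and finish draining the results queue.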
    def run(self):
        try:
            while True:
                self.server_listener.accept()
            #
        except OSError as e:
            if e.errno == errno.EINVAL and self.stop_event is not None and self.stop_event.is_set():
                # we closed the socket on purpose
                pass
            else:
                raise
            #
        finally:
            if self.stop_event is not None:
                # set the event to stop the thread
                self.stop_event.set()
            #
            if self.event_thread is not None:
                # make sure the event thread is stopped
                self.event_thread.join()
            #
            for p in self.processes:
                # wait for all processes to finish
                p.join()
            #
            # finish reading from the results queue
            self.results_getter.stop()
            self.results_getter.join()
        #
    #
#
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
    parser.add_argument('port', type=int, help='listen on port')
    parser.add_argument('--localhost', action='store_true', help='bind to 127.0.0.1 instead of 0.0.0.0')
    args = parser.parse_args()
    #
    if args.localhost:
        bind_to = ('127.0.0.1', args.port)
    else:
        bind_to = ('0.0.0.0', args.port)
    #
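    # stop_event is only used by the commented-out thread/process variants below;
    # in the default path the server is stopped with Ctrl-C (KeyboardInterrupt).
    # Example invocation (port chosen arbitrarily): python3 throughput_server.new.py 12345 --localhost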
    stop_event = multiprocessing.Event()
    server = ThroughputServer(bind_to, None)
    try:
        server.run()
    except KeyboardInterrupt:
        print('')
        logging.debug('Server stopped (KeyboardInterrupt).')
    #
    #t = threading.Thread(target=server.run)
    #t.start()
    #try:
    #    t.join()
    #except KeyboardInterrupt:
    #    print('')
    #    stop_event.set()
    #
    #p = multiprocessing.Process(target=server.run)
    #p.start()
    #try:
    #    p.join()
    #except KeyboardInterrupt:
    #    print('')
    #    stop_event.set()
    #
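    # Group results by the protocol's custom_data value and log per-group averages,
    # plus an overall rate computed from the earliest first-byte and latest last-byte times.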
    results = server.results
    #
    group_ids = list(set([x['results']['custom_data'] for x in results]))
    groups = [(g, [r['results'] for r in results if r['results']['custom_data'] == g]) for g in group_ids]
    #
    for (group_id, group) in groups:
        avg_data_size = sum([x['data_size'] for x in group])/len(group)
        avg_transfer_rate = sum([x['transfer_rate'] for x in group])/len(group)
        time_of_first_byte = min([x['time_of_first_byte'] for x in group])
        time_of_last_byte = max([x['time_of_last_byte'] for x in group])
        total_transfer_rate = sum([x['data_size'] for x in group])/(time_of_last_byte-time_of_first_byte)
        #
        logging.info('Group id: %s', int.from_bytes(group_id, byteorder='big') if len(group_id) != 0 else None)
        logging.info('  Group size: %d', len(group))
        logging.info('  Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
        logging.info('  Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
        logging.info('  Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
    #
#