throughput_server.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. #!/usr/bin/python3
  2. #
  3. import throughput_protocols
  4. import basic_protocols
  5. import os
  6. import multiprocessing
  7. import threading
  8. import queue
  9. import logging
  10. import argparse
  11. import useful
  12. #
  13. class ThroughputServer:
  14. def __init__(self, bind_endpoint, stop_event=None):
  15. self.bind = bind_endpoint
  16. self.stop_event = stop_event
  17. #
  18. self.server_listener = basic_protocols.ServerListener(bind_endpoint, self._accept_callback)
  19. #
  20. self.accepted_pending = []
  21. self.processes = []
  22. self.process_counter = 0
  23. #
  24. self.results_queue = multiprocessing.Queue()
  25. self.results = []
  26. self.results_getter = useful.QueueGetter(self.results_queue, self.results.append)
  27. #
  28. if self.stop_event is not None:
  29. self.event_thread = threading.Thread(target=self._wait_for_event, args=(self.stop_event, self._stop))
  30. self.event_thread.start()
  31. else:
  32. self.event_thread = None
  33. #
  34. #
  35. def _accept_callback(self, socket):
  36. conn_id = self.process_counter
  37. self.process_counter += 1
  38. logging.info('Server protocol id: {} accepted'.format(conn_id))
  39. #
  40. self.accepted_pending.append((socket, conn_id))
  41. #
  42. def _start_new_process(self, socket, conn_id):
  43. p = multiprocessing.Process(target=self._start_server_conn, args=(socket, conn_id))
  44. self.processes.append(p)
  45. p.start()
  46. #
  47. # close this process' copy of the socket
  48. socket.close()
  49. #
  50. def _start_server_conn(self, socket, conn_id):
  51. results_callback = lambda results: self.results_queue.put({'conn_id':conn_id, 'results':results})
  52. protocol = throughput_protocols.ServerProtocol(socket, results_callback=results_callback,
  53. use_acceleration=True)
  54. try:
  55. logging.info('Server protocol id: {} starting'.format(conn_id))
  56. protocol.run()
  57. except KeyboardInterrupt:
  58. logging.info('Server protocol id: {} stopped (KeyboardInterrupt)'.format(conn_id))
  59. except:
  60. logging.exception('Server protocol id: {} had an error'.format(conn_id))
  61. finally:
  62. logging.info('Server protocol id: {} done'.format(conn_id))
  63. socket.close()
  64. #
  65. #
  66. def _wait_for_event(self, event, callback):
  67. event.wait()
  68. callback()
  69. #
  70. def _stop(self):
  71. self.server_listener.stop()
  72. #
  73. def run(self):
  74. try:
  75. while True:
  76. block = True
  77. if len(self.accepted_pending) > 0:
  78. block = False
  79. #
  80. accepted_something = self.server_listener.accept(False)
  81. if not accepted_something and len(self.accepted_pending) > 0:
  82. # didn't accept anything and we have things to do
  83. (socket, conn_id) = self.accepted_pending.pop(0)
  84. self._start_new_process(socket, conn_id)
  85. #
  86. #
  87. except OSError as e:
  88. if e.errno == 22 and self.stop_event is not None and self.stop_event.is_set():
  89. # we closed the socket on purpose
  90. pass
  91. else:
  92. raise
  93. #
  94. finally:
  95. if self.stop_event is not None:
  96. # set the event to stop the thread
  97. self.stop_event.set()
  98. #
  99. if self.event_thread is not None:
  100. # make sure the event thread is stopped
  101. self.event_thread.join()
  102. #
  103. for p in self.processes:
  104. # wait for all processes to finish
  105. p.join()
  106. #
  107. # finish reading from the results queue
  108. self.results_getter.stop()
  109. self.results_getter.join()
  110. #
  111. #
  112. #
  113. if __name__ == '__main__':
  114. logging.basicConfig(level=logging.DEBUG)
  115. #
  116. parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
  117. parser.add_argument('port', type=int, help='listen on port')
  118. parser.add_argument('--localhost', action='store_true', help='bind to 127.0.0.1 instead of 0.0.0.0')
  119. args = parser.parse_args()
  120. #
  121. if args.localhost:
  122. bind_to = ('127.0.0.1', args.port)
  123. else:
  124. bind_to = ('0.0.0.0', args.port)
  125. #
  126. #stop_event = multiprocessing.Event()
  127. server = ThroughputServer(bind_to, None)
  128. try:
  129. server.run()
  130. except KeyboardInterrupt:
  131. print('')
  132. logging.debug('Server stopped (KeyboardInterrupt).')
  133. #
  134. #t = threading.Thread(target=server.run)
  135. #t.start()
  136. #try:
  137. # t.join()
  138. #except KeyboardInterrupt:
  139. # print('')
  140. # stop_event.set()
  141. #
  142. #p = multiprocessing.Process(target=server.run)
  143. #p.start()
  144. #try:
  145. # p.join()
  146. #except KeyboardInterrupt:
  147. # print('')
  148. # stop_event.set()
  149. #
  150. results = server.results
  151. #
  152. group_ids = list(set([x['results']['custom_data'] for x in results]))
  153. groups = [(g, [r['results'] for r in results if r['results']['custom_data'] == g]) for g in group_ids]
  154. #
  155. for (group_id, group) in groups:
  156. avg_data_size = sum([x['data_size'] for x in group])/len(group)
  157. avg_transfer_rate = sum([x['transfer_rate'] for x in group])/len(group)
  158. time_of_first_byte = min([x['time_of_first_byte'] for x in group])
  159. time_of_last_byte = max([x['time_of_last_byte'] for x in group])
  160. total_transfer_rate = sum([x['data_size'] for x in group])/(time_of_last_byte-time_of_first_byte)
  161. #
  162. logging.info('Group id: %s', int.from_bytes(group_id, byteorder='big') if len(group_id)!=0 else None)
  163. logging.info(' Group size: %d', len(group))
  164. logging.info(' Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
  165. logging.info(' Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
  166. logging.info(' Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
  167. #
  168. #