throughput_server.old.py 9.0 KB

#!/usr/bin/python3
#
import throughput_protocols
import basic_protocols
import os
import multiprocessing
import threading
import queue
import logging
import argparse
#
def overlap_byte_counters(byte_counters):
    start_time = None
    finish_time = None
    for x in byte_counters:
        if start_time is None or x['start_time'] < start_time:
            start_time = x['start_time']
        #
        if finish_time is None or x['start_time']+len(x['history']) > finish_time:
            finish_time = x['start_time']+len(x['history'])
        #
    #
    total_history = [0]*(finish_time-start_time)
    #
    for x in byte_counters:
        for y in range(len(x['history'])):
            total_history[(x['start_time']-start_time)+y] += x['history'][y]
        #
    #
    return total_history
#
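# Worked example with hypothetical values: given the two byte counters
#   {'start_time': 10, 'history': [5, 5]} and {'start_time': 11, 'history': [3]},
# the combined span is [10, 12), so overlap_byte_counters() returns [5, 8]:
# the second counter's bytes are added at offset 11 - 10 = 1.
#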
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
    parser.add_argument('port', type=int, help='listen on port')
    parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
    parser.add_argument('--localhost', action='store_true', help='bind to 127.0.0.1 instead of 0.0.0.0')
    args = parser.parse_args()
    #
    if args.localhost:
        endpoint = ('127.0.0.1', args.port)
    else:
        endpoint = ('0.0.0.0', args.port)
    #
    processes = []
    processes_map = {}
    joinable_connections = multiprocessing.Queue()
    joinable_connections_list = []
    conn_counter = [0]  # single-element list so accept_callback below can mutate it
    group_queue = multiprocessing.Queue()
    group_queue_list = []
    bw_queue = multiprocessing.Queue()
    bw_queue_list = []
    #
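    # Architecture note: each accepted connection is handled in its own
    # multiprocessing.Process (see accept_callback below); the per-connection
    # callbacks run in those child processes and report back to this listening
    # process through the three queues above -- joinable_connections receives
    # the ids of finished connections, group_queue receives conn_id/group_id
    # records, and bw_queue receives bandwidth measurements.
    #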
    def group_id_callback(conn_id, group_id):
        # put them in a queue to display later
        #logging.debug('For conn %d Received group id: %d', conn_id, group_id)
        group_queue.put({'conn_id':conn_id, 'group_id':group_id})
    #
    #def bw_callback(conn_id, data_size, time_first_byte, time_last_byte, transfer_rate, byte_counter, byte_counter_start_time):
    def bw_callback(conn_id, custom_data, data_size, time_first_byte, time_last_byte, transfer_rate, deltas):
        # put them in a queue to display later
        #bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate, 'byte_counter':byte_counter, 'byte_counter_start_time':byte_counter_start_time})
        bw_queue.put({'conn_id':conn_id, 'custom_data':custom_data, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate, 'deltas':deltas})
    #
    def start_server_conn(socket, conn_id):
        server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback,
                                                     bandwidth_callback=bw_callback, use_acceleration=(not args.no_accel))
        try:
            server.run()
        except KeyboardInterrupt:
            socket.close()
        finally:
            joinable_connections.put(conn_id)
            '''
            while True:
                # while we're waiting to join, we might get a KeyboardInterrupt,
                # in which case we cannot let the process end since it will kill
                # the queue threads, which may be waiting to push data to the pipe
                try:
                    joinable_connections.close()
                    group_queue.close()
                    bw_queue.close()
                    #
                    group_queue.join_thread()
                    bw_queue.join_thread()
                    joinable_connections.join_thread()
                    #
                    break
                except KeyboardInterrupt:
                    pass
                #
            #
            '''
        #
    #
    def accept_callback(socket):
        conn_id = conn_counter[0]
        conn_counter[0] += 1
        #logging.debug('Adding connection %d', conn_id)
        p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
        processes.append(p)
        processes_map[conn_id] = p
        p.start()
        socket.close()  # close this process' copy of the socket
    #
    def unqueue(q, l, print_len=False):
        while True:
            val = q.get()
            if val is None:
                break
            #
            l.append(val)
            if print_len:
                print('Queue length: {}'.format(len(l)), end='\r')
            #
        #
    #
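    # unqueue() runs on a thread in the listening process and drains one of the
    # multiprocessing queues into an ordinary list until it sees the None
    # sentinel pushed during shutdown below; a hypothetical standalone use:
    #   q, out = multiprocessing.Queue(), []
    #   t = threading.Thread(target=unqueue, args=(q, out))
    #   t.start(); q.put(123); q.put(None); t.join()   # out == [123]
    #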
    l = basic_protocols.ServerListener(endpoint, accept_callback)
    #
    t_joinable_connections = threading.Thread(target=unqueue, args=(joinable_connections, joinable_connections_list))
    t_group_queue = threading.Thread(target=unqueue, args=(group_queue, group_queue_list))
    t_bw_queue = threading.Thread(target=unqueue, args=(bw_queue, bw_queue_list, True))
    #
    t_joinable_connections.start()
    t_group_queue.start()
    t_bw_queue.start()
    #
    try:
        while True:
            l.accept()
            '''
            try:
                while True:
                    conn_id = joinable_connections.get(False)
                    p = processes_map[conn_id]
                    p.join()
                #
            except queue.Empty:
                pass
            #
            '''
        #
    except KeyboardInterrupt:
        print()
    #
    try:
        for p in processes:
            p.join()
        #
    except KeyboardInterrupt:
        pass
    #
    joinable_connections.put(None)
    group_queue.put(None)
    bw_queue.put(None)
    t_joinable_connections.join()
    t_group_queue.join()
    t_bw_queue.join()
    #
    bw_values = {}
    group_values = {}
    #
    '''
    logging.info('BW queue length: {}'.format(bw_queue.qsize()))
    logging.info('Group queue length: {}'.format(group_queue.qsize()))
    #
    temp_counter = 0
    try:
        while True:
            bw_val = bw_queue.get(False)
            bw_values[bw_val['conn_id']] = bw_val
            temp_counter += 1
        #
    except queue.Empty:
        pass
    #
    logging.info('temp counter: {}'.format(temp_counter))
    import time
    time.sleep(2)
    try:
        while True:
            bw_val = bw_queue.get(False)
            bw_values[bw_val['conn_id']] = bw_val
            temp_counter += 1
        #
    except queue.Empty:
        pass
    #
    logging.info('temp counter: {}'.format(temp_counter))
    #
    try:
        while True:
            group_val = group_queue.get(False)
            group_values[group_val['conn_id']] = group_val
        #
    except queue.Empty:
        pass
    #
    logging.info('bw_values length: {}'.format(len(bw_values)))
    logging.info('group_values length: {}'.format(len(group_values)))
    logging.info('group_values set: {}'.format(list(set([x['group_id'] for x in group_values.values()]))))
    #
    '''
    #
    #logging.info('BW list length: {}'.format(len(bw_queue_list)))
    #logging.info('Group list length: {}'.format(len(group_queue_list)))
    #
    for x in bw_queue_list:
        bw_values[x['conn_id']] = x
    #
    for x in group_queue_list:
        group_values[x['conn_id']] = x
    #
    group_set = set([x['group_id'] for x in group_values.values()])
    for group in group_set:
        # doesn't handle group == None
        conns_in_group = [x[0] for x in group_values.items() if x[1]['group_id'] == group]
        in_group = [x for x in bw_values.values() if x['conn_id'] in conns_in_group]
        if len(in_group) > 0:
            avg_data_size = sum([x['data_size'] for x in in_group])/len(in_group)
            avg_transfer_rate = sum([x['transfer_rate'] for x in in_group])/len(in_group)
            total_transfer_rate = sum([x['data_size'] for x in in_group])/(max([x['time_of_last_byte'] for x in in_group])-min([x['time_of_first_byte'] for x in in_group]))
            #
            logging.info('Group size: %d', len(in_group))
            logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
            logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
            logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
            #
            '''
            import math
            histories = [{'start_time':x['byte_counter_start_time'], 'history':x['byte_counter']} for x in in_group]
            total_history = overlap_byte_counters(histories)
            #
            logging.info('Max Transfer rate (MiB/s): %.4f', max(total_history)/(1024**2))
            if sum(total_history) != sum([x['data_size'] for x in in_group]):
                logging.warning('History doesn\'t add up ({} != {}).'.format(sum(total_history), sum([x['data_size'] for x in in_group])))
            #
            import json
            with open('/tmp/group-{}.json'.format(group), 'w') as f:
                json.dump({'id':group, 'history':total_history, 'individual_histories':histories, 'size':len(in_group), 'avg_transferred':avg_data_size,
                           'avg_transfer_rate':avg_transfer_rate, 'total_transfer_rate':total_transfer_rate}, f)
            #
            '''
            custom_data = [x['custom_data'].decode('utf-8') for x in in_group]
            #
            histories = [x['deltas'] for x in in_group]
            combined_timestamps, combined_bytes = zip(*sorted(zip([x for y in histories for x in y['timestamps']],
                                                                  [x for y in histories for x in y['bytes']])))
            combined_history = {'bytes':combined_bytes, 'timestamps':combined_timestamps}
            #combined_history = sorted([item for sublist in histories for item in sublist['deltas']], key=lambda x: x['timestamp'])
            #
            sum_history_bytes = sum(combined_history['bytes'])
            sum_data_bytes = sum([x['data_size'] for x in in_group])
            if sum_history_bytes != sum_data_bytes:
                logging.warning('History doesn\'t add up ({} != {}).'.format(sum_history_bytes, sum_data_bytes))
            #
            import json
            import gzip
            with gzip.GzipFile('/tmp/group-{}.json.gz'.format(group), 'w') as f:
                f.write(json.dumps({'id':group, 'history':combined_history, 'individual_histories':histories, 'size':len(in_group),
                                    'avg_transferred':avg_data_size, 'avg_transfer_rate':avg_transfer_rate,
                                    'total_transfer_rate':total_transfer_rate, 'custom_data':custom_data}).encode('utf-8'))
            #
        #
    #
    #
    for p in processes:
        p.join()
    #
#
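
A minimal sketch of how this server might be run, assuming the companion throughput_protocols and basic_protocols modules from the same project are on the Python path and a matching client connects to it; the port number below is arbitrary:

    python3 throughput_server.old.py 45678 --localhost --no-accel

This binds the listener to 127.0.0.1:45678 and disables the C acceleration; per-group results are written to /tmp/group-<id>.json.gz once the accept loop is interrupted with Ctrl-C and all connection processes have exited.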