123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- #!/usr/bin/python3
- #
import argparse
import errno
import logging
import multiprocessing
import os
import queue
import threading

import basic_protocols
import throughput_protocols
import useful
- #
class ThroughputServer:
    """Accept throughput-test connections and serve each one in a child process.

    Sockets handed to ``_accept_callback`` by the listener are queued in
    ``accepted_pending``; ``run()`` then starts one ``multiprocessing.Process``
    per queued socket. Each child pushes ``{'conn_id': ..., 'results': ...}``
    onto ``results_queue``, and those dicts end up in ``self.results``.
    """

    def __init__(self, bind_endpoint, stop_event=None):
        """Create the listener and the result-collection machinery.

        bind_endpoint: endpoint passed straight to
            ``basic_protocols.ServerListener`` (the script entry point passes
            a ``(host, port)`` tuple).
        stop_event: optional event object. When given, a helper thread waits
            on it and stops the listener once it is set.
        """
        self.bind = bind_endpoint
        self.stop_event = stop_event
        #
        self.server_listener = basic_protocols.ServerListener(bind_endpoint, self._accept_callback)
        #
        # (socket, conn_id) pairs accepted but not yet handed to a process
        self.accepted_pending = []
        self.processes = []
        self.process_counter = 0
        #
        # presumably QueueGetter drains the queue in the background and calls
        # the given callback for each item -- confirm against useful.QueueGetter
        self.results_queue = multiprocessing.Queue()
        self.results = []
        self.results_getter = useful.QueueGetter(self.results_queue, self.results.append)
        #
        if self.stop_event is not None:
            self.event_thread = threading.Thread(target=self._wait_for_event,
                                                 args=(self.stop_event, self._stop))
            self.event_thread.start()
        else:
            self.event_thread = None
        #
    #
    def _accept_callback(self, socket):
        """Assign the next connection id to ``socket`` and queue it for ``run()``."""
        conn_id = self.process_counter
        self.process_counter += 1
        logging.info('Server protocol id: {} accepted'.format(conn_id))
        #
        self.accepted_pending.append((socket, conn_id))
    #
    def _start_new_process(self, socket, conn_id):
        """Start a child process that serves ``socket``, then close our copy."""
        p = multiprocessing.Process(target=self._start_server_conn, args=(socket, conn_id))
        self.processes.append(p)
        p.start()
        #
        # close this process' copy of the socket
        socket.close()
    #
    def _start_server_conn(self, socket, conn_id):
        """Run the server protocol on ``socket``; used as the child process target.

        Results reported by the protocol are put on ``results_queue`` tagged
        with ``conn_id``. Errors are logged rather than propagated, and the
        socket is always closed on the way out.
        """
        def results_callback(results):
            self.results_queue.put({'conn_id':conn_id, 'results':results})
        #
        protocol = throughput_protocols.ServerProtocol(socket, results_callback=results_callback,
                                                       use_acceleration=True)
        try:
            logging.info('Server protocol id: {} starting'.format(conn_id))
            protocol.run()
        except KeyboardInterrupt:
            logging.info('Server protocol id: {} stopped (KeyboardInterrupt)'.format(conn_id))
        except Exception:
            # deliberately not a bare except: SystemExit and GeneratorExit
            # must not be reported (and swallowed) as protocol errors
            logging.exception('Server protocol id: {} had an error'.format(conn_id))
        finally:
            logging.info('Server protocol id: {} done'.format(conn_id))
            socket.close()
        #
    #
    def _wait_for_event(self, event, callback):
        """Block until ``event`` is set, then invoke ``callback`` once."""
        event.wait()
        callback()
    #
    def _stop(self):
        """Stop the listener (called by the event thread once stop_event is set)."""
        self.server_listener.stop()
    #
    def run(self):
        """Accept connections and start worker processes until interrupted.

        Loops forever; the loop only ends through an exception. On the way out
        the stop event (if any) is set, the event thread and all worker
        processes are joined, and the results getter is stopped and joined.

        Raises:
            OSError: any OSError from the loop, except EINVAL raised after
                ``stop_event`` was set (treated as an intentional shutdown).
        """
        try:
            while True:
                # NOTE(review): the original computed a `block` flag (True when
                # nothing was pending) but never passed it; accept() is always
                # called with False. Confirm whether accept(block) was intended.
                accepted_something = self.server_listener.accept(False)
                if not accepted_something and self.accepted_pending:
                    # didn't accept anything and we have things to do
                    socket, conn_id = self.accepted_pending.pop(0)
                    self._start_new_process(socket, conn_id)
                #
            #
        except OSError as e:
            closed_on_purpose = (e.errno == errno.EINVAL
                                 and self.stop_event is not None
                                 and self.stop_event.is_set())
            if not closed_on_purpose:
                raise
            # otherwise: we closed the socket on purpose
        finally:
            if self.stop_event is not None:
                # set the event to stop the thread
                self.stop_event.set()
            #
            if self.event_thread is not None:
                # make sure the event thread is stopped
                self.event_thread.join()
            #
            for p in self.processes:
                # wait for all processes to finish
                p.join()
            #
            # finish reading from the results queue
            self.results_getter.stop()
            self.results_getter.join()
        #
    #
- #
- #
def summarize_groups(results):
    """Aggregate per-connection results into one summary per group.

    results: iterable of dicts shaped like ``{'conn_id': ..., 'results': r}``
        where ``r`` has the keys 'custom_data' (hashable group tag),
        'data_size', 'transfer_rate', 'time_of_first_byte' and
        'time_of_last_byte'.

    Returns a list of dicts, one per distinct 'custom_data' value in
    first-seen order, with the keys 'group_id', 'size', 'avg_data_size',
    'avg_transfer_rate' and 'total_transfer_rate'. The total rate is the
    group's summed data size divided by the span from its earliest first byte
    to its latest last byte; it is NaN when that span is not positive, rather
    than raising ZeroDivisionError.
    """
    # single pass; dict insertion order keeps the output order deterministic
    grouped = {}
    for entry in results:
        conn_results = entry['results']
        grouped.setdefault(conn_results['custom_data'], []).append(conn_results)
    #
    summaries = []
    for group_id, group in grouped.items():
        total_data_size = sum(x['data_size'] for x in group)
        time_of_first_byte = min(x['time_of_first_byte'] for x in group)
        time_of_last_byte = max(x['time_of_last_byte'] for x in group)
        elapsed = time_of_last_byte - time_of_first_byte
        #
        summaries.append({
            'group_id': group_id,
            'size': len(group),
            'avg_data_size': total_data_size/len(group),
            'avg_transfer_rate': sum(x['transfer_rate'] for x in group)/len(group),
            'total_transfer_rate': total_data_size/elapsed if elapsed > 0 else float('nan'),
        })
    #
    return summaries
#
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
    parser.add_argument('port', type=int, help='listen on port')
    parser.add_argument('--localhost', action='store_true', help='bind to 127.0.0.1 instead of 0.0.0.0')
    args = parser.parse_args()
    #
    bind_to = ('127.0.0.1' if args.localhost else '0.0.0.0', args.port)
    #
    # No stop event is passed: the server runs in this thread until it is
    # interrupted with Ctrl-C, after which the collected results are reported.
    server = ThroughputServer(bind_to, None)
    try:
        server.run()
    except KeyboardInterrupt:
        print('')
        logging.debug('Server stopped (KeyboardInterrupt).')
    #
    for summary in summarize_groups(server.results):
        group_id = summary['group_id']
        # an empty tag means the client sent no group id
        logging.info('Group id: %s', int.from_bytes(group_id, byteorder='big') if len(group_id)!=0 else None)
        logging.info(' Group size: %d', summary['size'])
        logging.info(' Avg Transferred (MiB): %.4f', summary['avg_data_size']/(1024**2))
        logging.info(' Avg Transfer rate (MiB/s): %.4f', summary['avg_transfer_rate']/(1024**2))
        logging.info(' Total Transfer rate (MiB/s): %.4f', summary['total_transfer_rate']/(1024**2))
    #
- #
- #
|