basic_protocols.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. #!/usr/bin/python3
  2. #
  3. import socket
  4. import struct
  5. import logging
  6. import time
  7. import enum
  8. import select
  9. import os
  10. #
  11. import accelerated_functions
  12. #
  13. class ProtocolException(Exception):
  14. pass
  15. #
  16. class ProtocolHelper():
  17. def __init__(self):
  18. self._buffer = b''
  19. #
  20. def set_buffer(self, data):
  21. """
  22. Set the buffer contents to the data that you wish to send.
  23. """
  24. #
  25. self._buffer = data
  26. #
  27. def get_buffer(self):
  28. return self._buffer
  29. #
  30. def recv(self, socket, num_bytes):
  31. """
  32. Try to fill up the buffer to a max of 'num_bytes'. If the buffer is filled,
  33. return True, otherwise return False.
  34. """
  35. #
  36. data = socket.recv(num_bytes-len(self._buffer))
  37. #
  38. if len(data) == 0:
  39. raise ProtocolException('The socket was closed.')
  40. #
  41. self._buffer += data
  42. if len(self._buffer) == num_bytes:
  43. return True
  44. #
  45. return False
  46. #
  47. def send(self, socket):
  48. """
  49. Try to send the remainder of the buffer. If the entire buffer has been sent,
  50. return True, otherwise return False.
  51. """
  52. #
  53. n = socket.send(self._buffer)
  54. self._buffer = self._buffer[n:]
  55. if len(self._buffer) == 0:
  56. return True
  57. #
  58. return False
  59. #
  60. #
  61. class Protocol():
  62. def _run_iteration(self):
  63. """
  64. This function should be overridden. It runs a single iteration of the protocol.
  65. """
  66. #
  67. pass
  68. #
  69. def run(self):
  70. while True:
  71. finished = self._run_iteration()
  72. #
  73. if finished:
  74. # protocol is done
  75. return True
  76. #
  77. #
  78. #
  79. def get_desc(self):
  80. """
  81. This function can be overridden.
  82. """
  83. #
  84. return None
  85. #
  86. #
  87. class FakeProxyProtocol(Protocol):
  88. def __init__(self, socket, addr_port):
  89. self.socket = socket
  90. self.addr_port = addr_port
  91. #
  92. self.states = enum.Enum('PROXY_STATES', 'READY_TO_BEGIN CONNECTING_TO_PROXY DONE')
  93. self.state = self.states.READY_TO_BEGIN
  94. #
  95. self.protocol_helper = None
  96. #
  97. def _run_iteration(self):
  98. if self.state is self.states.READY_TO_BEGIN:
  99. self.protocol_helper = ProtocolHelper()
  100. host, port = self.addr_port
  101. addr = socket.inet_aton(host)[::-1]
  102. self.protocol_helper.set_buffer(addr+struct.pack('!H', port))
  103. self.state = self.states.CONNECTING_TO_PROXY
  104. #
  105. if self.state is self.states.CONNECTING_TO_PROXY:
  106. if self.protocol_helper.send(self.socket):
  107. self.protocol_helper = ProtocolHelper()
  108. self.state = self.states.DONE
  109. #
  110. #
  111. if self.state is self.states.DONE:
  112. return True
  113. #
  114. return False
  115. #
  116. #
  117. class ChainedProtocol(Protocol):
  118. def __init__(self, protocols):
  119. self.protocols = protocols
  120. self.current_protocol = 0
  121. #
  122. self.states = enum.Enum('CHAIN_STATES', 'READY_TO_BEGIN RUNNING DONE')
  123. self.state = self.states.READY_TO_BEGIN
  124. #
  125. def _run_iteration(self):
  126. if self.state is self.states.READY_TO_BEGIN:
  127. self.state = self.states.RUNNING
  128. #
  129. if self.state is self.states.RUNNING:
  130. if self.protocols[self.current_protocol] is None or self.protocols[self.current_protocol].run():
  131. self.current_protocol += 1
  132. #
  133. if self.current_protocol >= len(self.protocols):
  134. self.state = self.states.DONE
  135. #
  136. #
  137. if self.state is self.states.DONE:
  138. return True
  139. #
  140. return False
  141. #
  142. #
  143. class Socks4Protocol(Protocol):
  144. def __init__(self, socket, addr_port, username=None):
  145. self.socket = socket
  146. self.addr_port = addr_port
  147. self.username = username
  148. #
  149. self.states = enum.Enum('SOCKS_4_STATES', 'READY_TO_BEGIN CONNECTING_TO_PROXY WAITING_FOR_PROXY DONE')
  150. self.state = self.states.READY_TO_BEGIN
  151. #
  152. self.protocol_helper = None
  153. #
  154. def _run_iteration(self):
  155. if self.state is self.states.READY_TO_BEGIN:
  156. self.protocol_helper = ProtocolHelper()
  157. self.protocol_helper.set_buffer(self.socks_cmd(self.addr_port, self.username))
  158. self.state = self.states.CONNECTING_TO_PROXY
  159. #
  160. if self.state is self.states.CONNECTING_TO_PROXY:
  161. if self.protocol_helper.send(self.socket):
  162. self.protocol_helper = ProtocolHelper()
  163. self.state = self.states.WAITING_FOR_PROXY
  164. #logging.debug('Waiting for reply from proxy')
  165. #
  166. #
  167. if self.state is self.states.WAITING_FOR_PROXY:
  168. response_size = 8
  169. if self.protocol_helper.recv(self.socket, response_size):
  170. response = self.protocol_helper.get_buffer()
  171. if response[1] != 0x5a:
  172. raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
  173. #
  174. self.state = self.states.DONE
  175. #
  176. #
  177. if self.state is self.states.DONE:
  178. return True
  179. #
  180. return False
  181. #
  182. def socks_cmd(self, addr_port, username=None):
  183. socks_version = 4
  184. command = 1
  185. dnsname = b''
  186. host, port = addr_port
  187. #
  188. try:
  189. username = bytes(username, 'utf8')
  190. except TypeError:
  191. pass
  192. #
  193. if username is None:
  194. username = b''
  195. elif b'\x00' in username:
  196. raise ProtocolException('Username cannot contain a NUL character.')
  197. #
  198. username = username+b'\x00'
  199. #
  200. try:
  201. addr = socket.inet_aton(host)
  202. except socket.error:
  203. addr = b'\x00\x00\x00\x01'
  204. dnsname = bytes(host, 'utf8')+b'\x00'
  205. #
  206. return struct.pack('!BBH', socks_version, command, port) + addr + username + dnsname
  207. #
  208. #
  209. class PushDataProtocol(Protocol):
  210. def __init__(self, socket, total_bytes, send_buffer_len=None, use_acceleration=None, push_start_cb=None, push_done_cb=None):
  211. if send_buffer_len is None:
  212. send_buffer_len = 1024*512
  213. #
  214. if use_acceleration is None:
  215. use_acceleration = True
  216. #
  217. self.socket = socket
  218. self.total_bytes = total_bytes
  219. self.use_acceleration = use_acceleration
  220. self.push_start_cb = push_start_cb
  221. self.push_done_cb = push_done_cb
  222. #
  223. self.states = enum.Enum('PUSH_DATA_STATES', 'READY_TO_BEGIN SEND_INFO START_CALLBACK PUSH_DATA RECV_CONFIRMATION DONE_CALLBACK DONE')
  224. self.state = self.states.READY_TO_BEGIN
  225. #
  226. self.byte_buffer = os.urandom(send_buffer_len)
  227. self.bytes_written = 0
  228. self.time_started_push = None
  229. self.protocol_helper = None
  230. #
  231. def _run_iteration(self):
  232. if self.state is self.states.READY_TO_BEGIN:
  233. info = self.total_bytes.to_bytes(8, byteorder='big', signed=False)
  234. info += len(self.byte_buffer).to_bytes(8, byteorder='big', signed=False)
  235. self.protocol_helper = ProtocolHelper()
  236. self.protocol_helper.set_buffer(info)
  237. self.state = self.states.SEND_INFO
  238. #
  239. if self.state is self.states.SEND_INFO:
  240. if self.protocol_helper.send(self.socket):
  241. self.state = self.states.START_CALLBACK
  242. #
  243. #
  244. if self.state is self.states.START_CALLBACK:
  245. if self.push_start_cb is not None:
  246. self.push_start_cb()
  247. #
  248. self.state = self.states.PUSH_DATA
  249. self.time_started_push = time.time()
  250. #
  251. if self.state is self.states.PUSH_DATA:
  252. if self.use_acceleration:
  253. ret_val = accelerated_functions.push_data(self.socket.fileno(), self.total_bytes, self.byte_buffer)
  254. if ret_val < 0:
  255. raise ProtocolException('Error while pushing data.')
  256. #
  257. self.bytes_written = self.total_bytes
  258. else:
  259. bytes_remaining = self.total_bytes-self.bytes_written
  260. data_size = min(len(self.byte_buffer), bytes_remaining)
  261. if data_size != len(self.byte_buffer):
  262. data = self.byte_buffer[:data_size]
  263. else:
  264. data = self.byte_buffer
  265. # don't make a copy of the byte string each time if we don't need to
  266. #
  267. n = self.socket.send(data)
  268. self.bytes_written += n
  269. #
  270. if self.bytes_written == self.total_bytes:
  271. # finished sending the data
  272. logging.debug('Finished sending the data (%d bytes).', self.bytes_written)
  273. self.protocol_helper = ProtocolHelper()
  274. self.state = self.states.RECV_CONFIRMATION
  275. #
  276. #
  277. if self.state is self.states.RECV_CONFIRMATION:
  278. response_size = 8
  279. if self.protocol_helper.recv(self.socket, response_size):
  280. response = self.protocol_helper.get_buffer()
  281. if response != b'RECEIVED':
  282. raise ProtocolException('Did not receive the expected message: {}'.format(response))
  283. #
  284. self.state = self.states.DONE_CALLBACK
  285. #
  286. #
  287. if self.state is self.states.DONE_CALLBACK:
  288. if self.push_done_cb is not None:
  289. self.push_done_cb()
  290. #
  291. self.state = self.states.DONE
  292. #
  293. if self.state is self.states.DONE:
  294. return True
  295. #
  296. return False
  297. #
  298. #
  299. class PullDataProtocol(Protocol):
  300. def __init__(self, socket, use_acceleration=None):
  301. if use_acceleration is None:
  302. use_acceleration = True
  303. #
  304. self.socket = socket
  305. self.use_acceleration = use_acceleration
  306. #
  307. self.states = enum.Enum('PULL_DATA_STATES', 'READY_TO_BEGIN RECV_INFO PULL_DATA SEND_CONFIRMATION DONE')
  308. self.state = self.states.READY_TO_BEGIN
  309. #
  310. self.data_size = None
  311. self.recv_buffer_len = None
  312. self.bytes_read = 0
  313. self.protocol_helper = None
  314. self.time_of_first_byte = None
  315. self.time_of_last_byte = None
  316. #self.byte_counter = None
  317. #self.byte_counter_start_time = None
  318. self.deltas = None
  319. #
  320. def _run_iteration(self):
  321. if self.state is self.states.READY_TO_BEGIN:
  322. self.protocol_helper = ProtocolHelper()
  323. self.state = self.states.RECV_INFO
  324. #
  325. if self.state is self.states.RECV_INFO:
  326. info_size = 16
  327. if self.protocol_helper.recv(self.socket, info_size):
  328. response = self.protocol_helper.get_buffer()
  329. self.data_size = int.from_bytes(response[0:8], byteorder='big', signed=False)
  330. self.recv_buffer_len = int.from_bytes(response[8:16], byteorder='big', signed=False)
  331. assert(self.recv_buffer_len <= 10*1024*1024)
  332. # don't use a buffer size larget than 10 MiB to avoid using up all memory
  333. self.state = self.states.PULL_DATA
  334. #
  335. #
  336. if self.state is self.states.PULL_DATA:
  337. if self.use_acceleration:
  338. #(ret_val, time_of_first_byte, time_of_last_byte, byte_counter, byte_counter_start_time) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
  339. (ret_val, time_of_first_byte, time_of_last_byte, deltas) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, self.recv_buffer_len)
  340. if ret_val < 0:
  341. raise ProtocolException('Error while pulling data.')
  342. #
  343. #if sum(byte_counter) != self.data_size:
  344. if sum(deltas['bytes']) != self.data_size:
  345. logging.warning('Lost some history data ({} != {}).'.format(sum(deltas['bytes']), self.data_size))
  346. #
  347. self.bytes_read = self.data_size
  348. self.time_of_first_byte = time_of_first_byte
  349. self.time_of_last_byte = time_of_last_byte
  350. #self.byte_counter = byte_counter
  351. #self.byte_counter_start_time = byte_counter_start_time
  352. self.deltas = deltas
  353. else:
  354. bytes_remaining = self.data_size-self.bytes_read
  355. block_size = min(self.recv_buffer_len, bytes_remaining)
  356. #
  357. data = self.socket.recv(block_size)
  358. #
  359. if len(data) == 0:
  360. raise ProtocolException('The socket was closed.')
  361. #
  362. self.bytes_read += len(data)
  363. #
  364. if self.bytes_read != 0 and self.time_of_first_byte is None:
  365. self.time_of_first_byte = time.time()
  366. #
  367. if self.bytes_read == self.data_size and self.time_of_last_byte is None:
  368. self.time_of_last_byte = time.time()
  369. #
  370. #
  371. if self.bytes_read == self.data_size:
  372. # finished receiving the data
  373. logging.debug('Finished receiving the data.')
  374. self.protocol_helper = ProtocolHelper()
  375. self.protocol_helper.set_buffer(b'RECEIVED')
  376. self.state = self.states.SEND_CONFIRMATION
  377. #
  378. #
  379. if self.state is self.states.SEND_CONFIRMATION:
  380. if self.protocol_helper.send(self.socket):
  381. self.state = self.states.DONE
  382. #
  383. #
  384. if self.state is self.states.DONE:
  385. return True
  386. #
  387. return False
  388. #
  389. def calc_transfer_rate(self):
  390. """ Returns bytes/s. """
  391. assert self.data_size is not None and self.time_of_first_byte is not None and self.time_of_last_byte is not None
  392. try:
  393. return self.data_size/(self.time_of_last_byte-self.time_of_first_byte)
  394. except ZeroDivisionError:
  395. return float('nan')
  396. #
  397. #
  398. #
  399. class SendDataProtocol(Protocol):
  400. def __init__(self, socket, data):
  401. self.socket = socket
  402. self.send_data = data
  403. #
  404. self.states = enum.Enum('SEND_DATA_STATES', 'READY_TO_BEGIN SEND_INFO SEND_DATA RECV_CONFIRMATION DONE')
  405. self.state = self.states.READY_TO_BEGIN
  406. #
  407. self.protocol_helper = None
  408. #
  409. def _run_iteration(self):
  410. if self.state is self.states.READY_TO_BEGIN:
  411. info_size = 20
  412. info = len(self.send_data).to_bytes(info_size, byteorder='big', signed=False)
  413. self.protocol_helper = ProtocolHelper()
  414. self.protocol_helper.set_buffer(info)
  415. self.state = self.states.SEND_INFO
  416. #
  417. if self.state is self.states.SEND_INFO:
  418. if self.protocol_helper.send(self.socket):
  419. self.protocol_helper = ProtocolHelper()
  420. if len(self.send_data) > 0:
  421. self.protocol_helper.set_buffer(self.send_data)
  422. self.state = self.states.SEND_DATA
  423. else:
  424. self.state = self.states.RECV_CONFIRMATION
  425. #
  426. #
  427. #
  428. if self.state is self.states.SEND_DATA:
  429. if self.protocol_helper.send(self.socket):
  430. self.protocol_helper = ProtocolHelper()
  431. self.state = self.states.RECV_CONFIRMATION
  432. #
  433. #
  434. if self.state is self.states.RECV_CONFIRMATION:
  435. response_size = 8
  436. if self.protocol_helper.recv(self.socket, response_size):
  437. response = self.protocol_helper.get_buffer()
  438. if response != b'RECEIVED':
  439. raise ProtocolException('Did not receive the expected message: {}'.format(response))
  440. #
  441. self.state = self.states.DONE
  442. #
  443. #
  444. if self.state is self.states.DONE:
  445. return True
  446. #
  447. return False
  448. #
  449. #
  450. class ReceiveDataProtocol(Protocol):
  451. def __init__(self, socket):
  452. self.socket = socket
  453. #
  454. self.states = enum.Enum('RECV_DATA_STATES', 'READY_TO_BEGIN RECV_INFO RECV_DATA SEND_CONFIRMATION DONE')
  455. self.state = self.states.READY_TO_BEGIN
  456. #
  457. self.protocol_helper = None
  458. self.data_size = None
  459. self.received_data = None
  460. #
  461. def _run_iteration(self):
  462. if self.state is self.states.READY_TO_BEGIN:
  463. self.protocol_helper = ProtocolHelper()
  464. self.state = self.states.RECV_INFO
  465. #
  466. if self.state is self.states.RECV_INFO:
  467. info_size = 20
  468. if self.protocol_helper.recv(self.socket, info_size):
  469. response = self.protocol_helper.get_buffer()
  470. self.data_size = int.from_bytes(response, byteorder='big', signed=False)
  471. self.protocol_helper = ProtocolHelper()
  472. if self.data_size > 0:
  473. self.state = self.states.RECV_DATA
  474. else:
  475. self.received_data = b''
  476. self.protocol_helper.set_buffer(b'RECEIVED')
  477. self.state = self.states.SEND_CONFIRMATION
  478. #
  479. #
  480. #
  481. if self.state is self.states.RECV_DATA:
  482. if self.protocol_helper.recv(self.socket, self.data_size):
  483. response = self.protocol_helper.get_buffer()
  484. self.received_data = response
  485. self.protocol_helper = ProtocolHelper()
  486. self.protocol_helper.set_buffer(b'RECEIVED')
  487. self.state = self.states.SEND_CONFIRMATION
  488. #
  489. #
  490. if self.state is self.states.SEND_CONFIRMATION:
  491. if self.protocol_helper.send(self.socket):
  492. self.state = self.states.DONE
  493. #
  494. #
  495. if self.state is self.states.DONE:
  496. return True
  497. #
  498. return False
  499. #
  500. #
  501. class ServerListener():
  502. def __init__(self, bind_endpoint, accept_callback):
  503. self.callback = accept_callback
  504. #
  505. self.s = socket.socket()
  506. self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  507. self.s.setblocking(0)
  508. self.s.bind(bind_endpoint)
  509. self.s.listen(1000)
  510. #
  511. def accept(self, block=True):
  512. if block:
  513. (readable, _, _) = select.select([self.s], [], [])
  514. else:
  515. readable = [self.s]
  516. #
  517. try:
  518. (newsock, endpoint) = self.s.accept()
  519. logging.debug("New client from %s:%d (fd=%d)",
  520. endpoint[0], endpoint[1], newsock.fileno())
  521. self.callback(newsock)
  522. return True
  523. except BlockingIOError:
  524. return False
  525. #
  526. #
  527. def stop(self):
  528. self.s.shutdown(socket.SHUT_RDWR)
  529. # use 'shutdown' rather than 'close' since 'close' won't stop a blocking 'accept' call
  530. #
  531. #
  532. class SimpleClientConnectionProtocol(Protocol):
  533. def __init__(self, endpoint, total_bytes, data_generator=None, proxy=None, username=None):
  534. self.endpoint = endpoint
  535. self.data_generator = data_generator
  536. self.total_bytes = total_bytes
  537. self.proxy = proxy
  538. self.username = username
  539. #
  540. self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN CONNECT_TO_PROXY PUSH_DATA DONE')
  541. self.state = self.states.READY_TO_BEGIN
  542. #
  543. self.socket = socket.socket()
  544. self.sub_protocol = None
  545. #
  546. if self.proxy is None:
  547. logging.debug('Socket %d connecting to endpoint %r...', self.socket.fileno(), self.endpoint)
  548. self.socket.connect(self.endpoint)
  549. else:
  550. logging.debug('Socket %d connecting to proxy %r...', self.socket.fileno(), self.proxy)
  551. self.socket.connect(self.proxy)
  552. #
  553. #
  554. def _run_iteration(self):
  555. if self.state is self.states.READY_TO_BEGIN:
  556. if self.proxy is None:
  557. self.sub_protocol = PushDataProtocol(self.socket, self.total_bytes, self.data_generator)
  558. self.state = self.states.PUSH_DATA
  559. else:
  560. self.sub_protocol = Socks4Protocol(self.socket, self.endpoint, username=self.username)
  561. self.state = self.states.CONNECT_TO_PROXY
  562. #
  563. #
  564. if self.state is self.states.CONNECT_TO_PROXY:
  565. if self.sub_protocol.run():
  566. self.sub_protocol = PushDataProtocol(self.socket, self.total_bytes, self.data_generator)
  567. self.state = self.states.PUSH_DATA
  568. #
  569. #
  570. if self.state is self.states.PUSH_DATA:
  571. if self.sub_protocol.run():
  572. self.state = self.states.DONE
  573. #
  574. #
  575. if self.state is self.states.DONE:
  576. return True
  577. #
  578. return False
  579. #
  580. #
  581. class SimpleServerConnectionProtocol(Protocol):
  582. def __init__(self, socket, conn_id, bandwidth_callback=None):
  583. self.socket = socket
  584. self.conn_id = conn_id
  585. self.bandwidth_callback = bandwidth_callback
  586. #
  587. self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN PULL_DATA DONE')
  588. self.state = self.states.READY_TO_BEGIN
  589. #
  590. self.sub_protocol = None
  591. #
  592. def _run_iteration(self):
  593. if self.state is self.states.READY_TO_BEGIN:
  594. self.sub_protocol = PullDataProtocol(self.socket)
  595. self.state = self.states.PULL_DATA
  596. #
  597. if self.state is self.states.PULL_DATA:
  598. if self.sub_protocol.run():
  599. if self.bandwidth_callback:
  600. self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.calc_transfer_rate())
  601. #
  602. self.state = self.states.DONE
  603. #
  604. #
  605. if self.state is self.states.DONE:
  606. return True
  607. #
  608. return False
  609. #
  610. #
  611. if __name__ == '__main__':
  612. import sys
  613. logging.basicConfig(level=logging.DEBUG)
  614. #
  615. if sys.argv[1] == 'client':
  616. endpoint = ('127.0.0.1', 4747)
  617. #proxy = ('127.0.0.1', 9003)
  618. proxy = None
  619. username = bytes([x for x in os.urandom(12) if x != 0])
  620. #username = None
  621. data_MB = 4000
  622. #
  623. client = SimpleClientConnectionProtocol(endpoint, data_MB*2**20, proxy=proxy, username=username)
  624. client.run()
  625. elif sys.argv[1] == 'server':
  626. import multiprocessing
  627. import queue
  628. #
  629. endpoint = ('127.0.0.1', 4747)
  630. processes = []
  631. conn_counter = [0]
  632. #
  633. def bw_callback(conn_id, data_size, transfer_rate):
  634. logging.info('Avg Transferred (MB): %.4f', data_size/(1024**2))
  635. logging.info('Avg Transfer rate (MB/s): %.4f', transfer_rate/(1024**2))
  636. #
  637. def start_server_conn(socket, conn_id):
  638. server = SimpleServerConnectionProtocol(socket, conn_id, bandwidth_callback=bw_callback)
  639. try:
  640. server.run()
  641. except KeyboardInterrupt:
  642. socket.close()
  643. #
  644. #
  645. def accept_callback(socket):
  646. conn_id = conn_counter[0]
  647. conn_counter[0] += 1
  648. #
  649. p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
  650. processes.append(p)
  651. p.start()
  652. #
  653. l = ServerListener(endpoint, accept_callback)
  654. #
  655. try:
  656. while True:
  657. l.accept()
  658. #
  659. except KeyboardInterrupt:
  660. print()
  661. #
  662. for p in processes:
  663. p.join()
  664. #
  665. #
  666. #