basic_protocols.py 15 KB


  1. #!/usr/bin/python3
  2. #
  3. import socket
  4. import struct
  5. import logging
  6. import time
  7. import enum
  8. import select
  9. import os
  10. #
  11. import accelerated_functions
  12. #
  13. class ProtocolException(Exception):
  14. pass
  15. #
  16. class ProtocolHelper():
  17. def __init__(self):
  18. self._buffer = b''
  19. #
  20. def set_buffer(self, data):
  21. self._buffer = data
  22. #
  23. def get_buffer(self):
  24. return self._buffer
  25. #
  26. def recv(self, socket, num_bytes):
  27. data = socket.recv(num_bytes-len(self._buffer))
  28. self._buffer += data
  29. if len(self._buffer) == num_bytes:
  30. return True
  31. #
  32. return False
  33. #
  34. def send(self, socket):
  35. n = socket.send(self._buffer)
  36. self._buffer = self._buffer[n:]
  37. if len(self._buffer) == 0:
  38. return True
  39. #
  40. return False
  41. #
  42. #
  43. class Protocol():
  44. def _run_iteration(self, block=True):
  45. pass
  46. #
  47. def run(self, block=True):
  48. while True:
  49. finished = self._run_iteration(block=block)
  50. #
  51. if finished:
  52. # protocol is done
  53. return True
  54. elif not block:
  55. # not done the protocol yet, but don't block
  56. return False
  57. #
  58. #
  59. #
  60. #
  61. class Socks4Protocol(Protocol):
  62. def __init__(self, socket, addr_port, username=None):
  63. self.socket = socket
  64. self.addr_port = addr_port
  65. self.username = username
  66. #
  67. self.states = enum.Enum('SOCKS_4_STATES', 'READY_TO_BEGIN CONNECTING_TO_PROXY WAITING_FOR_PROXY DONE')
  68. self.state = self.states.READY_TO_BEGIN
  69. #
  70. self.protocol_helper = None
  71. #
  72. def _run_iteration(self, block=True):
  73. if self.state is self.states.READY_TO_BEGIN:
  74. self.protocol_helper = ProtocolHelper()
  75. self.protocol_helper.set_buffer(self.socks_cmd(self.addr_port, self.username))
  76. self.state = self.states.CONNECTING_TO_PROXY
  77. #
  78. if self.state is self.states.CONNECTING_TO_PROXY:
  79. if self.protocol_helper.send(self.socket):
  80. self.protocol_helper = ProtocolHelper()
  81. self.state = self.states.WAITING_FOR_PROXY
  82. #logging.debug('Waiting for reply from proxy')
  83. #
  84. #
  85. if self.state is self.states.WAITING_FOR_PROXY:
  86. response_size = 8
  87. if self.protocol_helper.recv(self.socket, response_size):
  88. response = self.protocol_helper.get_buffer()
  89. if response[1] != 0x5a:
  90. raise ProtocolException('Could not connect to SOCKS proxy, msg: %x'%(response[1],))
  91. #
  92. self.state = self.states.DONE
  93. return True
  94. #
  95. #
  96. return False
  97. #
  98. def socks_cmd(self, addr_port, username=None):
  99. socks_version = 4
  100. command = 1
  101. dnsname = b''
  102. host, port = addr_port
  103. #
  104. try:
  105. username = bytes(username, 'utf8')
  106. except TypeError:
  107. pass
  108. #
  109. if username is None:
  110. username = b''
  111. elif b'\x00' in username:
  112. raise ProtocolException('Username cannot contain a NUL character.')
  113. #
  114. username = username+b'\x00'
  115. #
  116. try:
  117. addr = socket.inet_aton(host)
  118. except socket.error:
  119. addr = b'\x00\x00\x00\x01'
  120. dnsname = bytes(host, 'utf8')+b'\x00'
  121. #
  122. return struct.pack('!BBH', socks_version, command, port) + addr + username + dnsname
  123. #
  124. #
  125. class PushDataProtocol(Protocol):
  126. def __init__(self, socket, total_bytes, data_generator=None, send_max_bytes=1024*512, use_accelerated=True):
  127. if data_generator is None:
  128. data_generator = self._default_data_generator
  129. #
  130. self.socket = socket
  131. self.data_generator = data_generator
  132. self.total_bytes = total_bytes
  133. self.send_max_bytes = send_max_bytes
  134. self.use_accelerated = use_accelerated
  135. #
  136. self.states = enum.Enum('PUSH_DATA_STATES', 'READY_TO_BEGIN SEND_INFO PUSH_DATA RECV_CONFIRMATION DONE')
  137. self.state = self.states.READY_TO_BEGIN
  138. #
  139. self.bytes_written = 0
  140. self.protocol_helper = None
  141. #
  142. def _run_iteration(self, block=True):
  143. if self.state is self.states.READY_TO_BEGIN:
  144. info = self.total_bytes.to_bytes(8, byteorder='big', signed=False)
  145. info += self.send_max_bytes.to_bytes(8, byteorder='big', signed=False)
  146. self.protocol_helper = ProtocolHelper()
  147. self.protocol_helper.set_buffer(info)
  148. self.state = self.states.SEND_INFO
  149. #
  150. if self.state is self.states.SEND_INFO:
  151. if self.protocol_helper.send(self.socket):
  152. self.state = self.states.PUSH_DATA
  153. #
  154. #
  155. if self.state is self.states.PUSH_DATA:
  156. max_block_size = self.send_max_bytes
  157. block_size = min(max_block_size, self.total_bytes-self.bytes_written)
  158. data = self.data_generator(self.bytes_written, block_size)
  159. #
  160. if self.use_accelerated:
  161. if not block:
  162. logging.warning('Protocol set to non-blocking, but using the blocking accelerated function.')
  163. #
  164. ret_val = accelerated_functions.push_data(self.socket.fileno(), self.total_bytes, data)
  165. if ret_val < 0:
  166. raise ProtocolException('Error while pushing data.')
  167. #
  168. self.bytes_written = self.total_bytes
  169. else:
  170. n = self.socket.send(data)
  171. self.bytes_written += n
  172. #
  173. if self.bytes_written >= self.total_bytes:
  174. # finished sending the data
  175. logging.debug('Finished sending the data (%d bytes).', self.bytes_written)
  176. self.protocol_helper = ProtocolHelper()
  177. self.state = self.states.RECV_CONFIRMATION
  178. #
  179. #
  180. if self.state is self.states.RECV_CONFIRMATION:
  181. response_size = 8
  182. if self.protocol_helper.recv(self.socket, response_size):
  183. response = self.protocol_helper.get_buffer()
  184. if response != b'RECEIVED':
  185. raise ProtocolException('Did not receive the expected message: {}'.format(response))
  186. #
  187. self.state = self.states.DONE
  188. return True
  189. #
  190. #
  191. return False
  192. #
  193. def _default_data_generator(self, index, bytes_needed):
  194. return b'0'*bytes_needed
  195. #
  196. #
  197. class PullDataProtocol(Protocol):
  198. def __init__(self, socket, use_accelerated=True):
  199. self.socket = socket
  200. self.use_accelerated = use_accelerated
  201. #
  202. self.states = enum.Enum('PULL_DATA_STATES', 'READY_TO_BEGIN RECV_INFO PULL_DATA SEND_CONFIRMATION DONE')
  203. self.state = self.states.READY_TO_BEGIN
  204. #
  205. self.data_size = None
  206. self.recv_max_bytes = None
  207. self.bytes_read = 0
  208. self.protocol_helper = None
  209. self._time_of_first_byte = None
  210. self.elapsed_time = None
  211. #
  212. def _run_iteration(self, block=True):
  213. if self.state is self.states.READY_TO_BEGIN:
  214. self.protocol_helper = ProtocolHelper()
  215. self.state = self.states.RECV_INFO
  216. #
  217. if self.state is self.states.RECV_INFO:
  218. info_size = 16
  219. if self.protocol_helper.recv(self.socket, info_size):
  220. response = self.protocol_helper.get_buffer()
  221. self.data_size = int.from_bytes(response[0:8], byteorder='big', signed=False)
  222. self.recv_max_bytes = int.from_bytes(response[8:16], byteorder='big', signed=False)
  223. self.state = self.states.PULL_DATA
  224. #
  225. #
  226. if self.state is self.states.PULL_DATA:
  227. max_block_size = self.recv_max_bytes
  228. block_size = min(max_block_size, self.data_size-self.bytes_read)
  229. #
  230. if self.use_accelerated:
  231. if not block:
  232. logging.warning('Protocol set to non-blocking, but using the blocking accelerated function.')
  233. #
  234. (ret_val, elapsed_time) = accelerated_functions.pull_data(self.socket.fileno(), self.data_size, block_size)
  235. if ret_val < 0:
  236. raise ProtocolException('Error while pulling data.')
  237. #
  238. self.bytes_read = self.data_size
  239. self.elapsed_time = elapsed_time
  240. else:
  241. data = self.socket.recv(block_size)
  242. self.bytes_read += len(data)
  243. if self.bytes_read != 0 and self._time_of_first_byte is None:
  244. self._time_of_first_byte = time.time()
  245. #
  246. if self.bytes_read == self.data_size and self.elapsed_time is None:
  247. self.elapsed_time = time.time()-self._time_of_first_byte
  248. #
  249. #
  250. if self.bytes_read == self.data_size:
  251. # finished receiving the data
  252. logging.debug('Finished receiving the data.')
  253. self.protocol_helper = ProtocolHelper()
  254. self.protocol_helper.set_buffer(b'RECEIVED')
  255. self.state = self.states.SEND_CONFIRMATION
  256. #
  257. #
  258. if self.state is self.states.SEND_CONFIRMATION:
  259. if self.protocol_helper.send(self.socket):
  260. self.state = self.states.DONE
  261. return True
  262. #
  263. #
  264. return False
  265. #
  266. def calc_transfer_rate(self):
  267. """ Returns bytes/s. """
  268. assert self.data_size is not None and self.elapsed_time is not None
  269. return self.data_size/self.elapsed_time
  270. #
  271. #
  272. class SendDataProtocol(Protocol):
  273. def __init__(self, socket, data):
  274. self.socket = socket
  275. self.send_data = data
  276. #
  277. self.states = enum.Enum('SEND_DATA_STATES', 'READY_TO_BEGIN SEND_INFO SEND_DATA RECV_CONFIRMATION DONE')
  278. self.state = self.states.READY_TO_BEGIN
  279. #
  280. self.protocol_helper = None
  281. #
  282. def _run_iteration(self, block=True):
  283. if self.state is self.states.READY_TO_BEGIN:
  284. info_size = 20
  285. info = len(self.send_data).to_bytes(info_size, byteorder='big', signed=False)
  286. self.protocol_helper = ProtocolHelper()
  287. self.protocol_helper.set_buffer(info)
  288. self.state = self.states.SEND_INFO
  289. #
  290. if self.state is self.states.SEND_INFO:
  291. if self.protocol_helper.send(self.socket):
  292. self.protocol_helper = ProtocolHelper()
  293. self.protocol_helper.set_buffer(self.send_data)
  294. self.state = self.states.SEND_DATA
  295. #
  296. #
  297. if self.state is self.states.SEND_DATA:
  298. if self.protocol_helper.send(self.socket):
  299. self.protocol_helper = ProtocolHelper()
  300. self.state = self.states.RECV_CONFIRMATION
  301. #
  302. #
  303. if self.state is self.states.RECV_CONFIRMATION:
  304. response_size = 8
  305. if self.protocol_helper.recv(self.socket, response_size):
  306. response = self.protocol_helper.get_buffer()
  307. if response != b'RECEIVED':
  308. raise ProtocolException('Did not receive the expected message: {}'.format(response))
  309. #
  310. self.state = self.states.DONE
  311. return True
  312. #
  313. #
  314. return False
  315. #
  316. #
  317. class ReceiveDataProtocol(Protocol):
  318. def __init__(self, socket):
  319. self.socket = socket
  320. #
  321. self.states = enum.Enum('RECV_DATA_STATES', 'READY_TO_BEGIN RECV_INFO RECV_DATA SEND_CONFIRMATION DONE')
  322. self.state = self.states.READY_TO_BEGIN
  323. #
  324. self.protocol_helper = None
  325. self.data_size = None
  326. self.received_data = None
  327. #
  328. def _run_iteration(self, block=True):
  329. if self.state is self.states.READY_TO_BEGIN:
  330. self.protocol_helper = ProtocolHelper()
  331. self.state = self.states.RECV_INFO
  332. #
  333. if self.state is self.states.RECV_INFO:
  334. info_size = 20
  335. if self.protocol_helper.recv(self.socket, info_size):
  336. response = self.protocol_helper.get_buffer()
  337. self.data_size = int.from_bytes(response, byteorder='big', signed=False)
  338. self.protocol_helper = ProtocolHelper()
  339. self.state = self.states.RECV_DATA
  340. #
  341. #
  342. if self.state is self.states.RECV_DATA:
  343. if self.protocol_helper.recv(self.socket, self.data_size):
  344. response = self.protocol_helper.get_buffer()
  345. self.received_data = response
  346. self.protocol_helper = ProtocolHelper()
  347. self.protocol_helper.set_buffer(b'RECEIVED')
  348. self.state = self.states.SEND_CONFIRMATION
  349. #
  350. #
  351. if self.state is self.states.SEND_CONFIRMATION:
  352. if self.protocol_helper.send(self.socket):
  353. self.state = self.states.DONE
  354. return True
  355. #
  356. #
  357. return False
  358. #
  359. #
  360. class ServerListener():
  361. "A TCP listener, binding, listening and accepting new connections."
  362. def __init__(self, endpoint, accept_callback):
  363. self.callback = accept_callback
  364. #
  365. self.s = socket.socket()
  366. self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  367. self.s.bind(endpoint)
  368. self.s.listen(0)
  369. #
  370. def accept(self):
  371. newsock, endpoint = self.s.accept()
  372. logging.debug("New client from %s:%d (fd=%d)",
  373. endpoint[0], endpoint[1], newsock.fileno())
  374. self.callback(newsock)
  375. #
  376. #
  377. class SimpleClientConnectionProtocol(Protocol):
  378. def __init__(self, endpoint, total_bytes, data_generator=None, proxy=None, username=None):
  379. self.endpoint = endpoint
  380. self.data_generator = data_generator
  381. self.total_bytes = total_bytes
  382. self.proxy = proxy
  383. self.username = username
  384. #
  385. self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN CONNECT_TO_PROXY PUSH_DATA DONE')
  386. self.state = self.states.READY_TO_BEGIN
  387. #
  388. self.socket = socket.socket()
  389. self.sub_protocol = None
  390. #
  391. if self.proxy is None:
  392. logging.debug('Socket %d connecting to endpoint %r...', self.socket.fileno(), self.endpoint)
  393. self.socket.connect(self.endpoint)
  394. else:
  395. logging.debug('Socket %d connecting to proxy %r...', self.socket.fileno(), self.proxy)
  396. self.socket.connect(self.proxy)
  397. #
  398. #
  399. def _run_iteration(self, block=True):
  400. if self.state is self.states.READY_TO_BEGIN:
  401. if self.proxy is None:
  402. self.sub_protocol = PushDataProtocol(self.socket, self.total_bytes, self.data_generator)
  403. self.state = self.states.PUSH_DATA
  404. else:
  405. self.sub_protocol = Socks4Protocol(self.socket, self.endpoint, username=self.username)
  406. self.state = self.states.CONNECT_TO_PROXY
  407. #
  408. #
  409. if self.state is self.states.CONNECT_TO_PROXY:
  410. if self.sub_protocol.run(block=block):
  411. self.sub_protocol = PushDataProtocol(self.socket, self.total_bytes, self.data_generator)
  412. self.state = self.states.PUSH_DATA
  413. #
  414. #
  415. if self.state is self.states.PUSH_DATA:
  416. if self.sub_protocol.run(block=block):
  417. self.state = self.states.DONE
  418. return True
  419. #
  420. #
  421. return False
  422. #
  423. #
  424. class SimpleServerConnectionProtocol(Protocol):
  425. def __init__(self, socket, conn_id, bandwidth_callback=None):
  426. self.socket = socket
  427. self.conn_id = conn_id
  428. self.bandwidth_callback = bandwidth_callback
  429. #
  430. self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN PULL_DATA DONE')
  431. self.state = self.states.READY_TO_BEGIN
  432. #
  433. self.sub_protocol = None
  434. #
  435. def _run_iteration(self, block=True):
  436. if self.state is self.states.READY_TO_BEGIN:
  437. self.sub_protocol = PullDataProtocol(self.socket)
  438. self.state = self.states.PULL_DATA
  439. #
  440. if self.state is self.states.PULL_DATA:
  441. if self.sub_protocol.run(block=block):
  442. self.state = self.states.DONE
  443. if self.bandwidth_callback:
  444. self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.calc_transfer_rate())
  445. #
  446. return True
  447. #
  448. #
  449. return False
  450. #
  451. #
  452. if __name__ == '__main__':
  453. import sys
  454. logging.basicConfig(level=logging.DEBUG)
  455. #
  456. if sys.argv[1] == 'client':
  457. endpoint = ('127.0.0.1', 4747)
  458. proxy = ('127.0.0.1', 9003)
  459. #proxy = None
  460. username = bytes([x for x in os.urandom(12) if x != 0])
  461. #username = None
  462. data_MB = 40
  463. #
  464. client = SimpleClientConnectionProtocol(endpoint, data_MB*2**20, proxy=proxy, username=username)
  465. client.run()
  466. elif sys.argv[1] == 'server':
  467. import multiprocessing
  468. import queue
  469. #
  470. endpoint = ('127.0.0.1', 4747)
  471. processes = []
  472. conn_counter = [0]
  473. #
  474. def bw_callback(conn_id, data_size, transfer_rate):
  475. logging.info('Avg Transferred (MB): %.4f', data_size/(1024**2))
  476. logging.info('Avg Transfer rate (MB/s): %.4f', transfer_rate/(1024**2))
  477. #
  478. def start_server_conn(socket, conn_id):
  479. server = SimpleServerConnectionProtocol(socket, conn_id, bandwidth_callback=bw_callback)
  480. try:
  481. server.run()
  482. except KeyboardInterrupt:
  483. socket.close()
  484. #
  485. #
  486. def accept_callback(socket):
  487. conn_id = conn_counter[0]
  488. conn_counter[0] += 1
  489. #
  490. p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
  491. processes.append(p)
  492. p.start()
  493. #
  494. l = ServerListener(endpoint, accept_callback)
  495. #
  496. try:
  497. while True:
  498. l.accept()
  499. #
  500. except KeyboardInterrupt:
  501. print()
  502. #
  503. for p in processes:
  504. p.join()
  505. #
  506. #
  507. #