throughput_protocols.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. #!/usr/bin/python3
  2. #
  3. import basic_protocols
  4. import logging
  5. import enum
  6. import time
  7. import socket
  8. #
  9. class ClientProtocol(basic_protocols.Protocol):
  10. def __init__(self, socket, total_bytes, group_id=None, send_buffer_len=None, use_acceleration=None, custom_data=b'', push_start_cb=None, push_done_cb=None): #wait_until=None
  11. self.socket = socket
  12. self.total_bytes = total_bytes
  13. #self.wait_until = wait_until
  14. self.send_buffer_len = send_buffer_len
  15. self.use_acceleration = use_acceleration
  16. self.custom_data = custom_data
  17. self.push_start_cb = push_start_cb
  18. self.push_done_cb = push_done_cb
  19. self.group_id = group_id if group_id is not None else 0
  20. # a group id of 0 means no group
  21. #
  22. self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN SEND_GROUP_ID SEND_CUSTOM_DATA PUSH_DATA DONE') #WAIT
  23. self.state = self.states.READY_TO_BEGIN
  24. #
  25. self.sub_protocol = None
  26. #
  27. def _run_iteration(self):
  28. if self.state is self.states.READY_TO_BEGIN:
  29. group_id_bytes = self.group_id.to_bytes(8, byteorder='big', signed=False)
  30. self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, group_id_bytes)
  31. self.state = self.states.SEND_GROUP_ID
  32. #
  33. if self.state is self.states.SEND_GROUP_ID:
  34. if self.sub_protocol.run():
  35. self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, self.custom_data)
  36. self.state = self.states.SEND_CUSTOM_DATA
  37. #
  38. #
  39. if self.state is self.states.SEND_CUSTOM_DATA:
  40. if self.sub_protocol.run():
  41. #self.state = self.states.WAIT
  42. self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
  43. send_buffer_len=self.send_buffer_len,
  44. use_acceleration=self.use_acceleration,
  45. push_start_cb=self.push_start_cb,
  46. push_done_cb=self.push_done_cb)
  47. self.state = self.states.PUSH_DATA
  48. #
  49. #
  50. '''
  51. if self.state is self.states.WAIT:
  52. if self.wait_until is not None:
  53. time.sleep(self.wait_until-time.time())
  54. #
  55. if self.wait_until is None or time.time() >= self.wait_until:
  56. self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
  57. send_buffer_len=self.send_buffer_len,
  58. use_acceleration=self.use_acceleration,
  59. push_start_cb=self.push_start_cb,
  60. push_done_cb=self.push_done_cb)
  61. self.state = self.states.PUSH_DATA
  62. #
  63. #
  64. '''
  65. if self.state is self.states.PUSH_DATA:
  66. if self.sub_protocol.run():
  67. self.state = self.states.DONE
  68. #
  69. #
  70. if self.state is self.states.DONE:
  71. return True
  72. #
  73. return False
  74. #
  75. #
  76. class ServerProtocol(basic_protocols.Protocol):
  77. def __init__(self, socket, conn_id, group_id_callback=None, bandwidth_callback=None, use_acceleration=None):
  78. self.socket = socket
  79. self.conn_id = conn_id
  80. self.group_id_callback = group_id_callback
  81. self.bandwidth_callback = bandwidth_callback
  82. self.use_acceleration = use_acceleration
  83. #
  84. self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_GROUP_ID RECV_CUSTOM_DATA PULL_DATA DONE')
  85. self.state = self.states.READY_TO_BEGIN
  86. #
  87. self.sub_protocol = None
  88. self.custom_data = None
  89. #
  90. def _run_iteration(self):
  91. if self.state is self.states.READY_TO_BEGIN:
  92. self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
  93. self.state = self.states.RECV_GROUP_ID
  94. #
  95. if self.state is self.states.RECV_GROUP_ID:
  96. if self.sub_protocol.run():
  97. group_id = int.from_bytes(self.sub_protocol.received_data, byteorder='big', signed=False)
  98. if group_id == 0:
  99. # a group of 0 means no group
  100. group_id = None
  101. #
  102. self.group_id_callback(self.conn_id, group_id)
  103. self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
  104. self.state = self.states.RECV_CUSTOM_DATA
  105. #
  106. #
  107. if self.state is self.states.RECV_CUSTOM_DATA:
  108. if self.sub_protocol.run():
  109. self.custom_data = self.sub_protocol.received_data
  110. #
  111. self.sub_protocol = basic_protocols.PullDataProtocol(self.socket, use_acceleration=self.use_acceleration)
  112. self.state = self.states.PULL_DATA
  113. #
  114. #
  115. if self.state is self.states.PULL_DATA:
  116. if self.sub_protocol.run():
  117. if self.bandwidth_callback:
  118. #self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate(), self.sub_protocol.byte_counter, self.sub_protocol.byte_counter_start_time)
  119. self.bandwidth_callback(self.conn_id, self.custom_data, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate(), self.sub_protocol.deltas)
  120. #
  121. self.state = self.states.DONE
  122. #
  123. #
  124. if self.state is self.states.DONE:
  125. return True
  126. #
  127. return False
  128. #
  129. #