throughput_protocols.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. #!/usr/bin/python3
  2. #
  3. import basic_protocols
  4. import logging
  5. import enum
  6. import time
  7. import socket
  8. #
  9. class ClientProtocol(basic_protocols.Protocol):
  10. def __init__(self, socket, total_bytes, wait_until=None, send_buffer_len=None, use_acceleration=None):
  11. self.socket = socket
  12. self.total_bytes = total_bytes
  13. self.wait_until = wait_until
  14. self.send_buffer_len = send_buffer_len
  15. self.use_acceleration = use_acceleration
  16. #
  17. self.states = enum.Enum('CLIENT_CONN_STATES', 'READY_TO_BEGIN SEND_GROUP_ID WAIT PUSH_DATA DONE')
  18. self.state = self.states.READY_TO_BEGIN
  19. #
  20. self.sub_protocol = None
  21. self.group_id = int(self.wait_until*1000) if self.wait_until is not None else 0
  22. # a group id of 0 means no group
  23. #
  24. def _run_iteration(self):
  25. if self.state is self.states.READY_TO_BEGIN:
  26. group_id_bytes = self.group_id.to_bytes(8, byteorder='big', signed=False)
  27. self.sub_protocol = basic_protocols.SendDataProtocol(self.socket, group_id_bytes)
  28. self.state = self.states.SEND_GROUP_ID
  29. #
  30. if self.state is self.states.SEND_GROUP_ID:
  31. if self.sub_protocol.run():
  32. self.state = self.states.WAIT
  33. #
  34. #
  35. if self.state is self.states.WAIT:
  36. if self.wait_until is not None:
  37. time.sleep(self.wait_until-time.time())
  38. #
  39. if self.wait_until is None or time.time() >= self.wait_until:
  40. self.sub_protocol = basic_protocols.PushDataProtocol(self.socket, self.total_bytes,
  41. send_buffer_len=self.send_buffer_len,
  42. use_acceleration=self.use_acceleration)
  43. self.state = self.states.PUSH_DATA
  44. #
  45. #
  46. if self.state is self.states.PUSH_DATA:
  47. if self.sub_protocol.run():
  48. self.state = self.states.DONE
  49. #
  50. #
  51. if self.state is self.states.DONE:
  52. return True
  53. #
  54. return False
  55. #
  56. #
  57. class ServerProtocol(basic_protocols.Protocol):
  58. def __init__(self, socket, conn_id, group_id_callback=None, bandwidth_callback=None, use_acceleration=None):
  59. self.socket = socket
  60. self.conn_id = conn_id
  61. self.group_id_callback = group_id_callback
  62. self.bandwidth_callback = bandwidth_callback
  63. self.use_acceleration = use_acceleration
  64. #
  65. self.states = enum.Enum('SERVER_CONN_STATES', 'READY_TO_BEGIN RECV_GROUP_ID PULL_DATA DONE')
  66. self.state = self.states.READY_TO_BEGIN
  67. #
  68. self.sub_protocol = None
  69. #
  70. def _run_iteration(self):
  71. if self.state is self.states.READY_TO_BEGIN:
  72. self.sub_protocol = basic_protocols.ReceiveDataProtocol(self.socket)
  73. self.state = self.states.RECV_GROUP_ID
  74. #
  75. if self.state is self.states.RECV_GROUP_ID:
  76. if self.sub_protocol.run():
  77. group_id = int.from_bytes(self.sub_protocol.received_data, byteorder='big', signed=False)
  78. if group_id == 0:
  79. # a group of 0 means no group
  80. group_id = None
  81. #
  82. self.group_id_callback(self.conn_id, group_id)
  83. self.sub_protocol = basic_protocols.PullDataProtocol(self.socket, use_acceleration=self.use_acceleration)
  84. self.state = self.states.PULL_DATA
  85. #
  86. #
  87. if self.state is self.states.PULL_DATA:
  88. if self.sub_protocol.run():
  89. if self.bandwidth_callback:
  90. self.bandwidth_callback(self.conn_id, self.sub_protocol.data_size, self.sub_protocol.time_of_first_byte, self.sub_protocol.time_of_last_byte, self.sub_protocol.calc_transfer_rate())
  91. #
  92. self.state = self.states.DONE
  93. #
  94. #
  95. if self.state is self.states.DONE:
  96. return True
  97. #
  98. return False
  99. #
  100. #