rcp_node.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. from dht_common import SIZE_OF_HASH, SIZE_OF_IP_ADDRESS, SIZE_OF_KEY, SIZE_OF_SIGNATURE, SIZE_OF_TIMESTAMP
  2. from base_node import Base_Node
  3. class RCP_Quorum(Base_Node):
  4. def __init__(self, quorumID, documentSize, numNodes, numItems=0, table=[]):
  5. Base_Node.__init__(self, quorumID, documentSize, numItems, table)
  6. self.numNodes = numNodes
  7. self.nodeNumRounds = [0 for i in range(self.numNodes)]
  8. self.nodeNumMessagesSent = [0 for i in range(self.numNodes)]
  9. self.nodeNumMessagesRecv = [0 for i in range(self.numNodes)]
  10. self.nodeNumBytesSent = [0 for i in range(self.numNodes)]
  11. self.nodeNumBytesRecv = [0 for i in range(self.numNodes)]
  12. self.nodeLastNumRounds = [0 for i in range(self.numNodes)]
  13. self.nodeLastNumMessagesSent = [0 for i in range(self.numNodes)]
  14. self.nodeLastNumMessagesRecv = [0 for i in range(self.numNodes)]
  15. self.nodeLastNumBytesSent = [0 for i in range(self.numNodes)]
  16. self.nodeLastNumBytesRecv = [0 for i in range(self.numNodes)]
  17. def insert(self, numKeys, numSignatures):
  18. self.numItems += 1
  19. # Asker's ID
  20. sizeOfRequest = SIZE_OF_HASH
  21. # timestamp
  22. sizeOfRequest += SIZE_OF_TIMESTAMP
  23. # keys in request
  24. sizeOfRequest += SIZE_OF_KEY * numKeys
  25. # signatures in request
  26. sizeOfRequest += SIZE_OF_SIGNATURE * numSignatures
  27. # actual document sent
  28. sizeOfRequest += self.documentSize
  29. # signature over whole thing
  30. sizeOfRequest += SIZE_OF_SIGNATURE
  31. sizeOfResponse = SIZE_OF_HASH + SIZE_OF_SIGNATURE
  32. map(lambda x: x+1, self.nodeNumRounds)
  33. map(lambda x: x+1, self.nodeNumMessagesSent)
  34. map(lambda x: x+1, self.nodeNumMessagesRecv)
  35. map(lambda x: x+sizeOfResponse, self.nodeNumBytesSent)
  36. map(lambda x: x+sizeOfRequest, self.nodeNumBytesRecv)
  37. # This is simulating some extra stuff (signing) so that the correct size is recorded
  38. def retrieve(self, whichNode, numKeys, numSignatures):
  39. # Asker's ID
  40. sizeOfRequest = SIZE_OF_HASH
  41. # timestamp
  42. sizeOfRequest += SIZE_OF_TIMESTAMP
  43. # keys in request
  44. sizeOfRequest += SIZE_OF_KEY * numKeys
  45. # signatures in request
  46. sizeOfRequest += SIZE_OF_SIGNATURE * numSignatures
  47. # actual hash requested
  48. sizeOfRequest += SIZE_OF_HASH
  49. # signature over whole thing
  50. sizeOfRequest += SIZE_OF_SIGNATURE
  51. sizeOfResponse = self.documentSize + SIZE_OF_SIGNATURE
  52. self.nodeNumRounds[whichNode] += 1
  53. self.nodeNumMessagesSent[whichNode] += 1
  54. self.nodeNumMessagesRecv[whichNode] += 1
  55. self.nodeNumBytesSent[whichNode] += sizeOfResponse
  56. self.nodeNumBytesRecv[whichNode] += sizeOfRequest
  57. def get_finger_table_val(self, whichNode, searchID, numKeys, numSignatures):
  58. retval = Base_Node.get_finger_table_val(self, searchID)
  59. # key of previous group
  60. sizeOfResponse = SIZE_OF_KEY
  61. # next groups ID/routing information
  62. sizeOfResponse += SIZE_OF_IP_ADDRESS * self.numNodes + SIZE_OF_HASH
  63. # key of next group
  64. sizeOfResponse += SIZE_OF_KEY
  65. # sign the whole thing
  66. sizeOfResponse += SIZE_OF_SIGNATURE
  67. # Asker's ID
  68. sizeOfRequest = SIZE_OF_HASH
  69. # timestamp
  70. sizeOfRequest += SIZE_OF_TIMESTAMP
  71. # Keys in request
  72. sizeOfRequest += SIZE_OF_KEY * numKeys
  73. # signatures in request
  74. sizeOfRequest += SIZE_OF_SIGNATURE * numSignatures
  75. # ID being searched for
  76. sizeOfRequest += SIZE_OF_HASH
  77. self.nodeNumRounds[whichNode] += 1
  78. self.nodeNumMessagesSent[whichNode] += 1
  79. self.nodeNumMessagesRecv[whichNode] += 1
  80. self.nodeNumBytesSent[whichNode] += sizeOfResponse
  81. self.nodeNumBytesRecv[whichNode] += sizeOfRequest
  82. return retval
  83. def get_first_auth(self, whichNode):
  84. # Asker's ID
  85. sizeOfRequest = SIZE_OF_HASH
  86. # timestamp
  87. sizeOfRequest += SIZE_OF_TIMESTAMP
  88. # response is just the whole thing signed
  89. sizeOfResponse = sizeOfRequest + SIZE_OF_SIGNATURE
  90. self.nodeNumRounds[whichNode] += 1
  91. self.nodeNumMessagesSent[whichNode] += 1
  92. self.nodeNumMessagesRecv[whichNode] += 1
  93. self.nodeNumBytesSent[whichNode] += sizeOfResponse
  94. self.nodeNumBytesRecv[whichNode] += sizeOfRequest
  95. return None
  96. def get_num_nodes(self):
  97. return self.numNodes
  98. def get_num_rounds(self, whichNode):
  99. self.nodeLastNumRounds[whichNode] = self.nodeNumRounds[whichNode]
  100. return self.nodeNumRounds[whichNode]
  101. def get_recent_num_rounds(self, whichNode):
  102. retval = self.nodeNumRounds[whichNode] - self.nodeLastNumRounds[whichNode]
  103. self.nodeLastNumRounds[whichNode] = self.nodeNumRounds[whichNode]
  104. return retval
  105. def get_num_messages_sent(self, whichNode):
  106. self.nodeLastNumMessagesSent[whichNode] = self.nodeNumMessagesSent[whichNode]
  107. return self.nodeNumMessagesSent[whichNode]
  108. def get_recent_num_messages_sent(self, whichNode):
  109. retval = self.nodeNumMessagesSent[whichNode] - self.nodeLastNumMessagesSent[whichNode]
  110. self.nodeLastNumMessagesSent[whichNode] = self.nodeNumMessagesSent[whichNode]
  111. return retval
  112. def get_num_messages_recv(self, whichNode):
  113. self.nodeLastNumMessagesRecv[whichNode] = self.nodeNumMessagesRecv[whichNode]
  114. return self.nodeNumMessagesRecv[whichNode]
  115. def get_recent_num_messages_recv(self, whichNode):
  116. retval = self.nodeNumMessagesRecv[whichNode] - self.nodeLastNumMessagesRecv[whichNode]
  117. self.nodeLastNumMessagesRecv[whichNode] = self.nodeNumMessagesRecv[whichNode]
  118. return retval
  119. def get_num_bytes_sent(self, whichNode):
  120. self.nodeLastNumBytesSent[whichNode] = self.nodeNumBytesSent[whichNode]
  121. return self.nodeNumBytesSent[whichNode]
  122. def get_recent_num_bytes_sent(self, whichNode):
  123. retval = self.nodeNumBytesSent[whichNode] - self.nodeLastNumBytesSent[whichNode]
  124. self.nodeLastNumBytesSent[whichNode] = self.nodeNumBytesSent[whichNode]
  125. return retval
  126. def get_num_bytes_recv(self, whichNode):
  127. self.nodeLastNumBytesRecv[whichNode] = self.nodeNumBytesRecv[whichNode]
  128. return self.nodeNumBytesRecv[whichNode]
  129. def get_recent_num_bytes_recv(self, whichNode):
  130. retval = self.nodeNumBytesRecv[whichNode] - self.nodeLastNumBytesRecv[whichNode]
  131. self.nodeLastNumBytesRecv[whichNode] = self.nodeNumBytesRecv[whichNode]
  132. return retval
  133. # TODO: Add unit tests for size calculations
  134. # TODO: Add unit test to make sure finger_table_val is still firing correctly?
  135. if __name__ == "__main__":
  136. SIZE_OF_DOCUMENTS_IN_TEST = 1024
  137. NUM_NODES_PER_QUORUM_IN_TEST = 10
  138. test = RCP_Quorum(0, SIZE_OF_DOCUMENTS_IN_TEST, NUM_NODES_PER_QUORUM_IN_TEST)
  139. randomNode = 0
  140. [test.insert() for x in range(NUM_NODES_PER_QUORUM_IN_TEST)]
  141. [test.retrieve(randomNode) for x in range(NUM_NODES_PER_QUORUM_IN_TEST)]
  142. print("Insert and retrieval on node 0 fires correctly.")
  143. [test.retrieve(x) for x in range(NUM_NODES_PER_QUORUM_IN_TEST)]
  144. print("Retrieval fires on all nodes correctly.")