test_harness.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #!/usr/bin/env python3
  2. from contextlib import contextmanager
  3. from random import Random
  4. from collections import defaultdict
  5. import numpy as np
  6. import resource
  7. import argparse
  8. import sys
  9. import os
  10. directory = os.path.expanduser('library')
  11. sys.path.insert(1, directory)
  12. from dht_common import generate_file, KNOWN_NODE
  13. from dht_simulator import DHT_Simulator
  14. from base_node import Base_Node
  15. from base_client import Base_Client
  16. from rcp_node import RCP_Quorum
  17. from rcp_client import RCP_Client
  18. from qp_node import QP_Quorum
  19. from qp_client import QP_Client
  20. from qplasthop_node import QPLastHop_Quorum
  21. from qplasthop_client import QPLastHop_Client
  22. from dhtpir_node import DHTPIR_Quorum
  23. from dhtpir_client import DHTPIR_Client
  24. ##
  25. # This functionality allows us to temporarily change our working directory
  26. #
  27. # @input newdir - the new directory (relative to our current position) we want to be in
  28. @contextmanager
  29. def cd(newDir, makeNew):
  30. prevDir = os.getcwd()
  31. directory = os.path.expanduser(newDir)
  32. if not os.path.exists(directory) and makeNew:
  33. os.makedirs(directory)
  34. os.chdir(directory)
  35. try:
  36. yield
  37. finally:
  38. os.chdir(prevDir)
  39. ##
  40. # This functionality allows us to temporarily change where stdout routes
  41. #
  42. # @input new_out - the file that stdout will get routed to temporarily
  43. @contextmanager
  44. def change_stdout(newOut):
  45. prevOut = sys.stdout
  46. sys.stdout = open(newOut, 'w')
  47. try:
  48. yield
  49. finally:
  50. sys.stdout.close()
  51. sys.stdout = prevOut
  52. def main(numDocuments, documentSize, numGroups, numNodes, nodeType, clientType, seed):
  53. cryptogen = Random(seed)
  54. testbed = DHT_Simulator(nodeType, numGroups, documentSize, numNodes)
  55. client = clientType(testbed, KNOWN_NODE, documentSize, numNodes)
  56. documentIDs = []
  57. print("Inserting files.")
  58. for i in range(numDocuments):
  59. document = generate_file(documentSize, cryptogen)
  60. documentIDs.append(client.insert_file(document))
  61. clientPubRounds = client.get_num_rounds()
  62. clientPubMessagesSent = client.get_num_messages_sent()
  63. clientPubMessagesRecv = client.get_num_messages_recv()
  64. clientPubBytesSent = client.get_num_bytes_sent()
  65. clientPubBytesRecv = client.get_num_bytes_recv()
  66. numPubRounds = []
  67. numPubMessagesSent = []
  68. numPubMessagesRecv = []
  69. numPubBytesSent = []
  70. numPubBytesRecv = []
  71. numPubNodesInSample = 0
  72. for i in range(numGroups):
  73. if nodeType != Base_Node:
  74. for j in range(numNodes):
  75. currNumRounds = testbed.get_num_rounds(i, j)
  76. currNumMessagesSent = testbed.get_num_messages_sent(i, j)
  77. currNumMessagesRecv = testbed.get_num_messages_recv(i, j)
  78. currNumBytesSent = testbed.get_num_bytes_sent(i, j)
  79. currNumBytesRecv = testbed.get_num_bytes_recv(i, j)
  80. numPubRounds.append(currNumRounds)
  81. numPubMessagesSent.append(currNumMessagesSent)
  82. numPubMessagesRecv.append(currNumMessagesRecv)
  83. numPubBytesSent.append(currNumBytesSent)
  84. numPubBytesRecv.append(currNumBytesRecv)
  85. numPubNodesInSample += 1
  86. else:
  87. currNumRounds = testbed.get_num_rounds_base(i)
  88. currNumMessagesSent = testbed.get_num_messages_sent_base(i)
  89. currNumMessagesRecv = testbed.get_num_messages_recv_base(i)
  90. currNumBytesSent = testbed.get_num_bytes_sent_base(i)
  91. currNumBytesRecv = testbed.get_num_bytes_recv_base(i)
  92. numPubRounds.append(currNumRounds)
  93. numPubMessagesSent.append(currNumMessagesSent)
  94. numPubMessagesRecv.append(currNumMessagesRecv)
  95. numPubBytesSent.append(currNumBytesSent)
  96. numPubBytesRecv.append(currNumBytesRecv)
  97. numPubNodesInSample += 1
  98. numPubRounds = np.array(numPubRounds)
  99. numPubMessagesSent = np.array(numPubMessagesSent)
  100. numPubMessagesRecv = np.array(numPubMessagesRecv)
  101. numPubBytesSent = np.array(numPubBytesSent)
  102. numPubBytesRecv = np.array(numPubBytesRecv)
  103. numPubRounds = [np.mean(numPubRounds), np.percentile(numPubRounds, 25), np.percentile(numPubRounds, 50), np.percentile(numPubRounds, 75), np.std(numPubRounds)]
  104. numPubMessagesSent = [np.mean(numPubMessagesSent), np.percentile(numPubMessagesSent, 25), np.percentile(numPubMessagesSent, 50), np.percentile(numPubMessagesSent, 75), np.std(numPubMessagesSent)]
  105. numPubMessagesRecv = [np.mean(numPubMessagesRecv), np.percentile(numPubMessagesRecv, 25), np.percentile(numPubMessagesRecv, 50), np.percentile(numPubMessagesRecv, 75), np.std(numPubMessagesRecv)]
  106. numPubBytesSent = [np.mean(numPubBytesSent), np.percentile(numPubBytesSent, 25), np.percentile(numPubBytesSent, 50), np.percentile(numPubBytesSent, 75), np.std(numPubBytesSent)]
  107. numPubBytesRecv = [np.mean(numPubBytesRecv), np.percentile(numPubBytesRecv, 25), np.percentile(numPubBytesRecv, 50), np.percentile(numPubBytesRecv, 75), np.std(numPubBytesRecv)]
  108. print("Retrieving files.")
  109. for i in range(numDocuments):
  110. client.retrieve_file(documentIDs[i])
  111. numRounds = []
  112. numMessagesSent = []
  113. numMessagesRecv = []
  114. numBytesSent = []
  115. numBytesRecv = []
  116. numNodesInSample = 0
  117. allFingerTableRangeAccesses = defaultdict(lambda: 0)
  118. allFingerTableAccesses = defaultdict(lambda: 0)
  119. allDatabaseAccesses = defaultdict(lambda: 0)
  120. allPHFGenerations = defaultdict(lambda: 0)
  121. allPIRRetrievals = defaultdict(lambda: 0)
  122. for i in range(numGroups):
  123. if nodeType != Base_Node:
  124. for j in range(numNodes):
  125. currNumRounds = testbed.get_num_rounds(i, j)
  126. currNumMessagesSent = testbed.get_num_messages_sent(i, j)
  127. currNumMessagesRecv = testbed.get_num_messages_recv(i, j)
  128. currNumBytesSent = testbed.get_num_bytes_sent(i, j)
  129. currNumBytesRecv = testbed.get_num_bytes_recv(i, j)
  130. numRounds.append(currNumRounds)
  131. numMessagesSent.append(currNumMessagesSent)
  132. numMessagesRecv.append(currNumMessagesRecv)
  133. numBytesSent.append(currNumBytesSent)
  134. numBytesRecv.append(currNumBytesRecv)
  135. numNodesInSample += 1
  136. if nodeType != RCP_Quorum:
  137. currFingerTableRangeAccesses = testbed.get_finger_table_range_accesses(i, j)
  138. for currKey in currFingerTableRangeAccesses.keys():
  139. allFingerTableRangeAccesses[currKey] += currFingerTableRangeAccesses[currKey]
  140. currFingerTableAccesses = testbed.get_finger_table_accesses(i, j)
  141. for currKey in currFingerTableAccesses.keys():
  142. allFingerTableAccesses[currKey] += currFingerTableAccesses[currKey]
  143. if nodeType == QPLastHop_Quorum:
  144. currDatabaseAccesses = testbed.get_database_accesses(i, j)
  145. for currKey in currDatabaseAccesses.keys():
  146. allDatabaseAccesses[currKey] += currDatabaseAccesses[currKey]
  147. if nodeType == DHTPIR_Quorum:
  148. currPHFGenerations = testbed.get_PHF_generations(i, j)
  149. for currKey in currPHFGenerations.keys():
  150. allPHFGenerations[currKey] += currPHFGenerations[currKey]
  151. currPIRRetrievals = testbed.get_PIR_retrievals(i, j)
  152. for currKey in currPIRRetrievals.keys():
  153. allPIRRetrievals[currKey] += currPIRRetrievals[currKey]
  154. else:
  155. currNumRounds = testbed.get_num_rounds_base(i)
  156. currNumMessagesSent = testbed.get_num_messages_sent_base(i)
  157. currNumMessagesRecv = testbed.get_num_messages_recv_base(i)
  158. currNumBytesSent = testbed.get_num_bytes_sent_base(i)
  159. currNumBytesRecv = testbed.get_num_bytes_recv_base(i)
  160. numRounds.append(currNumRounds)
  161. numMessagesSent.append(currNumMessagesSent)
  162. numMessagesRecv.append(currNumMessagesRecv)
  163. numBytesSent.append(currNumBytesSent)
  164. numBytesRecv.append(currNumBytesRecv)
  165. numNodesInSample += 1
  166. numRounds = np.array(numRounds)
  167. numMessagesSent = np.array(numMessagesSent)
  168. numMessagesRecv = np.array(numMessagesRecv)
  169. numBytesSent = np.array(numBytesSent)
  170. numBytesRecv = np.array(numBytesRecv)
  171. numRounds = [np.mean(numRounds), np.percentile(numRounds, 25), np.percentile(numRounds, 50), np.percentile(numRounds, 75), np.std(numRounds)]
  172. numMessagesSent = [np.mean(numMessagesSent), np.percentile(numMessagesSent, 25), np.percentile(numMessagesSent, 50), np.percentile(numMessagesSent, 75), np.std(numMessagesSent)]
  173. numMessagesRecv = [np.mean(numMessagesRecv), np.percentile(numMessagesRecv, 25), np.percentile(numMessagesRecv, 50), np.percentile(numMessagesRecv, 75), np.std(numMessagesRecv)]
  174. numBytesSent = [np.mean(numBytesSent), np.percentile(numBytesSent, 25), np.percentile(numBytesSent, 50), np.percentile(numBytesSent, 75), np.std(numBytesSent)]
  175. numBytesRecv = [np.mean(numBytesRecv), np.percentile(numBytesRecv, 25), np.percentile(numBytesRecv, 50), np.percentile(numBytesRecv, 75), np.std(numBytesRecv)]
  176. with cd('../outputs/' + nodeType.__name__ + '/' + str(numGroups) + '/' + str(numNodes) + '/' + str(numDocuments) + '/' + seed, True):
  177. with change_stdout('avg_node.out'):
  178. output = str(numNodesInSample) + "\n"
  179. output += ",".join(map(lambda x: str(x), numRounds))
  180. output += "\n"
  181. output += ",".join(map(lambda x: str(x), numMessagesSent))
  182. output += "\n"
  183. output += ",".join(map(lambda x: str(x), numMessagesRecv))
  184. output += "\n"
  185. output += ",".join(map(lambda x: str(x), numBytesSent))
  186. output += "\n"
  187. output += ",".join(map(lambda x: str(x), numBytesRecv))
  188. output += "\n"
  189. print(output)
  190. with change_stdout('client.out'):
  191. currNumRounds = client.get_num_rounds()
  192. currNumMessagesSent = client.get_num_messages_sent()
  193. currNumMessagesRecv = client.get_num_messages_recv()
  194. currNumBytesSent = client.get_num_bytes_sent()
  195. currNumBytesRecv = client.get_num_bytes_recv()
  196. output = ",".join(map(lambda x: str(x), [currNumRounds, currNumMessagesSent, currNumMessagesRecv, currNumBytesSent, currNumBytesRecv]))
  197. print(output)
  198. with change_stdout('avg_node_pub.out'):
  199. output = str(numPubNodesInSample) + "\n"
  200. output += ",".join(map(lambda x: str(x), numPubRounds))
  201. output += "\n"
  202. output += ",".join(map(lambda x: str(x), numPubMessagesSent))
  203. output += "\n"
  204. output += ",".join(map(lambda x: str(x), numPubMessagesRecv))
  205. output += "\n"
  206. output += ",".join(map(lambda x: str(x), numPubBytesSent))
  207. output += "\n"
  208. output += ",".join(map(lambda x: str(x), numPubBytesRecv))
  209. output += "\n"
  210. print(output)
  211. with change_stdout('client_pub.out'):
  212. output = ",".join(map(lambda x: str(x), [clientPubRounds, clientPubMessagesSent, clientPubMessagesRecv, clientPubBytesSent, clientPubBytesRecv]))
  213. print(output)
  214. with change_stdout('usage.out'):
  215. resources_log = resource.getrusage(resource.RUSAGE_SELF)
  216. maxmemmib = resources_log.ru_maxrss/1024
  217. usertime = resources_log.ru_utime
  218. systime = resources_log.ru_stime
  219. output = ",".join(map(lambda x: str(x), [maxmemmib, usertime, systime]))
  220. print(output)
  221. if nodeType == QP_Quorum or nodeType == QPLastHop_Quorum or nodeType == DHTPIR_Quorum:
  222. with change_stdout('client_latency.out'):
  223. print("FT Range Accesses")
  224. currFingerTableRangeAccesses = client.get_finger_table_range_accesses()
  225. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currFingerTableRangeAccesses.items())))
  226. print("FT Direct Accesses")
  227. currFingerTableAccesses = client.get_finger_table_accesses()
  228. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currFingerTableAccesses.items())))
  229. if nodeType == QPLastHop_Quorum:
  230. print("Database OT Accesses")
  231. currDatabaseAccesses = client.get_database_accesses()
  232. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currDatabaseAccesses.items())))
  233. if nodeType == DHTPIR_Quorum:
  234. print("PIR Retrievals")
  235. currPIRRetrievals = client.get_PIR_retrievals()
  236. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currPIRRetrievals.items())))
  237. with change_stdout('all_node_calculations.out'):
  238. print("FT Range Accesses")
  239. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allFingerTableRangeAccesses.items())))
  240. print("FT Direct Accesses")
  241. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allFingerTableAccesses.items())))
  242. if nodeType == QPLastHop_Quorum:
  243. print("Database OT Accesses")
  244. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allDatabaseAccesses.items())))
  245. if nodeType == DHTPIR_Quorum:
  246. print("PHF Generations")
  247. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allPHFGenerations.items())))
  248. print("PIR Retrievals")
  249. print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allPIRRetrievals.items())))
  250. if __name__ == "__main__":
  251. parser = argparse.ArgumentParser(description="Experiment harness for DHTPIR")
  252. parser.add_argument('numDocuments', metavar="numDocuments", type=int, help="The number of documents in the experiment")
  253. parser.add_argument('sizeOfDocuments', metavar="sizeOfDocuments", type=int, help="The size of the documents in the experiment")
  254. parser.add_argument('numGroups', metavar="numGroups", type=int, help="The number of groups in the experiment")
  255. parser.add_argument('numNodes', metavar="numNodes", type=int, help="The number of nodes per group in the experiment (not used for Base Nodes)")
  256. parser.add_argument('-b', action='store_true', help="Use Base Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
  257. parser.add_argument('-r', action='store_true', help="Use RCP Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
  258. parser.add_argument('-q', action='store_true', help="Use QP Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
  259. parser.add_argument('-l', action='store_true', help="Use QP Nodes with last hop OT in the experiment (if not set, defaults to DHTPIR Nodes)")
  260. parser.add_argument('-d', action='store_true', help="Use DHTPIR Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
  261. parser.add_argument('--seed', help="Set the seed for the file generation in this run.")
  262. args = parser.parse_args()
  263. numNodes = 4
  264. if args.numNodes >= 4:
  265. numNodes = args.numNodes
  266. numGroups = args.numGroups
  267. if args.d:
  268. nodeType = DHTPIR_Quorum
  269. clientType = DHTPIR_Client
  270. elif args.l:
  271. nodeType = QPLastHop_Quorum
  272. clientType = QPLastHop_Client
  273. elif args.q:
  274. nodeType = QP_Quorum
  275. clientType = QP_Client
  276. elif args.r:
  277. nodeType = RCP_Quorum
  278. clientType = RCP_Client
  279. elif args.b:
  280. nodeType = Base_Node
  281. clientType = Base_Client
  282. numGroups *= numNodes
  283. numNodes = 1
  284. else:
  285. nodeType = DHTPIR_Quorum
  286. clientType = DHTPIR_Client
  287. seed = ""
  288. if args.seed:
  289. seed = args.seed
  290. main(args.numDocuments, args.sizeOfDocuments, numGroups, numNodes, nodeType, clientType, seed)