123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- #!/usr/bin/env python3
- from contextlib import contextmanager
- from random import Random
- from collections import defaultdict
- import numpy as np
- import resource
- import argparse
- import sys
- import os
- directory = os.path.expanduser('library')
- sys.path.insert(1, directory)
- from dht_common import generate_file, KNOWN_NODE
- from dht_simulator import DHT_Simulator
- from base_node import Base_Node
- from base_client import Base_Client
- from rcp_node import RCP_Quorum
- from rcp_client import RCP_Client
- from qp_node import QP_Quorum
- from qp_client import QP_Client
- from qplasthop_node import QPLastHop_Quorum
- from qplasthop_client import QPLastHop_Client
- from dhtpir_node import DHTPIR_Quorum
- from dhtpir_client import DHTPIR_Client
##
# Temporarily change the working directory for the duration of a `with` block.
#
# @input newDir - the directory to switch into (a leading "~" is expanded;
#                 relative paths resolve against the current working directory)
# @input makeNew - when true, create newDir (and any missing parents) if it
#                  does not exist yet
#
# The previous working directory is restored when the block exits, even if the
# body raises. If newDir cannot be entered, os.chdir raises before the body runs
# and the working directory is left unchanged.
@contextmanager
def cd(newDir, makeNew):
    prevDir = os.getcwd()
    directory = os.path.expanduser(newDir)
    if makeNew and not os.path.exists(directory):
        # exist_ok closes the window between the existence check and the
        # creation: another process creating the same directory in between
        # must not make this call fail.
        os.makedirs(directory, exist_ok=True)
    os.chdir(directory)
    try:
        yield
    finally:
        os.chdir(prevDir)
##
# Temporarily route sys.stdout into a file for the duration of a `with` block.
#
# @input newOut - path of the file that receives stdout; it is opened in 'w'
#                 mode, so any existing content is truncated
#
# On exit the file opened here is closed and sys.stdout is rebound to the stream
# that was active on entry, even if the body raises.
@contextmanager
def change_stdout(newOut):
    prevOut = sys.stdout
    # Keep our own handle to the file: closing "whatever sys.stdout currently
    # is" on exit would close the wrong stream (and leak this file) if the body
    # rebinds sys.stdout.
    with open(newOut, 'w') as redirected:
        sys.stdout = redirected
        try:
            yield
        finally:
            sys.stdout = prevOut
##
# Collapse a list of per-node samples into summary statistics.
#
# @input samples - the numeric samples gathered from the nodes
# @return [mean, 25th percentile, median, 75th percentile, standard deviation]
def _summarize(samples):
    values = np.array(samples)
    return [
        np.mean(values),
        np.percentile(values, 25),
        np.percentile(values, 50),
        np.percentile(values, 75),
        np.std(values),
    ]


##
# Read the five traffic counters of one node from the simulator.
#
# @input testbed - the simulator to query
# @input group - index of the group the node belongs to
# @input node - index of the node inside the group, or None to use the
#               group-only "*_base" getters
# @return (rounds, messages sent, messages received, bytes sent, bytes received)
def _read_node_counters(testbed, group, node):
    if node is None:
        return (
            testbed.get_num_rounds_base(group),
            testbed.get_num_messages_sent_base(group),
            testbed.get_num_messages_recv_base(group),
            testbed.get_num_bytes_sent_base(group),
            testbed.get_num_bytes_recv_base(group),
        )
    return (
        testbed.get_num_rounds(group, node),
        testbed.get_num_messages_sent(group, node),
        testbed.get_num_messages_recv(group, node),
        testbed.get_num_bytes_sent(group, node),
        testbed.get_num_bytes_recv(group, node),
    )


##
# Sample the traffic counters of every node and summarize each counter.
#
# @input testbed - the simulator to query
# @input nodeType - the node class in use; Base_Node is read once per group
#                   through the "*_base" getters, every other type once per
#                   (group, node) pair
# @input numGroups - number of groups to walk
# @input numNodes - number of nodes per group (ignored for Base_Node)
# @input perNode - optional callable invoked as perNode(group, node) right
#                  after a non-base node has been sampled
# @return (number of nodes sampled, list of five _summarize() results ordered
#          rounds, messages sent, messages received, bytes sent, bytes received)
def _sample_node_counters(testbed, nodeType, numGroups, numNodes, perNode=None):
    columns = ([], [], [], [], [])
    sampleSize = 0
    for group in range(numGroups):
        nodes = range(numNodes) if nodeType != Base_Node else (None,)
        for node in nodes:
            counters = _read_node_counters(testbed, group, node)
            for column, value in zip(columns, counters):
                column.append(value)
            sampleSize += 1
            if perNode is not None and node is not None:
                perNode(group, node)
    return sampleSize, [_summarize(column) for column in columns]


##
# Add every count of one node into the running totals.
#
# @input totals - mapping that accumulates counts across nodes (mutated)
# @input counts - mapping of key to count reported by a single node
def _merge_counts(totals, counts):
    for key, count in counts.items():
        totals[key] += count


##
# Render values as a single comma-separated line (no trailing newline).
def _csv(values):
    return ",".join(map(str, values))


##
# Render the node report: the sample size followed by one CSV line per
# counter summary, each line newline-terminated.
def _stats_report(sampleSize, summaries):
    lines = [str(sampleSize)]
    lines.extend(_csv(summary) for summary in summaries)
    return "\n".join(lines) + "\n"


##
# Render a mapping as "key,count" lines joined by newlines.
def _counts_report(counts):
    return "\n".join(_csv(item) for item in counts.items())


##
# Run one experiment: insert documents, retrieve them again, and write the
# measured statistics into files under ../outputs.
#
# @input numDocuments - number of documents to insert and then retrieve
# @input documentSize - size passed to generate_file, the simulator and the client
# @input numGroups - number of groups in the simulated DHT
# @input numNodes - number of nodes per group
# @input nodeType - node class handed to DHT_Simulator; it also selects which
#                   optional statistics are gathered and written
# @input clientType - client class instantiated against the simulator
# @input seed - seeds the document generator and names the last component of
#               the output directory, so it must be a string (may be empty)
#
# Files written to ../outputs/<nodeType>/<numGroups>/<numNodes>/<numDocuments>/<seed>:
# avg_node.out, client.out, avg_node_pub.out, client_pub.out, usage.out and, for
# the QP / QPLastHop / DHTPIR node types, client_latency.out and
# all_node_calculations.out.
def main(numDocuments, documentSize, numGroups, numNodes, nodeType, clientType, seed):
    cryptogen = Random(seed)
    testbed = DHT_Simulator(nodeType, numGroups, documentSize, numNodes)
    client = clientType(testbed, KNOWN_NODE, documentSize, numNodes)

    print("Inserting files.")
    documentIDs = []
    for _ in range(numDocuments):
        document = generate_file(documentSize, cryptogen)
        documentIDs.append(client.insert_file(document))

    # Snapshot the publish-phase numbers before any retrieval happens.
    clientPubCounters = [
        client.get_num_rounds(),
        client.get_num_messages_sent(),
        client.get_num_messages_recv(),
        client.get_num_bytes_sent(),
        client.get_num_bytes_recv(),
    ]
    numPubNodesInSample, pubSummaries = _sample_node_counters(
        testbed, nodeType, numGroups, numNodes)

    print("Retrieving files.")
    for documentID in documentIDs:
        client.retrieve_file(documentID)

    allFingerTableRangeAccesses = defaultdict(int)
    allFingerTableAccesses = defaultdict(int)
    allDatabaseAccesses = defaultdict(int)
    allPHFGenerations = defaultdict(int)
    allPIRRetrievals = defaultdict(int)

    # Gather the per-node access tallies that exist for the chosen node type.
    def tally_accesses(group, node):
        if nodeType != RCP_Quorum:
            _merge_counts(allFingerTableRangeAccesses,
                          testbed.get_finger_table_range_accesses(group, node))
            _merge_counts(allFingerTableAccesses,
                          testbed.get_finger_table_accesses(group, node))
        if nodeType == QPLastHop_Quorum:
            _merge_counts(allDatabaseAccesses,
                          testbed.get_database_accesses(group, node))
        if nodeType == DHTPIR_Quorum:
            _merge_counts(allPHFGenerations,
                          testbed.get_PHF_generations(group, node))
            _merge_counts(allPIRRetrievals,
                          testbed.get_PIR_retrievals(group, node))

    # NOTE(review): these are the same getters sampled after the publish phase;
    # whether the values include publish traffic depends on the simulator.
    numNodesInSample, summaries = _sample_node_counters(
        testbed, nodeType, numGroups, numNodes, perNode=tally_accesses)

    outputDir = '/'.join([
        '../outputs',
        nodeType.__name__,
        str(numGroups),
        str(numNodes),
        str(numDocuments),
        seed,
    ])
    with cd(outputDir, True):
        # Each report already ends with a newline and print() adds one more,
        # so the node report files end with a blank line.
        with change_stdout('avg_node.out'):
            print(_stats_report(numNodesInSample, summaries))

        with change_stdout('client.out'):
            print(_csv([
                client.get_num_rounds(),
                client.get_num_messages_sent(),
                client.get_num_messages_recv(),
                client.get_num_bytes_sent(),
                client.get_num_bytes_recv(),
            ]))

        with change_stdout('avg_node_pub.out'):
            print(_stats_report(numPubNodesInSample, pubSummaries))

        with change_stdout('client_pub.out'):
            print(_csv(clientPubCounters))

        with change_stdout('usage.out'):
            resources_log = resource.getrusage(resource.RUSAGE_SELF)
            # NOTE(review): ru_maxrss units are platform dependent; dividing by
            # 1024 yields MiB only where the kernel reports KiB - confirm.
            maxmemmib = resources_log.ru_maxrss / 1024
            print(_csv([maxmemmib, resources_log.ru_utime, resources_log.ru_stime]))

        # NOTE(review): the source this was recovered from had lost its
        # indentation; both files below are assumed to be guarded by this
        # condition, which matches the node types the tallies are gathered for.
        if nodeType in (QP_Quorum, QPLastHop_Quorum, DHTPIR_Quorum):
            with change_stdout('client_latency.out'):
                print("FT Range Accesses")
                print(_counts_report(client.get_finger_table_range_accesses()))
                print("FT Direct Accesses")
                print(_counts_report(client.get_finger_table_accesses()))
                if nodeType == QPLastHop_Quorum:
                    print("Database OT Accesses")
                    print(_counts_report(client.get_database_accesses()))
                if nodeType == DHTPIR_Quorum:
                    print("PIR Retrievals")
                    print(_counts_report(client.get_PIR_retrievals()))

            with change_stdout('all_node_calculations.out'):
                print("FT Range Accesses")
                print(_counts_report(allFingerTableRangeAccesses))
                print("FT Direct Accesses")
                print(_counts_report(allFingerTableAccesses))
                if nodeType == QPLastHop_Quorum:
                    print("Database OT Accesses")
                    print(_counts_report(allDatabaseAccesses))
                if nodeType == DHTPIR_Quorum:
                    print("PHF Generations")
                    print(_counts_report(allPHFGenerations))
                    print("PIR Retrievals")
                    print(_counts_report(allPIRRetrievals))
##
# Command-line entry point: parse the experiment parameters, pick the node and
# client implementation, and hand everything to main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Experiment harness for DHTPIR")

    # Positional integer parameters, registered in command-line order.
    positionalArguments = (
        ('numDocuments', "The number of documents in the experiment"),
        ('sizeOfDocuments', "The size of the documents in the experiment"),
        ('numGroups', "The number of groups in the experiment"),
        ('numNodes', "The number of nodes per group in the experiment (not used for Base Nodes)"),
    )
    for argumentName, argumentHelp in positionalArguments:
        parser.add_argument(argumentName, metavar=argumentName, type=int, help=argumentHelp)

    # Boolean switches selecting the node implementation.
    nodeSwitches = (
        ('-b', "Use Base Nodes in the experiment (if not set, defaults to DHTPIR Nodes)"),
        ('-r', "Use RCP Nodes in the experiment (if not set, defaults to DHTPIR Nodes)"),
        ('-q', "Use QP Nodes in the experiment (if not set, defaults to DHTPIR Nodes)"),
        ('-l', "Use QP Nodes with last hop OT in the experiment (if not set, defaults to DHTPIR Nodes)"),
        ('-d', "Use DHTPIR Nodes in the experiment (if not set, defaults to DHTPIR Nodes)"),
    )
    for switchName, switchHelp in nodeSwitches:
        parser.add_argument(switchName, action='store_true', help=switchHelp)

    parser.add_argument('--seed', help="Set the seed for the file generation in this run.")
    args = parser.parse_args()

    # Anything below four nodes per group is raised to four.
    numNodes = max(args.numNodes, 4)
    numGroups = args.numGroups

    # When several switches are given, the first match in this order wins;
    # with no switch at all the DHTPIR implementation is used.
    implementations = (
        (args.d, DHTPIR_Quorum, DHTPIR_Client),
        (args.l, QPLastHop_Quorum, QPLastHop_Client),
        (args.q, QP_Quorum, QP_Client),
        (args.r, RCP_Quorum, RCP_Client),
        (args.b, Base_Node, Base_Client),
    )
    nodeType, clientType = DHTPIR_Quorum, DHTPIR_Client
    for isSelected, candidateNode, candidateClient in implementations:
        if isSelected:
            nodeType, clientType = candidateNode, candidateClient
            break

    if nodeType is Base_Node:
        # Base nodes are not grouped: keep the total node count by turning
        # every node into its own group of one.
        numGroups *= numNodes
        numNodes = 1

    # A missing (or empty) seed becomes the empty string.
    seed = args.seed or ""

    main(args.numDocuments, args.sizeOfDocuments, numGroups, numNodes, nodeType, clientType, seed)
|