#!/usr/bin/env python3
from contextlib import contextmanager
from random import Random
from collections import defaultdict

import numpy as np

import resource
import argparse
import sys
import os

directory = os.path.expanduser('library')
sys.path.insert(1, directory)

from dht_common import generate_file, KNOWN_NODE
from dht_simulator import DHT_Simulator
from base_node import Base_Node
from base_client import Base_Client
from rcp_node import RCP_Quorum
from rcp_client import RCP_Client
from qp_node import QP_Quorum
from qp_client import QP_Client
from qplasthop_node import QPLastHop_Quorum
from qplasthop_client import QPLastHop_Client
from dhtpir_node import DHTPIR_Quorum
from dhtpir_client import DHTPIR_Client

##
# Context manager that temporarily changes the working directory.
#
# @input newDir - the directory (relative to the current position) to change into
# @input makeNew - whether to create the directory if it does not already exist
@contextmanager
def cd(newDir, makeNew):
    prevDir = os.getcwd()
    directory = os.path.expanduser(newDir)
    if not os.path.exists(directory) and makeNew:
        os.makedirs(directory)
    os.chdir(directory)
    try:
        yield
    finally:
        os.chdir(prevDir)

##
# Context manager that temporarily redirects stdout to a file.
#
# @input newOut - the file that stdout will be routed to temporarily
@contextmanager
def change_stdout(newOut):
    prevOut = sys.stdout
    sys.stdout = open(newOut, 'w')
    try:
        yield
    finally:
        sys.stdout.close()
        sys.stdout = prevOut

def main(numDocuments, documentSize, numGroups, numNodes, nodeType, clientType, seed):
    cryptogen = Random(seed)

    testbed = DHT_Simulator(nodeType, numGroups, documentSize, numNodes)
    client = clientType(testbed, KNOWN_NODE, documentSize, numNodes)
    documentIDs = []

    print("Inserting files.")
    for i in range(numDocuments):
        document = generate_file(documentSize, cryptogen)
        documentIDs.append(client.insert_file(document))

    # Client-side counters after the publish (insert) phase
    clientPubRounds = client.get_num_rounds()
    clientPubMessagesSent = client.get_num_messages_sent()
    clientPubMessagesRecv = client.get_num_messages_recv()
    clientPubBytesSent = client.get_num_bytes_sent()
    clientPubBytesRecv = client.get_num_bytes_recv()

    # Per-node counters after the publish phase
    numPubRounds = []
    numPubMessagesSent = []
    numPubMessagesRecv = []
    numPubBytesSent = []
    numPubBytesRecv = []
    numPubNodesInSample = 0

    for i in range(numGroups):
        if nodeType != Base_Node:
            for j in range(numNodes):
                currNumRounds = testbed.get_num_rounds(i, j)
                currNumMessagesSent = testbed.get_num_messages_sent(i, j)
                currNumMessagesRecv = testbed.get_num_messages_recv(i, j)
                currNumBytesSent = testbed.get_num_bytes_sent(i, j)
                currNumBytesRecv = testbed.get_num_bytes_recv(i, j)

                numPubRounds.append(currNumRounds)
                numPubMessagesSent.append(currNumMessagesSent)
                numPubMessagesRecv.append(currNumMessagesRecv)
                numPubBytesSent.append(currNumBytesSent)
                numPubBytesRecv.append(currNumBytesRecv)
                numPubNodesInSample += 1
        else:
            currNumRounds = testbed.get_num_rounds_base(i)
            currNumMessagesSent = testbed.get_num_messages_sent_base(i)
            currNumMessagesRecv = testbed.get_num_messages_recv_base(i)
            currNumBytesSent = testbed.get_num_bytes_sent_base(i)
            currNumBytesRecv = testbed.get_num_bytes_recv_base(i)

            numPubRounds.append(currNumRounds)
            numPubMessagesSent.append(currNumMessagesSent)
            numPubMessagesRecv.append(currNumMessagesRecv)
            numPubBytesSent.append(currNumBytesSent)
            numPubBytesRecv.append(currNumBytesRecv)
            numPubNodesInSample += 1

    numPubRounds = np.array(numPubRounds)
    numPubMessagesSent = np.array(numPubMessagesSent)
    numPubMessagesRecv = np.array(numPubMessagesRecv)
    numPubBytesSent = np.array(numPubBytesSent)
    numPubBytesRecv = np.array(numPubBytesRecv)
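    # Each per-node publish-phase metric below is summarized as a five-element list:
    # [mean, 25th percentile, median, 75th percentile, standard deviation].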
    numPubRounds = [np.mean(numPubRounds), np.percentile(numPubRounds, 25), np.percentile(numPubRounds, 50), np.percentile(numPubRounds, 75), np.std(numPubRounds)]
    numPubMessagesSent = [np.mean(numPubMessagesSent), np.percentile(numPubMessagesSent, 25), np.percentile(numPubMessagesSent, 50), np.percentile(numPubMessagesSent, 75), np.std(numPubMessagesSent)]
    numPubMessagesRecv = [np.mean(numPubMessagesRecv), np.percentile(numPubMessagesRecv, 25), np.percentile(numPubMessagesRecv, 50), np.percentile(numPubMessagesRecv, 75), np.std(numPubMessagesRecv)]
    numPubBytesSent = [np.mean(numPubBytesSent), np.percentile(numPubBytesSent, 25), np.percentile(numPubBytesSent, 50), np.percentile(numPubBytesSent, 75), np.std(numPubBytesSent)]
    numPubBytesRecv = [np.mean(numPubBytesRecv), np.percentile(numPubBytesRecv, 25), np.percentile(numPubBytesRecv, 50), np.percentile(numPubBytesRecv, 75), np.std(numPubBytesRecv)]

    print("Retrieving files.")
    for i in range(numDocuments):
        client.retrieve_file(documentIDs[i])

    # Per-node counters after the retrieval phase
    numRounds = []
    numMessagesSent = []
    numMessagesRecv = []
    numBytesSent = []
    numBytesRecv = []
    numNodesInSample = 0

    allFingerTableRangeAccesses = defaultdict(lambda: 0)
    allFingerTableAccesses = defaultdict(lambda: 0)
    allDatabaseAccesses = defaultdict(lambda: 0)
    allPHFGenerations = defaultdict(lambda: 0)
    allPIRRetrievals = defaultdict(lambda: 0)

    for i in range(numGroups):
        if nodeType != Base_Node:
            for j in range(numNodes):
                currNumRounds = testbed.get_num_rounds(i, j)
                currNumMessagesSent = testbed.get_num_messages_sent(i, j)
                currNumMessagesRecv = testbed.get_num_messages_recv(i, j)
                currNumBytesSent = testbed.get_num_bytes_sent(i, j)
                currNumBytesRecv = testbed.get_num_bytes_recv(i, j)

                numRounds.append(currNumRounds)
                numMessagesSent.append(currNumMessagesSent)
                numMessagesRecv.append(currNumMessagesRecv)
                numBytesSent.append(currNumBytesSent)
                numBytesRecv.append(currNumBytesRecv)
                numNodesInSample += 1

                if nodeType != RCP_Quorum:
                    currFingerTableRangeAccesses = testbed.get_finger_table_range_accesses(i, j)
                    for currKey in currFingerTableRangeAccesses.keys():
                        allFingerTableRangeAccesses[currKey] += currFingerTableRangeAccesses[currKey]

                    currFingerTableAccesses = testbed.get_finger_table_accesses(i, j)
                    for currKey in currFingerTableAccesses.keys():
                        allFingerTableAccesses[currKey] += currFingerTableAccesses[currKey]

                    if nodeType == QPLastHop_Quorum:
                        currDatabaseAccesses = testbed.get_database_accesses(i, j)
                        for currKey in currDatabaseAccesses.keys():
                            allDatabaseAccesses[currKey] += currDatabaseAccesses[currKey]

                    if nodeType == DHTPIR_Quorum:
                        currPHFGenerations = testbed.get_PHF_generations(i, j)
                        for currKey in currPHFGenerations.keys():
                            allPHFGenerations[currKey] += currPHFGenerations[currKey]

                        currPIRRetrievals = testbed.get_PIR_retrievals(i, j)
                        for currKey in currPIRRetrievals.keys():
                            allPIRRetrievals[currKey] += currPIRRetrievals[currKey]
        else:
            currNumRounds = testbed.get_num_rounds_base(i)
            currNumMessagesSent = testbed.get_num_messages_sent_base(i)
            currNumMessagesRecv = testbed.get_num_messages_recv_base(i)
            currNumBytesSent = testbed.get_num_bytes_sent_base(i)
            currNumBytesRecv = testbed.get_num_bytes_recv_base(i)

            numRounds.append(currNumRounds)
            numMessagesSent.append(currNumMessagesSent)
            numMessagesRecv.append(currNumMessagesRecv)
            numBytesSent.append(currNumBytesSent)
            numBytesRecv.append(currNumBytesRecv)
            numNodesInSample += 1

    numRounds = np.array(numRounds)
    numMessagesSent = np.array(numMessagesSent)
    numMessagesRecv = np.array(numMessagesRecv)
    numBytesSent = np.array(numBytesSent)
    numBytesRecv = np.array(numBytesRecv)
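    # Summarize the retrieval-phase per-node counters in the same
    # [mean, p25, p50, p75, std] format used for the publish phase above.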
    numRounds = [np.mean(numRounds), np.percentile(numRounds, 25), np.percentile(numRounds, 50), np.percentile(numRounds, 75), np.std(numRounds)]
    numMessagesSent = [np.mean(numMessagesSent), np.percentile(numMessagesSent, 25), np.percentile(numMessagesSent, 50), np.percentile(numMessagesSent, 75), np.std(numMessagesSent)]
    numMessagesRecv = [np.mean(numMessagesRecv), np.percentile(numMessagesRecv, 25), np.percentile(numMessagesRecv, 50), np.percentile(numMessagesRecv, 75), np.std(numMessagesRecv)]
    numBytesSent = [np.mean(numBytesSent), np.percentile(numBytesSent, 25), np.percentile(numBytesSent, 50), np.percentile(numBytesSent, 75), np.std(numBytesSent)]
    numBytesRecv = [np.mean(numBytesRecv), np.percentile(numBytesRecv, 25), np.percentile(numBytesRecv, 50), np.percentile(numBytesRecv, 75), np.std(numBytesRecv)]

    with cd('../outputs/' + nodeType.__name__ + '/' + str(numGroups) + '/' + str(numNodes) + '/' + str(numDocuments) + '/' + seed, True):
        with change_stdout('avg_node.out'):
            output = str(numNodesInSample) + "\n"
            output += ",".join(map(lambda x: str(x), numRounds))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numMessagesSent))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numMessagesRecv))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numBytesSent))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numBytesRecv))
            output += "\n"
            print(output)

        with change_stdout('client.out'):
            currNumRounds = client.get_num_rounds()
            currNumMessagesSent = client.get_num_messages_sent()
            currNumMessagesRecv = client.get_num_messages_recv()
            currNumBytesSent = client.get_num_bytes_sent()
            currNumBytesRecv = client.get_num_bytes_recv()

            output = ",".join(map(lambda x: str(x), [currNumRounds, currNumMessagesSent, currNumMessagesRecv, currNumBytesSent, currNumBytesRecv]))
            print(output)

        with change_stdout('avg_node_pub.out'):
            output = str(numPubNodesInSample) + "\n"
            output += ",".join(map(lambda x: str(x), numPubRounds))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numPubMessagesSent))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numPubMessagesRecv))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numPubBytesSent))
            output += "\n"
            output += ",".join(map(lambda x: str(x), numPubBytesRecv))
            output += "\n"
            print(output)

        with change_stdout('client_pub.out'):
            output = ",".join(map(lambda x: str(x), [clientPubRounds, clientPubMessagesSent, clientPubMessagesRecv, clientPubBytesSent, clientPubBytesRecv]))
            print(output)

        with change_stdout('usage.out'):
            resources_log = resource.getrusage(resource.RUSAGE_SELF)
            maxmemmib = resources_log.ru_maxrss / 1024
            usertime = resources_log.ru_utime
            systime = resources_log.ru_stime

            output = ",".join(map(lambda x: str(x), [maxmemmib, usertime, systime]))
            print(output)

        if nodeType == QP_Quorum or nodeType == QPLastHop_Quorum or nodeType == DHTPIR_Quorum:
            with change_stdout('client_latency.out'):
                print("FT Range Accesses")
                currFingerTableRangeAccesses = client.get_finger_table_range_accesses()
                print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currFingerTableRangeAccesses.items())))

                print("FT Direct Accesses")
                currFingerTableAccesses = client.get_finger_table_accesses()
                print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currFingerTableAccesses.items())))

                if nodeType == QPLastHop_Quorum:
                    print("Database OT Accesses")
                    currDatabaseAccesses = client.get_database_accesses()
                    print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currDatabaseAccesses.items())))

                if nodeType == DHTPIR_Quorum:
                    print("PIR Retrievals")
                    currPIRRetrievals = client.get_PIR_retrievals()
                    print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), currPIRRetrievals.items())))

            with change_stdout('all_node_calculations.out'):
                print("FT Range Accesses")
                print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allFingerTableRangeAccesses.items())))

                print("FT Direct Accesses")
                print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allFingerTableAccesses.items())))

                if nodeType == QPLastHop_Quorum:
                    print("Database OT Accesses")
                    print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allDatabaseAccesses.items())))

                if nodeType == DHTPIR_Quorum:
                    print("PHF Generations")
                    print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allPHFGenerations.items())))

                    print("PIR Retrievals")
                    print("\n".join(map(lambda x: str(x[0]) + "," + str(x[1]), allPIRRetrievals.items())))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Experiment harness for DHTPIR")
    parser.add_argument('numDocuments', metavar="numDocuments", type=int, help="The number of documents in the experiment")
    parser.add_argument('sizeOfDocuments', metavar="sizeOfDocuments", type=int, help="The size of the documents in the experiment")
    parser.add_argument('numGroups', metavar="numGroups", type=int, help="The number of groups in the experiment")
    parser.add_argument('numNodes', metavar="numNodes", type=int, help="The number of nodes per group in the experiment (not used for Base Nodes)")
    parser.add_argument('-b', action='store_true', help="Use Base Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
    parser.add_argument('-r', action='store_true', help="Use RCP Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
    parser.add_argument('-q', action='store_true', help="Use QP Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
    parser.add_argument('-l', action='store_true', help="Use QP Nodes with last hop OT in the experiment (if not set, defaults to DHTPIR Nodes)")
    parser.add_argument('-d', action='store_true', help="Use DHTPIR Nodes in the experiment (if not set, defaults to DHTPIR Nodes)")
    parser.add_argument('--seed', help="Set the seed for the file generation in this run.")

    args = parser.parse_args()

    numNodes = 4
    if args.numNodes >= 4:
        numNodes = args.numNodes
    numGroups = args.numGroups

    if args.d:
        nodeType = DHTPIR_Quorum
        clientType = DHTPIR_Client
    elif args.l:
        nodeType = QPLastHop_Quorum
        clientType = QPLastHop_Client
    elif args.q:
        nodeType = QP_Quorum
        clientType = QP_Client
    elif args.r:
        nodeType = RCP_Quorum
        clientType = RCP_Client
    elif args.b:
        nodeType = Base_Node
        clientType = Base_Client
        numGroups *= numNodes
        numNodes = 1
    else:
        nodeType = DHTPIR_Quorum
        clientType = DHTPIR_Client

    seed = ""
    if args.seed:
        seed = args.seed

    main(args.numDocuments, args.sizeOfDocuments, numGroups, numNodes, nodeType, clientType, seed)
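# Example invocation (the script filename here is illustrative, not prescribed by the repo):
#   python3 run_experiment.py 100 1024 10 4 -d --seed myseed
# runs a DHTPIR experiment with 100 documents of 1024 bytes each across 10 quorums
# of 4 nodes, writing its results under ../outputs/DHTPIR_Quorum/10/4/100/myseed/
# relative to the working directory.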