#include #include #include #include #include #include #include #include #include #include "networkOrchestrator.hpp" /*************************************************** ********* ********* ********* orchestrator public functions ********* ********* ********* ***************************************************/ /* * START UP AND SHUT DOWN INSTANCES */ void start_remote_actor( const std::string& target, bool server, const std::string& id, const std::string& output, size_t lambda, bool maliciousServers) { const char* sshFile = "/usr/bin/ssh"; const char* serverFile = "bin/server"; const char* clientFile = "bin/client"; const char* calledFile = (target != "self" && !target.empty() ? sshFile : (server ? serverFile : clientFile)); char *argv[6]; char fileBuffer[13]; strncpy(fileBuffer, calledFile, 13); argv[0] = fileBuffer; char flagBuffer[3]; strncpy(flagBuffer, "-n", 3); char targetBuffer[64]; strncpy(targetBuffer, target.c_str(), 64); std::string fullArgString("~/prsona/prsona/scripts/startup.sh "); char fullArgBuffer[256]; fullArgString = fullArgString + (server ? "server" : "client") + " "; char idBuffer[64]; strncpy(idBuffer, id.c_str(), 64); fullArgString = fullArgString + id + " "; char outputBuffer[128]; strncpy(outputBuffer, output.c_str(), 128); fullArgString = fullArgString + output + " "; std::stringstream lambdaStream; lambdaStream << lambda; char lambdaBuffer[3]; strncpy(lambdaBuffer, lambdaStream.str().c_str(), 3); fullArgString = fullArgString + lambdaStream.str() + " "; char maliciousBuffer[3]; if (maliciousServers) strncpy(maliciousBuffer, "T", 2); else strncpy(maliciousBuffer, "F", 2); fullArgString = fullArgString + (maliciousServers ? "T" : "F"); strncpy(fullArgBuffer, fullArgString.c_str(), 256); if (target != "self" && !target.empty()) { argv[1] = flagBuffer; argv[2] = targetBuffer; argv[3] = fullArgBuffer; argv[4] = NULL; } else { argv[1] = idBuffer; argv[2] = outputBuffer; argv[3] = lambdaBuffer; argv[4] = maliciousBuffer; argv[5] = NULL; } int pid = fork(); if (pid < 0) exit(1); if (pid == 0) execv(calledFile, argv); } void shut_down_remote_actors( const std::vector& relevantIPs, const std::vector& relevantPorts) { for (size_t i = 0; i < relevantIPs.size(); i++) { // Shut downs are triggered by a GET request to the correct location std::stringstream sysString; std::string data; sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n"; sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n"; data = sysString.str(); struct mg_connection *conn = NULL; // Connect to the instance while (!conn) { conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0); if (!conn) std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl; } // Make correct GET request mg_write(conn, data.c_str(), data.length()); // Close connection mg_close_connection(conn); } } /* * SYNCHRONIZATION */ void wait_for_servers_ready( std::string dealer, int dealerPort) { // Requesting information about servers being ready is done via a GET request std::stringstream sysString; std::string data; sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n"; sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n"; data = sysString.str(); bool ready = false; while (!ready) { struct mg_connection *conn = NULL; // Connect to the dealer while (!conn) { conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0); if (!conn) { std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl; std::this_thread::sleep_for(HALF_SECOND); } } // Make the correct GET request mg_write(conn, data.c_str(), data.length()); // Wait for a response mg_get_response(conn, NULL, 0, 250); const struct mg_response_info *info = mg_get_response_info(conn); // Close connection mg_close_connection(conn); // If the dealer says it's ready, then we can move on if (info->status_code == 200) ready = true; } } void wait_for_clients_created( std::string dealer, int dealerPort, size_t numClients) { bool ready = false; while (!ready) { struct synchronization_tool sync; struct mg_connection *conn = NULL; // Connect to the dealer std::unique_lock lck(sync.mtx); sync.val = 0; sync.val2 = 0; while (!conn) { conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync); if (!conn) { std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl; std::this_thread::sleep_for(HALF_SECOND); } } // Tell the dealer we're ready for its response mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0); // Wait for that response while (!sync.val2) sync.cv.wait(lck); // Close connection mg_close_connection(conn); // If the dealer says it's ready, then we can move on if (sync.val == numClients) ready = true; } } void wait_for_client_ready( std::string client, int clientPort) { // Requesting information about clients being ready is done via a GET request std::stringstream sysString; std::string data; sysString << "GET " << CLIENT_READY_URI << " HTTP/1.1\r\n"; sysString << "Host: " << client << ":" << clientPort << "\r\n\r\n"; data = sysString.str(); bool ready = false; while (!ready) { struct mg_connection *conn = NULL; // Connect to the client while (!conn) { conn = mg_connect_client(client.c_str(), clientPort, USE_SSL, NULL, 0); if (!conn) { std::cerr << "Couldn't make connection while waiting for client (" << client << ":" << clientPort << ") to be ready." << std::endl; std::this_thread::sleep_for(HALF_SECOND); } } // Make the correct GET request mg_write(conn, data.c_str(), data.length()); // Wait for a response mg_get_response(conn, NULL, 0, 250); const struct mg_response_info *info = mg_get_response_info(conn); // Close connection mg_close_connection(conn); // If the client says it's ready, then we can move on if (info->status_code == 200) ready = true; } } /* * RUN EXPERIMENT */ void execute_experiment( std::default_random_engine& rng, std::string dealerIP, int dealerPort, std::vector serverIPs, std::vector serverPorts, std::vector clientIPs, std::vector clientPorts, const char *filename) { size_t line = 1; // Iterate across each line in the command file, which contains one command per line char buffer[128]; std::ifstream commands(filename); while (!commands.eof()) { commands.getline(buffer, 128); if (strlen(buffer) == 0) { line++; continue; } std::cout << "Command " << line << ": " << std::string(buffer) << std::endl; std::vector whichActors; std::vector> proofActors; std::vector clientWaiters; int numVoters, numProofs; // The first character of each command tells us which it is switch(buffer[0]) { // Vote triggers come in form `V ` case 'V': numVoters = atoi(strtok(buffer + 1, " ")); whichActors = generate_random_set(rng, numVoters, clientIPs.size()); for (size_t i = 0; i < whichActors.size(); i++) trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]); std::this_thread::sleep_for(HALF_SECOND); for (size_t i = 0; i < whichActors.size(); i++) clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[whichActors[i]], clientPorts[whichActors[i]])); for (size_t i = 0; i < clientWaiters.size(); i++) clientWaiters[i].join(); clientWaiters.clear(); break; // Reputation proof triggers come in form `R ` case 'R': numProofs = atoi(strtok(buffer + 1, " ")); for (int i = 0; i < numProofs; i++) { whichActors = generate_random_set(rng, 2, clientIPs.size()); trigger_reputation_proof( clientIPs[whichActors[0]], clientPorts[whichActors[0]], clientIPs[whichActors[1]], clientPorts[whichActors[1]]); proofActors.push_back(whichActors); } std::this_thread::sleep_for(HALF_SECOND); for (size_t i = 0; i < proofActors.size(); i++) clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[proofActors[i][0]], clientPorts[proofActors[i][0]])); for (size_t i = 0; i < clientWaiters.size(); i++) clientWaiters[i].join(); proofActors.clear(); clientWaiters.clear(); break; // Epoch change triggers come in form `E` case 'E': trigger_epoch_change(dealerIP, dealerPort); std::this_thread::sleep_for(HALF_SECOND); wait_for_servers_ready(dealerIP, dealerPort); break; default: break; } line++; } // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else wait_for_servers_ready(dealerIP, dealerPort); for (size_t i = 0; i < clientIPs.size(); i++) wait_for_client_ready(clientIPs[i], clientPorts[i]); } /**************************************************** ********* ********* ********* orchestrator private functions ********* ********* ********* ****************************************************/ /* * TRIGGER EXPERIMENT EVENTS */ void trigger_epoch_change( std::string dealer, int dealerPort) { // Epoch changes are triggered via GET request to the correct location std::stringstream sysString; std::string data; sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n"; sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n"; data = sysString.str(); struct mg_connection *conn = NULL; // Connect to the dealer while (!conn) { conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0); if (!conn) std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl; } // Make the relevant GET request mg_write(conn, data.c_str(), data.length()); // Close connection mg_close_connection(conn); } void trigger_vote( std::string target, int targetPort) { // New votes are triggered via GET request to the correct location std::stringstream sysString; std::string data; sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n"; sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n"; data = sysString.str(); struct mg_connection *conn = NULL; // Connect to the client while (!conn) { conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0); if (!conn) std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl; } // Make the relevant GET request mg_write(conn, data.c_str(), data.length()); // Close connection mg_close_connection(conn); } void trigger_reputation_proof( std::string target, int targetPort, std::string verifier, int verifierPort) { // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier) std::stringstream sysString; std::string data; sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n"; sysString << "Host: " << target << "\r\n\r\n"; data = sysString.str(); struct mg_connection *conn = NULL; // Connect to the client while (!conn) { conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0); if (!conn) std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl; } // Make the relevant GET request mg_write(conn, data.c_str(), data.length()); // Close connection mg_close_connection(conn); } /* * EXECUTOR HELPER */ std::vector generate_random_set( std::default_random_engine& rng, size_t size, size_t maxVal) { std::vector holder; for (size_t i = 0; i < maxVal; i++) holder.push_back(i); shuffle(holder.begin(), holder.end(), rng); if (size > holder.size()) size = holder.size(); return std::vector(holder.begin(), holder.begin() + size); }