123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- #include <iostream>
- #include <fstream>
- #include <sstream>
- #include <algorithm>
- #include <cstdlib>
- #include <thread>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/wait.h>
- #include "networkOrchestrator.hpp"
- /***************************************************
- ********* *********
- ********* orchestrator public functions *********
- ********* *********
- ***************************************************/
- /*
- * START UP AND SHUT DOWN INSTANCES
- */
// Start one server or client process, either on this machine or on a remote
// machine over ssh.
//
// target:           "self" or the empty string runs bin/server or bin/client
//                   directly; any other value is passed to /usr/bin/ssh as
//                   the host on which to run the start-up script.
// server:           true to start a server, false to start a client.
// id, output:       passed through unchanged to the started process.
// lambda:           passed through as a decimal number.
// maliciousServers: passed through as "T" (true) or "F" (false).
//
// Exits the calling process with status 1 if fork() fails. The started
// process is not waited for here.
void start_remote_actor(
    const std::string& target,
    bool server,
    const std::string& id,
    const std::string& output,
    size_t lambda,
    bool maliciousServers)
{
    const bool remote = (target != "self" && !target.empty());
    const std::string role = (server ? "server" : "client");
    const std::string lambdaString = std::to_string(lambda);
    const std::string maliciousFlag = (maliciousServers ? "T" : "F");

    // Arguments are held as std::strings so that none of them is truncated
    // or left without a terminating NUL, however long it is
    std::vector<std::string> args;
    if (remote)
    {
        // ssh -n <target> "<startup script> <role> <id> <output> <lambda> <T|F>"
        args.push_back("/usr/bin/ssh");
        args.push_back("-n");
        args.push_back(target);
        args.push_back("~/prsona/prsona/scripts/startup.sh " + role + " " + id + " " + output + " " + lambdaString + " " + maliciousFlag);
    }
    else
    {
        // bin/<role> <id> <output> <lambda> <T|F>
        args.push_back(server ? "bin/server" : "bin/client");
        args.push_back(id);
        args.push_back(output);
        args.push_back(lambdaString);
        args.push_back(maliciousFlag);
    }

    // execv wants a NULL-terminated array of mutable C strings; build it
    // before forking so the child does nothing but exec
    std::vector<char*> argv;
    argv.reserve(args.size() + 1);
    for (std::string& arg : args)
        argv.push_back(&arg[0]);
    argv.push_back(nullptr);

    const pid_t pid = fork();
    if (pid < 0)
    {
        std::cerr << "Couldn't fork to start " << role << " " << id << "." << std::endl;
        exit(1);
    }

    if (pid == 0)
    {
        execv(argv[0], argv.data());

        // execv only returns on failure; leave immediately so the child
        // never carries on running the orchestrator's own code
        _exit(127);
    }
}
- void shut_down_remote_actors(
- const std::vector<std::string>& relevantIPs,
- const std::vector<int>& relevantPorts)
- {
- for (size_t i = 0; i < relevantIPs.size(); i++)
- {
- // Shut downs are triggered by a GET request to the correct location
- std::stringstream sysString;
- std::string data;
- sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
- sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n";
- data = sysString.str();
- struct mg_connection *conn = NULL;
- // Connect to the instance
- while (!conn)
- {
- conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0);
- if (!conn)
- std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl;
- }
- // Make correct GET request
- mg_write(conn, data.c_str(), data.length());
- // Close connection
- mg_close_connection(conn);
- }
- }
- /*
- * SYNCHRONIZATION
- */
- void wait_for_servers_ready(
- std::string dealer,
- int dealerPort)
- {
- // Requesting information about servers being ready is done via a GET request
- std::stringstream sysString;
- std::string data;
- sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
- sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
- data = sysString.str();
- bool ready = false;
- while (!ready)
- {
- struct mg_connection *conn = NULL;
- // Connect to the dealer
- while (!conn)
- {
- conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
- if (!conn)
- {
- std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
- std::this_thread::sleep_for(HALF_SECOND);
- }
- }
- // Make the correct GET request
- mg_write(conn, data.c_str(), data.length());
- // Wait for a response
- mg_get_response(conn, NULL, 0, 250);
- const struct mg_response_info *info = mg_get_response_info(conn);
- // Close connection
- mg_close_connection(conn);
- // If the dealer says it's ready, then we can move on
- if (info->status_code == 200)
- ready = true;
- }
- }
- void wait_for_clients_created(
- std::string dealer,
- int dealerPort,
- size_t numClients)
- {
- bool ready = false;
- while (!ready)
- {
- struct synchronization_tool sync;
- struct mg_connection *conn = NULL;
- // Connect to the dealer
- std::unique_lock<std::mutex> lck(sync.mtx);
- sync.val = 0;
- sync.val2 = 0;
- while (!conn)
- {
- conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
- if (!conn)
- {
- std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
- std::this_thread::sleep_for(HALF_SECOND);
- }
- }
- // Tell the dealer we're ready for its response
- mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0);
- // Wait for that response
- while (!sync.val2)
- sync.cv.wait(lck);
- // Close connection
- mg_close_connection(conn);
- // If the dealer says it's ready, then we can move on
- if (sync.val == numClients)
- ready = true;
- }
- }
- void wait_for_client_ready(
- std::string client,
- int clientPort)
- {
- // Requesting information about clients being ready is done via a GET request
- std::stringstream sysString;
- std::string data;
- sysString << "GET " << CLIENT_READY_URI << " HTTP/1.1\r\n";
- sysString << "Host: " << client << ":" << clientPort << "\r\n\r\n";
- data = sysString.str();
- bool ready = false;
- while (!ready)
- {
- struct mg_connection *conn = NULL;
- // Connect to the client
- while (!conn)
- {
- conn = mg_connect_client(client.c_str(), clientPort, USE_SSL, NULL, 0);
- if (!conn)
- {
- std::cerr << "Couldn't make connection while waiting for client (" << client << ":" << clientPort << ") to be ready." << std::endl;
- std::this_thread::sleep_for(HALF_SECOND);
- }
- }
- // Make the correct GET request
- mg_write(conn, data.c_str(), data.length());
- // Wait for a response
- mg_get_response(conn, NULL, 0, 250);
- const struct mg_response_info *info = mg_get_response_info(conn);
- // Close connection
- mg_close_connection(conn);
- // If the client says it's ready, then we can move on
- if (info->status_code == 200)
- ready = true;
- }
- }
- /*
- * RUN EXPERIMENT
- */
- void execute_experiment(
- std::default_random_engine& rng,
- std::string dealerIP,
- int dealerPort,
- std::vector<std::string> serverIPs,
- std::vector<int> serverPorts,
- std::vector<std::string> clientIPs,
- std::vector<int> clientPorts,
- const char *filename)
- {
- size_t line = 1;
- // Iterate across each line in the command file, which contains one command per line
- char buffer[128];
- std::ifstream commands(filename);
- while (!commands.eof())
- {
- commands.getline(buffer, 128);
- if (strlen(buffer) == 0)
- {
- line++;
- continue;
- }
- std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
- std::vector<size_t> whichActors;
- std::vector<std::vector<size_t>> proofActors;
- std::vector<std::thread> clientWaiters;
- int numVoters, numProofs;
- // The first character of each command tells us which it is
- switch(buffer[0])
- {
- // Vote triggers come in form `V <numVoters>`
- case 'V':
- numVoters = atoi(strtok(buffer + 1, " "));
- whichActors = generate_random_set(rng, numVoters, clientIPs.size());
- for (size_t i = 0; i < whichActors.size(); i++)
- trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]);
- std::this_thread::sleep_for(HALF_SECOND);
- for (size_t i = 0; i < whichActors.size(); i++)
- clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[whichActors[i]], clientPorts[whichActors[i]]));
- for (size_t i = 0; i < clientWaiters.size(); i++)
- clientWaiters[i].join();
- clientWaiters.clear();
- break;
- // Reputation proof triggers come in form `R <numProofs>`
- case 'R':
- numProofs = atoi(strtok(buffer + 1, " "));
- for (int i = 0; i < numProofs; i++)
- {
- whichActors = generate_random_set(rng, 2, clientIPs.size());
-
- trigger_reputation_proof(
- clientIPs[whichActors[0]],
- clientPorts[whichActors[0]],
- clientIPs[whichActors[1]],
- clientPorts[whichActors[1]]);
- proofActors.push_back(whichActors);
- }
- std::this_thread::sleep_for(HALF_SECOND);
- for (size_t i = 0; i < proofActors.size(); i++)
- clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[proofActors[i][0]], clientPorts[proofActors[i][0]]));
- for (size_t i = 0; i < clientWaiters.size(); i++)
- clientWaiters[i].join();
- proofActors.clear();
- clientWaiters.clear();
- break;
- // Epoch change triggers come in form `E`
- case 'E':
- trigger_epoch_change(dealerIP, dealerPort);
- std::this_thread::sleep_for(HALF_SECOND);
- wait_for_servers_ready(dealerIP, dealerPort);
- break;
- default:
- break;
- }
- line++;
- }
- // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
- wait_for_servers_ready(dealerIP, dealerPort);
- for (size_t i = 0; i < clientIPs.size(); i++)
- wait_for_client_ready(clientIPs[i], clientPorts[i]);
- }
- /****************************************************
- ********* *********
- ********* orchestrator private functions *********
- ********* *********
- ****************************************************/
- /*
- * TRIGGER EXPERIMENT EVENTS
- */
- void trigger_epoch_change(
- std::string dealer,
- int dealerPort)
- {
- // Epoch changes are triggered via GET request to the correct location
- std::stringstream sysString;
- std::string data;
- sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
- sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
- data = sysString.str();
- struct mg_connection *conn = NULL;
- // Connect to the dealer
- while (!conn)
- {
- conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
- if (!conn)
- std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl;
- }
- // Make the relevant GET request
- mg_write(conn, data.c_str(), data.length());
- // Close connection
- mg_close_connection(conn);
- }
- void trigger_vote(
- std::string target,
- int targetPort)
- {
- // New votes are triggered via GET request to the correct location
- std::stringstream sysString;
- std::string data;
- sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
- sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n";
- data = sysString.str();
- struct mg_connection *conn = NULL;
- // Connect to the client
- while (!conn)
- {
- conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
- if (!conn)
- std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl;
- }
- // Make the relevant GET request
- mg_write(conn, data.c_str(), data.length());
- // Close connection
- mg_close_connection(conn);
- }
- void trigger_reputation_proof(
- std::string target,
- int targetPort,
- std::string verifier,
- int verifierPort)
- {
- // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier)
- std::stringstream sysString;
- std::string data;
- sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n";
- sysString << "Host: " << target << "\r\n\r\n";
- data = sysString.str();
- struct mg_connection *conn = NULL;
- // Connect to the client
- while (!conn)
- {
- conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
- if (!conn)
- std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl;
- }
- // Make the relevant GET request
- mg_write(conn, data.c_str(), data.length());
- // Close connection
- mg_close_connection(conn);
- }
- /*
- * EXECUTOR HELPER
- */
// Pick a random selection of distinct indices from 0 .. maxVal - 1.
// Returns `size` indices, or all `maxVal` of them (in shuffled order) when
// `size` is larger than `maxVal`. Advances `rng`.
std::vector<size_t> generate_random_set(
    std::default_random_engine& rng,
    size_t size,
    size_t maxVal)
{
    // Begin with every candidate index in ascending order
    std::vector<size_t> candidates(maxVal);
    for (size_t index = 0; index < candidates.size(); index++)
        candidates[index] = index;

    std::shuffle(candidates.begin(), candidates.end(), rng);

    // Keep the leading `size` entries, or everything if there are fewer than that
    candidates.resize(std::min(size, candidates.size()));
    return candidates;
}
|