Selaa lähdekoodia

at large scales, synchronization was breaking down. This fixes that

tristangurtler 3 vuotta sitten
vanhempi
commit
f5d5621937

+ 17 - 0
prsona/inc/networkClient.hpp

@@ -154,6 +154,23 @@ std::string make_rep_proof_string(
     const Twistpoint& shortTermPublicKey,
     const Scalar& threshold);
 
+/* OTHER CLIENT-RELEVANT HANDLERS */
+
+// Used to tell orchestrator when the client is no longer doing a triggered task
+class ClientReadyHandler : public CivetHandler
+{
+    public:
+        ClientReadyHandler(
+            struct synchronization_tool *exitSync);
+
+        bool handleGet(
+            CivetServer *server,
+            struct mg_connection *conn);
+
+    private:
+        struct synchronization_tool *exitSync;
+};
+
 /* CLIENT-SPECIFIC HANDLER */
 
 class PrsonaClientWebSocketHandler : public CivetWebSocketHandler {

+ 6 - 1
prsona/inc/networkOrchestrator.hpp

@@ -8,6 +8,7 @@
 #include "networking.hpp"
 
 const std::chrono::seconds ONE_SECOND(1);
+const std::chrono::milliseconds HALF_SECOND(500);
 
 /* "PUBLIC" FUNCTIONS */
 
@@ -29,11 +30,15 @@ void wait_for_servers_ready(
     std::string dealer,
     int dealerPort);
 
-void wait_for_clients_ready(
+void wait_for_clients_created(
     std::string dealer,
     int dealerPort,
     size_t numClients);
 
+void wait_for_client_ready(
+    std::string client,
+    int clientPort);
+
 // RUN EXPERIMENT
 void execute_experiment(
     std::default_random_engine& rng,

+ 3 - 0
prsona/inc/networking.hpp

@@ -141,6 +141,9 @@ enum RequestType {
 // SERVER-ORCHESTRATOR SYNCHRONIZATION
 #define EPOCH_READY_URI "/ready"
 
+// CLIENT-ORCHESTRATOR SYNCHRONIZATION
+#define CLIENT_READY_URI "/taskdone"
+
 // SERVER EXPERIMENT TRIGGER
 #define TRIGGER_EPOCH_URI "/epoch"
 

+ 4 - 0
prsona/src/clientMain.cpp

@@ -150,6 +150,10 @@ int main(int argc, char *argv[])
     RemoteControlHandler exitHandler(&exitSync, "Client coming down!");
     server.addHandler(EXIT_URI, exitHandler);
 
+    // This handler tells the orchestrator when the client has finished an assigned task
+    ClientReadyHandler clientReadyHandler(&exitSync);
+    server.addHandler(CLIENT_READY_URI, clientReadyHandler);
+
     // Make-vote handler (allows orchestrator to cause this client to make a new, random vote)
     AltRemoteControlHandler triggerVoteHandler(CLIENT_MAKE_VOTE, &exitSync, "Client will make new votes!");
     server.addHandler(TRIGGER_VOTE_URI, triggerVoteHandler);

+ 39 - 0
prsona/src/networkClient.cpp

@@ -874,6 +874,45 @@ std::string make_rep_proof_string(
     return buffer.str();
 }
 
+/**********************************************************
+ ****                                                  ****
+ ****  other client-relevant handler member functions  ****
+ ****                                                  ****
+ **********************************************************/
+
+/*
+ * CLIENT READY HANDLER
+ */
+
+ClientReadyHandler::ClientReadyHandler(
+    struct synchronization_tool *exitSync)
+: exitSync(exitSync)
+{ /* */ }
+
+bool ClientReadyHandler::handleGet(
+    CivetServer *server,
+    struct mg_connection *conn)
+{
+    std::unique_lock<std::mutex> exitLock(exitSync->mtx, std::defer_lock);
+
+    if (!exitLock.try_lock())
+    {
+        mg_printf(conn, "HTTP/1.1 503 Service Unavailable\r\n"
+                        "Content-Type: text/plain\r\n"
+                        "Connection: close\r\n\r\n");
+        mg_printf(conn, "Client is still making previous votes or a reputation proof.\n");   
+    }
+    else
+    {
+        mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+                        "Content-Type: text/plain\r\n"
+                        "Connection: close\r\n\r\n");
+        mg_printf(conn, "Client is ready to move forward.\n");
+    }
+
+    return true;
+}
+
 /*********************************************************
  ****                                                 ****
  ****  PrsonaClientWebSocketHandler member functions  ****

+ 92 - 45
prsona/src/networkOrchestrator.cpp

@@ -1,6 +1,7 @@
 #include <iostream>
 #include <fstream>
 #include <sstream>
+#include <algorithm>
 #include <cstdlib>
 #include <thread>
 
@@ -91,12 +92,13 @@ void wait_for_servers_ready(
         // Connect to the dealer
         while (!conn)
         {
-            std::this_thread::sleep_for(ONE_SECOND);
-
             conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
 
             if (!conn)
+            {
                 std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
+                std::this_thread::sleep_for(HALF_SECOND);
+            }
         }
 
         // Make the correct GET request
@@ -115,7 +117,7 @@ void wait_for_servers_ready(
     }
 }
 
-void wait_for_clients_ready(
+void wait_for_clients_created(
     std::string dealer,
     int dealerPort,
     size_t numClients)
@@ -132,12 +134,13 @@ void wait_for_clients_ready(
         sync.val2 = 0;
         while (!conn)
         {
-            std::this_thread::sleep_for(ONE_SECOND);
-
             conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
 
             if (!conn)
+            {
                 std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
+                std::this_thread::sleep_for(HALF_SECOND);
+            }
         }
 
         // Tell the dealer we're ready for its response
@@ -156,6 +159,50 @@ void wait_for_clients_ready(
     }
 }
 
+void wait_for_client_ready(
+    std::string client,
+    int clientPort)
+{
+    // Requesting information about clients being ready is done via a GET request
+    std::stringstream sysString;
+    std::string data;
+    sysString << "GET " << CLIENT_READY_URI << " HTTP/1.1\r\n";
+    sysString << "Host: " << client << ":" << clientPort << "\r\n\r\n";
+    data = sysString.str();
+
+    bool ready = false;
+    while (!ready)
+    {
+        struct mg_connection *conn = NULL;
+
+        // Connect to the client
+        while (!conn)
+        {
+            conn = mg_connect_client(client.c_str(), clientPort, USE_SSL, NULL, 0);
+
+            if (!conn)
+            {
+                std::cerr << "Couldn't make connection while waiting for client (" << client << ":" << clientPort << ") to be ready." << std::endl;
+                std::this_thread::sleep_for(HALF_SECOND);
+            }
+        }
+
+        // Make the correct GET request
+        mg_write(conn, data.c_str(), data.length());
+
+        // Wait for a response
+        mg_get_response(conn, NULL, 0, 250);
+        const struct mg_response_info *info = mg_get_response_info(conn);
+
+        // Close connection
+        mg_close_connection(conn);
+
+        // If the client says it's ready, then we can move on
+        if (info->status_code == 200)
+            ready = true;
+    }
+}
+
 /*
  * RUN EXPERIMENT
  */
@@ -187,6 +234,8 @@ void execute_experiment(
         std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
 
         std::vector<size_t> whichActors;
+        std::vector<std::vector<size_t>> proofActors;
+        std::vector<std::thread> clientWaiters;
         int numVoters, numProofs;
 
         // The first character of each command tells us which it is
@@ -197,7 +246,12 @@ void execute_experiment(
                 numVoters = atoi(strtok(buffer + 1, " "));
                 whichActors = generate_random_set(rng, numVoters, clientIPs.size());
                 for (size_t i = 0; i < whichActors.size(); i++)
-                    trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]);    
+                    trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]);
+                for (size_t i = 0; i < whichActors.size(); i++)
+                    clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[whichActors[i]], clientPorts[whichActors[i]]));
+                for (size_t i = 0; i < clientWaiters.size(); i++)
+                    clientWaiters[i].join();
+                clientWaiters.clear();
                 break;
 
             // Reputation proof triggers come in form `R <numProofs>`
@@ -212,12 +266,21 @@ void execute_experiment(
                         clientPorts[whichActors[0]],
                         clientIPs[whichActors[1]],
                         clientPorts[whichActors[1]]);
+
+                    proofActors.push_back(whichActors);
                 }
+                for (size_t i = 0; i < proofActors.size(); i++)
+                    clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[proofActors[i][0]], clientPorts[proofActors[i][0]]));
+                for (size_t i = 0; i < clientWaiters.size(); i++)
+                    clientWaiters[i].join();
+                proofActors.clear();
+                clientWaiters.clear();
                 break;
 
             // Epoch change triggers come in form `E`
             case 'E':
                 trigger_epoch_change(dealerIP, dealerPort);
+                wait_for_servers_ready(dealerIP, dealerPort);
                 break;
 
             default:
@@ -229,6 +292,8 @@ void execute_experiment(
 
     // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
     wait_for_servers_ready(dealerIP, dealerPort);
+    for (size_t i = 0; i < clientIPs.size(); i++)
+        wait_for_client_ready(clientIPs[i], clientPorts[i]);
 }
 
 /****************************************************
@@ -237,37 +302,6 @@ void execute_experiment(
  *********                                  *********
  ****************************************************/
 
-std::vector<size_t> generate_random_set(
-    std::default_random_engine& rng,
-    size_t size,
-    size_t maxVal)
-{
-    std::vector<size_t> retval;
-
-    if (size > maxVal)
-        return retval;
-
-    if (size == maxVal)
-    {
-        for (size_t i = 0; i < size; i++)
-            retval.push_back(i);
-
-        return retval;
-    }
-
-    std::vector<size_t> weights(maxVal, 1);
-
-    for (size_t i = 0; i < size; i++)
-    {
-        std::discrete_distribution<size_t> dist(weights.begin(), weights.end());
-
-        retval.push_back(dist(rng));
-        weights[retval[i]] = 0;
-    }
-
-    return retval;
-}
-
 /*
  * TRIGGER EXPERIMENT EVENTS
  */
@@ -276,9 +310,6 @@ void trigger_epoch_change(
     std::string dealer,
     int dealerPort)
 {
-    // Give other updates a chance to resolve
-    wait_for_servers_ready(dealer, dealerPort);
-
     // Epoch changes are triggered via GET request to the correct location
     std::stringstream sysString;
     std::string data;
@@ -302,9 +333,6 @@ void trigger_epoch_change(
 
     // Close connection
     mg_close_connection(conn);
-
-    // Don't bother giving new commands until this one has resolved
-    wait_for_servers_ready(dealer, dealerPort);
 }
 
 void trigger_vote(
@@ -365,7 +393,26 @@ void trigger_reputation_proof(
 
     // Close connection
     mg_close_connection(conn);
+}
+
+/*
+ * EXECUTOR HELPER
+ */
+
+std::vector<size_t> generate_random_set(
+    std::default_random_engine& rng,
+    size_t size,
+    size_t maxVal)
+{
+    std::vector<size_t> holder;
+
+    for (size_t i = 0; i < maxVal; i++)
+        holder.push_back(i);
+
+    shuffle(holder.begin(), holder.end(), rng);
+
+    if (size > holder.size())
+        size = holder.size();
 
-    // Give this command a small amount of time to resolve
-    std::this_thread::sleep_for(ONE_SECOND);
+    return std::vector<size_t>(holder.begin(), holder.begin() + size);
 }

+ 1 - 1
prsona/src/orchestratorMain.cpp

@@ -127,7 +127,7 @@ int main(int argc, char* argv[])
     for (size_t i = 0; i < numClients; i++)
         clientStartup[i].join();
 
-    wait_for_clients_ready(dealerIP, dealerPort, numClients);
+    wait_for_clients_created(dealerIP, dealerPort, numClients);
 
     /*
      * MAIN ORCHESTRATOR LOOP CODE