Przeglądaj źródła

adding the orchestrator code

tristangurtler 3 lat temu
rodzic
commit
7811abfb42
2 zmienionych plików z 410 dodań i 16 usunięć
  1. 4 16
      prsona/Makefile
  2. 406 0
      prsona/src/orchestratorMain.cpp

+ 4 - 16
prsona/Makefile

@@ -36,10 +36,10 @@ CFLAGS = -std=c99 -O3 -fomit-frame-pointer -I$(666_INC_PATH) -I$(MG_INC_PATH)
 C_LDFLAGS = -lm
 
 main: $(PRSONA_BIN_PATH) $(PRSONA_OBJ_PATH) $(BGN_OBJ_PATH) $(666_OBJ_PATH) $(PRSONA_BIN_PATH)/main
-test: $(PRSONA_BIN_PATH) $(PRSONA_OBJ_PATH) $(BGN_OBJ_PATH) $(666_OBJ_PATH) $(PRSONA_BIN_PATH)/test
 
 server: $(PRSONA_BIN_PATH) $(PRSONA_OBJ_PATH) $(BGN_OBJ_PATH) $(666_OBJ_PATH) $(PRSONA_BIN_PATH)/server
 client: $(PRSONA_BIN_PATH) $(PRSONA_OBJ_PATH) $(BGN_OBJ_PATH) $(666_OBJ_PATH) $(PRSONA_BIN_PATH)/client
+orchestrator: $(PRSONA_BIN_PATH) $(PRSONA_OBJ_PATH) $(BGN_OBJ_PATH) $(666_OBJ_PATH) $(PRSONA_BIN_PATH)/orchestrator
 
 $(PRSONA_BIN_PATH):
 	mkdir -p $@
@@ -61,25 +61,17 @@ PRSONA_NETWORK_SRC = $(wildcard $(PRSONA_SRC_PATH)/network*.cpp)
 PRSONA_SRC = $(filter-out $(PRSONA_NETWORK_SRC) $(wildcard $(PRSONA_SRC_PATH)/*ain.cpp), $(PRSONA_FULL_SRC)) 
 PRSONA_NETWORK_OBJ = $(patsubst $(PRSONA_SRC_PATH)/%.cpp, $(PRSONA_OBJ_PATH)/%.o, $(PRSONA_NETWORK_SRC))
 PRSONA_OBJ = $(patsubst $(PRSONA_SRC_PATH)/%.cpp, $(PRSONA_OBJ_PATH)/%.o, $(PRSONA_SRC))
-PRSONA_TEST_OBJ = $(patsubst $(PRSONA_SRC_PATH)/%.cpp, $(PRSONA_OBJ_PATH)/%.test.o, $(PRSONA_SRC))
 
 $(PRSONA_OBJ_PATH)/%.o: $(PRSONA_SRC_PATH)/%.cpp
 	$(CPP) $(CPPFLAGS) -DQHASM -c -o $@ $< 
 
-$(PRSONA_OBJ_PATH)/%.test.o: $(PRSONA_SRC_PATH)/%.cpp
-	$(CPP) $(CPPTESTFLAGS) -DQHASM -c -o $@ $< 
-
 BGN_FULL_SRC = $(wildcard $(BGN_SRC_PATH)/*.cpp)
 BGN_SRC = $(filter-out $(BGN_SRC_PATH)/main.cpp, $(BGN_FULL_SRC)) 
 BGN_OBJ = $(patsubst $(BGN_SRC_PATH)/%.cpp, $(BGN_OBJ_PATH)/%.o, $(BGN_SRC))
-BGN_TEST_OBJ = $(patsubst $(BGN_SRC_PATH)/%.cpp, $(BGN_OBJ_PATH)/%.test.o, $(BGN_SRC))
 
 $(BGN_OBJ_PATH)/%.o: $(BGN_SRC_PATH)/%.cpp
 	$(CPP) $(CPPFLAGS) -DQHASM -c -o $@ $< 
 
-$(BGN_OBJ_PATH)/%.test.o: $(BGN_SRC_PATH)/%.cpp
-	$(CPP) $(CPPTESTFLAGS) -DQHASM -c -o $@ $< 
-
 666_ALL_C_SRC = $(wildcard $(666_SRC_PATH)/*.c)
 666_C_SRC = $(filter-out $(666_SRC_PATH)/bilintest.c $(666_SRC_PATH)/speedtest.c $(666_SRC_PATH)/test_curvepoint_multiscalar.c $(666_SRC_PATH)/test_twistpoint_multiscalar.c $(666_SRC_PATH)/twistpoint_fp2_multiscalar.c $(666_SRC_PATH)/curvepoint_fp_multiscalar.c, $(666_ALL_C_SRC))
 666_C_OBJ = $(patsubst $(666_SRC_PATH)/%.c, $(666_OBJ_PATH)/%_c_with_as.o, $(666_C_SRC))
@@ -97,12 +89,8 @@ $(BGN_OBJ_PATH)/bgn.a: $(666_AS_OBJ) $(666_C_OBJ) $(BGN_OBJ)
 	rm -f $@
 	ar cr $@ $^
 
-$(BGN_OBJ_PATH)/bgn.test.a: $(666_AS_OBJ) $(666_C_OBJ) $(BGN_TEST_OBJ)
-	rm -f $@
-	ar cr $@ $^
-
 MG_OBJ += $(MG_OBJ_PATH)/civetweb.o
-MG_OBJ += $(MG_OBJ_PATH)/CiverServer.o
+MG_OBJ += $(MG_OBJ_PATH)/CivetServer.o
 
 $(MG_OBJ_PATH)/%.o: 
 	make -C ../civetweb WITH_WEBSOCKET=1 WITH_CPP=1 lib
@@ -116,8 +104,8 @@ $(PRSONA_BIN_PATH)/server: $(PRSONA_OBJ_PATH)/serverMain.o $(PRSONA_OBJ) $(PRSON
 $(PRSONA_BIN_PATH)/client: $(PRSONA_OBJ_PATH)/clientMain.o $(PRSONA_OBJ) $(PRSONA_NETWORK_OBJ) $(MG_OBJ) $(BGN_OBJ_PATH)/bgn.a
 	$(CPP) $(CPPFLAGS) -no-pie -o $@ $^ $(LDFLAGS)
 
-$(PRSONA_BIN_PATH)/test: $(PRSONA_OBJ_PATH)/main.test.o $(PRSONA_TEST_OBJ) $(BGN_OBJ_PATH)/bgn.test.a
-	$(CPP) $(CPPTESTFLAGS) -no-pie -o $@ $^ $(LDTESTFLAGS)
+$(PRSONA_BIN_PATH)/orchestrator: $(PRSONA_OBJ_PATH)/orchestratorMain.o $(PRSONA_OBJ) $(PRSONA_NETWORK_OBJ) $(MG_OBJ) $(BGN_OBJ_PATH)/bgn.a
+	$(CPP) $(CPPFLAGS) -no-pie -o $@ $^ $(LDFLAGS)
 
 .PHONY: clean
 

+ 406 - 0
prsona/src/orchestratorMain.cpp

@@ -0,0 +1,406 @@
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include <string>
+#include <cstring>
+#include <cstdlib>
+#include <vector>
+
+#include "networking.hpp"
+
+using namespace std;
+
+static int clients_websocket_data_handler(
+    struct mg_connection *conn,
+    int bits,
+    char *data,
+    size_t data_len,
+    void *user_data)
+{
+    if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE)
+        return false;
+
+    if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_BINARY)
+    {
+        struct synchronization_tool *synch = (struct synchronization_tool *) user_data;
+
+        unique_lock<mutex> lck(synch->mtx);
+        synch->val = atoi(data);
+
+        return false;
+    }
+
+    if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_DATACOMPLETE)
+        return false;
+
+    std::cerr << "Unknown response when trying to get update lock." << std::endl;
+    return false;    
+}
+
+void start_remote_actor(string target, bool server, string output)
+{
+    stringstream buffer;
+    string command;
+
+    buffer << "ssh tmgurtle@" << target << " \"screen \'~/prsona/prsona/bin/"
+        << (server ? "startServer.sh " : "startClient.sh ") << output << "\'\"" ;
+    command = buffer.str();
+
+    system(command.c_str());
+}
+
+void wait_for_servers_ready(string dealer)
+{
+    bool flag = false;
+    while (!flag)
+    {
+        sleep(1);
+
+        stringstream sysString;
+        string data;
+        char buffer[255];
+
+        struct mg_connection *conn =
+            mg_connect_client(
+                dealer.c_str(), 
+                PRSONA_PORT,
+                USE_SSL, 
+                NULL,
+                0);
+
+        if (!conn)
+            continue;
+
+        sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
+        sysString << "Host: " << dealer << "\r\n\r\n";
+        data = sysString.str();
+
+        mg_write(conn, data.c_str(), data.length());
+        size_t readLen = mg_read(conn, (uint8_t*) buffer, 254);
+        buffer[readLen] = 0;
+
+        if (strstr(buffer, "200"))
+            flag = true;
+
+        mg_close_connection(conn);
+    }
+}
+
+void wait_for_clients_ready(string dealer, size_t numClients)
+{
+    struct synchronization_tool numClientsSync;
+
+    bool flag = false;
+    while (!flag)
+    {
+        sleep(1);
+
+        stringstream sysString;
+        string data;
+        char buffer[255];
+
+        struct mg_connection *conn =
+            mg_connect_websocket_client(
+                dealer.c_str(),
+                PRSONA_PORT,
+                USE_SSL,
+                NULL,
+                0,
+                NUM_CLIENTS_URI,
+                "null",
+                clients_websocket_data_handler,
+                synchro_websocket_close_handler,
+                &numClientsSync);
+
+        if (!conn)
+            continue;
+
+        unique_lock<mutex> lck(numClientsSync.mtx);
+        numClientsSync.val = 0;
+        numClientsSync.val2 = 1;
+
+        mg_websocket_client_write(
+            conn,
+            MG_WEBSOCKET_OPCODE_DATACOMPLETE,
+            "",
+            0);
+
+        while (numClientsSync.val2)
+            numClientsSync.cv.wait(lck);
+
+        mg_close_connection(conn);
+
+        if (numClientsSync.val == numClients)
+            flag = true;
+    }
+}
+
+void trigger_epoch(string dealer)
+{
+    sleep(1);
+
+    bool flag = false;
+    while (!flag)
+    {
+        stringstream sysString;
+        string data;
+
+        struct mg_connection *conn =
+            mg_connect_client(
+                dealer.c_str(), 
+                PRSONA_PORT,
+                USE_SSL, 
+                NULL,
+                0);
+
+        if (!conn)
+            continue;
+
+        sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
+        sysString << "Host: " << dealer << "\r\n\r\n";
+        data = sysString.str();
+
+        mg_write(conn, data.c_str(), data.length());
+        mg_close_connection(conn);
+
+        flag = true;
+    }
+
+    wait_for_servers_ready(dealer);
+}
+
+void trigger_vote(string target)
+{
+    bool flag = false;
+    while (!flag)
+    {
+        stringstream sysString;
+        string data;
+
+        struct mg_connection *conn =
+            mg_connect_client(
+                target.c_str(), 
+                PRSONA_PORT,
+                USE_SSL, 
+                NULL,
+                0);
+
+        if (!conn)
+            continue;
+
+        sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
+        sysString << "Host: " << dealer << "\r\n\r\n";
+        data = sysString.str();
+
+        mg_write(conn, data.c_str(), data.length());
+        mg_close_connection(conn);
+
+        flag = true;
+    }
+}
+
+void trigger_reputation_proof(string target, string verifier)
+{
+    bool flag = false;
+    while (!flag)
+    {
+        stringstream sysString;
+        string data;
+
+        struct mg_connection *conn =
+            mg_connect_client(
+                target.c_str(), 
+                PRSONA_PORT,
+                USE_SSL, 
+                NULL,
+                0);
+
+        if (!conn)
+            continue;
+
+        sysString << "GET " << TRIGGER_REP_URI << verifier << " HTTP/1.1\r\n";
+        sysString << "Host: " << dealer << "\r\n\r\n";
+        data = sysString.str();
+
+        mg_write(conn, data.c_str(), data.length());
+        mg_close_connection(conn);
+
+        flag = true;
+    }
+}
+
+void execute_experiment(string dealer)
+{
+    size_t line = 1;
+
+    char buffer[128];
+    ifstream commands("commands.cfg");
+
+    while (!commands.eof())
+    {
+        commands.getline(buffer, 128);
+        if (strlen(buffer) == 0)
+        {
+            line++;
+            continue;
+        }
+
+        cout << "Command " << line << ": " << string(buffer) << endl;
+
+        switch(buffer[0])
+        {
+            case 'V':
+                string target = buffer + 2;
+                trigger_vote(target);
+                break;
+
+            case 'R':
+                char *target, *verifier;
+                target = strtok(buffer, " ");
+                verifier = strtok(NULL, " ");
+                trigger_reputation_proof(string(target), string(verifier));
+                break;
+
+            case 'E':
+                trigger_epoch(dealer);
+                break;
+
+            default:
+                break;
+        }
+
+        line++;
+    }
+}
+
+int main(int argc, char* argv[])
+{
+    string experimentOutput = random_string(8);
+
+#if USE_SSL
+    mg_init_library(0);
+#else
+    mg_init_library(MG_FEATURES_SSL);
+#endif
+
+    vector<string> serverIPs, clientIPs;
+    string dealerIP;
+
+    char buffer[40];
+    ifstream serverConfig("serverIPs.cfg");
+    while (!serverConfig.eof())
+    {
+        serverConfig.getline(buffer, 40);
+        if (strlen(buffer) > 0)
+            serverIPs.push_back(string(buffer));
+    }
+
+    ifstream clientConfig("clientIPs.cfg");
+    while (!clientConfig.eof())
+    {
+        clientConfig.getline(buffer, 40);
+        if (strlen(buffer) > 0)
+            clientIPs.push_back(string(buffer));
+    }
+
+    ifstream dealerConfig("dealerIP.cfg");
+    while (!dealerConfig.eof())
+    {
+        dealerConfig.getline(buffer, 40);
+        if (strlen(buffer) > 0)
+            dealerIP = buffer;
+    }
+
+    size_t numServers = serverIPs.size();
+    size_t numClients = clientIPs.size();
+
+    cout << "This experiment is running with output code: " << experimentOutput << endl;
+    cout << "Starting BGN dealer server." << endl;
+
+    vector<thread> serverStartup, clientStartup;
+    serverStartup.push_back(thread(start_remote_actor, dealerIP, true, experimentOutput));
+
+    sleep(1);
+
+    cout << "Starting other servers." << endl;
+
+    for (size_t i = 0; i < numServers; i++)
+    {
+        if (serverIPs[i] == dealerIP)
+            continue;
+
+        serverStartup.push_back(thread(start_remote_actor, serverIPs[i], true, experimentOutput));
+    }
+
+    cout << "Waiting for confirmation that servers are ready to continue." << endl;
+
+    for (size_t i = 0; i < numServers; i++)
+        serverStartup[i].join();
+
+    wait_for_servers_ready(dealerIP);
+
+    cout << "Starting clients." << endl;
+
+    for (size_t i = 0; i < numClients; i++)
+        clientStartup.push_back(thread(start_remote_actor, clientIPs[i], false, experimentOutput));
+
+    cout << "Waiting for confirmation that servers have all clients logged." << endl;    
+
+    for (size_t i = 0; i < numClients; i++)
+        clientStartup[i].join();
+
+    wait_for_clients_ready(dealerIP);
+
+    cout << "Beginning experiment." << endl;
+
+    execute_experiment(dealerIP);
+
+    cout << "Finishing experiment." << endl;
+    cout << "Sending shutdown commands to clients." << endl;
+
+    for (size_t i = 0; i < clientIPs.size(); i++)
+    {
+        stringstream sysString;
+        string data;
+
+        struct mg_connection *conn =
+            mg_connect_client(
+                clientIPs[i], 
+                PRSONA_PORT,
+                USE_SSL, 
+                NULL,
+                0);
+
+        sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
+        sysString << "Host: " << clientIPs[i] << "\r\n\r\n";
+        data = sysString.str();
+
+        mg_write(conn, data.c_str(), data.length());
+        mg_close_connection(conn);
+    }
+
+    cout << "Sending shutdown commands to servers." << endl;
+
+    for (size_t i = 0; i < serverIPs.size(); i++)
+    {
+        stringstream sysString;
+        string data;
+
+        struct mg_connection *conn =
+            mg_connect_client(
+                serverIPs[i], 
+                PRSONA_PORT,
+                USE_SSL, 
+                NULL,
+                0);
+
+        sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
+        sysString << "Host: " << serverIPs[i] << "\r\n\r\n";
+        data = sysString.str();
+
+        mg_write(conn, data.c_str(), data.length());
+        mg_close_connection(conn);
+    }
+
+    return 0;
+}