瀏覽代碼

Client ifdef options to pick unique IP and ports and randomize sleep delays

Sajin Sasy 1 年之前
父節點
當前提交
1df4dbf38f
共有 2 個文件被更改,包括 123 次插入19 次删除
  1. 112 16
      Client/clients.cpp
  2. 11 3
      Client/clients.hpp

+ 112 - 16
Client/clients.cpp

@@ -11,6 +11,7 @@
 #include "gcm.h"
 #include "sgx_tcrypto.h"
 #include "clients.hpp"
+#include <cstdlib>
 
 #define CEILDIV(x,y) (((x)+(y)-1)/(y))
 
@@ -364,7 +365,7 @@ void Client::initClient(clientid_t cid, uint16_t stg_id,
 }
 
 void Client::initializeStgSocket(boost::asio::io_context &ioc,
-    NodeConfig &stg_server)
+    NodeConfig &stg_server, std::string raw_ip_addr, uint16_t &port_no)
 {
 
     boost::system::error_code err;
@@ -375,19 +376,46 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
         std::cerr << "Connecting to " << stg_server.name << "...\n";
         std::cout << stg_server.slistenhost << ":" << stg_server.slistenport;
 #endif
+
+        boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(raw_ip_addr, err);
+        if(err) {
+            printf("initializeStgSocket::Invalid IP address\n");
+        }
+
         storage_sock = new boost::asio::ip::tcp::socket(ioc);
+        storage_sock->open(boost::asio::ip::tcp::v4(), err);
+        while(1) {
+            boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
+            storage_sock->bind(ep, err);
+            if(!err) break;
+            else {
+                printf("Error %s\n", err.message().c_str());
+                port_no++;
+                if(port_no == PORT_END) {
+                    port_no = PORT_START;
+                }
+            }
+        }
+
         boost::asio::connect(*storage_sock,
             resolver.resolve(stg_server.slistenhost,
                 stg_server.slistenport), err);
         if (!err) break;
-        std::cerr << "Connection to " << stg_server.name <<
+        std::cerr <<"STG: Connection to " << stg_server.name <<
             " refused, will , epoch_noretry.\n";
-        sleep(1);
+        std::cerr << raw_ip_addr << ":" << port_no << "\n";
+
+#ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
+        int sleep_delay = rand() % 10000000;
+        usleep(sleep_delay);
+#else
+        usleep(1000000);
+#endif
     }
 }
 
 void Client::initializeIngSocket(boost::asio::io_context &ioc,
-    NodeConfig &ing_server)
+    NodeConfig &ing_server, std::string raw_ip_addr, uint16_t &port_no)
 {
 
     boost::system::error_code err;
@@ -398,14 +426,40 @@ void Client::initializeIngSocket(boost::asio::io_context &ioc,
         std::cerr << "Connecting to " << ing_server.name << "...\n";
         std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
 #endif
+
+        boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(raw_ip_addr, err);
+        if(err) {
+            printf("initializeIngSocket::Invalid IP address\n");
+        }
+
         ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
+        ingestion_sock->open(boost::asio::ip::tcp::v4(), err);
+        while(1) {
+            boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
+            ingestion_sock->bind(ep, err);
+            if(!err) break;
+            else {
+                printf("Error %s\n", err.message().c_str());
+                port_no++;
+                if(port_no == PORT_END) {
+                    port_no = PORT_START;
+                }
+            }
+        }
+
         boost::asio::connect(*ingestion_sock,
             resolver.resolve(ing_server.clistenhost,
                 ing_server.clistenport), err);
         if (!err) break;
-        std::cerr << "Connection to " << ing_server.name <<
+        std::cerr << "ING: Connection to " << ing_server.name <<
             " refused, will , epoch_noretry.\n";
-        sleep(1);
+        std::cerr << raw_ip_addr << ":" << port_no << "\n";
+#ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
+        int sleep_delay = rand() % 10000000;
+        usleep(sleep_delay);
+#else
+        usleep(1000000);
+#endif
     }
 }
 
@@ -541,6 +595,14 @@ int Client::sendIngAuthMessage(unsigned long epoch_no)
     printf("\n");
 #endif
 
+    if(sim_id%7919==0) {
+        printf("Client %d auth_message: \n", sim_id);
+        for(int i=0; i<TOKEN_SIZE; i++) {
+            printf("%x", am_ptr[i]);
+        }
+        printf("\n");
+    }
+
     boost::asio::write(*ingestion_sock,
         boost::asio::buffer(auth_message, auth_size));
 
@@ -581,7 +643,6 @@ int Client::sendStgAuthMessage(unsigned long epoch_no)
     boost::asio::async_write(*storage_sock,
         boost::asio::buffer(auth_message, auth_size),
         [this] (boost::system::error_code ecc, std::size_t) {
-
         if (ecc) {
             if(ecc == boost::asio::error::eof) {
                 delete(storage_sock);
@@ -599,7 +660,8 @@ int Client::sendStgAuthMessage(unsigned long epoch_no)
 }
 
 void Client::setup_client(boost::asio::io_context &io_context,
-    uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id)
+    uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id,
+    std::string ip_addr, uint16_t port_no)
 {
     // Setup the client's
     // (i) client_id
@@ -611,9 +673,9 @@ void Client::setup_client(boost::asio::io_context &io_context,
 
     initClient(sim_id, stg_node_id, client_ing_key, client_stg_key);
 
-    initializeStgSocket(io_context, storage_nodes[stg_node_id]);
-
-    initializeIngSocket(io_context, ingestion_nodes[ing_node_id]);
+    initializeStgSocket(io_context, storage_nodes[stg_node_id], ip_addr, port_no);
+    port_no++;
+    initializeIngSocket(io_context, ingestion_nodes[ing_node_id], ip_addr, port_no);
 
     // Authenticate clients to their ingestion and storage servers
     struct timespec ep;
@@ -622,22 +684,57 @@ void Client::setup_client(boost::asio::io_context &io_context,
     unsigned long epoch_no = CEILDIV(time_in_ns, EPOCH_INTERVAL);
     sendStgAuthMessage(epoch_no);
     sendIngAuthMessage(epoch_no);
+    epoch_process();
 }
 
 void generateClients(boost::asio::io_context &io_context,
-    uint32_t cstart, uint32_t cstop)
+    uint32_t cstart, uint32_t cstop, uint8_t thread_no)
 {
     uint32_t num_clients_total = config.user_count;
     uint16_t num_stg_nodes = storage_nodes.size();
     uint16_t num_ing_nodes = ingestion_nodes.size();
+    uint8_t ip3 =0, ip4 = 0;
+    uint8_t ip2 = thread_no;
+    uint8_t ip1 = 127;
+    uint16_t port_no = PORT_START;
+    std::string ip_address;
 
     for(uint32_t i=cstart; i<cstop; i++) {
+        // Compute client's ip and port
+
+#ifdef CLIENT_UNIQUE_IP
+        if(port_no==PORT_END) {
+            port_no = PORT_START;
+        }
+        ip4++;
+        if(ip4==0) {
+            ip3++;
+            if(ip3==0) {
+                ip2++;
+            }
+        }
+#else
+        if(port_no==PORT_END) {
+            port_no = PORT_START;
+            ip4++;
+            if(ip4==0) {
+                ip3++;
+                if(ip3==0) {
+                    ip2++;
+                }
+            }
+        }
+#endif
+        ip_address = std::to_string(ip1) + "." + std::to_string(ip2) + "."
+            + std::to_string(ip3) + "." + std::to_string(ip4);
 
         uint16_t ing_no = i % num_ing_nodes;
         uint16_t stg_no = i % num_stg_nodes;
         uint16_t stg_node_id = storage_map[stg_no];
         uint16_t ing_node_id = ingestion_map[ing_no];
-        clients[i].setup_client(io_context, i, ing_node_id, stg_node_id);
+        clients[i].setup_client(io_context, i, ing_node_id, stg_node_id,
+            ip_address, port_no);
+        port_no++;
     }
 }
 
@@ -670,8 +767,8 @@ void Client::epoch_process() {
             }
             else {
                 printf("Error %s\n", ec.message().c_str());
+                printf("Client::epoch_process boost async_read_tokens failed\n");
             }
-            printf("Client::epoch_process boost async_read_tokens failed\n");
             return;
         }
 
@@ -778,7 +875,7 @@ void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
 #endif
 
         threads.emplace_back(boost::thread(generateClients,
-            boost::ref(io_context), cstart, cstop));
+            boost::ref(io_context), cstart, cstop, i));
     }
 
     for(int i=0; i<nthreads; i++) {
@@ -852,7 +949,6 @@ int main(int argc, char **argv)
 
         initializeClients(io_context, nthreads);
 
-        run_epochs(nthreads);
     });
 
     // Start another thread; one will perform the work and the other

+ 11 - 3
Client/clients.hpp

@@ -2,6 +2,12 @@ typedef uint8_t token[SGX_AESGCM_MAC_SIZE];
 typedef uint8_t aes_key[SGX_AESGCM_KEY_SIZE];
 
 // #define VERBOSE_CLIENT
+// #define RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
+#define CLIENT_UNIQUE_IP
+
+#define PORT_START 32768
+#define PORT_END 65534
+
 
 /*
 
@@ -64,8 +70,10 @@ private:
 
     void sendMessageBundle();
 
-    void initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server);
-    void initializeStgSocket(boost::asio::io_context &ioc, NodeConfig &ing_server);
+    void initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server,
+        std::string raw_ip_addr, uint16_t &port_no);
+    void initializeStgSocket(boost::asio::io_context &ioc, NodeConfig &ing_server,
+        std::string raw_ip_addr, uint16_t &port_no);
 
     void initClient(clientid_t cid, uint16_t stg_id, aes_key ikey, aes_key skey);
 
@@ -81,7 +89,7 @@ public:
     }
 
     void setup_client(boost::asio::io_context &ioc, uint32_t sim_id,
-        uint16_t ing_node_id, uint16_t stg_node_id);
+        uint16_t ing_node_id, uint16_t stg_node_id, std::string, uint16_t pno);
 
     void epoch_process();