Browse Source

Fixed client IP and port selection. Now binds to chosen IP and port pair correctly.

Sajin Sasy 1 year ago
parent
commit
27e62db661
2 changed files with 91 additions and 51 deletions
  1. 64 46
      Client/clients.cpp
  2. 27 5
      Client/clients.hpp

+ 64 - 46
Client/clients.cpp

@@ -22,6 +22,7 @@ std::vector<NodeConfig> ingestion_nodes, storage_nodes;
 std::vector<uint16_t> storage_map;
 std::vector<uint16_t> ingestion_map;
 unsigned long setup_time;
+uint16_t nthreads = 1;
 
 // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
 // into a host part "127.0.0.1" and a port part "12000".
@@ -365,11 +366,10 @@ void Client::initClient(clientid_t cid, uint16_t stg_id,
 }
 
 void Client::initializeStgSocket(boost::asio::io_context &ioc,
-    NodeConfig &stg_server, std::string raw_ip_addr, uint16_t &port_no)
+    NodeConfig &stg_server, ip_addr *curr_ip, uint16_t &port_no)
 {
 
     boost::system::error_code err;
-    boost::asio::ip::tcp::resolver resolver(ioc);
 
     while(1) {
 #ifdef VERBOSE_CLIENT
@@ -377,7 +377,10 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
         std::cout << stg_server.slistenhost << ":" << stg_server.slistenport;
 #endif
 
-        boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(raw_ip_addr, err);
+        std::string ip_str = curr_ip->ip_str();
+
+        boost::asio::ip::address ip_address =
+            boost::asio::ip::address::from_string(ip_str, err);
         if(err) {
             printf("initializeStgSocket::Invalid IP address\n");
         }
@@ -389,33 +392,40 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
             storage_sock->bind(ep, err);
             if(!err) break;
             else {
-                printf("Error %s\n", err.message().c_str());
+                printf("STG: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no);
                 port_no++;
-                if(port_no == PORT_END) {
+                if(port_no >= PORT_END) {
                     port_no = PORT_START;
+                    curr_ip->increment(nthreads);
                 }
             }
         }
 
-        boost::asio::connect(*storage_sock,
-            resolver.resolve(stg_server.slistenhost,
-                stg_server.slistenport), err);
-        if (!err) break;
+
+        boost::asio::ip::address stg_ip = boost::asio::ip::address::from_string(stg_server.slistenhost, err);
+        boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport));
+        // just for printing
+        // boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
+        storage_sock->connect(stg_ep, err);
+        if (!err) {
+            break;
+        }
         std::cerr <<"STG: Connection to " << stg_server.name <<
             " refused, will , epoch_noretry.\n";
-        std::cerr << raw_ip_addr << ":" << port_no << "\n";
+        std::cerr << curr_ip->ip_str() << ":" << port_no << "\n";
 
 #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
-        int sleep_delay = rand() % 10000000;
+        int sleep_delay = rand() % 100000;
         usleep(sleep_delay);
 #else
         usleep(1000000);
 #endif
+        delete(storage_sock);
     }
 }
 
 void Client::initializeIngSocket(boost::asio::io_context &ioc,
-    NodeConfig &ing_server, std::string raw_ip_addr, uint16_t &port_no)
+    NodeConfig &ing_server, ip_addr *curr_ip, uint16_t &port_no)
 {
 
     boost::system::error_code err;
@@ -427,7 +437,9 @@ void Client::initializeIngSocket(boost::asio::io_context &ioc,
         std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
 #endif
 
-        boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(raw_ip_addr, err);
+        std::string ip_str = curr_ip->ip_str();
+        boost::asio::ip::address ip_address =
+            boost::asio::ip::address::from_string(ip_str, err);
         if(err) {
             printf("initializeIngSocket::Invalid IP address\n");
         }
@@ -439,27 +451,44 @@ void Client::initializeIngSocket(boost::asio::io_context &ioc,
             ingestion_sock->bind(ep, err);
             if(!err) break;
             else {
-                printf("Error %s\n", err.message().c_str());
+                printf("ING: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no);
                 port_no++;
-                if(port_no == PORT_END) {
+                if(port_no >= PORT_END) {
                     port_no = PORT_START;
+                    curr_ip->increment(nthreads);
                 }
             }
         }
 
+
+        // just for printing
+        boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
+        boost::asio::ip::address ing_ip = boost::asio::ip::address::from_string(ing_server.clistenhost, err);
+        boost::asio::ip::tcp::endpoint ing_ep(ing_ip, std::stoi(ing_server.clistenport));
+        //std::cout << "Ing endpoint:" << ing_ep << "\n";
+        //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n";
+        ingestion_sock->connect(ing_ep, err);
+
+        /*
         boost::asio::connect(*ingestion_sock,
             resolver.resolve(ing_server.clistenhost,
                 ing_server.clistenport), err);
-        if (!err) break;
+        */
+
+        if (!err) {
+           //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n";
+           break;
+        }
         std::cerr << "ING: Connection to " << ing_server.name <<
             " refused, will , epoch_noretry.\n";
-        std::cerr << raw_ip_addr << ":" << port_no << "\n";
+        std::cerr << curr_ip->ip_str() << ":" << port_no << "\n";
 #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
-        int sleep_delay = rand() % 10000000;
+        int sleep_delay = rand() % 100000;
         usleep(sleep_delay);
 #else
         usleep(1000000);
 #endif
+        delete(ingestion_sock);
     }
 }
 
@@ -595,6 +624,7 @@ int Client::sendIngAuthMessage(unsigned long epoch_no)
     printf("\n");
 #endif
 
+    /*
     if(sim_id%7919==0) {
         printf("Client %d auth_message: \n", sim_id);
         for(int i=0; i<TOKEN_SIZE; i++) {
@@ -602,6 +632,7 @@ int Client::sendIngAuthMessage(unsigned long epoch_no)
         }
         printf("\n");
     }
+    */
 
     boost::asio::write(*ingestion_sock,
         boost::asio::buffer(auth_message, auth_size));
@@ -661,7 +692,7 @@ int Client::sendStgAuthMessage(unsigned long epoch_no)
 
 void Client::setup_client(boost::asio::io_context &io_context,
     uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id,
-    std::string ip_addr, uint16_t port_no)
+    ip_addr *curr_ip, uint16_t &port_no)
 {
     // Setup the client's
     // (i) client_id
@@ -673,9 +704,10 @@ void Client::setup_client(boost::asio::io_context &io_context,
 
     initClient(sim_id, stg_node_id, client_ing_key, client_stg_key);
 
-    initializeStgSocket(io_context, storage_nodes[stg_node_id], ip_addr, port_no);
+    initializeStgSocket(io_context, storage_nodes[stg_node_id], curr_ip, port_no);
+    port_no++;
+    initializeIngSocket(io_context, ingestion_nodes[ing_node_id], curr_ip, port_no);
     port_no++;
-    initializeIngSocket(io_context, ingestion_nodes[ing_node_id], ip_addr, port_no);
 
     // Authenticate clients to their ingestion and storage servers
     struct timespec ep;
@@ -693,48 +725,35 @@ void generateClients(boost::asio::io_context &io_context,
     uint32_t num_clients_total = config.user_count;
     uint16_t num_stg_nodes = storage_nodes.size();
     uint16_t num_ing_nodes = ingestion_nodes.size();
-    uint8_t ip3 =0, ip4 = 0;
-    uint8_t ip2 = thread_no;
-    uint8_t ip1 = 127;
     uint16_t port_no = PORT_START;
-    std::string ip_address;
+
+    ip_addr curr_ip;
+    curr_ip.ip1 = 127;
+    curr_ip.ip2 = thread_no;
+    curr_ip.ip3 = 0;
+    curr_ip.ip4 = 0;
 
     for(uint32_t i=cstart; i<cstop; i++) {
         // Compute client's ip and port
 
 #ifdef CLIENT_UNIQUE_IP
-        if(port_no==PORT_END) {
+        if(port_no>=PORT_END) {
             port_no = PORT_START;
         }
-        ip4++;
-        if(ip4==0) {
-            ip3++;
-            if(ip3==0) {
-                ip2++;
-            }
-        }
+        curr_ip.increment(nthreads);
 #else
-        if(port_no==PORT_END) {
+        if(port_no>=PORT_END) {
             port_no = PORT_START;
-            ip4++;
-            if(ip4==0) {
-                ip3++;
-                if(ip3==0) {
-                    ip2++;
-                }
-            }
+            curr_ip.increment(nthreads);
         }
 #endif
-        ip_address = std::to_string(ip1) + "." + std::to_string(ip2) + "."
-            + std::to_string(ip3) + "." + std::to_string(ip4);
 
         uint16_t ing_no = i % num_ing_nodes;
         uint16_t stg_no = i % num_stg_nodes;
         uint16_t stg_node_id = storage_map[stg_no];
         uint16_t ing_node_id = ingestion_map[ing_no];
         clients[i].setup_client(io_context, i, ing_node_id, stg_node_id,
-            ip_address, port_no);
-        port_no++;
+            &curr_ip, port_no);
     }
 }
 
@@ -905,7 +924,6 @@ int main(int argc, char **argv)
     // Unbuffer stdout
     setbuf(stdout, NULL);
 
-    uint16_t nthreads = 1;
     const char *progname = argv[0];
     ++argv;
 

+ 27 - 5
Client/clients.hpp

@@ -2,8 +2,8 @@ typedef uint8_t token[SGX_AESGCM_MAC_SIZE];
 typedef uint8_t aes_key[SGX_AESGCM_KEY_SIZE];
 
 // #define VERBOSE_CLIENT
-// #define RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
-#define CLIENT_UNIQUE_IP
+#define RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
+// #define CLIENT_UNIQUE_IP
 
 #define PORT_START 32768
 #define PORT_END 65534
@@ -25,6 +25,27 @@ typedef uint8_t aes_key[SGX_AESGCM_KEY_SIZE];
         (and not sim_id)
 */
 
+struct ip_addr
+{
+    uint8_t ip1;
+    uint8_t ip2;
+    uint8_t ip3;
+    uint8_t ip4;
+
+    void increment(int nthreads) {
+        ip4++;
+        if(ip4==0) {
+            ip3++;
+            if(ip3==0) {
+                ip2+=nthreads;
+            }
+        }
+    }
+    std::string ip_str() {
+        return(std::to_string(ip1) + "." + std::to_string(ip2) + "."
+            + std::to_string(ip3) + "." + std::to_string(ip4));
+    }
+};
 
 /*
     Structure for capture each individual simulated client's state
@@ -71,9 +92,9 @@ private:
     void sendMessageBundle();
 
     void initializeIngSocket(boost::asio::io_context &ioc, NodeConfig &ing_server,
-        std::string raw_ip_addr, uint16_t &port_no);
+        ip_addr *curr_ip, uint16_t &port_no);
     void initializeStgSocket(boost::asio::io_context &ioc, NodeConfig &ing_server,
-        std::string raw_ip_addr, uint16_t &port_no);
+        ip_addr *curr_ip, uint16_t &port_no);
 
     void initClient(clientid_t cid, uint16_t stg_id, aes_key ikey, aes_key skey);
 
@@ -89,7 +110,8 @@ public:
     }
 
     void setup_client(boost::asio::io_context &ioc, uint32_t sim_id,
-        uint16_t ing_node_id, uint16_t stg_node_id, std::string, uint16_t pno);
+        uint16_t ing_node_id, uint16_t stg_node_id,
+        ip_addr *curr_ip, uint16_t &pno);
 
     void epoch_process();