Browse Source

Minor tweaks to client simulator (IP address and port ranges). Client launch tweaked to store logs for experiment scripting. Client simulator is tailored for 8 threads for now.

Sajin 1 year ago
parent
commit
cdd53b1052
3 changed files with 41 additions and 18 deletions
  1. 21 12
      Client/clientlaunch
  2. 18 4
      Client/clients.cpp
  3. 2 2
      Client/clients.hpp

+ 21 - 12
Client/clientlaunch

@@ -21,21 +21,31 @@ PUBKEYS = "./../App/pubkeys.yaml"
 CLIENTS = "./clients"
 
 # Client thread allocation
-prefix = "numactl -C24-31 "
+prefix = "numactl -C36-39,76-79 "
 
-def launch(config, cmd, threads):
+def launch(config, cmd, threads, ip_start, lgfile):
     cmdline = ''
     cmdline += prefix + CLIENTS + " -t " + str(threads) + ""
+
+    stdout_file = subprocess.PIPE
+    if(lgfile):
+        stdout_file = open(lgfile, "a+")
+
     proc = subprocess.Popen(shlex.split(cmdline) + cmd,
-        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+        stdin=subprocess.PIPE, stdout=stdout_file,
         stderr=subprocess.STDOUT, bufsize=0)
     print(cmdline)
     proc.stdin.write(config.encode('utf-8'))
-    while True:
-        line = proc.stdout.readline()
-        if not line:
-            break
-        print(line.decode('utf-8'), end='', flush=True)
+
+    if(lgfile):
+        proc.wait()
+        stdout_file.close()
+    else:
+        while True:
+            line = proc.stdout.readline()
+            if not line:
+                break
+            print(line.decode('utf-8'), end='', flush=True)
 
 
 if __name__ == "__main__":
@@ -61,6 +71,8 @@ if __name__ == "__main__":
         help='override max number of outgoing public messages per user per epoch')
     aparse.add_argument('-c', default=None,
         help='override max number of incoming public messages per user per epoch')
+    aparse.add_argument('-l', default=None,
+        help='log file to store client simulator log for an experiment')
     aparse.add_argument('-n', nargs='*', help='nodes to include')
     aparse.add_argument('cmd', nargs='*', help='experiment to run')
     args = aparse.parse_args()
@@ -86,7 +98,4 @@ if __name__ == "__main__":
     # Now add a trailing newline
     config += "\n"
 
-    thread = threading.Thread(target=launch,
-        args=(config, args.cmd, args.t))
-    thread.start()
-    thread.join()
+    launch(config, args.cmd, args.t, args.q, args.l)

+ 18 - 4
Client/clients.cpp

@@ -406,6 +406,7 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
         boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport));
         // just for printing
         // boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
+
         storage_sock->connect(stg_ep, err);
         if (!err) {
             break;
@@ -713,7 +714,7 @@ void Client::setup_client(boost::asio::io_context &io_context,
     struct timespec ep;
     clock_gettime(CLOCK_REALTIME_COARSE, &ep);
     unsigned long time_in_ns = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
-    unsigned long epoch_no = CEILDIV(time_in_ns, EPOCH_INTERVAL);
+    unsigned long epoch_no = CEILDIV(time_in_ns, 5000000);
     sendStgAuthMessage(epoch_no);
     sendIngAuthMessage(epoch_no);
     epoch_process();
@@ -729,7 +730,7 @@ void generateClients(boost::asio::io_context &io_context,
 
     ip_addr curr_ip;
     curr_ip.ip1 = 127;
-    curr_ip.ip2 = thread_no;
+    curr_ip.ip2 = 1 + thread_no;
     curr_ip.ip3 = 0;
     curr_ip.ip4 = 0;
 
@@ -755,6 +756,7 @@ void generateClients(boost::asio::io_context &io_context,
         clients[i].setup_client(io_context, i, ing_node_id, stg_node_id,
             &curr_ip, port_no);
     }
+    printf("Done with all client_setup calls. Thread_no = %d\n", thread_no);
 }
 
 /*
@@ -935,7 +937,7 @@ int main(int argc, char **argv)
             }
             nthreads = uint16_t(atoi(argv[1]));
             argv += 2;
-        } else {
+        }  else {
             usage(progname);
         }
     }
@@ -969,11 +971,23 @@ int main(int argc, char **argv)
 
     });
 
-    // Start another thread; one will perform the work and the other
+    // Start background threads; one will perform the work and the other
     // will execute the async_write handlers
+    // TODO: Cleanup and distribute this based on nthreads.
+    // Currently assumes 8 threads are available for client simulator.
     boost::thread t([&]{io_context.run();});
+    boost::thread t2([&]{io_context.run();});
+    boost::thread t3([&]{io_context.run();});
+    boost::thread t4([&]{io_context.run();});
+    boost::thread t5([&]{io_context.run();});
+    boost::thread t6([&]{io_context.run();});
     io_context.run();
     t.join();
+    t2.join();
+    t3.join();
+    t4.join();
+    t5.join();
+    t6.join();
 
     delete [] clients;
 }

+ 2 - 2
Client/clients.hpp

@@ -5,8 +5,8 @@ typedef uint8_t aes_key[SGX_AESGCM_KEY_SIZE];
 #define RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
 // #define CLIENT_UNIQUE_IP
 
-#define PORT_START 32768
-#define PORT_END 65534
+#define PORT_START 32769
+#define PORT_END 60000
 
 
 /*