Browse Source

Gather and dump stats on number of messages, number of bytes, wall clock and CPU times, Lamport time

Ian Goldberg 2 years ago
parent
commit
099228a946
4 changed files with 76 additions and 16 deletions
  1. 1 1
      Makefile
  2. 73 14
      mpcio.hpp
  3. 2 0
      oblivds.cpp
  4. 0 1
      online.cpp

+ 1 - 1
Makefile

@@ -2,7 +2,7 @@ all: oblivds
 
 CXXFLAGS=-std=c++17 -Wall -ggdb
 LDFLAGS=-ggdb
-LDLIBS=-lbsd -lboost_system -lboost_context -lboost_thread -lpthread
+LDLIBS=-lbsd -lboost_system -lboost_context -lboost_chrono -lboost_thread -lpthread
 
 BIN=oblivds
 SRCS=oblivds.cpp mpcio.cpp preproc.cpp online.cpp mpcops.cpp

+ 73 - 14
mpcio.hpp

@@ -13,6 +13,7 @@
 
 #include <boost/asio.hpp>
 #include <boost/thread.hpp>
+#include <boost/chrono.hpp>
 
 #include "types.hpp"
 
@@ -122,6 +123,9 @@ class MPCSingleIO {
     // iff messagequeue is nonempty.
 #ifdef SEND_LAMPORT_CLOCKS
     std::queue<MessageWithHeader> messagequeue;
+#else
+    std::queue<std::string> messagequeue;
+#endif
 
     // If a single message is broken into chunks in order to get the
     // first part of it out on the wire while the rest of it is still
@@ -133,9 +137,6 @@ class MPCSingleIO {
     // the current lamport clock when that first queue() after each
     // explicit send() happens.
     opt_lamport_t message_lamport;
-#else
-    std::queue<std::string> messagequeue;
-#endif
 
 #ifdef SEND_LAMPORT_CLOCKS
     // If Lamport clocks are being sent, then the data stream is divided
@@ -183,10 +184,13 @@ public:
     MPCSingleIO(tcp::socket &&sock) :
         sock(std::move(sock)), totread(0), totwritten(0) {}
 
-    void queue(const void *data, size_t len, lamport_t lamport) {
+    // Returns 1 if a new message is started, 0 otherwise
+    size_t queue(const void *data, size_t len, lamport_t lamport) {
+        // Is this a new message?
+        size_t newmsg = 0;
+
         dataqueue.append((const char *)data, len);
 
-#ifdef SEND_LAMPORT_CLOCKS
         // If this is the first queue() since the last explicit send(),
         // which we'll know because message_lamport will be nullopt, set
         // message_lamport to the current Lamport clock.  Note that the
@@ -194,14 +198,16 @@ public:
         // whether its value is zero.
         if (!message_lamport) {
             message_lamport = lamport;
+            newmsg = 1;
         }
-#endif
 
         // If we already have some full packets worth of data, may as
         // well send it.
         if (dataqueue.size() > 28800) {
             send(true);
         }
+
+        return newmsg;
     }
 
     void send(bool implicit_send = false) {
@@ -314,10 +320,55 @@ public:
 struct MPCIO {
     int player;
     bool preprocessing;
+    size_t num_threads;
     atomic_lamport_t lamport;
+    std::vector<size_t> msgs_sent;
+    std::vector<size_t> msg_bytes_sent;
+    boost::chrono::steady_clock::time_point steady_start;
+    boost::chrono::process_cpu_clock::time_point cpu_start;
+
+    MPCIO(int player, bool preprocessing, size_t num_threads) :
+        player(player), preprocessing(preprocessing),
+        num_threads(num_threads), lamport(0)
+    {
+        reset_stats();
+    }
+
+    void reset_stats()
+    {
+        msgs_sent.clear();
+        msg_bytes_sent.clear();
+        for (size_t i=0; i<num_threads; ++i) {
+            msgs_sent.push_back(0);
+            msg_bytes_sent.push_back(0);
+        }
+        steady_start = boost::chrono::steady_clock::now();
+        cpu_start = boost::chrono::process_cpu_clock::now();
+    }
 
-    MPCIO(int player, bool preprocessing) :
-        player(player), preprocessing(preprocessing), lamport(0) {}
+    void dump_stats(std::ostream &os)
+    {
+        size_t tot_msgs_sent = 0;
+        size_t tot_msg_bytes_sent = 0;
+        for (auto& n : msgs_sent) {
+            tot_msgs_sent += n;
+        }
+        for (auto& n : msg_bytes_sent) {
+            tot_msg_bytes_sent += n;
+        }
+        auto steady_elapsed =
+            boost::chrono::steady_clock::now() - steady_start;
+        auto cpu_elapsed =
+            boost::chrono::process_cpu_clock::now() - cpu_start;
+
+        os << tot_msgs_sent << " messages sent\n";
+        os << tot_msg_bytes_sent << " message bytes sent\n";
+        os << lamport << " Lamport clock (latencies)\n";
+        os << boost::chrono::duration_cast
+            <boost::chrono::milliseconds>(steady_elapsed) <<
+            " wall clock time\n";
+        os << cpu_elapsed << " {real;user;system}\n";
+    }
 };
 
 // A class to represent all of a computation peer's IO, either to other
@@ -335,7 +386,7 @@ struct MPCPeerIO : public MPCIO {
     MPCPeerIO(unsigned player, bool preprocessing,
             std::deque<tcp::socket> &peersocks,
             std::deque<tcp::socket> &serversocks) :
-        MPCIO(player, preprocessing)
+        MPCIO(player, preprocessing, peersocks.size())
     {
         unsigned num_threads = unsigned(peersocks.size());
         for (unsigned i=0; i<num_threads; ++i) {
@@ -383,7 +434,7 @@ struct MPCServerIO : public MPCIO {
     MPCServerIO(bool preprocessing,
             std::deque<tcp::socket> &p0socks,
             std::deque<tcp::socket> &p1socks) :
-        MPCIO(2, preprocessing)
+        MPCIO(2, preprocessing, p0socks.size())
     {
         for (auto &&sock : p0socks) {
             p0ios.emplace_back(std::move(sock));
@@ -447,14 +498,18 @@ public:
     void queue_peer(const void *data, size_t len) {
         if (mpcio.player < 2) {
             MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
-            mpcpio.peerios[thread_num].queue(data, len, thread_lamport);
+            size_t newmsg = mpcpio.peerios[thread_num].queue(data, len, thread_lamport);
+            mpcpio.msgs_sent[thread_num] += newmsg;
+            mpcpio.msg_bytes_sent[thread_num] += len;
         }
     }
 
     void queue_server(const void *data, size_t len) {
         if (mpcio.player < 2) {
             MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
-            mpcpio.serverios[thread_num].queue(data, len, thread_lamport);
+            size_t newmsg = mpcpio.serverios[thread_num].queue(data, len, thread_lamport);
+            mpcpio.msgs_sent[thread_num] += newmsg;
+            mpcpio.msg_bytes_sent[thread_num] += len;
         }
     }
 
@@ -481,14 +536,18 @@ public:
     void queue_p0(const void *data, size_t len) {
         if (mpcio.player == 2) {
             MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
-            mpcsrvio.p0ios[thread_num].queue(data, len, thread_lamport);
+            size_t newmsg = mpcsrvio.p0ios[thread_num].queue(data, len, thread_lamport);
+            mpcsrvio.msgs_sent[thread_num] += newmsg;
+            mpcsrvio.msg_bytes_sent[thread_num] += len;
         }
     }
 
     void queue_p1(const void *data, size_t len) {
         if (mpcio.player == 2) {
             MPCServerIO &mpcsrvio = static_cast<MPCServerIO&>(mpcio);
-            mpcsrvio.p1ios[thread_num].queue(data, len, thread_lamport);
+            size_t newmsg = mpcsrvio.p1ios[thread_num].queue(data, len, thread_lamport);
+            mpcsrvio.msgs_sent[thread_num] += newmsg;
+            mpcsrvio.msg_bytes_sent[thread_num] += len;
         }
     }
 

+ 2 - 0
oblivds.cpp

@@ -41,6 +41,7 @@ static void comp_player_main(boost::asio::io_context &io_context,
     boost::thread t([&]{io_context.run();});
     io_context.run();
     t.join();
+    mpcio.dump_stats(std::cout);
 }
 
 static void server_player_main(boost::asio::io_context &io_context,
@@ -66,6 +67,7 @@ static void server_player_main(boost::asio::io_context &io_context,
     boost::thread t([&]{io_context.run();});
     io_context.run();
     t.join();
+    mpcserverio.dump_stats(std::cout);
 }
 
 int main(int argc, char **argv)

+ 0 - 1
online.cpp

@@ -120,7 +120,6 @@ static void lamport_test(MPCIO &mpcio, int num_threads, char **args)
         });
     }
     pool.join();
-    std::cout << "Lamport clock = " << mpcio.lamport << "\n";
 }
 
 void online_main(MPCIO &mpcio, int num_threads, char **args)