Browse Source

Add a VERBOSE_COMMS compile-time flag

With this flag enabled (in the Makefile), all communication is logged to
stdout
Ian Goldberg 2 years ago
parent
commit
6cbcbc1e39
3 changed files with 48 additions and 10 deletions
  1. 3 0
      Makefile
  2. 37 8
      mpcio.cpp
  3. 8 2
      mpcio.hpp

+ 3 - 0
Makefile

@@ -4,6 +4,9 @@ CXXFLAGS=-march=native -std=c++17 -Wall -Wno-ignored-attributes -ggdb -O3
 LDFLAGS=-ggdb
 LDLIBS=-lbsd -lboost_system -lboost_context -lboost_chrono -lboost_thread -lpthread
 
+# Enable this to have all communication logged to stdout
+CXXFLAGS += -DVERBOSE_COMMS
+
 BIN=prac
 SRCS=prac.cpp mpcio.cpp preproc.cpp online.cpp mpcops.cpp rdpf.cpp \
     cdpf.cpp

+ 37 - 8
mpcio.cpp

@@ -94,6 +94,15 @@ size_t MPCSingleIO::queue(const void *data, size_t len, lamport_t lamport)
         newmsg = 1;
     }
 
+#ifdef VERBOSE_COMMS
+    printf("Queue %s.%d len=%lu lamp=%u: ", dest.c_str(), thread_num,
+        len, message_lamport.value());
+    for (size_t i=0;i<len;++i) {
+        printf("%02x", ((const unsigned char*)data)[i]);
+    }
+    printf("\n");
+#endif
+
     // If we already have some full packets worth of data, may as
     // well send it.
     if (dataqueue.size() > 28800) {
@@ -147,9 +156,16 @@ void MPCSingleIO::send(bool implicit_send)
 
 size_t MPCSingleIO::recv(void *data, size_t len, lamport_t &lamport)
 {
+#ifdef VERBOSE_COMMS
+    size_t orig_len = len;
+    printf("Recv %s.%d len=%lu lamp=%u ", dest.c_str(), thread_num,
+        len, lamport);
+#endif
+
 #ifdef SEND_LAMPORT_CLOCKS
     char *cdata = (char *)data;
     size_t res = 0;
+
     while (len > 0) {
         while (recvdataremain == 0) {
             // Read a new header
@@ -183,6 +199,13 @@ size_t MPCSingleIO::recv(void *data, size_t len, lamport_t &lamport)
 #else
     size_t res = boost::asio::read(sock, boost::asio::buffer(data, len));
 #endif
+#ifdef VERBOSE_COMMS
+    printf("nlamp=%u: ", lamport);
+    for (size_t i=0;i<orig_len;++i) {
+        printf("%02x", ((const unsigned char*)data)[i]);
+    }
+    printf("\n");
+#endif
 #ifdef RECORD_IOTRACE
     iotrace.push_back(-(ssize_t(res)));
 #endif
@@ -278,11 +301,11 @@ MPCPeerIO::MPCPeerIO(unsigned player, ProcessingMode mode,
     for (unsigned i=0; i<num_threads; ++i) {
         cdpfs.emplace_back(player, mode, "cdpf", i);
     }
-    for (auto &&sock : peersocks) {
-        peerios.emplace_back(std::move(sock));
+    for (unsigned i=0; i<num_threads; ++i) {
+        peerios.emplace_back(std::move(peersocks[i]), "peer", i);
     }
-    for (auto &&sock : serversocks) {
-        serverios.emplace_back(std::move(sock));
+    for (unsigned i=0; i<num_threads; ++i) {
+        serverios.emplace_back(std::move(serversocks[i]), "srv", i);
     }
 }
 
@@ -338,11 +361,11 @@ MPCServerIO::MPCServerIO(ProcessingMode mode,
                 "rdpf", i, depth);
         }
     }
-    for (auto &&sock : p0socks) {
-        p0ios.emplace_back(std::move(sock));
+    for (unsigned i=0; i<num_threads; ++i) {
+        p0ios.emplace_back(std::move(p0socks[i]), "p0", i);
     }
-    for (auto &&sock : p1socks) {
-        p1ios.emplace_back(std::move(sock));
+    for (unsigned i=0; i<num_threads; ++i) {
+        p1ios.emplace_back(std::move(p1socks[i]), "p1", i);
     }
 }
 
@@ -382,6 +405,9 @@ void MPCServerIO::dump_stats(std::ostream &os)
 MPCTIO::MPCTIO(MPCIO &mpcio, int thread_num) :
         thread_num(thread_num), thread_lamport(mpcio.lamport),
         mpcio(mpcio)
+#ifdef VERBOSE_COMMS
+        , round_num(0)
+#endif
 {
     if (mpcio.player < 2) {
         MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
@@ -520,6 +546,9 @@ size_t MPCTIO::recv_p1(void *data, size_t len)
 
 void MPCTIO::send()
 {
+#ifdef VERBOSE_COMMS
+    printf("Thread %u sending round %lu\n", thread_num, ++round_num);
+#endif
     if (mpcio.player < 2) {
         MPCPeerIO &mpcpio = static_cast<MPCPeerIO&>(mpcio);
         mpcpio.peerios[thread_num].send();

+ 8 - 2
mpcio.hpp

@@ -75,6 +75,8 @@ struct MessageWithHeader {
 class MPCSingleIO {
     tcp::socket sock;
     size_t totread, totwritten;
+    std::string dest;
+    int thread_num;
 #ifdef RECORD_IOTRACE
     std::vector<ssize_t> iotrace;
 #endif
@@ -143,8 +145,9 @@ class MPCSingleIO {
     void async_send_from_msgqueue();
 
 public:
-    MPCSingleIO(tcp::socket &&sock) :
-        sock(std::move(sock)), totread(0), totwritten(0)
+    MPCSingleIO(tcp::socket &&sock, const char *dest, int thread_num) :
+        sock(std::move(sock)), totread(0), totwritten(0), dest(dest),
+        thread_num(thread_num)
 #ifdef SEND_LAMPORT_CLOCKS
         , recvdataremain(0)
 #endif
@@ -278,6 +281,9 @@ class MPCTIO {
     std::optional<MPCSingleIOStream> server_iostream;
     std::optional<MPCSingleIOStream> p0_iostream;
     std::optional<MPCSingleIOStream> p1_iostream;
+#ifdef VERBOSE_COMMS
+    size_t round_num;
+#endif
 
 public:
     MPCTIO(MPCIO &mpcio, int thread_num);