瀏覽代碼

Update the Lamport clock on received messages

But this isn't quite right, as the next test will show
Ian Goldberg 1 年之前
父節點
當前提交
198ce0e216
共有 3 個文件被更改,包括 31 次插入7 次删除
  1. 27 7
      mpcio.hpp
  2. 2 0
      online.cpp
  3. 2 0
      preproc.cpp

+ 27 - 7
mpcio.hpp

@@ -65,13 +65,14 @@ void PreCompStorage<T>::get(T& nextval) {
 // clock member of MPCIO to the IO functions for simplicity, but they're
 // ignored if this isn't defined
 #define SEND_LAMPORT_CLOCKS
-using lamport_t = std::atomic<uint32_t>;
+using lamport_t = uint32_t;
+using atomic_lamport_t = std::atomic<lamport_t>;
 #ifdef SEND_LAMPORT_CLOCKS
 struct MessageWithHeader {
     std::string header;
     std::string message;
 
-    MessageWithHeader(std::string &&msg, lamport_t &lamport) :
+    MessageWithHeader(std::string &&msg, lamport_t lamport) :
         message(std::move(msg)) {
             char hdr[sizeof(uint32_t) + sizeof(lamport_t)];
             uint32_t msglen = uint32_t(message.size());
@@ -169,7 +170,7 @@ public:
     MPCSingleIO(tcp::socket &&sock) :
         sock(std::move(sock)), totread(0), totwritten(0) {}
 
-    void queue(const void *data, size_t len, lamport_t &lamport) {
+    void queue(const void *data, size_t len, lamport_t lamport) {
         dataqueue.append((const char *)data, len);
 
         // If we already have some full packets worth of data, may as
@@ -179,7 +180,7 @@ public:
         }
     }
 
-    void send(lamport_t &lamport) {
+    void send(lamport_t lamport) {
         size_t thissize = dataqueue.size();
         // Ignore spurious calls to send()
         if (thissize == 0) return;
@@ -204,7 +205,7 @@ public:
         messagequeuelock.unlock();
     }
 
-    size_t recv(void *data, size_t len, lamport_t &lamport) {
+    size_t recv(void *data, size_t len, atomic_lamport_t &lamport) {
 #ifdef SEND_LAMPORT_CLOCKS
         char *cdata = (char *)data;
         size_t res = 0;
@@ -213,10 +214,29 @@ public:
                 // Read a new header
                 char hdr[sizeof(uint32_t) + sizeof(lamport_t)];
                 uint32_t datalen;
-                lamport_t::value_type recv_lamport;
+                lamport_t recv_lamport;
                 boost::asio::read(sock, boost::asio::buffer(hdr, sizeof(hdr)));
                 memmove(&datalen, hdr, sizeof(datalen));
                 memmove(&recv_lamport, hdr+sizeof(datalen), sizeof(lamport_t));
+                // Update our Lamport time to be max of recv_lamport+1
+                // and what we thought it was before.  We use this
+                // compare_exchange construction in order to atomically
+                // do the comparison, computation, and replacement
+                lamport_t old_lamport = lamport;
+                lamport_t new_lamport = recv_lamport + 1;
+                do {
+                    if (new_lamport < old_lamport) {
+                        new_lamport = old_lamport;
+                    }
+                // The next line atomically checks if lamport still has
+                // the value old_lamport; if so, it changes its value to
+                // new_lamport and returns true (ending the loop).  If
+                // not, it sets old_lamport to the current value of
+                // lamport, and returns false (continuing the loop so
+                // that new_lamport can be recomputed based on this new
+                // value).
+                } while (!lamport.compare_exchange_weak(
+                    old_lamport, new_lamport));
                 if (datalen > 0) {
                     recvdata.resize(datalen, '\0');
                     boost::asio::read(sock, boost::asio::buffer(recvdata));
@@ -269,7 +289,7 @@ public:
 struct MPCIO {
     int player;
     bool preprocessing;
-    lamport_t lamport;
+    atomic_lamport_t lamport;
 
     MPCIO(int player, bool preprocessing) :
         player(player), preprocessing(preprocessing), lamport(0) {}

+ 2 - 0
online.cpp

@@ -73,6 +73,8 @@ static void online_test(MPCIO &mpcio, int num_threads, char **args)
         mpcpio.dump_precomp_stats(std::cout);
     }
 
+    std::cout << "Lamport clock = " << mpcio.lamport << "\n";
+
     delete[] A;
 }
 

+ 2 - 0
preproc.cpp

@@ -77,6 +77,7 @@ void preprocessing_comp(MPCIO &mpcio, int num_threads, char **args)
         });
     }
     pool.join();
+    std::cout << "Lamport clock = " << mpcio.lamport << "\n";
 }
 
 void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
@@ -147,4 +148,5 @@ void preprocessing_server(MPCServerIO &mpcsrvio, int num_threads, char **args)
         });
     }
     pool.join();
+    std::cout << "Lamport clock = " << mpcsrvio.lamport << "\n";
 }