Prechádzať zdrojové kódy

Plot the epochs, rounds, and precomputation phases

Also fix the sleeping logic so that the epochs start in sync
Ian Goldberg 1 rok pred
rodič
commit
d2b97fef39
3 zmenil súbory, kde vykonal 168 pridanie a 70 odobranie
  1. 46 5
      App/start.cpp
  2. 2 2
      logs_to_csv.py
  3. 120 63
      plot_traces

+ 46 - 5
App/start.cpp

@@ -30,8 +30,11 @@ class Epoch {
     #endif
 
     void round_cb(uint32_t round_num) {
+        struct timeval now;
+        gettimeofday(&now, NULL);
         if (round_num) {
-            printf("Round %u complete\n", round_num);
+            printf("%lu.%06lu: Round %u complete\n", now.tv_sec,
+                now.tv_usec, round_num);
 
             #ifdef DEBUG_PUB_TIMES
             struct timespec tp;
@@ -49,7 +52,8 @@ class Epoch {
                 proceed();
             });
         } else {
-            printf("Epoch %u complete\n", epoch_num);
+            printf("%lu.%06lu: Epoch %u complete\n", now.tv_sec,
+                now.tv_usec, epoch_num);
             {
                 std::lock_guard lk(m);
                 epoch_complete = true;
@@ -177,6 +181,12 @@ static unsigned long epoch_clients(NetIO &netio) {
     unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
     printf("Epoch end time = %lu\n", end);
 
+    if (FOREGROUND_PRECOMPUTE) {
+        struct timeval now;
+        gettimeofday(&now, NULL);
+        printf("%lu.%06lu: Begin Waksman networks precompute\n",
+            now.tv_sec, now.tv_usec);
+    }
     // Launch threads to refill the precomputed Waksman networks we used.
     size_t num_sizes = ecall_precompute_sort(-1);
     std::vector<boost::thread> ts;
@@ -197,6 +207,10 @@ static unsigned long epoch_clients(NetIO &netio) {
         for (auto& t: ts) {
             t.join();
         }
+        struct timeval now;
+        gettimeofday(&now, NULL);
+        printf("%lu.%06lu: End Waksman networks precompute\n",
+            now.tv_sec, now.tv_usec);
     }
     ++epoch_num;
     return end;
@@ -209,6 +223,9 @@ static void route_clients_test(NetIO &netio)
     unsigned long epoch_interval = epoch_duration * 1000000;
     printf("Epoch duration = %d\n", epoch_duration);
 
+    struct timeval exp_start;
+    gettimeofday(&exp_start, NULL);
+
     // Precompute some WaksmanNetworks
     size_t num_sizes = ecall_precompute_sort(-2);
     // Setting num_WN_per_size for background computation mode
@@ -218,6 +235,8 @@ static void route_clients_test(NetIO &netio)
         num_WN_per_size = 2;
     }
 
+    printf("%lu.%06lu: Begin Waksman networks precompute\n",
+        exp_start.tv_sec, exp_start.tv_usec);
     std::vector<boost::thread> ts;
     for (int i=0;i<int(num_sizes);++i) {
         for (int j=0; j<num_WN_per_size; ++j) {
@@ -229,12 +248,26 @@ static void route_clients_test(NetIO &netio)
     for (auto& t: ts) {
         t.join();
     }
+    struct timeval exp_now;
+    gettimeofday(&exp_now, NULL);
+    printf("%lu.%06lu: End Waksman networks precompute\n",
+        exp_now.tv_sec, exp_now.tv_usec);
 
     // Sleep one epoch_interval for clients to connect
-    usleep((useconds_t) epoch_interval);
+    long remaining_us =
+        epoch_interval
+        - (exp_now.tv_sec - exp_start.tv_sec) * 1000000
+        - (exp_now.tv_usec - exp_start.tv_usec);
+    if (remaining_us > 0) {
+        usleep((useconds_t) remaining_us);
+    }
 
     // Run epoch
     for (int i=1; i<=num_epochs; ++i) {
+        struct timeval epoch_start;
+        gettimeofday(&epoch_start, NULL);
+        printf("%ld.%06ld: Epoch %d start\n", epoch_start.tv_sec,
+            epoch_start.tv_usec, i);
         struct timespec tp;
         clock_gettime(CLOCK_REALTIME_COARSE, &tp);
         unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
@@ -253,9 +286,17 @@ static void route_clients_test(NetIO &netio)
         printf("bytes_sent = %ld\n", netio.reset_bytes_sent());
         printf("Epoch %d time: %lu.%06lu s\n", i, diff/1000000, diff%1000000);
 
+        struct timeval now;
+        gettimeofday(&now, NULL);
+        remaining_us = epoch_interval
+            - (now.tv_sec - epoch_start.tv_sec) * 1000000
+            - (now.tv_usec - epoch_start.tv_usec);
+
         // Sleep for the rest of the epoch interval
-        if (diff < epoch_interval) {
-            usleep((useconds_t) epoch_interval - (useconds_t) diff);
+        printf("%lu.%06lu: Sleeping for %ld us\n", now.tv_sec,
+            now.tv_usec, remaining_us);
+        if (remaining_us > 0) {
+            usleep((useconds_t)remaining_us);
         }
 
     }

+ 2 - 2
logs_to_csv.py

@@ -61,7 +61,7 @@ def parse_output_logs(LOGS_FOLDER, experiment_name, generate_csv = False, op_fil
                 num_sizes = int(value)
 
             elif(state == "ROUND1"):
-                if("Round 1" in line):
+                if("Round 1 " in line):
                     state = "PADDING"
                     #print("R1: " + str(line_cnt))
 
@@ -142,7 +142,7 @@ def parse_output_logs(LOGS_FOLDER, experiment_name, generate_csv = False, op_fil
         else:
             return(num_sizes, pwn_max, epoch_max, scm_max)
     else:
-        print("No valid logs for %s" % name)
+        print("No valid logs for %s" % LOGS_FOLDER)
 
 
 if __name__ == "__main__":

+ 120 - 63
plot_traces

@@ -25,87 +25,121 @@ nodelogs.sort(key=lambda filename : int(filename[1:-4]))
 
 # Pass 1: For each sender and receiver, make a list of each message
 # queued from that sender to receiver, noting its queue start time,
-# queue end time, size, and type
+# queue end time, size, and type.  Also gather all timestamp labels for
+# each node
 
 queued_messages = {}
+labels = {}
 min_ts = None
+max_ts = None
 
 for nodelog in nodelogs:
     node = nodelog[:-4]
+    labels[node] = []
     with open(nodelog) as logf:
         queueing = {}
         for logline in logf:
             logline = logline.rstrip()
-            if "RTE" in logline:
-                if "queueing" in logline:
-                    matches = re.match(
-                        r'(\d+\.\d+): RTE queueing (\d+) bytes to (\S+)',
-                        logline)
-                    [ts, size, recv] = matches.groups()
-                    assert(recv not in queueing)
-                    tsf = float(ts)
-                    queueing[recv] = \
-                        { 'queue_start': tsf, 'size': size }
-                    if min_ts is None or min_ts > tsf:
-                        min_ts = tsf
-                if "queued" in logline:
-                    matches = re.match(
-                        r'(\d+\.\d+): RTE queued (\d+) bytes to (\S+)',
-                        logline)
-                    [ts, size, recv] = matches.groups()
-                    assert(recv in queueing)
-                    assert(queueing[recv]['size'] == size)
-                    if (node, recv) not in queued_messages:
-                        queued_messages[(node, recv)] = []
-                    msg = {
-                        'queue_start': queueing[recv]['queue_start'],
-                        'queue_end': float(ts),
-                        'size': size,
-                        'type': 'RTE',
-                    }
-                    queued_messages[(node, recv)].append(msg)
-                    del queueing[recv]
+            if matches := re.match(
+                    r'(\d+\.\d+): RTE queueing (\d+) bytes to (\S+)',
+                    logline):
+                [ts, size, recv] = matches.groups()
+                assert(recv not in queueing)
+                tsf = float(ts)
+                queueing[recv] = \
+                    { 'queue_start': tsf, 'size': size }
+                if min_ts is None or min_ts > tsf:
+                    min_ts = tsf
+            elif matches := re.match(
+                    r'(\d+\.\d+): RTE queued (\d+) bytes to (\S+)',
+                    logline):
+                [ts, size, recv] = matches.groups()
+                assert(recv in queueing)
+                assert(queueing[recv]['size'] == size)
+                if (node, recv) not in queued_messages:
+                    queued_messages[(node, recv)] = []
+                msg = {
+                    'queue_start': queueing[recv]['queue_start'],
+                    'queue_end': float(ts),
+                    'size': size,
+                    'type': 'RTE',
+                }
+                queued_messages[(node, recv)].append(msg)
+                del queueing[recv]
+            elif matches := re.match(
+                    r'(\d+\.\d+): Epoch \d+ (start|complete)',
+                    logline):
+                [ts, typ] = matches.groups()
+                tsf = float(ts)
+                if typ == 'start':
+                    typ = 'S'
+                elif typ == 'complete':
+                    typ = 'F'
+                labels[node].append((tsf, typ))
+                if max_ts is None or max_ts < tsf:
+                    max_ts = tsf
+            elif matches := re.match(
+                    r'(\d+\.\d+): Round (.*) complete',
+                    logline):
+                [ts, rnd] = matches.groups()
+                tsf = float(ts)
+                if rnd == '11':
+                    rnd = 'A'
+                elif rnd == '12':
+                    rnd = 'B'
+                elif rnd == '13':
+                    rnd = 'C'
+                labels[node].append((tsf, rnd))
+                if max_ts is None or max_ts < tsf:
+                    max_ts = tsf
+            elif matches := re.match(
+                    r'(\d+\.\d+): (Begin|End) Waksman networks precompute',
+                    logline):
+                [ts, typ] = matches.groups()
+                tsf = float(ts)
+                if typ == 'Begin':
+                    typ = 'P'
+                elif typ == 'End':
+                    typ = 'W'
+                labels[node].append((tsf, typ))
+                if max_ts is None or max_ts < tsf:
+                    max_ts = tsf
 
 # Pass 2: For each sender and receiver, note the receive start time and
 # receive end time for each message in the queued_messages list
 
 messages = {}
 
-max_ts = None
-
 for nodelog in nodelogs:
     node = nodelog[:-4]
     with open(nodelog) as logf:
         receiving = {}
         for logline in logf:
             logline = logline.rstrip()
-            if "RTE" in logline:
-                if "receiving" in logline:
-                    matches = re.match(
-                        r'(\d+\.\d+): RTE receiving (\d+) bytes from (\S+)',
-                        logline)
-                    [ts, size, snd] = matches.groups()
-                    assert(snd not in receiving)
-                    receiving[snd] = \
-                        { 'recv_start': float(ts), 'size': size }
-                if "received" in logline:
-                    matches = re.match(
-                        r'(\d+\.\d+): RTE received (\d+) bytes from (\S+)',
-                        logline)
-                    [ts, size, snd] = matches.groups()
-                    assert(snd in receiving)
-                    assert(receiving[snd]['size'] == size)
-                    assert(queued_messages[(snd, node)][0]['size'] == size)
-                    tsf = float(ts)
-                    if (snd, node) not in messages:
-                        messages[(snd, node)] = []
-                    msg = queued_messages[(snd, node)].pop(0)
-                    msg['recv_start'] = receiving[snd]['recv_start']
-                    msg['recv_end'] = tsf
-                    if max_ts is None or max_ts < tsf:
-                        max_ts = tsf
-                    messages[(snd, node)].append(msg)
-                    del receiving[snd]
+            if matches := re.match(
+                    r'(\d+\.\d+): RTE receiving (\d+) bytes from (\S+)',
+                    logline):
+                [ts, size, snd] = matches.groups()
+                assert(snd not in receiving)
+                receiving[snd] = \
+                    { 'recv_start': float(ts), 'size': size }
+            elif matches := re.match(
+                    r'(\d+\.\d+): RTE received (\d+) bytes from (\S+)',
+                    logline):
+                [ts, size, snd] = matches.groups()
+                assert(snd in receiving)
+                assert(receiving[snd]['size'] == size)
+                assert(queued_messages[(snd, node)][0]['size'] == size)
+                tsf = float(ts)
+                if (snd, node) not in messages:
+                    messages[(snd, node)] = []
+                msg = queued_messages[(snd, node)].pop(0)
+                msg['recv_start'] = receiving[snd]['recv_start']
+                msg['recv_end'] = tsf
+                if max_ts is None or max_ts < tsf:
+                    max_ts = tsf
+                messages[(snd, node)].append(msg)
+                del receiving[snd]
 
 # Write a latex file that draws the messages
 
@@ -115,10 +149,14 @@ timescale = 2
 # Nodescale (cm between nodes)
 nodescale = 1
 
+# Seconds between time label ticks
+time_tick = 1
+
 with open("trace.tex", "w") as tf:
     print(r'''\documentclass{article}
 \usepackage[paperwidth=%fcm,paperheight=%fcm,margin=1cm]{geometry}
 \usepackage{tikz}
+\usepackage{times}
 \setlength\parindent{0pt}
 \pagestyle{empty}
 \begin{document}
@@ -127,19 +165,31 @@ with open("trace.tex", "w") as tf:
 
     nodenum = 0
     nodepos = {}
+
+    print(r'''\draw[thick] (0,0) -- ++(%lf,0);''' %
+        ((max_ts-min_ts)*timescale), file=tf)
+    ts=0
+    while ts<(max_ts-min_ts):
+        print(r'''\draw [thick] (%lf,.1) node [anchor=south] { %s } -- ++(0,-.1);''' %
+            (ts*timescale, str(ts)), file=tf)
+        ts+=time_tick
+
     for nodelog in nodelogs:
         node = nodelog[:-4]
         nodenum += 1
         nodepos[node] = -nodenum * nodescale
-        print(r'''\node [anchor=east] at (0,%f) { %s };
-\draw[thick] (0,%f) -- ++(%f,0);''' %
+        print(r'''\node [anchor=east] at (0,%lf) { %s };
+\draw[thick] (0,%lf) -- ++(%lf,0);''' %
             (nodepos[node], node, nodepos[node],
             (max_ts-min_ts)*timescale), file=tf)
 
     for (snd,recv) in messages:
         for msg in messages[(snd,recv)]:
-            print(msg)
-            print(r'''\fill [fill=%s,fill opacity=.2] (%f,%f) -- (%f,%f) -- (%f,%f) -- (%f,%f) -- cycle;''' %
+            print(r'''%% %s %s %s %lf %lf %lf %lf''' % (snd, recv,
+                msg['size'], msg['queue_start'], msg['queue_end'],
+                msg['recv_start'], msg['recv_end']), file=tf)
+            print(r'''\fill [fill=%s,fill opacity=.2] (%lf,%lf) --
+            (%lf,%lf) -- (%lf,%lf) -- (%lf,%lf) -- cycle;''' %
                 ('black',
                 (msg['queue_start']-min_ts)*timescale, nodepos[snd],
                 (msg['queue_end']-min_ts)*timescale, nodepos[snd],
@@ -147,6 +197,13 @@ with open("trace.tex", "w") as tf:
                 (msg['recv_start']-min_ts)*timescale, nodepos[recv]),
                 file=tf)
 
+    for node in labels:
+        for (ts,label) in labels[node]:
+            print(r'''%% %lf''' % ts, file=tf)
+            print(r'''\draw [thick] (%lf,%lf) node [anchor=south] { %s } -- ++(0,-.1);''' %
+                ((ts-min_ts)*timescale, nodepos[node]+.1, label),
+                file=tf)
+
     print(r'''\end{tikzpicture}
 \end{document}''', file=tf)