View source code

Clean up the log parser and have it record Waksman network creation times as well

Ian Goldberg 1 year ago
parent
commit
1c6525f520
1 changed file with 108 additions and 124 deletions
  1. 108 124
      logs_to_csv.py

+ 108 - 124
logs_to_csv.py

@@ -1,25 +1,95 @@
 #!/usr/bin/python3
 
-import subprocess
 import os
 import sys
 import math
 import numpy as np
+import re
 
-FILE_PARSE_STATE = ["ROUND1", "ESP", "BYTES", "PADDING", "EPOCH", "SCMT"]
-
 '''
-parse_output_logs is used in two ways:
-  (i) For logs_to_csv.py script which produces the output csv given the
-  LOGS_FOLDER with multiple experiment log folders within it. This is done when
-  this script is run directly
+Produce the output csv given the LOGS_FOLDER with multiple experiment
+log folders within it.
+'''
 
-  (ii) For run_experiments, which invokes parse_output_logs with (LOGS_FOLDER,
-  experiment_name), takes the LOGS_FOLDER pointing to a diagnostic output
-  folder to extract num_sizes of waksman networks, the WN precomputation time,
-  epoch time, and send_client_mailbox time.
+# The statistics to report in the output csv file. Each will be output
+# as mean,stddev,max
+stats_keys = ['epoch', 'wn', 'bytes']
+
def get_epoch_stats(f, expected_real):
    """Read log lines from f until one full epoch has been consumed.

    Returns a dict with (some of) the keys 'epoch' (epoch duration in
    seconds), 'wn' (Waksman networks precompute time in seconds), and
    'bytes' (bytes_sent as reported in the log); returns None if the
    epoch was interrupted by a server restart or had not yet reached
    steady state (fewer than expected_real real messages received);
    returns the string 'EOF' at end of file.
    """
    epoch_stats = {}
    looking_for_epoch_start = True
    epoch_start = None
    num_real = 0
    waksman_precompute_start = None
    while True:
        line = f.readline()

        # If we get to the end of file, indicate that by returning 'EOF'
        if line == "":
            return 'EOF'

        # Look for the start of the next epoch.
        # BUGFIX: the original had a bare 'next' here, which in Python
        # is a no-op expression (not a loop-continue as in Ruby); use
        # an explicit 'continue' once the start line has been consumed,
        # since nothing else needs parsing on that line.
        if looking_for_epoch_start:
            if matches := re.match(r'(\d+\.\d+): Epoch \d+ start', line):
                [ts] = matches.groups()
                epoch_start = float(ts)
                looking_for_epoch_start = False
                continue

        # If we see the start of a new experiment, then the current
        # epoch is interrupted, or we were scanning the trailing bit of
        # the previous experiment
        if re.match(r'Loaded sealed private key', line):
            return None

        # If we see the end of the epoch, check to see if the number of
        # real messages we received was as expected (that is, if we've
        # reached steady state, as opposed to just starting up)
        if re.match(r'(\d+\.\d+): Sleeping for ', line):
            if num_real >= expected_real:
                return epoch_stats
            else:
                return None

        # If we see a report of the number of real and padding messages
        # received, record the number of real messages
        if matches := re.match(r'(\d+) real, \d+ padding', line):
            [rm] = matches.groups()
            num_real = int(rm)

        # If we see the end of an epoch, record the epoch time.
        # BUGFIX: guard against a 'complete' line belonging to a partial
        # epoch seen before any 'start' line, in which case epoch_start
        # is still None and the subtraction would raise TypeError.
        if matches := re.match(r'(\d+\.\d+): Epoch \d+ complete', line):
            [ts] = matches.groups()
            if epoch_start is not None:
                epoch_stats['epoch'] = float(ts)-epoch_start

        # If we see Waksman network precompute start/stop times,
        # record those (guarding against an 'End' with no 'Begin',
        # which would likewise subtract None)
        if matches := re.match(r'(\d+\.\d+): (Begin|End) Waksman networks precompute', line):
            [ts, be] = matches.groups()
            if be == 'Begin':
                waksman_precompute_start = float(ts)
            elif be == 'End' and waksman_precompute_start is not None:
                epoch_stats['wn'] = float(ts)-waksman_precompute_start

        # If we see the number of bytes sent, record that
        if matches := re.match(r'bytes_sent = (\d+)', line):
            [bs] = matches.groups()
            epoch_stats['bytes'] = int(bs)
+
def stats_string(vals):
    """Return a string of the form ',mean,stddev,max' for the given
    input values"""
    # No data at all (None or empty): report zeros
    if not vals:
        return ",0,0,0"

    # A single sample has zero spread; echo the value as-is
    if len(vals) == 1:
        only = vals[0]
        return f",{only},0,{only}"

    return ",{:.3f},{:.3f},{:.3f}".format(
        np.mean(vals), np.std(vals), np.max(vals))
 
-'''
 def parse_output_logs(LOGS_FOLDER, experiment_name, generate_csv = False, op_file = None):
 
     params = experiment_name.split('_')
@@ -30,119 +100,29 @@ def parse_output_logs(LOGS_FOLDER, experiment_name, generate_csv = False, op_fil
     b = int(params[3].strip('/'))
 
     expected_real = math.floor(n/M)
-    epoch_time = []
-    # scm = send_client_mailbox
-    scm_time = []
-    storage_time = []
-    # pwn = precompute_Waksman_Network
-    pwn_time = []
-    bytes_sent = 0
-    num_sizes = 0
-    state = FILE_PARSE_STATE[0]
+    stats = {}
     for m in range(1,M+1):
-        EOF = False
-        f = open(os.path.join(LOGS_FOLDER, experiment_name, 's'+str(m)+'.log'),'r')
-        print(os.path.join(LOGS_FOLDER, experiment_name, 's'+str(m)+'.log'))
+        logfilename = os.path.join(LOGS_FOLDER, experiment_name, 's'+str(m)+'.log')
+        f = open(logfilename,'r')
+        print(logfilename)
         line_cnt = 0
-        while(1):
-            line = f.readline()
-            line_cnt+=1
-            if(line == ""):
+        while True:
+            epoch_stats = get_epoch_stats(f, expected_real)
+            if epoch_stats == 'EOF':
                 break
+            if epoch_stats is not None:
+                for key in epoch_stats:
+                    if key not in stats:
+                        stats[key] = []
+                    stats[key].append(epoch_stats[key])
 
-            if('end precompute Waksman Network' in line):
-                value = line.split(' ')[1]
-                value = value.strip('(')
-                value = value.strip(')')
-                pwn_time.append(float(value))
-
-            if('Precompute num_sizes' in line):
-                value = line.split(' ')[-1]
-                num_sizes = int(value)
-
-            elif(state == "ROUND1"):
-                if("Round 1 " in line):
-                    state = "PADDING"
-                    #print("R1: " + str(line_cnt))
-
-            elif(state == "ESP"):
-                if("end storage processing" in line):
-                    value = line.split(' ')[1]
-                    value = value.strip('(')
-                    value = value.strip(')')
-                    stg_time = float(value)
-                    storage_time.append(stg_time)
-                    state = "BYTES"
-
-            elif(state == "PADDING"):
-                if('padding' in line):
-                    #print("PADDING: " + str(line_cnt))
-                    words = line.split(' ')
-                    log_real = int(words[0])
-                    if(log_real >= expected_real):
-                        state = "ESP"
-                    else:
-                        state = "ROUND1"
-
-            elif(state == "BYTES"):
-                if('bytes_sent' in line):
-                    words = line.split(' ')
-                    bytes_sent = int(words[-1])
-                    state = "EPOCH"
-
-            elif(state == "EPOCH"):
-                if('Epoch' in line and 'time' in line):
-                    #print("EPOCH: " + str(line_cnt))
-                    nwords = line.split(' ')
-                    epoch_time.append(float(nwords[-2]))
-                    state = "SCMT"
-
-            elif(state == "SCMT"):
-                if('send_client_mailbox time' in line):
-                    scm_time = float(line.split(' ')[-2])
-                    state = "ROUND1"
-
-
-    if(len(epoch_time)!=0):
-        route_time = []
-        for i in range(len(epoch_time)):
-            route_time.append(epoch_time[i] - storage_time[i])
-        epoch_mean = np.mean(epoch_time)
-        route_mean = np.mean(route_time)
-        storage_mean = np.mean(storage_time)
-        scm_mean = np.mean(scm_time)
-        pwn_mean = np.mean(pwn_time)
-
-        epoch_max = np.max(epoch_time)
-        scm_max = np.max(scm_time)
-        pwn_max = np.max(pwn_time)
-
-        epoch_stddev = np.std(epoch_time)
-        route_stddev = np.std(route_time)
-        storage_stddev = np.std(storage_time)
-        scm_stddev = np.std(scm_time)
-        pwn_stddev = np.mean(pwn_time)
-
-        epochs = int(len(epoch_time)/M)
-        print("Num epochs = %d" % epochs);
-        print("Route time = %f +/- %f" %(route_mean, route_stddev))
-        print("Storage time = %f +/- %f" %(storage_mean, storage_stddev))
-        print("Epoch time = %f +/- %f (Max epoch_time = %f)" %(epoch_mean, epoch_stddev, epoch_max))
-        print("PWN time = %f +/- %f (Max PWN = %f)" %(pwn_mean, pwn_stddev, pwn_max))
-        print("SCM time = %f +/- %f (Max SCM = %f)" %(scm_mean, scm_stddev, scm_max))
-
-        if(generate_csv):
-            # Insert it into the output csv file
-            op_line = str(n) + "," + str(M) + "," + str(t) + "," + str(b) + "," + str(epochs) + ","
-            op_line+= "{:.4f}".format(epoch_mean) + (",") + "{:.4f}".format(epoch_stddev) + ","
-            op_line+= "{:.4f}".format(route_mean) + (",") + "{:.4f}".format(route_stddev) + ","
-            op_line+= "{:.4f}".format(storage_mean) + (",") + "{:.4f}".format(storage_stddev) + ","
-            op_line+= str(bytes_sent) + "\n"
-            op_file.write(op_line)
-        else:
-            return(num_sizes, pwn_max, epoch_max, scm_max)
-    else:
-        print("No valid logs for %s" % LOGS_FOLDER)
+    epochs = int(len(stats['epoch'])/M)
+    print("Num epochs = %d" % epochs);
+    op_line = f"{n},{M},{t},{b},{epochs}"
+    for key in stats_keys:
+        op_line += stats_string(stats[key] if key in stats else None)
+    op_line += "\n"
+    op_file.write(op_line)
 
 
 if __name__ == "__main__":
@@ -155,11 +135,15 @@ if __name__ == "__main__":
 
     LOGS_FOLDER = sys.argv[1]
     OUTPUT_FILE = sys.argv[2]
-    op_file = open(OUTPUT_FILE, 'w+')
-    op_file.write("N,M,T,B,E,epoch_total_time,epoch_stddev,\
-route_time,route_stddev,storage_time,storage_stddev,bytes_sent\n")
+    op_file = open(OUTPUT_FILE, 'w')
+    op_header = "N,M,T,B,E"
+    for key in stats_keys:
+        op_header += f",{key}_mean,{key}_stddev,{key}_max"
+    op_header += "\n"
+    op_file.write(op_header)
 
     for exp_name in os.listdir(LOGS_FOLDER):
         parse_output_logs(LOGS_FOLDER, exp_name, True, op_file)
 
     op_file.close()
+