
Automatically configure the cores to use for each experiment

The new core allocator will read numactl -s and /proc/cpuinfo
to try to allocate cores to servers and clients without using
the same physical core more than once.

If you don't have enough physical cores for an experiment, it will skip
that experiment by default. If you set the environment variable
OVERLOAD_CORES=1, it will run the experiment anyway, at the cost of a
performance penalty.
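
For orientation (not part of the commit itself), the allocator added below
exposes a single entry point, core_allocation(num_servers, cores_per_server).
A minimal sketch of how a caller might probe it, with illustrative arguments
and core numbers:

    from core_allocator import core_allocation

    # Ask for 4 servers with 1 physical core each, plus cores for the clients.
    servers, clients = core_allocation(4, 1)
    if servers is None:
        # Not enough distinct physical cores: by default the caller skips the
        # experiment; re-running with OVERLOAD_CORES=1 reuses cores instead.
        print("skipping: not enough physical cores")
    else:
        print("server core lists:", servers)   # e.g. [[0], [1], [2], [3]]
        print("client cores:", clients)        # between 1 and CLIENT_MAX_CORES entries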
Ian Goldberg, 11 months ago
parent
commit
9213665c0f
5 changed files with 222 additions and 49 deletions
  1. Client/clientlaunch (+7 -7)
  2. core_allocator.py (+181 -0)
  3. gen_manifest.py (+8 -15)
  4. run_all_experiments.py (+5 -14)
  5. run_experiments.py (+21 -13)

+ 7 - 7
Client/clientlaunch

@@ -20,13 +20,11 @@ PUBKEYS = "../App/pubkeys.yaml"
 # The client binary
 CLIENTS = "./clients"
 
-# Client thread allocation
-prefix = "numactl -C36-39,76-79 "
-#prefix = ""
-
-def launch(config, cmd, threads, lgfile):
+def launch(config, cmd, threads, lgfile, corelist):
     cmdline = ''
-    cmdline += prefix + CLIENTS + " -t " + str(threads) + ""
+    if corelist is not None:
+        cmdline = "numactl -C %s " % corelist
+    cmdline += CLIENTS + " -t " + str(threads) + ""
 
     stdout_file = subprocess.PIPE
     if lgfile:
@@ -60,6 +58,8 @@ if __name__ == "__main__":
         help='pubkeys.yaml file')
     aparse.add_argument('-t', default=1,
         help='number of threads')
+    aparse.add_argument('-T', default=None,
+        help='CPU cores to use for clients')
     aparse.add_argument('-z', default=None,
         help='override message size')
     aparse.add_argument('-u', default=None,
@@ -99,4 +99,4 @@ if __name__ == "__main__":
     # Now add a trailing newline
     config += "\n"
 
-    launch(config, args.cmd, args.t, args.l)
+    launch(config, args.cmd, args.t, args.l, args.T)

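To make the effect of the new corelist parameter concrete, here is a small
sketch (with made-up core numbers) of the command line launch() now composes;
passing corelist=None simply omits the numactl prefix:

    # Sketch of the command line launch() composes (illustrative values only)
    corelist = "36,37,38,39"
    threads = 4
    CLIENTS = "./clients"

    cmdline = ''
    if corelist is not None:
        cmdline = "numactl -C %s " % corelist
    cmdline += CLIENTS + " -t " + str(threads)
    print(cmdline)   # numactl -C 36,37,38,39 ./clients -t 4
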
+ 181 - 0
core_allocator.py

@@ -0,0 +1,181 @@
+#!/usr/bin/env python3
+
+import os
+import re
+import subprocess
+import sys
+
+# Note on hyperthreading: on Xeon CPUs that support SGX2, each physical
+# core supports two hyperthreads (if enabled in the BIOS), and appears
+# as two virtual cores to the system (in /proc/cpuinfo, for example).
+# The two virtual cores that map to the same physical core are called
+# the "A side" and the "B side".  Each physical core only has one AES
+# circuit, so if both hyperthreads on the same core are trying to do AES
+# at the same time, performance crashes.  So by default, we only use one
+# hyperthread (the "A side") on a given physical core.
+
+# Get the list of virtual cores currently available to us
+def virtual_cores_available():
+    ret = subprocess.run(["numactl", "-s"], capture_output=True)
+    if ret.returncode != 0:
+        print("Unable to run numactl", file=sys.stderr)
+        sys.exit(1)
+    match = re.search(r'physcpubind: ((\d+ )*\d+)', str(ret.stdout))
+    return list(map(int,match.group(1).split(' ')))
+
+# Read /proc/cpuinfo to get a map from virtual core number to CPU number
+# ("physical id") and physical core ("core id").  Only track the cores
+# that are currently available
+def get_core_map():
+    cores_available = virtual_cores_available()
+    coremap = {}
+
+    with open("/proc/cpuinfo") as p:
+        virtcore = None
+        cpuid = None
+        coreid = None
+        while True:
+            l = p.readline()
+            if l == "":
+                break
+            elif l == "\n":
+                if virtcore is None or cpuid is None or coreid is None:
+                    print(virtcore, cpuid, coreid)
+                    print("Could not parse /proc/cpuinfo", file=sys.stderr)
+                    sys.exit(1)
+                if virtcore in cores_available:
+                    coremap[virtcore] = (cpuid, coreid)
+                virtcore = None
+                cpuid = None
+                coreid = None
+            elif match := re.match(r'processor\s*: (\d+)', l):
+                virtcore = int(match.group(1))
+            elif match := re.match(r'physical id\s*: (\d+)', l):
+                cpuid = int(match.group(1))
+            elif match := re.match(r'core id\s*: (\d+)', l):
+                coreid = int(match.group(1))
+    return coremap
+
+# Return an array.  The first element of the array will represent the "A
+# sides", the second will represent the "B sides" (and if you're on some
+# weird CPU with more than 2 hyperthreads per core, the next will be the
+# "C sides", etc.).  Each element will be a map from cpuid to a list of
+# the available virtual cores on that CPU, at most one per physical
+# core.
+def get_core_layout():
+    core_map = get_core_map()
+    retarray = []
+    while core_map:
+        # Extract the first virtual core for each (cpuid, physical core)
+        # from core_map
+        current_side_map = {}
+        virtual_cores_remaining = list(core_map.keys())
+        for vcore in virtual_cores_remaining:
+            (cpuid, coreid) = core_map[vcore]
+            if cpuid not in current_side_map:
+                current_side_map[cpuid] = {}
+            if coreid not in current_side_map[cpuid]:
+                current_side_map[cpuid][coreid] = vcore
+                del core_map[vcore]
+
+        current_side = {}
+        for cpuid in current_side_map:
+            current_side[cpuid] = list(current_side_map[cpuid].values())
+        retarray.append(current_side)
+    return retarray
+
+core_layout = get_core_layout()
+
+# Maximum number of cores to use for clients (but we'll prefer to use
+# fewer cores instead of overloading cores)
+CLIENT_MAX_CORES = 8
+
+# Return a core allocation for an experiment.  Pass in the number of
+# servers, and the number of cores per server.  The return value is a
+# pair.  The first element is a list of length num_servers, each
+# element of which is the core allocation for one server (which will be
+# a list of length cores_per_server).  The second element of the return
+# value is the core allocation for the clients (which will be a list of
+# length between 1 and CLIENT_MAX_CORES).  If the environment variable
+# OVERLOAD_CORES is unset or set to 0, each available physical core will
+# be used at most once.  If that is not possible, (None, None) will be
+# returned.  If OVERLOAD_CORES is set to 1, then physical cores will be
+# reused when necessary in order to run the requested experiment, albeit
+# at a significant performance penalty.  It must be the case in any
+# event that you have at least one CPU with at least cores_per_server
+# physical cores.
+def core_allocation(num_servers, cores_per_server):
+    overload_cores = \
+        os.getenv("OVERLOAD_CORES", '0').lower() in ('true', '1', 't')
+
+    servers_allocation = []
+    client_allocation = []
+
+    # Which index into core_layout we are currently working with
+    hyperthread_side = 0
+
+    # Copy that entry of the core_layout
+    current_cores = dict(core_layout[hyperthread_side])
+
+    while len(servers_allocation) < num_servers or \
+            len(client_allocation) < CLIENT_MAX_CORES:
+        # Find the cpu with the most cores available
+        cpu_most_cores = None
+        num_most_cores = None
+        for cpuid in current_cores:
+            num_cores = len(current_cores[cpuid])
+            if num_cores > 0 and \
+                    (num_most_cores is None or num_cores > num_most_cores):
+                cpu_most_cores = cpuid
+                num_most_cores = num_cores
+
+        if num_most_cores is not None and \
+                num_most_cores >= cores_per_server and \
+                len(servers_allocation) < num_servers:
+            servers_allocation.append(
+                current_cores[cpu_most_cores][0:cores_per_server])
+            current_cores[cpu_most_cores] = \
+                current_cores[cpu_most_cores][cores_per_server:]
+            continue
+
+        # We could not find a suitable allocation for the next server.
+        # Try allocating a core for clients, if we still could use some.
+        if num_most_cores is not None and \
+                num_most_cores >= 1 and \
+                len(client_allocation) < CLIENT_MAX_CORES:
+            client_allocation.append(current_cores[cpu_most_cores][0])
+            current_cores[cpu_most_cores] = \
+                current_cores[cpu_most_cores][1:]
+            continue
+
+        # We can't do an allocation.  If we have all the server
+        # allocations, and at least one client core allocated, that'll
+        # be good enough.
+        if len(servers_allocation) == num_servers and \
+                len(client_allocation) >= 1:
+            break
+
+        # We're going to have to overload cores, if allowed
+        if not overload_cores:
+            return (None, None)
+
+        hyperthread_side = (hyperthread_side + 1) % len(core_layout)
+        # Copy that entry of the core_layout
+        current_cores = dict(core_layout[hyperthread_side])
+
+
+    return (servers_allocation, client_allocation)
+
+
+
+
+if __name__ == "__main__":
+    if len(sys.argv) > 1:
+        num_servers = int(sys.argv[1])
+    else:
+        num_servers = 4
+    if len(sys.argv) > 2:
+        cores_per_server = int(sys.argv[2])
+    else:
+        cores_per_server = 1
+    print(core_allocation(num_servers,cores_per_server))

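For concreteness, a hypothetical illustration (not taken from the commit) of
the structure get_core_layout() produces on a machine with two CPUs, four
physical cores each, and hyperthreading enabled; the virtual-core numbering
below is assumed:

    # Hypothetical machine: physical ids 0 and 1, core ids 0-3 on each CPU,
    # A-side virtual cores 0-7, B-side (sibling hyperthread) cores 8-15.
    core_layout = [
        {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]},        # A sides: one vcore per physical core
        {0: [8, 9, 10, 11], 1: [12, 13, 14, 15]},  # B sides: the sibling hyperthreads
    ]

    # On such a machine, core_allocation(2, 2) would return something like
    #   ([[0, 1], [4, 5]], [2, 6, 3, 7])
    # i.e. each server gets two cores on a single CPU, and the clients get
    # the remaining A-side cores, without touching the B sides.
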
+ 8 - 15
gen_manifest.py

@@ -4,6 +4,7 @@ import subprocess
 import os
 import sys
 import math
+from core_allocator import core_allocation
 
 # CONFIGS TO SET:
 
@@ -17,7 +18,9 @@ MANIFEST_FILE = "App/manifest.yaml"
     B: msg_size
 '''
 def generate_manifest(N, M, T, B, PRIVATE_ROUTE = True, priv_out=1, priv_in=1, pub_out=1, pub_in=1):
-
+    (servers_allocation, client_allocation) = core_allocation(M, T)
+    assert servers_allocation is not None
+    assert client_allocation is not None
     mf = open(MANIFEST_FILE, 'w+')
     manifest_params = '''params:
   # The total max number of users
@@ -34,8 +37,8 @@ def generate_manifest(N, M, T, B, PRIVATE_ROUTE = True, priv_out=1, priv_in=1, p
   pub_in: {pui}
   # Private or public routing protocol selection
   private_routing: {PRIVATE_ROUTE}
-  # A hardcoded master secret for generating keys to bootstrap
-  # client -> server communications
+  # Currently hardcoding an AES key for client -> server communications,
+  # but in reality, a key exchange would be done
   master_secret: \"AHardCodedAESKey\"\n'''.format(N = str(N), B = str(B), PRIVATE_ROUTE=str(PRIVATE_ROUTE),\
   pro = str(priv_out), pri = str(priv_in), puo = str(pub_out), pui = str(pub_in))
 
@@ -53,19 +56,9 @@ def generate_manifest(N, M, T, B, PRIVATE_ROUTE = True, priv_out=1, priv_in=1, p
   args: -t {T}
   roles: 7\n'''
 
-    # Assign server threads starting at 0
-    numa_base = 0
-    # Do not assign servers threads that are across the processors
-    numa_limit = 36
-    numa_start = numa_base
-
     for s in range(1, M+1):
-        numa_end = numa_start + T -1
-        if(numa_start <= numa_limit and numa_end > numa_limit):
-            numa_start = 40
-            numa_end = numa_start + T
-        numa_string = "-C{s}-{e}".format(s = str(numa_start), e = str(numa_end))
-        numa_start = numa_end + 1
+        core_alloc = servers_allocation[s-1]
+        numa_string = "-C " + ",".join(map(str, core_alloc))
         curr_params = server_params.format(i = str(s), nstring = numa_string, T = str(T))
         # print(curr_params)
         mf.write(curr_params)

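A small worked example (with made-up core numbers) of the numactl string now
derived from the allocator's output instead of hardcoded ranges:

    # Hypothetical allocation for one server with T = 4 cores, all on CPU 0
    core_alloc = [0, 1, 2, 3]
    numa_string = "-C " + ",".join(map(str, core_alloc))
    print(numa_string)   # prints: -C 0,1,2,3
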
+ 5 - 14
run_all_experiments.py

@@ -1,14 +1,5 @@
 #!/usr/bin/python3
-'''
-  NOTE: This script is tailored for a machine with 2 40-core processors.
-  Currently assumes 8 threads (C36-39,76-79) are set aside for client
-  simulator; change this by setting "prefix" in Client/clientlaunch and
-  the -t option to clientlaunch below.
-  Similarly, the use of cores C0-35,40-75 for the TEEMS servers can be
-  changed in the gen_manifest.py program.
-  If you have a different number of cores available for servers, also
-  change M_MAX below.
-'''
+
 from run_experiments import run_exp
 
 LOG_FOLDER = "Experiments/"
@@ -21,7 +12,7 @@ PUB_IN = 1
 # B = message size (bytes)
 B = 256
 
-## Figure 7 Public
+## Figure 5 Public
 PRIVATE_ROUTE = False
 N = [1<<15, 1<<16, 1<<17, 1<<18, 1<<19, 1<<20]
 M = [4]
@@ -29,7 +20,7 @@ T = [4]
 
 run_exp(LOG_FOLDER, PRIVATE_ROUTE, NUM_EPOCHS, N, M, T, B, PRIV_OUT, PRIV_IN, PUB_OUT, PUB_IN)
 
-## Figure 8 Public
+## Figure 6 Public
 PRIVATE_ROUTE = False
 N = [1<<20]
 M = [72, 64, 48, 36, 32, 24, 16, 8, 6, 4]
@@ -37,7 +28,7 @@ T = [1]
 
 run_exp(LOG_FOLDER, PRIVATE_ROUTE, NUM_EPOCHS, N, M, T, B, PRIV_OUT, PRIV_IN, PUB_OUT, PUB_IN)
 
-## Figure 7 Private
+## Figure 5 Private
 PRIVATE_ROUTE = True
 N = [1<<15, 1<<16, 1<<17, 1<<18, 1<<19, 1<<20]
 M = [4]
@@ -45,7 +36,7 @@ T = [4]
 
 run_exp(LOG_FOLDER, PRIVATE_ROUTE, NUM_EPOCHS, N, M, T, B, PRIV_OUT, PRIV_IN, PUB_OUT, PUB_IN)
 
-## Figure 8 Private
+## Figure 6 Private
 PRIVATE_ROUTE = True
 N = [1<<20]
 M = [72, 64, 48, 36, 32, 24, 16, 8, 6, 4]

+ 21 - 13
run_experiments.py

@@ -1,16 +1,5 @@
 #!/usr/bin/python3
 
-'''
-  NOTE: This script is tailored for a machine with 2 40-core processors.
-  Currently assumes 8 threads (C36-39,76-79) are set aside for client
-  simulator; change this by setting "prefix" in Client/clientlaunch and
-  the -t option to clientlaunch below.
-  Similarly, the use of cores C0-35,40-75 for the TEEMS servers can be
-  changed in the gen_manifest.py program.
-  If you have a different number of cores available for servers, also
-  change M_MAX below.
-'''
-
 import subprocess
 import os
 import sys
@@ -19,6 +8,7 @@ import time
 from gen_manifest import generate_manifest
 from gen_enclave_config import generate_config
 from logs_to_csv import parse_output_logs
+from core_allocator import core_allocation
 
 ###############################################################################
 
@@ -71,6 +61,22 @@ def run_exp(LOG_FOLDER, PRIVATE_ROUTE, NUM_EPOCHS, N, M, T, B, PRIV_OUT, PRIV_IN
             for n in N:
                 #for run in ["diagnostic", "experiment"]:
                 for run in ["experiment"]:
+                    (servers_allocation, client_allocation) = \
+                        core_allocation(m,t)
+                    if servers_allocation is None or \
+                            client_allocation is None:
+                        print("""
+***
+*** Not enough physical cores available to run
+*** M = %d servers with T = %d cores each, plus
+*** at least one core for clients.
+***
+*** Consider setting the env var OVERLOAD_CORES=1
+*** (which will severely impact the performance)
+*** or run experiments of reduced size.
+***
+""" % (m,t))
+                        continue
                     num_WN_to_precompute = 0
                     if(PRIVATE_ROUTE):
                         num_WN_to_precompute = 2 * 3
@@ -117,7 +123,7 @@ def run_exp(LOG_FOLDER, PRIVATE_ROUTE, NUM_EPOCHS, N, M, T, B, PRIV_OUT, PRIV_IN
                     epoch_wait_time = math.ceil(epoch_param)
                     print("Epoch_wait_time = %d" % epoch_wait_time)
 
-                    # Since launch is invoked from App/ ; we add a ../ to subfolder before
+                    # Since launch is invoked from App/ we add a ../ to subfolder before
                     # passing it to slaunch
                     log_subfolder = "../" + log_subfolder
 
@@ -158,9 +164,11 @@ def run_exp(LOG_FOLDER, PRIVATE_ROUTE, NUM_EPOCHS, N, M, T, B, PRIV_OUT, PRIV_IN
                     claunch = []
                     claunch.append("./clientlaunch")
                     claunch.append("-t")
-                    claunch.append("8")
+                    claunch.append(str(len(client_allocation)))
                     claunch.append("-l")
                     claunch.append(log_subfolder+"clients.log")
+                    claunch.append("-T")
+                    claunch.append(",".join(map(str, client_allocation)))
                     os.chdir("./../Client/")
                     client_process = subprocess.call(claunch)
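
Putting the pieces together, a hypothetical example (made-up cores and log
path) of the client invocation that the claunch list above produces:

    # Hypothetical client allocation returned by core_allocation(m, t)
    client_allocation = [36, 37, 38, 39]
    log_subfolder = "../Experiments/example/"   # illustrative path

    claunch = ["./clientlaunch",
               "-t", str(len(client_allocation)),
               "-l", log_subfolder + "clients.log",
               "-T", ",".join(map(str, client_allocation))]
    print(" ".join(claunch))
    # ./clientlaunch -t 4 -l ../Experiments/example/clients.log -T 36,37,38,39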