Browse Source

Experiments.

Kyle Fredrickson 1 year ago
parent
commit
2325a7ef1d

+ 1 - 1
.gitignore

@@ -1,4 +1,4 @@
 target/
 data/
 *.svg
-/*.perf*
+perf.*

+ 53 - 0
baseline/Cargo.lock

@@ -57,6 +57,7 @@ version = "0.1.0"
 dependencies = [
  "clap",
  "otils",
+ "rayon",
 ]
 
 [[package]]
@@ -111,6 +112,37 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
+
+[[package]]
+name = "either"
+version = "1.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b"
+
 [[package]]
 name = "heck"
 version = "0.5.0"
@@ -128,6 +160,7 @@ name = "otils"
 version = "0.1.0"
 dependencies = [
  "cc",
+ "rayon",
 ]
 
 [[package]]
@@ -148,6 +181,26 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "strsim"
 version = "0.11.1"

+ 3 - 2
baseline/Cargo.toml

@@ -9,8 +9,9 @@ debug = true
 [dependencies]
 otils = { path = "../otils" }
 clap = { version = "4.5.4", features = ["derive"] }
+rayon = "1.10.0"
 
 [package.metadata.fortanix-sgx]
 stack-size=0x400000
-heap-size=0x100000000
-threads=9
+heap-size=0x800000000
+threads=49

+ 10 - 5
baseline/src/omq.rs

@@ -1,16 +1,21 @@
 use crate::request::Request;
 use otils::ObliviousOps;
+use rayon::ThreadPool;
 
 #[derive(Debug)]
 pub struct ObliviousMultiQueue {
-    num_threads: usize,
+    pool: ThreadPool,
     message_store: Vec<Request>,
 }
 
 impl ObliviousMultiQueue {
     pub fn new(num_threads: usize) -> Self {
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .build()
+            .unwrap();
         ObliviousMultiQueue {
-            num_threads,
+            pool,
             message_store: Vec::new(),
         }
     }
@@ -36,7 +41,7 @@ impl ObliviousMultiQueue {
         let fetch_sum = fetches.iter().fold(0, |acc, f| acc + f.volume) as usize;
         self.update_store(fetches, fetch_sum);
 
-        self.message_store = otils::sort(std::mem::take(&mut self.message_store), self.num_threads);
+        self.message_store = otils::sort(std::mem::take(&mut self.message_store), &self.pool);
 
         let mut user_sum: isize = 0;
         let mut prev_user: i32 = -1;
@@ -58,7 +63,7 @@ impl ObliviousMultiQueue {
         otils::compact(
             &mut self.message_store[..],
             |r| r.should_deliver(),
-            self.num_threads,
+            &self.pool,
         );
         let deliver: Vec<Request> = self.message_store.drain(0..fetch_sum).collect();
         // for r in deliver.iter() {
@@ -68,7 +73,7 @@ impl ObliviousMultiQueue {
         otils::compact(
             &mut self.message_store[..],
             |r| r.should_defer(),
-            self.num_threads,
+            &self.pool,
         );
         self.message_store.truncate(final_size);
         // for r in self.message_store.iter() {

+ 10 - 9
experiments/message_scaling.py

@@ -1,15 +1,14 @@
 import os
 import subprocess
 
-SENDS = [2**i for i in range(20, 30)]
-FETCHES = 50000
-USERS = int(FETCHES / 2)
+SENDS = [2**i for i in range(18, 26)]
+FETCHES = 8192
+USERS = FETCHES
 
+THREADS = 48
+MAPS = 5
 
-THREADS = 8
-MAPS = 4
-
-RUNS = 1
+RUNS = 10
 WARMUP = 0
 
 DATA_DIR = os.path.join(os.getcwd(), "data", "message-scaling")
@@ -23,16 +22,18 @@ SPARTA_FILE = os.path.join(DATA_DIR, f"sparta-{FETCHES}-{THREADS}-{MAPS}.csv")
 
 
 def sparta_cmd(sends):
+
     cmd = ["cargo", "run", "--release", "--",
-           str(sends), str(FETCHES), str(THREADS), str(USERS), str(MAPS)]
+           str(sends), str(FETCHES), str(THREADS), str(USERS), str(MAPS), "-r", str(RUNS), "-w", str(WARMUP)]
     result = subprocess.run(cmd, capture_output=True,
                             text=True, cwd=SPARTA_DIR)
+    print(result.stderr)
     return result.stdout
 
 
 def baseline_cmd(sends):
     cmd = ["cargo", "run", "--release", "--",
-           str(sends), str(FETCHES), str(THREADS)]
+           str(sends), str(FETCHES), str(THREADS),  "-r", str(RUNS), "-w", str(WARMUP)]
     result = subprocess.run(cmd, capture_output=True,
                             text=True, cwd=BASELINE_DIR)
     return result.stdout

+ 9 - 0
experiments/run-experiments.sh

@@ -0,0 +1,9 @@
+#! /bin/bash
+
+cd ..
+
+python3 experiments/message_scaling.py
+python3 experiments/submap_scaling.py
+python3 experiments/user_and_message_scaling.py
+
+cd -

+ 54 - 0
experiments/submap_scaling.py

@@ -0,0 +1,54 @@
+import os
+import subprocess
+
+SENDS = 1048576
+FETCHES = 8192
+USERS = FETCHES
+
+THREADS = 8
+MAP_THREADS = [(i, THREADS * i + THREADS) for i in range(1, int(48 / THREADS))]
+
+RUNS = 10
+WARMUP = 0
+
+DATA_DIR = os.path.join(os.getcwd(), "data", "submap-scaling")
+os.makedirs(DATA_DIR, exist_ok=True)
+
+SPARTA_DIR = os.path.join(os.getcwd(), "sparta")
+BASELINE_DIR = os.path.join(os.getcwd(), "baseline")
+
+BASELINE_FILE = os.path.join(DATA_DIR, f"baseline-{FETCHES}-{THREADS}.csv")
+SPARTA_FILE = os.path.join(DATA_DIR, f"sparta-{FETCHES}-{THREADS}.csv")
+
+
+def sparta_cmd(mt):
+    (num_maps, num_threads) = mt
+    cmd = ["cargo", "run", "--release", "--",
+           str(SENDS), str(FETCHES), str(num_threads),
+           str(USERS), str(num_maps), "-r", str(RUNS), "-w", str(WARMUP)]
+    result = subprocess.run(cmd, capture_output=True,
+                            text=True, cwd=SPARTA_DIR)
+    return result.stdout
+
+
+def baseline_cmd(mt):
+    (_num_maps, num_threads) = mt
+    cmd = ["cargo", "run", "--release", "--",
+           str(SENDS), str(FETCHES), str(num_threads), "-r", str(RUNS), "-w", str(WARMUP)]
+    result = subprocess.run(cmd, capture_output=True,
+                            text=True, cwd=BASELINE_DIR)
+    return result.stdout
+
+
+for mt in MAP_THREADS:
+    print(mt)
+
+    with open(SPARTA_FILE, "a") as sparta_file:
+        output = sparta_cmd(mt)
+        print("\tsparta:", output)
+        sparta_file.write(output)
+
+    with open(BASELINE_FILE, "a") as baseline_file:
+        output = baseline_cmd(mt)
+        print("\tbaseline:", output)
+        baseline_file.write(output)

+ 0 - 0
experiments/thread_scaling.py


+ 49 - 0
experiments/user_and_message_scaling.py

@@ -0,0 +1,49 @@
+import os
+import subprocess
+
+SENDS = [2**i for i in range(18, 25)]
+
+THREADS = 48
+MAPS = 5
+
+RUNS = 10
+WARMUP = 0
+
+DATA_DIR = os.path.join(os.getcwd(), "data", "user-message-scaling")
+os.makedirs(DATA_DIR, exist_ok=True)
+
+SPARTA_DIR = os.path.join(os.getcwd(), "sparta")
+BASELINE_DIR = os.path.join(os.getcwd(), "baseline")
+
+BASELINE_FILE = os.path.join(DATA_DIR, f"baseline-{THREADS}.csv")
+SPARTA_FILE = os.path.join(DATA_DIR, f"sparta-{THREADS}.csv")
+
+
+def sparta_cmd(sends):
+    cmd = ["cargo", "run", "--release", "--",
+           str(sends), str(sends), str(THREADS), str(sends), str(MAPS), "-r", str(RUNS), "-w", str(WARMUP)]
+    result = subprocess.run(cmd, capture_output=True,
+                            text=True, cwd=SPARTA_DIR)
+    return result.stdout
+
+
+def baseline_cmd(sends):
+    cmd = ["cargo", "run", "--release", "--",
+           str(sends), str(sends), str(THREADS), "-r", str(RUNS), "-w", str(WARMUP)]
+    result = subprocess.run(cmd, capture_output=True,
+                            text=True, cwd=BASELINE_DIR)
+    return result.stdout
+
+
+for send in SENDS:
+    print(send)
+
+    with open(SPARTA_FILE, "a") as sparta_file:
+        output = sparta_cmd(send)
+        print("\tsparta:", output)
+        sparta_file.write(output)
+
+    with open(BASELINE_FILE, "a") as baseline_file:
+        output = baseline_cmd(send)
+        print("\tbaseline:", output)
+        baseline_file.write(output)

+ 0 - 0
experiments/user_scaling.py


+ 1 - 1
otils

@@ -1 +1 @@
-Subproject commit 2b0a0bc57376e658253f01274e98e270cc8452e1
+Subproject commit b372424e36f1ebc6f3d088e2b61062469d331482

+ 53 - 0
sparta/Cargo.lock

@@ -140,6 +140,37 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
+
+[[package]]
+name = "either"
+version = "1.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b"
+
 [[package]]
 name = "fastapprox"
 version = "0.3.1"
@@ -163,6 +194,7 @@ name = "otils"
 version = "0.1.0"
 dependencies = [
  "cc",
+ "rayon",
 ]
 
 [[package]]
@@ -183,6 +215,26 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "sparta"
 version = "0.1.0"
@@ -191,6 +243,7 @@ dependencies = [
  "clap",
  "fastapprox",
  "otils",
+ "rayon",
 ]
 
 [[package]]

+ 4 - 3
sparta/Cargo.toml

@@ -8,11 +8,12 @@ otils = { path = "../otils" }
 blake3 = "1.5.1"
 fastapprox = "0.3.1"
 clap = { version = "4.5.4", features = ["derive"] }
+rayon = "1.10.0"
 
 [package.metadata.fortanix-sgx]
 stack-size=0x400000
-heap-size=0x100000000
-threads=9
+heap-size=0x2000000000
+threads=49
 
 [profile.release]
-# debug = true
+debug = true

+ 46 - 48
sparta/src/load_balancer.rs

@@ -2,34 +2,45 @@ use crate::omap::ObliviousMap;
 pub use crate::record::{IndexRecord, Record, RecordType, SubmapRecord};
 use fastapprox::fast;
 use otils::{self, ObliviousOps};
-use std::{cmp, f64::consts::E, thread, time::UNIX_EPOCH};
+use rayon::ThreadPool;
+use std::{
+    cmp,
+    f64::consts::E,
+    sync::{Arc, Mutex},
+    time::UNIX_EPOCH,
+};
 
 const LAMBDA: usize = 128;
 
 pub struct LoadBalancer {
     num_users: i64,
-    num_threads: usize,
     num_submaps: usize,
 
+    pool: ThreadPool,
     pub user_store: Vec<IndexRecord>,
     pub submaps: Vec<ObliviousMap>,
 }
 
 impl LoadBalancer {
     pub fn new(num_users: i64, num_threads: usize, num_submaps: usize) -> Self {
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(num_threads / (num_submaps + 1))
+            .build()
+            .unwrap();
+
         let mut user_store = Vec::new();
         user_store.reserve(num_users as usize);
         user_store.extend((0..num_users).map(|i| IndexRecord::new(i, RecordType::User)));
 
         let mut submaps = Vec::with_capacity(num_submaps as usize);
         submaps.extend(
-            (0..num_submaps).map(|_| ObliviousMap::new(num_threads / num_submaps as usize)),
+            (0..num_submaps).map(|_| ObliviousMap::new(num_threads / (num_submaps + 1) as usize)),
         );
 
         LoadBalancer {
             num_users,
-            num_threads,
             num_submaps,
+            pool,
             user_store,
             submaps,
         }
@@ -73,7 +84,7 @@ impl LoadBalancer {
 
         let mut requests = self.pad_for_submap(requests, submap_size, is_send);
 
-        requests = otils::sort(requests, self.num_threads); // sort by omap, then by dummy
+        requests = otils::sort(requests, &self.pool); // sort by omap, then by dummy
 
         let mut prev_map = self.num_submaps;
         let mut remaining_marks = submap_size as i32;
@@ -89,7 +100,7 @@ impl LoadBalancer {
             prev_map = submap as usize;
         }
 
-        otils::compact(&mut requests[..], |r| r.0.mark == 1, self.num_threads);
+        otils::compact(&mut requests[..], |r| r.0.mark == 1, &self.pool);
         requests.truncate(self.num_submaps * submap_size);
         requests
     }
@@ -126,20 +137,16 @@ impl LoadBalancer {
         self.user_store.reserve(num_requests);
         self.user_store.extend(sends);
 
-        self.user_store = otils::sort(std::mem::take(&mut self.user_store), self.num_threads);
+        self.user_store = otils::sort(std::mem::take(&mut self.user_store), &self.pool);
         self.propagate_send_indices();
 
-        otils::compact(
-            &mut self.user_store[..],
-            |r| r.is_request(),
-            self.num_threads,
-        );
+        otils::compact(&mut self.user_store[..], |r| r.is_request(), &self.pool);
         let requests = self.user_store.drain(0..num_requests).collect();
 
         otils::compact(
             &mut self.user_store[..],
             |r| r.is_updated_user_store(),
-            self.num_threads,
+            &self.pool,
         );
 
         self.user_store.truncate(self.num_users as usize);
@@ -162,24 +169,21 @@ impl LoadBalancer {
 
         let mut remaining_submaps = &mut self.submaps[..];
 
-        thread::scope(|s| {
-            for _ in 0..self.num_submaps - 1 {
+        self.pool.scope(|s| {
+            for _ in 0..self.num_submaps {
                 let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
                 remaining_submaps = rest_submaps;
 
                 let batch = requests.drain(0..submap_size).collect();
-                s.spawn(|| submap[0].batch_send(batch));
+                s.spawn(|_| submap[0].batch_send(batch));
             }
 
-            let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
-            remaining_submaps = rest_submaps;
+            // let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
+            // remaining_submaps = rest_submaps;
 
-            let batch = requests.drain(0..submap_size).collect();
-            submap[0].batch_send(batch);
+            // let batch = requests.drain(0..submap_size).collect();
+            // submap[0].batch_send(batch);
         });
-
-        // parallelize
-        for _ in 0..self.num_submaps {}
     }
 
     fn update_with_fetches(&mut self, fetches: Vec<IndexRecord>, num_fetches: usize) {
@@ -219,20 +223,16 @@ impl LoadBalancer {
     ) -> Vec<IndexRecord> {
         self.update_with_fetches(fetches, num_requests);
 
-        self.user_store = otils::sort(std::mem::take(&mut self.user_store), self.num_threads);
+        self.user_store = otils::sort(std::mem::take(&mut self.user_store), &self.pool);
         self.propagate_fetch_indices();
 
-        otils::compact(
-            &mut self.user_store[..],
-            |r| r.is_request(),
-            self.num_threads,
-        );
+        otils::compact(&mut self.user_store[..], |r| r.is_request(), &self.pool);
         let deliver = self.user_store.drain(0..num_requests).collect();
 
         otils::compact(
             &mut self.user_store[..],
             |r| r.is_updated_user_store(),
-            self.num_threads,
+            &self.pool,
         );
 
         self.user_store.truncate(self.num_users as usize);
@@ -278,33 +278,31 @@ impl LoadBalancer {
         // println!("submap requests {}: {}", requests.len(), end - start);
 
         let mut remaining_submaps = &mut self.submaps[..];
-        let mut responses: Vec<IndexRecord> = Vec::with_capacity(submap_size * self.num_submaps);
+        let responses: Arc<Mutex<Vec<IndexRecord>>> = Arc::new(Mutex::new(Vec::with_capacity(
+            submap_size * self.num_submaps,
+        )));
 
         // let start = std::time::SystemTime::now()
         //     .duration_since(UNIX_EPOCH)
         //     .unwrap()
         //     .as_nanos();
-        thread::scope(|s| {
-            let mut handles = Vec::new();
-            for _ in 0..self.num_submaps - 1 {
+        self.pool.scope(|s| {
+            for _ in 0..self.num_submaps {
                 let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
                 remaining_submaps = rest_submaps;
                 let batch = requests.drain(0..submap_size).collect();
 
-                handles.push(s.spawn(|| submap[0].batch_fetch(batch)));
-            }
-
-            let (submap, rest_submaps) = remaining_submaps.split_at_mut(1);
-            remaining_submaps = rest_submaps;
-            let batch = requests.drain(0..submap_size).collect();
-
-            let response = submap[0].batch_fetch(batch);
-
-            responses.extend(response);
-            for handle in handles.into_iter() {
-                responses.extend(handle.join().unwrap());
+                s.spawn(|_| {
+                    let responses = Arc::clone(&responses);
+                    let response = submap[0].batch_fetch(batch);
+                    let mut responses = responses.lock().unwrap();
+                    responses.extend(response);
+                });
             }
         });
+
+        let mutex = Arc::into_inner(responses).unwrap();
+        let mut responses: Vec<IndexRecord> = mutex.into_inner().unwrap();
         // let end = std::time::SystemTime::now()
         //     .duration_since(UNIX_EPOCH)
         //     .unwrap()
@@ -316,8 +314,8 @@ impl LoadBalancer {
         //     .duration_since(UNIX_EPOCH)
         //     .unwrap()
         //     .as_nanos();
-        responses = otils::sort(responses, self.num_threads);
-        otils::compact(&mut responses, |r| r.0.is_send(), self.num_threads);
+        responses = otils::sort(responses, &self.pool);
+        otils::compact(&mut responses, |r| r.0.is_send(), &self.pool);
         // let end = std::time::SystemTime::now()
         //     .duration_since(UNIX_EPOCH)
         //     .unwrap()

+ 6 - 1
sparta/src/main.rs

@@ -51,11 +51,16 @@ fn main() {
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_nanos();
-            let _ = l.batch_fetch(vec![Record::fetch(0, args.fetches)]);
+            let _responses = l.batch_fetch(vec![Record::fetch(0, args.fetches)]);
             let end = std::time::SystemTime::now()
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_nanos();
+
+            // for response in responses.iter() {
+            //     println!("{:?}", response);
+            // }
+
             end - start
         })
         .collect();

+ 14 - 6
sparta/src/omap.rs

@@ -1,5 +1,6 @@
 use crate::record::{IndexRecord, Record, RecordType};
 use otils::{Max, ObliviousOps};
+use rayon::ThreadPool;
 use std::cmp::Ordering;
 
 struct MapRecord(Record);
@@ -41,22 +42,27 @@ impl Max for MapRecord {
     }
 }
 
-#[derive(Default)]
 pub struct ObliviousMap {
-    num_threads: usize,
+    pool: ThreadPool,
     message_store: Vec<MapRecord>,
 }
 
 impl ObliviousMap {
     pub fn new(num_threads: usize) -> Self {
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(num_threads)
+            .build()
+            .unwrap();
+
         let message_store = Vec::new();
         ObliviousMap {
-            num_threads,
+            pool,
             message_store,
         }
     }
 
     pub fn batch_send(&mut self, requests: Vec<Record>) {
+        // println!("num sends {}", requests.len());
         self.message_store.reserve(requests.len());
         self.message_store
             .extend(requests.into_iter().map(|r| MapRecord(r)));
@@ -78,12 +84,14 @@ impl ObliviousMap {
     }
 
     pub fn batch_fetch(&mut self, requests: Vec<Record>) -> Vec<IndexRecord> {
+        // println!("num fetches {}", requests.len());
+
         let final_size = self.message_store.len();
         let num_requests = requests.len();
 
         self.update_with_fetches(requests);
 
-        self.message_store = otils::sort(std::mem::take(&mut self.message_store), self.num_threads);
+        self.message_store = otils::sort(std::mem::take(&mut self.message_store), &self.pool);
 
         let mut prev_idx = u32::MAX;
         let mut remaining = 0;
@@ -98,7 +106,7 @@ impl ObliviousMap {
         otils::compact(
             &mut self.message_store[..],
             |r| r.should_deliver(),
-            self.num_threads,
+            &self.pool,
         );
         let response = self
             .message_store
@@ -109,7 +117,7 @@ impl ObliviousMap {
         otils::compact(
             &mut self.message_store[..],
             |record| record.should_defer(),
-            self.num_threads,
+            &self.pool,
         );
         self.message_store.truncate(final_size);
         response