Bladeren bron

make peer configs per-user, not per-conversation

Justin Tracey 1 jaar geleden
bovenliggende
commit
8c852bbd4b

+ 0 - 14
shadow/peer/shadow.data.template/hosts/peer1/alice-group1.toml

@@ -1,14 +0,0 @@
-user = {name = "Alice", address = "100.0.0.1:6397"}
-group = "group1"
-recipients = [{name = "Bob", address = "100.0.0.2:6397"}, {name = "Carol", address = "100.0.0.2:6398"}]
-bootstrap = 5.0
-retry = 5.0
-
-[distributions]
-s = 0.5
-r = 0.1
-m = {distribution = "Poisson", lambda = 1.0}
-i = {distribution = "Normal", mean = 15.0, std_dev = 30.0}
-w = {distribution = "Normal", mean = 30.0, std_dev = 30.0}
-a_s = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
-a_r = {distribution = "Normal", mean = 10.0, std_dev = 5.0}

+ 0 - 14
shadow/peer/shadow.data.template/hosts/peer1/alice-group2.toml

@@ -1,14 +0,0 @@
-user = {name = "Alice", address = "100.0.0.1:6397"}
-group = "group2"
-recipients = [{name = "Bob", address = "100.0.0.2:6397"}, {name = "Dave", address = "100.0.0.3:6397"}]
-bootstrap = 5.0
-retry = 5.0
-
-[distributions]
-s = 0.5
-r = 0.1
-m = {distribution = "Poisson", lambda = 1.0}
-i = {distribution = "Normal", mean = 15.0, std_dev = 30.0}
-w = {distribution = "Normal", mean = 30.0, std_dev = 30.0}
-a_s = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
-a_r = {distribution = "Normal", mean = 10.0, std_dev = 5.0}

+ 39 - 0
shadow/peer/shadow.data.template/hosts/peer1/alice.toml

@@ -0,0 +1,39 @@
+user = {name = "Alice", address = "100.0.0.1:6397"}
+
+[[conversations]]
+group = "group1"
+recipients = [
+  { name = "Bob", address = "100.0.0.2:6397" },
+  { name = "Carol", address = "100.0.0.2:6398" }
+]
+bootstrap = 5.0
+retry = 5.0
+
+[conversations.distributions]
+s = 0.5
+r = 0.1
+m = { distribution = "Poisson", lambda = 1.0 }
+i = { distribution = "Normal", mean = 15.0, std_dev = 30.0 }
+w = { distribution = "Normal", mean = 30.0, std_dev = 30.0 }
+a_s = { distribution = "Normal", mean = 10.0, std_dev = 5.0 }
+a_r = { distribution = "Normal", mean = 10.0, std_dev = 5.0 }
+
+
+[[conversations]]
+group = "group2"
+recipients = [
+  { name = "Bob", address = "100.0.0.2:6397" },
+  { name = "Dave", address = "100.0.0.3:6397" }
+]
+bootstrap = 5.0
+retry = 5.0
+
+[conversations.distributions]
+s = 0.5
+r = 0.1
+m = { distribution = "Poisson", lambda = 1.0 }
+i = { distribution = "Normal", mean = 15.0, std_dev = 30.0 }
+w = { distribution = "Normal", mean = 30.0, std_dev = 30.0 }
+a_s = { distribution = "Normal", mean = 10.0, std_dev = 5.0 }
+a_r = { distribution = "Normal", mean = 10.0, std_dev = 5.0 }
+

+ 0 - 14
shadow/peer/shadow.data.template/hosts/peer2/bob-group1.toml

@@ -1,14 +0,0 @@
-user = {name = "Bob", address = "100.0.0.2:6397"}
-group = "group1"
-recipients = [{name = "Alice", address = "100.0.0.1:6397"}, {name = "Carol", address = "100.0.0.2:6398"}]
-bootstrap = 5.0
-retry = 5.0
-
-[distributions]
-s = 0.5
-r = 0.1
-m = {distribution = "Poisson", lambda = 1.0}
-i = {distribution = "Normal", mean = 15.0, std_dev = 30.0}
-w = {distribution = "Normal", mean = 30.0, std_dev = 30.0}
-a_s = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
-a_r = {distribution = "Normal", mean = 10.0, std_dev = 5.0}

+ 0 - 14
shadow/peer/shadow.data.template/hosts/peer2/bob-group2.toml

@@ -1,14 +0,0 @@
-user = {name = "Bob", address = "100.0.0.2:6397"}
-group = "group2"
-recipients = [{name = "Alice", address = "100.0.0.1:6397"}, {name = "Dave", address = "100.0.0.3:6397"}]
-bootstrap = 5.0
-retry = 5.0
-
-[distributions]
-s = 0.5
-r = 0.1
-m = {distribution = "Poisson", lambda = 1.0}
-i = {distribution = "Normal", mean = 15.0, std_dev = 30.0}
-w = {distribution = "Normal", mean = 30.0, std_dev = 30.0}
-a_s = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
-a_r = {distribution = "Normal", mean = 10.0, std_dev = 5.0}

+ 32 - 0
shadow/peer/shadow.data.template/hosts/peer2/bob.toml

@@ -0,0 +1,32 @@
+user = {name = "Bob", address = "100.0.0.2:6397"}
+
+[[conversations]]
+group = "group1"
+recipients = [{name = "Alice", address = "100.0.0.1:6397"}, {name = "Carol", address = "100.0.0.2:6398"}]
+bootstrap = 5.0
+retry = 5.0
+
+[conversations.distributions]
+s = 0.5
+r = 0.1
+m = {distribution = "Poisson", lambda = 1.0}
+i = {distribution = "Normal", mean = 15.0, std_dev = 30.0}
+w = {distribution = "Normal", mean = 30.0, std_dev = 30.0}
+a_s = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
+a_r = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
+
+
+[[conversations]]
+group = "group2"
+recipients = [{name = "Alice", address = "100.0.0.1:6397"}, {name = "Dave", address = "100.0.0.3:6397"}]
+bootstrap = 5.0
+retry = 5.0
+
+[conversations.distributions]
+s = 0.5
+r = 0.1
+m = {distribution = "Poisson", lambda = 1.0}
+i = {distribution = "Normal", mean = 15.0, std_dev = 30.0}
+w = {distribution = "Normal", mean = 30.0, std_dev = 30.0}
+a_s = {distribution = "Normal", mean = 10.0, std_dev = 5.0}
+a_r = {distribution = "Normal", mean = 10.0, std_dev = 5.0}

+ 4 - 2
shadow/peer/shadow.data.template/hosts/peer2/carol-group1.toml → shadow/peer/shadow.data.template/hosts/peer2/carol.toml

@@ -1,10 +1,12 @@
-user = {name = "Carol", address = "100.0.0.1:6398"}
+user = {name = "Carol", address = "100.0.0.2:6398"}
+
+[[conversations]]
 group = "group1"
 recipients = [{name = "Alice", address = "100.0.0.1:6397"}, {name = "Bob", address = "100.0.0.2:6397"}]
 bootstrap = 5.0
 retry = 5.0
 
-[distributions]
+[conversations.distributions]
 s = 0.5
 r = 0.1
 m = {distribution = "Poisson", lambda = 1.0}

+ 3 - 1
shadow/peer/shadow.data.template/hosts/peer3/dave-group2.toml → shadow/peer/shadow.data.template/hosts/peer3/dave.toml

@@ -1,10 +1,12 @@
 user = {name = "Dave", address = "100.0.0.3:6397"}
+
+[[conversations]]
 group = "group2"
 recipients = [{name = "Alice", address = "100.0.0.1:6397"}, {name = "Bob", address = "100.0.0.2:6397"}]
 bootstrap = 5.0
 retry = 5.0
 
-[distributions]
+[conversations.distributions]
 s = 0.5
 r = 0.1
 m = {distribution = "Poisson", lambda = 1.0}

+ 64 - 65
src/bin/mgen-peer.rs

@@ -294,16 +294,21 @@ struct Peer {
 }
 
 #[derive(Debug, Deserialize)]
-struct Config {
-    user: Peer,
+struct ConversationConfig {
     group: String,
     recipients: Vec<Peer>,
-    socks: Option<String>,
     bootstrap: f64,
     retry: f64,
     distributions: ConfigDistributions,
 }
 
+#[derive(Debug, Deserialize)]
+struct Config {
+    user: Peer,
+    socks: Option<String>,
+    conversations: Vec<ConversationConfig>,
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let mut args = std::env::args();
@@ -317,77 +322,71 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         retry: f64,
     }
 
-    // a map from `user` to {a map from `recipient` to things the (user, recipient) reader/writer threads will need}
-    let mut recipient_maps = HashMap::<String, HashMap<String, ForIoThreads>>::new();
-
     let mut handles = vec![];
 
     for config_file in args.flat_map(|a| glob::glob(a.as_str()).unwrap()) {
         let toml_s = std::fs::read_to_string(config_file?)?;
         let config: Config = toml::from_str(&toml_s)?;
 
-        let (reader_to_state, state_from_reader) = mpsc::unbounded_channel();
-
-        if !recipient_maps.contains_key(&config.user.address) {
-            let map = HashMap::<String, ForIoThreads>::new();
-            recipient_maps.insert(config.user.address.clone(), map);
-        }
-
-        let user_recipient_map = recipient_maps.get_mut(&config.user.address).unwrap();
-
-        let mut conversation_recipient_map =
-            HashMap::<String, StateToWriter<MessageHolder>>::with_capacity(config.recipients.len());
-
-        for recipient in config.recipients.iter() {
-            let state_to_writer = if !user_recipient_map.contains_key(&recipient.name) {
-                let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
-                let mut reader_to_states = HashMap::new();
-                reader_to_states.insert(config.group.clone(), reader_to_state.clone());
-                let str_params = SocksParams {
-                    socks: config.socks.clone(),
-                    target: recipient.address.clone(),
-                    user: config.user.name.clone(),
-                    recipient: recipient.name.clone(),
-                };
-                let for_io = ForIoThreads {
-                    state_to_writer: state_to_writer.clone(),
-                    writer_from_state,
-                    reader_to_states,
-                    str_params,
-                    retry: config.retry,
+        // map from `recipient` to things the (user, recipient) reader/writer threads will need
+        let mut recipient_map = HashMap::<String, ForIoThreads>::new();
+        for conversation in config.conversations.into_iter() {
+            let (reader_to_state, state_from_reader) = mpsc::unbounded_channel();
+
+            let mut conversation_recipient_map =
+                HashMap::<String, StateToWriter<MessageHolder>>::with_capacity(
+                    conversation.recipients.len(),
+                );
+
+            for recipient in conversation.recipients.iter() {
+                let state_to_writer = if !recipient_map.contains_key(&recipient.name) {
+                    let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
+                    let mut reader_to_states = HashMap::new();
+                    reader_to_states.insert(conversation.group.clone(), reader_to_state.clone());
+                    let str_params = SocksParams {
+                        socks: config.socks.clone(),
+                        target: recipient.address.clone(),
+                        user: config.user.name.clone(),
+                        recipient: recipient.name.clone(),
+                    };
+                    let for_io = ForIoThreads {
+                        state_to_writer: state_to_writer.clone(),
+                        writer_from_state,
+                        reader_to_states,
+                        str_params,
+                        retry: conversation.retry,
+                    };
+                    recipient_map.insert(recipient.name.clone(), for_io);
+                    state_to_writer
+                } else {
+                    let for_io = recipient_map.get_mut(&recipient.name).unwrap();
+                    if !for_io.reader_to_states.contains_key(&conversation.group) {
+                        for_io
+                            .reader_to_states
+                            .insert(conversation.group.clone(), reader_to_state.clone());
+                    }
+                    for_io.state_to_writer.clone()
                 };
-                user_recipient_map.insert(recipient.name.clone(), for_io);
-                state_to_writer
-            } else {
-                let for_io = user_recipient_map.get_mut(&recipient.name).unwrap();
-                if !for_io.reader_to_states.contains_key(&config.group) {
-                    for_io
-                        .reader_to_states
-                        .insert(config.group.clone(), reader_to_state.clone());
-                }
-                for_io.state_to_writer.clone()
-            };
-            conversation_recipient_map.insert(
-                recipient.name.clone(),
-                StateToWriter {
-                    channel: state_to_writer,
-                },
-            );
-        }
+                conversation_recipient_map.insert(
+                    recipient.name.clone(),
+                    StateToWriter {
+                        channel: state_to_writer,
+                    },
+                );
+            }
 
-        let distributions: Distributions = config.distributions.try_into()?;
+            let distributions: Distributions = conversation.distributions.try_into()?;
 
-        tokio::spawn(manage_conversation(
-            config.user.name,
-            config.group,
-            distributions,
-            config.bootstrap,
-            state_from_reader,
-            conversation_recipient_map,
-        ));
-    }
+            tokio::spawn(manage_conversation(
+                config.user.name.clone(),
+                conversation.group,
+                distributions,
+                conversation.bootstrap,
+                state_from_reader,
+                conversation_recipient_map,
+            ));
+        }
 
-    for (address, mut recipient_map) in recipient_maps.drain() {
         let mut name_to_io_threads: HashMap<String, (ReadSocketUpdaterIn, WriteSocketUpdaterIn)> =
             HashMap::new();
 
@@ -415,7 +414,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
 
         let handle: task::JoinHandle<Result<(), FatalError>> =
-            tokio::spawn(listener(address.clone(), name_to_io_threads));
+            tokio::spawn(listener(config.user.address, name_to_io_threads));
         handles.push(handle);
     }