|
@@ -284,19 +284,23 @@ async fn writer<'a>(
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/// This user or a recipient.
|
|
|
-/// If this user, address is a local address to listen on.
|
|
|
-/// If a recipient, address is a remote address to send to.
|
|
|
-#[derive(Debug, Deserialize)]
|
|
|
-struct Peer {
|
|
|
- name: String,
|
|
|
- address: String,
|
|
|
+fn parse_hosts_file(file_contents: &str) -> HashMap<&str, &str> {
|
|
|
+ let mut ret = HashMap::new();
|
|
|
+ for line in file_contents.lines() {
|
|
|
+ let mut words = line.split_ascii_whitespace();
|
|
|
+ if let Some(addr) = words.next() {
|
|
|
+ for name in words {
|
|
|
+ ret.insert(name, addr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ret
|
|
|
}
|
|
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
struct ConversationConfig {
|
|
|
group: String,
|
|
|
- recipients: Vec<Peer>,
|
|
|
+ recipients: Vec<String>,
|
|
|
bootstrap: f64,
|
|
|
retry: f64,
|
|
|
distributions: ConfigDistributions,
|
|
@@ -304,13 +308,15 @@ struct ConversationConfig {
|
|
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
struct Config {
|
|
|
- user: Peer,
|
|
|
+ user: String,
|
|
|
socks: Option<String>,
|
|
|
+ listen: Option<String>,
|
|
|
conversations: Vec<ConversationConfig>,
|
|
|
}
|
|
|
|
|
|
fn process_config(
|
|
|
config: Config,
|
|
|
+ hosts_map: &HashMap<&str, &str>,
|
|
|
handles: &mut Vec<JoinHandle<Result<(), FatalError>>>,
|
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
|
struct ForIoThreads {
|
|
@@ -332,15 +338,18 @@ fn process_config(
|
|
|
);
|
|
|
|
|
|
for recipient in conversation.recipients.iter() {
|
|
|
- let state_to_writer = if !recipient_map.contains_key(&recipient.name) {
|
|
|
+ let state_to_writer = if !recipient_map.contains_key(recipient) {
|
|
|
let (state_to_writer, writer_from_state) = mpsc::unbounded_channel();
|
|
|
let mut reader_to_states = HashMap::new();
|
|
|
reader_to_states.insert(conversation.group.clone(), reader_to_state.clone());
|
|
|
+ let address = hosts_map
|
|
|
+ .get(recipient.as_str())
|
|
|
+ .unwrap_or_else(|| panic!("recipient not in hosts file: {}", recipient));
|
|
|
let str_params = SocksParams {
|
|
|
socks: config.socks.clone(),
|
|
|
- target: recipient.address.clone(),
|
|
|
- user: config.user.name.clone(),
|
|
|
- recipient: recipient.name.clone(),
|
|
|
+ target: address.to_string(),
|
|
|
+ user: config.user.clone(),
|
|
|
+ recipient: recipient.clone(),
|
|
|
};
|
|
|
let for_io = ForIoThreads {
|
|
|
state_to_writer: state_to_writer.clone(),
|
|
@@ -349,10 +358,10 @@ fn process_config(
|
|
|
str_params,
|
|
|
retry: conversation.retry,
|
|
|
};
|
|
|
- recipient_map.insert(recipient.name.clone(), for_io);
|
|
|
+ recipient_map.insert(recipient.clone(), for_io);
|
|
|
state_to_writer
|
|
|
} else {
|
|
|
- let for_io = recipient_map.get_mut(&recipient.name).unwrap();
|
|
|
+ let for_io = recipient_map.get_mut(recipient).unwrap();
|
|
|
if !for_io.reader_to_states.contains_key(&conversation.group) {
|
|
|
for_io
|
|
|
.reader_to_states
|
|
@@ -361,7 +370,7 @@ fn process_config(
|
|
|
for_io.state_to_writer.clone()
|
|
|
};
|
|
|
conversation_recipient_map.insert(
|
|
|
- recipient.name.clone(),
|
|
|
+ recipient.clone(),
|
|
|
StateToWriter {
|
|
|
channel: state_to_writer,
|
|
|
},
|
|
@@ -371,7 +380,7 @@ fn process_config(
|
|
|
let distributions: Distributions = conversation.distributions.try_into()?;
|
|
|
|
|
|
tokio::spawn(manage_conversation(
|
|
|
- config.user.name.clone(),
|
|
|
+ config.user.clone(),
|
|
|
conversation.group,
|
|
|
distributions,
|
|
|
conversation.bootstrap,
|
|
@@ -406,8 +415,16 @@ fn process_config(
|
|
|
handles.push(handle);
|
|
|
}
|
|
|
|
|
|
+ let address = if let Some(address) = config.listen {
|
|
|
+ address
|
|
|
+ } else {
|
|
|
+ hosts_map
|
|
|
+ .get(config.user.as_str())
|
|
|
+ .unwrap_or_else(|| panic!("user not found in hosts file: {}", config.user))
|
|
|
+ .to_string()
|
|
|
+ };
|
|
|
let handle: JoinHandle<Result<(), FatalError>> =
|
|
|
- tokio::spawn(listener(config.user.address, name_to_io_threads));
|
|
|
+ tokio::spawn(listener(address, name_to_io_threads));
|
|
|
handles.push(handle);
|
|
|
Ok(())
|
|
|
}
|
|
@@ -416,13 +433,16 @@ fn process_config(
|
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
|
let mut args = std::env::args();
|
|
|
let _ = args.next();
|
|
|
+ let hosts_file = std::fs::read_to_string(args.next().unwrap())?;
|
|
|
+ let hosts_map = parse_hosts_file(&hosts_file);
|
|
|
+ println!("{:?}", hosts_map);
|
|
|
|
|
|
let mut handles = vec![];
|
|
|
|
|
|
for config_file in args.flat_map(|a| glob::glob(a.as_str()).unwrap()) {
|
|
|
let yaml_s = std::fs::read_to_string(config_file?)?;
|
|
|
let config: Config = serde_yaml::from_str(&yaml_s)?;
|
|
|
- process_config(config, &mut handles)?;
|
|
|
+ process_config(config, &hosts_map, &mut handles)?;
|
|
|
}
|
|
|
|
|
|
let handles: futures::stream::FuturesUnordered<_> = handles.into_iter().collect();
|