|
@@ -25,7 +25,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
};
|
|
|
let listener = TcpListener::bind(listen_addr).await?;
|
|
|
|
|
|
- log!("Listening on {}", listen_addr);
|
|
|
+ log!("listening,{}", listen_addr);
|
|
|
|
|
|
// Maps group name to the table of message channels.
|
|
|
let mut snd_db = HashMap::<ID, Arc<RwLock<HashMap<ID, ReaderToSender>>>>::new();
|
|
@@ -35,19 +35,23 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
loop {
|
|
|
let socket = match listener.accept().await {
|
|
|
Ok((socket, _)) => socket,
|
|
|
- Err(_) => {
|
|
|
- log!("connection failed during accept");
|
|
|
+ Err(e) => {
|
|
|
+ log!("failed,accept,{}", e.kind());
|
|
|
continue;
|
|
|
}
|
|
|
};
|
|
|
let (mut rd, wr) = socket.into_split();
|
|
|
|
|
|
- let handshake = mgen::get_handshake(&mut rd).await?;
|
|
|
- log!(
|
|
|
- "Accepting channel from {} to {}",
|
|
|
- handshake.sender,
|
|
|
- handshake.group
|
|
|
- );
|
|
|
+ let handshake = match mgen::get_handshake(&mut rd).await {
|
|
|
+ Ok(handshake) => handshake,
|
|
|
+ Err(mgen::Error::Io(e)) => {
|
|
|
+ log!("failed,handshake,{}", e.kind());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ Err(mgen::Error::Utf8Error(e)) => panic!("{:?}", e),
|
|
|
+ Err(mgen::Error::MalformedSerialization(_, _)) => panic!(),
|
|
|
+ };
|
|
|
+ log!("accept,{},{}", handshake.sender, handshake.group);
|
|
|
|
|
|
if let Some(socket_updater) = writer_db.get(&handshake) {
|
|
|
// we've seen this client before
|
|
@@ -64,7 +68,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
socket_updater.send((wr, notify));
|
|
|
} else {
|
|
|
// newly-registered client
|
|
|
- log!("New client");
|
|
|
+ log!("register,{},{}", handshake.sender, handshake.group);
|
|
|
|
|
|
// message channel, for sending messages between threads
|
|
|
let (msg_snd, msg_rcv) = mpsc::unbounded_channel::<Arc<SerializedMessage>>();
|
|
@@ -96,11 +100,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
group_snds.clone(),
|
|
|
notify,
|
|
|
);
|
|
|
- writer_db.insert(handshake, socket_updater_snd);
|
|
|
|
|
|
+ let sender = handshake.sender.clone();
|
|
|
+ let group = handshake.group.clone();
|
|
|
tokio::spawn(async move {
|
|
|
- send_messages(msg_rcv, socket_updater_rcv).await;
|
|
|
+ send_messages(sender, group, msg_rcv, socket_updater_rcv).await;
|
|
|
});
|
|
|
+
|
|
|
+ writer_db.insert(handshake, socket_updater_snd);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -117,13 +124,17 @@ fn spawn_message_receiver(
|
|
|
// n.b.: get_message is not cancellation safe,
|
|
|
// but this is one of the cases where that's expected
|
|
|
// (we only cancel when something is wrong with the stream anyway)
|
|
|
- ret = get_messages(sender, group, rd, db) => {
|
|
|
- if let Err(e) = ret {
|
|
|
- log!("message receiver failed: {:?}", e);
|
|
|
+ ret = get_messages(&sender, &group, rd, db) => {
|
|
|
+ match ret {
|
|
|
+ Err(mgen::Error::Io(e)) => log!("failed,receive,{}", e.kind()),
|
|
|
+ Err(mgen::Error::Utf8Error(e)) => panic!("{:?}", e),
|
|
|
+ Err(mgen::Error::MalformedSerialization(v, b)) => panic!(
|
|
|
+ "Malformed Serialization: {:?}\n{:?})", v, b),
|
|
|
+ Ok(()) => panic!("Message receiver returned OK"),
|
|
|
}
|
|
|
}
|
|
|
_ = notify.notified() => {
|
|
|
- log!("receiver terminated");
|
|
|
+ log!("terminated,{},{}", sender, group);
|
|
|
// should cause get_messages to terminate, dropping the socket
|
|
|
}
|
|
|
}
|
|
@@ -133,14 +144,13 @@ fn spawn_message_receiver(
|
|
|
/// Loop for receiving messages on the socket, figuring out who to deliver them to,
|
|
|
/// and forwarding them locally to the respective channel.
|
|
|
async fn get_messages(
|
|
|
- sender: String,
|
|
|
- group: String,
|
|
|
+ sender: &str,
|
|
|
+ group: &str,
|
|
|
mut socket: OwnedReadHalf,
|
|
|
global_db: Arc<RwLock<HashMap<ID, ReaderToSender>>>,
|
|
|
-) -> Result<(), Box<dyn Error>> {
|
|
|
+) -> Result<(), mgen::Error> {
|
|
|
// Wait for the next message to be received before populating our local copy of the db,
|
|
|
// that way other clients have time to register.
|
|
|
- log!("waiting for message from {} to {}", sender, group);
|
|
|
let buf = mgen::get_message_bytes(&mut socket).await?;
|
|
|
|
|
|
let db = global_db.read().await.clone();
|
|
@@ -155,7 +165,7 @@ async fn get_messages(
|
|
|
match message.body {
|
|
|
MessageBody::Size(_) => {
|
|
|
assert!(message.group == group);
|
|
|
- log!("got message from {} for {}", sender, group);
|
|
|
+ log!("received,{},{}", sender, group);
|
|
|
let body = message.body;
|
|
|
let m = Arc::new(SerializedMessage { header: buf, body });
|
|
|
for recipient in message_channels.iter() {
|
|
@@ -163,6 +173,7 @@ async fn get_messages(
|
|
|
}
|
|
|
}
|
|
|
MessageBody::Receipt => {
|
|
|
+ log!("receipt,{},{}", sender, group);
|
|
|
let recipient = &db[message.group];
|
|
|
let body = message.body;
|
|
|
let m = Arc::new(SerializedMessage { header: buf, body });
|
|
@@ -172,7 +183,6 @@ async fn get_messages(
|
|
|
|
|
|
// we never have to update the DB again, so repeat the above, skipping that step
|
|
|
loop {
|
|
|
- log!("waiting for message from {} to {}", sender, group);
|
|
|
let buf = mgen::get_message_bytes(&mut socket).await?;
|
|
|
let message = MessageHeaderRef::deserialize(&buf[4..])?;
|
|
|
assert!(message.sender == sender);
|
|
@@ -180,7 +190,7 @@ async fn get_messages(
|
|
|
match message.body {
|
|
|
MessageBody::Size(_) => {
|
|
|
assert!(message.group == group);
|
|
|
- log!("got message from {} for {}", sender, group);
|
|
|
+ log!("received,{},{}", sender, group);
|
|
|
let body = message.body;
|
|
|
let m = Arc::new(SerializedMessage { header: buf, body });
|
|
|
for recipient in message_channels.iter() {
|
|
@@ -200,6 +210,8 @@ async fn get_messages(
|
|
|
/// Loop for receiving messages on the mpsc channel for this recipient,
|
|
|
/// and sending them out on the associated socket.
|
|
|
async fn send_messages(
|
|
|
+ recipient: ID,
|
|
|
+ group: ID,
|
|
|
mut msg_rcv: mpsc::UnboundedReceiver<Arc<SerializedMessage>>,
|
|
|
mut socket_updater: Updater<(OwnedWriteHalf, Arc<Notify>)>,
|
|
|
) {
|
|
@@ -211,12 +223,11 @@ async fn send_messages(
|
|
|
} else {
|
|
|
message_cache.unwrap()
|
|
|
};
|
|
|
- log!("sending message");
|
|
|
if message.write_all_to(&mut current_socket).await.is_err()
|
|
|
|| current_socket.flush().await.is_err()
|
|
|
{
|
|
|
message_cache = Some(message);
|
|
|
- log!("terminating connection");
|
|
|
+ log!("terminating,{},{}", recipient, group);
|
|
|
// socket is presumably closed, clean up and notify the listening end to close
|
|
|
// (all best-effort, we can ignore errors because it presumably means it's done)
|
|
|
current_watch.notify_one();
|
|
@@ -224,8 +235,8 @@ async fn send_messages(
|
|
|
|
|
|
// wait for the new socket
|
|
|
(current_socket, current_watch) = socket_updater.recv().await;
|
|
|
- log!("socket updated");
|
|
|
} else {
|
|
|
+ log!("sent,{},{}", recipient, group);
|
|
|
message_cache = None;
|
|
|
}
|
|
|
}
|