|
@@ -1,3 +1,4 @@
|
|
|
+use mgen::MessageHeaderRef;
|
|
|
use mgen::{log, parse_identifier, updater::Updater, SerializedMessage};
|
|
|
use std::collections::HashMap;
|
|
|
use std::error::Error;
|
|
@@ -50,7 +51,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
|
|
|
// start the new reader thread with a new notify
|
|
|
let notify = Arc::new(Notify::new());
|
|
|
- spawn_message_receiver(rd, snd_db.clone(), notify.clone());
|
|
|
+ spawn_message_receiver(id, rd, snd_db.clone(), notify.clone());
|
|
|
|
|
|
// give the writer thread the new write half of the socket and notify
|
|
|
socket_updater.send((wr, notify)).await;
|
|
@@ -79,7 +80,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
|
|
|
writer_db.insert(id.clone(), socket_updater_snd);
|
|
|
|
|
|
- spawn_message_receiver(rd, snd_db, notify);
|
|
|
+ spawn_message_receiver(id, rd, snd_db, notify);
|
|
|
tokio::spawn(async move {
|
|
|
send_messages(msg_rcv, socket_updater_rcv).await;
|
|
|
});
|
|
@@ -88,13 +89,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
}
|
|
|
|
|
|
fn spawn_message_receiver(
|
|
|
+ sender: String,
|
|
|
rd: OwnedReadHalf,
|
|
|
db: Arc<RwLock<HashMap<ID, mpsc::UnboundedSender<Arc<SerializedMessage>>>>>,
|
|
|
notify: Arc<Notify>,
|
|
|
) {
|
|
|
tokio::spawn(async move {
|
|
|
tokio::select! {
|
|
|
- ret = get_messages(rd, db) => {
|
|
|
+ ret = get_messages(&sender, rd, db) => {
|
|
|
if let Err(e) = ret {
|
|
|
log!("message receiver failed: {:?}", e);
|
|
|
}
|
|
@@ -110,46 +112,47 @@ fn spawn_message_receiver(
|
|
|
/// Loop for receiving messages on the socket, figuring out who to deliver them to,
|
|
|
/// and forwarding them locally to the respective channel.
|
|
|
async fn get_messages(
|
|
|
+ sender: &str,
|
|
|
mut socket: OwnedReadHalf,
|
|
|
db: Arc<RwLock<HashMap<ID, mpsc::UnboundedSender<Arc<SerializedMessage>>>>>,
|
|
|
) -> Result<(), Box<dyn Error>> {
|
|
|
// stores snd's for contacts this client has already sent messages to, to reduce contention on the main db
|
|
|
// if memory ends up being more of a constraint, could be worth getting rid of this
|
|
|
let mut localdb = HashMap::<ID, mpsc::UnboundedSender<Arc<SerializedMessage>>>::new();
|
|
|
- let mut sender = "".to_string();
|
|
|
loop {
|
|
|
log!("waiting for message from {}", sender);
|
|
|
- let (message, buf) = mgen::get_message(&mut socket).await?;
|
|
|
+ let buf = mgen::get_message_bytes(&mut socket).await?;
|
|
|
+ let message = MessageHeaderRef::deserialize(&buf[4..])?;
|
|
|
log!(
|
|
|
"got message from {} for {:?}",
|
|
|
message.sender,
|
|
|
message.recipients
|
|
|
);
|
|
|
- sender = message.sender;
|
|
|
+
|
|
|
let missing = message
|
|
|
.recipients
|
|
|
.iter()
|
|
|
- .filter(|&r| !localdb.contains_key(r))
|
|
|
- .collect::<Vec<&ID>>();
|
|
|
+ .filter(|&&r| !localdb.contains_key(r))
|
|
|
+ .collect::<Vec<&&str>>();
|
|
|
|
|
|
{
|
|
|
let locked_db = db.read().unwrap();
|
|
|
for m in missing {
|
|
|
- if let Some(snd) = locked_db.get(m) {
|
|
|
+ if let Some(snd) = locked_db.get(*m) {
|
|
|
localdb.insert(m.to_string(), snd.clone());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- let m = Arc::new(SerializedMessage {
|
|
|
- header: buf,
|
|
|
- body: message.body,
|
|
|
- });
|
|
|
+ let body = message.body;
|
|
|
+
|
|
|
+ let m = Arc::new(SerializedMessage { header: buf, body });
|
|
|
+
|
|
|
+ // FIXME: we could avoid this second deserialization if we stored recipients by group instead of in the message itself
|
|
|
+ let message = MessageHeaderRef::deserialize(&m.header[4..])?;
|
|
|
|
|
|
for recipient in message.recipients.iter() {
|
|
|
- let recipient_sender = localdb
|
|
|
- .get(recipient)
|
|
|
- .unwrap_or_else(|| panic!("Unknown sender: {}", recipient));
|
|
|
+ let recipient_sender = &localdb[*recipient];
|
|
|
recipient_sender
|
|
|
.send(m.clone())
|
|
|
.expect("Recipient closed channel with messages still being sent");
|