Browse Source

communicator: rework, use bincode

Lennart Braun 2 years ago
parent
commit
61feb28d55

+ 1 - 1
communicator/Cargo.toml

@@ -6,7 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-funty = "2.0.0"
+bincode = "2.0.0-rc.2"
 
 [dev-dependencies]
 rand = "0.8.5"

+ 65 - 57
communicator/src/communicator.rs

@@ -1,63 +1,86 @@
-use crate::fut::{BytesFut, MyFut, MyMultiFut};
-use crate::AbstractCommunicator;
-use crate::Serializable;
+use crate::{AbstractCommunicator, Error, Fut, Serializable};
+use bincode;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io::{Read, Write};
-use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender};
+use std::sync::mpsc::{channel, sync_channel, Receiver, Sender};
 use std::thread;
 
+pub struct MyFut<T: Serializable> {
+    data_rx: Receiver<Result<T, Error>>,
+}
+
+impl<T: Serializable> MyFut<T> {
+    pub fn new(data_rx: Receiver<Result<T, Error>>) -> Self {
+        Self { data_rx }
+    }
+}
+
+impl<T: Serializable> Fut<T> for MyFut<T> {
+    fn get(self) -> Result<T, Error> {
+        match self.data_rx.recv() {
+            Ok(x) => x,
+            Err(e) => Err(e.into()),
+        }
+    }
+}
+
 /// Thread to receive messages in the background.
 #[derive(Clone, Debug)]
 struct ReceiverThread {
-    data_request_tx: Sender<(usize, SyncSender<Vec<u8>>)>,
+    data_request_tx: Sender<Box<dyn FnOnce(&mut dyn Read) + Send>>,
 }
 
 impl ReceiverThread {
     pub fn from_reader<R: Debug + Read + Send + 'static>(mut reader: R) -> Self {
-        let (data_request_tx, data_request_rx) = channel::<(usize, SyncSender<Vec<u8>>)>();
+        let (data_request_tx, data_request_rx) = channel::<Box<dyn FnOnce(&mut dyn Read) + Send>>();
         let _join_handle = thread::spawn(move || {
-            for (size, sender) in data_request_rx.iter() {
-                let mut buf = vec![0u8; size];
-                reader.read_exact(&mut buf).expect("read failed");
-                sender.send(buf).expect("send failed");
+            for func in data_request_rx.iter() {
+                func(&mut reader);
             }
         });
-
         Self { data_request_tx }
     }
 
-    pub fn receive_bytes(&mut self, size: usize) -> BytesFut {
+    pub fn receive<T: Serializable>(&mut self) -> Result<MyFut<T>, Error> {
         let (data_tx, data_rx) = sync_channel(1);
         self.data_request_tx
-            .send((size, data_tx))
-            .expect("send failed");
-        BytesFut { size, data_rx }
+            .send(Box::new(move |mut reader: &mut dyn Read| {
+                let new: Result<T, Error> =
+                    bincode::decode_from_std_read(&mut reader, bincode::config::standard())
+                        .map_err(|e| e.into());
+                data_tx.send(new).expect("send failed");
+            }))?;
+        Ok(MyFut::new(data_rx.into()))
     }
 }
 
 /// Thread to send messages in the background.
 #[derive(Clone, Debug)]
 struct SenderThread {
-    data_tx: Sender<Vec<u8>>,
+    data_submission_tx: Sender<Box<dyn FnOnce(&mut dyn Write) + Send>>,
 }
 
 impl SenderThread {
     pub fn from_writer<W: Debug + Write + Send + 'static>(mut writer: W) -> Self {
-        let (data_tx, data_rx) = channel::<Vec<u8>>();
+        let (data_submission_tx, data_submission_rx) =
+            channel::<Box<dyn FnOnce(&mut dyn Write) + Send>>();
         let _join_handle = thread::spawn(move || {
-            for buf in data_rx.iter() {
-                writer.write_all(&buf).expect("write failed");
-                writer.flush().expect("flush failed");
+            for func in data_submission_rx.iter() {
+                func(&mut writer);
             }
             writer.flush().expect("flush failed");
         });
-
-        Self { data_tx }
+        Self { data_submission_tx }
     }
 
-    pub fn send_bytes(&mut self, buf: Vec<u8>) {
-        self.data_tx.send(buf).expect("send failed");
+    pub fn send<T: Serializable>(&mut self, data: T) -> Result<(), Error> {
+        self.data_submission_tx
+            .send(Box::new(move |mut writer: &mut dyn Write| {
+                bincode::encode_into_std_write(data, &mut writer, bincode::config::standard())
+                    .expect("encode failed");
+            }))?;
+        Ok(())
     }
 }
 
@@ -109,7 +132,6 @@ impl Communicator {
 
 impl AbstractCommunicator for Communicator {
     type Fut<T: Serializable> = MyFut<T>;
-    type MultiFut<T: Serializable> = MyMultiFut<T>;
 
     fn get_num_parties(&self) -> usize {
         self.num_parties
@@ -119,41 +141,27 @@ impl AbstractCommunicator for Communicator {
         self.my_id
     }
 
-    fn send<T: Serializable>(&mut self, party_id: usize, val: T) {
-        self.sender_threads
-            .get_mut(&party_id)
-            .expect(&format!("SenderThread for party {} not found", party_id))
-            .send_bytes(val.to_bytes())
-    }
-
-    fn send_slice<T: Serializable>(&mut self, party_id: usize, val: &[T]) {
-        let mut bytes = vec![0u8; val.len() * T::bytes_required()];
-        for (i, v) in val.iter().enumerate() {
-            bytes[i * T::bytes_required()..(i + 1) * T::bytes_required()]
-                .copy_from_slice(&v.to_bytes());
+    fn send<T: Serializable>(&mut self, party_id: usize, val: T) -> Result<(), Error> {
+        match self.sender_threads.get_mut(&party_id) {
+            Some(t) => {
+                t.send(val)?;
+                Ok(())
+            }
+            None => Err(Error::LogicError(format!(
+                "SenderThread for party {} not found",
+                party_id
+            ))),
         }
-        self.sender_threads
-            .get_mut(&party_id)
-            .expect(&format!("SenderThread for party {} not found", party_id))
-            .send_bytes(bytes);
-    }
-
-    fn receive<T: Serializable>(&mut self, party_id: usize) -> Self::Fut<T> {
-        let bytes_fut = self
-            .receiver_threads
-            .get_mut(&party_id)
-            .expect(&format!("ReceiverThread for party {} not found", party_id))
-            .receive_bytes(T::bytes_required());
-        MyFut::new(bytes_fut)
     }
 
-    fn receive_n<T: Serializable>(&mut self, party_id: usize, n: usize) -> Self::MultiFut<T> {
-        let bytes_fut = self
-            .receiver_threads
-            .get_mut(&party_id)
-            .expect(&format!("ReceiverThread for party {} not found", party_id))
-            .receive_bytes(n * T::bytes_required());
-        MyMultiFut::new(n, bytes_fut)
+    fn receive<T: Serializable>(&mut self, party_id: usize) -> Result<Self::Fut<T>, Error> {
+        match self.receiver_threads.get_mut(&party_id) {
+            Some(t) => t.receive::<T>(),
+            None => Err(Error::LogicError(format!(
+                "ReceiverThread for party {} not found",
+                party_id
+            ))),
+        }
     }
 
     fn shutdown(&mut self) {

+ 0 - 91
communicator/src/fut.rs

@@ -1,91 +0,0 @@
-use crate::{Error, Fut, MultiFut, Serializable};
-use std::marker::PhantomData;
-use std::sync::mpsc::Receiver;
-
-pub struct BytesFut {
-    pub size: usize,
-    pub data_rx: Receiver<Vec<u8>>,
-}
-
-impl BytesFut {
-    pub fn get(self) -> Vec<u8> {
-        let buf = self.data_rx.recv().expect("receive failed");
-        assert_eq!(buf.len(), self.size);
-        buf
-    }
-}
-
-pub struct MyFut<T: Serializable> {
-    bytes_fut: BytesFut,
-    _phantom: PhantomData<T>,
-}
-
-impl<T: Serializable> MyFut<T> {
-    pub fn new(bytes_fut: BytesFut) -> Self {
-        Self {
-            bytes_fut,
-            _phantom: PhantomData,
-        }
-    }
-}
-
-impl<T: Serializable> Fut<T> for MyFut<T> {
-    fn get(self) -> Result<T, Error> {
-        T::from_bytes(&self.bytes_fut.get())
-    }
-}
-
-pub struct MyMultiFut<T: Serializable> {
-    size: usize,
-    bytes_fut: BytesFut,
-    _phantom: PhantomData<T>,
-}
-
-impl<T: Serializable> MyMultiFut<T> {
-    pub fn new(size: usize, bytes_fut: BytesFut) -> Self {
-        Self {
-            size,
-            bytes_fut,
-            _phantom: PhantomData,
-        }
-    }
-}
-
-impl<T: Serializable> MultiFut<T> for MyMultiFut<T> {
-    fn len(&self) -> usize {
-        self.size
-    }
-
-    fn get(self) -> Result<Vec<T>, Error> {
-        let data_buf = self.bytes_fut.get();
-        if data_buf.len() != self.size * T::bytes_required() {
-            return Err(Error::DeserializationError(
-                "received buffer of unexpected size".to_owned(),
-            ));
-        }
-        let mut output = Vec::with_capacity(self.size);
-        for c in data_buf.chunks_exact(T::bytes_required()) {
-            match T::from_bytes(c) {
-                Ok(v) => output.push(v),
-                Err(e) => return Err(e),
-            }
-        }
-        Ok(output)
-    }
-
-    fn get_into(self, buf: &mut [T]) -> Result<(), Error> {
-        if buf.len() != self.size {
-            return Err(Error::DeserializationError(
-                "supplied buffer has unexpected size".to_owned(),
-            ));
-        }
-        let data_buf = self.bytes_fut.get();
-        if data_buf.len() != self.size * T::bytes_required() {
-            return Err(Error::DeserializationError(
-                "received buffer of unexpected size".to_owned(),
-            ));
-        }
-
-        Ok(())
-    }
-}

+ 47 - 63
communicator/src/lib.rs

@@ -1,11 +1,14 @@
 pub mod communicator;
-mod fut;
 pub mod tcp;
-pub mod traits;
 pub mod unix;
 
-use crate::traits::Serializable;
+use bincode::error::DecodeError;
 use std::io::Error as IoError;
+use std::sync::mpsc::{RecvError, SendError};
+
+pub trait Serializable: Clone + Send + 'static + bincode::Encode + bincode::Decode {}
+
+impl<T> Serializable for T where T: Clone + Send + 'static + bincode::Encode + bincode::Decode {}
 
 /// Represent data of type T that we expect to receive
 pub trait Fut<T> {
@@ -13,107 +16,59 @@ pub trait Fut<T> {
     fn get(self) -> Result<T, Error>;
 }
 
-/// Represent data consisting of multiple Ts that we expect to receive
-pub trait MultiFut<T> {
-    /// How many items of type T we expect.
-    fn len(&self) -> usize;
-    /// Wait until the data has arrived and obtain it.
-    fn get(self) -> Result<Vec<T>, Error>;
-    /// Wait until the data has arrived and write it into the provided buffer.
-    fn get_into(self, buf: &mut [T]) -> Result<(), Error>;
-}
-
 /// Abstract communication interface between multiple parties
 pub trait AbstractCommunicator: Clone {
     type Fut<T: Serializable>: Fut<T>;
-    type MultiFut<T: Serializable>: MultiFut<T>;
 
     /// How many parties N there are in total
     fn get_num_parties(&self) -> usize;
+
     /// My party id in [0, N)
     fn get_my_id(&self) -> usize;
 
     /// Send a message of type T to given party
-    fn send<T: Serializable>(&mut self, party_id: usize, val: T);
-    /// Send a message of multiple Ts to given party
-    fn send_slice<T: Serializable>(&mut self, party_id: usize, val: &[T]);
+    fn send<T: Serializable>(&mut self, party_id: usize, val: T) -> Result<(), Error>;
 
     /// Send a message of type T to next party
-    fn send_next<T: Serializable>(&mut self, val: T) {
-        self.send((self.get_my_id() + 1) % self.get_num_parties(), val);
-    }
-    /// Send a message of multiple Ts to next party
-    fn send_next_slice<T: Serializable>(&mut self, val: &[T]) {
-        self.send_slice((self.get_my_id() + 1) % self.get_num_parties(), val);
+    fn send_next<T: Serializable>(&mut self, val: T) -> Result<(), Error> {
+        self.send((self.get_my_id() + 1) % self.get_num_parties(), val)
     }
 
     /// Send a message of type T to previous party
-    fn send_previous<T: Serializable>(&mut self, val: T) {
+    fn send_previous<T: Serializable>(&mut self, val: T) -> Result<(), Error> {
         self.send(
             (self.get_num_parties() + self.get_my_id() - 1) % self.get_num_parties(),
             val,
-        );
-    }
-    /// Send a message of multiple Ts to previous party
-    fn send_previous_slice<T: Serializable>(&mut self, val: &[T]) {
-        self.send_slice(
-            (self.get_num_parties() + self.get_my_id() - 1) % self.get_num_parties(),
-            val,
-        );
+        )
     }
 
     /// Send a message of type T all parties
-    fn broadcast<T: Serializable>(&mut self, val: T) {
-        let my_id = self.get_my_id();
-        for party_id in 0..self.get_num_parties() {
-            if party_id == my_id {
-                continue;
-            }
-            self.send(party_id, val.clone());
-        }
-    }
-    /// Send a message of multiple Ts to all parties
-    fn broadcast_slice<T: Serializable>(&mut self, val: &[T]) {
+    fn broadcast<T: Serializable>(&mut self, val: T) -> Result<(), Error> {
         let my_id = self.get_my_id();
         for party_id in 0..self.get_num_parties() {
             if party_id == my_id {
                 continue;
             }
-            self.send_slice(party_id, val);
+            self.send(party_id, val.clone())?;
         }
+        Ok(())
     }
 
     /// Expect to receive message of type T from given party.  Use the returned future to obtain
     /// the message once it has arrived.
-    fn receive<T: Serializable>(&mut self, party_id: usize) -> Self::Fut<T>;
-    /// Expect to receive message of multiple Ts from given party.  Use the returned future to obtain
-    /// the message once it has arrived.
-    fn receive_n<T: Serializable>(&mut self, party_id: usize, n: usize) -> Self::MultiFut<T>;
+    fn receive<T: Serializable>(&mut self, party_id: usize) -> Result<Self::Fut<T>, Error>;
 
     /// Expect to receive message of type T from the next party.  Use the returned future to obtain
     /// the message once it has arrived.
-    fn receive_next<T: Serializable>(&mut self) -> Self::Fut<T> {
+    fn receive_next<T: Serializable>(&mut self) -> Result<Self::Fut<T>, Error> {
         self.receive((self.get_my_id() + 1) % self.get_num_parties())
     }
-    /// Expect to receive message of multiple Ts from the next party.  Use the returned future to obtain
-    /// the message once it has arrived.
-    fn receive_next_n<T: Serializable>(&mut self, n: usize) -> Self::MultiFut<T> {
-        self.receive_n((self.get_my_id() + 1) % self.get_num_parties(), n)
-    }
 
     /// Expect to receive message of type T from the previous party.  Use the returned future to obtain
     /// the message once it has arrived.
-    fn receive_previous<T: Serializable>(&mut self) -> Self::Fut<T> {
+    fn receive_previous<T: Serializable>(&mut self) -> Result<Self::Fut<T>, Error> {
         self.receive((self.get_num_parties() + self.get_my_id() - 1) % self.get_num_parties())
     }
-    /// Expect to receive message of multiple Ts from the previous party.  Use the returned future to obtain
-    /// the message once it has arrived.
-    fn receive_previous_n<T: Serializable>(&mut self, n: usize) -> Self::MultiFut<T> {
-        self.receive_n(
-            (self.get_num_parties() + self.get_my_id() - 1) % self.get_num_parties(),
-            n,
-        )
-    }
 
     /// Shutdown the communication system
     fn shutdown(&mut self);
@@ -124,8 +79,16 @@ pub trait AbstractCommunicator: Clone {
 pub enum Error {
     /// The connection has not been established
     ConnectionSetupError,
+    /// The API was not used correctly
+    LogicError(String),
     /// Some std::io::Error appeared
     IoError(IoError),
+    /// Some std::sync::mpsc::RecvError appeared
+    RecvError(RecvError),
+    /// Some std::sync::mpsc::SendError appeared
+    SendError(String),
+    /// Some bincode::error::DecodeError appeared
+    DecodeError(DecodeError),
     /// Serialization of data failed
     SerializationError(String),
     /// Deserialization of data failed
@@ -138,3 +101,24 @@ impl From<IoError> for Error {
         Error::IoError(e)
     }
 }
+
+/// Enable automatic conversions from std::sync::mpsc::RecvError
+impl From<RecvError> for Error {
+    fn from(e: RecvError) -> Error {
+        Error::RecvError(e)
+    }
+}
+
+/// Enable automatic conversions from std::sync::mpsc::SendError
+impl<T> From<SendError<T>> for Error {
+    fn from(e: SendError<T>) -> Error {
+        Error::SendError(e.to_string())
+    }
+}
+
+/// Enable automatic conversions from bincode::error::DecodeError
+impl From<DecodeError> for Error {
+    fn from(e: DecodeError) -> Error {
+        Error::DecodeError(e)
+    }
+}

+ 12 - 12
communicator/src/tcp.rs

@@ -235,10 +235,10 @@ mod tests {
             .map(|(party_id, mut communicator)| {
                 thread::spawn(move || {
                     if party_id == 0 {
-                        let fut_1 = communicator.receive::<u32>(1);
-                        let fut_2 = communicator.receive::<[u32; 2]>(2);
-                        communicator.send(1, msg_0);
-                        communicator.send(2, msg_0);
+                        let fut_1 = communicator.receive::<u32>(1).unwrap();
+                        let fut_2 = communicator.receive::<[u32; 2]>(2).unwrap();
+                        communicator.send(1, msg_0).unwrap();
+                        communicator.send(2, msg_0).unwrap();
                         let val_1 = fut_1.get();
                         let val_2 = fut_2.get();
                         assert!(val_1.is_ok());
@@ -246,10 +246,10 @@ mod tests {
                         assert_eq!(val_1.unwrap(), msg_1);
                         assert_eq!(val_2.unwrap(), msg_2);
                     } else if party_id == 1 {
-                        let fut_0 = communicator.receive::<u8>(0);
-                        let fut_2 = communicator.receive::<[u32; 2]>(2);
-                        communicator.send(0, msg_1);
-                        communicator.send(2, msg_1);
+                        let fut_0 = communicator.receive::<u8>(0).unwrap();
+                        let fut_2 = communicator.receive::<[u32; 2]>(2).unwrap();
+                        communicator.send(0, msg_1).unwrap();
+                        communicator.send(2, msg_1).unwrap();
                         let val_0 = fut_0.get();
                         let val_2 = fut_2.get();
                         assert!(val_0.is_ok());
@@ -257,10 +257,10 @@ mod tests {
                         assert_eq!(val_0.unwrap(), msg_0);
                         assert_eq!(val_2.unwrap(), msg_2);
                     } else if party_id == 2 {
-                        let fut_0 = communicator.receive::<u8>(0);
-                        let fut_1 = communicator.receive::<u32>(1);
-                        communicator.send(0, msg_2);
-                        communicator.send(1, msg_2);
+                        let fut_0 = communicator.receive::<u8>(0).unwrap();
+                        let fut_1 = communicator.receive::<u32>(1).unwrap();
+                        communicator.send(0, msg_2).unwrap();
+                        communicator.send(1, msg_2).unwrap();
                         let val_0 = fut_0.get();
                         let val_1 = fut_1.get();
                         assert!(val_0.is_ok());

+ 0 - 266
communicator/src/traits.rs

@@ -1,266 +0,0 @@
-use crate::Error;
-use core::array;
-
-/// Allow a type to get serialized into bytes
-pub trait Serializable: Clone + Sized {
-    /// How many bytes are needed?
-    fn bytes_required() -> usize;
-    /// Convert to bytes and store them in a new vector
-    fn to_bytes(&self) -> Vec<u8> {
-        let mut buf = vec![0u8; Self::bytes_required()];
-        self.into_bytes(&mut buf)
-            .expect("does not fail, buffer has the right size");
-        buf
-    }
-    /// Convert to bytes and store them in the given buffer. Fails if the buffer has not the right size.
-    fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error>;
-    /// Convert the bytes in the given buffer into an object. Fails if the buffer has not the right size.
-    fn from_bytes(buf: &[u8]) -> Result<Self, Error>;
-}
-
-/// Convert a slice of a serializable type into a new byte vector.
-pub fn slice_to_bytes<T: Serializable>(slice: &[T]) -> Vec<u8> {
-    slice.iter().flat_map(|x| x.to_bytes()).collect()
-}
-
-/// Convert a slice of a serializable type into bytes and write them into a given buffer.
-pub fn slice_into_bytes<T: Serializable>(slice: &[T], buf: &mut [u8]) -> Result<(), Error> {
-    let bytes_required = slice.len() * T::bytes_required();
-    if !buf.len() == bytes_required {
-        return Err(Error::SerializationError(
-            "supplied buffer has unexpected size".to_owned(),
-        ));
-    }
-    slice
-        .iter()
-        .zip(buf.chunks_exact_mut(T::bytes_required()))
-        .for_each(|(x, c)| {
-            x.into_bytes(c)
-                .expect("does not fail, since chunks have the right size");
-        });
-    Ok(())
-}
-
-/// Convert given buffer of bytes into objects and store them in a given a slice.
-pub fn slice_from_bytes<T: Serializable>(slice: &mut [T], buf: &[u8]) -> Result<(), Error> {
-    let bytes_required = slice.len() * T::bytes_required();
-    if !buf.len() == bytes_required {
-        return Err(Error::DeserializationError(
-            "supplied buffer has unexpected size".to_owned(),
-        ));
-    }
-    for (i, c) in buf.chunks_exact(T::bytes_required()).enumerate() {
-        match T::from_bytes(c) {
-            Ok(v) => slice[i] = v,
-            Err(e) => return Err(e),
-        }
-    }
-    Ok(())
-}
-
-impl<T: Serializable + Default, const N: usize> Serializable for [T; N] {
-    fn bytes_required() -> usize {
-        T::bytes_required() * N
-    }
-
-    fn to_bytes(&self) -> Vec<u8> {
-        slice_to_bytes(self.as_slice())
-    }
-
-    fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error> {
-        slice_into_bytes(self.as_slice(), buf)
-    }
-
-    fn from_bytes(buf: &[u8]) -> Result<Self, Error> {
-        if buf.len() != Self::bytes_required() {
-            return Err(Error::DeserializationError(
-                "supplied buffer has unexpected size".to_owned(),
-            ));
-        }
-
-        let mut output = array::from_fn(|_| Default::default());
-        slice_from_bytes(&mut output, buf)?;
-        Ok(output)
-    }
-}
-
-impl<T: Serializable, U: Serializable> Serializable for (T, U) {
-    fn bytes_required() -> usize {
-        T::bytes_required() + U::bytes_required()
-    }
-
-    fn to_bytes(&self) -> Vec<u8> {
-        let num_t_bytes = T::bytes_required();
-        let num_u_bytes = U::bytes_required();
-        let mut buf = vec![0u8; num_t_bytes + num_u_bytes];
-        self.0.into_bytes(&mut buf[..num_t_bytes]).unwrap();
-        self.1.into_bytes(&mut buf[num_t_bytes..]).unwrap();
-        buf
-    }
-
-    fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error> {
-        let num_t_bytes = T::bytes_required();
-        let num_u_bytes = U::bytes_required();
-        if buf.len() != num_t_bytes + num_u_bytes {
-            return Err(Error::SerializationError(
-                "supplied buffer has unexpected size".to_owned(),
-            ));
-        }
-        self.0.into_bytes(&mut buf[..num_t_bytes]).unwrap();
-        self.1.into_bytes(&mut buf[num_t_bytes..]).unwrap();
-        Ok(())
-    }
-
-    fn from_bytes(buf: &[u8]) -> Result<Self, Error> {
-        let num_t_bytes = T::bytes_required();
-        let num_u_bytes = U::bytes_required();
-        if buf.len() != num_t_bytes + num_u_bytes {
-            return Err(Error::DeserializationError(
-                "supplied buffer has unexpected size".to_owned(),
-            ));
-        }
-        let t = T::from_bytes(&buf[..num_t_bytes])?;
-        let u = U::from_bytes(&buf[num_t_bytes..])?;
-        Ok((t, u))
-    }
-}
-
-macro_rules! impl_serializable_for_uints {
-    ($type:ty) => {
-        impl Serializable for $type {
-            fn bytes_required() -> usize {
-                <$type>::BITS as usize / 8
-            }
-
-            fn to_bytes(&self) -> Vec<u8> {
-                let bytes = self.to_be_bytes();
-                bytes.into()
-            }
-
-            fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error> {
-                if !buf.len() == Self::bytes_required() {
-                    return Err(Error::SerializationError("buffer to small".to_owned()));
-                }
-                buf.copy_from_slice(&self.to_be_bytes());
-                Ok(())
-            }
-
-            fn from_bytes(buf: &[u8]) -> Result<Self, Error> {
-                // assert_eq!(buf.len(), Self::bytes_required());
-                match buf.try_into().map(Self::from_be_bytes) {
-                    Ok(v) => Ok(v),
-                    Err(_) => Err(Error::DeserializationError(
-                        "supplied buffer has unexpected size".to_owned(),
-                    )),
-                }
-            }
-        }
-    };
-}
-
-impl_serializable_for_uints!(u8);
-impl_serializable_for_uints!(u16);
-impl_serializable_for_uints!(u32);
-impl_serializable_for_uints!(u64);
-impl_serializable_for_uints!(u128);
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use rand::{thread_rng, Rng};
-
-    macro_rules! make_test_serialiable_for_uints {
-        ($test_name:ident, $type:ty) => {
-            #[test]
-            fn $test_name() {
-                type T = $type;
-                assert_eq!(T::bytes_required(), T::BITS as usize / 8);
-                for _ in 0..100 {
-                    let val: T = thread_rng().gen();
-                    let expected_bytes = val.to_be_bytes();
-                    assert_eq!(val.to_bytes(), expected_bytes);
-                    let mut buf = vec![0u8; T::bytes_required()];
-                    val.into_bytes(&mut buf).unwrap();
-                    assert_eq!(buf, expected_bytes);
-                    let new_val = T::from_bytes(&val.to_bytes());
-                    assert!(new_val.is_ok());
-                    assert_eq!(new_val.unwrap(), val);
-                }
-            }
-        };
-    }
-
-    make_test_serialiable_for_uints!(test_serialize_u8, u8);
-    make_test_serialiable_for_uints!(test_serialize_u16, u16);
-    make_test_serialiable_for_uints!(test_serialize_u32, u32);
-    make_test_serialiable_for_uints!(test_serialize_u64, u64);
-    make_test_serialiable_for_uints!(test_serialize_u128, u128);
-
-    macro_rules! make_test_serialiable_for_uint_arrays {
-        ($test_name:ident, $type:ty, $len:expr) => {
-            #[test]
-            fn $test_name() {
-                type T = $type;
-                type A = [T; $len];
-                assert_eq!(A::bytes_required(), T::BITS as usize / 8 * $len);
-                for _ in 0..100 {
-                    let val: A = array::from_fn(|_| thread_rng().gen());
-                    let serialized = val.to_bytes();
-                    let mut serialized2 = vec![0u8; A::bytes_required()];
-                    val.into_bytes(&mut serialized2).unwrap();
-                    assert_eq!(serialized.len(), A::bytes_required());
-                    for i in 0..$len {
-                        let expected_bytes = val[i].to_be_bytes();
-                        assert_eq!(
-                            serialized[i * T::bytes_required()..(i + 1) * T::bytes_required()],
-                            expected_bytes
-                        );
-                    }
-                    let new_val = <A>::from_bytes(&val.to_bytes());
-                    assert!(new_val.is_ok());
-                    assert_eq!(new_val.unwrap(), val);
-                }
-            }
-        };
-    }
-
-    make_test_serialiable_for_uint_arrays!(test_serialize_u8_array, u8, 42);
-    make_test_serialiable_for_uint_arrays!(test_serialize_u16_array, u16, 42);
-    make_test_serialiable_for_uint_arrays!(test_serialize_u32_array, u32, 42);
-    make_test_serialiable_for_uint_arrays!(test_serialize_u64_array, u64, 42);
-    make_test_serialiable_for_uint_arrays!(test_serialize_u128_array, u128, 42);
-
-    macro_rules! make_test_serialiable_for_pairs {
-        ($test_name:ident, $type_t:ty, $type_u:ty) => {
-            #[test]
-            fn $test_name() {
-                type T = $type_t;
-                type U = $type_u;
-                type P = (T, U);
-                assert_eq!(
-                    P::bytes_required(),
-                    T::bytes_required() + U::bytes_required()
-                );
-                for _ in 0..100 {
-                    let val: P = thread_rng().gen();
-                    let serialized = val.to_bytes();
-                    let mut serialized2 = vec![0u8; P::bytes_required()];
-                    val.into_bytes(&mut serialized2).unwrap();
-                    assert_eq!(serialized.len(), P::bytes_required());
-                    let new_val = <P>::from_bytes(&val.to_bytes());
-                    assert!(new_val.is_ok());
-                    assert_eq!(new_val.unwrap(), val);
-                }
-            }
-        };
-    }
-
-    make_test_serialiable_for_pairs!(test_serialize_pair_u8_u32, u8, u32);
-    make_test_serialiable_for_pairs!(test_serialize_pair_u8array_u32, [u8; 13], u32);
-    make_test_serialiable_for_pairs!(test_serialize_pair_u128array_u16array, u128, [u16; 7]);
-    make_test_serialiable_for_pairs!(
-        test_serialize_pair_nested_uints,
-        u8,
-        (u16, (u32, (u64, u128)))
-    );
-}

+ 12 - 12
communicator/src/unix.rs

@@ -46,10 +46,10 @@ mod tests {
             .map(|(party_id, mut communicator)| {
                 thread::spawn(move || {
                     if party_id == 0 {
-                        let fut_1 = communicator.receive::<u32>(1);
-                        let fut_2 = communicator.receive::<[u32; 2]>(2);
-                        communicator.send(1, msg_0);
-                        communicator.send(2, msg_0);
+                        let fut_1 = communicator.receive::<u32>(1).unwrap();
+                        let fut_2 = communicator.receive::<[u32; 2]>(2).unwrap();
+                        communicator.send(1, msg_0).unwrap();
+                        communicator.send(2, msg_0).unwrap();
                         let val_1 = fut_1.get();
                         let val_2 = fut_2.get();
                         assert!(val_1.is_ok());
@@ -57,10 +57,10 @@ mod tests {
                         assert_eq!(val_1.unwrap(), msg_1);
                         assert_eq!(val_2.unwrap(), msg_2);
                     } else if party_id == 1 {
-                        let fut_0 = communicator.receive::<u8>(0);
-                        let fut_2 = communicator.receive::<[u32; 2]>(2);
-                        communicator.send(0, msg_1);
-                        communicator.send(2, msg_1);
+                        let fut_0 = communicator.receive::<u8>(0).unwrap();
+                        let fut_2 = communicator.receive::<[u32; 2]>(2).unwrap();
+                        communicator.send(0, msg_1).unwrap();
+                        communicator.send(2, msg_1).unwrap();
                         let val_0 = fut_0.get();
                         let val_2 = fut_2.get();
                         assert!(val_0.is_ok());
@@ -68,10 +68,10 @@ mod tests {
                         assert_eq!(val_0.unwrap(), msg_0);
                         assert_eq!(val_2.unwrap(), msg_2);
                     } else if party_id == 2 {
-                        let fut_0 = communicator.receive::<u8>(0);
-                        let fut_1 = communicator.receive::<u32>(1);
-                        communicator.send(0, msg_2);
-                        communicator.send(1, msg_2);
+                        let fut_0 = communicator.receive::<u8>(0).unwrap();
+                        let fut_1 = communicator.receive::<u32>(1).unwrap();
+                        communicator.send(0, msg_2).unwrap();
+                        communicator.send(1, msg_2).unwrap();
                         let val_0 = fut_0.get();
                         let val_1 = fut_1.get();
                         assert!(val_0.is_ok());