Browse Source

communicator: add initial version

Lennart Braun 2 years ago
parent
commit
a31a0fe9de

+ 1 - 0
Cargo.toml

@@ -1,6 +1,7 @@
 [workspace]
 
 members = [
+    "communicator",
     "cuckoo",
     "dpf",
     "oram",

+ 12 - 0
communicator/Cargo.toml

@@ -0,0 +1,12 @@
+[package]
+name = "communicator"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+funty = "2.0.0"
+
+[dev-dependencies]
+rand = "0.8.5"

+ 163 - 0
communicator/src/communicator.rs

@@ -0,0 +1,163 @@
+use crate::fut::{BytesFut, MyFut, MyMultiFut};
+use crate::AbstractCommunicator;
+use crate::Serializable;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::{Read, Write};
+use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender};
+use std::thread;
+
+/// Thread to receive messages in the background.
+#[derive(Clone, Debug)]
+struct ReceiverThread {
+    data_request_tx: Sender<(usize, SyncSender<Vec<u8>>)>,
+}
+
+impl ReceiverThread {
+    pub fn from_reader<R: Debug + Read + Send + 'static>(mut reader: R) -> Self {
+        let (data_request_tx, data_request_rx) = channel::<(usize, SyncSender<Vec<u8>>)>();
+        let _join_handle = thread::spawn(move || {
+            for (size, sender) in data_request_rx.iter() {
+                let mut buf = vec![0u8; size];
+                reader.read_exact(&mut buf).expect("read failed");
+                sender.send(buf).expect("send failed");
+            }
+        });
+
+        Self { data_request_tx }
+    }
+
+    pub fn receive_bytes(&mut self, size: usize) -> BytesFut {
+        let (data_tx, data_rx) = sync_channel(1);
+        self.data_request_tx
+            .send((size, data_tx))
+            .expect("send failed");
+        BytesFut { size, data_rx }
+    }
+}
+
+/// Thread to send messages in the background.
+#[derive(Clone, Debug)]
+struct SenderThread {
+    data_tx: Sender<Vec<u8>>,
+}
+
+impl SenderThread {
+    pub fn from_writer<W: Debug + Write + Send + 'static>(mut writer: W) -> Self {
+        let (data_tx, data_rx) = channel::<Vec<u8>>();
+        let _join_handle = thread::spawn(move || {
+            for buf in data_rx.iter() {
+                writer.write_all(&buf).expect("write failed");
+                writer.flush().expect("flush failed");
+            }
+            writer.flush().expect("flush failed");
+        });
+
+        Self { data_tx }
+    }
+
+    pub fn send_bytes(&mut self, buf: Vec<u8>) {
+        self.data_tx.send(buf).expect("send failed");
+    }
+}
+
+/// Communicator that uses background threads to send and receive messages.
+#[derive(Clone, Debug)]
+pub struct Communicator {
+    num_parties: usize,
+    my_id: usize,
+    receiver_threads: HashMap<usize, ReceiverThread>,
+    sender_threads: HashMap<usize, SenderThread>,
+}
+
+impl Communicator {
+    /// Create a new Communicator from a collection of readers and writers that are connected with
+    /// the other parties.
+    pub fn from_reader_writer<
+        R: Read + Send + Debug + 'static,
+        W: Send + Write + Debug + 'static,
+    >(
+        num_parties: usize,
+        my_id: usize,
+        mut rw_map: HashMap<usize, (R, W)>,
+    ) -> Self {
+        assert_eq!(rw_map.len(), num_parties - 1);
+        assert!((0..num_parties)
+            .filter(|&pid| pid != my_id)
+            .all(|pid| rw_map.contains_key(&pid)));
+
+        let mut receiver_threads = HashMap::with_capacity(num_parties - 1);
+        let mut sender_threads = HashMap::with_capacity(num_parties - 1);
+
+        for pid in 0..num_parties {
+            if pid == my_id {
+                continue;
+            }
+            let (reader, writer) = rw_map.remove(&pid).unwrap();
+            receiver_threads.insert(pid, ReceiverThread::from_reader(reader));
+            sender_threads.insert(pid, SenderThread::from_writer(writer));
+        }
+
+        Self {
+            num_parties,
+            my_id,
+            receiver_threads,
+            sender_threads,
+        }
+    }
+}
+
+impl AbstractCommunicator for Communicator {
+    type Fut<T: Serializable> = MyFut<T>;
+    type MultiFut<T: Serializable> = MyMultiFut<T>;
+
+    fn get_num_parties(&self) -> usize {
+        self.num_parties
+    }
+
+    fn get_my_id(&self) -> usize {
+        self.my_id
+    }
+
+    fn send<T: Serializable>(&mut self, party_id: usize, val: T) {
+        self.sender_threads
+            .get_mut(&party_id)
+            .expect(&format!("SenderThread for party {} not found", party_id))
+            .send_bytes(val.to_bytes())
+    }
+
+    fn send_slice<T: Serializable>(&mut self, party_id: usize, val: &[T]) {
+        let mut bytes = vec![0u8; val.len() * T::bytes_required()];
+        for (i, v) in val.iter().enumerate() {
+            bytes[i * T::bytes_required()..(i + 1) * T::bytes_required()]
+                .copy_from_slice(&v.to_bytes());
+        }
+        self.sender_threads
+            .get_mut(&party_id)
+            .expect(&format!("SenderThread for party {} not found", party_id))
+            .send_bytes(bytes);
+    }
+
+    fn receive<T: Serializable>(&mut self, party_id: usize) -> Self::Fut<T> {
+        let bytes_fut = self
+            .receiver_threads
+            .get_mut(&party_id)
+            .expect(&format!("ReceiverThread for party {} not found", party_id))
+            .receive_bytes(T::bytes_required());
+        MyFut::new(bytes_fut)
+    }
+
+    fn receive_n<T: Serializable>(&mut self, party_id: usize, n: usize) -> Self::MultiFut<T> {
+        let bytes_fut = self
+            .receiver_threads
+            .get_mut(&party_id)
+            .expect(&format!("ReceiverThread for party {} not found", party_id))
+            .receive_bytes(n * T::bytes_required());
+        MyMultiFut::new(n, bytes_fut)
+    }
+
+    fn shutdown(&mut self) {
+        self.sender_threads.drain();
+        self.receiver_threads.drain();
+    }
+}

+ 91 - 0
communicator/src/fut.rs

@@ -0,0 +1,91 @@
+use crate::{Error, Fut, MultiFut, Serializable};
+use std::marker::PhantomData;
+use std::sync::mpsc::Receiver;
+
+pub struct BytesFut {
+    pub size: usize,
+    pub data_rx: Receiver<Vec<u8>>,
+}
+
+impl BytesFut {
+    pub fn get(self) -> Vec<u8> {
+        let buf = self.data_rx.recv().expect("receive failed");
+        assert_eq!(buf.len(), self.size);
+        buf
+    }
+}
+
+pub struct MyFut<T: Serializable> {
+    bytes_fut: BytesFut,
+    _phantom: PhantomData<T>,
+}
+
+impl<T: Serializable> MyFut<T> {
+    pub fn new(bytes_fut: BytesFut) -> Self {
+        Self {
+            bytes_fut,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<T: Serializable> Fut<T> for MyFut<T> {
+    fn get(self) -> Result<T, Error> {
+        T::from_bytes(&self.bytes_fut.get())
+    }
+}
+
+pub struct MyMultiFut<T: Serializable> {
+    size: usize,
+    bytes_fut: BytesFut,
+    _phantom: PhantomData<T>,
+}
+
+impl<T: Serializable> MyMultiFut<T> {
+    pub fn new(size: usize, bytes_fut: BytesFut) -> Self {
+        Self {
+            size,
+            bytes_fut,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<T: Serializable> MultiFut<T> for MyMultiFut<T> {
+    fn len(&self) -> usize {
+        self.size
+    }
+
+    fn get(self) -> Result<Vec<T>, Error> {
+        let data_buf = self.bytes_fut.get();
+        if data_buf.len() != self.size * T::bytes_required() {
+            return Err(Error::DeserializationError(
+                "received buffer of unexpected size".to_owned(),
+            ));
+        }
+        let mut output = Vec::with_capacity(self.size);
+        for c in data_buf.chunks_exact(T::bytes_required()) {
+            match T::from_bytes(c) {
+                Ok(v) => output.push(v),
+                Err(e) => return Err(e),
+            }
+        }
+        Ok(output)
+    }
+
+    fn get_into(self, buf: &mut [T]) -> Result<(), Error> {
+        if buf.len() != self.size {
+            return Err(Error::DeserializationError(
+                "supplied buffer has unexpected size".to_owned(),
+            ));
+        }
+        let data_buf = self.bytes_fut.get();
+        if data_buf.len() != self.size * T::bytes_required() {
+            return Err(Error::DeserializationError(
+                "received buffer of unexpected size".to_owned(),
+            ));
+        }
+
+        Ok(())
+    }
+}

+ 91 - 0
communicator/src/lib.rs

@@ -0,0 +1,91 @@
+pub mod communicator;
+mod fut;
+pub mod tcp;
+pub mod traits;
+pub mod unix;
+
+use crate::traits::Serializable;
+use std::io::Error as IoError;
+
+/// Represent data of type T that we expect to receive
+pub trait Fut<T> {
+    /// Wait until the data has arrived and obtain it.
+    fn get(self) -> Result<T, Error>;
+}
+
+/// Represent data consisting of multiple Ts that we expect to receive
+pub trait MultiFut<T> {
+    /// How many items of type T we expect.
+    fn len(&self) -> usize;
+    /// Wait until the data has arrived and obtain it.
+    fn get(self) -> Result<Vec<T>, Error>;
+    /// Wait until the data has arrived and write it into the provided buffer.
+    fn get_into(self, buf: &mut [T]) -> Result<(), Error>;
+}
+
+/// Abstract communication interface between multiple parties
+pub trait AbstractCommunicator: Clone {
+    type Fut<T: Serializable>: Fut<T>;
+    type MultiFut<T: Serializable>: MultiFut<T>;
+
+    /// How many parties N there are in total
+    fn get_num_parties(&self) -> usize;
+    /// My party id in [0, N)
+    fn get_my_id(&self) -> usize;
+
+    /// Send a message of type T to given party
+    fn send<T: Serializable>(&mut self, party_id: usize, val: T);
+    /// Send a message of multiple Ts to given party
+    fn send_slice<T: Serializable>(&mut self, party_id: usize, val: &[T]);
+
+    /// Send a message of type T all parties
+    fn broadcast<T: Serializable>(&mut self, val: T) {
+        let my_id = self.get_my_id();
+        for party_id in 0..self.get_num_parties() {
+            if party_id == my_id {
+                continue;
+            }
+            self.send(party_id, val.clone());
+        }
+    }
+    /// Send a message of multiple Ts to all parties
+    fn broadcast_slice<T: Serializable>(&mut self, val: &[T]) {
+        let my_id = self.get_my_id();
+        for party_id in 0..self.get_num_parties() {
+            if party_id == my_id {
+                continue;
+            }
+            self.send_slice(party_id, val);
+        }
+    }
+
+    /// Expect to receive message of type T from given party.  Use the returned future to obtain
+    /// the message once it has arrived.
+    fn receive<T: Serializable>(&mut self, party_id: usize) -> Self::Fut<T>;
+    /// Expect to receive message of multiple Ts from given party.  Use the returned future to obtain
+    /// the message once it has arrived.
+    fn receive_n<T: Serializable>(&mut self, party_id: usize, n: usize) -> Self::MultiFut<T>;
+
+    /// Shutdown the communication system
+    fn shutdown(&mut self);
+}
+
+/// Custom error type
+#[derive(Debug)]
+pub enum Error {
+    /// The connection has not been established
+    ConnectionSetupError,
+    /// Some std::io::Error appeared
+    IoError(IoError),
+    /// Serialization of data failed
+    SerializationError(String),
+    /// Deserialization of data failed
+    DeserializationError(String),
+}
+
+/// Enable automatic conversions from std::io::Error
+impl From<IoError> for Error {
+    fn from(e: IoError) -> Error {
+        Error::IoError(e)
+    }
+}

+ 278 - 0
communicator/src/tcp.rs

@@ -0,0 +1,278 @@
+use crate::communicator::Communicator;
+use crate::{AbstractCommunicator, Error};
+use std::collections::{HashMap, HashSet};
+use std::io::{Read, Write};
+use std::net::{TcpListener, TcpStream};
+use std::thread;
+use std::time::Duration;
+
+/// Network connection options for a single party: Either we listen for an incoming connection, or
+/// we connect to a given host and port.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum NetworkPartyInfo {
+    Listen,
+    Connect(String, u16),
+}
+
+/// Network connection options
+#[derive(Debug, Clone)]
+pub struct NetworkOptions {
+    /// Which address to listen on for incoming connections
+    pub listen_host: String,
+    /// Which port to listen on for incoming connections
+    pub listen_port: u16,
+    /// Connection info for each party
+    pub connect_info: Vec<NetworkPartyInfo>,
+    /// How long to try connecting before aborting
+    pub connect_timeout_seconds: usize,
+}
+
+fn tcp_connect(
+    my_id: usize,
+    other_id: usize,
+    host: &str,
+    port: u16,
+    timeout_seconds: usize,
+) -> Result<TcpStream, Error> {
+    // repeatedly try to connect
+    fn connect_socket(host: &str, port: u16, timeout_seconds: usize) -> Result<TcpStream, Error> {
+        // try every 100ms
+        for _ in 0..(10 * timeout_seconds) {
+            match TcpStream::connect((host, port)) {
+                Ok(socket) => return Ok(socket),
+                Err(_) => (),
+            }
+            thread::sleep(Duration::from_millis(100));
+        }
+        match TcpStream::connect((host, port)) {
+            Ok(socket) => return Ok(socket),
+            Err(e) => Err(Error::IoError(e)),
+        }
+    }
+    // connect to the other party
+    let mut stream = connect_socket(host, port, timeout_seconds)?;
+    {
+        // send our party id
+        stream.write(&(my_id as u32).to_be_bytes())?;
+        // check that we talk to the right party
+        let mut other_id_bytes = [0u8; 4];
+        stream.read_exact(&mut other_id_bytes)?;
+        if u32::from_be_bytes(other_id_bytes) != other_id as u32 {
+            return Err(Error::ConnectionSetupError);
+        }
+    }
+    Ok(stream)
+}
+
+fn tcp_accept_connections(
+    my_id: usize,
+    options: &NetworkOptions,
+) -> Result<HashMap<usize, TcpStream>, Error> {
+    // prepare function output
+    let mut output = HashMap::<usize, TcpStream>::new();
+    // compute set of parties that should connect to us
+    let mut expected_parties: HashSet<usize> = options
+        .connect_info
+        .iter()
+        .enumerate()
+        .filter_map(|(party_id, npi)| {
+            if party_id != my_id && *npi == NetworkPartyInfo::Listen {
+                Some(party_id)
+            } else {
+                None
+            }
+        })
+        .collect();
+    // if nobody should connect to us, we are done
+    if expected_parties.len() == 0 {
+        return Ok(output);
+    }
+    // create a listender and iterate over incoming connections
+    let listener = TcpListener::bind((options.listen_host.clone(), options.listen_port))?;
+    for mut stream in listener.incoming().filter_map(Result::ok) {
+        // see which party has connected
+        let mut other_id_bytes = [0u8; 4];
+        if stream.read_exact(&mut other_id_bytes).is_err() {
+            continue;
+        }
+        let other_id = u32::from_be_bytes(other_id_bytes) as usize;
+        // check if we expect this party
+        if !expected_parties.contains(&other_id) {
+            continue;
+        }
+        // respond with our party id
+        if stream.write_all(&(my_id as u32).to_be_bytes()).is_err() {
+            continue;
+        }
+        // connection has been established
+        expected_parties.remove(&other_id);
+        output.insert(other_id, stream);
+        // check if we have received connections from every party
+        if expected_parties.len() == 0 {
+            break;
+        }
+    }
+    if expected_parties.len() > 0 {
+        Err(Error::ConnectionSetupError)
+    } else {
+        Ok(output)
+    }
+}
+
+/// Setup TCP connections
+pub fn setup_connection(
+    num_parties: usize,
+    my_id: usize,
+    options: &NetworkOptions,
+) -> Result<HashMap<usize, TcpStream>, Error> {
+    // make a copy of the options to pass it into the new thread
+    let options_cpy: NetworkOptions = (*options).clone();
+
+    // spawn thread to listen for incoming connections
+    let listen_thread_handle = thread::spawn(move || tcp_accept_connections(my_id, &options_cpy));
+
+    // prepare the map of connection we will return
+    let mut output = HashMap::with_capacity(num_parties - 1);
+
+    // connect to all parties that we are supposed to connect to
+    for (party_id, info) in options.connect_info.iter().enumerate() {
+        if party_id == my_id {
+            continue;
+        }
+        match info {
+            NetworkPartyInfo::Listen => {}
+            NetworkPartyInfo::Connect(host, port) => {
+                output.insert(
+                    party_id,
+                    tcp_connect(
+                        my_id,
+                        party_id,
+                        host,
+                        *port,
+                        options.connect_timeout_seconds,
+                    )?,
+                );
+            }
+        }
+    }
+
+    // join the listen thread and obtain the connections that reached us
+    let accepted_connections = match listen_thread_handle.join() {
+        Ok(accepted_connections) => accepted_connections,
+        Err(_) => return Err(Error::ConnectionSetupError),
+    }?;
+
+    // return the union of both maps
+    output.extend(accepted_connections);
+    Ok(output)
+}
+
+/// Create communicator using TCP connections
+pub fn make_tcp_communicator(
+    num_parties: usize,
+    my_id: usize,
+    options: &NetworkOptions,
+) -> Result<impl AbstractCommunicator, Error> {
+    // create connections with other parties
+    let stream_map = setup_connection(num_parties, my_id, options)?;
+    // use streams as reader/writer pairs
+    let rw_map = stream_map
+        .into_iter()
+        .map(|(party_id, stream)| (party_id, (stream.try_clone().unwrap(), stream)))
+        .collect();
+    // create new communicator
+    Ok(Communicator::from_reader_writer(num_parties, my_id, rw_map))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Fut;
+    use std::thread;
+
+    #[test]
+    fn test_tcp_communicators() {
+        let num_parties = 3;
+        let msg_0: u8 = 42;
+        let msg_1: u32 = 0x_dead_beef;
+        let msg_2: [u32; 2] = [0x_1333_3337, 0x_c0ff_ffee];
+
+        let ports: [u16; 3] = [20_000, 20_001, 20_002];
+
+        let opts: Vec<_> = (0..num_parties)
+            .map(|party_id| NetworkOptions {
+                listen_host: "localhost".to_owned(),
+                listen_port: ports[party_id],
+                connect_info: (0..num_parties)
+                    .map(|other_id| {
+                        if other_id < party_id {
+                            NetworkPartyInfo::Connect("localhost".to_owned(), ports[other_id])
+                        } else {
+                            NetworkPartyInfo::Listen
+                        }
+                    })
+                    .collect(),
+                connect_timeout_seconds: 3,
+            })
+            .collect();
+
+        let communicators: Vec<_> = opts
+            .iter()
+            .enumerate()
+            .map(|(party_id, opts)| {
+                let opts_cpy = (*opts).clone();
+                thread::spawn(move || make_tcp_communicator(num_parties, party_id, &opts_cpy))
+            })
+            .collect();
+        let communicators: Vec<_> = communicators
+            .into_iter()
+            .map(|h| h.join().unwrap().unwrap())
+            .collect();
+
+        let thread_handles: Vec<_> = communicators
+            .into_iter()
+            .enumerate()
+            .map(|(party_id, mut communicator)| {
+                thread::spawn(move || {
+                    if party_id == 0 {
+                        let fut_1 = communicator.receive::<u32>(1);
+                        let fut_2 = communicator.receive::<[u32; 2]>(2);
+                        communicator.send(1, msg_0);
+                        communicator.send(2, msg_0);
+                        let val_1 = fut_1.get();
+                        let val_2 = fut_2.get();
+                        assert!(val_1.is_ok());
+                        assert!(val_2.is_ok());
+                        assert_eq!(val_1.unwrap(), msg_1);
+                        assert_eq!(val_2.unwrap(), msg_2);
+                    } else if party_id == 1 {
+                        let fut_0 = communicator.receive::<u8>(0);
+                        let fut_2 = communicator.receive::<[u32; 2]>(2);
+                        communicator.send(0, msg_1);
+                        communicator.send(2, msg_1);
+                        let val_0 = fut_0.get();
+                        let val_2 = fut_2.get();
+                        assert!(val_0.is_ok());
+                        assert!(val_2.is_ok());
+                        assert_eq!(val_0.unwrap(), msg_0);
+                        assert_eq!(val_2.unwrap(), msg_2);
+                    } else if party_id == 2 {
+                        let fut_0 = communicator.receive::<u8>(0);
+                        let fut_1 = communicator.receive::<u32>(1);
+                        communicator.send(0, msg_2);
+                        communicator.send(1, msg_2);
+                        let val_0 = fut_0.get();
+                        let val_1 = fut_1.get();
+                        assert!(val_0.is_ok());
+                        assert!(val_1.is_ok());
+                        assert_eq!(val_0.unwrap(), msg_0);
+                        assert_eq!(val_1.unwrap(), msg_1);
+                    }
+                    communicator.shutdown();
+                })
+            })
+            .collect();
+
+        thread_handles.into_iter().for_each(|h| h.join().unwrap());
+    }
+}

+ 187 - 0
communicator/src/traits.rs

@@ -0,0 +1,187 @@
+use crate::Error;
+use core::array;
+
+/// Allow a type to get serialized into bytes
+pub trait Serializable: Clone + Sized {
+    /// How many bytes are needed?
+    fn bytes_required() -> usize;
+    /// Convert to bytes and store them in a new vector
+    fn to_bytes(&self) -> Vec<u8> {
+        let mut buf = vec![0u8; Self::bytes_required()];
+        self.into_bytes(&mut buf)
+            .expect("does not fail, buffer has the right size");
+        buf
+    }
+    /// Convert to bytes and store them in the given buffer. Fails if the buffer has not the right size.
+    fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error>;
+    /// Convert the bytes in the given buffer into an object. Fails if the buffer has not the right size.
+    fn from_bytes(buf: &[u8]) -> Result<Self, Error>;
+}
+
+/// Convert a slice of a serializable type into a new byte vector.
+pub fn slice_to_bytes<T: Serializable>(slice: &[T]) -> Vec<u8> {
+    slice.iter().flat_map(|x| x.to_bytes()).collect()
+}
+
+/// Convert a slice of a serializable type into bytes and write them into a given buffer.
+pub fn slice_into_bytes<T: Serializable>(slice: &[T], buf: &mut [u8]) -> Result<(), Error> {
+    let bytes_required = slice.len() * T::bytes_required();
+    if !buf.len() == bytes_required {
+        return Err(Error::SerializationError(
+            "supplied buffer has unexpected size".to_owned(),
+        ));
+    }
+    slice
+        .iter()
+        .zip(buf.chunks_exact_mut(T::bytes_required()))
+        .for_each(|(x, c)| {
+            x.into_bytes(c)
+                .expect("does not fail, since chunks have the right size");
+        });
+    Ok(())
+}
+
+/// Convert given buffer of bytes into objects and store them in a given a slice.
+pub fn slice_from_bytes<T: Serializable>(slice: &mut [T], buf: &[u8]) -> Result<(), Error> {
+    let bytes_required = slice.len() * T::bytes_required();
+    if !buf.len() == bytes_required {
+        return Err(Error::DeserializationError(
+            "supplied buffer has unexpected size".to_owned(),
+        ));
+    }
+    for (i, c) in buf.chunks_exact(T::bytes_required()).enumerate() {
+        match T::from_bytes(c) {
+            Ok(v) => slice[i] = v,
+            Err(e) => return Err(e),
+        }
+    }
+    Ok(())
+}
+
+impl<T: Serializable + Default, const N: usize> Serializable for [T; N] {
+    fn bytes_required() -> usize {
+        T::bytes_required() * N
+    }
+    fn to_bytes(&self) -> Vec<u8> {
+        slice_to_bytes(self.as_slice())
+    }
+    fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error> {
+        slice_into_bytes(self.as_slice(), buf)
+    }
+    fn from_bytes(buf: &[u8]) -> Result<Self, Error> {
+        if buf.len() != Self::bytes_required() {
+            return Err(Error::DeserializationError(
+                "supplied buffer has unexpected size".to_owned(),
+            ));
+        }
+
+        let mut output = array::from_fn(|_| Default::default());
+        slice_from_bytes(&mut output, buf)?;
+        Ok(output)
+    }
+}
+
+macro_rules! impl_serializable_for_uints {
+    ($type:ty) => {
+        impl Serializable for $type {
+            fn bytes_required() -> usize {
+                <$type>::BITS as usize / 8
+            }
+
+            fn to_bytes(&self) -> Vec<u8> {
+                let bytes = self.to_be_bytes();
+                bytes.into()
+            }
+
+            fn into_bytes(&self, buf: &mut [u8]) -> Result<(), Error> {
+                if !buf.len() == Self::bytes_required() {
+                    return Err(Error::SerializationError("buffer to small".to_owned()));
+                }
+                buf.copy_from_slice(&self.to_be_bytes());
+                Ok(())
+            }
+
+            fn from_bytes(buf: &[u8]) -> Result<Self, Error> {
+                // assert_eq!(buf.len(), Self::bytes_required());
+                match buf.try_into().map(Self::from_be_bytes) {
+                    Ok(v) => Ok(v),
+                    Err(_) => Err(Error::DeserializationError(
+                        "supplied buffer has unexpected size".to_owned(),
+                    )),
+                }
+            }
+        }
+    };
+}
+
+impl_serializable_for_uints!(u8);
+impl_serializable_for_uints!(u16);
+impl_serializable_for_uints!(u32);
+impl_serializable_for_uints!(u64);
+impl_serializable_for_uints!(u128);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rand::{thread_rng, Rng};
+
+    macro_rules! make_test_serialiable_for_uints {
+        ($test_name:ident, $type:ty) => {
+            #[test]
+            fn $test_name() {
+                type T = $type;
+                assert_eq!(T::bytes_required(), T::BITS as usize / 8);
+                for _ in 0..100 {
+                    let val: T = thread_rng().gen();
+                    let expected_bytes = val.to_be_bytes();
+                    assert_eq!(val.to_bytes(), expected_bytes);
+                    let mut buf = vec![0u8; T::bytes_required()];
+                    val.into_bytes(&mut buf).unwrap();
+                    assert_eq!(buf, expected_bytes);
+                    let new_val = T::from_bytes(&val.to_bytes());
+                    assert!(new_val.is_ok());
+                    assert_eq!(new_val.unwrap(), val);
+                }
+            }
+        };
+    }
+
+    make_test_serialiable_for_uints!(test_serialize_u8, u8);
+    make_test_serialiable_for_uints!(test_serialize_u16, u16);
+    make_test_serialiable_for_uints!(test_serialize_u32, u32);
+    make_test_serialiable_for_uints!(test_serialize_u64, u64);
+    make_test_serialiable_for_uints!(test_serialize_u128, u128);
+
+    macro_rules! make_test_serialiable_for_uint_arrays {
+        ($test_name:ident, $type:ty, $len:expr) => {
+            #[test]
+            fn $test_name() {
+                type T = $type;
+                type A = [T; $len];
+                assert_eq!(A::bytes_required(), T::BITS as usize / 8 * $len);
+                for _ in 0..100 {
+                    let val: A = array::from_fn(|_| thread_rng().gen());
+                    let serialized = val.to_bytes();
+                    let mut serialized2 = vec![0u8; A::bytes_required()];
+                    val.into_bytes(&mut serialized2).unwrap();
+                    assert_eq!(serialized.len(), A::bytes_required());
+                    for i in 0..$len {
+                        let expected_bytes = val[i].to_be_bytes();
+                        assert_eq!(
+                            serialized[i * T::bytes_required()..(i + 1) * T::bytes_required()],
+                            expected_bytes
+                        );
+                    }
+                    let new_val = <A>::from_bytes(&val.to_bytes());
+                    assert!(new_val.is_ok());
+                    assert_eq!(new_val.unwrap(), val);
+                }
+            }
+        };
+    }
+    make_test_serialiable_for_uint_arrays!(test_serialize_u8_array, u8, 42);
+    make_test_serialiable_for_uint_arrays!(test_serialize_u16_array, u16, 42);
+    make_test_serialiable_for_uint_arrays!(test_serialize_u32_array, u32, 42);
+    make_test_serialiable_for_uint_arrays!(test_serialize_u64_array, u64, 42);
+    make_test_serialiable_for_uint_arrays!(test_serialize_u128_array, u128, 42);
+}

+ 89 - 0
communicator/src/unix.rs

@@ -0,0 +1,89 @@
+use crate::communicator::Communicator;
+use crate::AbstractCommunicator;
+use std::collections::HashMap;
+use std::os::unix::net::UnixStream;
+
+/// Create a set of connected Communicators that are based on local Unix sockets
+pub fn make_unix_communicators(num_parties: usize) -> Vec<impl AbstractCommunicator> {
+    // prepare maps for each parties to store readers and writers to every other party
+    let mut rw_maps: Vec<_> = (0..num_parties)
+        .map(|_| HashMap::with_capacity(num_parties - 1))
+        .collect();
+    // create pairs of unix sockets connecting each pair of parties
+    for party_i in 0..num_parties {
+        for party_j in 0..party_i {
+            let (stream_i_to_j, stream_j_to_i) = UnixStream::pair().unwrap();
+            rw_maps[party_i].insert(party_j, (stream_i_to_j.try_clone().unwrap(), stream_i_to_j));
+            rw_maps[party_j].insert(party_i, (stream_j_to_i.try_clone().unwrap(), stream_j_to_i));
+        }
+    }
+    // create communicators from the reader/writer maps
+    rw_maps
+        .into_iter()
+        .enumerate()
+        .map(|(party_id, rw_map)| Communicator::from_reader_writer(num_parties, party_id, rw_map))
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Fut;
+    use std::thread;
+
+    #[test]
+    fn test_unix_communicators() {
+        let num_parties = 3;
+        let msg_0: u8 = 42;
+        let msg_1: u32 = 0x_dead_beef;
+        let msg_2: [u32; 2] = [0x_1333_3337, 0x_c0ff_ffee];
+
+        let communicators = make_unix_communicators(num_parties);
+
+        let thread_handles: Vec<_> = communicators
+            .into_iter()
+            .enumerate()
+            .map(|(party_id, mut communicator)| {
+                thread::spawn(move || {
+                    if party_id == 0 {
+                        let fut_1 = communicator.receive::<u32>(1);
+                        let fut_2 = communicator.receive::<[u32; 2]>(2);
+                        communicator.send(1, msg_0);
+                        communicator.send(2, msg_0);
+                        let val_1 = fut_1.get();
+                        let val_2 = fut_2.get();
+                        assert!(val_1.is_ok());
+                        assert!(val_2.is_ok());
+                        assert_eq!(val_1.unwrap(), msg_1);
+                        assert_eq!(val_2.unwrap(), msg_2);
+                    } else if party_id == 1 {
+                        let fut_0 = communicator.receive::<u8>(0);
+                        let fut_2 = communicator.receive::<[u32; 2]>(2);
+                        communicator.send(0, msg_1);
+                        communicator.send(2, msg_1);
+                        let val_0 = fut_0.get();
+                        let val_2 = fut_2.get();
+                        assert!(val_0.is_ok());
+                        assert!(val_2.is_ok());
+                        assert_eq!(val_0.unwrap(), msg_0);
+                        assert_eq!(val_2.unwrap(), msg_2);
+                    } else if party_id == 2 {
+                        let fut_0 = communicator.receive::<u8>(0);
+                        let fut_1 = communicator.receive::<u32>(1);
+                        communicator.send(0, msg_2);
+                        communicator.send(1, msg_2);
+                        let val_0 = fut_0.get();
+                        let val_1 = fut_1.get();
+                        assert!(val_0.is_ok());
+                        assert!(val_1.is_ok());
+                        assert_eq!(val_0.unwrap(), msg_0);
+                        assert_eq!(val_1.unwrap(), msg_1);
+                    }
+                    communicator.shutdown();
+                })
+            })
+            .collect();
+
+        thread_handles.into_iter().for_each(|h| h.join().unwrap());
+    }
+}