/**
 \file 		channel.cpp
 \author	michael.zohner@ec-spride.de
 \copyright	ABY - A Framework for Efficient Mixed-protocol Secure Two-party Computation
			Copyright (C) 2019 Engineering Cryptographic Protocols Group, TU Darmstadt
			This program is free software: you can redistribute it and/or modify
			it under the terms of the GNU Lesser General Public License as published
			by the Free Software Foundation, either version 3 of the License, or
			(at your option) any later version.
			ABY is distributed in the hope that it will be useful,
			but WITHOUT ANY WARRANTY; without even the implied warranty of
			MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
			GNU Lesser General Public License for more details.
			You should have received a copy of the GNU Lesser General Public License
			along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "channel.h"
#include "typedefs.h"
#include "rcvthread.h"
#include "sndthread.h"

#include <cassert>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>

/**
 * Creates a channel with the given id on top of a receiver/sender thread pair.
 *
 * The channel creates its own "received" and "fin" events, registers them
 * with the receiver thread via add_listener() and keeps the queue returned by
 * that call together with the mutex returned by get_listener_mutex().
 *
 * \param channelid id passed to every call on the sender and receiver thread
 * \param rcver     receiver thread; must not be null
 * \param snder     sender thread; must not be null
 */
channel::channel(uint8_t channelid, RcvThread* rcver, SndThread* snder)
	: m_bChannelID(channelid),
	  m_cRcver(rcver),
	  m_cSnder(snder),
	  m_eRcved(std::make_unique<CEvent>()),
	  m_eFin(std::make_unique<CEvent>()),
	  m_bSndAlive(true),
	  m_bRcvAlive(true),
	  m_qRcvedBlocks(rcver->add_listener(channelid, m_eRcved.get(), m_eFin.get())),
	  m_qRcvedBlocks_mutex_(rcver->get_listener_mutex(channelid)) {
	// Both threads are expected to report the same lock object.
	assert(rcver->getlock() == snder->getlock());
}

/**
 * Unregisters the channel from the receiver thread, unless the receiving side
 * was already marked as finished by wait_for_fin().
 */
channel::~channel() {
	if (m_bRcvAlive) {
		m_cRcver->remove_listener(m_bChannelID);
	}
}

/**
 * Hands nbytes bytes starting at buf to the sender thread (add_snd_task).
 * Must not be called after signal_end().
 */
void channel::send(uint8_t* buf, uint64_t nbytes) {
	assert(m_bSndAlive);
	m_cSnder->add_snd_task(m_bChannelID, nbytes, buf);
}

/**
 * Like send(), but registers eventcaller with the send task and then waits on
 * it before returning.
 * NOTE(review): presumably the sender thread sets the event once the task has
 * been processed - confirm against SndThread.
 */
void channel::blocking_send(CEvent* eventcaller, uint8_t* buf, uint64_t nbytes) {
	assert(m_bSndAlive);
	m_cSnder->add_event_snd_task(eventcaller, m_bChannelID, nbytes, buf);
	eventcaller->Wait();
}

/**
 * Hands nbytes bytes starting at buf to the sender thread together with an
 * id and a len value (add_snd_task_start_len).
 * Must not be called after signal_end().
 */
void channel::send_id_len(uint8_t* buf, uint64_t nbytes, uint64_t id, uint64_t len) {
	assert(m_bSndAlive);
	m_cSnder->add_snd_task_start_len(m_bChannelID, nbytes, buf, id, len);
}

/**
 * Like send_id_len(), but registers eventcaller with the send task and then
 * waits on it before returning.
 */
void channel::blocking_send_id_len(CEvent* eventcaller, uint8_t* buf, uint64_t nbytes, uint64_t id,
		uint64_t len) {
	assert(m_bSndAlive);
	m_cSnder->add_event_snd_task_start_len(eventcaller, m_bChannelID, nbytes, buf, id, len);
	eventcaller->Wait();
}

/**
 * Receives one block and splits off its two leading uint64_t header fields.
 *
 * \param[out] data set to the first byte after the two header fields
 * \param[out] id   first header field
 * \param[out] len  second header field
 * \return the start of the received block; this pointer (not *data) has to be
 *         passed to free() by the caller
 */
uint8_t* channel::blocking_receive_id_len(uint8_t** data, uint64_t* id, uint64_t* len) {
	uint8_t* buf = blocking_receive();

	// Read the header fields bytewise instead of through a casted pointer so
	// that no assumption about alignment or aliasing of the buffer is needed.
	uint8_t* cursor = buf;
	memcpy(id, cursor, sizeof(uint64_t));
	cursor += sizeof(uint64_t);
	memcpy(len, cursor, sizeof(uint64_t));
	cursor += sizeof(uint64_t);

	*data = cursor;
	return buf;
}

/**
 * \return true if the queue of received blocks is currently empty
 *         (checked while holding the queue mutex)
 */
bool channel::queue_empty() const {
	std::lock_guard<std::mutex> lock(m_qRcvedBlocks_mutex_);
	return m_qRcvedBlocks->empty();
}

/**
 * Waits until a received block is queued, removes it from the queue and
 * returns its buffer.
 *
 * \return buffer of the oldest queued block; has to be passed to free() by
 *         the caller
 */
uint8_t* channel::blocking_receive() {
	assert(m_bRcvAlive);
	while (queue_empty())
		m_eRcved->Wait();

	rcv_ctx* ctx = nullptr;
	uint8_t* block = nullptr;
	{
		std::lock_guard<std::mutex> lock(m_qRcvedBlocks_mutex_);
		ctx = (rcv_ctx*) m_qRcvedBlocks->front();
		block = ctx->buf;
		m_qRcvedBlocks->pop();
	}
	// Only the bookkeeping struct is released here; the buffer goes to the caller.
	free(ctx);
	return block;
}

/**
 * Waits until exactly rcvsize bytes have been received and copies them to
 * rcvbuf.
 *
 * Queued blocks are consumed front to back. A block that is completely used
 * is removed from the queue and freed. If the front block holds more bytes
 * than are still requested, only the requested prefix is copied out and the
 * unread remainder stays at the front of the queue for the next receive call.
 *
 * \param rcvbuf  destination, must provide room for rcvsize bytes
 * \param rcvsize number of bytes to receive
 */
void channel::blocking_receive(uint8_t* rcvbuf, uint64_t rcvsize) {
	// Loop instead of recursing once per block: a request that spans many
	// small blocks no longer grows the call stack.
	for (;;) {
		assert(m_bRcvAlive);
		while (queue_empty())
			m_eRcved->Wait();

		std::unique_lock<std::mutex> lock(m_qRcvedBlocks_mutex_);
		rcv_ctx* ctx = (rcv_ctx*) m_qRcvedBlocks->front();
		uint8_t* block = ctx->buf;
		const uint64_t available = ctx->rcvbytes;

		if (rcvsize < available) {
			// The block holds more than requested: hand out the prefix and move
			// the unread tail to the start of the same buffer. The ranges
			// overlap, hence memmove. Reusing the buffer avoids an allocation
			// that could fail, and ctx->buf stays the pointer that is
			// eventually passed to free().
			const uint64_t remaining = available - rcvsize;
			memcpy(rcvbuf, block, rcvsize);
			memmove(block, block + rcvsize, remaining);
			ctx->rcvbytes = remaining;
			return; // lock is released by its destructor
		}

		// The whole block is consumed: detach it from the queue first so the
		// copy below runs without holding the queue mutex.
		m_qRcvedBlocks->pop();
		lock.unlock();
		free(ctx);

		memcpy(rcvbuf, block, available);
		free(block);

		if (available == rcvsize)
			return;

		// More data was requested than this block held; continue with the
		// next block behind the bytes already written.
		rcvbuf += available;
		rcvsize -= available;
	}
}

/**
 * \return false only if the receive queue is empty and the fin event is set;
 *         true otherwise
 */
bool channel::is_alive() {
	return (!(queue_empty() && m_eFin->IsSet()));
}

/**
 * \return true if at least one received block is queued
 */
bool channel::data_available() {
	return !queue_empty();
}

/**
 * Calls signal_end() on the sender thread for this channel and marks the
 * sending side as closed; send calls assert afterwards.
 */
void channel::signal_end() {
	m_cSnder->signal_end(m_bChannelID);
	m_bSndAlive = false;
}

/**
 * Waits on the fin event and then marks the receiving side as closed.
 */
void channel::wait_for_fin() {
	m_eFin->Wait();
	m_bRcvAlive = false;
}

/**
 * Shuts the channel down in both directions: signals the end on the sending
 * side if that has not happened yet and, if the receiving side is still
 * alive, flushes the receive queue via the receiver thread and waits for the
 * fin event.
 */
void channel::synchronize_end() {
	if (m_bSndAlive)
		signal_end();
	if (m_bRcvAlive)
		m_cRcver->flush_queue(m_bChannelID);
	if (m_bRcvAlive)
		wait_for_fin();
}