/**
 \file      sndthread.cpp
 \author    michael.zohner@ec-spride.de
 \copyright ABY - A Framework for Efficient Mixed-protocol Secure Two-party Computation
            Copyright (C) 2019 ENCRYPTO Group, TU Darmstadt
            This program is free software: you can redistribute it and/or modify
            it under the terms of the GNU Lesser General Public License as published
            by the Free Software Foundation, either version 3 of the License, or
            (at your option) any later version.
            ABY is distributed in the hope that it will be useful,
            but WITHOUT ANY WARRANTY; without even the implied warranty of
            MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
            GNU Lesser General Public License for more details.
            You should have received a copy of the GNU Lesser General Public License
            along with this program. If not, see <http://www.gnu.org/licenses/>.
 \brief     Sender Thread Implementation
 */

#include "sndthread.h"
#include "socket.h"
#include "constants.h"
#include <cassert>
#include <cstring>
#ifdef DEBUG_SEND_THREAD
#include <iostream>
#endif

/**
 Creates a sender bound to a socket and a lock.
 \param sock   socket every queued message is written to (stored, not owned here)
 \param glock  lock guarding the task queue (stored, not owned here)
 */
SndThread::SndThread(CSocket* sock, CLock *glock)
    : mysock(sock), sndlock(glock), send(std::make_unique<CEvent>()) {
}

/**
 Asks the send loop to terminate by queueing a kill task.
 */
void SndThread::stop() {
    kill_task();
}

/**
 Queues a kill task and then calls Wait() on this thread object
 (presumably joining the send loop - confirm against the thread base class).
 */
SndThread::~SndThread() {
    kill_task();
    this->Wait();
}

/**
 \return the lock currently guarding the task queue
 */
CLock* SndThread::getlock() const {
    return sndlock;
}

/**
 Replaces the lock guarding the task queue.
 \param glock  new lock
 */
void SndThread::setlock(CLock *glock) {
    sndlock = glock;
}

/**
 Appends a task to the queue while holding the lock and then signals the
 send loop that work is available.
 \param task  task to enqueue; ownership moves into the queue
 */
void SndThread::push_task(std::unique_ptr<snd_task> task) {
    sndlock->Lock();
    send_tasks.push(std::move(task));
    sndlock->Unlock();
    send->Set();
}

/**
 Queues a message whose payload is prefixed with two 64-bit values.
 The buffer that gets sent is laid out as [startid | len | sndbuf bytes],
 with startid and len copied in their in-memory representation.
 The data is copied, so sndbuf is not referenced after this call returns.
 \param eventcaller  event that is Set() once the message was sent; may be nullptr
 \param channelid    target channel; must not be ADMIN_CHANNEL
 \param sndbytes     number of bytes to copy from sndbuf
 \param sndbuf       payload; may be nullptr if sndbytes is 0
 \param startid      first header value
 \param len          second header value
 */
void SndThread::add_event_snd_task_start_len(CEvent* eventcaller, uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf, uint64_t startid, uint64_t len) {
    assert(channelid != ADMIN_CHANNEL);

    auto task = std::make_unique<snd_task>();
    task->channelid = channelid;
    task->eventcaller = eventcaller;

    size_t bytelen = sndbytes + 2 * sizeof(uint64_t);
    task->snd_buf.resize(bytelen);
    memcpy(task->snd_buf.data(), &startid, sizeof(uint64_t));
    memcpy(task->snd_buf.data() + sizeof(uint64_t), &len, sizeof(uint64_t));
    // memcpy must not be handed a null source, not even for zero bytes.
    if (sndbytes > 0) {
        memcpy(task->snd_buf.data() + 2 * sizeof(uint64_t), sndbuf, sndbytes);
    }

    //std::cout << "Adding a new task that is supposed to send " << bytelen << " bytes on channel " << static_cast<uint32_t>(channelid) << std::endl;
    push_task(std::move(task));
}

/**
 Same as add_event_snd_task_start_len() but without a completion event,
 so nobody is notified once the message was sent.
 */
void SndThread::add_snd_task_start_len(uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf, uint64_t startid, uint64_t len) {
    add_event_snd_task_start_len(nullptr, channelid, sndbytes, sndbuf, startid, len);
}

/**
 Queues a message consisting of a copy of sndbuf.
 The data is copied, so sndbuf is not referenced after this call returns.
 \param eventcaller  event that is Set() once the message was sent; may be nullptr
 \param channelid    target channel; must not be ADMIN_CHANNEL
 \param sndbytes     number of bytes to copy from sndbuf
 \param sndbuf       payload; may be nullptr if sndbytes is 0
 */
void SndThread::add_event_snd_task(CEvent* eventcaller, uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf) {
    assert(channelid != ADMIN_CHANNEL);

    auto task = std::make_unique<snd_task>();
    task->channelid = channelid;
    task->eventcaller = eventcaller;
    task->snd_buf.resize(sndbytes);
    // signal_end() passes (0, nullptr); memcpy must not see null pointers,
    // not even for zero bytes.
    if (sndbytes > 0) {
        memcpy(task->snd_buf.data(), sndbuf, sndbytes);
    }

    push_task(std::move(task));
    //std::cout << "Event set" << std::endl;
}

/**
 Same as add_event_snd_task() but without a completion event,
 so nobody is notified once the message was sent.
 */
void SndThread::add_snd_task(uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf) {
    add_event_snd_task(nullptr, channelid, sndbytes, sndbuf);
}

/**
 Queues an empty (zero-length) message on the given channel.
 \param channelid  channel to signal; must not be ADMIN_CHANNEL
 */
void SndThread::signal_end(uint8_t channelid) {
    add_snd_task(channelid, 0, nullptr);
    //std::cout << "Signalling end on channel " << static_cast<uint32_t>(channelid) << std::endl;
}

/**
 Queues a one-byte message on ADMIN_CHANNEL. ThreadMain() leaves its loop
 after it has sent a message on that channel.
 */
void SndThread::kill_task() {
    auto task = std::make_unique<snd_task>();
    task->channelid = ADMIN_CHANNEL;
    task->snd_buf = {0}; // payload is a single zero byte
    push_task(std::move(task));
#ifdef DEBUG_SEND_THREAD
    // task has been moved from and must not be dereferenced any more.
    std::cout << "Killing channel " << static_cast<uint32_t>(ADMIN_CHANNEL) << std::endl;
#endif
}

/**
 Send loop. Waits on the wake-up event while the queue is empty, then sends
 the tasks that are queued at that moment, oldest first.
 Every message is written to the socket as:
   1 byte channel id, 8 bytes payload length, payload (omitted if length is 0).
 After a message was sent its eventcaller, if any, is Set().
 The loop ends after a message on ADMIN_CHANNEL was sent.
 */
void SndThread::ThreadMain() {
    bool run = true;

    while (run) {
        sndlock->Lock();
        const bool empty = send_tasks.empty();
        sndlock->Unlock();

        if (empty) {
            send->Wait();
        }
        //std::cout << "Awoken" << std::endl;

        // Snapshot the queue length; tasks added meanwhile are handled in
        // the next round of the outer loop.
        sndlock->Lock();
        size_t iters = send_tasks.size();
        sndlock->Unlock();

        while (run && iters > 0) {
            --iters;

            // Only hold the lock while touching the queue, not while sending.
            sndlock->Lock();
            auto task = std::move(send_tasks.front());
            send_tasks.pop();
            sndlock->Unlock();

            uint8_t channelid = task->channelid;
            mysock->Send(&channelid, sizeof(uint8_t));

            uint64_t bytelen = task->snd_buf.size();
            mysock->Send(&bytelen, sizeof(bytelen));
            if (bytelen > 0) {
                mysock->Send(task->snd_buf.data(), task->snd_buf.size());
            }

#ifdef DEBUG_SEND_THREAD
            std::cout << "Sending on channel " << static_cast<uint32_t>(channelid) << " a message of " << bytelen << " bytes length" << std::endl;
#endif

            if (channelid == ADMIN_CHANNEL) {
                //delete sndlock;
                run = false;
            }

            if (task->eventcaller != nullptr) {
                task->eventcaller->Set();
            }
        }
    }
}