sndthread.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. /**
  2. \file sndthread.cpp
  3. \author michael.zohner@ec-spride.de
  4. \copyright ABY - A Framework for Efficient Mixed-protocol Secure Two-party Computation
  5. Copyright (C) 2019 ENCRYPTO Group, TU Darmstadt
  6. This program is free software: you can redistribute it and/or modify
  7. it under the terms of the GNU Lesser General Public License as published
  8. by the Free Software Foundation, either version 3 of the License, or
  9. (at your option) any later version.
  10. ABY is distributed in the hope that it will be useful,
  11. but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. GNU Lesser General Public License for more details.
  14. You should have received a copy of the GNU Lesser General Public License
  15. along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. \brief Sender Thread Implementation
  17. */
  18. #include "sndthread.h"
  19. #include "socket.h"
  20. #include "constants.h"
  21. #include <cassert>
  22. #include <cstring>
  23. SndThread::SndThread(CSocket* sock, CLock *glock)
  24. : mysock(sock), sndlock(glock), send(std::make_unique<CEvent>())
  25. {
  26. }
  27. void SndThread::stop() {
  28. kill_task();
  29. }
  30. SndThread::~SndThread() {
  31. kill_task();
  32. this->Wait();
  33. }
  34. CLock* SndThread::getlock() const {
  35. return sndlock;
  36. }
  37. void SndThread::setlock(CLock *glock) {
  38. sndlock = glock;
  39. }
  40. void SndThread::push_task(std::unique_ptr<snd_task> task)
  41. {
  42. sndlock->Lock();
  43. send_tasks.push(std::move(task));
  44. sndlock->Unlock();
  45. send->Set();
  46. }
  47. void SndThread::add_event_snd_task_start_len(CEvent* eventcaller, uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf, uint64_t startid, uint64_t len) {
  48. assert(channelid != ADMIN_CHANNEL);
  49. auto task = std::make_unique<snd_task>();
  50. task->channelid = channelid;
  51. task->eventcaller = eventcaller;
  52. size_t bytelen = sndbytes + 2 * sizeof(uint64_t);
  53. task->snd_buf.resize(bytelen);
  54. memcpy(task->snd_buf.data(), &startid, sizeof(uint64_t));
  55. memcpy(task->snd_buf.data()+sizeof(uint64_t), &len, sizeof(uint64_t));
  56. memcpy(task->snd_buf.data()+2*sizeof(uint64_t), sndbuf, sndbytes);
  57. //std::cout << "Adding a new task that is supposed to send " << task->bytelen << " bytes on channel " << (uint32_t) channelid << std::endl;
  58. push_task(std::move(task));
  59. }
  60. void SndThread::add_snd_task_start_len(uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf, uint64_t startid, uint64_t len) {
  61. //Call the method blocking but since callback is nullptr nobody gets notified, other functionallity is equal
  62. add_event_snd_task_start_len(nullptr, channelid, sndbytes, sndbuf, startid, len);
  63. }
  64. void SndThread::add_event_snd_task(CEvent* eventcaller, uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf) {
  65. assert(channelid != ADMIN_CHANNEL);
  66. auto task = std::make_unique<snd_task>();
  67. task->channelid = channelid;
  68. task->eventcaller = eventcaller;
  69. task->snd_buf.resize(sndbytes);
  70. memcpy(task->snd_buf.data(), sndbuf, sndbytes);
  71. push_task(std::move(task));
  72. //std::cout << "Event set" << std::endl;
  73. }
  74. void SndThread::add_snd_task(uint8_t channelid, uint64_t sndbytes, uint8_t* sndbuf) {
  75. //Call the method blocking but since callback is nullptr nobody gets notified, other functionallity is equal
  76. add_event_snd_task(nullptr, channelid, sndbytes, sndbuf);
  77. }
  78. void SndThread::signal_end(uint8_t channelid) {
  79. add_snd_task(channelid, 0, nullptr);
  80. //std::cout << "Signalling end on channel " << (uint32_t) channelid << std::endl;
  81. }
  82. void SndThread::kill_task() {
  83. auto task = std::make_unique<snd_task>();
  84. task->channelid = ADMIN_CHANNEL;
  85. task->snd_buf = {0};
  86. push_task(std::move(task));
  87. #ifdef DEBUG_SEND_THREAD
  88. std::cout << "Killing channel " << (uint32_t) task->channelid << std::endl;
  89. #endif
  90. }
  91. void SndThread::ThreadMain() {
  92. uint8_t channelid;
  93. uint32_t iters;
  94. bool run = true;
  95. bool empty = true;
  96. while(run) {
  97. sndlock->Lock();
  98. empty = send_tasks.empty();
  99. sndlock->Unlock();
  100. if(empty){
  101. send->Wait();
  102. }
  103. //std::cout << "Awoken" << std::endl;
  104. sndlock->Lock();
  105. iters = send_tasks.size();
  106. sndlock->Unlock();
  107. while((iters--) && run) {
  108. sndlock->Lock();
  109. auto task = std::move(send_tasks.front());
  110. send_tasks.pop();
  111. sndlock->Unlock();
  112. channelid = task->channelid;
  113. mysock->Send(&channelid, sizeof(uint8_t));
  114. uint64_t bytelen = task->snd_buf.size();
  115. mysock->Send(&bytelen, sizeof(bytelen));
  116. if(bytelen > 0) {
  117. mysock->Send(task->snd_buf.data(), task->snd_buf.size());
  118. }
  119. #ifdef DEBUG_SEND_THREAD
  120. std::cout << "Sending on channel " << (uint32_t) channelid << " a message of " << task->bytelen << " bytes length" << std::endl;
  121. #endif
  122. if(channelid == ADMIN_CHANNEL) {
  123. //delete sndlock;
  124. run = false;
  125. }
  126. if(task->eventcaller != nullptr) {
  127. task->eventcaller->Set();
  128. }
  129. }
  130. }
  131. }
  132. ;