rcvthread.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /**
  2. \file rcvthread.cpp
  3. \author michael.zohner@ec-spride.de
  4. \copyright ABY - A Framework for Efficient Mixed-protocol Secure Two-party Computation
  5. Copyright (C) 2019 ENCRYPTO Group, TU Darmstadt
  6. This program is free software: you can redistribute it and/or modify
  7. it under the terms of the GNU Lesser General Public License as published
  8. by the Free Software Foundation, either version 3 of the License, or
  9. (at your option) any later version.
  10. ABY is distributed in the hope that it will be useful,
  11. but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. GNU Lesser General Public License for more details.
  14. You should have received a copy of the GNU Lesser General Public License
  15. along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. \brief Receiver Thread Implementation
  17. */
  18. #include "rcvthread.h"
  19. #include "typedefs.h"
  20. #include "constants.h"
  21. #include "socket.h"
  22. #include <cassert>
  23. #include <cstdlib>
  24. #include <iostream>
  25. RcvThread::RcvThread(CSocket* sock, CLock *glock)
  26. :rcvlock(glock), mysock(sock), listeners()
  27. {
  28. listeners[ADMIN_CHANNEL].inuse = true;
  29. }
  30. RcvThread::~RcvThread() {
  31. this->Wait();
  32. for(size_t i = 0; i < listeners.size(); i++) {
  33. flush_queue(i);
  34. }
  35. //delete rcvlock;
  36. }
  37. CLock* RcvThread::getlock() const {
  38. return rcvlock;
  39. }
  40. void RcvThread::setlock(CLock *glock) {
  41. rcvlock = glock;
  42. }
  43. void RcvThread::flush_queue(uint8_t channelid) {
  44. std::lock_guard<std::mutex> lock(listeners[channelid].rcv_buf_mutex);
  45. while(!listeners[channelid].rcv_buf.empty()) {
  46. rcv_ctx* tmp = listeners[channelid].rcv_buf.front();
  47. free(tmp->buf);
  48. free(tmp);
  49. listeners[channelid].rcv_buf.pop();
  50. }
  51. }
  52. void RcvThread::remove_listener(uint8_t channelid) {
  53. rcvlock->Lock();
  54. if(listeners[channelid].inuse) {
  55. listeners[channelid].fin_event->Set();
  56. listeners[channelid].inuse = false;
  57. #ifdef DEBUG_RECEIVE_THREAD
  58. std::cout << "Unsetting channel " << (uint32_t) channelid << std::endl;
  59. #endif
  60. } else {
  61. listeners[channelid].forward_notify_fin = true;
  62. }
  63. rcvlock->Unlock();
  64. }
  65. std::queue<rcv_ctx*>*
  66. RcvThread::add_listener(uint8_t channelid, CEvent* rcv_event, CEvent* fin_event) {
  67. rcvlock->Lock();
  68. #ifdef DEBUG_RECEIVE_THREAD
  69. std::cout << "Registering listener on channel " << (uint32_t) channelid << std::endl;
  70. #endif
  71. if(listeners[channelid].inuse || channelid == ADMIN_CHANNEL) {
  72. std::cerr << "A listener has already been registered on channel " << (uint32_t) channelid << std::endl;
  73. assert(!listeners[channelid].inuse);
  74. assert(channelid != ADMIN_CHANNEL);
  75. }
  76. //listeners[channelid].rcv_buf = rcv_buf;
  77. listeners[channelid].rcv_event = rcv_event;
  78. listeners[channelid].fin_event = fin_event;
  79. listeners[channelid].inuse = true;
  80. // assert(listeners[channelid].rcv_buf->empty());
  81. //std::cout << "Successfully registered on channel " << (uint32_t) channelid << std::endl;
  82. rcvlock->Unlock();
  83. if(listeners[channelid].forward_notify_fin) {
  84. listeners[channelid].forward_notify_fin = false;
  85. remove_listener(channelid);
  86. }
  87. return &listeners[channelid].rcv_buf;
  88. }
  89. std::mutex& RcvThread::get_listener_mutex(uint8_t channelid)
  90. {
  91. return listeners[channelid].rcv_buf_mutex;
  92. }
  93. void RcvThread::ThreadMain() {
  94. uint8_t channelid;
  95. uint64_t rcvbytelen;
  96. uint64_t rcv_len;
  97. while(true) {
  98. //std::cout << "Starting to receive data" << std::endl;
  99. rcv_len = 0;
  100. rcv_len += mysock->Receive(&channelid, sizeof(uint8_t));
  101. rcv_len += mysock->Receive(&rcvbytelen, sizeof(uint64_t));
  102. if(rcv_len > 0) {
  103. #ifdef DEBUG_RECEIVE_THREAD
  104. std::cout << "Received value on channel " << (uint32_t) channelid << " with " << rcvbytelen <<
  105. " bytes length (" << rcv_len << ")" << std::endl;
  106. #endif
  107. if(channelid == ADMIN_CHANNEL) {
  108. std::vector<uint8_t> tmprcvbuf(rcvbytelen);
  109. mysock->Receive(tmprcvbuf.data(), rcvbytelen);
  110. //TODO: Right now finish, can be used for other maintenance tasks
  111. //std::cout << "Got message on Admin channel, shutting down" << std::endl;
  112. #ifdef DEBUG_RECEIVE_THREAD
  113. std::cout << "Receiver thread is being killed" << std::endl;
  114. #endif
  115. return;//continue;
  116. }
  117. if(rcvbytelen == 0) {
  118. remove_listener(channelid);
  119. } else {
  120. rcv_ctx* rcv_buf = (rcv_ctx*) malloc(sizeof(rcv_ctx));
  121. rcv_buf->buf = (uint8_t*) malloc(rcvbytelen);
  122. rcv_buf->rcvbytes = rcvbytelen;
  123. mysock->Receive(rcv_buf->buf, rcvbytelen);
  124. rcvlock->Lock();
  125. {
  126. std::lock_guard<std::mutex> lock(listeners[channelid].rcv_buf_mutex);
  127. listeners[channelid].rcv_buf.push(rcv_buf);
  128. }
  129. bool cond = listeners[channelid].inuse;
  130. rcvlock->Unlock();
  131. if(cond)
  132. listeners[channelid].rcv_event->Set();
  133. }
  134. } else {
  135. // We received 0 bytes, probably due to some major error. Just return.
  136. // TODO: Probably add some more elaborate error handling.
  137. return;
  138. }
  139. }
  140. }