shared_queue.hpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. /* Copyright (C) 2014 Carlos Aguilar Melchor, Joris Barrier, Marc-Olivier Killijian
  2. * This file is part of XPIR.
  3. *
  4. * XPIR is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU General Public License as published by
  6. * the Free Software Foundation, either version 3 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * XPIR is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with XPIR. If not, see <http://www.gnu.org/licenses/>.
  16. */
  17. #ifndef DEF_SHARED_BUFFER
  18. #define DEF_SHARED_BUFFER
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. #include <limits>
  22. #include <string>
  23. #include <boost/thread.hpp>
  24. #ifdef __APPLE__
  25. #include <boost/interprocess/sync/named_semaphore.hpp>
  26. #else
  27. #include <boost/interprocess/sync/interprocess_semaphore.hpp>
  28. #endif
  29. #include <boost/pending/queue.hpp>
  30. using namespace boost;
  31. using namespace boost::interprocess;
  32. using namespace std;
  33. template<typename T>
  34. class shared_queue
  35. {
  36. public:
  37. shared_queue(const std::string& name, unsigned int max_size=SEM_VALUE_MAX);
  38. ~shared_queue();
  39. std::string id;
  40. void push(T);
  41. void pop();
  42. unsigned int size();
  43. T front();
  44. T pop_front();
  45. bool empty();
  46. private:
  47. boost::mutex mutex;
  48. #ifdef __APPLE__
  49. named_semaphore num_stored, num_space;
  50. #else
  51. interprocess_semaphore num_stored, num_space;
  52. #endif
  53. //Items to fill
  54. boost::queue<T> data;
  55. };
  56. template <typename T>
  57. shared_queue<T>::shared_queue(const std::string& name, unsigned int max_size):
  58. #ifdef __APPLE__
  59. num_stored(open_or_create, string(name + "_num_stored").c_str(), 0, permissions(777)),
  60. num_space(open_or_create, string(name + "_num_space").c_str(), max_size, permissions(777)),
  61. id(name)
  62. #else
  63. num_stored(0), num_space(max_size)
  64. #endif
  65. {
  66. #ifdef __APPLE__
  67. /*reset semaphore*/
  68. while (num_stored.try_wait()){}
  69. if (num_space.try_wait() == false) {
  70. for (unsigned int i = 0 ; i < max_size ; i++) {
  71. num_space.post();
  72. }
  73. }
  74. else {
  75. num_space.post();
  76. }
  77. #endif
  78. }
  79. // template <typename T>
  80. // shared_queue<T>::shared_queue(const std::string& name):
  81. // shared_queue(name, SEM_VALUE_MAX)
  82. // {}
  83. template <typename T>
  84. void shared_queue<T>::push(T item){
  85. num_space.wait();
  86. mutex.lock();
  87. data.push(item);
  88. mutex.unlock();
  89. num_stored.post();
  90. }
  91. template <typename T>
  92. T shared_queue<T>::front(){
  93. T pt;
  94. num_stored.wait();
  95. mutex.lock();
  96. pt = data.front();
  97. mutex.unlock();
  98. num_stored.post();
  99. return pt;
  100. }
  101. template <typename T>
  102. T shared_queue<T>::pop_front(){
  103. T pt;
  104. num_stored.wait();
  105. mutex.lock();
  106. pt = data.front();
  107. data.pop();
  108. mutex.unlock();
  109. num_space.post();
  110. return pt;
  111. }
  112. template <typename T>
  113. void shared_queue<T>::pop(){
  114. num_stored.wait();
  115. mutex.lock();
  116. data.pop();
  117. mutex.unlock();
  118. num_space.post();
  119. }
  120. template <typename T>
  121. bool shared_queue<T>::empty(){
  122. bool b;
  123. mutex.lock();
  124. b = data.empty();
  125. mutex.unlock();
  126. return b;
  127. }
  128. template <typename T>
  129. unsigned int shared_queue<T>::size(){
  130. int s;
  131. mutex.lock();
  132. s = data.size();
  133. mutex.unlock();
  134. return s;
  135. }
  136. template <typename T>
  137. shared_queue<T>::~shared_queue(){
  138. #ifdef __APPLE__
  139. boost::interprocess::named_semaphore::remove(string(id + "_mutex").c_str());
  140. boost::interprocess::named_semaphore::remove(string(id + "_num_stored").c_str());
  141. boost::interprocess::named_semaphore::remove(string(id + "_num_space").c_str());
  142. #endif
  143. }
  144. #endif