sort.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #include <map>
  2. #include <deque>
  3. #include <pthread.h>
  4. #include "sort.hpp"
  5. // A set of precomputed WaksmanNetworks of a given size
  6. struct SizedWNs {
  7. pthread_mutex_t mutex;
  8. std::deque<WaksmanNetwork> wns;
  9. SizedWNs() { pthread_mutex_init(&mutex, NULL); }
  10. };
  11. // A (mutexed) map mapping sizes to SizedWNs
  12. struct PrecompWNs {
  13. pthread_mutex_t mutex;
  14. std::map<uint32_t,SizedWNs> sized_wns;
  15. PrecompWNs() { pthread_mutex_init(&mutex, NULL); }
  16. };
  17. static PrecompWNs precomp_wns;
  // Mutex-protected record of the WaksmanNetwork sizes that have been
  // consumed since the record was last collected.
  struct UsedSizes {
      pthread_mutex_t mutex;
      std::vector<uint32_t> used;

      // Start with an empty record and a default-attribute mutex.
      UsedSizes()
      {
          pthread_mutex_init(&mutex, nullptr);
      }
  };

  // The single file-local record of consumed sizes.
  static UsedSizes used_sizes;
  25. // A (mutexed) map mapping (N, nthreads) pairs to WNEvalPlans
  26. struct EvalPlans {
  27. pthread_mutex_t mutex;
  28. std::map<std::pair<uint32_t,threadid_t>,WNEvalPlan> eval_plans;
  29. EvalPlans() { pthread_mutex_init(&mutex, NULL); }
  30. };
  31. static EvalPlans precomp_eps;
  32. size_t sort_precompute(uint32_t N)
  33. {
  34. uint32_t *random_permutation = NULL;
  35. try {
  36. random_permutation = new uint32_t[N];
  37. } catch (std::bad_alloc&) {
  38. printf("Allocating memory failed in sort_precompute\n");
  39. assert(false);
  40. }
  41. for (uint32_t i=0;i<N;++i) {
  42. random_permutation[i] = i;
  43. }
  44. RecursiveShuffle_M2((unsigned char *) random_permutation, N, sizeof(uint32_t));
  45. WaksmanNetwork wnet(N);
  46. wnet.setPermutation(random_permutation);
  47. delete[] random_permutation;
  48. // Note that sized_wns[N] creates a map entry for N if it doesn't yet exist
  49. pthread_mutex_lock(&precomp_wns.mutex);
  50. SizedWNs& szwn = precomp_wns.sized_wns[N];
  51. pthread_mutex_unlock(&precomp_wns.mutex);
  52. pthread_mutex_lock(&szwn.mutex);
  53. szwn.wns.push_back(std::move(wnet));
  54. size_t ret = szwn.wns.size();
  55. pthread_mutex_unlock(&szwn.mutex);
  56. return ret;
  57. }
  58. void sort_precompute_evalplan(uint32_t N, threadid_t nthreads)
  59. {
  60. std::pair<uint32_t,threadid_t> idx = {N, nthreads};
  61. pthread_mutex_lock(&precomp_eps.mutex);
  62. if (!precomp_eps.eval_plans.count(idx)) {
  63. precomp_eps.eval_plans.try_emplace(idx, N, nthreads);
  64. }
  65. pthread_mutex_unlock(&precomp_eps.mutex);
  66. }
  67. // Shuffle Nr items at the beginning of an allocated array of Na items
  68. // using up to nthreads threads. The items to shuffle are byte arrays
  69. // of size msg_size. Return Nw, the size of the Waksman network we
  70. // used, which must satisfy Nr <= Nw <= Na.
  71. uint32_t shuffle_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
  72. uint32_t Nr, uint32_t Na)
  73. {
  74. // Find the smallest Nw for which we have a precomputed
  75. // WaksmanNetwork with Nr <= Nw <= Na
  76. pthread_mutex_lock(&precomp_wns.mutex);
  77. std::optional<WaksmanNetwork> wn;
  78. uint32_t Nw = 0;
  79. for (auto& N : precomp_wns.sized_wns) {
  80. if (N.first > Na) {
  81. printf("No precomputed WaksmanNetworks of size at most %u\n", Na);
  82. assert(false);
  83. }
  84. // We're actually going to only use WaksmanNetworks of size
  85. // exactly Na. Otherwise we might use a smaller WN than we
  86. // were asked for, but we might need that one later.
  87. if (N.first < Na /*Nr*/) {
  88. continue;
  89. }
  90. // We're in the right range, but see if we have an actual
  91. // precomputed WaksmanNetwork
  92. pthread_mutex_lock(&N.second.mutex);
  93. if (N.second.wns.size() == 0) {
  94. pthread_mutex_unlock(&N.second.mutex);
  95. continue;
  96. }
  97. pthread_mutex_lock(&used_sizes.mutex);
  98. used_sizes.used.push_back(N.first);
  99. pthread_mutex_unlock(&used_sizes.mutex);
  100. wn = std::move(N.second.wns.front());
  101. N.second.wns.pop_front();
  102. Nw = N.first;
  103. pthread_mutex_unlock(&N.second.mutex);
  104. break;
  105. }
  106. pthread_mutex_unlock(&precomp_wns.mutex);
  107. if (!wn) {
  108. printf("No precomputed WaksmanNetwork of size range [%u,%u] found.\n",
  109. Nr, Na);
  110. assert(wn);
  111. }
  112. std::pair<uint32_t,threadid_t> epidx = {Nw, nthreads};
  113. pthread_mutex_lock(&precomp_eps.mutex);
  114. if (!precomp_eps.eval_plans.count(epidx)) {
  115. printf("No precomputed WNEvalPlan with N=%u, nthreads=%hu\n",
  116. Nw, nthreads);
  117. assert(false);
  118. }
  119. const WNEvalPlan &eval_plan = precomp_eps.eval_plans.at(epidx);
  120. pthread_mutex_unlock(&precomp_eps.mutex);
  121. // Mark Nw-Nr items as padding (Nr, Na, and Nw are _not_ private)
  122. for (uint32_t i=Nr; i<Nw; ++i) {
  123. (*(uint32_t*)(items+msg_size*i)) = uint32_t(-1);
  124. }
  125. // Shuffle Nw items
  126. wn.value().applyInversePermutation<OSWAP_16X>(
  127. items, msg_size, eval_plan);
  128. return Nw;
  129. }
  130. std::vector<uint32_t> sort_get_used()
  131. {
  132. std::vector<uint32_t> res;
  133. pthread_mutex_lock(&used_sizes.mutex);
  134. res = std::move(used_sizes.used);
  135. pthread_mutex_unlock(&used_sizes.mutex);
  136. return res;
  137. }