123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
#include <pthread.h>

#include <cstring>
#include <deque>
#include <map>

#include "sort.hpp"
- // A set of precomputed WaksmanNetworks of a given size
- struct SizedWNs {
- pthread_mutex_t mutex;
- std::deque<WaksmanNetwork> wns;
- SizedWNs() { pthread_mutex_init(&mutex, NULL); }
- };
- // A (mutexed) map mapping sizes to SizedWNs
- struct PrecompWNs {
- pthread_mutex_t mutex;
- std::map<uint32_t,SizedWNs> sized_wns;
- PrecompWNs() { pthread_mutex_init(&mutex, NULL); }
- };
- static PrecompWNs precomp_wns;
// The list of network sizes that have been consumed since the list was
// last collected, together with the mutex taken around accesses to it.
struct UsedSizes {
    pthread_mutex_t mutex;
    std::vector<uint32_t> used;

    // Set up the mutex with default attributes; the list starts empty.
    UsedSizes()
    {
        pthread_mutex_init(&mutex, nullptr);
    }
};

// The file-local record of used sizes
static UsedSizes used_sizes;
- // A (mutexed) map mapping (N, nthreads) pairs to WNEvalPlans
- struct EvalPlans {
- pthread_mutex_t mutex;
- std::map<std::pair<uint32_t,threadid_t>,WNEvalPlan> eval_plans;
- EvalPlans() { pthread_mutex_init(&mutex, NULL); }
- };
- static EvalPlans precomp_eps;
- size_t sort_precompute(uint32_t N)
- {
- uint32_t *random_permutation = NULL;
- try {
- random_permutation = new uint32_t[N];
- } catch (std::bad_alloc&) {
- printf("Allocating memory failed in sort_precompute\n");
- assert(false);
- }
- for (uint32_t i=0;i<N;++i) {
- random_permutation[i] = i;
- }
- RecursiveShuffle_M2((unsigned char *) random_permutation, N, sizeof(uint32_t));
- WaksmanNetwork wnet(N);
- wnet.setPermutation(random_permutation);
- delete[] random_permutation;
- // Note that sized_wns[N] creates a map entry for N if it doesn't yet exist
- pthread_mutex_lock(&precomp_wns.mutex);
- SizedWNs& szwn = precomp_wns.sized_wns[N];
- pthread_mutex_unlock(&precomp_wns.mutex);
- pthread_mutex_lock(&szwn.mutex);
- szwn.wns.push_back(std::move(wnet));
- size_t ret = szwn.wns.size();
- pthread_mutex_unlock(&szwn.mutex);
- return ret;
- }
- void sort_precompute_evalplan(uint32_t N, threadid_t nthreads)
- {
- std::pair<uint32_t,threadid_t> idx = {N, nthreads};
- pthread_mutex_lock(&precomp_eps.mutex);
- if (!precomp_eps.eval_plans.count(idx)) {
- precomp_eps.eval_plans.try_emplace(idx, N, nthreads);
- }
- pthread_mutex_unlock(&precomp_eps.mutex);
- }
- // Shuffle Nr items at the beginning of an allocated array of Na items
- // using up to nthreads threads. The items to shuffle are byte arrays
- // of size msg_size. Return Nw, the size of the Waksman network we
- // used, which must satisfy Nr <= Nw <= Na.
- uint32_t shuffle_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
- uint32_t Nr, uint32_t Na)
- {
- // Find the smallest Nw for which we have a precomputed
- // WaksmanNetwork with Nr <= Nw <= Na
- pthread_mutex_lock(&precomp_wns.mutex);
- std::optional<WaksmanNetwork> wn;
- uint32_t Nw = 0;
- for (auto& N : precomp_wns.sized_wns) {
- if (N.first > Na) {
- printf("No precomputed WaksmanNetworks of size at most %u\n", Na);
- assert(false);
- }
- // We're actually going to only use WaksmanNetworks of size
- // exactly Na. Otherwise we might use a smaller WN than we
- // were asked for, but we might need that one later.
- if (N.first < Na /*Nr*/) {
- continue;
- }
- // We're in the right range, but see if we have an actual
- // precomputed WaksmanNetwork
- pthread_mutex_lock(&N.second.mutex);
- if (N.second.wns.size() == 0) {
- pthread_mutex_unlock(&N.second.mutex);
- continue;
- }
- pthread_mutex_lock(&used_sizes.mutex);
- used_sizes.used.push_back(N.first);
- pthread_mutex_unlock(&used_sizes.mutex);
- wn = std::move(N.second.wns.front());
- N.second.wns.pop_front();
- Nw = N.first;
- pthread_mutex_unlock(&N.second.mutex);
- break;
- }
- pthread_mutex_unlock(&precomp_wns.mutex);
- if (!wn) {
- printf("No precomputed WaksmanNetwork of size range [%u,%u] found.\n",
- Nr, Na);
- assert(wn);
- }
- std::pair<uint32_t,threadid_t> epidx = {Nw, nthreads};
- pthread_mutex_lock(&precomp_eps.mutex);
- if (!precomp_eps.eval_plans.count(epidx)) {
- printf("No precomputed WNEvalPlan with N=%u, nthreads=%hu\n",
- Nw, nthreads);
- assert(false);
- }
- const WNEvalPlan &eval_plan = precomp_eps.eval_plans.at(epidx);
- pthread_mutex_unlock(&precomp_eps.mutex);
- // Mark Nw-Nr items as padding (Nr, Na, and Nw are _not_ private)
- for (uint32_t i=Nr; i<Nw; ++i) {
- (*(uint32_t*)(items+msg_size*i)) = uint32_t(-1);
- }
- // Shuffle Nw items
- wn.value().applyInversePermutation<OSWAP_16X>(
- items, msg_size, eval_plan);
- return Nw;
- }
- std::vector<uint32_t> sort_get_used()
- {
- std::vector<uint32_t> res;
- pthread_mutex_lock(&used_sizes.mutex);
- res = std::move(used_sizes.used);
- pthread_mutex_unlock(&used_sizes.mutex);
- return res;
- }
|