// NOTE(review): the targets of the first three #include lines, and every
// `<...>` template-argument list in this copy, are missing. From usage the
// file needs at least std::deque, std::map, std::vector, std::optional,
// std::pair, printf, assert and pthreads. Restore the exact headers from
// version control rather than guessing.
#include
#include
#include
#include "sort.hpp"

// A set of precomputed WaksmanNetworks of a given size.
// `wns` is a pool guarded by `mutex`; shuffle_mtobliv takes a network from
// the front of the pool and removes it.
// NOTE(review): the deque's element type is missing from this copy
// (presumably WaksmanNetwork, per the comment above -- confirm).
struct SizedWNs {
    pthread_mutex_t mutex;
    std::deque wns;
    // Mutex uses default attributes; nothing here ever destroys it.
    SizedWNs() { pthread_mutex_init(&mutex, NULL); }
};

// A (mutexed) map mapping sizes to SizedWNs.
// `mutex` guards the map itself (iteration/insertion); each SizedWNs entry
// additionally has its own mutex guarding its pool.
// NOTE(review): template arguments missing from this copy -- confirm.
struct PrecompWNs {
    pthread_mutex_t mutex;
    std::map sized_wns;
    PrecompWNs() { pthread_mutex_init(&mutex, NULL); }
};

static PrecompWNs precomp_wns;

// A (mutexed) vector of sizes we've used since we were last asked.
// shuffle_mtobliv appends the size of every network it consumes;
// sort_get_used hands the accumulated list to the caller.
struct UsedSizes {
    pthread_mutex_t mutex;
    std::vector used;
    UsedSizes() { pthread_mutex_init(&mutex, NULL); }
};

static UsedSizes used_sizes;

// A (mutexed) map mapping (N, nthreads) pairs to WNEvalPlans.
// NOTE(review): the key's template arguments are missing from this copy
// (`std::map,WNEvalPlan>`); presumably std::pair of a size and a
// threadid_t -- confirm.
struct EvalPlans {
    pthread_mutex_t mutex;
    std::map,WNEvalPlan> eval_plans;
    EvalPlans() { pthread_mutex_init(&mutex, NULL); }
};

static EvalPlans precomp_eps;

// Precompute work for size N. The visible part allocates a scratch array
// of N uint32_t; the rest of the body, and the meaning of the returned
// size_t, are not visible in this copy.
size_t sort_precompute(uint32_t N)
{
    uint32_t *random_permutation = NULL;
    // Plain `new` reports failure by throwing std::bad_alloc, not by
    // returning NULL, hence the try/catch.
    try {
        random_permutation = new uint32_t[N];
    } catch (std::bad_alloc&) {
        printf("Allocating memory failed in sort_precompute\n");
        // NOTE(review): assert(false) is compiled out under NDEBUG, so a
        // failed allocation would continue with random_permutation == NULL.
        assert(false);
    }
    // NOTE(review): text is missing from this copy starting inside the loop
    // condition below. `for (uint32_t i=0;i idx = {N, nthreads};` is not
    // well-formed, and it uses `nthreads`, which is not a parameter of
    // sort_precompute. The rest of sort_precompute and the header of a
    // following function (taking at least N and nthreads) have been lost.
    // Restore from version control; do not reconstruct by hand.
    for (uint32_t i=0;i idx = {N, nthreads};
    // Register a WNEvalPlan for (N, nthreads) unless one already exists; the
    // plan is constructed in place from (N, nthreads) while holding
    // precomp_eps.mutex.
    // NOTE(review): try_emplace already leaves an existing entry untouched,
    // so the count() check is a redundant second lookup. Also, if anything
    // between lock and unlock throws (e.g. the WNEvalPlan constructor, if it
    // can), the mutex stays locked; an RAII guard would prevent that.
    pthread_mutex_lock(&precomp_eps.mutex);
    if (!precomp_eps.eval_plans.count(idx)) {
        precomp_eps.eval_plans.try_emplace(idx, N, nthreads);
    }
    pthread_mutex_unlock(&precomp_eps.mutex);
}

// Shuffle Nr items at the beginning of an allocated array of Na items
// using up to nthreads threads. The items to shuffle are byte arrays
// of size msg_size. Return Nw, the size of the Waksman network we
// used, which must satisfy Nr <= Nw <= Na.
// Each call consumes one precomputed WaksmanNetwork of size Nw, and a
// WNEvalPlan for (Nw, nthreads) must already have been precomputed.
// Implementation notes for shuffle_mtobliv:
//  - Lock nesting used here: precomp_wns.mutex -> a SizedWNs::mutex ->
//    used_sizes.mutex. Code that takes more than one of these should
//    follow the same order to avoid deadlock.
//  - The chosen network is removed from its pool, so each precomputed
//    network is used by at most one call.
uint32_t shuffle_mtobliv(threadid_t nthreads, uint8_t* items,
    uint16_t msg_size, uint32_t Nr, uint32_t Na)
{
    // Find the smallest Nw for which we have a precomputed
    // WaksmanNetwork with Nr <= Nw <= Na
    pthread_mutex_lock(&precomp_wns.mutex);
    // NOTE(review): template argument missing from this copy (presumably
    // the network type stored in SizedWNs::wns -- confirm).
    std::optional wn;
    uint32_t Nw = 0;
    // Assuming sized_wns uses std::map's default ascending comparator (its
    // template arguments are not visible here), the first usable entry
    // found is the smallest suitable Nw.
    for (auto& N : precomp_wns.sized_wns) {
        if (N.first > Na) {
            // Under the ascending-order assumption every remaining key is
            // larger too, so nothing in [Nr, Na] is left.
            printf("No precomputed WaksmanNetworks of size at most %u\n",
                Na);
            assert(false);
            // NOTE(review): there is no `break` here. With NDEBUG the assert
            // vanishes and the loop keeps going, so it can select a network
            // with Nw > Na. The padding/shuffle below would then touch items
            // beyond the Na allocated. Adding `break;` would route this case
            // to the "not found" handling below.
        }
        if (N.first < Nr) {
            continue;
        }
        // We're in the right range, but see if we have an actual
        // precomputed WaksmanNetwork
        pthread_mutex_lock(&N.second.mutex);
        if (N.second.wns.size() == 0) {
            pthread_mutex_unlock(&N.second.mutex);
            continue;
        }
        // Record this size so the next sort_get_used() call reports it.
        pthread_mutex_lock(&used_sizes.mutex);
        used_sizes.used.push_back(N.first);
        pthread_mutex_unlock(&used_sizes.mutex);
        // Take the network at the front of the pool and remove it.
        wn = std::move(N.second.wns.front());
        N.second.wns.pop_front();
        Nw = N.first;
        pthread_mutex_unlock(&N.second.mutex);
        break;
    }
    pthread_mutex_unlock(&precomp_wns.mutex);

    if (!wn) {
        printf("No precomputed WaksmanNetwork of size range [%u,%u] found.\n",
            Nr, Na);
        assert(wn);
        // NOTE(review): under NDEBUG execution continues here with an empty
        // optional and Nw == 0; `wn` is presumably dereferenced further down
        // (that code is not fully visible). An explicit abort/return would
        // be safer.
    }

    // Look up the evaluation plan for (Nw, nthreads); it must have been
    // registered beforehand.
    std::pair epidx = {Nw, nthreads};
    pthread_mutex_lock(&precomp_eps.mutex);
    if (!precomp_eps.eval_plans.count(epidx)) {
        // NOTE(review): `%hu` assumes threadid_t is unsigned short --
        // confirm against its definition.
        printf("No precomputed WNEvalPlan with N=%u, nthreads=%hu\n",
            Nw, nthreads);
        assert(false);
    }
    // NOTE(review): under NDEBUG a missing plan makes .at() throw
    // std::out_of_range while precomp_eps.mutex is still locked, so the
    // mutex is never released. Unlock (or use an RAII guard) before failing.
    // Keeping a reference after unlocking is fine for std::map: insertions
    // do not invalidate references to existing elements, and nothing in
    // this copy erases from eval_plans.
    const WNEvalPlan &eval_plan = precomp_eps.eval_plans.at(epidx);
    pthread_mutex_unlock(&precomp_eps.mutex);

    // Mark Nw-Nr items as padding (Nr, Na, and Nw are _not_ private)
    // NOTE(review): text is missing from this copy between the loop
    // condition `i` and `( items, msg_size, eval_plan)`. The padding loop
    // body and the start of the call that applies the network (presumably
    // via `wn`, using `eval_plan`) are not shown. Restore from version
    // control.
    for (uint32_t i=Nr; i( items, msg_size, eval_plan);
    return Nw;
}

// Return the sizes recorded by shuffle_mtobliv since the previous call and
// hand ownership of that list to the caller.
// NOTE(review): the vector element type is missing from this copy
// (presumably the same type as the sizes pushed by shuffle_mtobliv --
// confirm).
std::vector sort_get_used()
{
    std::vector res;
    pthread_mutex_lock(&used_sizes.mutex);
    // NOTE(review): this relies on `used` being empty after being moved
    // from. Move assignment leaves the source valid but unspecified in
    // general. `res.swap(used_sizes.used)` or
    // `std::exchange(used_sizes.used, {})` would make the reset explicit.
    res = std::move(used_sizes.used);
    pthread_mutex_unlock(&used_sizes.mutex);
    return res;
}