// NOTE(review): the line below appears to be a collapsed, markup-stripped
// copy of the original source, not compilable code. As written, everything
// after the first "//" on it is a single comment to the compiler. Text that
// sat between '<' and '>' seems to have been removed:
//   - the four #include directives before "sort.hpp" have no targets;
//   - template arguments are missing (e.g. "std::deque wns",
//     "std::map sized_wns", "std::map,WNEvalPlan> eval_plans");
//   - the body of sort_precompute after "for (uint32_t i=0;i" and the
//     header of the following function (whose surviving tail begins with
//     "idx = {N, nthreads};") are gone entirely.
// Restore this span from version control rather than reconstructing it.
//
// What can still be read:
//   SizedWNs    - a deque of precomputed WaksmanNetworks of one size, plus a
//                 pthread_mutex_t meant to guard it.
//   PrecompWNs  - a map from size to SizedWNs, plus a pthread_mutex_t for the
//                 map; one file-static instance, precomp_wns.
//   EvalPlans   - a map from (N, nthreads) pairs to WNEvalPlan, plus a
//                 pthread_mutex_t; one file-static instance, precomp_eps.
//   sort_precompute(N)
//               - allocates random_permutation as an array of N uint32_t,
//                 prints a message and asserts if it is null; the remainder
//                 of its body is missing.
//   (function whose name was lost)
//               - while holding precomp_eps.mutex, constructs a WNEvalPlan
//                 in place with (N, nthreads) via try_emplace, only when no
//                 entry for the (N, nthreads) key exists yet.
//
// NOTE(review): a plain `new uint32_t[N]` throws std::bad_alloc on failure
// rather than returning nullptr, so the null check / assert that follow it
// can never fire. Use new (std::nothrow) if the check is wanted, or drop it.
// NOTE(review): no pthread_mutex_init / PTHREAD_MUTEX_INITIALIZER is visible
// for any of the pthread_mutex_t members; zero-initialization of the statics
// happens to work on glibc but is not guaranteed by POSIX. Confirm these
// mutexes are initialized elsewhere before first use.
#include #include #include #include "sort.hpp" // A set of precomputed WaksmanNetworks of a given size struct SizedWNs { pthread_mutex_t mutex; std::deque wns; }; // A (mutexed) map mapping sizes to SizedWNs struct PrecompWNs { pthread_mutex_t mutex; std::map sized_wns; }; static PrecompWNs precomp_wns; // A (mutexed) map mapping (N, nthreads) pairs to WNEvalPlans struct EvalPlans { pthread_mutex_t mutex; std::map,WNEvalPlan> eval_plans; }; static EvalPlans precomp_eps; void sort_precompute(uint32_t N) { uint32_t *random_permutation = new uint32_t[N]; if (!random_permutation) { printf("Allocating memory failed in sort_precompute\n"); } assert(random_permutation); for (uint32_t i=0;i idx = {N, nthreads}; pthread_mutex_lock(&precomp_eps.mutex); if (!precomp_eps.eval_plans.count(idx)) { precomp_eps.eval_plans.try_emplace(idx, N, nthreads); } pthread_mutex_unlock(&precomp_eps.mutex); } // Perform the sort using up to nthreads threads. The items to sort are // byte arrays of size msg_size. The key is the first 4 bytes of each // item. 
void sort_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size, uint32_t Nr, uint32_t Na, // the arguments to the callback are nthreads, items, the sorted // indices, and the number of non-padding items std::function cb) { // Find the smallest Nw for which we have a precomputed // WaksmanNetwork with Nr <= Nw <= Na pthread_mutex_lock(&precomp_wns.mutex); std::optional wn; uint32_t Nw; for (auto& N : precomp_wns.sized_wns) { if (N.first > Na) { printf("No precomputed WaksmanNetworks of size at most %u\n", Na); assert(false); } if (N.first < Nr) { continue; } // We're in the right range, but see if we have an actual // precomputed WaksmanNetwork pthread_mutex_lock(&N.second.mutex); if (N.second.wns.size() == 0) { pthread_mutex_unlock(&N.second.mutex); continue; } wn = std::move(N.second.wns.front()); N.second.wns.pop_front(); Nw = N.first; pthread_mutex_unlock(&N.second.mutex); break; } pthread_mutex_unlock(&precomp_wns.mutex); if (!wn) { printf("No precomputed WaksmanNetwork of size range [%u,%u] found.\n", Nr, Na); assert(wn); } std::pair epidx = {Nw, nthreads}; pthread_mutex_lock(&precomp_eps.mutex); if (!precomp_eps.eval_plans.count(epidx)) { printf("No precomputed WNEvalPlan with N=%u, nthreads=%hu\n", Nw, nthreads); assert(false); } const WNEvalPlan &eval_plan = precomp_eps.eval_plans.at(epidx); pthread_mutex_unlock(&precomp_eps.mutex); // Mark Nw-Nr items as padding (Nr, Na, and Nw are _not_ private) for (uint32_t i=Nr; i( items, msg_size, eval_plan); // Create the indices uint64_t *idx = new uint64_t[Nr]; uint64_t *nextidx = idx; for (uint32_t i=0; i(idx, Nr, backingidx, nthreads); uint64_t *sortedidx = whichbuf ? backingidx : idx; for (uint32_t i=0; i