|
@@ -64,9 +64,12 @@ int compare_keys<NidPriorityKey>(const void* a, const void* b)
|
|
return oselect_uint32_t(-1, 1, alarge);
|
|
return oselect_uint32_t(-1, 1, alarge);
|
|
}
|
|
}
|
|
|
|
|
|
-// Perform the sort using up to nthreads threads. The items to sort are
|
|
|
|
-// byte arrays of size msg_size. The key is the 10-bit storage server
|
|
|
|
-// id concatenated with the 22-bit uid at the storage server.
|
|
|
|
|
|
+// Sort Nr items at the beginning of an allocated array of Na items
|
|
|
|
+// using up to nthreads threads. The items to sort are byte arrays of
|
|
|
|
+// size msg_size. The keys are of type T. T must have set_key<T> and
|
|
|
|
+// compare_keys<T> defined. The items will be shuffled in-place, and a
|
|
|
|
+// sorted array of keys will be passed to the provided callback
|
|
|
|
+// function.
|
|
template<typename T>
|
|
template<typename T>
|
|
void sort_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
|
|
void sort_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
|
|
uint32_t Nr, uint32_t Na,
|
|
uint32_t Nr, uint32_t Na,
|
|
@@ -102,3 +105,86 @@ void sort_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
|
|
delete[] backingidx;
|
|
delete[] backingidx;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Arguments for one move_sorted<T> worker.  The field order matters:
// callers fill this struct with positional aggregate initialization.
template <typename T>
struct move_sorted_args {
    // Keys in sorted order; sorted_keys[i].index() names the source item
    // that belongs in destination slot i
    const T* sorted_keys;
    // Source array of msg_size-byte items (only read)
    const uint8_t *items;
    // Destination array of msg_size-byte slots (written)
    uint8_t *destbuf;
    // This worker fills destination slots start, ..., start+num-1
    uint32_t start, num;
    // Size in bytes of each item
    uint16_t msg_size;
};

// Worker routine with a thread-entry-style signature: voidargs must
// point to a move_sorted_args<T>.  For each i in [start, start+num),
// copy the msg_size-byte item at position sorted_keys[i].index() of
// items into slot i of destbuf.  Always returns nullptr.
template <typename T>
static void *move_sorted(void *voidargs)
{
    const move_sorted_args<T> *args =
        static_cast<const move_sorted_args<T> *>(voidargs);
    // Widen to size_t so the byte offsets below cannot wrap around in
    // 32-bit arithmetic when the buffers are 4 GiB or larger
    const size_t msg_size = args->msg_size;
    const uint32_t start = args->start;
    const uint32_t end = start + args->num;
    const T *sorted_keys = args->sorted_keys;
    const uint8_t *items = args->items;
    uint8_t *destbuf = args->destbuf;
    for (uint32_t i=start; i<end; ++i) {
        const size_t src = static_cast<size_t>(sorted_keys[i].index());
        memmove(destbuf + static_cast<size_t>(i) * msg_size,
            items + src * msg_size,
            msg_size);
    }
    // The function is declared to return void*; falling off the end
    // without a return statement would be undefined behaviour
    return nullptr;
}
|
|
|
|
+
|
|
|
|
+// As above, but the first Nr msg_size-byte entries in the items array
|
|
|
|
+// will end up with the sorted values. Note: entries beyond Nr may also
|
|
|
|
+// change, but you should not even look at those values! This calls the
|
|
|
|
+// above function, and then copies the data in sorted order into a
|
|
|
|
+// temporary buffer, then copies that buffer back into the items array,
|
|
|
|
+// so it's less efficient, both in memory and CPU, than the above
|
|
|
|
+// function.
|
|
|
|
+template<typename T>
|
|
|
|
+void sort_mtobliv(threadid_t nthreads, uint8_t* items, uint16_t msg_size,
|
|
|
|
+ uint32_t Nr, uint32_t Na)
|
|
|
|
+{
|
|
|
|
+ sort_mtobliv<T>(nthreads, items, msg_size, Nr, Na, [nthreads, msg_size]
|
|
|
|
+ (const uint8_t* origitems, const T* keys, uint32_t Nr) {
|
|
|
|
+ // A temporary buffer into which to copy the items in sorted
|
|
|
|
+ // order
|
|
|
|
+ uint8_t *tempbuf = new uint8_t[Nr * msg_size];
|
|
|
|
+
|
|
|
|
+ // Special-case nthreads=1 for efficiency
|
|
|
|
+ if (nthreads <= 1) {
|
|
|
|
+ move_sorted_args<T> args = {
|
|
|
|
+ keys, origitems, tempbuf, 0, Nr, msg_size
|
|
|
|
+ };
|
|
|
|
+ move_sorted<T>(&args);
|
|
|
|
+ } else {
|
|
|
|
+ move_sorted_args<T> args[nthreads];
|
|
|
|
+ uint32_t inc = Nr / nthreads;
|
|
|
|
+ uint32_t extra = Nr % nthreads;
|
|
|
|
+ uint32_t last = 0;
|
|
|
|
+ for (threadid_t i=0; i<nthreads; ++i) {
|
|
|
|
+ uint32_t num = inc + (i < extra);
|
|
|
|
+ args[i] = {
|
|
|
|
+ keys, origitems, tempbuf, last, num, msg_size
|
|
|
|
+ };
|
|
|
|
+ last += num;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Launch all but the first section into other threads
|
|
|
|
+ for (threadid_t i=1; i<nthreads; ++i) {
|
|
|
|
+ threadpool_dispatch(g_thread_id+i,
|
|
|
|
+ move_sorted<T>, args+i);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Do the first section ourselves
|
|
|
|
+ move_sorted<T>(args);
|
|
|
|
+
|
|
|
|
+ // Join the threads
|
|
|
|
+ for (threadid_t i=1; i<nthreads; ++i) {
|
|
|
|
+ threadpool_join(g_thread_id+i, NULL);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Copy the temporary buffer back to items
|
|
|
|
+ memmove(origitems, tempbuf, Nr * msg_size);
|
|
|
|
+ delete[] tempbuf;
|
|
|
|
+ });
|
|
|
|
+}
|