[gdbsupport] Add task size parameter in parallel_for_each
authorTom de Vries <tdevries@suse.de>
Fri, 5 Aug 2022 14:12:56 +0000 (16:12 +0200)
committerTom de Vries <tdevries@suse.de>
Fri, 5 Aug 2022 14:12:56 +0000 (16:12 +0200)
Add a task_size parameter to parallel_for_each, defaulting to nullptr, and use
the task size to distribute similarly-sized chunks to the threads.

Tested on x86_64-linux.

gdb/unittests/parallel-for-selftests.c
gdbsupport/parallel-for.h

index 8a86b435fd347e7039767585676a2960bd84b9e5..6e341f64037f19af13cfb490ac5a0a0325a3db60 100644 (file)
@@ -68,6 +68,34 @@ test (int n_threads)
                          });
   SELF_CHECK (counter == 0);
 
+  auto task_size_max_ = [] (int iter)
+    {
+      return (size_t)SIZE_MAX;
+    };
+  auto task_size_max = gdb::make_function_view (task_size_max_);
+
+  counter = 0;
+  gdb::parallel_for_each (1, 0, NUMBER,
+                         [&] (int start, int end)
+                         {
+                           counter += end - start;
+                         }, task_size_max);
+  SELF_CHECK (counter == NUMBER);
+
+  auto task_size_one_ = [] (int iter)
+    {
+      return (size_t)1;
+    };
+  auto task_size_one = gdb::make_function_view (task_size_one_);
+
+  counter = 0;
+  gdb::parallel_for_each (1, 0, NUMBER,
+                         [&] (int start, int end)
+                         {
+                           counter += end - start;
+                         }, task_size_one);
+  SELF_CHECK (counter == NUMBER);
+
 #undef NUMBER
 }
 
index 0037ee23ff32256b172c4c114e32e5f330eedd99..4cd1dbf847e9e7040b1d3fd9999c89b8799cce0c 100644 (file)
@@ -23,6 +23,7 @@
 #include <algorithm>
 #include <type_traits>
 #include "gdbsupport/thread-pool.h"
+#include "gdbsupport/function-view.h"
 
 namespace gdb
 {
@@ -134,7 +135,8 @@ typename gdb::detail::par_for_accumulator<
     typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type
   >::result_type
 parallel_for_each (unsigned n, RandomIt first, RandomIt last,
-                  RangeFunction callback)
+                  RangeFunction callback,
+                  gdb::function_view<size_t(RandomIt)> task_size = nullptr)
 {
   using result_type
     = typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type;
@@ -148,17 +150,41 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
   size_t n_elements = last - first;
   size_t elts_per_thread = 0;
   size_t elts_left_over = 0;
+  size_t total_size = 0;
+  size_t size_per_thread = 0;
+  size_t max_element_size = n_elements == 0 ? 1 : SIZE_MAX / n_elements;
 
   if (n_threads > 1)
     {
-      /* Require that there should be at least N elements in a
-        thread.  */
-      gdb_assert (n > 0);
-      if (n_elements / n_threads < n)
-       n_threads = std::max (n_elements / n, (size_t) 1);
-      elts_per_thread = n_elements / n_threads;
-      elts_left_over = n_elements % n_threads;
-      /* n_elements == n_threads * elts_per_thread + elts_left_over. */
+      if (task_size != nullptr)
+       {
+         gdb_assert (n == 1);
+         for (RandomIt i = first; i != last; ++i)
+           {
+             size_t element_size = task_size (i);
+             gdb_assert (element_size > 0);
+             if (element_size > max_element_size)
+               /* We could start scaling here, but that doesn't seem to be
+                  worth the effort.  */
+               element_size = max_element_size;
+             size_t prev_total_size = total_size;
+             total_size += element_size;
+             /* Check for overflow.  */
+             gdb_assert (prev_total_size < total_size);
+           }
+         size_per_thread = total_size / n_threads;
+       }
+      else
+       {
+         /* Require that there should be at least N elements in a
+            thread.  */
+         gdb_assert (n > 0);
+         if (n_elements / n_threads < n)
+           n_threads = std::max (n_elements / n, (size_t) 1);
+         elts_per_thread = n_elements / n_threads;
+         elts_left_over = n_elements % n_threads;
+         /* n_elements == n_threads * elts_per_thread + elts_left_over. */
+       }
     }
 
   size_t count = n_threads == 0 ? 0 : n_threads - 1;
@@ -167,20 +193,52 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
   if (parallel_for_each_debug)
     {
       debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
-      debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
-      debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
+      if (task_size != nullptr)
+       {
+         debug_printf (_("Parallel for: total_size: %zu\n"), total_size);
+         debug_printf (_("Parallel for: size_per_thread: %zu\n"), size_per_thread);
+       }
+      else
+       {
+         debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
+         debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
+       }
     }
 
+  size_t remaining_size = total_size;
   for (int i = 0; i < count; ++i)
     {
-      RandomIt end = first + elts_per_thread;
-      if (i < elts_left_over)
-       /* Distribute the leftovers over the worker threads, to avoid having
-          to handle all of them in a single thread.  */
-       end++;
+      RandomIt end;
+      size_t chunk_size = 0;
+      if (task_size == nullptr)
+       {
+         end = first + elts_per_thread;
+         if (i < elts_left_over)
+           /* Distribute the leftovers over the worker threads, to avoid having
+              to handle all of them in a single thread.  */
+           end++;
+       }
+      else
+       {
+         RandomIt j;
+         for (j = first; j < last && chunk_size < size_per_thread; ++j)
+           {
+             size_t element_size = task_size (j);
+             if (element_size > max_element_size)
+               element_size = max_element_size;
+             chunk_size += element_size;
+           }
+         end = j;
+         remaining_size -= chunk_size;
+       }
       if (parallel_for_each_debug)
-       debug_printf (_("Parallel for: elements on worker thread %i\t: %zu\n"),
-                     i, (size_t)(end - first));
+       {
+         debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
+                       i, (size_t)(end - first));
+         if (task_size != nullptr)
+           debug_printf (_("\t(size: %zu)"), chunk_size);
+         debug_printf (_("\n"));
+       }
       results.post (i, [=] ()
         {
          return callback (first, end);
@@ -190,12 +248,22 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
 
   for (int i = count; i < n_worker_threads; ++i)
     if (parallel_for_each_debug)
-      debug_printf (_("Parallel for: elements on worker thread %i\t: 0\n"), i);
+      {
+       debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
+       if (task_size != nullptr)
+         debug_printf (_("\t(size: 0)"));
+       debug_printf (_("\n"));
+      }
 
   /* Process all the remaining elements in the main thread.  */
   if (parallel_for_each_debug)
-    debug_printf (_("Parallel for: elements on main thread\t\t: %zu\n"),
-                 (size_t)(last - first));
+    {
+      debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
+                   (size_t)(last - first));
+      if (task_size != nullptr)
+       debug_printf (_("\t(size: %zu)"), remaining_size);
+      debug_printf (_("\n"));
+    }
   return results.finish ([=] ()
     {
       return callback (first, last);
@@ -211,7 +279,8 @@ typename gdb::detail::par_for_accumulator<
     typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type
   >::result_type
 sequential_for_each (unsigned n, RandomIt first, RandomIt last,
-                  RangeFunction callback)
+                    RangeFunction callback,
+                    gdb::function_view<size_t(RandomIt)> task_size = nullptr)
 {
   using result_type
     = typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type;