libMesh
parallel_sort.C
1 // The libMesh Finite Element Library.
2 // Copyright (C) 2002-2017 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 
19 // System Includes
20 #include <algorithm>
21 #include <iostream>
22 
23 // Local Includes
24 #include "libmesh/libmesh_common.h"
25 #include "libmesh/parallel.h"
26 #include "libmesh/parallel_hilbert.h"
27 #include "libmesh/parallel_sort.h"
28 #include "libmesh/parallel_bin_sorter.h"
29 
30 namespace libMesh
31 {
32 
33 
34 namespace Parallel {
35 
36 // The Constructor sorts the local data using
37 // std::sort(). Therefore, the construction of
38 // a Parallel::Sort object takes O(n log n) time,
39 // where n is the length of _data.
40 template <typename KeyType, typename IdxType>
41 Sort<KeyType,IdxType>::Sort(const Parallel::Communicator & comm_in,
42  std::vector<KeyType> & d) :
43  ParallelObject(comm_in),
44  _n_procs(cast_int<processor_id_type>(comm_in.size())),
45  _proc_id(cast_int<processor_id_type>(comm_in.rank())),
46  _bin_is_sorted(false),
47  _data(d)
48 {
49  std::sort(_data.begin(), _data.end());
50 
51  // Allocate storage
52  _local_bin_sizes.resize(_n_procs);
53 }
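// Note that the constructor takes the caller's vector by reference and sorts
// it in place, so the original ordering of d is not preserved.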
54 
55 
56 
57 template <typename KeyType, typename IdxType>
58 void Sort<KeyType,IdxType>::sort()
59 {
60  // Find the global data size. The sorting
61  // algorithms assume they have a range to
62  // work with, so catch the degenerate cases here
63  IdxType global_data_size = cast_int<IdxType>(_data.size());
64 
65  this->comm().sum (global_data_size);
66 
67  if (global_data_size < 2)
68  {
69  // the entire global range is either empty
70  // or contains only one element
71  _my_bin = _data;
72 
73  this->comm().allgather (static_cast<IdxType>(_my_bin.size()),
74  _local_bin_sizes);
75  }
76  else
77  {
78  if (this->n_processors() > 1)
79  {
80  this->binsort();
81  this->communicate_bins();
82  }
83  else
84  _my_bin = _data;
85 
86  this->sort_local_bin();
87  }
88 
89  // Set sorted flag to true
90  _bin_is_sorted = true;
91 }
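// Taken together, sort() implements a three-phase parallel bin sort: binsort()
// splits the locally sorted keys into one bin per processor based on the global
// key range, communicate_bins() gathers bin i onto processor i, and
// sort_local_bin() sorts the received keys, leaving each processor with a
// sorted bin of the globally distributed data.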
92 
93 
94 
95 template <typename KeyType, typename IdxType>
96 void Sort<KeyType,IdxType>::binsort()
97 {
98  // Find the global min and max from all the
99  // processors.
100  std::vector<KeyType> global_min_max(2);
101 
102  // Insert the local min and max for this processor
103  global_min_max[0] = -_data.front();
104  global_min_max[1] = _data.back();
105 
106  // Communicate to determine the global
107  // min and max for all processors.
108  this->comm().max(global_min_max);
109 
110  // Multiply the min by -1 to obtain the true min
111  global_min_max[0] *= -1;
112 
113  // Bin-Sort based on the global min and max
114  Parallel::BinSorter<KeyType,IdxType> bs(this->comm(), _data);
115  bs.binsort(_n_procs, global_min_max[1], global_min_max[0]);
116 
117  // Now save the local bin sizes in a vector so
118  // we don't have to keep around the BinSorter.
119  for (processor_id_type i=0; i<_n_procs; ++i)
120  _local_bin_sizes[i] = bs.sizeof_bin(i);
121 }
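// A note on the sign trick above: storing -min and then taking a single max()
// reduction over both entries yields (-global_min, global_max) in one
// collective; negating the first entry recovers the true global minimum.  For
// example, local minima of 3 and 5 are stored as -3 and -5, max() returns -3,
// and negation gives the global minimum 3.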
122 
123 
124 
125 #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
126 // Full specialization for HilbertIndices, there is a fair amount of
127 // code duplication here that could potentially be consolidated with the
128 // above method
129 template <>
130 void Sort<Parallel::DofObjectKey,unsigned int>::binsort()
131 {
132  // Find the global min and max from all the
133  // processors. Do this using MPI_Allreduce.
134  Parallel::DofObjectKey
135  local_min, local_max,
136  global_min, global_max;
137 
138  if (_data.empty())
139  {
140 #ifdef LIBMESH_ENABLE_UNIQUE_ID
141  local_min.first.rack0 = local_min.first.rack1 = local_min.first.rack2 = static_cast<Hilbert::inttype>(-1);
142  local_min.second = std::numeric_limits<unique_id_type>::max();
143  local_max.first.rack0 = local_max.first.rack1 = local_max.first.rack2 = 0;
144  local_max.second = 0;
145 #else
146  local_min.rack0 = local_min.rack1 = local_min.rack2 = static_cast<Hilbert::inttype>(-1);
147  local_max.rack0 = local_max.rack1 = local_max.rack2 = 0;
148 #endif
149  }
150  else
151  {
152  local_min = _data.front();
153  local_max = _data.back();
154  }
155 
156  MPI_Op hilbert_max, hilbert_min;
157 
158  MPI_Op_create ((MPI_User_function*)dofobjectkey_max_op, true, &hilbert_max);
159  MPI_Op_create ((MPI_User_function*)dofobjectkey_min_op, true, &hilbert_min);
160 
161  // Communicate to determine the global
162  // min and max for all processors.
163  MPI_Allreduce(&local_min,
164  &global_min,
165  1,
166  Parallel::StandardType<Parallel::DofObjectKey>(&local_min),
167  hilbert_min,
168  this->comm().get());
169 
170  MPI_Allreduce(&local_max,
171  &global_max,
172  1,
173  Parallel::StandardType<Parallel::DofObjectKey>(&local_max),
174  hilbert_max,
175  this->comm().get());
176 
177  MPI_Op_free (&hilbert_max);
178  MPI_Op_free (&hilbert_min);
179 
180  // Bin-Sort based on the global min and max
181  Parallel::BinSorter<Parallel::DofObjectKey> bs(this->comm(), _data);
182  bs.binsort(_n_procs, global_max, global_min);
183 
184  // Now save the local bin sizes in a vector so
185  // we don't have to keep around the BinSorter.
186  for (processor_id_type i=0; i<_n_procs; ++i)
187  _local_bin_sizes[i] = bs.sizeof_bin(i);
188 }
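// In the specialization above the key type is not a built-in MPI datatype, so
// MPI_MIN and MPI_MAX cannot be used directly.  Instead the user-defined
// reductions dofobjectkey_min_op and dofobjectkey_max_op are registered with
// MPI_Op_create, applied in the two MPI_Allreduce calls, and released with
// MPI_Op_free.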
189 
190 #endif // #ifdef LIBMESH_HAVE_LIBHILBERT
191 
192 
193 template <typename KeyType, typename IdxType>
194 void Sort<KeyType,IdxType>::communicate_bins()
195 {
196 #ifdef LIBMESH_HAVE_MPI
197  // Create storage for the global bin sizes. This
198  // is the number of keys which will be held in
199  // each bin over all processors.
200  std::vector<IdxType> global_bin_sizes = _local_bin_sizes;
201 
202  // Sum to find the total number of entries in each bin.
203  this->comm().sum(global_bin_sizes);
204 
205  // Create a vector to temporarily hold the results of MPI_Gatherv
206  // calls. The vector dest may be saved away to _my_bin depending on which
207  // processor is being MPI_Gatherv'd.
208  std::vector<KeyType> dest;
209 
210  IdxType local_offset = 0;
211 
212  for (processor_id_type i=0; i<_n_procs; ++i)
213  {
214  // Vector to receive the total bin size for each
215  // processor. Processor i's bin size will be
216  // held in proc_bin_size[i]
217  std::vector<int> proc_bin_size;
218 
219  // Find the number of contributions coming from each
220  // processor for this bin. Note: allgather combines
221  // the MPI_Gather and MPI_Bcast operations into one.
222  this->comm().allgather(static_cast<int>(_local_bin_sizes[i]),
223  proc_bin_size);
224 
225  // Compute the offsets into my_bin for each processor's
226  // portion of the bin. These are basically partial sums
227  // of the proc_bin_size vector.
228  std::vector<int> displacements(_n_procs);
229  for (processor_id_type j=1; j<_n_procs; ++j)
230  displacements[j] = proc_bin_size[j-1] + displacements[j-1];
231 
232  // Resize the destination buffer
233  dest.resize (global_bin_sizes[i]);
234 
235  // Points to the beginning of the bin to be sent
236  void * sendbuf = (_data.size() > local_offset) ? &_data[local_offset] : libmesh_nullptr;
237 
238  // Enough storage to hold all bin contributions
239  void * recvbuf = (dest.empty()) ? libmesh_nullptr : &dest[0];
240 
241  // If the sendbuf is NULL, make sure we aren't claiming to send something.
242  if (sendbuf == libmesh_nullptr && _local_bin_sizes[i] != 0)
243  libmesh_error_msg("Error: invalid MPI_Gatherv call constructed!");
244 
245  KeyType example;
246 
247  MPI_Gatherv(sendbuf,
248  _local_bin_sizes[i], // How much data is in the bin being sent.
249  Parallel::StandardType<KeyType>(&example), // The data type we are sorting
250  recvbuf,
251  &proc_bin_size[0], // How much is to be received from each processor
252  &displacements[0], // Offsets into the receive buffer
253  Parallel::StandardType<KeyType>(&example), // The data type we are sorting
254  i, // The root process (we do this once for each proc)
255  this->comm().get());
256 
257  // Copy the destination buffer if it
258  // corresponds to the bin for this processor
259  if (i == _proc_id)
260  _my_bin = dest;
261 
262  // Increment the local offset counter
263  local_offset += _local_bin_sizes[i];
264  }
265 #endif // LIBMESH_HAVE_MPI
266 }
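// The displacements computed in the loop above are an exclusive prefix sum of
// proc_bin_size.  As a hypothetical example, if bin i receives 2, 5 and 3 keys
// from processors 0, 1 and 2, then proc_bin_size = {2, 5, 3} and
// displacements = {0, 2, 7}, so each contribution lands in its own contiguous
// slice of dest.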
267 
268 
269 
270 #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
271 // Full specialization for HilbertIndices, there is a fair amount of
272 // code duplication here that could potentially be consolidated with the
273 // above method
274 template <>
275 void Sort<Parallel::DofObjectKey,unsigned int>::communicate_bins()
276 {
277  // Create storage for the global bin sizes. This
278  // is the number of keys which will be held in
279  // each bin over all processors.
280  std::vector<unsigned int> global_bin_sizes(_n_procs);
281 
282  libmesh_assert_equal_to (_local_bin_sizes.size(), global_bin_sizes.size());
283 
284  // Sum to find the total number of entries in each bin.
285  // This is stored in global_bin_sizes. Note, we
286  // explicitly know that we are communicating MPI_UNSIGNED's here.
287  MPI_Allreduce(&_local_bin_sizes[0],
288  &global_bin_sizes[0],
289  _n_procs,
290  MPI_UNSIGNED,
291  MPI_SUM,
292  this->comm().get());
293 
294  // Create a vector to temporarily hold the results of MPI_Gatherv
295  // calls. The vector dest may be saved away to _my_bin depending on which
296  // processor is being MPI_Gatherv'd.
297  std::vector<Parallel::DofObjectKey> dest;
298 
299  unsigned int local_offset = 0;
300 
301  for (unsigned int i=0; i<_n_procs; ++i)
302  {
303  // Vector to receive the total bin size for each
304  // processor. Processor i's bin size will be
305  // held in proc_bin_size[i]
306  std::vector<int> proc_bin_size(_n_procs);
307 
308  // Find the number of contributions coming from each
309  // processor for this bin. Note: Allgather combines
310  // the MPI_Gather and MPI_Bcast operations into one.
311  // Note: Here again we know that we are communicating
312  // MPI_UNSIGNED's so there is no need to check the MPI_traits.
313  MPI_Allgather(&_local_bin_sizes[i], // Source: # of entries on this proc in bin i
314  1, // Number of items to gather
315  MPI_UNSIGNED,
316  &proc_bin_size[0], // Destination: Total # of entries in bin i
317  1,
318  MPI_INT,
319  this->comm().get());
320 
321  // Compute the offsets into my_bin for each processor's
322  // portion of the bin. These are basically partial sums
323  // of the proc_bin_size vector.
324  std::vector<int> displacements(_n_procs);
325  for (unsigned int j=1; j<_n_procs; ++j)
326  displacements[j] = proc_bin_size[j-1] + displacements[j-1];
327 
328  // Resize the destination buffer
329  dest.resize (global_bin_sizes[i]);
330 
331  // Points to the beginning of the bin to be sent
332  void * sendbuf = (_data.size() > local_offset) ? &_data[local_offset] : libmesh_nullptr;
333 
334  // Enough storage to hold all bin contributions
335  void * recvbuf = (dest.empty()) ? libmesh_nullptr : &dest[0];
336 
337  // If the sendbuf is NULL, make sure we aren't claiming to send something.
338  if (sendbuf == libmesh_nullptr && _local_bin_sizes[i] != 0)
339  libmesh_error_msg("Error: invalid MPI_Gatherv call constructed!");
340 
341  Parallel::DofObjectKey example;
342 
343  MPI_Gatherv(sendbuf,
344  _local_bin_sizes[i], // How much data is in the bin being sent.
345  Parallel::StandardType<Parallel::DofObjectKey>(&example), // The data type we are sorting
346  recvbuf,
347  &proc_bin_size[0], // How much is to be received from each processor
348  &displacements[0], // Offsets into the receive buffer
349  Parallel::StandardType<Parallel::DofObjectKey>(&example), // The data type we are sorting
350  i, // The root process (we do this once for each proc)
351  this->comm().get());
352 
353  // Copy the destination buffer if it
354  // corresponds to the bin for this processor
355  if (i == _proc_id)
356  _my_bin = dest;
357 
358  // Increment the local offset counter
359  local_offset += _local_bin_sizes[i];
360  }
361 }
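// This specialization follows the same per-bin MPI_Gatherv pattern as the
// generic communicate_bins() above; the main difference is that the bin-size
// exchanges are issued as raw MPI_Allreduce / MPI_Allgather calls on
// MPI_UNSIGNED rather than through the templated Communicator helpers.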
362 
363 #endif // #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
364 
365 
366 
367 template <typename KeyType, typename IdxType>
368 void Sort<KeyType,IdxType>::sort_local_bin()
369 {
370  std::sort(_my_bin.begin(), _my_bin.end());
371 }
372 
373 
374 
375 template <typename KeyType, typename IdxType>
376 const std::vector<KeyType> & Sort<KeyType,IdxType>::bin()
377 {
378  if (!_bin_is_sorted)
379  {
380  libMesh::out << "Warning! Bin is not yet sorted!" << std::endl;
381  }
382 
383  return _my_bin;
384 }
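// Note that bin() only warns when called before sort(); it still returns
// _my_bin, which has not yet been filled in that case.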
385 
386 }
387 
388 
389 
390 // Explicitly instantiate for int, double
391 template class Parallel::Sort<int, unsigned int>;
392 template class Parallel::Sort<double, unsigned int>;
393 #if defined(LIBMESH_HAVE_LIBHILBERT) && defined(LIBMESH_HAVE_MPI)
394 template class Parallel::Sort<Parallel::DofObjectKey, unsigned int>;
395 #endif
396 
397 } // namespace libMesh
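
As a usage reference, here is a minimal sketch of the calling sequence this file implements: construct a Parallel::Sort over a communicator and this processor's keys, call sort(), then read back the locally held bin. The sketch is not part of parallel_sort.C; the helper name sorted_bin_for_this_proc and the variables comm and local_keys are illustrative placeholders.

#include <vector>

#include "libmesh/parallel.h"
#include "libmesh/parallel_sort.h"

using namespace libMesh;

// Illustrative helper: returns the globally sorted slice of keys owned by
// this processor.  Construction also sorts local_keys in place.
std::vector<int> sorted_bin_for_this_proc (const Parallel::Communicator & comm,
                                           std::vector<int> & local_keys)
{
  Parallel::Sort<int, unsigned int> sorter (comm, local_keys); // local std::sort
  sorter.sort();                                               // bin, communicate, sort bin
  return sorter.bin();                                         // copy of this processor's bin
}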