Loading...
Searching...
No Matches
UPstreamReduce.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2022-2025 OpenCFD Ltd.
9-------------------------------------------------------------------------------
10License
11 This file is part of OpenFOAM.
12
13 OpenFOAM is free software: you can redistribute it and/or modify it
14 under the terms of the GNU General Public License as published by
15 the Free Software Foundation, either version 3 of the License, or
16 (at your option) any later version.
17
18 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 for more details.
22
23 You should have received a copy of the GNU General Public License
24 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25
26\*---------------------------------------------------------------------------*/
27
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "UPstreamWrapping.H"

#include <algorithm>
#include <cinttypes>
#include <cstring>
#include <memory>
33
34// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
35
36// Special reductions for bool
37
38void Foam::UPstream::reduceAnd(bool& value, const int communicator)
39{
40 PstreamDetail::allReduce(&value, 1, MPI_C_BOOL, MPI_LAND, communicator);
41}
42
43
44void Foam::UPstream::reduceOr(bool& value, const int communicator)
45{
46 PstreamDetail::allReduce(&value, 1, MPI_C_BOOL, MPI_LOR, communicator);
47}
48
49
50void Foam::reduce
51(
52 bool& value,
53 Foam::andOp<bool>,
54 const int tag, /* (unused) */
55 const int communicator
56)
57{
58 UPstream::reduceAnd(value, communicator);
59}
60
61
62void Foam::reduce
63(
64 bool& value,
65 Foam::orOp<bool>,
66 const int tag, /* (unused) */
67 const int communicator
69{
70 UPstream::reduceOr(value, communicator);
71}
72
73// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
74
75static inline bool is_basic_dataType(Foam::UPstream::dataTypes id) noexcept
76{
77 return
78 (
81 );
82}
83
84static inline bool is_reduce_opCode(Foam::UPstream::opCodes id) noexcept
85{
86 return
87 (
90 );
91}
92
93
94namespace
95{
96
97using namespace Foam;
98
99// Local function to print some error information
100void printErrorMessage
101(
102 const void* values,
103 const UPstream::dataTypes datatype_id,
104 const UPstream::opCodes opcode_id
105)
106{
108 << "Bad input for reduce(): likely a programming problem\n";
109
110 if (!is_basic_dataType(datatype_id))
111 {
112 FatalError<< " Non-basic data tyoe (" << int(datatype_id) << ")\n";
113 }
114
115 if (!is_reduce_opCode(opcode_id))
116 {
117 FatalError<< " Invalid reduce op (" << int(opcode_id) << ")\n";
118 }
119
120 if (values == nullptr)
121 {
122 FatalError<< " nullptr for values\n";
123 }
125}
126
127} // End anonymous namespace
128
129
130// * * * * * * * * * * Protected Static Member Functions * * * * * * * * * * //
131
132// The intel-mpi version of MPI_Reduce() does not accept IN_PLACE
133// operations (issue #3331)
134//
135// The open-mpi version (tested up to 4.1) accepts IN_PLACE but fails
136// with an MPI_ARG_ERR message.
137//
138// Do not assume that anyone actually supports this!
139
140#undef Foam_vendor_supports_INPLACE_REDUCE
141
143(
144 void* values, // Type checking done by caller
145 int count,
146 const UPstream::dataTypes dataTypeId, // Proper type passed by caller
147 const UPstream::opCodes opCodeId, // Proper code passed by caller
148 const int communicator, // Index into MPICommunicators_
150)
151{
152 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
153 MPI_Op optype = PstreamGlobals::getOpCode(opCodeId);
154
155 if (!count || !UPstream::is_parallel(communicator))
156 {
157 // Nothing to do - ignore
158 return;
159 }
160 if
161 (
163 (
164 !is_basic_dataType(dataTypeId)
165 || !is_reduce_opCode(opCodeId)
166 || (values == nullptr)
167 )
168 )
169 {
171 printErrorMessage(values, dataTypeId, opCodeId);
173 }
174
175 const bool withTopo =
176 (
177 (req == nullptr)
179 && UPstream::usingNodeComms(communicator)
180 );
181
182 if (FOAM_UNLIKELY(UPstream::debug))
183 {
184 Perr<< "[mpi_reduce] : (inplace)"
185 << " op:" << int(opCodeId)
186 << " type:" << int(dataTypeId) << " count:" << count
187 << " comm:" << communicator
188 << " topo:" << withTopo << Foam::endl;
189 // error::printStack(Perr);
190 }
191
192 // Workaround for missing/broken in-place handling.
193 // Use a local buffer to send the data from.
194 // - probably not thread-safe
195
196 #ifndef Foam_vendor_supports_INPLACE_REDUCE
197 static std::unique_ptr<char[]> work;
198 static int work_len(0);
199
200 const int num_bytes = [=](int n)
201 {
202 int size = 1;
203 MPI_Type_size(datatype, &size);
204 return (size * n);
205 }(count);
206
207 if (work_len < num_bytes)
208 {
209 // Min length to avoid many initial re-allocations
210 work_len = std::max(256, num_bytes);
211 work.reset();
212 work = std::make_unique<char[]>(work_len);
213 }
214 void* send_buffer = work.get();
215
216 std::memcpy(send_buffer, values, num_bytes);
217 #else
218 void* send_buffer = values; // ie, in-place
219 #endif
220
221 if (withTopo)
222 {
223 // Topological reduce
224
225 // Stage 1: local reduction within a node -> onto the node leader
226 if (UPstream::is_parallel(UPstream::commLocalNode_))
227 {
228 if (FOAM_UNLIKELY(UPstream::debug))
229 {
230 Perr<< "[mpi_reduce] : (inplace)"
231 << " op:" << int(opCodeId)
232 << " type:" << int(dataTypeId) << " count:" << count
233 << " comm:" << UPstream::commLocalNode_
234 << " stage-1" << Foam::endl;
235 }
236
238 (
239 send_buffer,
240 values,
241 count,
242 datatype,
243 optype,
244 UPstream::commLocalNode_
245 );
246 }
247
248 // Stage 2: reduce between node leaders -> world leader
249 if (UPstream::is_parallel(UPstream::commInterNode_))
250 {
251 // Transcribe the previous results as input for this stage
252 #ifndef Foam_vendor_supports_INPLACE_REDUCE
253 std::memcpy(send_buffer, values, num_bytes);
254 #endif
255
256 if (FOAM_UNLIKELY(UPstream::debug))
257 {
258 Perr<< "[mpi_reduce] : (inplace)"
259 << " op:" << int(opCodeId)
260 << " type:" << int(dataTypeId) << " count:" << count
261 << " comm:" << UPstream::commInterNode_
262 << " stage-2" << Foam::endl;
263 }
264
266 (
267 send_buffer,
268 values,
269 count,
270 datatype,
271 optype,
272 UPstream::commInterNode_
273 );
274 }
275 }
276 else
277 {
278 // Regular reduce
279
281 (
282 send_buffer,
283 values,
284 count,
285 datatype,
286 optype,
287 communicator,
288 req
289 );
290 }
291}
292
293
295(
296 void* values, // Type checking done by caller
297 int count,
298 const UPstream::dataTypes dataTypeId, // Proper type passed by caller
299 const UPstream::opCodes opCodeId, // Proper code passed by caller
300 const int communicator, // Index into MPICommunicators_
302)
303{
304 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
305 MPI_Op optype = PstreamGlobals::getOpCode(opCodeId);
306
307 if (!count || !UPstream::is_parallel(communicator))
308 {
309 // Nothing to do - ignore
310 return;
311 }
312 if
313 (
315 (
316 !is_basic_dataType(dataTypeId)
317 || !is_reduce_opCode(opCodeId)
318 || (values == nullptr)
319 )
320 )
321 {
323 printErrorMessage(values, dataTypeId, opCodeId);
325 }
326
327 const bool withTopo =
328 (
329 (req == nullptr)
331 && UPstream::usingNodeComms(communicator)
332 );
333
334 if (FOAM_UNLIKELY(UPstream::debug))
335 {
336 Perr<< "[mpi_allreduce] :"
337 << " op:" << int(opCodeId)
338 << " type:" << int(dataTypeId) << " count:" << count
339 << " comm:" << communicator
340 << " topo:" << withTopo << Foam::endl;
341 }
342
343 if (withTopo)
344 {
345 // Topological allReduce
346
347 // Stage 1: local reduction within a node -> onto the node leader
348 if (UPstream::is_parallel(UPstream::commLocalNode_))
349 {
350 if (FOAM_UNLIKELY(UPstream::debug))
351 {
352 Perr<< "[mpi_allreduce] :"
353 << " op:" << int(opCodeId)
354 << " type:" << int(dataTypeId) << " count:" << count
355 << " comm:" << UPstream::commLocalNode_
356 << " stage-1:reduce" << Foam::endl;
357 }
358
360 (
361 values,
362 count,
363 dataTypeId,
364 opCodeId,
365 UPstream::commLocalNode_
366 );
367 }
368
369 // Stage 2: all-reduce between node leaders
370 if (UPstream::is_parallel(UPstream::commInterNode_))
371 {
372 if (FOAM_UNLIKELY(UPstream::debug))
373 {
374 Perr<< "[mpi_allreduce] :"
375 << " op:" << int(opCodeId)
376 << " type:" << int(dataTypeId) << " count:" << count
377 << " comm:" << UPstream::commInterNode_
378 << " stage-2:allreduce" << Foam::endl;
379 }
380
382 (
383 values,
384 count,
385 datatype,
386 optype,
387 UPstream::commInterNode_
388 );
389 }
390
391 // Finally, broadcast the information from each local node leader
392 if (UPstream::is_parallel(UPstream::commLocalNode_))
393 {
394 if (FOAM_UNLIKELY(UPstream::debug))
395 {
396 Perr<< "[mpi_allreduce] :"
397 << " op:" << int(opCodeId)
398 << " type:" << int(dataTypeId) << " count:" << count
399 << " comm:" << UPstream::commLocalNode_
400 << " stage-3:broadcast" << Foam::endl;
401 }
402
404 (
405 values,
406 count,
407 datatype,
408 UPstream::commLocalNode_
409 );
410 }
411 }
412 else
413 {
414 // Regular allReduce
415
417 (
418 values,
419 count,
420 datatype,
421 optype,
422 communicator,
423 req
424 );
425 }
426}
427
428
429// Based on the experience with MPI_Reduce (see above),
430// do not assume that anyone actually supports the inplace option
431
433(
434 void* values, // Type checking done by caller
435 int count,
436 const UPstream::dataTypes dataTypeId, // Proper type passed by caller
437 const UPstream::opCodes opCodeId, // Proper code passed by caller
438 const int communicator, // Index into MPICommunicators_
439 const bool exclusive // Exclusive scan
440)
441{
442 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
443 MPI_Op optype = PstreamGlobals::getOpCode(opCodeId);
444
445 if (!count || !UPstream::is_parallel(communicator))
446 {
447 // Nothing to do - ignore
448 return;
449 }
450 if
451 (
453 (
454 !is_basic_dataType(dataTypeId)
455 || !is_reduce_opCode(opCodeId)
456 || (values == nullptr)
457 )
458 )
459 {
461 printErrorMessage(values, dataTypeId, opCodeId);
463 }
464
465 if (FOAM_UNLIKELY(UPstream::debug))
466 {
467 Perr<< "[mpi_scan_reduce] : (inplace)"
468 << " op:" << int(opCodeId)
469 << " type:" << int(dataTypeId) << " count:" << count
470 << " comm:" << communicator
471 << " excl:" << exclusive << Foam::endl;
472 // error::printStack(Perr);
473 }
474
475 // Avoid the possibility of missing/broken in-place handling.
476 // Use a local buffer to send the data from.
477 // - probably not thread-safe
478
479 static std::unique_ptr<char[]> work;
480 static int work_len(0);
481
482 const int num_bytes = [=](int n)
483 {
484 int size = 1;
485 MPI_Type_size(datatype, &size);
486 return (size * n);
487 }(count);
488
489 if (work_len < num_bytes)
490 {
491 // Min length to avoid many initial re-allocations
492 work_len = std::max(256, num_bytes);
493 work.reset();
494 work = std::make_unique<char[]>(work_len);
495 }
496 void* send_buffer = work.get();
497
498 std::memcpy(send_buffer, values, num_bytes);
499
501 (
502 send_buffer,
503 values,
504 count,
505 datatype,
506 optype,
507 communicator,
508 exclusive
509 );
510
511 // In exclusive mode, the value for rank=0 is degenerate and the
512 // standard doesn't say which value (if any) it will have.
513 //
514 // Overwrite with the original value so we are certain to have
515 // a valid but perhaps meaningless value.
516
517 if (exclusive && UPstream::master(communicator))
518 {
519 std::memcpy(values, send_buffer, num_bytes);
520 }
521}
522
523
524// ************************************************************************* //
Inter-processor communication reduction functions.
label n
Functions to wrap MPI_Bcast, MPI_Allreduce, MPI_Iallreduce etc.
An opaque wrapper for MPI_Request with a vendor-independent representation without any <mpi....
Definition UPstream.H:2919
Wrapper for internally indexed communicator label. Always invokes UPstream::allocateCommunicatorCompo...
Definition UPstream.H:2546
static void mpi_reduce(void *values, int count, const UPstream::dataTypes dataTypeId, const UPstream::opCodes opCodeId, const int communicator, UPstream::Request *req=nullptr)
In-place reduction of values with result on rank 0.
opCodes
Mapping of some MPI op codes.
Definition UPstream.H:149
@ Basic_end
(internal use) end marker [reduce types]
Definition UPstream.H:168
@ Basic_begin
(internal use) begin marker [reduce/window types]
Definition UPstream.H:153
static void reduceAnd(bool &value, const int communicator=worldComm)
Logical (and) reduction (MPI_AllReduce).
static bool usingTopoControl(UPstream::topoControls ctrl) noexcept
Test for selection of given topology-aware routine.
Definition UPstream.H:1014
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition UPstream.H:1714
static void reduceOr(bool &value, const int communicator=worldComm)
Logical (or) reduction (MPI_AllReduce).
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition UPstream.H:1743
static bool usingNodeComms(const int communicator)
True if node topology-aware routines have been enabled, it is running in parallel,...
Definition UPstream.C:751
static void mpi_allreduce(void *values, int count, const UPstream::dataTypes dataTypeId, const UPstream::opCodes opCodeId, const int communicator, UPstream::Request *req=nullptr)
In-place reduction of values with same result on all ranks.
static void mpi_scan_reduce(void *values, int count, const UPstream::dataTypes dataTypeId, const UPstream::opCodes opCodeId, const int communicator, const bool exclusive)
In-place scan/exscan reduction of values.
@ reduce
reduce/all-reduce [MPI]
Definition UPstream.H:190
dataTypes
Mapping of some fundamental and aggregate types to MPI data types.
Definition UPstream.H:107
@ Basic_end
(internal use) end marker [basic types]
Definition UPstream.H:125
@ Basic_begin
(internal use) begin marker [basic/all types]
Definition UPstream.H:112
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition error.H:600
static bool is_reduce_opCode(Foam::UPstream::opCodes id) noexcept
static bool is_basic_dataType(Foam::UPstream::dataTypes id) noexcept
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition BitOps.H:73
List< T > values(const HashTable< T, Key, Hash > &tbl, const bool doSort=false)
List of values from HashTable, optionally sorted.
Definition HashOps.H:164
void reduce(const Type *sendData, Type *values, int count, MPI_Datatype datatype, MPI_Op optype, const int communicator, UPstream::Request *req=nullptr)
void allReduce(Type *values, int count, MPI_Datatype datatype, MPI_Op optype, const int communicator, UPstream::Request *req=nullptr)
void scanReduce(const Type *sendData, Type *recvData, int count, MPI_Datatype datatype, MPI_Op optype, const int communicator, const int exclusive)
bool broadcast(Type *values, int count, MPI_Datatype datatype, const int communicator, const int root=0)
MPI_Datatype getDataType(UPstream::dataTypes id)
Lookup of dataTypes enumeration as an MPI_Datatype.
MPI_Op getOpCode(UPstream::opCodes id)
Lookup of opCodes enumeration as an MPI_Op.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
void reduce(T &value, BinaryOp bop, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm)
Reduce inplace (cf. MPI Allreduce).
errorManip< error > abort(error &err)
Definition errorManip.H:139
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FOAM_UNLIKELY(cond)
Definition stdFoam.H:64