UPstreamReduceOffsets.H
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
    Copyright (C) 2025 Mark Olesen
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.

Note
    Creating globally consistent offsets and the total uses the following
    process:

    - apply MPI_Exscan(MPI_SUM) using the rank-local size to produce
      the offsets. Although MPI_Exscan() does not specify what value
      rank0 will have, our own mpiExscan_sum() does and assigns 0
      for rank0.
    - determine the total size as the offset + local size.
      This value is only correct on the (nProcs-1) rank.
    - broadcast the total size from the (nProcs-1) rank to all ranks.
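
    A small worked example (3 ranks with local sizes 4, 2, 5):
    - MPI_Exscan(MPI_SUM) produces the offsets 0, 4, 6 on ranks 0, 1, 2
    - offset + local size gives 4, 6, 11, where only the value on
      rank2 (= nProcs-1) is the true total
    - broadcasting from rank2 yields the total 11 on every rank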

    The reduceOffsets() version uses exactly the same process, except
    with a work array to bundle/unbundle values and uses the same two
    MPI calls.

\*---------------------------------------------------------------------------*/

#ifndef Foam_UPstreamReduceOffsets_H
#define Foam_UPstreamReduceOffsets_H

#include "OffsetRange.H"
#include "UPstream.H"
#include "List.H"       // List/UList, used by the UList overload below
#include <array>
#include <type_traits>  // std::enable_if_t, std::is_integral_v, std::is_base_of_v
#include <utility>      // std::index_sequence

// * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //

namespace Foam
{
namespace PstreamDetail
{

// Implementation for Foam::reduceOffset
//
// Reduction of OffsetRange for a globally consistent offset/total
// based on the local size
template<class IntType>
void reduce_offsetRange
(
    Foam::OffsetRange<IntType>& range,  // The range to operate on
    const int communicator              // The parallel communicator
)
{
    if (UPstream::is_parallel(communicator))
    {
        // Exscan (sum) yields the offsets, assigns 0 for rank=0
        IntType work = range.size();
        UPstream::mpiExscan_sum(&work, 1, communicator);

        // For the truly paranoid:
        // if (UPstream::master(communicator)) work = 0;

        range.start() = work;   // Copy out offset from work
        work += range.size();   // Update work as total == (start + size)

        // The rank=(nProcs-1) knows the total - broadcast to others
        const auto root = (UPstream::nProcs(communicator)-1);
        UPstream::broadcast(&work, 1, communicator, root);

        range.total() = work;
    }
}


// Implementation for Foam::reduceOffsets
//
// Equivalent to Foam::reduceOffset()
// but bundles values and performs operations on multiple values,
// which avoids calling MPI repeatedly
template
<
    class IntType,          // Must match OffsetRange::value_type
    std::size_t... Is,
    class... OffsetRanges
>
void reduce_offsetRanges
(
    const int communicator,         // The parallel communicator
    std::index_sequence<Is...>,     // Indices into items
    OffsetRanges&... items
)
{
    if (UPstream::is_parallel(communicator))
    {
        // Like Foam::reduceOffset()
        // but handling multiple items at once for lower communication

        // Pack all sizes into the work buffer
        std::array<IntType, sizeof...(items)> work{ (items.size())... };

        // Exscan (sum) yields the offsets, assigns 0 for rank=0
        UPstream::mpiExscan_sum(work.data(), work.size(), communicator);

        // For the truly paranoid:
        // if (UPstream::master(communicator)) work.fill(0);

        // The work buffer now contains the offsets, copy back to starts
        ((items.start() = work[Is]), ...);

        // The rank=(nProcs-1) knows the total - broadcast to others
        const auto root = (UPstream::nProcs(communicator)-1);
        if (root == UPstream::myProcNo(communicator))
        {
            // Update work buffer as total == (start + size)
            ((work[Is] += items.size()), ...);
        }
        UPstream::broadcast(work.data(), work.size(), communicator, root);

        // The work buffer now contains the totals, copy back to total
        ((items.total() = work[Is]), ...);
    }
}
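
// For illustration, with two items (a, b) the fold expressions above expand
// along the lines of (sketch only; a and b are placeholder names):
//
//     ((items.start() = work[Is]), ...)  =>  a.start() = work[0], b.start() = work[1]
//     ((work[Is] += items.size()), ...)  =>  work[0] += a.size(), work[1] += b.size()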

} // End namespace PstreamDetail
} // End namespace Foam


// * * * * * * * * * * * * * * * Global Reduction * * * * * * * * * * * * * //

namespace Foam
{

//- Parallel reduction of OffsetRange (eg, GlobalOffset) on its size value to
//- yield the globally-consistent offset and the total size across all ranks.
//
//  \note Only the OffsetRange::size() member is used for this reduction.
//  It is assumed that the combined results will always start with an offset
//  of zero on the first rank, and that ranges represent contiguous addressing
//  across all ranks.
template<class IntType>
void reduceOffset
(
    //! The range to operate on
    Foam::OffsetRange<IntType>& range,
    const int communicator = UPstream::worldComm
)
{
    // Single-item reduction
    PstreamDetail::reduce_offsetRange<IntType>
    (
        range,
        communicator
    );
}
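
// A usage sketch for reduceOffset(). How the rank-local size is stored is
// an assumption - consult OffsetRange.H for the actual interface:
//
//     Foam::OffsetRange<Foam::label> range;
//     // ... store the rank-local size in the range ...
//
//     Foam::reduceOffset(range);  // communicator defaults to UPstream::worldComm
//
//     // On return:
//     //   range.start() : exclusive prefix sum of the sizes (0 on rank0)
//     //   range.total() : sum of the sizes over all ranks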


//- Parallel reduction of multiple OffsetRange (eg, GlobalOffset) items.
//  To avoid calling MPI multiple times, it packs/unpacks values into
//  a local work array (compile-time).
//
//  \note Only the OffsetRange::size() member is used for this reduction.
//  It is assumed that the combined results will always start with an offset
//  of zero on the first rank, and that ranges represent contiguous addressing
//  across all ranks.
//
//  \note OffsetRange is already restricted to integral types,
//  but this wrapper also requires that the parameters have the same
//  representation (ie, cannot mix int32/int64, signed/unsigned etc)
template
<
    class OffsetRangeT,
    class... Rest,
    class = std::enable_if_t
    <
        (
            std::is_integral_v<typename OffsetRangeT::value_type>
         && std::is_base_of_v
            <Foam::OffsetRange<typename OffsetRangeT::value_type>, OffsetRangeT>
         && (std::is_base_of_v
            <Foam::OffsetRange<typename OffsetRangeT::value_type>, Rest> && ...)
    )>
>
void reduceOffsets
(
    //! The parallel communicator
    const int communicator,
    //! The first range to operate on
    OffsetRangeT& first,
    //! Additional ranges to operate on
    Rest&... rest
)
{
    using IntType = typename OffsetRangeT::value_type;

    if constexpr (sizeof...(rest) == 0)
    {
        // Single-item reduction
        PstreamDetail::reduce_offsetRange<IntType>
        (
            first,
            communicator
        );
    }
    else
    {
        // Reduce multiple items, use pack/unpack
        PstreamDetail::reduce_offsetRanges<IntType>
        (
            communicator,
            std::make_index_sequence<1 + sizeof...(rest)>{},
            first,
            rest...
        );
    }
}
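
// A usage sketch for the variadic reduceOffsets() - cellRange and faceRange
// are hypothetical ranges with identical integral representation:
//
//     Foam::OffsetRange<Foam::label> cellRange, faceRange;
//     // ... store the rank-local sizes ...
//
//     Foam::reduceOffsets(UPstream::worldComm, cellRange, faceRange);
//
// Both items are bundled into a single Exscan and a single broadcast,
// instead of two of each.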


//- Parallel reduction of multiple OffsetRange (eg, GlobalOffset) items.
//  To avoid calling MPI multiple times, it packs/unpacks values into
//  a local work array.
//
//  \note Only the OffsetRange::size() member is used for this reduction.
//  It is assumed that the combined results will always start with an offset
//  of zero on the first rank, and that ranges represent contiguous addressing
//  across all ranks.
template
<
    class OffsetRangeT,
    class = std::enable_if_t
    <
        (
            std::is_integral_v<typename OffsetRangeT::value_type>
         && std::is_base_of_v
            <Foam::OffsetRange<typename OffsetRangeT::value_type>, OffsetRangeT>
    )>
>
void reduceOffsets
(
    //! The parallel communicator
    const int communicator,
    //! The ranges to operate on
    UList<OffsetRangeT>& ranges
)
{
    const auto len = ranges.size();

    if (ranges.empty() || !UPstream::is_parallel(communicator))
    {
        // Nothing to do
    }
    else if (len == 1)
    {
        // Single-item reduction
        Foam::reduceOffset(ranges[0], communicator);
    }
    else
    {
        using IntType = typename OffsetRangeT::value_type;

        // Like Foam::reduceOffset()
        // but handling multiple items at once for lower communication
        List<IntType> work(len);

        // Pack all sizes into the work buffer
        for (label i = 0; i < len; ++i)
        {
            work[i] = ranges[i].size();
        }

        // Exscan (sum) yields the offsets, assigns 0 for rank=0
        UPstream::mpiExscan_sum(work.data(), work.size(), communicator);

        // For the truly paranoid:
        // if (UPstream::master(communicator)) work = 0;

        // The work buffer now contains the offsets, copy back to starts
        // and update work buffer as total == (start + size)
        for (label i = 0; i < len; ++i)
        {
            ranges[i].start() = work[i];
            work[i] += ranges[i].size();
        }

        // The rank=(nProcs-1) knows the total - broadcast to others
        const auto root = (UPstream::nProcs(communicator)-1);
        UPstream::broadcast(work.data(), work.size(), communicator, root);

        // The work buffer now contains the totals, copy back to total
        for (label i = 0; i < len; ++i)
        {
            ranges[i].total() = work[i];
        }
    }
}
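
// A usage sketch for the UList overload - the list contents are hypothetical:
//
//     Foam::List<Foam::OffsetRange<Foam::label>> ranges(3);
//     // ... store the rank-local size in each entry ...
//
//     Foam::reduceOffsets(UPstream::worldComm, ranges);
//
// All entries share one Exscan and one broadcast, with the work array
// sized at run-time rather than at compile-time as in the variadic form.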


// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

} // End namespace Foam


// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

#endif

// ************************************************************************* //