Loading...
Searching...
No Matches
PstreamBuffers.H
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2021-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27Class
28 Foam::PstreamBuffers
29
30Description
31 Buffers for inter-processor communications streams (UOPstream, UIPstream).
32
33 Use UOPstream to stream data into buffers, call finishedSends() to
34 notify that data is in buffers and then use UIPstream to get data out
35 of received buffers. Works with both buffered and non-blocking. Does
36 not make much sense with scheduled since there you would not need these
37 explicit buffers.
38
39 Example usage:
40 \code
41 PstreamBuffers pBufs;
42
43 for (const int proci : UPstream::allProcs())
44 {
45 if (proci != UPstream::myProcNo())
46 {
47 someObject vals;
48
49 UOPstream send(proci, pBufs);
50 send << vals;
51 }
52 }
53
54 pBufs.finishedSends(); // no-op for buffered
55
56 for (const int proci : UPstream::allProcs())
57 {
58 if (proci != UPstream::myProcNo())
59 {
60 UIPstream recv(proci, pBufs);
61 someObject vals(recv);
62 }
63 }
64 \endcode
65
66 There are special versions of finishedSends() for
67 restricted neighbour communication as well as for special
68 one-to-all and all-to-one communication patterns.
69 For example,
70 \code
71 PstreamBuffers pBufs;
72
73 if (UPstream::master())
74 {
75 someObject vals;
76 for (const int proci : UPstream::subProcs())
77 {
78 UOPstream send(proci, pBufs);
79 send << vals;
80 }
81 }
82
83 pBufs.finishedScatters();
84
85 if (!UPstream::master())
86 {
87 UIPstream recv(UPstream::masterNo(), pBufs);
88 someObject vals(recv);
89 }
90 \endcode
91
92 Additionally there are some situations that use speculative sends
93 that may not actually be required. In this case, it is possible to
94 mark all sends as initially \em unregistered and subsequently
95 mark the "real" sends as \em registered.
96
97 For example,
98 \code
99 PstreamBuffers pBufs;
100
101 pBufs.initRegisterSend();
102
103 for (const polyPatch& pp : patches)
104 {
105 const auto* ppp = isA<processorPolyPatch>(pp);
106 if (ppp)
107 {
108 const label nbrProci = ppp->neighbProcNo();
109
110 // Gather some patch information...
111 UOPstream toNbr(nbrProci, pBufs);
112 toNbr << patchInfo;
113
114 // The send is needed if patchInfo is non-empty
115 pBufs.registerSend(nbrProci, !patchInfo.empty());
116 }
117 }
118
119 // optional: pBufs.clearUnregistered();
120
121 pBufs.finishedSends();
122
123 ...
124 \endcode
125
126SourceFiles
127 PstreamBuffers.C
128
129\*---------------------------------------------------------------------------*/
130
131#include "Pstream.H"
132
133#ifndef Foam_PstreamBuffers_H
134#define Foam_PstreamBuffers_H
135
136#include "DynamicList.H"
137#include "UPstream.H"
138#include "IOstream.H"
139
140// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
141
142namespace Foam
143{
144
145// Forward Declarations
146class bitSet;
148/*---------------------------------------------------------------------------*\
149 Class PstreamBuffers Declaration
150\*---------------------------------------------------------------------------*/
151
152class PstreamBuffers
153{
154 // Private Data Types
155
156 //- Private enumeration for handling PEX stage 1 (sizing) modes
157 enum class modeOption : unsigned char
158 {
159 DEFAULT,
160 GATHER,
161 SCATTER,
162 ALL_TO_ALL,
163 NBX_PEX
164 };
165
166
167 // Private Data
168
169 //- Track if sends are complete
170 bool finishedSendsCalled_;
171
172 //- Permit clear of individual receive buffer by external access
173 bool allowClearRecv_;
174
175 //- Buffer format (ascii | binary)
176 const IOstreamOption::streamFormat format_;
177
178 //- Communications type of this stream
179 const UPstream::commsTypes commsType_;
180
181 //- The transfer message type
182 const int tag_;
183
184 //- Communicator
185 const int comm_;
186
187 //- Number of ranks associated with PstreamBuffers (at construction)
188 const int nProcs_;
189
190
191 // Buffer storage
192
193 //- Send buffers. Size is nProcs()
194 List<DynamicList<char>> sendBuffers_;
195
196 //- Receive buffers. Size is nProcs()
197 List<DynamicList<char>> recvBuffers_;
198
199 //- Current read positions within recvBuffers_. Size is nProcs()
200 // This list is also misused for registerSend() bookkeeping
201 labelList recvPositions_;
202
203
204 // Private Member Functions
205
206 //- Change status of finished sends called
207 inline void setFinished(bool on) noexcept;
208
209 //- Clear 'unregistered' send buffers, tag as being send-ready
210 inline void initFinalExchange();
211
212 //- Mark all sends as having been done.
213 // This will start receives (non-blocking comms).
214 void finalExchange
215 (
216 enum modeOption mode,
217 const bool wait,
218 labelList& recvSizes
219 );
220
221 //- Mark sends as done.
222 // Only exchange sizes using the neighbour ranks
223 // (non-blocking comms).
224 void finalExchange
225 (
226 const labelUList& sendProcs,
227 const labelUList& recvProcs,
228 const bool wait,
229 labelList& recvSizes
230 );
231
232
233 // Friendship Access
234
235 //- Access a send buffer for given proc (in range 0-nProcs)
236 DynamicList<char>& accessSendBuffer(const label proci);
237
238 //- Access a recv buffer for given proc (in range 0-nProcs).
239 DynamicList<char>& accessRecvBuffer(const label proci);
240
241 //- Access the recv position within recv buffer for given proc
242 //- (in range 0-nProcs).
243 label& accessRecvPosition(const label proci);
244
245 friend class UOPstreamBase; // accessSendBuffer()
246 friend class UIPstreamBase; // accessRecvBuffer(), accessRecvPosition()
247
248
249public:
250
251 // Declare name of the class and its debug switch
252 ClassName("PstreamBuffers");
253
254
255 // Static Data
256
257 //- Preferred exchange algorithm (may change or be removed in future)
258 static int algorithm;
259
260
261 // Constructors
262
263 //- Construct given communication type (default: nonBlocking), message
264 //- tag, communicator (default: worldComm), IO format (default: binary)
265 explicit PstreamBuffers
266 (
268 int tag = UPstream::msgType(),
269 int communicator = UPstream::worldComm,
271 );
272
273 //- Construct given communicator, communication type
274 //- (default: nonBlocking), message tag, IO format (default: binary)
275 explicit PstreamBuffers
276 (
277 int communicator,
281 )
282 :
283 PstreamBuffers(commsType, tag, communicator, fmt)
284 {}
285
286 //- Construct given communicator, message tag, communication type
287 //- (default: nonBlocking), IO format (default: binary)
289 (
290 int communicator,
291 int tag,
295 :
296 PstreamBuffers(commsType, tag, communicator, fmt)
297 {}
298
299
300 //- Destructor - checks that all data have been consumed
302
303
304 // Member Functions
305
306 // Attributes
307
308 //- The associated buffer format (ascii | binary)
309 IOstreamOption::streamFormat format() const noexcept { return format_; }
310
311 //- The communications type of the stream
312 UPstream::commsTypes commsType() const noexcept { return commsType_; }
313
314 //- The transfer message tag
315 int tag() const noexcept { return tag_; }
316
317 //- The communicator index
318 int comm() const noexcept { return comm_; }
319
320 //- Number of ranks associated with PstreamBuffers
321 int nProcs() const noexcept { return nProcs_; }
322
323
324 // Sizing
325
326 //- Range of ranks indices associated with PstreamBuffers
328 {
329 // Proc 0 -> nProcs (int value)
330 return UPstream::rangeType(static_cast<int>(nProcs_));
331 }
332
333 //- Range of sub-processes indices associated with PstreamBuffers
335 {
336 // Proc 1 -> nProcs (int value)
337 return UPstream::rangeType(1, static_cast<int>(nProcs_-1));
338 }
339
340
341 // Queries
342
343 //- True if finishedSends() or finishedNeighbourSends() has been called
344 bool finished() const noexcept;
345
346 //- Is clearStorage of individual receive buffer by external hooks
347 //- allowed? (default: true)
348 bool allowClearRecv() const noexcept;
349
350 //- True if any (local) send buffers have data
351 bool hasSendData() const;
352
353 //- True if any (local) recv buffers have unconsumed data.
354 //- Must call finishedSends() or other finished.. method first!
355 bool hasRecvData() const;
356
357 //- Number of send bytes for the specified processor.
358 label sendDataCount(const label proci) const;
359
360 //- Number of unconsumed receive bytes for the specified processor.
361 //- Must call finishedSends() or other finished.. method first!
362 label recvDataCount(const label proci) const;
363
364 //- Number of unconsumed receive bytes for all processors.
365 //- Must call finishedSends() or other finished.. method first!
367
368 //- Maximum receive size from any processor rank.
369 //- Must call finishedSends() or other finished.. method first!
370 label maxRecvCount() const;
371
372 //- Maximum receive size, excluding current processor rank.
373 //- Must call finishedSends() or other finished.. method first!
374 label maxNonLocalRecvCount() const;
376 //- Maximum receive size, excluding the specified processor rank.
377 //- Must call finishedSends() or other finished.. method first!
378 label maxNonLocalRecvCount(const label excludeProci) const;
379
380 //- The unconsumed receive data (bytes) for the specified processor.
381 //- Must call finishedSends() or other finished.. method first!
382 // The method is only useful in limited situations, such as when
383 // PstreamBuffers has been used to fill contiguous data
384 // (eg, using OPstream::write).
385 const UList<char> peekRecvData(const label proci) const;
386
387
388 // Edit
389
390 //- Clear all send/recv buffers and reset states.
391 // Does not remove the buffer storage.
392 void clear();
393
394 //- Clear all send buffers (does not remove buffer storage)
395 void clearSends();
396
397 //- Clear all recv buffers and positions (does not remove buffer storage)
398 void clearRecvs();
399
400 //- Clear an individual send buffer (eg, data not required)
401 void clearSend(const label proci);
402
403 //- Clear an individual receive buffer (eg, data not required)
404 // Does not remove the buffer storage.
405 void clearRecv(const label proci);
406
407 //- Clear storage for all send/recv buffers and reset states.
408 void clearStorage();
409
410 //- Change allowClearRecv, return previous value
411 bool allowClearRecv(bool on) noexcept;
412
413
414 // Registered Sending
415
416 //- Initialise registerSend() bookkeeping by marking all send buffers
417 //- as 'unregistered'
418 // Usually called immediately after construction or clear().
419 void initRegisterSend();
420
421 //- Toggle an individual send buffer as 'registered'.
422 //- The setting is sticky (does not turn off)
423 void registerSend(const label proci, const bool toggleOn = true);
424
425 //- Clear any 'unregistered' send buffers.
426 void clearUnregistered();
427
428
429 // Regular Functions
430
431 //- Mark the send phase as being finished.
432 //
433 // Non-blocking mode: populates receive buffers using all-to-all
434 // or NBX (depending on tuning parameters).
435 // \param wait wait for requests to complete (in non-blocking mode)
436 void finishedSends(const bool wait = true);
437
438 //- Mark the send phase as being finished.
439 //
440 // Non-blocking mode: populates receive buffers using NBX.
441 // \param wait wait for requests to complete (in non-blocking mode)
442 void finishedSendsNBX(const bool wait = true);
443
444 //- Mark the send phase as being finished.
445 //- Recovers the sizes (bytes) received.
446 //
447 // Non-blocking mode: populates receive buffers using all-to-all
448 // or NBX (depending on tuning parameters).
449 // \warning currently only valid for non-blocking comms.
450 void finishedSends
451 (
453 labelList& recvSizes,
455 const bool wait = true
456 );
457
458 //- Mark the send phase as being finished.
459 //- Recovers the sizes (bytes) received.
460 //
461 // Non-blocking mode: populates receive buffers using NBX.
462 // \warning currently only valid for non-blocking comms.
464 (
466 labelList& recvSizes,
468 const bool wait = true
469 );
470
471
472 // Functions with restricted neighbours
473
474 //- Mark the send phase as being finished, with communication
475 //- being limited to a known subset of send/recv ranks.
476 //
477 // Non-blocking mode: populates receive buffers.
478 //
479 // \warning currently only valid for non-blocking comms.
480 // \note Same as finishedSends with identical sendProcs/recvProcs
482 (
484 const labelUList& neighProcs,
486 const bool wait = true
487 );
488
489 //- Mark the send phase as being finished, with communication
490 //- being limited to a known subset of send/recv ranks.
491 //- Recovers the sizes (bytes) received.
492 //
493 // Non-blocking mode: it will populate receive buffers.
494 //
495 // \warning currently only valid for non-blocking mode.
497 (
499 const labelUList& neighProcs,
501 labelList& recvSizes,
503 const bool wait = true
504 );
505
506 //- A caching version that uses a limited send/recv connectivity.
507 //
508 // Non-blocking mode: populates receive buffers.
509 // \return True if the send/recv connectivity changed
510 //
511 // \warning currently only valid for non-blocking comms.
512 bool finishedSends
513 (
515 bitSet& sendConnections,
517 DynamicList<label>& sendProcs,
519 DynamicList<label>& recvProcs,
521 const bool wait = true
522 );
523
524
525 // Gather/scatter modes
526
527 //- Mark all sends to master as done.
528 //
529 // Non-blocking mode: populates receive buffers.
530 // Can use recvDataCount, maxRecvCount etc to recover sizes received.
531 //
532 // \param wait wait for requests to complete (in non-blocking mode)
533 //
534 // \warning currently only valid for non-blocking comms.
535 void finishedGathers(const bool wait = true);
536
537 //- Mark all sends to master as done.
538 //- Recovers the sizes (bytes) received.
539 //
540 // Non-blocking mode: populates receive buffers (all-to-one).
541 // \warning currently only valid for non-blocking comms.
542 void finishedGathers
543 (
545 labelList& recvSizes,
547 const bool wait = true
548 );
549
550 //- Mark all sends to sub-procs as done.
551 //
552 // Non-blocking mode: populates receive buffers.
553 // Can use recvDataCount, maxRecvCount etc to recover sizes received.
554 //
555 // \param wait wait for requests to complete (in non-blocking mode)
556 //
557 // \warning currently only valid for non-blocking comms.
558 void finishedScatters(const bool wait = true);
559
560 //- Mark all sends to sub-procs as done.
561 //- Recovers the sizes (bytes) received.
562 //
563 // Non-blocking mode: populates receive buffers (one-to-all).
564 // \warning currently only valid for non-blocking comms.
566 (
568 labelList& recvSizes,
570 const bool wait = true
571 );
572};
573
574
575// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
576
577} // End namespace Foam
578
579// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
580
581#endif
582
583// ************************************************************************* //
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition DynamicList.H:68
streamFormat
Data format (ascii | binary | coherent).
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition List.H:72
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
void clearUnregistered()
Clear any 'unregistered' send buffers.
int nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
const UList< char > peekRecvData(const label proci) const
The unconsumed receive data (bytes) for the specified processor. Must call finishedSends() or other fi...
ClassName("PstreamBuffers")
void clearSends()
Clear all send buffers (does not remove buffer storage).
PstreamBuffers(int communicator, int tag, UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communicator, message tag, communication type (default: nonBlocking),...
UPstream::rangeType allProcs() const noexcept
Range of ranks indices associated with PstreamBuffers.
UPstream::commsTypes commsType() const noexcept
The communications type of the stream.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
friend class UIPstreamBase
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true).
label maxRecvCount() const
Maximum receive size from any processor rank. Must call finishedSends() or other finished....
int tag() const noexcept
The transfer message tag.
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off).
void initRegisterSend()
Initialise registerSend() bookkeeping by marking all send buffers as 'unregistered'.
void clearStorage()
Clear storage for all send/recv buffers and reset states.
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required).
int comm() const noexcept
The communicator index.
bool hasSendData() const
True if any (local) send buffers have data.
static int algorithm
Preferred exchange algorithm (may change or be removed in future).
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), int communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void clearRecvs()
Clear all recv buffer and positions (does not remove buffer storage).
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished....
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished....
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required).
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
friend class UOPstreamBase
void clear()
Clear all send/recv buffers and reset states.
PstreamBuffers(int communicator, UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communicator, communication type (default: nonBlocking), message tag,...
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank. Must call finishedSends() or other finished....
UPstream::rangeType subProcs() const noexcept
Range of sub-processes indices associated with PstreamBuffers.
IOstreamOption::streamFormat format() const noexcept
The associated buffer format (ascii | binary).
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition UList.H:89
commsTypes
Communications types.
Definition UPstream.H:81
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Definition UPstream.H:84
static int & msgType() noexcept
Message tag of standard messages.
Definition UPstream.H:1926
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition UPstream.H:1069
IntRange< int > rangeType
Int ranges are used for MPI ranks (processes).
Definition UPstream.H:75
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
Definition bitSet.H:61
#define ClassName(TypeNameString)
Add typeName information from argument TypeNameString to a class.
Definition className.H:74
surface1 clear()
Namespace for OpenFOAM.
List< label > labelList
A List of labels.
Definition List.H:62
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Definition POSIX.C:775
const direction noexcept
Definition scalarImpl.H:265
UList< label > labelUList
A UList of labels.
Definition UList.H:75