56inline void Foam::PstreamBuffers::setFinished(
bool on)
noexcept
58 finishedSendsCalled_ = on;
62inline void Foam::PstreamBuffers::initFinalExchange()
72void Foam::PstreamBuffers::finalExchange
84 case modeOption::DEFAULT :
90 ? modeOption::ALL_TO_ALL
96 case modeOption::GATHER :
102 for (label proci = 1; proci < sendBuffers_.size(); ++proci)
104 sendBuffers_[proci].clear();
109 case modeOption::SCATTER :
135 case modeOption::GATHER :
145 recvSizes.resize_nocopy(nProcs_);
152 case modeOption::SCATTER :
157 recvSizes.resize_nocopy(nProcs_);
161 forAll(sendBuffers_, proci)
163 recvSizes[proci] = sendBuffers_[proci].size();
173 recvSizes[0] = myRecv;
178 case modeOption::NBX_PEX :
181 sendSizes.resize_nocopy(nProcs_);
182 forAll(sendBuffers_, proci)
184 sendSizes[proci] = sendBuffers_[proci].size();
186 recvSizes.resize_nocopy(nProcs_);
199 case modeOption::DEFAULT :
200 case modeOption::ALL_TO_ALL :
203 sendSizes.resize_nocopy(nProcs_);
204 forAll(sendBuffers_, proci)
206 sendSizes[proci] = sendBuffers_[proci].size();
208 recvSizes.resize_nocopy(nProcs_);
231void Foam::PstreamBuffers::finalExchange
242 <<
" nProcs:" << nProcs_
252 recvSizes.resize_nocopy(nProcs_);
258 for (
const label proci : sendProcs)
260 recvSizes[proci] = 1;
263 for (label proci = 0; proci < nProcs_; ++proci)
265 if (!recvSizes[proci])
267 sendBuffers_[proci].clear();
307 finishedSendsCalled_(false),
308 allowClearRecv_(true),
310 commsType_(commsType),
313 nProcs_(UPstream::nProcs(comm_)),
314 sendBuffers_(nProcs_),
315 recvBuffers_(nProcs_),
316 recvPositions_(nProcs_,
Foam::zero{})
321 <<
" nProcs:" << nProcs_
333 <<
" nProcs:" << nProcs_
337 forAll(recvBuffers_, proci)
339 const label
pos = recvPositions_[proci];
340 const label len = recvBuffers_[proci].size();
342 if (
pos >= 0 &&
pos < len)
345 <<
"Message from processor " << proci
346 <<
" Only consumed " <<
pos <<
" of " << len <<
" bytes" <<
nl
347 <<
" comm " << comm_ <<
" tag " << tag_ <<
nl
356Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
361 return sendBuffers_[proci];
365Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
370 return recvBuffers_[proci];
374Foam::label& Foam::PstreamBuffers::accessRecvPosition(
const label proci)
376 return recvPositions_[proci];
393 for (DynamicList<char>& buf : recvBuffers_)
397 recvPositions_ =
Zero;
411 for (label proci = 0; proci < nProcs_; ++proci)
413 if (recvPositions_[proci] < 0)
415 recvPositions_[proci] = 0;
416 sendBuffers_[proci].clear();
424 sendBuffers_[proci].clear();
425 if (recvPositions_[proci] < 0)
428 recvPositions_[proci] = 0;
435 recvBuffers_[proci].clear();
436 recvPositions_[proci] = 0;
448 for (DynamicList<char>& buf : recvBuffers_)
452 recvPositions_ =
Zero;
462 for (label proci = 0; proci < nProcs_; ++proci)
464 sendBuffers_[proci].
clear();
466 recvPositions_[proci] = -1;
475 if (toggleOn && recvPositions_[proci] < 0)
477 recvPositions_[proci] = 0;
484 for (
const DynamicList<char>& buf : sendBuffers_)
499 forAll(recvBuffers_, proci)
501 if (recvPositions_[proci] < recvBuffers_[proci].size())
521 return sendBuffers_[proci].size();
529 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
554 forAll(recvBuffers_, proci)
556 const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
578 const label excludeProci
585 forAll(recvBuffers_, proci)
587 if (excludeProci != proci)
589 label len(recvBuffers_[proci].size() - recvPositions_[proci]);
624 const label
pos = recvPositions_[proci];
625 const label len = recvBuffers_[proci].size();
631 const_cast<char*
>(recvBuffers_[proci].cbegin(
pos)),
653 <<
" nProcs:" << nProcs_
657 finalExchange(modeOption::DEFAULT, wait, recvSizes);
666 <<
" nProcs:" << nProcs_
670 finalExchange(modeOption::NBX_PEX, wait, recvSizes);
683 <<
" nProcs:" << nProcs_
689 finalExchange(modeOption::DEFAULT, wait, recvSizes);
694 <<
"Obtaining sizes not supported in "
696 <<
" since transfers already in progress. Use non-blocking instead."
712 finalExchange(neighProcs, neighProcs, wait, recvSizes);
723 finalExchange(neighProcs, neighProcs, wait, recvSizes);
735 bool changed = (sendConnections.
size() != nProcs());
739 sendConnections.
resize(nProcs());
743 forAll(sendBuffers_, proci)
745 if (sendConnections.
set(proci, !sendBuffers_[proci].empty()))
759 finishedSends(recvSizes, wait);
763 forAll(sendBuffers_, proci)
765 if (!sendBuffers_[proci].empty())
775 if (recvSizes[proci] > 0)
785 finalExchange(sendProcs, recvProcs, wait, recvSizes);
795 finalExchange(modeOption::GATHER, wait, recvSizes);
802 finalExchange(modeOption::SCATTER, wait, recvSizes);
812 finalExchange(modeOption::GATHER, wait, recvSizes);
817 <<
"Obtaining sizes not supported in "
819 <<
" since transfers already in progress. Use non-blocking instead."
834 finalExchange(modeOption::SCATTER, wait, recvSizes);
839 <<
"Obtaining sizes not supported in "
841 <<
" since transfers already in progress. Use non-blocking instead."
855 return finishedSendsCalled_;
861 return allowClearRecv_;
867 bool old(allowClearRecv_);
868 allowClearRecv_ = on;
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void push_back(const T &val)
Copy append an element to the end of this list.
streamFormat
Data format (ascii | binary | coherent).
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily preserving old content.
void resize(const label numElem, const unsigned int val=0u)
Reset addressable list size, does not shrink the allocated size.
label size() const noexcept
Number of entries.
Buffers for inter-processor communications streams (UOPstream, UIPstream).
label sendDataCount(const label proci) const
Number of send bytes for the specified processor.
void clearUnregistered()
Clear any 'unregistered' send buffers.
int nProcs() const noexcept
Number of ranks associated with PstreamBuffers.
const UList< char > peekRecvData(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void clearSends()
Clear all send buffers (does not remove buffer storage).
UPstream::commsTypes commsType() const noexcept
The communications type of the stream.
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
bool allowClearRecv() const noexcept
Is clearStorage of individual receive buffer by external hooks allowed? (default: true).
label maxRecvCount() const
Maximum receive size from any processor rank. Must call finishedSends() or other finished....
int tag() const noexcept
The transfer message tag.
void registerSend(const label proci, const bool toggleOn=true)
Toggle an individual send buffer as 'registered'. The setting is sticky (does not turn off).
void initRegisterSend()
Initialise registerSend() bookkeeping by marking all send buffers as 'unregistered'.
void clearStorage()
Clear storage for all send/recv buffers and reset states.
void clearRecv(const label proci)
Clear an individual receive buffer (eg, data not required).
bool hasSendData() const
True if any (local) send buffers have data.
static int algorithm
Preferred exchange algorithm (may change or be removed in future).
PstreamBuffers(UPstream::commsTypes commsType=UPstream::commsTypes::nonBlocking, int tag=UPstream::msgType(), int communicator=UPstream::worldComm, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given communication type (default: nonBlocking), message tag, communicator (default: worldC...
void clearRecvs()
Clear all recv buffers and positions (does not remove buffer storage).
void finishedScatters(const bool wait=true)
Mark all sends to sub-procs as done.
labelList recvDataCounts() const
Number of unconsumed receive bytes for all processors. Must call finishedSends() or other finished....
void finishedSendsNBX(const bool wait=true)
Mark the send phase as being finished.
bool hasRecvData() const
True if any (local) recv buffers have unconsumed data. Must call finishedSends() or other finished....
void clearSend(const label proci)
Clear an individual send buffer (eg, data not required).
label recvDataCount(const label proci) const
Number of unconsumed receive bytes for the specified processor. Must call finishedSends() or other fi...
void finishedSends(const bool wait=true)
Mark the send phase as being finished.
void clear()
Clear all send/recv buffers and reset states.
void finishedNeighbourSends(const labelUList &neighProcs, const bool wait=true)
Mark the send phase as being finished, with communication being limited to a known subset of send/rec...
void finishedGathers(const bool wait=true)
Mark all sends to master as done.
~PstreamBuffers()
Destructor - checks that all data have been consumed.
label maxNonLocalRecvCount() const
Maximum receive size, excluding current processor rank. Must call finishedSends() or other finished....
static void exchange(const UList< Container > &sendBufs, const labelUList &recvSizes, List< Container > &recvBufs, const int tag=UPstream::msgType(), const int comm=UPstream::worldComm, const bool wait=true)
Helper: exchange contiguous data. Sends sendBufs, receives into recvBufs using predetermined receive ...
static void exchangeSizes(const labelUList &sendProcs, const labelUList &recvProcs, const Container &sendBufs, labelList &sizes, const int tag=UPstream::msgType(), const int comm=UPstream::worldComm)
Helper: exchange sizes of sendBufs for specified send/recv ranks.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Inter-processor communications stream.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a...
commsTypes
Communications types.
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
static void allToAll(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int communicator=UPstream::worldComm)
Exchange int32_t data with all ranks in communicator.
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static void reduceOr(bool &value, const int communicator=worldComm)
Logical (or) reduction (MPI_AllReduce).
static T listScatterValues(const UList< T > &allValues, const int communicator=UPstream::worldComm)
Scatter individual values from list locations.
static void allToAllConsensus(const UList< int32_t > &sendData, UList< int32_t > &recvData, const int tag, const int communicator=UPstream::worldComm)
Exchange non-zero int32_t data between ranks [NBX].
A bitSet stores bits (elements with only two states) in packed internal format and supports a variety...
void set(const bitSet &bitset)
Set specified bits from another bitset.
A class representing the concept of 0 (zero) that can be used to avoid manipulating objects known to ...
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
#define DebugPoutInFunction
Report an information message using Foam::Pout.
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
dimensionedScalar pos(const dimensionedScalar &ds)
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
List< label > labelList
A List of labels.
mode_t mode(const fileName &name, const bool followLink=true)
Return the file mode, normally following symbolic links.
Ostream & endl(Ostream &os)
Add newline and flush stream.
errorManip< error > abort(error &err)
static constexpr const zero Zero
Global zero (0).
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
errorManipArg< error, int > exit(error &err, const int errNo=1)
UList< label > labelUList
A UList of labels.
constexpr char nl
The newline '\n' character (0x0a).
#define registerOptSwitch(Name, Type, SwitchVar)
#define forAll(list, i)
Loop across all elements in list.