// attachOurBuffers() : attach a send buffer for buffered (MPI_Bsend) sends,
// sized from the MPI_BUFFER_SIZE environment variable or UPstream::mpiBufferSize
if (str.empty() || !Foam::read(str, len) || len <= 0)
{
    len = Foam::UPstream::mpiBufferSize;
}
char* buf = new char[len];

if (MPI_SUCCESS == MPI_Buffer_attach(buf, len))
{
    if (Foam::UPstream::debug)
    {
        Foam::Perr<< "UPstream::init : buffer-size " << len << '\n';
    }
}
else
{
    Foam::Perr<< "UPstream::init : could not attach buffer\n";
}

// detachOurBuffers() : detach and release the buffer again
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
    delete[] buf;
}
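A minimal sketch (plain MPI, not from this file) of the buffered-send pattern that the attached buffer enables; the function and payload names are illustrative:

#include <mpi.h>
#include <vector>

// Illustrative only: a buffered send copies the message into the buffer
// attached via MPI_Buffer_attach and returns without waiting for a
// matching receive.
void bufferedSendSketch(MPI_Comm comm, int dest, int tag)
{
    std::vector<double> payload(1024, 1.0);

    MPI_Bsend
    (
        payload.data(),
        static_cast<int>(payload.size()),
        MPI_DOUBLE,
        dest,
        tag,
        comm
    );
}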
// UPstream::addValidParOptions() : options that this communications
// library adds/requires on the command line
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
// UPstream::initNull()
MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init\n"
        << Foam::abort(FatalError);
}
MPI_Initialized(&flag);
if (flag && UPstream::debug)
{
    Perr<< "UPstream::initNull : was already initialized\n";
}
// UPstream::init(argc, argv, needsThread)
int provided_thread_support = 0;

MPI_Finalized(&flag);
if (flag)
{
    FatalErrorInFunction
        << "MPI was already finalized - cannot perform MPI_Init" << endl
        << Foam::abort(FatalError);
}

MPI_Initialized(&flag);
if (flag)
{
    // Already initialized: warn about a repeated call, otherwise just
    // note it and query the thread support already in effect
    if (/* initialized by an earlier UPstream::init() */)
    {
        WarningInFunction
            << "MPI was already initialized - cannot perform MPI_Init" << nl
            << "This could indicate an application programming error!"
            << endl;
    }
    else if (UPstream::debug)
    {
        Perr<< "UPstream::init : was already initialized\n";
    }
    MPI_Query_thread(&provided_thread_support);
}
else
{
    int required_thread_support =
        (needsThread ? MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE);

    MPI_Init_thread
        (&argc, &argv, required_thread_support, &provided_thread_support);
}
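// Note (standard MPI behaviour): the granted thread level can be lower than
// the requested one, eg MPI_THREAD_SERIALIZED instead of MPI_THREAD_MULTIPLE.
// The debug output further below reports both values, and the parallel setup
// only treats the run as thread-capable when MPI_THREAD_MULTIPLE was actually
// provided (see the setParRun() call).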
// UPstream::init() continued : handle UPstream-specific command-line
// options before argList sees them
UPstream::noInitialCommDup_ = false;
bool split_by_appnum = false;

word worldName;

for (int argi = 1; argi < argc; ++argi)
{
    const char *optName = argv[argi];
    if (optName[0] != '-')
    {
        continue;
    }
    ++optName;  // Looks like an option : skip the leading '-'

    if (strcmp(optName, "world") == 0)
    {
        if (argi+1 >= argc)
        {
            FatalErrorInFunction
                << "Missing world name for option '-world'" << nl
                << Foam::abort(FatalError);
        }
        worldName = argv[argi+1];

        // Remove the option and its argument from argv
        for (int i = argi+2; i < argc; ++i)
        {
            argv[i-2] = argv[i];
        }
        argc -= 2;
    }
    else if (strcmp(optName, "mpi-no-comm-dup") == 0)
    {
        UPstream::noInitialCommDup_ = true;

        // Remove the option from argv
        for (int i = argi+1; i < argc; ++i)
        {
            argv[i-1] = argv[i];
        }
        --argc;
    }
    else if (strcmp(optName, "mpi-split-by-appnum") == 0)
    {
        split_by_appnum = true;
        UPstream::noInitialCommDup_ = true;

        // Remove the option from argv
        for (int i = argi+1; i < argc; ++i)
        {
            argv[i-1] = argv[i];
        }
        --argc;
    }
}

const bool hasLocalWorld(!worldName.empty());

if (hasLocalWorld && split_by_appnum)
{
    FatalErrorInFunction
        << "Cannot specify both -world and -mpi-split-by-appnum" << nl
        << Foam::abort(FatalError);
}

int numProcs = 0, globalRanki = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &globalRanki);
MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
if (UPstream::debug)
{
    Perr<< "UPstream::init :"
        << " thread-support : requested:" << needsThread
        << " obtained:"
        <<
        (
            (provided_thread_support == MPI_THREAD_SINGLE)
          ? "single"
          : (provided_thread_support == MPI_THREAD_SERIALIZED)
          ? "serialized"
          : (provided_thread_support == MPI_THREAD_MULTIPLE)
          ? "multiple"
          : "other"
        )
        << " procs:" << numProcs
        << " rank:" << globalRanki
        << " world:" << worldName << endl;
}

if (numProcs <= 1 && !(hasLocalWorld || split_by_appnum))
{
    FatalErrorInFunction
        << "attempt to run parallel on 1 processor"
        << Foam::abort(FatalError);
}

// Initialise parallel structure
setParRun(numProcs, provided_thread_support == MPI_THREAD_MULTIPLE);
if (hasLocalWorld)
{
    // Multiple worlds (-world option) : gather the world name of every
    // rank and determine the unique worlds
    const auto mpiGlobalComm =
        PstreamGlobals::MPICommunicators_[UPstream::commGlobal()];

    // Per-rank slot width, including a trailing nul character
    int stride = int(worldName.size()) + 1;
    // ... (use the maximum stride over all ranks)

    auto buffer_storage = std::make_unique<char[]>(numProcs*stride);
    char* allStrings = buffer_storage.get();

    // Fill in this rank's slot
    char* slot = (allStrings + (globalRanki*stride));
    std::fill_n(slot, stride, '\0');
    std::copy_n(worldName.data(), worldName.size(), slot);

    MPI_Allgather
    (
        MPI_IN_PLACE, 0, MPI_CHAR,
        allStrings, stride, MPI_CHAR,
        mpiGlobalComm
    );

    // Map each rank to a unique world index
    worldIDs_.resize_nocopy(numProcs);
    DynamicList<word> uniqWorlds;

    for (label proci = 0; proci < numProcs; ++proci)
    {
        word world(allStrings + (proci*stride));

        worldIDs_[proci] = uniqWorlds.find(world);

        if (worldIDs_[proci] == -1)
        {
            worldIDs_[proci] = uniqWorlds.size();
            uniqWorlds.push_back(std::move(world));
        }
    }

    allWorlds_ = std::move(uniqWorlds);

    // Collect the ranks belonging to my own world
    const label myWorldId = worldIDs_[globalRanki];

    DynamicList<label> subRanks;
    for (label proci = 0; proci < numProcs; ++proci)
    {
        if (worldIDs_[proci] == myWorldId)
        {
            subRanks.push_back(proci);
        }
    }

    // Allocate the world-local communicator
    UPstream::constWorldComm_ =
        UPstream::newCommunicator(UPstream::commGlobal(), subRanks);
    // ...

    if (MPI_COMM_NULL != mpiGlobalComm)
    {
        MPI_Comm_set_name(mpiGlobalComm, "<openfoam:global>");
    }

    const auto mpiWorldComm =
        PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];

    if (MPI_COMM_NULL != mpiWorldComm)
    {
        MPI_Comm_set_name(mpiWorldComm, ("world=" + worldName).data());
    }

    if (UPstream::debug)
    {
        int newRanki, newSize;
        MPI_Comm_rank(mpiWorldComm, &newRanki);
        MPI_Comm_size(mpiWorldComm, &newSize);

        Perr<< "UPstream::init : in world:" << worldName
            << " using local communicator:" << constWorldComm_
            << " rank " << newRanki << " of " << newSize << endl;
    }

    // worldRanki : this rank's position within its own world
    Pout.prefix() = '[' + worldName + '/' + Foam::name(worldRanki) + "] ";
}
else if (split_by_appnum)
{
    // Splitting by application number (-mpi-split-by-appnum option)
    const auto mpiGlobalComm =
        PstreamGlobals::MPICommunicators_[UPstream::commGlobal()];

    int appNum = 0;
    {
        void* val = nullptr;
        int flag = 0;
        MPI_Comm_get_attr(mpiGlobalComm, MPI_APPNUM, &val, &flag);
        if (flag)
        {
            appNum = *static_cast<int*>(val);
        }
        else if (UPstream::debug)
        {
            Perr<< "UPstream::init : used -mpi-split-by-appnum"
                " with a single application??" << endl;
        }
    }

    // ... (commName : communicator label derived from the application number)

    UPstream::constWorldComm_ =
        UPstream::splitCommunicator(UPstream::commGlobal(), appNum);

    if (MPI_COMM_NULL != mpiGlobalComm)
    {
        MPI_Comm_set_name(mpiGlobalComm, "<openfoam:global>");
    }

    const auto mpiWorldComm =
        PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];

    if (MPI_COMM_NULL != mpiWorldComm)
    {
        MPI_Comm_set_name(mpiWorldComm, commName.data());
    }

    if (UPstream::debug)
    {
        int newRanki, newSize;
        MPI_Comm_rank(mpiWorldComm, &newRanki);
        MPI_Comm_size(mpiWorldComm, &newSize);

        Perr<< "UPstream::init : app:" << appNum
            << " using local communicator:" << constWorldComm_
            << " rank " << newRanki << " of " << newSize << endl;
    }

    Pout.prefix() = '[' + commName + '/' + Foam::name(worldRanki) + "] ";
}
else
{
    // Single world : all ranks belong to the same world
    worldIDs_.resize_nocopy(numProcs);
    const auto mpiWorldComm =
        PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];
    if (MPI_COMM_NULL != mpiWorldComm)
    {
        MPI_Comm_set_name(mpiWorldComm, "<openfoam:world>");
    }
}

// Topology-aware (node/host) communicators, later named
// "<openfoam:inter-node>" and "<openfoam:local-node>"
setSharedMemoryCommunicators();
// ... or (depending on UPstream::nodeCommsControl_)
setHostCommunicators();
// UPstream::shutdown(errNo)
MPI_Initialized(&flag);   // (nothing to do if MPI was never initialized)
MPI_Finalized(&flag);
if (flag)
{
    // Already finalized : warn, or just note it in debug mode
    if (/* finalized outside of UPstream */)
    {
        WarningInFunction
            << "MPI was already finalized (by a connected program?)\n";
    }
    else if (UPstream::debug && errNo == 0)
    {
        Perr<< "UPstream::shutdown : was already finalized\n";
    }
}
// ... warn "Finalizing MPI, but was initialized elsewhere\n" if applicable
if (UPstream::debug)
{
    Perr<< "UPstream::shutdown\n";
}
// Count outstanding (non-null) requests
label nOutstanding = 0;
for (MPI_Request request : PstreamGlobals::outstandingRequests_)
{
    if (MPI_REQUEST_NULL != request) { ++nOutstanding; }
}
if (nOutstanding)
{
    WarningInFunction
        << "Still have " << nOutstanding
        << " outstanding MPI requests."
        << " Should not happen for a normal code exit."
        << endl;
}
// Free communicators before finalizing (for each allocated communicator)
freeCommunicatorComponents(communicator);

// UPstream::abort(errNo) : MPI_Abort with no other checks or cleanup
MPI_Comm abortComm = MPI_COMM_WORLD;
// ... (prefer the active communicator when it is valid)
if (MPI_COMM_NULL == abortComm)
{
    abortComm = MPI_COMM_WORLD;
}
MPI_Abort(abortComm, errNo);
void Foam::UPstream::allocateCommunicatorComponents
(
    const label parentIndex,
    const label index
)
{
    int returnCode = MPI_SUCCESS;
    auto& mpiNewComm = PstreamGlobals::MPICommunicators_[index];

    if (parentIndex == -1)
    {
        // First (global) communicator
        // ... FatalError if index != commGlobal():
        //     "base world communicator should always be index " << commGlobal()
        if (UPstream::noInitialCommDup_)
        {
            // Use MPI_COMM_WORLD directly, without an initial duplicate
            mpiNewComm = MPI_COMM_WORLD;
        }
        else
        {
            MPI_Comm_dup(MPI_COMM_WORLD, &mpiNewComm);
        }
        MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);

        int numProcs = 0;
        MPI_Comm_size(mpiNewComm, &numProcs);
        procIDs_[index].resize_nocopy(numProcs);
        std::iota(procIDs_[index].begin(), procIDs_[index].end(), 0);
    }
    else if (parentIndex == -2)
    {
        // The 'self' communicator (current rank only)
        MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);
        procIDs_[index].resize_nocopy(1);
        MPI_Comm_rank(MPI_COMM_WORLD, &procIDs_[index].front());
    }
    else
    {
        // Create from a sub-group of the parent communicator
        const auto mpiParentComm =
            PstreamGlobals::MPICommunicators_[parentIndex];

        MPI_Group parent_group;
        MPI_Comm_group(mpiParentComm, &parent_group);
        MPI_Group active_group;
        MPI_Group_incl
        (
            parent_group,
            procIDs_[index].size(),
            procIDs_[index].cdata(),
            &active_group
        );

        #if defined(MSMPI_VER)
        // (MS-MPI : uses a collective MPI_Comm_create instead)
        #endif
        MPI_Comm_create_group
            (mpiParentComm, active_group, UPstream::msgType(), &mpiNewComm);

        MPI_Group_free(&parent_group);
        MPI_Group_free(&active_group);

        if (MPI_COMM_NULL == mpiNewComm)
        {
            // This rank is not part of the new communicator
            myProcNo_[index] = -1;
            if (index != commInterNode_)
            {
                procIDs_[index].clear();
            }
        }
        else
        {
            returnCode = MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);
            if (returnCode != MPI_SUCCESS)
            {
                FatalErrorInFunction
                    << " when allocating communicator at " << index
                    << " from ranks " << flatOutput(procIDs_[index])
                    << " of parent " << parentIndex
                    << " cannot find my own rank"
                    << Foam::abort(FatalError);
            }
        }
    }
}
void Foam::UPstream::dupCommunicatorComponents
(
    const label parentIndex,
    const label index
)
{
    // ... MPI_Comm_dup() of the parent, then copy the rank bookkeeping
    myProcNo_[index] = myProcNo_[parentIndex];
    procIDs_[index] = procIDs_[parentIndex];
}
void Foam::UPstream::splitCommunicatorComponents
(
    const label parentIndex,
    const label index,
    const int colour
)
{
    const auto mpiParentComm = PstreamGlobals::MPICommunicators_[parentIndex];
    auto& mpiNewComm = PstreamGlobals::MPICommunicators_[index];
    int parentRank = 0, parentSize = 0;
    MPI_Comm_rank(mpiParentComm, &parentRank);
    MPI_Comm_size(mpiParentComm, &parentSize);
    auto& procIds = procIDs_[index];
    myProcNo_[index] = -1;

    if (/* two_step? - see splitCommunicator() */)
    {
        // Gather the colours, keep like-coloured ranks, create by group
        procIds.resize_nocopy(parentSize);
        procIds[parentRank] = colour;
        MPI_Allgather
        (
            MPI_IN_PLACE, 0, MPI_INT,
            procIds.data(), 1, MPI_INT,
            mpiParentComm
        );
        int nranks = 0;
        for (int i = 0; i < parentSize; ++i)
        {
            if (procIds[i] == colour) { procIds[nranks++] = i; }
        }
        procIds.resize(nranks);

        allocateCommunicatorComponents(parentIndex, index);
        return;
    }

    // One-step : direct MPI_Comm_split of the parent communicator
    MPI_Comm_split
    (
        mpiParentComm,
        (colour >= 0 ? colour : MPI_UNDEFINED),
        parentRank, &mpiNewComm
    );
    if (MPI_COMM_NULL == mpiNewComm)
    {
        return;  // Not involved (colour was MPI_UNDEFINED)
    }
    MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);

    // Translate parent ranks into the new communicator numbering
    MPI_Group parent_group;
    MPI_Comm_group(mpiParentComm, &parent_group);
    MPI_Group new_group;
    MPI_Comm_group(mpiNewComm, &new_group);
    List<int> parentIds(parentSize);
    std::iota(parentIds.begin(), parentIds.end(), 0);
    procIds.resize_nocopy(parentSize);
    MPI_Group_translate_ranks
    (
        parent_group, parentSize, parentIds.data(),
        new_group, procIds.data()
    );
    MPI_Group_free(&parent_group);
    MPI_Group_free(&new_group);

    // Keep the parent ranks that are present in the new communicator
    int nranks = 0;
    for (int i = 0; i < parentSize; ++i)
    {
        if (procIds[i] >= 0 && procIds[i] < parentSize) { procIds[nranks++] = i; }
    }
    procIds.resize(nranks);
}
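The user-facing entry point for this split is splitCommunicator(); a small usage sketch (illustrative only, relying on its documented signature; freeCommunicator() is assumed as the counterpart):

#include "UPstream.H"

// Illustrative only: split the world communicator into even/odd rank groups.
void evenOddSplitSketch()
{
    const int colour = Foam::UPstream::myProcNo(Foam::UPstream::worldComm) % 2;

    const Foam::label halfComm =
        Foam::UPstream::splitCommunicator(Foam::UPstream::worldComm, colour);

    // ... communicate within halfComm ...

    Foam::UPstream::freeCommunicator(halfComm);
}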
void Foam::UPstream::freeCommunicatorComponents(const label index)
{
    if (UPstream::debug)
    {
        Perr<< "freeCommunicatorComponents: " << index << endl;
    }
    // ... free the MPI communicator and reset the myProcNo_/procIDs_ entries
bool Foam::UPstream::setSharedMemoryCommunicators()
{
    // Define local-node (shared-memory) and inter-node communicators
    if (FOAM_UNLIKELY(commInterNode_ >= 0 || commLocalNode_ >= 0))
    {
        FatalErrorInFunction
            << "Node communicator(s) already created!" << endl
            << Foam::abort(FatalError);
    }
    commInterNode_ = getAvailableCommIndex(constWorldComm_);
    commLocalNode_ = getAvailableCommIndex(constWorldComm_);
    // Keep a fixed ordering of the two indices
    if (commLocalNode_ < commInterNode_)
    {
        std::swap(commLocalNode_, commInterNode_);
    }
    if (UPstream::debug)
    {
        Perr<< "Allocating node communicators "
            << commInterNode_ << ", " << commLocalNode_ << nl
            << "    parent : " << constWorldComm_ << nl
            << endl;
    }
    const auto mpiParentComm =
        PstreamGlobals::MPICommunicators_[constWorldComm_];
    auto& mpiLocalNode =
        PstreamGlobals::MPICommunicators_[commLocalNode_];
    int parentRank = 0, parentSize = 0;
    MPI_Comm_rank(mpiParentComm, &parentRank);
    MPI_Comm_size(mpiParentComm, &parentSize);
    List<int> nodeLeaders(parentSize, -1);

    MPI_Comm_split_type
    (
        mpiParentComm,
        MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
        &mpiLocalNode
    );
    if (MPI_COMM_NULL == mpiLocalNode)
    {
        const label index = commLocalNode_;
        myProcNo_[index] = -1;
        procIDs_[index].clear();
        FatalErrorInFunction
            << "Comm_split_type(shared) failed\n"
            << Foam::abort(FatalError);
    }
    else
    {
        const label index = commLocalNode_;
        auto& procIds = procIDs_[index];
        int localRank = 0, localSize = 0;
        MPI_Comm_rank(mpiLocalNode, &localRank);
        MPI_Comm_size(mpiLocalNode, &localSize);
        if (localRank == 0)
        {
            // The lowest rank on each node acts as the node leader
            nodeLeaders[parentRank] = parentRank;
        }
        procIds.resize_nocopy(localSize);
        procIds[localRank] = parentRank;
        // World rank of every member of this node
        MPI_Allgather
        (
            MPI_IN_PLACE, 0, MPI_INT,
            procIds.data(), 1, MPI_INT,
            mpiLocalNode
        );
    }
    {
        // Inter-node communicator: the node leaders, in world order
        auto& procIds = procIDs_[commInterNode_];
        MPI_Allgather
        (
            MPI_IN_PLACE, 0, MPI_INT,
            nodeLeaders.data(), 1, MPI_INT,
            mpiParentComm
        );
        numNodes_ = std::count_if
        (
            nodeLeaders.cbegin(),
            nodeLeaders.cend(),
            [](int rank){ return (rank >= 0); }
        );
        procIds.resize_nocopy(numNodes_);
        std::copy_if
        (
            nodeLeaders.cbegin(),
            nodeLeaders.cend(),
            procIds.begin(),
            [](int rank){ return (rank >= 0); }
        );
    }
    // ... create the actual inter-node communicator from those leader ranks
}
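A short note on the split used above, as a standalone sketch (plain MPI, standard semantics; not from this file):

#include <mpi.h>

// Illustrative only: MPI_COMM_TYPE_SHARED groups ranks that can share
// memory, ie ranks running on the same compute node, which is exactly
// what the local-node communicator above represents.
MPI_Comm localNodeCommSketch(MPI_Comm parent)
{
    MPI_Comm node = MPI_COMM_NULL;
    MPI_Comm_split_type(parent, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &node);
    return node;
}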
// UPstream::barrier(communicator, req) : synchronisation barrier,
// non-blocking (MPI_Ibarrier) when a request pointer is supplied
MPI_Request request;
// ... MPI_Ibarrier() failure : "MPI_Ibarrier returned with error"
// ... MPI_Barrier()  failure : "MPI_Barrier returned with error"
// UPstream::send_done(toProcNo, communicator, tag) :
// post a zero-byte "done" handshake message
MPI_Send(nullptr, 0, MPI_BYTE, toProcNo, tag, mpiComm);

// UPstream::wait_done(fromProcNo, communicator, tag) :
// receive the zero-byte "done" message and return its source rank
if (fromProcNo < 0)
{
    // Accept from any rank
    MPI_Recv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, tag, mpiComm, &status);
    return status.MPI_SOURCE;
}
MPI_Recv(nullptr, 0, MPI_BYTE, fromProcNo, tag, mpiComm, &status);
// (mpiComm : the MPI_Comm corresponding to 'communicator')
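A usage sketch for this handshake (illustrative only; it relies on the documented send_done()/wait_done() signatures and the usual UPstream master()/subProcs() helpers):

#include "UPstream.H"

// Illustrative only: the master signals every sub-process once a shared
// resource (eg, a file) is ready; each sub-process blocks until signalled.
void readySignalSketch()
{
    if (Foam::UPstream::master(Foam::UPstream::worldComm))
    {
        for (const int proci : Foam::UPstream::subProcs(Foam::UPstream::worldComm))
        {
            Foam::UPstream::send_done(proci, Foam::UPstream::worldComm);
        }
    }
    else
    {
        Foam::UPstream::wait_done
        (
            Foam::UPstream::masterNo(),
            Foam::UPstream::worldComm
        );
    }
}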
std::pair<int,int64_t>
Foam::UPstream::probeMessage
(
    const UPstream::commsTypes commsType,
    const int fromProcNo,
    const int tag,
    const int communicator
)
{
    std::pair<int,int64_t> result(-1, 0);

    // ...
    MPI_Status status;
    const int source = (fromProcNo < 0) ? MPI_ANY_SOURCE : fromProcNo;

    // Non-blocking : MPI_Iprobe(), FatalError on failure:
    //     "MPI_Iprobe returned with error"
    // Blocking     : MPI_Probe(), FatalError on failure:
    //     "MPI_Probe returned with error"

    // Determine the message size (in bytes)
    MPI_Count num_recv(0);
    MPI_Get_elements_x(&status, MPI_BYTE, &num_recv);

    if (FOAM_UNLIKELY(num_recv == MPI_UNDEFINED || int64_t(num_recv) < 0))
    {
        FatalErrorInFunction
            << "MPI_Get_elements_x() : "
               "returned undefined or negative value"
            << Foam::abort(FatalError);
    }
    else if (FOAM_UNLIKELY(int64_t(num_recv) > int64_t(INT_MAX)))
    {
        FatalErrorInFunction
            << "MPI_Get_elements_x() : "
               "count is larger than INT_MAX bytes"
            << Foam::abort(FatalError);
    }

    result.first = status.MPI_SOURCE;
    result.second = int64_t(num_recv);

    return result;
}
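A usage sketch for the probe (illustrative only; it uses the documented probeMessage() signature and a plain List<char> as the receive buffer):

#include "UPstream.H"
#include "List.H"
#include <utility>

// Illustrative only: probe for a pending message of unknown length,
// then size a receive buffer from the probed byte count.
void probeSketch()
{
    const std::pair<int,int64_t> probed = Foam::UPstream::probeMessage
    (
        Foam::UPstream::commsTypes::nonBlocking,
        -1,                              // any source
        Foam::UPstream::msgType(),
        Foam::UPstream::worldComm
    );

    if (probed.first >= 0)
    {
        // probed.first : source rank, probed.second : size in bytes
        Foam::List<char> buffer(static_cast<Foam::label>(probed.second));
        // ... receive from probed.first into buffer (eg, via UIPstream)
    }
}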