UPstream.C
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2016-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "UPstream.H"
30#include "PstreamGlobals.H"
31#include "profilingPstream.H"
32#include "UPstreamWrapping.H"
34
35#include <algorithm>
36#include <cstdlib>
37#include <cstring>
38#include <memory>
39#include <numeric>
40#include <string>
42// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
43
44// The min value and default for MPI buffer length
45constexpr int minBufLen = 20000000;
46
47// Track size of attached MPI buffer
48static int attachedBufLen = 0;
49
50// Track if we initialized MPI
51static bool ourMpi = false;
52
53
54// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
55
56// Attach user-defined send buffer
57static void attachOurBuffers()
58{
59#ifndef SGIMPI
60 if (attachedBufLen)
61 {
62 return; // Already attached
63 }
64
65 // Use UPstream::mpiBufferSize (optimisationSwitch),
66 // but allow override with MPI_BUFFER_SIZE env variable (int value)
67
68 int len = 0;
69
70 const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
71 if (str.empty() || !Foam::read(str, len) || len <= 0)
72 {
73 len = Foam::UPstream::mpiBufferSize;
74 }
75
76 if (len < minBufLen)
77 {
78 len = minBufLen;
79 }
80
81 char* buf = new char[len];
82
83 if (MPI_SUCCESS == MPI_Buffer_attach(buf, len))
84 {
85 // Properly attached
86 attachedBufLen = len;
87
88 if (Foam::UPstream::debug)
89 {
90 Foam::Perr<< "UPstream::init : buffer-size " << len << '\n';
91 }
92 }
93 else
94 {
95 delete[] buf;
96 Foam::Perr<< "UPstream::init : could not attach buffer\n";
97 }
98#endif
99}
100
101
102// Remove an existing user-defined send buffer
103// IMPORTANT:
104// This operation will block until all messages currently in the
105// buffer have been transmitted.
106static void detachOurBuffers()
107{
108#ifndef SGIMPI
109 if (!attachedBufLen)
110 {
111 return; // Nothing to detach
112 }
113
114 // Some MPI notes suggest that the return code is MPI_SUCCESS when
115 // no buffer is attached.
116 // Be extra careful and require a non-zero size as well.
117
118 char* buf = nullptr;
119 int len = 0;
120
121 if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
122 {
123 // This was presumably the buffer that we attached
124 // and not someone else.
125 delete[] buf;
126 }
127
128 // Nothing attached
129 attachedBufLen = 0;
130#endif
131}
132
133
134// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
135
136// NOTE:
137// valid parallel options vary between implementations, but flag common ones.
138// if they are not removed by MPI_Init(), the subsequent argument processing
139// will notice that they are wrong
140void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
141{
142 validParOptions.insert("np", "");
143 validParOptions.insert("p4pg", "PI file");
144 validParOptions.insert("p4wd", "directory");
145 validParOptions.insert("p4amslave", "");
146 validParOptions.insert("p4yourname", "hostname");
147 validParOptions.insert("machinefile", "machine file");
148}
149
150
152{
153 int flag = 0;
154
155 MPI_Finalized(&flag);
156 if (flag)
157 {
158 // Already finalized - this is an error
160 << "MPI was already finalized - cannot perform MPI_Init\n"
162
163 return false;
164 }
165
166 MPI_Initialized(&flag);
167 if (flag)
168 {
169 if (UPstream::debug)
170 {
171 Perr<< "UPstream::initNull : was already initialized\n";
172 }
173 }
174 else
175 {
176 // Not already initialized
177
178 MPI_Init_thread
179 (
180 nullptr, // argc
181 nullptr, // argv
182 MPI_THREAD_SINGLE,
183 &flag // provided_thread_support
184 );
185
186 ourMpi = true;
187 }
188
189 // Could also attach buffers etc.
190
191 return true;
192}
193
194
195bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
196{
197 int flag = 0;
198 int provided_thread_support = 0;
199
200 MPI_Finalized(&flag);
201 if (flag)
202 {
203 // Already finalized - this is an error
205 << "MPI was already finalized - cannot perform MPI_Init" << endl
207
208 return false;
209 }
210
211 MPI_Initialized(&flag);
212 if (flag)
213 {
214 // Already initialized.
215 // Warn if we've called twice, but skip if initialized externally
216
217 if (ourMpi)
218 {
220 << "MPI was already initialized - cannot perform MPI_Init" << nl
221 << "This could indicate an application programming error!"
222 << endl;
223
224 return true;
225 }
226 else if (UPstream::debug)
227 {
228 Perr<< "UPstream::init : was already initialized\n";
229 }
230
231 MPI_Query_thread(&provided_thread_support);
232 }
233 else
234 {
235 // (SINGLE | FUNNELED | SERIALIZED | MULTIPLE)
236 int required_thread_support =
237 (
238 needsThread
239 ? MPI_THREAD_MULTIPLE
240 : MPI_THREAD_SINGLE
241 );
242
243 MPI_Init_thread
244 (
245 &argc,
246 &argv,
247 required_thread_support,
248 &provided_thread_support
249 );
250
251 ourMpi = true;
252 }
253
254 // Define data type mappings and user data types. Defined now so that
255 // any OpenFOAM Pstream operations may make immediate use of them.
256 PstreamGlobals::initDataTypes();
257 PstreamGlobals::initOpCodes();
258
259 if (UPstream::debug)
260 {
261 PstreamGlobals::printDataTypes();
262 }
263
264
265 // Check argument list for any of the following:
266 // - local world
267 // -> Extract world name and filter out '-world <name>' from argv list
268 // - mpi-no-comm-dup option
269 // -> disable initial comm_dup and filter out the option
270 // - mpi-split-by-appnum option
271 // -> disable initial comm_dup, select split-by-appnum
272 // and filter out the option
273
274 // Default handling of initial MPI_Comm_dup(MPI_COMM_WORLD,...)
275 UPstream::noInitialCommDup_ = false;
276 bool split_by_appnum = false;
277
278 // Local world name
279 word worldName;
280
281 for (int argi = 1; argi < argc; ++argi)
282 {
283 const char *optName = argv[argi];
284 if (optName[0] != '-')
285 {
286 continue;
287 }
288 ++optName; // Looks like an option, skip leading '-'
289
290 if (strcmp(optName, "world") == 0)
291 {
292 if (argi+1 >= argc)
293 {
295 << "Missing world name for option '-world'" << nl
297 }
298 worldName = argv[argi+1];
299
300 // Remove two arguments (-world name)
301 for (int i = argi+2; i < argc; ++i)
302 {
303 argv[i-2] = argv[i];
304 }
305 argc -= 2;
306 --argi; // re-examine
307 }
308 else if (strcmp(optName, "mpi-no-comm-dup") == 0)
309 {
310 UPstream::noInitialCommDup_ = true;
311
312 // Remove one argument
313 for (int i = argi+1; i < argc; ++i)
314 {
315 argv[i-1] = argv[i];
316 }
317 --argc;
318 --argi; // re-examine
319 }
320 else if (strcmp(optName, "mpi-split-by-appnum") == 0)
321 {
322 split_by_appnum = true;
323 UPstream::noInitialCommDup_ = true;
324
325 // Remove one argument
326 for (int i = argi+1; i < argc; ++i)
327 {
328 argv[i-1] = argv[i];
329 }
330 --argc;
331 --argi; // re-examine
332 }
333 }
334
335 const bool hasLocalWorld(!worldName.empty());
336
337 if (hasLocalWorld && split_by_appnum)
338 {
340 << "Cannot specify both -world and -mpi-split-by-appnum" << nl
342 }
343
344 int numProcs = 0, globalRanki = 0;
345 MPI_Comm_rank(MPI_COMM_WORLD, &globalRanki);
346 MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
347
348 if (UPstream::debug)
349 {
350 Perr<< "UPstream::init :"
351 << " thread-support : requested:" << needsThread
352 << " provided:"
353 << (
354 (provided_thread_support == MPI_THREAD_SINGLE)
355 ? "SINGLE"
356 : (provided_thread_support == MPI_THREAD_SERIALIZED)
357 ? "SERIALIZED"
358 : (provided_thread_support == MPI_THREAD_MULTIPLE)
359 ? "MULTIPLE"
360 : "other"
361 )
362 << " procs:" << numProcs
363 << " rank:" << globalRanki
364 << " world:" << worldName << endl;
365 }
366
367 if (numProcs <= 1 && !(hasLocalWorld || split_by_appnum))
368 {
370 << "attempt to run parallel on 1 processor"
372 }
373
374 // Initialise parallel structure
375 setParRun(numProcs, provided_thread_support == MPI_THREAD_MULTIPLE);
376
377 if (hasLocalWorld)
378 {
379 // Using local worlds.
380 // During startup, so commWorld() == commGlobal()
381 const auto mpiGlobalComm =
382 PstreamGlobals::MPICommunicators_[UPstream::commGlobal()];
383
384 // Gather the names of all worlds and determine unique names/indices.
385 //
386 // Minimize communication and use low-level MPI to avoid relying on any
387 // OpenFOAM structures which have not yet been created
388
389 {
390 // Include a trailing nul character in the lengths
391 int stride = int(worldName.size()) + 1;
392
393 // Use identical size on all ranks (avoids MPI_Allgatherv)
394 MPI_Allreduce
395 (
396 MPI_IN_PLACE,
397 &stride,
398 1,
399 MPI_INT,
400 MPI_MAX,
401 mpiGlobalComm
402 );
403
404 // Gather as an extended C-string with embedded nul characters
405 auto buffer_storage = std::make_unique<char[]>(numProcs*stride);
406 char* allStrings = buffer_storage.get();
407
408 // Fill in local value, slot starts at (rank*stride)
409 {
410 char* slot = (allStrings + (globalRanki*stride));
411 std::fill_n(slot, stride, '\0');
412 std::copy_n(worldName.data(), worldName.size(), slot);
413 }
414
415 // Gather everything into the extended C-string
416 MPI_Allgather
417 (
418 MPI_IN_PLACE, 0, MPI_CHAR,
419 allStrings, stride, MPI_CHAR,
420 mpiGlobalComm
421 );
422
423 worldIDs_.resize_nocopy(numProcs);
424
425 // Transcribe and compact (unique world names)
426 DynamicList<word> uniqWorlds(numProcs);
427
428 for (label proci = 0; proci < numProcs; ++proci)
429 {
430 // Create from C-string at slot=(rank*stride),
431 // relying on the embedded nul chars
432 word world(allStrings + (proci*stride));
433
434 worldIDs_[proci] = uniqWorlds.find(world);
435
436 if (worldIDs_[proci] == -1)
437 {
438 worldIDs_[proci] = uniqWorlds.size();
439 uniqWorlds.push_back(std::move(world));
440 }
441 }
442
443 allWorlds_ = std::move(uniqWorlds);
444 }
445
446 const label myWorldId = worldIDs_[globalRanki];
447
448 DynamicList<label> subRanks;
449 forAll(worldIDs_, proci)
450 {
451 if (worldIDs_[proci] == myWorldId)
452 {
453 subRanks.push_back(proci);
454 }
455 }
456
457 // New local-world communicator with comm-global as its parent.
458 // - the updated (const) world comm does not change after this.
459
460 UPstream::constWorldComm_ =
461 UPstream::newCommunicator(UPstream::commGlobal(), subRanks);
462
463 UPstream::worldComm = UPstream::constWorldComm_;
464 UPstream::warnComm = UPstream::constWorldComm_;
465
466 const int worldRanki = UPstream::myProcNo(UPstream::constWorldComm_);
467
468 // MPI_COMM_SELF : the processor number wrt the new world communicator
469 if (procIDs_[UPstream::commSelf()].size())
470 {
471 procIDs_[UPstream::commSelf()].front() = worldRanki;
472 }
473
474 // Name the old world communicator as '<openfoam:global>'
475 // - it is the inter-world communicator
476 if (MPI_COMM_NULL != mpiGlobalComm)
477 {
478 MPI_Comm_set_name(mpiGlobalComm, "<openfoam:global>");
479 }
480
481 const auto mpiWorldComm =
482 PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];
483
484 if (MPI_COMM_NULL != mpiWorldComm)
485 {
486 MPI_Comm_set_name(mpiWorldComm, ("world=" + worldName).data());
487 }
488
489 if (UPstream::debug)
490 {
491 // Check
492 int newRanki, newSize;
493 MPI_Comm_rank(mpiWorldComm, &newRanki);
494 MPI_Comm_size(mpiWorldComm, &newSize);
495
496 Perr<< "UPstream::init : in world:" << worldName
497 << " using local communicator:" << constWorldComm_
498 << " rank " << newRanki << " of " << newSize << endl;
499 }
500
501 // Override Pout prefix (move to setParRun?)
502 Pout.prefix() = '[' + worldName + '/' + Foam::name(worldRanki) + "] ";
503 Perr.prefix() = Pout.prefix();
504 }
505 else if (split_by_appnum)
506 {
507 // Splitting by APPNUM.
508 //
509 // During startup, so commWorld() == commGlobal() and both are
510 // guaranteed to be MPI_COMM_WORLD since the logic automatically
511 // sets UPstream::noInitialCommDup_ = true (ie, no MPI_Comm_dup)
512
513 const auto mpiGlobalComm =
514 PstreamGlobals::MPICommunicators_[UPstream::commGlobal()];
515
516 int appNum(0);
517
518 {
519 void* val;
520 int flag;
521
522 MPI_Comm_get_attr(mpiGlobalComm, MPI_APPNUM, &val, &flag);
523 if (flag)
524 {
525 appNum = *static_cast<int*>(val);
526 }
527 else
528 {
529 appNum = 0;
530 Perr<< "UPstream::init : used -mpi-split-by-appnum"
531 " with a single application??" << endl;
532 }
533 }
534
535 // New world communicator with comm-global as its parent.
536 // - the updated (const) world comm does not change after this.
537
538 // Using MPI_APPNUM as the colour for splitting with MPI_Comm_split.
539 // Do **NOT** use Allgather+Comm_create_group two-step process here
540 // since other applications will not expect that (ie, deadlock)
541
542 UPstream::constWorldComm_ =
543 UPstream::splitCommunicator(UPstream::commGlobal(), appNum, false);
544
545 UPstream::worldComm = UPstream::constWorldComm_;
546 UPstream::warnComm = UPstream::constWorldComm_;
547
548 const int worldRanki = UPstream::myProcNo(UPstream::constWorldComm_);
549
550 // MPI_COMM_SELF : the processor number wrt the new world communicator
551 if (procIDs_[UPstream::commSelf()].size())
552 {
553 procIDs_[UPstream::commSelf()].front() = worldRanki;
554 }
555
556 // Name the old world communicator as '<openfoam:global>'
557 // - it is the inter-world communicator
558 if (MPI_COMM_NULL != mpiGlobalComm)
559 {
560 MPI_Comm_set_name(mpiGlobalComm, "<openfoam:global>");
561 }
562
563 const auto mpiWorldComm =
564 PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];
565
566 const word commName("app=" + Foam::name(appNum));
567
568 if (MPI_COMM_NULL != mpiWorldComm)
569 {
570 MPI_Comm_set_name(mpiWorldComm, commName.data());
571 }
572
573 if (UPstream::debug)
574 {
575 // Check
576 int newRanki, newSize;
577 MPI_Comm_rank(mpiWorldComm, &newRanki);
578 MPI_Comm_size(mpiWorldComm, &newSize);
579
580 Perr<< "UPstream::init : app:" << appNum
581 << " using local communicator:" << constWorldComm_
582 << " rank " << newRanki << " of " << newSize << endl;
583 }
584
585 // Override Pout prefix (move to setParRun?)
586 Pout.prefix() = '[' + commName + '/' + Foam::name(worldRanki) + "] ";
587 Perr.prefix() = Pout.prefix();
588 }
589 else
590 {
591 // All processors use world 0
592 worldIDs_.resize_nocopy(numProcs);
593 worldIDs_ = 0;
594
595 const auto mpiWorldComm =
596 PstreamGlobals::MPICommunicators_[UPstream::constWorldComm_];
597
598 // Name the world communicator as '<openfoam:world>'
599 if (MPI_COMM_NULL != mpiWorldComm)
600 {
601 MPI_Comm_set_name(mpiWorldComm, "<openfoam:world>");
602 }
603 }
604
605
606 // Define inter-node and intra-node communicators
608 {
609 // Debugging: split with given number per node
610 setHostCommunicators(UPstream::nodeCommsControl_);
611 }
612 #ifndef MSMPI_VER /* Uncertain if this would work with MSMPI */
613 else if (UPstream::nodeCommsControl_ == 2)
614 {
615 // Defined based on shared-memory hardware information
616 setSharedMemoryCommunicators();
617 }
618 #endif
619 else
620 {
621 // Defined based on hostname, even if nominally disabled
622 setHostCommunicators();
623 }
624
625
626 // Provide some names for these communicators
627 if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[commInterNode_])
628 {
629 MPI_Comm_set_name
630 (
631 PstreamGlobals::MPICommunicators_[commInterNode_],
632 "<openfoam:inter-node>"
633 );
634 }
635 if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[commLocalNode_])
636 {
637 MPI_Comm_set_name
638 (
639 PstreamGlobals::MPICommunicators_[commLocalNode_],
640 "<openfoam:local-node>"
641 );
642 }
643
644 attachOurBuffers();
645
646 return true;
647}
648
649
650void Foam::UPstream::shutdown(int errNo)
651{
652 int flag = 0;
653
654 MPI_Initialized(&flag);
655 if (!flag)
656 {
657 // MPI not initialized - we have nothing to do
658 return;
659 }
660
661 MPI_Finalized(&flag);
662 if (flag)
663 {
664 // MPI already finalized - we have nothing to do
665 if (ourMpi)
666 {
668 << "MPI was already finalized (by a connected program?)\n";
669 }
670 else if (UPstream::debug && errNo == 0)
671 {
672 Perr<< "UPstream::shutdown : was already finalized\n";
673 }
674 ourMpi = false;
675 return;
676 }
677
678 if (!ourMpi)
679 {
681 << "Finalizing MPI, but was initialized elsewhere\n";
682 }
683 ourMpi = false;
684
685
686 // Abort - stop now, without any final synchronization steps!
687 // -----
688
689 if (errNo != 0)
690 {
691 UPstream::abort(errNo);
692 return;
693 }
694
695
696 // Regular cleanup
697 // ---------------
698
699 if (UPstream::debug)
700 {
701 Perr<< "UPstream::shutdown\n";
702 }
703
704 // Check for any outstanding requests
705 {
706 label nOutstanding = 0;
707
708 for (MPI_Request request : PstreamGlobals::outstandingRequests_)
709 {
710 if (MPI_REQUEST_NULL != request)
711 {
712 // TBD: MPI_Cancel(&request); MPI_Request_free(&request);
713 ++nOutstanding;
714 }
715 }
716
717 if (nOutstanding)
718 {
720 << "Still have " << nOutstanding
721 << " outstanding MPI requests."
722 << " Should not happen for a normal code exit."
723 << endl;
724 }
725
726 PstreamGlobals::outstandingRequests_.clear();
727 }
728
729
730 {
731 detachOurBuffers();
732
733 forAllReverse(myProcNo_, communicator)
734 {
735 freeCommunicatorComponents(communicator);
736 }
737 }
738
739 // Free any user data types
740 PstreamGlobals::deinitDataTypes();
741 PstreamGlobals::deinitOpCodes();
742
743 MPI_Finalize();
744}
745
746
747void Foam::UPstream::exit(int errNo)
748{
749 UPstream::shutdown(errNo);
750 std::exit(errNo);
751}
752
753
754void Foam::UPstream::abort(int errNo)
755{
756 MPI_Comm abortComm = MPI_COMM_WORLD;
757
758 // TBD: only abort on our own communicator?
759 #if 0
760 const label index = UPstream::commGlobal();
761
762 if (index > 0 && index < PstreamGlobals::MPICommunicators_.size())
763 {
764 abortComm = PstreamGlobals::MPICommunicators_[index];
765 if (MPI_COMM_NULL == abortComm)
766 {
767 abortComm = MPI_COMM_WORLD;
768 }
769 }
770 #endif
771
772 MPI_Abort(abortComm, errNo);
773}
774
775
776// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
777
778void Foam::UPstream::allocateCommunicatorComponents
779(
780 const label parentIndex,
781 const label index
782)
783{
784 PstreamGlobals::initCommunicator(index);
785
786 int returnCode = MPI_SUCCESS;
787
788 if (parentIndex == -1)
789 {
790 // Global communicator. Same as world communicator for single-world
791
792 if (index != UPstream::commGlobal())
793 {
795 << "base world communicator should always be index "
798 }
799 auto& mpiNewComm = PstreamGlobals::MPICommunicators_[index];
800
801 if (UPstream::noInitialCommDup_)
802 {
803 PstreamGlobals::pendingMPIFree_[index] = false;
804 PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
805 }
806 else
807 {
808 PstreamGlobals::pendingMPIFree_[index] = true;
809 MPI_Comm_dup(MPI_COMM_WORLD, &mpiNewComm);
810 }
811
812 MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);
813
814 // Set the number of ranks to the actual number
815 int numProcs = 0;
816 MPI_Comm_size(mpiNewComm, &numProcs);
817
818 // identity [0-numProcs], as 'int'
819 procIDs_[index].resize_nocopy(numProcs);
820 std::iota(procIDs_[index].begin(), procIDs_[index].end(), 0);
821 }
822 else if (parentIndex == -2)
823 {
824 // MPI_COMM_SELF
825
826 PstreamGlobals::pendingMPIFree_[index] = false;
827 PstreamGlobals::MPICommunicators_[index] = MPI_COMM_SELF;
828
829 MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);
830
831 // For MPI_COMM_SELF : the process IDs within the world communicator.
832 // Uses MPI_COMM_WORLD in case called before UPstream::commGlobal()
833 // was initialized
834
835 procIDs_[index].resize_nocopy(1);
836 MPI_Comm_rank(MPI_COMM_WORLD, &procIDs_[index].front());
837 }
838 else
839 {
840 // General sub-communicator.
841 // Create based on the groupings predefined by procIDs_
842
843 const auto mpiParentComm =
844 PstreamGlobals::MPICommunicators_[parentIndex];
845
846 auto& mpiNewComm =
847 PstreamGlobals::MPICommunicators_[index];
848
849 PstreamGlobals::pendingMPIFree_[index] = true;
850
851 // Starting from parent
852 MPI_Group parent_group;
853 MPI_Comm_group(mpiParentComm, &parent_group);
854
855 MPI_Group active_group;
856 MPI_Group_incl
857 (
858 parent_group,
859 procIDs_[index].size(),
860 procIDs_[index].cdata(),
861 &active_group
862 );
863
864 #if defined(MSMPI_VER)
865 // ms-mpi (10.0 and others?) does not have MPI_Comm_create_group
866 MPI_Comm_create
867 (
868 mpiParentComm,
869 active_group,
870 &mpiNewComm
871 );
872 #else
873 // Create new communicator for this group
874 MPI_Comm_create_group
875 (
876 mpiParentComm,
877 active_group,
878 UPstream::msgType(),
879 &mpiNewComm
880 );
881 #endif
882
883 // Groups not needed after this...
884 MPI_Group_free(&parent_group);
885 MPI_Group_free(&active_group);
886
887 if (MPI_COMM_NULL == mpiNewComm)
888 {
889 // This process is not involved in the new communication pattern
890 myProcNo_[index] = -1;
891 PstreamGlobals::pendingMPIFree_[index] = false;
892
893 // ~~~~~~~~~
894 // IMPORTANT
895 // ~~~~~~~~~
896 // Always retain knowledge of the inter-node leaders,
897 // even if this process is not on that communicator.
898 // This will help when constructing topology-aware communication.
899
900 if (index != commInterNode_)
901 {
902 procIDs_[index].clear();
903 }
904 }
905 else
906 {
907 returnCode = MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);
908
909 if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
910 {
912 << "Problem :"
913 << " when allocating communicator at " << index
914 << " from ranks " << flatOutput(procIDs_[index])
915 << " of parent " << parentIndex
916 << " cannot find my own rank"
918 }
919 }
920 }
921}
922
923
924void Foam::UPstream::dupCommunicatorComponents
925(
926 const label parentIndex,
927 const label index
928)
929{
930 PstreamGlobals::initCommunicator(index);
931
932 PstreamGlobals::pendingMPIFree_[index] = true;
933 MPI_Comm_dup
934 (
935 PstreamGlobals::MPICommunicators_[parentIndex],
936 &PstreamGlobals::MPICommunicators_[index]
937 );
938
939 myProcNo_[index] = myProcNo_[parentIndex];
940 procIDs_[index] = procIDs_[parentIndex];
941}
942
943
944void Foam::UPstream::splitCommunicatorComponents
945(
946 const label parentIndex,
947 const label index,
948 int colour,
949 const bool two_step
950)
951{
952 PstreamGlobals::initCommunicator(index);
953
954 // ------------------------------------------------------------------------
955 // Create sub-communicator according to its colouring
956 // => MPI_Comm_split().
957 // Since other parts of OpenFOAM may still need a view of the siblings:
958 // => MPI_Group_translate_ranks().
959 //
960 // The MPI_Group_translate_ranks() step can be replaced with an
961 // MPI_Allgather() of the involved parent ranks (since we always maintain
962 // the relative rank order when splitting).
963 //
964 // Since MPI_Comm_split() already does an MPI_Allgather() internally
965 // to pick out the colours (and do any sorting), we can simply
966 // do the same thing:
967 //
968 // Do the Allgather first and pickout identical colours to define the
969 // group and create a communicator based on that.
970 //
971 // This is no worse than the Allgather communication overhead of using
972 // MPI_Comm_split() and saves the extra translate_ranks step.
973 // ------------------------------------------------------------------------
974
975 const auto mpiParentComm = PstreamGlobals::MPICommunicators_[parentIndex];
976
977 int parentRank = 0;
978 int parentSize = 0;
979 MPI_Comm_rank(mpiParentComm, &parentRank);
980 MPI_Comm_size(mpiParentComm, &parentSize);
981
982 auto& procIds = procIDs_[index];
983 myProcNo_[index] = -1;
984
985 if (two_step)
986 {
987 // First gather the colours
988 procIds.resize_nocopy(parentSize);
989 procIds[parentRank] = colour;
990
991 MPI_Allgather
992 (
993 MPI_IN_PLACE, 0, MPI_INT,
994 procIds.data(), 1, MPI_INT,
995 mpiParentComm
996 );
997
998 if (colour < 0)
999 {
1000 // Not involved
1001 procIds.clear();
1002 }
1003 else
1004 {
1005 // Select ranks based on the matching colour
1006 int nranks = 0;
1007 for (int i = 0; i < parentSize; ++i)
1008 {
1009 if (procIds[i] == colour)
1010 {
1011 procIds[nranks++] = i;
1012 }
1013 }
1014 procIds.resize(nranks);
1015 }
1016
1017 allocateCommunicatorComponents(parentIndex, index);
1018 }
1019 else
1020 {
1021 auto& mpiNewComm = PstreamGlobals::MPICommunicators_[index];
1022
1023 MPI_Comm_split
1024 (
1025 mpiParentComm,
1026 (colour >= 0 ? colour : MPI_UNDEFINED),
1027 0, // maintain relative ordering
1028 &mpiNewComm
1029 );
1030
1031 if (MPI_COMM_NULL == mpiNewComm)
1032 {
1033 // Not involved
1034 PstreamGlobals::pendingMPIFree_[index] = false;
1035 procIds.clear();
1036 }
1037 else
1038 {
1039 PstreamGlobals::pendingMPIFree_[index] = true;
1040 MPI_Comm_rank(mpiNewComm, &myProcNo_[index]);
1041
1042 // Starting from parent
1043 MPI_Group parent_group;
1044 MPI_Comm_group(mpiParentComm, &parent_group);
1045
1046 MPI_Group new_group;
1047 MPI_Comm_group(mpiNewComm, &new_group);
1048
1049 // Parent ranks: identity map
1050 List<int> parentIds(parentSize);
1051 std::iota(parentIds.begin(), parentIds.end(), 0);
1052
1053 // New ranks:
1054 procIds.resize_nocopy(parentSize);
1055 procIds = -1; // Some extra safety...
1056
1057 MPI_Group_translate_ranks
1058 (
1059 parent_group, parentSize, parentIds.data(),
1060 new_group, procIds.data()
1061 );
1062
1063 // Groups not needed after this...
1064 MPI_Group_free(&parent_group);
1065 MPI_Group_free(&new_group);
1066
1067 // The corresponding ranks.
1068 // - since old ranks are an identity map, can just use position
1069
1070 int nranks = 0;
1071 for (int i = 0; i < parentSize; ++i)
1072 {
1073 // Exclude MPI_UNDEFINED and MPI_PROC_NULL etc...
1074 if (procIds[i] >= 0 && procIds[i] < parentSize)
1075 {
1076 procIds[nranks++] = i;
1077 }
1078 }
1079 procIds.resize(nranks);
1080 }
1081 }
1082}
1083
1084
1085void Foam::UPstream::freeCommunicatorComponents(const label index)
1086{
1087 if (UPstream::debug)
1088 {
1089 Perr<< "freeCommunicatorComponents: " << index
1090 << " from " << PstreamGlobals::MPICommunicators_.size() << endl;
1091 }
1092
1093 // Only free communicators that we have specifically allocated ourselves
1094 //
1095 // Bounds checking needed since there are no UPstream communicator indices
1096 // when MPI is initialized outside of OpenFOAM
1097
1098 if
1099 (
1100 (index >= 0 && index < PstreamGlobals::MPICommunicators_.size())
1101 && PstreamGlobals::pendingMPIFree_[index]
1102 )
1103 {
1104 PstreamGlobals::pendingMPIFree_[index] = false;
1105
1106 // Free communicator. Sets communicator to MPI_COMM_NULL
1107 if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[index])
1108 {
1109 MPI_Comm_free(&PstreamGlobals::MPICommunicators_[index]);
1110 }
1111 }
1112}
1113
1114
1115bool Foam::UPstream::setSharedMemoryCommunicators()
1116{
1117 // Uses the world communicator (not global communicator)
1118
1119 // Skip if non-parallel
1120 if (!UPstream::parRun())
1121 {
1122 numNodes_ = 1;
1123 return false;
1124 }
1125
1126 if (FOAM_UNLIKELY(commInterNode_ >= 0 || commLocalNode_ >= 0))
1127 {
1128 // Failed sanity check
1130 << "Node communicator(s) already created!" << endl
1132 return false;
1133 }
1134
1135 commInterNode_ = getAvailableCommIndex(constWorldComm_);
1136 commLocalNode_ = getAvailableCommIndex(constWorldComm_);
1137
1138 PstreamGlobals::initCommunicator(commInterNode_);
1139 PstreamGlobals::initCommunicator(commLocalNode_);
1140
1141 // Overwritten later
1142 myProcNo_[commInterNode_] = UPstream::masterNo();
1143 myProcNo_[commLocalNode_] = UPstream::masterNo();
1144
1145 // Sorted order, purely cosmetic
1146 if (commLocalNode_ < commInterNode_)
1147 {
1148 std::swap(commLocalNode_, commInterNode_);
1149 }
1150
1151 if (debug)
1152 {
1153 Perr<< "Allocating node communicators "
1154 << commInterNode_ << ", " << commLocalNode_ << nl
1155 << " parent : " << constWorldComm_ << nl
1156 << endl;
1157 }
1158
1159
1160 const auto mpiParentComm =
1161 PstreamGlobals::MPICommunicators_[constWorldComm_];
1162
1163 auto& mpiLocalNode =
1164 PstreamGlobals::MPICommunicators_[commLocalNode_];
1165
1166 int parentRank = 0;
1167 int parentSize = 0;
1168 MPI_Comm_rank(mpiParentComm, &parentRank);
1169 MPI_Comm_size(mpiParentComm, &parentSize);
1170
1171 List<int> nodeLeaders(parentSize);
1172 nodeLeaders = -1;
1173
1174 MPI_Comm_split_type
1175 (
1176 mpiParentComm,
1177 MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
1178 &mpiLocalNode
1179 );
1180
1181 if (FOAM_UNLIKELY(MPI_COMM_NULL == mpiLocalNode))
1182 {
1183 // This process is not involved in an intra-host communication?
1184 // - should never happen!
1185
1186 const label index = commLocalNode_;
1187 PstreamGlobals::pendingMPIFree_[index] = false;
1188
1189 myProcNo_[index] = -1;
1190 procIDs_[index].clear();
1191
1193 << "Comm_split_type(shared) failed\n"
1195 }
1196 else
1197 {
1198 // This process is involved in intra-host communication
1199 const label index = commLocalNode_;
1200 auto& procIds = procIDs_[index];
1201
1202 PstreamGlobals::pendingMPIFree_[index] = true;
1203
1204 int localRank = 0;
1205 int localSize = 0;
1206 MPI_Comm_rank(mpiLocalNode, &localRank);
1207 MPI_Comm_size(mpiLocalNode, &localSize);
1208
1209 if (localRank == 0)
1210 {
1211 // This process is a host leader - mark its position
1212 nodeLeaders[parentRank] = parentRank;
1213 }
1214
1215 procIds.resize_nocopy(localSize);
1216 procIds[localRank] = UPstream::myProcNo(UPstream::constWorldComm_);
1217 // OR: procIds[localRank] = parentRank;
1218
1219 // Get all of the siblings (within the node)
1220 MPI_Allgather
1221 (
1222 MPI_IN_PLACE, 0, MPI_INT,
1223 procIds.data(), 1, MPI_INT,
1224 mpiLocalNode
1225 );
1226 }
1227
1228
1229 // Get all of the host-leader information and find who they are.
1230 {
1231 auto& procIds = procIDs_[commInterNode_];
1232
1233 MPI_Allgather
1234 (
1235 MPI_IN_PLACE, 0, MPI_INT,
1236 nodeLeaders.data(), 1, MPI_INT,
1237 mpiParentComm
1238 );
1239
1240 // Capture the size (number of nodes) before doing anything further
1241 numNodes_ = std::count_if
1242 (
1243 nodeLeaders.cbegin(),
1244 nodeLeaders.cend(),
1245 [](int rank){ return (rank >= 0); }
1246 );
1247
1248 // ~~~~~~~~~
1249 // IMPORTANT
1250 // ~~~~~~~~~
1251 // Always retain knowledge of the inter-node leaders,
1252 // even if this process is not on that communicator.
1253 // This will help when constructing topology-aware communication.
1254
1255 procIds.resize_nocopy(numNodes_);
1256
1257 std::copy_if
1258 (
1259 nodeLeaders.cbegin(),
1260 nodeLeaders.cend(),
1261 procIds.begin(),
1262 [](int rank){ return (rank >= 0); }
1263 );
1264 }
1265
1266 // From master to host-leader. Ranks between hosts.
1267 allocateCommunicatorComponents(UPstream::worldComm, commInterNode_);
1268
1269 return true;
1270}
1271
1272
1273void Foam::UPstream::barrier(const int communicator, UPstream::Request* req)
1274{
1275 // No-op for non-parallel or not on communicator
1276 if (!UPstream::is_parallel(communicator))
1277 {
1278 PstreamGlobals::reset_request(req);
1279 return;
1280 }
1281
1282 if (req)
1283 {
1284 MPI_Request request;
1285
1286 // Non-blocking
1287 if
1288 (
1289 MPI_Ibarrier
1290 (
1291 PstreamGlobals::MPICommunicators_[communicator],
1292 &request
1293 )
1294 )
1295 {
1297 << "MPI_Ibarrier returned with error"
1299 }
1300
1301 *req = UPstream::Request(request);
1302 }
1303 else
1304 {
1305 // Blocking
1306 if
1307 (
1308 MPI_Barrier
1309 (
1310 PstreamGlobals::MPICommunicators_[communicator]
1311 )
1312 )
1313 {
1315 << "MPI_Barrier returned with error"
1317 }
1318 }
1319}
1320
1321
1323(
1324 const int toProcNo,
1325 const int communicator,
1326 const int tag // Message tag (must match on receiving side)
1327)
1328{
1329 if (!UPstream::is_parallel(communicator))
1330 {
1331 // Nothing to do
1332 return;
1333 }
1334
1335 {
1336 MPI_Send
1337 (
1338 nullptr, 0, MPI_BYTE, toProcNo, tag,
1339 PstreamGlobals::MPICommunicators_[communicator]
1340 );
1341 }
1342}
1343
1344
1346(
1347 const int fromProcNo,
1348 const int communicator,
1349 const int tag // Message tag (must match on sending side)
1350)
1351{
1352 if (!UPstream::is_parallel(communicator))
1353 {
1354 // Nothing to do
1355 return -1;
1356 }
1357 else if (fromProcNo < 0)
1358 {
1359 MPI_Status status;
1360 MPI_Recv
1361 (
1362 nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, tag,
1363 PstreamGlobals::MPICommunicators_[communicator],
1364 &status
1365 );
1366 return status.MPI_SOURCE;
1367 }
1368 else
1369 {
1370 MPI_Recv
1371 (
1372 nullptr, 0, MPI_BYTE, fromProcNo, tag,
1373 PstreamGlobals::MPICommunicators_[communicator],
1374 MPI_STATUS_IGNORE
1375 );
1376 return fromProcNo;
1377 }
1378}
1379
1380
1381std::pair<int,int64_t>
1383(
1384 const UPstream::commsTypes commsType,
1385 const int fromProcNo,
1386 const int tag,
1387 const int communicator
1388)
1389{
1390 std::pair<int,int64_t> result(-1, 0);
1391
1392 // No-op for non-parallel or not on communicator
1393 if (!UPstream::is_parallel(communicator))
1394 {
1395 return result;
1396 }
1397
1398 const int source = (fromProcNo < 0) ? MPI_ANY_SOURCE : fromProcNo;
1399 // Supporting MPI_ANY_TAG is not particularly useful...
1400
1401 int flag = 0;
1402 MPI_Status status;
1403
1404 if (UPstream::commsTypes::nonBlocking == commsType)
1405 {
1406 // Non-blocking
1407 profilingPstream::beginTiming();
1408
1409 if
1410 (
1411 MPI_Iprobe
1412 (
1413 source,
1414 tag,
1415 PstreamGlobals::MPICommunicators_[communicator],
1416 &flag,
1417 &status
1418 )
1419 )
1420 {
1422 << "MPI_Iprobe returned with error"
1424 }
1425
1427 }
1428 else
1429 {
1430 // Blocking
1431 profilingPstream::beginTiming();
1432
1433 if
1434 (
1435 MPI_Probe
1436 (
1437 source,
1438 tag,
1439 PstreamGlobals::MPICommunicators_[communicator],
1440 &status
1441 )
1442 )
1443 {
1445 << "MPI_Probe returned with error"
1447 }
1448
1449 profilingPstream::addProbeTime();
1450 flag = 1;
1451 }
1452
1453 if (flag)
1454 {
1455 // Unlikely to be used with large amounts of data,
1456 // but use MPI_Get_elements_x() instead of MPI_Get_count() anyhow
1457
1458 MPI_Count num_recv(0);
1459 MPI_Get_elements_x(&status, MPI_BYTE, &num_recv);
1460
1461 // Errors
1462 if (FOAM_UNLIKELY(num_recv == MPI_UNDEFINED || int64_t(num_recv) < 0))
1463 {
1465 << "MPI_Get_elements_x() : "
1466 "returned undefined or negative value"
1468 }
1469 else if (FOAM_UNLIKELY(int64_t(num_recv) > int64_t(INT_MAX)))
1470 {
1472 << "MPI_Get_elements_x() : "
1473 "count is larger than INT_MAX bytes"
1475 }
1476
1477
1478 result.first = status.MPI_SOURCE;
1479 result.second = int64_t(num_recv);
1480 }
1481
1482 return result;
1483}
1484
1485
1486// ************************************************************************* //