Loading...
Searching...
No Matches
collatedFileOperation.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017-2018 OpenFOAM Foundation
9 Copyright (C) 2020-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
31#include "Pstream.H"
32#include "Time.H"
34#include "decomposedBlockData.H"
35#include "registerSwitch.H"
36#include "masterOFstream.H"
37#include "OFstream.H"
38#include "foamVersion.H"
39#include "UPstreamFile.H"
41/* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
42
43namespace Foam
44{
45namespace fileOperations
46{
49 (
52 word
53 );
55 (
58 comm
59 );
60
62 (
63 debug::floatOptimisationSwitch("maxThreadFileBufferSize", 0)
64 );
66 (
67 "maxThreadFileBufferSize",
68 float,
70 );
71
72 // Threaded MPI: depending on buffering
74 (
77 word,
78 collated
79 );
80}
82
83
85(
86 Foam::debug::optimisationSwitch("collated.backend", 0)
87);
89(
90 "collated.backend",
91 int,
93);
94
95
96// * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
97
99(
100 const bool withRanks
101) const
102{
104 << "I/O : " << this->type();
105
106 if
107 (
108 collatedFileOperation::backend_ == backendTypes::BACKEND_MPI_IO
110 )
111 {
112 DetailInfo<< " [mpi/io]" << nl;
113 }
115 {
116 // FUTURE: deprecate or remove threading?
118 << " [threaded] (maxThreadFileBufferSize = "
119 << maxThreadFileBufferSize << ")." << nl
120 << " Requires buffer large enough to collect all data"
121 " or MPI thread support." << nl
122 << " To avoid MPI threading [slow], set"
123 " (maxThreadFileBufferSize = 0) in" << nl
124 << " OpenFOAM etc/controlDict" << endl;
125 }
126 else
127 {
129 << " [unthreaded] (maxThreadFileBufferSize = 0)." << nl;
130
132 {
134 << " With scheduled transfer" << nl;
135 }
136 else if (maxMasterFileBufferSize >= 1)
137 {
139 << " With non-blocking transfer,"
140 " buffer-size = " << maxMasterFileBufferSize << nl;
141 }
142 else
143 {
145 << " With non-blocking transfer,"
146 " minimal buffer size" << nl;
147 }
148 }
149
150 if (withRanks)
151 {
153 }
154
155 //- fileModificationChecking already set by base class (masterUncollated)
156 // if (IOobject::fileModificationChecking == IOobject::timeStampMaster)
157 // {
158 // WarningInFunction
159 // << "Resetting fileModificationChecking to timeStamp" << endl;
160 // }
161 // else if (IOobject::fileModificationChecking == IOobject::inotifyMaster)
162 // {
163 // WarningInFunction
164 // << "Resetting fileModificationChecking to inotify" << endl;
165 // }
166}
167
168
170(
171 const regIOobject& io,
172 const fileName& pathName,
173 IOstreamOption streamOpt
174) const
175{
176 // Append to processorsNN/ file
177
178 const label proci = fileOperation::detectProcessorPath(io.objectPath());
179
180 if (debug)
181 {
182 Pout<< "collatedFileOperation::writeObject :"
183 << " For local object : " << io.name()
184 << " appending processor " << proci
185 << " data to " << pathName << endl;
186 }
187 if (proci == -1)
188 {
190 << "Invalid processor path: " << pathName
191 << exit(FatalError);
192 }
193
194 const bool isIOmaster = fileOperation::isIOrank(proci);
195
196 // Update meta-data for current state
197 if (isIOmaster)
198 {
199 const_cast<regIOobject&>(io).updateMetaData();
200 }
201
202 // Note: cannot do append + compression. This is a limitation
203 // of ogzstream (or rather most compressed formats)
204 //
205 // File should always be created as non-atomic
206 // (consistency between append/non-append)
207
208 OFstream os
209 (
210 pathName,
211 // UNCOMPRESSED (binary only)
212 IOstreamOption(IOstreamOption::BINARY, streamOpt.version()),
213 // Append on sub-ranks
215 );
216
217 if (!os.good())
218 {
220 << "Cannot open for appending"
221 << exit(FatalIOError);
222 }
223
224 if (isIOmaster)
225 {
227 }
228
229 std::streamoff blockOffset = decomposedBlockData::writeBlockEntry
230 (
231 os,
232 streamOpt,
233 io,
234 proci,
235 isIOmaster // With FoamFile header on master
236 );
237
238 return (blockOffset >= 0) && os.good();
239}
240
241
242// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
243
244namespace Foam
245{
246
247// Construction helper: self/world/local communicator and IO ranks
249{
250 // Default is COMM_WORLD (single master)
251 Tuple2<label, labelList> commAndIORanks
252 (
255 );
256
257 if (UPstream::parRun() && commAndIORanks.second().size() > 1)
258 {
259 // Multiple masters: ranks for my IO range
260 commAndIORanks.first() = UPstream::newCommunicator
261 (
263 fileOperation::subRanks(commAndIORanks.second())
264 );
265 }
266
267 return commAndIORanks;
268}
269
270} // End namespace Foam
271
272
273// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
274
// Shared construction helper: optionally print the I/O banner.
//
// \param verbose  caller's request for banner output; it is only honoured
//     when Foam::infoDetailLevel is also greater than zero.
275void Foam::fileOperations::collatedFileOperation::init(bool verbose)
276{
// Suppress the banner unless detail-level output is enabled
277 verbose = (verbose && Foam::infoDetailLevel > 0);
278
279 if (verbose)
280 {
// printBanner() takes a bool (withRanks): the size converts to true
// when ioRanks_ is non-empty
281 this->printBanner(ioRanks_.size());
// NOTE(review): listing line 282 (presumably the closing brace of the
// if-block) is missing from this extract - confirm against the original file
283}
284
285
287(
288 bool verbose
289)
290:
291 masterUncollatedFileOperation
292 (
294 false, // distributedRoots
295 false // verbose
296 ),
297 managedComm_(getManagedComm(comm_)), // Possibly locally allocated
298 writer_(mag(maxThreadFileBufferSize), comm_)
299{
300 init(verbose);
301}
302
303
305(
306 const Tuple2<label, labelList>& commAndIORanks,
307 const bool distributedRoots,
308 bool verbose
309)
310:
312 (
313 commAndIORanks,
314 distributedRoots,
315 false // verbose
316 ),
317 managedComm_(-1), // Externally managed
318 writer_(mag(maxThreadFileBufferSize), comm_)
319{
320 init(verbose);
321}
322
323
325{
326 // From externally -> locally managed
327 managedComm_ = getManagedComm(comm_);
328}
330
331// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
332
334{
335 // Wait for any outstanding file operations
336 flush();
337
338 UPstream::freeCommunicator(managedComm_);
339}
341
342// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
343
345(
346 const IOobject& io,
347 const word& typeName
348) const
349{
350 // Replacement for objectPath
351 if (io.time().processorCase())
352 {
354 (
355 io,
357 "dummy", // not used for processorsobject
358 io.instance()
359 );
360 }
361 else
362 {
364 (
365 io,
368 io.instance()
369 );
370 }
371}
372
373
374// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
375
// Write a regIOobject to pathName through a threadedCollatedOFstream
// (the non-MPI/IO backend).
//
// \param pathName   target file handed to the collated stream
// \param io         object providing writeHeader()/writeData()
// \param streamOpt  stream options forwarded to the stream and to the
//     extra header content
// \param writeOnProc  not referenced in the visible part of this body
//
// \return true when the stream was good and the header (master rank only)
//     and data writes all succeeded; false otherwise.
//
// Must not be called for an absolute instance, a non-processor case,
// a global object, or a non-parallel communicator: these are reported
// through FatalError with abort.
376bool Foam::fileOperations::collatedFileOperation::writeObject_legacy
377(
378 const fileName& pathName,
379 const regIOobject& io,
380 IOstreamOption streamOpt,
381 const bool writeOnProc
382) const
383{
384 const Time& tm = io.time();
385 const fileName& inst = io.instance();
386
// Guard: reject every case that is not a parallel, per-processor write
387 if
388 (
389 (inst.isAbsolute() || !tm.processorCase())
390 || (io.global() || io.globalObject())
391 || (!UPstream::is_parallel(comm_))
392 )
393 {
// NOTE(review): listing line 394 is missing from this extract; presumably
// it starts the error stream (FatalErrorInFunction) - confirm
395 << "Should not have been called for any of these conditions:"
396 << " - isAbsolute" << nl
397 << " - not processorCase" << nl
398 << " - global or globalObject" << nl
399 << " - not parallel" << nl
400 << abort(FatalError);
401
402 return false;
403 }
404 else
405 {
406 // Re-check static maxThreadFileBufferSize variable to see
407 // if needs to use threading
408 const bool useThread = (Foam::mag(maxThreadFileBufferSize) > 1);
409
410 if (debug)
411 {
412 Pout<< "collatedFileOperation::writeObject :"
413 << " For object : " << io.name()
414 << " starting collating output to " << pathName
415 << " useThread:" << useThread << endl;
416 }
417
// Without threading: call waitAll() on the writer before opening
// the new stream
418 if (!useThread)
419 {
420 writer_.waitAll();
421 }
422
423 // Note: currently still NON_ATOMIC (Dec-2022)
424 threadedCollatedOFstream os
425 (
426 writer_,
427 pathName,
428 streamOpt,
429 useThread
430 );
431
// Accumulated success flag; each later step is skipped once it is false
432 bool ok = os.good();
433
// Header and extra header entries are only produced on the master
// rank of comm_
434 if (UPstream::master(comm_))
435 {
436 // Suppress comment banner
437 const bool old = IOobject::bannerEnabled(false);
438
439 ok = ok && io.writeHeader(os);
440
// NOTE(review): listing line 441 is missing from this extract; presumably
// it restores the banner state saved in 'old' - confirm
442
443 // Additional header content
// NOTE(review): listing lines 444-445 are missing from this extract;
// presumably they declare 'dict' and begin the call whose arguments
// follow - confirm
446 (
447 dict,
448 streamOpt,
449 io
450 );
451 os.setHeaderEntries(dict);
452 }
453
// Data is written on every rank that reaches this point
454 ok = ok && io.writeData(os);
455 // No end divider for collated output
456
457 return ok;
458 }
459}
461
462// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
463
465(
466 const regIOobject& io,
467 IOstreamOption streamOpt,
468 const bool writeOnProc
469) const
470{
471 const Time& tm = io.time();
472 const fileName& inst = io.instance();
473
474 // Update meta-data for current state
475 const_cast<regIOobject&>(io).updateMetaData();
476
477 if (inst.isAbsolute() || !tm.processorCase())
478 {
479 // Note: delay mkdir to masterOFstream so it does not get created
480 // if not needed (e.g. when running distributed)
481
482 const fileName pathName(io.objectPath());
483
484 if (debug)
485 {
486 Pout<< "collatedFileOperation::writeObject :"
487 << " For object : " << io.name()
488 << " falling back to master-only output to " << io.path()
489 << endl;
490 }
491
492 // Note: currently still NON_ATOMIC (Dec-2022)
494 (
495 comm_,
496 pathName,
497 streamOpt,
499 writeOnProc
500 );
501
502 // If any of these fail, return
503 // (leave error handling to Ostream class)
504
505 const bool ok =
506 (
507 os.good()
508 && io.writeHeader(os)
509 && io.writeData(os)
510 );
511
512 if (ok)
513 {
515 }
516
517 return ok;
518 }
519 else
520 {
521 // Construct the equivalent processors/ directory
523
524 // Note: delay mkdir to masterOFstream so it does not get created
525 // if not needed (e.g. when running distributed)
526
527 const fileName pathName(path/io.name());
528
529 if (io.global() || io.globalObject())
530 {
531 if (debug)
532 {
533 Pout<< "collatedFileOperation::writeObject :"
534 << " For global object : " << io.name()
535 << " falling back to master-only output to " << pathName
536 << endl;
537 }
538
539 // Note: currently still NON_ATOMIC (Dec-2022)
541 (
542 comm_,
543 pathName,
544 streamOpt,
546 writeOnProc
547 );
548
549 // If any of these fail, return
550 // (leave error handling to Ostream class)
551
552 const bool ok =
553 (
554 os.good()
555 && io.writeHeader(os)
556 && io.writeData(os)
557 );
558
559 if (ok)
560 {
562 }
563
564 return ok;
565 }
566 else if (!UPstream::parRun())
567 {
568 // Special path for e.g. decomposePar. Append to
569 // processorsDDD/ file
570 if (debug)
571 {
572 Pout<< "collatedFileOperation::writeObject :"
573 << " For object : " << io.name()
574 << " appending to " << pathName << endl;
575 }
576
577 mkDir(path);
578 return appendObject(io, pathName, streamOpt);
579 }
580 else
581 {
582 if
583 (
584 collatedFileOperation::backend_ == backendTypes::BACKEND_MPI_IO
586 )
587 {
588 return writeObject_mpiio
589 (
590 pathName,
591 io,
592 streamOpt,
593 writeOnProc
594 );
595 }
596 else
597 {
598 return writeObject_legacy
599 (
600 pathName,
601 io,
602 streamOpt,
603 writeOnProc
604 );
605 }
606 }
608}
609
610
612{
613 if (debug)
614 {
615 Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
616 << endl;
617 }
619 // Wait for thread to finish (note: also removes thread)
620 writer_.waitAll();
621}
622
623
625(
626 const fileName& fName
627) const
628{
629 if (UPstream::parRun())
630 {
631 const auto& procs = UPstream::procID(comm_);
632
633 word procDir(processorsBaseDir+Foam::name(nProcs_));
634
635 if (procs.size() != nProcs_)
636 {
637 procDir +=
638 + "_"
639 + Foam::name(procs.first())
640 + "-"
641 + Foam::name(procs.last());
642 }
643 return procDir;
644 }
645 else
646 {
647 word procDir(processorsBaseDir+Foam::name(nProcs_));
648
649 if (ioRanks_.size())
650 {
651 // Detect current processor number
652 label proci = fileOperation::detectProcessorPath(fName);
653
654 if (proci != -1)
655 {
656 // Find lowest io rank
657 label minProc = 0;
658 label maxProc = nProcs_-1;
659 for (const label ranki : ioRanks_)
660 {
661 if (ranki >= nProcs_)
662 {
663 break;
664 }
665 else if (ranki <= proci)
666 {
667 minProc = ranki;
668 }
669 else
670 {
671 maxProc = ranki-1;
672 break;
673 }
674 }
675
676 // Add range if not all processors
677 if (maxProc-minProc+1 != nProcs_)
678 {
679 procDir +=
680 + "_"
681 + Foam::name(minProc)
682 + "-"
683 + Foam::name(maxProc);
684 }
685 }
686 }
687
688 return procDir;
696) const
697{
698 return processorsDir(io.objectPath());
699}
700
701
702// ************************************************************************* //
703// Various backends
704
705#include "collatedFileOperation_mpiio.cxx"
706
707// ************************************************************************* //
Macros for easy insertion into run-time selection tables.
#define addNamedToRunTimeSelectionTable(baseType, thisType, argNames, lookupName)
Add to construction table with 'lookupName' as the key.
#define addToRunTimeSelectionTable(baseType, thisType, argNames)
Add to construction table with typeName as the key.
Defines the attributes of an object for which implicit objectRegistry management is supported, and provides the infrastructure for performing stream I/O.
Definition IOobject.H:191
static bool bannerEnabled() noexcept
Status of output file banner.
Definition IOobject.H:376
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
A simple container for options an IOstream can normally have.
versionNumber version() const noexcept
Get the stream version.
@ NO_APPEND
no append (truncates existing)
@ APPEND_APP
append (seek end each write)
Output to file stream as an OSstream, normally using std::ofstream for the actual output.
Definition OFstream.H:75
bool processorCase() const noexcept
True if this is a processor case.
Definition TimePathsI.H:52
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition Time.H:75
A 2-tuple for storing two objects of dissimilar types. The container is similar in purpose to std::pair, but does not expose its members directly.
Definition Tuple2.H:51
const T1 & first() const noexcept
Access the first element.
Definition Tuple2.H:132
const T2 & second() const noexcept
Access the second element.
Definition Tuple2.H:142
static bool supported()
True if MPI/IO appears to be supported.
static List< int > & procID(int communicator)
The list of ranks within a given communicator.
Definition UPstream.H:1767
static bool parRun(const bool on) noexcept
Set as parallel run on/off.
Definition UPstream.H:1669
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition UPstream.H:1714
static bool is_parallel(const label communicator=worldComm)
True if parallel algorithm or exchange is required.
Definition UPstream.H:1743
static label worldComm
Communicator for all ranks. May differ from commGlobal() if local worlds are in use.
Definition UPstream.H:1069
static label newCommunicator(const label parent, const labelRange &subRanks, const bool withComponents=true)
Create new communicator with sub-ranks on the parent communicator.
Definition UPstream.C:272
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition UPstream.C:622
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static std::streamoff writeBlockEntry(OSstream &os, const label blocki, const char *str, const size_t len)
Helper: write block of (binary) character data.
static void writeExtraHeaderContent(dictionary &dict, IOstreamOption streamOptData, const IOobject &io)
Helper: generate additional entries for FoamFile header.
A class for handling file names.
Definition fileName.H:75
static bool isAbsolute(const std::string &str)
Return true if filename starts with a '/' or '\' or (windows-only) with a filesystem-root.
Definition fileNameI.H:129
An encapsulation of filesystem-related operations.
static labelRange subRanks(const labelUList &mainIOranks)
Get (contiguous) range/bounds of ranks addressed within the given main io-ranks.
static label getManagedComm(const label communicator)
Construction helper: check for locally allocated communicator.
const labelList ioRanks_
The list of IO ranks (global ranks).
fileName processorsPath(const IOobject &io, const word &instance, const word &procDir) const
Generate path (like io.path) with provided instance and any processorsDir.
label nProcs_
Overall number of processors.
static label detectProcessorPath(const fileName &objPath)
Detect processor number from '/aa/bb/processorDDD/cc'.
@ OBJECT
io.objectPath() exists
@ PROCOBJECT
objectPath exists in 'processorsNN_first-last'
bool isIOrank(const label proci) const
Is proci a master rank in the communicator (in parallel) or a master rank in the IO ranks (non-parallel).
label comm_
Communicator to use.
static labelList getGlobalIORanks()
Get list of global IO ranks from FOAM_IORANKS env variable. If set, these correspond to the IO master...
void printRanks() const
Helper: output which ranks are IO.
static word processorsBaseDir
Return the processors directory name (usually "processors").
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processors/ subdirectory.
static int backend_
The type of backend to be used.
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
collatedFileOperation(bool verbose=false)
Default construct.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of all files to be written.
virtual void storeComm() const
Transfer ownership of communicator to this fileOperation. Use with caution.
bool appendObject(const regIOobject &io, const fileName &pathName, IOstreamOption streamOpt) const
Append to processorsNN/ file.
virtual bool writeObject(const regIOobject &, IOstreamOption streamOpt=IOstreamOption(), const bool writeOnProc=true) const
Writes a regIOobject (so header, contents and divider).
void printBanner(const bool withRanks=false) const
Print banner information, optionally with io ranks.
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A fileOperation initialiser for collated file handlers. Requires threading for non-zero maxThreadFileBufferSize.
fileOperations that performs all file operations on the master processor. Requires the calls to be parallel synchronised!
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static float maxMasterFileBufferSize
Max size of parallel communications. Switches from non-blocking to scheduled when reading/writing files.
masterUncollatedFileOperation(bool verbose=false)
Default construct.
virtual fileName::Type type(const fileName &, const bool followLink=true) const
Return the file type: DIRECTORY, FILE or SYMLINK.
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
Master-only drop-in replacement for OFstream.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with the objectRegistry.
Definition regIOobject.H:71
A class for handling words, derived from Foam::string.
Definition word.H:66
static const word null
An empty word.
Definition word.H:84
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
Definition className.H:142
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition error.H:629
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition error.H:600
#define DetailInfo
Definition evalEntry.C:30
OBJstream os(runTime.globalPath()/outputName)
const auto & io
Namespace for handling debugging switches.
Definition debug.C:45
float floatOptimisationSwitch(const char *name, const float deflt=0)
Lookup optimisation switch or add default value.
Definition debug.C:240
int optimisationSwitch(const char *name, const int deflt=0)
Lookup optimisation switch or add default value.
Definition debug.C:234
Namespace for implementations of a fileOperation.
Definition regIOobject.H:60
Namespace for OpenFOAM.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition POSIX.C:616
const word GlobalIOList< Tuple2< scalar, vector > >::typeName("scalarVectorTable")
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
errorManip< error > abort(error &err)
Definition errorManip.H:139
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and stack trace.
int infoDetailLevel
Global for selective suppression of Info output.
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and stack trace.
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for expressions::valueTypeCode::INVALID.
Definition exprTraits.C:127
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition errorManip.H:125
static Tuple2< label, labelList > getCommPattern()
Ostream & flush(Ostream &os)
Flush stream.
Definition Ostream.H:509
constexpr char nl
The newline '\n' character (0x0a).
Definition Ostream.H:50
#define registerOptSwitch(Name, Type, SwitchVar)
dictionary dict