45bool Foam::OFstreamCollator::writeFile
48 const word& objectType,
49 const fileName& fName,
50 const UList<char>& localData,
51 const labelUList& recvSizes,
52 const UList<std::string_view>& procData,
53 IOstreamOption streamOpt,
54 IOstreamOption::atomicType atomic,
55 IOstreamOption::appendType
append,
61 Pout<<
"OFstreamCollator : Writing local " << localData.size()
62 <<
" bytes to " << fName <<
" using comm " << comm
63 <<
" and " << procData.size() <<
" sub-ranks" <<
endl;
67 Pout<<
" " << proci <<
" size:"
68 << label(procData[proci].size()) <<
nl;
72 autoPtr<OSstream> osPtr;
76 osPtr.reset(
new OFstream(atomic, fName, streamOpt,
append));
106 fileOperations::masterUncollatedFileOperation::
107 maxMasterFileBufferSize
114 List<std::streamoff> blockOffsets;
127 if (osPtr && !osPtr->good())
135 Pout<<
"OFstreamCollator : Finished writing "
136 << localData.size() <<
" bytes";
141 for (
const label recv : recvSizes)
146 Pout<<
" (overall " << std::to_string(total) <<
')';
148 Pout<<
" to " << fName
149 <<
" using comm " << comm <<
endl;
156void* Foam::OFstreamCollator::writeAll(
void *threadarg)
163 std::unique_ptr<writeData> ptr;
166 std::lock_guard<std::mutex> guard(handler.mutex_);
168 if (handler.objects_.size())
171 ptr.reset(handler.objects_.front());
172 handler.objects_.pop_front();
187 procData[proci] = obj.procData_[proci].view();
207 <<
"Failed writing " << obj.pathName_
215 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
219 std::lock_guard<std::mutex> guard(handler.mutex_);
220 handler.threadRunning_ =
false;
227void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const
235 std::lock_guard<std::mutex> guard(mutex_);
238 if (ptr) totalSize += ptr->size();
245 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
253 std::lock_guard<std::mutex> guard(mutex_);
254 Pout<<
"OFstreamCollator : Waiting for buffer space."
255 <<
" Currently in use:" << totalSize
256 <<
" limit:" << maxBufferSize_
257 <<
" files:" << objects_.size()
276 const off_t maxBufferSize,
280 maxBufferSize_(maxBufferSize),
281 threadRunning_(false),
283 threadComm_(
UPstream::dupCommunicator(localComm_))
295 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
298 thread_.reset(
nullptr);
309 const word& objectType,
315 const bool useThread,
327 label maxLocalSize = 0;
331 for (
const label recvSize : recvSizes)
333 totalSize += recvSize;
334 maxLocalSize =
Foam::max(maxLocalSize, recvSize);
342 static_cast<int64_t
>(totalSize),
343 static_cast<int64_t
>(maxLocalSize)
348 totalSize =
static_cast<off_t
>(sizes[0]);
349 maxLocalSize =
static_cast<label
>(sizes[1]);
355 enum class dispatchModes { DIRECT_WRITE, THREADED_WRITE, FULL_THREADED };
357 dispatchModes dispatch(dispatchModes::DIRECT_WRITE);
359 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
361 dispatch = dispatchModes::DIRECT_WRITE;
363 else if (totalSize <= maxBufferSize_)
368 dispatch = dispatchModes::THREADED_WRITE;
374 dispatch = dispatchModes::FULL_THREADED;
379 <<
"MPI not initialized with thread support." <<
nl
380 <<
" maxThreadFileBufferSize = 0 to disable threading" <<
nl
381 <<
" or maxThreadFileBufferSize > " << totalSize
382 <<
" to collate before threaded writing." <<
nl <<
nl;
384 dispatch = dispatchModes::DIRECT_WRITE;
393 if (dispatch == dispatchModes::DIRECT_WRITE)
397 Pout<<
"OFstreamCollator : non-thread gather "
398 <<
"(local comm: " << localComm_
399 <<
"); non-thread write of "
418 else if (dispatch == dispatchModes::THREADED_WRITE)
422 Pout<<
"OFstreamCollator : non-thread gather "
423 <<
"(local comm: " << localComm_
424 <<
"); thread write of "
430 waitForBufferSpace(totalSize);
433 std::unique_ptr<writeData> fileAndDataPtr
447 auto& fileAndData = *fileAndDataPtr;
457 fileAndData.transfer(localData);
464 recvProcs.reserve_exact(order.size());
469 const label proci = order[i];
474 recvProcs.push_back(proci);
494 for (
const int proci : recvProcs)
496 auto& slot = procData[proci];
497 slot.resize_nocopy(recvSizes[proci]);
519 localData.cdata_bytes(),
520 localData.size_bytes(),
527 <<
"Failure to send message (size: "
528 << localData.size() <<
") to master" <<
nl
536 localData.clearStorage();
541 std::lock_guard<std::mutex> guard(mutex_);
544 objects_.push_back(fileAndDataPtr.release());
553 Pout<<
"OFstreamCollator : Waiting for write thread"
561 Pout<<
"OFstreamCollator : Starting write thread"
564 thread_.reset(
new std::thread(writeAll,
this));
565 threadRunning_ =
true;
571 else if (dispatch == dispatchModes::FULL_THREADED)
575 Pout<<
"OFstreamCollator : thread gather; thread write "
576 <<
"(thread comm: " << threadComm_
577 <<
") of " << fName <<
endl;
582 waitForBufferSpace(localData.size());
585 std::unique_ptr<writeData> fileAndDataPtr
601 fileAndDataPtr->transfer(localData);
606 std::lock_guard<std::mutex> guard(mutex_);
609 objects_.push_back(fileAndDataPtr.release());
620 Pout<<
"OFstreamCollator : Waiting for write thread"
628 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
630 thread_.reset(
new std::thread(writeAll,
this));
631 threadRunning_ =
true;
639 <<
"Unknown dispatch mode: " << int(dispatch)
655 Pout<<
"OFstreamCollator : waiting for thread to have consumed all"
658 waitForBufferSpace(-1);
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
void reserve_exact(const label len)
Reserve allocation space for at least this size, allocating new space if required and retaining old c...
void push_back(const T &val)
Copy append an element to the end of this list.
A simple container for options an IOstream can normally have.
atomicType
Atomic operations (output).
appendType
File appending (NO_APPEND | APPEND_APP | APPEND_ATE).
@ NO_APPEND
no append (truncates existing)
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
bool write(const word &objectType, const fileName &fName, DynamicList< char > &&localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents, possibly taking ownership of the content.
virtual ~OFstreamCollator()
Destructor.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size (0 = do not use thread) and with worldComm.
void waitAll()
Wait for all thread actions to have finished.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr)
Receive buffer contents (contiguous types) from given processor.
static const UList< T > & null() noexcept
Return a null UList (reference to a nullObject). Behaves like an empty UList.
void size(const label n)
Older name for setAddressableSize.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents (contiguous types only) to given processor.
Inter-processor communications stream.
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests).
commsTypes
Communications types.
@ scheduled
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
static int & msgType() noexcept
Message tag of standard messages.
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
static bool is_subrank(const label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run.
static bool haveThreads() noexcept
Have support for threads.
static void waitRequests()
Wait for all requests to finish.
@ broadcast
broadcast [MPI]
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string ¬e, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &localData, const labelUList &recvSizes, const UList< std::string_view > &procData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
A class for handling file names.
A class for handling words, derived from Foam::string.
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
OBJstream os(runTime.globalPath()/outputName)
#define WarningInFunction
Report a warning using Foam::Warning.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
Namespace for handling debugging switches.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
List< label > labelList
A List of labels.
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Ostream & endl(Ostream &os)
Add newline and flush stream.
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
labelList sortedOrder(const UList< T > &input)
Return the (stable) sort order for the list.
errorManip< error > abort(error &err)
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void writeData(Ostream &os, const Type &val)
errorManipArg< error, int > exit(error &err, const int errNo=1)
constexpr char nl
The newline '\n' character (0x0a).
#define forAll(list, i)
Loop across all elements in list.
#define forAllReverse(list, i)
Reverse loop across all elements in list.