Loading...
Searching...
No Matches
OFstreamCollator.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017-2018 OpenFOAM Foundation
9 Copyright (C) 2019-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "OFstreamCollator.H"
30#include "OFstream.H"
31#include "decomposedBlockData.H"
32#include "dictionary.H"
34
35// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
36
37namespace Foam
38{
40}
41
42
43// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
44
45bool Foam::OFstreamCollator::writeFile
46(
47 const label comm,
48 const word& objectType,
49 const fileName& fName,
50 const UList<char>& localData,
51 const labelUList& recvSizes,
52 const UList<std::string_view>& procData, // optional proc data
53 IOstreamOption streamOpt,
54 IOstreamOption::atomicType atomic,
55 IOstreamOption::appendType append,
56 const dictionary& headerEntries
57)
58{
59 if (debug)
60 {
61 Pout<< "OFstreamCollator : Writing local " << localData.size()
62 << " bytes to " << fName << " using comm " << comm
63 << " and " << procData.size() << " sub-ranks" << endl;
64
65 forAll(procData, proci)
66 {
67 Pout<< " " << proci << " size:"
68 << label(procData[proci].size()) << nl;
69 }
70 }
71
72 autoPtr<OSstream> osPtr;
73 if (UPstream::master(comm))
74 {
75 Foam::mkDir(fName.path());
76 osPtr.reset(new OFstream(atomic, fName, streamOpt, append));
77 auto& os = *osPtr;
78
80 {
81 // No IOobject so cannot use IOobject::writeHeader
82
83 // FoamFile
85 (
86 os,
87 streamOpt, // streamOpt for container
88 objectType,
89 "", // note
90 "", // location (leave empty, otherwise inaccurate)
91 fName.name(), // object name
92 headerEntries
93 );
94 }
95 }
96
97 // Assuming threaded writing hides any slowness so we
98 // can use scheduled communication to send the data to
99 // the master processor in order. However can be unstable
100 // for some mpi so default is non-blocking.
101 const UPstream::commsTypes myCommsType
102 (
103 // Blocking when buffer size is 0
105 (
106 fileOperations::masterUncollatedFileOperation::
107 maxMasterFileBufferSize
108 ) < 1
111 );
112
113
114 List<std::streamoff> blockOffsets; // Optional
116 (
117 comm,
118 osPtr,
119 blockOffsets, // or List<std::streamoff>::null()
120 localData,
121 recvSizes,
122 procData,
123 myCommsType,
124 false // do not sync return state
125 );
126
127 if (osPtr && !osPtr->good())
128 {
130 << "Failed writing to " << fName << exit(FatalIOError);
131 }
132
133 if (debug)
134 {
135 Pout<< "OFstreamCollator : Finished writing "
136 << localData.size() << " bytes";
137
138 if (UPstream::master(comm))
139 {
140 off_t total = 0;
141 for (const label recv : recvSizes)
142 {
143 total += recv;
144 }
145 // Use std::to_string to display long int
146 Pout<< " (overall " << std::to_string(total) << ')';
147 }
148 Pout<< " to " << fName
149 << " using comm " << comm << endl;
150 }
151
152 return true;
153}
154
155
156void* Foam::OFstreamCollator::writeAll(void *threadarg)
157{
158 OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
159
160 // Consume stack
161 while (true)
162 {
163 std::unique_ptr<writeData> ptr;
164
165 {
166 std::lock_guard<std::mutex> guard(handler.mutex_);
167
168 if (handler.objects_.size())
169 {
170 // FIFO
171 ptr.reset(handler.objects_.front());
172 handler.objects_.pop_front();
173 }
174 }
175
176 if (!ptr)
177 {
178 break;
179 }
180
181 writeData& obj = *ptr;
182
183 // Obtain views from storage
184 List<std::string_view> procData(obj.procData_.size());
185 forAll(procData, proci)
186 {
187 procData[proci] = obj.procData_[proci].view();
188 }
189
190 bool ok = writeFile
191 (
192 obj.comm_,
193 obj.objectType_,
194 obj.pathName_,
195 obj.localData_,
196 obj.sizes_,
197 procData,
198 obj.streamOpt_,
199 obj.atomic_,
200 obj.append_,
201 obj.headerEntries_
202 );
203
204 if (!ok)
205 {
206 FatalIOErrorInFunction(obj.pathName_)
207 << "Failed writing " << obj.pathName_
208 << exit(FatalIOError);
209 }
210 //sleep(1);
211 }
212
213 if (debug)
214 {
215 Pout<< "OFstreamCollator : Exiting write thread " << endl;
216 }
217
218 {
219 std::lock_guard<std::mutex> guard(handler.mutex_);
220 handler.threadRunning_ = false;
221 }
222
223 return nullptr;
224}
225
226
227void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
228{
229 while (true)
230 {
231 // The pending output size(s)
232 off_t totalSize = 0;
233
234 {
235 std::lock_guard<std::mutex> guard(mutex_);
236 for (const writeData* ptr : objects_)
237 {
238 if (ptr) totalSize += ptr->size();
239 }
240 }
241
242 if
243 (
244 totalSize == 0
245 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
246 )
247 {
248 break;
249 }
250
251 if (debug)
252 {
253 std::lock_guard<std::mutex> guard(mutex_);
254 Pout<< "OFstreamCollator : Waiting for buffer space."
255 << " Currently in use:" << totalSize
256 << " limit:" << maxBufferSize_
257 << " files:" << objects_.size()
258 << endl;
259 }
260
262 }
263}
264
265
266// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
268Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
269:
270 OFstreamCollator(maxBufferSize, UPstream::worldComm)
271{}
272
273
275(
276 const off_t maxBufferSize,
277 const label comm
278)
279:
280 maxBufferSize_(maxBufferSize),
281 threadRunning_(false),
282 localComm_(comm),
283 threadComm_(UPstream::dupCommunicator(localComm_))
284{}
285
286
287// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
288
290{
291 if (thread_)
292 {
293 if (debug)
294 {
295 Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
296 }
297 thread_->join();
298 thread_.reset(nullptr);
299 }
301 UPstream::freeCommunicator(threadComm_);
302}
303
304
305// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
306
308(
309 const word& objectType,
310 const fileName& fName,
311 DynamicList<char>&& localData,
312 IOstreamOption streamOpt,
315 const bool useThread,
316 const dictionary& headerEntries
317)
318{
319 // Determine (on master) sizes to receive. Note: do NOT use thread
320 // communicator
321 const labelList recvSizes
322 (
323 UPstream::listGatherValues<label>(localData.size(), localComm_)
324 );
325
326 off_t totalSize = 0;
327 label maxLocalSize = 0;
328
329 if (UPstream::master(localComm_))
330 {
331 for (const label recvSize : recvSizes)
332 {
333 totalSize += recvSize;
334 maxLocalSize = Foam::max(maxLocalSize, recvSize);
335 }
336 }
337
338 // Broadcast the information to everyone
339 {
340 int64_t sizes[2] =
341 {
342 static_cast<int64_t>(totalSize),
343 static_cast<int64_t>(maxLocalSize)
344 };
345
346 UPstream::broadcast(sizes, 2, localComm_);
347
348 totalSize = static_cast<off_t>(sizes[0]);
349 maxLocalSize = static_cast<label>(sizes[1]);
350 }
351
352
353 // Determine how things will be gathered and written...
354
355 enum class dispatchModes { DIRECT_WRITE, THREADED_WRITE, FULL_THREADED };
356
357 dispatchModes dispatch(dispatchModes::DIRECT_WRITE);
358
359 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
360 {
361 dispatch = dispatchModes::DIRECT_WRITE;
362 }
363 else if (totalSize <= maxBufferSize_)
364 {
365 // Total size can be stored locally
366 // - gather all data now and only do the writing in the thread
367
368 dispatch = dispatchModes::THREADED_WRITE;
369 }
370 else
371 {
372 // Gather data and write in the thread
373
374 dispatch = dispatchModes::FULL_THREADED;
375
377 {
379 << "MPI not initialized with thread support." << nl
380 << " maxThreadFileBufferSize = 0 to disable threading" << nl
381 << " or maxThreadFileBufferSize > " << totalSize
382 << " to collate before threaded writing." << nl << nl;
383
384 dispatch = dispatchModes::DIRECT_WRITE;
385 }
386 }
387
388
389 // -----------
390 // Dispatching
391 // -----------
392
393 if (dispatch == dispatchModes::DIRECT_WRITE)
394 {
395 if (debug)
396 {
397 Pout<< "OFstreamCollator : non-thread gather "
398 << "(local comm: " << localComm_
399 << "); non-thread write of "
400 << fName << endl;
401 }
402
403 // Direct collating and writing (so master blocks until all written!)
404 return writeFile
405 (
406 localComm_,
407 objectType,
408 fName,
409 localData,
410 recvSizes,
411 UList<std::string_view>::null(), // dummy proc data
412 streamOpt,
413 atomic,
414 append,
415 headerEntries
416 );
417 }
418 else if (dispatch == dispatchModes::THREADED_WRITE)
419 {
420 if (debug)
421 {
422 Pout<< "OFstreamCollator : non-thread gather "
423 << "(local comm: " << localComm_
424 << "); thread write of "
425 << fName << endl;
426 }
427
428 if (UPstream::master(localComm_))
429 {
430 waitForBufferSpace(totalSize);
431 }
432
433 std::unique_ptr<writeData> fileAndDataPtr
434 (
435 new writeData
436 (
437 threadComm_,
438 objectType,
439 fName,
440 recvSizes,
441 streamOpt,
442 atomic,
443 append,
444 headerEntries
445 )
446 );
447 auto& fileAndData = *fileAndDataPtr;
448
449 List<List<char>>& procData = fileAndData.procData_;
450
451 // Receive from these procs
452 DynamicList<int> recvProcs;
453
454 if (UPstream::master(localComm_))
455 {
456 // Move in local data (master only!)
457 fileAndData.transfer(localData);
458
459 // Storage for receive data
460 procData.resize_nocopy(UPstream::nProcs(localComm_));
461
462 // Sorted by message size
463 labelList order(Foam::sortedOrder(recvSizes));
464 recvProcs.reserve_exact(order.size());
465
466 // Want to receive large messages first. Ignore empty slots
467 forAllReverse(order, i)
468 {
469 const label proci = order[i];
470
471 // Ignore empty slots and don't try to receive from self
472 if (recvSizes[proci] > 0 && proci != UPstream::masterNo())
473 {
474 recvProcs.push_back(proci);
475 }
476 }
477 }
478 else if (UPstream::is_subrank(localComm_))
479 {
480 // Requires a size for decomposedBlockData::writeBlocks() logic
481 procData.resize_nocopy(UPstream::nProcs(localComm_));
482 }
483
484
485 // Gather all data onto master. Is done in local communicator since
486 // not in write thread.
487 const label startOfRequests = UPstream::nRequests();
488
489 const int messageTag = (UPstream::msgType() + 256);
490
491 if (UPstream::master(localComm_))
492 {
493 // Receive from these procs (non-empty slots)
494 for (const int proci : recvProcs)
495 {
496 auto& slot = procData[proci];
497 slot.resize_nocopy(recvSizes[proci]);
498
500 (
502 proci,
503 slot.data_bytes(),
504 slot.size_bytes(),
505 messageTag,
506 localComm_
507 );
508 }
509 }
510 else if (UPstream::is_subrank(localComm_) && !localData.empty())
511 {
512 // Send to content to master
513 if
514 (
516 (
519 localData.cdata_bytes(),
520 localData.size_bytes(),
521 messageTag,
522 localComm_
523 )
524 )
525 {
527 << "Failure to send message (size: "
528 << localData.size() << ") to master" << nl
530 }
531 }
532
533 UPstream::waitRequests(startOfRequests);
534
535 // The localData has been moved (master) or communicated
536 localData.clearStorage();
537
538
539 // Queue up for threading
540 {
541 std::lock_guard<std::mutex> guard(mutex_);
542
543 // Add to thread buffer (as FIFO), take ownership
544 objects_.push_back(fileAndDataPtr.release());
545
546 // Start thread if not running
547 if (!threadRunning_)
548 {
549 if (thread_)
550 {
551 if (debug)
552 {
553 Pout<< "OFstreamCollator : Waiting for write thread"
554 << endl;
555 }
556 thread_->join();
557 }
558
559 if (debug)
560 {
561 Pout<< "OFstreamCollator : Starting write thread"
562 << endl;
563 }
564 thread_.reset(new std::thread(writeAll, this));
565 threadRunning_ = true;
566 }
567 }
568
569 return true;
570 }
571 else if (dispatch == dispatchModes::FULL_THREADED)
572 {
573 if (debug)
574 {
575 Pout<< "OFstreamCollator : thread gather; thread write "
576 << "(thread comm: " << threadComm_
577 << ") of " << fName << endl;
578 }
579
580 if (UPstream::master(localComm_))
581 {
582 waitForBufferSpace(localData.size());
583 }
584
585 std::unique_ptr<writeData> fileAndDataPtr
586 (
587 new writeData
588 (
589 threadComm_,
590 objectType,
591 fName,
592 recvSizes,
593 streamOpt,
594 atomic,
595 append,
596 headerEntries
597 )
598 );
599
600 // Move in local data (all procs)
601 fileAndDataPtr->transfer(localData);
602
603
604 // Queue up for threading
605 {
606 std::lock_guard<std::mutex> guard(mutex_);
607
608 // Add to thread buffer (as FIFO), take ownership
609 objects_.push_back(fileAndDataPtr.release());
610
611 // Note: no proc data provided
612 // so it will trigger communication inside the thread!!!
613
614 if (!threadRunning_)
615 {
616 if (thread_)
617 {
618 if (debug)
619 {
620 Pout<< "OFstreamCollator : Waiting for write thread"
621 << endl;
622 }
623 thread_->join();
624 }
625
626 if (debug)
627 {
628 Pout<< "OFstreamCollator : Starting write thread" << endl;
629 }
630 thread_.reset(new std::thread(writeAll, this));
631 threadRunning_ = true;
632 }
633 }
634
635 return true;
636 }
637
639 << "Unknown dispatch mode: " << int(dispatch)
640 << " - programming error?" << abort(FatalError);
641
642 return false;
643}
644
645
647{
648 // Wait for all buffer space to be available
649 // - ie, wait for all jobs to finish
650
651 if (UPstream::master(localComm_))
652 {
653 if (debug)
654 {
655 Pout<< "OFstreamCollator : waiting for thread to have consumed all"
656 << endl;
657 }
658 waitForBufferSpace(-1);
659 }
660}
661
662
663// ************************************************************************* //
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition DynamicList.H:68
void reserve_exact(const label len)
Reserve allocation space for at least this size, allocating new space if required and retaining old c...
void push_back(const T &val)
Copy append an element to the end of this list.
A simple container for options an IOstream can normally have.
atomicType
Atomic operations (output).
appendType
File appending (NO_APPEND | APPEND_APP | APPEND_ATE).
@ NO_APPEND
no append (truncates existing)
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition List.H:72
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily.
Definition ListI.H:171
Threaded file writer.
bool write(const word &objectType, const fileName &fName, DynamicList< char > &&localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents, possibly taking ownership of the content.
virtual ~OFstreamCollator()
Destructor.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size (0 = do not use thread) and with worldComm.
void waitAll()
Wait for all thread actions to have finished.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr)
Receive buffer contents (contiguous types) from given processor.
static const UList< T > & null() noexcept
Return a null UList (reference to a nullObject). Behaves like an empty UList.
Definition UList.H:225
void size(const label n)
Older name for setAddressableSize.
Definition UList.H:118
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents (contiguous types only) to given processor.
Inter-processor communications stream.
Definition UPstream.H:69
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests).
commsTypes
Communications types.
Definition UPstream.H:81
@ scheduled
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
Definition UPstream.H:83
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Definition UPstream.H:84
static int & msgType() noexcept
Message tag of standard messages.
Definition UPstream.H:1926
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition UPstream.H:1691
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition UPstream.H:1714
static bool is_subrank(const label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
Definition UPstream.H:1731
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run.
Definition UPstream.H:1697
static bool haveThreads() noexcept
Have support for threads.
Definition UPstream.H:1686
static void waitRequests()
Wait for all requests to finish.
Definition UPstream.H:2497
@ broadcast
broadcast [MPI]
Definition UPstream.H:189
static void freeCommunicator(const label communicator, const bool withComponents=true)
Free a previously allocated communicator.
Definition UPstream.C:622
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &localData, const labelUList &recvSizes, const UList< std::string_view > &procData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition dictionary.H:133
A class for handling file names.
Definition fileName.H:75
A class for handling words, derived from Foam::string.
Definition word.H:66
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
Definition className.H:142
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition error.H:629
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition error.H:600
OBJstream os(runTime.globalPath()/outputName)
#define WarningInFunction
Report a warning using Foam::Warning.
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
Namespace for handling debugging switches.
Definition debug.C:45
Namespace for OpenFOAM.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition hashSets.C:40
List< label > labelList
A List of labels.
Definition List.H:62
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition POSIX.C:616
unsigned int sleep(const unsigned int sec)
Sleep for the specified number of seconds.
Definition POSIX.C:1549
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
dimensioned< typename typeOfMag< Type >::type > mag(const dimensioned< Type > &dt)
labelList sortedOrder(const UList< T > &input)
Return the (stable) sort order for the list.
errorManip< error > abort(error &err)
Definition errorManip.H:139
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
static void writeData(Ostream &os, const Type &val)
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition errorManip.H:125
constexpr char nl
The newline '\n' character (0x0a).
Definition Ostream.H:50
#define forAll(list, i)
Loop across all elements in list.
Definition stdFoam.H:299
#define forAllReverse(list, i)
Reverse loop across all elements in list.
Definition stdFoam.H:315