Loading...
Searching...
No Matches
masterOFstream.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017 OpenFOAM Foundation
9 Copyright (C) 2020-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "masterOFstream.H"
30#include "OFstream.H"
31#include "OSspecific.H"
32#include "Pstream.H"
34
35// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
36
37void Foam::masterOFstream::checkWrite
38(
39 const fileName& fName,
40 const char* str,
41 std::streamsize len
42)
43{
44 if (!str || !(len > 0))
45 {
46 // Can skip everything if there is nothing to write
47 return;
48 }
49
50 Foam::mkDir(fName.path());
51
52 OFstream os
53 (
54 atomic_,
55 fName,
57 append_
58 );
59 if (!os.good())
60 {
62 << "Could not open file " << fName << nl
64 }
65
66 // Write characters directly to std::ostream
67 os.writeRaw(str, len);
68
69 if (!os.good())
70 {
72 << "Failed writing to " << fName << nl
74 }
75}
76
77
78void Foam::masterOFstream::commit()
79{
80 // Take ownership of serialized content
82
83 if (!UPstream::parRun())
84 {
85 // Write (non-empty) data
86 checkWrite(pathName_, charData);
87 }
88 else
89 {
90 // Ignore content if not writing
91 if (!writeOnProc_)
92 {
93 charData.clear();
94 }
95
96 List<fileName> filePaths(UPstream::nProcs(comm_));
97 filePaths[UPstream::myProcNo(comm_)] = pathName_;
98 Pstream::gatherList(filePaths, UPstream::msgType(), comm_);
99
100 // Test for identical output paths
101 bool uniform =
102 (
103 UPstream::master(comm_)
104 && fileOperation::uniformFile(filePaths)
105 );
106
108
109 if (uniform)
110 {
111 // Identical file paths - write on master
112 if (UPstream::master(comm_) && writeOnProc_)
113 {
114 checkWrite(pathName_, charData);
115 }
116 return;
117 }
118
119 // Different files
120 // ---------------
121 //
122 // Non-sparse (most ranks have writeOnProc_ == true),
123 // so gather sizes first and use PEX-like handling,
124 // with polling for when data becomes available.
125 //
126 // Could also consider double buffering + write to reduce
127 // memory overhead.
128
129 // Or int64_t
130 const label dataSize =
131 (
132 (UPstream::is_subrank(comm_) && writeOnProc_)
133 ? charData.size()
134 : 0
135 );
136
137 const labelList recvSizes
138 (
139 UPstream::listGatherValues<label>(dataSize, comm_)
140 );
141
142 // Receive from these procs
143 DynamicList<int> recvProcs;
144
145 if (UPstream::master(comm_))
146 {
147 // Sorted by message size
148 labelList order(Foam::sortedOrder(recvSizes));
149 recvProcs.reserve_exact(order.size());
150
151 // Want to receive large messages first. Ignore empty slots
152 forAllReverse(order, i)
153 {
154 const label proci = order[i];
155
156 // Ignore empty slots
157 if (recvSizes[proci] > 0)
158 {
159 recvProcs.push_back(proci);
160 }
161 }
162 }
163
164 // Non-blocking communication
165 const label startOfRequests = UPstream::nRequests();
166
167 // Some unique tag for this read/write grouping (extra precaution)
168 const int messageTag = (UPstream::msgType() + 256);
169
170 if (UPstream::is_subrank(comm_) && dataSize > 0)
171 {
172 // Send to content to master
174 (
177 charData.cdata_bytes(),
178 charData.size_bytes(),
179 messageTag,
180 comm_
181 );
182 }
183 else if (UPstream::master(comm_))
184 {
185 // The receive slots
186 List<List<char>> recvBuffers(UPstream::nProcs(comm_));
187
188 // Receive from these procs (non-empty slots)
189 for (const int proci : recvProcs)
190 {
191 auto& slot = recvBuffers[proci];
192 slot.resize_nocopy(recvSizes[proci]);
193
194 // Receive content
196 (
198 proci,
199 slot.data_bytes(),
200 slot.size_bytes(),
201 messageTag,
202 comm_
203 );
204 }
205
206 if (writeOnProc_)
207 {
208 // Write non-empty master data
209 checkWrite(pathName_, charData);
210 charData.clear();
211 }
212
213 // Poll for completed receive requests and dispatch
214 DynamicList<int> indices(recvProcs.size());
215 while
216 (
218 (
219 startOfRequests,
220 recvProcs.size(),
221 &indices
222 )
223 )
224 {
225 for (const int i : indices)
226 {
227 const int proci = recvProcs[i];
228 auto& slot = recvBuffers[proci];
229
230 // Write non-empty sub-proc data
231 checkWrite(filePaths[proci], slot);
232
233 // Eager cleanup
234 slot.clear();
235 }
236 }
237 }
238
239 UPstream::waitRequests(startOfRequests);
240 }
241}
242
243
244// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
245
247(
249 const int communicator,
250 const fileName& pathName,
251 IOstreamOption streamOpt,
253 const bool writeOnProc
254)
255:
256 OCharStream(streamOpt),
257 pathName_(pathName),
258 atomic_(atomic),
259 compression_(streamOpt.compression()),
260 append_(append),
261 writeOnProc_(writeOnProc),
262 comm_(communicator < 0 ? UPstream::worldComm : communicator)
263{
264 // Start with a slightly larger buffer
265 OCharStream::reserve(4*1024);
266}
267
268
269// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
270
272{
273 commit();
274}
275
276
277// ************************************************************************* //
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be replaced or emulated on other systems.
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition DynamicList.H:68
A simple container for options an IOstream can normally have.
versionNumber version() const noexcept
Get the stream version.
constexpr IOstreamOption(streamFormat fmt=streamFormat::ASCII, compressionType comp=compressionType::UNCOMPRESSED) noexcept
Default construct (ASCII, UNCOMPRESSED, currentVersion) or construct with format, compression.
compressionType compression() const noexcept
Get the stream compression.
atomicType
Atomic operations (output).
appendType
File appending (NO_APPEND | APPEND_APP | APPEND_ATE).
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bounds checking, etc.
Definition List.H:72
OCharStream(IOstreamOption streamOpt=IOstreamOption())
Default construct (empty output).
void append(std::streamsize count, char c)
Append repeated character content.
DynamicList< char > release()
Reset buffer and return contents.
auto str() const
For OStringStream compatibility, return the buffer as string copy.
void reserve(std::streamsize n)
Reserve output space for at least this amount.
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr)
Receive buffer contents (contiguous types) from given processor.
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents (contiguous types only) to given processor.
Inter-processor communications stream.
Definition UPstream.H:69
static bool waitSomeRequests(label pos, label len=-1, DynamicList< int > *indices=nullptr)
Wait until some requests (from position onwards) have finished. Corresponds to MPI_Waitsome().
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a rank in the communicator.
Definition UPstream.H:1706
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests).
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Definition UPstream.H:84
static bool parRun(const bool on) noexcept
Set as parallel run on/off.
Definition UPstream.H:1669
static int & msgType() noexcept
Message tag of standard messages.
Definition UPstream.H:1926
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition UPstream.H:1691
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition UPstream.H:1714
static bool is_subrank(const label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
Definition UPstream.H:1731
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run.
Definition UPstream.H:1697
static void waitRequests()
Wait for all requests to finish.
Definition UPstream.H:2497
@ gatherList
gatherList [manual algorithm]
Definition UPstream.H:194
@ broadcast
broadcast [MPI]
Definition UPstream.H:189
A class for handling file names.
Definition fileName.H:75
static bool uniformFile(const fileNameList &names)
True if the file names are identical. False on an empty list.
masterOFstream(IOstreamOption::atomicType atomic, const int communicator, const fileName &pathname, IOstreamOption streamOpt=IOstreamOption(), IOstreamOption::appendType append=IOstreamOption::NO_APPEND, const bool writeOnProc=true)
Construct with specified atomic behaviour and communicator from pathname, stream option,...
~masterOFstream()
Destructor - commits buffered information to file.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition error.H:629
OBJstream os(runTime.globalPath()/outputName)
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
List< label > labelList
A List of labels.
Definition List.H:62
bool mkDir(const fileName &pathName, mode_t mode=0777)
Make a directory and return an error if it could not be created.
Definition POSIX.C:616
labelList sortedOrder(const UList< T > &input)
Return the (stable) sort order for the list.
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and stack trace.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition errorManip.H:125
constexpr char nl
The newline '\n' character (0x0a).
Definition Ostream.H:50
#define forAllReverse(list, i)
Reverse loop across all elements in list.
Definition stdFoam.H:315