Loading...
Searching...
No Matches
OFstreamCollator.H
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017-2018 OpenFOAM Foundation
9 Copyright (C) 2019-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27Class
28 Foam::OFstreamCollator
29
30Description
31 Threaded file writer.
32
33 Collects all data from all processors and writes as single
34 'decomposedBlockData' file. The operation is determined by the
35 buffer size (maxThreadFileBufferSize setting):
36 - local size of data is larger than buffer: receive and write processor
37 by processor (i.e. 'scheduled'). Does not use a thread, no file size
38 limit.
39 - total size of data is larger than buffer (but local is not):
40 thread does all the collecting and writing of the processors. No file
41 size limit.
42 - total size of data is less than buffer:
43 collecting is done locally; the thread only does the writing
44 (since the data has already been collected)
45
46SourceFiles
47 OFstreamCollator.C
48
49\*---------------------------------------------------------------------------*/
50
51#ifndef Foam_OFstreamCollator_H
52#define Foam_OFstreamCollator_H
53
54#include "IOstream.H"
55#include "List.H"
56#include "CircularBuffer.H" // As FIFO
57#include "dictionary.H"
58
59#include <mutex>
60#include <thread>
61
62// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
63
64namespace Foam
65{
67/*---------------------------------------------------------------------------*\
68 Class OFstreamCollator Declaration
69\*---------------------------------------------------------------------------*/
70
72{
73 // Private Class
74
75 //- Holds data to be written
76 struct writeData
77 {
78 const label comm_;
79 const word objectType_;
80 const fileName pathName_;
81 DynamicList<char> localData_;
82 const labelList sizes_;
83 List<List<char>> procData_;
84 const IOstreamOption streamOpt_;
87 const dictionary headerEntries_;
88
89 writeData() = delete; // No default construct
90 writeData(const writeData&) = delete; // No copy construct
91 writeData(writeData&&) = delete; // No move construct
92 void operator=(const writeData&) = delete; // No copy assign
93 void operator=(writeData&&) = delete; // No move assign
94
95 //- Construct without local or proc data
96 writeData
97 (
98 const label comm,
99 const word& objectType,
100 const fileName& pathName,
101 const labelUList& sizes,
102 IOstreamOption streamOpt,
105 const dictionary& headerEntries
106 )
107 :
108 comm_(comm),
109 objectType_(objectType),
110 pathName_(pathName),
111 sizes_(sizes),
112 streamOpt_(streamOpt),
113 atomic_(atomic),
114 append_(append),
115 headerEntries_(headerEntries)
116 {}
117
118 //- Move reset local data
119 void transfer(DynamicList<char>& localData)
120 {
121 localData_.transfer(localData);
122 }
123
124 //- The (approximate) size of local + any optional proc data
125 off_t size() const
126 {
127 off_t total = localData_.size();
128 for (const auto& data : procData_)
129 {
130 total += data.size();
131 }
132 return total;
133 }
134 };
135
136
137 // Private Data
138
139 //- Total amount of storage to use for object stack below
140 const off_t maxBufferSize_;
141
142 mutable std::mutex mutex_;
143
144 std::unique_ptr<std::thread> thread_;
145
146 //- FIFO of files to write and their contents
148
149 //- Whether thread is running (and not exited)
150 bool threadRunning_;
151
152 //- Communicator to use for all parallel ops (in simulation thread)
153 label localComm_;
154
155 //- Communicator to use for all parallel ops (in write thread)
156 label threadComm_;
157
158
159 // Private Member Functions
160
161 //- Write actual file
162 static bool writeFile
163 (
164 const label comm,
165 const word& objectType,
166 const fileName& fName,
167 const UList<char>& localData,
168 const labelUList& recvSizes,
169 const UList<std::string_view>& procData,
170 IOstreamOption streamOpt,
173 const dictionary& headerEntries
174 );
175
176 //- Write all files in stack
177 static void* writeAll(void *threadarg);
178
179 //- Wait for total size of objects_ (master + optional slave data)
180 // to be wantedSize less than overall maxBufferSize.
181 void waitForBufferSpace(const off_t wantedSize) const;
182
183
184public:
185
186 // Declare name of the class and its debug switch
187 TypeName("OFstreamCollator");
188
189
190 // Constructors
191
192 //- Construct from buffer size (0 = do not use thread)
193 //- and with worldComm
194 explicit OFstreamCollator(const off_t maxBufferSize);
195
196 //- Construct from buffer size (0 = do not use thread)
197 //- and specified communicator
198 OFstreamCollator(const off_t maxBufferSize, const label comm);
199
200
201 //- Destructor
202 virtual ~OFstreamCollator();
203
204
205 // Member Functions
206
207 //- Write file with contents, possibly taking ownership of the
208 //- content.
209 // Blocks until write-thread has space available
210 // (total file sizes < maxBufferSize)
211 bool write
212 (
213 const word& objectType,
214 const fileName& fName,
215 DynamicList<char>&& localData,
216 IOstreamOption streamOpt,
219 const bool useThread = true,
220 const dictionary& headerEntries = dictionary::null
221 );
222
223 //- Write file with contents.
224 FOAM_DEPRECATED_FOR(2023-09, "use write with movable content")
225 bool write
226 (
227 const word& objectType,
228 const fileName& fName,
229 const std::string& s,
230 IOstreamOption streamOpt,
231 IOstreamOption::atomicType atomic,
232 IOstreamOption::appendType append,
233 const bool useThread = true,
234 const dictionary& headerEntries = dictionary::null
235 )
236 {
237 DynamicList<char> charData(label(s.size()));
238 charData.resize(s.size());
239 std::copy(s.begin(), s.end(), charData.begin());
240
241 return write
242 (
243 objectType,
244 fName,
245 std::move(charData),
246 streamOpt,
247 atomic,
248 append,
249 useThread,
250 headerEntries
251 );
252 }
253
254 //- Wait for all thread actions to have finished
255 void waitAll();
257
258
259// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
260
261} // End namespace Foam
262
263// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
264
265#endif
266
267// ************************************************************************* //
A simple list of objects of type <T> that is intended to be used as a circular buffer (eg,...
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition DynamicList.H:68
void transfer(List< T > &list)
Transfer contents of the argument List into this.
void resize(const label len)
Alter addressable list size, allocating new space if required while recovering old content.
A simple container for options an IOstream can normally have.
atomicType
Atomic operations (output).
appendType
File appending (NO_APPEND | APPEND_APP | APPEND_ATE).
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition List.H:72
bool write(const word &objectType, const fileName &fName, DynamicList< char > &&localData, IOstreamOption streamOpt, IOstreamOption::atomicType atomic, IOstreamOption::appendType append, const bool useThread=true, const dictionary &headerEntries=dictionary::null)
Write file with contents, possibly taking ownership of the content.
virtual ~OFstreamCollator()
Destructor.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size (0 = do not use thread) and with worldComm.
TypeName("OFstreamCollator")
void waitAll()
Wait for all thread actions to have finished.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition UList.H:89
iterator begin() noexcept
Return an iterator to begin traversing the UList.
Definition UListI.H:410
void size(const label n)
Older name for setAddressableSize.
Definition UList.H:118
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition dictionary.H:133
static const dictionary null
An empty dictionary, which is also the parent for all dictionaries.
Definition dictionary.H:487
A class for handling file names.
Definition fileName.H:75
A class for handling words, derived from Foam::string.
Definition word.H:66
gmvFile<< "tracers "<< particles.size()<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().x()<< " ";}gmvFile<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().y()<< " ";}gmvFile<< nl;for(const passiveParticle &p :particles){ gmvFile<< p.position().z()<< " ";}gmvFile<< nl;forAll(lagrangianScalarNames, i){ word name=lagrangianScalarNames[i];IOField< scalar > s(IOobject(name, runTime.timeName(), cloud::prefix, mesh, IOobject::MUST_READ, IOobject::NO_WRITE))
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
Namespace for OpenFOAM.
List< label > labelList
A List of labels.
Definition List.H:62
UList< label > labelUList
A UList of labels.
Definition UList.H:75
runTime write()
#define FOAM_DEPRECATED_FOR(since, replacement)
Definition stdFoam.H:43
#define TypeName(TypeNameString)
Declare a ClassName() with extra virtual type info.
Definition typeInfo.H:68