Loading...
Searching...
No Matches
UPstreamFile.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2025 Mark Olesen
9-------------------------------------------------------------------------------
10License
11 This file is part of OpenFOAM.
12
13 OpenFOAM is free software: you can redistribute it and/or modify it
14 under the terms of the GNU General Public License as published by
15 the Free Software Foundation, either version 3 of the License, or
16 (at your option) any later version.
17
18 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
19 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 for more details.
22
23 You should have received a copy of the GNU General Public License
24 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
25
26\*---------------------------------------------------------------------------*/
27
28#include "fileName.H"
29#include "UPstreamFile.H"
30#include "PstreamGlobals.H"
31#include "OSspecific.H"
32
33// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
34
35// Has _c() version?
// Foam_UPstream_largeCountFile is (re)defined here only when MPI_VERSION >= 4.
// It selects whether Foam_mpiCall() appends the '_c' suffix to an MPI routine
// name (presumably the MPI-4 large-count variants, as the macro name suggests)
// and it disables the INT_MAX guard in checkCount().
36#undef Foam_UPstream_largeCountFile
37
38#if (MPI_VERSION >= 4)
39#define Foam_UPstream_largeCountFile
40#endif
41
42// Macros for calling versions with or without '_c'
// Usage: Foam_mpiCall(MPI_File_write)(args...) expands to either
// MPI_File_write_c(args...) or MPI_File_write(args...)
43#ifdef Foam_UPstream_largeCountFile
44#define Foam_mpiCall(Function) Function##_c
45#else
46#define Foam_mpiCall(Function) Function
47#endif
48
49
50// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
51
52namespace
53{
54
// Guard against element counts that do not fit into an 'int'.
//
// Only active when Foam_UPstream_largeCountFile is NOT defined; otherwise
// the function unconditionally returns true.
//
// \param count  the count that will be handed to the MPI file routine
// \param what   text inserted into the diagnostic (callers in this file
//               pass the MPI routine name)
// \return false if count > INT_MAX (guarded build only), true otherwise
//
// NOTE(review): listing lines 61 and 64 are absent from this extract, so the
// statement that starts the '<<' chain and the one that terminates it are
// not visible here. Whether 'return false' is actually reachable depends on
// that terminator - confirm against the real source.
55inline bool checkCount(std::streamsize count, const char* what)
56{
57 #ifndef Foam_UPstream_largeCountFile
58 if (FOAM_UNLIKELY(count > std::streamsize(INT_MAX)))
59 {
60 using namespace Foam;
62 << "Write size " << label(count)
63 << " exceeds INT_MAX bytes for '" << what << "'\n"
65 return false;
66 }
67 #endif
68
69 return true;
70}
71
72} // End anonymous namespace
73
74
75// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
76
77namespace Foam
78{
79
80/*---------------------------------------------------------------------------*\
81 Class UPstream::File::Impl Declaration
82\*---------------------------------------------------------------------------*/
83
85{
86 //- The file-handle
87 MPI_File fh_;
88
89 //- Path of the open file
90 fileName name_;
91
92 //- The current state (open|read|write|closed etc)
93 int state_;
94
95 //- The associated rank when opened
96 int rank_;
97
98public:
99
100 //- The file states
101 enum states : int { CLOSED = 0, READ, WRITE, ATOMIC_WRITE };
102
103 // Constructors
105 //- Default construct
106 Impl()
107 :
108 fh_(MPI_FILE_NULL),
109 state_(CLOSED),
110 rank_(0)
111 {}
112
113
114 // Member Functions
115
116 // The file handle
117 const MPI_File& handle() const noexcept { return fh_; }
118 MPI_File& handle() noexcept { return fh_; }
119
120 // Path to the open file
121 const fileName& name() const noexcept { return name_; }
122 fileName& name() noexcept { return name_; }
124 // Change the file state, return the old value
125 int state(states val) noexcept
127 int old(state_);
128 state_ = val;
129 return old;
131
132 //- Is rank 0 ? (master rank)
133 bool master() const noexcept { return (rank_ == 0); }
134
135 //- Get the associated rank
136 int rank() const noexcept { return rank_; }
137
138 //- Set the associated rank
139 void rank(int val) noexcept { rank_ = val; }
141
142 // Checks
143
144 // The file state: true for every state other than CLOSED.
// Relies on the implicit int -> bool conversion of state_ and on
// CLOSED being the enumerator with value 0.
145 bool is_open() const noexcept { return state_; }
146
147 // The file read state
148 bool is_read() const noexcept
149 {
150 return (states::READ == state_);
151 }
152
153 // The file write atomic state
154 bool is_atomic() const noexcept
155 {
156 return (states::ATOMIC_WRITE == state_);
157 }
158
159 // The file write state (atomic or non-atomic)
160 bool is_write() const noexcept
161 {
162 return (states::ATOMIC_WRITE == state_ || states::WRITE == state_);
163 }
164
165 //- Assert is_read() or FatalError
//  \param what  text inserted into the diagnostic (callers in this file
//               pass the MPI routine name)
//  \return true if the state is READ, false otherwise
//
//  NOTE(review): listing lines 170 and 173 are absent from this extract,
//  so the statement heading the '<<' chain and its terminator are not
//  visible; whether 'return false' is reachable depends on them.
166 inline bool checkReadable(const char* what) const
167 {
168 if (FOAM_UNLIKELY(!is_read()))
169 {
171 << "File handler not open for reading '" << what << "'\n"
172 << "name: " << name() << nl
174 return false;
175 }
176 return true;
177 }
178
179 //- Assert is_write() or FatalError
//  \param what  text inserted into the diagnostic (callers in this file
//               pass the MPI routine name)
//  \return true if the state is WRITE or ATOMIC_WRITE, false otherwise
//
//  NOTE(review): listing lines 184 and 187 are absent from this extract,
//  so the statement heading the '<<' chain and its terminator are not
//  visible; whether 'return false' is reachable depends on them.
//  NOTE(review): the message literal below has no space before the opening
//  quote ("writing'"), unlike the "reading '" text in checkReadable().
180 inline bool checkWritable(const char* what) const
181 {
182 if (FOAM_UNLIKELY(!is_write()))
183 {
185 << "File handler not open for writing'" << what << "'\n"
186 << "name: " << name() << nl
188 return false;
189 }
190 return true;
191 }
192};
193
194} // End namespace Foam
196
197// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
198
200{
201 return true;
202}
203
204
205// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
206
208:
209 file_(new UPstream::File::Impl)
210{}
211
212
213// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
214
// Body of the destructor (its signature line is absent from this extract).
// Only reports a file that is still open when the object goes away; the
// handle is deliberately left untouched, see the comment inside.
// NOTE(review): listing line 219, which starts the '<<' chain, is absent
// from this extract.
216{
217 if (FOAM_UNLIKELY(file_ && file_->is_open()))
218 {
220 << "Exited scope without close()" << nl
221 << " FIX YOUR CODE!!" << endl;
222 // Do not call close() since we don't know where that collective
223 // should have been called
224 }
225}
226
227
228// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
229
231{
232 return (file_ ? file_->name() : fileName::null);
233}
234
235
237{
238 return bool(file_ && file_->is_open());
239}
240
241
// Body of the close operation (its signature line is absent from this
// extract; the body calls MPI_File_close).
//
// Returns false when no file is open, true otherwise.
//
// NOTE(review): file_ is dereferenced without a null test here, whereas the
// destructor and is_open() test 'file_ &&' first - confirm file_ can never
// be null at this point.
243{
244 if (FOAM_UNLIKELY(!file_->is_open()))
245 {
247 << "Called without an open file handler !" << endl;
248 return false;
249 }
250
// NOTE(review): the return code of MPI_File_close is not inspected
251 MPI_File_close(&(file_->handle()));
252
253 // Atomic rename of file (master only)
// Moves "<name>~tmp~" onto the final name; the same suffix is used for
// pathname_tmp where the file is opened for atomic writing.
// NOTE(review): the result of std::rename is not inspected, so true is
// returned even if the rename fails.
254 const fileName& pathname = file_->name();
255
256 if (file_->master() && file_->is_atomic() && !pathname.empty())
257 {
258 std::rename
259 (
260 (pathname + "~tmp~").c_str(),
261 pathname.c_str()
262 );
263 }
264
// Reset the bookkeeping only now: master(), is_atomic() and the
// 'pathname' reference above all read the values cleared here.
265 file_->state(Impl::CLOSED);
266 file_->name() = "";
267 file_->rank(0);
268
269 return true;
270}
271
272
273// * * * * * * * * * * * * Member Functions (Reading) * * * * * * * * * * * //
274
275#if 0
// Open 'pathname' read-only (MPI_MODE_RDONLY) with MPI_File_open.
// This definition sits inside an '#if 0' region and is not compiled.
//
// \param communicator  used to obtain the rank stored after a successful
//                      open (UPstream::myProcNo)
// \param pathname      the file to open
// \return false if MPI_File_open does not return MPI_SUCCESS, true otherwise
//         (state is then READ)
//
// NOTE(review): listing lines 286, 298, 307 and 310 are absent from this
// extract (start of the warning/error statements, the first argument of
// MPI_File_open and the error terminator).
276bool Foam::UPstream::File::open_read
277(
278 const int communicator,
279 const fileName& pathname
280)
281{
282 //Needed? PstreamGlobals::checkCommunicator(communicator, 0);
283
284 if (FOAM_UNLIKELY(file_->is_open()))
285 {
287 << "Previous use of file handler did not call close()" << nl
288 << " FIX YOUR CODE!!" << endl;
289 // Do not call close() since we don't know where that collective
290 // should have been called
291 }
// NOTE(review): if a file was still open, its handle is overwritten by the
// MPI_File_open call below without having been closed.
292 file_->state(Impl::CLOSED);
293 file_->name() = pathname; // <- set now for external error messages
294 file_->rank(0);
295
296 int returnCode = MPI_File_open
297 (
299 pathname.c_str(),
300 (MPI_MODE_RDONLY),
301 MPI_INFO_NULL,
302 &(file_->handle())
303 );
304
305 if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
306 {
308 << "Error encounted in MPI_File_open() : "
309 << pathname << nl
311
312 return false;
313 }
314
315 file_->state(Impl::READ);
316 file_->name() = pathname;
317 file_->rank(UPstream::myProcNo(communicator));
318
319 return true; // ie, is_read()
320}
321#endif
322
323
324// * * * * * * * * * * * * Non-Collective Reading * * * * * * * * * * * * * //
325
326#if 0
// Read up to the first 4096 bytes of the open file into 'content' with
// MPI_File_read_at (offset 0, MPI_BYTE) and attempt to parse a header
// dictionary from them.
// This definition sits inside an '#if 0' region and is not compiled.
//
// \param content  receives the raw bytes; cleared when the file size
//                 cannot be determined
// \return false if Foam::fileSize() reports a negative size; otherwise
//         either 'ok' from the header parse (see below) or whether the MPI
//         read returned MPI_SUCCESS
//
// NOTE(review): 'io' is neither a parameter nor a local of this function,
// and listing line 373 (the first operand of the '&&' condition) is absent
// from this extract.
// NOTE(review): the result of checkReadable() is discarded.
327bool Foam::UPstream::File::get_header(DynamicList<char>& content)
328{
329 std::streamsize headerSize(4096);
330
331 // constexpr const char* const func = "MPI_File_read_at";
332 file_->checkReadable("MPI_File_read_at");
333
// Clamp the amount to read to the actual file size
334 if (off_t fileLen = Foam::fileSize(this->name()); fileLen >= 0)
335 {
336 std::streamsize size = std::streamsize(fileLen);
337 if (headerSize > size)
338 {
339 headerSize = size;
340 }
341 }
342 else
343 {
344 content.clear();
345 return false;
346 }
347
348 // Get the first header content:
349 content.resize_nocopy(headerSize);
350
351 int returnCode = Foam_mpiCall(MPI_File_read_at)
352 (
353 file_->handle(),
354 0, // offset
355 content.data(),
356 content.size(),
357 MPI_BYTE,
358 MPI_STATUS_IGNORE
359 );
360
361 // Wrap as ISpanStream headerStream(content);
362 if (MPI_SUCCESS == returnCode)
363 {
364 ISpanStream is(content);
365 dictionary headerDict;
366
367 // Read the regular "FoamFile" header
368 bool ok = io.readHeader(headerDict, is);
369
370 // Probably collated - extract class from "data.class"
371 if
372 (
374 && headerDict.readIfPresent("data.class", io.headerClassName())
375 )
376 {
377 return ok;
378 }
379 }
380
381 return (MPI_SUCCESS == returnCode);
382}
383#endif
384
385
386// * * * * * * * * * * * * Member Functions (Writing) * * * * * * * * * * * //
387
// Parameter list and body of the open-for-writing operation (its signature
// line is absent from this extract).
//
// Opens the file with MPI_File_open using MPI_MODE_CREATE | MPI_MODE_WRONLY.
// In atomic mode the file actually opened is "<pathname>~tmp~"; the stored
// name is always 'pathname'.
//
// \param communicator  used to obtain the rank stored after a successful
//                      open (UPstream::myProcNo)
// \param pathname      the final name of the output file
// \return false if MPI_File_open does not return MPI_SUCCESS, true otherwise
//         (state is then WRITE or ATOMIC_WRITE)
//
// NOTE(review): 'atomicType' used below is declared on listing line 392,
// which is absent from this extract, as are lines 399, 444, 453 and 456
// (start of the warning/error statements, the first argument of
// MPI_File_open and the error terminator).
389(
390 const int communicator,
391 const fileName& pathname,
393)
394{
395 //Needed? PstreamGlobals::checkCommunicator(communicator, 0);
396
397 if (FOAM_UNLIKELY(file_->is_open()))
398 {
400 << "Previous use of file handler did not call close()" << nl
401 << " FIX YOUR CODE!!" << endl;
402 // Do not call close() since we don't know where that collective
403 // should have been called
404 }
// NOTE(review): if a file was still open, its handle is overwritten by the
// MPI_File_open call below without having been closed.
405 file_->state(Impl::CLOSED);
406 file_->name() = pathname; // <- set now for external error messages
407 file_->rank(0);
408
409 const bool atomic = (IOstreamOption::atomicType::ATOMIC == atomicType);
410
411 // When opening new files, remove file variants out of the way.
412 // Eg, opening "file1"
413 // - remove old "file1.gz" (compressed)
414 // - also remove old "file1" if it is a symlink
// NOTE(review): no rank guard is visible around the removals below, so each
// caller performs them - confirm that is intended.
415
416 const fileName pathname_gz(pathname + ".gz");
417 const fileName pathname_tmp(pathname + "~tmp~");
418
419 // File to open with MPI_File_open
420 const auto& target = (atomic ? pathname_tmp : pathname);
421
422 // Remove old compressed version (if any)
423 if
424 (
425 auto fType = Foam::type(pathname_gz, false);
426 (fType == fileName::SYMLINK || fType == fileName::FILE)
427 )
428 {
429 Foam::rm(pathname_gz);
430 }
431
432 // Avoid writing into symlinked files (non-append mode)
// The test is made on 'target', i.e. on the "~tmp~" name in atomic mode
// and on 'pathname' itself otherwise.
433 if
434 (
435 auto fType = Foam::type(target, false);
436 fType == fileName::SYMLINK
437 )
438 {
439 Foam::rm(target);
440 }
441
442 int returnCode = MPI_File_open
443 (
445 target.c_str(),
446 (MPI_MODE_CREATE | MPI_MODE_WRONLY),
447 MPI_INFO_NULL,
448 &(file_->handle())
449 );
450
451 if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
452 {
454 << "Error encounted in MPI_File_open() : "
455 << target << nl
457
458 return false;
459 }
460
// Record the final name (not the temporary one); in atomic mode close()
// renames "<name>~tmp~" onto it.
461 file_->state(atomic ? Impl::ATOMIC_WRITE : Impl::WRITE);
462 file_->name() = pathname;
463 file_->rank(UPstream::myProcNo(communicator));
464
465 return true; // ie, is_write()
466}
467
468
// Set the size of the open file to 'num_bytes' using MPI_File_set_size.
//
// \param num_bytes  the requested file size, passed straight to MPI
// \return false if no file is open or if MPI_File_set_size does not return
//         MPI_SUCCESS; true otherwise
//
// NOTE(review): listing line 473, which starts the '<<' chain, is absent
// from this extract.
469bool Foam::UPstream::File::set_size(std::streamsize num_bytes)
470{
471 if (FOAM_UNLIKELY(!file_->is_open()))
472 {
474 << "Called without an open file handler !" << endl;
475 return false;
476 }
477
478 int returnCode = MPI_File_set_size(file_->handle(), num_bytes);
479
480 return (MPI_SUCCESS == returnCode);
481}
482
483
484// * * * * * * * * * * * * Non-Collective Writing * * * * * * * * * * * * * //
485
// Parameter list and body of the MPI_File_write wrapper (its signature line
// is absent from this extract).
//
// \param data        start of the buffer to write
// \param count       number of 'dataTypeId' elements to write
// \param dataTypeId  translated to an MPI_Datatype via
//                    PstreamGlobals::getDataType
// \return true if the MPI call returned MPI_SUCCESS
//
// NOTE(review): the bool results of checkWritable() and checkCount() are
// discarded, so the MPI call is issued regardless of them unless the (not
// visible) error statement inside those checks terminates the program.
487(
488 const void* data,
489 std::streamsize count,
490 const UPstream::dataTypes dataTypeId
491)
492{
493 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
494
495 // constexpr const char* const func = "MPI_File_write";
496 file_->checkWritable("MPI_File_write");
497 checkCount(count, "MPI_File_write");
498
// Expands to MPI_File_write_c or MPI_File_write, see Foam_mpiCall
499 int returnCode = Foam_mpiCall(MPI_File_write)
500 (
501 file_->handle(),
502 data,
503 count,
504 datatype,
505 MPI_STATUS_IGNORE
506 );
507
508 return (MPI_SUCCESS == returnCode);
509}
510
511
// Parameter list and body of the MPI_File_write_at wrapper (its signature
// line is absent from this extract).
//
// \param offset      file offset handed to MPI unchanged
// \param data        start of the buffer to write
// \param count       number of 'dataTypeId' elements to write
// \param dataTypeId  translated to an MPI_Datatype via
//                    PstreamGlobals::getDataType
// \return true if the MPI call returned MPI_SUCCESS
//
// NOTE(review): the bool results of checkWritable() and checkCount() are
// discarded, so the MPI call is issued regardless of them unless the (not
// visible) error statement inside those checks terminates the program.
513(
514 std::streamsize offset,
515 const void* data,
516 std::streamsize count,
517 const UPstream::dataTypes dataTypeId
518)
519{
520 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
521
522 // constexpr const char* const func = "MPI_File_write_at";
523 file_->checkWritable("MPI_File_write_at");
524 checkCount(count, "MPI_File_write_at");
525
// Expands to MPI_File_write_at_c or MPI_File_write_at, see Foam_mpiCall
526 int returnCode = Foam_mpiCall(MPI_File_write_at)
527 (
528 file_->handle(),
529 offset,
530 data,
531 count,
532 datatype,
533 MPI_STATUS_IGNORE
534 );
535
536 return (MPI_SUCCESS == returnCode);
537}
538
539
540// * * * * * * * * * * * * * Collective Writing * * * * * * * * * * * * * * //
541
// Parameter list and body of the MPI_File_write_all wrapper (its signature
// line is absent from this extract). The surrounding section heading marks
// this as a collective write.
//
// \param data        start of the buffer to write
// \param count       number of 'dataTypeId' elements to write
// \param dataTypeId  translated to an MPI_Datatype via
//                    PstreamGlobals::getDataType
// \return true if the MPI call returned MPI_SUCCESS
//
// NOTE(review): the bool results of checkWritable() and checkCount() are
// discarded, so the MPI call is issued regardless of them unless the (not
// visible) error statement inside those checks terminates the program.
543(
544 const void* data,
545 std::streamsize count,
546 const UPstream::dataTypes dataTypeId
547)
548{
549 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
550
551 // constexpr const char* const func = "MPI_File_write_all";
552 file_->checkWritable("MPI_File_write_all");
553 checkCount(count, "MPI_File_write_all");
554
// Expands to MPI_File_write_all_c or MPI_File_write_all, see Foam_mpiCall
555 int returnCode = Foam_mpiCall(MPI_File_write_all)
556 (
557 file_->handle(),
558 data,
559 count,
560 datatype,
561 MPI_STATUS_IGNORE
562 );
563
564 return (MPI_SUCCESS == returnCode);
565}
566
567
// Parameter list and body of the MPI_File_write_at_all wrapper (its
// signature line is absent from this extract). The surrounding section
// heading marks this as a collective write.
//
// \param offset      file offset handed to MPI unchanged
// \param data        start of the buffer to write
// \param count       number of 'dataTypeId' elements to write
// \param dataTypeId  translated to an MPI_Datatype via
//                    PstreamGlobals::getDataType
// \return true if the MPI call returned MPI_SUCCESS
//
// NOTE(review): the bool results of checkWritable() and checkCount() are
// discarded, so the MPI call is issued regardless of them unless the (not
// visible) error statement inside those checks terminates the program.
569(
570 std::streamsize offset,
571 const void* data,
572 std::streamsize count,
573 const UPstream::dataTypes dataTypeId
574)
575{
576 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
577
578 // constexpr const char* const func = "MPI_File_write_at_all";
579 file_->checkWritable("MPI_File_write_at_all");
580 checkCount(count, "MPI_File_write_at_all");
581
// Expands to MPI_File_write_at_all_c or MPI_File_write_at_all,
// see Foam_mpiCall
582 int returnCode = Foam_mpiCall(MPI_File_write_at_all)
583 (
584 file_->handle(),
585 offset,
586 data,
587 count,
588 datatype,
589 MPI_STATUS_IGNORE
590 );
591
592 return (MPI_SUCCESS == returnCode);
593}
594
595
596// bool Foam::UPstream::File::write_data_all_begin
597// (
598// const void* data,
599// std::streamsize count,
600// const UPstream::dataTypes dataTypeId
601// )
602// {
603// MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
604//
605// // constexpr const char* const func = "MPI_File_write_all_begin";
606// file_->checkWritable("MPI_File_write_all_begin");
607// checkCount(count, "MPI_File_write_all_begin");
608//
609// int returnCode = Foam_mpiCall(MPI_File_write_all_begin)
610// (
611// file_->handle(),
612// data,
613// count,
614// datatype,
615// MPI_STATUS_IGNORE
616// );
617//
618// return (MPI_SUCCESS == returnCode);
619// }
620
621
622// bool Foam::UPstream::File::write_data_all_end
623// (
624// const void* data
625// )
626// {
627// file_->checkWritable("MPI_File_write_all_end");
628// int returnCode = Foam_mpiCall(MPI_File_write_all_end)
629// (
630// file_->handle(),
631// data,
632// MPI_STATUS_IGNORE
633// );
634//
635// return (MPI_SUCCESS == returnCode);
636// }
637
638
639// bool Foam::UPstream::File::write_data_at_all_begin
640// (
641// std::streamsize offset,
642// const void* data,
643// std::streamsize count,
644// const UPstream::dataTypes dataTypeId
645// )
646// {
647// MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
648//
649// // constexpr const char* const func = "MPI_File_write_at_all_begin";
650// file_->checkWritable("MPI_File_write_at_all_begin");
651// checkCount(count, "MPI_File_write_at_all_begin");
652//
653// int returnCode = Foam_mpiCall(MPI_File_write_at_all_begin)
654// (
655// file_->handle(),
656// offset,
657// data,
658// count,
659// datatype,
660// MPI_STATUS_IGNORE
661// );
662//
663// return (MPI_SUCCESS == returnCode);
664// }
665
666
667// bool Foam::UPstream::File::write_data_at_all_end
668// (
669// const void* data
670// )
671// {
672// file_->checkWritable("MPI_File_write_at_all_end");
673//
674// int returnCode = Foam_mpiCall(MPI_File_write_at_all_end)
675// (
676// file_->handle(),
677// data,
678// MPI_STATUS_IGNORE
679// );
680//
681// return (MPI_SUCCESS == returnCode);
682// }
683
684
685// ************************************************************************* //
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void resize_nocopy(const label len)
Alter addressable list size, allocating new space if required without necessarily recovering old cont...
atomicType
Atomic operations (output).
@ ATOMIC
atomic = true
T * data() noexcept
Return pointer to the underlying array serving as data storage.
Definition UListI.H:274
void size(const label n)
Older name for setAddressableSize.
Definition UList.H:118
bool checkReadable(const char *what) const
Assert is_read() or FatalError.
void rank(int val) noexcept
Set the associated rank.
MPI_File & handle() noexcept
bool is_write() const noexcept
bool is_open() const noexcept
bool is_read() const noexcept
bool master() const noexcept
Is rank 0 ? (master rank).
fileName & name() noexcept
bool checkWritable(const char *what) const
Assert is_write() or FatalError.
const fileName & name() const noexcept
int state(states val) noexcept
int rank() const noexcept
Get the associated rank.
bool is_atomic() const noexcept
const MPI_File & handle() const noexcept
Impl()
Default construct.
~File()
Destructor. Non-default in header (incomplete types).
bool is_open() const
True if allocated and open has been called.
bool open_write(const int communicator, const fileName &pathname, IOstreamOption::atomicType=IOstreamOption::NON_ATOMIC)
MPI_File_open [collective] : open file in write-only mode, no-append.
bool write_data_at_all(std::streamsize offset, const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write_at_all [collective] : write data at specified offset.
bool close()
MPI_File_close [collective].
File(const File &&)=delete
No copy construct.
const fileName & name() const
The name of the open stream.
static bool supported()
True if MPI/IO appears to be supported.
bool set_size(std::streamsize num_bytes)
Set the (output) file size [collective].
File()
Default construct.
bool write_data_at(std::streamsize offset, const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write_at [non-collective] : write data at specified offset.
bool write_data_all(const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write_all [collective] : write data.
bool write_data(const void *buffer, std::streamsize count, const UPstream::dataTypes dataTypeId)
MPI_File_write [non-collective] : write data.
Inter-processor communications stream.
Definition UPstream.H:69
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a...
Definition UPstream.H:1706
UPstream(const commsTypes commsType) noexcept
Construct for given communication type.
Definition UPstream.H:1184
dataTypes
Mapping of some fundamental and aggregate types to MPI data types.
Definition UPstream.H:107
static bool isCollatedType(const word &objectType)
True if object type is a known collated type.
bool readIfPresent(const word &keyword, T &val, enum keyType::option matchOpt=keyType::REGEX) const
Find an entry if present, and assign to T val. FatalIOError if it is found and the number of tokens i...
A class for handling file names.
Definition fileName.H:75
@ SYMLINK
A symbolic link.
Definition fileName.H:86
@ FILE
A regular file.
Definition fileName.H:84
static const fileName null
An empty fileName.
Definition fileName.H:111
static std::string name(const std::string &str)
Return basename (part beyond last /), including its extension.
Definition fileNameI.H:192
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition error.H:600
const auto & io
#define WarningInFunction
Report a warning using Foam::Warning.
#define Foam_mpiCall(Function)
MPI_Datatype getDataType(UPstream::dataTypes id)
Lookup of dataTypes enumeration as an MPI_Datatype.
DynamicList< MPI_Comm > MPICommunicators_
Namespace for OpenFOAM.
bool rm(const fileName &file)
Remove a file (or its gz equivalent), returning true if successful.
Definition POSIX.C:1406
fileName::Type type(const fileName &name, const bool followLink=true)
Return the file type: DIRECTORY or FILE, normally following symbolic links.
Definition POSIX.C:801
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
errorManip< error > abort(error &err)
Definition errorManip.H:139
off_t fileSize(const fileName &name, const bool followLink=true)
Return size of file or -1 on failure (normally follows symbolic links).
Definition POSIX.C:907
const direction noexcept
Definition scalarImpl.H:265
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition errorManip.H:125
constexpr char nl
The newline '\n' character (0x0a).
Definition Ostream.H:50
#define FOAM_UNLIKELY(cond)
Definition stdFoam.H:64