Loading...
Searching...
No Matches
decomposedBlockData.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2017-2018 OpenFOAM Foundation
9 Copyright (C) 2020-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "decomposedBlockData.H"
30#include "Fstream.H"
31#include "IPstream.H"
32#include "OPstream.H"
33#include "SpanStream.H"
34#include "dictionary.H"
37
38// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
39
// NOTE(review): only the namespace header is visible in this listing; the
// following listing lines (41-44) are missing, so the namespace body and its
// braces cannot be seen here - confirm against the upstream source.
40namespace Foam
45
46// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
// Predicate body: true when objectType compares equal to
// decomposedBlockData::typeName.
// NOTE(review): the signature line is not visible in this listing;
// presumably this is isCollatedType(const word& objectType) - confirm.
49{
50 return (objectType == decomposedBlockData::typeName);
51}
52
53
// Predicate body: asks the IOobject 'io' whether its header class is
// decomposedBlockData (the existing comments list two equivalent spellings).
// NOTE(review): the signature line is not visible in this listing;
// presumably this is the isCollatedType overload taking an IOobject - confirm.
55{
56 // same: return io.isHeaderClass(decomposedBlockData::typeName);
57 // same: return isCollatedType(io.headerClassName());
58 return io.isHeaderClass<decomposedBlockData>();
59}
60
61
62// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
63
// Constructor: stores the communicator and the communication type.
//   comm      : communicator label, kept in comm_
//   io        : IOobject description of the object
//   commsType : communication type, kept in commsType_
// NOTE(review): the constructor name line, the first member/base initialiser
// and several statements are missing from this listing (gaps at listing
// lines 64, 71, 76, 78 and 86) - confirm against the upstream source.
65(
66 const label comm,
67 const IOobject& io,
68 const UPstream::commsTypes commsType
69)
70:
72 commsType_(commsType),
73 comm_(comm)
74{
75 // Temporary warning
    // NOTE(review): the condition guarding this warning and the stream it is
    // written to are not visible here; from the message text it concerns
    // construction with READ_MODIFIED.
77 {
79 << "decomposedBlockData " << name()
80 << " constructed with READ_MODIFIED"
81 " but decomposedBlockData does not support automatic rereading."
82 << endl;
83 }
    // Entered when a read is required, or when it is optional and the
    // header check succeeds.
    // NOTE(review): the statement inside this block is not visible here.
84 if (isReadRequired() || (isReadOptional() && headerOk()))
85 {
87 }
88}
89
90
91// * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //
92
// Write a single block entry to 'os' and return the stream offset at which
// the entry starts.
//   os     : output stream
//   blocki : block index, used to form the "processorN" label
//   str    : character data to write (may be null)
//   len    : number of characters in str
// The entry consists of a "// processorN" comment line followed by the
// character count and the raw characters, or a plain 0 when there is no data.
// NOTE(review): the signature line is not visible in this listing;
// presumably this is decomposedBlockData::writeBlockEntry - confirm.
94(
95 OSstream& os,
96 const label blocki,
97 const char* str,
98 const size_t len
99)
100{
101 // Offset to the beginning of this output
102 // This should generally be OK for non-compressed streams
103 // (eg, std::ofstream)
104
105 std::streamoff blockOffset = os.stdStream().tellp();
106
107 const word procName("processor" + Foam::name(blocki));
108
109 // Write as primitiveEntry or commented content
    // Compile-time switch, currently fixed to the commented form
110 constexpr bool isDictFormat = false;
111
112 if constexpr (isDictFormat)
113 {
114 // Like writeKeyword()
115 os << nl << procName << nl;
116 }
117 else
118 {
119 // Human-readable comments
120 os << nl << "// " << procName << nl;
121 }
122
123 // Data
124 if (str && len > 0)
125 {
126 // Special treatment for char data (binary I/O only)
    // The previous format is kept in oldFmt and reinstated after the write
127 const auto oldFmt = os.format(IOstreamOption::BINARY);
128
129 os << label(len) << nl;
130 os.write(str, len);
131 os << nl;
132
133 os.format(oldFmt);
134 }
135 else
136 {
    // No data: emit a zero count only
137 os << label(0) << nl;
138 }
139
140 if constexpr (isDictFormat)
141 {
142 os.endEntry();
143 }
144
145 return blockOffset;
146}
147
148
// Read one block entry from 'is' into charData.
//   is       : input stream positioned at the start of an entry
//   charData : receives the entry content via charData.readList()
// Accepts the entry with or without a leading keyword (see the list of
// accepted layouts below). When a keyword was present, a trailing ';' is
// consumed if it follows. Stream failures are reported through fatalCheck().
// Returns true.
// NOTE(review): the signature line is not visible in this listing;
// presumably this is decomposedBlockData::readBlockEntry - confirm.
// The closing brace at listing line 186 was absent from the listing and has
// been restored so that the braces balance.
150(
151 Istream& is,
152 List<char>& charData
153)
154{
155 // Handle any of these:
156
157 // 0. NCHARS (...)
158 // 1. List<char> NCHARS (...)
159 // 2. processorN List<char> NCHARS (...) ;
160 // 3. processorN NCHARS (...) ;
161
162 is.fatalCheck(FUNCTION_NAME);
163 token tok(is);
164 is.fatalCheck(FUNCTION_NAME);
165
166 // Dictionary format?
167 // - has a keyword, which is not a compound token:
168 const bool isDictFormat = (tok.isWord() && !tok.isCompound());
169
    // Not a keyword: return the token so that readList() sees it
170 if (!isDictFormat && tok.good())
171 {
172 is.putBack(tok);
173 }
174 charData.readList(is);
175
176 if (isDictFormat)
177 {
178 is.fatalCheck(FUNCTION_NAME);
179 is >> tok;
180 is.fatalCheck(FUNCTION_NAME);
181
182 // Swallow trailing ';'
    // Anything else belongs to the next entry and is put back
183 if (tok.good() && !tok.isPunctuation(token::END_STATEMENT))
184 {
185 is.putBack(tok);
186 }
187 }
188
189 return true;
190}
191
192
// Skip over one block entry on 'is' without storing its content.
// Returns false when the stream is not good before/after reading the leading
// token(s) or when the token is neither a compound nor a label; otherwise
// returns true.
// NOTE(review): the signature line is not visible in this listing;
// presumably this is decomposedBlockData::skipBlockEntry(Istream& is) -
// confirm.
// The closing brace at listing line 262 was absent from the listing and has
// been restored so that the braces balance.
194{
195 // As per readBlockEntry but seeks instead of reading.
196 // Internals like charList::readList - ie, always binary
197
198 // Handle any of these:
199 // 0. NCHARS (...)
200 // 1. List<char> NCHARS (...)
201 // 2. processorN List<char> NCHARS (...) ;
202 // 3. processorN NCHARS (...) ;
203
204 if (!is.good()) return false;
205 token tok(is);
206 if (!is.good()) return false;
207
208 // Dictionary format?
209 // - has a keyword, which is not a compound token:
210 const bool isDictFormat = (tok.isWord() && !tok.isCompound());
211
    // With a keyword, the token of interest is the one after it
212 if (isDictFormat)
213 {
214 is >> tok;
215 if (!is.good()) return false;
216 }
217
218
219 bool handled = false;
220
221 // Like charList::readList
222 if (tok.isCompound())
223 {
    // Compound token: nothing further is consumed here
224 handled = true;
225 }
226 else if (tok.isLabel())
227 {
228 // Label: could be int(..) or just a plain '0'
229
230 const label len = tok.labelToken();
231
232 // Special treatment for char data (binary I/O only)
233 const auto oldFmt = is.format(IOstreamOption::BINARY);
234
235 if (len)
236 {
237 // read(...) includes surrounding start/end delimiters.
238
239 // Note: nullptr to ignore instead of reading
240 is.read(nullptr, std::streamsize(len));
241 }
242 is.format(oldFmt);
243
244 handled = true;
245 }
246 else
247 {
248 // Incorrect token
249 return false;
250 }
251
252 if (isDictFormat)
253 {
254 is.fatalCheck(FUNCTION_NAME);
255 is >> tok;
256 is.fatalCheck(FUNCTION_NAME);
257
258 // Swallow trailing ';'
259 if (tok.good() && !tok.isPunctuation(token::END_STATEMENT))
260 {
261 is.putBack(tok);
262 }
263 }
264
265 return handled;
266}
267
268
// Count the block entries on 'is' by skipping over them.
//   is           : input stream; a leading "FoamFile" header dictionary is
//                  consumed and its "version"/"format" entries, when
//                  present, are applied to the stream
//   maxNumBlocks : counting stops as soon as the count equals this value
// Returns the number of blocks skipped.
// NOTE(review): the signature line and listing lines 296-298 are not visible
// in this listing; presumably this is decomposedBlockData::getNumBlocks -
// confirm. The closing brace at listing line 317 was absent from the listing
// and has been restored so that the braces balance.
270(
271 Istream& is,
272 const label maxNumBlocks
273)
274{
275 label nBlocks = 0;
276
277 // Handle OpenFOAM header if it is the first entry
278 if (is.good())
279 {
280 token tok(is);
281
282 if (is.good() && tok.isWord("FoamFile"))
283 {
284 dictionary headerDict(is); // Read sub-dictionary content
285
286 if (headerDict.readIfPresent("version", tok))
287 {
288 is.version(tok);
289 }
290
291 word formatName;
292 if (headerDict.readIfPresent("format", formatName))
293 {
294 is.format(formatName);
295 }
296
299 //if (headerDict.readIfPresent("blocks", nBlocks))
300 //{
301 // return nBlocks;
302 //}
303 }
304 else if (tok.good())
305 {
    // Not a header: hand the token back for the block parser
306 is.putBack(tok);
307 }
308 }
309
310 while (is.good() && skipBlockEntry(is))
311 {
312 ++nBlocks;
313
    // Early exit once the requested number of blocks has been seen
314 if (maxNumBlocks == nBlocks)
315 {
316 break;
317 }
318 }
319
320 return nBlocks;
321}
322
323
// Test whether the stream holds a block with the given zero-based number.
//   is          : input stream handed to getNumBlocks()
//   blockNumber : block index to look for
// A negative blockNumber yields false without touching the stream. Otherwise
// getNumBlocks() is asked to count at most blockNumber+1 blocks, and the
// block exists when that count exceeds blockNumber.
// The opening parenthesis at listing line 327 was absent from the listing
// and has been restored so that the parentheses balance.
324bool Foam::decomposedBlockData::hasBlock(Istream& is, const label blockNumber)
325{
326 return
327 (
328 blockNumber >= 0
329 && (blockNumber < getNumBlocks(is, blockNumber+1))
330 );
331}
332
333
// Serialize a regIOobject into a character buffer and write it as one block
// entry.
//   os              : output stream receiving the block entry
//   streamOptData   : stream options used for the serialization buffer
//   io              : object whose writeData() (and optionally
//                     writeHeader()) produces the content
//   blocki          : block index passed on to writeBlockEntry()
//   withLocalHeader : when true, a header is written into the buffer first,
//                     with the banner disabled for the duration
// Returns the offset reported by writeBlockEntry(), or std::streamoff(-1)
// when the header or data could not be serialized.
// NOTE(review): the signature line is not visible in this listing.
// Fixes: the previous banner setting saved in 'old' is now reinstated after
// writing the header (listing line 358), and the closing brace of the
// serialization scope (listing line 371) is restored.
335(
336 OSstream& os,
337 IOstreamOption streamOptData,
338 const regIOobject& io,
339 const label blocki,
340 const bool withLocalHeader
341)
342{
343 // Serialize content to write
344 DynamicList<char> serialized;
345 {
346 OCharStream buf(streamOptData);
347 buf.reserve(4*1024); // Start with a slightly larger buffer
348
349 bool ok = true;
350
351 // Generate FoamFile header on master, without comment banner
352 if (withLocalHeader)
353 {
354 const bool old = IOobject::bannerEnabled(false);
355
356 ok = io.writeHeader(buf);
357
    // Reinstate the previous banner setting
358 IOobject::bannerEnabled(old);
359 }
360
361 // Serialize the output
362 ok = ok && io.writeData(buf);
363
364 if (!ok)
365 {
366 return std::streamoff(-1);
367 }
368
369 // Take ownership of serialized content
370 serialized = buf.release();
371 }
372
373 return decomposedBlockData::writeBlockEntry(os, blocki, serialized);
374}
375
376
// Produce an input character stream for block 'blocki' of the stream 'is'.
//   blocki   : index of the block wanted (0 is treated as the master block)
//   is       : input stream holding the blocks
//   headerIO : receives the header information via readHeader()
// Returns an autoPtr to an ICharStream that takes over the 'data' buffer.
// A header read failure raises FatalIOError.
// NOTE(review): the return-type/name lines and the statements at listing
// lines 402, 442, 446 and 458 are not visible in this listing. In
// particular the calls that fill 'data', the body of the skip loop and one
// closing brace cannot be seen here - confirm against the upstream source.
379(
380 const label blocki,
381 ISstream& is,
382 IOobject& headerIO
383)
384{
385 if (debug)
386 {
387 Pout<< "decomposedBlockData::readBlock:"
388 << " stream:" << is.name() << " attempt to read block " << blocki
389 << endl;
390 }
391
392 // The character input stream for the specified block
393 autoPtr<ISstream> blockIsPtr;
394
395 // Extracted header information
    // Initial widths are taken from the incoming stream
396 IOstreamOption streamOptData;
397 unsigned labelWidth = is.labelByteSize();
398 unsigned scalarWidth = is.scalarByteSize();
399
400 // Read master for header
401 List<char> data;
403
404 if (blocki == 0) // ie, UPstream::masterNo()
405 {
    // The block stream takes over the data buffer and the stream name
406 blockIsPtr.reset(new ICharStream(std::move(data)));
407 blockIsPtr->name() = is.name();
408
409 {
410 // Read header from first block,
411 // advancing the stream position
412 if (!headerIO.readHeader(*blockIsPtr))
413 {
414 FatalIOErrorInFunction(*blockIsPtr)
415 << "Problem while reading object header "
416 << is.relativeName() << nl
417 << exit(FatalIOError);
418 }
419 }
420 }
421 else
422 {
423 {
424 // Read header from first block,
425 // without advancing the stream position
426 ISpanStream headerStream(data);
427 if (!headerIO.readHeader(headerStream))
428 {
429 FatalIOErrorInFunction(headerStream)
430 << "Problem while reading object header "
431 << is.relativeName() << nl
432 << exit(FatalIOError);
433 }
    // Remember the stream characteristics found in the header
434 streamOptData = static_cast<IOstreamOption>(headerStream);
435 labelWidth = headerStream.labelByteSize();
436 scalarWidth = headerStream.scalarByteSize();
437 }
438
439 // Skip intermediate blocks
440 for (label i = 1; i < blocki; ++i)
441 {
443 }
444
445 // Read the block of interest
447
448 blockIsPtr.reset(new ICharStream(std::move(data)));
449 blockIsPtr->name() = is.name();
450
451 // Apply stream settings
452 {
453 auto& iss = blockIsPtr();
454 iss.format(streamOptData.format());
455 iss.version(streamOptData.version());
456 iss.setLabelByteSize(labelWidth);
457 iss.setScalarByteSize(scalarWidth);
459 }
460
461 return blockIsPtr;
462}
463
464
// Read block data on the master of 'comm' and hand each sub-rank its share.
//   comm      : communicator
//   isPtr     : input stream, dereferenced on the master only
//   localData : on sub-ranks, resized to the probed message size and used
//               as the receive buffer
//   (last)    : communication type, unused
// Returns the master's is.good() state after the broadcast to all ranks.
// NOTE(review): the signature line and the lines naming the read, send,
// probe and receive calls (listing lines 497, 505, 508, 510, 527, 530, 544,
// 546-547) are not visible in this listing; only their arguments are.
466(
467 const label comm,
468 autoPtr<ISstream>& isPtr,
469 List<char>& localData,
470 const UPstream::commsTypes /* unused */
471)
472{
473 if (debug)
474 {
475 Pout<< "decomposedBlockData::readBlocks:"
476 << " stream:" << (isPtr ? isPtr->name() : "<null>")
477 << " non-blocking comm:" << comm << endl;
478 }
479
480 // Read data on master and transmit. Always non-blocking
481 bool ok = false;
482
483 // The send buffers
484 List<List<char>> procBuffers;
485
486 // Some unique tag for this read/write grouping (as extra safety)
487 const int messageTag = (UPstream::msgType() + 256);
488
    // Request count before any communication is set up; used for the
    // waitRequests() call below
489 const label startOfRequests = UPstream::nRequests();
490
491 if (UPstream::master(comm))
492 {
493 auto& is = isPtr();
494 is.fatalCheck(FUNCTION_NAME);
495
496 // Read master data
498
499 // Read proc data and setup non-blocking sends
    // One buffer slot per rank of the communicator
500 procBuffers.resize(UPstream::nProcs(comm));
501 for (const int proci : UPstream::subProcs(comm))
502 {
503 auto& slot = procBuffers[proci];
504
506
507 // Send content (non-blocking)
509 (
511 proci,
512 slot.cdata_bytes(),
513 slot.size_bytes(),
514 messageTag,
515 comm
516 );
517 }
518
519 ok = is.good();
520 }
521 else if (UPstream::is_subrank(comm))
522 {
523 List<char>& slot = localData;
524
525 // Probe for the message size
526 const auto [fromProci, numBytes] =
528 (
529 UPstream::commsTypes::scheduled, // blocking call
531 messageTag,
532 comm
533 );
534
535 slot.resize_nocopy(numBytes);
536
537 if (debug)
538 {
539 Pout<< "probed to receive " << label(numBytes) << " from "
540 << fromProci << endl;
541 }
542
543 // Receive content (can also be zero-sized)
545 (
548 slot.data_bytes(),
549 slot.size_bytes(),
550 messageTag,
551 comm
552 );
553 }
554
    // Wait for the requests added since startOfRequests, then drop the
    // send buffers
555 UPstream::waitRequests(startOfRequests);
556 procBuffers.clear();
557
558 // Sync the status
559 Pstream::broadcast(ok, comm);
560
561 return ok;
562}
563
564
// Read block data on the master of 'comm', distribute it, and return a
// per-rank input character stream over the local block.
//   comm     : communicator
//   fName    : name assigned to the returned block stream
//   isPtr    : input stream, dereferenced on the master only
//   headerIO : on the master, filled by readHeader() from the first block;
//              its name is set from the broadcast header name on all ranks
//   (last)   : communication type, unused
// Returns blockIsPtr, which is set on the master and on sub-ranks of 'comm'.
// A header read failure on the master raises FatalIOError.
// NOTE(review): the signature line and the lines naming the read, send,
// probe, receive and broadcast calls (listing lines 598, 622, 625, 627, 644,
// 647, 661, 663-664, 709) are not visible in this listing.
566(
567 const label comm,
568 const fileName& fName,
569 autoPtr<ISstream>& isPtr,
570 IOobject& headerIO,
571 const UPstream::commsTypes /* unused */
572)
573{
574 if (debug)
575 {
576 Pout<< "decomposedBlockData::readBlocks:"
577 << " stream:" << (isPtr ? isPtr->name() : "<null>")
578 << " non-blocking" << endl;
579 }
580
581 // Read data on master and transmit. Always non-blocking
582 bool ok = false;
583 List<char> localData;
584 List<List<char>> procBuffers;
585 autoPtr<ISstream> blockIsPtr;
586
587 // Some unique tag for this read/write/probe grouping
588 const int messageTag = (UPstream::msgType() + 256);
589
590 const label startOfRequests = UPstream::nRequests();
591
592 if (UPstream::master(comm))
593 {
594 auto& is = *isPtr;
595 is.fatalCheck(FUNCTION_NAME);
596
597 // Read master data
599
600 // Move block data into a stream
601 blockIsPtr.reset(new ICharStream(std::move(localData)));
602 blockIsPtr->name() = fName;
603
604 {
605 // Read header from first block,
606 // advancing the stream position
607 if (!headerIO.readHeader(*blockIsPtr))
608 {
609 FatalIOErrorInFunction(*blockIsPtr)
610 << "Problem while reading object header "
611 << is.relativeName() << nl
612 << exit(FatalIOError);
613 }
614 }
615
616 // Read proc data and setup non-blocking sends
617 procBuffers.resize(UPstream::nProcs(comm));
618 for (const int proci : UPstream::subProcs(comm))
619 {
620 auto& slot = procBuffers[proci];
621
623
624 // Send content - non-blocking mode
626 (
628 proci,
629 slot.cdata_bytes(),
630 slot.size_bytes(),
631 messageTag,
632 comm
633 );
634 }
635
636 ok = is.good();
637 }
638 else if (UPstream::is_subrank(comm))
639 {
640 List<char>& slot = localData;
641
642 // Probe for the message size
643 const auto [fromProci, numBytes] =
645 (
646 UPstream::commsTypes::scheduled, // blocking call
648 messageTag,
649 comm
650 );
651
652 slot.resize_nocopy(numBytes);
653
654 if (debug)
655 {
656 Pout<< "probed to receive " << label(numBytes) << " from "
657 << fromProci << endl;
658 }
659
660 // Receive content (can also be zero-sized)
662 (
665 slot.data_bytes(),
666 slot.size_bytes(),
667 messageTag,
668 comm
669 );
670 }
671
672 UPstream::waitRequests(startOfRequests);
673 procBuffers.clear();
674
675 if (UPstream::is_subrank(comm))
676 {
677 // Move block data into a stream
678 blockIsPtr.reset(new ICharStream(std::move(localData)));
679 blockIsPtr->name() = fName;
680 }
681
682 // Broadcast master header info,
683 // set stream properties from blockIsPtr on master
684
    // Defaults used on non-master ranks until overwritten
685 int verValue(0);
686 int fmtValue(0);
687 unsigned labelWidth(0);
688 unsigned scalarWidth(0);
689 word headerName(headerIO.name());
690
691 // The stream characteristics
692 //
693 // unsigned formatSizes
694 // (
695 // ((static_cast<unsigned>(iss.format()) & 0xFF) << 16)
696 // | ((iss.labelByteSize() & 0xFF) << 8)
697 // | ((iss.scalarByteSize() & 0xFF))
698 // );
699
700 if (UPstream::master(comm))
701 {
702 auto& iss = blockIsPtr();
703 verValue = iss.version().canonical();
704 fmtValue = static_cast<int>(iss.format());
705 labelWidth = iss.labelByteSize();
706 scalarWidth = iss.scalarByteSize();
707 }
708
    // NOTE(review): the callee of this argument list is not visible in the
    // listing; per the comment above it is the header broadcast.
710 (
711 comm,
712 verValue,
713 fmtValue,
714 labelWidth,
715 scalarWidth,
716 headerName,
717 headerIO.headerClassName(),
718 headerIO.note()
719 // Unneeded: headerIO.instance()
720 // Unneeded: headerIO.local()
721 );
722
    // Apply the stream characteristics wherever a block stream exists
723 if (blockIsPtr)
724 {
725 auto& iss = *blockIsPtr;
726 iss.version(IOstreamOption::versionNumber::canonical(verValue));
727 iss.format(IOstreamOption::streamFormat(fmtValue));
728 iss.setLabelByteSize(labelWidth);
729 iss.setScalarByteSize(scalarWidth);
730 }
731
732 headerIO.rename(headerName);
733
734 if (debug)
735 {
    // Note: 'ok' is only assigned on the master in this function
736 Info<< "reading ok:" << ok << endl;
737 }
738
739 return blockIsPtr;
740}
741
742
// Gather character data from the ranks in 'whichProcs' onto the master.
//   comm         : communicator
//   localData    : this rank's data to contribute
//   recvSizes    : per-rank sizes, read on the master
//   whichProcs   : range of ranks taking part
//   sliceOffsets : (master) resized to nProcs+1; per-rank start offsets into
//                  recvData, with the total stored one past the range end
//   recvData     : cleared on entry; (master) resized to the total size
//   commsType    : nonBlocking selects point-to-point transfers, anything
//                  else takes the gather path at the end
// In the non-blocking case the requests are left pending; waiting is done by
// the caller.
// NOTE(review): the signature line and the lines naming the receive, send
// and gather calls (listing lines 829, 831, 844, 846-847, 861) are not
// visible in this listing; the name gatherProcData is taken from the call
// site elsewhere in this file.
744(
745 const label comm,
746 const UList<char>& localData,
747 const labelUList& recvSizes,
748
749 const labelRange& whichProcs,
750
751 List<int>& sliceOffsets,
752 DynamicList<char>& recvData,
753 const UPstream::commsTypes commsType
754)
755{
756 const label myRank = UPstream::myProcNo(comm);
757 const label nProcs = UPstream::nProcs(comm);
758
759 // Some unique tag for this read/write grouping (as extra safety)
760 const int messageTag = (UPstream::msgType() + 256);
761
762 int nSendBytes = 0;
763 recvData.clear();
764
765 // On master, calculate sizing/offsets and resize the recv buffer.
766 // Do not need sliceSizes when nonBlocking
767 List<int> sliceSizes;
768 if (UPstream::master(comm))
769 {
770 sliceSizes.resize_nocopy(nProcs);
771 sliceSizes = 0;
772 sliceOffsets.resize_nocopy(nProcs+1);
773 sliceOffsets = 0;
774
    // Running total of bytes; each rank's offset is the total so far
775 int totalSize = 0;
776 for (const label proci : whichProcs)
777 {
778 const auto nRecvBytes = static_cast<int>(recvSizes[proci]);
779
780 sliceOffsets[proci] = totalSize;
781 totalSize += nRecvBytes;
782
783 sliceSizes[proci] = nRecvBytes;
784 }
785
786 // One beyond the end of the range
787 const label endProci = whichProcs.end_value();
788
789 sliceOffsets[endProci] = totalSize;
790 recvData.resize_nocopy(totalSize);
791 }
792 else if (whichProcs.contains(myRank) && !localData.empty())
793 {
794 // Note: UPstream::gather limited to int
795 nSendBytes = static_cast<int>(localData.size_bytes());
796 }
797
798 if (UPstream::commsTypes::nonBlocking == commsType)
799 {
800 if (UPstream::master(comm))
801 {
802 for (const label proci : whichProcs)
803 {
    // View onto this rank's portion of recvData
804 SubList<char> procSlice
805 (
806 recvData,
807 sliceOffsets[proci+1]-sliceOffsets[proci],
808 sliceOffsets[proci]
809 );
810
811 if (procSlice.empty())
812 {
813 continue;
814 }
815 else if (proci == UPstream::masterNo())
816 {
817 // No self-communication, although masterNo is normally
818 // not contained in whichProcs range anyhow.
819 std::copy
820 (
821 localData.cbegin(),
822 localData.cbegin(procSlice.size()),
823 procSlice.begin()
824 );
825 }
826 else
827 {
828 // Receive non-zero content
830 (
832 proci,
833 procSlice.data_bytes(),
834 procSlice.size_bytes(),
835 messageTag,
836 comm
837 );
838 }
839 }
840 }
841 else if (whichProcs.contains(myRank) && !localData.empty())
842 {
843 // Send non-zero content
845 (
848 localData.cdata_bytes(),
849 localData.size_bytes(),
850 messageTag,
851 comm
852 );
853 }
854
855 // Waiting is done by the caller
856 }
857 else
858 {
859 // This is MPI_Gatherv() !! - but this path is unlikely to be used
860
862 (
863 localData.cdata(),
864 nSendBytes,
865
866 recvData.data(),
867 sliceSizes,
868 sliceOffsets,
869 comm
870 );
871 }
872}
873
874
// Write the data of all ranks of 'comm' as block entries through the
// master's output stream.
//   comm            : communicator
//   osPtr           : output stream, dereferenced on the master only
//   blockOffset     : optional; (master) resized to nProcs when not null and
//                     filled with the offset of each written block
//   localData       : this rank's data
//   recvSizes       : per-rank data sizes, read on the master
//   procData        : pre-gathered per-rank data; when non-empty no
//                     communication takes place
//   commsType       : scheduled gathers and writes one rank at a time;
//                     otherwise several ranks are packed per gather
//   syncReturnState : when true the master's state is broadcast to all ranks
// Returns os.good() on the master (true elsewhere unless synchronised).
// NOTE(review): the signature line and the lines naming the block-write,
// send and receive calls are not visible in this listing (for example
// listing lines 929, 945, 976, 998, 1027, 1141, 1214), and neither are the
// buffer size expression at 1062 and the first condition at 1204. The name
// writeBlocks is taken from the call site elsewhere in this file.
876(
877 const label comm,
878 autoPtr<OSstream>& osPtr,
879 List<std::streamoff>& blockOffset,
880 const UList<char>& localData,
881
882 const labelUList& recvSizes,
883 const UList<std::string_view>& procData,
884
885 const UPstream::commsTypes commsType,
886 const bool syncReturnState
887)
888{
889 const label nProcs = UPstream::nProcs(comm);
890
891 bool ok = true;
892
893 // Recovery of blockOffset is optional
894 if (UPstream::master(comm) && notNull(blockOffset))
895 {
896 blockOffset.resize(nProcs);
897 }
898
899 // Max proc data size to be received
    // Only needed when nothing was pre-gathered
900 label maxNonLocalSize = 0;
901 if (UPstream::master(comm) && procData.empty())
902 {
903 for (label proci = 1; proci < nProcs; ++proci)
904 {
905 maxNonLocalSize = Foam::max(maxNonLocalSize, recvSizes[proci]);
906 }
907 }
908
909 if (debug)
910 {
911 Pout<< " stream:" << (osPtr ? osPtr->name() : "<null>")
912 << " data:" << localData.size()
913 << " proc-data:" << procData.size()
914 << " max-size:" << maxNonLocalSize
915 << " " << UPstream::commsTypeNames[commsType] << endl;
916 }
917
918 if (procData.size())
919 {
920 // --------
921 // With pre-gathered proc data
922 // --------
923
924 if (UPstream::master(comm))
925 {
926 auto& os = osPtr();
927
928 std::streamoff currOffset =
930 (
931 os,
933 localData
934 );
935
    // The size checks guard against a blockOffset that was not resized
936 if (blockOffset.size() > UPstream::masterNo())
937 {
938 blockOffset[UPstream::masterNo()] = currOffset;
939 }
940
941 // Write all pre-gathered proc data.
942 for (label proci = 1; proci < nProcs; ++proci)
943 {
944 currOffset =
946 (
947 os,
948 proci,
949 procData[proci]
950 );
951
952 if (blockOffset.size() > proci)
953 {
954 blockOffset[proci] = currOffset;
955 }
956 }
957
958 ok = os.good();
959 }
960 }
961 else if (commsType == UPstream::commsTypes::scheduled)
962 {
963 // --------
964 // Gather/write each rank, one at a time.
965 // Note: This is often associated with maxMasterFileBufferSize == 0
966 // --------
967
968 // Some unique tag for this read/write grouping (as extra safety)
969 const int messageTag = (UPstream::msgType() + 256);
970
971 if (UPstream::master(comm))
972 {
973 auto& os = osPtr();
974
975 std::streamoff currOffset =
977 (
978 os,
980 localData
981 );
982
983 if (blockOffset.size() > UPstream::masterNo())
984 {
985 blockOffset[UPstream::masterNo()] = currOffset;
986 }
987
988 // Could discard/recycle localData on master
989 // (if we had taken ownership...)
990
    // Single receive buffer, reused for every rank
991 DynamicList<char> recvData(maxNonLocalSize);
992 for (label proci = 1; proci < nProcs; ++proci)
993 {
994 recvData.resize_nocopy(recvSizes[proci]);
995
    // Nothing is received for ranks with zero-sized data
996 if (!recvData.empty())
997 {
999 (
1001 proci,
1002 recvData.data_bytes(),
1003 recvData.size_bytes(),
1004 messageTag,
1005 comm
1006 );
1007 }
1008
1009 currOffset =
1011 (
1012 os,
1013 proci,
1014 recvData
1015 );
1016
1017 if (blockOffset.size() > proci)
1018 {
1019 blockOffset[proci] = currOffset;
1020 }
1021 }
1022
1023 ok = os.good();
1024 }
1025 else if (UPstream::is_subrank(comm) && !localData.empty())
1026 {
1028 (
1031 localData.cdata_bytes(),
1032 localData.size_bytes(),
1033 messageTag,
1034 comm
1035 );
1036 }
1037 }
1038 else
1039 {
1040 // --------
1041 // Gather/write ranks, packing together several smaller gathers
1042 // into a single buffer space
1043 // --------
1044
1045 DynamicList<char> recvData;
1046 List<int> recvOffsets; // Offsets into recvData
1047
1048 // Offsets of combined ranks for communication.
1049 // Never includes master rank (handled separately)
1050 labelList procOffsets(nProcs, Foam::zero{});
1051
1052 // Max combined data to be received (master only)
1053 label maxRecvCount = 0;
1054
1055 if (UPstream::master(comm))
1056 {
1057 // Find out how many ranks can be received into
1058 // maxMasterFileBufferSize and the corresponding schedule
1059
1060 off_t maxBufferSize
1061 (
1063 maxMasterFileBufferSize
1064 );
1065
1066 // Buffer must fit the largest off-processor size
1067 if (maxBufferSize < off_t(maxNonLocalSize))
1068 {
1069 maxBufferSize = off_t(maxNonLocalSize);
1070 }
1071
1072 // Max combined proc data size to be received
1073 off_t maxCollected = 0;
1074
    // Each outer pass forms one chunk: procOffsets[k] holds the first
    // rank of chunk k and procOffsets[k+1] the rank one past its end
1075 for (label proci = 1, nChunks = 0; proci < nProcs; /*nil*/)
1076 {
1077 procOffsets[nChunks] = proci;
1078
1079 // At least one proc, regardless of maxBufferSize.
1080 // Also handles the corner case when the first proc has
1081 // size 0, but the next one is too large.
1082
1083 for
1084 (
1085 off_t total = 0;
1086 (
1087 proci < nProcs
1088 && (!total || (total + recvSizes[proci] < maxBufferSize))
1089 );
1090 ++proci
1091 )
1092 {
1093 total += recvSizes[proci];
1094
1095 if (maxCollected < total)
1096 {
1097 maxCollected = total;
1098 }
1099 }
1100
1101 procOffsets[++nChunks] = proci;
1102 }
1103
1104 maxRecvCount = static_cast<label>(maxCollected);
1105 }
1106
1107 if (debug && UPstream::master(comm))
1108 {
1109 OStringStream ranges;
1110
1111 for (label nChunks = 1; nChunks < nProcs; ++nChunks)
1112 {
1113 const labelRange whichProcs
1114 (
1115 procOffsets[nChunks-1],
1116 procOffsets[nChunks]-procOffsets[nChunks-1]
1117 );
1118
    // An out-of-range start or an empty range ends the schedule
1119 if (whichProcs.start() >= nProcs || whichProcs.size() <= 0)
1120 {
1121 break;
1122 }
1123
1124 ranges << ' ' << whichProcs.min() << '-' << whichProcs.max();
1125 }
1126
1127 Pout<< " write-schedule:" << ranges.str().c_str() << endl;
1128 }
1129
1130
1131 // Same schedule to be known by everyone
1132 UPstream::broadcast(procOffsets.data(), procOffsets.size(), comm);
1133
1134 recvData.resize_nocopy(label(maxRecvCount)); // (master only)
1135
1136 if (UPstream::master(comm))
1137 {
1138 auto& os = osPtr();
1139
1140 std::streamoff currOffset =
1142 (
1143 os,
1145 localData
1146 );
1147
1148 if (blockOffset.size() > UPstream::masterNo())
1149 {
1150 blockOffset[UPstream::masterNo()] = currOffset;
1151 }
1152 }
1153
    // One gather and write per chunk of the schedule
1154 for (label nChunks = 1; nChunks < nProcs; ++nChunks)
1155 {
1156 const labelRange whichProcs
1157 (
1158 procOffsets[nChunks-1],
1159 procOffsets[nChunks]-procOffsets[nChunks-1]
1160 );
1161
1162 if (whichProcs.start() >= nProcs || whichProcs.size() <= 0)
1163 {
1164 break;
1165 }
1166
1167 const label startOfRequests = UPstream::nRequests();
1168
1169 // Setup non-blocking send/recv or MPI_Gatherv
1170 // - uses (UPstream::msgType()+256)
1171 gatherProcData
1172 (
1173 comm,
1174 localData,
1175 recvSizes,
1176
1177 whichProcs,
1178
1179 recvOffsets,
1180 recvData,
1181 commsType // ie, blocking or non-blocking
1182 );
1183
1184 // For sanity checks
1185 // const label endOfRequests = UPstream::nRequests();
1186
1187 if (UPstream::master(comm))
1188 {
1189 auto& os = osPtr();
1190
1191 // Write received data
    // currRequest walks the requests added since startOfRequests
1192 label currRequest = startOfRequests;
1193 for (const label proci : whichProcs)
1194 {
1195 SubList<char> procSlice
1196 (
1197 recvData,
1198 recvOffsets[proci+1]-recvOffsets[proci],
1199 recvOffsets[proci]
1200 );
1201
    // NOTE(review): the first operand of this condition is not visible
    // in the listing.
1202 if
1203 (
1205 && (proci != UPstream::masterNo())
1206 && !procSlice.empty()
1207 )
1208 {
1209 UPstream::waitRequest(currRequest);
1210 ++currRequest;
1211 }
1212
1213 std::streamoff currOffset =
1215 (
1216 os,
1217 proci,
1218 procSlice
1219 );
1220
1221 if (blockOffset.size() > proci)
1222 {
1223 blockOffset[proci] = currOffset;
1224 }
1225 }
1226
1227 ok = os.good();
1228 }
1229
1230 UPstream::waitRequests(startOfRequests);
1231 }
1232 }
1233
1234 if (syncReturnState)
1235 {
1236 //- Enable to get synchronised error checking.
1237 // Ensures that all procs are as slow as the master
1238 // (which does all the writing)
1239 Pstream::broadcast(ok, comm);
1240 }
1241
1242 return ok;
1243}
1244
1245
// Resolve the object's file path through the file handler and, on the master
// of comm_, open it as an IFstream and read the object header from it.
// NOTE(review): the signature line and the final statement (listing lines
// 1246 and 1256) are not visible in this listing; presumably this is
// decomposedBlockData::read() and the missing line produces the return
// value - confirm against the upstream source.
1247{
1248 autoPtr<ISstream> isPtr;
1249 fileName objPath(fileHandler().filePath(false, *this, word::null));
1250 if (UPstream::master(comm_))
1251 {
1252 isPtr.reset(new IFstream(objPath));
1253 IOobject::readHeader(*isPtr);
1254 }
1255
1257}
1258
1259
// Write the stored character content (contentData_) to the stream 'os'.
// The master of comm_ re-reads the header held in contentData_ to obtain the
// version and format, which are then applied to streamOpt on every rank.
// Returns os.good().
// NOTE(review): the signature line and the lines naming the broadcast and
// the two sub-rank calls (listing lines 1260, 1281, 1299, 1325) are not
// visible in this listing; presumably this is
// decomposedBlockData::writeData(Ostream& os) const - confirm.
1261{
1262 IOobject io(*this);
1263 IOstreamOption streamOpt(os);
1264
1265 int verValue(0);
1266 int fmtValue(0);
1267 // Unneeded: word masterName(name());
1268 fileName masterLocation(instance()/db().dbDir()/local());
1269
1270 // Re-read my own data to find out the header information
1271 if (UPstream::master(comm_))
1272 {
1273 ISpanStream headerStream(contentData_);
1274 io.readHeader(headerStream);
1275
1276 verValue = headerStream.version().canonical();
1277 fmtValue = static_cast<int>(headerStream.format());
1278 }
1279
1280 // Broadcast header information
1282 (
1283 comm_,
1284 verValue,
1285 fmtValue,
1286 // Unneeded: masterName
1287 io.headerClassName(),
1288 io.note(),
1289 // Unneeded: io.instance()
1290 // Unneeded: io.local()
1291 masterLocation
1292 );
1293
1294 streamOpt.version(IOstreamOption::versionNumber::canonical(verValue));
1295 streamOpt.format(IOstreamOption::streamFormat(fmtValue));
1296
1297 if (UPstream::is_subrank(comm_))
1298 {
    // NOTE(review): the callee of this argument list is not visible here
1300 (
1301 os,
1302 streamOpt, // streamOpt for data
1303 io.headerClassName(),
1304 io.note(),
1305 masterLocation,
1306 name(),
1307 dictionary()
1308 );
1309 }
1310
1311 // Write the character data
1312 if (isA<OSstream>(os))
1313 {
1314 // Serial stream - can output characters directly
1315 os.writeRaw(contentData_.cdata(), contentData_.size_bytes());
1316 }
1317 else
1318 {
1319 // Other cases are less fortunate, and no std::string_view
    // Third argument false: written without surrounding quotes
1320 os.writeQuoted(contentData_.cdata(), contentData_.size_bytes(), false);
1321 }
1322
1323 if (UPstream::is_subrank(comm_))
1324 {
1326 }
1327
1328 return os.good();
1329}
1330
1331
// Write the content of all ranks of comm_ into one file on the master.
//   streamOpt   : stream options describing the data
//   writeOnProc : not referenced in the visible body
// The master opens objectPath() as a binary OFstream and updates the
// meta-data. The per-rank content sizes are then gathered and the actual
// output is delegated to writeBlocks(), whose result is returned.
// NOTE(review): the signature line and the line naming the call at listing
// line 1352 are not visible in this listing; presumably this is
// decomposedBlockData::writeObject - confirm.
1333(
1334 IOstreamOption streamOpt,
1335 const bool writeOnProc
1336) const
1337{
1338 autoPtr<OSstream> osPtr;
1339 if (UPstream::master(comm_))
1340 {
1341 // Note: always write binary. These are strings so readable anyway.
1342 // They have already been tokenised on the sending side.
1343
1344 osPtr.reset(new OFstream(objectPath(), IOstreamOption::BINARY));
1345
1346 // Update meta-data for current state
    // The const_cast is needed because this member function is const
1347 const_cast<regIOobject&>
1348 (
1349 static_cast<const regIOobject&>(*this)
1350 ).updateMetaData();
1351
1353 (
1354 *osPtr,
1355 streamOpt, // streamOpt for data
1356 static_cast<const IOobject&>(*this)
1357 );
1358 }
1359
    // Per-rank content sizes, gathered over comm_
1360 const labelList recvSizes
1361 (
1362 UPstream::listGatherValues<label>(contentData_.size(), comm_)
1363 );
1364
1365 List<std::streamoff> blockOffsets; // Optional
1366 return writeBlocks
1367 (
1368 comm_,
1369 osPtr,
1370 blockOffsets,
1371 contentData_,
1372 recvSizes,
1373 UList<std::string_view>(), // dummy proc data (nothing pre-gathered)
1374 commsType_
1375 );
1376}
1377
1378
1379// ************************************************************************* //
Input/output streams with (internal or external) character storage.
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition DynamicList.H:68
void clear() noexcept
Clear the addressed list, i.e. set the size to zero.
void resize_nocopy(const label len)
Alter addressable list size, allocating new space if required without necessarily recovering old content.
An ISstream with internal List storage. Always UNCOMPRESSED.
Input from file stream as an ISstream, normally using std::ifstream for the actual input.
Definition IFstream.H:55
bool isReadOptional() const noexcept
True if (LAZY_READ) bits are set [same as READ_IF_PRESENT].
bool isReadRequired() const noexcept
True if (MUST_READ | READ_MODIFIED) bits are set.
readOption readOpt() const noexcept
Get the read option.
Defines the attributes of an object for which implicit objectRegistry management is supported,...
Definition IOobject.H:191
const word & name() const noexcept
Return the object name.
Definition IOobjectI.H:205
const word & headerClassName() const noexcept
Return name of the class name read from header.
Definition IOobjectI.H:223
bool readHeader(Istream &is)
Read header ('FoamFile' dictionary) and set the IOobject and stream characteristics.
static bool bannerEnabled() noexcept
Status of output file banner.
Definition IOobject.H:376
static Ostream & writeEndDivider(Ostream &os)
Write the standard end file divider.
const objectRegistry & db() const noexcept
Return the local objectRegistry.
Definition IOobject.C:450
const fileName & instance() const noexcept
Read access to instance path component.
Definition IOobjectI.H:289
virtual void rename(const word &newName)
Rename the object.
Definition IOobject.H:697
const string & note() const noexcept
Return the optional note.
Definition IOobjectI.H:235
fileName objectPath() const
The complete path + object name.
Definition IOobjectI.H:313
int canonical() const noexcept
From version to canonical integer value.
A simple container for options an IOstream can normally have.
versionNumber version() const noexcept
Get the stream version.
streamFormat format() const noexcept
Get the current stream format.
streamFormat
Data format (ascii | binary | coherent).
bool fatalCheck(const char *operation) const
Check IOstream status for given operation.
Definition IOstream.C:51
unsigned scalarByteSize() const noexcept
The sizeof (scalar) in bytes associated with the stream.
Definition IOstream.H:340
bool good() const noexcept
True if next operation might succeed.
Definition IOstream.H:281
unsigned labelByteSize() const noexcept
The sizeof (label) in bytes associated with the stream.
Definition IOstream.H:332
fileName relativeName() const
Return the name of the stream relative to the current case.
Definition IOstream.C:39
Similar to IStringStream but using an externally managed buffer for its input. This allows the input ...
Generic input stream using a standard (STL) stream.
Definition ISstream.H:54
virtual const fileName & name() const override
The name of the input serial stream. (eg, the name of the Fstream file name).
Definition ISstream.H:147
IntType max() const noexcept
The (inclusive) upper value of the range, same as rbegin_value(). Ill-defined for an empty range.
Definition IntRange.H:235
IntType start() const noexcept
The (inclusive) lower value of the range.
Definition IntRange.H:218
IntType size() const noexcept
The size of the range.
Definition IntRange.H:208
bool contains(IntType value) const noexcept
True if the (global) value is within the range.
Definition IntRangeI.H:143
IntType min() const noexcept
The (inclusive) lower value of the range, same as start(), begin_value().
Definition IntRange.H:229
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition Istream.H:60
void putBack(const token &tok)
Put back a token (copy). Only a single put back is permitted.
Definition Istream.C:71
virtual Istream & read(token &)=0
Return next token from stream.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition List.H:72
void resize_nocopy(const label len)
Adjust allocated size of list without necessarily preserving old content.
Definition ListI.H:171
void resize(const label len)
Adjust allocated size of list.
Definition ListI.H:153
void clear()
Clear the list, i.e. set size to zero.
Definition ListI.H:133
Istream & readList(Istream &is)
Read List from Istream, discarding contents of existing List.
Definition ListIO.C:167
virtual Ostream & writeQuoted(const char *str, std::streamsize len, const bool quoted=true) override
Write character/string content, with/without surrounding quotes.
Definition OBJstream.C:78
virtual Ostream & write(const char c) override
Write character.
Definition OBJstream.C:69
An OSstream with internal List storage.
DynamicList< char > release()
Reset buffer and return contents.
void reserve(std::streamsize n)
Reserve output space for at least this amount.
Output to file stream as an OSstream, normally using std::ofstream for the actual output.
Definition OFstream.H:75
Generic output stream using a standard (STL) stream.
Definition OSstream.H:53
virtual const fileName & name() const override
Get the name of the output serial stream. (eg, the name of the Fstream file name).
Definition OSstream.H:134
virtual Ostream & writeRaw(const char *data, std::streamsize count) override
Low-level raw binary output.
Definition OSstream.C:280
Output to string buffer, using a OSstream. Always UNCOMPRESSED.
Foam::string str() const
Get the string. As Foam::string instead of std::string (may change in future).
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition Ostream.H:59
static void broadcasts(const int communicator, Type &value, Args &&... values)
Broadcast multiple items to all communicator ranks. Does nothing in non-parallel.
A non-owning sub-view of a List (allocated or unallocated storage).
Definition SubList.H:61
static std::streamsize read(const UPstream::commsTypes commsType, const int fromProcNo, Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr)
Receive buffer contents (contiguous types) from given processor.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition UList.H:89
iterator begin() noexcept
Return an iterator to begin traversing the UList.
Definition UListI.H:410
char * data_bytes() noexcept
Return pointer to the underlying array serving as data storage, reinterpreted as byte data.
Definition UListI.H:288
bool empty() const noexcept
True if List is empty (ie, size() is zero).
Definition UList.H:701
const T * cdata() const noexcept
Return pointer to the underlying array serving as data storage.
Definition UListI.H:267
const_iterator cbegin() const noexcept
Return const_iterator to begin traversing the constant UList.
Definition UListI.H:424
T * data() noexcept
Return pointer to the underlying array serving as data storage.
Definition UListI.H:274
const char * cdata_bytes() const noexcept
Return pointer to the underlying array serving as data storage,.
Definition UListI.H:281
void size(const label n)
Older name for setAddressableSize.
Definition UList.H:118
std::streamsize size_bytes() const noexcept
Number of contiguous bytes for the List data.
Definition UListI.H:295
static bool write(const UPstream::commsTypes commsType, const int toProcNo, const Type *buffer, std::streamsize count, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, UPstream::Request *req=nullptr, const UPstream::sendModes sendMode=UPstream::sendModes::normal)
Write buffer contents (contiguous types only) to given processor.
static std::pair< int, int64_t > probeMessage(const UPstream::commsTypes commsType, const int fromProcNo, const int tag=UPstream::msgType(), const int communicator=worldComm)
Probe for an incoming message.
Definition UPstream.C:132
static void mpiGatherv(const Type *sendData, int sendCount, Type *recvData, const UList< int > &recvCounts, const UList< int > &recvOffsets, const int communicator=UPstream::worldComm)
Receive variable length data from all ranks.
static int myProcNo(const label communicator=worldComm)
Rank of this process in the communicator (starting from masterNo()). Negative if the process is not a...
Definition UPstream.H:1706
static label nRequests() noexcept
Number of outstanding requests (on the internal list of requests).
commsTypes
Communications types.
Definition UPstream.H:81
@ scheduled
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
Definition UPstream.H:83
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Definition UPstream.H:84
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition UPstream.H:92
static int & msgType() noexcept
Message tag of standard messages.
Definition UPstream.H:1926
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition UPstream.H:1691
static List< T > listGatherValues(const T &localValue, const int communicator=UPstream::worldComm)
Gather individual values into list locations.
static bool master(const label communicator=worldComm)
True if process corresponds to the master rank in the communicator.
Definition UPstream.H:1714
static bool is_subrank(const label communicator=worldComm)
True if process corresponds to a sub-rank in the given communicator.
Definition UPstream.H:1731
static label nProcs(const label communicator=worldComm)
Number of ranks in parallel run (for given communicator). It is 1 for serial run.
Definition UPstream.H:1697
static rangeType subProcs(const label communicator=worldComm)
Range of process indices for sub-processes.
Definition UPstream.H:1866
static void waitRequests()
Wait for all requests to finish.
Definition UPstream.H:2497
@ broadcast
broadcast [MPI]
Definition UPstream.H:189
static void waitRequest(const label i)
Wait until request i has finished. Corresponds to MPI_Wait().
Pointer management similar to std::unique_ptr, with some additional methods and type checking.
Definition autoPtr.H:65
void reset(T *p=nullptr) noexcept
Delete managed object and set to new given pointer.
Definition autoPtrI.H:37
The decomposedBlockData comprises a List<char> for each output processor, typically with IO on the mas...
virtual bool writeObject(IOstreamOption streamOpt, const bool writeOnProc) const
Write using stream options.
static bool readBlockEntry(Istream &is, List< char > &charData)
Helper: read block of (binary) character data.
static bool hasBlock(Istream &is, const label blockNumber)
True if the given block number (starts at 0) has a corresponding decomposedBlockData block entry....
static void gatherProcData(const label comm, const UList< char > &localData, const labelUList &recvSizes, const labelRange &whichProcs, List< int > &recvOffsets, DynamicList< char > &recvData, const UPstream::commsTypes commsType)
Helper: gather data from (subset of) sub-ranks.
virtual bool writeData(Ostream &os) const
Write separated content (assumes content is the serialised data).
static autoPtr< ISstream > readBlock(const label blocki, ISstream &is, IOobject &headerIO)
Read selected block (non-seeking) + header information.
const label comm_
Communicator for all parallel comms.
static void writeHeader(Ostream &os, IOstreamOption streamOptContainer, const word &objectType, const string &note, const fileName &location, const word &objectName, const dictionary &extraEntries)
Helper: write FoamFile IOobject header.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &blockOffset, const UList< char > &localData, const labelUList &recvSizes, const UList< std::string_view > &procData, const UPstream::commsTypes commsType, const bool syncReturnState=true)
Write *this. Ostream only valid on master.
static std::streamoff writeBlockEntry(OSstream &os, const label blocki, const char *str, const size_t len)
Helper: write block of (binary) character data.
static bool isCollatedType(const word &objectType)
True if object type is a known collated type.
decomposedBlockData(const label comm, const IOobject &io, const UPstream::commsTypes=UPstream::commsTypes::scheduled)
Construct given an IOobject.
List< char > contentData_
The block content.
static bool readBlocks(const label comm, autoPtr< ISstream > &isPtr, List< char > &localData, const UPstream::commsTypes commsType)
Read data (on master) and transmit.
static bool skipBlockEntry(Istream &is)
Helper: skip a block of (binary) character data.
static label getNumBlocks(Istream &is, const label maxNumBlocks=-1)
Extract number of decomposedBlockData block entries, optionally with an upper limit....
virtual bool read()
Read object.
const UPstream::commsTypes commsType_
Type to use for gather.
A list of keyword definitions, which are a keyword followed by a number of values (eg,...
Definition dictionary.H:133
bool readIfPresent(const word &keyword, T &val, enum keyType::option matchOpt=keyType::REGEX) const
Find an entry if present, and assign to T val. FatalIOError if it is found and the number of tokens i...
A class for handling file names.
Definition fileName.H:75
fileOperations that performs all file operations on the master processor. Requires the calls to be pa...
A range or interval of labels defined by a start and a size.
Definition labelRange.H:66
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition regIOobject.H:71
regIOobject(const IOobject &io, const bool isTimeObject=false)
Construct from IOobject. The optional flag adds special handling if the object is the top-level regIO...
Definition regIOobject.C:43
virtual fileName filePath() const
Return complete path + object name if the file exists.
virtual void updateMetaData()
Update internal meta-data (eg, prior to writing).
bool headerOk()
Read and check header info. Does not check the headerClassName.
A token holds an item read from Istream.
Definition token.H:70
@ END_STATEMENT
End entry [isseparator].
Definition token.H:173
bool isLabel() const noexcept
Integral token is convertible to Foam::label.
Definition tokenI.H:843
bool good() const noexcept
True if token is not UNDEFINED or ERROR.
Definition tokenI.H:584
label labelToken() const
Return integer type as label value or Error.
Definition tokenI.H:869
bool isCompound() const noexcept
Token is COMPOUND.
Definition tokenI.H:1096
bool isWord() const noexcept
Token is word-variant (WORD, DIRECTIVE).
Definition tokenI.H:1004
A class for handling words, derived from Foam::string.
Definition word.H:66
static const word null
An empty word.
Definition word.H:84
A class representing the concept of 0 (zero) that can be used to avoid manipulating objects known to ...
Definition zero.H:58
#define defineTypeNameAndDebug(Type, DebugSwitch)
Define the typeName and debug information.
Definition className.H:142
bool local
Definition EEqn.H:20
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition error.H:629
OBJstream os(runTime.globalPath()/outputName)
const auto & io
auto & name
#define WarningInFunction
Report a warning using Foam::Warning.
#define FUNCTION_NAME
Namespace for handling debugging switches.
Definition debug.C:45
Namespace for OpenFOAM.
label max(const labelHashSet &set, label maxValue=labelMin)
Find the max value in labelHashSet, optionally limited by second argument.
Definition hashSets.C:40
List< label > labelList
A List of labels.
Definition List.H:62
refPtr< fileOperation > fileHandler(std::nullptr_t)
Delete current file handler - forwards to fileOperation::handler().
messageStream Info
Information stream (stdout output on master, null elsewhere).
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
const Type * isA(const U &obj)
Attempt dynamic_cast to Type.
Definition typeInfo.H:87
IOerror FatalIOError
Error stream (stdout output on all processes), with additional 'FOAM FATAL IO ERROR' header text and ...
word name(const expressions::valueTypeCode typeCode)
A word representation of a valueTypeCode. Empty for expressions::valueTypeCode::INVALID.
Definition exprTraits.C:127
prefixOSstream Pout
OSstream wrapped stdout (std::cout) with parallel prefix.
bool notNull(const T *ptr) noexcept
True if ptr is not a pointer (of type T) to the nullObject.
Definition nullObject.H:267
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition errorManip.H:125
UList< label > labelUList
A UList of labels.
Definition UList.H:75
constexpr char nl
The newline '\n' character (0x0a).
Definition Ostream.H:50