Loading...
Searching...
No Matches
UIPstreamBase.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2015,2024 OpenFOAM Foundation
9 Copyright (C) 2017-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "error.H"
30#include "UIPstream.H"
31#include "int.H"
32#include "token.H"
33#include <cctype>
35// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
36
37namespace Foam
38{
39
40// Convert a single character to a word with length 1
41inline static Foam::word charToWord(char c)
42{
43 return Foam::word(std::string(1, c), false);
44}
45
46
47// Adjust stream format based on the flagMask
48inline static void processFlags(Istream& is, int flagMask)
49{
50 if ((flagMask & token::ASCII))
51 {
52 is.format(IOstreamOption::ASCII);
53 }
54 else if ((flagMask & token::BINARY))
55 {
56 is.format(IOstreamOption::BINARY);
57 }
58}
59
60
61// Return the position with word boundary alignment
62inline static label byteAlign(const label pos, const size_t align)
63{
64 return
65 (
66 (align > 1)
67 ? (align + ((pos - 1) & ~(align - 1)))
68 : pos
69 );
70}
71
72
73// Read into compound token (assumed to be a known type)
74inline static bool readCompoundToken
75(
76 token& tok,
77 const word& compoundType,
78 Istream& is
79)
80{
81 // The isCompound() check is not needed (already checked by caller)
82 return tok.readCompoundToken(compoundType, is);
83}
84
85} // End namespace Foam
86
87
88// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
89
90inline void Foam::UIPstreamBase::checkEof()
91{
93 {
94 setEof();
95 }
96}
97
98
99inline void Foam::UIPstreamBase::prepareBuffer(const size_t align)
100{
101 recvBufPos_ = byteAlign(recvBufPos_, align);
102}
103
104
105template<class T>
106inline void Foam::UIPstreamBase::readFromBuffer(T& val)
107{
108 prepareBuffer(sizeof(T));
109
110 val = reinterpret_cast<T&>(recvBuf_[recvBufPos_]);
111 recvBufPos_ += sizeof(T);
112 checkEof();
113}
114
115
116inline void Foam::UIPstreamBase::readFromBuffer
117(
118 void* data,
119 const size_t count
120)
121{
122 if (data)
123 {
124 const char* const __restrict__ buf = &recvBuf_[recvBufPos_];
125 char* const __restrict__ output = reinterpret_cast<char*>(data);
126
127 for (size_t i = 0; i < count; ++i)
128 {
129 output[i] = buf[i];
130 }
131 }
132
133 recvBufPos_ += count;
134 checkEof();
135}
136
137
138inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str)
139{
140 // Use std::string::assign() to copy content, including embedded nul chars.
141 // Stripping (when desired) is the responsibility of the sending side.
142
143 size_t len;
144 readFromBuffer(len);
145
146 if (len)
147 {
148 str.assign(&recvBuf_[recvBufPos_], len);
149 recvBufPos_ += len;
150 checkEof();
151 }
152 else
153 {
154 str.clear();
155 }
157 return *this;
158}
159
160
161// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
162
164(
165 const UPstream::commsTypes commsType,
166 const int fromProcNo,
167 DynamicList<char>& receiveBuf,
168 label& receiveBufPosition,
169 const int tag,
170 const int communicator,
171 const bool clearAtEnd,
173)
174:
175 UPstream(commsType),
176 Istream(fmt),
177 fromProcNo_(fromProcNo),
178 tag_(tag),
179 comm_(communicator),
180 messageSize_(0),
181 storedRecvBufPos_(0),
182 clearAtEnd_(clearAtEnd),
183 recvBuf_(receiveBuf),
184 recvBufPos_(receiveBufPosition)
185{
186 setOpened();
187 setGood();
188}
189
190
192(
193 const int fromProcNo,
194 PstreamBuffers& buffers
195)
196:
197 UPstream(buffers.commsType()),
198 Istream(buffers.format()),
199 fromProcNo_(fromProcNo),
200 tag_(buffers.tag()),
201 comm_(buffers.comm()),
202 messageSize_(0),
203 storedRecvBufPos_(0),
204 clearAtEnd_(buffers.allowClearRecv()),
205 recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
206 recvBufPos_(buffers.accessRecvPosition(fromProcNo))
207{
208 if
209 (
211 && !buffers.finished()
212 )
213 {
214 FatalErrorInFunction
215 << "PstreamBuffers::finishedSends() never called." << endl
216 << "Please call PstreamBuffers::finishedSends() after doing"
217 << " all your sends (using UOPstream) and before doing any"
218 << " receives (using UIPstream)" << Foam::exit(FatalError);
221 setOpened();
222 setGood();
223}
224
227(
228 const DynamicList<char>& receiveBuf,
230)
231:
232 UPstream(UPstream::commsTypes::nonBlocking), // placeholder
234 fromProcNo_(UPstream::masterNo()), // placeholder
235 tag_(UPstream::msgType()), // placeholder
236 comm_(UPstream::commSelf()), // placeholder
237 messageSize_(receiveBuf.size()), // Message == buffer
239 clearAtEnd_(false), // Do not clear recvBuf if at end!!
241 (
242 // The receive buffer is never modified with this code path
243 const_cast<DynamicList<char>&>(receiveBuf)
244 ),
245 recvBufPos_(storedRecvBufPos_) // Internal reference
246{
249}
250
251
252// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
255{
256 if (clearAtEnd_ && eof())
257 {
258 if (debug)
259 {
260 Perr<< "UIPstreamBase Destructor : tag:" << tag_
261 << " fromProcNo:" << fromProcNo_
262 << " clearing receive buffer of size "
263 << recvBuf_.size()
264 << " messageSize_:" << messageSize_ << endl;
266 recvBuf_.clearStorage();
267 }
268}
269
270
271// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
272
274{
275 // Return the put back token if it exists
276 // - with additional handling for special stream flags
277 if (Istream::getBack(t))
278 {
279 if (t.isFlag())
280 {
281 processFlags(*this, t.flagToken());
282 }
283 else
284 {
285 return *this;
286 }
287 }
288
289
290 // Reset token, adjust its line number according to the stream
291 t.reset();
292 t.lineNumber(this->lineNumber());
293
294
295 // Read character, return on error
296 // - with additional handling for special stream flags
297
298 char c;
299 do
300 {
301 if (!read(c))
302 {
303 t.setBad(); // Error
304 return *this;
305 }
306
307 if (c == token::FLAG)
308 {
309 if (char flagVal; read(flagVal))
310 {
311 processFlags(*this, flagVal);
312 }
313 else
314 {
315 t.setBad(); // Error
316 return *this;
317 }
318 }
319 }
320 while (c == token::FLAG);
321
322
323 // Analyse input starting with this character.
324 switch (c)
325 {
326 // Punctuation
328 case token::BEGIN_LIST :
329 case token::END_LIST :
330 case token::BEGIN_SQR :
331 case token::END_SQR :
332 case token::BEGIN_BLOCK :
333 case token::END_BLOCK :
334 case token::COLON :
335 case token::COMMA :
336 case token::ASSIGN :
337 case token::PLUS :
338 case token::MINUS :
339 case token::MULTIPLY :
340 case token::DIVIDE :
341 {
343 return *this;
344 }
345
346 // The word-variants
349 {
350 if (word val; readString(val))
351 {
352 if
353 (
355 || !readCompoundToken(t, val, *this)
356 )
357 {
358 t = std::move(val);
359 t.setType(token::tokenType(c));
360 }
361 }
362 else
363 {
364 t.setBad();
365 }
366 return *this;
367 }
368
369 // The string-variants
375 {
376 if (string val; readString(val))
377 {
378 t = std::move(val);
379 t.setType(token::tokenType(c));
380 }
381 else
382 {
383 t.setBad();
384 }
385 return *this;
386 }
387
388 // (signed) int32
390 {
391 if (int32_t val; read(val))
392 {
393 t.int32Token(val);
394 }
395 else
396 {
397 t.setBad();
398 }
399 return *this;
400 }
401
402 // (signed) int64
404 {
405 if (int64_t val; read(val))
406 {
407 t.int64Token(val);
408 }
409 else
410 {
411 t.setBad();
412 }
413 return *this;
414 }
415
416 // (unsigned) int32
418 {
419 if (uint32_t val; read(val))
420 {
421 t.uint32Token(val);
422 }
423 else
424 {
425 t.setBad();
426 }
427 return *this;
428 }
429
430 // (unsigned) int64
432 {
433 if (uint64_t val; read(val))
434 {
435 t.uint64Token(val);
436 }
437 else
438 {
439 t.setBad();
440 }
441 return *this;
442 }
443
444 // Float
446 {
447 if (float val; read(val))
448 {
449 t = val;
450 }
451 else
452 {
453 t.setBad();
454 }
455 return *this;
456 }
457
458 // Double
460 {
461 if (double val; read(val))
462 {
463 t = val;
464 }
465 else
466 {
467 t.setBad();
468 }
469 return *this;
470 }
471
472 // Character (returned as a single character word) or error
473 default:
474 {
475 if (isalpha(c))
476 {
477 t = charToWord(c);
478 return *this;
479 }
480
481 setBad();
482 t.setBad();
484 return *this;
485 }
486 }
487}
488
489
490Foam::Istream& Foam::UIPstreamBase::read(char& c)
491{
493 ++recvBufPos_;
494 checkEof();
495 return *this;
496}
497
500{
501 return readString(str);
502}
503
506{
507 return readString(str);
508}
509
510
512{
513 readFromBuffer(val);
514 return *this;
515}
516
517
519{
520 readFromBuffer(val);
521 return *this;
522}
523
524
526{
527 readFromBuffer(val);
528 return *this;
529}
530
531
533{
534 readFromBuffer(val);
535 return *this;
536}
537
538
540{
541 readFromBuffer(val);
542 return *this;
543}
544
545
547{
548 readFromBuffer(val);
549 return *this;
550}
551
552
553Foam::Istream& Foam::UIPstreamBase::read(char* data, std::streamsize count)
554{
555 if (count)
556 {
557 // For count == 0, a no-op
558 // - see UOPstream::write(const char*, streamsize)
559 beginRawRead();
560 readRaw(data, count);
562 }
563
564 return *this;
565}
566
567
568Foam::Istream& Foam::UIPstreamBase::readRaw(char* data, std::streamsize count)
569{
570 // No check for IOstreamOption::BINARY since this is either done in the
571 // beginRawRead() method, or the caller knows what they are doing.
573 // Any alignment must have been done prior to this call
574 readFromBuffer(data, count);
575 return *this;
576}
577
578
580{
582 {
584 << "stream format not binary"
586 }
587
588 // Align on word boundary (64-bit)
589 // - as per read(const char*, streamsize)
590 // The check for zero-size will have been done by the caller
591 prepareBuffer(8);
592
593 return true;
594}
595
596
597// Not needed yet
600/// label pos() const;
601///
602/// Foam::label Foam::UIPstreamBase::pos() const
603/// {
604/// return recvBufPos_;
605/// }
606
608{
609 if (messageSize_ && (recvBufPos_ < recvBuf_.size()))
610 {
611 return (recvBuf_.size() - recvBufPos_);
612 }
613 else
614 {
615 return 0;
616 }
617}
618
619
621{
622 Istream::rewind(); // Drop any putback
623
624 recvBufPos_ = 0; // Assume the entire buffer is for us to read from
625 setOpened();
626 setGood();
627 if (recvBuf_.empty() || !messageSize_)
628 {
629 setEof();
630 }
631}
632
633
634void Foam::UIPstreamBase::print(Ostream& os) const
635{
636 os << "Reading from processor " << fromProcNo_
637 << " using communicator " << comm_
638 << " and tag " << tag_ << Foam::endl;
639}
640
641
642// ************************************************************************* //
A 1D vector of objects of type <T> that resizes itself as necessary to accept the new objects.
Definition DynamicList.H:68
streamFormat format() const noexcept
Get the current stream format.
streamFormat
Data format (ascii | binary | coherent).
@ ASCII
"ascii" (normal default)
void setBad() noexcept
Set stream state to be 'bad'.
Definition IOstream.H:488
label lineNumber() const noexcept
Const access to the current stream line number.
Definition IOstream.H:409
void setEof() noexcept
Set stream state as reached 'eof'.
Definition IOstream.H:472
bool eof() const noexcept
True if end of input seen.
Definition IOstream.H:289
void setGood() noexcept
Set stream state to be good.
Definition IOstream.H:174
void setOpened() noexcept
Set stream opened.
Definition IOstream.H:150
An Istream is an abstract base class for all input systems (streams, files, token lists etc)....
Definition Istream.H:60
bool getBack(token &tok)
Retrieve the put-back token if there is one.
Definition Istream.C:115
virtual void rewind()=0
Rewind the stream so that it may be read again.
Definition Istream.C:214
Istream(const Istream &)=default
Copy construct.
An Ostream is an abstract base class for all output systems (streams, files, token lists,...
Definition Ostream.H:59
Buffers for inter-processor communications streams (UOPstream, UIPstream).
bool finished() const noexcept
True if finishedSends() or finishedNeighbourSends() has been called.
label storedRecvBufPos_
Receive position in buffer data, used only if there is no external location for recvBufPos_.
Definition UIPstream.H:120
virtual void rewind() override
Rewind the receive stream position so that it may be read again.
virtual Istream & read(token &) override
Return next token from stream.
const int fromProcNo_
Source rank for the data.
Definition UIPstream.H:99
virtual bool endRawRead() override
End of low-level raw binary read.
Definition UIPstream.H:282
void print(Ostream &os) const override
Print stream description to Ostream.
label remaining() const noexcept
The number of characters remaining in the get buffer.
const int tag_
Message tag for communication.
Definition UIPstream.H:104
const int comm_
The communicator index.
Definition UIPstream.H:109
virtual bool beginRawRead() override
Start of low-level raw binary read.
label messageSize_
The message size, read on bufferIPCrecv or set directly.
Definition UIPstream.H:114
UIPstreamBase(const UPstream::commsTypes commsType, const int fromProcNo, DynamicList< char > &receiveBuf, label &receiveBufPosition, const int tag=UPstream::msgType(), const int communicator=UPstream::worldComm, const bool clearAtEnd=false, IOstreamOption::streamFormat fmt=IOstreamOption::BINARY)
Construct given process index to read from using the given attached receive buffer,...
const bool clearAtEnd_
Clear the receive buffer on termination (in the destructor).
Definition UIPstream.H:125
virtual ~UIPstreamBase()
Destructor. Optionally clears external receive buffer.
DynamicList< char > & recvBuf_
Reference to the receive buffer data.
Definition UIPstream.H:130
virtual Istream & readRaw(char *data, std::streamsize count) override
Low-level raw binary read. Reading into a null pointer behaves like a forward seek of count character...
label & recvBufPos_
Reference to the receive position in buffer data.
Definition UIPstream.H:135
void size(const label n)
Older name for setAddressableSize.
Definition UList.H:118
Wrapper for internally indexed communicator label. Always invokes UPstream::allocateCommunicatorCompo...
Definition UPstream.H:2546
Inter-processor communications stream.
Definition UPstream.H:69
static constexpr int commSelf() noexcept
Communicator within the current rank only.
Definition UPstream.H:1088
commsTypes
Communications types.
Definition UPstream.H:81
@ scheduled
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
Definition UPstream.H:83
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Definition UPstream.H:84
static int & msgType() noexcept
Message tag of standard messages.
Definition UPstream.H:1926
static constexpr int masterNo() noexcept
Relative rank for the master process - is always 0.
Definition UPstream.H:1691
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition UPstream.H:1958
UPstream(const commsTypes commsType) noexcept
Construct for given communication type.
Definition UPstream.H:1184
static bool isCompound(const word &compoundType)
True if a known (registered) compound type.
Definition token.C:104
A token holds an item read from Istream.
Definition token.H:70
tokenType
Enumeration defining the types of token.
Definition token.H:81
@ DOUBLE
double (double-precision) type
Definition token.H:94
@ FLAG
stream flag (1-byte bitmask)
Definition token.H:86
@ WORD
Foam::word.
Definition token.H:97
@ UNSIGNED_INTEGER_32
uint32 type
Definition token.H:91
@ EXPRESSION
Definition token.H:104
@ CHAR_DATA
String-variant: plain character content.
Definition token.H:110
@ INTEGER_64
int64 type
Definition token.H:90
@ FLOAT
float (single-precision) type
Definition token.H:93
@ DIRECTIVE
Definition token.H:101
@ UNSIGNED_INTEGER_64
uint64 type
Definition token.H:92
@ INTEGER_32
int32 type
Definition token.H:89
@ STRING
Foam::string (usually double-quoted).
Definition token.H:98
label lineNumber() const noexcept
The line number for the token.
Definition tokenI.H:570
punctuationToken
Standard punctuation tokens (a character).
Definition token.H:140
@ DIVIDE
Divide [isseparator].
Definition token.H:160
@ BEGIN_BLOCK
Begin block [isseparator].
Definition token.H:178
@ BEGIN_SQR
Begin dimensions [isseparator].
Definition token.H:176
@ COLON
Colon [isseparator].
Definition token.H:146
@ END_BLOCK
End block [isseparator].
Definition token.H:179
@ ASSIGN
Assignment/equals [isseparator].
Definition token.H:156
@ END_STATEMENT
End entry [isseparator].
Definition token.H:173
@ BEGIN_LIST
Begin list [isseparator].
Definition token.H:174
@ PLUS
Addition [isseparator].
Definition token.H:157
@ END_LIST
End list [isseparator].
Definition token.H:175
@ END_SQR
End dimensions [isseparator].
Definition token.H:177
@ MULTIPLY
Multiply [isseparator].
Definition token.H:159
@ MINUS
Subtract or start of negative number.
Definition token.H:158
@ COMMA
Comma [isseparator].
Definition token.H:148
void setBad()
Clear token and set to be ERROR.
Definition tokenI.H:456
bool readCompoundToken(const word &compoundType, Istream &is, const bool readContent=true)
Default construct the specified compound type and read from stream.
Definition token.C:126
@ BINARY
BINARY-mode stream.
Definition token.H:132
@ ASCII
ASCII-mode stream.
Definition token.H:131
uint32_t uint32Token() const
Return uint32 value, convert from other integer type or Error.
Definition tokenI.H:798
bool isFlag() const noexcept
Token is FLAG.
Definition tokenI.H:632
bool setType(const tokenType tokType) noexcept
Change the token type, for similar types.
Definition tokenI.H:488
int flagToken() const
Return flag bitmask value.
Definition tokenI.H:638
int64_t int64Token() const
Return int64 value, convert from other integer type or Error.
Definition tokenI.H:767
void reset()
Reset token to UNDEFINED and clear any allocated storage.
Definition tokenI.H:412
int32_t int32Token() const
Return int32 value, convert from other integer type or Error.
Definition tokenI.H:739
uint64_t uint64Token() const
Return uint64 value, convert from other integer type or Error.
Definition tokenI.H:829
A class for handling words, derived from Foam::string.
Definition word.H:66
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition error.H:600
OBJstream os(runTime.globalPath()/outputName)
System signed integer.
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition BitOps.H:73
const dimensionedScalar c
Speed of light in a vacuum.
Namespace for handling debugging switches.
Definition debug.C:45
Namespace for OpenFOAM.
dimensionedScalar pos(const dimensionedScalar &ds)
static Foam::word charToWord(char c)
bool read(const char *buf, int32_t &val)
Same as readInt32.
Definition int32.H:127
static void processFlags(Istream &is, int flagMask)
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
static label byteAlign(const label pos, const size_t align)
errorManip< error > abort(error &err)
Definition errorManip.H:139
const direction noexcept
Definition scalarImpl.H:265
static bool readCompoundToken(token &tok, const word &compoundType, Istream &is)
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
word format(conversionProperties.get< word >("format"))