Loading...
Searching...
No Matches
UIPstreamRead.C
Go to the documentation of this file.
1/*---------------------------------------------------------------------------*\
2 ========= |
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4 \\ / O peration |
5 \\ / A nd | www.openfoam.com
6 \\/ M anipulation |
7-------------------------------------------------------------------------------
8 Copyright (C) 2011-2017 OpenFOAM Foundation
9 Copyright (C) 2019-2025 OpenCFD Ltd.
10-------------------------------------------------------------------------------
11License
12 This file is part of OpenFOAM.
13
14 OpenFOAM is free software: you can redistribute it and/or modify it
15 under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
20 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
26
27\*---------------------------------------------------------------------------*/
28
29#include "UIPstream.H"
30#include "PstreamGlobals.H"
31#include "profilingPstream.H"
32#include "IOstreams.H"
33
34// FUTURE? probe and receive message
35// - as of 2023-06 appears to be broken with INTELMPI + PMI-2 (slurm)
36// and perhaps other places so currently avoid
37
38// * * * * * * * * * * Protected Static Member Functions * * * * * * * * * * //
39
40// General blocking/non-blocking MPI receive
41std::streamsize Foam::UPstream::mpi_receive
42(
43 const UPstream::commsTypes commsType,
44 void* buf, // Type checking done by caller
45 std::streamsize count,
46 const UPstream::dataTypes dataTypeId, // Proper type passed by caller
47 const int fromProcNo,
48 const int tag,
49 const int communicator,
50 UPstream::Request* req
51)
52{
53 MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
54
56
57 // Could check if nonBlocking and request are consistently specified...
58
59
60 // TODO: some corrective action, at least when not nonBlocking
61 #if 0
62 // No warnings here, just on the sender side.
63 if (count > std::streamsize(INT_MAX))
64 {
65 Perr<< "[mpi_recv] from rank " << fromProcNo
66 << " exceeds INT_MAX values of "
68 << Foam::endl;
69
71 }
72 #endif
73
75 {
76 Perr<< "[mpi_recv] : starting recv from:" << fromProcNo
77 << " type:" << int(dataTypeId)
78 << " count:" << label(count)
79 << " tag:" << tag << " comm:" << communicator
80 << " commsType:" << UPstream::commsTypeNames[commsType]
81 << " warnComm:" << UPstream::warnComm
82 << Foam::endl;
84 }
85 else if (FOAM_UNLIKELY(UPstream::debug))
86 {
87 Perr<< "[mpi_recv] : starting recv from:" << fromProcNo
88 << " type:" << int(dataTypeId)
89 << " count:" << label(count)
90 << " tag:" << tag << " comm:" << communicator
91 << " commsType:" << UPstream::commsTypeNames[commsType]
92 << Foam::endl;
93 }
94
95 int returnCode = MPI_ERR_UNKNOWN;
96
98
99 if
100 (
103 )
104 {
105 // Not UPstream::commsTypes::nonBlocking
106
107 MPI_Status status;
108
109 {
110 returnCode = MPI_Recv
111 (
112 buf,
113 count,
114 datatype,
115 fromProcNo,
116 tag,
118 &status
119 );
120 }
121
123
124 if (FOAM_UNLIKELY(returnCode != MPI_SUCCESS))
125 {
127 << "[mpi_recv] : cannot receive message from:"
128 << fromProcNo
129 << " type:" << int(dataTypeId)
130 << " count:" << label(count) << " tag:" << tag
132 return 0;
133 }
134 else if (FOAM_UNLIKELY(UPstream::debug))
135 {
136 Perr<< "[mpi_recv] : finished recv from:"
137 << fromProcNo
138 << " type:" << int(dataTypeId)
139 << " count:" << label(count) << " tag:" << tag
140 << Foam::endl;
141 }
142
143 // Check size of message read (number of basic elements)
144 MPI_Count num_recv(0);
145 MPI_Get_elements_x(&status, datatype, &num_recv);
146
147 // Errors
148 if (FOAM_UNLIKELY(num_recv == MPI_UNDEFINED || int64_t(num_recv) < 0))
149 {
151 << "[mpi_recv] : receive from:" << fromProcNo
152 << " type:" << int(dataTypeId)
153 << " received count is undefined or negative value"
155 }
156 else
157 {
158 // From number of basic elements to number of 'datatype'
159 num_recv /= PstreamGlobals::dataTypesCount_[int(dataTypeId)];
160 }
161
162 if (FOAM_UNLIKELY(int64_t(num_recv) > int64_t(UList<char>::max_size())))
163 {
165 << "[mpi_recv] : receive from:" << fromProcNo
166 << " type:" << int(dataTypeId)
167 << " received count is larger than UList<T>::max_size()"
169 }
170 else if (FOAM_UNLIKELY(count < std::streamsize(num_recv)))
171 {
173 << "[mpi_recv] : receive from:" << fromProcNo
174 << " type:" << int(dataTypeId)
175 << " count:" << label(count)
176 << " buffer is too small for incoming message ("
177 << label(num_recv) << ')'
179 }
180
181 return std::streamsize(num_recv);
182 }
184 {
185 MPI_Request request;
186
187 {
188 returnCode = MPI_Irecv
189 (
190 buf,
191 count,
192 datatype,
193 fromProcNo,
194 tag,
196 &request
197 );
198 }
199
200 if (FOAM_UNLIKELY(returnCode != MPI_SUCCESS))
201 {
203 << "[mpi_recv] : cannot start non-blocking receive from:"
204 << fromProcNo
205 << " type:" << int(dataTypeId)
206 << " count:" << label(count)
208
209 return 0;
210 }
211
212 PstreamGlobals::push_request(request, req);
214
215
216 if (FOAM_UNLIKELY(UPstream::debug))
217 {
218 Perr<< "[mpi_recv] : started non-blocking recv from:"
219 << fromProcNo
220 << " type:" << int(dataTypeId)
221 << " count:" << label(count) << " tag:" << tag
222 << " request:" <<
223 (req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
224 << Foam::endl;
225 }
226
227 // Assume the message will be completely received.
228 return count;
229 }
230
232 << "Unsupported communications type " << int(commsType)
234
235 return 0;
236}
237
238
239// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
240
241void Foam::UIPstream::bufferIPCrecv()
242{
243 // Called by constructor
244 if (FOAM_UNLIKELY(UPstream::debug))
245 {
246 Perr<< "UIPstream IPC read buffer :"
247 << " from:" << fromProcNo_
248 << " tag:" << tag_ << " comm:" << comm_
249 << " wanted size:" << recvBuf_.capacity()
250 << Foam::endl;
251 }
252
253 // Fallback value
254 messageSize_ = recvBuf_.capacity();
255
256 if (commsType() == UPstream::commsTypes::nonBlocking)
257 {
258 // Non-blocking
259 // ~~~~~~~~~~~~
260 // No chance of probing for size nor relying on the returned message
261 // size (since it returns immediately without any further checks)
262 //
263 // Fortunately there are not many (any?) places that are using
264 // a non-blocking IPstream with streaming anyhow.
265
266 messageSize_ = recvBuf_.size();
267 }
268 else if (!recvBuf_.capacity())
269 {
270 // No buffer size allocated/specified - probe size of incoming message
272
273 MPI_Status status;
274
275 MPI_Probe
276 (
277 fromProcNo_,
278 tag_,
280 &status
281 );
282
284
285 // Buffer of characters (bytes)
286 MPI_Count num_recv(0);
287 MPI_Get_elements_x(&status, MPI_BYTE, &num_recv);
288
289 // Errors
290 if (FOAM_UNLIKELY(num_recv == MPI_UNDEFINED || int64_t(num_recv) < 0))
291 {
293 << "UIPstream IPC read buffer from:" << fromProcNo_
294 << " received count is undefined or negative value"
296 }
297
298 // Count is already in basic elements, no need to scale the result
299
300 if (FOAM_UNLIKELY(int64_t(num_recv) > int64_t(UList<char>::max_size())))
301 {
303 << "UIPstream IPC read buffer from:" << fromProcNo_
304 << " received count is larger than UList<T>::max_size()"
306 }
307
308 if (FOAM_UNLIKELY(UPstream::debug))
309 {
310 Perr<< "UIPstream::bufferIPCrecv : probed size:"
311 << label(num_recv) << Foam::endl;
312 }
313
314 recvBuf_.resize(label(num_recv));
315 messageSize_ = label(num_recv);
316 }
317
318 std::streamsize count = UPstream::mpi_receive
319 (
320 commsType(),
321 recvBuf_.data(), // buffer
322 messageSize_, // expected size
324 fromProcNo_,
325 tag_,
326 comm_,
327 nullptr // UPstream::Request
328 );
329
330 // Errors
331 if (FOAM_UNLIKELY(count < 0))
332 {
334 << "UIPstream IPC read buffer from:" << fromProcNo_
335 << " with negative size?"
337 }
338 else if (FOAM_UNLIKELY(int64_t(count) > int64_t(UList<char>::max_size())))
339 {
341 << "UIPstream IPC read buffer from:" << fromProcNo_
342 << " received size is larger than UList<T>::max_size()"
344 }
345
346 // Set addressed size. Leave actual allocated memory intact.
347 recvBuf_.resize(label(count));
348 messageSize_ = label(count);
349
350 if (recvBuf_.empty())
351 {
352 setEof();
353 }
354}
355
356
357// ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
static constexpr label max_size() noexcept
The size of the largest possible UList.
Definition UList.H:716
Wrapper for internally indexed communicator label. Always invokes UPstream::allocateCommunicatorCompo...
Definition UPstream.H:2546
@ scheduled
"scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
Definition UPstream.H:83
@ nonBlocking
"nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
Definition UPstream.H:84
@ buffered
"buffered" : (MPI_Bsend, MPI_Recv)
Definition UPstream.H:82
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition UPstream.H:1074
static const Enum< commsTypes > commsTypeNames
Enumerated names for the communication types.
Definition UPstream.H:92
static std::streamsize mpi_receive(const UPstream::commsTypes commsType, void *buf, std::streamsize count, const UPstream::dataTypes dataTypeId, const int fromProcNo, const int tag, const int communicator, UPstream::Request *req=nullptr)
Receive buffer contents of specified data type from given processor.
commsTypes commsType() const noexcept
Get the communications type of the stream.
Definition UPstream.H:1958
@ type_byte
byte, char, unsigned char, ...
Definition UPstream.H:113
static void printStack(Ostream &os, int size=-1)
Helper function to print a stack, with optional upper limit.
static void beginTiming()
Update timer prior to measurement.
static void addGatherTime()
Add time increment to gather time.
static void addRequestTime()
Add time increment to request time.
static void addProbeTime()
Add time increment to probe time.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition error.H:600
unsigned int count(const UList< bool > &bools, const bool val=true)
Count number of 'true' entries.
Definition BitOps.H:73
DynamicList< MPI_Request > outstandingRequests_
Outstanding non-blocking operations.
DataTypeCountLookupTable dataTypesCount_
Fundamental count for each valid UPstream::dataTypes entry Indexed by UPstream::dataTypes enum.
std::string dataType_name(MPI_Datatype datatype)
Return MPI internal name for specified MPI_Datatype.
MPI_Datatype getDataType(UPstream::dataTypes id)
Lookup of dataTypes enumeration as an MPI_Datatype.
DynamicList< MPI_Comm > MPICommunicators_
void push_request(MPI_Request request, UPstream::Request *req=nullptr)
Transcribe MPI_Request to UPstream::Request (does not affect the stack of outstanding requests) or el...
void reset_request(UPstream::Request *req) noexcept
Reset UPstream::Request to MPI_REQUEST_NULL.
bool warnCommunicator(int comm) noexcept
True if warn communicator is active and not equal to given communicator.
prefixOSstream Perr
OSstream wrapped stderr (std::cerr) with parallel prefix.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition Ostream.H:519
errorManip< error > abort(error &err)
Definition errorManip.H:139
error FatalError
Error stream (stdout output on all processes), with additional 'FOAM FATAL ERROR' header text and sta...
#define FOAM_UNLIKELY(cond)
Definition stdFoam.H:64