Title: | R Interface to MPI for HPC Clusters (Programming with Big Data Project) |
---|---|
Description: | A simplified, efficient, interface to MPI for HPC clusters. It is a derivation and rethinking of the Rmpi package. pbdMPI embraces the prevalent parallel programming style on HPC clusters. Beyond the interface, a collection of functions for global work with distributed data and resource-independent RNG reproducibility is included. It is based on S4 classes and methods. |
Authors: | Wei-Chen Chen [aut, cre], George Ostrouchov [aut], Drew Schmidt [aut], Pragneshkumar Patel [aut], Hao Yu [aut], Christian Heckendorf [ctb] (FreeBSD), Brian Ripley [ctb] (Windows HPC Pack 2012), R Core team [ctb] (some functions are modified from the base packages), Sebastien Lamy de la Chapelle [aut] (fix check type for send/recv long vectors) |
Maintainer: | Wei-Chen Chen <[email protected]> |
License: | Mozilla Public License 2.0 |
Version: | 0.5-2 |
Built: | 2024-12-12 06:44:50 UTC |
Source: | CRAN |
A simplified, efficient, interface to MPI for HPC clusters. It is a derivation and rethinking of the Rmpi package that embraces the prevalent parallel programming style on HPC clusters. Beyond the interface, a collection of functions for global work with distributed data is included. It is based on S4 classes and methods.
This package requires an MPI library (OpenMPI, MPICH2, or LAM/MPI). Standard
installation in an R session with
> install.packages("pbdMPI")
should work in most cases.
On HPC clusters, it is
strongly recommended that you check with your HPC cluster documentation for
specific requirements, such as
module
software environments. Some module examples relevant to R and MPI
are $ module load openmpi
$ module load openblas
$ module load flexiblas
$ module load r
possibly giving specific versions and possibly with some upper case letters.
Although module software environments are widely used, the specific module
names and their dependence structure are not standard across cluster
installations. The command $ module avail
usually lists the available software modules on your cluster.
To install on the Unix command line after
downloading the source file, use R CMD INSTALL
.
If the MPI library is not found, after checking that you are loading the
correct module environments, the following arguments can be used to
specify its non-standard location on your system
Argument | Default |
--with-mpi-type |
OPENMPI |
--with-mpi-include |
${MPI_ROOT}/include |
--with-mpi-libpath |
${MPI_ROOT}/lib |
--with-mpi |
${MPI_ROOT} |
where ${MPI_ROOT}
is the path to the MPI root.
See the package source file pbdMPI/configure
for details.
Loading library(pbdMPI)
sets a few global variables, including the
environment .pbd_env
, where many defaults are set, and initializes MPI.
In most cases, the defaults should not be modified. Rather, the parameters
of the functions that use them should be changed. All codes must end
with finalize()
to cleanly exit MPI.
Most functions are assumed to run as Single Program, Multiple Data (SPMD), i.e. in batch mode. SPMD is based on cooperation between parallel copies of a single program, which is more scalable than a manager-workers approach that is natural in interactive programming. Interactivity with an HPC cluster is more efficiently handled by a client-server approach, such as that enabled by the remoter package.
On most clusters, codes run with mpirun
or
mpiexec
and Rscript
, such as > mpiexec -np 2 Rscript some_code.r
where some_code.r
contains the entire SPMD program. The MPI
Standard 4.0 recommends mpiexec
over mpirun
. Some
MPI implementations may have minor differences between the two but under
OpenMPI 5.0 they are synonyms that produce the same behavior.
The package source files provide several examples based on pbdMPI, such as
Directory | Examples |
pbdMPI/inst/examples/test_spmd/ |
main SPMD functions |
pbdMPI/inst/examples/test_rmpi/ |
analogues to Rmpi |
pbdMPI/inst/examples/test_parallel/ |
analogues to parallel |
pbdMPI/inst/examples/test_performance/ |
performance tests |
pbdMPI/inst/examples/test_s4/ |
S4 extension |
pbdMPI/inst/examples/test_cs/ |
client/server examples |
pbdMPI/inst/examples/test_long_vector/ |
long vector examples |
where test_long_vector
needs a recompile with setting
#define MPI_LONG_DEBUG 1
in pbdMPI/src/pkg_constant.h
.
The current version is mainly written and tested under OpenMPI environments on Linux systems (CentOS 7, RHEL 8, Xubuntu). Also, it is tested on macOS with Homebrew-installed OpenMPI and under MPICH2 environments on Windows systems, although the primary target systems are HPC clusters running Linux OS.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
allgather()
,
allreduce()
,
bcast()
,
gather()
,
reduce()
,
scatter()
.
## Not run: ### On command line, run each demo with 2 processors by ### (Use Rscript.exe on Windows systems) # mpiexec -np 2 Rscript -e "demo(allgather,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(allreduce,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(bcast,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(gather,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(reduce,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(scatter,'pbdMPI',ask=F,echo=F)" ### Or # execmpi("demo(allgather,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(allreduce,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(bcast,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(gather,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(reduce,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(scatter,'pbdMPI',ask=F,echo=F)", nranks = 2L) ## End(Not run)
## Not run: ### On command line, run each demo with 2 processors by ### (Use Rscript.exe on Windows systems) # mpiexec -np 2 Rscript -e "demo(allgather,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(allreduce,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(bcast,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(gather,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(reduce,'pbdMPI',ask=F,echo=F)" # mpiexec -np 2 Rscript -e "demo(scatter,'pbdMPI',ask=F,echo=F)" ### Or # execmpi("demo(allgather,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(allreduce,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(bcast,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(gather,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(reduce,'pbdMPI',ask=F,echo=F)", nranks = 2L) # execmpi("demo(scatter,'pbdMPI',ask=F,echo=F)", nranks = 2L) ## End(Not run)
This method lets all ranks gather objects from every rank in the same
communicator. The default return is a list of length equal to
comm.size(comm)
.
allgather(x, x.buffer = NULL, x.count = NULL, displs = NULL, comm = .pbd_env$SPMD.CT$comm, unlist = .pbd_env$SPMD.CT$unlist)
allgather(x, x.buffer = NULL, x.count = NULL, displs = NULL, comm = .pbd_env$SPMD.CT$comm, unlist = .pbd_env$SPMD.CT$unlist)
x |
an object to be gathered from all ranks. |
x.buffer |
a buffer to hold the return object which probably has
'size of |
x.count |
a vector of length ' |
displs |
|
comm |
a communicator number. |
unlist |
apply |
The arguments x.buffer
, x.count
, and displs
can be
left unspecified or NULL
and are computed for you.
If x.buffer
is specified, its type should be one of integer, double,
or raw according to the type of x
. Serialization and unserialization
is avoided for atomic vectors if they are
all the same size and x.buffer
is specified, or if different sizes and
both x.buffer
and x.count
are specified. A single vector
instead of a list is returned in these cases.
Class array
objects are gathered without serialization.
Complex objects can be gathered as serialization and unserialization is used on objects that are not of class "array" or atomic vectors.
The allgather
is efficient due to the underlying MPI parallel
communication and recursive doubling gathering algorithm that results in
a sublinear (log2(comm.size(comm))
) number of communication steps.
Also, serialization is applied only locally and in parallel.
See methods{"allgather"}
for S4 dispatch cases and the source code for
further details.
A list of length comm.size(comm)
, containing the gathered objects from
each rank, is returned to all ranks by default. An exception is for atomic
vectors, when
x.buffer
is specified, where a list is never formed and a single
vector is returned. In other cases, the unlist = TRUE
parameter
simply applies the unlist()
function to this list before returning.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
gather()
, allreduce()
, reduce()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples N <- 5 x <- (1:N) + N * .comm.rank y <- allgather(matrix(x, nrow = 1)) comm.print(y) y <- allgather(x, double(N * .comm.size)) comm.print(y) ### Finish finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples N <- 5 x <- (1:N) + N * .comm.rank y <- allgather(matrix(x, nrow = 1)) comm.print(y) y <- allgather(x, double(N * .comm.size)) comm.print(y) ### Finish finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
This method lets all ranks receive a reduction of objects from every rank in the same communicator based on a given operation. The default return is an object like the input and the default operation is the sum.
allreduce(x, x.buffer = NULL, op = .pbd_env$SPMD.CT$op, comm = .pbd_env$SPMD.CT$comm)
allreduce(x, x.buffer = NULL, op = .pbd_env$SPMD.CT$op, comm = .pbd_env$SPMD.CT$comm)
x |
an object to be reduced from all ranks. |
x.buffer |
for atomic vectors, a buffer to hold the return object which
has the same size and the same type as |
op |
the reduction operation to apply to |
comm |
a communicator number. |
All ranks are presumed to have x
of the same size and type.
Normally, x.buffer
is NULL
or unspecified, and is computed
for you. If specified for atomic vectors, the type should be one of integer,
double, or raw and be the same type as x
.
The allgather
is efficient due to the underlying MPI parallel
communication and recursive doubling reduction algorithm that results in
a sublinear (log2(comm.size(comm))
) number of reduction and
communication steps.
See methods{"allreduce"}
for S4 dispatch cases and the source code for
further details.
The reduced object of the same type as x
is returned to all ranks
by default.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
allgather()
, gather()
, reduce()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank y <- allreduce(matrix(x, nrow = 1), op = \"sum\") comm.print(y) y <- allreduce(x, double(N), op = \"prod\") comm.print(y) comm.set.seed(1234, diff = TRUE) x <- as.logical(round(runif(N))) y <- allreduce(x, logical(N), op = \"land\") comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code = spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank y <- allreduce(matrix(x, nrow = 1), op = \"sum\") comm.print(y) y <- allreduce(x, double(N), op = \"prod\") comm.print(y) comm.set.seed(1234, diff = TRUE) x <- as.logical(round(runif(N))) y <- allreduce(x, logical(N), op = \"land\") comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code = spmd.code, nranks = 2L)
These functions make calls to MPI_Alltoall()
and
MPI_Alltoallv()
.
spmd.alltoall.integer(x.send, x.recv, send.count, recv.count, comm = .pbd_env$SPMD.CT$comm) spmd.alltoall.double(x.send, x.recv, send.count, recv.count, comm = .pbd_env$SPMD.CT$comm) spmd.alltoall.raw(x.send, x.recv, send.count, recv.count, comm = .pbd_env$SPMD.CT$comm) spmd.alltoallv.integer(x.send, x.recv, send.count, recv.count, sdispls, rdispls, comm = .pbd_env$SPMD.CT$comm) spmd.alltoallv.double(x.send, x.recv, send.count, recv.count, sdispls, rdispls, comm = .pbd_env$SPMD.CT$comm) spmd.alltoallv.raw(x.send, x.recv, send.count, recv.count, sdispls, rdispls, comm = .pbd_env$SPMD.CT$comm)
spmd.alltoall.integer(x.send, x.recv, send.count, recv.count, comm = .pbd_env$SPMD.CT$comm) spmd.alltoall.double(x.send, x.recv, send.count, recv.count, comm = .pbd_env$SPMD.CT$comm) spmd.alltoall.raw(x.send, x.recv, send.count, recv.count, comm = .pbd_env$SPMD.CT$comm) spmd.alltoallv.integer(x.send, x.recv, send.count, recv.count, sdispls, rdispls, comm = .pbd_env$SPMD.CT$comm) spmd.alltoallv.double(x.send, x.recv, send.count, recv.count, sdispls, rdispls, comm = .pbd_env$SPMD.CT$comm) spmd.alltoallv.raw(x.send, x.recv, send.count, recv.count, sdispls, rdispls, comm = .pbd_env$SPMD.CT$comm)
x.send |
an object to send. |
x.recv |
an object to receive |
send.count |
send counter |
recv.count |
recv counter |
sdispls |
send dis pls |
rdispls |
recv dis pls |
comm |
a communicator number. |
These are very low level functions. Use with cautions. Neigher S4 method nor long vector is supported.
These are very low level functions. Use with cautions. Neigher S4 method nor long vector is supported.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
allgather()
, allgatherv()
.
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript --vanilla [...].r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. n <- as.integer(2) x <- 1:(.comm.size * n) comm.cat(\"Original x:\n\", quiet = TRUE) comm.print(x, all.rank = TRUE) x <- as.integer(x) y <- spmd.alltoall.integer(x, integer(length(x)), n, n) comm.cat(\"\nAlltoall y:\n\", quiet = TRUE) comm.print(y, all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript --vanilla [...].r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. n <- as.integer(2) x <- 1:(.comm.size * n) comm.cat(\"Original x:\n\", quiet = TRUE) comm.print(x, all.rank = TRUE) x <- as.integer(x) y <- spmd.alltoall.integer(x, integer(length(x)), n, n) comm.cat(\"\nAlltoall y:\n\", quiet = TRUE) comm.print(y, all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
The functions are parallel versions of apply and lapply functions.
pbdApply(X, MARGIN, FUN, ..., pbd.mode = c("mw", "spmd", "dist"), rank.source = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, barrier = TRUE) pbdLapply(X, FUN, ..., pbd.mode = c("mw", "spmd", "dist"), rank.source = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, bcast = FALSE, barrier = TRUE) pbdSapply(X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE, pbd.mode = c("mw", "spmd", "dist"), rank.source = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, bcast = FALSE, barrier = TRUE)
pbdApply(X, MARGIN, FUN, ..., pbd.mode = c("mw", "spmd", "dist"), rank.source = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, barrier = TRUE) pbdLapply(X, FUN, ..., pbd.mode = c("mw", "spmd", "dist"), rank.source = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, bcast = FALSE, barrier = TRUE) pbdSapply(X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE, pbd.mode = c("mw", "spmd", "dist"), rank.source = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, bcast = FALSE, barrier = TRUE)
X |
a matrix or array in |
MARGIN |
|
FUN |
as in the |
... |
optional arguments to |
simplify |
as in the |
USE.NAMES |
as in the |
pbd.mode |
mode of distributed data |
rank.source |
a rank of source where |
comm |
a communicator number. |
bcast |
if bcast to all ranks. |
barrier |
if barrier for all ranks. |
All functions are majorly called in manager/workers mode
(pbd.model = "mw"
), and just work the same as their
serial version.
If pbd.mode = "mw"
, the X
in rank.source
(manager)
will be distributed to the workers, then FUN
will be applied to
the new data, and results gathered to rank.source
.
“In SPMD, the manager is one of workers.”
...
is also scatter()
from rank.source
.
If pbd.mode = "spmd"
, the same copy of X
is expected
on all ranks, and the original apply()
, lapply()
,
or sapply()
will operate on part of
X
. An explicit allgather()
or gather()
will be needed to
aggregate the results.
If pbd.mode = "dist"
, different X
are expected on
all ranks, i.e. ‘distinct or distributed’ X
,
and original apply()
, lapply()
, or sapply()
will operate
on the distinct X
. An explicit allgather()
or gather()
will be needed to aggregate the results.
In SPMD, it is better to split data into pieces, so that X
is a local
piece of a global matrix. If the "apply" dimension is local, the base
apply()
function can be used.
A list or a matrix will be returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Example for pbdApply. N <- 100 x <- matrix((1:N) + N * .comm.rank, ncol = 10) y <- pbdApply(x, 1, sum, pbd.mode = \"mw\") comm.print(y) y <- pbdApply(x, 1, sum, pbd.mode = \"spmd\") comm.print(y) y <- pbdApply(x, 1, sum, pbd.mode = \"dist\") comm.print(y) ### Example for pbdApply for 3D array. N <- 60 x <- array((1:N) + N * .comm.rank, c(3, 4, 5)) dimnames(x) <- list(lat = paste(\"lat\", 1:3, sep = \"\"), lon = paste(\"lon\", 1:4, sep = \"\"), time = paste(\"time\", 1:5, sep = \"\")) comm.print(x[,, 1:2]) y <- pbdApply(x, c(1, 2), sum, pbd.mode = \"mw\") comm.print(y) y <- pbdApply(x, c(1, 2), sum, pbd.mode = \"spmd\") comm.print(y) y <- pbdApply(x, c(1, 2), sum, pbd.mode = \"dist\") comm.print(y) ### Example for pbdLapply. N <- 100 x <- split((1:N) + N * .comm.rank, rep(1:10, each = 10)) y <- pbdLapply(x, sum, pbd.mode = \"mw\") comm.print(unlist(y)) y <- pbdLapply(x, sum, pbd.mode = \"spmd\") comm.print(unlist(y)) y <- pbdLapply(x, sum, pbd.mode = \"dist\") comm.print(unlist(y)) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Example for pbdApply. N <- 100 x <- matrix((1:N) + N * .comm.rank, ncol = 10) y <- pbdApply(x, 1, sum, pbd.mode = \"mw\") comm.print(y) y <- pbdApply(x, 1, sum, pbd.mode = \"spmd\") comm.print(y) y <- pbdApply(x, 1, sum, pbd.mode = \"dist\") comm.print(y) ### Example for pbdApply for 3D array. N <- 60 x <- array((1:N) + N * .comm.rank, c(3, 4, 5)) dimnames(x) <- list(lat = paste(\"lat\", 1:3, sep = \"\"), lon = paste(\"lon\", 1:4, sep = \"\"), time = paste(\"time\", 1:5, sep = \"\")) comm.print(x[,, 1:2]) y <- pbdApply(x, c(1, 2), sum, pbd.mode = \"mw\") comm.print(y) y <- pbdApply(x, c(1, 2), sum, pbd.mode = \"spmd\") comm.print(y) y <- pbdApply(x, c(1, 2), sum, pbd.mode = \"dist\") comm.print(y) ### Example for pbdLapply. N <- 100 x <- split((1:N) + N * .comm.rank, rep(1:10, each = 10)) y <- pbdLapply(x, sum, pbd.mode = \"mw\") comm.print(unlist(y)) y <- pbdLapply(x, sum, pbd.mode = \"spmd\") comm.print(unlist(y)) y <- pbdLapply(x, sum, pbd.mode = \"dist\") comm.print(unlist(y)) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
This method lets a rank broadcast an object to every rank in the same communicator. The default return is the object.
bcast(x, rank.source = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
bcast(x, rank.source = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
x |
an object to be broadcast from all ranks. |
rank.source |
a rank of source where |
comm |
a communicator number. |
The same copy of x
is sent to all ranks.
See methods{"bcast"}
for S4 dispatch cases and the source code for
further details.
Every rank has x
returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
scatter()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. x <- matrix(1:5, nrow = 1) y <- bcast(x) comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. x <- matrix(1:5, nrow = 1) y <- bcast(x) comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
Given a total number of items, N, comm.chunk splits the number into chunks. Tailored especially for situations in SPMD style programming, potentially returning different results to each rank. Optionally, results for all ranks can be returned to all.
comm.chunk( N, form = "number", type = "balance", lo.side = "right", rng = FALSE, all.rank = FALSE, p = NULL, rank = NULL, comm = .pbd_env$SPMD.CT$comm, ... )
comm.chunk( N, form = "number", type = "balance", lo.side = "right", rng = FALSE, all.rank = FALSE, p = NULL, rank = NULL, comm = .pbd_env$SPMD.CT$comm, ... )
N |
The number of items to split into chunks. |
form |
Output a chunk as a single "number", as a "vector" of items from 1:N, or as a "seq" three parameters 'c(from, to, by)' of the base 'seq()' function (replaced deprecated "iopair" for offset and length in a file). Forms "ldim" and "bldim" are available only with type "equal" and are intended for setting "ddmatrix" (see package pbdDMAT) slots. |
type |
Is the primary load and location balance specification. The choices are: "balance" the chunks so they differ by no more than 1 item (used most frequently and default); "cycle" is the same as "balance" in terms of load but differs on location in that chunks are not contiguous, rather are assigned in a cycled way to ranks (note that "balance" and "cycle" are the same if 'form' is "number"); "equal" maximizes the number of same size chunks resulting in one or more smaller or even zero size chunks carrying the leftover (required by pbdDMAT block-cyclic layouts). |
lo.side |
If exact balance is not possible, put the smaller chunks on the "left" (low ranks) or on the "right" (high ranks). |
rng |
If TRUE, set up different L'Ecuyere random number generator streams.
Switch to stream |
all.rank |
FALSE returns only the chunk for rank r. TRUE returns a vector of length p (when form="number"), and a list of length p (when form="vector") each containing the output for the corresponding rank. |
p |
The number of chunks (processors). Normally, it is NOT specified and defaults to NULL, which assigns comm.size(comm). |
rank |
The rank of returned chunk. Normally, it is NOT specified and defaults to NULL, which assigns comm.rank(comm)). Note that ranks are numbered from 0 to p-1, whereas the list elements for all.rank=TRUE are numbered 1 to p. |
comm |
The communicator that determines MPI rank numbers. |
... |
If |
Various chunking options are possible when the number does not split evenly into equal chunks. The output form can be a number, a vector of items, or a few other special forms intended for pbdR components.
A numeric value from 0:N or a vector giving a subset of 1:N (depending on form) for the rank. If all.rank is TRUE, a vector or a list of vectors, respectively.
## Not run: ## Note that the p and rank parameters are provided by comm.size() and ## comm.rank(), respectively, when running SPMD in parallel. Normally, they ## are not specified unless testing in serial mode (as in this example). library(pbdIO) comm.chunk(16, all.rank = TRUE, p = 5) comm.chunk(16, type = "equal", all.rank = TRUE, p = 5) comm.chunk(16, type = "equal", lo.side = "left", all.rank = TRUE, p = 5) comm.chunk(16, p = 5, rank = 0) comm.chunk(16, p = 5, lo.side = "left", rank = 0) ## End(Not run)
## Not run: ## Note that the p and rank parameters are provided by comm.size() and ## comm.rank(), respectively, when running SPMD in parallel. Normally, they ## are not specified unless testing in serial mode (as in this example). library(pbdIO) comm.chunk(16, all.rank = TRUE, p = 5) comm.chunk(16, type = "equal", all.rank = TRUE, p = 5) comm.chunk(16, type = "equal", lo.side = "left", all.rank = TRUE, p = 5) comm.chunk(16, p = 5, rank = 0) comm.chunk(16, p = 5, lo.side = "left", rank = 0) ## End(Not run)
The functions provide controls to communicators.
barrier(comm = .pbd_env$SPMD.CT$comm) comm.is.null(comm = .pbd_env$SPMD.CT$comm) comm.rank(comm = .pbd_env$SPMD.CT$comm) comm.localrank(comm = .pbd_env$SPMD.CT$comm) comm.size(comm = .pbd_env$SPMD.CT$comm) comm.dup(comm, newcomm) comm.free(comm = .pbd_env$SPMD.CT$comm) init(set.seed = TRUE) finalize(mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize) is.finalized() comm.abort(errorcode = 1, comm = .pbd_env$SPMD.CT$comm) comm.split(comm = .pbd_env$SPMD.CT$comm, color = 0L, key = 0L, newcomm = .pbd_env$SPMD.CT$newcomm) comm.disconnect(comm = .pbd_env$SPMD.CT$comm) comm.connect(port.name, info = .pbd_env$SPMD.CT$info, rank.root = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, newcomm = .pbd_env$SPMD.CT$newcomm) comm.accept(port.name, info = .pbd_env$SPMD.CT$info, rank.root = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, newcomm = .pbd_env$SPMD.CT$newcomm) port.open(info = .pbd_env$SPMD.CT$info) port.close(port.name) serv.publish(port.name, serv.name = .pbd_env$SPMD.CT$serv.name, info = .pbd_env$SPMD.CT$info) serv.unpublish(port.name, serv.name = .pbd_env$SPMD.CT$serv.name, info = .pbd_env$SPMD.CT$info) serv.lookup(serv.name = .pbd_env$SPMD.CT$serv.name, info = .pbd_env$SPMD.CT$info) intercomm.merge(intercomm = .pbd_env$SPMD.CT$intercomm, high = 0L, comm = .pbd_env$SPMD.CT$comm) intercomm.create(local.comm = .pbd_env$SPMD.CT$comm, local.leader = .pbd_env$SPMD.CT$rank.source, peer.comm = .pbd_env$SPMD.CT$intercomm, remote.leader = .pbd_env$SPMD.CT$rank.dest, tag = .pbd_env$SPMD.CT$tag, newintercomm = .pbd_env$SPMD.CT$newcomm) comm.c2f(comm = .pbd_env$SPMD.CT$comm)
barrier(comm = .pbd_env$SPMD.CT$comm) comm.is.null(comm = .pbd_env$SPMD.CT$comm) comm.rank(comm = .pbd_env$SPMD.CT$comm) comm.localrank(comm = .pbd_env$SPMD.CT$comm) comm.size(comm = .pbd_env$SPMD.CT$comm) comm.dup(comm, newcomm) comm.free(comm = .pbd_env$SPMD.CT$comm) init(set.seed = TRUE) finalize(mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize) is.finalized() comm.abort(errorcode = 1, comm = .pbd_env$SPMD.CT$comm) comm.split(comm = .pbd_env$SPMD.CT$comm, color = 0L, key = 0L, newcomm = .pbd_env$SPMD.CT$newcomm) comm.disconnect(comm = .pbd_env$SPMD.CT$comm) comm.connect(port.name, info = .pbd_env$SPMD.CT$info, rank.root = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, newcomm = .pbd_env$SPMD.CT$newcomm) comm.accept(port.name, info = .pbd_env$SPMD.CT$info, rank.root = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, newcomm = .pbd_env$SPMD.CT$newcomm) port.open(info = .pbd_env$SPMD.CT$info) port.close(port.name) serv.publish(port.name, serv.name = .pbd_env$SPMD.CT$serv.name, info = .pbd_env$SPMD.CT$info) serv.unpublish(port.name, serv.name = .pbd_env$SPMD.CT$serv.name, info = .pbd_env$SPMD.CT$info) serv.lookup(serv.name = .pbd_env$SPMD.CT$serv.name, info = .pbd_env$SPMD.CT$info) intercomm.merge(intercomm = .pbd_env$SPMD.CT$intercomm, high = 0L, comm = .pbd_env$SPMD.CT$comm) intercomm.create(local.comm = .pbd_env$SPMD.CT$comm, local.leader = .pbd_env$SPMD.CT$rank.source, peer.comm = .pbd_env$SPMD.CT$intercomm, remote.leader = .pbd_env$SPMD.CT$rank.dest, tag = .pbd_env$SPMD.CT$tag, newintercomm = .pbd_env$SPMD.CT$newcomm) comm.c2f(comm = .pbd_env$SPMD.CT$comm)
comm |
a communicator number. |
mpi.finalize |
if MPI should be shutdown. |
set.seed |
if a random seed preset. |
port.name |
a port name with default maximum length 1024 characters for OpenMPI. |
info |
a info number. |
rank.root |
a root rank. |
newcomm |
a new communicator number. |
color |
control of subset assignment. |
key |
control of rank assigment. |
serv.name |
a service name. |
errorcode |
an error code to abort MPI. |
intercomm |
a intercommunicator number. |
high |
used to order the groups within comm. |
local.comm |
a local communicator number. |
local.leader |
the leader number of local communicator. |
peer.comm |
a peer communicator number. |
remote.leader |
the remote leader number of peer communicator. |
newintercomm |
a new intercommunicator number. |
tag |
a tag number. |
Another functions are direct calls to MPI library.
barrier()
blocks all processors until everyone call this.
comm.is.null()
returns -1
if the array of communicators is not
allocated, i.e. init()
is not called yet. It returns 1
if the communicator is not initialized, i.e. NULL
. It
returns 0
if the communicator is initialized.
comm.rank()
returns the processor's rank for the given comm
.
comm.size()
returns the total processes for the given comm
.
comm.dup()
duplicate a newcomm
from comm
.
comm.free()
free a comm
.
init()
initializes a MPI world, and set two global variables
.comm.size
and .comm.rank
in .GlobalEnv
. A random seed
will be preset by default (Sys.getpid() + Sys.time()
) to the package
rlecuyer.
finalize()
frees memory and finishes a MPI world if
mpi.finalize = TRUE
.
is.finalized()
checks if MPI is already finalized.
comm.abort()
aborts MPI.
comm.split()
create a newcomm
by color and key.
comm.disconnect()
frees a comm
.
comm.connect()
connects a newcomm
.
comm.accept()
accepts a newcomm
.
port.open()
opens a port and returns the port name.
port.close()
closes a port by name.
serv.publish()
publishs a service via port.name
.
serv.unpublish()
unpublishs a service via port.name
.
serv.lookup()
lookup the serv.name
and returns the port name.
intercomm.merge()
merges the intercomm
to intracommunicator.
intercomm.create()
creates a new intercomm
from two
peer intracommunicators.
comm.c2f()
returns an integer for Fortran MPI support.
Most function return an invisible state of MPI call.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples . comm.print(.comm.size) comm.print(.comm.rank, all.rank = TRUE) comm.print(comm.rank(), rank.print = 1) comm.print(comm.c2f()) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples . comm.print(.comm.size) comm.print(.comm.rank, all.rank = TRUE) comm.print(comm.rank(), rank.print = 1) comm.print(comm.c2f()) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This method lets one rank gather objects from every rank in the same
communicator. The default return is a list of length equal to comm size
.
gather(x, x.buffer = NULL, x.count = NULL, displs = NULL, rank.dest = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, unlist = .pbd_env$SPMD.CT$unlist)
gather(x, x.buffer = NULL, x.count = NULL, displs = NULL, rank.dest = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, unlist = .pbd_env$SPMD.CT$unlist)
x |
an object to be gathered from all ranks. |
x.buffer |
a buffer to hold the return object which probably has
'size of |
x.count |
a vector of length ' |
displs |
|
rank.dest |
a rank of destination where all |
comm |
a communicator number. |
unlist |
apply |
The arguments x.buffer
, x.count
, and displs
can be
left unspecified or NULL
and are computed for you.
If x.buffer
is specified, its type should be one of integer, double,
or raw according to the type of x
. Serialization and unserialization
is avoided for atomic vectors if they are
all the same size and x.buffer
is specified, or if different sizes and
both x.buffer
and x.count
are specified. A single vector
instead of a list is returned in these cases.
Class array
objects are gathered without serialization.
Complex objects can be gathered as serialization and unserialization is used on objects that are not of class "array" or atomic vectors.
The gather
is efficient due to the underlying MPI parallel
communication and recursive doubling gathering algorithm that results in
a sublinear (log2(comm.size(comm))
) number of communication steps.
Also, serialization is applied only locally and in parallel.
See methods{"gather"}
for S4 dispatch cases and the source code for
further details.
Only rank.dest
(by default rank 0) receives the gathered object. All
other ranks receive NULL
.
See allgather()
for a description of the gathered object.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
allgather()
, allreduce()
, reduce()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank y <- gather(matrix(x, nrow = 1)) comm.print(y) y <- gather(x, double(N * .comm.size)) comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank y <- gather(matrix(x, nrow = 1)) comm.print(y) y <- gather(x, double(N * .comm.size)) comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
These functions are designed to get MPI and/or pbdMPI configures that were usually needed at the time of pbdMPI installation. In particular, to configure, link, and compile with 'libmpi*.so' or so.
get.conf(arg, arch = '', package = "pbdMPI", return = FALSE) get.lib(arg, arch, package = "pbdPROF") get.sysenv(flag)
get.conf(arg, arch = '', package = "pbdMPI", return = FALSE) get.lib(arg, arch, package = "pbdPROF") get.sysenv(flag)
arg |
an argument to be searched in the configuration file |
arch |
system architecture |
package |
pakge name |
return |
to return (or print if FALSE) the search results or not |
flag |
a system flag that is typically used in windows environment
|
get.conf()
and get.lib()
are typically used by 'pbd*/configure.ac',
'pbd*/src/Makevars.in', and/or 'pbd*/src/Makevar.win' to find
the default configurations from 'pbd*/etc${R_ARCH}/Makconf'.
get.sysenv()
is only called by 'pbdMPI/src/Makevars.win' to obtain
possible MPI dynamic/static library from the environment variable
'MPI_ROOT' preset by users.
Typically, there are no return values, but the values are cat()
to
scrrn or stdin.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: library(pbdMPI) if(Sys.info()["sysname"] != "Windows"){ get.conf("MPI_INCLUDE_PATH"); cat("\n") get.conf("MPI_LIBPATH"); cat("\n") get.conf("MPI_LIBNAME"); cat("\n") get.conf("MPI_LIBS"); cat("\n") } else{ get.conf("MPI_INCLUDE", "/i386"); cat("\n") get.conf("MPI_LIB", "/i386"); cat("\n") get.conf("MPI_INCLUDE", "/x64"); cat("\n") get.conf("MPI_LIB", "/x64"); cat("\n") } ## End(Not run)
## Not run: library(pbdMPI) if(Sys.info()["sysname"] != "Windows"){ get.conf("MPI_INCLUDE_PATH"); cat("\n") get.conf("MPI_LIBPATH"); cat("\n") get.conf("MPI_LIBNAME"); cat("\n") get.conf("MPI_LIBS"); cat("\n") } else{ get.conf("MPI_INCLUDE", "/i386"); cat("\n") get.conf("MPI_LIB", "/i386"); cat("\n") get.conf("MPI_INCLUDE", "/x64"); cat("\n") get.conf("MPI_LIB", "/x64"); cat("\n") } ## End(Not run)
This function obtains job id which can be used to divide jobs.
get.jid(n, method = .pbd_env$SPMD.CT$divide.method[1], all = FALSE, comm = .pbd_env$SPMD.CT$comm, reduced = FALSE)
get.jid(n, method = .pbd_env$SPMD.CT$divide.method[1], all = FALSE, comm = .pbd_env$SPMD.CT$comm, reduced = FALSE)
n |
total number of jobs. |
method |
a way to divide jobs. |
all |
indicate if return all id for each processor. |
comm |
a communicator number. |
reduced |
indicate if return should be a reduced representation. |
n
is total number of jobs needed to be divided into all processors
(comm.size(comm)
, i.e. 1:n
will be split according to
the rank of processor (comm.rank(comm)
) and method
.
Job id will be returned. Currently, three possible methods are provided.
"block"
will use return id's which are nearly equal size blocks.
For example,
7 jobs in 4 processors will have jid=1
for rank 0, jid=2,3
for rank 1, jid=4,5
for rank 2, and jid=6,7
for rank 3.
"block0"
will use return id's which are nearly equal size blocks,
in the opposite direction of "block"
.
For example,
7 jobs in 4 processors will have jid=1,2
for rank 0, jid=3,4
for rank 1, jid=5,6
for rank 2, and jid=7
for rank 3.
"cycle"
will use return id's which are nearly equal size in cycle.
For example, 7 jobs in 4 processors will have jid=1,5
for rank 0,
jid=2,6
for rank 1, jid=3,7
for rank 2, and jid=4
for rank 3.
get.jid()
returns a vector containing job id for each individual
processor if all = FALSE
. While it returns a list containing all
job id for all processor if all = TRUE
. The list has length equal
to comm.size
.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
task.pull() and comm.chunk()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. comm.cat(\">>> block\n\", quiet = TRUE) jid <- get.jid(7, method = \"block\") comm.print(jid, all.rank = TRUE) comm.cat(\">>> cycle\n\", quiet = TRUE) jid <- get.jid(7, method = \"cycle\") comm.print(jid, all.rank = TRUE) comm.cat(\">>> block (all)\n\", quiet = TRUE) alljid <- get.jid(7, method = \"block\", all = TRUE) comm.print(alljid) comm.cat(\">>> cycle (all)\n\", quiet = TRUE) alljid <- get.jid(7, method = \"cycle\", all = TRUE) comm.print(alljid) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. comm.cat(\">>> block\n\", quiet = TRUE) jid <- get.jid(7, method = \"block\") comm.print(jid, all.rank = TRUE) comm.cat(\">>> cycle\n\", quiet = TRUE) jid <- get.jid(7, method = \"cycle\") comm.print(jid, all.rank = TRUE) comm.cat(\">>> block (all)\n\", quiet = TRUE) alljid <- get.jid(7, method = \"block\", all = TRUE) comm.print(alljid) comm.cat(\">>> cycle (all)\n\", quiet = TRUE) alljid <- get.jid(7, method = \"cycle\", all = TRUE) comm.print(alljid) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
This function provide global all pairs.
comm.allpairs(N, diag = FALSE, symmetric = TRUE, comm = .pbd_env$SPMD.CT$comm)
comm.allpairs(N, diag = FALSE, symmetric = TRUE, comm = .pbd_env$SPMD.CT$comm)
N |
number of elements for matching, |
diag |
if matching the same elements, |
symmetric |
if matching upper triangular elements. TRUE for
|
comm |
a communicator number. |
The function generates all combinations of N
elements.
The function returns a gbd matrix in row blocks with 2 columns
named i
and j
. The number of rows is dependent
on the options diag
and symmetric
. If diag = TRUE
and
symmetric = FALSE
, then this case has the maximum number of rows,
N^2
.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.dist()
.
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. id.matrix <- comm.allpairs(comm.size() + 1) comm.print(id.matrix, all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. id.matrix <- comm.allpairs(comm.size() + 1) comm.print(id.matrix, all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions are global any and all applying on distributed data for all ranks.
comm.any(x, na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.all(x, na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.allcommon(x, comm = .pbd_env$SPMD.CT$comm, lazy.check = .pbd_env$SPMD.CT$lazy.check)
comm.any(x, na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.all(x, na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.allcommon(x, comm = .pbd_env$SPMD.CT$comm, lazy.check = .pbd_env$SPMD.CT$lazy.check)
x |
a vector. |
na.rm |
if |
comm |
a communicator number. |
lazy.check |
if TRUE, then |
These functions will apply any()
and all()
locally, and
apply allgather()
to get all local results from other ranks, then
apply any()
and all()
on all local results.
comm.allcommon()
is to check if x
is exactly the same
across all ranks. This is a vectorized operation on x
where the
input and output have the same length of vector, while comm.any()
and comm.all()
return a scaler.
Note that lazy.check = TRUE
is faster as
number of cores is large, but it may cause some inconsistence in some
cases. lazy.check = FALSE
is much slower, but it provides more
accurate checking.
The global check values (TRUE, FALSE, NA) are returned to all ranks.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. if(comm.rank() == 0){ a <- c(T, F, NA) } else{ a <- T } comm.any(a) comm.all(a) comm.any(a, na.rm = TRUE) comm.all(a, na.rm = TRUE) comm.allcommon(1:3) if(comm.rank() == 0){ a <- 1:3 } else{ a <- 3:1 } comm.allcommon.integer(a) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. if(comm.rank() == 0){ a <- c(T, F, NA) } else{ a <- T } comm.any(a) comm.all(a) comm.any(a, na.rm = TRUE) comm.all(a, na.rm = TRUE) comm.allcommon(1:3) if(comm.rank() == 0){ a <- 1:3 } else{ a <- 3:1 } comm.allcommon.integer(a) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This function redistributes a regular matrix existed in rank.soure and turns it in a gbd matrix in row blocks.
comm.as.gbd(X, balance.method = .pbd_env$SPMD.IO$balance.method, rank.source = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
comm.as.gbd(X, balance.method = .pbd_env$SPMD.IO$balance.method, rank.source = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
X |
a regular |
balance.method |
a balance method. |
rank.source |
a rank of source where elements of |
comm |
a communicator number. |
X
matrix in rank.source
will be redistributed as a gbd
matrix in row blocks.
This function will first set NULL
to X
if it is not
located in rank.source
, then called comm.load.balance()
to redistributed the one located in rank.source
to all other ranks.
A X.gbd
will be returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.load.balance()
,
comm.read.table()
and
comm.write.table()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. X <- matrix(1:15, ncol = 3) X.gbd <- comm.as.gbd(X) comm.print(X.gbd, all.rank = TRUE) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. X <- matrix(1:15, ncol = 3) X.gbd <- comm.as.gbd(X) comm.print(X.gbd, all.rank = TRUE) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
These functions are global balance methods for gbd data.frame
(or
matrix
) distributed in row blocks.
comm.balance.info(X.gbd, balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.load.balance(X.gbd, bal.info = NULL, balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.unload.balance(new.X.gbd, bal.info, comm = .pbd_env$SPMD.CT$comm)
comm.balance.info(X.gbd, balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.load.balance(X.gbd, bal.info = NULL, balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.unload.balance(new.X.gbd, bal.info, comm = .pbd_env$SPMD.CT$comm)
X.gbd |
a gbd |
balance.method |
a balance method. |
bal.info |
a balance information returned from
|
new.X.gbd |
a new gbd of |
comm |
a communicator number. |
A typical use is to balance an input dataset X.gbd
from
comm.read.table()
. Since by default, a two dimension
data.frame
is distributed in row blocks, but each processor (rank)
may not (or closely) have the same number of rows.
These functions redistribute the data.frame
(and maybe
matrix
) according to the specified way in bal.info
.
Currently, there are three balance methods are supported, block
(uniform distributed but favor higher ranks), block0
(as block
but favor lower ranks), and block.cyclic
(as block cyclic with one big block in one cycle).
comm.balance.info()
returns a list
containing
balance information based on the input X.gbd
and balance.method
.
comm.load.balance()
returns a new gbd data.frame
(or
matrix
).
comm.unload.balance()
also returns the new gbd data.frame
back
to the original X.gbd
.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.read.table()
,
comm.write.table()
, and
comm.as.gbd()
.
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Get two gbd row-block data.frame. da.block <- iris[get.jid(nrow(iris), method = \"block\"),] da.block0 <- iris[get.jid(nrow(iris), method = \"block0\"),] ### Load balance one and unload it. bal.info <- comm.balance.info(da.block0) da.new <- comm.load.balance(da.block0) da.org <- comm.unload.balance(da.new, bal.info) ### Check if all are equal. comm.print(c(sum(da.new != da.block), sum(da.org != da.block0)), all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Get two gbd row-block data.frame. da.block <- iris[get.jid(nrow(iris), method = \"block\"),] da.block0 <- iris[get.jid(nrow(iris), method = \"block0\"),] ### Load balance one and unload it. bal.info <- comm.balance.info(da.block0) da.new <- comm.load.balance(da.block0) da.org <- comm.unload.balance(da.new, bal.info) ### Check if all are equal. comm.print(c(sum(da.new != da.block), sum(da.org != da.block0)), all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions are global base functions applying on distributed data for all ranks.
comm.length(x, comm = .pbd_env$SPMD.CT$comm) comm.sum(..., na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.mean(x, na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.var(x, na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.sd(x, na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm)
comm.length(x, comm = .pbd_env$SPMD.CT$comm) comm.sum(..., na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.mean(x, na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.var(x, na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.sd(x, na.rm = TRUE, comm = .pbd_env$SPMD.CT$comm)
x |
a vector. |
... |
as in |
na.rm |
logical, if remove |
comm |
a communicator number. |
These functions will apply globally length()
, sum()
,
mean()
, var()
, and sd()
.
The global values are returned to all ranks.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. a <- 1:(comm.rank() + 1) b <- comm.length(a) comm.print(b) b <- comm.sum(a) comm.print(b) b <- comm.mean(a) comm.print(b) b <- comm.var(a) comm.print(b) b <- comm.sd(a) comm.print(b) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. a <- 1:(comm.rank() + 1) b <- comm.length(a) comm.print(b) b <- comm.sum(a) comm.print(b) b <- comm.mean(a) comm.print(b) b <- comm.var(a) comm.print(b) b <- comm.sd(a) comm.print(b) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions globally compute distance for all ranks.
comm.dist(X.gbd, method = "euclidean", diag = FALSE, upper = FALSE, p = 2, comm = .pbd_env$SPMD.CT$comm, return.type = c("common", "gbd"))
comm.dist(X.gbd, method = "euclidean", diag = FALSE, upper = FALSE, p = 2, comm = .pbd_env$SPMD.CT$comm, return.type = c("common", "gbd"))
X.gbd |
a gbd matrix. |
method |
as in |
diag |
as in |
upper |
as in |
p |
as in |
comm |
a communicator number. |
return.type |
returning type for the distance. |
The distance function is implemented for a distributed matrix.
The return type common
is only useful when the number
of rows of the matrix is small since the returning matrix is N * N
for every rank where N
is the total number of rows of X.gbd
of all ranks.
The return type gbd
returns a gbd matrix (distributed across
all ranks, and the gbd matrix has 3 columns, named "i", "j", and "value",
where (i, j)
is the global indices of the
i-th and j-th rows of X.gbd
, and value
is the corresponding
distance. The (i, j)
is ordered as a distance matrix.
A full distance matrix is returned from the common
return type.
Suppose N.gbd
is total rows of X.gbd
, then
the distance will have N.gbd * (N.gbd - 1) / 2
elements
and the distance matrix will have N.gbd^2
elements.
A gbd distance matrix with 3 columns is returned from the
gbd
return type.
The distance or distance matrix could be huge.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.allpairs()
and
comm.pairwise()
.
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. comm.set.seed(123456, diff = TRUE) X.gbd <- matrix(runif(6), ncol = 3) dist.X.common <- comm.dist(X.gbd) dist.X.gbd <- comm.dist(X.gbd, return.type = \"gbd\") ### Verify. dist.X <- dist(do.call(\"rbind\", allgather(X.gbd))) comm.print(all(dist.X == dist.X.common)) ### Verify 2. dist.X.df <- do.call(\"rbind\", allgather(dist.X.gbd)) comm.print(all(dist.X == dist.X.df[, 3])) comm.print(dist.X) comm.print(dist.X.df) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. comm.set.seed(123456, diff = TRUE) X.gbd <- matrix(runif(6), ncol = 3) dist.X.common <- comm.dist(X.gbd) dist.X.gbd <- comm.dist(X.gbd, return.type = \"gbd\") ### Verify. dist.X <- dist(do.call(\"rbind\", allgather(X.gbd))) comm.print(all(dist.X == dist.X.common)) ### Verify 2. dist.X.df <- do.call(\"rbind\", allgather(dist.X.gbd)) comm.print(all(dist.X == dist.X.df[, 3])) comm.print(dist.X) comm.print(dist.X.df) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
A binding for match.arg()
that uses comm.stop()
rather
so that the error message (if there is one) is managed according
to the rules of .pbd_env$SPMD.CT
.
comm.match.arg(arg, choices, several.ok=FALSE, ..., all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize, quit = .pbd_env$SPMD.CT$quit)
comm.match.arg(arg, choices, several.ok=FALSE, ..., all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize, quit = .pbd_env$SPMD.CT$quit)
arg , choices , several.ok
|
see match.arg() |
... |
ignored. |
all.rank |
if all ranks print (default = FALSE). |
rank.print |
rank for printing if not all ranks print (default = 0). |
comm |
communicator for printing (default = 1). |
mpi.finalize |
if MPI should be shutdown. |
quit |
if quit R when errors happen. |
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
This function provides global pairwise evaluations.
comm.pairwise(X, pairid.gbd = NULL, FUN = function(x, y, ...){ return(as.vector(dist(rbind(x, y), ...))) }, ..., diag = FALSE, symmetric = TRUE, comm = .pbd_env$SPMD.CT$comm)
comm.pairwise(X, pairid.gbd = NULL, FUN = function(x, y, ...){ return(as.vector(dist(rbind(x, y), ...))) }, ..., diag = FALSE, symmetric = TRUE, comm = .pbd_env$SPMD.CT$comm)
X |
a common matrix across ranks, or a gbd matrix. (See details.) |
pairid.gbd |
a pair-wise id in a gbd format. (See details.) |
FUN |
a function to be evaluated for given pairs. |
... |
extra variables for |
diag |
if matching the same elements, |
symmetric |
if matching upper triangular elements. TRUE for
|
comm |
a communicator number. |
This function evaluates the objective function
FUN(X[i,], X[j, ])
(usually distance of two elements)
on any given pair (i, j)
of a matrix X
.
The input X
should be in common across all ranks if pairid.gbd
is provided, e.g. from comm.pairwise()
.
i.e. X
is exactly the same in every ranks, but
pairid.gbd
is different and in gbd format indicating the row pair
(i, j)
should be evaluated. The returning gbd matrix is ordered
and indexed by pairid.gbd
.
Note that checking consistence of X
across all ranks is not
implemented within this function since that drops performance and
may be not accurate.
The input X
should be a gbd format in row major blocks
(i.e. X.gbd
) if pairid.gbd
is NULL
. A internal
pair indices will be built implicitly for evaluation. The returning
gbd matrix is ordered and indexed by X.gbd
.
This function returns a common matrix with 3 columns
named i
, j
, and value
. Each value
is the
returned value and computed by FUN(X[i,], X[j,])
where
(i, j)
is the global index as ordered in a distance matrix
for i-th row and j-th columns.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.pairwise()
, and
comm.dist()
.
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. comm.set.seed(123456, diff = FALSE) X <- matrix(rnorm(10), ncol = 2) id.matrix <- comm.allpairs(nrow(X)) ### Method original. dist.org <- dist(X) ### Method 1. dist.common <- comm.pairwise(X, pairid.gbd = id.matrix) ### Method 2. # if(comm.rank() != 0){ # X <- matrix(0, nrow = 0, ncol = 4) # } X.gbd <- comm.as.gbd(X) ### The other way. dist.gbd <- comm.pairwise(X.gbd) ### Verify. d.org <- as.vector(dist.org) d.1 <- do.call(\"c\", allgather(dist.common[, 3])) d.2 <- do.call(\"c\", allgather(dist.gbd[, 3])) comm.print(all(d.org == d.1)) comm.print(all(d.org == d.2)) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. comm.set.seed(123456, diff = FALSE) X <- matrix(rnorm(10), ncol = 2) id.matrix <- comm.allpairs(nrow(X)) ### Method original. dist.org <- dist(X) ### Method 1. dist.common <- comm.pairwise(X, pairid.gbd = id.matrix) ### Method 2. # if(comm.rank() != 0){ # X <- matrix(0, nrow = 0, ncol = 4) # } X.gbd <- comm.as.gbd(X) ### The other way. dist.gbd <- comm.pairwise(X.gbd) ### Verify. d.org <- as.vector(dist.org) d.1 <- do.call(\"c\", allgather(dist.common[, 3])) d.2 <- do.call(\"c\", allgather(dist.gbd[, 3])) comm.print(all(d.org == d.1)) comm.print(all(d.org == d.2)) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
The functions globally print or cat a variable from specified processors, by default messages is shown on screen.
comm.print(x, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, quiet = .pbd_env$SPMD.CT$print.quiet, flush = .pbd_env$SPMD.CT$msg.flush, barrier = .pbd_env$SPMD.CT$msg.barrier, con = stdout(), ...) comm.cat(..., all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, quiet = .pbd_env$SPMD.CT$print.quiet, sep = " ", fill = FALSE, labels = NULL, append = FALSE, flush = .pbd_env$SPMD.CT$msg.flush, barrier = .pbd_env$SPMD.CT$msg.barrier, con = stdout())
comm.print(x, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, quiet = .pbd_env$SPMD.CT$print.quiet, flush = .pbd_env$SPMD.CT$msg.flush, barrier = .pbd_env$SPMD.CT$msg.barrier, con = stdout(), ...) comm.cat(..., all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, quiet = .pbd_env$SPMD.CT$print.quiet, sep = " ", fill = FALSE, labels = NULL, append = FALSE, flush = .pbd_env$SPMD.CT$msg.flush, barrier = .pbd_env$SPMD.CT$msg.barrier, con = stdout())
x |
a variable to be printed. |
... |
variables to be cat. |
all.rank |
if all ranks print (default = FALSE). |
rank.print |
rank for printing if not all ranks print (default = 0). |
comm |
communicator for printing (default = 1). |
quiet |
FALSE for printing rank number. |
sep |
sep argument as in the |
fill |
fill argument as in the |
labels |
labels argument as in the |
append |
labels argument as in the |
flush |
if flush |
barrier |
if barrier |
con |
|
Warning: These two functions use barrier()
to make sure
the well printing process on screen, so should be called by all processors
to avoid a deadlock. A typical misuse is called inside a condition check,
such as if(.comm.rank == 0) comm.cat(...)
.
rank.print
can be a integer vector containing the ranks of
processors which print messages.
A print()
or cat()
is called for the specified processors
and the messages of the input variables is shown on screen by default.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Example. comm.print(comm.rank(), rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Example. comm.print(comm.rank(), rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions are global range, max and min applying on distributed data for all ranks.
comm.range(..., na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.max(..., na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.min(..., na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm)
comm.range(..., na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.max(..., na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm) comm.min(..., na.rm = FALSE, comm = .pbd_env$SPMD.CT$comm)
... |
an 'numeric' objects. |
na.rm |
if |
comm |
a communicator number. |
These functions will apply range()
, max()
and min()
locally, and apply allgather to get all local results from other ranks,
then apply range()
, max()
and min()
on all local
results.
The global values (range, max, or min) are returned to all ranks.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. a <- 1:(comm.rank() + 1) b <- comm.range(a) comm.print(b) b <- comm.max(a) comm.print(b) b <- comm.min(a) comm.print(b) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. a <- 1:(comm.rank() + 1) b <- comm.range(a) comm.print(b) b <- comm.max(a) comm.print(b) b <- comm.min(a) comm.print(b) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions are global reading from specified file.
comm.read.table(file, header = FALSE, sep = "", quote = "\"'", dec = ".", na.strings = "NA", colClasses = NA, nrows = -1, skip = 0, check.names = TRUE, fill = !blank.lines.skip, strip.white = FALSE, blank.lines.skip = TRUE, comment.char = "#", allowEscapes = FALSE, flush = FALSE, fileEncoding = "", encoding = "unknown", read.method = .pbd_env$SPMD.IO$read.method[1], balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.read.csv(file, header = TRUE, sep = ",", quote = "\"", dec = ".", fill = TRUE, comment.char = "", ..., read.method = .pbd_env$SPMD.IO$read.method[1], balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.read.csv2(file, header = TRUE, sep = ";", quote = "\"", dec = ",", fill = TRUE, comment.char = "", ..., read.method = .pbd_env$SPMD.IO$read.method[1], balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm)
comm.read.table(file, header = FALSE, sep = "", quote = "\"'", dec = ".", na.strings = "NA", colClasses = NA, nrows = -1, skip = 0, check.names = TRUE, fill = !blank.lines.skip, strip.white = FALSE, blank.lines.skip = TRUE, comment.char = "#", allowEscapes = FALSE, flush = FALSE, fileEncoding = "", encoding = "unknown", read.method = .pbd_env$SPMD.IO$read.method[1], balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.read.csv(file, header = TRUE, sep = ",", quote = "\"", dec = ".", fill = TRUE, comment.char = "", ..., read.method = .pbd_env$SPMD.IO$read.method[1], balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm) comm.read.csv2(file, header = TRUE, sep = ";", quote = "\"", dec = ",", fill = TRUE, comment.char = "", ..., read.method = .pbd_env$SPMD.IO$read.method[1], balance.method = .pbd_env$SPMD.IO$balance.method[1], comm = .pbd_env$SPMD.CT$comm)
file |
as in |
header |
as in |
sep |
as in |
quote |
as in |
dec |
as in |
na.strings |
as in |
colClasses |
as in |
nrows |
as in |
skip |
as in |
check.names |
as in |
fill |
as in |
strip.white |
as in |
blank.lines.skip |
as in |
comment.char |
as in |
allowEscapes |
as in |
flush |
as in |
fileEncoding |
as in |
encoding |
as in |
... |
as in |
read.method |
either "gbd" or "common". |
balance.method |
balance method for |
comm |
a communicator number. |
These functions will apply read.table()
locally and sequentially
from rank 0, 1, 2, ...
By default, rank 0 reads the file only, then scatter to other ranks for
small datasets (.pbd_env$SPMD.IO$max.read.size
) in
read.method = "gbd"
.
(bcast to others in read.method = "common"
.)
As dataset size increases, the reading is performed from each ranks and read portion of rows in "gbd" format as described in pbdDEMO vignettes and used in pmclust.
comm.load.balance()
is called for "gbd" method as
as nrows = -1
and skip = 0
are set. Note that the default
method "block" is the better way for performance in general that distributes
equally and leaves residuals on higher ranks evenly.
"block0" is the other way around. "block.cyclic" is only useful for
converting to ddmatrix
as in pbdDMAT.
A distributed data.frame is returned.
All factors are disable and read as characters or as what data should be.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.load.balance()
and
comm.write.table()
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Check. if(comm.size() != 2){ comm.stop(\"2 processors are requried.\") } ### Manually distributed iris. da <- iris[get.jid(nrow(iris)),] ### Dump data. comm.write.table(da, file = \"iris.txt\", quote = FALSE, sep = \"\\t\", row.names = FALSE) ### Read back in. da.gbd <- comm.read.table(\"iris.txt\", header = TRUE, sep = \"\\t\", quote = \"\") comm.print(c(nrow(da), nrow(da.gbd)), all.rank = TRUE) ### Read in common. da.common <- comm.read.table(\"iris.txt\", header = TRUE, sep = \"\\t\", quote = \"\", read.method = \"common\") comm.print(c(nrow(da.common), sum(da.common != iris))) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Check. if(comm.size() != 2){ comm.stop(\"2 processors are requried.\") } ### Manually distributed iris. da <- iris[get.jid(nrow(iris)),] ### Dump data. comm.write.table(da, file = \"iris.txt\", quote = FALSE, sep = \"\\t\", row.names = FALSE) ### Read back in. da.gbd <- comm.read.table(\"iris.txt\", header = TRUE, sep = \"\\t\", quote = \"\") comm.print(c(nrow(da), nrow(da.gbd)), all.rank = TRUE) ### Read in common. da.common <- comm.read.table(\"iris.txt\", header = TRUE, sep = \"\\t\", quote = \"\", read.method = \"common\") comm.print(c(nrow(da.common), sum(da.common != iris))) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
A Rprof function for use with parallel codes executed in the batch SPMD style.
comm.Rprof(filename = "Rprof.out", append = FALSE, interval = 0.02, memory.profiling = FALSE, gc.profiling = FALSE, line.profiling = FALSE, numfiles = 100L, bufsize = 10000L, all.rank = .pbd_env$SPMD.CT$Rprof.all.rank, rank.Rprof = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
comm.Rprof(filename = "Rprof.out", append = FALSE, interval = 0.02, memory.profiling = FALSE, gc.profiling = FALSE, line.profiling = FALSE, numfiles = 100L, bufsize = 10000L, all.rank = .pbd_env$SPMD.CT$Rprof.all.rank, rank.Rprof = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
filename |
as in |
append |
as in |
interval |
as in |
memory.profiling |
as in |
gc.profiling |
as in |
line.profiling |
as in |
numfiles |
as in |
bufsize |
as in |
all.rank |
if calling Rprof on all ranks (default = FALSE). |
rank.Rprof |
rank for calling Rprof if
|
comm |
a communicator number. |
as in Rprof()
.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
This function globally sorts distributed data for all ranks.
comm.sort(x, decreasing = FALSE, na.last = NA, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
comm.sort(x, decreasing = FALSE, na.last = NA, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
x |
a vector. |
decreasing |
logical. Should the sort order be increasing or decreasing? |
na.last |
for controlling the treatment of |
comm |
a communicator number. |
status |
a status number. |
The distributed quick sort is implemented for this functions.
The returns are the same size of x
but in global sorting order.
All ranks may not have a NULL x
.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. comm.set.seed(123456, diff = TRUE) x <- c(rnorm(5 + .comm.rank * 2), NA) # x <- sample(1:5, 5 + .comm.rank * 2, replace = TRUE) comm.end.seed() if(.comm.rank == 1){ x <- NULL ### Test for NULL or 0 vector } y <- allgather(x) comm.print(y) y <- comm.sort(x) y <- allgather(y) comm.print(y) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. comm.set.seed(123456, diff = TRUE) x <- c(rnorm(5 + .comm.rank * 2), NA) # x <- sample(1:5, 5 + .comm.rank * 2, replace = TRUE) comm.end.seed() if(.comm.rank == 1){ x <- NULL ### Test for NULL or 0 vector } y <- allgather(x) comm.print(y) y <- comm.sort(x) y <- allgather(y) comm.print(y) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions are global stop and warning applying on distributed data for all ranks, and are called by experts only. These functions may lead to potential performance degradation and system termination.
comm.stop(..., call. = TRUE, domain = NULL, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize, quit = .pbd_env$SPMD.CT$quit) comm.warning(..., call. = TRUE, immediate. = FALSE, domain = NULL, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm) comm.warnings(..., all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm) comm.stopifnot(..., call. = TRUE, domain = NULL, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize, quit = .pbd_env$SPMD.CT$quit)
comm.stop(..., call. = TRUE, domain = NULL, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize, quit = .pbd_env$SPMD.CT$quit) comm.warning(..., call. = TRUE, immediate. = FALSE, domain = NULL, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm) comm.warnings(..., all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm) comm.stopifnot(..., call. = TRUE, domain = NULL, all.rank = .pbd_env$SPMD.CT$print.all.rank, rank.print = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm, mpi.finalize = .pbd_env$SPMD.CT$mpi.finalize, quit = .pbd_env$SPMD.CT$quit)
... |
variables to be cat. |
call. |
see stop() and warnings(). |
immediate. |
see stop() and warnings(). |
domain |
see stop() and warnings(). |
all.rank |
if all ranks print (default = FALSE). |
rank.print |
rank for printing if not all ranks print (default = 0). |
comm |
communicator for printing (default = 1). |
mpi.finalize |
if MPI should be shutdown. |
quit |
if quit R when errors happen. |
These functions will respectively apply stop()
, warning()
,
warnings()
, and stopifnot()
locally.
comm.stop()
and comm.stopifnot()
terminate all ranks,
comm.warning()
returns messages, and comm.warnings()
print the message.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. comm.warning(\"test warning.\n\") comm.warnings() comm.stop(\"test stop.\n\") comm.stopifnot(1 == 2) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. comm.warning(\"test warning.\n\") comm.warnings() comm.stop(\"test stop.\n\") comm.stopifnot(1 == 2) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
A timing function for use with parallel codes executed in the batch SPMD style.
comm.timer(timed, comm = .pbd_env$SPMD.CT$comm)
comm.timer(timed, comm = .pbd_env$SPMD.CT$comm)
timed |
expression to be timed. |
comm |
a communicator number. |
Finds the min, mean, and max execution time across all independent
processes executing the operation timed
.
Drew Schmidt.
Programming with Big Data in R Website: https://pbdr.org/
These functions are global which, which.max and which.min applying on distributed data for all ranks.
comm.which(x, arr.ind = FALSE, useNames = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.which.max(x, comm = .pbd_env$SPMD.CT$comm) comm.which.min(x, comm = .pbd_env$SPMD.CT$comm)
comm.which(x, arr.ind = FALSE, useNames = TRUE, comm = .pbd_env$SPMD.CT$comm) comm.which.max(x, comm = .pbd_env$SPMD.CT$comm) comm.which.min(x, comm = .pbd_env$SPMD.CT$comm)
x |
a 'logical' vector or array as in |
arr.ind |
logical, as in |
useNames |
logical, as in |
comm |
a communicator number. |
These functions will apply which()
, which.max()
and
which.min()
locally, and apply allgather()
to get all local
results from other ranks.
The global values (which()
, which.max()
, or which.min()
)
are returned to all ranks.
comm.which()
returns with two columns, 'rank id' and 'index of
TRUE
'.
comm.which.max()
and comm.which.min()
return with three
values, 'the _smallest_ rank id', 'index of the _first_ maximum or minimum',
and 'max/min value of x
'.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. a <- 1:(comm.rank() + 1) b <- comm.which(a == 2) comm.print(b) b <- comm.which.max(a) comm.print(b) b <- comm.which.min(a) comm.print(b) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. a <- 1:(comm.rank() + 1) b <- comm.which(a == 2) comm.print(b) b <- comm.which.max(a) comm.print(b) b <- comm.which.min(a) comm.print(b) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These functions are global writing applying on distributed data for all ranks.
comm.write(x, file = "data", ncolumns = if(is.character(x)) 1 else 5, append = FALSE, sep = " ", comm = .pbd_env$SPMD.CT$comm) comm.write.table(x, file = "", append = FALSE, quote = TRUE, sep = " ", eol = "\n", na = "NA", dec = ".", row.names = TRUE, col.names = TRUE, qmethod = c("escape", "double"), fileEncoding = "", comm = .pbd_env$SPMD.CT$comm) comm.write.csv(..., comm = .pbd_env$SPMD.CT$comm) comm.write.csv2(..., comm = .pbd_env$SPMD.CT$comm)
comm.write(x, file = "data", ncolumns = if(is.character(x)) 1 else 5, append = FALSE, sep = " ", comm = .pbd_env$SPMD.CT$comm) comm.write.table(x, file = "", append = FALSE, quote = TRUE, sep = " ", eol = "\n", na = "NA", dec = ".", row.names = TRUE, col.names = TRUE, qmethod = c("escape", "double"), fileEncoding = "", comm = .pbd_env$SPMD.CT$comm) comm.write.csv(..., comm = .pbd_env$SPMD.CT$comm) comm.write.csv2(..., comm = .pbd_env$SPMD.CT$comm)
x |
as in |
file |
as in |
ncolumns |
as in |
append |
as in |
sep |
as in |
quote |
as in |
eol |
as in |
na |
as in |
dec |
as in |
row.names |
as in |
col.names |
as in |
qmethod |
as in |
fileEncoding |
as in |
... |
as in |
comm |
a communicator number. |
These functions will apply write*()
locally and sequentially
from rank 0, 1, 2, ...
By default, rank 0 makes the file, and rest of ranks append the data.
A file will be returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
comm.load.balance()
and
comm.read.table()
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. comm.write((1:5) + comm.rank(), file = \"test.txt\") ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) if(comm.size() != 2){ comm.cat(\"2 processors are requried.\n\", quiet = TRUE) finalize() } ### Examples. comm.write((1:5) + comm.rank(), file = \"test.txt\") ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
The functions call MPI info functions.
info.create(info = .pbd_env$SPMD.CT$info) info.set(info = .pbd_env$SPMD.CT$info, key, value) info.free(info = .pbd_env$SPMD.CT$info) info.c2f(info = .pbd_env$SPMD.CT$info)
info.create(info = .pbd_env$SPMD.CT$info) info.set(info = .pbd_env$SPMD.CT$info, key, value) info.free(info = .pbd_env$SPMD.CT$info) info.c2f(info = .pbd_env$SPMD.CT$info)
info |
a info number. |
key |
a character string to be set. |
value |
a character string to be set associate with |
These functions are for internal functions. Potentially, they set information for initialization of manager and workers.
An invisible state of MPI call is returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. info.create(0L) info.set(0L, \"file\", \"appschema\") info.free(0L) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. info.create(0L) info.set(0L, \"file\", \"appschema\") info.free(0L) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This method lets a rank receive (nonblocking) an object from the other rank in the same communicator. The default return is the object sent from the other rank.
irecv(x.buffer = NULL, rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, request = .pbd_env$SPMD.CT$request, status = .pbd_env$SPMD.CT$status)
irecv(x.buffer = NULL, rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, request = .pbd_env$SPMD.CT$request, status = .pbd_env$SPMD.CT$status)
x.buffer |
a buffer to store |
rank.source |
a source rank where |
tag |
a tag number. |
comm |
a communicator number. |
request |
a request number. |
status |
a status number. |
A corresponding send()
/isend()
should be evoked at the
corresponding rank rank.source
.
Warning: irecv()
is not safe for R since R is not a thread
safe package that a dynamic returning object requires certain blocking or
barrier at some where. Current, the default method is equivalent to the
default method of recv()
.
An object is returned by default.
For calling spmd.irecv.*()
:
signature(x = "ANY")
signature(x = "integer")
signature(x = "numeric")
signature(x = "raw")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- send(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- irecv() } comm.print(y, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- send(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- irecv() } comm.print(y, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
The functions check MPI_COMM_NULL.
is.comm.null(comm = .pbd_env$SPMD.CT$comm)
is.comm.null(comm = .pbd_env$SPMD.CT$comm)
comm |
a comm number. |
These functions are for internal uses.
TRUE if input comm is MPI_COMM_NULL, otherwise FALSE.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. is.comm.null(0L) is.comm.null(1L) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. is.comm.null(0L) is.comm.null(1L) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This method lets a rank send (nonblocking) a object to the other
rank in the same communicator. The default return is NULL
.
isend(x, rank.dest = .pbd_env$SPMD.CT$rank.dest, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, request = .pbd_env$SPMD.CT$request, check.type = .pbd_env$SPMD.CT$check.type)
isend(x, rank.dest = .pbd_env$SPMD.CT$rank.dest, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, request = .pbd_env$SPMD.CT$request, check.type = .pbd_env$SPMD.CT$check.type)
x |
an object to be sent from a rank. |
rank.dest |
a rank of destination where |
tag |
a tag number. |
comm |
a communicator number. |
request |
a request number. |
check.type |
if checking data type first for handshaking. |
A corresponding recv()
or irecv()
should be evoked at the
corresponding rank rank.dest
.
See details of send()
for the arugments check.type
.
A NULL
is returned by default.
For calling spmd.isend.*()
:
signature(x = "ANY")
signature(x = "integer")
signature(x = "numeric")
signature(x = "raw")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- isend(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- recv() } comm.print(y, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- isend(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- recv() } comm.print(y, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
The function set/get a point address in R where the point point to a structure containing MPI arrays.
arrange.mpi.apts()
arrange.mpi.apts()
Since Rmpi/pbdMPI pre-allocate memory to store comm, status, datatype, info, request, this function provides a variable in R to let different APIs share the same memory address.
If the package loads first, then this sets '.__MPI_APTS__
'
in the .GlobalEnv
of R. If the package does not load before other
MPI APIs, then this gives a structure pointer to external memory
according to '.__MPI_APTS__
', i.e. allocated by other MPI APIs.
pbdMPI/R/arrange.mpi.apts
provides the R code, and
pbdMPI/src/pkg_*.*
provides the details of this call.
'.__MPI_APTS__
' is set in .GlobalEnv
of R.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### See source code for the details. ## End(Not run)
## Not run: ### See source code for the details. ## End(Not run)
These functions are designed to get or print MPI_COMM pointer and its address when the SPMD code in R be a foreign application of other applications.
get.mpi.comm.ptr(comm = .pbd_env$SPMD.CT$comm, show.msg = FALSE) addr.mpi.comm.ptr(comm.ptr)
get.mpi.comm.ptr(comm = .pbd_env$SPMD.CT$comm, show.msg = FALSE) addr.mpi.comm.ptr(comm.ptr)
comm |
a communicator number. |
comm.ptr |
a communicator pointer. |
show.msg |
if showing message for debug only. |
get.mpi.comm.ptr()
returns an R external pointer that points
to the address of the comm.
addr.mpi.comm.ptr()
takes the R external points, and
prints the address of the comm. This function is mainly for debugging.
get.mpi.comm.ptr()
returns an R external pointer.
addr.mpi.comm.ptr()
prints the comm pointer address and the
address of MPI_COMM_WORLD.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
### Save code in a file "demo.r" and run with 22processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ptr1 <- get.mpi.comm.ptr(1, show.msg = TRUE) addr.mpi.comm.ptr(ptr1) comm.split(color = as.integer(comm.rank()/2), key = comm.rank()) ptr1.new <- get.mpi.comm.ptr(1, show.msg = TRUE) addr.mpi.comm.ptr(ptr1.new) ### Finish. finalize() " pbdMPI::execmpi(spmd.code = spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 22processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ptr1 <- get.mpi.comm.ptr(1, show.msg = TRUE) addr.mpi.comm.ptr(ptr1) comm.split(color = as.integer(comm.rank()/2), key = comm.rank()) ptr1.new <- get.mpi.comm.ptr(1, show.msg = TRUE) addr.mpi.comm.ptr(ptr1.new) ### Finish. finalize() " pbdMPI::execmpi(spmd.code = spmd.code, nranks = 2L)
The functions call MPI probe functions.
probe(rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status) iprobe(rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
probe(rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status) iprobe(rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
rank.source |
a source rank where an object sent from. |
tag |
a tag number. |
comm |
a communicator number. |
status |
a status number. |
These functions are for internal functions. Potentially, they set/get probe for receiving data.
An invisible state of MPI call is returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### See source code of spmd.recv.default() for an example. ## End(Not run)
## Not run: ### See source code of spmd.recv.default() for an example. ## End(Not run)
This method lets a rank receive (blocking) an object from the other rank in the same communicator. The default return is the object sent from the other rank.
recv(x.buffer = NULL, rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status, check.type = .pbd_env$SPMD.CT$check.type)
recv(x.buffer = NULL, rank.source = .pbd_env$SPMD.CT$rank.source, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status, check.type = .pbd_env$SPMD.CT$check.type)
x.buffer |
a buffer to store |
rank.source |
a source rank where |
tag |
a tag number. |
comm |
a communicator number. |
status |
a status number. |
check.type |
if checking data type first for handshaking. |
A corresponding send()
should be evoked at the corresponding rank
rank.source
.
These are high level S4 methods. By default, check.type
is TRUE
and an additional send()/recv()
will make a handshaking call first,
then deliver the data next.
i.e. an integer vector of length two (type and length)
will be deliver first between send()
and recv()
to ensure
a buffer (of right type and right size/length) is properly allocated at
the rank.dest
side.
Currently, four data types are considered: integer
, double
,
raw/byte
, and default/raw.object
. The default method will
make a serialize()
call first to convert the general R object into
a raw
vector before sending it away. After the raw
vector
is received at the rank.dest
side, the vector will be
unserialize()
back to the R object format.
check.type
set as FALSE
will stop the additional
handhsaking call, but the buffer should be prepared carefully
by the user self. This is typically for the advanced users and
more specifically calls are needed. i.e. calling those
spmd.send.integer
with spmd.recv.integer
correspondingly.
check.type
also needs to be set as FALSE
for more efficient
calls such as isend()/recv()
or send()/irecv()
. Currently,
no check types are implemented in those mixed calls.
An object is returned by default and the buffer will be overwritten implicitely.
For calling spmd.recv.*()
:
signature(x = "ANY")
signature(x = "integer")
signature(x = "numeric")
signature(x = "raw")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- send(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- recv() } comm.print(y, rank.print = 1) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- send(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- recv() } comm.print(y, rank.print = 1) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
This method lets a rank receive a reduction of objects from every rank in the same communicator based on a given operation. The default return is an object as the input.
reduce(x, x.buffer = NULL, op = .pbd_env$SPMD.CT$op, rank.dest = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
reduce(x, x.buffer = NULL, op = .pbd_env$SPMD.CT$op, rank.dest = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
x |
an object to be gathered from all ranks. |
x.buffer |
a buffer to hold the return object which probably has
|
op |
a reduction operation applied on combine all |
rank.dest |
a rank of destination where all |
comm |
a communicator number. |
By default, the object is reduced to .pbd_env$SPMD.CT$rank.source
,
i.e. rank 0L.
All x
on all ranks are likely presumed to have the same size and type.
x.buffer
can be NULL
or unspecified. If specified, the type
should be either integer or double specified correctly according to the
type of x
.
See methods{"reduce"}
for S4 dispatch cases and the source code for
further details.
The reduced object of the same type as x
is returned by default.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
allgather()
, gather()
, reduce()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initial. suppressMessages(library(pbdMPI, quietly = TRUE)) init() .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank y <- reduce(matrix(x, nrow = 1), op = \"sum\") comm.print(y) y <- reduce(x, double(N), op = \"prod\") comm.print(y) x <- as.logical(round(runif(N))) y <- reduce(x, logical(N), op = \"land\") comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code = spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initial. suppressMessages(library(pbdMPI, quietly = TRUE)) init() .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank y <- reduce(matrix(x, nrow = 1), op = \"sum\") comm.print(y) y <- reduce(x, double(N), op = \"prod\") comm.print(y) x <- as.logical(round(runif(N))) y <- reduce(x, logical(N), op = \"land\") comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code = spmd.code, nranks = 2L)
This method lets a rank scatter objects to every rank in the same communicator. The default input is a list of length equal to ‘comm size’ and the default return is an element of the list.
scatter(x, x.buffer = NULL, x.count = NULL, displs = NULL, rank.source = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
scatter(x, x.buffer = NULL, x.count = NULL, displs = NULL, rank.source = .pbd_env$SPMD.CT$rank.source, comm = .pbd_env$SPMD.CT$comm)
x |
an object of length ‘comm size’ to be scattered to all ranks. |
x.buffer |
a buffer to hold the return object which probably has
'size of element of |
x.count |
a vector of length ‘comm size’ containing all object lengths. |
displs |
|
rank.source |
a rank of source where elements of |
comm |
a communicator number. |
All elements of x
are likely presumed to have the same size and type.
x.buffer
, x.count
, and displs
can be NULL
or
unspecified. If specified, the type should be one of integer, double, or
raw specified correctly according to the type of x
.
If x.count
is specified, then the spmd.scatterv.*()
is
called.
An element of x
is returned according to the rank id.
For calling spmd.scatter.*()
:
signature(x = "ANY", x.buffer = "missing", x.count = "missing")
signature(x = "integer", x.buffer = "integer", x.count = "missing")
signature(x = "numeric", x.buffer = "numeric", x.count = "missing")
signature(x = "raw", x.buffer = "raw", x.count = "missing")
For calling spmd.scatterv.*()
:
signature(x = "ANY", x.buffer = "missing", x.count = "integer")
signature(x = "ANY", x.buffer = "ANY", x.count = "integer")
signature(x = "integer", x.buffer = "integer", x.count = "integer")
signature(x = "numeric", x.buffer = "numeric", x.count = "integer")
signature(x = "raw", x.buffer = "raw", x.count = "integer")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
bcast()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- split(1:(N * .comm.size), rep(1:.comm.size, N)) y <- scatter(lapply(x, matrix, nrow = 1)) comm.print(y) y <- scatter(x, double(N)) comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- split(1:(N * .comm.size), rep(1:.comm.size, N)) y <- scatter(lapply(x, matrix, nrow = 1)) comm.print(y) y <- scatter(x, double(N)) comm.print(y) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
These functions control the parallel-capable L'Ecuyer-CMRG pseudo-random number generator (RNG) on clusters and in multicore parallel applications for reproducible results. Reproducibility is possible across different node and core configurations by associating the RNG streams with an application vector.
comm.set.seed( seed = NULL, diff = TRUE, state = NULL, streams = NULL, comm = .pbd_env$SPMD.CT$comm ) comm.set.stream( name = NULL, reset = FALSE, state = NULL, comm = .pbd_env$SPMD.CT$comm ) comm.get.streams( comm = .pbd_env$SPMD.CT$comm, seed = FALSE )
comm.set.seed( seed = NULL, diff = TRUE, state = NULL, streams = NULL, comm = .pbd_env$SPMD.CT$comm ) comm.set.stream( name = NULL, reset = FALSE, state = NULL, comm = .pbd_env$SPMD.CT$comm ) comm.get.streams( comm = .pbd_env$SPMD.CT$comm, seed = FALSE )
seed |
In |
diff |
Logical indicating if the parallel instances should have different random streams. |
state |
In function |
streams |
An vector of sequential integers specifying the streams to be prepared on the current rank. Typically, this is used by 'comm.chunk()' to prepare correct streams for each rank, which are aligned with the vector being chunk-ed. |
name |
Stream number that is coercible to character, indicating to start or continue generating from that stream. |
reset |
If true, reset the requested stream back to its beginning. |
comm |
The communicator that determines MPI rank numbers. |
This implementation uses the function nextRNGStream
in package parallel
to set up streams appropriate for working on a
cluster system with MPI. The main difference from parallel
is
that it adds a reproducibility capability with vector-based
streams that works across different numbers of nodes or cores by associating
streams with an application vector.
Vector-based streams are best set up with the higher level function
comm.chunk
instead of using comm.set.stream
directly.
comm.chunk
will set up only the streams that each rank needs
and provides the stream numbers necessary to switch between them with
comm.set.stream
.
The function uses parallel
's
nextRNGStream()
and sets up the parallel stream seeds in the
.pbd_env$RNG
environment, which are then managed with
comm.set.stream
. There is only one communication broadcast in
this implementation that ensures all ranks have the same seed as rank 0.
Subsequently, each rank maintains only its own streams.
When rank-based streams are set up, comm.chunk
with
form = "number"
and rng = TRUE
parameters, streams are
different for each rank and switching is not needed. Vector-based streams
are obtained with form = "vector"
and rng = TRUE
parameters.
In this latter case, the vector returned to each
rank contains the stream numbers (and vector components) that the rank owns.
Switch with comm.set.stream(v)
, where v is one of the stream numbers.
Switching back and forth is allowed, with each stream continuing where it
left off.
## RNG Notes
R sessions connected by MPI begin like other R sessions as discussed in
Random
. On first use of random number generation,
each rank computes its own seed from a combination of clock time and process
id (unless it reads a previously saved workspace, which is not recommended).
Because of asynchronous execution, imperfectly synchronized node clocks,
and likely different process ids, this
almost guarantees unique seeds and most likely results in independent
streams. However, this is not reproducible and not guaranteed. Both
reproducibility and guarantee are brought by the use of the L'Ecuyer-CMRG
generator implementation in nextRNGStream
and the
use of comm.set.seed
and comm.set.stream
adaptation for
parallel computing on cluster systems.
At a high level, the L'Ecuyer-CMRG pseudo-random number generator can
take jumps (advance the seed) in its
stream (about 2^191 long) so that distant substreams can be assigned. The
nextRNGStream
implementation takes jumps of 2^127
(about 1.7e38) to provide up to 2^64 (about 1.8e19) independent streams. See
https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf
for more details.
In situations that require the same stream on all ranks, a simple
set.seed
from base R and the default RNG will suffice.
comm.set.seed
will also accomplish this with the diff = FALSE
parameter if switching between same and different streams is needed.
comm.set.seed
engages the L'Ecuyer-CMRG RNG and invisibly returns
the previous RNG in use (Output of RNGkind()[1]). Capturing it, enables
the restoration of the previous RNG with RNGkind
. See examples
of use in demo/seed_rank.r
and demo/seed_vec.r
.
comm.set.stream
invisibly returns the current stream number as character.
comm.get.streams
returns the current stream name and other stream names available to the rank as a character string. Optionally, the local
.Random.seed
is included. This function is a debugging aid for
distributed random streams.
All three functions manage and use the environment .pbd_env$RNG
.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Pierre L'Ecuyer, Simard, R., Chen, E.J., and Kelton, W.D. (2002) An Object-Oriented Random-Number Package with Many Long Streams and Substreams. Operations Research, 50(6), 1073-1075.
https://www.iro.umontreal.ca/~lecuyer/myftp/papers/streams00.pdf
Programming with Big Data in R Website: https://pbdr.org/
comm.chunk()
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " suppressMessages(library(pbdMPI, quietly = TRUE)) comm.print(RNGkind()) comm.print(runif(5), all.rank = TRUE) set.seed(1357) comm.print(runif(5), all.rank = TRUE) old.kind = comm.set.seed(1357) comm.print(RNGkind()) comm.print(runif(5), all.rank = TRUE) comm.set.stream(reset = TRUE) comm.print(runif(5), all.rank = TRUE) comm.set.seed(1357, diff = TRUE) comm.print(runif(5), all.rank = TRUE) state <- comm.set.stream() ### save each rank's stream state comm.print(runif(5), all.rank = TRUE) comm.set.stream(state = state) ### set current RNG to state comm.print(runif(5), all.rank = TRUE) RNGkind(old.kind) set.seed(1357) comm.print(RNGkind()) comm.print(runif(5), all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " suppressMessages(library(pbdMPI, quietly = TRUE)) comm.print(RNGkind()) comm.print(runif(5), all.rank = TRUE) set.seed(1357) comm.print(runif(5), all.rank = TRUE) old.kind = comm.set.seed(1357) comm.print(RNGkind()) comm.print(runif(5), all.rank = TRUE) comm.set.stream(reset = TRUE) comm.print(runif(5), all.rank = TRUE) comm.set.seed(1357, diff = TRUE) comm.print(runif(5), all.rank = TRUE) state <- comm.set.stream() ### save each rank's stream state comm.print(runif(5), all.rank = TRUE) comm.set.stream(state = state) ### set current RNG to state comm.print(runif(5), all.rank = TRUE) RNGkind(old.kind) set.seed(1357) comm.print(RNGkind()) comm.print(runif(5), all.rank = TRUE) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This method lets a rank send (blocking) an object to the other rank
in the same communicator. The default return is NULL
.
send(x, rank.dest = .pbd_env$SPMD.CT$rank.dest, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, check.type = .pbd_env$SPMD.CT$check.type)
send(x, rank.dest = .pbd_env$SPMD.CT$rank.dest, tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, check.type = .pbd_env$SPMD.CT$check.type)
x |
an object to be sent from a rank. |
rank.dest |
a rank of destination where |
tag |
a tag number. |
comm |
a communicator number. |
check.type |
if checking data type first for handshaking. |
A corresponding recv()
should be evoked at the corresponding rank
rank.dest
.
These are high level S4 methods. By default, check.type
is TRUE
and an additional send()/recv()
will make a handshaking call first,
then deliver the data next.
i.e. an integer vector of length two (type and length)
will be deliver first between send()
and recv()
to ensure
a buffer (of right type and right size/length) is properly allocated at
the rank.dest
side.
Currently, four data types are considered: integer
, double
,
raw/byte
, and default/raw.object
. The default method will
make a serialize()
call first to convert the general R object into
a raw
vector before sending it away. After the raw
vector
is received at the rank.dest
side, the vector will be
unserialize()
back to the R object format.
check.type
set as FALSE
will stop the additional
handhsaking call, but the buffer should be prepared carefully
by the user self. This is typically for the advanced users and
more specifically calls are needed. i.e. calling those
spmd.send.integer
with spmd.recv.integer
correspondingly.
check.type
also needs to be set as FALSE
for more efficient
calls such as isend()/recv()
or send()/irecv()
. Currently,
no check types are implemented in those mixed calls.
A NULL
is returned by default.
For calling spmd.send.*()
:
signature(x = "ANY")
signature(x = "integer")
signature(x = "numeric")
signature(x = "raw")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- send(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- recv() } comm.print(y, rank.print = 1) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ y <- send(matrix(x, nrow = 1)) } else if(.comm.rank == 1){ y <- recv() } comm.print(y, rank.print = 1) ### Finish. finalize() " pbdMPI::execmpi(spmd.code, nranks = 2L)
This method lets a rank send an object to the other rank and
receive an object from another rank in the same communicator.
The default return is x
.
sendrecv(x, x.buffer = NULL, rank.dest = (comm.rank(.pbd_env$SPMD.CT$comm) + 1) %% comm.size(.pbd_env$SPMD.CT$comm), send.tag = .pbd_env$SPMD.CT$tag, rank.source = (comm.rank(.pbd_env$SPMD.CT$comm) - 1) %% comm.size(.pbd_env$SPMD.CT$comm), recv.tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
sendrecv(x, x.buffer = NULL, rank.dest = (comm.rank(.pbd_env$SPMD.CT$comm) + 1) %% comm.size(.pbd_env$SPMD.CT$comm), send.tag = .pbd_env$SPMD.CT$tag, rank.source = (comm.rank(.pbd_env$SPMD.CT$comm) - 1) %% comm.size(.pbd_env$SPMD.CT$comm), recv.tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
x |
an object to be sent from a rank. |
x.buffer |
a buffer to store |
rank.dest |
a rank of destination where |
send.tag |
a send tag number. |
rank.source |
a source rank where |
recv.tag |
a receive tag number. |
comm |
a communicator number. |
status |
a status number. |
A corresponding sendrecv()
should be evoked at the corresponding ranks
rank.dest
and rank.source
.
rank.dest
and rank.source
can be as.integer(NULL)
to
create a silent sendrecv operation which is more efficient than setting
rank.dest
and rank.source
to be equal.
A x
is returned by default.
For calling spmd.sendrecv.*()
:
signature(x = "ANY", x.buffer = "ANY")
signature(x = "integer", x.buffer = "integer")
signature(x = "numeric", x.buffer = "numeric")
signature(x = "raw", x.buffer = "raw")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.size y <- sendrecv(matrix(x, nrow = 1)) comm.print(y, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.size y <- sendrecv(matrix(x, nrow = 1)) comm.print(y, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This method lets a rank send an object to the other rank and
receive an object from another rank in the same communicator.
The default return is x
.
sendrecv.replace(x, rank.dest = (comm.rank(.pbd_env$SPMD.CT$comm) + 1) %% comm.size(.pbd_env$SPMD.CT$comm), send.tag = .pbd_env$SPMD.CT$tag, rank.source = (comm.rank(.pbd_env$SPMD.CT$comm) - 1) %% comm.size(.pbd_env$SPMD.CT$comm), recv.tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
sendrecv.replace(x, rank.dest = (comm.rank(.pbd_env$SPMD.CT$comm) + 1) %% comm.size(.pbd_env$SPMD.CT$comm), send.tag = .pbd_env$SPMD.CT$tag, rank.source = (comm.rank(.pbd_env$SPMD.CT$comm) - 1) %% comm.size(.pbd_env$SPMD.CT$comm), recv.tag = .pbd_env$SPMD.CT$tag, comm = .pbd_env$SPMD.CT$comm, status = .pbd_env$SPMD.CT$status)
x |
an object to be sent from a rank. |
rank.dest |
a rank of destination where |
send.tag |
a send tag number. |
rank.source |
a source rank where |
recv.tag |
a receive tag number. |
comm |
a communicator number. |
status |
a status number. |
A corresponding sendrecv.replace()
should be evoked at the
corresponding ranks rank.dest
and rank.source
.
rank.dest
and rank.source
can be as.integer(NULL)
to
create a silent sendrecv operation which is more efficient than setting
rank.dest
and rank.source
to be equal.
Warning: sendrecv.replace()
is not safe for R since R is
not a thread safe package that a dynamic returning object requires certain
blocking or barrier at some where. The replaced object or memory address
‘MUST’ return correctly. This is almost equivalent to sendrecv()
.
A x
is returned by default.
For calling spmd.sendrecv.replace.*()
:
signature(x = "ANY")
signature(x = "integer")
signature(x = "numeric")
signature(x = "raw")
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
sendrecv()
.
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.size x <- sendrecv.replace(matrix(x, nrow = 1)) comm.print(x, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.size x <- sendrecv.replace(matrix(x, nrow = 1)) comm.print(x, rank.print = 1) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
This is an advanced function to set pbdR options.
pbd_opt(..., bytext = "", envir = .GlobalEnv)
pbd_opt(..., bytext = "", envir = .GlobalEnv)
... |
in argument format |
bytext |
in text format |
envir |
by default the global environment is used. |
...
allows multiple options in
envir$.pbd_env
, but only in a simple way.
bytext
allows to assign options by text in
envir$.pbd_env
, but can assign advanced objects. For example,
"option$suboption <- value"
will set
envir$.pbd_env$option$suboption <- value
.
No value is returned.
Wei-Chen Chen [email protected] and Drew Schmidt.
Programming with Big Data in R Website: https://pbdr.org/
.pbd_env
,
SPMD.CT()
,
SPMD.OP()
,
SPMD.IO()
,
SPMD.TP()
, and
.mpiopt_init()
.
## Not run: ### Save code in a file "demo.r" and run with 4 processors by ### SHELL> mpiexec -np 4 Rscript demo.r ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. ls(.pbd_env) pbd_opt(ICTXT = c(2, 2)) pbd_opt(bytext = "grid.new <- list(); grid.new$ICTXT <- c(4, 4)") pbd_opt(BLDIM = c(16, 16), bytext = "grid.new$BLDIM = c(8, 8)") ls(.pbd_env) .pbd_env$ICTXT .pbd_env$BLDIM .pbd_env$grid.new ### Finish. finalize() ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 4 processors by ### SHELL> mpiexec -np 4 Rscript demo.r ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) ### Examples. ls(.pbd_env) pbd_opt(ICTXT = c(2, 2)) pbd_opt(bytext = "grid.new <- list(); grid.new$ICTXT <- c(4, 4)") pbd_opt(BLDIM = c(16, 16), bytext = "grid.new$BLDIM = c(8, 8)") ls(.pbd_env) .pbd_env$ICTXT .pbd_env$BLDIM .pbd_env$grid.new ### Finish. finalize() ## End(Not run)
The functions extract MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_status.source and MPI_status.tag.
anysource() anytag() get.sourcetag(status = .pbd_env$SPMD.CT$status)
anysource() anytag() get.sourcetag(status = .pbd_env$SPMD.CT$status)
status |
a status number. |
These functions are for internal uses.
Corresponding status will be returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() if(.comm.size < 2) comm.stop(\"At least two processors are requried.\") ### Examples. if(.comm.rank != 0){ send(as.integer(.comm.rank * 10), rank.dest = 0L, tag = as.integer(.comm.rank + 10)) } if(.comm.rank == 0){ for(i in 1:(.comm.size - 1)){ ret <- recv(x.buffer = integer(1), rank.source = anysource(), tag = anytag()) sourcetag <- get.sourcetag() print(c(sourcetag, ret)) } } ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() if(.comm.size < 2) comm.stop(\"At least two processors are requried.\") ### Examples. if(.comm.rank != 0){ send(as.integer(.comm.rank * 10), rank.dest = 0L, tag = as.integer(.comm.rank + 10)) } if(.comm.rank == 0){ for(i in 1:(.comm.size - 1)){ ret <- recv(x.buffer = integer(1), rank.source = anysource(), tag = anytag()) sourcetag <- get.sourcetag() print(c(sourcetag, ret)) } } ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
These variables provide default values for most functions in the package.
The environment .pbd_env
contains several objects with parameters for
communicators and methods.
The elements of .pbd_env$SPMD.CT
are default values for various
controls
Elements | Default | Meaning |
comm |
0L | communicator index |
intercomm |
2L | inter communicator index |
info |
0L | info index |
newcomm |
1L | new communicator index |
op |
"sum" | the operation |
port.name |
"spmdport" | the operation |
print.all.rank |
FALSE | whether all ranks print message |
print.quiet |
FALSE | whether rank is added to print/cat |
rank.root |
0L | the rank of root |
rank.source |
0L | the rank of source |
rank.dest |
1L | the rank of destination |
request |
0L | the request index |
serv.name |
"spmdserv" | the service name |
status |
0L | the status index |
tag |
0L | the tag number |
unlist |
FALSE | whether to unlist a return |
divide.method |
"block" | default method for jid |
mpi.finalize |
TRUE | shutdown MPI on finalize() |
quit |
TRUE | quit when errors occur |
msg.flush |
TRUE | flush each comm.cat/comm.print |
msg.barrier |
TRUE | include barrier in comm.cat/comm.print |
Rprof.all.rank |
FALSE | call Rprof on all ranks |
lazy.check |
TRUE | use lazy check on all ranks |
The elements of .pbd_env$SPMD.OP
list the implemented operations for
reduce()
and allreduce()
. Currently, implemented operations are
"sum", "prod", "max", "min", "land", "band", "lor", "bor", "lxor", "bxor".
The elements of .SPMD.IO
are default values for functions in
comm_read.r
and comm_balance.r
.
Elements | Default | Meaning |
max.read.size |
5.2e6 | max of reading size (5 MB) |
max.test.lines |
500 | max of testing lines |
read.method |
"gbd" | default reading method |
balance.method |
"block" | default load balance method |
where balance.method
is only used for "gbd" reading method when
nrows = -1
and skip = 0
are set.
The elements of .pbd_env$SPMD.TP
are default values for
task pull settings
Elements | Default | Meaning |
bcast |
FALSE | whether to bcast() objects to all ranks |
barrier |
TRUE | if call barrier() for all ranks |
try |
TRUE | if use try() in works |
try.silent |
FALSE | if silent the try() message |
See task.pull() for details.
|
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
These sets of controls are used to provide default values in this package. The values are not supposed to be changed in general.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
These functions are designed for SPMD but assume that rank 0 is a manager and the rest are workers.
task.pull(jids, FUN, ..., rank.manager = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, bcast = .pbd_env$SPMD.TP$bcast, barrier = .pbd_env$SPMD.TP$barrier, try = .pbd_env$SPMD.TP$try, try.silent = .pbd_env$SPMD.TP$try.silent) task.pull.workers(FUN = function(jid, ...){ return(jid) }, ..., rank.manager = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, try = .pbd_env$SPMD.TP$try, try.silent = .pbd_env$SPMD.TP$try.silent) task.pull.manager(jids, rank.manager = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm)
task.pull(jids, FUN, ..., rank.manager = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, bcast = .pbd_env$SPMD.TP$bcast, barrier = .pbd_env$SPMD.TP$barrier, try = .pbd_env$SPMD.TP$try, try.silent = .pbd_env$SPMD.TP$try.silent) task.pull.workers(FUN = function(jid, ...){ return(jid) }, ..., rank.manager = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm, try = .pbd_env$SPMD.TP$try, try.silent = .pbd_env$SPMD.TP$try.silent) task.pull.manager(jids, rank.manager = .pbd_env$SPMD.CT$rank.root, comm = .pbd_env$SPMD.CT$comm)
jids |
all job ids (a vector of positive integers). |
FUN |
a function to be evaluated by workers. |
... |
extra parameters for |
rank.manager |
rank of the manager from where |
comm |
a communicator number. |
bcast |
if bcast to all ranks. |
barrier |
if barrier for all ranks. |
try |
wheter to use |
try.silent |
turn off error messages from |
All of these functions are designed to emulate a manager/workers paradigm in an SPMD environment. If your chunk workloads are known and similar, consider a direct SPMD solution.
FUN
is a user defined function which has jid
as
its first argument and other variables are given in ...
.
The manager will be queried by workers whenever a worker finishes a job to see if more jobs are available.
A list with length comm.size() - 1
will be returned to the manager and NULL
to the workers.
Each element of the list contains the returns ret
of their FUN
results.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
get.jid()
.
## Not run: ### Under command mode, run the demo with 2 processors by ### (Use Rscript.exe for windows system) # mpiexec -np 2 Rscript -e "demo(task_pull,'pbdMPI',ask=F,echo=F)" ### Or # execmpi("demo(task_pull,'pbdMPI',ask=F,echo=F)", nranks = 2L) ## End(Not run)
## Not run: ### Under command mode, run the demo with 2 processors by ### (Use Rscript.exe for windows system) # mpiexec -np 2 Rscript -e "demo(task_pull,'pbdMPI',ask=F,echo=F)" ### Or # execmpi("demo(task_pull,'pbdMPI',ask=F,echo=F)", nranks = 2L) ## End(Not run)
This function basically saves code in a spmd.file and executes
MPI via R's system call e.g.
system("mpiexec -np 1 Rscript spmd.file")
.
execmpi(spmd.code = NULL, spmd.file = NULL, mpicmd = NULL, nranks = 1L, rscmd = NULL, verbose = TRUE, disable.current.mpi = TRUE, mpiopt = NULL, rsopt = NULL) runmpi(spmd.code = NULL, spmd.file = NULL, mpicmd = NULL, nranks = 1L, rscmd = NULL, verbose = TRUE, disable.current.mpi = TRUE, mpiopt = NULL, rsopt = NULL)
execmpi(spmd.code = NULL, spmd.file = NULL, mpicmd = NULL, nranks = 1L, rscmd = NULL, verbose = TRUE, disable.current.mpi = TRUE, mpiopt = NULL, rsopt = NULL) runmpi(spmd.code = NULL, spmd.file = NULL, mpicmd = NULL, nranks = 1L, rscmd = NULL, verbose = TRUE, disable.current.mpi = TRUE, mpiopt = NULL, rsopt = NULL)
spmd.code |
SPMD code to be run via mpicmd and |
spmd.file |
a file contains SPMD code to be run via mpicmd and |
mpicmd |
MPI executable command. If |
nranks |
number of processes to run the SPMD code envoked by mpicmd. |
rscmd |
|
verbose |
print SPMD code outputs and MPI messages. |
disable.current.mpi |
force to finalize the current MPI comm if any, for unix-alike system only. |
mpiopt |
MPI options appended after |
rsopt |
|
When the spmd.code
is NULL
: The code should be already
saved in the file named spmd.file
for using.
When the spmd.code
is not NULL
:
The spmd.code
will be dumped to a temp file (spmd.file
) via the
call writeLines(spmd.code, conn)
where
conn <- file(spmd.file, open = "wt")
. The file will be closed after
the dumping.
When spmd.file
is ready (either dumped from spmd.code
or
provided by the user), the steps below will be followed:
If spmd.file = NULL
, then a temporary file will be generated and
used to dump spmd.code
.
For Unix-alike systems, the command
cmd <- paste(mpicmd, "-np", nranks, mpiopt, rscmd, rscmd spmd.file, ">", log.file, " 2>&1 & echo \"PID=$!\" &")
is executed via system(cmd, intern = TRUE, wait = FALSE, ignore.stdout = TRUE, ignore.stderr = TRUE)
. The log.file
is a temporary file to
save the outputs from the spmd.code
. The results saved to the
log.file
will be read back in and cat
and return
to R.
For OPENMPI, the "–oversubscribe " is added before mpiopt
as
mpiopt <- paste("--oversubscribe ", mpiopt, sep = "")
and is passed to cmd
thereon.
For Windows, the cmd
will be
paste(mpicmd, "-np", nranks, mpiopt, rscmd, rsopt spmd.file)
and is executed via
system(cmd, intern = TRUE, wait = FALSE, ignore.stdout = TRUE, ignore.stderr = TRUE)
.
Basically, only the PID of the MPI job (in background) will be returned in Linux-alike systems. For Windows, the MPI job is always wait until it is complete.
For Unix-alike systems,
in new R and MPI, the pbdMPI::execmpi(...)
may
carry the current MPI comm
into system(cmd, ...)
calls.
Because the comm
has been established/loaded by the
init()
call because of ::
,
the mpiexec
inside the system(cmd, ...)
calls
will be confused with the exist comm
.
Consider that pbdMPI::execmpi(...)
is typically called in
interactive mode (or actually only done for CRAN check in most case),
an argument disable.current.mpi = TRUE
is added/needed to finalize
the existing comm
first before system(cmd, ...)
be executed.
This function is NOT recommended for running SPMD programs. The recommended way is to run under shell command.
Wei-Chen Chen [email protected] and Drew Schmidt.
Programming with Big Data in R Website: https://pbdr.org/
pbdCS::pbdRscript()
.
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.file <- tempfile() cat(" suppressMessages(library(pbdMPI, quietly = TRUE)) allreduce(2) finalize() ", file = spmd.file) pbdMPI::execmpi(spmd.file = spmd.file, nranks = 2L)
### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.file <- tempfile() cat(" suppressMessages(library(pbdMPI, quietly = TRUE)) allreduce(2) finalize() ", file = spmd.file) pbdMPI::execmpi(spmd.file = spmd.file, nranks = 2L)
The functions call MPI wait functions.
wait(request = .pbd_env$SPMD.CT$request, status = .pbd_env$SPMD.CT$status) waitany(count, status = .pbd_env$SPMD.CT$status) waitsome(count) waitall(count)
wait(request = .pbd_env$SPMD.CT$request, status = .pbd_env$SPMD.CT$status) waitany(count, status = .pbd_env$SPMD.CT$status) waitsome(count) waitall(count)
request |
a request number. |
status |
a status number. |
count |
a count number. |
These functions are for internal uses. Potentially, they wait after some nonblocking MPI calls.
An invisible state of MPI call is returned.
Wei-Chen Chen [email protected], George Ostrouchov, Drew Schmidt, Pragneshkumar Patel, and Hao Yu.
Programming with Big Data in R Website: https://pbdr.org/
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ isend(list(x)) } if(.comm.rank == 1){ y <- irecv(list(x)) } wait() comm.print(y, rank.print = 1L) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)
## Not run: ### Save code in a file "demo.r" and run with 2 processors by ### SHELL> mpiexec -np 2 Rscript demo.r spmd.code <- " ### Initialize suppressMessages(library(pbdMPI, quietly = TRUE)) .comm.size <- comm.size() .comm.rank <- comm.rank() ### Examples. N <- 5 x <- (1:N) + N * .comm.rank if(.comm.rank == 0){ isend(list(x)) } if(.comm.rank == 1){ y <- irecv(list(x)) } wait() comm.print(y, rank.print = 1L) ### Finish. finalize() " # execmpi(spmd.code, nranks = 2L) ## End(Not run)