Install the clustermq
package in R from CRAN. This will
automatically detect if ZeroMQ is installed and
otherwise use the bundled library:
# Recommended:
# If your system has `libzmq` installed but you want to enable the worker
# crash monitor, set the environment variable below to use the bundled
# `libzmq` library with the required feature (`-DZMQ_BUILD_DRAFT_API=1`):
# Sys.setenv(CLUSTERMQ_USE_SYSTEM_LIBZMQ=0)
install.packages("clustermq")
Alternatively you can use the remotes
package to install
directly from Github. Note that this version needs
autoconf
/automake
and CMake
for
compilation:
# Sys.setenv(CLUSTERMQ_USE_SYSTEM_LIBZMQ=0)
# install.packages('remotes')
remotes::install_github("mschubert/clustermq")
# remotes::install_github("mschubert/clustermq@develop") # dev version
In the develop
branch, we will introduce code changes and new features. These may
contain bugs, poor documentation, or other inconveniences. This branch
may not install at times. However, feedback is
very welcome.
For any installation issues please see the FAQ.
An HPC cluster’s scheduler ensures that computing jobs are
distributed to available worker nodes. Hence, this is what
clustermq
interfaces with in order to do computations.
By default, we will take whichever scheduler we find and fall back on local processing. This will work in most, but not all cases. You may need to configure your scheduler.
To set up a scheduler explicitly, see the following links:
options(clustermq.scheduler="PBS"/"Torque")
You may in addition need to activate compute
environments or containers if your shell (e.g.
~/.bashrc
) does not do this automatically.
Check the FAQ
if your job submission/call to Q
errors or gets stuck.
While this is not the main focus of the package, you can use it to parallelize function calls locally on multiple cores or processes. This can also be useful to test your code on a subset of the data before submitting it to a scheduler.
callr
package to run and manage multiple parallel R processes with
options(clustermq.scheduler="multiprocess")
parallel
package to fork the
current R process into multiple threads with
options(clustermq.scheduler="multicore")
. This sometimes
causes problems (macOS, RStudio) and is not available on Windows.There are reasons why you might prefer to not to work on the computing cluster directly but rather on your local machine instead. RStudio is an excellent local IDE, it’s more responsive than and feature-rich than browser-based solutions (RStudio server, Project Jupyter), and it avoids X forwarding issues when you want to look at plots you just made.
Using this setup, however, you lost access to the computing cluster.
Instead, you had to copy your data there, and then submit individual
scripts as jobs, aggregating the data in the end again.
clustermq
is trying to solve this by providing a
transparent SSH interface.
In order to use clustermq
from your local machine, the
package needs to be installed on both there and on the computing
cluster. On the computing cluster, set up
your scheduler and make sure clustermq
runs there
without problems. Note that the remote scheduler can not be
LOCAL
(default if no HPC scheduler found) or
SSH
for this to work.
# If this is set to 'LOCAL' or 'SSH' you will get the following error:
# Expected PROXY_READY, received ‘PROXY_ERROR: Remote SSH QSys is not allowed’
options(
clustermq.scheduler = "multiprocess" # or multicore, LSF, SGE, Slurm etc.
)
On your local machine, add the following options:
options(
clustermq.scheduler = "ssh",
clustermq.ssh.host = "user@host", # use your user and host, obviously
clustermq.ssh.log = "~/cmq_ssh.log" # log for easier debugging
)
We recommend that you set up SSH keys for password-less login.
Q
functionThe following arguments are supported by Q
:
fun
- The function to call. This needs to be
self-sufficient (because it will not have access to the
master
environment)...
- All iterated arguments passed to the function. If
there is more than one, all of them need to be namedconst
- A named list of non-iterated arguments passed
to fun
export
- A named list of objects to export to the
worker environmentBehavior can further be fine-tuned using the options below:
fail_on_error
- Whether to stop if one of the calls
returns an errorseed
- A common seed that is combined with job number
for reproducible resultsmemory
- Amount of memory to request for the job
(bsub -M
)n_jobs
- Number of jobs to submit for all the function
callsjob_size
- Number of function calls per job. If used in
combination with n_jobs
the latter will be overall
limitchunk_size
- How many calls a worker should process
before reporting back to the master. Default: every worker will report
back 100 times totalThe full documentation is available by typing ?Q
.
The package is designed to distribute arbitrary function calls on HPC worker nodes. There are, however, a couple of caveats to observe as the R session running on a worker does not share your local memory.
The simplest example is to a function call that is completely
self-sufficient, and there is one argument (x
) that we
iterate through:
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 2
#>
#> [[2]]
#> [1] 4
#>
#> [[3]]
#> [1] 6
Non-iterated arguments are supported by the const
argument:
fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
If a function relies on objects in its environment that are not
passed as arguments (including other functions), they can be exported
using the export
argument:
fx = function(x) x * 2 + y
Q(fx, x=1:3, export=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
If we want to use a package function we need to load it on the worker
using the pkgs
parameter, or referencing it with
package_name::
:
f1 = function(x) splitIndices(x, 3)
Q(f1, x=3, n_jobs=1, pkgs="parallel")
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [[1]][[1]]
#> [1] 1
#>
#> [[1]][[2]]
#> [1] 2
#>
#> [[1]][[3]]
#> [1] 3
f2 = function(x) parallel::splitIndices(x, 3)
Q(f2, x=8, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [[1]][[1]]
#> [1] 1 2 3
#>
#> [[1]][[2]]
#> [1] 4 5
#>
#> [[1]][[3]]
#> [1] 6 7 8
# Q(f1, x=5, n_jobs=1)
# (Error #1) could not find function "splitIndices"
foreach
backendThe foreach
package provides an interface to perform repeated tasks on different
backends. While it can perform the function of simple loops using
%do%
:
library(foreach)
foreach(i=1:3) %do% sqrt(i)
#> [[1]]
#> [1] 1
#>
#> [[2]]
#> [1] 1.414214
#>
#> [[3]]
#> [1] 1.732051
it can also perform these operations in parallel using
%dopar%
:
foreach(i=1:3) %dopar% sqrt(i)
#> Warning: executing %dopar% sequentially: no parallel backend registered
#> [[1]]
#> [1] 1
#>
#> [[2]]
#> [1] 1.414214
#>
#> [[3]]
#> [1] 1.732051
The latter allows registering different handlers for parallel
execution, where we can use clustermq
:
# set up the scheduler first, otherwise this will run sequentially
# this accepts same arguments as `Q`
clustermq::register_dopar_cmq(n_jobs=2, memory=1024)
# this will be executed as jobs
foreach(i=1:3) %dopar% sqrt(i)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 1
#>
#> [[2]]
#> [1] 1.414214
#>
#> [[3]]
#> [1] 1.732051
As BiocParallel
supports foreach
too, this means we can run all packages
that use BiocParallel
on the cluster as well via
DoparParam
.
targets
The targets
package enables users to define a dependency structure of different
function calls, and only evaluate them if the underlying data
changed.
The
targets
package is a Make-like pipeline tool for statistics and data science in R. The package skips costly runtime for tasks that are already up to date, orchestrates the necessary computation with implicit parallel computing, and abstracts files as R objects. If all the current output matches the current upstream code and data, then the whole pipeline is up to date, and the results are more trustworthy than otherwise.
It can use clustermq
to perform
calculations as jobs.
The various configurable options are mentioned throughout the documentation, where applicable, however, we list all of the options here for reference.
Options can be set by including a call to
options(<key> = <value>)
in your current
session or added as a line to your ~/.Rprofile
. The former
will only be available in your active session, while the latter will be
available any time after you restart R.
clustermq.scheduler
- One of the supported clustermq
schedulers; options are
"LOCAL"
, "multiprocess"
,
"multicore"
, "lsf"
, "sge"
,
"slurm"
, "pbs"
, "Torque"
, or
"ssh"
(default is the HPC scheduler found in
$PATH
, otherwise "LOCAL"
)clustermq.host
- The name of the node or device for
constructing the ZeroMQ
host address (default is
Sys.info()["nodename"]
)clustermq.ports
- A port range used by
clustermq
to initiate connections. (default:
6000:9999
) Important: This option - when used with the ssh
connector - must be set as an option on the remote host.clustermq.ssh.host
- The user name and host for connecting to the HPC via SSH
(e.g. user@host
); we recommend setting up SSH keys for
password-less loginclustermq.ssh.log
- Path for a file (on the SSH host)
that will be created and populated with logging information regarding
the SSH connection (e.g. "~/cmq_ssh.log"
); helpful for
debugging purposesclustermq.ssh.timeout
- The amount of time to wait (in
seconds) for a SSH start-up connection before timing out (default is
10
seconds)clustermq.ssh.hpc_fwd_port
- Port that will be opened
for SSH reverse tunneling between the workers on the HPC and a local
session. Can also be specified as a port range that clustermq will
sample from. (default: one integer randomly sampled from the range
between 50000 and 55000)clustermq.worker.timeout
- The amount of time to wait
(in seconds) for master-worker communication before timing out (default
is to wait indefinitely)clustermq.template
- Path to a template file for submitting HPC jobs;
only necessary if using your own template, otherwise the default
template will be used (default depends on set or inferred
clustermq.scheduler
)clustermq.data.warning
- The threshold for the size of
the common data (in Mb) before clustermq
throws a warning
(default is 1000
)clustermq.defaults
- A named-list of default values for
the HPC template; this takes precedence over defaults specified in the
template file (default is an empty list)Function calls evaluated by workers are wrapped in event handlers, which means that even if a call evaluation throws an error, this should be reported back to the main R session.
However, there are reasons why workers might crash, and in which case they can not report back. These include:
In this case, it is useful to have the worker(s) create a log file that will also include events that are not reported back. It can be requested using:
This will create a file called
You can customize the file name using
Note that in this case log_file
is a template field of
your scheduler script, and hence needs to be present there in order for
this to work. The default templates all have this field included.
In order to log each worker separately, some schedulers support wildcards in their log file names. For instance:
log_file="/path/to.file.%i"
log_file="/path/to.file.$TASK_ID"
log_file="/path/to.file.%I"
log_file="/path/to.file.%a"
log_file="/path/to.file.$PBS_ARRAY_INDEX"
log_file="/path/to.file.$PBS_ARRAYID"
Your scheduler documentation will have more details about the available options.
When reporting a bug that includes worker crashes, please always include a log file.
In some cases, it may be necessary to activate a specific computing environment on the scheduler jobs prior to starting up the worker. This can be, for instance, because R was only installed in a specific environment or container.
Examples for such environments or containers are:
It should be possible to activate them in the job submission script (i.e., the template file). This is widely untested, but would look the following for the LSF scheduler (analogous for others):
#BSUB-J {{ job_name }}[1-{{ n_jobs }}] # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }} # stdout + stderr
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }} # walltime (uncomment)
module load {{ bashenv | default_bash_env }}
# or: source activate {{ conda | default_conda_env_name }}
# or: your environment activation command
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
This template still needs to be filled, so in the above example you need to pass either
or set it via an option:
The package provides its own default scheduler templates, similar to
the ones listed below. Which template is used is decided based on which
scheduler submission executable is present in the user’s
$PATH
, e.g. sbatch
for SLURM or
bsub
for LSF. qsub
is ambiguous between SGE
and PBS/Torque, so in this case
options(clustermq.scheduler = "<opt>")
should be set
to the correct one.
A user can provide their own template file via
options(clustermq.template = "<file>")
, containing
arbitrary template values {{ value | default }}
. These
values will be filled upon job submission in the following order of
priority:
Q(..., template=list(key = value))
or
workers(... template=list(key = value))
getOption("clustermq.defaults")
Set the following options in your R session that will submit jobs:
options(
clustermq.scheduler = "lsf",
clustermq.template = "/path/to/file/below" # if using your own template
)
To supply your own template, save the contents below with any desired
changes to a file and have clustermq.template
point to
it.
#BSUB-J {{ job_name }}[1-{{ n_jobs }}] # name of the job / array jobs
#BSUB-n {{ cores | 1 }} # number of cores to use per job
#BSUB-o {{ log_file | /dev/null }} # stdout + stderr; %I for array index
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }} # walltime (uncomment)
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #BSUB-*
defines command-line arguments to
the bsub
program.
BSUB-M
and BSUB-R
.
Check your local setup if the memory values supplied are MiB or KiB,
default is 4096
if not requesting memory when calling
Q()
BSUB-q default
. Use the queue with name
default. This will most likely not exist on your system, so
choose the right name and uncomment by removing the additional
#
BSUB-W {{ walltime }}
. Set the maximum time a
job is allowed to run before being killed. The default here is to
disable this line. If you enable it, enter a fixed value or pass the
walltime
argument to each function call. The way it is
written, it will use 6 hours if no arguemnt is given.#BSUB-*
(where
*
represents the argument){{ ... }}
), as they are used to fill in the right
variablesOnce this is done, the package will use your settings and no longer warn you of the missing options.
Set the following options in your R session that will submit jobs:
options(
clustermq.scheduler = "sge",
clustermq.template = "/path/to/file/below" # if using your own template
)
To supply your own template, save the contents below with any desired
changes to a file and have clustermq.template
point to
it.
#$ -N {{ job_name }} # job name
##$ -q default # submit to queue named "default"
#$ -j y # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }} # output file
#$ -cwd # use pwd as work dir
#$ -V # use environment variable
#$ -t 1-{{ n_jobs }} # submit jobs as array
#$ -pe smp {{ cores | 1 }} # number of cores to use per job
#$ -l m_mem_free={{ memory | 1073741824 }} # 1 Gb in bytes
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #$-*
defines command-line arguments to the
qsub
program.
$ -q default
. Use the queue with name
default. This will most likely not exist on your system, so
choose the right name and uncomment by removing the additional
#
{{ ... }}
), as they are used to fill in the right
variables.Once this is done, the package will use your settings and no longer warn you of the missing options.
Set the following options in your R session that will submit jobs:
options(
clustermq.scheduler = "slurm",
clustermq.template = "/path/to/file/below" # if using your own template
)
To supply your own template, save the contents below with any desired
changes to a file and have clustermq.template
point to
it.
#!/bin/sh
#SBATCH --job-name={{ job_name }}
##SBATCH --partition=default
#SBATCH --output={{ log_file | /dev/null }}
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 4096 }}
#SBATCH --array=1-{{ n_jobs }}
#SBATCH --cpus-per-task={{ cores | 1 }}
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #SBATCH
defines command-line arguments to
the sbatch
program.
SBATCH --partition default
. Use the queue
with name default. This will most likely not exist on your
system, so choose the right name and uncomment by removing the
additional #
{{ ... }}
), as they are used to fill in the right
variables.Once this is done, the package will use your settings and no longer warn you of the missing options.
Set the following options in your R session that will submit jobs:
options(
clustermq.scheduler = "pbs",
clustermq.template = "/path/to/file/below" # if using your own template
)
To supply your own template, save the contents below with any desired
changes to a file and have clustermq.template
point to
it.
#PBS -N {{ job_name }}
#PBS -J 1-{{ n_jobs }}
#PBS -l select=1:ncpus={{ cores | 1 }}:mpiprocs={{ cores | 1 }}:mem={{ memory | 4096 }}MB
#PBS -l walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
##PBS -q default
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #PBS-*
defines command-line arguments to
the qsub
program.
#PBS-q default
. Use the queue with name
default. This will most likely not exist on your system, so
choose the right name and uncomment by removing the additional
#
{{ ... }}
), as they are used
to fill in the right variables.Once this is done, the package will use your settings and no longer warn you of the missing options.
Set the following options in your R session that will submit jobs:
options(
clustermq.scheduler = "Torque",
clustermq.template = "/path/to/file/below" # if using your own template
)
To supply your own template, save the contents below with any desired
changes to a file and have clustermq.template
point to
it.
#PBS -N {{ job_name }}
#PBS -l nodes={{ n_jobs }}:ppn={{ cores | 1 }},walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
##PBS -q default
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #PBS-*
defines command-line arguments to
the qsub
program.
#PBS-q default
. Use the queue with name
default. This will most likely not exist on your system, so
choose the right name and uncomment by removing the additional
#
{{ ... }}
), as they are used
to fill in the right variables.Once this is done, the package will use your settings and no longer warn you of the missing options.
While SSH is not a scheduler, we can access remote schedulers via
SSH. If you want to use it, first make sure that clustermq
works on your server with the real scheduler. Only then move on to
setting up SSH.
options(
clustermq.scheduler = "ssh",
clustermq.ssh.host = "myhost", # set this up in your local ~/.ssh/config
clustermq.ssh.log = "~/ssh_proxy.log", # log file on your HPC
clustermq.ssh.timeout = 30, # if changing default connection timeout
clustermq.template = "/path/to/file/below" # if using your own template
)
The default template is shown below. If R
is not in your
HPC $PATH
, you may need to specify its path or load the
required bash modules/conda environments.
To supply your own template, save its contents with any desired
changes to a file on your local machine and have
clustermq.template
point to it.