Technical Documentation

Worker API

Base API and schedulers

The main worker functions are wrapped in an R6 class named QSys. This provides a standardized API to the lower-level messages that are sent via ZeroMQ.

The base class is extended by scheduler classes that add the required functions for submitting and cleaning up jobs:

+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
  |- etc.

The user-visible object is a worker Pool that wraps this class, and will eventually allow managing different types of workers.

Workers

Creating a worker pool

A pool of workers can be created using the workers() function, which instantiates a Pool object of the corresponding QSys-derived scheduler class. See ?workers for details.

# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)

# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$cleanup())

Worker startup

For workers that are started via a scheduler, we do not know in advance which machine they will run on. This is why every worker is started with the TCP/IP address of the master socket that distributes the work.

This is achieved by a call to R that is common to all schedulers:

R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
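
With the template filled in by the scheduler, the submitted command might look like the following (the host name and port here are hypothetical examples, substituted for the {{ master }} placeholder):

R --no-save --no-restore -e 'clustermq:::worker("tcp://headnode42:6124")'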

Worker communication

On the master’s side, we wait until a worker connects:

msg = w$recv() # this will block until a worker is ready

We can then send any expression to be evaluated on the worker using the send method:

w$send(expression, ...)

Any variables that should be passed along with the call can be added after the expression (in ...). For the batch processing that clustermq usually performs, the expression is a call to work_chunk, to which the chunk data is added:

w$send(clustermq:::work_chunk(chunk, fun, const, rettype, common_seed),
       chunk = chunk(iter, submit_index))

Worker environment

We can add any number of objects to a worker environment using the env method:

w$env(object=value, ...)

This call also invisibly returns a data.frame listing all objects currently in the environment. A user who wants to inspect the environment without changing it can call w$env() without arguments. The environment will be propagated to all workers automatically in a greedy fashion.
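
For example, to export a function and a constant to all workers and then inspect the resulting environment (the object names here are only illustrative):

w$env(fun = function(x) x * 2, const = 5)
w$env() # data.frame listing the objects now in the environment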

Main event loop

Putting the above together in an event loop, we get what is essentially implemented in master. w$send invisibly returns an identifier that tracks which call was submitted, and w$current() matches it to the result received via w$recv().

w = workers(3)
on.exit(w$cleanup())
w$env(...)

while (we have new work to send || jobs pending) {
    res = w$recv() # the result of the call, or NULL for a new worker
    w$current()$call_ref # matches answer to request, -1 otherwise
    # handle result

    if (more work)
        call_ref = w$send(expression, ...) # call_ref tracks request identity
    else
        w$send_shutdown()
}

A loop of a similar structure can be used to extend clustermq. As an example, this was done by the targets package.
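
The following is a minimal, self-contained sketch of such a loop that squares a set of inputs on three workers. It uses only the Pool methods described above; the bookkeeping variables, the submitted expression, and the use of the call reference as a list key are illustrative, not part of the clustermq API.

w = workers(n_jobs=3)
on.exit(w$cleanup())
w$env(exponent = 2)

inputs = 1:10
next_input = 1        # next input to submit
results = list()      # results keyed by call reference
n_pending = 0         # calls submitted but not yet answered

while (next_input <= length(inputs) || n_pending > 0) {
    res = w$recv()                  # blocks until a worker is ready
    ref = w$current()$call_ref      # -1 for a new worker, else the matching request
    if (ref != -1) {
        results[[as.character(ref)]] = res
        n_pending = n_pending - 1
    }

    if (next_input <= length(inputs)) {
        w$send(x ^ exponent, x = inputs[next_input])
        next_input = next_input + 1
        n_pending = n_pending + 1
    } else {
        w$send_shutdown()
    }
}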

ZeroMQ message specification

Communication between the master (main event loop) and workers (QSys base class) is organised in messages. These are chunks of serialized data sent via ZeroMQ’s protocol (ZMTP). The parts of each message are called frames.

Master - Worker communication

The master requests an evaluation in a message with X frames (direct) or Y frames (proxied). This is all handled internally by clustermq.

  • The worker identity frame or routing identifier
  • A delimiter frame
  • Worker status (wlife_t)
  • The call to be evaluated
  • N repetitions of:
    • The variable name of an environment object that is not yet present on the worker
    • The variable value

If using a proxy, this will be followed by a SEXP that contains variable names the proxy should add before forwarding to the worker.

Worker evaluation

A worker evaluates the call using the R C API:

R_tryEvalSilent(cmd, env, &err);

If an error occurs in this evaluation, it will be returned as a structure with class worker_error. If a developer wants to catch errors and warnings in a more fine-grained manner, it is recommended to add their own callingHandlers to cmd (as clustermq does in its work_chunk).
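
As an illustration of that recommendation, the sketch below wraps an expression in withCallingHandlers/tryCatch so that warnings are collected and errors are converted into a tagged object. It is a simplified stand-in for what work_chunk does, not the actual implementation:

eval_with_handlers = function(expr) {
    warnings_seen = character()
    result = withCallingHandlers(
        tryCatch(expr,
            error = function(e) structure(conditionMessage(e), class = "worker_error")),
        warning = function(w) {
            warnings_seen <<- c(warnings_seen, conditionMessage(w))
            invokeRestart("muffleWarning")
        }
    )
    list(result = result, warnings = warnings_seen)
}

eval_with_handlers(log(-1) + 1) # NaN result plus the "NaNs produced" warning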

Worker - Master communication

The result of this evaluation is then returned in a message with four (direct) or five (proxied) frames:

  • Worker identity frame (handled internally by ZeroMQ’s ZMQ_REQ socket)
  • Empty frame (handled internally by ZeroMQ’s ZMQ_REQ socket)
  • Worker status (wlife_t) that is handled internally by clustermq
  • The result of the call (SEXP), visible to the user

If using a worker via SSH, these frames will be preceded by a routing identity frame that is handled internally by ZeroMQ and added or peeled off by the proxy.