| Title: | Robust, Logged and Reproducible Iteration at Organizational Scale |
|---|---|
| Description: | Turns one-off iterative R procedures (such as for loops, lapply() or pmap() from 'purrr') into production-grade workflows by wrapping them with orthogonal, composable execution layers. Two layers are always active: structured logging with real traceback and per-case timing; and reproducibility capture, which records the R version, loaded package versions, execution environment, the exact iteration mask, and a stat-based fingerprint of every input file referenced in the mask (with a diff_inputs() helper to detect silent drift between runs). Parallel execution (built on the 'future' framework, Bengtsson (2021) <doi:10.32614/RJ-2021-048>), non-blocking background jobs, and opt-in progress reporting (via 'progressr') are implemented as optional, composable layers. Further layers (error replay, content-hash input fingerprinting, content-based case identifiers) are planned and will remain composable with the default layers. |
| Authors: | Daniel Rakotomalala [aut, cre] |
| Maintainer: | Daniel Rakotomalala <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 0.2.0 |
| Built: | 2026-05-12 23:03:52 UTC |
| Source: | https://github.com/cran/genproc |
Takes a function and returns a modified version that:
always returns a one-row data.frame (the "log row")
captures errors without stopping, recording the error message and the real traceback (call stack at the point of failure)
records wall-clock execution time
add_trycatch_logrow(f)
f |
A function to wrap. |
tryCatch() unwinds the call stack before entering the error handler.
This means sys.calls() inside a tryCatch error handler returns
the handler's own stack, not the stack that led to the error.
This function uses withCallingHandlers() (which fires while the
original stack is still intact) to capture the traceback, then lets
the error propagate to an outer tryCatch() for control flow.
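Below is a minimal base-R sketch of the pattern described above. It assumes nothing about genproc's internals. The calling handler records sys.calls() while the failing frames still exist, and the outer tryCatch() only decides what the call returns. The helper name capture_with_traceback() is hypothetical.

```r
# Hypothetical helper (not genproc's internal code): evaluate `expr`; on error,
# return the condition message and the call stack at the point of failure.
capture_with_traceback <- function(expr) {
  tb <- NULL
  msg <- tryCatch(
    {
      withCallingHandlers(
        expr,
        error = function(e) {
          # Runs before unwinding, so the failing frames are still on the stack.
          calls <- sys.calls()
          tb <<- vapply(calls, function(cl) paste(deparse(cl), collapse = " "),
                        character(1))
        }
      )
      NA_character_                            # reached only if no error occurred
    },
    error = function(e) conditionMessage(e)    # control flow: stop here, keep the message
  )
  list(error_message = msg, traceback = tb)
}

leaf <- function(x) log(x)
middle <- function(x) leaf(x)

res <- capture_with_traceback(middle("a"))
res$error_message
any(grepl("leaf(x)", res$traceback, fixed = TRUE))
```

A plain tryCatch() handler calling sys.calls() would not see `leaf(x)`, because that frame has already been unwound by then.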
The returned data.frame always has one row. Columns:
One column per formal argument of f, with the value passed
success: TRUE if the body executed without error
error_message: the error's conditionMessage(), or NA
traceback: the formatted call stack at the error, or NA
duration_secs: wall-clock seconds elapsed
A function with the same formals as f, whose return value
is a one-row data.frame with columns: all original arguments,
success (logical), error_message (character or NA),
traceback (character or NA), duration_secs (numeric).
safe_sqrt <- add_trycatch_logrow(function(x) sqrt(x))

# Happy path: one-row data.frame, success = TRUE.
safe_sqrt(4)[, c("x", "success", "error_message", "duration_secs")]

# Failing call: the run does not stop. The row carries the error
# message and a filtered traceback instead of throwing.
bad <- safe_sqrt("a")
bad[, c("x", "success", "error_message")]
await() blocks until the background future of a non-blocking
genproc() run has resolved, then returns a genproc_result
with log, n_success, n_error, duration_total_secs, and
status populated. If the wrapper future itself crashed (a rare
case — user errors inside individual cases are caught by the
logging layer and are not wrapper crashes), the returned
object has status = "error" and a populated error_message.
await(x, ...)

## S3 method for class 'genproc_result'
await(x, ...)
x |
An object. Methods exist for genproc_result. |
... |
Unused, for future extensions. |
await() is idempotent: calling it on a result that has already
been materialized (or was synchronous to begin with) returns it
unchanged.
A genproc_result with status != "running".
Takes two genproc_result objects produced by
genproc() (the same function over the same mask, run at two
different times) and reports which referenced input files have
changed since the first run.
diff_inputs(r0, r1)

## S3 method for class 'genproc_input_diff'
print(x, ...)
r0, r1
|
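Two genproc_result objects produced by genproc() for the same function and mask: r0 from the earlier run, r1 from the later one. |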
x |
A genproc_input_diff object, as returned by diff_inputs(). |
... |
Ignored (present for S3 method consistency). |
Files are matched by canonical absolute path. The method field
must agree between the two runs.
An object of class genproc_input_diff (a named list)
with components:
Character, e.g. "stat".
A data.frame with columns path,
size_before, size_after, mtime_before, mtime_after.
One row per file whose size or mtime differs.
Character vector of paths whose size and mtime are identical in both runs.
Character vector of paths present in r0's
snapshot but absent in r1's.
Character vector of paths present in r1's
snapshot but absent in r0's.
A data.frame with columns case_id,
path, column, change_type (one of "changed",
"removed", "added"). One row per (case, input column)
impacted by the diff. Pass to rerun_affected() to re-run
only the impacted cases.
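The stat-based comparison can be sketched in base R. This is an illustration under assumptions, not the package's code:

- the helpers stat_fingerprint() and diff_fingerprints() are hypothetical;
- the component names only mirror the descriptions above;
- only size and mtime are compared, and files are matched by canonical absolute path.

```r
# Hypothetical helpers illustrating a stat-based fingerprint (size + mtime),
# keyed by canonical absolute path. Not genproc's internal code.
stat_fingerprint <- function(paths) {
  paths <- normalizePath(paths, mustWork = TRUE)
  info <- file.info(paths)
  data.frame(path = paths, size = info$size,
             mtime = as.numeric(info$mtime), stringsAsFactors = FALSE)
}

diff_fingerprints <- function(before, after) {
  # Files present in both snapshots, with _before / _after columns.
  both <- merge(before, after, by = "path", suffixes = c("_before", "_after"))
  drifted <- both$size_before != both$size_after |
             both$mtime_before != both$mtime_after
  list(
    changed   = both[drifted, , drop = FALSE],
    unchanged = both$path[!drifted],
    removed   = setdiff(before$path, after$path),
    added     = setdiff(after$path, before$path)
  )
}

f <- tempfile(fileext = ".csv")
write.csv(head(iris), f, row.names = FALSE)
fp0 <- stat_fingerprint(f)
write.csv(iris, f, row.names = FALSE)   # strictly more rows: size changes
fp1 <- stat_fingerprint(f)
d <- diff_fingerprints(fp0, fp1)
d$changed
```

Because only metadata is read, this check is cheap even for large files. It would miss a rewrite that keeps both size and mtime identical, which is what a content hash would catch.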
# Two runs of the same procedure, with one input file rewritten
# in between. `diff_inputs()` reports the drift.
src <- file.path(tempdir(), "diff-inputs-demo")
dir.create(src, showWarnings = FALSE, recursive = TRUE)
write.csv(head(iris), file.path(src, "a.csv"), row.names = FALSE)

mask <- data.frame(
  path = file.path(src, "a.csv"),
  stringsAsFactors = FALSE
)
read_one <- function(path) nrow(read.csv(path))

r0 <- genproc(read_one, mask)

# Rewrite the file with strictly more rows: size changes.
write.csv(iris, file.path(src, "a.csv"), row.names = FALSE)
r1 <- genproc(read_one, mask)

diff_inputs(r0, r1)
Returns the rows of result$log corresponding to cases where
success == FALSE. The columns are exactly those of
result$log (case_id, mask parameters, success, error_message,
traceback, duration_secs).
errors(x, ...)

## S3 method for class 'genproc_result'
errors(x, ...)
x |
A genproc_result object. |
... |
Unused, for future extensions. |
A data.frame with one row per failed case. Empty
data.frame (with the same columns) if there are no failures.
Returns NULL (with a message) if the run is non-blocking and
has not been materialized yet.
rerun_failed(), summary.genproc_result()
# Cases with an even x fail. Note that x / 0 would not fail in R
# (it returns Inf), so the failure is raised explicitly with stop().
result <- genproc(
  f = function(x) if (x %% 2 == 0) stop("even") else x,
  mask = data.frame(x = 1:6)
)
errors(result)[, c("case_id", "x", "error_message")]
Takes a concrete R expression (e.g. a data processing script that works on one specific case) and returns a function where every external value (strings, variables from the environment that are not functions) has been replaced by a named parameter with the original value as default.
from_example_to_function(expr, env = parent.frame())
expr |
An expression of length 1, typically created with
expression({ ... }). |
env |
The environment in which to look up symbols. Symbols found in this environment that are not functions will be turned into parameters. Defaults to the caller's environment. |
This is the first step in the genproc workflow: the user writes a
working example, and from_example_to_function() extracts a
reusable, parameterized version of it.
String literals: every string in the expression becomes a
parameter (e.g. "output.csv" -> param_1 with default
"output.csv").
Non-function symbols: if a symbol exists in env and its value
is not a function, it becomes a parameter.
Locally bound symbols: variables created by assignments inside
the expression (e.g. result <- ...) are never parameterized.
Function names: the head of a call (e.g. read.csv in
read.csv(path)) is never parameterized.
Functions in the environment: symbols whose value is a function are assumed to be part of the program structure, not data.
Other literals: numeric, logical, NULL, and other non-character atomic values stay in the body unchanged.
The same value produces the same parameter. If "output.csv" appears
twice, both occurrences map to the same param_N.
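Here is a sketch of the string-literal rule on its own. It assumes a quote()d block rather than an expression() object, and the helper names collect_strings() and replace_strings() are hypothetical. The environment lookup and the local-binding protection listed above are not implemented. Repeated literals map to a single param_N because the literals are de-duplicated before numbering.

```r
# Hypothetical sketch covering only the string-literal rule.

# Collect every string literal in an expression tree, in order of appearance.
collect_strings <- function(e) {
  if (is.character(e)) return(e)
  if (is.call(e)) return(unlist(lapply(as.list(e), collect_strings)))
  character(0)
}

# Replace each string literal by the symbol of its parameter.
replace_strings <- function(e, lookup) {
  if (is.character(e) && length(e) == 1L && e %in% lookup) {
    return(as.name(names(lookup)[match(e, lookup)]))
  }
  if (is.call(e)) {
    for (i in seq_along(e)) e[[i]] <- replace_strings(e[[i]], lookup)
  }
  e
}

example <- quote({
  df <- read.csv("input.csv")
  write.csv(df, "output.csv")
  file.copy("output.csv", "backup.csv")
})

strings <- unique(collect_strings(example))           # same value -> same parameter
lookup <- setNames(strings, paste0("param_", seq_along(strings)))
fn <- as.function(c(as.list(lookup), list(replace_strings(example, lookup))))
formals(fn)
body(fn)
```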
A function whose body is the rewritten expression and whose
formals are the detected parameters with their default values.
The function's environment is set to env.
# --- Basic usage ---
# `input_path` exists in this environment; "output.csv" is a
# string literal. Both become parameters of the resulting function,
# with their original values as defaults.
input_path <- "/data/input.csv"
expr <- expression({
  df <- read.csv(input_path)
  write.csv(df, "output.csv")
})
fn <- from_example_to_function(expr)
formals(fn)

# --- Local bindings are protected ---
# `x` is assigned inside the block, so it is NOT parameterized
# even though x = 42 exists in the calling environment.
x <- 42
expr2 <- expression({
  x <- 1
  y <- x + 1
})
fn2 <- from_example_to_function(expr2)
formals(fn2)
Takes a function (typically produced by from_example_to_function())
and returns a one-row data.frame where each column corresponds to a
parameter, with the default value as the cell value. This "template
mask" is the starting point the user expands into a multi-row mask
that defines all iteration cases.
from_function_to_mask(f)
f |
A function whose formals define the mask columns. |
In genproc, a mask is a data.frame where each row is an iteration
case and each column is a parameter. The function genproc() will
call the user's function once per row, passing column values as
arguments.
from_function_to_mask() produces a one-row template. The user then
builds the full mask by adding rows (e.g. via rbind(),
dplyr::bind_rows(), or by constructing a multi-row data.frame
directly).
Only scalar atomic defaults are supported (character, numeric, integer, logical). Non-scalar defaults (vectors, lists, data.frames) will be supported in a future version via list-columns. This extension will preserve backwards compatibility: any mask that works today will continue to work unchanged.
The mask returned here is a pure data.frame of parameter values.
Metadata such as case_id, input file hashes, or seeds are managed
separately by genproc() at execution time — they are not stored
as columns or attributes of the mask. This design ensures that
standard data.frame operations (dplyr::filter(), [, rbind())
never accidentally strip metadata.
When the mask is later generalized to a dedicated class
(genproc_mask), existing code passing a plain data.frame will
continue to work (backwards compatibility is a hard constraint).
A one-row data.frame with one column per parameter.
Parameters with default values get those values; parameters
without defaults get NA.
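The sketch below shows how a one-row template could be derived from formals(). template_mask_sketch() is a hypothetical helper, not from_function_to_mask() itself. Like the documented function, it accepts only scalar atomic defaults and fills parameters that have no default with NA.

```r
# Hypothetical sketch: build a one-row template mask from a function's formals.
template_mask_sketch <- function(f) {
  fmls <- formals(f)
  vals <- vector("list", length(fmls))
  names(vals) <- names(fmls)
  for (i in seq_along(fmls)) {
    if (identical(fmls[[i]], quote(expr = ))) {
      vals[i] <- list(NA)                    # no default: NA
    } else if (is.atomic(fmls[[i]]) && length(fmls[[i]]) == 1L) {
      vals[i] <- list(fmls[[i]])             # scalar atomic default
    } else {
      stop("Only scalar atomic defaults are supported: ", names(fmls)[i])
    }
  }
  as.data.frame(vals, stringsAsFactors = FALSE)
}

fn <- function(input_path = "data.csv", n_rows = 100, label) NULL
template <- template_mask_sketch(fn)
template
```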
fn <- function(input_path = "data.csv", n_rows = 100) {
  head(read.csv(input_path), n_rows)
}
mask <- from_function_to_mask(fn)
mask
#   input_path n_rows
# 1   data.csv    100

# Expand to multiple cases:
full_mask <- data.frame(
  input_path = c("a.csv", "b.csv", "c.csv"),
  n_rows = c(10, 50, 100)
)
This is the central function of the genproc package. It takes a function and an iteration mask (data.frame), calls the function once per row of the mask, and returns a structured result with:
a log data.frame (one row per case, with success/error/traceback/timing)
reproducibility information (R version, packages, environment, parallel spec)
the exact mask used
stable case IDs linking log rows to mask rows
genproc(
  f,
  mask,
  f_mapping = NULL,
  parallel = NULL,
  nonblocking = NULL,
  track_inputs = TRUE,
  input_cols = NULL,
  skip_input_cols = NULL
)
f |
A function to apply to each row of the mask. Each formal
of |
mask |
A data.frame where each row is an iteration case and
each column is a parameter value. Can be produced by
from_function_to_mask(). |
f_mapping |
Optional named character vector to rename |
parallel |
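NULL (default) to run cases sequentially in row order, or a parallel_spec() object to dispatch them in parallel. |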
nonblocking |
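NULL (default) to block until the run completes, or a nonblocking_spec() object to run it in a background future. |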
track_inputs |
Logical. When |
input_cols |
|
skip_input_cols |
|
The logged and reproducibility layers are always active and
cannot be disabled. Three optional layers compose on top: parallel
(pass a parallel_spec() to parallel), non-blocking (pass a
nonblocking_spec() to nonblocking), and progress monitoring
(wrap the call in progressr::with_progress()).
Cases are executed sequentially in row order by default. Supply
parallel = parallel_spec(...) to dispatch them in parallel via
the future ecosystem. The logging and reproducibility layers
remain active in both modes; the parallel layer is strictly
additive.
Parallel execution preserves the mask row order in the resulting
log data.frame, regardless of the order in which workers return.
Parallel execution requires genproc to be installed (not only
loaded via devtools::load_all()) on each worker, because the
logging layer serializes closures whose environments reference
genproc internals. The only exception is
parallel_spec(strategy = "sequential"), which runs in the
current process and needs nothing extra — this is the recommended
mode for deterministic testing.
Errors in individual cases do not stop the run. Each case is
wrapped with add_trycatch_logrow(), which captures the error
message and the real traceback (via withCallingHandlers). The
run continues with the next case. This holds identically in
sequential and parallel mode.
genproc() emits one progressr signal per completed case in
sequential and parallel modes. The signals are no-op unless the
calling code is wrapped in progressr::with_progress(...), in
which case the user sees a progress bar (or any other handler
chosen via progressr::handlers()):
library(progressr)
with_progress(
  result <- genproc(my_fn, mask, parallel = parallel_spec(workers = 4))
)
Without with_progress(), there is zero overhead and zero visible
change: the integration is a hook, not a default behaviour.
progressr is declared in Suggests; the integration is
conditional on its installation. The non-blocking path does not
yet emit signals — live monitoring of background runs is on the
roadmap.
When both parallel and nonblocking are supplied, the
non-blocking wrapper envelops the parallel dispatch (one outer
future submits the run, inner workers process the cases). On
platforms where the wrapper R subprocess inherits a restrictive
default for getOption("mc.cores") (typically 1 on Windows and in
some RStudio configurations), parallelly would otherwise refuse
to spawn the inner workers. genproc() works around this with
two surgical adjustments inside the wrapper subprocess, applied
only in the composed case and only when the user has not set
their own values:
Set R_PARALLELLY_AVAILABLECORES_METHODS = "system" so that
availableCores() ignores the legacy mc.cores option and
reports the true detected core count (lifts the hard-limit
refusal).
Raise options(mc.cores) from 1 to the system core count, so
that parallelly's soft-limit warning no longer fires with a
misleading "only 1 CPU cores available" message.
The calling session is never modified by either adjustment.
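The two adjustments can be written as a pure decision function. This sketch rests on several assumptions:

- plan_core_adjustments() is a hypothetical name;
- it only computes what would be set and does not call Sys.setenv() or options();
- it treats an mc.cores value of exactly 1 as the inherited default, which the text does not spell out.

```r
# Hypothetical sketch: decide which of the two documented adjustments apply,
# without touching the session. Typical call:
#   plan_core_adjustments(Sys.getenv("R_PARALLELLY_AVAILABLECORES_METHODS"),
#                         getOption("mc.cores"), parallel::detectCores())
plan_core_adjustments <- function(methods_env, mc_cores, system_cores) {
  adj <- list()
  # 1. Only when the user has not set the variable themselves.
  if (!nzchar(methods_env)) {
    adj$R_PARALLELLY_AVAILABLECORES_METHODS <- "system"
  }
  # 2. Raise mc.cores from 1 to the detected core count (assumed inherited default).
  if (identical(as.integer(mc_cores), 1L) && isTRUE(system_cores > 1L)) {
    adj$mc.cores <- as.integer(system_cores)
  }
  adj
}

plan_core_adjustments("", 1L, 8L)
```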
Each row of the mask receives a case_id (currently index-based:
case_0001, case_0002, ...). This ID appears in the log and
can be used for replay, monitoring, and cross-referencing.
The mask does not need to contain a column for every parameter of
f. Parameters not present in the mask will use their default
values. However, parameters without defaults that are also missing
from the mask will cause an error before execution starts.
Extra columns in the mask (not matching any parameter) are silently ignored.
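The per-row calling convention and error isolation described in this section can be sketched as a plain sequential loop. run_mask_sketch() is hypothetical and leaves out tracebacks, reproducibility capture and the up-front check for missing required parameters. It only shows four behaviours:

- index-based case_ids;
- fallback to defaults for parameters missing from the mask;
- ignored extra columns;
- errors that do not stop the run.

```r
# Hypothetical sketch of a sequential genproc-style loop (not the package code).
run_mask_sketch <- function(f, mask) {
  # Columns that match no formal of f are ignored; formals absent from the
  # mask fall back to their defaults when f is called.
  arg_cols <- intersect(names(mask), names(formals(f)))
  rows <- lapply(seq_len(nrow(mask)), function(i) {
    params <- mask[i, arg_cols, drop = FALSE]
    start <- Sys.time()
    err <- tryCatch({
      do.call(f, as.list(params))
      NA_character_
    }, error = function(e) conditionMessage(e))   # the error stays in this case
    data.frame(
      case_id = sprintf("case_%04d", i),
      params,
      success = is.na(err),
      error_message = err,
      duration_secs = as.numeric(difftime(Sys.time(), start, units = "secs")),
      row.names = NULL, stringsAsFactors = FALSE
    )
  })
  out <- do.call(rbind, rows)
  rownames(out) <- NULL
  out
}

run_log <- run_mask_sketch(
  function(x, y = 10) if (x %% 2 == 0) stop("even") else x + y,
  data.frame(x = 1:4, note = letters[1:4])
)
run_log
```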
An object of class genproc_result (a named list) with
components:
log: A data.frame with one row per case. Contains all
parameter values, plus case_id, success, error_message,
traceback, and duration_secs.
reproducibility: A list of environment information
captured at run start (R version, packages, OS, locale,
timezone, mask snapshot, parallel and non-blocking specs if
any, and inputs, a stat-based fingerprint of every input
file referenced by the mask). See capture_reproducibility().
n_success: Integer, number of successful cases.
n_error: Integer, number of failed cases.
duration_total_secs: Numeric, total wall-clock time for the entire run.
status: Character. "done" for a synchronous run that
has completed; "running" for a non-blocking run whose
future has not resolved yet; "done (not collected)" for a
non-blocking run whose future has resolved but whose result
has not been collected via await() yet; "error" when the
background run errored out.
The genproc_result class is designed for forward compatibility.
Existing fields (log, reproducibility, n_success, n_error,
duration_total_secs) are guaranteed stable. Future versions may
add new fields (e.g. worker_id in the log for parallel runs)
or new methods (e.g. collect()/poll() for non-blocking execution),
but will never remove or rename existing ones.
Optional execution layers: parallel_spec(), nonblocking_spec(),
status(), await().
Inspecting a result: errors(), summary.genproc_result(),
rerun_failed().
Reproducibility tooling: diff_inputs(), rerun_affected().
# Sequential run (the default). Returns immediately when done.
result <- genproc(
  f = function(x, y) x + y,
  mask = data.frame(x = c(1, 2, 3), y = c(10, 20, 30))
)
result$log

# One-off parallel call: genproc installs a temporary multisession
# plan and restores the previous one on exit. Capped at 2 workers
# to comply with the CRAN policy on parallelism in examples.
result <- genproc(
  f = function(x) x * 2,
  mask = data.frame(x = 1:4),
  parallel = parallel_spec(workers = 2)
)
result$log

# Non-blocking + parallel composed: launch in the background,
# keep the console, collect later with await().
job <- genproc(
  f = function(x) x * 2,
  mask = data.frame(x = 1:4),
  parallel = parallel_spec(workers = 2),
  nonblocking = nonblocking_spec()
)
status(job)        # "running" until the future resolves
job <- await(job)  # blocks; idempotent on already-resolved jobs
job$log
Returns a configuration object to pass as the nonblocking
argument of genproc(). When supplied, genproc() returns
immediately with a genproc_result of status "running" while
the actual work continues in a background future. Use status()
to poll the state and await() to block until completion.
nonblocking_spec(strategy = "multisession", packages = NULL, globals = TRUE)
strategy |
Character, or |
packages |
Character vector, or |
globals |
Logical or character. Forwarded to
|
A list of class "genproc_nonblocking_spec" with the
validated, normalized fields.
nonblocking_spec() and parallel_spec() are orthogonal and can
be combined. The non-blocking layer launches one outer future;
inside it, the parallel layer dispatches cases via
future.apply. With both strategies set to "multisession",
future resolves the inner layer as "sequential" by default
(see future::plan() nesting rules) unless the caller installs an
explicit nested plan via future::plan(list(...)).
On Windows and in some RStudio configurations, the wrapper
subprocess inherits getOption("mc.cores") set to 1, which would
lead parallelly to refuse the inner workers ("only 1 CPU core
available for this R process"), and to also emit a misleading
soft-limit warning even after the refusal is lifted. genproc()
works around both issues transparently in the composed case (only
when the user has not set their own values). See ?genproc for
details.
parallel_spec(), status(), await()
# Launch in the background, keep the console.
spec <- nonblocking_spec()
job <- genproc(
  f = function(x) x * 2,
  mask = data.frame(x = 1:4),
  nonblocking = spec
)
status(job)        # "running"
job <- await(job)  # blocks until done
job$log

# Deterministic test: exercise the code path without real async
spec <- nonblocking_spec(strategy = "sequential")
Returns a configuration object to pass as the parallel argument
of genproc(). The object describes how to parallelize; the
actual execution is carried out by future.apply::future_lapply()
on top of the backend selected by future::plan().
parallel_spec(
  workers = NULL,
  strategy = NULL,
  chunk_size = NULL,
  seed = TRUE,
  packages = NULL,
  globals = TRUE
)
workers |
Integer >= 1, or |
strategy |
Character, or |
chunk_size |
Integer >= 1, or |
seed |
Controls reproducible random-number generation
across workers. Passed to |
packages |
Character vector, or |
globals |
Logical or character. Forwarded to
|
A list of class "genproc_parallel_spec" with the
validated, normalized fields.
"sequential": runs in the current process, no workers.
Exercises the parallel code path without the overhead; useful
for deterministic testing.
"multisession": portable (works on Windows), launches R
subprocesses via parallelly. The default recommendation
for most workloads.
"multicore": forks the current process (Unix/macOS only,
not available on Windows and not reliable inside RStudio).
Faster startup than multisession but loses portability.
"cluster": explicit cluster of workers, possibly on other
machines. For large-scale batch execution.
For most users, leaving strategy = NULL and calling
future::plan() once at the top of the session is the cleanest
setup.
With seed = TRUE, each case receives an independent
L'Ecuyer-CMRG stream derived from a random master seed. The same
master seed yields identical results regardless of worker count or
chunking. To pin the master seed, pass an integer
(seed = 42L).
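Per-case streams can be illustrated with base R's parallel package, which provides nextRNGStream(). This sketch shows the concept only, not genproc's or future.apply's code. case_seeds() and run_case() are hypothetical, and the sketch switches the session's RNG kind as a side effect. Each case's draw depends only on its own seed, so the order (and by extension the worker) that processes it does not matter.

```r
library(parallel)

# Hypothetical helper: one L'Ecuyer-CMRG seed per case, derived from a master seed.
case_seeds <- function(master_seed, n_cases) {
  RNGkind("L'Ecuyer-CMRG")          # side effect: switches the session's RNG kind
  set.seed(master_seed)
  s <- get(".Random.seed", envir = globalenv())
  seeds <- vector("list", n_cases)
  for (i in seq_len(n_cases)) {
    seeds[[i]] <- s
    s <- nextRNGStream(s)           # independent stream for the next case
  }
  seeds
}

# Run one case on its own stream.
run_case <- function(seed) {
  assign(".Random.seed", seed, envir = globalenv())
  runif(1)
}

seeds <- case_seeds(42L, 4L)
in_order <- vapply(seeds, run_case, numeric(1))
reversed <- vapply(rev(seeds), run_case, numeric(1))   # processed in another order
identical(in_order, rev(reversed))
```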
# Use whatever plan the caller has set
spec <- parallel_spec()

# One-off parallel call with 4 workers, reproducible RNG
spec <- parallel_spec(workers = 4, strategy = "multisession", seed = 42L)

# Exercise the parallel code path deterministically in a test
spec <- parallel_spec(strategy = "sequential")
Displays a structured summary of the run: status, timestamp, execution mode, case counts, total duration, and an actionable hint when relevant.
## S3 method for class 'genproc_result'
print(x, ...)
x |
A genproc_result object. |
... |
Ignored (present for S3 method consistency). |
For non-blocking results, the status is queried live from the
attached future via status() rather than read from the stored
field. Repeated print(x) calls therefore reflect the actual
progress of the background run. status() distinguishes
"done" (the future resolved successfully) from "error" (the
wrapper future itself crashed). Numeric fields are shown as
(pending) until await() is called to materialize the result.
When the parallel layer was used and startup overhead clearly
dominated the run, the print method emits a Note hinting at
the issue, a pattern that often surprises users on small
workloads. The check depends on whether workers is known:
parallel efficiency ((cumulative / workers) / wall) below 50%
when workers is supplied, or wall > cumulative * 1.2 in
power-user mode (workers unknown), where wall is the total
wall-clock time and cumulative the sum of per-case durations.
Both checks require wall > 0.5s to avoid noise on very short runs.
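The two rules translate into a small predicate. startup_dominated() is a hypothetical name. It assumes case_secs holds the per-case durations from the log (their sum being the cumulative time) and wall_secs the run's total wall-clock time.

```r
# Hypothetical predicate mirroring the two documented rules.
startup_dominated <- function(case_secs, wall_secs, workers = NULL) {
  if (wall_secs <= 0.5) return(FALSE)          # too short to judge
  cumulative <- sum(case_secs)
  if (!is.null(workers)) {
    efficiency <- (cumulative / workers) / wall_secs
    efficiency < 0.5
  } else {
    wall_secs > cumulative * 1.2               # power-user mode: workers unknown
  }
}

# 8 cases of 0.1 s on 4 workers, but 3 s of wall time: efficiency about 0.07.
startup_dominated(rep(0.1, 8), wall_secs = 3, workers = 4)   # TRUE
```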
x, invisibly.
Print method for genproc_result_summary
## S3 method for class 'genproc_result_summary'
print(x, ...)
x |
A genproc_result_summary object, as returned by summary.genproc_result(). |
... |
Unused, for S3 method consistency. |
x, invisibly.
Takes a function and a name mapping, and returns a new function where
both the formals and all symbol references in the body have been
renamed according to the mapping. This is typically used after
from_example_to_function() to replace generated names like
param_1, param_2 with meaningful names.
rename_function_params(f, mapping)
f |
A function whose parameters should be renamed. |
mapping |
A named character vector. Names are the current
parameter names, values are the new names.
Example: |
The function checks that:
All names in mapping actually exist as formals of f
New names are unique (no duplicates)
New names do not collide with parameters not being renamed
If the body contains a nested function definition whose formals
shadow a parameter being renamed, the shadowed references in that
inner body will still be renamed. This is unlikely in practice
(parameters from from_example_to_function() are named param_N)
but is noted here for completeness.
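In base R, one compact way to rename both the formals and the body symbols is substitute() with a list of replacement symbols. The sketch below shows one way this can be done; it is not the package's implementation. rename_params_sketch() is hypothetical and mirrors the three checks listed above. Like the documented function, it also rewrites shadowed names inside nested function definitions, because substitute() does not track scope.

```r
# Hypothetical sketch: rename formals and every matching symbol in the body.
rename_params_sketch <- function(f, mapping) {
  old <- names(formals(f))
  stopifnot(
    all(names(mapping) %in% old),                      # names exist as formals
    !anyDuplicated(mapping),                           # new names are unique
    !any(mapping %in% setdiff(old, names(mapping)))    # no collision with others
  )
  new_formals <- as.list(formals(f))
  names(new_formals)[match(names(mapping), old)] <- unname(mapping)
  symbols <- lapply(mapping, as.name)                  # old name -> new symbol
  new_body <- do.call(substitute, list(body(f), symbols))
  g <- f
  formals(g) <- new_formals
  body(g) <- new_body
  g
}

add <- function(a = 1, b = 2) a + b
add2 <- rename_params_sketch(add, c(a = "x"))
add2(x = 5)   # 7
```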
A function with renamed formals and body.
fn <- function(param_1 = "in.csv", param_2 = "out.csv") {
  df <- read.csv(param_1)
  write.csv(df, param_2)
}
fn2 <- rename_function_params(fn, c(
  param_1 = "input_path",
  param_2 = "output_path"
))

# Formals were renamed:
formals(fn2)

# And the body too: references to `param_1` and `param_2` are
# updated in place, the function source is not edited.
body(fn2)
Filters the original mask of r0 down to the cases that
referenced inputs reported as changed, removed, or added by
diff_inputs(), and re-runs genproc() on that subset.
rerun_affected(
  r0,
  diff,
  f,
  parallel = NULL,
  nonblocking = NULL,
  track_inputs = TRUE,
  input_cols = NULL,
  skip_input_cols = NULL
)
r0 |
A genproc_result object from the original run. |
diff |
A genproc_input_diff object, as returned by diff_inputs(). |
f |
A function. Typically the same function passed to the
original genproc() call. |
parallel, nonblocking, track_inputs, input_cols, skip_input_cols
|
Forwarded to genproc(). |
This is the actionable end of the reproducibility layer: when an
upstream file silently drifts, you do not need to re-run the
whole mask. rerun_affected() produces a smaller run that
refreshes only the impacted outputs.
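Selecting the subset to re-run can be sketched as follows. The sketch assumes the index-based case_ids described under genproc() (case_0001 is mask row 1) and the cases_affected columns listed under diff_inputs(). affected_mask() is a hypothetical helper. Several rows for the same case (one per input column) collapse into one mask row.

```r
# Hypothetical helper: mask rows to re-run, from a cases_affected table.
affected_mask <- function(mask, cases_affected) {
  ids <- unique(cases_affected$case_id)
  rows <- sort(as.integer(sub("^case_", "", ids)))   # "case_0003" -> 3
  mask[rows, , drop = FALSE]
}

mask <- data.frame(p = c("a.csv", "b.csv", "c.csv"),
                   q = c("x.csv", "y.csv", "z.csv"))
cases_affected <- data.frame(
  case_id     = c("case_0001", "case_0003", "case_0003"),
  path        = c("a.csv", "c.csv", "z.csv"),
  column      = c("p", "p", "q"),
  change_type = c("changed", "changed", "removed")
)
to_rerun <- affected_mask(mask, cases_affected)
to_rerun
```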
A new genproc_result covering only the affected cases.
Its case_ids are local to the subset (re-numbered starting at
case_0001); the link back to the original r0 is via the
matching rows of r0$reproducibility$mask_snapshot. If diff
reports no affected cases, the function returns NULL with a
message — there is nothing to re-run.
# Set up a tiny workspace with one tracked input file.
csv <- tempfile(fileext = ".csv")
write.csv(iris, csv, row.names = FALSE)
count_rows <- function(p) nrow(read.csv(p))

r0 <- genproc(count_rows, data.frame(p = csv))

# ... time passes, the upstream file is silently rewritten ...
write.csv(head(iris), csv, row.names = FALSE)
r1 <- genproc(count_rows, data.frame(p = csv))

d <- diff_inputs(r0, r1)
# d$cases_affected lists the case_ids whose inputs drifted.
refreshed <- rerun_affected(r0, d, f = count_rows)
refreshed$log
Filters the original mask of r0 down to the cases for which
success == FALSE and re-runs genproc() on that subset. Useful
when a transient external problem caused some cases to fail and
the user has fixed the cause: rather than re-running the whole
mask, only the failed cases are refreshed.
rerun_failed(
  r0,
  f,
  parallel = NULL,
  nonblocking = NULL,
  track_inputs = TRUE,
  input_cols = NULL,
  skip_input_cols = NULL
)
r0 |
A genproc_result object from the original run. |
f |
A function. Typically the same function passed to the
original genproc() call. |
parallel, nonblocking, track_inputs, input_cols, skip_input_cols
|
Forwarded to genproc(). |
A new genproc_result covering only the failed cases.
Its case_ids are local to the subset (re-numbered starting at
case_0001); the link back to the original r0 is via the
matching rows of r0$reproducibility$mask_snapshot. If r0
has zero failed cases, the function returns NULL with a
message — there is nothing to re-run.
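The subset-and-renumber step can be sketched as below. The sketch assumes that log rows follow mask order (as the parallel layer guarantees) and that case_ids are index-based. failed_cases() and its original_case_id column are hypothetical illustrations of the link back to r0. The documented function links back through r0$reproducibility$mask_snapshot instead.

```r
# Hypothetical sketch: keep failed cases, renumber them locally, remember origin.
failed_cases <- function(mask, run_log) {
  rows <- which(!run_log$success)          # assumes log rows follow mask order
  if (length(rows) == 0L) {
    message("No failed cases: nothing to re-run.")
    return(NULL)
  }
  data.frame(
    case_id = sprintf("case_%04d", seq_along(rows)),   # local re-numbering
    original_case_id = run_log$case_id[rows],          # hypothetical link back
    mask[rows, , drop = FALSE],
    row.names = NULL, stringsAsFactors = FALSE
  )
}

mask <- data.frame(x = 1:6)
run_log <- data.frame(case_id = sprintf("case_%04d", 1:6),
                      x = 1:6,
                      success = 1:6 %% 2 == 1)   # the even cases failed
retry <- failed_cases(mask, run_log)
retry
```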
rerun_affected(), errors(), summary.genproc_result()
r0 <- genproc(
  f = function(x) if (x %% 2 == 0) stop("even") else x,
  mask = data.frame(x = 1:6)
)
# 3 cases failed (the even ones). After fixing f, retry only those:
r1 <- rerun_failed(r0, f = function(x) abs(x))
r1$log
status() is a non-blocking S3 generic. On a genproc_result,
it returns one of:
"running" — the underlying future is not yet resolved.
"done" — the future has resolved successfully (the
result is ready to be collected via await()).
"error" — the wrapper future itself crashed. Call
await() to retrieve the error message.
status(x, ...)

## S3 method for class 'genproc_result'
status(x, ...)
x |
An object. Methods exist for genproc_result. |
... |
Unused, for future extensions. |
For a synchronous (blocking) result, status() simply
returns result$status ("done" or "error").
status() peeks at the resolved future via future::value()
inside a tryCatch. Because value() consumes the future, the
peek result is cached in a shared environment so that a
subsequent await() does not re-materialize it.
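The caching idea can be shown with an environment. Because environments have reference semantics, the cache is visible to every copy of the result object. make_cached_job() is a hypothetical sketch: compute() stands in for the one-shot future::value() call, and the unresolved ("running") state is not modelled.

```r
# Hypothetical sketch: a one-shot computation shared by peek() and await().
make_cached_job <- function(compute) {
  cache <- new.env(parent = emptyenv())
  cache$collected <- FALSE
  collect <- function() {
    if (!cache$collected) {
      cache$value <- compute()   # the one-shot step runs at most once
      cache$collected <- TRUE
    }
    cache$value
  }
  list(
    peek  = function() { collect(); "done" },   # status-like peek that fills the cache
    await = function() collect()                # returns the cached value
  )
}

n_calls <- 0
job <- make_cached_job(function() { n_calls <<- n_calls + 1; 42 })
job_copy <- job             # a copy of the list still shares the same environment
job$peek()
value <- job_copy$await()
n_calls
```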
A single character string: "running", "done", or
"error".
Produces a compact digest of the run: status, success rate, duration stats, and the top recurring error messages. Useful on runs with a lot of cases where the raw log is too noisy to eyeball.
## S3 method for class 'genproc_result'
summary(object, top_errors = 10L, ...)
object |
A genproc_result object. |
top_errors |
Integer. Maximum number of distinct error messages to include in the summary, ranked by occurrence. Default 10. |
... |
Unused, for future extensions. |
An object of class genproc_result_summary (a list)
with components:
Logical. FALSE if the run is
non-blocking and has not been collected via await().
In that case the other fields are NA.
Character, mirrors result$status.
Integer.
Integers.
Numeric in 0..1.
Numeric, wall-clock total.
List with total, mean, max,
and slowest_case_id. NULL if no per-case durations.
data.frame with columns error_message and
count, sorted by count descending. Trimmed to
top_errors rows.
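The top_errors ranking can be reproduced from a log's error_message column with table(). top_error_messages() is a hypothetical helper. Ties are broken alphabetically here, which is an assumption because the ordering of ties is not documented.

```r
# Hypothetical helper: count distinct error messages, most frequent first.
top_error_messages <- function(error_message, top_errors = 10L) {
  msgs <- error_message[!is.na(error_message)]
  if (length(msgs) == 0L) {
    return(data.frame(error_message = character(0), count = integer(0)))
  }
  counts <- table(msgs)
  out <- data.frame(error_message = names(counts),
                    count = as.integer(counts),
                    stringsAsFactors = FALSE)
  out <- out[order(-out$count, out$error_message), , drop = FALSE]
  rownames(out) <- NULL
  head(out, top_errors)
}

# Error messages as the x = 1:12 example in this section would log them.
x <- 1:12
err <- ifelse(x %% 2 == 0, "even",
              ifelse(x %% 3 == 0, "multiple of three", NA_character_))
top_error_messages(err)
```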
result <- genproc(
  f = function(x) {
    if (x %% 2 == 0) stop("even")
    if (x %% 3 == 0) stop("multiple of three")
    x
  },
  mask = data.frame(x = 1:12)
)
summary(result)