Title: | Lightweight, General-Purpose Data Analysis Pipelines |
---|---|
Description: | A lightweight yet powerful framework for building robust data analysis pipelines. With 'pipeflow', you initialize a pipeline with your dataset and construct workflows step by step by adding R functions. You can modify, remove, or insert steps and parameters at any stage, while 'pipeflow' ensures the pipeline's integrity. Overall, this package offers a beginner-friendly framework that simplifies and streamlines the development of data analysis pipelines by making them modular, intuitive, and adaptable. |
Authors: | Roman Pahl [aut, cre] |
Maintainer: | Roman Pahl <[email protected]> |
License: | GPL-3 |
Version: | 0.2.2 |
Built: | 2024-12-23 13:41:45 UTC |
Source: | CRAN |
A pipeline consists of a series of steps, which usually
are added one by one. Each step is made up of a function computing
something once the pipeline is run. This function can be an existing
R function (e.g. mean()
) or an anonymous/lambda function specifically
defined for the pipeline. One useful feature is that function
parameters can refer to results of earlier pipeline steps using the
syntax x = ~earlier_step_name
- see the Examples for more details.
pipe_add( pip, step, fun, params = list(), description = "", group = step, keepOut = FALSE )
pipe_add( pip, step, fun, params = list(), description = "", group = step, keepOut = FALSE )
pip |
|
step |
|
fun |
|
params |
|
description |
|
group |
|
keepOut |
|
returns the Pipeline
object invisibly
# Add steps with lambda functions p <- pipe_new("myPipe", data = 1) pipe_add(p, "s1", \(x = ~data) 2*x) # use input data pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y) try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4)) try(pipe_add(p, "calc_mean", mean)) # default value for x is missing pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p |> pipe_run() |> pipe_get_out("calc_mean") # Step description p <- pipe_new("myPipe", data = 1:10) pipe_add(p, "s1", \(x = ~data) 2*x, description = "multiply by 2") print(p, verbose = TRUE) # print all columns including description # Group output p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4)) pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep") pipe_add(p, "prep_y", \(data = ~data) (data$y)^2, group = "prep") pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y) p |> pipe_run() |> pipe_collect_out(all = TRUE)
# Add steps with lambda functions p <- pipe_new("myPipe", data = 1) pipe_add(p, "s1", \(x = ~data) 2*x) # use input data pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y) try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4)) try(pipe_add(p, "calc_mean", mean)) # default value for x is missing pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p |> pipe_run() |> pipe_get_out("calc_mean") # Step description p <- pipe_new("myPipe", data = 1:10) pipe_add(p, "s1", \(x = ~data) 2*x, description = "multiply by 2") print(p, verbose = TRUE) # print all columns including description # Group output p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4)) pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep") pipe_add(p, "prep_y", \(data = ~data) (data$y)^2, group = "prep") pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y) p |> pipe_run() |> pipe_collect_out(all = TRUE)
When appending, pipeflow
takes care of potential name
clashes with respect to step names and dependencies, that is, if
needed, it will automatically adapt step names and dependencies to
make sure they are unique in the merged pipeline.
pipe_append(pip, p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
pipe_append(pip, p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
pip |
|
p |
|
outAsIn |
|
tryAutofixNames |
|
sep |
|
returns new combined Pipeline
object.
# Append pipeline p1 <- pipe_new("pipe1") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_new("pipe2") pipe_add(p2, "step2", \(y = 1) y) p1 |> pipe_append(p2) # Append pipeline with potential name clashes p3 <- pipe_new("pipe3") pipe_add(p3, "step1", \(z = 1) z) p1 |> pipe_append(p2) |> pipe_append(p3) # Use output of first pipeline as input for second pipeline p1 <- pipe_new("pipe1", data = 8) p2 <- pipe_new("pipe2") pipe_add(p1, "square", \(x = ~data) x^2) pipe_add(p2, "log2", \(x = ~data) log2(x)) p12 <- p1 |> pipe_append(p2, outAsIn = TRUE) p12 |> pipe_run() |> pipe_get_out("log2") p12 # Custom name separator for adapted step names p1 |> pipe_append(p2, sep = "___")
# Append pipeline p1 <- pipe_new("pipe1") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_new("pipe2") pipe_add(p2, "step2", \(y = 1) y) p1 |> pipe_append(p2) # Append pipeline with potential name clashes p3 <- pipe_new("pipe3") pipe_add(p3, "step1", \(z = 1) z) p1 |> pipe_append(p2) |> pipe_append(p3) # Use output of first pipeline as input for second pipeline p1 <- pipe_new("pipe1", data = 8) p2 <- pipe_new("pipe2") pipe_add(p1, "square", \(x = ~data) x^2) pipe_add(p2, "log2", \(x = ~data) log2(x)) p12 <- p1 |> pipe_append(p2, outAsIn = TRUE) p12 |> pipe_run() |> pipe_get_out("log2") p12 # Custom name separator for adapted step names p1 |> pipe_append(p2, sep = "___")
Appends string to all step names and takes care of updating step dependencies accordingly.
pipe_append_to_step_names(pip, postfix, sep = ".")
pipe_append_to_step_names(pip, postfix, sep = ".")
pip |
|
postfix |
|
sep |
|
returns the Pipeline
object invisibly
p <- pipe_new("pipe") pipe_add(p, "step1", \(x = 1) x) pipe_add(p, "step2", \(y = 1) y) pipe_append_to_step_names(p, "new") p pipe_append_to_step_names(p, "foo", sep = "__") p
p <- pipe_new("pipe") pipe_add(p, "step1", \(x = 1) x) pipe_add(p, "step2", \(y = 1) y) pipe_append_to_step_names(p, "new") p pipe_append_to_step_names(p, "foo", sep = "__") p
Creates a copy of a pipeline object.
pipe_clone(pip, deep = FALSE)
pipe_clone(pip, deep = FALSE)
pip |
|
deep |
|
returns the copied Pipeline
object
p1 <- pipe_new("pipe") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_clone(p1) pipe_add(p2, "step2", \(y = 1) y) p1 p2
p1 <- pipe_new("pipe") pipe_add(p1, "step1", \(x = 1) x) p2 <- pipe_clone(p1) pipe_add(p2, "step2", \(y = 1) y) p1 p2
Collects output afer pipeline run, by default, from all
steps for which keepOut
was set to TRUE
when steps were added
(see pipe_add()
). The output is grouped by the group names (see
group
parameter in pipe_add()
),
which by default are set identical to the step names.
pipe_collect_out(pip, groupBy = "group", all = FALSE)
pipe_collect_out(pip, groupBy = "group", all = FALSE)
pip |
|
groupBy |
|
all |
|
list
containing the output, named after the groups, which,
by default, are the steps.
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2) pipe_add(p, "step2", \(x = ~step1) x + 2, keepOut = TRUE) pipe_run(p) pipe_collect_out(p) pipe_collect_out(p, all = TRUE) |> str() # Grouped output p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2, group = "add") pipe_add(p, "step2", \(x = ~step1, y = 2) x + y, group = "add") pipe_add(p, "step3", \(x = ~data) x * 3, group = "mult") pipe_add(p, "step4", \(x = ~data, y = 2) x * y, group = "mult") p pipe_run(p) pipe_collect_out(p, all = TRUE) |> str() # Grouped by state pipe_set_params(p, list(y = 5)) p pipe_collect_out(p, groupBy = "state", all = TRUE) |> str()
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2) pipe_add(p, "step2", \(x = ~step1) x + 2, keepOut = TRUE) pipe_run(p) pipe_collect_out(p) pipe_collect_out(p, all = TRUE) |> str() # Grouped output p <- pipe_new("pipe", data = 1:2) pipe_add(p, "step1", \(x = ~data) x + 2, group = "add") pipe_add(p, "step2", \(x = ~step1, y = 2) x + y, group = "add") pipe_add(p, "step3", \(x = ~data) x * 3, group = "mult") pipe_add(p, "step4", \(x = ~data, y = 2) x * y, group = "mult") p pipe_run(p) pipe_collect_out(p, all = TRUE) |> str() # Grouped by state pipe_set_params(p, list(y = 5)) p pipe_collect_out(p, groupBy = "state", all = TRUE) |> str()
Discard all steps that match a given pattern
.
pipe_discard_steps(pip, pattern, recursive = FALSE, fixed = TRUE, ...)
pipe_discard_steps(pip, pattern, recursive = FALSE, fixed = TRUE, ...)
pip |
|
pattern |
|
recursive |
|
fixed |
|
... |
further arguments passed to |
the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~add1) x + 2) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p pipe_discard_steps(p, "mult") p # Re-add steps pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(pipe_discard_steps(p, "add1")) # ... unless we enforce to remove its downstream dependencies as well pipe_discard_steps(p, "add1", recursive = TRUE) p # Trying to discard non-existent steps is just ignored pipe_discard_steps(p, "non-existent")
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~add1) x + 2) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p pipe_discard_steps(p, "mult") p # Re-add steps pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(pipe_discard_steps(p, "add1")) # ... unless we enforce to remove its downstream dependencies as well pipe_discard_steps(p, "add1", recursive = TRUE) p # Trying to discard non-existent steps is just ignored pipe_discard_steps(p, "non-existent")
Get the data set for the pipeline
pipe_get_data(pip)
pipe_get_data(pip)
pip |
|
the output defined in the data
step, which by default is
the first step of the pipeline
p <- pipe_new("pipe", data = 1:2) pipe_get_data(p) pipe_set_data(p, 3:4) pipe_get_data(p)
p <- pipe_new("pipe", data = 1:2) pipe_get_data(p) pipe_set_data(p, 3:4) pipe_get_data(p)
Get step dependencies
pipe_get_depends(pip) pipe_get_depends_down(pip, step, recursive = TRUE) pipe_get_depends_up(pip, step, recursive = TRUE)
pipe_get_depends(pip) pipe_get_depends_down(pip, step, recursive = TRUE) pipe_get_depends_up(pip, step, recursive = TRUE)
pip |
|
step |
|
recursive |
|
pipe_get_depends
: named list of dependencies for each step
pipe_get_depends_down
: list of downstream dependencies
pipe_get_depends_up
: list of downstream dependencies
pipe_get_depends
: get all dependencies for all steps defined
in the pipeline
pipe_get_depends_down
: get all downstream dependencies of a
given step, by default descending recursively.
pipe_get_depends_up
: get all upstream dependencies of a
given step, by default descending recursively.
# pipe_get_depends p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_get_depends(p) # pipe_get_depends_down p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_down(p, "add1") pipe_get_depends_down(p, "add1", recursive = FALSE) # pipe_get_depends_up p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_up(p, "mult4") pipe_get_depends_up(p, "mult4", recursive = FALSE)
# pipe_get_depends p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_get_depends(p) # pipe_get_depends_down p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_down(p, "add1") pipe_get_depends_down(p, "add1", recursive = FALSE) # pipe_get_depends_up p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_add(p, "mult3", \(x = ~add1) x * 3) pipe_add(p, "mult4", \(x = ~add2) x * 4) pipe_get_depends_up(p, "mult4") pipe_get_depends_up(p, "mult4", recursive = FALSE)
Get the pipeline as a graph with nodes and edges.
pipe_get_graph(pip, groups = NULL)
pipe_get_graph(pip, groups = NULL)
pip |
|
groups |
|
list with two data frames, one for nodes and one for edges
ready to be used with the visNetwork::visNetwork()
function of the
visNetwork package.
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = graph) }
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = graph) }
Get output of given step
pipe_get_out(pip, step)
pipe_get_out(pip, step)
pip |
|
step |
|
the output at the given step.
pipe_collect_out()
to collect output of multiple steps.
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2")
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2")
Retrieves unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps.
pipe_get_params(pip, ignoreHidden = TRUE) pipe_get_params_at_step(pip, step, ignoreHidden = TRUE) pipe_get_params_unique(pip, ignoreHidden = TRUE) pipe_get_params_unique_json(pip, ignoreHidden = TRUE)
pipe_get_params(pip, ignoreHidden = TRUE) pipe_get_params_at_step(pip, step, ignoreHidden = TRUE) pipe_get_params_unique(pip, ignoreHidden = TRUE) pipe_get_params_unique_json(pip, ignoreHidden = TRUE)
pip |
|
ignoreHidden |
|
step |
|
pipe_get_params
: list of parameters, sorted and named by step -
steps with no parameters are filtered out
pipe_get_params_at_step
: list of parameters at given step
pipe_get_params_unique
: list of parameters where each parameter
is only listed once. The values of the parameters will be the values
of the first step where the parameters were defined, respectively.
get_params_unique_json
: flat unnamed json list of unique parameters
# pipe_get_params p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "add3", \() 1 + 2) pipe_get_params(p, ) |> str() pipe_get_params(p, ignoreHidden = FALSE) |> str() # pipe_get_params_at_step pipe_get_params_at_step(p, "add2") pipe_get_params_at_step(p, "add2", ignoreHidden = FALSE) pipe_get_params_at_step(p, "add3") # pipe_get_params_unique p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "mult1", \(x = 4, y = 5, .z = 6, b = ~add2) x * y * b) pipe_get_params_unique(p) pipe_get_params_unique(p, ignoreHidden = FALSE) # get_params_unique_json pipe_get_params_unique_json(p) pipe_get_params_unique_json(p, ignoreHidden = FALSE)
# pipe_get_params p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "add3", \() 1 + 2) pipe_get_params(p, ) |> str() pipe_get_params(p, ignoreHidden = FALSE) |> str() # pipe_get_params_at_step pipe_get_params_at_step(p, "add2") pipe_get_params_at_step(p, "add2", ignoreHidden = FALSE) pipe_get_params_at_step(p, "add3") # pipe_get_params_unique p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z) pipe_add(p, "mult1", \(x = 4, y = 5, .z = 6, b = ~add2) x * y * b) pipe_get_params_unique(p) pipe_get_params_unique(p, ignoreHidden = FALSE) # get_params_unique_json pipe_get_params_unique_json(p) pipe_get_params_unique_json(p, ignoreHidden = FALSE)
Get step information
pipe_get_step(pip, step) pipe_get_step_names(pip) pipe_get_step_number(pip, step) pipe_has_step(pip, step)
pipe_get_step(pip, step) pipe_get_step_names(pip) pipe_get_step_number(pip, step) pipe_has_step(pip, step)
pip |
|
step |
|
pipe_get_step
: data.table
row containing the step
pipe_get_step_names
: character
vector of step names
pipe_get_step_number
: the step number in the pipeline
pipe_get_step_number
: whether step exists
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, z = ~add1) x + y + z) pipe_run(p) # pipe_get_step_names pipe_get_step_names(p) # get_step_number pipe_get_step_number(p, "add1") pipe_get_step_number(p, "add2") # pipe_has_step pipe_has_step(p, "add1") pipe_has_step(p, "foo") # pipe_get_step add1 <- pipe_get_step(p, "add1") add1 add1[["params"]] add1[["fun"]] try(p$get_step("foo")) # error: step 'foo' does not exist
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = 2, z = ~add1) x + y + z) pipe_run(p) # pipe_get_step_names pipe_get_step_names(p) # get_step_number pipe_get_step_number(p, "add1") pipe_get_step_number(p, "add2") # pipe_has_step pipe_has_step(p, "add1") pipe_has_step(p, "foo") # pipe_get_step add1 <- pipe_get_step(p, "add1") add1 add1[["params"]] add1[["fun"]] try(p$get_step("foo")) # error: step 'foo' does not exist
Insert step
pipe_insert_after(pip, afterStep, step, ...) pipe_insert_before(pip, beforeStep, step, ...)
pipe_insert_after(pip, afterStep, step, ...) pipe_insert_before(pip, beforeStep, step, ...)
pip |
|
afterStep |
|
step |
|
... |
further arguments passed to |
beforeStep |
|
returns the Pipeline
object invisibly
pipe_insert_after
: insert step after a certain step of the pipeline
pipe_insert_before
: insert step before a certain step of the pipeline
# pipe_insert_after p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(x = ~f1) x) pipe_insert_after(p, "f1", step = "after_f1", \(x = ~f1) x) p # insert_before pipe_insert_before(p, "f2", step = "before_f2", \(x = ~f1) 2 * x) p
# pipe_insert_after p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(x = ~f1) x) pipe_insert_after(p, "f1", step = "after_f1", \(x = ~f1) x) p # insert_before pipe_insert_before(p, "f2", step = "before_f2", \(x = ~f1) 2 * x) p
Length of the pipeline
pipe_length(pip)
pipe_length(pip)
pip |
|
numeric
length of pipeline, that is, the total number of steps
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_length(p)
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_length(p)
Locking a step means that both its parameters and its
output (given it has output) are locked such that neither
setting new pipeline parameters nor future pipeline runs can change
the current parameter and output content. To unlock a locked step,
use pipe_unlock_step()
.
pipe_lock_step(pip, step) pipe_unlock_step(pip, step)
pipe_lock_step(pip, step) pipe_unlock_step(pip, step)
pip |
|
step |
|
the Pipeline
object invisibly
# pipe_lock_step p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = 1, data = ~data) x + data) pipe_add(p, "add2", \(x = 1, data = ~data) x + data) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") pipe_lock_step(p, "add1") pipe_set_data(p, 3) pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") # pipe_unlock_step pipe_unlock_step(p, "add1") pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1")
# pipe_lock_step p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = 1, data = ~data) x + data) pipe_add(p, "add2", \(x = 1, data = ~data) x + data) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") pipe_lock_step(p, "add1") pipe_set_data(p, 3) pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1") pipe_get_out(p, "add2") # pipe_unlock_step pipe_unlock_step(p, "add1") pipe_set_params(p, list(x = 3)) pipe_run(p) pipe_get_out(p, "add1")
A new pipeline is always initialized with one 'data' step, which basically is a function returning the data.
pipe_new(name, data = NULL, logger = NULL)
pipe_new(name, data = NULL, logger = NULL)
name |
the name of the Pipeline |
data |
optional data used at the start of the pipeline. The
data also can be set later using the |
logger |
custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:
The Note that with the default logger, the log layout can be altered
any time via |
returns the Pipeline
object invisibly
data <- data.frame(x = 1:2, y = 3:4) p <- pipe_new("myPipe", data = data) p |> pipe_run() |> pipe_get_out("data") # Setting data later p <- pipe_new("myPipe") pipe_get_data(p) p <- pipe_set_data(p, data) pipe_get_data(p) p |> pipe_run() |> pipe_get_out("data") # Initialize with custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- pipe_new("myPipe", data = data, logger = my_logger) p |> pipe_run() |> pipe_get_out("data")
data <- data.frame(x = 1:2, y = 3:4) p <- pipe_new("myPipe", data = data) p |> pipe_run() |> pipe_get_out("data") # Setting data later p <- pipe_new("myPipe") pipe_get_data(p) p <- pipe_set_data(p, data) pipe_get_data(p) p |> pipe_run() |> pipe_get_out("data") # Initialize with custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- pipe_new("myPipe", data = data, logger = my_logger) p |> pipe_run() |> pipe_get_out("data")
Use this function to drop steps from the end of the pipeline.
pipe_pop_step(pip) pipe_pop_steps_after(pip, step) pipe_pop_steps_from(pip, step)
pipe_pop_step(pip) pipe_pop_steps_after(pip, step) pipe_pop_steps_from(pip, step)
pip |
|
step |
|
string
the name of the step that was removed
pipe_pop_step
: drop last step from the pipeline
pipe_pop_steps_after
: drop all steps after given steps
pipe_pop_steps_from
: drop all steps from and including given steps
# pipe_pop_step p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_pop_step(p) p # pipe_pop_steps_after pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_after(p, "f1") p # pipe_pop_steps_from pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_from(p, "f1") p
# pipe_pop_step p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) p pipe_pop_step(p) p # pipe_pop_steps_after pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_after(p, "f1") p # pipe_pop_steps_from pipe_add(p, "f2", \(y = 1) y) pipe_add(p, "f3", \(z = 1) z) p pipe_pop_steps_from(p, "f1") p
Print the pipeline as a table
pipe_print(pip, verbose = FALSE)
pipe_print(pip, verbose = FALSE)
pip |
|
verbose |
|
the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) pipe_print(p) pipe_print(p, verbose = TRUE) # Also works with standard print function print(p) print(p, verbose = TRUE)
p <- pipe_new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) pipe_print(p) pipe_print(p, verbose = TRUE) # Also works with standard print function print(p) print(p, verbose = TRUE)
Can be used to remove any given step.
If other steps depend on the step to be removed, an error is
given and the removal is blocked, unless recursive
was set to TRUE
.
pipe_remove_step(pip, step, recursive = FALSE)
pipe_remove_step(pip, step, recursive = FALSE)
pip |
|
step |
|
recursive |
|
the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = 1, y = ~add2) x * y) p pipe_remove_step(p, "mult1") p try(pipe_remove_step(p, "add1")) pipe_remove_step(p, "add1", recursive = TRUE) p
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) pipe_add(p, "mult1", \(x = 1, y = ~add2) x * y) p pipe_remove_step(p, "mult1") p try(pipe_remove_step(p, "add1")) pipe_remove_step(p, "add1", recursive = TRUE) p
Safely rename a step in the pipeline. If new step name would result in a name clash, an error is given.
pipe_rename_step(pip, from, to)
pipe_rename_step(pip, from, to)
pip |
|
from |
|
to |
|
the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) p try(pipe_rename_step(p, from = "add1", to = "add2")) pipe_rename_step(p, from = "add1", to = "first_add") p
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "add1", \(data = ~data, x = 1) x + data) pipe_add(p, "add2", \(x = 1, y = ~add1) x + y) p try(pipe_rename_step(p, from = "add1", to = "add2")) pipe_rename_step(p, from = "add1", to = "first_add") p
Replaces an existing pipeline step.
pipe_replace_step( pip, step, fun, params = list(), description = "", group = step, keepOut = FALSE )
pipe_replace_step( pip, step, fun, params = list(), description = "", group = step, keepOut = FALSE )
pip |
|
step |
|
fun |
|
params |
|
description |
|
group |
|
keepOut |
|
returns the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~data, y = 2) x + y) pipe_add(p, "mult", \(x = 1, y = 2) x * y, keepOut = TRUE) pipe_run(p) |> pipe_collect_out() pipe_replace_step(p, "mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) pipe_run(p) |> pipe_collect_out() try(pipe_replace_step(p, "foo", \(x = 1) x)) # step 'foo' does not exist
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~data, y = 2) x + y) pipe_add(p, "mult", \(x = 1, y = 2) x * y, keepOut = TRUE) pipe_run(p) |> pipe_collect_out() pipe_replace_step(p, "mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) pipe_run(p) |> pipe_collect_out() try(pipe_replace_step(p, "foo", \(x = 1) x)) # step 'foo' does not exist
Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.
pipe_reset(pip)
pipe_reset(pip)
pip |
|
returns the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) pipe_run(p, ) p pipe_reset(p) p
p <- pipe_new("pipe", data = 1:2) pipe_add(p, "f1", \(x = 1) x) pipe_add(p, "f2", \(y = 1) y) pipe_run(p, ) p pipe_reset(p) p
Runs all new and/or outdated pipeline steps.
pipe_run( pip, force = FALSE, recursive = TRUE, cleanUnkept = FALSE, progress = NULL, showLog = TRUE )
pipe_run( pip, force = FALSE, recursive = TRUE, cleanUnkept = FALSE, progress = NULL, showLog = TRUE )
pip |
|
force |
|
recursive |
|
cleanUnkept |
|
progress |
|
showLog |
|
returns the Pipeline
object invisibly
# Simple pipeline p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p |> pipe_run() |> pipe_collect_out() pipe_set_params(p, list(z = 4)) # outdates steps add2 and final p p |> pipe_run() |> pipe_collect_out() pipe_run(p, cleanUnkept = TRUE) p # Recursive pipeline (for advanced users) p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "new_pipe", \(x = ~add1) { p2 <- pipe_new("new_pipe", data = x) pipe_add(p2, "add1", \(x = ~data) x + 1) pipe_add(p2, "add2", \(x = ~add1) x + 2, keepOut = TRUE) } ) p |> pipe_run() |> pipe_collect_out() # Run pipeline with progress bar p <- pipe_new("pipe", data = 1) pipe_add(p, "first step", \() Sys.sleep(0.5)) pipe_add(p, "second step", \() Sys.sleep(0.5)) pipe_add(p, "last step", \() Sys.sleep(0.5)) pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3) fprogress <- function(value, detail) { setTxtProgressBar(pb, value) } pipe_run(p, progress = fprogress, showLog = FALSE)
# Simple pipeline p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p |> pipe_run() |> pipe_collect_out() pipe_set_params(p, list(z = 4)) # outdates steps add2 and final p p |> pipe_run() |> pipe_collect_out() pipe_run(p, cleanUnkept = TRUE) p # Recursive pipeline (for advanced users) p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "new_pipe", \(x = ~add1) { p2 <- pipe_new("new_pipe", data = x) pipe_add(p2, "add1", \(x = ~data) x + 1) pipe_add(p2, "add2", \(x = ~add1) x + 2, keepOut = TRUE) } ) p |> pipe_run() |> pipe_collect_out() # Run pipeline with progress bar p <- pipe_new("pipe", data = 1) pipe_add(p, "first step", \() Sys.sleep(0.5)) pipe_add(p, "second step", \() Sys.sleep(0.5)) pipe_add(p, "last step", \() Sys.sleep(0.5)) pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3) fprogress <- function(value, detail) { setTxtProgressBar(pb, value) } pipe_run(p, progress = fprogress, showLog = FALSE)
Run given pipeline step possibly together with upstream and/or downstream dependencies.
pipe_run_step( pip, step, upstream = TRUE, downstream = FALSE, cleanUnkept = FALSE )
pipe_run_step( pip, step, upstream = TRUE, downstream = FALSE, cleanUnkept = FALSE )
pip |
|
step |
|
upstream |
|
downstream |
|
cleanUnkept |
|
returns the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y) pipe_run_step(p, "add2") pipe_run_step(p, "add2", downstream = TRUE) pipe_run_step(p, "mult", upstream = TRUE)
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y) pipe_add(p, "add2", \(x = ~add1, z = 2) x + z) pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y) pipe_run_step(p, "add2") pipe_run_step(p, "add2", downstream = TRUE) pipe_run_step(p, "mult", upstream = TRUE)
Set data in first step of pipeline.
pipe_set_data(pip, data)
pipe_set_data(pip, data)
pip |
|
data |
initial data set. |
returns the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p |> pipe_run() |> pipe_collect_out() pipe_set_data(p, 3) p |> pipe_run() |> pipe_collect_out()
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p |> pipe_run() |> pipe_collect_out() pipe_set_data(p, 3) p |> pipe_run() |> pipe_collect_out()
This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.
pipe_set_data_split( pip, dataList, toStep = character(), groupBySplit = TRUE, sep = "." )
pipe_set_data_split( pip, dataList, toStep = character(), groupBySplit = TRUE, sep = "." )
pip |
|
dataList |
|
toStep |
|
groupBySplit |
|
sep |
|
new combined Pipeline
with each sub-pipeline having set
one of the data sets.
# Split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipe_set_data_split(p, dataList) p p |> pipe_run() |> pipe_collect_out() |> str() # Don't group output by split p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipe_set_data_split(p, dataList, groupBySplit = FALSE) p p |> pipe_run() |> pipe_collect_out() |> str() # Split up to certain step p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y) pipe_add(p, "average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE) p pipe_get_depends(p)[["average_result"]] pipe_set_data_split(p, dataList, toStep = "mult") p pipe_get_depends(p)[["average_result"]] p |> pipe_run() |> pipe_collect_out() |> str()
# Split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipe_set_data_split(p, dataList) p p |> pipe_run() |> pipe_collect_out() |> str() # Don't group output by split p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipe_set_data_split(p, dataList, groupBySplit = FALSE) p p |> pipe_run() |> pipe_collect_out() |> str() # Split up to certain step p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y) pipe_add(p, "average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE) p pipe_get_depends(p)[["average_result"]] pipe_set_data_split(p, dataList, toStep = "mult") p pipe_get_depends(p)[["average_result"]] p |> pipe_run() |> pipe_collect_out() |> str()
Change the keepOut
flag at a given pipeline step,
which determines whether the output of that step is collected
when calling pipe_collect_out()
after the pipeline was run. See column
keepOut' when printing a pipeline to view the status.
pipe_set_keep_out(pip, step, keepOut = TRUE)
pipe_set_keep_out(pip, step, keepOut = TRUE)
pip |
|
step |
|
keepOut |
|
the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) pipe_add(p, "add2", \(x = ~data, y = 2) x + y) pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y) p |> pipe_run() |> pipe_collect_out() pipe_set_keep_out(p, "add1", keepOut = FALSE) pipe_set_keep_out(p, "mult", keepOut = TRUE) p |> pipe_run() |> pipe_collect_out()
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) pipe_add(p, "add2", \(x = ~data, y = 2) x + y) pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y) p |> pipe_run() |> pipe_collect_out() pipe_set_keep_out(p, "add1", keepOut = FALSE) pipe_set_keep_out(p, "mult", keepOut = TRUE) p |> pipe_run() |> pipe_collect_out()
Set unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.
pipe_set_params(pip, params, warnUndefined = TRUE)
pipe_set_params(pip, params, warnUndefined = TRUE)
pip |
|
params |
|
warnUndefined |
|
returns the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2) x + y) pipe_add(p, "add2", \(x = ~data, y = 3) x + y) pipe_add(p, "mult", \(x = 4, z = 5) x * z) pipe_get_params(p) pipe_set_params(p, params = list(x = 3, y = 3)) pipe_get_params(p) pipe_set_params(p, params = list(x = 5, z = 3)) pipe_get_params(p) suppressWarnings( pipe_set_params(p, list(foo = 3)) # gives warning as 'foo' is undefined ) pipe_set_params(p, list(foo = 3), warnUndefined = FALSE)
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2) x + y) pipe_add(p, "add2", \(x = ~data, y = 3) x + y) pipe_add(p, "mult", \(x = 4, z = 5) x * z) pipe_get_params(p) pipe_set_params(p, params = list(x = 3, y = 3)) pipe_get_params(p) pipe_set_params(p, params = list(x = 5, z = 3)) pipe_get_params(p) suppressWarnings( pipe_set_params(p, list(foo = 3)) # gives warning as 'foo' is undefined ) pipe_set_params(p, list(foo = 3), warnUndefined = FALSE)
Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps. If one or more parameters don't exist, an error is given.
pipe_set_params_at_step(pip, step, params)
pipe_set_params_at_step(pip, step, params)
pip |
|
step |
|
params |
|
returns the Pipeline
object invisibly
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2, z = 3) x + y) pipe_set_params_at_step(p, step = "add1", params = list(y = 5, z = 6)) pipe_get_params(p) try( pipe_set_params_at_step(p, step = "add1", params = list(foo = 3)) )
p <- pipe_new("pipe", data = 1) pipe_add(p, "add1", \(x = ~data, y = 2, z = 3) x + y) pipe_set_params_at_step(p, step = "add1", params = list(y = 5, z = 6)) pipe_get_params(p) try( pipe_set_params_at_step(p, step = "add1", params = list(foo = 3)) )
Splits pipeline into its independent parts. This can be useful, for example, to split-up the pipeline in order to run each part in parallel.
pipe_split(pip)
pipe_split(pip)
pip |
|
list of Pipeline
objects
# Example for two independent calculation paths p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = ~data) x) pipe_add(p, "f2", \(x = 1) x) pipe_add(p, "f3", \(x = ~f1) x) pipe_add(p, "f4", \(x = ~f2) x) pipe_split(p) # Example of split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipes <- pipe_set_data_split(p, dataList) |> pipe_split() pipes
# Example for two independent calculation paths p <- pipe_new("pipe", data = 1) pipe_add(p, "f1", \(x = ~data) x) pipe_add(p, "f2", \(x = 1) x) pipe_add(p, "f3", \(x = ~f1) x) pipe_add(p, "f4", \(x = ~f2) x) pipe_split(p) # Example of split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- pipe_new("pipe") pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE) pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipes <- pipe_set_data_split(p, dataList) |> pipe_split() pipes
This class implements an analysis pipeline. A pipeline consists of a sequence of analysis steps, which can be added one by one. Each added step may or may not depend on one or more previous steps. The pipeline keeps track of the dependencies among these steps and will ensure that all dependencies are met on creation of the pipeline, that is, before the the pipeline is run. Once the pipeline is run, the output is stored in the pipeline along with each step and can be accessed later. Different pipelines can be bound together while preserving all dependencies within each pipeline.
name
string
name of the pipeline
pipeline
data.table
the pipeline where each row represents one step.
new()
constructor
Pipeline$new(name, data = NULL, logger = NULL)
name
the name of the Pipeline
data
optional data used at the start of the pipeline. The
data also can be set later using the set_data
function.
logger
custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:
function(level, msg, ...) { your custom logging code here }
The level
argument is a string and will be one of info
, warn
,
or error
. The msg
argument is a string containing the message
to be logged. The ...
argument is a list of named parameters,
which can be used to add additional information to the log message.
Currently, this is only used to add the context in case of a step
giving a warning or error.
Note that with the default logger, the log layout can be altered
any time via set_log_layout()
.
returns the Pipeline
object invisibly
p <- Pipeline$new("myPipe", data = data.frame(x = 1:8)) p # Passing custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- Pipeline$new("myPipe", logger = my_logger)
add()
Add pipeline step
Pipeline$add( step, fun, params = list(), description = "", group = step, keepOut = FALSE )
step
string
the name of the step. Each step name must
be unique.
fun
function
or name of the function to be applied at
the step. Both existing and anonymous/lambda functions can be used.
All function parameters must have default values. If a parameter
is missing a default value in the function signature, alternatively,
it can be set via the params
argument (see Examples section with
mean()
function).
params
list
list of parameters to set or overwrite
parameters of the passed function.
description
string
optional description of the step
group
string
output collected after pipeline execution
(see function collect_out
) is grouped by the defined group
names. By default, this is the name of the step, which comes in
handy when the pipeline is copy-appended multiple times to keep
the results of the same function/step grouped at one place.
keepOut
logical
if FALSE
(default) the output of the
step is not collected when calling collect_out
after the pipeline
run. This option is used to only keep the results that matter
and skip intermediate results that are not needed. See also
function collect_out
for more details.
returns the Pipeline
object invisibly
# Add steps with lambda functions p <- Pipeline$new("myPipe", data = 1) p$add("s1", \(x = ~data) 2*x) # use input data p$add("s2", \(x = ~data, y = ~s1) x * y) try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4)) p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p$run()$get_out("calc_mean") # Step description p <- Pipeline$new("myPipe", data = 1:10) p$add("s1", \(x = ~data) 2*x, description = "multiply by 2") print(p) print(p, verbose = TRUE) # print all columns # Group output p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5)) p$add("prep_x", \(data = ~data) data$x, group = "prep") p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep") p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y) p$run()$collect_out(all = TRUE)
append()
Append another pipeline
When appending, pipeflow
takes care of potential name clashes with
respect to step names and dependencies, that is, if needed, it will
automatically adapt step names and dependencies to make sure they
are unique in the merged pipeline.
Pipeline$append(p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
p
Pipeline
object to be appended.
outAsIn
logical
if TRUE
, output of first pipeline is used
as input for the second pipeline.
tryAutofixNames
logical
if TRUE
, name clashes are tried
to be automatically resolved by appending the 2nd pipeline's name.
Only set to FALSE
, if you know what you are doing.
sep
string
separator used when auto-resolving step names
returns new combined Pipeline
.
# Append pipeline p1 <- Pipeline$new("pipe1") p1$add("step1", \(x = 1) x) p2 <- Pipeline$new("pipe2") p2$add("step2", \(y = 1) y) p1$append(p2) # Append pipeline with potential name clashes p3 <- Pipeline$new("pipe3") p3$add("step1", \(z = 1) z) p1$append(p2)$append(p3) # Use output of first pipeline as input for second pipeline p1 <- Pipeline$new("pipe1", data = 8) p2 <- Pipeline$new("pipe2") p1$add("square", \(x = ~data) x^2) p2$add("log2", \(x = ~data) log2(x)) p12 <- p1$append(p2, outAsIn = TRUE) p12$run()$get_out("log2") p12 # Custom name separator p1$append(p2, sep = "___")
append_to_step_names()
Appends string to all step names and takes care of updating step dependencies accordingly.
Pipeline$append_to_step_names(postfix, sep = ".")
postfix
string
to be appended to each step name.
sep
string
separator between step name and postfix.
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe") p$add("step1", \(x = 1) x) p$add("step2", \(y = 1) y) p$append_to_step_names("new") p p$append_to_step_names("foo", sep = "__") p
collect_out()
Collect output afer pipeline run, by default, from all
steps for which keepOut
was set to TRUE
. The output is grouped
by the group names (see group
parameter in function add
),
which by default are set identical to the step names.
Pipeline$collect_out(groupBy = "group", all = FALSE)
groupBy
string
column of pipeline by which to group the
output.
all
logical
if TRUE
all output is collected
regardless of the keepOut
flag. This can be useful for debugging.
list
containing the output, named after the groups, which,
by default, are the steps.
p <- Pipeline$new("pipe", data = 1:2) p$add("step1", \(x = ~data) x + 2) p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE) p$run() p$collect_out() p$collect_out(all = TRUE) |> str() # Grouped output p <- Pipeline$new("pipe", data = 1:2) p$add("step1", \(x = ~data) x + 2, group = "add") p$add("step2", \(x = ~step1, y = 2) x + y, group = "add") p$add("step3", \(x = ~data) x * 3, group = "mult") p$add("step4", \(x = ~data, y = 2) x * y, group = "mult") p p$run() p$collect_out(all = TRUE) |> str() # Grouped by state p$set_params(list(y = 5)) p p$collect_out(groupBy = "state", all = TRUE) |> str()
discard_steps()
Discard all steps that match a given pattern
.
Pipeline$discard_steps(pattern, recursive = FALSE, fixed = TRUE, ...)
pattern
string
containing a regular expression (or
character string for fixed = TRUE
) to be matched.
recursive
logical
if TRUE
the step is removed together
with all its downstream dependencies.
fixed
logical
If TRUE
, pattern
is a string to
be matched as is. Overrides all conflicting arguments.
...
further arguments passed to grep()
.
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~add1) x + 2) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p p$discard_steps("mult") p # Re-add steps p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(p$discard_steps("add1")) # ... unless we enforce to remove its downstream dependencies as well p$discard_steps("add1", recursive = TRUE) # this works p # Trying to discard non-existent steps is just ignored p$discard_steps("non-existent")
get_data()
Get data
Pipeline$get_data()
the output defined in the data
step, which by default is
the first step of the pipeline
p <- Pipeline$new("pipe", data = 1:2) p$get_data() p$set_data(3:4) p$get_data()
get_depends()
Get all dependencies defined in the pipeline
Pipeline$get_depends()
named list
of dependencies for each step
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$get_depends()
get_depends_down()
Get all downstream dependencies of given step, by default descending recursively.
Pipeline$get_depends_down(step, recursive = TRUE)
step
string
name of step
recursive
logical
if TRUE
, dependencies of dependencies
are also returned.
list
of downstream dependencies
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p$get_depends_down("add1") p$get_depends_down("add1", recursive = FALSE)
get_depends_up()
Get all upstream dependencies of given step, by default descending recursively.
Pipeline$get_depends_up(step, recursive = TRUE)
step
string
name of step
recursive
logical
if TRUE
, dependencies of dependencies
are also returned.
list
of upstream dependencies
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p$get_depends_up("mult4") p$get_depends_up("mult4", recursive = FALSE)
get_graph()
Visualize the pipeline as a graph.
Pipeline$get_graph(groups = NULL)
groups
character
if not NULL
, only steps belonging to the
given groups are considered.
two data frames, one for nodes and one for edges ready to be
used with the visNetwork
package.
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p$add("mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = p$get_graph()) }
get_out()
Get output of given step
Pipeline$get_out(step)
step
string
name of step
the output at the given step.
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$run() p$get_out("add1") p$get_out("add2")
get_params()
Set unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps. Trying #' to set parameters that don't exist in the pipeline is ignored, by default, with a warning.
Pipeline$get_params(ignoreHidden = TRUE)
ignoreHidden
logical
if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list
of parameters, sorted and named by step. Steps with
no parameters are filtered out.
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("add3", \() 1 + 2) p$get_params() |> str() p$get_params(ignoreHidden = FALSE) |> str()
get_params_at_step()
Get all unbound (i.e. not referring to other steps) at given step name.
Pipeline$get_params_at_step(step, ignoreHidden = TRUE)
step
string
name of step
ignoreHidden
logical
if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list
of parameters defined at given step.
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("add3", \() 1 + 2) p$get_params_at_step("add2") p$get_params_at_step("add2", ignoreHidden = FALSE) p$get_params_at_step("add3")
get_params_unique()
Get all unbound (i.e. not referring to other steps)
parameters defined in the pipeline,
but only list each parameter once. The values of the parameters,
will be the values of the first step where the parameter was defined.
This is particularly useful after the parameters where set using
the set_params
function, which will set the same value
for all steps.
Pipeline$get_params_unique(ignoreHidden = TRUE)
ignoreHidden
logical
if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list
of unique parameters
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b) p$get_params_unique() p$get_params_unique(ignoreHidden = FALSE)
get_params_unique_json()
Get all unique function parameters in json format.
Pipeline$get_params_unique_json(ignoreHidden = TRUE)
ignoreHidden
logical
if TRUE, hidden parameters (i.e. all
names starting with a dot) are ignored and thus not returned.
list
flat unnamed json list of unique function parameters
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b) p$get_params_unique_json() p$get_params_unique_json(ignoreHidden = FALSE)
get_step()
Get step of pipeline
Pipeline$get_step(step)
step
string
name of step
data.table
row containing the step.
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z) p$run() add1 <- p$get_step("add1") print(add1) add1[["params"]] add1[["fun"]] try() try(p$get_step("foo")) # error: step 'foo' does not exist
get_step_names()
Get step names of pipeline
Pipeline$get_step_names()
character
vector of step names
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$get_step_names()
get_step_number()
Get step number
Pipeline$get_step_number(step)
step
string
name of step
the step number in the pipeline
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$get_step_number("f2")
has_step()
Check if pipeline has given step
Pipeline$has_step(step)
step
string
name of step
logical
whether step exists
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$has_step("f2") p$has_step("foo")
insert_after()
Insert step after a certain step
Pipeline$insert_after(afterStep, step, ...)
afterStep
string
name of step after which to insert
step
string
name of step to insert
...
further arguments passed to add
method of the pipeline
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = 1) x) p$add("f2", \(x = ~f1) x) p$insert_after("f1", "f3", \(x = ~f1) x) p
insert_before()
Insert step before a certain step
Pipeline$insert_before(beforeStep, step, ...)
beforeStep
string
name of step before which to insert
step
string
name of step to insert
...
further arguments passed to add
method of the pipeline
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = 1) x) p$add("f2", \(x = ~f1) x) p$insert_before("f2", "f3", \(x = ~f1) x) p
length()
Length of the pipeline aka number of pipeline steps.
Pipeline$length()
numeric
length of pipeline.
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$length()
lock_step()
Locking a step means that both its parameters and its output (given it has output) are locked such that neither setting new pipeline parameters nor future pipeline runs can change the current parameter and output content.
Pipeline$lock_step(step)
step
string
name of step
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = 1, data = ~data) x + data) p$add("add2", \(x = 1, data = ~data) x + data) p$run() p$get_out("add1") p$get_out("add2") p$lock_step("add1") p$set_data(3) p$set_params(list(x = 3)) p$run() p$get_out("add1") p$get_out("add2")
pop_step()
Drop last step from the pipeline.
Pipeline$pop_step()
string
the name of the step that was removed
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p p$pop_step() # "f2" p
pop_steps_after()
Drop all steps after the given step.
Pipeline$pop_steps_after(step)
step
string
name of step
character
vector of steps that were removed.
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$add("f3", \(z = 1) z) p$pop_steps_after("f1") # "f2", "f3" p
pop_steps_from()
Drop all steps from and including the given step.
Pipeline$pop_steps_from(step)
step
string
name of step
character
vector of steps that were removed.
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$add("f3", \(z = 1) z) p$pop_steps_from("f2") # "f2", "f3" p
print()
Print the pipeline as a table.
Pipeline$print(verbose = FALSE)
verbose
logical
if TRUE
, print all columns of the
pipeline, otherwise only the most relevant columns are displayed.
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$print()
remove_step()
Remove certain step from the pipeline.
If other steps depend on the step to be removed, an error is
given and the removal is blocked, unless recursive
was set to
TRUE
.
Pipeline$remove_step(step, recursive = FALSE)
step
string
the name of the step to be removed.
recursive
logical
if TRUE
the step is removed together
with all its downstream dependencies.
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p$add("mult1", \(x = 1, y = ~add2) x * y) p$remove_step("mult1") p try(p$remove_step("add1")) # fails because "add2" depends on "add1" p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2" p
rename_step()
Safely rename a step in the pipeline. If new step name would result in a name clash, an error is given.
Pipeline$rename_step(from, to)
from
string
the name of the step to be renamed.
to
string
the new name of the step.
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p try(p$rename_step("add1", "add2")) # fails because "add2" exists p$rename_step("add1", "first_add") # Ok p
replace_step()
Replaces an existing pipeline step.
Pipeline$replace_step( step, fun, params = list(), description = "", group = step, keepOut = FALSE )
step
string
the name of the step to be replaced. Step must
exist.
fun
string
or function
operation to be applied at the
step. Both existing and lambda/anonymous functions can be used.
params
list
list of parameters to overwrite default
parameters of existing functions.
description
string
optional description of the step
group
string
grouping information (by default the same as
the name of the step. Any output collected later (see function
collect_out
by default is put together by these group names. This,
for example, comes in handy when the pipeline is copy-appended
multiple times to keep the results of the same function/step at one
place.
keepOut
logical
if FALSE
the output of the function will
be cleaned at the end of the whole pipeline execution. This option
is used to only keep the results that matter.
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~data, y = 2) x + y) p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE) p$run()$collect_out() p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p$run()$collect_out() try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist
reset()
Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.
Pipeline$reset()
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$run() p p$reset() p
run()
Run all new and/or outdated pipeline steps.
Pipeline$run( force = FALSE, recursive = TRUE, cleanUnkept = FALSE, progress = NULL, showLog = TRUE )
force
logical
if TRUE
all steps are run regardless of
whether they are outdated or not.
recursive
logical
if TRUE
and a step returns a new
pipeline, the run of the current pipeline is aborted and the
new pipeline is run recursively.
cleanUnkept
logical
if TRUE
all output that was not
marked to be kept is removed after the pipeline run. This option
can be useful if temporary results require a lot of memory.
progress
function
this parameter can be used to provide a
custom progress function of the form function(value, detail)
,
which will show the progress of the pipeline run for each step,
where value
is the current step number and detail
is the name
of the step.
showLog
logical
should the steps be logged during the
pipeline run?
returns the Pipeline
object invisibly
# Simple pipeline p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~add1, z = 2) x + z) p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p$run()$collect_out() p$set_params(list(z = 4)) # outdates steps add2 and final p p$run()$collect_out() p$run(cleanUnkept = TRUE) # clean up temporary results p # Recursive pipeline p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("new_pipe", \(x = ~add1) { pp <- Pipeline$new("new_pipe", data = x) pp$add("add1", \(x = ~data) x + 1) pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE) } ) p$run(recursive = TRUE)$collect_out() # Run pipeline with progress bar p <- Pipeline$new("pipe", data = 1) p$add("first step", \() Sys.sleep(1)) p$add("second step", \() Sys.sleep(1)) p$add("last step", \() Sys.sleep(1)) pb <- txtProgressBar(min = 1, max = p$length(), style = 3) fprogress <- function(value, detail) { setTxtProgressBar(pb, value) } p$run(progress = fprogress, showLog = FALSE)
run_step()
Run given pipeline step possibly together with upstream and downstream dependencies.
Pipeline$run_step( step, upstream = TRUE, downstream = FALSE, cleanUnkept = FALSE )
step
string
name of step
upstream
logical
if TRUE
, run all dependent upstream
steps first.
downstream
logical
if TRUE
, run all depdendent
downstream afterwards.
cleanUnkept
logical
if TRUE
all output that was not
marked to be kept is removed after the pipeline run. This option
can be useful if temporary results require a lot of memory.
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~add1, z = 2) x + z) p$add("mult", \(x = ~add1, y = ~add2) x * y) p$run_step("add2") p$run_step("add2", downstream = TRUE) p$run_step("mult", upstream = TRUE)
set_data()
Set data in first step of pipeline.
Pipeline$set_data(data)
data
initial data set
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p$run()$collect_out() p$set_data(3) p$run()$collect_out()
set_data_split()
This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.
Pipeline$set_data_split( dataList, toStep = character(), groupBySplit = TRUE, sep = "." )
dataList
list
of data sets
toStep
string
step name marking optional subset of
the pipeline, at which the data split should be applied to.
groupBySplit
logical
whether to set step groups according
to data split.
sep
string
separator to be used between step name and
data set name when creating the new step names.
new combined Pipeline
with each sub-pipeline having set
one of the data sets.
# Split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) p$set_data_split(dataList) p p$run()$collect_out() |> str() # Don't group output by split p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) p$set_data_split(dataList, groupBySplit = FALSE) p p$run()$collect_out() |> str() # Split up to certain step p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1) p$add("mult", \(x = ~data, y = ~add1) x * y) p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE) p p$get_depends()[["average_result"]] p$set_data_split(dataList, toStep = "mult") p p$get_depends()[["average_result"]] p$run()$collect_out()
set_keep_out()
Change the keepOut
flag at a given pipeline step,
which determines whether the output of that step is collected
when calling collect_out()
after the pipeline was run.
Pipeline$set_keep_out(step, keepOut = TRUE)
step
string
name of step
keepOut
logical
whether to keep output of step
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p$add("add2", \(x = ~data, y = 2) x + y) p$add("mult", \(x = ~add1, y = ~add2) x * y) p$run()$collect_out() p$set_keep_out("add1", keepOut = FALSE) p$set_keep_out("mult", keepOut = TRUE) p$collect_out()
set_params()
Set parameters in the pipeline. If a parameter occurs in several steps, the parameter is set commonly in all steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.
Pipeline$set_params(params, warnUndefined = TRUE)
params
list
of parameters to be set
warnUndefined
logical
whether to give a warning when trying
to set a parameter that is not defined in the pipeline.
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 2) x + y) p$add("add2", \(x = ~data, y = 3) x + y) p$add("mult", \(x = 4, z = 5) x * z) p$get_params() p$set_params(list(x = 3, y = 3)) p$get_params() p$set_params(list(x = 5, z = 3)) p$get_params() suppressWarnings( p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined ) p$set_params(list(foo = 3), warnUndefined = FALSE)
set_params_at_step()
Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps.
Pipeline$set_params_at_step(step, params)
step
string
the name of the step
params
list
of parameters to be set
returns the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 2, z = 3) x + y) p$set_params_at_step("add1", list(y = 5, z = 6)) p$get_params() try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined
split()
Splits pipeline into its independent parts.
Pipeline$split()
list of Pipeline
objects
# Example for two independent calculation paths p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = ~data) x) p$add("f2", \(x = 1) x) p$add("f3", \(x = ~f1) x) p$add("f4", \(x = ~f2) x) p$split() # Example of split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipes <- p$set_data_split(dataList)$split() pipes
unlock_step()
Unlock previously locked step. If step was not locked, the command is ignored.
Pipeline$unlock_step(step)
step
string
name of step
the Pipeline
object invisibly
p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = 1, data = ~data) x + data) p$add("add2", \(x = 1, data = ~data) x + data) p$lock_step("add1") p$set_params(list(x = 3)) p$get_params() p$unlock_step("add1") p$set_params(list(x = 3)) p$get_params()
clone()
The objects of this class are cloneable with this method.
Pipeline$clone(deep = FALSE)
deep
Whether to make a deep clone.
Roman Pahl
## ------------------------------------------------ ## Method `Pipeline$new` ## ------------------------------------------------ p <- Pipeline$new("myPipe", data = data.frame(x = 1:8)) p # Passing custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- Pipeline$new("myPipe", logger = my_logger) ## ------------------------------------------------ ## Method `Pipeline$add` ## ------------------------------------------------ # Add steps with lambda functions p <- Pipeline$new("myPipe", data = 1) p$add("s1", \(x = ~data) 2*x) # use input data p$add("s2", \(x = ~data, y = ~s1) x * y) try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4)) p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p$run()$get_out("calc_mean") # Step description p <- Pipeline$new("myPipe", data = 1:10) p$add("s1", \(x = ~data) 2*x, description = "multiply by 2") print(p) print(p, verbose = TRUE) # print all columns # Group output p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5)) p$add("prep_x", \(data = ~data) data$x, group = "prep") p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep") p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y) p$run()$collect_out(all = TRUE) ## ------------------------------------------------ ## Method `Pipeline$append` ## ------------------------------------------------ # Append pipeline p1 <- Pipeline$new("pipe1") p1$add("step1", \(x = 1) x) p2 <- Pipeline$new("pipe2") p2$add("step2", \(y = 1) y) p1$append(p2) # Append pipeline with potential name clashes p3 <- Pipeline$new("pipe3") p3$add("step1", \(z = 1) z) p1$append(p2)$append(p3) # Use output of first pipeline as input for second pipeline p1 <- Pipeline$new("pipe1", data = 8) p2 <- Pipeline$new("pipe2") p1$add("square", \(x = ~data) x^2) p2$add("log2", \(x = ~data) log2(x)) p12 <- p1$append(p2, outAsIn = TRUE) p12$run()$get_out("log2") p12 # Custom name separator p1$append(p2, sep = "___") ## ------------------------------------------------ ## Method `Pipeline$append_to_step_names` ## ------------------------------------------------ p <- Pipeline$new("pipe") p$add("step1", \(x = 1) x) p$add("step2", \(y = 1) y) p$append_to_step_names("new") p p$append_to_step_names("foo", sep = "__") p ## ------------------------------------------------ ## Method `Pipeline$collect_out` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("step1", \(x = ~data) x + 2) p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE) p$run() p$collect_out() p$collect_out(all = TRUE) |> str() # Grouped output p <- Pipeline$new("pipe", data = 1:2) p$add("step1", \(x = ~data) x + 2, group = "add") p$add("step2", \(x = ~step1, y = 2) x + y, group = "add") p$add("step3", \(x = ~data) x * 3, group = "mult") p$add("step4", \(x = ~data, y = 2) x * y, group = "mult") p p$run() p$collect_out(all = TRUE) |> str() # Grouped by state p$set_params(list(y = 5)) p p$collect_out(groupBy = "state", all = TRUE) |> str() ## ------------------------------------------------ ## Method `Pipeline$discard_steps` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~add1) x + 2) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p p$discard_steps("mult") p # Re-add steps p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(p$discard_steps("add1")) # ... unless we enforce to remove its downstream dependencies as well p$discard_steps("add1", recursive = TRUE) # this works p # Trying to discard non-existent steps is just ignored p$discard_steps("non-existent") ## ------------------------------------------------ ## Method `Pipeline$get_data` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$get_data() p$set_data(3:4) p$get_data() ## ------------------------------------------------ ## Method `Pipeline$get_depends` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$get_depends() ## ------------------------------------------------ ## Method `Pipeline$get_depends_down` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p$get_depends_down("add1") p$get_depends_down("add1", recursive = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_depends_up` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p$get_depends_up("mult4") p$get_depends_up("mult4", recursive = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_graph` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p$add("mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = p$get_graph()) } ## ------------------------------------------------ ## Method `Pipeline$get_out` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$run() p$get_out("add1") p$get_out("add2") ## ------------------------------------------------ ## Method `Pipeline$get_params` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("add3", \() 1 + 2) p$get_params() |> str() p$get_params(ignoreHidden = FALSE) |> str() ## ------------------------------------------------ ## Method `Pipeline$get_params_at_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("add3", \() 1 + 2) p$get_params_at_step("add2") p$get_params_at_step("add2", ignoreHidden = FALSE) p$get_params_at_step("add3") ## ------------------------------------------------ ## Method `Pipeline$get_params_unique` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b) p$get_params_unique() p$get_params_unique(ignoreHidden = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_params_unique_json` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b) p$get_params_unique_json() p$get_params_unique_json(ignoreHidden = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z) p$run() add1 <- p$get_step("add1") print(add1) add1[["params"]] add1[["fun"]] try() try(p$get_step("foo")) # error: step 'foo' does not exist ## ------------------------------------------------ ## Method `Pipeline$get_step_names` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$get_step_names() ## ------------------------------------------------ ## Method `Pipeline$get_step_number` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$get_step_number("f2") ## ------------------------------------------------ ## Method `Pipeline$has_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$has_step("f2") p$has_step("foo") ## ------------------------------------------------ ## Method `Pipeline$insert_after` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = 1) x) p$add("f2", \(x = ~f1) x) p$insert_after("f1", "f3", \(x = ~f1) x) p ## ------------------------------------------------ ## Method `Pipeline$insert_before` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = 1) x) p$add("f2", \(x = ~f1) x) p$insert_before("f2", "f3", \(x = ~f1) x) p ## ------------------------------------------------ ## Method `Pipeline$length` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$length() ## ------------------------------------------------ ## Method `Pipeline$lock_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = 1, data = ~data) x + data) p$add("add2", \(x = 1, data = ~data) x + data) p$run() p$get_out("add1") p$get_out("add2") p$lock_step("add1") p$set_data(3) p$set_params(list(x = 3)) p$run() p$get_out("add1") p$get_out("add2") ## ------------------------------------------------ ## Method `Pipeline$pop_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p p$pop_step() # "f2" p ## ------------------------------------------------ ## Method `Pipeline$pop_steps_after` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$add("f3", \(z = 1) z) p$pop_steps_after("f1") # "f2", "f3" p ## ------------------------------------------------ ## Method `Pipeline$pop_steps_from` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$add("f3", \(z = 1) z) p$pop_steps_from("f2") # "f2", "f3" p ## ------------------------------------------------ ## Method `Pipeline$print` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$print() ## ------------------------------------------------ ## Method `Pipeline$remove_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p$add("mult1", \(x = 1, y = ~add2) x * y) p$remove_step("mult1") p try(p$remove_step("add1")) # fails because "add2" depends on "add1" p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2" p ## ------------------------------------------------ ## Method `Pipeline$rename_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p try(p$rename_step("add1", "add2")) # fails because "add2" exists p$rename_step("add1", "first_add") # Ok p ## ------------------------------------------------ ## Method `Pipeline$replace_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~data, y = 2) x + y) p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE) p$run()$collect_out() p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p$run()$collect_out() try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist ## ------------------------------------------------ ## Method `Pipeline$reset` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$run() p p$reset() p ## ------------------------------------------------ ## Method `Pipeline$run` ## ------------------------------------------------ # Simple pipeline p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~add1, z = 2) x + z) p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p$run()$collect_out() p$set_params(list(z = 4)) # outdates steps add2 and final p p$run()$collect_out() p$run(cleanUnkept = TRUE) # clean up temporary results p # Recursive pipeline p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("new_pipe", \(x = ~add1) { pp <- Pipeline$new("new_pipe", data = x) pp$add("add1", \(x = ~data) x + 1) pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE) } ) p$run(recursive = TRUE)$collect_out() # Run pipeline with progress bar p <- Pipeline$new("pipe", data = 1) p$add("first step", \() Sys.sleep(1)) p$add("second step", \() Sys.sleep(1)) p$add("last step", \() Sys.sleep(1)) pb <- txtProgressBar(min = 1, max = p$length(), style = 3) fprogress <- function(value, detail) { setTxtProgressBar(pb, value) } p$run(progress = fprogress, showLog = FALSE) ## ------------------------------------------------ ## Method `Pipeline$run_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~add1, z = 2) x + z) p$add("mult", \(x = ~add1, y = ~add2) x * y) p$run_step("add2") p$run_step("add2", downstream = TRUE) p$run_step("mult", upstream = TRUE) ## ------------------------------------------------ ## Method `Pipeline$set_data` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p$run()$collect_out() p$set_data(3) p$run()$collect_out() ## ------------------------------------------------ ## Method `Pipeline$set_data_split` ## ------------------------------------------------ # Split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) p$set_data_split(dataList) p p$run()$collect_out() |> str() # Don't group output by split p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) p$set_data_split(dataList, groupBySplit = FALSE) p p$run()$collect_out() |> str() # Split up to certain step p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1) p$add("mult", \(x = ~data, y = ~add1) x * y) p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE) p p$get_depends()[["average_result"]] p$set_data_split(dataList, toStep = "mult") p p$get_depends()[["average_result"]] p$run()$collect_out() ## ------------------------------------------------ ## Method `Pipeline$set_keep_out` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p$add("add2", \(x = ~data, y = 2) x + y) p$add("mult", \(x = ~add1, y = ~add2) x * y) p$run()$collect_out() p$set_keep_out("add1", keepOut = FALSE) p$set_keep_out("mult", keepOut = TRUE) p$collect_out() ## ------------------------------------------------ ## Method `Pipeline$set_params` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 2) x + y) p$add("add2", \(x = ~data, y = 3) x + y) p$add("mult", \(x = 4, z = 5) x * z) p$get_params() p$set_params(list(x = 3, y = 3)) p$get_params() p$set_params(list(x = 5, z = 3)) p$get_params() suppressWarnings( p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined ) p$set_params(list(foo = 3), warnUndefined = FALSE) ## ------------------------------------------------ ## Method `Pipeline$set_params_at_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 2, z = 3) x + y) p$set_params_at_step("add1", list(y = 5, z = 6)) p$get_params() try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined ## ------------------------------------------------ ## Method `Pipeline$split` ## ------------------------------------------------ # Example for two independent calculation paths p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = ~data) x) p$add("f2", \(x = 1) x) p$add("f3", \(x = ~f1) x) p$add("f4", \(x = ~f2) x) p$split() # Example of split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipes <- p$set_data_split(dataList)$split() pipes ## ------------------------------------------------ ## Method `Pipeline$unlock_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = 1, data = ~data) x + data) p$add("add2", \(x = 1, data = ~data) x + data) p$lock_step("add1") p$set_params(list(x = 3)) p$get_params() p$unlock_step("add1") p$set_params(list(x = 3)) p$get_params()
## ------------------------------------------------ ## Method `Pipeline$new` ## ------------------------------------------------ p <- Pipeline$new("myPipe", data = data.frame(x = 1:8)) p # Passing custom logger my_logger <- function(level, msg, ...) { cat(level, msg, "\n") } p <- Pipeline$new("myPipe", logger = my_logger) ## ------------------------------------------------ ## Method `Pipeline$add` ## ------------------------------------------------ # Add steps with lambda functions p <- Pipeline$new("myPipe", data = 1) p$add("s1", \(x = ~data) 2*x) # use input data p$add("s2", \(x = ~data, y = ~s1) x * y) try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found p # Add step with existing function p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4)) p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE)) p$run()$get_out("calc_mean") # Step description p <- Pipeline$new("myPipe", data = 1:10) p$add("s1", \(x = ~data) 2*x, description = "multiply by 2") print(p) print(p, verbose = TRUE) # print all columns # Group output p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5)) p$add("prep_x", \(data = ~data) data$x, group = "prep") p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep") p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y) p$run()$collect_out(all = TRUE) ## ------------------------------------------------ ## Method `Pipeline$append` ## ------------------------------------------------ # Append pipeline p1 <- Pipeline$new("pipe1") p1$add("step1", \(x = 1) x) p2 <- Pipeline$new("pipe2") p2$add("step2", \(y = 1) y) p1$append(p2) # Append pipeline with potential name clashes p3 <- Pipeline$new("pipe3") p3$add("step1", \(z = 1) z) p1$append(p2)$append(p3) # Use output of first pipeline as input for second pipeline p1 <- Pipeline$new("pipe1", data = 8) p2 <- Pipeline$new("pipe2") p1$add("square", \(x = ~data) x^2) p2$add("log2", \(x = ~data) log2(x)) p12 <- p1$append(p2, outAsIn = TRUE) p12$run()$get_out("log2") p12 # Custom name separator p1$append(p2, sep = "___") ## ------------------------------------------------ ## Method `Pipeline$append_to_step_names` ## ------------------------------------------------ p <- Pipeline$new("pipe") p$add("step1", \(x = 1) x) p$add("step2", \(y = 1) y) p$append_to_step_names("new") p p$append_to_step_names("foo", sep = "__") p ## ------------------------------------------------ ## Method `Pipeline$collect_out` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("step1", \(x = ~data) x + 2) p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE) p$run() p$collect_out() p$collect_out(all = TRUE) |> str() # Grouped output p <- Pipeline$new("pipe", data = 1:2) p$add("step1", \(x = ~data) x + 2, group = "add") p$add("step2", \(x = ~step1, y = 2) x + y, group = "add") p$add("step3", \(x = ~data) x * 3, group = "mult") p$add("step4", \(x = ~data, y = 2) x * y, group = "mult") p p$run() p$collect_out(all = TRUE) |> str() # Grouped by state p$set_params(list(y = 5)) p p$collect_out(groupBy = "state", all = TRUE) |> str() ## ------------------------------------------------ ## Method `Pipeline$discard_steps` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~add1) x + 2) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p p$discard_steps("mult") p # Re-add steps p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p # Discarding 'add1' does not work ... try(p$discard_steps("add1")) # ... unless we enforce to remove its downstream dependencies as well p$discard_steps("add1", recursive = TRUE) # this works p # Trying to discard non-existent steps is just ignored p$discard_steps("non-existent") ## ------------------------------------------------ ## Method `Pipeline$get_data` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$get_data() p$set_data(3:4) p$get_data() ## ------------------------------------------------ ## Method `Pipeline$get_depends` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$get_depends() ## ------------------------------------------------ ## Method `Pipeline$get_depends_down` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p$get_depends_down("add1") p$get_depends_down("add1", recursive = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_depends_up` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$add("mult3", \(x = ~add1) x * 3) p$add("mult4", \(x = ~add2) x * 4) p$get_depends_up("mult4") p$get_depends_up("mult4", recursive = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_graph` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p$add("mult1", \(x = ~add1, y = ~add2) x * y) graph <- pipe_get_graph(p) graph if (require("visNetwork", quietly = TRUE)) { do.call(visNetwork, args = p$get_graph()) } ## ------------------------------------------------ ## Method `Pipeline$get_out` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(x = ~data) x + 1) p$add("add2", \(x = ~data, y = ~add1) x + y) p$run() p$get_out("add1") p$get_out("add2") ## ------------------------------------------------ ## Method `Pipeline$get_params` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("add3", \() 1 + 2) p$get_params() |> str() p$get_params(ignoreHidden = FALSE) |> str() ## ------------------------------------------------ ## Method `Pipeline$get_params_at_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("add3", \() 1 + 2) p$get_params_at_step("add2") p$get_params_at_step("add2", ignoreHidden = FALSE) p$get_params_at_step("add3") ## ------------------------------------------------ ## Method `Pipeline$get_params_unique` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b) p$get_params_unique() p$get_params_unique(ignoreHidden = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_params_unique_json` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z) p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b) p$get_params_unique_json() p$get_params_unique_json(ignoreHidden = FALSE) ## ------------------------------------------------ ## Method `Pipeline$get_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z) p$run() add1 <- p$get_step("add1") print(add1) add1[["params"]] add1[["fun"]] try() try(p$get_step("foo")) # error: step 'foo' does not exist ## ------------------------------------------------ ## Method `Pipeline$get_step_names` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$get_step_names() ## ------------------------------------------------ ## Method `Pipeline$get_step_number` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$get_step_number("f2") ## ------------------------------------------------ ## Method `Pipeline$has_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$has_step("f2") p$has_step("foo") ## ------------------------------------------------ ## Method `Pipeline$insert_after` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = 1) x) p$add("f2", \(x = ~f1) x) p$insert_after("f1", "f3", \(x = ~f1) x) p ## ------------------------------------------------ ## Method `Pipeline$insert_before` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = 1) x) p$add("f2", \(x = ~f1) x) p$insert_before("f2", "f3", \(x = ~f1) x) p ## ------------------------------------------------ ## Method `Pipeline$length` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$length() ## ------------------------------------------------ ## Method `Pipeline$lock_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = 1, data = ~data) x + data) p$add("add2", \(x = 1, data = ~data) x + data) p$run() p$get_out("add1") p$get_out("add2") p$lock_step("add1") p$set_data(3) p$set_params(list(x = 3)) p$run() p$get_out("add1") p$get_out("add2") ## ------------------------------------------------ ## Method `Pipeline$pop_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p p$pop_step() # "f2" p ## ------------------------------------------------ ## Method `Pipeline$pop_steps_after` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$add("f3", \(z = 1) z) p$pop_steps_after("f1") # "f2", "f3" p ## ------------------------------------------------ ## Method `Pipeline$pop_steps_from` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$add("f3", \(z = 1) z) p$pop_steps_from("f2") # "f2", "f3" p ## ------------------------------------------------ ## Method `Pipeline$print` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$print() ## ------------------------------------------------ ## Method `Pipeline$remove_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p$add("mult1", \(x = 1, y = ~add2) x * y) p$remove_step("mult1") p try(p$remove_step("add1")) # fails because "add2" depends on "add1" p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2" p ## ------------------------------------------------ ## Method `Pipeline$rename_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$add("add2", \(x = 1, y = ~add1) x + y) p try(p$rename_step("add1", "add2")) # fails because "add2" exists p$rename_step("add1", "first_add") # Ok p ## ------------------------------------------------ ## Method `Pipeline$replace_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~data, y = 2) x + y) p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE) p$run()$collect_out() p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p$run()$collect_out() try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist ## ------------------------------------------------ ## Method `Pipeline$reset` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$run() p p$reset() p ## ------------------------------------------------ ## Method `Pipeline$run` ## ------------------------------------------------ # Simple pipeline p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~add1, z = 2) x + z) p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE) p$run()$collect_out() p$set_params(list(z = 4)) # outdates steps add2 and final p p$run()$collect_out() p$run(cleanUnkept = TRUE) # clean up temporary results p # Recursive pipeline p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("new_pipe", \(x = ~add1) { pp <- Pipeline$new("new_pipe", data = x) pp$add("add1", \(x = ~data) x + 1) pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE) } ) p$run(recursive = TRUE)$collect_out() # Run pipeline with progress bar p <- Pipeline$new("pipe", data = 1) p$add("first step", \() Sys.sleep(1)) p$add("second step", \() Sys.sleep(1)) p$add("last step", \() Sys.sleep(1)) pb <- txtProgressBar(min = 1, max = p$length(), style = 3) fprogress <- function(value, detail) { setTxtProgressBar(pb, value) } p$run(progress = fprogress, showLog = FALSE) ## ------------------------------------------------ ## Method `Pipeline$run_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y) p$add("add2", \(x = ~add1, z = 2) x + z) p$add("mult", \(x = ~add1, y = ~add2) x * y) p$run_step("add2") p$run_step("add2", downstream = TRUE) p$run_step("mult", upstream = TRUE) ## ------------------------------------------------ ## Method `Pipeline$set_data` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p$run()$collect_out() p$set_data(3) p$run()$collect_out() ## ------------------------------------------------ ## Method `Pipeline$set_data_split` ## ------------------------------------------------ # Split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) p$set_data_split(dataList) p p$run()$collect_out() |> str() # Don't group output by split p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) p$set_data_split(dataList, groupBySplit = FALSE) p p$run()$collect_out() |> str() # Split up to certain step p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1) p$add("mult", \(x = ~data, y = ~add1) x * y) p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE) p p$get_depends()[["average_result"]] p$set_data_split(dataList, toStep = "mult") p p$get_depends()[["average_result"]] p$run()$collect_out() ## ------------------------------------------------ ## Method `Pipeline$set_keep_out` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE) p$add("add2", \(x = ~data, y = 2) x + y) p$add("mult", \(x = ~add1, y = ~add2) x * y) p$run()$collect_out() p$set_keep_out("add1", keepOut = FALSE) p$set_keep_out("mult", keepOut = TRUE) p$collect_out() ## ------------------------------------------------ ## Method `Pipeline$set_params` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 2) x + y) p$add("add2", \(x = ~data, y = 3) x + y) p$add("mult", \(x = 4, z = 5) x * z) p$get_params() p$set_params(list(x = 3, y = 3)) p$get_params() p$set_params(list(x = 5, z = 3)) p$get_params() suppressWarnings( p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined ) p$set_params(list(foo = 3), warnUndefined = FALSE) ## ------------------------------------------------ ## Method `Pipeline$set_params_at_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = ~data, y = 2, z = 3) x + y) p$set_params_at_step("add1", list(y = 5, z = 6)) p$get_params() try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined ## ------------------------------------------------ ## Method `Pipeline$split` ## ------------------------------------------------ # Example for two independent calculation paths p <- Pipeline$new("pipe", data = 1) p$add("f1", \(x = ~data) x) p$add("f2", \(x = 1) x) p$add("f3", \(x = ~f1) x) p$add("f4", \(x = ~f2) x) p$split() # Example of split by three data sets dataList <- list(a = 1, b = 2, c = 3) p <- Pipeline$new("pipe") p$add("add1", \(x = ~data) x + 1, keepOut = TRUE) p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE) pipes <- p$set_data_split(dataList)$split() pipes ## ------------------------------------------------ ## Method `Pipeline$unlock_step` ## ------------------------------------------------ p <- Pipeline$new("pipe", data = 1) p$add("add1", \(x = 1, data = ~data) x + data) p$add("add2", \(x = 1, data = ~data) x + data) p$lock_step("add1") p$set_params(list(x = 3)) p$get_params() p$unlock_step("add1") p$set_params(list(x = 3)) p$get_params()
This function provides an easy way to set the basic log
layout of the pipeline logging. For a fine-grained control of the logger,
which you can retrieve via lgr::get_logger("pipeflow")
, see e.g. the
logger_config function from the lgr package.
set_log_layout(layout = c("text", "json"))
set_log_layout(layout = c("text", "json"))
layout |
Layout name, which at this point can be either 'text' or 'json'. |
invisibly returns a Logger
object
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$run() lg <- set_log_layout("json") print(lg) p$run() set_log_layout("text") p$run()
p <- Pipeline$new("pipe", data = 1:2) p$add("add1", \(data = ~data, x = 1) x + data) p$run() lg <- set_log_layout("json") print(lg) p$run() set_log_layout("text") p$run()