---
title: "Advanced DAGs - Dynamic Fan-out and Collect"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Advanced DAGs - Dynamic Fan-out and Collect}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r echo=FALSE}
knitr::opts_chunk$set(
  collapse = FALSE,
  comment = "",
  out.width = "100%",
  cache = FALSE,
  asciicast_knitr_output = "html"
)

asciicast::init_knitr_engine(
  echo = TRUE,
  echo_input = FALSE,
  same_process = TRUE,
  startup = quote({
    library(maestro)
    set.seed(1)
  })
)
options(asciicast_theme = "pkgdown")
```

More complex types of DAGs involve dynamically spawning new pipelines (dynamic fan-out) based on a list or vector and/or collecting inputs from multiple pipelines into a single pipeline.

## Dynamic Fan-out

Sometimes an upstream pipeline returns a collection of values and you want to run a downstream pipeline **once per element**, a pattern called fan-out or scatter. Add `@maestroMap` to the downstream pipeline to enable this.

An empty `@maestroMap` tag iterates over each element of the upstream return value directly.

```
get_letters
  |-shout[1]
  |-shout[2]
  |-shout[3]
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(dir.create("pipelines"))
writeLines(
  "
#' @maestroFrequency daily
get_letters <- function() {
  c('a', 'b', 'c')
}

#' @maestroInputs get_letters
#' @maestroMap
shout <- function(.input) {
  toupper(.input)
}",
  con = "pipelines/fanout.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
get_letters <- function() {
  c("a", "b", "c")
}

#' @maestroInputs get_letters
#' @maestroMap
shout <- function(.input) {
  toupper(.input)
}
```

`shout` will execute three times (once for `"a"`, once for `"b"`, once for `"c"`), and the CLI output labels each branch with its iteration index in square brackets.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_artifacts(schedule)
```

Note there is no `@maestroOutputs` equivalent for defining dynamic fan-out.
Here, you must use `@maestroInputs` combined with `@maestroMap`.

### Iterating over a field of a list

When the upstream pipeline returns a **named list**, use `@maestroMap` to select which field to scatter over. The full list remains available as `.input` inside each branch, so other fields are still accessible.

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(file.remove("pipelines/fanout.R"))
writeLines(
  "
#' @maestroFrequency daily
get_letters <- function() {
  list(
    letter = letters[1:3],
    greeting = 'hello'
  )
}

#' @maestroInputs get_letters
#' @maestroMap .input$letter
make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}",
  con = "pipelines/fanout.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
get_letters <- function() {
  list(
    letter = letters[1:3],
    greeting = "hello"
  )
}

#' @maestroInputs get_letters
#' @maestroMap .input$letter
make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}
```

`make_message` runs once per element of `letter`, producing `"hello A"`, `"hello B"`, `"hello C"`. The `greeting` field is available in every branch because the full list is passed as `.input` each time.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_artifacts(schedule)
```

If the field name in `@maestroMap` does not exist in the upstream return value, maestro records an informative error on the downstream pipeline rather than silently producing zero branches.

### Iterating over multiple fields simultaneously

You can supply multiple space-separated expressions to `@maestroMap` to zip across several fields at once, similar to `purrr::pmap()`. Each iteration receives `.input` with all specified fields replaced by their i-th element.

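To make the zipping rule concrete, here is a minimal plain-R sketch of how one `.input` per iteration could be assembled. It is an illustration only and not maestro's implementation: `zip_inputs()` is a hypothetical helper, and it assumes every mapped field has either a common length or length 1.

```r
# Illustrative only: emulates zip-style iteration in plain R.
# `zip_inputs()` is a hypothetical helper, not a maestro function.
zip_inputs <- function(input, fields) {
  lens <- lengths(input[fields])
  n <- max(lens)
  # Assumption: each mapped field has length n or length 1
  stopifnot(all(lens %in% c(1L, n)))
  lapply(seq_len(n), function(i) {
    branch <- input
    for (f in fields) {
      # Length-1 fields are reused as-is; others contribute their i-th element
      branch[[f]] <- if (lens[[f]] == 1L) input[[f]] else input[[f]][[i]]
    }
    branch
  })
}

upstream <- list(
  letter = letters[1:3],
  greeting = c("hello", "cheers", "hi")
)

# One list per iteration, each holding a single letter and a single greeting
branches <- zip_inputs(upstream, c("letter", "greeting"))

messages <- vapply(
  branches,
  function(.input) paste(.input$greeting, toupper(.input$letter)),
  character(1)
)
messages
```

Each element of `branches` is a copy of the upstream list with the mapped fields narrowed to a single value, which is why unmapped fields stay accessible inside a branch.
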

```{r eval=FALSE}
#' @maestroFrequency daily
get_data <- function() {
  list(
    letter = letters[1:3],
    greeting = c("hello", "cheers", "hi")
  )
}

#' @maestroInputs get_data
#' @maestroMap .input$letter .input$greeting
make_message <- function(.input) {
  paste(.input$greeting, toupper(.input$letter))
}
```

This produces `"hello A"`, `"cheers B"`, `"hi C"`: each branch receives a distinct `(letter, greeting)` pair.

All vectors must be the same length, or length 1 (in which case the scalar is recycled across all iterations). Mismatched lengths produce a pipeline error.

## Fan-in (Collect)

Fan-in is the complement of fan-out: multiple upstream pipelines are gathered into a single downstream pipeline. Wrap one or more upstream names with `collect()` in `@maestroInputs` to enable this.

```
letter_a ─┐
          |-+combine
letter_b ─┘
```

The downstream pipeline receives a named list as `.input`, where each name corresponds to an upstream pipeline and each value is that pipeline's return value.

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(file.remove("pipelines/fanout.R"))
writeLines(
  "
#' @maestroFrequency daily
letter_a <- function() 'a'

#' @maestroFrequency daily
letter_b <- function() 'b'

#' @maestroInputs collect(letter_a, letter_b)
combine <- function(.input) {
  paste0(.input$letter_a, .input$letter_b)
}",
  con = "pipelines/fanin.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
letter_a <- function() "a"

#' @maestroFrequency daily
letter_b <- function() "b"

#' @maestroInputs collect(letter_a, letter_b)
combine <- function(.input) {
  paste0(.input$letter_a, .input$letter_b)
}
```

`combine` fires only after both `letter_a` and `letter_b` have succeeded. Inside `combine`, `.input$letter_a` is `"a"` and `.input$letter_b` is `"b"`.

Collect pipelines are shown with a `|-+` prefix in the CLI to distinguish them from regular downstream pipelines.


```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_status(schedule)[, c("pipe_name", "invoked", "success", "input_run_id", "lineage")]
```

If any upstream pipeline fails, the collect pipeline will not fire. The failed pipeline's run ID is also excluded from `input_run_id` in `get_status()`.

### Fan-out into Fan-in

`@maestroMap` and `collect()` compose naturally. An upstream pipeline can fan out with `@maestroMap`, and a downstream pipeline can gather all successful iterations back together with `collect()`. Note that in the dynamic fan-out to fan-in case, the downstream pipeline will run if at least one upstream iteration has succeeded.

```
numbers
  |-multiply[1] ─┐
  |-multiply[2] ──|-+add
  |-multiply[3] ─┘
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(file.remove("pipelines/fanin.R"))
writeLines(
  "
#' @maestroFrequency daily
numbers <- function() 1:3

#' @maestroInputs numbers
#' @maestroMap
multiply <- function(.input) .input * 3

#' @maestroInputs collect(multiply)
add <- function(.input) {
  sum(unlist(.input))
}",
  con = "pipelines/fanin.R"
)
```

```{r eval=FALSE}
#' @maestroFrequency daily
numbers <- function() 1:3

#' @maestroInputs numbers
#' @maestroMap
multiply <- function(.input) .input * 3

#' @maestroInputs collect(multiply)
add <- function(.input) {
  sum(unlist(.input))
}
```

Here `multiply` executes three times (once per element of `1:3`), then `add` collects all three results and sums them. The `.input` received by `add` is a list of the successful iteration return values.

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

run_schedule(schedule, run_all = TRUE)

get_artifacts(schedule)$add
```

```{r cleanup2, echo=FALSE, message=FALSE, warning=FALSE}
invisible(unlink("pipelines", recursive = TRUE))
```
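
For intuition, the fan-out to fan-in flow above can be approximated in plain R without maestro. This is only a sketch of the described behaviour, not how maestro is implemented: the use of `tryCatch()` to mark a failed iteration and `Filter()` to drop it are assumptions made for illustration.

```r
# Illustrative only: a plain-R emulation of fan-out followed by collect.
# The orchestration below is hand-written and does not use maestro.
numbers <- function() 1:3
multiply <- function(.input) .input * 3
add <- function(.input) sum(unlist(.input))

# Fan-out: run `multiply` once per element, recording a failure as NULL
results <- lapply(numbers(), function(x) {
  tryCatch(multiply(x), error = function(e) NULL)
})

# Collect: keep only the return values of successful iterations
collected <- Filter(Negate(is.null), results)

# Run the downstream step only if at least one iteration succeeded
total <- if (length(collected) > 0) add(collected) else NULL
total
```

With all three iterations succeeding, `collected` holds 3, 6, and 9, so `total` is 18. If one iteration raised an error, it would simply be absent from the list passed to `add`.
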