Title: | Orchestration of Data Pipelines |
---|---|
Description: | Framework for creating and orchestrating data pipelines. Organize, orchestrate, and monitor multiple pipelines in a single project. Use tags to decorate functions with scheduling parameters and configuration. |
Authors: | Will Hipson [cre, aut, cph] , Ryan Garnett [aut, ctb, cph] |
Maintainer: | Will Hipson <[email protected]> |
License: | MIT + file LICENSE |
Version: | 0.4.1 |
Built: | 2024-11-25 15:23:22 UTC |
Source: | CRAN |
Builds a schedule (returned as a `MaestroSchedule` object) for scheduling pipelines in `run_schedule()`.
build_schedule(pipeline_dir = "./pipelines", quiet = FALSE)
build_schedule(pipeline_dir = "./pipelines", quiet = FALSE)
pipeline_dir |
path to directory containing the pipeline scripts |
quiet |
silence metrics to the console (default = `FALSE`) |
This function parses the maestro tags of functions located in `pipeline_dir`,
which is conventionally called 'pipelines'. An orchestrator requires a schedule table
to determine which pipelines are to run and when. Each row in a schedule table
is a pipeline name and its scheduling parameters, such as its frequency.
The schedule table is mostly intended to be used by `run_schedule()` immediately;
in other words, it is not recommended to make changes to it.
MaestroSchedule
# Creating a temporary directory for demo purposes! In practice, just # create a 'pipelines' directory at the project level. if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) build_schedule(pipeline_dir = pipeline_dir) }
# Creating a temporary directory for demo purposes! In practice, just # create a 'pipelines' directory at the project level. if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) build_schedule(pipeline_dir = pipeline_dir) }
Creates a new maestro project
create_maestro(path, type = "R", overwrite = FALSE, quiet = FALSE, ...)
create_maestro(path, type = "R", overwrite = FALSE, quiet = FALSE, ...)
path |
file path for the orchestrator script |
type |
file type for the orchestrator (supports R, Quarto, and RMarkdown) |
overwrite |
whether to overwrite an existing orchestrator or maestro project |
quiet |
whether to silence messages in the console (default = `FALSE`) |
... |
unused |
invisible
# Creates a new maestro project with an R orchestrator if (interactive()) { new_proj_dir <- tempdir() create_maestro(new_proj_dir, type = "R", overwrite = TRUE) create_maestro(new_proj_dir, type = "Quarto", overwrite = TRUE) }
# Creates a new maestro project with an R orchestrator if (interactive()) { new_proj_dir <- tempdir() create_maestro(new_proj_dir, type = "R", overwrite = TRUE) create_maestro(new_proj_dir, type = "Quarto", overwrite = TRUE) }
Create a new orchestrator
create_orchestrator( path, type = c("R", "Quarto", "RMarkdown"), open = interactive(), quiet = FALSE, overwrite = FALSE )
create_orchestrator( path, type = c("R", "Quarto", "RMarkdown"), open = interactive(), quiet = FALSE, overwrite = FALSE )
path |
file path for the orchestrator script |
type |
file type for the orchestrator (supports R, Quarto, and RMarkdown) |
open |
whether or not to open the script upon creation |
quiet |
whether to silence messages in the console (default = `FALSE`) |
overwrite |
whether to overwrite an existing orchestrator or maestro project |
invisible
Allows the creation of new pipelines (R scripts) and fills in the maestro tags as specified.
create_pipeline( pipe_name, pipeline_dir = "pipelines", frequency = "1 day", start_time = Sys.Date(), tz = "UTC", log_level = "INFO", quiet = FALSE, open = interactive(), overwrite = FALSE, skip = FALSE, inputs = NULL, outputs = NULL )
create_pipeline( pipe_name, pipeline_dir = "pipelines", frequency = "1 day", start_time = Sys.Date(), tz = "UTC", log_level = "INFO", quiet = FALSE, open = interactive(), overwrite = FALSE, skip = FALSE, inputs = NULL, outputs = NULL )
pipe_name |
name of the pipeline and function |
pipeline_dir |
directory containing the pipeline scripts |
frequency |
how often the pipeline should run (e.g., 1 day, daily, 3 hours, 4 months). Fills in maestroFrequency tag |
start_time |
start time of the pipeline schedule. Fills in maestroStartTime tag |
tz |
timezone that pipeline will be scheduled in. Fills in maestroTz tag |
log_level |
log level for the pipeline (e.g., INFO, WARN, ERROR). Fills in maestroLogLevel tag |
quiet |
whether to silence messages in the console (default = `FALSE`) |
open |
whether or not to open the script upon creation |
overwrite |
whether or not to overwrite an existing pipeline of the same name and location. |
skip |
whether to skip the pipeline when running in the orchestrator (default = `FALSE`) |
inputs |
vector of names of pipelines that input into this pipeline (default = `NULL`) |
outputs |
vector of names of pipelines that receive output from this pipeline (default = `NULL`) |
invisible
if (interactive()) { pipeline_dir <- tempdir() create_pipeline( "extract_data", pipeline_dir = pipeline_dir, frequency = "1 hour", open = FALSE, quiet = TRUE, overwrite = TRUE ) create_pipeline( "new_job", pipeline_dir = pipeline_dir, frequency = "20 minutes", start_time = as.POSIXct("2024-06-21 12:20:00"), log_level = "ERROR", open = FALSE, quiet = TRUE, overwrite = TRUE ) }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline( "extract_data", pipeline_dir = pipeline_dir, frequency = "1 hour", open = FALSE, quiet = TRUE, overwrite = TRUE ) create_pipeline( "new_job", pipeline_dir = pipeline_dir, frequency = "20 minutes", start_time = as.POSIXct("2024-06-21 12:20:00"), log_level = "ERROR", open = FALSE, quiet = TRUE, overwrite = TRUE ) }
Artifacts are return values from pipelines. They are accessible as a named list where the names correspond to the names of the pipeline.
get_artifacts(schedule)
get_artifacts(schedule)
schedule |
object of type MaestroSchedule created using `build_schedule()` |
named list
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) schedule <- run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) get_artifacts(schedule) # Alternatively, use the underlying R6 method schedule$get_artifacts() }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) schedule <- run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) get_artifacts(schedule) # Alternatively, use the underlying R6 method schedule$get_artifacts() }
A schedule is represented as a table where each row is a pipeline and the columns contain scheduling parameters such as the frequency and start time.
get_schedule(schedule)
get_schedule(schedule)
schedule |
object of type MaestroSchedule created using `build_schedule()` |
The schedule table is used internally in a MaestroSchedule object but can be accessed using this function or accessing the R6 method of the MaestroSchedule object.
data.frame
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) get_schedule(schedule) # Alternatively, use the underlying R6 method schedule$get_schedule() }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) get_schedule(schedule) # Alternatively, use the underlying R6 method schedule$get_schedule() }
A status data.frame contains the names and locations of the pipelines as well as information around whether they were invoked, the status (error, warning, etc.), and the run time.
get_status(schedule)
get_status(schedule)
schedule |
object of type MaestroSchedule created using `build_schedule()` |
data.frame
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) schedule <- run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) get_status(schedule) # Alternatively, use the underlying R6 method schedule$get_status() }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) schedule <- run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) get_status(schedule) # Alternatively, use the underlying R6 method schedule$get_status() }
Instantly run a single pipeline from the schedule. This is useful for testing purposes or if you want to just run something one-off.
invoke(schedule, pipe_name, resources = list(), ...)
invoke(schedule, pipe_name, resources = list(), ...)
schedule |
object of type MaestroSchedule created using `build_schedule()` |
pipe_name |
name of a single pipe name from the schedule |
resources |
named list of shared resources made available to pipelines as needed |
... |
other arguments passed to |
Scheduling parameters such as the frequency, start time, and specifiers are ignored.
The pipeline will be run even if `maestroSkip` is present.
invisible
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) invoke(schedule, "my_new_pipeline") }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) invoke(schedule, "my_new_pipeline") }
Gets the latest schedule build errors following use of `build_schedule()`. If
the build succeeded or `build_schedule()` has not been run, it will be `NULL`.
last_build_errors()
last_build_errors()
error messages
last_build_errors()
last_build_errors()
Gets the latest pipeline errors following use of `run_schedule()`. If
all runs succeeded or `run_schedule()` has not been run, it will be `NULL`.
last_run_errors()
last_run_errors()
error messages
last_run_errors()
last_run_errors()
Gets the latest pipeline messages following use of `run_schedule()`. If
there are no messages or `run_schedule()` has not been run, it will be `NULL`.
last_run_messages()
last_run_messages()
Note that setting `maestroLogLevel` to something greater than `INFO` will
ignore messages.
messages
last_run_messages()
last_run_messages()
Gets the latest pipeline warnings following use of `run_schedule()`. If
there are no warnings or `run_schedule()` has not been run, it will be `NULL`.
last_run_warnings()
last_run_warnings()
Note that setting `maestroLogLevel` to something greater than `WARN` will
ignore warnings.
warning messages
last_run_warnings()
last_run_warnings()
maestro tags are roxygen2 comments for configuring the scheduling and execution of pipelines.
maestro tags follow the format #' @maestroTagName
tagName | description | value | examples (comma sep.) | default |
maestroFrequency | Time unit for scheduling | string | 1 hour, daily, 3 days, 5 weeks | 1 day |
maestroLogLevel | Threshold for logging when logging is requested | string | INFO, WARN, ERROR | INFO |
maestroSkip | Skips the pipeline when running (presence of tag indicates to skip) | n/a | ||
maestroStartTime | Start time of the pipeline; sets the point in time for recurrence | date or timestamp | 1970-01-01 00:00:00, 2024-03-28 | 1970-01-01 00:00:00 |
maestroTz | Timezone of the start time | string | UTC, America/Halifax | UTC |
maestroHours | Hours of day to run pipeline | ints | 0 12 23 | |
maestroDays | Days of week or days of month to run pipeline | ints or strings | 1 14 30, Mon Wed Sat | |
maestroMonths | Months of year to run pipeline | ints | 1 3 9 12 | |
maestroInputs | Pipelines that input into this pipeline | strings | my_upstream_pipeline | |
maestroOutputs | Pipelines that take the output from this pipeline | strings | my_downstream_pipeline | |
maestro | Generic tag for identifying a maestro pipeline with all defaults | n/a | ||
Class for a schedule of pipelines
Class for a schedule of pipelines
PipelineList
object of type MaestroPipelineList
new()
Create a MaestroSchedule object
MaestroSchedule$new(Pipelines = NULL)
Pipelines
list of MaestroPipelines
MaestroSchedule
print()
Print the schedule object
MaestroSchedule$print()
run()
Run a MaestroSchedule
MaestroSchedule$run(..., quiet = FALSE, run_all = FALSE, n_show_next = 5)
...
arguments passed to MaestroPipelineList$run
quiet
whether or not to silence console messages
run_all
run all pipelines regardless of the schedule (default is `FALSE`) - useful for testing.
n_show_next
show the next n scheduled pipes
invisible
get_schedule()
Get the schedule as a data.frame
MaestroSchedule$get_schedule()
data.frame
get_status()
Get status of the pipelines as a data.frame
MaestroSchedule$get_status()
data.frame
get_artifacts()
Get artifacts (return values) from the pipelines
MaestroSchedule$get_artifacts()
list
get_network()
Get the network structure of the pipelines as an edge list (will be empty if there are no DAG pipelines)
MaestroSchedule$get_network()
data.frame
show_network()
Visualize the DAG relationships between pipelines in the schedule
MaestroSchedule$show_network()
interactive visualization
clone()
The objects of this class are cloneable with this method.
MaestroSchedule$clone(deep = FALSE)
deep
Whether to make a deep clone.
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) }
Given a schedule in a `maestro` project, runs the pipelines that are scheduled
to execute based on the current time.
run_schedule( schedule, orch_frequency = "1 day", check_datetime = lubridate::now(tzone = "UTC"), resources = list(), run_all = FALSE, n_show_next = 5, cores = 1, logging = FALSE, log_file = NULL, log_file_max_bytes = 1e+06, quiet = FALSE )
run_schedule( schedule, orch_frequency = "1 day", check_datetime = lubridate::now(tzone = "UTC"), resources = list(), run_all = FALSE, n_show_next = 5, cores = 1, logging = FALSE, log_file = NULL, log_file_max_bytes = 1e+06, quiet = FALSE )
schedule |
object of type MaestroSchedule created using `build_schedule()` |
orch_frequency |
frequency of the orchestrator, a single string formatted like "1 day", "2 weeks", "hourly", etc. |
check_datetime |
datetime against which to check the running of pipelines (default is current system time in UTC) |
resources |
named list of shared resources made available to pipelines as needed |
run_all |
run all pipelines regardless of the schedule (default is `FALSE`) |
n_show_next |
show the next n scheduled pipes |
cores |
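number of cpu cores to run if running in parallel. If > 1, pipelines are run in parallel and `future::plan(future::multisession)` must first be called in the orchestrator (see Details) |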
logging |
whether or not to write the logs to a file (default = `FALSE`) |
log_file |
path to the log file (ignored if `logging = FALSE`) |
log_file_max_bytes |
numeric specifying the maximum number of bytes allowed in the log file before purging the log (within a margin of error) |
quiet |
silence metrics to the console (default = `FALSE`) |
The function `run_schedule()` examines each pipeline in the schedule table and determines
whether it is scheduled to run at the current time using some simple time arithmetic. We assume
the check is made against the current time, as in `run_schedule(schedule, check_datetime = Sys.time())`,
but this need not be the case.
`run_schedule()` returns the same MaestroSchedule object with modified attributes. Use `get_status()`
to examine the status of each pipeline and use `get_artifacts()` to get any return values from the
pipelines as a list.
If a pipeline takes an argument that doesn't include a default value, it can be supplied
in the orchestrator via `run_schedule(resources = list(arg1 = val))`. The name of the argument
used by the pipeline must match the name of the argument in the list. Currently, each named
resource must refer to a single object. In other words, you can't have two pipes using
the same argument but requiring different values.
Pipelines can be run in parallel using the `cores` argument. First, you must run
`future::plan(future::multisession)` in the orchestrator. Then, supply the desired number of
cores to the `cores` argument. Note that console output appears different in multicore mode.
MaestroSchedule object
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) # Runs the schedule every 1 day run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) # Runs the schedule every 15 minutes run_schedule( schedule, orch_frequency = "15 minutes", quiet = TRUE ) }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) # Runs the schedule every 1 day run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) # Runs the schedule every 15 minutes run_schedule( schedule, orch_frequency = "15 minutes", quiet = TRUE ) }
Create an interactive network visualization to show the dependency structure of pipelines in the schedule. This is only useful if there are pipelines in the schedule that take inputs/outputs from other pipelines.
show_network(schedule)
show_network(schedule)
schedule |
object of type MaestroSchedule created using `build_schedule()` |
Note that running this function on a schedule with all independent pipelines will produce a network visual with no connections.
This function requires the installation of `DiagrammeR`, which is not automatically
installed with `maestro`.
DiagrammeR visualization
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) schedule <- run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) show_network(schedule) }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) schedule <- run_schedule( schedule, orch_frequency = "1 day", quiet = TRUE ) show_network(schedule) }
Suggests a frequency to run the orchestrator based on the frequencies of the pipelines in a schedule.
suggest_orch_frequency( schedule, check_datetime = lubridate::now(tzone = "UTC") )
suggest_orch_frequency( schedule, check_datetime = lubridate::now(tzone = "UTC") )
schedule |
MaestroSchedule object created by `build_schedule()` |
check_datetime |
datetime against which to check the running of pipelines (default is current system time in UTC) |
This function attempts to find the smallest interval of time between all pipelines. If the smallest interval is less than 15 minutes, it just uses the smallest interval.
Note this function is intended to be used interactively when deciding how often to schedule the orchestrator. Programmatic use is not recommended.
frequency string
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) suggest_orch_frequency(schedule) }
if (interactive()) { pipeline_dir <- tempdir() create_pipeline("my_new_pipeline", pipeline_dir, open = FALSE) schedule <- build_schedule(pipeline_dir = pipeline_dir) suggest_orch_frequency(schedule) }