Title: | Automatically Retrieve Multidimensional Distributed Data Sets |
---|---|
Description: | Tool to automatically fetch, transform and arrange subsets of multidimensional data sets (collections of files) stored in local and/or remote file systems or servers, using multicore capabilities where possible. The tool provides an interface to perceive a collection of data sets as a single large multidimensional data array, and enables the user to request automatic retrieval, processing and arrangement of subsets of the large array. Wrapper functions to add support for custom file formats can be plugged in/out, making the tool suitable for any research field where large multidimensional data sets are involved. |
Authors: | Nicolau Manubens [aut], An-Chi Ho [aut], Nuria Perez-Zanon [aut], Eva Rifa [ctb], Victoria Agudetse [cre, ctb], Bruno de Paula Kinoshita [ctb], Javier Vegas [ctb], Pierre-Antoine Bretonniere [ctb], Roberto Serrano [ctb], BSC-CNS [aut, cph] |
Maintainer: | Victoria Agudetse |
License: | GPL-3 |
Version: | 2.4.0 |
Built: | 2024-12-01 08:51:31 UTC |
Source: | CRAN |
The step that combines the previously declared data and operation to create the complete workflow. It is the final step before data processing.
AddStep(inputs, step_fun, ...)
inputs |
One or a list of objects of the class 'startR_cube' returned by Start(), indicating the data to be processed. |
step_fun |
A startR step function as returned by Step(). |
... |
Additional parameters for the inputs of the function defined in 'step_fun' by Step(). |
A list of the class 'startR_workflow' containing all the objects needed for the data operation.
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = 'all',
              longitude = 'all',
              return_vars = list(latitude = 'dat', longitude = 'dat', time = 'sdate'),
              retrieve = FALSE)
pi_short <- 3.14
fun <- function(x, pi_val) {
  lat <- attributes(x)$Variables$dat1$latitude
  weight <- sqrt(cos(lat * pi_val / 180))
  corrected <- Apply(list(x), target_dims = "latitude",
                     fun = function(x) {x * weight})
}
step <- Step(fun = fun,
             target_dims = 'latitude', output_dims = 'latitude',
             use_libraries = c('multiApply'),
             use_attributes = list(data = "Variables"))
wf <- AddStep(data, step, pi_val = pi_short)
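The step function in the example multiplies every value by sqrt(cos(latitude)) along the latitude dimension, and AddStep() forwards the extra argument pi_val to it. The base R sketch below reproduces only that arithmetic on a small synthetic array, without startR or multiApply. The latitudes, array sizes, and the use of sweep() in place of Apply() are assumptions made for illustration.

```r
# Hypothetical latitudes (degrees) and a 4 x 3 array of ones (latitude x longitude)
lat <- c(-60, -30, 30, 60)
x <- array(1, dim = c(latitude = 4, longitude = 3))

# Same weight formula as the example step function; 'pi' is R's built-in constant
weight <- sqrt(cos(lat * pi / 180))

# Multiply each latitude row by its own weight (MARGIN = 1 is the latitude dimension)
corrected <- sweep(x, MARGIN = 1, STATS = weight, FUN = "*")

# The example passes pi_val = 3.14 through AddStep(); this gives slightly different weights
weight_short <- sqrt(cos(lat * 3.14 / 180))
max(abs(weight - weight_short))
```

Passing a constant through AddStep(), as the example does with pi_val, keeps the step function free of hard-coded values.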
This is a transform function that uses CDO software to remap longitude-latitude
data subsets onto a specified target grid, intended for use as parameter
'transform' in a Start() call. This function complies with the input/output
interface required by Start() defined in the documentation for the parameter
'transform' of function Start().
This function uses the function CDORemap() in the package 's2dv' to
perform the interpolation, hence CDO is required to be installed.
CDORemapper( data_array, variables, file_selectors = NULL, crop_domain = NULL, ... )
data_array |
A data array to be transformed. See details in the documentation of the parameter 'transform' of the function Start(). |
variables |
A list of auxiliary variables required for the transformation, automatically provided by Start(). See details in the documentation of the parameter 'transform' of the function Start(). |
file_selectors |
A character vector with information about the path of the file that the parameter 'data_array' comes from. See details in the documentation of the parameter 'transform' of the function Start(). The default value is NULL. |
crop_domain |
A list of the transformed domain of each transform variable, automatically provided by Start(). |
... |
A list of additional parameters to adjust the transform process, as provided in the parameter 'transform_params' in a Start() call. See details in the documentation of the parameter 'transform' of the function Start(). |
An array with the same number of dimensions as the input data array, potentially with different sizes, and potentially with the attribute 'variables' containing additional auxiliary data. See details in the documentation of the parameter 'transform' of the function Start().
# Used in Start():
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011')
## Not run:
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = values(list(-60, 60)),
              latitude_reorder = Sort(decreasing = TRUE),
              longitude = values(list(-120, 120)),
              longitude_reorder = CircularSort(-180, 180),
              transform = CDORemapper,
              transform_params = list(grid = 'r360x181',
                                      method = 'conservative'),
              transform_vars = c('latitude', 'longitude'),
              return_vars = list(latitude = 'dat',
                                 longitude = 'dat',
                                 time = 'sdate'),
              retrieve = FALSE)
## End(Not run)
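The return value described above keeps the number of dimensions but may change their sizes, optionally attaching transformed coordinates in a 'variables' attribute. The sketch below illustrates only that output shape. It does not use CDO and is not a drop-in 'transform' for Start(). The function coarsen_longitude(), the assumption that longitude is the last dimension, and the layout of the attribute are all hypothetical.

```r
# Hypothetical toy transform: halve longitude resolution by averaging adjacent pairs.
# Assumes 'longitude' is the last dimension and its size is even.
coarsen_longitude <- function(data_array, lon) {
  d <- dim(data_array)
  n_lon <- d[length(d)]
  stopifnot(n_lon %% 2 == 0, length(lon) == n_lon)
  # View the array as (all other dims flattened) x longitude; column k is slice k
  inner <- prod(d[-length(d)])
  m <- matrix(data_array, nrow = inner, ncol = n_lon)
  odd <- seq(1, n_lon, by = 2)
  out <- (m[, odd, drop = FALSE] + m[, odd + 1, drop = FALSE]) / 2
  # Same number of dimensions, smaller longitude size
  new_dim <- d
  new_dim[length(d)] <- n_lon / 2
  out <- array(out, dim = new_dim)
  # Attach the transformed coordinate, mirroring the optional 'variables' attribute
  attr(out, "variables") <- list(longitude = (lon[odd] + lon[odd + 1]) / 2)
  out
}

x <- array(1:8, dim = c(latitude = 2, longitude = 4))
y <- coarsen_longitude(x, lon = c(0, 90, 180, 270))
dim(y)
attr(y, "variables")$longitude
```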
The final step of the startR workflow after the data operation. It is used when the parameter 'wait' of Compute() is FALSE. It combines all the chunks of the results into one data array when the execution is done. See the practical guide for more details. Collect() calls Collect_ecflow() or Collect_autosubmit() according to the chosen workflow manager.
Collect(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE)
startr_exec |
An R object returned by Compute() when the parameter 'wait' of Compute() is FALSE. It can come directly from a Compute() call or be read from an RDS file. |
wait |
A logical value deciding whether the R session waits for the Collect() call to finish (TRUE) or not (FALSE). If TRUE, it will be a blocking call, in which Collect() retrieves information from the HPC, including signals and outputs, every 'polling_period' seconds. The status can be monitored on the workflow manager GUI. Collect() will not return until the results of all the chunks have been received. If FALSE, Collect() returns an error if the execution has not finished; otherwise it returns the merged array. The default value is TRUE. |
remove |
A logical value deciding whether to remove all chunk results received from the HPC after the data have been collected, as well as the local job folder under 'ecflow_suite_dir' or 'autosubmit_suite_dir'. To preserve the data and Collect() them as many times as desired, set remove to FALSE. The default value is TRUE. |
on_remote |
A logical value deciding whether the function is run locally and syncs the outputs back from the HPC (FALSE, default), or is run on the HPC (TRUE). |
A list of merged data arrays.
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = 'all',
              longitude = 'all',
              return_vars = list(latitude = 'dat', longitude = 'dat', time = 'sdate'),
              retrieve = FALSE)
fun <- function(x) {
  lat <- attributes(x)$Variables$dat1$latitude
  weight <- sqrt(cos(lat * pi / 180))
  corrected <- Apply(list(x), target_dims = "latitude",
                     fun = function(x) {x * weight})
}
step <- Step(fun = fun,
             target_dims = 'latitude', output_dims = 'latitude',
             use_libraries = c('multiApply'),
             use_attributes = list(data = "Variables"))
wf <- AddStep(data, step)
## Not run:
res <- Compute(wf, chunks = list(longitude = 2, sdate = 2),
               threads_load = 1,
               threads_compute = 4,
               cluster = list(queue_host = 'nord3',
                              queue_type = 'lsf',
                              temp_dir = '/on_hpc/tmp_dir/',
                              cores_per_job = 2,
                              job_wallclock = '05:00',
                              max_jobs = 4,
                              extra_queue_params = list('#BSUB -q bsc_es'),
                              bidirectional = FALSE,
                              polling_period = 10),
               ecflow_suite_dir = '/on_local_machine/username/ecflow_dir/',
               wait = FALSE)
saveRDS(res, file = 'test_collect.Rds')
collect_info <- readRDS('test_collect.Rds')
result <- Collect(collect_info, wait = TRUE)
## End(Not run)
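Collect() merges the per-chunk results back into one array. The base R sketch below does not show startR's merging code. It only illustrates why chunks split along the last dimension of an R array can be rejoined by plain concatenation: R stores arrays in column-major order, so slices along the last dimension are contiguous. The helper merge_last_dim() and the restriction to the last dimension are assumptions.

```r
# Hypothetical helper: rejoin chunks that were split along the LAST dimension.
# In column-major storage, slices along the last dimension are contiguous blocks,
# so concatenating the chunks in order restores the full array.
merge_last_dim <- function(chunks) {
  d <- dim(chunks[[1]])
  last <- length(d)
  # All chunks must agree on every dimension except the last one
  for (ch in chunks) stopifnot(identical(dim(ch)[-last], d[-last]))
  sizes <- vapply(chunks, function(ch) dim(ch)[last], integer(1))
  new_dim <- d
  new_dim[last] <- sum(sizes)
  array(unlist(lapply(chunks, as.vector)), dim = new_dim)
}

full <- array(1:24, dim = c(latitude = 2, time = 3, longitude = 4))
# Two longitude chunks, similar in spirit to chunks = list(longitude = 2)
chunk1 <- full[, , 1:2, drop = FALSE]
chunk2 <- full[, , 3:4, drop = FALSE]
merged <- merge_last_dim(list(chunk1, chunk2))
all(merged == full)
```

Merging along any other dimension would first require permuting that dimension to the end with aperm().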
The step of the startR workflow after the complete workflow is defined by
AddStep(). This function specifies the execution parameters and triggers the
execution. The execution can be run locally or on a remote machine. In the
latter case, the configuration of the machine needs to be
specified in the function, and the EC-Flow server is required to be
installed.
The execution can be split into chunks to avoid overloading the RAM.
After all the chunks have finished, Compute() gathers and merges them, and
returns a single data object, including one or more multidimensional data
arrays and additional metadata.
Compute( workflow, chunks = "auto", workflow_manager = "ecFlow", threads_load = 1, threads_compute = 1, cluster = NULL, ecflow_suite_dir = NULL, ecflow_server = NULL, autosubmit_suite_dir = NULL, autosubmit_server = NULL, silent = FALSE, debug = FALSE, wait = TRUE )
workflow |
A list of the class 'startR_workflow' returned by the function AddStep() or of the class 'startR_cube' returned by the function Start(). It contains all the objects needed for the execution. |
chunks |
A named list of the dimensions along which to split the data, with the number of chunks to make for each. Only dimensions that are not required as target dimensions in the function Step() can be chunked. The default value is 'auto', which lists all the non-target dimensions with one chunk each. |
workflow_manager |
Can be NULL, 'ecFlow' or 'Autosubmit'. The default is 'ecFlow'. |
threads_load |
An integer indicating the number of parallel execution cores to use for the data retrieval stage. The default value is 1. |
threads_compute |
An integer indicating the number of parallel execution cores to use for the computation. The default value is 1. |
cluster |
A list of components that define the configuration of the machine to run on. The components vary between machines. Check the practical guide on GitLab for more details and examples. Only needed when the computation is not run locally. The default value is NULL. |
ecflow_suite_dir |
A character string indicating the path to a folder in the local workstation where to store temporary files generated for the automatic management of the workflow. Only needed when the execution is run remotely. The default value is NULL. |
ecflow_server |
A named vector indicating the host and port of the
EC-Flow server. The vector form should be
|
autosubmit_suite_dir |
A character string indicating the path to a folder in which to store temporary files generated for the automatic management of the workflow. This path should be accessible from both the local workstation and the Autosubmit machine. The default value is NULL, in which case a temporary folder is created under the current working directory. |
autosubmit_server |
A character vector indicating the login node of the autosubmit machine. It can be "bscesautosubmit01" or "bscesautosubmit02". The default value is NULL, and the node will be randomly chosen. |
silent |
A logical value deciding whether to print the computation progress in the R session (FALSE) or not (TRUE). It only works when the execution runs locally or the parameter 'wait' is TRUE. The default value is FALSE. |
debug |
A logical value deciding whether to return detailed messages on the progress and operations in a Compute() call (TRUE) or not (FALSE). Automatically changed to FALSE if parameter 'silent' is TRUE. The default value is FALSE. |
wait |
A logical value deciding whether the R session waits for the Compute() call to finish (TRUE) or not (FALSE). If FALSE, it returns an object with all the information about the startR execution, which can be stored on disk. After that, the R session can be closed and the results can be collected later with the Collect() function. The default value is TRUE. |
A list of data arrays for the output returned by the last step in the specified workflow (wait = TRUE), or an object with information about the startR execution (wait = FALSE). The configuration details and profiling information are attached as attributes to the returned list of arrays.
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = 'all',
              longitude = 'all',
              return_vars = list(latitude = 'dat', longitude = 'dat', time = 'sdate'),
              retrieve = FALSE)
fun <- function(x) {
  lat <- attributes(x)$Variables$dat1$latitude
  weight <- sqrt(cos(lat * pi / 180))
  corrected <- Apply(list(x), target_dims = "latitude",
                     fun = function(x) {x * weight})
}
step <- Step(fun = fun,
             target_dims = 'latitude', output_dims = 'latitude',
             use_libraries = c('multiApply'),
             use_attributes = list(data = "Variables"))
wf <- AddStep(data, step)
res <- Compute(wf, chunks = list(longitude = 4, sdate = 2))
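With chunks = list(longitude = 4, sdate = 2), the work is divided into 4 x 2 = 8 chunks. The sketch below computes contiguous, near-equal index ranges per chunked dimension with base R and enumerates the chunk combinations. It assumes an even contiguous split, which may differ in detail from how startR assigns indices. The helper chunk_ranges() and the dimension sizes are hypothetical.

```r
# Hypothetical helper: split indices 1..n into k contiguous, near-equal chunks
chunk_ranges <- function(n, k) {
  stopifnot(k >= 1, k <= n)
  split(seq_len(n), ceiling(seq_len(n) * k / n))
}

# Hypothetical dimension sizes and the chunking request from the example
dim_sizes <- c(sdate = 2, longitude = 256)
chunks <- list(longitude = 4, sdate = 2)

# Index ranges for each chunked dimension
ranges <- lapply(names(chunks), function(d) chunk_ranges(dim_sizes[[d]], chunks[[d]]))
names(ranges) <- names(chunks)

# Every combination of chunk numbers is one independent unit of work
combos <- expand.grid(lapply(chunks, seq_len))
nrow(combos)
lengths(ranges$longitude)
```

Because each chunk only needs its own slice of the data, peak memory scales with the chunk size rather than with the full array.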
This is a helper function used in a Start() call to define the desired range
of dimensions. It selects the indices of the coordinate variable from the
original data. See details in the documentation of the parameter '...'
('indices to take') of the function Start().
indices(x)
x |
A numeric vector, or a list of two numerics to take all the elements between the two specified indices (both extremes inclusive). |
Same as the input, but with the additional attributes 'indices', 'values', and 'chunk'.
# Used in Start():
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = indices(1:2),
              longitude = indices(list(2, 14)),
              return_vars = list(latitude = 'dat', longitude = 'dat', time = 'sdate'),
              retrieve = FALSE)
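As described above, indices() accepts either an explicit vector of positions or a list of two numbers meaning an inclusive range. The hypothetical helper expand_indices() below shows how such a selector expands into explicit positions. It is an illustration only, not startR's internal code.

```r
# Hypothetical helper: expand an index selector into explicit positions
expand_indices <- function(x) {
  if (is.list(x)) {
    stopifnot(length(x) == 2)
    # A list of two numbers means every index between them, both ends inclusive
    seq(x[[1]], x[[2]])
  } else {
    # A plain vector already lists the positions to take
    x
  }
}

expand_indices(1:2)          # as in latitude = indices(1:2)
expand_indices(list(2, 14))  # as in longitude = indices(list(2, 14))
```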
This is a file closer function for NetCDF files, intended for use as
parameter 'file_closer' in a Start() call. This function complies with the
input/output interface required by Start() defined in the documentation for
the parameter 'file_closer'.
This function uses the function NcClose() in the package 'easyNCDF',
which in turn uses nc_close() in the package 'ncdf4'.
NcCloser(file_object)
file_object |
An open connection to a NetCDF file, optionally with additional header information. See details in the documentation of the parameter 'file_closer' of the function Start(). |
This function returns NULL.
NcOpener
NcDataReader
NcDimReader
NcVarReader
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/tos/tos_200011.nc')
connection <- NcOpener(path_obs)
NcCloser(connection)
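Start() lets file opener and closer functions be plugged in, so the same open/close pattern can wrap other formats. The sketch below is a hypothetical opener/closer pair for plain-text files using base R connections. The names TxtOpener and TxtCloser, and the choice to return NULL when a file does not exist, are assumptions and not part of startR.

```r
# Hypothetical opener: return an open read connection, or NULL if the file is missing
TxtOpener <- function(file_path) {
  if (!file.exists(file_path)) return(NULL)
  file(file_path, open = "r")
}

# Hypothetical closer: close the connection if there is one, then return NULL
TxtCloser <- function(file_object) {
  if (!is.null(file_object)) close(file_object)
  NULL
}

tmp <- tempfile(fileext = ".txt")
writeLines(c("1 2 3", "4 5 6"), tmp)
con <- TxtOpener(tmp)
lines <- readLines(con)
TxtCloser(con)
unlink(tmp)
```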
This is a data reader function for NetCDF files, intended for use as parameter
'file_data_reader' in a Start() call. This function complies with the
input/output interface required by Start() defined in the documentation for
the parameter 'file_data_reader'.
This function uses the function NcToArray() in the package 'easyNCDF', which
in turn uses ncvar_get() in the package 'ncdf4'.
NcDataReader( file_path = NULL, file_object = NULL, file_selectors = NULL, inner_indices = NULL, synonims )
file_path |
A character string indicating the path to the data file to read. See details in the documentation of the parameter 'file_data_reader' of the function Start(). The default value is NULL. |
file_object |
An open connection to a NetCDF file, optionally with additional header information. See details in the documentation of the parameter 'file_data_reader' of the function Start(). The default value is NULL. |
file_selectors |
A named list containing the information of the path of the file to read data from. It is automatically provided by Start(). See details in the documentation of the parameter 'file_data_reader' of the function Start(). The default value is NULL. |
inner_indices |
A named list of numeric vectors indicating the indices to take from each of the inner dimensions in the requested file. It is automatically provided by Start(). See details in the documentation of the parameter 'file_data_reader' of the function Start(). The default value is NULL. |
synonims |
A named list indicating the synonyms for the dimension names to look for in the requested file, exactly as provided in the parameter 'synonims' in a Start() call. See details in the documentation of the parameter 'file_data_reader' of the function Start(). |
A multidimensional data array with the named dimensions and indices requested in 'inner_indices', potentially with the attribute 'variables' with additional auxiliary data. See details in the documentation of the parameter 'file_data_reader' of the function Start().
NcOpener
NcDimReader
NcCloser
NcVarReader
data_path <- system.file('extdata', package = 'startR', mustWork = TRUE)
file_to_open <- file.path(data_path, 'obs/monthly_mean/tos/tos_200011.nc')
file_selectors <- c(dat = 'dat1', var = 'tos', sdate = '200011')
first_round_indices <- list(time = 1, latitude = 1:8, longitude = 1:16)
synonims <- list(dat = 'dat', var = 'var', sdate = 'sdate', time = 'time',
                 latitude = 'latitude', longitude = 'longitude')
sub_array <- NcDataReader(file_to_open, NULL, file_selectors,
                          first_round_indices, synonims)
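'inner_indices' is a named list of positions for each inner dimension, like first_round_indices in the example. The hypothetical base R helper subset_by_name() below, which is not taken from startR or easyNCDF, shows how such a list can select a sub-array by dimension name while keeping length-1 dimensions (drop = FALSE). The synthetic array is an assumption.

```r
# Hypothetical helper: subset an array using a named list of indices per dimension
subset_by_name <- function(arr, inner_indices) {
  dim_names <- names(dim(arr))
  stopifnot(!is.null(dim_names), all(names(inner_indices) %in% dim_names))
  # Default: take every position along every dimension
  idx <- lapply(dim(arr), seq_len)
  # Override the dimensions that were requested explicitly
  idx[names(inner_indices)] <- inner_indices
  out <- do.call(`[`, c(list(arr), unname(idx), list(drop = FALSE)))
  # Re-apply the dimension names so the result stays self-describing
  dim(out) <- setNames(lengths(idx), dim_names)
  out
}

arr <- array(seq_len(400), dim = c(time = 2, latitude = 10, longitude = 20))
sub <- subset_by_name(arr, list(time = 1, latitude = 1:8, longitude = 1:16))
dim(sub)
```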
A dimension reader function for NetCDF files, intended for use as parameter
'file_dim_reader' in a Start() call. It complies with the input/output
interface required by Start() defined in the documentation for the parameter
'file_dim_reader' of that function.
This function uses the function NcReadDims() in the package 'easyNCDF'.
NcDimReader( file_path = NULL, file_object = NULL, file_selectors = NULL, inner_indices = NULL, synonims )
file_path |
A character string indicating the path to the data file to read. See details in the documentation of the parameter 'file_dim_reader' of the function Start(). The default value is NULL. |
file_object |
An open connection to a NetCDF file, optionally with additional header information. See details in the documentation of the parameter 'file_dim_reader' of the function Start(). The default value is NULL. |
file_selectors |
A named list containing the information of the path of the file to read data from. It is automatically provided by Start(). See details in the documentation of the parameter 'file_dim_reader' of the function Start(). The default value is NULL. |
inner_indices |
A named list of numeric vectors indicating the indices to take from each of the inner dimensions in the requested file. It is automatically provided by Start(). See details in the documentation of the parameter 'file_dim_reader' of the function Start(). The default value is NULL. |
synonims |
A named list indicating the synonyms for the dimension names to look for in the requested file, exactly as provided in the parameter 'synonims' in a Start() call. See details in the documentation of the parameter 'file_dim_reader' of the function Start(). |
A named numeric vector with the names and sizes of the dimensions of the requested file.
NcOpener
NcDataReader
NcCloser
NcVarReader
data_path <- system.file('extdata', package = 'startR')
file_to_open <- file.path(data_path, 'obs/monthly_mean/tos/tos_200011.nc')
file_selectors <- c(dat = 'dat1', var = 'tos', sdate = '200011')
first_round_indices <- list(time = 1, latitude = 1:8, longitude = 1:16)
synonims <- list(dat = 'dat', var = 'var', sdate = 'sdate', time = 'time',
                 latitude = 'latitude', longitude = 'longitude')
dim_of_file <- NcDimReader(file_to_open, NULL, file_selectors,
                           first_round_indices, synonims)
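The 'synonims' parameter lets Start() recognise a dimension under different names in different files. The hypothetical helper canonical_dims() below sketches that lookup for a dimension reader's output: it renames the dimension sizes found in a file to the canonical names used in the Start() call. The list layout (canonical name mapped to a vector of accepted names) and the file's dimension sizes are assumptions for illustration.

```r
# Hypothetical helper: rename a file's dimension sizes to canonical names
canonical_dims <- function(file_dims, synonims) {
  out_names <- names(file_dims)
  for (canon in names(synonims)) {
    # Any file dimension whose name is an accepted synonym gets the canonical name
    hit <- out_names %in% synonims[[canon]]
    out_names[hit] <- canon
  }
  setNames(file_dims, out_names)
}

# Dimension sizes as a file might report them (hypothetical)
file_dims <- c(time = 1, lat = 256, lon = 512)
synonims <- list(latitude = c("latitude", "lat"),
                 longitude = c("longitude", "lon"))
res <- canonical_dims(file_dims, synonims)
res
```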
This is a file opener function for NetCDF files, intended for use as parameter
'file_opener' in a Start() call. This function complies with the input/output
interface required by Start() defined in the documentation for the parameter
'file_opener'.
This function uses the function NcOpen() in the package 'easyNCDF', which in
turn uses nc_open() in the package 'ncdf4'.
NcOpener(file_path)
file_path |
A character string indicating the path to the data file to read. See details in the documentation of the parameter 'file_opener' of the function Start(). |
An open connection to a NetCDF file with additional header information as returned by nc_open() in the package 'ncdf4'. See details in the documentation of the parameter 'file_opener' of the function Start().
NcDimReader
NcDataReader
NcCloser
NcVarReader
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/tos/tos_200011.nc')
connection <- NcOpener(path_obs)
NcCloser(connection)
This is an auxiliary variable reader function for NetCDF files, intended for
use as parameter 'file_var_reader' in a Start() call. It complies with the
input/output interface required by Start() defined in the documentation for
the parameter 'file_var_reader' of that function.
This function uses the function NcDataReader() in the package 'startR',
which in turn uses NcToArray() in the package 'easyNCDF', which in turn uses
ncvar_get() in the package 'ncdf4'.
NcVarReader( file_path = NULL, file_object = NULL, file_selectors = NULL, var_name = NULL, synonims )
file_path |
A character string indicating the path to the data file to read the variable from. See details in the documentation of the parameter 'file_var_reader' of the function Start(). The default value is NULL. |
file_object |
An open connection to a NetCDF file, optionally with additional header information. See details in the documentation of the parameter 'file_var_reader' of the function Start(). The default value is NULL. |
file_selectors |
A named list containing the information of the path of the file to read data from. It is automatically provided by Start(). See details in the documentation of the parameter 'file_var_reader' of the function Start(). The default value is NULL. |
var_name |
A character string with the name of the variable to be read. The default value is NULL. |
synonims |
A named list indicating the synonyms for the dimension names to look for in the requested file, exactly as provided in the parameter 'synonims' in a Start() call. See details in the documentation of the parameter 'file_var_reader' of the function Start(). |
A multidimensional data array with the named dimensions, potentially with the attribute 'variables' with additional auxiliary data. See details in the documentation of the parameter 'file_var_reader' of the function Start().
NcOpener
NcDataReader
NcCloser
NcDimReader
data_path <- system.file('extdata', package = 'startR')
file_to_open <- file.path(data_path, 'obs/monthly_mean/tos/tos_200011.nc')
file_selectors <- c(dat = 'dat1', var = 'tos', sdate = '200011')
synonims <- list(dat = 'dat', var = 'var', sdate = 'sdate', time = 'time',
                 latitude = 'latitude', longitude = 'longitude')
var <- NcVarReader(file_to_open, NULL, file_selectors, 'tos', synonims)
This is a selector checker function intended for use as parameter 'selector_checker' in a Start() call. It translates the set of selectors given for one dimension into a set of numeric indices of the corresponding coordinate variable. The function complies with the input/output interface required by Start() defined in the documentation for the parameter 'selector_checker' of Start().
SelectorChecker(selectors, var = NULL, return_indices = TRUE, tolerance = NULL)
selectors |
A vector, or a list of two elements, of numeric indices or variable values to be retrieved for a dimension, automatically provided by Start(). See details in the documentation of the parameters 'selector_checker' and '...' of the function Start(). |
var |
A vector of values of a coordinate variable for which to search matches with the provided indices or values in the parameter 'selectors', automatically provided by Start(). See details in the documentation of the parameters 'selector_checker' and '...' of the function Start(). The default value is NULL. When not specified, SelectorChecker() simply returns the input indices. |
return_indices |
A logical value automatically configured by Start(), telling whether to return the numeric indices or coordinate variable values after the matching. The default value is TRUE. |
tolerance |
A numeric value indicating a tolerance value to be used in
the matching of 'selectors' and 'var'. See documentation on
'<dim_name>_tolerance' in |
A vector of either the indices of the matching values (if return_indices = TRUE) or the matching values themselves (if return_indices = FALSE).
# Get the latitudes from 10 to 20 degrees
sub_array_of_selectors <- list(10, 20)
# The latitude values from the original file
sub_array_of_values <- seq(90, -90, length.out = 258)[2:257]
SelectorChecker(sub_array_of_selectors, sub_array_of_values)
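SelectorChecker() turns value selectors into indices of the coordinate variable. The sketch below covers the two cases under stated assumptions: a list of two values selects every coordinate inside the closed range, and a single value selects the nearest coordinate, which is rejected if it lies farther away than 'tolerance'. The helper values_to_indices() is hypothetical, not startR's code, and its edge-case behaviour may differ.

```r
# Hypothetical helper: map value selectors onto indices of a coordinate vector
values_to_indices <- function(selectors, var, tolerance = NULL) {
  if (is.list(selectors)) {
    lo <- min(selectors[[1]], selectors[[2]])
    hi <- max(selectors[[1]], selectors[[2]])
    # Range selector: every coordinate value inside [lo, hi], in file order
    return(which(var >= lo & var <= hi))
  }
  # Point selectors: nearest coordinate for each requested value
  vapply(selectors, function(s) {
    dist <- abs(var - s)
    i <- which.min(dist)
    if (!is.null(tolerance) && dist[i] > tolerance) {
      stop("no coordinate within tolerance of ", s)
    }
    i
  }, integer(1))
}

# Same latitudes as the example above
lats <- seq(90, -90, length.out = 258)[2:257]
idx <- values_to_indices(list(10, 20), lats)
range(lats[idx])
values_to_indices(0.5, lats, tolerance = 1)
```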
The reorder function intended for use as parameter '<dim_name>_reorder'
in a call to the function Start(). This function complies with the
input/output interface required by Start() defined in the documentation
for the parameter ...
of that function.
Sort() sorts the coordinate values in increasing or decreasing order.
It is useful for adjusting the latitude order.
CircularSort() performs a circular sort of the coordinate values: any value
beyond the limits specified in the parameters 'start' and 'end' is wrapped
into that range with a modulus. This is useful for circular coordinates such
as the Earth's longitudes.
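The wrapping described for CircularSort() can be sketched in base R. This is a hypothetical illustration, not the startR implementation: circular_sort_sketch() is an invented name, and the choice of a half-open range [start, end) (so a value equal to 'end' maps to 'start') is an assumption of the sketch.

```r
# Hypothetical sketch of a circular sort (NOT the startR source code):
# wrap every value into [start, end) with a modulus, then sort it.
circular_sort_sketch <- function(x, start, end, ...) {
  wrapped <- ((x - start) %% (end - start)) + start
  # index.return = TRUE makes sort() return both $x and $ix
  sort(wrapped, index.return = TRUE, ...)
}

# Longitudes given on a 0..360 grid, requested on a -180..180 grid
res <- circular_sort_sketch(c(350, 10, 190), start = -180, end = 180)
res$x   # -170  -10   10
res$ix  # 3 1 2
```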
Sort(...)
CircularSort(start, end, ...)
... |
Additional parameters to adjust the reordering. See function sort() for more details. |
start |
A numeric indicating the lower bound of the circular range. |
end |
A numeric indicating the upper bound of the circular range. |
A list of 2 containing:
$x |
The reordered values. |
$ix |
The permutation indices of $x in the original coordinate. |
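The two returned components have the same shape as the output of base R sort() with index.return = TRUE. The sketch below assumes, for illustration only, that Sort() behaves like such a wrapper; sort_sketch() is a hypothetical name and not the package source.

```r
# Hypothetical sketch of Sort(): base R sort() with index.return = TRUE
# returns the two components described above ($x and $ix).
sort_sketch <- function(x, ...) sort(x, index.return = TRUE, ...)

lat <- c(-30, 60, 0)
res <- sort_sketch(lat, decreasing = TRUE)
res$x   # 60   0 -30
res$ix  # 2 3 1
# $ix maps each reordered value back to its position in the original coordinate
identical(lat[res$ix], res$x)  # TRUE
```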
# Used in Start():
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = values(list(-60, 60)),
              latitude_reorder = Sort(decreasing = TRUE),
              longitude = values(list(-120, 120)),
              longitude_reorder = CircularSort(-180, 180),
              return_vars = list(latitude = 'dat',
                                 longitude = 'dat',
                                 time = 'sdate'),
              retrieve = FALSE)
See the startR documentation and
tutorial for a step-by-step explanation of how to use Start().
Nowadays, in the era of big data, large multidimensional data sets from
diverse sources need to be combined and processed. Analysis of big data in any
field is often highly complex and time-consuming. Taking subsets of these data
sets and processing them efficiently has become an indispensable practice. This
technique is also known as Domain Decomposition, Map Reduce or, more commonly,
'chunking'.
startR (Subset, TrAnsform, ReTrieve, arrange and process large
multidimensional data sets in R) is an R project started at BSC with the aim
of developing a tool that allows the user to automatically process large
multidimensional distributed data sets. It is an open source project, open
to external collaboration and funding, and will continuously evolve to
support as many data set formats as possible while maximizing its efficiency.
startR provides a framework under which a data set (a collection of one
or multiple data files, potentially distributed over various remote servers)
is perceived as if all its files were part of a single large multidimensional
array. Once such a multidimensional array is declared, any user-defined function
can be applied to the data in an apply()-like fashion, where startR
transparently implements the Map Reduce paradigm. The steps to follow in order
to process a collection of big data sets are as follows:
Declaring the data set, i.e. declaring the distribution of the data files involved, the dimensions and shape of the multidimensional array, and the boundaries of the target data. This step can be performed with the Start() function. Numeric indices or coordinate values can be used when fixing the boundaries. It is often necessary to apply transformations, pre-processing or reordering to the data, and Start() accepts user-defined transformation or reordering functions for such purposes. Once a data set is declared, a list of the involved files, dimension lengths, memory size and other metadata is made available. Optionally, the data set can be retrieved and loaded into the current R session if it is small enough.
Declaring the workflow of operations to perform on the involved data set(s). This step can be performed with the Step() and AddStep() functions.
Defining the computation settings. The mandatory settings include a) how many subsets to divide the data sets into and along which dimensions; b) which platform to perform the workflow of operations on (local machine or remote machine/HPC?), how to communicate with it (unidirectional or bidirectional connection? shared or separate file systems?), which queuing system it uses (slurm, PBS, LSF, none?); and c) how many parallel jobs and execution threads per job to use when running the calculations. This step can be performed when building up the call to the Compute() function.
Running the computation. startR transparently implements the Map Reduce paradigm, according to the settings in the previous steps. The progress can optionally be monitored with the EC-Flow workflow management tool. When the computation ends, a report of performance timings is displayed. This step can be triggered with the Compute() function.
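The idea behind the chunking in these steps (split the data along a chosen dimension, process each subset independently, then combine the partial results) can be illustrated without startR. The sketch below is a hypothetical base R toy: chunked_time_mean() is an invented name, the data is an in-memory matrix rather than distributed files, and the chunks run sequentially with lapply() where startR would dispatch jobs.

```r
# Hypothetical illustration of chunking (Map Reduce) in base R; startR's
# Compute() works over files and jobs, not with this code.
chunked_time_mean <- function(m, n_chunks) {
  # Split the column (e.g. longitude) indices into n_chunks contiguous
  # groups; cut() requires n_chunks >= 2 when given a number of intervals
  groups <- split(seq_len(ncol(m)),
                  cut(seq_len(ncol(m)), n_chunks, labels = FALSE))
  # Map: each chunk is processed independently (could run in parallel)
  partial <- lapply(groups, function(cols) colMeans(m[, cols, drop = FALSE]))
  # Reduce: concatenate the partial results in the original column order
  unname(unlist(partial))
}

# 4 time steps x 6 longitudes
m <- matrix(1:24, nrow = 4)
chunked_time_mean(m, n_chunks = 3)  # 2.5  6.5 10.5 14.5 18.5 22.5
```

Because the mean over time is computed independently for each longitude, chunking along longitude gives exactly the same result as processing the whole matrix at once.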
startR is not bound to a specific file format. Interface functions to custom file formats can be provided for Start() to read them. As of this version, startR includes interface functions for the following file formats:
NetCDF
Metadata and auxiliary data are also preserved and arranged by Start(), to the extent that they are retrieved by the interface functions for a specific file format.
Start(
  ...,
  return_vars = NULL,
  synonims = NULL,
  file_opener = NcOpener,
  file_var_reader = NcVarReader,
  file_dim_reader = NcDimReader,
  file_data_reader = NcDataReader,
  file_closer = NcCloser,
  transform = NULL,
  transform_params = NULL,
  transform_vars = NULL,
  transform_extra_cells = 2,
  apply_indices_after_transform = FALSE,
  pattern_dims = NULL,
  metadata_dims = NULL,
  selector_checker = SelectorChecker,
  merge_across_dims = FALSE,
  merge_across_dims_narm = TRUE,
  split_multiselected_dims = FALSE,
  path_glob_permissive = FALSE,
  largest_dims_length = FALSE,
  retrieve = FALSE,
  num_procs = 1,
  ObjectBigmemory = NULL,
  silent = FALSE,
  debug = FALSE
)
... |
A selection of customized parameters depending on the data
format. When we retrieve data from one or a collection of data sets,
the involved data can be perceived as belonging to a large multi-dimensional
array. For instance, consider the following case. We want to retrieve data
from a source which contains data for the number of monthly sales of various
items, and also for their retail price each month. The data at the source is
stored as follows:
For each dimension, the first three information items can be specified with a set
of parameters to be provided through |
return_vars |
A named list where the names are the names of the
variables to be fetched in the files, and the values are vectors of
character strings with the names of the file dimensions for which to retrieve
each variable, or NULL if the variable has to be retrieved only once
from any (the first) of the involved files. |
synonims |
A named list where the names are the requested variable or
dimension names, and the values are vectors of character strings with
alternative names under which to look for that dimension or variable. |
file_opener |
A function that receives as a single parameter
'file_path' a character string with the path to a file to be opened,
and returns an object with an open connection to the file (optionally with
header information) on success, or returns NULL on failure.
|
file_var_reader |
A function with the header |
file_dim_reader |
A function with the header |
file_data_reader |
A function with the header |
file_closer |
A function that receives as a single parameter
'file_object' an open connection (as returned by 'file_opener')
to one of the files to be read, optionally with header information, and
closes the open connection. Always returns NULL.
|
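To make the opener/closer contract concrete, here is a hypothetical pair for plain CSV files. CsvOpener() and CsvCloser() are invented for illustration and are not part of startR (whose defaults are NcOpener and NcCloser). The sketch only follows the interface stated above: the opener takes 'file_path' and returns an object or NULL on failure, and the closer takes 'file_object' and always returns NULL. Parsing the whole file into memory is a simplification of the sketch.

```r
# Hypothetical file_opener/file_closer pair for CSV files (NOT part of startR),
# written only to illustrate the interface described above.
CsvOpener <- function(file_path) {
  tryCatch({
    # The returned object plays the role of an "open connection with header
    # information"; here the whole file is simply parsed into memory
    list(path = file_path, content = read.csv(file_path))
  }, error = function(e) NULL, warning = function(w) NULL)
}

CsvCloser <- function(file_object) {
  # Nothing to release for an in-memory object; the interface returns NULL
  NULL
}

tmp <- tempfile(fileext = ".csv")
write.csv(data.frame(sales = c(10, 12)), tmp, row.names = FALSE)
obj <- CsvOpener(tmp)                                   # list with $path and $content
CsvOpener(file.path(tempdir(), "no_such_file.csv"))     # NULL
CsvCloser(obj)                                          # NULL
```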
transform |
A function with the header |
transform_params |
A named list with additional parameters to be sent to the 'transform' function (if specified). See documentation on parameter 'transform' for details. |
transform_vars |
A vector of character strings with the names of auxiliary variables to be sent to the 'transform' function (if specified). All the variables to be sent to 'transform' must also have been requested as return variables in the parameter 'return_vars' of Start(). |
transform_extra_cells |
An integer indicating the number of extra indices to retrieve from the
data set, beyond the requested indices in |
apply_indices_after_transform |
A logical value. When a 'transform' is specified in Start() and numeric indices are provided for any of the inner dimensions that depend on coordinate variables, this flag controls whether those numeric indices are made effective (retrieved) before or after applying the transformation. It takes FALSE by default (numeric indices are applied before sending data to 'transform'). |
pattern_dims |
A character string indicating the name of the dimension
with path pattern specifications (see |
metadata_dims |
A vector of character strings with the names of the file
dimensions for which to return metadata. As noted in 'file_data_reader',
the data reader can optionally return auxiliary data via the attribute
'variables' of the returned array. By default, Start() returns the
auxiliary data read for only the first file of each source (or data set) in
the pattern dimension (see |
selector_checker |
A function used internally by Start() to translate a set of selectors (values for a dimension associated with a coordinate variable) into a set of numeric indices. It takes SelectorChecker() by default and, in principle, it should not need to be changed for customized file formats. The option to replace it is left open for more versatility. See the code of SelectorChecker() for details on the inputs, functioning and outputs of a selector checker. |
merge_across_dims |
A logical value indicating whether to merge
dimensions across which another dimension extends (according to the
'<dimname>_across' parameters). Takes the value FALSE by default. For
example, if the dimension 'time' extends across the dimension 'chunk' and
|
merge_across_dims_narm |
A logical value indicating whether to remove the additional NAs from data when parameter 'merge_across_dims' is TRUE. It is helpful when the length of the to-be-merged dimension differs across another dimension. For example, if the dimension 'time' extends across dimension 'chunk', and the time length along the first chunk is 2 while along the second chunk it is 10, setting this parameter to TRUE removes the 8 additional NAs at positions 3 to 10. The default value is TRUE, but it is automatically turned to FALSE if 'merge_across_dims = FALSE'. |
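The NA padding in this example can be reproduced with plain vectors. This is a hypothetical sketch: merge_chunks_sketch() is an invented name, and the assumption that each chunk is padded to the longest time length before concatenation is inferred from the example above rather than taken from the startR source. It drops only the padding positions, so genuine missing values in the data are kept.

```r
# Hypothetical sketch (NOT the startR source code) of merging a 'time'
# dimension across 'chunk', with and without removing the padding NAs.
merge_chunks_sketch <- function(chunks, narm = TRUE) {
  max_len <- max(lengths(chunks))
  # Pad every chunk with NA up to the longest time length, then concatenate
  padded <- unlist(lapply(chunks, function(v) c(v, rep(NA, max_len - length(v)))))
  if (!narm) return(padded)
  # Keep only the positions that hold real data, so that genuine NAs
  # inside a chunk are not dropped
  keep <- unlist(lapply(seq_along(chunks),
                        function(i) (i - 1) * max_len + seq_along(chunks[[i]])))
  padded[keep]
}

chunks <- list(c(1, 2), 3:12)  # time length 2 in chunk 1, 10 in chunk 2
length(merge_chunks_sketch(chunks, narm = FALSE))  # 20 (NAs at positions 3 to 10)
merge_chunks_sketch(chunks, narm = TRUE)           # 1 2 3 ... 12
```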
split_multiselected_dims |
A logical value indicating whether to split a dimension that has been selected with a multidimensional array of selectors into as many dimensions as present in the selector array. The default value is FALSE. |
path_glob_permissive |
A logical value or an integer specifying how many
folder levels in the path pattern, beginning from the end, the shell glob
expressions must be preserved and worked out for each file. The default
value is FALSE, which is equivalent to 0. TRUE is equivalent to 1. |
largest_dims_length |
A logical value or a named integer vector
indicating if Start() should examine all the files to get the largest
length of the inner dimensions (TRUE) or use the first valid file of each
dataset as the returned dimension length (FALSE). Since examining all the
files could be time-consuming, a vector can be used to explicitly specify
the expected length of the inner dimensions. For those inner dimensions not
specified, the first valid file will be used. The default value is FALSE. |
retrieve |
A logical value indicating whether to retrieve the data defined in the Start() call or to explore only its dimension lengths and names, and the values for the file and inner dimensions. The default value is FALSE. |
num_procs |
An integer indicating the number of processes to be created for the parallel execution of the retrieval/transformation/arrangement of the multiple involved files in a call to Start(). If set to NULL, it takes the number of available cores (as detected by future::availableCores). The default value is 1 (no parallel execution). |
ObjectBigmemory |
A character string to be included as part of the bigmemory object name. This parameter is intended to be used internally by the chunking capabilities of startR. |
silent |
A logical value of whether to display progress messages (FALSE) or not (TRUE). The default value is FALSE. |
debug |
A logical value of whether to return detailed messages on the progress and operations in a Start() call (TRUE) or not (FALSE). The default value is FALSE. |
If retrieve = TRUE,
the involved data is loaded into RAM
and an object of the class 'startR_cube' with the following components is
returned:
Data |
Multidimensional data array with named dimensions, with the data values
requested via |
Variables |
Named list of 1 + N components, containing lists of retrieved variables (as
requested in 'return_vars') common to all the data sources (in the 1st
component, |
Files |
Multidimensional character string array with named dimensions. Its dimensions
are the file dimensions (as requested in |
NotFoundFiles |
Array with the same shape as |
FileSelectors |
Multidimensional character string array with named dimensions, with the same
shape as |
PatternDim |
Character string containing the name of the file pattern dimension. |
If retrieve = FALSE,
the involved data is not loaded into RAM and
an object of the class 'startR_header' with the following components is
returned:
Dimensions |
Named vector with the dimension lengths and names of the data involved in the Start() call. |
Variables |
Named list of 1 + N components, containing lists of retrieved variables (as
requested in 'return_vars') common to all the data sources (in the 1st
component, |
ExpectedFiles |
Multidimensional character string array with named dimensions. Its dimensions are the file dimensions (as requested in ...). Each cell in this array contains a path to a file to be retrieved (which may or may not exist). |
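An array of expected file paths like the one described here can be pictured by substituting the values of each file dimension into a path pattern. The sketch is hypothetical: expand_pattern_sketch() is an invented name, it handles only the two tags '$var$' and '$sdate$' from the examples, and it ignores the 'dat' dimension, glob expressions and everything else Start() does.

```r
# Hypothetical sketch (NOT the startR source code): expand a path pattern
# into an array of expected file paths, one cell per combination of the
# file dimensions 'var' and 'sdate'.
expand_pattern_sketch <- function(pattern, var, sdate) {
  # expand.grid varies 'var' fastest, matching R's column-major array order
  grid <- expand.grid(var = var, sdate = sdate, stringsAsFactors = FALSE)
  paths <- mapply(function(v, s) {
    p <- gsub("$var$", v, pattern, fixed = TRUE)
    gsub("$sdate$", s, p, fixed = TRUE)
  }, grid$var, grid$sdate)
  array(paths, dim = c(length(var), length(sdate)),
        dimnames = list(var = var, sdate = sdate))
}

files <- expand_pattern_sketch('obs/monthly_mean/$var$/$var$_$sdate$.nc',
                               var = 'tos', sdate = c('200011', '200012'))
files['tos', '200012']  # "obs/monthly_mean/tos/tos_200012.nc"
```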
FileSelectors |
Multidimensional character string array with named dimensions, with the same
shape as |
PatternDim |
Character string containing the name of the file pattern dimension. |
StartRCall |
List of parameters sent to the Start() call, with the parameter 'retrieve' set to TRUE. Intended for calling in order to retrieve the associated data a posteriori with a call to do.call(). |
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = 'all',
              longitude = 'all',
              return_vars = list(latitude = 'dat',
                                 longitude = 'dat',
                                 time = 'sdate'),
              retrieve = FALSE)
The step of the startR workflow that follows the data declaration with Start(). It identifies the operation (i.e., function) and the target and output dimensions of the data array for the function. Ideally, it expects the dimension names to be in the same order as those requested in the Start() call. If a different order is specified, startR will reorder the subset dimensions to the order expected by this function.
Step( fun, target_dims, output_dims, use_libraries = NULL, use_attributes = NULL )
Step( fun, target_dims, output_dims, use_libraries = NULL, use_attributes = NULL )
fun |
A function in R format defining the operation to be applied to the data declared by a Start() call. It should only work on the essential dimensions rather than on all the data dimensions. Since the function will be called numerous times through all the non-essential dimensions, it is recommended to keep it as light as possible. |
target_dims |
A vector for a single input array, or a list of vectors for multiple input arrays, indicating the names of the dimensions along which 'fun' is applied. |
output_dims |
A vector for single returned array or a list of vectors for multiple returned arrays indicating the dimension names of the function output. |
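What 'target_dims' means for the function can be illustrated with base R apply(): the function only ever sees a vector along the target dimension and is called once for every combination of the remaining dimensions. This is a hypothetical analogy, not how startR executes a Step, and the placement of the output dimension first in the result is a property of apply(), not a claim about startR's output layout.

```r
# Hypothetical base R illustration (NOT how startR executes a Step):
# 'fun' only sees the target dimension 'latitude' and is called once for
# every combination of the remaining dimensions.
arr <- array(1:24, dim = c(2, 3, 4))  # dimensions: sdate, latitude, longitude
lat <- c(-30, 0, 30)
weight <- sqrt(cos(lat * pi / 180))
fun <- function(x) x * weight          # x is one latitude vector of length 3
res <- apply(arr, MARGIN = c(1, 3), FUN = fun)
dim(res)  # 3 2 4: the output dimension ('latitude') is placed first
```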
use_libraries |
A vector of character strings indicating the R library
names to be used in 'fun'. Only used when the jobs are run on HPCs; if the
jobs are run locally, load the necessary libraries by |
use_attributes |
One or more lists of vectors of character strings indicating the data attributes to be used in 'fun'. The list name should be consistent with the list name of 'data' in AddStep(). The default value is NULL. |
A closure that contains all the objects assigned. It serves as the input of AddStep().
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = 'all',
              longitude = 'all',
              return_vars = list(latitude = 'dat',
                                 longitude = 'dat',
                                 time = 'sdate'),
              retrieve = FALSE)
fun <- function(x) {
  lat = attributes(x)$Variables$dat1$latitude
  weight = sqrt(cos(lat * pi / 180))
  corrected = Apply(list(x), target_dims = "latitude",
                    fun = function(x) {x * weight})
}
step <- Step(fun = fun,
             target_dims = 'latitude',
             output_dims = 'latitude',
             use_libraries = c('multiApply'),
             use_attributes = list(data = "Variables"))
wf <- AddStep(data, step)
This is a helper function used in a Start() call to define the desired range
of dimensions. It specifies the actual values to be matched with the
coordinate variable. See details in the documentation of the parameter
'...' ('indices to take') of the function Start().
values(x)
x |
A numeric vector, or a list of two numeric values to take all the elements between the two specified values (both extremes inclusive). |
Same as input, but with the additional attributes 'indices', 'values', and 'chunk'.
# Used in Start():
data_path <- system.file('extdata', package = 'startR')
path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
sdates <- c('200011', '200012')
data <- Start(dat = list(list(path = path_obs)),
              var = 'tos',
              sdate = sdates,
              time = 'all',
              latitude = values(seq(-80, 80, 20)),
              latitude_reorder = Sort(),
              longitude = values(list(10, 300)),
              longitude_reorder = CircularSort(0, 360),
              return_vars = list(latitude = 'dat',
                                 longitude = 'dat',
                                 time = 'sdate'),
              retrieve = FALSE)