The TensorFlow Dataset API provides various facilities for creating scalable input pipelines for TensorFlow models, including:
Reading data from a variety of formats including CSV files and TFRecords files (the standard binary format for TensorFlow training data).
Transforming datasets in a variety of ways including mapping arbitrary functions against them.
Shuffling, batching, and repeating datasets over a number of epochs.
Streaming interface to data for reading arbitrarily large datasets.
Reading and transforming data are TensorFlow graph operations, so are executed in C++ and in parallel with model training.
The R interface to TensorFlow datasets provides access to the Dataset API, including high-level convenience functions for easy integration with the keras R package.
To use tfdatasets you need to install both the R package as well as TensorFlow itself.
First, install the tfdatasets R package from GitHub as follows:
Then, use the install_tensorflow()
function to install
TensorFlow:
To create a dataset, use one of the dataset creation functions. Dataset can be created from delimted text files, TFRecords files, as well as from in-memory data.
For example, to create a dataset from a text file, first create a
specification for how records will be decoded from the file, then call
text_line_dataset()
with the file to be read and the
specification:
library(tfdatasets)
# create specification for parsing records from an example file
iris_spec <- csv_record_spec("iris.csv")
# read the dataset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)
# take a glimpse at the dataset
str(dataset)
<MapDataset shapes: {Sepal.Length: (), Sepal.Width: (), Petal.Length: (), Petal.Width: (),
Species: ()}, types: {Sepal.Length: tf.float32, Sepal.Width: tf.float32, Petal.Length:
tf.float32, Petal.Width: tf.float32, Species: tf.int32}>
In the example above, the csv_record_spec()
function is
passed an example file which is used to automatically detect column
names and types (done by reading up to the first 1,000 lines of the
file). You can also provide explicit column names and/or data types
using the names
and types
parameters (note
that in this case we don’t pass an example file):
# provide colum names and types explicitly
iris_spec <- csv_record_spec(
names = c("SepalLength", "SepalWidth", "PetalLength", "PetalWidth", "Species"),
types = c("double", "double", "double", "double", "integer"),
skip = 1
)
# read the datset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)
Note that we’ve also specified skip = 1
to indicate that
the first row of the CSV that contains column names should be
skipped.
Supported column types are integer, double, and character. You can
also provide types
in a more compact form using
single-letter abbreviations (e.g. types = "dddi"
). For
example:
Decoding lines of text into a record can be computationally
expensive. You can parallelize these computations using the
parallel_records
parameter. For example:
You can also parallelize the reading of data from storage by
requesting that a buffer of records be prefected. You do this with the
dataset_prefetch()
function. For example:
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec, parallel_records = 4) %>%
dataset_batch(128) %>%
dataset_prefetch(1)
This code will result in the prefetching of a single batch of data on a background thread (i.e. in parallel with training operations).
If you have multiple input files, you can also parallelize reading of these files both across multiple machines (sharding) and/or on multiple threads per-machine (parallel reads with interleaving). See the section on Reading Multiple Files below for additional details.
You can read datasets from TFRecords
files using the tfrecord_dataset()
function.
In many cases you’ll want to map the records in the dataset into a
set of named columns. You can do this using the
dataset_map()
function along with the
tf$parse_single_example()
function. for example:
# Creates a dataset that reads all of the examples from two files, and extracts
# the image and label features.
filenames <- c("/var/data/file1.tfrecord", "/var/data/file2.tfrecord")
dataset <- tfrecord_dataset(filenames) %>%
dataset_map(function(example_proto) {
features <- list(
image = tf$FixedLenFeature(shape(), tf$string),
label = tf$FixedLenFeature(shape(), tf$int32)
)
tf$parse_single_example(example_proto, features)
})
You can parallelize reading of TFRecord files using the
num_parallel_reads
option, for example:
You can read datasets from SQLite databases using the
sqlite_dataset()
function. To use
sqlite_dataset()
you provide the filename of the database,
a SQL query to execute, and sql_record_spec()
that
describes the names and TensorFlow types of columns within the query.
For example:
library(tfdatasets)
record_spec <- sql_record_spec(
names = c("disp", "drat", "vs", "gear", "mpg", "qsec", "hp", "am", "wt", "carb", "cyl"),
types = c(tf$float64, tf$int32, tf$float64, tf$int32, tf$float64, tf$float64,
tf$float64, tf$int32, tf$int32, tf$int32, tf$int32)
)
dataset <- sqlite_dataset(
"data/mtcars.sqlite3",
"select * from mtcars",
record_spec
)
dataset
<MapDataset shapes: {disp: (), drat: (), vs: (), gear: (), mpg: (), qsec: (), hp: (), am: (),
wt: (), carb: (), cyl: ()}, types: {disp: tf.float64, drat: tf.int32, vs: tf.float64, gear:
tf.int32, mpg: tf.float64, qsec: tf.float64, hp: tf.float64, am: tf.int32, wt: tf.int32, carb:
tf.int32, cyl: tf.int32}>
Note that for floating point data you must use
tf$float64
(reading tf$float32
is not
supported for SQLite databases).
You can map arbitrary transformation functions onto dataset records
using the dataset_map()
function. For example, to transform
the “Species” column into a one-hot encoded vector you would do
this:
dataset <- dataset %>%
dataset_map(function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
})
Note that while dataset_map()
is defined using an R
function, there are some special constraints on this function which
allow it to execute not within R but rather within the
TensorFlow graph.
For a dataset created with the csv_dataset()
function,
the passed record will be named list of tensors (one for each column of
the dataset). The return value should be another set of tensors which
were created from TensorFlow functions (e.g. tf$one_hot
as
illustrated above). This function will be converted to a TensorFlow
graph operation that performs the transformation within native code.
If these transformations are computationally expensive they can be
executed on multiple threads using the num_parallel_calls
parameter. For example:
dataset <- dataset %>%
dataset_map(num_parallel_calls = 4, function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
})
You can control the maximum number of processed elements that will be
buffered when processing in parallel using the
dataset_prefetch()
transformation. For example:
dataset <- dataset %>%
dataset_map(num_parallel_calls = 4, function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
}) %>%
datset_prefetch(1)
If you are batching your data for training, you can optimize
performance using the dataset_map_and_batch()
function
(which fuses together the map and batch operations). For example:
You can filter the elements of a dataset using the
dataset_filter()
function, which takes a
predicate
function that returns a boolean tensor for
records that should be included. For example:
dataset <- csv_dataset("mtcars.csv") %>%
dataset_filter(function(record) {
record$mpg >= 20
})
dataset <- csv_dataset("mtcars.csv") %>%
dataset_filter(function(record) {
record$mpg >= 20 & record$cyl >= 6L
})
Note that the functions used inside the predicate must be tensor
operations (e.g. tf$not_equal
, tf$less
, etc.).
R generic methods for relational operators (e.g. <, >, <=,
etc.) and logical operators (e.g. !, &, |, etc.) are provided so you
can use shorthand syntax for most common comparisons (as illustrated
above).
A common transformation is taking a column oriented dataset (e.g. one
created by csv_dataset()
or
tfrecord_dataset()
) and transforming it into a two-element
list with features (“x”) and response (“y”). You can use the
dataset_prepare()
function to do this type of
transformation. For example:
mtcars_dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(x = c(mpg, disp), y = cyl)
iris_dataset <- text_line_dataset("iris.csv", record_spec = iris_spec) %>%
dataset_prepare(x = -Species, y = Species)
The dataset_prepare()
function also accepts standard R
formula syntax for defining features and response:
mtcars_dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(cyl ~ mpg + disp)
If you are batching your data for training you add a
batch_size
parameter to fuse together the
dataset_prepare()
and dataset_batch()
steps
(which generally results in faster training). For example:
There are several functions which control how batches are drawn from the dataset. For example, the following specifies that data will be drawn in batches of 128 from a shuffled window of 1000 records, and that the dataset will be repeated for 10 epochs:
Note that you can optimize performance by fusing the shuffle and
repeat operations into a single step using the
dataset_shuffle_and_repeat()
function. For example:
Earlier we alluded to the dataset_prefetch()
function,
which enables you to ensure that a given number of records (or batches
of records) are prefetched in parallel so they are ready to go when the
next batch is processed. For example:
dataset <- dataset %>%
dataset_map_and_batch(batch_size = 128, function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
}) %>%
dataset_prefetch(1)
If you are using a GPU for training, you can also use the
dataset_prefetch_to_device()
function to specify that the
parallel prefetch operation stage the data directly into GPU memory. For
example:
dataset <- dataset %>%
dataset_map_and_batch(batch_size = 128, function(record) {
record$Species <- tf$one_hot(record$Species, 3L)
record
}) %>%
dataset_prefetch_to_device("/gpu:0")
In this case the buffer size for prefetches is determined
automatically (you can manually speicfy it using the
buffer_size
parameter).
Here’s a complete example of using the various dataset transformation
functions together. We’ll read the mtcars
dataset from a
CSV, filter it on some threshold values, map it into x
and
y
components for modeling, and specify desired shuffling
and batch iteration behavior:
The method for reading data from a TensorFlow Dataset varies depending upon which API you are using to build your models. If you are using the keras3, then TensorFlow Datasets can be used much like in-memory R matrices and arrays. If you are using the lower-level tensorflow core API then you’ll use explicit dataset iteration functions.
The sections below provide additional details and examples for each of the supported APIs.
IMPORTANT NOTE: Using TensorFlow Datasets with Keras requires that you are running the very latest versions of Keras (v2.2) and TensorFlow (v1.9). You can ensure that you have the latest versions of the core Keras and TensorFlow libraries with:
Keras models are often trained by passing in-memory arrays directly
to the fit
function. For example:
However, this requires loading data into an R data frame or matrix
before calling fit. You can use the train_on_batch()
function to stream data one batch at a time, however the reading and
processing of the input data is still being done serially and outside of
native code.
Alternatively, Keras enables you to pass a dataset directly as the
x
argument to fit()
and
evaluate()
. Here’s a complete example that uses datasets to
read from TFRecord files containing MNIST digits:
library(keras3)
library(tfdatasets)
batch_size = 128
steps_per_epoch = 500
# function to read and preprocess mnist dataset
mnist_dataset <- function(filename) {
dataset <- tfrecord_dataset(filename) %>%
dataset_map(function(example_proto) {
# parse record
features <- tf$parse_single_example(
example_proto,
features = list(
image_raw = tf$FixedLenFeature(shape(), tf$string),
label = tf$FixedLenFeature(shape(), tf$int64)
)
)
# preprocess image
image <- tf$decode_raw(features$image_raw, tf$uint8)
image <- tf$cast(image, tf$float32) / 255
# convert label to one-hot
label <- tf$one_hot(tf$cast(features$label, tf$int32), 10L)
# return
list(image, label)
}) %>%
dataset_repeat() %>%
dataset_shuffle(1000) %>%
dataset_batch(batch_size, drop_remainder = TRUE) %>%
dataset_prefetch(1)
}
model <- keras_model_sequential() %>%
layer_dense(units = 256, activation = 'relu', input_shape = c(784)) %>%
layer_dropout(rate = 0.4) %>%
layer_dense(units = 128, activation = 'relu') %>%
layer_dropout(rate = 0.3) %>%
layer_dense(units = 10, activation = 'softmax')
model %>% compile(
loss = 'categorical_crossentropy',
optimizer = optimizer_rmsprop(),
metrics = c('accuracy')
)
history <- model %>% fit(
mnist_dataset("mnist/train.tfrecords"),
steps_per_epoch = steps_per_epoch,
epochs = 20,
validation_data = mnist_dataset("mnist/validation.tfrecords"),
validation_steps = steps_per_epoch
)
score <- model %>% evaluate(
mnist_dataset("mnist/test.tfrecords"),
steps = steps_per_epoch
)
print(score)
Note that all data preprocessing (e.g. one-hot encoding of the
response variable) is done within the dataset_map()
operation.
Also note that we pass drop_remainder = TRUE
to the
dataset_batch()
function (this is to make sure that all
batches are of equal size, a requirement for Keras tensor inputs).
You read batches of data from a dataset by using tensors that yield
the next batch. You can obtain this tensor from a dataset via the
as_iterator()
and iter_next()
functions. For
example:
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset_prepare(cyl ~ mpg + disp) %>%
dataset_shuffle(20) %>%
dataset_batch(5)
iter <- as_iterator(dataset)
next_batch <- iter_next(iter)
next_batch
$x
Tensor("IteratorGetNext_13:0", shape=(?, 2), dtype=float32)
$y
Tensor("IteratorGetNext_13:1", shape=(?,), dtype=int32)
If you have multiple input files you can process them in parallel
both across machines (sharding) and/or on multiple threads per-machine
(parallel reads with interleaving). The read_files()
function provides a high-level interface to parallel file reading.
The read_files()
function takes a set of files and a
read function along with various options to orchestrate parallel
reading. For example, the following function reads all CSV files in a
directory using the text_line_dataset()
function:
dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
parallel_files = 4, parallel_interleave = 16) %>%
dataset_prefetch(5000) %>%
dataset_shuffle_and_repeat(buffer_size = 1000, count = 3) %>%
dataset_batch(128)
The parallel_files
argument requests that 4 files be
processed in parallel and the parallel_interleave
argument
requests that blocks of 16 consecutive records from each file be
interleaved in the resulting dataset.
Note that because we are processing files in parallel we do
not pass the parallel_records
argument to
text_line_dataset()
, since we are already parallelizing at
the file level.
If you are training on multiple machines and the training supervisor passes a shard index to your training script, you can also parallelizing reading by sharding the file list. For example:
# command line flags for training script (shard info is passed by training
# supervisor that executes the script)
FLAGS <- flags(
flag_integer("num_shards", 1),
flag_integer("shard_index", 1)
)
# forward shard info to read_files
dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
parallel_files = 4, parallel_interleave = 16,
num_shards = FLAGS$num_shards, shard_index = FLAGS$shard_index) %>%
dataset_shuffle_and_repeat(buffer_size = 1000, count = 3) %>%
dataset_batch(128) %>%
dataset_prefetch(1)