Title: | Amazon 'Kinesis' Consumer Application for Stream Processing |
---|---|
Description: | Fetching data from Amazon 'Kinesis' Streams using the Java-based 'MultiLangDaemon' interacting with Amazon Web Services ('AWS') for easy stream processing from R. For more information on 'Kinesis', see <https://aws.amazon.com/kinesis>. |
Authors: | Gergely Daroczi <[email protected]> |
Maintainer: | Gergely Daroczi <[email protected]> |
License: | AGPL-3 |
Version: | 1.7.6 |
Built: | 2024-11-10 06:31:00 UTC |
Source: | CRAN |
Checkpoint at current or given sequence number
checkpoint(sequenceNumber)
sequenceNumber |
optional |
Run Kinesis Consumer application
kinesis_consumer( initialize, processRecords, shutdown, checkpointing = TRUE, updater, logfile = tempfile() )
initialize |
optional function to be run on startup. Please note that the variables created inside of this function will not be available to eg |
processRecords |
function to process records taking a |
shutdown |
optional function to be run when finished processing all records in a shard |
checkpointing |
if set to |
updater |
optional list of list(s) including frequency (in minutes) and function to be run, most likely to update some objects in the parent or global namespace populated first in the |
logfile |
file path of the log file. To disable logging, set |
Do not run this function directly; it is meant to be called by the MultiLangDaemon. See the package README for more details.
## Not run:
library(logger)
log_threshold(FATAL, namespace = 'AWR.Kinesis')
AWR.Kinesis::kinesis_consumer(
  initialize = function() log_info('Loading some data'),
  processRecords = function(records) log_info('Received some records from Kinesis'),
  updater = list(
    list(1, function() log_info('Updating some data every minute')),
    list(1/60, function() log_info('This is a high frequency updater call'))
  )
)
## End(Not run)
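The argument descriptions note that variables created inside initialize are not visible elsewhere, and that updater functions typically refresh objects in the parent or global namespace. A minimal sketch of one way to share state between the callbacks follows. The object name lookup, its columns, and the three helper function names are hypothetical and only serve the illustration; the consumer call is left commented out because it is meant to be started by the MultiLangDaemon.

```r
# Hypothetical shared object: `lookup` is stored in the global environment
# (via `<<-`) so that every callback can see it.
initialize_fn <- function() {
  lookup <<- data.frame(id = 1:3, label = c("a", "b", "c"))
  invisible(NULL)
}

# Updater callback: refreshes the shared object in place.
# Here it simply appends a row; a real updater might reload data from a database.
refresh_fn <- function() {
  lookup <<- rbind(lookup, data.frame(id = nrow(lookup) + 1L, label = "new"))
  invisible(NULL)
}

# Record processor: reads the shared object and treats `records` as opaque.
process_fn <- function(records) {
  message("Processing a batch with ", nrow(lookup), " lookup rows available")
  invisible(NULL)
}

## Not run:
# AWR.Kinesis::kinesis_consumer(
#   initialize = initialize_fn,
#   processRecords = process_fn,
#   updater = list(list(5, refresh_fn))  # frequency in minutes, then the function
# )
## End(Not run)
```

Calling initialize_fn() and then refresh_fn() by hand in an interactive session is a quick way to check that the shared object behaves as expected before handing the callbacks to the daemon.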
Get records from a Kinesis Stream
kinesis_get_records( stream, region = "us-west-1", limit = 25, shard_id, iterator_type = c("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "AT_TIMESTAMP"), start_sequence_number, start_timestamp )
stream |
stream name (string) |
region |
AWS region (string) |
limit |
number of records to fetch |
shard_id |
optional shard id - will pick a random active shard if left empty |
iterator_type |
shard iterator type |
start_sequence_number |
for |
start_timestamp |
for |
character vector that you might want to post-process, e.g., with jsonlite::stream_in
Use this only to fetch sample data from a stream; it is not intended for production use.
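The post-processing step mentioned above can be sketched as follows. This assumes that each element of the returned character vector holds one complete JSON document; the sample records below are made up for illustration and stand in for the output of kinesis_get_records.

```r
library(jsonlite)

# Made-up stand-in for the character vector returned by kinesis_get_records()
records <- c(
  '{"user":"alice","clicks":3}',
  '{"user":"bob","clicks":5}'
)

# stream_in() reads line-delimited JSON from a connection,
# so the vector is wrapped in a text connection first
parsed <- stream_in(textConnection(records), verbose = FALSE)

str(parsed)
```

If the records are not JSON, or a single record spans several lines, a different parser is needed.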
Write a record to a Kinesis Stream
kinesis_put_record(stream, region = "us-west-1", data, partitionKey)
stream |
stream name (string) |
region |
AWS region (string) |
data |
data blob (string) |
partitionKey |
determines which shard in the stream the data record is assigned to, e.g., username, stock symbol, etc. (string) |
invisible list including the shard id and sequence number
## Not run:
df <- mtcars[1, ]
str(kinesis_put_record('test-AWR', data = jsonlite::toJSON(df), partitionKey = row.names(df)))
## End(Not run)
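To give some intuition for how partitionKey selects a shard: Kinesis maps the MD5 hash of the partition key into a 128-bit hash key space, and each shard owns a range of that space. The sketch below estimates the shard index for a key under the simplifying assumption that the stream has n_shards shards splitting the hash key space evenly, which is not guaranteed after resharding. The helper shard_index_for_key is hypothetical and not part of the package, and it only uses the leading 48 bits of the hash, so it is an approximation.

```r
# Hypothetical helper (not part of the package): estimate which of `n_shards`
# evenly sized shards a partition key would land in (0-based index).
shard_index_for_key <- function(partition_key, n_shards) {
  # MD5 of the raw UTF-8 bytes of the key, computed via a temp file (base R only)
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(charToRaw(enc2utf8(partition_key)), tmp)
  md5_hex <- unname(tools::md5sum(tmp))

  # Interpret the leading 12 hex digits (48 bits) as a fraction in [0, 1)
  digits <- strtoi(strsplit(substr(md5_hex, 1, 12), "")[[1]], base = 16L)
  fraction <- sum(digits * 16^(-seq_along(digits)))

  # Assumption: shards split the hash key space into equal ranges
  floor(fraction * n_shards)
}

# Example: where would the first few mtcars row names go in a 4-shard stream?
sapply(rownames(mtcars)[1:3], shard_index_for_key, n_shards = 4)
```

Keys with low cardinality (for example a constant string) send every record to the same shard, so a key with many distinct values tends to spread load more evenly.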