Kinesis examples using SDK for Ruby - AWS SDK for Ruby

Kinesis examples using SDK for Ruby

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Ruby with Kinesis.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Serverless examples

The following code example shows how to implement a Lambda function that receives an event triggered by receiving records from a Kinesis stream. The function retrieves the Kinesis payload, decodes from Base64, and logs the record contents.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Consuming an Kinesis event with Lambda using Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue => err $stderr.puts "An error occurred #{err}" raise err end end puts "Successfully processed #{event['Records'].length} records." end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('UTF-8') # Placeholder for actual async work # You can use Ruby's asynchronous programming tools like async/await or fibers here. return data end

The following code example shows how to implement partial batch response for Lambda functions that receive events from a Kinesis stream. The function reports the batch item failures in the response, signaling to Lambda to retry those messages later.

SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.

Reporting Kinesis batch item failures with Lambda using Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) batch_item_failures = [] event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue StandardError => err puts "An error occurred #{err}" # Since we are working with streams, we can return the failed item immediately. # Lambda will immediately begin to retry processing from this failed item onwards. return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] } end end puts "Successfully processed #{event['Records'].length} records." { batchItemFailures: batch_item_failures } end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('utf-8') # Placeholder for actual async work sleep(1) data end