Process Amazon Kinesis Data Streams records with Lambda - AWS Lambda

Process Amazon Kinesis Data Streams records with Lambda

To process Amazon Kinesis Data Streams records with Lambda, create a consumer for your stream and then create a Lambda event source mapping.

Configuring your data stream and function

Your Lambda function is a consumer application for your data stream. It processes one batch of records at a time from each shard. You can map a Lambda function to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out.

  • Standard iterator: Lambda polls each shard in your Kinesis stream for records at a base rate of once per second. When more records are available, Lambda keeps processing batches until the function catches up with the stream. The event source mapping shares read throughput with other consumers of the shard.

  • Enhanced fan-out: To minimize latency and maximize read throughput, create a data stream consumer with enhanced fan-out. Enhanced fan-out consumers get a dedicated connection to each shard that doesn't impact other applications reading from the stream. Stream consumers use HTTP/2 to reduce latency by pushing records to Lambda over a long-lived connection and by compressing request headers. You can create a stream consumer with the Kinesis RegisterStreamConsumer API.

aws kinesis register-stream-consumer \
    --consumer-name con1 \
    --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

You should see the following output:

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
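A new consumer starts in the CREATING status, so a script that registers one usually has to wait before it uses the consumer. The following Python sketch shows one way to do that. It assumes `kinesis` behaves like a boto3 Kinesis client (`boto3.client("kinesis")`); the function name `register_and_wait` and its polling parameters are hypothetical.

```python
import time


def register_and_wait(kinesis, stream_arn, consumer_name,
                      poll_seconds=1.0, max_polls=60):
    """Register an enhanced fan-out consumer and wait until it is ACTIVE.

    `kinesis` is assumed to expose the same methods as boto3.client("kinesis").
    Returns the consumer ARN.
    """
    response = kinesis.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name
    )
    consumer_arn = response["Consumer"]["ConsumerARN"]
    status = response["Consumer"]["ConsumerStatus"]

    polls = 0
    while status != "ACTIVE":
        if polls >= max_polls:
            raise TimeoutError(f"consumer {consumer_name} is still {status}")
        time.sleep(poll_seconds)
        description = kinesis.describe_stream_consumer(ConsumerARN=consumer_arn)
        status = description["ConsumerDescription"]["ConsumerStatus"]
        polls += 1
    return consumer_arn
```

Passing the client in as an argument keeps the helper free of credentials and Region settings, and makes it easy to exercise with a stub.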

To increase the speed at which your function processes records, add shards to your data stream. Lambda processes records in each shard in order. It stops processing additional records in a shard if your function returns an error. With more shards, there are more batches being processed at once, which lowers the impact of errors on concurrency.
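As an illustration of in-order processing, here is a minimal Python handler sketch. It assumes each record carries a JSON document, base64-encoded in the `kinesis.data` field of the event. The `process` function and its `id` check are hypothetical placeholders for your own logic.

```python
import base64
import json


def handler(event, context):
    """Process one batch of Kinesis records in the order they arrive."""
    for record in event["Records"]:
        # Kinesis record data is delivered base64-encoded.
        raw = base64.b64decode(record["kinesis"]["data"])
        payload = json.loads(raw)  # assumption: producers write JSON
        process(payload)
    return {"processed": len(event["Records"])}


def process(payload):
    # Hypothetical business logic. Raising here makes the invocation
    # return an error, which stops further processing in that shard.
    if "id" not in payload:
        raise ValueError("record is missing 'id'")
```

Letting the exception propagate, rather than catching and logging it, is what signals the failure to Lambda.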

If your function can't scale up to handle the total number of concurrent batches, request a quota increase or reserve concurrency for your function.
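Reserving concurrency can be scripted. The sketch below assumes one concurrent batch per shard, as described above, and that `lambda_client` behaves like `boto3.client("lambda")`. The helper name `reserve_concurrency_for_shards` is hypothetical.

```python
def reserve_concurrency_for_shards(lambda_client, function_name, shard_count):
    """Reserve one concurrent execution per shard for the function.

    Assumes one batch is processed at a time from each shard.
    """
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    return lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=shard_count,
    )
```

Note that reserved concurrency also acts as an upper limit for the function, so a value sized only for the stream may throttle other invocation sources of the same function.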

Create an event source mapping to invoke a Lambda function

To invoke your Lambda function with records from your data stream, create an event source mapping. You can create multiple event source mappings to process the same data with multiple Lambda functions, or to process items from multiple data streams with a single function. When processing items from multiple streams, each batch contains records from only a single shard or stream.

You can configure event source mappings to process records from a stream in a different AWS account. To learn more, see Creating a cross-account event source mapping.

Before you create an event source mapping, you need to give your Lambda function permission to read from a Kinesis data stream. Lambda needs these permissions to manage resources related to your Kinesis data stream, such as describing the stream, listing its shards, and reading records.

The AWS managed policy AWSLambdaKinesisExecutionRole includes these permissions. Add this managed policy to your function as described in the following procedure.

AWS Management Console
To add Kinesis permissions to your function
  1. Open the Functions page of the Lambda console and select your function.

  2. In the Configuration tab, select Permissions.

  3. In the Execution role pane, under Role name, choose the link to your function’s execution role. This link opens the page for that role in the IAM console.

  4. In the Permissions policies pane, choose Add permissions, then select Attach policies.

  5. In the search field, enter AWSLambdaKinesisExecutionRole.

  6. Select the checkbox next to the policy and choose Add permissions.

AWS CLI
To add Kinesis permissions to your function
  • Run the following CLI command to add the AWSLambdaKinesisExecutionRole policy to your function’s execution role:

    aws iam attach-role-policy \
        --role-name MyFunctionRole \
        --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole

AWS SAM
To add Kinesis permissions to your function
  • In your function’s definition, add the Policies property as shown in the following example:

    Resources:
      MyFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: ./my-function/
          Handler: index.handler
          Runtime: nodejs22.x
          Policies:
            - AWSLambdaKinesisExecutionRole

After configuring the required permissions, create the event source mapping.

AWS Management Console
To create the Kinesis event source mapping
  1. Open the Functions page of the Lambda console and select your function.

  2. In the Function overview pane, choose Add trigger.

  3. Under Trigger configuration, for the source, select Kinesis.

  4. Select the Kinesis stream you want to create the event source mapping for and, optionally, a consumer of your stream.

  5. (Optional) Edit the Batch size, Starting position, and Batch window for your event source mapping.

  6. Choose Add.

When creating your event source mapping from the console, your IAM role must have the kinesis:ListStreams and kinesis:ListStreamConsumers permissions.

AWS CLI
To create the Kinesis event source mapping
  • Run the following CLI command to create a Kinesis event source mapping. Choose your own batch size and starting position according to your use case.

    aws lambda create-event-source-mapping \
        --function-name MyFunction \
        --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
        --starting-position LATEST \
        --batch-size 100

To specify a batching window, add the --maximum-batching-window-in-seconds option. For more information about using this and other parameters, see create-event-source-mapping in the AWS CLI Command Reference.
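If you create the mapping from code rather than the CLI, the same options map onto keyword arguments of the Lambda `CreateEventSourceMapping` API. The following Python sketch builds those arguments. The helper name `kinesis_mapping_params` and its defaults are hypothetical; the keys are the parameter names that boto3 uses for this operation.

```python
def kinesis_mapping_params(function_name, source_arn, batch_size=100,
                           window_seconds=0, starting_position="LATEST"):
    """Build keyword arguments for create_event_source_mapping."""
    params = {
        "FunctionName": function_name,
        "EventSourceArn": source_arn,
        "StartingPosition": starting_position,
        "BatchSize": batch_size,
    }
    # Only send a batching window when one was requested.
    if window_seconds:
        params["MaximumBatchingWindowInSeconds"] = window_seconds
    return params
```

With boto3 installed and credentials configured, the result could be passed along as `boto3.client("lambda").create_event_source_mapping(**params)`.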

AWS SAM
To create the Kinesis event source mapping
  • In your function’s definition, add the KinesisEvent property as shown in the following example:

    Resources:
      MyFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: ./my-function/
          Handler: index.handler
          Runtime: nodejs22.x
          Policies:
            - AWSLambdaKinesisExecutionRole
          Events:
            KinesisEvent:
              Type: Kinesis
              Properties:
                Stream: !GetAtt MyKinesisStream.Arn
                StartingPosition: LATEST
                BatchSize: 100
      MyKinesisStream:
        Type: AWS::Kinesis::Stream
        Properties:
          ShardCount: 1

To learn more about creating an event source mapping for Kinesis Data Streams in AWS SAM, see Kinesis in the AWS Serverless Application Model Developer Guide.

Polling and stream starting position

Be aware that stream polling during event source mapping creation and updates is eventually consistent.

  • During event source mapping creation, it may take several minutes to start polling events from the stream.

  • During event source mapping updates, it may take several minutes to stop and restart polling events from the stream.

This behavior means that if you specify LATEST as the starting position for the stream, the event source mapping could miss events during creation or updates. To ensure that no events are missed, specify the stream starting position as TRIM_HORIZON or AT_TIMESTAMP.
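One way to apply this advice in code is to pick the starting position from whether you know when processing should begin. The Python sketch below returns the relevant `CreateEventSourceMapping` arguments. The helper name `starting_position_params` is hypothetical, and requiring a timezone-aware timestamp is a design choice of this sketch, not an API requirement stated here.

```python
from datetime import datetime, timezone


def starting_position_params(start_time=None):
    """Choose a starting position that does not skip records during setup."""
    if start_time is None:
        # Read from the oldest record still retained in the stream.
        return {"StartingPosition": "TRIM_HORIZON"}
    if start_time.tzinfo is None:
        # Sketch-level guard: avoid ambiguity about which time zone is meant.
        raise ValueError("start_time must be timezone-aware")
    return {
        "StartingPosition": "AT_TIMESTAMP",
        "StartingPositionTimestamp": start_time,
    }
```

For example, `starting_position_params(datetime(2025, 1, 1, tzinfo=timezone.utc))` requests records from that moment onward, and the returned dictionary can be merged into the other mapping arguments.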

Creating a cross-account event source mapping

Amazon Kinesis Data Streams supports resource-based policies. Because of this, you can process data ingested into a stream in one AWS account with a Lambda function in another account.

To create an event source mapping for your Lambda function using a Kinesis stream in a different AWS account, you must attach a resource-based policy to the stream that gives your Lambda function permission to read records. To learn how to configure your stream to allow cross-account access, see Sharing access with cross-account AWS Lambda functions in the Amazon Kinesis Data Streams Developer Guide.
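As a rough illustration, a resource-based policy for this purpose might look like the document built by the Python sketch below. The helper name, the `Sid`, and the exact list of actions are assumptions; confirm the required actions against the developer guide before using them. The principal is the execution role of the Lambda function in the other account.

```python
import json


def cross_account_stream_policy(stream_arn, lambda_role_arn):
    """Return a JSON policy letting a role in another account read the stream."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowLambdaToReadStream",  # hypothetical identifier
                "Effect": "Allow",
                "Principal": {"AWS": lambda_role_arn},
                # Assumed set of read actions; verify before use.
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                ],
                "Resource": stream_arn,
            }
        ],
    }
    return json.dumps(policy)
```

The account that owns the stream could then attach the result with the Kinesis `PutResourcePolicy` API, for example `boto3.client("kinesis").put_resource_policy(ResourceARN=stream_arn, Policy=policy_json)`.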

Once you’ve configured your stream with a resource-based policy that gives your Lambda function the required permissions, create the event source mapping using any of the methods described in the previous section.

If you choose to create your event source mapping using the Lambda console, paste the ARN of your stream directly into the input field. If you want to specify a consumer for your stream, pasting the ARN of the consumer automatically populates the stream field.