In this tutorial, you create a Lambda function to consume events from a Amazon Kinesis data stream.
-
Custom app writes records to the stream.
-
AWS Lambda polls the stream and, when it detects new records in the stream, invokes your Lambda function.
-
AWS Lambda runs the Lambda function by assuming the execution role you specified at the time you created the Lambda function.
Prerequisites
If you have not yet installed the AWS Command Line Interface, follow the steps at Installing or updating the latest version of the AWS CLI to install it.
The tutorial requires a command line terminal or shell to run commands. In Linux and macOS, use your preferred shell and package manager.
Note
In Windows, some Bash CLI commands that you commonly use with Lambda (such as zip
) are not supported by the operating system's built-in terminals.
To get a Windows-integrated version of Ubuntu and Bash, install the Windows Subsystem for Linux
Create the execution role
Create the execution role that gives your function permission to access AWS resources.
To create an execution role
-
Open the roles page
in the IAM console. -
Choose Create role.
-
Create a role with the following properties.
-
Trusted entity – AWS Lambda.
-
Permissions – AWSLambdaKinesisExecutionRole.
-
Role name –
lambda-kinesis-role
.
-
The AWSLambdaKinesisExecutionRole policy has the permissions that the function needs to read items from Kinesis and write logs to CloudWatch Logs.
Create the function
Create a Lambda function that processes your Kinesis messages. The function code logs the event ID and event data of the Kinesis record to CloudWatch Logs.
This tutorial uses the Node.js 18.x runtime, but we've also provided example code in other runtime languages. You can select the tab in the following box to see code for the runtime you're interested in. The JavaScript code you'll use in this step is in the first example shown in the JavaScript tab.
- SDK for .NET
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples
repository. Consuming a Kinesis event with Lambda using .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
To create the function
-
Create a directory for the project, and then switch to that directory.
mkdir kinesis-tutorial cd kinesis-tutorial
-
Copy the sample JavaScript code into a new file named
index.js
. -
Create a deployment package.
zip function.zip index.js
-
Create a Lambda function with the
create-function
command.aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::
111122223333
:role/lambda-kinesis-role
Test the
Lambda function
Invoke your Lambda function manually using the invoke
AWS Lambda CLI command and a sample Kinesis
event.
To test the Lambda function
-
Copy the following JSON into a file and save it as
input.txt
.{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
-
Use the
invoke
command to send the event to the function.aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt
The cli-binary-format option is required if you're using AWS CLI version 2. To make this the default setting, run
aws configure set cli-binary-format raw-in-base64-out
. For more information, see AWS CLI supported global command line options in the AWS Command Line Interface User Guide for Version 2.The response is saved to
out.txt
.
Create a Kinesis stream
Use the create-stream
command to create a stream.
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
Run the following describe-stream
command to get the stream ARN.
aws kinesis describe-stream --stream-name lambda-stream
You should see the following output:
{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }
You use the stream ARN in the next step to associate the stream with your Lambda function.
Add an event source in
AWS Lambda
Run the following AWS CLI add-event-source
command.
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST
Note the mapping ID for later use. You can get a list of event source mappings by running the
list-event-source-mappings
command.
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
In the response, you can verify the status value is enabled
. Event source mappings can be
disabled to pause polling temporarily without losing any records.
Test the setup
To test the event source mapping, add event records to your Kinesis stream. The --data
value is a
string that the CLI encodes to base64 prior to sending it to Kinesis. You can run the same command more than once to
add multiple records to the stream.
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."
Lambda uses the execution role to read records from the stream. Then it invokes your Lambda function, passing in
batches of records. The function decodes data from each record and logs it, sending the output to CloudWatch Logs. View the
logs in the CloudWatch console
Clean up your resources
You can now delete the resources that you created for this tutorial, unless you want to retain them. By deleting AWS resources that you're no longer using, you prevent unnecessary charges to your AWS account.
To delete the execution role
-
Open the Roles page
of the IAM console. -
Select the execution role that you created.
-
Choose Delete.
-
Enter the name of the role in the text input field and choose Delete.
To delete the Lambda function
-
Open the Functions page
of the Lambda console. -
Select the function that you created.
-
Choose Actions, Delete.
-
Type
confirm
in the text input field and choose Delete.
To delete the Kinesis stream
-
Sign in to the AWS Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis
. -
Select the stream you created.
-
Choose Actions, Delete.
-
Enter
delete
in the text input field. -
Choose Delete.