このチュートリアルでは、Amazon Kinesis データストリームのイベントを処理する Lambda 関数を作成します。
-
カスタムアプリケーションがストリームにレコードを書き込みます。
-
AWS Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。
-
AWS Lambda は、Lambda 関数の作成時に指定した実行ロールを引き受けることにより、Lambda 関数を実行します。
前提条件
AWS Command Line Interface をまだインストールしていない場合は、「最新バージョンの AWS CLI のインストールまたは更新」にある手順に従ってインストールしてください。
このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。
注記
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (zip
など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、Windows Subsystem for Linux をインストール
実行ロールを作成する
AWS リソースにアクセスするためのアクセス権限を関数に付与する実行ロールを作成します。
実行ロールを作成するには
-
IAM コンソールの [ロールページ
] を開きます。 -
[ロールの作成] を選択します。
-
次のプロパティでロールを作成します。
-
信頼されたエンティティ - AWS Lambda
-
アクセス許可 - AWSLambdaKinesisExecutionRole。
-
Role name –
lambda-kinesis-role
。
-
AWSLambdaKinesisExecutionRole ポリシーには、Kinesis から項目を読み取り、CloudWatch Logs にログを書き込むために関数が必要とするアクセス許可があります。
関数を作成する
Kinesis メッセージを処理する Lambda 関数を作成します。この関数コードは、Kinesis レコードのイベント ID とイベントデータを CloudWatch Logs にログ記録します。
このチュートリアルでは Node.js 18.x ランタイムを使用しますが、他のランタイム言語のサンプルコードも提供しています。次のボックスでタブを選択すると、関心のあるランタイムのコードが表示されます。このステップで使用する JavaScript コードは、[JavaScript] タブに表示されている最初のサンプルにあります。
- SDK for .NET
-
注記
GitHub には、その他のリソースもあります。サーバーレスサンプル
リポジトリで完全な例を検索し、設定および実行の方法を確認してください。 .NET を使用した Lambda での Kinesis イベントの消費。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
関数を作成するには
-
プロジェクト用のディレクトリを作成し、そのディレクトリに切り替えます。
mkdir kinesis-tutorial cd kinesis-tutorial
-
サンプル JavaScript コードを
index.js
という名前の新しいファイルにコピーします。 -
デプロイパッケージを作成します。
zip function.zip index.js
-
create-function
コマンドを使用して Lambda 関数を作成します。aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::
111122223333
:role/lambda-kinesis-role
Lambda 関数をテストする
invoke
AWS Lambda CLI コマンドおよびサンプルの Kinesis イベントを使用して、手動で Lambda 関数を呼び出します。
Lambda 関数をテストするには
-
以下の JSON をファイルにコピーし、
input.txt
という名前で保存します。{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
-
invoke
コマンドを使用して、関数にイベントを送信します。aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt
AWS CLI バージョン 2 を使用している場合、cli-binary-format オプションは必須です。これをデフォルト設定にするには、
aws configure set cli-binary-format raw-in-base64-out
を実行します。詳細については、バージョン 2 の AWS Command Line Interface ユーザーガイドの「AWS CLI でサポートされているグローバルコマンドラインオプション」を参照してください。レスポンスは
out.txt
に保存されます。
Kinesis Stream を作成する
create-stream
コマンドを使用して、スキーマを作成します。
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
次の describe-stream
コマンドを実行して、ストリーム ARN を取得します。
aws kinesis describe-stream --stream-name lambda-stream
次のような出力が表示されます。
{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }
次のステップで Lambda 関数にストリームを関連付けるために、ストリーム ARN を使用します。
AWS Lambda でイベントソースを追加する
次の AWS CLI add-event-source
コマンドを実行します。
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST
後で使用するために、マッピング ID をメモしておきます。list-event-source-mappings
コマンドを実行して、イベントソースマッピングのリストを取得できます。
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
レスポンスでは、ステータス値が enabled
であることを確認できます。イベントソースマッピングを無効にすると、レコードを失うことなくポーリングを一時停止できます。
セットアップをテストする
イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加します。--data
値は、文字列を Kinesis に送信する前に CLI で base64 にエンコードされる文字列です。同じコマンドを複数回実行して、複数のレコードをストリームに追加することができます。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."
Lambda は実行ロールを使用して、ストリームからレコードを読み取ります。次に、Lambda 関数を呼び出し、レコードのバッチを渡します。この関数は、各レコードからデータをデコードしてログ記録し、出力を CloudWatch Logs に送信します。CloudWatch コンソール
リソースのクリーンアップ
このチュートリアル用に作成したリソースは、保持しない場合は削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウント アカウントに請求される料金の発生を防ぎます。
実行ロールを削除する
-
IAM コンソールのロールページ
を開きます。 -
作成した実行ロールを選択します。
-
[削除] を選択します。
-
テキスト入力フィールドにロールの名前を入力し、[削除] を選択します。
Lambda 関数を削除するには
-
Lambda コンソールの関数
ページを開きます。 -
作成した関数を選択します。
-
[アクション] で、[削除] を選択します。
-
テキスト入力フィールドに
confirm
と入力し、[Delete] (削除) を選択します。
Kinesis ストリームを削除するには
-
AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis
) を開きます。 -
作成したストリームを選択します。
-
[ Actions] で、[Delete ] を選択します。
-
テキスト入力フィールドに
delete
を入力します。 -
[削除] を選択します。