

# Lambda を使用して Amazon Kinesis Data Streams からのレコードを処理する
<a name="with-kinesis"></a>

Lambda 関数を使用して、[Amazon Kinesis データストリーム](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)のレコードを処理できます。Lambda 関数を Kinesis Data Streams 共有スループットコンシューマー (標準イテレーター) にマップすることも、[拡張ファンアウト](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html)を使用する専用スループットコンシューマーにマップすることもできます。標準イテレーターの場合、Lambda は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。

 Kinesis Data Streams の詳細については、[Reading Data from Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html) を参照してください。

**注記**  
Kinesis は、各シャードに対して課金し、拡張ファンアウトの場合はストリームから読み取られたデータに対して課金します。料金の詳細については、[Amazon Kinesis の料金](https://aws.amazon.com/kinesis/data-streams/pricing)を参照してください。

## ポーリングストリームとバッチストリーム
<a name="kinesis-polling-and-batching"></a>

Lambda はデータストリームからレコードを読み取り、関数を、ストリームのレコードを含むイベントと共に[同期的に](invocation-sync.md)呼び出します。Lambda はバッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。各バッチには、単一のシャード/データストリームのレコードが含まれます。

Lambda 関数は、データストリームのコンシューマーアプリケーションです。シャードごとに 1 つのレコードのバッチを一度に処理します。Lambda 関数を共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。
+ **標準イテレーター:** Lambda は、レコードの Kinesis ストリームにある各シャードを 1 秒あたり 1 回の基本レートでポーリングします。利用可能なレコードが増えると、Lambda は関数がストリームに追いつくまでバッチを処理し続けます。イベントソースマッピングは、シャードの他のコンシューマーと読み取りスループットを共有します。
+ **拡張ファンアウト:** レイテンシーを最小限に抑え、読み取りスループットを最大化するには、[拡張ファンアウト](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)を使用してデータストリームコンシューマーを作成します。拡張ファンアウトを使用するコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。ストリームのコンシューマーは HTTP/2 を使用して、長時間にわたる接続とリクエストヘッダーの圧縮でレコードを Lambda にプッシュすることによってレイテンシーを短縮します。ストリームコンシューマーは、Kinesis [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) API を使用して作成できます。

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

次のような出力が表示されます。

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

関数がレコードを処理する速度を上げるには、[データストリームにシャードを追加します](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards)。Lambda は、各シャードのレコードを順番に処理します。関数からエラーが返された場合、シャードのさらなるレコードの処理は停止されます。シャードが増えると、一度に処理されるバッチが増え、同時実行のエラーの影響を下げることができます。

同時実行のバッチの合計分を処理できるように関数をスケールアップできない場合は、関数の[クォータ引き上げをリクエスト](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html)するか、[同時実行数を予約](configuration-concurrency.md)します。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、*バッチ処理ウィンドウ*を設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

**警告**  
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、AWS ナレッジセンターの「[Lambda 関数を冪等にするにはどうすればよいですか?](https://repost.aws/knowledge-center/lambda-function-idempotent)」を参照してください。

Lambda は、設定された[拡張機能](lambda-extensions.md)の完了を待たずに、次のバッチを処理のために送信します。つまり、Lambda がレコードの次のバッチを処理する間も拡張機能を実行し続けることができます。アカウントの[同時実行](lambda-concurrency.md)設定または制限に違反すると、スロットリングの問題が発生する可能性があります。これが潜在的な問題であるかどうかを検出するには、関数を監視し、イベントソースマッピングで予想よりも高い[同時実行メトリクス](monitoring-concurrency.md#general-concurrency-metrics)が表示されているかどうかを確認します。呼び出しの間隔が短いため、Lambda はシャードの数よりも高い同時実行使用量を一時的に報告する場合があります。これは、拡張機能のない Lambda 関数にも当てはまります。

Kinesis データストリームの 1 つのシャードを複数の Lambda 呼び出しで同時に処理するには、[ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 設定を構成します。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)～10 の並列化係数で指定できます。例えば、`ParallelizationFactor` を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、100 個の Kinesis データシャードを処理できます (ただし、`ConcurrentExecutions` メトリクスに異なる値が表示される場合があります)。これにより、データボリュームが揮発性で `IteratorAge` が高いときに処理のスループットをスケールアップすることができます。シャードごとの同時実行バッチの数を増やしても、Lambda はパーティションキーレベルで順序立った処理を確実に行います。

Kinesis 集約で `ParallelizationFactor` を使用することもできます。イベントソースマッピングの動作は、[拡張ファンアウト](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)を使用しているかどうかによって異なります。
+ **拡張ファンアウトなし**: 集約イベント内のすべてのイベントは、同じパーティションキーを持つ必要があります。パーティションキーは、集約イベントのパーティションキーとも一致する必要があります。集約イベント内のイベントに異なるパーティションキーがある場合、Lambda ではパーティションキーによるイベントが順序通りに処理されないことがあります。
+ **拡張ファンアウトあり**: まず、Lambda は集約イベントを個々のイベントにデコードします。集約イベントには、含まれるイベントとは異なるパーティションキーを設定できます。ただし、パーティションキーに一致しないイベントは[削除され、失われ](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md)ます。Lambda ではこれらのイベントは処理されず、設定された障害時の送信先には送信されません。

## イベントの例
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```

# Lambda を使用した Amazon Kinesis Data Streams レコードの処理
<a name="services-kinesis-create"></a>

Lambda を使用して Amazon Kinesis Data Streams レコードを処理するには、Lambda イベントソースマッピングを作成します。Lambda 関数は標準イテレーターか、拡張ファンアウトコンシューマーにマッピングすることができます。詳細については、「[ポーリングストリームとバッチストリーム](with-kinesis.md#kinesis-polling-and-batching)」を参照してください。

## Kinesis イベントソースマッピングを作成する
<a name="services-kinesis-eventsourcemapping"></a>

データストリームからのレコードを使用して Lambda 関数を呼び出すには、[イベントソースマッピング](invocation-eventsourcemapping.md)を作成します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のデータストリームの項目を処理したりできます。複数のストリームから項目を処理する場合、各バッチには 1 つのシャードまたはストリームのレコードのみが含まれます。

別の AWS アカウント のストリームからのレコードを処理するようにイベント ソース マッピングを構成できます。詳細については[クロスアカウントのイベントソースマッピングの作成](#services-kinesis-eventsourcemapping-cross-account)を参照してください。

イベントソースマッピングを作成する前に、Kinesis データストリームから読み取るためのアクセス許可を Lambda 関数に付与する必要があります。Lambda には、Kinesis データストリームに関連するリソースを管理するために次のアクセス許可が必要です。
+ [kinesis:DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [kinesis:DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [kinesis:GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [kinesis:GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [kinesis:ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [kinesis:SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

AWS マネージドポリシー [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html) には、これらのアクセス許可が含まれています。次の手順の説明に従って、この管理ポリシーを関数に追加します。

**注記**  
Kinesis のイベントソースマッピングを作成および管理するための `kinesis:ListStreams` アクセス許可は必要ありません。ただし、コンソールでイベントソースマッピングを作成し、このアクセス許可がない場合、ドロップダウンリストから Kinesis ストリームを選択できず、コンソールにエラーが表示されます。イベントソースマッピングを作成するには、ストリームの Amazon リソースネーム (ARN) を手動で入力する必要があります。
Lambda は、失敗した呼び出しを再試行する際に `kinesis:GetRecords` および `kinesis:GetShardIterator` API 呼び出しを行います。

------
#### [ AWS マネジメントコンソール ]

**関数に Kinesis アクセス許可を追加するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開き、関数を選択します。

1. **[構成]** タブで、**[アクセス許可]** を選択します。

1. **[実行ロール]** ペインの **[ロール名]** で、関数の実行ロールへのリンクを選択します。このリンクを選択すると、IAM コンソールでそのロールのページが開きます。

1. **[アクセス許可ポリシー]** ペインで、**[アクセス許可を追加]** を選択し、**[ポリシーをアタッチ]** を続けて選択します。

1. [検索] フィールドに **AWSLambdaKinesisExecutionRole** を入力します。

1. ポリシーの名前の横にあるチェックボックスを選択し、**[アクセス許可を追加]** を選択します。

------
#### [ AWS CLI ]

**関数に Kinesis アクセス許可を追加するには**
+ 次の CLI コマンドを実行して、`AWSLambdaKinesisExecutionRole` ポリシーを関数の実行ロールに追加します。

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**関数に Kinesis アクセス許可を追加するには**
+ 関数の定義で、次の例に示すように `Policies` プロパティを追加します。

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

必要なアクセス許可を設定した後、イベントソースマッピングを作成します。

------
#### [ AWS マネジメントコンソール ]

**Kinesis イベントソースマッピングを作成するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開き、関数を選択します。

1. **[関数の概要]** ペインで、**[トリガーを追加]** を選択します。

1. **[トリガー設定]** で、ソースとして **[Kinesis]** を選択します。

1. イベントソースマッピングを作成する Kinesis ストリームを選択し、オプションでストリームのコンシューマーを選択します。

1. (オプション) イベントソースマッピングの**バッチサイズ** 、**開始位置**、**バッチウィンドウ**を編集します。

1. **[Add]** (追加) を選択します。

コンソールからイベントソースマッピングを作成する場合は、IAM ロールには [kinesis:ListStreams](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html) 権限と [kinesis:ListStreamConsumers](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html) 権限が必要です。

------
#### [ AWS CLI ]

**Kinesis イベントソースマッピングを作成するには**
+ 次の CLI コマンドを実行して、Kinesis イベントソースマッピングを作成します。ユースケースに応じて、独自のバッチサイズと開始位置を選択します。

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

バッチ処理ウィンドウを指定するには、`--maximum-batching-window-in-seconds` オプションを追加します。このパラメータおよびその他のパラメータの使用の詳細については、「*AWS CLI コマンドリファレンス*」の「[create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)」を参照してください。

------
#### [ AWS SAM ]

**Kinesis イベントソースマッピングを作成するには**
+ 関数の定義で、次の例に示すように `KinesisEvent` プロパティを追加します。

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

AWS SAM で Kinesis Data Streams のイベントソースマッピングを作成する方法の詳細については、「*AWS Serverless Application Model デベロッパーガイド*」の「[Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html)」を参照してください。

------

## ポーリングとストリームの開始位置
<a name="services-kinesis-stream-start-pos"></a>

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。
+ イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。
+ イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、`LATEST` をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を `TRIM_HORIZON` または `AT_TIMESTAMP` として指定します。

## クロスアカウントのイベントソースマッピングの作成
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

[Amazon Kinesis Data Streams](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html) は、リソースベースのポリシーをサポートします。このため、別のアカウントの Lambda 関数を使用して AWS アカウント のストリームに取り込まれたデータを処理できます。

別の AWS アカウント の Kinesis ストリームを使用して Lambda 関数のイベントソースマッピングを作成するには、リソースベースのポリシーを使用してストリームを設定し、Lambda 関数に項目を読み取るアクセス許可を付与する必要があります。クロスアカウントアクセスを許可するようにストリームを設定する方法については、「*Amazon Kinesis Streams デベロッパーガイド*」の「[クロスアカウント AWS Lambda 関数とアクセスを共有する](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda)」を参照してください。

Lambda 関数に必要なアクセス許可を付与するリソースベースのポリシーでストリームを設定したら、前のセクションで説明した方法のいずれかを使用してイベントソースマッピングを作成します。

Lambda コンソールでイベントソースマッピングを作成する場合は、ストリームの ARN を入力フィールドに直接貼り付けます。ストリームにコンシューマーを指定する場合、コンシューマーの ARN を貼り付けると、ストリームフィールドが自動的に入力されます。

# Kinesis Data Streams と Lambda を使用した部分的なバッチレスポンスの設定
<a name="services-kinesis-batchfailurereporting"></a>

イベントソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に Lambda チェックポイントが設定されます。Lambda は、他のすべての結果を完全な失敗として扱い、再試行の上限までバッチの処理を再試行します。ストリームからのバッチの処理中に部分的な成功を許可するには、`ReportBatchItemFailures`をオンにします 。部分的な成功を許可すると、レコードの再試行回数を減らすことができますが、成功したレコードの再試行の可能性を完全に妨げるわけではありません。

`ReportBatchItemFailures` をオンにするには、列挙値 **ReportBatchItemFailures** を [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes) リストに含めます。このリストは、関数で有効になっているレスポンスタイプを示します。このリストは、イベントソースマッピングを[作成](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)または[更新](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)するときに設定できます。

**注記**  
関数コードが部分的なバッチ処理失敗レスポンスを返しても、イベントソースマッピングに対して `ReportBatchItemFailures` 機能が明示的に有効化されていなければ、Lambda はこれらのレスポンスを処理しません。

## レポートの構文
<a name="streams-batchfailurereporting-syntax"></a>

バッチアイテムの失敗に関するレポートを設定する場合、`StreamsEventResponse` クラスはバッチアイテムの失敗のリストとともに返されます。`StreamsEventResponse`オブジェクトを使用して、バッチ処理で最初に失敗したレコードのシーケンス番号を返すことができます。また、正しいレスポンスシンタックスを使用して、独自のカスタムクラスを作成することもできます。次の JSON 構造体は、必要な応答構文を示しています。

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**注記**  
`batchItemFailures` 配列に複数の項目が含まれている場合、Lambda はシーケンス番号が最も小さいレコードをチェックポイントとして使用します。その後、Lambda はそのチェックポイントからすべてのレコードを再試行します。

## 成功条件と失敗の条件
<a name="streams-batchfailurereporting-conditions"></a>

次のいずれかを返すと、Lambda はバッチを完全な成功として処理します:
+ 空の`batchItemFailure`リストです。
+ null の `batchItemFailure` リスト
+ 空の `EventResponse`
+ ヌル `EventResponse`

次のいずれかを返すと、Lambda はバッチを完全な失敗として処理します:
+ 空の文字列`itemIdentifier`
+ ヌル `itemIdentifier`
+ `itemIdentifier`間違えているキー名

Lambda は、再試行戦略に基づいて失敗を再試行します。

## バッチを２分割します
<a name="streams-batchfailurereporting-bisect"></a>

呼び出しが失敗し、`BisectBatchOnFunctionError` オンになっている場合、バッチは`ReportBatchItemFailures`設定に関係なく２分割されます。

部分的なバッチ成功レスポンスを受信し、`BisectBatchOnFunctionError` と `ReportBatchItemFailures` の両方がオンになっている場合、バッチは返されたシーケンス番号で 2 分割され、Lambda は残りのレコードのみを再試行します。

部分的なバッチレスポンスロジックの実装を簡素化するには、Powertools for AWS Lambda の[バッチプロセッサユーティリティ](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)を使用することを検討してください。これらの複雑さが自動的に処理されます。

バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }
    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}

	for _, record := range kinesisEvent.Records {
		curRecordSequenceNumber := ""

		// Process your record
		if /* Your record processing condition here */ {
			curRecordSequenceNumber = record.Kinesis.SequenceNumber
		}

		// Add a condition to check if the record processing failed
		if curRecordSequenceNumber != "" {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                //Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse(batchItemFailures);   
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Javascript を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
TypeScript を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用して Lambda で Kinesis バッチアイテム失敗のレポートをします。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Kinesis バッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Powertools for AWS Lambda バッチプロセッサを使用する
<a name="services-kinesis-batchfailurereporting-powertools"></a>

Powertools for AWS Lambda のバッチプロセッサユーティリティは、部分的なバッチレスポンスロジックを自動的に処理するため、バッチ障害レポートの実装の複雑さが軽減されます。バッチプロセッサを使用した例を次に示します。

**Python**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)を参照してください。
AWS Lambda バッチプロセッサを使用した Kinesis Data Streams ストリームレコードの処理。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)を参照してください。
AWS Lambda バッチプロセッサを使用した Kinesis Data Streams ストリームレコードの処理。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/)を参照してください。
AWS Lambda バッチプロセッサを使用した Kinesis Data Streams ストリームレコードの処理。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
```

**.NET**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/)を参照してください。
AWS Lambda バッチプロセッサを使用した Kinesis Data Streams ストリームレコードの処理。  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent> 
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId)) 
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Lambda で Kinesis Data Streams イベントソースの破棄されたバッチレコードを保持する
<a name="kinesis-on-failure-destination"></a>

Kinesis イベントソースマッピングのエラー処理は、エラーが関数の呼び出し前に発生するか、関数の呼び出し中に発生するかによって異なります。
+ **呼び出し前:** スロットリングまたはその他の問題によって Lambda イベントソースマッピングが関数を呼び出すことができない場合、レコードの有効期限が切れるか、イベントソースマッピングで設定された最大有効期間 ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) を超えるまで再試行します。
+ **呼び出し中:** 関数は呼び出されたがエラーが返された場合、Lambda はレコードの有効期限が切れるか、最大有効期間 ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) を超えるか、設定された再試行クォータ ([MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)) に達するまで再試行します。関数エラーの場合、[BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError) を設定することもできます。これは、失敗したバッチを 2 つの小さなバッチに分割し、不良レコードを分離してタイムアウトを回避します。バッチを分割しても、再試行クォータは消費されません。

エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 週間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。

## 失敗した呼び出しの送信先の設定
<a name="kinesis-on-failure-destination-console"></a>

失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。Amazon S3 送信先の場合、Lambda はメタデータと共に呼び出しレコード全体を送信します。任意の Amazon SNS トピック、Amazon SQS キュー、Amazon S3 バケット、または Kafka を送信先として設定できます。

Amazon S3 送信先を使用すると、[Amazon S3 イベント通知](https://docs.aws.amazon.com/)機能を使用して、オブジェクトが送信先 S3 バケットにアップロードされたときに通知を受け取ることができます。また、S3 イベント通知を設定して、失敗したバッチに対して別の Lambda 関数を呼び出し、自動処理を実行することもできます。

実行ロールには、送信先に対するアクセス許可が必要です。
+ **SQS 送信先の場合:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **SNS 送信先の場合:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **S3 送信先の場合:** [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) および [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Kafka 送信先の場合:** [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

Kafka トピックは、Kafka イベントソースマッピングの障害発生時の送信先として設定できます。再試行回数を超えた後、またはレコードが最大経過時間を超えた後に Lambda がレコードを処理できなくなると、Lambda は失敗したレコードを指定された Kafka トピックに送信して、後で処理します。「[Kafka トピックを障害発生時の送信先として使用する](kafka-on-failure-destination.md)」を参照してください。

S3 送信先に対して独自の KMS キーを使用した暗号化を有効にしている場合、関数の実行ロールには [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) を呼び出すためのアクセス許可も必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

1. Lambda コンソールの [[関数ページ]](https://console.aws.amazon.com/lambda/home#/functions) を開きます。

1. 関数を選択します。

1. [**機能の概要 **] で、[**送信先を追加 **] を選択します。

1. **[ソース]** には、**[イベントソースマッピング呼び出し]** を選択します。

1. **[イベントソースマッピング]** では、この関数用に設定されているイベントソースを選択します。

1. **[条件]** には **[失敗時]** を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

1. **[送信先タイプ]** では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

1. [**送信先**] で、リソースを選択します。

1. **[保存]** を選択します。

AWS Command Line Interface (AWS CLI) を使用して障害発生時の送信先を設定することもできます。例えば、次の [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) コマンドは、SQS を障害発生時の送信先として持つイベントソースマッピングを `MyFunction` に追加します。

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

次の [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) コマンドは、2 回の再試行後、またはレコードが 1 時間以上経過した場合に失敗した呼び出しレコードを SNS 送信先に送信するように、イベントソースマッピングを更新します。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、[get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) コマンドを使用します。

送信先を削除するには、`destination-config` パラメータの引数として空の文字列を指定します。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 送信先のセキュリティのベストプラクティス
<a name="kinesis-s3-destination-security"></a>

関数の設定から送信先を削除せずに、送信先として設定された S3 バケットを削除すると、セキュリティリスクが発生する可能性があります。別のユーザーが送信先バケットの名前を知っている場合は、その AWS アカウントでバケットを再作成できます。失敗した呼び出しのレコードがそのバケットに送信され、関数からのデータが公開される可能性があります。

**警告**  
関数からの呼び出しレコードを別の AWS アカウントの S3 バケットに送信できないようにするには、`s3:PutObject` アクセス許可を自分アカウントのバケットに制限する条件を関数の実行ロールに追加します。

次の例は、関数の `s3:PutObject` アクセス許可を自分のアカウントのバケットに制限する IAM ポリシーを示しています。このポリシーは、送信先として S3 バケットを使用するために必要な `s3:ListBucket` アクセス許可も Lambda に付与します。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

AWS マネジメントコンソールまたは AWS CLI を使用して、関数の実行ロールにアクセス許可ポリシーを追加する場合は、次の手順を参照してください。

------
#### [ Console ]

**関数の実行ロールにアクセス許可ポリシーを追加するには (コンソール)**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 実行ロールを変更する Lambda 関数を選択します。

1. **[構成]** タブで、**[アクセス許可]** を選択します。

1. **[実行ロール]** タブで、関数の **[ロール名]** を選択して、ロールの IAM コンソールページを開きます。

1. 次の手順を実行してアクセス許可ポリシーをロールに追加します。

   1. **[アクセス許可ポリシー]** ペインで、**[アクセス許可の追加]**、**[インラインポリシーを作成]** を選択します。

   1. **ポリシーエディタ**で、**[JSON]** を選択します。

   1. 追加するポリシーをエディタに貼り付け (既存の JSON を置き換える)、**[次へ]** を選択します。

   1. **[ポリシーの詳細]** で **[ポリシー名]** を入力します。

   1. [**Create policy**] (ポリシーの作成) を選択します。

------
#### [ AWS CLI ]

**関数の実行ロールにアクセス許可ポリシーを追加するには (CLI)**

1. 必要なアクセス許可を持つ JSON ポリシードキュメントを作成し、ローカルディレクトリに保存します。

1. IAM `put-role-policy` CLI コマンドを使用して、関数の実行ロールにアクセス許可を追加します。JSON ポリシードキュメントを保存したディレクトリから次のコマンドを実行して、ロール名、ポリシー名、ポリシードキュメントを独自の値に置き換えます。

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Amazon SNS および Amazon SQS の呼び出しレコードの例
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

以下の例は、Kinesis イベントソース呼び出しが失敗した場合に Lambda が SQS キューまたは SNS トピックに送信する内容を示しています。Lambda はこれらの送信先タイプにメタデータのみを送信するため、元のレコード全体を取得するには、`streamArn`、`shardId`、`startSequenceNumber`、`endSequenceNumber` の各フィールドを使用します。`KinesisBatchInfo` プロパティに表示されるフィールドはすべて常に存在します。

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}
```

この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。

### Amazon S3 呼び出しレコードの例
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

次の例は、Kinesis イベントソースの呼び出しが失敗した場合に Amazon S3 バケットに送信する内容を示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、`payload` フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

呼び出しレコードを含む S3 オブジェクトでは、次の命名規則が使用されます。

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Lambda でのステートフル Kinesis Data Streams 処理の実装
<a name="services-kinesis-windows"></a>

Lambda 関数は、連続ストリーム処理アプリケーションを実行できます。ストリームは、アプリケーションを継続的に流れる無限のデータを表します。この継続的に更新される入力からの情報を分析するために、時間に関して定義されたウィンドウを使用して、含まれるレコードをバインドできます。

タンブリングウィンドウは、一定の間隔で開閉する別個のタイムウィンドウです。ディフォルトでは、Lambda 呼び出しはステートレス — 外部データベースがない場合、複数の連続した呼び出しでデータを処理するために使用することはできません。ただし、タンブリングウィンドウを使用して、呼び出し間で状態を維持できます。この状態は、現在のウィンドウに対して以前に処理されたメッセージの集計結果が含まれます。状態は、シャードごとに最大 1 MB にすることができます。このサイズを超えると、Lambda はウィンドウを早期に終了します。

ストリームの各レコードは、特定のウィンドウに属しています。Lambda は各レコードを少なくとも 1 回処理しますが、各レコードが 1 回だけ処理される保証はありません。エラー処理などのまれなケースでは、一部のレコードが複数回処理されることがあります。レコードは常に最初から順番に処理されます。レコードが複数回処理される場合、順不同で処理されます。

## 集約と処理
<a name="streams-tumbling-processing"></a>

ユーザー管理関数は、集約と、その集約の最終結果を処理するために呼び出されます。Lambda は、ウィンドウで受信したすべてのレコードを集約します。これらのレコードは、個別の呼び出しとして複数のバッチで受け取ることができます。各呼び出しは状態を受け取ります。したがって、タンブリングウィンドウを使用する場合、Lambda 関数の応答に `state` プロパティが含まれている必要があります。応答に `state` プロパティが含まれてないと、Lambda はこれを失敗した呼び出しと見なします。この条件を満たすために、関数は次の JSON 形式の `TimeWindowEventResponse` オブジェクトを返すことができます。

**Example `TimeWindowEventResponse`値**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**注記**  
Java 関数の場合は、`Map<String, String>`を使用して状態を表すことをお勧めします。

ウィンドウの最後で、フラグ`isFinalInvokeForWindow`が`true`に設定され、これが最終状態であり、処理の準備ができていることが示されます。処理が完了すると、ウィンドウが完了し、最終的な呼び出しが完了し、状態は削除されます。

ウィンドウの最後に、Lambda は集計結果に対するアクションの最終処理を使用します。最終処理が同期的に呼び出されます。呼び出しが成功すると、関数はシーケンス番号をチェックポイントし、ストリーム処理が続行されます。呼び出しが失敗した場合、Lambda 関数は呼び出しが成功するまで処理を一時停止します。

**Example kinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## 設定
<a name="streams-tumbling-config"></a>

イベントソースマッピングを作成または更新するときに 、タンブリングウィンドウを設定できます。タンブリングウィンドウを設定するには、ウィンドウを秒単位で指定します ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds))。次の例の AWS Command Line Interface (AWS CLI) コマンドは、タンブリングウィンドウが 120 秒に設定されたストリーミングイベントソースマッピングを作成します。集約と処理のために Lambda 関数が定義した関数の名前は `tumbling-window-example-function` です。

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda は、レコードがストリームに挿入された時間に基づいて、タンブリングウィンドウの境界を決定します。すべてのレコードには、Lambda が境界の決定に使用するおおよそのタイムスタンプがあります。

ウィンドウの集合をタンブルしても、再共有はサポートされません。シャードが終了すると、Lambda はウィンドウが閉じられると見なし、子シャードは新しい状態で独自のウィンドウを開始します。現在のウィンドウに新しいレコードが追加されていない場合、Lambda は最大で 2 分間待機してから、ウィンドウが終了したと見なします。これにより、レコードが断続的に追加された場合でも、関数は現在のウィンドウ内のすべてのレコードを読み取ることができます。

タンブルウィンドウは、既存の再試行ポリシー`maxRetryAttempts`および`maxRecordAge`を完全にサポートします。

**Example Handler.py - 集約と処理**  
次の Python 関数は、最終状態を集約して処理する方法を示しています。  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Amazon Kinesis Data Streams イベントソースマッピングの Lambda パラメータ
<a name="services-kinesis-parameters"></a>

すべての Lambda イベントソースマッピングで、同じ [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) および [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API オペレーションが共有されます。ただし、Kinesis に適用されるのは一部のパラメータのみです。


| [Parameter] (パラメータ) | 必須 | デフォルト | メモ | 
| --- | --- | --- | --- | 
|  [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize)  |  N  |  100  |  最大: 10,000  | 
|  [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BisectBatchOnFunctionError)  |  N  |  false  |  なし | 
|  [DestinationConfig](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-DestinationConfig)  |  N  | 該当なし |  破棄されたレコードの Amazon SQS キューまたは Amazon SNS トピックの送信先。詳細については、「[失敗した呼び出しの送信先の設定](kinesis-on-failure-destination.md#kinesis-on-failure-destination-console)」を参照してください。  | 
|  [[Enabled]](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-Enabled) (有効)  |  N  |  true  |  なし | 
|  [EventSourceArn](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-EventSourceArn)  |  Y  | 該当なし |  データストリームまたはストリームコンシューマーの ARN。  | 
|  [FunctionName](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionName)  |  Y  | 該当なし |  なし | 
|  [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)  |  N  |  該当なし |  関数がバッチ内の特定の失敗を報告できるようにするには、`FunctionResponseTypes` に値 `ReportBatchItemFailures` を含めます。詳細については、「[Kinesis Data Streams と Lambda を使用した部分的なバッチレスポンスの設定](services-kinesis-batchfailurereporting.md)」を参照してください。  | 
|  [MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)  |  N  |  0  |  なし | 
|  [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)  |  N  |  -1  |  -1 は無制限を意味します: Lambda はレコードを破棄しません ([Kinesis Data Streams データ保持設定](https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html)は引き続き適用されます) 最小: -1 最大: 604,800  | 
|  [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)  |  N  |  -1  |  -1 に設定すると無制限になり、失敗したレコードはレコードの有効期限が切れるまで再試行されます。 最小: -1 最大: 10,000  | 
|  [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)  |  N  |  1  |  最大: 10  | 
|  [StartingPosition](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPosition)  |  Y  |  該当なし |  AT\$1TIMESTAMP、TRIM\$1HORIZON、または LATEST  | 
|  [StartingPositionTimestamp](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPositionTimestamp)  |  N  |  該当なし |  StartingPosition が AT\$1TIMESTAMP に設定されている場合にのみ有効です。Unix タイム秒単位で読み取りをスタートする時間  | 
|  [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)  |  N  |  該当なし |  最小: 0 最大: 900  | 

# Kinesis イベントソースでのイベントフィルタリングの使用
<a name="with-kinesis-filtering"></a>

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「[Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)」を参照してください。

このセクションでは、Kinesis イベントソースのイベントフィルタリングに焦点を当てます。

**注記**  
Kinesis イベントソースマッピングは、 `data` キーでのフィルタリングのみをサポートします。

**Topics**
+ [Kinesis イベントフィルタリングの基本](#filtering-kinesis)
+ [Kinesis 集約レコードのフィルタリング](#filtering-kinesis-efo)

## Kinesis イベントフィルタリングの基本
<a name="filtering-kinesis"></a>

プロデューサーが JSON 形式のデータを Kinesis データストリームに入力するとします。レコードの例は次のようになり、`data` フィールドで JSON データが Base64 でエンコードされた文字列に変換されます。

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

プロデューサーがストリームに入力するデータが有効な JSON である限り、イベントフィルタリングを使用して `data` キーを使用するレコードをフィルタリングできます。プロデューサーが次の JSON 形式でレコードを Kinesis ストリームに入力するとします。

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

注文タイプが「購入」のレコードのみをフィルタリングするには、`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Kinesis ソースからイベントを適切にフィルタリングするには、データフィールドおよびデータフィールドのフィルター条件の両方が有効な JSON 形式である必要があります。フィールドのどちらかが有効な JSON 形式ではない場合、Lambda はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。


| 着信データの形式 | データプロパティのフィルターパターンの形式 | 結果として生じるアクション | 
| --- | --- | --- | 
|  有効な JSON  |  有効な JSON  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  有効な JSON  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  JSON 以外  |  Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。  | 
|  JSON 以外  |  有効な JSON  |  Lambda がレコードをドロップします。  | 
|  JSON 以外  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  JSON 以外  |  JSON 以外  |  Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。  | 

## Kinesis 集約レコードのフィルタリング
<a name="filtering-kinesis-efo"></a>

Kinesis を使用すると、複数のレコードを 1 つの Kinesis データストリームレコードに集約し、データスループットを増加させることができます。Lambda は、Kinesis 「[拡張ファンアウト](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)」を使用する場合に限り、集約レコードにフィルター条件を適用できます。標準 Kinesis による集約レコードのフィルタリングはサポートされていません。拡張ファンアウトを使用するときは、Kinesis 専用スループットコンシューマーが Lambda 関数のトリガーとして機能するように設定します。次に、Lambda は集約されたレコードをフィルタリングし、フィルター条件を満たすレコードのみを渡します。

Kinesis レコード集約の詳細については、「Kinesis プロデューサーライブラリ (KPL) のキーコンセプト」ページの「[集約](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation)」セクションを参照してください。Kinesis 拡張ファンアウトを用いた Lambda の使用に関する詳細については、「AWS コンピュートブログ」の「[Amazon Kinesis Data Streams 拡張ファンアウトおよび AWS Lambda でのリアルタイムストリーム処理パフォーマンスの向上](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/)」を参照してください。

# チュートリアル: Lambda を Kinesis Data Streams で使用する
<a name="with-kinesis-example"></a>

このチュートリアルでは、Amazon Kinesis データストリームのイベントを処理する Lambda 関数を作成します。

1. カスタムアプリケーションがストリームにレコードを書き込みます。

1. AWS Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。

1. AWS Lambda は、Lambda 関数の作成時に指定した実行ロールを引き受けることにより、Lambda 関数を実行します。

## 前提条件
<a name="with-kinesis-prepare"></a>

### AWS Command Line Interface のインストール
<a name="install_aws_cli"></a>

AWS Command Line Interface をまだインストールしていない場合は、「[最新バージョンの AWS CLI のインストールまたは更新](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)」にある手順に従ってインストールしてください。

このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。

**注記**  
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (`zip` など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、[Windows Subsystem for Linux をインストール](https://docs.microsoft.com/en-us/windows/wsl/install-win10)します。

## 実行ロールを作成する
<a name="with-kinesis-example-create-iam-role"></a>

AWS リソースにアクセスするためのアクセス権限を関数に付与する[実行ロール](lambda-intro-execution-role.md)を作成します。

**実行ロールを作成するには**

1. IAM コンソールの [[ロールページ](https://console.aws.amazon.com/iam/home#/roles)] を開きます。

1. [**ロールの作成**] を選択します。

1. 次のプロパティでロールを作成します。
   + **信頼されたエンティティ** - **AWS Lambda**
   + **アクセス許可** - **AWSLambdaKinesisExecutionRole**。
   + **Role name** – **lambda-kinesis-role**。

**AWSLambdaKinesisExecutionRole** ポリシーには、Kinesis から項目を読み取り、CloudWatch Logs にログを書き込むために関数が必要とするアクセス許可があります。

## 関数を作成する
<a name="with-kinesis-example-create-function"></a>

Kinesis メッセージを処理する Lambda 関数を作成します。この関数コードは、Kinesis レコードのイベント ID とイベントデータを CloudWatch Logs にログ記録します。

このチュートリアルでは Node.js 24 ランタイムを使用しますが、他のランタイム言語のサンプルコードも提供しています。次のボックスでタブを選択すると、関心のあるランタイムのコードが表示されます。このステップで使用する JavaScript コードは、**[JavaScript]** タブに表示されている最初のサンプルにあります。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
TypeScript を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用した Lambda での Kinesis イベントの消費。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用した Lambda での Kinesis イベントの消費。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**関数を作成するには**

1. プロジェクト用のディレクトリを作成し、そのディレクトリに切り替えます。

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. サンプル JavaScript コードを `index.js` という名前の新しいファイルにコピーします。

1. デプロイパッケージを作成します。

   ```
   zip function.zip index.js
   ```

1. `create-function` コマンドを使用して Lambda 関数を作成します。

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Lambda 関数をテストする
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

`invoke`AWS Lambda CLI コマンドおよびサンプルの Kinesis イベントを使用して、手動で Lambda 関数を呼び出します。

**Lambda 関数をテストするには**

1. 以下の JSON をファイルにコピーし、`input.txt` という名前で保存します。

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. `invoke` コマンドを使用して、関数にイベントを送信します。

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   AWS CLI バージョン 2 を使用している場合、**cli-binary-format** オプションは必須です。これをデフォルト設定にするには、`aws configure set cli-binary-format raw-in-base64-out` を実行します。詳細については、バージョン 2 の AWS Command Line Interface ユーザーガイドの「[AWS CLI でサポートされているグローバルコマンドラインオプション](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)」を参照してください。

   レスポンスは `out.txt` に保存されます。

## Kinesis Stream を作成する
<a name="with-kinesis-example-configure-event-source-create"></a>

`create-stream ` コマンドを使用して、スキーマを作成します。

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

次の `describe-stream` コマンドを実行して、ストリーム ARN を取得します。

```
aws kinesis describe-stream --stream-name lambda-stream
```

次のような出力が表示されます。

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

次のステップで Lambda 関数にストリームを関連付けるために、ストリーム ARN を使用します。

## AWS Lambda でイベントソースを追加する
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

次の AWS CLI `add-event-source` コマンドを実行します。

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

後で使用するために、マッピング ID をメモしておきます。`list-event-source-mappings` コマンドを実行して、イベントソースマッピングのリストを取得できます。

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

レスポンスでは、ステータス値が `enabled` であることを確認できます。イベントソースマッピングを無効にすると、レコードを失うことなくポーリングを一時停止できます。

## セットアップをテストする
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加します。`--data` 値は、文字列を Kinesis に送信する前に CLI で base64 にエンコードされる文字列です。同じコマンドを複数回実行して、複数のレコードをストリームに追加することができます。

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda は実行ロールを使用して、ストリームからレコードを読み取ります。次に、Lambda 関数を呼び出し、レコードのバッチを渡します。この関数は、各レコードからデータをデコードしてログ記録し、出力を CloudWatch Logs に送信します。[CloudWatch コンソール](https://console.aws.amazon.com/cloudwatch)でログを表示する

## リソースのクリーンアップ
<a name="cleanup"></a>

このチュートリアル用に作成したリソースは、保持しない場合は削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウント アカウントに請求される料金の発生を防ぎます。

**実行ロールを削除する**

1. IAM コンソールの [[ロール]](https://console.aws.amazon.com/iam/home#/roles) ページを開きます。

1. 作成した実行ロールを選択します。

1. **[削除]** を選択します。

1. テキスト入力フィールドにロールの名前を入力し、**[削除]** を選択します。

**Lambda 関数を削除するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 作成した関数を選択します。

1. **[アクション]** で、**[削除]** を選択します。

1. テキスト入力フィールドに **confirm** と入力し、**[Delete]** (削除) を選択します。

**Kinesis ストリームを削除するには**

1. AWS マネジメントコンソール にサインインし、Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. 作成したストリームを選択します。

1. [** Actions**] で、[**Delete **] を選択します。

1. テキスト入力フィールドに **delete** を入力します。

1. **[Delete]** (削除) をクリックします。