セルフマネージド型 Apache Kafka イベントソースでのイベントフィルタリングの使用 - AWS Lambda

セルフマネージド型 Apache Kafka イベントソースでのイベントフィルタリングの使用

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「Lambda が関数に送信するイベントを制御する」を参照してください。

このセクションでは、セルフマネージド型 Apache Kafka イベントソースのイベントフィルタリングに焦点を当てます。

セルフマネージド型 Apache Kafka イベントフィルタリングの基本

プロデューサーがセルフマネージド型 Apache Kafka クラスターのトピックに、有効な JSON 形式またはプレーン文字列として、メッセージを書き込むとします。レコードの例は次のようになり、value フィールドでメッセージが Base64 でエンコードされた文字列に変換されます。

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

Apache Kafka プロデューサーが次の JSON 形式でトピックにメッセージを書き込むとします。

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

value キーを使用してレコードをフィルタリングできます。device_ID が AB の文字で始まるレコードのみをフィルタリングするとします。FilterCriteria オブジェクトは次のようになります。

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern の値を記載しています。

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

Console

コンソールを使用してこのフィルターを追加するには、イベントソースマッピングへのフィルター条件のアタッチ (コンソール) の指示に従って [フィルター条件] に次の文字列を入力します。

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

セルフマネージド型 Apache Kafka では、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。文字列が「error」を含むメッセージを無視するとします。FilterCriteria オブジェクトは次のようになります。

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern の値を記載しています。

{ "value": [ { "anything-but": [ "error" ] } ] }

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

Console

コンソールを使用してこのフィルターを追加するには、イベントソースマッピングへのフィルター条件のアタッチ (コンソール) の指示に従って [フィルター条件] に次の文字列を入力します。

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

セルフマネージド型 Apache Kafka メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Kafka のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が FilterCriteria 形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。

着信メッセージの形式 メッセージプロパティのフィルターパターン形式 結果として生じるアクション

プレーン文字列

プレーン文字列

Lambda がフィルター条件に基づいてフィルタリングを実行します。

プレーン文字列

データプロパティのフィルターパターンがない

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。

プレーン文字列

有効な JSON

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。

有効な JSON

プレーン文字列

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。

有効な JSON

データプロパティのフィルターパターンがない

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。

有効な JSON

有効な JSON

Lambda がフィルター条件に基づいてフィルタリングを実行します。

UTF-8 以外でエンコードされた文字

JSON、プレーン文字列、またはパターンなし

Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。