イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「Lambda が関数に送信するイベントを制御する」を参照してください。
このセクションでは、セルフマネージド型 Apache Kafka イベントソースのイベントフィルタリングに焦点を当てます。
セルフマネージド型 Apache Kafka イベントフィルタリングの基本
プロデューサーがセルフマネージド型 Apache Kafka クラスターのトピックに、有効な JSON 形式またはプレーン文字列として、メッセージを書き込むとします。レコードの例は次のようになり、value
フィールドでメッセージが Base64 でエンコードされた文字列に変換されます。
{
"mytopic-0":[
{
"topic":"mytopic",
"partition":0,
"offset":15,
"timestamp":1545084650987,
"timestampType":"CREATE_TIME",
"value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"headers":[]
}
]
}
Apache Kafka プロデューサーが次の JSON 形式でトピックにメッセージを書き込むとします。
{
"device_ID": "AB1234",
"session":{
"start_time": "yyyy-mm-ddThh:mm:ss",
"duration": 162
}
}
value
キーを使用してレコードをフィルタリングできます。device_ID
が AB の文字で始まるレコードのみをフィルタリングするとします。FilterCriteria
オブジェクトは次のようになります。
{
"Filters": [
{
"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"
}
]
}
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{
"value": {
"device_ID": [ { "prefix": "AB" } ]
}
}
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
コンソールを使用してこのフィルターを追加するには、イベントソースマッピングへのフィルター条件のアタッチ (コンソール) の指示に従って [フィルター条件] に次の文字列を入力します。
{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
セルフマネージド型 Apache Kafka では、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。文字列が「error」を含むメッセージを無視するとします。FilterCriteria
オブジェクトは次のようになります。
{
"Filters": [
{
"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"
}
]
}
以下は、わかりやすくするためにプレーン JSON で展開したフィルターの Pattern
の値を記載しています。
{
"value": [
{
"anything-but": [ "error" ]
}
]
}
コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。
コンソールを使用してこのフィルターを追加するには、イベントソースマッピングへのフィルター条件のアタッチ (コンソール) の指示に従って [フィルター条件] に次の文字列を入力します。
{ "value" : [ { "anything-but": [ "error" ] } ] }
セルフマネージド型 Apache Kafka メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Kafka のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が FilterCriteria
形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。
着信メッセージの形式 | メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション |
---|---|---|
プレーン文字列 |
プレーン文字列 |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
プレーン文字列 |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
プレーン文字列 |
有効な JSON |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
プレーン文字列 |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
データプロパティのフィルターパターンがない |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |
有効な JSON |
有効な JSON |
Lambda がフィルター条件に基づいてフィルタリングを実行します。 |
UTF-8 以外でエンコードされた文字 |
JSON、プレーン文字列、またはパターンなし |
Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。 |