Amazon MSK で Lambda を使用する - AWS Lambda

Amazon MSK で Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon Managed Streaming for Apache Kafka (Amazon MSK) は、Apache Kafka を使ってストリーミングデータを処理するアプリケーションを、構築および実行することを可能にするフルマネージドサービスです。Amazon MSK は、Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。また、Amazon MSK を使用すると、AWS Identity and Access Management (IAM) を使って複数のアベイラビリティーゾーンやセキュリティ向けにより簡単にアプリケーションを設定することができます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

Amazon MSK は、イベントソースとして Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。Lambda は、イベントソースからの新しいメッセージを内部的にポーリングした後、ターゲットの Lambda 関数を同期的に呼び出します。Lambda はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは設定可能です (デフォルトでは 100 メッセージ)。詳細については、「バッチ処理動作」を参照してください。

デフォルトでは、Lambda は Amazon MSK イベントソースマッピングのイベントポーラーの数を自動スケーリングします。Amazon MSK イベントソースマッピングのスループットを最適化するには、プロビジョンドモードを設定します。プロビジョンドモードでは、イベントソースマッピングに割り当てられるイベントポーラーの最小数と最大数を定義できます。これにより、イベントソースマッピングが予期しないメッセージスパイクを処理する能力を向上させることができます。詳細については、「プロビジョンドモード」を参照してください。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Amazon MSK をイベントソースとして設定する方法の例については、AWS Compute Blog の Using Amazon MSK as an event source for AWS Lambdaを参照してください。完全なチュートリアルについては、「Amazon MSK Labs」(Amazon MSK ラボ) の「Amazon MSK Lambda Integration」(Amazon MSK の Lambda 統合) を参照してください。

イベントの例

Lambda は、関数を呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }