Amazon Managed Streaming for Apache Kafka のトピックを EventBridge Pipes のソースとして使用します。 - Amazon EventBridge

Amazon Managed Streaming for Apache Kafka のトピックを EventBridge Pipes のソースとして使用します。

EventBridge Pipes を使用して、Amazon Managed Streaming for Apache Kafka (Amazon MSK) トピックからレコードを受け取ることができます。オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。Amazon MSK に固有の設定があり、パイプをセットアップするときに選択できます。EventBridge Pipes は、データを送信先に送信するときに、メッセージブローカーのレコードの順序を維持します。

Amazon MSK は、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを構築および実行できるようにするフルマネージドサービスです。Amazon MSK は、Apache Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。Amazon MSK を使用すると、AWS Identity and Access Management (IAM) を使って複数のアベイラビリティーゾーンやセキュリティ向けにアプリケーションを設定することができます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

Amazon MSK は、ソースとして Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。EventBridge は、ソースからの新しいメッセージを内部的にポーリングした後、ターゲットを同期的に呼び出します。EventBridge はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafkaa ベースのソースの場合、EventBridge はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。

EventBridge は、パーティションごとにメッセージを順番に読み込みます。EventBridge は各バッチを処理した後、そのバッチ内のメッセージのオフセットをコミットします。パイプのターゲットがバッチ内のいずれかのメッセージに対してエラーを返すとEventBridge は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

EventBridge は、ターゲットを呼び出すとき、イベント内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。

[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

ポーリングとストリームの開始位置

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。

  • パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。

  • ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、LATEST をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON として指定します。

MSK クラスター認証

EventBridge には、Amazon MSK クラスターにアクセスする、レコードを取得する、およびその他タスクを実行するための許可が必要です。Amazon MSK は、MSK クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。どの認証方法がいつ使用されるかについての詳細は、「EventBridge でのブートストラップブローカーの選択方法」を参照してください。

非認証アクセス

開発には非認証アクセスのみを使用することをお勧めします。非認証アクセスは、クラスターの IAM ロールベース認証が無効になっている場合にのみ機能します。

SASL/SCRAM 認証

Amazon MSK は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。EventBridge をクラスターに接続するには、認証情報 (サインイン認証情報) を AWS Secrets Manager シークレットに保存します。

Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「AWS Secrets Manager を使用したユーザーネームとパスワードの認証」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしません。

IAM ロールベースの認証

IAM を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。MSK クラスターで IAM 認証がアクティブ化されており、認証用のシークレットを指定しない場合、EventBridg はデフォルトで自動的に IAM 認証を使用します。IAM ユーザーまたはロールベースのポリシーを作成してデプロイするには、IAM コンソール、または API を使用します。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「IAM access control」(IAM アクセスコントロール) を参照してください。

EventBridge が MSK クラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、パイプの実行ロールに以下の許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Amazon MSK Kafka actions」(Amazon MSK Kafka アクション) を参照してください。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

Amazon MSK の場合、EventBridge がクライアントとして機能します。MSK クラスターのブローカーで EventBridge を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。MSK クラスターは、EventBridge でブローカーを認証するために EventBridge にサーバー証明書を送信します。さーばー証明書は、AWS トラストストア内の CA によって署名される必要があります。

Amazon MSK は自己署名のサーバー証明書をサポートしません。これは、Amazon MSK のすべてのブローカーが、EventBridge がデフォルトで信頼する Amazon Trust Services CA によって署名されたパブリック証明書を使用するためです。

Amazon MSK のための mTLS に関する詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Mutual TLS Authentication」(相互 TLS 認証) を参照してください。

mTLS シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

EventBridge は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

EventBridge でのブートストラップブローカーの選択方法

EventBridge は、クラスターで使用可能な認証方法、および認証用のシークレットが提供されているかどうかに基づき、ブートストラップブローカーを選択します。mTLS または SASL/SCRAM のシークレットを指定すると、EventBridge は自動的にその認証方法を選択します。シークレットを指定しない場合、EventBridge は、クラスターでアクティブ化されている中で、最も強力な認証方法を選択します。以下は、EventBridge によるブローカー選択の優先度を、最も強力な認証から弱い認証の順に示したものです。

  • mTLS (mTLS 用のシークレットを提供)

  • SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)

  • SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)

  • 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)

  • プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

注記

EventBridge から最も安全なブローカータイプへの接続ができない場合でも、 は別の (安全性の低い) ブローカータイプへの接続を試行しません。安全性の低いブローカータイプを EventBridge に選択させたい場合は、クラスターが使用している、より強力な認証方法をすべて無効にします。

ネットワーク構成

EventBridge は、Amazon MSK クラスターに関連付けられている Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。

  • Amazon MSK クラスターの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス·(アウトバウンド)·を使用できます。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPC エンドポイントを使用する必要があります (以下で説明)。

  • EventBridge Pipes は AWS PrivateLink を介したイベント配信もサポートしているため、パブリックインターネットを経由することなく、Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースからパイプターゲットにイベントを送信できます。パイプを使用して、インターネットゲートウェイのデプロイ、ファイアウォールルールの設定、プロキシサーバーの設定を必要とせずに、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド型 Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネット内の Kafka クラスターからの配信をサポートすることもできます。

    VPC エンドポイントを設定するには、「AWS PrivateLink ユーザーガイド」の「VPC エンドポイントの作成」を参照してください。サービス名では、com.amazonaws.region.pipes-data を選択します。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – ソース用に指定されたセキュリティグループに対して Amazon MSK ブローカーポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Amazon MSK ブローカーポート上のすべてのトラフィックを許可します。

    ブローカーポートには以下が含まれます。

    • プレーンテキスト用 9092

    • TLS 用 9094

    • SASL 用 9096

    • IAM 用 9098

注記

Amazon VPC の設定は、Amazon MSK API を使用して検出できます。セットアップ中に設定する必要はありません。

カスタマイズ可能なコンシューマーグループ ID

Apache Kafka をソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、パイプを結合したい Apache Kafka コンシューマーグループの既存の識別子です。この機能を使用すると、実行中の Apache Kafka レコード処理設定を他のコンシューマーから EventBridge に移行できます。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Apache Kafka はすべてのコンシューマーにメッセージを配信します。言い換えると、EventBridge は Apache Kafka トピックのメッセージをすべて受け取るわけではありません。EventBridge にトピック内のすべてのメッセージを処理させたい場合は、そのコンシューマーグループの他のポーラーをすべてオフにします。

さらに、コンシューマーグループ ID を指定し、Apache Kafka が同じ ID を持つ有効な既存のコンシューマーグループを見つけた場合、EventBridge は、パイプの StartingPosition パラメーターを無視します。代わりに、EventBridge はコンシューマーグループのコミットされたオフセットに従ってレコードの処理を開始します。コンシューマーグループ ID を指定しても、Apache Kafka が既存のコンシューマーグループを見つけられない場合、EventBridge は指定された StartingPosition を使用してソースを設定します。

指定するコンシューマーグループ ID は、すべての Apache Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定してパイプを作成した後は、この値を更新することはできません。

Amazon MSK ソースの Auto Scaling

初めて Amazon MSK ソースを作成するときは、EventBridge が Apache Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、EventBridge は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

EventBridge は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは EventBridge で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、EventBridge はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットのがオーバーロードすると、EventBridge はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得しパイプに送信するメッセージの数が減り、パイプへのワークロードが軽減されます。