Amazon Managed Streaming for Apache Kafka トピックをソースとして使用する場合 - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Managed Streaming for Apache Kafka トピックをソースとして使用する場合

EventBridge Pipes を使用して、Amazon Managed Streaming for Apache Kafka (Amazon MSK) トピックからレコードを受信できます。オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。パイプを設定するときに選択できる Amazon MSK 固有の設定があります。 EventBridge Pipes は、そのデータを宛先に送信するときに、メッセージブローカーからのレコードの順序を維持します。

Amazon MSK は、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを構築および実行できるようにするフルマネージドサービスです。Amazon MSK は、Apache Kafka を実行するクラスターのセットアップ、スケーリング、管理を簡素化します。Amazon MSK では、複数のアベイラビリティーゾーンと AWS Identity and Access Management (IAM) によるセキュリティのためにアプリケーションを設定できます。Amazon MSK は、Kafka の複数のオープンソースバージョンをサポートします。

ソースとしての Amazon MSK は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis の使用と同様に動作します。 EventBridge は、ソースからの新しいメッセージを内部的にポーリングし、ターゲットを同期的に呼び出します。 EventBridge はメッセージをバッチで読み取り、これらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafka ベースのソースの場合、 はバッチ処理ウィンドウやバッチサイズなどの処理コントロールパラメータ EventBridge をサポートします。

EventBridge は、パーティションごとにメッセージを順番に読み取ります。が各バッチ EventBridge を処理すると、そのバッチ内のメッセージのオフセットがコミットされます。パイプのターゲットがバッチ内のいずれかのメッセージに対してエラーを返した場合、 は処理が成功するか、メッセージが期限切れになるまで、メッセージのバッチ全体を EventBridge 再試行します。

EventBridge は、ターゲットを呼び出すときに、イベントのメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Amazon MSK トピックとパーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。

[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

ポーリングとストリームの開始位置

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。

  • パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。

  • ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、LATEST をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON として指定します。

MSK クラスター認証

EventBridge には、Amazon MSK クラスターへのアクセス、レコードの取得、その他のタスクの実行のためのアクセス許可が必要です。Amazon MSK は、MSK クラスターへのクライアントアクセスを制御するためのいくつかのオプションをサポートしています。どの認証方法がいつ使用されるかについての詳細は、「がブートストラップブローカー EventBridge を選択する方法」を参照してください。

非認証アクセス

開発には非認証アクセスのみを使用することをお勧めします。非認証アクセスは、クラスターの IAM ロールベース認証が無効になっている場合にのみ機能します。

SASL/SCRAM 認証

Amazon MSK は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。がクラスターに接続する EventBridge には、認証情報 (サインイン認証情報) を AWS Secrets Manager シークレットに保存します。

Secrets Manager の使用に関する詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「AWS Secrets Managerを使用したユーザーネームとパスワードの認証」を参照してください。

Amazon MSK は SASL/PLAIN 認証をサポートしません。

IAM ロールベースの認証

IAM を使用して、MSK クラスターに接続するクライアントのアイデンティを認証することができます。MSK クラスターで IAM 認証がアクティブで、認証用のシークレットを指定しない場合、 EventBridge は自動的にデフォルトで IAM 認証を使用します。IAM ユーザーまたはロールベースのポリシーを作成してデプロイするには、IAM コンソール、または API を使用します。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「IAM access control」(IAM アクセスコントロール) を参照してください。

EventBridge が MSK クラスターに接続し、レコードを読み取り、その他の必要なアクションを実行できるようにするには、パイプの実行ロールに次のアクセス許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

これらの許可は、特定のクラスター、トピック、およびグループにスコープできます。詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Amazon MSK Kafka actions」(Amazon MSK Kafka アクション) を参照してください。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

Amazon MSK の場合、 EventBridge はクライアントとして機能します。MSK クラスター内のブローカー EventBridge で認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。MSK クラスターはサーバー証明書を に送信 EventBridge して、 でブローカーを認証します EventBridge。サーバー証明書は、 AWS 信頼ストアにある CA によって署名される必要があります。

Amazon MSK は自己署名サーバー証明書をサポートしていません。Amazon MSK のすべてのブローカーは、デフォルトで が EventBridge 信頼する Amazon Trust Services CAs によって署名されたパブリック証明書を使用するためです。

Amazon MSK のための mTLS に関する詳細については、「Amazon Managed Streaming for Apache Kafka Developer Guide」(Amazon Managed Streaming for Apache Kafka デベロッパーガイド) の「Mutual TLS Authentication」(相互 TLS 認証) を参照してください。

mTLS シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

EventBridge は、PBES1 (PBES2 はサポートしていません) プライベートキー暗号化アルゴリズムをサポートします。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

がブートストラップブローカー EventBridge を選択する方法

EventBridge は、クラスターで使用可能な認証方法と、認証にシークレットを提供するかどうかに基づいて、ブートストラップブローカーを選択します。mTLS または SASL/SCRAM のシークレットを指定すると、 はその認証方法 EventBridge を自動的に選択します。シークレットを指定しない場合、 EventBridge はクラスターでアクティブな最も強力な認証方法を選択します。がブローカー EventBridge を選択する際の優先度の順序を、最も強力な認証から最も弱い認証まで次に示します。

  • mTLS (mTLS 用のシークレットを提供)

  • SASL/SCRAM (SASL/SCRAM 用のシークレットを提供)

  • SASL IAM (シークレットが提供されておらず、IAM 認証がアクティブ)

  • 非認証の TLS (シークレットが提供されておらず、IAM 認証も非アクティブ)

  • プレーンテキスト (シークレットが提供されておらず、IAM 認証と非認証 TLS の両方が非アクティブ)

注記

が最も安全なブローカータイプに接続 EventBridge できない場合、 は別の (弱い) ブローカータイプに接続しようとしません。より弱いブローカータイプ EventBridge を選択する場合は、クラスターでより強力な認証方法をすべて無効にします。

ネットワーク構成

EventBridge は、Amazon MSK クラスターに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。

  • Amazon MSK クラスターの VPC にアクセスするには、ソースのサブネットにアウトバウンドインターネットアクセス EventBridge を使用できます。パブリックサブネットの場合、これはマネージド NAT ゲートウェイである必要があります。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。

  • EventBridge Pipes は を介したイベント配信もサポートしているためAWS PrivateLink、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。パブリックインターネットを経由する必要はありません。Pipes を使用すると、インターネットゲートウェイをデプロイしたり、ファイアウォールルールを設定したり、プロキシサーバーを設定したりすることなく、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。

    VPC エンドポイントを設定するには、「 ユーザーガイド」の「VPC エンドポイントの作成AWS PrivateLink 」を参照してください。サービス名で、 を選択しますcom.amazonaws.region.pipes-data

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール — ソースに指定されたセキュリティグループの Amazon MSK ブローカーポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソースに指定されたセキュリティグループの Amazon MSK ブローカーポート上のすべてのトラフィックを許可します。

    ブローカーポートには以下が含まれます。

    • プレーンテキストの場合は 9092

    • TLS の場合は 9094

    • SASL の場合は 9096

    • IAM 用 9098

注記

Amazon VPC の設定は、Amazon MSK API を使用して検出できます。セットアップ中に設定する必要はありません。

カスタマイズ可能なコンシューマーグループ ID

Apache Kafka をソースとして設定する場合、コンシューマーグループIDを指定できます。このコンシューマーグループ ID は、パイプを結合したい Apache Kafka コンシューマーグループの既存の識別子です。この機能を使用して、進行中の Apache Kafka レコード処理設定を他のコンシューマーから に移行できます EventBridge。

コンシューマーグループ ID を指定し、そのコンシューマーグループ内に他のアクティブなポーラーが存在する場合、Apache Kafka はすべてのコンシューマーにメッセージを配信します。つまり、Apache Kafka EventBridge トピックのすべてのメッセージを受信しません。トピック内のすべてのメッセージ EventBridge を処理する場合は、そのコンシューマーグループ内の他のポーラーをオフにします。

さらに、コンシューマーグループ ID を指定し、Apache Kafka が同じ ID を持つ有効な既存のコンシューマーグループを検索すると、 はパイプの StartingPositionパラメータ EventBridge を無視します。代わりに、 はコンシューマーグループのコミットされたオフセットに従ってレコードの処理 EventBridge を開始します。コンシューマーグループ ID を指定し、Apache Kafka が既存のコンシューマーグループを見つけられない場合、 は指定された を使用してソース EventBridge を設定しますStartingPosition

指定するコンシューマーグループ ID は、すべての Apache Kafka イベントソースの中で一意でなければなりません。コンシューマーグループ ID を指定してパイプを作成した後は、この値を更新することはできません。

Amazon MSK ソースの Auto Scaling

最初に Amazon MSK ソースを作成すると、 は Apache Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを EventBridge 割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、 は、ワークロードに基づいてコンシューマーの数 EventBridge を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

1 分間隔で、 はトピック内のすべてのパーティションのコンシューマーオフセットラグ EventBridge を評価します。遅延が高すぎる場合、パーティションはメッセージを処理 EventBridge できるよりも速くメッセージを受信しています。必要に応じて、コンシューマーをトピック EventBridge に追加または削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットが過負荷になっている場合、 はコンシューマーの数 EventBridge を減らします。このアクションにより、コンシューマーが取得しパイプに送信するメッセージの数が減り、パイプへのワークロードが軽減されます。