Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

EventBridge Pipes のソースとして Apache Kafka ストリームを使用しています。

フォーカスモード
EventBridge Pipes のソースとして Apache Kafka ストリームを使用しています。 - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。ユーザーは、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、またはセルフマネージドの Apache Kafka クラスターを使用できます。 AWS 用語では、セルフマネージドクラスターとは、 によってホストされていない Apache Kafka クラスターを指します AWS。これには、自分で管理するクラスターと、Confluent CloudCloudKarafkaRedpanda などのサードパーティープロバイダーがホストするクラスターの両方が含まれます。

クラスターの他の AWS ホスティングオプションの詳細については、 AWS ビッグデータブログの「 で Apache Kafka を実行するためのベストプラクティス AWS」を参照してください。

ソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。EventBridge は、ソースからの新しいメッセージを内部的にポーリングした後、ターゲットを同期的に呼び出します。EventBridge はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafkaa ベースのソースの場合、EventBridge はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。

EventBridge は、パイプを呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Apache Kafka トピックと Apache Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。

[ { "eventSource": "SelfManagedKafka", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

Apache Kafka クラスター認証

EventBridge Pipes は、セルフマネージド Apache Kafka クラスターで認証するための方法をいくつかサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Apache Kafka クラスターを設定しておいてください。Apache Kafka セキュリティの詳細については、Apache Kafka ドキュメントの「Security」(セキュリティ) セクションを参照してください。

VPC アクセス

VPC 内の Apache Kafka ユーザーのみが Apache Kafka ブローカーにアクセスできるセルフマネージド Apache Kafka 環境を使用している場合は、Apache Kafka ソースで Amazon Virtual Private Cloud (Amazon VPC) を設定する必要があります。

SASL/SCRAM 認証

EventBridge Pipes は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。EventBridge Pipes は、暗号化された認証情報を送信してクラスターで認証します。SASL/SCRAM 認証の詳細については、「RFC 5802」を参照してください。

EventBridge Pipes は TLS 暗号化による SASL/PLAIN 認証をサポートしています。SASL/PLAIN 認証では、EventBridge Pipes が認証情報をクリアテキスト (暗号化されていない状態) としてサーバーに送信します。

SASL 認証の場合は、サインイン認証情報をシークレットとして AWS Secrets Managerに保存します。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka の場合、Lambda はクライアントとして機能します。クライアント証明書を (Secrets Manager のシークレットとして) 設定することにより、Apache Kafka ブローカーで EventBridge Pipes を認証できるようにします。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。

Apache Kafka クラスターは、EventBridge Pipes にサーバー証明書を送信し、EventBridge Pipes で Apache Kafka ブローカーを認証します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、EventBridge Pipes 信頼トストア内の CA で署名する必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。EventBridge Pipes はルート証明書を使用して Apache Kafka ブローカーを検証します。

mTLS の詳細については、「Introducing mutual TLS authentication for Amazon MSK as an source」(ソースとしての Amazon MSK のための相互 TLS の紹介) を参照してください。

クライアント証明書シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

EventBridge Pipes は、PBES1 (PBES2 ではありません) プライベートキー暗号化アルゴリズムをサポートしています。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

サーバルート CA 証明書シークレットの設定

このシークレットは、Apache Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、PEM 形式の Apache Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。

{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"

ネットワーク構成

プライベート VPC 接続を使用するセルフマネージド Apache Kafka 環境を使用している場合、EventBridge は Apache Kafka ブローカーに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。

  • Apache Kafka クラスターの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス (アウトバウンド) を使用できます。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPC エンドポイントを使用する必要があります (以下で説明)。

  • EventBridge Pipes は を介したイベント配信もサポートしているためAWS PrivateLink、パブリックインターネットを経由することなく、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。Pipes を使用すると、インターネットゲートウェイのデプロイ、ファイアウォールルールの設定、プロキシサーバーの設定を必要とせずに、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネット内の Kafka クラスターからの配信をサポートすることもできます。

    VPC エンドポイントを設定するには、「AWS PrivateLink ユーザーガイド」の「VPC エンドポイントの作成」を参照してください。サービス名では、com.amazonaws.region.pipes-data を選択します。

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – ソース用に指定されたセキュリティグループに対して Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

    ブローカーポートには以下が含まれます。

    • プレーンテキスト用 9092

    • TLS 用 9094

    • SASL 用 9096

    • IAM 用 9098

Apache Kafka ソースを使用したコンシューマーの自動スケーリング

初めて Apache Kafka ソースを作成するときは、EventBridge が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、EventBridge は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

EventBridge は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは EventBridge で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、EventBridge はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

実行するためのベストプラクティターゲットのがオーバーロードすると、EventBridge はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。

このページの内容

プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.