Pipes のソースとしての Apache Kafka EventBridge ストリーム - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Pipes のソースとしての Apache Kafka EventBridge ストリーム

Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。Amazon Managed Streaming for Apache Kafka (Amazon MSK) またはセルフマネージド Apache Kafka クラスターを使用できます。用語 AWS では、セルフマネージドクラスターとは、 によってホストされていないすべての Apache Kafka クラスターを指します AWS。これには、自分で管理するクラスターと、、、 Confluent Cloud CloudKarafkaなどのサードパーティープロバイダーによってホストされるクラスターの両方が含まれますRedpanda

クラスターの他の AWS ホスティングオプションの詳細については、 AWS ビッグデータブログの「 で Apache Kafka を実行するためのベストプラクティス AWS」を参照してください。

ソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis の使用と同様に動作します。 EventBridge は、ソースからの新しいメッセージを内部的にポーリングし、ターゲットを同期的に呼び出します。 EventBridge はメッセージをバッチで読み取り、これらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafka ベースのソースの場合、 はバッチ処理ウィンドウやバッチサイズなどの処理制御パラメータ EventBridge をサポートします。

EventBridge は、パイプを呼び出すときにイベントパラメータにメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Apache Kafka トピックと Apache Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes でのイベントフィルタリング」を参照してください。

[ { "eventSource": "SelfManagedKafka", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

Apache Kafka クラスター認証

EventBridge Pipes は、セルフマネージド Apache Kafka クラスターで認証するためのいくつかの方法をサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Apache Kafka クラスターを設定しておいてください。Apache Kafka セキュリティの詳細については、Apache Kafka ドキュメントの「Security」(セキュリティ) セクションを参照してください。

VPC アクセス

内の Apache Kafka ユーザーのみが Apache Kafka ブローカーにアクセスできるセルフマネージド Apache Kafka 環境を使用している場合VPCは、Apache Kafka ソースで Amazon Virtual Private Cloud (Amazon VPC) を設定する必要があります。

SASL/SCRAM 認証

EventBridge Pipes は、Transport Layer Security (SCRAM) 暗号化による Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/TLS) 認証をサポートしています。 EventBridge Pipes は暗号化された認証情報を送信してクラスターで認証します。SASL/SCRAM 認証の詳細については、RFC「5802」を参照してください。

EventBridge Pipes は、TLS暗号化による SASL/PLAIN 認証をサポートしています。SASL/PLAIN 認証では、 EventBridge Pipes は認証情報をクリアテキスト (暗号化されていない) としてサーバーに送信します。

SASL 認証のために、サインイン認証情報をシークレットとして に保存します AWS Secrets Manager。

相互TLS認証

相互 TLS (m TLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka では、 EventBridge Pipes がクライアントとして機能します。Apache Kafka ブローカーで EventBridge Pipes を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。

Apache Kafka クラスターは、サーバー証明書を EventBridge Pipes に送信して、Pipes で Apache Kafka EventBridge ブローカーを認証します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、 EventBridge Pipes トラストストアにある CA によって署名される必要があります。プライベート CA/自己署名証明書の場合は、サーバールート CA 証明書を (Secrets Manager のシークレットとして) 設定します。 EventBridge Pipes はルート証明書を使用して Apache Kafka ブローカーを検証します。

m の詳細についてはTLS、「Amazon の相互TLS認証をソース MSK として導入する」を参照してください。

クライアント証明書シークレットの設定

CLIENT_CERTIFICATE_TLS_AUTH シークレットには、証明書フィールドとプライベートキーフィールドが必要です。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーはどちらも PEM形式である必要があります。

注記

EventBridge Pipes は、プライベートキー暗号化アルゴリズム PBES1 (ただし、 はサポートしていませんPBES2) をサポートしています。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは PKCS #8 形式で、次の構造である必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

次の例は、暗号化されたプライベートキーを使用した mTLS 認証用のシークレットの内容を示しています。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

サーバルート CA 証明書シークレットの設定

Apache Kafka ブローカーがプライベート CA によって署名された証明書でTLS暗号化を使用する場合、このシークレットを作成します。TLS 暗号化はVPC、、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、Apache Kafka ブローカーのルート CA 証明書を PEM形式で含む フィールドが必要です。以下は、このシークレットの構造を示す例です。

{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"

ネットワーク構成

プライベートVPC接続を使用するセルフマネージド Apache Kafka 環境を使用している場合は、Apache Kafka ブローカーに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる EventBridge 必要があります。

  • Apache Kafka クラスターVPCの にアクセスするには、ソースのサブネットにアウトバウンドインターネットアクセス EventBridge を使用できます。プライベートサブネットの場合、NATゲートウェイでも独自の でもかまいませんNAT。にパブリック IP アドレスNATがあり、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPCエンドポイントを使用する必要があります (以下で説明)。

  • EventBridge Pipes は を介したイベント配信もサポートしているためAWS PrivateLink、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。パブリックインターネットを経由する必要はありません。Pipes を使用すると、インターネットゲートウェイをデプロイしたり、ファイアウォールルールを設定したり、プロキシサーバーを設定したりすることなく、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネットの Kafka クラスターからの配信をサポートすることもできます。

    VPC エンドポイントを設定するには、「 ユーザーガイド」のVPC「エンドポイントの作成AWS PrivateLink 」を参照してください。サービス名で、 を選択しますcom.amazonaws.region.pipes-data

Amazon VPC セキュリティグループを次のルールで設定します (最低)。

  • インバウンドルール — ソースに指定されたセキュリティグループの Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソースに指定されたセキュリティグループの Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

    ブローカーポートには以下が含まれます。

    • プレーンテキストの場合は 9092

    • の 9094 TLS

    • の 9096 SASL

    • の 9098 IAM

Apache Kafka ソースによるコンシューマーの自動スケーリング

最初に Apache Kafka ソースを作成すると、 は Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを EventBridge 割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、 は、ワークロードに基づいてコンシューマーの数 EventBridge を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

1 分間隔で、 はトピック内のすべてのパーティションのコンシューマーオフセットラグ EventBridge を評価します。遅延が高すぎる場合、パーティションはメッセージを処理 EventBridge できるよりも速くメッセージを受信しています。必要に応じて、コンシューマーをトピック EventBridge に追加または削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットが過負荷の場合、 はコンシューマーの数 EventBridge を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。