翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。ユーザーは、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、またはセルフマネージドの Apache Kafka クラスターを使用できます。 AWS 用語では、セルフマネージドクラスターとは、 によってホストされていない Apache Kafka クラスターを指します AWS。これには、自分で管理するクラスターと、Confluent
Cloud
クラスターの他の AWS ホスティングオプションの詳細については、 AWS ビッグデータブログの「 で Apache Kafka を実行するためのベストプラクティス AWS
ソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis を使用する場合と同様に動作します。EventBridge は、ソースからの新しいメッセージを内部的にポーリングした後、ターゲットを同期的に呼び出します。EventBridge はメッセージをバッチで読み込み、それらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。
Apache Kafkaa ベースのソースの場合、EventBridge はバッチ処理ウィンドウやバッチサイズなどの制御パラメータの処理をサポートします。
EventBridge は、パイプを呼び出すとき、イベントパラメータ内のメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Apache Kafka トピックと Apache Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。
イベントの例
次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。
[
{
"eventSource": "SelfManagedKafka",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"eventSourceKey": "mytopic-0",
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
"value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"headers": [
{
"headerKey": [
104,
101,
97,
100,
101,
114,
86,
97,
108,
117,
101
]
}
]
}
]
Apache Kafka クラスター認証
EventBridge Pipes は、セルフマネージド Apache Kafka クラスターで認証するための方法をいくつかサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Apache Kafka クラスターを設定しておいてください。Apache Kafka セキュリティの詳細については、Apache Kafka ドキュメントの「Security
VPC アクセス
VPC 内の Apache Kafka ユーザーのみが Apache Kafka ブローカーにアクセスできるセルフマネージド Apache Kafka 環境を使用している場合は、Apache Kafka ソースで Amazon Virtual Private Cloud (Amazon VPC) を設定する必要があります。
SASL/SCRAM 認証
EventBridge Pipes は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。EventBridge Pipes は、暗号化された認証情報を送信してクラスターで認証します。SASL/SCRAM 認証の詳細については、「RFC 5802
EventBridge Pipes は TLS 暗号化による SASL/PLAIN 認証をサポートしています。SASL/PLAIN 認証では、EventBridge Pipes が認証情報をクリアテキスト (暗号化されていない状態) としてサーバーに送信します。
SASL 認証の場合は、サインイン認証情報をシークレットとして AWS Secrets Managerに保存します。
相互 TLS 認証
相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。
セルフマネージド Apache Kafka の場合、Lambda はクライアントとして機能します。クライアント証明書を (Secrets Manager のシークレットとして) 設定することにより、Apache Kafka ブローカーで EventBridge Pipes を認証できるようにします。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。
Apache Kafka クラスターは、EventBridge Pipes にサーバー証明書を送信し、EventBridge Pipes で Apache Kafka ブローカーを認証します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、EventBridge Pipes 信頼トストア内の CA で署名する必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。EventBridge Pipes はルート証明書を使用して Apache Kafka ブローカーを検証します。
mTLS の詳細については、「Introducing mutual TLS authentication for Amazon MSK as an source
クライアント証明書シークレットの設定
CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。
注記
EventBridge Pipes は、PBES1
証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。
-----BEGIN CERTIFICATE-----
<certificate contents>
-----END CERTIFICATE-----
Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。
プライベートキーは、以下の構造を使用した PKCS #8
-----BEGIN PRIVATE KEY-----
<private key contents>
-----END PRIVATE KEY-----
暗号化されたプライベートキーには、以下の構造を使用します。
-----BEGIN ENCRYPTED PRIVATE KEY-----
<private key contents>
-----END ENCRYPTED PRIVATE KEY-----
以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。
{
"privateKeyPassword": "testpassword",
"certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
"privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
サーバルート CA 証明書シークレットの設定
このシークレットは、Apache Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。
サーバールート CA 証明書シークレットには、PEM 形式の Apache Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。
{
"certificate": "-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
-----END CERTIFICATE-----"
ネットワーク構成
プライベート VPC 接続を使用するセルフマネージド Apache Kafka 環境を使用している場合、EventBridge は Apache Kafka ブローカーに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる必要があります。
-
Apache Kafka クラスターの VPC にアクセスするため、EventBridge はソースのサブネットへのインターネットアクセス (アウトバウンド) を使用できます。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。パブリックサブネットの場合は、VPC エンドポイントを使用する必要があります (以下で説明)。
-
EventBridge Pipes は を介したイベント配信もサポートしているためAWS PrivateLink
、パブリックインターネットを経由することなく、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。Pipes を使用すると、インターネットゲートウェイのデプロイ、ファイアウォールルールの設定、プロキシサーバーの設定を必要とせずに、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。VPC エンドポイントを使用して、パブリックサブネット内の Kafka クラスターからの配信をサポートすることもできます。 VPC エンドポイントを設定するには、「AWS PrivateLink ユーザーガイド」の「VPC エンドポイントの作成」を参照してください。サービス名では、
com.amazonaws.
を選択します。region
.pipes-data
Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。
-
インバウンドルール – ソース用に指定されたセキュリティグループに対して Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。
-
アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソース用に指定されたセキュリティグループに対して Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。
ブローカーポートには以下が含まれます。
プレーンテキスト用 9092
TLS 用 9094
SASL 用 9096
IAM 用 9098
Apache Kafka ソースを使用したコンシューマーの自動スケーリング
初めて Apache Kafka ソースを作成するときは、EventBridge が Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、EventBridge は、ワークロードに基づいてコンシューマーの数を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。
EventBridge は、1 分間隔でトピック内のすべてのパーティションのコンシューマーオフセット遅延を評価します。遅延が大きすぎる場合、パーティションは EventBridge で処理可能な速度よりも速い速度でメッセージを受信します。必要に応じて、EventBridge はトピックにコンシューマーを追加するか、またはトピックからコンシューマーを削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。
実行するためのベストプラクティターゲットのがオーバーロードすると、EventBridge はコンシューマーの数を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。