翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Kafka
Apache Kafka (Kafka) アクションは、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、Confluent Cloud
注記
このトピックでは、Apache Kafka プラットフォームおよび関連概念について精通していることを前提としています。Apache Kafka の詳細については、「Apache Kafka
要件
このルールアクションには、以下の要件があります。
-
が 、
ec2:CreateNetworkInterface
、ec2:DescribeNetworkInterfaces
、、、、ec2:DescribeVpcAttribute
、およびec2:DescribeSecurityGroups
オペレーションを実行するために引き受け AWS IoT ることができるec2:CreateNetworkInterfacePermission
ec2:DeleteNetworkInterface
ec2:DescribeSubnets
ec2:DescribeVpcs
IAMロール。このロールは、Kafka ブローカーに到達するために、Amazon Virtual Private Cloud への伸縮自在なネットワークインターフェイスを作成および管理します。詳細については、「AWS IoT ルールに必要なアクセス許可の付与」を参照してください。AWS IoT コンソールでは、 がこのルールアクションを実行することを許可 AWS IoT Core するロールを選択または作成できます。
ネットワークインターフェイスの詳細については、「Amazon EC2ユーザーガイド」の「Elastic Network Interface」を参照してください。
指定したロールにアタッチされるポリシーは次の例のようになります。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
-
AWS Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、 が
secretsmanager:GetSecretValue
およびsecretsmanager:DescribeSecret
オペレーションを実行するために引き受け AWS IoT Core ることができる IAMロールを作成する必要があります。指定したロールにアタッチされるポリシーは次の例のようになります。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] } -
Amazon Virtual Private Cloud (Amazon ) 内で Apache Kafka クラスターを実行できますVPC。からパブリック Kafka クラスター AWS IoT にメッセージを転送するには、Amazon VPC送信先を作成し、サブネットで NATゲートウェイを使用する必要があります。 AWS IoT ルールエンジンは、VPC宛先にリストされている各サブネットにネットワークインターフェイスを作成し、トラフィックを に直接ルーティングしますVPC。VPC 送信先を作成すると、 AWS IoT ルールエンジンは自動的にVPCルールアクションを作成します。VPC ルールアクションの詳細については、「」を参照してください仮想プライベートクラウド (VPC) の送信先。
-
カスタマーマネージド AWS KMS key (KMS キー) を使用して保管中のデータを暗号化する場合、サービスには発信者に代わってKMSキーを使用するアクセス許可が必要です。詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「Amazon MSK暗号化」を参照してください。
パラメータ
このアクションで AWS IoT ルールを作成するときは、次の情報を指定する必要があります。
- destinationArn
-
VPC 送信先の Amazon リソースネーム (ARN)。VPC 送信先の作成については、「」を参照してください仮想プライベートクラウド (VPC) の送信先。
- トピック
-
Kafka のブローカーに送信されるメッセージの Kafka のトピック。
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- キー (オプション)
-
Kafka のメッセージキー
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- ヘッダー (オプション)
-
指定した Kafka ヘッダーのリスト。各ヘッダーは、Kafka アクションを作成するときに指定できるキーと値のペア (1 つのキーと 1 つの値) です。これらのヘッダーを使用して、メッセージペイロードを変更せずに IoT クライアントからダウンストリーム Kafka クラスターにデータをルーティングできます。
このフィールドは、置換テンプレートを使用して置換できます。インラインルールの関数を Kafka Action のヘッダーで代替テンプレートとして渡す方法については、「例」を参照してください。詳細については、「置換テンプレート」を参照してください。
注記
バイナリ形式のヘッダーはサポートされていません。
- パーティション (オプション)
-
Kafka のメッセージパーティション。
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- clientProperties
-
Apache Kafka プロデューサークライアントのプロパティを定義するオブジェクト。
- acks (オプション)
-
リクエストが完了したとみなされる前に、プロデューサーがサーバーに受信することを求める確認応答の数。
値として 0 を指定すると、プロデューサーはサーバーからの確認応答を待機しなくなります。サーバーがメッセージを受信しない場合、プロデューサーはメッセージの送信を再試行しません。
有効な値は、
-1
、0
、1
、all
です。デフォルト値は1
です。 - bootstrap.servers
-
Kafka クラスターへの初期接続を確立するために使用されるホストとポートのペア (
host1:port1
、host2:port2
など) のリスト。 - compression.type (optional)
-
プロデューサーによって生成されるすべてのデータの圧縮タイプ。
有効な値:
none
、gzip
、snappy
、lz4
、zstd
。デフォルト値はnone
です。 - security.protocol
-
Kafka ブローカーにアタッチするために使用されるセキュリティプロトコル。
有効な値:
SSL
、SASL_SSL
。デフォルト値はSSL
です。 - key.serializer
-
ProducerRecord
で提供するキーオブジェクトをバイトに変換する方法を指定します。有効な値:
StringSerializer
。 - value.serializer
-
ProducerRecord
で提供する値オブジェクトをバイトに変換する方法を指定します。有効な値:
ByteBufferSerializer
。 - ssl.truststore
-
base64 形式のトラストストアファイル、または AWS Secrets Manager 内のトラストストアファイルの場所。トラストストアが Amazon 認証機関 (CA) によって信頼されている場合は、この値は必須ではありません。
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、
get_secret
SQL関数を使用してこのフィールドの値を取得できます。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret
SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)。トラストストアがファイル形式の場合は、SecretBinary
パラメータを使用します。トラストストアが文字列の形式である場合は、SecretString
パラメータを使用します。この値の最大サイズは 65 KB です。
- ssl.truststore.password
-
信頼ストアのパスワード。この値は、トラストストアのパスワードを作成した場合にのみ必要です。
- ssl.keystore
-
キーストアファイル。
security.protocol
の値としてSSL
を指定する場合、この値は必須です。このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret
SQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret
SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)。SecretBinary
パラメータを使用します。 - ssl.keystore.password
-
キーストアファイルのストアパスワード。
ssl.keystore
の値を指定している場合、この値は必須です。このフィールドの値はプレーンテキストにすることができます。このフィールドは、代替テンプレートもサポートします。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret
SQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret
SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)。SecretString
パラメータを使用します。 - ssl.key.password
-
キーストアファイル内のプライベートキーのパスワード。
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret
SQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret
SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)。SecretString
パラメータを使用します。 - sasl.mechanism
-
Kafka のブローカーに接続するために使用されるセキュリティメカニズム。この値は、
security.protocol
のSASL_SSL
を指定する場合に必要です。有効な値:
PLAIN
、SCRAM-SHA-512
、GSSAPI
。注記
SCRAM-SHA-512
は、cn-north-1、cn-northwest-1、 us-gov-east-1、 us-gov-west-1 の各リージョンでサポートされている唯一のセキュリティメカニズムです。 - sasl.plain.username
-
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のPLAIN
を指定する場合に必要です。 - sasl.plain.password
-
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のPLAIN
を指定する場合に必要です。 - sasl.scram.username
-
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のSCRAM-SHA-512
を指定する場合に必要です。 - sasl.scram.password
-
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のSCRAM-SHA-512
を指定する場合に必要です。 - sasl.kerberos.keytab
-
Secrets Manager の Kerberos 認証用のキータブファイル。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のGSSAPI
を指定する場合に必要です。このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret
SQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret
SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)。SecretBinary
パラメータを使用します。 - sasl.kerberos.service.name
-
Apache Kafka が実行される Kerberos プリンシパル名。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のGSSAPI
を指定する場合に必要です。 - sasl.kerberos.krb5.kdc
-
Apache Kafka プロデューサークライアントが接続するキーディストリビューションセンター (KDC) のホスト名。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のGSSAPI
を指定する場合に必要です。 - sasl.kerberos.krb5.realm
-
Apache Kafka プロデューサークライアントが接続する領域。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のGSSAPI
を指定する場合に必要です。 - sasl.kerberos.principal
-
Kerberos 対応サービスにアクセスするためのチケットを Kerberos が割り当てることができる一意の Kerberos ID。この値は、
security.protocol
のSASL_SSL
およびsasl.mechanism
のGSSAPI
を指定する場合に必要です。
例
次のJSON例では、 AWS IoT ルールで Apache Kafka アクションを定義します。次の例では、 sourceIp() インライン関数を Kafka アクションヘッダーの置換テンプレートとして渡します。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Kerberos セットアップに関する重要な注意事項
-
キー分散センター (KDC) は、ターゲット 内のプライベートドメインネームシステム (DNS) を介して解決可能である必要がありますVPC。考えられる方法の 1 つは、KDCDNSエントリをプライベートホストゾーンに追加することです。このアプローチの詳細については、「プライベートホストゾーンの使用」を参照してください。
-
各 でDNS解像度が有効になっていVPCる必要があります。詳細については、「 DNSで を使用するVPC」を参照してください。
-
VPC 送信先のネットワークインターフェイスセキュリティグループとインスタンスレベルのセキュリティグループは、次のポートVPCの 内からのトラフィックを許可する必要があります。
-
TCP ブートストラップブローカーリスナーポートの トラフィック (多くの場合 9092 ですが、9000~9100 の範囲内である必要があります)
-
TCP のポート 88 の および UDPトラフィック KDC
-
-
SCRAM-SHA-512
は、cn-north-1、cn-northwest-1、 us-gov-east-1、 us-gov-west-1 の各リージョンでサポートされている唯一のセキュリティメカニズムです。